Notice
Recent Posts
Recent Comments
Link
«   2025/08   »
1 2
3 4 5 6 7 8 9
10 11 12 13 14 15 16
17 18 19 20 21 22 23
24 25 26 27 28 29 30
31
Tags
more
Archives
Today
Total
관리 메뉴

Kuma's Curious Paradise

[이룸] 240308 SSE 알림 기능 본문

이룸 프로젝트

[이룸] 240308 SSE 알림 기능

쿠마냥 2024. 3. 13. 11:51

알림 기능 개발의 이유

이룸은 학습 챌린지 커뮤니티 서비스로, 사용자의 패턴은 크게 두 가지로 나눠진다. 첫 번째는 챌린지를 만들고 관리하기. 두 번째는 챌린지를 신청하고 참여하기. 즉, 인증글 올리기다. 챌린지에 다른 사람이 신청했는지 아닌지, 인증글이 승인됐는지 안 됐는지 계속해서 확인해야 한다면 사용자 부담은 매우 커질 것이다. 따라서 이를 위한 알림 기능의 개발이 필요했다.

더불어, 이룸은 사용자가 챌린지를 포기하지 않고 이어갈 수 있도록 도와주어야 한다. 우리의 모토는 ‘혼자가 아니라 함께 이룸’이기 때문이다. 따라서 위에 제시한 사용자 편의성을 달성하고 나면, 이후 동기 부여를 위한 알림 기능이 개발되어야 할 것이다. 때가 되면 인증을 올리도록 알림을 보내고, 아무 챌린지에도 참여하고 있지 않다면 흥미로운 챌린지를 보내 주어야 한다.


알림 기능은 어떤 방법으로 개발되어야 할까?

[백엔드 스프링부트] 알림 기능은 어떻게 구현하는게 좋을까?

  1. Polling : 일정 주기에 한 번씩 서버에 요청을 보내 알림(응답)을 받는 방식. 구현은 쉬우나 실시간 데이터 갱신이 안 될 뿐더러 갱신 사항이 없음에도 요청을 보내기 때문에 서버 부하가 발생.
  2. Long-Polling : 서버로 요청 들어올 경우, 대기하였다가 갱신 사항이 생기면 알림(응답)을 보내줌. 연결이 된 경우 실시간으로 데이터를 받을 수 있으며, 불필요한 응답을 주는 경우도 줄어듦. 그러나 요청-응답을 계속해야 하므로 갱신사항 많을수록 서버 부하는 큼.
  3. SSE : 웹 브라우저에서 서버로 특정 이벤트를 구독(subscribe)하면, 이벤트 발생 시 알림을 보내주는 방식. 연결 요청 한 번으로 데이터를 계속해서 보낼 수 있지만, 서버에서 브라우저로만 데이터 전송이 가능한 단방향 통신임.
  4. Web Socket : 웹 브라우저와 서버 사이 양방향 통신이 가능한 방식.


SSE 선택의 이유

  1. 단방향 통신 : 서버에서 일어난 일을 브라우저에 보내기만 하면 되므로, 굳이 양방향으로 통신하여 비용을 증가시킬 필요가 없음.
  2. 간단한 구현: 웹소켓과 달리, HTTP 프로토콜 사용하기 때문에 비교적 구현이 간단함. 따라서 개발 비용과 시간을 절약할 수 있으며 유지보수도 상대적으로 간편.


SSE(Server-Sent Events)로 알림 기능을 구현할 때 알아야 할 주요 개념

1. SSE 프로토콜 이해

  • 통신 방식: 클라이언트는 JavaScript의 ‘**EventSource**’ 인터페이스를 사용해서 서버에 연결을 요청. 이때 HTTP 프로토콜이 사용됨. → 사용자가 특정 웹페이지를 방문하면 SSE 연결이 요청되도록 코드를 짜는 것. → 서버는 요청을 수락하고 200 OK와 함께 ‘**text/event-stream’** 형태의 **‘Content-Type’**으로 응답. → 이제부터 SSE 사용하여 새로운 정보가 있을 때마다 데이터 스트림을 통해 정보를 전송하겠다는 뜻! → 서버의 메시지는 ‘data: ‘로 시작하고, 메시지의 끝은 두 개의 개행문자 (\n\n)로 표시됨. → 혹여나 연결이 끊어지면 클라이언트에서 자동으로 서버에 재연결을 시도함.
  • **SseEmitter** : 스프링에서 SSE를 구현하는 데 사용되는 클래스. 클라이언트와 서버를 연결해 주는 무언가, 정도로 생각하는 것이 편함. emit은 뿌린다라는 뜻으로, Server에서 Sent하는 Event를 Emit, 뿌려주는 객체임.

2. 메시지 포맷팅

  • **text/event-stream** : SSE를 사용할 때 HTTP 응답의 Content-Type으로 지정하는 MIME(multipurpose Internet Mail Extensions) 타입. 이 타입은 서버가 클라이언트로 텍스트 스트림을 실시간으로 전송하는 데 사용됨. 실시간 뉴스 피딩, 스포츠 점수 업데이트, 주식 시세 변경 알림 등.

    클라이언트는 요청 Accept 헤더에 ‘text/event-stream’을 지정 → 서버는 이 요청을 받고 Content-Type 헤더를 text/event-stream으로 설정 후 클라이언트에게 응답. 이 응답은 연결을 열린 상태로 유지하여 새로운 데이터가 생길 때마다 이 연결을 통해 데이터를 클라이언트에게 전송.

    데이터는 event: (이벤트 타입을 지정), data: (실제 데이터를 전송), id: (이벤트 ID를 지정) 등의 필드를 사용하여 전송됩니다. 각 이벤트 메시지는 두 개의 연속된 줄바꿈 문자(\\n\\n)로 종료됨.
  • 데이터 전송: 메시지는 data: 필드로 시작해야 하며, 메시지의 끝은 두 개의 연속된 개행 문자(\\n\\n)로 표시됨.
  • 이벤트 ID: id: 필드를 사용하여 메시지에 고유 ID를 설정할 수 있고, 이는 클라이언트의 재연결 시 마지막 수신된 이벤트 이후로 메시지를 다시 전송하기 위해 사용됨.
  • 이벤트 타입: event: 필드를 사용하여 메시지 유형을 지정할 수 있으며, 클라이언트는 이 정보를 사용하여 다른 유형의 메시지를 다르게 처리할 수 있음.

3. 연결 관리

  • 자동 재연결: 클라이언트가 자동으로 서버에 재연결을 시도할 수 있도록 retry: 필드를 설정하여 재연결 시간 간격을 제어할 수 있습니다.
  • 동시 연결 관리: 서버는 동시에 여러 클라이언트와의 연결을 관리할 수 있어야 합니다. 이를 위해 비동기 처리 모델과 스레드, 프로세스 관리가 중요합니다.
  • 이벤트 캐싱 : 이벤트 또는 메시지를 일시적으로 저장해 두는 메커니즘. 놓친 이벤트를 추후에 재전송하기 위함.


Notification 클래스

@Entity
@Getter
@Builder
@AllArgsConstructor
// 기본 생성자 접근 레벨을 protected로 설정하여, 외부에서 무분별하게 객체 생성하는 것을 방지
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class Notification {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    @Column(name="notification_id")
    private Long id;

    @Column(nullable = false)
    private Long challengeId;

    @Column
    private Long authId;

    @Column(nullable = false)
    private String content;

//  이후 notification content 내용이 복잡해질 것을 고려하여 @Embedded 추가해 놓음 
//    @Embedded
//    private NotificationContent content;

    // 해당 필드를 문자열 형태로 데이터베이스에 저장하도록 지정 
    @Enumerated(EnumType.STRING)
    @Column(nullable = false)
    // 알림의 유형을 나타냄 (NotificationType 클래스 참조)
    private NotificationType notificationType;

    @ManyToOne(fetch = FetchType.LAZY)
    @JoinColumn(name = "member_id")
    // 회원 삭제될 때, 연결된 알림들도 자동 삭제됨. 
    @OnDelete(action = OnDeleteAction.CASCADE)
    private Member receiver;

    public Notification(String content, Long challengeId, NotificationType notificationType, Member member) {
        this.content = content;
        this.challengeId = challengeId;
        this.notificationType = notificationType;
        this.receiver = member;
    }

    public Notification(String content, Long challengeId, Long authId, NotificationType notificationType, Member member) {
        this.content = content;
        this.challengeId = challengeId;
        this.authId = authId;
        this.notificationType = notificationType;
        this.receiver = member;
    }

    public Member getMember() {
        return receiver;
    }
}

NotificationType 클래스

public enum NotificationType {
    REGISTER,
    APPROVE,
    DENY;
}

NotificationRepository 클래스

public interface NotificationRepository extends JpaRepository<Notification, Long> {
}


EmitterRepository 클래스

: 현재는 HashMap으로 Emitter와 이벤트를 저장하는 형식으로 구현하였으나, 추후 다른 방식으로 구현할 수 있기 때문에 유연한 전환을 위해 interface 생성 후 그 아래에 구현체로 구현하는 방식을 채택

@Repository
public interface EmitterRepository {
    SseEmitter save(String emitterId, SseEmitter sseEmitter);
    void saveEventCache(String emitterId, Object event);
    Map<String, SseEmitter> findAllEmitterStartWithByMemberId(String memberId);
    Map<String, Object> findAllEventCacheStartWithByMemberId(String memberId);
    void deleteById(String id);
    void deleteAllEmitterStartWithId(String memberId);
    void deleteAllEventCacheStartWithId(String memberId);
}
  • Emitter Repository는 RDBS에 저장할 수도, NoSQL에 저장할 수도 있음. 지금 리팩토링을 생각하고 있는 부분은 Redis인데, key-value 저장소로 널리 사용되며 데이터 접근 시간을 대폭 줄여서 서버 부하를 줄이고 반응 속도를 높일 수 있기 때문. 대신 용량이 적고 데이터 복구가 어려운데, 이 부분이 추후 문제가 될 수 있음.
  • RabbitMQ는 데이터베이스는 아니지만 경량 메시지 브로커 시스템으로 복잡한 메시지 기능을 구현할 때 사용. 메시지를 임시로 보관하고 애플리케이션 간 메시지 전송에 초점을 맞춰 나온 메시지 관리소인 셈. 제대로 알아보지는 못했지만 양방향의 세밀한 알림 기능이 필요하다면 더 찾아볼 예정. → 예를 들어, 좋아요나 댓글이 달리면 챌린지나 인증글 작성자는 이에 대한 알림을 받고 작성자가 댓글에 답변하면 해당 사용자에게도 알림이 가야할 때


EmitterRepositoryImpl 클래스

@Repository
public class EmitterRepositoryImpl implements EmitterRepository {
		// String이 key,SseEmitter가 value인 HashMap.
    private final Map<String, SseEmitter> emitters = new HashMap<>();
    // 발생한 이벤트를 캐싱하기 위한 HashMap. evnetId를 key로, 이벤트 객체를 value로 저장. 
    private final Map<String, Object> eventCache = new HashMap<>();

		// 새 SseEmitter를 저장하거나 기존 것을 업데이트. 
    @Override
    public SseEmitter save(String emitterId, SseEmitter sseEmitter) {
        emitters.put(emitterId, sseEmitter);
        return sseEmitter;
    }

		// 특정 이벤트 ID에 대한 이벤트 객체를 캐시에 저장
    @Override
    public void saveEventCache(String eventCacheId, Object event) {
        eventCache.put(eventCacheId, event);
    }

		// memberId로 시작하는 모든 SSE 연결을 검색 후 반환 
    @Override
    public Map<String, SseEmitter> findAllEmitterStartWithByMemberId(String memberId) {
        return emitters.entrySet().stream()
                .filter(entry -> entry.getKey().split("_")[0].equals(memberId))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

		// memberId로 시작하는 모든 이벤트 캐시 검색 후 반환 
    @Override
    public Map<String, Object> findAllEventCacheStartWithByMemberId(String memberId) {
        return eventCache.entrySet().stream()
                .filter(entry -> entry.getKey().split("_")[0].equals(memberId))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

		// id에 해당하는 SseEmitter 삭제 
    @Override
    public void deleteById(String id) {
        emitters.remove(id);
    }

		// memberId로 시작하는 모든 SSE 연결 종료 
    @Override
    public void deleteAllEmitterStartWithId(String memberId) {
        Set<String> keysToDelete = emitters.keySet().stream()
                .filter(key -> key.startsWith(memberId))
                .collect(Collectors.toSet());
        keysToDelete.forEach(emitters::remove);
    }

		// memberId로 시작하는 모든 이벤트 캐시 삭제 
    @Override
    public void deleteAllEventCacheStartWithId(String memberId) {
        Set<String> keysToDelete = eventCache.keySet().stream()
                .filter(key -> key.startsWith(memberId))
                .collect(Collectors.toSet());
        keysToDelete.forEach(eventCache::remove);
    }

		// emitterId에 해당하는 SseEmitter 반환 
    public SseEmitter findByEmitterId(String emitterId) {
        return emitters.get(emitterId);
    }
}
  • **emitters.entrySet().stream().filter(entry -> entry.getKey().split("_")[0].equals(memberId)).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));** : emitters.entrySet().stream()으로 emitters를 스트림으로 변환. 이제 함수형 방식으로 처리 가능 → filter(…)로 key를 _를 기준으로 분리한 다음, [0]번째 요소가 파라미터로 들어온 memberId와 같은지 확인 → collect(…)를 호출하여 스트림을 다시 맵으로 변환하는데, 이때 맵의 키-밸류는 각각 원본의 키-밸류로 설정됨.
  • **HashMap** 이란?
    • 순서 보장 X (요소를 삽입한 순서대로 조회되지 않음)
    • 검색 속도 빠름 (key를 받으면 바로 접근 가능. 따라서 업데이트도 빠름)
    • 자동으로 크기를 조정하기 때문에 데이터 양이 많아져도 성능 유지 (사용자 수나 알림이 늘어나도 ok)
    • 멀티스레드 환경에서는 스레드 안전하지 않아서, 이후 멀티스레드 도입할 경우 변경 필요
  • HashMap은 데이터를 key-value 쌍으로 저장하며, 각 키는 모두 유니크함. 내부적으로 버킷 배열을 사용하여 데이터를 저장하는데, 각 키의 해시 코드를 사용해서 해당 키가 저장될 버킷을 결정함.


NotificationRequestDto 클래스

@Getter
@Builder
public class NotificationRequestDto {
    private Long challengeId;
    private Long authId;
    private NotificationType notificationType;
    private String content;
    private Member receiver;
	}

NotificationResponseDto 클래스

@Getter
public class NotificationResponseDto {
    private Long notificationId;
    private Long challengeId;
    private Long authId;
    private NotificationType notificationType;
    private String content;

    public NotificationResponseDto(Notification notification) {
        this.notificationId = notification.getId();
        this.challengeId = notification.getChallengeId();
        this.authId = notification.getAuthId();
        this.notificationType = notification.getNotificationType();
        this.content = notification.getContent();
    }

    // 정적 팩토리 메서드 of
    public static NotificationResponseDto of(Notification notification) {
        return new NotificationResponseDto(notification);
    }
}
  • **of 메서드**란?’정적 팩토리 메서드’는 객체 생성 과정을 캡슐화하는 정적(static) 메서드를 통해 팩토리(factory)처럼 동작하여 객체를 생성함. 이름이 of인 이유는 Notification 객체를 파라미터로 받아 NotificationResponseDto를 만들겠다는 의도가 명확하게 보이기 때문.
  • 생성자는 클래스 이름과 동일해야 하므로, 객체 생성 시 의도를 명확히 전달하기 어려움. 이럴 때 메서드 이름을 통해 생성 로직의 의도를 더욱 명확하게 전달할 수 있는 방법이 있는데, 바로 정적 팩토리 메서드 of!
  • **캡슐화**란?정적 팩토리 메서드는 객체가 어떻게 생성되는지 숨기는 기능이 있으며, 구현 클래스 대신 인터페이스나 상위 클래스 반환도 가능해서 구현 클래스를 변경할 경우 영향을 최소화함. of를 쓰는 사람은 of만 알면 필요한 객체를 얻을 수 있음.
  • 캡슐화는 객체의 내부 구조와 구현 로직을 외부로부터 숨겨서, 객체 간 느슨한 결합을 가능하게 하고 객체의 데이터를 보호함. 외부의 영향으로부터 내부를 보호하는 것이 핵심.


NotificationController 클래스

@RestController
@RequestMapping("/api/notifications")
@RequiredArgsConstructor
public class NotificationController {
    private final NotificationService notificationService;

    /**
     * 클라이언트의 알림 구독 요청을 처리하고 Server-Sent Events(SSE) 스트림 반환.
     *
     * @param userDetails 현재 사용자의 인증된 정보를 포함하는 UserDetailsImpl 객체
     * @param lastEventId 클라이언트가 마지막으로 수신한 이벤트의 ID
     * @param response    HTTP 응답 객체
     * @return Server-Sent Events(SSE) 스트림을 포함하는 ResponseEntity
     */
    @GetMapping(value = "/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public ResponseEntity<SseEmitter> subscribe(@AuthenticationPrincipal UserDetailsImpl userDetails,
                                @RequestHeader(value = "Last-Event-ID", required = false, defaultValue = "") String lastEventId, HttpServletResponse response) {
        return new ResponseEntity<>(notificationService.subscribe(userDetails.getMember(), lastEventId, response), HttpStatus.OK);
    }
}
  • **produces = MediaType.TEXT_EVENT_STREAM_VALUE** : 컨트롤러 메서드가 생성(produce)하는 HTTP 응답의 Content-Type을 지정하는 속성. 꼭 포함해야 하는 설정!!!
  • @RequestHeader(value = "Last-Event-ID", required = false, defaultValue = "") String lastEventId: HTTP 요청 헤더 중 "Last-Event-ID"의 값을 String 타입으로 주입받음. 이 값은 클라이언트가 마지막으로 수신한 이벤트의 ID를 나타내며, 이벤트 스트림이 중단된 후 재연결 시 사용됨. 필수 값은 아니며 기본값은 빈 문자열.


NotificationService 클래스

@Slf4j
@Service
@RequiredArgsConstructor
public class NotificationService {
    private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 60; // 기본 타임아웃 설정
    private final EmitterRepositoryImpl emitterRepository;
    private final NotificationRepository notificationRepository;

    /**
     * 클라이언트가 SSE를 구독하는 서비스 메서드.
     * 새로운 SseEmitter를 생성하고 Nginx 버퍼링 방지를 위해 필요한 Http 헤서를 설정.
     * 클라이언트가 놓친 이벤트도 함께 전송
     *
     * @param member 로그인한 유저 객체
     * @param lastEventId 유저가 마지막으로 수신한 이벤트의 id
     * @param response http 응답 객체
     * @return 구독한 클라이언트와 연결된 emitter 객체
     */
    public SseEmitter subscribe(Member member, String lastEventId, HttpServletResponse response) {
        // Java에서는 SseEmitter 클래스를 통해 SSE를 구현할 수 있음. 
        // SseEmitter 객체는 한 클라이언트에 대한 연결을 나타내며, 이 객체를 통해 서버에서 클라이언트로 데이터 스트리밍 가능. 
        // DEFAULT_TIMEOUT은 SseEmitter의 최대 수명 정의. 이 시간 지나면 연결 자동 종료
        String emitterId = makeTimeIncludeId(member);
        SseEmitter emitter = new SseEmitter(DEFAULT_TIMEOUT);

				// NIGNX 프록시 서버를 사용할 경우 필요한 설정 
        response.setHeader("X-Accel-Buffering", "no"); // NGINX PROXY 에서의 필요설정 불필요한 버퍼링방지

				// SseEmitter의 완료, 타임아웃, 에러 상황 발생 시 실행될 로직을 정의. 
        emitter.onCompletion(() -> emitterRepository.deleteById(emitterId));
        emitter.onTimeout(() -> emitterRepository.deleteById(emitterId));
        emitter.onError((e) -> log.error("SSE Emitter Error: ", e));

				// 위에서 생성한 emitter를 저장
        emitterRepository.save(emitterId, emitter);

        // 클라이언트에게 첫 연결 시 503 에러를 방지하기 위해 더미 이벤트 전송
        String eventId = makeTimeIncludeId(member);
        sendDummyData(eventId, emitter, emitterId, "EventStream Created. [memberId=" + member.getMemberId() + "]");

        // 클라이언트가 마지막으로 수신한 이벤트 이후 발생한 모든 이벤트를 재전송. 
        // 네트워크 문제나 기타 이유로 이벤트가 유실되었을 경우를 대비. 
        resendLostData(lastEventId, member, emitter);
        return emitter;
    }

    /**
     * 알림 전송 기능을 세 개의 메서드로 나누어 설계.
     * send() : 알림 전송 프로세스를 시작하는 공개 메서드
     * sendNotification() : 저장됨 알림 객체와 함께 클라이언트에게 알림을 전송
     * sendToClient() : 실제로 클라이언트에게 SseEmitter를 통해 알림을 전송
     *
     * @param requestDto 알림에 필요한 정보들을 담은 request dto
     */
     
    // 알림 데이터를 저장하고, 저장된 알림 객체를 전송하기 위해 sendNotification() 호출. 
    // 데이터 저장과 전송의 두 단계를 명확히 구분하기 위함.
    public void send(NotificationRequestDto requestDto) {
        Notification notification = saveNotification(requestDto);
        sendNotification(notification);
    }

    /**
     * 알림 전송 서비스 메서드. 주어진 정보를 바탕으로 특정 클라이언트에게 알림을 전송.
     * 전송 실패 시, 연결된 emitter 객체 삭제
     *
     * @param notification 알림 객체
     */
     
	  // 특정 수신자에게 등록된 모든 SseEmitter를 조회하고, 각각에 대해 알림 응답 객체를 생성하여 전송. 
	  // 알림 저장과 전송 로직을 분리하여 독립적으로 관리하기 위함. 
    private void sendNotification( Notification notification) {
        String receiverId = String.valueOf(notification.getMember().getMemberId());
        // 유저의 모든 SseEmitter 가져옴
        Map<String, SseEmitter> emitters = emitterRepository
                .findAllEmitterStartWithByMemberId(receiverId);
        emitters.forEach((key, emitter) -> {
            NotificationResponseDto responseDto = NotificationResponseDto.of(notification);
            // 데이터 캐시 저장 (유실된 데이터 처리 위함)
            emitterRepository.saveEventCache(key, responseDto);
            // 데이터 전송
            sendToClient(key, responseDto);
        });
    }

    /**
     * 실제로 클라이언트에게 SseEmitter를 통해 알림을 전송하는 메서드 
     * emitter를 사용하여 실시간으로 알림 전송
     *
     * @param emitterId 해당 클라이언트와 연결시켜주는 emitter 식별자
     * @param responseDto 알림 응답 객체
     */
    private void sendToClient(String emitterId, NotificationResponseDto responseDto) {
        SseEmitter emitter = emitterRepository.findByEmitterId(emitterId);
        // 해당 emitterId를 가진 클라이언트와의 연결이 존재하면, 클라이언트에게 알림 전송 
        if (emitter != null) {
            try {
                log.info("Sending notification to client: {}", responseDto);
                // SseEmitter.event()로 이벤트를 생성. 
                // 이 때, 이벤트id, 이벤트 이름(여기서는 "sse"), 전송할 데이터를 지정. 
                // .data()는 실제로 전송될 데이터를 지정. 객체를 json 형태로 자동 변환하여 전송. 
                emitter.send(SseEmitter.event().id(emitterId).name("sse").data(responseDto));
                log.info("Notification sent successfully");
            } catch (IOException e) {
                emitterRepository.deleteById(emitterId);
                log.error("Failed to send notification", e);
                throw new RuntimeException("Connection error!");
            }
        } else {
            log.warn("No emitter found for ID: {}", emitterId);
        }
    }

    /**
     * 회원 ID와 현재 시간을 조합하여 고유한 식별자를 생성하는 메서드
     * 각 회원을 특정 이벤트와 연결시킴
     *
     * @param member 알림을 수신할 회원 객체
     * @return 생성된 고유 식별자 문자열
     */
    private String makeTimeIncludeId(Member member) {
        return member.getMemberId() + "_" + System.currentTimeMillis();
    }

    /**
     * 회원이 놓친 이벤트를 전송하는 메서드
     *
     * @param lastEventId 회원이 마지막으로 수신한 이벤트 id
     * @param member
     * @param emitter
     */
    private void resendLostData(String lastEventId, Member member, SseEmitter emitter) {
        // 놓친 이벤트가 있다면
        if (!lastEventId.isEmpty()) {
            // 멤버 아이디를 기준으로 캐시된 모든 이벤트를 가져온다.
            Map<String, Object> cachedEvents = emitterRepository.findAllEventCacheStartWithByMemberId(String.valueOf(member.getMemberId()));

            // 모든 이벤트를 순회하며
            for (Map.Entry<String, Object> entry : cachedEvents.entrySet()) {
                // lastEventId보다 큰 ID(뒷 시간에 일어난 이벤트)만 필터링하여
                if (lastEventId.compareTo(entry.getKey()) < 0) {
                    try {
                        // 재전송한다.
                        emitter.send(SseEmitter.event().id(entry.getKey()).data(entry.getValue()));
                    } catch (IOException e) {
                        log.error("Resending lost data failed for memberId: {}", member.getMemberId(), e);
                    }
                }
            }
        }
    }

    /**
     * 클라이언트 연결 초기에 503 에러가 뜨지 않도록 더미 데이터를 전송하는 메서드
     * 연결이 성공적으로 이루어졌는지 확인
     *
     * @param emitterId 해당 클라이언트와 고유하게 연결된 emitter id
     * @param emitter
     * @param eventId
     * @param data 더미 데이터
     */
    private void sendDummyData(String emitterId, SseEmitter emitter, String eventId, Object data) {
        try {
            emitter.send(SseEmitter.event()
                    .id(eventId)
                    .data(data));
        } catch (IOException exception) {
            emitterRepository.deleteById(emitterId);
        }
    }

    /**
     * 새 알림 생성하고 저장하는 서비스 메서드
     *
     * @param requestDto 알림 생성에 필요한 정보를 담고 있는 dto
     * @return 알림 객체
     */
    @Transactional
    protected Notification saveNotification(NotificationRequestDto requestDto) {
        Notification notification = Notification.builder()
                .receiver(requestDto.getReceiver())
                .notificationType(requestDto.getNotificationType())
                .content(requestDto.getContent())
                .challengeId(requestDto.getChallengeId())
                .authId(requestDto.getAuthId())
                .build();
        return notificationRepository.save(notification);
    }
}


이렇게 만들어진 notification을 직접 사용하기

@Transactional
    public AuthDataResponseDto updateLeaderAuth(AuthLeaderRequestDto requestDto, Long challengeId, Long authId, Member loginMember) { // 챌린지 인증 허가 및 불가 처리(leader)
        Challenge challenge = challengeRepository.findById(challengeId).orElseThrow(
                () -> new IllegalArgumentException("해당 챌린지를 신청하지 않습니다.")
        );
        Auth auth = authRepository.findById(authId).orElseThrow(
                () -> new IllegalArgumentException("해당 인증이 존재하지 않습니다.")
        );
        Member member = memberRepository.findById(loginMember.getMemberId()).orElseThrow(
                () -> new IllegalArgumentException("멤버가 존재하지 않습니다.")
        );
        Optional<Challenger> challengerOptional = challengerRepository.findByChallengeAndMember(challenge, member);
        if (challengerOptional.isPresent()) {
            try {
                if (challengerOptional.get().getMember().getMemberId() != member.getMemberId()) {
                    return new AuthDataResponseDto(null, "해당 인증과 관련있는 맴버가 아닙니다.", HttpStatus.BAD_REQUEST);
                }
                if (challengerOptional.get().getRole() == ChallengerRole.LEADER) {
                    auth.leaderUpdate(auth, requestDto);
                    auth.getChallenger().getMember().incrementBricksCount();
                    AuthResponseDto responseDto = new AuthResponseDto(auth);

                    **// 알림 전송 로직
                    if (requestDto.getAuthStatus().equals("APPROVED")) {
                        String content = auth.getChallenger().getMember().getNickname() + "님의 인증글이 승인되었습니다.";
                        notify(auth.getChallenger().getMember(), NotificationType.APPROVE, content, challengeId, authId);
                    } else if (requestDto.getAuthStatus().equals("DENIED")) {
                        String content = auth.getChallenger().getMember().getNickname() + "님의 인증글이 인증 조건을 만족시키지 못하였습니다. 인증글을 수정하여 주세요.";
                        notify(auth.getChallenger().getMember(), NotificationType.DENY, content, challengeId, authId);
                    }**
                    return new AuthDataResponseDto(responseDto, "챌린지 상태 수정 성공", HttpStatus.CREATED);
                } else {
                    return new AuthDataResponseDto(null, "해당 권한이 없습니다.", HttpStatus.BAD_REQUEST);
                }
            } catch (Exception e) {
                return new AuthDataResponseDto(null, "에러: " + e.getMessage(), HttpStatus.INTERNAL_SERVER_ERROR);
            }
        } else {
            return new AuthDataResponseDto(null, "해당 챌린지를 신청한 멤버가 아닙니다.", HttpStatus.BAD_REQUEST);
        }
    }
    
    
    **private void notify(Member receiver, NotificationType type, String content, Long challengeId, Long authId) {
        NotificationRequestDto notificationRequest = NotificationRequestDto.builder()
                .receiver(receiver)
                .notificationType(type)
                .content(content)
                .challengeId(challengeId)
                .authId(authId)
                .build();
        notificationService.send(notificationRequest);
    }**