๋ณธ๋ฌธ ๋ฐ”๋กœ๊ฐ€๊ธฐ
๊ฐœ๋ฐœ์ƒํ™œ/์ด๊ฒƒ์ €๊ฒƒ

SSE ์ ์šฉ (2) - ์ด์ค‘ํ™”๋ฅผ ์ ์šฉํ•ด๋ณด์ž

by cocococo331 2024. 8. 19.

ํ•„์š”์„ฑ

- ์‹ ๊ทœ ๊ตฌ์ถ• ์Šคํ…Œ์ด์ง€ ๋Œ€๋ถ€๋ถ„ API 2๋Œ€ -> ์Šค์ผ€์ผ๋ง ์ด์Šˆ ๊ณ ๋ฏผ์ด ํ•„์š”

- ํ˜„์žฌ ๊ฐ๊ฐ API์— Subscribe, broadcast๊ฐ€ ์žˆ์œผ๋ฉด ์ด๋ฒคํŠธ ๋ฐœ์ƒํ•œ ์ชฝ์— ์‘๋‹ต ๋ˆ„๋ฝ ๊ฐ€๋Šฅ์„ฑ์ด ์žˆ์Œ.

 

๊ตฌํ˜„ ๋ฐฉ๋ฒ•

- MQ๋ฅผ ์ด์šฉ

  - ๊ธฐ์กด์— ์‚ฌ์šฉ์ค‘์ธ ๋ฆฌ์†Œ์Šค

  - Redis๋‚˜ Kafka๋ฅผ ๋งŽ์ด ์‚ฌ์šฉํ•˜๋‚˜, ํ˜„์žฌ ์ƒํ™ฉ์—์„œ SSE๋•Œ๋ฌธ์— ์ธํ”„๋ผ ์ถ”๊ฐ€๋Š” ๋ถˆ๊ฐ€ํ•œ ์ƒํ™ฉ

  - ๋‹ค์ค‘ ๋กœ๊ทธ์ธ ์ด๋ ‡๊ฒŒ ์‹ฌํ™”๊นŒ์ง€ ํ•„์š”ํ•˜์ง€ ์•Š์Œ (๋‹จ์ผ ๊ธฐ๊ธฐ ๋กœ๊ทธ์ธ ํ™˜๊ฒฝ)

  - ๊ตฌํ˜„์„ ์ด๊ฒƒ์ €๊ฒƒ ๋ณด๋‹ˆ MQ๋กœ ์ถฉ๋ถ„ํ•˜๊ฒŒ ๋Œ€์‘ ๊ฐ€๋Šฅํ•  ๊ฒƒ ๊ฐ™์Œ

- Subscribe๋Š” ์–ด๋Š ์„œ๋ฒ„์—์„œ ๋ฐœ์ƒํ•ด๋„ ์ƒ๊ด€์ด ์—†์Œ, ๋”ฐ๋ผ์„œ API๋กœ ์ง„ํ–‰, Broadcast๋Š” MQ๋กœ ์ ์šฉ

- ์ด๋ฒคํŠธ ๋ฐœ์ƒ ์‹œ ์ด๋ฒคํŠธ ID Prefix๋ฅผ ํ™•์ธํ•˜์—ฌ ๊ตฌ๋…์ค‘์ธ ๋ชจ๋“  ์„œ๋ฒ„์— ๋งคํ•‘๋˜๋Š” ๋ชจ๋“  ์ด๋ฒคํŠธ๋ฅผ ๋™์‹œ์— Broadcastํ•˜๋„๋ก ๊ตฌํ˜„

- Fanout ๋ฐฉ์‹์œผ๋กœ ๊ตฌํ˜„ ์ง„ํ–‰

  - ํ•ด๋‹น Exchange๋ฅผ ๊ตฌ๋…ํ•˜๊ณ  ์žˆ๋Š” ๋ชจ๋“  ํ์— broadcast๋ฅผ ๋‚ ๋ ค prefix ํ•˜์œ„๋ฅผ ๋ชจ๋‘ broadcastํ•˜๋Š” ๋ฐฉ์‹

 

๊ตฌํ˜„

Batch

- ์—…๋ฌด Flow์ƒ subscrie๊ฐ€ ๋˜์–ด์žˆ๋‹ค๋Š” ๊ฐ€์ •ํ•˜์—, ๊ทœ์ •์ค€์ˆ˜๊ฒ€์‚ฌ Batch Logic ๋™์ž‘ ์ค‘์— broadcast ์ˆ˜ํ–‰

- broadcast ๋ฉ”์„œ๋“œ ํ˜ธ์ถœ ์‹œ EXCHANGE_SSE_BROADCAST์œผ๋กœ ๋ฉ”์‹œ์ง€ ๋ฐœํ–‰

- broadcast๋Š” API์— ๊ตฌํ˜„ํ•ด ๋†“์•˜๊ธฐ ๋•Œ๋ฌธ์— Batch์—์„œ๋Š” Exchange ์„ ์–ธ๋งŒ ํ•˜๋ฉด ๋จ

- ์˜ˆ์ œ ์ฝ”๋“œ 

//Menu1SseBroadcastStrategy

@Override
public void sseBroadcast(String cmpnId, String subId, SseExecutionStatus status) {
 
    SseBroadcastModel sseBroadcastModel = SseBroadcastModel.builder()
            .cmpnId(cmpnId)
            .menuName(getSseBroadcastMenu().getMenuName())
            .subId(subId)
            .status(status.getValue())
            .build();
 
    rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_SSE_BROADCAST, "", sseBroadcastModel);
}

 

 

API

- ์‹ค์ œ broadcast ๋ฉ”์‹œ์ง€๋ฅผ ๋ฐ›์•„ ์ฒ˜๋ฆฌํ•˜๋Š” ๊ณณ

- ์˜ˆ์ œ ์ฝ”๋“œ

//SSEConsumer

@RabbitListener(queues = RabbitMqConfig.QUEUE_SSE_BROADCAST_PROCESS, concurrency = "${consumer.sse-broadcast}")
public void sseReceiveMessage(SseBroadcastModel message) {
 
    sseService.broadcastToMappedEvent(message);
}

 

// SSEService
public void broadcastToMappedEvent(SseBroadcastModel message) {
    String idPrefix = getEventIdPrefix(message.getCmpnId(), message.getMenuName());
 
    emitters.entrySet().stream()
            .filter(entry -> entry.getKey().startsWith(idPrefix))
            .forEach(entry -> {
                SseEmitter emitter = entry.getValue();
                String eventId = entry.getKey();
                sendEventToClient(emitter, eventId, message.getMenuName(), eventId + "_" + message.getSubId()+ "_" + message.getStatus());
            });
}
 
private void sendEventToClient(SseEmitter emitter, String eventId, String menuName, String data) {
    try {
        emitter.send(
                SseEmitter.event()
                        .id(eventId)
                        .name(menuName)
                        .data(data));
    } catch (Exception e) {
        emitter.completeWithError(e);
    }
}
 
private String getEventIdPrefix(String cmpnId, String menuName) {
    StringJoiner eventIdBuilder = new StringJoiner("_");
    eventIdBuilder.add(cmpnId)
                  .add(menuName);
    return eventIdBuilder.toString();
}

 

// RabbitMQConfig

...
public static final String EXCHANGE_SSE_BROADCAST = "fanout.sse.broadcast";
public static final String QUEUE_SSE_BROADCAST_PROCESS = "sse.broadcast.process";
...
@Bean
public FanoutExchange sseBroadcastFanoutExchange() {
    return new FanoutExchange(EXCHANGE_SSE_BROADCAST);
}
 
@Bean
public Queue getSseBroadcastQueue() {
    return new Queue(QUEUE_SSE_BROADCAST_PROCESS);
}
 
@Bean
public Binding bindingSseExchangeAndQueue(Queue getSseBroadcastQueue, FanoutExchange sseBroadcastFanoutExchange) {
    return BindingBuilder.bind(getSseBroadcastQueue).to(sseBroadcastFanoutExchange);
}
...

 

Client

- API์—์„œ broadcast๊ฐ€ ์ผ์–ด๋‚˜๋ฉด watch์— sseEventData response ๋ณ€๊ฒฝ์ด ๊ฐ์ง€๋˜๊ณ  ๊ทธ๋•Œ ์กฐ๊ฑด์— ๋”ฐ๋ผ ์—…๋ฐ์ดํŠธ ์ง„ํ–‰

- ์˜ˆ์ œ ์ฝ”๋“œ

// menu1.vue
...
sseResponseData(newVal) {
    if (newVal.indexOf('_complete') !== -1) {
        this.getGridList();
    }
},
...
 
//menu1Detail.vue
...
sseResponseData(newVal) {
    if (newVal.indexOf('_complete') !== -1 && newVal.includes(this.menu1DataId)) {
        this.fetchDetail();
    }
},
...