๋ณธ๋ฌธ ๋ฐ”๋กœ๊ฐ€๊ธฐ
๊ฐœ๋ฐœ์ƒํ™œ/์ด๊ฒƒ์ €๊ฒƒ

SSE ์ ์šฉ (1) - SSE๋ฅผ ์ ์šฉํ•ด๋ณด์ž ๊ฐœ๋…

by cocococo331 2024. 4. 12.

๋ฐฐ์น˜ ๋™์ž‘ ์™„๋ฃŒ ์‹œ์ ๋งˆ๋‹ค ํ™”๋ฉด์— update๋ฅผ ํ•ด์ค˜์„œ ์‚ฌ์šฉ์ž๊ฐ€ ์ตœ์‹  ๊ฒฐ๊ณผ๋ฅผ ๋ฐ”๋กœ ํ™•์ธํ•  ์ˆ˜ ์žˆ๋„๋ก ์„œ๋น„์Šค๋ฅผ ์ œ๊ณตํ•˜๊ธฐ์œ„ํ•ด 

์ ์ ˆํ•œ ๊ธฐ์ˆ  ๊ฒ€ํ† ์ค‘์— SSE ์ถ”์ฒœ์„ ๋ฐ›์•„ ๋ถ„์„ ํ›„ ๊ฐœ๋ฐœ์„ ์ง„ํ–‰ํ•˜๊ฒŒ ๋˜์—ˆ๋‹ค.

๊ด€๋ จ ๋‚ด์šฉ์„ ์ •๋ฆฌ

 

SSE๋ž€, Server Sent Event

์›น ๋ธŒ๋ผ์šฐ์ €์™€ ์„œ๋ฒ„ ๊ฐ„ ๋‹จ๋ฐฉํ–ฅ ํ†ต์‹ ์„ ๊ฐ€๋Šฅํ•˜๊ฒŒ ํ•˜๋Š” ์›น ๊ธฐ์ˆ 

์„œ๋ฒ„์—์„œ ํด๋ผ์ด์–ธํŠธ(๋ธŒ๋ผ์šฐ์ €)๋กœ ์‹ค์‹œ๊ฐ„ ์ด๋ฒคํŠธ๋ฅผ ๋ณด๋‚ผ ์ˆ˜ ์žˆ๋‹ค.

ํด๋ผ์ด์–ธํŠธ ์ธก์—์„œ ์„œ๋ฒ„๋กœ์˜ ์—ฐ๊ฒฐ์„ ์—ด์–ด๋‘๊ณ  ์„œ๋ฒ„์—์„œ ์ด๋ฒคํŠธ๋ฅผ ํ‘ธ์‰ฌํ•˜๋Š” ๋ฐฉ์‹

์„œ๋ฒ„๋Š” ์ด๋ฒคํŠธ๋ฅผ ์ƒ์„ฑํ•˜๊ณ , ์ด๋ฒคํŠธ์— ํ•„์š”ํ•œ Data๋ฅผ ํฌํ•จํ•˜์—ฌ Client์—๊ฒŒ ์ „๋‹ฌํ•œ๋‹ค.

Client๋Š” ์ด๋ฒคํŠธ ์ˆ˜์‹  ํ›„ ํ•„์š”ํ•œ ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•˜๋Š”๋ฐ ๋ฐ์ดํ„ฐ๋ฅผ ์‚ฌ์šฉ

 

SSE ํŠน์ง•

- ๋‹จ๋ฐฉํ–ฅ ํ†ต์‹  ์„œ๋ฒ„ -> ํด๋ผ์ด์–ธํŠธ๋กœ๋งŒ Data ์ „์†ก

- ๊ธฐ๋ณธ HTTP ํ”„๋กœํ† ์ฝœ ์‚ฌ์šฉ -> ์ถ”๊ฐ€ Library๋‚˜ ํ”„๋ ˆ์ž„์›Œํฌ ์—†์ด๋„ ์‰ฝ๊ฒŒ ๊ตฌํ˜„์ด ๊ฐ€๋Šฅํ•˜๋‹ค.

- ์‹ค์‹œ๊ฐ„์œผ๋กœ ์„œ๋ฒ„์—์„œ ๋ฐ์ดํ„ฐ๋ฅผ ์ˆ˜์‹ ํ•˜์—ฌ ๋™์ ์œผ๋กœ ์—…๋ฐ์ดํŠธ๊ฐ€ ๊ฐ€๋Šฅํ•จ

- client๋Š” same sse emitter๋ฅผ ๊ณต์œ ํ•  ์ˆ˜ ์—†๋‹ค. ๊ตฌ๋…๋‹น ํ•˜๋‚˜์˜ emitter๋ฅผ ๋งŒ๋“ค์–ด์•ผํ•œ๋‹ค. (๋™์ผ ๋ธŒ๋ผ์šฐ์ €๋“  ๋‹ค๋ฅธ user๋“  ์–ด๋””์„œ๋“  ๋…๋ฆฝ์ ์œผ๋กœ ์œ ์ง€๊ฐ€ ๋จ)

- ๋ธŒ๋ผ์šฐ์ € ์ตœ๋Œ€ ๋™์‹œ์ ‘์†์ˆ˜๊ฐ€ http1.1์˜ ๊ฒฝ์šฐ 6๊ฐœ, http2์˜ ๊ฒฝ์šฐ 100๊ฐœ ๊นŒ์ง€ ๊ฐ€๋Šฅํ•˜๋‹ค.

- ํด๋ผ์ด์–ธํŠธ์—์„œ emitter๋ฅผ closeํ•ด๋„ ์„œ๋ฒ„์—์„œ ๊ฐ์ง€ํ•˜๊ธฐ ์–ด๋ ต๋‹ค. (๋‹จ๋ฐฉํ–ฅ ๋‹จ์ )

 

Sample ๊ตฌํ˜„

- ์ง„์งœ Sample Level์—์„œ ์กฐ๊ธˆ ์†๋ด„

- API, Client, Batch(ํŠน์ • ์ž‘์—… ์™„๋ฃŒ ํ›„ broadcast ํ˜ธ์ถœ์„ ์œ„ํ•จ)

- ๊ธฐ๋ณธ ํ•ต์‹ฌ ๊ตฌ์„ฑ Subscribe, Broadcast ์ •๋„๋งŒ

 

API

๊ตฌ์„ฑ

- subscribe(api)

    - ๊ตฌ๋… ์š”์ฒญ -> emitter ์ƒ์„ฑ -> client์— ์ƒ์„ฑ์ด ์™„๋ฃŒ๋˜์—ˆ๋‹ค๋Š” ์ดˆ๊ธฐ ๋ฐ์ดํ„ฐ ์‘๋‹ต -> ์™„๋ฃŒ

- broadcast(mq)

    - ๋‹จ์ผ ์„œ๋ฒ„๋ผ๋ฉด API์— ๋†“๊ณ  ์ƒ์„ฑํ•ด๋„ ์ƒ๊ด€์ด ์—†์ง€๋งŒ ์ด์ค‘ํ™” ๋œ ์„œ๋ฒ„์ด๊ธฐ ๋•Œ๋ฌธ์— ์–ด๋””์„œ ์ƒ์„ฑ์„ ํ–ˆ๋˜ ์ด์Šˆ๊ฐ€ ์—†๋„๋ก MQ๋ฅผ ์ ์šฉ

    - ํŠน์ • ์ž‘์—… ์™„๋ฃŒ ์ดํ›„ broadcast ์š”์ฒญ -> ๋งคํ•‘ ์ด๋ฒคํŠธ ์ฐพ์•„ ์‘๋‹ต -> ์™„๋ฃŒ

 

์ฝ”๋“œ ์ƒ˜ํ”Œ

1. ๊ตฌ๋… API

//Controller

@GetMapping(value = {"/api/sse/subscribe/{orgId}/{menuName}"}, produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter subscribeSse(
        @PathVariable String orgId,
        @PathVariable String menuName) {
    return sseService.subscribe(orgId, menuName);
}
 
 
//Service

private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
private static final long SSE_TIMEOUT_LIMIT_MINUTE = 3;
private final RabbitTemplate rabbitTemplate;
 
public SseEmitter subscribe(String orgId, String menuName) {
    SseEmitter emitter = new SseEmitter(TimeUnit.MINUTES.toMillis(SSE_TIMEOUT_LIMIT_MINUTE));
    String eventId = getEventIdPrefix(cmpnId, menuName) + "_" + Utils.getUuid();
    emitters.put(eventId, emitter);
 
    emitter.onCompletion(() -> emitters.remove(eventId));
    emitter.onTimeout(emitter::complete);
    emitter.onError(error -> {
        log.error("[Sse Error] eventId : {}, cause: {}", eventId, error.getMessage());
        emitter.complete();
    });
 
    sendEventToClient(emitter, eventId, menuName, SseExecutionStatus.EVENT_CREATED.getValue());
    return emitter;
}
 
private String getEventIdPrefix(String orgId, String menuName) {
    StringJoiner eventIdBuilder = new StringJoiner("_");
    eventIdBuilder.add(orgId)
                  .add(menuName);
    return eventIdBuilder.toString();
}
 
private void sendEventToClient(SseEmitter emitter, String eventId, String menuName, String data) {
    try {
        emitter.send(
                SseEmitter.event()
                        .id(eventId)
                        .name(menuName)
                        .data(data));
    } catch (Exception e) {
        emitter.completeWithError(e);
    }
}

 

2. ์ด๋ฒคํŠธ Broadcasting API

// Service (mq ์‚ฌ์šฉํ•˜์—ฌ ๋”ฐ๋กœ API ์—†์ด consumer ๋ถ€๋ถ„)
// getEventIdPrefix, sendEventToClient๋Š” ์œ„์™€ ๋™์ผ 

public void broadcastToMappedEvent(SseBroadcastModel message) {
    String idPrefix = getEventIdPrefix(message.getCmpnId(), message.getMenuName());
 
    emitters.entrySet().stream()
            .filter(entry -> entry.getKey().startsWith(idPrefix))
            .forEach(entry -> {
                SseEmitter emitter = entry.getValue();
                String eventId = entry.getKey();
                sendEventToClient(emitter, eventId, message.getMenuName(), eventId + "_" + message.getSubId()+ "_" + message.getStatus());
            });
}

 

Client

๊ตฌ์„ฑ

- sse ์ƒ์„ฑ ํ•„์š” ๋ฉ”๋‰ด ์ง„์ž… ์‹œ event ์ƒ์„ฑ

- vue store์—์„œ ์ด๋ฒคํŠธ ์ƒ์„ฑ ๋ฐ ์ €์žฅ

- api์—์„œ broadcast๋ฅผ ํ†ตํ•ด response data๊ฐ€ ์—…๋ฐ์ดํŠธ ๋˜๋ฉด, ๊ฐ ํ™”๋ฉด์—์„œ ํ•„์š” ์ž‘์—… ์ˆ˜ํ–‰

 

์ฝ”๋“œ ์ƒ˜ํ”Œ

1. ํ™”๋ฉด

...
created: function() {
    ...
    this.initSse();
},
computed: {
    ...mapGetters({
        ....
        sseEventSource: 'sse/sseEventSource',
        sseResponseData: 'sse/sseResponseData',
        isSseEventSourceClosed: 'sse/isSseEventSourceClosed',
        lastEventId: 'sse/lastEventId'
    }),
},
watch: {
    ...
    sseResponseData(newVal) {
        if (newVal.indexOf('_complete') !== -1) {
            this.getDataList();
        }
    },
    isSseEventSourceClosed: function() {
       this.getDataList();
    }
}
methods: {
    ...,
    initSse() {
        let payload = {
            orgId : this.orgId,
            menuName : 'sampleMenu'
        }
        if (!this.sseEventSource || (this.orgId !== this.getOrgIdInLastEventId(this.lastEventId))) {
            this.$store.dispatch('sse/completeSseEventSource');
            this.$store.dispatch('sse/createSseEventSource', payload);
        }
   },
   getOrgIdInLastEventId(lastEventId) {
       return lastEventId.substring(0, lastEventId.indexOf('_'));
   }
}

 

2. Store

//actions.js
 
const createSseEventSource = (context, payload)  => {
    let url = `${UrlsConfig.govUrl}/api/sse/subscribe/${payload.orgId}/${payload.menuName}`;
    let eventSource = new EventSource(url);
 
    eventSource.addEventListener(payload.menuName,event => {
        context.commit('setLastEventId', event.lastEventId);
        context.commit('setSseResponseData', event.data+'_'+event.timeStamp);
    })
    eventSource.onerror = (event) => {
        if (event.target.readyState === EventSource.CLOSED) {
            context.commit('setIsSseEventSourceClosed', true);
            console.error('SSE CREATE ERROR : ' + '(' + event + ')');
        }
 
    }
    context.commit('setSseEventSource', eventSource);
}
 
const completeSseEventSource = (context) => {
    if (!context.state.sseEventSource) {
        return;
    }
 
    context.state.sseEventSource.close();
    context.commit('setSseEventSource', null);
 
    let url = `/api/sse/unsubscribe/${context.state.lastEventId}`;
    try {
        axios.delete(url);
    } catch (err) {
        console.error("SSE COMPLETE ERROR : ", err);
    } finally {
        context.commit('setLastEventId', '');
    }
}
 
 
//getters.js
 
const sseEventSource = (state) => state.sseEventSource;
const sseResponseData = (state) => state.sseResponseData;
const isSseEventSourceClosed = (state) => state.isSseEventSourceClosed;
const lastEventId = (state) => state.lastEventId;
 
 
//index.js
 
const state = {
    sseEventSource: null,
    lastEventId: '',
    sseResponseData: null,
    isSseEventSourceClosed: false
}
 
 
//mutations.js
 
const setSseEventSource = (state, sseEventSource) => {
    state.sseEventSource = sseEventSource;
};
 
const setSseResponseData = (state, sseResponseData) => {
    state.sseResponseData = sseResponseData;
};
 
const setIsSseEventSourceClosed = (state, isClosed) => {
    state.isSseEventSourceClosed = isClosed;
}
 
const setLastEventId = (state, lastEventId) => {
    state.lastEventId = lastEventId;
}

Batch

์—ญํ• 

- ํŠน์ • ์ž‘์—… ์™„๋ฃŒ ์‹œ ๊ฐ ํ™”๋ฉด์—์„œ ํ•„์š”ํ•œ ์ž‘์—…์„ ์ˆ˜ํ–‰ํ•˜๊ธฐ ์œ„ํ•ด Broadcast๋กœ ๋ฉ”์‹œ์ง€ ์ „๋‹ฌ(Trigger)

- ์‹ค์ œ Broadcast ์ž์ฒด๋Š” ๋ฐฐ์น˜๊ฐ€ ์•„๋‹Œ API์—์„œ ์ด๋ฃจ์–ด์ง (๋ฐฐ์น˜๋Š” ํŠธ๋ฆฌ๊ฑฐ๋งŒ + ์ด์ค‘ํ™” ์œ„ํ•ด MQ ์‚ฌ์šฉ)

public void sseBroadcast(String orgId, SseExecutionStatus status) {
    SseBroadcastModel sseBroadcastModel = SseBroadcastModel.builder()
            .cmpnId(cmpnId)
            .menuName(menuName)
            .status(status.getValue())
            .build();
 
    rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_SSE_BROADCAST, "", sseBroadcastModel);
}