๋ณธ๋ฌธ ๋ฐ”๋กœ๊ฐ€๊ธฐ
๊ฐœ๋ฐœ์ƒํ™œ/์ด๊ฒƒ์ €๊ฒƒ

SSE ์ ์šฉ (3) - SSE ์ ์šฉ ์‹œ ์ด์Šˆ ์ฒ˜๋ฆฌ

by cocococo331 2024. 8. 19.

1. Event Share ์ด์Šˆ ์ฒ˜๋ฆฌ (๋™์‹œ ์ ‘์† ๊ณ ๋ ค)

์ด์Šˆ
- ์—ฌ๋Ÿฌ๊ฐœ์˜ ํƒญ์„ ๋„์›Œ๋†จ์„ ๋•Œ Emitter Subscribe Response๋Š” ๋‹ค OK๊ณ  Event๊ฐ€ ์ œ๋Œ€๋กœ ์ƒ์„ฑ์—ˆ์ง€๋งŒ ๊ฐ€์žฅ ์ตœ๊ทผ์— ์ƒ์„ฑ๋œ ํƒญ๋งŒ event data ์—…ํ…Œ์ดํŠธ๊ฐ€ ๋จ
์›์ธ 
- API์—์„œ ๋™์ผ ์ด๋ฒคํŠธ๊ฐ€ ์•„๋‹Œ ๋‹ค์Œ ์ƒ์„ฑ๋œ ๋™์ผ ํ‚ค์˜ ์ด๋ฒคํŠธ๋กœ ๋ฎ์–ด ์“ฐ๊ธฐ๊ฐ€ ๋˜์–ด์ ธ๋ฒ„๋ฆผ
- = API์—์„œ concurrentHashMap์— emitter ๋‹ด๊ณ  ์žˆ๋Š”๋ฐ ๋™์ผ key๋กœ ์ƒˆ ๊ฒƒ์œผ๋กœ ์—Ž์–ด์ณ์ ธ์„œ
ํ•ด๊ฒฐ ๋ฐฉ๋ฒ•
- emitter๋Š” ๊ณต์œ  ์•ˆ๋จ
- ์ด๋ฒคํŠธ ๊ตฌ๋…์ด ํ•„์š”ํ•  ๋•Œ๋งˆ๋‹ค ์—ฌ๋Ÿฌ๊ฐœ emitter๋ฅผ uuid ์ถ”๊ฐ€ ๋ฐ idPrefix์ ์šฉํ•ด์„œ Broadcast ์‹œ idPrefix ํ•˜์œ„ ๊ฒƒ๋“ค ๋ชจ๋‘ ์—…๋ฐ์ดํŠธ ๋˜๋„๋ก ๋ณ€๊ฒฝ
 

2. ์ธ์ฆ

์ด์Šˆ
- event subscribe์‹œ auth๋ฅผ ์ถ”๊ฐ€ํ•  ์ˆ˜ ์—†์Œ
์›์ธ 
- ์ธ์ฆ์„ ์œ„ํ•ด ํ—ค๋”์— ํ† ํฐ์„ ์ „๋‹ฌํ•ด์•ผํ•˜๋Š”๋ฐ ๊ธฐ๋ณธ EventSource์—๋Š” header ์ถ”๊ฐ€๊ฐ€ ์•ˆ๋˜์–ด๋ฒ„๋ฆผ
ํ•ด๊ฒฐ ๋ฐฉ๋ฒ•
a. EventSourcePolyfill ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ ์‚ฌ์šฉํ•ด์„œ ํ—ค๋”์— Authorization ์ถ”๊ฐ€
  - ์˜ˆ์ œ ์ฝ”๋“œ 

let tokenKey = Profile.env==='PRODUCTION' ? VueCookies.get(GlobalConstants.ACCESS_TOKEN) : VueCookies.get(GlobalConstants.ACCESS_TOKEN_DEV);
const EventSource = EventSourcePolyfill || NativeEventSource;
this.eventSource = new EventSource(
    `${UrlsConfig.menu1Url}/api/sse/subscribe/${this.currentCmpnId}/gridList`,
    {
        headers: {
                    Authorization: `Bearer ${tokenKey}`,
                  },
        heartbeatTimeout: 12 * 60 * 1000,
    }
);

  - ์ด์Šˆ
    i) ๊ฒ€์‚ฌ ์™„๋ฃŒ์‹œ ๋ธŒ๋กœ๋“œ์บ์ŠคํŠธ๋Š” ์ •์ƒ ๋™์ž‘(401 ์•ˆ๋‚จ) ์ด์Šˆ๊ฐ€ ๊ฐœ๋ฐœ์ž ๋„๊ตฌ์—์„œ EventStream ์—…๋ฐ์ดํŠธ ๋˜๋Š” ๋ฐ์ดํ„ฐ๊ฐ€ ์•ˆ๋ณด์ž„(pending์€ ์•„๋‹Œ๋ฐ ์•ˆ๋œธ )
    ii) https://github.com/Yaffle/EventSource/issues/79
  - -> ๋กค๋ฐฑ
b. ๊ตฌ๋… ์‹œ ์ธ์ฆ ์ œ์™ธ
- ์–ด๋–ค ๊ณ ๊ฐ์ด ๋‹ค๋ฅธ ํšŒ์‚ฌ์˜ ๋‹ค๋ฅธ ๊ณ ๊ฐ์˜ ์•„์ด๋”” ๋ชจ๋‘๋ฅผ ์•Œ์•„์„œ ์ด๋ฒคํŠธ๋ฅผ ํ›”์ณ๋ณธ๋‹ค๊ณ  ํ–ˆ์„๋•Œ ์œ ์ถœ์ด ๋  ๋งŒํ•œ ์ •๋ณด
  - ์ด๋ฒคํŠธ ์ŠคํŠธ๋ฆผ ๋ฌด๋‹จ ๊ตฌ๋…์„ ํ†ตํ•ด์„œ ๋ญ”์ง€ ๋ชจ๋ฅผ ๊ฒ€์‚ฌ๊ฐ€ ๋๋‚ฌ๋‹ค๋Š” ์‚ฌ์‹ค
  - companyID, dataID
- ํฌ๊ฒŒ ์˜๋ฏธ์žˆ๋Š” ๋ฐ์ดํ„ฐ X โ†’ ๊ฒฐ๋ก  : Subscribe์‹œ์—๋Š” ์ธ์ฆ ์ œ์™ธํ•˜๋„๋ก ๊ตฌํ˜„
 

3. ํ”„๋ก์‹œ ํ†ตํ•œ API Call๋กœ์˜ ๋ณ€๊ฒฝ

API GW ์„ค์ •์ด ์•ˆ๋˜์–ด์žˆ๊ธฐ ๋•Œ๋ฌธ์— ํ”„๋กœ์ ํŠธ ํ”„๋ก์‹œ ํ†ตํ•ด์„œ ClientUrl โ†’ BackendUrl๋กœ ๋ณ€๊ฒฝํ•ด์„œ API ์ฝœ ํ•ด์•ผํ•จ
์ด์Šˆ
- ํ”„๋ก์‹œ ํ†ตํ•ด์„œ clientURL๋กœ subscribe ์ง„ํ–‰์‹œ header connection reponse close error โ†’ pending
ํ•ด๊ฒฐ ๋ฐฉ๋ฒ•
- webpack โ†’ compress: false ์˜ต์…˜ ๋ณ€๊ฒฝ
 

4. http 1.1 ๋ธŒ๋ผ์šฐ์ € ์ œํ•œ ์ด์Šˆ

์ด์Šˆ
- http1.1 ์‚ฌ์šฉ ์‹œ ๋ธŒ๋ผ์šฐ์ €๋‹น 6๊ฐœ eventSource๋ฅผ ์ƒ์„ฑ ๊ฐ€๋Šฅ
  - ์ด๋ฏธ emitter์— ๋“ค์–ด์žˆ๋Š” ์ด๋ฒคํŠธ ๊ฐ์ฒด๊ฐ€ 6๊ฐœ๊ฐ€ ์ดˆ๊ณผํ•œ๋‹ค๊ณ  ํ•˜๋”๋ผ๋„ ๋ธŒ๋ผ์šฐ์ €๊ฐ€ 6๊ฐœ ๋ฏธ๋งŒ์ด๋ฉด ์ƒ๊ด€์—†์Œ
ํ•ด๊ฒฐ ๋ฐฉ๋ฒ•
- ์›น ์„œ๋ฒ„(์˜ˆ: Apache, Nginx)์—์„œ๋Š” HTTP/2.0๋ฅผ ์ง€์›, ์ด๋ฅผ ์ด์šฉํ•˜์—ฌ ํŠน์ • ๊ฒฝ๋กœ ๋˜๋Š” URL์— ๋Œ€ํ•ด์„œ๋งŒ HTTP/2.0์„ ํ™œ์„ฑํ™”ํ•˜๋Š” ๋ฐฉ๋ฒ•
- id์— miliseconds๋ฅผ ์ถ”๊ฐ€ํ•˜์—ฌ maximum์ด ๋˜๋Š” ๊ฒฝ์šฐ ์ตœ์ดˆ ๊ฒƒ๋ถ€ํ„ฐ ์‚ญ์ œํ•˜๋„๋ก ๋ณ€๊ฒฝ
  - user๊ฐ€ ์ถ”๊ฐ€๋˜์–ด์•ผํ•ด์„œ URL์ด ๊ธธ์–ด์ง
- โ†’ ๋ณต์žกํ•˜๊ธฐ๋„ ํ•˜๊ณ  ๋‹ค๋ฅธ ์œ ์ €๊ฐ€ ์•„๋‹Œ ํ•œ ๊ฐœ์ธ์ด ์—ฌ๋Ÿฌ ๋ธŒ๋ผ์šฐ์ €๋ฅผ ๋„์šฐ๋Š”๊ฑฐ๋ผ ํฐ ์ด์Šˆ๋กœ ๋ณด์—ฌ์ง€์ง„์•Š์•„์„œ ์šฐ์„ ์€ Timeout์„ ์ ์ ˆํ•˜๊ฒŒ ์ค˜์„œ ๋น ๋ฅธ ํ•ด์ง€ ๋น ๋ฅธ reconnect(์ƒˆ ํƒญ์—์„œ ๋จผ์ € ๊ฐ€์ ธ๊ฐˆ ์ˆ˜ ์žˆ๋„๋ก)
 

5. SEEmitter ๊ณตํ†ตํ™”

์ด์Šˆ
- Event list, detail ๋‚˜๋ˆ ๋†“์Œ โ†’ ๋ถ„๊ธฐ๊ฐ€ ๋งŽ์ด ์ƒ๊น€
- detail ๊ฒ€์‚ฌ ์ง„ํ–‰ ์‹œ ๊ฒ€์‚ฌ์‹œ๋งˆ๋‹ค ์ด๋ฒคํŠธ๊ฐ€ ์ƒˆ๋กœ ์ƒ์„ฑ๋จ
ํ•ด๊ฒฐ ๋ฐฉ๋ฒ•
- vue store ์‚ฌ์šฉํ•˜์—ฌ ํ•˜๋‚˜์˜ ์œ ์ €๊ฐ€ Menu1 ๋ฉ”๋‰ด์—์„œ emitter๋Š” ํ•˜๋‚˜๋กœ ๊ณ„์† ์‚ฌ์šฉํ•˜๋„๋ก ์ฒ˜๋ฆฌ
 

6. net::ERR_HTTP2_PROTOCOL_ERROR 200 (OK) ๋ฐ ์—ฌ๋Ÿฌ Nginx ์ด์Šˆ

์ด์Šˆ
- subscribe ์‹œ pending ์ด์Šˆ 
- nginx ์‚ฌ์šฉ ๊ฐœ๋ฐœ ์ ์šฉ ์‹œ 1๋ถ„ ์ฃผ๊ธฐ๋กœ client์—์„œ sse ์ ‘์† ์žฌ์‹œ๋„ ๋ฐ Error ๋ฐœ์ƒ ์ด์Šˆ
ํ•ด๊ฒฐ ๋ฐฉ๋ฒ•
- ๋ณ„๋„ ๋ฆฌํŠธ๋ผ์ด ์—†์–ด๋„ client์ชฝ์—์„œ๋Š” ์—ฐ๊ฒฐ์ด ๋Š์–ด์ง€๋ฉด sse ์ž๋™์œผ๋กœ ํ˜ธ์ถœํ•˜๋Š” ๊ฒƒ์œผ๋กœ ๋ณด์ž„ (=api๋กœ์ง์˜ retry ์ œ๊ฑฐ ๋ฐ client proxy_read_timeout 360; ์ถ”๊ฐ€ ์ œ๊ณต)
- api ์—์„œ ํƒ€์ž„์•„์›ƒ์€ client๋ณด๋‹ค ๋จผ์ € ๋ฐœ์ƒ์‹œ์ผœ์„œ client๊ฐ€ ์ž์—ฐ์Šค๋Ÿฝ๊ฒŒ ์žฌ์‹œ๋„ํ•˜๋„๋ก ์„ค์ • (api timeout 3m)
- SSE๊ด€๋ จ Nginx ์„ค์ •๋“ค

location /api/sse/ {
    proxy_set_header Connection '';
    proxy_http_version 1.1;
    proxy_pass http://${api}:{port}$request_uri;
    proxy_read_timeout 360;
    charset utf-8;
}

 

7. ๋ฉ”๋ชจ๋ฆฌ ์ด์Šˆ ๋ฐฉ์ง€ (client, server lifecycle ๊ณ ๋ ค)

์ด์Šˆ
- ์ด๋ฒคํŠธ ๋ฐœ์ƒ ํŽ˜์ด์ง€๋ฅผ ๋ฒ—์–ด๋‚˜ ๋‹ค๋ฅธ ๋ฉ”๋‰ด๋กœ ๊ฐ€๋Š” ๊ฒฝ์šฐ ์ด๋ฒคํŠธ๊ฐ€ ์œ ์ง€๋˜์–ด ์žฅ๊ธฐ์ ์œผ๋กœ ๋ณด๋ฉด ๋ฉ”๋ชจ๋ฆฌ ์ด์Šˆ ๋ฐœ์ƒ ๊ฐ€๋Šฅ์„ฑ์ด ์žˆ์Œ
- client life cycle๊ณผ server lifecycle์„ ๋งž์ถ”์ง€ ์•Š์œผ๋ฉด ์–ด๋””์„ ๊ฐ€ ์ด๋ฒคํŠธ ํ•ด์ง€๋˜์ง€ ์•Š์•„ ๋ˆ„์ˆ˜ ๋ฐœ์ƒ ๊ฐ€๋Šฅ์„ฑ
ํ•ด๊ฒฐ ๋ฐฉ๋ฒ•
- ๊ธฐ๋ณธ์ ์œผ๋กœ client์™€ server ๋‘ ์‚ฌ์ดํด์„ ๋งž์ถ”๊ธด ์–ด๋ ค์›€ โ†’ ์ ์ ˆํ•œ timeout ์ฃผ๊ธฐ + ๋ฉ”๋‰ด ๋ฒ—์–ด๋‚˜๋Š” ๊ฒฝ์šฐ client, server์˜ ์ด๋ฒคํŠธ ๋ช…์‹œ์  ํ•ด์ง€
- API onComplete ๋ถ€๋ถ„ ์ˆ˜์ •
  a. ์ด์ค‘ํ™” ์„œ๋ฒ„๋ณ„ remove๋กœ ์ž‘์—… ์ง„ํ–‰ โ†’ remove๊ฐ€ ์•„๋‹Œ complete๋กœ ๋ณ€๊ฒฝ
  b. ์„œ๋ฒ„์ชฝ์—์„œ sse ์ด๋ฒคํŠธ ์ฒ˜๋ฆฌ๊ฐ€ ์™„๋ฃŒ๋˜๋ฉด sse emitters์—์„œ ํ•ด๋‹น emitter ์‚ญ์ œํ•˜๋Š”๊ฒŒ ์•„๋‹ˆ๊ณ  complete ์ณ์„œ ํ”„๋กœ์„ธ์Šค๋ฅผ ํƒœ์›Œ์•ผ ์ •์ƒ์ ์œผ๋กœ ์ข…๋ฃŒ
  c. ์ง์ ‘ ์‚ญ์ œ๋Š” onComplete ๋‚ด๋ถ€์—์„œ๋งŒ ํ•˜๋ฉด ๋จ
- sse client ์ด๋ฒคํŠธ ์ดˆ๊ธฐํ™” ๋ฐ unsubscribe ๊ตฌํ˜„ (mq)
  - Flow
    - ์ด๋ฒคํŠธ ์ƒ์„ฑ ์™ธ ํŽ˜์ด์ง€์—์„œ๋Š” ํŽ˜์ด์ง€ ์ง„์ž… ์‹œ store์— ๋‹ด๊ธด event ์ดˆ๊ธฐํ™” / ์ด๋ฒคํŠธ ์ƒ์„ฑ ํŽ˜์ด์ง€๋Š” event ์ดˆ๊ธฐํ™” ๋ฐ ์‹ ๊ทœ ์ƒ์„ฑ
    - client์—์„œ event close๋ฐ store ์ดˆ๊ธฐํ™” ์ง„ํ–‰ํ•˜๋ฉด์„œ api์— ํ•ด์ง€ ์š”์ฒญ์„ ๋‚ ๋ฆผ 
    - unsubscribe api์—์„œ complete ์ˆ˜ํ–‰
  - Client
    - ์˜ˆ์ œ ์ฝ”๋“œ 

//list.vue

...
initSse() {
    let payload = {
        cmpnId : this.currentCmpnId,
        menuName : 'menu1'
    }
    if (!this.sseEventSource || (this.currentCmpnId !== this.getCmpnIdInLastEventId(this.lastEventId))) {
        this.$store.dispatch('sse/completeSseEventSource'); //ํ•ด์ง€
        this.$store.dispatch('sse/createSseEventSource', payload); //์ƒ์„ฑ
    }
},
 
...
 
 
//actions.vue
import axios from 'axios';
...
const completeSseEventSource = (context) => {
    if (!context.state.sseEventSource) {
        return;
    }
 
    context.state.sseEventSource.close();
    context.commit('setSseEventSource', null);
 
    let url = `/api/sse/unsubscribe/${context.state.lastEventId}`;
    try {
        axios.delete(url);
    } catch (err) {
        console.error("SSE COMPLETE ERROR : ", err);
    } finally {
        context.commit('setLastEventId', '');
    }
}
...

  - API
    - ์˜ˆ์ œ ์ฝ”๋“œ 

//Controller
@DeleteMapping(value = "/api/sse/unsubscribe/{eventId}")
    public ResultModel removeDestroyedEmitter (@PathVariable("eventId") String eventId) {
        return sseService.sseUnsubscribe(eventId);
}
 
 
//Service
//์ด์ค‘ํ™”๋กœ ์–ด๋Š ์„œ๋ฒ„์— ํ•ด๋‹น ์ด๋ฒคํŠธ๊ฐ€ ์žˆ์„์ง€ ๋ชจ๋ฅด๋‹ˆ MQ๋กœ ์ฒ˜๋ฆฌ
public ResultModel sseUnsubscribe(String eventId) {
    ResultModel result = new ResultModel();
    try {
            rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_SSE_UNSUBSCRIBE_BROADCAST, "", eventId);
        } catch (Exception e) {
            result.setError("500", e.getMessage());
            result.setStatus("fail");
        }
        return result;
 
}
 
 
//RabbitMqConfig
public static final String EXCHANGE_SSE_UNSUBSCRIBE_BROADCAST = "fanout.sse.unsubscribe.broadcast";
public static final String QUEUE_SSE_UNSUBSCRIBE_BROADCAST_PROCESS = "sse.unsubscribe.broadcast.process";
 
@Bean
public FanoutExchange sseBroadcastUnsubscribeFanoutExchange() {
    return new FanoutExchange(EXCHANGE_SSE_UNSUBSCRIBE_BROADCAST);
}
 
 
@Bean
public Queue getSseUnsubscribeQueue() {
    return new Queue(QUEUE_SSE_UNSUBSCRIBE_BROADCAST_PROCESS);
}
 
@Bean
public Binding bindingSseUnsubscribeExchangeAndQueue(Queue getSseUnsubscribeQueue, FanoutExchange sseBroadcastUnsubscribeFanoutExchange) {
    return BindingBuilder.bind(getSseUnsubscribeQueue).to(sseBroadcastUnsubscribeFanoutExchange);
}
 
 
//Consumer
@RabbitListener(queues = RabbitMqConfig.QUEUE_SSE_UNSUBSCRIBE_BROADCAST_PROCESS, concurrency = "${consumer.sse-unsubscribe-broadcast}")
public void sseUnsubscribeReceiveMessage(String eventId) {
    sseService.broadcastToUnsubscribe(eventId);
}
 
 
//Service
public void broadcastToUnsubscribe(String eventId) {
    try {
        SseEmitter emitter = emitters.get(eventId); //subscribe์‹œ map์— ๋„ฃ์–ด๋†จ๋˜ emitter
        if (!ObjectUtils.isEmpty(emitter)) {
            emitter.complete();
        }
 
    } catch (Exception e) {
        log.error("[Sse unsubscribe Error] cause : {}, eventId : {}", e.getMessage(), eventId);
    }
}

- ๋‹ค๋ฅธ ์„œ๋น„์Šค๋กœ ๊ฐˆ๋•Œ์—๋Š” ํƒ€์ž„์•„์›ƒ ์ดํ›„ ์ž๋™์œผ๋กœ ์ด๋ฒคํŠธ ํ•ด์ง€ ๋จ (์žฌ์—ฐ๊ฒฐ์•ˆํ•จ)