ํ์์ฑ
- ์ ๊ท ๊ตฌ์ถ ์คํ ์ด์ง ๋๋ถ๋ถ API 2๋ -> ์ค์ผ์ผ๋ง ์ด์ ๊ณ ๋ฏผ์ด ํ์
- ํ์ฌ ๊ฐ๊ฐ API์ Subscribe, broadcast๊ฐ ์์ผ๋ฉด ์ด๋ฒคํธ ๋ฐ์ํ ์ชฝ์ ์๋ต ๋๋ฝ ๊ฐ๋ฅ์ฑ์ด ์์.
๊ตฌํ ๋ฐฉ๋ฒ
- MQ๋ฅผ ์ด์ฉ
- ๊ธฐ์กด์ ์ฌ์ฉ์ค์ธ ๋ฆฌ์์ค
- Redis๋ Kafka๋ฅผ ๋ง์ด ์ฌ์ฉํ๋, ํ์ฌ ์ํฉ์์ SSE๋๋ฌธ์ ์ธํ๋ผ ์ถ๊ฐ๋ ๋ถ๊ฐํ ์ํฉ
- ๋ค์ค ๋ก๊ทธ์ธ ์ด๋ ๊ฒ ์ฌํ๊น์ง ํ์ํ์ง ์์ (๋จ์ผ ๊ธฐ๊ธฐ ๋ก๊ทธ์ธ ํ๊ฒฝ)
- ๊ตฌํ์ ์ด๊ฒ์ ๊ฒ ๋ณด๋ MQ๋ก ์ถฉ๋ถํ๊ฒ ๋์ ๊ฐ๋ฅํ ๊ฒ ๊ฐ์
- Subscribe๋ ์ด๋ ์๋ฒ์์ ๋ฐ์ํด๋ ์๊ด์ด ์์, ๋ฐ๋ผ์ API๋ก ์งํ, Broadcast๋ MQ๋ก ์ ์ฉ
- ์ด๋ฒคํธ ๋ฐ์ ์ ์ด๋ฒคํธ ID Prefix๋ฅผ ํ์ธํ์ฌ ๊ตฌ๋ ์ค์ธ ๋ชจ๋ ์๋ฒ์ ๋งคํ๋๋ ๋ชจ๋ ์ด๋ฒคํธ๋ฅผ ๋์์ Broadcastํ๋๋ก ๊ตฌํ
- Fanout ๋ฐฉ์์ผ๋ก ๊ตฌํ ์งํ
- ํด๋น Exchange๋ฅผ ๊ตฌ๋ ํ๊ณ ์๋ ๋ชจ๋ ํ์ broadcast๋ฅผ ๋ ๋ ค prefix ํ์๋ฅผ ๋ชจ๋ broadcastํ๋ ๋ฐฉ์
๊ตฌํ
Batch
- ์ ๋ฌด Flow์ subscrie๊ฐ ๋์ด์๋ค๋ ๊ฐ์ ํ์, ๊ท์ ์ค์๊ฒ์ฌ Batch Logic ๋์ ์ค์ broadcast ์ํ
- broadcast ๋ฉ์๋ ํธ์ถ ์ EXCHANGE_SSE_BROADCAST์ผ๋ก ๋ฉ์์ง ๋ฐํ
- broadcast๋ API์ ๊ตฌํํด ๋์๊ธฐ ๋๋ฌธ์ Batch์์๋ Exchange ์ ์ธ๋ง ํ๋ฉด ๋จ
- ์์ ์ฝ๋
//Menu1SseBroadcastStrategy
@Override
public void sseBroadcast(String cmpnId, String subId, SseExecutionStatus status) {
SseBroadcastModel sseBroadcastModel = SseBroadcastModel.builder()
.cmpnId(cmpnId)
.menuName(getSseBroadcastMenu().getMenuName())
.subId(subId)
.status(status.getValue())
.build();
rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_SSE_BROADCAST, "", sseBroadcastModel);
}
API
- ์ค์ broadcast ๋ฉ์์ง๋ฅผ ๋ฐ์ ์ฒ๋ฆฌํ๋ ๊ณณ
- ์์ ์ฝ๋
//SSEConsumer
@RabbitListener(queues = RabbitMqConfig.QUEUE_SSE_BROADCAST_PROCESS, concurrency = "${consumer.sse-broadcast}")
public void sseReceiveMessage(SseBroadcastModel message) {
sseService.broadcastToMappedEvent(message);
}
// SSEService
public void broadcastToMappedEvent(SseBroadcastModel message) {
String idPrefix = getEventIdPrefix(message.getCmpnId(), message.getMenuName());
emitters.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(idPrefix))
.forEach(entry -> {
SseEmitter emitter = entry.getValue();
String eventId = entry.getKey();
sendEventToClient(emitter, eventId, message.getMenuName(), eventId + "_" + message.getSubId()+ "_" + message.getStatus());
});
}
private void sendEventToClient(SseEmitter emitter, String eventId, String menuName, String data) {
try {
emitter.send(
SseEmitter.event()
.id(eventId)
.name(menuName)
.data(data));
} catch (Exception e) {
emitter.completeWithError(e);
}
}
private String getEventIdPrefix(String cmpnId, String menuName) {
StringJoiner eventIdBuilder = new StringJoiner("_");
eventIdBuilder.add(cmpnId)
.add(menuName);
return eventIdBuilder.toString();
}
// RabbitMQConfig
...
public static final String EXCHANGE_SSE_BROADCAST = "fanout.sse.broadcast";
public static final String QUEUE_SSE_BROADCAST_PROCESS = "sse.broadcast.process";
...
@Bean
public FanoutExchange sseBroadcastFanoutExchange() {
return new FanoutExchange(EXCHANGE_SSE_BROADCAST);
}
@Bean
public Queue getSseBroadcastQueue() {
return new Queue(QUEUE_SSE_BROADCAST_PROCESS);
}
@Bean
public Binding bindingSseExchangeAndQueue(Queue getSseBroadcastQueue, FanoutExchange sseBroadcastFanoutExchange) {
return BindingBuilder.bind(getSseBroadcastQueue).to(sseBroadcastFanoutExchange);
}
...
Client
- API์์ broadcast๊ฐ ์ผ์ด๋๋ฉด watch์ sseEventData response ๋ณ๊ฒฝ์ด ๊ฐ์ง๋๊ณ ๊ทธ๋ ์กฐ๊ฑด์ ๋ฐ๋ผ ์ ๋ฐ์ดํธ ์งํ
- ์์ ์ฝ๋
// menu1.vue
...
sseResponseData(newVal) {
if (newVal.indexOf('_complete') !== -1) {
this.getGridList();
}
},
...
//menu1Detail.vue
...
sseResponseData(newVal) {
if (newVal.indexOf('_complete') !== -1 && newVal.includes(this.menu1DataId)) {
this.fetchDetail();
}
},
...
'๊ฐ๋ฐ์ํ > ์ด๊ฒ์ ๊ฒ' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
์ด๋ฏธ ์์ฑ๋์ด ์๋ ๋์ปค ์ปจํ ์ด๋(mysql)์ ์ database ์ถ๊ฐ (0) | 2024.10.26 |
---|---|
SSE ์ ์ฉ (3) - SSE ์ ์ฉ ์ ์ด์ ์ฒ๋ฆฌ (0) | 2024.08.19 |
H2 ๋ฐ์ดํฐ ๋ฒ ์ด์ค ์ค์น ๋ฐ ์ด๊ธฐ ์ค์ ๋ฐฉ๋ฒ (0) | 2024.04.15 |
SSE ์ ์ฉ (1) - SSE๋ฅผ ์ ์ฉํด๋ณด์ ๊ฐ๋ (0) | 2024.04.12 |
ํ๋ก์ ํธ Tree ๊ตฌ์กฐ ์ถ๊ฐ (0) | 2023.10.15 |