티스토리 뷰
Overview
지난 포스팅에 이어서 outbox pattern중 Polling publisher pattern을 구현해 보겠습니다.
사실 그림처럼 구현난이도는 어렵지 않습니다.
다음 순서대로 구현 해 보았습니다.
- 주문이 들어올 때 한 트랜잭션 안에서 order 테이블, outbox 테이블을 insert한다.
- outbox 테이블을 polling한다.
- outbox 테이블의 payload를 aws sqs로 전송한다.
order 저장
@Service
class OrderService(
private val orderRepository: OrderRepository,
private val outBoxRepository: OutboxRepository,
) : Log {
@Transactional
fun order(orderReqDto: OrderReqDto): OrderResDto {
val order = OrderEntity(
sku = orderReqDto.sku,
quantity = orderReqDto.quantity,
buyer = orderReqDto.buyer,
).let {
orderRepository.save(it)
}
OutboxEntity(
topic = "orderQueue",
payload = ObjectMapper().writeValueAsString(order),
).let {
outBoxRepository.save(it)
}
return order.toS()
}
fun OrderEntity.toS() = OrderResDto(
id = this.id,
sku = this.sku,
quantity = this.quantity,
buyer = this.buyer,
)
}
order table과 outbox table을 한 트랜잭션에서 저장하도록 하였습니다.
Outbox table polling
@Component
class OutboxService(
private val sqsService: SqsService,
private val outboxRepository: OutboxRepository
) : Log {
@PersistenceUnit
private lateinit var entityManagerFactory: EntityManagerFactory
@Bean
fun myIntegrationFlow(): IntegrationFlow {
return IntegrationFlows.from(
MessageSource { outboxRepository.findAll().let { if (it.isEmpty()) return@let null else GenericMessage<List<OutboxEntity>>(it) } }
) { o ->
o.poller(
PollerMetadata()
.apply { trigger = PeriodicTrigger(1, TimeUnit.SECONDS) }
)
}.handle { message ->
logger.info("message: $message")
(message.payload as List<*>).forEach {
it as OutboxEntity
val entityManager = entityManagerFactory.createEntityManager()
entityManager.transaction.begin()
sqsService.sendSqsMessage(it.topic, it.payload)
outboxRepository.delete(it)
entityManager.transaction.commit()
}
}.get()
}
}
Outbox table polling은 Spring integration을 이용해서 Polling하도록 구현해 보았고, 1초마다 Polling하도록 구현 해 보았습니다.
SQS send
@Component
class SqsService(
private val sqsClient: SqsClient
): Log {
fun sendSqsMessage(queueName: String, message: Any) {
val getQueueRequest = GetQueueUrlRequest.builder()
.queueName(queueName)
.build()
val queueUrl: String = sqsClient.getQueueUrl(getQueueRequest).queueUrl()
val sendMsgRequest = SendMessageRequest.builder()
.queueUrl(queueUrl)
.messageBody(ObjectMapper().writeValueAsString(message))
.delaySeconds(1)
.build()
sqsClient.sendMessage(sendMsgRequest)
}
fun getSqsMessage(queueName: String) : List<Message> {
val getQueueRequest = GetQueueUrlRequest.builder()
.queueName(queueName)
.build()
val queueUrl: String = sqsClient.getQueueUrl(getQueueRequest).queueUrl()
val receiveMessageRequest = ReceiveMessageRequest.builder()
.queueUrl(queueUrl)
.maxNumberOfMessages(5)
.build()
return sqsClient.receiveMessage(receiveMessageRequest).messages()
}
}
SQS로 전송하는 부분은 중요하지 않지만 이렇게 구현해 보았습니다.
모든 코드는 github에 있습니다.
https://github.com/hoon7566/blog-code/tree/main/spring-boot-outbox
결론
MSA에서는 서로 서비스간 결합을 느슨하게 하기 위해서 타 시스템에 대한 의존과 영향을 줄이고 시스템에 목적에 맞게 집중하기 위해 Event Driven 구조를 많이들 채택합니다. 그래서 각 서비스에서 Event를 발생시킬때 안전한 구조에 대해서 공부해 보다가 outobx에 대해서 공부 해보았는데, 데이터 일관성이 맞는 서비스를 개발하기 위해서 좋은 공부였던것 같습니다.
또한, outbox pattern을 이용하여 메시지 전송 뿐만이 아니라 API전송이 필요 할 때도 적용하여 비즈니스 요건에 따른 API전송에 필요한 payload를 outbox table에 넣고 polling 하여 전송하도록 구현하면 API전송을 최소 한번 전송이 가능한 서비스를 구현이 가능해보입니다.
'Spring' 카테고리의 다른 글
관습적인 추상화 service와 serviceImpl (2) | 2023.09.05 |
---|---|
outbox patten 구현해보기- (1) (0) | 2023.07.16 |
[Spring] Jcache에 cache eventListener 추가하기 (0) | 2023.07.12 |
[Spring] Cache적용하여 속도 개선하기 (0) | 2023.07.10 |
[Spring boot] resourse 실시간 반영하기. (1) | 2021.05.28 |
- Total
- Today
- Yesterday
- 삼성기출
- 제네릭 타입
- 반례
- 제네릭(Generic)
- 문자열 압축
- 프로그래머스
- 삼성 코테
- 날짜 유효성
- 오버로딩
- 청소년상어
- 삼각달팽이
- DP
- vaild
- 카카오 인턴십
- RGB거리
- local cache
- javascript
- 1629
- for of
- yyyy-MM-dd
- 19236
- spring cache
- 키패드 누르기
- 카카오 코딩 테스트
- java
- 01타일
- 가장 큰 수
- 백준
- 39회차
- 커링
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | ||||||
2 | 3 | 4 | 5 | 6 | 7 | 8 |
9 | 10 | 11 | 12 | 13 | 14 | 15 |
16 | 17 | 18 | 19 | 20 | 21 | 22 |
23 | 24 | 25 | 26 | 27 | 28 |