티스토리 뷰

Spring

outbox patten 구현해보기- (2)

hoony__93 2023. 7. 17. 15:49
728x90
반응형

Overview

 

outbox patten 구현해보기- (1)

이벤트기반 환경에서 DB상태변경과 함께 이벤트를 발생시켜야하는 경우가 있습니다. 예를들어 쇼핑몰에서 주문, 상품, 알림 서비스로 나누어져 있다고 생각했을 때, 주문이 완료되면 상품 재고

hoony-devblog.tistory.com

지난 포스팅에 이어서 outbox pattern중 Polling publisher pattern을 구현해 보겠습니다.

사실 그림처럼 구현난이도는 어렵지 않습니다.

다음 순서대로 구현 해 보았습니다.

  1. 주문이 들어올 때 한 트랜잭션 안에서 order 테이블, outbox 테이블을 insert한다.
  2. outbox 테이블을 polling한다.
  3. outbox 테이블의 payload를 aws sqs로 전송한다.

order 저장

@Service
class OrderService(
  private val orderRepository: OrderRepository,
  private val outBoxRepository: OutboxRepository,
) : Log {

  @Transactional
  fun order(orderReqDto: OrderReqDto): OrderResDto {
    val order = OrderEntity(
      sku = orderReqDto.sku,
      quantity = orderReqDto.quantity,
      buyer = orderReqDto.buyer,
    ).let {
      orderRepository.save(it)
    }

    OutboxEntity(
      topic = "orderQueue",
      payload = ObjectMapper().writeValueAsString(order),
    ).let {
      outBoxRepository.save(it)
    }
    return order.toS()
  }

  fun OrderEntity.toS() = OrderResDto(
    id = this.id,
    sku = this.sku,
    quantity = this.quantity,
    buyer = this.buyer,
  )
  
}

order table과 outbox table을 한 트랜잭션에서 저장하도록 하였습니다.

Outbox table polling

@Component
class OutboxService(
  private val sqsService: SqsService,
  private val outboxRepository: OutboxRepository

) : Log {

  @PersistenceUnit
  private lateinit var entityManagerFactory: EntityManagerFactory

  @Bean
  fun myIntegrationFlow(): IntegrationFlow {
    return IntegrationFlows.from(
      MessageSource { outboxRepository.findAll().let { if (it.isEmpty()) return@let null else GenericMessage<List<OutboxEntity>>(it) } }
    ) { o ->
      o.poller(
        PollerMetadata()
          .apply { trigger = PeriodicTrigger(1, TimeUnit.SECONDS) }
      )
    }.handle { message ->
      logger.info("message: $message")

      (message.payload as List<*>).forEach {
        it as OutboxEntity
        val entityManager = entityManagerFactory.createEntityManager()
        entityManager.transaction.begin()
        sqsService.sendSqsMessage(it.topic, it.payload)
        outboxRepository.delete(it)
        entityManager.transaction.commit()
      }
    }.get()
  }
}

Outbox table polling은 Spring integration을 이용해서 Polling하도록 구현해 보았고, 1초마다 Polling하도록 구현 해 보았습니다.

SQS send

@Component
class SqsService(
  private val sqsClient: SqsClient
): Log {

  fun sendSqsMessage(queueName: String, message: Any) {

    val getQueueRequest = GetQueueUrlRequest.builder()
      .queueName(queueName)
      .build()

    val queueUrl: String = sqsClient.getQueueUrl(getQueueRequest).queueUrl()

    val sendMsgRequest = SendMessageRequest.builder()
      .queueUrl(queueUrl)
      .messageBody(ObjectMapper().writeValueAsString(message))
      .delaySeconds(1)
      .build()

    sqsClient.sendMessage(sendMsgRequest)
  }

  fun getSqsMessage(queueName: String) : List<Message> {
    val getQueueRequest = GetQueueUrlRequest.builder()
      .queueName(queueName)
      .build()

    val queueUrl: String = sqsClient.getQueueUrl(getQueueRequest).queueUrl()

    val receiveMessageRequest = ReceiveMessageRequest.builder()
      .queueUrl(queueUrl)
      .maxNumberOfMessages(5)
      .build()

    return sqsClient.receiveMessage(receiveMessageRequest).messages()

  }
}

SQS로 전송하는 부분은 중요하지 않지만 이렇게 구현해 보았습니다.

 

모든 코드는 github에 있습니다.

https://github.com/hoon7566/blog-code/tree/main/spring-boot-outbox

 

결론

MSA에서는 서로 서비스간 결합을 느슨하게 하기 위해서 타 시스템에 대한 의존과 영향을 줄이고 시스템에 목적에 맞게 집중하기 위해 Event Driven 구조를 많이들 채택합니다. 그래서 각 서비스에서 Event를 발생시킬때 안전한 구조에 대해서 공부해 보다가 outobx에 대해서 공부 해보았는데, 데이터 일관성이 맞는 서비스를 개발하기 위해서 좋은 공부였던것 같습니다.

또한, outbox pattern을 이용하여 메시지 전송 뿐만이 아니라 API전송이 필요 할 때도 적용하여 비즈니스 요건에 따른 API전송에 필요한 payload를 outbox table에 넣고 polling 하여 전송하도록 구현하면 API전송을 최소 한번 전송이 가능한 서비스를 구현이 가능해보입니다.

 

728x90
반응형
250x250
반응형
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2024/05   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함