프로그래밍 노트/kafka

[Kafka] Embedded Kafka Broker를 활용한 Kafka 로컬 테스트

깡냉쓰 2024. 12. 18. 17:32
728x90
반응형

Embedded Kafka 를 사용하게 된 이유

  • 외부 Kafka 서버에 의존하지 않고 안정적이고 독립적으로 테스트 하는 방법이 필요하였음
  • 타부서에서 발행하는 message를 consume하여 처리하기로 하였는데, 개발계에 카프카 서버가 셋팅되지 않음
    • 내가만든 @KafkaListener 가 정상 동작하는지 검증필요

상황

  • @KafkaListener를 통하여 consume & 비즈니스 로직 실행 코드는 완성된 상태
  • 일정상 개발계 kafka 셋팅이 완료되었을 때 테스트 지원이 힘든 상황이여 테스트 코드로 검증을 대신해야함
    • 역직렬화(deserializer)가 정상 동작 하는지
    • 여러 consumer configuration이 정상 동작 하는지

Embedded Kafka를 활용한 간단 테스트

참고) https://docs.spring.io/spring-kafka/reference/3.1/testing.html
참고) https://www.baeldung.com/spring-boot-kafka-testing

의존성 추가

testImplementation("org.springframework.kafka:spring-kafka-test")

@EmbeddedKafka설정 및 테스트 코드 작성

AccountDeletionListener 간단 설명 : 계정 탈회 Message가 발행되면, 계정에 등록된 카드들을 모두 삭제

AccountDeletionListenerTest.kt

@EmbeddedKafka 정의한 주소로 in-memory kafka가 실행되며 해당 kafka를 이용하여 produce/consume 테스트가 가능

@SpringBootTest(
    classes = [MockTestKafkaConfig::class, KafkaConfig::class], // 테스트에 필요한 bean들만 등록하기 위해 지정
)
@EmbeddedKafka(
    partitions = 1,
    brokerProperties = ["listeners=PLAINTEXT://\${kafka.bootstrap-servers}"], // spring properties 활용 가능
    kraft = false,
)
@DirtiesContext
class DeletionListenerTest(
  private val kafkaTemplate: KafkaTemplate<String, Any>,
  private val broker: EmbeddedKafkaBroker, // EmbeddedKafkaBroker 생성 후 주입된다.
  @Value("\${topics.account-deletion.topic}") private val topic: String
): FreeSpec() {

  override fun extensions(): List<Extension> = listOf(SpringExtension)

  @Autowired
  lateinit var cardLoadService: CardLoadService

  @Autowired
  lateinit var cardDeletionService: CardDeletionService

  @Autowired
  lateinit var retryService: CardRetryService

  val logger = KotlinLogging.logger {}

  init { 
    beforeSpec {
      // mocking 에 필요한 로직을 추가한다.
      // 생략
    }

    "메시지를 consume 하면, 등록된 카드를 모두 삭제한다. 만약 카드 삭제에 실패하면 retry 테이블에 적재한다." {
      // given
      val servers = broker.brokersAsString
      logger.debug { "Kafka Broker is running at : $servers " } // Kafka Broker is running at : 127.0.0.1:9092

      every { cardLoadService.selectList(any()) } answers {
        lisfOf(
          Card("key1", "사용자Seq"), // 삭제 성공할 카드
          Card("key2", "사용자Seq")  // 삭제 실패할 카드
        )
      }

      val deletionRequestCard = slot<Card>()
      every { deletionService.delete(capture(card)) } answers {
        if ("key1" == deletionRequestCard.captured.key) {
          throw IllegalStateException("카드 삭제 실패")
        }
        "사용자Seq"
      }

      // when
      val payload = DeletionPayload("사용자Seq", ....)
      kafkaTemplate.send(topic, payload)


      // then
      await().atMost(3, TimeUnit.SECONDS).untilAsserted {
        verify(exactly = 1) {
          retryService.saveDeletionRetryTarget(
            // 삭제 실패된 카드에 대한 retry 호출 검증
            SaveRequestDeletionRetryTarget(Card("key2", "사용자Seq"), "카드 삭제 실패")
          )
        }
      }
      verify(exactly = 2) { cardDeletionService.delete(any()) } // 카드 삭제 요청 2번 호출
    }
  }
}

KafkaConfig.kt

KafkaListener 설정(Consumer 관련)

@Configuration
@EnableKafka
class KafkaConfig {
  @Bean
  fun kafkaListenerContainerFactory(
    @Value("\${kafka.bootstrap-servers}") bootStrapServers: String,
    kafkaConsumerProperties: KafkaConsumerProperties // consumer 설정(생략 함)
  ): KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Any>> =
    ConcurrentKafkaListenerContainerFactory<String, Any>().apply {
      consumerFactory =
        DefaultKafkaConsumerFactory(
            nPayInternal2kafkaConsumerProperties.toConfigMap(bootStrapServers),
            StringDeserializer(),
            JsonDeserializer(Any::class.java).apply {
                addTrustedPackages("*")
                ignoreTypeHeaders()
            },
        )
      containerProperties.ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE // Acknowledgement.acknowledge() 호출시 즉시 커밋
    }
}

MockTestKafkaConfig.kt

Message Produce, Mocking에 필요한 bean 정의

@Configuration
class MockTestKafkaConfig {

  @Bean
  fun producerFactory(
    @Value("\${kafka.bootstrap-servers}") bootStrapServers: String,
  ): ProducerFactory<String, Any> = 
    DefaultKafkaProducerFactory(
      mapOf(
        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to bootStrapServers,
        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java,
        ProducerConfig.ACKS_CONFIG to "all",
        ProducerConfig.LINGER_MS_CONFIG to 0, // 메시지 전송하기 전 기다리는 시간
      ),
    )

  @Bean
  fun kafkaTemplate(factory: ProducerFactory<String, Any>): KafkaTemplate<String, Any> =
      KafkaTemplate(factory).apply {
        setProducerListener(produceListener())
      }

  ...

  @Bean
  fun cardLoadService(): CardLoadService = mockk()

  ...
}

트러블 슈팅

EmbeddedKafka 실행시 지정된 port로 실행이 안된다?

@EmbeddedKafka 설정 시, borkerProperties를 설정하면 해당 port로 borker가 실행되어야 하는데 random한 포트로 실행이 된다.
KRaft mode시에는 port 지정이 안된다고 하여 kraft = false 옵션을 추가하여 broker를 실행했다.

  • EmbeddedKafkaKraftBroker -> random port
  • EmbeddedKafkaZKBroker -> port 지정 가능

관련 이슈 : https://github.com/spring-projects/spring-kafka/issues/2936
kraft란 무엇인가? : https://devocean.sk.com/blog/techBoardDetail.do?ID=165711&boardType=techBlog

test코드를 실행했는데, consume이 되지 않고 테스트가 종료된다?

메시지 consume이 되기전, 테스트 코드가 끝나는 현상이 있어 delay를 시키는 로직이 필요했다.
awaitility 라이브러리를 사용하면 비동기 코드를 테스트할 수 있다.

await().atMost(3, TimeUnit.SECONDS).untilAsserted { // 최대 3초 동안 해당 로직이 실행됬는지 검증
    // listener 처리 대기
    verify(exactly = 1) {
        mockRetryService.saveDeletionRetryTarget(
            // 삭제 실패된 카드에 대한 retry 저장
            SaveRequestDeletionRetryTarget(
                deletionFailureCard,
                "카드 삭제 실패 ${deletionFailureCard.key}",
            ),
        )
    }
    verify(exactly = 2) { mockDeletionService.delete(any()) } // 카드 삭제 요청 2번 호출
}

최대 대기 시간(atMost)를 지정하고 polling을 통해서 untilAsserted 에 지정한 코드가 호출됬는지 검증이 가능하다.
내부적으로 polling 시간을 지정할 수 있는 것으로 보인다.
awaitility : http://www.awaitility.org/

728x90
반응형