Embedded Kafka 를 사용하게 된 이유
- 외부 Kafka 서버에 의존하지 않고 안정적이고 독립적으로 테스트 하는 방법이 필요하였음
- 타부서에서 발행하는 message를 consume하여 처리하기로 하였는데, 개발계에 카프카 서버가 셋팅되지 않음
- 내가만든 @KafkaListener 가 정상 동작하는지 검증필요
상황
- @KafkaListener를 통하여 consume & 비즈니스 로직 실행 코드는 완성된 상태
- 일정상 개발계 kafka 셋팅이 완료되었을 때 테스트 지원이 힘든 상황이여 테스트 코드로 검증을 대신해야함
- 역직렬화(deserializer)가 정상 동작 하는지
- 여러 consumer configuration이 정상 동작 하는지
Embedded Kafka를 활용한 간단 테스트
참고) https://docs.spring.io/spring-kafka/reference/3.1/testing.html
참고) https://www.baeldung.com/spring-boot-kafka-testing
의존성 추가
testImplementation("org.springframework.kafka:spring-kafka-test")
@EmbeddedKafka설정 및 테스트 코드 작성
AccountDeletionListener 간단 설명 : 계정 탈회 Message가 발행되면, 계정에 등록된 카드들을 모두 삭제
AccountDeletionListenerTest.kt
@EmbeddedKafka 정의한 주소로 in-memory kafka가 실행되며 해당 kafka를 이용하여 produce/consume 테스트가 가능
@SpringBootTest(
classes = [MockTestKafkaConfig::class, KafkaConfig::class], // 테스트에 필요한 bean들만 등록하기 위해 지정
)
@EmbeddedKafka(
partitions = 1,
brokerProperties = ["listeners=PLAINTEXT://\${kafka.bootstrap-servers}"], // spring properties 활용 가능
kraft = false,
)
@DirtiesContext
class DeletionListenerTest(
private val kafkaTemplate: KafkaTemplate<String, Any>,
private val broker: EmbeddedKafkaBroker, // EmbeddedKafkaBroker 생성 후 주입된다.
@Value("\${topics.account-deletion.topic}") private val topic: String
): FreeSpec() {
override fun extensions(): List<Extension> = listOf(SpringExtension)
@Autowired
lateinit var cardLoadService: CardLoadService
@Autowired
lateinit var cardDeletionService: CardDeletionService
@Autowired
lateinit var retryService: CardRetryService
val logger = KotlinLogging.logger {}
init {
beforeSpec {
// mocking 에 필요한 로직을 추가한다.
// 생략
}
"메시지를 consume 하면, 등록된 카드를 모두 삭제한다. 만약 카드 삭제에 실패하면 retry 테이블에 적재한다." {
// given
val servers = broker.brokersAsString
logger.debug { "Kafka Broker is running at : $servers " } // Kafka Broker is running at : 127.0.0.1:9092
every { cardLoadService.selectList(any()) } answers {
lisfOf(
Card("key1", "사용자Seq"), // 삭제 성공할 카드
Card("key2", "사용자Seq") // 삭제 실패할 카드
)
}
val deletionRequestCard = slot<Card>()
every { deletionService.delete(capture(card)) } answers {
if ("key1" == deletionRequestCard.captured.key) {
throw IllegalStateException("카드 삭제 실패")
}
"사용자Seq"
}
// when
val payload = DeletionPayload("사용자Seq", ....)
kafkaTemplate.send(topic, payload)
// then
await().atMost(3, TimeUnit.SECONDS).untilAsserted {
verify(exactly = 1) {
retryService.saveDeletionRetryTarget(
// 삭제 실패된 카드에 대한 retry 호출 검증
SaveRequestDeletionRetryTarget(Card("key2", "사용자Seq"), "카드 삭제 실패")
)
}
}
verify(exactly = 2) { cardDeletionService.delete(any()) } // 카드 삭제 요청 2번 호출
}
}
}
KafkaConfig.kt
KafkaListener 설정(Consumer 관련)
@Configuration
@EnableKafka
class KafkaConfig {
@Bean
fun kafkaListenerContainerFactory(
@Value("\${kafka.bootstrap-servers}") bootStrapServers: String,
kafkaConsumerProperties: KafkaConsumerProperties // consumer 설정(생략 함)
): KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, Any>> =
ConcurrentKafkaListenerContainerFactory<String, Any>().apply {
consumerFactory =
DefaultKafkaConsumerFactory(
nPayInternal2kafkaConsumerProperties.toConfigMap(bootStrapServers),
StringDeserializer(),
JsonDeserializer(Any::class.java).apply {
addTrustedPackages("*")
ignoreTypeHeaders()
},
)
containerProperties.ackMode = ContainerProperties.AckMode.MANUAL_IMMEDIATE // Acknowledgement.acknowledge() 호출시 즉시 커밋
}
}
MockTestKafkaConfig.kt
Message Produce, Mocking에 필요한 bean 정의
@Configuration
class MockTestKafkaConfig {
@Bean
fun producerFactory(
@Value("\${kafka.bootstrap-servers}") bootStrapServers: String,
): ProducerFactory<String, Any> =
DefaultKafkaProducerFactory(
mapOf(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG to bootStrapServers,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG to StringSerializer::class.java,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG to JsonSerializer::class.java,
ProducerConfig.ACKS_CONFIG to "all",
ProducerConfig.LINGER_MS_CONFIG to 0, // 메시지 전송하기 전 기다리는 시간
),
)
@Bean
fun kafkaTemplate(factory: ProducerFactory<String, Any>): KafkaTemplate<String, Any> =
KafkaTemplate(factory).apply {
setProducerListener(produceListener())
}
...
@Bean
fun cardLoadService(): CardLoadService = mockk()
...
}
트러블 슈팅
EmbeddedKafka 실행시 지정된 port로 실행이 안된다?
@EmbeddedKafka 설정 시, borkerProperties를 설정하면 해당 port로 borker가 실행되어야 하는데 random한 포트로 실행이 된다.
KRaft mode시에는 port 지정이 안된다고 하여 kraft = false
옵션을 추가하여 broker를 실행했다.
- EmbeddedKafkaKraftBroker -> random port
- EmbeddedKafkaZKBroker -> port 지정 가능
관련 이슈 : https://github.com/spring-projects/spring-kafka/issues/2936
kraft란 무엇인가? : https://devocean.sk.com/blog/techBoardDetail.do?ID=165711&boardType=techBlog
test코드를 실행했는데, consume이 되지 않고 테스트가 종료된다?
메시지 consume이 되기전, 테스트 코드가 끝나는 현상이 있어 delay를 시키는 로직이 필요했다.
awaitility 라이브러리를 사용하면 비동기 코드를 테스트할 수 있다.
await().atMost(3, TimeUnit.SECONDS).untilAsserted { // 최대 3초 동안 해당 로직이 실행됬는지 검증
// listener 처리 대기
verify(exactly = 1) {
mockRetryService.saveDeletionRetryTarget(
// 삭제 실패된 카드에 대한 retry 저장
SaveRequestDeletionRetryTarget(
deletionFailureCard,
"카드 삭제 실패 ${deletionFailureCard.key}",
),
)
}
verify(exactly = 2) { mockDeletionService.delete(any()) } // 카드 삭제 요청 2번 호출
}
최대 대기 시간(atMost)를 지정하고 polling을 통해서 untilAsserted 에 지정한 코드가 호출됬는지 검증이 가능하다.
내부적으로 polling 시간을 지정할 수 있는 것으로 보인다.
awaitility : http://www.awaitility.org/
'프로그래밍 노트 > kafka' 카테고리의 다른 글
[kafka] 카프카 프로듀서 주요 옵션 및 전송방법 (0) | 2021.08.09 |
---|---|
[kafka] 콘솔, 자바를 이용하여 프로듀서(producer) 구현 (0) | 2021.07.16 |
[kafka] Docker 사용하여 카프카 클러스터 구성하기 (1) | 2021.07.16 |
[kafka] 카프카 설치 및 실행 (환경설정) (0) | 2021.07.10 |
[kafka] 주키퍼(zookeeper) 설치 및 실행 (환경설정) (0) | 2021.06.30 |