[@EmbeddedKafka] Kafka 테스트 라이브러리 | produce, consume 테스트 작성
로컬 환경에서 Kafka를 테스트하기 위해
Kafka를 설치하고, 설정하는 과정은 매우 복잡합니다.
그래서, 로컬환경에서 간단하게 kafka produce, consume을 해 볼 수 있는 라이브러리로 테스터를 구현해보겠습니다.
📌 Kafka Test 의존성추가
SpringBoot에서 제공하는 Kafka Test를 의존성 추가합니다. (maven 기준)
version은 java 버전에따라 자동으로 설정되니, 오류가 없으면 버전은 따로 명시하지 않는 것이 안전합니다.
<!-- kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- Kafka Test 의존성 (테스트 전용) -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
📌 application-xxx.yml 설정파일 작성
spring, kafka 설정이 담긴 SpringBoot 설정파일을 작성합니다.
보안상의 내용은 x로 표시합니다. 구성정도만 참고해주세요.
server:
port: 9092 # kafka에 쓰이는 기본포트와 동일하게 맞춤
spring:
application:
name: 프로젝트명
profiles:
- test
mysql:
datasource:
jdbc-url : dbUrl
driver-class-name: org.mariadb.jdbc.Driver
username: db아이디
password: db비번
type: com.zaxxer.hikari.HikariDataSource
pool-name: test-dbcp-mysql
maximumPoolSize: 10
mvc:
throw-exception-if-no-handler-found: true
cache:
jcache:
config: classpath:ehcache-test.xml
kafka:
listener:
concurrency: 1
consumer:
bootstrap-servers: localhost:9092
auto-offset-reset: latest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
topic: TOKEN1
group-id: GROUP-STAT
producer:
bootstrap-servers: localhost:9092
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
topic:
id-data: TOKEN1 # 실제(TOKEN2) PRODUCER, CONSUMER의 토큰이 다르나, KAFKA 발신수신 테스트를 위해 동일토큰사용
nid-data: TOKEN1 # 실제(TOKEN3) PRODUCE 될 곳이 달라서 TOPIC을 리스트로 작성하였어요.
mybatis:
mapper-locations:
- classpath*:mapper/*.xml
- classpath:/mapper/**/*.xml
logging:
call:
multiline: true
except-query-log: true
config: file:./config/logback-spring-local.xml
abnormal:
max-limit-ratio: 8 # cbl에 ratio를 곱한 게 최대 상한값. 수신된 전기(발전, 소비) 데이터가 이 값보다 크면 0으로 보정
thread-pool:
core-size: 50 # consume한 메시지를 처리할 스레드 개수(최소값)
max-size: 200 # consume한 메시지를 처리할 스레드 개수(최대값)
queue-capacity: 100000 # core-size 만큼 스레드가 모두 사용될 때 이 후에 들어오는 작업을 대기시킬 큐 개수(큐까지 모두 차면 그 때 max-size 만큼 스레드가 증가
run-delay-ms: 8 # 각각의 메시지처리 스레드가 실행되기 전 대기시간
use: "on" # on:멀티스레드로 메시지 처리 사용, off:카프카 리스너로만 메시지 처리
thread-pool-db: # db처리할 스레드풀
core-size: 4 # db처리할 스레드 개수(최소값)
max-size: 4 # db처리할 스레드 개수(최대값)(이 값은 db-process.max-size 가 maria-dbpool 보다 작아야 함)
queue-capacity: 1000 # core-size 만큼 스레드가 모두 사용될 때 이 후에 들어오는 작업을 대기시킬 큐 개수(큐까지 모두 차면 그 때 max-size 만큼 스레드가 증가
run-delay-ms: 8 # 각각의 메시지처리 스레드가 실행되기 전 대기시간
use: "off" # on:멀티스레드로 메시지 처리 사용, off:단일스레드로 메시지 처리
quartz:
threadpool-monitor-job-property:
enable: true
schedule: "0/1 * * * * ?"
device-filter: # 통계를 처리할 디바이스 목록
use : "off"
devices:
- v-1
- v-2
- v-3
management: # prometheus, grafana 연동을 위한 설정
endpoint:
gateway:
enable: true
endpoints:
web:
base-path: "/a/b/c/d"
exposure:
include: "*"
metrics:
tags:
application: ${spring.application.name}
📌 Kafka Config 파일 추가
SpringBoot 실행시 @Configuration 어노테이션으로 자동으로 설정될 수 있도록 Kafka 설정파일을 작성해줍시다.
연결되는 bootstrap-servers 주소는 테스트와, 운영 시 연결되는 kafka 서버가 달라서 properties를 주입받아 사용했습니다.
@Configuration
public class KafkaConfig {
@Value("${spring.kafka.consumer.bootstrap-servers}")
private String bootstrapServers;
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(2);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfig());
}
@Bean
public Map<String, Object> consumerConfig() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
}
📌 Kafka Consumer 작성
메세지 수신용 kafka 서비스를 작성합니다.
@Service
@RequiredArgsConstructor
@EnableKafka
@Slf4j
public class KafkaConsumer {
@Autowired
private CallLogger call;
@Autowired
private AmiDataService amiDataService;
@Autowired
@Qualifier("ami-consume-executor")
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Value("${thread-pool.run-delay-ms}")
private int runDelay;
@Value("${thread-pool.use}")
private String useThreadPool;
// 테스트용 변수-메서드 : 테스트를 위해서 꼭 추가가 필요한 부분입니다.
private CountDownLatch latch = new CountDownLatch(1); // 메시지 소비 확인용
private List<String> receivedMessages = new ArrayList<>(); // 수신된 메시지
private boolean lastMessageProcessedSuccessfully = false; // 마지막 메시지 성공 여부
public void resetLatch(int count) {
this.latch = new CountDownLatch(count);
}
public CountDownLatch getLatch() {
return latch;
}
public List<String> getReceivedMessages() {
return receivedMessages;
}
public boolean isLastMessageProcessedSuccessfully() {
return lastMessageProcessedSuccessfully;
}
// Kafka 메세지를 수신하려면 꼭 작성이 필요한부분
@KafkaListener(
topics = "${spring.kafka.consumer.topic}",
groupId = "${spring.kafka.consumer.group-id}",
concurrency = "${spring.kafka.listener.concurrency}"
)
public void consume(String message) throws Exception {
call.info("useThreadPool:{}, message consumed. {}", useThreadPool, message);
receivedMessages.add(message); // 수신된 메시지 리스트에 추가.
// latch.countDown(); // 메시지 수신 대기 해제 (테스트용).
if("on".equals(useThreadPool)) {
try {
TimeUnit.MILLISECONDS.sleep(runDelay);
threadPoolTaskExecutor.execute(() -> { amiDataService.processAmidata(message);});
call.info("threadPool coreSize:{}, activeCounts:{}, maxSize:{}, delay-ms:{}",
threadPoolTaskExecutor.getCorePoolSize(),
threadPoolTaskExecutor.getActiveCount(),
threadPoolTaskExecutor.getMaxPoolSize(),
runDelay);
} catch (InterruptedException e) {
lastMessageProcessedSuccessfully = false; // 실패로 표시
log.info("KafkaConsumer failed with error: " + e.getMessage());
throw new RuntimeException(e);
}
finally {
// 이 부분을 누락하면 테스트시 consume 소비를 확인하는
// assertThat(messageConsumed).isTrue(); 단계에서 에러가 발생합니다.
latch.countDown(); // 메시지 처리 완료
}
}else {
amiDataService.processAmidata(message);
// 이 부분을 누락하면 테스트시 consume 소비를 확인하는
// assertThat(messageConsumed).isTrue(); 단계에서 에러가 발생합니다.
latch.countDown(); // 메시지 처리 완료
}
}
}
📌 Kafka Produce 작성
메세지 발송용 kafka 서비스를 작성합니다.
이따 테스트 코드를 작성할 때, 발송할 메세지 내용에 따라 토픽을 구분하여 발송할 예정이라
ID_TOPIC, NID_TOPIC으로 구분하여 topic 아이디를 properties로 빼왔습니다.
@Service
@EnableKafka
public class KafkaProducer {
@Autowired
KafkaTemplate<String, String> kafkaTemplate;
@Autowired
CallLogger call;
@Autowired
JsonUtil jsonUtil;
@Value("${spring.kafka.producer.topic.id-topic}")
private String ID_TOPIC; // 프로젝트로 전송되어오는 consume을 구분하여 발송 (case1)
@Value("${spring.kafka.producer.topic.nid-topic}")
private String NID_TOPIC; // 프로젝트로 전송되어오는 consume을 구분하여 발송 (case2)
// ID-TOPIC
public void sendPiggy2Stat(KafkaMessage kafkaMessage, String key) throws CommonException {
sendMessage(ID_TOPIC, key, kafkaMessage);
}
public void sendMessage(String topic, String key, String message) {
call.info("send kafka message. topic:{}, key:{}, message:{}", topic, key, message);
// Kafka 메시지를 비동기 방식으로 전송
final ListenableFuture<SendResult<String, String>> future = this.kafkaTemplate.send(topic, key, message);
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
@SneakyThrows
@Override
public void onFailure(Throwable ex) {
call.error("fail sending kafka.", ex);
throw new CommonException(HttpResponseStatus.KAFKA_SEND_ERROR);
}
@Override
public void onSuccess(SendResult<String, String> result) {
call.info("success sending kafka. {}", result.getProducerRecord().toString());
call.debug("success sending kafka. {}", result);
}
});
}
public void sendMessage(String topic, String key, KafkaMessage kafkaMessage) throws CommonException {
sendMessage(topic, key, jsonUtil.objectToJson(kafkaMessage));
}
// NID-TOPIC
@Autowired
public KafkaProducer(KafkaTemplate kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
// NID-TOPIC
public void sendMessage(String message) {
call.info("Produce message : {}", message);
this.kafkaTemplate.send(NID_TOPIC, message);
}
}
📌 @EmbeddedKafka로 카프카 유닛테스트 작성
이제 드디어 테스트코드를 작성해보겠습니다. 설정파일들과 KafkaProducer, KafkaConsumer를 이용할 것이고,
@EmbeddedKafka와 @SpringBootTest @KafkaListener 어노테이션이 필수입니다.
@Slf4j
@ActiveProfiles("test") //application-test.yml 설정을 사용하겠다는 의미
@SpringBootTest // (필수)
@EmbeddedKafka(partitions = 1, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"}) //application.yml과 동일한 bootstrap-servers 작성
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@EnableKafka // (필수)Kafka를 이용하겠다
class KafkaProduceConsumeTest {
@Autowired
private KafkaProducer kafkaProducer;
@Autowired
private KafkaConsumer kafkaConsumer;
@Autowired
private AmiDataService amiDataService; // 서비스 디버그를 찍어보기 위함. 필수아님
@Autowired
KafkaTemplate<String, String> kafkaTemplate;
@Test
@KafkaListener(
topics = "${spring.kafka.consumer.topic}",
groupId = "${spring.kafka.consumer.group-id}",
concurrency = "${spring.kafka.listener.concurrency}"
) // (필수)
void testProduceAndConsumeWithAmiDataService() throws Exception {
// Given: Kafka 토픽, 키, 메시지 정의
// producer와 consumer가 동일한 토픽을 사용
String topic = "TOPIC1"; //application.yml에 작성한 spring.kafka.producer.topic
// 현재 시간을 밀리초 단위로 조회 (요청메세지용)
long timestamp = Instant.now().toEpochMilli();
String timestampStr = String.valueOf(timestamp); // %s에 들어갈 현재시간
// 회원 세대 데이터
// 1. "HERIT-HG2000E-202106291145 / v-e6a0f303de494e27ab8be55446932212 / 300,310,320,330,340,302,312,322,332,342 / 101101
String key = "kafka key로 사용될 String";
final String testMessage = String.format(" {\"header\":{\"from\":\"TEST-SERVICE\",\"ti\":\"%s\",\"txId\":\"XXX.%s\"}," +
"\"data\":{\"msg\":{\"o\":\"n\",\"e\":[{\"n\":\"/data1/data2/data3\",\"sv\":\"123.456\",\"ti\":\"%s\"}," +
"{\"n\":\"/data1/data2/data3\",\"sv\":\"12.45\",\"ti\":\"%s\"}," +
"{\"n\":\"/data1/data2/data3\",\"sv\":\"56.78\",\"ti\":\"%s\"}]}," +
"\"subdata\":{\"request_txid\":\"XXX\"," +
"\"vdevice_id\":\"XXX\"}," +
"\"device_id\":\XXX\"}}",
timestampStr, timestampStr, timestampStr, timestampStr, timestampStr, timestampStr, timestampStr, timestampStr, timestampStr, timestampStr, timestampStr, timestampStr);
// KafkaConsumer 상태 초기화
kafkaConsumer.resetLatch(1);
// When: KafkaProducer로 메시지 전송
kafkaProducer.sendMessage(topic, key, testMessage);
// Then: latch 기다리기 (소비 대기)
try {
boolean messageConsumed = kafkaConsumer.getLatch().await(10, TimeUnit.SECONDS); // 메시지 수신 대기
assertThat(messageConsumed).isTrue(); // 메시지 소비 확인
// Consumer의 `receivedMessages` 리스트에 메시지가 포함되었는지 확인
List<String> consumedMessages = kafkaConsumer.getReceivedMessages();
assertThat(consumedMessages).isNotEmpty();
assertThat(consumedMessages).contains(testMessage);
// 1-1. Service 호출 및 처리에 대한 검증
// `Service`에 전달된 메시지 기반으로 데이터베이스가 업데이트되었는지 확인
// 예: DB 내용을 repository 또는 직접 query를 통해 검증
Thread.sleep(100000); // KafkaConsumer에서 비동기 처리가 완료되도록 대기
// 1-2. 메시지가 성공적으로 처리되었는지 확인
amiDataService.processAmidata(testMessage); // 처리 성공 여부 검증 - 디버그 접근용
// 예측 데이터베이스 상태 또는 로그를 검증하여 간접적으로 서비스가 실행되었는지 확인
log.info("Service 처리 완료.");
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
📌 @EmbeddedKafka 주의사항
제대로 된 동작을 수행하는 유닛테스트를 작성하는데 약 1달 가량이 걸렸는데요.
이유는, EmbeddedKafka 테스트자체가 로컬단에서 유닛테스트 용도 정도여서
같은 @Test 코드내에 produce와 consume이 같이 이루어져야 메세지 수신이 가능합니다.
(타 프로젝트에서 EmbeddedKafka로 발송한 kafka 메세지는 Consume으로 받는게 불가능해요)
그리고 Kafka 메세지 자체가 워낙 디버그 찍어보기도 어렵기 때문에
테스트결과가 성공으로 나오더라도 못잡은 부분이 있을 수 있기때문에
Kafka 성공시 실제 데이터가 db로 입력된다던지, 테스트 결과를 명확하게 파악할 수 있는 장치가 필요합니다.
저는 DB에 데이터를 적재하는 방법으로 기존 프로젝트 로직이 짜여져있어서
데이터 구성으로 Kafka 테스트 성공여부를 쉽게 파악할 수 있었어요.
추가적으로 궁금하신 부분이나 코드작성방법이 필요하신 분은 댓글로 남겨주세요.