카테고리 없음

[kafka] shell command 명령어 모음

왓츠뉴 whatsnew 2025. 3. 10. 10:04
반응형

 

 

 

 shell에서 kafka 명령어로 자주쓰이는 command를 정리했습니다.

 

 

📌 카프카 사용전 준비단계

카프카 설치 위치로 이동

cd /svc/app/kafka_2.13-2.7.0/bin/

📌 서비스 실행/중지

✳️ zookeeper 실행

cd ${kafka_home}/bin
./zookeeper-server-start.sh ../config/zookeeper.properties &

✳️ kafka 실행

cd ${kafka_home}/bin
./kafka-server-start.sh ../config/server.properties &

✳️ process 확인

ps -ef |grep zook
ps -ef |grep kafka

✳️ kafka 중지

cd ${kafka_home}/bin
./kafka-server-stop.sh

✳️ zookeeper 중지

cd ${kafka_home}/bin
./zookeeper-server-stop.sh

📌 토픽 관련 커맨드

✳️ 토픽 생성

-- topic make
./kafka-topics.sh --create --zookeeper 10.101.101.211:2181 --replication-factor 1 --partitions 1 --topic test.topic
./kafka-topics.sh --create --zookeeper 10.101.101.211:2181 --replication-factor 1 --partitions 3 --topic test.topic3
./kafka-topics.sh --create --zookeeper 10.101.101.211:2181 --replication-factor 1 --partitions 3 --topic TOPICNAME1

✳️ 토픽 리스트 확인

./kafka-topics.sh --list --zookeeper 10.101.101.211:2181

✳️ 특정 토픽 메세지 produce

./kafka-console-producer.sh --broker-list 10.101.101.211:9092 --topic TOPICNAME1

✳️ 특정 토픽 메세지 consume

./kafka-console-consumer.sh --bootstrap-server 10.101.101.211:9092 --topic TOPICNAME1 --from-beginning

✳️ 특정 토픽 특정 파티션 메세지 consume

./kafka-console-consumer.sh --bootstrap-server 10.101.101.211:9092 --topic TOPICNAME1 --partition 1 --from-beginning

📌 kafka 확인 커맨드

✳️ 특정 카프카그룹 lag 확인 - 1회성

/svc/app/kafka_2.13-2.7.0/bin/kafka-consumer-groups.sh --bootstrap-server 10.101.102.215:9092 --group TOPICGROUP1  --describe;

✳️ 특정 카프카그룹 lag 확인 - 2초 단위 반복

while true; do /svc/app/kafka_2.13-2.7.0/bin/kafka-consumer-groups.sh --bootstrap-server 10.101.102.215:9092 --group TOPICGROUP1  --describe; sleep 2; done

📌 kafka consume 커맨드

✳️ 특정 토픽에서 메시지 소비(consumer)

💡카프가 lag이 소비되지않고 auto commit되지 않을때, consumer로 특정 토픽을 지정하여 소비하는 기능.

lag이 이유없이 쌓일때 사용하면 강제 소비할 수 있습니다.

./kafka-console-consumer.sh --bootstrap-server 10.101.102.215:9092 --topic TOPICNAME1 --group TOPICGROUP1 --from-beginning

✳️ consumer 실행

/svc/app/kafka_2.13-2.7.0/bin/kafka-consumer-groups.sh --bootstrap-server 10.101.102.215

✳️ consumer 리셋

💡earliest로 consume이 소비되고 있을때, lag이 제대로 소비되지 않는 경우가 있습니다. latest로 소비될 수 있도록 컨슈머를 리셋하는 명령어입니다.

./kafka-consumer-groups.sh --bootstrap-server 10.101.102.215:9092 --group TOPICGROUP1 --reset-offsets --to-latest --execute

✳️ consumer 실행 중 설정 확인

./kafka-consumer-groups.sh --bootstrap-server 10.101.102.215:9092 --group TOPICGROUP1 --describe --verbose

📌 kafka produce 커맨드

✳️ kafka 메세지 - json 검증

💡 kafka 메세지가 json 규격에 맞는지 확인합니다.

echo -n '{"header":{"from":"value","ti":"value","txId":"value"},"data":{"msg":{"o":"n","e":[{"n":"value","sv":"value","ti":"value"},{"n":"value","sv":"value","ti":"value"}]},"subdata":{"request_txid":"value","vdevice_id":"value","data":{"period_type":"type1"}},"device_id":"value"}}' | jq .

✳️ kafka 메세지 발송

echo -n '{"header":{"from":"value","ti":"value","txId":"value"},"data":{"msg":{"o":"n","e":[{"n":"value","sv":"value","ti":"value"},{"n":"value","sv":"value","ti":"value"}]},"subdata":{"request_txid":"value","vdevice_id":"value","data":{"period_type":"type1"}},"device_id":"value"}}' | ./kafka-console-producer.sh --broker-list 10.101.102.215:9092 --topic TOPICNAME1

📌 kafka group 삭제 재생성

/svc/app/kafka/bin/kafka-consumer-groups.sh --bootstrap-server  10.101.102.215:9092 --delete --group PIGGY2-STAT-GRP
/svc/app/kafka/bin/kafka-console-consumer.sh --bootstrap-server 10.101.102.215:9092 \
  --topic PIGGY2-STAT-TN --group PIGGY2-STAT-GRP

그룹내 소비자그룹이 잘못생성되는경우, 단일삭제는 안되고 전체 그룹을 삭제해야합니다.

전체 그룹 삭제 시도시에,  consumer 프로세스가 실행되어 있는경우 삭제가안되는데요. 아래와같은 오류메세지가 나오면

herit@mqserver bin]$ ./kafka-consumer-groups.sh --bootstrap-server 10.101.102.215:9092 --delete --group PIGGY2-STAT-GRP

Error: Deletion of some consumer groups failed:
* Group 'PIGGY2-STAT-GRP' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.

프로세스 서버와 토픽을 확인하고,  (아래에서는 TOPIC-PIGGY2-STAT-TN이 실행중)

[herit@mqserver bin]$ ./kafka-consumer-groups.sh --bootstrap-server 10.101.102.215:9092   --describe --group PIGGY2-STAT-GRP

GROUP           TOPIC              PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                     HOST            CLIENT-ID
PIGGY2-STAT-GRP PIGGY2-STAT-TN     0          3091            3091            0               consumer-PIGGY2-STAT-GRP-1-35dada30-1c07-45c9-9d9f-268959c1d20f /10.101.101.95  consumer-PIGGY2-STAT-GRP-1
PIGGY2-STAT-GRP SOURCE-PIGGY-ZE-TN 0          724             724             0               -                                                               -               -

 해당 HOST에 접속해서 프로세스를 종료시켜준 후 다시 그룹을 삭제시도하면 됩니다.

./run.sh stop

 

728x90
반응형