Kafka 管理-生产-消费命令行工具
本文将总结 Kafka 中的管理 topic、生产消息、消费消息的常用命令行。
一. 启动停止 Kafka
- 启动
nohup ./bin/kafka-server-start.sh config/server.properties &
# 或者
./bin/kafka-server-start.sh -daemon config/server.properties
- 停止
./bin/kafka-server-start.sh config/server.properties
# 或者
ps aux | grep "kafka\.Kafka" | awk '{print $2}' | xargs kill
二. Kafka常用命令
2.1 topic 管理
# 创建 blogTopic
./bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic blogTopic
# 删除 topic
./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic blogTopic
# 查看 topic 列表
./bin/kafka-topics.sh --zookeeper localhost:2181 --list
# 查看 topic 详情
./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic blogTopic
# 修改 topic 分区数,增加分区数为10个,修改后的分区数为10
./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic blogTopic --partitions 10
# 修改 topic 保留时间,修改为 86400000 毫秒(24小时)
./bin/kafka-topics.sh --zookeeper server1:2181 --alter --topic flowai_0509 --config retention.ms=86400000 --config cleanup.policy=delete
$ 查看某分区偏移量(最大值、最小值):time为-1时表示最大值,time为-2时表示最小值。
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic blogTopic --partitions 0 --time -1
2.2 生产消息
# 生产消息(交互式命令行),每一行是一条消息。
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic blogTopic
>add a blog
>add another blog
# 生产消息(命令行读取文件),每一行是一条消息。
cat /etc/profile | ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic blogTopic
2.3 消费消息
# 消费指定的 topic。
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic blogTopic
# 消费指定的 topic,查看时间戳、offset等属性。
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic blogTopic \
--property print.timestamp=true \
--property print.offset=true \
--property print.key=true \
--property print.headers=true
# 消费从指定的 offset 开始,必须指定分区号
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic blogTopic --offset 36 --partition 0
# 消费组:通过组消费,blogGroup 会被自动创建,重新启动时从上次消费的位置开始
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic blogTopic --group blogGroup
# 消费组:查看消费组列表
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
# 消费组:查看组详情,包括消费进度、积压数量等
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group blogGroup
# 只读取已经提交的消息
--isolation-level read_committed