已复制
全屏展示
复制代码

Kafka 管理-生产-消费命令行工具


· 2 min read

本文将总结 Kafka 中的管理 topic、生产消息、消费消息的常用命令行。

一. 启动停止 Kafka

  • 启动

nohup ./bin/kafka-server-start.sh config/server.properties &

# 或者

./bin/kafka-server-start.sh -daemon config/server.properties
  • 停止

./bin/kafka-server-start.sh config/server.properties

# 或者

ps aux | grep "kafka\.Kafka" | awk '{print $2}' | xargs kill

二. Kafka常用命令

2.1 topic 管理

# 创建 blogTopic
./bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic blogTopic

# 删除 topic
./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic blogTopic

# 查看 topic 列表
./bin/kafka-topics.sh --zookeeper localhost:2181 --list

# 查看 topic 详情
./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic blogTopic

# 修改 topic 分区数,增加分区数为10个,修改后的分区数为10
./bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic blogTopic --partitions 10

# 修改 topic 保留时间,修改为 86400000 毫秒(24小时)
./bin/kafka-topics.sh --zookeeper server1:2181 --alter --topic flowai_0509 --config retention.ms=86400000 --config cleanup.policy=delete

$ 查看某分区偏移量(最大值、最小值):time为-1时表示最大值,time为-2时表示最小值。
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic blogTopic --partitions 0 --time -1



2.2 生产消息

# 生产消息(交互式命令行),每一行是一条消息。
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic blogTopic
>add a blog
>add another blog


# 生产消息(命令行读取文件),每一行是一条消息。
cat /etc/profile | ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic blogTopic

2.3 消费消息

# 消费指定的 topic。
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic blogTopic

# 消费指定的 topic,查看时间戳、offset等属性。
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic blogTopic \
  --property print.timestamp=true \
  --property print.offset=true \
  --property print.key=true \
  --property print.headers=true

# 消费从指定的 offset 开始,必须指定分区号
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic blogTopic --offset 36 --partition 0


# 消费组:通过组消费,blogGroup 会被自动创建,重新启动时从上次消费的位置开始
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic blogTopic --group blogGroup

# 消费组:查看消费组列表
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

# 消费组:查看组详情,包括消费进度、积压数量等
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group blogGroup


# 只读取已经提交的消息
--isolation-level read_committed
🔗

文章推荐