Kafka Producer and Consumer Java API
This post summarizes Java API examples for the Kafka producer and consumer.
1. Common Kafka Configuration
1.1 POM Dependency
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
1.2 Serialization and Deserialization
Serializer classes
- byte[]: org.apache.kafka.common.serialization.ByteArraySerializer
- ByteBuffer: org.apache.kafka.common.serialization.ByteBufferSerializer
- Integer: org.apache.kafka.common.serialization.IntegerSerializer
- Short: org.apache.kafka.common.serialization.ShortSerializer
- Long: org.apache.kafka.common.serialization.LongSerializer
- Double: org.apache.kafka.common.serialization.DoubleSerializer
- String: org.apache.kafka.common.serialization.StringSerializer
Deserializer classes
- org.apache.kafka.common.serialization.ByteArrayDeserializer
- org.apache.kafka.common.serialization.ByteBufferDeserializer
- org.apache.kafka.common.serialization.IntegerDeserializer
- org.apache.kafka.common.serialization.ShortDeserializer
- org.apache.kafka.common.serialization.LongDeserializer
- org.apache.kafka.common.serialization.DoubleDeserializer
- org.apache.kafka.common.serialization.StringDeserializer
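The serializer configured on the producer must be paired with the matching deserializer on the consumer, otherwise records cannot be decoded. Below is a minimal sketch of wiring these classes in through the ProducerConfig/ConsumerConfig constants from kafka-clients; the broker address and group id are placeholders.
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class SerdeConfigSketch {
    public static void main(String[] args) {
        // Producer side: Long keys, String values
        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092"); // placeholder broker
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Consumer side: the deserializers must mirror the producer's serializers
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092"); // placeholder broker
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "serde-demo");          // placeholder group id
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    }
}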
1.3 ProducerRecord Parameters
- Below is the full-argument ProducerRecord constructor together with its parameter documentation.
/**
* Creates a record with a specified timestamp to be sent to a specified topic and partition
*
* @param topic The topic the record will be appended to
* @param partition The partition to which the record should be sent
* @param timestamp The timestamp of the record, if null, default is System.currentTimeMillis().
* @param key The key that will be included in the record
* @param value The record contents
* @param headers the headers that will be included in the record
*/
public ProducerRecord(String topic,
Integer partition,
Long timestamp,
K key,
V value,
Iterable<Header> headers) {
if (topic == null)
throw new IllegalArgumentException("Topic cannot be null.");
if (timestamp != null && timestamp < 0)
throw new IllegalArgumentException(
String.format("Invalid timestamp: %d.", timestamp));
if (partition != null && partition < 0)
throw new IllegalArgumentException(
String.format("Invalid partition: %d.", partition));
this.topic = topic;
this.partition = partition;
this.key = key;
this.value = value;
this.timestamp = timestamp;
this.headers = new RecordHeaders(headers);
}
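The other ProducerRecord constructors delegate to this full-argument one, filling the omitted parameters with null. A quick sketch of the common overloads; "demoTopic" is just a placeholder topic name.
// Common ProducerRecord constructor overloads.
ProducerRecord<String, String> r1 = new ProducerRecord<>("demoTopic", "value only");                  // topic + value
ProducerRecord<String, String> r2 = new ProducerRecord<>("demoTopic", "key1", "value1");              // topic + key + value
ProducerRecord<String, String> r3 = new ProducerRecord<>("demoTopic", 0, "key1", "value1");           // + explicit partition
ProducerRecord<String, String> r4 = new ProducerRecord<>("demoTopic", 0, System.currentTimeMillis(), "key1", "value1"); // + explicit timestamp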
2. Producer
- Create a test topic
./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic blogTopic
./bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 3 --partitions 3 --topic blogTopic
# Describe the topic
./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic blogTopic
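The topic can also be created from Java through the admin client instead of the shell script; a minimal sketch, assuming the same brokers and the same partition/replication settings as the commands above.
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;
public class CreateBlogTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092,node3:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions, replication factor 3, matching the kafka-topics.sh command above
            NewTopic topic = new NewTopic("blogTopic", 3, (short) 3);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}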
Complete code example
- A ProducerRecord can be built with fewer arguments; the simplest form takes only two: topic and value, with defaults used for everything else.
- Note: if you set a custom timestamp on a message, it should not lie more than 7 days in the past, because with the default 7-day log retention such messages become eligible for automatic deletion.
package org.example;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Random;
public class CustomProducer {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092,node4:9092");
props.put("acks", "-1");
props.put("retries", 0);
props.put("batch.size", 10000);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
String topic = "blogTopic";
int topicPartitions = 3;
long startTimestamp = System.currentTimeMillis();
List<Header> headers = new ArrayList<Header>() {{ // explicit type argument so this also compiles on Java 8
add(new Header() {
@Override
public String key() {
return "testHeaderKey1";
}
@Override
public byte[] value() {
return "testHeaderValue1".getBytes();
}
});
}};
// flush once every 1000 messages
int cnt = 0;
for (int userId = 0; userId < 10000; userId++) {
int partition = userId % topicPartitions;
long timestamp = startTimestamp + userId;
String key = null;
String value = getRandomString(50);
producer.send(new ProducerRecord<String, String>(topic, partition, timestamp, key, value, headers));
cnt++;
if (cnt == 1000) {
producer.flush();
cnt = 0; // reset the counter so a flush happens after every 1000 sends, not just once
}
}
producer.flush();
producer.close();
}
public static String getRandomString(int length) {
String str = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
Random random = new Random();
StringBuilder sb = new StringBuilder();
for (int i = 0; i < length; i++) {
int number = random.nextInt(62);
sb.append(str.charAt(number));
}
return sb.toString();
}
}
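The producer above sends fire-and-forget. send() also accepts a callback, which is the usual way to learn whether each record was actually written; a small sketch of how the send call in the loop above could be rewritten.
// Variant of the send() call with a callback reporting the delivery result.
producer.send(new ProducerRecord<String, String>(topic, partition, timestamp, key, value, headers),
        (metadata, exception) -> {
            if (exception != null) {
                exception.printStackTrace(); // delivery failed after any retries
            } else {
                System.out.printf("written to %s-%d at offset %d%n",
                        metadata.topic(), metadata.partition(), metadata.offset());
            }
        });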
- Inspect the produced messages
# Consume the topic and print timestamp, offset, key and headers.
./bin/kafka-console-consumer.sh --bootstrap-server node1:9092 \
--from-beginning \
--topic blogTopic \
--property print.timestamp=true \
--property print.offset=true \
--property print.key=true \
--property print.headers=true
# Consume a specific partition starting from a given offset
./bin/kafka-console-consumer.sh \
--bootstrap-server node1:9092 \
--topic blogTopic --offset 0 --partition 0
3. Consumer
- Note: auto.offset.reset controls where consumption starts when the group has no committed offset.
package org.example;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
public class CustomConsumer {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092,node4:9092");
props.put("group.id", "CUSTOM_GROUP_ID3");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("max.poll.records", "1000");
props.put("auto.offset.reset", "earliest"); // 表示在没有提交过任何 offset的时候,从什么位置开始消费数据
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
String topic = "blogTopic";
consumer.subscribe(Collections.singletonList(topic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); // poll(long) is deprecated in newer clients
System.out.println("records = " + records.count());
if (records != null && records.count() > 0) {
for (ConsumerRecord<String, String> record : records) {
String topicName = record.topic();
int partition = record.partition();
long timestamp = record.timestamp();
long offset = record.offset();
String key = record.key();
String value = record.value();
Map<String, String> headersMap = new HashMap<>();
Iterator<Header> headers = record.headers().iterator();
while (headers.hasNext()) {
Header next = headers.next();
headersMap.put(next.key(), new String(next.value()));
}
String output = "topicName:%s,partition:%s,timestamp:%s,offset:%s,key:%s,value:%s,headers:%s";
output = String.format(output, topicName, partition, timestamp, offset, key, value, headersMap);
System.out.println(output);
}
}
Thread.sleep(50);
}
}
}
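The consumer above relies on enable.auto.commit=true, so offsets are committed in the background on a timer. If offsets should only be stored after a batch has actually been processed (at-least-once handling), disable auto commit and commit manually; a minimal sketch of the changed parts, keeping the other properties the same.
props.put("enable.auto.commit", "false"); // replaces the auto-commit settings above
// ...
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        // process the record here before committing
    }
    if (!records.isEmpty()) {
        consumer.commitSync(); // commit this batch's offsets only after processing succeeded
    }
}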