
Kafka Producer and Consumer Java API



This post summarizes Java API examples for Kafka producers and consumers.

1. Common Kafka Configuration

1.1 POM Dependency

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.8.0</version>
</dependency>

1.2 Serialization and Deserialization

Serializer configuration
  • byte[]: org.apache.kafka.common.serialization.ByteArraySerializer
  • ByteBuffer: org.apache.kafka.common.serialization.ByteBufferSerializer
  • Integer: org.apache.kafka.common.serialization.IntegerSerializer
  • Short: org.apache.kafka.common.serialization.ShortSerializer
  • Long: org.apache.kafka.common.serialization.LongSerializer
  • Double: org.apache.kafka.common.serialization.DoubleSerializer
  • String: org.apache.kafka.common.serialization.StringSerializer
Deserializer configuration
  • org.apache.kafka.common.serialization.ByteArrayDeserializer
  • org.apache.kafka.common.serialization.ByteBufferDeserializer
  • org.apache.kafka.common.serialization.IntegerDeserializer
  • org.apache.kafka.common.serialization.ShortDeserializer
  • org.apache.kafka.common.serialization.LongDeserializer
  • org.apache.kafka.common.serialization.DoubleDeserializer
  • org.apache.kafka.common.serialization.StringDeserializer
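
These classes are wired in through the key.serializer / value.serializer producer properties (and the corresponding *.deserializer consumer properties). Below is a minimal sketch, assuming a Long key and a String value, that uses the ProducerConfig constants instead of the raw class-name strings shown in the producer example later in this post:
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
// Same effect as props.put("key.serializer", "org.apache.kafka.common.serialization.LongSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<Long, String> producer = new KafkaProducer<>(props);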

1.3 ProducerRecord Parameters

  • Below are the full parameters of a Kafka ProducerRecord and what they mean.
/**
* Creates a record with a specified timestamp to be sent to a specified topic and partition
* 
* @param topic The topic the record will be appended to
* @param partition The partition to which the record should be sent
* @param timestamp The timestamp of the record; if null, the producer uses System.currentTimeMillis().
* @param key The key that will be included in the record
* @param value The record contents
* @param headers the headers that will be included in the record
*/
public ProducerRecord(String topic, 
                      Integer partition, 
                      Long timestamp, 
                      K key, 
                      V value, 
                      Iterable<Header> headers) {
    if (topic == null)
        throw new IllegalArgumentException("Topic cannot be null.");
    if (timestamp != null && timestamp < 0)
        throw new IllegalArgumentException(
        String.format("Invalid timestamp: %d.", timestamp));
    if (partition != null && partition < 0)
        throw new IllegalArgumentException(
        String.format("Invalid partition: %d.", partition));
    this.topic = topic;
    this.partition = partition;
    this.key = key;
    this.value = value;
    this.timestamp = timestamp;
    this.headers = new RecordHeaders(headers);
}
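
In practice you rarely need all six parameters; ProducerRecord also offers shorter overloads that fill in the rest with defaults (partitioner-chosen partition, current timestamp, no headers). For illustration, with the String/String producer used in the next section:
// Only topic and value: the partitioner picks the partition, the timestamp defaults to now
new ProducerRecord<String, String>("blogTopic", "some value");
// Topic, key and value: records with the same non-null key land on the same partition
new ProducerRecord<String, String>("blogTopic", "user-1", "some value");
// Topic, partition, key and value: pin the record to partition 0 explicitly
new ProducerRecord<String, String>("blogTopic", 0, "user-1", "some value");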

2. Producer

  • Create a test topic (delete it first if it already exists):
./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic blogTopic
./bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 3 --partitions 3 --topic blogTopic

# View topic details
./bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic blogTopic
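
The same topic can also be created programmatically with the AdminClient from kafka-clients. A minimal sketch, assuming the same brokers as in the producer example below (imports come from org.apache.kafka.clients.admin; get() throws checked exceptions, so call this from a method that declares or handles them):
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
try (AdminClient admin = AdminClient.create(props)) {
    // 3 partitions, replication factor 3, matching the shell command above
    NewTopic topic = new NewTopic("blogTopic", 3, (short) 3);
    admin.createTopics(Collections.singleton(topic)).all().get();
}
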
Full code example
  • A ProducerRecord does not need every parameter; the simplest form takes only two, topic and value, and the rest fall back to defaults.
  • Note: if you set a custom timestamp on a message, do not make it more than 7 days earlier than the current time, because messages older than the default 7-day retention period are deleted automatically.
package org.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Random;

public class CustomProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092,node4:9092");
        props.put("acks", "-1");
        props.put("retries", 0);
        props.put("batch.size", 10000);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        String topic = "blogTopic";
        int topicPartitions = 3;
        long startTimestamp = System.currentTimeMillis();
        // Attach a custom header to every record
        List<Header> headers = new ArrayList<>();
        headers.add(new RecordHeader("testHeaderKey1", "testHeaderValue1".getBytes()));

        // Flush every 1000 messages
        int cnt = 0;

        for (int userId = 0; userId < 10000; userId++) {
            int partition = userId % topicPartitions;
            long timestamp = startTimestamp + userId;
            String key = null;
            String value = getRandomString(50);
            producer.send(new ProducerRecord<String, String>(topic, partition, timestamp, key, value, headers));
            cnt++;
            if (cnt == 1000) {
                producer.flush();
                cnt = 0; // reset so every subsequent batch of 1000 is flushed as well
            }
        }
        producer.flush();
        producer.close();
    }


    public static String getRandomString(int length) {
        String str = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
        Random random = new Random();
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < length; i++) {
            int number = random.nextInt(62);
            sb.append(str.charAt(number));
        }
        return sb.toString();
    }

}
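
send() is asynchronous, and the loop above never checks whether each record actually reached the broker. If you need per-record confirmation or error handling, you can pass a Callback as the second argument to send(); a minimal sketch of what the call inside the loop could look like:
producer.send(new ProducerRecord<String, String>(topic, partition, timestamp, key, value, headers),
        (metadata, exception) -> {
            if (exception != null) {
                // the send failed after all retries were exhausted
                exception.printStackTrace();
            } else {
                System.out.printf("acked: %s-%d offset=%d%n",
                        metadata.topic(), metadata.partition(), metadata.offset());
            }
        });
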
  • View the produced messages
# Consume the topic and print the timestamp, offset, and other attributes of each record
./bin/kafka-console-consumer.sh --bootstrap-server node1:9092 \
  --from-beginning \
  --topic blogTopic \
  --property print.timestamp=true \
  --property print.offset=true \
  --property print.key=true \
  --property print.headers=true
  
  
# Consume a specific partition from a given offset
./bin/kafka-console-consumer.sh \
  --bootstrap-server node1:9092 \
  --topic blogTopic --offset 0 --partition 0

3. Consumer

  • Note: auto.offset.reset controls where consumption starts when the group has not committed any offset (earliest = from the beginning of the log, latest = only new messages).
package org.example;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;

public class CustomConsumer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092,node4:9092");
        props.put("group.id", "CUSTOM_GROUP_ID3");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("max.poll.records", "1000");
        props.put("auto.offset.reset", "earliest"); // 表示在没有提交过任何 offset的时候,从什么位置开始消费数据
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        String topic = "blogTopic";
        consumer.subscribe(Collections.singletonList(topic));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            System.out.println("records = " + records.count());
            if (!records.isEmpty()) {
                for (ConsumerRecord<String, String> record : records) {
                    String topicName = record.topic();
                    int partition = record.partition();
                    long timestamp = record.timestamp();
                    long offset = record.offset();
                    String key = record.key();
                    String value = record.value();

                    Map<String, String> headersMap = new HashMap<>();
                    Iterator<Header> headers = record.headers().iterator();
                    while (headers.hasNext()) {
                        Header next = headers.next();
                        headersMap.put(next.key(), new String(next.value()));
                    }

                    String output = "topicName:%s,partition:%s,timestamp:%s,offset:%s,key:%s,value:%s,headers:%s";
                    output = String.format(output, topicName, partition, timestamp, offset, key, value, headersMap);
                    System.out.println(output);
                }
            }
            Thread.sleep(50);
        }

    }
}
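
subscribe() leaves partition assignment to the group coordinator. To mirror the console-consumer command from section 2 that reads partition 0 starting at offset 0, you can assign the partition yourself and seek to the offset. A minimal sketch (it needs an extra import of org.apache.kafka.common.TopicPartition, and assign() must not be mixed with subscribe() on the same consumer instance):
TopicPartition tp = new TopicPartition("blogTopic", 0);
consumer.assign(Collections.singletonList(tp));
consumer.seek(tp, 0L); // start reading partition 0 from offset 0
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));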