下面的示例中,使用多线程消费 Kafka 数据,并手动提交 offset。
手动提交的好处是不会丢失数据;如果程序崩溃,可能会重复消费部分数据,因此你的程序需要对相同的数据做幂等处理。
package org.example;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
 * Example: consume a Kafka topic with a pool of consumer threads, committing
 * offsets manually ({@code enable.auto.commit=false}).
 *
 * <p>Each worker thread creates and owns its own {@link KafkaConsumer} —
 * KafkaConsumer is not thread-safe and must never be shared between threads.
 * All consumers join the same consumer group, so the broker assigns each
 * thread a disjoint subset of the topic's partitions.
 *
 * <p>Offsets are committed synchronously once every
 * {@code COMMIT_EVERY_N_BATCHES} non-empty polls. This is at-least-once
 * delivery: after a crash, up to that many batches may be re-delivered, so
 * downstream processing of the same record must be idempotent.
 */
public class KafkaConsumerExample {

    /** Number of consumer threads; should not exceed the topic's partition count. */
    public static final int NUM_THREADS = 5;

    /** Commit offsets after this many non-empty polls. */
    private static final int COMMIT_EVERY_N_BATCHES = 10;

    private static final String BROKERS = "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094";
    private static final String TOPIC = "t1";

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
        List<Future<?>> futures = new ArrayList<>();

        // Properties is a Hashtable, and the workers only read it, so sharing
        // one instance across threads is safe.
        Properties props = new Properties();
        props.put("bootstrap.servers", BROKERS);
        props.put("group.id", "KafkaConsumerExample_13");
        // Manual offset management: we call commitSync() ourselves below.
        // (auto.commit.interval.ms is intentionally omitted — it has no effect
        // when enable.auto.commit is false.)
        props.put("enable.auto.commit", "false");
        props.put("session.timeout.ms", "30000");
        props.put("max.poll.records", "1000");
        // Where to start consuming when the group has no committed offset yet.
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        for (int i = 0; i < NUM_THREADS; i++) {
            futures.add(executor.submit(() -> {
                long threadId = Thread.currentThread().getId();
                int batchesSinceCommit = 0;
                // try-with-resources guarantees the consumer is closed (and its
                // group membership released) even if subscribe() or poll() throws.
                try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                    consumer.subscribe(Collections.singletonList(TOPIC));
                    while (true) {
                        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                        if (records.isEmpty()) {
                            continue;
                        }
                        for (ConsumerRecord<String, String> record : records) {
                            System.out.println("threadId: " + threadId
                                    + " partition_id: " + record.partition()
                                    + " partition_offset: " + record.offset()
                                    + " value: " + record.value());
                        }
                        // Commit only after the whole batch has been processed
                        // (at-least-once): a crash before the commit re-delivers
                        // at most COMMIT_EVERY_N_BATCHES batches.
                        if (++batchesSinceCommit >= COMMIT_EVERY_N_BATCHES) {
                            consumer.commitSync();
                            batchesSinceCommit = 0;
                        }
                    }
                }
            }));
        }

        try {
            // Workers loop forever, so this blocks until one of them fails.
            for (Future<?> future : futures) {
                future.get();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        } finally {
            executor.shutdown();
        }
    }
}