已复制
全屏展示
复制代码

Kafka 使用 Java 多线程消费数据

· 1 min read

下面的示例使用多线程消费 Kafka 数据,并手动提交 offset。

手动提交的好处是不会丢失数据;但如果程序崩溃,重启后会重复消费部分数据,所以你的程序必须保证对相同数据的处理是幂等的。

package org.example;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * Multi-threaded Kafka consumer example that commits offsets manually.
 *
 * <p>{@link #main(String[])} starts {@link #NUM_THREADS} worker threads. Each worker owns
 * its own {@link KafkaConsumer} (a consumer instance is never shared between threads),
 * subscribes to {@link #TOPIC}, prints every record it receives and commits offsets
 * synchronously after a number of processed batches.
 *
 * <p>Offsets are committed only after the corresponding records were processed, so a
 * crash can cause records to be delivered again but not skipped. Record handling
 * therefore has to be idempotent.
 */
public class KafkaConsumerExample {

    /** Number of consumer threads to start. Kept non-final so existing callers may still set it. */
    public static int NUM_THREADS = 5;
    private static final String BROKERS = "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094";
    private static final String TOPIC = "t1";

    /** Maximum time a single {@code poll} call blocks; also bounds how fast a worker notices a stop request. */
    private static final Duration POLL_TIMEOUT = Duration.ofMillis(1000);

    /** Offsets are committed after this many non-empty batches have been processed. */
    private static final int COMMIT_EVERY_N_BATCHES = 10;

    /** How long the shutdown hook waits for the workers to commit and close. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 10;

    /** Cleared to ask all workers to leave their poll loop; volatile so every worker thread sees the change. */
    private static volatile boolean running = true;

    /**
     * Starts the consumer threads and blocks until all of them have terminated.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        final int threadCount = NUM_THREADS;
        final ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        final Properties props = buildConsumerProperties();

        // On JVM shutdown (e.g. Ctrl+C) ask the workers to stop and give them time to
        // commit their pending offsets and close their consumers.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            running = false;
            executor.shutdown();
            try {
                if (!executor.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                    System.err.println("Consumers did not stop within "
                            + SHUTDOWN_TIMEOUT_SECONDS + " seconds");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        List<Future<?>> futures = new ArrayList<>(threadCount);
        for (int i = 0; i < threadCount; i++) {
            futures.add(executor.submit(() -> runConsumer(props)));
        }

        try {
            for (Future<?> future : futures) {
                try {
                    future.get();
                } catch (ExecutionException e) {
                    // The worker has already printed the full stack trace; keep waiting
                    // for the remaining workers instead of abandoning them.
                    System.err.println("A consumer thread terminated abnormally: " + e.getCause());
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and ask the workers to stop as well.
            Thread.currentThread().interrupt();
            running = false;
        } finally {
            executor.shutdown();
        }
    }

    /**
     * Builds the configuration shared by all consumers.
     *
     * @return consumer properties with auto-commit disabled
     */
    private static Properties buildConsumerProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", BROKERS);
        props.put("group.id", "KafkaConsumerExample_13");
        // Offsets are committed manually, so "auto.commit.interval.ms" is deliberately not set.
        props.put("enable.auto.commit", "false");
        props.put("session.timeout.ms", "30000");
        props.put("max.poll.records", "1000");
        // Where to start consuming when the group has not committed any offset yet.
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    /**
     * Poll loop executed by each worker thread: prints every record and commits offsets
     * every {@link #COMMIT_EVERY_N_BATCHES} batches, when the topic goes idle, and once
     * more when a stop is requested.
     *
     * @param props consumer configuration
     */
    private static void runConsumer(Properties props) {
        final long threadId = Thread.currentThread().getId();
        int uncommittedBatches = 0;

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(TOPIC));

            while (running) {
                ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT);

                if (records.isEmpty()) {
                    // Idle: flush what was processed so far instead of waiting for a
                    // tenth batch that may never arrive.
                    if (uncommittedBatches > 0) {
                        consumer.commitSync();
                        uncommittedBatches = 0;
                    }
                    continue;
                }

                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("threadId: " + threadId
                            + " partition_id: " + record.partition()
                            + " partition_offset: " + record.offset()
                            + " value: " + record.value());
                }

                uncommittedBatches++;
                if (uncommittedBatches >= COMMIT_EVERY_N_BATCHES) {
                    consumer.commitSync();
                    uncommittedBatches = 0;
                }
            }

            // Clean stop: every polled record has been processed, so it is safe to
            // commit before the consumer is closed.
            if (uncommittedBatches > 0) {
                consumer.commitSync();
            }
        } catch (RuntimeException e) {
            // Report immediately: main() may be blocked on another worker's Future and
            // would otherwise never surface this failure. Nothing is committed here, so
            // unprocessed records are redelivered after a restart.
            System.err.println("threadId: " + threadId + " consumer failed");
            e.printStackTrace();
            throw e;
        }
    }

}
🔗

文章推荐