已复制
全屏展示
复制代码

Kafka 事务与 ExactlyOnce 语义


· 8 min read

一. Producer幂等性发送

在0.11.0.0之前版本中有两个问题:

  • Broker保存消息后,发送ACK前宕机,Producer认为消息未发送成功并重试,造成数据重复
  • 前一条消息发送失败,后一条消息发送成功,前一条消息重试后成功,造成数据乱序

为了实现Producer的幂等语义,Kafka引入了Producer ID(即PID)和Sequence Number。每个新的Producer在初始化的时候会被分配一个唯一的PID,该PID对用户完全透明而不会暴露给用户。对于每个PID,该Producer发送数据的每个<Topic, Partition>都对应一个从0开始单调递增的Sequence Number。

类似地,Broker端也会为每个<PID, Topic, Partition>维护一个序号,并且每次Commit一条消息时将其对应序号递增。对于接收的每条消息,如果其序号比Broker维护的序号(即最后一次Commit的消息的序号)大1,则Broker会接受它,否则将其丢弃:

  • 如果消息序号比Broker维护的序号大1以上,说明中间有数据尚未写入,也即乱序,此时Broker拒绝该消息,Producer抛出InvalidSequenceNumber
  • 如果消息序号小于或等于Broker维护的序号,说明该消息已被保存,即为重复消息,Broker直接丢弃该消息,Producer抛出DuplicateSequenceNumber

二. Producer事务性保证

为了实现原子操作,即事务性操作,应用程序必须提供一个稳定的(重启后不变)唯一的ID,也即Transaction ID。Transactin ID与PID可能一一对应。区别在于Transaction ID由用户提供,而PID是内部的实现对用户透明。

另外,为了保证新的Producer启动后,旧的具有相同Transaction ID的Producer立即失效,每次Producer通过Transaction ID拿到PID的同时,还会获取一个单调递增的epoch。由于旧的Producer的epoch比新Producer的epoch小,Kafka可以很容易识别出该Producer是老的Producer并拒绝其请求。

2.1 事务性消息传递

Producer将多条消息作为一个事务批量发送,要么全部成功要么全部失败。为了实现这一点,Kafka 0.11.0.0引入了一个服务器端的模块,名为Transaction Coordinator,用于管理Producer发送的消息的事务性。

该Transaction Coordinator维护Transaction Log,该log存于一个内部的Topic内。由于Topic数据具有持久性,因此事务的状态也具有持久性。Producer并不直接读写Transaction Log,它与Transaction Coordinator通信,然后由Transaction Coordinator将该事务的状态插入相应的Transaction Log。Transaction Log的设计与Offset Log用于保存Consumer的Offset类似。

2.2 用于事务特性的控制型消息

为了区分写入Partition的消息被Commit还是Abort,Kafka引入了一种特殊类型的消息,即Control Message。该类消息的Value内不包含任何应用相关的数据,并且不会暴露给应用程序。它只用于Broker与Client间的内部通信。

对于Producer端事务,Kafka以Control Message的形式引入一系列的Transaction Marker。Consumer即可通过该标记判定对应的消息被Commit了还是Abort了。

2.3 事务处理样例代码

  • kafka client依赖
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.8.0</version>
</dependency>
  • 注意:这里消费者消费的时候,要指定 --isolation-level read_committed,只有提交的消息才消费。默认是 read_uncommitted
# 创建 blogTopic
./bin/kafka-topics.sh --zookeeper localhost:2181 --create --replication-factor 3 --partitions 3 --topic blogTopic


# 消费指定的 topic。
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic blogTopic --isolation-level read_committed


# 删除 topic
./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic blogTopic

  • main方法
package org.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class Main {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092,node4:9092");
        props.put("acks", "-1");
        props.put("enable.idempotence", "true");

        // 事务超时设置,默认是 60000
        props.put("transaction.timeout.ms", "60000");

        // 不同的 Producer 使用不同的 transactional.id
        props.put("transactional.id", "test_transactional_id2");

        props.put("retries", 3);
        props.put("batch.size", 10000);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        // 初始化事务,包括结束该Transaction ID对应的未完成的事务(如果有)
        producer.initTransactions();

        // 开始事务
        producer.beginTransaction();
        try {
            for (int i = 0; i < 20; i++) {
                // 发送数据
                producer.send(new ProducerRecord<String, String>("blogTopic", "value" + i));
                Thread.sleep(1000);
                System.out.println("i = " + i);
                if (i == 15) {
                    throw new Exception();
                }
            }

            // 数据发送及Offset发送均成功的情况下,提交事务
            producer.commitTransaction();
        } catch (Exception e) {
            // 数据发送出现异常时,终止事务
            producer.abortTransaction();
            System.out.println("transaction aborted!");
        } finally {
            // 关闭Producer
            producer.close();
            System.out.println("producer closed!");
        }

    }

}

三. 完整事务过程

找到Transaction Coordinator

  • 由于Transaction Coordinator是分配PID和管理事务的核心,因此Producer要做的第一件事情就是通过向任意一个Broker发送FindCoordinator请求找到Transaction Coordinator的位置。
  • 注意:只有应用程序为Producer配置了Transaction ID时才可使用事务特性,也才需要这一步。另外,由于事务性要求Producer开启幂等特性,因此通过将transactional.id设置为非空从而开启事务特性的同时也需要通过将enable.idempotence设置为true来开启幂等特性。

获取PID

  • 找到Transaction Coordinator后,具有幂等特性的Producer必须发起InitPidRequest请求以获取PID。注意:只要开启了幂等特性即必须执行该操作,而无须考虑该Producer是否开启了事务特性。

如果事务特性被开启

  • InitPidRequest会发送给Transaction Coordinator。如果Transaction Coordinator是第一次收到包含有该Transaction ID的InitPidRequest请求,它将会把该<TransactionID, PID>存入Transaction Log,如图中步骤2.1所示。这样可保证该对应关系被持久化,从而保证即使Transaction Coordinator宕机该对应关系也不会丢失。
  • 除了返回PID外,InitPidRequest还会执行如下两个任务:
  • 任务1:增加该PID对应的epoch。具有相同PID但epoch小于该epoch的其它Producer(如果有)新开启的事务将被拒绝。
  • 任务2:恢复(Commit或Abort)之前的Producer未完成的事务(如果有)。
  • 注意:InitPidRequest的处理过程是同步阻塞的。一旦该调用正确返回,Producer即可开始新的事务。
  • 另外,如果事务特性未开启,InitPidRequest可发送至任意Broker,并且会得到一个全新的唯一的PID。该Producer将只能使用幂等特性以及单一Session内的事务特性,而不能使用跨Session的事务特性。

开启事务

  • Kafka从0.11.0.0版本开始,提供beginTransaction()方法用于开启一个事务。调用该方法后,Producer本地会记录已经开启了事务,但Transaction Coordinator只有在Producer发送第一条消息后才认为事务已经开启。

Consume-Transform-Produce

  • 这一阶段,包含了整个事务的数据处理过程,并且包含了多种请求。

Commit或Abort事务

  • 一旦上述数据写入操作完成,应用程序必须调用KafkaProducer的commitTransaction方法或者abortTransaction方法以结束当前事务。

四. 事务过期机制

事务超时

  • Broker端参数

max.transaction.timeout.ms,默认是900000 (15 min)

  • Producer端参数

transaction.timeout.ms,默认是60000(60秒),不能超过max.transaction.timeout.ms 的值。

终止过期事务

当Producer失败时,Transaction Coordinator必须能够主动的让某些进行中的事务过期。否则没有Producer的参与,Transaction Coordinator无法判断这些事务应该如何处理

终止Transaction ID

某 Transaction ID 的 Producer 可能很长时间不再发送数据,Transaction Coordinator没必要再保存该Transaction ID与PID等的映射,否则可能会造成大量的资源浪费。因此需要有一个机制探测不再活跃的Transaction ID并将其信息删除。

Transaction Coordinator会周期性遍历内存中的 Transaction ID 与PID映射,如果某Transaction ID没有对应的正在进行中的事务并且它对应的最后一个事务的结束时间与当前时间差大于transactional.id.expiration.ms(默认值是7天),则将其从内存中删除并在Transaction Log中将其对应的日志的值设置为null从而使得Log Compact可将其记录删除。

  • 参考资料

http://www.jasongj.com/kafka/transaction/

🔗

文章推荐