今天组里新来的实习生在编写和测试一个流处理程序, 问了一个 Kafka 问题,
程序大概是这样的: Consumer 会把 Kafka 队列的消息持久化并 commit offset.
实习生童鞋反馈有时 KafkaConsumer.poll() 会重复拉取消息,
直觉告诉我这应该是 offset commit 没有管理好的原因,
不过还是看了 poll() API 源码 ...
poll() API 源码
首先从 org.apache.kafka.clients.consumer.KafkaConsumer.poll()
方法开始, 里边会调用 pollOnce() :
public ConsumerRecords<K, V> poll(long timeout) { |
pollOnce() 中 :
coordinator.poll(time.milliseconds())
会向 the designated broker (the coordinator)
发送请求, 加入 group, 获取 topic 分区对应的消费 position .- 接着 subscriptions.hasAllFetchPositions() 中会检查是否获取了
所有的消费 positions, 如果没有就去 fetch 和 update. - 然后 fetcher.fetchedRecords() 获取消息记录 records, 如果是空的,
那么 fetcher.sendFetches() 继续发送 Fetch 请求新的 records.
subscriptions.hasAllFetchPositions()
其中, subscriptions.hasAllFetchPositions() 如下 :
SubscriptionState.java :
|
this.assignment.stateValue(tp)
可得 topic 分区的 TopicPartitionState
.
this.assignment 的类型是
PartitionStates<TopicPartitionState>
,
是在coordinator.poll(time.milliseconds())
阶段 join group 成功后,
存放由 coordinator assign 的分区信息.
得到的 TopicPartitionState
这个类记录了 position
和 committed
两个值,
对应最后消费的位置 和 最后 committed 的位置:
private static class TopicPartitionState { |
fetcher.sendFetches()
回到 pollOnce() 中的 fetcher.sendFetches()
, 这里将进行
fetch request 的构建, 用来向服务端获取数据:
Fetcher.java
:
public int sendFetches() { |
构建 FetchRequest 时, 请求参数主要有 partition, position 和 fetchSize.
其中 position 来自 this.subscriptions.position(partition)
, 如下:
SubscriptionState.java :
public Long position(TopicPartition tp) { |
这里 this.assignment.stateValue(tp) 获取的 TopicPartitionState,
就是前面我们提到的实例, FetchRequest 使用的也是这个 position
值.
并且 TopicPartitionState.position 的值被修改的地方,
除了 position(TopicPartition, long)
之外, 只有 seek(long offset)
这个地方.
private void seek(long offset) { |
可以查找得知 TopicPartitionState.position 值在哪些地方被更新:
如上所示的 SubscriptionState.position(TopicPartition, long)
的被调用链,
在 fetcher.fetchedRecords()
源码中可以发现在返回 records 给客户端时, 会更新 TopicPartitionState.position
,
而 fetcher.fetchedRecords()
这个方法也仅被 KafkaConsumer.poll()
调用.
所以, 从源码的调用关系来看, 一个 consumer instance 的多次 poll() 是不会重复拉取的,
因为 position
变量值在 poll() 时就会更新, client 初次调用 poll() 后,
多次调用是不会重复消费消息, 重复消费只会在第一次时出现.
重复消费的原因
重复消费的问题是 Kafka 新人经常碰到的, 这个主要跟 consumer commit 的处理有关.
比如在 consumer 客户端未能成功把 offset commit 到服务端(比如被kill/崩溃/不优雅的重启等等),
那么新启的 consumer 客户端是可能重复消费的.
管理 offset
在 offset 提交上, KafkaConsumer
支持自动提交 offset,
构建 KafkaConsumer 时设置 Properties :
enable.auto.commit
设置为 true 启用.auto.commit.interval.ms
可以设置自动提交的频率(单位毫秒).
当然, 更好的是手动管理 offset 的 commit, 分别有同步和异步的方法:
commitSync(Map<TopicPartition,OffsetAndMetadata>)
commitAsync(OffsetCommitCallback)
要处理好重复消费的问题, 是需要根据业务情况和要求来决定的,
比如是否需要 exactly-once
语义, 还需要考虑和权衡对吞吐能力的影响.
比如加入自定义事务, 可以实现 exactly-once
, 但会降低程序的吞吐量.
当然也可以在 Kafka 之外, 自主管理消息的 offset 和 状态, 自主选择拉取不同 offset 范围的消息,
来满足更严格的业务场景和需求. (比如 seek(TopicPartition partition, long offset)
等 API 支持)
从 Kafka Streams (v0.11 开始), 也有 Transactional API 支持了, 相信将来会有更多相应的功能支持.