Kafka | Consumer 的 commit offset 处理

今天组里新来的实习生在编写和测试一个流处理程序, 问了一个 Kafka 问题,
程序大概是这样的: Consumer 会把 Kafka 队列的消息持久化并 commit offset.
实习生童鞋反馈有时 KafkaConsumer.poll() 会重复拉取消息,
直觉告诉我这应该是 offset commit 没有管理好的原因,
不过还是看了 poll() API 源码 ...

poll() API 源码

首先从 org.apache.kafka.clients.consumer.KafkaConsumer.poll() 方法开始, 里边会调用 pollOnce() :

public ConsumerRecords<K, V> poll(long timeout) {
...
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = pollOnce(remaining);
...
}

private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {

coordinator.poll(time.milliseconds());

// 检查是否获取到了分区的 position
if (!subscriptions.hasAllFetchPositions())
updateFetchPositions(this.subscriptions.missingFetchPositions());

// if data is available already, return it immediately
Map<TopicPartition, List<ConsumerRecord<K, V>>> records = fetcher.fetchedRecords();
if (!records.isEmpty())
return records;
fetcher.sendFetches();
...
}

pollOnce() 中 :

  • coordinator.poll(time.milliseconds()) 会向 the designated broker (the coordinator)
    发送请求, 加入 group, 获取 topic 分区对应的消费 position .
  • 接着 subscriptions.hasAllFetchPositions() 中会检查是否获取了
    所有的消费 positions, 如果没有就去 fetch 和 update.
  • 然后 fetcher.fetchedRecords() 获取消息记录 records, 如果是空的,
    那么 fetcher.sendFetches() 继续发送 Fetch 请求新的 records.

subscriptions.hasAllFetchPositions()

其中, subscriptions.hasAllFetchPositions() 如下 :

SubscriptionState.java :


public boolean hasAllFetchPositions() {
return hasAllFetchPositions(this.assignedPartitions());
}

public boolean hasAllFetchPositions(Collection<TopicPartition> partitions) {
for (TopicPartition partition : partitions)
if (!hasValidPosition(partition))
return false;
return true;
}

public boolean hasValidPosition(TopicPartition tp) {
return isAssigned(tp) && assignedState(tp).hasValidPosition();
}

private TopicPartitionState assignedState(TopicPartition tp) {
TopicPartitionState state = this.assignment.stateValue(tp);
if (state == null)
throw new IllegalStateException("No current assignment for partition " + tp);
return state;
}

this.assignment.stateValue(tp) 可得 topic 分区的 TopicPartitionState.

this.assignment 的类型是 PartitionStates<TopicPartitionState>,
是在 coordinator.poll(time.milliseconds()) 阶段 join group 成功后,
存放由 coordinator assign 的分区信息.

得到的 TopicPartitionState 这个类记录了 positioncommitted 两个值,
对应最后消费的位置 和 最后 committed 的位置:

private static class TopicPartitionState {
private Long position; // last consumed position
private Long highWatermark; // the high watermark from last fetch
private OffsetAndMetadata committed; // last committed position
private boolean paused; // whether this partition has been paused by the user
private OffsetResetStrategy resetStrategy; // the strategy to use if the offset needs resetting
...
}

fetcher.sendFetches()

回到 pollOnce() 中的 fetcher.sendFetches(), 这里将进行
fetch request 的构建, 用来向服务端获取数据:

Fetcher.java :

public int sendFetches() {
Map<Node, FetchRequest.Builder> fetchRequestMap = createFetchRequests();
...
}

private Map<Node, FetchRequest.Builder> createFetchRequests() {
...
for (TopicPartition partition : fetchablePartitions()) {
Node node = cluster.leaderFor(partition);
if (node == null) {
metadata.requestUpdate();
} else if (this.client.pendingRequestCount(node) == 0) {
// if there is a leader and no in-flight requests, issue a new fetch
LinkedHashMap<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node);
if (fetch == null) {
fetch = new LinkedHashMap<>();
fetchable.put(node, fetch);
}

long position = this.subscriptions.position(partition);
fetch.put(partition, new FetchRequest.PartitionData(position, this.fetchSize));
log.trace("Added fetch request for partition {} at offset {} to node {}", partition, position, node);
}
...
}
...
}

构建 FetchRequest 时, 请求参数主要有 partition, position 和 fetchSize.
其中 position 来自 this.subscriptions.position(partition), 如下:

SubscriptionState.java :

public Long position(TopicPartition tp) {
return assignedState(tp).position;
}

private TopicPartitionState assignedState(TopicPartition tp) {
TopicPartitionState state = this.assignment.stateValue(tp);
if (state == null)
throw new IllegalStateException("No current assignment for partition " + tp);
return state;
}

这里 this.assignment.stateValue(tp) 获取的 TopicPartitionState,
就是前面我们提到的实例, FetchRequest 使用的也是这个 position 值.

并且 TopicPartitionState.position 的值被修改的地方,
除了 position(TopicPartition, long) 之外, 只有 seek(long offset) 这个地方.

private void seek(long offset) {
this.position = offset;
this.resetStrategy = null;
}

可以查找得知 TopicPartitionState.position 值在哪些地方被更新:

如上所示的 SubscriptionState.position(TopicPartition, long) 的被调用链,
fetcher.fetchedRecords() 源码中可以发现在返回 records 给客户端时, 会更新 TopicPartitionState.position,
fetcher.fetchedRecords() 这个方法也仅被 KafkaConsumer.poll() 调用.

所以, 从源码的调用关系来看, 一个 consumer instance 的多次 poll() 是不会重复拉取的,
因为 position 变量值在 poll() 时就会更新, client 初次调用 poll() 后,
多次调用是不会重复消费消息, 重复消费只会在第一次时出现.

重复消费的原因

重复消费的问题是 Kafka 新人经常碰到的, 这个主要跟 consumer commit 的处理有关.
比如在 consumer 客户端未能成功把 offset commit 到服务端(比如被kill/崩溃/不优雅的重启等等),
那么新启的 consumer 客户端是可能重复消费的.

管理 offset

在 offset 提交上, KafkaConsumer 支持自动提交 offset,
构建 KafkaConsumer 时设置 Properties :

  • enable.auto.commit 设置为 true 启用.
  • auto.commit.interval.ms 可以设置自动提交的频率(单位毫秒).

当然, 更好的是手动管理 offset 的 commit, 分别有同步和异步的方法:

  • commitSync(Map<TopicPartition,OffsetAndMetadata>)
  • commitAsync(OffsetCommitCallback)

要处理好重复消费的问题, 是需要根据业务情况和要求来决定的,
比如是否需要 exactly-once 语义, 还需要考虑和权衡对吞吐能力的影响.
比如加入自定义事务, 可以实现 exactly-once, 但会降低程序的吞吐量.

当然也可以在 Kafka 之外, 自主管理消息的 offset 和 状态, 自主选择拉取不同 offset 范围的消息,
来满足更严格的业务场景和需求. (比如 seek(TopicPartition partition, long offset) 等 API 支持)

从 Kafka Streams (v0.11 开始), 也有 Transactional API 支持了, 相信将来会有更多相应的功能支持.