Kafka Indexing Service (简称 KIS) 可以从 Kafka 拉取数据并写入 Druid 的 DataSource 中,
KIS 的代码并不复杂, 我们可以从中了解到更多的细节。下面我们来一起看看提交 SupervisorSpec 的大概流程。
定义 SuperVisor
使用 KIS 时, 需要定义一个 SuperVisor 描述 json, 假设为 supervisor-01.json 文件, 如下例子:
首先定义了一个 dataSource 叫 "ds-test-01",然后定义输入消息的 parser 格式为 json,
再设置 DataSource 中的 timestamp 和 dimensions 对应的 json fields ,
接着是定义需要的 metrics ,常见的如平均值/最大/最小值,
ioConfig 设置 Kafka 的输入信息,tuningConfig 是一些优化的配置,这里不是重点,更多参数请对照此文档。
{ |
提交 SuperVisorSpec
我们现在把上一步定义的 SuperVisor Spec 的 json 通过 HTTP POST 提交:
curl -X POST -H 'Content-Type: application/json' -d @supervisor-01.json http://${overlord_host}:8090/druid/indexer/v1/supervisor |
接收处理 SuperVisorSpec
SuperVisorSpec 接收处理是在 SupervisorResource:
"/druid/indexer/v1/supervisor") ( |
如上, specPost() 输入是 SupervisorSpec
, 它的实现是 KafkaSupervisorSpec
,
但因为 SupervisorSpec 是接口, 反序列化结果需要是实现类,
因此是有配置 Json 映射的, 会根据 type
来选择反序列化的结果类, 定义如下 :
// SupervisorSpec.java |
我们可以看看实现类 KafkaSupervisorSpec
, 可以发现 KafkaSupervisorSpec 和我们提交的 Json 内容是对应的。
// KafkaSupervisorSpec.java |
回到 specPost(),接下来就是根据 KafkaSupervisorSpec
来启动创建和启动 Supervisor:
private boolean createAndStartSupervisorInternal(SupervisorSpec spec, boolean persistSpec) |
上面的 spec.createSupervisor() 将会创建 supervisor,
实现是 KafkaSupervisorSpec.createSupervisor(), 如下:
@Override |
创建之后, 执行 supervisor.start()
启动 supervisor , 也就是 KafkaSupervisor.start()
:
|
上面主要进行几个操作:
1. 创建 KafkaConsumer
通过 getKafkaConsumer() 创建 KafkaConsumer
2. notices 队列处理线程
ExecutorService.submit() 一个线程, 该线程不断从 BlockingQueue
3. 定期往 notices 队列推 notice
ScheduledExecutorService.scheduleAtFixedRate() 定期往 notices 队列里增加一个 RunNotice.
RunNotice 处理
从上面可知, Notice 是 Supervisor 运行的关键。
其中从 RunNotice 的定义可知:
|
核心处理逻辑在 KafkaSupervisor.runInternal() 中,接下来就分别看看这几个方法。
possiblyRegisterListener()
主要进行 TaskRunner 的 Listener 注册, 主要是 statusChanged() 回调定义, 只是往 notices 队列增加一个 RunNotice() :
taskRunner.get().registerListener( |
updatePartitionDataFromKafka()
首先来看一下 KafkaSupervisor.partitionGroups 这个数据结构, 他是一个 Map<Integer, Map<Integer, Long>>,
它记录的内容是 Map<{group ID}, Map<{partition ID}, {startingOffset}>>,
我们把其中的 Map<{partition ID}, {startingOffset}> 称为 partitionMap,
这个方法的主要工作就是:
检查 ioConfig.getTopic() 的所有分区,
- 查看每个分区对应的 taskGroupId 是否有 key, 无则创建.
taskGroupId 等于partition % ioConfig.getTaskCount()
, 也即是说如果设置了 3 个 task,
Kafka topic 有 9 个分区, 那将会有 3 个 taskGroupId, 每个 taskGroupId 会消费 3 个分区.
如果 taskCount 大于 分区数, 那么 taskCount 的值会被设置为分区数。 - 检查 taskGroupId 的 partitionMap 是否有某个 partition 的 key, 无则创建.
discoverTasks()
这部分将会从 taskStorage 读取 activeTasks, 如果是属于当前 dataSource 而且类型属于 KafkaIndexTask, 则:
通过 task 对应的 taskGroupId 查看有对应的 TaskGroup , 无则创建, 并把 task 加入到该 TaskGroup。
其中数据库读取出来的 KafkaIndexTask
如下所示, 定义了运行所需的信息 :
|
updateTaskStatus()
从 taskStorage 中获取 task 的最新状态, 同步到内存中。
checkTaskDuration()
检查每个 taskGroup 中的任务运行时间, 如果超过设置中的 ioConfig.taskDuration, 则保存状态并停止它。
checkPendingCompletionTasks()
这个方法检查 pendingCompletionTaskGroups ,寻找 group 中已经完成的 tasks。
如果 group 中有任意一个 task 完成了,我们都可以停止 group 中的其他 tasks。
如果 group 已经达到 publishing timeout , 则停掉当前 group 、同 groupId 中接下来的 group 和正在运行的 group。
checkCurrentTaskState()
检查 taskGroups 中的任务状态,停止和删除错误或失败的 tasks , 如果 group 中某一任务成功了,停止 group 中的所有任务。
createNewTasks()
终于到了创建 task 的地方 , 这里首先会遍历 partitionGroups 的 keys, 也即是 groupId,
看 taskGroups 中是否存在该 groupId, 否则创建一个 TaskGroup.
然后循环 taskGroups , 检查 ioConfig.getReplicas() > taskGroup.tasks.size()
,
tasks 数目是否会小于配置的 replicas 数目, 是则通过 createKafkaTasksForGroup() 创建 task。
createKafkaTasksForGroup()
我们来看看 createKafkaTasksForGroup(int groupId, int replicas):
private void createKafkaTasksForGroup(int groupId, int replicas) |
如上, 主要根据 replicas 参数进行 task 创建,然后通过 taskMaster.getTaskQueue().get().add() 增加新建的 task。
TaskQueue.add() 定义如下:
public boolean add(final Task task) throws EntryExistsException |
如上, 首先将通过 taskStorage 把 task 信息存储持久化,
然后把 task 加入到 tasks 中, tasks 的使用是在 manage() 进行,
而 manage() 是在一个 forever loop 中调用,
从 TaskQueue.start()
可以知道:
|
因此, 继续看 manage() ,此方法会不断检查 tasks , 如果有未运行的 task, 则通过 taskRunner.run(task)
运行它:
private void manage() throws InterruptedException |
TaskRunner
的实现是 RemoteTaskRunner (io.druid.indexing.overlord):
|
如上, 对于一个 task, 分别判断是否在 pendingTasks 、 runningTasks 或 completeTasks 中, 然后返回相应的状态。
如果都不在, 则通过 addPendingTask(task) 加到 pendingTasks 中.
addPendingTask(task) 中会调用 runPendingTasks() :
private void runPendingTasks() |
如上, 主要工作是通过 tryAssignTask(task, taskRunnerWorkItem) 进行 Task 指派.
private boolean tryAssignTask(final Task task, final RemoteTaskRunnerWorkItem taskRunnerWorkItem) throws Exception |
如上,指派 task 是通过 Zookeeper 来实现的。
- strategy.findWorkerForTask()
strategy 实现是FillCapacityWorkerSelectStrategy
, 主要是对 worker 通过 CurrCapacityUsed 和 Version 进行排序,返回最合适的 worker。
public class FillCapacityWorkerSelectStrategy implements WorkerSelectStrategy |
- 如果有合适的 worker , 会通过 announceTask(task, assignedWorker, taskRunnerWorkItem) 进行任务指派:
private boolean announceTask( |
如上,CuratorUtils.createIfNotExists() 会把 task 写入 Zookeeper,
然后将 task 从 pendingTasks 移除, 并加入到 runningTasks 中,
然后会等待 taskAssignmentTimeout 时间,等待 worker 把 task 变更为 RUNNING 状态,
如果超时,则停止该 task。
最后
至此, Supervisor 创建、启动、下发任务的大概流程已经介绍完,至于 worker 如何执行 task,只能下一篇再介绍。