前面我们已经介绍了 Supervisor 如何指派任务, 那么 Worker 是如何运行任务的呢? 今天就从源码上了解大概的流程.
middleManager 接收 task 指派
前面提到 Supervisor 通过 Zookeeper 指派任务, 熟悉 Zookeeper 的同学估计也已经猜到, middleManager 应该是通过监听 task 指派的目录进行任务接收的.
首先我们看一下 middleManager 启动 task 监视器的地方 :
WorkerTaskMonitor.start()
|
进入其中的 WorkerTaskMonitor.registerRunListener() 可以发现注册监听的代码 :
private void registerRunListener() |
当有新指派的 task 时, 会将 task 封装为 WorkerTaskMonitor.RunNotice 放入 notices 队列, RunNotice 的执行细节如下:
private class RunNotice implements Notice |
如上, RunNotice 执行时首先判断 running 中是否有 task.id, 无则更新 task 的 zk 状态,
然后运行 task , 同时加到 running 内存记录中, 并注册运行状态变更的 StatusNotice .
其中 taskRunner 是 ForkingTaskRunner, run() 方法比较长, 不方便贴出来, 可以 clone 源码来看.
主要的逻辑就是使用 ProcessBuilder 构建一个 java 运行的命令, 然后运行这个 process , 等待其执行结束.
其中, 运行的 command 是 :
java -cp ... io.druid.cli.Main internal peon ..../task.json .../status.json
等待执行结束的代码是 :
final int statusCode = processHolder.process.waitFor(); |
Peon 启动 task 运行
如上, 我们已经启动 Peon 来运行 task, task 已经保存在 task.json 之中.
peon 的运行是在 CliPeon.run() 中.
|
而真正 run task 是在 ExecutorLifecycle.start() :
|
如上, taskRunner.run(task) 开始执行指派的 task, 这里的 taskRunner 为 ThreadPoolTaskRunner
, task 和之前博客保持一致, 为 KafkaIndexTask .
执行 KafkaIndexTask
如上, 由于运行 task 的是 KafkaIndexTask , 下面就进入 KafkaIndexTask.run() 方法:
public TaskStatus run(final TaskToolbox toolbox) throws Exception |
如上, task 主要包括几部分:
- 读取 kafka 消息
- 持久化 segments
- publish segments
读取 kafka 消息
从 task 描述中创建 KafkaConsumer:
final KafkaConsumer<byte[], byte[]> consumer = newConsumer() |
while stillReading , consumer.poll() 进行 kafka 消息读取并转换, 直到读取完 task 指定的 offset 范围.
对于每条 kafka 消息, 将会转换为 InputRow, 然后通过 driver.add() 加到 appenderator 中.
driver 的创建如下 :
FiniteAppenderatorDriver driver = newDriver(appenderator0, toolbox) |
persist()
从 kafka 读取消息并 add 之后, 将会调用 driver.persist() 进行持久化, 实现是 FiniteAppenderatorDriver.persist() :
public Object persist(final Committer committer) throws InterruptedException |
appenderator.persistAll 实现是: AppenderatorImpl.persistAll()
public ListenableFuture<Object> persistAll(final Committer committer) |
如上, persistAll 会对没有进行 persist 的 hydrants 进行持久化操作,
提交一个线程到 persistExecutor , 里边用 AppenderatorImpl#persistHydrant() 来持久化 indexesToPersist 中的 index :
AppenderatorImpl.persistHydrant() :
private int persistHydrant(FireHydrant indexToPersist, SegmentIdentifier identifier) |
如上, indexMerger.persist() 进行 index 持久化, 并返回一个 persistedFile .
indexToPersist.swapSegment() 把 FireHydrant.segment 设置为新的 QueryableIndexSegment .
publish
persist 之后就是 publish
定义 publisher
首先定义 TransactionalSegmentPublisher 进行 publish:
final TransactionalSegmentPublisher publisher = new TransactionalSegmentPublisher() |
如上, 首先将定义一个 SegmentTransactionalInsertAction , 用来插入 segments 信息到 meta storage 中.
然后提交这个 action :
toolbox.getTaskActionClient().submit(action).isSuccess(); |
submit() 的实现是 TaskActionClient.submit(), 如下:
public <RetType> RetType submit(TaskAction<RetType> taskAction) throws IOException |
如上, action 的执行逻辑是在 taskAction.perform(task, toolbox) 之中 ,
其实现是 SegmentTransactionalInsertAction.perform():
public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) throws IOException |
其中, announceHistoricalSegments() 的实现是 IndexerSQLMetadataStorageCoordinator.announceHistoricalSegments() :
public SegmentPublishResult announceHistoricalSegments( |
如上, 通过 updateDataSourceMetadataWithHandle() 进行 dataSource metadata 记录的更新.
之后 announceHistoricalSegent(handle, segment) 进行数据库 segment 记录插入.
最终返回 SegmentPublishResult 结果.
以上只是 publisher 的定义, 真正执行的地方是在后面.
我们回到 KafkaIndexTask.run() 方法 , 定义 publisher 之后, 我们将使用 driver.finish(publisher, )
进行 push 和 publish 操作 .
driver.finish(publisher, ) 执行
finish() 如下:
public SegmentsAndMetadata finish( |
其中 publishAll(publisher, wrapCommitter(committer)) 返回 SegmentsAndMetadata .
publishAll()
publishAll 是 FiniteAppenderatorDriver.publishAll(), 如下:
private SegmentsAndMetadata publishAll( |
如上, 具体看 appenderator.push() 方法.
appenderator.push()
push() 的实现是 AppenderatorImpl.push(), 如下:
public ListenableFuture<SegmentsAndMetadata> push( |
其中, transform() 会执行 AppenderatorImpl.persistAll() , 并把结果转换为 SegmentsAndMetadata :
看一下 transform 定义的 Function 中, 主要操作是 mergeAndPush() , 将进行 segment 合并操作 , 并 push 到 Deep Storage :
DataSegment dataSegment = mergeAndPush(entry.getKey(), entry.getValue()) |
如上, indexMerger.mergeQueryableIndex() 会进行 merge segments 返回 mergedFile,
然后 indexIO.loadIndex(mergedFile) 加载.
最后通过 dataSegmentPusher.push() 把 mergedFile push, 并将 segment 信息写入 descriptorFile .
其中 dataSegmentPusher.push() 是 DataSegmentPusher.push() 接口方法, 它有多个实现 , 比如 HdfsDataSegmentPusher / LocalDataSegmentPusher 等等.
如果是 LocalDataSegmentPusher , 会复制 segment 到指定目录 , 比如以下日志所示:
var/druid/segments/ds-test-01/2018-06-04T12:00:00.000Z_2018-06-04T13:00:00.000Z/2018-06-04T12:38:14.881Z/0
var/druid/indexing-logs/index_kafka_ds-test-01_859074593c222df_jcbcickc.log:729:2018-06-04T12:42:45,399 INFO [appenderator_merge_0] io.druid.segment.loading.LocalDataSegmentPusher - Copying segment[ds-test-01_2018-06-04T12:00:00.000Z_2018-06-04T13:00:00.000Z_2018-06-04T12:38:14.881Z] to local filesystem at location[var/druid/segments/ds-test-01/2018-06-04T12:00:00.000Z_2018-06-04T13:00:00.000Z/2018-06-04T12:38:14.881Z/0]
var/druid/indexing-logs/index_kafka_ds-test-01_09ec8505814100a_cebgdhom.log:582:2018-06-04T13:29:59,694 INFO [appenderator_merge_0] io.druid.segment.loading.LocalDataSegmentPusher - Copying segment[ds-test-01_2018-06-04T12:00:00.000Z_2018-06-04T13:00:00.000Z_2018-06-04T12:38:14.881Z_1] to local filesystem at location[var/druid/segments/ds-test-01/2018-06-04T12:00:00.000Z_2018-06-04T13:00:00.000Z/2018-06-04T12:38:14.881Z/1]
publisher.publishSegments()
push 成功之后, 就可以进行 publish 操作了.
这里进行的 publishSegments() 操作,
就是前面我们定义的 TransactionalSegmentPublisher , 这里不再赘述.
handoffNotifier
之后就是对 segmentsAndMetadata.getSegments() 进行 handoffNotifier.registerSegmentHandoffCallback(),
即是注册一个 handOffCallback , 当 handoff 完成时, 将会执行回调.
方法实现是 CoordinatorBasedSegmentHandoffNotifier#registerSegmentHandoffCallback ,
callback 被保存在 CoordinatorBasedSegmentHandoffNotifier.handOffCallbacks,
被使用的地方是 checkForSegmentHandoffs() , 调用栈是:
CoordinatorBasedSegmentHandoffNotifier.checkForSegmentHandoffs() (io.druid.segment.realtime.plumber) |
也就是前面 KafkaIndexTask.run() 中, 通过 driver.startJob() 启动的 ScheduledExecutorService 任务,
定期对 CoordinatorBasedSegmentHandoffNotifier.handOffCallbacks 进行检查是否完成和是否需要调用对应的 callback .
callback 的内容是 appenderator.drop(identifier).
checkForSegmentHandoffs() :
void checkForSegmentHandoffs() |
如上, coordinatorClient.fetchServerView() 是通过对 coordinator 的 http 请求来判断 loadedSegments 的 handoff 状态.
返回 TaskStatus
最后 Task.run() 会返回任务状态, 再回到前面,
作为 taskRunner.run(task) 返回的状态, 将会被写入 statusFile,
如此便结束了一个 peon 的 task 运行.
最后
以上是 middleManager 接收和运行任务的概要流程, 由于篇幅优先,
一些重要的细节 (如 segment 持久化 、 coordinator 管理 segment 、实时查询的实现等等) 将在后续博客中再介绍.