Druid | Kafka Indexing Service - 2 - How the Worker Runs Tasks

In the previous post we looked at how the Supervisor assigns tasks. So how does a Worker actually run them? This post walks through the rough flow at the source-code level.

How the middleManager receives task assignments


As mentioned before, the Supervisor assigns tasks through ZooKeeper. If you are familiar with ZooKeeper, you have probably guessed it already: the middleManager receives tasks by watching the ZooKeeper path where assignments are written.
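
For readers less familiar with Curator, the watch pattern in isolation looks roughly like the sketch below. It is only an illustration: the connect string, the assignment path, and the payload handling are placeholders, not Druid's actual configuration.

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class TaskAssignmentWatcher
{
  public static void main(String[] args) throws Exception
  {
    CuratorFramework client = CuratorFrameworkFactory.newClient(
        "localhost:2181", new ExponentialBackoffRetry(1000, 3)
    );
    client.start();

    // Watch a (hypothetical) per-worker assignment path; cacheData=true keeps znode payloads in memory.
    PathChildrenCache cache = new PathChildrenCache(client, "/druid/indexer/tasks/worker-host", true);
    cache.getListenable().addListener(
        new PathChildrenCacheListener()
        {
          @Override
          public void childEvent(CuratorFramework cf, PathChildrenCacheEvent event) throws Exception
          {
            if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
              // In Druid, the znode payload would be deserialized into a Task and queued as a RunNotice.
              System.out.println("New task assignment at " + event.getData().getPath());
            }
          }
        }
    );
    cache.start();

    Thread.sleep(Long.MAX_VALUE); // keep watching
  }
}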

First, let's look at where the middleManager starts its task monitor:

WorkerTaskMonitor.start()

@LifecycleStart
public void start() throws Exception
{
synchronized (lifecycleLock) {
Preconditions.checkState(!started, "already started");
Preconditions.checkState(!exec.isShutdown(), "already stopped");
started = true;

try {
restoreRestorableTasks();
cleanupStaleAnnouncements();
registerRunListener();
registerLocationListener();
pathChildrenCache.start();
exec.submit(
new Runnable()
{
@Override
public void run()
{
mainLoop();
}
}
);

log.info("Started WorkerTaskMonitor.");
started = true;
}
catch (InterruptedException e) {
throw e;
}
catch (Exception e) {
log.makeAlert(e, "Exception starting WorkerTaskMonitor")
.emit();
throw e;
}
}
}

Stepping into WorkerTaskMonitor.registerRunListener(), we can see the code that registers the listener:

private void registerRunListener()
{
pathChildrenCache.getListenable().addListener(
new PathChildrenCacheListener()
{
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent)
throws Exception
{
if (pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
final Task task = jsonMapper.readValue(
cf.getData().forPath(pathChildrenCacheEvent.getData().getPath()),
Task.class
);

notices.add(new RunNotice(task));
}
}
}
);
}

When a new task is assigned, it is wrapped into a WorkerTaskMonitor.RunNotice and added to the notices queue. A RunNotice is handled as follows:

private class RunNotice implements Notice
{
private final Task task;

public RunNotice(Task task)
{
this.task = task;
}

@Override
public String getTaskId()
{
return task.getId();
}

@Override
public void handle() throws Exception
{
if (running.containsKey(task.getId())) {
log.warn(
"Got run notice for task [%s] that I am already running...",
task.getId()
);
workerCuratorCoordinator.removeTaskRunZnode(task.getId());
return;
}

log.info("Submitting runnable for task[%s]", task.getId());

workerCuratorCoordinator.updateTaskStatusAnnouncement(
TaskAnnouncement.create(
task,
TaskStatus.running(task.getId()),
TaskLocation.unknown()
)
);

log.info("Affirmative. Running task [%s]", task.getId());
workerCuratorCoordinator.removeTaskRunZnode(task.getId());
final ListenableFuture<TaskStatus> future = taskRunner.run(task);
addRunningTask(task, future);
}
}

As shown above, when a RunNotice is handled it first checks whether the running map already contains the task id. If not, it updates the task's status announcement in ZooKeeper, runs the task, records it in the in-memory running map, and registers a StatusNotice for status changes.
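
The notices queue itself is drained on a single thread by the mainLoop() submitted in start() above. A minimal sketch of that take-and-handle pattern (a simplification for illustration, not Druid's exact implementation) could look like this:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class NoticeLoop
{
  interface Notice
  {
    String getTaskId();
    void handle() throws Exception;
  }

  private final BlockingQueue<Notice> notices = new LinkedBlockingQueue<>();

  public void submit(Notice notice)
  {
    notices.add(notice);
  }

  public void mainLoop()
  {
    try {
      while (!Thread.currentThread().isInterrupted()) {
        final Notice notice = notices.take(); // blocks until a notice arrives
        try {
          notice.handle();
        }
        catch (Exception e) {
          System.err.println("Failed to handle notice for task " + notice.getTaskId() + ": " + e);
        }
      }
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // exit quietly on shutdown
    }
  }
}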

Here taskRunner is a ForkingTaskRunner. Its run() method is fairly long and is not pasted here; clone the source if you want the details.

Its main job is to assemble a java command line with ProcessBuilder, start the process, and wait for it to finish.

The command that gets run looks like this:

java -cp ... io.druid.cli.Main internal peon ..../task.json .../status.json

and the code that waits for it to finish is:

final int statusCode = processHolder.process.waitFor();
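
The fork-and-wait pattern itself is plain java.lang.ProcessBuilder. A minimal sketch is shown below; the classpath, working directory, and file paths are placeholders, not the values ForkingTaskRunner actually computes.

import java.io.File;
import java.util.Arrays;
import java.util.List;

public class ForkAndWait
{
  public static void main(String[] args) throws Exception
  {
    // Placeholder command; ForkingTaskRunner assembles the real classpath, JVM options and task paths itself.
    List<String> command = Arrays.asList(
        "java",
        "-cp", "lib/*",
        "io.druid.cli.Main", "internal", "peon",
        "var/task/task.json", "var/task/status.json"
    );

    Process process = new ProcessBuilder(command)
        .directory(new File("var/task"))            // working directory for the peon
        .redirectErrorStream(true)                  // merge stderr into stdout
        .redirectOutput(new File("var/task/log"))   // capture the peon's output in a log file
        .start();

    // Block until the peon exits; the status file (written by the peon) says how the task ended.
    int statusCode = process.waitFor();
    System.out.println("Peon exited with code " + statusCode);
  }
}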

The Peon starts running the task


At this point a Peon has been launched to run the task, and the task definition has been written to task.json.

The peon's entry point is CliPeon.run().

@Override
public void run()
{
try {
Injector injector = makeInjector();
try {
final Lifecycle lifecycle = initLifecycle(injector);
final Thread hook = new Thread(
new Runnable()
{
@Override
public void run()
{
log.info("Running shutdown hook");
lifecycle.stop();
}
}
);
Runtime.getRuntime().addShutdownHook(hook);
injector.getInstance(ExecutorLifecycle.class).join();

// Sanity check to help debug unexpected non-daemon threads
final Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
for (Thread thread : threadSet) {
if (!thread.isDaemon() && thread != Thread.currentThread()) {
log.info("Thread [%s] is non daemon.", thread);
}
}

// Explicitly call lifecycle stop, dont rely on shutdown hook.
lifecycle.stop();
Runtime.getRuntime().removeShutdownHook(hook);
}
catch (Throwable t) {
log.error(t, "Error when starting up. Failing.");
System.exit(1);
}
log.info("Finished peon task");
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}

The task is actually started in ExecutorLifecycle.start():

@LifecycleStart
public void start() throws InterruptedException
{
final File taskFile = Preconditions.checkNotNull(taskExecutorConfig.getTaskFile(), "taskFile");
final File statusFile = Preconditions.checkNotNull(taskExecutorConfig.getStatusFile(), "statusFile");
final InputStream parentStream = Preconditions.checkNotNull(taskExecutorConfig.getParentStream(), "parentStream");

try {
task = jsonMapper.readValue(taskFile, Task.class);

log.info(
"Running with task: %s",
jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(task)
);
}
catch (IOException e) {
throw Throwables.propagate(e);
}

// Avoid running the same task twice on the same machine by locking the task base directory.

final File taskLockFile = taskConfig.getTaskLockFile(task.getId());

try {
synchronized (this) {
if (taskLockChannel == null && taskLockFileLock == null) {
taskLockChannel = FileChannel.open(
taskLockFile.toPath(),
StandardOpenOption.CREATE,
StandardOpenOption.WRITE
);

log.info("Attempting to lock file[%s].", taskLockFile);
final long startLocking = System.currentTimeMillis();
final long timeout = new DateTime(startLocking).plus(taskConfig.getDirectoryLockTimeout()).getMillis();
while (taskLockFileLock == null && System.currentTimeMillis() < timeout) {
taskLockFileLock = taskLockChannel.tryLock();
if (taskLockFileLock == null) {
Thread.sleep(100);
}
}

if (taskLockFileLock == null) {
throw new ISE("Could not acquire lock file[%s] within %,dms.", taskLockFile, timeout - startLocking);
} else {
log.info("Acquired lock file[%s] in %,dms.", taskLockFile, System.currentTimeMillis() - startLocking);
}
} else {
throw new ISE("Already started!");
}
}
}
catch (IOException e) {
throw Throwables.propagate(e);
}

// Spawn monitor thread to keep a watch on parent's stdin
// If stdin reaches eof, the parent is gone, and we should shut down
parentMonitorExec.submit(
new Runnable()
{
@Override
public void run()
{
try {
while (parentStream.read() != -1) {
// Toss the byte
}
}
catch (Exception e) {
log.error(e, "Failed to read from stdin");
}

// Kind of gross, but best way to kill the JVM as far as I know
log.info("Triggering JVM shutdown.");
System.exit(2);
}
}
);

// Won't hurt in remote mode, and is required for setting up locks in local mode:
try {
if (!task.isReady(taskActionClientFactory.create(task))) {
throw new ISE("Task is not ready to run yet!", task.getId());
}
}
catch (Exception e) {
throw new ISE(e, "Failed to run isReady", task.getId());
}

statusFuture = Futures.transform(
taskRunner.run(task),
new Function<TaskStatus, TaskStatus>()
{
@Override
public TaskStatus apply(TaskStatus taskStatus)
{
try {
log.info(
"Task completed with status: %s",
jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(taskStatus)
);

final File statusFileParent = statusFile.getParentFile();
if (statusFileParent != null) {
statusFileParent.mkdirs();
}
jsonMapper.writeValue(statusFile, taskStatus);

return taskStatus;
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
);
}

As shown above, taskRunner.run(task) starts executing the assigned task. Here taskRunner is a ThreadPoolTaskRunner, and the task, as in the previous posts, is a KafkaIndexTask.

Running the KafkaIndexTask


Since the task being run is a KafkaIndexTask, let's step into KafkaIndexTask.run():

public TaskStatus run(final TaskToolbox toolbox) throws Exception
{
log.info("Starting up!");
startTime = DateTime.now();
mapper = toolbox.getObjectMapper();
status = Status.STARTING;

if (chatHandlerProvider.isPresent()) {
log.info("Found chat handler of class[%s]", chatHandlerProvider.get().getClass().getName());
chatHandlerProvider.get().register(getId(), this);
} else {
log.warn("No chat handler detected");
}

runThread = Thread.currentThread();

// Set up FireDepartmentMetrics
final FireDepartment fireDepartmentForMetrics = new FireDepartment(
dataSchema,
new RealtimeIOConfig(null, null, null),
null
);
fireDepartmentMetrics = fireDepartmentForMetrics.getMetrics();
toolbox.getMonitorScheduler().addMonitor(
new RealtimeMetricsMonitor(
ImmutableList.of(fireDepartmentForMetrics),
ImmutableMap.of(DruidMetrics.TASK_ID, new String[]{getId()})
)
);

try (
final Appenderator appenderator0 = newAppenderator(fireDepartmentMetrics, toolbox);
final FiniteAppenderatorDriver driver = newDriver(appenderator0, toolbox);
final KafkaConsumer<byte[], byte[]> consumer = newConsumer()
) {
appenderator = appenderator0;

final String topic = ioConfig.getStartPartitions().getTopic();

// Start up, set up initial offsets.
final Object restoredMetadata = driver.startJob();
if (restoredMetadata == null) {
nextOffsets.putAll(ioConfig.getStartPartitions().getPartitionOffsetMap());
} else {
final Map<String, Object> restoredMetadataMap = (Map) restoredMetadata;
final KafkaPartitions restoredNextPartitions = toolbox.getObjectMapper().convertValue(
restoredMetadataMap.get(METADATA_NEXT_PARTITIONS),
KafkaPartitions.class
);
nextOffsets.putAll(restoredNextPartitions.getPartitionOffsetMap());

// Sanity checks.
if (!restoredNextPartitions.getTopic().equals(ioConfig.getStartPartitions().getTopic())) {
throw new ISE(
"WTF?! Restored topic[%s] but expected topic[%s]",
restoredNextPartitions.getTopic(),
ioConfig.getStartPartitions().getTopic()
);
}

if (!nextOffsets.keySet().equals(ioConfig.getStartPartitions().getPartitionOffsetMap().keySet())) {
throw new ISE(
"WTF?! Restored partitions[%s] but expected partitions[%s]",
nextOffsets.keySet(),
ioConfig.getStartPartitions().getPartitionOffsetMap().keySet()
);
}
}

// Set up sequenceNames.
final Map<Integer, String> sequenceNames = Maps.newHashMap();
for (Integer partitionNum : nextOffsets.keySet()) {
sequenceNames.put(partitionNum, String.format("%s_%s", ioConfig.getBaseSequenceName(), partitionNum));
}

// Set up committer.
final Supplier<Committer> committerSupplier = new Supplier<Committer>()
{
@Override
public Committer get()
{
final Map<Integer, Long> snapshot = ImmutableMap.copyOf(nextOffsets);

return new Committer()
{
@Override
public Object getMetadata()
{
return ImmutableMap.of(
METADATA_NEXT_PARTITIONS, new KafkaPartitions(
ioConfig.getStartPartitions().getTopic(),
snapshot
)
);
}

@Override
public void run()
{
// Do nothing.
}
};
}
};

Set<Integer> assignment = assignPartitionsAndSeekToNext(consumer, topic);

// Main loop.
// Could eventually support leader/follower mode (for keeping replicas more in sync)
boolean stillReading = !assignment.isEmpty();
try {
while (stillReading) {
if (possiblyPause(assignment)) {
// The partition assignments may have changed while paused by a call to setEndOffsets() so reassign
// partitions upon resuming. This is safe even if the end offsets have not been modified.
assignment = assignPartitionsAndSeekToNext(consumer, topic);

if (assignment.isEmpty()) {
log.info("All partitions have been fully read");
publishOnStop = true;
stopRequested = true;
}
}

if (stopRequested) {
break;
}

// The retrying business is because the KafkaConsumer throws OffsetOutOfRangeException if the seeked-to
// offset is not present in the topic-partition. This can happen if we're asking a task to read from data
// that has not been written yet (which is totally legitimate). So let's wait for it to show up.
final ConsumerRecords<byte[], byte[]> records = RetryUtils.retry(
new Callable<ConsumerRecords<byte[], byte[]>>()
{
@Override
public ConsumerRecords<byte[], byte[]> call() throws Exception
{
try {
return consumer.poll(POLL_TIMEOUT);
}
finally {
status = Status.READING;
}
}
},
new Predicate<Throwable>()
{
@Override
public boolean apply(Throwable input)
{
return input instanceof OffsetOutOfRangeException;
}
},
Integer.MAX_VALUE
);

for (ConsumerRecord<byte[], byte[]> record : records) {
if (log.isTraceEnabled()) {
log.trace(
"Got topic[%s] partition[%d] offset[%,d].",
record.topic(),
record.partition(),
record.offset()
);
}

if (record.offset() < endOffsets.get(record.partition())) {
if (record.offset() != nextOffsets.get(record.partition())) {
throw new ISE(
"WTF?! Got offset[%,d] after offset[%,d] in partition[%d].",
record.offset(),
nextOffsets.get(record.partition()),
record.partition()
);
}

try {
final InputRow row = Preconditions.checkNotNull(parser.parse(ByteBuffer.wrap(record.value())), "row");

if (!ioConfig.getMinimumMessageTime().isPresent() ||
!ioConfig.getMinimumMessageTime().get().isAfter(row.getTimestamp())) {

final SegmentIdentifier identifier = driver.add(
row,
sequenceNames.get(record.partition()),
committerSupplier
);

if (identifier == null) {
// Failure to allocate segment puts determinism at risk, bail out to be safe.
// May want configurable behavior here at some point.
// If we allow continuing, then consider blacklisting the interval for a while to avoid constant checks.
throw new ISE("Could not allocate segment for row with timestamp[%s]", row.getTimestamp());
}

fireDepartmentMetrics.incrementProcessed();
} else {
fireDepartmentMetrics.incrementThrownAway();
}
}
catch (ParseException e) {
if (tuningConfig.isReportParseExceptions()) {
throw e;
} else {
log.debug(
e,
"Dropping unparseable row from partition[%d] offset[%,d].",
record.partition(),
record.offset()
);

fireDepartmentMetrics.incrementUnparseable();
}
}

nextOffsets.put(record.partition(), record.offset() + 1);
}

if (nextOffsets.get(record.partition()).equals(endOffsets.get(record.partition()))
&& assignment.remove(record.partition())) {
log.info("Finished reading topic[%s], partition[%,d].", record.topic(), record.partition());
assignPartitions(consumer, topic, assignment);
stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty();
}
}
}
}
finally {
driver.persist(committerSupplier.get()); // persist pending data
}

if (stopRequested && !publishOnStop) {
throw new InterruptedException("Stopping without publishing");
}

status = Status.PUBLISHING;
final TransactionalSegmentPublisher publisher = new TransactionalSegmentPublisher()
{
@Override
public boolean publishSegments(Set<DataSegment> segments, Object commitMetadata) throws IOException
{
final KafkaPartitions finalPartitions = toolbox.getObjectMapper().convertValue(
((Map) commitMetadata).get(METADATA_NEXT_PARTITIONS),
KafkaPartitions.class
);

// Sanity check, we should only be publishing things that match our desired end state.
if (!endOffsets.equals(finalPartitions.getPartitionOffsetMap())) {
throw new ISE("WTF?! Driver attempted to publish invalid metadata[%s].", commitMetadata);
}

final SegmentTransactionalInsertAction action;

if (ioConfig.isUseTransaction()) {
action = new SegmentTransactionalInsertAction(
segments,
new KafkaDataSourceMetadata(ioConfig.getStartPartitions()),
new KafkaDataSourceMetadata(finalPartitions)
);
} else {
action = new SegmentTransactionalInsertAction(segments, null, null);
}

log.info("Publishing with isTransaction[%s].", ioConfig.isUseTransaction());

return toolbox.getTaskActionClient().submit(action).isSuccess();
}
};

final SegmentsAndMetadata published = driver.finish(publisher, committerSupplier.get());
if (published == null) {
throw new ISE("Transaction failure publishing segments, aborting");
} else {
log.info(
"Published segments[%s] with metadata[%s].",
Joiner.on(", ").join(
Iterables.transform(
published.getSegments(),
new Function<DataSegment, String>()
{
@Override
public String apply(DataSegment input)
{
return input.getIdentifier();
}
}
)
),
published.getCommitMetadata()
);
}
}
catch (InterruptedException e) {
// if we were interrupted because we were asked to stop, handle the exception and return success, else rethrow
if (!stopRequested) {
Thread.currentThread().interrupt();
throw e;
}

log.info("The task was asked to stop before completing");
}

return success();
}

As shown above, the task mainly consists of a few parts:

  • reading Kafka messages
  • persisting segments
  • publishing segments

Reading Kafka messages


A KafkaConsumer is created from the task definition:

final KafkaConsumer<byte[], byte[]> consumer = newConsumer()

While stillReading is true, consumer.poll() keeps reading and converting Kafka messages until the offset range specified by the task has been consumed. Each Kafka record is parsed into an InputRow and added to the appenderator via driver.add().
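
Stripped of the Druid plumbing, the consumption pattern is: assign the partitions explicitly, seek to the start offsets, and poll until the end offsets are reached. A standalone sketch of that pattern is below; the topic, broker, and offsets are placeholders, and the real task obtains them from its ioConfig.

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class BoundedPartitionReader
{
  public static void main(String[] args)
  {
    // Placeholder topic, broker and offsets; KafkaIndexTask gets the real values from its ioConfig.
    final TopicPartition partition = new TopicPartition("my-topic", 0);
    final long startOffset = 100L;
    final long endOffset = 200L;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("enable.auto.commit", "false"); // offsets are tracked by the task, not committed to Kafka
    props.put("key.deserializer", ByteArrayDeserializer.class.getName());
    props.put("value.deserializer", ByteArrayDeserializer.class.getName());

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
      consumer.assign(Arrays.asList(partition)); // explicit assignment, no consumer-group rebalancing
      consumer.seek(partition, startOffset);

      long nextOffset = startOffset;
      while (nextOffset < endOffset) {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
        for (ConsumerRecord<byte[], byte[]> record : records) {
          if (record.offset() >= endOffset) {
            break; // past the requested range
          }
          // In the task, record.value() would be parsed into an InputRow and handed to driver.add().
          nextOffset = record.offset() + 1;
        }
      }
    }
  }
}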

The driver is created as follows:

  FiniteAppenderatorDriver driver = newDriver(appenderator0, toolbox)

...

private FiniteAppenderatorDriver newDriver(
final Appenderator appenderator,
final TaskToolbox toolbox
)
{
return new FiniteAppenderatorDriver(
appenderator,
new ActionBasedSegmentAllocator(toolbox.getTaskActionClient(), dataSchema),
toolbox.getSegmentHandoffNotifierFactory(),
new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient()),
toolbox.getObjectMapper(),
tuningConfig.getMaxRowsPerSegment(),
tuningConfig.getHandoffConditionTimeout()
);
}

persist()


After the Kafka messages have been read and added, driver.persist() is called to persist the pending data. The implementation is FiniteAppenderatorDriver.persist():

public Object persist(final Committer committer) throws InterruptedException
{
try {
log.info("Persisting data.");
final long start = System.currentTimeMillis();
final Object commitMetadata = appenderator.persistAll(wrapCommitter(committer)).get();
log.info("Persisted pending data in %,dms.", System.currentTimeMillis() - start);
return commitMetadata;
}
catch (InterruptedException e) {
throw e;
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}

appenderator.persistAll() is implemented by AppenderatorImpl.persistAll():

public ListenableFuture<Object> persistAll(final Committer committer)
{
// Submit persistAll task to the persistExecutor

final Map<SegmentIdentifier, Integer> commitHydrants = Maps.newHashMap();
final List<Pair<FireHydrant, SegmentIdentifier>> indexesToPersist = Lists.newArrayList();
final Set<SegmentIdentifier> identifiers = sinks.keySet();
for (SegmentIdentifier identifier : identifiers) {
final Sink sink = sinks.get(identifier);
final List<FireHydrant> hydrants = Lists.newArrayList(sink);
commitHydrants.put(identifier, hydrants.size());

final int limit = sink.isWritable() ? hydrants.size() - 1 : hydrants.size();

for (FireHydrant hydrant : hydrants.subList(0, limit)) {
if (!hydrant.hasSwapped()) {
log.info("Hydrant[%s] hasn't persisted yet, persisting. Segment[%s]", hydrant, identifier);
indexesToPersist.add(Pair.of(hydrant, identifier));
}
}

if (sink.swappable()) {
indexesToPersist.add(Pair.of(sink.swap(), identifier));
}
}

log.info("Submitting persist runnable for dataSource[%s]", schema.getDataSource());

final String threadName = String.format("%s-incremental-persist", schema.getDataSource());
final Object commitMetadata = committer.getMetadata();
final Stopwatch runExecStopwatch = Stopwatch.createStarted();
final Stopwatch persistStopwatch = Stopwatch.createStarted();
final ListenableFuture<Object> future = persistExecutor.submit(
new ThreadRenamingCallable<Object>(threadName)
{
@Override
public Object doCall()
{
try {
for (Pair<FireHydrant, SegmentIdentifier> pair : indexesToPersist) {
metrics.incrementRowOutputCount(persistHydrant(pair.lhs, pair.rhs));
}

log.info(
"Committing metadata[%s] for sinks[%s].", commitMetadata, Joiner.on(", ").join(
Iterables.transform(
commitHydrants.entrySet(),
new Function<Map.Entry<SegmentIdentifier, Integer>, String>()
{
@Override
public String apply(Map.Entry<SegmentIdentifier, Integer> entry)
{
return String.format("%s:%d", entry.getKey().getIdentifierAsString(), entry.getValue());
}
}
)
)
);

committer.run();
objectMapper.writeValue(computeCommitFile(), Committed.create(commitHydrants, commitMetadata));

return commitMetadata;
}
catch (Exception e) {
metrics.incrementFailedPersists();
throw Throwables.propagate(e);
}
finally {
metrics.incrementNumPersists();
metrics.incrementPersistTimeMillis(persistStopwatch.elapsed(TimeUnit.MILLISECONDS));
persistStopwatch.stop();
}
}
}
);

final long startDelay = runExecStopwatch.elapsed(TimeUnit.MILLISECONDS);
metrics.incrementPersistBackPressureMillis(startDelay);
if (startDelay > WARN_DELAY) {
log.warn("Ingestion was throttled for [%,d] millis because persists were pending.", startDelay);
}
runExecStopwatch.stop();
resetNextFlush();

return future;
}

As shown above, persistAll persists the hydrants that have not been persisted yet: it submits a runnable to persistExecutor, which uses AppenderatorImpl#persistHydrant() to persist each index in indexesToPersist:

AppenderatorImpl.persistHydrant() :

private int persistHydrant(FireHydrant indexToPersist, SegmentIdentifier identifier)
{
synchronized (indexToPersist) {
if (indexToPersist.hasSwapped()) {
log.info(
"Segment[%s], Hydrant[%s] already swapped. Ignoring request to persist.",
identifier, indexToPersist
);
return 0;
}

log.info("Segment[%s], persisting Hydrant[%s]", identifier, indexToPersist);

try {
int numRows = indexToPersist.getIndex().size();

final File persistedFile;
final File persistDir = createPersistDirIfNeeded(identifier);
final IndexSpec indexSpec = tuningConfig.getIndexSpec();
persistedFile = indexMerger.persist(
indexToPersist.getIndex(),
identifier.getInterval(),
new File(persistDir, String.valueOf(indexToPersist.getCount())),
indexSpec
);

indexToPersist.swapSegment(
new QueryableIndexSegment(
indexToPersist.getSegment().getIdentifier(),
indexIO.loadIndex(persistedFile)
)
);
return numRows;
}
catch (IOException e) {
log.makeAlert("dataSource[%s] -- incremental persist failed", schema.getDataSource())
.addData("segment", identifier.getIdentifierAsString())
.addData("count", indexToPersist.getCount())
.emit();

throw Throwables.propagate(e);
}
}
}

As shown above, indexMerger.persist() persists the index and returns a persistedFile, and indexToPersist.swapSegment() then replaces FireHydrant.segment with a new QueryableIndexSegment backed by that file.
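
Note that persistAll() returns a ListenableFuture because the actual work is handed off to persistExecutor. In isolation, the Guava pattern it relies on looks roughly like the sketch below (the executor and method names are illustrative, not Druid's actual classes):

import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

public class AsyncPersistSketch
{
  // Plays the role of persistExecutor: a single thread doing the disk work in the background.
  private final ListeningExecutorService persistExecutor =
      MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());

  public ListenableFuture<Object> persistAll(final Object commitMetadata)
  {
    // The caller gets a future back immediately; the actual persisting runs on the executor thread.
    return persistExecutor.submit(
        new Callable<Object>()
        {
          @Override
          public Object call()
          {
            // ... write the in-memory indexes to disk here ...
            return commitMetadata; // hand the commit metadata back to the caller
          }
        }
    );
  }

  public void example()
  {
    Futures.addCallback(
        persistAll("nextPartitionOffsets"),
        new FutureCallback<Object>()
        {
          @Override
          public void onSuccess(Object metadata)
          {
            System.out.println("Persisted; committed metadata: " + metadata);
          }

          @Override
          public void onFailure(Throwable t)
          {
            t.printStackTrace();
          }
        }
    );
  }
}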

publish


After persist comes publish.

Defining the publisher

First, a TransactionalSegmentPublisher is defined to do the publishing:

final TransactionalSegmentPublisher publisher = new TransactionalSegmentPublisher()
{
@Override
public boolean publishSegments(Set<DataSegment> segments, Object commitMetadata) throws IOException
{
final KafkaPartitions finalPartitions = toolbox.getObjectMapper().convertValue(
((Map) commitMetadata).get(METADATA_NEXT_PARTITIONS),
KafkaPartitions.class
);

// Sanity check, we should only be publishing things that match our desired end state.
if (!endOffsets.equals(finalPartitions.getPartitionOffsetMap())) {
throw new ISE("WTF?! Driver attempted to publish invalid metadata[%s].", commitMetadata);
}

final SegmentTransactionalInsertAction action;

if (ioConfig.isUseTransaction()) {
action = new SegmentTransactionalInsertAction(
segments,
new KafkaDataSourceMetadata(ioConfig.getStartPartitions()),
new KafkaDataSourceMetadata(finalPartitions)
);
} else {
action = new SegmentTransactionalInsertAction(segments, null, null);
}

log.info("Publishing with isTransaction[%s].", ioConfig.isUseTransaction());

return toolbox.getTaskActionClient().submit(action).isSuccess();
}
};

As shown above, a SegmentTransactionalInsertAction is constructed; it inserts the segment records into the metadata storage.

The action is then submitted:

toolbox.getTaskActionClient().submit(action).isSuccess();

The implementation of submit() is TaskActionClient.submit(), shown below:

public <RetType> RetType submit(TaskAction<RetType> taskAction) throws IOException
{
log.info("Performing action for task[%s]: %s", task.getId(), taskAction);

if (taskAction.isAudited()) {
// Add audit log
try {
storage.addAuditLog(task, taskAction);
}
catch (Exception e) {
final String actionClass = taskAction.getClass().getName();
log.makeAlert(e, "Failed to record action in audit log")
.addData("task", task.getId())
.addData("actionClass", actionClass)
.emit();
throw new ISE(e, "Failed to record action [%s] in audit log", actionClass);
}
}

return taskAction.perform(task, toolbox);
}

As shown above, the action's actual logic lives in taskAction.perform(task, toolbox), which here is SegmentTransactionalInsertAction.perform():

public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox) throws IOException
{
toolbox.verifyTaskLocks(task, segments);

final SegmentPublishResult retVal = toolbox.getIndexerMetadataStorageCoordinator().announceHistoricalSegments(
segments,
startMetadata,
endMetadata
);

// Emit metrics
final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder()
.setDimension(DruidMetrics.DATASOURCE, task.getDataSource())
.setDimension(DruidMetrics.TASK_TYPE, task.getType());

if (retVal.isSuccess()) {
toolbox.getEmitter().emit(metricBuilder.build("segment/txn/success", 1));
} else {
toolbox.getEmitter().emit(metricBuilder.build("segment/txn/failure", 1));
}

for (DataSegment segment : retVal.getSegments()) {
metricBuilder.setDimension(DruidMetrics.INTERVAL, segment.getInterval().toString());
toolbox.getEmitter().emit(metricBuilder.build("segment/added/bytes", segment.getSize()));
}

return retVal;
}

announceHistoricalSegments() is implemented by IndexerSQLMetadataStorageCoordinator.announceHistoricalSegments():

public SegmentPublishResult announceHistoricalSegments(
final Set<DataSegment> segments,
final DataSourceMetadata startMetadata,
final DataSourceMetadata endMetadata
) throws IOException
{
if (segments.isEmpty()) {
throw new IllegalArgumentException("segment set must not be empty");
}

final String dataSource = segments.iterator().next().getDataSource();
for (DataSegment segment : segments) {
if (!dataSource.equals(segment.getDataSource())) {
throw new IllegalArgumentException("segments must all be from the same dataSource");
}
}

if ((startMetadata == null && endMetadata != null) || (startMetadata != null && endMetadata == null)) {
throw new IllegalArgumentException("start/end metadata pair must be either null or non-null");
}

final AtomicBoolean txnFailure = new AtomicBoolean(false);

try {
return connector.retryTransaction(
new TransactionCallback<SegmentPublishResult>()
{
@Override
public SegmentPublishResult inTransaction(
final Handle handle,
final TransactionStatus transactionStatus
) throws Exception
{
final Set<DataSegment> inserted = Sets.newHashSet();

if (startMetadata != null) {
final boolean success = updateDataSourceMetadataWithHandle(
handle,
dataSource,
startMetadata,
endMetadata
);

if (!success) {
transactionStatus.setRollbackOnly();
txnFailure.set(true);
throw new RuntimeException("Aborting transaction!");
}
}

for (final DataSegment segment : segments) {
if (announceHistoricalSegment(handle, segment)) {
inserted.add(segment);
}
}

return new SegmentPublishResult(ImmutableSet.copyOf(inserted), true);
}
},
3,
SQLMetadataConnector.DEFAULT_MAX_TRIES
);
}
catch (CallbackFailedException e) {
if (txnFailure.get()) {
return new SegmentPublishResult(ImmutableSet.<DataSegment>of(), false);
} else {
throw e;
}
}
}

As shown above, updateDataSourceMetadataWithHandle() updates the dataSource metadata record, then announceHistoricalSegment(handle, segment) inserts the segment records into the database, and finally a SegmentPublishResult is returned.
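
The exactly-once guarantee hinges on updateDataSourceMetadataWithHandle() behaving like a compare-and-swap: the offsets currently stored for the dataSource must match the start offsets the task expects before the end offsets are written, and this happens in the same transaction as the segment inserts (otherwise the transaction is rolled back, as seen above). A toy in-memory sketch of that idea, not the actual SQL/JDBI code, might look like this:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class MetadataCompareAndSwap
{
  // Stands in for the dataSource metadata table: dataSource -> committed Kafka offsets per partition.
  private final Map<String, Map<Integer, Long>> committedOffsets = new ConcurrentHashMap<>();

  // Returns true only if the currently committed offsets are compatible with the expected start
  // offsets; in Druid this check and the segment inserts happen inside one metadata-store transaction.
  public synchronized boolean publish(
      String dataSource,
      Map<Integer, Long> expectedStart,
      Map<Integer, Long> end
  )
  {
    Map<Integer, Long> current = committedOffsets.get(dataSource);
    // A missing entry means "never published before", which any start offsets may follow.
    if (current != null && !current.equals(expectedStart)) {
      // Someone else (e.g. a replica task) already advanced the offsets: abort, do not publish twice.
      return false;
    }
    committedOffsets.put(dataSource, end);
    // ... the segment records would be inserted here, in the same transaction ...
    return true;
  }
}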

The above is only the definition of the publisher; it is actually invoked later on.

Back in KafkaIndexTask.run(), after the publisher has been defined, driver.finish(publisher, committerSupplier.get()) performs the push and publish steps.

Executing driver.finish()

finish() looks like this:

public SegmentsAndMetadata finish(
final TransactionalSegmentPublisher publisher,
final Committer committer
) throws InterruptedException
{
final SegmentsAndMetadata segmentsAndMetadata = publishAll(publisher, wrapCommitter(committer));

if (segmentsAndMetadata != null) {
final long giveUpAt = handoffConditionTimeout > 0
? System.currentTimeMillis() + handoffConditionTimeout
: 0;

log.info("Awaiting handoff of segments: [%s]", Joiner.on(", ").join(appenderator.getSegments()));

synchronized (handoffMonitor) {
while (!appenderator.getSegments().isEmpty()) {

if (giveUpAt == 0) {
handoffMonitor.wait();
} else {
final long remaining = giveUpAt - System.currentTimeMillis();
if (remaining > 0) {
handoffMonitor.wait(remaining);
} else {
throw new ISE(
"Segment handoff wait timeout. Segments not yet handed off: [%s]",
Joiner.on(", ").join(appenderator.getSegments())
);
}
}
}
}

log.info("All segments handed off.");

return new SegmentsAndMetadata(
segmentsAndMetadata.getSegments(),
((FiniteAppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata()).getCallerMetadata()
);
} else {
return null;
}
}

Here publishAll(publisher, wrapCommitter(committer)) returns a SegmentsAndMetadata.

publishAll()

publishAll is FiniteAppenderatorDriver.publishAll(), shown below:

private SegmentsAndMetadata publishAll(
final TransactionalSegmentPublisher publisher,
final Committer wrappedCommitter
) throws InterruptedException
{
final List<SegmentIdentifier> theSegments = ImmutableList.copyOf(appenderator.getSegments());

long nTry = 0;
while (true) {
try {
log.info("Pushing segments: [%s]", Joiner.on(", ").join(theSegments));
final SegmentsAndMetadata segmentsAndMetadata = appenderator.push(theSegments, wrappedCommitter).get();

// Sanity check
if (!segmentsToIdentifiers(segmentsAndMetadata.getSegments()).equals(Sets.newHashSet(theSegments))) {
throw new ISE(
"WTF?! Pushed different segments than requested. Pushed[%s], requested[%s].",
Joiner.on(", ").join(identifiersToStrings(segmentsToIdentifiers(segmentsAndMetadata.getSegments()))),
Joiner.on(", ").join(identifiersToStrings(theSegments))
);
}

log.info(
"Publishing segments with commitMetadata[%s]: [%s]",
segmentsAndMetadata.getCommitMetadata(),
Joiner.on(", ").join(segmentsAndMetadata.getSegments())
);

if (segmentsAndMetadata.getSegments().isEmpty()) {
log.info("Nothing to publish, skipping publish step.");
} else {
final boolean published = publisher.publishSegments(
ImmutableSet.copyOf(segmentsAndMetadata.getSegments()),
((FiniteAppenderatorDriverMetadata) segmentsAndMetadata.getCommitMetadata()).getCallerMetadata()
);

if (published) {
log.info("Published segments, awaiting handoff.");
} else {
log.info("Transaction failure while publishing segments, checking if someone else beat us to it.");
if (usedSegmentChecker.findUsedSegments(segmentsToIdentifiers(segmentsAndMetadata.getSegments()))
.equals(Sets.newHashSet(segmentsAndMetadata.getSegments()))) {
log.info("Our segments really do exist, awaiting handoff.");
} else {
log.warn("Our segments don't exist, giving up.");
return null;
}
}
}

for (final DataSegment dataSegment : segmentsAndMetadata.getSegments()) {
handoffNotifier.registerSegmentHandoffCallback(
new SegmentDescriptor(
dataSegment.getInterval(),
dataSegment.getVersion(),
dataSegment.getShardSpec().getPartitionNum()
),
MoreExecutors.sameThreadExecutor(),
new Runnable()
{
@Override
public void run()
{
final SegmentIdentifier identifier = SegmentIdentifier.fromDataSegment(dataSegment);
log.info("Segment[%s] successfully handed off, dropping.", identifier);
final ListenableFuture<?> dropFuture = appenderator.drop(identifier);
Futures.addCallback(
dropFuture,
new FutureCallback<Object>()
{
@Override
public void onSuccess(Object result)
{
synchronized (handoffMonitor) {
handoffMonitor.notifyAll();
}
}

@Override
public void onFailure(Throwable e)
{
log.warn(e, "Failed to drop segment[%s]?!");
synchronized (handoffMonitor) {
handoffMonitor.notifyAll();
}
}
}
);
}
}
);
}

return segmentsAndMetadata;
}
catch (InterruptedException e) {
throw e;
}
catch (Exception e) {
final long sleepMillis = computeNextRetrySleep(++nTry);
log.warn(e, "Failed publishAll (try %d), retrying in %,dms.", nTry, sleepMillis);
Thread.sleep(sleepMillis);
}
}
}

As shown above, the interesting part is appenderator.push(), so let's look at that next.

appenderator.push()

push() is implemented by AppenderatorImpl.push():

public ListenableFuture<SegmentsAndMetadata> push(
final List<SegmentIdentifier> identifiers,
final Committer committer
)
{
final Map<SegmentIdentifier, Sink> theSinks = Maps.newHashMap();
for (final SegmentIdentifier identifier : identifiers) {
final Sink sink = sinks.get(identifier);
if (sink == null) {
throw new NullPointerException("No sink for identifier: " + identifier);
}
theSinks.put(identifier, sink);
sink.finishWriting();
}

return Futures.transform(
persistAll(committer),
new Function<Object, SegmentsAndMetadata>()
{
@Override
public SegmentsAndMetadata apply(Object commitMetadata)
{
final List<DataSegment> dataSegments = Lists.newArrayList();

for (Map.Entry<SegmentIdentifier, Sink> entry : theSinks.entrySet()) {
if (droppingSinks.contains(entry.getKey())) {
log.info("Skipping push of currently-dropping sink[%s]", entry.getKey());
continue;
}

final DataSegment dataSegment = mergeAndPush(entry.getKey(), entry.getValue());
if (dataSegment != null) {
dataSegments.add(dataSegment);
} else {
log.warn("mergeAndPush[%s] returned null, skipping.", entry.getKey());
}
}

return new SegmentsAndMetadata(dataSegments, commitMetadata);
}
},
mergeExecutor
);
}

Here Futures.transform() first runs AppenderatorImpl.persistAll() and then converts its result into a SegmentsAndMetadata. Inside the Function passed to transform(), the key operation is mergeAndPush(), which merges the segment's indexes and pushes the result to deep storage:

DataSegment dataSegment = mergeAndPush(entry.getKey(), entry.getValue())


private DataSegment mergeAndPush(final SegmentIdentifier identifier, final Sink sink)
{
// Bail out if this sink is null or otherwise not what we expect.
if (sinks.get(identifier) != sink) {
log.warn("Sink for segment[%s] no longer valid, bailing out of mergeAndPush.", identifier);
return null;
}

// Use a descriptor file to indicate that pushing has completed.
final File persistDir = computePersistDir(identifier);
final File mergedTarget = new File(persistDir, "merged");
final File descriptorFile = computeDescriptorFile(identifier);

// Sanity checks
for (FireHydrant hydrant : sink) {
if (sink.isWritable()) {
throw new ISE("WTF?! Expected sink to be no longer writable before mergeAndPush. Segment[%s].", identifier);
}

synchronized (hydrant) {
if (!hydrant.hasSwapped()) {
throw new ISE("WTF?! Expected sink to be fully persisted before mergeAndPush. Segment[%s].", identifier);
}
}
}

try {
if (descriptorFile.exists()) {
// Already pushed.
log.info("Segment[%s] already pushed.", identifier);
return objectMapper.readValue(descriptorFile, DataSegment.class);
}

log.info("Pushing merged index for segment[%s].", identifier);

removeDirectory(mergedTarget);

if (mergedTarget.exists()) {
throw new ISE("Merged target[%s] exists after removing?!", mergedTarget);
}

List<QueryableIndex> indexes = Lists.newArrayList();
for (FireHydrant fireHydrant : sink) {
Segment segment = fireHydrant.getSegment();
final QueryableIndex queryableIndex = segment.asQueryableIndex();
log.info("Adding hydrant[%s]", fireHydrant);
indexes.add(queryableIndex);
}

final File mergedFile;
mergedFile = indexMerger.mergeQueryableIndex(
indexes,
schema.getAggregators(),
mergedTarget,
tuningConfig.getIndexSpec()
);

QueryableIndex index = indexIO.loadIndex(mergedFile);

DataSegment segment = dataSegmentPusher.push(
mergedFile,
sink.getSegment().withDimensions(Lists.newArrayList(index.getAvailableDimensions()))
);

objectMapper.writeValue(descriptorFile, segment);

log.info("Pushed merged index for segment[%s], descriptor is: %s", identifier, segment);

return segment;
}
catch (Exception e) {
metrics.incrementFailedHandoffs();
log.warn(e, "Failed to push merged index for segment[%s].", identifier);
throw Throwables.propagate(e);
}
}

As shown above, indexMerger.mergeQueryableIndex() merges the indexes and returns mergedFile, which is then loaded via indexIO.loadIndex(mergedFile). Finally, dataSegmentPusher.push() pushes the merged file, and the resulting segment descriptor is written to descriptorFile.

dataSegmentPusher.push() is the DataSegmentPusher.push() interface method, which has several implementations, such as HdfsDataSegmentPusher and LocalDataSegmentPusher.

With LocalDataSegmentPusher, the segment is copied to the configured directory, as the following logs show:

var/druid/segments/ds-test-01/2018-06-04T12:00:00.000Z_2018-06-04T13:00:00.000Z/2018-06-04T12:38:14.881Z/0
var/druid/indexing-logs/index_kafka_ds-test-01_859074593c222df_jcbcickc.log:729:2018-06-04T12:42:45,399 INFO [appenderator_merge_0] io.druid.segment.loading.LocalDataSegmentPusher - Copying segment[ds-test-01_2018-06-04T12:00:00.000Z_2018-06-04T13:00:00.000Z_2018-06-04T12:38:14.881Z] to local filesystem at location[var/druid/segments/ds-test-01/2018-06-04T12:00:00.000Z_2018-06-04T13:00:00.000Z/2018-06-04T12:38:14.881Z/0]
var/druid/indexing-logs/index_kafka_ds-test-01_09ec8505814100a_cebgdhom.log:582:2018-06-04T13:29:59,694 INFO [appenderator_merge_0] io.druid.segment.loading.LocalDataSegmentPusher - Copying segment[ds-test-01_2018-06-04T12:00:00.000Z_2018-06-04T13:00:00.000Z_2018-06-04T12:38:14.881Z_1] to local filesystem at location[var/druid/segments/ds-test-01/2018-06-04T12:00:00.000Z_2018-06-04T13:00:00.000Z/2018-06-04T12:38:14.881Z/1]
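
Conceptually, a local push is just "copy the merged segment files to a deterministic location and record where they went". The sketch below illustrates that idea only; the directory layout and descriptor handling of the real LocalDataSegmentPusher differ, and the paths here are placeholders.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.stream.Stream;

public class LocalPushSketch
{
  public static void main(String[] args) throws IOException
  {
    // Placeholder paths; the real layout is derived from dataSource, interval, version and partition number.
    Path mergedDir = Paths.get("var/task/persist/merged");
    Path target = Paths.get("var/druid/segments/ds-test-01/2018-06-04T12_2018-06-04T13/v1/0");

    Files.createDirectories(target);
    try (Stream<Path> files = Files.walk(mergedDir)) {
      for (Path source : (Iterable<Path>) files::iterator) {
        Path dest = target.resolve(mergedDir.relativize(source));
        if (Files.isDirectory(source)) {
          Files.createDirectories(dest);
        } else {
          Files.copy(source, dest, StandardCopyOption.REPLACE_EXISTING);
        }
      }
    }
    // A descriptor (the segment metadata as JSON) would be written alongside, as in the logs above.
  }
}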

publisher.publishSegments()

Once the push succeeds, the publish step can proceed. The publishSegments() invoked here belongs to the TransactionalSegmentPublisher we defined earlier, so it is not repeated here.

handoffNotifier

After that, handoffNotifier.registerSegmentHandoffCallback() is called for each segment in segmentsAndMetadata.getSegments(), i.e. a handoff callback is registered and will be executed once handoff completes. The method is implemented by CoordinatorBasedSegmentHandoffNotifier#registerSegmentHandoffCallback, and the callbacks are stored in CoordinatorBasedSegmentHandoffNotifier.handOffCallbacks.

They are consumed in checkForSegmentHandoffs(); the call stack is:

CoordinatorBasedSegmentHandoffNotifier.checkForSegmentHandoffs()  (io.druid.segment.realtime.plumber)
Anonymous in start() in CoordinatorBasedSegmentHandoffNotifier.run() (io.druid.segment.realtime.plumber)
FiniteAppenderatorDriver.startJob() (io.druid.segment.realtime.appenderator)
KafkaIndexTask.run(TaskToolbox) (io.druid.indexing.kafka)

That is, the ScheduledExecutorService job started by driver.startJob() earlier in KafkaIndexTask.run() periodically scans CoordinatorBasedSegmentHandoffNotifier.handOffCallbacks, checks whether each handoff has completed, and invokes the corresponding callback when it has. The callback itself calls appenderator.drop(identifier).

checkForSegmentHandoffs() :

void checkForSegmentHandoffs()
{
try {
Iterator<Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>>> itr = handOffCallbacks.entrySet()
.iterator();
while (itr.hasNext()) {
Map.Entry<SegmentDescriptor, Pair<Executor, Runnable>> entry = itr.next();
SegmentDescriptor descriptor = entry.getKey();
try {
List<ImmutableSegmentLoadInfo> loadedSegments = coordinatorClient.fetchServerView(
dataSource,
descriptor.getInterval(),
true
);

if (isHandOffComplete(loadedSegments, entry.getKey())) {
log.info("Segment Handoff complete for dataSource[%s] Segment[%s]", dataSource, descriptor);
entry.getValue().lhs.execute(entry.getValue().rhs);
itr.remove();
}
}
catch (Exception e) {
log.error(
e,
"Exception while checking handoff for dataSource[%s] Segment[%s], Will try again after [%d]secs",
dataSource,
descriptor,
pollDurationMillis
);
}
}
if (!handOffCallbacks.isEmpty()) {
log.info("Still waiting for Handoff for Segments : [%s]", handOffCallbacks.keySet());
}
}
catch (Throwable t) {
log.error(
t,
"Exception while checking handoff for dataSource[%s] Segment[%s], Will try again after [%d]secs",
dataSource,
pollDurationMillis
);
}
}

As shown above, coordinatorClient.fetchServerView() issues an HTTP request to the coordinator, and the returned loadedSegments are used to decide whether handoff is complete.
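
The check above is re-run on a fixed schedule by the executor that driver.startJob() sets up. In isolation, that scheduling pattern is just a ScheduledExecutorService; a minimal sketch (the interval and names are illustrative) is:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class HandoffPollerSketch
{
  private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
  private final long pollDurationMillis = 60_000L; // illustrative; Druid reads the real interval from config

  public void start(final Runnable checkForSegmentHandoffs)
  {
    // Re-run the handoff check at a fixed interval until the notifier is stopped.
    scheduledExecutor.scheduleAtFixedRate(
        checkForSegmentHandoffs,
        0,
        pollDurationMillis,
        TimeUnit.MILLISECONDS
    );
  }

  public void stop()
  {
    scheduledExecutor.shutdownNow();
  }
}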

Returning the TaskStatus

Finally, Task.run() returns the task status. Going back to the earlier code, this becomes the status returned by taskRunner.run(task) and is written to statusFile, which concludes the task run inside the peon.

Wrap-up


The above is the overall flow of how a middleManager receives and runs a task. Due to space constraints, some important details (such as segment persistence, how the coordinator manages segments, and how real-time queries are served) will be covered in later posts.

References