Druid | Kafka Indexing Service - 1 - An Overview of the SupervisorSpec Submission Flow

The Kafka Indexing Service (KIS for short) pulls data from Kafka and writes it into a Druid DataSource.
The KIS code is not complicated, and reading it reveals plenty of useful details. Let's walk through the rough flow of submitting a SupervisorSpec.

Defining the Supervisor


Using KIS requires a supervisor spec described in JSON. Suppose it lives in a file supervisor-01.json, as in the example below:

The spec first defines a dataSource named "ds-test-01" and sets the parser format of incoming messages to json,
then maps the DataSource's timestamp and dimensions to the corresponding JSON fields.
Next come the metrics we want, common ones being counts, sums, and min/max values.
ioConfig supplies the Kafka input settings, and tuningConfig holds tuning options; these are not the focus here, so see the documentation for the full parameter list.

{
  "type": "kafka",
  "dataSchema": {
    "dataSource": "ds-test-01",
    "parser": {
      "type": "string",
      "parseSpec": {
        "format": "json",
        "timestampSpec": {
          "column": "timestamp",
          "format": "auto"
        },
        "dimensionsSpec": {
          "dimensions": ["upperColor"],
          "dimensionExclusions": [
            "timestamp",
            "value"
          ]
        }
      }
    },
    "metricsSpec": [
      {
        "name": "count",
        "type": "count"
      },
      {
        "name": "value_sum",
        "fieldName": "value",
        "type": "doubleSum"
      },
      {
        "name": "value_min",
        "fieldName": "value",
        "type": "doubleMin"
      },
      {
        "name": "value_max",
        "fieldName": "value",
        "type": "doubleMax"
      }
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "HOUR",
      "queryGranularity": "NONE"
    }
  },
  "tuningConfig": {
    "type": "kafka",
    "workerThreads": 2,
    "maxRowsPerSegment": 5000000
  },
  "ioConfig": {
    "topic": "test-druid-metrics-1",
    "consumerProperties": {
      "bootstrap.servers": "foo"
    },
    "taskCount": 1,
    "replicas": 1,
    "taskDuration": "PT5M"
  }
}

Submitting the SupervisorSpec


Now we submit the SupervisorSpec JSON defined above via HTTP POST:

curl -X POST -H 'Content-Type: application/json' -d @supervisor-01.json http://${overlord_host}:8090/druid/indexer/v1/supervisor
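
If the Overlord accepts the spec, it responds with the supervisor's id (as we will see below, specPost() returns ImmutableMap.of("id", spec.getId()), and for a Kafka supervisor the id is the dataSource name), so the response here should look roughly like:

{"id":"ds-test-01"}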

Receiving and Handling the SupervisorSpec


The SupervisorSpec is received and handled by SupervisorResource:

@Path("/druid/indexer/v1/supervisor")
public class SupervisorResource
{
  private final TaskMaster taskMaster;

  @Inject
  public SupervisorResource(TaskMaster taskMaster)
  {
    this.taskMaster = taskMaster;
  }

  @POST
  @Consumes(MediaType.APPLICATION_JSON)
  @Produces(MediaType.APPLICATION_JSON)
  public Response specPost(final SupervisorSpec spec)
  {
    return asLeaderWithSupervisorManager(
        new Function<SupervisorManager, Response>()
        {
          @Override
          public Response apply(SupervisorManager manager)
          {
            manager.createOrUpdateAndStartSupervisor(spec);
            return Response.ok(ImmutableMap.of("id", spec.getId())).build();
          }
        }
    );
  }
  ...
}

As shown above, specPost() takes a SupervisorSpec as input; in our case the concrete implementation is KafkaSupervisorSpec.
Since SupervisorSpec is an interface, deserialization has to produce an implementation class,
so a JSON type mapping is configured that selects the target class based on the type field. It is defined as follows:

// SupervisorSpec.java
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
    @JsonSubTypes.Type(name = "NoopSupervisorSpec", value = NoopSupervisorSpec.class)
})
public interface SupervisorSpec
{
  String getId();

  Supervisor createSupervisor();
}

// KafkaIndexTaskModule.java
public class KafkaIndexTaskModule implements DruidModule
{
  @Override
  public List<? extends Module> getJacksonModules()
  {
    return ImmutableList.of(
        new SimpleModule(getClass().getSimpleName())
            .registerSubtypes(
                new NamedType(KafkaIndexTask.class, "index_kafka"),
                new NamedType(KafkaDataSourceMetadata.class, "kafka"),
                new NamedType(KafkaIOConfig.class, "kafka"),
                new NamedType(KafkaTuningConfig.class, "kafka"),
                new NamedType(KafkaSupervisorSpec.class, "kafka")
            )
    );
  }
}
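
As an aside, here is a minimal, self-contained sketch (toy code, not Druid's) of this Jackson mechanism: @JsonTypeInfo on the interface plus a subtype registered under the name "kafka" lets the mapper turn {"type": "kafka", ...} into the matching concrete class.

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;

public class SubtypeDemo
{
  // Like SupervisorSpec: the "type" property picks the concrete class.
  @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
  public interface Spec
  {
    String getId();
  }

  public static class KafkaSpec implements Spec
  {
    private final String topic;

    @JsonCreator
    public KafkaSpec(@JsonProperty("topic") String topic)
    {
      this.topic = topic;
    }

    @Override
    public String getId()
    {
      return "kafka:" + topic;
    }
  }

  public static void main(String[] args) throws Exception
  {
    ObjectMapper mapper = new ObjectMapper();
    // Equivalent to KafkaIndexTaskModule registering KafkaSupervisorSpec under "kafka"
    mapper.registerSubtypes(new NamedType(KafkaSpec.class, "kafka"));

    Spec spec = mapper.readValue("{\"type\":\"kafka\",\"topic\":\"t\"}", Spec.class);
    System.out.println(spec.getId()); // prints: kafka:t
  }
}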

Looking at the implementation class KafkaSupervisorSpec, we can see that its constructor maps directly onto the JSON we submitted:

// KafkaSupervisorSpec.java
@JsonCreator
public KafkaSupervisorSpec(
    @JsonProperty("dataSchema") DataSchema dataSchema,
    @JsonProperty("tuningConfig") KafkaTuningConfig tuningConfig,
    @JsonProperty("ioConfig") KafkaSupervisorIOConfig ioConfig,
    @JacksonInject TaskStorage taskStorage,
    @JacksonInject TaskMaster taskMaster,
    @JacksonInject IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator,
    @JacksonInject KafkaIndexTaskClientFactory kafkaIndexTaskClientFactory,
    @JacksonInject @Json ObjectMapper mapper
)

Back in specPost(): next, the Supervisor is created and started from the KafkaSupervisorSpec. manager.createOrUpdateAndStartSupervisor(spec) eventually lands in createAndStartSupervisorInternal():

private boolean createAndStartSupervisorInternal(SupervisorSpec spec, boolean persistSpec)
{
  String id = spec.getId();
  if (supervisors.containsKey(id)) {
    return false;
  }

  Supervisor supervisor = spec.createSupervisor();
  supervisor.start(); // try starting the supervisor first so we don't persist a bad spec

  if (persistSpec) {
    metadataSupervisorManager.insert(id, spec);
  }

  supervisors.put(id, Pair.of(supervisor, spec));
  return true;
}

The spec.createSupervisor() call above creates the supervisor;
the implementation is KafkaSupervisorSpec.createSupervisor():

@Override
public Supervisor createSupervisor()
{
  return new KafkaSupervisor(
      taskStorage,
      taskMaster,
      indexerMetadataStorageCoordinator,
      kafkaIndexTaskClientFactory,
      mapper,
      this
  );
}

After creation, supervisor.start() is called to start the supervisor, i.e. KafkaSupervisor.start():

@Override
public void start()
{
  synchronized (stateChangeLock) {
    Preconditions.checkState(!started, "already started");
    Preconditions.checkState(!exec.isShutdown(), "already stopped");

    try {
      consumer = getKafkaConsumer();

      exec.submit(
          new Runnable()
          {
            @Override
            public void run()
            {
              try {
                while (!Thread.currentThread().isInterrupted()) {
                  final Notice notice = notices.take();

                  try {
                    notice.handle();
                  }
                  catch (Exception e) {
                    log.makeAlert(e, "KafkaSupervisor[%s] failed to handle notice", dataSource)
                       .addData("noticeClass", notice.getClass().getSimpleName())
                       .emit();
                  }
                }
              }
              catch (InterruptedException e) {
                log.info("KafkaSupervisor[%s] interrupted, exiting", dataSource);
              }
            }
          }
      );
    }
    catch (Exception e) {
      log.makeAlert(e, "Exception starting KafkaSupervisor[%s]", dataSource)
         .emit();
      throw Throwables.propagate(e);
    }

    firstRunTime = DateTime.now().plus(ioConfig.getStartDelay());
    scheduledExec.scheduleAtFixedRate(
        buildRunTask(),
        ioConfig.getStartDelay().getMillis(),
        Math.max(ioConfig.getPeriod().getMillis(), MAX_RUN_FREQUENCY_MILLIS),
        TimeUnit.MILLISECONDS
    );

    started = true;
    log.info("Started KafkaSupervisor[%s], first run in [%s]", dataSource, ioConfig.getStartDelay());
  }
}

start() mainly does three things:

1. Create the KafkaConsumer

getKafkaConsumer() creates the KafkaConsumer.

2. Start the notices queue handler thread

ExecutorService.submit() starts a thread that keeps taking Notice objects from the BlockingQueue and handling them.

3. Periodically push a notice onto the notices queue

ScheduledExecutorService.scheduleAtFixedRate() periodically adds a RunNotice to the notices queue (a compact sketch of this pattern follows the list).
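
To make the queue mechanics concrete, below is a minimal, self-contained sketch (illustrative only, not Druid code) of the same pattern: a scheduled producer enqueues run requests while a single consumer thread drains and handles them.

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class NoticeQueueDemo
{
  public static void main(String[] args) throws Exception
  {
    final BlockingQueue<Runnable> notices = new LinkedBlockingDeque<>();
    ExecutorService exec = Executors.newSingleThreadExecutor();
    ScheduledExecutorService scheduledExec = Executors.newSingleThreadScheduledExecutor();

    // Consumer: loop taking notices off the queue and handling them,
    // mirroring the Runnable submitted in KafkaSupervisor.start().
    exec.submit(() -> {
      try {
        while (!Thread.currentThread().isInterrupted()) {
          notices.take().run();
        }
      }
      catch (InterruptedException e) {
        // exit quietly, like the supervisor does
      }
    });

    // Producer: periodically enqueue a "run notice".
    scheduledExec.scheduleAtFixedRate(
        () -> notices.add(() -> System.out.println("handling RunNotice")),
        0, 1, TimeUnit.SECONDS
    );

    Thread.sleep(3_000);
    scheduledExec.shutdownNow();
    exec.shutdownNow();
  }
}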

Handling RunNotice


As the above shows, Notices are the key to how the Supervisor runs.

RunNotice.handle() is defined as follows:

@Override
public void handle()
{
  long nowTime = System.currentTimeMillis();
  if (nowTime - lastRunTime < MAX_RUN_FREQUENCY_MILLIS) {
    return;
  }
  lastRunTime = nowTime;

  runInternal();
}

void runInternal()
{
  possiblyRegisterListener();
  updatePartitionDataFromKafka();
  discoverTasks();
  updateTaskStatus();
  checkTaskDuration();
  checkPendingCompletionTasks();
  checkCurrentTaskState();
  createNewTasks();
}

The core processing logic lives in KafkaSupervisor.runInternal(); the following sections look at each of these methods in turn.

possiblyRegisterListener()


This registers a listener on the TaskRunner. The interesting part is the statusChanged() callback, which simply adds a RunNotice to the notices queue:

taskRunner.get().registerListener(
    new TaskRunnerListener()
    {
      @Override
      public String getListenerId()
      {
        return supervisorId;
      }
      ...
      @Override
      public void statusChanged(String taskId, TaskStatus status)
      {
        notices.add(new RunNotice());
      }
    }, MoreExecutors.sameThreadExecutor()
);

updatePartitionDataFromKafka()


First, a look at the KafkaSupervisor.partitionGroups data structure. It is a Map<Integer, Map<Integer, Long>>
whose contents are Map<{group ID}, Map<{partition ID}, {startingOffset}>>;
we will call the inner Map<{partition ID}, {startingOffset}> the partitionMap.
This method's main job is to examine every partition of ioConfig.getTopic() and:

  1. Check whether the partition's taskGroupId already exists as a key, creating it if not.
    taskGroupId equals partition % ioConfig.getTaskCount(). In other words, with 3 tasks configured
    and a Kafka topic of 9 partitions, there will be 3 taskGroupIds, each consuming 3 partitions
    (see the sketch after this list). If taskCount is greater than the number of partitions,
    taskCount is reduced to the partition count.
  2. Check whether that taskGroupId's partitionMap has a key for the partition, creating it if not.
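
A quick sketch (illustrative only, not Druid code) of this partition-to-taskGroup assignment:

public class PartitionGroupingDemo
{
  public static void main(String[] args)
  {
    int taskCount = 3;      // ioConfig.getTaskCount()
    int numPartitions = 9;  // partitions of ioConfig.getTopic()

    // taskGroupId = partition % taskCount, as described above
    for (int partition = 0; partition < numPartitions; partition++) {
      int taskGroupId = partition % taskCount;
      System.out.printf("partition %d -> taskGroup %d%n", partition, taskGroupId);
    }
    // Resulting groups: {0,3,6} -> 0, {1,4,7} -> 1, {2,5,8} -> 2
  }
}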

discoverTasks()


This reads activeTasks from taskStorage. For each task that belongs to the current dataSource and is a KafkaIndexTask,
it looks up the TaskGroup for the task's taskGroupId (creating it if absent) and adds the task to that TaskGroup.

The KafkaIndexTask read back from the database looks like this; its constructor defines the information needed to run it:

@JsonCreator
public KafkaIndexTask(
    @JsonProperty("id") String id,
    @JsonProperty("resource") TaskResource taskResource,
    @JsonProperty("dataSchema") DataSchema dataSchema,
    @JsonProperty("tuningConfig") KafkaTuningConfig tuningConfig,
    @JsonProperty("ioConfig") KafkaIOConfig ioConfig,
    @JsonProperty("context") Map<String, Object> context,
    @JacksonInject ChatHandlerProvider chatHandlerProvider
)

updateTaskStatus()


Fetches the latest task statuses from taskStorage and syncs them into memory.

checkTaskDuration()


Checks how long the tasks in each taskGroup have been running; if a task exceeds the configured ioConfig.taskDuration, its state is saved and the task is stopped.
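
As a side note, taskDuration (the "PT5M" in our spec) is an ISO-8601 duration, i.e. five minutes. A hedged sketch of the kind of elapsed-time check this implies, using java.time rather than Druid's internals (which use Joda-Time types):

import java.time.Duration;
import java.time.Instant;

public class TaskDurationCheckDemo
{
  public static void main(String[] args)
  {
    Duration taskDuration = Duration.parse("PT5M"); // ioConfig.taskDuration from the spec

    // Pretend a task started 7 minutes ago; it has now exceeded taskDuration.
    Instant taskStart = Instant.now().minus(Duration.ofMinutes(7));
    boolean exceeded = Duration.between(taskStart, Instant.now()).compareTo(taskDuration) > 0;
    System.out.println("taskDuration exceeded: " + exceeded); // true
  }
}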

checkPendingCompletionTasks()


This method scans pendingCompletionTaskGroups looking for tasks that have finished in each group.
If any one task in a group completes, the remaining tasks in that group can be stopped.
If a group has hit its publishing timeout, the current group is stopped, along with the subsequent groups under the same groupId and the currently running group.

checkCurrentTaskState()


Checks the state of tasks in taskGroups, stopping and removing tasks that errored or failed; if any task in a group succeeds, all tasks in that group are stopped.

createNewTasks()


We finally arrive at where tasks are created. This first iterates over the keys of partitionGroups (the groupIds)
and, for any groupId missing from taskGroups, creates a TaskGroup.

It then loops over taskGroups and checks whether ioConfig.getReplicas() > taskGroup.tasks.size(),
i.e. whether the group has fewer tasks than the configured replica count; if so, tasks are created via createKafkaTasksForGroup().

createKafkaTasksForGroup()

Let's look at createKafkaTasksForGroup(int groupId, int replicas):

private void createKafkaTasksForGroup(int groupId, int replicas)
{
  Map<Integer, Long> startPartitions = taskGroups.get(groupId).partitionOffsets;
  Map<Integer, Long> endPartitions = new HashMap<>();
  for (Integer partition : startPartitions.keySet()) {
    endPartitions.put(partition, Long.MAX_VALUE);
  }

  String sequenceName = generateSequenceName(groupId);

  Map<String, String> consumerProperties = Maps.newHashMap(ioConfig.getConsumerProperties());
  DateTime minimumMessageTime = taskGroups.get(groupId).minimumMessageTime.orNull();

  KafkaIOConfig kafkaIOConfig = new KafkaIOConfig(
      sequenceName,
      new KafkaPartitions(ioConfig.getTopic(), startPartitions),
      new KafkaPartitions(ioConfig.getTopic(), endPartitions),
      consumerProperties,
      true,
      false,
      minimumMessageTime
  );

  for (int i = 0; i < replicas; i++) {
    String taskId = Joiner.on("_").join(sequenceName, getRandomId());
    KafkaIndexTask indexTask = new KafkaIndexTask(
        taskId,
        new TaskResource(sequenceName, 1),
        spec.getDataSchema(),
        spec.getTuningConfig(),
        kafkaIOConfig,
        ImmutableMap.<String, Object>of(),
        null
    );

    Optional<TaskQueue> taskQueue = taskMaster.getTaskQueue();
    if (taskQueue.isPresent()) {
      try {
        taskQueue.get().add(indexTask);
      }
      catch (EntryExistsException e) {
        log.error("Tried to add task [%s] but it already exists", indexTask.getId());
      }
    } else {
      log.error("Failed to get task queue because I'm not the leader!");
    }
  }
}

As shown, tasks are created according to the replicas parameter, and each new task is added via taskMaster.getTaskQueue().get().add().
TaskQueue.add() is defined as follows:

public boolean add(final Task task) throws EntryExistsException
{
  giant.lock();

  try {
    Preconditions.checkState(active, "Queue is not active!");
    Preconditions.checkNotNull(task, "task");
    Preconditions.checkState(tasks.size() < config.getMaxSize(), "Too many tasks (max = %,d)", config.getMaxSize());

    // If this throws with any sort of exception, including TaskExistsException, we don't want to
    // insert the task into our queue. So don't catch it.
    taskStorage.insert(task, TaskStatus.running(task.getId()));
    addTaskInternal(task);
    managementMayBeNecessary.signalAll();
    return true;
  }
  finally {
    giant.unlock();
  }
}

// Should always be called after taking giantLock
private void addTaskInternal(final Task task)
{
  tasks.add(task);
  taskLockbox.add(task);
}

As shown above, the task is first persisted via taskStorage and then added to tasks.
The tasks list is consumed by manage(), which runs inside a forever loop,
as TaskQueue.start() shows:

@LifecycleStart
public void start()
{
  giant.lock();

  try {
    Preconditions.checkState(!active, "queue must be stopped");
    active = true;
    syncFromStorage();
    managerExec.submit(
        new Runnable()
        {
          @Override
          public void run()
          {
            while (true) {
              try {
                manage();
                break;
              }
              catch (InterruptedException e) {
                log.info("Interrupted, exiting!");
                break;
              }
              catch (Exception e) {
                final long restartDelay = config.getRestartDelay().getMillis();
                log.makeAlert(e, "Failed to manage").addData("restartDelay", restartDelay).emit();
                try {
                  Thread.sleep(restartDelay);
                }
                catch (InterruptedException e2) {
                  log.info("Interrupted, exiting!");
                  break;
                }
              }
            }
          }
        }
    );
    ...
  }
  finally {
    giant.unlock();
  }
}

So on to manage(). This method repeatedly checks tasks; any task that is not yet running is started via taskRunner.run(task):

private void manage() throws InterruptedException
{
  log.info("Beginning management in %s.", config.getStartDelay());
  Thread.sleep(config.getStartDelay().getMillis());

  // Ignore return value- we'll get the IDs and futures from getKnownTasks later.
  taskRunner.restore();

  while (active) {
    giant.lock();

    try {
      // Task futures available from the taskRunner
      final Map<String, ListenableFuture<TaskStatus>> runnerTaskFutures = Maps.newHashMap();
      for (final TaskRunnerWorkItem workItem : taskRunner.getKnownTasks()) {
        runnerTaskFutures.put(workItem.getTaskId(), workItem.getResult());
      }
      // Attain futures for all active tasks (assuming they are ready to run).
      // Copy tasks list, as notifyStatus may modify it.
      for (final Task task : ImmutableList.copyOf(tasks)) {
        if (!taskFutures.containsKey(task.getId())) {
          final ListenableFuture<TaskStatus> runnerTaskFuture;
          if (runnerTaskFutures.containsKey(task.getId())) {
            runnerTaskFuture = runnerTaskFutures.get(task.getId());
          } else {
            // Task should be running, so run it.
            final boolean taskIsReady;
            try {
              taskIsReady = task.isReady(taskActionClientFactory.create(task));
            }
            catch (Exception e) {
              log.warn(e, "Exception thrown during isReady for task: %s", task.getId());
              notifyStatus(task, TaskStatus.failure(task.getId()));
              continue;
            }
            if (taskIsReady) {
              log.info("Asking taskRunner to run: %s", task.getId());
              runnerTaskFuture = taskRunner.run(task);
            } else {
              continue;
            }
          }
          taskFutures.put(task.getId(), attachCallbacks(task, runnerTaskFuture));
        }
      }
      // Kill tasks that shouldn't be running
      final Set<String> tasksToKill = Sets.difference(
          runnerTaskFutures.keySet(),
          ImmutableSet.copyOf(
              Lists.transform(
                  tasks,
                  new Function<Task, Object>()
                  {
                    @Override
                    public String apply(Task task)
                    {
                      return task.getId();
                    }
                  }
              )
          )
      );
      if (!tasksToKill.isEmpty()) {
        log.info("Asking taskRunner to clean up %,d tasks.", tasksToKill.size());
        for (final String taskId : tasksToKill) {
          try {
            taskRunner.shutdown(taskId);
          }
          catch (Exception e) {
            log.warn(e, "TaskRunner failed to clean up task: %s", taskId);
          }
        }
      }
      // awaitNanos because management may become necessary without this condition signalling,
      // due to e.g. tasks becoming ready when other folks mess with the TaskLockbox.
      managementMayBeNecessary.awaitNanos(60000000000L /* 60 seconds */);
    }
    finally {
      giant.unlock();
    }
  }
}

The TaskRunner implementation here is RemoteTaskRunner (io.druid.indexing.overlord):

@Override
public ListenableFuture<TaskStatus> run(final Task task)
{
  final RemoteTaskRunnerWorkItem completeTask, runningTask, pendingTask;
  if ((pendingTask = pendingTasks.get(task.getId())) != null) {
    log.info("Assigned a task[%s] that is already pending, not doing anything", task.getId());
    return pendingTask.getResult();
  } else if ((runningTask = runningTasks.get(task.getId())) != null) {
    ZkWorker zkWorker = findWorkerRunningTask(task.getId());
    if (zkWorker == null) {
      log.warn("Told to run task[%s], but no worker has started running it yet.", task.getId());
    } else {
      log.info("Task[%s] already running on %s.", task.getId(), zkWorker.getWorker().getHost());
      TaskAnnouncement announcement = zkWorker.getRunningTasks().get(task.getId());
      if (announcement.getTaskStatus().isComplete()) {
        taskComplete(runningTask, zkWorker, announcement.getTaskStatus());
      }
    }
    return runningTask.getResult();
  } else if ((completeTask = completeTasks.get(task.getId())) != null) {
    return completeTask.getResult();
  } else {
    return addPendingTask(task).getResult();
  }
}

As shown, run() first checks whether the task is already in pendingTasks, runningTasks, or completeTasks, returning the corresponding result future if so.
If it is in none of them, the task is added to pendingTasks via addPendingTask(task).

addPendingTask(task) then calls runPendingTasks():

private void runPendingTasks()
{
  runPendingTasksExec.submit(
      new Callable<Void>()
      {
        @Override
        public Void call() throws Exception
        {
          try {
            // make a copy of the pending tasks because tryAssignTask may delete tasks from pending and move them
            // into running status
            List<RemoteTaskRunnerWorkItem> copy = Lists.newArrayList(pendingTasks.values());
            for (RemoteTaskRunnerWorkItem taskRunnerWorkItem : copy) {
              String taskId = taskRunnerWorkItem.getTaskId();
              if (tryAssignTasks.putIfAbsent(taskId, taskId) == null) {
                try {
                  //this can still be null due to race from explicit task shutdown request
                  //or if another thread steals and completes this task right after this thread makes copy
                  //of pending tasks. See https://github.com/druid-io/druid/issues/2842 .
                  Task task = pendingTaskPayloads.get(taskId);
                  if (task != null && tryAssignTask(task, taskRunnerWorkItem)) {
                    pendingTaskPayloads.remove(taskId);
                  }
                }
                catch (Exception e) {
                  log.makeAlert(e, "Exception while trying to assign task")
                     .addData("taskId", taskRunnerWorkItem.getTaskId())
                     .emit();
                  RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(taskId);
                  if (workItem != null) {
                    taskComplete(workItem, null, TaskStatus.failure(taskId));
                  }
                }
                finally {
                  tryAssignTasks.remove(taskId);
                }
              }
            }
          }
          catch (Exception e) {
            log.makeAlert(e, "Exception in running pending tasks").emit();
          }

          return null;
        }
      }
  );
}

As shown, the main work here is assigning the task via tryAssignTask(task, taskRunnerWorkItem):

private boolean tryAssignTask(final Task task, final RemoteTaskRunnerWorkItem taskRunnerWorkItem) throws Exception
{
  Preconditions.checkNotNull(task, "task");
  Preconditions.checkNotNull(taskRunnerWorkItem, "taskRunnerWorkItem");
  Preconditions.checkArgument(task.getId().equals(taskRunnerWorkItem.getTaskId()), "task id != workItem id");

  if (runningTasks.containsKey(task.getId()) || findWorkerRunningTask(task.getId()) != null) {
    log.info("Task[%s] already running.", task.getId());
    return true;
  } else {
    // Nothing running this task, announce it in ZK for a worker to run it
    WorkerBehaviorConfig workerConfig = workerConfigRef.get();
    WorkerSelectStrategy strategy;
    if (workerConfig == null || workerConfig.getSelectStrategy() == null) {
      log.warn("No worker selections strategy set. Using default.");
      strategy = WorkerBehaviorConfig.DEFAULT_STRATEGY;
    } else {
      strategy = workerConfig.getSelectStrategy();
    }

    ZkWorker assignedWorker = null;
    Optional<ImmutableWorkerInfo> immutableZkWorker = null;
    try {
      immutableZkWorker = strategy.findWorkerForTask(
          config,
          ImmutableMap.copyOf(
              Maps.transformEntries(
                  Maps.filterEntries(
                      zkWorkers, new Predicate<Map.Entry<String, ZkWorker>>()
                      {
                        @Override
                        public boolean apply(Map.Entry<String, ZkWorker> input)
                        {
                          return !lazyWorkers.containsKey(input.getKey()) &&
                                 !workersWithUnacknowledgedTask.containsKey(input.getKey());
                        }
                      }
                  ),
                  new Maps.EntryTransformer<String, ZkWorker, ImmutableWorkerInfo>()
                  {
                    @Override
                    public ImmutableWorkerInfo transformEntry(
                        String key, ZkWorker value
                    )
                    {
                      return value.toImmutable();
                    }
                  }
              )
          ),
          task
      );

      if (immutableZkWorker.isPresent()) {
        if (workersWithUnacknowledgedTask.putIfAbsent(immutableZkWorker.get().getWorker().getHost(), task.getId())
            == null) {
          assignedWorker = zkWorkers.get(immutableZkWorker.get().getWorker().getHost());
          return announceTask(task, assignedWorker, taskRunnerWorkItem);
        } else {
          log.debug(
              "Lost race to run task [%s] on worker [%s]. Workers to ack tasks are [%s].",
              task.getId(),
              immutableZkWorker.get().getWorker().getHost(),
              workersWithUnacknowledgedTask
          );
        }
      } else {
        log.debug(
            "Unsuccessful task-assign attempt for task [%s] on workers [%s]. Workers to ack tasks are [%s].",
            task.getId(),
            zkWorkers.values(),
            workersWithUnacknowledgedTask
        );
      }

      return false;
    }
    finally {
      if (assignedWorker != null) {
        workersWithUnacknowledgedTask.remove(assignedWorker.getWorker().getHost());
      }

      if (immutableZkWorker.isPresent()) {
        //if this attempt lost the race to run the task then there might be another worker available to try on.
        //if this attempt won the race to run the task then other task might be able to use this worker now after task ack.
        runPendingTasks();
      }
    }
  }
}

As shown above, task assignment is implemented through Zookeeper.

  1. strategy.findWorkerForTask()
    The strategy implementation is FillCapacityWorkerSelectStrategy, which sorts the workers
    by currCapacityUsed and version and returns the most suitable one:
public class FillCapacityWorkerSelectStrategy implements WorkerSelectStrategy
{
  @Override
  public Optional<ImmutableWorkerInfo> findWorkerForTask(
      final WorkerTaskRunnerConfig config,
      final ImmutableMap<String, ImmutableWorkerInfo> zkWorkers,
      final Task task
  )
  {
    TreeSet<ImmutableWorkerInfo> sortedWorkers = Sets.newTreeSet(
        new Comparator<ImmutableWorkerInfo>()
        {
          @Override
          public int compare(
              ImmutableWorkerInfo zkWorker, ImmutableWorkerInfo zkWorker2
          )
          {
            int retVal = Ints.compare(zkWorker2.getCurrCapacityUsed(), zkWorker.getCurrCapacityUsed());
            // the version sorting is needed because if the workers have the same currCapacityUsed only one of them is
            // returned. Exists the possibility that this worker is disabled and doesn't have valid version so can't
            // run new tasks, so in this case the workers are sorted using version to ensure that if exists enable
            // workers the comparator return one of them.

            if (retVal == 0) {
              retVal = zkWorker.getWorker().getVersion().compareTo(zkWorker2.getWorker().getVersion());
            }

            return retVal;
          }
        }
    );
    sortedWorkers.addAll(zkWorkers.values());
    final String minWorkerVer = config.getMinWorkerVersion();

    for (ImmutableWorkerInfo zkWorker : sortedWorkers) {
      if (zkWorker.canRunTask(task) && zkWorker.isValidVersion(minWorkerVer)) {
        return Optional.of(zkWorker);
      }
    }

    return Optional.absent();
  }
}
  2. If a suitable worker is found, announceTask(task, assignedWorker, taskRunnerWorkItem) performs the assignment:
private boolean announceTask(
    final Task task,
    final ZkWorker theZkWorker,
    final RemoteTaskRunnerWorkItem taskRunnerWorkItem
) throws Exception
{
  Preconditions.checkArgument(task.getId().equals(taskRunnerWorkItem.getTaskId()), "task id != workItem id");
  final String worker = theZkWorker.getWorker().getHost();
  synchronized (statusLock) {
    if (!zkWorkers.containsKey(worker) || lazyWorkers.containsKey(worker)) {
      // the worker might got killed or has been marked as lazy.
      log.info("Not assigning task to already removed worker[%s]", worker);
      return false;
    }
    log.info("Coordinator asking Worker[%s] to add task[%s]", worker, task.getId());

    CuratorUtils.createIfNotExists(
        cf,
        JOINER.join(indexerZkConfig.getTasksPath(), worker, task.getId()),
        CreateMode.EPHEMERAL,
        jsonMapper.writeValueAsBytes(task),
        config.getMaxZnodeBytes()
    );

    RemoteTaskRunnerWorkItem workItem = pendingTasks.remove(task.getId());
    if (workItem == null) {
      log.makeAlert("WTF?! Got a null work item from pending tasks?! How can this be?!")
         .addData("taskId", task.getId())
         .emit();
      return false;
    }

    RemoteTaskRunnerWorkItem newWorkItem = workItem.withWorker(theZkWorker.getWorker(), null);
    runningTasks.put(task.getId(), newWorkItem);
    log.info("Task %s switched from pending to running (on [%s])", task.getId(), newWorkItem.getWorker().getHost());
    TaskRunnerUtils.notifyStatusChanged(listeners, task.getId(), TaskStatus.running(task.getId()));

    // Syncing state with Zookeeper - don't assign new tasks until the task we just assigned is actually running
    // on a worker - this avoids overflowing a worker with tasks
    Stopwatch timeoutStopwatch = Stopwatch.createStarted();
    while (!isWorkerRunningTask(theZkWorker.getWorker(), task.getId())) {
      final long waitMs = config.getTaskAssignmentTimeout().toStandardDuration().getMillis();
      statusLock.wait(waitMs);
      long elapsed = timeoutStopwatch.elapsed(TimeUnit.MILLISECONDS);
      if (elapsed >= waitMs) {
        log.makeAlert(
            "Task assignment timed out on worker [%s], never ran task [%s]! Timeout: (%s >= %s)!",
            worker,
            task.getId(),
            elapsed,
            config.getTaskAssignmentTimeout()
        );
        taskComplete(taskRunnerWorkItem, theZkWorker, TaskStatus.failure(task.getId()));
        break;
      }
    }
    return true;
  }
}

As shown above, CuratorUtils.createIfNotExists() writes the task into Zookeeper as an ephemeral znode
(per the JOINER.join call, at a path of the form {tasksPath}/{worker}/{taskId}).
The task is then moved from pendingTasks to runningTasks,
after which the method waits up to taskAssignmentTimeout for the worker to move the task into the RUNNING state;
if that times out, the task is marked as failed.

Wrapping Up


That covers the rough flow of creating a Supervisor, starting it, and dispatching its tasks. How a worker actually executes a task will have to wait for the next post.