hack - MapReduce MultipleInputs & 获取输入的文件名

在文件批处理的时候, 经常会遇到这样的需求, 比如文件名中带有一些信息,MapReduce 中需要通过获取文件名来分别进行不同业务上的处理。

MultipleInputs 下获取文件名的问题


org.apache.hadoop.mapreduce.Mapper#setup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context) 中,

常用以下方式获取 FileSplit, 再通过 FileSplit.getPath().getName() 获取文件名

FileSplit split = (FileSplit) context.getInputSplit();

以上方法在用于 FileInputFormat.addInputPath() 时是 ok 的.

但是, 如果使用 MultipleInputs.addInputPath() 增加 Input, 那么继续使用上面的方法将得到一个异常:

java.lang.ClassCastException: org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit cannot be cast to org.apache.hadoop.mapreduce.lib.input.FileSplit

原因如下, 在 MultipleInputs.addInputPath() 中为 job 设置了 DelegatingMapper 和 DelegatingInputFormat :

public static void addInputPath(Job job, Path path,
Class<? extends InputFormat> inputFormatClass,
Class<? extends Mapper> mapperClass) {

addInputPath(job, path, inputFormatClass);
Configuration conf = job.getConfiguration();
String mapperMapping = path.toString() + ";" + mapperClass.getName();
String mappers = conf.get(DIR_MAPPERS);
conf.set(DIR_MAPPERS, mappers == null ? mapperMapping
: mappers + "," + mapperMapping);

job.setMapperClass(DelegatingMapper.class);
}

public static void addInputPath(Job job, Path path,
Class<? extends InputFormat> inputFormatClass) {
String inputFormatMapping = path.toString() + ";"
+ inputFormatClass.getName();
Configuration conf = job.getConfiguration();
String inputFormats = conf.get(DIR_FORMATS);
conf.set(DIR_FORMATS,
inputFormats == null ? inputFormatMapping : inputFormats + ","
+ inputFormatMapping);

job.setInputFormatClass(DelegatingInputFormat.class);
}

在 job 提交运行时 :

mapreduce.JobSubmitter.submitJobInternal()
|- JobSubmitter.writeNewSplits()
|- JobSubmitter.writeNewSplits()

JobSubmitter.writeNewSplits() 中会通过 input.getSplits(job) 获取 splits, 其中 input 就是 DelegatingInputFormat .

DelegatingInputFormat.getSplits() 方法细节如下:

public List<InputSplit> getSplits(JobContext job)
throws IOException, InterruptedException {
......
List<InputSplit> pathSplits = format.getSplits(jobCopy);
for (InputSplit pathSplit : pathSplits) {
splits.add(new TaggedInputSplit(pathSplit, conf, format.getClass(),
mapperClass));
}
......
return splits;
}

可以发现, TaggedInputSplit 就是在这里构造的.

其中 format 是 org.apache.hadoop.mapreduce.lib.input.FileInputFormat , getSplits() 返回 FileSplit 列表, FileSplit 就是我们想要的.

不仅仅是 Class Cast 的问题


然而, 还有一个问题, 我们打开 TaggedInputSplit 的源码发现, TaggedInputSplit 不是一个 public scope 的类, 因此我们无法直接获取 TaggedInputSplit.inputSplit 了.

class TaggedInputSplit extends InputSplit implements Configurable, Writable {

private Class<? extends InputSplit> inputSplitClass;

private InputSplit inputSplit;

private Class<? extends InputFormat> inputFormatClass;

private Class<? extends Mapper> mapperClass;

private Configuration conf;

public TaggedInputSplit() {
// Default constructor.
}

public TaggedInputSplit(InputSplit inputSplit, Configuration conf,
Class<? extends InputFormat> inputFormatClass,
Class<? extends Mapper> mapperClass) {
this.inputSplitClass = inputSplit.getClass();
this.inputSplit = inputSplit;
this.conf = conf;
this.inputFormatClass = inputFormatClass;
this.mapperClass = mapperClass;
}
......
}

因此需要通过反射来 hack.

反射获取 TaggedInputSplit.inputSplit


在自定义的 mapreduce.Mapper#setup() 中进行反射, 从而使 TaggedInputSplit 内部的 FileSplit 能被获取到 :

@Override
protected void setup(Context context) {
InputSplit split = context.getInputSplit();
Class<? extends InputSplit> splitClass = split.getClass();

FileSplit fileSplit = null;
if (splitClass.equals(FileSplit.class)) {
fileSplit = (FileSplit) split;
} else if (splitClass.getName().equals(
"org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit")) {
// begin reflection hackery...

try {
Method getInputSplitMethod = splitClass
.getDeclaredMethod("getInputSplit");
getInputSplitMethod.setAccessible(true);
fileSplit = (FileSplit) getInputSplitMethod.invoke(split);
} catch (Exception e) {
// wrap and re-throw error
throw new RuntimeException(e);
}

// end reflection hackery
}
......
}

参考


hadoop MultipleInputs fails with ClassCastException - Stack Overflow