在文件批处理的时候, 经常会遇到这样的需求, 比如文件名中带有一些信息,MapReduce 中需要通过获取文件名来分别进行不同业务上的处理。
MultipleInputs 下获取文件名的问题
在 org.apache.hadoop.mapreduce.Mapper#setup(Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>.Context context)
中,
常用以下方式获取 FileSplit,
再通过 FileSplit.getPath().getName()
获取文件名
FileSplit split = (FileSplit) context.getInputSplit(); |
以上方法在用于 FileInputFormat.addInputPath()
时是 ok 的.
但是, 如果使用 MultipleInputs.addInputPath()
增加 Input, 那么继续使用上面的方法将得到一个异常:
java.lang.ClassCastException: org.apache.hadoop.mapreduce.lib.input.TaggedInputSplit cannot be cast to org.apache.hadoop.mapreduce.lib.input.FileSplit |
原因如下, 在 MultipleInputs.addInputPath() 中为 job 设置了 DelegatingMapper 和 DelegatingInputFormat :
public static void addInputPath(Job job, Path path, |
在 job 提交运行时 :
mapreduce.JobSubmitter.submitJobInternal() |
JobSubmitter.writeNewSplits() 中会通过 input.getSplits(job)
获取 splits, 其中 input
就是 DelegatingInputFormat .
DelegatingInputFormat.getSplits()
方法细节如下:
public List<InputSplit> getSplits(JobContext job) |
可以发现, TaggedInputSplit 就是在这里构造的.
其中 format 是 org.apache.hadoop.mapreduce.lib.input.FileInputFormat , getSplits() 返回 FileSplit 列表, FileSplit 就是我们想要的.
不仅仅是 Class Cast 的问题
然而, 还有一个问题, 我们打开 TaggedInputSplit
的源码发现, TaggedInputSplit
不是一个 public scope
的类, 因此我们无法直接获取 TaggedInputSplit.inputSplit
了.
class TaggedInputSplit extends InputSplit implements Configurable, Writable { |
因此需要通过反射来 hack.
反射获取 TaggedInputSplit.inputSplit
在自定义的 mapreduce.Mapper#setup() 中进行反射, 从而使 TaggedInputSplit 内部的 FileSplit 能被获取到 :
@Override |
参考
hadoop MultipleInputs fails with ClassCastException - Stack Overflow