MapReduce Map Side Join - 用户经纬度转换的例子

工作中, 我们经常碰到大小表 Join 的问题.
例如: 我们有一个用户经纬度上报日志大表 A + 一个经纬度与行政区域映射小表 B, 我们需要把A中的用户经纬度经过B的映射转换为所处的行政区域名称, 这就面临两个问题:

  1. Join 的逻辑比较复杂, 不是简单的 a.id = b.id , B 中只有经纬度范围(矩形位置)和对应的行政区域名称.
  2. 日志表 A 是大表, 一般是百GB级别, 区域映射表 B 是小表, 可能是几十MB, 直接 join 的话, 很耗费时间和资源.

如果用 Map Side Join 的话, 则容易实现和高效得多.

什么是 Map Side Join ?


顾名思义, 就是在 Mapper 端进行 Join 操作, 因此需要把小表维护在 Mapper 本地, 由于不需要进行 Reduce, Mapper 可以直接输出 Join 结果到 HDFS, 从而避免了 reduce 的网络和磁盘开销, 得到结果的速度也更快.

如何做 ?


  • 首先把表 B 上传到 HDFS 上
  • 在 MR 的任务中使用 org.apache.hadoop.mapreduce.Job#addCacheFile(URI uri) 增加文件, DistributedCache 会负责将这个文件下载到 Mapper 所在的机器上,
  • 代码上在 mapper 端可以通过 context.getLocalCacheFiles() 获取到这些已下载文件, 然后就可以读到表 B 的数据文件了, 并把小表数据加载到 Mapper 所在机器的内存,
  • 如果为了提高速度, 可以做一些内存索引, 加快和大表的 Join 速度. 比如, 将表 B 的纬度作为 key, value 为经纬度矩形的链表, 以此来减少计算的次数等等.

public static class SdkLogMapper extends
Mapper<WritableComparable, HCatRecord, WritableComparable, HCatRecord> {
@Override
protected void setup(Context context) throws IOException, InterruptedException {
//
Path[] paths = context.getLocalCacheFiles();
// ...
}
}


public static void main(String[] args) throws Exception {
// ...
job.addCacheFile(hdfsFilePathUri);
// ...
}

参考