Spark UnsafeRow 对象

UnsafeRow

UnsafeRow 是 Spark Tungsten 内存管理中的重要部分, 用 Raw Memory (堆外内存) 来存储 Row 数据, 以避免不必要的 JVM GC 和 Java Object 的内存开销, 因为 Spark 比 JVM 更了解哪些内存数据可以保存或释放, 它支持 Java Externalizable 和 Kryo KryoSerializable 的 序列化和反序列化.

UnsafeRow 主要有三个区域:

  • null Bit Set
  • Fixed Length Values
  • Variable Length Values

null Bit Set 区域

这个区域里, 每个 bit 用来标记每个 Field 是否 null.

如下是设置 bit set 的方法:

public static void set(Object baseObject, long baseOffset, int index) {
assert index >= 0 : "index (" + index + ") should >= 0";

long mask = 1L << (index & 63);
long wordOffset = baseOffset + (long)(index >> 6) * 8L;
long word = Platform.getLong(baseObject, wordOffset);
Platform.putLong(baseObject, wordOffset, word | mask);
}

如上, baseObject 一般是 byte[].

  • 假设要设置 index=0 的 Field 为 null, 那么此时 mask 将等于 0b0001 (这里只显示低4位), wordOffset 等于 baseOffset + 0, (index >> 6) * 8L 是因为 word 是 8 字节 (64位), 对应 64位, 能够存放 64 个 Field 的 null 标记, 计算之后可以知道 word 的 offset (在第几个 word), 然后我们通过 wordOffset 可以获取要找的已有的 word 值, 该 word 记录了 64 个 Field 的 null 标记, 接着我们只需要把 word | mask 计算后更新回去即可, 这里是 0b0000 | 0b0001, 等于 0b0001.

  • 接着假设再设置 index=1 的 Field 为 null, 此时 mask 将等于 0b0010 (这里只显示低4位), wordOffset 仍然等于 baseOffset + 0, 因为是同一个 word, 然后读出 word, 是前面更新写入后的 0b0001, 然后计算 word | mask, 即 0b0001 | 0b0010, 等于 0b0011, 然后把这个新的 word 值更新回去.

Fixed Length Values 区域

这个区域, 顾名思义, 每个 Field 以 8字节 的固定长度储存.

这里的 Field 分为两类:

  1. 一类是像 long double 或 int 这些原子类型的值, 以 8字节直接储存值.
  2. 另一类是 pointer 或 reference, 因为这类 Field 数据是可变长度的, 因此存放在了另一个地方: Variable Length Values 区域, 而当前区域相当于只存放 Field 数据的索引.
    此类 Field 的记录又分为两部分(各4字节):
    第一个: Field pointer 指向的数据在 Variable Length Values 区域中的 byte-offset 位置(占4字节)
    第二个: Field 的长度(占4字节), 长度单位是bytes, 这两个 4字节 的 integer 合并成了 8 字节来储存.

Variable Length Values 区域

该区域存放的是 可变长度类型的 Field 的数据, 通过查找 Fixed Length Values 区域的索引信息, 即可定位到 Field 数据.

如下是获取 Field 的 offset 位置的方法:

private long getFieldOffset(int ordinal) {
return baseOffset + bitSetWidthInBytes + ordinal * 8L;
}

baseOffset 加上 bitset 区域的大小(bitSetWidthInBytes), 再加上 Field ordinal * 8字节, 即是 Field 数据所在的 offset 位置.

例如假设要获取 UTF8String 的数据, 如下:

public UTF8String getUTF8String(int ordinal) {
if (isNullAt(ordinal)) return null;
final long offsetAndSize = getLong(ordinal);
final int offset = (int) (offsetAndSize >> 32);
final int size = (int) offsetAndSize;
return UTF8String.fromAddress(baseObject, baseOffset + offset, size);
}

offsetAndSize 是8字节, 前32位指示了数据 Variable Length Values 区域中的 byte-offset 位置, 即 offset 变量, 后32位存放数据的长度, 即 size 变量.

有了 offset 和 size 两个信息, 即可定位 Field 数据, 并加载读取出来了.

因此, 通过这三个区域, 我们就实现了 Row 对象的存储, 是管理内存的重要部分.

参考


https://books.japila.pl/spark-sql-internals/UnsafeRow/#mutable-types