Computing the Jaccard Coefficient (a Spark Implementation)

In an earlier post we discussed how to compute the Jaccard coefficient at big-data scale; so how do we implement it in Spark?
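
As a quick refresher, the exact Jaccard similarity of two sets A and B is |A ∩ B| / |A ∪ B|. A minimal Scala sketch for small in-memory sets (illustration only; the Spark pipeline below is what you would reach for when the data does not fit on one machine):

def jaccard(a: Set[Int], b: Set[Int]): Double =
  if (a.isEmpty && b.isEmpty) 0.0
  else a.intersect(b).size.toDouble / a.union(b).size

jaccard(Set(0, 1, 2), Set(1, 2, 4)) // 2 shared / 4 total = 0.5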

An Example with Spark ML

import org.apache.spark.ml.feature.MinHashLSH
import org.apache.spark.ml.linalg.Vectors

// Build the example data; rows 0-2 go into dfA below, rows 3-5 into dfB
// 0 : [1, 1, 1, 0, 0, 0]
// 1 : [0, 0, 1, 1, 1, 0]
// 2 : [1, 0, 1, 0, 1, 0]
// 3 : [0, 1, 0, 1, 0, 1]
// 4 : [0, 0, 1, 1, 0, 1]
// 5 : [0, 1, 1, 0, 1, 0]

val dfA = spark.createDataFrame(Seq(
(0, Vectors.sparse(6, Seq((0, 1.0), (1, 1.0), (2, 1.0)))),
(1, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (4, 1.0)))),
(2, Vectors.sparse(6, Seq((0, 1.0), (2, 1.0), (4, 1.0))))
)).toDF("id", "keys")
dfA.show(20, false)

/*
+---+-------------------------+
|id |keys |
+---+-------------------------+
|0 |(6,[0,1,2],[1.0,1.0,1.0])|
|1 |(6,[2,3,4],[1.0,1.0,1.0])|
|2 |(6,[0,2,4],[1.0,1.0,1.0])|
+---+-------------------------+
*/

val dfB = spark.createDataFrame(Seq(
(3, Vectors.sparse(6, Seq((1, 1.0), (3, 1.0), (5, 1.0)))),
(4, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (5, 1.0)))),
(5, Vectors.sparse(6, Seq((1, 1.0), (2, 1.0), (4, 1.0))))
)).toDF("id", "keys")

// setNumHashTables is set to 3 here; each hash table is one of the LSH hash functions
// discussed in the earlier post. Raising this value lowers the false-negative rate
// (fewer similar items are missed) but increases the computation cost, so pick a
// balance that fits your workload.
val mh = new MinHashLSH()
  .setNumHashTables(3)
  .setInputCol("keys")
  .setOutputCol("values")

val model = mh.fit(dfA)

// Feature transformation: append each row's MinHash signature as a new column
model.transform(dfA).show(20, false)

/*
+---+-------------------------+------------------------------------------------------+
|id |keys |values |
+---+-------------------------+------------------------------------------------------+
|0 |(6,[0,1,2],[1.0,1.0,1.0])|[[-2.031299587E9], [-1.974869772E9], [-1.974047307E9]]|
|1 |(6,[2,3,4],[1.0,1.0,1.0])|[[-2.031299587E9], [-1.758749518E9], [-4.86208737E8]] |
|2 |(6,[0,2,4],[1.0,1.0,1.0])|[[-2.031299587E9], [-1.758749518E9], [-1.974047307E9]]|
+---+-------------------------+------------------------------------------------------+
*/
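
Each inner array in values is the min-hash of the row's non-zero indices under one hash function, so setNumHashTables(3) yields three entries per row; rows whose signatures collide in any table become candidate pairs. For intuition, here is a sketch of the generic MinHash idea (the coefficients below are made up for illustration; Spark draws its own random coefficients at fit() time, which is why the concrete values above differ):

// Hypothetical coefficients; Spark's MinHashLSH generates its own per hash table.
val prime = 2038074743L // a large prime
val (coefA, coefB) = (911L, 37L)
def minHash(nonZeroIndices: Seq[Int]): Long =
  nonZeroIndices.map(i => ((1L + i) * coefA + coefB) % prime).min

minHash(Seq(0, 1, 2)) // one signature entry for row 0 under this single hash function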

// Approximate similarity join
// As the result below shows, the output distance column defaults to distCol; it holds
// the Jaccard distance (1 - Jaccard similarity), and 0.6 is the threshold on that distance.
model.approxSimilarityJoin(dfA, dfB, 0.6).show(false)

/*
+--------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------+-------+
|datasetA |datasetB |distCol|
+--------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------+-------+
|[0, (6,[0,1,2],[1.0,1.0,1.0]), [[-2.031299587E9], [-1.974869772E9], [-1.974047307E9]]]|[5, (6,[1,2,4],[1.0,1.0,1.0]), [[-2.031299587E9], [-1.974869772E9], [-1.230128022E9]]]|0.5 |
|[1, (6,[2,3,4],[1.0,1.0,1.0]), [[-2.031299587E9], [-1.758749518E9], [-4.86208737E8]]] |[4, (6,[2,3,5],[1.0,1.0,1.0]), [[-2.031299587E9], [-1.758749518E9], [-4.86208737E8]]] |0.5 |
|[1, (6,[2,3,4],[1.0,1.0,1.0]), [[-2.031299587E9], [-1.758749518E9], [-4.86208737E8]]] |[5, (6,[1,2,4],[1.0,1.0,1.0]), [[-2.031299587E9], [-1.974869772E9], [-1.230128022E9]]]|0.5 |
|[2, (6,[0,2,4],[1.0,1.0,1.0]), [[-2.031299587E9], [-1.758749518E9], [-1.974047307E9]]]|[5, (6,[1,2,4],[1.0,1.0,1.0]), [[-2.031299587E9], [-1.974869772E9], [-1.230128022E9]]]|0.5 |
+--------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------+-------+
*/
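
The distCol values can be sanity-checked by hand. For the (1, 4) pair above, the non-zero index sets are {2, 3, 4} and {2, 3, 5} (a quick verification, not part of the original example):

val setRow1 = Set(2, 3, 4)
val setRow4 = Set(2, 3, 5)
val dist = 1.0 - setRow1.intersect(setRow4).size.toDouble / setRow1.union(setRow4).size
// dist = 1.0 - 2.0 / 4 = 0.5, matching distCol above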

// Pre-transformed (and cached) DataFrames can also be passed in, which avoids re-hashing on each call
val transformedA = model.transform(dfA).cache()
val transformedB = model.transform(dfB).cache()
model.approxSimilarityJoin(transformedA, transformedB, 0.6).show(false)

/*
+--------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------+-------+
|datasetA |datasetB |distCol|
+--------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------+-------+
|[0, (6,[0,1,2],[1.0,1.0,1.0]), [[-2.031299587E9], [-1.974869772E9], [-1.974047307E9]]]|[5, (6,[1,2,4],[1.0,1.0,1.0]), [[-2.031299587E9], [-1.974869772E9], [-1.230128022E9]]]|0.5 |
|[1, (6,[2,3,4],[1.0,1.0,1.0]), [[-2.031299587E9], [-1.758749518E9], [-4.86208737E8]]] |[4, (6,[2,3,5],[1.0,1.0,1.0]), [[-2.031299587E9], [-1.758749518E9], [-4.86208737E8]]] |0.5 |
|[1, (6,[2,3,4],[1.0,1.0,1.0]), [[-2.031299587E9], [-1.758749518E9], [-4.86208737E8]]] |[5, (6,[1,2,4],[1.0,1.0,1.0]), [[-2.031299587E9], [-1.974869772E9], [-1.230128022E9]]]|0.5 |
|[2, (6,[0,2,4],[1.0,1.0,1.0]), [[-2.031299587E9], [-1.758749518E9], [-1.974047307E9]]]|[5, (6,[1,2,4],[1.0,1.0,1.0]), [[-2.031299587E9], [-1.974869772E9], [-1.230128022E9]]]|0.5 |
+--------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------+-------+
*/
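
Since the join result nests each row in a struct, it is often handier to project out just the ids and the distance. A small follow-up sketch using the column names approxSimilarityJoin produces:

import org.apache.spark.sql.functions.col

model.approxSimilarityJoin(transformedA, transformedB, 0.6)
  .select(col("datasetA.id").alias("idA"),
          col("datasetB.id").alias("idB"),
          col("distCol"))
  .show(false)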

// Self join; the id filter drops self-matches and mirrored (a, b) / (b, a) duplicates
model.approxSimilarityJoin(dfA, dfA, 0.6).filter("datasetA.id < datasetB.id").show(false)

// Approximate nearest neighbor search: find the 2 rows of dfA closest to the query vector
val key = Vectors.sparse(6, Seq((1, 1.0), (3, 1.0)))
model.approxNearestNeighbors(dfA, key, 2).show(false)
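
approxNearestNeighbors also has a four-argument overload in the Spark API that names the distance column explicitly, for example:

// Same search, with an explicit name for the distance column
model.approxNearestNeighbors(dfA, key, 2, "jaccardDist").show(false)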

References

https://spark.apache.org/docs/2.1.0/ml-features.html