Computing the Jaccard Coefficient (Spark Implementation)

Posted on 2018-12-08

In an earlier post we discussed how to compute the Jaccard coefficient at big-data scale. How can it be done in Spark?

Example from Spark ML

```scala
import org.apache.spark.ml.feature.MinHashLSH
import org.apache.spark.ml.linalg.Vectors

// Build the example data (item id : binary feature vector)
// 0 : [1, 1, 1, 0, 0, 0]
// 1 : [0, 0, 1, 1, 1, 0]
// 2 : [1, 0, 1, 0, 1, 0]
// 3 : [0, 1, 0, 1, 0, 1]
// 4 : [0, 0, 1, 1, 0, 1]
// 5 : [0, 1, 1, 0, 1, 0]
val dfA = spark.createDataFrame(Seq(
  (0, Vectors.sparse(6, Seq((0, 1.0), (1, 1.0), (2, 1.0)))),
  (1, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (4, 1.0)))),
  (2, Vectors.sparse(6, Seq((0, 1.0), (2, 1.0), (4, 1.0))))
)).toDF("id", "keys")

dfA.show(20, false)
/*
+---+-------------------------+
|id |keys                     |
+---+-------------------------+
|0  |(6,[0,1,2],[1.0,1.0,1.0])|
|1  |(6,[2,3,4],[1.0,1.0,1.0])|
|2  |(6,[0,2,4],[1.0,1.0,1.0])|
+---+-------------------------+
*/

val dfB = spark.createDataFrame(Seq(
  (3, Vectors.sparse(6, Seq((1, 1.0), (3, 1.0), (5, 1.0)))),
  (4, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (5, 1.0)))),
  (5, Vectors.sparse(6, Seq((1, 1.0), (2, 1.0), (4, 1.0))))
)).toDF("id", "keys")

// setNumHashTables is set to 3. These hash tables are the LSH structure
// discussed in the earlier post: raising this value lowers the false negative
// rate (fewer similar items get missed) but increases the computational cost,
// so pick a value that balances the two.
val mh = new MinHashLSH()
  .setNumHashTables(3)
  .setInputCol("keys")
  .setOutputCol("values")

val model = mh.fit(dfA)

// Feature transformation
model.transform(dfA).show(20, false)
/*
+---+-------------------------+------------------------------------------------------+
|id |keys                     |values                                                |
+---+-------------------------+------------------------------------------------------+
|0  |(6,[0,1,2],[1.0,1.0,1.0])|[[-2.031299587E9], [-1.974869772E9], [-1.974047307E9]]|
|1  |(6,[2,3,4],[1.0,1.0,1.0])|[[-2.031299587E9], [-1.758749518E9], [-4.86208737E8]] |
|2  |(6,[0,2,4],[1.0,1.0,1.0])|[[-2.031299587E9], [-1.758749518E9], [-1.974047307E9]]|
+---+-------------------------+------------------------------------------------------+
*/

// Approximate similarity join
// As the output below shows, the score column is named distCol by default
model.approxSimilarityJoin(dfA, dfB, 0.6).show(false)
/*
+---------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------+-------+
|datasetA                                                                                |datasetB                                                                                |distCol|
+---------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------+-------+
|[0, (6,[0,1,2],[1.0,1.0,1.0]), [[-2.031299587E9], [-1.974869772E9], [-1.974047307E9]]]  |[5, (6,[1,2,4],[1.0,1.0,1.0]), [[-2.031299587E9], [-1.974869772E9], [-1.230128022E9]]]  |0.5    |
|[1, (6,[2,3,4],[1.0,1.0,1.0]), [[-2.031299587E9], [-1.758749518E9], [-4.86208737E8]]]   |[4, (6,[2,3,5],[1.0,1.0,1.0]), [[-2.031299587E9], [-1.758749518E9], [-4.86208737E8]]]   |0.5    |
|[1, (6,[2,3,4],[1.0,1.0,1.0]), [[-2.031299587E9], [-1.758749518E9], [-4.86208737E8]]]   |[5, (6,[1,2,4],[1.0,1.0,1.0]), [[-2.031299587E9], [-1.974869772E9], [-1.230128022E9]]]  |0.5    |
|[2, (6,[0,2,4],[1.0,1.0,1.0]), [[-2.031299587E9], [-1.758749518E9], [-1.974047307E9]]]  |[5, (6,[1,2,4],[1.0,1.0,1.0]), [[-2.031299587E9], [-1.974869772E9], [-1.230128022E9]]]  |0.5    |
+---------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------+-------+
*/

// Cached, pre-transformed columns can also be passed in
val transformedA = model.transform(dfA).cache()
val transformedB = model.transform(dfB).cache()
model.approxSimilarityJoin(transformedA, transformedB, 0.6).show(false)
// (output identical to the join above)
```
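The distCol values above are Jaccard distances, i.e. 1 − |A ∩ B| / |A ∪ B|. As a quick sanity check (my own sketch, not part of the Spark example), the pair with id 1 and id 4 can be recomputed by hand from the index sets used to build the sparse vectors:

```scala
// Sanity check (not from the original example): exact Jaccard distance for
// rows id=1 (indices {2,3,4}) and id=4 (indices {2,3,5}) in plain Scala.
val a = Set(2, 3, 4)
val b = Set(2, 3, 5)
// intersection = {2, 3} (size 2), union = {2, 3, 4, 5} (size 4)
val jaccardDistance = 1.0 - (a & b).size.toDouble / (a | b).size
println(jaccardDistance) // 0.5, matching distCol in the join output above
```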
```scala
// Self join: the filter removes self-pairs and mirrored duplicates
model.approxSimilarityJoin(dfA, dfA, 0.6).filter("datasetA.id < datasetB.id").show(false)

// Approximate nearest neighbor search
val key = Vectors.sparse(6, Seq((1, 1.0), (3, 1.0)))
model.approxNearestNeighbors(dfA, key, 2).show(false)
```

Reference: https://spark.apache.org/docs/2.1.0/ml-features.html
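One small addendum of my own, not from the original post: approxNearestNeighbors returns the matching rows of the input dataset with an extra distCol column holding the distance to the query key, so a tidy (id, distance) view can be obtained roughly like this (a sketch, assuming the default distance column name "distCol"):

```scala
import org.apache.spark.sql.functions.col

// Sketch only: keep just the ids of the two nearest neighbours of `key`
// and their Jaccard distance to it.
model.approxNearestNeighbors(dfA, key, 2)
  .select(col("id"), col("distCol"))
  .show(false)
```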