Spark的分层聚集聚类

如何解决Spark的分层聚集聚类

我正在使用Spark和Scala进行项目,并且正在寻找一种分层聚类算法,该算法类似于scipy.cluster.hierarchy.fcluster或sklearn.cluster.AgglomerativeClustering,它们可用于大量数据

Spark的MLlib实现了平分k均值,它需要输入簇数。不幸的是,在我的情况下,我不知道簇的数量,我更愿意使用一些距离阈值作为输入参数,因为可以在上述两个python实现中使用它。

如果有人知道答案,我将非常感激。

解决方法

所以我遇到了同样的问题,经过高低查找后没有找到答案,所以我将在这里发布我所做的,希望它可以帮助其他人,也许有人会以此为基础。

我所做的基本思想是递归地使用二分 K 均值来继续将簇一分为二,直到簇中的所有点都离质心指定距离为止。我使用的是 gps 数据,所以我有一些额外的机器来处理。

第一步是创建一个将数据减半的模型。我使用了平分 K 均值,但我认为这适用于任何 pyspark 聚类方法,只要您可以获得到质心的距离。

import pyspark.sql.functions as f
from pyspark import SparkContext,SQLContext
from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler


bkm = BisectingKMeans().setK(2).setSeed(1)
assembler = VectorAssembler(inputCols=['lat','long'],outputCol="features")
adf = assembler.transform(locAggDf)#locAggDf contains my location info
model = bkm.fit(adf)
# predictions will have the original data plus the "features" col which assigns a cluster number
predictions = model.transform(adf)
predictions.persist()

下一步是我们的递归函数。这里的想法是我们指定距质心的一些距离,如果集群中的任何点比该距离更远,我们将集群切成两半。当一个集群足够紧密以至于它满足条件时,我将它添加到我用来构建最终集群的结果数组中

def bisectToDist(model,predictions,bkm,precision,result = []):
    centers = model.clusterCenters()
    # row[0] is predictedClusterNum,row[1] is unit,row[2] point lat,row[3] point long
    # centers[row[0]] is the lat long of center,centers[row[0]][0] = lat,centers[row[0]][1] = long
    distUdf = f.udf(
        lambda row: getDistWrapper((centers[row[0]][0],centers[row[0]][1],row[1]),(row[2],row[3],row[1])),FloatType())##getDistWrapper(is how I calculate the distance of lat and long but you can define any distance metric)
    predictions = predictions.withColumn('dist',distUdf(
        f.struct(predictions.prediction,predictions.encodedPrecisionUnit,predictions.lat,predictions.long)))
    #create a df of all rows that were in clusters that had a point outside of the threshold
    toBig = predictions.join(
        predictions.groupby('prediction').agg({"dist": "max"}).filter(f.col('max(dist)') > self.precision).select(
            'prediction'),['prediction'],'leftsemi')


    #this could probably be improved
    #get all cluster numbers that were to big
    listids = toBig.select("prediction").distinct().rdd.flatMap(lambda x: x).collect()

    #if all data points are within the speficed distance of the centroid we can return the clustering
    if len(listids) == 0:
        return predictions

    # assuming binary class now k must be = 2
    # if one of the two clusters was small enough we will not have another recusion call for that cluster
    # we must save it and return it at this depth the clustiering that was 2 big will be cut in half in the loop below
    if len(listids) == 1:
        ok = predictions.join(
            predictions.groupby('prediction').agg({"dist": "max"}).filter(
                f.col('max(dist)') <= precision).select(
                'prediction'),'leftsemi')


    for clusterId in listids:
        # get all of the pieces that were to big
        part = toBig.filter(toBig.prediction == clusterId)
        
        # we now deed to refit the subset of the data
        assembler = VectorAssembler(inputCols=['lat',outputCol="features")
        adf = assembler.transform(part.drop('prediction').drop('features').drop('dist'))
        model = bkm.fit(adf)
        #predictions now holds the new subclustering and we are ready for recursion
        predictions = model.transform(adf)
        result.append(bisectToDist(model,result=result))

    #return anything that was given and already good

    if len(listids) == 1:
        return ok

最后我们可以调用函数并构建结果数据帧

result = []
self.bisectToDist(model,result=result)
#drop any nones can happen in recursive not top level call
result =[r for r in result if r]


r = result[0]
r = r.withColumn('subIdx',f.lit(0))
result = result[1:]
idx = 1
for r1 in result:
    r1 = r1.withColumn('subIdx',f.lit(idx))
    r = r.unionByName(r1)
    idx = idx + 1

# each of the subclusters will have a 0 or 1 classification in order to make it 0 - n I added the following
r = r.withColumn('delta',r.subIdx * 100 + r.prediction)
r = r.withColumn('delta',r.delta - f.lag(r.delta,1).over(Window.orderBy("delta"))).fillna(0)
r = r.withColumn('ddelta',f.when(r.delta != 0,1).otherwise(0))
r = r.withColumn('spacialLocNum',f.sum('ddelta').over(Window.orderBy(['subIdx','prediction'])))
#spacialLocNum should be the final clustering 

诚然,这相当复杂且缓慢,但它确实完成了工作,希望对您有所帮助!

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其他元素将获得点击?
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。)
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbcDriver发生异常。为什么?
这是用Java进行XML解析的最佳库。
Java的PriorityQueue的内置迭代器不会以任何特定顺序遍历数据结构。为什么?
如何在Java中聆听按键时移动图像。
Java“Program to an interface”。这是什么意思?
Java在半透明框架/面板/组件上重新绘画。
Java“ Class.forName()”和“ Class.forName()。newInstance()”之间有什么区别?
在此环境中不提供编译器。也许是在JRE而不是JDK上运行?
Java用相同的方法在一个类中实现两个接口。哪种接口方法被覆盖?
Java 什么是Runtime.getRuntime()。totalMemory()和freeMemory()?
java.library.path中的java.lang.UnsatisfiedLinkError否*****。dll
JavaFX“位置是必需的。” 即使在同一包装中
Java 导入两个具有相同名称的类。怎么处理?
Java 是否应该在HttpServletResponse.getOutputStream()/。getWriter()上调用.close()?
Java RegEx元字符(。)和普通点?