页面树结构

2017-07-25 Apache Spark 2.2.0 官方文档中文版发布 : http://spark.apachecn.org/docs/cn/2.2.0/


MachineLearning 优酷地址 : http://i.youku.com/apachecn

转至元数据结尾
转至元数据起始

聚类是一个无监督的学习问题,我们的目标是基于一些相似性的概念将实体的子集相互组合。聚类经常用于探索性分析和/或作为分层 监督学习流水线的组成部分(其中针对每个群集训练不同的分类器或回归模型)。

 该spark.mllib 软件包支持以下模型:

K-means

K-means是将数据点聚类到预定数量的聚类中最常用的聚类算法之一。该spark.mllib实现包括一个称为kmeans ||k-means ++方法的并行变体 。在spark.mllib中,这个实现具有以下参数:

  • k是所需簇的数量。请注意,可以返回少于k个集群,例如,如果有少于k个不同的集群点。
  • maxIterations是要运行的最大迭代次数。
  • initializationMode通过k-means ||指定随机初始化或初始化。
  • run这个参数从Spark 2.0.0起没有效果。
  • initializationSteps确定k-means ||中的步数 算法。
  • epsilon决定了我们认为k-means收敛的距离阈值。
  • initialModel是用于初始化的一组可选集群。如果提供此参数,则只执行一次运行。

例子(Scala)

         可以使用spark-shell执行以下代码片段。

         在加载和解析数据后的以下示例中,我们使用该 KMeans对象将数据集群到两个集群中。所需的簇的数量被传递给算法。然后我们计算平方误差的集合和(WSSSE)。

         你可以通过增加k来减少此误差度量。实际上,最优的k通常是WSSSE图中有一个“elbow”。

Scala
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel}
import org.apache.spark.mllib.linalg.Vectors

//加载和解析数据
val data = sc.textFile("data/mllib/kmeans_data.txt")
val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache()

// 使用KMeans将数据集成到2个类中
val numClusters = 2
val numIterations = 20
val clusters = KMeans.train(parsedData, numClusters, numIterations)

// 通过计算在平方误差的总和中评估聚类
val WSSSE = clusters.computeCost(parsedData)
println("Within Set Sum of Squared Errors = " + WSSSE)

// 保存和加载模型
clusters.save(sc, "target/org/apache/spark/KMeansExample/KMeansModel")
val sameModel = KMeansModel.load(sc, "target/org/apache/spark/KMeansExample/KMeansModel") 

 有关API的详细信息,请参阅KMeansScala文档KMeansModelScala文档

在Spark repo中的"examples/src/main/scala/org/apache/spark/examples/mllib/KMeansExample.scala"中查找完整示例代码。

 注:kmeans_data.txt 链接   


高斯混合

高斯混合模型代表一个复合分布,由此点是从一个绘制ķ高斯子分布,每个具有其自己的概率。该spark.mllib实现使用 期望最大化算法来给出给定一组样本的最大似然模型。这个实现具有以下参数:

  • k是所需簇的数量。
  • convergenceTol是我们考虑收敛的对数似然的最大变化。
  • maxIterations是不达到收敛的最大迭代次数。
  • initialModel是启动EM算法的可选起始点。如果省略此参数,将从数据中构建一个随机起始点。

例子(Scala)

在加载和解析数据之后的以下示例中,我们使用 GaussianMixture对象将数据集群到两个集群中。所需的簇的数量被传递给算法。然后我们输出混合模型的参数。

Scala
import org.apache.spark.mllib.clustering.{GaussianMixture, GaussianMixtureModel}
import org.apache.spark.mllib.linalg.Vectors

// 加载和解析数据
val data = sc.textFile("data/mllib/gmm_data.txt")
val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble))).cache()

// 使用GaussianMixture将数据分成两类
val gmm = new GaussianMixture().setK(2).run(parsedData)

// 保存和加载模型
gmm.save(sc, "target/org/apache/spark/GaussianMixtureExample/GaussianMixtureModel")
val sameModel = GaussianMixtureModel.load(sc,
  "target/org/apache/spark/GaussianMixtureExample/GaussianMixtureModel")

// 输出最大似然模型的参数
for (i <- 0 until gmm.k) {
  println("weight=%f\nmu=%s\nsigma=\n%s\n" format
    (gmm.weights(i), gmm.gaussians(i).mu, gmm.gaussians(i).sigma))
}

有关API的详细信息,请参阅GaussianMixtureScala文档GaussianMixtureModelScala文档

可以在Spark repo中找到"examples/src/main/scala/org/apache/spark/examples/mllib/GaussianMixtureExample.scala"完整的示例代码  
注:gmm_data.txt 链接      

 

幂迭代聚类(PIC)

       幂迭代聚类(PIC)是一种可扩展和高效的算法,用于将图中的顶点聚类为给定的边缘属性的成对相似性,Lin和Cohen,Power Iteration Clustering中描述。它通过功率迭代计算图的归一化亲和度矩阵的伪特征向量,并将 其用于聚类顶点。spark.mllib包括使用GraphX作为其后端的PIC的实现。它需要一个RDD(srcId, dstId, similarity)元组,并输出与该聚类分配的模型。相似之处必须是非负的。PIC假定相似性度量是对称的。(srcId, dstId)输入数据中最多只能出现一对,而不管排序如何。如果输入中缺少一对,则将其相似性视为零。 spark.mllib

  • k:簇数
  • maxIterations:最大功率迭代次数
  • initializationMode:初始化模型 这可以是使用随机向量作为顶点属性的“随机”,它是默认的,或者“度”使用归一化的相似度。

例子(Scala)

下面我们来看一下代码片段演示如何在spark.mllib使用PIC 。

   PowerIterationClustering 实现PIC算法。它需要代表亲和度矩阵RDD(srcId: Long, dstId: Long, similarity: Double)元组。

       调用PowerIterationClustering.run返回a PowerIterationClusteringModel,其中包含计算的聚类分配。

Scala
import org.apache.spark.mllib.clustering.PowerIterationClustering

val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints)
val model = new PowerIterationClustering()
  .setK(params.k)
  .setMaxIterations(params.maxIterations)
  .setInitializationMode("degree")
  .run(circlesRdd)

val clusters = model.assignments.collect().groupBy(_.cluster).mapValues(_.map(_.id))
val assignments = clusters.toList.sortBy { case (k, v) => v.length }
val assignmentsStr = assignments
  .map { case (k, v) =>
    s"$k -> ${v.sorted.mkString("[", ",", "]")}"
  }.mkString(", ")
val sizesStr = assignments.map {
  _._2.length
}.sorted.mkString("(", ",", ")")
println(s"Cluster assignments: $assignmentsStr\ncluster sizes: $sizesStr")

有关API的详细信息,请参阅PowerIterationClusteringScala文档PowerIterationClusteringModelScala文档 

可以在Spark repo中找到 "examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala"完整的示例代码。


潜在Dirichlet分配(LDA)

 潜在的Dirichlet分配(LDA) 是一个主题模型,从一组文本文档推断主题。LDA可以被认为是聚类算法如下:

  •  主题对应于集群中心,文档对应于数据集中的示例(行)。
  •  主题和文档都存在于特征空间中,其中特征向量是字计数(单词包)的向量。
  •  LDA不是使用传统距离来估计聚类,而是使用基于如何生成文本文档的统计模型的函数。

               LDA通过setOptimizer功能支持不同的推理算法。 EMLDAOptimizer使用 期望最大化 对似然函数进行聚类,并产生综合结果,

               同时 OnlineLDAOptimizer使用迭代小批量抽样进行在线变分推理 ,通常内存友好。

LDA将文档集合作为字计数的向量和以下参数(使用构建器模式设置):

  • k:主题数量(即集群中心)
  • optimizer:用于学习LDA模型的优化器,或者 EMLDAOptimizerOnlineLDAOptimizer
  • docConcentration:Dirichlet参数,用于事先通过主题分发的文档。较大的值会鼓励更平稳的推断分布。
  • topicConcentration:Dirichlet参数,用于事先通过术语(词)分配主题。较大的值会鼓励更平稳的推断分布。
  • maxIterations:限制迭代次数。
  • checkpointInterval:如果使用检查点(在Spark配置中设置),则此参数指定将创建检查点的频率。如果maxIterations大,使用检查点可以帮助减少磁盘上的随机文件大小,并帮助恢复故障。

所有的spark.mllib的LDA型号都支持:

  • describeTopics:将主题作为最重要术语和术语权重的数组返回
  • topicsMatrix:返回一个vocabSizek矩阵,其中各列是一个主题

       注意:LDA仍然是积极开发的实验功能。因此,某些功能仅在优化程序生成的两个优化器/模型之一中可用。目前,分布式模型可以转换为本地模型,但反之亦然。

       以下讨论将分别描述每个优化器/模型对。

期望最大化

       在 EMLDAOptimizer 和 DistributedLDAModel中的实现

       为LDA提供的参数:

  • docConcentration:只支持对称先验,所以提供的k维度向量中的所有值必须相同。所有值也必须为$ 1.0 $。
                                   提供默认行为的结果(均值维矢量,值为$(50 / k)+ 1 $>1.0  >1.0Vector(-1)k(50/k)+1  (50/k)+1
  • topicConcentration:只支持对称先验。值必须为$ 1.0 $。提供结果,默认值为$ 0.1 + 1 $。>1.0  >1.0-10.1+1  0.1+1
  • maxIterations:EM迭代的最大数量。

注意:重复执行足够的迭代。在早期迭代中,EM通常有无用的主题,但是这些主题在更多的迭代之后会显着改善。使用至少20次和可能的50-100次迭代通常是合理的,具体取决于您的数据集。

       EMLDAOptimizer产生一个DistributedLDAModel,它不仅存储推断的主题,而且存储训练语料库中每个文档的完整的训练语料库和主题分布。一个 DistributedLDAModel的支持:

  • topTopicsPerDocument:训练语料库中每个文档的主题及其权重
  • topDocumentsPerTopic:每个主题的顶部文档以及文档中主题的相应权重。
  • logPrior:考虑到超参数的估计主题和文档的主题分布数概率 docConcentrationtopicConcentration
  • logLikelihood:训练语料库的对数可能性,给出推断的主题和文档主题分布

在线变异贝叶斯

       在 OnlineLDAOptimizer 和 LocalLDAModel中的实现

       为LDA提供的参数:

  • docConcentrationk>=0  >=0Vector(-1)k(1.0/k)  (1.0/k)
  • topicConcentration>=0  >=0-1(1.0/k)  (1.0/k)
  • maxIterations:要提交的最大批量数量。

        另外,OnlineLDAOptimizer接受以下参数:

  • miniBatchFraction:每次迭代采样和使用语料库的分数
  • optimizeDocConcentration:如果设置为true,则在每个minibatch之后执行超参数docConcentration(aka alpha)的最大似然估计docConcentration,并将返回的优化LocalLDAModel
  • tau0kappa(τ  0 +iter) κ   (τ0+iter)−κiter  iter

   OnlineLDAOptimizer产生一个LocalLDAModel,它只存储推断的主题。一个 LocalLDAModel支持:

  • logLikelihood(documents):计算所提供documents给定推断主题的下限 。
  • logPerplexity(documents):计算提供的documents给定推断主题的困惑度的上限。

例子(Scala)

            在以下示例中,我们加载表示文档语料库的字数向量。然后,我们使用LDA 从文档中推断出三个主题。所需的簇的数量被传递给算法。然后我们输出主题,表示为词的概率分布。

Scala
import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA}
import org.apache.spark.mllib.linalg.Vectors

// 加载和解析数据
val data = sc.textFile("data/mllib/sample_lda_data.txt")
val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble)))
// 具有唯一ID的索引文档
val corpus = parsedData.zipWithIndex.map(_.swap).cache()

// 使用LDA将文档集成到三个主题中
val ldaModel = new LDA().setK(3).run(corpus)

// 输出主题。每个都是一个分配的单词(匹配的单词向量)
println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize + " words):")
val topics = ldaModel.topicsMatrix
for (topic <- Range(0, 3)) {
  print("Topic " + topic + ":")
  for (word <- Range(0, ldaModel.vocabSize)) { print(" " + topics(word, topic)); }
  println()
}

// 保存和加载模型
ldaModel.save(sc, "target/org/apache/spark/LatentDirichletAllocationExample/LDAModel")
val sameModel = DistributedLDAModel.load(sc,
  "target/org/apache/spark/LatentDirichletAllocationExample/LDAModel")
 

有关API的详细信息,请参阅LDAScala文档DistributedLDAModelScala文档

可以在Spark repo中找到"examples/src/main/scala/org/apache/spark/examples/mllib/LatentDirichletAllocationExample.scala" 的完整示例代码。

注:sample_lda_data.txt 链接

 

平分k-means

          平均K均值通常比常规K均值快得多,但通常会产生不同的聚类。

          平分k均值是一种层次聚类。分层聚类是集群分析中最常用的方法之一,它旨在构建集群的层次结构。层次聚类的策略通常分为两种:

  • 集合:这是一种“自下而上”的方法:每个观察在自己的集群中开始,并且一组聚类在层次结构上移动时被合并。
  • 分裂:这是一种“自上而下”的方法:所有观察都从一个群集中开始,并且分层在层级结构向下移动时递归执行。

          二分法k均值算法是一种分裂算法。MLlib中的实现具有以下参数:

  • k:所需的叶簇数(默认值:4)。如果没有可分割的叶簇,实际数量可能更小。
  • maxIterations:拆分集群的k-means迭代的最大数量(默认值:20)
  • minDivisibleClusterSize:可分组的最小点数(如果> = 1.0)或点的最小比例(如果<1.0)(默认值:1)
  • 种子:一个随机种子(默认值:类名的哈希值)

例子(Scala)

Scala
import org.apache.spark.mllib.clustering.BisectingKMeans
import org.apache.spark.mllib.linalg.{Vector, Vectors}

// 加载和解析数据
def parse(line: String): Vector = Vectors.dense(line.split(" ").map(_.toDouble))
val data = sc.textFile("data/mllib/kmeans_data.txt").map(parse).cache()

// 通过BisectingKMeans将数据聚类成6个簇。
val bkm = new BisectingKMeans().setK(6)
val model = bkm.run(data)

// 显示计算成本和集群中心
println(s"Compute Cost: ${model.computeCost(data)}")
model.clusterCenters.zipWithIndex.foreach { case (center, idx) =>
  println(s"Cluster Center ${idx}: ${center}")
}

 有关API的详细信息,请参阅BisectingKMeansScala文档BisectingKMeansModelScala文档

可以在Spark repo中找到"examples/src/main/scala/org/apache/spark/examples/mllib/BisectingKMeansExample.scala"完整的示例代码。

注:kmeans_data.txt 链接

 

Streaming k-means

       当数据到达流时,我们可能需要动态地估计集群,并在新数据到达时进行更新。spark.mllib提供对流式k均值聚类的支持,其中包含用于控制估计的衰减(或“健忘”)的参数。该算法使用小批量k-means更新规则的泛化。对于每批数据,我们将所有点分配给最近的集群,计算新的集群中心,然后使用以下方式更新每个集群:

   

例子

       此示例显示如何估计流数据上的集群。

Scala
import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("StreamingKMeansExample")
val ssc = new StreamingContext(conf, Seconds(args(2).toLong))

val trainingData = ssc.textFileStream(args(0)).map(Vectors.parse)
val testData = ssc.textFileStream(args(1)).map(LabeledPoint.parse)

val model = new StreamingKMeans()
  .setK(args(3).toInt)
  .setDecayFactor(1.0)
  .setRandomCenters(args(4).toInt, 0.0)

model.trainOn(trainingData)
model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

ssc.start()
ssc.awaitTermination()

      可以在Spark repo中找到"examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeansExample.scala" 完整的示例代码。

     当您添加具有数据的新文本文件时,集群中心将更新。 每个训练点应该格式化为[x1,x2,x3],每个测试数据点应该被格式化为(y,[x1,x2,x3]),其中y是一些有用的标签或标识符(例如真正的类别分配 )。 任何时候,文本文件放在/training/data/dir中,模型将会更新。 任何时候,文本文件都放在/testing/data/dir中,您将看到预测。 使用新的数据,集群中心将改变!

 

  • 无标签