页面树结构

2017-07-25 Apache Spark 2.2.0 官方文档中文版发布 : http://spark.apachecn.org/docs/cn/2.2.0/


MachineLearning 优酷地址 : http://i.youku.com/apachecn

转至元数据结尾
转至元数据起始

spark.mllib 提供了一系列的机器学习算法可以用来从数据中挖掘信息和进行预测。当这些算法被用来建立机器学习模型时,我们同样需要根据实际应用和需求去基于某些标准评估模型的效果。spark.mllib 同样提供了一整套指标用来满足评估机器学习模型效果的要求。

一些机器学习算法可被归类于更广泛的机器学习应用类型,比如分类问题、回归问题、聚类问题等。这里面每一种类型都有成熟的效果评估指标并且这些指标可以从 spark.mllib 中使用,在本章中将会仔细讲述。

 

Classification model evaluation(分类模型评估)

虽然分类算法的种类有很多,但是分类模型的评估方法的原理都比较相似。在一个有监督分类问题中,对于每一个数据点都会存在一个真实的分类值(标签)和一个模型生成的预测分类值。基于这两个值,每个数据点的结果都可以被归纳为以下四个类别中的一种:

  • True PositiveTP)- 真实标签为正并且预测正确
  • True NegativeTN)- 真实标签为负并且预测正确
  • False PositiveFP)- 真实标签为负但是预测为正
  • False NegativeFN)- 真实标签为正但是预测为负

上面四种值是大多数分类器评估指标的基础。如果仅仅依靠最基本的准确率(预测是对的还是错的)去评估一个分类器的效果的话,这往往不是一个好的指标,原因在于一个数据集可能各个类别的分布非常不平衡。举例来说,如果一个预测欺诈的模型是根据这样的数据集设计出来的:95%的数据点是非欺诈数据,5%的数据点是欺诈数据,那么一个简单的分类器去预测非欺诈时,不考虑输入的影响,它的准确率将会是95%。因此,我们经常会使用 precision 和 recall 这样的指标,因为他们会将“错误”的类型考虑进去。在大多数应用中,precision 和 recall 会被合并为一个指标来在他们之间进行一些平衡,我们称这个指标为 F-measure

 

Binary classification(二项分类)

二项分类器可以用来区分数据集中的元素到两个可能的组中(如欺诈和非欺诈),而且它是多项分类器的一种特殊情况。多数的二项分类评估指标可以被推广使用到多项分类评估中。

Threshold tuning(阈值调优)

很多分类模型对于每一个类实际上会输出一个“分数”(经常是一个概率值),其中较高的分数表示更高的可能性,理解这一点非常重要。在二项分类场景中,模型会为每个类别输出一个概率: 和 。并不是所有场景都会取更高概率的类别,有些场景下模型可能需要进行调整来使得它只有在某个概率非常高的情况下才去预测这个类别(如只有在模型预测的欺诈概率高于90%的情况下才屏蔽这笔信用卡交易)。所以,根据模型输出的概率来确定预测的类别是由预测的阈值来决定的。

调整预测阈值将会改变模型的 precisionrecall,而且它是模型调优的重要环节。为了能够将 precisionrecall 和其他评估指标根据阈值的改变是如何变化的这一情况可视化出来,一种常见的做法是通过阈值的参数化来绘制相互作对比的指标。一种图叫做 P-R 曲线,它会将 (precision, recall) 组成的点根据不同的阈值绘制出来,而另一种叫做 receiver operating characteristic 或者 ROC 曲线,它会绘制出 (recall, false positive rate) 组成的点。

可用的评估指标

指标定义
Precision(精确率)
Recall(召回率)
F-measure
Receiver Operating CharacteristicROC
Area Under ROC CurveROC 曲线下面积
Area Under Precision-Recall CurveP-R 曲线下面积)

举例

下面代码片段阐释了如何加载一个数据集样本,并对数据训练一个二项分类模型,最后通过几个二项评估指标来评价该模型的效果。

Scala
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils

// 以 LIBSVM 格式加载训练数据
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_binary_classification_data.txt")

// 分割数据为训练集(60%)和测试集(40%)
val Array(training, test) = data.randomSplit(Array(0.6, 0.4), seed = 11L)
training.cache()

// 运行训练算法来建模
val model = new LogisticRegressionWithLBFGS()
  .setNumClasses(2)
  .run(training)

// 清空预测阈值来使模型返回概率值
model.clearThreshold

// 使用测试集计算原始分数
val predictionAndLabels = test.map { case LabeledPoint(label, features) =>
  val prediction = model.predict(features)
  (prediction, label)
}

// 实例化指标对象
val metrics = new BinaryClassificationMetrics(predictionAndLabels)

// 按阈值分别求 precision
val precision = metrics.precisionByThreshold
precision.foreach { case (t, p) =>
  println(s"Threshold: $t, Precision: $p")
}

// 按阈值分别求 recall
val recall = metrics.recallByThreshold
recall.foreach { case (t, r) =>
  println(s"Threshold: $t, Recall: $r")
}

// Precision-Recall 曲线
val PRC = metrics.pr

// F-measure
val f1Score = metrics.fMeasureByThreshold
f1Score.foreach { case (t, f) =>
  println(s"Threshold: $t, F-score: $f, Beta = 1")
}

val beta = 0.5
val fScore = metrics.fMeasureByThreshold(beta)
f1Score.foreach { case (t, f) =>
  println(s"Threshold: $t, F-score: $f, Beta = 0.5")
}

// AUPRC
val auPRC = metrics.areaUnderPR
println("Area under precision-recall curve = " + auPRC)

// 计算 ROC 和 PR 曲线中使用的阈值
val thresholds = precision.map(_._1)

// ROC 曲线
val roc = metrics.roc

// AUROC
val auROC = metrics.areaUnderROC
println("Area under ROC = " + auROC)

 请参考 LogisticRegressionWithLBFGS Scala docs 和 BinaryClassificationMetrics Scala docs 以获取更多 API 相关细节

在Spark开源项目的“examples/src/main/scala/org/apache/spark/examples/mllib/BinaryClassificationMetricsExample.scala”路径下可以找到完整的例子代码。

 

Multiclass classification(多项分类)

多项分类描述了这样一种分类问题,对于每个数据点都有种可能的分类标签(当时为二项分类问题)。举例来说,区分手写样本为0到9的数字就是一种有10个可能分类的多项分类问题。

对于多类别评估指标,积极和消极的概念跟二项分类略有不同。预测值和实际分类仍然可以是积极的或消极的,但是它们必须被归类到某种特定的类别中。每一个类别标签和预测值都会分配给多个类别中的一个,所以它对于分配的类别来说是积极的,但对于其他类别则是消极的。因此,true positive 发生在预测值和实际分类相吻合的时候,而 true negative 发生在预测值和实际分类都不属于一个类别的时候。由于这个认识,对于一个数据样本而言存在多个 true negative。根据之前对于 positive 和 negative 的定义扩展到 false negative 和 false positive 时就相对直接了。

Label based metrics(基于标签的指标)

相对于只有两种可能分类的二项分类问题,多项分类问题有多种可能的类别,所以基于类别的评估指标概念则被引入。基于所有类别的 precision 的准确率计算方式为,任何类别被预测正确的次数(true positive)除以数据点的数量。基于一个类别的 precision 的计算方式为,某个特定的类别被预测正确的次数除以这个类别在结果中出现的次数。

可用的评估指标 

定义类别或标签序列为:

 

真实的输出向量包含个元素:

 

对于个元素,一个多类别预测算法会产生一个预测向量

 

一个修正的 delta 函数将会证明是有用的:

 

指标定义
Confusion Matrix(混淆矩阵)
Accuracy(准确率)
Precision by label(标签 precision
Recall by label(标签 recall
F-measure by label(标签 F-measure
Weighted precision(加权 precision
Weighted recall(加权 recall
Weighted F-measure(加权 F-measure

举例

下面代码片段阐释了如何加载一个数据集样本,并对数据训练一个多项分类模型,最后通过多项分类评估指标评估模型效果。

Scala
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils

// 以 LIBSVM 格式加载训练数据
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_multiclass_classification_data.txt")

// 分割数据为训练集(60%)和测试集(40%)
val Array(training, test) = data.randomSplit(Array(0.6, 0.4), seed = 11L)
training.cache()

// 运行训练算法来建模
val model = new LogisticRegressionWithLBFGS()
  .setNumClasses(3)
  .run(training)

// 使用测试集计算原始分数
val predictionAndLabels = test.map { case LabeledPoint(label, features) =>
  val prediction = model.predict(features)
  (prediction, label)
}

// 实例化指标对象
val metrics = new MulticlassMetrics(predictionAndLabels)

// 混淆矩阵
println("Confusion matrix:")
println(metrics.confusionMatrix)

// 总体统计结果
val accuracy = metrics.accuracy
println("Summary Statistics")
println(s"Accuracy = $accuracy")

// 基于类别计算 Precision
val labels = metrics.labels
labels.foreach { l =>
  println(s"Precision($l) = " + metrics.precision(l))
}

// 基于类别计算 Recall
labels.foreach { l =>
  println(s"Recall($l) = " + metrics.recall(l))
}

// 基于类别计算 False positive rate
labels.foreach { l =>
  println(s"FPR($l) = " + metrics.falsePositiveRate(l))
}

// 基于类别计算 F-measure
labels.foreach { l =>
  println(s"F1-Score($l) = " + metrics.fMeasure(l))
}

// 加权统计结果
println(s"Weighted precision: ${metrics.weightedPrecision}")
println(s"Weighted recall: ${metrics.weightedRecall}")
println(s"Weighted F1 score: ${metrics.weightedFMeasure}")
println(s"Weighted false positive rate: ${metrics.weightedFalsePositiveRate}")

参考 MulticlassMetrics Scala docs 来获取更多 API 相关细节

在Spark repo中的“examples/src/main/scala/org/apache/spark/examples/mllib/BinaryClassificationMetricsExample.scala”中查找完整示例代码。


Multilabel classification (多标签分类)

多标签分类问题指的是将一个数据集中的每一个样本映射到一组分类标签中去。在这一类分类问题中,各个标签所包含的样本并不是互斥的。举例来说,将一组新闻稿分类到不同的主题中去,一篇稿子可能既属于科技类,又属于政治类。

由于标签不是彼此互斥,预测值和真实的分类标签就成为了标签集合的向量,而非标签的向量。于是多标签评估指标就可以从基本的 precisionrecall 等概念扩展到对集合的操作上。例如,对于某个类的 true positive 就发生在当这个类别存在于某个数据点的预测值集合中,并且也存在于它的真实类别集合中时。

可用指标

我们定义一个集合包含个文档

定义为全体标签集合,为全体预测结果集合,L_i 和 P_i 分别为文档 d_i 的标签集合和预测值集合。

那么所有独立标签的集合则表示为

对于集合 A 的标识函数将会被使用

 

指标定义
Precision
Recall
Accuracy
Precision by label
Recall by label
F1-measure by label
Hamming Loss
Subset Accuracy
F1 Measure
Micro precision
Micro recall
Micro F1 Measure

 

举例

以下代码片段阐释了如何去评估一个多标签分类器的效果。下面的多标签分类例子中使用了伪造的预测数据和标签数据。

对于每一篇文章的预测结果为:

  • 文章 0 - 预测值为 0, 1 - 类别为 0, 2
  • 文章 1 - 预测值为 0, 2 - 类别为 0, 1
  • 文章 2 - 预测值为空 - 类别为 0
  • 文章 3 - 预测值为 2 - 类别为 2
  • 文章 4 - 预测值为 2, 0 - 类别为 2, 0
  • 文章 5 - 预测值为 0, 1, 2 - 类别为 0, 1
  • 文章 6 - 预测值为 1 - 类别为 1, 2

对于每一类别的预测结果为:

  • 类别 0 - 文章 0, 1, 4, 5(一共 4 篇)
  • 类别 1 - 文章 0, 5, 6(一共 3 篇)
  • 类别 2 - 文章 1, 3, 4, 5(一共 4 篇)

每一类别实际包含的文章为:

  • 类别 0 - 文章 0, 1, 2, 4, 5(一共 5 篇)
  • 类别 1 - 文章 1, 5, 6(一共 3 篇)
  • 类别 2 - 文章 0, 3, 4, 6(一共 4 篇)
Scala
import org.apache.spark.mllib.evaluation.MultilabelMetrics
import org.apache.spark.rdd.RDD

val scoreAndLabels: RDD[(Array[Double], Array[Double])] = sc.parallelize(
  Seq((Array(0.0, 1.0), Array(0.0, 2.0)),
    (Array(0.0, 2.0), Array(0.0, 1.0)),
    (Array.empty[Double], Array(0.0)),
    (Array(2.0), Array(2.0)),
    (Array(2.0, 0.0), Array(2.0, 0.0)),
    (Array(0.0, 1.0, 2.0), Array(0.0, 1.0)),
    (Array(1.0), Array(1.0, 2.0))), 2)

// 实例化指标对象
val metrics = new MultilabelMetrics(scoreAndLabels)

// 总体统计结果
println(s"Recall = ${metrics.recall}")
println(s"Precision = ${metrics.precision}")
println(s"F1 measure = ${metrics.f1Measure}")
println(s"Accuracy = ${metrics.accuracy}")

// 基于每一标签的统计结果
metrics.labels.foreach(label =>
  println(s"Class $label precision = ${metrics.precision(label)}"))
metrics.labels.foreach(label => println(s"Class $label recall = ${metrics.recall(label)}"))
metrics.labels.foreach(label => println(s"Class $label F1-score = ${metrics.f1Measure(label)}"))

// Micro stats
println(s"Micro recall = ${metrics.microRecall}")
println(s"Micro precision = ${metrics.microPrecision}")
println(s"Micro F1 measure = ${metrics.microF1Measure}")

// Hamming loss
println(s"Hamming loss = ${metrics.hammingLoss}")

// Subset accuracy
println(s"Subset accuracy = ${metrics.subsetAccuracy}")

参考  MultilabelMetrics Scala docs 以获取更多 API 相关细节

在 Spark 项目的“examples/src/main/scala/org/apache/spark/examples/mllib/MultiLabelMetricsExample.scala”路径下可以找到完整示例代码。

 

Ranking system(排名系统)

排名算法(通常被作为推荐系统算法)的作用是基于一些训练数据给用户返回一个相关物品或文章的集合。相关性的定义会根据不同的应用场景而有所差异。排名系统评估指标则用于在不同的场景中量化排名或者推荐的效果。某些指标会把系统推荐的文章集合跟实际相关的文章集合做对比,而另外一些指标会显式地结合数值打分。

可用的指标

一个排名系统通常会处理一个由个用户组成的集合

每一个用户有一个由个真正相关的文章组成的集合

并且还有一个由个推荐文章组成的有序集合,按照相关度降序排序

排序系统的目标就是给每个用户提供与其相关度最高的若干个文章集合。集合的相关度以及算法的有效性可以用下列评估指标进行度量。

度量定义说明
Precision at kPrecision at k 是用来度量对所有用户来说,平均前k个推荐的文章里有几个在真正相关的文章集合中出现。这个指标不考虑推荐的顺序问题。
Mean Average PrecisionMAP 是用来度量推荐的文章集合里有几个出现在真正相关的文章集合中,并且考虑推荐的顺序性(比如文章的相关性越高,它出错的惩罚就越大)。
Normalized Discounted Cumulative GainNDCG at k 是用来度量对所有用户来说,平均前k个推荐的文章里有几个在真正相关的文章集合中出现。跟 Precision at k 不同,这个指标考虑推荐的顺序性(假设文章按照相关性降序排序)。

 

举例

下面代码片段阐释了如何加载一个数据集样本,并使用数据训练一个交替最小二乘法推荐算法模型,最后通过几个排名评估指标来评价推荐系统效果。下面简单总结了一下使用的方法。

对于影片的打分在 1 - 5 分区间:

  • 5: 必看
  • 4: 会喜欢
  • 3: 还可以
  • 2: 不怎么样
  • 1: 烂片

那么如果预测的打分小于3分,我们将不会推荐这部影片。我们使用下面映射关系来表示置信分数:

  • 5 -> 2.5
  • 4 -> 1.5
  • 3 -> 0.5
  • 2 -> -0.5
  • 1 -> -1.5

这个映射关系表示没有被用户察觉到的影片通常在“还可以”和“不怎么样”之间。权重0在这里的语义表示“跟这个影片从来没有过任何互动”。

Scala
import org.apache.spark.mllib.evaluation.{RankingMetrics, RegressionMetrics}
import org.apache.spark.mllib.recommendation.{ALS, Rating}

// 读取打分数据
val ratings = spark.read.textFile("data/mllib/sample_movielens_data.txt").rdd.map { line =>
  val fields = line.split("::")
  Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble - 2.5)
}.cache()

// 将打分映射到0或者1,1表示影片应该被推荐
val binarizedRatings = ratings.map(r => Rating(r.user, r.product,
  if (r.rating > 0) 1.0 else 0.0)).cache()

// 评分的统计结果
val numRatings = ratings.count()
val numUsers = ratings.map(_.user).distinct().count()
val numMovies = ratings.map(_.product).distinct().count()
println(s"Got $numRatings ratings from $numUsers users on $numMovies movies.")

// 建模
val numIterations = 10
val rank = 10
val lambda = 0.01
val model = ALS.train(ratings, rank, numIterations, lambda)

// 定义一个函数将打分映射到0和1区间
def scaledRating(r: Rating): Rating = {
  val scaledRating = math.max(math.min(r.rating, 1.0), 0.0)
  Rating(r.user, r.product, scaledRating)
}

// 获取排序后对于每个用户前十个预测值,并映射到0和1区间
val userRecommended = model.recommendProductsForUsers(10).map { case (user, recs) =>
  (user, recs.map(scaledRating))
}

// 假设任何被用户打分为3分或更高(映射到1)的影片为一个相关的影片
// 并与前十个最相关的影片做对比
val userMovies = binarizedRatings.groupBy(_.user)
val relevantDocuments = userMovies.join(userRecommended).map { case (user, (actual,
predictions)) =>
  (predictions.map(_.product), actual.filter(_.rating > 0.0).map(_.product).toArray)
}

// 实例化一个评估指标模型
val metrics = new RankingMetrics(relevantDocuments)

// Precision at K
Array(1, 3, 5).foreach { k =>
  println(s"Precision at $k = ${metrics.precisionAt(k)}")
}

// Mean average precision
println(s"Mean average precision = ${metrics.meanAveragePrecision}")

// Normalized discounted cumulative gain
Array(1, 3, 5).foreach { k =>
  println(s"NDCG at $k = ${metrics.ndcgAt(k)}")
}

// 获得每个数据点的 Prediction
val allPredictions = model.predict(ratings.map(r => (r.user, r.product))).map(r => ((r.user,
  r.product), r.rating))
val allRatings = ratings.map(r => ((r.user, r.product), r.rating))
val predictionsAndLabels = allPredictions.join(allRatings).map { case ((user, product),
(predicted, actual)) =>
  (predicted, actual)
}

// 使用回归评估指标获得 RMSE
val regressionMetrics = new RegressionMetrics(predictionsAndLabels)
println(s"RMSE = ${regressionMetrics.rootMeanSquaredError}")

// R-squared
println(s"R-squared = ${regressionMetrics.r2}")

参考 RegressionMetrics Scala docs 和 RankingMetrics Scala docs 获取更多 API 相关细节

在 Spark 项目的“examples/src/main/scala/org/apache/spark/examples/mllib/RankingMetricsExample.scala”路径下可以找到完整示例代码。

 

Regression model evaluation(回归模型评估)

Regression analysis(回归分析)被用于从一系列自变量中预测连续的因变量的场景中。

可用的指标

指标定义
Mean Squared Error (MSE)
Root Mean Squared Error (RMSE)
Mean Absolute Error (MAE)
Coefficient of Determination
Explained Variance

 

举例

下面代码片段阐释了如何加载一个数据集样本,并使用数据训练一个线性回归算法模型,最后通过几个回归评估指标来对模型效果进行评价的过程。

Scala
import org.apache.spark.mllib.evaluation.RegressionMetrics
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD}

// 加载数据
val data = spark
  .read.format("libsvm").load("data/mllib/sample_linear_regression_data.txt")
  .rdd.map(row => LabeledPoint(row.getDouble(0), row.get(1).asInstanceOf[Vector]))
  .cache()

// 建模
val numIterations = 100
val model = LinearRegressionWithSGD.train(data, numIterations)

// 获取预测值
val valuesAndPreds = data.map{ point =>
  val prediction = model.predict(point.features)
  (prediction, point.label)
}

// 实例化一个回归指标对象
val metrics = new RegressionMetrics(valuesAndPreds)

// Squared error
println(s"MSE = ${metrics.meanSquaredError}")
println(s"RMSE = ${metrics.rootMeanSquaredError}")

// R-squared
println(s"R-squared = ${metrics.r2}")

// Mean absolute error
println(s"MAE = ${metrics.meanAbsoluteError}")

// Explained variance
println(s"Explained variance = ${metrics.explainedVariance}")

请参考 RegressionMetrics Scala docs 来获取更多 API 相关细节

在 Spark 项目的“examples/src/main/scala/org/apache/spark/examples/mllib/RegressionMetricsExample.scala”路径下可以找到完整示例代码。

 

  • 无标签