页面树结构

2017-07-25 Apache Spark 2.2.0 官方文档中文版发布 : http://spark.apachecn.org/docs/cn/2.2.0/


MachineLearning 优酷地址 : http://i.youku.com/apachecn

转至元数据结尾
转至元数据起始

本节介绍使用功能的算法,大致分为以下几类:

  • 提取:从“原始”数据中提取特征
  • 转换:缩放,转换或修改特征
  • 选择:从较大的一组特征中选择一个子集
  • 局部敏感哈希(LSH):这类算法将特征变换的方面与其他算法相结合。

目录

Feature Extractors(特征提取)

TF-IDF(词频-逆向文档频率)

词频(Term Frequency)- 逆向文档频率(Inverse Document Frequency 是一种在文本挖掘中广泛使用的特征向量化方法,以反映一个单词在语料库中的重要性。定义:t 表示由一个单词,表示一个文档,表示语料库(corpus),词频 TF(t,d) 表示某一个给定的单词 t 出现在文档中的次数, 而文档频率 DF(t,D) 表示包含单词 t  的文档次数。如果我们只使用词频 TF 来衡量重要性,则很容易过分强调出现频率过高并且文档包含少许信息的单词,例如,'a','the',和 'of'。如果一个单词在整个语料库中出现的非常频繁,这意味着它并没有携带特定文档的某些特殊信息(换句话说,该单词对整个文档的重要程度低)。逆向文档频率是一个数字量度,表示一个单词提供了多少信息:

 其中,|D| 是在语料库中文档总数。由于使用对数,所以如果一个单词出现在所有的文件,其IDF值变为0。注意,应用平滑项以避免在语料库之外的项除以零(为了防止分母为0,分母需要加1)。因此,TF-IDF测量只是TF和IDF的产物:(对TF-IDF定义为TF和IDF的乘积)

 关于词频TF和文档频率DF的定义有多种形式。在MLlib,我们分离TF和IDF,使其灵活。

TF(词频Term Frequency)HashingTFCountVectorizer都可以用于生成词频TF向量。

HashingTF是一个需要特征词集的转换器(Transformer),它可以将这些集合转换成固定长度的特征向量。在文本处理中,“特征词集”有一系列的特征词构成。HashingTF利用hashing trick,原始特征(raw feature)通过应用哈希函数映射到索引(术语)中。这里使用的哈希函数是murmurHash 3。然后根据映射的索引计算词频。这种方法避免了计算全局特征词对索引映射的需要,这对于大型语料库来说可能是昂贵的,但是它具有潜在的哈希冲突,其中不同的原始特征可以在散列之后变成相同的特征词。为了减少碰撞的机会,我们可以增加目标特征维度,即哈希表的桶数。由于使用简单的模数将散列函数转换为列索引,建议使用两个幂作为特征维,否则不会将特征均匀地映射到列。默认功能维度为。可选的二进制切换参数控制词频计数。当设置为true时,所有非零频率计数设置为1。这对于模拟二进制而不是整数的离散概率模型尤其有用。

CountVectorizer将文本文档转换为关键词计数的向量。有关详细信息,请参阅CountVectorizer 

IDF(逆向文档频率)IDF是一个适合数据集并生成IDFModel的评估器Estimator),IDFModel获取特征向量(通常由HashingTF或CountVectorizer创建)并缩放每列。直观地说,它下调了在语料库中频繁出现的列。

注: spark.ml不提供文本分割的工具。我们推荐用户参考Stanford NLP Group 和 scalanlp/chalk.

例子 

在下面的代码段中,我们从一组句子开始。我们使用Tokenizer将每个句子分成单词。对于每个句子(词袋,词集:bag of words),我们使用HashingTF将该句子哈希成特征向量。我们使用IDF来重新缩放特征向量;这通常会在使用文本作为功能时提高性能。然后,我们的特征向量可以被传递给学习算法。

Scala
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}

val sentenceData = spark.createDataFrame(Seq(
  (0.0, "Hi I heard about Spark"),
  (0.0, "I wish Java could use case classes"),
  (1.0, "Logistic regression models are neat")
)).toDF("label", "sentence")

val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val wordsData = tokenizer.transform(sentenceData)

val hashingTF = new HashingTF()
  .setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(20)

val featurizedData = hashingTF.transform(wordsData)
// alternatively, CountVectorizer can also be used to get term frequency vectors

val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
val idfModel = idf.fit(featurizedData)

val rescaledData = idfModel.transform(featurizedData)
rescaledData.select("label", "features").show()

有关API的更多详细信息,请参阅HashingTF Scala文档 IDF Scala文档 

在Spark repo中"examples/src/main/scala/org/apache/spark/examples/ml/TfIdfExample.scala"下可以找到完整的示例代码。

 

Word2Vec

Word2Vec是一个Estimator(评估器),它采用表示文档的单词序列,并训练一个Word2VecModel。 该模型将每个单词映射到一个唯一的固定大小向量。 Word2VecModel使用文档中所有单词的平均值将每个文档转换为向量; 该向量然后可用作预测,文档相似性计算等功能。有关更多详细信息,请参阅有关Word2Vec的MLlib用户指南

在下面的代码段中,我们从一组文档开始,每一个文档都用一个单词序列表示。 对于每个文档,我们将其转换为特征向量。 然后可以将该特征向量传递给学习算法。

Scala
import org.apache.spark.ml.feature.Word2Vec
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

// Input data: Each row is a bag of words from a sentence or document.
val documentDF = spark.createDataFrame(Seq(
  "Hi I heard about Spark".split(" "),
  "I wish Java could use case classes".split(" "),
  "Logistic regression models are neat".split(" ")
).map(Tuple1.apply)).toDF("text")

// Learn a mapping from words to Vectors.
val word2Vec = new Word2Vec()
  .setInputCol("text")
  .setOutputCol("result")
  .setVectorSize(3)
  .setMinCount(0)
val model = word2Vec.fit(documentDF)

val result = model.transform(documentDF)
result.collect().foreach { case Row(text: Seq[_], features: Vector) =>
  println(s"Text: [${text.mkString(", ")}] => \nVector: $features\n") }

有关API的更多详细信息,请参阅Word2Vec Scala文档

在Spark repo中的"examples/src/main/scala/org/apache/spark/examples/ml/Word2VecExample.scala"中查找完整示例代码。

 

CountVectorizer

CountVectorizer和CountVectorizerModel旨在帮助将文本文档集合转换为标记数的向量。 当先验词典不可用时,CountVectorizer可以用作估计器来提取词汇表,并生成CountVectorizerModel。 该模型通过词汇生成文档的稀疏表示,然后可以将其传递给其他算法,如LDA。

在拟合过程中,CountVectorizer将选择通过语料库按术语频率排序的top前几vocabSize词。 可选参数minDF还通过指定术语必须出现以包含在词汇表中的文档的最小数量(或小于1.0)来影响拟合过程。 另一个可选的二进制切换参数控制输出向量。 如果设置为true,则所有非零计数都设置为1.对于模拟二进制而不是整数的离散概率模型,这是非常有用的。

Examples

假设我们有如下的DataFrame包含id和texts两列:

 id | texts
----|----------
 0  | Array("a", "b", "c")
 1  | Array("a", "b", "b", "c", "a")

文本中的每一行都是Array[String]类型的文档。调用CountVectorizer的拟合产生一个具有词汇表(a, b, c)的CountVectorizerModel。然后转换后的输出列 包含“向量”这一列:

 id | texts                           | vector
----|---------------------------------|---------------
 0  | Array("a", "b", "c")            | (3,[0,1,2],[1.0,1.0,1.0])
 1  | Array("a", "b", "b", "c", "a")  | (3,[0,1,2],[2.0,2.0,1.0])

每个向量表示文档在词汇表上的标记数。

Scala
import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel}

val df = spark.createDataFrame(Seq(
  (0, Array("a", "b", "c")),
  (1, Array("a", "b", "b", "c", "a"))
)).toDF("id", "words")

// fit a CountVectorizerModel from the corpus
val cvModel: CountVectorizerModel = new CountVectorizer()
  .setInputCol("words")
  .setOutputCol("features")
  .setVocabSize(3)
  .setMinDF(2)
  .fit(df)

// alternatively, define CountVectorizerModel with a-priori vocabulary
val cvm = new CountVectorizerModel(Array("a", "b", "c"))
  .setInputCol("words")
  .setOutputCol("features")

cvModel.transform(df).show(false)

有关API的更多详细信息,请参阅CountVectorizer Scala文档CountVectorizerModel Scala文档

在Spark repo中的"examples/src/main/scala/org/apache/spark/examples/ml/CountVectorizerExample.scala"中查找完整示例代码。 


Feature Transformers(特征变换)

Tokenizer(分词器)

Tokenization(文本符号化)是将文本 (如一个句子)拆分成单词的过程。(在Spark ML中)Tokenizer(分词器)提供此功能。下面的示例演示如何将句子拆分为词的序列。

RegexTokenizer 提供了(更高级的)基于正则表达式 (regex) 匹配的(对句子或文本的)单词拆分。默认情况下,参数"pattern"(默认的正则表达式: "\\s+") 作为分隔符用于拆分输入的文本。或者,用户可以将参数“gaps”设置为 false ,指定正则表达式"pattern"表示"tokens",而不是分隔符,这样作为划分结果找到的所有匹配项。

Scala
import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer}
import org.apache.spark.sql.functions._

val sentenceDataFrame = spark.createDataFrame(Seq(
  (0, "Hi I heard about Spark"),
  (1, "I wish Java could use case classes"),
  (2, "Logistic,regression,models,are,neat")
)).toDF("id", "sentence")

val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val regexTokenizer = new RegexTokenizer()
  .setInputCol("sentence")
  .setOutputCol("words")
  .setPattern("\\W") // alternatively .setPattern("\\w+").setGaps(false)

val countTokens = udf { (words: Seq[String]) => words.length }

val tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")
    .withColumn("tokens", countTokens(col("words"))).show(false)

val regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words")
    .withColumn("tokens", countTokens(col("words"))).show(false)

请参阅 Tokenizer Scala docs  和 RegexTokenizer Scala docs 了解相关的 API 的详细信息

在Spark repo中路径 "examples/src/main/scala/org/apache/spark/examples/ml/TokenizerExample.scala" 找到完整的示例代码。 


StopWordsRemover(去停用词)

Stop words (停用字)是(在文档中)频繁出现,但未携带太多意义的词语,它们不应该参与算法运算。

 StopWordsRemover(的作用是)将输入的字符串 (如分词器 Tokenizer 的输出)中的停用字删除(后输出)。停用字表由 stopWords 参数指定。对于某些语言的默认停止词是通过调用 StopWordsRemover.loadDefaultStopWords(language) 设置的,可用的选项为"丹麦","荷兰语"、"英语"、"芬兰语","法国","德国"、"匈牙利"、"意大利"、"挪威"、"葡萄牙"、"俄罗斯"、"西班牙"、"瑞典"和"土耳其"。布尔型参数 caseSensitive 指示是否区分大小写 (默认为否)。

Examples

假设有如下DataFrame,有id和raw两列:

 id | raw
----|----------
 0  | [I, saw, the, red, baloon]
 1  | [Mary, had, a, little, lamb]

通过对 raw 列调用 StopWordsRemover,我们可以得到筛选出的结果列如下:

 id | raw                         | filtered
----|-----------------------------|--------------------
 0  | [I, saw, the, red, baloon]  |  [saw, red, baloon]
 1  | [Mary, had, a, little, lamb]|  [Mary, little, lamb]
其中,“I”, “the”, “had”以及“a”被移除。
Scala
import org.apache.spark.ml.feature.StopWordsRemover

val remover = new StopWordsRemover()
  .setInputCol("raw")
  .setOutputCol("filtered")

val dataSet = spark.createDataFrame(Seq(
  (0, Seq("I", "saw", "the", "red", "baloon")),
  (1, Seq("Mary", "had", "a", "little", "lamb"))
)).toDF("id", "raw")

remover.transform(dataSet).show()

有关API的更多详细信息,请参阅StopWordsRemover Scala文档 

在Spark repo中路径"examples/src/main/scala/org/apache/spark/examples/ml/StopWordsRemoverExample.scala"可以找到完整的示例代码。  


n-gram(N元模型

一个 n-gram是一个长度为n(整数)的字的序列。NGram可用于将输入特征转换成n-grams。

N-Gram 的输入为一系列的字符串(例如:Tokenizer分词器的输出)。参数 n 表示每个 n-gram 中单词(terms)的数量。输出将由 n-gram 序列组成,其中每个 n-gram 由空格分隔的 n 个连续词的字符串表示。如果输入的字符串序列少于n个单词,NGram 输出为空。

Scala
import org.apache.spark.ml.feature.NGram

val wordDataFrame = spark.createDataFrame(Seq(
  (0, Array("Hi", "I", "heard", "about", "Spark")),
  (1, Array("I", "wish", "Java", "could", "use", "case", "classes")),
  (2, Array("Logistic", "regression", "models", "are", "neat"))
)).toDF("id", "words")

val ngram = new NGram().setN(2).setInputCol("words").setOutputCol("ngrams")

val ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.select("ngrams").show(false) 

请参阅 NGram Scala docs了解相关的 API 的详细信息

在Spark repo中路径"examples/src/main/scala/org/apache/spark/examples/ml/NGramExample.scala"里可以找到完整的示例代码。  


Binarizer(二值化)

Binarization (二值化)是将数值特征阈值化为二进制(0/1)特征的过程。

Binarizer(ML提供的二元化方法)二元化涉及的参数有 inputCol(输入)、outputCol(输出)以及threshold(阀值)。(输入的)特征值大于阀值将二值化为1.0,特征值小于等于阀值将二值化为0.0。inputCol 支持向量(Vector)和双精度(Double)类型。

Scala
import org.apache.spark.ml.feature.Binarizer

val data = Array((0, 0.1), (1, 0.8), (2, 0.2))
val dataFrame = spark.createDataFrame(data).toDF("id", "feature")

val binarizer: Binarizer = new Binarizer()
  .setInputCol("feature")
  .setOutputCol("binarized_feature")
  .setThreshold(0.5)

val binarizedDataFrame = binarizer.transform(dataFrame)

println(s"Binarizer output with Threshold = ${binarizer.getThreshold}")
binarizedDataFrame.show()

请参阅  Binarizer Scala docs了解相关的 API 的详细信息

在Spark repo中路径"examples/src/main/scala/org/apache/spark/examples/ml/BinarizerExample.scala"里可以找到完整的示例代码 


PCA(主元分析)

PCA 是使用正交变换将可能相关变量的一组观察值转换为称为主成分的线性不相关变量的值的一组统计过程。 PCA 类训练使用 PCA 将向量投影到低维空间的模型。下面的例子显示了如何将5维特征向量投影到3维主成分中。

Scala
import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.Vectors

val data = Array(
  Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
  Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
  Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
)
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val pca = new PCA()
  .setInputCol("features")
  .setOutputCol("pcaFeatures")
  .setK(3)
  .fit(df)

val result = pca.transform(df).select("pcaFeatures")
result.show(false)

请参阅  PCA Scala docs了解相关的 API 的详细信息

在Spark repo中路径"examples/src/main/scala/org/apache/spark/examples/ml/PCAExample.scala"里可以找到完整的示例代码 


PolynomialExpansion(多项式扩展)

Polynomial expansion (多项式展开)是将特征扩展为多项式空间的过程,多项式空间由原始维度的n度组合组成。 PolynomialExpansion类提供此功能。 下面的例子显示了如何将您的功能扩展到3度多项式空间。

Scala
import org.apache.spark.ml.feature.PolynomialExpansion
import org.apache.spark.ml.linalg.Vectors

val data = Array(
  Vectors.dense(2.0, 1.0),
  Vectors.dense(0.0, 0.0),
  Vectors.dense(3.0, -1.0)
)
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val polyExpansion = new PolynomialExpansion()
  .setInputCol("features")
  .setOutputCol("polyFeatures")
  .setDegree(3)

val polyDF = polyExpansion.transform(df)
polyDF.show(false)

请参阅  PolynomialExpansion Scala docs 了解相关的 API 的详细信息

在Spark repo中路径"examples/src/main/scala/org/apache/spark/examples/ml/PolynomialExpansionExample.scala"里可以找到完整的示例代码  


Discrete Cosine Transform(DCT 离散余弦变换)

Discrete Cosine Transform离散余弦变换 是将时域的N维实数序列转换成频域的N维实数序列的过程(有点类似离散傅里叶变换)。(ML中的)DCT类提供了离散余弦变换DCT-II的功能,将离散余弦变换后结果乘以 得到一个与时域矩阵长度一致的矩阵。没有偏移被应用于变换的序列(例如,变换的序列的第0个元素是第0个DCT系数,而不是第N / 2个),即输入序列与输出之间是一一对应的。

Scala
import org.apache.spark.ml.feature.DCT
import org.apache.spark.ml.linalg.Vectors

val data = Seq(
  Vectors.dense(0.0, 1.0, -2.0, 3.0),
  Vectors.dense(-1.0, 2.0, 4.0, -7.0),
  Vectors.dense(14.0, -2.0, -5.0, 1.0))

val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val dct = new DCT()
  .setInputCol("features")
  .setOutputCol("featuresDCT")
  .setInverse(false)

val dctDf = dct.transform(df)
dctDf.select("featuresDCT").show(false)

请参阅 DCT Scala docs  了解相关的 API 的详细信息

在Spark repo中路径"examples/src/main/scala/org/apache/spark/examples/ml/DCTExample.scala"里可以找到完整的示例代码  


StringIndexer(字符串-索引变换)

StringIndexer(字符串-索引变换)将标签的字符串列编号变成标签索引列。标签索引序列的取值范围是[0,numLabels(字符串中所有出现的单词去掉重复的词后的总和)],按照标签出现频率排序,出现最多的标签索引为0。如果输入是数值型,我们先将数值映射到字符串,再对字符串进行索引化。如果下游的 pipeline(例如:Estimator 或者 Transformer)需要用到索引化后的标签序列,则需要将这个 pipeline 的输入列名字指定为索引化序列的名字。大部分情况下,通过 setInputCol 设置输入的列名。

Examples

假设我们有如下的 DataFrame ,包含有 id 和 category 两列

 id | category
----|----------
 0  | a
 1  | b
 2  | c
 3  | a
 4  | a
 5  | c

标签类别(category)是有3种取值的标签:“a”,“b”,“c”。使用 StringIndexer 通过 category 进行转换成 categoryIndex 后可以得到如下结果:

 id | category | categoryIndex
----|----------|---------------
 0  | a        | 0.0
 1  | b        | 2.0
 2  | c        | 1.0
 3  | a        | 0.0
 4  | a        | 0.0
 5  | c        | 1.0

“a”因为出现的次数最多,所以得到为0的索引(index)。第二多的“c”得到1的索引,“b”得到2的索引

另外,StringIndexer 在转换新数据时提供两种容错机制处理训练中没有出现的标签

  • StringIndexer 抛出异常错误(默认值)
  • 跳过未出现的标签实例。

Examples

回顾一下上一个例子,这次我们将继续使用上一个例子训练出来的 StringIndexer 处理下面的数据集 

 id | category
----|----------
 0  | a
 1  | b
 2  | c
 3  | d

如果没有在 StringIndexer 里面设置未训练过(unseen)的标签的处理或者设置未 “error”,运行时会遇到程序抛出异常。当然,也可以通过设置 setHandleInvalid("skip"),得到如下的结果

 id | category | categoryIndex
----|----------|---------------
 0  | a        | 0.0
 1  | b        | 2.0
 2  | c        | 1.0

 注意:输出里面没有出现“d”

import org.apache.spark.ml.feature.StringIndexer

val df = spark.createDataFrame(
  Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c"))
).toDF("id", "category")

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")

val indexed = indexer.fit(df).transform(df)
indexed.show()

请参阅  StringIndexer Scala docs了解相关的 API 的详细信息

在Spark repo中路径"examples/src/main/scala/org/apache/spark/examples/ml/StringIndexerExample.scala"里可以找到完整的示例代码   


IndexToString(索引-字符串变换)

与 StringIndexer 对应,IndexToString 将索引化标签还原成原始字符串。一个常用的场景是先通过 StringIndexer 产生索引化标签,然后使用索引化标签进行训练,最后再对预测结果使用 IndexToString来获取其原始的标签字符串。

Examples

假设我们有如下的DataFrame包含id和categoryIndex两列:

 id | categoryIndex
----|---------------
 0  | 0.0
 1  | 2.0
 2  | 1.0
 3  | 0.0
 4  | 0.0
 5  | 1.0

使用IndexToString我们可以获取其原始的标签字符串如下:

 id | categoryIndex | originalCategory
----|---------------|-----------------
 0  | 0.0           | a
 1  | 2.0           | b
 2  | 1.0           | c
 3  | 0.0           | a
 4  | 0.0           | a
 5  | 1.0           | c
Scala
import org.apache.spark.ml.attribute.Attribute
import org.apache.spark.ml.feature.{IndexToString, StringIndexer}

val df = spark.createDataFrame(Seq(
  (0, "a"),
  (1, "b"),
  (2, "c"),
  (3, "a"),
  (4, "a"),
  (5, "c")
)).toDF("id", "category")

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .fit(df)
val indexed = indexer.transform(df)

println(s"Transformed string column '${indexer.getInputCol}' " +
    s"to indexed column '${indexer.getOutputCol}'")
indexed.show()

val inputColSchema = indexed.schema(indexer.getOutputCol)
println(s"StringIndexer will store labels in output column metadata: " +
    s"${Attribute.fromStructField(inputColSchema).toString}\n")

val converter = new IndexToString()
  .setInputCol("categoryIndex")
  .setOutputCol("originalCategory")

val converted = converter.transform(indexed)

println(s"Transformed indexed column '${converter.getInputCol}' back to original string " +
    s"column '${converter.getOutputCol}' using labels in metadata")
converted.select("id", "categoryIndex", "originalCategory").show()

请参阅  IndexToString Scala docs 了解相关的 API 的详细信息

在Spark repo中路径"examples/src/main/scala/org/apache/spark/examples/ml/IndexToStringExample.scala"里可以找到完整的示例代码   


OneHotEncoder(独热编码)

独热编码(One-hot encoding)将一列标签索引映射到一列二进制向量,最多只有一个单值。 该编码允许期望连续特征(例如逻辑回归)的算法使用分类特征。

Scala
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}

val df = spark.createDataFrame(Seq(
  (0, "a"),
  (1, "b"),
  (2, "c"),
  (3, "a"),
  (4, "a"),
  (5, "c")
)).toDF("id", "category")

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .fit(df)
val indexed = indexer.transform(df)

val encoder = new OneHotEncoder()
  .setInputCol("categoryIndex")
  .setOutputCol("categoryVec")

val encoded = encoder.transform(indexed)
encoded.show()

 请参阅  OneHotEncoder Scala docs了解相关的 API 的详细信息

 在Spark repo中路径"examples/src/main/scala/org/apache/spark/examples/ml/OneHotEncoderExample.scala"里可以找到完整的示例代码  


VectorIndexer(向量类型索引化)

VectorIndexer可以帮助指定向量数据集中的分类特征。它可以自动确定哪些功能是分类的,并将原始值转换为类别索引。具体来说,它执行以下操作:

  1. 取一个Vector类型的输入列和一个参数maxCategories。
  2. 根据不同值的数量确定哪些功能应分类,其中最多maxCategories的功能被声明为分类。
  3. 为每个分类功能计算基于0的类别索引。
  4. 索引分类特征并将原始特征值转换为索引。

索引分类功能允许诸如决策树和树组合之类的算法适当地处理分类特征,提高性能。

在下面的示例中,我们读取标注点的数据集,然后使用VectorIndexer来确定哪些功能应被视为分类。我们将分类特征值转换为其索引。然后,该转换的数据可以传递给诸如DecisionTreeRegressor之类的算法来处理分类特征。

 

Scala
import org.apache.spark.ml.feature.VectorIndexer

val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val indexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexed")
  .setMaxCategories(10)

val indexerModel = indexer.fit(data)

val categoricalFeatures: Set[Int] = indexerModel.categoryMaps.keys.toSet
println(s"Chose ${categoricalFeatures.size} categorical features: " +
  categoricalFeatures.mkString(", "))

// Create new column "indexed" with categorical values transformed to indices
val indexedData = indexerModel.transform(data)
indexedData.show()

请参阅  VectorIndexer Scala docs 了解相关的 API 的详细信息

在Spark repo中路径"examples/src/main/scala/org/apache/spark/examples/ml/VectorIndexerExample.scala"里可以找到完整的示例代码    


Interaction(相互作用)

交互是一个变换器,它采用向量或双值列,并生成一个单个向量列,其中包含来自每个输入列的一个值的所有组合的乘积。

例如,如果您有2个向量类型的列,每个列具有3个维度作为输入列,那么您将获得一个9维向量作为输出列。

Examples

假设我们有如下DataFrame,列为“id1”, “vec1” 和 “vec2”:

 id1|vec1          |vec2          
  ---|--------------|--------------
  1  |[1.0,2.0,3.0] |[8.0,4.0,5.0] 
  2  |[4.0,3.0,8.0] |[7.0,9.0,8.0] 
  3  |[6.0,1.0,9.0] |[2.0,3.0,6.0] 
  4  |[10.0,8.0,6.0]|[9.0,4.0,5.0] 
  5  |[9.0,2.0,7.0] |[10.0,7.0,3.0]
  6  |[1.0,1.0,4.0] |[2.0,8.0,4.0]  

应用与这些输入列的交互,然后将交互作为输出列包含:

 id1|vec1          |vec2          |interactedCol                                         
  ---|--------------|--------------|------------------------------------------------------
  1  |[1.0,2.0,3.0] |[8.0,4.0,5.0] |[8.0,4.0,5.0,16.0,8.0,10.0,24.0,12.0,15.0]            
  2  |[4.0,3.0,8.0] |[7.0,9.0,8.0] |[56.0,72.0,64.0,42.0,54.0,48.0,112.0,144.0,128.0]     
  3  |[6.0,1.0,9.0] |[2.0,3.0,6.0] |[36.0,54.0,108.0,6.0,9.0,18.0,54.0,81.0,162.0]        
  4  |[10.0,8.0,6.0]|[9.0,4.0,5.0] |[360.0,160.0,200.0,288.0,128.0,160.0,216.0,96.0,120.0]
  5  |[9.0,2.0,7.0] |[10.0,7.0,3.0]|[450.0,315.0,135.0,100.0,70.0,30.0,350.0,245.0,105.0] 
  6  |[1.0,1.0,4.0] |[2.0,8.0,4.0] |[12.0,48.0,24.0,12.0,48.0,24.0,48.0,192.0,96.0]  
Scala
import org.apache.spark.ml.feature.Interaction
import org.apache.spark.ml.feature.VectorAssembler

val df = spark.createDataFrame(Seq(
  (1, 1, 2, 3, 8, 4, 5),
  (2, 4, 3, 8, 7, 9, 8),
  (3, 6, 1, 9, 2, 3, 6),
  (4, 10, 8, 6, 9, 4, 5),
  (5, 9, 2, 7, 10, 7, 3),
  (6, 1, 1, 4, 2, 8, 4)
)).toDF("id1", "id2", "id3", "id4", "id5", "id6", "id7")

val assembler1 = new VectorAssembler().
  setInputCols(Array("id2", "id3", "id4")).
  setOutputCol("vec1")

val assembled1 = assembler1.transform(df)

val assembler2 = new VectorAssembler().
  setInputCols(Array("id5", "id6", "id7")).
  setOutputCol("vec2")

val assembled2 = assembler2.transform(assembled1).select("id1", "vec1", "vec2")

val interaction = new Interaction()
  .setInputCols(Array("id1", "vec1", "vec2"))
  .setOutputCol("interactedCol")

val interacted = interaction.transform(assembled2)

interacted.show(truncate = false)

有关API的更多详细信息,请参阅交互式Scala文档

在Spark repo中的“examples/src/main/scala/org/apache/spark/examples/ml/InteractionExample.scala”中查找完整示例代码。

Normalizer(范数p-norm规范化)

Normalizer是一个转换器,它可以将一组特征向量(通过计算p-范数)规范化。参数为p(默认值:2)来指定规范化中使用的p-norm。规范化操作可以使输入数据标准化,对后期机器学习算法的结果也有更好的表现。

下面的例子展示如何读入一个libsvm格式的数据,然后将每一行转换为以及形式。

Scala
import org.apache.spark.ml.feature.Normalizer
import org.apache.spark.ml.linalg.Vectors

val dataFrame = spark.createDataFrame(Seq(
  (0, Vectors.dense(1.0, 0.5, -1.0)),
  (1, Vectors.dense(2.0, 1.0, 1.0)),
  (2, Vectors.dense(4.0, 10.0, 2.0))
)).toDF("id", "features")

// Normalize each Vector using $L^1$ norm.
val normalizer = new Normalizer()
  .setInputCol("features")
  .setOutputCol("normFeatures")
  .setP(1.0)

val l1NormData = normalizer.transform(dataFrame)
println("Normalized using L^1 norm")
l1NormData.show()

// Normalize each Vector using $L^\infty$ norm.
val lInfNormData = normalizer.transform(dataFrame, normalizer.p -> Double.PositiveInfinity)
println("Normalized using L^inf norm")
lInfNormData.show()

请参阅  Normalizer Scala docs 了解相关的 API 的详细信息

在Spark repo中路径"examples/src/main/scala/org/apache/spark/examples/ml/NormalizerExample.scala"里可以找到完整的示例代码。   


StandardScaler(标准化)

StandardScaler转换Vector行的数据集,使每个要素标准化以具有单位标准偏差和 或 零均值。它需要参数:
  • withStd:默认为True。将数据缩放到单位标准偏差。
  • withMean:默认为false。在缩放之前将数据中心为平均值。它将构建一个密集的输出,所以在应用于稀疏输入时要小心。

StandardScaler是一个Estimator,可以适合数据集生成StandardScalerModel; 这相当于计算汇总统计数据。 然后,模型可以将数据集中的向量列转换为具有单位标准偏差和/或零平均特征。

请注意,如果特征的标准偏差为零,它将在该特征的向量中返回默认的0.0值。

以下示例演示如何以libsvm格式加载数据集,然后将每个要素归一化以具有单位标准偏差。

Scala
import org.apache.spark.ml.feature.StandardScaler

val dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

val scaler = new StandardScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")
  .setWithStd(true)
  .setWithMean(false)

// Compute summary statistics by fitting the StandardScaler.
val scalerModel = scaler.fit(dataFrame)

// Normalize each feature to have unit standard deviation.
val scaledData = scalerModel.transform(dataFrame)
scaledData.show()

请参阅StandardScaler Scala docs了解相关的 API 的详细信息。

在Spark repo中路径"examples/src/main/scala/org/apache/spark/examples/ml/StandardScalerExample.scala"里可以找到完整的示例代码。

  

MinMaxScaler(最大-最小规范化) 

MinMaxScaler转换Vector行的数据集,将每个要素的重新映射到特定范围(通常为[0,1])。它需要参数:
  • min:默认为0.0,转换后的下限,由所有功能共享。
  • max:默认为1.0,转换后的上限,由所有功能共享。

MinMaxScaler计算数据集的统计信息,并生成MinMaxScalerModel。然后,模型可以单独转换每个要素,使其在给定的范围内。

特征E的重新缩放值被计算为:

                         

对于情况Emax == Emin,Rescaled(ei) = 0.5 *(max + min)

请注意,由于零值可能会转换为非零值,即使对于稀疏输入,变压器的输出也将为DenseVector。

以下示例演示如何以libsvm格式加载数据集,然后将每个要素重新缩放为[0,1]。
Scala
import org.apache.spark.ml.feature.MinMaxScaler
import org.apache.spark.ml.linalg.Vectors

val dataFrame = spark.createDataFrame(Seq(
  (0, Vectors.dense(1.0, 0.1, -1.0)),
  (1, Vectors.dense(2.0, 1.1, 1.0)),
  (2, Vectors.dense(3.0, 10.1, 3.0))
)).toDF("id", "features")

val scaler = new MinMaxScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")

// Compute summary statistics and generate MinMaxScalerModel
val scalerModel = scaler.fit(dataFrame)

// rescale each feature to range [min, max].
val scaledData = scalerModel.transform(dataFrame)
println(s"Features scaled to range: [${scaler.getMin}, ${scaler.getMax}]")
scaledData.select("features", "scaledFeatures").show()

请参阅MinMaxScaler Scala docs 和 MinMaxScalerModel Scala docs 了解相关的 API 的详细信息。

在Spark repo中路径"examples/src/main/scala/org/apache/spark/examples/ml/MinMaxScalerExample.scala"里可以找到完整的示例代码   


MaxAbsScaler(绝对值规范化)

MaxAbsScaler转换Vector行的数据集,通过划分每个要素中的最大绝对值,将每个要素的重新映射到范围[-1,1]。 它不会使数据移动/居中,因此不会破坏任何稀疏性。

MaxAbsScaler计算数据集的统计信息,并生成MaxAbsScalerModel。 然后,模型可以将每个要素单独转换为范围[-1,1]。

以下示例演示如何以libsvm格式加载数据集,然后将每个要素重新缩放为[-1,1]。

Scala
import org.apache.spark.ml.feature.MaxAbsScaler
import org.apache.spark.ml.linalg.Vectors

val dataFrame = spark.createDataFrame(Seq(
  (0, Vectors.dense(1.0, 0.1, -8.0)),
  (1, Vectors.dense(2.0, 1.0, -4.0)),
  (2, Vectors.dense(4.0, 10.0, 8.0))
)).toDF("id", "features")

val scaler = new MaxAbsScaler()
  .setInputCol("features")
  .setOutputCol("scaledFeatures")

// Compute summary statistics and generate MaxAbsScalerModel
val scalerModel = scaler.fit(dataFrame)

// rescale each feature to range [-1, 1]
val scaledData = scalerModel.transform(dataFrame)
scaledData.select("features", "scaledFeatures").show()

请参阅 MaxAbsScaler Scala docs 和 MaxAbsScalerModel Scala docs 了解相关的 API 的详细信息。

在Spark repo中路径"examples/src/main/scala/org/apache/spark/examples/ml/MaxAbsScalerExample.scala"里可以找到完整的示例代码 


Bucketizer(分箱器

Bucketizer 将一列连续的特征转换为特征 buckets(区间),buckets(区间)由用户指定。Bucketizer 需要一个参数:

splits(分割):这是个将连续的特征转换为 buckets(区间)的参数. n+1次分割时,将产生n个 buckets(区间)。一个bucket(区间)通过范围 [x,y) 中 来定义除了最后一个 bucket 包含 Splits(分割)应该是严格递增的。-inf, inf 之间的值必须明确提供来覆盖所有的 Double 值;另外,Double 值超出 splits(分割)指定的值将认为是错误的. 两个splits (拆分)的例子为 Array(Double.NegativeInfinity, 0.0, 1.0, Double.PositiveInfinity)以及Array(0.0, 1.0, 2.0)。

 

请注意,如果你不知道目标列的上线和下限,则应将 Double.NegativeInfinity   Double.PositiveInfinity 添加为splits(分割)的边界,以防止 Bucketizer 界限出现异常.

还请注意,你提供的 splits(分割)必须严格按照增加的顺序,即 s0 < s1 < s2 < ... < sn.

更多的细节可以查看 Bucketizer 的API文档.

下面这个例子演示了如何将包含 Doubles 的一列 bucketize (分箱)为另外一个索引列.

Scala
import org.apache.spark.ml.feature.Bucketizer

val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, Double.PositiveInfinity)

val data = Array(-0.5, -0.3, 0.0, 0.2)
val dataFrame = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val bucketizer = new Bucketizer()
  .setInputCol("features")
  .setOutputCol("bucketedFeatures")
  .setSplits(splits)

// Transform original data into its bucket index.
val bucketedData = bucketizer.transform(dataFrame)
bucketedData.show()

请参阅 Bucketizer Scala docs了解相关的 API 的详细信息。

在Spark repo中路径"examples/src/main/scala/org/apache/spark/examples/ml/BucketizerExample.scala"里可以找到完整的示例代码。


ElementwiseProduct (Hadamard乘积)

ElementwiseProduct 每个输入向量中乘以一个 weight(权重)向量,使用元素相乘的方法.换句话来说,就是通过scalar multiplier (标量乘法)对数据集中的每一列进行缩放。这表示输入向量 和转换向量 通过 Hadamard productHadamard积) 产生一个结果向量.

下面例子展示如何通过转换向量的值来调整向量。

Scala
import org.apache.spark.ml.feature.ElementwiseProduct
import org.apache.spark.ml.linalg.Vectors

// Create some vector data; also works for sparse vectors
val dataFrame = spark.createDataFrame(Seq(
  ("a", Vectors.dense(1.0, 2.0, 3.0)),
  ("b", Vectors.dense(4.0, 5.0, 6.0)))).toDF("id", "vector")

val transformingVector = Vectors.dense(0.0, 1.0, 2.0)
val transformer = new ElementwiseProduct()
  .setScalingVec(transformingVector)
  .setInputCol("vector")
  .setOutputCol("transformedVector")

// Batch transform the vectors to create new column:
transformer.transform(dataFrame).show()

请参阅 ElementwiseProduct Scala docs 了解相关的 API 的详细信息。

在Spark repo中路径"examples/src/main/scala/org/apache/spark/examples/ml/ElementwiseProductExample.scala"里可以找到完整的示例代码 。


SQLTransformer(SQL变换器)

SQLTransformer(SQL转换器)实现由 SQL 语句定义的转换 。目前我们只支持SQL语法如 "SELECT ... FROM __THIS__ ..." ,其中 "__THIS__" 代表输入数据集的基础表。选择语句指定输出中展示的字段、元素和表达式,支持Spark SQL 中的所有选择语句。用户还可以使用 Spark SQL 内置函数和U DFs(自定义函数)来对这些选定的列进行操作。SQLTransformer 支持如下语句:

  • SELECT a, a + b AS a_b FROM __THIS__
  • SELECT a, SQRT(b) AS b_sqrt FROM __THIS__ where a > 5
  • SELECT a, b, SUM(c) AS c_sum FROM __THIS__ GROUP BY a, b

例子:

假设我们有如下DataFrame包含 idv1v2列:

 id |  v1 |  v2
----|-----|-----
 0  | 1.0 | 3.0  
 2  | 2.0 | 5.0
下面是使用  "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__" 的 SQLTransformer 的输出:
 id |  v1 |  v2 |  v3 |  v4
----|-----|-----|-----|-----
 0  | 1.0 | 3.0 | 4.0 | 3.0
 2  | 2.0 | 5.0 | 7.0 |10.0
Scala
import org.apache.spark.ml.feature.SQLTransformer

val df = spark.createDataFrame(
  Seq((0, 1.0, 3.0), (2, 2.0, 5.0))).toDF("id", "v1", "v2")

val sqlTrans = new SQLTransformer().setStatement(
  "SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")

sqlTrans.transform(df).show()
请参阅 SQLTransformer Scala docs  了解相关的 API 的详细信息

在Spark repo中路径"examples/src/main/scala/org/apache/spark/examples/ml/SQLTransformerExample.scala"里可以找到完整的示例代码。 


VectorAssembler(特征向量合并)

VectorAssembler 是将给定的一系列的列合并到单个向量列中的 transformer。它可以将原始特征和不同特征transformers(转换器)生成的特征合并为单个特征向量,来训练 ML 模型,如逻辑回归和决策树等机器学习算法。VectorAssembler 可接受以下的输入列类型:所有数值型、布尔类型、向量类型。输入列的值将按指定顺序依次添加到一个向量中。
Examples
假设我们有一个 DataFrame 包含 idhourmobileuserFeatures以及clicked 列:

 

 id | hour | mobile | userFeatures     | clicked
----|------|--------|------------------|---------
 0  | 18   | 1.0    | [0.0, 10.0, 0.5] | 1.0
userFeatures 是一个包含3个用户特征的特征列,我们希望将 hourmobile 以及 userFeatures 组合为一个单一特征向量叫做 features,并将其用于预测是否点击。如果我们设置 VectorAssembler 的输入列为 hour mobile 以及userFeatures,输出列为 features,转换后我们应该得到以下结果:
 id | hour | mobile | userFeatures     | clicked | features
----|------|--------|------------------|---------|-----------------------------
 0  | 18   | 1.0    | [0.0, 10.0, 0.5] | 1.0     | [18.0, 1.0, 0.0, 10.0, 0.5]
Scala
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors

val dataset = spark.createDataFrame(
  Seq((0, 18, 1.0, Vectors.dense(0.0, 10.0, 0.5), 1.0))
).toDF("id", "hour", "mobile", "userFeatures", "clicked")

val assembler = new VectorAssembler()
  .setInputCols(Array("hour", "mobile", "userFeatures"))
  .setOutputCol("features")

val output = assembler.transform(dataset)
println(output.select("features", "clicked").first())
请参阅  VectorAssembler Scala docs了解相关的 API 的详细信息

在Spark repo中路径"examples/src/main/scala/org/apache/spark/examples/ml/VectorAssemblerExample.scala"里可以找到完整的示例代码。

 

QuantileDiscretizer(分位数离散化)

QuantileDiscretizer(分位数离散化)采用具有连续特征的列,并输出具有分类特征的列.bin(分级)的数量由numBuckets 参数设置。buckets(区间数)有可能小于这个值,例如,如果输入的不同值太少,就无法创建足够的不同的quantiles(分位数).

NaN values:在 QuantileDiscretizer fitting NaN值会从列中移除.这将产生一个 Bucketizer 模型进行预测.在转换过程中,Bucketizer 会发出错误信息当在数据集中找到 NaN 值,但用户也可以通过设置 handleInvalid 来选择保留或删除数据集中的 NaN 值.如果用户选择保留 NaN 值,那么它们将被特别处理并放入自己的 bucket(区间)中.例如,如果使用4个 buckets(区间),那么非 NaN 数据将放入 buckets[0-3],NaN将计数在特殊的 bucket[4] 中.

Algorithm:使用近似算法来选择 bin 的范围(有关详细说明可以参考 approxQuantile 的文档)。可以使用relativeError参数来控制近似的精度。当设置为零时,计算精确的 quantiles(分位数)(注意:计算 quantiles(分位数)是一项昂贵的操作.下边界和上边界将被 -Infinity(负无穷) 和 +Infinity(正无穷) 覆盖所有实际值).

Examples

假设我们有一个 DataFrame 包含 idhour 列:

 id | hour
----|------
 0  | 18.0
----|------
 1  | 19.0
----|------
 2  | 8.0
----|------
 3  | 5.0
----|------
 4  | 2.2

hour 是一个 Double 类型的连续特征,我们想要将连续的特征变成一个特征。将参数 numBuckets 设置为 3,我们应该得到以下 DataFrame :

 

 id | hour | result
----|------|------
 0  | 18.0 | 2.0
----|------|------
 1  | 19.0 | 2.0
----|------|------
 2  | 8.0  | 1.0
----|------|------
 3  | 5.0  | 1.0
----|------|------
 4  | 2.2  | 0.0
Scala
import org.apache.spark.ml.feature.QuantileDiscretizer

val data = Array((0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2))
var df = spark.createDataFrame(data).toDF("id", "hour")

val discretizer = new QuantileDiscretizer()
  .setInputCol("hour")
  .setOutputCol("result")
  .setNumBuckets(3)

val result = discretizer.fit(df).transform(df)
result.show()
请参阅  QuantileDiscretizer Scala docs了解相关的 API 的详细信息

在Spark repo中路径"examples/src/main/scala/org/apache/spark/examples/ml/QuantileDiscretizerExample.scala"里可以找到完整的示例代码

 

Feature Selectors(特征选择)

VectorSlicer(向量切片机

VectorSlicer是一个转换器,它采用特征向量,并输出一个新的特征向量与原始特征的子阵列。从向量列中提取特征很有用。

VectorSlicer接受具有指定索引的向量列,然后输出一个新的向量列,其值通过这些索引进行选择。有两种类型的指数:

  1. 代表向量中的索引的整数索引,setIndices()。
  2. 表示向量中特征名称的字符串索引,setNames(),此类要求向量列有AttributeGroup,因为实现在Attribute的name字段上匹配。
 

整数和字符串的规格都可以接受。此外,您可以同时使用整数索引和字符串名称。必须至少选择一个特征。重复的功能是不允许的,所以选择的索引和名称之间不能有重叠。请注意,如果选择了功能的名称,则会遇到空的输入属性时会抛出异常。

输出向量将首先(按照给定的顺序)对所选索引的特征进行排序,其次是所选择的名称(按照给定的顺序)。

Examples
假设我们有一个含有userFeatures列的DataFrame

 

 userFeatures
------------------
 [0.0, 10.0, 0.5]
userFeatures是一个包含三个用户功能的向量列。
假设userFeature的第一列全部为0,因此我们要删除它并仅选择最后两列。
VectorSlicer使用setIndices(1,2)选择最后两个元素,然后生成一个名为features的新向量列:
 userFeatures     | features
------------------|-----------------------------
 [0.0, 10.0, 0.5] | [10.0, 0.5]
假设我们对userFeatures具有潜在的输入属性,即["f1", "f2", "f3"],那么我们可以使用setNames("f2", "f3")来选择它们。
 userFeatures     | features
------------------|-----------------------------
 [0.0, 10.0, 0.5] | [10.0, 0.5]
 ["f1", "f2", "f3"] | ["f2", "f3"]
 
Scala
import java.util.Arrays

import org.apache.spark.ml.attribute.{Attribute, AttributeGroup, NumericAttribute}
import org.apache.spark.ml.feature.VectorSlicer
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType

val data = Arrays.asList(
  Row(Vectors.sparse(3, Seq((0, -2.0), (1, 2.3)))),
  Row(Vectors.dense(-2.0, 2.3, 0.0))
)

val defaultAttr = NumericAttribute.defaultAttr
val attrs = Array("f1", "f2", "f3").map(defaultAttr.withName)
val attrGroup = new AttributeGroup("userFeatures", attrs.asInstanceOf[Array[Attribute]])

val dataset = spark.createDataFrame(data, StructType(Array(attrGroup.toStructField())))

val slicer = new VectorSlicer().setInputCol("userFeatures").setOutputCol("features")

slicer.setIndices(Array(1)).setNames(Array("f3"))
// or slicer.setIndices(Array(1, 2)), or slicer.setNames(Array("f2", "f3"))

val output = slicer.transform(dataset)
output.show(false)

请参阅  VectorSlicer Scala docs了解相关的 API 的详细信息

 在Spark repo中路径"examples/src/main/scala/org/apache/spark/examples/ml/VectorSlicerExample.scala"里可以找到完整的示例代码。


RFormula(R模型公式) 

RFormula选择由R模型公式R model formula指定的列。目前,我们支持R运算符的有限子集,包括‘~’, ‘.’, ‘:’, ‘+’以及‘-‘,基本操作如下:

  •  ~分隔目标和对象
  •  +合并对象,“+ 0”表示删除截距
  •  - 删除对象,“ - 1”表示删除截距
  •  :交互(数字乘法或二值化分类值
  •  . 除了目标外的全部列

假设a和b是double列,我们使用以下简单的例子来说明RFormula的效果:

  • y ~ a + b表示模型y ~ w0 + w1 * a +w2 * b其中w0为截距,w1w2为相关系数。
  • y ~a + b + a:b – 1表示模型y ~ w1* a + w2 * b + w3 * a * b,其中w1w2w3是相关系数。

RFormula产生一个特征向量列和一个标签的double列或label列。 像R在线性回归中使用公式时,字符型的输入将转换成one-hot编码,数字列将被转换为双精度。 如果label列是类型字符串,则它将首先使用StringIndexer转换为double。 如果DataFrame中不存在label列,则会从公式中指定的响应变量创建输出标签列。

Examples

假设我们有一个具有列id,country,hour和clicked的DataFrame:

id | country | hour | clicked
---|---------|------|---------
 7 | "US"    | 18   | 1.0
 8 | "CA"    | 12   | 0.0
 9 | "NZ"    | 15   | 0.0

如果我们使用具有clicked ~ country + hour的公式字符串的RFormula,这表示我们想要基于country 和hour预测clicked,转换后我们应该得到以下DataFrame:

id | country | hour | clicked | features         | label
---|---------|------|---------|------------------|-------
 7 | "US"    | 18   | 1.0     | [0.0, 0.0, 18.0] | 1.0
 8 | "CA"    | 12   | 0.0     | [0.0, 1.0, 12.0] | 0.0
 9 | "NZ"    | 15   | 0.0     | [1.0, 0.0, 15.0] | 0.0
Scala
import org.apache.spark.ml.feature.RFormula

val dataset = spark.createDataFrame(Seq(
  (7, "US", 18, 1.0),
  (8, "CA", 12, 0.0),
  (9, "NZ", 15, 0.0)
)).toDF("id", "country", "hour", "clicked")

val formula = new RFormula()
  .setFormula("clicked ~ country + hour")
  .setFeaturesCol("features")
  .setLabelCol("label")

val output = formula.fit(dataset).transform(dataset)
output.select("features", "label").show()

请参阅 RFormula Scala docs 了解相关的 API 的详细信息

在Spark repo中路径"examples/src/main/scala/org/apache/spark/examples/ml/RFormulaExample.scala"里可以找到完整的示例代码


ChiSqSelector(卡方特征选择器)

ChiSqSelector代表卡方特征选择。它适用于带有类别特征的标签数据。ChiSqSelector使用卡方独立测试来决定选择哪些特征。它支持三种选择方法:numTopFeaturespercentilefpr:

  • numTopFeatures根据卡方检验选择固定数量的顶级功能。这类似于产生具有最大预测能力的功能。
  • percentile类似于numTopFeatures,但选择所有功能的一部分,而不是固定数量。
  • fpr选择p值低于阈值的所有特征,从而控制选择的假阳性率。

默认情况下,选择方法是numTopFeatures,默认的顶级功能数量设置为50.用户可以使用setSelectorType选择一种选择方法。

Examples

假设我们有一个具有列id, features和clicked的DataFrame,这被用作我们预测的目标:

id | features              | clicked
---|-----------------------|---------
 7 | [0.0, 0.0, 18.0, 1.0] | 1.0
 8 | [0.0, 1.0, 12.0, 0.0] | 0.0
 9 | [1.0, 0.0, 15.0, 0.1] | 0.0
如果我们使用ChiSqSelector并设置numTopFeatures=1,根据我们所有的特征,其中最后一列标签clicked是认为最有用的特征:

 

id | features              | clicked | selectedFeatures
---|-----------------------|---------|------------------
 7 | [0.0, 0.0, 18.0, 1.0] | 1.0     | [1.0]
 8 | [0.0, 1.0, 12.0, 0.0] | 0.0     | [0.0]
 9 | [1.0, 0.0, 15.0, 0.1] | 0.0     | [0.1]
Scala
import org.apache.spark.ml.feature.ChiSqSelector
import org.apache.spark.ml.linalg.Vectors

val data = Seq(
  (7, Vectors.dense(0.0, 0.0, 18.0, 1.0), 1.0),
  (8, Vectors.dense(0.0, 1.0, 12.0, 0.0), 0.0),
  (9, Vectors.dense(1.0, 0.0, 15.0, 0.1), 0.0)
)

val df = spark.createDataset(data).toDF("id", "features", "clicked")

val selector = new ChiSqSelector()
  .setNumTopFeatures(1)
  .setFeaturesCol("features")
  .setLabelCol("clicked")
  .setOutputCol("selectedFeatures")

val result = selector.fit(df).transform(df)

println(s"ChiSqSelector output with top ${selector.getNumTopFeatures} features selected")
result.show()

请参阅 ChiSqSelector Scala docs 了解相关的 API 的详细信息

在Spark repo中路径"examples/src/main/scala/org/apache/spark/examples/ml/ChiSqSelectorExample.scala"里可以找到完整的示例代码。


Locality Sensitive Hashing(局部敏感哈希)

LSH是哈希技术中重要的一种,通常用于集群,近似最近邻搜索和大型数据集的孤立点检测。

LSH的大致思路是用一系列函数(LSH families)将数据哈希到桶中,这样彼此接近的数据点处于相同的桶中可能性就会很高,而彼此相距很远的数据点很可能处于不同的桶中。一个LSH family 正式定义如下。

在度量空间(M,d)中,M是一个集合,d是M上的一个距离函数,LSH family是一系列能满足以下属性的函数h:

                                                        

满足以上条件的LSH family被称为(r1, r2, p1, p2)-sensitive。

在Spark中,不同的LSH families实现在不同的类中(例如:MinHash ),并且在每个类中提供了用于特征变换的API,近似相似性连接和近似最近邻。

在LSH中,我们将一个假阳性定义为一对相距大的输入特征(当 d(p,q)≥r2 时),它们被哈希到同一个桶中,并且将一个假阴性定义为一对相邻的特征(当  d(p,q)≤r1 时 ),它们被分散到不同的桶中。

 

LSH OperationsLSH运算)

我们称使用LSH的主要操作类型为LSH 运算。 一个合适的LSH模型对每一个操作中都有对应的方法。

Feature Transformation(特征变换)

特征变换是将哈希值添加为新列的基本功能。 这可以有助于降低维数。 用户可以通过设置 inputCol 和 outputCol  参数来指定输入和输出列名。

LSH 还支持多个LSH哈希表。 用户可以通过设置 numHashTables 来指定哈希表的数量。 这也用于近似相似性连接和近似最近邻的 OR-amplification(或放大器)放大。 增加哈希表的数量将增加准确性,但也会增加通信成本和运行时间。

outputCol 的类型是 Seq [Vector],其中数组的维数等于 numHashTables ,并且向量的维度当前设置为1。在将来的版本中,我们将实现 AND-amplification(与放大器),以便用户可以指定这些向量的维度 。

Approximate Similarity Join(近似相似度连接)

近似相似度连接采用两个数据集,并且近似返回距离小于用户定义阈值的数据集中的行对。 近似相似度连接支持两个不同的数据集连接和自连接。 Self-joinin (自连接)会产生一些重复的对。

近似相似度连接接受已转换和未转换的数据集作为输入。 如果使用未转换的数据集,它将自动转换。 在这种情况下,哈希签名将被创建为outputCol。

在加入的数据集中,可以在数据集A和数据集B中查询原始数据集。 距离列将被添加到输出数据集,以显示返回的每对行之间的真实距离。

Approximate Nearest Neighbor Search(近似最邻近搜索)

近似最近邻搜索采用数据集(特征向量)和密钥(单个特征向量),并且它近似返回数据集中最接近向量的指定数量的行。

近似最近邻搜索接受已转换和未转换的数据集作为输入。 如果使用未转换的数据集,它将自动转换。 在这种情况下,哈希签名将被创建为outputCol。

距离列将被添加到输出数据集,以显示每个输出行和搜索的键之间的真实距离。

注意:当哈希桶中没有足够的候选项时,近似最近邻搜索将返回少于k行。

LSH AlgorithmsLSH算法

Bucketed Random Projection for Euclidean Distance( 欧几里得度量的随机投影)

随机桶投影是用于欧几里德距离的 LSH family。 欧氏度量的定义如下:

                                                        

其LSH family将向量x特征向量映射到随机单位矢量v,并将映射结果分为哈希桶中:

                                                      

其中r是用户定义的桶长度,桶长度可用于控制哈希桶的平均大小(因此也可用于控制桶的数量)。 较大的桶长度(即,更少的桶)增加了将特征哈希到相同桶的概率(增加真实和假阳性的数量)。

桶随机投影接受任意向量作为输入特征,并支持稀疏和密集向量。

Scala
import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
import org.apache.spark.ml.linalg.Vectors

val dfA = spark.createDataFrame(Seq(
  (0, Vectors.dense(1.0, 1.0)),
  (1, Vectors.dense(1.0, -1.0)),
  (2, Vectors.dense(-1.0, -1.0)),
  (3, Vectors.dense(-1.0, 1.0))
)).toDF("id", "keys")

val dfB = spark.createDataFrame(Seq(
  (4, Vectors.dense(1.0, 0.0)),
  (5, Vectors.dense(-1.0, 0.0)),
  (6, Vectors.dense(0.0, 1.0)),
  (7, Vectors.dense(0.0, -1.0))
)).toDF("id", "keys")

val key = Vectors.dense(1.0, 0.0)

val brp = new BucketedRandomProjectionLSH()
  .setBucketLength(2.0)
  .setNumHashTables(3)
  .setInputCol("keys")
  .setOutputCol("values")

val model = brp.fit(dfA)

// Feature Transformation
model.transform(dfA).show()
// Cache the transformed columns
val transformedA = model.transform(dfA).cache()
val transformedB = model.transform(dfB).cache()

// Approximate similarity join
model.approxSimilarityJoin(dfA, dfB, 1.5).show()
model.approxSimilarityJoin(transformedA, transformedB, 1.5).show()
// Self Join
model.approxSimilarityJoin(dfA, dfA, 2.5).filter("datasetA.id < datasetB.id").show()

// Approximate nearest neighbor search
model.approxNearestNeighbors(dfA, key, 2).show()
model.approxNearestNeighbors(transformedA, key, 2).show()

如需了解API中的更多详细信息,请参阅 BucketedRandomProjectionLSH Scala 文档

在Spark repo 路径“examples/src/main/scala/org/apache/spark/examples/ml/BucketedRandomProjectionLSHExample.scala”中查找完整示例代码。

MinHash for Jaccard Distance

MinHash 是一个用于Jaccard 距离的 LSH family,它的输入特征是自然数的集合。 两组的Jaccard距离由它们的交集和并集的基数定义:

                                         

MinHash 将随机哈希函数g应用于集合中的每个元素,并取得所有哈希值中的最小值。

                                        

MinHash 的输入集合表示为二进制向量,其中向量索引表示元素本身,向量中的非零值表示该元素在集合中存在。尽管支持稠密和稀疏向量,但通常推荐使用稀疏向量来提高效率。 例如,Vectors.sparse(10,Array [(2,1.0),(3,1.0),(5,1.0)])表示空间中有10个元素。 该集合包含 elem 2,elem 3 和 elem 5。所有非零值都被视为二进制“1”值。

注意:空集不能被MinHash转换,这意味着任何输入向量必须至少有一个非零条目。

Scala
import org.apache.spark.ml.feature.MinHashLSH
import org.apache.spark.ml.linalg.Vectors

val dfA = spark.createDataFrame(Seq(
  (0, Vectors.sparse(6, Seq((0, 1.0), (1, 1.0), (2, 1.0)))),
  (1, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (4, 1.0)))),
  (2, Vectors.sparse(6, Seq((0, 1.0), (2, 1.0), (4, 1.0))))
)).toDF("id", "keys")

val dfB = spark.createDataFrame(Seq(
  (3, Vectors.sparse(6, Seq((1, 1.0), (3, 1.0), (5, 1.0)))),
  (4, Vectors.sparse(6, Seq((2, 1.0), (3, 1.0), (5, 1.0)))),
  (5, Vectors.sparse(6, Seq((1, 1.0), (2, 1.0), (4, 1.0))))
)).toDF("id", "keys")

val key = Vectors.sparse(6, Seq((1, 1.0), (3, 1.0)))

val mh = new MinHashLSH()
  .setNumHashTables(3)
  .setInputCol("keys")
  .setOutputCol("values")

val model = mh.fit(dfA)

// Feature Transformation
model.transform(dfA).show()
// Cache the transformed columns
val transformedA = model.transform(dfA).cache()
val transformedB = model.transform(dfB).cache()

// Approximate similarity join
model.approxSimilarityJoin(dfA, dfB, 0.6).show()
model.approxSimilarityJoin(transformedA, transformedB, 0.6).show()
// Self Join
model.approxSimilarityJoin(dfA, dfA, 0.6).filter("datasetA.id < datasetB.id").show()

// Approximate nearest neighbor search
model.approxNearestNeighbors(dfA, key, 2).show()
model.approxNearestNeighbors(transformedA, key, 2).show()

如需了解API中的更多详细信息,请参阅 MinHashLSH Scala 文档

在Spark repo 路径“examples/src/main/scala/org/apache/spark/examples/ml/MinHashLSHExample.scala”中查找完整示例代码。

 

  • 无标签