
Spark: Twenty Feature Transformation Methods and Spark MLlib Usage Examples (Scala/Java/Python), Part 1

Source: http://blog.csdn.net/liulingyuan6/article/details/53397780

Tokenizer

Algorithm overview:

Tokenization is the process of splitting text (such as a sentence) into individual terms (usually words). The example below shows how to split sentences into sequences of words.

RegexTokenizer provides more advanced tokenization based on regular expressions. By default, the parameter "pattern" is the regular expression used as the delimiter to split the input text. Alternatively, users can set the parameter "gaps" to false, indicating that the regex "pattern" denotes the "tokens" rather than the gaps between them; all matching occurrences are then returned as the tokenization result, as sketched below.
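
As a minimal sketch of the "gaps" option (reusing the sentence/words column names from the examples below), the tokenizer can be told to match the tokens themselves rather than the gaps between them:

import org.apache.spark.ml.feature.RegexTokenizer

// With gaps = false the regex describes the tokens ("\\w+" = word characters)
// instead of the delimiter, which is equivalent to splitting on "\\W" here.
val wordMatcher = new RegexTokenizer()
  .setInputCol("sentence")
  .setOutputCol("words")
  .setPattern("\\w+")
  .setGaps(false)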

Example usage:

Scala:


import org.apache.spark.ml.feature.{RegexTokenizer, Tokenizer}

val sentenceDataFrame = spark.createDataFrame(Seq(
  (0, "Hi I heard about Spark"),
  (1, "I wish Java could use case classes"),
  (2, "Logistic,regression,models,are,neat")
)).toDF("label", "sentence")

val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val regexTokenizer = new RegexTokenizer()
  .setInputCol("sentence")
  .setOutputCol("words")
  .setPattern("\\W") // alternatively .setPattern("\\w+").setGaps(false)

val tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("words", "label").take(3).foreach(println)
val regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("words", "label").take(3).foreach(println)

Java:


import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.RegexTokenizer;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0, "Hi I heard about Spark"),
  RowFactory.create(1, "I wish Java could use case classes"),
  RowFactory.create(2, "Logistic,regression,models,are,neat")
);

StructType schema = new StructType(new StructField[]{
  new StructField("label", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("sentence", DataTypes.StringType, false, Metadata.empty())
});

Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);

Tokenizer tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words");
Dataset<Row> wordsDataFrame = tokenizer.transform(sentenceDataFrame);
for (Row r : wordsDataFrame.select("words", "label").takeAsList(3)) {
  java.util.List<String> words = r.getList(0);
  for (String word : words) System.out.print(word + " ");
  System.out.println();
}

RegexTokenizer regexTokenizer = new RegexTokenizer()
  .setInputCol("sentence")
  .setOutputCol("words")
  .setPattern("\\W");  // alternatively .setPattern("\\w+").setGaps(false);

Python:


from pyspark.ml.feature import Tokenizer, RegexTokenizer

sentenceDataFrame = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (1, "I wish Java could use case classes"),
    (2, "Logistic,regression,models,are,neat")
], ["label", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsDataFrame = tokenizer.transform(sentenceDataFrame)
for words_label in wordsDataFrame.select("words", "label").take(3):
    print(words_label)

regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")
# alternatively, pattern="\\w+", gaps=False

StopWordsRemover

Algorithm overview:

Stop words are words that appear frequently in a document but carry little meaning; they should usually be excluded from the input to downstream algorithms.

StopWordsRemover takes a sequence of strings as input (e.g., the output of a Tokenizer) and drops all stop words from the input sequences. The list of stop words is specified by the stopWords parameter. Default stop-word lists for several languages are available via StopWordsRemover.loadDefaultStopWords(language). The Boolean parameter caseSensitive indicates whether matching is case sensitive (false by default); a short sketch of these options follows.
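
As a minimal Scala sketch of these parameters (the extra stop word "red" is only illustrative):

import org.apache.spark.ml.feature.StopWordsRemover

// Load the built-in English stop-word list and extend it with a custom word.
val english = StopWordsRemover.loadDefaultStopWords("english")
val remover = new StopWordsRemover()
  .setInputCol("raw")
  .setOutputCol("filtered")
  .setStopWords(english ++ Array("red"))
  .setCaseSensitive(false)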

Example:

Assume we have the following DataFrame with columns id and raw:

 id | raw
----|------------------------------
 0  | [I, saw, the, red, baloon]
 1  | [Mary, had, a, little, lamb]

Applying StopWordsRemover with raw as the input column and filtered as the output column, we obtain the following:

 id | raw                          | filtered
----|------------------------------|---------------------
 0  | [I, saw, the, red, baloon]   | [saw, red, baloon]
 1  | [Mary, had, a, little, lamb] | [Mary, little, lamb]

Here "I", "the", "had", and "a" have been removed.

Example usage:

Scala:


import org.apache.spark.ml.feature.StopWordsRemover

val remover = new StopWordsRemover()
  .setInputCol("raw")
  .setOutputCol("filtered")

val dataSet = spark.createDataFrame(Seq(
  (0, Seq("I", "saw", "the", "red", "baloon")),
  (1, Seq("Mary", "had", "a", "little", "lamb"))
)).toDF("id", "raw")

remover.transform(dataSet).show()

Java:


import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.StopWordsRemover;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

StopWordsRemover remover = new StopWordsRemover()
  .setInputCol("raw")
  .setOutputCol("filtered");

List<Row> data = Arrays.asList(
  RowFactory.create(Arrays.asList("I", "saw", "the", "red", "baloon")),
  RowFactory.create(Arrays.asList("Mary", "had", "a", "little", "lamb"))
);

StructType schema = new StructType(new StructField[]{
  new StructField(
    "raw", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty())
});

Dataset<Row> dataset = spark.createDataFrame(data, schema);
remover.transform(dataset).show();

Python:


from pyspark.ml.feature import StopWordsRemover

sentenceData = spark.createDataFrame([
    (0, ["I", "saw", "the", "red", "baloon"]),
    (1, ["Mary", "had", "a", "little", "lamb"])
], ["label", "raw"])

remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
remover.transform(sentenceData).show(truncate=False)

n-gram

Algorithm overview:

An n-gram is a sequence of n tokens (typically words) for some integer n. The NGram class can be used to transform input features into n-grams.

NGram takes as input a sequence of strings (e.g., the output of a Tokenizer). The parameter n determines the number of terms in each n-gram (see the sketch below). The output consists of a sequence of n-grams, where each n-gram is a string of n consecutive words delimited by spaces. If the input sequence contains fewer than n strings, no output is produced.
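
The examples below use the default n = 2 (bigrams); other sizes can be requested with setN, as in this minimal sketch:

import org.apache.spark.ml.feature.NGram

// Produce trigrams instead of the default bigrams.
val trigram = new NGram()
  .setInputCol("words")
  .setOutputCol("ngrams")
  .setN(3)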

Example usage:

Scala:


import org.apache.spark.ml.feature.NGram

val wordDataFrame = spark.createDataFrame(Seq(
  (0, Array("Hi", "I", "heard", "about", "Spark")),
  (1, Array("I", "wish", "Java", "could", "use", "case", "classes")),
  (2, Array("Logistic", "regression", "models", "are", "neat"))
)).toDF("label", "words")

val ngram = new NGram().setInputCol("words").setOutputCol("ngrams")
val ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.take(3).map(_.getAs[Stream[String]]("ngrams").toList).foreach(println)

Java:


import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.NGram;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0.0, Arrays.asList("Hi", "I", "heard", "about", "Spark")),
  RowFactory.create(1.0, Arrays.asList("I", "wish", "Java", "could", "use", "case", "classes")),
  RowFactory.create(2.0, Arrays.asList("Logistic", "regression", "models", "are", "neat"))
);

StructType schema = new StructType(new StructField[]{
  new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
  new StructField(
    "words", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty())
});

Dataset<Row> wordDataFrame = spark.createDataFrame(data, schema);

NGram ngramTransformer = new NGram().setInputCol("words").setOutputCol("ngrams");
Dataset<Row> ngramDataFrame = ngramTransformer.transform(wordDataFrame);
for (Row r : ngramDataFrame.select("ngrams", "label").takeAsList(3)) {
  java.util.List<String> ngrams = r.getList(0);
  for (String ngram : ngrams) System.out.print(ngram + " --- ");
  System.out.println();
}

Python:


from pyspark.ml.feature import NGram

wordDataFrame = spark.createDataFrame([
    (0, ["Hi", "I", "heard", "about", "Spark"]),
    (1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
    (2, ["Logistic", "regression", "models", "are", "neat"])
], ["label", "words"])

ngram = NGram(inputCol="words", outputCol="ngrams")
ngramDataFrame = ngram.transform(wordDataFrame)
for ngrams_label in ngramDataFrame.select("ngrams", "label").take(3):
    print(ngrams_label)

Binarizer

Algorithm overview:

Binarization is the process of thresholding continuous numerical features into binary (0/1) features.

Binarizer takes the parameters inputCol, outputCol, and threshold. Feature values greater than the threshold are binarized to 1.0; values less than or equal to the threshold are binarized to 0.0.
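
For instance, with the threshold of 0.5 used in the examples below, the mapping works out to:

0.1 -> 0.0   (0.1 <= 0.5)
0.8 -> 1.0   (0.8 >  0.5)
0.2 -> 0.0   (0.2 <= 0.5)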

Example usage:

Scala:


import org.apache.spark.ml.feature.Binarizer

val data = Array((0, 0.1), (1, 0.8), (2, 0.2))
val dataFrame = spark.createDataFrame(data).toDF("label", "feature")

val binarizer: Binarizer = new Binarizer()
  .setInputCol("feature")
  .setOutputCol("binarized_feature")
  .setThreshold(0.5)

val binarizedDataFrame = binarizer.transform(dataFrame)
val binarizedFeatures = binarizedDataFrame.select("binarized_feature")
binarizedFeatures.collect().foreach(println)

Java:


import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.Binarizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0, 0.1),
  RowFactory.create(1, 0.8),
  RowFactory.create(2, 0.2)
);

StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("feature", DataTypes.DoubleType, false, Metadata.empty())
});

Dataset<Row> continuousDataFrame = spark.createDataFrame(data, schema);

Binarizer binarizer = new Binarizer()
  .setInputCol("feature")
  .setOutputCol("binarized_feature")
  .setThreshold(0.5);

Dataset<Row> binarizedDataFrame = binarizer.transform(continuousDataFrame);
Dataset<Row> binarizedFeatures = binarizedDataFrame.select("binarized_feature");
for (Row r : binarizedFeatures.collectAsList()) {
  Double binarized_value = r.getDouble(0);
  System.out.println(binarized_value);
}

Python:


from pyspark.ml.feature import Binarizer

continuousDataFrame = spark.createDataFrame([
    (0, 0.1),
    (1, 0.8),
    (2, 0.2)
], ["label", "feature"])

binarizer = Binarizer(threshold=0.5, inputCol="feature", outputCol="binarized_feature")
binarizedDataFrame = binarizer.transform(continuousDataFrame)
binarizedFeatures = binarizedDataFrame.select("binarized_feature")
for binarized_feature, in binarizedFeatures.collect():
    print(binarized_feature)

PCA

Algorithm overview:

Principal component analysis (PCA) is a statistical procedure that uses an orthogonal transformation to convert a set of possibly correlated variables into a set of linearly uncorrelated variables called principal components. PCA can therefore be used to reduce the dimensionality of a feature set. The example below shows how to project 5-dimensional feature vectors onto 3 principal components.
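
Besides producing the projected features, the fitted PCAModel also exposes the learned projection itself. A minimal sketch, assuming a recent Spark 2.x release and using the fitted model pca from the Scala example below:

// pca.pc: the principal-components matrix, one column per component.
// pca.explainedVariance: proportion of variance explained by each component.
println(pca.pc)
println(pca.explainedVariance)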

Example usage:

Scala:


import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.Vectors

val data = Array(
  Vectors.sparse(5, Seq((1, 1.0), (3, 7.0))),
  Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0),
  Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0)
)
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val pca = new PCA()
  .setInputCol("features")
  .setOutputCol("pcaFeatures")
  .setK(3)
  .fit(df)

val pcaDF = pca.transform(df)
val result = pcaDF.select("pcaFeatures")
result.show()

Java:


import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.PCA;
import org.apache.spark.ml.feature.PCAModel;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(Vectors.sparse(5, new int[]{1, 3}, new double[]{1.0, 7.0})),
  RowFactory.create(Vectors.dense(2.0, 0.0, 3.0, 4.0, 5.0)),
  RowFactory.create(Vectors.dense(4.0, 0.0, 0.0, 6.0, 7.0))
);

StructType schema = new StructType(new StructField[]{
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
});

Dataset<Row> df = spark.createDataFrame(data, schema);

PCAModel pca = new PCA()
  .setInputCol("features")
  .setOutputCol("pcaFeatures")
  .setK(3)
  .fit(df);

Dataset<Row> result = pca.transform(df).select("pcaFeatures");
result.show();

Python:


from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors

data = [(Vectors.sparse(5, [(1, 1.0), (3, 7.0)]),),
        (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
        (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
df = spark.createDataFrame(data, ["features"])

pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(df)
result = model.transform(df).select("pcaFeatures")
result.show(truncate=False)

PolynomialExpansion

Algorithm overview:

Polynomial expansion expands the original features into a polynomial space spanned by the n-degree combinations of the original dimensions. The example below shows how to expand your features into a degree-3 polynomial space.
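
To make the expansion concrete: for a 2-dimensional vector (x, y), a degree-2 expansion produces the terms

$$(x, y) \;\mapsto\; (x,\ x^2,\ y,\ xy,\ y^2)$$

and a degree-3 expansion additionally includes the third-order combinations x^3, x^2 y, x y^2, and y^3 (no constant term is added; the exact ordering of the output vector follows Spark's internal convention).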

Example usage:

Scala:


import org.apache.spark.ml.feature.PolynomialExpansion
import org.apache.spark.ml.linalg.Vectors

val data = Array(
  Vectors.dense(-2.0, 2.3),
  Vectors.dense(0.0, 0.0),
  Vectors.dense(0.6, -1.1)
)
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val polynomialExpansion = new PolynomialExpansion()
  .setInputCol("features")
  .setOutputCol("polyFeatures")
  .setDegree(3)

val polyDF = polynomialExpansion.transform(df)
polyDF.select("polyFeatures").take(3).foreach(println)

Java:


import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.PolynomialExpansion;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

PolynomialExpansion polyExpansion = new PolynomialExpansion()
  .setInputCol("features")
  .setOutputCol("polyFeatures")
  .setDegree(3);

List<Row> data = Arrays.asList(
  RowFactory.create(Vectors.dense(-2.0, 2.3)),
  RowFactory.create(Vectors.dense(0.0, 0.0)),
  RowFactory.create(Vectors.dense(0.6, -1.1))
);

StructType schema = new StructType(new StructField[]{
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
});

Dataset<Row> df = spark.createDataFrame(data, schema);
Dataset<Row> polyDF = polyExpansion.transform(df);

List<Row> rows = polyDF.select("polyFeatures").takeAsList(3);
for (Row r : rows) {
  System.out.println(r.get(0));
}

Python:


from pyspark.ml.feature import PolynomialExpansion
from pyspark.ml.linalg import Vectors

df = spark\
    .createDataFrame([(Vectors.dense([-2.0, 2.3]),),
                      (Vectors.dense([0.0, 0.0]),),
                      (Vectors.dense([0.6, -1.1]),)],
                     ["features"])

px = PolynomialExpansion(degree=3, inputCol="features", outputCol="polyFeatures")
polyDF = px.transform(df)
for expanded in polyDF.select("polyFeatures").take(3):
    print(expanded)

Discrete Cosine Transform (DCT)

Algorithm overview:

The discrete cosine transform (DCT) is a Fourier-related transform similar to the discrete Fourier transform (DFT), but using only real numbers. A DCT is equivalent to a DFT of roughly twice the length operating on a real even function (since the Fourier transform of a real even function is itself real and even). DCTs are widely used in signal and image processing, especially for lossy compression of audio, still images, and video.
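
For reference, the DCT-II that underlies this transformer maps an N-point input x_0, ..., x_{N-1} to

$$X_k = \sum_{n=0}^{N-1} x_n \cos\!\left[\frac{\pi}{N}\left(n + \tfrac{1}{2}\right)k\right], \qquad k = 0, 1, \ldots, N-1.$$

Spark's DCT transformer applies a scaled variant of this DCT-II (so that the transform is orthogonal), performs no zero padding of the input vector, and computes the corresponding inverse transform when setInverse(true) is set.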

Example usage:

Scala:


import org.apache.spark.ml.feature.DCT
import org.apache.spark.ml.linalg.Vectors

val data = Seq(
  Vectors.dense(0.0, 1.0, -2.0, 3.0),
  Vectors.dense(-1.0, 2.0, 4.0, -7.0),
  Vectors.dense(14.0, -2.0, -5.0, 1.0))
val df = spark.createDataFrame(data.map(Tuple1.apply)).toDF("features")

val dct = new DCT()
  .setInputCol("features")
  .setOutputCol("featuresDCT")
  .setInverse(false)

val dctDf = dct.transform(df)
dctDf.select("featuresDCT").show(3)

Java:


import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.DCT;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(Vectors.dense(0.0, 1.0, -2.0, 3.0)),
  RowFactory.create(Vectors.dense(-1.0, 2.0, 4.0, -7.0)),
  RowFactory.create(Vectors.dense(14.0, -2.0, -5.0, 1.0))
);

StructType schema = new StructType(new StructField[]{
  new StructField("features", new VectorUDT(), false, Metadata.empty()),
});

Dataset<Row> df = spark.createDataFrame(data, schema);

DCT dct = new DCT()
  .setInputCol("features")
  .setOutputCol("featuresDCT")
  .setInverse(false);

Dataset<Row> dctDf = dct.transform(df);
dctDf.select("featuresDCT").show(3);

Python:


from pyspark.ml.feature import DCT
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (Vectors.dense([0.0, 1.0, -2.0, 3.0]),),
    (Vectors.dense([-1.0, 2.0, 4.0, -7.0]),),
    (Vectors.dense([14.0, -2.0, -5.0, 1.0]),)], ["features"])

dct = DCT(inverse=False, inputCol="features", outputCol="featuresDCT")
dctDf = dct.transform(df)
for dcts in dctDf.select("featuresDCT").take(3):
    print(dcts)

StringIndexer

Algorithm overview:

StringIndexer encodes a column of string labels into a column of label indices. The indices lie in [0, numLabels), ordered by label frequency, so the most frequent label receives index 0. If the input column is numeric, it is first cast to string and the string values are then indexed. When downstream pipeline components need to use the string-indexed labels, you must set the input column of those components to this string-indexed column name.

Example:

Assume we have a DataFrame with columns id and category:

 id | category
----|----------
 0  | a
 1  | b
 2  | c
 3  | a
 4  | a
 5  | c

category is a string column with three labels: "a", "b", and "c". Applying StringIndexer with category as the input column and categoryIndex as the output column, we obtain the following:

 id | category | categoryIndex
----|----------|---------------
 0  | a        | 0.0
 1  | b        | 2.0
 2  | c        | 1.0
 3  | a        | 0.0
 4  | a        | 0.0
 5  | c        | 1.0

Additionally, if a label that was not seen during fitting appears when transforming new data, StringIndexer will either raise an exception (the default) or skip the rows containing the unseen label; the handleInvalid parameter controls this behavior, as sketched below.
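
A minimal sketch of this option (parameter values as in Spark 2.x, where "error" is the default and "skip" drops the offending rows):

import org.apache.spark.ml.feature.StringIndexer

// Skip rows whose category was not seen during fit(), instead of failing.
val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .setHandleInvalid("skip")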

Example usage:

Scala:


import org.apache.spark.ml.feature.StringIndexer

val df = spark.createDataFrame(
  Seq((0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c"))
).toDF("id", "category")

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")

val indexed = indexer.fit(df).transform(df)
indexed.show()

Java:


import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.types.DataTypes.*;

List<Row> data = Arrays.asList(
  RowFactory.create(0, "a"),
  RowFactory.create(1, "b"),
  RowFactory.create(2, "c"),
  RowFactory.create(3, "a"),
  RowFactory.create(4, "a"),
  RowFactory.create(5, "c")
);

StructType schema = new StructType(new StructField[]{
  createStructField("id", IntegerType, false),
  createStructField("category", StringType, false)
});

Dataset<Row> df = spark.createDataFrame(data, schema);

StringIndexer indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex");

Dataset<Row> indexed = indexer.fit(df).transform(df);
indexed.show();

Python:


from pyspark.ml.feature import StringIndexer

df = spark.createDataFrame(
    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
    ["id", "category"])

indexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
indexed = indexer.fit(df).transform(df)
indexed.show()

IndexToString

Algorithm overview:

Symmetrically to StringIndexer, IndexToString maps a column of label indices back to the original label strings. A common use case is to produce index labels with StringIndexer, train a model with those indices, and then use IndexToString on the predictions to retrieve the original label strings.
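
By default, IndexToString reads the index-to-label mapping from the ML attribute metadata that StringIndexer attaches to its output column. When that metadata is not available, the labels can be supplied explicitly; a minimal sketch (the label ordering shown matches the frequency-based indices of the example below):

import org.apache.spark.ml.feature.IndexToString

// Supply the mapping explicitly: index 0 -> "a", 1 -> "c", 2 -> "b".
val converter = new IndexToString()
  .setInputCol("categoryIndex")
  .setOutputCol("originalCategory")
  .setLabels(Array("a", "c", "b"))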

Example:

Assume we have the following DataFrame with columns id and categoryIndex:

 id | categoryIndex
----|---------------
 0  | 0.0
 1  | 2.0
 2  | 1.0
 3  | 0.0
 4  | 0.0
 5  | 1.0

Applying IndexToString with categoryIndex as the input column and originalCategory as the output column, we can retrieve the original labels:

 id | categoryIndex | originalCategory
----|---------------|------------------
 0  | 0.0           | a
 1  | 2.0           | b
 2  | 1.0           | c
 3  | 0.0           | a
 4  | 0.0           | a
 5  | 1.0           | c

Example usage:

Scala:


import org.apache.spark.ml.feature.{IndexToString, StringIndexer}

val df = spark.createDataFrame(Seq(
  (0, "a"),
  (1, "b"),
  (2, "c"),
  (3, "a"),
  (4, "a"),
  (5, "c")
)).toDF("id", "category")

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .fit(df)
val indexed = indexer.transform(df)

val converter = new IndexToString()
  .setInputCol("categoryIndex")
  .setOutputCol("originalCategory")

val converted = converter.transform(indexed)
converted.select("id", "originalCategory").show()

Java:


import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.IndexToString;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.StringIndexerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0, "a"),
  RowFactory.create(1, "b"),
  RowFactory.create(2, "c"),
  RowFactory.create(3, "a"),
  RowFactory.create(4, "a"),
  RowFactory.create(5, "c")
);

StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("category", DataTypes.StringType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

StringIndexerModel indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .fit(df);
Dataset<Row> indexed = indexer.transform(df);

IndexToString converter = new IndexToString()
  .setInputCol("categoryIndex")
  .setOutputCol("originalCategory");
Dataset<Row> converted = converter.transform(indexed);
converted.select("id", "originalCategory").show();

Python:


from pyspark.ml.feature import IndexToString, StringIndexer

df = spark.createDataFrame(
    [(0, "a"), (1, "b"), (2, "c"), (3, "a"), (4, "a"), (5, "c")],
    ["id", "category"])

stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)

converter = IndexToString(inputCol="categoryIndex", outputCol="originalCategory")
converted = converter.transform(indexed)
converted.select("id", "originalCategory").show()

OneHotEncoder

Algorithm overview:

One-hot encoding maps a column of label indices to a column of binary vectors, each with at most a single one-value. This encoding allows algorithms that expect continuous features, such as logistic regression, to make use of categorical features.
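
Note that by default the encoder drops the last category, so an input column with k distinct indices is encoded as a sparse vector of length k - 1, and the all-zeros vector then stands for the last category. The Python example below passes dropLast=False to keep all k positions; the Scala equivalent is a minimal sketch like:

import org.apache.spark.ml.feature.OneHotEncoder

// Keep all k category positions instead of dropping the last one.
val encoder = new OneHotEncoder()
  .setInputCol("categoryIndex")
  .setOutputCol("categoryVec")
  .setDropLast(false)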

Example usage:

Scala:


import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}

val df = spark.createDataFrame(Seq(
  (0, "a"),
  (1, "b"),
  (2, "c"),
  (3, "a"),
  (4, "a"),
  (5, "c")
)).toDF("id", "category")

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .fit(df)
val indexed = indexer.transform(df)

val encoder = new OneHotEncoder()
  .setInputCol("categoryIndex")
  .setOutputCol("categoryVec")
val encoded = encoder.transform(indexed)
encoded.select("id", "categoryVec").show()

Java:


import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.feature.OneHotEncoder;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.StringIndexerModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

List<Row> data = Arrays.asList(
  RowFactory.create(0, "a"),
  RowFactory.create(1, "b"),
  RowFactory.create(2, "c"),
  RowFactory.create(3, "a"),
  RowFactory.create(4, "a"),
  RowFactory.create(5, "c")
);

StructType schema = new StructType(new StructField[]{
  new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
  new StructField("category", DataTypes.StringType, false, Metadata.empty())
});

Dataset<Row> df = spark.createDataFrame(data, schema);

StringIndexerModel indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIndex")
  .fit(df);
Dataset<Row> indexed = indexer.transform(df);

OneHotEncoder encoder = new OneHotEncoder()
  .setInputCol("categoryIndex")
  .setOutputCol("categoryVec");
Dataset<Row> encoded = encoder.transform(indexed);
encoded.select("id", "categoryVec").show();

Python:


from pyspark.ml.feature import OneHotEncoder, StringIndexer

df = spark.createDataFrame([
    (0, "a"),
    (1, "b"),
    (2, "c"),
    (3, "a"),
    (4, "a"),
    (5, "c")
], ["id", "category"])

stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)

encoder = OneHotEncoder(dropLast=False, inputCol="categoryIndex", outputCol="categoryVec")
encoded = encoder.transform(indexed)
encoded.select("id", "categoryVec").show()