您的位置:首页 > 其它

提交spark程序

2016-07-06 18:06 225 查看
在hadoop环境下执行spark程序,使用spark-submit提交jar

package com.spark.classfication;

import scala.Tuple2;

import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.classification.*;
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics;

import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;

import java.util.LinkedList;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.mllib.clustering.GaussianMixture;
import org.apache.spark.mllib.clustering.GaussianMixtureModel;
import org.apache.spark.mllib.clustering.KMeans;
import org.apache.spark.mllib.clustering.KMeansModel;
import org.apache.spark.mllib.linalg.Matrix;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.linalg.distributed.RowMatrix;

/**
 * Spark MLlib examples (SVM classification, K-means, PCA, Gaussian mixture),
 * intended to be packaged as a jar and launched via spark-submit.
 *
 * <p>Each example method creates its own SparkContext and stops it in a
 * {@code finally} block before returning. This is required because Spark
 * permits only one active SparkContext per JVM: without the stop() calls,
 * every method after the first would fail at context creation.
 */
public class Classfication {

    /** Runs all four MLlib examples back-to-back. */
    public static void main(String[] args) {
        svm();
        kmeans();
        pca();
        gaus();
    }

    /**
     * Trains a linear SVM with SGD on a LIBSVM-format file and prints the
     * area under the ROC curve measured on a held-out 40% test split.
     */
    public static void svm() {
        SparkConf conf = new SparkConf().setAppName("SVM Classifier Example");
        SparkContext sc = new SparkContext(conf);
        try {
            String path = "/tmp/data/mllib/sample_libsvm_data.txt";
            JavaRDD<LabeledPoint> data = MLUtils.loadLibSVMFile(sc, path).toJavaRDD();

            // Split initial RDD into two: 60% training data, 40% testing data.
            // Fixed seed (11L) keeps the split reproducible across runs.
            JavaRDD<LabeledPoint> training = data.sample(false, 0.6, 11L);
            training.cache();
            JavaRDD<LabeledPoint> test = data.subtract(training);

            // Run training algorithm to build the model.
            int numIterations = 100;
            final SVMModel model = SVMWithSGD.train(training.rdd(), numIterations);

            // Clear the default threshold so predict() returns raw margins,
            // which is what BinaryClassificationMetrics expects.
            model.clearThreshold();

            // Compute (score, label) pairs on the test set.
            JavaRDD<Tuple2<Object, Object>> scoreAndLabels = test
                    .map(new Function<LabeledPoint, Tuple2<Object, Object>>() {
                        public Tuple2<Object, Object> call(LabeledPoint p) {
                            Double score = model.predict(p.features());
                            return new Tuple2<Object, Object>(score, p.label());
                        }
                    });

            // Evaluate: area under the ROC curve.
            BinaryClassificationMetrics metrics =
                    new BinaryClassificationMetrics(JavaRDD.toRDD(scoreAndLabels));
            double auROC = metrics.areaUnderROC();

            System.out.println("Area under ROC = " + auROC);
        } finally {
            // Release the context so the next example can create its own.
            sc.stop();
        }
    }

    /**
     * Clusters space-separated numeric vectors from a text file into two
     * groups with K-means, prints the WSSSE cost, the cluster centers, and
     * the predicted cluster for three sample points.
     */
    public static void kmeans() {
        SparkConf conf = new SparkConf().setAppName("K-means Example");
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            // Load and parse data: one space-separated vector per line.
            String path = "/tmp/data/model/123.txt";
            JavaRDD<String> data = sc.textFile(path);
            JavaRDD<Vector> parsedData = data.map(new Function<String, Vector>() {
                public Vector call(String s) {
                    String[] sarray = s.split(" ");
                    double[] values = new double[sarray.length];
                    for (int i = 0; i < sarray.length; i++) {
                        values[i] = Double.parseDouble(sarray[i]);
                    }
                    return Vectors.dense(values);
                }
            });
            // Cache: K-means iterates over the data numIterations times.
            parsedData.cache();

            // Cluster the data into two classes using KMeans.
            int numClusters = 2;
            int numIterations = 20;
            KMeansModel clusters = KMeans.train(parsedData.rdd(), numClusters, numIterations);

            // Evaluate clustering by computing Within Set Sum of Squared Errors.
            double WSSSE = clusters.computeCost(parsedData.rdd());
            System.out.println("Within Set Sum of Squared Errors = " + WSSSE);
            for (Vector v : clusters.clusterCenters()) {
                System.out.println(" " + v);
            }
            System.out.println(
                    "Prediction of (1.1, 2.1, 3.1): "
                            + clusters.predict(Vectors.dense(new double[] { 1.1, 2.1, 3.1 })));
            System.out.println("Prediction of (10.1, 9.1, 11.1): "
                    + clusters.predict(Vectors.dense(new double[] { 10.1, 9.1, 11.1 })));
            System.out.println("Prediction of (21.1, 17.1, 16.1): "
                    + clusters.predict(Vectors.dense(new double[] { 21.1, 17.1, 16.1 })));

            // Save and load model
            // clusters.save(sc.sc(), "myModelPath");
            // KMeansModel sameModel = KMeansModel.load(sc.sc(), "myModelPath");
        } finally {
            // Release the context so the next example can create its own.
            sc.stop();
        }
    }

    /**
     * Fits a two-component Gaussian mixture model to space-separated numeric
     * vectors from a text file and prints each component's weight, mean, and
     * covariance matrix.
     */
    public static void gaus() {
        SparkConf conf = new SparkConf().setAppName("GaussianMixture Example");
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            // Load and parse data: one space-separated vector per line.
            String path = "/tmp/data/mllib/gmm_data.txt";
            JavaRDD<String> data = sc.textFile(path);
            JavaRDD<Vector> parsedData = data.map(new Function<String, Vector>() {
                public Vector call(String s) {
                    String[] sarray = s.trim().split(" ");
                    double[] values = new double[sarray.length];
                    for (int i = 0; i < sarray.length; i++) {
                        values[i] = Double.parseDouble(sarray[i]);
                    }
                    return Vectors.dense(values);
                }
            });
            // Cache: EM training makes repeated passes over the data.
            parsedData.cache();

            // Cluster the data into two classes using GaussianMixture (EM).
            GaussianMixtureModel gmm = new GaussianMixture().setK(2).run(parsedData.rdd());

            // Save and load GaussianMixtureModel
            // gmm.save(sc.sc(), "myGMMModel");
            // GaussianMixtureModel sameModel = GaussianMixtureModel.load(sc.sc(),
            // "myGMMModel");

            // Output the parameters of the mixture model.
            for (int j = 0; j < gmm.k(); j++) {
                System.out.printf("weight=%f\nmu=%s\nsigma=\n%s\n",
                        gmm.weights()[j], gmm.gaussians()[j].mu(), gmm.gaussians()[j].sigma());
            }
        } finally {
            // Release the context so the next example can create its own.
            sc.stop();
        }
    }

    /**
     * Computes the top 3 principal components of a small hard-coded 15x8
     * matrix and prints the components and each row projected onto them.
     */
    public static void pca() {
        SparkConf conf = new SparkConf().setAppName("PCA Example");
        SparkContext sc = new SparkContext(conf);
        try {
            double[][] array = new double[][] { { 40.4, 24.7, 7.2, 6.1, 8.3, 8.7, 2.442, 20.0 },
                    { 25.0, 12.7, 11.2, 11.0, 12.9, 20.2, 3.542, 9.1 }, { 13.2, 3.3, 3.9, 4.3, 4.4, 5.5, 0.578, 3.6 },
                    { 22.3, 6.7, 5.6, 3.7, 6.0, 7.4, 0.176, 7.3 }, { 34.3, 11.8, 7.1, 7.1, 8.0, 8.9, 1.726, 27.5 },
                    { 35.6, 12.5, 16.4, 16.7, 22.8, 29.3, 3.017, 26.6 }, { 22.0, 7.8, 9.9, 10.2, 12.6, 17.6, 0.847, 10.6 },
                    { 48.4, 13.4, 10.9, 9.9, 10.9, 13.9, 1.772, 1.772 },
                    { 40.6, 19.1, 19.8, 19.0, 29.7, 39.6, 2.449, 35.8 }, { 24.8, 8.0, 9.8, 8.9, 11.9, 16.2, 0.789, 13.7 },
                    { 12.5, 9.7, 4.2, 4.2, 4.6, 6.5, 0.874, 3.9 }, { 1.8, 0.6, 0.7, 0.7, 0.8, 1.1, 0.056, 1.0 },
                    { 32.3, 13.9, 9.4, 8.3, 9.8, 13.3, 2.126, 17.1 }, { 38.5, 9.1, 11.3, 9.5, 12.2, 16.4, 1.327, 11.6 },
                    { 26.2, 10.1, 5.6, 15.6, 7.7, 30.1, 0.126, 25.9 } };

            LinkedList<Vector> rowsList = new LinkedList<Vector>();
            for (int i = 0; i < array.length; i++) {
                rowsList.add(Vectors.dense(array[i]));
            }
            JavaRDD<Vector> rows = JavaSparkContext.fromSparkContext(sc).parallelize(rowsList);

            // Create a RowMatrix from JavaRDD<Vector>.
            RowMatrix mat = new RowMatrix(rows.rdd());

            // Compute the top 3 principal components (8 features -> 3).
            Matrix pc = mat.computePrincipalComponents(3);
            RowMatrix projected = mat.multiply(pc);

            // Print the results (replaces the original "Hello World!" placeholder,
            // which discarded the computed projection).
            System.out.println("Top 3 principal components:\n" + pc);
            System.out.println("Projected rows:");
            for (Vector row : projected.rows().toJavaRDD().collect()) {
                System.out.println(" " + row);
            }
        } finally {
            // Release the context so the next example can create its own.
            sc.stop();
        }
    }
}

所使用的pom.xml引用包
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>

<groupId>AntiSpam</groupId>
<artifactId>ExtractData</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>ExtractData</name>
<url>http://maven.apache.org</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
<version>1.8</version>
<scope>system</scope>
<systemPath>C:/Program Files/Java/jdk1.8.0_73/lib/tools.jar</systemPath>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.10</artifactId>
<version>1.6.1</version>
</dependency>
</dependencies>
<!-- <build>
<resources>
<resource>
<directory>conf/</directory>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
<filtering>false</filtering>
</resource>
</resources>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<appendAssemblyId>false</appendAssemblyId>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>AntiSpam.ExtractData.AntiSpam</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build> -->
</project>


mvn clean package
spark-submit --class com.spark.classfication.Classfication ExtractData-0.0.1-SNAPSHOT.jar
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  spark