
Starting the Spark Shell, writing a WordCount program in the Spark Shell, writing a WordCount Maven project in IDEA, and using spark-submit with the Spark jar to do the word count


1. Starting the Spark Shell

spark-shell is the interactive shell that ships with Spark. It makes interactive programming easy: you can write Spark programs in Scala directly at this command line. Note that to start spark-shell you first need to start the Spark HA cluster; for Spark cluster installation and deployment see: http://blog.csdn.net/tototuzuoquan/article/details/74481570

1.2.1 Starting the spark shell

Start method 1:

[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# cd $SPARK_HOME
[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# pwd
/home/tuzq/software/spark-2.1.1-bin-hadoop2.7
[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# bin/spark-shell --master spark://hadoop1:7077,hadoop2:7077


--master specifies the master address; the shell connects to the master that is currently running.

Likewise, you can also specify the executor memory and the total number of cores:

[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# cd $SPARK_HOME
[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# bin/spark-shell --master spark://hadoop1:7077,hadoop2:7077 --executor-memory 2g --total-executor-cores 2


Parameter description:

--master spark://hadoop1:7077 specifies the Master address

--executor-memory 2g specifies 2 GB of memory available to each executor

--total-executor-cores 2 specifies that the application uses 2 CPU cores across the whole cluster

Note:

If you start the spark shell without specifying a master address, the shell still starts normally and its programs still run; in that case Spark runs in local mode, which only starts a single process on the local machine and does not connect to the cluster.

The Spark Shell has already initialized a SparkContext as the object sc by default. If your code needs one, just use sc directly.
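For example, you can start the shell explicitly in local mode and use the pre-created sc right away. This is only a minimal sketch; the tiny in-memory dataset is made up for illustration:

[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# bin/spark-shell --master local[*]

scala> sc.parallelize(Seq("spark", "shell", "spark")).map((_, 1)).reduceByKey(_ + _).collect()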

1.2.2 Writing a WordCount program in the spark shell

1. First start HDFS

2. Upload a file to HDFS (hdfs://mycluster/wordcount/input/2.txt)

The result looks like this:



If you want to access files on the Hadoop cluster with the full URI (including the hdfs:// scheme), you can do it like this:

[root@hadoop2 hadoop-2.8.0]# hdfs dfs -ls hdfs://mycluster/
Found 2 items
drwx-wx-wx   - root supergroup          0 2017-07-06 11:11 hdfs://mycluster/tmp
drwxr-xr-x   - root supergroup          0 2017-07-06 11:16 hdfs://mycluster/wordcount
[root@hadoop2 hadoop-2.8.0]# hdfs dfs -ls hdfs://mycluster/wordcount/input
Found 9 items
-rw-r--r--   3 root supergroup        604 2017-07-06 11:16 hdfs://mycluster/wordcount/input/1.txt
-rw-r--r--   3 root supergroup        604 2017-07-06 11:16 hdfs://mycluster/wordcount/input/2.txt
-rw-r--r--   3 root supergroup        604 2017-07-06 11:16 hdfs://mycluster/wordcount/input/3.txt
-rw-r--r--   3 root supergroup        604 2017-07-06 11:16 hdfs://mycluster/wordcount/input/4.txt
-rw-r--r--   3 root supergroup        604 2017-07-06 11:16 hdfs://mycluster/wordcount/input/5.txt
-rw-r--r--   3 root supergroup   27209520 2017-07-06 11:16 hdfs://mycluster/wordcount/input/a.txt
-rw-r--r--   3 root supergroup   27209520 2017-07-06 11:16 hdfs://mycluster/wordcount/input/aaa.txt
-rw-r--r--   3 root supergroup   27787264 2017-07-06 11:16 hdfs://mycluster/wordcount/input/b.txt
-rw-r--r--   3 root supergroup   26738688 2017-07-06 11:16 hdfs://mycluster/wordcount/input/c.txt
[root@hadoop2 hadoop-2.8.0]# hdfs dfs -ls hdfs://mycluster/wordcount/input/2.txt
-rw-r--r--   3 root supergroup        604 2017-07-06 11:16 hdfs://mycluster/wordcount/input/2.txt
[root@hadoop2 hadoop-2.8.0]# hdfs dfs -cat hdfs://mycluster/wordcount/input/2.txt
Collecting and analysis base data for big data analysis;Maintenance Hadoop platform
Development Hadoop framework
Cooperate with data scientist, verify and implement data models to realize automatic and accurate fraud detection, in order to improve the risk management level of E-commerce/payment platforms
Analyze information acquired and compare solutions and weight them against the actual needs, provide root cause analysis affecting key business problems
Play an active role in company's anti-fraud platform strategy
Support related data analysis work, and provide valuable business reports[root@hadoop2 hadoop-2.8.0]#


3. Write the Spark program in Scala in the spark shell

scala> sc.textFile("hdfs://mycluster/wordcount/input/2.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://mycluster/wordcount/output")


4. Check the result with HDFS commands

[root@hadoop2 hadoop-2.8.0]# hdfs dfs -ls hdfs://mycluster/wordcount/output
Found 3 items
-rw-r--r--   3 root supergroup          0 2017-07-06 11:48 hdfs://mycluster/wordcount/output/_SUCCESS
-rw-r--r--   3 root supergroup        400 2017-07-06 11:48 hdfs://mycluster/wordcount/output/part-00000
-rw-r--r--   3 root supergroup        346 2017-07-06 11:48 hdfs://mycluster/wordcount/output/part-00001
[root@hadoop2 hadoop-2.8.0]# hdfs dfs -cat hdfs://mycluster/wordcount/output/part-00000
(role,1)
(Play,1)
(fraud,1)
(level,1)
(business,2)
(improve,1)
(platforms,1)
(order,1)
(big,1)
(with,1)
(scientist,,1)
(active,1)
(valuable,1)
(data,5)
(information,1)
(Cooperate,1)
(Collecting,1)
(framework,1)
(E-commerce/payment,1)
(acquired,1)
(root,1)
(accurate,1)
(solutions,1)
(analysis;Maintenance,1)
(problems,1)
(them,1)
(Analyze,1)
(models,1)
(analysis,3)
(realize,1)
(actual,1)
(weight,1)
[root@hadoop2 hadoop-2.8.0]#


Explanation:

sc is the SparkContext object, which is the entry point for submitting Spark programs

sc.textFile("hdfs://mycluster/wordcount/input/2.txt") reads the data from HDFS

flatMap(_.split(" ")) first maps each line, then flattens the result into individual words

map((_,1)) turns each word into a (word, 1) tuple

reduceByKey(_+_) reduces by key, summing the values

saveAsTextFile("hdfs://mycluster/wordcount/output") writes the result to HDFS
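The same pipeline can also be spelled out step by step in the shell, which makes each transformation easier to inspect. This is just an illustrative sketch; the intermediate names are arbitrary, and take(5) is used instead of writing to HDFS so the existing output directory is not touched:

scala> val lines = sc.textFile("hdfs://mycluster/wordcount/input/2.txt")   // one element per line
scala> val words = lines.flatMap(_.split(" "))                             // split each line and flatten into words
scala> val pairs = words.map((_, 1))                                       // each word becomes a (word, 1) tuple
scala> val counts = pairs.reduceByKey(_ + _)                               // sum the 1s for each distinct word
scala> counts.take(5)                                                      // peek at a few results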

Code to sort the WordCount result and display it:

scala> sc.textFile("hdfs://mycluster/wordcount/input/2.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).collect
res2: Array[(String, Int)] = Array((and,6), (data,5), (analysis,3), (business,2), (to,2), (platform,2), (in,2), (provide,2), (the,2), (Hadoop,2), (compare,1), (risk,1), (anti-fraud,1), (key,1), (related,1), (base,1), (Support,1), (against,1), (automatic,1), (company's,1), (needs,,1), (implement,1), (affecting,1), (strategy,1), (of,1), (reports,1), (management,1), (detection,,1), (for,1), (work,,1), (cause,1), (an,1), (verify,1), (Development,1), (role,1), (Play,1), (fraud,1), (level,1), (improve,1), (platforms,1), (order,1), (big,1), (with,1), (scientist,,1), (active,1), (valuable,1), (information,1), (Cooperate,1), (Collecting,1), (framework,1), (E-commerce/payment,1), (acquired,1), (root,1), (accurate,1), (solutions,1), (analysis;Maintenance,1), (problems,1), (them,1), (Analyze,1), (m...
scala>


2. Creating a Spark Maven project in IDEA

The spark shell is mostly used for testing and verifying programs. In production, you usually write the program in an IDE, package it into a jar, and submit it to the cluster. The most common approach is to create a Maven project and let Maven manage the jar dependencies.

Create the Maven project:









Note that after the project is created, you must re-point Maven at the correct local repository location, otherwise the jars may be downloaded all over again:



After the Maven project is created, click Enable Auto-Import.



Configure Maven's pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>cn.toto.spark</groupId>
<artifactId>wordCount</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.10.6</scala.version>
<scala.compat.version>2.10</scala.compat.version>
</properties>

<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.5.2</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.5.2</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.2</version>
</dependency>
</dependencies>

<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-make:transitive</arg>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>cn.toto.spark.WordCount</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>


Change src/main/java and src/test/java to src/main/scala and src/test/scala respectively (or create the scala directories), so they match the configuration in pom.xml.
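For reference, the resulting project layout should look roughly like this (a sketch; the package path matches the WordCount class shown further below):

wordCount/
├── pom.xml
└── src/
    ├── main/
    │   └── scala/
    │       └── cn/toto/spark/
    │           └── WordCount.scala
    └── test/
        └── scala/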





Or do it as follows:





Create a new Scala class with kind Object.



Write the Spark program code:

package cn.toto.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by toto on 2017/7/6.
  */
object WordCount {

  def main(args: Array[String]): Unit = {
    // create the SparkConf
    val conf = new SparkConf().setAppName("WordCount")
    // create the SparkContext
    val sc = new SparkContext(conf)
    // read the data from HDFS
    val line: RDD[String] = sc.textFile(args(0))
    // split into words
    val words: RDD[String] = line.flatMap(_.split(" "))
    // pair each word with a count of 1
    val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
    // group and aggregate
    val result: RDD[(String, Int)] = wordAndOne.reduceByKey((x, y) => x + y)
    // sort by count, descending
    val finalResult: RDD[(String, Int)] = result.sortBy(_._2, false)
    // save the data to HDFS
    finalResult.saveAsTextFile(args(1))
    // release resources
    sc.stop()
  }
}
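Before packaging, it can be handy to run a quick local test inside IDEA. A common way is to set the master explicitly; the variant below is only a sketch, and the local[*] master, the WordCountLocal name, and the data/2.txt path are assumptions rather than part of the original program:

package cn.toto.spark

import org.apache.spark.{SparkConf, SparkContext}

// Local-test variant (a sketch; not meant for cluster submission).
object WordCountLocal {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("WordCountLocal").setMaster("local[*]") // run inside the IDE
    val sc = new SparkContext(conf)
    sc.textFile("data/2.txt")                  // hypothetical local input file
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, ascending = false)
      .take(10)                                // print the top 10 words instead of writing to HDFS
      .foreach(println)
    sc.stop()
  }
}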


Package it:



Go into the project's target directory to find the jar package.
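If you prefer the command line to the IDEA Maven panel, the shade plugin configured in the pom above produces the same jar from a plain Maven build (a sketch; run from the project root):

mvn clean package
ls target/wordCount-1.0-SNAPSHOT.jar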



Or find it directly in the IDEA project directory:



Upload wordCount-1.0-SNAPSHOT.jar to /home/tuzq/software/sparkdata
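If the jar was built on a development machine, one way to copy it over is scp (a sketch; the user and host match the article's hadoop1 node, and the local path assumes the project's target directory):

scp target/wordCount-1.0-SNAPSHOT.jar root@hadoop1:/home/tuzq/software/sparkdata/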



Using the Spark jar with spark-submit to do the word count

Note that the final output path must not already exist, and when running the command below it is best to close the spark-shell first, otherwise the job may fail (typically because the shell is still holding the cluster's executor cores).
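If an earlier run already created the output directory, remove it first; the path here matches the out0001 used in the submit command below:

hdfs dfs -rm -r hdfs://mycluster/wordcount/out0001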

bin/spark-submit --master spark://hadoop1:7077,hadoop2:7077 --executor-memory 512m --total-executor-cores 6 --class cn.toto.spark.WordCount /home/tuzq/software/sparkdata/wordCount-1.0-SNAPSHOT.jar hdfs://mycluster/wordcount/input hdfs://mycluster/wordcount/out0001


Status while it is running:



Check the result on HDFS:

[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# hdfs dfs -ls hdfs://mycluster/wordcount/out0002
Found 10 items
-rw-r--r--   3 root supergroup          0 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/_SUCCESS
-rw-r--r--   3 root supergroup        191 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00000
-rw-r--r--   3 root supergroup        671 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00001
-rw-r--r--   3 root supergroup        245 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00002
-rw-r--r--   3 root supergroup         31 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00003
-rw-r--r--   3 root supergroup       1096 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00004
-rw-r--r--   3 root supergroup         11 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00005
-rw-r--r--   3 root supergroup        936 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00006
-rw-r--r--   3 root supergroup        588 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00007
-rw-r--r--   3 root supergroup        609 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00008


Look at any one of the output files:

[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# hdfs dfs -cat hdfs://mycluster/wordcount/out0002/part-00000
(and,770752)
(is,659375)
(I,505440)
(a,468642)
(to,431857)
(in,421230)
(the,331176)
(of,272080)
(FDS,218862)
(for,213029)
(The,196569)
(true,196567)
(but,196566)
(on,193650)
(without,193649)
[root@hadoop1 spark-2.1.1-bin-hadoop2.7]#