
Lesson 9: Developing Spark Programs in IntelliJ IDEA

2016-01-30 19:55

1. Download IntelliJ IDEA:

http://www.jetbrains.com/idea/



Choose the Community Edition. To install on CentOS, download the .tar.gz package and extract it.

Go into the bin directory under the extracted IDEA directory and run ./idea.sh to start it (Java must be installed beforehand and JAVA_HOME set).

On Windows, download the .exe installer and double-click it to install.

2. During installation, when the screen shown below appears, click Install Scala. After installation it looks like the following figure:



3. After installing IDEA, launch it and create a new Scala project: click File -> New Project -> Scala -> Scala -> Next, as shown below:



4. Specify JDK 1.8.x and Scala 2.10.4. Set the Project name to WordCount, choose java 1.8.0_45 as the Project SDK and scala-sdk-2.10.4 as the Scala SDK, then click Finish.



5. Open File -> Project Structure to configure the project's libraries; the key step is adding Spark's jar. Select Libraries, click the + button, and choose Java. Java is chosen because, from the JVM's point of view, the Spark jar is just another Java library.



6. Add the Spark jar dependency: choose lib/spark-assembly-1.6.0-hadoop2.6.0.jar from the extracted spark-1.6.0-bin-hadoop2.6 directory. After the import, select the WordCount module under Modules and click OK. You can now see that the Java, Scala, and Spark dependencies have all been added to the WordCount project.

7. In the WordCount project, right-click on src and choose New -> Package; name the package com.dt.spark. Then right-click on the package, choose New -> Scala Class, set Kind to Object, and set Name to WordCount.

8. Write the following code:

package com.dt.spark

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

/**
 * A Spark WordCount program developed in Scala for local testing.
 * DT Big Data Dream Factory
 * Sina Weibo: http://weibo.com/ilovepains/
 */
object WordCount {
  def main(args: Array[String]) {
    /*
     * Step 1: Create the Spark configuration object SparkConf and set the runtime configuration
     * of the Spark program. For example, setMaster specifies the URL of the Master of the Spark
     * cluster the program will connect to; if it is set to "local", the program runs locally,
     * which is ideal for beginners with very limited hardware (e.g. only 1 GB of memory).
     */
    val conf = new SparkConf() // Create the SparkConf object. It is globally unique, so we use new rather than a factory method.
    conf.setAppName("Wow, My First Spark App!") // Set the application name, shown in the monitoring UI while the program runs.
    conf.setMaster("local") // The program runs locally; no Spark cluster installation is needed.

    /**
     * Step 2: Create the SparkContext object.
     * SparkContext is the sole entry point to all Spark functionality; whether you use Scala, Java, Python, or R,
     * you must have a SparkContext, and by default there is only one.
     * Its core role is to initialize the components the application needs at runtime, including the DAGScheduler,
     * TaskScheduler, and SchedulerBackend, and it is also responsible for registering the program with the Master.
     * SparkContext is the most important object in the entire Spark application.
     */
    val sc = new SparkContext(conf) // Create the SparkContext, passing in the SparkConf instance to customize the program's parameters and configuration.

    /*
     * Step 3: Create an RDD through the SparkContext based on the concrete data source (HDFS/HBase/Local FS/DB/S3, etc.).
     * There are three basic ways to create an RDD: 1. from an external data source (such as HDFS),
     * 2. from a Scala collection, 3. from a transformation of another RDD.
     * The data is split by the RDD into a series of partitions; the data assigned to each partition is processed by one task.
     */
    val lines = sc.textFile("D://programFiles//spark-1.6.0-bin-hadoop2.6//README.md", 1) // Assume the machine has only 1 GB of memory and one core. Read a local file and set the number of partitions.
    // This could also be written with an explicit type annotation: val lines: RDD[String] = sc.textFile(...) -- the type is inferred.

    /**
     * Step 4: Apply transformation-level processing to the initial RDD, e.g. higher-order functions such as map and filter,
     * to perform the actual computation. Step 4.1: Split each line into individual words.
     */
    val words = lines.flatMap { line => line.split(" ") } // Split each line into words; map processes one line per iteration, and the per-line collections are flattened into one large collection.

    /**
     * Step 4.2: On the basis of the split words, count each word instance as 1, i.e. word => (word, 1).
     */
    val pairs = words.map { word => (word, 1) }

    /**
     * Step 4.3: On the basis of the per-instance count of 1, compute the total number of occurrences of each word in the file.
     */
    val wordCounts = pairs.reduceByKey(_ + _) // Accumulate the values for identical keys (reducing both locally, map-side, and at the reduce level).

    wordCounts.foreach(wordNumberPair => println(wordNumberPair._1 + " : " + wordNumberPair._2))

    sc.stop() // Tear down the context and release resources.
  }
}
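
The comment in Step 3 lists three basic ways to create an RDD. As a minimal sketch to make that concrete (the object name RDDCreationSketch and the variable names are made up for illustration, and it assumes the same local README.md path as above):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object RDDCreationSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setAppName("RDD Creation Sketch").setMaster("local"))

    // 1. From an external data source (a local file here; an HDFS URL works the same way):
    val fromFile: RDD[String] = sc.textFile("D://programFiles//spark-1.6.0-bin-hadoop2.6//README.md", 1)

    // 2. From a Scala collection held on the driver:
    val fromCollection: RDD[Int] = sc.parallelize(1 to 100)

    // 3. From a transformation of an existing RDD:
    val fromTransformation: RDD[Int] = fromCollection.map(_ * 2)

    println(fromFile.count() + " lines, " + fromTransformation.count() + " numbers")
    sc.stop()
  }
}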

Console output from the run:
"C:\Program Files\Java\jdk1.8.0_45\bin\java" -Didea.launcher.port=7532 "-Didea.launcher.bin.path=D:\programFiles\IntelliJIDEA\IntelliJ IDEA Community Edition 14.1.4\bin" -Dfile.encoding=GBK -classpath "C:\Program Files\Java\jdk1.8.0_45\jre\lib\charsets.jar;C:\Program
Files\Java\jdk1.8.0_45\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.8.0_45\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.8.0_45\jre\lib\jce.jar;C:\Program Files\Java\jdk1.8.0_45\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.8.0_45\jre\lib\jfxswt.jar;C:\Program
Files\Java\jdk1.8.0_45\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.8.0_45\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.8.0_45\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.8.0_45\jre\lib\resources.jar;C:\Program Files\Java\jdk1.8.0_45\jre\lib\rt.jar;C:\Program
Files\Java\jdk1.8.0_45\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.8.0_45\jre\lib\ext\cldrdata.jar;C:\Program Files\Java\jdk1.8.0_45\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.8.0_45\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.8.0_45\jre\lib\ext\jfxrt.jar;C:\Program
Files\Java\jdk1.8.0_45\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.8.0_45\jre\lib\ext\nashorn.jar;C:\Program Files\Java\jdk1.8.0_45\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.8.0_45\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.8.0_45\jre\lib\ext\sunmscapi.jar;C:\Program
Files\Java\jdk1.8.0_45\jre\lib\ext\sunpkcs11.jar;C:\Program Files\Java\jdk1.8.0_45\jre\lib\ext\zipfs.jar;D:\programFiles\IntelliJIDEA\WordCount\out\production\WordCount;C:\Program Files (x86)\scala\lib\scala-actors-migration.jar;C:\Program Files (x86)\scala\lib\scala-actors.jar;C:\Program
Files (x86)\scala\lib\scala-library.jar;C:\Program Files (x86)\scala\lib\scala-reflect.jar;C:\Program Files (x86)\scala\lib\scala-swing.jar;D:\programFiles\spark-1.6.0-bin-hadoop2.6\lib\spark-assembly-1.6.0-hadoop2.6.0.jar;D:\programFiles\IntelliJIDEA\IntelliJ
IDEA Community Edition 14.1.4\lib\idea_rt.jar" com.intellij.rt.execution.application.AppMain com.dt.spark.WordCount

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

16/01/31 17:31:19 INFO SparkContext: Running Spark version 1.6.0

16/01/31 17:31:26 INFO SecurityManager: Changing view acls to: think

16/01/31 17:31:26 INFO SecurityManager: Changing modify acls to: think

16/01/31 17:31:26 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(think); users with modify permissions: Set(think)

16/01/31 17:31:30 INFO Utils: Successfully started service 'sparkDriver' on port 65482.

16/01/31 17:31:32 INFO Slf4jLogger: Slf4jLogger started

16/01/31 17:31:32 INFO Remoting: Starting remoting

16/01/31 17:31:33 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.56.1:65495]

16/01/31 17:31:33 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 65495.

16/01/31 17:31:33 INFO SparkEnv: Registering MapOutputTracker

16/01/31 17:31:33 INFO SparkEnv: Registering BlockManagerMaster

16/01/31 17:31:33 INFO DiskBlockManager: Created local directory at C:\Users\think\AppData\Local\Temp\blockmgr-d9a024a6-c4da-49e7-baca-d10b53f39e88

16/01/31 17:31:34 INFO MemoryStore: MemoryStore started with capacity 1091.3 MB

16/01/31 17:31:34 INFO SparkEnv: Registering OutputCommitCoordinator

16/01/31 17:31:35 INFO Utils: Successfully started service 'SparkUI' on port 4040.

16/01/31 17:31:35 INFO SparkUI: Started SparkUI at http://192.168.56.1:4040
16/01/31 17:31:36 INFO Executor: Starting executor ID driver on host localhost

16/01/31 17:31:36 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 65502.

16/01/31 17:31:36 INFO NettyBlockTransferService: Server created on 65502

16/01/31 17:31:36 INFO BlockManagerMaster: Trying to register BlockManager

16/01/31 17:31:36 INFO BlockManagerMasterEndpoint: Registering block manager localhost:65502 with 1091.3 MB RAM, BlockManagerId(driver, localhost, 65502)

16/01/31 17:31:36 INFO BlockManagerMaster: Registered BlockManager

16/01/31 17:31:40 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 127.4 KB, free 127.4 KB)

16/01/31 17:31:41 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 13.9 KB, free 141.3 KB)

16/01/31 17:31:41 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:65502 (size: 13.9 KB, free: 1091.2 MB)

16/01/31 17:31:41 INFO SparkContext: Created broadcast 0 from textFile at WordCount.scala:11

16/01/31 17:31:43 WARN : Your hostname, think-PC resolves to a loopback/non-reachable address: fe80:0:0:0:d401:a5b5:2103:6d13%eth11, but we couldn't find any external IP address!

16/01/31 17:31:44 INFO FileInputFormat: Total input paths to process : 1

16/01/31 17:31:44 INFO SparkContext: Starting job: foreach at WordCount.scala:15

16/01/31 17:31:44 INFO DAGScheduler: Registering RDD 3 (map at WordCount.scala:13)

16/01/31 17:31:44 INFO DAGScheduler: Got job 0 (foreach at WordCount.scala:15) with 1 output partitions

16/01/31 17:31:44 INFO DAGScheduler: Final stage: ResultStage 1 (foreach at WordCount.scala:15)

16/01/31 17:31:44 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)

16/01/31 17:31:44 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)

16/01/31 17:31:44 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[3] at map at WordCount.scala:13), which has no missing parents

16/01/31 17:31:45 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.1 KB, free 145.4 KB)

16/01/31 17:31:45 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.3 KB, free 147.7 KB)

16/01/31 17:31:45 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:65502 (size: 2.3 KB, free: 1091.2 MB)

16/01/31 17:31:45 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006

16/01/31 17:31:45 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[3] at map at WordCount.scala:13)

16/01/31 17:31:45 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks

16/01/31 17:31:45 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 2148 bytes)

16/01/31 17:31:45 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)

16/01/31 17:31:45 INFO HadoopRDD: Input split: file:/D:/programFiles/spark-1.6.0-bin-hadoop2.6/README.md:0+3359

16/01/31 17:31:45 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id

16/01/31 17:31:45 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id

16/01/31 17:31:45 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap

16/01/31 17:31:45 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition

16/01/31 17:31:45 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id

16/01/31 17:31:47 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2253 bytes result sent to driver

16/01/31 17:31:47 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1593 ms on localhost (1/1)

16/01/31 17:31:47 INFO DAGScheduler: ShuffleMapStage 0 (map at WordCount.scala:13) finished in 1.733 s

16/01/31 17:31:47 INFO DAGScheduler: looking for newly runnable stages

16/01/31 17:31:47 INFO DAGScheduler: running: Set()

16/01/31 17:31:47 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool

16/01/31 17:31:47 INFO DAGScheduler: waiting: Set(ResultStage 1)

16/01/31 17:31:47 INFO DAGScheduler: failed: Set()

16/01/31 17:31:47 INFO DAGScheduler: Submitting ResultStage 1 (ShuffledRDD[4] at reduceByKey at WordCount.scala:14), which has no missing parents

16/01/31 17:31:47 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 2.5 KB, free 150.2 KB)

16/01/31 17:31:47 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 1581.0 B, free 151.7 KB)

16/01/31 17:31:47 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:65502 (size: 1581.0 B, free: 1091.2 MB)

16/01/31 17:31:47 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006

16/01/31 17:31:47 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (ShuffledRDD[4] at reduceByKey at WordCount.scala:14)

16/01/31 17:31:47 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks

16/01/31 17:31:47 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, partition 0,NODE_LOCAL, 1894 bytes)

16/01/31 17:31:47 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)

16/01/31 17:31:47 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks

16/01/31 17:31:47 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 25 ms

package : 1

For : 2

Programs : 1

processing. : 1

Because : 1

The : 1

cluster. : 1

its : 1

[run : 1

APIs : 1

have : 1

Try : 1

computation : 1

through : 1

several : 1

This : 2

graph : 1

Hive : 2

storage : 1

["Specifying : 1

To : 2

page](http://spark.apache.org/documentation.html) : 1

Once : 1

"yarn" : 1

prefer : 1

SparkPi : 2

engine : 1

version : 1

file : 1

documentation, : 1

processing, : 1

the : 21

are : 1

systems. : 1

params : 1

not : 1

different : 1

refer : 2

Interactive : 2

R, : 1

given. : 1

if : 4

build : 3

when : 1

be : 2

Tests : 1

Apache : 1

./bin/run-example : 2

programs, : 1

including : 3

Spark. : 1

package. : 1

1000).count() : 1

Versions : 1

HDFS : 1

Data. : 1

>>> : 1

programming : 1

Testing : 1

module, : 1

Streaming : 1

environment : 1

run: : 1

clean : 1

1000: : 2

rich : 1

GraphX : 1

Please : 3

is : 6

run : 7

URL, : 1

threads. : 1

same : 1

MASTER=spark://host:7077 : 1

on : 5

built : 1

against : 1

[Apache : 1

tests : 2

examples : 2

at : 2

optimized : 1

usage : 1

using : 2

graphs : 1

talk : 1

Shell : 2

class : 2

abbreviated : 1

directory. : 1

README : 1

computing : 1

overview : 1

`examples` : 2

example: : 1

## : 8

N : 1

set : 2

use : 3

Hadoop-supported : 1

tests](https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools). : 1

running : 1

find : 1

contains : 1

project : 1

Pi : 1

need : 1

or : 3

Big : 1

Java, : 1

high-level : 1

uses : 1

<class> : 1

Hadoop, : 2

available : 1

requires : 1

(You : 1

see : 1

Documentation : 1

of : 5

tools : 1

using: : 1

cluster : 2

must : 1

supports : 2

built, : 1

system : 1

build/mvn : 1

Hadoop : 3

this : 1

Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version) : 1

particular : 2

Python : 2

Spark : 13

general : 2

YARN, : 1

pre-built : 1

[Configuration : 1

locally : 2

library : 1

A : 1

locally. : 1

sc.parallelize(1 : 1

only : 1

Configuration : 1

following : 2

basic : 1

# : 1

changed : 1

More : 1

which : 2

learning, : 1

first : 1

./bin/pyspark : 1

also : 4

should : 2

for : 11

[params]`. : 1

documentation : 3

[project : 2

mesos:// : 1

Maven](http://maven.apache.org/). : 1

setup : 1

<http://spark.apache.org/> : 1

latest : 1

your : 1

MASTER : 1

example : 3

scala> : 1

DataFrames, : 1

provides : 1

configure : 1

distributions. : 1

can : 6

About : 1

instructions. : 1

do : 2

easiest : 1

no : 1

how : 2

`./bin/run-example : 1

Note : 1

individual : 1

spark:// : 1

It : 2

Scala : 2

Alternatively, : 1

an : 3

variable : 1

submit : 1

machine : 1

thread, : 1

them, : 1

detailed : 2

stream : 1

And : 1

distribution : 1

return : 2

Thriftserver : 1

./bin/spark-shell : 1

"local" : 1

start : 1

You : 3

Spark](#building-spark). : 1

one : 2

help : 1

with : 3

print : 1

Spark"](http://spark.apache.org/docs/latest/building-spark.html). : 1

data : 1

wiki](https://cwiki.apache.org/confluence/display/SPARK). : 1

in : 5

-DskipTests : 1

downloaded : 1

versions : 1

online : 1

Guide](http://spark.apache.org/docs/latest/configuration.html) : 1

comes : 1

[building : 1

Python, : 2

Many : 1

building : 2

Running : 1

from : 1

way : 1

Online : 1

site, : 1

other : 1

Example : 1

analysis. : 1

sc.parallelize(range(1000)).count() : 1

you : 4

runs. : 1

Building : 1

higher-level : 1

protocols : 1

guidance : 2

a : 8

guide, : 1

name : 1

fast : 1

SQL : 2

will : 1

instance: : 1

to : 14

core : 1

: 67

web : 1

"local
" : 1

programs : 2

package.) : 1

that : 2

MLlib : 1

["Building : 1

shell: : 2

Scala, : 1

and : 10

command, : 2

./dev/run-tests : 1

sample : 1

16/01/31 17:31:47 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1165 bytes result sent to driver

16/01/31 17:31:47 INFO DAGScheduler: ResultStage 1 (foreach at WordCount.scala:15) finished in 0.597 s

16/01/31 17:31:47 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 596 ms on localhost (1/1)

16/01/31 17:31:47 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool

16/01/31 17:31:48 INFO DAGScheduler: Job 0 finished: foreach at WordCount.scala:15, took 3.411606 s

16/01/31 17:31:48 INFO SparkUI: Stopped Spark web UI at http://192.168.56.1:4040
16/01/31 17:31:48 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!

16/01/31 17:31:48 INFO MemoryStore: MemoryStore cleared

16/01/31 17:31:48 INFO BlockManager: BlockManager stopped

16/01/31 17:31:48 INFO BlockManagerMaster: BlockManagerMaster stopped

16/01/31 17:31:48 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!

16/01/31 17:31:48 INFO SparkContext: Successfully stopped SparkContext

16/01/31 17:31:48 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.

16/01/31 17:31:48 INFO ShutdownHookManager: Shutdown hook called

16/01/31 17:31:48 INFO ShutdownHookManager: Deleting directory C:\Users\think\AppData\Local\Temp\spark-40913b6e-9c2a-4356-85e6-f021141cf06f

16/01/31 17:31:48 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.

Process finished with exit code 0

The program runs successfully in local mode.

Next, let's run WordCount in cluster mode. The code is as follows:

package com.dt.spark

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

/**
 * A Spark WordCount program developed in Scala to run on a cluster.
 * DT Big Data Dream Factory
 * Sina Weibo: http://weibo.com/ilovepains/
 */
object WordCountCluster {
  def main(args: Array[String]) {
    /*
     * Step 1: Create the Spark configuration object SparkConf and set the runtime configuration
     * of the Spark program. For example, setMaster specifies the URL of the Master of the Spark
     * cluster the program will connect to; if it is set to "local", the program runs locally,
     * which is ideal for beginners with very limited hardware (e.g. only 1 GB of memory).
     */
    val conf = new SparkConf() // Create the SparkConf object. It is globally unique, so we use new rather than a factory method.
    conf.setAppName("Wow, My First Spark App!") // Set the application name, shown in the monitoring UI while the program runs.
    // conf.setMaster("local") // setMaster is not called this time: the program now runs on the Spark cluster.

    /**
     * Step 2: Create the SparkContext object.
     * SparkContext is the sole entry point to all Spark functionality; whether you use Scala, Java, Python, or R,
     * you must have a SparkContext, and by default there is only one.
     * Its core role is to initialize the components the application needs at runtime, including the DAGScheduler,
     * TaskScheduler, and SchedulerBackend, and it is also responsible for registering the program with the Master.
     * SparkContext is the most important object in the entire Spark application.
     */
    val sc = new SparkContext(conf) // Create the SparkContext, passing in the SparkConf instance to customize the program's parameters and configuration.

    /*
     * Step 3: Create an RDD through the SparkContext based on the concrete data source (HDFS/HBase/Local FS/DB/S3, etc.).
     * There are three basic ways to create an RDD: 1. from an external data source (such as HDFS),
     * 2. from a Scala collection, 3. from a transformation of another RDD.
     * The data is split by the RDD into a series of partitions; the data assigned to each partition is processed by one task.
     */
    val lines = sc.textFile("hdfs://192.168.1.121:9000/user/spark/README.md") // Read the file from HDFS and split it into partitions.
    // This could also be written with an explicit type annotation: val lines: RDD[String] = sc.textFile(...) -- the type is inferred.

    /**
     * Step 4: Apply transformation-level processing to the initial RDD, e.g. higher-order functions such as map and filter,
     * to perform the actual computation. Step 4.1: Split each line into individual words.
     */
    val words = lines.flatMap { line => line.split(" ") } // Split each line into words; map processes one line per iteration, and the per-line collections are flattened into one large collection.

    /**
     * Step 4.2: On the basis of the split words, count each word instance as 1, i.e. word => (word, 1).
     */
    val pairs = words.map { word => (word, 1) }

    /**
     * Step 4.3: On the basis of the per-instance count of 1, compute the total number of occurrences of each word in the file.
     */
    val wordCountsOrdered = pairs.reduceByKey(_ + _).map(pair => (pair._2, pair._1)).sortByKey(false).map(pair => (pair._2, pair._1))
    // Accumulate the values for identical keys (local and reduce-level reduce), swap to (count, word), sort by count in descending order, then swap back to (word, count).

    wordCountsOrdered.collect.foreach(wordNumberPair => println(wordNumberPair._1 + " : " + wordNumberPair._2)) // collect brings the results back to the driver so they can be printed there.

    sc.stop() // Tear down the context and release resources.
  }
}
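
The chained map/sortByKey/map in Step 4.3 above orders the results by count by swapping key and value, sorting, and swapping back. One possible alternative sketch, assuming the same pairs RDD from the listing above, is RDD.sortBy (available in Spark 1.x), which avoids the double swap:

val wordCountsOrdered = pairs.reduceByKey(_ + _).sortBy(_._2, ascending = false)

Either form yields (word, count) pairs ordered by descending count; the swap-and-sort version in the listing simply mirrors the classic MapReduce idiom.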

Submitting to the cluster directly from the IDE is not recommended, for these reasons:

1. Memory and core limits: by default the driver of a Spark program runs on the machine that submits the program, so if you submit from the IDE, the IDE machine itself would have to be very powerful.

2. The driver directs the workers in the cluster and communicates with them frequently; if the development machine running the IDE and the Spark cluster are not on the same network, tasks can be lost and the job can run slowly, among other avoidable problems.

3. It is also insecure.

Remote debugging likewise requires the driver and the cluster to be in the same network environment.

In production, a dedicated machine with the same configuration as the workers, on the same network, is normally used as the driver.

Packaging the program in IDEA:

Click File -> Project Structure -> Artifacts to package manually. Click the + button, then choose JAR -> From modules with dependencies...

Set the Main Class to com.dt.spark.WordCountCluster and click OK.

Remove the ":jar" suffix from the Name (WordCount:jar), and choose the Output Directory.

Important: be sure to remove the Scala and Spark jars (by clicking the - button), because:

1. The jar would otherwise be extremely large.

2. At runtime, if these jars are not bundled, the program automatically looks for them on the system where it runs.

From IDEA's main menu choose Build -> Build Artifacts -> WordCount -> Build to start packaging.

Once packaging succeeds, copy the jar to the cluster and submit it there with the following command:

spark-submit --class com.dt.spark.WordCountCluster --master spark://slq1:7077 /home/richard/spark-1.6.0/class/WordCount.jar

*** In real production environments, the submit command is usually written as a shell script, so you do not have to type the long command every time.

The above are my study notes for Lesson 9 of the "IMF Legendary Action" course by Mr. Wang Jialin of DT Big Data Dream Factory.

Wang Jialin: China evangelist for Spark, Flink, Docker, and Android technologies; President and Chief Expert of the Spark Asia-Pacific Research Institute; founder of DT Big Data Dream Factory; expert in Android software/hardware integration at the source-code level; English pronunciation magician; fitness enthusiast.

WeChat public account: DT_Spark

Email: 18610086859@126.com

Phone: 18610086859

QQ: 1740415547

WeChat: 18610086859

Sina Weibo: ilovepains