
Lesson 8: Hands-On in Depth: Developing Spark Programs with an IDE

2016-01-24 00:13

1. Download and install scala-2.10.4 for Windows.

2. Open Eclipse and create a new Scala project named WordCount.

3. Change the project's Scala version to 2.10.x.

In the Package Explorer, right-click the WordCount project and choose Properties -> Scala Compiler. Check Use Project Settings as shown in the screenshot below, set Scala Installation to Latest 2.10 bundle (dynamic), and click OK.



You should now see that the Scala Library container in the WordCount project has switched to version 2.10.5 (screenshot below).
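If you want to double-check at runtime which Scala library actually ended up on the classpath, a one-line sketch (standard library only, nothing project-specific assumed) can be dropped into any object's main method or the Scala interpreter:

println(scala.util.Properties.versionString) // should print something like "version 2.10.5" once the 2.10 bundle is active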



4. Add the spark-1.6.0 jar file as a dependency.

Download spark-1.6.0-bin-hadoop2.6.tgz, extract it into the D:\ProgramFiles directory, and import spark-1.6.0-bin-hadoop2.6\lib\spark-assembly-1.6.0-hadoop2.6.0.jar into Eclipse.

In Eclipse, right-click the WordCount project and choose Build Path -> Configure Build Path. On the Libraries tab click Add External JARs..., select spark-1.6.0-bin-hadoop2.6\lib\spark-assembly-1.6.0-hadoop2.6.0.jar, and click OK.

You should now see spark-assembly-1.6.0-hadoop2.6.0.jar listed under Referenced Libraries in the WordCount project (screenshot below).
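As an aside, if you manage the project with sbt instead of Eclipse's build path, a minimal build.sbt along these lines should resolve the same dependency from Maven Central (this is an alternative setup, not part of the Eclipse workflow above):

name := "WordCount"

scalaVersion := "2.10.4" // or 2.10.5, matching the Scala IDE bundle chosen in step 3

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0" // resolves spark-core_2.10:1.6.0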



5. Create the Spark package under src:

Right-click src in the WordCount project, choose New -> Package, name the package com.dt.spark, and click Finish.

6. Create the Scala entry class.

Right-click the com.dt.spark package, choose New -> Scala Class, name the class com.dt.spark.WordCount, and click Finish.

You can now start writing code.

7. Change the class into an object and write the main entry method. The code is as follows:

package com.dt.spark

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

/**
 * A locally runnable Spark WordCount program written in Scala.
 * DT大数据梦工厂 (DT Big Data Dream Factory)
 * Sina Weibo: http://weibo.com/ilovepains/
 */
object WordCount {

  def main(args: Array[String]) {

    /*
     * Step 1: Create the SparkConf object, which holds the runtime configuration of the Spark
     * program. For example, setMaster sets the URL of the Spark cluster master to connect to;
     * setting it to "local" runs the program locally, which suits beginners on very modest
     * machines (for example, only 1 GB of memory).
     */
    val conf = new SparkConf() // Create the SparkConf object. It is globally unique, so we use new rather than a factory method.
    conf.setAppName("Wow, My First Spark App!") // Set the application name, shown in the monitoring UI while the program runs.
    conf.setMaster("local") // Run locally; no Spark cluster installation is required.

    /**
     * Step 2: Create the SparkContext object.
     * SparkContext is the single entry point to all Spark functionality; whether you use Scala, Java, Python, or R,
     * there must be a SparkContext, and by default there is exactly one.
     * Its core job is to initialize the components the application needs at runtime, including the DAGScheduler,
     * TaskScheduler, and SchedulerBackend, and it also registers the program with the Master.
     * SparkContext is the single most important object in a Spark application.
     */
    val sc = new SparkContext(conf) // Create the SparkContext, passing in the SparkConf instance to customize Spark's runtime configuration.

    /*
     * Step 3: Create an RDD from a concrete data source (HDFS/HBase/local file system/DB/S3, etc.) via the SparkContext.
     * There are three basic ways to create an RDD: 1. from an external data source (such as HDFS),
     * 2. from a Scala collection, 3. from operations on another RDD.
     * The RDD splits the data into a series of partitions; the data assigned to each partition is handled by one task.
     */
    val lines = sc.textFile("D://programFiles//spark-1.6.0-bin-hadoop2.6//README.md", 1) // Assume a machine with only 1 GB of memory and one core: read a local file into a single partition.
    // Could also be written with an explicit type: val lines: RDD[String] = sc.textFile(...); the type is otherwise inferred.

    /**
     * Step 4: Apply transformation-level processing to the initial RDD, e.g. higher-order functions such as map/filter,
     * to perform the actual computation.
     * Step 4.1: Split each line into individual words.
     */
    val words = lines.flatMap { line => line.split(" ") } // Split each line into words: the function runs line by line, and flatMap merges the per-line collections into one collection.

    /**
     * Step 4.2: On top of the word splitting, count each word occurrence as 1, i.e. word => (word, 1).
     */
    val pairs = words.map { word => (word, 1) }

    /**
     * Step 4.3: On top of the per-occurrence counts of 1, compute the total number of times each word appears in the file.
     */
    val wordCounts = pairs.reduceByKey(_ + _) // Accumulate the values of identical keys (reducing both locally and at the reduce level).

    wordCounts.foreach(wordNumberPair => println(wordNumberPair._1 + " : " + wordNumberPair._2))

    sc.stop() // Stop the context and release its resources.
  }
}
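The step 3 comment above lists three basic ways to create an RDD; here is a minimal sketch of all three, assuming the same sc as in the program (the value names are purely illustrative):

val fromFile = sc.textFile("D://programFiles//spark-1.6.0-bin-hadoop2.6//README.md") // 1. from an external data source (local file or HDFS)
val fromCollection = sc.parallelize(Seq("spark", "scala", "spark")) // 2. from a Scala collection
val fromOtherRDD = fromCollection.map(word => (word, 1)) // 3. derived from another RDD via a transformation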

8. Once the code is finished, right-click in the editor, choose Run As -> Scala Application, and run it. The output is as follows:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

16/01/23 23:46:39 INFO SparkContext: Running Spark version 1.6.0

16/01/23 23:46:43 INFO SecurityManager: Changing view acls to: think

16/01/23 23:46:43 INFO SecurityManager: Changing modify acls to: think

16/01/23 23:46:43 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(think); users with modify permissions: Set(think)

16/01/23 23:46:46 INFO Utils: Successfully started service 'sparkDriver' on port 57050.

16/01/23 23:46:48 INFO Slf4jLogger: Slf4jLogger started

16/01/23 23:46:48 INFO Remoting: Starting remoting

16/01/23 23:46:49 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.56.1:57063]

16/01/23 23:46:49 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 57063.

16/01/23 23:46:49 INFO SparkEnv: Registering MapOutputTracker

16/01/23 23:46:49 INFO SparkEnv: Registering BlockManagerMaster

16/01/23 23:46:49 INFO DiskBlockManager: Created local directory at C:\Users\think\AppData\Local\Temp\blockmgr-48345653-dc5e-4cfc-95a3-203e0c317eda

16/01/23 23:46:50 INFO MemoryStore: MemoryStore started with capacity 1091.3 MB

16/01/23 23:46:50 INFO SparkEnv: Registering OutputCommitCoordinator

16/01/23 23:46:51 INFO Utils: Successfully started service 'SparkUI' on port 4040.

16/01/23 23:46:51 INFO SparkUI: Started SparkUI at http://192.168.56.1:4040
16/01/23 23:46:52 INFO Executor: Starting executor ID driver on host localhost

16/01/23 23:46:52 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 57070.

16/01/23 23:46:52 INFO NettyBlockTransferService: Server created on 57070

16/01/23 23:46:52 INFO BlockManagerMaster: Trying to register BlockManager

16/01/23 23:46:52 INFO BlockManagerMasterEndpoint: Registering block manager localhost:57070 with 1091.3 MB RAM, BlockManagerId(driver, localhost, 57070)

16/01/23 23:46:52 INFO BlockManagerMaster: Registered BlockManager

16/01/23 23:46:57 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 127.4 KB, free 127.4 KB)

16/01/23 23:46:58 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 13.9 KB, free 141.3 KB)

16/01/23 23:46:58 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:57070 (size: 13.9 KB, free: 1091.2 MB)

16/01/23 23:46:58 INFO SparkContext: Created broadcast 0 from textFile at WordCount.scala:35

16/01/23 23:47:00 WARN : Your hostname, think-PC resolves to a loopback/non-reachable address: fe80:0:0:0:0:5efe:c0a8:3801%net8, but we couldn't find any external IP address!

16/01/23 23:47:03 INFO FileInputFormat: Total input paths to process : 1

16/01/23 23:47:03 INFO SparkContext: Starting job: foreach at WordCount.scala:50

16/01/23 23:47:03 INFO DAGScheduler: Registering RDD 3 (map at WordCount.scala:45)

16/01/23 23:47:03 INFO DAGScheduler: Got job 0 (foreach at WordCount.scala:50) with 1 output partitions

16/01/23 23:47:03 INFO DAGScheduler: Final stage: ResultStage 1 (foreach at WordCount.scala:50)

16/01/23 23:47:03 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)

16/01/23 23:47:03 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)

16/01/23 23:47:03 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[3] at map at WordCount.scala:45), which has no missing parents

16/01/23 23:47:04 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.1 KB, free 145.4 KB)

16/01/23 23:47:04 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.3 KB, free 147.7 KB)

16/01/23 23:47:04 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:57070 (size: 2.3 KB, free: 1091.2 MB)

16/01/23 23:47:04 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006

16/01/23 23:47:04 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[3] at map at WordCount.scala:45)

16/01/23 23:47:04 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks

16/01/23 23:47:04 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 2148 bytes)

16/01/23 23:47:04 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)

16/01/23 23:47:04 INFO HadoopRDD: Input split: file:/D:/programFiles/spark-1.6.0-bin-hadoop2.6/README.md:0+3359

16/01/23 23:47:04 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id

16/01/23 23:47:04 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id

16/01/23 23:47:04 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap

16/01/23 23:47:04 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition

16/01/23 23:47:04 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id

16/01/23 23:47:05 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2253 bytes result sent to driver

16/01/23 23:47:05 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1266 ms on localhost (1/1)

16/01/23 23:47:05 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool

16/01/23 23:47:05 INFO DAGScheduler: ShuffleMapStage 0 (map at WordCount.scala:45) finished in 1.350 s

16/01/23 23:47:05 INFO DAGScheduler: looking for newly runnable stages

16/01/23 23:47:05 INFO DAGScheduler: running: Set()

16/01/23 23:47:05 INFO DAGScheduler: waiting: Set(ResultStage 1)

16/01/23 23:47:05 INFO DAGScheduler: failed: Set()

16/01/23 23:47:05 INFO DAGScheduler: Submitting ResultStage 1 (ShuffledRDD[4] at reduceByKey at WordCount.scala:49), which has no missing parents

16/01/23 23:47:05 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 2.5 KB, free 150.2 KB)

16/01/23 23:47:05 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 1581.0 B, free 151.7 KB)

16/01/23 23:47:05 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:57070 (size: 1581.0 B, free: 1091.2 MB)

16/01/23 23:47:05 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006

16/01/23 23:47:05 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (ShuffledRDD[4] at reduceByKey at WordCount.scala:49)

16/01/23 23:47:05 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks

16/01/23 23:47:05 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, partition 0,NODE_LOCAL, 1894 bytes)

16/01/23 23:47:05 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)

16/01/23 23:47:05 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks

16/01/23 23:47:05 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 28 ms

package : 1

For : 2

Programs : 1

processing. : 1

Because : 1

The : 1

cluster. : 1

its : 1

[run : 1

APIs : 1

have : 1

Try : 1

computation : 1

through : 1

several : 1

This : 2

graph : 1

Hive : 2

storage : 1

["Specifying : 1

To : 2

page](http://spark.apache.org/documentation.html) : 1

Once : 1

"yarn" : 1

prefer : 1

SparkPi : 2

engine : 1

version : 1

file : 1

documentation, : 1

processing, : 1

the : 21

are : 1

systems. : 1

params : 1

not : 1

different : 1

refer : 2

Interactive : 2

R, : 1

given. : 1

if : 4

build : 3

when : 1

be : 2

Tests : 1

Apache : 1

./bin/run-example : 2

programs, : 1

including : 3

Spark. : 1

package. : 1

1000).count() : 1

Versions : 1

HDFS : 1

Data. : 1

>>> : 1

programming : 1

Testing : 1

module, : 1

Streaming : 1

environment : 1

run: : 1

clean : 1

1000: : 2

rich : 1

GraphX : 1

Please : 3

is : 6

run : 7

URL, : 1

threads. : 1

same : 1

MASTER=spark://host:7077 : 1

on : 5

built : 1

against : 1

[Apache : 1

tests : 2

examples : 2

at : 2

optimized : 1

usage : 1

using : 2

graphs : 1

talk : 1

Shell : 2

class : 2

abbreviated : 1

directory. : 1

README : 1

computing : 1

overview : 1

`examples` : 2

example: : 1

## : 8

N : 1

set : 2

use : 3

Hadoop-supported : 1

tests](https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools). : 1

running : 1

find : 1

contains : 1

project : 1

Pi : 1

need : 1

or : 3

Big : 1

Java, : 1

high-level : 1

uses : 1

<class> : 1

Hadoop, : 2

available : 1

requires : 1

(You : 1

see : 1

Documentation : 1

of : 5

tools : 1

using: : 1

cluster : 2

must : 1

supports : 2

built, : 1

system : 1

build/mvn : 1

Hadoop : 3

this : 1

Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version) : 1

particular : 2

Python : 2

Spark : 13

general : 2

YARN, : 1

pre-built : 1

[Configuration : 1

locally : 2

library : 1

A : 1

locally. : 1

sc.parallelize(1 : 1

only : 1

Configuration : 1

following : 2

basic : 1

# : 1

changed : 1

More : 1

which : 2

learning, : 1

first : 1

./bin/pyspark : 1

also : 4

should : 2

for : 11

[params]`. : 1

documentation : 3

[project : 2

mesos:// : 1

Maven](http://maven.apache.org/). : 1

setup : 1

<http://spark.apache.org/> : 1

latest : 1

your : 1

MASTER : 1

example : 3

scala> : 1

DataFrames, : 1

provides : 1

configure : 1

distributions. : 1

can : 6

About : 1

instructions. : 1

do : 2

easiest : 1

no : 1

how : 2

`./bin/run-example : 1

Note : 1

individual : 1

spark:// : 1

It : 2

Scala : 2

Alternatively, : 1

an : 3

variable : 1

submit : 1

machine : 1

thread, : 1

them, : 1

detailed : 2

stream : 1

And : 1

distribution : 1

return : 2

Thriftserver : 1

./bin/spark-shell : 1

"local" : 1

start : 1

You : 3

Spark](#building-spark). : 1

one : 2

help : 1

with : 3

print : 1

Spark"](http://spark.apache.org/docs/latest/building-spark.html). : 1

data : 1

wiki](https://cwiki.apache.org/confluence/display/SPARK). : 1

in : 5

-DskipTests : 1

downloaded : 1

versions : 1

online : 1

Guide](http://spark.apache.org/docs/latest/configuration.html) : 1

comes : 1

[building : 1

Python, : 2

Many : 1

building : 2

Running : 1

from : 1

way : 1

Online : 1

site, : 1

other : 1

Example : 1

analysis. : 1

sc.parallelize(range(1000)).count() : 1

you : 4

runs. : 1

Building : 1

higher-level : 1

protocols : 1

guidance : 2

a : 8

guide, : 1

name : 1

fast : 1

SQL : 2

will : 1

instance: : 1

to : 14

core : 1

: 67

web : 1

"local
" : 1

programs : 2

package.) : 1

that : 2

MLlib : 1

["Building : 1

shell: : 2

Scala, : 1

and : 10

command, : 2

./dev/run-tests : 1

sample : 1

16/01/23 23:47:06 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1165 bytes result sent to driver

16/01/23 23:47:06 INFO DAGScheduler: ResultStage 1 (foreach at WordCount.scala:50) finished in 0.687 s

16/01/23 23:47:06 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 685 ms on localhost (1/1)

16/01/23 23:47:06 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool

16/01/23 23:47:06 INFO DAGScheduler: Job 0 finished: foreach at WordCount.scala:50, took 3.201890 s

16/01/23 23:47:06 INFO SparkUI: Stopped Spark web UI at http://192.168.56.1:4040
16/01/23 23:47:06 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!

16/01/23 23:47:06 INFO MemoryStore: MemoryStore cleared

16/01/23 23:47:06 INFO BlockManager: BlockManager stopped

16/01/23 23:47:06 INFO BlockManagerMaster: BlockManagerMaster stopped

16/01/23 23:47:06 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!

16/01/23 23:47:06 INFO SparkContext: Successfully stopped SparkContext

16/01/23 23:47:06 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.

16/01/23 23:47:06 INFO ShutdownHookManager: Shutdown hook called

16/01/23 23:47:06 INFO ShutdownHookManager: Deleting directory C:\Users\think\AppData\Local\Temp\spark-1b2af0f7-bd82-42d3-b7a7-e2a33b8bef07

16/01/23 23:47:06 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.

The local-mode test succeeded.
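One caveat: wordCounts.foreach(... println ...) prints into the Eclipse console here only because the program runs in local mode. On a real cluster those println calls execute on the executors rather than the driver, so a common pattern (fine for a small result set like this) is to collect the results back to the driver first, e.g.:

wordCounts.collect().foreach { case (word, count) => println(word + " : " + count) } // collect() returns the (word, count) pairs to the driver before printing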

The above are my study notes from Lesson 8 of Wang Jialin's DT大数据梦工厂 (DT Big Data Dream Factory) "IMF传奇行动" (IMF Legendary Action) course.

Wang Jialin: China evangelist for Spark, Flink, Docker, and Android technologies; president and chief expert of the Spark Asia-Pacific Research Institute; founder of DT大数据梦工厂; source-code-level expert in Android hardware/software integration; English pronunciation magician; fitness enthusiast.

WeChat public account: DT_Spark

Contact email: 18610086859@126.com

Phone: 18610086859

QQ: 1740415547

WeChat: 18610086859

Sina Weibo: ilovepains

Wang Jialin's first China dream: to train 1,000,000 outstanding big-data practitioners for all of society, free of charge!