第8课:彻底实战详解使用IDE开发Spark程序
2016-01-24 00:13
639 查看
第8课:彻底实战详解使用IDE开发Spark程序
1.下载安装windows下的scala-2.10.4.
2.打开eclipse,新建scala project: WordCount
3.修改依赖的scala版本为2.10.x。
在Package Explorer中WordCount上点击右键,选择properities->scala Compilier,选择下图所示use peoject settings,选择Scala installation为latest 2.10 bundle(dynamic)后点击OK。
可以看到WordCount project中的scala library container中的版本变成了2.10.5,如下图:
4. 加入spark-1.6.0的jar文件依赖。
下载spark-1.6.0-bin-hadoop2.6.tgz,解压到D:\ProgramFiles目录中,将spark-1.6.0-bin-hadoop2.6\lib\spark-assembly-1.6.0-hadoop2.6.0.jar 导入到eclipse中。
在eclipse中的WordCount上点击右键,选择build path->Configure Build Path,在Libraries栏选择Add External JARs...,选择spark-1.6.0-bin-hadoop2.6\lib\spark-assembly-1.6.0-hadoop2.6.0.jar ,点击OK。
此时可以在WordCount Project中的Referenced Libraries中看到spark-assembly-1.6.0-hadoop2.6.0.jar,如下图所示:
5. 在src下建立spark工程包:
WrodCount project上的src上点击右键,选择new -> Package,包名为com.dt.spark。点击finish。
6. 创建scala入口类。
在com.dt.spark包上点击右键,选择new->scala class。类名为com.dt.spark.WordCount,点击finish。
此时就可以编写代码了。
7. 把class变成object并编写main入口方法。代码如下:
package com.dt.spark
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
/**
* 使用scala开发本地测试的spark WordCount程序
* DT大数据梦工厂
* 新浪微博:p://weibo.com/ilovepains/
*/
object WordCount {
def main(args: Array[String]){
/*
* 第一步:创建spark的配置对象SparkConf,设置Spark程序运行时的配置信息
* 例如通过setMaster来设置程序要链接的Spark集群的Master的URL,如果设置
* 为local,则代表Spark程序在本地运行,特别适合于机器配置条件非常差(例如
* 只有1内存)的初学者
*/
val conf = new SparkConf() //创建SparkConf对象。因为是全局唯一的,所以使用new,不用工厂方法模式。
conf.setAppName("Wow, My First Spark App!") //设置应用程序的名称,在程序运行的监控界面可以看到名称
conf.setMaster("local") //此时程序在本地运行,不需要安装spark集群。
/**
* 第二步:创建SparkContext对象,
* SparkContext是Spark程序所有功能的唯一入口,无论是采用scala/java/Python/R等都必须有一个SParkContext,而且默认都只有一个。
* SparkContext核心作用:初始化应用程序运行时所需要的核心组件,包括DAGScheduler,TaskScheduler,Scheduler Backend,
* 同时还会负责Spark程序往Master注册程序等。SparkContext是整个Spark应用程序中最为重要的一个对象,
*
*/
val sc = new SparkContext(conf) //通过创建SparkContext对象,通过传入SparkConf实例来定制SPark地的具体参数和配置信息。
/*
* 第三步:根据具体的数据来源(/HBase/Local FS/DB/S3等)通过SparkContext创建RDD,
* RDD创建有三种基本方式:1.根据外部数据来源(如HDFS),2.根据Scala集合,3.由其他RDD操作产生
* 数据会被RDD划分成为一系列的Partitions,分配到每个Partition的数据属于一个Task的处理范畴,
*/
val lines = sc.textFile("D://programFiles//spark-1.6.0-bin-hadoop2.6//README.md", 1) //假设电脑只有1G内存,一个core。读取本地文件,设置partition
//也可以写成:l lines:RDD[String] = sc.textFile 类型推断
/**
* 第4步:对初始RDD进行Transformation级别的处理。例如map/filter等高阶函数等的编程
* 来进行具体的数据计算。第4.1步:将每一行的字符串拆分成单个的单词。
*/
val words = lines.flatMap { line => line.split(" ") } //对每一行的字符串进行单词拆分,map每次循环一行,将每一行的小集合通过flat合并成一个大集合
/**
* 第4.2步,在单词拆分的基础上对每个单词实例 进行计数为1,也就是word => (word,1)
*/
val pairs = words.map { word => (word,1) }
/**
* 第4.3步,在每个单词实例计数为1的基础上,统计每个单词在文件中出现的总次数。
*/
val wordCounts = pairs.reduceByKey(_+_) //对相同的Key,进行Value的累计(包括Local和Reduce级别同时 Reduce)
wordCounts.foreach(wordNumberPair => println(wordNumberPair._1 + " : " + wordNumberPair._2))
sc.stop() //把上下文去掉,释放资源
}
}
8. 代码完成后,在代码区域点击右键,选择run as -> scala application,运行代码 ,运行结果如下:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/01/23 23:46:39 INFO SparkContext: Running Spark version 1.6.0
16/01/23 23:46:43 INFO SecurityManager: Changing view acls to: think
16/01/23 23:46:43 INFO SecurityManager: Changing modify acls to: think
16/01/23 23:46:43 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(think); users with modify permissions: Set(think)
16/01/23 23:46:46 INFO Utils: Successfully started service 'sparkDriver' on port 57050.
16/01/23 23:46:48 INFO Slf4jLogger: Slf4jLogger started
16/01/23 23:46:48 INFO Remoting: Starting remoting
16/01/23 23:46:49 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.56.1:57063]
16/01/23 23:46:49 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 57063.
16/01/23 23:46:49 INFO SparkEnv: Registering MapOutputTracker
16/01/23 23:46:49 INFO SparkEnv: Registering BlockManagerMaster
16/01/23 23:46:49 INFO DiskBlockManager: Created local directory at C:\Users\think\AppData\Local\Temp\blockmgr-48345653-dc5e-4cfc-95a3-203e0c317eda
16/01/23 23:46:50 INFO MemoryStore: MemoryStore started with capacity 1091.3 MB
16/01/23 23:46:50 INFO SparkEnv: Registering OutputCommitCoordinator
16/01/23 23:46:51 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/01/23 23:46:51 INFO SparkUI: Started SparkUI at http://192.168.56.1:4040
16/01/23 23:46:52 INFO Executor: Starting executor ID driver on host localhost
16/01/23 23:46:52 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 57070.
16/01/23 23:46:52 INFO NettyBlockTransferService: Server created on 57070
16/01/23 23:46:52 INFO BlockManagerMaster: Trying to register BlockManager
16/01/23 23:46:52 INFO BlockManagerMasterEndpoint: Registering block manager localhost:57070 with 1091.3 MB RAM, BlockManagerId(driver, localhost, 57070)
16/01/23 23:46:52 INFO BlockManagerMaster: Registered BlockManager
16/01/23 23:46:57 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 127.4 KB, free 127.4 KB)
16/01/23 23:46:58 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 13.9 KB, free 141.3 KB)
16/01/23 23:46:58 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:57070 (size: 13.9 KB, free: 1091.2 MB)
16/01/23 23:46:58 INFO SparkContext: Created broadcast 0 from textFile at WordCount.scala:35
16/01/23 23:47:00 WARN : Your hostname, think-PC resolves to a loopback/non-reachable address: fe80:0:0:0:0:5efe:c0a8:3801%net8, but we couldn't find any external IP address!
16/01/23 23:47:03 INFO FileInputFormat: Total input paths to process : 1
16/01/23 23:47:03 INFO SparkContext: Starting job: foreach at WordCount.scala:50
16/01/23 23:47:03 INFO DAGScheduler: Registering RDD 3 (map at WordCount.scala:45)
16/01/23 23:47:03 INFO DAGScheduler: Got job 0 (foreach at WordCount.scala:50) with 1 output partitions
16/01/23 23:47:03 INFO DAGScheduler: Final stage: ResultStage 1 (foreach at WordCount.scala:50)
16/01/23 23:47:03 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
16/01/23 23:47:03 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
16/01/23 23:47:03 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[3] at map at WordCount.scala:45), which has no missing parents
16/01/23 23:47:04 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.1 KB, free 145.4 KB)
16/01/23 23:47:04 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.3 KB, free 147.7 KB)
16/01/23 23:47:04 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:57070 (size: 2.3 KB, free: 1091.2 MB)
16/01/23 23:47:04 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
16/01/23 23:47:04 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[3] at map at WordCount.scala:45)
16/01/23 23:47:04 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
16/01/23 23:47:04 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 2148 bytes)
16/01/23 23:47:04 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
16/01/23 23:47:04 INFO HadoopRDD: Input split: file:/D:/programFiles/spark-1.6.0-bin-hadoop2.6/README.md:0+3359
16/01/23 23:47:04 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
16/01/23 23:47:04 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
16/01/23 23:47:04 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
16/01/23 23:47:04 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
16/01/23 23:47:04 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
16/01/23 23:47:05 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2253 bytes result sent to driver
16/01/23 23:47:05 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1266 ms on localhost (1/1)
16/01/23 23:47:05 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/01/23 23:47:05 INFO DAGScheduler: ShuffleMapStage 0 (map at WordCount.scala:45) finished in 1.350 s
16/01/23 23:47:05 INFO DAGScheduler: looking for newly runnable stages
16/01/23 23:47:05 INFO DAGScheduler: running: Set()
16/01/23 23:47:05 INFO DAGScheduler: waiting: Set(ResultStage 1)
16/01/23 23:47:05 INFO DAGScheduler: failed: Set()
16/01/23 23:47:05 INFO DAGScheduler: Submitting ResultStage 1 (ShuffledRDD[4] at reduceByKey at WordCount.scala:49), which has no missing parents
16/01/23 23:47:05 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 2.5 KB, free 150.2 KB)
16/01/23 23:47:05 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 1581.0 B, free 151.7 KB)
16/01/23 23:47:05 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:57070 (size: 1581.0 B, free: 1091.2 MB)
16/01/23 23:47:05 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006
16/01/23 23:47:05 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (ShuffledRDD[4] at reduceByKey at WordCount.scala:49)
16/01/23 23:47:05 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
16/01/23 23:47:05 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, partition 0,NODE_LOCAL, 1894 bytes)
16/01/23 23:47:05 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
16/01/23 23:47:05 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
16/01/23 23:47:05 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 28 ms
package : 1
For : 2
Programs : 1
processing. : 1
Because : 1
The : 1
cluster. : 1
its : 1
[run : 1
APIs : 1
have : 1
Try : 1
computation : 1
through : 1
several : 1
This : 2
graph : 1
Hive : 2
storage : 1
["Specifying : 1
To : 2
page](http://spark.apache.org/documentation.html) : 1
Once : 1
"yarn" : 1
prefer : 1
SparkPi : 2
engine : 1
version : 1
file : 1
documentation, : 1
processing, : 1
the : 21
are : 1
systems. : 1
params : 1
not : 1
different : 1
refer : 2
Interactive : 2
R, : 1
given. : 1
if : 4
build : 3
when : 1
be : 2
Tests : 1
Apache : 1
./bin/run-example : 2
programs, : 1
including : 3
Spark. : 1
package. : 1
1000).count() : 1
Versions : 1
HDFS : 1
Data. : 1
>>> : 1
programming : 1
Testing : 1
module, : 1
Streaming : 1
environment : 1
run: : 1
clean : 1
1000: : 2
rich : 1
GraphX : 1
Please : 3
is : 6
run : 7
URL, : 1
threads. : 1
same : 1
MASTER=spark://host:7077 : 1
on : 5
built : 1
against : 1
[Apache : 1
tests : 2
examples : 2
at : 2
optimized : 1
usage : 1
using : 2
graphs : 1
talk : 1
Shell : 2
class : 2
abbreviated : 1
directory. : 1
README : 1
computing : 1
overview : 1
`examples` : 2
example: : 1
## : 8
N : 1
set : 2
use : 3
Hadoop-supported : 1
tests](https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools). : 1
running : 1
find : 1
contains : 1
project : 1
Pi : 1
need : 1
or : 3
Big : 1
Java, : 1
high-level : 1
uses : 1
<class> : 1
Hadoop, : 2
available : 1
requires : 1
(You : 1
see : 1
Documentation : 1
of : 5
tools : 1
using: : 1
cluster : 2
must : 1
supports : 2
built, : 1
system : 1
build/mvn : 1
Hadoop : 3
this : 1
Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version) : 1
particular : 2
Python : 2
Spark : 13
general : 2
YARN, : 1
pre-built : 1
[Configuration : 1
locally : 2
library : 1
A : 1
locally. : 1
sc.parallelize(1 : 1
only : 1
Configuration : 1
following : 2
basic : 1
# : 1
changed : 1
More : 1
which : 2
learning, : 1
first : 1
./bin/pyspark : 1
also : 4
should : 2
for : 11
[params]`. : 1
documentation : 3
[project : 2
mesos:// : 1
Maven](http://maven.apache.org/). : 1
setup : 1
<http://spark.apache.org/> : 1
latest : 1
your : 1
MASTER : 1
example : 3
scala> : 1
DataFrames, : 1
provides : 1
configure : 1
distributions. : 1
can : 6
About : 1
instructions. : 1
do : 2
easiest : 1
no : 1
how : 2
`./bin/run-example : 1
Note : 1
individual : 1
spark:// : 1
It : 2
Scala : 2
Alternatively, : 1
an : 3
variable : 1
submit : 1
machine : 1
thread, : 1
them, : 1
detailed : 2
stream : 1
And : 1
distribution : 1
return : 2
Thriftserver : 1
./bin/spark-shell : 1
"local" : 1
start : 1
You : 3
Spark](#building-spark). : 1
one : 2
help : 1
with : 3
print : 1
Spark"](http://spark.apache.org/docs/latest/building-spark.html). : 1
data : 1
wiki](https://cwiki.apache.org/confluence/display/SPARK). : 1
in : 5
-DskipTests : 1
downloaded : 1
versions : 1
online : 1
Guide](http://spark.apache.org/docs/latest/configuration.html) : 1
comes : 1
[building : 1
Python, : 2
Many : 1
building : 2
Running : 1
from : 1
way : 1
Online : 1
site, : 1
other : 1
Example : 1
analysis. : 1
sc.parallelize(range(1000)).count() : 1
you : 4
runs. : 1
Building : 1
higher-level : 1
protocols : 1
guidance : 2
a : 8
guide, : 1
name : 1
fast : 1
SQL : 2
will : 1
instance: : 1
to : 14
core : 1
: 67
web : 1
"local
" : 1
programs : 2
package.) : 1
that : 2
MLlib : 1
["Building : 1
shell: : 2
Scala, : 1
and : 10
command, : 2
./dev/run-tests : 1
sample : 1
16/01/23 23:47:06 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1165 bytes result sent to driver
16/01/23 23:47:06 INFO DAGScheduler: ResultStage 1 (foreach at WordCount.scala:50) finished in 0.687 s
16/01/23 23:47:06 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 685 ms on localhost (1/1)
16/01/23 23:47:06 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
16/01/23 23:47:06 INFO DAGScheduler: Job 0 finished: foreach at WordCount.scala:50, took 3.201890 s
16/01/23 23:47:06 INFO SparkUI: Stopped Spark web UI at http://192.168.56.1:4040
16/01/23 23:47:06 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/01/23 23:47:06 INFO MemoryStore: MemoryStore cleared
16/01/23 23:47:06 INFO BlockManager: BlockManager stopped
16/01/23 23:47:06 INFO BlockManagerMaster: BlockManagerMaster stopped
16/01/23 23:47:06 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/01/23 23:47:06 INFO SparkContext: Successfully stopped SparkContext
16/01/23 23:47:06 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/01/23 23:47:06 INFO ShutdownHookManager: Shutdown hook called
16/01/23 23:47:06 INFO ShutdownHookManager: Deleting directory C:\Users\think\AppData\Local\Temp\spark-1b2af0f7-bd82-42d3-b7a7-e2a33b8bef07
16/01/23 23:47:06 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
本地模式测试成功。
以上内容是王家林老师DT大数据梦工厂《 IMF传奇行动》第8课的学习笔记。
王家林:Spark、Flink、Docker、Android技术中国区布道师。Spark亚太研究院院长和首席专家,DT大数据梦工厂创始人,Android软硬整合源码级专家,英语发音魔术师,健身狂热爱好者。
微信公众账号:DT_Spark
联系邮箱18610086859@126.com
电话:18610086859
QQ:1740415547
微信号:18610086859
新浪微博:ilovepains
王家林老师的第一个中国梦:免费为全社会培养100万名优秀的大数据从业人员!
1.下载安装windows下的scala-2.10.4.
2.打开eclipse,新建scala project: WordCount
3.修改依赖的scala版本为2.10.x。
在Package Explorer中WordCount上点击右键,选择properities->scala Compilier,选择下图所示use peoject settings,选择Scala installation为latest 2.10 bundle(dynamic)后点击OK。
可以看到WordCount project中的scala library container中的版本变成了2.10.5,如下图:
4. 加入spark-1.6.0的jar文件依赖。
下载spark-1.6.0-bin-hadoop2.6.tgz,解压到D:\ProgramFiles目录中,将spark-1.6.0-bin-hadoop2.6\lib\spark-assembly-1.6.0-hadoop2.6.0.jar 导入到eclipse中。
在eclipse中的WordCount上点击右键,选择build path->Configure Build Path,在Libraries栏选择Add External JARs...,选择spark-1.6.0-bin-hadoop2.6\lib\spark-assembly-1.6.0-hadoop2.6.0.jar ,点击OK。
此时可以在WordCount Project中的Referenced Libraries中看到spark-assembly-1.6.0-hadoop2.6.0.jar,如下图所示:
5. 在src下建立spark工程包:
WrodCount project上的src上点击右键,选择new -> Package,包名为com.dt.spark。点击finish。
6. 创建scala入口类。
在com.dt.spark包上点击右键,选择new->scala class。类名为com.dt.spark.WordCount,点击finish。
此时就可以编写代码了。
7. 把class变成object并编写main入口方法。代码如下:
package com.dt.spark
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
/**
* 使用scala开发本地测试的spark WordCount程序
* DT大数据梦工厂
* 新浪微博:p://weibo.com/ilovepains/
*/
object WordCount {
def main(args: Array[String]){
/*
* 第一步:创建spark的配置对象SparkConf,设置Spark程序运行时的配置信息
* 例如通过setMaster来设置程序要链接的Spark集群的Master的URL,如果设置
* 为local,则代表Spark程序在本地运行,特别适合于机器配置条件非常差(例如
* 只有1内存)的初学者
*/
val conf = new SparkConf() //创建SparkConf对象。因为是全局唯一的,所以使用new,不用工厂方法模式。
conf.setAppName("Wow, My First Spark App!") //设置应用程序的名称,在程序运行的监控界面可以看到名称
conf.setMaster("local") //此时程序在本地运行,不需要安装spark集群。
/**
* 第二步:创建SparkContext对象,
* SparkContext是Spark程序所有功能的唯一入口,无论是采用scala/java/Python/R等都必须有一个SParkContext,而且默认都只有一个。
* SparkContext核心作用:初始化应用程序运行时所需要的核心组件,包括DAGScheduler,TaskScheduler,Scheduler Backend,
* 同时还会负责Spark程序往Master注册程序等。SparkContext是整个Spark应用程序中最为重要的一个对象,
*
*/
val sc = new SparkContext(conf) //通过创建SparkContext对象,通过传入SparkConf实例来定制SPark地的具体参数和配置信息。
/*
* 第三步:根据具体的数据来源(/HBase/Local FS/DB/S3等)通过SparkContext创建RDD,
* RDD创建有三种基本方式:1.根据外部数据来源(如HDFS),2.根据Scala集合,3.由其他RDD操作产生
* 数据会被RDD划分成为一系列的Partitions,分配到每个Partition的数据属于一个Task的处理范畴,
*/
val lines = sc.textFile("D://programFiles//spark-1.6.0-bin-hadoop2.6//README.md", 1) //假设电脑只有1G内存,一个core。读取本地文件,设置partition
//也可以写成:l lines:RDD[String] = sc.textFile 类型推断
/**
* 第4步:对初始RDD进行Transformation级别的处理。例如map/filter等高阶函数等的编程
* 来进行具体的数据计算。第4.1步:将每一行的字符串拆分成单个的单词。
*/
val words = lines.flatMap { line => line.split(" ") } //对每一行的字符串进行单词拆分,map每次循环一行,将每一行的小集合通过flat合并成一个大集合
/**
* 第4.2步,在单词拆分的基础上对每个单词实例 进行计数为1,也就是word => (word,1)
*/
val pairs = words.map { word => (word,1) }
/**
* 第4.3步,在每个单词实例计数为1的基础上,统计每个单词在文件中出现的总次数。
*/
val wordCounts = pairs.reduceByKey(_+_) //对相同的Key,进行Value的累计(包括Local和Reduce级别同时 Reduce)
wordCounts.foreach(wordNumberPair => println(wordNumberPair._1 + " : " + wordNumberPair._2))
sc.stop() //把上下文去掉,释放资源
}
}
8. 代码完成后,在代码区域点击右键,选择run as -> scala application,运行代码 ,运行结果如下:
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/01/23 23:46:39 INFO SparkContext: Running Spark version 1.6.0
16/01/23 23:46:43 INFO SecurityManager: Changing view acls to: think
16/01/23 23:46:43 INFO SecurityManager: Changing modify acls to: think
16/01/23 23:46:43 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(think); users with modify permissions: Set(think)
16/01/23 23:46:46 INFO Utils: Successfully started service 'sparkDriver' on port 57050.
16/01/23 23:46:48 INFO Slf4jLogger: Slf4jLogger started
16/01/23 23:46:48 INFO Remoting: Starting remoting
16/01/23 23:46:49 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.56.1:57063]
16/01/23 23:46:49 INFO Utils: Successfully started service 'sparkDriverActorSystem' on port 57063.
16/01/23 23:46:49 INFO SparkEnv: Registering MapOutputTracker
16/01/23 23:46:49 INFO SparkEnv: Registering BlockManagerMaster
16/01/23 23:46:49 INFO DiskBlockManager: Created local directory at C:\Users\think\AppData\Local\Temp\blockmgr-48345653-dc5e-4cfc-95a3-203e0c317eda
16/01/23 23:46:50 INFO MemoryStore: MemoryStore started with capacity 1091.3 MB
16/01/23 23:46:50 INFO SparkEnv: Registering OutputCommitCoordinator
16/01/23 23:46:51 INFO Utils: Successfully started service 'SparkUI' on port 4040.
16/01/23 23:46:51 INFO SparkUI: Started SparkUI at http://192.168.56.1:4040
16/01/23 23:46:52 INFO Executor: Starting executor ID driver on host localhost
16/01/23 23:46:52 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 57070.
16/01/23 23:46:52 INFO NettyBlockTransferService: Server created on 57070
16/01/23 23:46:52 INFO BlockManagerMaster: Trying to register BlockManager
16/01/23 23:46:52 INFO BlockManagerMasterEndpoint: Registering block manager localhost:57070 with 1091.3 MB RAM, BlockManagerId(driver, localhost, 57070)
16/01/23 23:46:52 INFO BlockManagerMaster: Registered BlockManager
16/01/23 23:46:57 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 127.4 KB, free 127.4 KB)
16/01/23 23:46:58 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 13.9 KB, free 141.3 KB)
16/01/23 23:46:58 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:57070 (size: 13.9 KB, free: 1091.2 MB)
16/01/23 23:46:58 INFO SparkContext: Created broadcast 0 from textFile at WordCount.scala:35
16/01/23 23:47:00 WARN : Your hostname, think-PC resolves to a loopback/non-reachable address: fe80:0:0:0:0:5efe:c0a8:3801%net8, but we couldn't find any external IP address!
16/01/23 23:47:03 INFO FileInputFormat: Total input paths to process : 1
16/01/23 23:47:03 INFO SparkContext: Starting job: foreach at WordCount.scala:50
16/01/23 23:47:03 INFO DAGScheduler: Registering RDD 3 (map at WordCount.scala:45)
16/01/23 23:47:03 INFO DAGScheduler: Got job 0 (foreach at WordCount.scala:50) with 1 output partitions
16/01/23 23:47:03 INFO DAGScheduler: Final stage: ResultStage 1 (foreach at WordCount.scala:50)
16/01/23 23:47:03 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
16/01/23 23:47:03 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
16/01/23 23:47:03 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[3] at map at WordCount.scala:45), which has no missing parents
16/01/23 23:47:04 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.1 KB, free 145.4 KB)
16/01/23 23:47:04 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.3 KB, free 147.7 KB)
16/01/23 23:47:04 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:57070 (size: 2.3 KB, free: 1091.2 MB)
16/01/23 23:47:04 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1006
16/01/23 23:47:04 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[3] at map at WordCount.scala:45)
16/01/23 23:47:04 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
16/01/23 23:47:04 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 2148 bytes)
16/01/23 23:47:04 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
16/01/23 23:47:04 INFO HadoopRDD: Input split: file:/D:/programFiles/spark-1.6.0-bin-hadoop2.6/README.md:0+3359
16/01/23 23:47:04 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
16/01/23 23:47:04 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
16/01/23 23:47:04 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
16/01/23 23:47:04 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
16/01/23 23:47:04 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
16/01/23 23:47:05 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2253 bytes result sent to driver
16/01/23 23:47:05 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 1266 ms on localhost (1/1)
16/01/23 23:47:05 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/01/23 23:47:05 INFO DAGScheduler: ShuffleMapStage 0 (map at WordCount.scala:45) finished in 1.350 s
16/01/23 23:47:05 INFO DAGScheduler: looking for newly runnable stages
16/01/23 23:47:05 INFO DAGScheduler: running: Set()
16/01/23 23:47:05 INFO DAGScheduler: waiting: Set(ResultStage 1)
16/01/23 23:47:05 INFO DAGScheduler: failed: Set()
16/01/23 23:47:05 INFO DAGScheduler: Submitting ResultStage 1 (ShuffledRDD[4] at reduceByKey at WordCount.scala:49), which has no missing parents
16/01/23 23:47:05 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 2.5 KB, free 150.2 KB)
16/01/23 23:47:05 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 1581.0 B, free 151.7 KB)
16/01/23 23:47:05 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:57070 (size: 1581.0 B, free: 1091.2 MB)
16/01/23 23:47:05 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1006
16/01/23 23:47:05 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (ShuffledRDD[4] at reduceByKey at WordCount.scala:49)
16/01/23 23:47:05 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
16/01/23 23:47:05 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, partition 0,NODE_LOCAL, 1894 bytes)
16/01/23 23:47:05 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
16/01/23 23:47:05 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
16/01/23 23:47:05 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 28 ms
package : 1
For : 2
Programs : 1
processing. : 1
Because : 1
The : 1
cluster. : 1
its : 1
[run : 1
APIs : 1
have : 1
Try : 1
computation : 1
through : 1
several : 1
This : 2
graph : 1
Hive : 2
storage : 1
["Specifying : 1
To : 2
page](http://spark.apache.org/documentation.html) : 1
Once : 1
"yarn" : 1
prefer : 1
SparkPi : 2
engine : 1
version : 1
file : 1
documentation, : 1
processing, : 1
the : 21
are : 1
systems. : 1
params : 1
not : 1
different : 1
refer : 2
Interactive : 2
R, : 1
given. : 1
if : 4
build : 3
when : 1
be : 2
Tests : 1
Apache : 1
./bin/run-example : 2
programs, : 1
including : 3
Spark. : 1
package. : 1
1000).count() : 1
Versions : 1
HDFS : 1
Data. : 1
>>> : 1
programming : 1
Testing : 1
module, : 1
Streaming : 1
environment : 1
run: : 1
clean : 1
1000: : 2
rich : 1
GraphX : 1
Please : 3
is : 6
run : 7
URL, : 1
threads. : 1
same : 1
MASTER=spark://host:7077 : 1
on : 5
built : 1
against : 1
[Apache : 1
tests : 2
examples : 2
at : 2
optimized : 1
usage : 1
using : 2
graphs : 1
talk : 1
Shell : 2
class : 2
abbreviated : 1
directory. : 1
README : 1
computing : 1
overview : 1
`examples` : 2
example: : 1
## : 8
N : 1
set : 2
use : 3
Hadoop-supported : 1
tests](https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools). : 1
running : 1
find : 1
contains : 1
project : 1
Pi : 1
need : 1
or : 3
Big : 1
Java, : 1
high-level : 1
uses : 1
<class> : 1
Hadoop, : 2
available : 1
requires : 1
(You : 1
see : 1
Documentation : 1
of : 5
tools : 1
using: : 1
cluster : 2
must : 1
supports : 2
built, : 1
system : 1
build/mvn : 1
Hadoop : 3
this : 1
Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version) : 1
particular : 2
Python : 2
Spark : 13
general : 2
YARN, : 1
pre-built : 1
[Configuration : 1
locally : 2
library : 1
A : 1
locally. : 1
sc.parallelize(1 : 1
only : 1
Configuration : 1
following : 2
basic : 1
# : 1
changed : 1
More : 1
which : 2
learning, : 1
first : 1
./bin/pyspark : 1
also : 4
should : 2
for : 11
[params]`. : 1
documentation : 3
[project : 2
mesos:// : 1
Maven](http://maven.apache.org/). : 1
setup : 1
<http://spark.apache.org/> : 1
latest : 1
your : 1
MASTER : 1
example : 3
scala> : 1
DataFrames, : 1
provides : 1
configure : 1
distributions. : 1
can : 6
About : 1
instructions. : 1
do : 2
easiest : 1
no : 1
how : 2
`./bin/run-example : 1
Note : 1
individual : 1
spark:// : 1
It : 2
Scala : 2
Alternatively, : 1
an : 3
variable : 1
submit : 1
machine : 1
thread, : 1
them, : 1
detailed : 2
stream : 1
And : 1
distribution : 1
return : 2
Thriftserver : 1
./bin/spark-shell : 1
"local" : 1
start : 1
You : 3
Spark](#building-spark). : 1
one : 2
help : 1
with : 3
print : 1
Spark"](http://spark.apache.org/docs/latest/building-spark.html). : 1
data : 1
wiki](https://cwiki.apache.org/confluence/display/SPARK). : 1
in : 5
-DskipTests : 1
downloaded : 1
versions : 1
online : 1
Guide](http://spark.apache.org/docs/latest/configuration.html) : 1
comes : 1
[building : 1
Python, : 2
Many : 1
building : 2
Running : 1
from : 1
way : 1
Online : 1
site, : 1
other : 1
Example : 1
analysis. : 1
sc.parallelize(range(1000)).count() : 1
you : 4
runs. : 1
Building : 1
higher-level : 1
protocols : 1
guidance : 2
a : 8
guide, : 1
name : 1
fast : 1
SQL : 2
will : 1
instance: : 1
to : 14
core : 1
: 67
web : 1
"local
" : 1
programs : 2
package.) : 1
that : 2
MLlib : 1
["Building : 1
shell: : 2
Scala, : 1
and : 10
command, : 2
./dev/run-tests : 1
sample : 1
16/01/23 23:47:06 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1165 bytes result sent to driver
16/01/23 23:47:06 INFO DAGScheduler: ResultStage 1 (foreach at WordCount.scala:50) finished in 0.687 s
16/01/23 23:47:06 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 685 ms on localhost (1/1)
16/01/23 23:47:06 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
16/01/23 23:47:06 INFO DAGScheduler: Job 0 finished: foreach at WordCount.scala:50, took 3.201890 s
16/01/23 23:47:06 INFO SparkUI: Stopped Spark web UI at http://192.168.56.1:4040
16/01/23 23:47:06 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
16/01/23 23:47:06 INFO MemoryStore: MemoryStore cleared
16/01/23 23:47:06 INFO BlockManager: BlockManager stopped
16/01/23 23:47:06 INFO BlockManagerMaster: BlockManagerMaster stopped
16/01/23 23:47:06 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
16/01/23 23:47:06 INFO SparkContext: Successfully stopped SparkContext
16/01/23 23:47:06 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/01/23 23:47:06 INFO ShutdownHookManager: Shutdown hook called
16/01/23 23:47:06 INFO ShutdownHookManager: Deleting directory C:\Users\think\AppData\Local\Temp\spark-1b2af0f7-bd82-42d3-b7a7-e2a33b8bef07
16/01/23 23:47:06 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
本地模式测试成功。
以上内容是王家林老师DT大数据梦工厂《 IMF传奇行动》第8课的学习笔记。
王家林:Spark、Flink、Docker、Android技术中国区布道师。Spark亚太研究院院长和首席专家,DT大数据梦工厂创始人,Android软硬整合源码级专家,英语发音魔术师,健身狂热爱好者。
微信公众账号:DT_Spark
联系邮箱18610086859@126.com
电话:18610086859
QQ:1740415547
微信号:18610086859
新浪微博:ilovepains
王家林老师的第一个中国梦:免费为全社会培养100万名优秀的大数据从业人员!
相关文章推荐
- Linux学习笔记(2)----文件权限
- VMware虚拟机克隆Linux系统后找不到eth0网卡的问题
- iOS核心动画高级技巧
- MySQL存储过程中的3种循环
- 使用 sysbench对mysql进行压力测试介绍之一
- iTop4412时钟配置
- 【JS复习笔记】06 方法
- 多表链接 Left join
- [iOS]利用系统NSRegularExpression使用正则表达式
- [ssh新闻发布系统第三天]存储新闻
- 记事(四)
- MySQL数据库事务隔离级别(Transaction Isolation Level)
- Java23种设计模式
- mysql sql_safe_updates 分析
- 百度糯米iOS客户端登录BUG
- Html5 Egret游戏开发 成语大挑战(三)开始界面
- js判断isNumber(obj)
- POJ 1287 Networking
- linux运行时动态调整内核参数工具sysctl学习小结
- 一条insert语句批量插入多条记录