您的位置:首页 > 编程语言 > Python开发

使用scala,python完成统计数据demo

2017-12-16 15:17 441 查看
环境变量修改为:

export JAVA_HOME=/home/hadoop/opt/jdk1.8.0_152

export PATH=PATH:JAVA_HOME/bin

export HADOOP_HOME=/home/hadoop/opt/hadoop-2.9.0

export HADOOP_CONF_DIR=/home/hadoop/opt/hadoop-2.9.0/etc/hadoop

export PATH=PATH:HADOOP_HOME/bin:$HADOOP_HOME/sbin

export SPARK_HOME=/home/hadoop/opt/spark-2.2.1-bin-hadoop2.7

export PATH=PATH:SPARK_HOME/bin

开始操作:

hadoop用户下:

touch words.txt

vi words.txt(随意粘贴)

hadoop fs -put words.txt .

Scala语法:

var:type varName

val:final type varName

eg:val n =10   final int n =10
var i =10   int i =10


spark:session

sc:Context

wecome to spark 下

val lines =sc.textFile(“hdfs://free97zl2:9000/user/hadoop/words.txt”):绝对路径 lines为rdd数据

val lines =sc.textFile(“/user/hadoop/words.txt”)

统计下行数

lines.count()

打印lines

lines.collect()

统计出现次数

val pairs = lines.map(x => (x,1))

打印pairs

pairs.collect()

val counts=pairs.reduceByKey((x,y)=>x+y)

counts.take(10)

val arr =counts.take(10)

for( n<-arr){ println(n)}:循环遍历输出arr

arr.size

arr.length:数组长度

sc.textFile(“/user/hadoop/words.txt”).map((,1)).reduceByKey(+).take(10).foreach(println())

“.” 可以省略,也可以使用空格隔开

法2:sc.textFile(“/user/hadoop/words.txt”)map((,1))reduceByKey(+)take(10)foreach(println())

lines.map(x =>x.split(” “)).collect()

统计每一个词的数量(包含重复词)

lines.map(x => x.split(” “)).flatMap(x => {

var list:List[(String, Int)] = List()

for( y <- x){

list = list :+ (y,1)

}

list

}).collect()

统计每一个词的数量(不包含重复词)

法1:

lines.map(x => x.split(" ")).flatMap(x => {

var list:List[(String, Int)] = List()

for( y <- x){

list = list :+ (y,1)

}

list

}).reduceByKey(_+_).take(100).foreach(println(_))


法2:

lines.map(x => x.split(" ")).flatMap(x => for(y <- x) yield (y,1)).reduceByKey(_+_).take(100).foreach(println(_))


map:一行数据进来,一行数据出去

flatMap:一行数据进来,多行数据出去

使用python完成上面操作:

chmod a+x An... :将Anaconda3-5.0.1-Linux-x86_64.sh变成绿色可执行的
yum -y install bzip2:安装bzip2,否则安装Anaconda报错
./Anaconda3-5.0.1-Linux-x86_64.sh :安装Anaconda,一路回车空格yes即可
出现Thank you for installing Anaconda3!,安装Anaconda成功!
cd ~
source .bashrc


pyspark

lines=sc.textFile(‘/user/hadoop/words.txt’)

wordsArr = lines.map(lambda x:x.split(”))

wordsArr.take(10)

pairs=wordsArr.flatMap(lambda x:[(y,1)for y in x])

pairs.take(10)

counts = pairs.reduceByKey(lambda x,y :x+y)

for p in counts.take(10):

print(p)

回车

datas=counts.take(10)

for p in sorted(datas,key=lambda k:k[1],reverse=True):

print(p)

一句话:sc.textFile(‘/user/hadoop/words.txt’).map(lambda x:x.split(’ ‘)).flatMap(lambda x:[(y,1) for y in x]).reduceByKey(lambda x,y:x+y).take(10)

sc.textFile(‘/user/hadoop/words.txt’).map(lambda x:x.split(’ ‘)).flatMap(lambda x:[(y,1) for y in x]).reduceByKey(lambda x,y:x+y).saveAsTextFile(“/user/hadoop/sparkOut”)

多台机上执行:map flatMap reduceByKey filter groupByKey …

一台机上执行:take collect reduce

面试题:

rdd.map(xxx)

a=10

rdd.map(… b=10 a=a+1)

rdd.take(10)

在本地机器(driver)上运行的代码有:

a=10

rdd.take(10)

在集群(work)上运行的代码有:

rdd.map(xxx)

rdd.map(… b=10 a=a+1)

上述代码存在错误,因为a=10在本机声明的,a=a+1是在集群上进行修改的,如果想要成功,需要生命共享变量
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: