使用scala,python完成统计数据demo
2017-12-16 15:17
441 查看
环境变量修改为:
export JAVA_HOME=/home/hadoop/opt/jdk1.8.0_152
export PATH=PATH:JAVA_HOME/bin
export HADOOP_HOME=/home/hadoop/opt/hadoop-2.9.0
export HADOOP_CONF_DIR=/home/hadoop/opt/hadoop-2.9.0/etc/hadoop
export PATH=PATH:HADOOP_HOME/bin:$HADOOP_HOME/sbin
export SPARK_HOME=/home/hadoop/opt/spark-2.2.1-bin-hadoop2.7
export PATH=PATH:SPARK_HOME/bin
开始操作:
hadoop用户下:
touch words.txt
vi words.txt(随意粘贴)
hadoop fs -put words.txt .
Scala语法:
var:type varName
val:final type varName
spark:session
sc:Context
wecome to spark 下
val lines =sc.textFile(“hdfs://free97zl2:9000/user/hadoop/words.txt”):绝对路径 lines为rdd数据
lines.count()
打印lines
lines.collect()
统计出现次数
val pairs = lines.map(x => (x,1))
打印pairs
pairs.collect()
val counts=pairs.reduceByKey((x,y)=>x+y)
counts.take(10)
val arr =counts.take(10)
for( n<-arr){ println(n)}:循环遍历输出arr
arr.size
arr.length:数组长度
sc.textFile(“/user/hadoop/words.txt”).map((,1)).reduceByKey(+).take(10).foreach(println())
“.” 可以省略,也可以使用空格隔开
法2:sc.textFile(“/user/hadoop/words.txt”)map((,1))reduceByKey(+)take(10)foreach(println())
lines.map(x =>x.split(” “)).collect()
统计每一个词的数量(包含重复词)
lines.map(x => x.split(” “)).flatMap(x => {
var list:List[(String, Int)] = List()
for( y <- x){
list = list :+ (y,1)
}
list
}).collect()
统计每一个词的数量(不包含重复词)
法1:
法2:
map:一行数据进来,一行数据出去
flatMap:一行数据进来,多行数据出去
使用python完成上面操作:
pyspark
lines=sc.textFile(‘/user/hadoop/words.txt’)
wordsArr = lines.map(lambda x:x.split(”))
wordsArr.take(10)
pairs=wordsArr.flatMap(lambda x:[(y,1)for y in x])
pairs.take(10)
counts = pairs.reduceByKey(lambda x,y :x+y)
for p in counts.take(10):
print(p)
回车
datas=counts.take(10)
for p in sorted(datas,key=lambda k:k[1],reverse=True):
print(p)
一句话:sc.textFile(‘/user/hadoop/words.txt’).map(lambda x:x.split(’ ‘)).flatMap(lambda x:[(y,1) for y in x]).reduceByKey(lambda x,y:x+y).take(10)
sc.textFile(‘/user/hadoop/words.txt’).map(lambda x:x.split(’ ‘)).flatMap(lambda x:[(y,1) for y in x]).reduceByKey(lambda x,y:x+y).saveAsTextFile(“/user/hadoop/sparkOut”)
多台机上执行:map flatMap reduceByKey filter groupByKey …
一台机上执行:take collect reduce
面试题:
rdd.map(xxx)
a=10
rdd.map(… b=10 a=a+1)
rdd.take(10)
在本地机器(driver)上运行的代码有:
a=10
rdd.take(10)
在集群(work)上运行的代码有:
rdd.map(xxx)
rdd.map(… b=10 a=a+1)
上述代码存在错误,因为a=10在本机声明的,a=a+1是在集群上进行修改的,如果想要成功,需要生命共享变量
export JAVA_HOME=/home/hadoop/opt/jdk1.8.0_152
export PATH=PATH:JAVA_HOME/bin
export HADOOP_HOME=/home/hadoop/opt/hadoop-2.9.0
export HADOOP_CONF_DIR=/home/hadoop/opt/hadoop-2.9.0/etc/hadoop
export PATH=PATH:HADOOP_HOME/bin:$HADOOP_HOME/sbin
export SPARK_HOME=/home/hadoop/opt/spark-2.2.1-bin-hadoop2.7
export PATH=PATH:SPARK_HOME/bin
开始操作:
hadoop用户下:
touch words.txt
vi words.txt(随意粘贴)
hadoop fs -put words.txt .
Scala语法:
var:type varName
val:final type varName
eg:val n =10 final int n =10 var i =10 int i =10
spark:session
sc:Context
wecome to spark 下
val lines =sc.textFile(“hdfs://free97zl2:9000/user/hadoop/words.txt”):绝对路径 lines为rdd数据
val lines =sc.textFile(“/user/hadoop/words.txt”)
统计下行数lines.count()
打印lines
lines.collect()
统计出现次数
val pairs = lines.map(x => (x,1))
打印pairs
pairs.collect()
val counts=pairs.reduceByKey((x,y)=>x+y)
counts.take(10)
val arr =counts.take(10)
for( n<-arr){ println(n)}:循环遍历输出arr
arr.size
arr.length:数组长度
sc.textFile(“/user/hadoop/words.txt”).map((,1)).reduceByKey(+).take(10).foreach(println())
“.” 可以省略,也可以使用空格隔开
法2:sc.textFile(“/user/hadoop/words.txt”)map((,1))reduceByKey(+)take(10)foreach(println())
lines.map(x =>x.split(” “)).collect()
统计每一个词的数量(包含重复词)
lines.map(x => x.split(” “)).flatMap(x => {
var list:List[(String, Int)] = List()
for( y <- x){
list = list :+ (y,1)
}
list
}).collect()
统计每一个词的数量(不包含重复词)
法1:
lines.map(x => x.split(" ")).flatMap(x => { var list:List[(String, Int)] = List() for( y <- x){ list = list :+ (y,1) } list }).reduceByKey(_+_).take(100).foreach(println(_))
法2:
lines.map(x => x.split(" ")).flatMap(x => for(y <- x) yield (y,1)).reduceByKey(_+_).take(100).foreach(println(_))
map:一行数据进来,一行数据出去
flatMap:一行数据进来,多行数据出去
使用python完成上面操作:
chmod a+x An... :将Anaconda3-5.0.1-Linux-x86_64.sh变成绿色可执行的 yum -y install bzip2:安装bzip2,否则安装Anaconda报错 ./Anaconda3-5.0.1-Linux-x86_64.sh :安装Anaconda,一路回车空格yes即可 出现Thank you for installing Anaconda3!,安装Anaconda成功! cd ~ source .bashrc
pyspark
lines=sc.textFile(‘/user/hadoop/words.txt’)
wordsArr = lines.map(lambda x:x.split(”))
wordsArr.take(10)
pairs=wordsArr.flatMap(lambda x:[(y,1)for y in x])
pairs.take(10)
counts = pairs.reduceByKey(lambda x,y :x+y)
for p in counts.take(10):
print(p)
回车
datas=counts.take(10)
for p in sorted(datas,key=lambda k:k[1],reverse=True):
print(p)
一句话:sc.textFile(‘/user/hadoop/words.txt’).map(lambda x:x.split(’ ‘)).flatMap(lambda x:[(y,1) for y in x]).reduceByKey(lambda x,y:x+y).take(10)
sc.textFile(‘/user/hadoop/words.txt’).map(lambda x:x.split(’ ‘)).flatMap(lambda x:[(y,1) for y in x]).reduceByKey(lambda x,y:x+y).saveAsTextFile(“/user/hadoop/sparkOut”)
多台机上执行:map flatMap reduceByKey filter groupByKey …
一台机上执行:take collect reduce
面试题:
rdd.map(xxx)
a=10
rdd.map(… b=10 a=a+1)
rdd.take(10)
在本地机器(driver)上运行的代码有:
a=10
rdd.take(10)
在集群(work)上运行的代码有:
rdd.map(xxx)
rdd.map(… b=10 a=a+1)
上述代码存在错误,因为a=10在本机声明的,a=a+1是在集群上进行修改的,如果想要成功,需要生命共享变量
相关文章推荐
- 使用python统计处理jira数据
- Java使用极小的内存完成对超大数据的去重计数,用于实时计算中统计UV
- 使用Python实现子区域数据分类统计
- 使用Python实现子区域数据分类统计
- 使用Python实现子区域数据分类统计
- 使用Python实现子区域数据分类统计
- 数据科学部门如何使用Python和R组合完成任务
- 使用Python实现子区域数据分类统计
- 使用Python实现子区域数据分类统计
- 使用Python实现子区域数据分类统计
- 使用Python实现子区域数据分类统计
- 使用Python完成控制主机与任务节点的交互 [Demo]
- 使用Python实现子区域数据分类统计
- 使用Python实现子区域数据分类统计
- 使用Python实现子区域数据分类统计
- 使用Python实现子区域数据分类统计
- 使用Python实现子区域数据分类统计
- 使用Python实现子区域数据分类统计
- 【Python_Demo_2】使用Python连续(批量)读取图片数据的Demo
- 使用Python实现子区域数据分类统计