Writing a WordCount Program with PySpark
# Word count on manuscript using PySpark
# import regex module
import re
# import add from operator module
from operator import add
# read input file
file_in = sc.textFile('/home/an/Documents/A00_Documents/Spark4Py 20150315')
# count lines
print('number of lines in file: %s' % file_in.count())
# add up the lengths of each line
#
chars = file_in.map(lambda s: len(s)).reduce(add)
print('number of characters in file: %s' % chars)
# Get words from the input file
words = file_in.flatMap(lambda line: re.split(r'\W+', line.lower().strip()))
# words of more than 3 characters
words = words.filter(lambda x: len(x) > 3)
# set count 1 per word
words = words.map(lambda w: (w,1))
# reduce phase - sum count all the words
words = words.reduceByKey(add)
# create tuple (count, word) and sort in descending order
words = words.map(lambda x: (x[1], x[0])).sortByKey(False)
# take top 20 words by frequency
words.take(20)
# create function for histogram of most frequent words
#
%matplotlib inline
import matplotlib.pyplot as plt
#
def histogram(words):
    count = [x[1] for x in words]  # counts as a list, so len() works in Python 3
    word = [x[0] for x in words]   # the words themselves, for the axis labels
    plt.barh(range(len(count)), count, color='grey')
    plt.yticks(range(len(count)), word)
# change tuple order from (count, word) back to (word, count)
words = words.map(lambda x:(x[1], x[0]))
words.take(25)
# display histogram
histogram(words.take(25))
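Outside a notebook the %matplotlib inline magic is unavailable; in a plain Python script you would render the chart with an explicit call, e.g.:

histogram(words.take(25))
plt.show()  # open the chart window when not running inline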
# the whole word count in one chained statement
words = (sc.textFile('/home/an/Documents/A00_Documents/Spark4Py 20150315')
         .flatMap(lambda line: re.split(r'\W+', line.lower().strip()))
         .filter(lambda x: len(x) > 3)
         .map(lambda w: (w, 1))
         .reduceByKey(add)
         .map(lambda x: (x[1], x[0]))
         .sortByKey(False))
words.take(20)
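To keep the ranked counts rather than only sampling the top 20, the result RDD can be written back to storage. A minimal sketch, assuming a hypothetical output directory (saveAsTextFile writes one part file per partition):

# each element is (count, word); write "word<TAB>count" lines
# '/tmp/wordcount_out' is a hypothetical path - use any writable directory
words.map(lambda x: '%s\t%d' % (x[1], x[0])).saveAsTextFile('/tmp/wordcount_out')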
////////////////////////////////////////////////////////////////////////////////////////////////////////////////
Spark Python API documentation
http://spark.apache.org/docs/latest/api/python/pyspark.html
Official examples
WordCount:
bin/pyspark ./examples/src/main/python/wordcount.py /tmp/text
bin/spark-submit --master local --num-executors 10 ./examples/src/main/python/wordcount.py /tmp/text
bin/spark-submit --master yarn --num-executors 10 ./examples/src/main/python/wordcount.py /tmp/text

Pi:
bin/spark-submit --master local --executor-memory 4G --num-executors 10 ./examples/src/main/python/pi.py
Python SparkConf
conf = SparkConf().setAppName("AppName").set("spark.executor.memory", "1g")
sc = SparkContext(conf=conf)
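The snippets in this section assume the interactive pyspark shell, where sc already exists. For a standalone script run with spark-submit, a minimal sketch of the full setup (the app name and memory setting are just placeholders):

from pyspark import SparkConf, SparkContext

# build a configuration and create the context explicitly
conf = SparkConf().setAppName("AppName").set("spark.executor.memory", "1g")
sc = SparkContext(conf=conf)
# ... transformations and actions go here ...
sc.stop()  # release cluster resources when the job is done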
Python SparkContext
# read a text file from HDFS, a local file system (available on all nodes),
# or any Hadoop-supported file system URI, and return it as an RDD of strings
sc.textFile("/hdfs/path")
# build the union of a list of RDDs (note: sc.union takes a list)
sc.union([rdd1, rdd2])
# return a list that contains all of the elements in this RDD
sc.textFile("/hdfs/path").collect()
# distribute a local Python collection to form an RDD with 5 partitions;
# using xrange is recommended if the input represents a range, for performance
sc.parallelize([0, 2, 3, 4, 6], 5)
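As a quick interactive check of the 5-partition example above (glom() gathers the elements of each partition into a list; the output shown is what the even split should produce):

>>> rdd = sc.parallelize([0, 2, 3, 4, 6], 5)
>>> rdd.getNumPartitions()
5
>>> rdd.glom().collect()   # one inner list per partition
[[0], [2], [3], [4], [6]]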
Python Spark RDD set operations (intersection, union, difference)
Intersection: intersection
>>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
>>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
>>> rdd1.intersection(rdd2).collect()
[1, 2, 3]

Difference: subtract
>>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
>>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
>>> rdd1.subtract(rdd2).collect()
[4, 5, 10]

Union: union
>>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
>>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
>>> rdd1.union(rdd2).collect()
[1, 10, 2, 3, 4, 5, 1, 6, 2, 3, 7, 8]
>>> rdd1.union(rdd2).distinct().collect()
[1, 2, 3, 4, 5, 6, 7, 8, 10]
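One detail worth noting: intersection deduplicates its result, while subtract keeps duplicates from the left-hand RDD. A small check (element order from collect() may vary):

>>> rdd1 = sc.parallelize([1, 2, 2, 3])
>>> rdd2 = sc.parallelize([1, 3])
>>> rdd1.intersection(rdd2).collect()   # duplicates removed
[1, 3]
>>> rdd1.subtract(rdd2).collect()       # the duplicated 2 survives
[2, 2]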