您的位置:首页 > 编程语言 > Python开发

kafka+spark streaming代码实例(pyspark+python)

2018-03-02 10:42 711 查看
一、系统准备1.启动zookeeper:bin/zkServer.cmd start2.启动kafka:bin/kafka-server-start.sh -daemon config/server.properties3.启动spark:sbin/start-all.sh数据来源:http://files.grouplens.org/datasets/movielens/ml-100k.zip 流程:kafka读取user数据集并生产数据流——spark streaming 计算每个职业人数——计算结果存入MySQL二、kafka读取user数据集并生产数据流,1秒生产1条记录。先创建topic:bin/kafka-topics.sh --create --zookeeper 192.168.26.247:2181 --replication-factor2 --partitions 1 --topic txt验证topic:bin/kafka-topics.sh --list --zookeeper 192.168.26.247:2181
 bin/kafka-topics.sh --describe --zookeeper192.168.26.247:2181 --topic txt


from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError
import time
def main():
##生产模块
producer = KafkaProducer(bootstrap_servers=['192.168.26.247:9092'])
with open('/home/hadoop/ml-100k/u.user','r') as f:
for line in f.readlines():
time.sleep(1)
producer.send("txt",line)
print line
#producer.flush()

if __name__ == '__main__':
main() 保存txt.py运行结果如下:



spark streaming 消费并计算数据,并将结果存入数据库。from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils,TopicAndPartition
import MySQLdb
def start():
sconf=SparkConf()
sconf.set('spark.cores.max',3)
sc=SparkContext(appName='txt',conf=sconf)
ssc=StreamingContext(sc,5)
brokers ="192.168.26.247:9092,192.168.26.246:9092"
topic='txt'
start = 70000
partition=0
user_data = KafkaUtils.createDirectStream(ssc,[topic],kafkaParams={"metadata.broker.list":brokers})
#fromOffsets 设置从起始偏移量消费
#user_data = KafkaUtils.createDirectStream(ssc,[topic],kafkaParams={"metadata.broker.list":brokers},fromOffsets={TopicAndPartition(topic,partition):long(start)})
user_fields = user_data.map(lambda line: line[1].split('|'))
gender_users = user_fields.map(lambda fields: fields[3]).map(lambda gender: (gender,1)).reduceByKey(lambda a,b: a+b)
user_data.foreachRDD(offset)#存储offset信息
gender_users.pprint()
gender_users.foreachRDD(lambda rdd: rdd.foreach(echo))#返回元组
ssc.start()
ssc.awaitTermination()
offsetRanges = []
def offset(rdd):
global offsetRanges
offsetRanges = rdd.offsetRanges()
def echo(rdd):
zhiye = rdd[0]
num = rdd[1]
for o in offsetRanges:
topic = o.topic
partition = o.partition
fromoffset = o.fromOffset
untiloffset = o.untilOffset
#结果插入MySQL
conn = MySQLdb.connect(user="root",passwd="******",host="192.168.26.245",db="test",charset="utf8")
cursor = conn.cursor()
sql = "insert into zhiye(id,zhiye,num,topic,partitions,fromoffset,untiloffset) \
values (NULL,'%s','%d','%s','%d','%d','%d')" % (zhiye,num,topic,partition,fromoffset,untiloffset)
cursor.execute(sql)
conn.commit()
conn.close()

if __name__ == '__main__':
start()
三、向集群submitbin/spark-submit --master spark://192.168.26.245:7077 --jars jars/spark-streaming-kafka-0-8-assembly_2.11-2.0.2.jar python/txt.py
运行结果


数据库部分数据:


WEB显示数据:
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: