您的位置:首页 > 其它

使用pykafka读取实时数据小例子

2017-04-10 21:53 381 查看
使用pykafka读取实时数据

import sys
from pykafka import KafkaClient
from pykafka.balancedconsumer import BalancedConsumer
from pykafka.simpleconsumer import OwnedPartition, OffsetType

# Python 2 only: force the process-wide default str/unicode codec to UTF-8 so
# that non-ASCII message payloads can be printed without UnicodeDecodeError.
# `reload(sys)` is needed because site.py deletes `sys.setdefaultencoding` at
# interpreter start-up.  On Python 3 `reload` is not a builtin (calling it
# raises NameError) and the default encoding is already UTF-8, so skip it.
if sys.version_info[0] < 3:
    reload(sys)  # noqa: F821 - builtin on Python 2
    sys.setdefaultencoding('utf8')

# Requires the third-party pykafka package (pip install pykafka).

class PyKafka:
    """Small wrapper around a pykafka balanced consumer that prints messages.

    Typical use::

        pk = PyKafka()
        pk.getConnect()
        pk.beginConsumer()   # blocks, printing offset and value per message
        pk.disConnect()

    Connection settings default to the class-level constants below and can
    be overridden per instance through the constructor arguments.
    """

    # Defaults used when the constructor is called without arguments.
    TOPIC = 'log_download'
    BROKER_LIST = '10.23.23.24:9092,10.23.23.21:9092'
    ZK_LIST = '10.23.23.24:2181,10.23.23.21:2181/sh-bt'
    CONSUMER_GROUP = 'zs_download_04'  # consumer group name; pick your own

    consumer = None
    server = topic = zkServer = None
    # Misspelled legacy name of `zkServer`; it was never assigned or read, and
    # is kept only so existing attribute lookups do not start failing.
    zsServer = None
    consumerGroup = None
    # Message fetched by getConnect() while committing the initial offset;
    # beginConsumer() emits it first so it is not silently dropped.
    _pending = None

    def __init__(self, server=None, topic=None, zk_server=None,
                 consumer_group=None):
        """Store the connection settings; no network I/O happens here.

        Args:
            server: comma-separated Kafka ``host:port`` list
                (default: ``BROKER_LIST``).
            topic: name of the topic to read (default: ``TOPIC``).
            zk_server: ZooKeeper connect string (default: ``ZK_LIST``).
            consumer_group: consumer group to join
                (default: ``CONSUMER_GROUP``).
        """
        print("begin pykafka")
        self.server = self.BROKER_LIST if server is None else server
        self.topic = self.TOPIC if topic is None else topic
        self.zkServer = self.ZK_LIST if zk_server is None else zk_server
        self.consumerGroup = (
            self.CONSUMER_GROUP if consumer_group is None else consumer_group
        )
        self.consumer = None
        self._pending = None

    def getConnect(self):
        """Create the balanced consumer, commit an initial offset, return it.

        As in the original example, one message is consumed immediately and
        its offset committed.  That message is kept in ``self._pending`` so
        that ``beginConsumer`` can still print it.  Callers that iterate the
        returned consumer directly will, as before, not see that message.

        Returns:
            The pykafka balanced consumer, also stored as ``self.consumer``.
        """
        # Release a consumer left over from an earlier call before replacing it.
        if self.consumer is not None:
            self.disConnect()

        client = KafkaClient(hosts=self.server)
        topic = client.topics[self.topic]

        self.consumer = topic.get_balanced_consumer(
            consumer_group=self.consumerGroup,
            # Start from the newest messages.  NOTE(review): per the pykafka
            # docs this applies when the group has no usable committed
            # offset - confirm against the installed pykafka version.
            auto_offset_reset=OffsetType.LATEST,
            zookeeper_connect=self.zkServer,
        )
        self._pending = self.consumer.consume()
        self.consumer.commit_offsets()
        return self.consumer

    def disConnect(self):
        """Stop the consumer, if one was created; safe to call repeatedly."""
        consumer, self.consumer = self.consumer, None
        self._pending = None
        if consumer is not None:
            consumer.stop()

    def beginConsumer(self):
        """Print the offset and value of every message the consumer yields.

        Connects first if ``getConnect`` has not been called yet, instead of
        failing while trying to iterate over ``None``.
        """
        if self.consumer is None:
            self.getConnect()

        # Emit the message that getConnect() fetched, exactly once.
        first, self._pending = self._pending, None
        if first is not None:
            self._printMessage(first)

        for oneLog in self.consumer:
            self._printMessage(oneLog)

    @staticmethod
    def _printMessage(message):
        """Print one message's offset and value on separate lines."""
        print(message.offset)
        print(message.value)

if __name__ == '__main__':
    # Script entry point: connect, then print messages until interrupted.
    pk = PyKafka()
    try:
        pk.getConnect()
        pk.beginConsumer()
    finally:
        # Always give the wrapper a chance to release its consumer, including
        # on Ctrl-C or a connection error (the exception still propagates).
        pk.disConnect()



                                            
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  实时数据 pykafka