使用pykafka读取实时数据小例子
2017-04-10 21:53
381 查看
使用pykafka读取实时数据
import sys from pykafka import KafkaClient from pykafka.balancedconsumer import BalancedConsumer from pykafka.simpleconsumer import OwnedPartition, OffsetType reload(sys) sys.setdefaultencoding('utf8') #pykafka, need install PyKafka class PyKafka: consumer = None TOPIC = 'log_download' BROKER_LIST = '10.23.23.24:9092,10.23.23.21:9092' ZK_LIST = '10.23.23.24:2181,10.23.23.21:2181/sh-bt' server = topic = zsServer = None def __init__(self): print("begin pykafka") self.server = self.BROKER_LIST self.topic = self.TOPIC self.zkServer= self.ZK_LIST def getConnect(self): client = KafkaClient(hosts=self.server) topic = client.topics[self.topic] self.consumer = topic.get_balanced_consumer( consumer_group="zs_download_04", # 自己命令 auto_offset_reset=OffsetType.LATEST,#在consumer_group存在的情况下,设置此变量,表示从最新的开始取 #auto_offset_reset=OffsetType.EARLIEST, #reset_offset_on_start=True, #auto_commit_enable=True, zookeeper_connect=self.zkServer ) #self.consumer = topic.get_simple_consumer(reset_offset_on_start=False) self.consumer.consume() self.consumer.commit_offsets() return self.consumer def disConnect(self): #self.consumer.close() pass def beginConsumer(self): for oneLog in self.consumer: print(oneLog.offset) print(oneLog.value) if __name__ == '__main__': pk = PyKafka() pk.getConnect() pk.beginConsumer()
相关文章推荐
- 使用kafkapython读取实时数据小例子
- 一个使用cv::Mat按单通道读取数据然后按照灰度范围设置灰度的例子
- 使用Windows API读取文件数据的例子
- Linux下C语言实现的简单使用线程向FIFO里写入与读取数据的例子
- Excel使用VBA读取实时WebService股票数据
- 演练:使用 DataGrid Web 控件读取和写入数据
- 第四课 使用SqlDataReader读取数据(翻译)
- [原创]一个利用PHP语言读取数据库数据的例子(菜鸟版)
- (原创)方便的使用单击和双击更新DataGrid中的数据的例子
- ASP.NET数据库使用精典-----读取数据库中数据
- 使用 SqlDataReader 读取数据示例
- ASP中使用XMLHTTP读取远程数据2
- ASP中使用XMLHTTP读取远程数据3
- 对应诺言,写了篇文章,“如何使用Serialization 进行文件存储/读取数据 (上)”, 请大家评评。
- 使用 XMLHttpRequest 读取 XML 数据
- rome的一个小例子,读取rss数据
- 非阻塞方式下Socket读取数据的一个例子
- 方便的使用单击和双击更新DataGrid中的数据的例子
- ASP中使用XMLHTTP读取远程数据3
- 使用单击和双击更新DataGrid中的数据的例子