您的位置:首页 > 编程语言 > Python开发

python消费kafka数据批量插入到es

2017-12-04 18:48 579 查看
1、es的批量插入

这是为了方便后期配置的更改,把配置信息放在logging.conf中

用elasticsearch来实现批量操作,先安装依赖包,sudo pip install Elasticsearch2

from elasticsearch import Elasticsearch
class ImportEsData:

logging.config.fileConfig("logging.conf")
logger = logging.getLogger("msg")

def __init__(self,hosts,index,type):
self.es = Elasticsearch(hosts=hosts.strip(',').split(','), timeout=5000)
self.index = index
self.type = type

def set_date(self,data):
# 批量处理
# es.index(index="test-index",doc_type="test-type",id=42,body={"any":"data","timestamp":datetime.now()})
self.es.index(index=self.index,doc_type=self.index,body=data)


2、使用pykafka消费kafka

1.因为kafka是0.8,pykafka不支持zk,只能用get_simple_consumer来实现

2.为了实现多个应用同时消费而且不重消费,所以一个应用消费一个partition

3. 为是确保消费数据量在不满足10000这个批量值,能在一个时间范围内插入到es中,这里设置consumer_timeout_ms一个超时等待时间,退出等待消费阻塞。

4.退出等待消费阻塞后导致无法再消费数据,因此在获取self.consumer 的外层加入了while True 一个死循环

#!/usr/bin/python
# -*- coding: UTF-8 -*-
from pykafka import KafkaClient
import logging
import logging.config
from ConfigUtil import ConfigUtil
import datetime

class KafkaPython:
logging.config.fileConfig("logging.conf")
logger = logging.getLogger("msg")
logger_data = logging.getLogger("data")

def __init__(self):
self.server = ConfigUtil().get("kafka","kafka_server")
self.topic  = ConfigUtil().get("kafka","topic")
self.group = ConfigUtil().get("kafka","group")
self.partition_id = int(ConfigUtil().get("kafka","partition"))
self.consumer_timeout_ms = int(ConfigUtil().get("kafka","consumer_timeout_ms"))
self.consumer = None
self.hosts = ConfigUtil().get("es","hosts")
self.index_name = ConfigUtil().get("es","index_name")
self.type_name = ConfigUtil().get("es","type_name")

def getConnect(self):
client = KafkaClient(self.server)
topic = client.topics[self.topic]
p = topic.partitions
ps={p.get(self.partition_id)}

self.consumer = topic.get_simple_consumer(
consumer_group=self.group,
auto_commit_enable=True,
consumer_timeout_ms=self.consumer_timeout_ms,
# num_consumer_fetchers=1,
# consumer_id='test1',
partitions=ps
)
self.starttime = datetime.datetime.now()

def beginConsumer(self):
print("beginConsumer kafka-python")
imprtEsData = ImportEsData(self.hosts,self.index_name,self.type_name)
#创建ACTIONS
count = 0
ACTIONS = []

while True:
endtime = datetime.datetime.now()
print (endtime - self.starttime).seconds
for message in self.consumer:
if message is not None:
try:
count = count + 1
# print(str(message.partition.id)+","+str(message.offset)+","+str(count))
# self.logger.info(str(message.partition.id)+","+str(message.offset)+","+str(count))
action = {
"_index": self.index_name,
"_type": self.type_name,
"_source": message.value
}
ACTIONS.append(action)
if len(ACTIONS) >= 10000:
imprtEsData.set_date(ACTIONS)
ACTIONS = []
self.consumer.commit_offsets()
endtime = datetime.datetime.now()
print (endtime - self.starttime).seconds
#break
except (Exception) as e:
# self.consumer.commit_offsets()
print(e)
self.logger.error(e)
self.logger.error(str(message.partition.id)+","+str(message.offset)+","+message.value+"\n")
# self.logger_data.error(message.value+"\n")
# self.consumer.commit_offsets()

if len(ACTIONS) > 0:
self.logger.info("等待时间超过,consumer_timeout_ms,把集合数据插入es")
imprtEsData.set_date(ACTIONS)
ACTIONS = []
self.consumer.commit_offsets()

def disConnect(self):
self.consumer.close()

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
class ImportEsData:

logging.config.fileConfig("logging.conf")
logger = logging.getLogger("msg")

def __init__(self,hosts,index,type):
self.es = Elasticsearch(hosts=hosts.strip(',').split(','), timeout=5000)
self.index = index
self.type = type

def set_date(self,data):
# 批量处理
success = bulk(self.es, data, index=self.index, raise_on_error=True)
self.logger.info(success)


3.运行

if __name__ == '__main__':
kp = KafkaPython()
kp.getConnect()
kp.beginConsumer()
# kp.disConnect()


注:简单的写了一个从kafka中读取数据到一个list里,当数据达到一个阈值时,在批量插入到 es的插件

现在还在批量的压测中。。。

欢迎一起讨论

8e6e
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: