Easticsearch 数据迁移至influxdb【python】
2017-01-16 22:25
447 查看
Easticsearch 数据迁移至influxdb python
需求:将Easticsearch部分数据迁移至influxdb中。
见过从mysql,influxdb迁移至Easticsearch中的,没见过从Easticsearch迁移至influxdb中,迁移的数据是一些实时性的流量数据,influxdb时序性数据库对这类数据的支撑比较客观。
解决方案:大批量从Easticsearch取数据,两种方案。1.from...size 2.scroll (类似于数据库的游标) 脚本采用第二种scroll方案对Easticsearch 查询取数据。循环通过scrool_id进行查询并写入influxdb中。
需求:将Easticsearch部分数据迁移至influxdb中。
见过从mysql,influxdb迁移至Easticsearch中的,没见过从Easticsearch迁移至influxdb中,迁移的数据是一些实时性的流量数据,influxdb时序性数据库对这类数据的支撑比较客观。
解决方案:大批量从Easticsearch取数据,两种方案。1.from...size 2.scroll (类似于数据库的游标) 脚本采用第二种scroll方案对Easticsearch 查询取数据。循环通过scrool_id进行查询并写入influxdb中。
#!/usr/bin/env python #coding=utf-8 import sys import json import datetime import elasticsearch from influxdb import InfluxDBClient #连接Easticsearch class ES(object): @classmethod def connect_host(cls): url = "http://192.168.121.33:9202/" es = elasticsearch.Elasticsearch(url,timeout=120) return es es = ES.connect_host() #连接influxdb client = InfluxDBClient(host="192.168.121.33", port="8086", username='admin', password='admin', database='esl') client.create_database('esl') #DSL查询语法 data = { "query": { "match_all" : {}}, "size": 100 } # 设置要过滤返回的字段值,要什么字段。 'hits.hits._source.resource_id', 'hits.hits._source.timestamp', 'hits.hits._source.counter_volume', 'hits.hits._source.@timestamp', ] # 指定search_type="scan"模式,并返回_scroll_id给es.scroll获取数据使用 res = es.search( index='pipefilter_meters*', doc_type ='canaledge.flow.bytes', body=data, search_type="scan", scroll="10m" ) scroll_id = res['_scroll_id'] response= es.scroll(scroll_id=scroll_id, scroll= "10m",filter_path=return_fields,) scroll_id = response['_scroll_id'] #获取第二次scroll_id hits = response['hits']['hits'] in_data = [] while len(hits) > 0: for i in hits: res_id = i['_source']['resource_id'] r_id, r_type = res_id.split(':') datas = { "measurement": "es_net", "tags": { "resource_id": r_id, "type": r_type }, "time": i['_source']['timestamp'], "fields": { "counter_volume": i['_source']['counter_volume'] } } in_data.append(datas) #循环写入influxdb client.write_points(in_data) in_data = [] #每次循环完重新定义列表为空 data = { "query": { "match_all" : {}}, "size": 100 } ## 设置要过滤返回的字段值,要什么字段。 '_scroll_id', 'hits.hits._source.resource_id', 'hits.hits._source.timestamp', 'hits.hits._source.counter_volume', 'hits.hits._source.@timestamp', ] ## 指定search_type="scan"模式,并返回_scroll_id给es.scroll获取数据使用 response= es.scroll(scroll_id=scroll_id, scroll= "10m",filter_path=return_fields,) #调试 #if not response.get('hits'): # print response # sys.exit(1) #else: hits = response['hits']['hits'] scroll_id = response["_scroll_id"] #获取第三次scroll_id
相关文章推荐
- Python动态类型的学习---引用的理解
- Python3写爬虫(四)多线程实现数据爬取
- 垃圾邮件过滤器 python简单实现
- 下载并遍历 names.txt 文件,输出长度最长的回文人名。
- install and upgrade scrapy
- Scrapy的架构介绍
- Centos6 编译安装Python
- 使用Python生成Excel格式的图片
- 让Python文件也可以当bat文件运行
- [Python]推算数独
- Python中zip()函数用法举例
- Python中map()函数浅析
- Python将excel导入到mysql中
- Python在CAM软件Genesis2000中的应用
- 使用Shiboken为C++和Qt库创建Python绑定
- FREEBASIC 编译可被python调用的dll函数示例
- 通过构建一个简单的掷骰子游戏去学习怎么用 Python 编程