您的位置:首页 > 其它

将我自己的知识库文件加载到Elasticsearch中。

2016-12-20 00:00 267 查看
本地有很多文本文件,都是一些知识点,现在将它们批量载入ES中。

有点乱,未整理,但能用。

这里要注意的是:str 的 decode 方法需要加上参数 'ignore',因为在实际使用中会发生转换异常(UnicodeDecodeError),具体原因我也没有完全想明白。加上 'ignore' 之后会跳过这些无法解码的字节,没有太大影响(出现问题的是一些符号字符,而不是英文或中文;也可能有其他我没有碰到的状况)。

#-*-coding:UTF-8-*-

__author__='zhaoxp'

import elasticsearch
import json
import os
import os.path

from datetime import datetime

'''create my knowledgebase(kb)

'''

# Directory whose entries load_os_knowledge_to_kb() lists when looking for
# '.txt' knowledge files to load into Elasticsearch.
KB_OS_PATH='E:\\study\\my_knowledgebase\\Unix&Linux&AIX&Windows'

def create_kb_index_type(es, index, doc_type):
    """Index an empty document under *index* / *doc_type*.

    The client response is printed as indented JSON and then returned to
    the caller.
    """
    response = es.index(index=index, doc_type=doc_type, body={})
    print('create index&type result ')
    pretty = json.dumps(response, indent=2)
    print(pretty)
    return response

def create_index(es, index):
    """Create the index named *index* and return the client response.

    Args:
        es: Elasticsearch client; only ``es.indices.create`` is used.
        index: Name of the index to create.

    Returns:
        The response of ``es.indices.create``, which is also printed as
        JSON. Returning it keeps this helper consistent with del_index()
        and get_index(), which hand their responses back to the caller.

    Any exception raised by the client is not caught here and propagates
    to the caller.
    """
    response = es.indices.create(index=index)
    print('create index result')
    print(json.dumps(response))
    return response

def del_index(es, index):
    """Delete the index named *index*.

    Returns the client response (also printed as JSON), or None when the
    client reports that the index does not exist.
    """
    try:
        response = es.indices.delete(index=index)
    except elasticsearch.exceptions.NotFoundError:
        # A missing index is reported on stdout, not raised.
        print('del_index error: index(%s) not found' % index)
        return None
    print('delete index result')
    print(json.dumps(response))
    return response

def get_index(es, index):
    """Fetch the index named *index* via ``es.indices.get``.

    Returns the client response, or None (after printing a message) when
    the client raises NotFoundError.
    """
    result = None
    try:
        result = es.indices.get(index=index)
    except elasticsearch.exceptions.NotFoundError:
        print('get_index error: index(%s) not found' % index)
    return result

def load_os_knowledge_to_kb(es):
    """Load every '.txt' entry found in KB_OS_PATH into Elasticsearch.

    Each matching name is passed to insert_file_content_to_es(); entries
    with any other suffix are skipped.
    """
    print('load OS knowledge from local files into KB')
    print('KB_OS_PATH = %s' % KB_OS_PATH)
    separator = '-' * 20
    for entry in os.listdir(KB_OS_PATH):
        if not entry.endswith('.txt'):
            continue
        print(separator + entry + separator)
        insert_file_content_to_es(es, KB_OS_PATH, entry)

def insert_file_content_to_es(es, file_dir, file_name):
    """Index the content of one GBK-encoded text file into the 'kb' index.

    The document is stored under doc_type 'OS'. Its ``file_name`` field is
    *file_name* without its last four characters (the '.txt' suffix) and
    its ``message`` field is the file content decoded as GBK.

    Args:
        es: Elasticsearch client; only its ``index`` method is used.
        file_dir: Directory that contains the file.
        file_name: Name of the file inside *file_dir*.

    Returns:
        None. Errors raised while indexing are printed and not re-raised,
        so one bad file does not abort a batch load.
    """
    print('insert content of file into ES %s/%s' % (file_dir, file_name))

    # Read the raw bytes first so the file is closed before the ES request.
    with open(os.path.join(file_dir, file_name), 'rb') as f:
        raw_content = f.read()

    try:
        title = file_name[:-4]
        # Byte-string names (Python 2 os.listdir) must be decoded; names
        # that are already text have no usable decode() and are kept as-is.
        if isinstance(title, bytes):
            title = title.decode('gbk', 'ignore')
        # 'ignore' drops bytes that are not valid GBK instead of raising.
        file_content = raw_content.decode('gbk', 'ignore')
    except UnicodeDecodeError:
        print('UnicodeDecodeError during processing %s' % file_name)
        return

    body = {
        'file_name': title,
        'author': 'zhaoxp',
        'source': 'manual write',
        'message': file_content,
    }
    try:
        response = es.index(index='kb', doc_type='OS', body=body)
        print('processing %s' % title)
        print(response)
    except Exception as e:
        # Deliberate best-effort: report the failure and carry on.
        print('Exception : %s - %s' % (title, e))

def test_chinese(es):
    """Index every file in a fixed scratch directory to try Chinese text.

    For each entry the name, the raw bytes read from the file and their
    types are printed, then a document holding them is indexed into 'kb'
    under doc_type 'os' and the response is printed as indented JSON.
    """
    dir_path = 'D:\\temp\\t1'
    for fname in os.listdir(dir_path):
        print(fname)
        print(type(fname))
        full_path = os.path.join(dir_path, fname)
        with open(full_path, 'rb') as handle:
            content = handle.read()
            print(content)
            print(type(content))
            document = {
                'message': 'chinese words:你好,世界',
                'title': fname,
                'file_content': content,
            }
            response = es.index(index='kb', doc_type='os', body=document)
            print('test chinese')
            print(json.dumps(response, indent=2))

if __name__ == '__main__':
    # Script entry point: connect to Elasticsearch and load the local
    # knowledge files. Nothing is loaded when the ping fails.
    print('test ES api')
    es = elasticsearch.Elasticsearch(hosts=[
        {'host': '10.120.20.206', 'port': 9200},
        {'host': '10.120.20.205', 'port': 9200},
    ])
    if es.ping():
        print('ES connected.')
        # Debug output showing what the indices client exposes.
        print(es.indices)
        print(dir(es.indices))

        # Optional one-off maintenance steps; uncomment as needed.
        # del_index(es, 'kb')
        # create_index(es, 'kb')
        # create_kb_index_type(es, 'kb', 'os')
        # test_chinese(es)

        print('start to load')
        load_os_knowledge_to_kb(es)
    else:
        print('ES not connected')
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  ElasticSearch