elasticsearch-river-kafka 插件的环境配置和使用
2015-10-30 17:41
411 查看
1.elasticsearch-river-kafka插件的安装
elasticsearch-river-kafka插件的安装与其他插件一样cd$ELASTICSEARCH_HOME ./bin/plugin-urlfile:/$PLUGIN_PATH-installelasticsearch-river-kafka |
cd$ELASTICSEARCH_HOME ./bin/plugin-removeelasticsearch-river-kafka ./bin/plugin-urlfile:/$PLUGIN_PATH-installelasticsearch-river-kafka |
伦理片'target='_blank'>http://www.dotdy.com/
'target='_blank'>http://www.dotdy.com/
2.river节点的配置
配置river节点的时候,river节点和非river节点都要配置。river节点:在es的配置文件中添加下面几行
#node.river:_none_##这一行要注释掉,表示为river节点 threadpool: bulk: type: fixed size: 60 queue_size: 1000 |
node.river:_none_##这一行要解注,表示该节点不是river节点 threadpool: bulk: type: fixed size: 60 queue_size: 1000 |
节点配置完后,记得重启es,重启es的顺序:master节点→data节点→river节点
3.elasticsearch-river-kafka插件的开发
社区中的elasticsearch-river-kafka插件仅提供了对String和json数据的简单处理。在实际生产中,我们遇到的情况要复杂得多。那么这个时候,我们就得自己去开发elasticsearch-river-kafka插件实现一些附加功能。
下面就简单介绍一下开发elasticsearch-river-kafka插件的步骤
1)KafkaRiverPlugin
该类需要继承KafkaRiverPlugin和实现AbstractPlugin,在该类中定义plugin的名称和描述
@Override public String name(){ return "river-kafka" ; } @Override public String description(){ return "River KafkaPlugin" ; } |
需要在es-plugin.properties中添加如下的定义,这样ES在启动的时候就能够通过org.elasticsearch.plugins.PluginManager
在当前的classpath中扫描到我们的plugin。
注意:定义中要写KafkaRiverPlugin类的全称,es-plugin.properties一般位于src/main/resources下
plugin=com.test.elasticsearch.plugin.river.kafka.KafkaRiverPlugin |
KafkaRiverPlugin的onModule方法:在ES加载所有的插件时,会invoke一个onModule方法。KafkaRiverModule会作为参数传进来
public void onModule(RiversModule module){ module.registerRiver( "kafka" , KafkaRiverModule. class ); } |
public class KafkaRiverModule extends AbstractModule { @Override protected void configure() { bind(River. class ).to(KafkaRiver. class ).asEagerSingleton(); } } |
–KafkaRiver必须继承AbstractRiverComponent,并且实现River接口。
–KafkaRiver只提供两个方法:start和close。
–AbstractRiverComponent用于initializekafkariver的logger、river名、river的配置
–构造函数通过@Inject注入river所需要的一切东西:RiverName,RiverSettings、logger、自定义的配置信息
(这里是BasicProperties,在BasicProperties中定义的配置参数可以在创建river的时候被指定,参见“4.kafka→river→es的数据存储”)
–在start方法中启动了kafkariver的线程。在这个线程中,将数据从kafka中读取数据,然后将这些数据写到es中。
–kafkaConsumer用来定义从kafka中读取数据时的用户操作。
–ElasticsearchProducer用来定义将数据写入ES时的用户操作。
public class KafkaRiver extends AbstractRiverComponent implements River { private BasicProperties properties; private KafkaConsumer kafkaConsumer; private ElasticsearchProducer elasticsearchProducer; private static ScheduledExecutorService private Thread riverMonitorThread; private KafkaRiverSubMonitor kafkaRiverSubMonitor; private Thread thread; private ESLogger logger; @Inject protected KafkaRiver(RiverName super (riverName, settings); this .logger properties new BasicProperties(settings); elasticsearchProducer new ElasticsearchProducer(client, properties); kafkaConsumer new KafkaConsumer(riverName, } @Override public void start() { //启动KafkaRiver的线程 try { logger.info( "MHA: ); thread "kafka_river" ).newThread(kafkaConsumer); thread.start(); } catch (Exception logger.error( "Unexpected ,ex); throw new RuntimeException(ex); } } ...... } |
4.kafka→river→es的数据存储
通过下面的指令,可以创建一条river,这样从kafka的baymaxtest的topic中的数据通过river就会落到es上。注意:一个集群可以创建多个river,各river可以指定不同的topic、patition和序列化类
curl-XPUT ' -d '{ "type" : "kafka" , "kafka" : { "topic" : "test" , "numOfConsumer" : "2" , "zk.connect" : "10.10.10.10:2181" , "zk.session.timeout.ms" : "50000" , "zk.sync.time.ms" : "200" , "zk.auto.commit.interval.ms" : "1000" , "zk.auto.commit.enable" : "true" , "zk.auto.offset.reset" : "smallest" , "zk.fetch.message.max.bytes" : "5242880" , "serializer" : "com.test.elasticsearch.river.kafka.serializer.AASerializer" }, "elasticsearch" : { "indexName" : "stringfortest" , "indexType" : "message1" , "batch_size" : "500" , "handling_batch_coresize" : "2" , "handling_batch_maximumPoolSize" : "2" , "handling_batch_keepAliveTime" : "600" , "handling_batch_queueSize" : "10" , "es_bulk_timeout" : "5" } }' |
kafka中→
topic:kafka的topic名为test,
numOfConsumer:从kafka中读取数据的消费者个数
zk.connect:zookper的host名
serializer:对从kafka中来的数据的序列化类
elasticsearch中→
indexName:在es中生成的index名,从该river中通过的数据会落到这个index中
indexType:index的type
es_bulk_timeout:es批量处理的timeout
上述指令会返回下面的结果
{ "_index" : "_river" , "_type" : "baymaxriver1" , "_id" : "_meta" , "_version" : 1 , "created" : true } |
相关文章推荐
- java中判断任何一个整数区间的素数的个数,并输出所有的素数
- 写支票的滞留金的算法
- php 发送邮件代码
- MySql数据库基础操作
- Markdown离线版(MAC)
- android js和webview交互
- css3盒模型
- SpringMVC文件上传和下载
- 揭开LiteOS的神秘面纱
- 上传下载---下载
- 机器学习之聚类算法(K-Means)
- Hadoop的map获取当前spilt文件名
- JavaScript中的循环类型(整理笔记)
- iOS开源框架 JCAlertView
- ARC以及MRC中setter方法的差异
- 上传下载---上传
- 解决eclipse插件svn不显示svn信息和显示的信息为数字的问题
- android sdk版本版本与ADT版本不兼容的处理办法
- 如何访问nc给外部访问的接口
- 解决eclipse插件svn不显示svn信息和显示的信息为数字的问题