您的位置:首页 > 其它

elasticsearch-river-kafka 插件的环境配置和使用

2015-10-30 17:41 411 查看




1.elasticsearch-river-kafka插件的安装

elasticsearch-river-kafka插件的安装与其他插件一样

cd$ELASTICSEARCH_HOME

./bin/plugin-urlfile:/$PLUGIN_PATH-installelasticsearch-river-kafka


插件更新

cd$ELASTICSEARCH_HOME

./bin/plugin-removeelasticsearch-river-kafka

./bin/plugin-urlfile:/$PLUGIN_PATH-installelasticsearch-river-kafka




伦理片

'target='_blank'>http://www.dotdy.com/

2.river节点的配置

配置river节点的时候,river节点和非river节点都要配置。
river节点:在es的配置文件中添加下面几行

#node.river:_none_##这一行要注释掉,表示为river节点

threadpool:

bulk:

type:
fixed

size:
60

queue_size:
1000


非river节点:在es的配置文件中添加下面几行

node.river:_none_##这一行要解注,表示该节点不是river节点

threadpool:

bulk:

type:
fixed

size:
60

queue_size:
1000


注意:一般,不会将数据落在river节点上(即node.data:false),但测试环境上就无所谓了,机器资源又紧张。
节点配置完后,记得重启es,重启es的顺序:master节点→data节点→river节点



3.elasticsearch-river-kafka插件的开发

社区中的elasticsearch-river-kafka插件仅提供了对String和json数据的简单处理。在实际生产中,我们遇到的情况要复杂得多。
那么这个时候,我们就得自己去开发elasticsearch-river-kafka插件实现一些附加功能。
下面就简单介绍一下开发elasticsearch-river-kafka插件的步骤
1)KafkaRiverPlugin
该类需要继承KafkaRiverPlugin和实现AbstractPlugin,在该类中定义plugin的名称和描述

@Override

public
String
name(){

return
"river-kafka"
;

}

@Override

public
String
description(){

return
"River
KafkaPlugin"
;

}


2)es-plugin.properties配置文件
需要在es-plugin.properties中添加如下的定义,这样ES在启动的时候就能够通过org.elasticsearch.plugins.PluginManager
在当前的classpath中扫描到我们的plugin。
注意:定义中要写KafkaRiverPlugin类的全称,es-plugin.properties一般位于src/main/resources下

plugin=com.test.elasticsearch.plugin.river.kafka.KafkaRiverPlugin


3)KafkaRiverModule
KafkaRiverPlugin的onModule方法:在ES加载所有的插件时,会invoke一个onModule方法。KafkaRiverModule会作为参数传进来

public
void
onModule(RiversModule
module){

module.registerRiver(
"kafka"
,
KafkaRiverModule.
class
);

}


KafkaRiverModule必须继承AbstractModule。在KafkaRiverModule中会生成一个KafkaRiver。KafkaRiver是River接口的实现。

public
class
KafkaRiverModule
extends
AbstractModule
{

@Override

protected
void
configure()
{

bind(River.
class
).to(KafkaRiver.
class
).asEagerSingleton();

}

}


4)KafkaRiver
–KafkaRiver必须继承AbstractRiverComponent,并且实现River接口。
–KafkaRiver只提供两个方法:start和close。
–AbstractRiverComponent用于initializekafkariver的logger、river名、river的配置
–构造函数通过@Inject注入river所需要的一切东西:RiverName,RiverSettings、logger、自定义的配置信息
(这里是BasicProperties,在BasicProperties中定义的配置参数可以在创建river的时候被指定,参见“4.kafka→river→es的数据存储”)
–在start方法中启动了kafkariver的线程。在这个线程中,将数据从kafka中读取数据,然后将这些数据写到es中。
–kafkaConsumer用来定义从kafka中读取数据时的用户操作。
–ElasticsearchProducer用来定义将数据写入ES时的用户操作。

public
class
KafkaRiver
extends
AbstractRiverComponent
implements
River
{

private
BasicProperties
properties;

private
KafkaConsumer
kafkaConsumer;

private
ElasticsearchProducer
elasticsearchProducer;

private
static
ScheduledExecutorService
service=Executors.newSingleThreadScheduledExecutor();

private
Thread
riverMonitorThread;

private
KafkaRiverSubMonitor
kafkaRiverSubMonitor;

private
Thread
thread;

private
ESLogger
logger;

@Inject

protected
KafkaRiver(RiverName
riverName,RiverSettingssettings,Clientclient){

super
(riverName,
settings);

this
.logger
=Loggers.getLogger(getClass(),settings.globalSettings(),riverName);

properties
=
new
BasicProperties(settings);

elasticsearchProducer
=
new
ElasticsearchProducer(client,
properties);

kafkaConsumer
=
new
KafkaConsumer(riverName,
properties,elasticsearchProducer);

}

@Override


public
void
start()
{

//启动KafkaRiver的线程

try
{

logger.info(
"MHA:
StartingKafkaWorker..."
);

thread
=EsExecutors.daemonThreadFactory(settings.globalSettings(),
"kafka_river"
).newThread(kafkaConsumer);

thread.start();

}
catch
(Exception
ex){

logger.error(
"Unexpected
Erroroccurred"
,ex);

throw
new
RuntimeException(ex);

}

}

......

}


影音先锋电影'target='_blank'>http://www.iskdy.com/


4.kafka→river→es的数据存储

通过下面的指令,可以创建一条river,这样从kafka的baymaxtest的topic中的数据通过river就会落到es上。
注意:一个集群可以创建多个river,各river可以指定不同的topic、patition和序列化类

curl-XPUT
'http://localhost:9200/_river/baymaxriver1/_meta'
-d
'{

"type"
:
"kafka"
,

"kafka"
:
{

"topic"
:
"test"
,

"numOfConsumer"
:
"2"
,

"zk.connect"
:
"10.10.10.10:2181"
,

"zk.session.timeout.ms"
:
"50000"
,

"zk.sync.time.ms"
:
"200"
,

"zk.auto.commit.interval.ms"
:
"1000"
,

"zk.auto.commit.enable"
:
"true"
,

"zk.auto.offset.reset"
:
"smallest"
,

"zk.fetch.message.max.bytes"
:
"5242880"
,

"serializer"
:
"com.test.elasticsearch.river.kafka.serializer.AASerializer"

},

"elasticsearch"
:
{

"indexName"
:
"stringfortest"
,

"indexType"
:
"message1"
,

"batch_size"
:
"500"
,

"handling_batch_coresize"
:
"2"
,

"handling_batch_maximumPoolSize"
:
"2"
,

"handling_batch_keepAliveTime"
:
"600"
,

"handling_batch_queueSize"
:
"10"
,

"es_bulk_timeout"
:
"5"

}

}'


上述指令中主要配置信息的说明:
kafka中→
topic:kafka的topic名为test,
numOfConsumer:从kafka中读取数据的消费者个数
zk.connect:zookper的host名
serializer:对从kafka中来的数据的序列化类
elasticsearch中→
indexName:在es中生成的index名,从该river中通过的数据会落到这个index中
indexType:index的type
es_bulk_timeout:es批量处理的timeout
上述指令会返回下面的结果

{
"_index"
:
"_river"
,

"_type"
:
"baymaxriver1"
,

"_id"
:
"_meta"
,

"_version"
:
1
,

"created"
:
true

}


内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: