您的位置:首页 > 其它

重写Flume-NG-morphline-avro-sink

2015-04-08 22:37 323 查看
之前在CSDN中发过一篇关于如何编写flume的morphline-avro-sink的文章(http://blog.csdn.net/zh_yi/article/details/39552441)。发现浏览次数不少,但没有评论。可能说明看后对大家没有什么帮助吧,最近发现之前写的程序在大数据量环境下存在性能瓶颈,通过该sink的event只能达到200条/秒的发送量,而morphline-solr-sink能达到1000条/秒(当然,受到硬件配置的制约)。于是决定重写这个sink,以提升性能。

本sink开发的依据是flume-ng-morphline-solr-sink,在此基础上将Morphline处理完的Record变成event,通过RpcClient发送出去。

开发的环境需要将flume-ng的1.5.2编译通过,并正确导入到eclipse中。

下面开始编程部分。也就是修改morphline solr sink的部分。

一、 找到flume源码所在路径(C:\apache-flume-1.5.2-src\flume-ng-sinks),拷贝一份flume-ng-morphline-solr-sink目录到同级文件夹,将目录名称修改成flume-ng-morphline-avro-sink。

二、 修改flume-ng-morphline-avro-sink目录下的pom.xml文件。修改<artifactId>标签中内容为flume-ng-morphline-avro-sink。修改<name>标签中内容为FlumeNG Morphline Avro Sink。注释掉<properties>标签中的<solr.version>和<solr.expected.version>两个标签内容。因为是原Solr中的内容,这里用不到。

三、 在eclipse中打开flume-ng-sinks/pom.xml文件,在Overview标签页中的Modules点击Add,找到flume-ng-morphline-avro-sink并选中确定,将新建的morphline-avro-sink添加到Modules中。这样在该pom文件的“pom.xml”标签页中就可以看到多了一个flume-ng-morphline-avro-sink。

<pre lang="html"line="1" escaped="true">

<modules>
<module>flume-hdfs-sink</module>
<module>flume-irc-sink</module>
<module>flume-ng-hbase-sink</module>
<module>flume-ng-elasticsearch-sink</module>
<module>flume-ng-morphline-solr-sink</module>
<module>flume-ng-morphline-avro-sink</module>
</modules>

</pre>

这样就是我们新建的sink利用Maven管理起来了。

四、 修改部分包/类名

1、 将src/test/java下org.apache.flume.sink.solr.morphline中的solr替换成avro。

2、 将src/main/java/下org.apache.flume.sink.solr.morphline中的solr替换成avro.

3、 将src/test/java/org.apache.flume.sink.solr.morphline下的TestMorphlineSolrSink.java修改成TestMorphlineAvroSink.java

4、 将src/main/java/org.apache.flume.sink.solr.morphline中的MorphlineSolrSink.java修改成MorphlineAvroSink.java

五、 Flume中包含AvroSink,是通过Rpc连接。需要创建一个RpcClient,然后将flume中接到的eventappend到client中,作为avro event发送出去。这是AvroSink基本的也是最关键的动作。因此,就需要在MorphlineSink中初始化一个RpcClient,并在Handler中处理发送的动作。

具体实现思路:

因为处理event的代码是顺序执行的,如果每个event处理完成都append到client发送,会导致Morphline处理Record的效率降低。因此,要将此部分代码进行分离。本次将client发送event的动作采用线程池的方式进行处理。初始化构建一个线程池,线程池中的线程接收morphline处理完成的Record,将Record转换成event后再由client发送。

六、 Morphline处理event的简单流程

Morphline是一个ETL工具,采用管道式执行的方式将其中的Record进行处理转换,并按照配置的命令逐条执行,直到得到最终结果后进行加载。

1、 初始化MorphlineSink;构建clientProps,将Rpc服务的主机和端口进行赋值,为构建RpcClient做数据准备。

2、 启动MorphlineSink,调用MorphlineSink的start()方法。

a) 初始化RpcClient。client = initializeRpcClient(clientProps);

b) 构建handler。将初始化好的client赋给handler。

c) 处理event。调用MorphlineSink的process()方法。因为在Morphline中,event将被转换成Morphline定义的Record。在MorphlineSink的process()方法中调用handler的process(event)接口。

d) 利用handler处理event。正常将event转换成Record。利用Morphline Command继续处理Record。管道式处理,本命令处理完成之后将继续下一条命令的处理。

七、 修改源代码。

1、 MorphlineHandler.java

添加setRpcClient接口:

<pre lang="html"line="1" escaped="true">

public void setRpcClient(RpcClientclient);

</pre>

2、 MorphlineHandlerImpl.java

a) 定义线程池中线程的个数的私有成员变量

private static final int NUM =3;//线程池中线程个数定义为3,根据实际情况而定。

定义线程池

<pre lang="html"line="1" escaped="true">

public ExcutorService es =Excutors.newFixedThreadPool(NUM);

</pre>

b) 添加Override setRpcClient方法

<pre lang="html"line="1" escaped="true">

@Override
public
void
setRpcClient(RpcClient client){
this.client =client;
}

</pre>

c) 构建Collector内部类,其中定义一个队列,用来存储被处理完成的Record。

<pre lang="html" line="1"escaped="true">

public
static final
class
Collector
implements
Command {
public Collector(){}
private BlockingQueue<Record>
queue = new ArrayBlockingQueue<Record>(10);
@Override
public Command getParent() {
return
null
;
}
public BlockingQueue<Record> getQueue() {
return
queue;
}
public
void
setQueue(BlockingQueue<Record> queue) {
this.queue = queue;
}
@Override
public
void
notify(Record notification) {
}
@Override
public
boolean
process(Record record) {
Preconditions.checkNotNull(record);
try {
//接收到Record后,将其压如队列

queue.put(record);
} catch (InterruptedException e) {
e.printStackTrace();
return
false
;
}
return
true
;
}
}

</pre>

d) 构建处理利用client发送event的内部线程

<pre lang="html" line="1"escaped="true">

public
class
MorphlineTask implements Runnable {
private BlockingQueue<Record>
queue;
private RpcClient
client;
public MorphlineTask() {
}
public MorphlineTask(BlockingQueue<Record> queue,RpcClient client) {
this.queue = queue;
this.client = client;
}
@Override
public
void
run() {
Record r = null;
while (true) {
try {
//从队列中取出Record,并转换成event。
r = queue.take();
Map<String, String> headers = null;
headers = new HashMap<String, String>();
ListMultimap<String, Object> lmt =r.getFields();
Map<String, Collection<Object>> m =lmt.asMap();
Iterator it = m.entrySet().iterator();
while (it.hasNext()) {
Entry<String, Object> entry = (Entry<String,Object>) it
.next();
if (entry.getValue() !=
null && !entry.getKey().equals(Fields.ATTACHMENT_BODY)) {
List v = (List)entry.getValue();
if (v.get(0) !=
null) {
headers.put(entry.getKey(),v.get(0).toString());
}
}
}
try{
Event e = EventBuilder.withBody((byte[])r.getFirstValue(Fields.ATTACHMENT_BODY), headers);

client.append(e);
}catch(NullPointerException e){
e.printStackTrace();
LOG.error("Rpc Clientis null!");
}
} catch (InterruptedException e) {
// TODO Auto-generatedcatch block
e.printStackTrace();
break;
} catch (EventDeliveryException e1) {
// TODO Auto-generatedcatch block
e1.printStackTrace();
}

}
}
}

</pre>

e) 修改configure(Context context)方法

<pre lang="html" line="1"escaped="true">

Collector finalChild =
null;
try{
//初始化内部处理Record的Command
finalChild = new
Collector();
}catch(Exception e){
e.printStackTrace();
}
//执行线程池中的线程,发送event
for (int i = 0; i <
NUM; i++) {
es.execute(newMorphlineTask(finalChild.getQueue(),
client));
}
</pre>
3、 MorphlineSink.java

a) 将该类原来集成的AbstractSink修改成AbstractRpcSink;

b) 定义私有成员变量RpcClient client;

c) 定义私有成员变量Properties clientProps;

d) 修改configure(Context context)方法。

<pre lang="html" line="1"escaped="true">

//创建clientProps。为client提供初始化数据
clientProps = new Properties();
clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS,
"h1"); clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX +"h1", context.getString("hostname")
+ ":" + context.getInteger("port"));
for (Entry<String,String> entry: context.getParameters().entrySet()) {
clientProps.setProperty(entry.getKey(),entry.getValue());
}
</pre>
e) 修改start()方法

添加初始化client代码
<pre lang="html" line="1"escaped="true">
//初始化RpcClient。
try {
client =initializeRpcClient(clientProps);
} catch (Exception e) {
e.printStackTrace();
}
</pre>
赋值client
在构建handler部分添加赋值client代码
tmpHandler.setRpcClient(client);
f) 修改stop()方法

finally中添加client.close();

至此Flume-NG-Morphline-Avro-Sink编写完成。如有问题欢迎指正,有兴趣交流的码农欢迎加QQ:58431505,请注明:Flume交流。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: