
Flume: developing a custom HttpSink

2017-03-17 00:02
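Our business requirement was for Flume to read data and send it straight to an application system, so I had to write my own HTTP sink. It posts the events it receives to the application's RESTful API, and it can resume after an interruption.

In other words, if the application goes down and is restarted, delivery picks up where it left off and no data is lost. A batch that fails to send is retried as well.

Enough talk, here is the code.

I use OkHttp to send the HTTP requests. The code is as follows: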

import com.alibaba.fastjson.JSONArray;
import com.google.common.base.Preconditions;
import okhttp3.*;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class LogCollector extends AbstractSink implements Configurable {

    private static final Logger LOG = LoggerFactory.getLogger(LogCollector.class);
    private static final MediaType JSON_MEDIA_TYPE = MediaType.parse("application/json");

    // One client for the lifetime of the sink, so connections are pooled and reused.
    private final OkHttpClient client = new OkHttpClient();
    private String hostname;
    private String port;
    private int batchSize;
    private String postUrl;

    public LogCollector() {
        LOG.info("LogCollector start...");
    }

    @Override
    public void configure(Context context) {
        hostname = context.getString("hostname");
        Preconditions.checkNotNull(hostname, "hostname must be set!!");
        port = context.getString("port");
        Preconditions.checkNotNull(port, "port must be set!!");
        batchSize = context.getInteger("batchSize", 100);
        // checkArgument, not checkNotNull: a boxed boolean is never null, so the old check always passed.
        Preconditions.checkArgument(batchSize > 0, "batchSize must be a positive number!!");
        // "/path-url" is a placeholder: replace it with the path of your REST endpoint.
        postUrl = "http://" + hostname + ":" + port + "/path-url";
    }

    @Override
    public void start() {
        super.start();
    }

    @Override
    public void stop() {
        super.stop();
    }

    @Override
    public Status process() throws EventDeliveryException {
        Status result = Status.READY;
        Channel channel = getChannel();
        Transaction transaction = null;
        try {
            transaction = channel.getTransaction();
            transaction.begin();
            List<String> contents = new ArrayList<>();
            for (int i = 0; i < batchSize; i++) {
                Event event = channel.take();
                if (event == null) { // channel is empty: send what we have, then back off
                    result = Status.BACKOFF;
                    break;
                }
                // Assumes event bodies are UTF-8 text.
                contents.add(new String(event.getBody(), StandardCharsets.UTF_8));
            }
            if (contents.isEmpty() || postJson(postUrl, JSONArray.toJSON(contents).toString())) {
                // Commit only after the application accepted the batch, so no data is lost.
                transaction.commit();
            } else {
                // Delivery failed: roll back so the events stay in the channel and are retried.
                transaction.rollback();
                result = Status.BACKOFF;
            }
        } catch (Exception e) {
            try {
                if (transaction != null) {
                    transaction.rollback();
                }
            } catch (Exception e2) {
                LOG.error("Exception in rollback. Rollback might not have been successful.", e2);
            }
            LOG.error("Failed to commit transaction. Transaction rolled back.", e);
            throw new EventDeliveryException("Failed to deliver events", e);
        } finally {
            if (transaction != null) {
                transaction.close();
                LOG.debug("close Transaction");
            }
        }
        return result;
    }

    /**
     * Sends a POST request with the JSON data as the body.
     *
     * @param url  target URL
     * @param json request body
     * @return true if the server answered with a 2xx status
     */
    public boolean postJson(String url, String json) {
        RequestBody body = RequestBody.create(JSON_MEDIA_TYPE, json);
        Request request = new Request.Builder()
                .url(url)
                .post(body)
                .build();
        try {
            Response response = client.newCall(request).execute();
            try {
                if (!response.isSuccessful()) {
                    LOG.warn("Request failed with HTTP status {}", response.code());
                }
                return response.isSuccessful();
            } finally {
                response.body().close(); // release the connection back to the pool
            }
        } catch (IOException e) {
            LOG.warn("Request to " + url + " failed", e);
            return false;
        }
    }
}
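
The "no data lost, failed sends are retried" behaviour comes from the channel transaction: events taken inside a transaction only leave the channel on commit, and a rollback hands them back. The following is a minimal stdlib-only sketch of that idea. ToyChannel and ToySinkDemo are hypothetical names invented for illustration; this is a simplified model, not Flume's implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

/** Toy stand-in for a channel with one open transaction (not Flume code). */
class ToyChannel {
    private final Deque<String> queue = new ArrayDeque<>();
    private final List<String> taken = new ArrayList<>();

    void put(String event) { queue.addLast(event); }

    /** Takes the next event and remembers it until commit or rollback. */
    String take() {
        String event = queue.pollFirst();
        if (event != null) taken.add(event);
        return event;
    }

    /** Delivery succeeded: the taken events are gone for good. */
    void commit() { taken.clear(); }

    /** Delivery failed: put the taken events back at the head, in order. */
    void rollback() {
        for (int i = taken.size() - 1; i >= 0; i--) queue.addFirst(taken.get(i));
        taken.clear();
    }

    int size() { return queue.size(); }
}

class ToySinkDemo {
    /** Takes up to batchSize events and commits only if deliver accepts them. */
    static boolean processOnce(ToyChannel ch, int batchSize, Predicate<List<String>> deliver) {
        List<String> batch = new ArrayList<>();
        for (int i = 0; i < batchSize; i++) {
            String event = ch.take();
            if (event == null) break;
            batch.add(event);
        }
        if (batch.isEmpty() || deliver.test(batch)) {
            ch.commit();
            return true;
        }
        ch.rollback();
        return false;
    }

    public static void main(String[] args) {
        ToyChannel ch = new ToyChannel();
        ch.put("a"); ch.put("b"); ch.put("c");
        boolean first = processOnce(ch, 2, batch -> false);  // endpoint is down
        System.out.println(first + " " + ch.size());         // prints "false 3"
        boolean second = processOnce(ch, 2, batch -> true);  // endpoint is back
        System.out.println(second + " " + ch.size());        // prints "true 1"
    }
}
```

The first call fails and leaves all three events in the channel, so the next call sends the same two events again. This also means the receiving API may see a batch more than once and should tolerate duplicates.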


The required dependencies are:

<!--okhttp3-->
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>3.6.0</version>
</dependency>

<!--fastjson-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.5</version>
</dependency>
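
fastjson is used in the sink only to turn the batch of event bodies into a JSON array of strings. To make the shape of the request body concrete, here is a minimal stdlib-only sketch of that encoding. The class JsonStringArray and its methods are hypothetical and written for illustration; they are not part of fastjson and are not meant to replace a JSON library.

```java
import java.util.Arrays;
import java.util.List;

class JsonStringArray {
    /** Quotes one string, escaping the characters JSON requires to be escaped. */
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c)); // other control characters
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    /** Encodes a list of strings as a JSON array, e.g. ["a","b"]. */
    static String encode(List<String> items) {
        StringBuilder sb = new StringBuilder("[");
        for (int i = 0; i < items.size(); i++) {
            if (i > 0) sb.append(',');
            sb.append(quote(items.get(i)));
        }
        return sb.append(']').toString();
    }

    public static void main(String[] args) {
        System.out.println(encode(Arrays.asList("GET /index 200", "say \"hi\"")));
        // prints ["GET /index 200","say \"hi\""]
    }
}
```

Each log line becomes one string element, so the receiving API should expect an array of strings rather than an array of objects.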


The configuration is as follows:

a1.sinks = epp

##use epp sink
a1.sinks.epp.type = <package name>.LogCollector
a1.sinks.epp.channel = c1
a1.sinks.epp.hostname = localhost
a1.sinks.epp.port = 8080
a1.sinks.epp.batchSize = 100
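
To try this configuration locally, something has to listen on localhost:8080 and answer the sink's POST requests. Below is a minimal sketch of a stub receiver built on the JDK's built-in com.sun.net.httpserver package. StubReceiver and the path "/events" are assumptions made for illustration; the path must match whatever path the sink's postUrl is built with.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

class StubReceiver {
    /** Starts a stub server on the given port; port 0 picks any free port. */
    static HttpServer start(int port, String path) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress("localhost", port), 0);
        server.createContext(path, exchange -> {
            try {
                byte[] body = readAll(exchange.getRequestBody());
                System.out.println("received: " + new String(body, StandardCharsets.UTF_8));
                boolean isPost = "POST".equals(exchange.getRequestMethod());
                // The sink commits its transaction only when it sees a 2xx status.
                exchange.sendResponseHeaders(isPost ? 200 : 405, -1); // -1: no response body
            } finally {
                exchange.close();
            }
        });
        server.start();
        return server;
    }

    /** Reads the whole stream into a byte array. */
    static byte[] readAll(InputStream in) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        int n;
        while ((n = in.read(buf)) != -1) {
            out.write(buf, 0, n);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        start(8080, "/events"); // "/events" is a hypothetical path
        System.out.println("listening on http://localhost:8080/events");
    }
}
```

Stopping the stub while the agent is running, then starting it again, is a simple way to check that delivery resumes.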