flume 自定义开发HttpSink
2017-03-17 00:02
447 查看
由于业务需求flume 读取数据,然后直接发送到应用系统。所以只好自己开发了一个 Http Sink 进行发送数据,将接收的数据发送到应用系统的restful API。并支持断点续传。
也就是应用程序宕机以后,重新启动,数据还可以继续传送。不丢失数据。数据发送失败也会继续重试。
所需依赖如下:
配置如下:
也就是应用程序宕机以后,重新启动,数据还可以继续传送。不丢失数据。数据发送失败也会继续重试。
好了废话不多说,上代码。
发送Http请求我使用 okHttp ,代码如下:mport com.alibaba.fastjson.JSONArray; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import okhttp3.*; import org.apache.flume.*; import org.apache.flume.conf.Configurable; import org.apache.flume.sink.AbstractSink; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; import java.util.List; public class LogCollector extends AbstractSink implements Configurable{ private Logger LOG = LoggerFactory.getLogger(LogCollector.class); private String hostname; private String port; private int batchSize; private String postUrl; public LogCollector() { LOG.info("LogCollector start..."); } @Override public void configure(Context context) { hostname = context.getString("hostname"); Preconditions.checkNotNull(hostname, "hostname must be set!!"); port = context.getString("port"); Preconditions.checkNotNull(port, "port must be set!!"); batchSize = context.getInteger("batchSize", 100); Preconditions.checkNotNull(batchSize > 0, "batchSize must be a positive number!!"); postUrl = "http://"+hostname+":"+port+/路径url; } @Override public void start() { super.start(); } @Override public void stop() { super.stop(); } public Status process() throws EventDeliveryException { Status result = Status.READY; Channel channel = getChannel(); Transaction transaction = null; try { transaction = channel.getTransaction(); transaction.begin(); Event event = null; String content = null; List<String> contents=new ArrayList<>(); for (int i = 0; i < batchSize; i++) { event = channel.take(); if (event != null) {//对事件进行处理 content = new String(event.getBody()); contents.add(content); } else { result = Status.BACKOFF; break; } } if (contents.size() > 0) { Response response= postJson(postUrl, JSONArray.toJSON(contents).toString()); if(response!=null && response.isSuccessful()){ transaction.commit();//通过 commit 机制确保数据不丢失 } }else { transaction.commit(); } } catch (Exception e) { try { transaction.rollback(); } catch (Exception e2) { LOG.error("Exception in rollback. Rollback might not have been" + "successful.", e2); } LOG.error("Failed to commit transaction." + "Transaction rolled back.", e); Throwables.propagate(e); }finally { if (transaction != null) { transaction.close(); LOG.debug("close Transaction"); } } return result; } /** * post请求,json数据为body * * @param url * @param json */ public Response postJson(String url, String json) { OkHttpClient client = new OkHttpClient(); RequestBody body = RequestBody.create(MediaType.parse("application/json"), json); Request request = new Request.Builder() .url(url) .post(body) .build(); Response response = null; try { response = client.newCall(request).execute(); if (!response.isSuccessful()){ LOG.info("request was error"); } } catch (IOException e) { e.printStackTrace(); } return response; } }
所需依赖如下:
<!--okhttp3--> <dependency> <groupId>com.squareup.okhttp3</groupId> <artifactId>okhttp</artifactId> <version>3.6.0</version> </dependency> <!--fastjson--> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.5</version> </dependency>
配置如下:
a1.sinks = epp ##use epp sink a1.sinks.epp.type = 包名.LogCollector a1.sinks.epp.channel = c1 a1.sinks.epp.hostname = localhost a1.sinks.epp.port = 8080 a1.sinks.epp.batchSize = 100
相关文章推荐
- flume开发--自定义Sink
- flume开发--自定义Sink
- WCF分布式安全开发实践(10):消息安全模式之自定义用户名密码:Message_UserNamePassword_WSHttpBinding
- WCF分布式安全开发实践(3):传输安全模式之自定义用户名密码身份验证:Transport_UserNamePassword_WSHttpBinding
- flume的自定义sink-Kafka
- flume自定义sink source
- flume的hdfssink自定义EventSerializer序列化类
- 读书片断之 开发自定义HTTP模块
- 自定义flume的sink
- 自定义FlumeKafkaSink
- flume-ng 自定义sink 实现rollfile 变量目录
- flume开发-自定义拦截器(Interceptor)
- WCF分布式安全开发实践(6):传输安全模式之自定义X509Certificate证书验证:Transport_X509Certificate_WSHttpBinding
- 微信公众账号开发教程(四)自定义菜单(含实例源码)——转自http://www.cnblogs.com/yank/p/3418194.html
- 移动网络应用开发中,使用 HTTP 协议比起使用 socket 实现基于 TCP 的自定义协议有哪些优势?
- flume 自定义sink
- WCF分布式安全开发实践(10):消息安全模式之自定义用户名密码:Message_UserNamePassword_WSHttpBinding
- WCF分布式安全开发实践(12):消息安全模式之自定义X509证书验证:Message_CustomX509Certificate_WSHttpBinding
- Flume NG 学习笔记(十) Transaction、Sink、Source和Channel开发
- flume自定义source,sink