Hadoop in Action - Flume: Custom Sink (Part 19)
2017-05-16 22:56
```java
import java.io.File;
import java.io.IOException;
import java.io.FileOutputStream;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A custom Flume sink that appends each event body, followed by a
 * timestamp, to a local file configured via the "fileName" property.
 */
public class MySinks extends AbstractSink implements Configurable {

    private static final Logger logger = LoggerFactory.getLogger(MySinks.class);
    private static final String PROP_KEY_ROOTPATH = "fileName";

    private String fileName;

    @Override
    public Status process() throws EventDeliveryException {
        Channel ch = getChannel();
        Transaction txn = ch.getTransaction();
        txn.begin();
        try {
            Event event = ch.take();
            if (event == null) {
                // Channel is empty: commit the empty transaction and back off
                // instead of busy-waiting inside an open transaction.
                txn.commit();
                return Status.BACKOFF;
            }

            logger.debug("Got event.");
            String body = new String(event.getBody());
            System.out.println("event.getBody()-----" + body);
            String res = body + ":" + System.currentTimeMillis() + "\r\n";

            // Append the record to the configured file; try-with-resources
            // guarantees the stream is closed even on failure.
            File file = new File(fileName);
            try (FileOutputStream fos = new FileOutputStream(file, true)) {
                fos.write(res.getBytes());
            } catch (IOException e) {
                throw new EventDeliveryException("Failed to write to " + fileName, e);
            }

            txn.commit();
            return Status.READY;
        } catch (Throwable th) {
            txn.rollback();
            if (th instanceof Error) {
                throw (Error) th;
            }
            throw new EventDeliveryException(th);
        } finally {
            txn.close();
        }
    }

    @Override
    public void configure(Context context) {
        // "fileName" is read from the agent's sink configuration.
        fileName = context.getString(PROP_KEY_ROOTPATH);
    }
}
```
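To run this sink, compile it into a jar, place the jar on the Flume classpath (for example under `$FLUME_HOME/lib`), and reference the class by its fully qualified name in the agent configuration. A minimal sketch of such a configuration follows; the agent name `a1`, the netcat source, and the output path are illustrative assumptions, while the `fileName` property matches `PROP_KEY_ROOTPATH` read in `configure()`:

```properties
# Illustrative agent wiring: netcat source -> memory channel -> custom sink
a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000

# "type" must be the fully qualified class name of the custom sink
a1.sinks.k1.type = MySinks
a1.sinks.k1.fileName = /tmp/mysink.out

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```

If `MySinks` is declared in a package, the `type` value must include that package prefix (e.g. `com.example.MySinks`).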