How to Write a Flume-ng morphline-avro-sink
2014-09-25 15:28
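Everything below is something I worked out on my own, and it has been tested and works. I don't know whether there are better approaches, so corrections from more experienced readers are welcome. I have been working with big data for less than two months.
For work, I am doing pre-research on big data, mainly technologies around Hadoop components and subprojects.
The product platform under evaluation consists mainly of Hadoop, Flume, Solr, and Spark; the current focus is on Flume and Solr.
That is: logs collected by Flume are tokenized and passed to Solr for indexing, while data that a third-party platform has already structured is indexed directly.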
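The platform architecture is roughly as shown in the figure below (a simple diagram I drew in Visio, for reference only; the Flume channels are omitted, and the source and channel that sit between the data processing layer and the data distribution layer are not shown):
Data sources: Flume agents and third-party agents are deployed. The Flume agents mainly collect logs; the third-party agents collect everything other than logs.
Collection layer: Flume and a third-party collection platform are deployed. A Flume source receives the logs collected by the Flume agents. The 3rd platform receives the other data collected by the 3rd agents, structures it, and passes it to the Flume source.
Data processing layer: determines whether the data is structured or unstructured, tokenizes unstructured data with a morphline, and sends the data out through morphline-avro-sink.
Data distribution layer: receives the data from morphline-avro-sink and uses different sinks to distribute it to different business applications; one important path is to Solr for indexing.
Business processing layer: only Solr indexing is covered here.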
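The routing decision of the data processing layer can be sketched in plain Java. This is a minimal sketch under assumptions that are not part of the original setup: the header name `structured` and the `<timestamp> <level> <message>` log layout are hypothetical, and the whitespace split only stands in for what a real morphline (for example a grok command) would do.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch of the routing decision in the data processing layer.
 * Assumptions (hypothetical, not from the original setup):
 *  - the 3rd platform marks structured events with the header "structured=true";
 *  - raw log lines look like "<timestamp> <level> <message>".
 * The split below only stands in for what a morphline would do.
 */
class ProcessingLayerSketch {

  static final String STRUCTURED_HEADER = "structured"; // hypothetical header name

  /** Returns the fields that would be forwarded to the distribution layer. */
  static Map<String, String> route(Map<String, String> headers, String body) {
    Map<String, String> fields = new HashMap<>(headers);
    if ("true".equals(headers.get(STRUCTURED_HEADER))) {
      // Already structured by the 3rd platform: pass the fields through unchanged.
      return fields;
    }
    // Unstructured log line: extract timestamp, level and message.
    String[] parts = body.trim().split("\\s+", 3);
    if (parts.length == 3) {
      fields.put("timestamp", parts[0]);
      fields.put("level", parts[1]);
      fields.put("message", parts[2]);
    } else {
      fields.put("message", body); // unknown layout: keep the raw line
    }
    return fields;
  }

  public static void main(String[] args) {
    Map<String, String> logHeaders = new HashMap<>();
    logHeaders.put("host", "web01");
    System.out.println(route(logHeaders, "2014-09-25T15:28:00 INFO sink started"));

    Map<String, String> structured = new HashMap<>();
    structured.put(STRUCTURED_HEADER, "true");
    structured.put("cpu", "42");
    System.out.println(route(structured, ""));
  }
}
```

In the real pipeline the unstructured branch is the morphline's job; keeping the structured branch free of parsing is what lets third-party data be indexed directly.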
This article focuses on how morphline-avro-sink was written, plus some of the supporting functionality it needs.
The main points to pay attention to are:
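1. Use the flume-ng-morphline-solr-sink code in the Flume source tree as a reference.
2. The sink ultimately has to send data in Avro format, so MorphlineSink extends AbstractRpcSink, the same base class that Flume-ng's AvroSink extends.
3. AbstractRpcSink sends data through two RpcClient methods, RpcClient.append(Event) and RpcClient.appendBatch(List<Event>), so MorphlineSink must initialize its RpcClient properly.
4. MorphlineHandlerImpl must create a finalChild Command when it sets up the morphline. This command is, by default, the last command in the morphline and receives the output of all preceding commands.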
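Point 4 and the sending side of point 3 fit together like this: the morphline is a chain of commands, the finalChild is the last link and therefore sees every record the chain emits, and the handler then drains it and turns each record into one event whose headers are the first value of each field. The JDK-only model below illustrates that flow. `Cmd`, `SplitLines`, `Collector` and `drain` are hypothetical stand-ins for Kite's `Command`, a real morphline command, `CollectorB` and the conversion loop in `MorphlineHandlerImpl.process()`; they are not the Kite SDK API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** JDK-only model of a command chain that ends in a collecting finalChild (hypothetical names). */
class FinalChildSketch {

  /** A record is modelled as field name -> list of values, like a multimap. */
  interface Cmd {
    boolean process(Map<String, List<Object>> record);
  }

  /** Plays the role of CollectorB: the last link, buffering every record it receives. */
  static final class Collector implements Cmd {
    final List<Map<String, List<Object>>> records = new ArrayList<>();

    @Override
    public boolean process(Map<String, List<Object>> record) {
      records.add(record);
      return true;
    }
  }

  /** Example middle command: emits one child record per line of the "message" field. */
  static final class SplitLines implements Cmd {
    private final Cmd child;

    SplitLines(Cmd child) {
      this.child = child;
    }

    @Override
    public boolean process(Map<String, List<Object>> record) {
      List<Object> message = record.get("message");
      if (message == null || message.isEmpty()) {
        return child.process(record); // nothing to split: forward unchanged
      }
      for (String line : message.get(0).toString().split("\n")) {
        Map<String, List<Object>> out = new LinkedHashMap<>(record);
        out.put("message", values(line));
        if (!child.process(out)) {
          return false;
        }
      }
      return true;
    }
  }

  static List<Object> values(Object... vs) {
    return new ArrayList<>(Arrays.asList(vs));
  }

  /** Like the handler: copy and clear the collector, then map each record to headers. */
  static List<Map<String, String>> drain(Collector collector) {
    List<Map<String, List<Object>>> records = new ArrayList<>(collector.records);
    collector.records.clear();
    List<Map<String, String>> events = new ArrayList<>();
    for (Map<String, List<Object>> record : records) {
      Map<String, String> headers = new HashMap<>();
      for (Map.Entry<String, List<Object>> field : record.entrySet()) {
        List<Object> vs = field.getValue();
        if (!vs.isEmpty() && vs.get(0) != null) {
          headers.put(field.getKey(), vs.get(0).toString()); // first value only
        }
      }
      events.add(headers);
    }
    return events;
  }

  public static void main(String[] args) {
    Collector finalChild = new Collector();
    Cmd morphline = new SplitLines(finalChild); // the chain ends in finalChild
    Map<String, List<Object>> input = new LinkedHashMap<>();
    input.put("host", values("web01"));
    input.put("message", values("line one\nline two"));
    morphline.process(input);
    for (Map<String, String> headers : drain(finalChild)) {
      System.out.println(headers); // each map would become one Flume event's headers
    }
  }
}
```

Clearing the collector right after copying it matters: otherwise records from one input event would be sent again together with the next one.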
Below is the code for MorphlineSink, followed by MorphlineHandlerImpl. The parts that differ from flume-ng-morphline-solr-sink are marked.
```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flume.sink.avro.morphline;

import java.util.Map.Entry;
import java.util.Properties;

import org.apache.flume.Channel;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientConfigurationConstants;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.ConfigurationException;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractRpcSink;
import org.kitesdk.morphline.api.Command;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Flume sink that extracts search documents from Flume events, processes them using a
 * morphline {@link Command} chain, and forwards the results over Avro RPC.
 */
public class MorphlineSink extends AbstractRpcSink implements Configurable {

  // [changed vs. flume-ng-morphline-solr-sink] RpcClient used by the handler to send events.
  // AbstractRpcSink manages its own client internally, so this sink creates a separate one.
  private RpcClient client;
  private Properties clientProps;
  private int maxBatchSize = 1000;
  private long maxBatchDurationMillis = 1000;
  private String handlerClass;
  private MorphlineHandler handler;
  private Context context;
  private SinkCounter sinkCounter;

  public static final String BATCH_SIZE = "batchSize";
  public static final String BATCH_DURATION_MILLIS = "batchDurationMillis";
  public static final String HANDLER_CLASS = "handlerClass";

  private static final Logger LOGGER = LoggerFactory.getLogger(MorphlineSink.class);

  public MorphlineSink() {
    this(null);
  }

  /** For testing only */
  protected MorphlineSink(MorphlineHandler handler) {
    this.handler = handler;
  }

  @Override
  public void configure(Context context) {
    this.context = context;
    maxBatchSize = context.getInteger(BATCH_SIZE, maxBatchSize);
    maxBatchDurationMillis = context.getLong(BATCH_DURATION_MILLIS, maxBatchDurationMillis);
    handlerClass = context.getString(HANDLER_CLASS, MorphlineHandlerImpl.class.getName());
    if (sinkCounter == null) {
      sinkCounter = new SinkCounter(getName());
    }

    // Build the client properties the same way AbstractRpcSink does:
    // one host alias "h1" pointing at hostname:port, plus every sink parameter.
    clientProps = new Properties();
    clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "h1");
    clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX + "h1",
        context.getString("hostname") + ":" + context.getInteger("port"));
    for (Entry<String, String> entry : context.getParameters().entrySet()) {
      clientProps.setProperty(entry.getKey(), entry.getValue());
    }
    // [changed] create the client that MorphlineHandler.process() sends events through
    client = initializeRpcClient(clientProps);

    if (handler == null) {
      MorphlineHandler tmpHandler;
      try {
        tmpHandler = (MorphlineHandler) Class.forName(handlerClass).newInstance();
      } catch (Exception e) {
        throw new ConfigurationException(e);
      }
      tmpHandler.configure(context);
      handler = tmpHandler;
    }
    super.configure(context);
  }

  /**
   * Returns the maximum number of events to take per flume transaction;
   * override to customize
   */
  protected int getMaxBatchSize() {
    return maxBatchSize;
  }

  /** Returns the maximum duration per flume transaction; override to customize */
  protected long getMaxBatchDurationMillis() {
    return maxBatchDurationMillis;
  }

  // start() is inherited from AbstractRpcSink (the handler is already created in configure()).
  // Note that AbstractRpcSink.start() also opens its own connection from the same settings.

  @Override
  public synchronized void stop() {
    LOGGER.info("Morphline Sink {} stopping...", getName());
    try {
      if (handler != null) {
        handler.stop(); // shut down the morphline command chain
      }
      if (client != null) {
        client.close(); // the client created in configure() is not closed by the parent
      }
    } finally {
      super.stop();
    }
  }

  @Override
  public Status process() throws EventDeliveryException {
    int batchSize = getMaxBatchSize();
    long batchEndTime = System.currentTimeMillis() + getMaxBatchDurationMillis();
    Channel myChannel = getChannel();
    Transaction txn = myChannel.getTransaction();
    txn.begin();
    boolean isMorphlineTransactionCommitted = true;
    try {
      int numEventsTaken = 0;
      handler.beginTransaction();
      isMorphlineTransactionCommitted = false;
      // repeatedly take and process events from the Flume queue
      for (int i = 0; i < batchSize; i++) {
        Event event = myChannel.take();
        if (event == null) {
          break;
        }
        sinkCounter.incrementEventDrainAttemptCount();
        numEventsTaken++;
        // [changed] pass the RpcClient so the handler can forward its output records
        handler.process(event, client);
        if (System.currentTimeMillis() >= batchEndTime) {
          break;
        }
      }

      // update metrics
      if (numEventsTaken == 0) {
        sinkCounter.incrementBatchEmptyCount();
      }
      if (numEventsTaken < batchSize) {
        sinkCounter.incrementBatchUnderflowCount();
      } else {
        sinkCounter.incrementBatchCompleteCount();
      }
      handler.commitTransaction();
      isMorphlineTransactionCommitted = true;
      txn.commit();
      sinkCounter.addToEventDrainSuccessCount(numEventsTaken);
      return numEventsTaken == 0 ? Status.BACKOFF : Status.READY;
    } catch (Throwable t) {
      // Ooops - need to rollback and back off.
      // Events already appended through the RpcClient are not undone by a rollback,
      // so a retried batch may be delivered twice.
      LOGGER.error("Morphline Sink " + getName() + ": Unable to process event from channel "
          + myChannel.getName() + ". Exception follows.", t);
      try {
        if (!isMorphlineTransactionCommitted) {
          handler.rollbackTransaction();
        }
      } catch (Throwable t2) {
        LOGGER.error("Morphline Sink " + getName()
            + ": Unable to rollback morphline transaction. Exception follows.", t2);
      } finally {
        try {
          txn.rollback();
        } catch (Throwable t4) {
          LOGGER.error("Morphline Sink " + getName()
              + ": Unable to rollback Flume transaction. Exception follows.", t4);
        }
      }

      if (t instanceof Error) {
        throw (Error) t; // rethrow original exception
      } else if (t instanceof ChannelException) {
        return Status.BACKOFF;
      } else {
        throw new EventDeliveryException("Failed to send events", t); // rethrow and backoff
      }
    } finally {
      txn.close();
    }
  }

  @Override
  public String toString() {
    int i = getClass().getName().lastIndexOf('.') + 1;
    String shortClassName = getClass().getName().substring(i);
    return getName() + " (" + shortClassName + ")";
  }

  @Override
  protected RpcClient initializeRpcClient(Properties props) {
    LOGGER.info("Attempting to create Avro Rpc client.");
    return RpcClientFactory.getInstance(props);
  }
}
```
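MorphlineSink.configure() builds the properties handed to RpcClientFactory.getInstance() the same way AbstractRpcSink does. The JDK-only sketch below reproduces that step so the resulting keys can be checked without a running agent. It assumes the literal keys `hosts` and `hosts.` are the values of RpcClientConfigurationConstants.CONFIG_HOSTS and CONFIG_HOSTS_PREFIX, and, unlike the sink code, it rejects a missing hostname or port up front instead of building an entry such as `collector01:null`. The host name `collector01` is hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

/** JDK-only reproduction of how the sink assembles its RpcClient properties (sketch). */
class RpcClientPropsSketch {

  static Properties buildClientProps(Map<String, String> sinkParams) {
    String hostname = sinkParams.get("hostname");
    String port = sinkParams.get("port");
    if (hostname == null || port == null) {
      // Fail early instead of producing a "host:null" entry (differs from the sink code).
      throw new IllegalArgumentException("hostname and port are required");
    }
    Properties props = new Properties();
    props.setProperty("hosts", "h1");                     // assumed value of CONFIG_HOSTS
    props.setProperty("hosts.h1", hostname + ":" + port); // assumed CONFIG_HOSTS_PREFIX + alias
    // Every sink parameter is copied through, as in MorphlineSink.configure().
    for (Map.Entry<String, String> e : sinkParams.entrySet()) {
      props.setProperty(e.getKey(), e.getValue());
    }
    return props;
  }

  public static void main(String[] args) {
    Map<String, String> params = new LinkedHashMap<>();
    params.put("hostname", "collector01"); // hypothetical next-tier agent
    params.put("port", "4141");
    params.put("batchSize", "1000");
    Properties p = buildClientProps(params);
    System.out.println(p.getProperty("hosts.h1"));
  }
}
```

Because the sink parameters are copied after `hosts` and `hosts.h1` are set, a parameter with the same name in the agent configuration overrides them; RpcClient options such as `connect-timeout` or `request-timeout` reach the client the same way.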
The corresponding MorphlineHandlerImpl:

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.flume.sink.avro.morphline;

import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.event.EventBuilder;
import org.kitesdk.morphline.api.Command;
import org.kitesdk.morphline.api.MorphlineCompilationException;
import org.kitesdk.morphline.api.MorphlineContext;
import org.kitesdk.morphline.api.Record;
import org.kitesdk.morphline.base.Compiler;
import org.kitesdk.morphline.base.FaultTolerance;
import org.kitesdk.morphline.base.Fields;
import org.kitesdk.morphline.base.Metrics;
import org.kitesdk.morphline.base.Notifications;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import com.google.common.base.Preconditions;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

/**
 * A {@link MorphlineHandler} that processes its events using a morphline
 * {@link Command} chain.
 */
public class MorphlineHandlerImpl implements MorphlineHandler {

  private MorphlineContext morphlineContext;
  private Command morphline;
  private Command finalChild;
  private String morphlineFileAndId;

  private Timer mappingTimer;
  private Meter numRecords;
  private Meter numFailedRecords;
  private Meter numExceptionRecords;

  public static final String MORPHLINE_FILE_PARAM = "morphlineFile";
  public static final String MORPHLINE_ID_PARAM = "morphlineId";

  /**
   * Morphline variables can be passed from flume.conf to the morphline, e.g.:
   * agent.sinks.solrSink.morphlineVariable.zkHost=127.0.0.1:2181/solr
   */
  public static final String MORPHLINE_VARIABLE_PARAM = "morphlineVariable";

  private static final Logger LOG = LoggerFactory.getLogger(MorphlineHandlerImpl.class);

  // For test injection
  void setMorphlineContext(MorphlineContext morphlineContext) {
    this.morphlineContext = morphlineContext;
  }

  // for interceptor; note that configure() replaces this with a new CollectorB
  void setFinalChild(Command finalChild) {
    this.finalChild = finalChild;
  }

  @Override
  public void configure(Context context) {
    String morphlineFile = context.getString(MORPHLINE_FILE_PARAM);
    String morphlineId = context.getString(MORPHLINE_ID_PARAM);
    if (morphlineFile == null || morphlineFile.trim().length() == 0) {
      throw new MorphlineCompilationException("Missing parameter: " + MORPHLINE_FILE_PARAM, null);
    }
    morphlineFileAndId = morphlineFile + "@" + morphlineId;

    if (morphlineContext == null) {
      FaultTolerance faultTolerance = new FaultTolerance(
          context.getBoolean(FaultTolerance.IS_PRODUCTION_MODE, false),
          context.getBoolean(FaultTolerance.IS_IGNORING_RECOVERABLE_EXCEPTIONS, false),
          context.getString(FaultTolerance.RECOVERABLE_EXCEPTION_CLASSES));

      morphlineContext = new MorphlineContext.Builder()
          .setExceptionHandler(faultTolerance)
          .setMetricRegistry(SharedMetricRegistries.getOrCreate(morphlineFileAndId))
          .build();
    }

    Config override = ConfigFactory.parseMap(
        context.getSubProperties(MORPHLINE_VARIABLE_PARAM + "."));
    // [changed] CollectorB is the last command of the chain and buffers every output record
    finalChild = new CollectorB();
    morphline = new Compiler().compile(
        new File(morphlineFile), morphlineId, morphlineContext, finalChild, override);

    this.mappingTimer = morphlineContext.getMetricRegistry().timer(
        MetricRegistry.name("morphline.app", Metrics.ELAPSED_TIME));
    this.numRecords = morphlineContext.getMetricRegistry().meter(
        MetricRegistry.name("morphline.app", Metrics.NUM_RECORDS));
    this.numFailedRecords = morphlineContext.getMetricRegistry().meter(
        MetricRegistry.name("morphline.app", "numFailedRecords"));
    this.numExceptionRecords = morphlineContext.getMetricRegistry().meter(
        MetricRegistry.name("morphline.app", "numExceptionRecords"));
  }

  @Override
  public void process(Event event, RpcClient client) {
    numRecords.mark();
    Timer.Context timerContext = mappingTimer.time();
    try {
      Record record = new Record();
      for (Entry<String, String> entry : event.getHeaders().entrySet()) {
        record.put(entry.getKey(), entry.getValue());
      }
      byte[] bytes = event.getBody();
      if (bytes != null && bytes.length > 0) {
        record.put(Fields.ATTACHMENT_BODY, bytes);
      }
      try {
        CollectorB collector = (CollectorB) finalChild;
        collector.reset(); // drop leftovers from a record that failed earlier
        Notifications.notifyStartSession(morphline);
        if (!morphline.process(record)) {
          numFailedRecords.mark();
          LOG.warn("Morphline {} failed to process record: {}", morphlineFileAndId, record);
        }

        // [changed] turn every record that reached CollectorB into a Flume event and send it
        List<Record> records = new ArrayList<Record>(collector.getRecords());
        collector.reset();
        for (Record r : records) {
          Map<String, String> headers = new HashMap<String, String>();
          for (Entry<String, Collection<Object>> field : r.getFields().asMap().entrySet()) {
            if (Fields.ATTACHMENT_BODY.equals(field.getKey())) {
              continue; // raw bytes: toString() would only yield an object reference
            }
            Collection<Object> values = field.getValue();
            Object first = values.isEmpty() ? null : values.iterator().next();
            if (first != null) {
              headers.put(field.getKey(), first.toString()); // first value only
            }
          }
          // the body is the original event body; the extracted fields travel as headers
          client.append(EventBuilder.withBody(event.getBody(), headers));
        }
      } catch (RuntimeException t) {
        numExceptionRecords.mark();
        morphlineContext.getExceptionHandler().handleException(t, record);
      } catch (EventDeliveryException e1) {
        numExceptionRecords.mark();
        // In production mode the exception handler may log and ignore this failure,
        // in which case the Flume transaction is committed without the event being delivered.
        morphlineContext.getExceptionHandler().handleException(e1, record);
      }
    } finally {
      timerContext.stop();
    }
  }

  @Override
  public void beginTransaction() {
    Notifications.notifyBeginTransaction(morphline);
  }

  @Override
  public void commitTransaction() {
    Notifications.notifyCommitTransaction(morphline);
  }

  @Override
  public void rollbackTransaction() {
    Notifications.notifyRollbackTransaction(morphline);
  }

  @Override
  public void stop() {
    Notifications.notifyShutdown(morphline);
  }

  /** [changed] Final command of the chain: collects every record the morphline emits. */
  public static final class CollectorB implements Command {

    private final List<Record> results = new ArrayList<Record>();

    public List<Record> getRecords() {
      return results;
    }

    public void reset() {
      results.clear();
    }

    @Override
    public Command getParent() {
      return null;
    }

    @Override
    public void notify(Record notification) {
    }

    @Override
    public boolean process(Record record) {
      Preconditions.checkNotNull(record);
      results.add(record);
      return true;
    }
  }
}
```

It took me several days to get this working. My reasons for publishing it:
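1. To record what I learned, as a note for myself.
2. To help others who are working on the same thing.
3. To ask for corrections: is there a better approach, and does this implementation have any hidden risks?
4. To exchange ideas with others and improve myself.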