
How to Write a Flume-ng-morphline-avro-sink

2014-09-25 15:28
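Everything below is something I worked out on my own and verified by experiment. There may well be better approaches, and corrections are welcome. I have been working with big data for less than two months.

For work, I am doing a preliminary study of big data technologies, mainly Hadoop-related components and subprojects.

The platform under study mainly consists of Hadoop, Flume, Solr, and Spark. For now the focus is on the Flume and Solr parts.

That is: logs collected by Flume are tokenized and passed to Solr for indexing, while already-structured data sent by third-party platforms is indexed directly.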

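The platform architecture looks roughly like the diagram below (a simple sketch I drew in Visio, for reference only; it omits the Flume channels, and one source and one channel are missing between the data processing layer and the data distribution layer):

Data sources: Flume agents and third-party agents are deployed here. The Flume agents mainly collect logs. The third-party agents collect everything other than logs.

Collection layer: Flume and a third-party collection platform are deployed here. The Flume source receives the logs collected by the Flume agents. The third-party platform receives the other data collected by the third-party agents, structures it, and passes it to the Flume source.

Data processing layer: determines the data type (structured or unstructured) and tokenizes unstructured data with morphline. The result is sent out through the morphline-avro-sink.

Data distribution layer: receives the data from the morphline-avro-sink and uses different sinks to distribute it to different business consumers. One important path sends the data to Solr to build the index.

Business processing layer: only Solr indexing is covered here.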

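This article focuses on how the morphline-avro-sink is written, along with the supporting pieces it needs.

The main points to pay attention to are:

1. Use the flume-ng-morphline-solr-sink code in the Flume source tree as the reference.

2. Because this sink ultimately sends its data out in Avro format, MorphlineSink extends AbstractRpcSink, which is the same class that Flume NG's AvroSink extends.

3. AbstractRpcSink sends events through an RpcClient, which offers two methods for this: RpcClient.append(Event) and RpcClient.appendBatch(List<Event>). MorphlineSink therefore has to initialize the RpcClient properly.

4. MorphlineHandlerImpl must create a finalChild Command while it is being configured. This command is placed last in the morphline's command chain and receives the results of the commands before it.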
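
To make point 4 concrete, here is a minimal, library-free sketch of the idea: each command hands its output to a child, and the last child simply collects whatever reaches it. The `Command` interface and the `Uppercase` and `Collector` classes are hypothetical stand-ins written for illustration, not the Kite morphline classes, and records are plain strings to keep the sketch short.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

public class FinalChildSketch {

    /** Hypothetical stand-in for a morphline command: handles a record, returns success. */
    public interface Command {
        boolean process(String record);
    }

    /** A step that transforms the record and hands the result to its child. */
    public static final class Uppercase implements Command {
        private final Command child;

        public Uppercase(Command child) {
            this.child = child;
        }

        @Override
        public boolean process(String record) {
            return child.process(record.toUpperCase(Locale.ROOT));
        }
    }

    /** The final child: it ends the chain and keeps every record it receives. */
    public static final class Collector implements Command {
        private final List<String> results = new ArrayList<>();

        @Override
        public boolean process(String record) {
            results.add(record);
            return true;
        }

        /** Returns a copy of the collected records and empties the collector. */
        public List<String> drain() {
            List<String> copy = new ArrayList<>(results);
            results.clear();
            return copy;
        }
    }

    /** Runs one input through the chain and returns what the final child received. */
    public static List<String> run(String input) {
        Collector collector = new Collector();
        Command chain = new Uppercase(collector);
        chain.process(input);
        return collector.drain();
    }
}
```

Calling `FinalChildSketch.run("flume event")` sends one record through the chain and returns the list held by the collector. Draining after every input keeps records from one event from leaking into the next.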

The MorphlineSink code follows. The places where it differs from flume-ng-morphline-solr-sink were marked in red in the original post.

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements.  See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flume.sink.avro.morphline;

import java.util.Properties;
import java.util.Map.Entry;

import org.apache.flume.Channel;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientConfigurationConstants;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.ConfigurationException;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractRpcSink;
import org.kitesdk.morphline.api.Command;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Flume sink that runs each Flume event through a morphline {@link Command} chain
* and forwards the resulting records as events through an Avro {@link RpcClient}.
*/
public class MorphlineSink extends AbstractRpcSink implements Configurable {

private RpcClient client;
private Properties clientProps;

private int maxBatchSize = 1000;
private long maxBatchDurationMillis = 1000;
private String handlerClass;
private MorphlineHandler handler;
private Context context;
private SinkCounter sinkCounter;

public static final String BATCH_SIZE = "batchSize";
public static final String BATCH_DURATION_MILLIS = "batchDurationMillis";
public static final String HANDLER_CLASS = "handlerClass";

private static final Logger LOGGER = LoggerFactory.getLogger(MorphlineSink.class);

public MorphlineSink() {
this(null);
}

/** For testing only */
protected MorphlineSink(MorphlineHandler handler) {
this.handler = handler;
}

@Override
public void configure(Context context) {
this.context = context;
maxBatchSize = context.getInteger(BATCH_SIZE, maxBatchSize);
maxBatchDurationMillis = context.getLong(BATCH_DURATION_MILLIS, maxBatchDurationMillis);
handlerClass = context.getString(HANDLER_CLASS, MorphlineHandlerImpl.class.getName());
if (sinkCounter == null) {
LOGGER.info("sinkCount is null");
sinkCounter = new SinkCounter(getName());
}
/*LOGGER.info("sinkCount is " + sinkCounter.toString());*/

clientProps = new Properties();

clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS, "h1");
clientProps.setProperty(RpcClientConfigurationConstants.CONFIG_HOSTS_PREFIX +
"h1", context.getString("hostname") + ":" + context.getInteger("port"));

for (Entry<String, String> entry: context.getParameters().entrySet()) {
clientProps.setProperty(entry.getKey(), entry.getValue());
}

client = initializeRpcClient(clientProps); // differs from flume-ng-morphline-solr-sink
if (handler == null) {
MorphlineHandler tmpHandler;
try {
tmpHandler = (MorphlineHandler) Class.forName(handlerClass).newInstance();
} catch (Exception e) {
throw new ConfigurationException(e);
}
tmpHandler.configure(context);
handler = tmpHandler;
}
super.configure(context);
}

/**
* Returns the maximum number of events to take per flume transaction;
* override to customize
*/
private int getMaxBatchSize() {
return maxBatchSize;
}

/** Returns the maximum duration per flume transaction; override to customize */
private long getMaxBatchDurationMillis() {
return maxBatchDurationMillis;
}

/*@Override
public synchronized void start() {
LOGGER.info("Starting Morphline Sink {} ...", this);
sinkCounter.start();
if (handler == null) {
MorphlineHandler tmpHandler;
try {
tmpHandler = (MorphlineHandler) Class.forName(handlerClass).newInstance();
} catch (Exception e) {
throw new ConfigurationException(e);
}
tmpHandler.configure(context);
handler = tmpHandler;
}
super.start();
LOGGER.info("Morphline Sink {} started.", getName());
}

@Override
public synchronized void stop() {
LOGGER.info("Morphline Sink {} stopping...", getName());
try {
if (handler != null) {
handler.stop();
}
sinkCounter.stop();
LOGGER.info("Morphline Sink {} stopped. Metrics: {}", getName(), sinkCounter);
} finally {
super.stop();
}
}*/

@Override
public Status process() throws EventDeliveryException {
int batchSize = getMaxBatchSize();
long batchEndTime = System.currentTimeMillis() + getMaxBatchDurationMillis();
Channel myChannel = getChannel();
Transaction txn = myChannel.getTransaction();
txn.begin();
boolean isMorphlineTransactionCommitted = true;
try {
int numEventsTaken = 0;
handler.beginTransaction();
isMorphlineTransactionCommitted = false;
//      List<Event> events = Lists.newLinkedList();
// repeatedly take and process events from the Flume queue
for (int i = 0; i < batchSize; i++) {
Event event = myChannel.take();
if (event == null) {
break;
}
sinkCounter.incrementEventDrainAttemptCount();
numEventsTaken++;
//        LOGGER.info("Flume event: {}", event);
//StreamEvent streamEvent = createStreamEvent(event);
handler.process(event, client); // differs from flume-ng-morphline-solr-sink
//        events.add(event);
if (System.currentTimeMillis() >= batchEndTime) {
break;
}
}
//      handler.process(events, client);

// update metrics
if (numEventsTaken == 0) {
sinkCounter.incrementBatchEmptyCount();
}
if (numEventsTaken < batchSize) {
sinkCounter.incrementBatchUnderflowCount();
} else {
sinkCounter.incrementBatchCompleteCount();
}
handler.commitTransaction();
isMorphlineTransactionCommitted = true;
txn.commit();
sinkCounter.addToEventDrainSuccessCount(numEventsTaken);
return numEventsTaken == 0 ? Status.BACKOFF : Status.READY;
} catch (Throwable t) {
// Ooops - need to rollback and back off
LOGGER.error("Morphline Sink " + getName() + ": Unable to process event from channel " + myChannel.getName()
+ ". Exception follows.", t);
try {
if (!isMorphlineTransactionCommitted) {
handler.rollbackTransaction();
}
} catch (Throwable t2) {
LOGGER.error("Morphline Sink " + getName() + ": Unable to rollback morphline transaction. " +
"Exception follows.", t2);
} finally {
try {
txn.rollback();
} catch (Throwable t4) {
LOGGER.error("Morphline Sink " + getName() + ": Unable to rollback Flume transaction. " +
"Exception follows.", t4);
}
}

if (t instanceof Error) {
throw (Error) t; // rethrow original exception
} else if (t instanceof ChannelException) {
return Status.BACKOFF;
} else {
throw new EventDeliveryException("Failed to send events", t); // rethrow and backoff
}
} finally {
txn.close();
}
}

@Override
public String toString() {
int i = getClass().getName().lastIndexOf('.') + 1;
String shortClassName = getClass().getName().substring(i);
return getName() + " (" + shortClassName + ")";
}

@Override
protected RpcClient initializeRpcClient(Properties props) {
LOGGER.info("Attempting to create Avro Rpc client.");
return RpcClientFactory.getInstance(props);
}

}
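
The client setup in `configure` boils down to building a `java.util.Properties` object. Below is a minimal sketch of that step. It assumes that `RpcClientConfigurationConstants.CONFIG_HOSTS` and `CONFIG_HOSTS_PREFIX` resolve to the strings `hosts` and `hosts.` (my reading of the Flume SDK, so check your version). The class name, method name, and the host `collector01` are made up for illustration, and the sketch uses only the JDK so it runs without Flume on the classpath.

```java
import java.util.Map;
import java.util.Properties;

public class RpcClientPropsSketch {

    // Assumed literal values of the Flume SDK constants; verify against your Flume version.
    static final String CONFIG_HOSTS = "hosts";
    static final String CONFIG_HOSTS_PREFIX = "hosts.";

    /** Builds client properties for a single target host registered under the alias "h1". */
    public static Properties build(String hostname, int port, Map<String, String> sinkParams) {
        Properties props = new Properties();
        props.setProperty(CONFIG_HOSTS, "h1");
        props.setProperty(CONFIG_HOSTS_PREFIX + "h1", hostname + ":" + port);
        // Copy every sink parameter as well, mirroring the loop in configure().
        for (Map.Entry<String, String> entry : sinkParams.entrySet()) {
            props.setProperty(entry.getKey(), entry.getValue());
        }
        return props;
    }
}
```

With `build("collector01", 4545, params)`, the result contains `hosts=h1` and `hosts.h1=collector01:4545` plus whatever was in `params`. Because the sink parameters are copied last, a parameter that happens to be named `hosts` would overwrite the alias, which is worth keeping in mind when choosing configuration keys.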


The MorphlineHandlerImpl code follows.

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements.  See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flume.sink.avro.morphline;

import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.api.RpcClient;
import org.apache.flume.event.EventBuilder;
import org.kitesdk.morphline.api.Command;
import org.kitesdk.morphline.api.MorphlineCompilationException;
import org.kitesdk.morphline.api.MorphlineContext;
import org.kitesdk.morphline.api.Record;
import org.kitesdk.morphline.base.Compiler;
import org.kitesdk.morphline.base.FaultTolerance;
import org.kitesdk.morphline.base.Fields;
import org.kitesdk.morphline.base.Metrics;
import org.kitesdk.morphline.base.Notifications;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.SharedMetricRegistries;
import com.codahale.metrics.Timer;
import com.google.common.base.Preconditions;
import com.google.common.collect.ListMultimap;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

/**
* A {@link MorphlineHandler} that processes its events using a morphline
* {@link Command} chain.
*/
public class MorphlineHandlerImpl implements MorphlineHandler {

private MorphlineContext morphlineContext;
private Command morphline;
private Command finalChild;
private String morphlineFileAndId;

private Timer mappingTimer;
private Meter numRecords;
private Meter numFailedRecords;
private Meter numExceptionRecords;

public static final String MORPHLINE_FILE_PARAM = "morphlineFile";
public static final String MORPHLINE_ID_PARAM = "morphlineId";

/**
* Morphline variables can be passed from flume.conf to the morphline, e.g.:
* agent.sinks.solrSink.morphlineVariable.zkHost=127.0.0.1:2181/solr
*/
public static final String MORPHLINE_VARIABLE_PARAM = "morphlineVariable";

private static final Logger LOG = LoggerFactory
.getLogger(MorphlineHandlerImpl.class);

// For test injection
void setMorphlineContext(MorphlineContext morphlineContext) {
this.morphlineContext = morphlineContext;
}

// for interceptor
void setFinalChild(Command finalChild) {
this.finalChild = finalChild;
}

@Override
public void configure(Context context) {
String morphlineFile = context.getString(MORPHLINE_FILE_PARAM);
String morphlineId = context.getString(MORPHLINE_ID_PARAM);
if (morphlineFile == null || morphlineFile.trim().length() == 0) {
throw new MorphlineCompilationException("Missing parameter: "
+ MORPHLINE_FILE_PARAM, null);
}
morphlineFileAndId = morphlineFile + "@" + morphlineId;

if (morphlineContext == null) {
FaultTolerance faultTolerance = new FaultTolerance(
context.getBoolean(FaultTolerance.IS_PRODUCTION_MODE, false),
context.getBoolean(
FaultTolerance.IS_IGNORING_RECOVERABLE_EXCEPTIONS,
false),
context.getString(FaultTolerance.RECOVERABLE_EXCEPTION_CLASSES));

morphlineContext = new MorphlineContext.Builder()
.setExceptionHandler(faultTolerance)
.setMetricRegistry(
SharedMetricRegistries
.getOrCreate(morphlineFileAndId)).build();
}

Config override = ConfigFactory.parseMap(context
.getSubProperties(MORPHLINE_VARIABLE_PARAM + "."));
finalChild = new CollectorB(); // differs from flume-ng-morphline-solr-sink
morphline = new Compiler().compile(new File(morphlineFile),
morphlineId, morphlineContext, finalChild, override);

this.mappingTimer = morphlineContext.getMetricRegistry().timer(
MetricRegistry.name("morphline.app", Metrics.ELAPSED_TIME));
this.numRecords = morphlineContext.getMetricRegistry().meter(
MetricRegistry.name("morphline.app", Metrics.NUM_RECORDS));
this.numFailedRecords = morphlineContext.getMetricRegistry().meter(
MetricRegistry.name("morphline.app", "numFailedRecords"));
this.numExceptionRecords = morphlineContext.getMetricRegistry().meter(
MetricRegistry.name("morphline.app", "numExceptionRecords"));
}

@Override
public void process(Event event, RpcClient client) {
//		 LOG.info("entry into MorphlineHandlerImpl  process"   + event);
numRecords.mark();
Timer.Context timerContext = mappingTimer.time();
try {
Record record = new Record();
for (Entry<String, String> entry : event.getHeaders().entrySet()) {
record.put(entry.getKey(), entry.getValue());
}
byte[] bytes = event.getBody();
if (bytes != null && bytes.length > 0) {
record.put(Fields.ATTACHMENT_BODY, bytes);
}

try {
Notifications.notifyStartSession(morphline);
if (!morphline.process(record)) {
numFailedRecords.mark();
LOG.warn("Morphline {} failed to process record: {}",
morphlineFileAndId, record);
}
// BEGIN: differs from flume-ng-morphline-solr-sink
// Copy the records gathered by finalChild, then clear the collector for the next event.
List<Record> collected = ((CollectorB) finalChild).getRecords();
List<Record> records = new ArrayList<Record>(collected);
collected.clear();
for (Record r : records) {
// Flatten the record into Flume headers, keeping only the first value of each field.
Map<String, String> headers = new HashMap<String, String>();
for (Entry<String, Collection<Object>> entry : r.getFields().asMap().entrySet()) {
Iterator<Object> values = entry.getValue().iterator();
Object first = values.hasNext() ? values.next() : null;
if (first != null) {
headers.put(entry.getKey(), first.toString());
}
}
// Send the original body together with the extracted fields as a new event.
Event e = EventBuilder.withBody(event.getBody(), headers);
client.append(e);
}
// END: differs from flume-ng-morphline-solr-sink
} catch (RuntimeException t) {
numExceptionRecords.mark();
morphlineContext.getExceptionHandler().handleException(t,
record);
} catch (EventDeliveryException e1) {
numExceptionRecords.mark();
morphlineContext.getExceptionHandler().handleException(e1,
record);
}
} finally {
timerContext.stop();
}
}

@Override
public void beginTransaction() {
Notifications.notifyBeginTransaction(morphline);
}

@Override
public void commitTransaction() {
Notifications.notifyCommitTransaction(morphline);
}

@Override
public void rollbackTransaction() {
Notifications.notifyRollbackTransaction(morphline);
}

@Override
public void stop() {
Notifications.notifyShutdown(morphline);
}

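// Differs from flume-ng-morphline-solr-sink: final command that collects the morphline output.
public static final class CollectorB implements Command {

private final List<Record> results = new ArrayList<Record>();

/** Returns the live list of collected records; the caller copies it and then clears it. */
public List<Record> getRecords() {
return results;
}

public void reset() {
results.clear();
}

@Override
public Command getParent() {
return null;
}

@Override
public void notify(Record notification) {
// no-op: this is the last command, so there is no child to pass notifications to
}

@Override
public boolean process(Record record) {
Preconditions.checkNotNull(record);
results.add(record);

return true;
}

}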

}
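
The conversion from a morphline record to Flume headers in `process` keeps only the first value of each field. Here is a small JDK-only sketch of that step, with a `Map<String, List<Object>>` standing in for the record's multimap. `toHeaders` is a hypothetical helper name, not part of Flume or Kite.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RecordToHeadersSketch {

    /**
     * Flattens multi-valued fields into single-valued string headers.
     * Only the first value of each field is kept; a field that is empty
     * or whose first value is null is skipped.
     */
    public static Map<String, String> toHeaders(Map<String, List<Object>> fields) {
        Map<String, String> headers = new HashMap<>();
        for (Map.Entry<String, List<Object>> entry : fields.entrySet()) {
            List<Object> values = entry.getValue();
            if (values == null || values.isEmpty() || values.get(0) == null) {
                continue; // nothing usable for this field
            }
            headers.put(entry.getKey(), values.get(0).toString());
        }
        return headers;
    }
}
```

One consequence worth knowing: a field that holds several values, say `tags=[a, b]`, arrives downstream as `tags=a`, and non-string values are reduced to their `toString()` form.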
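Getting the sink and handler above to work took me several days. My reasons for posting them:

1. To record what I learned for future reference.

2. To offer some help to others working on the same thing.

3. To ask for corrections: is there a better approach, and does this implementation have hidden problems?

4. To exchange ideas with others and improve my own skills.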