
Integrating Kafka with Flume (Part 1)

2016-03-24 15:18
Method 1: extend the AbstractSink class, implement the Configurable interface, and override the process(), start(), stop(), and configure(Context context) methods. A sample agent configuration for wiring this sink into Flume follows the code.

// Imports below assume Flume 1.6 with the old Kafka 0.8 producer API
// (kafka.javaapi.producer.Producer); adjust the packages if your build differs.
import com.google.common.base.Throwables;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.kafka.KafkaSinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.apache.flume.sink.kafka.KafkaSinkConstants;
import org.apache.flume.sink.kafka.KafkaSinkUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * Created by rong 2015/12/20.
 */
public class KafkaSink extends AbstractSink implements Configurable {

    private static final Logger logger = LoggerFactory.getLogger(KafkaSink.class);
    public static final String KEY_HDR = "key";
    public static final String TOPIC_HDR = "topic";
    private Properties kafkaProps;
    private Producer<String, byte[]> producer;
    private String topic;
    private int batchSize;
    private List<KeyedMessage<String, byte[]>> messageList;
    private KafkaSinkCounter counter;
    /**
     * Maximum size of a message sent to Kafka; defaults to 10 MB.
     */
    private int messageMaxBytes;

    @Override
    public Status process() throws EventDeliveryException {
        Status result = Status.READY;
        Channel channel = getChannel();
        Transaction transaction = null;
        Event event = null;
        String eventTopic = null;
        String eventKey = null;

        try {
            long processedEvents = 0;

            transaction = channel.getTransaction();
            transaction.begin();

            messageList.clear();
            processedEvents:
            for (; processedEvents < batchSize; processedEvents += 1) {
                event = channel.take();

                if (event == null) {
                    // no events available in channel
                    if (processedEvents == 0) {
                        result = Status.BACKOFF;
                        counter.incrementBatchEmptyCount();
                    } else {
                        counter.incrementBatchUnderflowCount();
                    }
                    break;
                }

                byte[] eventBody = event.getBody();
                // Drop events whose body exceeds the configured maximum message size,
                // logging at most the first 100,000 characters of the body.
                if (eventBody.length >= messageMaxBytes) {
                    String bodyStr = new String(eventBody, "UTF-8");
                    logger.error("message size too big: messageMaxBytes:" + messageMaxBytes
                            + ", eventBody:" + eventBody.length + ", {Event} " + eventTopic + " : " + eventKey + " : "
                            + bodyStr.substring(0, Math.min(bodyStr.length(), 100000)));
                    continue processedEvents;
                }
                Map<String, String> headers = event.getHeaders();

                if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
                    eventTopic = topic;
                }

                eventKey = headers.get(KEY_HDR);

                if (logger.isDebugEnabled()) {
                    logger.debug("{Event} " + eventTopic + " : " + eventKey + " : "
                            + new String(eventBody, "UTF-8"));
                    logger.debug("event #{}", processedEvents);
                }

                // create a message and add to buffer
                KeyedMessage<String, byte[]> data = new KeyedMessage<String, byte[]>
                        (eventTopic, eventKey, eventBody);
                messageList.add(data);
            }

            // publish batch and commit.
            if (processedEvents > 0) {
                long startTime = System.nanoTime();
                producer.send(messageList);
                long endTime = System.nanoTime();
                counter.addToKafkaEventSendTimer((endTime - startTime) / (1000 * 1000));
                counter.addToEventDrainSuccessCount(Long.valueOf(messageList.size()));
            }

            transaction.commit();

        } catch (Exception ex) {
            String errorMsg = "Failed to publish events";
            logger.error(errorMsg, ex);
            result = Status.BACKOFF;
            if (transaction != null) {
                try {
                    transaction.rollback();
                    counter.incrementRollbackCount();
                } catch (Exception e) {
                    logger.error("Transaction rollback failed", e);
                    throw Throwables.propagate(e);
                }
            }
            throw new EventDeliveryException(errorMsg, ex);
        } finally {
            if (transaction != null) {
                transaction.close();
            }
        }

        return result;
    }

    @Override
    public synchronized void start() {
        // instantiate the producer
        ProducerConfig config = new ProducerConfig(kafkaProps);
        producer = new Producer<String, byte[]>(config);
        counter.start();
        super.start();
    }

    @Override
    public synchronized void stop() {
        producer.close();
        counter.stop();
        logger.info("Kafka Sink {} stopped. Metrics: {}", getName(), counter);
        super.stop();
    }

    /**
     * We configure the sink and generate properties for the Kafka producer.
     * <p/>
     * The Kafka producer properties are generated as follows:
     * 1. We generate a properties object with some static defaults that
     *    can be overridden by the sink configuration.
     * 2. We add the configuration users supplied for Kafka (parameters starting
     *    with "kafka.", which must be valid Kafka producer properties).
     * 3. We add the sink's documented parameters, which can override other
     *    properties.
     *
     * @param context the Flume context holding the sink configuration
     */
    @Override
    public void configure(Context context) {

        batchSize = context.getInteger(KafkaSinkConstants.BATCH_SIZE,
                KafkaSinkConstants.DEFAULT_BATCH_SIZE);
        messageList =
                new ArrayList<KeyedMessage<String, byte[]>>(batchSize);
        logger.debug("Using batch size: {}", batchSize);

        topic = context.getString(KafkaSinkConstants.TOPIC,
                KafkaSinkConstants.DEFAULT_TOPIC);
        if (topic.equals(KafkaSinkConstants.DEFAULT_TOPIC)) {
            logger.warn("The property 'topic' is not set. " +
                    "Using the default topic name: " +
                    KafkaSinkConstants.DEFAULT_TOPIC);
        } else {
            logger.info("Using the static topic: " + topic +
                    "; this may be overridden by event headers");
        }

        kafkaProps = KafkaSinkUtil.getKafkaProperties(context);

        if (logger.isDebugEnabled()) {
            logger.debug("Kafka producer properties: " + kafkaProps);
        }

        if (counter == null) {
            counter = new KafkaSinkCounter(getName());
        }
        // Maximum size of a message sent to Kafka; defaults to 10 MB.
        messageMaxBytes = context.getInteger("messageMaxBytes",
                10485760);
    }


}
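
For reference, below is a minimal sketch of a Flume agent configuration that wires this sink to a netcat source through a memory channel, in Flume's plain properties format. The agent and component names, broker address, topic, and the fully qualified class name com.example.flume.KafkaSink are placeholders rather than anything from the original code; the topic, batchSize, messageMaxBytes, and "kafka."-prefixed keys follow the configure() method and Javadoc above. Depending on the KafkaSinkUtil version in use, a top-level brokerList parameter may be expected instead of (or in addition to) kafka.metadata.broker.list.

# Sketch of an agent definition (names and addresses are placeholders)
agent.sources = r1
agent.channels = c1
agent.sinks = k1

agent.sources.r1.type = netcat
agent.sources.r1.bind = 0.0.0.0
agent.sources.r1.port = 44444
agent.sources.r1.channels = c1

agent.channels.c1.type = memory
agent.channels.c1.capacity = 10000

# fully qualified class name of the custom sink (adjust to your package)
agent.sinks.k1.type = com.example.flume.KafkaSink
agent.sinks.k1.channel = c1
agent.sinks.k1.topic = my-flume-topic
agent.sinks.k1.batchSize = 100
agent.sinks.k1.messageMaxBytes = 10485760
# keys prefixed with "kafka." are passed through to the Kafka producer (old 0.8 producer property names)
agent.sinks.k1.kafka.metadata.broker.list = localhost:9092
agent.sinks.k1.kafka.serializer.class = kafka.serializer.DefaultEncoder
agent.sinks.k1.kafka.key.serializer.class = kafka.serializer.StringEncoder
agent.sinks.k1.kafka.request.required.acks = 1

Build the sink into a jar and drop it, together with the Kafka client libraries, into Flume's lib or plugins.d directory before starting the agent with this file. At runtime, events carrying a "topic" or "key" header (TOPIC_HDR / KEY_HDR above) override the static topic and partition key on a per-event basis.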
Tags: kafka, flume, kafka+flume