Implementing Kafka integration with Flume (Part 1)
2016-03-24 15:18
Method 1: extend the AbstractSink class, implement the Configurable interface, and override the process(), start(), stop(), and configure(Context context) methods.
import com.google.common.base.Throwables;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.kafka.KafkaSinkCounter;
import org.apache.flume.sink.AbstractSink;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

// KafkaSinkConstants and KafkaSinkUtil are assumed to live in the same package;
// a minimal sketch of both is shown after the class.

/**
 * Created by rong 2015/12/20.
 */
public class KafkaSink extends AbstractSink implements Configurable {

    private static final Logger logger = LoggerFactory.getLogger(KafkaSink.class);

    public static final String KEY_HDR = "key";
    public static final String TOPIC_HDR = "topic";

    private Properties kafkaProps;
    private Producer<String, byte[]> producer;
    private String topic;
    private int batchSize;
    private List<KeyedMessage<String, byte[]>> messageList;
    private KafkaSinkCounter counter;

    /**
     * Maximum size of a single message sent to Kafka; defaults to 10 MB.
     */
    private int messageMaxBytes;

    @Override
    public Status process() throws EventDeliveryException {
        Status result = Status.READY;
        Channel channel = getChannel();
        Transaction transaction = null;
        Event event = null;
        String eventTopic = null;
        String eventKey = null;

        try {
            long processedEvents = 0;

            transaction = channel.getTransaction();
            transaction.begin();

            messageList.clear();
            processedEvents:
            for (; processedEvents < batchSize; processedEvents += 1) {
                event = channel.take();
                if (event == null) {
                    // no events available in channel
                    if (processedEvents == 0) {
                        result = Status.BACKOFF;
                        counter.incrementBatchEmptyCount();
                    } else {
                        counter.incrementBatchUnderflowCount();
                    }
                    break;
                }

                byte[] eventBody = event.getBody();

                // skip events larger than the configured maximum message size
                if (eventBody.length >= messageMaxBytes) {
                    logger.error("message size too big: messageMaxBytes:" + messageMaxBytes
                            + ", eventBody:" + eventBody.length
                            + ", {Event} " + eventTopic + " : " + eventKey + " : "
                            + new String(eventBody, "UTF-8").substring(0, 100000));
                    continue processedEvents;
                }

                Map<String, String> headers = event.getHeaders();
                if ((eventTopic = headers.get(TOPIC_HDR)) == null) {
                    eventTopic = topic;
                }
                eventKey = headers.get(KEY_HDR);

                if (logger.isDebugEnabled()) {
                    logger.debug("{Event} " + eventTopic + " : " + eventKey + " : "
                            + new String(eventBody, "UTF-8"));
                    logger.debug("event #{}", processedEvents);
                }

                // create a message and add to buffer
                KeyedMessage<String, byte[]> data =
                        new KeyedMessage<String, byte[]>(eventTopic, eventKey, eventBody);
                messageList.add(data);
            }

            // publish batch and commit.
            if (processedEvents > 0) {
                long startTime = System.nanoTime();
                producer.send(messageList);
                long endTime = System.nanoTime();
                counter.addToKafkaEventSendTimer((endTime - startTime) / (1000 * 1000));
                counter.addToEventDrainSuccessCount(Long.valueOf(messageList.size()));
            }

            transaction.commit();
        } catch (Exception ex) {
            String errorMsg = "Failed to publish events";
            logger.error(errorMsg, ex);
            result = Status.BACKOFF;
            if (transaction != null) {
                try {
                    transaction.rollback();
                    counter.incrementRollbackCount();
                } catch (Exception e) {
                    logger.error("Transaction rollback failed", e);
                    throw Throwables.propagate(e);
                }
            }
            throw new EventDeliveryException(errorMsg, ex);
        } finally {
            if (transaction != null) {
                transaction.close();
            }
        }

        return result;
    }

    @Override
    public synchronized void start() {
        // instantiate the producer
        ProducerConfig config = new ProducerConfig(kafkaProps);
        producer = new Producer<String, byte[]>(config);
        counter.start();
        super.start();
    }

    @Override
    public synchronized void stop() {
        producer.close();
        counter.stop();
        logger.info("Kafka Sink {} stopped. Metrics: {}", getName(), counter);
        super.stop();
    }

    /**
     * We configure the sink and generate properties for the Kafka Producer.
     * <p/>
     * Kafka producer properties are generated as follows:
     * 1. We generate a properties object with some static defaults that
     *    can be overridden by sink configuration.
     * 2. We add the configuration users added for Kafka (parameters starting
     *    with "kafka." that must be valid Kafka Producer properties).
     * 3. We add the sink's documented parameters, which can override other
     *    properties.
     *
     * @param context
     */
    @Override
    public void configure(Context context) {
        batchSize = context.getInteger(KafkaSinkConstants.BATCH_SIZE,
                KafkaSinkConstants.DEFAULT_BATCH_SIZE);
        messageList = new ArrayList<KeyedMessage<String, byte[]>>(batchSize);
        logger.debug("Using batch size: {}", batchSize);

        topic = context.getString(KafkaSinkConstants.TOPIC,
                KafkaSinkConstants.DEFAULT_TOPIC);
        if (topic.equals(KafkaSinkConstants.DEFAULT_TOPIC)) {
            logger.warn("The Property 'topic' is not set. "
                    + "Using the default topic name: " + KafkaSinkConstants.DEFAULT_TOPIC);
        } else {
            logger.info("Using the static topic: " + topic
                    + " this may be over-ridden by event headers");
        }

        kafkaProps = KafkaSinkUtil.getKafkaProperties(context);
        if (logger.isDebugEnabled()) {
            logger.debug("Kafka producer properties: " + kafkaProps);
        }

        if (counter == null) {
            counter = new KafkaSinkCounter(getName());
        }

        // maximum size of a single message sent to Kafka; defaults to 10 MB
        messageMaxBytes = context.getInteger("messageMaxBytes", 10485760);
    }
}
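The class references KafkaSinkConstants and KafkaSinkUtil, which are not shown in the post. Flume 1.6 ships equivalents in its flume-ng-kafka-sink module; if you keep everything in your own package, a minimal sketch could look like the one below. The constant names, the "kafka." prefix, and the default values are assumptions for illustration, not the author's original code.

import org.apache.flume.Context;

import java.util.Map;
import java.util.Properties;

/** Configuration keys and defaults used by the sink (assumed values). */
class KafkaSinkConstants {
    // all properties under this prefix are handed straight to the Kafka producer
    public static final String PROPERTY_PREFIX = "kafka.";

    public static final String TOPIC = "topic";
    public static final String BATCH_SIZE = "batchSize";

    public static final String DEFAULT_TOPIC = "default-flume-topic";
    public static final int DEFAULT_BATCH_SIZE = 100;
}

/** Builds the Kafka producer Properties from the sink's Flume context. */
class KafkaSinkUtil {
    public static Properties getKafkaProperties(Context context) {
        Properties props = new Properties();
        // defaults that match a Producer<String, byte[]> on the old 0.8 producer API
        props.put("serializer.class", "kafka.serializer.DefaultEncoder");
        props.put("key.serializer.class", "kafka.serializer.StringEncoder");
        // user-supplied "kafka.*" settings (e.g. kafka.metadata.broker.list)
        // extend or override the defaults
        for (Map.Entry<String, String> entry :
                context.getSubProperties(KafkaSinkConstants.PROPERTY_PREFIX).entrySet()) {
            props.put(entry.getKey(), entry.getValue());
        }
        return props;
    }
}

KafkaSinkCounter is a SinkCounter subclass that adds the Kafka send timer and rollback counters; in Flume 1.6 it is provided as org.apache.flume.instrumentation.kafka.KafkaSinkCounter.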
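To wire the custom sink into an agent, set the sink type to its fully qualified class name. The following is a sketch of a possible agent configuration; the package name com.example.flume, the agent/source/channel names, and the broker addresses are placeholders, and the topic/batchSize/kafka.* property names follow the helper sketch above.

# flume-kafka.conf (illustrative)
agent.sources = src1
agent.channels = ch1
agent.sinks = kafkaSink

agent.sources.src1.type = exec
agent.sources.src1.command = tail -F /var/log/app/app.log
agent.sources.src1.channels = ch1

agent.channels.ch1.type = memory
agent.channels.ch1.capacity = 10000

# fully qualified class name of the custom sink built above
agent.sinks.kafkaSink.type = com.example.flume.KafkaSink
agent.sinks.kafkaSink.channel = ch1
agent.sinks.kafkaSink.topic = flume-topic
agent.sinks.kafkaSink.batchSize = 100
agent.sinks.kafkaSink.messageMaxBytes = 10485760
# everything under "kafka." is passed straight to the producer
agent.sinks.kafkaSink.kafka.metadata.broker.list = broker1:9092,broker2:9092
agent.sinks.kafkaSink.kafka.request.required.acks = 1

Note that events larger than messageMaxBytes are logged and dropped rather than retried, so size this limit to stay within the brokers' message.max.bytes setting.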