您的位置:首页 > 其它

Flume、Kafka、Storm结合

2015-11-27 16:32 211 查看
Todo:

对Flume的sink进行重构,调用kafka的消费生产者(producer)发送消息;

在Sotrm的spout中继承IRichSpout接口,调用kafka的消息消费者(Consumer)来接收消息,然后经过几个自定义的Bolt,将自定义的内容进行输出

Flume -- Kafka

编写KafkaSink

从$KAFKA_HOME/lib下复制

kafka_2.10-0.8.2.1.jar

kafka-clients-0.8.2.1.jar

scala-library-2.10.4.jar

到$FLUME_HOME/lib

在Eclipse新建工程,从$FLUME_HOME/lib下导入

commons-logging-1.1.1.jar

flume-ng-configuration-1.6.0.jar

flume-ng-core-1.6.0.jar

flume-ng-sdk-1.6.0.jar

zkclient-0.3.jar

kafka_2.10-0.8.2.1.jar

kafka-clients-0.8.2.1.jar

scala-library-2.10.4.jar

到工程。

新建文件KafkaSink.java

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

public class KafkaSink extends AbstractSink implements Configurable {
private static final Log logger = LogFactory.getLog(KafkaSink.class);

private String topic;
private Producer<String, String> producer;

public void configure(Context context) {
topic = "flume_test";
Properties props = new Properties();
props.setProperty("metadata.broker.list", "localhost:9092");
props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
props.put("zookeeper.connect", "localhost:2181");
props.setProperty("num.partitions", "4"); //
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
producer = new Producer<String, String>(config);
logger.info("KafkaSink初始化完成.");

}

public Status process() throws EventDeliveryException {
Channel channel = getChannel();
Transaction tx = channel.getTransaction();
try {
tx.begin();
Event e = channel.take();
if (e == null) {
tx.rollback();
return Status.BACKOFF;
}
KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, new String(e.getBody()));
producer.send(data);
logger.info("flume向kafka发送消息:" + new String(e.getBody()));
tx.commit();
return Status.READY;
} catch (Exception e) {
logger.error("Flume KafkaSinkException:", e);
tx.rollback();
return Status.BACKOFF;
} finally {
tx.close();
}
}
}


导出jar包,放到$FLUME_HOME/lib下

(File->Export->Jar File 全部默认参数)

创建kafka.conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = KafkaSink

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1


测试

启动kafka

cd ~/app/kafka
./bin/zookeeper-server-start.sh ./config/zookeeper.properties> /dev/null &
./bin/kafka-server-start.sh ./config/server.properties > /dev/null &


创建topic

~/app/kafka_2.10-0.8.2.1/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4  --topic flume_test


启动控制台消费者

~/app/kafka_2.10-0.8.2.1/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic flume_test  --from-beginning


启动flume agent

flume-ng agent -c conf  -f ~/test/kafka.conf --name a1 -Dflume.root.logger=INFO,console


发送消息

echo "hey manhua" |nc localhost 5140
echo "nice shot" |nc localhost 5140


flume和kafka结合的一个工具
https://github.com/kevinjmh/flumeng-kafka-plugin/tree/master/flumeng-kafka-plugin/src/main/java/org/apache/flume/plugins

Kafka -- Storm

http://storm.apache.org/index.html

下载-解压-修改/etc/profile

在Eclipse新建maven工程,其中pom.xml文件填入如下:

<?xml version="1.0" encoding="utf-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>
<groupId>manhua</groupId>
<artifactId>kafka-storm-test</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>kafka-storm</name>
<url>http://maven.apache.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<repositories>
<repository>
<id>github-releases</id>
<url>http://oss.sonatype.org/content/repositories/github-releases/</url>
</repository>
<repository>
<id>clojars.org</id>
<url>http://clojars.org/repo</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.1</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.14</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.10.0</version>
<!-- keep storm out of the jar-with-dependencies -->
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.2.1</version>
</dependency>
</dependencies>
</project>


在src/main/java创建两个java文件

KafkaSpouttest.java

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class KafkaSpouttest implements IRichSpout {

private SpoutOutputCollector collector;
private ConsumerConnector consumer;
private String topic;

public KafkaSpouttest() {
}

public KafkaSpouttest(String topic) {
this.topic = topic;
}

public void nextTuple() {
}

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}

public void ack(Object msgId) {
}

public void activate() {

consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());

Map<String, Integer> topickMap = new HashMap<String, Integer>();
topickMap.put(topic, 1);

System.out.println("*********Results********topic:" + topic);

Map<String, List<KafkaStream<byte[], byte[]>>> streamMap = consumer.createMessageStreams(topickMap);
KafkaStream<byte[], byte[]> stream = streamMap.get(topic).get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
String value = new String(it.next().message());
SimpleDateFormat formatter = new SimpleDateFormat("yyyy年MM月dd日 HH:mm:ss SSS");
Date curDate = new Date(System.currentTimeMillis());// 获取当前时间
String str = formatter.format(curDate);

System.out.println("storm接收到来自kafka的消息------->" + value);

collector.emit(new Values(value, 1, str), value);
}
}

private static ConsumerConfig createConsumerConfig() {
Properties props = new Properties();
// 设置zookeeper的链接地址
props.put("zookeeper.connect", "localhost:2181");
// 设置group id
props.put("group.id", "1");
// kafka的group 消费记录是保存在zookeeper上的, 但这个信息在zookeeper上不是实时更新的, 需要有个间隔时间更新
props.put("auto.commit.interval.ms", "1000");
props.put("zookeeper.session.timeout.ms", "10000");
return new ConsumerConfig(props);
}

public void close() {
}

public void deactivate() {
}

public void fail(Object msgId) {
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "id", "time"));
}

public Map<String, Object> getComponentConfiguration() {
System.out.println("getComponentConfiguration被调用");
topic = "flume_test";
return null;
}
}


KafkaTopologytest.java

import java.util.HashMap;
import java.util.Map;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class KafkaTopologytest {

public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("spout", new KafkaSpouttest(""), 1);
builder.setBolt("bolt1", new Bolt1(), 2).shuffleGrouping("spout");
builder.setBolt("bolt2", new Bolt2(), 2).fieldsGrouping("bolt1",new Fields("word"));

Map conf = new HashMap();
conf.put(Config.TOPOLOGY_WORKERS, 1);
conf.put(Config.TOPOLOGY_DEBUG, true);

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("my-flume-kafka-storm-topology-integration", conf, builder.createTopology());

Utils.sleep(1000*60*5); // local cluster test ...
cluster.shutdown();
}

public static class Bolt1 extends BaseBasicBolt {

public void execute(Tuple input, BasicOutputCollector collector) {
try {
String msg = input.getString(0);
int id = input.getInteger(1);
String time = input.getString(2);
msg = msg+"bolt1";
System.out.println("对消息加工第1次-------[arg0]:"+ msg +"---[arg1]:"+id+"---[arg2]:"+time+"------->"+msg);
if (msg != null) {
collector.emit(new Values(msg));
}
} catch (Exception e) {
e.printStackTrace();
}
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
}

public static class Bolt2 extends BaseBasicBolt {
Map<String, Integer> counts = new HashMap<String, Integer>();

public void execute(Tuple tuple, BasicOutputCollector collector) {
String msg = tuple.getString(0);
msg = msg + "bolt2";
System.out.println("对消息加工第2次---------->"+msg);
collector.emit(new Values(msg,1));
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
}


测试

接着上面Flume-Kafka的测试,保证kafka已经启动,以及创建了对应的topic

# 启动storm之前必须启动zookeeper

# 启动storm
storm nimbus &
storm supervisor &
storm ui &
# 打开浏览器地址http://localhost:8080 看到界面表示启动成功


测试1

启动控制台的生产者和消费者

~/app/kafka_2.10-0.8.2.1/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic flume_test

~/app/kafka_2.10-0.8.2.1/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic flume_test  --from-beginning


右键工程中KafkaTopologytest.java运行storm程序

现在在运行生产者的控制台输入值,在消费者和Eclipse都会有显示

测试2

从$KAFKA_HOME/lib下复制

kafka_2.10-0.8.2.1.jar

kafka-clients-0.8.2.1.jar

scala-library-2.10.4.jar

metrics-core-2.2.0.jar

zkclient-0.3.jar

zookeeper-3.4.6.jar

到$STORM_HOME/lib

类似上面的方法导出jar包(File->Export->Jar File 全部默认参数),放到任意目录下

使用storm执行jar包

storm jar  kafkaSpout.jar KafkaTopologytest


启动流程:zookeeper - kafka - storm - flume

Ref:http://www.aboutyun.com/thread-8915-1-1.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: