您的位置:首页 > 编程语言 > Java开发

Kafka + Storm Java 代码

2015-09-28 13:52 · 405 次查看
一、Flume 与 Kafka 的整合请参照:http://blog.csdn.net/huguoping830623/article/details/48138319

二、示例

Java 代码(MyKafkaTopology.java):

package com.hgp.kafka.kafka;  

  

import java.util.Arrays;  

import java.util.HashMap;  

import java.util.Iterator;  

import java.util.Map;  

import java.util.Map.Entry;  

import java.util.concurrent.atomic.AtomicInteger;  

  

import org.apache.commons.logging.Log;  

import org.apache.commons.logging.LogFactory;  

  

import storm.kafka.BrokerHosts;  

import storm.kafka.KafkaSpout;  

import storm.kafka.SpoutConfig;  

import storm.kafka.StringScheme;  

import storm.kafka.ZkHosts;  

import backtype.storm.Config;  

import backtype.storm.LocalCluster;  

import backtype.storm.StormSubmitter;  

import backtype.storm.generated.AlreadyAliveException;  

import backtype.storm.generated.InvalidTopologyException;  

import backtype.storm.spout.SchemeAsMultiScheme;  

import backtype.storm.task.OutputCollector;  

import backtype.storm.task.TopologyContext;  

import backtype.storm.topology.OutputFieldsDeclarer;  

import backtype.storm.topology.TopologyBuilder;  

import backtype.storm.topology.base.BaseRichBolt;  

import backtype.storm.tuple.Fields;  

import backtype.storm.tuple.Tuple;  

import backtype.storm.tuple.Values;  

  

public class MyKafkaTopology {  

  

  public static class KafkaWordSplitter extends BaseRichBolt {  

  

    private static final Log LOG = LogFactory.getLog(KafkaWordSplitter.class);  

    private static final long serialVersionUID = 886149197481637894L;  

    private OutputCollector collector;  

        

    

    public void prepare(Map stormConf, TopologyContext context,  

        OutputCollector collector) {  

      this.collector = collector;             

    }  

  

    

    public void execute(Tuple input) {  

      String line = input.getString(0);  

      LOG.info("RECV[kafka -> splitter] " + line);  

      String[] words = line.split("\\s+");  

      for(String word : words) {  

        LOG.info("EMIT[splitter -> counter] " + word);  

        collector.emit(input, new Values(word, 1));  

      }  

      collector.ack(input);  

    }  

  

    

    public void declareOutputFields(OutputFieldsDeclarer declarer) {  

      declarer.declare(new Fields("word", "count"));          

    }  

        

  }  

      

  public static class WordCounter extends BaseRichBolt {  

  

    private static final Log LOG = LogFactory.getLog(WordCounter.class);  

    private static final long serialVersionUID = 886149197481637894L;  

    private OutputCollector collector;  

    private Map<String, AtomicInteger> counterMap;  

        

    

    public void prepare(Map stormConf, TopologyContext context,  

        OutputCollector collector) {  

      this.collector = collector;      

      this.counterMap = new HashMap<String, AtomicInteger>();  

    }  

  

    

    public void execute(Tuple input) {  

      String word = input.getString(0);  

      int count = input.getInteger(1);  

      LOG.info("RECV[splitter -> counter] " + word + " : " + count);  

      AtomicInteger ai = this.counterMap.get(word);  

      if(ai == null) {  

        ai = new AtomicInteger();  

        this.counterMap.put(word, ai);  

      }  

      ai.addAndGet(count);  

      collector.ack(input);  

      LOG.info("CHECK statistics map: " + this.counterMap);  

    }  

  

    

    public void cleanup() {  

      LOG.info("The final result:");  

      Iterator<Entry<String, AtomicInteger>> iter = this.counterMap.entrySet().iterator();  

      while(iter.hasNext()) {  

        Entry<String, AtomicInteger> entry = iter.next();  

        LOG.info(entry.getKey() + "\t:\t" + entry.getValue().get());  

      }  

          

    }  

  

    

    public void declareOutputFields(OutputFieldsDeclarer declarer) {  

      declarer.declare(new Fields("word", "count"));          

    }  

  }  

      

  public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, InterruptedException {  

    String zks = "localhost:2181";  

    String topic = "test";  

    String zkRoot = "/storm"; // default zookeeper root configuration for storm  

    String id = "word";  

        

    BrokerHosts brokerHosts = new ZkHosts(zks);  

    SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);  

    spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());  

    spoutConf.forceFromStart = true;  

    spoutConf.zkServers = Arrays.asList(new String[] {"localhost"});  

    spoutConf.zkPort = 2181;  

        

    TopologyBuilder builder = new TopologyBuilder();  

    builder.setSpout("kafka-reader", new KafkaSpout(spoutConf), 5); // Kafka我们创建了一个5分区的Topic,这里并行度设置为5  

    builder.setBolt("word-splitter", new KafkaWordSplitter(), 2).shuffleGrouping("kafka-reader");  

    builder.setBolt("word-counter", new WordCounter()).fieldsGrouping("word-splitter", new Fields("word"));  

        

    Config conf = new Config();  

        

    String name = MyKafkaTopology.class.getSimpleName();  

    if (args != null && args.length > 0) {  

      // Nimbus host name passed from command line  

      conf.put(Config.NIMBUS_HOST, args[0]);  

      conf.setNumWorkers(3);  

      StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology());  

    } else {  

      conf.setMaxTaskParallelism(3);  

      LocalCluster cluster = new LocalCluster();  

      cluster.submitTopology(name, conf, builder.createTopology());  

      Thread.sleep(60000);  

      cluster.shutdown();  

    }  

  }  

}  

pom.xml 配置:

<!--
  Maven build for the Kafka -> Storm word-count example (MyKafkaTopology).
  NOTE(review): no build plugin (e.g. maven-shade / maven-assembly) is configured, so the
  default jar will not bundle storm-kafka or kafka. A topology jar submitted with
  StormSubmitter typically needs those classes packaged; confirm how this jar is deployed.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.hgp.kafka</groupId>
  <artifactId>kafka</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>kafka</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <!--
    NOTE(review): MyKafkaTopology imports org.apache.commons.logging, which is not declared
    below. Presumably it arrives transitively; confirm, or declare it explicitly.
  -->
  <dependencies>
    <!--
      Storm runtime. "provided" scope: expected to be supplied by the Storm cluster, so it is
      not packaged. NOTE(review): running main() in local mode still needs storm-core on the
      runtime classpath (IDEs usually include provided deps; `mvn exec:java` by default may not).
    -->
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-core</artifactId>
      <version>0.9.2-incubating</version>
      <scope>provided</scope>
    </dependency>
    <!--
      Kafka library (Scala 2.10 build).
      ZooKeeper is excluded, presumably to avoid a version clash with the copy storm-core brings in.
      log4j is excluded, presumably to avoid a logging-backend conflict with Storm. Confirm both.
    -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
      <version>0.8.2.1</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.zookeeper</groupId>
          <artifactId>zookeeper</artifactId>
        </exclusion>
        <exclusion>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <!--
      Storm/Kafka integration, presumably the source of the storm.kafka.* classes
      (KafkaSpout, SpoutConfig, ZkHosts, StringScheme). Its version matches storm-core above.
    -->
    <dependency>
      <groupId>org.apache.storm</groupId>
      <artifactId>storm-kafka</artifactId>
      <version>0.9.2-incubating</version>
    </dependency>

  </dependencies>
</project>
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: