WordCount with Storm (Kafka → JStorm → Redis)
2016-12-06 16:08
This post describes a small experiment I put together this week.
Quite a few pieces of software are involved, and you can install each of them on your own machine. Mine is a MacBook Pro, where installation is essentially the same as on Linux and fairly simple: download the package, unpack it, copy it to the install directory, rename it, and start the service. You will need JDK 1.7, IDEA, Kafka, JStorm, and Redis, all running in single-node mode.
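To sketch what that looks like in practice (the paths, config files, and daemon flags here are just my local assumptions; the Redis password must match the 123456 used by the code below):

# ZooKeeper (the copy bundled with Kafka) and the Kafka broker
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties
# Redis, with "requirepass 123456" set in redis.conf
redis-server ./redis.conf
# JStorm nimbus and supervisor
nohup bin/jstorm nimbus &
nohup bin/jstorm supervisor &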
Here is the project's pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>KafkaStormTest</groupId>
    <artifactId>zfs_try</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>0.9.2-incubating</version>
            <scope>compile</scope>
            <exclusions>
                <exclusion>
                    <groupId>ch.qos.logback</groupId>
                    <artifactId>logback-classic</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.logging.log4j</groupId>
                    <artifactId>log4j-slf4j-impl</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>log4j-over-slf4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>0.9.2-incubating</version>
            <scope>compile</scope>
        </dependency>
        <!-- alternative dependencies (kafka_2.9.2 0.8.1.1; storm-core / storm-kafka 1.0.2)
             were left commented out here in the original -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.15</version>
        </dependency>
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.10.1.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.7.2</version>
        </dependency>
    </dependencies>
</project>

Note that what I have installed locally is Alibaba's JStorm, not Apache Storm. If you use Storm, please use a 0.9.x release; with the latest version, in my tests the LocalCluster class seemed to fail to initialize:
LocalCluster cluster = new LocalCluster();
Let's look at the main class first. It mainly sets up the SpoutConfig and initializes the KafkaSpout; if anything is unclear, the official storm-kafka documentation covers it:
https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka/README.md
I then define two bolts for this topology, one to split lines into words and one to count words; the word-count bolt also loads the results into Redis.
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.BrokerHosts;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

import java.util.Arrays;

/**
 * Created by mac on 2016/12/5.
 */
public class KafkaReader {
    public static void main(String[] args) {
        if (args.length != 2) {
            System.err.println("Usage: inputPath timeOffset");
            System.err.println("  e.g.: java -jar WordCount.jar D://input/ 2");
            System.exit(2);
        }
        String zks = "127.0.0.1:2181"; // add h2:2181,h3:2181 for a real cluster
        String topic = "TEST-TOPIC";
        String zkRoot = "/storm";      // default ZooKeeper root used by Storm
        String id = "word";

        BrokerHosts brokerHosts = new ZkHosts(zks);
        SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConf.useStartOffsetTimeIfOffsetOutOfRange = true;
        spoutConf.forceFromStart = false;
        spoutConf.zkServers = Arrays.asList(new String[]{"127.0.0.1"}); // "h1", "h2", "h3"
        spoutConf.zkPort = 2181;
        KafkaSpout kafkaSpout = new KafkaSpout(spoutConf);

        TopologyBuilder builder = new TopologyBuilder();
        // the Kafka topic was created with 5 partitions, so spout parallelism could go up to 5
        builder.setSpout("kafka-reader", kafkaSpout, 1);
        //builder.setSpout("word-reader", new WordReader());
        //builder.setBolt("word-spilter", new WordSpliter()).shuffleGrouping("word-reader");
        builder.setBolt("word-spilter", new WordSpliter()).shuffleGrouping("kafka-reader");
        builder.setBolt("word-counter", new WordCounter2()).shuffleGrouping("word-spilter");

        String inputPath = args[0];
        String timeOffset = args[1];
        Config conf = new Config();
        conf.put("INPUT_PATH", inputPath);
        conf.put("TIME_OFFSET", timeOffset);
        conf.setDebug(false);

        // runs an in-process cluster; a real deployment would use StormSubmitter instead
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("WordCount", conf, builder.createTopology());
    }
}

Next, let's look at the WordSpliter class definition:
/**
 * Created by mac on 2016/12/5.
 */
import org.apache.commons.lang.StringUtils;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class WordSpliter extends BaseBasicBolt {

    private static final long serialVersionUID = -5653803832498574866L;

    public void execute(Tuple input, BasicOutputCollector collector) {
        String line = input.getString(0);
        String[] words = line.split(" ");
        for (String word : words) {
            word = word.trim();
            if (StringUtils.isNotBlank(word)) {
                word = word.toLowerCase();
                collector.emit(new Values(word));
            }
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

And the definition of the WordCounter2 class:
/**
 * Created by mac on 2016/12/5.
 */
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class WordCounter2 extends BaseBasicBolt {

    private static final long serialVersionUID = 5683648523524179434L;
    private HashMap<String, Integer> counters = new HashMap<String, Integer>();
    private volatile boolean edit = false;
    // opened once in prepare() rather than once per tuple, so connections are not leaked
    private transient RedisMethod jedistest;

    @SuppressWarnings("rawtypes")
    public void prepare(Map stormConf, TopologyContext context) {
        jedistest = new RedisMethod();
        jedistest.setup();
        System.out.println("Connection to server successful");

        final long timeOffset = Long.parseLong(stormConf.get("TIME_OFFSET").toString());
        // background thread that dumps the in-memory counts every timeOffset seconds
        new Thread(new Runnable() {
            public void run() {
                while (true) {
                    if (edit) {
                        for (Entry<String, Integer> entry : counters.entrySet()) {
                            System.out.println(entry.getKey() + " : " + entry.getValue());
                        }
                        System.out.println("WordCounter---------------------------------------");
                        edit = false;
                    }
                    try {
                        Thread.sleep(timeOffset * 1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
    }

    // count the word locally and load the running total into Redis
    public void execute(Tuple input, BasicOutputCollector collector) {
        String str = input.getString(0);
        if (!counters.containsKey(str)) {
            counters.put(str, 1);
        } else {
            Integer c = counters.get(str) + 1;
            counters.put(str, c);
        }
        jedistest.jedis.zincrby("storm2redis", 1, str);
        edit = true;
        System.out.println("WordCounter+++++++++++++++++++++++++++++++++++++++++++");
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // this bolt is a sink, so it declares no output fields
    }
}

I put the Redis-related operations into one class so I can refer back to them later:
import redis.clients.jedis.Jedis;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/**
 * Created by mac on 2016/12/6.
 */
public class RedisMethod {

    public Jedis jedis;

    public static void main(String[] args) {
        RedisMethod jedistest = new RedisMethod();
        jedistest.setup(); // check that the server is running
        System.out.println("Connection to server successful");
        jedistest.testzadd();
    }

    public void setup() {
        // connect to the Redis server at 127.0.0.1:6379
        jedis = new Jedis("127.0.0.1", 6379);
        // authenticate
        jedis.auth("123456");
        System.out.println("Server is running: " + jedis.ping());
    }

    /**
     * sorted sets (zadd)
     */
    public void testzadd() {
        jedis.zadd("zfs1", 2, "hadoop");
        jedis.zadd("zfs1", 1, "storm");
        jedis.zadd("zfs1", 3, "hive");

        Map<String, Double> map = new HashMap<String, Double>();
        map.put("hadoop", 12.0);
        map.put("hbase", 11.0);
        map.put("storm", 13.0);
        map.put("hadoop1", 12.0);
        map.put("hbase1", 11.0);
        map.put("storm1", 13.0);
        jedis.zadd("mapdf", map);
    }

    /**
     * strings
     */
    public void testString() {
        // ----- add data ----------
        jedis.set("name", "xinxin");           // store value "xinxin" under key "name"
        System.out.println(jedis.get("name")); // prints: xinxin
        jedis.append("name", " is my lover");  // append to the value
        System.out.println(jedis.get("name"));
        jedis.del("name");                     // delete a key
        System.out.println(jedis.get("name"));
        // set several key-value pairs at once
        jedis.mset("name", "liuling", "age", "23", "qq", "476777XXX");
        jedis.incr("age");                     // increment by 1
        System.out.println(jedis.get("name") + "-" + jedis.get("age") + "-" + jedis.get("qq"));
    }

    /**
     * hashes (maps)
     */
    public void testMap() {
        // ----- add data ----------
        Map<String, String> map = new HashMap<String, String>();
        map.put("name", "xinxin");
        map.put("age", "22");
        map.put("qq", "123456");
        jedis.hmset("user", map);

        // fetch fields from the hash "user"; the result is a List
        // the first argument is the hash key, followed by one or more field names (varargs)
        List<String> rsmap = jedis.hmget("user", "name", "age", "qq");
        System.out.println(rsmap);

        jedis.hdel("user", "age");                      // delete one field of the hash
        System.out.println(jedis.hmget("user", "age")); // deleted, so this returns null
        System.out.println(jedis.hlen("user"));         // number of fields left in "user": 2
        System.out.println(jedis.exists("user"));       // does the key "user" exist? true
        System.out.println(jedis.hkeys("user"));        // all fields of the hash
        System.out.println(jedis.hvals("user"));        // all values of the hash

        Iterator<String> iter = jedis.hkeys("user").iterator();
        while (iter.hasNext()) {
            String key = iter.next();
            System.out.println(key + ":" + jedis.hmget("user", key));
        }
        jedis.del("user"); // hdel requires field names, so use del to drop the whole key
    }

    /**
     * lists
     */
    public void testList() {
        // remove any existing content first
        jedis.del("java framework");
        System.out.println(jedis.lrange("java framework", 0, -1));
        // push three entries onto the head of the list
        jedis.lpush("java framework", "spring");
        jedis.lpush("java framework", "struts");
        jedis.lpush("java framework", "hibernate");
        // lrange fetches by range: key, start index, end index (-1 means to the end);
        // jedis.llen returns the length
        System.out.println(jedis.lrange("java framework", 0, -1));
        jedis.del("java framework");
        jedis.rpush("java framework", "spring");
        jedis.rpush("java framework", "struts");
        jedis.rpush("java framework", "hibernate");
        System.out.println(jedis.lrange("java framework", 0, -1));
    }

    /**
     * sets
     */
    public void testSet() {
        // add members
        jedis.del("user");
        jedis.sadd("user", "liuling");
        jedis.sadd("user", "xinxin");
        jedis.sadd("user", "ling");
        jedis.sadd("user", "zhangxinxin");
        jedis.sadd("user", "who");
        // remove a member
        jedis.srem("user", "who");
        System.out.println(jedis.smembers("user"));         // all members of the set
        System.out.println(jedis.sismember("user", "who")); // is "who" a member? false
        System.out.println(jedis.srandmember("user"));      // a random member
        System.out.println(jedis.scard("user"));            // number of members
    }

    public void test() throws InterruptedException {
        // sorting; note that rpush/lpush here are list operations (a doubly linked list, in effect)
        jedis.del("a"); // clear first, then add test data
        jedis.rpush("a", "1");
        jedis.lpush("a", "6");
        jedis.lpush("a", "3");
        jedis.lpush("a", "9");
        System.out.println(jedis.lrange("a", 0, -1)); // [9, 3, 6, 1]
        System.out.println(jedis.sort("a"));          // [1, 3, 6, 9], the sorted result
        System.out.println(jedis.lrange("a", 0, -1));
    }
}
OK, that's all of the code.
Let's look at the results.
First, open a Kafka console producer and feed in some data:
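Roughly like this, assuming a broker on localhost:9092; the topic name TEST-TOPIC and the five partitions come from the code above, and you type a few lines of words at the producer prompt:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 5 --topic TEST-TOPIC
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic TEST-TOPIC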
The program's terminal output looks like this:
Now let's look at the values in Redis:
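You can check by hand with redis-cli; the key storm2redis and the password 123456 come from the WordCounter2 and RedisMethod code above, and the actual scores depend on what you produced:

redis-cli -a 123456
127.0.0.1:6379> ZREVRANGE storm2redis 0 -1 WITHSCORES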
The verification succeeded. Now let's package the code into a jar. Since we use the Jedis library, we can either copy the jedis jar into the JStorm lib directory first or bundle it into our jar at build time, then upload our jar to the JStorm lib directory; a sketch of this step follows below.
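A rough sketch of the packaging step, assuming the artifact name from the pom.xml above and a jedis jar already in the local Maven repository (the JStorm path is my local install):

mvn clean package
cp ~/.m2/repository/redis/clients/jedis/2.7.2/jedis-2.7.2.jar /Users/mac/jstorm-2.1.1/lib/
cp target/zfs_try-1.0-SNAPSHOT.jar /Users/mac/jstorm-2.1.1/lib/KafkaStormTest.jar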
Then submit it:
bin/jstorm jar /Users/mac/jstorm-2.1.1/lib/KafkaStormTest.jar KafkaReader /Users/mac/input 2
The last two arguments are left over from when I used a local folder as the spout input for testing; you can modify the code and drop them.