A worked example of the jstorm kafka plugin
2017-04-29 12:06
This article is based on jstorm 2.2.1.
Contents:
1. POM dependencies
2. Custom bolt
3. Topology entry class
4. Configuration file application.properties
1. POM dependencies
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>jiankunking</groupId>
    <artifactId>kafkajstorm</artifactId>
    <version>1.0-SNAPSHOT</version>
    <url>http://blog.csdn.net/jiankunking</url>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <!--<scope>test</scope>-->
        </dependency>
        <!--jstorm begin-->
        <dependency>
            <groupId>com.alibaba.jstorm</groupId>
            <artifactId>jstorm-core</artifactId>
            <version>2.2.1</version>
            <!--<scope>provided</scope>-->
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.9.2</artifactId>
            <version>0.8.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>junit</groupId>
                    <artifactId>junit</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.5</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.5.0</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>com.googlecode.json-simple</groupId>
            <artifactId>json-simple</artifactId>
            <version>1.1</version>
        </dependency>
        <!--jstorm end-->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.3.3</version>
        </dependency>
        <!--logback begin-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.5</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>log4j-over-slf4j</artifactId>
            <version>1.7.10</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.0.13</version>
        </dependency>
        <!--logback end-->
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
```
2. Custom bolt
```java
package jiankunking.kafkajstorm.bolts;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.TupleImplExt;
import jiankunking.kafkajstorm.util.ByteUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.UnsupportedEncodingException;

/**
 * Created by jiankunking on 2017/4/19 16:47.
 */
public class CustomBolt extends BaseBasicBolt {

    protected final Logger logger = LoggerFactory.getLogger(CustomBolt.class);

    public void execute(Tuple input, BasicOutputCollector collector) {
        try {
            // The kafka spout emits each message as a byte[] in the "bytes" field.
            String message = ByteUtil.getStringFromByteArray((byte[]) ((TupleImplExt) input).get("bytes"));
            System.out.println(message);
            logger.info(message);
        } catch (UnsupportedEncodingException e) {
            logger.error("failed to decode kafka message", e);
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This bolt is a terminal consumer and declares no output fields.
    }
}
```
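CustomBolt imports `jiankunking.kafkajstorm.util.ByteUtil`, which the post does not show. A minimal hypothetical sketch, assuming the helper does nothing more than decode the raw message bytes as UTF-8 (the package declaration is omitted here; the demo's real implementation may differ):

```java
import java.io.UnsupportedEncodingException;

// Hypothetical sketch of the ByteUtil helper referenced by CustomBolt.
// Assumption: it simply decodes the raw Kafka message bytes as UTF-8,
// declaring the checked exception that CustomBolt catches.
public class ByteUtil {
    public static String getStringFromByteArray(byte[] bytes) throws UnsupportedEncodingException {
        return new String(bytes, "UTF-8");
    }
}
```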
3. Topology entry class
```java
package jiankunking.kafkajstorm.topologies;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.topology.TopologyBuilder;
import com.alibaba.jstorm.client.ConfigExtension;
import jiankunking.kafkajstorm.bolts.CustomBolt;
import jiankunking.kafkajstorm.kafka.KafkaSpout;
import jiankunking.kafkajstorm.kafka.KafkaSpoutConfig;
import jiankunking.kafkajstorm.util.PropertiesUtil;

import java.util.Map;

/**
 * Created by jiankunking on 2017/4/19 16:27.
 * Topology entry class.
 */
public class CustomCounterTopology {

    /**
     * Entry point: the class that submits the topology.
     *
     * @throws AlreadyAliveException
     * @throws InvalidTopologyException
     */
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
        PropertiesUtil propertiesUtil = new PropertiesUtil("/application.properties", false);
        Map propsMap = propertiesUtil.getAllProperty();

        KafkaSpoutConfig spoutConfig = new KafkaSpoutConfig(propertiesUtil.getProps());
        spoutConfig.configure(propsMap);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafkaSpout", new KafkaSpout(spoutConfig));
        builder.setBolt("customBolt", new CustomBolt(), 1).shuffleGrouping("kafkaSpout");

        // Configuration
        Config conf = new Config();
        conf.setDebug(false);
        // Use a custom logback.xml; the file must be placed in the jstorm conf directory.
        ConfigExtension.setUserDefinedLogbackConf(conf, "%JSTORM_HOME%/conf/logback.xml");

        if (args != null && args.length > 0) {
            // Submit to the cluster.
            StormSubmitter.submitTopologyWithProgressBar("customCounterTopology", conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            // Run in local mode.
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("CustomCounterTopology", conf, builder.createTopology());
        }
    }
}
```
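The entry class also relies on a `PropertiesUtil` helper that is not shown in the post. A minimal hypothetical sketch, assuming it loads a classpath resource and exposes it both as a `Properties` object and as a `Map`, and that the constructor's boolean flag means "required" (all of these are assumptions; the demo's real helper may differ):

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical sketch of the PropertiesUtil helper used by CustomCounterTopology.
public class PropertiesUtil {
    private final Properties props = new Properties();

    // classpathResource: e.g. "/application.properties"; required: whether a
    // missing resource should be treated as an error (assumed meaning).
    public PropertiesUtil(String classpathResource, boolean required) {
        try (InputStream in = PropertiesUtil.class.getResourceAsStream(classpathResource)) {
            if (in != null) {
                props.load(in);
            } else if (required) {
                throw new IllegalArgumentException("resource not found: " + classpathResource);
            }
        } catch (IOException e) {
            throw new RuntimeException("failed to load " + classpathResource, e);
        }
    }

    public Properties getProps() {
        return props;
    }

    // Copies the loaded properties into a plain Map, as the topology expects.
    public Map<String, String> getAllProperty() {
        Map<String, String> map = new HashMap<String, String>();
        for (String name : props.stringPropertyNames()) {
            map.put(name, props.getProperty(name));
        }
        return map;
    }
}
```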
4. Configuration file application.properties
```properties
# kafka
# kafka consumer group / client id
kafka.client.id=kafkaspoutid
kafka.broker.partitions=4
kafka.fetch.from.beginning=false
kafka.topic=test_one
kafka.broker.hosts=10.10.10.10:9092
kafka.zookeeper.hosts=10.10.10.10:2181
storm.zookeeper.root=/kafka
```
Notes:
1. Integrating the jstorm kafka plugin source
Go to the jstorm releases page on GitHub (https://github.com/alibaba/jstorm/releases), pick the release you need, download its source, and copy the plugin source into your own project. (The original post includes a screenshot showing where the plugin source sits in the source tree.)
2. Using logback
Since jstorm 2.1.1, logback has been jstorm's default logging framework. In typical use logback is compatible with log4j, meaning log4j calls can be bridged straight into logback. Concretely:
a. Add the slf4j-api, log4j-over-slf4j, and logback dependencies (strictly speaking, once logback-classic is declared you no longer need an explicit slf4j-api dependency, since it is pulled in transitively):
```xml
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.7.5</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>log4j-over-slf4j</artifactId>
    <version>1.7.10</version>
</dependency>
<dependency>
    <groupId>ch.qos.logback</groupId>
    <artifactId>logback-classic</artifactId>
    <version>1.0.13</version>
</dependency>
```
b. Exclude every slf4j-log4j12 dependency from the pom, because slf4j-log4j12 conflicts with log4j-over-slf4j:
```xml
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.5</version>
    <scope>provided</scope>
</dependency>
```
The version here is usually 1.7.5, but check which version Maven's dependency mediation actually resolves in your application's pom.
In theory, that is all it takes to bridge log4j onto slf4j.
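As an aside, the topology class points jstorm at a user-defined `%JSTORM_HOME%/conf/logback.xml`. A minimal sketch of what such a file might look like (the appender name, pattern, and level here are illustrative assumptions, not taken from the demo):

```xml
<configuration>
    <!-- Console appender; a file appender would be more typical on a cluster. -->
    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n</pattern>
        </encoder>
    </appender>
    <root level="INFO">
        <appender-ref ref="STDOUT"/>
    </root>
</configuration>
```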
Demo download links:
http://download.csdn.net/detail/xunzaosiyecao/9829079
https://github.com/JianKunKing/jstorm-kafka-plugin-demo
Author: jiankunking. Source: http://blog.csdn.net/jiankunking