Storm basics and topology flow
2017-09-24 13:21
In this project, the files inside each zip package are validated by Storm bolts. Each bolt can declare multiple streams, so when emitting it can selectively send tuples to different downstream bolts for processing. When a tuple is emitted, attributes such as the file directory, clearing date, and batch number are attached as Fields and sent to the next bolt; that bolt can then read these parameters and fetch the file from the given path for processing.
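As a rough, Storm-free sketch of that idea (the class, field names, and file path below are illustrative, not from the project), a tuple traveling between components can be pictured as a set of named fields, which the downstream bolt reads by name:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class TupleSketch {
    // Illustrative only: models a tuple as an ordered map of field name -> value.
    static Map<String, Object> tuple(String... kv) {
        Map<String, Object> t = new LinkedHashMap<>();
        for (int i = 0; i < kv.length; i += 2) {
            t.put(kv[i], kv[i + 1]);
        }
        return t;
    }

    public static void main(String[] args) {
        // Fields carried to the next bolt: file path, clearing date, batch number.
        Map<String, Object> t = tuple(
                "zipAbsPath", "/data/in/20170924.zip",
                "clearingDate", "20170924",
                "batchNo", "0001");
        // The downstream bolt reads each value by its field name.
        System.out.println(t.get("zipAbsPath"));
    }
}
```

In real Storm code the field names come from declareOutputFields and values are read with methods like getStringByField; this sketch only shows the name-to-value pairing.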
Building the topology
package com.lancy.topology;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;

import com.lancy.bolt.AuditFileCheckBolt;
import com.lancy.bolt.CardReleaseCheckBolt;
import com.lancy.bolt.DailyRechargeDataJoinBolt;
import com.lancy.bolt.DailyRechargeSummarizeBolt;
import com.lancy.bolt.FileBaseCheckBolt;
import com.lancy.bolt.FileLineFormatCheckBolt;
import com.lancy.bolt.FilePrvAndNxtTradeChkCrossFileBolt;
import com.lancy.bolt.FilePrvAndNxtTradeChkSingleFileBolt;
import com.lancy.bolt.FileRecordReplicationCheckBolt;
import com.lancy.bolt.InsertValidRecordMqBolt;
import com.lancy.bolt.OtherFileHandleBolt;
import com.lancy.bolt.OtherFileInsertDBBolt;
import com.lancy.bolt.RechargeCheckFileHandleBolt;
import com.lancy.bolt.ZipCheckBolt;
import com.lancy.bolt.lnt.InvalidDataHandleBolt;
import com.lancy.common.ConfigCommon;
import com.lancy.common.FieldsCommon;
import com.lancy.spout.ZipSpout;

/**
 * @ClassName LocalFileCheckTopology
 * @description File-processing topology
 */
public class LocalFileCheckTopology {

    public static void main(String[] args) {
        Config config = new Config();
        config.setNumWorkers(8);
        // config.setNumWorkers(6);
        config.put("topology.backpressure.enable", Boolean.valueOf(false));
        config.put("topology.executor.receive.buffer.size", Integer.valueOf(16384));
        config.put("topology.executor.send.buffer.size", Integer.valueOf(16384));
        ConfigCommon.getInstance(); // initialize ConfigCommon

        if (args != null && args.length > 0) {
            try {
                StormSubmitter.submitTopology("local-filecheck--topology", config,
                        topologyBuilder().createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            } catch (AuthorizationException e) {
                e.printStackTrace();
            }
        } else {
            config.setDebug(true);
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("local-filecheck--topology-local-mode", config,
                    topologyBuilder().createTopology());
            // try {
            //     Thread.sleep(120 * 1000);
            // } catch (InterruptedException e) {
            //     e.printStackTrace();
            // }
            // localCluster.shutdown();
        }
    }

    public static TopologyBuilder topologyBuilder() {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("01. " + "zipSpout", new ZipSpout(), 1);
        // setBolt: the first argument is the component id, the second the bolt instance.
        // shuffleGrouping names the upstream component (i.e. the order in which the
        // topology flows); its second argument is the stream_id.
        builder.setBolt("02. " + "zipCheckBolt", new ZipCheckBolt(), 16)
                .shuffleGrouping("01. " + "zipSpout", FieldsCommon.NORMAL);
        builder.setBolt("03. " + "auditFileCheckBolt", new AuditFileCheckBolt(), 16)
                .shuffleGrouping("02. " + "zipCheckBolt", FieldsCommon.NORMAL);
        builder.setBolt("04. " + "fileBaseCheckBolt", new FileBaseCheckBolt(), 16)
                .shuffleGrouping("03. " + "auditFileCheckBolt", FieldsCommon.NORMAL);
        builder.setBolt("05. " + "fileLineFormatCheckBolt", new FileLineFormatCheckBolt(), 96)
                .shuffleGrouping("04. " + "fileBaseCheckBolt", FieldsCommon.NORMAL);
        builder.setBolt("06. " + "fileRecordReplicationCheckBolt", new FileRecordReplicationCheckBolt(), 48)
                .shuffleGrouping("05. " + "fileLineFormatCheckBolt", FieldsCommon.TRADE_FILE);
        builder.setBolt("07. " + "dailyRechargeSummarizeBolt", new DailyRechargeSummarizeBolt(), 16)
                .shuffleGrouping("06. " + "fileRecordReplicationCheckBolt", FieldsCommon.RECHARGE_SUMMARIZE);
        builder.setBolt("08. " + "filePrvAndNxtTradeChkSingleFileBolt", new FilePrvAndNxtTradeChkSingleFileBolt(), 48)
                .shuffleGrouping("06. " + "fileRecordReplicationCheckBolt", FieldsCommon.NORMAL)
                .shuffleGrouping("07. " + "dailyRechargeSummarizeBolt", FieldsCommon.NORMAL);
        builder.setBolt("09. " + "dailyRechargeDataJoinBolt", new DailyRechargeDataJoinBolt(), 16)
                .shuffleGrouping("01. " + "zipSpout", FieldsCommon.RECHARGE_JOIN);
        builder.setBolt("10. " + "filePrvAndNxtTradeChkCrossFileBolt", new FilePrvAndNxtTradeChkCrossFileBolt(), 16)
                .shuffleGrouping("09. " + "dailyRechargeDataJoinBolt", FieldsCommon.PREV_AND_NEXT_CHECK_CROSS_FILE);
        builder.setBolt("11. " + "insertValidRecordMqBolt", new InsertValidRecordMqBolt(), 256).setNumTasks(256)
                .shuffleGrouping("08. " + "filePrvAndNxtTradeChkSingleFileBolt", FieldsCommon.INSERT_VALID_MQ)
                .shuffleGrouping("10. " + "filePrvAndNxtTradeChkCrossFileBolt", FieldsCommon.INSERT_VALID_MQ);
        builder.setBolt("12. " + "invalidDataHandleBolt", new InvalidDataHandleBolt(), 16)
                .shuffleGrouping("05. " + "fileLineFormatCheckBolt", FieldsCommon.NON_VALID)
                .shuffleGrouping("06. " + "fileRecordReplicationCheckBolt", FieldsCommon.NON_VALID)
                .shuffleGrouping("08. " + "filePrvAndNxtTradeChkSingleFileBolt", FieldsCommon.NON_VALID)
                .shuffleGrouping("10. " + "filePrvAndNxtTradeChkCrossFileBolt", FieldsCommon.NON_VALID);
        builder.setBolt("13. " + "otherFileHandleBolt", new OtherFileHandleBolt(), 16)
                .shuffleGrouping("05. " + "fileLineFormatCheckBolt", FieldsCommon.OTHER_FILE);
        builder.setBolt("14. " + "rechargeCheckFileHandleBolt", new RechargeCheckFileHandleBolt(), 16)
                .shuffleGrouping("05. " + "fileLineFormatCheckBolt", FieldsCommon.RECHARGE_CHECK_FILE);
        builder.setBolt("15. " + "cardReleaseCheckBolt", new CardReleaseCheckBolt(), 16)
                .shuffleGrouping("05. " + "fileLineFormatCheckBolt", FieldsCommon.CARD_DETAIL_FILE);
        builder.setBolt("16. " + "otherFileInsertDBBolt", new OtherFileInsertDBBolt(), 16)
                .shuffleGrouping("13. " + "otherFileHandleBolt")
                .shuffleGrouping("14. " + "rechargeCheckFileHandleBolt")
                .shuffleGrouping("15. " + "cardReleaseCheckBolt");
        return builder;
    }
}
The spout class
package com.lingnanpass.spout;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.lingnanpass.common.CommUtil;
import com.lingnanpass.common.ConfigCommon;
import com.lingnanpass.common.FieldsCommon;
import com.lingnanpass.common.LntValidate;
import com.lingnanpass.common.WarnUtil;
import com.lingnanpass.common.pre.PreFileCheckUtil;
import com.lingnanpass.common.util.DateTransformUitl;
import com.lingnanpass.common.util.FileUtil;
import com.lingnanpass.common.util.FtpUtil;
import com.lingnanpass.common.util.JsonUtil;
import com.lingnanpass.common.util.MonitorUtil;
import com.lingnanpass.common.util.RedisUtil;
import com.lingnanpass.common.util.RuleMappingUtil;
import com.lingnanpass.common.util.ZIPUtil;
import com.lingnanpass.db.DbUtil;
import com.lingnanpass.db.JdbcTemplate;

/**
 * @ClassName ZipSpout
 * @description Reads FTP-upload messages from the database zip queue and, using the
 *              FTP address, file name, etc. described in each message, downloads the file
 * @author yaoyf
 */
@SuppressWarnings("serial")
public class ZipSpout extends BaseRichSpout {

    private SpoutOutputCollector _collector;

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        this._collector = collector;
    }

    @Override
    public void nextTuple() {
        // Business logic goes here (omitted in this excerpt); the variables used
        // below (entry, zipAbsPath, clearingBatchNo, ...) come from that logic.
        int type = 0;
        if (type == 0) {
            // How does an emitted tuple find its downstream bolt? Through the
            // grouping declared in the topology:
            // builder.setBolt("09. " + "dailyRechargeDataJoinBolt", new DailyRechargeDataJoinBolt(), 16)
            //        .shuffleGrouping("01. " + "zipSpout", FieldsCommon.RECHARGE_JOIN);
            _collector.emit(FieldsCommon.RECHARGE_JOIN, new Values(entry.getKey()));
        } else {
            _collector.emit(FieldsCommon.NORMAL,
                    new Values(zipAbsPath, clearingBatchNo, jyFileName, mCode, serviceCode, areaCode));
        }
    }

    // Declare each stream_id and the fields that stream carries;
    // only declared streams can be emitted to.
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream(FieldsCommon.NORMAL,
                new Fields(FieldsCommon.ZIP_ABS_PATH, FieldsCommon.CLEARING_BATCH_NO,
                        FieldsCommon.JY_FILE_NAME, FieldsCommon.MERCHANT,
                        FieldsCommon.SERVICE_CODE, FieldsCommon.AREA_CODE));
        declarer.declareStream(FieldsCommon.RECHARGE_JOIN,
                new Fields(FieldsCommon.RECHARGE_SUMMARIZE_FLAG));
    }
}
One of the bolts
package com.lancy.bolt;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

import org.apache.commons.io.FileUtils;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.lancy.common.FieldsCommon; // needed for the stream and field names below

/**
 * @ClassName DailyRechargeDataJoinBolt
 * @description
 */
public class DailyRechargeDataJoinBolt extends BaseBasicBolt {

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String value = input.getString(0); // read the value passed by the upstream component
        // Emit tuples to downstream bolts (ccFile, czFile, ccList, czList come
        // from the join logic omitted in this excerpt).
        collector.emit(FieldsCommon.PREV_AND_NEXT_CHECK_CROSS_FILE,
                new Values("CC", ccFile.getAbsolutePath(), ccList));
        collector.emit(FieldsCommon.PREV_AND_NEXT_CHECK_CROSS_FILE,
                new Values("CZ", czFile.getAbsolutePath(), czList));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream(FieldsCommon.PREV_AND_NEXT_CHECK_CROSS_FILE,
                new Fields(FieldsCommon.ZIP_TYPE, FieldsCommon.FILE_ABS_PATH, FieldsCommon.FILE_LIST));
    }
}
In summary

First, build the topology; complex routing networks can be constructed this way.
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new SentenceSpout(), 1);
builder.setBolt("split", new SplitBolt(), 2)
       .shuffleGrouping("spout");                 // route by component id
builder.setBolt("count1", new Count1Bolt(), 2)
       .shuffleGrouping("split", "split-stream"); // route by component id + stream_id
builder.setBolt("count2", new Count2Bolt(), 2)
       .shuffleGrouping("split", "split-stream")
       .shuffleGrouping("spout", "spout1-stream");
builder.setBolt("print", new PrintBolt(), 2)
       .shuffleGrouping("count2", "count2-stream")
       .shuffleGrouping("spout", "spout2-stream");
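The grouping calls above effectively build a routing table keyed by (upstream component id, stream id): every subscriber of that pair receives the emitted tuples. A minimal plain-Java sketch of that lookup (the class and method names are illustrative, not Storm API):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RoutingSketch {
    // (componentId + ":" + streamId) -> downstream bolt ids
    private final Map<String, List<String>> routes = new HashMap<>();

    // Mirrors builder.setBolt(bolt, ...).shuffleGrouping(component, stream)
    void subscribe(String bolt, String component, String stream) {
        routes.computeIfAbsent(component + ":" + stream, k -> new ArrayList<>()).add(bolt);
    }

    // Which bolts receive an emit(stream, values) from a given component?
    List<String> targets(String component, String stream) {
        return routes.getOrDefault(component + ":" + stream, List.of());
    }

    public static void main(String[] args) {
        RoutingSketch r = new RoutingSketch();
        r.subscribe("count1", "split", "split-stream");
        r.subscribe("count2", "split", "split-stream");
        // An emit on split-stream reaches both count bolts.
        System.out.println(r.targets("split", "split-stream"));
    }
}
```

In Storm itself the grouping also decides how tuples are distributed among a bolt's parallel tasks (shuffle, fields, all, etc.); this sketch only captures the component-to-component wiring.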
Then define the spout. A spout usually extends BaseRichSpout and overrides the open, nextTuple, and declareOutputFields methods. In declareOutputFields you declare the streams (and the fields each carries) that the spout may emit; nextTuple then emits tuples onto those streams.
collector.emit("spout2-stream", new Values("nihao"));

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declareStream("spout2-stream", new Fields("test"));
    // declareStream requires a Fields argument; "field1" is a placeholder name
    declarer.declareStream("spout1-stream", new Fields("field1"));
}
A bolt is much like a spout: it extends BaseBasicBolt and overrides the execute and declareOutputFields methods.
The execute method receives the tuples emitted by upstream spouts or bolts, which is how data flows through the topology.
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
    // handle the tuple received from the upstream spout or bolt here
}
References
http://blog.csdn.net/kingzone_2008/article/details/52791515