大数据处理实例——Amazon商品评分&评论(三)
2016-09-24 09:45
405 查看
背景
本系列的第二篇中给出了实时预处理部分的总体框架和Storm Topology的实现。在Storm处理之前,由于从Stanford获取的乐器类评价文件(Musical_Instruments_5.json)是某一段时间的全量数据集,所以也提到可以自己写个小应用来模拟实时评论。这里给出几种实现方案。提前知识
关于Flume的原理,网上有很多参考资料。在本人的实时处理环境搭建过程中也给出了Flume+Kafka的配置文件,这里给出简单说明(1)给出FlumeNG source, channel, sinks 的别名
agent.sources = r1 agent.channels = c1 agent.sinks = k1
(2)FlumeNG source配置,这里配置为spooldir模式,可以监控指定文件夹(agent.sources.r1.spoolDir)中的新文件变化,如果目录中有文件产生,就会立刻读取其内容
agent.sources.r1.type = spooldir agent.sources.r1.spoolDir = /opt/flume/flumeSpool agent.sources.r1.fileHeader = false agent.sources.r1.channels = c1
(3)FlumeNG channel配置,channel 是一个存储地,接收 source 的输出,直到有 sink 消费掉 channel 中的数据。新版本的FlumeNG自带了KafkaChannel。实际上KafkaChannel包含producer和consumer,producer接受source发送过来的数据放到broker中,consumer从broker中pull数据给sink,目前一个channel只允许有一个topic。
agent.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel agent.channels.c1.capacity = 10000 agent.channels.c1.transactionCapacity = 1000 agent.channels.c1.brokerList = zk01:9092,zk02:9092,zk03:9092 agent.channels.c1.topic = review-topic agent.channels.c1.zookeeperConnect=zk01:2181
(4)FlumeNG sink配置,新版本的FlumeNG自带了KafkaSink, 将从channel接收的数据作为kafka的生产者 发送给消费者。
agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink agent.sinks.k1.topic = test-topic agent.sinks.k1.brokerList = zk01:9092,zk02:9092,zk03:9092 agent.sinks.k1.requiredAcks = 1 agent.sinks.k1.batchSize = 20 agent.sinks.k1.channel = c1
(5)接下来就是要配置kafka的消费者来消费KafkaSink发送的数据。这也就是在上一篇Storm Topology中配置的KafkaSpout了。其角色就是kafka的消费者和storm的数据接收源,也即从Kafka中读取数据,并组装成tuple发射出去,tuple被发射后就开始在Topology中传播
ReviewTopology.java BrokerHosts brokerHosts = new ZkHosts(zks); SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id); spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme()); spoutConf.zkServers = Arrays.asList(new String[] {"zk01", "zk02", "zk03"}); spoutConf.zkPort = 2181; TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("kafkaSpout", new KafkaSpout(spoutConf), 1);//定义kafkaSpout . . . . . .
当然你可以完全不使用Flume自带的KafkaChannel和KafkaSink,在了解了Flume原理的基础上自定义Channel和Sink实现
方案实现
在介绍了flume+kafka+storm的关联后,回归到本篇的主题,即在已有全量数据集上模拟实时评论。这里给出三种实现方式,前两种是基于FlumeNG source的实现,第三种是自定义Storm Spout的实现基于spool source的实现方式
前面已经介绍过spooldir模式只能监听指定目录下的新增文件,不会监听原有文件的追加内容。其实也是不允许给指定目录的文件追加内容的,因为在spooldir模式下,监听到新增文件并读取后,该文件会被重命名(如test.txt会被重命名为test.txt.COMPLETED),此时外部应用想要在原文件末尾追加内容,显然会出现异常。因此,基于spooldir模式的实时监听,基本上是和log4j的文件分割功能一起使用的,可以为log4j配置按时间生成新文件到spooldir指定目录,这里给出log4j2的参考配置
<?xml version="1.0" encoding="UTF-8"?> <Configuration> <Appenders> <!-- 配置日志输出文件名字追加读写 --> <RollingFile name="REVIEW_LOG" fileName="/opt/flume/flumeSpoolDir/review.log" filePattern="/opt/flume/flumeSpoolDir/review.%d{yyyy-MM-dd-HH-mm-ss}.log" append="true"> <!-- 输出格式 --> <PatternLayout pattern="[%l] [%t] [%-4r] [%d{yyyy-MM-dd HH:mm:ss,SSS}] [%p] \: %m%n"/> <!-- 设置策略 --> <Policies> <!-- 基于时间的触发策略。该策略主要是完成周期性的log文件封存工作。有两个参数: interval,integer型,指定两次封存动作之间的时间间隔。单位:以日志的命名精度filePattern来确定单位, 比如这里配置的yyyy-MM-dd-HH-mm-ss 单位为秒,interval="5"表示美5秒生成一个新的日志文件 modulate,boolean型,说明是否对封存时间进行调制。 --> <TimeBasedTriggeringPolicy interval="5" modulate="true" /> </Policies> </RollingFile> </Appenders> <Loggers> <!-- 配置记录器级别 --> <Root level="info"> <AppenderRef ref="REVIEW_LOG"/> </Root> </Loggers> </Configuration>
基于exec source的实现
ExecSource是用来执行本地shell命令,并把本地日志文件中的数据封装成Event事件流在Flume NG中流动,最常用的命令就是tail -F命令,可以从本地日志文件中获取新追加的日志。(1) 修改flume-conf.properties中flume source的配置
agent.sources.r1.type = exec agent.sources.r1.command= tail -F /opt/flume/flumeSpool/musical_review.txt agent.sources.r1.channels = c1
(2) 写应用程序每五秒读取Musical_Instruments_5.json中的一行,写入/opt/flume/flumeSpool/musical_review.txt末尾
自定义Storm Spout的实现
之前介绍中Storm Topology中Spout使用的是Storm自带的KafkaSpout,其数据来源是Flume的sink。在自己学习实验过程中也可以丢弃Flume,自定义Spout从文件中读入数据,并组装成tuple发射出去public class SourceSpout extends BaseRichSpout { private static final Log LOG = LogFactory.getLog(SourceSpout.class); private SpoutOutputCollector collector; private static final String SOURCE_FILE = "Musical_Instruments_5.json"; private List reviewContent = null; public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) { this.collector = spoutOutputCollector; try { initReviewContent(); } catch (IOException e) { LOG.error(e.getLocalizedMessage()); } } public void nextTuple() { for (String review : reviewContent) { this.collector.emit(new Values(review)); try { Thread.sleep(5000); //每5秒发送一行 } catch (InterruptedException e) { LOG.error(e.getLocalizedMessage()); } } } public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { outputFieldsDeclarer.declare(new Fields("review")); } private void initReviewContent() throws IOException { if (reviewContent == null) { File file = new File(this.getClass().getResource("/").getPath() + "/resources/" + SOURCE_FILE); if (file.isFile() && file.exists()) { InputStreamReader read = new InputStreamReader(new FileInputStream(file), "UTF-8"); BufferedReader bufferedReader = new BufferedReader(read); String lineTxt = null; while ((lineTxt = bufferedReader.readLine()) != null) { reviewContent.add(lineTxt); } read.close(); } } } }
相关文章推荐
- 大数据处理实例——Amazon商品评分&评论(二)
- 大数据处理实例——Amazon商品评分&评论(四)
- 大数据处理实例——Amazon商品评分&评论(一)
- 网页数据采集: 制作Amazon亚马逊网商品评论网页爬虫
- 处理Android SQLite -&n… 分类: Android数据存储 2014-05-30 10:58 95人阅读 评论(0) 收藏
- Amazon评论数据的预处理代码(Positive & Negative)
- 通过爬取天猫商品评论实例分析Python爬取ajax动态生成的数据
- [求教] - 关于ASP中如何对数据表字段中所包含的特殊字符 ' 与 " 进行转义处理?
- Socket处理发送和接收数据包,一个小实例:
- axis2.AxisFault: 服务器无法处理请求。 ---> 未将对象引用设置到对象的实例。
- GPS数据处理---< 一 >---数据格式
- 在.NET&nbsp;Framework中轻松处理XML数据(五)
- WebService处理大数据量数据出错:运行配置文件中指定的扩展时出现异常。 ---> 超过了最大请求长度。.
- 用perl 提取时间信息并处理数据--实例。
- Problem Description 输入n(n<100)个数,找出其中最小的数,将它与最前面的数交换后输出这些数。 Input 输入数据有多组,每组占一行,每行的开始是一个整数n,表示这个测试实例的数值的个数,跟着就是n个整数。n=0表示输入的结束,不做处理。 Output 对于每组
- 11级_Java_曹建波 9.10 JDB处理大数据&大文本&二进制数据&批处理&事务
- 使用 ibatis 处理复杂对象数据关系的实例
- servlet处理表单数据之实例开发(1)
- Access中"是/否"数据类型是怎样处理的
- DSOFramer控件文档上传到服务器处理页面后,怎么解析数据 <转>