您的位置:首页 > 大数据

大数据处理实例——Amazon商品评分&评论(三)

2016-09-24 09:45 405 查看

背景

本系列的第二篇中给出了实时预处理部分的总体框架和Storm Topology的实现。在Storm处理之前,由于从Stanford获取的乐器类评价文件(Musical_Instruments_5.json)是某一段时间的全量数据集,所以也提到可以自己写个小应用来模拟实时评论。这里给出几种实现方案。

提前知识

关于Flume的原理,网上有很多参考资料。在本人的实时处理环境搭建过程中也给出了Flume+Kafka的配置文件,这里给出简单说明

(1)给出FlumeNG source, channel, sinks 的别名

agent.sources = r1
agent.channels = c1
agent.sinks = k1


(2)FlumeNG source配置,这里配置为spooldir模式,可以监控指定文件夹(agent.sources.r1.spoolDir)中的新文件变化,如果目录中有文件产生,就会立刻读取其内容

agent.sources.r1.type = spooldir
agent.sources.r1.spoolDir = /opt/flume/flumeSpool
agent.sources.r1.fileHeader = false
agent.sources.r1.channels = c1


(3)FlumeNG channel配置,channel 是一个存储地,接收 source 的输出,直到有 sink 消费掉 channel 中的数据。新版本的FlumeNG自带了KafkaChannel。实际上KafkaChannel包含producer和consumer,producer接受source发送过来的数据放到broker中,consumer从broker中pull数据给sink,目前一个channel只允许有一个topic。

agent.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
agent.channels.c1.capacity = 10000
agent.channels.c1.transactionCapacity = 1000
agent.channels.c1.brokerList = zk01:9092,zk02:9092,zk03:9092
agent.channels.c1.topic = review-topic
agent.channels.c1.zookeeperConnect=zk01:2181


(4)FlumeNG sink配置,新版本的FlumeNG自带了KafkaSink, 将从channel接收的数据作为kafka的生产者 发送给消费者。

agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.topic = test-topic
agent.sinks.k1.brokerList = zk01:9092,zk02:9092,zk03:9092
agent.sinks.k1.requiredAcks = 1
agent.sinks.k1.batchSize = 20
agent.sinks.k1.channel = c1


(5)接下来就是要配置kafka的消费者来消费KafkaSink发送的数据。这也就是在上一篇Storm Topology中配置的KafkaSpout了。其角色就是kafka的消费者和storm的数据接收源,也即从Kafka中读取数据,并组装成tuple发射出去,tuple被发射后就开始在Topology中传播

ReviewTopology.java

BrokerHosts brokerHosts = new ZkHosts(zks);
SpoutConfig spoutConf = new SpoutConfig(brokerHosts, topic, zkRoot, id);
spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
spoutConf.zkServers = Arrays.asList(new String[] {"zk01", "zk02", "zk03"});
spoutConf.zkPort = 2181;

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafkaSpout", new KafkaSpout(spoutConf), 1);//定义kafkaSpout
. . .
. . .


当然你可以完全不使用Flume自带的KafkaChannel和KafkaSink,在了解了Flume原理的基础上自定义Channel和Sink实现

方案实现

在介绍了flume+kafka+storm的关联后,回归到本篇的主题,即在已有全量数据集上模拟实时评论。这里给出三种实现方式,前两种是基于FlumeNG source的实现,第三种是自定义Storm Spout的实现

基于spool source的实现方式

  前面已经介绍过spooldir模式只能监听指定目录下的新增文件,不会监听原有文件的追加内容。其实也是不允许给指定目录的文件追加内容的,因为在spooldir模式下,监听到新增文件并读取后,该文件会被重命名(如test.txt会被重命名为test.txt.COMPLETED),此时外部应用想要在原文件末尾追加内容,显然会出现异常。

  因此,基于spooldir模式的实时监听,基本上是和log4j的文件分割功能一起使用的,可以为log4j配置按时间生成新文件到spooldir指定目录,这里给出log4j2的参考配置

<?xml version="1.0" encoding="UTF-8"?>
<Configuration>
<Appenders>
<!-- 配置日志输出文件名字追加读写 -->
<RollingFile name="REVIEW_LOG" fileName="/opt/flume/flumeSpoolDir/review.log"
filePattern="/opt/flume/flumeSpoolDir/review.%d{yyyy-MM-dd-HH-mm-ss}.log"
append="true">
<!-- 输出格式 -->
<PatternLayout pattern="[%l] [%t] [%-4r] [%d{yyyy-MM-dd HH:mm:ss,SSS}] [%p] \: %m%n"/>
<!-- 设置策略 -->
<Policies>
<!-- 基于时间的触发策略。该策略主要是完成周期性的log文件封存工作。有两个参数:
interval,integer型,指定两次封存动作之间的时间间隔。单位:以日志的命名精度filePattern来确定单位,
比如这里配置的yyyy-MM-dd-HH-mm-ss 单位为秒,interval="5"表示美5秒生成一个新的日志文件
modulate,boolean型,说明是否对封存时间进行调制。
-->
<TimeBasedTriggeringPolicy interval="5" modulate="true" />
</Policies>
</RollingFile>
</Appenders>
<Loggers>
<!-- 配置记录器级别 -->
<Root level="info">
<AppenderRef ref="REVIEW_LOG"/>
</Root>
</Loggers>
</Configuration>


基于exec source的实现

ExecSource是用来执行本地shell命令,并把本地日志文件中的数据封装成Event事件流在Flume NG中流动,最常用的命令就是tail -F命令,可以从本地日志文件中获取新追加的日志。

(1) 修改flume-conf.properties中flume source的配置

agent.sources.r1.type = exec
agent.sources.r1.command= tail -F /opt/flume/flumeSpool/musical_review.txt
agent.sources.r1.channels = c1


(2) 写应用程序每五秒读取Musical_Instruments_5.json中的一行,写入/opt/flume/flumeSpool/musical_review.txt末尾

自定义Storm Spout的实现

之前介绍中Storm Topology中Spout使用的是Storm自带的KafkaSpout,其数据来源是Flume的sink。在自己学习实验过程中也可以丢弃Flume,自定义Spout从文件中读入数据,并组装成tuple发射出去

public class SourceSpout extends BaseRichSpout {
private static final Log LOG = LogFactory.getLog(SourceSpout.class);

private SpoutOutputCollector collector;
private static final String SOURCE_FILE = "Musical_Instruments_5.json";
private List reviewContent = null;
public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
this.collector = spoutOutputCollector;
try {
initReviewContent();
} catch (IOException e) {
LOG.error(e.getLocalizedMessage());
}
}

public void nextTuple() {
for (String review : reviewContent) {
this.collector.emit(new Values(review));
try {
Thread.sleep(5000); //每5秒发送一行
} catch (InterruptedException e) {
LOG.error(e.getLocalizedMessage());
}
}

}

public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("review"));
}

private void initReviewContent() throws IOException {
if (reviewContent == null) {
File file = new File(this.getClass().getResource("/").getPath() + "/resources/" + SOURCE_FILE);
if (file.isFile() && file.exists()) {
InputStreamReader read = new InputStreamReader(new FileInputStream(file), "UTF-8");
BufferedReader bufferedReader = new BufferedReader(read);
String lineTxt = null;
while ((lineTxt = bufferedReader.readLine()) != null) {
reviewContent.add(lineTxt);
}
read.close();
}
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐