基于Apache Flume Datahub插件将日志数据同步上云
2017-03-28 00:00
387 查看
简介
Apache Flume是一个分布式的、可靠的、可用的系统,可用于从不同的数据源中高效地收集、聚合和移动海量日志数据到集中式数据存储系统,支持多种Source和Sink插件。本文将介绍如何使用Apache Flume的Datahub Sink插件将日志数据实时上传到Datahub。环境要求
JDK (1.7及以上,推荐1.7)Flume-NG 1.x
Apache Maven 3.x
插件部署
下载插件压缩包
$ wget http://repo.aliyun.com/download/flume-datahub-sink-1.1.0.tar.gz
解压插件压缩包
$ tar zxvf flume-datahub-sink-1.1.0.tar.gz $ ls flume-datahub-sink lib libext
部署Datahub Sink插件
将解压后的插件文件夹flume-datahub-sink移动到Apache Flume安装目录下$ mkdir {YOUR_FLUME_DIRECTORY}/plugins.d $ mv flume-datahub-sink {YOUR_FLUME_DIRECTORY}/plugins.d/
移动后,核验Datahub Sink插件是否已经在相应目录:
$ ls { YOUR_APACHE_FLUME_DIR }/plugins.d flume-datahub-sink
配置示例
Flume的原理、架构,以及核心组件的介绍请参考 Flume-ng的原理和使用。本文将构建一个使用Datahub Sink的Flume实例,对日志文件中的结构化数据进行解析,并上传到Datahub Topic中。需要上传的日志文件格式如下(每行为一条记录,字段之间逗号分隔):
# test_basic.log some,log,line1 some,log,line2 ...
下面将创建Datahub Topic,并把每行日志的第一列和第二列作为一条记录写入Topic中。
创建Datahub Topic
使用Datahub WebConsole创建好Topic,schema为(string c1, string c2),下面假设建好的Topic名为test_topic。Flume配置文件
在Flume安装目录的conf/文件夹下创建名为datahub_basic.conf的文件,并输入内容如下:# A single-node Flume configuration for Datahub # Name the components on this agent a1.sources = r1 a1.sinks = k1 a1.channels = c1 # Describe/configure the source a1.sources.r1.type = exec a1.sources.r1.command = cat {YOUR_LOG_DIRECTORY}/test_basic.log # Describe the sink a1.sinks.k1.type = com.aliyun.datahub.flume.sink.DatahubSink a1.sinks.k1.datahub.accessID = {YOUR_ALIYUN_DATAHUB_ACCESS_ID} a1.sinks.k1.datahub.accessKey = {YOUR_ALIYUN_DATAHUB_ACCESS_KEY} a1.sinks.k1.datahub.endPoint = {YOUR_ALIYUN_DATAHUB_END_POINT} a1.sinks.k1.datahub.project = test_project a1.sinks.k1.datahub.topic = test_topic a1.sinks.k1.batchSize = 1 a1.sinks.k1.serializer = DELIMITED a1.sinks.k1.serializer.delimiter = , a1.sinks.k1.serializer.fieldnames = c1,c2, a1.sinks.k1.serializer.charset = UTF-8 a1.sinks.k1.shard.number = 1 a1.sinks.k1.shard.maxTimeOut = 60 # Use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 1000 # Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
这里serializer配置指定了以逗号分隔的形式将输入源解析成三个字段,并忽略第三个字段。
启动Flume
配置完成后,启动Flume并指定agent的名称和配置文件路径,添加**-Dflume.root.logger=INFO,console**选项可以将日志实时输出到控制台。$ cd {YOUR_FLUME_DIRECTORY} $ bin/flume-ng agent -n a1 -c conf -f conf/datahub_basic.conf -Dflume.root.logger=INFO,console
写入成功,显示日志如下:
... Write success. Event count: 2 ...
数据使用
日志数据通过Flume上传到Datahub后,可以使用StreamCompute流计算来进行实时分析,例如对于一些Web网站的日志,可以实时统计各个页面的PV/UV等。另外,导入Datahub的数据也可以配置Connector将数据归档至MaxCompute中,方便后续的离线分析。对于数据归档MaxCompute的场景,一般来说需要将数据进行分区。Datahub到MaxCompute的归档可以根据MaxCompute表的分区字段自动创建分区,前提是要求MaxCompute和Datahub的字段名以及类型可以完全对应上。如果需要根据日志的传输时间自动设置分区,则在上面的例子中需要指定MaxCompute的分区相应字段和时间格式,例如按小时自动创建分区,添加的配置如下:
a1.sinks.k1.maxcompute.partition.columns = pt a1.sinks.k1.maxcompute.partition.values = %Y%m%d%H
注意:pt这个字段需要在Datahub Topic以及MaxCompute表中都存在,且是表的分区字段。
相关文章推荐
- 基于Apache Flume Datahub插件将日志数据同步上云
- 基于Apache Flume Datahub插件将日志数据同步上云
- 基于OGG Datahub插件将Oracle数据同步上云
- 使用基于 Eclipse 插件框架的 ODA(Open Data Access)进行自定义数据驱动开发
- 基于OGG Datahub插件将Oracle数据同步上云
- mysql数据通过fluent同步到阿里云datahub填坑过程
- 从Apache的日志文件收集和提供统计数据(一个Python插件架构的简单实现)
- 基于OGG Datahub插件将Oracle数据同步上云
- 基于虚拟日志压缩的数据同步方案
- Visual Studio 2008中如何比较二个数据库的架构【Schema】和数据【Data】并同步 [转贴]
- 基于Oracle数据库的数据同步技术
- 基于jquery的finkyUI插件与Ajax实现页面数据加载功能
- WinCE中基于XML的数据同步
- 多层数据库应用基于Delphi DataSnap方法调用的实现(二)更新数据集
- 基于hive的日志数据统计实战
- 基于Hive的日志数据统计实战
- CYQ.Data 轻量数据层之路 应用示例三 Aop切入留言系统--操作日志(二十七)
- WinCE中基于XML的数据同步
- lyhucSelect基于Jquery的Select数据联动插件
- 数据同步中一表操作的增删改反映到记录数据变化的表datainfo上的触发器写法