Extracting MySQL data to HDFS with Flume
2017-09-25 21:46
Scenario analysis:
In most cases, data from relational databases (MySQL, Oracle, SQL Server) is extracted into HDFS, Hive, or HBase with Sqoop. However, Sqoop runs on MapReduce under the hood, so near-real-time processing cannot be guaranteed. If the extraction could instead feed Spark Streaming + Spark SQL, processing efficiency would improve considerably. This is the motivation for using Flume to pull relational database data into Kafka, where Spark Streaming can consume it. This article (adapted from another blog post) shows how to extract MySQL data into HDFS with Flume; the Kafka + Spark Streaming part of the pipeline will be covered in a later post.
1. Create the MySQL table
Log in to MySQL from the console and run the following commands:
use test;
create table wlslog
(id int not null,
time_stamp varchar(40),
category varchar(40),
type varchar(40),
servername varchar(40),
code varchar(40),
msg varchar(40),
primary key ( id )
);
insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(1,'apr-8-2014-7:06:16-pm-pdt','notice','weblogicserver','adminserver','bea-000365','server state changed to standby');
insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(2,'apr-8-2014-7:06:17-pm-pdt','notice','weblogicserver','adminserver','bea-000365','server state changed to starting');
insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(3,'apr-8-2014-7:06:18-pm-pdt','notice','weblogicserver','adminserver','bea-000365','server state changed to admin');
insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(4,'apr-8-2014-7:06:19-pm-pdt','notice','weblogicserver','adminserver','bea-000365','server state changed to resuming');
insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(5,'apr-8-2014-7:06:20-pm-pdt','notice','weblogicserver','adminserver','bea-000361','started weblogic adminserver');
insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(6,'apr-8-2014-7:06:21-pm-pdt','notice','weblogicserver','adminserver','bea-000365','server state changed to running');
insert into wlslog(id,time_stamp,category,type,servername,code,msg) values(7,'apr-8-2014-7:06:22-pm-pdt','notice','weblogicserver','adminserver','bea-000360','server started in running mode');
commit;
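As a quick sanity check before wiring up Flume, you can confirm the rows are in place from the shell (a minimal sketch, assuming the root user and the test database used above):
mysql -u root -p test -e "select count(*) from wlslog;"
The query should report 7 rows.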
2. Create the required directories and files
(1) Create the local status file:
mkdir -p /var/lib/flume
cd /var/lib/flume
touch sql-source.status
chmod -R 777 /var/lib/flume
(2) Create the HDFS target directory:
hdfs dfs -mkdir -p /flume/mysql
hdfs dfs -chmod -R 777 /flume/mysql
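Optionally verify that both the local status file and the HDFS target directory exist (a small sketch using the paths created above):
ls -l /var/lib/flume/sql-source.status
hdfs dfs -ls /flume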
3. Prepare the JAR packages
Download flume-ng-sql-source-1.3.7.jar from http://book2s.com/java/jar/f/flume-ng-sql-source/download-flume-ng-sql-source-1.3.7.html and copy it to the Flume library directory. My cluster was built with Ambari, so the command is:
cp flume-ng-sql-source-1.3.7.jar /usr/hdp/current/flume-server/lib/
Also copy the MySQL JDBC driver JAR into the Flume library directory:
cp mysql-connector-java-5.1.17.jar /usr/hdp/current/flume-server/lib/mysql-connector-java.jar
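Before restarting Flume, it can be worth confirming that both JARs landed in the library directory (a sketch assuming the Ambari path used above):
ls /usr/hdp/current/flume-server/lib/ | grep -E 'flume-ng-sql-source|mysql-connector'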
4. Configure Flume
In the Ambari UI, go to Ambari -> Flume -> Configs and set the following properties in flume.conf:
agent.channels = ch1
agent.sources = sql-source
agent.sinks = HDFS
agent.channels.ch1.type = memory
agent.sources.sql-source.channels = ch1
agent.sources.sql-source.type = org.keedio.flume.source.SQLSource
agent.sources.sql-source.connection.url = jdbc:mysql://<your-ip>:3306/test
agent.sources.sql-source.user = root
agent.sources.sql-source.password = <your-password>
agent.sources.sql-source.table = wlslog
agent.sources.sql-source.columns.to.select = *
agent.sources.sql-source.incremental.column.name = id
agent.sources.sql-source.incremental.value = 0
agent.sources.sql-source.run.query.delay = 5000
agent.sources.sql-source.status.file.path = /var/lib/flume
agent.sources.sql-source.status.file.name = sql-source.status
agent.sinks.HDFS.channel = ch1
agent.sinks.HDFS.type = hdfs
agent.sinks.HDFS.hdfs.path = hdfs://<your-namenode-host>/flume/mysql
agent.sinks.HDFS.hdfs.fileType = DataStream
agent.sinks.HDFS.hdfs.writeFormat = Text
agent.sinks.HDFS.hdfs.rollSize = 268435456
agent.sinks.HDFS.hdfs.rollInterval = 0
agent.sinks.HDFS.hdfs.rollCount = 0
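If Flume is not managed by Ambari, the same properties can be saved to a local file and the agent started by hand. A minimal sketch, assuming the configuration is saved as /etc/flume/conf/mysql-hdfs.conf (a hypothetical path) and the agent name matches the agent prefix above:
flume-ng agent --conf /etc/flume/conf --conf-file /etc/flume/conf/mysql-hdfs.conf --name agent -Dflume.root.logger=INFO,console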
Restart the Flume service. The data will then be visible under the HDFS target directory.
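To verify the result, list and inspect the sink output; the HDFS sink writes files with the default FlumeData prefix unless hdfs.filePrefix is overridden, so the wildcard below is illustrative:
hdfs dfs -ls /flume/mysql
hdfs dfs -cat /flume/mysql/FlumeData.*
You can also inspect /var/lib/flume/sql-source.status, where the SQL source tracks how far it has read, and then insert new rows into wlslog to watch them flow through incrementally.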