mapreduce 将hdfs数据逐行写入mysql
2015-08-27 17:07
483 查看
code
package hdfsToSQL;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

/**
 * MapReduce job that reads CSV lines (exactly 3 comma-separated fields) from
 * HDFS and inserts one row per line into the MySQL table DataPowerPrediction.
 *
 * Map-only job: no combiner/reducer is configured; the HDFS output directory
 * receives only the (empty) default map output.
 */
public class hdfsToSQL {

    /** JDBC driver class and connection URL for the target MySQL instance. */
    static String driver = "com.mysql.jdbc.Driver";
    // static String url = "jdbc:mysql://192.168.1.58:3306/powerloaddata?user=dbuser&password=lfmysql";
    static String url = "jdbc:mysql://master:3306/test?user=root";

    public static class hdfsToSQLMapper extends Mapper<Object, Text, Text, IntWritable> {

        // One connection and one prepared statement per mapper task
        // (opened in setup, closed in cleanup) — NOT one per input record.
        private Connection conn;
        private PreparedStatement insert;

        /** Opens the JDBC connection once for the lifetime of this mapper task. */
        @Override
        protected void setup(Context context) throws IOException {
            try {
                Class.forName(driver);
                conn = DriverManager.getConnection(url);
                // Parameterized insert: safe against quoting/injection issues in
                // the CSV fields and parsed only once instead of per row.
                insert = conn.prepareStatement(
                        "insert into DataPowerPrediction values(?,?,?)");
            } catch (ClassNotFoundException | SQLException e) {
                // Fail the task up front rather than failing on every record.
                throw new IOException("cannot open MySQL connection: " + url, e);
            }
        }

        /**
         * Inserts one CSV line as one row. Lines that do not have exactly
         * three fields are skipped, matching the original behavior.
         */
        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] words = value.toString().split(",");
            if (words.length != 3) {
                return; // skip malformed lines
            }
            try {
                insert.setString(1, words[0]);
                insert.setString(2, words[1]);
                insert.setString(3, words[2]);
                insert.executeUpdate();
            } catch (SQLException e) {
                // Surface the failure so the task (and job) fails instead of
                // silently dropping rows as the printStackTrace version did.
                throw new IOException("insert failed for line: " + value, e);
            }
        }

        /** Releases the statement and connection when the task finishes. */
        @Override
        protected void cleanup(Context context) throws IOException {
            try {
                if (insert != null) {
                    insert.close();
                }
            } catch (SQLException ignored) {
                // best effort; connection close below is what matters
            }
            try {
                if (conn != null) {
                    conn.close();
                }
            } catch (SQLException e) {
                throw new IOException("failed to close MySQL connection", e);
            }
        }
    }

    /**
     * Entry point. Expects two arguments: the HDFS input path and an HDFS
     * output directory (required by the framework even though the real
     * output goes to MySQL).
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: hdfsToSQL <in> <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "hdfsToSQL");
        job.setJarByClass(hdfsToSQL.class);
        job.setMapperClass(hdfsToSQLMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
运行代码
/usr/hadoop/bin/hadoop jar hdfsToSQL.jar hdfsToSQL.hdfsToSQL hdfs://master:9000/user/root/data/foreastdatatest.csv hdfs://master:9000/user/root/output/hdfsToSQL4
结果
15/08/27 02:02:08 INFO mapreduce.Job: map 19% reduce 0% 15/08/27 02:02:11 INFO mapreduce.Job: map 33% reduce 0% 15/08/27 02:02:14 INFO mapreduce.Job: map 47% reduce 0% 15/08/27 02:02:17 INFO mapreduce.Job: map 62% reduce 0% 15/08/27 02:02:19 INFO mapreduce.Job: map 100% reduce 0% 15/08/27 02:02:24 INFO mapreduce.Job: map 100% reduce 100% 15/08/27 02:02:24 INFO mapreduce.Job: Job job_1440638983382_0001 completed successfully
参考:
http://www.powerxing.com/hadoop-build-project-by-shell/
相关文章推荐
- mysql单表数量极限和性能
- Mysqladmin Mysql 管理工具
- lvs+keepalived+mysql cluster实现负载均衡
- 8 个不得不说的 MySQL 陷阱
- Mysql数据库备份及恢复
- 8 个不得不说的 MySQL 陷阱
- 使用lvm 备份和恢复mysql数据库
- 如何安装mysql
- mysql 函数处理
- mysql免安装版window报1067错误解决
- mysql 修改server characterset latin1为 utf8(乱码问题)
- MYSQL的事务处理功能
- mysql插入数据中文乱码问题
- Ubuntu无法进入mysql,报ERROR 2002 (HY000): Can't connect to local MySQL server through socket …错误
- mysql 创建用户、 授权
- mysql数据库的汉字乱码问题
- 彻底卸载MySQL
- mysql入门
- mysql 没有rowid 怎么实现根据rowid回表呢?
- mysql 多列唯一索引在事务中select for update是不是行锁?