
MapReduce: Writing HDFS Data to MySQL Line by Line

2015-08-27 17:07

This post shows how to use a map-only MapReduce job to read a CSV file from HDFS and insert each record into a MySQL table over JDBC.

Code

package hdfsToSQL;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class hdfsToSQL {

    static String driver = "com.mysql.jdbc.Driver";
    // static String url = "jdbc:mysql://192.168.1.58:3306/powerloaddata?user=dbuser&password=lfmysql";
    static String url = "jdbc:mysql://master:3306/test?user=root";

    public static class hdfsToSQLMapper extends Mapper<Object, Text, Text, IntWritable> {

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            // Each input value is one CSV line; split it into fields.
            String line = value.toString();
            String[] words = line.split(",");
            if (words.length == 3) {
                Connection conn = null;
                Statement stmt = null;
                try {
                    // Load the JDBC driver, open a connection, and insert the row.
                    // The three fields are assumed to be numeric; string columns would need quoting.
                    Class.forName(driver);
                    conn = DriverManager.getConnection(url);
                    stmt = conn.createStatement();
                    String sql = "insert into DataPowerPrediction values("
                            + words[0] + "," + words[1] + "," + words[2] + ")";
                    stmt.executeUpdate(sql);
                } catch (SQLException e) {
                    e.printStackTrace();
                } catch (ClassNotFoundException e) {
                    e.printStackTrace();
                } finally {
                    // Release JDBC resources; guard against a connection that never opened.
                    try {
                        if (stmt != null) stmt.close();
                        if (conn != null) conn.close();
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: hdfsToSQL <in> <out>");
            System.exit(2);
        }

        // Map-only job: no combiner or reducer is set, since all writes
        // go to MySQL from the map side.
        Job job = Job.getInstance(conf, "hdfsToSQL");
        job.setJarByClass(hdfsToSQL.class);
        job.setMapperClass(hdfsToSQLMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
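
Opening a fresh JDBC connection for every input record, as the mapper above does, is the main cost of this approach. A minimal sketch of a more economical variant (untested here; the class name BatchedMapper is mine) opens one connection per map task in setup(), reuses a PreparedStatement, and releases both in cleanup():

public static class BatchedMapper extends Mapper<Object, Text, Text, IntWritable> {
    // Requires: import java.sql.PreparedStatement;
    private Connection conn;
    private PreparedStatement ps;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        try {
            // One connection per map task instead of one per record.
            Class.forName(driver);
            conn = DriverManager.getConnection(url);
            ps = conn.prepareStatement("insert into DataPowerPrediction values(?,?,?)");
        } catch (Exception e) {
            throw new IOException("could not open MySQL connection", e);
        }
    }

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String[] words = value.toString().split(",");
        if (words.length == 3) {
            try {
                // Bind the fields as strings and let MySQL convert them;
                // parameter binding avoids the quoting issues of concatenated SQL.
                ps.setString(1, words[0]);
                ps.setString(2, words[1]);
                ps.setString(3, words[2]);
                ps.executeUpdate();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        try {
            if (ps != null) ps.close();
            if (conn != null) conn.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

To try it, register it with job.setMapperClass(BatchedMapper.class) instead of hdfsToSQLMapper.class; everything else in main stays the same.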


Running the job

/usr/hadoop/bin/hadoop jar hdfsToSQL.jar hdfsToSQL.hdfsToSQL hdfs://master:9000/user/root/data/foreastdatatest.csv hdfs://master:9000/user/root/output/hdfsToSQL4
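
Note that the map tasks can only load com.mysql.jdbc.Driver if the MySQL connector jar is available to them, either packaged inside hdfsToSQL.jar or shipped via the -libjars option that GenericOptionsParser understands. A hypothetical invocation (the connector jar path is just an example):

/usr/hadoop/bin/hadoop jar hdfsToSQL.jar hdfsToSQL.hdfsToSQL -libjars mysql-connector-java.jar hdfs://master:9000/user/root/data/foreastdatatest.csv hdfs://master:9000/user/root/output/hdfsToSQL4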


Result

15/08/27 02:02:08 INFO mapreduce.Job:  map 19% reduce 0%
15/08/27 02:02:11 INFO mapreduce.Job:  map 33% reduce 0%
15/08/27 02:02:14 INFO mapreduce.Job:  map 47% reduce 0%
15/08/27 02:02:17 INFO mapreduce.Job:  map 62% reduce 0%
15/08/27 02:02:19 INFO mapreduce.Job:  map 100% reduce 0%
15/08/27 02:02:24 INFO mapreduce.Job:  map 100% reduce 100%
15/08/27 02:02:24 INFO mapreduce.Job: Job job_1440638983382_0001 completed successfully
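
To confirm that the rows actually landed in MySQL, a quick standalone JDBC check (a hypothetical helper, assuming the same url and table as above) can count them:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CountRows {
    public static void main(String[] args) throws Exception {
        // Same driver, url, and table as the MapReduce job above.
        Class.forName("com.mysql.jdbc.Driver");
        try (Connection conn = DriverManager.getConnection("jdbc:mysql://master:3306/test?user=root");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("select count(*) from DataPowerPrediction")) {
            if (rs.next()) {
                System.out.println("rows in DataPowerPrediction: " + rs.getInt(1));
            }
        }
    }
}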


Reference:

http://www.powerxing.com/hadoop-build-project-by-shell/