Hadoop学习笔记之操作MySQL数据库
2014-06-05 12:06
429 查看
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.Iterator; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.lib.IdentityMapper; import org.apache.hadoop.mapred.lib.IdentityReducer; import org.apache.hadoop.mapred.lib.db.DBConfiguration; import org.apache.hadoop.mapred.lib.db.DBInputFormat; import org.apache.hadoop.mapred.lib.db.DBOutputFormat; import org.apache.hadoop.mapred.lib.db.DBWritable; public class hadoop_mysql_read { public static class StudentInfoRecord implements Writable, DBWritable { int id; String name; public StudentInfoRecord() { } @Override public void write(PreparedStatement statement) throws SQLException { statement.setInt(1, this.id); statement.setString(2, this.name); } @Override public void readFields(ResultSet resultSet) throws SQLException { this.id = resultSet.getInt(1); this.name = resultSet.getString(2); try { this.name=new String(this.name.getBytes(),"utf-8"); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override public void write(DataOutput out) throws IOException { out.writeInt(this.id); Text.writeString(out, this.name); } @Override public void readFields(DataInput in) throws IOException { this.id = in.readInt(); this.name = Text.readString(in); } @Override 
public String toString() { return "StudentInfoRecord [id=" + id + ", name=" + name + "]"; } } public static class DBMapper extends MapReduceBase implements Mapper<LongWritable, StudentInfoRecord, LongWritable, Text> { @Override public void map(LongWritable key, StudentInfoRecord value, OutputCollector<LongWritable, Text> output, Reporter reporter) throws IOException { System.out.println(value); output.collect(new LongWritable(value.id), new Text(value.name)); } } /** CREATE TABLE `studentinfo` ( `id` int(11) NOT NULL, `name` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=MyISAM DEFAULT CHARSET=utf8; */ public static void main(String[] args) throws IOException { JobConf conf=new JobConf(hadoop_mysql_read.class); conf.setOutputKeyClass(LongWritable.class); conf.setOutputValueClass(Text.class); conf.setInputFormat(DBInputFormat.class); FileOutputFormat.setOutputPath(conf, new Path("output")); //需要把mysql.jar放入hadoop lib目录 //jdbc:mysql://localhost:3306/library?useUnicode=true&characterEncoding=UTF-8 DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://192.168.1.9:3306/hadoop?useUnicode=true&characterEncoding=UTF-8","root","123456"); String[] fields={"id","name"}; DBInputFormat.setInput(conf, StudentInfoRecord.class, "studentinfo", null,"id",fields); conf.setMapperClass(DBMapper.class); conf.setReducerClass(IdentityReducer.class); JobClient.runJob(conf); } }
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.Iterator; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.Mapper; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hadoop.mapred.lib.IdentityMapper; import org.apache.hadoop.mapred.lib.db.DBConfiguration; import org.apache.hadoop.mapred.lib.db.DBOutputFormat; import org.apache.hadoop.mapred.lib.db.DBWritable; public class hadoop_mysql_write { public static class StudentInfoRecord implements Writable, DBWritable { int id; String name; public StudentInfoRecord() { } @Override public void write(PreparedStatement statement) throws SQLException { statement.setInt(1, this.id); statement.setString(2, this.name); } @Override public void readFields(ResultSet resultSet) throws SQLException { this.id = resultSet.getInt(1); this.name = resultSet.getString(2); System.out.println(toString()); } @Override public void write(DataOutput out) throws IOException { out.writeInt(this.id); Text.writeString(out, this.name); } @Override public void readFields(DataInput in) throws IOException { this.id = in.readInt(); this.name = Text.readString(in); } @Override public String toString() { return "StudentInfoRecord [id=" + id + ", name=" + name + "]"; } } // 定义了MyReducer类实现了map和reduce public static class MyReducer extends MapReduceBase implements Reducer<LongWritable, Text, StudentInfoRecord, Text> { @Override public void reduce(LongWritable key, 
Iterator<Text> values, OutputCollector<StudentInfoRecord, Text> output, Reporter reporter) throws IOException { //从hua.bcp文件中读取数据 插入数据库 String[] splits = values.next().toString().split("\t"); StudentInfoRecord r = new StudentInfoRecord(); r.id = Integer.parseInt(splits[0]); r.name = splits[1]; r.name=new String(r.name.getBytes(),"utf-8"); System.out.println(r.name); output.collect(r, new Text(r.name)); } } /** CREATE TABLE `studentinfo` ( `id` int(11) NOT NULL, `name` varchar(255) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=MyISAM DEFAULT CHARSET=utf8; */ public static void main(String[] args) throws IOException { JobConf conf=new JobConf(hadoop_mysql_write.class); conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(DBOutputFormat.class); //从hua.bcp文件中读取数据 插入数据库 FileInputFormat.setInputPaths(conf, new Path("hua.bcp")); //需要把mysql.jar放入hadoop lib目录 //jdbc:mysql://localhost:3306/library?useUnicode=true&characterEncoding=UTF-8 DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://192.168.1.9:3306/hadoop?useUnicode=true&characterEncoding=UTF-8","root","123456"); //设置数据库表 DBOutputFormat.setOutput(conf, "studentinfo", "id","name"); conf.setMapperClass(IdentityMapper.class); conf.setReducerClass(MyReducer.class); JobClient.runJob(conf); } }
相关文章推荐
- Hadoop2.6.0学习笔记(七)MapReduce操作MySQL数据库
- hadoop学习笔记之-pig安装及操作实例
- MYSQL数据库 - 学习笔记2 - C语言操作MYSQL
- 【PHP+MySQL学习笔记】php操作MySQL数据库中语句
- Python的学习笔记(四)(MySQL数据库的操作)
- [Python] 学习笔记之MySQL数据库操作
- 【MYSQL数据库】MYSQL学习笔记-mysql分区基本操作
- python学习笔记--操作mysql数据库
- hadoop学习笔记(HDFS的文件操作)
- MySQL学习笔记1----MySQL数据库基本操作
- Hadoop学习笔记0002——HDFS文件操作
- Hadoop学习笔记之(二):实验Hadoop的文件块复制删除操作感受强大的容灾性
- Hadoop学习笔记之操作hive
- Hadoop学习笔记_操作篇之一:HDFS操作
- mysql数据库学习笔记之常用操作命令
- MYSQL数据库 - 学习笔记1 - 数据库基本操作
- Spark Hadoop集群部署与Spark操作HDFS运行详解---Spark学习笔记10
- hadoop学习笔记1.使用shell和JAVA API操作HDFS
- hadoop学习笔记3:shell下的hdfs操作
- hadoop学习笔记(11)——hbase shell简单操作示例