
Hadoop study notes: working with a MySQL database

2014-06-05 12:06

Both programs below use the old mapred API. The first, hadoop_mysql_read, reads rows of a MySQL table through DBInputFormat and writes them out as text; the second, hadoop_mysql_write, parses a tab-separated text file and inserts its records into the same table through DBOutputFormat.
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
import org.apache.hadoop.mapred.lib.db.DBWritable;

public class hadoop_mysql_read {

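// One record of the studentinfo table. DBWritable maps it to and from JDBC statements; Writable lets Hadoop serialize it between map and reduce.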
public static class StudentInfoRecord implements Writable, DBWritable {
int id;
String name;

public StudentInfoRecord() {

}

@Override
public void write(PreparedStatement statement) throws SQLException {

statement.setInt(1, this.id);
statement.setString(2, this.name);
}

@Override
public void readFields(ResultSet resultSet) throws SQLException {

this.id = resultSet.getInt(1);
this.name = resultSet.getString(2);
try {
// re-encode the name as UTF-8; with characterEncoding=UTF-8 in the JDBC URL this extra round-trip is normally not needed, but it is kept from the original notes
this.name = new String(this.name.getBytes(), "utf-8");

} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}

}

@Override
public void write(DataOutput out) throws IOException {

out.writeInt(this.id);
Text.writeString(out, this.name);

}

@Override
public void readFields(DataInput in) throws IOException {

this.id = in.readInt();
this.name = Text.readString(in);

}

@Override
public String toString() {
return "StudentInfoRecord [id=" + id + ", name=" + name + "]";
}
}
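
// Map phase: DBInputFormat supplies a LongWritable record index as the key and one StudentInfoRecord as the value; emit (id, name) pairs.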
public static class DBMapper extends MapReduceBase implements Mapper<LongWritable, StudentInfoRecord, LongWritable, Text>
{

@Override
public void map(LongWritable key, StudentInfoRecord value,
OutputCollector<LongWritable, Text> output, Reporter reporter)
throws IOException {
System.out.println(value);
output.collect(new LongWritable(value.id), new Text(value.name));
}

}

/**
CREATE TABLE `studentinfo` (
`id` int(11) NOT NULL,
`name` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8;
*/
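// Job driver: query the studentinfo table through DBInputFormat; IdentityReducer passes the (id, name) pairs through to text output under the "output" directory.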
public static void main(String[] args) throws IOException {

JobConf conf=new JobConf(hadoop_mysql_read.class);
conf.setOutputKeyClass(LongWritable.class);
conf.setOutputValueClass(Text.class);
conf.setInputFormat(DBInputFormat.class);
FileOutputFormat.setOutputPath(conf, new Path("output"));
// the MySQL JDBC driver jar (mysql.jar) needs to be placed in Hadoop's lib directory
//jdbc:mysql://localhost:3306/library?useUnicode=true&characterEncoding=UTF-8
DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://192.168.1.9:3306/hadoop?useUnicode=true&characterEncoding=UTF-8","root","123456");
String[] fields = {"id", "name"};
// setInput(job, record class, table name, WHERE conditions (none here), ORDER BY column, selected fields)
DBInputFormat.setInput(conf, StudentInfoRecord.class, "studentinfo", null, "id", fields);

conf.setMapperClass(DBMapper.class);
conf.setReducerClass(IdentityReducer.class);
JobClient.runJob(conf);

}

}
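A note on the driver jar: instead of copying mysql.jar onto every node, the connector can also be shipped with the job through DistributedCache. Below is a minimal sketch, not from the original notes; the class name, HDFS path and connector version are illustrative, and the jar must already have been uploaded to that path.

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;

public class MysqlDriverOnClasspath {
    // adds the MySQL connector jar (already uploaded to HDFS) to the job's task classpath
    public static void addDriver(JobConf conf) throws java.io.IOException {
        DistributedCache.addFileToClassPath(new Path("/lib/mysql-connector-java-5.1.25.jar"), conf);
    }
}

Called from main() before JobClient.runJob(conf), this would spare copying the jar into every node's lib directory. The second program goes in the opposite direction: it reads the text file hua.bcp and inserts its records into the same studentinfo table.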
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBOutputFormat;
import org.apache.hadoop.mapred.lib.db.DBWritable;

public class hadoop_mysql_write {

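// Same record type as in the read job; here DBOutputFormat calls write(PreparedStatement) to bind id and name to the generated INSERT.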
public static class StudentInfoRecord implements Writable, DBWritable {
int id;
String name;

public StudentInfoRecord() {

}

@Override
public void write(PreparedStatement statement) throws SQLException {

statement.setInt(1, this.id);
statement.setString(2, this.name);
}

@Override
public void readFields(ResultSet resultSet) throws SQLException {

this.id = resultSet.getInt(1);
this.name = resultSet.getString(2);
System.out.println(toString());
}

@Override
public void write(DataOutput out) throws IOException {

out.writeInt(this.id);
Text.writeString(out, this.name);

}

@Override
public void readFields(DataInput in) throws IOException {

this.id = in.readInt();
this.name = Text.readString(in);

}

@Override
public String toString() {
return "StudentInfoRecord [id=" + id + ", name=" + name + "]";
}

}

// MyReducer parses each input line and emits a StudentInfoRecord as the output key; DBOutputFormat writes that key to the database
public static class MyReducer extends MapReduceBase implements
Reducer<LongWritable, Text, StudentInfoRecord, Text> {
@Override
public void reduce(LongWritable key, Iterator<Text> values,
OutputCollector<StudentInfoRecord, Text> output,
Reporter reporter) throws IOException {
// each value is one tab-separated "id name" line read from hua.bcp; parse it and insert it into the database
String[] splits = values.next().toString().split("\t");

StudentInfoRecord r = new StudentInfoRecord();
r.id = Integer.parseInt(splits[0]);
r.name = splits[1];
// re-encode the name as UTF-8 before it is written; kept as in the original notes
r.name = new String(r.name.getBytes(), "utf-8");
System.out.println(r.name);
output.collect(r, new Text(r.name));
}
}
/**
CREATE TABLE `studentinfo` (
`id` int(11) NOT NULL,
`name` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8;
*/
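// Job driver: read hua.bcp line by line with TextInputFormat; IdentityMapper passes the lines through, and MyReducer turns each line into a row inserted via DBOutputFormat.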
public static void main(String[] args) throws IOException {

JobConf conf=new JobConf(hadoop_mysql_write.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(DBOutputFormat.class);
// read the input records from the file hua.bcp and insert them into the database
FileInputFormat.setInputPaths(conf, new Path("hua.bcp"));
// the MySQL JDBC driver jar (mysql.jar) needs to be placed in Hadoop's lib directory
//jdbc:mysql://localhost:3306/library?useUnicode=true&characterEncoding=UTF-8
DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://192.168.1.9:3306/hadoop?useUnicode=true&characterEncoding=UTF-8","root","123456");
// target table and column names for the generated INSERT statement
DBOutputFormat.setOutput(conf, "studentinfo", "id", "name");
conf.setMapperClass(IdentityMapper.class);
conf.setReducerClass(MyReducer.class);

JobClient.runJob(conf);

}

}
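After the write job finishes, the inserted rows can be checked with plain JDBC. This is a minimal sketch, not part of the original notes; it only reuses the connection settings, table and columns shown above.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CheckStudentInfo {
    public static void main(String[] args) throws Exception {
        // same driver, URL and credentials as used by the MapReduce jobs above
        Class.forName("com.mysql.jdbc.Driver");
        Connection conn = DriverManager.getConnection(
                "jdbc:mysql://192.168.1.9:3306/hadoop?useUnicode=true&characterEncoding=UTF-8",
                "root", "123456");
        Statement st = conn.createStatement();
        ResultSet rs = st.executeQuery("SELECT id, name FROM studentinfo ORDER BY id");
        while (rs.next()) {
            // print each row as "id<TAB>name"
            System.out.println(rs.getInt("id") + "\t" + rs.getString("name"));
        }
        rs.close();
        st.close();
        conn.close();
    }
}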