
Reading from and writing to a MySQL database with Hadoop MapReduce


To let MapReduce jobs access relational databases (MySQL, Oracle) directly, Hadoop provides two classes: DBInputFormat and DBOutputFormat. DBInputFormat reads rows from a database table into HDFS, and DBOutputFormat writes the result set produced by a MapReduce job back into a database table.
If a MapReduce job fails with java.io.IOException: com.mysql.jdbc.Driver, the program usually cannot find the MySQL JDBC driver jar. The fix is to make the driver jar visible to every tasktracker that runs the job.

There are two ways to add the jar:

(1) Copy the jar into ${HADOOP_HOME}/lib on every node and restart the cluster. This works, but it is the cruder approach.

(2) a) Upload the jar to the cluster: hadoop fs -put mysql-connector-java-5.1.0-bin.jar /hdfsPath/

b) Before submitting the job in the MapReduce code, add: DistributedCache.addFileToClassPath(new Path("/hdfsPath/mysql-connector-java-5.1.0-bin.jar"), conf); as sketched below.
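A minimal sketch of option (2b) in job-setup code. The helper class name AddDriverJar is only illustrative; the jar path is the one uploaded above, and the JobConf is the same one you later pass to JobClient.runJob.

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;

public class AddDriverJar {
    // Call this right before submitting the job with JobClient.runJob(conf).
    public static void addMysqlDriver(JobConf conf) throws java.io.IOException {
        // Make the MySQL driver jar that was uploaded to HDFS part of every task's classpath.
        DistributedCache.addFileToClassPath(
                new Path("/hdfsPath/mysql-connector-java-5.1.0-bin.jar"), conf);
    }
}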

Exporting MySQL data to HDFS

Creating the MySQL table and initializing data


DROP TABLE IF EXISTS `wu_testhadoop`;
CREATE TABLE `wu_testhadoop` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `title` varchar(255) DEFAULT NULL,
  `content` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8;

-- ----------------------------
-- Records of wu_testhadoop
-- ----------------------------
INSERT INTO `wu_testhadoop` VALUES ('1', '123', '122312');
INSERT INTO `wu_testhadoop` VALUES ('2', '123', '123456');

Defining the Hadoop data access classes

Once the MySQL table exists, we need to define how Hadoop accesses it.

Hadoop provides the org.apache.hadoop.io.Writable interface as a simple, efficient serialization protocol built on DataInput and DataOutput.

For database access, Hadoop also provides the org.apache.hadoop.mapred.lib.db.DBWritable interface: its write method sets values on a PreparedStatement, and its readFields method binds column values from a ResultSet read out of the database.

Both interfaces are used as shown below (the examples are taken from the Hadoop source).

Writable


public class MyWritable implements Writable {
    // Some data
    private int counter;
    private long timestamp;

    public void write(DataOutput out) throws IOException {
        out.writeInt(counter);
        out.writeLong(timestamp);
    }

    public void readFields(DataInput in) throws IOException {
        counter = in.readInt();
        timestamp = in.readLong();
    }

    public static MyWritable read(DataInput in) throws IOException {
        MyWritable w = new MyWritable();
        w.readFields(in);
        return w;
    }
}



DBWritable


public class MyWritable implements Writable, DBWritable {
    // Some data
    private int counter;
    private long timestamp;

    // Writable#write() implementation
    public void write(DataOutput out) throws IOException {
        out.writeInt(counter);
        out.writeLong(timestamp);
    }

    // Writable#readFields() implementation
    public void readFields(DataInput in) throws IOException {
        counter = in.readInt();
        timestamp = in.readLong();
    }

    public void write(PreparedStatement statement) throws SQLException {
        statement.setInt(1, counter);
        statement.setLong(2, timestamp);
    }

    public void readFields(ResultSet resultSet) throws SQLException {
        counter = resultSet.getInt(1);
        timestamp = resultSet.getLong(2);
    }
}

The record class mapped to the table


package com.wyg.hadoop.mysql.bean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.lib.db.DBWritable;

public class DBRecord implements Writable, DBWritable {
    private int id;
    private String title;
    private String content;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getTitle() {
        return title;
    }

    public void setTitle(String title) {
        this.title = title;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    @Override
    public void readFields(ResultSet set) throws SQLException {
        this.id = set.getInt("id");
        this.title = set.getString("title");
        this.content = set.getString("content");
    }

    @Override
    public void write(PreparedStatement pst) throws SQLException {
        pst.setInt(1, id);
        pst.setString(2, title);
        pst.setString(3, content);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.id = in.readInt();
        this.title = Text.readString(in);
        this.content = Text.readString(in);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(this.id);
        Text.writeString(out, this.title);
        Text.writeString(out, this.content);
    }

    @Override
    public String toString() {
        return this.id + " " + this.title + " " + this.content;
    }
}



Implementing the Mapper (the reducer will be Hadoop's IdentityReducer)


package com.wyg.hadoop.mysql.mapper;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

import com.wyg.hadoop.mysql.bean.DBRecord;

@SuppressWarnings("deprecation")
public class DBRecordMapper extends MapReduceBase implements Mapper<LongWritable, DBRecord, LongWritable, Text> {

    @Override
    public void map(LongWritable key, DBRecord value,
            OutputCollector<LongWritable, Text> collector, Reporter reporter)
            throws IOException {
        collector.collect(new LongWritable(value.getId()), new Text(value.toString()));
    }
}

Testing: reading MySQL through Hadoop and storing the data in HDFS


package com.wyg.hadoop.mysql.db;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;

import com.wyg.hadoop.mysql.bean.DBRecord;
import com.wyg.hadoop.mysql.mapper.DBRecordMapper;

public class DBAccess {

    public static void main(String[] args) throws IOException {
        JobConf conf = new JobConf(DBAccess.class);
        conf.setOutputKeyClass(LongWritable.class);
        conf.setOutputValueClass(Text.class);
        conf.setInputFormat(DBInputFormat.class);

        Path path = new Path("hdfs://192.168.44.129:9000/user/root/dbout");
        FileOutputFormat.setOutputPath(conf, path);

        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://your-ip:3306/your-database", "username", "password");
        String[] fields = {"id", "title", "content"};
        DBInputFormat.setInput(conf, DBRecord.class, "wu_testhadoop",
                null, "id", fields);

        conf.setMapperClass(DBRecordMapper.class);
        conf.setReducerClass(IdentityReducer.class);

        JobClient.runJob(conf);
    }
}

Running the program produces the following output:


15/08/11 16:46:18 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
15/08/11 16:46:18 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
15/08/11 16:46:18 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
15/08/11 16:46:19 INFO mapred.JobClient: Running job: job_local_0001
15/08/11 16:46:19 INFO mapred.MapTask: numReduceTasks: 1
15/08/11 16:46:19 INFO mapred.MapTask: io.sort.mb = 100
15/08/11 16:46:19 INFO mapred.MapTask: data buffer = 79691776/99614720
15/08/11 16:46:19 INFO mapred.MapTask: record buffer = 262144/327680
15/08/11 16:46:19 INFO mapred.MapTask: Starting flush of map output
15/08/11 16:46:19 INFO mapred.MapTask: Finished spill 0
15/08/11 16:46:19 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
15/08/11 16:46:19 INFO mapred.LocalJobRunner:
15/08/11 16:46:19 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.
15/08/11 16:46:19 INFO mapred.LocalJobRunner:
15/08/11 16:46:19 INFO mapred.Merger: Merging 1 sorted segments
15/08/11 16:46:19 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 48 bytes
15/08/11 16:46:19 INFO mapred.LocalJobRunner:
15/08/11 16:46:19 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
15/08/11 16:46:19 INFO mapred.LocalJobRunner:
15/08/11 16:46:19 INFO mapred.TaskRunner: Task attempt_local_0001_r_000000_0 is allowed to commit now
15/08/11 16:46:19 INFO mapred.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://192.168.44.129:9000/user/root/dbout
15/08/11 16:46:19 INFO mapred.LocalJobRunner: reduce > reduce
15/08/11 16:46:19 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.
15/08/11 16:46:20 INFO mapred.JobClient: map 100% reduce 100%
15/08/11 16:46:20 INFO mapred.JobClient: Job complete: job_local_0001
15/08/11 16:46:20 INFO mapred.JobClient: Counters: 14
15/08/11 16:46:20 INFO mapred.JobClient: FileSystemCounters
15/08/11 16:46:20 INFO mapred.JobClient: FILE_BYTES_READ=34606
15/08/11 16:46:20 INFO mapred.JobClient: FILE_BYTES_WRITTEN=69844
15/08/11 16:46:20 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=30
15/08/11 16:46:20 INFO mapred.JobClient: Map-Reduce Framework
15/08/11 16:46:20 INFO mapred.JobClient: Reduce input groups=2
15/08/11 16:46:20 INFO mapred.JobClient: Combine output records=0
15/08/11 16:46:20 INFO mapred.JobClient: Map input records=2
15/08/11 16:46:20 INFO mapred.JobClient: Reduce shuffle bytes=0
15/08/11 16:46:20 INFO mapred.JobClient: Reduce output records=2
15/08/11 16:46:20 INFO mapred.JobClient: Spilled Records=4
15/08/11 16:46:20 INFO mapred.JobClient: Map output bytes=42
15/08/11 16:46:20 INFO mapred.JobClient: Map input bytes=2
15/08/11 16:46:20 INFO mapred.JobClient: Combine input records=0
15/08/11 16:46:20 INFO mapred.JobClient: Map output records=2
15/08/11 16:46:20 INFO mapred.JobClient: Reduce input records=2

At the same time, a new dbout directory appears in HDFS; the file inside it holds the data read from the database table (the first column is the map output key, followed by the record's toString()):


1 1 123 122312
2 2 123 123456
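If you prefer to inspect the output from Java rather than the HDFS shell, a minimal sketch using the FileSystem API looks like this (it assumes the same NameNode URI as above and the reducer's default output file name part-00000; the class name CatDbout is only illustrative):

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class CatDbout {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Same NameNode as in the job; part-00000 is the single reducer's output file.
        FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.44.129:9000"), conf);
        FSDataInputStream in = fs.open(new Path("/user/root/dbout/part-00000"));
        IOUtils.copyBytes(in, System.out, 4096, true); // print the file and close the stream
    }
}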



Importing HDFS data into MySQL

Writing HDFS files into MySQL also relies on the DBRecord class above, because all database I/O goes through DBInputFormat and DBOutputFormat.

First we define the map and reduce implementations (the map parses the HDFS file; the reduce takes the map output and emits the records to be written):


package com.wyg.hadoop.mysql.mapper;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

import com.wyg.hadoop.mysql.bean.DBRecord;

public class WriteDB {

    // Map: parse each HDFS line of the form "key<TAB>id title content"
    public static class Map extends MapReduceBase implements
            Mapper<Object, Text, Text, DBRecord> {

        private final static DBRecord one = new DBRecord();
        private Text word = new Text();

        @Override
        public void map(Object key, Text value,
                OutputCollector<Text, DBRecord> output, Reporter reporter)
                throws IOException {
            String line = value.toString();
            String[] infos = line.split(" ");
            // infos[0] is "key<TAB>id"; take the part after the tab as the id
            String id = infos[0].split("\t")[1];
            one.setId(new Integer(id));
            one.setTitle(infos[1]);
            one.setContent(infos[2]);
            word.set(id);
            output.collect(word, one);
        }
    }

    // Reduce: emit one DBRecord per key; DBOutputFormat writes it to MySQL
    public static class Reduce extends MapReduceBase implements
            Reducer<Text, DBRecord, DBRecord, Text> {

        @Override
        public void reduce(Text key, Iterator<DBRecord> values,
                OutputCollector<DBRecord, Text> collector, Reporter reporter)
                throws IOException {
            DBRecord record = values.next();
            collector.collect(record, new Text());
        }
    }
}

Testing: importing HDFS data into the database


package com.wyg.hadoop.mysql.db;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.db.DBConfiguration;
import org.apache.hadoop.mapred.lib.db.DBOutputFormat;

import com.wyg.hadoop.mysql.bean.DBRecord;
import com.wyg.hadoop.mysql.mapper.WriteDB;

public class DBInsert {

    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(WriteDB.class);

        // Set the input and output formats
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(DBOutputFormat.class);

        // Without the next two lines the job fails, although the examples found online omit them.
        // Map output types: Text, DBRecord
        conf.setMapOutputKeyClass(Text.class);
        conf.setMapOutputValueClass(DBRecord.class);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(DBRecord.class);

        // Set the Map and Reduce classes
        conf.setMapperClass(WriteDB.Map.class);
        conf.setReducerClass(WriteDB.Reduce.class);

        // Set the input directory
        FileInputFormat.setInputPaths(conf, new Path("hdfs://192.168.44.129:9000/user/root/dbout"));

        // Configure the database connection
        DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
                "jdbc:mysql://your-ip:3306/your-database", "username", "password");
        String[] fields = {"id", "title", "content"};
        DBOutputFormat.setOutput(conf, "wu_testhadoop", fields);

        JobClient.runJob(conf);
    }
}

The test run produces the following output:


15/08/11 18:10:15 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
15/08/11 18:10:15 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
15/08/11 18:10:15 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
15/08/11 18:10:15 INFO mapred.FileInputFormat: Total input paths to process : 1
15/08/11 18:10:15 INFO mapred.JobClient: Running job: job_local_0001
15/08/11 18:10:15 INFO mapred.FileInputFormat: Total input paths to process : 1
15/08/11 18:10:15 INFO mapred.MapTask: numReduceTasks: 1
15/08/11 18:10:15 INFO mapred.MapTask: io.sort.mb = 100
15/08/11 18:10:15 INFO mapred.MapTask: data buffer = 79691776/99614720
15/08/11 18:10:15 INFO mapred.MapTask: record buffer = 262144/327680
15/08/11 18:10:15 INFO mapred.MapTask: Starting flush of map output
15/08/11 18:10:16 INFO mapred.MapTask: Finished spill 0
15/08/11 18:10:16 INFO mapred.TaskRunner: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
15/08/11 18:10:16 INFO mapred.LocalJobRunner: hdfs://192.168.44.129:9000/user/root/dbout/part-00000:0+30
15/08/11 18:10:16 INFO mapred.TaskRunner: Task 'attempt_local_0001_m_000000_0' done.
15/08/11 18:10:16 INFO mapred.LocalJobRunner:
15/08/11 18:10:16 INFO mapred.Merger: Merging 1 sorted segments
15/08/11 18:10:16 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 40 bytes
15/08/11 18:10:16 INFO mapred.LocalJobRunner:
15/08/11 18:10:16 INFO mapred.TaskRunner: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
15/08/11 18:10:16 INFO mapred.LocalJobRunner: reduce > reduce
15/08/11 18:10:16 INFO mapred.TaskRunner: Task 'attempt_local_0001_r_000000_0' done.
15/08/11 18:10:16 INFO mapred.JobClient: map 100% reduce 100%
15/08/11 18:10:16 INFO mapred.JobClient: Job complete: job_local_0001
15/08/11 18:10:16 INFO mapred.JobClient: Counters: 14
15/08/11 18:10:16 INFO mapred.JobClient: FileSystemCounters
15/08/11 18:10:16 INFO mapred.JobClient: FILE_BYTES_READ=34932
15/08/11 18:10:16 INFO mapred.JobClient: HDFS_BYTES_READ=60
15/08/11 18:10:16 INFO mapred.JobClient: FILE_BYTES_WRITTEN=70694
15/08/11 18:10:16 INFO mapred.JobClient: Map-Reduce Framework
15/08/11 18:10:16 INFO mapred.JobClient: Reduce input groups=2
15/08/11 18:10:16 INFO mapred.JobClient: Combine output records=0
15/08/11 18:10:16 INFO mapred.JobClient: Map input records=2
15/08/11 18:10:16 INFO mapred.JobClient: Reduce shuffle bytes=0
15/08/11 18:10:16 INFO mapred.JobClient: Reduce output records=2
15/08/11 18:10:16 INFO mapred.JobClient: Spilled Records=4
15/08/11 18:10:16 INFO mapred.JobClient: Map output bytes=34
15/08/11 18:10:16 INFO mapred.JobClient: Map input bytes=30
15/08/11 18:10:16 INFO mapred.JobClient: Combine input records=0
15/08/11 18:10:16 INFO mapred.JobClient: Map output records=2
15/08/11 18:10:16 INFO mapred.JobClient: Reduce input records=2

Before the test I emptied the table; after the job finishes you can see that two rows have been inserted into the database.

Running the job a second time fails with an error. That is expected, because we explicitly assign the id when importing, and rows with those ids already exist. If you leave the id out, you can keep inserting (see the sketch below).
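For illustration only (this variant is not part of the original code): one way to let MySQL assign the id is to drop the id column from the write path. In DBRecord, write(PreparedStatement) would then set only title and content, and the output field list in DBInsert would have to match:

// Hypothetical variant of DBRecord#write(PreparedStatement): omit the id column
// so MySQL assigns it via AUTO_INCREMENT and repeated runs do not collide.
// This assumes the output fields are reduced accordingly in DBInsert:
//   DBOutputFormat.setOutput(conf, "wu_testhadoop", new String[]{"title", "content"});
public void write(PreparedStatement pst) throws SQLException {
    pst.setString(1, title);
    pst.setString(2, content);
}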

Source code download

The source code has been uploaded and can be downloaded from download.csdn.net/detail/wuyinggui10000/8974585