您的位置:首页 > 数据库 > MySQL

MapReduce编程实例(二)-MR2操作MySQL

2014-07-01 10:40 134 查看
MR2中如果需要从数据库中读取数据或者向数据库写入数据,需要自己实现Writable和DBWritable两个接口,同时还需通过DBConfiguration对数据库的连接信息(驱动类、URL、用户名、密码)做相应配置。

下面这个例子是Hadoop自带的,只是将数据库改为MySQL,在执行之前需要将Java连接MySQL的驱动上传到每台机器的${HADOOP_HOME}/share/hadoop/common路径下

这里需要说明两点:

1、一般不需要MR直接操作数据库,因为MR并发执行,很容易把数据库跑崩,此例只展示如何操作

2、如果必须要操作数据库,可以将数据库里面的数据导出到文本文件中,计算完成后,再通过SQL Load(批量导入)的方式保存到数据库

例子比较简单,通过注释可以看懂

[java] view
plaincopyprint?





package com.test.mr2;

/**

*MapReduce操作数据库实例

*计算页面访问总量,Access表中保存了每个页面访问明细,Pageview表保存了每个页面被浏览的总次数

*sum Access表的每个url,将计算结果写到Pageview

*/

import java.io.DataInput;

import java.io.DataOutput;

import java.io.IOException;

import java.sql.Connection;

import java.sql.DriverManager;

import java.sql.PreparedStatement;

import java.sql.ResultSet;

import java.sql.SQLException;

import java.sql.Statement;

import java.util.Random;

import org.apache.commons.logging.Log;

import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.NullWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.io.Writable;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;

import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;

import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;

import org.apache.hadoop.mapreduce.lib.db.DBWritable;

import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;

import org.apache.hadoop.util.StringUtils;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

public class DBOperator extends Configured implements Tool {

// Logger for this class.
private static final Log LOG = LogFactory.getLog(DBOperator.class);

// JDBC connection opened by createConnection() and released by shutdown().
private Connection connection;

// Set to true by initialize() once the tables have been created and populated.
private boolean initialized = false;

// Column names of the Access table

private static final String[] AccessFieldNames = { "url", "referrer",

"time" };

// Column names of the Pageview table

private static final String[] PageviewFieldNames = { "url", "pageview" };

// Default JDBC URL; run() uses it unless a URL is passed on the command line.
private static final String DB_URL = "jdbc:mysql://192.168.1.182:3306/hadoop?createDatabaseIfNotExist=true";

// Default JDBC driver class; run() uses it unless a driver is passed on the command line.
private static final String DRIVER_CLASS = "com.mysql.jdbc.Driver";

// NOTE(review): the database credentials are hard-coded; consider externalizing them.
private static final String DB_USER = "root";

private static final String DB_PASSWORD = "root";

/**
 * Loads the JDBC driver and opens the connection stored in {@code connection}.
 * Auto-commit is switched off, so callers commit explicitly.
 *
 * @param driverClassName fully qualified name of the JDBC driver class to load
 * @param url             JDBC URL to connect to
 * @throws Exception if the driver class cannot be loaded or the connection fails
 */
private void createConnection(String driverClassName, String url)
        throws Exception {
    Class.forName(driverClassName);

    // Publish the connection to the field before configuring it, so that it
    // is still reachable for cleanup if the configuration call below fails.
    final Connection opened = DriverManager.getConnection(url, DB_USER,
            DB_PASSWORD);
    this.connection = opened;
    opened.setAutoCommit(false);
}

/**
 * Commits any pending work and closes the JDBC connection.
 *
 * <p>This is best-effort cleanup: failures are logged and never propagated,
 * so it is safe to call from a {@code finally} block without masking the
 * exception that is already in flight. It does nothing if no connection was
 * ever opened.
 */
private void shutdown() {
    if (connection == null) {
        // createConnection() never succeeded; nothing to commit or close.
        return;
    }
    try {
        connection.commit();
    } catch (Exception ex) {
        LOG.warn("Exception occurred while committing before close :"
                + StringUtils.stringifyException(ex));
    } finally {
        try {
            connection.close();
        } catch (Exception ex) {
            LOG.warn("Exception occurred while closing connection :"
                    + StringUtils.stringifyException(ex));
        }
    }
}

/**
 * Prepares the database on first use: opens the connection, drops any
 * existing tables, recreates them and fills Access with sample rows.
 * Later calls on the same instance are no-ops.
 *
 * @param driverClassName fully qualified name of the JDBC driver class
 * @param url             JDBC URL of the database
 * @throws Exception if connecting, creating the tables or populating them fails
 */
private void initialize(String driverClassName, String url)
        throws Exception {
    if (this.initialized) {
        return;
    }

    createConnection(driverClassName, url);
    dropTables();
    createTables();
    populateAccess();

    // Only mark as initialized after every step above has succeeded.
    this.initialized = true;
}

/**
 * Drops the Access and Pageview tables if they exist, so that every run
 * starts from empty tables.
 *
 * <p>Dropping is best-effort: a failure is logged and not propagated.
 */
private void dropTables() {
    // MySQL expects IF EXISTS before the table name; the HSQLDB word order
    // ("DROP TABLE Access IF EXISTS") is a syntax error on MySQL.
    String dropAccess = "DROP TABLE IF EXISTS Access";
    String dropPageview = "DROP TABLE IF EXISTS Pageview";

    Statement st = null;
    try {
        st = connection.createStatement();
        st.executeUpdate(dropAccess);
        st.executeUpdate(dropPageview);
        connection.commit();
    } catch (SQLException ex) {
        // Do not swallow silently: stale tables make the following inserts
        // fail on duplicate primary keys, which is hard to diagnose.
        LOG.warn("Exception occurred while dropping tables :"
                + StringUtils.stringifyException(ex));
    } finally {
        if (st != null) {
            try {
                st.close();
            } catch (SQLException ignored) {
                // Nothing useful can be done if closing the statement fails.
            }
        }
    }
}

/**
 * Creates the Access and Pageview tables when they do not exist yet and
 * commits the change.
 *
 * @throws SQLException if a statement cannot be created or executed
 */
private void createTables() throws SQLException {
    // Access first, then Pageview.
    final String[] ddl = {
        "CREATE TABLE IF NOT EXISTS Access("
                + "url VARCHAR(100) NOT NULL,"
                + " referrer VARCHAR(100),"
                + " time BIGINT NOT NULL, "
                + " PRIMARY KEY (url, time))",
        "CREATE TABLE IF NOT EXISTS Pageview("
                + "url VARCHAR(100) NOT NULL,"
                + " pageview BIGINT NOT NULL, "
                + " PRIMARY KEY (url))"
    };

    final Statement stmt = connection.createStatement();
    try {
        for (int i = 0; i < ddl.length; i++) {
            stmt.executeUpdate(ddl[i]);
        }
        connection.commit();
    } finally {
        stmt.close();
    }
}

/**
 * Fills the Access table with randomly generated sample visits.
 *
 * <p>Between 50 and 99 rows are inserted. Each row records the current page,
 * the page the visitor came from (null after a random jump) and the row
 * index as its time. All rows are committed together; on an SQL error the
 * transaction is rolled back and the error rethrown.
 *
 * @throws SQLException if preparing, executing or committing the inserts fails
 */
private void populateAccess() throws SQLException {
    // Pages in the site.
    final String[] pages = { "/a", "/b", "/c", "/d", "/e", "/f", "/g", "/h",
            "/i", "/j" };

    // linkMatrix[i] lists the indexes of the pages that page i links to.
    final int[][] linkMatrix = { { 1, 5, 7 }, { 0, 7, 4, 6 },
            { 0, 1, 7, 8 }, { 0, 2, 4, 6, 7, 9 }, { 0, 1 },
            { 0, 3, 5, 9 }, { 0 }, { 0, 1, 3 }, { 0, 2, 6 },
            { 0, 2, 6 } };

    // A jump to a random page happens with probability 15 / 100.
    final int probabilityPrecision = 100;
    final int newPageProbability = 15;

    PreparedStatement insert = null;
    try {
        insert = connection
                .prepareStatement("INSERT INTO Access(url, referrer, time)"
                        + " VALUES (?, ?, ?)");

        final Random rng = new Random();
        final int rowCount = rng.nextInt(50) + 50;

        // A mini model of a user browsing, a la pagerank.
        int current = rng.nextInt(pages.length);
        String cameFrom = null;

        int row = 0;
        while (row < rowCount) {
            insert.setString(1, pages[current]);
            insert.setString(2, cameFrom);
            insert.setLong(3, row);
            insert.execute();

            final boolean jumpToRandomPage =
                    rng.nextInt(probabilityPrecision) < newPageProbability;
            if (jumpToRandomPage) {
                current = rng.nextInt(pages.length);
                cameFrom = null;
            } else {
                // Follow one of the outgoing links of the current page.
                cameFrom = pages[current];
                final int[] links = linkMatrix[current];
                current = links[rng.nextInt(links.length)];
            }
            row++;
        }

        connection.commit();
    } catch (SQLException ex) {
        connection.rollback();
        throw ex;
    } finally {
        if (insert != null) {
            insert.close();
        }
    }
}

/**
 * Checks the job result: the number of rows in Access must equal the sum of
 * the pageview column in Pageview, and must not be zero.
 *
 * @return true when both totals match and are non-zero
 * @throws SQLException if either query fails
 */
private boolean verify() throws SQLException {
    Statement stmt = null;
    ResultSet result = null;
    try {
        stmt = connection.createStatement();

        result = stmt.executeQuery("SELECT COUNT(*) FROM Access");
        result.next();
        final long totalPageview = result.getLong(1);

        result = stmt.executeQuery("SELECT SUM(pageview) FROM Pageview");
        result.next();
        final long sumPageview = result.getLong(1);

        LOG.info("totalPageview=" + totalPageview);
        LOG.info("sumPageview=" + sumPageview);

        // An empty Access table never counts as a correct result.
        if (totalPageview == 0) {
            return false;
        }
        return totalPageview == sumPageview;
    } finally {
        if (stmt != null) {
            stmt.close();
        }
        if (result != null) {
            result.close();
        }
    }
}

/**
 * One row of the Access table: the visited url, the referring page and the
 * visit time.
 *
 * <p>The record can be read from and written to a JDBC statement
 * ({@link DBWritable}) and serialized to a binary stream ({@link Writable}).
 */
static class AccessRecord implements Writable, DBWritable {

    String url;

    // May be null: the referrer column is nullable.
    String referrer;

    long time;

    /**
     * Restores the fields from a stream produced by {@link #write(DataOutput)}.
     *
     * @param in stream to read from
     * @throws IOException if reading fails
     */
    public void readFields(DataInput in) throws IOException {
        this.url = Text.readString(in);
        // A presence flag precedes the referrer so that null survives a round trip.
        this.referrer = in.readBoolean() ? Text.readString(in) : null;
        this.time = in.readLong();
    }

    /**
     * Serializes the fields to the stream.
     *
     * @param out stream to write to
     * @throws IOException if writing fails
     */
    public void write(DataOutput out) throws IOException {
        Text.writeString(out, url);
        // Text.writeString cannot encode null, so flag the presence explicitly.
        final boolean hasReferrer = referrer != null;
        out.writeBoolean(hasReferrer);
        if (hasReferrer) {
            Text.writeString(out, referrer);
        }
        out.writeLong(time);
    }

    /**
     * Loads the fields from the current row; columns are read by position
     * as url, referrer, time.
     *
     * @param resultSet result set positioned on the row to read
     * @throws SQLException if a column cannot be read
     */
    public void readFields(ResultSet resultSet) throws SQLException {
        this.url = resultSet.getString(1);
        this.referrer = resultSet.getString(2);
        this.time = resultSet.getLong(3);
    }

    /**
     * Binds the fields to the statement parameters in the order url,
     * referrer, time.
     *
     * @param statement statement whose parameters are set
     * @throws SQLException if a parameter cannot be set
     */
    public void write(PreparedStatement statement) throws SQLException {
        statement.setString(1, url);
        statement.setString(2, referrer);
        statement.setLong(3, time);
    }

    @Override
    public String toString() {
        return url + " " + referrer + " " + time;
    }
}

/**
 * One row of the Pageview table: a url and the number of times it was viewed.
 *
 * <p>The record can be read from and written to a JDBC statement
 * ({@link DBWritable}) and serialized to a binary stream ({@link Writable}).
 */
static class PageviewRecord implements Writable, DBWritable {

    String url;

    long pageview;

    /**
     * Creates an empty record to be filled by one of the {@code readFields}
     * methods. Hadoop instantiates Writable classes reflectively, which
     * needs a no-argument constructor.
     */
    public PageviewRecord() {
    }

    /**
     * Creates a record with the given values.
     *
     * @param url      the page url
     * @param pageview number of views of that page
     */
    public PageviewRecord(String url, long pageview) {
        this.url = url;
        this.pageview = pageview;
    }

    /**
     * Restores the fields from a stream produced by {@link #write(DataOutput)}.
     *
     * @param in stream to read from
     * @throws IOException if reading fails
     */
    public void readFields(DataInput in) throws IOException {
        this.url = Text.readString(in);
        this.pageview = in.readLong();
    }

    /**
     * Serializes the fields to the stream.
     *
     * @param out stream to write to
     * @throws IOException if writing fails
     */
    public void write(DataOutput out) throws IOException {
        Text.writeString(out, url);
        out.writeLong(pageview);
    }

    /**
     * Loads the fields from the current row; columns are read by position
     * as url, pageview.
     *
     * @param resultSet result set positioned on the row to read
     * @throws SQLException if a column cannot be read
     */
    public void readFields(ResultSet resultSet) throws SQLException {
        this.url = resultSet.getString(1);
        this.pageview = resultSet.getLong(2);
    }

    /**
     * Binds the fields to the statement parameters in the order url, pageview.
     *
     * @param statement statement whose parameters are set
     * @throws SQLException if a parameter cannot be set
     */
    public void write(PreparedStatement statement) throws SQLException {
        statement.setString(1, url);
        statement.setLong(2, pageview);
    }

    @Override
    public String toString() {
        return url + " " + pageview;
    }
}

/**
 * Mapper that takes the url out of each AccessRecord and emits the pair
 * (url, 1) for the reducer to sum.
 */
static class PageviewMapper extends
        Mapper<LongWritable, AccessRecord, Text, LongWritable> {

    // The constant count emitted for every visit.
    LongWritable ONE = new LongWritable(1L);

    /**
     * Prints the incoming record to standard output and emits (url, 1).
     *
     * @param key     key supplied by the input format
     * @param value   the Access row being processed
     * @param context job context used to emit the output pair
     */
    @Override
    public void map(LongWritable key, AccessRecord value, Context context)
            throws IOException, InterruptedException {
        final String trace = "key:" + key + ",value:" + value.toString();
        System.out.println(trace);

        final Text visitedUrl = new Text(value.url);
        context.write(visitedUrl, ONE);
    }
}

/**
 * Reducer that adds up the counts received for a url and emits the total
 * as a PageviewRecord.
 */
static class PageviewReducer extends
        Reducer<Text, LongWritable, PageviewRecord, NullWritable> {

    // Placeholder output value; only the key carries data.
    NullWritable n = NullWritable.get();

    /**
     * Sums all counts of one url and writes (PageviewRecord, null).
     *
     * @param key     the url
     * @param values  the counts collected for that url
     * @param context job context used to emit the result
     */
    @Override
    public void reduce(Text key, Iterable<LongWritable> values,
            Context context) throws IOException, InterruptedException {
        long total = 0L;
        for (LongWritable count : values) {
            total = total + count.get();
        }

        final PageviewRecord record = new PageviewRecord(key.toString(),
                total);
        context.write(record, n);
    }
}

/**
 * Prepares the sample tables, runs the page-view counting job and verifies
 * the result against the database.
 *
 * @param args optional; when at least two values are given, {@code args[0]}
 *             is the JDBC driver class name and {@code args[1]} the JDBC
 *             URL, otherwise DRIVER_CLASS and DB_URL are used
 * @return 0 if the job completed successfully, 1 otherwise
 * @throws RuntimeException if verification of the Pageview table fails
 * @throws Exception if database setup or job submission fails
 */
public int run(String[] args) throws Exception {
    String driverClassName = DRIVER_CLASS;
    String url = DB_URL;
    if (args.length > 1) {
        driverClassName = args[0];
        url = args[1];
    }

    // Everything that may open the connection runs inside the try block, so
    // the connection is released even when table setup fails half-way.
    try {
        initialize(driverClassName, url);

        Configuration conf = getConf();

        // Give the job the database settings used for reading and writing.
        DBConfiguration.configureDB(conf, driverClassName, url, DB_USER,
                DB_PASSWORD);

        // Job.getInstance replaces the deprecated Job(Configuration) constructor.
        Job job = Job.getInstance(conf);
        job.setJobName("Count Pageviews of URLs");
        job.setJarByClass(DBOperator.class);

        job.setMapperClass(PageviewMapper.class);
        job.setCombinerClass(LongSumReducer.class);
        job.setReducerClass(PageviewReducer.class);

        // Input: rows of Access ordered by url. Output: rows of Pageview.
        DBInputFormat.setInput(job, AccessRecord.class, "Access", null, "url",
                AccessFieldNames);
        DBOutputFormat.setOutput(job, "Pageview", PageviewFieldNames);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(PageviewRecord.class);
        job.setOutputValueClass(NullWritable.class);

        int ret = job.waitForCompletion(true) ? 0 : 1;

        boolean correct = verify();
        if (!correct) {
            throw new RuntimeException("Evaluation was not correct!");
        }
        System.out.println("calculate success");
        return ret;
    } finally {
        shutdown();
    }
}

/**
 * Command-line entry point: runs the tool through ToolRunner and exits the
 * JVM with the tool's return code.
 *
 * @param args command-line arguments passed on to the tool
 * @throws Exception if the tool fails
 */
public static void main(String[] args) throws Exception {
    final DBOperator tool = new DBOperator();
    System.exit(ToolRunner.run(tool, args));
}

}

MySQL字符串追加和截取

一些Python脚本保存在数据库中,今天需要将MySQL数据库中的“Python XXXX.py”修改为带绝对路径的形式,即“/usr/bin/python XXXX.py”,这里就需要用到MySQL的字符串连接函数CONCAT

CONCAT(str1,str2)

返回字符串str1、str2连接后的结果(任一参数为NULL时返回NULL)。

例如原表:

mysql> select * from student;

+----+---------+--------+------+--------+-------+

| id | sid | name | cid | cname | score |

+----+---------+--------+------+--------+-------+

| 1 | 2005001 | 张三 | 0001 | 数学 | 69 |

| 2 | 2005002 | 李四 | 0001 | 数学 | 89 |

| 3 | 2005001 | 张三 | 0001 | 数学 | 69 |

+----+---------+--------+------+--------+-------+

使用CONCAT函数查询

mysql> select CONCAT('H',name) from student;

+------------------+

| CONCAT('H',name) |

+------------------+

| H张三 |

| H李四 |

| H张三 |

+------------------+

使用CONCAT函数更新

mysql> update student set name =CONCAT('A',name) where id=1;

Query OK, 1 row affected (0.00 sec)

Rows matched: 1 Changed: 1 Warnings: 0

mysql> select * from student;

+----+---------+---------+------+--------+-------+

| id | sid | name | cid | cname | score |

+----+---------+---------+------+--------+-------+

| 1 | 2005001 | A张三 | 0001 | 数学 | 69 |

| 2 | 2005002 | 李四 | 0001 | 数学 | 89 |

| 3 | 2005001 | 张三 | 0001 | 数学 | 69 |

+----+---------+---------+------+--------+-------+

MySQL还内置了一些非常有用的字符串截取函数:

1、left(str, length) 截取左边的字符串

说明:left(被截取字段,截取长度)

mysql> select Left(name,2) from student where id=1;

+--------------+

| Left(name,2) |

+--------------+

| A张 |

+--------------+

2、right(str, length) 截取右边的字符串

说明:right(被截取字段,截取长度)

mysql> select right(name,2) from student where id=1;

+---------------+

| right(name,2) |

+---------------+

| 张三 |

+---------------+

3、 substring(str, pos, length) 从指定位置截取指定长度的字符串

说明:substring(被截取字段,从第几位开始截取,截取长度)

mysql> select substring(name,2,1) from student where id=1;

+---------------------+

| substring(name,2,1) |

+---------------------+

| 张 |

+---------------------+
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: