MapReduce编程实例(二)-MR2操作MySQL
2014-07-01 10:40
134 查看
MR2中如果需要从数据库中读取或者写入数据,需要自己实现Writable和DBWritable两个接口,同时还需在DBConfiguration对数据库的元信息做相应配置。
下面这个例子是Hadoop自带的,只是将数据库改为MySQL,在执行之前需要将Java连接MySQL的驱动上传到每台机器的${HADOOP_HOME}/share/hadoop/common路径下
这里需要说明两点:
1、一般不需要MR直接操作数据库,因为MR并发执行,很容易把数据库跑崩,此例只展示如何操作
2、如果必须要操作数据库,可以将数据库里面的数据导出到文本文件中,计算完成后,再通过SQLLoad的方式保存到数据库
例子比较简单,通过注释可以看懂
[java] view
plaincopyprint?
package com.test.mr2;
/**
*MapReduce操作数据库实例
*计算页面访问总量,Access表中保存了每个页面访问明细,Pageview表保存了每个页面被浏览的总次数
*sum Access表的每个url,将计算结果写到Pageview
*/
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class DBOperator extends Configured implements Tool {
// Logger shared by the driver-side code of this tool.
private static final Log LOG = LogFactory.getLog(DBOperator.class);
// JDBC connection used for table setup, test-data population, verification and shutdown.
private Connection connection;
// Set to true at the end of initialize(); prevents the setup from running twice.
private boolean initialized = false;
// Column names of the Access table
private static final String[] AccessFieldNames = { "url", "referrer",
"time" };
// Column names of the Pageview table
private static final String[] PageviewFieldNames = { "url", "pageview" };
// Default JDBC URL and driver; run() replaces both when two command-line arguments are given.
private static final String DB_URL = "jdbc:mysql://192.168.1.182:3306/hadoop?createDatabaseIfNotExist=true";
private static final String DRIVER_CLASS = "com.mysql.jdbc.Driver";
// NOTE(review): credentials are hard-coded here; acceptable for a demo only.
private static final String DB_USER = "root";
private static final String DB_PASSWORD = "root";
/**
 * Loads the JDBC driver and opens the connection used by this tool.
 * Auto-commit is switched off, so every caller commits explicitly.
 *
 * @param driverClassName fully qualified name of the JDBC driver class
 * @param url JDBC URL of the database to connect to
 * @throws Exception if the driver class cannot be loaded or the connection cannot be opened
 */
private void createConnection(String driverClassName, String url)
        throws Exception {
    Class.forName(driverClassName);
    this.connection = DriverManager.getConnection(url, DB_USER, DB_PASSWORD);
    this.connection.setAutoCommit(false);
}
/**
 * Commits pending work and closes the database connection.
 *
 * Best effort: failures are logged as warnings and never propagated, so this
 * method is safe to call from a finally block. When no connection was ever
 * opened there is nothing to do and the method returns immediately.
 */
private void shutdown() {
    if (connection == null) {
        return;
    }
    try {
        connection.commit();
    } catch (Throwable ex) {
        LOG.warn("Exception occurred while committing connection :"
                + StringUtils.stringifyException(ex));
    } finally {
        // Always attempt to close, even when the commit failed.
        try {
            connection.close();
        } catch (Throwable ex) {
            LOG.warn("Exception occurred while closing connection :"
                    + StringUtils.stringifyException(ex));
        }
    }
}
/**
 * Prepares the database once: opens the connection, recreates the tables
 * and fills the Access table with test data. Later calls are no-ops.
 *
 * @param driverClassName fully qualified name of the JDBC driver class
 * @param url JDBC URL of the database
 * @throws Exception if connecting, creating tables or populating data fails
 */
private void initialize(String driverClassName, String url)
        throws Exception {
    if (this.initialized) {
        return;
    }
    createConnection(driverClassName, url);
    dropTables();
    createTables();
    populateAccess();
    this.initialized = true;
}
/**
 * Drops the Access and Pageview tables if they exist.
 *
 * Best effort: a failure is logged and otherwise ignored so that a fresh
 * database (or a missing privilege) does not abort the setup.
 */
private void dropTables() {
    // MySQL requires IF EXISTS before the table name; the form
    // "DROP TABLE <name> IF EXISTS" is rejected with a syntax error.
    String dropAccess = "DROP TABLE IF EXISTS Access";
    String dropPageview = "DROP TABLE IF EXISTS Pageview";
    Statement st = null;
    try {
        st = connection.createStatement();
        st.executeUpdate(dropAccess);
        st.executeUpdate(dropPageview);
        connection.commit();
    } catch (SQLException ex) {
        LOG.warn("Exception occurred while dropping tables :"
                + StringUtils.stringifyException(ex));
    } finally {
        // Release the statement on both the success and the failure path.
        if (st != null) {
            try {
                st.close();
            } catch (Exception ignored) {
                // Nothing useful can be done if closing the statement fails.
            }
        }
    }
}
/**
 * Creates the Access and Pageview tables when they are missing and commits.
 *
 * @throws SQLException if a statement cannot be created or executed
 */
private void createTables() throws SQLException {
    final String accessDdl = "CREATE TABLE IF NOT EXISTS Access("
            + "url VARCHAR(100) NOT NULL,"
            + " referrer VARCHAR(100),"
            + " time BIGINT NOT NULL, "
            + " PRIMARY KEY (url, time))";
    final String pageviewDdl = "CREATE TABLE IF NOT EXISTS Pageview("
            + "url VARCHAR(100) NOT NULL,"
            + " pageview BIGINT NOT NULL, "
            + " PRIMARY KEY (url))";
    Statement ddlStatement = connection.createStatement();
    try {
        for (String ddl : new String[] { accessDdl, pageviewDdl }) {
            ddlStatement.executeUpdate(ddl);
        }
        connection.commit();
    } finally {
        ddlStatement.close();
    }
}
/**
 * Fills the Access table with randomly generated test data.
 *
 * Simulates one visitor clicking through a ten-page site for 50 to 99
 * steps and inserts one row per step; the step index is used as the time
 * column. All rows are committed together, or rolled back on failure.
 *
 * @throws SQLException if preparing, executing, committing or rolling back fails
 */
private void populateAccess() throws SQLException {
    final int PROBABILITY_PRECISION = 100; // 1 / 100
    final int NEW_PAGE_PROBABILITY = 15; // 15 / 100
    // Pages in the site.
    final String[] pages = { "/a", "/b", "/c", "/d", "/e", "/f", "/g", "/h",
            "/i", "/j" };
    // linkMatrix[i] lists the indexes of the pages that page i links to.
    final int[][] linkMatrix = { { 1, 5, 7 }, { 0, 7, 4, 6, },
            { 0, 1, 7, 8 }, { 0, 2, 4, 6, 7, 9 }, { 0, 1 },
            { 0, 3, 5, 9 }, { 0 }, { 0, 1, 3 }, { 0, 2, 6 },
            { 0, 2, 6 } };
    PreparedStatement insert = null;
    try {
        insert = connection.prepareStatement(
                "INSERT INTO Access(url, referrer, time) VALUES (?, ?, ?)");
        Random rng = new Random();
        int visitCount = rng.nextInt(50) + 50;
        // A mini model of user browsing, a la PageRank.
        int current = rng.nextInt(pages.length);
        String cameFrom = null;
        for (int step = 0; step < visitCount; step++) {
            insert.setString(1, pages[current]);
            insert.setString(2, cameFrom);
            insert.setLong(3, step);
            insert.execute();
            // With probability NEW_PAGE_PROBABILITY / PROBABILITY_PRECISION the
            // visitor jumps to a random page; otherwise a link is followed.
            boolean followLink =
                    rng.nextInt(PROBABILITY_PRECISION) >= NEW_PAGE_PROBABILITY;
            if (followLink) {
                cameFrom = pages[current];
                int[] outgoing = linkMatrix[current];
                current = outgoing[rng.nextInt(outgoing.length)];
            } else {
                current = rng.nextInt(pages.length);
                cameFrom = null;
            }
        }
        connection.commit();
    } catch (SQLException ex) {
        connection.rollback();
        throw ex;
    } finally {
        if (insert != null) {
            insert.close();
        }
    }
}
/**
 * Checks the job result: the number of rows in Access must equal the sum
 * of the pageview column in Pageview, and must not be zero.
 *
 * @return true when both totals match and are non-zero
 * @throws SQLException if one of the queries fails
 */
private boolean verify() throws SQLException {
    long totalPageview = querySingleLong("SELECT COUNT(*) FROM Access");
    long sumPageview = querySingleLong("SELECT SUM(pageview) FROM Pageview");
    LOG.info("totalPageview=" + totalPageview);
    LOG.info("sumPageview=" + sumPageview);
    return totalPageview == sumPageview && totalPageview != 0;
}

/**
 * Runs a query and returns the first column of its first row as a long.
 * The result set is closed before the statement, and both are closed on
 * every path.
 *
 * @param sql query expected to yield a single numeric value
 * @return the value read, or 0 when the query returns no row
 * @throws SQLException if the query fails
 */
private long querySingleLong(String sql) throws SQLException {
    Statement st = connection.createStatement();
    try {
        ResultSet rs = st.executeQuery(sql);
        try {
            return rs.next() ? rs.getLong(1) : 0L;
        } finally {
            rs.close();
        }
    } finally {
        st.close();
    }
}
/**
* Access表的记录对象
*/
static class AccessRecord implements Writable, DBWritable {
String url;
String referrer;
long time;
/*
* 从输入流中反序列化需要的属性
*/
public void readFields(DataInput in) throws IOException {
this.url = Text.readString(in);
this.referrer = Text.readString(in);
this.time = in.readLong();
}
/*
* 将属性序列化到输出流中
*/
public void write(DataOutput out) throws IOException {
Text.writeString(out, url);
Text.writeString(out, referrer);
out.writeLong(time);
}
public void readFields(ResultSet resultSet) throws SQLException {
this.url = resultSet.getString(1);
this.referrer = resultSet.getString(2);
this.time = resultSet.getLong(3);
}
public void write(PreparedStatement statement) throws SQLException {
statement.setString(1, url);
statement.setString(2, referrer);
statement.setLong(3, time);
}
@Override
public String toString() {
return url + " " + referrer +" "+time;
}
}
/**
* Pageview表的记录对象
*/
static class PageviewRecord implements Writable, DBWritable {
String url;
long pageview;
public PageviewRecord(String url, long pageview) {
this.url = url;
this.pageview = pageview;
}
public void readFields(DataInput in) throws IOException {
this.url = Text.readString(in);
this.pageview = in.readLong();
}
public void write(DataOutput out) throws IOException {
Text.writeString(out, url);
out.writeLong(pageview);
}
public void readFields(ResultSet resultSet) throws SQLException {
this.url = resultSet.getString(1);
this.pageview = resultSet.getLong(2);
}
public void write(PreparedStatement statement) throws SQLException {
statement.setString(1, url);
statement.setLong(2, pageview);
}
@Override
public String toString() {
return url + " " + pageview;
}
}
/**
 * Mapper: extracts the url from each AccessRecord and emits <url, 1>
 * for the reducer to sum.
 */
static class PageviewMapper extends
        Mapper<LongWritable, AccessRecord, Text, LongWritable> {
    final LongWritable ONE = new LongWritable(1L);
    // Reused for every record instead of allocating a new Text per call.
    private final Text outKey = new Text();

    @Override
    public void map(LongWritable key, AccessRecord value, Context context)
            throws IOException, InterruptedException {
        // Demo trace of every input record on the task's stdout.
        System.out.println("key:" + key + ",value:" + value.toString());
        outKey.set(value.url);
        context.write(outKey, ONE);
    }
}
/**
 * Reducer: adds up the counts received for one url and emits a
 * PageviewRecord holding the url and its total as the output key.
 */
static class PageviewReducer extends
        Reducer<Text, LongWritable, PageviewRecord, NullWritable> {
    NullWritable n = NullWritable.get();

    @Override
    public void reduce(Text key, Iterable<LongWritable> values,
            Context context) throws IOException, InterruptedException {
        long total = 0L;
        for (LongWritable count : values) {
            total = total + count.get();
        }
        PageviewRecord record = new PageviewRecord(key.toString(), total);
        context.write(record, n);
    }
}
/**
 * Prepares the database, runs the pageview-counting job and verifies the result.
 *
 * @param args optional; when at least two values are given, args[0] is the
 *             JDBC driver class name and args[1] the JDBC URL
 * @return 0 when the job succeeded, 1 otherwise
 * @throws RuntimeException if the verification of the result fails
 * @throws Exception if the setup or the job submission fails
 */
public int run(String[] args) throws Exception {
    String driverClassName = DRIVER_CLASS;
    String url = DB_URL;
    if (args.length > 1) {
        driverClassName = args[0];
        url = args[1];
    }
    int ret;
    try {
        // Inside the try block so that the connection is released even when
        // the setup fails half-way.
        initialize(driverClassName, url);
        Configuration conf = getConf();
        // Tell the MR framework how to reach the database for reading and writing.
        DBConfiguration.configureDB(conf, driverClassName, url, DB_USER,
                DB_PASSWORD);
        // Job.getInstance replaces the deprecated Job constructor.
        Job job = Job.getInstance(conf);
        job.setJobName("Count Pageviews of URLs");
        job.setJarByClass(DBOperator.class);
        job.setMapperClass(PageviewMapper.class);
        job.setCombinerClass(LongSumReducer.class);
        job.setReducerClass(PageviewReducer.class);
        // Input: all rows of Access (no condition), ordered by url.
        DBInputFormat.setInput(job, AccessRecord.class, "Access", null, "url",
                AccessFieldNames);
        // Output: rows inserted into Pageview.
        DBOutputFormat.setOutput(job, "Pageview", PageviewFieldNames);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(PageviewRecord.class);
        job.setOutputValueClass(NullWritable.class);
        ret = job.waitForCompletion(true) ? 0 : 1;
        boolean correct = verify();
        if (!correct) {
            throw new RuntimeException("Evaluation was not correct!");
        } else {
            System.out.println("calculate success");
        }
    } finally {
        shutdown();
    }
    return ret;
}
/**
 * Command-line entry point: runs the tool through ToolRunner and exits
 * with the status it returns.
 */
public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new DBOperator(), args));
}
}
MySQL字符串追加和截取
一些Python脚本保存在数据库中,今天需要将MySQL数据库中的“Python XXXX.py”修改为绝对路径,即“/usr/bin/python XXXX.py”,这里就需要用到MySQL的字符串连接函数CONCAT
CONCAT(str1,str2)
返回字符串str1,str2连接后的结果,
例如原表:
mysql> select * from student;
+----+---------+--------+------+--------+-------+
| id | sid | name | cid | cname | score |
+----+---------+--------+------+--------+-------+
| 1 | 2005001 | 张三 | 0001 | 数学 | 69 |
| 2 | 2005002 | 李四 | 0001 | 数学 | 89 |
| 3 | 2005001 | 张三 | 0001 | 数学 | 69 |
+----+---------+--------+------+--------+-------+
使用CONCAT函数查询
mysql> select CONCAT('H',name) from student;
+------------------+
| CONCAT('H',name) |
+------------------+
| H张三 |
| H李四 |
| H张三 |
+------------------+
使用CONCAT函数更新
mysql> update student set name =CONCAT('A',name) where id=1;
Query OK, 1 row affected (0.00 sec)
Rows matched: 1 Changed: 1 Warnings: 0
mysql> select * from student;
+----+---------+---------+------+--------+-------+
| id | sid | name | cid | cname | score |
+----+---------+---------+------+--------+-------+
| 1 | 2005001 | A张三 | 0001 | 数学 | 69 |
| 2 | 2005002 | 李四 | 0001 | 数学 | 89 |
| 3 | 2005001 | 张三 | 0001 | 数学 | 69 |
+----+---------+---------+------+--------+-------+
MySQL还内置了一些非常有用的字符串截取函数:
1、left(str, length) 截取左边的字符串
说明:left(被截取字段,截取长度)
mysql> select Left(name,2) from student where id=1;
+--------------+
| Left(name,2) |
+--------------+
| A张 |
+--------------+
2、right(str, length) 截取右边的字符串
说明:right(被截取字段,截取长度)
mysql> select right(name,2) from student where id=1;
+---------------+
| right(name,2) |
+---------------+
| 张三 |
+---------------+
3、 substring(str, pos, length) 从指定位置截取指定长度的字符串
说明:substring(被截取字段,从第几位开始截取,截取长度)
mysql> select substring(name,2,1) from student where id=1;
+---------------------+
| substring(name,2,1) |
+---------------------+
| 张 |
+---------------------+
下面这个例子是Hadoop自带的,只是将数据库改为MySQL,在执行之前需要将Java连接MySQL的驱动上传到每台机器的${HADOOP_HOME}/share/hadoop/common路径下
这里需要说明两点:
1、一般不需要MR直接操作数据库,因为MR并发执行,很容易把数据库跑崩,此例只展示如何操作
2、如果必须要操作数据库,可以将数据库里面的数据导出到文本文件中,计算完成后,再通过SQLLoad的方式保存到数据库
例子比较简单,通过注释可以看懂
[java] view
plaincopyprint?
package com.test.mr2;
/**
*MapReduce操作数据库实例
*计算页面访问总量,Access表中保存了每个页面访问明细,Pageview表保存了每个页面被浏览的总次数
*sum Access表的每个url,将计算结果写到Pageview
*/
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.reduce.LongSumReducer;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class DBOperator extends Configured implements Tool {
// Logger shared by the driver-side code of this tool.
private static final Log LOG = LogFactory.getLog(DBOperator.class);
// JDBC connection used for table setup, test-data population, verification and shutdown.
private Connection connection;
// Set to true at the end of initialize(); prevents the setup from running twice.
private boolean initialized = false;
// Column names of the Access table
private static final String[] AccessFieldNames = { "url", "referrer",
"time" };
// Column names of the Pageview table
private static final String[] PageviewFieldNames = { "url", "pageview" };
// Default JDBC URL and driver; run() replaces both when two command-line arguments are given.
private static final String DB_URL = "jdbc:mysql://192.168.1.182:3306/hadoop?createDatabaseIfNotExist=true";
private static final String DRIVER_CLASS = "com.mysql.jdbc.Driver";
// NOTE(review): credentials are hard-coded here; acceptable for a demo only.
private static final String DB_USER = "root";
private static final String DB_PASSWORD = "root";
/**
 * Loads the JDBC driver and opens the connection used by this tool.
 * Auto-commit is switched off, so every caller commits explicitly.
 *
 * @param driverClassName fully qualified name of the JDBC driver class
 * @param url JDBC URL of the database to connect to
 * @throws Exception if the driver class cannot be loaded or the connection cannot be opened
 */
private void createConnection(String driverClassName, String url)
        throws Exception {
    Class.forName(driverClassName);
    this.connection = DriverManager.getConnection(url, DB_USER, DB_PASSWORD);
    this.connection.setAutoCommit(false);
}
/**
 * Commits pending work and closes the database connection.
 *
 * Best effort: failures are logged as warnings and never propagated, so this
 * method is safe to call from a finally block. When no connection was ever
 * opened there is nothing to do and the method returns immediately.
 */
private void shutdown() {
    if (connection == null) {
        return;
    }
    try {
        connection.commit();
    } catch (Throwable ex) {
        LOG.warn("Exception occurred while committing connection :"
                + StringUtils.stringifyException(ex));
    } finally {
        // Always attempt to close, even when the commit failed.
        try {
            connection.close();
        } catch (Throwable ex) {
            LOG.warn("Exception occurred while closing connection :"
                    + StringUtils.stringifyException(ex));
        }
    }
}
/**
 * Prepares the database once: opens the connection, recreates the tables
 * and fills the Access table with test data. Later calls are no-ops.
 *
 * @param driverClassName fully qualified name of the JDBC driver class
 * @param url JDBC URL of the database
 * @throws Exception if connecting, creating tables or populating data fails
 */
private void initialize(String driverClassName, String url)
        throws Exception {
    if (this.initialized) {
        return;
    }
    createConnection(driverClassName, url);
    dropTables();
    createTables();
    populateAccess();
    this.initialized = true;
}
/**
 * Drops the Access and Pageview tables if they exist.
 *
 * Best effort: a failure is logged and otherwise ignored so that a fresh
 * database (or a missing privilege) does not abort the setup.
 */
private void dropTables() {
    // MySQL requires IF EXISTS before the table name; the form
    // "DROP TABLE <name> IF EXISTS" is rejected with a syntax error.
    String dropAccess = "DROP TABLE IF EXISTS Access";
    String dropPageview = "DROP TABLE IF EXISTS Pageview";
    Statement st = null;
    try {
        st = connection.createStatement();
        st.executeUpdate(dropAccess);
        st.executeUpdate(dropPageview);
        connection.commit();
    } catch (SQLException ex) {
        LOG.warn("Exception occurred while dropping tables :"
                + StringUtils.stringifyException(ex));
    } finally {
        // Release the statement on both the success and the failure path.
        if (st != null) {
            try {
                st.close();
            } catch (Exception ignored) {
                // Nothing useful can be done if closing the statement fails.
            }
        }
    }
}
/**
 * Creates the Access and Pageview tables when they are missing and commits.
 *
 * @throws SQLException if a statement cannot be created or executed
 */
private void createTables() throws SQLException {
    final String accessDdl = "CREATE TABLE IF NOT EXISTS Access("
            + "url VARCHAR(100) NOT NULL,"
            + " referrer VARCHAR(100),"
            + " time BIGINT NOT NULL, "
            + " PRIMARY KEY (url, time))";
    final String pageviewDdl = "CREATE TABLE IF NOT EXISTS Pageview("
            + "url VARCHAR(100) NOT NULL,"
            + " pageview BIGINT NOT NULL, "
            + " PRIMARY KEY (url))";
    Statement ddlStatement = connection.createStatement();
    try {
        for (String ddl : new String[] { accessDdl, pageviewDdl }) {
            ddlStatement.executeUpdate(ddl);
        }
        connection.commit();
    } finally {
        ddlStatement.close();
    }
}
/**
 * Fills the Access table with randomly generated test data.
 *
 * Simulates one visitor clicking through a ten-page site for 50 to 99
 * steps and inserts one row per step; the step index is used as the time
 * column. All rows are committed together, or rolled back on failure.
 *
 * @throws SQLException if preparing, executing, committing or rolling back fails
 */
private void populateAccess() throws SQLException {
    final int PROBABILITY_PRECISION = 100; // 1 / 100
    final int NEW_PAGE_PROBABILITY = 15; // 15 / 100
    // Pages in the site.
    final String[] pages = { "/a", "/b", "/c", "/d", "/e", "/f", "/g", "/h",
            "/i", "/j" };
    // linkMatrix[i] lists the indexes of the pages that page i links to.
    final int[][] linkMatrix = { { 1, 5, 7 }, { 0, 7, 4, 6, },
            { 0, 1, 7, 8 }, { 0, 2, 4, 6, 7, 9 }, { 0, 1 },
            { 0, 3, 5, 9 }, { 0 }, { 0, 1, 3 }, { 0, 2, 6 },
            { 0, 2, 6 } };
    PreparedStatement insert = null;
    try {
        insert = connection.prepareStatement(
                "INSERT INTO Access(url, referrer, time) VALUES (?, ?, ?)");
        Random rng = new Random();
        int visitCount = rng.nextInt(50) + 50;
        // A mini model of user browsing, a la PageRank.
        int current = rng.nextInt(pages.length);
        String cameFrom = null;
        for (int step = 0; step < visitCount; step++) {
            insert.setString(1, pages[current]);
            insert.setString(2, cameFrom);
            insert.setLong(3, step);
            insert.execute();
            // With probability NEW_PAGE_PROBABILITY / PROBABILITY_PRECISION the
            // visitor jumps to a random page; otherwise a link is followed.
            boolean followLink =
                    rng.nextInt(PROBABILITY_PRECISION) >= NEW_PAGE_PROBABILITY;
            if (followLink) {
                cameFrom = pages[current];
                int[] outgoing = linkMatrix[current];
                current = outgoing[rng.nextInt(outgoing.length)];
            } else {
                current = rng.nextInt(pages.length);
                cameFrom = null;
            }
        }
        connection.commit();
    } catch (SQLException ex) {
        connection.rollback();
        throw ex;
    } finally {
        if (insert != null) {
            insert.close();
        }
    }
}
/**
 * Checks the job result: the number of rows in Access must equal the sum
 * of the pageview column in Pageview, and must not be zero.
 *
 * @return true when both totals match and are non-zero
 * @throws SQLException if one of the queries fails
 */
private boolean verify() throws SQLException {
    long totalPageview = querySingleLong("SELECT COUNT(*) FROM Access");
    long sumPageview = querySingleLong("SELECT SUM(pageview) FROM Pageview");
    LOG.info("totalPageview=" + totalPageview);
    LOG.info("sumPageview=" + sumPageview);
    return totalPageview == sumPageview && totalPageview != 0;
}

/**
 * Runs a query and returns the first column of its first row as a long.
 * The result set is closed before the statement, and both are closed on
 * every path.
 *
 * @param sql query expected to yield a single numeric value
 * @return the value read, or 0 when the query returns no row
 * @throws SQLException if the query fails
 */
private long querySingleLong(String sql) throws SQLException {
    Statement st = connection.createStatement();
    try {
        ResultSet rs = st.executeQuery(sql);
        try {
            return rs.next() ? rs.getLong(1) : 0L;
        } finally {
            rs.close();
        }
    } finally {
        st.close();
    }
}
/**
* Access表的记录对象
*/
static class AccessRecord implements Writable, DBWritable {
String url;
String referrer;
long time;
/*
* 从输入流中反序列化需要的属性
*/
public void readFields(DataInput in) throws IOException {
this.url = Text.readString(in);
this.referrer = Text.readString(in);
this.time = in.readLong();
}
/*
* 将属性序列化到输出流中
*/
public void write(DataOutput out) throws IOException {
Text.writeString(out, url);
Text.writeString(out, referrer);
out.writeLong(time);
}
public void readFields(ResultSet resultSet) throws SQLException {
this.url = resultSet.getString(1);
this.referrer = resultSet.getString(2);
this.time = resultSet.getLong(3);
}
public void write(PreparedStatement statement) throws SQLException {
statement.setString(1, url);
statement.setString(2, referrer);
statement.setLong(3, time);
}
@Override
public String toString() {
return url + " " + referrer +" "+time;
}
}
/**
* Pageview表的记录对象
*/
static class PageviewRecord implements Writable, DBWritable {
String url;
long pageview;
public PageviewRecord(String url, long pageview) {
this.url = url;
this.pageview = pageview;
}
public void readFields(DataInput in) throws IOException {
this.url = Text.readString(in);
this.pageview = in.readLong();
}
public void write(DataOutput out) throws IOException {
Text.writeString(out, url);
out.writeLong(pageview);
}
public void readFields(ResultSet resultSet) throws SQLException {
this.url = resultSet.getString(1);
this.pageview = resultSet.getLong(2);
}
public void write(PreparedStatement statement) throws SQLException {
statement.setString(1, url);
statement.setLong(2, pageview);
}
@Override
public String toString() {
return url + " " + pageview;
}
}
/**
 * Mapper: extracts the url from each AccessRecord and emits <url, 1>
 * for the reducer to sum.
 */
static class PageviewMapper extends
        Mapper<LongWritable, AccessRecord, Text, LongWritable> {
    final LongWritable ONE = new LongWritable(1L);
    // Reused for every record instead of allocating a new Text per call.
    private final Text outKey = new Text();

    @Override
    public void map(LongWritable key, AccessRecord value, Context context)
            throws IOException, InterruptedException {
        // Demo trace of every input record on the task's stdout.
        System.out.println("key:" + key + ",value:" + value.toString());
        outKey.set(value.url);
        context.write(outKey, ONE);
    }
}
/**
 * Reducer: adds up the counts received for one url and emits a
 * PageviewRecord holding the url and its total as the output key.
 */
static class PageviewReducer extends
        Reducer<Text, LongWritable, PageviewRecord, NullWritable> {
    NullWritable n = NullWritable.get();

    @Override
    public void reduce(Text key, Iterable<LongWritable> values,
            Context context) throws IOException, InterruptedException {
        long total = 0L;
        for (LongWritable count : values) {
            total = total + count.get();
        }
        PageviewRecord record = new PageviewRecord(key.toString(), total);
        context.write(record, n);
    }
}
/**
 * Prepares the database, runs the pageview-counting job and verifies the result.
 *
 * @param args optional; when at least two values are given, args[0] is the
 *             JDBC driver class name and args[1] the JDBC URL
 * @return 0 when the job succeeded, 1 otherwise
 * @throws RuntimeException if the verification of the result fails
 * @throws Exception if the setup or the job submission fails
 */
public int run(String[] args) throws Exception {
    String driverClassName = DRIVER_CLASS;
    String url = DB_URL;
    if (args.length > 1) {
        driverClassName = args[0];
        url = args[1];
    }
    int ret;
    try {
        // Inside the try block so that the connection is released even when
        // the setup fails half-way.
        initialize(driverClassName, url);
        Configuration conf = getConf();
        // Tell the MR framework how to reach the database for reading and writing.
        DBConfiguration.configureDB(conf, driverClassName, url, DB_USER,
                DB_PASSWORD);
        // Job.getInstance replaces the deprecated Job constructor.
        Job job = Job.getInstance(conf);
        job.setJobName("Count Pageviews of URLs");
        job.setJarByClass(DBOperator.class);
        job.setMapperClass(PageviewMapper.class);
        job.setCombinerClass(LongSumReducer.class);
        job.setReducerClass(PageviewReducer.class);
        // Input: all rows of Access (no condition), ordered by url.
        DBInputFormat.setInput(job, AccessRecord.class, "Access", null, "url",
                AccessFieldNames);
        // Output: rows inserted into Pageview.
        DBOutputFormat.setOutput(job, "Pageview", PageviewFieldNames);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        job.setOutputKeyClass(PageviewRecord.class);
        job.setOutputValueClass(NullWritable.class);
        ret = job.waitForCompletion(true) ? 0 : 1;
        boolean correct = verify();
        if (!correct) {
            throw new RuntimeException("Evaluation was not correct!");
        } else {
            System.out.println("calculate success");
        }
    } finally {
        shutdown();
    }
    return ret;
}
/**
 * Command-line entry point: runs the tool through ToolRunner and exits
 * with the status it returns.
 */
public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new DBOperator(), args));
}
}
MySQL字符串追加和截取
一些Python脚本保存在数据库中,今天需要将MySQL数据库中的“Python XXXX.py”修改为绝对路径,即“/usr/bin/python XXXX.py”,这里就需要用到MySQL的字符串连接函数CONCAT
CONCAT(str1,str2)
返回字符串str1,str2连接后的结果,
例如原表:
mysql> select * from student;
+----+---------+--------+------+--------+-------+
| id | sid | name | cid | cname | score |
+----+---------+--------+------+--------+-------+
| 1 | 2005001 | 张三 | 0001 | 数学 | 69 |
| 2 | 2005002 | 李四 | 0001 | 数学 | 89 |
| 3 | 2005001 | 张三 | 0001 | 数学 | 69 |
+----+---------+--------+------+--------+-------+
使用CONCAT函数查询
mysql> select CONCAT('H',name) from student;
+------------------+
| CONCAT('H',name) |
+------------------+
| H张三 |
| H李四 |
| H张三 |
+------------------+
使用CONCAT函数更新
mysql> update student set name =CONCAT('A',name) where id=1;
Query OK, 1 row affected (0.00 sec)
Rows matched: 1 Changed: 1 Warnings: 0
mysql> select * from student;
+----+---------+---------+------+--------+-------+
| id | sid | name | cid | cname | score |
+----+---------+---------+------+--------+-------+
| 1 | 2005001 | A张三 | 0001 | 数学 | 69 |
| 2 | 2005002 | 李四 | 0001 | 数学 | 89 |
| 3 | 2005001 | 张三 | 0001 | 数学 | 69 |
+----+---------+---------+------+--------+-------+
MySQL还内置了一些非常有用的字符串截取函数:
1、left(str, length) 截取左边的字符串
说明:left(被截取字段,截取长度)
mysql> select Left(name,2) from student where id=1;
+--------------+
| Left(name,2) |
+--------------+
| A张 |
+--------------+
2、right(str, length) 截取右边的字符串
说明:right(被截取字段,截取长度)
mysql> select right(name,2) from student where id=1;
+---------------+
| right(name,2) |
+---------------+
| 张三 |
+---------------+
3、 substring(str, pos, length) 从指定位置截取指定长度的字符串
说明:substring(被截取字段,从第几位开始截取,截取长度)
mysql> select substring(name,2,1) from student where id=1;
+---------------------+
| substring(name,2,1) |
+---------------------+
| 张 |
+---------------------+
相关文章推荐
- MapReduce编程实例(二)-MR2操作MySQL
- VC操作XML编程实例
- RedHat 上安装多个 mysql 实例并配置 django 连接的操作记录
- mysql-基本命令-c编程实例
- C++操作XML编程实例 转
- 配置MySQL与卸载MySQL实例操作
- The MySQL C API 编程实例
- C语言操作mysql 添加、删除、修改、查询实例
- C#使用MySQLDriverCS操作MySQL实例教程代码
- discuz操作mysql实例
- WCF热门问题编程示例(2)多个实例调用一个WCF服务操作,需要等待服务响应吗
- RedHat 上安装多个 mysql 实例并配置 django 连接的操作记录
- java操作xml编程实例(sax)
- The MySQL C API 编程实例
- [Java] 操作Mysql实例
- The MySQL C API 编程实例
- java操作xml编程实例(sax)
- WCF热门问题编程示例(2)多个实例调用一个WCF服务操作,需要等待服务响应吗
- 数据库操作:java连接MYSQL实例代码演示
- mysql常用字符串操作函数大全,以及实例