您的位置:首页 > 数据库 > SQL

借鉴sqoop实现hdfs文件内容导入mysql

2015-04-07 17:34 183 查看
这次需要将hadoop mr的计算结果导入到mysql中,虽然是mr的结果导入db中,为了保险起见,还是存在hdfs上,之后读取hdfs上的结果导入db中,读取失败可重新执行单个读取导入过程。

一般先动手前,有个思路,再百度看看是否有更好的实现,大略搜了一下,发现sqoop貌似实现了hdfs和各种dc之间的读取写入。这里,因为业务简单,都是insert语句不涉及事务,只是连接一个db,不涉及mr等操作,so我只是借鉴sqoop的思想,没有使用sqoop。

一般实现的思路就是,读取hdfs文件,生成对应的insert语句,导入mysql就好了。

其中需要详细考虑的几个问题如下:

1、批量导入insert,一般的数据量设置多大好些?

2、执行一般失败后重新导入数据,对于已经导入的数据如何处理?

这里的话,当然批量导入会好些,但是也要考虑hdfs的reader是一个个读取数据,如果批量导入的size太大,需要存储数据的变量占用的内存大,会导致oom。

一般批量insert的量不超过1000条就好了,我这边的话,每天的量也就2000条,so我设置成500条了。

执行一般失败后重新导入数据,感觉要先删除已有导入的数据,再次导入。因为我这边每天导入一次,根据日期做delete就行了。对于不同场景,需要自己操作了。

如果是mysql数据库的话,不用自己删除了。

因为有个语句   INSERT INTO .. ON
DUPLICATE KEY UPDATE  ,如果执行时不存在重复记录,则执行新增操作,否则执行更新操作。详细的内容可以读http://www.jb51.net/article/39255.htm

如果db是mysql的话,就不用自己先删除数据了。

另 INSERT INTO TABLE (a,c) VALUES (1,3),(1,7) ON DUPLICATE KEY UPDATE c=VALUES(c);

上语句比单独执行2条语句快。

本人才疏学浅,有错误之处请不吝指教。

关于insert的数量,很有必要深入研究下。

至此说完。。。

------------------------------

贴些代码(部分是sqoop中的源码改编)

/**
* 如果记录重复,则执行更新操作,否则执行新增数据操作
* @param numRows
* @return
*/
protected String getUpdateStatement(int numRows) {
boolean first;
StringBuilder sb = new StringBuilder();
sb.append("INSERT INTO ");
sb.append(tableName);
sb.append("(");
first = true;
for (String column : columnNames) {
if (first) {
first = false;
} else {
sb.append(", ");
}
sb.append(column);
}

sb.append(") VALUES(");
for (int i = 0; i < numRows; i++) {
if (i > 0) {
sb.append("),(");
}
for (int j = 0; j < columnNames.length; j++) {
if (j > 0) {
sb.append(", ");
}
sb.append("?");
}
}

sb.append(") ON DUPLICATE KEY UPDATE ");

first = true;
for (String column : columnNames) {
if (first) {
first = false;
} else {
sb.append(", ");
}

sb.append(column).append("=VALUES(").append(column).append(")");
}

String query = sb.toString();
LOG.debug("Using upsert query: " + query);
return query;
}


protected PreparedStatement getPreparedStatement(List<Writable> userRecords) throws SQLException {

PreparedStatement stmt = null;

// Synchronize on connection to ensure this does not conflict
// with the operations in the update thread.
Connection conn = ConnectionRelate.getConn();
stmt = conn.prepareStatement(getUpdateStatement(userRecords.size()));
// Inject the record parameters into the UPDATE and WHERE clauses. This
// assumes that the update key column is the last column serialized in
// by the underlying record. Our code auto-gen process for exports was
// responsible for taking care of this constraint.
int i = 0;
for (Writable record : userRecords) {
setParam(stmt, i, record);
i += columnNames.length;
}
stmt.addBatch();

return stmt;
}


try {
reader = new SequenceFile.Reader(FileSystem.get(new Configuration()), path, new Configuration());
StringTriWritable key = new StringTriWritable();
Writable value = (MediaScoreVector) reader.getValueClass().newInstance();
while (reader.next(key, value)) {
list.add(value);
index++;
if(index == batchSize){
index = 0;
//exp the data to the mysql
stmt = mediaSQL.getPreparedStatement(list);
stmt.executeBatch();

list.clear();
}
}

//deal with size less batchsize
if(list.size()>0) {
stmt = mediaSQL.getPreparedStatement(list);
stmt.executeBatch();

list.clear();
list = null;
}

} catch (Exception e) {
。。。。
} finally {
if(stmt!=null){

。。。。
}
。。。。
}


至此说完。。。

本人才疏学浅,有错误之处请不吝指教。

关于insert的数量,很有必要深入研究下。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: