借鉴sqoop实现hdfs文件内容导入mysql
2015-04-07 17:34
183 查看
这次需要将hadoop mr的计算结果导入到mysql中,虽然是mr的结果导入db中,为了保险起见,还是存在hdfs上,之后读取hdfs上的结果导入db中,读取失败可重新执行单个读取导入过程。
一般先动手前,有个思路,再百度看看是否有更好的实现,大略搜了一下,发现sqoop貌似实现了hdfs和各种dc之间的读取写入。这里,因为业务简单,都是insert语句不涉及事务,只是连接一个db,不涉及mr等操作,so我只是借鉴sqoop的思想,没有使用sqoop。
一般实现的思路就是,读取hdfs文件,生成对应的insert语句,导入mysql就好了。
其中需要详细考虑的几个问题如下:
1、批量导入insert,一般的数据量设置多大好些?
2、执行一般失败后重新导入数据,对于已经导入的数据如何处理?
这里的话,当然批量导入会好些,但是也要考虑hdfs的reader是一个个读取数据,如果批量导入的size太大,需要存储数据的变量占用的内存大,会导致oom。
一般批量insert的量不超过1000条就好了,我这边的话,每天的量也就2000条,so我设置成500条了。
执行一般失败后重新导入数据,感觉要先删除已有导入的数据,再次导入。因为我这边每天导入一次,根据日期做delete就行了。对于不同场景,需要自己操作了。
如果是mysql数据库的话,不用自己删除了。
因为有个语句 INSERT INTO .. ON
DUPLICATE KEY UPDATE ,如果执行时不存在重复记录,则执行新增操作,否则执行更新操作。详细的内容可以读http://www.jb51.net/article/39255.htm
如果db是mysql的话,就不用自己先删除数据了。
另 INSERT INTO TABLE (a,c) VALUES (1,3),(1,7) ON DUPLICATE KEY UPDATE c=VALUES(c);
上语句比单独执行2条语句快。
本人才疏学浅,有错误之处请不吝指教。
关于insert的数量,很有必要深入研究下。
至此说完。。。
------------------------------
贴些代码(部分是sqoop中的源码改编)
至此说完。。。
本人才疏学浅,有错误之处请不吝指教。
关于insert的数量,很有必要深入研究下。
一般先动手前,有个思路,再百度看看是否有更好的实现,大略搜了一下,发现sqoop貌似实现了hdfs和各种dc之间的读取写入。这里,因为业务简单,都是insert语句不涉及事务,只是连接一个db,不涉及mr等操作,so我只是借鉴sqoop的思想,没有使用sqoop。
一般实现的思路就是,读取hdfs文件,生成对应的insert语句,导入mysql就好了。
其中需要详细考虑的几个问题如下:
1、批量导入insert,一般的数据量设置多大好些?
2、执行一般失败后重新导入数据,对于已经导入的数据如何处理?
这里的话,当然批量导入会好些,但是也要考虑hdfs的reader是一个个读取数据,如果批量导入的size太大,需要存储数据的变量占用的内存大,会导致oom。
一般批量insert的量不超过1000条就好了,我这边的话,每天的量也就2000条,so我设置成500条了。
执行一般失败后重新导入数据,感觉要先删除已有导入的数据,再次导入。因为我这边每天导入一次,根据日期做delete就行了。对于不同场景,需要自己操作了。
如果是mysql数据库的话,不用自己删除了。
因为有个语句 INSERT INTO .. ON
DUPLICATE KEY UPDATE ,如果执行时不存在重复记录,则执行新增操作,否则执行更新操作。详细的内容可以读http://www.jb51.net/article/39255.htm
如果db是mysql的话,就不用自己先删除数据了。
另 INSERT INTO TABLE (a,c) VALUES (1,3),(1,7) ON DUPLICATE KEY UPDATE c=VALUES(c);
上语句比单独执行2条语句快。
本人才疏学浅,有错误之处请不吝指教。
关于insert的数量,很有必要深入研究下。
至此说完。。。
------------------------------
贴些代码(部分是sqoop中的源码改编)
/** * 如果记录重复,则执行更新操作,否则执行新增数据操作 * @param numRows * @return */ protected String getUpdateStatement(int numRows) { boolean first; StringBuilder sb = new StringBuilder(); sb.append("INSERT INTO "); sb.append(tableName); sb.append("("); first = true; for (String column : columnNames) { if (first) { first = false; } else { sb.append(", "); } sb.append(column); } sb.append(") VALUES("); for (int i = 0; i < numRows; i++) { if (i > 0) { sb.append("),("); } for (int j = 0; j < columnNames.length; j++) { if (j > 0) { sb.append(", "); } sb.append("?"); } } sb.append(") ON DUPLICATE KEY UPDATE "); first = true; for (String column : columnNames) { if (first) { first = false; } else { sb.append(", "); } sb.append(column).append("=VALUES(").append(column).append(")"); } String query = sb.toString(); LOG.debug("Using upsert query: " + query); return query; }
protected PreparedStatement getPreparedStatement(List<Writable> userRecords) throws SQLException { PreparedStatement stmt = null; // Synchronize on connection to ensure this does not conflict // with the operations in the update thread. Connection conn = ConnectionRelate.getConn(); stmt = conn.prepareStatement(getUpdateStatement(userRecords.size())); // Inject the record parameters into the UPDATE and WHERE clauses. This // assumes that the update key column is the last column serialized in // by the underlying record. Our code auto-gen process for exports was // responsible for taking care of this constraint. int i = 0; for (Writable record : userRecords) { setParam(stmt, i, record); i += columnNames.length; } stmt.addBatch(); return stmt; }
try { reader = new SequenceFile.Reader(FileSystem.get(new Configuration()), path, new Configuration()); StringTriWritable key = new StringTriWritable(); Writable value = (MediaScoreVector) reader.getValueClass().newInstance(); while (reader.next(key, value)) { list.add(value); index++; if(index == batchSize){ index = 0; //exp the data to the mysql stmt = mediaSQL.getPreparedStatement(list); stmt.executeBatch(); list.clear(); } } //deal with size less batchsize if(list.size()>0) { stmt = mediaSQL.getPreparedStatement(list); stmt.executeBatch(); list.clear(); list = null; } } catch (Exception e) { 。。。。 } finally { if(stmt!=null){ 。。。。 } 。。。。 }
至此说完。。。
本人才疏学浅,有错误之处请不吝指教。
关于insert的数量,很有必要深入研究下。
相关文章推荐
- 通过sqoop 实现hdfs与mysql的数据导入导出
- 读取hdfs文件内容导入mysql(续)
- Java实现文件内容导入数据库
- 通过Sqoop实现Mysql / Oracle 与HDFS / Hbase互导数据
- sqoop:mysql和Hbase/Hive/Hdfs之间相互导入数据
- java实现将文件内容导入到数据库中
- sqoop:mysql和Hbase/Hive/Hdfs之间相互导入数据
- 使用Sqoop实现HDFS与Mysql互转
- 修改php.ini实现Mysql导入数据库文件最大限制的修改方法
- Sqoop实现MySql/Oracle与Hdfs/Hbase互导数据
- 使用Sqoop将HDFS/Hive/HBase与MySQL/Oracle中的数据相互导入、导出
- 通过Sqoop实现Mysql / Oracle 与HDFS / Hbase互导数据
- Sqoop_详细总结 使用Sqoop将HDFS/Hive/HBase与MySQL/Oracle中的数据相互导入、导出
- 通过Sqoop实现Mysql / Oracle 与HDFS / Hbase互导数据
- 修改php.ini实现Mysql导入数据库文件最大限制的修改方法
- Java不写文件,LOAD DATA LOCAL INFILE大批量导入数据到MySQL的实现
- Sqoop安装配置及将mysql数据导入到hdfs中
- 使用Sqoop将HDFS中数据导入MYSQL中
- mysql 导入hdfs、hive、hbase sqoop使用方法