百万行csv数据导入mysql的处理
2015-04-15 10:50
99 查看
公司有个企业精准营销系统项目,需求就是客户导出他们的客户信息数据和交易数据,都是csv文件,交易数据级别是百万级别,甚至达到千万,用我们系统进行分析时,需要导入mysql数据库中,方便统计分析。
开始没有考虑数据量,我们思路是按行解析csv文件,批量导入mysql中,后来测试发现,对于10w行内数据,基本保持在20秒左右时间导入完成。当数据达到100w行时,导入用时是190秒,太慢了。我们换了一个思路,可以利用多线程,充分利用计算机资源。我们根据文件的大小,拆分成多个文件。每个线程来处理一个文件,最大线程是10个。经测试,导入数据库时间明显减少,用下表数据来展示结果:
附:每5000行批处理提交一次,文件按20000行分割成一个文件,如果大于20w行,则平分成10个文件。
电脑配置:intel i5-2.50GHZ 内存8G 64位操作系统
500w行记录(约330M)
100w行记录
20w行记录
10w行记录
多线程处理代码如下:
MainTask.java
BatchDataThread.java
开始没有考虑数据量,我们思路是按行解析csv文件,批量导入mysql中,后来测试发现,对于10w行内数据,基本保持在20秒左右时间导入完成。当数据达到100w行时,导入用时是190秒,太慢了。我们换了一个思路,可以利用多线程,充分利用计算机资源。我们根据文件的大小,拆分成多个文件。每个线程来处理一个文件,最大线程是10个。经测试,导入数据库时间明显减少,用下表数据来展示结果:
附:每5000行批处理提交一次,文件按20000行分割成一个文件,如果大于20w行,则平分成10个文件。
电脑配置:intel i5-2.50GHZ 内存8G 64位操作系统
次数\方式 | 单线程执行(单位:毫秒) | 多线程执行(单位:毫秒) | 相差时间(单位:毫秒) |
1 | 922099 | 573437 | 348662 |
2 | 946358 | 586947 | 359411 |
3 | 956893 | 574340 | 382553 |
次数\方式 | 单线程执行(单位:ms) | 多线程执行(单位:ms) |
1 | 183951 | 100737 |
2 | 163927 | 112452 |
3 | 194593 | 106406 |
次数\方式 | 单线程执行(单位:ms) | 多线程执行(单位:ms) |
1 | 49859 | 21700 |
2 | 54694 | 21891 |
3 | 48976 | 21989 |
次数\方式 | 单线程执行(单位:ms) | 多线程执行(单位:ms) |
1 | 18949 | 12641 |
2 | 19958 | 13646 |
3 | 19075 | 12843 |
MainTask.java
package com.uec.cbi.biz.bigdata; import java.util.List; import org.apache.log4j.Logger; public class MainTask { public static final Logger logger = Logger.getLogger(MainTask.class); //多线程启动任务 public MainTask(String csvfile,String targetPath){ long time1 = System.currentTimeMillis(); List<String> list = SplitFileUtil.separateFileByLine(csvfile, targetPath); long time2 = System.currentTimeMillis(); logger.info("分割文件时间:"+(time2-time1)); for(int i=0;i<list.size();i++){ logger.info("线程"+i+"开启......"); String targetFile = list.get(i); BatchDataThread bdt = new BatchDataThread(targetFile); Thread th = new Thread(bdt); th.start(); } } public static void main(String[] args) { } }SplitFileUtil.java
package com.uec.cbi.biz.bigdata; import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStreamReader; import java.io.OutputStreamWriter; import java.io.UnsupportedEncodingException; import java.text.DecimalFormat; import java.util.ArrayList; import java.util.List; import org.apache.log4j.Logger; import com.uec.cbi.constants.Constants; /** * 处理csv文件类 * * @author srchen * */ public class SplitFileUtil { public static final Logger logger = Logger.getLogger(SplitFileUtil.class); public static final String _encoding = "GB2312"; public static final DecimalFormat df = new DecimalFormat("#.00"); /** * 分割文件 * @param sourceFileUrl 源文件路径 * */ public static List<String> separateFileByLine(String sourceFileUrl,String targetPath){ List<String> filenames = new ArrayList<String>(); int file_count = 0; //获取文件总行数 long lineNum = getFileLineNum(sourceFileUrl); if(lineNum<=Constants.line_num_per_file){//不做分文件处理 filenames.add(sourceFileUrl); }else{ //分割文件 if(lineNum%Constants.line_num_per_file==0){ file_count = (int)(lineNum/Constants.line_num_per_file); }else{ file_count = (int)(lineNum/Constants.line_num_per_file+1); } logger.info("文件数目:"+file_count); Long[] countArray = null; if(file_count>Constants.max_file_count){ file_count = Constants.max_file_count; //将每个文件的起始索引记录下来 countArray = new Long[file_count]; long file_start_index = 0; int line_num_per_file = 0; if(lineNum%file_count==0){ line_num_per_file = (int)(lineNum/file_count); }else{ line_num_per_file = (int)(lineNum/file_count+1); } for(int i=0;i<file_count;i++){ file_start_index = (i+1)*line_num_per_file; countArray[i] = file_start_index; } }else{ //将每个文件的起始索引记录下来 countArray = new Long[file_count]; long file_start_index = 0; for(int i=0;i<file_count;i++){ file_start_index = (i+1)*Constants.line_num_per_file; countArray[i] = file_start_index; } } /*处理文件*/ InputStreamReader fr = null; BufferedReader br = null; try { fr = new InputStreamReader(new FileInputStream(sourceFileUrl),_encoding); br = new BufferedReader(fr); int count = 0; int _index = 0; String rec = null;// 一行 List<String> result = new ArrayList<String>(); //long time21 = System.currentTimeMillis(); // 读取一行 while ((rec = br.readLine()) != null) { count++; result.add(rec); if(count==countArray[_index]){ //一次性写入文件 String targetFile = targetPath+"data_part_"+_index+".csv"; writeCsv(targetFile,result); result.clear(); _index++; filenames.add(targetFile); } } if(result.size()>0){ String targetFile = targetPath+"data_part_"+(file_count-1)+".csv"; writeCsv(targetFile,result); result.clear(); filenames.add(targetFile); } //long time22 = System.currentTimeMillis(); // System.out.println("while= time is:"+(time22-time21)); }catch(Exception e){ e.printStackTrace(); }finally { try { if(br!=null){ br.close(); } if(fr!=null){ fr.close(); } } catch (Exception ex) { ex.printStackTrace(); } } } return filenames; } public static void writeCsv(String csvFile,List<String> contentList){ BufferedWriter bw = null; try { bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(csvFile), _encoding), 1024); for(int i=0;i<contentList.size();i++){ String str = contentList.get(i); bw.write(str); bw.newLine(); } bw.flush(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } finally{ if(bw!=null){ try { bw.close(); } catch (IOException e) { e.printStackTrace(); } } } } public static void main(String[] args){ String file = "c://R_HOME/user.csv"; separateFileByLine(file,"c://R_HOME//temp//"); } public static long getFileLineNum(String filename){ long time1 = System.currentTimeMillis(); //取出文件总行数 long count = 0; InputStreamReader fr = null; BufferedReader br = null; try { fr = new InputStreamReader(new FileInputStream(filename),_encoding); br = new BufferedReader(fr); String rec = null;// 一行 long time21 = System.currentTimeMillis(); // 读取一行 while ((rec = br.readLine()) != null) { count++; } long time22 = System.currentTimeMillis(); System.out.println("while= time is:"+(time22-time21)); }catch(Exception e){ e.printStackTrace(); }finally { try { if(br!=null){ br.close(); } if(fr!=null){ fr.close(); } } catch (Exception ex) { ex.printStackTrace(); } } long time2 = System.currentTimeMillis(); System.out.println("getFileLineNum==["+count+"] time is:"+(time2-time1)); return count; } }
BatchDataThread.java
package com.uec.cbi.biz.bigdata; import java.io.BufferedReader; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStreamReader; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; import org.apache.log4j.Logger; import com.uec.cbi.constants.Constants; import com.uec.cbi.util.DBUtil; public class BatchDataThread implements Runnable{ public static final Logger logger = Logger.getLogger(BatchDataThread.class); public String fileName = null; public Connection conns=null; public BatchDataThread(String fileName){ this.fileName = fileName; //this.conns = conn; conns = DBUtil.getConnection(); } @Override public void run() { long starttime = System.currentTimeMillis(); InputStreamReader fr = null; BufferedReader br = null; PreparedStatement pstmt = null; try { pstmt = conns.prepareStatement(Constants.strs_insert_table); //logger.info("文件名称:"+fileName); fr = new InputStreamReader(new FileInputStream(fileName)); br = new BufferedReader(fr); String rec = null;// 一行 int count = 0; // 读取一行 while ((rec = br.readLine()) != null) { rec = rec.trim(); String[] values = rec.split(Constants.csv_sep); for(int i=0;i<values.length;i++){ pstmt.setString(i+1, values[i]); } pstmt.addBatch(); count++; if(count%Constants.batch_submit_count==0){ // 执行批量更新 int[] insertcount = pstmt.executeBatch(); //logger.info("【"+fileName+"】 提交成功:"+insertcount.length); logger.info("【"+fileName+"】 提交成功记录数:"+count); } } // 执行批量更新 int[] insertcount = new int[0]; if(count%Constants.batch_submit_count!=0){ insertcount = pstmt.executeBatch(); } // 语句执行完毕,提交本事务 conns.commit(); logger.info("【"+fileName+"】提交成功:"+insertcount.length); long endtime = System.currentTimeMillis(); logger.info("【"+fileName+"】task take time is :"+(endtime-starttime)); } catch (IOException ioe) { ioe.printStackTrace(); }catch (Exception e) { e.printStackTrace(); } finally { if (br != null) { try { br.close(); } catch (IOException e) { e.printStackTrace(); } } if (fr != null) { try { fr.close(); } catch (IOException e) { e.printStackTrace(); } } if(pstmt!=null){ try { pstmt.close(); } catch (SQLException e) { e.printStackTrace(); } } if(conns!=null){ try { conns.close(); } catch (SQLException e) { e.printStackTrace(); } } } File file = new File(fileName); if(file.exists()){ file.delete(); logger.info("【"+fileName+"】删除成功......."); } } }
相关文章推荐
- mysql从csv文件导入数据时提示int类型出现' '(空字符串)
- redis cannot assign requested address mysql;数据表导出为csv格式;C++项目导入eclipse流程
- 如何将 JSON, Text, XML, CSV 数据文件导入 MySQL
- mysql 导入 csv 格式数据
- Excel数据大量导入MySQL--MySQL数据换行符回车符替换处理
- mysql中用HEX和UNHEX函数处理二进制数据的导入导出
- 解析csv数据导入mysql的方法
- mysql中用HEX和UNHEX函数处理二进制数据的导入导出
- 导入csv文件时,如果最后一列没有数据,读取出来的集合比抬头字段少,和数据中包含","如何处理?
- MySQL如何导入csv格式数据文件解决方案
- Python - mysql中导入CSV数据 【学习笔记】
- mysql 跨服务器数据_导入/导出 csv
- mysqlimport工具的使用帮助文档(mysql导入csv数据,mysql备份数据恢复)
- 以CSV文件导入MySQL的批量数据插入操作之Java操作
- mysql从csv文件导入数据时提示int类型出现' '(空字符串)
- 数据库学习纪要(二十一):MySQL创建数据库、表,及导入CSV文件数据1
- 解析csv数据导入mysql的方法
- 【MySQL】数据导出导入成CSV格式
- csv数据导入导出mysql的方法
- mysql 导入CSV数据 [转]