您的位置:首页 > 数据库 > MySQL

百万行csv数据导入mysql的处理

2015-04-15 10:50 99 查看
公司有个企业精准营销系统项目,需求就是客户导出他们的客户信息数据和交易数据,都是csv文件,交易数据级别是百万级别,甚至达到千万,用我们系统进行分析时,需要导入mysql数据库中,方便统计分析。

开始没有考虑数据量,我们思路是按行解析csv文件,批量导入mysql中,后来测试发现,对于10w行内数据,基本保持在20秒左右时间导入完成。当数据达到100w行时,导入用时是190秒,太慢了。我们换了一个思路,可以利用多线程,充分利用计算机资源。我们根据文件的大小,拆分成多个文件。每个线程来处理一个文件,最大线程是10个。经测试,导入数据库时间明显减少,用下表数据来展示结果:

附:每5000行批处理提交一次,文件按20000行分割成一个文件,如果大于20w行,则平分成10个文件。

电脑配置:intel i5-2.50GHZ 内存8G 64位操作系统

500w行记录(约330M)

次数 | 单线程执行(毫秒) | 多线程执行(毫秒) | 相差时间(毫秒)
1 | 922099 | 573437 | 348662
2 | 946358 | 586947 | 359411
3 | 956893 | 574340 | 382553
100w行记录

次数 | 单线程执行(ms) | 多线程执行(ms)
1 | 183951 | 100737
2 | 163927 | 112452
3 | 194593 | 106406
20w行记录

次数 | 单线程执行(ms) | 多线程执行(ms)
1 | 49859 | 21700
2 | 54694 | 21891
3 | 48976 | 21989
10w行记录

次数 | 单线程执行(ms) | 多线程执行(ms)
1 | 18949 | 12641
2 | 19958 | 13646
3 | 19075 | 12843
多线程处理代码如下:

MainTask.java

package com.uec.cbi.biz.bigdata;

import java.util.List;

import org.apache.log4j.Logger;

/**
 * Entry point of the multi-threaded CSV import: splits the source file
 * into pieces and launches one {@link BatchDataThread} per piece.
 */
public class MainTask {

    public static final Logger logger = Logger.getLogger(MainTask.class);

    /**
     * Splits {@code csvfile} into part files under {@code targetPath} and
     * starts an import thread for each part.  The constructor returns as
     * soon as all threads have been started; it does not wait for them.
     *
     * @param csvfile    path of the source CSV file
     * @param targetPath directory that receives the split part files
     */
    public MainTask(String csvfile, String targetPath) {
        long splitStart = System.currentTimeMillis();
        List<String> partFiles = SplitFileUtil.separateFileByLine(csvfile, targetPath);
        long splitEnd = System.currentTimeMillis();
        logger.info("分割文件时间:" + (splitEnd - splitStart));

        int threadNo = 0;
        for (String partFile : partFiles) {
            logger.info("线程" + threadNo + "开启......");
            new Thread(new BatchDataThread(partFile)).start();
            threadNo++;
        }
    }

    public static void main(String[] args) {

    }

}
SplitFileUtil.java

package com.uec.cbi.biz.bigdata;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.UnsupportedEncodingException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;

import org.apache.log4j.Logger;

import com.uec.cbi.constants.Constants;

/**
* 处理csv文件类
*
* @author srchen
*
*/
/**
 * Splits a large CSV file into smaller part files so each part can be
 * imported by a dedicated worker thread.
 *
 * @author srchen
 */
public class SplitFileUtil {

    public static final Logger logger = Logger.getLogger(SplitFileUtil.class);
    // Charset for reading the source file and writing the parts; BatchDataThread
    // must read the parts with this same charset.
    public static final String _encoding = "GB2312";
    public static final DecimalFormat df = new DecimalFormat("#.00");

    /**
     * Splits the source file into at most {@code Constants.max_file_count}
     * part files and returns their paths.  Files of at most
     * {@code Constants.line_num_per_file} lines are returned unsplit.
     *
     * @param sourceFileUrl path of the source CSV file
     * @param targetPath    target directory (including trailing separator)
     * @return paths of the files to import; never null.  On I/O failure the
     *         list may be incomplete (error is logged).
     */
    public static List<String> separateFileByLine(String sourceFileUrl, String targetPath) {
        List<String> filenames = new ArrayList<String>();
        // Count total lines first so the piece boundaries can be precomputed.
        long lineNum = getFileLineNum(sourceFileUrl);
        if (lineNum <= Constants.line_num_per_file) {
            // Small file: import it directly, no splitting needed.
            filenames.add(sourceFileUrl);
            return filenames;
        }

        // Ceiling division: number of pieces of line_num_per_file lines each.
        int file_count = (int) ((lineNum + Constants.line_num_per_file - 1) / Constants.line_num_per_file);
        logger.info("文件数目:" + file_count);

        // countArray[i] is the (1-based) line number at which piece i ends.
        Long[] countArray;
        if (file_count > Constants.max_file_count) {
            // Too many pieces: cap the piece count and enlarge each piece instead.
            file_count = Constants.max_file_count;
            long linesPerPiece = (lineNum + file_count - 1) / file_count;
            countArray = new Long[file_count];
            for (int i = 0; i < file_count; i++) {
                countArray[i] = (i + 1) * linesPerPiece;
            }
        } else {
            countArray = new Long[file_count];
            for (int i = 0; i < file_count; i++) {
                // Widen before multiplying to avoid int overflow on huge files.
                countArray[i] = (long) (i + 1) * Constants.line_num_per_file;
            }
        }

        InputStreamReader fr = null;
        BufferedReader br = null;
        try {
            fr = new InputStreamReader(new FileInputStream(sourceFileUrl), _encoding);
            br = new BufferedReader(fr);
            long count = 0;
            int _index = 0;
            String rec;
            List<String> result = new ArrayList<String>();
            while ((rec = br.readLine()) != null) {
                count++;
                result.add(rec);
                if (_index < countArray.length && count == countArray[_index]) {
                    // Piece complete: flush the buffered lines to its own file.
                    String targetFile = targetPath + "data_part_" + _index + ".csv";
                    writeCsv(targetFile, result);
                    result.clear();
                    _index++;
                    filenames.add(targetFile);
                }
            }
            if (result.size() > 0) {
                // Leftover lines: the last piece is shorter than the others
                // because the boundaries were rounded up.
                String targetFile = targetPath + "data_part_" + (file_count - 1) + ".csv";
                writeCsv(targetFile, result);
                result.clear();
                filenames.add(targetFile);
            }
        } catch (Exception e) {
            logger.error("split failed for file: " + sourceFileUrl, e);
        } finally {
            try {
                if (br != null) {
                    br.close();
                }
                if (fr != null) {
                    fr.close();
                }
            } catch (Exception ex) {
                logger.error("close failed for file: " + sourceFileUrl, ex);
            }
        }
        return filenames;
    }

    /**
     * Writes the given lines to {@code csvFile} using {@link #_encoding},
     * one list entry per line.  Errors are logged, not rethrown.
     *
     * @param csvFile     target file path (overwritten if it exists)
     * @param contentList lines to write
     */
    public static void writeCsv(String csvFile, List<String> contentList) {
        BufferedWriter bw = null;
        try {
            bw = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(csvFile), _encoding), 1024);
            for (int i = 0; i < contentList.size(); i++) {
                bw.write(contentList.get(i));
                bw.newLine();
            }
            bw.flush();
        } catch (IOException e) {
            // Covers UnsupportedEncodingException and FileNotFoundException too.
            logger.error("write failed for file: " + csvFile, e);
        } finally {
            if (bw != null) {
                try {
                    bw.close();
                } catch (IOException e) {
                    logger.error("close failed for file: " + csvFile, e);
                }
            }
        }
    }

    // Manual smoke test only; not used by the import pipeline.
    public static void main(String[] args) {

        String file = "c://R_HOME/user.csv";
        separateFileByLine(file, "c://R_HOME//temp//");

    }

    /**
     * Counts the lines of {@code filename} by reading it once.
     *
     * @param filename file to count
     * @return number of lines, or the partial count reached before an error
     */
    public static long getFileLineNum(String filename) {
        long start = System.currentTimeMillis();
        long count = 0;
        InputStreamReader fr = null;
        BufferedReader br = null;
        try {
            fr = new InputStreamReader(new FileInputStream(filename), _encoding);
            br = new BufferedReader(fr);
            while (br.readLine() != null) {
                count++;
            }
        } catch (Exception e) {
            logger.error("line count failed for file: " + filename, e);
        } finally {
            try {
                if (br != null) {
                    br.close();
                }
                if (fr != null) {
                    fr.close();
                }
            } catch (Exception ex) {
                logger.error("close failed for file: " + filename, ex);
            }
        }
        logger.info("getFileLineNum==[" + count + "] time is:" + (System.currentTimeMillis() - start));
        return count;
    }
}


BatchDataThread.java

package com.uec.cbi.biz.bigdata;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.log4j.Logger;

import com.uec.cbi.constants.Constants;
import com.uec.cbi.util.DBUtil;

/**
 * Worker that imports one CSV part file into MySQL via JDBC batch inserts.
 * One connection per worker; the part file is deleted only after a
 * successful commit.
 */
public class BatchDataThread implements Runnable {

    public static final Logger logger = Logger.getLogger(BatchDataThread.class);

    // Must match the charset SplitFileUtil writes the part files with.
    // The original code used the platform default charset here, which
    // corrupted non-ASCII data whenever the default was not GB2312.
    private static final String ENCODING = "GB2312";

    public String fileName = null;
    public Connection conns = null;

    public BatchDataThread(String fileName) {
        this.fileName = fileName;
        conns = DBUtil.getConnection();
    }

    /**
     * Reads the part file line by line, binds each CSV column to the insert
     * statement, executes a batch every {@code Constants.batch_submit_count}
     * rows, and commits once at the end.
     * NOTE(review): commit() implies auto-commit is disabled by
     * DBUtil.getConnection() — confirm against DBUtil.
     */
    @Override
    public void run() {
        long starttime = System.currentTimeMillis();
        InputStreamReader fr = null;
        BufferedReader br = null;
        PreparedStatement pstmt = null;
        boolean success = false;
        try {
            pstmt = conns.prepareStatement(Constants.strs_insert_table);
            fr = new InputStreamReader(new FileInputStream(fileName), ENCODING);
            br = new BufferedReader(fr);
            String rec;
            int count = 0;
            while ((rec = br.readLine()) != null) {
                rec = rec.trim();
                String[] values = rec.split(Constants.csv_sep);
                for (int i = 0; i < values.length; i++) {
                    pstmt.setString(i + 1, values[i]);
                }
                pstmt.addBatch();
                count++;
                if (count % Constants.batch_submit_count == 0) {
                    // Full batch reached: send it to the server.
                    pstmt.executeBatch();
                    logger.info("【" + fileName + "】 提交成功记录数:" + count);
                }
            }
            int[] insertcount = new int[0];
            if (count % Constants.batch_submit_count != 0) {
                // Flush the last, partially filled batch.
                insertcount = pstmt.executeBatch();
            }
            // All batches sent; commit the whole file as one transaction.
            conns.commit();
            success = true;
            logger.info("【" + fileName + "】提交成功:" + insertcount.length);
            long endtime = System.currentTimeMillis();
            logger.info("【" + fileName + "】task take time is :" + (endtime - starttime));

        } catch (Exception e) {
            logger.error("【" + fileName + "】import failed", e);
            // Undo any uncommitted batches so no partial data remains.
            try {
                if (conns != null) {
                    conns.rollback();
                }
            } catch (SQLException se) {
                logger.error("【" + fileName + "】rollback failed", se);
            }
        } finally {
            if (br != null) {
                try {
                    br.close();
                } catch (IOException e) {
                    logger.error("close reader failed", e);
                }
            }
            if (fr != null) {
                try {
                    fr.close();
                } catch (IOException e) {
                    logger.error("close stream failed", e);
                }
            }
            if (pstmt != null) {
                try {
                    pstmt.close();
                } catch (SQLException e) {
                    logger.error("close statement failed", e);
                }
            }
            if (conns != null) {
                try {
                    conns.close();
                } catch (SQLException e) {
                    logger.error("close connection failed", e);
                }
            }
        }
        // Only remove the part file when the import committed; the original
        // code deleted it even on failure, silently losing the data.
        if (success) {
            File file = new File(fileName);
            if (file.exists() && file.delete()) {
                logger.info("【" + fileName + "】删除成功.......");
            }
        }
    }

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: