您的位置:首页 > 其它

文件的合并排序与文件分割

2017-03-10 17:01 796 查看
背景:一个文件内有多条数据记录,每条记录为一行,记录按时间字段升序排序。

需求1:将多个这样的文件合并成一个按时间排序的文件

需求2:将一个按数据记录时间字段排好序的大文件分割成几个小文件

代码

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.LineNumberReader;
import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;

/**
* 假定个文件内的数据有序
*
* @author zsm
* @date 2017年3月9日 下午2:50:26
*/
/**
 * Command-line entry point for the two file operations offered by {@code FileSort}:
 * merging several time-sorted files into one, and splitting one file into several.
 *
 * <p>Usage (exactly four arguments):
 * <ul>
 * <li>{@code 1 threadNum fileParentPath containedStr} - merge sort</li>
 * <li>{@code 2 fileParentPath srcFileName splitedFileNum} - file split</li>
 * </ul>
 * Any other argument list, including non-numeric values where a number is expected,
 * prints the usage text instead of failing with an exception.
 */
public class Main_MultiFileMergeSort {

    /** Value of the first argument that selects the merge sort operation. */
    private static final int MODE_MERGE = 1;

    /** Value of the first argument that selects the file split operation. */
    private static final int MODE_SPLIT = 2;

    /**
     * Parses the command line and dispatches to the merge or split operation.
     *
     * @param args
     *            see the class documentation for the two accepted layouts
     * @throws IOException
     *             if the merged result file cannot be read back for the order check
     */
    public static void main(String[] args) throws IOException {
        if (args.length != 4) {
            printUsage();
            return;
        }

        // Parse every numeric argument up front so that a NumberFormatException raised here
        // (bad command line) is never confused with one raised while parsing file records.
        int mode;
        int number = 0;
        try {
            mode = Integer.parseInt(args[0]);
            if (mode == MODE_MERGE) {
                number = Integer.parseInt(args[1]);
            } else if (mode == MODE_SPLIT) {
                number = Integer.parseInt(args[3]);
            }
        } catch (NumberFormatException e) {
            System.err.println("invalid numeric argument: " + e.getMessage());
            printUsage();
            return;
        }

        if (mode == MODE_MERGE) {
            runMerge(number, args[2], args[3]);
        } else if (mode == MODE_SPLIT) {
            runSplit(args[1], args[2], number);
        } else {
            printUsage();
        }
    }

    /** Manual test: splits a hard-coded file into 10 pieces. */
    public static void fileSplitTest() {
        runSplit("F:/", "17915_main_acttmp.txt", 10);
    }

    /**
     * Manual test: merges the files of a hard-coded directory with 4 threads.
     *
     * @throws IOException
     *             if the merged result file cannot be read back for the order check
     */
    public static void fileSortTest() throws IOException {
        runMerge(4, "F:/2016-11-10", "gps.txt");
    }

    /**
     * Runs a timed merge sort and reports the result file together with an order check.
     * Intermediate files are deleted.
     */
    private static void runMerge(int threadNum, String fileParentPath, String containedStr) throws IOException {
        FileSort fileSort = new FileSort(true);
        System.out.println("file mergeing...");
        long startTime = System.currentTimeMillis();

        String resultFileName = fileSort.mergeSort(threadNum, fileParentPath, containedStr);

        System.out.println("done.time used:" + (System.currentTimeMillis() - startTime) + " ms");
        if (resultFileName == null) {
            // isAscendingOrder() answers true for a null name, which would wrongly report success.
            System.err.println("merge sort produced no result file");
            return;
        }
        System.out.println("resultFileName: " + resultFileName + ", is sorted correct: "
                + FileSort.isAscendingOrder(fileParentPath, resultFileName));
    }

    /** Runs a timed file split; the source file is kept. */
    private static void runSplit(String fileParentPath, String srcFileName, int splitedFileNum) {
        System.out.println("file spliting...");
        long startTime = System.currentTimeMillis();

        FileSort.splitFile(fileParentPath, srcFileName, false, splitedFileNum);

        System.out.println("done.time used:" + (System.currentTimeMillis() - startTime) + " ms");
    }

    /** Prints the accepted argument layouts. */
    private static void printUsage() {
        System.out.println("\n*************");
        System.out.println("arguments of merge sort operation: 1  threadNum  fileParentPath  containedStr");
        System.out.println("arguments of file split operation: 2  fileParentPath  srcFileName  splitedFileNum");
        System.out.println("*************\n");
    }
}

/**
 * Merges text files whose records are already sorted by time, and splits a file into pieces.
 *
 * <p>File layout expected by the merge and order-check methods: the first line is a header,
 * every following line is one comma-separated record whose LAST field is an integer time.
 * Files are read and written with the platform default charset.
 */
class FileSort {
    /**
     * Marker embedded in the name of every intermediate file produced while merging. It is what
     * protects original data files from deletion, so original file names must not contain it.
     */
    private static final String INTERMEDIATE_FILE_MARKER = "_acttmpf";

    /**
     * Whether intermediate files produced while merging are deleted once they have been consumed.
     */
    private final boolean isDeleteIntermediateFile;

    /**
     * Source of the unique number that prefixes each intermediate file name; starts at 0.
     */
    private final AtomicInteger count = new AtomicInteger(0);

    public FileSort(boolean isDeleteIntermediateFile) {
        this.isDeleteIntermediateFile = isDeleteIntermediateFile;
    }

    /**
     * Merges two sorted files into a new intermediate file in the same directory.
     *
     * <p>The output starts with the header of the first file (or of the second one when the first
     * file is empty). Records with equal times are taken from the first file first, so the merge
     * is stable. Inputs whose name contains the intermediate-file marker are deleted afterwards
     * when deletion is enabled; original data files are never deleted.
     *
     * @param fileParentPath
     *            directory containing both inputs; the output is written there as well
     * @param srcFileName1
     *            name of the first sorted file
     * @param srcFileName2
     *            name of the second sorted file
     * @return name of the merged file, or {@code null} if a source name is {@code null} or an I/O
     *         error occurred (the error is printed and any partial output is removed)
     * @throws NumberFormatException
     *             if the last field of a record is not an integer
     */
    public String mergeSort(String fileParentPath, String srcFileName1, String srcFileName2) {
        if (srcFileName1 == null || srcFileName2 == null) {
            // A null name means an earlier merge step failed; do not try to open a file called "null".
            System.err.println("error: cannot merge, a source file name is null");
            return null;
        }

        // Counter plus thread name keeps names unique across the per-thread FileSort instances.
        String tmpOutPutFileName = count.getAndIncrement() + "_" + Thread.currentThread().getName()
                + INTERMEDIATE_FILE_MARKER + ".txt";
        File file1 = new File(fileParentPath + "/" + srcFileName1);
        File file2 = new File(fileParentPath + "/" + srcFileName2);
        File outFile = new File(fileParentPath + "/" + tmpOutPutFileName);

        try (BufferedReader reader1 = new BufferedReader(new InputStreamReader(new FileInputStream(file1)));
                BufferedReader reader2 = new BufferedReader(new InputStreamReader(new FileInputStream(file2)));
                BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(outFile)))) {

            // Consume both headers; emit one of them, and never the text "null" for an empty file.
            String header1 = reader1.readLine();
            String header2 = reader2.readLine();
            String header = (header1 != null) ? header1 : header2;
            if (header != null) {
                writeLine(writer, header);
            }

            String line1 = reader1.readLine();
            String line2 = reader2.readLine();
            while (line1 != null && line2 != null) {
                if (parseGpsTime(line1) <= parseGpsTime(line2)) {
                    writeLine(writer, line1);
                    line1 = reader1.readLine();
                } else {
                    writeLine(writer, line2);
                    line2 = reader2.readLine();
                }
            }
            // At most one of the two inputs still has records; copy them unchanged.
            for (; line1 != null; line1 = reader1.readLine()) {
                writeLine(writer, line1);
            }
            for (; line2 != null; line2 = reader2.readLine()) {
                writeLine(writer, line2);
            }
        } catch (IOException e) {
            e.printStackTrace();
            // All streams are closed here; drop the incomplete output so nobody mistakes it for a result.
            if (outFile.exists()) {
                outFile.delete();
            }
            return null;
        }

        if (isDeleteIntermediateFile) {
            deleteIfIntermediate(srcFileName1, file1);
            deleteIfIntermediate(srcFileName2, file2);
        }
        return tmpOutPutFileName;
    }

    /**
     * Merges several sorted files by recursively halving the index range.
     *
     * @param fileParentPath
     *            directory containing the files
     * @param fileNameList
     *            array of file names
     * @param posStart
     *            first index (inclusive) of the files taking part in the merge
     * @param posEnd
     *            last index (inclusive) of the files taking part in the merge
     * @return name of the fully merged file (the original name when the range holds one file),
     *         or {@code null} if the arguments are invalid or a merge step failed
     */
    public String mergeSort(String fileParentPath, String[] fileNameList, int posStart, int posEnd) {
        if (fileNameList == null || posStart < 0 || posEnd >= fileNameList.length || posStart > posEnd) {
            System.err.println("error:one of the following condition is satified:");
            System.err
                    .println("fileNameList == null || posStart<0 || posEnd >= fileNameList.length || posStart>posEnd");
            return null;
        }
        if (posStart == posEnd) {
            // A single file is already sorted.
            return fileNameList[posStart];
        }
        if (posEnd - posStart == 1) {
            return mergeSort(fileParentPath, fileNameList[posStart], fileNameList[posEnd]);
        }
        int posMid = posStart + (posEnd - posStart) / 2;
        String srcFileName1 = mergeSort(fileParentPath, fileNameList, posStart, posMid);
        String srcFileName2 = mergeSort(fileParentPath, fileNameList, posMid + 1, posEnd);
        return mergeSort(fileParentPath, srcFileName1, srcFileName2);
    }

    /**
     * Merges every entry of the given directory. The directory must contain only files, no
     * sub-directories.
     *
     * @param fileParentPath
     *            directory containing the files to merge
     * @return name of the fully merged file, or {@code null} if the directory cannot be listed,
     *         is empty, or a merge step failed
     */
    public String mergeSort(String fileParentPath) {
        File[] fileList = new File(fileParentPath).listFiles();
        if (fileList == null) {
            // listFiles() returns null when the path is not a directory or cannot be read.
            System.err.println("error: cannot list files in " + fileParentPath);
            fileList = new File[0];
        }
        String[] fileNameList = new String[fileList.length];
        System.out.println(fileNameList.length + " files in " + fileParentPath);
        for (int i = 0; i < fileNameList.length; i++) {
            fileNameList[i] = fileList[i].getName();
        }
        return mergeSort(fileParentPath, fileNameList, 0, fileNameList.length - 1);
    }

    /**
     * Merges the files of a directory whose name contains the given string.
     *
     * @param fileParentPath
     *            directory containing the files to merge
     * @param containedStr
     *            only files whose name contains this string take part in the merge
     * @return name of the fully merged file, or {@code null} if no file matched or a merge step
     *         failed
     */
    public String mergeSort(String fileParentPath, String containedStr) {
        String[] fileNameList = getMatchedFileNames(fileParentPath, containedStr);
        return mergeSort(fileParentPath, fileNameList, 0, fileNameList.length - 1);
    }

    /**
     * Merges the files of a directory whose name contains the given string, using several
     * threads. Each worker merges a contiguous slice of the file list, then the calling thread
     * merges the per-worker results.
     *
     * @param threadNum
     *            requested number of threads; it is reduced when there are too few files for
     *            every worker to receive at least two of them
     * @param fileParentPath
     *            directory containing the files to merge
     * @param containedStr
     *            only files whose name contains this string take part in the merge
     * @return name of the fully merged file, or {@code null} if no file matched or a merge step
     *         failed
     */
    public String mergeSort(int threadNum, String fileParentPath, String containedStr) {
        String[] fileNameList = getMatchedFileNames(fileParentPath, containedStr);

        // Threads are only worth it with at least 2 threads and at least 3 files.
        if (threadNum > 1 && fileNameList.length > 2) {
            // Never start more workers than can be given two files each. Without this cap a
            // thread count above the file count produced empty ranges and null results.
            int workerCount = Math.min(threadNum, fileNameList.length / 2);
            int fileCountPerThread = fileNameList.length / workerCount;

            SortThread[] workers = new SortThread[workerCount];
            for (int i = 0; i < workerCount; i++) {
                int start = i * fileCountPerThread;
                // The last worker also takes the files left over by the integer division.
                int end = (i == workerCount - 1) ? (fileNameList.length - 1) : (start + fileCountPerThread - 1);
                workers[i] = new SortThread(isDeleteIntermediateFile, fileParentPath, fileNameList, start, end);
                workers[i].start();
            }

            // Wait until every worker is really finished; an interrupt must not let us read an
            // unfinished result, so it is remembered and re-raised afterwards.
            boolean interrupted = false;
            for (SortThread worker : workers) {
                boolean joined = false;
                while (!joined) {
                    try {
                        worker.join();
                        joined = true;
                    } catch (InterruptedException e) {
                        interrupted = true;
                    }
                }
            }
            if (interrupted) {
                Thread.currentThread().interrupt();
            }

            // Collect the file each worker produced.
            fileNameList = new String[workerCount];
            for (int i = 0; i < workerCount; i++) {
                fileNameList[i] = workers[i].getResultFileName();
            }
        }

        // Merge the per-worker results (or all files when no threads were used).
        return mergeSort(fileParentPath, fileNameList, 0, fileNameList.length - 1);
    }

    /**
     * Worker thread that merges the files in {@code [posStart, posEnd]} of a file name array
     * with its own {@code FileSort} instance.
     */
    class SortThread extends Thread {
        private boolean isDeleteIntermediateFile;
        private String fileParentPath;
        private String[] fileNameList;
        private int posStart;
        private int posEnd;

        private String resultFileName;

        public SortThread(boolean isDeleteIntermediateFile, String fileParentPath, String[] fileNameList, int posStart,
                int posEnd) {
            super();
            this.isDeleteIntermediateFile = isDeleteIntermediateFile;
            this.fileParentPath = fileParentPath;
            this.fileNameList = fileNameList;
            this.posStart = posStart;
            this.posEnd = posEnd;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + ": [" + posStart + "," + posEnd + "]");
            this.resultFileName = (new FileSort(isDeleteIntermediateFile)).mergeSort(fileParentPath, fileNameList,
                    posStart, posEnd);
        }

        /**
         * @return the name of the file this worker produced; only meaningful after the thread
         *         has finished, {@code null} if its merge failed
         */
        public String getResultFileName() {
            return this.resultFileName;
        }
    }

    /**
     * Lists the regular files of a directory whose name contains the given string.
     *
     * @return the matching file names; empty when nothing matches or the directory cannot be listed
     */
    private String[] getMatchedFileNames(String fileParentPath, String containedStr) {
        File[] fileList = new File(fileParentPath).listFiles();
        if (fileList == null) {
            // listFiles() returns null when the path is not a directory or cannot be read.
            fileList = new File[0];
        }
        ArrayList<String> selectedFileNameList = new ArrayList<>();
        for (File candidate : fileList) {
            String candidateName = candidate.getName();
            if (candidate.isFile() && candidateName.contains(containedStr)) {
                selectedFileNameList.add(candidateName);
            }
        }
        System.out.println(selectedFileNameList.size() + " files in " + fileParentPath);
        if (selectedFileNameList.isEmpty()) {
            System.err.println("no file matched in " + fileParentPath);
        }
        return selectedFileNameList.toArray(new String[0]);
    }

    /** Writes one record followed by a {@code \n} line terminator. */
    private void writeLine(BufferedWriter bufferedWriter, String msg) throws IOException {
        bufferedWriter.write(msg + "\n");
    }

    /** Deletes the file if its name marks it as an intermediate merge product. */
    private static void deleteIfIntermediate(String fileName, File file) {
        if (fileName.contains(INTERMEDIATE_FILE_MARKER) && file.exists()) {
            file.delete();
        }
    }

    /**
     * Extracts the time of a record: the last comma-separated field, parsed as an integer.
     *
     * @throws NumberFormatException
     *             if that field is not an integer
     */
    private static int parseGpsTime(String record) {
        String[] fields = record.split(",");
        return Integer.parseInt(fields[fields.length - 1]);
    }

    /**
     * Checks whether the records of a file are in ascending (non-decreasing) time order. The
     * first line is skipped as the header.
     *
     * @return {@code true} if either argument is {@code null} or the records are in order;
     *         {@code false} if a record is out of order or the file has no record at all
     * @throws IOException
     *             if the file cannot be read
     * @throws NumberFormatException
     *             if the last field of a record is not an integer
     */
    public static boolean isAscendingOrder(String fileParentPath, String fileName) throws IOException {
        if (fileParentPath == null || fileName == null) {
            return true;
        }
        try (BufferedReader file = new BufferedReader(
                new InputStreamReader(new FileInputStream(fileParentPath + "/" + fileName)))) {
            file.readLine();// skip the header
            String record = file.readLine();// first record
            if (record == null) {
                return false;
            }

            int lastGpstime = parseGpsTime(record);
            while ((record = file.readLine()) != null) {
                int curGpstime = parseGpsTime(record);
                if (lastGpstime > curGpstime) {
                    return false;
                }
                lastGpstime = curGpstime;
            }
            return true;
        }
    }

    /**
     * Splits a file into several files of consecutive lines, written next to the source as
     * {@code <name>_splited_<i><extension>}, where the extension starts at the first {@code '.'}
     * of the source name (a name without a dot has an empty extension).
     *
     * <p>Every piece receives {@code lines / splitedFileNum} lines; the last piece additionally
     * receives the remainder. All lines are treated alike, so a header line ends up only in the
     * first piece. I/O errors are printed, not thrown.
     *
     * @param fileParentPath
     *            directory containing the source file; the pieces are written there as well
     * @param srcFileName
     *            name of the file to split
     * @param isDeleteSrcFile
     *            whether the source file is deleted after a successful split
     * @param splitedFileNum
     *            requested number of pieces; values below 1 are rejected, values above the line
     *            count are reduced to the line count (an empty source yields one empty piece)
     */
    public static void splitFile(String fileParentPath, String srcFileName, boolean isDeleteSrcFile,
            int splitedFileNum) {
        if (splitedFileNum < 1) {
            System.err.println("splitedFileNum " + splitedFileNum + " is less than 1");
            return;
        }
        File srcFile = new File(fileParentPath + "/" + srcFileName);
        try {
            int srcFileLines = getFileLines(srcFile);
            if (srcFileLines == 0) {
                // Keep at least one piece so the division below is defined.
                System.out.println("source file is empty, splitedFileNum " + splitedFileNum + " is set to be 1");
                splitedFileNum = 1;
            } else if (srcFileLines < splitedFileNum) {
                System.out.println("splitedFileNum " + splitedFileNum + " is set to be srcFileLines " + srcFileLines);
                splitedFileNum = srcFileLines;
            }

            // A name without '.' used to make substring() throw; treat it as "no extension".
            int dotIndex = srcFileName.indexOf('.');
            String srcFileNameWithoutExtension = (dotIndex < 0) ? srcFileName : srcFileName.substring(0, dotIndex);
            String srcFileExtension = (dotIndex < 0) ? "" : srcFileName.substring(dotIndex);

            int splitedFileLines = srcFileLines / splitedFileNum;
            System.out.println(srcFileLines + " lines are splited into " + splitedFileNum + " files, each with "
                    + splitedFileLines + " lines.");

            try (BufferedReader br = new BufferedReader(new FileReader(srcFile))) {
                for (int i = 0; i < splitedFileNum; i++) {
                    String pieceName = srcFileNameWithoutExtension + "_splited_" + i + srcFileExtension;
                    try (BufferedWriter bw = new BufferedWriter(new FileWriter(fileParentPath + "/" + pieceName))) {
                        String tmpLine;
                        for (int j = 0; j < splitedFileLines && (tmpLine = br.readLine()) != null; j++) {
                            bw.write(tmpLine + "\n");
                        }
                        if (i == splitedFileNum - 1) {
                            // Lines left over by the integer division all go into the last piece.
                            while ((tmpLine = br.readLine()) != null) {
                                bw.write(tmpLine + "\n");
                            }
                        }
                    }
                }
            }

            // The reader is closed at this point, so the source can be removed safely.
            if (isDeleteSrcFile && srcFile.exists()) {
                srcFile.delete();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Counts the lines of a file. A final line counts whether or not it ends with a line
     * terminator, and an empty file has 0 lines.
     *
     * @param srcFile
     *            file to inspect
     * @return number of lines in the file
     * @throws IOException
     *             if the file cannot be read
     */
    public static int getFileLines(File srcFile) throws IOException {
        // Counting readLine() results avoids the off-by-one of "terminator count + 1" for files
        // that end with a newline.
        try (BufferedReader reader = new BufferedReader(new FileReader(srcFile))) {
            int srcFileLines = 0;
            while (reader.readLine() != null) {
                srcFileLines++;
            }
            return srcFileLines;
        }
    }
}


View Code
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: