JAVA多线程读取同一个文件,加速对文件内容的获取
2018-03-21 20:29
621 查看
前几天,朋友托我帮个忙,问我能不能用多线程的方式,读取一个文件,获取里面的内容。他大概想做的事情,就是读取文件里面每一行的内容,然后分析一下,再插入到数据库这样。但是,由于他那个记录内容的文件实在是太大了,虽然他弄成了单生产者-多消费者的模型,整体的处理速度还是非常的慢,因为读取速度不够快。所以,他就问我要怎么多线程读取同一个文件里面的内容,形成多生产者-多消费者的模型,从而提高速度。
因此就有了下面的demo试的代码,只要传一个文件路径,读取文件的线程数,分隔符,回调这4个参数即可,并且还配上了测试代码。
下面是我本地跑出来的测试结果(测试文件,是一个190MB大的文件):
3线程(本机2核4线程) 耗时 231498毫秒
2线程 耗时 278592毫秒
单线程 耗时397115毫秒
cpu线程数(4线程)耗时245657 毫秒
package demo.demo;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.security.InvalidParameterException;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Reads a single file with several threads at once and hands every record
 * (the bytes between two separators) to a {@link StringCallback}.
 *
 * <p>The file is cut into {@code threadCount} contiguous byte ranges whose
 * boundaries are moved forward to just after the next separator, so that no
 * record is split between two readers. Each range is then read by its own
 * task with its own file handle.
 */
public class ThreadReadFileHelper {

    /** Sample file written by {@link #writeData()} and read by {@link #main(String[])}. */
    private static final String TEST_FILE = "C:\\Users\\lianghaohui\\Desktop\\test.txt";

    /** Number of lines in the sample file. */
    private static final int TEST_LINE_COUNT = 1000000;

    /** Size of the per-reader buffer; avoids one read call per byte. */
    private static final int READ_BUFFER_SIZE = 64 * 1024;

    /** Counts the lines delivered to the demo callback in {@link #main(String[])}. */
    private static final AtomicInteger atomicInteger = new AtomicInteger(0);

    /**
     * Generates the sample data: {@link #TEST_LINE_COUNT} lines, each made of
     * one to ten random UUIDs followed by a newline.
     *
     * @throws FileNotFoundException if the sample file cannot be created
     * @throws IOException if writing fails
     */
    private static void writeData() throws FileNotFoundException, IOException {
        Random random = new Random();
        // try-with-resources closes the stream even when a write fails.
        try (FileOutputStream fileOutputStream = new FileOutputStream(TEST_FILE)) {
            for (int n = 0; n < TEST_LINE_COUNT; n++) {
                int count = random.nextInt(10) + 1;
                StringBuilder builder = new StringBuilder();
                for (int i = 0; i < count; i++) {
                    builder.append(UUID.randomUUID().toString());
                }
                builder.append("\n");
                fileOutputStream.write(builder.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8));
            }
        }
        System.out.println("ok");
    }

    /**
     * Demo entry point: reads the sample file with one reader per available
     * processor and prints the elapsed time once the last line has arrived.
     *
     * <p>Timings reported by the original author for the earlier
     * byte-at-a-time implementation on a 190 MB file (machine with 2 cores /
     * 4 hardware threads), in milliseconds:
     * 3 threads 231498, 2 threads 278592, single thread 397115,
     * 4 threads (one per hardware thread) 245657.
     * The single-threaded baseline was a plain loop over
     * {@code RandomAccessFile.readLine()} until it returned {@code null}.
     */
    public static void main(String[] args) throws Exception {
        long beginTime = System.currentTimeMillis();
        ThreadReadFileHelper helper = new ThreadReadFileHelper();
        helper.read(TEST_FILE, Runtime.getRuntime().availableProcessors(), '\n', new StringCallback("UTF-8") {
            @Override
            void callback(String data) {
                int count = atomicInteger.incrementAndGet();
                System.out.println(count);
                if (count == TEST_LINE_COUNT) {
                    System.out.println("总耗时毫秒:" + (System.currentTimeMillis() - beginTime));
                    System.out.println(data);
                }
            }
        });
    }

    /**
     * Splits the file into {@code threadCount} separator-aligned ranges and
     * schedules one reader task per range. The method returns as soon as the
     * tasks are scheduled; records are delivered to {@code callback}
     * asynchronously and records from different ranges may interleave.
     *
     * @param path        path of the file to read
     * @param threadCount number of reader tasks, at least 1
     * @param separator   record separator, compared against single bytes of the file
     * @param callback    receiver of the decoded records
     * @throws InvalidParameterException if {@code threadCount < 1}, {@code path} is
     *                                   null or empty, or {@code callback} is null
     * @throws IOException               if the file cannot be opened or scanned
     */
    public void read(String path, int threadCount, char separator, StringCallback callback) throws IOException {
        if (threadCount < 1) {
            throw new InvalidParameterException("The threadCount can not be less than 1");
        }
        if (path == null || path.isEmpty()) {
            throw new InvalidParameterException("The path can not be null or empty");
        }
        if (callback == null) {
            throw new InvalidParameterException("The callback can not be null");
        }

        long[] beginIndexs = new long[threadCount];
        long[] endIndexs = new long[threadCount];
        // The handle used for boundary scanning is closed here; every reader
        // task opens its own handle so no file position is shared.
        try (RandomAccessFile boundaryScanner = new RandomAccessFile(path, "r")) {
            long fileTotalLength = boundaryScanner.length();
            long gap = fileTotalLength / threadCount;
            long checkIndex = 0;
            for (int n = 0; n < threadCount; n++) {
                beginIndexs[n] = checkIndex;
                if (n + 1 == threadCount) {
                    // The last range always runs to the end of the file.
                    endIndexs[n] = fileTotalLength;
                    break;
                }
                // Earlier boundaries were pushed forward to a separator, so
                // adding the nominal gap may overshoot the file; clamp it.
                checkIndex = Math.min(checkIndex + gap, fileTotalLength);
                checkIndex += getGapToEof(checkIndex, boundaryScanner, separator);
                endIndexs[n] = checkIndex;
            }
        }

        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
        try {
            for (int n = 0; n < threadCount; n++) {
                final long begin = beginIndexs[n];
                final long end = endIndexs[n];
                executorService.execute(() -> {
                    try {
                        readData(begin, end, path, separator, callback);
                    } catch (Exception e) {
                        // Best effort, as before: a failing range is reported
                        // and does not stop the other readers.
                        e.printStackTrace();
                    }
                });
            }
        } finally {
            // Already submitted tasks still run; the pool threads end
            // afterwards instead of keeping the JVM alive forever.
            executorService.shutdown();
        }
    }

    /**
     * Counts the bytes from {@code beginIndex} up to and including the next
     * separator. If the end of the file is reached first, the number of bytes
     * up to the end of the file is returned.
     *
     * @param beginIndex       absolute position to start scanning from
     * @param randomAccessFile open handle on the file; its position is moved
     * @param separator        record separator
     * @return number of bytes consumed, 0 when {@code beginIndex} is at or past the end
     * @throws IOException if seeking or reading fails
     */
    private long getGapToEof(long beginIndex, RandomAccessFile randomAccessFile, char separator) throws IOException {
        randomAccessFile.seek(beginIndex);
        long count = 0;
        int read;
        // read() returns -1 at end of file; without that check the scan
        // would never terminate when no separator follows.
        while ((read = randomAccessFile.read()) != -1) {
            count++;
            if (read == separator) {
                break;
            }
        }
        return count;
    }

    /**
     * Reads the byte range {@code [begin, end)} of the file and passes every
     * record found in it to the callback. Bytes left after the last separator
     * of the range are delivered as a final record.
     *
     * @param begin     absolute position of the first byte to read
     * @param end       absolute position just after the last byte to read
     * @param path      path of the file to read
     * @param separator record separator, compared against single bytes
     * @param callback  receiver of the records
     * @throws FileNotFoundException if the file cannot be opened
     * @throws IOException           if seeking or reading fails
     */
    private void readData(long begin, long end, String path, char separator, StringCallback callback) throws FileNotFoundException, IOException {
        System.out.println("开始工作" + Thread.currentThread().getName());
        try (RandomAccessFile randomAccessFile = new RandomAccessFile(path, "r")) {
            randomAccessFile.seek(begin);
            byte[] buffer = new byte[READ_BUFFER_SIZE];
            StringBuilder builder = new StringBuilder();
            long position = begin;
            while (position < end) {
                int wanted = (int) Math.min((long) buffer.length, end - position);
                int got = randomAccessFile.read(buffer, 0, wanted);
                if (got < 0) {
                    // The file became shorter than the planned range.
                    break;
                }
                for (int i = 0; i < got; i++) {
                    // Each byte is kept as one char (0..255); the callback
                    // turns the record back into bytes before decoding it.
                    int value = buffer[i] & 0xFF;
                    if (value == separator) {
                        callback.callback0(builder.toString());
                        builder.setLength(0);
                    } else {
                        builder.append((char) value);
                    }
                }
                position += got;
            }
            if (builder.length() > 0) {
                // Record without a trailing separator (typically the last
                // line of the file) must not be lost.
                callback.callback0(builder.toString());
            }
        }
    }

    /**
     * Receiver of the records found by {@link ThreadReadFileHelper#read}.
     * Records are decoded with the charset given at construction time and
     * passed to {@link #callback(String)} one at a time by a pool that never
     * has more than one worker thread.
     */
    public static abstract class StringCallback {

        /** Idle time after which the dispatch thread ends so it cannot keep the JVM alive. */
        private static final long IDLE_TIMEOUT_MILLIS = 200L;

        private final java.nio.charset.Charset charset;
        private final ExecutorService executorService;

        /**
         * @param charsetName name of the charset the file content is encoded in
         * @throws IllegalArgumentException if the name is null, illegal or not supported
         *                                  (raised by {@code Charset.forName})
         */
        public StringCallback(String charsetName) {
            // Resolve the charset once and fail fast instead of printing a
            // stack trace for every single record later on.
            this.charset = java.nio.charset.Charset.forName(charsetName);
            java.util.concurrent.ThreadPoolExecutor dispatcher = new java.util.concurrent.ThreadPoolExecutor(
                    1, 1, IDLE_TIMEOUT_MILLIS, java.util.concurrent.TimeUnit.MILLISECONDS,
                    new java.util.concurrent.LinkedBlockingQueue<Runnable>());
            dispatcher.allowCoreThreadTimeOut(true);
            this.executorService = dispatcher;
        }

        /**
         * Decodes one raw record and queues it for {@link #callback(String)}.
         *
         * @param data record in which every char holds one byte of the file
         */
        private void callback0(String data) {
            // Decoding happens on the calling reader thread so the single
            // dispatch thread only has to run the user callback.
            String decoded = new String(data.getBytes(java.nio.charset.StandardCharsets.ISO_8859_1), charset);
            executorService.execute(() -> callback(decoded));
        }

        /**
         * Called once per record, without the separator.
         *
         * @param data decoded record
         */
        abstract void callback(String data);
    }
}
转自: http://blog.csdn.net/u014653197/article/details/78136568(非常感谢原创博主的整理)
因此就有了下面的demo试的代码,只要传一个文件路径,读取文件的线程数,分隔符,回调这4个参数即可,并且还配上了测试代码。
下面是我本地跑出来的测试结果(测试文件,是一个190MB大的文件):
3线程(本机2核4线程) 耗时 231498毫秒
2线程 耗时 278592毫秒
单线程 耗时397115毫秒
cpu线程数(4线程)耗时245657 毫秒
package demo.demo;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.security.InvalidParameterException;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Reads a single file with several threads at once and hands every record
 * (the bytes between two separators) to a {@link StringCallback}.
 *
 * <p>The file is cut into {@code threadCount} contiguous byte ranges whose
 * boundaries are moved forward to just after the next separator, so that no
 * record is split between two readers. Each range is then read by its own
 * task with its own file handle.
 */
public class ThreadReadFileHelper {

    /** Sample file written by {@link #writeData()} and read by {@link #main(String[])}. */
    private static final String TEST_FILE = "C:\\Users\\lianghaohui\\Desktop\\test.txt";

    /** Number of lines in the sample file. */
    private static final int TEST_LINE_COUNT = 1000000;

    /** Size of the per-reader buffer; avoids one read call per byte. */
    private static final int READ_BUFFER_SIZE = 64 * 1024;

    /** Counts the lines delivered to the demo callback in {@link #main(String[])}. */
    private static final AtomicInteger atomicInteger = new AtomicInteger(0);

    /**
     * Generates the sample data: {@link #TEST_LINE_COUNT} lines, each made of
     * one to ten random UUIDs followed by a newline.
     *
     * @throws FileNotFoundException if the sample file cannot be created
     * @throws IOException if writing fails
     */
    private static void writeData() throws FileNotFoundException, IOException {
        Random random = new Random();
        // try-with-resources closes the stream even when a write fails.
        try (FileOutputStream fileOutputStream = new FileOutputStream(TEST_FILE)) {
            for (int n = 0; n < TEST_LINE_COUNT; n++) {
                int count = random.nextInt(10) + 1;
                StringBuilder builder = new StringBuilder();
                for (int i = 0; i < count; i++) {
                    builder.append(UUID.randomUUID().toString());
                }
                builder.append("\n");
                fileOutputStream.write(builder.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8));
            }
        }
        System.out.println("ok");
    }

    /**
     * Demo entry point: reads the sample file with one reader per available
     * processor and prints the elapsed time once the last line has arrived.
     *
     * <p>Timings reported by the original author for the earlier
     * byte-at-a-time implementation on a 190 MB file (machine with 2 cores /
     * 4 hardware threads), in milliseconds:
     * 3 threads 231498, 2 threads 278592, single thread 397115,
     * 4 threads (one per hardware thread) 245657.
     * The single-threaded baseline was a plain loop over
     * {@code RandomAccessFile.readLine()} until it returned {@code null}.
     */
    public static void main(String[] args) throws Exception {
        long beginTime = System.currentTimeMillis();
        ThreadReadFileHelper helper = new ThreadReadFileHelper();
        helper.read(TEST_FILE, Runtime.getRuntime().availableProcessors(), '\n', new StringCallback("UTF-8") {
            @Override
            void callback(String data) {
                int count = atomicInteger.incrementAndGet();
                System.out.println(count);
                if (count == TEST_LINE_COUNT) {
                    System.out.println("总耗时毫秒:" + (System.currentTimeMillis() - beginTime));
                    System.out.println(data);
                }
            }
        });
    }

    /**
     * Splits the file into {@code threadCount} separator-aligned ranges and
     * schedules one reader task per range. The method returns as soon as the
     * tasks are scheduled; records are delivered to {@code callback}
     * asynchronously and records from different ranges may interleave.
     *
     * @param path        path of the file to read
     * @param threadCount number of reader tasks, at least 1
     * @param separator   record separator, compared against single bytes of the file
     * @param callback    receiver of the decoded records
     * @throws InvalidParameterException if {@code threadCount < 1}, {@code path} is
     *                                   null or empty, or {@code callback} is null
     * @throws IOException               if the file cannot be opened or scanned
     */
    public void read(String path, int threadCount, char separator, StringCallback callback) throws IOException {
        if (threadCount < 1) {
            throw new InvalidParameterException("The threadCount can not be less than 1");
        }
        if (path == null || path.isEmpty()) {
            throw new InvalidParameterException("The path can not be null or empty");
        }
        if (callback == null) {
            throw new InvalidParameterException("The callback can not be null");
        }

        long[] beginIndexs = new long[threadCount];
        long[] endIndexs = new long[threadCount];
        // The handle used for boundary scanning is closed here; every reader
        // task opens its own handle so no file position is shared.
        try (RandomAccessFile boundaryScanner = new RandomAccessFile(path, "r")) {
            long fileTotalLength = boundaryScanner.length();
            long gap = fileTotalLength / threadCount;
            long checkIndex = 0;
            for (int n = 0; n < threadCount; n++) {
                beginIndexs[n] = checkIndex;
                if (n + 1 == threadCount) {
                    // The last range always runs to the end of the file.
                    endIndexs[n] = fileTotalLength;
                    break;
                }
                // Earlier boundaries were pushed forward to a separator, so
                // adding the nominal gap may overshoot the file; clamp it.
                checkIndex = Math.min(checkIndex + gap, fileTotalLength);
                checkIndex += getGapToEof(checkIndex, boundaryScanner, separator);
                endIndexs[n] = checkIndex;
            }
        }

        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
        try {
            for (int n = 0; n < threadCount; n++) {
                final long begin = beginIndexs[n];
                final long end = endIndexs[n];
                executorService.execute(() -> {
                    try {
                        readData(begin, end, path, separator, callback);
                    } catch (Exception e) {
                        // Best effort, as before: a failing range is reported
                        // and does not stop the other readers.
                        e.printStackTrace();
                    }
                });
            }
        } finally {
            // Already submitted tasks still run; the pool threads end
            // afterwards instead of keeping the JVM alive forever.
            executorService.shutdown();
        }
    }

    /**
     * Counts the bytes from {@code beginIndex} up to and including the next
     * separator. If the end of the file is reached first, the number of bytes
     * up to the end of the file is returned.
     *
     * @param beginIndex       absolute position to start scanning from
     * @param randomAccessFile open handle on the file; its position is moved
     * @param separator        record separator
     * @return number of bytes consumed, 0 when {@code beginIndex} is at or past the end
     * @throws IOException if seeking or reading fails
     */
    private long getGapToEof(long beginIndex, RandomAccessFile randomAccessFile, char separator) throws IOException {
        randomAccessFile.seek(beginIndex);
        long count = 0;
        int read;
        // read() returns -1 at end of file; without that check the scan
        // would never terminate when no separator follows.
        while ((read = randomAccessFile.read()) != -1) {
            count++;
            if (read == separator) {
                break;
            }
        }
        return count;
    }

    /**
     * Reads the byte range {@code [begin, end)} of the file and passes every
     * record found in it to the callback. Bytes left after the last separator
     * of the range are delivered as a final record.
     *
     * @param begin     absolute position of the first byte to read
     * @param end       absolute position just after the last byte to read
     * @param path      path of the file to read
     * @param separator record separator, compared against single bytes
     * @param callback  receiver of the records
     * @throws FileNotFoundException if the file cannot be opened
     * @throws IOException           if seeking or reading fails
     */
    private void readData(long begin, long end, String path, char separator, StringCallback callback) throws FileNotFoundException, IOException {
        System.out.println("开始工作" + Thread.currentThread().getName());
        try (RandomAccessFile randomAccessFile = new RandomAccessFile(path, "r")) {
            randomAccessFile.seek(begin);
            byte[] buffer = new byte[READ_BUFFER_SIZE];
            StringBuilder builder = new StringBuilder();
            long position = begin;
            while (position < end) {
                int wanted = (int) Math.min((long) buffer.length, end - position);
                int got = randomAccessFile.read(buffer, 0, wanted);
                if (got < 0) {
                    // The file became shorter than the planned range.
                    break;
                }
                for (int i = 0; i < got; i++) {
                    // Each byte is kept as one char (0..255); the callback
                    // turns the record back into bytes before decoding it.
                    int value = buffer[i] & 0xFF;
                    if (value == separator) {
                        callback.callback0(builder.toString());
                        builder.setLength(0);
                    } else {
                        builder.append((char) value);
                    }
                }
                position += got;
            }
            if (builder.length() > 0) {
                // Record without a trailing separator (typically the last
                // line of the file) must not be lost.
                callback.callback0(builder.toString());
            }
        }
    }

    /**
     * Receiver of the records found by {@link ThreadReadFileHelper#read}.
     * Records are decoded with the charset given at construction time and
     * passed to {@link #callback(String)} one at a time by a pool that never
     * has more than one worker thread.
     */
    public static abstract class StringCallback {

        /** Idle time after which the dispatch thread ends so it cannot keep the JVM alive. */
        private static final long IDLE_TIMEOUT_MILLIS = 200L;

        private final java.nio.charset.Charset charset;
        private final ExecutorService executorService;

        /**
         * @param charsetName name of the charset the file content is encoded in
         * @throws IllegalArgumentException if the name is null, illegal or not supported
         *                                  (raised by {@code Charset.forName})
         */
        public StringCallback(String charsetName) {
            // Resolve the charset once and fail fast instead of printing a
            // stack trace for every single record later on.
            this.charset = java.nio.charset.Charset.forName(charsetName);
            java.util.concurrent.ThreadPoolExecutor dispatcher = new java.util.concurrent.ThreadPoolExecutor(
                    1, 1, IDLE_TIMEOUT_MILLIS, java.util.concurrent.TimeUnit.MILLISECONDS,
                    new java.util.concurrent.LinkedBlockingQueue<Runnable>());
            dispatcher.allowCoreThreadTimeOut(true);
            this.executorService = dispatcher;
        }

        /**
         * Decodes one raw record and queues it for {@link #callback(String)}.
         *
         * @param data record in which every char holds one byte of the file
         */
        private void callback0(String data) {
            // Decoding happens on the calling reader thread so the single
            // dispatch thread only has to run the user callback.
            String decoded = new String(data.getBytes(java.nio.charset.StandardCharsets.ISO_8859_1), charset);
            executorService.execute(() -> callback(decoded));
        }

        /**
         * Called once per record, without the separator.
         *
         * @param data decoded record
         */
        abstract void callback(String data);
    }
}
转自: http://blog.csdn.net/u014653197/article/details/78136568(非常感谢原创博主的整理)
相关文章推荐
- JAVA多线程读取同一个文件,加速对文件内容的获取
- java中的文件读取和文件写出:如何从一个文件中获取内容以及如何向一个文件中写入内容
- java的JFileChooser上传一个Excel文件并读取该文件的内容
- java多线程同时读取一个文件
- [Java] Java序列化将一个对象的内容保存到文件和从文件读取对象
- java中多线程读取同一个文件的不同位置,多线程读取文件
- Java中读取一个TXT文件中的每行内容的前5个字符,并保存到另一个TXT文件中。
- 如何用java读取一个txt 文件内的内容并把它赋值与String里?
- java读取ftp文件,并获取文件内容
- 使用java读取txt里边的文件内容并获取大小(M).txt
- Java 多线程读取一个文件
- java多线程读取一个文件
- java读取pdf文件内容
- [Java] 利用xpdf库获取pdf文件的指定范围文本内容
- java读取文件,写文件,读取网页内容
- 一个读取xml文件内容的类
- 使用Java读取Excel文件内容
- java读取pdf文件内容
- 一个JAVA读取.properties文件的例子(lp)
- 将后台数据读取到前台的EXCEL文件中去,用javascript实现,asp.net,javacript(发一个原创)