您的位置:首页 > 编程语言 > Java开发

JAVA多线程读取同一个文件,加速对文件内容的获取

2018-03-21 20:29 621 查看
 前几天,朋友托我帮个忙,问我能不能用多线程的方式,读取一个文件,获取里面的内容。他大概想做的事情,就是读取文件里面每一行的内容,然后分析一下,再插入到数据库这样。但是,由于他那个记录内容的文件实在是太大了,虽然他弄成了单生产者-多消费者的模型,整体的处理速度还是非常的慢,因为读取速度不够快。所以,他就问我要怎么多线程读取同一个文件里面的内容,形成多生产者-多消费者的模型,从而提高速度。
  因此就有了下面的demo试的代码,只要传一个文件路径,读取文件的线程数,分隔符,回调这4个参数即可,并且还配上了测试代码。

 下面是我本地跑出来的测试结果(测试文件,是一个190MB大的文件):
3线程(本机2核4线程) 耗时 3231498毫秒
2线程 耗时 278592毫秒
单线程 耗时397115毫秒
cpu线程数(4线程)耗时245657 毫秒

[java] view plain copypackage demo.demo;  
  
import java.io.FileNotFoundException;  
import java.io.FileOutputStream;  
import java.io.IOException;  
import java.io.RandomAccessFile;  
import java.io.UnsupportedEncodingException;  
import java.security.InvalidParameterException;  
import java.util.Random;  
import java.util.UUID;  
import java.util.concurrent.ExecutorService;  
import java.util.concurrent.Executors;  
import java.util.concurrent.atomic.AtomicInteger;  
  
public class ThreadReadFileHelper {  
    // 模拟数据  
    private static void writeData() throws FileNotFoundException, IOException {  
        FileOutputStream fileOutputStream = new FileOutputStream("C:\\Users\\lianghaohui\\Desktop\\test.txt");  
        Random random = new Random();  
        for (int n = 0; n < 1000000; n++) {  
            int count = random.nextInt(10) + 1;  
            StringBuilder builder = new StringBuilder();  
  
            for (int i = 0; i < count; i++) {  
                builder.append(UUID.randomUUID().toString());  
            }  
  
            builder.append("\n");  
            fileOutputStream.write(builder.toString().getBytes());  
        }  
        fileOutputStream.close();  
        System.out.println("ok");  
    }  
  
    private static AtomicInteger atomicInteger = new AtomicInteger(0);  
  
    // 231498耗时 3线程(本机2核4线程)  
    // 278592耗时 2线程  
    // 397115耗时 单线程  
    // 245657耗时 cpu线程数(4线程)  
    public static void main(String[] args) throws Exception {  
        long beginTime = System.currentTimeMillis();  
        ThreadReadFileHelper helper = new ThreadReadFileHelper();  
        helper.read("C:\\Users\\lianghaohui\\Desktop\\test.txt", Runtime.getRuntime().availableProcessors(), '\n', new StringCallback("UTF-8") {  
            @Override  
            void callback(String data) {  
                int count = atomicInteger.incrementAndGet();  
                System.out.println(count);  
                if (count == 1000000) {  
                    System.out.println("总耗时毫秒:" + (System.currentTimeMillis() - beginTime));  
                    System.out.println(data);  
                }  
            }  
        });  
  
        // RandomAccessFile randomAccessFile = new RandomAccessFile("C:\\Users\\lianghaohui\\Desktop\\test.txt", "r");  
        // while (true) {  
        // if (randomAccessFile.readLine() == null) {  
        // System.out.println("总耗时毫秒:" + (System.currentTimeMillis() - beginTime));  
        // break;  
        // } else {  
        // int count = atomicInteger.incrementAndGet();  
        // System.out.println(count);  
        // }  
        // }  
        // randomAccessFile.close();  
    }  
  
    public void read(String path, int threadCount, char separator, StringCallback callback) throws IOException {  
  
        if (threadCount < 1) {  
            throw new InvalidParameterException("The threadCount can not be less than 1");  
        }  
  
        if (path == null || path.isEmpty()) {  
            throw new InvalidParameterException("The path can not be null or empty");  
        }  
  
        if (callback == null) {  
            throw new InvalidParameterException("The callback can not be null");  
        }  
  
        RandomAccessFile randomAccessFile = new RandomAccessFile(path, "r");  
  
        long fileTotalLength = randomAccessFile.length();  
        long gap = fileTotalLength / threadCount;  
        long checkIndex = 0;  
        long[] beginIndexs = new long[threadCount];  
        long[] endIndexs = new long[threadCount];  
  
        for (int n = 0; n < threadCount; n++) {  
            beginIndexs
 = checkIndex;  
            if (n + 1 == threadCount) {  
                endIndexs
 = fileTotalLength;  
                break;  
            }  
            checkIndex += gap;  
            long gapToEof = getGapToEof(checkIndex, randomAccessFile, separator);  
  
            checkIndex += gapToEof;  
            endIndexs
 = checkIndex;  
        }  
  
        ExecutorService executorService = Executors.newFixedThreadPool(threadCount);  
        executorService.execute(() -> {  
            try {  
                readData(beginIndexs[0], endIndexs[0], path, randomAccessFile, separator, callback);  
            } catch (Exception e) {  
                e.printStackTrace();  
            }  
        });  
  
        for (int n = 1; n < threadCount; n++) {  
            long begin = beginIndexs
;  
            long end = endIndexs
;  
            executorService.execute(() -> {  
                try {  
                    readData(begin, end, path, null, separator, callback);  
                } catch (Exception e) {  
                    e.printStackTrace();  
                }  
            });  
        }  
    }  
  
    private long getGapToEof(long beginIndex, RandomAccessFile randomAccessFile, char separator) throws IOException {  
        randomAccessFile.seek(beginIndex);  
        long count = 0;  
  
        while (randomAccessFile.read() != separator) {  
            count++;  
        }  
  
        count++;  
  
        return count;  
    }  
  
    private void readData(long begin, long end, String path, RandomAccessFile randomAccessFile, char separator, StringCallback callback) throws FileNotFoundException, IOException {  
        System.out.println("开始工作" + Thread.currentThread().getName());  
        if (randomAccessFile == null) {  
            randomAccessFile = new RandomAccessFile(path, "r");  
        }  
  
        randomAccessFile.seek(begin);  
        StringBuilder builder = new StringBuilder();  
  
        while (true) {  
            int read = randomAccessFile.read();  
            begin++;  
            if (separator == read) {  
                if (callback != null) {  
                    callback.callback0(builder.toString());  
                }  
                builder = new StringBuilder();  
            } else {  
                builder.append((char) read);  
            }  
  
            if (begin >= end) {  
                break;  
            }  
        }  
        randomAccessFile.close();  
    }  
  
    public static abstract class StringCallback {  
        private String charsetName;  
        private ExecutorService executorService = Executors.newSingleThreadExecutor();  
  
        public StringCallback(String charsetName) {  
            this.charsetName = charsetName;  
        }  
  
        private void callback0(String data) {  
            executorService.execute(() -> {  
                try {  
                    callback(new String(data.getBytes("ISO-8859-1"), charsetName));  
                } catch (UnsupportedEncodingException e) {  
                    e.printStackTrace();  
                }  
            });  
  
        }  
  
        abstract void callback(String data);  
    }  
  
}  
转自: http://blog.csdn.net/u014653197/article/details/78136568(非常感谢原创博主的整理)
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  多线程 大文件