您的位置:首页 > 理论基础 > 计算机网络

Java多线程环境下如何高效安全处理数据(输入输出流、文件、网络等)(一)

2013-12-05 12:38 971 查看



博客分类: 

java多线程网络应用JavathreadBlog 

      本博客属原创文章,欢迎转载!转载请务必注明出处:http://guoyunsky.iteye.com/blog/867469

    本博客已迁移本人独立博客: http://www.yun5u.com/

   

  这个标题可能有些歧义,我也不知道该取什么标题,知道的同学帮忙取下.同时这只是我平时的一个总结,

问题估计会有很多,大家帮忙指正,谢谢!
这里先说下应用场景,比如:
   1)需要一直处理一个文件目录,处理里面的文件.文件过多,单线程恐怕速度跟不上,于是使用多线程.

    2)网络下载,需要下载很多URL.单线程也是速度跟不上,于是一个URL用一个线程去下载并处理(如爬虫,这也是我写爬虫的总结).

 
再说下如何高效安全:
    1)以上不可能每一个文件,每一个URL就开一个线程去处理.肯定是先初始化一个线程池,然后将文件、URL放入一个容器(比如队列),然后线程从容器里获取数据去处理,处理完了,就再获取,如此直到处理完毕.

    2)可能文件或者URL的数据会很大,足让你内存溢出.或者多个线程的数据加起来也足以让你内存爆掉.那肯定要设置内存装载的数据大小限制,也就是所谓的缓存。当缓存写满了,再考虑写入文件.

    3)由于线程固定,缓存也是固定,写入的文件也是固定.那这些都是可循环利用的对象。不可能每一次处理都是new,那是极大的浪费。所以可以固定线程的个数,缓存的大小(可以控制内存大小),甚至那个备份文件也是一直可以循环所有的对象

  
所需要的东西:
    1)干活的线程

    2)接活的容器

    3)线程池

    4)可将数据放入内存达到一定阀值后再写入文件的类,同时提供返回数据的功能(内存和文件里的数据都得返回).返回数据也有多种形式,字符串、流?同时也得考虑循环利用,毕竟也是固定的

   5)附加功能:

         a.内容可以指纹化(MD5或SHA1)

         b.可以如InputStream的mark,reset等.毕竟这一切都可以当做输入输出流来处理,我接下来的代码也是

         c.可以控制处理的速度,比如这个场景是下载URL(网络爬虫),我不想下载速度过快.

         d.待补充

 

大概的设计:
   1)干活的线程 MyThread

   2)接活的容器 具体看你的应用,例子里有

    3)线程池   具体也看你的应用,我这里只是测试代码里弄个线程组

    4)处理数据的类:

          a.读数据到内存或文件中的类:MyOutputStream

          b.MyOutputStream里面又要返回数据的类:MyInputStream

 

接下来开始贴代码了.

1)可以读取数据,如果数据过多达到缓存,可以写入文件的类

Java代码  



import java.io.File;  

import java.io.FileOutputStream;  

import java.io.IOException;  

import java.io.OutputStream;  

  

public class MyOutputStream extends OutputStream {  

    private boolean isOpen;         // 是否已经打开  

    private long size;              // 数据总大小  

    private String backedFileName;  // 超出缓存,要写入到的文件名  

    private OutputStream diskStream;// 超出缓存,写入到文件的OutputStream  

    private byte[] buffer;          // 缓存  

    private long position;          // 当前位置  

    private boolean recording;      // 是否记录数据中  

      

      

    public MyOutputStream(int bufferSize,String backedFileName){  

        this.buffer=new byte[bufferSize];  

        this.backedFileName=backedFileName;  

        this.recording=true;  

    }  

      

    public void open() throws IOException{  

        if(isOpen()){  

            throw new IOException("MyOutputStream already open for ".concat(Thread.currentThread().getName()));  

        }  

        isOpen=true;  

        this.position=0;  

        this.size=0;  

        this.recording=true;  

          

        closeDiskStream();  

          

        this.diskStream=new FileOutputStream(this.backedFileName);  

          

    }  

      

    private void closeDiskStream() throws IOException{  

        if(this.diskStream!=null){  

            diskStream.close();  

            diskStream=null;  

        }  

    }  

      

    public void closeRecorder() throws IOException{  

        recording=false;  

        closeDiskStream();  

        if(this.size==0){  

            this.size=position;  

        }  

    }  

      

    public boolean isOpen(){  

        return isOpen;  

    }  

    // 记录一个字节  

    private void record(int b) throws IOException{  

        if(this.position>=this.buffer.length){  

            this.diskStream.write((byte)b);  

        }else{  

            buffer[(int)position]=(byte)b;  

        }  

        this.position++;  

    }  

    // 记录多个字节  

    private void record(byte[] b,int off,int len) throws IOException{  

        if(position>=this.buffer.length){   // 如果缓存已经满了,则写入硬盘  

            if(this.diskStream==null){  

                throw new IOException("diskStream is null for ".concat(Thread.currentThread().getName()));  

            }  

            this.diskStream.write(b, off, len); // 写入硬盘  

            this.position+=len; // 位置增加  

        }else{  // 没满,则写入缓存.如果此时缓存写满了,则再写入磁盘  

            int toCopy=Math.min(this.buffer.length-(int)this.position, len);    // 计算要写入缓存的长度,不让缓存爆掉  

            System.arraycopy(b, off, this.buffer, (int)this.position, len);     // 拷贝到缓存  

            this.position+=toCopy;  

              

            if(toCopy<len){ // 如果缓存已满,则将剩下的数据写入硬盘  

                //this.diskStream.write(b,off+toCopy,len-toCopy);       

                record(b,off+toCopy,len-toCopy);    // 为什么不直接用上一行代码?需要验证diskStream  

            }  

        }  

    }  

    // 写入数据  

    @Override  

    public void write(int b) throws IOException {  

        if(recording){  

            record(b);  

        }  

    }  

    // 写入数据  

    @Override  

    public void write(byte[] b, int off, int len) throws IOException {  

       if(recording){  

           record(b,off,len);  

       }  

    }  

    // 写入数据  

    @Override  

    public void write(byte[] b) throws IOException {  

        if(recording){  

            record(b,0,b.length);  

        }  

    }  

     

    // 关闭,关闭了才能获得长度  

    @Override  

    public void close() throws IOException {  

        isOpen=false;  

        closeRecorder();  

    }  

      

      

     

    // 刷新  

    @Override  

    public void flush() throws IOException {  

       if(this.diskStream!=null){  

           this.diskStream.flush();  

       }  

    }  

    // 获得数据大小  

    public long getSize() {  

        return size;  

    }  

      

    public static void main(String[] args) {  

        String dir=new File("").getAbsolutePath().concat(File.separator);  

        String fileMemory=dir.concat("fileMemory.txt");  

        String fileDisk=dir.concat("fileDisk.txt");  

        int bufferSize=5;  

        MyOutputStream mosMemory=null;  

        MyOutputStream mosDisk=null;  

        try {  

            mosMemory=new MyOutputStream(bufferSize,fileMemory);  

            mosMemory.open();  

              

            mosDisk=new MyOutputStream(bufferSize,fileDisk);  

            mosDisk.open();  

              

            for(int i=0;i<100;i++){  

                if(i<bufferSize){  

                    mosMemory.write(i);  

                }  

                mosDisk.write(i);  

            }  

              

            mosMemory.close();  

            mosDisk.close();  

              

            System.out.println("mosMemory length:"+mosMemory.getSize());  

            System.out.println("mosDisk length:"+mosDisk.getSize());  

              

        } catch (IOException e) {  

            // TODO Auto-generated catch block  

            e.printStackTrace();  

        }finally{  

              

        }  

    }  

  

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐