您的位置:首页 > 职场人生

黑马程序员 java自学日记二 Java中的管道流

2013-08-09 21:00 204 查看
1.引言
Java I/O系统是建立在数据流概念之上的,而在UNIX/Linux中有一个类似的概念,就是管道,它具有将一个程序的输出当作另一个程序的输入的能力。在Java中,可以使用管道流进行线程之间的通信,输入流和输出流必须相连接,这样的通信有别于一般的Shared
Data通信,其不需要一个共享的数据空间。
2.相关类及其关系
1)字节流:
分为管道输出流(PipedOutputStream)和管道输入流(PipedInputStream),利用
java.io.PipedOutputStream和java.io.PipedInputStream可以实现线程之间的二进制信息传输。如果要进行管道输出,则必须把输出流连在输入流上。
java.io.PipedOutputStream是java.io.OutputStream的直接子类,而java.io.
PipedInputStream是java.io.InputStream的直接子类。PipedOutputStream和PipedInputStream往往成对出现、配合使用。举例说明:
TestPipe.Java
import java.io.IOException;
publicclass TestPipe {
public static void main(String[] args) {
Send s = new Send();

Receive r = new Receive();

try {
//输出管道流.connect(输入管道流);

s.getPos().connect(r.getPis()); //
连接管道

} catch (IOException e) {

e.printStackTrace();

}

new Thread(s).start(); // 启动线程

new Thread(r).start(); //
启动线程

}

}
Receive.java
importjava.io.IOException;

import java.io.PipedInputStream;
classReceive implements Runnable { //实现Runnable接口

private PipedInputStream pis = null;
public Receive() {

this.pis = newPipedInputStream(); //
实例化输入流

}
public void run() {

byte b[] = new byte[1024];

int len = 0;

try {

len = this.pis.read(b); //
接收数据

} catch (IOException e) {

e.printStackTrace();

}

try {

this.pis.close();

} catch (IOException e) {

e.printStackTrace();

}

System.out.println("接收的内容为:" + new String(b, 0, len));

}
public PipedInputStream getPis() {

return pis;

}

}
Send.java
importjava.io.IOException;

import java.io.PipedOutputStream;

class Send implements Runnable {

// 实现Runnable接口

private PipedOutputStream pos = null;//管道输出流
public Send() {

this.pos = newPipedOutputStream();//
实例化输出流

}
public void run() {

String str = "HelloWorld!!!";

try {

this.pos.write(str.getBytes());//
输出信息

} catch (IOException e) {

e.printStackTrace();

}

try {

this.pos.close(); // 关闭输出流

} catch (IOException e) {

e.printStackTrace();

}

}
public PipedOutputStream getPos() { //
通过线程类得到输出流

return pos;

}

}
我们可以看到使用管道流,通过connect方法进行连接,实现了Send线程和Receive线程之间的通信。
注意:
PipedInputStream中实际是用了一个1024字节固定大小的循环缓冲区。写入PipedOutputStream的数据实际上保存到对应的
PipedInputStream的内部缓冲区。从PipedInputStream执行读操作时,读取的数据实际上来自这个内部缓冲区。如果对应的
PipedInputStream输入缓冲区已满,任何企图写入PipedOutputStream的线程都将被阻塞。而且这个写操作线程将一直阻塞,直至出现读取PipedInputStream的操作从缓冲区删除数据。这也就是说往PipedOutputStream写数据的线程Send若是和从PipedInputStream读数据的线程Receive是同一个线程的话,那么一旦Send线程发送数据过多(大于1024字节),它就会被阻塞,这就直接导致接受数据的线程阻塞而无法工作(因为是同一个线程嘛),那么这就是一个典型的死锁现象,这也就是为什么javadoc中关于这两个类的使用时告诉大家要在多线程环境下使用的原因了。

应用:过滤器模式

使用这个模式的典型例子是Unix的shell命令。这个模式的好处在于过滤器无需知道它与何种东西进行连接,并且这可以实现并行,而且系统的可扩展性可以根据添加删除或者改变Filter进行增强。
在这举一个不断计算平均值的例子,producer作为前端的数据源,不断产生随机数,通过pipe进入filter进行数据处理,然后通过第二个pipe就行后端处理。
importjava.util.*;

import java.io.*;
publicclass PipeTest

/* 建立3个线程(Producer、Filter、Consumer)类和两组通信管道,通过多线程将管道1的数据传送到管道2中,实现管道的通信。

* Producer => pout1->pin1 => Filter(pin1->pout2) => pout2->pin2 =>Consumer

*/

{

public static void main(Stringargs[]) {

try {

PipedOutputStream pout1 = newPipedOutputStream();

PipedInputStream pin1 = newPipedInputStream(pout1);
PipedOutputStream pout2 = newPipedOutputStream();

PipedInputStream pin2 = newPipedInputStream(pout2);
/* construct threads */
Producer prod = newProducer(pout1);

Filter filt = newFilter(pin1, pout2);

Consumer cons = newConsumer(pin2);
/* start threads */
prod.start();

filt.start();

cons.start();

} catch (IOException e) {

}

}

}
//前端:该类的作用是产生随机数,并将其放到管道1的输出流中

class Producer extends Thread {

private DataOutputStream out;//DataOutputStream是用于写入一些基本类型数据的类,此类的实例用于生成伪随机数流

private Random rand = new Random();
public Producer(OutputStream os) {

out = new DataOutputStream(os);

}
public void run() {

while (true) {

try {

double num =rand.nextDouble();

// 将double值直接写入流

out.writeDouble(num);

System.out.println("写入流中的值是:"
+num);

out.flush();

sleep(Math.abs(rand.nextInt()%10));//随机休眠一段时间

} catch (Exception e) {

System.out.println("Error: " + e);

}

}

}

}
//过滤器,起数据处理作用,读取管道1中输入流的内容,并将其放到管道2的输出流中

class Filter extends Thread {

private DataInputStream in;

private DataOutputStream out;

private double total = 0;

private int count = 0;
public Filter(InputStream is, OutputStreamos) {

in = new DataInputStream(is);

out = new DataOutputStream(os);

}
public void run() {

for (;;) {

try {

double x =in.readDouble(); //
读取流中的数据

total += x;

count++;

if (count != 0) {

double d = total /count;

out.writeDouble(d);//
将得到的数据平均值写入流

}

} catch (IOException e) {

System.out.println("Error: " + e);

}

}

}

}
//后端:读取管道2输入流的内容

class Consumer extends Thread {

private double old_avg = 0;

private DataInputStream in;
public Consumer(InputStream is) {

in = new DataInputStream(is);

}
public void run() {

for (;;) {

try {

double avg =in.readDouble();

if (Math.abs(avg -old_avg) > 0.01) {

System.out.println("现在的平均值是: " + avg);

System.out.println();

old_avg = avg;

}

} catch (IOException e) {

System.out.println("Error: " + e);

}

}

}
}
2)字符流
Java利用 java.io.PipedWriter和java.io.PipedReader在线程之间传输字符信息。与
java.io.PipedOutputStream和java.io.PipedInputStream类似,java.io.PipedWriter是java.io.Writer的直接子类,java.io.PipedReader是java.io.Reader的直接子类。PipedWriter拥有一个允许指定输入管道字符流的构造方法,而PipedReader拥有一个允许指定输出管道字符流的构造方法。从而使得PipedWriter和PipedReader往往成对出现、配合使用。
以典型KWIC系统为例,下边的代码演示了如何使用字符流并且使用了过滤器模式:ReadLineThread--Pipe1
--> ShiftThread -- Pipe2 --> SortLinesThread

importjava.io.BufferedReader;

import java.io.BufferedWriter;

import java.io.FileNotFoundException;

import java.io.FileReader;

import java.io.FileWriter;

import java.io.IOException;

import java.io.PipedReader;

import java.io.PipedWriter;

import java.util.ArrayList;

import java.util.Collections;

import java.util.StringTokenizer;

public class KwicPipe {

public static void main(String[]args) {

try {

//get the input and output path

String src = args[0];

String dest = args[1];

//(writeToShiftThread =>readFromShiftThread) = Pipe1

PipedReaderreadFromShiftThread = new PipedReader();

PipedWriterwriteToShiftThread = new PipedWriter(readFromShiftThread);

//(writeToSortLinesThread=> readFromSortLinesThread) = Pipe2

PipedReaderreadFromSortLinesThread = new PipedReader();

PipedWriterwriteToSortLinesThread = new PipedWriter(readFromSortLinesThread);

//ReadLineThread --Pipe1--> ShiftThread -- Pipe2 --> SortLinesThread

ReadLineThread R1 = newReadLineThread(writeToShiftThread,src);

ShiftThread R2 = newShiftThread(readFromShiftThread,writeToSortLinesThread);

SortLinesThread R3 = newSortLinesThread(readFromSortLinesThread,dest);

//Start the three processingthread

R1.start();

R2.start();

R3.start();

}

catch (IOException e) {

System.out.println("NOI/O");

}

}

}

// read the content of kwici.dat and send the lines to another thread

class ReadLineThread extends Thread {

PipedWriter PipeIn;

String InputFilename= null;

ReadLineThread(PipedWriterPlaceInPipe, String InputFilename) {

PipeIn = PlaceInPipe;

this.InputFilename =InputFilename;

}

private BufferedReaderfileopen(String InputFilename) {

BufferedReader input_file = null;

try {

input_file = newBufferedReader(new FileReader(InputFilename));

} catch (IOException e) {

System.err.println(("File not open" + e.toString()));

System.exit(1);

}

return input_file;

}

public void run() {

try {

String Input;

BufferedReader TheInput =fileopen(InputFilename);

while ( (Input =TheInput.readLine()) != null) {

System.out.println(Input);

PipeIn.write(Input +"\n"); // Read from the file and then write to the pipe1

}

}

catch (FileNotFoundException e) {

System.out.println("NOFILE ");

}

catch (IOException e) {

System.out.println("NO I/O");

}

}

}

// read the lines from ReadLineThread and shift them. Send all the shiftedlines to SortLinesThread

class ShiftThread extends Thread {

PipedReader PipeOut;

PipedWriter PipeIn;

ShiftThread(PipedReader ReadFromPipe,PipedWriter WriteToPipe) {

PipeOut = ReadFromPipe;

PipeIn = WriteToPipe;

}

public void run() {

char[] cbuf = new char[80];

int i, j;

StringBuffer linebuff = newStringBuffer();

try {

// read from ReadLineThread

i = PipeOut.read(cbuf, 0,80);

while (i != -1) {

for (j = 0; j < i;j++) {

//if new line

if (cbuf[j]=='\n'){

// When reach theend of line,shift it

shiftline(linebuff.toString());

// empty thebuffer

linebuff.delete(0, linebuff.length());

}

else {

linebuff.append(cbuf[j]);

}

}

i = PipeOut.read(cbuf, 0,80); //get next buffer's worth

}

}

catch (FileNotFoundException e) {

System.out.println("NOFILE ");

}

catch (IOException e) {

System.out.println("NOI/O or end of stream (ShiftThread terminated)");

}

/* BECAUSE

* If a thread was providing datacharacters to the connected piped output,

* but the thread is no longeralive, then an IOException is thrown. (javadoc)

*/

}

private void shiftline( String line )

{

String onetoken = new String ();

StringTokenizer tokens =

new StringTokenizer( line );

ArrayList<String> Tokens = newArrayList<String> ();

int count = tokens.countTokens();

for ( int i = 0; i < count;i++)

{

onetoken =tokens.nextToken();

if (!((onetoken.compareTo("a" ) == 0) && (onetoken.compareTo( "an" ) == 0)&& (onetoken.compareTo( "and" ) == 0) &&(onetoken.compareTo( "the" ) == 0)))

{

Tokens.add(onetoken);

}

}

for ( int tokencount = 0;tokencount < count; tokencount++ )

{

StringBuffer linebuffer = newStringBuffer ();

int index = tokencount;

for ( int i = 0; i< count;i++ )

{

if (index >= count)

index = 0;

linebuffer.append (Tokens.get(index) );

linebuffer.append ("");

index++;

} //for i

line = linebuffer.toString();

// send the line to theSortLinesThread

try {

PipeIn.write(line+"\n");

} catch (IOException e) {

e.printStackTrace();

}

} // for token count

return;

}

}

class SortLinesThread extends Thread {

PipedReader PipeOut;

String OutputFilename;

ArrayList<String> KwicList = new ArrayList<String>();

SortLinesThread(PipedReaderReadFromPipe, String OutputFilename) {

PipeOut = ReadFromPipe;

this.OutputFilename =OutputFilename;

}

public void run() {

char[] cbuf = new char[80];

int i, j;

StringBuffer linebuff = newStringBuffer();

try {

// read from ShiftLineThread

i = PipeOut.read(cbuf, 0,80);

while (i != -1) { // I don'tknow we're using that (The method Read blocks until at least one character ofinput is available.)

for (j = 0; j < i;j++) {

//if new line

if (cbuf[j]=='\n'){

// add it to theArrayList

KwicList.add(linebuff.toString());

// adn empty thebuffer

linebuff.delete(0, linebuff.length());

}

else {

//append thecharacter to the line

linebuff.append(cbuf[j]);

}

}

i = PipeOut.read(cbuf, 0,80); //get next buffer's worth

}

}

catch (FileNotFoundException e) {

System.out.println("NOFILE ");

}

catch (IOException e) {

System.out.println("NOI/O or end of stream (SortLinesThread terminated)");

}

/* BECAUSE

* If a thread was providing datacharacters to the connected piped output,

* but the thread is no longeralive, then an IOException is thrown. (javadoc)

*/

// when the reading is finished,sort the ArrayList and diplay

Collections.sort(KwicList);//sortwhen added

displaylist ( KwicList);//Standard Output

//Export to file

try {

export(KwicList,OutputFilename);

} catch (Exception e) {

System.out.println("Error Output File ");

}

}

private void displaylist(ArrayList<String> KwicList )

{

System.out.println ("\nList: " );

for ( int count = 0; count <KwicList.size(); count++ )

System.out.println(KwicList.get (count) );

}

private voidexport(ArrayList<String> List, String oufFilename) throws Exception{

BufferedWriter writer = null;

try {

writer = newBufferedWriter(new FileWriter(oufFilename));

} catch (FileNotFoundException e){

System.err.println(("File not open" + e.toString()));

System.exit(1);

}

for (int count = 0; count <List.size(); count++) {

writer.write(List.get(count));

writer.write("\r\n");

}

writer.flush();

writer.close();

System.out.println("ProcessedFinished");

}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: