Java_io体系之PipedWriter、PipedReader简介、走进源码及示例——14-程序员宅基地

技术标签: java  

Java_io体系之PipedWriter、PipedReader简介、走进源码及示例——14


——管道字符输出流、必须建立在管道输入流之上、所以先介绍管道字符输出流。可以先看示例或者总结、总结写的有点Q、不喜可无视、有误的地方指出则不胜感激。


一:PipedWriter


1、类功能简介:


管道字符输出流、用于将当前线程的指定字符写入到与此线程对应的管道字符输入流中去、所以PipedReader(pr)、PipedWriter(pw)必须配套使用、缺一不可。管道字符输出流的本质就是调用pr中的方法将字符或者字符数组写入到pr中、这一点是与众不同的地方。所以pw中的方法很少也很简单、主要就是负责将传入的pr与本身绑定、配对使用、然后就是调用绑定的pr的写入方法、将字符或者字符数组写入到pr的缓存字符数组中。


2、PipedWriter API简介:


A:关键字

    private PipedReader sink;	与此PipedWriter绑定的PipedReader

    
    private boolean closed = false;		标示此流是否关闭。

B:构造方法

	PipedWriter(PipedReader snk)	根据传入的PipedReader构造pw、并将pr与此pw绑定
    
    PipedWriter()	创建一个pw、在使用之前必须与一个pr绑定



C:一般方法

	synchronized void connect(PipedReader snk)		将此pw与一个pr绑定
	
	void close()	关闭此流。
	
	synchronized void connect(PipedReader snk)		将此pw与一个pr绑定
	
	synchronized void flush()	flush此流、唤醒pr中所有等待的方法。
	
	void write(int c)	将一个整数写入到与此pw绑定的pr的缓存字符数组buf中去
	
	void write(char cbuf[], int off, int len)	将cbuf的一部分写入pr的buf中去


3、源码分析


package com.chy.io.original.code;

import java.io.IOException;

public class PipedWriter extends Writer {
	
	//与此PipedWriter绑定的PipedReader
    private PipedReader sink;

    //标示此流是否关闭。
    private boolean closed = false;

    /**
     * 根据传入的PipedReader构造pw、并将pr与此pw绑定
     */
    public PipedWriter(PipedReader snk)  throws IOException {
    	connect(snk);
    }
    
    /**
     * 创建一个pw、在使用之前必须与一个pr绑定
     */
    public PipedWriter() {
    }
    
    /**
     * 将此pw与一个pr绑定
     */
    public synchronized void connect(PipedReader snk) throws IOException {
        if (snk == null) {
            throw new NullPointerException();
        } else if (sink != null || snk.connected) {
		    throw new IOException("Already connected");
		} else if (snk.closedByReader || closed) {
	            throw new IOException("Pipe closed");
	    }
	        
		sink = snk;
		snk.in = -1;
		snk.out = 0;
        snk.connected = true;
    }

    /**
     * 将一个整数写入到与此pw绑定的pr的缓存字符数组buf中去
     */
    public void write(int c)  throws IOException {
        if (sink == null) {
            throw new IOException("Pipe not connected");
        }
        sink.receive(c);
    }

    /**
     * 将cbuf的一部分写入pr的buf中去
     */
    public void write(char cbuf[], int off, int len) throws IOException {
        if (sink == null) {
            throw new IOException("Pipe not connected");
        } else if ((off | len | (off + len) | (cbuf.length - (off + len))) < 0) {
        	throw new IndexOutOfBoundsException();
		}
		sink.receive(cbuf, off, len);
    }

    /**
     * flush此流、唤醒pr中所有等待的方法。
     */
    public synchronized void flush() throws IOException {
		if (sink != null) {
	            if (sink.closedByReader || closed) {
	                throw new IOException("Pipe closed");
	            }            
	            synchronized (sink) {
	                sink.notifyAll();
	            }
		}
    }

    /**
     * 关闭此流。
     */
    public void close()  throws IOException {
        closed = true;
		if (sink != null) {
		    sink.receivedLast();
		}
    }
}

4、实例演示:


因为PipedWriter必须与PipedReader结合使用、所以将两者的示例放在一起。

二:PipedReader


1、类功能简介:


管道字符输入流、用于读取对应绑定的管道字符输出流写入其内置字符缓存数组buffer中的字符、借此来实现线程之间的通信、pr中专门有两个方法供pw调用、receive(char c)、receive(char[] b, int off, intlen)、使得pw可以将字符或者字符数组写入pr的buffer中、

2、PipedReader API简介:


A:关键字

	boolean closedByWriter = false;		标记PipedWriter是否关闭
	
    boolean closedByReader = false;		标记PipedReader是否关闭
    
    boolean connected = false;			标记PipedWriter与标记PipedReader是否关闭的连接是否关闭

    Thread readSide; 	拥有PipedReader的线程
    
    Thread writeSide;	拥有PipedWriter的线程

    private static final int DEFAULT_PIPE_SIZE = 1024;		用于循环存放PipedWriter写入的字符数组的默认大小

    char buffer[];		用于循环存放PipedWriter写入的字符数组

    int in = -1;	buf中下一个存放PipedWriter调用此PipedReader的receive(int c)时、c在buf中存放的位置的下标。此为初始状态、即buf中没有字符

    int out = 0;	buf中下一个被读取的字符的下标


B:构造方法

	PipedReader(PipedWriter src)	使用默认的buf的大小和传入的pw构造pr
	
	PipedReader(PipedWriter src, int pipeSize)		使用指定的buf的大小和传入的pw构造pr
	
	PipedReader()		使用默认大小构造pr
	
	PipedReader(int pipeSize)		使用指定大小构造pr


C:一般方法

	void close()	清空buf中数据、关闭此流。
	
	void connect(PipedWriter src)	调用与此流绑定的pw的connect方法、将此流与对应的pw绑定
	
	synchronized boolean ready()	查看此流是否可读
	
	synchronized int read()		从buf中读取一个字符、以整数形式返回
	
	synchronized int read(char cbuf[], int off, int len)	将buf中读取一部分字符到cbuf中。
	
	synchronized void receive(int c)	pw调用此流的此方法、向pr的buf以整数形式中写入一个字符。
	
	synchronized void receive(char c[], int off, int len)	将c中一部分字符写入到buf中。
	
	synchronized void receivedLast()	提醒所有等待的线程、已经接收到了最后一个字符。


3、源码分析


package com.chy.io.original.code;

import java.io.IOException;

public class PipedReader extends Reader {
    boolean closedByWriter = false;
    boolean closedByReader = false;
    boolean connected = false;

    Thread readSide;
    Thread writeSide;

   /** 
    * 用于循环存放PipedWriter写入的字符数组的默认大小
    */ 
    private static final int DEFAULT_PIPE_SIZE = 1024;

    /**
     * 用于循环存放PipedWriter写入的字符数组
     */
    char buffer[];

    /**
     * buf中下一个存放PipedWriter调用此PipedReader的receive(int c)时、c在buf中存放的位置的下标。
     * in为-1时、说明buf中没有可读取字符、in=out时已经存满了。
     */
    int in = -1;

    /**
     * buf中下一个被读取的字符的下标
     */
    int out = 0;

    /**
     * 使用默认的buf的大小和传入的pw构造pr
     */
    public PipedReader(PipedWriter src) throws IOException {
    	this(src, DEFAULT_PIPE_SIZE);
    }

    /**
     * 使用指定的buf的大小和传入的pw构造pr
     */
    public PipedReader(PipedWriter src, int pipeSize) throws IOException {
		initPipe(pipeSize);
		connect(src);
    }


    /**
     * 使用默认大小构造pr
     */
    public PipedReader() {
    	initPipe(DEFAULT_PIPE_SIZE);
    }

    /**
     * 使用指定大小构造pr
     */
    public PipedReader(int pipeSize) {
    	initPipe(pipeSize);
    }

    //初始化buf大小
    private void initPipe(int pipeSize) {
		if (pipeSize <= 0) {
		    throw new IllegalArgumentException("Pipe size <= 0");
		}
		buffer = new char[pipeSize];
    }

    /**
     * 调用与此流绑定的pw的connect方法、将此流与对应的pw绑定
     */
    public void connect(PipedWriter src) throws IOException {
    	src.connect(this);
    }
    
    /**
     * pw调用此流的此方法、向pr的buf以整数形式中写入一个字符。
     */
    synchronized void receive(int c) throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByWriter || closedByReader) {
        	throw new IOException("Pipe closed");
        } else if (readSide != null && !readSide.isAlive()) {
            throw new IOException("Read end dead");
        }

		writeSide = Thread.currentThread();
		while (in == out) {
		    if ((readSide != null) && !readSide.isAlive()) {
		    	throw new IOException("Pipe broken");
		    }
		    //buf中写入的被读取完、唤醒所有此对象监控的线程其他方法、如果一秒钟之后还是满值、则再次唤醒其他方法、直到buf中被读取。
		    notifyAll();	
		    try {
		        wait(1000);
		    } catch (InterruptedException ex) {
		    	throw new java.io.InterruptedIOException();
		    }
		}
		//buf中存放第一个字符时、将字符在buf中存放位置的下标in初始化为0、读取的下标也初始化为0、准备接受写入的第一个字符。
		if (in < 0) {
		    in = 0;
		    out = 0;
		}
		buffer[in++] = (char) c;
		//如果buf中放满了、则再从头开始存放。
		if (in >= buffer.length) {
		    in = 0;
		}
    }

    /**
     * 将c中一部分字符写入到buf中。
     */
    synchronized void receive(char c[], int off, int len)  throws IOException {
		while (--len >= 0) {
		    receive(c[off++]);
		}
    }

    /**
     * 提醒所有等待的线程、已经接收到了最后一个字符、PipedWriter已关闭。用于PipedWriter的close()方法.
     */
    synchronized void receivedLast() {
		closedByWriter = true;
		notifyAll();
    }

    /**
     * 从buf中读取一个字符、以整数形式返回
     */
    public synchronized int read()  throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByReader) {
		    throw new IOException("Pipe closed");
		} else if (writeSide != null && !writeSide.isAlive() && !closedByWriter && (in < 0)) {
            throw new IOException("Write end dead");
        }

        readSide = Thread.currentThread();
		int trials = 2;
		while (in < 0) {
		    if (closedByWriter) { 
			/* closed by writer, return EOF */
			return -1;
		    }
		    if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
			throw new IOException("Pipe broken");
		    }
	            /* might be a writer waiting */
		    notifyAll();
		    try {
		        wait(1000);
		    } catch (InterruptedException ex) {
			throw new java.io.InterruptedIOException();
		    }
	 	}
		int ret = buffer[out++];
		if (out >= buffer.length) {
		    out = 0;
		}
		if (in == out) {
	            /* now empty */
		    in = -1;		
		}
		return ret;
    }

    /**
     * 将buf中读取一部分字符到cbuf中。
     */
    public synchronized int read(char cbuf[], int off, int len)  throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByReader) {
		    throw new IOException("Pipe closed");
		} else if (writeSide != null && !writeSide.isAlive() && !closedByWriter && (in < 0)) {
            throw new IOException("Write end dead");
        }

        if ((off < 0) || (off > cbuf.length) || (len < 0) ||
            ((off + len) > cbuf.length) || ((off + len) < 0)) {
		    throw new IndexOutOfBoundsException();
		} else if (len == 0) {
		    return 0;
		}

        /* possibly wait on the first character */
		int c = read();		
		if (c < 0) {
		    return -1;
		}
		cbuf[off] =  (char)c;
		int rlen = 1;
		while ((in >= 0) && (--len > 0)) {
		    cbuf[off + rlen] = buffer[out++];
		    rlen++;
		    //如果读取的下一个字符下标大于buffer的size、则重置out、从新开始从第一个开始读取。
		    if (out >= buffer.length) {
		    	out = 0;
		    }
		    //如果下一个写入字符的下标与下一个被读取的下标相同、则清空buf
		    if (in == out) {
	                /* now empty */
		    	in = -1;	
		    }
		}
		return rlen;
    }

    /**
     * 查看此流是否可读、看各个线程是否关闭、以及buffer中是否有可供读取的字符。
     */
    public synchronized boolean ready() throws IOException {
        if (!connected) {
            throw new IOException("Pipe not connected");
        } else if (closedByReader) {
	    throw new IOException("Pipe closed");
	} else if (writeSide != null && !writeSide.isAlive()
                   && !closedByWriter && (in < 0)) {
            throw new IOException("Write end dead");
        }
        if (in < 0) {
            return false;
        } else {
            return true;
        }
    }
 
    /**
     * 清空buf中数据、关闭此流。
     */
    public void close()  throws IOException {
		in = -1;
		closedByReader = true;
    }
}


4、实例演示:


用于发送字符的线程:CharSenderThread

package com.chy.io.original.thread;

import java.io.IOException;
import java.io.PipedWriter;

@SuppressWarnings("all")
public class CharSenderThread implements Runnable {
	private PipedWriter pw = new PipedWriter();
	
	public PipedWriter getPipedWriter(){
		return pw;
	}
	@Override
	public void run() {
		//sendOneChar();
		//sendShortMessage();
		sendLongMessage();
	}

	private void sendOneChar(){
		try {
			pw.write("a".charAt(0));
			pw.flush();
			pw.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	private void sendShortMessage() {
		try {
			pw.write("this is a short message from CharSenderThread !".toCharArray());
			pw.flush();
			pw.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	private void sendLongMessage(){
		try {
			char[] b = new char[1028];
			//生成一个长度为1028的字符数组、前1020个是1、后8个是2。
			for(int i=0; i<1020; i++){
				b[i] = 'a';
			}
			for (int i = 1020; i <1028; i++) {
				b[i] = 'b';
			}
			pw.write(b);
			pw.flush();
			pw.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
}

用于接收字符的线程: CharReceiveThread

package com.chy.io.original.thread;

import java.io.IOException;
import java.io.PipedReader;

@SuppressWarnings("all")
public class CharReceiverThread extends Thread {
	
	private PipedReader pr = new PipedReader();
	
	public PipedReader getPipedReader(){
		return pr;
	}
	@Override
	public void run() {
		//receiveOneChar();
		//receiveShortMessage();
		receiverLongMessage();
	}
	
	private void receiveOneChar(){
		try {
			int n = pr.read();
			System.out.println(n);
			pr.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	private void receiveShortMessage() {
		try {
			char[] b = new char[1024];
			int n = pr.read(b);
			System.out.println(new String(b, 0, n));
			pr.close();
			
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
	private void receiverLongMessage(){
		try {
			char[] b = new char[2048];
			int count = 0;
			while(true){
				count = pr.read(b); 
				for (int i = 0; i < count; i++) {
					System.out.print(b[i]);
				}
				if(count == -1)
					break;
			}
			pr.close();
			
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	
}

启动类:PipedWriterAndPipedReaderTest

package com.chy.io.original.test;

import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;

import com.chy.io.original.thread.CharReceiverThread;
import com.chy.io.original.thread.CharSenderThread;

public class PipedWriterAndPipedReaderTest {
	public static void main(String[] args) throws IOException{
		CharSenderThread cst = new CharSenderThread();
		CharReceiverThread crt = new CharReceiverThread();
		PipedWriter pw = cst.getPipedWriter();
		PipedReader pr = crt.getPipedReader();
		
		pw.connect(pr);
		
		/**
		 * 想想为什么下面这样写会报Piped not connect异常 ?
		 */
		//new Thread(new CharSenderThread()).start();
		//new CharReceiverThread().start();
		
		new Thread(cst).start();
		crt.start();
	}
}

两个线程中分别有三个方法、可以对应的每次放开一对方法来测试、还有这里最后一个读取1028个字符的方法用了死循环来读取、可以试试当不用死循环来读取会有什么不一样的效果?初始化字符的时候要用char = 'a' 而不是cahr = "a"、可自己想原因。。。

总结:


PipedReader、PipedWriter两者的结合如鸳鸯一般、离开哪一方都不能继续存在、同时又如连理枝一般、PipedWriter先通过connect(PipedReader sink)来确定关系、并初始化PipedReader状态、告诉PipedReader只能属于这个PipedWriter、connect =true、当想赠与PipedReader字符时、就直接调用receive(char c) 、receive(char[] b, int off, int len)来将字符或者字符数组放入pr的存折buffer中。站在PipedReader角度上、看上哪个PipedWriter时就暗示pw、将主动权交给pw、调用pw的connect将自己给他去登记。当想要花(将字符读取到程序中)字符了就从buffer中拿、但是自己又没有本事挣字符、所以当buffer中没有字符时、自己就等着、并且跟pw讲没有字符了、pw就会向存折(buffer)中存字符、当然、pw不会一直不断往里存、当存折是空的时候也不会主动存、怕花冒、就等着pr要、要才存。过到最后两个只通过buffer来知道对方的存在与否、每次从buffer中存或者取字符时都会看看对方是否安康、若安好则继续生活、若一方不在、则另一方也不愿独存!


更多IO内容:java_io 体系之目录


版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/iteye_563/article/details/82552845

智能推荐

从O到1:YOLOV5训练自己的目标检测数据集,并使用C++部署,python部署,树莓派等等。_c++深度学习构建数据集实现目标检测-程序员宅基地

文章浏览阅读1.6k次,点赞20次,收藏24次。从O到1:YOLOV5训练自己的目标检测数据集。_c++深度学习构建数据集实现目标检测

【DL小结4】seq2seq与attention机制_seq2seq模型是以编码(encode)和解码(decode)为代表的架构方式,seq2seq模型-程序员宅基地

文章浏览阅读1.1k次。seq2seq概述seq2seq模型是以编码(Encode)和解码(Decode)为代表的架构方式,顾名思义是根据输入序列X来生成输出序列Y。encode意思是将输入序列转化成一个固定长度的向量(语义向量,context vector),decode意思是将语义向量解码成输出序列。编码阶段纯粹的RNN/LSTM/GRU解码阶段由上图可以发现Seq2seq中Decoder的公式和..._seq2seq模型是以编码(encode)和解码(decode)为代表的架构方式,seq2seq模型是根据

智能合约最佳实践 之 Solidity 编码规范_智能合约代码规范-程序员宅基地

文章浏览阅读1.5k次,点赞2次,收藏4次。每一门语言都有其相应的编码规范, Solidity 也一样, 下面官方推荐的规范及我的总结,供大家参考,希望可以帮助大家写出更好规范的智能合约。命名规范避免使用小写的l,大写的I,大写的O 应该避免在命名中单独出现,因为很容易产生混淆。合约、库、事件、枚举及结构体命名合约、库、事件及结构体命名应该使用单词首字母大写的方式,这个方式也称为:帕斯卡命名法或大驼峰式命名法..._智能合约代码规范

2021-06-21 VC++ 6.0安装_bv1yb411t7e8-程序员宅基地

文章浏览阅读1.1k次。2021-06-21 VC++ 6.0安装前言本次实验所用设备为微软surface pro4,操作系统为windows10。注意事项(1)前端编程注意HTML语言的灵活运用以及javascript语言的使用。(2)所要使用图片自己上网寻找替换即可。(3)由于涉及隐私问题,所以代码实现效果图没有全部放出来。(4)本文章主要用于大家学习参考,博客中代码按照步骤来即可执行使用,但请不要商用。。(5)本次安装参照B站视频完成,视频网址如下:https://www.bilibili.com/vi._bv1yb411t7e8

看这一篇就够了!-Ajax详解_ajax解析-程序员宅基地

文章浏览阅读10w+次,点赞1.5k次,收藏7.4k次。今天来聊一聊前后端交互的重要工具Ajax结合上次跟大家分享的前后端交互基础,如果还没有看过的童鞋,以下是传送门前后端交互详解AJAX- 到底什么是Ajax?ajax 全名 async javascript and XML是前后台交互的能⼒也就是我们`客户端给服务端发送消息的⼯具,以及接受响应的⼯具是⼀个 默认异步执⾏机制的功能AJAX分为同步(async = false..._ajax解析

浅谈“三层结构”原理与用意_三层 all-程序员宅基地

文章浏览阅读4.7k次。序在刚刚步入“多层结构”Web应用程序开发的时候,我阅读过几篇关于“asp.net三层结构开发”的文章。但其多半都是对PetShop3.0和Duwamish7的局部剖析或者是学习笔记。对“三层结构”通体分析的学术文章几乎没有。2005年2月11日,Bincess BBS彬月论坛开始试运行。不久之后,我写了一篇题目为《浅谈“三层结构”原理与用意》的文章。旧版文章以彬月论坛程序中的部分代码举例_三层 all

随便推点

编译器_cx51 编译原理-程序员宅基地

文章浏览阅读64次。编译器不是全智能的,有些错误不会立刻呈现1:编译错误,语法问题2:运行时出错,异常,崩溃,运行出错提示不在出错代码时,在之后。_cx51 编译原理

图解 SpringCloud 微服务架构,写的太好了!-程序员宅基地

文章浏览阅读158次。????这是一个或许对你有用的社群????一对一交流/面试小册/简历优化/求职解惑,欢迎加入「芋道快速开发平台」知识星球。下面是星球提供的部分资料:《项目实战(视频)》:从书中学,往事中“练”《互联网高频面试题》:面朝简历学习,春暖花开《架构 x 系统设计》:摧枯拉朽,掌控面试高频场景题《精进 Java 学习指南》:系统学习,互联网主流技术栈《必读 Java 源码专栏》:知其然,知其所以然????这是一个或许..._springboot spring cloud nacos 微服务架构图

keytool 错误: java.lang.Exception: 密钥库文件不存在: keystore_keytool 错误: java.lang.exception: 密钥库文件不存在: jone.ke-程序员宅基地

文章浏览阅读8.9k次,点赞3次,收藏6次。keytool 错误: java.lang.Exception: 密钥库文件不存在: keystore通过Android Studio编译器获取SHA1第一步、打开Android Studio的Terminal工具第二步、输入命令:keytool -v -list -keystore keystore文件路径,然后提示keytool 错误: java.lang.Exception:..._keytool 错误: java.lang.exception: 密钥库文件不存在: jone.keystore java.lang

使用conda命令出现:The environment is inconsistent, please check the package plan carefully解决办法-程序员宅基地

文章浏览阅读5.8k次,点赞2次,收藏8次。使用conda命令时出现下面报错只需要两条命令即可解决一.查询之前的更新版本,然后选择一个之前的版本(出现问题之前)conda list -r二.输入下面命令回滚到之前没出错的版本conda install --revision 版本数字例如输入conda install --revision 143..._the environment is inconsistent

[附源码]Python计算机毕业设计SSM健身俱乐部管理系统(程序+LW)_基于python的健身管理系统-程序员宅基地

文章浏览阅读507次。项目运行环境配置:Jdk1.8 + Tomcat7.0 + Mysql + HBuilderX(Webstorm也行)+ Eclispe(IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持)。项目技术:SSM + mybatis + Maven + Vue 等等组成,B/S模式 + Maven管理等等。环境需要1.运行环境:最好是java jdk 1.8,我们在这个平台上运行的。其他版本理论上也可以。2.IDE环境:IDEA,Eclipse,Myeclipse都可以。推荐IDEA;_基于python的健身管理系统

Github Desktop1:git的优点和github desktop的简单使用_githubdesktop-程序员宅基地

文章浏览阅读4.4k次,点赞4次,收藏15次。多人合作一个项目,一定要把代码放到一个地方,并随时同步自己本地的和公共的代码。我们也曾这么做,租用一台服务器,修改完本地的代码就复制到服务器上,而开始编写前把服务器上的复制到本地。然而这样做有以下缺点:1.麻烦。就算用ftp软件同步文件,选择文件并复制也很繁琐。2.缺乏更新信息的交流。对代码进行修改,却不说明这次修改了哪些部分,没有更新日志,这将造成开发者对项目的严重误读,而传统的更新日志方..._githubdesktop