优秀的编程知识分享平台

网站首页 > 技术文章 正文

你在哪里用过管道流呢?不常见的管道流究竟如何使用

nanyue 2024-08-01 22:50:02 技术文章 8 ℃

[啤酒]满怀忧思,不如先干再说!7年开发,专注Java从基础到架构的硬核干货分享!

???[微风]代码已上传【gitee】,因平台限制,仓库链接在评论区第一条!

小贴士:本文为系列文章,可在《简单的IO流》合集中查阅其他IO流相关文章

本文介绍Java的BIO最后一个内容【管道流】,对于管道流的应用在开发并不多见,它往往出现于高并发系统中的多线程之间数据传递

目前全网都在说百万级千万级的并发怎么设计,可项目真正遇到高并发的场景依我看来其实并不多见,往往是一些机构为了课程的卖点带起的节奏,一般的系统往往触摸不到这个瓶颈,建议3-5年的新手专注于夯实基础

既然是系列文章就要保障完整性,仍然将管道流相关内容发出,如果可以在工作或者面试或者吹牛[啤酒]的场景下恰巧可以帮助到你,那将是我的荣幸!

如果你使用到了管道流不妨分享出来

主要内容

  • 管道流介绍和特点
  • 代码实现 字节管道流和字符管道流
  • 演示管道流常见问题,如单线程操作管道流,Read end dead等问题并解决
  • 通过源码分析管道流如何发送和接收数据
  • 以分析数据为例,介绍管道流的开发应用场景

可根据个人知识储备,阅读不同部分,好了正文开始

管道流介绍

管道流用来在多个线程之间进行信息传递,包括:

  • 字节管道输入流PipedInputStream和字节管道输出流PipedOutputStream
  • 字符管道输入流PipedReader和字符管道输出流PipedWriter
  • 输出流又称为发送者,输入流称为接收者

特点

  • 管道流仅用于多个线程之间传递信息,若用在同一个线程中可能会造成阻塞
  • 管道流的输入输出是成对的,一个输出流只能对应一个输入流,使用构造函数或者connect函数进行连接
  • 一对管道流包含一个缓冲区,其默认值为1024个字节,若要改变缓冲区大小,可以使用带有参数的构造函数
  • 管道的读写操作是互相阻塞的,当缓冲区为空时,读操作阻塞;当缓冲区满时,写操作阻塞
  • 管道依附于线程,因此若线程结束,则虽然管道流对象还在,仍然会报错Read end deadWrite end dead,即读取数据或发送数据的线程停止
  • 管道流的读取方法与普通流不同,只有输出流正确close时,输出流才能读到-1值,所以不能通过是否读到数据来关闭流

字节管道流

此案例通过字节方式将数据从PipedOutputStream传输到PipedInputStream,需要开启两个线程,也可以认为是将数据从发送者线程,传输给接收者线程,是另一种线程通信

实现流程:

  • 创建一个输出流和一个输入流,输出流负责写出数据,也就是发送数据,输入流负责接收数据
  • 通过输出流的connect方法传入输入流,建立数据传输通道
  • 开启两个线程,一个线程负责发送数据,另一个线程负责接收数据

代码实现:

public class BytePipelineTest {
    public static void main(String[] args) throws IOException {
        // 1、分别创建输出和输入流
        PipedOutputStream out = new PipedOutputStream();
        PipedInputStream input = new PipedInputStream();

        // 2、连接两个流
        out.connect(input);
        // 3、写数据【需要开启线程】
        new Thread(() -> {
            // 写出数据
            try {
                out.write("管道流发送数据".getBytes());
                // 刷新流
                out.flush();
            } catch (IOException e) {
                System.out.println("发送数据异常");
            }finally {
                // 关闭流
                try {
                    out.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        },"发送者").start();
        // 4、读数据【需要开启线程】
        new Thread(() -> {
            // 读取数据
            try {
                int read;
                byte[] buff = new byte[1024];
                while ((read = input.read(buff)) != -1) {
                    // 输出数据
                    System.out.println(new String(buff,0,read));
                }
            }catch (IOException e) {
                System.out.println("读取数据异常");
            }finally {
                // 关闭流
                try {
                    input.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        },"接收者").start();
    }
}

字符管道流

实现方式和字节管道流基本一致,使用字符流对象即可

public class CharPipelineTest {
    public static void main(String[] args) throws IOException {

        // 1、分别创建输出和输入流
        PipedWriter out = new PipedWriter();
        PipedReader input = new PipedReader();

        // 2、连接两个流
        out.connect(input);
        // 3、写数据【需要开启线程】
        new Thread(() -> {
            try {
                out.write("我是管道流发送的数据");
                out.flush();
            } catch (IOException e) {
                System.out.println("数据发送异常");
            }finally {
                try {
                    out.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        },"发送者").start();
        // 4、读数据【需要开启线程】
        new Thread(() -> {
            // 字符流可以直接读取一行数据,使用BufferedReader,将输入管道流传入构造方法
            BufferedReader reader = new BufferedReader(input);
            try {
                String line = reader.readLine();
                System.out.println(line);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }finally {
                try {
                    // 关闭高级流即可同时关闭低级流
                    reader.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        },"接收者").start();
    }
}

通过管道流传输文件数据

磁盘上有一个图片文件,通过管道流从A线程传输到B线程内,其实还是通过管道流完成一个文件的拷贝,实现流程如下:

  • 发送数据线程负责从磁盘中读取数据【字节输入流】,再将读取的数据通过管道流发送给接收数据线程【字节管道输出流】
  • 接收数据线程负责接收管道流中的数据【字节管道输入流】,再将数据写入到磁盘中【字节输出流】

代码实现:

public class PipelineFileTest {
    public static void main(String[] args) throws IOException {
        // 创建输出流和输入流,读取图片,创建字节流
        PipedOutputStream writer = new PipedOutputStream();
        PipedInputStream reader = new PipedInputStream();
        // 关联
        writer.connect(reader);
        // 发送数据
        new Thread(() -> {
            FileInputStream fileInputStream = null;
            try {
                // 读取文件
                fileInputStream = new FileInputStream("D:\\Pictures\\JavaBIO结构.png");
                byte[] buff = new byte[1024];
                int read;
                System.out.println("开始发送数据......");
                while ((read = fileInputStream.read(buff)) != -1) {
                    // 管道流写出数据
                    writer.write(buff,0,read);
                }
                System.out.println("======》数据发送完成");
            } catch (FileNotFoundException e) {
                System.out.println("文件不存在");
            } catch (IOException e) {
                System.out.println("读取数据异常");
            }finally {
                // 关闭流
                try {
                    if(fileInputStream != null) {
                        fileInputStream.close();
                    }
                    writer.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }

        },"发送者").start();


        // 接收数据
        new Thread(() -> {
            FileOutputStream fileOutputStream = null;
            try {
                // 读取文件
                fileOutputStream = new FileOutputStream("D:\\Pictures\\JavaBIO结构-copy.png");
                int read;
                byte[] buff = new byte[1024];
                System.out.println("开始接收数据......");
                while ((read = reader.read(buff)) != -1) {
                    fileOutputStream.write(buff,0,read);
                    fileOutputStream.flush();
                }
                System.out.println("======》数据接收完成");
            } catch (FileNotFoundException e) {
                System.out.println("文件不存在");
            } catch (IOException e) {
                System.out.println("读取数据异常");
            }finally {
                // 关闭流
                try {
                    if(fileOutputStream != null) {
                        fileOutputStream.close();
                    }
                    reader.close();
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        },"接收者").start();
    }
}

单线程内实现管道流

单线程内实现管道流会发生阻塞,即程序挂起,不会继续向下执行

public class PipelineStreamSingleThreadTest {
    public static void main(String[] args) throws IOException {
        // 创建输出流和输入流,读取图片,创建字节流
        PipedWriter writer = new PipedWriter();
        PipedReader reader = new PipedReader();
        // 关联
        writer.connect(reader);
        // 发送消息
        writer.write("单线程发送消息");
        // 接收消息
        char[] buff = new char[1024];
        int read;
        while((read = reader.read(buff)) != -1) {
            System.out.println(new String(buff,0,read));
        }
        // 关闭流
        reader.close();
        writer.close();
    }
}

此处已经发生了阻塞,有的地方说是死锁,其实并不是死锁,将发送数据和接收数据放到不同的线程内即可解决,这也是管道流的应用场景,将数据从A线程传递到B线程

PipedOutputStream源码分析

下图为PipedOutputStream继承结构,继承自OutputStream

包含PipedInputStream实例sink,在构造函数中可以与传入管道输入流进行连接

构造方法:

// 无参构造,单独创建一个管道输出流
PipedOutputStream() {}
// 接收管道输入流,创建对象并创建连接
public PipedOutputStream(PipedInputStream snk)  throws IOException {
    connect(snk);
}

connect方法是将传入管道输入流传给参数sink,并且初始化一些参数和状态。由于管道输入输出流是一一对应的,在进行连接前,connect方法会进行判断,若双方任何一个已有连接则抛出异常,提示已存在连接。

写出数据:

PipedOutputStream是生产者,将数据写到【管道】中,由对应的PipedInputStream来读取,不过缓冲区在PipedInputStream之中,上面connect时初始化的也是对应PipedInputStream中的参数,PipedOutputStream实例在写入时,调用的是对应的消费者来receive【接收】数据。

// 单字节写出
public void write(int b)  throws IOException {
    if (sink == null) {
        throw new IOException("Pipe not connected");
    }
    // 调用对应的【PipedInputStream】接收数据
    sink.receive(b);
}

// 写出字节数组
public void write(byte b[], int off, int len) throws IOException {
    if (sink == null) {
        throw new IOException("Pipe not connected");
    } else if (b == null) {
        throw new NullPointerException();
    } else if ((off < 0) || (off > b.length) || (len < 0) ||
               ((off + len) > b.length) || ((off + len) < 0)) {
        throw new IndexOutOfBoundsException();
    } else if (len == 0) {
        return;
    }
    // 调用对应的【PipedInputStream】接收数据
    sink.receive(b, off, len);
}

刷新方法:

// 同步方法,多线程下只会有一个线程消费数据,唤醒所有的【PipedInputStream】
public synchronized void flush() throws IOException {
    if (sink != null) {
        synchronized (sink) {
            sink.notifyAll();
        }
    }
}

关闭流:

close()方法也只是调用对应【PipedInputStream】 的receivedLast方法来实现

public void close()  throws IOException {
    if (sink != null) {
        sink.receivedLast();
    }
}

PipedInputStream中,该方法将closeByWriter置为true并且唤醒所有等待线程,将所有数据写入PipedInputStream的缓冲区

synchronized void receivedLast() {
    closedByWriter = true;
    notifyAll();
}

PipedOutputStream实现相对简单,发送数据,并唤醒对应的【PipedInputStream】接收数据,复杂的是【PipedInputStream】的实现

PipedInputStream源码分析

从PipedOutputStream的源码可以看出来,PipedOutputStream做的只是连接对应的PipedInputStream 实例并在写入时调用对应的receive方法。

java.io.PipedInputStream继承了基类InputStream,其主要参数包含以下几个部分:

  • 负责连接与colse的参数
  • 读取线程与写入线程
  • 管道,即缓冲区相关参数

继承结构:

主要参数:

// 负责连接与close的参数
boolean closedByWriter = false;
volatile boolean closedByReader = false;
boolean connected = false;
// 读取线程与写入线程
Thread readSide;
Thread writeSide;

// 管道,即缓冲区相关参数,默认缓冲区大小为1024
private static final int DEFAULT_PIPE_SIZE = 1024;
protected static final int PIPE_SIZE = DEFAULT_PIPE_SIZE;
// 管道:数据缓冲区
protected byte buffer[];
// 下一个【写入】的位置
protected int in = -1;
// 下一个【读取】的位置
protected int out = 0;

构造方法主要实现

  • 为缓冲区分配空间
  • 连接对应的PipedOutputStream

包含四个构造方法,主要区别在于

  • 使用默认缓冲区大小,还是使用自定义大小;
  • 当传入PipedOutputStream参数则进行连接,反之则暂时不进行连接
// 创建连接,使用默认缓冲区大小
public PipedInputStream(PipedOutputStream src) throws IOException {
    this(src, DEFAULT_PIPE_SIZE);
}
// 使用自定义缓冲区大小,并创建连接
public PipedInputStream(PipedOutputStream src, int pipeSize)
    throws IOException {
    initPipe(pipeSize);
    connect(src);
}
// 仅使用默认缓冲区大小创建【PipedInputStream】对象
public PipedInputStream() {
    initPipe(DEFAULT_PIPE_SIZE);
}

// 使用一个自定义缓冲区创建【PipedInputStream】对象,不创建连接
public PipedInputStream(int pipeSize) {
    initPipe(pipeSize);
}

// 为【管道】按照大小分配空间,也就是为buff字节数组变量赋值
private void initPipe(int pipeSize) {
    if (pipeSize <= 0) {
        throw new IllegalArgumentException("Pipe Size <= 0");
    }
    buffer = new byte[pipeSize];
}
// PipedInputStream.connect()调用传入PipedOutputStream的connect方法建立连接
// 所以也有的地方说connect方法存在于【PipedOutputStream】中也是对的
public void connect(PipedOutputStream src) throws IOException {
    src.connect(this);
}

以上都容易理解,复杂的在于receiveread方法中

receive方法

PipedInputStream的receive方法,在功能上是实现了【写入】功能,将传入的数据写入到【管道】之中。下方是receive两个重载方法源码,首先涉及两个方法checkStateForReceiveawaitSpace

checkStateForReceive方法

确认这对管道流可用:

  • 写入者和读取者是否已连接
  • 是否关闭
  • 读取线程是否有效
// 判断各种值是否为null,验证连接是否可用
private void checkStateForReceive() throws IOException {
    if (!connected) {
        throw new IOException("Pipe not connected");
    } else if (closedByWriter || closedByReader) {
        throw new IOException("Pipe closed");
    } else if (readSide != null && !readSide.isAlive()) {
        throw new IOException("Read end dead");
    }
}

awaitApace方法

缓冲区已满的时候,阻塞数据写入。由于这个缓冲区使用时可以看做一个循环队列,缓冲区已满判断条件是in==out,而判断缓冲区为空的条件是in=-1(read的时候缓冲区为空会将in置为-1)

private void awaitSpace() throws IOException {
    while (in == out) {
        checkStateForReceive();
        notifyAll();
        try {
            wait(1000);
        } catch (InterruptedException ex) {
            throw new java.io.InterruptedIOException();
        }
    }
}

receive(int b)方法

该方法比较简单,判断管道流可用以及缓冲区未满之后写入,只是在写入到了缓冲区队尾(且缓冲区未满)的时候会跳到队头继续写入

protected synchronized void receive(int b) throws IOException {
    checkStateForReceive();
    writeSide = Thread.currentThread();
    // 判断是否可用
    if (in == out)
        awaitSpace();
    // 判断是否小于0,默认是-1
    if (in < 0) {
        in = 0;
        out = 0;
    }
    // 追加数据到缓冲区
    buffer[in++] = (byte)(b & 0xFF);
    // 如果缓冲区存满,则重头写入
    if (in >= buffer.length) {
        in = 0;
    }
}

receive(byte b[], int off, int len)方法

该方法实现比较复杂,有三类情况【其实是4小种情况】:

  • 缓冲区为空:in=-1,out=0,初始时以及在读取的时候发现缓冲区为空会将in置为-1
  • 缓冲区有数据但是未满:in<out或者out<in
  • 缓冲区已满:in==out

该方法中bytesToTransfer变量保留还有多少字节未写入,nextTransferAmount变量保存下一个可写入空间的大小,写入后,若bytesToTransfer仍大于0【即还有数据未写入】,则继续循环,判断缓冲区情况,尝试寻找下一个可写入空间直至全部写入

synchronized void receive(byte b[], int off, int len)  throws IOException {
    checkStateForReceive();
    writeSide = Thread.currentThread();
    int bytesToTransfer = len;
    while (bytesToTransfer > 0) {
        // 缓冲区已满:阻塞
        if (in == out)
            awaitSpace();
        int nextTransferAmount = 0;
        // 判断下一个可写入的连续空间的大小
        // 当out<in,下一个可写入的空间是in到队尾
        if (out < in) {
            nextTransferAmount = buffer.length - in;
        } else if (in < out) {
            // 当缓冲区为空,下一个可写入的空间是0到队尾
            if (in == -1) {
                in = out = 0;
                nextTransferAmount = buffer.length - in;
                // 当in<out,下一个可写入的空间是in-->out的空间
            } else {
                nextTransferAmount = out - in;
            }
        }
        // 如果空间足够写入则写入全部
        if (nextTransferAmount > bytesToTransfer)
            nextTransferAmount = bytesToTransfer;
        assert(nextTransferAmount > 0);
        System.arraycopy(b, off, buffer, in, nextTransferAmount);
        // 如果空间不足够写入,则减去已写入的部分,进入下一个循环找下一个可写入的空间
        bytesToTransfer -= nextTransferAmount;
        off += nextTransferAmount;
        in += nextTransferAmount;
        if (in >= buffer.length) {
            in = 0;
        }
    }
}

read()方法

read()方法在判断了是否有关联PipedOutputStream,读取流是否关闭和写入流是否完全关闭之后开始尝试读取数据,有以下情况:

  • 缓冲区为空,则先尝试唤醒全部等待线程并等待,等待对应的写入线程是否有未完成的写入。若有则等待写入后读取,若无,则尝试2次之后抛出异常并退出。
  • 缓冲区不为空,则直接读取数据,更新参数
public synchronized int read()  throws IOException {
    // 判断是否有关联PipedOutputStream,读取流是否关闭和写入流是否完全关闭
    if (!connected) {
        throw new IOException("Pipe not connected");
    } else if (closedByReader) {
        throw new IOException("Pipe closed");
    } else if (writeSide != null && !writeSide.isAlive()
               && !closedByWriter && (in < 0)) {
        throw new IOException("Write end dead");
    }
    readSide = Thread.currentThread();
    int trials = 2;
    // 如果缓冲区为空
    while (in < 0) {
        // 缓冲区为空,并且写入流已经关闭则结束并返回-1
        if (closedByWriter) {
            return -1;
        }
        // 如果写入线程不再活动,并且已经尝试等待2次后仍无数据则抛出异常
        if ((writeSide != null) && (!writeSide.isAlive()) && (--trials < 0)) {
            throw new IOException("Pipe broken");
        }
        // 可能仍有写入线程在等待的,read方法尝试唤醒全部线程并等待,尝试2次后退出并抛出异常
        notifyAll();
        try {
            wait(1000);
        } catch (InterruptedException ex) {
            throw new java.io.InterruptedIOException();
        }
    }
    // 若缓冲区不为空,读取数据,并更新in和out
    int ret = buffer[out++] & 0xFF;
    if (out >= buffer.length) {
        out = 0;
    }
    if (in == out) {
        // 置空
        in = -1;
    }
    return ret;
}

read(byte b[], int off, int len)方法

跟receive的思路相同,每次都是获取可读取的下一个连续空间的字节数来尝试读取,该方法会尝试读取足够多的字节,如果缓冲区的字节数<len会全部读取并返回实际读取的字节数。不过在读取第一个字节的时候调用的read()方法来进行一系列的判断及操作,譬如说缓冲区为空时等待并唤醒可能存在的写入线程来写入后再读取

public synchronized int read(byte b[], int off, int len)  throws IOException {
    // 判断传入参数的合理性
    if (b == null) {
        throw new NullPointerException();
    } else if (off < 0 || len < 0 || len > b.length - off) {
        throw new IndexOutOfBoundsException();
    } else if (len == 0) {
        return 0;
    }

    // 尝试调用read()读取第一个字节
    int c = read();
    if (c < 0) {
        return -1;
    }
    b[off] = (byte) c;
    int rlen = 1;
    // 当缓冲区不为空,并且读取的字节数仍不够时则继续读取
    while ((in >= 0) && (len > 1)) {
        // 获取下一次可读取连续空间的字节数
        int available;

        if (in > out) {
            available = Math.min((buffer.length - out), (in - out));
        } else {
            available = buffer.length - out;
        }

        // 如果这次可读连续空间的字节数已经够了,则只读取len-1个字节
        if (available > (len - 1)) {
            available = len - 1;
        }
        // 读取数据,并更新参数
        System.arraycopy(buffer, out, b, off + rlen, available);
        out += available;
        rlen += available;
        len -= available;

        if (out >= buffer.length) {
            out = 0;
        }
        if (in == out) {
            in = -1;
        }
    }
    return rlen;
}

应用场景

试想一下,基于以上的管道流的特点,你认为管道流一般应用在什么地方呢?

不妨推理一下:管道流使用于多线程之间基于内存的数据传递,多线程一般应用于高并发场景,基本上管道流也是应用于高并发或多任务的场景中,分线程处理任务时考虑使用管道流,数据从一个线程传递到另一个线程,不同的线程对数据的处理方式不同,可以多线程一起执行处理数据,这样不就更快了吗!

数据分析案例

数据库的用户表中有一批数据,需要对数据进行分析,如计算各个地区的男女用户分别是多少,考虑使用数据库分组性能较低,选择在程序中处理

此时可以使用2个线程处理

  • A线程负责从数据库读取数据,将数据发送给B线程分析
  • B线程负责接收A线程数据,分析数据后入库或者生成文件上传

此处会出现2个问题,先演示一下问题,再贴出参考代码

问题1:Read end dead

线程1发送数据,外循环5次,内循环20次,模拟每次读取20条数据【分页查询】,读取5次将数据读完

线程2接收数据,并转换为集合,输出,注意此处还没有做数据分析,因为代码是存在问题的,先说问题

public class PipelineDataAnalysisDemo {

    public static void main(String[] args) throws IOException {

        PipedOutputStream writer = new PipedOutputStream();
        // 创建对象并关联
        PipedInputStream reader = new PipedInputStream(writer);

        // 读取数据库数据,并发送
        new Thread(() -> {
            // 地区
            String[] location = {"北京","上海","河南","广东","浙江","黑龙江","青海"};
            String[] sex = {"男","女"};
            Random random = new Random();
            List<SysUser> userList = new ArrayList<>();
            // 模拟数据,循环生成100个用户,每次20个【类似分页查询】,循环5次
            for (int i = 0; i < 5; i++) {
                for (int j = 0; j < 20; j++) {
                    userList.add(new SysUser(System.currentTimeMillis(),"用户:" + i + j,location[random.nextInt(7)],sex[random.nextInt(2)]));
                }
                // 每循环一轮,发送数据到B线程
                try {
                    ObjectOutputStream oos = new ObjectOutputStream(writer);
                    oos.writeObject(userList);
                    oos.flush();
                    System.out.println("发送数据");
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        },"发送数据").start();
        // 接收数据,并分析

        new Thread(() -> {
            try {
                ObjectInputStream ois = new ObjectInputStream(reader);
                // 读取数据
                List<SysUser> userList = (List<SysUser>) ois.readObject();
                System.out.println(userList);
            } catch (IOException e) {
                throw new RuntimeException(e);
            } catch (ClassNotFoundException e) {
                throw new RuntimeException(e);
            }
        },"接收数据").start();
    }
}

目前的代码来说,其实线程2启动之后读一次数据线程就停止了,所以会出现Read end dead,即读取数据的线程已经停止,可参考文章开头的注意中的第五条

问题2:Write end dead

此时将写数据线程内代码使用while死循环包裹,可解决read end dead问题,保障接收数据线程不死

new Thread(() -> {
    // 不停地读取
    while (true) {
        try {
            ObjectInputStream ois = new ObjectInputStream(reader);
            // 读取数据
            List<SysUser> userList = (List<SysUser>) ois.readObject();
            System.out.println(userList);
        } catch (IOException e) {
            throw new RuntimeException(e);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }
},"接收数据").start();

但是发送数据5次之后又会出现Write end dead,即发送数据完成后发送数据线程停止,但是不能将发送数据线程也设置为while死循环,否则将不停的发送数据,根据实际情况来说,并不知道数据库到底有几条数据,所以不能根据次数判断是否停止线程,可以使用线程池解决该问题

public class PipelineDataAnalysisDemo {
    public static void main(String[] args) throws IOException {
        PipedOutputStream writer = new PipedOutputStream();
        // 创建对象并关联
        PipedInputStream reader = new PipedInputStream(writer);
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        // 读取数据库数据,并发送
        executorService.execute(() -> {
            // 地区
            String[] location = {"北京","上海","河南","广东","浙江","黑龙江","青海"};
            String[] sex = {"男","女"};
            Random random = new Random();
            List<SysUser> userList = new ArrayList<>();
            // 模拟数据,循环生成100个用户,每次20个【类似分页查询】,循环5次
            for (int i = 0; i < 5; i++) {
                try {
                    for (int j = 0; j < 20; j++) {
                        userList.add(new SysUser(System.currentTimeMillis(),
                                                 "用户:" + i + j,location[random.nextInt(7)],sex[random.nextInt(2)]));
                    }
                    // 每循环一轮,发送数据到B线程
                    ObjectOutputStream oos = new ObjectOutputStream(writer);
                    oos.writeObject(userList);
                    // 每次发送之后,清空数组
                    userList.clear();
                    oos.flush();
                    } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        // 接收数据
        executorService.execute(() -> {
            Map<String, Integer> countMap = new HashMap<>();
            while (true) {
                try {
                    ObjectInputStream ois = new ObjectInputStream(reader);
                    // 读取数据
                    List<SysUser> userList = (List<SysUser>) ois.readObject();
                    System.out.println(userList.size());
                    // 数据分析,计算各地区男女用户
                    Map<String, Map<String, List<SysUser>>> collect = userList.stream()
                        .collect(Collectors.groupingBy(SysUser::getLocation, Collectors.groupingBy(SysUser::getSex)));
                    // key为地区
                    for (String locationKey : collect.keySet()) {
                        Map<String, List<SysUser>> map = collect.get(locationKey);
                        System.out.println(map);
                        // 为性别
                        for (String sexKey : map.keySet()) {
                          // 获取对应性别的用户数
                            int size = map.get(sexKey).size();
                            String key = locationKey + ":" + sexKey;
                          // 因为汉字字符串不能通过containsKey判断是否存在,所以需要通过get方法获取
                            if(countMap.get(key) != null) {
                                Integer count = countMap.get(key);
                              // 将原值删除,再添加新值
                                countMap.remove(key);
                                countMap.put(key,count + size);
                            }else {
                                countMap.put(key,size);
                            }
                        }
                    }
                } catch (IOException e) {
                    throw new RuntimeException(e);
                } catch (ClassNotFoundException e) {
                    throw new RuntimeException(e);
                }
                for (String location : countMap.keySet()) {
                    System.out.println(location + "===>" + countMap.get(location) + "名用户");
                }
            }
        });
    }
}

总结

以上是对管道流的介绍,一般项目中很少使用管道流,如果你的系统并发较高,并且处理数据量很大,可以使用多线程分批处理数据,此时使用管道流是一个不错的选择



Tags:

最近发表
标签列表