优秀的编程知识分享平台

网站首页 > 技术文章 正文

Java,IO模型,Netty,事件驱动,阻塞和非阻塞通信

nanyue 2024-08-01 22:50:16 技术文章 8 ℃

Java支持的IO模型:BIO、NIO、AIO

BIO : 同步并阻塞(传统阻塞型),服务器实现模式为一个连接一个线程,客户端只要有连接请求服务器端就启动一个线程进行处理,不管这个连接做不做事情,都需要有线程开销。

NIO : 同步非阻塞,服务器实现模式为一个线程处理多个连接(请求),即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求才进行处理。

AIO(NIO.2):异步非阻塞,AIO 引入异步通道的概念,采用Proactor模式,简化了程序编写,有效的请求才启动线程,特点是先由操作系统完成后才通知服务端程序启动线程做处理,适用于连接数较多且连接时间较长的应用。

适用场景:

BIO,适用于连接数小、架构固定,对服务器资源要求较高,并发局限于应用,JDK1.4以前的唯一选择,程序简单易理解。

NIO,适用于连接数多,连接较短(轻操作)的架构,如:聊天服务器,弹幕系统,服务器间通讯等,编程较复杂,JDK1.4开始支持。

AIO,适用于连接数多,连接比较长(重操作)的架构,如:相册服务器,充分调用OS参与并发操作,编程较复杂,JDK7开始支持。

BIO和NIO的区别(下图):

Netty网络编程框架

Netty,Java开源框架,JBOSS提供,Github独立项目,Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

Netty并发高:

Netty基于NIO(Nonblocking I/O,非阻塞IO)开发的网络通信框架,对比于BIO(Blocking I/O,阻塞IO),并发性能得到了很大提高。

事件驱动模型,解释,如:鼠标的一个点击,移动,键盘的按键按下等等操作,都是对应操作系统的一个事件,然后应用程序接受该操作做出相应的处理。

网络通信流程

零拷贝及内存映射:

Java,NIO,零拷贝是transferTo方法的实现Java,NIO,MappedByteBuffer,内存映射

代码案例

BIO Server代码:

package com.what21.boot.netty.demo01;

import java.io.IOException;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.Charset;

/**
 * 阻塞I/O
 */
public class JavaBIOServer {

    public void serve(int port) throws IOException {
        final ServerSocket socket = new ServerSocket(port);
        try {
            for (;;) {
                final Socket clientSocket = socket.accept();
                System.out.println("Accepted connection from " + clientSocket);

                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        OutputStream out;
                        try {
                            out = clientSocket.getOutputStream();
                            out.write("Hi!\r\n".getBytes(Charset.forName("UTF-8")));                            //4
                            out.flush();
                            clientSocket.close();

                        } catch (IOException e) {
                            e.printStackTrace();
                            try {
                                clientSocket.close();
                            } catch (IOException ex) {
                                // ignore on close
                            }
                        }
                    }
                }).start();                                        //6
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

NIO Server代码:

package com.what21.boot.netty.demo01;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * 非阻塞IO
 */
public class JavaNIOServer {

    public void serve(int port) throws IOException {
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        ServerSocket ss = serverChannel.socket();
        InetSocketAddress address = new InetSocketAddress(port);
        ss.bind(address);
        Selector selector = Selector.open();
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes());
        for (; ; ) {
            try {
                selector.select();
            } catch (IOException ex) {
                ex.printStackTrace();
                // handle exception
                break;
            }
            Set<SelectionKey> readyKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = readyKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                try {
                    if (key.isAcceptable()) {
                        ServerSocketChannel server = (ServerSocketChannel) key.channel();
                        SocketChannel client = server.accept();
                        client.configureBlocking(false);
                        client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, msg.duplicate());    //7
                        System.out.println("Accepted connection from " + client);
                    }
                    if (key.isWritable()) {
                        SocketChannel client =
                                (SocketChannel) key.channel();
                        ByteBuffer buffer =
                                (ByteBuffer) key.attachment();
                        while (buffer.hasRemaining()) {
                            if (client.write(buffer) == 0) {
                                break;
                            }
                        }
                        client.close();
                    }
                } catch (IOException ex) {
                    key.cancel();
                    try {
                        key.channel().close();
                    } catch (IOException cex) {
                        // 在关闭时忽略
                    }
                }
            }
        }
    }
}

Netty,Server代码:

package com.what21.boot.netty.demo01;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.oio.OioServerSocketChannel;

import java.net.InetSocketAddress;

import java.nio.charset.Charset;

public class NettyOioServer {

    public void server(int port) throws Exception {
        final ByteBuf buf = Unpooled.unreleasableBuffer(
                Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));
        EventLoopGroup group = new OioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(group)
                    .channel(OioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<SocketChannel>() {//3
                        @Override
                        public void initChannel(SocketChannel ch)
                                throws Exception {
                            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {            //4
                                @Override
                                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                    ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);//5
                                }
                            });
                        }
                    });
            ChannelFuture f = b.bind().sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }
}

Tags:

最近发表
标签列表