Java 网络 I/O 模型详解 (Reactor & Proactor)

网络 I/O 模型是计算机网络编程中用于描述网络通信过程的一种抽象概念,它定义了在进行网络数据传输时,Socket 处理线程之间的交互方式,不同的网络 I/O 模型适用于不同的应用场景和需求。

1 基础 I/O 模型

基础 I/O 模型从 JDK 1.0 开始就得到支持。该模型中有一个 Acceptor 线程不断阻塞地监听连接事件,一旦 Acceptor 收到客户端的连接请求,就会为该客户端创建一个独立的服务线程以进行通信,每个服务线程需要独自管理自己的 Socket。通常情况下,该线程会按顺序执行读取、计算和回复等一系列步骤。然而,这种模型消耗了大量的系统资源:每当有新的连接出现时,都必须新建一个线程来处理请求,而每个线程在完成任务后都必须被销毁。这种频繁的线程创建与销毁将给系统带来沉重的负担,严重降低系统的吞吐量。其架构图如下:

基础 I/O 模型

为了缓解这个问题,我们可以在该架构的基础上引入线程池:

基础 I/O 模型 + 线程池

服务端收到客户端连接请求后,将任务交给线程池,由线程池分配工作线程执行并在完成后归还,有效避免了频繁创建与销毁线程,提升了系统吞吐量。但由于基础 I/O 模型需要每个线程独自处理自己的 Socket,因而在请求量过大、线程数过高的情况下,线程池频繁切换线程很容易导致 load 升高,当面对十万级甚至百万级连接请求时,基础 I/O 模型就无能为力了。

为了解决这个问题,设计者们利用了“批量处理”的思想:将多个线程处理的 Socket 统一交给一个“管理线程”进行处理,这便是接下来要介绍的多路复用 I/O 模型。

2 多路复用 I/O 模型

多路复用 I/O 是从 JDK 1.4 开始支持的,其核心架构如下:

多路复用 I/O 模型

它引入了诸如 Channel、Selector 和 Buffer 等概念,这些概念有效地封装了操作系统层面的 I/O 接口。以 Linux 平台为例,JDK 1.4 使用了 POSIX 标准的 select API(此时 Linux 还没引入 epoll),通过 Selector 对象来集中监控多个 I/O 事件,从而避免了每个线程独立处理 Socket 所带来的问题。此外,自 JDK 1.5 起,还添加了对 epoll 的支持,进一步提升了在 Linux 系统上的 I/O 性能。

2.1 Reactor 多路复用模型

Reactor 即“反应器”,是应用最广泛的一种 I/O 多路复用技术。当需要等待 I/O 操作时,首先释放资源。一旦等待完成,便通过事件驱动的方式继续进行后续工作。以下是 Reactor 模型中的五个重要角色:

  • Handle (句柄或描述符):它是资源在操作系统层面的一种抽象,表示与事件绑定了的资源,即各种 SocketChannel
  • Synchronous Event Demultiplexer (同步事件分发器):Handle 代表的事件会被注册到同步事件分发器上,当事件就绪时,Demultiplexer 会将就绪的事件提交给 Reactor。
    • Demultiplexer 的本质是一个系统调用,用于等待事件的发生。调用方在调用它后会被阻塞,一直阻塞到 Demultiplexer 上有事件就绪为止。
    • 在 Linux 中,同步事件分发器指的就是 I/O 多路复用器,比如 selectpollepoll 等,Java NIO 中的 Selector 就是对多路复用器的封装。
  • Reactor (反应器):事件管理的接口,内部使用 Synchronous Event Demultiplexer 注册、注销 Event Handler,当有事件进入"就绪"状态时,调用注册事件的回调函数处理事件。
  • Event Handler (事件处理器接口):事件处理程序提供了一组接口,在 Reactor 监听到相应的事件发生时调用,执行相应的事件处理。
    • 比如当 Channel 被注册到 Selector 时的回调方法、连接事件发生时的回调方法、写事件发生时的回调方法等都是事件处理器,我们可以实现这些回调来达到对某一事件进行特定反馈的目的。
    • 原生的 Java 并不支持 Event Handler,实际业务中需要自己实现,或使用 Netty 等网络框架
  • Concrete Event Handler (事件处理器实现):它是 Event Handler 的实现类,用于实现回调方法指定的业务逻辑。

Reactor 模型有三种模型,分别是:单 Reactor 单线程模型、单 Reactor 多线程模型和主从 Reactor 多线程模型。

2.1.1 单 Reactor 单线程模型

单 Reactor 单线程模型指设计中只有一个 Reactor,无论是与 I/O 读写相关,还是与 I/O 无关的编解码和计算,都在一个线程上完成。其架构图如下所示:

单 Reactor 单线程模型

在上图中:

  • Acceptor 专门处理连接事件,而 Selector 则充当同步事件分发器。
  • 客户端的请求可以分为连接请求和其他事件请求两种。
    • Selector 上注册了一系列的 Channel,它不断监听这些 Channel。
    • 一旦某个 Channel 上的事件处理器就绪,Selector 就会将该事件分发给事件处理器。

该模型仅依靠单线程处理请求,主循环承担了太多的任务,容易在高并发情境下造成请求积压甚至超时。此外,单线程无法有效利用多核资源。因此,更合适的做法是为解码、计算和编码操作引入额外的线程,并使用线程池进行管理。

2.1.2 单 Reactor 多线程模型

单 Reactor 多线程模型是指仅有一个线程负责执行 I/O 操作和处理连接请求,其他逻辑均由 Worker 线程执行。其架构图如下:

单 Reactor 多线程模型

与第一种模型相比,单 Reactor 多线程模型将业务逻辑委托给线程池来处理,从而可以更有效地利用多核 CPU 资源。然而,单个线程的 Reactor 仍负责监听和响应所有事件,这在高并发环境下仍可能产生性能瓶颈。因此,主从 Reactor 多线程模型应运而生。

2.1.3 主从 Reactor 多线程模型

在客户端连接众多且频繁进行 I/O 操作的情况下,单 Reactor 模型就会暴露出问题。因为 Reactor 不支持异步 I/O 操作,这意味着当 Reactor 处理读写事件时,其他客户端的连接操作可能无法得到及时处理。主从 Reactor 多线程模型就是专门用来解决这个问题的:

主从 Reactor 多线程模型

该模型将处理连接事件的 Reactor 与处理读写事件的 Reactor 分离,避免了读写事件较为频繁的情况下影响新客户端连接。

主从 Reactor 多线程模型中存在多个 Reactor,Main-Reactor 一般只有一个,它负责监听和处理连接请求;而 Sub-Reactor 可以有多个,用线程池进行管理,主要负责监听和处理读写事件等。当然 Main Reactor 也可以多个,也通过线程池管理,但是这样会增加系统复杂度,需要合理规划调度,否则反而会拖累性能。

2.2 单 Reactor 单线程模型代码示例

简单起见,本文只介绍如何实现单 Reactor 单线程模型,下面用到的 API 可以参考上一篇文章:java.nio 核心 API

2.2.1 服务端实现

我们首先构建服务端的核心部分,即构造方法和 main 方法。这两个关键方法将用于启动服务器并激活 Reactor 反应器:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class ReactorServer {

    private final Selector selector;

    public ReactorServer() throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        ServerSocket serverSocket = serverSocketChannel.socket();
        serverSocket.bind(new InetSocketAddress(1234));

        selector = Selector.open();
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    // 其他内容暂时省略 ...

    public static void main(String[] args) throws IOException {
        ReactorServer server = new ReactorServer();
        System.out.println("Server start ...");
        Reactor reactor = server.new Reactor();
        reactor.run();
    }
}

接下来,我们将专注于 Reactor 类的实现。考虑到该模型的依赖性,这里将 Reactor 设计为内部类。同时,为了简化代码,我们将事件处理器的实现直接嵌入到了 Reactor 中:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class Reactor {
    public void run() {
        try {
            // Reactor 循环
            while (true) {
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    iterator.remove();
                    if (selectionKey.isAcceptable()) {
                        // 处理连接事件
                        handleAcceptEvent(selectionKey);
                    } else if (selectionKey.isReadable()) {
                        // 处理可读事件
                        handleReadEvent(selectionKey);
                    } else if (selectionKey.isWritable()) {
                        // 处理可写事件
                        handleWriteEvent(selectionKey);
                    }
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // 连接事件处理器
    private void handleAcceptEvent(SelectionKey selectionKey) throws IOException {
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
        SocketChannel clientChannel = serverSocketChannel.accept();
        if (clientChannel != null) {
            clientChannel.configureBlocking(false);// 设置非阻塞
            // 监听可读事件
            clientChannel.register(selector, SelectionKey.OP_READ);
        }
    }
    // 可读事件处理器
    private void handleReadEvent(SelectionKey selectionKey) throws IOException {
        SocketChannel clientChannel = (SocketChannel) selectionKey.channel();
        ByteBuffer receiveBuffer = ByteBuffer.allocate(1024);
        int count = clientChannel.read(receiveBuffer);
        if (count > 0) {
            String context = new String(receiveBuffer.array(), 0, count);
            System.out.println("Received from client: " + context);
            // 读取成功后监听可写事件
            selectionKey.interestOps(SelectionKey.OP_WRITE);
        }
    }
    // 可写事件处理器
    private void handleWriteEvent(SelectionKey selectionKey) throws IOException {
        SocketChannel clientChannel = (SocketChannel) selectionKey.channel();
        ByteBuffer sendBuffer = ByteBuffer.wrap(("Hello client!").getBytes());
        clientChannel.write(sendBuffer);
        // 写回后,继续监听可读事件
        selectionKey.interestOps(SelectionKey.OP_READ);
    }
}

2.2.2 客户端实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
public class TestReactorClient {

    private final String serverHost;
    private final int serverPort;
    private Selector selector;
    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
    private SocketChannel clientChannel;

    public TestReactorClient(String serverHost, int serverPort) {
        this.serverHost = serverHost;
        this.serverPort = serverPort;
    }

    public void run() throws IOException, InterruptedException {
        connect();
        Thread.sleep(1000);
        sendMsg();
        executorService.shutdown();
    }

    private void connect() throws IOException {
        clientChannel = SocketChannel.open();
        clientChannel.configureBlocking(false);
        selector = Selector.open();
        clientChannel.register(selector, SelectionKey.OP_CONNECT);
        clientChannel.connect(new InetSocketAddress(serverHost, serverPort));
        selector.select();
        clientChannel.finishConnect();
        selector.selectedKeys().clear();
        clientChannel.register(selector, SelectionKey.OP_READ);
        executorService.execute(this::handleEvent);
    }

    private void handleEvent() {
        try {
            while (true) {
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    iterator.remove();
                    if (selectionKey.isReadable()) {
                        ByteBuffer receiveBuffer = ByteBuffer.allocate(1024);
                        int count = clientChannel.read(receiveBuffer);
                        if (count > 0) {
                            String context = new String(receiveBuffer.array(), 0, count);
                            System.out.println("Received from server: " + context);
                        }
                    }
                }
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private void sendMsg() throws IOException {
        ByteBuffer sendBuffer = ByteBuffer.wrap("Hello server!".getBytes());
        clientChannel.write(sendBuffer);
        System.out.println("Sent to server: Hello server!");
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        TestReactorClient client = new TestReactorClient("127.0.0.1", 1234);
        client.run();
    }
}

由于篇幅原因,测试结果省略。

3 异步 I/O 模型

异步 I/O 是从 JDK 1.7 开始支持的。在该模型中,当后台数据处理完成时,内核会通知相应的线程直接获取已经处理好的数据(这是异步与同步之间最本质的区别,即内核返回的是可读事件通知,还是已经处理好的数据),并继续执行后续的操作。

警告

Windows 提供了一套完整的支持 Socket 异步编程的接口 —— IOCP(JDK 中由 sun.nio.ch.Iocp 封装),它是真正意义上的由操作系统实现的异步 I/O。

然而,Linux 内核并不支持异步网络 I/O,因此 Linux 平台上的 JDK 中的异步 API 是由 JVM 在用户态模拟出来的。详情可以参考源码 sun.nio.ch.EPollPort(还是封装的 epoll),本文就不展开了。

不过,下文在介绍异步概念时,我们仍以抽象内核为主体,而暂时忽略 Linux 平台的特性。

3.1 Proactor 异步模型

Proactor 模型是与 Reactor 对标的异步模型。该模型有如下六个角色:

  • Handle:同 Reactor 模型的 Handle,表示与事件绑定了的资源,即 AsynchronousServerSocketChannelAsynchronousSocketChannel 等异步管道。
  • Asynchronous Operation Processor (异步操作处理器):由操作系统内核实现,负责执行相关事件的 I/O 操作。
  • Proactor (前摄器):由操作系统内核实现,负责管理事件循环。它通过 Async Operation Processor 来执行 I/O 操作,当事件完成时,Proactor 会调用相应的完成事件处理器(Completion Event Handler)来处理完成的事件。
  • Completion Event Queue (完成事件队列):由操作系统内核实现,Async Operation Processor 执行完的 I/O 操作结果会放入该队列,Proactor 会从该队列中获得相应的结果。
  • Completion Event Handler (完成事件接口):完成事件处理器抽象层,一般是由回调函数组成的接口,例如 Java 中的 CompletionHandler
  • Concrete Completion Event Handler:完成事件处理器的具体实现。

Proactor 模型与 Reactor 模型很相似,也会进行事件分发,与 Reactor 不同的是,它注册的并不是就绪事件,而是完成事件。Reactor 模型需要应用程序自己处理 I/O 操作,而 Proactor 模型则是由内核线程处理,当执行事件处理器时,Reactor 模型下的 I/O 操作还没有完成,只是就绪,而在 Proactor 模型下 I/O 操作已经完成。

3.2 Java 异步 API 简介

在 Java 中,AIO 有三种使用方式:

  • 将来式:Java 用 Future 实现将来式,将执行任务交给线程池执行后,执行任务的线程并不会阻塞,它会返回一个 Future 对象,在执行结束后,Future 对象的状态会变成 Done。
    • 外部线程可以调用 get() 方法来获取结果,如果结果还没返回,则调用 get() 方法的线程会被阻塞。
    • 如果想避免阻塞,可以轮循 Future.isDone()
  • 回调式:Java 提供了 CompletionHandler 作为回调接口,在调用 read()write() 等方法时,可以传入 CompletionHandler 的实现作为事件完成的回调入口,然后再实现类中编写业务逻辑。
  • 混合式:因为 Future 不支持回调,而且 get() 方法还会导致阻塞,所以 Java 8 中新增了 CompletableFuture 类,该类既支持原来的 Future 功能,又支持回调功能。此外该接口还支持不同 CompletableFuture 之间的互相协调组合,进一步方便了异步 I/O 的开发。

接下来,让我们通过 CompletionHandler 实现一个简易的 Proactor 服务器,并用 Future 实现客户端进行测试。

3.3 Proactor 模型代码示例

3.3.1 服务端实现

首先,我们着手实现 main 方法。这个方法承担着与内核中的 Proactor、完成事件处理器等关键组件进行交互的职责,同时负责注册连接完成事件处理器:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
public class ProactorServer {

    public static void main(String[] args) throws IOException {
        AsynchronousChannelGroup channelGroup =
                AsynchronousChannelGroup.withThreadPool(Executors.newFixedThreadPool(10));

        AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open(channelGroup);
        serverChannel.bind(new InetSocketAddress("127.0.0.1", 1234));
        System.out.println("Server start ...");
        // 注册连接处理器,处理客户端连接请求
        CompletionHandler<AsynchronousSocketChannel, Void> acceptHandler = new AcceptHandler(serverChannel);
        serverChannel.accept(null, acceptHandler);

        // 阻塞防止主线程退出
        try {
            Thread.currentThread().join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

接下来,我们将专注于不同类型的完成事件处理器的实现,这些处理器都会遵循 CompletionHandler 接口的规范:

  • 连接事件处理器
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    
    static class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, Void> {
    
        private final AsynchronousServerSocketChannel serverChannel;
    
        public AcceptHandler(AsynchronousServerSocketChannel serverChannel) {
            this.serverChannel = serverChannel;
        }
    
        @Override
        public void completed(AsynchronousSocketChannel clientChannel, Void attachment) {
            // 连接事件处理成功后,监听可读事件
            CompletionHandler<Integer, ByteBuffer> readHandler = new ReadHandler(clientChannel);
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            clientChannel.read(buffer, buffer, readHandler);
            // 通知客户端接受连接
            serverChannel.accept(null, this);
        }
    
        @Override
        public void failed(Throwable exc, Void attachment) {
            System.err.println("connect failed: " + exc.getMessage());
        }
    }
  • 读取事件处理器
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    
    static class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
    
        private final AsynchronousSocketChannel clientChannel;
    
        public ReadHandler(AsynchronousSocketChannel clientChannel) {
            this.clientChannel = clientChannel;
        }
    
        @Override
        public void completed(Integer bytesRead, ByteBuffer buffer) {
            if (bytesRead > 0) {
                buffer.flip();
                byte[] data = new byte[bytesRead];
                buffer.get(data);
                String receivedData = new String(data);
                System.out.println("receive msg from client: " + receivedData);
    
                // 接收请求并返回响应
                String response = "hello, client!";
                ByteBuffer responseBuffer = ByteBuffer.wrap(response.getBytes());
                CompletionHandler<Integer, ByteBuffer> writeHandler = new WriteHandler(clientChannel);
                clientChannel.write(responseBuffer, responseBuffer, writeHandler);
            }
        }
    
        @Override
        public void failed(Throwable exc, ByteBuffer buffer) {
            System.err.println("receive msg from client error: " + exc.getMessage());
        }
    }
  • 写完事件处理器
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    
    static class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
    
        private final AsynchronousSocketChannel clientChannel;
    
        public WriteHandler(AsynchronousSocketChannel clientChannel) {
            this.clientChannel = clientChannel;
        }
    
        @Override
        public void completed(Integer bytesWritten, ByteBuffer buffer) {
            System.out.println("send msg to client success!");
    
            try {
                clientChannel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public void failed(Throwable exc, ByteBuffer buffer) {
            System.err.println("send msg to client error: " + exc.getMessage());
        }
    }

与 NIO 不同的地方在于,AIO 使用了新的通道类 AsynchronousServerSocketChannel。此外,异步 I/O 无法利用 Selector,而是通过 AsynchronousSocketChannel 注册回调函数。

3.3.2 客户端核心实现

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
static class ClientThread implements Runnable {
    private final int clientId;
    public ClientThread(int clientId) {
        this.clientId = clientId;
    }

    @Override
    public void run() {
        try {
            AsynchronousSocketChannel clientChannel = AsynchronousSocketChannel.open();
            Future<Void> future = clientChannel.connect(new InetSocketAddress("localhost", 1234));
            future.get(); // 阻塞等待连接成功

            String message = "Hello from client " + clientId;
            ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());
            Future<Integer> writeFuture = clientChannel.write(buffer);
            writeFuture.get(); // 阻塞等待发送成功

            ByteBuffer responseBuffer = ByteBuffer.allocate(1024);
            Future<Integer> readFuture = clientChannel.read(responseBuffer);
            readFuture.get(); // 阻塞等待服务端返回数据

            responseBuffer.flip();
            byte[] responseData = new byte[responseBuffer.remaining()];
            responseBuffer.get(responseData);
            System.out.println("Client " + clientId + " received response: " + new String(responseData));

            clientChannel.close();
        } catch (IOException | InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}

由于篇幅原因,测试结果省略。至此,与 Java 程序 I/O 模型相关的内容就介绍完了。

4 总结

网络 I/O 模型描述了网卡与 Socket 监听线程之间的交互方式。无论是基础 I/O 模型,还是 I/O 多路复用模型,它们都有其各自的应用场景:

  • 基础 I/O 模型的 API 相对简单,特别适合用来实现一些简易的客户端。
  • 多路复用 I/O 模型适合有大量非活跃连接的场景。该模型通过批量处理,将多个 Socket 交由个别线程进行统一处理,从而极大地提高了吞吐量。然而,由于该模型不支持异步 I/O,用户线程在处理 I/O 任务时会阻塞新到达的任务,因此通常需要设计更复杂的架构,例如主从 Reactor 多线程模型等,这增加了系统的复杂性。
  • 异步 I/O 模型解决了上述同步模型中的问题,而且不需要在应用层设计复杂的架构(这部分工作下放到内核来完成了)。但是这种模型仅在 Windows 系统上能得到良好的支持,我们最常用的 Linux 内核并不支持原生的网络异步 I/O,当 Java 程序部署在 Linux 平台上时,JVM 需要在用户态线程中模拟异步操作,依旧无法避免线程上下文切换以及内核数据拷贝,因此其性能并没有比同步 I/O 强很多。

需要注意的是,在大部分连接都处于活跃状态,且每个连接的数据量都极大的场景下,多路复用的作用就不大了。


欢迎关注我的公众号,第一时间获取文章更新:

微信公众号

相关内容