Skip to content

Netty

字数: 0 字 时长: 0 分钟

简介

Netty 是基于事件驱动架构的高性能 Java 网络通信框架,解决了传统 Java 网络编程中的一些复杂问题:

  • 传统 BIO 阻塞瓶颈底层使用 NIO 多路复用实现单线程万级连接,避免线程爆炸
  • 原生 NIO 开发复杂:通过 Reactor 线程模型 + 线程队列对开发者封装异步 API,无需阻塞等待
  • 网络编程常见痛点:自带 TCP 粘包拆包、心跳检测、内存池等生产级解决方案

Netty 兼具易用性、高性能、拓展性等优点,许多流行中间件底层通信框架都是基于 Netty,比如 RocketMQ、Dubbo、ElasticSearch 等。

Netty 如何解决原生 NIO 空轮询 Bug

空轮询 bug 的原因

当连接的 Socket 被突然中断时, epoll 会将该 Socket 的事件标记为 EPOLLHUPEPOLLERR,导致 Selector 被唤醒。 然而 SelectionKey 并未自定义处理这些异常事件的类型,导致 Selector 被唤醒后,无法处理这些异常事件,从而进入空轮询状态。

Netty 没有解决空轮询 Bug,而是绕开了这个问题

  1. Netty 通过 selectCnt 计数器统计连续空轮询的次数:每次执行 Selector.select() 后,如果没有发现 I/O 事件,计数器就递增
  2. 当计数器达到阈值(默认 512)时,Netty 会重建一个 Selector,将所有注册的 Channel 从旧的 Selector 转移到新的 Selector
  3. 重建并且转移完成后,Netty 会关闭旧的 Selector,从而避免继续在旧的 Selector 上空轮询

Reactor 线程模型

传统阻塞 I/O 线程模型的瓶颈:每个连接独占线程,无法支撑大量连接;线程阻塞在 I/O 时 CPU 闲置,资源利用率低。而 Reactor 线程模型可以通过有限的线程资源高效处理海量连接

单 Reactor 单线程模型

单 Reactor 单线程.webp

  1. Reactor 对象通过 Select 监听客户端请求事件,收到事件后通过 Dispatch 分发
  2. 如果是建立连接请求事件,由 Acceptor 处理连接请求
  3. 如果是读写请求事件,则由 Handler 对象处理业务流程

单 Reactor 单线程模型实现简单,但业务阻塞会拖垮整个系统,适合 Redis 单线程架构(Redis O(1) 复杂度,速度快不易阻塞)

单 Reactor 多线程模型

单Reactor多线程.webp

  • 主 Reactor (单线程):仅处理 ACCEPTREADWRITE 事件,业务处理转发给 Worker 线程池
  • Worker 线程池处理所有业务逻辑

业务处理由多线程处理,优点是业务逻辑不会阻塞 I/O ,但单 Reactor 是高负载下的性能瓶颈。为 Netty 3.X 版本默认模型。

主从 Reactor 多线程模型

主从多线程 Reactor模式.webp

  • Reactor 主线程只处理 ACCEPT 事件
  • 多个 Reactor 子线程来处理 READWRITE 事件,每个 Reactor 子线程独占 Selector ,无竞争
  • Worker 线程池来处理业务逻辑

Reactor 分为一个专门复杂处理连接的 Main Reactor(boosGroup) 和若干个负责处理读写Sub Reactor(workGroup)。是 Netty 4.x 的核心模型。

Netty 原理

Netty 模型.webp

BoosGroup

BoosGroup 线程组中的线程(通常是一个)负责监听绑定的端口,接收客户端的连接请求。 当有新的连接到来时,BoosGroup 将其注册到 WorkerGroup 中的某个 NioEventLoop 中,并将其 Channel(即连接)交由 Worker 线程处理

WorkerGroup

  • WorkerGroup 中的每一个线程都是一个 NioEventLoop ,负责管理和处理与一个或多个 Channel 相关的所有 I/O 操作。
  • NioEventLoop 内部使用一个 Selector 进行多路复用监听多个 Channel 的事件(读、写、连接等)
  • 当有 I/O 事件发生时, NioEventLoop 会将该事件分发给对应的 Channel 处理,处理逻辑通常由 ChannelPipeline 中的各个 ChannelHandler 完成。

为什么 Netty 不用 AIO 而用 NIO 多路复用

  1. Netty 整体架构是 Reactor 模型,而 AIO 是 Proactor(异步) 模型,混在一起非常混乱
  2. AIO 有个缺点是接收数据需要预先分配缓存,而不是 NIO 那些需要接收时才需要分配缓存,所以多连接数多但流量小的情况会浪费内存
  3. Linux 上 AIO 不够成熟,性能上没有明显优势

ChannelPipeline

Pipeline 的本质是双向责任链,在 DefaultChannelPipeline 类中定义了 HeadContextTailContext 两个上下文对象,它们是 pipeline 的首尾节点。

Pipeline 双向责任链.webp

  • 入站事件

首尾节点由 Netty 创建,用户处理器插入到中间。入站事件(如 selector 检测到读事件)触发时,入站事件从 HeadContext 开始,逐个触发入站处理器(ChannelInboundHandler),最后到达 TailContext

Pipeline 入站.webp

  • 出站事件

出站时,从 TailContext 开始,逐个触发出站处理器(ChannelOutboundHandler),最后到达 HeadContext

Pipeline 出站.webp

  • 双工处理器

Netty 不仅支持入站和出站两类处理器,还专门设计了 ChannelDuplexHandler(双工处理器) ,可以同时处理双向事件。

通过类图可以看到 ChannelDuplexHandler 巧妙地继承了 ChannelInboundHandlerChannelOutboundHandler心跳检测处理器 IdleStateHandler 就是双工处理器

双工处理器类图.webp

Netty 心跳检测

Netty 提供了心跳检测处理器 IdleStateHandler,只需简单配置就可以完成心跳检测

  • ChannelPipeline 处理器链中添加 IdleStateHandler
java
bootstrap.group(bossGroup,workerGroup)
        .channel(NioServerSocketChannel.class)
        .handler(new LoggingHandler(LogLevel.INFO))
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                // IdleStateHandler 是 netty 的空闲状态处理器
                pipeline.addLast(new IdleStateHandler(
                        3, // 3秒没有读就触发心跳检测
                        5, // 5秒没有写就触发心跳检测
                        7, // 7秒没有读写就触发心跳检测
                        TimeUnit.SECONDS));
                pipeline.addLast(new MyServerHandler());
            }
        });
  • 重写 userEventTriggered 方法处理心跳事件

userEventTriggered 收到 IdleStateEvent 空闲事件时,进行处理

java
public class MyServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) 
            throws Exception {
        if (evt instanceof IdleStateEvent idleStateEvent) {
            switch (idleStateEvent.state()) {
                case READER_IDLE -> {
                    System.out.println("读空闲");
                }
                case WRITER_IDLE -> {
                    System.out.println("写空闲");
                }
                case ALL_IDLE -> {
                    System.out.println("读写空闲");
                }
            }
        }
    }

}

Netty 启动过程

Netty 服务基础代码实现

java
public static void main(String[] args) throws InterruptedException {
    EventLoopGroup boosGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();

    ServerBootstrap bootstrap = new ServerBootstrap();

    bootstrap.group(boosGroup,workerGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG,128)
            .childOption(ChannelOption.SO_KEEPALIVE,true)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline().addLast(new NettyServerHandler());
                }
            });
    
    ChannelFuture cf = bootstrap.bind(7668).sync();
    cf.channel().closeFuture().sync();
}

以上是 Netty 服务基础代码实现,接下来通过核心源码分析启动过程:

线程数设置

如果没有显示指定线程数,默认值为 系统可用的 CPU 线程数 * 2

核心线程数.webp

  • 线程工厂为每个 EventLoop 创建一个特定线程

线程工厂为 EventLoop 创建线程.webp

初始化 ServerBootStrap

  1. 通过 group() 添加 boosGroupworkerGroup

添加核心线程组.webp

  1. channel(NioServerSocketChannel.class) ,注意此时只是通过反射得到了 NioServerSocketChannel 的构造函数,还未正式创建 NioServerSocketChannel 对象

channel().webp

  1. option() 方法传入 TCP 参数,放在 LinkedHashMap

option.webp

  1. childHandler() 传入 handler ,这个 handler 将会在每个客户端连接时进行调用,供 SocketChannel 使用
java
private volatile ChannelHandler childHandler;

public ServerBootstrap childHandler(ChannelHandler childHandler) {
    this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
    return this;
}

端口绑定

调用 bind() 进行端口绑定时,会执行一个关键方法 init()

bind.webp

init() 采用双重异步初始化机制,确保用户自定义 handler 先被添加,保证线程安全

init().webp

最终会调用 JDK NIO 进行端口绑定

java
// NioServerSocketChannel.java
protected void doBind(SocketAddress localAddress) throws Exception {
    javaChannel().bind(localAddress, config.getBacklog());
}

//SingleThreadIoEventGroup.java
protected void run() {
    assert inEventLoop();
    //初始化平台相关的 I/O 多路复用器 select
    ioHandler.initialize();
    do {
        runIo();
        if (isShuttingDown()) {
            ioHandler.prepareToDestroy();
        }
        //异步任务处理
        runAllTasks(maxTaskProcessingQuantumNs);
        
    } while (!confirmShutdown() && !canSuspend());
}

服务启动时序图

服务器启动时序图.webp