Netty
字数: 0 字 时长: 0 分钟
简介
Netty 是基于事件驱动架构的高性能 Java 网络通信框架,解决了传统 Java 网络编程中的一些复杂问题:
- 传统 BIO 阻塞瓶颈:底层使用 NIO 多路复用实现单线程万级连接,避免线程爆炸
- 原生 NIO 开发复杂:通过 Reactor 线程模型 + 线程队列对开发者封装异步 API,无需阻塞等待
- 网络编程常见痛点:自带 TCP 粘包拆包、心跳检测、内存池等生产级解决方案
Netty 兼具易用性、高性能、拓展性等优点,许多流行中间件底层通信框架都是基于 Netty,比如 RocketMQ、Dubbo、ElasticSearch 等。
Netty 如何解决原生 NIO 空轮询 Bug
空轮询 bug 的原因
当连接的 Socket 被突然中断时, epoll 会将该 Socket 的事件标记为 EPOLLHUP 或 EPOLLERR,导致 Selector 被唤醒。 然而 SelectionKey 并未自定义处理这些异常事件的类型,导致 Selector 被唤醒后,无法处理这些异常事件,从而进入空轮询状态。
Netty 没有解决空轮询 Bug,而是绕开了这个问题
- Netty 通过
selectCnt计数器统计连续空轮询的次数:每次执行Selector.select()后,如果没有发现 I/O 事件,计数器就递增 - 当计数器达到阈值(默认 512)时,Netty 会重建一个
Selector,将所有注册的Channel从旧的Selector转移到新的Selector - 重建并且转移完成后,Netty 会关闭旧的
Selector,从而避免继续在旧的Selector上空轮询
Reactor 线程模型
传统阻塞 I/O 线程模型的瓶颈:每个连接独占线程,无法支撑大量连接;线程阻塞在 I/O 时 CPU 闲置,资源利用率低。而 Reactor 线程模型可以通过有限的线程资源高效处理海量连接。
单 Reactor 单线程模型

Reactor对象通过Select监听客户端请求事件,收到事件后通过Dispatch分发- 如果是建立连接请求事件,由
Acceptor处理连接请求 - 如果是读写请求事件,则由
Handler对象处理业务流程
单 Reactor 单线程模型实现简单,但业务阻塞会拖垮整个系统,适合 Redis 单线程架构(Redis O(1) 复杂度,速度快不易阻塞)
单 Reactor 多线程模型

- 主 Reactor (单线程):仅处理
ACCEPT和READ、WRITE事件,业务处理转发给 Worker 线程池 - Worker 线程池处理所有业务逻辑
业务处理由多线程处理,优点是业务逻辑不会阻塞 I/O ,但单 Reactor 是高负载下的性能瓶颈。为 Netty 3.X 版本默认模型。
主从 Reactor 多线程模型

- Reactor 主线程只处理
ACCEPT事件 - 多个 Reactor 子线程来处理
READ、WRITE事件,每个 Reactor 子线程独占 Selector ,无竞争 - Worker 线程池来处理业务逻辑
Reactor 分为一个专门复杂处理连接的 Main Reactor(boosGroup) 和若干个负责处理读写Sub Reactor(workGroup)。是 Netty 4.x 的核心模型。
Netty 原理

BoosGroup
BoosGroup 线程组中的线程(通常是一个)负责监听绑定的端口,接收客户端的连接请求。 当有新的连接到来时,BoosGroup 将其注册到 WorkerGroup 中的某个 NioEventLoop 中,并将其 Channel(即连接)交由 Worker 线程处理
WorkerGroup
WorkerGroup中的每一个线程都是一个NioEventLoop,负责管理和处理与一个或多个Channel相关的所有 I/O 操作。NioEventLoop内部使用一个Selector进行多路复用监听多个Channel的事件(读、写、连接等)- 当有 I/O 事件发生时,
NioEventLoop会将该事件分发给对应的Channel处理,处理逻辑通常由ChannelPipeline中的各个ChannelHandler完成。
为什么 Netty 不用 AIO 而用 NIO 多路复用
- Netty 整体架构是
Reactor模型,而 AIO 是Proactor(异步) 模型,混在一起非常混乱 - AIO 有个缺点是接收数据需要预先分配缓存,而不是 NIO 那些需要接收时才需要分配缓存,所以多连接数多但流量小的情况会浪费内存
- Linux 上 AIO 不够成熟,性能上没有明显优势
ChannelPipeline
Pipeline 的本质是双向责任链,在 DefaultChannelPipeline 类中定义了 HeadContext 和 TailContext 两个上下文对象,它们是 pipeline 的首尾节点。

- 入站事件
首尾节点由 Netty 创建,用户处理器插入到中间。入站事件(如 selector 检测到读事件)触发时,入站事件从 HeadContext 开始,逐个触发入站处理器(ChannelInboundHandler),最后到达 TailContext

- 出站事件
出站时,从 TailContext 开始,逐个触发出站处理器(ChannelOutboundHandler),最后到达 HeadContext

- 双工处理器
Netty 不仅支持入站和出站两类处理器,还专门设计了 ChannelDuplexHandler(双工处理器) ,可以同时处理双向事件。
通过类图可以看到 ChannelDuplexHandler 巧妙地继承了 ChannelInboundHandler 和 ChannelOutboundHandler,心跳检测处理器 IdleStateHandler 就是双工处理器

Netty 心跳检测
Netty 提供了心跳检测处理器 IdleStateHandler,只需简单配置就可以完成心跳检测
- 在
ChannelPipeline处理器链中添加IdleStateHandler
bootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// IdleStateHandler 是 netty 的空闲状态处理器
pipeline.addLast(new IdleStateHandler(
3, // 3秒没有读就触发心跳检测
5, // 5秒没有写就触发心跳检测
7, // 7秒没有读写就触发心跳检测
TimeUnit.SECONDS));
pipeline.addLast(new MyServerHandler());
}
});- 重写
userEventTriggered方法处理心跳事件
userEventTriggered 收到 IdleStateEvent 空闲事件时,进行处理
public class MyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
if (evt instanceof IdleStateEvent idleStateEvent) {
switch (idleStateEvent.state()) {
case READER_IDLE -> {
System.out.println("读空闲");
}
case WRITER_IDLE -> {
System.out.println("写空闲");
}
case ALL_IDLE -> {
System.out.println("读写空闲");
}
}
}
}
}Netty 启动过程
Netty 服务基础代码实现
public static void main(String[] args) throws InterruptedException {
EventLoopGroup boosGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boosGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,128)
.childOption(ChannelOption.SO_KEEPALIVE,true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyServerHandler());
}
});
ChannelFuture cf = bootstrap.bind(7668).sync();
cf.channel().closeFuture().sync();
}以上是 Netty 服务基础代码实现,接下来通过核心源码分析启动过程:
线程数设置
如果没有显示指定线程数,默认值为 系统可用的 CPU 线程数 * 2

- 线程工厂为每个 EventLoop 创建一个特定线程

初始化 ServerBootStrap
- 通过
group()添加boosGroup和workerGroup

channel(NioServerSocketChannel.class),注意此时只是通过反射得到了NioServerSocketChannel的构造函数,还未正式创建NioServerSocketChannel对象
.BhdFEOrz.webp)
option()方法传入 TCP 参数,放在LinkedHashMap中

childHandler()传入handler,这个handler将会在每个客户端连接时进行调用,供SocketChannel使用
private volatile ChannelHandler childHandler;
public ServerBootstrap childHandler(ChannelHandler childHandler) {
this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
return this;
}端口绑定
调用 bind() 进行端口绑定时,会执行一个关键方法 init()

init() 采用双重异步初始化机制,确保用户自定义 handler 先被添加,保证线程安全
.C5lmOo0n.webp)
最终会调用 JDK NIO 进行端口绑定
// NioServerSocketChannel.java
protected void doBind(SocketAddress localAddress) throws Exception {
javaChannel().bind(localAddress, config.getBacklog());
}
//SingleThreadIoEventGroup.java
protected void run() {
assert inEventLoop();
//初始化平台相关的 I/O 多路复用器 select
ioHandler.initialize();
do {
runIo();
if (isShuttingDown()) {
ioHandler.prepareToDestroy();
}
//异步任务处理
runAllTasks(maxTaskProcessingQuantumNs);
} while (!confirmShutdown() && !canSuspend());
}服务启动时序图

