Netty
字数: 0 字 时长: 0 分钟
简介
Netty 是基于事件驱动架构的高性能 Java 网络通信框架,解决了传统 Java 网络编程中的一些复杂问题:
- 传统 BIO 阻塞瓶颈:底层使用 NIO 多路复用实现单线程万级连接,避免线程爆炸
- 原生 NIO 开发复杂:通过 Reactor 线程模型 + 线程队列对开发者封装异步 API,无需阻塞等待
- 网络编程常见痛点:自带 TCP 粘包拆包、心跳检测、内存池等生产级解决方案
Netty 兼具易用性、高性能、拓展性等优点,许多流行中间件底层通信框架都是基于 Netty,比如 RocketMQ、Dubbo、ElasticSearch 等。
Netty 如何解决原生 NIO 空轮询 Bug
空轮询 bug 的原因
当连接的 Socket 被突然中断时, epoll
会将该 Socket 的事件标记为 EPOLLHUP
或 EPOLLERR
,导致 Selector
被唤醒。 然而 SelectionKey
并未自定义处理这些异常事件的类型,导致 Selector
被唤醒后,无法处理这些异常事件,从而进入空轮询状态。
Netty 没有解决空轮询 Bug,而是绕开了这个问题
- Netty 通过
selectCnt
计数器统计连续空轮询的次数:每次执行Selector.select()
后,如果没有发现 I/O 事件,计数器就递增 - 当计数器达到阈值(默认 512)时,Netty 会重建一个
Selector
,将所有注册的Channel
从旧的Selector
转移到新的Selector
- 重建并且转移完成后,Netty 会关闭旧的
Selector
,从而避免继续在旧的Selector
上空轮询
Reactor 线程模型
传统阻塞 I/O 线程模型的瓶颈:每个连接独占线程,无法支撑大量连接;线程阻塞在 I/O 时 CPU 闲置,资源利用率低。而 Reactor 线程模型可以通过有限的线程资源高效处理海量连接。
单 Reactor 单线程模型
Reactor
对象通过Select
监听客户端请求事件,收到事件后通过Dispatch
分发- 如果是建立连接请求事件,由
Acceptor
处理连接请求 - 如果是读写请求事件,则由
Handler
对象处理业务流程
单 Reactor 单线程模型实现简单,但业务阻塞会拖垮整个系统,适合 Redis 单线程架构(Redis O(1) 复杂度,速度快不易阻塞)
单 Reactor 多线程模型
- 主 Reactor (单线程):仅处理
ACCEPT
和READ
、WRITE
事件,业务处理转发给 Worker 线程池 - Worker 线程池处理所有业务逻辑
业务处理由多线程处理,优点是业务逻辑不会阻塞 I/O ,但单 Reactor 是高负载下的性能瓶颈。为 Netty 3.X 版本默认模型。
主从 Reactor 多线程模型
- Reactor 主线程只处理
ACCEPT
事件 - 多个 Reactor 子线程来处理
READ
、WRITE
事件,每个 Reactor 子线程独占 Selector ,无竞争 - Worker 线程池来处理业务逻辑
Reactor 分为一个专门复杂处理连接的 Main Reactor(boosGroup) 和若干个负责处理读写Sub Reactor(workGroup)。是 Netty 4.x 的核心模型。
Netty 原理
BoosGroup
BoosGroup
线程组中的线程(通常是一个)负责监听绑定的端口,接收客户端的连接请求。 当有新的连接到来时,BoosGroup
将其注册到 WorkerGroup
中的某个 NioEventLoop
中,并将其 Channel
(即连接)交由 Worker 线程处理
WorkerGroup
WorkerGroup
中的每一个线程都是一个NioEventLoop
,负责管理和处理与一个或多个Channel
相关的所有 I/O 操作。NioEventLoop
内部使用一个Selector
进行多路复用监听多个Channel
的事件(读、写、连接等)- 当有 I/O 事件发生时,
NioEventLoop
会将该事件分发给对应的Channel
处理,处理逻辑通常由ChannelPipeline
中的各个ChannelHandler
完成。
为什么 Netty 不用 AIO 而用 NIO 多路复用
- Netty 整体架构是
Reactor
模型,而 AIO 是Proactor
(异步) 模型,混在一起非常混乱 - AIO 有个缺点是接收数据需要预先分配缓存,而不是 NIO 那些需要接收时才需要分配缓存,所以多连接数多但流量小的情况会浪费内存
- Linux 上 AIO 不够成熟,性能上没有明显优势
ChannelPipeline
Pipeline 的本质是双向责任链,在 DefaultChannelPipeline
类中定义了 HeadContext
和 TailContext
两个上下文对象,它们是 pipeline 的首尾节点。
- 入站事件
首尾节点由 Netty 创建,用户处理器插入到中间。入站事件(如 selector
检测到读事件)触发时,入站事件从 HeadContext
开始,逐个触发入站处理器(ChannelInboundHandler
),最后到达 TailContext
- 出站事件
出站时,从 TailContext
开始,逐个触发出站处理器(ChannelOutboundHandler
),最后到达 HeadContext
- 双工处理器
Netty 不仅支持入站和出站两类处理器,还专门设计了 ChannelDuplexHandler
(双工处理器) ,可以同时处理双向事件。
通过类图可以看到 ChannelDuplexHandler
巧妙地继承了 ChannelInboundHandler
和 ChannelOutboundHandler
,心跳检测处理器 IdleStateHandler
就是双工处理器
Netty 心跳检测
Netty 提供了心跳检测处理器 IdleStateHandler
,只需简单配置就可以完成心跳检测
- 在
ChannelPipeline
处理器链中添加IdleStateHandler
bootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// IdleStateHandler 是 netty 的空闲状态处理器
pipeline.addLast(new IdleStateHandler(
3, // 3秒没有读就触发心跳检测
5, // 5秒没有写就触发心跳检测
7, // 7秒没有读写就触发心跳检测
TimeUnit.SECONDS));
pipeline.addLast(new MyServerHandler());
}
});
- 重写
userEventTriggered
方法处理心跳事件
userEventTriggered
收到 IdleStateEvent
空闲事件时,进行处理
public class MyServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
if (evt instanceof IdleStateEvent idleStateEvent) {
switch (idleStateEvent.state()) {
case READER_IDLE -> {
System.out.println("读空闲");
}
case WRITER_IDLE -> {
System.out.println("写空闲");
}
case ALL_IDLE -> {
System.out.println("读写空闲");
}
}
}
}
}
Netty 启动过程
Netty 服务基础代码实现
public static void main(String[] args) throws InterruptedException {
EventLoopGroup boosGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boosGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG,128)
.childOption(ChannelOption.SO_KEEPALIVE,true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyServerHandler());
}
});
ChannelFuture cf = bootstrap.bind(7668).sync();
cf.channel().closeFuture().sync();
}
以上是 Netty 服务基础代码实现,接下来通过核心源码分析启动过程:
线程数设置
如果没有显示指定线程数,默认值为 系统可用的 CPU 线程数 * 2
- 线程工厂为每个 EventLoop 创建一个特定线程
初始化 ServerBootStrap
- 通过
group()
添加boosGroup
和workerGroup
channel(NioServerSocketChannel.class)
,注意此时只是通过反射得到了NioServerSocketChannel
的构造函数,还未正式创建NioServerSocketChannel
对象
option()
方法传入 TCP 参数,放在LinkedHashMap
中
childHandler()
传入handler
,这个handler
将会在每个客户端连接时进行调用,供SocketChannel
使用
private volatile ChannelHandler childHandler;
public ServerBootstrap childHandler(ChannelHandler childHandler) {
this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
return this;
}
端口绑定
调用 bind()
进行端口绑定时,会执行一个关键方法 init()
init()
采用双重异步初始化机制,确保用户自定义 handler 先被添加,保证线程安全
最终会调用 JDK NIO 进行端口绑定
// NioServerSocketChannel.java
protected void doBind(SocketAddress localAddress) throws Exception {
javaChannel().bind(localAddress, config.getBacklog());
}
//SingleThreadIoEventGroup.java
protected void run() {
assert inEventLoop();
//初始化平台相关的 I/O 多路复用器 select
ioHandler.initialize();
do {
runIo();
if (isShuttingDown()) {
ioHandler.prepareToDestroy();
}
//异步任务处理
runAllTasks(maxTaskProcessingQuantumNs);
} while (!confirmShutdown() && !canSuspend());
}