Netty
1. 简介
- Netty 是由 JBOSS 提供的一个 Java 开源框架,现为 Github 上的独立项目
- Netty 是一款高性能、异步事件驱动的 Java 网络编程框架,专为构建高并发、低延迟的分布式系统而设计。
- Netty 广泛应用于互联网、大数据、游戏、IoT 等领域
- Dubbo、RocketMQ、Elasticsearch、Flink 等底层通信均依赖 Netty
2. 原生 NIO 存在的问题
- NIO 类库繁杂,适用麻烦
- 需要熟悉 JAVA 多线程编程才能写出高质量的 NIO 程序
- 开发难度高,维护成本高,如客户端断连重连、网络闪断、半包读写、失败缓存、网络拥塞等
- NIO 空轮询 BUG 没有被 jdk 根本解决
3. Reactor 模型
单 Reactor 单线程模型
Reactor
对象通过Select
监听客户端请求事件,收到事件后通过Dispatch
分发- 如果是建立连接请求事件,由
Acceptor
通过Accept
处理连接请求,然后创建一个Handler
对象处理连接完成后的后续业务处理 - 如果是读写请求事件,则由
Handler
对象处理业务流程
适用场景:客户端数量有限,业务处理非常快速,比如 Redis 在业务处理的时间复杂度 O(1) 的情况
单 Reactor 模型
- 主 Reactor (单线程):仅处理
ACCEPT
和READ
、WRITE
事件,业务处理转发给 Worker 线程池 - Worker 线程池处理所有业务逻辑
适用场景:中等并发量业务场景 (如企业级内部管理系统)
主从 Reactor 多线程模型
- Reactor 主线程只处理
ACCEPT
事件 - 多个 Reactor 子线程来处理
READ
、WRITE
事件,每个 Reactor 子线程独占 Selector ,无竞争 - Worker 线程池来处理业务逻辑
适用场景:
- 高并发互联网应用(如 IM 系统、API 网关)
- 需要同时监听多个端口的服务
4. Netty 模型
为什么 Netty 不用 aio 而是用 IO多路复用
- Netty 整体架构是
Reactor
模型,而 AIO 是Proactor
(异步) 模型,混在一起非常混乱 - AIO 有个缺点是接收数据需要预先分配缓存,而不是 NIO 那些需要接收时才需要分配缓存,所以多连接数多但流量小的情况会浪费内存
- Linux 上 AIO 不够成熟,性能上没有明显优势
- 每个
NioEventLoopGroup
下包含多个NioEventLoop
- 每个
NioEventLoop
下包含一个Selector
和TaskQueue
- 每个
NioEventLoop
的Selector
上可以注册监听多个NioChannel
- 每个
NioChannel
只会绑定在唯一的NioEventLoop
上 - 每个
NioChannel
都会绑定有一个自己的ChannelPipeline
TaskQueue
每个 NioEventLoop
有一个 TaskQueue
任务队列用于异步执行任务,防止阻塞
- 普通异步任务
java
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//方案一: 定义普通异步任务
ctx.channel().eventLoop().execute(() -> {
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端",CharsetUtil.UTF_8));
});
}
- 异步定时任务
java
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//方案二: 定义定时任务
ctx.channel().eventLoop().schedule(() -> {
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端", CharsetUtil.UTF_8));
}, 5, TimeUnit.SECONDS);
}
异步模型
Netty 的 Future-Listener
机制是其异步编程模型的核心组件
1. 核心接口解析
ChannelFuture
java
public interface ChannelFuture extends Future<Void> {
// 核心方法
Channel channel(); // 关联的 Channel
ChannelFuture addListener(
GenericFutureListener<? extends Future<? super Void>> listener);
ChannelFuture sync() throws InterruptedException; // 阻塞等待完成(慎用)
boolean isSuccess(); // 是否成功
Throwable cause(); // 获取失败原因
}
GenericFutureListener
java
public interface GenericFutureListener<
F extends Future<?>> extends EventListener {
void operationComplete(F future) throws Exception; // 完成时触发
}
2. 工作机制全流程
- 操作发起阶段
- 调用如
channel.writeAndFlush(msg)
发起非阻塞写操作 - Netty 立即返回一个未完成的
ChannelFuture
对象
- 调用如
- 状态变更阶段
- 操作成功:将
Future
状态标记为SUCCESS
- 操作失败:将
Future
状态标记为FAILURE
,并记录失败原因
- 操作成功:将
- 回调触发阶段
- 立即触发:状态变更为完成时,若已有监听器存在,直接在当前线程触发监听器回调
- 延迟触发:状态变更为完成后续添加监听器,则直接触发监听器
3. 基本用法
监听端口绑定结果
java
ChannelFuture cf = bootstrap.bind(6668).sync();
cf.addListener(future -> {
if (cf.isSuccess()) {
System.out.println("监听端口 6668 成功");
}
});