Netty 源码分析
Netty 服务启动过程
Netty 服务基础代码实现
java
/**
 * Starts the example Netty server on port 7668 and blocks until the server channel is closed.
 *
 * <p>Both event loop groups are shut down in a {@code finally} block so that their threads are
 * released whether the server stops normally or start-up fails (for example when the bind fails).
 *
 * @param args command-line arguments (unused)
 * @throws InterruptedException if the thread is interrupted while waiting on the bind or close future
 */
public static void main(String[] args) throws InterruptedException {
    // Passed to group(...) as the parent group and the child group respectively.
    EventLoopGroup boosGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    try {
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(boosGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new NettyServerHandler());
                    }
                });
        // Wait for the bind to finish, then wait until the server channel is closed.
        ChannelFuture cf = bootstrap.bind(7668).sync();
        cf.channel().closeFuture().sync();
    } finally {
        // Without this the event loop threads are never released.
        boosGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}
1. NioEventLoopGroup
创建
java
// Legacy way of creating the event loop groups
EventLoopGroup boosGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
// As of version 4.2.0 the approach above is deprecated; use the form below instead
EventLoopGroup bossGroup = new MultiThreadIoEventLoopGroup(
NioIoHandler.newFactory()
);
源码解析
- 线程数设置
如果没有显式指定线程数(即 nThreads 为 0),默认值为系统可用处理器数 * 2(可通过系统属性 io.netty.eventLoopThreads 覆盖,且不小于 1)
java
// Computes the default thread count once at class-initialization time:
// the integer read for "io.netty.eventLoopThreads", with twice the processor count
// reported by NettyRuntime as the fallback, and never less than 1.
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
}
/**
 * Forwards to the superclass constructor, substituting DEFAULT_EVENT_LOOP_THREADS
 * when the caller passes 0 as the thread count.
 *
 * @param nThreads requested number of threads; 0 means "use the default"
 * @param executor executor handed through to the superclass unchanged
 * @param args     extra arguments handed through to the superclass unchanged
 */
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
- 核心线程组构造
java
// When the caller supplied no executor, build one from the default thread factory.
// (Per the article, this executor gives each EventLoop its own dedicated thread.)
if (executor == null) {
    executor = new ThreadPerTaskExecutor(this.newDefaultThreadFactory());
}
this.children = new EventExecutor[nThreads];
// Create the child executors one by one.
for (int i = 0; i < nThreads; ++i) {
    // Set to true only after newChild(...) has returned normally for this index.
    boolean success = false;
    try {
        this.children[i] = this.newChild(executor, args);
        success = true;
    } catch (Exception e) {
        throw new IllegalStateException("failed to create a child event loop", e);
    } finally {
        // Roll back when creation of child i failed for any reason: shut down every
        // child created so far, then wait for each of them to terminate.
        // The finally block alone covers every failure path; the extra flag and the
        // duplicated rollback seen in decompiled output are compiler artifacts.
        if (!success) {
            for (int j = 0; j < i; ++j) {
                this.children[j].shutdownGracefully();
            }
            for (int j = 0; j < i; ++j) {
                EventExecutor child = this.children[j];
                try {
                    while (!child.isTerminated()) {
                        child.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                    }
                } catch (InterruptedException interrupted) {
                    // Restore the interrupt flag and stop waiting for the remaining children.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }
}
2. ServerBootstrap
启动器创建
java
// Create the bootstrap and configure it through one fluent call chain.
ServerBootstrap bootstrap = new ServerBootstrap();
// First argument becomes the parent group, second the child group.
bootstrap.group(boosGroup,workerGroup)
// Channel class to instantiate for the server side.
.channel(NioServerSocketChannel.class)
// Option stored for the server channel itself.
.option(ChannelOption.SO_BACKLOG,128)
// Option stored for the child channels.
.childOption(ChannelOption.SO_KEEPALIVE,true)
// Handler for child channels: adds a NettyServerHandler to each SocketChannel's pipeline.
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyServerHandler());
}
});
源码解析
group
添加服务器线程组
java
/**
 * Sets the parent group (through the superclass) and the child group.
 *
 * @param parentGroup group handed to {@code super.group(...)}
 * @param childGroup  group stored in {@code this.childGroup}; must not be null
 * @return this bootstrap, to allow call chaining
 * @throws IllegalStateException if a child group has already been set
 */
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
// The child group may only be assigned once.
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
return this;
}
channel(NioServerSocketChannel.class)
此时只是通过反射得到了 NioServerSocketChannel
的构造函数,还未正式创建 NioServerSocketChannel
java
/**
 * Looks up and caches the public no-argument constructor of the given channel class.
 * No channel instance is created here; only the constructor is resolved.
 *
 * @param clazz channel class to resolve the constructor for; must not be null
 * @throws IllegalArgumentException if the class has no public no-argument constructor
 */
public ReflectiveChannelFactory(Class<? extends T> clazz) {
ObjectUtil.checkNotNull(clazz, "clazz");
try {
this.constructor = clazz.getConstructor();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
" does not have a public non-arg constructor", e);
}
}
option()
option
方法传入 TCP 参数,放在一个 LinkedHashMap 中
java
/**
 * Stores or removes a channel option in the {@code options} map.
 *
 * @param option the option key; must not be null
 * @param value  the value to store, or null to remove a previously stored option
 * @return the result of {@code self()}, to allow call chaining
 */
public <T> B option(ChannelOption<T> option, T value) {
ObjectUtil.checkNotNull(option, "option");
// All access to the options map happens while holding its monitor.
synchronized (options) {
if (value == null) {
options.remove(option);
} else {
options.put(option, value);
}
}
return self();
}
childHandler()
传入 handler
,这个 handler
将会在每个客户端连接时进行调用,供 SocketChannel
使用
java
// Handler configured for child channels; written by childHandler(...) below.
private volatile ChannelHandler childHandler;
/**
 * Stores the handler to be used for child channels.
 *
 * @param childHandler the handler; must not be null
 * @return this bootstrap, to allow call chaining
 */
public ServerBootstrap childHandler(ChannelHandler childHandler) {
this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
return this;
}
3. 端口绑定
java
// Start binding to the port, then wait on the returned future via sync().
ChannelFuture f = bootstrap.bind(port).sync();
源码分析
bind()
从 bind()
入口调用 doBind()
方法
java
// AbstractBootstrap.java (bind is inherited by ServerBootstrap)
// Convenience overload: wraps the port in an InetSocketAddress and delegates.
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
// Null-checks the address and delegates to doBind.
public ChannelFuture bind(SocketAddress localAddress) {
// Validate the argument, then enter the core logic
return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}
// Core bind logic; only its first step is shown in this excerpt.
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister(); // key method
// ... (remainder of the method omitted in this excerpt)
}
initAndRegister()
initAndRegister()
是一个核心方法,它会创建 Channel
实例,初始化配置,并注册到 EventLoopGroup
中
java
// AbstractBootstrap.java
// Creates the channel, initializes it, and registers it with the configured group.
final ChannelFuture initAndRegister() {
// Step 1: create the Channel instance via the channel factory
// The factory was set up by channel(NioServerSocketChannel.class),
// so this ends up creating a NioServerSocketChannel
Channel channel = channelFactory.newChannel();
// Step 2: initialize the Channel configuration
init(channel); // extension point: the init method implemented by ServerBootstrap
// Step 3: register the channel with the EventLoopGroup
ChannelFuture regFuture = config().group().register(channel);
return regFuture;
}
init(channel)
方法细节
ServerBootstrapAcceptor 负责:
- 接收新连接(channelRead() 事件处理)
- 将新连接注册到 childGroup(即 worker 线程组)
- 为新连接应用开发者配置的 childHandler
java
// ServerBootstrap.java
// Initializes the server channel: applies options and attributes, then installs an
// initializer that adds the configured handler and the ServerBootstrapAcceptor.
void init(Channel channel) {
// Apply the configured ChannelOptions and Attributes to the channel
setChannelOptions(channel, newOptionsArray(), logger);
setAttributes(channel, newAttributesArray());
ChannelPipeline p = channel.pipeline();
// Capture the child-side configuration in final locals for use inside the anonymous classes below.
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);
final Collection<ChannelInitializerExtension> extensions = getInitializerExtensions();
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
// Add the handler from the bootstrap config first, if one was configured.
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// Add the ServerBootstrapAcceptor (which accepts new connections) from a task
// submitted to the channel's event loop instead of adding it inline
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler,
currentChildOptions, currentChildAttrs,
extensions));
}
});
}
});
}
- JDK 层
bind() 最终调用 JDK NIO 进行端口绑定
java
// NioServerSocketChannel.java
// Binds the channel returned by javaChannel() to the address, passing the backlog from the config.
protected void doBind(SocketAddress localAddress) throws Exception {
javaChannel().bind(localAddress, config.getBacklog());
}
// SingleThreadIoEventLoop.java
// Main loop: alternates between running I/O and running queued tasks until shutdown
// is confirmed or the loop can be suspended.
protected void run() {
assert inEventLoop();
// Initialize the I/O handler (described by the article as the platform-specific I/O multiplexer, e.g. a selector)
ioHandler.initialize();
do {
runIo();
// Once shutdown has begun, let the I/O handler prepare for destruction.
if (isShuttingDown()) {
ioHandler.prepareToDestroy();
}
// Process queued asynchronous tasks, passing maxTaskProcessingQuantumNs as the limit
runAllTasks(maxTaskProcessingQuantumNs);
} while (!confirmShutdown() && !canSuspend());
}