Skip to content

Netty 源码分析

Netty 服务启动过程

Netty 服务基础代码实现

java
/**
 * Starts a minimal Netty TCP server on port 7668 and blocks until the server channel closes.
 *
 * <p>Both event loop groups are shut down in a {@code finally} block so that their
 * threads do not keep the JVM alive once the server channel has closed or startup has failed.
 *
 * @param args command-line arguments (unused)
 * @throws InterruptedException if the calling thread is interrupted while waiting for the
 *                              bind or for the server channel to close
 */
public static void main(String[] args) throws InterruptedException {
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();

    try {
        ServerBootstrap bootstrap = new ServerBootstrap();

        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 128)
                .childOption(ChannelOption.SO_KEEPALIVE, true)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // Every accepted connection gets its own handler instance.
                        socketChannel.pipeline().addLast(new NettyServerHandler());
                    }
                });

        // Wait for the bind to complete, then block until the server channel is closed.
        ChannelFuture cf = bootstrap.bind(7668).sync();
        cf.channel().closeFuture().sync();
    } finally {
        // Release the event loop threads on every exit path.
        workerGroup.shutdownGracefully();
        bossGroup.shutdownGracefully();
    }
}

1. NioEventLoopGroup 创建

java
// Legacy way of creating the event loop groups
EventLoopGroup boosGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
// As of 4.2.0 the approach above is deprecated; use the form below instead
EventLoopGroup bossGroup = new MultiThreadIoEventLoopGroup(
        NioIoHandler.newFactory()
);

源码解析

  • 线程数设置

如果没有显式指定线程数(即 nThreads 为 0),则使用默认值:优先读取系统属性 io.netty.eventLoopThreads,未设置该属性时为 系统可用的 CPU 线程数 * 2,且最小为 1

java
// Computes the default thread count once, when the class is initialized:
// the "io.netty.eventLoopThreads" system property is read with
// availableProcessors() * 2 as its fallback, and Math.max clamps the result to at least 1.
static {
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
            "io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
}

/**
 * Forwards to the superclass constructor, substituting DEFAULT_EVENT_LOOP_THREADS
 * for the thread count when the caller passes 0.
 *
 * @param nThreads requested number of threads; 0 selects the default
 * @param executor passed through unchanged to the superclass
 * @param args     passed through unchanged to the superclass
 */
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
  • 核心线程组构造
java
// When no executor is supplied, fall back to a ThreadPerTaskExecutor built on the
// default thread factory, so that each EventLoop gets its own dedicated thread.
if (executor == null) {
    executor = new ThreadPerTaskExecutor(this.newDefaultThreadFactory());
}

this.children = new EventExecutor[nThreads];

// Create the child executors one at a time.
for (int i = 0; i < nThreads; ++i) {
    // Becomes true only after children[i] has been created.
    boolean success = false;

    try {
        this.children[i] = this.newChild(executor, args);
        success = true;
    } catch (Exception e) {
        throw new IllegalStateException("failed to create a child event loop", e);
    } finally {
        // Rollback: if creating child i failed (by exception or by any other Throwable),
        // shut down the children [0, i) that were already created. This runs before the
        // failure propagates to the caller.
        if (!success) {
            for (int j = 0; j < i; ++j) {
                this.children[j].shutdownGracefully();
            }

            // Wait for each previously created child to terminate.
            for (int j = 0; j < i; ++j) {
                EventExecutor created = this.children[j];

                try {
                    while (!created.isTerminated()) {
                        created.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                    }
                } catch (InterruptedException interrupted) {
                    // Restore the interrupt flag and stop waiting for the remaining children.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }
}

2. ServerBootstrap 启动器创建

java
 ServerBootstrap bootstrap = new ServerBootstrap();

// Configure the bootstrap step by step: event loop groups, server channel type,
// an option for the server channel, an option for accepted child channels,
// and the initializer that sets up each accepted SocketChannel's pipeline.
bootstrap.group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 128)
        .childOption(ChannelOption.SO_KEEPALIVE, true)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline().addLast(new NettyServerHandler());
            }
        });

源码解析

  • group 添加服务器线程组
java
/**
 * Sets the parent and child event loop groups of this bootstrap.
 *
 * <p>The parent group is handed to {@code super.group(...)}; the child group is
 * stored in {@code this.childGroup} and may only be set once.
 *
 * @param parentGroup group passed on to the superclass
 * @param childGroup  group stored as the child group; validated by ObjectUtil.checkNotNull
 * @return this bootstrap, to allow call chaining
 * @throws IllegalStateException if a child group has already been set
 */
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    super.group(parentGroup);
    if (this.childGroup != null) {
        throw new IllegalStateException("childGroup set already");
    }
    this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup");
    return this;
}
  • channel(NioServerSocketChannel.class)

此时只是通过反射得到了 NioServerSocketChannel 的构造函数,还未正式创建 NioServerSocketChannel

java
/**
 * Looks up the public no-argument constructor of {@code clazz} and keeps it in
 * {@code this.constructor}. No channel instance is created here.
 *
 * @param clazz the channel class; validated by ObjectUtil.checkNotNull
 * @throws IllegalArgumentException if {@code clazz} has no public no-argument constructor
 */
public ReflectiveChannelFactory(Class<? extends T> clazz) {
    ObjectUtil.checkNotNull(clazz, "clazz");
    try {
        this.constructor = clazz.getConstructor();
    } catch (NoSuchMethodException e) {
        throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
                " does not have a public non-arg constructor", e);
    }
}
  • option()

option 方法传入 TCP 参数,放在一个 LinkedHashMap 中

java
/**
 * Stores or removes a channel option in the {@code options} map.
 *
 * @param option the option key; validated by ObjectUtil.checkNotNull
 * @param value  the value to store, or {@code null} to remove the option
 * @return the result of {@code self()}, to allow call chaining
 */
public <T> B option(ChannelOption<T> option, T value) {
    ObjectUtil.checkNotNull(option, "option");
    // All access to the options map in this method is guarded by its monitor.
    synchronized (options) {
        if (value == null) {
            // A null value means "unset this option".
            options.remove(option);
        } else {
            options.put(option, value);
        }
    }
    return self();
}
  • childHandler()

传入 handler ,这个 handler 将会在每个客户端连接时进行调用,供 SocketChannel 使用

java
// Handler configured through childHandler(...); declared volatile.
private volatile ChannelHandler childHandler;

/**
 * Stores the handler in the {@code childHandler} field.
 *
 * @param childHandler the handler to store; validated by ObjectUtil.checkNotNull
 * @return this bootstrap, to allow call chaining
 */
public ServerBootstrap childHandler(ChannelHandler childHandler) {
    this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler");
    return this;
}

3. 端口绑定

java
// Start binding to the port and wait, via sync(), for the returned future to complete
ChannelFuture f = bootstrap.bind(port).sync();

源码分析

  • bind()

bind() 入口调用 doBind() 方法

java
// AbstractBootstrap.java (ServerBootstrap inherits these bind methods)
// Convenience overload: wraps the port in an InetSocketAddress and delegates.
public ChannelFuture bind(int inetPort) {
    return bind(new InetSocketAddress(inetPort));
}

// Rejects a null address via ObjectUtil.checkNotNull, then delegates to doBind.
public ChannelFuture bind(SocketAddress localAddress) {
    // After the argument check, enter the core logic
    return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress"));
}

// Abridged excerpt: only the first step of doBind is shown here.
private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister(); // 🌟 key method
    // ...
}
  • initAndRegister()

initAndRegister() 是一个核心方法,它会创建 Channel 实例,初始化配置,并注册到 EventLoopGroup

java
// AbstractBootstrap.java
// Creates the channel, initializes it, registers it with the event loop group,
// and returns the future of that registration.
final ChannelFuture initAndRegister() {
    // Step 1: create the Channel instance via reflection
    // channelFactory is the factory obtained from channel(NioServerSocketChannel.class),
    // so this ends up creating a NioServerSocketChannel
    Channel channel = channelFactory.newChannel(); 

    // Step 2: initialize the Channel configuration
    init(channel); // 🌟 extension point: implemented by ServerBootstrap's init method

    // Step 3: register with the EventLoopGroup
    ChannelFuture regFuture = config().group().register(channel);
    return regFuture;
}
  • init(channel) 方法细节

ServerBootstrapAcceptor 负责:

  • 接收新连接(channelRead() 事件处理)
  • 将新连接注册到 childGroup(即 worker 线程组)
  • 应用开发者配置的 childHandler
java
// ServerBootstrap.java
// Applies options and attributes to the channel and adds an initializer to its pipeline;
// that initializer adds the configured handler (if any) and then a ServerBootstrapAcceptor.
void init(Channel channel) {
    // Apply the configured ChannelOptions and Attributes
    setChannelOptions(channel, newOptionsArray(), logger);
    setAttributes(channel, newAttributesArray());

    ChannelPipeline p = channel.pipeline();

    // Copy the child-side configuration into final locals so that the anonymous
    // classes below can capture it.
    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);
    final Collection<ChannelInitializerExtension> extensions = getInitializerExtensions();

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) {
            final ChannelPipeline pipeline = ch.pipeline();
            // Add the handler returned by config.handler(), if one was configured.
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }
            // Install the ServerBootstrapAcceptor (which accepts new connections)
            // asynchronously, from a task submitted to the channel's event loop
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, 
                            currentChildOptions, currentChildAttrs,
                            extensions));
                }
            });
        }
    });
}
  • JDK 层 bind()

最终调用 JDK NIO 进行端口绑定

java
// NioServerSocketChannel.java
// Binds the channel returned by javaChannel() to the given address, using the
// backlog value read from the channel config.
protected void doBind(SocketAddress localAddress) throws Exception {
    javaChannel().bind(localAddress, config.getBacklog());
}

// SingleThreadIoEventLoop.java
// Main loop: alternates between an I/O pass and a task pass until shutdown is
// confirmed or the loop can be suspended.
protected void run() {
    assert inEventLoop();
    // Initialize the IoHandler (the platform-specific I/O multiplexer, e.g. a selector for NIO)
    ioHandler.initialize();
    do {
        runIo();
        if (isShuttingDown()) {
            ioHandler.prepareToDestroy();
        }
        // Process queued asynchronous tasks; maxTaskProcessingQuantumNs is presumably
        // the time budget in nanoseconds for this pass
        runAllTasks(maxTaskProcessingQuantumNs);
        
    } while (!confirmShutdown() && !canSuspend());
}

服务器启动时序图.png

Netty 接收请求