网络服务器
字数: 0 字 时长: 0 分钟
需求分析
在使用 Spring Cloud 相关组件开发微服务系统时,服务间通信使用 OpenFeign ,其底层网络传输使用 HTTP 协议。初次接触到 RPC 的概念时,我混淆地认为 RPC 和 OpenFeign 差不多(都是服务间远程调用嘛),但其实 HTTP 只是 RPC 框架网络传输地一种可选方式。
HTTP 协议中的请求、响应格式比较重,会影响网络传输性能,因此一般 RPC 框架会自定义网络传输协议。我选择使用 Netty 来自定义实现 RPC 协议。
方案设计
1. 网络传输设计
HTTP 协议头信息比较大,会影响传输性能;而且 HTTP 本身属于无状态协议,每次 HTTP 请求都是独立的,每次请求/响应都要重新建立和关闭连接,会影响性能。
因此,一般 RPC 框架会选择 TCP 协议作为网络传输协议。
2. 消息结构设计
消息结构设计要尽可能的轻量化,用最少的空间传递需要的信息,有利于提高网络传输性能。
最少的空间:就要尽可能选择更轻量的类型,比如
byte
类型就只占用一个字节,相比int
就要更清理化。需要的信息:也就是仅传递必要的信息,参照 HTTP 请求结构如下:
- 魔数:作用是安全校验,标志这个消息是否为我们自定义的 RPC 协议消息
- 版本号:保证请求和响应的版本一致性
- 序列化方式:告诉服务端和客户端如何解析数据(类似 HTTP 的
Content-Type
内容类型) - 类型:标识是请求、响应或者是心跳检测等其他用途(类似 HTTP 有请求头和响应头)
- 状态:如果是响应,记录响应的结果(类似 HTTP 的 200 状态码)
- 请求 id:唯一标识某个请求,因为 TCP 是双向通信的,需要唯一标识来追踪每个请求
- 请求体数据长度:标识请求体数据的长度,保证能完整的获取请求体数据(涉及半包/粘包问题)
- 请求体数据:实际传输的数据
代码实现
自定义协议
将协议相关类放到 protocol
包下
ProtocolMessage
协议消息结构类
ProtocolMessage
定义了消息头和消息体,消息头包含了上面提到的前 7
个部分,一共 18
个字节
/**
* 协议消息结构
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ProtocolMessage<T> {
/**
* 消息头
*/
private Header header;
/**
* 消息体
*/
private T body;
/**
* 协议消息头
*/
@Data
public static class Header {
/**
* 魔数,校验安全
*/
private short magic;
/**
* 版本号
*/
private byte version;
/**
* 序列化器
*/
private byte serializer;
/**
* 消息类型 (请求、响应)
*/
private byte type;
/**
* 状态
*/
private byte status;
/**
* 请求 id
*/
private long requestId;
/**
* 消息体长度
*/
private int bodyLength;
}
}
- 定义协议相关常量枚举类
/**
* 协议常量
*/
public interface ProtocolConstant {
/**
* 消息头长度
*/
int MESSAGE_HEADER_LENGTH = 18;
/**
* 协议魔数
*/
short PROTOCOL_MAGIC = 0x1;
/**
* 协议版本号
*/
byte PROTOCOL_VERSION = 0x1;
}
/**
* 协议消息的序列化器枚举
*/
@Getter
public enum ProtocolMessageSerializerEnum {
JDK(0,"jdk"),
JSON(1,"json"),
KRYO(2,"kryo"),
HESSIAN(3,"hessian");
private final int key;
private final String value;
ProtocolMessageSerializerEnum(int key, String value) {
this.key = key;
this.value = value;
}
/**
* 获取值列表
* @return
*/
public static List<String> getValues() {
return Arrays.stream(values())
.map(item -> item.value)
.toList();
}
public static ProtocolMessageSerializerEnum getEnumByKey(int key) {
for (ProtocolMessageSerializerEnum anEnum : ProtocolMessageSerializerEnum.values()) {
if (anEnum.key == key) {
return anEnum;
}
}
return null;
}
/**
* 根据 value 获取枚举
* @param value
* @return
*/
public static ProtocolMessageSerializerEnum getEnumByValue(String value) {
if (ObjectUtil.isEmpty(value)) {
return null;
}
for (ProtocolMessageSerializerEnum anEnum : ProtocolMessageSerializerEnum.values()) {
if (anEnum.value.equals(value)) {
return anEnum;
}
}
return null;
}
}
@Getter
public enum ProtocolMessageStatusEnum {
OK("ok",20),
BAD_REQUEST("badRequest",40),
BAD_RESPONSE("badResponse",50);
private final String text;
private final int value;
ProtocolMessageStatusEnum(String text,int value) {
this.text = text;
this.value = value;
}
/**
* 根据 value 获取枚举
* @param value
* @return
*/
public static ProtocolMessageStatusEnum getEnumByValue(int value) {
for (ProtocolMessageStatusEnum anEnum : ProtocolMessageStatusEnum.values()) {
if (anEnum.value == value) {
return anEnum;
}
}
return null;
}
}
@Getter
public enum ProtocolMessageTypeEnum {
REQUEST(0),
RESPONSE(1),
HEART_BEAT(2),
OTHERS(3);
private final int key;
ProtocolMessageTypeEnum(int key) {
this.key = key;
}
public static ProtocolMessageTypeEnum getEnumByKey(int key) {
for (ProtocolMessageTypeEnum anEnum : ProtocolMessageTypeEnum.values()) {
if (anEnum.key == key){
return anEnum;
}
}
return null;
}
}
请求/响应 对象
RpcRequest
请求对象
Rpc 请求对象封装了服务名、方法名、参数类型列表、参数列表、请求id 等信息
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RpcRequest implements Serializable {
/**
* 服务名称
*/
private String serviceName;
/**
* 方法名称
*/
private String methodName;
/**
* 服务版本
*/
private String serviceVersion = RpcConstant.DEFAULT_SERVICE_VERSION;
/**
* 参数类型列表
*/
private Class<?>[] parameterTypes;
/**
* 参数列表
*/
private Object[] args;
/**
* 请求 Id
*/
private long requestId;
}
RpcResponse
响应对象
响应对象封装了返回值、异常信息、请求id 等信息
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RpcResponse implements Serializable {
/**
* 响应数据
*/
private Object data;
/**
* 响应数据类型 (预留)
*/
private Class<?> dataType;
/**
* 响应信息
*/
private String message;
/**
* 异常信息
*/
private Exception exception;
private Long requestId;
}
编码器/解码器
消费者构建请求对象后,需要把请求头构建为协议格式,再经过网络传输到服务端,服务端经过协议解析,获取到请求体对象,再进行处理
这个过程需要根据我们的协议定义编码器和解码器
ProtocolMessageEncoder
编码器
编码器主要根据协议请求头中的序列化方式,将传过来的协议对象序列化为字节流通过网络发送出去
public class ProtocolMessageEncoder extends MessageToByteEncoder<ProtocolMessage<?>> {
@Override
protected void encode(ChannelHandlerContext ctx,
ProtocolMessage msg, ByteBuf out) throws Exception {
// 根据协议头获取序列化方式
ProtocolMessage.Header header = msg.getHeader();
ProtocolMessageSerializerEnum serializerEnum =
ProtocolMessageSerializerEnum.getEnumByKey(header.getSerializer());
if (serializerEnum == null) {
throw new RuntimeException("序列化协议不存在");
}
Serializer serializer = SerializerFactory.getInstance(serializerEnum.getValue());
byte[] bodyBytes = serializer.serialize(msg.getBody());
// 回填实际 body 长度 (必须完成序列化后才能计算)
header.setBodyLength(bodyBytes.length);
// 协议头写入
out.writeShort(header.getMagic()) //2字节
.writeByte(header.getVersion()) //1字节
.writeByte(header.getSerializer()) //1字节
.writeByte(header.getType()) //1字节
.writeByte(header.getStatus()) //1字节
.writeLong(header.getRequestId()) //8字节
.writeInt(header.getBodyLength()); //4字节
// 协议体写入
out.writeBytes(bodyBytes);
}
}
ProtocolMessageDecoder
解码器
解码器主要是根据协议规范解析出业务数据(请求体内容),传递给下游处理器进行处理
public class ProtocolMessageDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 此处不再需要处理半包、粘包的问题,由 LengthFieldBasedFrameDecoder 保证完整性
// 读取协议头部信息
ProtocolMessage.Header header = new ProtocolMessage.Header();
header.setMagic(in.readShort());
header.setVersion(in.readByte());
header.setSerializer(in.readByte());
header.setType(in.readByte());
header.setStatus(in.readByte());
header.setRequestId(in.readLong());
header.setBodyLength(in.readInt());
// 读取协议体
byte[] bodyBytes = new byte[header.getBodyLength()];
in.readBytes(bodyBytes);
//反序列化业务数据
ProtocolMessageSerializerEnum serializerEnum =
ProtocolMessageSerializerEnum.getEnumByKey(header.getSerializer());
if (serializerEnum == null) {
throw new RuntimeException("序列化消息的协议不存在");
}
Serializer serializer = SerializerFactory.getInstance(serializerEnum.getValue());
ProtocolMessageTypeEnum messageTypeEnum = ProtocolMessageTypeEnum.getEnumByKey(header.getType());
//选择响应方式
switch (Objects.requireNonNull(messageTypeEnum)) {
case REQUEST -> {
RpcRequest request = serializer.deserialize(bodyBytes, RpcRequest.class);
out.add(new ProtocolMessage<>(header, request));
}
case RESPONSE -> {
RpcResponse response = serializer.deserialize(bodyBytes, RpcResponse.class);
out.add(new ProtocolMessage<>(header, response));
}
case HEART_BEAT -> {
out.add(new ProtocolMessage<>(header,new byte[0]));
}
default -> throw new RuntimeException("无效的消息类型");
}
}
}
请求处理器
Netty 中的请求处理器是入站处理器中的一种,解码器解析出业务数据后会传递给请求处理器进行处理。比如:
- 如果是心跳请求,则直接返回响应即可(表示正常存活)
- 如果是业务请求则根据请求体信息从本地注册器中找到具体的服务实例进行处理后响应处理结果
@Slf4j
public class NettyTcpServerHandler extends ChannelInboundHandlerAdapter {
public NettyTcpServerHandler() {
System.out.println("NettyServerHandler 初始化完毕");
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("服务端连接建立成功");
}
@SuppressWarnings("unchecked")
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (!(msg instanceof ProtocolMessage<?> protocolMessage)) {
log.info("收到未知协议消息 {}",msg.getClass());
ReferenceCountUtil.release(msg);
return;
}
byte type = protocolMessage.getHeader().getType();
if (ProtocolMessageTypeEnum.HEART_BEAT.getKey() == type) {
log.info("服务端收到心跳包");
handleHeartbeat(ctx, protocolMessage);
}
if (ProtocolMessageTypeEnum.REQUEST.getKey() == type) {
handleRequest(ctx,(ProtocolMessage<RpcRequest>) msg);
}
}
private void handleRequest(ChannelHandlerContext ctx, ProtocolMessage<RpcRequest> msg) throws Exception{
RpcRequest rpcRequest = msg.getBody();
ProtocolMessage.Header requestHeader = msg.getHeader();
//构建响应信息协议头
ProtocolMessage.Header header = new ProtocolMessage.Header();
header.setMagic(requestHeader.getMagic());
header.setVersion(requestHeader.getVersion());
header.setSerializer(requestHeader.getSerializer());
header.setType((byte) ProtocolMessageTypeEnum.RESPONSE.getKey());
header.setStatus(requestHeader.getStatus());
header.setRequestId(requestHeader.getRequestId());
if (rpcRequest == null) {
log.info("rpcRequest 为空");
ReferenceCountUtil.release(msg);
}else {
Class<?> implClass = LocalRegistry.get(rpcRequest.getServiceName());
Method method = implClass.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());
Object result = method.invoke(implClass.getDeclaredConstructor().newInstance(), rpcRequest.getArgs());
RpcResponse rpcResponse = new RpcResponse();
rpcResponse.setData(result);
rpcResponse.setDataType(method.getReturnType());
rpcResponse.setMessage("success");
rpcResponse.setRequestId(header.getRequestId());
ctx.channel().writeAndFlush(new ProtocolMessage<>(header, rpcResponse));
}
}
/**
* 响应心跳包 (无需构建 body)
*/
private void handleHeartbeat(ChannelHandlerContext ctx, ProtocolMessage<?> msg) throws Exception{
//构建响应信息协议头
ProtocolMessage.Header header = new ProtocolMessage.Header();
header.setMagic(ProtocolConstant.PROTOCOL_MAGIC);
header.setVersion(ProtocolConstant.PROTOCOL_VERSION);
// 序列化器和接收心跳包保证一致即可
header.setSerializer(msg.getHeader().getSerializer());
header.setType((byte) ProtocolMessageTypeEnum.HEART_BEAT.getKey());
header.setStatus((byte) ProtocolMessageStatusEnum.OK.getValue());
header.setRequestId(msg.getHeader().getRequestId());
ctx.channel().writeAndFlush(new ProtocolMessage<>(header, new byte[0]));
}
}
Netty 服务器
定义好了编码器、解码器、请求处理器后就可以创建 Netty 服务器了,注意需要使用 LengthFieldBasedFrameDecoder
解决粘包拆包问题
什么是粘包拆包问题
理想情况下,我们客户端发送多次数据,每次的数据都为 Hello,Server!Hello,Server!,Hello,Server!
但服务端收到的数据可能多了(粘包);也可能少了(拆包):
Netty 提供了LengthFieldBasedFrameDecoder
判断消息长度符合协议头中的数据体长度时才读取,避免了粘包拆包问题:
public class NettyTcpServer implements HttpServer {
@Override
public void doStart(int port) {
MultiThreadIoEventLoopGroup bossGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());
MultiThreadIoEventLoopGroup workGroup = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// ===== 入站处理器,执行顺序从上而下 =====
// 1. 解决粘包拆包问题
pipeline.addLast(new LengthFieldBasedFrameDecoder(
Integer.MAX_VALUE,
14,
4
));
// 2. 协议解码
pipeline.addLast(new ProtocolMessageDecoder());
// 3. 业务处理
pipeline.addLast(new NettyTcpServerHandler());
// ===== 出站处理器(执行顺序从下到上) =====
// 协议编码
pipeline.addLast(new ProtocolMessageEncoder());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture bindFuture = serverBootstrap.bind(port).sync();
System.out.println("Netty TCP 服务器已启动,正在监听 " + port + "端口");
bindFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
Netty 客户端
Netty 客户端代码比较多,主要是包含了业务处理和心跳检测部分代码,具体如下:
public class NettyTcpClient {
// 读取超时时间(毫秒)
private static final int READ_TIMEOUT_MS = 3000;
// 心跳间隔 (秒)
private static final int HEARTBEAT_INTERVAL = 30;
// Netty 工作线程组,处理 IO 事件
private static final EventLoopGroup group = new MultiThreadIoEventLoopGroup(NioIoHandler.newFactory());
// 连接池
private static final Map<InetSocketAddress,Channel> channelMap = new ConcurrentHashMap<>();
// 待处理的请求
private static final Map<Long,CompletableFuture<RpcResponse>> pendingMap =
new ConcurrentHashMap<>();
// 心跳处理器 (检测心跳包,自动发送心跳包)
private static class HeartbeatHandler extends ChannelDuplexHandler {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent event &&
event.state() == IdleState.WRITER_IDLE) {
// 构建 ProtocolMessage 心跳包,不需要 body
ProtocolMessage.Header header = new ProtocolMessage.Header();
header.setMagic(ProtocolConstant.PROTOCOL_MAGIC);
header.setVersion(ProtocolConstant.PROTOCOL_VERSION);
header.setSerializer((byte) Objects.requireNonNull(ProtocolMessageSerializerEnum.getEnumByValue(
RpcApplication.getRpcConfig().getSerializer())).getKey());
header.setType((byte) ProtocolMessageTypeEnum.HEART_BEAT.getKey());
header.setStatus((byte) ProtocolMessageStatusEnum.OK.getValue());
header.setRequestId(IdUtil.getSnowflakeNextId());
ProtocolMessage<Object> protocolMessage = new ProtocolMessage<>(header, null);
ctx.channel().writeAndFlush(protocolMessage)
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
}
}
private static Channel getChannel(InetSocketAddress address) throws Exception {
return channelMap.compute(address,(key,existingChannel) -> {
if (existingChannel != null && existingChannel.isActive()) {
return existingChannel; //复用已有连接
}
try {
Bootstrap bootstrap = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//心跳检测
pipeline.addLast(new IdleStateHandler(
0,HEARTBEAT_INTERVAL,0,TimeUnit.SECONDS));
pipeline.addLast(new HeartbeatHandler());
// ===== 出站处理器链 =====
// 编码器
pipeline.addLast(new ProtocolMessageEncoder());
// ===== 入站处理器链 =====
// 解决粘包拆包问题
pipeline.addLast(new LengthFieldBasedFrameDecoder(
Integer.MAX_VALUE,
14,
4
));
// 解码器
pipeline.addLast(new ProtocolMessageDecoder());
// 业务处理
pipeline.addLast(new ResponseHandler());
}
});
Channel channel = bootstrap.connect(address).sync().channel();
//监听连接关闭事件,自动清理失效 Channel
channel.closeFuture().addListener(future -> {
channelMap.remove(address, channel);
});
return channel;
}catch (Exception e) {
throw new IllegalStateException("netty 连接失败: " + address,e);
}
});
}
/**
* 响应处理器 (处理服务端返回的 RpcResponse)
*/
private static class ResponseHandler extends SimpleChannelInboundHandler<ProtocolMessage<?>> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ProtocolMessage protocolMessage) throws Exception {
if ((byte) ProtocolMessageTypeEnum.RESPONSE.getKey() ==
protocolMessage.getHeader().getType()) {
//如果是服务端响应对象
RpcResponse response = (RpcResponse) protocolMessage.getBody();
CompletableFuture<RpcResponse> future = pendingMap.remove(response.getRequestId());
if (future != null) {
// 正常完成,触发 Future 回调
future.complete(response);
}
}else {
System.out.println("不是正常响应对象");
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
// 异常通知所有 pending 请求
pendingMap.values().forEach(f -> f.completeExceptionally(cause));
pendingMap.clear();
}
}
/**
* 发送 Rpc 请求
* @param address 目标地址
* @param request Rpc 请求
*/
public static CompletableFuture<RpcResponse> sendRequest(InetSocketAddress address,RpcRequest request) {
// 构建协议头
ProtocolMessage.Header header = new ProtocolMessage.Header();
header.setMagic(ProtocolConstant.PROTOCOL_MAGIC);
header.setVersion(ProtocolConstant.PROTOCOL_VERSION);
header.setSerializer((byte) Objects.requireNonNull(ProtocolMessageSerializerEnum.getEnumByValue(
RpcApplication.getRpcConfig().getSerializer())).getKey());
header.setType((byte) ProtocolMessageTypeEnum.REQUEST.getKey());
header.setStatus((byte) ProtocolMessageStatusEnum.OK.getValue());
header.setRequestId(request.getRequestId());
//构造 ProtocolMessage
ProtocolMessage<RpcRequest> protocolMessage = new ProtocolMessage<>(header,request);
CompletableFuture<RpcResponse> future = new CompletableFuture<>();
pendingMap.put(request.getRequestId(),future);
// 在 Netty IO 线程中异步获取 Channel 并发送
group.next().execute(() -> {
try {
Channel channel = getChannel(address);
// 发送请求并监听结果
ChannelFuture writeFuture = channel.writeAndFlush(protocolMessage);
writeFuture.addListener((ChannelFutureListener)f -> {
if (!f.isSuccess()) {
future.completeExceptionally(f.cause());
pendingMap.remove(request.getRequestId());
}
});
//全局超时兜底
group.schedule(() -> {
if (!future.isDone()) {
future.completeExceptionally(new TimeoutException("netty 请求超时"));
pendingMap.remove(request.getRequestId());
}
},READ_TIMEOUT_MS , TimeUnit.MILLISECONDS);
}catch (Exception e) {
future.completeExceptionally(e);
pendingMap.remove(request.getRequestId());
}
});
return future;
}
}