Skip to content

请求代理

字数: 0 字 时长: 0 分钟

请求代理

前面我们完成了所有准备工作,接下来就是创建代理拦截消费者本地方法调用,构建请求对象,请求服务提供者,将整个流程串起来。

创建一个服务代理类 ServiceProxy,主要封装了服务调用逻辑:

  1. 创建代理服务拦截本地服务调用
  2. 封装请求对象
  3. 查询注册中心,获取远程服务提供者信息
  4. 根据负载均衡策略,选择一个远程服务提供者实例发送请求
java
public class ServiceProxy  implements InvocationHandler {

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

        //服务名
        String serviceName = method.getDeclaringClass().getName();

        RpcRequest rpcRequest = RpcRequest.builder()
                .requestId(IdUtil.getSnowflakeNextId())
                .serviceName(serviceName)
                .methodName(method.getName())
                .parameterTypes(method.getParameterTypes())
                .args(args)
                .build();

        // 从注册中心缓存中获取服务提供者请求地址
        List<ServiceMetaInfo> metaInfos = RegistryServiceCache.getInstance().readCache(serviceName);

        if (metaInfos.isEmpty()) {
            throw new RuntimeException("服务实例不存在 : " + serviceName);
        }

        //同一服务可能包含多个实例,需要选择负载均衡策略
        LoadBalancer loadBalancer = LoadBalancerFactory.getInstance(RpcApplication.getRpcConfig().getLoadBalancer());
        // 将调用方法名(请求路径)作为负载均衡参数
        Map<String,Object> requestParams = new HashMap<>();
        requestParams.put("methodName",method.getName());
        ServiceMetaInfo serviceMetaInfo = loadBalancer.select(requestParams, metaInfos);
        System.out.println("调用节点:" + serviceMetaInfo.getNodeKey());

        // rpc 请求
        InetSocketAddress address =
                new InetSocketAddress(serviceMetaInfo.getServiceHost(), serviceMetaInfo.getServicePort());

        CompletableFuture<RpcResponse> future = NettyTcpClient.sendRequest(address, rpcRequest);

        try {
            RpcResponse rpcResponse = future.get();
            return rpcResponse.getData();
        }catch (Exception e) {
            // todo 这里也可以扩展重试策略,我这里简单地直接重试一次
            // 进行一次重试
            System.out.println("开始重试");
            try {
                // 尝试重新访问并返回结果
                CompletableFuture<RpcResponse> retryFuture = NettyTcpClient.sendRequest(address, rpcRequest);
                return retryFuture.get().getData();
            }catch (Exception ex) {
                System.out.println("重试异常,容错响应");
                // todo 这里可以自定义自己的容错方式,我这里直接将异常信息抛给客户端
                throw new RuntimeException("调用失败",ex);
            }
        }
    }

}

负载均衡

和序列化器类似,内置多种负载均衡算法,并且支持 SPI 扩展

balance.webp

这里以轮询负载均衡为例:

java
/**
 * 轮询负载均衡器
 */
public class RoundRobinLoadBalancer implements LoadBalancer {

    /**
     * 当前轮询的下标
     */
    private final AtomicInteger currentIndex = new AtomicInteger(0);

    @Override
    public ServiceMetaInfo select(Map<String, Object> requestParams,
                                  List<ServiceMetaInfo> serviceMetaInfoList) {
        if (serviceMetaInfoList.isEmpty()) {
            return null;
        }

        //只有一个服务提供者实例时,直接返回
        int size = serviceMetaInfoList.size();
        if (size == 1) {
            return serviceMetaInfoList.getFirst();
        }

        //取模算法轮询
        int index = currentIndex.getAndIncrement() % size;
        return serviceMetaInfoList.get(index);
    }

}

重试与容错

代理发送请求给远程服务提供者,如果出现网络异常导致响应超时,那么应该支持自动的重试机制;如果远程服务提供者响应异常,那么应该支持容错机制。

在我的代码中,我象征超时自动重试一次,就没有具体实现重试和容错机制了(又是很大的工作量,意思明白即可)

java
try {
    RpcResponse rpcResponse = future.get();
    return rpcResponse.getData();
}catch (Exception e) {
    // todo 这里也可以扩展重试策略,我这里简单地直接重试一次
    // 进行一次重试
    System.out.println("开始重试");
    try {
        // 尝试重新访问并返回结果
        CompletableFuture<RpcResponse> retryFuture = NettyTcpClient.sendRequest(address, rpcRequest);
        return retryFuture.get().getData();
    }catch (Exception ex) {
        System.out.println("重试异常,容错响应");
        // todo 这里可以自定义自己的容错方式,我这里直接将异常信息抛给客户端
        throw new RuntimeException("调用失败",ex);
    }
}

服务提供者启动类

服务提供者启动类主要逻辑:

  1. 初始化 RPC 框架配置
  2. 注册服务信息到注册中心
  3. 注册服务实例到本地注册器
  4. 启动 Netty 服务端
java
/**
 * 服务提供者启动类
 */
public class ProviderBootstrap {

    public static void init(List<ServiceRegisterInfo<?>> serviceRegisterInfoList) {
        // RPC 框架初始化 (配置和注册中心)
        RpcApplication.init();

        // 全局配置
        final RpcConfig rpcConfig = RpcApplication.getRpcConfig();

        // 注册服务
        for (ServiceRegisterInfo<?> serviceRegisterInfo : serviceRegisterInfoList) {
            String serviceName = serviceRegisterInfo.getServiceName();

            LocalRegistry.register(serviceName,serviceRegisterInfo.getImplClass());

            RegistryConfig registryConfig = rpcConfig.getRegistryConfig();
            Registry registry = RegistryFactory.getInstance(registryConfig.getRegistry());
            ServiceMetaInfo serviceMetaInfo = ServiceMetaInfo.builder()
                    .serviceName(serviceName)
                    .serviceHost(rpcConfig.getServerHost())
                    .servicePort(rpcConfig.getServerPort())
                    .build();

            try {
                registry.register(serviceMetaInfo);
            }catch (Exception e) {
                throw new RuntimeException(e);
            }

            //启动 RPC 服务器
            NettyTcpServer nettyTcpServer = new NettyTcpServer();
            nettyTcpServer.doStart(rpcConfig.getServerPort());
        }
        
    }
    
}

服务消费者启动类

服务消费者启动类不涉及服务注册,只需要初始化配置即可(全局配置和注册中心连接信息)

java
public class ConsumerBootstrap {

    public static void init() {
        // RPC 框架初始化 (配置和注册中心)
        RpcApplication.init();
    }

}

测试

  • 消费者

只需要配置 application.properties 和创建 ConsumerExample 类即可

test1.webp

启动消费者测试类

5
java
public class ConsumerExample {

    public static void main(String[] args) {
        ConsumerBootstrap.init();
        UserService userService = ServiceProxyFactory.getProxy(UserService.class);
        User user = new User();
        user.setName("甜甜");

        //调用
        User newUser = userService.getUser(user);
        if (newUser != null) {
            System.out.println("远程调用结果: " + newUser.getName());
        }else {
            System.out.println("获取用户失败");
        }

    }
    
}
  • 提供者

服务提供者需要提供 UserServiceImpl 实现类

test2.webp

启动提供者测试类

java
public class ProviderExample {

    public static void main(String[] args) {

        // 注册服务
        List<ServiceRegisterInfo<?>> serviceRegisterInfoList = new ArrayList<>();
        ServiceRegisterInfo<?> serviceRegisterInfo =
                new ServiceRegisterInfo<>(UserService.class.getName(),UserServiceImpl.class);

        serviceRegisterInfoList.add(serviceRegisterInfo);

        // 服务提供者初始化
        ProviderBootstrap.init(serviceRegisterInfoList);
    }

}
  • 测试结果:远程调用成功

test3.webp