请求代理
字数: 0 字 时长: 0 分钟
请求代理
前面我们完成了所有准备工作,接下来就是创建代理拦截消费者本地方法调用,构建请求对象,请求服务提供者,将整个流程串起来。
创建一个服务代理类 ServiceProxy
,主要封装了服务调用逻辑:
- 创建代理服务拦截本地服务调用
- 封装请求对象
- 查询注册中心,获取远程服务提供者信息
- 根据负载均衡策略,选择一个远程服务提供者实例发送请求
java
public class ServiceProxy implements InvocationHandler {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//服务名
String serviceName = method.getDeclaringClass().getName();
RpcRequest rpcRequest = RpcRequest.builder()
.requestId(IdUtil.getSnowflakeNextId())
.serviceName(serviceName)
.methodName(method.getName())
.parameterTypes(method.getParameterTypes())
.args(args)
.build();
// 从注册中心缓存中获取服务提供者请求地址
List<ServiceMetaInfo> metaInfos = RegistryServiceCache.getInstance().readCache(serviceName);
if (metaInfos.isEmpty()) {
throw new RuntimeException("服务实例不存在 : " + serviceName);
}
//同一服务可能包含多个实例,需要选择负载均衡策略
LoadBalancer loadBalancer = LoadBalancerFactory.getInstance(RpcApplication.getRpcConfig().getLoadBalancer());
// 将调用方法名(请求路径)作为负载均衡参数
Map<String,Object> requestParams = new HashMap<>();
requestParams.put("methodName",method.getName());
ServiceMetaInfo serviceMetaInfo = loadBalancer.select(requestParams, metaInfos);
System.out.println("调用节点:" + serviceMetaInfo.getNodeKey());
// rpc 请求
InetSocketAddress address =
new InetSocketAddress(serviceMetaInfo.getServiceHost(), serviceMetaInfo.getServicePort());
CompletableFuture<RpcResponse> future = NettyTcpClient.sendRequest(address, rpcRequest);
try {
RpcResponse rpcResponse = future.get();
return rpcResponse.getData();
}catch (Exception e) {
// todo 这里也可以扩展重试策略,我这里简单地直接重试一次
// 进行一次重试
System.out.println("开始重试");
try {
// 尝试重新访问并返回结果
CompletableFuture<RpcResponse> retryFuture = NettyTcpClient.sendRequest(address, rpcRequest);
return retryFuture.get().getData();
}catch (Exception ex) {
System.out.println("重试异常,容错响应");
// todo 这里可以自定义自己的容错方式,我这里直接将异常信息抛给客户端
throw new RuntimeException("调用失败",ex);
}
}
}
}
负载均衡
和序列化器类似,内置多种负载均衡算法,并且支持 SPI 扩展
这里以轮询负载均衡为例:
java
/**
* 轮询负载均衡器
*/
public class RoundRobinLoadBalancer implements LoadBalancer {
/**
* 当前轮询的下标
*/
private final AtomicInteger currentIndex = new AtomicInteger(0);
@Override
public ServiceMetaInfo select(Map<String, Object> requestParams,
List<ServiceMetaInfo> serviceMetaInfoList) {
if (serviceMetaInfoList.isEmpty()) {
return null;
}
//只有一个服务提供者实例时,直接返回
int size = serviceMetaInfoList.size();
if (size == 1) {
return serviceMetaInfoList.getFirst();
}
//取模算法轮询
int index = currentIndex.getAndIncrement() % size;
return serviceMetaInfoList.get(index);
}
}
重试与容错
代理发送请求给远程服务提供者,如果出现网络异常导致响应超时,那么应该支持自动的重试机制;如果远程服务提供者响应异常,那么应该支持容错机制。
在我的代码中,我象征超时自动重试一次,就没有具体实现重试和容错机制了(又是很大的工作量,意思明白即可)
java
try {
RpcResponse rpcResponse = future.get();
return rpcResponse.getData();
}catch (Exception e) {
// todo 这里也可以扩展重试策略,我这里简单地直接重试一次
// 进行一次重试
System.out.println("开始重试");
try {
// 尝试重新访问并返回结果
CompletableFuture<RpcResponse> retryFuture = NettyTcpClient.sendRequest(address, rpcRequest);
return retryFuture.get().getData();
}catch (Exception ex) {
System.out.println("重试异常,容错响应");
// todo 这里可以自定义自己的容错方式,我这里直接将异常信息抛给客户端
throw new RuntimeException("调用失败",ex);
}
}
服务提供者启动类
服务提供者启动类主要逻辑:
- 初始化 RPC 框架配置
- 注册服务信息到注册中心
- 注册服务实例到本地注册器
- 启动 Netty 服务端
java
/**
* 服务提供者启动类
*/
public class ProviderBootstrap {
public static void init(List<ServiceRegisterInfo<?>> serviceRegisterInfoList) {
// RPC 框架初始化 (配置和注册中心)
RpcApplication.init();
// 全局配置
final RpcConfig rpcConfig = RpcApplication.getRpcConfig();
// 注册服务
for (ServiceRegisterInfo<?> serviceRegisterInfo : serviceRegisterInfoList) {
String serviceName = serviceRegisterInfo.getServiceName();
LocalRegistry.register(serviceName,serviceRegisterInfo.getImplClass());
RegistryConfig registryConfig = rpcConfig.getRegistryConfig();
Registry registry = RegistryFactory.getInstance(registryConfig.getRegistry());
ServiceMetaInfo serviceMetaInfo = ServiceMetaInfo.builder()
.serviceName(serviceName)
.serviceHost(rpcConfig.getServerHost())
.servicePort(rpcConfig.getServerPort())
.build();
try {
registry.register(serviceMetaInfo);
}catch (Exception e) {
throw new RuntimeException(e);
}
//启动 RPC 服务器
NettyTcpServer nettyTcpServer = new NettyTcpServer();
nettyTcpServer.doStart(rpcConfig.getServerPort());
}
}
}
服务消费者启动类
服务消费者启动类不涉及服务注册,只需要初始化配置即可(全局配置和注册中心连接信息)
java
public class ConsumerBootstrap {
public static void init() {
// RPC 框架初始化 (配置和注册中心)
RpcApplication.init();
}
}
测试
- 消费者
只需要配置 application.properties
和创建 ConsumerExample
类即可
启动消费者测试类
java
public class ConsumerExample {
public static void main(String[] args) {
ConsumerBootstrap.init();
UserService userService = ServiceProxyFactory.getProxy(UserService.class);
User user = new User();
user.setName("甜甜");
//调用
User newUser = userService.getUser(user);
if (newUser != null) {
System.out.println("远程调用结果: " + newUser.getName());
}else {
System.out.println("获取用户失败");
}
}
}
- 提供者
服务提供者需要提供 UserServiceImpl
实现类
启动提供者测试类
java
public class ProviderExample {
public static void main(String[] args) {
// 注册服务
List<ServiceRegisterInfo<?>> serviceRegisterInfoList = new ArrayList<>();
ServiceRegisterInfo<?> serviceRegisterInfo =
new ServiceRegisterInfo<>(UserService.class.getName(),UserServiceImpl.class);
serviceRegisterInfoList.add(serviceRegisterInfo);
// 服务提供者初始化
ProviderBootstrap.init(serviceRegisterInfoList);
}
}
- 测试结果:远程调用成功