注册中心
字数: 0 字 时长: 0 分钟
需求分析

消费者要发请求调用提供者,必须得先知道服务提供者的地址信息吧,那么服务提供者在启动时需要先吧服务信息注册到注册中心,消费者调用服务之前去注册中心获取服务信息。
注册中心是分布式系统的核心组件,主流的注册中心中间件有 Zookeeper、Nacos等,我为了实战 Redis,选择使用 Redis 来实现注册中心。
注册中心的核心功能是服务注册、服务发现,在具体实现中还有哪些点需要注意呢?
- 心跳检测和续期机制
服务提供者 A 在启动时将实例信息注册到注册中心,但在运行过程中A 宕机了,如果没有心跳检查和续期机制,注册中心会认为 A 还是正常运行状态,消费者发请求给 A 就 GG 了
因此注册中心需要定期检查服务实例状态,及时剔除掉异常服务实例
- 缓存机制
关于 Nacos 有道面试题:如果注册中心宕机,远程调用是否可用?
我们的第一反应是,注册中心都宕机了,那么消费者根本不知道服务提供者信息了,肯定不能远程调用了。但实际上不一定,因为 Nacos 有缓存机制:
消费者第一次访问服务提供者时,会先去注册中心获取服务信息,然后会缓存在本地,后续再调用相同的服务时就不必再去注册中心获取服务信息了,直接从本地缓存中获取,性能更高。
代码实现
参照序列化器模块,注册中心也可以保持对 SPI 的支持,我目前只实现了基于 Redis 的实现

注册中心配置
- 定义
RegistryConfig类,存储注册中心连接信息
/**
* RPC 框架注册中心配置
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RegistryConfig {
/**
* 注册中心类别
*/
private String registry = "redis";
/**
* 注册中心 IP
*/
private String host;
/**
* 注册中心 端口
*/
private Integer port;
/**
* 用户名
*/
private String username;
/**
* 密码
*/
private String password;
/**
* 超时时间 (毫秒)
*/
private int timeout = 10000;
}- 提供者和消费者在配置文件中需要配置相同的注册中心信息
application.properties中添加注册中心连接信息
rpc.registryConfig.registry=redis
rpc.registryConfig.host=x.x.x.x
rpc.registryConfig.port=6379
rpc.registryConfig.username=redis
rpc.registryConfig.password=123456之前的全局配置入口类
RpcApplication添加注册中心初始化
/**
* 框架初始化,支持传入自定义配置
* @param newRpcConfig
*/
public static void init(RpcConfig newRpcConfig) {
rpcConfig = newRpcConfig;
log.info("RpcApplication init...,config = {}",newRpcConfig.toString());
//注册中心初始化
RegistryConfig registryConfig = rpcConfig.getRegistryConfig();
Registry registry = RegistryFactory.getInstance(registryConfig.getRegistry());
registry.init(registryConfig);
log.info("Registry init success,config = {}",registryConfig);
}Redis 注册中心
- 引入依赖
由于我的功能实现比较基础,追求轻量化,这里选择 Jedis 作为 Redis 客户端
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>4.4.3</version>
</dependency>- 编写
ServiceMetaInfo类
这个类用来存放服务器元信息,包含服务名称、服务提供者地址、端口等信息,也就是最终存放到 Redis 中的 value 值
/**
* 服务器元信息
*/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class ServiceMetaInfo {
/**
* 服务名称
*/
private String serviceName;
/**
* 服务域名
*/
private String serviceHost;
/**
* 服务端口号
*/
private Integer servicePort;
/**
* 服务分组 (暂未实现)
*/
private String serviceGroup = "default";
/**
* 利用 host + port 组成唯一实例标识
* @return
*/
public String getNodeKey() {
return String.format("%s:%s",serviceHost,servicePort);
}
}- 编写
RedisRegistry类
这个类代码比较多,我实现了服务注册和心跳检测、异常服务剔除等功能满足基本需求,至于服务主动下线、通知机制就没做了。
@Slf4j
public class RedisRegistry implements Registry{
private static final int HEARTBEAT_INTERVAL = 5; //心跳间隔 (秒)
private static final int HEALTH_CHECK_INTERVAL = 10; //健康检查间隔 (秒)
private static final int SERVICE_TIMEOUT = 30; //服务超时时间 (秒)
private static final String SERVICE_ROOT_PATH = "/rpc/service/";
private static final String HEARTBEAT_ROOT_PATH = "/rpc/heartbeat/";
// 标志位防止重复初始化,保证 init() 幂等性
private volatile boolean initialized = false;
private JedisPool jedisPool;
private ScheduledExecutorService heartbeatExecutor;
private ScheduledExecutorService healthCheckExecutor;
private final Set<String> registeredServices = ConcurrentHashMap.newKeySet();
@Override
public synchronized void init(RegistryConfig registryConfig) {
//防止重复初始化
if (initialized) {
return;
}
heartbeatExecutor = Executors.newSingleThreadScheduledExecutor();
healthCheckExecutor = Executors.newSingleThreadScheduledExecutor();
JedisPoolConfig poolConfig = new JedisPoolConfig();
poolConfig.setMaxTotal(100); //最大连接数
poolConfig.setMaxIdle(20); //最大空闲连接
poolConfig.setMinIdle(5); //最小空闲连接
poolConfig.setTestOnBorrow(true); //取出连接时校验
jedisPool = new JedisPool(poolConfig, registryConfig.getHost(), registryConfig.getPort(),
registryConfig.getTimeout(),registryConfig.getPassword());
startHealthCheck();
initialized = true;
}
@Override
public void register(ServiceMetaInfo serviceMetaInfo) throws Exception {
try (Jedis jedis = jedisPool.getResource()){
// 1. 注册服务元数据 hash 结构
// /rpc/service/serviceName:serviceVersion serviceHost:servicePort {serviceMetaInfo}
String serviceName = serviceMetaInfo.getServiceName();
String instanceKey = serviceMetaInfo.getNodeKey();
String instanceValue = JSONUtil.toJsonStr(serviceMetaInfo);
jedis.hset(SERVICE_ROOT_PATH + serviceName, instanceKey, instanceValue);
// 2. 添加心跳标记
jedis.zadd(HEARTBEAT_ROOT_PATH + serviceName,
(double) System.currentTimeMillis() / 1000,
instanceKey
);
// 本地缓存一份已注册的实例信息,后续只检测本实例注册的服务的存活状态
registeredServices.add(serviceName);
// 3. 定期维护心跳
startHeartbeatTask(serviceMetaInfo);
}
}
@Override
public void unRegister(ServiceMetaInfo serviceMetaInfo) {
try (Jedis jedis = jedisPool.getResource()){
// todo 手动服务下线
}
}
@Override
public List<ServiceMetaInfo> serviceDiscovery(String serviceKey) {
try (Jedis jedis = jedisPool.getResource()){
// 1. 获取所有健康实例 key
double now = System.currentTimeMillis() / 1000.0;
double minAliveTime = now - SERVICE_TIMEOUT;
String minStr = String.format("%.3f", minAliveTime);
List<String> aliveInstances = jedis.zrangeByScore(HEARTBEAT_ROOT_PATH + serviceKey, minStr, "+inf");
// 2. 获取健康实例元数据
// 如果不存在存活实例,直接返回空集合
if (aliveInstances.isEmpty()) {
return Collections.emptyList();
}
List<String> metadataList =
jedis.hmget(SERVICE_ROOT_PATH + serviceKey, aliveInstances.toArray(new String[0]));
// 3. 解析为对象
return metadataList.stream()
.map(metadata -> JSONUtil.toBean(metadata, ServiceMetaInfo.class))
.toList();
}
}
@Override
public void destroy() {
// 释放资源
if (healthCheckExecutor != null) {
healthCheckExecutor.shutdown();
}
if (heartbeatExecutor != null) {
heartbeatExecutor.shutdown();
}
if ( jedisPool != null) {
jedisPool.close();
}
log.info("RedisRegistry 销毁完成");
}
/**
* 每 5 秒去更新心跳时间
* @param serviceMetaInfo
*/
private void startHeartbeatTask(ServiceMetaInfo serviceMetaInfo) {
heartbeatExecutor.scheduleAtFixedRate(() -> {
try (Jedis jedis = jedisPool.getResource()){
jedis.zadd(HEARTBEAT_ROOT_PATH + serviceMetaInfo.getServiceName(),
(double) System.currentTimeMillis() / 1000,
serviceMetaInfo.getNodeKey()
);
}
},0,HEARTBEAT_INTERVAL,TimeUnit.SECONDS);
}
/**
* 健康检查
*/
private void startHealthCheck() {
healthCheckExecutor.scheduleAtFixedRate(() -> {
//检测所有已注册的服务的存活状态
registeredServices.forEach(this::checkAndCleanExpiredInstances);
},0,HEALTH_CHECK_INTERVAL,TimeUnit.SECONDS);
}
/**
* 当心跳已经 30 秒没更新过了,则说明该实例已经死亡,清理掉下线实例
* @param serviceName
*/
private void checkAndCleanExpiredInstances(String serviceName) {
try (Jedis jedis = jedisPool.getResource()){
String heartBeatKey = HEARTBEAT_ROOT_PATH + serviceName;
double now = System.currentTimeMillis() / 1000.0;
double minAliveTime = now - SERVICE_TIMEOUT;
// 1. 删除旧的心跳数据 (score < minAliveTime 的实例)
jedis.zremrangeByScore(heartBeatKey,0,minAliveTime);
// 2. 获取当前健康实例
String minStr = String.format("%.3f", minAliveTime);
List<String> aliveInstances = jedis.zrangeByScore(heartBeatKey, minStr, "+inf");
// 3. 清理 Hash 中的无效元数据
Set<String> allInstances = jedis.hkeys(SERVICE_ROOT_PATH + serviceName);
for (String instance : allInstances) {
if (!aliveInstances.contains(instance)) {
jedis.hdel(SERVICE_ROOT_PATH + serviceName,instance);
// todo 发布服务下线通知
}
}
}
}
}- Redis 缓存类
public class RegistryServiceCache {
private final Map<String,List<ServiceMetaInfo>> cache = new ConcurrentHashMap<>();
private static volatile RegistryServiceCache instance;
private static Registry registry;
private RegistryServiceCache() {
}
public static RegistryServiceCache getInstance() {
if (instance == null) {
synchronized (RegistryServiceCache.class) {
if (instance == null) {
instance = new RegistryServiceCache();
// 初始化缓存类实例的同时,初始化注册中心
RpcConfig rpcConfig = RpcApplication.getRpcConfig();
registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry());
}
}
}
return instance;
}
/**
* 写缓存
* @param serviceKey
* @param newServiceCache
*/
public void writeCache(String serviceKey, List<ServiceMetaInfo> newServiceCache) {
//在写缓存时进行防御性拷贝
this.cache.put(serviceKey, new ArrayList<>(newServiceCache));
}
/**
* 读缓存
* @param serviceKey
* @return
*/
public List<ServiceMetaInfo> readCache(String serviceKey) {
// 使用 computeIfAbsent 保证同一时刻只有一个线程执行查询操作
// 防止缓存击穿
return cache.computeIfAbsent(serviceKey,k -> {
List<ServiceMetaInfo> metaInfos = registry.serviceDiscovery(k);
// 处理空值并进行防御性拷贝
return metaInfos == null ? Collections.emptyList() : new ArrayList<>(metaInfos);
});
}
/**
* 清空缓存
* @param serviceKey
*/
public void clearCache(String serviceKey) {
cache.remove(serviceKey);
}
}本地注册器
在第一次写这部分代码时,我也有点疑惑,为什么服务提供者启动时不但要注册服务信息到远程注册中心,也需要注册一份服务信息到本地注册器中。
后来才发现,远程注册中心是服务名和服务具体地址的映射关系(消费者找到提供者);而本地注册器是服务名和服务实现的映射关系(服务提供者找到服务实现类处理消费者请求)

ServiceRegisterInfo类存放服务名和本地实现类的映射关系
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ServiceRegisterInfo<T> {
/**
* 服务名称
*/
private String serviceName;
/**
* 实现类
*/
private Class<? extends T> implClass;
}LocalRegistry本地注册器类
/**
* 本地注册器
*/
public class LocalRegistry {
private static final Map<String,Class<?>> map = new ConcurrentHashMap<>();
public static void register(String serviceName,Class<?> impClass) {
map.put(serviceName, impClass);
}
public static Class<?> get(String serviceName) {
return map.get(serviceName);
}
public static void remove(String serviceName) {
map.remove(serviceName);
}
}Redis 数据结构介绍
服务元数据
我使用的 Redis 中的 Hash 数据类型来存储服务元数据信息,以注册一个 UserService 服务为例:
- key
固定前缀 /rpc/service/ + 服务全类名 cn.ttdgg.example.common.service.UserService
- value
因为同样一个 UserService 服务接口可能有多个实例提供,不同实例位于不同的服务器节点,因此选择使用 hash 结构:filed 为实例唯一标识,对应的 value 为服务元数据
我这里为了简化,没有定义唯一的实例标识,暂时使用 host + port 作为服务实例唯一标识(不考虑同一节点部署多个实例的情况)

心跳检测
Redis 注册中心还要维护心跳检测信息,我选择 Zset 数据类型,以 UserService 服务的心跳检测为例:
- key
固定前缀 /rpc/heartbeat + 服务全类名 cn.ttdgg.example.common.service.UserService
- value
元素为服务实例唯一标识 host + port , score 分数为当前时间戳,心跳检测定期检测这个 score ,如果 30 秒没有更新过了,则认为服务实例已经死亡,需要被清除

