Skip to content

注册中心

字数: 0 字 时长: 0 分钟

需求分析

rc1.webp

消费者要发请求调用提供者,必须得先知道服务提供者的地址信息吧,那么服务提供者在启动时需要先吧服务信息注册到注册中心,消费者调用服务之前去注册中心获取服务信息。

注册中心是分布式系统的核心组件,主流的注册中心中间件有 Zookeeper、Nacos等,我为了实战 Redis,选择使用 Redis 来实现注册中心。

注册中心的核心功能是服务注册服务发现,在具体实现中还有哪些点需要注意呢?

  • 心跳检测和续期机制

服务提供者 A 在启动时将实例信息注册到注册中心,但在运行过程中A 宕机了,如果没有心跳检查和续期机制,注册中心会认为 A 还是正常运行状态,消费者发请求给 A 就 GG 了

因此注册中心需要定期检查服务实例状态,及时剔除掉异常服务实例

  • 缓存机制

关于 Nacos 有道面试题:如果注册中心宕机,远程调用是否可用?

我们的第一反应是,注册中心都宕机了,那么消费者根本不知道服务提供者信息了,肯定不能远程调用了。但实际上不一定,因为 Nacos 有缓存机制:

消费者第一次访问服务提供者时,会先去注册中心获取服务信息,然后会缓存在本地,后续再调用相同的服务时就不必再去注册中心获取服务信息了,直接从本地缓存中获取,性能更高。

代码实现

参照序列化器模块,注册中心也可以保持对 SPI 的支持,我目前只实现了基于 Redis 的实现

rc2.webp

注册中心配置

  • 定义 RegistryConfig 类,存储注册中心连接信息
java
/**
 * RPC 框架注册中心配置
 */
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class RegistryConfig {

    /**
     * 注册中心类别
     */
    private String registry = "redis";

    /**
     * 注册中心 IP
     */
    private String host;

    /**
     * 注册中心 端口
     */
    private Integer port;

    /**
     * 用户名
     */
    private String username;

    /**
     * 密码
     */
    private String password;

    /**
     * 超时时间 (毫秒)
     */
    private int timeout = 10000;
    
}
  • 提供者和消费者在配置文件中需要配置相同的注册中心信息

application.properties 中添加注册中心连接信息

properties
rpc.registryConfig.registry=redis
rpc.registryConfig.host=x.x.x.x
rpc.registryConfig.port=6379
rpc.registryConfig.username=redis
rpc.registryConfig.password=123456

之前的全局配置入口类 RpcApplication 添加注册中心初始化

java
/**
 * 框架初始化,支持传入自定义配置
 * @param newRpcConfig
 */
public static void init(RpcConfig newRpcConfig) {
    rpcConfig = newRpcConfig;
    log.info("RpcApplication init...,config = {}",newRpcConfig.toString());

    //注册中心初始化
    RegistryConfig registryConfig = rpcConfig.getRegistryConfig();
    Registry registry = RegistryFactory.getInstance(registryConfig.getRegistry());
    registry.init(registryConfig);
    log.info("Registry init success,config = {}",registryConfig);
}

Redis 注册中心

  • 引入依赖

由于我的功能实现比较基础,追求轻量化,这里选择 Jedis 作为 Redis 客户端

xml
<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>4.4.3</version>
</dependency>
  • 编写 ServiceMetaInfo

这个类用来存放服务器元信息,包含服务名称、服务提供者地址、端口等信息,也就是最终存放到 Redis 中的 value

java
/**
 * 服务器元信息
 */
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class ServiceMetaInfo {

    /**
     * 服务名称
     */
    private String serviceName;

    /**
     * 服务域名
     */
    private String serviceHost;

    /**
     * 服务端口号
     */
    private Integer servicePort;

    /**
     * 服务分组 (暂未实现)
     */
    private String serviceGroup = "default";


    /**
     * 利用 host + port 组成唯一实例标识
     * @return
     */
    public String getNodeKey() {
        return String.format("%s:%s",serviceHost,servicePort);
    }

}
  • 编写 RedisRegistry

这个类代码比较多,我实现了服务注册心跳检测异常服务剔除等功能满足基本需求,至于服务主动下线、通知机制就没做了。

java
@Slf4j
public class RedisRegistry implements Registry{

    private static final int HEARTBEAT_INTERVAL = 5; //心跳间隔 (秒)
    private static final int HEALTH_CHECK_INTERVAL = 10; //健康检查间隔 (秒)
    private static final int SERVICE_TIMEOUT = 30; //服务超时时间 (秒)
    private static final String SERVICE_ROOT_PATH = "/rpc/service/";
    private static final String HEARTBEAT_ROOT_PATH = "/rpc/heartbeat/";

    // 标志位防止重复初始化,保证 init() 幂等性
    private volatile boolean initialized = false;

    private JedisPool jedisPool;

    private ScheduledExecutorService heartbeatExecutor;

    private ScheduledExecutorService healthCheckExecutor;

    private final Set<String> registeredServices = ConcurrentHashMap.newKeySet();

    @Override
    public  synchronized void init(RegistryConfig registryConfig) {
        //防止重复初始化
        if (initialized) {
            return;
        }
        heartbeatExecutor = Executors.newSingleThreadScheduledExecutor();
        healthCheckExecutor = Executors.newSingleThreadScheduledExecutor();

        JedisPoolConfig poolConfig = new JedisPoolConfig();
        poolConfig.setMaxTotal(100);  //最大连接数
        poolConfig.setMaxIdle(20);   //最大空闲连接
        poolConfig.setMinIdle(5);  //最小空闲连接
        poolConfig.setTestOnBorrow(true); //取出连接时校验

        jedisPool = new JedisPool(poolConfig, registryConfig.getHost(), registryConfig.getPort(),
                registryConfig.getTimeout(),registryConfig.getPassword());
        startHealthCheck();
        initialized = true;
    }

    @Override
    public void register(ServiceMetaInfo serviceMetaInfo) throws Exception {
        try (Jedis jedis = jedisPool.getResource()){
            // 1. 注册服务元数据 hash 结构
            // /rpc/service/serviceName:serviceVersion  serviceHost:servicePort {serviceMetaInfo}
            String serviceName = serviceMetaInfo.getServiceName();
            String instanceKey = serviceMetaInfo.getNodeKey();
            String instanceValue = JSONUtil.toJsonStr(serviceMetaInfo);
            jedis.hset(SERVICE_ROOT_PATH + serviceName, instanceKey, instanceValue);
            // 2. 添加心跳标记
            jedis.zadd(HEARTBEAT_ROOT_PATH + serviceName,
                    (double) System.currentTimeMillis() / 1000,
                    instanceKey
            );
            // 本地缓存一份已注册的实例信息,后续只检测本实例注册的服务的存活状态
            registeredServices.add(serviceName);
            // 3. 定期维护心跳
            startHeartbeatTask(serviceMetaInfo);
        }
    }

    @Override
    public void unRegister(ServiceMetaInfo serviceMetaInfo) {
        try (Jedis jedis = jedisPool.getResource()){
            // todo 手动服务下线
        }
    }

    @Override
    public List<ServiceMetaInfo> serviceDiscovery(String serviceKey) {
        try (Jedis jedis = jedisPool.getResource()){
            // 1. 获取所有健康实例 key
            double now = System.currentTimeMillis() / 1000.0;
            double minAliveTime = now - SERVICE_TIMEOUT;
            String minStr = String.format("%.3f", minAliveTime);
            List<String> aliveInstances = jedis.zrangeByScore(HEARTBEAT_ROOT_PATH + serviceKey, minStr, "+inf");

            // 2. 获取健康实例元数据
            // 如果不存在存活实例,直接返回空集合
            if (aliveInstances.isEmpty()) {
                return Collections.emptyList();
            }
            List<String> metadataList =
                    jedis.hmget(SERVICE_ROOT_PATH + serviceKey, aliveInstances.toArray(new String[0]));

            // 3. 解析为对象
            return metadataList.stream()
                    .map(metadata -> JSONUtil.toBean(metadata, ServiceMetaInfo.class))
                    .toList();
        }
    }

    @Override
    public void destroy() {
        // 释放资源
        if (healthCheckExecutor != null) {
            healthCheckExecutor.shutdown();
        }
        if (heartbeatExecutor != null) {
            heartbeatExecutor.shutdown();
        }
        if ( jedisPool != null) {
            jedisPool.close();
        }
        log.info("RedisRegistry 销毁完成");
    }

    /**
     * 每 5 秒去更新心跳时间
     * @param serviceMetaInfo
     */
    private void startHeartbeatTask(ServiceMetaInfo serviceMetaInfo) {
        heartbeatExecutor.scheduleAtFixedRate(() -> {
            try (Jedis jedis = jedisPool.getResource()){
                jedis.zadd(HEARTBEAT_ROOT_PATH + serviceMetaInfo.getServiceName(),
                        (double) System.currentTimeMillis() / 1000,
                        serviceMetaInfo.getNodeKey()
                );
            }
        },0,HEARTBEAT_INTERVAL,TimeUnit.SECONDS);
    }

    /**
     * 健康检查
     */
    private void startHealthCheck() {
        healthCheckExecutor.scheduleAtFixedRate(() -> {
            //检测所有已注册的服务的存活状态
            registeredServices.forEach(this::checkAndCleanExpiredInstances);
        },0,HEALTH_CHECK_INTERVAL,TimeUnit.SECONDS);
    }

    /**
     * 当心跳已经 30 秒没更新过了,则说明该实例已经死亡,清理掉下线实例
     * @param serviceName
     */
    private void checkAndCleanExpiredInstances(String serviceName) {
        try (Jedis jedis = jedisPool.getResource()){
            String heartBeatKey = HEARTBEAT_ROOT_PATH + serviceName;
            double now = System.currentTimeMillis() / 1000.0;
            double minAliveTime = now - SERVICE_TIMEOUT;
            // 1. 删除旧的心跳数据 (score < minAliveTime 的实例)
            jedis.zremrangeByScore(heartBeatKey,0,minAliveTime);
            // 2. 获取当前健康实例
            String minStr = String.format("%.3f", minAliveTime);
            List<String> aliveInstances = jedis.zrangeByScore(heartBeatKey, minStr, "+inf");
            // 3. 清理 Hash 中的无效元数据
            Set<String> allInstances = jedis.hkeys(SERVICE_ROOT_PATH + serviceName);
            for (String instance : allInstances) {
                if (!aliveInstances.contains(instance)) {
                    jedis.hdel(SERVICE_ROOT_PATH + serviceName,instance);
                    // todo 发布服务下线通知
                }
            }
        }
    }

}
  • Redis 缓存类
java
public class RegistryServiceCache {

    private final Map<String,List<ServiceMetaInfo>> cache = new ConcurrentHashMap<>();

    private static volatile RegistryServiceCache instance;

    private static Registry registry;

    private RegistryServiceCache() {

    }

    public static RegistryServiceCache getInstance() {
        if (instance == null) {
            synchronized (RegistryServiceCache.class) {
                if (instance == null) {
                    instance = new RegistryServiceCache();
                    // 初始化缓存类实例的同时,初始化注册中心
                    RpcConfig rpcConfig = RpcApplication.getRpcConfig();
                    registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry());
                }
            }
        }
        return instance;
    }

    /**
     * 写缓存
     * @param serviceKey
     * @param newServiceCache
     */
    public void writeCache(String serviceKey, List<ServiceMetaInfo> newServiceCache) {
        //在写缓存时进行防御性拷贝
        this.cache.put(serviceKey, new ArrayList<>(newServiceCache));
    }

    /**
     * 读缓存
     * @param serviceKey
     * @return
     */
    public List<ServiceMetaInfo> readCache(String serviceKey) {
        // 使用 computeIfAbsent 保证同一时刻只有一个线程执行查询操作
        // 防止缓存击穿
       return cache.computeIfAbsent(serviceKey,k -> {
           List<ServiceMetaInfo> metaInfos = registry.serviceDiscovery(k);
           // 处理空值并进行防御性拷贝
           return metaInfos == null ? Collections.emptyList() : new ArrayList<>(metaInfos);
       });
    }

    /**
     * 清空缓存
     * @param serviceKey
     */
    public void clearCache(String serviceKey) {
        cache.remove(serviceKey);
    }

}

本地注册器

在第一次写这部分代码时,我也有点疑惑,为什么服务提供者启动时不但要注册服务信息到远程注册中心,也需要注册一份服务信息到本地注册器中。

后来才发现,远程注册中心是服务名和服务具体地址的映射关系(消费者找到提供者);而本地注册器是服务名和服务实现的映射关系(服务提供者找到服务实现类处理消费者请求)

rc3.webp

  • ServiceRegisterInfo 类存放服务名和本地实现类的映射关系
java
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ServiceRegisterInfo<T> {

    /**
     * 服务名称
     */
    private String serviceName;

    /**
     * 实现类
     */
    private Class<? extends T> implClass;
    
}
  • LocalRegistry 本地注册器类
java
/**
 * 本地注册器
 */
public class LocalRegistry {

    private static final Map<String,Class<?>> map = new ConcurrentHashMap<>();

    public static void register(String serviceName,Class<?> impClass) {
        map.put(serviceName, impClass);
    }

    public static Class<?> get(String serviceName) {
        return map.get(serviceName);
    }

    public static void remove(String serviceName) {
        map.remove(serviceName);
    }

}

Redis 数据结构介绍

服务元数据

我使用的 Redis 中的 Hash 数据类型来存储服务元数据信息,以注册一个 UserService 服务为例:

  • key

固定前缀 /rpc/service/ + 服务全类名 cn.ttdgg.example.common.service.UserService

  • value

因为同样一个 UserService 服务接口可能有多个实例提供,不同实例位于不同的服务器节点,因此选择使用 hash 结构:filed 为实例唯一标识,对应的 value 为服务元数据

我这里为了简化,没有定义唯一的实例标识,暂时使用 host + port 作为服务实例唯一标识(不考虑同一节点部署多个实例的情况)

rc4.webp

心跳检测

Redis 注册中心还要维护心跳检测信息,我选择 Zset 数据类型,以 UserService 服务的心跳检测为例:

  • key

固定前缀 /rpc/heartbeat + 服务全类名 cn.ttdgg.example.common.service.UserService

  • value

元素为服务实例唯一标识 host + port , score 分数为当前时间戳,心跳检测定期检测这个 score ,如果 30 秒没有更新过了,则认为服务实例已经死亡,需要被清除

rc5.webp