Skip to content

序列化器与 SPI 机制

字数: 0 字 时长: 0 分钟

需求分析

消费者调用远程方法,本质还是需要发送请求给服务提供者,无论是请求还是响应,都会涉及到参数的传输。而 Java 对象是存活在 JVM 中的,需要先序列化为字节流,再通过网络传输给服务提供者,再反序列化为 Java 对象。

常见的序列化方式有:

  • JDK原生序列化:JDK 自带的序列化方式,优点是天然集成,无需引入外部依赖,效果比较中庸
  • JSON:优点是可读性强,跨语言支持;缺点是序列化后的数据相对较大,对复杂结构数据支持较差
  • Hessian:优点是跨语言、序列化的后数据小;缺点是性能相对较低,对象必须实现 Serializable 接口
  • Kryo:优点是高性能,可以序列化任何对象;缺点是只适用 Java,不支持跨语言
  • Protobuf:优点是跨语言、性能好、序列化后的数据小;缺点是配置相对复杂

SPI 机制

我们可以提供几种内置的序列化器,但框架设计者无法穷举所有方案(也没必要),那么用户如果想要自定义序列化器该怎么办呢?

答案是利用 SPI 机制,SPI (Service Provider Interface) 允许服务提供者通过特定的配置文件将自己的实现注册到系统中,然后系统通过反射机制动态加载这些实现,而不需要修改原始框架的代码

代码实现

serializer1.webp

公共代码

  • Serializer 接口

定义一个抽象的 Serializer 接口,所有序列化器都需要实现该接口

java
/**
 * 序列化接口
 */
public interface Serializer {

    <T> byte[] serialize(T object) throws IOException;

    <T> T deserialize(byte[] bytes,Class<T> type) throws IOException;

}
  • SerializerKeys 常量

使用接口来定义常量是一个编程技巧,因为接口的字段默认是 public static final 的,可以作为常量使用

java
/**
 * 序列化器键名
 */
public interface SerializerKeys {

    String JDK = "jdk";
    String JSON = "json";
    String KRYO = "kryo";
    String HESSIAN = "hessian";

}
  • SpiLoader SPI 加载器

定义 SPI 加载规则,系统 SPI 插件类从 META-INF/rpc/system/ 目录下加载

java
/**
 * SPI 加载器
 */
@SuppressWarnings("unchecked")
@Slf4j
public class SpiLoader {

    private static Map<String,Map<String,Class<?>>> loaderMap = new ConcurrentHashMap<>();

    private static Map<String,Object> instanceCache = new ConcurrentHashMap<>();

    private static final String RPC_SYSTEM_SPI_DIR = "META-INF/rpc/system/";

    private static final String RPC_CUSTOM_SPI_DIR = "META-INF/rpc/custom/";

    private static final String[] SCAN_DIRS = new String[]{RPC_SYSTEM_SPI_DIR,RPC_CUSTOM_SPI_DIR};

    private static final List<Class<?>> LOAD_CLASS_LIST = List.of(Serializer.class);

    public static void loadAll() {
        log.info("加载所有 SPI");
        for (Class<?> clazz : LOAD_CLASS_LIST) {
            load(clazz);
        }
    }

    public static <T> T getInstance(Class<?> clazz, String key) {
        String clazzName = clazz.getName();
        Map<String, Class<?>> keyClassMap = loaderMap.get(clazzName);
        if (keyClassMap == null) {
            throw new RuntimeException(String.format("SpiLoader 未加载 %s 类型",clazzName));
        }
        if (!keyClassMap.containsKey(key)) {
            throw new RuntimeException(String.format("SpiLoader 的 %s 不存在 %s 的类型",clazzName,key));
        }
        //获取到要加载的实现类型
        Class<?> implClass = keyClassMap.get(key);
        String implClassName = implClass.getName();
        if (!instanceCache.containsKey(implClassName)) {
            try {
                instanceCache.put(implClassName,implClass.getConstructor().newInstance());
            }catch (Exception e){
                String errMsg = String.format("%s 实例化失败",implClassName);
                throw new RuntimeException(errMsg,e);
            }
        }
        return (T) instanceCache.get(implClassName);
    }

    public static Map<String,Class<?>> load(Class<?> loadClass) {
        log.info("加载类型为 {} 的 SPI",loadClass.getName());
        //扫描路径.用户自定义的 SPI 优先级高于系统 SPI
        Map<String,Class<?>> keyClassMap = new HashMap<>();
        for (String scanDir : SCAN_DIRS) {
            // 通过 ResourceUtil 获取资源文件,而部署通过文件路径获取
            // 因为框架作为依赖被引入,是无法直接得到正确文件路径的
            List<URL> resources = ResourceUtil.getResources(scanDir + loadClass.getName());
            //读取每个资源文件
            for (URL resource : resources) {
                try {
                    InputStreamReader inputStreamReader = new InputStreamReader(resource.openStream());
                    BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
                    String line;
                    while ((line = bufferedReader.readLine()) != null) {
                        String[] strArray = line.split("=");
                        if (strArray.length > 1) {
                            String key = strArray[0];
                            String className = strArray[1];
                            keyClassMap.put(key,Class.forName(className));
                        }
                    }
                }catch (Exception e) {
                    log.error("SPI 加载 {} 失败",resource.getPath(),e);
                }
            }
        }
        loaderMap.put(loadClass.getName(),keyClassMap);
        return keyClassMap;
    }

}
  • SerializerFactory 序列化器工厂

静态代码块中调用了 SpiLoader.load(Serializer.class) 意味着从 META-INF/rpc/system/ 目录下加载所有实现 Serializer 接口的类

serializer2.webp

java
/**
 * 序列化器工厂 单例 + 工厂模式
 */
public class SerializerFactory {

    static {
        SpiLoader.load(Serializer.class);
    }

    /**
     * 默认序列化器
     */
    private static final Serializer DEFAULT_SERIALIZER = new JdkSerializer();

    /**
     * 获取实例
     * @param key
     * @return
     */
    public static Serializer getInstance(String key) {
        return SpiLoader.getInstance(Serializer.class, key);
    }
}

JDK 序列化器

java
/**
 * Jdk 序列化器
 */
public class JdkSerializer implements Serializer{


    @Override
    public <T> byte[] serialize(T object) throws IOException {
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
        ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);
        objectOutputStream.writeObject(object);
        objectOutputStream.close();
        return outputStream.toByteArray();
    }

    @Override
    public <T> T deserialize(byte[] bytes, Class<T> type) throws IOException {
        ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
        try (ObjectInputStream objectInputStream = new ObjectInputStream(inputStream)) {
            Object obj = objectInputStream.readObject();
            if (!type.isInstance(obj)) {
                throw new ClassCastException("反序列化的对象与目标类型不匹配");
            }
            return type.cast(obj);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }
}

Hessian 序列化器

  • 引入依赖
xml
<dependency>
    <groupId>com.caucho</groupId>
    <artifactId>hessian</artifactId>
    <version>4.0.66</version>
</dependency>
  • 代码实现
java
public class HessianSerializer implements Serializer{
    @Override
    public <T> byte[] serialize(T object) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        Hessian2Output ho = new Hessian2Output(bos);
        ho.writeObject(object);
        return bos.toByteArray();
    }

    @SuppressWarnings("unchecked")
    @Override
    public <T> T deserialize(byte[] bytes, Class<T> type) throws IOException {
        ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
        Hessian2Input hi = new Hessian2Input(bis);
        return (T) hi.readObject(type);
    }

}

Json 序列化器

  • 引入依赖
xml
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.16.2</version>
</dependency>
  • 代码实现
java
public class JsonSerializer implements Serializer{

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    @Override
    public <T> byte[] serialize(T object) throws IOException {
        return OBJECT_MAPPER.writeValueAsBytes(object);
    }

    @Override
    public <T> T deserialize(byte[] bytes, Class<T> type) throws IOException {
        T obj = OBJECT_MAPPER.readValue(bytes, type);
        if (obj instanceof RpcRequest) {
            return handleRequest((RpcRequest) obj, type);
        }
        if (obj instanceof RpcResponse) {
            return handleResponse((RpcResponse) obj, type);
        }
        return obj;
    }

    private <T> T handleRequest(RpcRequest rpcRequest,Class<T> type) throws IOException {
        Class<?>[] parameterTypes = rpcRequest.getParameterTypes();
        Object[] args = rpcRequest.getArgs();

        for (int i = 0; i < parameterTypes.length; i++) {
            Class<?> clazz = parameterTypes[i];
            //如果类型不同,则重新处理一下类型
            // 处理泛型擦除或类型转换失败问题(JSON序列化中间层效果类似深拷贝)
            if (!clazz.isAssignableFrom(args[i].getClass())) {
                byte[] argBytes = OBJECT_MAPPER.writeValueAsBytes(args[i]);
                args[i] = OBJECT_MAPPER.readValue(argBytes, clazz);
            }
        }
        return type.cast(rpcRequest);
    }

    private <T> T handleResponse(RpcResponse rpcResponse, Class<T> type) throws IOException {
        byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(rpcResponse.getData());
        rpcResponse.setData(OBJECT_MAPPER.readValue(bytes,rpcResponse.getDataType()));
        return type.cast(rpcResponse);
    }
    
}

Kryo 序列化器

  • 引入依赖
xml
<dependency>
    <groupId>com.esotericsoftware</groupId>
    <artifactId>kryo</artifactId>
    <version>5.6.1</version>
</dependency>
  • 代码实现
java
public class KryoSerializer implements Serializer{

    /**
     * Kryo 线程不安全,使用 ThreadLocal 保证每个线程只有一个 Kryo
     */
    private static final ThreadLocal<Kryo> KRYO_THREAD_LOCAL = ThreadLocal.withInitial(() -> {
        Kryo kryo = new Kryo();
        //设置动态序列化和反序列化,不提前注册所有类,可能有安全问题
        kryo.setRegistrationRequired(false);
        return kryo;
    });


    @Override
    public <T> byte[] serialize(T object) throws IOException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        Output output = new Output(byteArrayOutputStream);
        KRYO_THREAD_LOCAL.get().writeObject(output,object);
        output.close();
        return byteArrayOutputStream.toByteArray();
    }

    @Override
    public <T> T deserialize(byte[] bytes, Class<T> type) throws IOException {
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
        Input input = new Input(byteArrayInputStream);
        T result = KRYO_THREAD_LOCAL.get().readObject(input, type);
        input.close();
        return result;
    }
}