序列化器与 SPI 机制
字数: 0 字 时长: 0 分钟
需求分析
消费者调用远程方法,本质还是需要发送请求给服务提供者,无论是请求还是响应,都会涉及到参数的传输。而 Java 对象是存活在 JVM 中的,需要先序列化为字节流,再通过网络传输给服务提供者,再反序列化为 Java 对象。
常见的序列化方式有:
- JDK原生序列化:JDK 自带的序列化方式,优点是天然集成,无需引入外部依赖,效果比较中庸
- JSON:优点是可读性强,跨语言支持;缺点是序列化后的数据相对较大,对复杂结构数据支持较差
- Hessian:优点是跨语言、序列化的后数据小;缺点是性能相对较低,对象必须实现
Serializable
接口 - Kryo:优点是高性能,可以序列化任何对象;缺点是只适用 Java,不支持跨语言
- Protobuf:优点是跨语言、性能好、序列化后的数据小;缺点是配置相对复杂
SPI 机制
我们可以提供几种内置的序列化器,但框架设计者无法穷举所有方案(也没必要),那么用户如果想要自定义序列化器该怎么办呢?
答案是利用 SPI 机制,SPI (Service Provider Interface) 允许服务提供者通过特定的配置文件将自己的实现注册到系统中,然后系统通过反射机制动态加载这些实现,而不需要修改原始框架的代码。
代码实现
公共代码
Serializer
接口
定义一个抽象的 Serializer
接口,所有序列化器都需要实现该接口
java
/**
* 序列化接口
*/
public interface Serializer {
<T> byte[] serialize(T object) throws IOException;
<T> T deserialize(byte[] bytes,Class<T> type) throws IOException;
}
SerializerKeys
常量
使用接口来定义常量是一个编程技巧,因为接口的字段默认是 public static final
的,可以作为常量使用
java
/**
* 序列化器键名
*/
public interface SerializerKeys {
String JDK = "jdk";
String JSON = "json";
String KRYO = "kryo";
String HESSIAN = "hessian";
}
SpiLoader
SPI 加载器
定义 SPI 加载规则,系统 SPI 插件类从 META-INF/rpc/system/
目录下加载
java
/**
* SPI 加载器
*/
@SuppressWarnings("unchecked")
@Slf4j
public class SpiLoader {
private static Map<String,Map<String,Class<?>>> loaderMap = new ConcurrentHashMap<>();
private static Map<String,Object> instanceCache = new ConcurrentHashMap<>();
private static final String RPC_SYSTEM_SPI_DIR = "META-INF/rpc/system/";
private static final String RPC_CUSTOM_SPI_DIR = "META-INF/rpc/custom/";
private static final String[] SCAN_DIRS = new String[]{RPC_SYSTEM_SPI_DIR,RPC_CUSTOM_SPI_DIR};
private static final List<Class<?>> LOAD_CLASS_LIST = List.of(Serializer.class);
public static void loadAll() {
log.info("加载所有 SPI");
for (Class<?> clazz : LOAD_CLASS_LIST) {
load(clazz);
}
}
public static <T> T getInstance(Class<?> clazz, String key) {
String clazzName = clazz.getName();
Map<String, Class<?>> keyClassMap = loaderMap.get(clazzName);
if (keyClassMap == null) {
throw new RuntimeException(String.format("SpiLoader 未加载 %s 类型",clazzName));
}
if (!keyClassMap.containsKey(key)) {
throw new RuntimeException(String.format("SpiLoader 的 %s 不存在 %s 的类型",clazzName,key));
}
//获取到要加载的实现类型
Class<?> implClass = keyClassMap.get(key);
String implClassName = implClass.getName();
if (!instanceCache.containsKey(implClassName)) {
try {
instanceCache.put(implClassName,implClass.getConstructor().newInstance());
}catch (Exception e){
String errMsg = String.format("%s 实例化失败",implClassName);
throw new RuntimeException(errMsg,e);
}
}
return (T) instanceCache.get(implClassName);
}
public static Map<String,Class<?>> load(Class<?> loadClass) {
log.info("加载类型为 {} 的 SPI",loadClass.getName());
//扫描路径.用户自定义的 SPI 优先级高于系统 SPI
Map<String,Class<?>> keyClassMap = new HashMap<>();
for (String scanDir : SCAN_DIRS) {
// 通过 ResourceUtil 获取资源文件,而部署通过文件路径获取
// 因为框架作为依赖被引入,是无法直接得到正确文件路径的
List<URL> resources = ResourceUtil.getResources(scanDir + loadClass.getName());
//读取每个资源文件
for (URL resource : resources) {
try {
InputStreamReader inputStreamReader = new InputStreamReader(resource.openStream());
BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
String line;
while ((line = bufferedReader.readLine()) != null) {
String[] strArray = line.split("=");
if (strArray.length > 1) {
String key = strArray[0];
String className = strArray[1];
keyClassMap.put(key,Class.forName(className));
}
}
}catch (Exception e) {
log.error("SPI 加载 {} 失败",resource.getPath(),e);
}
}
}
loaderMap.put(loadClass.getName(),keyClassMap);
return keyClassMap;
}
}
SerializerFactory
序列化器工厂
静态代码块中调用了 SpiLoader.load(Serializer.class)
意味着从 META-INF/rpc/system/
目录下加载所有实现 Serializer
接口的类
java
/**
* 序列化器工厂 单例 + 工厂模式
*/
public class SerializerFactory {
static {
SpiLoader.load(Serializer.class);
}
/**
* 默认序列化器
*/
private static final Serializer DEFAULT_SERIALIZER = new JdkSerializer();
/**
* 获取实例
* @param key
* @return
*/
public static Serializer getInstance(String key) {
return SpiLoader.getInstance(Serializer.class, key);
}
}
JDK 序列化器
java
/**
* Jdk 序列化器
*/
public class JdkSerializer implements Serializer{
@Override
public <T> byte[] serialize(T object) throws IOException {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);
objectOutputStream.writeObject(object);
objectOutputStream.close();
return outputStream.toByteArray();
}
@Override
public <T> T deserialize(byte[] bytes, Class<T> type) throws IOException {
ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes);
try (ObjectInputStream objectInputStream = new ObjectInputStream(inputStream)) {
Object obj = objectInputStream.readObject();
if (!type.isInstance(obj)) {
throw new ClassCastException("反序列化的对象与目标类型不匹配");
}
return type.cast(obj);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
}
Hessian 序列化器
- 引入依赖
xml
<dependency>
<groupId>com.caucho</groupId>
<artifactId>hessian</artifactId>
<version>4.0.66</version>
</dependency>
- 代码实现
java
public class HessianSerializer implements Serializer{
@Override
public <T> byte[] serialize(T object) throws IOException {
ByteArrayOutputStream bos = new ByteArrayOutputStream();
Hessian2Output ho = new Hessian2Output(bos);
ho.writeObject(object);
return bos.toByteArray();
}
@SuppressWarnings("unchecked")
@Override
public <T> T deserialize(byte[] bytes, Class<T> type) throws IOException {
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
Hessian2Input hi = new Hessian2Input(bis);
return (T) hi.readObject(type);
}
}
Json 序列化器
- 引入依赖
xml
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.16.2</version>
</dependency>
- 代码实现
java
public class JsonSerializer implements Serializer{
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@Override
public <T> byte[] serialize(T object) throws IOException {
return OBJECT_MAPPER.writeValueAsBytes(object);
}
@Override
public <T> T deserialize(byte[] bytes, Class<T> type) throws IOException {
T obj = OBJECT_MAPPER.readValue(bytes, type);
if (obj instanceof RpcRequest) {
return handleRequest((RpcRequest) obj, type);
}
if (obj instanceof RpcResponse) {
return handleResponse((RpcResponse) obj, type);
}
return obj;
}
private <T> T handleRequest(RpcRequest rpcRequest,Class<T> type) throws IOException {
Class<?>[] parameterTypes = rpcRequest.getParameterTypes();
Object[] args = rpcRequest.getArgs();
for (int i = 0; i < parameterTypes.length; i++) {
Class<?> clazz = parameterTypes[i];
//如果类型不同,则重新处理一下类型
// 处理泛型擦除或类型转换失败问题(JSON序列化中间层效果类似深拷贝)
if (!clazz.isAssignableFrom(args[i].getClass())) {
byte[] argBytes = OBJECT_MAPPER.writeValueAsBytes(args[i]);
args[i] = OBJECT_MAPPER.readValue(argBytes, clazz);
}
}
return type.cast(rpcRequest);
}
private <T> T handleResponse(RpcResponse rpcResponse, Class<T> type) throws IOException {
byte[] bytes = OBJECT_MAPPER.writeValueAsBytes(rpcResponse.getData());
rpcResponse.setData(OBJECT_MAPPER.readValue(bytes,rpcResponse.getDataType()));
return type.cast(rpcResponse);
}
}
Kryo 序列化器
- 引入依赖
xml
<dependency>
<groupId>com.esotericsoftware</groupId>
<artifactId>kryo</artifactId>
<version>5.6.1</version>
</dependency>
- 代码实现
java
public class KryoSerializer implements Serializer{
/**
* Kryo 线程不安全,使用 ThreadLocal 保证每个线程只有一个 Kryo
*/
private static final ThreadLocal<Kryo> KRYO_THREAD_LOCAL = ThreadLocal.withInitial(() -> {
Kryo kryo = new Kryo();
//设置动态序列化和反序列化,不提前注册所有类,可能有安全问题
kryo.setRegistrationRequired(false);
return kryo;
});
@Override
public <T> byte[] serialize(T object) throws IOException {
ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
Output output = new Output(byteArrayOutputStream);
KRYO_THREAD_LOCAL.get().writeObject(output,object);
output.close();
return byteArrayOutputStream.toByteArray();
}
@Override
public <T> T deserialize(byte[] bytes, Class<T> type) throws IOException {
ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
Input input = new Input(byteArrayInputStream);
T result = KRYO_THREAD_LOCAL.get().readObject(input, type);
input.close();
return result;
}
}