序列化
概述
为了尽可能提高性能,Solpic
在设计请求有效载荷的序列化和响应有效载荷的反序列化时候使用了Flow
模式。由于java.util.concurrent.Flow
是JDK9
才引入,Solpic
核心代码是基于JDK8
开发,因此Solpic
自定义了两套流式处理有效载荷的方案。其中一套是基于"旧时代流处理"方案,也就是围绕InputStream
和OutputStream
进行流式读取和写入。另一个套是完全适配java.util.concurrent.Flow
的流式方案,也就是序列化和反序列化都围绕ByteBuffer
进行。无论选取哪种方案,在编码实现上都需要对流式编程有一定了解,基于这个前提Solpic
内置的有效载荷处理策略和内置的编码解码器已经可以解决绝大部分的场景。
序列化类体系
其中:
PayloadPublisher
与PayloadSubscriber
围绕InputStream
和OutputStream
进行流式读取和写入FlowPayloadPublisher
与FlowPayloadSubscriber
围绕ByteBuffer
进行流式读取和写入
这一小节只介绍PayloadPublisher
与PayloadSubscriber
。
内置策略
Solpic
提供了一些常用的PayloadPublisher
和PayloadSubscriber
实现,可以通过基于枚举实现的单例工厂PayloadPublishers
和PayloadSubscribers
直接调用对应的方法,这些方法都是见名知义的。
// 请求体是字符串
PayloadPublisher p1 = PayloadPublishers.X.ofString("hello", StandardCharsets.UTF_8);
// 请求体是字节数组
PayloadPublisher p2 = PayloadPublishers.X.ofByteArray("world".getBytes(StandardCharsets.UTF_8));
// 响应体是字符串
PayloadSubscriber<String> s1 = PayloadSubscribers.X.ofString(StandardCharsets.UTF_8);
// 响应体是字节数组
PayloadSubscriber<byte[]> s2 = PayloadSubscribers.X.ofByteArray();
举个例子,需要用GET
方法访问https://httpbin.org/get
并且把响应体转换为字符串:
HttpRequest httpRequest = HttpRequest.newBuilder()
.uri(URI.create("https://httpbin.org/get"))
.method(HttpMethod.GET)
.build();
HttpClient httpClient = Solpic.newHttpClient();
PayloadSubscriber<String> payloadSubscriber = PayloadSubscribers.X.ofString();
String responsePayload = httpClient.sendSimple(httpRequest, payloadSubscriber);
Solpic
提供API
用于注册内置策略,例如:
// 注册PayloadPublisher
Function<?, PayloadPublisher> function = new Function<CreateOrderRequest, PayloadPublisher>() {
@Override
public PayloadPublisher apply(CreateOrderRequest o) {
return new PayloadPublisher() {
@Override
public void writeTo(OutputStream outputStream, boolean autoClose) throws IOException {
}
@Override
public long contentLength() {
return 0;
}
};
}
};
PayloadPublishers.X.registerPayloadPublisher(CreateOrderRequest.class, function);
// 注册PayloadSubscriber
Supplier<PayloadSubscriber<?>> supplier = new Supplier<PayloadSubscriber<?>>() {
@Override
public PayloadSubscriber<?> get() {
return new PayloadSubscriber<CreateOrderResponse>() {
@Override
public void readFrom(InputStream inputStream, boolean autoClose) throws IOException {
}
@Override
public CompletionStage<CreateOrderResponse> getPayload() {
return null;
}
};
}
};
PayloadSubscribers.X.registerPayloadSubscriber(CreateOrderResponse.class, supplier);
注册完成后就可以直接使用:
Function<CreateOrderRequest, PayloadPublisher> payloadPublisher =
PayloadPublishers.X.getPayloadPublisher(CreateOrderRequest.class);
PayloadSubscriber<CreateOrderResponse> payloadSubscriber
= PayloadSubscribers.X.getPayloadSubscriber(CreateOrderResponse.class);
使用编码解码器
在HTTP
客户端使用的过程中,绝大部分情况是面向REST API
的场景,请求和响应的有效载荷都是JSON
文本,编码解码器Codec
就是为了这个场景特化设计。举个例子:
// 创建订单的请求对象
@Data
private static class CreateOrderRequest {
private String orderId;
private Long userId;
private BigDecimal amount;
}
// 创建订单的响应对象
@Data
private static class CreateOrderResponse {
private Long id;
}
// 使用Codec
Codec<CreateOrderRequest, CreateOrderResponse> codec = ...
CreateOrderRequest request = new CreateOrderRequest();
request.setUserId(1L);
request.setOrderId(UUID.randomUUID().toString());
request.setAmount(new BigDecimal("100.00"));
// 创建PayloadPublisher
PayloadPublisher p1 = codec.createPayloadPublisher(request);
// 创建PayloadPublisher,提前计算Content-Length
PayloadPublisher p2 = codec.createFixedPayloadPublisher(request);
// 创建Flow模式下的PayloadPublisher
FlowPayloadPublisher p3 = codec.createFlowPayloadPublisher(request);
// 创建PayloadSubscriber
PayloadSubscriber<CreateOrderResponse> s1 = codec.createPayloadSubscriber(CreateOrderResponse.class);
// 创建Flow模式下的PayloadSubscriber
FlowPayloadSubscriber<CreateOrderResponse> s2 = codec.createFlowPayloadSubscriber(CreateOrderResponse.class);
自定义策略
有些特殊场景内置策略和编码解码器都无法满足需求,可以自行实现PayloadPublisher
和PayloadSubscriber
。例如:
class CustomPayloadPublisher implements PayloadPublisher {
private static final byte[] HELLO = "hello".getBytes(StandardCharsets.UTF_8);
private final AtomicBoolean written = new AtomicBoolean();
@Override
public long contentLength() {
return HELLO.length;
}
@Override
public ContentType contentType() {
return ContentType.TEXT_PLAIN;
}
@Override
public void writeTo(OutputStream outputStream, boolean autoClose) throws IOException {
if (written.compareAndSet(false, true)) {
try {
outputStream.write(HELLO);
} finally {
if (autoClose) {
outputStream.close();
}
}
}
}
}
class CustomPayloadSubscriber implements PayloadSubscriber<String> {
private final AtomicBoolean read = new AtomicBoolean();
private volatile CompletionStage<String> future = new CompletableFuture<>();
@Override
public void readFrom(InputStream inputStream, boolean autoClose) throws IOException {
if (read.compareAndSet(false, true)) {
if (autoClose) {
future = MinimalFuture.completedFuture("hello");
}
}
}
@Override
public CompletionStage<String> getPayload() {
return future;
}
}
上面的CustomPayloadPublisher
使用后请求的有效载荷会写入固定字符串hello
,并且请求的Content-Type
为text/plain
。响应有效载荷为固定字符串world
,此处会忽略真实的响应有效载荷。