Skip to content
26 changes: 22 additions & 4 deletions dubbo-common/src/main/java/org/apache/dubbo/rpc/model/Pack.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,30 @@
*/
package org.apache.dubbo.rpc.model;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

/**
* Zero-copy pack interface using pure Java Stream API
*/
public interface Pack {

/**
* @param obj instance
* @return byte array
* @throws Exception when error occurs
* Stream-based pack object to OutputStream for zero-copy processing
* @param obj instance to pack
* @param output OutputStream to write packed data
* @throws IOException when I/O error occurs
*/
void pack(Object obj, OutputStream output) throws IOException;

/**
* @deprecated Use {@link #pack(Object, OutputStream)} for stream-based processing
*/
byte[] pack(Object obj) throws Exception;
@Deprecated
default byte[] pack(Object obj) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
pack(obj, baos);
return baos.toByteArray();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,107 @@
*/
package org.apache.dubbo.rpc.model;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/**
* A packable method is used to customize serialization for methods. It can provide a common wrapper
* for RESP / Protobuf.
*/
public interface PackableMethod {

default Object parseRequest(byte[] data) throws Exception {
return getRequestUnpack().unpack(data);
/**
* Stream-based parse InputStream to request object
*/
default Object parseRequest(InputStream input) throws IOException {
return getRequestUnpack().unpack(input);
}

default Object parseResponse(byte[] data) throws Exception {
return parseResponse(data, false);
/**
* Stream-based parse InputStream to response object
*/
default Object parseResponse(InputStream input) throws IOException {
return parseResponse(input, false);
}

default Object parseResponse(byte[] data, boolean isReturnTriException) throws Exception {
/**
* Stream-based parse InputStream to response object with exception handling
*/
default Object parseResponse(InputStream input, boolean isReturnTriException) throws IOException {
UnPack unPack = getResponseUnpack();
if (unPack instanceof WrapperUnPack) {
return ((WrapperUnPack) unPack).unpack(data, isReturnTriException);
return ((WrapperUnPack) unPack).unpack(input, isReturnTriException);
}
return unPack.unpack(data);
return unPack.unpack(input);
}

/**
* Stream-based pack request to OutputStream
*/
default void packRequest(Object request, OutputStream output) throws IOException {
getRequestPack().pack(request, output);
}

/**
* Stream-based pack response to OutputStream
*/
default void packResponse(Object response, OutputStream output) throws IOException {
getResponsePack().pack(response, output);
}

/**
* @deprecated Use {@link #parseRequest(InputStream)} for stream-based processing
*/
@Deprecated
default Object parseRequest(byte[] data) throws IOException {
return parseRequest(new ByteArrayInputStream(data));
}

/**
* @deprecated Use {@link #parseResponse(InputStream)} for stream-based processing
*/
@Deprecated
default Object parseResponse(byte[] data) throws IOException {
return parseResponse(data, false);
}

default byte[] packRequest(Object request) throws Exception {
return getRequestPack().pack(request);
/**
* @deprecated Use {@link #parseResponse(InputStream, boolean)} for stream-based processing
*/
@Deprecated
default Object parseResponse(byte[] data, boolean isReturnTriException) throws IOException {
return parseResponse(new ByteArrayInputStream(data), isReturnTriException);
}

default byte[] packResponse(Object response) throws Exception {
return getResponsePack().pack(response);
/**
* @deprecated Use {@link #packRequest(Object, OutputStream)} for stream-based processing
*/
@Deprecated
default byte[] packRequest(Object request) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
packRequest(request, baos);
return baos.toByteArray();
}

/**
* @deprecated Use {@link #packResponse(Object, OutputStream)} for stream-based processing
*/
@Deprecated
default byte[] packResponse(Object response) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
packResponse(response, baos);
return baos.toByteArray();
}

/**
* @deprecated Use {@link #packRequest(Object, OutputStream)} instead
*/
@Deprecated
Pack pack(Object[] arguments);

default boolean needWrapper() {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,12 @@ public UnPack getRequestUnpack() {
return requestUnpack;
}

@Override
@Deprecated
public Pack pack(Object[] arguments) {
return requestPack;
}

@Override
public String toString() {
return "StubMethodDescriptor{" + "method=" + methodName + '('
Expand Down
22 changes: 19 additions & 3 deletions dubbo-common/src/main/java/org/apache/dubbo/rpc/model/UnPack.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,28 @@
*/
package org.apache.dubbo.rpc.model;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

/**
* Zero-copy unpack interface using pure Java Stream API
*/
public interface UnPack {

/**
* @param data byte array
* Stream-based unpack InputStream to object for zero-copy processing
* @param input InputStream containing packed data
* @return object instance
* @throws Exception exception
* @throws IOException when I/O error occurs
*/
Object unpack(InputStream input) throws IOException;

/**
* @deprecated Use {@link #unpack(InputStream)} for stream-based processing
*/
Object unpack(byte[] data) throws Exception;
@Deprecated
default Object unpack(byte[] data) throws IOException {
return unpack(new ByteArrayInputStream(data));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,38 @@
*/
package org.apache.dubbo.rpc.model;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public interface WrapperUnPack extends UnPack {

default Object unpack(byte[] data) throws Exception {
/**
* Stream-based unpack InputStream with exception handling
*/
@Override
default Object unpack(InputStream input) throws IOException {
return unpack(input, false);
}

/**
* Stream-based unpack InputStream with exception handling option
*/
Object unpack(InputStream input, boolean isReturnTriException) throws IOException;

/**
* @deprecated Use {@link #unpack(InputStream)} for stream-based processing
*/
@Deprecated
default Object unpack(byte[] data) throws IOException {
return unpack(data, false);
}

Object unpack(byte[] data, boolean isReturnTriException) throws Exception;
/**
* @deprecated Use {@link #unpack(InputStream, boolean)} for stream-based processing
*/
@Deprecated
default Object unpack(byte[] data, boolean isReturnTriException) throws IOException {
return unpack(new ByteArrayInputStream(data), isReturnTriException);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

import com.google.protobuf.Message;

public final class DubboMetadataServiceV2Triple {

public static final String SERVICE_NAME = MetadataServiceV2.SERVICE_NAME;
Expand All @@ -67,60 +65,60 @@ public static MetadataServiceV2 newStub(Invoker<?> invoker) {
MetadataRequest.class,
MetadataInfoV2.class,
MethodDescriptor.RpcType.UNARY,
obj -> ((Message) obj).toByteArray(),
obj -> ((Message) obj).toByteArray(),
MetadataRequest::parseFrom,
MetadataInfoV2::parseFrom);
new org.apache.dubbo.rpc.protocol.tri.PbArrayPacker(true),
new org.apache.dubbo.rpc.protocol.tri.PbArrayPacker(true),
new org.apache.dubbo.rpc.protocol.tri.PbUnpack<>(MetadataRequest.class),
new org.apache.dubbo.rpc.protocol.tri.PbUnpack<>(MetadataInfoV2.class));

private static final StubMethodDescriptor getMetadataInfoAsyncMethod = new StubMethodDescriptor(
"GetMetadataInfo",
MetadataRequest.class,
CompletableFuture.class,
MethodDescriptor.RpcType.UNARY,
obj -> ((Message) obj).toByteArray(),
obj -> ((Message) obj).toByteArray(),
MetadataRequest::parseFrom,
MetadataInfoV2::parseFrom);
new org.apache.dubbo.rpc.protocol.tri.PbArrayPacker(true),
new org.apache.dubbo.rpc.protocol.tri.PbArrayPacker(true),
new org.apache.dubbo.rpc.protocol.tri.PbUnpack<>(MetadataRequest.class),
new org.apache.dubbo.rpc.protocol.tri.PbUnpack<>(MetadataInfoV2.class));

private static final StubMethodDescriptor getMetadataInfoProxyAsyncMethod = new StubMethodDescriptor(
"GetMetadataInfoAsync",
MetadataRequest.class,
MetadataInfoV2.class,
MethodDescriptor.RpcType.UNARY,
obj -> ((Message) obj).toByteArray(),
obj -> ((Message) obj).toByteArray(),
MetadataRequest::parseFrom,
MetadataInfoV2::parseFrom);
new org.apache.dubbo.rpc.protocol.tri.PbArrayPacker(true),
new org.apache.dubbo.rpc.protocol.tri.PbArrayPacker(true),
new org.apache.dubbo.rpc.protocol.tri.PbUnpack<>(MetadataRequest.class),
new org.apache.dubbo.rpc.protocol.tri.PbUnpack<>(MetadataInfoV2.class));

private static final StubMethodDescriptor getOpenAPIInfoMethod = new StubMethodDescriptor(
"GetOpenAPIInfo",
OpenAPIRequest.class,
OpenAPIInfo.class,
MethodDescriptor.RpcType.UNARY,
obj -> ((Message) obj).toByteArray(),
obj -> ((Message) obj).toByteArray(),
OpenAPIRequest::parseFrom,
OpenAPIInfo::parseFrom);
new org.apache.dubbo.rpc.protocol.tri.PbArrayPacker(true),
new org.apache.dubbo.rpc.protocol.tri.PbArrayPacker(true),
new org.apache.dubbo.rpc.protocol.tri.PbUnpack<>(OpenAPIRequest.class),
new org.apache.dubbo.rpc.protocol.tri.PbUnpack<>(OpenAPIInfo.class));

private static final StubMethodDescriptor getOpenAPIInfoAsyncMethod = new StubMethodDescriptor(
"GetOpenAPIInfo",
OpenAPIRequest.class,
CompletableFuture.class,
MethodDescriptor.RpcType.UNARY,
obj -> ((Message) obj).toByteArray(),
obj -> ((Message) obj).toByteArray(),
OpenAPIRequest::parseFrom,
OpenAPIInfo::parseFrom);
new org.apache.dubbo.rpc.protocol.tri.PbArrayPacker(true),
new org.apache.dubbo.rpc.protocol.tri.PbArrayPacker(true),
new org.apache.dubbo.rpc.protocol.tri.PbUnpack<>(OpenAPIRequest.class),
new org.apache.dubbo.rpc.protocol.tri.PbUnpack<>(OpenAPIInfo.class));

private static final StubMethodDescriptor getOpenAPIInfoProxyAsyncMethod = new StubMethodDescriptor(
"GetOpenAPIInfoAsync",
OpenAPIRequest.class,
OpenAPIInfo.class,
MethodDescriptor.RpcType.UNARY,
obj -> ((Message) obj).toByteArray(),
obj -> ((Message) obj).toByteArray(),
OpenAPIRequest::parseFrom,
OpenAPIInfo::parseFrom);
new org.apache.dubbo.rpc.protocol.tri.PbArrayPacker(true),
new org.apache.dubbo.rpc.protocol.tri.PbArrayPacker(true),
new org.apache.dubbo.rpc.protocol.tri.PbUnpack<>(OpenAPIRequest.class),
new org.apache.dubbo.rpc.protocol.tri.PbUnpack<>(OpenAPIInfo.class));

static {
serviceDescriptor.addMethod(getMetadataInfoMethod);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ import org.apache.dubbo.rpc.stub.StubSuppliers;
import org.apache.dubbo.rpc.stub.UnaryStubMethodHandler;

import com.google.protobuf.Message;
import org.apache.dubbo.rpc.protocol.tri.PbArrayPacker;
import org.apache.dubbo.rpc.protocol.tri.PbUnpack;

import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -68,39 +70,39 @@ public final class {{className}} {

private static final StubMethodDescriptor {{methodName}}Method = new StubMethodDescriptor("{{originMethodName}}",
{{inputType}}.class, {{outputType}}.class, MethodDescriptor.RpcType.UNARY,
obj -> ((Message) obj).toByteArray(), obj -> ((Message) obj).toByteArray(), {{inputType}}::parseFrom,
{{outputType}}::parseFrom);
new org.apache.dubbo.rpc.protocol.tri.PbArrayPacker(true), new org.apache.dubbo.rpc.protocol.tri.PbArrayPacker(true),
new org.apache.dubbo.rpc.protocol.tri.PbUnpack<>({{inputType}}.class), new org.apache.dubbo.rpc.protocol.tri.PbUnpack<>({{outputType}}.class));

private static final StubMethodDescriptor {{methodName}}AsyncMethod = new StubMethodDescriptor("{{originMethodName}}",
{{inputType}}.class, java.util.concurrent.CompletableFuture.class, MethodDescriptor.RpcType.UNARY,
obj -> ((Message) obj).toByteArray(), obj -> ((Message) obj).toByteArray(), {{inputType}}::parseFrom,
{{outputType}}::parseFrom);
new org.apache.dubbo.rpc.protocol.tri.PbArrayPacker(true), new org.apache.dubbo.rpc.protocol.tri.PbArrayPacker(true),
new org.apache.dubbo.rpc.protocol.tri.PbUnpack<>({{inputType}}.class), new org.apache.dubbo.rpc.protocol.tri.PbUnpack<>({{outputType}}.class));

private static final StubMethodDescriptor {{methodName}}ProxyAsyncMethod = new StubMethodDescriptor("{{originMethodName}}Async",
{{inputType}}.class, {{outputType}}.class, MethodDescriptor.RpcType.UNARY,
obj -> ((Message) obj).toByteArray(), obj -> ((Message) obj).toByteArray(), {{inputType}}::parseFrom,
{{outputType}}::parseFrom);
new org.apache.dubbo.rpc.protocol.tri.PbArrayPacker(true), new org.apache.dubbo.rpc.protocol.tri.PbArrayPacker(true),
new org.apache.dubbo.rpc.protocol.tri.PbUnpack<>({{inputType}}.class), new org.apache.dubbo.rpc.protocol.tri.PbUnpack<>({{outputType}}.class));
{{/unaryMethods}}
{{#serverStreamingMethods}}

private static final StubMethodDescriptor {{methodName}}Method = new StubMethodDescriptor("{{originMethodName}}",
{{inputType}}.class, {{outputType}}.class, MethodDescriptor.RpcType.SERVER_STREAM,
obj -> ((Message) obj).toByteArray(), obj -> ((Message) obj).toByteArray(), {{inputType}}::parseFrom,
{{outputType}}::parseFrom);
new org.apache.dubbo.rpc.protocol.tri.PbArrayPacker(true), new org.apache.dubbo.rpc.protocol.tri.PbArrayPacker(true),
new org.apache.dubbo.rpc.protocol.tri.PbUnpack<>({{inputType}}.class), new org.apache.dubbo.rpc.protocol.tri.PbUnpack<>({{outputType}}.class));
{{/serverStreamingMethods}}
{{#clientStreamingMethods}}

private static final StubMethodDescriptor {{methodName}}Method = new StubMethodDescriptor("{{originMethodName}}",
{{inputType}}.class, {{outputType}}.class, MethodDescriptor.RpcType.CLIENT_STREAM,
obj -> ((Message) obj).toByteArray(), obj -> ((Message) obj).toByteArray(), {{inputType}}::parseFrom,
{{outputType}}::parseFrom);
new org.apache.dubbo.rpc.protocol.tri.PbArrayPacker(true), new org.apache.dubbo.rpc.protocol.tri.PbArrayPacker(true),
new org.apache.dubbo.rpc.protocol.tri.PbUnpack<>({{inputType}}.class), new org.apache.dubbo.rpc.protocol.tri.PbUnpack<>({{outputType}}.class));
{{/clientStreamingMethods}}
{{#biStreamingWithoutClientStreamMethods}}

private static final StubMethodDescriptor {{methodName}}Method = new StubMethodDescriptor("{{originMethodName}}",
{{inputType}}.class, {{outputType}}.class, MethodDescriptor.RpcType.BI_STREAM,
obj -> ((Message) obj).toByteArray(), obj -> ((Message) obj).toByteArray(), {{inputType}}::parseFrom,
{{outputType}}::parseFrom);
new org.apache.dubbo.rpc.protocol.tri.PbArrayPacker(true), new org.apache.dubbo.rpc.protocol.tri.PbArrayPacker(true),
new org.apache.dubbo.rpc.protocol.tri.PbUnpack<>({{inputType}}.class), new org.apache.dubbo.rpc.protocol.tri.PbUnpack<>({{outputType}}.class));
{{/biStreamingWithoutClientStreamMethods}}

static{
Expand Down
Loading
Loading