Skip to content
Draft
5 changes: 5 additions & 0 deletions dubbo-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@
<skip_maven_deploy>false</skip_maven_deploy>
</properties>
<dependencies>
<dependency>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should not add Netty as a dependency to the common module.

<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand Down
29 changes: 26 additions & 3 deletions dubbo-common/src/main/java/org/apache/dubbo/rpc/model/Pack.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,35 @@
*/
package org.apache.dubbo.rpc.model;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;

/**
* Zero-copy pack interface using ByteBuf
*/
public interface Pack {

/**
* @param obj instance
* @return byte array
* Zero-copy pack object to ByteBuf
* @param obj instance to pack
* @param allocator ByteBuf allocator
* @return ByteBuf containing packed data
* @throws Exception when error occurs
*/
byte[] pack(Object obj) throws Exception;
ByteBuf pack(Object obj, ByteBufAllocator allocator) throws Exception;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same issue

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can refactor this using Streams


/**
* @deprecated Use {@link #pack(Object, ByteBufAllocator)} for zero-copy processing
*/
@Deprecated
default byte[] pack(Object obj) throws Exception {
ByteBuf buf = pack(obj, ByteBufAllocator.DEFAULT);
try {
byte[] result = new byte[buf.readableBytes()];
buf.readBytes(result);
return result;
} finally {
buf.release();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,124 @@
*/
package org.apache.dubbo.rpc.model;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;

/**
* A packable method is used to customize serialization for methods. It can provide a common wrapper
* for RESP / Protobuf.
*/
public interface PackableMethod {

default Object parseRequest(byte[] data) throws Exception {
/**
* Zero-copy parse ByteBuf to request object
*/
default Object parseRequest(ByteBuf data) throws Exception {
return getRequestUnpack().unpack(data);
}

default Object parseResponse(byte[] data) throws Exception {
/**
* Zero-copy parse ByteBuf to response object
*/
default Object parseResponse(ByteBuf data) throws Exception {
return parseResponse(data, false);
}

default Object parseResponse(byte[] data, boolean isReturnTriException) throws Exception {
/**
* Zero-copy parse ByteBuf to response object with exception handling
*/
default Object parseResponse(ByteBuf data, boolean isReturnTriException) throws Exception {
UnPack unPack = getResponseUnpack();
if (unPack instanceof WrapperUnPack) {
return ((WrapperUnPack) unPack).unpack(data, isReturnTriException);
}
return unPack.unpack(data);
}

/**
* Zero-copy pack request to ByteBuf
*/
default ByteBuf packRequest(Object request, ByteBufAllocator allocator) throws Exception {
return getRequestPack().pack(request, allocator);
}

/**
* Zero-copy pack response to ByteBuf
*/
default ByteBuf packResponse(Object response, ByteBufAllocator allocator) throws Exception {
return getResponsePack().pack(response, allocator);
}

/**
* @deprecated Use {@link #parseRequest(ByteBuf)} for zero-copy processing
*/
@Deprecated
default Object parseRequest(byte[] data) throws Exception {
ByteBuf buf = io.netty.buffer.Unpooled.wrappedBuffer(data);
try {
return parseRequest(buf);
} finally {
buf.release();
}
}

/**
* @deprecated Use {@link #parseResponse(ByteBuf)} for zero-copy processing
*/
@Deprecated
default Object parseResponse(byte[] data) throws Exception {
return parseResponse(data, false);
}

/**
* @deprecated Use {@link #parseResponse(ByteBuf, boolean)} for zero-copy processing
*/
@Deprecated
default Object parseResponse(byte[] data, boolean isReturnTriException) throws Exception {
ByteBuf buf = io.netty.buffer.Unpooled.wrappedBuffer(data);
try {
return parseResponse(buf, isReturnTriException);
} finally {
buf.release();
}
}

/**
* @deprecated Use {@link #packRequest(Object, ByteBufAllocator)} for zero-copy processing
*/
@Deprecated
default byte[] packRequest(Object request) throws Exception {
return getRequestPack().pack(request);
ByteBuf buf = packRequest(request, ByteBufAllocator.DEFAULT);
try {
byte[] result = new byte[buf.readableBytes()];
buf.readBytes(result);
return result;
} finally {
buf.release();
}
}

/**
* @deprecated Use {@link #packResponse(Object, ByteBufAllocator)} for zero-copy processing
*/
@Deprecated
default byte[] packResponse(Object response) throws Exception {
return getResponsePack().pack(response);
ByteBuf buf = packResponse(response, ByteBufAllocator.DEFAULT);
try {
byte[] result = new byte[buf.readableBytes()];
buf.readBytes(result);
return result;
} finally {
buf.release();
}
}

/**
* @deprecated Use {@link #packRequest(Object, ByteBufAllocator)} instead
*/
@Deprecated
Pack pack(Object[] arguments);

default boolean needWrapper() {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,12 @@ public UnPack getRequestUnpack() {
return requestUnpack;
}

@Override
@Deprecated
public Pack pack(Object[] arguments) {
return requestPack;
}

@Override
public String toString() {
return "StubMethodDescriptor{" + "method=" + methodName + '('
Expand Down
25 changes: 22 additions & 3 deletions dubbo-common/src/main/java/org/apache/dubbo/rpc/model/UnPack.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,31 @@
*/
package org.apache.dubbo.rpc.model;

import io.netty.buffer.ByteBuf;

/**
* Zero-copy unpack interface using ByteBuf
*/
public interface UnPack {

/**
* @param data byte array
* Zero-copy unpack ByteBuf to object
* @param data ByteBuf containing packed data
* @return object instance
* @throws Exception exception
* @throws Exception exception
*/
Object unpack(ByteBuf data) throws Exception;

/**
* @deprecated Use {@link #unpack(ByteBuf)} for zero-copy processing
*/
Object unpack(byte[] data) throws Exception;
@Deprecated
default Object unpack(byte[] data) throws Exception {
ByteBuf buf = io.netty.buffer.Unpooled.wrappedBuffer(data);
try {
return unpack(buf);
} finally {
buf.release();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,40 @@
*/
package org.apache.dubbo.rpc.model;

import io.netty.buffer.ByteBuf;

public interface WrapperUnPack extends UnPack {

/**
* Zero-copy unpack ByteBuf with exception handling
*/
default Object unpack(ByteBuf data) throws Exception {
return unpack(data, false);
}

/**
* Zero-copy unpack ByteBuf with exception handling option
*/
Object unpack(ByteBuf data, boolean isReturnTriException) throws Exception;

/**
* @deprecated Use {@link #unpack(ByteBuf)} for zero-copy processing
*/
@Deprecated
default Object unpack(byte[] data) throws Exception {
return unpack(data, false);
}

Object unpack(byte[] data, boolean isReturnTriException) throws Exception;
/**
* @deprecated Use {@link #unpack(ByteBuf, boolean)} for zero-copy processing
*/
@Deprecated
default Object unpack(byte[] data, boolean isReturnTriException) throws Exception {
ByteBuf buf = io.netty.buffer.Unpooled.wrappedBuffer(data);
try {
return unpack(buf, isReturnTriException);
} finally {
buf.release();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

import com.google.protobuf.Message;

public final class DubboMetadataServiceV2Triple {

public static final String SERVICE_NAME = MetadataServiceV2.SERVICE_NAME;
Expand All @@ -67,60 +65,60 @@ public static MetadataServiceV2 newStub(Invoker<?> invoker) {
MetadataRequest.class,
MetadataInfoV2.class,
MethodDescriptor.RpcType.UNARY,
obj -> ((Message) obj).toByteArray(),
obj -> ((Message) obj).toByteArray(),
MetadataRequest::parseFrom,
MetadataInfoV2::parseFrom);
new org.apache.dubbo.rpc.protocol.tri.PbArrayPacker(true),
new org.apache.dubbo.rpc.protocol.tri.PbArrayPacker(true),
new org.apache.dubbo.rpc.protocol.tri.PbUnpack<>(MetadataRequest.class),
new org.apache.dubbo.rpc.protocol.tri.PbUnpack<>(MetadataInfoV2.class));

private static final StubMethodDescriptor getMetadataInfoAsyncMethod = new StubMethodDescriptor(
"GetMetadataInfo",
MetadataRequest.class,
CompletableFuture.class,
MethodDescriptor.RpcType.UNARY,
obj -> ((Message) obj).toByteArray(),
obj -> ((Message) obj).toByteArray(),
MetadataRequest::parseFrom,
MetadataInfoV2::parseFrom);
new org.apache.dubbo.rpc.protocol.tri.PbArrayPacker(true),
new org.apache.dubbo.rpc.protocol.tri.PbArrayPacker(true),
new org.apache.dubbo.rpc.protocol.tri.PbUnpack<>(MetadataRequest.class),
new org.apache.dubbo.rpc.protocol.tri.PbUnpack<>(MetadataInfoV2.class));

private static final StubMethodDescriptor getMetadataInfoProxyAsyncMethod = new StubMethodDescriptor(
"GetMetadataInfoAsync",
MetadataRequest.class,
MetadataInfoV2.class,
MethodDescriptor.RpcType.UNARY,
obj -> ((Message) obj).toByteArray(),
obj -> ((Message) obj).toByteArray(),
MetadataRequest::parseFrom,
MetadataInfoV2::parseFrom);
new org.apache.dubbo.rpc.protocol.tri.PbArrayPacker(true),
new org.apache.dubbo.rpc.protocol.tri.PbArrayPacker(true),
new org.apache.dubbo.rpc.protocol.tri.PbUnpack<>(MetadataRequest.class),
new org.apache.dubbo.rpc.protocol.tri.PbUnpack<>(MetadataInfoV2.class));

private static final StubMethodDescriptor getOpenAPIInfoMethod = new StubMethodDescriptor(
"GetOpenAPIInfo",
OpenAPIRequest.class,
OpenAPIInfo.class,
MethodDescriptor.RpcType.UNARY,
obj -> ((Message) obj).toByteArray(),
obj -> ((Message) obj).toByteArray(),
OpenAPIRequest::parseFrom,
OpenAPIInfo::parseFrom);
new org.apache.dubbo.rpc.protocol.tri.PbArrayPacker(true),
new org.apache.dubbo.rpc.protocol.tri.PbArrayPacker(true),
new org.apache.dubbo.rpc.protocol.tri.PbUnpack<>(OpenAPIRequest.class),
new org.apache.dubbo.rpc.protocol.tri.PbUnpack<>(OpenAPIInfo.class));

private static final StubMethodDescriptor getOpenAPIInfoAsyncMethod = new StubMethodDescriptor(
"GetOpenAPIInfo",
OpenAPIRequest.class,
CompletableFuture.class,
MethodDescriptor.RpcType.UNARY,
obj -> ((Message) obj).toByteArray(),
obj -> ((Message) obj).toByteArray(),
OpenAPIRequest::parseFrom,
OpenAPIInfo::parseFrom);
new org.apache.dubbo.rpc.protocol.tri.PbArrayPacker(true),
new org.apache.dubbo.rpc.protocol.tri.PbArrayPacker(true),
new org.apache.dubbo.rpc.protocol.tri.PbUnpack<>(OpenAPIRequest.class),
new org.apache.dubbo.rpc.protocol.tri.PbUnpack<>(OpenAPIInfo.class));

private static final StubMethodDescriptor getOpenAPIInfoProxyAsyncMethod = new StubMethodDescriptor(
"GetOpenAPIInfoAsync",
OpenAPIRequest.class,
OpenAPIInfo.class,
MethodDescriptor.RpcType.UNARY,
obj -> ((Message) obj).toByteArray(),
obj -> ((Message) obj).toByteArray(),
OpenAPIRequest::parseFrom,
OpenAPIInfo::parseFrom);
new org.apache.dubbo.rpc.protocol.tri.PbArrayPacker(true),
new org.apache.dubbo.rpc.protocol.tri.PbArrayPacker(true),
new org.apache.dubbo.rpc.protocol.tri.PbUnpack<>(OpenAPIRequest.class),
new org.apache.dubbo.rpc.protocol.tri.PbUnpack<>(OpenAPIInfo.class));

static {
serviceDescriptor.addMethod(getMetadataInfoMethod);
Expand Down
Loading
Loading