Skip to content

Commit

Permalink
refactor the grpc client to fix #116 (#132)
Browse files Browse the repository at this point in the history
  • Loading branch information
wbpcode authored Aug 19, 2024
1 parent c96eb92 commit d8b7b81
Show file tree
Hide file tree
Showing 20 changed files with 621 additions and 712 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,16 @@ jobs:
- uses: actions/checkout@v3
- name: Run bazel test with GCC c++11
run: |
bazel test --cxxopt=-std=c++0x //...
bazel test --test_output=all --cxxopt=-std=c++0x //...
- name: Run bazel test with GCC c++17
run: |
bazel test --cxxopt=-std=c++17 //...
bazel test --test_output=all --cxxopt=-std=c++17 //...
- name: Run bazel test with CLANG c++11
run: |
bazel test --config=clang --cxxopt=-std=c++0x //...
bazel test --test_output=all -c dbg --config=clang --cxxopt=-std=c++0x //...
- name: Run bazel test with CLANG c++17
run: |
bazel test --config=clang --cxxopt=-std=c++17 //...
bazel test --test_output=all -c opt --config=clang --cxxopt=-std=c++17 //...
- name: Install cmake dependencies and run cmake compile
run: |
sudo apt update
Expand Down
1 change: 1 addition & 0 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ refresh_compile_commands(
"//cpp2sky/...": "",
"//source/...": "",
"//test/...": "",
"//example/...": "",
},
)
6 changes: 0 additions & 6 deletions cpp2sky/internal/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,6 @@ cc_library(
visibility = ["//visibility:public"],
)

cc_library(
name = "stream_builder_interface",
hdrs = ["stream_builder.h"],
visibility = ["//visibility:public"],
)

cc_library(
name = "matcher_interface",
hdrs = ["matcher.h"],
Expand Down
141 changes: 56 additions & 85 deletions cpp2sky/internal/async_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,19 @@

#pragma once

#include <google/protobuf/message.h>
#include <grpcpp/generic/generic_stub.h>
#include <grpcpp/grpcpp.h>

#include <functional>
#include <memory>

#include "source/utils/circular_buffer.h"

using google::protobuf::Message;
#include "google/protobuf/message.h"
#include "grpcpp/generic/generic_stub.h"
#include "grpcpp/grpcpp.h"
#include "language-agent/Tracing.pb.h"

namespace cpp2sky {

/**
* Template base class for gRPC async client.
*/
template <class RequestType, class ResponseType>
class AsyncClient {
public:
Expand All @@ -37,108 +38,78 @@ class AsyncClient {
virtual void sendMessage(RequestType message) = 0;

/**
* Pending message queue reference.
*/
virtual CircularBuffer<RequestType>& pendingMessages() = 0;

/**
* Start stream if there is no living stream.
*/
virtual void startStream() = 0;

/**
* Completion queue.
* Reset the client. This should be called when the client is no longer
* needed.
*/
virtual grpc::CompletionQueue& completionQueue() = 0;

/**
* gRPC Stub
*/
virtual grpc::TemplatedGenericStub<RequestType, ResponseType>& stub() = 0;
virtual void resetClient() = 0;
};

template <class RequestType, class ResponseType>
using AsyncClientPtr = std::unique_ptr<AsyncClient<RequestType, ResponseType>>;

/**
* Template base class for gRPC async stream. The stream is used to represent
* a single gRPC stream/request.
*/
template <class RequestType, class ResponseType>
class AsyncStream {
public:
virtual ~AsyncStream() = default;

/**
* Send message. It will move the state from Init to Write.
* Send the specified protobuf message.
*/
virtual void sendMessage(RequestType message) = 0;
};

enum class StreamState : uint8_t {
Initialized = 0,
Ready = 1,
Idle = 2,
WriteDone = 3,
ReadDone = 4,
template <class RequestType, class ResponseType>
using AsyncStreamPtr = std::unique_ptr<AsyncStream<RequestType, ResponseType>>;

/**
* Tag for async operation. The callback should be called when the operation is
* done.
*/
struct AsyncEventTag {
std::function<void(bool)> callback;
};
using AsyncEventTagPtr = std::unique_ptr<AsyncEventTag>;

using GrpcClientContextPtr = std::unique_ptr<grpc::ClientContext>;
using GrpcCompletionQueue = grpc::CompletionQueue;

class AsyncStreamCallback {
/**
* Factory for creating async stream.
*/
template <class RequestType, class ResponseType>
class AsyncStreamFactory {
public:
/**
* Callback when stream ready event occured.
*/
virtual void onReady() = 0;
virtual ~AsyncStreamFactory() = default;

/**
* Callback when idle event occured.
*/
virtual void onIdle() = 0;
using StreamPtr = AsyncStreamPtr<RequestType, ResponseType>;
using GrpcStub = grpc::TemplatedGenericStub<RequestType, ResponseType>;

/**
* Callback when write done event occured.
*/
virtual void onWriteDone() = 0;
virtual StreamPtr createStream(GrpcClientContextPtr client_ctx,
GrpcStub& stub, GrpcCompletionQueue& cq,
AsyncEventTag& basic_event_tag,
AsyncEventTag& write_event_tag) = 0;
};

/**
* Callback when read done event occured.
*/
virtual void onReadDone() = 0;
template <class RequestType, class ResponseType>
using AsyncStreamFactoryPtr =
std::unique_ptr<AsyncStreamFactory<RequestType, ResponseType>>;

/**
* Callback when stream had finished with arbitrary error.
*/
virtual void onStreamFinish() = 0;
};
using TraceRequestType = skywalking::v3::SegmentObject;
using TraceResponseType = skywalking::v3::Commands;

struct StreamCallbackTag {
public:
void callback(bool stream_finished) {
if (stream_finished) {
callback_->onStreamFinish();
return;
}

switch (state_) {
case StreamState::Ready:
callback_->onReady();
break;
case StreamState::WriteDone:
callback_->onWriteDone();
break;
case StreamState::Idle:
callback_->onIdle();
break;
case StreamState::ReadDone:
callback_->onReadDone();
break;
default:
break;
}
}

StreamState state_;
AsyncStreamCallback* callback_;
};
using TraceAsyncStream = AsyncStream<TraceRequestType, TraceResponseType>;
using TraceAsyncStreamPtr = AsyncStreamPtr<TraceRequestType, TraceResponseType>;

template <class RequestType, class ResponseType>
using AsyncStreamSharedPtr =
std::shared_ptr<AsyncStream<RequestType, ResponseType>>;
using TraceAsyncStreamFactory =
AsyncStreamFactory<TraceRequestType, TraceResponseType>;
using TraceAsyncStreamFactoryPtr =
AsyncStreamFactoryPtr<TraceRequestType, TraceResponseType>;

using TraceAsyncClient = AsyncClient<TraceRequestType, TraceResponseType>;
using TraceAsyncClientPtr = std::unique_ptr<TraceAsyncClient>;

} // namespace cpp2sky
56 changes: 0 additions & 56 deletions cpp2sky/internal/stream_builder.h

This file was deleted.

2 changes: 1 addition & 1 deletion cpp2sky/tracer.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,6 @@ class Tracer {

using TracerPtr = std::unique_ptr<Tracer>;

TracerPtr createInsecureGrpcTracer(TracerConfig& cfg);
TracerPtr createInsecureGrpcTracer(const TracerConfig& cfg);

} // namespace cpp2sky
8 changes: 3 additions & 5 deletions source/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,20 @@ cc_library(
name = "cpp2sky_lib",
srcs = [
"grpc_async_client_impl.cc",
"propagation_impl.cc",
"tracer_impl.cc",
"tracing_context_impl.cc",
],
hdrs = [
"grpc_async_client_impl.h",
"propagation_impl.h",
"tracer_impl.h",
"tracing_context_impl.h",
],
visibility = ["//visibility:public"],
deps = [
":cpp2sky_data_lib",
"//cpp2sky:config_cc_proto",
"//cpp2sky:cpp2sky_data_interface",
"//cpp2sky:cpp2sky_interface",
"//cpp2sky/internal:async_client_interface",
"//cpp2sky/internal:matcher_interface",
"//cpp2sky/internal:stream_builder_interface",
"//source/matchers:suffix_matcher_lib",
"//source/utils:util_lib",
"@com_github_gabime_spdlog//:spdlog",
Expand Down
Loading

0 comments on commit d8b7b81

Please sign in to comment.