Skip to content

Commit c1b51ba

Browse files
authored
ensure all span closed before sending span (#50)
* ensure all span closed before sending span * ci
1 parent 6b9bbb9 commit c1b51ba

File tree

12 files changed

+65
-35
lines changed

12 files changed

+65
-35
lines changed

bazel/repositories.bzl

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,9 @@ def cpp2sky_dependencies():
1111
def skywalking_data_collect_protocol():
1212
http_archive(
1313
name = "skywalking_data_collect_protocol",
14-
sha256 = "ca495537bb85dbe8df5984ac7b571a0b87660281be69be1f9a0fa16fbf58f953",
15-
# TODO(shikugawa): Bazel upstreaming
16-
urls = ["https://github.com/Shikugawa/skywalking-data-collect-protocol/archive/v8.1.0-bazel.tar.gz"],
17-
strip_prefix = "skywalking-data-collect-protocol-8.1.0-bazel",
14+
sha256 = "8158e095b9b37c39e18938fe08bd4fca6f2b8e16763ff21fe8118d79241a6e0b",
15+
urls = ["https://github.com/apache/skywalking-data-collect-protocol/archive/v8.3.0.tar.gz"],
16+
strip_prefix = "skywalking-data-collect-protocol-8.3.0",
1817
)
1918

2019
def com_github_grpc_grpc():

cpp2sky/internal/async_client.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,6 @@ enum class Operation : uint8_t {
8787
Connected = 1,
8888
Idle = 2,
8989
WriteDone = 3,
90-
Finished = 4,
9190
};
9291

9392
template <class RequestType, class ResponseType>
@@ -116,7 +115,7 @@ class AsyncStream {
116115
/**
117116
* Handle incoming event related to this stream.
118117
*/
119-
virtual bool handleOperation(Operation incoming_op) = 0;
118+
virtual void handleOperation(Operation incoming_op) = 0;
120119
};
121120

122121
template <class RequestType>

cpp2sky/segment_context.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,11 @@ class SegmentContext {
262262
* Whether belonging span can be skipped analysis or not.
263263
*/
264264
virtual bool skipAnalysis() = 0;
265+
266+
/**
267+
* Determine whether to send this segment or not.
268+
*/
269+
virtual bool readyToSend() = 0;
265270
};
266271

267272
using SegmentContextPtr = std::shared_ptr<SegmentContext>;

cpp2sky/tracer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,6 @@ class Tracer {
3333

3434
using TracerPtr = std::unique_ptr<Tracer>;
3535

36-
TracerPtr createInsecureGrpcTracer(TracerConfig& cfg);
36+
TracerPtr createInsecureGrpcTracer(const TracerConfig& cfg);
3737

3838
} // namespace cpp2sky

source/grpc_async_client_impl.cc

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ GrpcAsyncSegmentReporterClient::~GrpcAsyncSegmentReporterClient() {
8282
// failed to send message and close stream, then recreate new stream and try
8383
// to do it. This process will continue forever without sending explicit
8484
// signal.
85-
{
85+
if (stream_) {
8686
std::unique_lock<std::mutex> lck(mux_);
8787
while (!drained_messages_.empty()) {
8888
cv_.wait(lck);
@@ -149,9 +149,6 @@ GrpcAsyncSegmentReporterStream::~GrpcAsyncSegmentReporterStream() {
149149
}
150150
}
151151
gpr_log(GPR_INFO, "%ld pending messages drained.", pending_messages_size);
152-
153-
ctx_.TryCancel();
154-
request_writer_->Finish(&status_, toTag(&finish_));
155152
}
156153

157154
bool GrpcAsyncSegmentReporterStream::startStream() {
@@ -184,7 +181,7 @@ bool GrpcAsyncSegmentReporterStream::clearPendingMessages() {
184181
return true;
185182
}
186183

187-
bool GrpcAsyncSegmentReporterStream::handleOperation(Operation incoming_op) {
184+
void GrpcAsyncSegmentReporterStream::handleOperation(Operation incoming_op) {
188185
state_ = incoming_op;
189186
if (state_ == Operation::Connected) {
190187
gpr_log(GPR_INFO, "Established connection: %s",
@@ -209,16 +206,9 @@ bool GrpcAsyncSegmentReporterStream::handleOperation(Operation incoming_op) {
209206
if (pending_messages_.empty()) {
210207
cv_.notify_all();
211208
}
212-
return true;
213-
} else if (state_ == Operation::Finished) {
214-
gpr_log(GPR_INFO, "Stream closed with http status: %d",
215-
grpcStatusToGenericHttpStatus(status_.error_code()));
216-
if (!status_.ok()) {
217-
gpr_log(GPR_ERROR, "%s", status_.error_message().c_str());
218-
}
219-
return false;
209+
} else {
210+
throw TracerException("Unknown stream operation");
220211
}
221-
throw TracerException("Unknown stream operation");
222212
}
223213

224214
AsyncStreamPtr<TracerRequestType> GrpcAsyncSegmentReporterStreamFactory::create(

source/grpc_async_client_impl.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ class GrpcAsyncSegmentReporterStream final
112112
// AsyncStream
113113
bool startStream() override;
114114
void sendMessage(TracerRequestType message) override;
115-
bool handleOperation(Operation incoming_op) override;
115+
void handleOperation(Operation incoming_op) override;
116116
void undrainMessage(TracerRequestType message) override {
117117
pending_messages_.push(message);
118118
}
@@ -122,7 +122,6 @@ class GrpcAsyncSegmentReporterStream final
122122

123123
AsyncClient<TracerRequestType, TracerResponseType>* client_;
124124
TracerResponseType commands_;
125-
grpc::Status status_;
126125
grpc::ClientContext ctx_;
127126
std::unique_ptr<grpc::ClientAsyncWriter<TracerRequestType>> request_writer_;
128127
CircularBuffer<TracerRequestType> pending_messages_{
@@ -131,7 +130,6 @@ class GrpcAsyncSegmentReporterStream final
131130

132131
TaggedStream connected_{Operation::Connected, this};
133132
TaggedStream write_done_{Operation::WriteDone, this};
134-
TaggedStream finish_{Operation::Finished, this};
135133

136134
std::condition_variable& cv_;
137135
};

source/segment_context_impl.cc

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,15 +122,16 @@ void CurrentSegmentSpanImpl::endSpan() {
122122
assert(!finished_);
123123
auto now = TimePoint<SystemTime>();
124124
endSpan(now);
125-
finished_ = true;
126125
}
127126

128127
void CurrentSegmentSpanImpl::endSpan(TimePoint<SystemTime> current_time) {
129128
end_time_ = current_time.fetch();
129+
finished_ = true;
130130
}
131131

132132
void CurrentSegmentSpanImpl::endSpan(TimePoint<SteadyTime> current_time) {
133133
end_time_ = current_time.fetch();
134+
finished_ = true;
134135
}
135136

136137
void CurrentSegmentSpanImpl::setComponentId(int32_t component_id) {
@@ -251,6 +252,15 @@ SegmentObject SegmentContextImpl::createSegmentObject() {
251252
return obj;
252253
}
253254

255+
bool SegmentContextImpl::readyToSend() {
256+
for (const auto& span : spans_) {
257+
if (!span->finished()) {
258+
return false;
259+
}
260+
}
261+
return true;
262+
}
263+
254264
SegmentContextFactoryImpl::SegmentContextFactoryImpl(const TracerConfig& cfg)
255265
: service_name_(cfg.service_name()), instance_name_(cfg.instance_name()) {}
256266

source/segment_context_impl.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ class SegmentContextImpl : public SegmentContext {
161161
SegmentObject createSegmentObject() override;
162162
void setSkipAnalysis() override { should_skip_analysis_ = true; }
163163
bool skipAnalysis() override { return should_skip_analysis_; }
164+
bool readyToSend() override;
164165

165166
private:
166167
std::string encodeSpan(CurrentSegmentSpanPtr parent_span,

source/tracer_impl.cc

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
namespace cpp2sky {
2020

21-
TracerImpl::TracerImpl(TracerConfig& config,
21+
TracerImpl::TracerImpl(const TracerConfig& config,
2222
std::shared_ptr<grpc::ChannelCredentials> cred,
2323
GrpcAsyncSegmentReporterStreamFactory& factory)
2424
: th_([this] { this->run(); }) {
@@ -37,7 +37,7 @@ TracerImpl::~TracerImpl() {
3737
}
3838

3939
void TracerImpl::sendSegment(SegmentContextPtr obj) {
40-
if (!obj) {
40+
if (!obj || !obj->readyToSend()) {
4141
return;
4242
}
4343
client_->sendMessage(obj->createSegmentObject());
@@ -55,17 +55,16 @@ void TracerImpl::run() {
5555
TaggedStream* t_stream = deTag(got_tag);
5656
if (!ok) {
5757
client_->resetStream();
58-
continue;
59-
}
60-
if (!t_stream->stream->handleOperation(t_stream->operation)) {
6158
client_->startStream();
59+
continue;
6260
}
61+
t_stream->stream->handleOperation(t_stream->operation);
6362
}
6463
}
6564

66-
TracerPtr createInsecureGrpcTracer(TracerConfig& cfg) {
65+
TracerPtr createInsecureGrpcTracer(const TracerConfig& cfg) {
6766
return std::make_unique<TracerImpl>(cfg, grpc::InsecureChannelCredentials(),
6867
stream_factory);
6968
}
7069

71-
} // namespace cpp2sky
70+
} // namespace cpp2sky

source/tracer_impl.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ using TracerResponseType = Commands;
2626

2727
class TracerImpl : public Tracer {
2828
public:
29-
TracerImpl(TracerConfig& config,
29+
TracerImpl(const TracerConfig& config,
3030
std::shared_ptr<grpc::ChannelCredentials> cred,
3131
GrpcAsyncSegmentReporterStreamFactory& factory);
3232
~TracerImpl();
@@ -43,6 +43,6 @@ class TracerImpl : public Tracer {
4343

4444
static GrpcAsyncSegmentReporterStreamFactory stream_factory;
4545

46-
TracerPtr createInsecureGrpcTracer(TracerConfig& cfg);
46+
TracerPtr createInsecureGrpcTracer(const TracerConfig& cfg);
4747

4848
} // namespace cpp2sky

0 commit comments

Comments
 (0)