| 
21 | 21 | #include "cpp2sky/exception.h"  | 
22 | 22 | #include "utils/grpc_status.h"  | 
23 | 23 | 
 
  | 
24 |  | -#define DEFAULT_CONNECTION_ACTIVE_RETRY_TIMES 5  | 
25 |  | -#define DEFAULT_CONNECTION_ACTIVE_RETRY_SLEEP_SEC 3  | 
26 |  | - | 
27 | 24 | namespace cpp2sky {  | 
28 | 25 | 
 
  | 
29 | 26 | namespace {  | 
@@ -57,37 +54,23 @@ GrpcAsyncSegmentReporterClient::GrpcAsyncSegmentReporterClient(  | 
57 | 54 | }  | 
58 | 55 | 
 
  | 
59 | 56 | GrpcAsyncSegmentReporterClient::~GrpcAsyncSegmentReporterClient() {  | 
60 |  | -  // If connection is inactive, it dispose all drained messages even if it has  | 
61 |  | -  // tons of messages.  | 
62 |  | -  uint8_t retry_times = DEFAULT_CONNECTION_ACTIVE_RETRY_TIMES;  | 
63 |  | -  while (channel_->GetState(false) !=  | 
64 |  | -         grpc_connectivity_state::GRPC_CHANNEL_READY) {  | 
65 |  | -    if (retry_times <= 0) {  | 
66 |  | -      gpr_log(GPR_INFO,  | 
67 |  | -              "All %ld pending messages have disposed because of no active "  | 
68 |  | -              "connection",  | 
69 |  | -              drained_messages_.size());  | 
70 |  | -      resetStream();  | 
71 |  | -      return;  | 
72 |  | -    }  | 
73 |  | -    retry_times--;  | 
74 |  | -    std::this_thread::sleep_for(  | 
75 |  | -        std::chrono::seconds(DEFAULT_CONNECTION_ACTIVE_RETRY_SLEEP_SEC));  | 
76 |  | -  }  | 
77 |  | - | 
78 | 57 |   // It will wait until there is no drained messages.  | 
79 | 58 |   // There are no timeout option to handle this, so if you would like to stop  | 
80 | 59 |   // them, you should send signals like SIGTERM.  | 
81 | 60 |   // If server stopped with accidental issue, the event loop handle that it  | 
82 | 61 |   // failed to send message and close stream, then recreate new stream and try  | 
83 | 62 |   // to do it. This process will continue forever without sending explicit  | 
84 | 63 |   // signal.  | 
 | 64 | +  // TODO(shikugawa): Block to wait drained messages to be clear with createing condition  | 
 | 65 | +  // variable wrapper.  | 
 | 66 | +#ifndef TEST  | 
85 | 67 |   if (stream_) {  | 
86 | 68 |     std::unique_lock<std::mutex> lck(mux_);  | 
87 | 69 |     while (!drained_messages_.empty()) {  | 
88 | 70 |       cv_.wait(lck);  | 
89 | 71 |     }  | 
90 | 72 |   }  | 
 | 73 | +#endif  | 
91 | 74 | 
 
  | 
92 | 75 |   resetStream();  | 
93 | 76 | }  | 
 | 
0 commit comments