4
4
import io .split .client .dtos .Event ;
5
5
import io .split .client .utils .GenericClientUtil ;
6
6
import io .split .client .utils .Utils ;
7
+ import io .split .telemetry .domain .enums .EventsDataRecordsEnum ;
8
+ import io .split .telemetry .domain .enums .HTTPLatenciesEnum ;
9
+ import io .split .telemetry .domain .enums .LastSynchronizationRecordsEnum ;
10
+ import io .split .telemetry .storage .TelemetryEvaluationProducer ;
11
+ import io .split .telemetry .storage .TelemetryRuntimeProducer ;
7
12
import org .apache .hc .client5 .http .impl .classic .CloseableHttpClient ;
8
13
import org .slf4j .Logger ;
9
14
import org .slf4j .LoggerFactory ;
23
28
import java .util .concurrent .TimeUnit ;
24
29
25
30
import static java .lang .Thread .MIN_PRIORITY ;
31
+ import static com .google .common .base .Preconditions .checkNotNull ;
26
32
27
33
/**
28
34
* Responsible for sending events added via .track() to Split collection services
@@ -45,34 +51,28 @@ public class EventClientImpl implements EventClient {
45
51
private final CloseableHttpClient _httpclient ;
46
52
private final URI _target ;
47
53
private final int _waitBeforeShutdown ;
54
+ private final TelemetryRuntimeProducer _telemetryRuntimeProducer ;
48
55
49
56
ThreadFactory eventClientThreadFactory (final String name ) {
50
- return new ThreadFactory () {
51
- @ Override
52
- public Thread newThread (final Runnable r ) {
53
- return new Thread (new Runnable () {
54
- @ Override
55
- public void run () {
56
- Thread .currentThread ().setPriority (MIN_PRIORITY );
57
- r .run ();
58
- }
59
- }, name );
60
- }
61
- };
57
+ return r -> new Thread (() -> {
58
+ Thread .currentThread ().setPriority (MIN_PRIORITY );
59
+ r .run ();
60
+ }, name );
62
61
}
63
62
64
63
65
- public static EventClientImpl create (CloseableHttpClient httpclient , URI eventsRootTarget , int maxQueueSize , long flushIntervalMillis , int waitBeforeShutdown ) throws URISyntaxException {
66
- return new EventClientImpl (new LinkedBlockingQueue <WrappedEvent >( ),
64
+ public static EventClientImpl create (CloseableHttpClient httpclient , URI eventsRootTarget , int maxQueueSize , long flushIntervalMillis , int waitBeforeShutdown , TelemetryRuntimeProducer telemetryRuntimeProducer ) throws URISyntaxException {
65
+ return new EventClientImpl (new LinkedBlockingQueue <>( maxQueueSize ),
67
66
httpclient ,
68
67
Utils .appendPath (eventsRootTarget , "api/events/bulk" ),
69
68
maxQueueSize ,
70
69
flushIntervalMillis ,
71
- waitBeforeShutdown );
70
+ waitBeforeShutdown ,
71
+ telemetryRuntimeProducer );
72
72
}
73
73
74
74
EventClientImpl (BlockingQueue <WrappedEvent > eventQueue , CloseableHttpClient httpclient , URI target , int maxQueueSize ,
75
- long flushIntervalMillis , int waitBeforeShutdown ) throws URISyntaxException {
75
+ long flushIntervalMillis , int waitBeforeShutdown , TelemetryRuntimeProducer telemetryRuntimeProducer ) throws URISyntaxException {
76
76
77
77
_httpclient = httpclient ;
78
78
@@ -83,6 +83,7 @@ public static EventClientImpl create(CloseableHttpClient httpclient, URI eventsR
83
83
84
84
_maxQueueSize = maxQueueSize ;
85
85
_flushIntervalMillis = flushIntervalMillis ;
86
+ _telemetryRuntimeProducer = checkNotNull (telemetryRuntimeProducer );
86
87
87
88
_senderExecutor = new ThreadPoolExecutor (
88
89
1 ,
@@ -122,9 +123,16 @@ public boolean track(Event event, int eventSize) {
122
123
if (event == null ) {
123
124
return false ;
124
125
}
125
- _eventQueue .put (new WrappedEvent (event , eventSize ));
126
+ if (_eventQueue .offer (new WrappedEvent (event , eventSize ))) {
127
+ _telemetryRuntimeProducer .recordEventStats (EventsDataRecordsEnum .EVENTS_QUEUED , 1 );
128
+ }
129
+ else {
130
+ _log .warn ("Event dropped." );
131
+ _telemetryRuntimeProducer .recordEventStats (EventsDataRecordsEnum .EVENTS_DROPPED , 1 );
132
+ }
126
133
127
- } catch (InterruptedException e ) {
134
+ } catch (ClassCastException | NullPointerException | IllegalArgumentException e ) {
135
+ _telemetryRuntimeProducer .recordEventStats (EventsDataRecordsEnum .EVENTS_DROPPED , 1 );
128
136
_log .warn ("Interruption when adding event withed while adding message %s." , event );
129
137
return false ;
130
138
}
@@ -153,7 +161,7 @@ public void run() {
153
161
List <Event > events = new ArrayList <>();
154
162
long accumulated = 0 ;
155
163
try {
156
- while (true ) {
164
+ while (! Thread . currentThread (). isInterrupted () ) {
157
165
WrappedEvent data = _eventQueue .take ();
158
166
Event event = data .event ();
159
167
Long size = data .size ();
@@ -169,7 +177,7 @@ public void run() {
169
177
170
178
continue ;
171
179
}
172
-
180
+ long initTime = System . currentTimeMillis ();
173
181
if (events .size () >= _maxQueueSize || accumulated >= MAX_SIZE_BYTES || event == CENTINEL ) {
174
182
175
183
// Send over the network
@@ -183,6 +191,8 @@ public void run() {
183
191
// Clear the queue of events for the next batch.
184
192
events = new ArrayList <>();
185
193
accumulated = 0 ;
194
+ _telemetryRuntimeProducer .recordSyncLatency (HTTPLatenciesEnum .EVENTS , System .currentTimeMillis ()-initTime );
195
+ _telemetryRuntimeProducer .recordSuccessfulSync (LastSynchronizationRecordsEnum .EVENTS , System .currentTimeMillis ());
186
196
}
187
197
}
188
198
} catch (InterruptedException e ) {
0 commit comments