Skip to content

Commit df3ffbd

Browse files
Merge pull request #233 from splitio/development
Telemetry
2 parents 5f84978 + 4b8d6a6 commit df3ffbd

File tree

137 files changed

+5248
-1855
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

137 files changed

+5248
-1855
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ Split has built and maintains SDKs for:
6464
* Java [Github](https://github.com/splitio/java-client) [Docs](https://help.split.io/hc/en-us/articles/360020405151-Java-SDK)
6565
* Javascript [Github](https://github.com/splitio/javascript-client) [Docs](https://help.split.io/hc/en-us/articles/360020448791-JavaScript-SDK)
6666
* Node [Github](https://github.com/splitio/javascript-client) [Docs](https://help.split.io/hc/en-us/articles/360020564931-Node-js-SDK)
67-
* .NET [Github](https://github.com/splitio/.net-core-client) [Docs](https://help.split.io/hc/en-us/articles/360020240172--NET-SDK)
67+
* .NET [Github](https://github.com/splitio/dotnet-client) [Docs](https://help.split.io/hc/en-us/articles/360020240172--NET-SDK)
6868
* Ruby [Github](https://github.com/splitio/ruby-client) [Docs](https://help.split.io/hc/en-us/articles/360020673251-Ruby-SDK)
6969
* PHP [Github](https://github.com/splitio/php-client) [Docs](https://help.split.io/hc/en-us/articles/360020350372-PHP-SDK)
7070
* Python [Github](https://github.com/splitio/python-client) [Docs](https://help.split.io/hc/en-us/articles/360020359652-Python-SDK)

client/CHANGES.txt

+7-2
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
11
CHANGES
22

3+
4.2.0 (Jun 7, 2021)
4+
- Updated SDK telemetry storage, metrics and updater to be more effective and send less often.
5+
- Improved the synchronization flow to be more reliable in the event of an edge case generating delay in cache purge propagation, keeping the SDK cache properly synced.
6+
- Fixed issue where the SDK was validating no Split had over 50 conditions (legacy code).
7+
38
4.1.6 (Apr 15, 2021)
4-
-Updated log level and message in some messages.
9+
- Updated log level and message in some messages.
510

611
4.1.5 (Apr 6, 2021)
7-
-Updated: Streaming retry fix.
12+
- Updated streaming logic to use limited fetch retry attempts.
813

914
4.1.4 (Mar 19, 2021)
1015
- Updated: Internal cache structure refactor.

client/pom.xml

+2-2
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<groupId>io.split.client</groupId>
77
<artifactId>java-client-parent</artifactId>
8-
<version>4.1.6</version>
8+
<version>4.2.0</version>
99
</parent>
1010
<artifactId>java-client</artifactId>
1111
<packaging>jar</packaging>
@@ -121,7 +121,7 @@
121121
<dependency>
122122
<groupId>com.google.guava</groupId>
123123
<artifactId>guava</artifactId>
124-
<version>29.0-jre</version>
124+
<version>30.0-jre</version>
125125
</dependency>
126126
<dependency>
127127
<groupId>org.slf4j</groupId>

client/src/main/java/io/split/cache/SegmentCache.java

+15
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package io.split.cache;
22

3+
import io.split.engine.segments.SegmentImp;
4+
35
import java.util.List;
6+
import java.util.Set;
47

58
/**
69
* Memory for segments
@@ -42,4 +45,16 @@ public interface SegmentCache {
4245
* clear all segments
4346
*/
4447
void clear();
48+
49+
/**
50+
* return every segment
51+
* @return
52+
*/
53+
List<SegmentImp> getAll();
54+
55+
/**
56+
* return key count
57+
* @return
58+
*/
59+
long getKeyCount();
4560
}

client/src/main/java/io/split/cache/SegmentCacheInMemoryImpl.java

+12
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66
import org.slf4j.LoggerFactory;
77

88
import java.util.List;
9+
import java.util.Set;
910
import java.util.concurrent.ConcurrentMap;
11+
import java.util.stream.Collectors;
1012

1113
/**
1214
* InMemoryCache Implementation
@@ -59,4 +61,14 @@ public long getChangeNumber(String segmentName) {
5961
public void clear() {
6062
_segments.clear();
6163
}
64+
65+
@Override
66+
public List<SegmentImp> getAll() {
67+
return _segments.values().stream().collect(Collectors.toList());
68+
}
69+
70+
@Override
71+
public long getKeyCount() {
72+
return _segments.values().stream().mapToLong(SegmentImp::getKeysSize).sum();
73+
}
6274
}

client/src/main/java/io/split/client/ApiKeyCounter.java

+17
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66
import org.slf4j.Logger;
77
import org.slf4j.LoggerFactory;
88

9+
import java.util.HashMap;
10+
import java.util.Map;
11+
912
public class ApiKeyCounter {
1013

1114
private static final Logger _log = LoggerFactory.getLogger(ApiKeyCounter.class);
@@ -63,4 +66,18 @@ boolean isApiKeyPresent(String apiKey) {
6366
int getCount(String apiKey) {
6467
return USED_API_KEYS.count(apiKey);
6568
}
69+
70+
public Map<String, Long> getFactoryInstances() {
71+
Map<String, Long> factoryInstances = new HashMap<>();
72+
for (String factory :USED_API_KEYS) {
73+
factoryInstances.putIfAbsent(factory, new Long(getCount(factory)));
74+
}
75+
76+
return factoryInstances;
77+
}
78+
79+
@VisibleForTesting
80+
void clearApiKeys() {
81+
USED_API_KEYS.clear();
82+
}
6683
}

client/src/main/java/io/split/client/EventClientImpl.java

+30-20
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@
44
import io.split.client.dtos.Event;
55
import io.split.client.utils.GenericClientUtil;
66
import io.split.client.utils.Utils;
7+
import io.split.telemetry.domain.enums.EventsDataRecordsEnum;
8+
import io.split.telemetry.domain.enums.HTTPLatenciesEnum;
9+
import io.split.telemetry.domain.enums.LastSynchronizationRecordsEnum;
10+
import io.split.telemetry.storage.TelemetryEvaluationProducer;
11+
import io.split.telemetry.storage.TelemetryRuntimeProducer;
712
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
813
import org.slf4j.Logger;
914
import org.slf4j.LoggerFactory;
@@ -23,6 +28,7 @@
2328
import java.util.concurrent.TimeUnit;
2429

2530
import static java.lang.Thread.MIN_PRIORITY;
31+
import static com.google.common.base.Preconditions.checkNotNull;
2632

2733
/**
2834
* Responsible for sending events added via .track() to Split collection services
@@ -45,34 +51,28 @@ public class EventClientImpl implements EventClient {
4551
private final CloseableHttpClient _httpclient;
4652
private final URI _target;
4753
private final int _waitBeforeShutdown;
54+
private final TelemetryRuntimeProducer _telemetryRuntimeProducer;
4855

4956
ThreadFactory eventClientThreadFactory(final String name) {
50-
return new ThreadFactory() {
51-
@Override
52-
public Thread newThread(final Runnable r) {
53-
return new Thread(new Runnable() {
54-
@Override
55-
public void run() {
56-
Thread.currentThread().setPriority(MIN_PRIORITY);
57-
r.run();
58-
}
59-
}, name);
60-
}
61-
};
57+
return r -> new Thread(() -> {
58+
Thread.currentThread().setPriority(MIN_PRIORITY);
59+
r.run();
60+
}, name);
6261
}
6362

6463

65-
public static EventClientImpl create(CloseableHttpClient httpclient, URI eventsRootTarget, int maxQueueSize, long flushIntervalMillis, int waitBeforeShutdown) throws URISyntaxException {
66-
return new EventClientImpl(new LinkedBlockingQueue<WrappedEvent>(),
64+
public static EventClientImpl create(CloseableHttpClient httpclient, URI eventsRootTarget, int maxQueueSize, long flushIntervalMillis, int waitBeforeShutdown, TelemetryRuntimeProducer telemetryRuntimeProducer) throws URISyntaxException {
65+
return new EventClientImpl(new LinkedBlockingQueue<>(maxQueueSize),
6766
httpclient,
6867
Utils.appendPath(eventsRootTarget, "api/events/bulk"),
6968
maxQueueSize,
7069
flushIntervalMillis,
71-
waitBeforeShutdown);
70+
waitBeforeShutdown,
71+
telemetryRuntimeProducer);
7272
}
7373

7474
EventClientImpl(BlockingQueue<WrappedEvent> eventQueue, CloseableHttpClient httpclient, URI target, int maxQueueSize,
75-
long flushIntervalMillis, int waitBeforeShutdown) throws URISyntaxException {
75+
long flushIntervalMillis, int waitBeforeShutdown, TelemetryRuntimeProducer telemetryRuntimeProducer) throws URISyntaxException {
7676

7777
_httpclient = httpclient;
7878

@@ -83,6 +83,7 @@ public static EventClientImpl create(CloseableHttpClient httpclient, URI eventsR
8383

8484
_maxQueueSize = maxQueueSize;
8585
_flushIntervalMillis = flushIntervalMillis;
86+
_telemetryRuntimeProducer = checkNotNull(telemetryRuntimeProducer);
8687

8788
_senderExecutor = new ThreadPoolExecutor(
8889
1,
@@ -122,9 +123,16 @@ public boolean track(Event event, int eventSize) {
122123
if (event == null) {
123124
return false;
124125
}
125-
_eventQueue.put(new WrappedEvent(event, eventSize));
126+
if(_eventQueue.offer(new WrappedEvent(event, eventSize))) {
127+
_telemetryRuntimeProducer.recordEventStats(EventsDataRecordsEnum.EVENTS_QUEUED, 1);
128+
}
129+
else {
130+
_log.warn("Event dropped.");
131+
_telemetryRuntimeProducer.recordEventStats(EventsDataRecordsEnum.EVENTS_DROPPED, 1);
132+
}
126133

127-
} catch (InterruptedException e) {
134+
} catch (ClassCastException | NullPointerException | IllegalArgumentException e) {
135+
_telemetryRuntimeProducer.recordEventStats(EventsDataRecordsEnum.EVENTS_DROPPED, 1);
128136
_log.warn("Interruption when adding event withed while adding message %s.", event);
129137
return false;
130138
}
@@ -153,7 +161,7 @@ public void run() {
153161
List<Event> events = new ArrayList<>();
154162
long accumulated = 0;
155163
try {
156-
while (true) {
164+
while (!Thread.currentThread().isInterrupted()) {
157165
WrappedEvent data = _eventQueue.take();
158166
Event event = data.event();
159167
Long size = data.size();
@@ -169,7 +177,7 @@ public void run() {
169177

170178
continue;
171179
}
172-
180+
long initTime = System.currentTimeMillis();
173181
if (events.size() >= _maxQueueSize || accumulated >= MAX_SIZE_BYTES || event == CENTINEL) {
174182

175183
// Send over the network
@@ -183,6 +191,8 @@ public void run() {
183191
// Clear the queue of events for the next batch.
184192
events = new ArrayList<>();
185193
accumulated = 0;
194+
_telemetryRuntimeProducer.recordSyncLatency(HTTPLatenciesEnum.EVENTS, System.currentTimeMillis()-initTime);
195+
_telemetryRuntimeProducer.recordSuccessfulSync(LastSynchronizationRecordsEnum.EVENTS, System.currentTimeMillis());
186196
}
187197
}
188198
} catch (InterruptedException e) {

client/src/main/java/io/split/client/HttpSegmentChangeFetcher.java

+44-21
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,18 @@
44
import io.split.client.dtos.SegmentChange;
55
import io.split.client.utils.Json;
66
import io.split.client.utils.Utils;
7+
import io.split.engine.common.FetchOptions;
78
import io.split.engine.metrics.Metrics;
89
import io.split.engine.segments.SegmentChangeFetcher;
10+
import io.split.telemetry.domain.enums.HTTPLatenciesEnum;
11+
import io.split.telemetry.domain.enums.LastSynchronizationRecordsEnum;
12+
import io.split.telemetry.domain.enums.ResourceEnum;
13+
import io.split.telemetry.storage.TelemetryRuntimeProducer;
914
import org.apache.hc.client5.http.classic.methods.HttpGet;
1015
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
1116
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
17+
import org.apache.hc.core5.http.HttpStatus;
18+
import org.apache.hc.core5.http.Header;
1219
import org.apache.hc.core5.http.io.entity.EntityUtils;
1320
import org.apache.hc.core5.net.URIBuilder;
1421
import org.slf4j.Logger;
@@ -17,6 +24,8 @@
1724
import java.net.URI;
1825
import java.net.URISyntaxException;
1926
import java.nio.charset.StandardCharsets;
27+
import java.util.Arrays;
28+
import java.util.stream.Collectors;
2029

2130
import static com.google.common.base.Preconditions.checkNotNull;
2231

@@ -27,57 +36,72 @@ public final class HttpSegmentChangeFetcher implements SegmentChangeFetcher {
2736
private static final Logger _log = LoggerFactory.getLogger(HttpSegmentChangeFetcher.class);
2837

2938
private static final String SINCE = "since";
39+
private static final String TILL = "till";
3040
private static final String PREFIX = "segmentChangeFetcher";
31-
private static final String NAME_CACHE = "Cache-Control";
32-
private static final String VALUE_CACHE = "no-cache";
41+
private static final String CACHE_CONTROL_HEADER_NAME = "Cache-Control";
42+
private static final String CACHE_CONTROL_HEADER_VALUE = "no-cache";
43+
44+
private static final String HEADER_FASTLY_DEBUG_NAME = "Fastly-Debug";
45+
private static final String HEADER_FASTLY_DEBUG_VALUE = "1";
3346

3447
private final CloseableHttpClient _client;
3548
private final URI _target;
36-
private final Metrics _metrics;
37-
38-
public static HttpSegmentChangeFetcher create(CloseableHttpClient client, URI root) throws URISyntaxException {
39-
return create(client, root, new Metrics.NoopMetrics());
40-
}
49+
private final TelemetryRuntimeProducer _telemetryRuntimeProducer;
4150

42-
public static HttpSegmentChangeFetcher create(CloseableHttpClient client, URI root, Metrics metrics) throws URISyntaxException {
43-
return new HttpSegmentChangeFetcher(client, Utils.appendPath(root, "api/segmentChanges"), metrics);
51+
public static HttpSegmentChangeFetcher create(CloseableHttpClient client, URI root, TelemetryRuntimeProducer telemetryRuntimeProducer) throws URISyntaxException {
52+
return new HttpSegmentChangeFetcher(client, Utils.appendPath(root, "api/segmentChanges"), telemetryRuntimeProducer);
4453
}
4554

46-
private HttpSegmentChangeFetcher(CloseableHttpClient client, URI uri, Metrics metrics) {
55+
private HttpSegmentChangeFetcher(CloseableHttpClient client, URI uri, TelemetryRuntimeProducer telemetryRuntimeProducer) {
4756
_client = client;
4857
_target = uri;
49-
_metrics = metrics;
5058
checkNotNull(_target);
59+
_telemetryRuntimeProducer = checkNotNull(telemetryRuntimeProducer);
5160
}
5261

5362
@Override
54-
public SegmentChange fetch(String segmentName, long since, boolean addCacheHeader) {
63+
public SegmentChange fetch(String segmentName, long since, FetchOptions options) {
5564
long start = System.currentTimeMillis();
5665

5766
CloseableHttpResponse response = null;
5867

5968
try {
6069
String path = _target.getPath() + "/" + segmentName;
61-
URI uri = new URIBuilder(_target).setPath(path).addParameter(SINCE, "" + since).build();
70+
URIBuilder uriBuilder = new URIBuilder(_target)
71+
.setPath(path)
72+
.addParameter(SINCE, "" + since);
73+
if (options.hasCustomCN()) {
74+
uriBuilder.addParameter(TILL, "" + options.targetCN());
75+
}
76+
77+
URI uri = uriBuilder.build();
6278
HttpGet request = new HttpGet(uri);
63-
if(addCacheHeader) {
64-
request.setHeader(NAME_CACHE, VALUE_CACHE);
79+
80+
if(options.cacheControlHeadersEnabled()) {
81+
request.setHeader(CACHE_CONTROL_HEADER_NAME, CACHE_CONTROL_HEADER_VALUE);
82+
}
83+
84+
if (options.fastlyDebugHeaderEnabled()) {
85+
request.addHeader(HEADER_FASTLY_DEBUG_NAME, HEADER_FASTLY_DEBUG_VALUE);
6586
}
87+
6688
response = _client.execute(request);
89+
options.handleResponseHeaders(Arrays.stream(response.getHeaders())
90+
.collect(Collectors.toMap(Header::getName, Header::getValue)));
6791

6892
int statusCode = response.getCode();
6993

70-
if (statusCode < 200 || statusCode >= 300) {
94+
if (statusCode < HttpStatus.SC_OK || statusCode >= HttpStatus.SC_MULTIPLE_CHOICES) {
95+
_telemetryRuntimeProducer.recordSyncError(ResourceEnum.SEGMENT_SYNC, statusCode);
7196
_log.error("Response status was: " + statusCode);
72-
if (statusCode == 403) {
97+
if (statusCode == HttpStatus.SC_FORBIDDEN) {
7398
_log.error("factory instantiation: you passed a browser type api_key, " +
7499
"please grab an api key from the Split console that is of type sdk");
75100
}
76-
_metrics.count(PREFIX + ".status." + statusCode, 1);
77101
throw new IllegalStateException("Could not retrieve segment changes for " + segmentName + "; http return code " + statusCode);
78102
}
79103

80-
104+
_telemetryRuntimeProducer.recordSuccessfulSync(LastSynchronizationRecordsEnum.SEGMENTS, System.currentTimeMillis());
81105

82106
String json = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
83107
if (_log.isDebugEnabled()) {
@@ -86,11 +110,10 @@ public SegmentChange fetch(String segmentName, long since, boolean addCacheHeade
86110

87111
return Json.fromJson(json, SegmentChange.class);
88112
} catch (Throwable t) {
89-
_metrics.count(PREFIX + ".exception", 1);
90113
throw new IllegalStateException("Problem fetching segmentChanges: " + t.getMessage(), t);
91114
} finally {
115+
_telemetryRuntimeProducer.recordSyncLatency(HTTPLatenciesEnum.SEGMENTS, System.currentTimeMillis()-start);
92116
Utils.forceClose(response);
93-
_metrics.time(PREFIX + ".time", System.currentTimeMillis() - start);
94117
}
95118

96119

0 commit comments

Comments
 (0)