Skip to content

Commit c43f1f9

Browse files
PR Comments
1 parent bc42070 commit c43f1f9

File tree

4 files changed

+42
-16
lines changed

4 files changed

+42
-16
lines changed

client/src/main/java/io/split/engine/common/SyncManagerImp.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -157,13 +157,13 @@ private void startStreamingMode() {
157157
_pushStatusMonitorTask = _executorService.submit(this::incomingPushStatusHandler);
158158
}
159159
_pushManager.start();
160-
_telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SYNC_MODE_UPDATE.getType(), StreamEventsEnum.StreamEventsValues.STREAMING_EVENT.getValue(), System.currentTimeMillis()));
160+
_telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SYNC_MODE_UPDATE.getType(), StreamEventsEnum.SyncModeUpdateValues.STREAMING_EVENT.getValue(), System.currentTimeMillis()));
161161
}
162162

163163
private void startPollingMode() {
164164
_log.debug("Starting in polling mode ...");
165165
_synchronizer.startPeriodicFetching();
166-
_telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SYNC_MODE_UPDATE.getType(), StreamEventsEnum.StreamEventsValues.POLLING_EVENT.getValue(), System.currentTimeMillis()));
166+
_telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SYNC_MODE_UPDATE.getType(), StreamEventsEnum.SyncModeUpdateValues.POLLING_EVENT.getValue(), System.currentTimeMillis()));
167167
}
168168

169169
@VisibleForTesting
@@ -179,7 +179,7 @@ private void startPollingMode() {
179179
_pushManager.startWorkers();
180180
_pushManager.scheduleConnectionReset();
181181
_backoff.reset();
182-
_telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.STREAMING_STATUS.getType(), StreamEventsEnum.StreamEventsValues.STREAMING_ENABLED.getValue(), System.currentTimeMillis()));
182+
_telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.STREAMING_STATUS.getType(), StreamEventsEnum.StreamingStatusValues.STREAMING_ENABLED.getValue(), System.currentTimeMillis()));
183183
_log.info("Streaming up and running.");
184184
break;
185185
case STREAMING_DOWN:

client/src/main/java/io/split/engine/sse/PushStatusTrackerImp.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -96,14 +96,14 @@ public void handleIncomingControlEvent(ControlNotification controlNotification)
9696
}
9797
break;
9898
case STREAMING_PAUSED:
99-
_telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.STREAMING_STATUS.getType(), StreamEventsEnum.StreamEventsValues.STREAMING_PAUSED.getValue(), System.currentTimeMillis()));
99+
_telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.STREAMING_STATUS.getType(), StreamEventsEnum.StreamingStatusValues.STREAMING_PAUSED.getValue(), System.currentTimeMillis()));
100100
if (_backendStatus.compareAndSet(ControlType.STREAMING_RESUMED, ControlType.STREAMING_PAUSED) && _publishersOnline.get()) {
101101
// If there are no publishers online, the STREAMING_DOWN message should have already been sent
102102
_statusMessages.offer(PushManager.Status.STREAMING_DOWN);
103103
}
104104
break;
105105
case STREAMING_DISABLED:
106-
_telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.STREAMING_STATUS.getType(), StreamEventsEnum.StreamEventsValues.STREAMING_DISABLED.getValue(), System.currentTimeMillis()));
106+
_telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.STREAMING_STATUS.getType(), StreamEventsEnum.StreamingStatusValues.STREAMING_DISABLED.getValue(), System.currentTimeMillis()));
107107
_backendStatus.set(ControlType.STREAMING_DISABLED);
108108
_statusMessages.offer(PushManager.Status.STREAMING_OFF);
109109
break;

client/src/main/java/io/split/engine/sse/client/SSEClient.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -133,27 +133,27 @@ private void connectAndLoop(URI uri, CountDownLatch signal) {
133133
_log.debug(exc.getMessage());
134134
if (SOCKET_CLOSED_MESSAGE.equals(exc.getMessage())) { // Connection closed by us
135135
_statusCallback.apply(StatusMessage.FORCED_STOP);
136-
_telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SSE_CONNECTION_ERROR.getType(), StreamEventsEnum.StreamEventsValues.REQUESTED_CONNECTION_ERROR.getValue(), System.currentTimeMillis()));
136+
_telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SSE_CONNECTION_ERROR.getType(), StreamEventsEnum.SseConnectionErrorValues.REQUESTED_CONNECTION_ERROR.getValue(), System.currentTimeMillis()));
137137
return;
138138
}
139139
// Connection closed by server
140140
_statusCallback.apply(StatusMessage.RETRYABLE_ERROR);
141-
_telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SSE_CONNECTION_ERROR.getType(), StreamEventsEnum.StreamEventsValues.NON_REQUESTED_CONNECTION_ERROR.getValue(), System.currentTimeMillis()));
141+
_telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SSE_CONNECTION_ERROR.getType(), StreamEventsEnum.SseConnectionErrorValues.NON_REQUESTED_CONNECTION_ERROR.getValue(), System.currentTimeMillis()));
142142
return;
143143
} catch (IOException exc) { // Other type of connection error
144144
if(!_forcedStop.get()) {
145145
_log.debug(String.format("SSE connection ended abruptly: %s. Retying", exc.getMessage()));
146-
_telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SSE_CONNECTION_ERROR.getType(), StreamEventsEnum.StreamEventsValues.REQUESTED_CONNECTION_ERROR.getValue(), System.currentTimeMillis()));
146+
_telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SSE_CONNECTION_ERROR.getType(), StreamEventsEnum.SseConnectionErrorValues.REQUESTED_CONNECTION_ERROR.getValue(), System.currentTimeMillis()));
147147
_statusCallback.apply(StatusMessage.RETRYABLE_ERROR);
148148
return;
149149
}
150150

151-
_telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SSE_CONNECTION_ERROR.getType(), StreamEventsEnum.StreamEventsValues.NON_REQUESTED_CONNECTION_ERROR.getValue(), System.currentTimeMillis()));
151+
_telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SSE_CONNECTION_ERROR.getType(), StreamEventsEnum.SseConnectionErrorValues.NON_REQUESTED_CONNECTION_ERROR.getValue(), System.currentTimeMillis()));
152152
}
153153
}
154154
} catch (Exception e) { // Any other error non related to the connection disables streaming altogether
155155

156-
_telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SSE_CONNECTION_ERROR.getType(), StreamEventsEnum.StreamEventsValues.NON_REQUESTED_CONNECTION_ERROR.getValue(), System.currentTimeMillis()));
156+
_telemetryRuntimeProducer.recordStreamingEvents(new StreamingEvent(StreamEventsEnum.SSE_CONNECTION_ERROR.getType(), StreamEventsEnum.SseConnectionErrorValues.NON_REQUESTED_CONNECTION_ERROR.getValue(), System.currentTimeMillis()));
157157
_log.warn(e.getMessage(), e);
158158
_statusCallback.apply(StatusMessage.NONRETRYABLE_ERROR);
159159
} finally {

client/src/main/java/io/split/telemetry/domain/enums/StreamEventsEnum.java

+32-6
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,44 @@ public int getType() {
2121
return _type;
2222
}
2323

24-
public enum StreamEventsValues {
24+
public enum StreamingStatusValues {
2525
STREAMING_DISABLED(0),
2626
STREAMING_PAUSED(2),
27-
STREAMING_EVENT(0),
28-
POLLING_EVENT(1),
29-
REQUESTED_CONNECTION_ERROR(0),
30-
NON_REQUESTED_CONNECTION_ERROR (1),
3127
STREAMING_ENABLED(1);
3228

3329
private long _value;
3430

35-
StreamEventsValues(long value) {
31+
StreamingStatusValues(long value) {
32+
_value = value;
33+
}
34+
35+
public long getValue() {
36+
return _value;
37+
}
38+
}
39+
40+
public enum SseConnectionErrorValues {
41+
REQUESTED_CONNECTION_ERROR(0),
42+
NON_REQUESTED_CONNECTION_ERROR (1);
43+
44+
private long _value;
45+
46+
SseConnectionErrorValues(long value) {
47+
_value = value;
48+
}
49+
50+
public long getValue() {
51+
return _value;
52+
}
53+
}
54+
55+
public enum SyncModeUpdateValues {
56+
STREAMING_EVENT(0),
57+
POLLING_EVENT(1);
58+
59+
private long _value;
60+
61+
SyncModeUpdateValues(long value) {
3662
_value = value;
3763
}
3864

0 commit comments

Comments
 (0)