Skip to content

Commit 357af67

Browse files
FIxing PR comments
1 parent 4529f81 commit 357af67

File tree

6 files changed

+14
-21
lines changed

6 files changed

+14
-21
lines changed

client/src/main/java/io/split/engine/common/PushManagerImp.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import io.split.engine.sse.EventSourceClientImp;
99
import io.split.engine.sse.PushStatusTracker;
1010
import io.split.engine.sse.PushStatusTrackerImp;
11+
import io.split.engine.sse.client.SSEClient;
1112
import io.split.engine.sse.dtos.AuthenticationResponse;
1213
import io.split.engine.sse.dtos.SegmentQueueDto;
1314
import io.split.engine.sse.workers.SegmentsWorkerImp;
@@ -87,7 +88,7 @@ public synchronized void start() {
8788

8889
stop();
8990
if (response.isRetry()) {
90-
_pushStatusTracker.forceRetryableError();//retriable error
91+
_pushStatusTracker.handleSseStatus(SSEClient.StatusMessage.RETRYABLE_ERROR);
9192
} else {
9293
_pushStatusTracker.forcePushDisable();
9394
}

client/src/main/java/io/split/engine/sse/EventSourceClientImp.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ private void onMessage(RawEvent event) {
9797
String type = event.event();
9898
String payload = event.data();
9999
if(_firstEvent.compareAndSet(false, true) && !ERROR.equals(type)){
100-
_pushStatusTracker.notifyStreamingReady();
100+
_pushStatusTracker.handleSseStatus(SSEClient.StatusMessage.FIRST_EVENT);
101101
}
102102
if (payload.length() > 0) {
103103
_log.debug(String.format("Payload received: %s", payload));

client/src/main/java/io/split/engine/sse/PushStatusTracker.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,4 @@ public interface PushStatusTracker {
1111
void handleIncomingAblyError(ErrorNotification notification);
1212
void handleSseStatus(SSEClient.StatusMessage newStatus);
1313
void forcePushDisable();
14-
void notifyStreamingReady();
15-
void forceRetryableError();
1614
}

client/src/main/java/io/split/engine/sse/PushStatusTrackerImp.java

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public PushStatusTrackerImp(LinkedBlockingQueue<PushManager.Status> statusMessag
2828
_statusMessages = statusMessages;
2929
}
3030

31-
public synchronized void reset() {
31+
private synchronized void reset() {
3232
_publishersOnline.set(true);
3333
_sseStatus.set(SSEClient.StatusMessage.INITIALIZATION_IN_PROGRESS);
3434
_backendStatus.set(ControlType.STREAMING_RESUMED);
@@ -39,10 +39,12 @@ public void handleSseStatus(SSEClient.StatusMessage newStatus) {
3939
_log.debug(String.format("Current status: %s. New status: %s", _sseStatus.get().toString(), newStatus.toString()));
4040

4141
switch(newStatus) {
42-
case CONNECTED:
43-
if (_sseStatus.compareAndSet(SSEClient.StatusMessage.INITIALIZATION_IN_PROGRESS, SSEClient.StatusMessage.CONNECTED)
44-
|| _sseStatus.compareAndSet(SSEClient.StatusMessage.RETRYABLE_ERROR, SSEClient.StatusMessage.CONNECTED)) {
42+
case FIRST_EVENT:
43+
if (SSEClient.StatusMessage.CONNECTED.equals(_sseStatus.get())) {
44+
_statusMessages.offer(PushManager.Status.STREAMING_READY);
4545
}
46+
case CONNECTED:
47+
_sseStatus.compareAndSet(SSEClient.StatusMessage.INITIALIZATION_IN_PROGRESS, SSEClient.StatusMessage.CONNECTED);
4648
break;
4749
case RETRYABLE_ERROR:
4850
if (_sseStatus.compareAndSet(SSEClient.StatusMessage.CONNECTED, SSEClient.StatusMessage.RETRYABLE_ERROR)) {
@@ -134,16 +136,6 @@ public synchronized void forcePushDisable() {
134136
_backendStatus.set(ControlType.STREAMING_DISABLED);
135137
_statusMessages.offer(PushManager.Status.STREAMING_OFF);
136138
}
137-
138-
@Override
139-
public void notifyStreamingReady() {
140-
_statusMessages.offer(PushManager.Status.STREAMING_READY);
141-
}
142-
143-
@Override
144-
public void forceRetryableError() {
145-
_statusMessages.offer(PushManager.Status.STREAMING_BACKOFF);
146-
}
147139

148140
private boolean isPublishers() {
149141
for(Integer publisher : regions.values()) {

client/src/main/java/io/split/engine/sse/client/SSEClient.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ public enum StatusMessage {
3232
RETRYABLE_ERROR,
3333
NONRETRYABLE_ERROR,
3434
INITIALIZATION_IN_PROGRESS,
35-
FORCED_STOP
35+
FORCED_STOP,
36+
FIRST_EVENT
3637
}
3738

3839
private enum ConnectionState {

client/src/test/java/io/split/engine/common/PushManagerTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import io.split.engine.sse.EventSourceClient;
55
import io.split.engine.sse.PushStatusTracker;
66
import io.split.engine.sse.PushStatusTrackerImp;
7+
import io.split.engine.sse.client.SSEClient;
78
import io.split.engine.sse.dtos.AuthenticationResponse;
89
import io.split.engine.sse.workers.SegmentsWorkerImp;
910
import io.split.engine.sse.workers.SplitsWorker;
@@ -55,7 +56,7 @@ public void startWithPushEnabledShouldConnect() throws InterruptedException {
5556

5657
Thread.sleep(1500);
5758

58-
Mockito.verify(_pushStatusTracker, Mockito.times(0)).forceRetryableError();
59+
Mockito.verify(_pushStatusTracker, Mockito.times(0)).handleSseStatus(SSEClient.StatusMessage.RETRYABLE_ERROR);
5960
Mockito.verify(_pushStatusTracker, Mockito.times(0)).forcePushDisable();
6061
}
6162

@@ -98,6 +99,6 @@ public void startWithPushDisabledAndRetryTrueShouldConnect() throws InterruptedE
9899
Mockito.verify(_eventSourceClient, Mockito.times(1)).stop();
99100

100101
Thread.sleep(1500);
101-
Mockito.verify(_pushStatusTracker, Mockito.times(1)).forceRetryableError();
102+
Mockito.verify(_pushStatusTracker, Mockito.times(1)).handleSseStatus(SSEClient.StatusMessage.RETRYABLE_ERROR);
102103
}
103104
}

0 commit comments

Comments
 (0)