1
1
package io .split .engine .sse .client ;
2
2
3
3
import com .google .common .base .Strings ;
4
+ import com .google .common .util .concurrent .ThreadFactoryBuilder ;
4
5
import org .apache .hc .client5 .http .classic .methods .HttpGet ;
5
- import org .apache .hc .client5 .http .config .RequestConfig ;
6
6
import org .apache .hc .client5 .http .impl .classic .CloseableHttpClient ;
7
7
import org .apache .hc .client5 .http .impl .classic .CloseableHttpResponse ;
8
- import org .apache .hc .client5 .http .impl .classic .HttpClients ;
9
- import org .apache .hc .client5 .http .impl .io .PoolingHttpClientConnectionManager ;
10
- import org .apache .hc .core5 .util .Timeout ;
11
8
import org .slf4j .Logger ;
12
9
import org .slf4j .LoggerFactory ;
13
10
19
16
import java .net .SocketException ;
20
17
import java .net .URI ;
21
18
import java .util .concurrent .CountDownLatch ;
19
+ import java .util .concurrent .ExecutorService ;
20
+ import java .util .concurrent .Executors ;
22
21
import java .util .concurrent .TimeUnit ;
23
22
import java .util .concurrent .atomic .AtomicReference ;
24
23
import java .util .function .Function ;
@@ -45,11 +44,16 @@ private enum ConnectionState {
45
44
private final static long CONNECT_TIMEOUT = 30000 ;
46
45
private static final Logger _log = LoggerFactory .getLogger (SSEClient .class );
47
46
47
+ private final ExecutorService _connectionExecutor = Executors .newSingleThreadExecutor (new ThreadFactoryBuilder ()
48
+ .setDaemon (true )
49
+ .setNameFormat ("SPLIT-SSEConnection-%d" )
50
+ .build ());
48
51
private final CloseableHttpClient _client ;
49
52
private final Function <RawEvent , Void > _eventCallback ;
50
53
private final Function <StatusMessage , Void > _statusCallback ;
51
54
private final AtomicReference <ConnectionState > _state = new AtomicReference <>(ConnectionState .CLOSED );
52
55
private final AtomicReference <CloseableHttpResponse > _ongoingResponse = new AtomicReference <>();
56
+ private final AtomicReference <HttpGet > _ongoingRequest = new AtomicReference <>();
53
57
54
58
public SSEClient (Function <RawEvent , Void > eventCallback ,
55
59
Function <StatusMessage , Void > statusCallback ,
@@ -61,23 +65,21 @@ public SSEClient(Function<RawEvent, Void> eventCallback,
61
65
62
66
public synchronized boolean open (URI uri ) {
63
67
if (isOpen ()) {
64
- _log .debug ("SSEClient already open." );
68
+ _log .info ("SSEClient already open." );
65
69
return false ;
66
70
}
67
71
68
72
_statusCallback .apply (StatusMessage .INITIALIZATION_IN_PROGRESS );
69
73
70
74
CountDownLatch signal = new CountDownLatch (1 );
71
- Thread thread = new Thread (() -> connectAndLoop (uri , signal ));
72
- thread .setDaemon (true );
73
- thread .start ();
75
+ _connectionExecutor .submit (() -> connectAndLoop (uri , signal ));
74
76
try {
75
77
if (!signal .await (CONNECT_TIMEOUT , TimeUnit .SECONDS )) {
76
78
return false ;
77
- };
79
+ }
78
80
} catch (InterruptedException e ) {
79
81
Thread .currentThread ().interrupt ();
80
- _log .debug (e .getMessage ());
82
+ _log .info (e .getMessage ());
81
83
return false ;
82
84
}
83
85
return isOpen ();
@@ -91,9 +93,10 @@ public synchronized void close() {
91
93
if (_state .compareAndSet (ConnectionState .OPEN , ConnectionState .CLOSED )) {
92
94
if (_ongoingResponse .get () != null ) {
93
95
try {
96
+ _ongoingRequest .get ().abort ();
94
97
_ongoingResponse .get ().close ();
95
98
} catch (IOException e ) {
96
- _log .debug (String .format ("Error closing SSEClient: %s" , e .getMessage ()));
99
+ _log .info (String .format ("Error closing SSEClient: %s" , e .getMessage ()));
97
100
}
98
101
}
99
102
}
@@ -124,7 +127,7 @@ private void connectAndLoop(URI uri, CountDownLatch signal) {
124
127
_statusCallback .apply (StatusMessage .RETRYABLE_ERROR );
125
128
return ;
126
129
} catch (IOException exc ) { // Other type of connection error
127
- _log .debug ( exc .getMessage ());
130
+ _log .info ( String . format ( "SSE connection ended abruptly: %s. Retying" , exc .getMessage () ));
128
131
_statusCallback .apply (StatusMessage .RETRYABLE_ERROR );
129
132
return ;
130
133
}
@@ -145,17 +148,18 @@ private void connectAndLoop(URI uri, CountDownLatch signal) {
145
148
}
146
149
147
150
private boolean establishConnection (URI uri , CountDownLatch signal ) {
148
- HttpGet request = new HttpGet (uri );
151
+
152
+ _ongoingRequest .set (new HttpGet (uri ));
149
153
150
154
try {
151
- _ongoingResponse .set (_client .execute (request ));
155
+ _ongoingResponse .set (_client .execute (_ongoingRequest . get () ));
152
156
if (_ongoingResponse .get ().getCode () != 200 ) {
153
157
return false ;
154
158
}
155
159
_state .set (ConnectionState .OPEN );
156
160
_statusCallback .apply (StatusMessage .CONNECTED );
157
161
} catch (IOException exc ) {
158
- _log .debug (String .format ("Error establishConnection: %s" , exc ));
162
+ _log .error (String .format ("Error establishConnection: %s" , exc ));
159
163
return false ;
160
164
} finally {
161
165
signal .countDown ();
0 commit comments