Skip to content

Commit f03c6ed

Browse files
committed
Implement Persistent Subscription API.
1 parent 08d5c05 commit f03c6ed

9 files changed

+744
-0
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package com.eventstore.dbclient;
2+
3+
public enum ConsumerStrategy {
4+
DispatchToSingle,
5+
RoundRobin,
6+
Pinned,
7+
}
Lines changed: 284 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,284 @@
1+
package com.eventstore.dbclient;
2+
3+
import com.eventstore.dbclient.proto.persistentsubscriptions.Persistent;
4+
import com.eventstore.dbclient.proto.persistentsubscriptions.PersistentSubscriptionsGrpc;
5+
import com.eventstore.dbclient.proto.shared.Shared;
6+
import com.google.protobuf.ByteString;
7+
import io.grpc.ManagedChannel;
8+
import io.grpc.Metadata;
9+
import io.grpc.Status;
10+
import io.grpc.StatusRuntimeException;
11+
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
12+
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
13+
import io.grpc.stub.ClientCallStreamObserver;
14+
import io.grpc.stub.ClientResponseObserver;
15+
import io.grpc.stub.MetadataUtils;
16+
import io.grpc.stub.StreamObserver;
17+
18+
import javax.validation.constraints.NotNull;
19+
import java.util.concurrent.CompletableFuture;
20+
import java.util.function.Function;
21+
22+
public class PersistentClient {
23+
private static final Persistent.ReadReq.Options.Builder defaultReadOptions;
24+
25+
private final ManagedChannel _channel;
26+
private final PersistentSubscriptionsGrpc.PersistentSubscriptionsStub _stub;
27+
private final Timeouts _timeouts;
28+
29+
static {
30+
defaultReadOptions = Persistent.ReadReq.Options.newBuilder()
31+
.setUuidOption(Persistent.ReadReq.Options.UUIDOption.newBuilder()
32+
.setStructured(Shared.Empty.getDefaultInstance()));
33+
}
34+
35+
public PersistentClient(String host, int port, UserCredentials defaultCredentials, Timeouts timeouts, SslContext sslContext) {
36+
this(NettyChannelBuilder.forAddress(host, port)
37+
.userAgent("Event Store Client (Java) v1.0.0-SNAPSHOT")
38+
.sslContext(sslContext)
39+
.build(), defaultCredentials, timeouts);
40+
}
41+
42+
public PersistentClient(
43+
@NotNull ManagedChannel channel,
44+
@NotNull UserCredentials credentials,
45+
@NotNull Timeouts timeouts) {
46+
_channel = channel;
47+
_timeouts = timeouts;
48+
49+
Metadata headers = new Metadata();
50+
headers.put(Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER), credentials.basicAuthHeader());
51+
52+
_stub = MetadataUtils.attachHeaders(PersistentSubscriptionsGrpc.newStub(_channel), headers);
53+
}
54+
55+
public void shutdown() throws InterruptedException {
56+
_channel.shutdown().awaitTermination(_timeouts.shutdownTimeout, _timeouts.shutdownTimeoutUnit);
57+
}
58+
59+
public CompletableFuture create(PersistentSubscriptionSettings settings, String stream, String group) {
60+
CompletableFuture result = new CompletableFuture();
61+
Persistent.CreateReq.Options.Builder builder = Persistent.CreateReq.Options.newBuilder();
62+
Persistent.CreateReq.Settings.Builder settingsBuilder = Persistent.CreateReq.Settings.newBuilder();
63+
Shared.StreamIdentifier.Builder streamIdentifierBuilder = Shared.StreamIdentifier.newBuilder();
64+
65+
settingsBuilder.setRevision(settings.getRevision())
66+
.setResolveLinks(settings.isResolveLinks())
67+
.setReadBatchSize(settings.getReadBatchSize())
68+
.setMinCheckpointCount(settings.getMinCheckpointCount())
69+
.setMaxCheckpointCount(settings.getMaxCheckpointCount())
70+
.setMessageTimeoutMs(settings.getMessageTimeoutMs())
71+
.setMaxSubscriberCount(settings.getMaxSubscriberCount())
72+
.setMaxRetryCount(settings.getMaxRetryCount())
73+
.setLiveBufferSize(settings.getLiveBufferSize())
74+
.setHistoryBufferSize(settings.getHistoryBufferSize())
75+
.setExtraStatistics(settings.isExtraStatistics())
76+
.setCheckpointAfterMs(settings.getCheckpointAfterMs());
77+
78+
switch (settings.getStrategy()) {
79+
case DispatchToSingle:
80+
settingsBuilder.setNamedConsumerStrategy(Persistent.CreateReq.ConsumerStrategy.DispatchToSingle);
81+
break;
82+
case RoundRobin:
83+
settingsBuilder.setNamedConsumerStrategy(Persistent.CreateReq.ConsumerStrategy.RoundRobin);
84+
break;
85+
case Pinned:
86+
settingsBuilder.setNamedConsumerStrategy(Persistent.CreateReq.ConsumerStrategy.Pinned);
87+
break;
88+
}
89+
90+
streamIdentifierBuilder.setStreamName(ByteString.copyFromUtf8(stream));
91+
92+
builder.setSettings(settingsBuilder)
93+
.setGroupName(group)
94+
.setStreamIdentifier(streamIdentifierBuilder)
95+
.build();
96+
97+
Persistent.CreateReq req = Persistent.CreateReq.newBuilder()
98+
.setOptions(builder)
99+
.build();
100+
101+
_stub.create(req, convertSingleResponse(result, resp -> {
102+
return resp;
103+
}));
104+
105+
return result;
106+
}
107+
108+
public CompletableFuture update(PersistentSubscriptionSettings settings, String stream, String group) {
109+
CompletableFuture result = new CompletableFuture();
110+
Persistent.UpdateReq.Options.Builder builder = Persistent.UpdateReq.Options.newBuilder();
111+
Persistent.UpdateReq.Settings.Builder settingsBuilder = Persistent.UpdateReq.Settings.newBuilder();
112+
Shared.StreamIdentifier.Builder streamIdentifierBuilder = Shared.StreamIdentifier.newBuilder();
113+
114+
settingsBuilder.setRevision(settings.getRevision())
115+
.setResolveLinks(settings.isResolveLinks())
116+
.setReadBatchSize(settings.getReadBatchSize())
117+
.setMinCheckpointCount(settings.getMinCheckpointCount())
118+
.setMaxCheckpointCount(settings.getMaxCheckpointCount())
119+
.setMessageTimeoutMs(settings.getMessageTimeoutMs())
120+
.setMaxSubscriberCount(settings.getMaxSubscriberCount())
121+
.setMaxRetryCount(settings.getMaxRetryCount())
122+
.setLiveBufferSize(settings.getLiveBufferSize())
123+
.setHistoryBufferSize(settings.getHistoryBufferSize())
124+
.setExtraStatistics(settings.isExtraStatistics())
125+
.setCheckpointAfterMs(settings.getCheckpointAfterMs());
126+
127+
switch (settings.getStrategy()) {
128+
case DispatchToSingle:
129+
settingsBuilder.setNamedConsumerStrategy(Persistent.UpdateReq.ConsumerStrategy.DispatchToSingle);
130+
break;
131+
case RoundRobin:
132+
settingsBuilder.setNamedConsumerStrategy(Persistent.UpdateReq.ConsumerStrategy.RoundRobin);
133+
break;
134+
case Pinned:
135+
settingsBuilder.setNamedConsumerStrategy(Persistent.UpdateReq.ConsumerStrategy.Pinned);
136+
break;
137+
}
138+
139+
streamIdentifierBuilder.setStreamName(ByteString.copyFromUtf8(stream));
140+
141+
builder.setSettings(settingsBuilder)
142+
.setGroupName(group)
143+
.setStreamIdentifier(streamIdentifierBuilder)
144+
.build();
145+
146+
Persistent.UpdateReq req = Persistent.UpdateReq.newBuilder()
147+
.setOptions(builder)
148+
.build();
149+
150+
_stub.update(req, convertSingleResponse(result, resp -> {
151+
return resp;
152+
}));
153+
154+
return result;
155+
}
156+
157+
public CompletableFuture delete(String stream, String group) {
158+
CompletableFuture result = new CompletableFuture();
159+
160+
Shared.StreamIdentifier streamIdentifier =
161+
Shared.StreamIdentifier.newBuilder()
162+
.setStreamName(ByteString.copyFromUtf8(stream))
163+
.build();
164+
165+
Persistent.DeleteReq.Options options = Persistent.DeleteReq.Options.newBuilder()
166+
.setStreamIdentifier(streamIdentifier)
167+
.setGroupName(group)
168+
.build();
169+
170+
Persistent.DeleteReq req = Persistent.DeleteReq.newBuilder()
171+
.setOptions(options)
172+
.build();
173+
174+
_stub.delete(req, convertSingleResponse(result, resp -> resp));
175+
176+
return result;
177+
}
178+
179+
public CompletableFuture<PersistentSubscription> connect(String stream, String group, int bufferSize, PersistentSubscriptionListener listener) {
180+
final CompletableFuture<PersistentSubscription> result = new CompletableFuture<>();
181+
182+
Shared.StreamIdentifier streamIdentifier =
183+
Shared.StreamIdentifier.newBuilder()
184+
.setStreamName(ByteString.copyFromUtf8(stream))
185+
.build();
186+
187+
Persistent.ReadReq.Options options = defaultReadOptions.clone()
188+
.setBufferSize(bufferSize)
189+
.setStreamIdentifier(streamIdentifier)
190+
.setGroupName(group)
191+
.build();
192+
193+
Persistent.ReadReq req = Persistent.ReadReq.newBuilder()
194+
.setOptions(options)
195+
.build();
196+
197+
ClientResponseObserver<Persistent.ReadReq, Persistent.ReadResp> observer = new ClientResponseObserver<Persistent.ReadReq, Persistent.ReadResp>() {
198+
private boolean _confirmed;
199+
private PersistentSubscription _subscription;
200+
private ClientCallStreamObserver<Persistent.ReadReq> _requestStream;
201+
202+
@Override
203+
public void beforeStart(ClientCallStreamObserver<Persistent.ReadReq> requestStream) {
204+
this._requestStream = requestStream;
205+
}
206+
207+
@Override
208+
public void onNext(Persistent.ReadResp readResp) {
209+
if (!_confirmed && readResp.hasSubscriptionConfirmation()) {
210+
this._confirmed = true;
211+
this._subscription = new PersistentSubscription(this._requestStream, readResp.getSubscriptionConfirmation().getSubscriptionId());
212+
result.complete(this._subscription);
213+
return;
214+
}
215+
216+
if (!_confirmed && readResp.hasEvent()) {
217+
onError(new IllegalStateException("Unconfirmed persistent subscription received event"));
218+
return;
219+
}
220+
221+
if (_confirmed && !readResp.hasEvent()) {
222+
onError(new IllegalStateException(
223+
String.format("Confirmed persistent subscription %s received non-{event,checkpoint} variant",
224+
_subscription.getSubscriptionId())));
225+
return;
226+
}
227+
228+
listener.onEvent(this._subscription, ResolvedEvent.fromWire(readResp.getEvent()));
229+
}
230+
231+
@Override
232+
public void onError(Throwable t) {
233+
if (t instanceof StatusRuntimeException) {
234+
Status s = ((StatusRuntimeException) t).getStatus();
235+
if (s.getCode() == Status.Code.CANCELLED) {
236+
listener.onCancelled(this._subscription);
237+
return;
238+
}
239+
}
240+
241+
listener.onError(this._subscription, t);
242+
}
243+
244+
@Override
245+
public void onCompleted() {
246+
// Subscriptions should only complete on error.
247+
}
248+
};
249+
250+
StreamObserver<Persistent.ReadReq> wireStream = _stub.read(observer);
251+
wireStream.onNext(req);
252+
253+
return result;
254+
}
255+
256+
private <ReqT, RespT, TargetT> ClientResponseObserver<ReqT, RespT> convertSingleResponse(
257+
CompletableFuture<TargetT> dest, Function<RespT, TargetT> converter) {
258+
return new ClientResponseObserver<ReqT, RespT>() {
259+
@Override
260+
public void beforeStart(ClientCallStreamObserver<ReqT> requestStream) {
261+
}
262+
263+
@Override
264+
public void onNext(RespT value) {
265+
try {
266+
TargetT converted = converter.apply(value);
267+
dest.complete(converted);
268+
} catch (Throwable e) {
269+
dest.completeExceptionally(e);
270+
}
271+
}
272+
273+
@Override
274+
public void onError(Throwable t) {
275+
dest.completeExceptionally(t);
276+
}
277+
278+
@Override
279+
public void onCompleted() {
280+
}
281+
};
282+
}
283+
}
284+
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package com.eventstore.dbclient;
2+
3+
import com.eventstore.dbclient.proto.persistentsubscriptions.Persistent;
4+
import com.eventstore.dbclient.proto.streams.StreamsOuterClass;
5+
import io.grpc.stub.ClientCallStreamObserver;
6+
7+
public class PersistentSubscription {
8+
private final ClientCallStreamObserver<Persistent.ReadReq> _requestStream;
9+
private final String _subscriptionId;
10+
11+
public PersistentSubscription(ClientCallStreamObserver<Persistent.ReadReq> requestStream, String subscriptionId) {
12+
this._requestStream = requestStream;
13+
this._subscriptionId = subscriptionId;
14+
}
15+
16+
public String getSubscriptionId() {
17+
return _subscriptionId;
18+
}
19+
20+
public void stop() {
21+
this._requestStream.cancel("user-initiated", null);
22+
}
23+
24+
}
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package com.eventstore.dbclient;
2+
3+
public abstract class PersistentSubscriptionListener {
4+
public void onEvent(PersistentSubscription subscription, ResolvedEvent event) {
5+
}
6+
7+
public void onError(PersistentSubscription subscription, Throwable throwable) {
8+
}
9+
10+
public void onCancelled(PersistentSubscription subscription) {
11+
}
12+
}

0 commit comments

Comments
 (0)