Skip to content

Commit 51e57ec

Browse files
committed
Implement Persistent Subscription API.
1 parent f707d61 commit 51e57ec

19 files changed

+1101
-46
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package com.eventstore.dbclient;
2+
3+
public enum ConsumerStrategy {
4+
DispatchToSingle,
5+
RoundRobin,
6+
Pinned,
7+
}

db-client-java/src/main/java/com/eventstore/dbclient/EventStoreConnection.java

Lines changed: 21 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@ public class EventStoreConnection {
2020
private UserCredentials userCredentials = null;
2121
private NodePreference nodePreference;
2222
private boolean requiresLeader;
23+
private boolean insecure;
2324

24-
public EventStoreConnection(Endpoint endpoint, Endpoint[] gossipSeeds, String domain, SslContext sslContext, UserCredentials userCredentials, NodePreference nodePreference, boolean requiresLeader, Timeouts timeouts) {
25+
public EventStoreConnection(Endpoint endpoint, Endpoint[] gossipSeeds, String domain, SslContext sslContext, UserCredentials userCredentials, NodePreference nodePreference, boolean requiresLeader, boolean insecure, Timeouts timeouts) {
2526
this.endpoint = endpoint;
2627
this.gossipSeeds = gossipSeeds;
2728
this.domain = domain;
@@ -30,17 +31,7 @@ public EventStoreConnection(Endpoint endpoint, Endpoint[] gossipSeeds, String do
3031
this.timeouts = timeouts;
3132
this.nodePreference = nodePreference;
3233
this.requiresLeader = requiresLeader;
33-
34-
if (sslContext == null) {
35-
try {
36-
this.sslContext = GrpcSslContexts.
37-
forClient().
38-
trustManager(InsecureTrustManagerFactory.INSTANCE).
39-
build();
40-
} catch (SSLException e) {
41-
throw new RuntimeException(e);
42-
}
43-
}
34+
this.insecure = insecure;
4435
}
4536

4637
public static EventStoreConnectionBuilder builder() {
@@ -51,10 +42,14 @@ public StreamsClient newStreamsClient() {
5142
return new StreamsClient(createManagedChannel(), userCredentials, requiresLeader, timeouts);
5243
}
5344

45+
public PersistentClient newPersistentClient() {
46+
return new PersistentClient(createManagedChannel(), userCredentials, requiresLeader, timeouts);
47+
}
48+
5449
private ManagedChannel createManagedChannel() {
55-
ManagedChannel channel = null;
5650
List<InetSocketAddress> addresses = null;
5751
String target = domain != null ? domain : "";
52+
NettyChannelBuilder builder = null;
5853

5954
if (gossipSeeds != null) {
6055
addresses = new ArrayList<>();
@@ -72,19 +67,21 @@ private ManagedChannel createManagedChannel() {
7267
.getDefaultRegistry()
7368
.register(new ClusterResolverFactory(addresses, nodePreference, timeouts, sslContext));
7469

75-
channel = NettyChannelBuilder
76-
.forTarget(target)
77-
.userAgent("Event Store Client (Java)")
78-
.sslContext(sslContext)
79-
.build();
70+
builder = NettyChannelBuilder
71+
.forTarget(target);
8072
} else {
81-
channel = NettyChannelBuilder
82-
.forAddress(endpoint.getHostname(), endpoint.getPort())
83-
.userAgent("Event Store Client (Java)")
84-
.sslContext(sslContext)
85-
.build();
73+
builder = NettyChannelBuilder
74+
.forAddress(endpoint.getHostname(), endpoint.getPort());
75+
}
76+
77+
if (insecure) {
78+
builder.usePlaintext();
79+
} else if (sslContext != null) {
80+
builder.sslContext(sslContext);
8681
}
8782

88-
return channel;
83+
return builder
84+
.userAgent("Event Store Client (Java)")
85+
.build();
8986
}
9087
}

db-client-java/src/main/java/com/eventstore/dbclient/EventStoreConnectionBuilder.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ public class EventStoreConnectionBuilder {
88
private SslContext _sslContext = null;
99
private Endpoint endpoint = null;
1010
private boolean requiresLeader = false;
11+
private boolean insecure = false;
1112

1213
public EventStoreConnectionBuilder() {
1314
_timeouts = Timeouts.DEFAULT;
@@ -24,7 +25,7 @@ public EventStoreConnectionBuilder connectionTimeouts(Timeouts timeouts) {
2425
}
2526

2627
public EventStoreConnectionBuilder insecure() {
27-
_sslContext = null;
28+
this.insecure = true;
2829
return this;
2930
}
3031

@@ -47,22 +48,22 @@ public EventStoreConnection createSingleNodeConnection(String hostname, int port
4748
}
4849

4950
public EventStoreConnection createSingleNodeConnection(Endpoint endpoint) {
50-
return new EventStoreConnection(endpoint, null, null, _sslContext, _defaultUserCredentials, NodePreference.RANDOM, requiresLeader, _timeouts);
51+
return new EventStoreConnection(endpoint, null, null, _sslContext, _defaultUserCredentials, NodePreference.RANDOM, requiresLeader, insecure, _timeouts);
5152
}
5253

5354
public EventStoreConnection createClusterConnectionUsingSeeds(Endpoint[] endpoints) {
5455
return createClusterConnectionUsingSeeds(endpoints, NodePreference.RANDOM);
5556
}
5657

5758
public EventStoreConnection createClusterConnectionUsingSeeds(Endpoint[] endpoints, NodePreference nodePreference) {
58-
return new EventStoreConnection(null, endpoints, null, _sslContext, _defaultUserCredentials, nodePreference, requiresLeader, _timeouts);
59+
return new EventStoreConnection(null, endpoints, null, _sslContext, _defaultUserCredentials, nodePreference, requiresLeader, insecure, _timeouts);
5960
}
6061

6162
public EventStoreConnection createClusterConnectionUsingDns(String domain) {
6263
return createClusterConnectionUsingDns(domain, NodePreference.RANDOM);
6364
}
6465

6566
public EventStoreConnection createClusterConnectionUsingDns(String domain, NodePreference nodePreference) {
66-
return new EventStoreConnection(null, null, domain, _sslContext, _defaultUserCredentials, nodePreference, requiresLeader, _timeouts);
67+
return new EventStoreConnection(null, null, domain, _sslContext, _defaultUserCredentials, nodePreference, requiresLeader, insecure, _timeouts);
6768
}
6869
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package com.eventstore.dbclient;
2+
3+
import com.eventstore.dbclient.proto.shared.Shared;
4+
import com.eventstore.dbclient.proto.streams.StreamsOuterClass;
5+
6+
public abstract class ExpectedRevision {
7+
public final static ExpectedRevision ANY = new AnyExpectedRevision();
8+
public final static ExpectedRevision NO_STREAM = new NoStreamExpectedRevision();
9+
public final static ExpectedRevision STREAM_EXISTS = new StreamExistsExpectedRevision();
10+
public static ExpectedRevision expectedRevision(long revision) {
11+
return new SpecificExpectedRevision(revision);
12+
}
13+
14+
abstract public StreamsOuterClass.AppendReq.Options.Builder applyOnWire(StreamsOuterClass.AppendReq.Options.Builder options);
15+
16+
static class NoStreamExpectedRevision extends ExpectedRevision {
17+
@Override
18+
public StreamsOuterClass.AppendReq.Options.Builder applyOnWire(StreamsOuterClass.AppendReq.Options.Builder options) {
19+
return options.setNoStream(Shared.Empty.getDefaultInstance());
20+
}
21+
}
22+
23+
static class AnyExpectedRevision extends ExpectedRevision {
24+
@Override
25+
public StreamsOuterClass.AppendReq.Options.Builder applyOnWire(StreamsOuterClass.AppendReq.Options.Builder options) {
26+
return options.setAny(Shared.Empty.getDefaultInstance());
27+
}
28+
}
29+
30+
static class StreamExistsExpectedRevision extends ExpectedRevision {
31+
@Override
32+
public StreamsOuterClass.AppendReq.Options.Builder applyOnWire(StreamsOuterClass.AppendReq.Options.Builder options) {
33+
return options.setStreamExists(Shared.Empty.getDefaultInstance());
34+
}
35+
}
36+
37+
static class SpecificExpectedRevision extends ExpectedRevision {
38+
final long version;
39+
40+
SpecificExpectedRevision(long version) {
41+
this.version = version;
42+
}
43+
44+
@Override
45+
public StreamsOuterClass.AppendReq.Options.Builder applyOnWire(StreamsOuterClass.AppendReq.Options.Builder options) {
46+
return options.setRevision(version);
47+
}
48+
}
49+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package com.eventstore.dbclient;
2+
3+
public enum NackAction {
4+
Park,
5+
Retry,
6+
Skip,
7+
Stop,
8+
}

0 commit comments

Comments
 (0)