Skip to content

Implement Persistent Subscription API. #35

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.eventstore.dbclient;

public enum ConsumerStrategy {
DispatchToSingle,
RoundRobin,
Pinned,
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ public class EventStoreConnection {
private UserCredentials userCredentials = null;
private NodePreference nodePreference;
private boolean requiresLeader;
private boolean insecure;

public EventStoreConnection(Endpoint endpoint, Endpoint[] gossipSeeds, String domain, SslContext sslContext, UserCredentials userCredentials, NodePreference nodePreference, boolean requiresLeader, Timeouts timeouts) {
public EventStoreConnection(Endpoint endpoint, Endpoint[] gossipSeeds, String domain, SslContext sslContext, UserCredentials userCredentials, NodePreference nodePreference, boolean requiresLeader, boolean insecure, Timeouts timeouts) {
this.endpoint = endpoint;
this.gossipSeeds = gossipSeeds;
this.domain = domain;
Expand All @@ -30,17 +31,7 @@ public EventStoreConnection(Endpoint endpoint, Endpoint[] gossipSeeds, String do
this.timeouts = timeouts;
this.nodePreference = nodePreference;
this.requiresLeader = requiresLeader;

if (sslContext == null) {
try {
this.sslContext = GrpcSslContexts.
forClient().
trustManager(InsecureTrustManagerFactory.INSTANCE).
build();
} catch (SSLException e) {
throw new RuntimeException(e);
}
}
this.insecure = insecure;
}

public static EventStoreConnectionBuilder builder() {
Expand All @@ -51,10 +42,14 @@ public StreamsClient newStreamsClient() {
return new StreamsClient(createManagedChannel(), userCredentials, requiresLeader, timeouts);
}

public PersistentClient newPersistentClient() {
return new PersistentClient(createManagedChannel(), userCredentials, requiresLeader, timeouts);
}

private ManagedChannel createManagedChannel() {
ManagedChannel channel = null;
List<InetSocketAddress> addresses = null;
String target = domain != null ? domain : "";
NettyChannelBuilder builder = null;

if (gossipSeeds != null) {
addresses = new ArrayList<>();
Expand All @@ -72,19 +67,21 @@ private ManagedChannel createManagedChannel() {
.getDefaultRegistry()
.register(new ClusterResolverFactory(addresses, nodePreference, timeouts, sslContext));

channel = NettyChannelBuilder
.forTarget(target)
.userAgent("Event Store Client (Java)")
.sslContext(sslContext)
.build();
builder = NettyChannelBuilder
.forTarget(target);
} else {
channel = NettyChannelBuilder
.forAddress(endpoint.getHostname(), endpoint.getPort())
.userAgent("Event Store Client (Java)")
.sslContext(sslContext)
.build();
builder = NettyChannelBuilder
.forAddress(endpoint.getHostname(), endpoint.getPort());
}

if (insecure) {
builder.usePlaintext();
} else if (sslContext != null) {
builder.sslContext(sslContext);
}

return channel;
return builder
.userAgent("Event Store Client (Java)")
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ public class EventStoreConnectionBuilder {
private SslContext _sslContext = null;
private Endpoint endpoint = null;
private boolean requiresLeader = false;
private boolean insecure = false;

public EventStoreConnectionBuilder() {
_timeouts = Timeouts.DEFAULT;
Expand All @@ -24,7 +25,7 @@ public EventStoreConnectionBuilder connectionTimeouts(Timeouts timeouts) {
}

public EventStoreConnectionBuilder insecure() {
_sslContext = null;
this.insecure = true;
return this;
}

Expand All @@ -47,22 +48,22 @@ public EventStoreConnection createSingleNodeConnection(String hostname, int port
}

public EventStoreConnection createSingleNodeConnection(Endpoint endpoint) {
return new EventStoreConnection(endpoint, null, null, _sslContext, _defaultUserCredentials, NodePreference.RANDOM, requiresLeader, _timeouts);
return new EventStoreConnection(endpoint, null, null, _sslContext, _defaultUserCredentials, NodePreference.RANDOM, requiresLeader, insecure, _timeouts);
}

public EventStoreConnection createClusterConnectionUsingSeeds(Endpoint[] endpoints) {
return createClusterConnectionUsingSeeds(endpoints, NodePreference.RANDOM);
}

public EventStoreConnection createClusterConnectionUsingSeeds(Endpoint[] endpoints, NodePreference nodePreference) {
return new EventStoreConnection(null, endpoints, null, _sslContext, _defaultUserCredentials, nodePreference, requiresLeader, _timeouts);
return new EventStoreConnection(null, endpoints, null, _sslContext, _defaultUserCredentials, nodePreference, requiresLeader, insecure, _timeouts);
}

public EventStoreConnection createClusterConnectionUsingDns(String domain) {
return createClusterConnectionUsingDns(domain, NodePreference.RANDOM);
}

public EventStoreConnection createClusterConnectionUsingDns(String domain, NodePreference nodePreference) {
return new EventStoreConnection(null, null, domain, _sslContext, _defaultUserCredentials, nodePreference, requiresLeader, _timeouts);
return new EventStoreConnection(null, null, domain, _sslContext, _defaultUserCredentials, nodePreference, requiresLeader, insecure, _timeouts);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.eventstore.dbclient;

import com.eventstore.dbclient.proto.shared.Shared;
import com.eventstore.dbclient.proto.streams.StreamsOuterClass;

public abstract class ExpectedRevision {
public final static ExpectedRevision ANY = new AnyExpectedRevision();
public final static ExpectedRevision NO_STREAM = new NoStreamExpectedRevision();
public final static ExpectedRevision STREAM_EXISTS = new StreamExistsExpectedRevision();
public static ExpectedRevision expectedRevision(long revision) {
return new SpecificExpectedRevision(revision);
}

abstract public StreamsOuterClass.AppendReq.Options.Builder applyOnWire(StreamsOuterClass.AppendReq.Options.Builder options);

static class NoStreamExpectedRevision extends ExpectedRevision {
@Override
public StreamsOuterClass.AppendReq.Options.Builder applyOnWire(StreamsOuterClass.AppendReq.Options.Builder options) {
return options.setNoStream(Shared.Empty.getDefaultInstance());
}
}

static class AnyExpectedRevision extends ExpectedRevision {
@Override
public StreamsOuterClass.AppendReq.Options.Builder applyOnWire(StreamsOuterClass.AppendReq.Options.Builder options) {
return options.setAny(Shared.Empty.getDefaultInstance());
}
}

static class StreamExistsExpectedRevision extends ExpectedRevision {
@Override
public StreamsOuterClass.AppendReq.Options.Builder applyOnWire(StreamsOuterClass.AppendReq.Options.Builder options) {
return options.setStreamExists(Shared.Empty.getDefaultInstance());
}
}

static class SpecificExpectedRevision extends ExpectedRevision {
final long version;

SpecificExpectedRevision(long version) {
this.version = version;
}

@Override
public StreamsOuterClass.AppendReq.Options.Builder applyOnWire(StreamsOuterClass.AppendReq.Options.Builder options) {
return options.setRevision(version);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.eventstore.dbclient;

public enum NackAction {
Park,
Retry,
Skip,
Stop,
}
Loading