diff --git a/db-client-java/src/main/java/com/eventstore/dbclient/ConsumerStrategy.java b/db-client-java/src/main/java/com/eventstore/dbclient/ConsumerStrategy.java new file mode 100644 index 00000000..6c92b838 --- /dev/null +++ b/db-client-java/src/main/java/com/eventstore/dbclient/ConsumerStrategy.java @@ -0,0 +1,7 @@ +package com.eventstore.dbclient; + +public enum ConsumerStrategy { + DispatchToSingle, + RoundRobin, + Pinned, +} diff --git a/db-client-java/src/main/java/com/eventstore/dbclient/EventStoreConnection.java b/db-client-java/src/main/java/com/eventstore/dbclient/EventStoreConnection.java index ef1635aa..7feabf85 100644 --- a/db-client-java/src/main/java/com/eventstore/dbclient/EventStoreConnection.java +++ b/db-client-java/src/main/java/com/eventstore/dbclient/EventStoreConnection.java @@ -20,8 +20,9 @@ public class EventStoreConnection { private UserCredentials userCredentials = null; private NodePreference nodePreference; private boolean requiresLeader; + private boolean insecure; - public EventStoreConnection(Endpoint endpoint, Endpoint[] gossipSeeds, String domain, SslContext sslContext, UserCredentials userCredentials, NodePreference nodePreference, boolean requiresLeader, Timeouts timeouts) { + public EventStoreConnection(Endpoint endpoint, Endpoint[] gossipSeeds, String domain, SslContext sslContext, UserCredentials userCredentials, NodePreference nodePreference, boolean requiresLeader, boolean insecure, Timeouts timeouts) { this.endpoint = endpoint; this.gossipSeeds = gossipSeeds; this.domain = domain; @@ -30,17 +31,7 @@ public EventStoreConnection(Endpoint endpoint, Endpoint[] gossipSeeds, String do this.timeouts = timeouts; this.nodePreference = nodePreference; this.requiresLeader = requiresLeader; - - if (sslContext == null) { - try { - this.sslContext = GrpcSslContexts. - forClient(). - trustManager(InsecureTrustManagerFactory.INSTANCE). - build(); - } catch (SSLException e) { - throw new RuntimeException(e); - } - } + this.insecure = insecure; } public static EventStoreConnectionBuilder builder() { @@ -51,10 +42,14 @@ public StreamsClient newStreamsClient() { return new StreamsClient(createManagedChannel(), userCredentials, requiresLeader, timeouts); } + public PersistentClient newPersistentClient() { + return new PersistentClient(createManagedChannel(), userCredentials, requiresLeader, timeouts); + } + private ManagedChannel createManagedChannel() { - ManagedChannel channel = null; List addresses = null; String target = domain != null ? domain : ""; + NettyChannelBuilder builder = null; if (gossipSeeds != null) { addresses = new ArrayList<>(); @@ -72,19 +67,21 @@ private ManagedChannel createManagedChannel() { .getDefaultRegistry() .register(new ClusterResolverFactory(addresses, nodePreference, timeouts, sslContext)); - channel = NettyChannelBuilder - .forTarget(target) - .userAgent("Event Store Client (Java)") - .sslContext(sslContext) - .build(); + builder = NettyChannelBuilder + .forTarget(target); } else { - channel = NettyChannelBuilder - .forAddress(endpoint.getHostname(), endpoint.getPort()) - .userAgent("Event Store Client (Java)") - .sslContext(sslContext) - .build(); + builder = NettyChannelBuilder + .forAddress(endpoint.getHostname(), endpoint.getPort()); + } + + if (insecure) { + builder.usePlaintext(); + } else if (sslContext != null) { + builder.sslContext(sslContext); } - return channel; + return builder + .userAgent("Event Store Client (Java)") + .build(); } } diff --git a/db-client-java/src/main/java/com/eventstore/dbclient/EventStoreConnectionBuilder.java b/db-client-java/src/main/java/com/eventstore/dbclient/EventStoreConnectionBuilder.java index 41867130..4c2e358a 100644 --- a/db-client-java/src/main/java/com/eventstore/dbclient/EventStoreConnectionBuilder.java +++ b/db-client-java/src/main/java/com/eventstore/dbclient/EventStoreConnectionBuilder.java @@ -8,6 +8,7 @@ public class EventStoreConnectionBuilder { private SslContext _sslContext = null; private Endpoint endpoint = null; private boolean requiresLeader = false; + private boolean insecure = false; public EventStoreConnectionBuilder() { _timeouts = Timeouts.DEFAULT; @@ -24,7 +25,7 @@ public EventStoreConnectionBuilder connectionTimeouts(Timeouts timeouts) { } public EventStoreConnectionBuilder insecure() { - _sslContext = null; + this.insecure = true; return this; } @@ -47,7 +48,7 @@ public EventStoreConnection createSingleNodeConnection(String hostname, int port } public EventStoreConnection createSingleNodeConnection(Endpoint endpoint) { - return new EventStoreConnection(endpoint, null, null, _sslContext, _defaultUserCredentials, NodePreference.RANDOM, requiresLeader, _timeouts); + return new EventStoreConnection(endpoint, null, null, _sslContext, _defaultUserCredentials, NodePreference.RANDOM, requiresLeader, insecure, _timeouts); } public EventStoreConnection createClusterConnectionUsingSeeds(Endpoint[] endpoints) { @@ -55,7 +56,7 @@ public EventStoreConnection createClusterConnectionUsingSeeds(Endpoint[] endpoin } public EventStoreConnection createClusterConnectionUsingSeeds(Endpoint[] endpoints, NodePreference nodePreference) { - return new EventStoreConnection(null, endpoints, null, _sslContext, _defaultUserCredentials, nodePreference, requiresLeader, _timeouts); + return new EventStoreConnection(null, endpoints, null, _sslContext, _defaultUserCredentials, nodePreference, requiresLeader, insecure, _timeouts); } public EventStoreConnection createClusterConnectionUsingDns(String domain) { @@ -63,6 +64,6 @@ public EventStoreConnection createClusterConnectionUsingDns(String domain) { } public EventStoreConnection createClusterConnectionUsingDns(String domain, NodePreference nodePreference) { - return new EventStoreConnection(null, null, domain, _sslContext, _defaultUserCredentials, nodePreference, requiresLeader, _timeouts); + return new EventStoreConnection(null, null, domain, _sslContext, _defaultUserCredentials, nodePreference, requiresLeader, insecure, _timeouts); } } diff --git a/db-client-java/src/main/java/com/eventstore/dbclient/ExpectedRevision.java b/db-client-java/src/main/java/com/eventstore/dbclient/ExpectedRevision.java new file mode 100644 index 00000000..6d5a3655 --- /dev/null +++ b/db-client-java/src/main/java/com/eventstore/dbclient/ExpectedRevision.java @@ -0,0 +1,49 @@ +package com.eventstore.dbclient; + +import com.eventstore.dbclient.proto.shared.Shared; +import com.eventstore.dbclient.proto.streams.StreamsOuterClass; + +public abstract class ExpectedRevision { + public final static ExpectedRevision ANY = new AnyExpectedRevision(); + public final static ExpectedRevision NO_STREAM = new NoStreamExpectedRevision(); + public final static ExpectedRevision STREAM_EXISTS = new StreamExistsExpectedRevision(); + public static ExpectedRevision expectedRevision(long revision) { + return new SpecificExpectedRevision(revision); + } + + abstract public StreamsOuterClass.AppendReq.Options.Builder applyOnWire(StreamsOuterClass.AppendReq.Options.Builder options); + + static class NoStreamExpectedRevision extends ExpectedRevision { + @Override + public StreamsOuterClass.AppendReq.Options.Builder applyOnWire(StreamsOuterClass.AppendReq.Options.Builder options) { + return options.setNoStream(Shared.Empty.getDefaultInstance()); + } + } + + static class AnyExpectedRevision extends ExpectedRevision { + @Override + public StreamsOuterClass.AppendReq.Options.Builder applyOnWire(StreamsOuterClass.AppendReq.Options.Builder options) { + return options.setAny(Shared.Empty.getDefaultInstance()); + } + } + + static class StreamExistsExpectedRevision extends ExpectedRevision { + @Override + public StreamsOuterClass.AppendReq.Options.Builder applyOnWire(StreamsOuterClass.AppendReq.Options.Builder options) { + return options.setStreamExists(Shared.Empty.getDefaultInstance()); + } + } + + static class SpecificExpectedRevision extends ExpectedRevision { + final long version; + + SpecificExpectedRevision(long version) { + this.version = version; + } + + @Override + public StreamsOuterClass.AppendReq.Options.Builder applyOnWire(StreamsOuterClass.AppendReq.Options.Builder options) { + return options.setRevision(version); + } + } +} diff --git a/db-client-java/src/main/java/com/eventstore/dbclient/NackAction.java b/db-client-java/src/main/java/com/eventstore/dbclient/NackAction.java new file mode 100644 index 00000000..6dc1ec2d --- /dev/null +++ b/db-client-java/src/main/java/com/eventstore/dbclient/NackAction.java @@ -0,0 +1,8 @@ +package com.eventstore.dbclient; + +public enum NackAction { + Park, + Retry, + Skip, + Stop, +} diff --git a/db-client-java/src/main/java/com/eventstore/dbclient/PersistentClient.java b/db-client-java/src/main/java/com/eventstore/dbclient/PersistentClient.java new file mode 100644 index 00000000..24ea0b31 --- /dev/null +++ b/db-client-java/src/main/java/com/eventstore/dbclient/PersistentClient.java @@ -0,0 +1,285 @@ +package com.eventstore.dbclient; + +import com.eventstore.dbclient.proto.persistentsubscriptions.Persistent; +import com.eventstore.dbclient.proto.persistentsubscriptions.PersistentSubscriptionsGrpc; +import com.eventstore.dbclient.proto.shared.Shared; +import com.google.protobuf.ByteString; +import io.grpc.ManagedChannel; +import io.grpc.Metadata; +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; +import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; +import io.grpc.stub.ClientCallStreamObserver; +import io.grpc.stub.ClientResponseObserver; +import io.grpc.stub.MetadataUtils; +import io.grpc.stub.StreamObserver; + +import javax.validation.constraints.NotNull; +import java.util.concurrent.CompletableFuture; +import java.util.function.Function; + +public class PersistentClient { + private static final Persistent.ReadReq.Options.Builder defaultReadOptions; + + private final ManagedChannel _channel; + private final PersistentSubscriptionsGrpc.PersistentSubscriptionsStub _stub; + private final Timeouts _timeouts; + + static { + defaultReadOptions = Persistent.ReadReq.Options.newBuilder() + .setUuidOption(Persistent.ReadReq.Options.UUIDOption.newBuilder() + .setStructured(Shared.Empty.getDefaultInstance())); + } + + public PersistentClient( + @NotNull ManagedChannel channel, + UserCredentials credentials, + boolean requiresLeader, + @NotNull Timeouts timeouts) { + _channel = channel; + _timeouts = timeouts; + + Metadata headers = new Metadata(); + + if (credentials != null) { + headers.put(Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER), credentials.basicAuthHeader()); + } + + if (requiresLeader) { + headers.put(Metadata.Key.of("requires-leader", Metadata.ASCII_STRING_MARSHALLER), String.valueOf(requiresLeader)); + } + + _stub = MetadataUtils.attachHeaders(PersistentSubscriptionsGrpc.newStub(_channel), headers); + } + + public void shutdown() throws InterruptedException { + _channel.shutdown().awaitTermination(_timeouts.shutdownTimeout, _timeouts.shutdownTimeoutUnit); + } + + public CompletableFuture create(PersistentSubscriptionSettings settings, String stream, String group) { + CompletableFuture result = new CompletableFuture(); + Persistent.CreateReq.Options.Builder builder = Persistent.CreateReq.Options.newBuilder(); + Persistent.CreateReq.Settings.Builder settingsBuilder = Persistent.CreateReq.Settings.newBuilder(); + Shared.StreamIdentifier.Builder streamIdentifierBuilder = Shared.StreamIdentifier.newBuilder(); + + settingsBuilder.setRevision(settings.getRevision()) + .setResolveLinks(settings.isResolveLinks()) + .setReadBatchSize(settings.getReadBatchSize()) + .setMinCheckpointCount(settings.getMinCheckpointCount()) + .setMaxCheckpointCount(settings.getMaxCheckpointCount()) + .setMessageTimeoutMs(settings.getMessageTimeoutMs()) + .setMaxSubscriberCount(settings.getMaxSubscriberCount()) + .setMaxRetryCount(settings.getMaxRetryCount()) + .setLiveBufferSize(settings.getLiveBufferSize()) + .setHistoryBufferSize(settings.getHistoryBufferSize()) + .setExtraStatistics(settings.isExtraStatistics()) + .setCheckpointAfterMs(settings.getCheckpointAfterMs()); + + switch (settings.getStrategy()) { + case DispatchToSingle: + settingsBuilder.setNamedConsumerStrategy(Persistent.CreateReq.ConsumerStrategy.DispatchToSingle); + break; + case RoundRobin: + settingsBuilder.setNamedConsumerStrategy(Persistent.CreateReq.ConsumerStrategy.RoundRobin); + break; + case Pinned: + settingsBuilder.setNamedConsumerStrategy(Persistent.CreateReq.ConsumerStrategy.Pinned); + break; + } + + streamIdentifierBuilder.setStreamName(ByteString.copyFromUtf8(stream)); + + builder.setSettings(settingsBuilder) + .setGroupName(group) + .setStreamIdentifier(streamIdentifierBuilder) + .build(); + + Persistent.CreateReq req = Persistent.CreateReq.newBuilder() + .setOptions(builder) + .build(); + + _stub.create(req, convertSingleResponse(result, resp -> { + return resp; + })); + + return result; + } + + public CompletableFuture update(PersistentSubscriptionSettings settings, String stream, String group) { + CompletableFuture result = new CompletableFuture(); + Persistent.UpdateReq.Options.Builder builder = Persistent.UpdateReq.Options.newBuilder(); + Persistent.UpdateReq.Settings.Builder settingsBuilder = Persistent.UpdateReq.Settings.newBuilder(); + Shared.StreamIdentifier.Builder streamIdentifierBuilder = Shared.StreamIdentifier.newBuilder(); + + settingsBuilder.setRevision(settings.getRevision()) + .setResolveLinks(settings.isResolveLinks()) + .setReadBatchSize(settings.getReadBatchSize()) + .setMinCheckpointCount(settings.getMinCheckpointCount()) + .setMaxCheckpointCount(settings.getMaxCheckpointCount()) + .setMessageTimeoutMs(settings.getMessageTimeoutMs()) + .setMaxSubscriberCount(settings.getMaxSubscriberCount()) + .setMaxRetryCount(settings.getMaxRetryCount()) + .setLiveBufferSize(settings.getLiveBufferSize()) + .setHistoryBufferSize(settings.getHistoryBufferSize()) + .setExtraStatistics(settings.isExtraStatistics()) + .setCheckpointAfterMs(settings.getCheckpointAfterMs()); + + switch (settings.getStrategy()) { + case DispatchToSingle: + settingsBuilder.setNamedConsumerStrategy(Persistent.UpdateReq.ConsumerStrategy.DispatchToSingle); + break; + case RoundRobin: + settingsBuilder.setNamedConsumerStrategy(Persistent.UpdateReq.ConsumerStrategy.RoundRobin); + break; + case Pinned: + settingsBuilder.setNamedConsumerStrategy(Persistent.UpdateReq.ConsumerStrategy.Pinned); + break; + } + + streamIdentifierBuilder.setStreamName(ByteString.copyFromUtf8(stream)); + + builder.setSettings(settingsBuilder) + .setGroupName(group) + .setStreamIdentifier(streamIdentifierBuilder) + .build(); + + Persistent.UpdateReq req = Persistent.UpdateReq.newBuilder() + .setOptions(builder) + .build(); + + _stub.update(req, convertSingleResponse(result, resp -> { + return resp; + })); + + return result; + } + + public CompletableFuture delete(String stream, String group) { + CompletableFuture result = new CompletableFuture(); + + Shared.StreamIdentifier streamIdentifier = + Shared.StreamIdentifier.newBuilder() + .setStreamName(ByteString.copyFromUtf8(stream)) + .build(); + + Persistent.DeleteReq.Options options = Persistent.DeleteReq.Options.newBuilder() + .setStreamIdentifier(streamIdentifier) + .setGroupName(group) + .build(); + + Persistent.DeleteReq req = Persistent.DeleteReq.newBuilder() + .setOptions(options) + .build(); + + _stub.delete(req, convertSingleResponse(result, resp -> resp)); + + return result; + } + + public CompletableFuture connect(String stream, String group, int bufferSize, PersistentSubscriptionListener listener) { + final CompletableFuture result = new CompletableFuture<>(); + + Shared.StreamIdentifier streamIdentifier = + Shared.StreamIdentifier.newBuilder() + .setStreamName(ByteString.copyFromUtf8(stream)) + .build(); + + Persistent.ReadReq.Options options = defaultReadOptions.clone() + .setBufferSize(bufferSize) + .setStreamIdentifier(streamIdentifier) + .setGroupName(group) + .build(); + + Persistent.ReadReq req = Persistent.ReadReq.newBuilder() + .setOptions(options) + .build(); + + ClientResponseObserver observer = new ClientResponseObserver() { + private boolean _confirmed; + private PersistentSubscription _subscription; + private ClientCallStreamObserver _requestStream; + + @Override + public void beforeStart(ClientCallStreamObserver requestStream) { + this._requestStream = requestStream; + } + + @Override + public void onNext(Persistent.ReadResp readResp) { + if (!_confirmed && readResp.hasSubscriptionConfirmation()) { + this._confirmed = true; + this._subscription = new PersistentSubscription(this._requestStream, readResp.getSubscriptionConfirmation().getSubscriptionId(), stream, group, bufferSize, defaultReadOptions); + result.complete(this._subscription); + return; + } + + if (!_confirmed && readResp.hasEvent()) { + onError(new IllegalStateException("Unconfirmed persistent subscription received event")); + return; + } + + if (_confirmed && !readResp.hasEvent()) { + onError(new IllegalStateException( + String.format("Confirmed persistent subscription %s received non-{event,checkpoint} variant", + _subscription.getSubscriptionId()))); + return; + } + + listener.onEvent(this._subscription, ResolvedEvent.fromWire(readResp.getEvent())); + } + + @Override + public void onError(Throwable t) { + if (t instanceof StatusRuntimeException) { + Status s = ((StatusRuntimeException) t).getStatus(); + if (s.getCode() == Status.Code.CANCELLED) { + listener.onCancelled(this._subscription); + return; + } + } + + listener.onError(this._subscription, t); + } + + @Override + public void onCompleted() { + // Subscriptions should only complete on error. + } + }; + + StreamObserver wireStream = _stub.read(observer); + wireStream.onNext(req); + + return result; + } + + private ClientResponseObserver convertSingleResponse( + CompletableFuture dest, Function converter) { + return new ClientResponseObserver() { + @Override + public void beforeStart(ClientCallStreamObserver requestStream) { + } + + @Override + public void onNext(RespT value) { + try { + TargetT converted = converter.apply(value); + dest.complete(converted); + } catch (Throwable e) { + dest.completeExceptionally(e); + } + } + + @Override + public void onError(Throwable t) { + dest.completeExceptionally(t); + } + + @Override + public void onCompleted() { + } + }; + } +} + diff --git a/db-client-java/src/main/java/com/eventstore/dbclient/PersistentSubscription.java b/db-client-java/src/main/java/com/eventstore/dbclient/PersistentSubscription.java new file mode 100644 index 00000000..d94040ac --- /dev/null +++ b/db-client-java/src/main/java/com/eventstore/dbclient/PersistentSubscription.java @@ -0,0 +1,107 @@ +package com.eventstore.dbclient; + +import com.eventstore.dbclient.proto.persistentsubscriptions.Persistent; +import com.eventstore.dbclient.proto.shared.Shared; +import com.google.protobuf.ByteString; +import io.grpc.stub.ClientCallStreamObserver; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; + +public class PersistentSubscription { + private final ClientCallStreamObserver requestStream; + private final String subscriptionId; + private final String streamName; + private final String groupName; + private final int bufferSize; + private final Persistent.ReadReq.Options.Builder options; + + public PersistentSubscription(ClientCallStreamObserver requestStream, String subscriptionId, String streamName, String groupName, int bufferSize, Persistent.ReadReq.Options.Builder options) { + this.requestStream = requestStream; + this.subscriptionId = subscriptionId; + this.streamName = streamName; + this.groupName = groupName; + this.bufferSize = bufferSize; + this.options = options; + } + + public String getSubscriptionId() { + return subscriptionId; + } + + public void stop() { + this.requestStream.cancel("user-initiated", null); + } + + public void ack(ResolvedEvent ...events) { + this.ack(Arrays.stream(events).iterator()); + } + + public void ack(Iterator events) { + Persistent.ReadReq.Ack.Builder ackBuilder = Persistent.ReadReq.Ack.newBuilder() + .setId(ByteString.copyFromUtf8(subscriptionId)); + + while (events.hasNext()) { + ResolvedEvent event = events.next(); + RecordedEvent record = event.getLink() != null ? event.getLink() : event.getEvent(); + Shared.UUID.Structured structured = Shared.UUID.Structured.newBuilder() + .setLeastSignificantBits(record.getEventId().getLeastSignificantBits()) + .setMostSignificantBits(record.getEventId().getMostSignificantBits()) + .build(); + + Shared.UUID uuid = Shared.UUID.newBuilder().setStructured(structured).build(); + ackBuilder.addIds(uuid); + } + + Persistent.ReadReq.Ack ack = ackBuilder.build(); + Persistent.ReadReq req = Persistent.ReadReq.newBuilder() + .setAck(ack) + .build(); + + requestStream.onNext(req); + } + + public void nack(NackAction action, String reason, ResolvedEvent...events) { + this.nack(action, reason, Arrays.stream(events).iterator()); + } + + public void nack(NackAction action, String reason, Iterator events) { + Persistent.ReadReq.Nack.Builder nackBuilder = Persistent.ReadReq.Nack.newBuilder() + .setId(ByteString.copyFromUtf8(subscriptionId)); + + while (events.hasNext()) { + ResolvedEvent event = events.next(); + RecordedEvent record = event.getLink() != null ? event.getLink() : event.getEvent(); + Shared.UUID.Structured structured = Shared.UUID.Structured.newBuilder() + .setLeastSignificantBits(record.getEventId().getLeastSignificantBits()) + .setMostSignificantBits(record.getEventId().getMostSignificantBits()) + .build(); + + Shared.UUID uuid = Shared.UUID.newBuilder().setStructured(structured).build(); + nackBuilder.addIds(uuid); + } + + nackBuilder.setReason(reason); + switch (action) { + case Park: + nackBuilder.setAction(Persistent.ReadReq.Nack.Action.Park); + break; + case Retry: + nackBuilder.setAction(Persistent.ReadReq.Nack.Action.Retry); + break; + case Skip: + nackBuilder.setAction(Persistent.ReadReq.Nack.Action.Skip); + break; + case Stop: + nackBuilder.setAction(Persistent.ReadReq.Nack.Action.Stop); + break; + } + + Persistent.ReadReq req = Persistent.ReadReq.newBuilder() + .setNack(nackBuilder) + .build(); + + requestStream.onNext(req); + } +} diff --git a/db-client-java/src/main/java/com/eventstore/dbclient/PersistentSubscriptionListener.java b/db-client-java/src/main/java/com/eventstore/dbclient/PersistentSubscriptionListener.java new file mode 100644 index 00000000..11720756 --- /dev/null +++ b/db-client-java/src/main/java/com/eventstore/dbclient/PersistentSubscriptionListener.java @@ -0,0 +1,12 @@ +package com.eventstore.dbclient; + +public abstract class PersistentSubscriptionListener { + public void onEvent(PersistentSubscription subscription, ResolvedEvent event) { + } + + public void onError(PersistentSubscription subscription, Throwable throwable) { + } + + public void onCancelled(PersistentSubscription subscription) { + } +} \ No newline at end of file diff --git a/db-client-java/src/main/java/com/eventstore/dbclient/PersistentSubscriptionSettings.java b/db-client-java/src/main/java/com/eventstore/dbclient/PersistentSubscriptionSettings.java new file mode 100644 index 00000000..6ba91475 --- /dev/null +++ b/db-client-java/src/main/java/com/eventstore/dbclient/PersistentSubscriptionSettings.java @@ -0,0 +1,93 @@ +package com.eventstore.dbclient; + +public class PersistentSubscriptionSettings { + private int checkpointAfterMs; + private boolean extraStatistics; + private boolean resolveLinks; + private int historyBufferSize; + private int liveBufferSize; + private int maxCheckpointCount; + private int maxRetryCount; + private int maxSubscriberCount; + private int messageTimeoutMs; + private int minCheckpointCount; + private int readBatchSize; + private long revision; + private ConsumerStrategy strategy; + + public PersistentSubscriptionSettings(int checkpointAfterMs, boolean extraStatistics, boolean resolveLinks, int historyBufferSize, int liveBufferSize, int maxCheckpointCount, int maxRetryCount, int maxSubscriberCount, int messageTimeoutMs, int minCheckpointCount, int readBatchSize, long revision, ConsumerStrategy strategy) { + this.checkpointAfterMs = checkpointAfterMs; + this.extraStatistics = extraStatistics; + this.resolveLinks = resolveLinks; + this.historyBufferSize = historyBufferSize; + this.liveBufferSize = liveBufferSize; + this.maxCheckpointCount = maxCheckpointCount; + this.maxRetryCount = maxRetryCount; + this.maxSubscriberCount = maxSubscriberCount; + this.messageTimeoutMs = messageTimeoutMs; + this.minCheckpointCount = minCheckpointCount; + this.readBatchSize = readBatchSize; + this.revision = revision; + this.strategy = strategy; + } + + public static PersistentSubscriptionSettingsBuilder builder() { + return new PersistentSubscriptionSettingsBuilder(); + } + + public static PersistentSubscriptionSettingsBuilder copy(PersistentSubscriptionSettings settings) { + return new PersistentSubscriptionSettingsBuilder(settings); + } + + public int getCheckpointAfterMs() { + return checkpointAfterMs; + } + + public boolean isExtraStatistics() { + return extraStatistics; + } + + public boolean isResolveLinks() { + return resolveLinks; + } + + public int getHistoryBufferSize() { + return historyBufferSize; + } + + public int getLiveBufferSize() { + return liveBufferSize; + } + + public int getMaxCheckpointCount() { + return maxCheckpointCount; + } + + public int getMaxRetryCount() { + return maxRetryCount; + } + + public int getMaxSubscriberCount() { + return maxSubscriberCount; + } + + public int getMessageTimeoutMs() { + return messageTimeoutMs; + } + + public int getMinCheckpointCount() { + return minCheckpointCount; + } + + public int getReadBatchSize() { + return readBatchSize; + } + + public long getRevision() { + return revision; + } + + public ConsumerStrategy getStrategy() { + return strategy; + } +} diff --git a/db-client-java/src/main/java/com/eventstore/dbclient/PersistentSubscriptionSettingsBuilder.java b/db-client-java/src/main/java/com/eventstore/dbclient/PersistentSubscriptionSettingsBuilder.java new file mode 100644 index 00000000..3fff6645 --- /dev/null +++ b/db-client-java/src/main/java/com/eventstore/dbclient/PersistentSubscriptionSettingsBuilder.java @@ -0,0 +1,140 @@ +package com.eventstore.dbclient; + +import com.eventstore.dbclient.proto.persistentsubscriptions.Persistent; + +public class PersistentSubscriptionSettingsBuilder { + private int checkpointAfterMs; + private boolean extraStatistics; + private boolean resolveLinks; + private int historyBufferSize; + private int liveBufferSize; + private int maxCheckpointCount; + private int maxRetryCount; + private int maxSubscriberCount; + private int messageTimeoutMs; + private int minCheckpointCount; + private int readBatchSize; + private long revision; + private ConsumerStrategy strategy; + + public PersistentSubscriptionSettingsBuilder() { + checkpointAfterMs = 2_000; + resolveLinks = false; + extraStatistics = false; + revision = 0; + messageTimeoutMs = 30_000; + maxRetryCount = 10; + minCheckpointCount = 10; + maxCheckpointCount = 1_000; + maxSubscriberCount = 0; + liveBufferSize = 500; + readBatchSize = 20; + historyBufferSize = 500; + strategy = ConsumerStrategy.RoundRobin; + } + + public PersistentSubscriptionSettingsBuilder(PersistentSubscriptionSettings settings) { + checkpointAfterMs = settings.getCheckpointAfterMs(); + resolveLinks = settings.isResolveLinks(); + extraStatistics = settings.isExtraStatistics(); + revision = settings.getRevision(); + messageTimeoutMs = settings.getMessageTimeoutMs(); + maxRetryCount = settings.getMaxRetryCount(); + minCheckpointCount = settings.getMinCheckpointCount(); + maxCheckpointCount = settings.getMaxCheckpointCount(); + maxSubscriberCount = settings.getMaxSubscriberCount(); + liveBufferSize = settings.getLiveBufferSize(); + readBatchSize = settings.getReadBatchSize(); + historyBufferSize = settings.getHistoryBufferSize(); + strategy = settings.getStrategy(); + } + + public PersistentSubscriptionSettings build() { + return new PersistentSubscriptionSettings(checkpointAfterMs, extraStatistics, resolveLinks, historyBufferSize, liveBufferSize, maxCheckpointCount, maxRetryCount, maxSubscriberCount, messageTimeoutMs, minCheckpointCount, readBatchSize, revision, strategy); + } + + public PersistentSubscriptionSettingsBuilder enableLinkResolution() { + return resolveLinks(true); + } + + public PersistentSubscriptionSettingsBuilder disableLinkResolution() { + return resolveLinks(false); + } + + public PersistentSubscriptionSettingsBuilder resolveLinks(boolean value) { + this.resolveLinks = value; + return this; + } + + public PersistentSubscriptionSettingsBuilder enableExtraStatistics() { + return extraStatistics(true); + } + + public PersistentSubscriptionSettingsBuilder disableExtraStatistics() { + return extraStatistics(false); + } + + public PersistentSubscriptionSettingsBuilder extraStatistics(boolean value) { + this.extraStatistics = value; + return this; + } + + public PersistentSubscriptionSettingsBuilder checkpointAfterInMs(int value) { + this.checkpointAfterMs = value; + return this; + } + + public PersistentSubscriptionSettingsBuilder revision(long value) { + this.revision = value; + return this; + } + + public PersistentSubscriptionSettingsBuilder fromStreamStart() { + return revision(0); + } + + public PersistentSubscriptionSettingsBuilder historyBufferSize(int value) { + this.historyBufferSize = value; + return this; + } + + public PersistentSubscriptionSettingsBuilder liveBufferSize(int value) { + this.liveBufferSize = value; + return this; + } + + public PersistentSubscriptionSettingsBuilder maxCheckpointCount(int value) { + this.maxCheckpointCount = value; + return this; + } + + public PersistentSubscriptionSettingsBuilder minCheckpointCount(int value) { + this.minCheckpointCount = value; + return this; + } + + public PersistentSubscriptionSettingsBuilder maxSubscriberCount(int value) { + this.maxSubscriberCount = value; + return this; + } + + public PersistentSubscriptionSettingsBuilder maxRetryCount(int value) { + this.maxRetryCount = value; + return this; + } + + public PersistentSubscriptionSettingsBuilder messageTimeoutInMs(int value) { + this.messageTimeoutMs = value; + return this; + } + + public PersistentSubscriptionSettingsBuilder readBatchSize(int value) { + this.readBatchSize = value; + return this; + } + + public PersistentSubscriptionSettingsBuilder consumerStrategy(ConsumerStrategy strategy) { + this.strategy = strategy; + return this; + } +} diff --git a/db-client-java/src/main/java/com/eventstore/dbclient/RecordedEvent.java b/db-client-java/src/main/java/com/eventstore/dbclient/RecordedEvent.java index fdd78ef3..daf109dd 100644 --- a/db-client-java/src/main/java/com/eventstore/dbclient/RecordedEvent.java +++ b/db-client-java/src/main/java/com/eventstore/dbclient/RecordedEvent.java @@ -1,5 +1,6 @@ package com.eventstore.dbclient; +import com.eventstore.dbclient.proto.persistentsubscriptions.Persistent; import com.eventstore.dbclient.proto.shared.Shared; import com.eventstore.dbclient.proto.streams.StreamsOuterClass; @@ -121,4 +122,23 @@ static RecordedEvent fromWire(StreamsOuterClass.ReadResp.ReadEvent.RecordedEvent wireEvent.getData().toByteArray(), wireEvent.getCustomMetadata().toByteArray()); } + + static RecordedEvent fromWire(Persistent.ReadResp.ReadEvent.RecordedEvent wireEvent) { + UUID eventId; + if (wireEvent.getId().hasStructured()) { + Shared.UUID.Structured structured = wireEvent.getId().getStructured(); + eventId = new UUID(structured.getMostSignificantBits(), structured.getLeastSignificantBits()); + } else { + eventId = UUID.fromString(wireEvent.getId().getString()); + } + + return new RecordedEvent( + wireEvent.getStreamIdentifier().getStreamName().toStringUtf8(), + new StreamRevision(wireEvent.getStreamRevision()), + eventId, + new Position(wireEvent.getCommitPosition(), wireEvent.getPreparePosition()), + wireEvent.getMetadataMap(), + wireEvent.getData().toByteArray(), + wireEvent.getCustomMetadata().toByteArray()); + } } diff --git a/db-client-java/src/main/java/com/eventstore/dbclient/ResolvedEvent.java b/db-client-java/src/main/java/com/eventstore/dbclient/ResolvedEvent.java index d06aac46..4741eab0 100644 --- a/db-client-java/src/main/java/com/eventstore/dbclient/ResolvedEvent.java +++ b/db-client-java/src/main/java/com/eventstore/dbclient/ResolvedEvent.java @@ -1,5 +1,6 @@ package com.eventstore.dbclient; +import com.eventstore.dbclient.proto.persistentsubscriptions.Persistent; import com.eventstore.dbclient.proto.streams.StreamsOuterClass; public class ResolvedEvent { @@ -25,4 +26,11 @@ static ResolvedEvent fromWire(StreamsOuterClass.ReadResp.ReadEvent wireEvent) { return new ResolvedEvent(event, link); } + + static ResolvedEvent fromWire(Persistent.ReadResp.ReadEvent wireEvent) { + RecordedEvent event = wireEvent.hasEvent() ? RecordedEvent.fromWire(wireEvent.getEvent()) : null; + RecordedEvent link = wireEvent.hasLink() ? RecordedEvent.fromWire(wireEvent.getLink()) : null; + + return new ResolvedEvent(event, link); + } } diff --git a/db-client-java/src/main/java/com/eventstore/dbclient/StreamsClient.java b/db-client-java/src/main/java/com/eventstore/dbclient/StreamsClient.java index 64c24fc5..1db16755 100644 --- a/db-client-java/src/main/java/com/eventstore/dbclient/StreamsClient.java +++ b/db-client-java/src/main/java/com/eventstore/dbclient/StreamsClient.java @@ -78,15 +78,14 @@ public void shutdown() throws InterruptedException { public CompletableFuture appendToStream( @NotNull String streamName, - @NotNull StreamRevision expectedRevision, + @NotNull ExpectedRevision expectedRevision, @NotNull List proposedEvents) { StreamsOuterClass.AppendReq.Options.Builder options = StreamsOuterClass.AppendReq.Options.newBuilder() .setStreamIdentifier(Shared.StreamIdentifier.newBuilder() .setStreamName(ByteString.copyFromUtf8(streamName)) - .build()) - .setRevision(expectedRevision.getValueUnsigned()); + .build()); - return appendInternal(options, proposedEvents); + return appendInternal(expectedRevision.applyOnWire(options), proposedEvents); } public CompletableFuture appendToStream( @@ -383,16 +382,21 @@ private CompletableFuture appendInternal( requestStream.onNext(StreamsOuterClass.AppendReq.newBuilder().setOptions(options).build()); for (ProposedEvent e : proposedEvents) { + StreamsOuterClass.AppendReq.ProposedMessage.Builder msgBuilder = StreamsOuterClass.AppendReq.ProposedMessage.newBuilder() + .setId(Shared.UUID.newBuilder() + .setStructured(Shared.UUID.Structured.newBuilder() + .setMostSignificantBits(e.getEventId().getMostSignificantBits()) + .setLeastSignificantBits(e.getEventId().getLeastSignificantBits()))) + .setData(ByteString.copyFrom(e.getEventData())) + .putMetadata(SystemMetadataKeys.CONTENT_TYPE, e.getContentType()) + .putMetadata(SystemMetadataKeys.TYPE, e.getEventType()); + + if (e.getUserMetadata() != null) { + msgBuilder.setCustomMetadata(ByteString.copyFrom(e.getUserMetadata())); + } + requestStream.onNext(StreamsOuterClass.AppendReq.newBuilder() - .setProposedMessage(StreamsOuterClass.AppendReq.ProposedMessage.newBuilder() - .setId(Shared.UUID.newBuilder() - .setStructured(Shared.UUID.Structured.newBuilder() - .setMostSignificantBits(e.getEventId().getMostSignificantBits()) - .setLeastSignificantBits(e.getEventId().getLeastSignificantBits()))) - .setData(ByteString.copyFrom(e.getEventData())) - .setCustomMetadata(ByteString.copyFrom(e.getUserMetadata())) - .putMetadata(SystemMetadataKeys.CONTENT_TYPE, e.getContentType()) - .putMetadata(SystemMetadataKeys.TYPE, e.getEventType())) + .setProposedMessage(msgBuilder) .build()); } requestStream.onCompleted(); diff --git a/db-client-java/src/main/proto/persistent.proto b/db-client-java/src/main/proto/persistent.proto new file mode 100644 index 00000000..79b17765 --- /dev/null +++ b/db-client-java/src/main/proto/persistent.proto @@ -0,0 +1,180 @@ +syntax = "proto3"; +package event_store.client.persistent_subscriptions; +option java_package = "com.eventstore.dbclient.proto.persistentsubscriptions"; + +import "shared.proto"; + +service PersistentSubscriptions { + rpc Create (CreateReq) returns (CreateResp); + rpc Update (UpdateReq) returns (UpdateResp); + rpc Delete (DeleteReq) returns (DeleteResp); + rpc Read (stream ReadReq) returns (stream ReadResp); +} + +message ReadReq { + oneof content { + Options options = 1; + Ack ack = 2; + Nack nack = 3; + } + + message Options { + event_store.client.shared.StreamIdentifier stream_identifier = 1; + string group_name = 2; + int32 buffer_size = 3; + UUIDOption uuid_option = 4; + + message UUIDOption { + oneof content { + event_store.client.shared.Empty structured = 1; + event_store.client.shared.Empty string = 2; + } + } + } + + message Ack { + bytes id = 1; + repeated event_store.client.shared.UUID ids = 2; + } + + message Nack { + bytes id = 1; + repeated event_store.client.shared.UUID ids = 2; + Action action = 3; + string reason = 4; + + enum Action { + Unknown = 0; + Park = 1; + Retry = 2; + Skip = 3; + Stop = 4; + } + } +} + +message ReadResp { + oneof content { + ReadEvent event = 1; + SubscriptionConfirmation subscription_confirmation = 2; + } + message ReadEvent { + RecordedEvent event = 1; + RecordedEvent link = 2; + oneof position { + uint64 commit_position = 3; + event_store.client.shared.Empty no_position = 4; + } + oneof count { + int32 retry_count = 5; + event_store.client.shared.Empty no_retry_count = 6; + } + message RecordedEvent { + event_store.client.shared.UUID id = 1; + event_store.client.shared.StreamIdentifier stream_identifier = 2; + uint64 stream_revision = 3; + uint64 prepare_position = 4; + uint64 commit_position = 5; + map metadata = 6; + bytes custom_metadata = 7; + bytes data = 8; + } + } + message SubscriptionConfirmation { + string subscription_id = 1; + } +} + +message CreateReq { + Options options = 1; + + message Options { + event_store.client.shared.StreamIdentifier stream_identifier = 1; + string group_name = 2; + Settings settings = 3; + } + + message Settings { + bool resolve_links = 1; + uint64 revision = 2; + bool extra_statistics = 3; + int32 max_retry_count = 5; + int32 min_checkpoint_count = 7; + int32 max_checkpoint_count = 8; + int32 max_subscriber_count = 9; + int32 live_buffer_size = 10; + int32 read_batch_size = 11; + int32 history_buffer_size = 12; + ConsumerStrategy named_consumer_strategy = 13; + oneof message_timeout { + int64 message_timeout_ticks = 4; + int32 message_timeout_ms = 14; + } + oneof checkpoint_after { + int64 checkpoint_after_ticks = 6; + int32 checkpoint_after_ms = 15; + } + } + + enum ConsumerStrategy { + DispatchToSingle = 0; + RoundRobin = 1; + Pinned = 2; + } +} + +message CreateResp { +} + +message UpdateReq { + Options options = 1; + + message Options { + event_store.client.shared.StreamIdentifier stream_identifier = 1; + string group_name = 2; + Settings settings = 3; + } + + message Settings { + bool resolve_links = 1; + uint64 revision = 2; + bool extra_statistics = 3; + int32 max_retry_count = 5; + int32 min_checkpoint_count = 7; + int32 max_checkpoint_count = 8; + int32 max_subscriber_count = 9; + int32 live_buffer_size = 10; + int32 read_batch_size = 11; + int32 history_buffer_size = 12; + ConsumerStrategy named_consumer_strategy = 13; + oneof message_timeout { + int64 message_timeout_ticks = 4; + int32 message_timeout_ms = 14; + } + oneof checkpoint_after { + int64 checkpoint_after_ticks = 6; + int32 checkpoint_after_ms = 15; + } + } + + enum ConsumerStrategy { + DispatchToSingle = 0; + RoundRobin = 1; + Pinned = 2; + } +} + +message UpdateResp { +} + +message DeleteReq { + Options options = 1; + + message Options { + event_store.client.shared.StreamIdentifier stream_identifier = 1; + string group_name = 2; + } +} + +message DeleteResp { +} \ No newline at end of file diff --git a/db-client-java/src/test/java/com/eventstore/dbclient/ConnectPersistentSubcription.java b/db-client-java/src/test/java/com/eventstore/dbclient/ConnectPersistentSubcription.java new file mode 100644 index 00000000..fc150f1a --- /dev/null +++ b/db-client-java/src/test/java/com/eventstore/dbclient/ConnectPersistentSubcription.java @@ -0,0 +1,76 @@ +package com.eventstore.dbclient; + +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import testcontainers.module.EventStoreStreamsClient; +import testcontainers.module.EventStoreTestDBContainer; + +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; + +public class ConnectPersistentSubcription { + @Rule + public final EventStoreTestDBContainer server = new EventStoreTestDBContainer(false); + + @Rule + public final EventStoreStreamsClient client = new EventStoreStreamsClient(server); + + @Test + public void testConnectPersistentSub() throws Throwable { + PersistentClient client = server.getPersistentClient(); + StreamsClient streamsClient = server.getStreamsClient(); + String streamName = "aStream-" + UUID.randomUUID().toString(); + + PersistentSubscriptionSettings settings = PersistentSubscriptionSettings.builder().build(); + client.create(settings, streamName, "aGroup").get(); + + List events = new ArrayList<>(); + byte[] eventData = "{'foo': true}".getBytes(); + + for (int i = 0; i < 3; ++i) { + events.add(new ProposedEvent(UUID.randomUUID(), "foobar", "application/json", eventData, null)); + } + + final CompletableFuture result = new CompletableFuture<>(); + streamsClient.appendToStream(streamName, ExpectedRevision.ANY, events).get(); + + + client.connect(streamName, "aGroup", 32, new PersistentSubscriptionListener() { + private int count = 0; + + @Override + public void onEvent(PersistentSubscription subscription, ResolvedEvent event) { + ++this.count; + + subscription.ack(event); + + if (this.count == 6) { + result.complete(this.count); + subscription.stop(); + } + } + + @Override + public void onError(PersistentSubscription subscription, Throwable throwable) { + result.completeExceptionally(throwable); + } + + @Override + public void onCancelled(PersistentSubscription subscription) { + result.complete(count); + } + }).get(); + + events.clear(); + + for (int i = 0; i < 3; ++i) { + events.add(new ProposedEvent(UUID.randomUUID(), "foobar", "application/json", eventData, null)); + } + + streamsClient.appendToStream(streamName, ExpectedRevision.ANY, events).get(); + Assert.assertEquals(6, result.get().intValue()); + } +} diff --git a/db-client-java/src/test/java/com/eventstore/dbclient/CreatePersistentSubscriptionTests.java b/db-client-java/src/test/java/com/eventstore/dbclient/CreatePersistentSubscriptionTests.java new file mode 100644 index 00000000..a5dc4fea --- /dev/null +++ b/db-client-java/src/test/java/com/eventstore/dbclient/CreatePersistentSubscriptionTests.java @@ -0,0 +1,26 @@ +package com.eventstore.dbclient; + +import org.junit.Rule; +import org.junit.Test; +import testcontainers.module.EventStoreStreamsClient; +import testcontainers.module.EventStoreTestDBContainer; + +import java.util.concurrent.CompletableFuture; + +public class CreatePersistentSubscriptionTests { + @Rule + public final EventStoreTestDBContainer server = new EventStoreTestDBContainer(false); + + @Rule + public final EventStoreStreamsClient client = new EventStoreStreamsClient(server); + + @Test + public void testCreatePersistentSub() throws Throwable { { + PersistentClient client = server.getPersistentClient(); + + PersistentSubscriptionSettings settings = PersistentSubscriptionSettings.builder().build(); + CompletableFuture result = client.create(settings, "aStream", "aGroup"); + + result.get(); + }} +} diff --git a/db-client-java/src/test/java/com/eventstore/dbclient/SubscribeToStreamTests.java b/db-client-java/src/test/java/com/eventstore/dbclient/SubscribeToStreamTests.java index d2ae4762..7a6a916c 100644 --- a/db-client-java/src/test/java/com/eventstore/dbclient/SubscribeToStreamTests.java +++ b/db-client-java/src/test/java/com/eventstore/dbclient/SubscribeToStreamTests.java @@ -148,7 +148,7 @@ public void onError(Subscription subscription, Throwable throwable) { "application/octet-stream", eventData, eventMetaData)); CompletableFuture writeFuture = client.instance.appendToStream(testStreamName, - new StreamRevision(5999), events); + ExpectedRevision.expectedRevision(5999), events); WriteResult writeResult = writeFuture.get(); assertEquals(new StreamRevision(6000), writeResult.getNextExpectedRevision()); diff --git a/db-client-java/src/test/java/com/eventstore/dbclient/UpdatePersistentSubscriptionTests.java b/db-client-java/src/test/java/com/eventstore/dbclient/UpdatePersistentSubscriptionTests.java new file mode 100644 index 00000000..17914ed2 --- /dev/null +++ b/db-client-java/src/test/java/com/eventstore/dbclient/UpdatePersistentSubscriptionTests.java @@ -0,0 +1,34 @@ +package com.eventstore.dbclient; + +import org.junit.Rule; +import org.junit.Test; +import testcontainers.module.EventStoreStreamsClient; +import testcontainers.module.EventStoreTestDBContainer; + +import java.util.concurrent.CompletableFuture; + +public class UpdatePersistentSubscriptionTests { + @Rule + public final EventStoreTestDBContainer server = new EventStoreTestDBContainer(false); + + @Rule + public final EventStoreStreamsClient client = new EventStoreStreamsClient(server); + + @Test + public void testUpdatePersistentSub() throws Throwable { { + PersistentClient client = server.getPersistentClient(); + + PersistentSubscriptionSettings settings = PersistentSubscriptionSettings.builder().build(); + CompletableFuture result = client.create(settings, "aStream", "aGroupUpd"); + + result.get(); + + PersistentSubscriptionSettings updatedSettings = PersistentSubscriptionSettings.copy(settings) + .checkpointAfterInMs(5_000) + .build(); + + result = client.update(updatedSettings, "aStream", "aGroupUpd"); + + result.get(); + }} +} diff --git a/db-client-java/src/test/java/testcontainers/module/EventStoreTestDBContainer.java b/db-client-java/src/test/java/testcontainers/module/EventStoreTestDBContainer.java index 1e11e010..9df89e49 100644 --- a/db-client-java/src/test/java/testcontainers/module/EventStoreTestDBContainer.java +++ b/db-client-java/src/test/java/testcontainers/module/EventStoreTestDBContainer.java @@ -1,6 +1,7 @@ package testcontainers.module; import com.eventstore.dbclient.EventStoreConnection; +import com.eventstore.dbclient.PersistentClient; import com.eventstore.dbclient.StreamsClient; import com.eventstore.dbclient.UserCredentials; import com.github.dockerjava.api.model.HealthCheck; @@ -53,16 +54,23 @@ public EventStoreTestDBContainer(String image, boolean emptyDatabase) { waitingFor(Wait.forHealthcheck()); } - public StreamsClient getStreamsClient() { + public EventStoreConnection getConnection() { final String address = getContainerIpAddress(); final int port = getMappedPort(DB_HTTP_PORT); return EventStoreConnection .builder() - .insecure() + .sslContext(getClientSslContext()) .defaultUserCredentials(new UserCredentials("admin", "changeit")) - .createSingleNodeConnection(address, port) - .newStreamsClient(); + .createSingleNodeConnection(address, port); + } + + public StreamsClient getStreamsClient() { + return getConnection().newStreamsClient(); + } + + public PersistentClient getPersistentClient() { + return getConnection().newPersistentClient(); } private SslContext getClientSslContext() {