From bd7489f75d20d03e8eaa62d6b0d8ff6aadfee33d Mon Sep 17 00:00:00 2001 From: James Nugent Date: Thu, 4 Jun 2020 14:28:28 -0500 Subject: [PATCH] WIP: Cluster Discovery --- build.gradle | 1 + .../com/eventstore/dbclient/ClusterInfo.java | 135 +++++++++++++++++ .../dbclient/EndpointDiscoverer.java | 4 + .../com/eventstore/dbclient/GossipClient.java | 49 ++++++ .../dbclient/GossipResolverFactory.java | 141 ++++++++++++++++++ .../eventstore/dbclient/NodePreference.java | 5 + .../com/eventstore/dbclient/Timeouts.java | 9 +- .../eventstore/dbclient/TimeoutsBuilder.java | 15 +- src/main/proto/gossip.proto | 44 ++++++ .../dbclient/ClusterDiscoveryTests.java | 52 +++++++ 10 files changed, 453 insertions(+), 2 deletions(-) create mode 100644 src/main/java/com/eventstore/dbclient/ClusterInfo.java create mode 100644 src/main/java/com/eventstore/dbclient/EndpointDiscoverer.java create mode 100644 src/main/java/com/eventstore/dbclient/GossipClient.java create mode 100644 src/main/java/com/eventstore/dbclient/GossipResolverFactory.java create mode 100644 src/main/java/com/eventstore/dbclient/NodePreference.java create mode 100644 src/main/proto/gossip.proto create mode 100644 src/test/java/com/eventstore/dbclient/ClusterDiscoveryTests.java diff --git a/build.gradle b/build.gradle index 726b6bdf..364cfdfe 100644 --- a/build.gradle +++ b/build.gradle @@ -4,6 +4,7 @@ import org.gradle.api.tasks.testing.logging.TestLogEvent plugins { id "java" id "com.google.protobuf" version "0.8.12" + id "idea" } group 'com.eventstore' diff --git a/src/main/java/com/eventstore/dbclient/ClusterInfo.java b/src/main/java/com/eventstore/dbclient/ClusterInfo.java new file mode 100644 index 00000000..1e815cb6 --- /dev/null +++ b/src/main/java/com/eventstore/dbclient/ClusterInfo.java @@ -0,0 +1,135 @@ +package com.eventstore.dbclient; + +import com.eventstore.dbclient.proto.gossip.GossipOuterClass; +import com.eventstore.dbclient.proto.shared.Shared; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; + +public class ClusterInfo { + private final List members; + + public ClusterInfo(List members) { + this.members = members; + } + + public List getMembers() { + return members; + } + + static ClusterInfo fromWire(GossipOuterClass.ClusterInfo wire) { + List members = new ArrayList<>(); + for (GossipOuterClass.MemberInfo member : wire.getMembersList()) { + UUID instanceId; + if (member.getInstanceId().hasStructured()) { + Shared.UUID.Structured structured = member.getInstanceId().getStructured(); + instanceId = new UUID(structured.getMostSignificantBits(), structured.getLeastSignificantBits()); + } else { + instanceId = UUID.fromString(member.getInstanceId().getString()); + } + boolean isAlive = member.getIsAlive(); + MemberState state = MemberState.fromWire(member.getState()); + Endpoint httpEndpoint = new Endpoint(member.getHttpEndPoint().getAddress(), member.getHttpEndPoint().getPort()); + + members.add(new Member(instanceId, isAlive, state, httpEndpoint)); + } + + return new ClusterInfo(members); + } + + public enum MemberState { + INITIALIZING, DISCOVER_LEADER, UNKNOWN, PRE_REPLICA, CATCHING_UP, CLONE, + FOLLOWER, PRE_LEADER, LEADER, MANAGER, SHUTTING_DOWN, SHUT_DOWN, READ_ONLY_LEADERLESS, + PRE_READ_ONLY_REPLICA, READ_ONLY_REPLICA, RESIGNING_LEADER; + + static MemberState fromWire(GossipOuterClass.MemberInfo.VNodeState state) { + switch (state) { + case Initializing: + return INITIALIZING; + case DiscoverLeader: + return DISCOVER_LEADER; + case PreReplica: + return PRE_REPLICA; + case CatchingUp: + return CATCHING_UP; + case Clone: + return CLONE; + case Follower: + return FOLLOWER; + case PreLeader: + return PRE_LEADER; + case Leader: + return LEADER; + case Manager: + return MANAGER; + case ShuttingDown: + return SHUTTING_DOWN; + case Shutdown: + return SHUT_DOWN; + case ReadOnlyLeaderless: + return READ_ONLY_LEADERLESS; + case PreReadOnlyReplica: + return PRE_READ_ONLY_REPLICA; + case ReadOnlyReplica: + return READ_ONLY_REPLICA; + case ResigningLeader: + return RESIGNING_LEADER; + } + return UNKNOWN; + } + } + + public static class Endpoint { + private final String address; + private final int port; + + Endpoint(String address, int port) { + this.address = address; + this.port = port; + } + + InetSocketAddress toInetSocketAddress() { + return new InetSocketAddress(this.address, this.port); + } + + public String getAddress() { + return address; + } + + public int getPort() { + return port; + } + } + + public static class Member { + private final UUID instanceId; + private final boolean isAlive; + private final MemberState state; + private final Endpoint httpEndpoint; + + Member(UUID instanceId, boolean isAlive, MemberState state, Endpoint httpEndpoint) { + this.instanceId = instanceId; + this.isAlive = isAlive; + this.state = state; + this.httpEndpoint = httpEndpoint; + } + + public UUID getInstanceId() { + return instanceId; + } + + public boolean isAlive() { + return isAlive; + } + + public MemberState getState() { + return state; + } + + public Endpoint getHttpEndpoint() { + return httpEndpoint; + } + } +} diff --git a/src/main/java/com/eventstore/dbclient/EndpointDiscoverer.java b/src/main/java/com/eventstore/dbclient/EndpointDiscoverer.java new file mode 100644 index 00000000..99d43618 --- /dev/null +++ b/src/main/java/com/eventstore/dbclient/EndpointDiscoverer.java @@ -0,0 +1,4 @@ +package com.eventstore.dbclient; + +public interface EndpointDiscoverer { +} diff --git a/src/main/java/com/eventstore/dbclient/GossipClient.java b/src/main/java/com/eventstore/dbclient/GossipClient.java new file mode 100644 index 00000000..3a6825c3 --- /dev/null +++ b/src/main/java/com/eventstore/dbclient/GossipClient.java @@ -0,0 +1,49 @@ +package com.eventstore.dbclient; + +import com.eventstore.dbclient.proto.gossip.GossipGrpc; +import com.eventstore.dbclient.proto.gossip.GossipOuterClass; +import com.eventstore.dbclient.proto.shared.Shared; +import io.grpc.ManagedChannel; +import io.grpc.stub.StreamObserver; + +import java.util.concurrent.CompletableFuture; + +public class GossipClient { + private final ManagedChannel channel; + private final GossipGrpc.GossipStub stub; + private final Timeouts timeouts; + + public GossipClient(ManagedChannel channel, Timeouts timeouts) { + this.channel = channel; + this.timeouts = timeouts; + + this.stub = GossipGrpc.newStub(channel); + } + + public void shutdown() throws InterruptedException { + this.channel.shutdown().awaitTermination(timeouts.shutdownTimeout, timeouts.shutdownTimeoutUnit); + } + + public CompletableFuture readGossip() { + CompletableFuture future = new CompletableFuture<>(); + + this.stub.read(Shared.Empty.getDefaultInstance(), new StreamObserver() { + @Override + public void onNext(GossipOuterClass.ClusterInfo value) { + ClusterInfo info = ClusterInfo.fromWire(value); + future.complete(info); + } + + @Override + public void onError(Throwable t) { + future.completeExceptionally(t); + } + + @Override + public void onCompleted() { + } + }); + + return future; + } +} diff --git a/src/main/java/com/eventstore/dbclient/GossipResolverFactory.java b/src/main/java/com/eventstore/dbclient/GossipResolverFactory.java new file mode 100644 index 00000000..666c79e0 --- /dev/null +++ b/src/main/java/com/eventstore/dbclient/GossipResolverFactory.java @@ -0,0 +1,141 @@ +package com.eventstore.dbclient; + +import io.grpc.*; +import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; +import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.URI; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +public class GossipResolverFactory extends NameResolver.Factory { + private static final Set invalidStates; + + static { + invalidStates = new HashSet() {{ + add(ClusterInfo.MemberState.MANAGER); + add(ClusterInfo.MemberState.SHUTTING_DOWN); + add(ClusterInfo.MemberState.SHUT_DOWN); + add(ClusterInfo.MemberState.UNKNOWN); + add(ClusterInfo.MemberState.INITIALIZING); + add(ClusterInfo.MemberState.RESIGNING_LEADER); + add(ClusterInfo.MemberState.PRE_LEADER); + add(ClusterInfo.MemberState.PRE_REPLICA); + add(ClusterInfo.MemberState.PRE_READ_ONLY_REPLICA); + add(ClusterInfo.MemberState.CLONE); + add(ClusterInfo.MemberState.DISCOVER_LEADER); + }}; + } + + private final List seedNodes; + private final NodePreference nodePreference; + private final SslContext sslContext; + private final Timeouts timeouts; + + public GossipResolverFactory(List seedNodes, NodePreference nodePreference, Timeouts timeouts, SslContext sslContext) { + this.seedNodes = seedNodes; + this.nodePreference = nodePreference; + this.sslContext = sslContext; + this.timeouts = timeouts; + } + + @Override + public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) { + return new NameResolver() { + @Override + public String getServiceAuthority() { + return "eventStoreDBGossip"; + } + + @Override + public void start(Listener2 listener) { + Collections.shuffle(seedNodes); + + for (InetSocketAddress seed : seedNodes) { + try { + ClusterInfo.Endpoint endpoint = attemptDiscovery(seed).get(); + if (endpoint == null) { + continue; + } + + InetSocketAddress addr = endpoint.toInetSocketAddress(); + List addrs = new ArrayList<>(); + addrs.add(addr); + EquivalentAddressGroup addrGroup = new EquivalentAddressGroup(addrs); + List addrGroups = new ArrayList<>(); + addrGroups.add(addrGroup); + + listener.onResult(ResolutionResult.newBuilder() + .setAddresses(addrGroups) + .setAttributes(Attributes.EMPTY) + .build()); + return; + } catch (InterruptedException | ExecutionException e) { + listener.onError(Status.INTERNAL); + return; + } + } + } + + @Override + public void shutdown() { + } + }; + } + + @Override + public String getDefaultScheme() { + return "eventstore"; + } + + private CompletableFuture attemptDiscovery(InetSocketAddress seed) { + ManagedChannel channel = NettyChannelBuilder.forAddress(seed) + .userAgent("Event Store Client (Java) v1.0.0-SNAPSHOT") + .sslContext(this.sslContext) + .build(); + GossipClient client = new GossipClient(channel, timeouts); + return client.readGossip() + .thenApply(this::determineBestFitNode) + .thenApply(m -> m.map(ClusterInfo.Member::getHttpEndpoint).orElse(null)); + } + + private Optional determineBestFitNode(ClusterInfo clusterInfo) { + return clusterInfo.getMembers() + .stream() + .filter(ClusterInfo.Member::isAlive) + .filter(m -> !invalidStates.contains(m.getState())) + .sorted((o1, o2) -> { + switch (nodePreference) { + case LEADER: + if (o1.getState().equals(ClusterInfo.MemberState.LEADER)) { + return 1; + } + if (o2.getState().equals(ClusterInfo.MemberState.LEADER)) { + return -1; + } + return 0; + case FOLLOWER: + if (o1.getState().equals(ClusterInfo.MemberState.FOLLOWER)) { + return 1; + } + if (o2.getState().equals(ClusterInfo.MemberState.FOLLOWER)) { + return -1; + } + return 0; + case READ_ONLY_REPLICA: + if (o1.getState().equals(ClusterInfo.MemberState.READ_ONLY_REPLICA)) { + return 1; + } + if (o2.getState().equals(ClusterInfo.MemberState.READ_ONLY_REPLICA)) { + return -1; + } + return 0; + } + return 0; + }) + .findAny(); + } +} diff --git a/src/main/java/com/eventstore/dbclient/NodePreference.java b/src/main/java/com/eventstore/dbclient/NodePreference.java new file mode 100644 index 00000000..59878c6a --- /dev/null +++ b/src/main/java/com/eventstore/dbclient/NodePreference.java @@ -0,0 +1,5 @@ +package com.eventstore.dbclient; + +public enum NodePreference { + LEADER, FOLLOWER, READ_ONLY_REPLICA +} diff --git a/src/main/java/com/eventstore/dbclient/Timeouts.java b/src/main/java/com/eventstore/dbclient/Timeouts.java index 2e66e3fc..d7b1b44b 100644 --- a/src/main/java/com/eventstore/dbclient/Timeouts.java +++ b/src/main/java/com/eventstore/dbclient/Timeouts.java @@ -9,15 +9,22 @@ public class Timeouts { final long subscriptionTimeout; final TimeUnit subscriptionTimeoutUnit; + final long readGossipTimeout; + final TimeUnit readGossipTimeoutUnit; + public static final Timeouts DEFAULT = new Timeouts( + 5, TimeUnit.SECONDS, 5, TimeUnit.SECONDS, 5, TimeUnit.SECONDS); Timeouts(final long shutdownTimeout, final TimeUnit shutdownTimeoutUnit, - final long subscriptionTimeout, final TimeUnit subscriptionTimeoutUnit) { + final long subscriptionTimeout, final TimeUnit subscriptionTimeoutUnit, + final long readGossipTimeout, final TimeUnit readGossipTimeoutUnit) { this.shutdownTimeout = shutdownTimeout; this.shutdownTimeoutUnit = shutdownTimeoutUnit; this.subscriptionTimeout = subscriptionTimeout; this.subscriptionTimeoutUnit = subscriptionTimeoutUnit; + this.readGossipTimeout = readGossipTimeout; + this.readGossipTimeoutUnit = readGossipTimeoutUnit; } } diff --git a/src/main/java/com/eventstore/dbclient/TimeoutsBuilder.java b/src/main/java/com/eventstore/dbclient/TimeoutsBuilder.java index fab4ebd9..95b46424 100644 --- a/src/main/java/com/eventstore/dbclient/TimeoutsBuilder.java +++ b/src/main/java/com/eventstore/dbclient/TimeoutsBuilder.java @@ -9,10 +9,17 @@ public class TimeoutsBuilder { long subscriptionTimeout; TimeUnit subscriptionTimeoutUnit; + long readGossipTimeout; + TimeUnit readGossipTimeoutUnit; + public static TimeoutsBuilder newBuilder() { TimeoutsBuilder builder = new TimeoutsBuilder(); builder.shutdownTimeout = Timeouts.DEFAULT.shutdownTimeout; builder.shutdownTimeoutUnit = Timeouts.DEFAULT.shutdownTimeoutUnit; + builder.subscriptionTimeout = Timeouts.DEFAULT.subscriptionTimeout; + builder.subscriptionTimeoutUnit = Timeouts.DEFAULT.subscriptionTimeoutUnit; + builder.readGossipTimeout = Timeouts.DEFAULT.readGossipTimeout; + builder.readGossipTimeoutUnit = Timeouts.DEFAULT.readGossipTimeoutUnit; return builder; } @@ -28,8 +35,14 @@ public TimeoutsBuilder withSubscriptionTimeout(final long timeout, final TimeUni return this; } + public TimeoutsBuilder withReadGossipTimeout(final long timeout, final TimeUnit timeoutUnit) { + readGossipTimeout = timeout; + readGossipTimeoutUnit = timeoutUnit; + return this; + } + public Timeouts build() { - return new Timeouts(shutdownTimeout, shutdownTimeoutUnit, subscriptionTimeout, subscriptionTimeoutUnit); + return new Timeouts(shutdownTimeout, shutdownTimeoutUnit, subscriptionTimeout, subscriptionTimeoutUnit, readGossipTimeout, readGossipTimeoutUnit); } private TimeoutsBuilder() { diff --git a/src/main/proto/gossip.proto b/src/main/proto/gossip.proto new file mode 100644 index 00000000..5b6340ea --- /dev/null +++ b/src/main/proto/gossip.proto @@ -0,0 +1,44 @@ +syntax = "proto3"; +package event_store.client.gossip; +option java_package = "com.eventstore.dbclient.proto.gossip"; + +import "shared.proto"; + +service Gossip { + rpc Read (event_store.client.shared.Empty) returns (ClusterInfo); +} + +message ClusterInfo { + repeated MemberInfo members = 1; +} + +message EndPoint { + string address = 1; + uint32 port = 2; +} + +message MemberInfo { + enum VNodeState { + Initializing = 0; + DiscoverLeader = 1; + Unknown = 2; + PreReplica = 3; + CatchingUp = 4; + Clone = 5; + Follower = 6; + PreLeader = 7; + Leader = 8; + Manager = 9; + ShuttingDown = 10; + Shutdown = 11; + ReadOnlyLeaderless = 12; + PreReadOnlyReplica = 13; + ReadOnlyReplica = 14; + ResigningLeader = 15; + } + event_store.client.shared.UUID instance_id = 1; + int64 time_stamp = 2; + VNodeState state = 3; + bool is_alive = 4; + EndPoint http_end_point = 5; +} diff --git a/src/test/java/com/eventstore/dbclient/ClusterDiscoveryTests.java b/src/test/java/com/eventstore/dbclient/ClusterDiscoveryTests.java new file mode 100644 index 00000000..ba2d9091 --- /dev/null +++ b/src/test/java/com/eventstore/dbclient/ClusterDiscoveryTests.java @@ -0,0 +1,52 @@ +package com.eventstore.dbclient; + +import io.grpc.ManagedChannel; +import io.grpc.netty.shaded.io.grpc.netty.GrpcSslContexts; +import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder; +import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; +import io.grpc.netty.shaded.io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import org.junit.Test; +import testcontainers.module.EventStoreStreamsClient; + +import javax.net.ssl.SSLException; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static org.junit.Assert.assertEquals; + +public class ClusterDiscoveryTests { + @Test + public void testThing() throws SSLException, ExecutionException, InterruptedException { +// UserCredentials creds = new UserCredentials("admin", "changeit"); +// +// SslContext sslContext = GrpcSslContexts. +// forClient(). +// trustManager(InsecureTrustManagerFactory.INSTANCE). +// build(); +// +// ManagedChannel channel = NettyChannelBuilder.forTarget("eventstore") +// .nameResolverFactory(new GossipResolverFactory()) +// .userAgent("Event Store Client (Java) v1.0.0-SNAPSHOT") +// .sslContext(sslContext) +// .build(); +// +// StreamsClient client = new StreamsClient(channel, creds, Timeouts.DEFAULT); +// +// final String streamName = UUID.randomUUID().toString(); +// final String eventType = "TestEvent"; +// final String eventId = "38fffbc2-339e-11ea-8c7b-784f43837872"; +// final byte[] eventMetaData = new byte[]{0xd, 0xe, 0xa, 0xd}; +// final byte[] eventData = new byte[]{0xb, 0xe, 0xe, 0xf}; +// +// List events = new ArrayList<>(); +// events.add(new ProposedEvent(UUID.fromString(eventId), eventType, "application/octet-stream", eventData, eventMetaData)); +// +// CompletableFuture future = client.appendToStream(streamName, SpecialStreamRevision.NO_STREAM, events); +// WriteResult result = future.get(); +// +// assertEquals(new StreamRevision(0), result.getNextExpectedRevision()); + } +}