Skip to content

Implement gossip-based node selection and discovery #18

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import org.gradle.api.tasks.testing.logging.TestLogEvent
plugins {
id "java"
id "com.google.protobuf" version "0.8.12"
id "idea"
}

group 'com.eventstore'
Expand Down
135 changes: 135 additions & 0 deletions src/main/java/com/eventstore/dbclient/ClusterInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package com.eventstore.dbclient;

import com.eventstore.dbclient.proto.gossip.GossipOuterClass;
import com.eventstore.dbclient.proto.shared.Shared;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class ClusterInfo {
private final List<Member> members;

public ClusterInfo(List<Member> members) {
this.members = members;
}

public List<Member> getMembers() {
return members;
}

static ClusterInfo fromWire(GossipOuterClass.ClusterInfo wire) {
List<ClusterInfo.Member> members = new ArrayList<>();
for (GossipOuterClass.MemberInfo member : wire.getMembersList()) {
UUID instanceId;
if (member.getInstanceId().hasStructured()) {
Shared.UUID.Structured structured = member.getInstanceId().getStructured();
instanceId = new UUID(structured.getMostSignificantBits(), structured.getLeastSignificantBits());
} else {
instanceId = UUID.fromString(member.getInstanceId().getString());
}
boolean isAlive = member.getIsAlive();
MemberState state = MemberState.fromWire(member.getState());
Endpoint httpEndpoint = new Endpoint(member.getHttpEndPoint().getAddress(), member.getHttpEndPoint().getPort());

members.add(new Member(instanceId, isAlive, state, httpEndpoint));
}

return new ClusterInfo(members);
}

public enum MemberState {
INITIALIZING, DISCOVER_LEADER, UNKNOWN, PRE_REPLICA, CATCHING_UP, CLONE,
FOLLOWER, PRE_LEADER, LEADER, MANAGER, SHUTTING_DOWN, SHUT_DOWN, READ_ONLY_LEADERLESS,
PRE_READ_ONLY_REPLICA, READ_ONLY_REPLICA, RESIGNING_LEADER;

static MemberState fromWire(GossipOuterClass.MemberInfo.VNodeState state) {
switch (state) {
case Initializing:
return INITIALIZING;
case DiscoverLeader:
return DISCOVER_LEADER;
case PreReplica:
return PRE_REPLICA;
case CatchingUp:
return CATCHING_UP;
case Clone:
return CLONE;
case Follower:
return FOLLOWER;
case PreLeader:
return PRE_LEADER;
case Leader:
return LEADER;
case Manager:
return MANAGER;
case ShuttingDown:
return SHUTTING_DOWN;
case Shutdown:
return SHUT_DOWN;
case ReadOnlyLeaderless:
return READ_ONLY_LEADERLESS;
case PreReadOnlyReplica:
return PRE_READ_ONLY_REPLICA;
case ReadOnlyReplica:
return READ_ONLY_REPLICA;
case ResigningLeader:
return RESIGNING_LEADER;
}
return UNKNOWN;
}
}

public static class Endpoint {
private final String address;
private final int port;

Endpoint(String address, int port) {
this.address = address;
this.port = port;
}

InetSocketAddress toInetSocketAddress() {
return new InetSocketAddress(this.address, this.port);
}

public String getAddress() {
return address;
}

public int getPort() {
return port;
}
}

public static class Member {
private final UUID instanceId;
private final boolean isAlive;
private final MemberState state;
private final Endpoint httpEndpoint;

Member(UUID instanceId, boolean isAlive, MemberState state, Endpoint httpEndpoint) {
this.instanceId = instanceId;
this.isAlive = isAlive;
this.state = state;
this.httpEndpoint = httpEndpoint;
}

public UUID getInstanceId() {
return instanceId;
}

public boolean isAlive() {
return isAlive;
}

public MemberState getState() {
return state;
}

public Endpoint getHttpEndpoint() {
return httpEndpoint;
}
}
}
4 changes: 4 additions & 0 deletions src/main/java/com/eventstore/dbclient/EndpointDiscoverer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package com.eventstore.dbclient;

public interface EndpointDiscoverer {
}
49 changes: 49 additions & 0 deletions src/main/java/com/eventstore/dbclient/GossipClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package com.eventstore.dbclient;

import com.eventstore.dbclient.proto.gossip.GossipGrpc;
import com.eventstore.dbclient.proto.gossip.GossipOuterClass;
import com.eventstore.dbclient.proto.shared.Shared;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;

import java.util.concurrent.CompletableFuture;

public class GossipClient {
private final ManagedChannel channel;
private final GossipGrpc.GossipStub stub;
private final Timeouts timeouts;

public GossipClient(ManagedChannel channel, Timeouts timeouts) {
this.channel = channel;
this.timeouts = timeouts;

this.stub = GossipGrpc.newStub(channel);
}

public void shutdown() throws InterruptedException {
this.channel.shutdown().awaitTermination(timeouts.shutdownTimeout, timeouts.shutdownTimeoutUnit);
}

public CompletableFuture<ClusterInfo> readGossip() {
CompletableFuture<ClusterInfo> future = new CompletableFuture<>();

this.stub.read(Shared.Empty.getDefaultInstance(), new StreamObserver<GossipOuterClass.ClusterInfo>() {
@Override
public void onNext(GossipOuterClass.ClusterInfo value) {
ClusterInfo info = ClusterInfo.fromWire(value);
future.complete(info);
}

@Override
public void onError(Throwable t) {
future.completeExceptionally(t);
}

@Override
public void onCompleted() {
}
});

return future;
}
}
141 changes: 141 additions & 0 deletions src/main/java/com/eventstore/dbclient/GossipResolverFactory.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package com.eventstore.dbclient;

import io.grpc.*;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class GossipResolverFactory extends NameResolver.Factory {
private static final Set<ClusterInfo.MemberState> invalidStates;

static {
invalidStates = new HashSet<ClusterInfo.MemberState>() {{
add(ClusterInfo.MemberState.MANAGER);
add(ClusterInfo.MemberState.SHUTTING_DOWN);
add(ClusterInfo.MemberState.SHUT_DOWN);
add(ClusterInfo.MemberState.UNKNOWN);
add(ClusterInfo.MemberState.INITIALIZING);
add(ClusterInfo.MemberState.RESIGNING_LEADER);
add(ClusterInfo.MemberState.PRE_LEADER);
add(ClusterInfo.MemberState.PRE_REPLICA);
add(ClusterInfo.MemberState.PRE_READ_ONLY_REPLICA);
add(ClusterInfo.MemberState.CLONE);
add(ClusterInfo.MemberState.DISCOVER_LEADER);
}};
}

private final List<InetSocketAddress> seedNodes;
private final NodePreference nodePreference;
private final SslContext sslContext;
private final Timeouts timeouts;

public GossipResolverFactory(List<InetSocketAddress> seedNodes, NodePreference nodePreference, Timeouts timeouts, SslContext sslContext) {
this.seedNodes = seedNodes;
this.nodePreference = nodePreference;
this.sslContext = sslContext;
this.timeouts = timeouts;
}

@Override
public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) {
return new NameResolver() {
@Override
public String getServiceAuthority() {
return "eventStoreDBGossip";
}

@Override
public void start(Listener2 listener) {
Collections.shuffle(seedNodes);

for (InetSocketAddress seed : seedNodes) {
try {
ClusterInfo.Endpoint endpoint = attemptDiscovery(seed).get();
if (endpoint == null) {
continue;
}

InetSocketAddress addr = endpoint.toInetSocketAddress();
List<SocketAddress> addrs = new ArrayList<>();
addrs.add(addr);
EquivalentAddressGroup addrGroup = new EquivalentAddressGroup(addrs);
List<EquivalentAddressGroup> addrGroups = new ArrayList<>();
addrGroups.add(addrGroup);

listener.onResult(ResolutionResult.newBuilder()
.setAddresses(addrGroups)
.setAttributes(Attributes.EMPTY)
.build());
return;
} catch (InterruptedException | ExecutionException e) {
listener.onError(Status.INTERNAL);
return;
}
}
}

@Override
public void shutdown() {
}
};
}

@Override
public String getDefaultScheme() {
return "eventstore";
}

private CompletableFuture<ClusterInfo.Endpoint> attemptDiscovery(InetSocketAddress seed) {
ManagedChannel channel = NettyChannelBuilder.forAddress(seed)
.userAgent("Event Store Client (Java) v1.0.0-SNAPSHOT")
.sslContext(this.sslContext)
.build();
GossipClient client = new GossipClient(channel, timeouts);
return client.readGossip()
.thenApply(this::determineBestFitNode)
.thenApply(m -> m.map(ClusterInfo.Member::getHttpEndpoint).orElse(null));
}

private Optional<ClusterInfo.Member> determineBestFitNode(ClusterInfo clusterInfo) {
return clusterInfo.getMembers()
.stream()
.filter(ClusterInfo.Member::isAlive)
.filter(m -> !invalidStates.contains(m.getState()))
.sorted((o1, o2) -> {
switch (nodePreference) {
case LEADER:
if (o1.getState().equals(ClusterInfo.MemberState.LEADER)) {
return 1;
}
if (o2.getState().equals(ClusterInfo.MemberState.LEADER)) {
return -1;
}
return 0;
case FOLLOWER:
if (o1.getState().equals(ClusterInfo.MemberState.FOLLOWER)) {
return 1;
}
if (o2.getState().equals(ClusterInfo.MemberState.FOLLOWER)) {
return -1;
}
return 0;
case READ_ONLY_REPLICA:
if (o1.getState().equals(ClusterInfo.MemberState.READ_ONLY_REPLICA)) {
return 1;
}
if (o2.getState().equals(ClusterInfo.MemberState.READ_ONLY_REPLICA)) {
return -1;
}
return 0;
}
return 0;
})
.findAny();
}
}
5 changes: 5 additions & 0 deletions src/main/java/com/eventstore/dbclient/NodePreference.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package com.eventstore.dbclient;

public enum NodePreference {
LEADER, FOLLOWER, READ_ONLY_REPLICA
}
9 changes: 8 additions & 1 deletion src/main/java/com/eventstore/dbclient/Timeouts.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,22 @@ public class Timeouts {
final long subscriptionTimeout;
final TimeUnit subscriptionTimeoutUnit;

final long readGossipTimeout;
final TimeUnit readGossipTimeoutUnit;

public static final Timeouts DEFAULT = new Timeouts(
5, TimeUnit.SECONDS,
5, TimeUnit.SECONDS,
5, TimeUnit.SECONDS);

Timeouts(final long shutdownTimeout, final TimeUnit shutdownTimeoutUnit,
final long subscriptionTimeout, final TimeUnit subscriptionTimeoutUnit) {
final long subscriptionTimeout, final TimeUnit subscriptionTimeoutUnit,
final long readGossipTimeout, final TimeUnit readGossipTimeoutUnit) {
this.shutdownTimeout = shutdownTimeout;
this.shutdownTimeoutUnit = shutdownTimeoutUnit;
this.subscriptionTimeout = subscriptionTimeout;
this.subscriptionTimeoutUnit = subscriptionTimeoutUnit;
this.readGossipTimeout = readGossipTimeout;
this.readGossipTimeoutUnit = readGossipTimeoutUnit;
}
}
Loading