Skip to content

Commit 08f2d9c

Browse files
committed
Implement cluster connection.
1 parent 08d5c05 commit 08f2d9c

12 files changed

+710
-11
lines changed

db-client-java/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@ dependencies {
1717
implementation "javax.validation:validation-api:${validationApiVersion}"
1818

1919
implementation "com.google.protobuf:protobuf-java:${protobufVersion}"
20-
2120
implementation "io.grpc:grpc-netty-shaded:${grpcVersion}"
2221
implementation "io.grpc:grpc-stub:${grpcVersion}"
2322
implementation "io.grpc:grpc-protobuf:${grpcVersion}"
23+
implementation 'dnsjava:dnsjava:3.2.2'
2424

2525
testImplementation "junit:junit:${junitVersion}"
2626
testImplementation "org.slf4j:slf4j-nop:${slf4jNopVersion}"
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
package com.eventstore.dbclient;
2+
3+
import com.eventstore.dbclient.proto.gossip.GossipOuterClass;
4+
import com.eventstore.dbclient.proto.shared.Shared;
5+
6+
import java.net.InetSocketAddress;
7+
import java.util.ArrayList;
8+
import java.util.List;
9+
import java.util.UUID;
10+
11+
public class ClusterInfo {
12+
private final List<Member> members;
13+
14+
public ClusterInfo(List<Member> members) {
15+
this.members = members;
16+
}
17+
18+
public List<Member> getMembers() {
19+
return members;
20+
}
21+
22+
static ClusterInfo fromWire(GossipOuterClass.ClusterInfo wire) {
23+
List<ClusterInfo.Member> members = new ArrayList<>();
24+
for (GossipOuterClass.MemberInfo member : wire.getMembersList()) {
25+
UUID instanceId;
26+
if (member.getInstanceId().hasStructured()) {
27+
Shared.UUID.Structured structured = member.getInstanceId().getStructured();
28+
instanceId = new UUID(structured.getMostSignificantBits(), structured.getLeastSignificantBits());
29+
} else {
30+
instanceId = UUID.fromString(member.getInstanceId().getString());
31+
}
32+
boolean isAlive = member.getIsAlive();
33+
MemberState state = MemberState.fromWire(member.getState());
34+
Endpoint httpEndpoint = new Endpoint(member.getHttpEndPoint().getAddress(), member.getHttpEndPoint().getPort());
35+
36+
members.add(new Member(instanceId, isAlive, state, httpEndpoint));
37+
}
38+
39+
return new ClusterInfo(members);
40+
}
41+
42+
public enum MemberState {
43+
INITIALIZING, DISCOVER_LEADER, UNKNOWN, PRE_REPLICA, CATCHING_UP, CLONE,
44+
FOLLOWER, PRE_LEADER, LEADER, MANAGER, SHUTTING_DOWN, SHUT_DOWN, READ_ONLY_LEADERLESS,
45+
PRE_READ_ONLY_REPLICA, READ_ONLY_REPLICA, RESIGNING_LEADER;
46+
47+
static MemberState fromWire(GossipOuterClass.MemberInfo.VNodeState state) {
48+
switch (state) {
49+
case Initializing:
50+
return INITIALIZING;
51+
case DiscoverLeader:
52+
return DISCOVER_LEADER;
53+
case PreReplica:
54+
return PRE_REPLICA;
55+
case CatchingUp:
56+
return CATCHING_UP;
57+
case Clone:
58+
return CLONE;
59+
case Follower:
60+
return FOLLOWER;
61+
case PreLeader:
62+
return PRE_LEADER;
63+
case Leader:
64+
return LEADER;
65+
case Manager:
66+
return MANAGER;
67+
case ShuttingDown:
68+
return SHUTTING_DOWN;
69+
case Shutdown:
70+
return SHUT_DOWN;
71+
case ReadOnlyLeaderless:
72+
return READ_ONLY_LEADERLESS;
73+
case PreReadOnlyReplica:
74+
return PRE_READ_ONLY_REPLICA;
75+
case ReadOnlyReplica:
76+
return READ_ONLY_REPLICA;
77+
case ResigningLeader:
78+
return RESIGNING_LEADER;
79+
}
80+
return UNKNOWN;
81+
}
82+
}
83+
84+
public static class Endpoint {
85+
private final String address;
86+
private final int port;
87+
88+
Endpoint(String address, int port) {
89+
this.address = address;
90+
this.port = port;
91+
}
92+
93+
InetSocketAddress toInetSocketAddress() {
94+
return new InetSocketAddress(this.address, this.port);
95+
}
96+
97+
public String getAddress() {
98+
return address;
99+
}
100+
101+
public int getPort() {
102+
return port;
103+
}
104+
}
105+
106+
public static class Member {
107+
private final UUID instanceId;
108+
private final boolean isAlive;
109+
private final MemberState state;
110+
private final Endpoint httpEndpoint;
111+
112+
Member(UUID instanceId, boolean isAlive, MemberState state, Endpoint httpEndpoint) {
113+
this.instanceId = instanceId;
114+
this.isAlive = isAlive;
115+
this.state = state;
116+
this.httpEndpoint = httpEndpoint;
117+
}
118+
119+
public UUID getInstanceId() {
120+
return instanceId;
121+
}
122+
123+
public boolean isAlive() {
124+
return isAlive;
125+
}
126+
127+
public MemberState getState() {
128+
return state;
129+
}
130+
131+
public Endpoint getHttpEndpoint() {
132+
return httpEndpoint;
133+
}
134+
}
135+
}
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
package com.eventstore.dbclient;
2+
3+
import io.grpc.*;
4+
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
5+
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
6+
import org.xbill.DNS.*;
7+
8+
import java.net.InetSocketAddress;
9+
import java.net.SocketAddress;
10+
import java.net.URI;
11+
import java.util.*;
12+
import java.util.concurrent.CompletableFuture;
13+
import java.util.concurrent.ExecutionException;
14+
15+
public class ClusterResolverFactory extends NameResolverProvider {
16+
private static final Set<ClusterInfo.MemberState> invalidStates;
17+
private static final Random random = new Random();
18+
19+
static {
20+
invalidStates = new HashSet<ClusterInfo.MemberState>() {{
21+
add(ClusterInfo.MemberState.MANAGER);
22+
add(ClusterInfo.MemberState.SHUTTING_DOWN);
23+
add(ClusterInfo.MemberState.SHUT_DOWN);
24+
add(ClusterInfo.MemberState.UNKNOWN);
25+
add(ClusterInfo.MemberState.INITIALIZING);
26+
add(ClusterInfo.MemberState.RESIGNING_LEADER);
27+
add(ClusterInfo.MemberState.PRE_LEADER);
28+
add(ClusterInfo.MemberState.PRE_REPLICA);
29+
add(ClusterInfo.MemberState.PRE_READ_ONLY_REPLICA);
30+
add(ClusterInfo.MemberState.CLONE);
31+
add(ClusterInfo.MemberState.DISCOVER_LEADER);
32+
}};
33+
}
34+
35+
private List<InetSocketAddress> seedNodes;
36+
private final NodePreference nodePreference;
37+
private final SslContext sslContext;
38+
private final Timeouts timeouts;
39+
40+
public ClusterResolverFactory(List<InetSocketAddress> seedNodes, NodePreference nodePreference, Timeouts timeouts, SslContext sslContext) {
41+
this.seedNodes = seedNodes;
42+
this.nodePreference = nodePreference;
43+
this.sslContext = sslContext;
44+
this.timeouts = timeouts;
45+
}
46+
47+
@Override
48+
public NameResolver newNameResolver(URI targetUri, NameResolver.Args args) {
49+
return new NameResolver() {
50+
@Override
51+
public String getServiceAuthority() {
52+
return "eventStoreDBGossip";
53+
}
54+
55+
@Override
56+
public void start(Listener2 listener) {
57+
List<InetSocketAddress> candidates;
58+
59+
if (seedNodes != null) {
60+
candidates = new ArrayList<>(seedNodes);
61+
Collections.shuffle(candidates);
62+
} else {
63+
candidates = new ArrayList<>();
64+
try {
65+
org.xbill.DNS.Record[] records = new Lookup(targetUri.getHost(), Type.SRV).run();
66+
for (int i = 0; i < records.length; ++i) {
67+
SRVRecord record = (SRVRecord) records[i];
68+
69+
candidates.add(new InetSocketAddress(record.getName().toString(true), record.getPort()));
70+
}
71+
} catch (TextParseException e) {
72+
listener.onError(Status.INTERNAL);
73+
}
74+
}
75+
76+
for (InetSocketAddress seed : candidates) {
77+
try {
78+
ClusterInfo.Endpoint endpoint = attemptDiscovery(seed).get();
79+
if (endpoint == null) {
80+
continue;
81+
}
82+
83+
InetSocketAddress addr = endpoint.toInetSocketAddress();
84+
List<SocketAddress> addrs = new ArrayList<>();
85+
addrs.add(addr);
86+
EquivalentAddressGroup addrGroup = new EquivalentAddressGroup(addrs);
87+
List<EquivalentAddressGroup> addrGroups = new ArrayList<>();
88+
addrGroups.add(addrGroup);
89+
90+
listener.onResult(ResolutionResult.newBuilder()
91+
.setAddresses(addrGroups)
92+
.setAttributes(Attributes.EMPTY)
93+
.build());
94+
return;
95+
} catch (InterruptedException | ExecutionException e) {
96+
listener.onError(Status.INTERNAL);
97+
return;
98+
}
99+
}
100+
}
101+
102+
@Override
103+
public void shutdown() {
104+
}
105+
};
106+
}
107+
108+
@Override
109+
public String getDefaultScheme() {
110+
return seedNodes != null ? "cluster_seeds" : "cluster_dns";
111+
}
112+
113+
private CompletableFuture<ClusterInfo.Endpoint> attemptDiscovery(InetSocketAddress seed) {
114+
ManagedChannel channel = NettyChannelBuilder.forAddress(seed)
115+
.userAgent("Event Store Client (Java) v1.0.0-SNAPSHOT")
116+
.sslContext(this.sslContext)
117+
.build();
118+
GossipClient client = new GossipClient(channel, timeouts);
119+
return client.read()
120+
.thenApply(this::determineBestFitNode)
121+
.thenApply(m -> m.map(ClusterInfo.Member::getHttpEndpoint).orElse(null));
122+
}
123+
124+
private Optional<ClusterInfo.Member> determineBestFitNode(ClusterInfo clusterInfo) {
125+
return clusterInfo.getMembers()
126+
.stream()
127+
.filter(ClusterInfo.Member::isAlive)
128+
.filter(m -> !invalidStates.contains(m.getState()))
129+
.sorted((o1, o2) -> {
130+
switch (nodePreference) {
131+
case LEADER:
132+
if (o1.getState().equals(ClusterInfo.MemberState.LEADER)) {
133+
return 1;
134+
}
135+
if (o2.getState().equals(ClusterInfo.MemberState.LEADER)) {
136+
return -1;
137+
}
138+
return 0;
139+
case FOLLOWER:
140+
if (o1.getState().equals(ClusterInfo.MemberState.FOLLOWER)) {
141+
return 1;
142+
}
143+
if (o2.getState().equals(ClusterInfo.MemberState.FOLLOWER)) {
144+
return -1;
145+
}
146+
return 0;
147+
case READ_ONLY_REPLICA:
148+
if (o1.getState().equals(ClusterInfo.MemberState.READ_ONLY_REPLICA)) {
149+
return 1;
150+
}
151+
if (o2.getState().equals(ClusterInfo.MemberState.READ_ONLY_REPLICA)) {
152+
return -1;
153+
}
154+
return 0;
155+
case RANDOM:
156+
if (random.nextBoolean()) {
157+
return 1;
158+
}
159+
160+
return 1;
161+
}
162+
return 0;
163+
})
164+
.findAny();
165+
}
166+
167+
@Override
168+
protected boolean isAvailable() {
169+
return true;
170+
}
171+
172+
@Override
173+
protected int priority() {
174+
return 6;
175+
} // We made sure to have an higher priority than the DNS resolver factory.
176+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package com.eventstore.dbclient;
2+
3+
public class Endpoint {
4+
private String hostname;
5+
private int port;
6+
7+
public Endpoint(String hostname, int port) {
8+
this.hostname = hostname;
9+
this.port = port;
10+
}
11+
12+
public String getHostname() {
13+
return hostname;
14+
}
15+
16+
public int getPort() {
17+
return port;
18+
}
19+
}

0 commit comments

Comments
 (0)