1+ package com .eventstore .dbclient ;
2+
3+ import io .grpc .*;
4+ import io .grpc .netty .shaded .io .grpc .netty .NettyChannelBuilder ;
5+ import io .grpc .netty .shaded .io .netty .handler .ssl .SslContext ;
6+ import org .xbill .DNS .*;
7+
8+ import java .net .InetSocketAddress ;
9+ import java .net .SocketAddress ;
10+ import java .net .URI ;
11+ import java .util .*;
12+ import java .util .concurrent .CompletableFuture ;
13+ import java .util .concurrent .ExecutionException ;
14+
15+ public class ClusterResolverFactory extends NameResolverProvider {
16+ private static final Set <ClusterInfo .MemberState > invalidStates ;
17+ private static final Random random = new Random ();
18+
19+ static {
20+ invalidStates = new HashSet <ClusterInfo .MemberState >() {{
21+ add (ClusterInfo .MemberState .MANAGER );
22+ add (ClusterInfo .MemberState .SHUTTING_DOWN );
23+ add (ClusterInfo .MemberState .SHUT_DOWN );
24+ add (ClusterInfo .MemberState .UNKNOWN );
25+ add (ClusterInfo .MemberState .INITIALIZING );
26+ add (ClusterInfo .MemberState .RESIGNING_LEADER );
27+ add (ClusterInfo .MemberState .PRE_LEADER );
28+ add (ClusterInfo .MemberState .PRE_REPLICA );
29+ add (ClusterInfo .MemberState .PRE_READ_ONLY_REPLICA );
30+ add (ClusterInfo .MemberState .CLONE );
31+ add (ClusterInfo .MemberState .DISCOVER_LEADER );
32+ }};
33+ }
34+
35+ private List <InetSocketAddress > seedNodes ;
36+ private final NodePreference nodePreference ;
37+ private final SslContext sslContext ;
38+ private final Timeouts timeouts ;
39+
40+ public ClusterResolverFactory (List <InetSocketAddress > seedNodes , NodePreference nodePreference , Timeouts timeouts , SslContext sslContext ) {
41+ this .seedNodes = seedNodes ;
42+ this .nodePreference = nodePreference ;
43+ this .sslContext = sslContext ;
44+ this .timeouts = timeouts ;
45+ }
46+
47+ @ Override
48+ public NameResolver newNameResolver (URI targetUri , NameResolver .Args args ) {
49+ return new NameResolver () {
50+ @ Override
51+ public String getServiceAuthority () {
52+ return "eventStoreDBGossip" ;
53+ }
54+
55+ @ Override
56+ public void start (Listener2 listener ) {
57+ List <InetSocketAddress > candidates ;
58+
59+ if (seedNodes != null ) {
60+ candidates = new ArrayList <>(seedNodes );
61+ Collections .shuffle (candidates );
62+ } else {
63+ candidates = new ArrayList <>();
64+ try {
65+ org .xbill .DNS .Record [] records = new Lookup (targetUri .getHost (), Type .SRV ).run ();
66+ for (int i = 0 ; i < records .length ; ++i ) {
67+ SRVRecord record = (SRVRecord ) records [i ];
68+
69+ candidates .add (new InetSocketAddress (record .getName ().toString (true ), record .getPort ()));
70+ }
71+ } catch (TextParseException e ) {
72+ listener .onError (Status .INTERNAL );
73+ }
74+ }
75+
76+ for (InetSocketAddress seed : candidates ) {
77+ try {
78+ ClusterInfo .Endpoint endpoint = attemptDiscovery (seed ).get ();
79+ if (endpoint == null ) {
80+ continue ;
81+ }
82+
83+ InetSocketAddress addr = endpoint .toInetSocketAddress ();
84+ List <SocketAddress > addrs = new ArrayList <>();
85+ addrs .add (addr );
86+ EquivalentAddressGroup addrGroup = new EquivalentAddressGroup (addrs );
87+ List <EquivalentAddressGroup > addrGroups = new ArrayList <>();
88+ addrGroups .add (addrGroup );
89+
90+ listener .onResult (ResolutionResult .newBuilder ()
91+ .setAddresses (addrGroups )
92+ .setAttributes (Attributes .EMPTY )
93+ .build ());
94+ return ;
95+ } catch (InterruptedException | ExecutionException e ) {
96+ listener .onError (Status .INTERNAL );
97+ return ;
98+ }
99+ }
100+ }
101+
102+ @ Override
103+ public void shutdown () {
104+ }
105+ };
106+ }
107+
108+ @ Override
109+ public String getDefaultScheme () {
110+ return seedNodes != null ? "cluster_seeds" : "cluster_dns" ;
111+ }
112+
113+ private CompletableFuture <ClusterInfo .Endpoint > attemptDiscovery (InetSocketAddress seed ) {
114+ ManagedChannel channel = NettyChannelBuilder .forAddress (seed )
115+ .userAgent ("Event Store Client (Java) v1.0.0-SNAPSHOT" )
116+ .sslContext (this .sslContext )
117+ .build ();
118+ GossipClient client = new GossipClient (channel , timeouts );
119+ return client .read ()
120+ .thenApply (this ::determineBestFitNode )
121+ .thenApply (m -> m .map (ClusterInfo .Member ::getHttpEndpoint ).orElse (null ));
122+ }
123+
124+ private Optional <ClusterInfo .Member > determineBestFitNode (ClusterInfo clusterInfo ) {
125+ return clusterInfo .getMembers ()
126+ .stream ()
127+ .filter (ClusterInfo .Member ::isAlive )
128+ .filter (m -> !invalidStates .contains (m .getState ()))
129+ .sorted ((o1 , o2 ) -> {
130+ switch (nodePreference ) {
131+ case LEADER :
132+ if (o1 .getState ().equals (ClusterInfo .MemberState .LEADER )) {
133+ return -1 ;
134+ }
135+ if (o2 .getState ().equals (ClusterInfo .MemberState .LEADER )) {
136+ return 1 ;
137+ }
138+ return 0 ;
139+ case FOLLOWER :
140+ if (o1 .getState ().equals (ClusterInfo .MemberState .FOLLOWER )) {
141+ return -1 ;
142+ }
143+ if (o2 .getState ().equals (ClusterInfo .MemberState .FOLLOWER )) {
144+ return 1 ;
145+ }
146+ return 0 ;
147+ case READ_ONLY_REPLICA :
148+ if (o1 .getState ().equals (ClusterInfo .MemberState .READ_ONLY_REPLICA )) {
149+ return -1 ;
150+ }
151+ if (o2 .getState ().equals (ClusterInfo .MemberState .READ_ONLY_REPLICA )) {
152+ return 1 ;
153+ }
154+ return 0 ;
155+ case RANDOM :
156+ if (random .nextBoolean ()) {
157+ return 1 ;
158+ }
159+
160+ return 1 ;
161+ }
162+ return 0 ;
163+ }).findFirst ();
164+ }
165+
166+ @ Override
167+ protected boolean isAvailable () {
168+ return true ;
169+ }
170+
171+ @ Override
172+ protected int priority () {
173+ return 6 ; // We made sure to have an higher priority than the DNS resolver factory.
174+ }
175+ }
0 commit comments