1818
1919import io .etcd .jetcd .ByteSequence ;
2020import io .etcd .jetcd .Client ;
21+ import io .etcd .jetcd .KV ;
2122import io .etcd .jetcd .Watch ;
22- import io .etcd .jetcd .launcher .junit4 .EtcdClusterResource ;
23+ import io .etcd .jetcd .launcher .EtcdCluster ;
24+ import io .etcd .jetcd .launcher .EtcdClusterFactory ;
2325import io .etcd .jetcd .options .DeleteOption ;
2426import io .etcd .jetcd .options .GetOption ;
2527import io .etcd .jetcd .watch .WatchResponse ;
2628import org .apache .seata .discovery .registry .RegistryService ;
27- import org .junit .Rule ;
2829import org .junit .jupiter .api .AfterAll ;
30+ import org .junit .jupiter .api .AfterEach ;
2931import org .junit .jupiter .api .BeforeAll ;
32+ import org .junit .jupiter .api .BeforeEach ;
3033import org .junit .jupiter .api .Disabled ;
3134import org .junit .jupiter .api .Test ;
3235
3336import java .net .InetSocketAddress ;
37+ import java .net .URI ;
3438import java .util .List ;
3539import java .util .concurrent .TimeUnit ;
3640
4246public class EtcdRegistryServiceImplTest {
4347 private static final String REGISTRY_KEY_PREFIX = "registry-seata-" ;
4448 private static final String CLUSTER_NAME = "default" ;
45-
46- @ Rule
47- private static final EtcdClusterResource etcd = new EtcdClusterResource (CLUSTER_NAME , 1 );
48-
49- private final Client client =
50- Client .builder ().endpoints (etcd .getClientEndpoints ()).build ();
5149 private static final String HOST = "127.0.0.1" ;
5250 private static final int PORT = 8091 ;
5351
52+ private static EtcdCluster etcd ;
53+ private static Client client ;
54+ private static List <URI > clientEndpoints ;
55+
5456 @ BeforeAll
55- public static void beforeClass () throws Exception {
56- System .setProperty (
57- EtcdRegistryServiceImpl .TEST_ENDPONT ,
58- etcd .getClientEndpoints ().get (0 ).toString ());
57+ public static void beforeAll () {
58+ etcd = EtcdClusterFactory .buildCluster (CLUSTER_NAME , 1 , false );
59+ etcd .start ();
60+ clientEndpoints = etcd .getClientEndpoints ();
61+ client = Client .builder ().endpoints (clientEndpoints ).build ();
5962 }
6063
6164 @ AfterAll
62- public static void afterClass () throws Exception {
63- System .setProperty (EtcdRegistryServiceImpl .TEST_ENDPONT , "" );
65+ public static void afterAll () {
66+ if (client != null ) {
67+ client .close ();
68+ }
69+ if (etcd != null ) {
70+ etcd .close ();
71+ }
72+ System .clearProperty (EtcdRegistryServiceImpl .TEST_ENDPONT );
73+ }
74+
75+ @ BeforeEach
76+ public void setUp () {
77+ String endpoint = clientEndpoints .get (0 ).toString ();
78+ System .setProperty (EtcdRegistryServiceImpl .TEST_ENDPONT , endpoint );
79+ }
80+
81+ @ AfterEach
82+ public void tearDown () throws Exception {
83+ KV kvClient = client .getKVClient ();
84+ ByteSequence keyPrefix = buildRegistryKeyPrefix ();
85+ DeleteOption deleteOption =
86+ DeleteOption .newBuilder ().withPrefix (keyPrefix ).build ();
87+ kvClient .delete (keyPrefix , deleteOption ).get ();
6488 }
6589
6690 @ Test
6791 public void testRegister () throws Exception {
6892 RegistryService registryService = new EtcdRegistryProvider ().provide ();
6993 InetSocketAddress inetSocketAddress = new InetSocketAddress (HOST , PORT );
70- // 1.register
94+ // 1. Register the service instance.
7195 registryService .register (inetSocketAddress );
72- // 2.get instance information
96+ // 2. Verify the registration by directly querying etcd.
7397 GetOption getOption =
7498 GetOption .newBuilder ().withPrefix (buildRegistryKeyPrefix ()).build ();
7599 long count = client .getKVClient ().get (buildRegistryKeyPrefix (), getOption ).get ().getKvs ().stream ()
@@ -87,7 +111,7 @@ public void testUnregister() throws Exception {
87111 InetSocketAddress inetSocketAddress = new InetSocketAddress (HOST , PORT );
88112 // 1.register
89113 registryService .register (inetSocketAddress );
90- // 2.get instance information
114+ // 2. Verify it was registered successfully.
91115 GetOption getOption =
92116 GetOption .newBuilder ().withPrefix (buildRegistryKeyPrefix ()).build ();
93117 long count = client .getKVClient ().get (buildRegistryKeyPrefix (), getOption ).get ().getKvs ().stream ()
@@ -97,10 +121,9 @@ public void testUnregister() throws Exception {
97121 })
98122 .count ();
99123 assertThat (count ).isEqualTo (1 );
100- // 3.unregister
124+ // 3. Unregister the instance.
101125 registryService .unregister (inetSocketAddress );
102- // 4.again get instance information
103- getOption = GetOption .newBuilder ().withPrefix (buildRegistryKeyPrefix ()).build ();
126+ // 4. Verify it was successfully removed from etcd.
104127 count = client .getKVClient ().get (buildRegistryKeyPrefix (), getOption ).get ().getKvs ().stream ()
105128 .filter (keyValue -> {
106129 String [] instanceInfo = keyValue .getValue ().toString (UTF_8 ).split (":" );
@@ -118,8 +141,8 @@ public void testSubscribe() throws Exception {
118141 registryService .register (inetSocketAddress );
119142 // 2.subscribe
120143 EtcdListener etcdListener = new EtcdListener ();
121- registryService .subscribe (CLUSTER_NAME , etcdListener );
122- // 3.delete instance,see if the listener can be notified
144+ registryService .subscribe (DEFAULT_TX_GROUP , etcdListener );
145+ // 3. Delete the instance key and verify the listener is notified.
123146 DeleteOption deleteOption =
124147 DeleteOption .newBuilder ().withPrefix (buildRegistryKeyPrefix ()).build ();
125148 client .getKVClient ().delete (buildRegistryKeyPrefix (), deleteOption ).get ();
@@ -134,14 +157,14 @@ public void testUnsubscribe() throws Exception {
134157 registryService .register (inetSocketAddress );
135158 // 2.subscribe
136159 EtcdListener etcdListener = new EtcdListener ();
137- registryService .subscribe (CLUSTER_NAME , etcdListener );
160+ registryService .subscribe (DEFAULT_TX_GROUP , etcdListener );
138161 // 3.delete instance,see if the listener can be notified
139162 DeleteOption deleteOption =
140163 DeleteOption .newBuilder ().withPrefix (buildRegistryKeyPrefix ()).build ();
141164 client .getKVClient ().delete (buildRegistryKeyPrefix (), deleteOption ).get ();
142165 assertThat (etcdListener .isNotified ()).isTrue ();
143166 // 4.unsubscribe
144- registryService .unsubscribe (CLUSTER_NAME , etcdListener );
167+ registryService .unsubscribe (DEFAULT_TX_GROUP , etcdListener );
145168 // 5.reset
146169 etcdListener .reset ();
147170 // 6.put instance,the listener should not be notified
@@ -159,47 +182,56 @@ public void testLookup() throws Exception {
159182 registryService .register (inetSocketAddress );
160183 // 2.lookup
161184 List <InetSocketAddress > inetSocketAddresses = registryService .lookup (DEFAULT_TX_GROUP );
162- assertThat (inetSocketAddresses ).size ().isEqualTo (1 );
185+ // 3.Verify that the correct instance is returned.
186+ assertThat (inetSocketAddresses ).hasSize (1 );
187+ assertThat (inetSocketAddresses .get (0 ).getAddress ().getHostAddress ()).isEqualTo (HOST );
188+ assertThat (inetSocketAddresses .get (0 ).getPort ()).isEqualTo (PORT );
163189 }
164190
165191 /**
166- * build registry key prefix
167- *
168- * @return
192+ * Builds the etcd key prefix for a given service group.
193+ * The key prefix includes the transaction service group as is standard in Seata.
194+ * @return ByteSequence of the prefix
169195 */
170196 private ByteSequence buildRegistryKeyPrefix () {
171- return ByteSequence .from (REGISTRY_KEY_PREFIX , UTF_8 );
197+ return ByteSequence .from (REGISTRY_KEY_PREFIX + DEFAULT_TX_GROUP , UTF_8 );
172198 }
173199
174200 /**
175- * etcd listener
201+ * Listener implementation for testing subscription notifications.
176202 */
177203 private static class EtcdListener implements Watch .Listener {
178- private boolean notified = false ;
204+ private volatile boolean notified = false ;
179205
180206 @ Override
181207 public void onNext (WatchResponse response ) {
182208 notified = true ;
183209 }
184210
185211 @ Override
186- public void onError (Throwable throwable ) {}
212+ public void onError (Throwable throwable ) {
213+ // No-op for this test
214+ }
187215
188216 @ Override
189- public void onCompleted () {}
217+ public void onCompleted () {
218+ // No-op for this test
219+ }
190220
191221 /**
192- * @return
222+ * Waits for a short period to allow the async notification to arrive.
223+ * @return true if a notification was received.
193224 */
194225 public boolean isNotified () throws InterruptedException {
195- TimeUnit .SECONDS .sleep (3 );
226+ // Give some time for the watch event to be processed
227+ TimeUnit .SECONDS .sleep (1 );
196228 return notified ;
197229 }
198230
199231 /**
200- * reset
232+ * Resets the notification flag for subsequent assertions.
201233 */
202- private void reset () {
234+ public void reset () {
203235 this .notified = false ;
204236 }
205237 }
0 commit comments