Skip to content

Commit f5a0b31

Browse files
committed
merge 3.3
2 parents 712763e + bc543b6 commit f5a0b31

File tree

8 files changed

+127
-15
lines changed

8 files changed

+127
-15
lines changed

dubbo-common/src/test/java/org/apache/dubbo/common/bytecode/WrapperTest.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import org.apache.dubbo.common.utils.ClassUtils;
2020

21+
import java.util.Arrays;
22+
2123
import org.junit.jupiter.api.Assertions;
2224
import org.junit.jupiter.api.Test;
2325
import org.slf4j.Logger;
@@ -179,9 +181,14 @@ void test_getDeclaredMethodNames_ContainExtendsParentMethods() throws Exception
179181

180182
@Test
181183
void test_getMethodNames_ContainExtendsParentMethods() throws Exception {
182-
assertArrayEquals(
183-
new String[] {"hello", "world"}, Wrapper.getWrapper(Son.class).getMethodNames());
184-
assertArrayEquals(new String[] {"hello", "world"}, ClassUtils.getMethodNames(Son.class));
184+
String[] methodNamesFromWrapepr = Wrapper.getWrapper(Son.class).getMethodNames();
185+
String[] methodNamesFromClassUtils = ClassUtils.getMethodNames(Son.class);
186+
187+
Arrays.sort(methodNamesFromWrapepr);
188+
Arrays.sort(methodNamesFromClassUtils);
189+
190+
assertArrayEquals(new String[] {"hello", "world"}, methodNamesFromWrapepr);
191+
assertArrayEquals(new String[] {"hello", "world"}, methodNamesFromClassUtils);
185192
}
186193

187194
@Test

dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.util.concurrent.CompletableFuture;
3939
import java.util.concurrent.ConcurrentHashMap;
4040
import java.util.concurrent.ExecutorService;
41+
import java.util.concurrent.RejectedExecutionException;
4142
import java.util.concurrent.TimeUnit;
4243

4344
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
@@ -324,7 +325,12 @@ public void run(Timeout timeout) {
324325

325326
ExecutorService executor = future.getExecutor();
326327
if (executor != null && !executor.isShutdown()) {
327-
executor.execute(() -> notifyTimeout(future));
328+
try {
329+
executor.execute(() -> notifyTimeout(future));
330+
} catch (RejectedExecutionException e) {
331+
notifyTimeout(future);
332+
throw e;
333+
}
328334
} else {
329335
notifyTimeout(future);
330336
}

dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/exchange/support/DefaultFutureTest.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@
2727

2828
import java.time.LocalDateTime;
2929
import java.time.format.DateTimeFormatter;
30+
import java.util.concurrent.ArrayBlockingQueue;
3031
import java.util.concurrent.ExecutorService;
32+
import java.util.concurrent.ThreadPoolExecutor;
33+
import java.util.concurrent.TimeUnit;
3134
import java.util.concurrent.atomic.AtomicInteger;
3235

3336
import org.junit.jupiter.api.Assertions;
@@ -224,6 +227,43 @@ void testClose1() {
224227
Assertions.assertFalse(executor.isTerminated());
225228
}
226229

230+
@Test
231+
void testTimeoutWithRejectedExecution() throws Exception {
232+
// Create a ThreadPoolExecutor with a queue capacity of 1
233+
ThreadPoolExecutor customExecutor = new ThreadPoolExecutor(
234+
1, // corePoolSize
235+
1, // maxPoolSize
236+
60L,
237+
TimeUnit.SECONDS,
238+
new ArrayBlockingQueue<>(1), // queue capacity is 1
239+
new ThreadPoolExecutor.AbortPolicy() // default rejection policy: throws exception
240+
);
241+
// Submit two tasks to occupy the thread and the queue
242+
customExecutor.submit(() -> {
243+
try {
244+
Thread.sleep(500); // occupy the thread for a while
245+
} catch (InterruptedException ignored) {
246+
}
247+
});
248+
customExecutor.submit(() -> {
249+
try {
250+
Thread.sleep(500); // occupy the queue
251+
} catch (InterruptedException ignored) {
252+
}
253+
});
254+
// Create a Dubbo Mock Channel and a request
255+
Channel channel = new MockedChannel();
256+
Request request = new Request(999);
257+
// Use Dubbo's newFuture and pass in the custom thread pool
258+
DefaultFuture future = DefaultFuture.newFuture(channel, request, 100, customExecutor);
259+
// Mark the request as sent
260+
DefaultFuture.sent(channel, request);
261+
// Wait for the timeout task to trigger
262+
Thread.sleep(300);
263+
Assertions.assertNull(DefaultFuture.getFuture(999), "Future should be removed from FUTURES after timeout");
264+
customExecutor.shutdown();
265+
}
266+
227267
@Test
228268
void testClose2() {
229269
Channel channel = new MockedChannel();

dubbo-remoting/dubbo-remoting-zookeeper-curator5/src/main/java/org/apache/dubbo/remoting/zookeeper/curator5/Curator5ZookeeperClient.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,10 @@ public List<ACL> getAclForPath(String path) {
9999
boolean connected = client.blockUntilConnected(timeout, TimeUnit.MILLISECONDS);
100100
boolean check = UrlUtils.isCheck(url);
101101
if (check && !connected) {
102-
IllegalStateException illegalStateException = new IllegalStateException("zookeeper not connected");
102+
// close CuratorFramework to stop re-connection.
103+
client.close();
104+
IllegalStateException illegalStateException =
105+
new IllegalStateException("zookeeper not connected, the address is: " + url);
103106

104107
// 5-1 Failed to connect to configuration center.
105108
logger.error(

dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/tps/DefaultTPSLimiter.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,27 +39,32 @@ public class DefaultTPSLimiter implements TPSLimiter {
3939

4040
@Override
4141
public boolean isAllowable(URL url, Invocation invocation) {
42+
boolean isMethodLevelTpsConfigured =
43+
url.hasMethodParameter(RpcUtils.getMethodName(invocation), TPS_LIMIT_RATE_KEY);
44+
String key = isMethodLevelTpsConfigured
45+
? url.getServiceKey() + "#" + RpcUtils.getMethodName(invocation)
46+
: url.getServiceKey();
4247
int rate = url.getMethodParameter(RpcUtils.getMethodName(invocation), TPS_LIMIT_RATE_KEY, -1);
4348
long interval = url.getMethodParameter(
4449
RpcUtils.getMethodName(invocation), TPS_LIMIT_INTERVAL_KEY, DEFAULT_TPS_LIMIT_INTERVAL);
45-
String serviceKey = url.getServiceKey();
50+
4651
if (rate > 0) {
47-
StatItem statItem = stats.get(serviceKey);
52+
StatItem statItem = stats.get(key);
4853
if (statItem == null) {
49-
stats.putIfAbsent(serviceKey, new StatItem(serviceKey, rate, interval));
50-
statItem = stats.get(serviceKey);
54+
stats.putIfAbsent(key, new StatItem(key, rate, interval));
55+
statItem = stats.get(key);
5156
} else {
5257
// rate or interval has changed, rebuild
5358
if (statItem.getRate() != rate || statItem.getInterval() != interval) {
54-
stats.put(serviceKey, new StatItem(serviceKey, rate, interval));
55-
statItem = stats.get(serviceKey);
59+
stats.put(key, new StatItem(key, rate, interval));
60+
statItem = stats.get(key);
5661
}
5762
}
5863
return statItem.isAllowable();
5964
} else {
60-
StatItem statItem = stats.get(serviceKey);
65+
StatItem statItem = stats.get(key);
6166
if (statItem != null) {
62-
stats.remove(serviceKey);
67+
stats.remove(key);
6368
}
6469
}
6570

dubbo-rpc/dubbo-rpc-api/src/test/java/org/apache/dubbo/rpc/filter/tps/DefaultTPSLimiterTest.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,12 +65,52 @@ void testIsNotAllowable() {
6565
}
6666

6767
@Test
68-
void testTPSLimiterForMethodLevelConfig() {
68+
void testMethodLevelTpsOverridesServiceLevel() {
6969
Invocation invocation = new MockInvocation();
7070
URL url = URL.valueOf("test://test");
7171
url = url.addParameter(INTERFACE_KEY, "org.apache.dubbo.rpc.file.TpsService");
7272
url = url.addParameter(TPS_LIMIT_RATE_KEY, TEST_LIMIT_RATE);
7373
int tpsConfigForMethodLevel = 3;
74+
url = url.addParameter("tps", 1);
75+
url = url.addParameter("echo.tps", tpsConfigForMethodLevel);
76+
url = url.addParameter(TPS_LIMIT_INTERVAL_KEY, 1000);
77+
for (int i = 1; i <= tpsConfigForMethodLevel + 1; i++) {
78+
if (i == tpsConfigForMethodLevel + 1) {
79+
Assertions.assertFalse(defaultTPSLimiter.isAllowable(url, invocation));
80+
} else {
81+
Assertions.assertTrue(defaultTPSLimiter.isAllowable(url, invocation));
82+
}
83+
}
84+
}
85+
86+
@Test
87+
void testServiceLevelTpsWhenOtherMethodsHaveTps() {
88+
Invocation invocation = new MockInvocation();
89+
URL url = URL.valueOf("test://test");
90+
url = url.addParameter(INTERFACE_KEY, "org.apache.dubbo.rpc.file.TpsService");
91+
url = url.addParameter(TPS_LIMIT_RATE_KEY, TEST_LIMIT_RATE);
92+
int tpsConfigForServiceLevel = 3;
93+
url = url.addParameter("tps", tpsConfigForServiceLevel);
94+
url = url.addParameter("otherMethod.tps", 1);
95+
url = url.addParameter(TPS_LIMIT_INTERVAL_KEY, 1000);
96+
for (int i = 1; i <= tpsConfigForServiceLevel + 1; i++) {
97+
if (i == tpsConfigForServiceLevel + 1) {
98+
Assertions.assertFalse(defaultTPSLimiter.isAllowable(url, invocation));
99+
} else {
100+
Assertions.assertTrue(defaultTPSLimiter.isAllowable(url, invocation));
101+
}
102+
}
103+
}
104+
105+
@Test
106+
void testMethodLevelTpsIsolation() {
107+
Invocation invocation = new MockInvocation();
108+
URL url = URL.valueOf("test://test");
109+
url = url.addParameter(INTERFACE_KEY, "org.apache.dubbo.rpc.file.TpsService");
110+
url = url.addParameter(TPS_LIMIT_RATE_KEY, TEST_LIMIT_RATE);
111+
int tpsConfigForMethodLevel = 3;
112+
url = url.addParameter("tps", 1);
113+
url = url.addParameter("otherMethod.tps", 2);
74114
url = url.addParameter("echo.tps", tpsConfigForMethodLevel);
75115
url = url.addParameter(TPS_LIMIT_INTERVAL_KEY, 1000);
76116
for (int i = 1; i <= tpsConfigForMethodLevel + 1; i++) {

dubbo-test/dubbo-test-check/src/main/java/org/apache/dubbo/test/check/registrycenter/context/ZookeeperWindowsContext.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,11 @@ public class ZookeeperWindowsContext extends ZookeeperContext {
4949
*/
5050
private final ExecuteWatchdog WATCHDOG = new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT);
5151

52+
/**
53+
* Set it to TRUE when using WatchDog.
54+
*/
55+
private boolean usedWatchDog = false;
56+
5257
/**
5358
* The map to store the pair of clientPort and PID.
5459
*/
@@ -93,6 +98,7 @@ public ExecutorService getExecutorService() {
9398
* Returns the {@link ExecuteWatchdog}.
9499
*/
95100
public ExecuteWatchdog getWatchdog() {
101+
usedWatchDog = true;
96102
return WATCHDOG;
97103
}
98104

@@ -101,7 +107,10 @@ public ExecuteWatchdog getWatchdog() {
101107
*/
102108
public void destroy() {
103109
this.processIds.clear();
104-
this.WATCHDOG.destroyProcess();
110+
// check WatchDog used flag to avoid hanging at destroyProcess when WatchDog is not used.
111+
if (usedWatchDog) {
112+
this.WATCHDOG.destroyProcess();
113+
}
105114
try {
106115
DEFAULT_EXECUTOR_SERVICE.shutdownNow();
107116
} catch (SecurityException | NullPointerException ex) {

dubbo-test/dubbo-test-check/src/main/java/org/apache/dubbo/test/check/registrycenter/processor/ResetZookeeperProcessor.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ public void process(Context context) throws DubboTestException {
4545
client.start();
4646
boolean connected = client.blockUntilConnected(1000, TimeUnit.MILLISECONDS);
4747
if (!connected) {
48+
// close CuratorFramework to stop re-connection.
49+
client.close();
4850
throw new IllegalStateException("zookeeper not connected");
4951
}
5052
client.delete().deletingChildrenIfNeeded().forPath("/dubbo");

0 commit comments

Comments
 (0)