Skip to content

Commit 95d3b34

Browse files
authored
Merge branch 'main' into feature/remove-unused-redis-rs-features
2 parents 5c321fd + 10a2a41 commit 95d3b34

File tree

8 files changed

+161
-166
lines changed

8 files changed

+161
-166
lines changed

.github/json_matrices/engine-matrix.json

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
[
22
{
33
"type": "valkey",
4-
"version": "8.0",
4+
"version": "8.1",
55
"run": "always"
66
},
7+
{
8+
"type": "valkey",
9+
"version": "8.0"
10+
},
711
{
812
"type": "valkey",
913
"version": "7.2"

glide-core/THIRD_PARTY_LICENSES_RUST

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30393,7 +30393,7 @@ SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
3039330393

3039430394
----
3039530395

30396-
Package: socket2:0.5.9
30396+
Package: socket2:0.5.10
3039730397

3039830398
The following copyrights and licenses were found in the source code of this package:
3039930399

glide-core/redis-rs/redis/src/cluster_async/mod.rs

Lines changed: 110 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -706,7 +706,8 @@ struct Message<C: Sized> {
706706
}
707707

708708
enum RecoverFuture {
709-
RecoverSlots(BoxFuture<'static, RedisResult<()>>),
709+
RefreshingSlots(JoinHandle<RedisResult<()>>),
710+
ReconnectToInitialNodes(BoxFuture<'static, ()>),
710711
Reconnect(BoxFuture<'static, ()>),
711712
}
712713

@@ -1570,6 +1571,20 @@ where
15701571
notifiers
15711572
}
15721573

1574+
fn spawn_refresh_slots_task(
1575+
inner: Arc<InnerCore<C>>,
1576+
policy: &RefreshPolicy,
1577+
) -> JoinHandle<RedisResult<()>> {
1578+
// Clone references for task
1579+
let inner_clone = inner.clone();
1580+
let policy_clone = policy.clone();
1581+
1582+
// Spawn the background task and return its handle
1583+
tokio::spawn(async move {
1584+
Self::refresh_slots_and_subscriptions_with_retries(inner_clone, &policy_clone).await
1585+
})
1586+
}
1587+
15731588
async fn aggregate_results(
15741589
receivers: Vec<(Option<String>, oneshot::Receiver<RedisResult<Response>>)>,
15751590
routing: &MultipleNodeRoutingInfo,
@@ -2713,36 +2728,93 @@ where
27132728
}
27142729

27152730
fn poll_recover(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), RedisError>> {
2716-
trace!("entered poll_recovere");
2731+
trace!("entered poll_recover");
2732+
27172733
let recover_future = match &mut self.state {
27182734
ConnectionState::PollComplete => return Poll::Ready(Ok(())),
27192735
ConnectionState::Recover(future) => future,
27202736
};
2737+
27212738
match recover_future {
2722-
RecoverFuture::RecoverSlots(ref mut future) => match ready!(future.as_mut().poll(cx)) {
2723-
Ok(_) => {
2724-
trace!("Recovered!");
2725-
self.state = ConnectionState::PollComplete;
2726-
Poll::Ready(Ok(()))
2727-
}
2728-
Err(err) => {
2729-
trace!("Recover slots failed!");
2730-
let next_state = if err.kind() == ErrorKind::AllConnectionsUnavailable {
2731-
ConnectionState::Recover(RecoverFuture::Reconnect(Box::pin(
2732-
ClusterConnInner::reconnect_to_initial_nodes(self.inner.clone()),
2733-
)))
2734-
} else {
2735-
ConnectionState::Recover(RecoverFuture::RecoverSlots(Box::pin(
2736-
Self::refresh_slots_and_subscriptions_with_retries(
2739+
RecoverFuture::RefreshingSlots(handle) => {
2740+
// Check if the task has completed
2741+
match handle.now_or_never() {
2742+
Some(Ok(Ok(()))) => {
2743+
// Task succeeded
2744+
trace!("Slot refresh completed successfully!");
2745+
self.state = ConnectionState::PollComplete;
2746+
return Poll::Ready(Ok(()));
2747+
}
2748+
Some(Ok(Err(e))) => {
2749+
// Task completed but returned an engine error
2750+
trace!("Slot refresh failed: {:?}", e);
2751+
2752+
if e.kind() == ErrorKind::AllConnectionsUnavailable {
2753+
// If all connections unavailable, try reconnect
2754+
self.state =
2755+
ConnectionState::Recover(RecoverFuture::ReconnectToInitialNodes(
2756+
Box::pin(ClusterConnInner::reconnect_to_initial_nodes(
2757+
self.inner.clone(),
2758+
)),
2759+
));
2760+
return Poll::Ready(Err(e));
2761+
} else {
2762+
// Retry refresh
2763+
let new_handle = Self::spawn_refresh_slots_task(
27372764
self.inner.clone(),
27382765
&RefreshPolicy::Throttable,
2739-
),
2740-
)))
2741-
};
2742-
self.state = next_state;
2743-
Poll::Ready(Err(err))
2766+
);
2767+
self.state = ConnectionState::Recover(RecoverFuture::RefreshingSlots(
2768+
new_handle,
2769+
));
2770+
return Poll::Ready(Ok(()));
2771+
}
2772+
}
2773+
Some(Err(join_err)) => {
2774+
if join_err.is_cancelled() {
2775+
// Task was intentionally aborted - don't treat as an error
2776+
trace!("Slot refresh task was aborted");
2777+
self.state = ConnectionState::PollComplete;
2778+
return Poll::Ready(Ok(()));
2779+
} else {
2780+
// Task panicked - try reconnecting to initial nodes as a recovery strategy
2781+
warn!("Slot refresh task panicked: {:?} - attempting recovery by reconnecting to initial nodes", join_err);
2782+
2783+
// TODO - consider gracefully closing the client
2784+
// Since a panic indicates a bug in the refresh logic,
2785+
// it might be safer to close the client entirely
2786+
self.state =
2787+
ConnectionState::Recover(RecoverFuture::ReconnectToInitialNodes(
2788+
Box::pin(ClusterConnInner::reconnect_to_initial_nodes(
2789+
self.inner.clone(),
2790+
)),
2791+
));
2792+
2793+
// Report this critical error to clients
2794+
let err = RedisError::from((
2795+
ErrorKind::ClientError,
2796+
"Slot refresh task panicked",
2797+
format!("{:?}", join_err),
2798+
));
2799+
return Poll::Ready(Err(err));
2800+
}
2801+
}
2802+
None => {
2803+
// Task is still running
2804+
// Just continue and return Ok to not block poll_flush
2805+
}
27442806
}
2745-
},
2807+
2808+
// Always return Ready to not block poll_flush
2809+
Poll::Ready(Ok(()))
2810+
}
2811+
// Other cases remain unchanged
2812+
RecoverFuture::ReconnectToInitialNodes(ref mut future) => {
2813+
ready!(future.as_mut().poll(cx));
2814+
trace!("Reconnected to initial nodes");
2815+
self.state = ConnectionState::PollComplete;
2816+
Poll::Ready(Ok(()))
2817+
}
27462818
RecoverFuture::Reconnect(ref mut future) => {
27472819
ready!(future.as_mut().poll(cx));
27482820
trace!("Reconnected connections");
@@ -3016,12 +3088,21 @@ where
30163088
match ready!(self.poll_complete(cx)) {
30173089
PollFlushAction::None => return Poll::Ready(Ok(())),
30183090
PollFlushAction::RebuildSlots => {
3019-
self.state = ConnectionState::Recover(RecoverFuture::RecoverSlots(Box::pin(
3020-
ClusterConnInner::refresh_slots_and_subscriptions_with_retries(
3021-
self.inner.clone(),
3022-
&RefreshPolicy::Throttable,
3023-
),
3024-
)));
3091+
// Spawn refresh task
3092+
let task_handle = ClusterConnInner::spawn_refresh_slots_task(
3093+
self.inner.clone(),
3094+
&RefreshPolicy::Throttable,
3095+
);
3096+
3097+
// Update state
3098+
self.state =
3099+
ConnectionState::Recover(RecoverFuture::RefreshingSlots(task_handle));
3100+
}
3101+
PollFlushAction::ReconnectFromInitialConnections => {
3102+
self.state =
3103+
ConnectionState::Recover(RecoverFuture::ReconnectToInitialNodes(Box::pin(
3104+
ClusterConnInner::reconnect_to_initial_nodes(self.inner.clone()),
3105+
)));
30253106
}
30263107
PollFlushAction::Reconnect(addresses) => {
30273108
self.state = ConnectionState::Recover(RecoverFuture::Reconnect(Box::pin(
@@ -3034,11 +3115,6 @@ where
30343115
.map(|_| ()), // Convert Vec<Arc<Notify>> to () as it's not needed here
30353116
)));
30363117
}
3037-
PollFlushAction::ReconnectFromInitialConnections => {
3038-
self.state = ConnectionState::Recover(RecoverFuture::Reconnect(Box::pin(
3039-
ClusterConnInner::reconnect_to_initial_nodes(self.inner.clone()),
3040-
)));
3041-
}
30423118
}
30433119
}
30443120
}

glide-core/redis-rs/redis/tests/test_cluster_async.rs

Lines changed: 17 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -1649,119 +1649,28 @@ mod cluster_async {
16491649

16501650
#[test]
16511651
#[serial_test::serial]
1652-
fn test_async_cluster_refresh_topology_even_with_zero_retries() {
1653-
let name = "test_async_cluster_refresh_topology_even_with_zero_retries";
1654-
1655-
let should_refresh = atomic::AtomicBool::new(false);
1656-
1657-
let MockEnv {
1658-
runtime,
1659-
async_connection: mut connection,
1660-
handler: _handler,
1661-
..
1662-
} = MockEnv::with_client_builder(
1663-
ClusterClient::builder(vec![&*format!("redis://{name}")]).retries(0)
1664-
// Disable the rate limiter to refresh slots immediately on the MOVED error.
1665-
.slots_refresh_rate_limit(Duration::from_secs(0), 0),
1666-
name,
1667-
move |cmd: &[u8], port| {
1668-
if !should_refresh.load(atomic::Ordering::SeqCst) {
1669-
respond_startup(name, cmd)?;
1670-
}
1671-
1672-
if contains_slice(cmd, b"PING") || contains_slice(cmd, b"SETNAME") {
1673-
return Err(Ok(Value::SimpleString("OK".into())));
1674-
}
1675-
1676-
if contains_slice(cmd, b"CLUSTER") && contains_slice(cmd, b"SLOTS") {
1677-
return Err(Ok(Value::Array(vec![
1678-
Value::Array(vec![
1679-
Value::Int(0),
1680-
Value::Int(1),
1681-
Value::Array(vec![
1682-
Value::BulkString(name.as_bytes().to_vec()),
1683-
Value::Int(6379),
1684-
]),
1685-
]),
1686-
Value::Array(vec![
1687-
Value::Int(2),
1688-
Value::Int(16383),
1689-
Value::Array(vec![
1690-
Value::BulkString(name.as_bytes().to_vec()),
1691-
Value::Int(6380),
1692-
]),
1693-
]),
1694-
])));
1695-
}
1696-
1697-
if contains_slice(cmd, b"GET") {
1698-
let get_response = Err(Ok(Value::BulkString(b"123".to_vec())));
1699-
match port {
1700-
6380 => get_response,
1701-
// Respond that the key exists on a node that does not yet have a connection:
1702-
_ => {
1703-
// Should not attempt to refresh slots more than once:
1704-
assert!(!should_refresh.swap(true, Ordering::SeqCst));
1705-
Err(parse_redis_value(
1706-
format!("-MOVED 123 {name}:6380\r\n").as_bytes(),
1707-
))
1708-
}
1709-
}
1710-
} else {
1711-
panic!("unexpected command {cmd:?}")
1712-
}
1713-
},
1714-
);
1715-
1716-
let value = runtime.block_on(
1717-
cmd("GET")
1718-
.arg("test")
1719-
.query_async::<_, Option<i32>>(&mut connection),
1720-
);
1721-
1722-
// The user should receive an initial error, because there are no retries and the first request failed.
1723-
assert_eq!(
1724-
value,
1725-
Err(RedisError::from((
1726-
ErrorKind::Moved,
1727-
"An error was signalled by the server",
1728-
"test_async_cluster_refresh_topology_even_with_zero_retries:6380".to_string()
1729-
)))
1730-
);
1731-
1732-
let value = runtime.block_on(
1733-
cmd("GET")
1734-
.arg("test")
1735-
.query_async::<_, Option<i32>>(&mut connection),
1736-
);
1737-
1738-
assert_eq!(value, Ok(Some(123)));
1739-
}
1740-
1741-
#[test]
1742-
#[serial_test::serial]
1743-
fn test_async_cluster_refresh_topology_is_blocking() {
1744-
// Test: Head-of-Line Blocking During Slot Refresh
1652+
fn test_async_cluster_refresh_topology_is_not_blocking() {
1653+
// Test: Non-Head-of-Line Blocking During Slot Refresh
17451654
//
17461655
// This test verifies that during cluster topology refresh operations triggered by
1747-
// MOVED errors, the implementation exhibits head-of-line blocking behavior.
1748-
// When a client receives a MOVED error (indicating topology changes), it needs to
1749-
// refresh its slot mapping. This process blocks all subsequent commands until the
1750-
// refresh completes.
1656+
// MOVED errors, the implementation does not exhibit head-of-line blocking behavior.
1657+
// When a client receives a MOVED error (indicating topology changes), it refreshes
1658+
// its slot mapping in the background, allowing other commands to proceed concurrently.
17511659
//
1752-
// The test employs the following strategy to verify the blocking behavior:
1660+
// The test employs the following strategy to verify the non-blocking behavior:
17531661
//
17541662
// 1. Trigger Slot Refresh: Send a blocking BLPOP command that will receive a MOVED error when
17551663
// slot 0 is migrated, initiating a topology refresh operation.
17561664
//
17571665
// 2. Atomically migrate slot and pause clients: Use SET SLOT and CLIENT PAUSE to artificially delay the node's
17581666
// response during the refresh operation.
17591667
//
1760-
// 3. Verify Blocking Behavior: While the refresh is in progress, send a GET command
1761-
// to a different node in the cluster and verify it times out due to being blocked.
1668+
// 3. Verify Non-Blocking Behavior: While the refresh is in progress, send a GET command
1669+
// to a different node in the cluster. Unlike the blocking implementation, this command
1670+
// should complete successfully without timing out.
17621671
//
1763-
// This test intentionally demonstrates how topology refresh operations can block
1764-
// subsequent commands, even those directed to healthy nodes in the cluster.
1672+
// This test intentionally demonstrates that topology refresh operations no longer block
1673+
// subsequent commands.
17651674

17661675
// Create a cluster with 3 nodes
17671676
let cluster = TestClusterContext::new_with_cluster_client_builder(
@@ -1818,16 +1727,14 @@ mod cluster_async {
18181727
// This GET should complete without timing out, since the topology refresh no longer blocks it
18191728
let get_result = tokio::time::timeout(
18201729
Duration::from_millis(1000),
1821-
client1.get::<_, redis::Value>(other_shard_key),
1730+
client1.get::<_, String>(other_shard_key),
18221731
)
18231732
.await;
18241733

1825-
// Assert that we got a timeout error due to head-of-line blocking
1826-
assert!(get_result.is_err());
1827-
assert!(matches!(
1828-
get_result.unwrap_err(),
1829-
tokio::time::error::Elapsed { .. }
1830-
));
1734+
// Assert that the GET succeeded (no timeout or error)
1735+
assert!(get_result.is_ok());
1736+
let result = get_result.unwrap().unwrap();
1737+
assert_eq!(result, "value2");
18311738

18321739
true
18331740
});
@@ -1868,7 +1775,7 @@ mod cluster_async {
18681775

18691776
assert!(
18701777
result,
1871-
"The test should pass, demonstrating blocking behavior"
1778+
"The test should pass, demonstrating non blocking behavior"
18721779
);
18731780

18741781
Ok::<_, RedisError>(())

java/THIRD_PARTY_LICENSES_JAVA

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30626,7 +30626,7 @@ SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
3062630626

3062730627
----
3062830628

30629-
Package: socket2:0.5.9
30629+
Package: socket2:0.5.10
3063030630

3063130631
The following copyrights and licenses were found in the source code of this package:
3063230632

0 commit comments

Comments
 (0)