Skip to content

Commit 23f01fa

Browse files
feat(mcp): cluster-aware connections + client_name (#906)
* feat(mcp): cluster-aware connections + client_name support Adds Redis Cluster support and connection metadata to the MCP server: Cluster mode (--cluster / REDIS_CLUSTER env): - Uses redis::cluster_async::ClusterConnection for automatic MOVED/ASK redirection handling - Single URL serves as seed node; client discovers cluster topology - RedisConnection enum wraps both Standalone and Cluster connections, implementing ConnectionLike so all existing tools work transparently Client name (--client-name / REDIS_CLIENT_NAME env): - Sends CLIENT SETNAME on every new connection (default: "redisctl-mcp") - MCP connections are identifiable in CLIENT LIST and slowlog Both options work with profiles and dynamic --database-url connections. Closes #904, closes #905 (client_name portion). * style: cargo fmt
1 parent c48cbd5 commit 23f01fa

File tree

8 files changed

+167
-13
lines changed

8 files changed

+167
-13
lines changed

Cargo.lock

Lines changed: 9 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ serial_test = "3.1"
9696
pretty_assertions = "1.4"
9797

9898
# Redis client
99-
redis = { version = "0.27", features = ["tokio-comp", "tokio-rustls-comp", "connection-manager"] }
99+
redis = { version = "0.27", features = ["tokio-comp", "tokio-rustls-comp", "connection-manager", "cluster-async"] }
100100

101101
# External crates (git for dev, version required for crates.io publish)
102102
redis-cloud = { version = "0.9.5", git = "https://github.com/redis-developer/redis-cloud-rs", branch = "main" }

crates/redisctl-mcp/src/lib.rs

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
//! CredentialSource::Profiles(vec!["default".to_string()]),
4242
//! policy,
4343
//! None, // no database URL
44+
//! false, // cluster mode
45+
//! Some("redisctl-mcp".to_string()), // client name
4446
//! )?);
4547
//!
4648
//! // Use merge to compose sub-routers
@@ -118,8 +120,14 @@ mod tests {
118120
std::collections::HashMap::new(),
119121
"test".to_string(),
120122
));
121-
let state =
122-
AppState::new(CredentialSource::Profiles(vec![]), read_only_policy, None).unwrap();
123+
let state = AppState::new(
124+
CredentialSource::Profiles(vec![]),
125+
read_only_policy,
126+
None,
127+
false,
128+
None,
129+
)
130+
.unwrap();
123131

124132
assert!(!state.is_write_allowed());
125133
}
@@ -135,7 +143,14 @@ mod tests {
135143
std::collections::HashMap::new(),
136144
"test".to_string(),
137145
));
138-
let state = AppState::new(CredentialSource::Profiles(vec![]), write_policy, None).unwrap();
146+
let state = AppState::new(
147+
CredentialSource::Profiles(vec![]),
148+
write_policy,
149+
None,
150+
false,
151+
None,
152+
)
153+
.unwrap();
139154

140155
assert!(state.is_write_allowed());
141156
}
@@ -146,6 +161,8 @@ mod tests {
146161
CredentialSource::Profiles(vec![]),
147162
AppState::test_policy(),
148163
Some("redis://localhost:6379".to_string()),
164+
false,
165+
None,
149166
)
150167
.unwrap();
151168

@@ -164,6 +181,8 @@ mod tests {
164181
]),
165182
AppState::test_policy(),
166183
None,
184+
false,
185+
None,
167186
)
168187
.unwrap();
169188

@@ -181,6 +200,8 @@ mod tests {
181200
CredentialSource::Profiles(vec![]),
182201
AppState::test_policy(),
183202
None,
203+
false,
204+
None,
184205
)
185206
.unwrap(),
186207
);
@@ -231,6 +252,8 @@ mod tests {
231252
CredentialSource::Profiles(vec![]),
232253
AppState::test_policy(),
233254
None,
255+
false,
256+
None,
234257
)
235258
.unwrap(),
236259
);
@@ -293,6 +316,8 @@ mod tests {
293316
CredentialSource::Profiles(vec![]),
294317
AppState::test_policy(),
295318
Some("redis://localhost:6379".to_string()),
319+
false,
320+
None,
296321
)
297322
.unwrap(),
298323
);
@@ -316,6 +341,8 @@ mod tests {
316341
CredentialSource::Profiles(vec![]),
317342
AppState::test_policy(),
318343
None,
344+
false,
345+
None,
319346
)
320347
.unwrap(),
321348
);

crates/redisctl-mcp/src/main.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,14 @@ struct Args {
180180
#[arg(long, env = "REDIS_URL")]
181181
database_url: Option<String>,
182182

183+
/// Enable Redis Cluster mode (handles MOVED/ASK redirections)
184+
#[arg(long, env = "REDIS_CLUSTER")]
185+
cluster: bool,
186+
187+
/// Client name for CLIENT SETNAME (identifies MCP connections in CLIENT LIST)
188+
#[arg(long, env = "REDIS_CLIENT_NAME", default_value = "redisctl-mcp")]
189+
client_name: Option<String>,
190+
183191
/// Toolsets to enable (default: all compiled-in).
184192
/// Use bare names for all sub-modules: cloud,enterprise,database,app.
185193
/// Use colon syntax for specific sub-modules: cloud:subscriptions,cloud:networking.
@@ -551,6 +559,8 @@ async fn main() -> Result<()> {
551559
credential_source,
552560
policy.clone(),
553561
args.database_url.clone(),
562+
args.cluster,
563+
args.client_name.clone(),
554564
)?);
555565

556566
// Resolve skills directory
@@ -1023,6 +1033,8 @@ mod tests {
10231033
state::CredentialSource::Profiles(vec![]),
10241034
AppState::test_policy(),
10251035
None,
1036+
false,
1037+
None,
10261038
)
10271039
.unwrap(),
10281040
)

crates/redisctl-mcp/src/state.rs

Lines changed: 59 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ pub struct CachedClients {
3636
#[cfg(feature = "enterprise")]
3737
pub enterprise: HashMap<String, EnterpriseClient>,
3838
#[cfg(feature = "database")]
39-
pub database: HashMap<String, redis::aio::MultiplexedConnection>,
39+
pub database: HashMap<String, crate::tools::redis::RedisConnection>,
4040
}
4141

4242
/// Shared application state
@@ -47,6 +47,10 @@ pub struct AppState {
4747
pub policy: Arc<Policy>,
4848
/// Optional Redis database URL for direct connections
4949
pub database_url: Option<String>,
50+
/// Enable Redis Cluster mode (handles MOVED/ASK redirections)
51+
pub cluster: bool,
52+
/// Client name for CLIENT SETNAME (identifies connections in CLIENT LIST)
53+
pub client_name: Option<String>,
5054
/// redisctl config (for profile-based auth)
5155
config: Option<Config>,
5256
/// Configured profiles (for multi-cluster support)
@@ -65,6 +69,8 @@ impl AppState {
6569
credential_source: CredentialSource,
6670
policy: Arc<Policy>,
6771
database_url: Option<String>,
72+
cluster: bool,
73+
client_name: Option<String>,
6874
) -> Result<Self> {
6975
// Extract profiles list
7076
let profiles = match &credential_source {
@@ -82,6 +88,8 @@ impl AppState {
8288
credential_source,
8389
policy,
8490
database_url,
91+
cluster,
92+
client_name,
8593
config,
8694
profiles,
8795
clients: RwLock::new(CachedClients {
@@ -352,11 +360,16 @@ impl AppState {
352360
///
353361
/// Connections are cached by URL. If a cached connection fails a PING
354362
/// health check, it is evicted and a fresh connection is created.
363+
///
364+
/// Returns a `RedisConnection` (standalone or cluster) based on the
365+
/// `cluster` flag in AppState.
355366
#[cfg(feature = "database")]
356367
pub async fn redis_connection_for_url(
357368
&self,
358369
url: &str,
359-
) -> Result<redis::aio::MultiplexedConnection> {
370+
) -> Result<crate::tools::redis::RedisConnection> {
371+
use crate::tools::redis::RedisConnection;
372+
360373
// Check cache first
361374
{
362375
let clients = self.clients.read().await;
@@ -375,11 +388,42 @@ impl AppState {
375388
}
376389

377390
// Create new connection (or reconnect after eviction)
378-
let client = redis::Client::open(url).context("Failed to create Redis client")?;
379-
let conn = client
380-
.get_multiplexed_async_connection()
381-
.await
382-
.context("Failed to connect to Redis")?;
391+
let conn = if self.cluster {
392+
let client = redis::cluster::ClusterClient::new(vec![url])
393+
.context("Failed to create Redis cluster client")?;
394+
let mut cluster_conn = client
395+
.get_async_connection()
396+
.await
397+
.context("Failed to connect to Redis cluster")?;
398+
399+
// Set client name if configured
400+
if let Some(ref name) = self.client_name {
401+
let _ = redis::cmd("CLIENT")
402+
.arg("SETNAME")
403+
.arg(name)
404+
.query_async::<String>(&mut cluster_conn)
405+
.await;
406+
}
407+
408+
RedisConnection::Cluster(cluster_conn)
409+
} else {
410+
let client = redis::Client::open(url).context("Failed to create Redis client")?;
411+
let mut standalone_conn = client
412+
.get_multiplexed_async_connection()
413+
.await
414+
.context("Failed to connect to Redis")?;
415+
416+
// Set client name if configured
417+
if let Some(ref name) = self.client_name {
418+
let _ = redis::cmd("CLIENT")
419+
.arg("SETNAME")
420+
.arg(name)
421+
.query_async::<String>(&mut standalone_conn)
422+
.await;
423+
}
424+
425+
RedisConnection::Standalone(standalone_conn)
426+
};
383427

384428
// Cache it
385429
{
@@ -449,6 +493,8 @@ impl Clone for AppState {
449493
credential_source: self.credential_source.clone(),
450494
policy: self.policy.clone(),
451495
database_url: self.database_url.clone(),
496+
cluster: self.cluster,
497+
client_name: self.client_name.clone(),
452498
config: self.config.clone(),
453499
profiles: self.profiles.clone(),
454500
clients: RwLock::new(CachedClients {
@@ -486,6 +532,8 @@ impl AppState {
486532
credential_source: CredentialSource::Profiles(vec![]),
487533
policy: Self::test_policy(),
488534
database_url: None,
535+
cluster: false,
536+
client_name: None,
489537
config: None,
490538
profiles: vec![],
491539
clients: RwLock::new(CachedClients {
@@ -509,6 +557,8 @@ impl AppState {
509557
credential_source: CredentialSource::Profiles(vec![]),
510558
policy: Self::test_policy(),
511559
database_url: None,
560+
cluster: false,
561+
client_name: None,
512562
config: None,
513563
profiles: vec![],
514564
clients: RwLock::new(CachedClients {
@@ -534,6 +584,8 @@ impl AppState {
534584
credential_source: CredentialSource::Profiles(vec![]),
535585
policy: Self::test_policy(),
536586
database_url: None,
587+
cluster: false,
588+
client_name: None,
537589
config: None,
538590
profiles: vec![],
539591
clients: RwLock::new(CachedClients {
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
//! Unified Redis connection type supporting both standalone and cluster modes.
2+
3+
use redis::aio::{ConnectionLike, MultiplexedConnection};
4+
use redis::cluster_async::ClusterConnection;
5+
use redis::{Cmd, RedisFuture, Value};
6+
7+
/// A Redis connection that transparently handles both standalone and cluster topologies.
8+
///
9+
/// Implements `ConnectionLike` so it can be used anywhere a Redis connection is expected
10+
/// (`cmd.query_async()`, `pipe.query_async()`, etc.).
11+
#[derive(Clone)]
12+
pub enum RedisConnection {
13+
Standalone(MultiplexedConnection),
14+
Cluster(ClusterConnection),
15+
}
16+
17+
impl ConnectionLike for RedisConnection {
18+
fn req_packed_command<'a>(&'a mut self, cmd: &'a Cmd) -> RedisFuture<'a, Value> {
19+
match self {
20+
RedisConnection::Standalone(conn) => conn.req_packed_command(cmd),
21+
RedisConnection::Cluster(conn) => conn.req_packed_command(cmd),
22+
}
23+
}
24+
25+
fn req_packed_commands<'a>(
26+
&'a mut self,
27+
cmd: &'a redis::Pipeline,
28+
offset: usize,
29+
count: usize,
30+
) -> RedisFuture<'a, Vec<Value>> {
31+
match self {
32+
RedisConnection::Standalone(conn) => conn.req_packed_commands(cmd, offset, count),
33+
RedisConnection::Cluster(conn) => conn.req_packed_commands(cmd, offset, count),
34+
}
35+
}
36+
37+
fn get_db(&self) -> i64 {
38+
match self {
39+
RedisConnection::Standalone(conn) => conn.get_db(),
40+
RedisConnection::Cluster(conn) => conn.get_db(),
41+
}
42+
}
43+
}

crates/redisctl-mcp/src/tools/redis/mod.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
33
mod aliases;
44
mod bulk;
5+
mod connection;
56
mod diagnostics;
67
mod json;
78
mod keys;
@@ -10,6 +11,8 @@ mod search;
1011
mod server;
1112
mod structures;
1213

14+
pub(crate) use connection::RedisConnection;
15+
1316
#[allow(unused_imports)]
1417
pub use aliases::*;
1518
#[allow(unused_imports)]
@@ -145,15 +148,15 @@ pub(crate) fn resolve_redis_url(
145148
})
146149
}
147150

148-
/// Resolve a Redis URL and return a cached multiplexed connection.
151+
/// Resolve a Redis URL and return a cached connection (standalone or cluster).
149152
///
150153
/// This is the main entry point for tool handlers. It resolves the URL from
151154
/// the input parameters, then returns a pooled connection (creating one if needed).
152155
pub(crate) async fn get_connection(
153156
url: Option<String>,
154157
profile: Option<&str>,
155158
state: &AppState,
156-
) -> Result<redis::aio::MultiplexedConnection, ToolError> {
159+
) -> Result<RedisConnection, ToolError> {
157160
let url = resolve_redis_url(url, profile, state)?;
158161
state
159162
.redis_connection_for_url(&url)

0 commit comments

Comments
 (0)