Skip to content

Commit de151c3

Browse files
[Fix] Close OpenSearch client gracefully (#126)
* Close OpenSearch client gracefully Signed-off-by: rithin-pullela-aws <[email protected]> * Add changelog Signed-off-by: rithin-pullela-aws <[email protected]> --------- Signed-off-by: rithin-pullela-aws <[email protected]>
1 parent bd95634 commit de151c3

File tree

11 files changed

+501
-330
lines changed

11 files changed

+501
-330
lines changed

.gitignore

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
.vscode/
99
pyrightconfig.json
1010

11+
# Kiro IDE
12+
.kiro/
13+
1114
# Coverage output
1215
.coverage
1316
.coverage.*

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
1010

1111
### Fixed
1212
- Fix Concurrency: Use Async OpenSearch client to improve concurrency ([#125](https://github.com/opensearch-project/opensearch-mcp-server-py/pull/125))
13+
- [Fix] Close OpenSearch client gracefully ([#126](https://github.com/opensearch-project/opensearch-mcp-server-py/pull/126))
1314

1415
### Removed
1516

src/mcp_server_opensearch/clusters_information.py

Lines changed: 2 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -116,17 +116,9 @@ async def load_clusters_from_yaml(file_path: str) -> None:
116116
ssl_verify=cluster_config.get('ssl_verify', None),
117117
opensearch_header_auth=cluster_config.get('opensearch_header_auth', None),
118118
)
119-
# Check if possible to connect to the cluster
120-
is_connected, error_message = await check_cluster_connection(cluster_info)
121-
if not is_connected:
122-
result['errors'].append(
123-
f"Error connecting to cluster '{cluster_name}': {error_message}"
124-
)
125-
continue
126-
else:
127-
# Add cluster to registry
128-
add_cluster(name=cluster_name, cluster_info=cluster_info)
129119

120+
# Add cluster to registry without checking connection
121+
add_cluster(name=cluster_name, cluster_info=cluster_info)
130122
result['loaded_clusters'].append(cluster_name)
131123

132124
except Exception as e:
@@ -141,23 +133,3 @@ async def load_clusters_from_yaml(file_path: str) -> None:
141133

142134
except yaml.YAMLError as e:
143135
raise yaml.YAMLError(f'Invalid YAML format in {file_path}: {str(e)}')
144-
145-
146-
async def check_cluster_connection(cluster_info: ClusterInfo) -> tuple[bool, str]:
147-
"""Check if the cluster is reachable by attempting to connect.
148-
149-
Args:
150-
cluster_info: ClusterInfo object containing cluster configuration
151-
152-
Returns:
153-
tuple[bool, str]: (True, "") if connection successful, (False, error_message) otherwise
154-
"""
155-
try:
156-
# Lazy import to avoid circular dependency
157-
from opensearch.client import _initialize_client_multi_mode
158-
159-
client = _initialize_client_multi_mode(cluster_info)
160-
await client.ping()
161-
return True, ''
162-
except Exception as e:
163-
return False, str(e)

src/mcp_server_opensearch/streaming_server.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ async def serve(
166166
app=app,
167167
host=host,
168168
port=port,
169+
timeout_graceful_shutdown=10,
169170
)
170171
server = uvicorn.Server(config)
171172
await server.serve()

src/opensearch/client.py

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@
1111
import boto3
1212
import logging
1313
import os
14-
from typing import Any, Dict, Optional
14+
from contextlib import asynccontextmanager
15+
from typing import Any, AsyncIterator, Dict, Optional
1516
from urllib.parse import urlparse
1617

1718
from mcp.server.lowlevel.server import request_ctx
@@ -99,6 +100,43 @@ def initialize_client(args: baseToolArgs) -> AsyncOpenSearch:
99100
raise ConfigurationError(f'Failed to initialize OpenSearch client: {e}')
100101

101102

103+
@asynccontextmanager
104+
async def get_opensearch_client(args: baseToolArgs) -> AsyncIterator[AsyncOpenSearch]:
105+
"""Async context manager for OpenSearch client lifecycle management.
106+
107+
This context manager ensures that OpenSearch clients are properly closed after use,
108+
preventing connection leaks and enabling graceful server shutdown.
109+
110+
Usage:
111+
async with get_opensearch_client(args) as client:
112+
# Use client for operations
113+
result = await client.info()
114+
115+
Args:
116+
args (baseToolArgs): Arguments containing optional opensearch_cluster_name
117+
118+
Yields:
119+
AsyncOpenSearch: An initialized OpenSearch client instance
120+
121+
Raises:
122+
ConfigurationError: If in multi mode but no cluster name provided or invalid mode
123+
AuthenticationError: If authentication fails
124+
"""
125+
client = None
126+
try:
127+
logger.debug('Creating OpenSearch client')
128+
client = initialize_client(args)
129+
yield client
130+
finally:
131+
if client is not None:
132+
try:
133+
logger.debug('Closing OpenSearch client')
134+
await client.close()
135+
except Exception as e:
136+
# Log but don't propagate cleanup errors to avoid masking original errors
137+
logger.warning(f'Error closing OpenSearch client: {e}')
138+
139+
102140
# Private Implementation Functions
103141
def _initialize_client_single_mode() -> AsyncOpenSearch:
104142
"""Initialize OpenSearch client for single mode using environment variables.

0 commit comments

Comments
 (0)