-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
99 lines (74 loc) · 3.39 KB
/
Copy pathserver.py
File metadata and controls
99 lines (74 loc) · 3.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import logging
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from dataclasses import dataclass
from typing import Any
from mcp.server.fastmcp import FastMCP
from mcp.types import ImageContent
from mcp_server_nps.client import NPSClient
logger = logging.getLogger(__name__)
@dataclass
class AppContext:
nps_client: NPSClient
@asynccontextmanager
async def app_lifespan(_: FastMCP) -> AsyncIterator[AppContext]:
nps_client = NPSClient()
yield AppContext(nps_client=nps_client)
mcp = FastMCP("Netwrix Privilege Secure", lifespan=app_lifespan)
@mcp.tool()
async def get_active_sessions() -> list[dict[str, Any]]:
"""Get active activity sessions."""
ctx = mcp.get_context()
nps_client: NPSClient = ctx.request_context.lifespan_context.nps_client
sessions = await nps_client.get_active_sessions()
return sessions
@mcp.tool()
async def get_active_session_image(session_id: str) -> ImageContent:
"""Get an image of for an active session."""
ctx = mcp.get_context()
nps_client: NPSClient = ctx.request_context.lifespan_context.nps_client
image_base64 = await nps_client.get_live_image_by_session(session_id)
image_content = ImageContent(data=image_base64, mimeType="image/jpeg", type="image")
return image_content
@mcp.tool()
async def search_metadata_history(search_term: str) -> list[dict[str, Any]]:
"""Search for commands run by users on computers during their sessions in the metadata history."""
ctx = mcp.get_context()
nps_client: NPSClient = ctx.request_context.lifespan_context.nps_client
history = await nps_client.search_metadata_history(search_term)
return history
@mcp.tool()
async def get_admin_credentials_older_than(days: int) -> list[dict[str, Any]]:
"""Find administrators that have passwords older than the specified number of days."""
ctx = mcp.get_context()
nps_client: NPSClient = ctx.request_context.lifespan_context.nps_client
admin_passwords = await nps_client.get_credentials_older_than(days, privilege_type=1)
return admin_passwords
@mcp.tool()
async def get_managed_credentials_older_than(days: int) -> list[dict[str, Any]]:
"""Find all managed passwords that have passwords older than the specified number of days."""
ctx = mcp.get_context()
nps_client: NPSClient = ctx.request_context.lifespan_context.nps_client
managed_passwords = await nps_client.get_credentials_older_than(days, managed_filter=1)
return managed_passwords
@mcp.tool()
async def get_events_from_server(search_text: str) -> list[dict[str, Any]]:
"""Find events that have occurred use search text to filter the events that contain that words in the search text."""
"""The search text is assumed to be a contiguous word, to find failed logins, search for 'login failure'"""
"""Limited to the latest 10 events"""
ctx = mcp.get_context()
nps_client: NPSClient = ctx.request_context.lifespan_context.nps_client
events = await nps_client.get_events(search_text)
return events
@mcp.tool()
async def get_nps_version() -> str:
"""Get the version of NPS."""
ctx = mcp.get_context()
nps_client: NPSClient = ctx.request_context.lifespan_context.nps_client
version = await nps_client.get_version()
return version
if __name__ == "__main__":
try:
mcp.run(transport="stdio")
except Exception as e:
logger.error(f"Error running MCP server: {e}")