forked from inmydata/AgentMCP
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver.py
More file actions
264 lines (230 loc) · 11.1 KB
/
Copy pathserver.py
File metadata and controls
264 lines (230 loc) · 11.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
import os
import json
from typing import Optional, List, Dict, Any
from dotenv import load_dotenv
from inmydata_openedge.StructuredData import StructuredDataDriver, AIDataFilter, LogicalOperator, ConditionOperator, TopNOption
from mcp.server.fastmcp import FastMCP
from mcp.server.fastmcp import Context
from mcp_utils import mcp_utils
load_dotenv(".env", override=True)
# Enable debug mode only when manually set
if os.getenv("MCP_DEBUG", "0") == "1":
import debugpy
debugpy.listen(("localhost", 5678))
print("MCP server waiting for VS Code debugger on port 5678...")
debugpy.wait_for_client()
print("Debugger attached. Continuing execution.")
mcp = FastMCP("inmydata-agent-server")
def utils():
try:
api_key = os.environ.get('INMYDATA_API_KEY', "")
tenant = os.environ.get('INMYDATA_TENANT', "")
server = os.environ.get('INMYDATA_SERVER',"inmydata.com")
calendar = os.environ.get('INMYDATA_CALENDAR',"default")
user = os.environ.get('INMYDATA_USER', 'mcp-agent')
session_id = os.environ.get('INMYDATA_SESSION_ID', 'mcp-session')
return mcp_utils(api_key, tenant, calendar, user, session_id, server,"OpenEdge")
except Exception as e:
raise RuntimeError(f"Error initializing mcp_utils: {e}")
@mcp.tool()
async def get_rows_fast(
subject: str = "",
select: List[str] = [],
where: List[Dict[str, Any]] = [],
summary: bool = True,
system: str = "",
ctx: Optional[Context] = None
) -> str:
"""
FAST PATH (recommended).
Use when the request names specific fields and simple filters (no free-form reasoning).
Returns rows immediately from the warehouse; far faster and cheaper than get_answer.
If the output json contains property row_count wil a value less than or equal
to 10 then the data property can be used as the answer.
If the output json contains a non blank value for the instance_id property
then the data property will only contain a sample of the data and the full data set
can be found in a table named my_table in a DuckDB database file saved on disk.
In that case you MUST use the query_results_fast tool to query the results with SQL
to get the data you need to answer the question. This is the only way to access larger datasets.
Examples:
- "Give me the specific average transaction value and profit margin percentage for each region in 2025"
-> get_rows(
subject="Sales",
select=["Region", "Average Transaction Value", "Profit Margin %"],
where=[{"field":"Financial Year","op":"equals","value":2025, "logical":"AND"}],
summary=True,
system="sports2000"
)
where items: [{"field":"Region","op":"equals","value":"North","logical":"AND"}, {"field":"Sales Value","op":"gte","value":1000,"logical":"AND"}]
Allowed ops: equals, contains, not_contains, starts_with, gt, lt, gte, lte
Allows logical: AND, OR (default is AND). This is the logical join between multiple conditions.
The summary flag indicates if the data request should use a summary query which will summarize the data based on the fields specified. This is useful when datasets are large and summary=True is the default. If summary flag is set to false then it allows data to be read without being summarized.
The system property comes from the value of the system key in the dict of the selected subject. It should ONLY come from that value. The value above is an example only.
The select list should only contain values that have keys in the factFieldTypes or metricFieldTypes dict of the selected subject
"""
try:
if not subject:
return json.dumps({"error": "subject parameter is required"})
if not select:
return json.dumps({"error": "select parameter is required (list of field names)"})
return await utils().get_rows(subject, select, summary, system, where)
except Exception as e:
return json.dumps({"error": str(e)})
@mcp.tool()
async def get_top_n_fast(
subject: str = "",
group_by: str = "",
order_by: str = "",
n: int = 10,
system: str = "",
where: List[Dict[str, Any]] = [],
ctx: Optional[Context] = None
) -> str:
"""
FAST PATH for rankings and leaderboards.
Use when the user asks for "top/bottom N" by a metric (no free-form reasoning).
Much faster and cheaper than get_answer.
If the output json contains property row_count wil a value less than or equal
to 10 then the data property can be used as the answer.
If the output json contains a non blank value for the instance_id property
then the data property will only contain a sample of the data and the full data set
can be found in a table named my_table in a DuckDB database file saved on disk.
In that case you MUST use the query_results_fast tool to query the results with SQL
to get the data you need to answer the question. This is the only way to access larger datasets.
Example:
- "Top 10 regions by profit margin in 2025"
-> get_top_n(subject="Sales", group_by="Region", order_by="Profit Margin %", n=10, system="",
where=[{"field":"Financial Year","op":"equals","value":2025,"logical":"AND"}])
The system property comes from the value of the system key in the dict of the selected subject. It should ONLY come from that value.
"""
try:
if not subject:
return json.dumps({"error": "subject parameter is required"})
if not group_by:
return json.dumps({"error": "group_by parameter is required"})
if not order_by:
return json.dumps({"error": "order_by parameter is required"})
return await utils().get_top_n(subject, group_by, order_by, n,system, where)
except Exception as e:
return json.dumps({"error": str(e)})
@mcp.tool()
async def query_results_fast(
instance_id: str = "",
sql: str = "",
ctx: Optional[Context] = None
) -> str:
"""
FAST PATH for querying a result data set produced by other tools.
Use when the data set returned by another tool is over a specific number of rows.
This gives direct SQL access to the DuckDB database file saved on disk
allowing fast and cheap querying of the data.
Can be used to access larger datasets without needs further querying to get to an answer.
The only table in the database will be named my_table.
The columns will be the same as those returned by the tool that produced the dataset.
You will have a sample of the data in the data property of the output json from the tool that
produced the dataset.
Example:
- "Find the biggest difference between credit limit and balance"
-> query_results_fast(dataset_id="", instance_id="", sql="SELECT MAX(CreditLimit - Balance) AS MaxDifference FROM my_table;")
"""
try:
if not instance_id:
return json.dumps({"error": "instance_id parameter is required"})
if not sql:
return json.dumps({"error": "sql parameter is required"})
return await utils().query_results(instance_id, sql)
except Exception as e:
return json.dumps({"error": str(e)})
@mcp.tool()
def get_schema() -> str:
"""
Get the available schema. Returns a JSON object that defines the available subjects (tables) and their columns.
Returns a JSON string with:
- schemaVersion: int
- generatedAt: ISO 8601 UTC timestamp
- source: string identifying this server
- subjectsCount: int
- subjects: [
{
name: str,
aiDescription: Optional[str],
factFieldTypes: { fieldName: { name, type, aiDescription } },
metricFieldTypes: { metricName: { name, type, dimensionsUsed, aiDescription } },
system: str,
numDimensions: int,
numMetrics: int
}, ...
]
"""
try:
return utils().get_schema()
except Exception as e:
# Mirror your C# error string style
return f"Error retrieving subjects: {e}"
@mcp.tool()
async def get_financial_periods(
target_date: Optional[str] = None,
ctx: Optional[Context] = None
) -> str:
"""
Get all financial periods (year, quarter, month, week) for a given date.
Args:
target_date: Date in ISO format (YYYY-MM-DD). If not provided, uses today's date.
Returns:
JSON string with all financial periods
"""
try:
return await utils().get_financial_periods(target_date)
except Exception as e:
return json.dumps({"error": str(e)})
@mcp.tool()
async def get_calendar_period_date_range(
financial_year: Optional[int] = None,
period_number: Optional[int] = None,
period_type: Optional[str] = None,
ctx: Optional[Context] = None
) -> str:
"""
Get the start and end dates for a specific calendar period.
Args:
financial_year: The financial year (use null/None to automatically use current financial year)
period_number: The period number (e.g., month number, quarter number; use null/None to automatically use current period)
period_type: Type of period (year, month, quarter, week; use null/None to automatically use month)
Note:
If financial_year, period_number, or period_type are null/None (defaults), this tool will automatically
fetch the current financial periods and use the appropriate values for today's date.
Returns:
JSON string with start_date and end_date
"""
try:
# If any parameter is None, fetch current financial periods
if financial_year is None or period_number is None or period_type is None:
periods_result = await utils().get_financial_periods(None)
periods_data = json.loads(periods_result)
if "error" in periods_data:
return periods_result
# Parse the periods JSON
periods_str = periods_data.get("periods", "{}")
try:
periods = json.loads(periods_str) if isinstance(periods_str, str) else periods_str
except:
periods = {}
# Auto-fill missing parameters from current periods
if financial_year is None:
financial_year = periods.get("FinancialYear", periods.get("Year", 0))
if period_number is None:
# Default to current month if not specified
period_number = periods.get("Month", periods.get("Period", 1))
if period_type is None:
period_type = "month" # Default to month
if not financial_year:
return json.dumps({"error": "Could not determine financial_year"})
if not period_number:
return json.dumps({"error": "Could not determine period_number"})
if not period_type:
return json.dumps({"error": "period_type parameter is required"})
return await utils().get_calendar_period_date_range(financial_year, period_number, period_type)
except Exception as e:
return json.dumps({"error": str(e)})
if __name__ == "__main__":
mcp.run() # starts STDIO transport and blocks