-
Notifications
You must be signed in to change notification settings - Fork 7
API
csp-gateway provides a FastAPI based REST API optionally for every instance.
This provides a few nice features out of the box:
-
OpenAPI based endpoints automatically derived from the underlying
csptypes into JSON Schema (/openapi.json) -
Swagger / Redoc API documentation based on the
GatewayModuleandGatewayChannelsin the application (/docs//redoc)
The csp-gateway REST API is designed to be simple to consume from outside csp, with a few fundamental methods.
Note
The REST API is launched when starting the Gateway instance with rest=True
As described in Overview#Channels, the csp-gateway REST API has several methods for interacting with ticking / stateful data living on the GatewayChannels instance.
-
last (
GET,/api/v1/last/<channel>): Get the last tick of data on a channel -
next (
GET,/api/v1/next/<channel>): Wait for the next tick of data on a channel: WARNING: blocks, and can often be misused into race conditions -
state (
GET,/api/v1/state/<channel>): Get the accumulated state for any channel -
send (
POST,/api/v1/send/<channel>): Send a new datum as a tick into the running csp graph -
lookup (
POST,/api/v1/lookup/<channel>/<gateway struct ID>): Lookup an individual GatewayStruct by its requiredidfield
Note
Channels are included in the REST API by using a GatewayModule.
Most commonly, this is MountRestRoutes.
Important
lookup has substantial memory overhead, as we cache a copy of every instance of every datum.
GatewayModule subclasses can disable it via the classmethod omit_from_lookup.
A GatewayModule can call set_state in its connect method to allow for this API to be available.
State is collected by one or more instance attributes into an in-memory DuckDB instance.
For example, suppose I have the following type:
class ExampleData(GatewayStruct):
x: str
y: str
z: strIf my GatewayModule called set_state("example", ("x",)), state would be collected as the last tick of ExampleData per each unique value of x. If called with set_state("example", ("x", "y")), it would be collected as the last tick per each unique pair x,y, etc.
Important
This code and API will likely change a bit as we allow for more granular collection of records, and expose more DuckDB functionality.
State accepts an additional query parameter query.
This allows REST API users to query state and only return satisfying records.
Here are some examples from the autodocumentation illustrating the use of filters:
# Filter only records where `record.x` == 5
api/v1/state/example?query={"filters":[{"attr":"x","by":{"value":5,"where":"=="}}]}
# Filter only records where `record.x` < 10
/api/v1/state/example?query={"filters":[{"attr":"x","by":{"value":10,"where":"<"}}]}
# Filter only records where `record.timestamp` < "2023-03-30T14:45:26.394000"
/api/v1/state/example?query={"filters":[{"attr":"timestamp","by":{"when":"2023-03-30T14:45:26.394000","where":"<"}}]}
# Filter only records where `record.id` < `record.y`
/api/v1/state/example?query={"filters":[{"attr":"id","by":{"attr":"y","where":"<"}}]}
# Filter only records where `record.x` < 50 and `record.x` >= 30
/api/v1/state/example?query={"filters":[{"attr":"x","by":{"value":50,"where":"<"}},{"attr":"x","by":{"value":30,"where":">="}}]}
Important
This code and API will likely change a bit as we expose more DuckDB functionality.
In addition to a REST API, a more csp-like streaming API is also available via Websockets when the MountWebSocketRoutes module is included in a Gateway instance.
This API is bidirectional, providing the ability to receive data as it ticks or to tick in new data.
Any data available via last/send above is available via websocket.
To subscribe to channels, send a JSON message of the form:
{
"action": "subscribe",
"channel": "<channel name>"
}For dict basket channels (channels keyed by an enum or string), you can subscribe to a specific key:
{
"action": "subscribe",
"channel": "<channel name>",
"key": "<basket key>"
}If you omit the key for a dict basket channel, you will subscribe to all keys in that basket.
To unsubscribe from channels, send a JSON message of the form:
{
"action": "unsubscribe",
"channel": "<channel name>"
}For dict basket channels, you can unsubscribe from a specific key:
{
"action": "unsubscribe",
"channel": "<channel name>",
"key": "<basket key>"
}If you omit the key for a dict basket channel, you will unsubscribe from all keys in that basket.
Data will be sent across the websocket for all subscribed channels. It has the form:
{
"channel": "<channel name>",
"data": "<the same data that would be transmitted over e.g. the last endpoint>"
}For dict basket channels, the message also includes the key:
{
"channel": "<channel name>",
"key": "<basket key>",
"data": "<the data for this specific key>"
}To send data into a channel via websocket, use the send action:
{
"action": "send",
"channel": "<channel name>",
"data": {"field1": "value1", "field2": "value2"}
}Data can also be sent as a list:
{
"action": "send",
"channel": "<channel name>",
"data": [{"field1": "value1"}, {"field1": "value2"}]
}For dict basket channels, you must specify the key:
{
"action": "send",
"channel": "<channel name>",
"key": "<basket key>",
"data": {"field1": "value1"}
}A special heartbeat channel is always available. Subscribe to it to receive periodic PING messages:
{"channel": "heartbeat", "data": "PING"}This can be used to verify the connection is alive.
You can query the list of available websocket channels via the REST endpoint:
GET /api/v1/stream
This returns a list of channel names. Dict basket channels are listed with their keys, e.g., basket/A, basket/B, etc.
See Client for details on our integrated python client.
This wiki is autogenerated. To made updates, open a PR against the original source file in docs/wiki.
Get Started
Key Components
For Developers
Modules
- API/UI Modules
- Logging Modules
- Replay Engine Modules
- Utility Modules
For Contributors