Skip to content

Commit

Permalink
Merge pull request #573 from coasys/feature/prolog-query-subscription
Browse files Browse the repository at this point in the history
Prolog query subscriptions
  • Loading branch information
lucksus authored Feb 26, 2025
2 parents e3de157 + d71c04d commit 082fa4b
Show file tree
Hide file tree
Showing 9 changed files with 504 additions and 7 deletions.
47 changes: 47 additions & 0 deletions core/src/perspectives/PerspectiveClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,53 @@ export class PerspectiveClient {
return JSON.parse(perspectiveQueryProlog)
}

async subscribeQuery(uuid: string, query: string): Promise<{ subscriptionId: string, result: string }> {
const { perspectiveSubscribeQuery } = unwrapApolloResult(await this.#apolloClient.mutate({
mutation: gql`mutation perspectiveSubscribeQuery($uuid: String!, $query: String!) {
perspectiveSubscribeQuery(uuid: $uuid, query: $query) {
subscriptionId
result
}
}`,
variables: { uuid, query }
}))

return perspectiveSubscribeQuery
}

subscribeToQueryUpdates(subscriptionId: string, onData: (result: string) => void): () => void {
const subscription = this.#apolloClient.subscribe({
query: gql`
subscription perspectiveQuerySubscription($subscriptionId: String!) {
perspectiveQuerySubscription(subscriptionId: $subscriptionId)
}
`,
variables: {
subscriptionId
}
}).subscribe({
next: (result) => {
if (result.data && result.data.perspectiveQuerySubscription) {
onData(result.data.perspectiveQuerySubscription);
}
},
error: (e) => console.error('Error in query subscription:', e)
});

return () => subscription.unsubscribe();
}

async keepAliveQuery(uuid: string, subscriptionId: string): Promise<boolean> {
const { perspectiveKeepAliveQuery } = unwrapApolloResult(await this.#apolloClient.mutate({
mutation: gql`mutation perspectiveKeepAliveQuery($uuid: String!, $subscriptionId: String!) {
perspectiveKeepAliveQuery(uuid: $uuid, subscriptionId: $subscriptionId)
}`,
variables: { uuid, subscriptionId }
}))

return perspectiveKeepAliveQuery
}

async add(name: string): Promise<PerspectiveProxy> {
const { perspectiveAdd } = unwrapApolloResult(await this.#apolloClient.mutate({
mutation: gql`mutation perspectiveAdd($name: String!) {
Expand Down
173 changes: 173 additions & 0 deletions core/src/perspectives/PerspectiveProxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,163 @@ import { collectionAdderToName, collectionRemoverToName, collectionSetterToName
import { NeighbourhoodProxy } from "../neighbourhood/NeighbourhoodProxy";
import { NeighbourhoodExpression } from "../neighbourhood/Neighbourhood";
import { AIClient } from "../ai/AIClient";
import { PERSPECTIVE_QUERY_SUBSCRIPTION } from "./PerspectiveResolver";
import { gql } from "@apollo/client/core";

type QueryCallback = (result: string) => void;

// Generic subscription interface that matches Apollo's Subscription
interface Unsubscribable {
unsubscribe(): void;
}

/** Proxy object for a subscribed Prolog query that provides real-time updates
*
* This class handles:
* - Keeping the subscription alive by sending periodic keepalive signals
* - Managing callbacks for result updates
* - Subscribing to query updates via GraphQL subscriptions
* - Maintaining the latest query result
*
* The subscription will remain active as long as keepalive signals are sent.
* Make sure to call dispose() when you're done with the subscription to clean up
* resources and stop keepalive signals.
*
* Example usage:
* ```typescript
* const subscription = await perspective.subscribeInfer("my_query(X)");
* console.log("Initial result:", subscription.result);
*
* // Set up callback for updates
* const removeCallback = subscription.onResult(result => {
* console.log("New result:", result);
* });
*
* // Later: clean up
* subscription.dispose();
* ```
*/
export class QuerySubscriptionProxy {
#uuid: string;
#subscriptionId: string;
#client: PerspectiveClient;
#callbacks: Set<QueryCallback>;
#keepaliveTimer: number;
#unsubscribe?: () => void;
#latestResult: string;
#disposed: boolean = false;

/** Creates a new query subscription
* @param uuid - The UUID of the perspective
* @param subscriptionId - The ID returned by the subscription mutation
* @param initialResult - The initial query result
* @param client - The PerspectiveClient instance to use for communication
*/
constructor(uuid: string, subscriptionId: string, initialResult: string, client: PerspectiveClient) {
this.#uuid = uuid;
this.#subscriptionId = subscriptionId;
this.#client = client;
this.#callbacks = new Set();
this.#latestResult = initialResult;

// Call all callbacks with initial result
this.#notifyCallbacks(initialResult);

// Subscribe to query updates
this.#unsubscribe = this.#client.subscribeToQueryUpdates(
this.#subscriptionId,
(result) => {
this.#latestResult = result;
this.#notifyCallbacks(result);
}
);

// Start keepalive loop using platform-agnostic setTimeout
const keepaliveLoop = async () => {
if (this.#disposed) return;

try {
await this.#client.keepAliveQuery(this.#uuid, this.#subscriptionId);
} catch (e) {
console.error('Error in keepalive:', e);
}

// Schedule next keepalive if not disposed
if (!this.#disposed) {
this.#keepaliveTimer = setTimeout(keepaliveLoop, 30000) as unknown as number;
}
};

// Start the first keepalive loop
this.#keepaliveTimer = setTimeout(keepaliveLoop, 30000) as unknown as number;
}

/** Get the latest query result
*
* This returns the most recent result from the query, which could be either:
* - The initial result from when the subscription was created
* - The latest update received through the subscription
*
* @returns The latest query result as a string (usually a JSON array of bindings)
*/
get result(): string {
return this.#latestResult;
}

/** Add a callback that will be called whenever new results arrive
*
* The callback will be called immediately with the current result,
* and then again each time the query results change.
*
* @param callback - Function that takes a result string and processes it
* @returns A function that can be called to remove this callback
*
* Example:
* ```typescript
* const removeCallback = subscription.onResult(result => {
* const bindings = JSON.parse(result);
* console.log("New bindings:", bindings);
* });
*
* // Later: stop receiving updates
* removeCallback();
* ```
*/
onResult(callback: QueryCallback): () => void {
this.#callbacks.add(callback);
return () => this.#callbacks.delete(callback);
}

/** Internal method to notify all callbacks of a new result */
#notifyCallbacks(result: string) {
for (const callback of this.#callbacks) {
try {
callback(result);
} catch (e) {
console.error('Error in query subscription callback:', e);
}
}
}

/** Clean up the subscription and stop keepalive signals
*
* This method:
* 1. Stops the keepalive timer
* 2. Unsubscribes from GraphQL subscription updates
* 3. Clears all registered callbacks
*
* After calling this method, the subscription is no longer active and
* will not receive any more updates. The instance should be discarded.
*/
dispose() {
this.#disposed = true;
clearTimeout(this.#keepaliveTimer);
if (this.#unsubscribe) {
this.#unsubscribe();
}
this.#callbacks.clear();
}
}

type PerspectiveListenerTypes = "link-added" | "link-removed" | "link-updated"

Expand Down Expand Up @@ -576,4 +733,20 @@ export class PerspectiveProxy {
return this.#client.aiClient
}

/** Subscribe to a Prolog query and get updates when the results change.
* Returns a QuerySubscriptionProxy that handles keepalive and allows registering callbacks.
* The initial and subsequent results can be accessed via the result property.
*
* Make sure to call dispose() on the subscription when you're done with it
*/
async subscribeInfer(query: string): Promise<QuerySubscriptionProxy> {
const result = await this.#client.subscribeQuery(this.uuid, query);
return new QuerySubscriptionProxy(
this.uuid,
result.subscriptionId,
result.result,
this.#client
);
}

}
50 changes: 48 additions & 2 deletions core/src/perspectives/PerspectiveResolver.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Arg, Mutation, PubSub, Query, Resolver, Subscription } from "type-graphql";
import { Arg, Mutation, PubSub, Query, Resolver, Root, Subscription, ObjectType, Field } from "type-graphql";
import { LinkExpression, LinkExpressionInput, LinkExpressionMutations, LinkExpressionUpdated, LinkInput, LinkMutations } from "../links/Links";
import { Neighbourhood, NeighbourhoodExpression } from "../neighbourhood/Neighbourhood";
import { LinkQuery } from "./LinkQuery";
Expand All @@ -20,6 +20,17 @@ testLink.proof = {
valid: true
}

export const PERSPECTIVE_QUERY_SUBSCRIPTION = "PERSPECTIVE_QUERY_SUBSCRIPTION"

@ObjectType()
export class QuerySubscription {
@Field()
subscriptionId: string;

@Field()
result: string;
}

/**
* Resolver classes are used here to define the GraphQL schema
* (through the type-graphql annotations)
Expand Down Expand Up @@ -72,10 +83,25 @@ export default class PerspectiveResolver {
}

@Query(returns => String)
perspectiveQueryProlog(@Arg('uuid') uuid: string, @Arg('query') query: String): string {
async perspectiveQueryProlog(
@Arg('uuid') uuid: string,
@Arg('query') query: string
): Promise<string> {
return `[{"X": 1}]`
}

@Mutation(returns => QuerySubscription)
async perspectiveSubscribeQuery(
@Arg('uuid') uuid: string,
@Arg('query') query: string
): Promise<QuerySubscription> {
const result = `[{"X": 1}]`
return {
subscriptionId: "test-subscription-id",
result: result
}
}

@Mutation(returns => PerspectiveHandle)
perspectiveAdd(@Arg('name') name: string, @PubSub() pubSub: any): PerspectiveHandle {
const perspective = new PerspectiveHandle('00006', name);
Expand Down Expand Up @@ -250,4 +276,24 @@ export default class PerspectiveResolver {
perspectiveSyncStateChange(@Arg('uuid') uuid: string): PerspectiveState {
return PerspectiveState.Synced
}

@Mutation(returns => Boolean)
perspectiveKeepAliveQuery(
@Arg('uuid') uuid: string,
@Arg('subscriptionId') subscriptionId: string
): boolean {
return true
}

@Subscription({
topics: PERSPECTIVE_QUERY_SUBSCRIPTION,
filter: ({ payload, args }) =>
payload.subscriptionId === args.subscriptionId
})
perspectiveQuerySubscription(
@Arg('subscriptionId') subscriptionId: string,
@Root() payload: { subscriptionId: string, uuid: string, result: string }
): string {
return payload.result
}
}
27 changes: 27 additions & 0 deletions rust-executor/src/graphql/graphql_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1067,3 +1067,30 @@ impl ImportResult {
}
}
}

#[derive(GraphQLObject, Debug, Clone, Serialize, Deserialize)]
pub struct QuerySubscription {
pub subscription_id: String,
pub result: String,
}

#[derive(Debug, Deserialize, Serialize, Clone, PartialEq)]
pub struct PerspectiveQuerySubscriptionFilter {
pub uuid: String,
pub subscription_id: String,
pub result: String,
}

impl GetValue for PerspectiveQuerySubscriptionFilter {
type Value = String;

fn get_value(&self) -> Self::Value {
self.result.clone()
}
}

impl GetFilter for PerspectiveQuerySubscriptionFilter {
fn get_filter(&self) -> Option<String> {
Some(self.subscription_id.clone())
}
}
36 changes: 36 additions & 0 deletions rust-executor/src/graphql/mutation_resolvers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -985,6 +985,42 @@ impl Mutation {
Ok(result)
}

async fn perspective_subscribe_query(
&self,
context: &RequestContext,
uuid: String,
query: String,
) -> FieldResult<QuerySubscription> {
check_capability(
&context.capabilities,
&perspective_query_capability(vec![uuid.clone()]),
)?;

let perspective = get_perspective_with_uuid_field_error(&uuid)?;
let (subscription_id, result_string) = perspective.subscribe_and_query(query).await?;

Ok(QuerySubscription {
subscription_id,
result: result_string,
})
}

async fn perspective_keepalive_query(
&self,
context: &RequestContext,
uuid: String,
subscription_id: String,
) -> FieldResult<bool> {
check_capability(
&context.capabilities,
&perspective_query_capability(vec![uuid.clone()]),
)?;

let perspective = get_perspective_with_uuid_field_error(&uuid)?;
perspective.keepalive_query(subscription_id).await?;
Ok(true)
}

async fn runtime_add_friends(
&self,
context: &RequestContext,
Expand Down
Loading

0 comments on commit 082fa4b

Please sign in to comment.