From 5af94c70c6a8f0cc16679459c4bcd566410c2a37 Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Wed, 26 Feb 2025 10:50:57 +0100 Subject: [PATCH 01/13] WIP: Prolog query subscriptions --- core/src/perspectives/PerspectiveClient.ts | 25 ++++++ core/src/perspectives/PerspectiveResolver.ts | 52 ++++++++++- .../src/graphql/mutation_resolvers.rs | 42 +++++++++ .../src/graphql/subscription_resolvers.rs | 38 +++++++- .../src/perspectives/perspective_instance.rs | 88 ++++++++++++++++++- rust-executor/src/pubsub.rs | 1 + 6 files changed, 240 insertions(+), 6 deletions(-) diff --git a/core/src/perspectives/PerspectiveClient.ts b/core/src/perspectives/PerspectiveClient.ts index 80e1f7dfd..0838bee68 100644 --- a/core/src/perspectives/PerspectiveClient.ts +++ b/core/src/perspectives/PerspectiveClient.ts @@ -154,6 +154,31 @@ export class PerspectiveClient { return JSON.parse(perspectiveQueryProlog) } + async subscribeQuery(uuid: string, query: string): Promise<{ subscriptionId: string, result: string }> { + const { perspectiveSubscribeQuery } = unwrapApolloResult(await this.#apolloClient.mutate({ + mutation: gql`mutation perspectiveSubscribeQuery($uuid: String!, $query: String!) { + perspectiveSubscribeQuery(uuid: $uuid, query: $query) { + subscriptionId + result + } + }`, + variables: { uuid, query } + })) + + return perspectiveSubscribeQuery + } + + async keepAliveQuery(uuid: string, subscriptionId: string): Promise { + const { perspectiveKeepAliveQuery } = unwrapApolloResult(await this.#apolloClient.mutate({ + mutation: gql`mutation perspectiveKeepAliveQuery($uuid: String!, $subscriptionId: String!) { + perspectiveKeepAliveQuery(uuid: $uuid, subscriptionId: $subscriptionId) + }`, + variables: { uuid, subscriptionId } + })) + + return perspectiveKeepAliveQuery + } + async add(name: string): Promise { const { perspectiveAdd } = unwrapApolloResult(await this.#apolloClient.mutate({ mutation: gql`mutation perspectiveAdd($name: String!) { diff --git a/core/src/perspectives/PerspectiveResolver.ts b/core/src/perspectives/PerspectiveResolver.ts index b3dd886cc..0ad653922 100644 --- a/core/src/perspectives/PerspectiveResolver.ts +++ b/core/src/perspectives/PerspectiveResolver.ts @@ -1,4 +1,4 @@ -import { Arg, Mutation, PubSub, Query, Resolver, Subscription } from "type-graphql"; +import { Arg, Mutation, PubSub, Query, Resolver, Root, Subscription, ObjectType, Field } from "type-graphql"; import { LinkExpression, LinkExpressionInput, LinkExpressionMutations, LinkExpressionUpdated, LinkInput, LinkMutations } from "../links/Links"; import { Neighbourhood, NeighbourhoodExpression } from "../neighbourhood/Neighbourhood"; import { LinkQuery } from "./LinkQuery"; @@ -20,6 +20,17 @@ testLink.proof = { valid: true } +export const PERSPECTIVE_QUERY_SUBSCRIPTION = "PERSPECTIVE_QUERY_SUBSCRIPTION" + +@ObjectType() +export class QuerySubscription { + @Field() + subscriptionId: string; + + @Field() + result: string; +} + /** * Resolver classes are used here to define the GraphQL schema * (through the type-graphql annotations) @@ -72,10 +83,25 @@ export default class PerspectiveResolver { } @Query(returns => String) - perspectiveQueryProlog(@Arg('uuid') uuid: string, @Arg('query') query: String): string { + async perspectiveQueryProlog( + @Arg('uuid') uuid: string, + @Arg('query') query: string + ): Promise { return `[{"X": 1}]` } + @Mutation(returns => QuerySubscription) + async perspectiveSubscribeQuery( + @Arg('uuid') uuid: string, + @Arg('query') query: string + ): Promise { + const result = `[{"X": 1}]` + return { + subscriptionId: "test-subscription-id", + result: result + } + } + @Mutation(returns => PerspectiveHandle) perspectiveAdd(@Arg('name') name: string, @PubSub() pubSub: any): PerspectiveHandle { const perspective = new PerspectiveHandle('00006', name); @@ -250,4 +276,26 @@ export default class PerspectiveResolver { perspectiveSyncStateChange(@Arg('uuid') uuid: string): PerspectiveState { return PerspectiveState.Synced } + + @Mutation(returns => Boolean) + perspectiveKeepAliveQuery( + @Arg('uuid') uuid: string, + @Arg('subscriptionId') subscriptionId: string + ): boolean { + return true + } + + @Subscription({ + topics: PERSPECTIVE_QUERY_SUBSCRIPTION, + filter: ({ payload, args }) => + payload.subscriptionId === args.subscriptionId && + payload.uuid === args.uuid + }) + perspectiveQuerySubscription( + @Arg('uuid') uuid: string, + @Arg('subscriptionId') subscriptionId: string, + @Root() payload: { subscriptionId: string, uuid: string, result: string } + ): string { + return payload.result + } } \ No newline at end of file diff --git a/rust-executor/src/graphql/mutation_resolvers.rs b/rust-executor/src/graphql/mutation_resolvers.rs index 00380619d..35f182e4c 100644 --- a/rust-executor/src/graphql/mutation_resolvers.rs +++ b/rust-executor/src/graphql/mutation_resolvers.rs @@ -54,6 +54,12 @@ fn link_status_from_input(status: Option) -> Result FieldResult { + check_capability( + &context.capabilities, + &perspective_query_capability(vec![uuid.clone()]), + )?; + + let perspective = get_perspective_with_uuid_field_error(&uuid)?; + let (subscription_id, result_string) = perspective.subscribe_and_query(query).await?; + + Ok(QuerySubscription { + subscription_id, + result: result_string, + }) + } + + async fn perspective_keepalive_query( + &self, + context: &RequestContext, + uuid: String, + subscription_id: String, + ) -> FieldResult { + check_capability( + &context.capabilities, + &perspective_query_capability(vec![uuid.clone()]), + )?; + + let perspective = get_perspective_with_uuid_field_error(&uuid)?; + perspective.keepalive_query(subscription_id).await?; + Ok(true) + } + async fn runtime_add_friends( &self, context: &RequestContext, diff --git a/rust-executor/src/graphql/subscription_resolvers.rs b/rust-executor/src/graphql/subscription_resolvers.rs index 9c1751ea4..a4be4bed2 100644 --- a/rust-executor/src/graphql/subscription_resolvers.rs +++ b/rust-executor/src/graphql/subscription_resolvers.rs @@ -10,9 +10,10 @@ use crate::{ AI_MODEL_LOADING_STATUS, AI_TRANSCRIPTION_TEXT_TOPIC, APPS_CHANGED, EXCEPTION_OCCURRED_TOPIC, NEIGHBOURHOOD_SIGNAL_TOPIC, PERSPECTIVE_ADDED_TOPIC, PERSPECTIVE_LINK_ADDED_TOPIC, PERSPECTIVE_LINK_REMOVED_TOPIC, - PERSPECTIVE_LINK_UPDATED_TOPIC, PERSPECTIVE_REMOVED_TOPIC, - PERSPECTIVE_SYNC_STATE_CHANGE_TOPIC, PERSPECTIVE_UPDATED_TOPIC, - RUNTIME_MESSAGED_RECEIVED_TOPIC, RUNTIME_NOTIFICATION_TRIGGERED_TOPIC, + PERSPECTIVE_LINK_UPDATED_TOPIC, PERSPECTIVE_QUERY_SUBSCRIPTION_TOPIC, + PERSPECTIVE_REMOVED_TOPIC, PERSPECTIVE_SYNC_STATE_CHANGE_TOPIC, + PERSPECTIVE_UPDATED_TOPIC, RUNTIME_MESSAGED_RECEIVED_TOPIC, + RUNTIME_NOTIFICATION_TRIGGERED_TOPIC, }, types::{DecoratedLinkExpression, TriggeredNotification}, }; @@ -288,4 +289,35 @@ impl Subscription { } } } + + async fn perspective_query_subscription( + &self, + context: &RequestContext, + uuid: String, + subscription_id: String, + ) -> Pin> + Send>> { + match check_capability(&context.capabilities, &PERSPECTIVE_SUBSCRIBE_CAPABILITY) { + Err(e) => Box::pin(stream::once(async move { Err(e.into()) })), + Ok(_) => { + let pubsub = get_global_pubsub().await; + let receiver = pubsub.subscribe(&PERSPECTIVE_QUERY_SUBSCRIPTION_TOPIC).await; + let stream = WatchStream::from_changes(receiver) + .filter_map(move |msg| { + let payload: Result = serde_json::from_str(&msg); + match payload { + Ok(payload) => { + if payload["uuid"].as_str().map(|s| s == uuid).unwrap_or(false) && + payload["subscriptionId"].as_str().map(|s| s == subscription_id).unwrap_or(false) { + Some(Ok(payload["result"].to_string())) + } else { + None + } + } + Err(_) => None + } + }); + Box::pin(stream) + } + } + } } diff --git a/rust-executor/src/perspectives/perspective_instance.rs b/rust-executor/src/perspectives/perspective_instance.rs index 2c21d419a..12865d759 100644 --- a/rust-executor/src/perspectives/perspective_instance.rs +++ b/rust-executor/src/perspectives/perspective_instance.rs @@ -17,6 +17,7 @@ use crate::pubsub::{ get_global_pubsub, NEIGHBOURHOOD_SIGNAL_TOPIC, PERSPECTIVE_LINK_ADDED_TOPIC, PERSPECTIVE_LINK_REMOVED_TOPIC, PERSPECTIVE_LINK_UPDATED_TOPIC, PERSPECTIVE_SYNC_STATE_CHANGE_TOPIC, RUNTIME_NOTIFICATION_TRIGGERED_TOPIC, + PERSPECTIVE_QUERY_SUBSCRIPTION_TOPIC, }; use crate::{db::Ad4mDb, types::*}; use ad4m_client::literal::Literal; @@ -31,13 +32,16 @@ use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use std::time::Duration; use tokio::sync::{Mutex, RwLock}; -use tokio::time::sleep; +use tokio::time::{sleep, Instant}; use tokio::{join, time}; +use uuid; static MAX_COMMIT_BYTES: usize = 3_000_000; //3MiB static MAX_PENDING_DIFFS_COUNT: usize = 150; static MAX_PENDING_SECONDS: u64 = 3; static IMMEDIATE_COMMITS_COUNT: usize = 20; +static QUERY_SUBSCRIPTION_TIMEOUT: u64 = 300; // 5 minutes in seconds +static QUERY_SUBSCRIPTION_CHECK_INTERVAL: u64 = 200; // 200ms #[derive(Debug, Deserialize, Serialize, Clone, PartialEq)] pub enum SdnaType { @@ -135,6 +139,14 @@ pub struct Parameter { value: serde_json::Value, } +#[derive(Clone)] +struct SubscribedQuery { + query: String, + last_result: String, + last_keepalive: Instant, + subscription_id: String, +} + #[derive(Clone)] pub struct PerspectiveInstance { pub persisted: Arc>, @@ -152,6 +164,7 @@ pub struct PerspectiveInstance { links_have_changed: Arc>, commit_debounce_timer: Arc>>, immediate_commits_remaining: Arc>, + subscribed_queries: Arc>>, } impl PerspectiveInstance { @@ -171,6 +184,7 @@ impl PerspectiveInstance { links_have_changed: Arc::new(Mutex::new(false)), commit_debounce_timer: Arc::new(Mutex::new(None)), immediate_commits_remaining: Arc::new(Mutex::new(IMMEDIATE_COMMITS_COUNT)), // Default to 3 immediate commits + subscribed_queries: Arc::new(Mutex::new(HashMap::new())), } } @@ -1773,6 +1787,78 @@ impl PerspectiveInstance { Ok(format!("{{ {} }}", stringified)) } + + pub async fn subscribe_and_query(&self, query: String) -> Result<(String, String), AnyError> { + let subscription_id = uuid::Uuid::new_v4().to_string(); + let initial_result = self.prolog_query(query.clone()).await?; + let result_string = prolog_resolution_to_string(initial_result); + + let subscribed_query = SubscribedQuery { + query, + last_result: result_string.clone(), + last_keepalive: Instant::now(), + subscription_id: subscription_id.clone(), + }; + + self.subscribed_queries.lock().await.insert(subscription_id.clone(), subscribed_query); + Ok((subscription_id, result_string)) + } + + async fn check_subscribed_queries(&self) { + let mut queries_to_remove = Vec::new(); + let mut queries_with_changes = Vec::new(); + let uuid = self.persisted.lock().await.uuid.clone(); + + { + let mut queries = self.subscribed_queries.lock().await; + let now = Instant::now(); + + for (id, query) in queries.iter_mut() { + // Check for timeout + if now.duration_since(query.last_keepalive).as_secs() > QUERY_SUBSCRIPTION_TIMEOUT { + queries_to_remove.push(id.clone()); + continue; + } + + // Check for changes + if let Ok(result) = self.prolog_query(query.query.clone()).await { + let result_string = prolog_resolution_to_string(result); + if result_string != query.last_result { + query.last_result = result_string.clone(); + queries_with_changes.push((id.clone(), result_string)); + } + } + } + + // Remove timed out queries + for id in queries_to_remove { + queries.remove(&id); + } + } + + // Publish changes + for (id, result) in queries_with_changes { + let payload = serde_json::json!({ + "subscriptionId": id, + "uuid": uuid, + "result": result + }); + get_global_pubsub() + .await + .publish( + PERSPECTIVE_QUERY_SUBSCRIPTION_TOPIC, + &payload.to_string(), + ) + .await; + } + } + + async fn subscribed_queries_loop(&self) { + while !*self.is_teardown.lock().await { + self.check_subscribed_queries().await; + sleep(Duration::from_millis(QUERY_SUBSCRIPTION_CHECK_INTERVAL)).await; + } + } } pub fn prolog_result(result: String) -> Value { diff --git a/rust-executor/src/pubsub.rs b/rust-executor/src/pubsub.rs index ee1731989..1f5c906a5 100644 --- a/rust-executor/src/pubsub.rs +++ b/rust-executor/src/pubsub.rs @@ -134,6 +134,7 @@ lazy_static::lazy_static! { pub static ref RUNTIME_NOTIFICATION_TRIGGERED_TOPIC: String = "runtime-notification-triggered-topic".to_owned(); pub static ref AI_TRANSCRIPTION_TEXT_TOPIC: String = "ai-transcription-text-topic".to_owned(); pub static ref AI_MODEL_LOADING_STATUS: String = "ai-model-loading-status".to_owned(); + pub static ref PERSPECTIVE_QUERY_SUBSCRIPTION_TOPIC: String = "perspective-query-subscription-topic".to_owned(); } pub async fn get_global_pubsub() -> Arc { From f1fe70a9d89781b8d3a481124d4ebad02f7d8691 Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Wed, 26 Feb 2025 15:16:51 +0100 Subject: [PATCH 02/13] Add new check function to background tasks --- rust-executor/src/perspectives/perspective_instance.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/rust-executor/src/perspectives/perspective_instance.rs b/rust-executor/src/perspectives/perspective_instance.rs index 12865d759..3243de5d0 100644 --- a/rust-executor/src/perspectives/perspective_instance.rs +++ b/rust-executor/src/perspectives/perspective_instance.rs @@ -194,6 +194,7 @@ impl PerspectiveInstance { self.notification_check_loop(), self.nh_sync_loop(), self.pending_diffs_loop(), + self.check_subscribed_queries() ); } From 0e2d5f63c910006f4350053c5cd0ab1d1b6d696a Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Wed, 26 Feb 2025 15:39:36 +0100 Subject: [PATCH 03/13] PerspectiveProxy.subscribeInfer() with subscription proxy --- core/src/perspectives/PerspectiveProxy.ts | 168 ++++++++++++++++++++++ 1 file changed, 168 insertions(+) diff --git a/core/src/perspectives/PerspectiveProxy.ts b/core/src/perspectives/PerspectiveProxy.ts index 9970b64a7..a262dab29 100644 --- a/core/src/perspectives/PerspectiveProxy.ts +++ b/core/src/perspectives/PerspectiveProxy.ts @@ -10,6 +10,158 @@ import { collectionAdderToName, collectionRemoverToName, collectionSetterToName import { NeighbourhoodProxy } from "../neighbourhood/NeighbourhoodProxy"; import { NeighbourhoodExpression } from "../neighbourhood/Neighbourhood"; import { AIClient } from "../ai/AIClient"; +import { PERSPECTIVE_QUERY_SUBSCRIPTION } from "./PerspectiveResolver"; +import { gql } from "@apollo/client/core"; + +type QueryCallback = (result: string) => void; + +/** Proxy object for a subscribed Prolog query that provides real-time updates + * + * This class handles: + * - Keeping the subscription alive by sending periodic keepalive signals + * - Managing callbacks for result updates + * - Subscribing to query updates via GraphQL subscriptions + * - Maintaining the latest query result + * + * The subscription will remain active as long as keepalive signals are sent. + * Make sure to call dispose() when you're done with the subscription to clean up + * resources and stop keepalive signals. + * + * Example usage: + * ```typescript + * const subscription = await perspective.subscribeInfer("my_query(X)"); + * console.log("Initial result:", subscription.result); + * + * // Set up callback for updates + * const removeCallback = subscription.onResult(result => { + * console.log("New result:", result); + * }); + * + * // Later: clean up + * subscription.dispose(); + * ``` + */ +export class QuerySubscriptionProxy { + #uuid: string; + #subscriptionId: string; + #client: PerspectiveClient; + #callbacks: Set; + #keepaliveInterval: NodeJS.Timeout; + #subscription?: ZenObservable.Subscription; + #latestResult: string; + + /** Creates a new query subscription + * @param uuid - The UUID of the perspective + * @param subscriptionId - The ID returned by the subscription mutation + * @param initialResult - The initial query result + * @param client - The PerspectiveClient instance to use for communication + */ + constructor(uuid: string, subscriptionId: string, initialResult: string, client: PerspectiveClient) { + this.#uuid = uuid; + this.#subscriptionId = subscriptionId; + this.#client = client; + this.#callbacks = new Set(); + this.#latestResult = initialResult; + + // Call all callbacks with initial result + this.#notifyCallbacks(initialResult); + + // Subscribe to query updates + this.#subscription = (this.#client as any).apolloClient.subscribe({ + query: gql` + subscription perspectiveQuerySubscription($uuid: String!, $subscriptionId: String!) { + perspectiveQuerySubscription(uuid: $uuid, subscriptionId: $subscriptionId) + } + `, + variables: { + uuid: this.#uuid, + subscriptionId: this.#subscriptionId + } + }).subscribe({ + next: (result) => { + if (result.data && result.data.perspectiveQuerySubscription) { + this.#latestResult = result.data.perspectiveQuerySubscription; + this.#notifyCallbacks(this.#latestResult); + } + }, + error: (e) => console.error('Error in query subscription:', e) + }); + + // Start keepalive loop + this.#keepaliveInterval = setInterval(async () => { + try { + await this.#client.keepAliveQuery(this.#uuid, this.#subscriptionId); + } catch (e) { + console.error('Error in keepalive:', e); + } + }, 30000); // Send keepalive every 30 seconds + } + + /** Get the latest query result + * + * This returns the most recent result from the query, which could be either: + * - The initial result from when the subscription was created + * - The latest update received through the subscription + * + * @returns The latest query result as a string (usually a JSON array of bindings) + */ + get result(): string { + return this.#latestResult; + } + + /** Add a callback that will be called whenever new results arrive + * + * The callback will be called immediately with the current result, + * and then again each time the query results change. + * + * @param callback - Function that takes a result string and processes it + * @returns A function that can be called to remove this callback + * + * Example: + * ```typescript + * const removeCallback = subscription.onResult(result => { + * const bindings = JSON.parse(result); + * console.log("New bindings:", bindings); + * }); + * + * // Later: stop receiving updates + * removeCallback(); + * ``` + */ + onResult(callback: QueryCallback): () => void { + this.#callbacks.add(callback); + return () => this.#callbacks.delete(callback); + } + + /** Internal method to notify all callbacks of a new result */ + #notifyCallbacks(result: string) { + for (const callback of this.#callbacks) { + try { + callback(result); + } catch (e) { + console.error('Error in query subscription callback:', e); + } + } + } + + /** Clean up the subscription and stop keepalive signals + * + * This method: + * 1. Stops the keepalive interval + * 2. Unsubscribes from GraphQL subscription updates + * 3. Clears all registered callbacks + * + * After calling this method, the subscription is no longer active and + * will not receive any more updates. The instance should be discarded. + */ + dispose() { + clearInterval(this.#keepaliveInterval); + if (this.#subscription) { + this.#subscription.unsubscribe(); + } + this.#callbacks.clear(); + } +} type PerspectiveListenerTypes = "link-added" | "link-removed" | "link-updated" @@ -576,4 +728,20 @@ export class PerspectiveProxy { return this.#client.aiClient } + /** Subscribe to a Prolog query and get updates when the results change. + * Returns a QuerySubscriptionProxy that handles keepalive and allows registering callbacks. + * The initial and subsequent results can be accessed via the result property. + * + * Make sure to call dispose() on the subscription when you're done with it + */ + async subscribeInfer(query: string): Promise { + const result = await this.#client.subscribeQuery(this.uuid, query); + return new QuerySubscriptionProxy( + this.uuid, + result.subscriptionId, + result.result, + this.#client + ); + } + } \ No newline at end of file From a77461d475f50554a5b3f6346d7a3d7932f2b06f Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Wed, 26 Feb 2025 15:48:09 +0100 Subject: [PATCH 04/13] add missing keepalive_query function --- rust-executor/src/perspectives/perspective_instance.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/rust-executor/src/perspectives/perspective_instance.rs b/rust-executor/src/perspectives/perspective_instance.rs index 3243de5d0..f34695b19 100644 --- a/rust-executor/src/perspectives/perspective_instance.rs +++ b/rust-executor/src/perspectives/perspective_instance.rs @@ -1805,6 +1805,16 @@ impl PerspectiveInstance { Ok((subscription_id, result_string)) } + pub async fn keepalive_query(&self, subscription_id: String) -> Result<(), AnyError> { + let mut queries = self.subscribed_queries.lock().await; + if let Some(query) = queries.get_mut(&subscription_id) { + query.last_keepalive = Instant::now(); + Ok(()) + } else { + Err(anyhow!("Subscription not found")) + } + } + async fn check_subscribed_queries(&self) { let mut queries_to_remove = Vec::new(); let mut queries_with_changes = Vec::new(); From 3186f763fb01fbd6d7466b82f1ef75348b8e8d72 Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Wed, 26 Feb 2025 15:48:26 +0100 Subject: [PATCH 05/13] QuerySubscription graphql type --- rust-executor/src/graphql/graphql_types.rs | 7 +++++++ rust-executor/src/graphql/mutation_resolvers.rs | 6 ------ 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/rust-executor/src/graphql/graphql_types.rs b/rust-executor/src/graphql/graphql_types.rs index 155e82a60..917a9af4d 100644 --- a/rust-executor/src/graphql/graphql_types.rs +++ b/rust-executor/src/graphql/graphql_types.rs @@ -1067,3 +1067,10 @@ impl ImportResult { } } } + + +#[derive(GraphQLObject, Debug, Clone, Serialize, Deserialize)] +pub struct QuerySubscription { + pub subscription_id: String, + pub result: String, +} diff --git a/rust-executor/src/graphql/mutation_resolvers.rs b/rust-executor/src/graphql/mutation_resolvers.rs index 35f182e4c..f7b314e72 100644 --- a/rust-executor/src/graphql/mutation_resolvers.rs +++ b/rust-executor/src/graphql/mutation_resolvers.rs @@ -54,12 +54,6 @@ fn link_status_from_input(status: Option) -> Result Date: Wed, 26 Feb 2025 15:59:53 +0100 Subject: [PATCH 06/13] Proper implementation of the new subscription resolver --- rust-executor/src/graphql/graphql_types.rs | 21 ++++++++++++++++ .../src/graphql/subscription_resolvers.rs | 24 ++++++------------- .../src/perspectives/perspective_instance.rs | 15 ++++++------ 3 files changed, 36 insertions(+), 24 deletions(-) diff --git a/rust-executor/src/graphql/graphql_types.rs b/rust-executor/src/graphql/graphql_types.rs index 917a9af4d..286e9ccc5 100644 --- a/rust-executor/src/graphql/graphql_types.rs +++ b/rust-executor/src/graphql/graphql_types.rs @@ -1074,3 +1074,24 @@ pub struct QuerySubscription { pub subscription_id: String, pub result: String, } + +#[derive(Debug, Deserialize, Serialize, Clone, PartialEq)] +pub struct PerspectiveQuerySubscriptionFilter { + pub uuid: String, + pub subscription_id: String, + pub result: String, +} + +impl GetValue for PerspectiveQuerySubscriptionFilter { + type Value = String; + + fn get_value(&self) -> Self::Value { + self.result.clone() + } +} + +impl GetFilter for PerspectiveQuerySubscriptionFilter { + fn get_filter(&self) -> Option { + Some(self.subscription_id.clone()) + } +} diff --git a/rust-executor/src/graphql/subscription_resolvers.rs b/rust-executor/src/graphql/subscription_resolvers.rs index a4be4bed2..6bf7f676d 100644 --- a/rust-executor/src/graphql/subscription_resolvers.rs +++ b/rust-executor/src/graphql/subscription_resolvers.rs @@ -300,23 +300,13 @@ impl Subscription { Err(e) => Box::pin(stream::once(async move { Err(e.into()) })), Ok(_) => { let pubsub = get_global_pubsub().await; - let receiver = pubsub.subscribe(&PERSPECTIVE_QUERY_SUBSCRIPTION_TOPIC).await; - let stream = WatchStream::from_changes(receiver) - .filter_map(move |msg| { - let payload: Result = serde_json::from_str(&msg); - match payload { - Ok(payload) => { - if payload["uuid"].as_str().map(|s| s == uuid).unwrap_or(false) && - payload["subscriptionId"].as_str().map(|s| s == subscription_id).unwrap_or(false) { - Some(Ok(payload["result"].to_string())) - } else { - None - } - } - Err(_) => None - } - }); - Box::pin(stream) + let topic = &PERSPECTIVE_QUERY_SUBSCRIPTION_TOPIC; + subscribe_and_process::( + pubsub, + topic.to_string(), + Some(subscription_id), + ) + .await } } } diff --git a/rust-executor/src/perspectives/perspective_instance.rs b/rust-executor/src/perspectives/perspective_instance.rs index f34695b19..d84c8f865 100644 --- a/rust-executor/src/perspectives/perspective_instance.rs +++ b/rust-executor/src/perspectives/perspective_instance.rs @@ -8,6 +8,7 @@ use crate::graphql::graphql_types::{ DecoratedPerspectiveDiff, ExpressionRendered, JsResultType, LinkMutations, LinkQuery, LinkStatus, NeighbourhoodSignalFilter, OnlineAgent, PerspectiveExpression, PerspectiveHandle, PerspectiveLinkFilter, PerspectiveLinkUpdatedFilter, PerspectiveState, PerspectiveStateFilter, + PerspectiveQuerySubscriptionFilter, }; use crate::languages::language::Language; use crate::languages::LanguageController; @@ -1849,16 +1850,16 @@ impl PerspectiveInstance { // Publish changes for (id, result) in queries_with_changes { - let payload = serde_json::json!({ - "subscriptionId": id, - "uuid": uuid, - "result": result - }); + let filter = PerspectiveQuerySubscriptionFilter { + uuid: uuid.clone(), + subscription_id: id, + result, + }; get_global_pubsub() .await .publish( - PERSPECTIVE_QUERY_SUBSCRIPTION_TOPIC, - &payload.to_string(), + &PERSPECTIVE_QUERY_SUBSCRIPTION_TOPIC.to_string(), + &serde_json::to_string(&filter).unwrap(), ) .await; } From ead40c6c752af54748cb12b094f3c038d4f1a980 Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Wed, 26 Feb 2025 16:01:53 +0100 Subject: [PATCH 07/13] add correct loop function to background tasks --- rust-executor/src/perspectives/perspective_instance.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust-executor/src/perspectives/perspective_instance.rs b/rust-executor/src/perspectives/perspective_instance.rs index d84c8f865..d6af3d2b9 100644 --- a/rust-executor/src/perspectives/perspective_instance.rs +++ b/rust-executor/src/perspectives/perspective_instance.rs @@ -195,7 +195,7 @@ impl PerspectiveInstance { self.notification_check_loop(), self.nh_sync_loop(), self.pending_diffs_loop(), - self.check_subscribed_queries() + self.subscribed_queries_loop() ); } From 1e287137d6491569a57c2e459e5fac916a6dec4f Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Wed, 26 Feb 2025 16:11:07 +0100 Subject: [PATCH 08/13] =?UTF-8?q?Don=E2=80=99t=20need=20perspective=20UUID?= =?UTF-8?q?=20to=20subscribe=20to=20query?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/src/perspectives/PerspectiveProxy.ts | 5 ++--- core/src/perspectives/PerspectiveResolver.ts | 4 +--- rust-executor/src/graphql/subscription_resolvers.rs | 1 - 3 files changed, 3 insertions(+), 7 deletions(-) diff --git a/core/src/perspectives/PerspectiveProxy.ts b/core/src/perspectives/PerspectiveProxy.ts index a262dab29..425ba1b7e 100644 --- a/core/src/perspectives/PerspectiveProxy.ts +++ b/core/src/perspectives/PerspectiveProxy.ts @@ -69,12 +69,11 @@ export class QuerySubscriptionProxy { // Subscribe to query updates this.#subscription = (this.#client as any).apolloClient.subscribe({ query: gql` - subscription perspectiveQuerySubscription($uuid: String!, $subscriptionId: String!) { - perspectiveQuerySubscription(uuid: $uuid, subscriptionId: $subscriptionId) + subscription perspectiveQuerySubscription($subscriptionId: String!) { + perspectiveQuerySubscription(subscriptionId: $subscriptionId) } `, variables: { - uuid: this.#uuid, subscriptionId: this.#subscriptionId } }).subscribe({ diff --git a/core/src/perspectives/PerspectiveResolver.ts b/core/src/perspectives/PerspectiveResolver.ts index 0ad653922..cd0947842 100644 --- a/core/src/perspectives/PerspectiveResolver.ts +++ b/core/src/perspectives/PerspectiveResolver.ts @@ -288,11 +288,9 @@ export default class PerspectiveResolver { @Subscription({ topics: PERSPECTIVE_QUERY_SUBSCRIPTION, filter: ({ payload, args }) => - payload.subscriptionId === args.subscriptionId && - payload.uuid === args.uuid + payload.subscriptionId === args.subscriptionId }) perspectiveQuerySubscription( - @Arg('uuid') uuid: string, @Arg('subscriptionId') subscriptionId: string, @Root() payload: { subscriptionId: string, uuid: string, result: string } ): string { diff --git a/rust-executor/src/graphql/subscription_resolvers.rs b/rust-executor/src/graphql/subscription_resolvers.rs index 6bf7f676d..78a83548d 100644 --- a/rust-executor/src/graphql/subscription_resolvers.rs +++ b/rust-executor/src/graphql/subscription_resolvers.rs @@ -293,7 +293,6 @@ impl Subscription { async fn perspective_query_subscription( &self, context: &RequestContext, - uuid: String, subscription_id: String, ) -> Pin> + Send>> { match check_capability(&context.capabilities, &PERSPECTIVE_SUBSCRIBE_CAPABILITY) { From 08d294a646831e9959e0a52432223310961b4284 Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Wed, 26 Feb 2025 16:12:40 +0100 Subject: [PATCH 09/13] Remove redundant field --- rust-executor/src/perspectives/perspective_instance.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/rust-executor/src/perspectives/perspective_instance.rs b/rust-executor/src/perspectives/perspective_instance.rs index d6af3d2b9..96da6e420 100644 --- a/rust-executor/src/perspectives/perspective_instance.rs +++ b/rust-executor/src/perspectives/perspective_instance.rs @@ -145,7 +145,6 @@ struct SubscribedQuery { query: String, last_result: String, last_keepalive: Instant, - subscription_id: String, } #[derive(Clone)] @@ -1799,7 +1798,6 @@ impl PerspectiveInstance { query, last_result: result_string.clone(), last_keepalive: Instant::now(), - subscription_id: subscription_id.clone(), }; self.subscribed_queries.lock().await.insert(subscription_id.clone(), subscribed_query); From 814be7f12f6344b6eb64149bc77c41bd115836e2 Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Wed, 26 Feb 2025 16:18:12 +0100 Subject: [PATCH 10/13] fmt --- rust-executor/src/graphql/graphql_types.rs | 1 - rust-executor/src/graphql/subscription_resolvers.rs | 5 ++--- .../src/perspectives/perspective_instance.rs | 13 ++++++++----- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/rust-executor/src/graphql/graphql_types.rs b/rust-executor/src/graphql/graphql_types.rs index 286e9ccc5..0c94c09ce 100644 --- a/rust-executor/src/graphql/graphql_types.rs +++ b/rust-executor/src/graphql/graphql_types.rs @@ -1068,7 +1068,6 @@ impl ImportResult { } } - #[derive(GraphQLObject, Debug, Clone, Serialize, Deserialize)] pub struct QuerySubscription { pub subscription_id: String, diff --git a/rust-executor/src/graphql/subscription_resolvers.rs b/rust-executor/src/graphql/subscription_resolvers.rs index 78a83548d..25fe73f46 100644 --- a/rust-executor/src/graphql/subscription_resolvers.rs +++ b/rust-executor/src/graphql/subscription_resolvers.rs @@ -11,9 +11,8 @@ use crate::{ EXCEPTION_OCCURRED_TOPIC, NEIGHBOURHOOD_SIGNAL_TOPIC, PERSPECTIVE_ADDED_TOPIC, PERSPECTIVE_LINK_ADDED_TOPIC, PERSPECTIVE_LINK_REMOVED_TOPIC, PERSPECTIVE_LINK_UPDATED_TOPIC, PERSPECTIVE_QUERY_SUBSCRIPTION_TOPIC, - PERSPECTIVE_REMOVED_TOPIC, PERSPECTIVE_SYNC_STATE_CHANGE_TOPIC, - PERSPECTIVE_UPDATED_TOPIC, RUNTIME_MESSAGED_RECEIVED_TOPIC, - RUNTIME_NOTIFICATION_TRIGGERED_TOPIC, + PERSPECTIVE_REMOVED_TOPIC, PERSPECTIVE_SYNC_STATE_CHANGE_TOPIC, PERSPECTIVE_UPDATED_TOPIC, + RUNTIME_MESSAGED_RECEIVED_TOPIC, RUNTIME_NOTIFICATION_TRIGGERED_TOPIC, }, types::{DecoratedLinkExpression, TriggeredNotification}, }; diff --git a/rust-executor/src/perspectives/perspective_instance.rs b/rust-executor/src/perspectives/perspective_instance.rs index 96da6e420..7d28765eb 100644 --- a/rust-executor/src/perspectives/perspective_instance.rs +++ b/rust-executor/src/perspectives/perspective_instance.rs @@ -7,8 +7,8 @@ use crate::agent::{self, create_signed_expression}; use crate::graphql::graphql_types::{ DecoratedPerspectiveDiff, ExpressionRendered, JsResultType, LinkMutations, LinkQuery, LinkStatus, NeighbourhoodSignalFilter, OnlineAgent, PerspectiveExpression, PerspectiveHandle, - PerspectiveLinkFilter, PerspectiveLinkUpdatedFilter, PerspectiveState, PerspectiveStateFilter, - PerspectiveQuerySubscriptionFilter, + PerspectiveLinkFilter, PerspectiveLinkUpdatedFilter, PerspectiveQuerySubscriptionFilter, + PerspectiveState, PerspectiveStateFilter, }; use crate::languages::language::Language; use crate::languages::LanguageController; @@ -17,8 +17,8 @@ use crate::prolog_service::engine::PrologEngine; use crate::pubsub::{ get_global_pubsub, NEIGHBOURHOOD_SIGNAL_TOPIC, PERSPECTIVE_LINK_ADDED_TOPIC, PERSPECTIVE_LINK_REMOVED_TOPIC, PERSPECTIVE_LINK_UPDATED_TOPIC, - PERSPECTIVE_SYNC_STATE_CHANGE_TOPIC, RUNTIME_NOTIFICATION_TRIGGERED_TOPIC, - PERSPECTIVE_QUERY_SUBSCRIPTION_TOPIC, + PERSPECTIVE_QUERY_SUBSCRIPTION_TOPIC, PERSPECTIVE_SYNC_STATE_CHANGE_TOPIC, + RUNTIME_NOTIFICATION_TRIGGERED_TOPIC, }; use crate::{db::Ad4mDb, types::*}; use ad4m_client::literal::Literal; @@ -1800,7 +1800,10 @@ impl PerspectiveInstance { last_keepalive: Instant::now(), }; - self.subscribed_queries.lock().await.insert(subscription_id.clone(), subscribed_query); + self.subscribed_queries + .lock() + .await + .insert(subscription_id.clone(), subscribed_query); Ok((subscription_id, result_string)) } From 1772f4c5484d94c6c503112256c9647edaca3421 Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Wed, 26 Feb 2025 16:18:20 +0100 Subject: [PATCH 11/13] integration test --- tests/js/tests/perspective.ts | 48 +++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/tests/js/tests/perspective.ts b/tests/js/tests/perspective.ts index 744777950..bc21213c8 100644 --- a/tests/js/tests/perspective.ts +++ b/tests/js/tests/perspective.ts @@ -411,6 +411,54 @@ export default function perspectiveTests(testContext: TestContext) { expect(await proxy.getSingleTarget(new LinkQuery(link1))).to.equal('target2') }) + it('can subscribe to Prolog query results', async () => { + // Add some test data + await proxy.add(new Link({ + source: "ad4m://root", + target: "note-ipfs://Qm123" + })) + await proxy.add(new Link({ + source: "note-ipfs://Qm123", + target: "todo-ontology://is-todo" + })) + + // Create subscription + const subscription = await (proxy as any).subscribeInfer('triple(X, _, "todo-ontology://is-todo").') + + // Check initial result + const initialResult = JSON.parse(subscription.result) + expect(initialResult).to.be.an('array') + expect(initialResult.length).to.equal(1) + expect(initialResult[0].X).to.equal('note-ipfs://Qm123') + + // Set up callback for updates + const updates: string[] = [] + const unsubscribe = subscription.onResult((result: any) => { + updates.push(result) + }) + + // Add another link that should trigger an update + await proxy.add(new Link({ + source: "note-ipfs://Qm456", + target: "todo-ontology://is-todo" + })) + + // Wait for subscription update + await sleep(1000) + + // Verify we got an update + expect(updates.length).to.be.greaterThan(0) + const latestResult = JSON.parse(updates[updates.length - 1]) + expect(latestResult).to.be.an('array') + expect(latestResult.length).to.equal(2) + expect(latestResult.map((r: any) => r.X)).to.include('note-ipfs://Qm123') + expect(latestResult.map((r: any) => r.X)).to.include('note-ipfs://Qm456') + + // Clean up subscription + unsubscribe() + subscription.dispose() + }) + }) } } \ No newline at end of file From 2ed719b26170290f5460494206983e2c44ecd2d8 Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Wed, 26 Feb 2025 16:37:34 +0100 Subject: [PATCH 12/13] =?UTF-8?q?Don=E2=80=99t=20use=20NodeJS=20in=20persp?= =?UTF-8?q?ective=20proxy?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- core/src/perspectives/PerspectiveProxy.ts | 31 ++++++++++++++++++----- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/core/src/perspectives/PerspectiveProxy.ts b/core/src/perspectives/PerspectiveProxy.ts index 425ba1b7e..ac5c13f07 100644 --- a/core/src/perspectives/PerspectiveProxy.ts +++ b/core/src/perspectives/PerspectiveProxy.ts @@ -15,6 +15,11 @@ import { gql } from "@apollo/client/core"; type QueryCallback = (result: string) => void; +// Generic subscription interface that matches Apollo's Subscription +interface Unsubscribable { + unsubscribe(): void; +} + /** Proxy object for a subscribed Prolog query that provides real-time updates * * This class handles: @@ -46,9 +51,10 @@ export class QuerySubscriptionProxy { #subscriptionId: string; #client: PerspectiveClient; #callbacks: Set; - #keepaliveInterval: NodeJS.Timeout; - #subscription?: ZenObservable.Subscription; + #keepaliveTimer: number; + #subscription?: Unsubscribable; #latestResult: string; + #disposed: boolean = false; /** Creates a new query subscription * @param uuid - The UUID of the perspective @@ -86,14 +92,24 @@ export class QuerySubscriptionProxy { error: (e) => console.error('Error in query subscription:', e) }); - // Start keepalive loop - this.#keepaliveInterval = setInterval(async () => { + // Start keepalive loop using platform-agnostic setTimeout + const keepaliveLoop = async () => { + if (this.#disposed) return; + try { await this.#client.keepAliveQuery(this.#uuid, this.#subscriptionId); } catch (e) { console.error('Error in keepalive:', e); } - }, 30000); // Send keepalive every 30 seconds + + // Schedule next keepalive if not disposed + if (!this.#disposed) { + this.#keepaliveTimer = setTimeout(keepaliveLoop, 30000) as unknown as number; + } + }; + + // Start the first keepalive loop + this.#keepaliveTimer = setTimeout(keepaliveLoop, 30000) as unknown as number; } /** Get the latest query result @@ -146,7 +162,7 @@ export class QuerySubscriptionProxy { /** Clean up the subscription and stop keepalive signals * * This method: - * 1. Stops the keepalive interval + * 1. Stops the keepalive timer * 2. Unsubscribes from GraphQL subscription updates * 3. Clears all registered callbacks * @@ -154,7 +170,8 @@ export class QuerySubscriptionProxy { * will not receive any more updates. The instance should be discarded. */ dispose() { - clearInterval(this.#keepaliveInterval); + this.#disposed = true; + clearTimeout(this.#keepaliveTimer); if (this.#subscription) { this.#subscription.unsubscribe(); } From d71c04d17e693481db9b0cbf7b4d2671fb9b08dc Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Wed, 26 Feb 2025 16:49:51 +0100 Subject: [PATCH 13/13] Move subscription client code to PerspectiveClient --- core/src/perspectives/PerspectiveClient.ts | 22 ++++++++++++++++ core/src/perspectives/PerspectiveProxy.ts | 29 +++++++--------------- 2 files changed, 31 insertions(+), 20 deletions(-) diff --git a/core/src/perspectives/PerspectiveClient.ts b/core/src/perspectives/PerspectiveClient.ts index 0838bee68..423d9247d 100644 --- a/core/src/perspectives/PerspectiveClient.ts +++ b/core/src/perspectives/PerspectiveClient.ts @@ -168,6 +168,28 @@ export class PerspectiveClient { return perspectiveSubscribeQuery } + subscribeToQueryUpdates(subscriptionId: string, onData: (result: string) => void): () => void { + const subscription = this.#apolloClient.subscribe({ + query: gql` + subscription perspectiveQuerySubscription($subscriptionId: String!) { + perspectiveQuerySubscription(subscriptionId: $subscriptionId) + } + `, + variables: { + subscriptionId + } + }).subscribe({ + next: (result) => { + if (result.data && result.data.perspectiveQuerySubscription) { + onData(result.data.perspectiveQuerySubscription); + } + }, + error: (e) => console.error('Error in query subscription:', e) + }); + + return () => subscription.unsubscribe(); + } + async keepAliveQuery(uuid: string, subscriptionId: string): Promise { const { perspectiveKeepAliveQuery } = unwrapApolloResult(await this.#apolloClient.mutate({ mutation: gql`mutation perspectiveKeepAliveQuery($uuid: String!, $subscriptionId: String!) { diff --git a/core/src/perspectives/PerspectiveProxy.ts b/core/src/perspectives/PerspectiveProxy.ts index ac5c13f07..67be58016 100644 --- a/core/src/perspectives/PerspectiveProxy.ts +++ b/core/src/perspectives/PerspectiveProxy.ts @@ -52,7 +52,7 @@ export class QuerySubscriptionProxy { #client: PerspectiveClient; #callbacks: Set; #keepaliveTimer: number; - #subscription?: Unsubscribable; + #unsubscribe?: () => void; #latestResult: string; #disposed: boolean = false; @@ -73,24 +73,13 @@ export class QuerySubscriptionProxy { this.#notifyCallbacks(initialResult); // Subscribe to query updates - this.#subscription = (this.#client as any).apolloClient.subscribe({ - query: gql` - subscription perspectiveQuerySubscription($subscriptionId: String!) { - perspectiveQuerySubscription(subscriptionId: $subscriptionId) - } - `, - variables: { - subscriptionId: this.#subscriptionId + this.#unsubscribe = this.#client.subscribeToQueryUpdates( + this.#subscriptionId, + (result) => { + this.#latestResult = result; + this.#notifyCallbacks(result); } - }).subscribe({ - next: (result) => { - if (result.data && result.data.perspectiveQuerySubscription) { - this.#latestResult = result.data.perspectiveQuerySubscription; - this.#notifyCallbacks(this.#latestResult); - } - }, - error: (e) => console.error('Error in query subscription:', e) - }); + ); // Start keepalive loop using platform-agnostic setTimeout const keepaliveLoop = async () => { @@ -172,8 +161,8 @@ export class QuerySubscriptionProxy { dispose() { this.#disposed = true; clearTimeout(this.#keepaliveTimer); - if (this.#subscription) { - this.#subscription.unsubscribe(); + if (this.#unsubscribe) { + this.#unsubscribe(); } this.#callbacks.clear(); }