Skip to content

Commit

Permalink
Move subscription client code to PerspectiveClient
Browse files Browse the repository at this point in the history
  • Loading branch information
lucksus committed Feb 26, 2025
1 parent 2ed719b commit d71c04d
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 20 deletions.
22 changes: 22 additions & 0 deletions core/src/perspectives/PerspectiveClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,28 @@ export class PerspectiveClient {
return perspectiveSubscribeQuery
}

subscribeToQueryUpdates(subscriptionId: string, onData: (result: string) => void): () => void {
const subscription = this.#apolloClient.subscribe({
query: gql`
subscription perspectiveQuerySubscription($subscriptionId: String!) {
perspectiveQuerySubscription(subscriptionId: $subscriptionId)
}
`,
variables: {
subscriptionId
}
}).subscribe({
next: (result) => {
if (result.data && result.data.perspectiveQuerySubscription) {
onData(result.data.perspectiveQuerySubscription);
}
},
error: (e) => console.error('Error in query subscription:', e)
});

return () => subscription.unsubscribe();
}

async keepAliveQuery(uuid: string, subscriptionId: string): Promise<boolean> {
const { perspectiveKeepAliveQuery } = unwrapApolloResult(await this.#apolloClient.mutate({
mutation: gql`mutation perspectiveKeepAliveQuery($uuid: String!, $subscriptionId: String!) {
Expand Down
29 changes: 9 additions & 20 deletions core/src/perspectives/PerspectiveProxy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ export class QuerySubscriptionProxy {
#client: PerspectiveClient;
#callbacks: Set<QueryCallback>;
#keepaliveTimer: number;
#subscription?: Unsubscribable;
#unsubscribe?: () => void;
#latestResult: string;
#disposed: boolean = false;

Expand All @@ -73,24 +73,13 @@ export class QuerySubscriptionProxy {
this.#notifyCallbacks(initialResult);

// Subscribe to query updates
this.#subscription = (this.#client as any).apolloClient.subscribe({
query: gql`
subscription perspectiveQuerySubscription($subscriptionId: String!) {
perspectiveQuerySubscription(subscriptionId: $subscriptionId)
}
`,
variables: {
subscriptionId: this.#subscriptionId
this.#unsubscribe = this.#client.subscribeToQueryUpdates(
this.#subscriptionId,
(result) => {
this.#latestResult = result;
this.#notifyCallbacks(result);
}
}).subscribe({
next: (result) => {
if (result.data && result.data.perspectiveQuerySubscription) {
this.#latestResult = result.data.perspectiveQuerySubscription;
this.#notifyCallbacks(this.#latestResult);
}
},
error: (e) => console.error('Error in query subscription:', e)
});
);

// Start keepalive loop using platform-agnostic setTimeout
const keepaliveLoop = async () => {
Expand Down Expand Up @@ -172,8 +161,8 @@ export class QuerySubscriptionProxy {
dispose() {
this.#disposed = true;
clearTimeout(this.#keepaliveTimer);
if (this.#subscription) {
this.#subscription.unsubscribe();
if (this.#unsubscribe) {
this.#unsubscribe();
}
this.#callbacks.clear();
}
Expand Down

0 comments on commit d71c04d

Please sign in to comment.