Skip to content

Commit

Permalink
pass on uid properly for subscribe handler
Browse files Browse the repository at this point in the history
  • Loading branch information
br41nslug committed Nov 28, 2022
1 parent b0c1610 commit 188259a
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 5 deletions.
2 changes: 1 addition & 1 deletion examples/websocket-tester/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ <h1>Directus Websocket Tester</h1>
}
function sendMessage(msg) {
if (ws && connected) {
msg.uid = uuidv4();
!msg.uid && (msg.uid = uuidv4());
addLog('Sending message: '+JSON.stringify(msg, null, 2));
ws.send(JSON.stringify(msg));
}
Expand Down
12 changes: 8 additions & 4 deletions src/handlers/subscribe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
*
* SUBSCRIBE request handler
*/
import { Query, } from '@directus/shared/types';
import { Query } from '@directus/shared/types';
import { ClientHandler, WebsocketMessage, WebsocketClient } from '../types';
import { Logger } from 'pino';

Expand All @@ -16,11 +16,12 @@ export const subscribeHandler: ClientHandler = ({ core: cfg }, context) => {
logger, emitter
} = context;
const subscriptions: Record<string, Set<{
uid?: string,
id?: string | number;
query?: Query;
client: WebsocketClient
}>> = {};
function subscribe(collection: string, client: WebsocketClient, conf: { id?: string, query?: Query } = {}) {
function subscribe(collection: string, client: WebsocketClient, conf: Record<string, any> = {}) {
if ( ! subscriptions[collection]) subscriptions[collection] = new Set();
subscriptions[collection]?.add({ ...conf, client });
}
Expand All @@ -38,7 +39,7 @@ export const subscribeHandler: ClientHandler = ({ core: cfg }, context) => {
}
async function dispatch(collection: string, data: any) {
const subs = subscriptions[collection] ?? new Set();
for (const { client, query={} } of subs) {
for (const { client, query={}, uid } of subs) {
const schema = await getSchema({ accountability: client.accountability });
const service = new ItemsService(collection, {
knex, schema, accountability: client.accountability
Expand All @@ -53,6 +54,7 @@ export const subscribeHandler: ClientHandler = ({ core: cfg }, context) => {
const msg = await emitter.emitFilter('websocket.subscribe.beforeSend', {
type: 'SUBSCRIPTION', ...data, payload
});
if (uid) msg.uid = uid;
client.socket.send(JSON.stringify(msg));
}
} catch (err: any) {
Expand All @@ -77,7 +79,9 @@ export const subscribeHandler: ClientHandler = ({ core: cfg }, context) => {
// if not authorized the read should throw an error
await service.readByQuery({ ...(message.query || {}), limit: 1 });
// subscribe to events if all went well
subscribe(collection, client, { id: message.id, query: message.query });
const sub: Record<string, any> = { id: message.id, query: message.query };
if (message?.uid) sub.uid = message.uid;
subscribe(collection, client, sub);
logger.info(`subscribed - ${message.collection} #${message.id}`);
},
onError(client: WebsocketClient) {
Expand Down

0 comments on commit 188259a

Please sign in to comment.