From 95f0820d0329f732579756abe682e63cf851ac64 Mon Sep 17 00:00:00 2001 From: Lucas Vieira Date: Wed, 15 Jan 2025 10:04:33 -0300 Subject: [PATCH] we do not need to lock for reading --- apps/api/src/yjs/v2/index.ts | 46 ++++--- apps/api/src/yjs/v2/persistors.ts | 208 ++++++++++++++---------------- 2 files changed, 126 insertions(+), 128 deletions(-) diff --git a/apps/api/src/yjs/v2/index.ts b/apps/api/src/yjs/v2/index.ts index c707a3ee..bed9286c 100644 --- a/apps/api/src/yjs/v2/index.ts +++ b/apps/api/src/yjs/v2/index.ts @@ -52,7 +52,6 @@ import { PubSubProvider } from './pubsub/index.js' import { PGPubSub } from './pubsub/pg.js' import pAll from 'p-all' import { getYDocWithoutHistory } from './documents.js' -import { acquireLock } from '../../lock.js' type Role = UserWorkspaceRole @@ -857,6 +856,7 @@ export class WSSharedDocV2 { } } +const creationPromises = new Map>() async function getYDoc( socketServer: IOServer, id: string, @@ -908,25 +908,33 @@ async function getYDoc( } if (!yDoc) { - yDoc = await acquireLock(`getYDoc:${id}`, async () => { - // check cache again after acquiring lock - const fromCache = docs.get(id) ?? docsCache.get(id) - if (fromCache) { - return fromCache - } + let creationPromise = creationPromises.get(id) - yDoc = await WSSharedDocV2.make( - id, - documentId, - workspaceId, - socketServer, - persistor, - tx - ) - docs.set(id, yDoc) - docsCache.set(id, yDoc) - return yDoc - }) + if (!creationPromise) { + creationPromise = (async () => { + const newYDoc = await WSSharedDocV2.make( + id, + documentId, + workspaceId, + socketServer, + persistor, + tx + ) + docs.set(id, newYDoc) + docsCache.set(id, newYDoc) + return newYDoc + })() + + creationPromises.set(id, creationPromise) + + try { + yDoc = await creationPromise + } finally { + creationPromises.delete(id) + } + } else { + yDoc = await creationPromise + } } logger().trace( diff --git a/apps/api/src/yjs/v2/persistors.ts b/apps/api/src/yjs/v2/persistors.ts index 9df29a08..62d5b05e 100644 --- a/apps/api/src/yjs/v2/persistors.ts +++ b/apps/api/src/yjs/v2/persistors.ts @@ -65,53 +65,51 @@ export class DocumentPersistor implements Persistor { } public async load(tx?: PrismaTransaction) { - return acquireLock(`document-persistor:${this.docId}`, async () => { - try { - const ydoc = new Y.Doc() - const dbDoc = await (tx ?? prisma()).yjsDocument.findUnique({ - where: { documentId: this.documentId }, - }) - - if (!dbDoc) { - return { - ydoc, - clock: 0, - byteLength: Y.encodeStateAsUpdate(ydoc).length, - applyUpdateLatency: 0, - clockUpdatedAt: new Date(), - } - } - - const updates = await (tx ?? prisma()).yjsUpdate.findMany({ - where: { - yjsDocumentId: dbDoc.id, - clock: dbDoc.clock, - }, - select: { update: true }, - orderBy: { createdAt: 'asc' }, - }) - let applyUpdateLatency = this.applyUpdate(ydoc, dbDoc.state) - if (updates.length > 0) { - const update = Y.mergeUpdates(updates.map((u) => u.update)) - applyUpdateLatency += this.applyUpdate(ydoc, update) - } + try { + const ydoc = new Y.Doc() + const dbDoc = await (tx ?? prisma()).yjsDocument.findUnique({ + where: { documentId: this.documentId }, + }) + if (!dbDoc) { return { ydoc, - clock: dbDoc.clock, - byteLength: dbDoc.state.length, - applyUpdateLatency, - clockUpdatedAt: - dbDoc.clockUpdatedAt ?? new Date(Date.now() - 24 * 60 * 60 * 1000), + clock: 0, + byteLength: Y.encodeStateAsUpdate(ydoc).length, + applyUpdateLatency: 0, + clockUpdatedAt: new Date(), } - } catch (err) { - logger().error( - { documentId: this.documentId, err }, - 'Failed to load Yjs document state' - ) - throw err } - }) + + const updates = await (tx ?? prisma()).yjsUpdate.findMany({ + where: { + yjsDocumentId: dbDoc.id, + clock: dbDoc.clock, + }, + select: { update: true }, + orderBy: { createdAt: 'asc' }, + }) + let applyUpdateLatency = this.applyUpdate(ydoc, dbDoc.state) + if (updates.length > 0) { + const update = Y.mergeUpdates(updates.map((u) => u.update)) + applyUpdateLatency += this.applyUpdate(ydoc, update) + } + + return { + ydoc, + clock: dbDoc.clock, + byteLength: dbDoc.state.length, + applyUpdateLatency, + clockUpdatedAt: + dbDoc.clockUpdatedAt ?? new Date(Date.now() - 24 * 60 * 60 * 1000), + } + } catch (err) { + logger().error( + { documentId: this.documentId, err }, + 'Failed to load Yjs document state' + ) + throw err + } } public async persist( @@ -264,81 +262,73 @@ export class AppPersistor implements Persistor { } public async load(tx?: PrismaTransaction) { - return acquireLock(`app-persistor:${this.docId}`, async () => { - try { - const yjsAppDoc = await ( - prisma() ?? tx - ).yjsAppDocument.findFirstOrThrow({ - where: { - id: this.yjsAppDocumentId, - }, - orderBy: { - createdAt: 'desc', - }, - include: { - userYjsAppDocuments: { - where: { - // use a new uuid when there is no user - // since uuid is always unique, nothing - // will be returned, which means we will - // be manipulating the published state - userId: this.userId ?? uuidv4(), - }, - take: 1, + try { + const yjsAppDoc = await (prisma() ?? tx).yjsAppDocument.findFirstOrThrow({ + where: { + id: this.yjsAppDocumentId, + }, + orderBy: { + createdAt: 'desc', + }, + include: { + userYjsAppDocuments: { + where: { + // use a new uuid when there is no user + // since uuid is always unique, nothing + // will be returned, which means we will + // be manipulating the published state + userId: this.userId ?? uuidv4(), }, + take: 1, }, - }) + }, + }) - const ydoc = new Y.Doc() - const userYjsAppDoc = yjsAppDoc.userYjsAppDocuments[0] - let byteLength = userYjsAppDoc?.state.length ?? yjsAppDoc.state.length - let clock = userYjsAppDoc?.clock ?? yjsAppDoc.clock - let applyUpdateLatency: number - let clockUpdatedAt = - userYjsAppDoc?.clockUpdatedAt ?? - yjsAppDoc.clockUpdatedAt ?? - new Date() - if (!this.userId) { - applyUpdateLatency = this.applyUpdate(ydoc, yjsAppDoc.state) - } else if (!userYjsAppDoc) { - // user never opened the app before. duplicate the state for them - const userYjsAppDoc = await ( - prisma() ?? tx - ).userYjsAppDocument.upsert({ - where: { - yjsAppDocumentId_userId: { - yjsAppDocumentId: this.yjsAppDocumentId, - userId: this.userId, - }, - }, - create: { + const ydoc = new Y.Doc() + const userYjsAppDoc = yjsAppDoc.userYjsAppDocuments[0] + let byteLength = userYjsAppDoc?.state.length ?? yjsAppDoc.state.length + let clock = userYjsAppDoc?.clock ?? yjsAppDoc.clock + let applyUpdateLatency: number + let clockUpdatedAt = + userYjsAppDoc?.clockUpdatedAt ?? yjsAppDoc.clockUpdatedAt ?? new Date() + if (!this.userId) { + applyUpdateLatency = this.applyUpdate(ydoc, yjsAppDoc.state) + } else if (!userYjsAppDoc) { + // user never opened the app before. duplicate the state for them + const userYjsAppDoc = await (prisma() ?? tx).userYjsAppDocument.upsert({ + where: { + yjsAppDocumentId_userId: { yjsAppDocumentId: this.yjsAppDocumentId, userId: this.userId, - state: yjsAppDoc.state, - clock: yjsAppDoc.clock, }, - update: {}, - }) - applyUpdateLatency = this.applyUpdate(ydoc, userYjsAppDoc.state) - } else { - applyUpdateLatency = this.applyUpdate(ydoc, userYjsAppDoc.state) - } + }, + create: { + yjsAppDocumentId: this.yjsAppDocumentId, + userId: this.userId, + state: yjsAppDoc.state, + clock: yjsAppDoc.clock, + }, + update: {}, + }) + applyUpdateLatency = this.applyUpdate(ydoc, userYjsAppDoc.state) + } else { + applyUpdateLatency = this.applyUpdate(ydoc, userYjsAppDoc.state) + } - return { - ydoc, - clock, - byteLength, - applyUpdateLatency, - clockUpdatedAt, - } - } catch (err) { - logger().error( - { yjsAppDocumentId: this.yjsAppDocumentId, userId: this.userId, err }, - 'Failed to load Yjs app document state' - ) - throw err + return { + ydoc, + clock, + byteLength, + applyUpdateLatency, + clockUpdatedAt, } - }) + } catch (err) { + logger().error( + { yjsAppDocumentId: this.yjsAppDocumentId, userId: this.userId, err }, + 'Failed to load Yjs app document state' + ) + throw err + } } public async persist(