Skip to content

Commit

Permalink
we do not need to lock for reading
Browse files Browse the repository at this point in the history
  • Loading branch information
vieiralucas committed Jan 15, 2025
1 parent c8bff10 commit 95f0820
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 128 deletions.
46 changes: 27 additions & 19 deletions apps/api/src/yjs/v2/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ import { PubSubProvider } from './pubsub/index.js'
import { PGPubSub } from './pubsub/pg.js'
import pAll from 'p-all'
import { getYDocWithoutHistory } from './documents.js'
import { acquireLock } from '../../lock.js'

type Role = UserWorkspaceRole

Expand Down Expand Up @@ -857,6 +856,7 @@ export class WSSharedDocV2 {
}
}

const creationPromises = new Map<string, Promise<WSSharedDocV2>>()
async function getYDoc(
socketServer: IOServer,
id: string,
Expand Down Expand Up @@ -908,25 +908,33 @@ async function getYDoc(
}

if (!yDoc) {
yDoc = await acquireLock(`getYDoc:${id}`, async () => {
// check cache again after acquiring lock
const fromCache = docs.get(id) ?? docsCache.get(id)
if (fromCache) {
return fromCache
}
let creationPromise = creationPromises.get(id)

yDoc = await WSSharedDocV2.make(
id,
documentId,
workspaceId,
socketServer,
persistor,
tx
)
docs.set(id, yDoc)
docsCache.set(id, yDoc)
return yDoc
})
if (!creationPromise) {
creationPromise = (async () => {
const newYDoc = await WSSharedDocV2.make(
id,
documentId,
workspaceId,
socketServer,
persistor,
tx
)
docs.set(id, newYDoc)
docsCache.set(id, newYDoc)
return newYDoc
})()

creationPromises.set(id, creationPromise)

try {
yDoc = await creationPromise
} finally {
creationPromises.delete(id)
}
} else {
yDoc = await creationPromise
}
}

logger().trace(
Expand Down
208 changes: 99 additions & 109 deletions apps/api/src/yjs/v2/persistors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,53 +65,51 @@ export class DocumentPersistor implements Persistor {
}

public async load(tx?: PrismaTransaction) {
return acquireLock(`document-persistor:${this.docId}`, async () => {
try {
const ydoc = new Y.Doc()
const dbDoc = await (tx ?? prisma()).yjsDocument.findUnique({
where: { documentId: this.documentId },
})

if (!dbDoc) {
return {
ydoc,
clock: 0,
byteLength: Y.encodeStateAsUpdate(ydoc).length,
applyUpdateLatency: 0,
clockUpdatedAt: new Date(),
}
}

const updates = await (tx ?? prisma()).yjsUpdate.findMany({
where: {
yjsDocumentId: dbDoc.id,
clock: dbDoc.clock,
},
select: { update: true },
orderBy: { createdAt: 'asc' },
})
let applyUpdateLatency = this.applyUpdate(ydoc, dbDoc.state)
if (updates.length > 0) {
const update = Y.mergeUpdates(updates.map((u) => u.update))
applyUpdateLatency += this.applyUpdate(ydoc, update)
}
try {
const ydoc = new Y.Doc()
const dbDoc = await (tx ?? prisma()).yjsDocument.findUnique({
where: { documentId: this.documentId },
})

if (!dbDoc) {
return {
ydoc,
clock: dbDoc.clock,
byteLength: dbDoc.state.length,
applyUpdateLatency,
clockUpdatedAt:
dbDoc.clockUpdatedAt ?? new Date(Date.now() - 24 * 60 * 60 * 1000),
clock: 0,
byteLength: Y.encodeStateAsUpdate(ydoc).length,
applyUpdateLatency: 0,
clockUpdatedAt: new Date(),
}
} catch (err) {
logger().error(
{ documentId: this.documentId, err },
'Failed to load Yjs document state'
)
throw err
}
})

const updates = await (tx ?? prisma()).yjsUpdate.findMany({
where: {
yjsDocumentId: dbDoc.id,
clock: dbDoc.clock,
},
select: { update: true },
orderBy: { createdAt: 'asc' },
})
let applyUpdateLatency = this.applyUpdate(ydoc, dbDoc.state)
if (updates.length > 0) {
const update = Y.mergeUpdates(updates.map((u) => u.update))
applyUpdateLatency += this.applyUpdate(ydoc, update)
}

return {
ydoc,
clock: dbDoc.clock,
byteLength: dbDoc.state.length,
applyUpdateLatency,
clockUpdatedAt:
dbDoc.clockUpdatedAt ?? new Date(Date.now() - 24 * 60 * 60 * 1000),
}
} catch (err) {
logger().error(
{ documentId: this.documentId, err },
'Failed to load Yjs document state'
)
throw err
}
}

public async persist(
Expand Down Expand Up @@ -264,81 +262,73 @@ export class AppPersistor implements Persistor {
}

public async load(tx?: PrismaTransaction) {
return acquireLock(`app-persistor:${this.docId}`, async () => {
try {
const yjsAppDoc = await (
prisma() ?? tx
).yjsAppDocument.findFirstOrThrow({
where: {
id: this.yjsAppDocumentId,
},
orderBy: {
createdAt: 'desc',
},
include: {
userYjsAppDocuments: {
where: {
// use a new uuid when there is no user
// since uuid is always unique, nothing
// will be returned, which means we will
// be manipulating the published state
userId: this.userId ?? uuidv4(),
},
take: 1,
try {
const yjsAppDoc = await (prisma() ?? tx).yjsAppDocument.findFirstOrThrow({
where: {
id: this.yjsAppDocumentId,
},
orderBy: {
createdAt: 'desc',
},
include: {
userYjsAppDocuments: {
where: {
// use a new uuid when there is no user
// since uuid is always unique, nothing
// will be returned, which means we will
// be manipulating the published state
userId: this.userId ?? uuidv4(),
},
take: 1,
},
})
},
})

const ydoc = new Y.Doc()
const userYjsAppDoc = yjsAppDoc.userYjsAppDocuments[0]
let byteLength = userYjsAppDoc?.state.length ?? yjsAppDoc.state.length
let clock = userYjsAppDoc?.clock ?? yjsAppDoc.clock
let applyUpdateLatency: number
let clockUpdatedAt =
userYjsAppDoc?.clockUpdatedAt ??
yjsAppDoc.clockUpdatedAt ??
new Date()
if (!this.userId) {
applyUpdateLatency = this.applyUpdate(ydoc, yjsAppDoc.state)
} else if (!userYjsAppDoc) {
// user never opened the app before. duplicate the state for them
const userYjsAppDoc = await (
prisma() ?? tx
).userYjsAppDocument.upsert({
where: {
yjsAppDocumentId_userId: {
yjsAppDocumentId: this.yjsAppDocumentId,
userId: this.userId,
},
},
create: {
const ydoc = new Y.Doc()
const userYjsAppDoc = yjsAppDoc.userYjsAppDocuments[0]
let byteLength = userYjsAppDoc?.state.length ?? yjsAppDoc.state.length
let clock = userYjsAppDoc?.clock ?? yjsAppDoc.clock
let applyUpdateLatency: number
let clockUpdatedAt =
userYjsAppDoc?.clockUpdatedAt ?? yjsAppDoc.clockUpdatedAt ?? new Date()
if (!this.userId) {
applyUpdateLatency = this.applyUpdate(ydoc, yjsAppDoc.state)
} else if (!userYjsAppDoc) {
// user never opened the app before. duplicate the state for them
const userYjsAppDoc = await (prisma() ?? tx).userYjsAppDocument.upsert({
where: {
yjsAppDocumentId_userId: {
yjsAppDocumentId: this.yjsAppDocumentId,
userId: this.userId,
state: yjsAppDoc.state,
clock: yjsAppDoc.clock,
},
update: {},
})
applyUpdateLatency = this.applyUpdate(ydoc, userYjsAppDoc.state)
} else {
applyUpdateLatency = this.applyUpdate(ydoc, userYjsAppDoc.state)
}
},
create: {
yjsAppDocumentId: this.yjsAppDocumentId,
userId: this.userId,
state: yjsAppDoc.state,
clock: yjsAppDoc.clock,
},
update: {},
})
applyUpdateLatency = this.applyUpdate(ydoc, userYjsAppDoc.state)
} else {
applyUpdateLatency = this.applyUpdate(ydoc, userYjsAppDoc.state)
}

return {
ydoc,
clock,
byteLength,
applyUpdateLatency,
clockUpdatedAt,
}
} catch (err) {
logger().error(
{ yjsAppDocumentId: this.yjsAppDocumentId, userId: this.userId, err },
'Failed to load Yjs app document state'
)
throw err
return {
ydoc,
clock,
byteLength,
applyUpdateLatency,
clockUpdatedAt,
}
})
} catch (err) {
logger().error(
{ yjsAppDocumentId: this.yjsAppDocumentId, userId: this.userId, err },
'Failed to load Yjs app document state'
)
throw err
}
}

public async persist(
Expand Down

0 comments on commit 95f0820

Please sign in to comment.