Skip to content

Commit 524c5b4

Browse files
authored
UBERF-7422: Fix blob/stora (#5933)
Signed-off-by: Andrey Sobolev <[email protected]>
1 parent 80d22b5 commit 524c5b4

File tree

2 files changed

+67
-13
lines changed

2 files changed

+67
-13
lines changed

server/core/src/server/aggregator.ts

Lines changed: 64 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import core, {
22
DOMAIN_BLOB,
33
groupByArray,
4+
toIdMap,
45
withContext,
56
type Blob,
67
type BlobLookup,
@@ -76,11 +77,64 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE
7677

7778
async initialize (ctx: MeasureContext, workspaceId: WorkspaceId): Promise<void> {}
7879

80+
async doSyncDocs (ctx: MeasureContext, workspaceId: WorkspaceId, docs: ListBlobResult[]): Promise<void> {
81+
const existingBlobs = toIdMap(
82+
await this.dbAdapter.find<Blob>(ctx, workspaceId, DOMAIN_BLOB, { _id: { $in: docs.map((it) => it._id) } })
83+
)
84+
const toUpdate: Blob[] = []
85+
for (const d of docs) {
86+
const blobInfo = existingBlobs.get(d._id)
87+
if (blobInfo === undefined || blobInfo.etag !== d.etag || blobInfo.size !== d.size) {
88+
const stat = await this.stat(ctx, workspaceId, d._id)
89+
if (stat !== undefined) {
90+
toUpdate.push(stat)
91+
}
92+
}
93+
}
94+
if (toUpdate.length > 0) {
95+
await this.dbAdapter.clean(ctx, workspaceId, DOMAIN_BLOB, Array.from(toUpdate.map((it) => it._id)))
96+
await this.dbAdapter.upload(ctx, workspaceId, DOMAIN_BLOB, toUpdate)
97+
}
98+
}
99+
79100
find (ctx: MeasureContext, workspaceId: WorkspaceId): StorageIterator {
101+
const storageIterator = this.makeStorageIterator(ctx, workspaceId)
102+
103+
let buffer: ListBlobResult[] = []
104+
105+
return {
106+
next: async (ctx) => {
107+
const docInfo = await storageIterator.next()
108+
if (docInfo !== undefined) {
109+
buffer.push(docInfo)
110+
}
111+
if (buffer.length > 50) {
112+
await this.doSyncDocs(ctx, workspaceId, buffer)
113+
114+
buffer = []
115+
}
116+
if (docInfo !== undefined) {
117+
return {
118+
hash: docInfo.etag,
119+
id: docInfo._id,
120+
size: docInfo.size
121+
}
122+
}
123+
},
124+
close: async (ctx) => {
125+
if (buffer.length > 0) {
126+
await this.doSyncDocs(ctx, workspaceId, buffer)
127+
}
128+
await storageIterator.close()
129+
}
130+
}
131+
}
132+
133+
private makeStorageIterator (ctx: MeasureContext, workspaceId: WorkspaceId): BlobStorageIterator {
80134
const adapters = Array.from(this.adapters.values())
81135
let iterator: BlobStorageIterator | undefined
82136
return {
83-
next: async (ctx) => {
137+
next: async () => {
84138
while (true) {
85139
if (iterator === undefined && adapters.length > 0) {
86140
iterator = await (adapters.shift() as StorageAdapter).listStream(ctx, workspaceId)
@@ -90,19 +144,16 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE
90144
}
91145
const docInfo = await iterator.next()
92146
if (docInfo !== undefined) {
93-
return {
94-
hash: docInfo.etag,
95-
id: docInfo._id,
96-
size: docInfo.size
97-
}
147+
// We need to check if our stored version is fine
148+
return docInfo
98149
} else {
99150
// We need to take next adapter
100151
await iterator.close()
101152
iterator = undefined
102153
}
103154
}
104155
},
105-
close: async (ctx) => {
156+
close: async () => {
106157
if (iterator !== undefined) {
107158
await iterator.close()
108159
}
@@ -250,9 +301,12 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE
250301
selectProvider (
251302
forceProvider: string | undefined,
252303
contentType: string
253-
): { adapter: StorageAdapter | undefined, provider: string } {
304+
): { adapter: StorageAdapter, provider: string } {
254305
if (forceProvider !== undefined) {
255-
return { adapter: this.adapters.get(forceProvider), provider: forceProvider }
306+
return {
307+
adapter: this.adapters.get(forceProvider) ?? (this.adapters.get(this.defaultAdapter) as StorageAdapter),
308+
provider: forceProvider
309+
}
256310
}
257311
// try select provider based on content type matching.
258312
for (const [provider, adapter] of this.adapters.entries()) {
@@ -265,7 +319,7 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE
265319
}
266320
}
267321

268-
return { adapter: this.adapters.get(this.defaultAdapter), provider: this.defaultAdapter }
322+
return { adapter: this.adapters.get(this.defaultAdapter) as StorageAdapter, provider: this.defaultAdapter }
269323
}
270324

271325
@withContext('aggregator-put', {})
@@ -283,9 +337,6 @@ export class AggregatorStorageAdapter implements StorageAdapter, StorageAdapterE
283337
).shift()
284338

285339
const { provider, adapter } = this.selectProvider(stat?.provider, contentType)
286-
if (adapter === undefined) {
287-
throw new NoSuchKeyError('No such provider found')
288-
}
289340

290341
const result = await adapter.put(ctx, workspaceId, objectName, stream, contentType, size)
291342

server/server-storage/src/blobStorage.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,9 @@ class StorageBlobAdapter implements DbAdapter {
6565
}
6666

6767
find (ctx: MeasureContext, domain: Domain, recheck?: boolean): StorageIterator {
68+
if (recheck === true) {
69+
return (this.client as StorageAdapterEx).find(ctx, this.workspaceId)
70+
}
6871
return this.blobAdapter.find(ctx, domain, recheck)
6972
}
7073

0 commit comments

Comments
 (0)