Skip to content

Commit dd23389

Browse files
authored
UBERF-9608 Fix bucket selection in datalake (#8210)
Signed-off-by: Alexander Onnikov <[email protected]>
1 parent 07e9c31 commit dd23389

File tree

3 files changed

+15
-13
lines changed

3 files changed

+15
-13
lines changed

services/datalake/pod-datalake/src/datalake/datalake.ts

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import { type S3Bucket } from '../s3'
2525
export class DatalakeImpl implements Datalake {
2626
constructor (
2727
private readonly db: BlobDB,
28-
private readonly buckets: Partial<Record<Location, S3Bucket>>,
28+
private readonly buckets: Array<{ location: Location, bucket: S3Bucket }>,
2929
private readonly options: {
3030
cacheControl: string
3131
}
@@ -44,13 +44,12 @@ export class DatalakeImpl implements Datalake {
4444
}
4545

4646
async head (ctx: MeasureContext, workspace: string, name: string): Promise<BlobHead | null> {
47-
const { bucket } = await this.selectStorage(ctx, workspace)
48-
4947
const blob = await this.db.getBlob(ctx, { workspace, name })
5048
if (blob === null) {
5149
return null
5250
}
5351

52+
const { bucket } = await this.selectStorage(ctx, workspace, blob.location)
5453
const head = await bucket.head(ctx, blob.filename)
5554
if (head == null) {
5655
return null
@@ -72,13 +71,13 @@ export class DatalakeImpl implements Datalake {
7271
name: string,
7372
options: { range?: string }
7473
): Promise<BlobBody | null> {
75-
const { bucket } = await this.selectStorage(ctx, workspace)
76-
7774
const blob = await this.db.getBlob(ctx, { workspace, name })
7875
if (blob === null) {
7976
return null
8077
}
8178

79+
const { bucket } = await this.selectStorage(ctx, workspace, blob.location)
80+
8281
const range = options.range
8382
const object = await bucket.get(ctx, blob.filename, { range })
8483
if (object == null) {
@@ -176,17 +175,17 @@ export class DatalakeImpl implements Datalake {
176175
await this.db.setParent(ctx, { workspace, name }, parent !== null ? { workspace, name: parent } : null)
177176
}
178177

179-
async selectStorage (ctx: MeasureContext, workspace: string): Promise<BlobStorage> {
180-
const location = this.selectLocation(workspace)
181-
const bucket = this.buckets[location]
178+
async selectStorage (ctx: MeasureContext, workspace: string, location?: Location): Promise<BlobStorage> {
179+
location ??= this.selectLocation(workspace)
180+
181+
const bucket = this.buckets.find((b) => b.location === location)?.bucket
182182
if (bucket == null) {
183183
throw new Error(`Unsupported location: ${location}`)
184184
}
185185
return { location, bucket }
186186
}
187187

188188
selectLocation (workspace: string): Location {
189-
// TODO select location based on workspace
190-
return 'weur'
189+
return this.buckets[0].location
191190
}
192191
}

services/datalake/pod-datalake/src/datalake/types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import { MeasureContext } from '@hcengineering/core'
1717
import { type Readable } from 'stream'
1818
import { S3Bucket } from '../s3'
1919

20-
export type Location = 'weur' | 'eeur' | 'wnam' | 'enam' | 'apac'
20+
export type Location = 'eu' | 'weur' | 'eeur' | 'wnam' | 'enam' | 'apac'
2121

2222
export type UUID = string & { __uuid: true }
2323

services/datalake/pod-datalake/src/server.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,17 +74,20 @@ const wrapRequest =
7474
}
7575

7676
export function createServer (ctx: MeasureContext, config: Config): { app: Express, close: () => void } {
77-
const buckets: Partial<Record<Location, S3Bucket>> = {}
77+
const buckets: Array<{ location: Location, bucket: S3Bucket }> = []
7878
for (const bucket of config.Buckets) {
7979
const location = bucket.location as Location
8080
if (
81+
location === 'eu' ||
8182
location === 'weur' ||
8283
location === 'eeur' ||
8384
location === 'wnam' ||
8485
location === 'enam' ||
8586
location === 'apac'
8687
) {
87-
buckets[location] = createBucket(createClient(bucket), bucket.bucket)
88+
buckets.push({ location, bucket: createBucket(createClient(bucket), bucket.bucket) })
89+
} else {
90+
ctx.warn('invalid bucket location', { location, bucket })
8891
}
8992
}
9093

0 commit comments

Comments
 (0)