@@ -46,22 +46,37 @@ export interface DatalakeConfig extends StorageConfig {
4646/**
4747 * @public
4848 */
49- export function createDatalakeClient ( opt : DatalakeConfig , token : string ) : DatalakeClient {
50- const endpoint = Number . isInteger ( opt . port ) ? `${ opt . endpoint } :${ opt . port } ` : opt . endpoint
49+ export function createDatalakeClient ( cfg : DatalakeConfig , token : string ) : DatalakeClient {
50+ const endpoint = Number . isInteger ( cfg . port ) ? `${ cfg . endpoint } :${ cfg . port } ` : cfg . endpoint
5151 return new DatalakeClient ( endpoint , token )
5252}
5353
5454export const CONFIG_KIND = 'datalake'
5555
56+ /**
57+ * @public
58+ */
59+ export interface DatalakeClientOptions {
60+ retryCount ?: number
61+ retryInterval ?: number
62+ }
63+
5664/**
5765 * @public
5866 */
5967export class DatalakeService implements StorageAdapter {
6068 private readonly client : DatalakeClient
69+ private readonly retryCount : number
70+ private readonly retryInterval : number
6171
62- constructor ( readonly opt : DatalakeConfig ) {
72+ constructor (
73+ readonly cfg : DatalakeConfig ,
74+ readonly options : DatalakeClientOptions = { }
75+ ) {
6376 const token = generateToken ( systemAccountEmail , { name : '' } , { service : 'datalake' } )
64- this . client = createDatalakeClient ( opt , token )
77+ this . client = createDatalakeClient ( cfg , token )
78+ this . retryCount = options . retryCount ?? 5
79+ this . retryInterval = options . retryInterval ?? 50
6580 }
6681
6782 async initialize ( ctx : MeasureContext , workspaceId : WorkspaceId ) : Promise < void > { }
@@ -86,7 +101,7 @@ export class DatalakeService implements StorageAdapter {
86101 async remove ( ctx : MeasureContext , workspaceId : WorkspaceId , objectNames : string [ ] ) : Promise < void > {
87102 await Promise . all (
88103 objectNames . map ( async ( objectName ) => {
89- await this . client . deleteObject ( ctx , workspaceId , objectName )
104+ await this . retry ( ctx , ( ) => this . client . deleteObject ( ctx , workspaceId , objectName ) )
90105 } )
91106 )
92107 }
@@ -106,7 +121,7 @@ export class DatalakeService implements StorageAdapter {
106121 next : async ( ) => {
107122 try {
108123 while ( hasMore && buffer . length < 50 ) {
109- const res = await this . client . listObjects ( ctx , workspaceId , cursor )
124+ const res = await this . retry ( ctx , ( ) => this . client . listObjects ( ctx , workspaceId , cursor ) )
110125 hasMore = res . cursor !== undefined
111126 cursor = res . cursor
112127
@@ -116,7 +131,7 @@ export class DatalakeService implements StorageAdapter {
116131 _class : core . class . Blob ,
117132 etag : blob . etag ,
118133 size : ( typeof blob . size === 'string' ? parseInt ( blob . size ) : blob . size ) ?? 0 ,
119- provider : this . opt . name ,
134+ provider : this . cfg . name ,
120135 space : core . space . Configuration ,
121136 modifiedBy : core . account . ConfigUser ,
122137 modifiedOn : 0
@@ -134,32 +149,26 @@ export class DatalakeService implements StorageAdapter {
134149
135150 @withContext ( 'stat' )
136151 async stat ( ctx : MeasureContext , workspaceId : WorkspaceId , objectName : string ) : Promise < Blob | undefined > {
137- return await withRetry ( ctx , 5 , async ( ) => {
138- try {
139- const result = await this . client . statObject ( ctx , workspaceId , objectName )
140- if ( result !== undefined ) {
141- return {
142- provider : '' ,
143- _class : core . class . Blob ,
144- _id : objectName as Ref < Blob > ,
145- contentType : result . type ,
146- size : result . size ?? 0 ,
147- etag : result . etag ?? '' ,
148- space : core . space . Configuration ,
149- modifiedBy : core . account . System ,
150- modifiedOn : result . lastModified ,
151- version : null
152- }
153- }
154- } catch ( err ) {
155- ctx . error ( 'failed to stat object' , { error : err , objectName, workspaceId : workspaceId . name } )
152+ const result = await this . retry ( ctx , ( ) => this . client . statObject ( ctx , workspaceId , objectName ) )
153+ if ( result !== undefined ) {
154+ return {
155+ provider : '' ,
156+ _class : core . class . Blob ,
157+ _id : objectName as Ref < Blob > ,
158+ contentType : result . type ,
159+ size : result . size ?? 0 ,
160+ etag : result . etag ?? '' ,
161+ space : core . space . Configuration ,
162+ modifiedBy : core . account . System ,
163+ modifiedOn : result . lastModified ,
164+ version : null
156165 }
157- } )
166+ }
158167 }
159168
160169 @withContext ( 'get' )
161170 async get ( ctx : MeasureContext , workspaceId : WorkspaceId , objectName : string ) : Promise < Readable > {
162- return await this . client . getObject ( ctx , workspaceId , objectName )
171+ return await this . retry ( ctx , ( ) => this . client . getObject ( ctx , workspaceId , objectName ) )
163172 }
164173
165174 @withContext ( 'put' )
@@ -178,7 +187,7 @@ export class DatalakeService implements StorageAdapter {
178187 }
179188
180189 const { etag } = await ctx . with ( 'put' , { } , ( ctx ) =>
181- withRetry ( ctx , 5 , ( ) => this . client . putObject ( ctx , workspaceId , objectName , stream , params ) )
190+ this . retry ( ctx , ( ) => this . client . putObject ( ctx , workspaceId , objectName , stream , params ) )
182191 )
183192
184193 return {
@@ -189,7 +198,7 @@ export class DatalakeService implements StorageAdapter {
189198
190199 @withContext ( 'read' )
191200 async read ( ctx : MeasureContext , workspaceId : WorkspaceId , objectName : string ) : Promise < Buffer [ ] > {
192- const data = await this . client . getObject ( ctx , workspaceId , objectName )
201+ const data = await this . retry ( ctx , ( ) => this . client . getObject ( ctx , workspaceId , objectName ) )
193202 const chunks : Buffer [ ] = [ ]
194203
195204 for await ( const chunk of data ) {
@@ -207,12 +216,16 @@ export class DatalakeService implements StorageAdapter {
207216 offset : number ,
208217 length ?: number
209218 ) : Promise < Readable > {
210- return await this . client . getPartialObject ( ctx , workspaceId , objectName , offset , length )
219+ return await this . retry ( ctx , ( ) => this . client . getPartialObject ( ctx , workspaceId , objectName , offset , length ) )
211220 }
212221
213222 async getUrl ( ctx : MeasureContext , workspaceId : WorkspaceId , objectName : string ) : Promise < string > {
214223 return this . client . getObjectUrl ( ctx , workspaceId , objectName )
215224 }
225+
226+ async retry < T > ( ctx : MeasureContext , op : ( ) => Promise < T > ) : Promise < T > {
227+ return await withRetry ( ctx , this . retryCount , op , this . retryInterval )
228+ }
216229}
217230
218231export function processConfigFromEnv ( storageConfig : StorageConfiguration ) : string | undefined {
@@ -244,7 +257,7 @@ async function withRetry<T> (
244257 } catch ( err : any ) {
245258 error = err
246259 ctx . error ( 'error' , { err } )
247- if ( retries !== 0 ) {
260+ if ( retries !== 0 && delay > 0 ) {
248261 await new Promise ( ( resolve ) => setTimeout ( resolve , delay ) )
249262 }
250263 }
0 commit comments