20
20
using BuildXL . Cache . ContentStore . Interfaces . FileSystem ;
21
21
using BuildXL . Cache . ContentStore . Interfaces . Results ;
22
22
using BuildXL . Cache . ContentStore . Interfaces . Sessions ;
23
- using BuildXL . Cache . ContentStore . Interfaces . Stores ;
24
23
using BuildXL . Cache . ContentStore . Interfaces . Utils ;
25
24
using BuildXL . Cache . ContentStore . Sessions ;
26
25
using BuildXL . Cache . ContentStore . Tracing ;
27
26
using BuildXL . Cache . ContentStore . UtilitiesCore ;
27
+ using BuildXL . Utilities . Collections ;
28
+ using BuildXL . Utilities . Tasks ;
28
29
using BuildXL . Utilities . Tracing ;
29
30
using Microsoft . VisualStudio . Services . BlobStore . Common ;
30
31
using Microsoft . VisualStudio . Services . BlobStore . WebApi ;
@@ -58,6 +59,14 @@ private enum Counters
58
59
VstsDownloadUriFetchedInMemory
59
60
}
60
61
62
+ /// <summary>
63
+ /// Reused http client for http downloads
64
+ /// </summary>
65
+ /// <remarks>
66
+ /// <see cref="HttpClient"/> is meant to be static
67
+ /// </remarks>
68
+ private static readonly HttpClient HttpClient = new HttpClient ( ) ;
69
+
61
70
/// <inheritdoc />
62
71
public BackingContentStoreExpiryCache ExpiryCache { get ; } = new BackingContentStoreExpiryCache ( ) ;
63
72
@@ -113,12 +122,19 @@ private enum Counters
113
122
114
123
private const int DefaultReadSizeInBytes = 64 * 1024 ;
115
124
116
- private readonly ParallelHttpDownload . DownloadConfiguration _parallelSegmentDownloadConfig ;
117
-
118
125
/// <summary>
119
- /// Reused http client for http downloads
126
+ /// This is the maximum number of requests that BlobStore is willing to process in parallel.
120
127
/// </summary>
121
- private readonly HttpClient _httpClient = new HttpClient ( ) ;
128
+ private const int BulkPinMaximumHashes = 1000 ;
129
+
130
+ private readonly ParallelHttpDownload . DownloadConfiguration _parallelSegmentDownloadConfig ;
131
+
132
+ private record BackgroundPinRequest (
133
+ ContentHash ContentHash ,
134
+ DateTime EndTime ,
135
+ TaskSourceSlim < PinResult > PinResult ) ;
136
+
137
+ private NagleQueue < BackgroundPinRequest > ? _backgroundPinQueue ;
122
138
123
139
/// <summary>
124
140
/// Initializes a new instance of the <see cref="BlobReadOnlyContentSession"/> class.
@@ -150,6 +166,76 @@ public BlobReadOnlyContentSession(
150
166
_blobCounters = CounterTracker . CreateCounterCollection < Counters > ( counterTracker ) ;
151
167
}
152
168
169
+ /// <inheritdoc />
170
+ protected override Task < BoolResult > StartupCoreAsync ( OperationContext context )
171
+ {
172
+ _backgroundPinQueue = NagleQueue < BackgroundPinRequest > . Create (
173
+ ( batch ) => PerformBackgroundBulkPinAsync ( context , batch ) ,
174
+ Environment . ProcessorCount ,
175
+ TimeSpan . FromMilliseconds ( 50 ) ,
176
+ BulkPinMaximumHashes ) ;
177
+
178
+ return base . StartupCoreAsync ( context ) ;
179
+ }
180
+
181
+ /// <inheritdoc />
182
+ protected async override Task < BoolResult > ShutdownCoreAsync ( OperationContext context )
183
+ {
184
+ BoolResult result = BoolResult . Success ;
185
+ try
186
+ {
187
+ await _backgroundPinQueue ! . DisposeAsync ( ) ;
188
+ }
189
+ catch ( Exception exception )
190
+ {
191
+ result &= new BoolResult ( exception , message : "Failed to dispose the background pin queue" ) ;
192
+ }
193
+
194
+ result &= await base . ShutdownCoreAsync ( context ) ;
195
+
196
+ return result ;
197
+ }
198
+
199
+ private Task PerformBackgroundBulkPinAsync ( OperationContext context , BackgroundPinRequest [ ] batch )
200
+ {
201
+ return context . PerformNonResultOperationAsync ( Tracer , async ( ) =>
202
+ {
203
+ var contentHashes = new ContentHash [ batch . Length ] ;
204
+ var endDateTime = DateTime . MinValue ;
205
+
206
+ var i = 0 ;
207
+ foreach ( var pinRequest in batch )
208
+ {
209
+ contentHashes [ i ++ ] = pinRequest . ContentHash ;
210
+
211
+ if ( pinRequest . EndTime > endDateTime )
212
+ {
213
+ endDateTime = pinRequest . EndTime ;
214
+ }
215
+ }
216
+
217
+ try
218
+ {
219
+ var results = await TaskUtilities . SafeWhenAll ( await PinCoreImplAsync ( context , contentHashes , endDateTime ) ) ;
220
+ foreach ( var indexed in results )
221
+ {
222
+ batch [ indexed . Index ] . PinResult . TrySetResult ( indexed . Item ) ;
223
+ }
224
+ }
225
+ catch ( Exception exception )
226
+ {
227
+ foreach ( var pinRequest in batch )
228
+ {
229
+ pinRequest . PinResult . TrySetException ( exception ) ;
230
+ }
231
+ }
232
+
233
+ return Unit . Void ;
234
+ } ,
235
+ traceOperationStarted : false ,
236
+ extraEndMessage : _ => $ "Count=[{ batch . Length } ]") ;
237
+ }
238
+
153
239
/// <inheritdoc />
154
240
protected override void DisposeCore ( ) => TempDirectory . Dispose ( ) ;
155
241
@@ -159,8 +245,20 @@ protected override async Task<PinResult> PinCoreAsync(
159
245
{
160
246
try
161
247
{
162
- var bulkResults = await PinAsync ( context , new [ ] { contentHash } , context . Token , urgencyHint ) ;
163
- return await bulkResults . SingleAwaitIndexed ( ) ;
248
+ var endTime = DateTime . UtcNow + TimeToKeepContent ;
249
+
250
+ var inMemoryResult = CheckPinInMemory ( contentHash , endTime ) ;
251
+ if ( inMemoryResult . Succeeded )
252
+ {
253
+ return inMemoryResult ;
254
+ }
255
+
256
+ var request = new BackgroundPinRequest (
257
+ ContentHash : contentHash ,
258
+ EndTime : endTime ,
259
+ PinResult : TaskSourceSlim . Create < PinResult > ( ) ) ;
260
+ _backgroundPinQueue ! . Enqueue ( request ) ;
261
+ return await request . PinResult . Task ;
164
262
}
165
263
catch ( Exception e )
166
264
{
@@ -257,10 +355,16 @@ protected override async Task<PlaceFileResult> PlaceFileCoreAsync(
257
355
258
356
/// <inheritdoc />
259
357
[ SuppressMessage ( "Microsoft.Design" , "CA1031:DoNotCatchGeneralExceptionTypes" ) ]
260
- protected override Task < IEnumerable < Task < Indexed < PinResult > > > > PinCoreAsync ( OperationContext context , IReadOnlyList < ContentHash > contentHashes , UrgencyHint urgencyHint , Counter retryCounter , Counter fileCounter )
358
+ protected override async Task < IEnumerable < Task < Indexed < PinResult > > > > PinCoreAsync ( OperationContext context , IReadOnlyList < ContentHash > contentHashes , UrgencyHint urgencyHint , Counter retryCounter , Counter fileCounter )
261
359
{
360
+ if ( contentHashes . Count == 1 )
361
+ {
362
+ var result = await PinCoreAsync ( context , contentHashes [ 0 ] , urgencyHint , retryCounter ) ;
363
+ return new [ ] { Task . FromResult ( new Indexed < PinResult > ( result ! , 0 ) ) } ;
364
+ }
365
+
262
366
var endDateTime = DateTime . UtcNow + TimeToKeepContent ;
263
- return PinCoreImplAsync ( context , contentHashes , endDateTime ) ;
367
+ return await PinCoreImplAsync ( context , contentHashes , endDateTime ) ;
264
368
}
265
369
266
370
private async Task < IEnumerable < Task < Indexed < PinResult > > > > PinCoreImplAsync ( OperationContext context , IReadOnlyList < ContentHash > contentHashes , DateTime keepUntil )
@@ -281,6 +385,9 @@ private async Task<IEnumerable<Task<Indexed<PinResult>>>> PinCoreImplAsync(Opera
281
385
}
282
386
283
387
/// <inheritdoc />
388
+ /// <remarks>
389
+ /// PlaceBulk is both unsupported and unused in this implementation.
390
+ /// </remarks>
284
391
protected override Task < IEnumerable < Task < Indexed < PlaceFileResult > > > > PlaceFileCoreAsync ( OperationContext context , IReadOnlyList < ContentHashWithPath > hashesWithPaths , FileAccessMode accessMode , FileReplacementMode replacementMode , FileRealizationMode realizationMode , UrgencyHint urgencyHint , Counter retryCounter )
285
392
=> throw new NotImplementedException ( ) ;
286
393
@@ -483,7 +590,7 @@ await ParallelHttpDownload.Download(
483
590
_parallelSegmentDownloadConfig ,
484
591
new AppTraceSourceContextAdapter ( context , Tracer . Name , SourceLevels . All ) ,
485
592
VssClientHttpRequestSettings . Default . SessionId ,
486
- _httpClient ) ;
593
+ HttpClient ) ;
487
594
var uri = await GetUriAsync ( context , contentHash ) ;
488
595
if ( uri == null )
489
596
{
0 commit comments