using System;
+ using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
@@ -17,8 +18,8 @@ internal static class DataGenerator
{
private static readonly Random Random = new Random(1234);

- public const int LargeBufferSize = 1 * 1024 * 1024;
- public const int SmallBufferSize = 1 * 1024;
+ public const int LargeBufferSize = 1024 * 1024;
+ public const int SmallBufferSize = 1024;

public static MemoryStream GetSmallStream(DataFill dataFill) => GetStream(SmallBufferSize, dataFill);
public static MemoryStream GetLargeStream(DataFill dataFill) => GetStream(LargeBufferSize, dataFill);
@@ -168,24 +169,30 @@ public void StreamingCompressionFlushDataFromInternalBuffers()
[Test]
public void CompressionImprovesWithDictionary()
{
- var trainingData = new byte[100][];
- for (int i = 0; i < trainingData.Length; i++)
- trainingData[i] = DataGenerator.GetSmallBuffer(DataFill.Random);
-
- var dict = DictBuilder.TrainFromBuffer(trainingData);
+ var dict = TrainDict();
var compressionOptions = new CompressionOptions(dict);

- var dataStream = DataGenerator.GetSmallStream(DataFill.Random);
+ var dataStream = DataGenerator.GetSmallStream(DataFill.Sequential);

var normalResultStream = new MemoryStream();
using (var compressionStream = new CompressionStream(normalResultStream))
dataStream.CopyTo(compressionStream);

+ dataStream.Seek(0, SeekOrigin.Begin);
+
var dictResultStream = new MemoryStream();
using (var compressionStream = new CompressionStream(dictResultStream, compressionOptions))
dataStream.CopyTo(compressionStream);

Assert.Greater(normalResultStream.Length, dictResultStream.Length);
+
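+ // Decompress the dictionary-compressed stream and check that it round-trips back to the original data.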
+ dictResultStream.Seek(0, SeekOrigin.Begin);
+
+ var resultStream = new MemoryStream();
+ using (var decompressionStream = new DecompressionStream(dictResultStream, new DecompressionOptions(dict)))
+ decompressionStream.CopyTo(resultStream);
+
+ Assert.AreEqual(dataStream.ToArray(), resultStream.ToArray());
}

[Test]
@@ -234,67 +241,97 @@ public void RoundTrip_StreamingToBatch()
[Test, Combinatorial, Parallelizable(ParallelScope.Children)]
public void RoundTrip_StreamingToStreaming(
+ [Values(false, true)] bool useDict, [Values(false, true)] bool advanced,
[Values(1, 2, 7, 101, 1024, 65535, DataGenerator.LargeBufferSize, DataGenerator.LargeBufferSize + 1)] int zstdBufferSize,
[Values(1, 2, 7, 101, 1024, 65535, DataGenerator.LargeBufferSize, DataGenerator.LargeBufferSize + 1)] int copyBufferSize)
{
+ var dict = useDict ? TrainDict() : null;
var testStream = DataGenerator.GetLargeStream(DataFill.Sequential);

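+ // Copy through a scratch buffer at offset 1 with one spare trailing byte, so stream reads and writes are exercised at non-zero offsets.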
+ const int offset = 1;
+ var buffer = new byte[copyBufferSize + offset + 1];
+
var tempStream = new MemoryStream();
- using (var compressionStream = new CompressionStream(tempStream, zstdBufferSize))
- testStream.CopyTo(compressionStream, copyBufferSize);
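+ // When "advanced" is set, pass explicit zstd parameters: window log, frame checksum and worker-thread count on the compression side, and the maximum window log on the decompression side.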
+ using (var compressionStream = new CompressionStream(tempStream, new CompressionOptions(dict, advanced ? new Dictionary<ZSTD_cParameter, int> { { ZSTD_cParameter.ZSTD_c_windowLog, 11 }, { ZSTD_cParameter.ZSTD_c_checksumFlag, 1 }, { ZSTD_cParameter.ZSTD_c_nbWorkers, 4 } } : null), zstdBufferSize))
+ {
+ int bytesRead;
+ while ((bytesRead = testStream.Read(buffer, offset, copyBufferSize)) > 0)
+ compressionStream.Write(buffer, offset, bytesRead);
+ }

tempStream.Seek(0, SeekOrigin.Begin);

var resultStream = new MemoryStream();
- using (var decompressionStream = new DecompressionStream(tempStream, zstdBufferSize))
- decompressionStream.CopyTo(resultStream, copyBufferSize);
+ using (var decompressionStream = new DecompressionStream(tempStream, new DecompressionOptions(dict, advanced ? new Dictionary<ZSTD_dParameter, int> { { ZSTD_dParameter.ZSTD_d_windowLogMax, 11 } } : null), zstdBufferSize))
+ {
+ int bytesRead;
+ while ((bytesRead = decompressionStream.Read(buffer, offset, copyBufferSize)) > 0)
+ resultStream.Write(buffer, offset, bytesRead);
+ }

Assert.AreEqual(testStream.ToArray(), resultStream.ToArray());
}

[Test, Combinatorial, Parallelizable(ParallelScope.Children)]
public async Task RoundTrip_StreamingToStreamingAsync(
+ [Values(false, true)] bool useDict, [Values(false, true)] bool advanced,
[Values(1, 2, 7, 101, 1024, 65535, DataGenerator.LargeBufferSize, DataGenerator.LargeBufferSize + 1)] int zstdBufferSize,
[Values(1, 2, 7, 101, 1024, 65535, DataGenerator.LargeBufferSize, DataGenerator.LargeBufferSize + 1)] int copyBufferSize)
{
+ var dict = useDict ? TrainDict() : null;
var testStream = DataGenerator.GetLargeStream(DataFill.Sequential);

+ const int offset = 1;
+ var buffer = new byte[copyBufferSize + offset + 1];
+
var tempStream = new MemoryStream();
- await using (var compressionStream = new CompressionStream(tempStream, zstdBufferSize))
- await testStream.CopyToAsync(compressionStream, copyBufferSize);
+ await using (var compressionStream = new CompressionStream(tempStream, new CompressionOptions(dict, advanced ? new Dictionary<ZSTD_cParameter, int> { { ZSTD_cParameter.ZSTD_c_windowLog, 11 }, { ZSTD_cParameter.ZSTD_c_checksumFlag, 1 }, { ZSTD_cParameter.ZSTD_c_nbWorkers, 4 } } : null), zstdBufferSize))
+ {
+ int bytesRead;
+ while ((bytesRead = await testStream.ReadAsync(buffer, offset, copyBufferSize)) > 0)
+ await compressionStream.WriteAsync(buffer, offset, bytesRead);
+ }

tempStream.Seek(0, SeekOrigin.Begin);

var resultStream = new MemoryStream();
- await using (var decompressionStream = new DecompressionStream(tempStream, zstdBufferSize))
- await decompressionStream.CopyToAsync(resultStream, copyBufferSize);
+ await using (var decompressionStream = new DecompressionStream(tempStream, new DecompressionOptions(dict, advanced ? new Dictionary<ZSTD_dParameter, int> { { ZSTD_dParameter.ZSTD_d_windowLogMax, 11 } } : null), zstdBufferSize))
+ {
+ int bytesRead;
+ while ((bytesRead = await decompressionStream.ReadAsync(buffer, offset, copyBufferSize)) > 0)
+ await resultStream.WriteAsync(buffer, offset, bytesRead);
+ }

Assert.AreEqual(testStream.ToArray(), resultStream.ToArray());
}

[Test, Explicit("stress")]
- public void RoundTrip_StreamingToStreaming_Stress([Values(true, false)] bool async)
+ public void RoundTrip_StreamingToStreaming_Stress([Values(true, false)] bool useDict, [Values(true, false)] bool async)
{
long i = 0;
+ var dict = useDict ? TrainDict() : null;
+ var compressionOptions = new CompressionOptions(dict);
+ var decompressionOptions = new DecompressionOptions(dict);
Enumerable.Range(0, 10000)
.AsParallel()
.WithDegreeOfParallelism(Environment.ProcessorCount * 4)
- .ForAll(_ =>
+ .ForAll(n =>
{
- var buffer = new byte[13];
- var testStream = DataGenerator.GetSmallStream(DataFill.Random);
+ var testStream = DataGenerator.GetSmallStream(DataFill.Sequential);
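+ // Compression/decompression scratch buffers and the zstd stream buffer sizes below vary with the iteration index n to cover many size combinations.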
+ var cBuffer = new byte[1 + (int)(n % (testStream.Length * 11))];
+ var dBuffer = new byte[1 + (int)(n % (testStream.Length * 13))];

var tempStream = new MemoryStream();
- using (var compressionStream = new CompressionStream(tempStream, 511))
+ using (var compressionStream = new CompressionStream(tempStream, compressionOptions, 1 + (int)(n % (testStream.Length * 17))))
{
int bytesRead;
- int offset = (int)(Interlocked.Read(ref i) % buffer.Length);
- while ((bytesRead = testStream.Read(buffer, offset, buffer.Length - offset)) > 0)
+ int offset = n % cBuffer.Length;
+ while ((bytesRead = testStream.Read(cBuffer, offset, cBuffer.Length - offset)) > 0)
{
if (async)
- compressionStream.WriteAsync(buffer, offset, bytesRead).GetAwaiter().GetResult();
+ compressionStream.WriteAsync(cBuffer, offset, bytesRead).GetAwaiter().GetResult();
else
- compressionStream.Write(buffer, offset, bytesRead);
+ compressionStream.Write(cBuffer, offset, bytesRead);
if (Interlocked.Increment(ref i) % 100 == 0)
GC.Collect(GC.MaxGeneration, GCCollectionMode.Forced, true, true);
}
@@ -303,13 +340,13 @@ public void RoundTrip_StreamingToStreaming_Stress([Values(true, false)] bool asy
tempStream.Seek(0, SeekOrigin.Begin);

var resultStream = new MemoryStream();
- using (var decompressionStream = new DecompressionStream(tempStream, 511))
+ using (var decompressionStream = new DecompressionStream(tempStream, decompressionOptions, 1 + (int)(n % (testStream.Length * 19))))
{
int bytesRead;
- int offset = (int)(Interlocked.Read(ref i) % buffer.Length);
- while ((bytesRead = async ? decompressionStream.ReadAsync(buffer, offset, buffer.Length - offset).GetAwaiter().GetResult() : decompressionStream.Read(buffer, offset, buffer.Length - offset)) > 0)
+ int offset = n % dBuffer.Length;
+ while ((bytesRead = async ? decompressionStream.ReadAsync(dBuffer, offset, dBuffer.Length - offset).GetAwaiter().GetResult() : decompressionStream.Read(dBuffer, offset, dBuffer.Length - offset)) > 0)
{
- resultStream.Write(buffer, offset, bytesRead);
+ resultStream.Write(dBuffer, offset, bytesRead);
if (Interlocked.Increment(ref i) % 100 == 0)
GC.Collect(GC.MaxGeneration, GCCollectionMode.Forced, true, true);
}
@@ -318,5 +355,13 @@ public void RoundTrip_StreamingToStreaming_Stress([Values(true, false)] bool asy
Assert.AreEqual(testStream.ToArray(), resultStream.ToArray());
});
}
+
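+ // Trains a shared compression dictionary from 100 small sequentially-filled sample buffers.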
+ private static byte[] TrainDict()
+ {
+ var trainingData = new byte[100][];
+ for (int i = 0; i < trainingData.Length; i++)
+ trainingData[i] = DataGenerator.GetSmallBuffer(DataFill.Sequential);
+ return DictBuilder.TrainFromBuffer(trainingData);
+ }
}
}