Skip to content

Commit f684429

Browse files
authored
striped buffer (#185)
* ConcurrentLfu * test * fix * tests * fix bounds * notes * cleanup * boundedbuffer * basic tests * more tests * scheduler * cleanup * trim with races * background * don't hang * stable background threads * stable tests * test cancel * test full buffer * striped buffer * fix merge * benched * conc level * tests * concurrencylevel * order
1 parent 27de998 commit f684429

File tree

13 files changed

+406
-146
lines changed

13 files changed

+406
-146
lines changed

BitFaster.Caching.Benchmarks/Lfu/LfuJustGetOrAdd.cs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,21 @@ namespace BitFaster.Caching.Benchmarks
1818
{
1919
[SimpleJob(RuntimeMoniker.Net48)]
2020
[SimpleJob(RuntimeMoniker.Net60)]
21-
[DisassemblyDiagnoser(printSource: true, maxDepth: 5)]
21+
//[DisassemblyDiagnoser(printSource: true, maxDepth: 5)]
2222
[MemoryDiagnoser]
2323
// [HardwareCounters(HardwareCounter.LlcMisses, HardwareCounter.CacheMisses)] // Requires Admin https://adamsitnik.com/Hardware-Counters-Diagnoser/
2424
// [ThreadingDiagnoser] // Requires .NET Core
2525
public class LfuJustGetOrAdd
2626
{
2727
private static readonly ConcurrentDictionary<int, int> dictionary = new ConcurrentDictionary<int, int>(8, 9, EqualityComparer<int>.Default);
2828

29+
const int stripes = 1;
2930
private static readonly BackgroundThreadScheduler background = new BackgroundThreadScheduler();
30-
private static readonly ConcurrentLfu<int, int> concurrentLfu = new ConcurrentLfu<int, int>(9, background);
31+
private static readonly ConcurrentLfu<int, int> concurrentLfu = new ConcurrentLfu<int, int>(stripes, 9, background);
3132

32-
private static readonly ConcurrentLfu<int, int> concurrentLfuFore = new ConcurrentLfu<int, int>(9, new ForegroundScheduler());
33-
private static readonly ConcurrentLfu<int, int> concurrentLfuTp = new ConcurrentLfu<int, int>(9, new ThreadPoolScheduler());
34-
private static readonly ConcurrentLfu<int, int> concurrentLfuNull = new ConcurrentLfu<int, int>(9, new NullScheduler());
33+
private static readonly ConcurrentLfu<int, int> concurrentLfuFore = new ConcurrentLfu<int, int>(stripes, 9, new ForegroundScheduler());
34+
private static readonly ConcurrentLfu<int, int> concurrentLfuTp = new ConcurrentLfu<int, int>(stripes, 9, new ThreadPoolScheduler());
35+
private static readonly ConcurrentLfu<int, int> concurrentLfuNull = new ConcurrentLfu<int, int>(stripes, 9, new NullScheduler());
3536

3637
[GlobalSetup]
3738
public void GlobalSetup()

BitFaster.Caching.Benchmarks/Lru/LruJustGetOrAdd.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public class LruJustGetOrAdd
5252
private static readonly ICache<int, int> atomicFastLru = new ConcurrentLruBuilder<int, int>().WithConcurrencyLevel(8).WithCapacity(9).WithAtomicGetOrAdd().Build();
5353

5454
private static readonly BackgroundThreadScheduler background = new BackgroundThreadScheduler();
55-
private static readonly ConcurrentLfu<int, int> concurrentLfu = new ConcurrentLfu<int, int>(9, background);
55+
private static readonly ConcurrentLfu<int, int> concurrentLfu = new ConcurrentLfu<int, int>(1, 9, background);
5656

5757

5858
private static readonly int key = 1;

BitFaster.Caching.HitRateAnalysis/Arc/Analysis.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public Analysis(int cacheSize)
2323
{
2424
concurrentLru = new ConcurrentLru<long, int>(1, cacheSize, EqualityComparer<long>.Default);
2525
classicLru = new ClassicLru<long, int>(1, cacheSize, EqualityComparer<long>.Default);
26-
concurrentLfu = new ConcurrentLfu<long, int>(cacheSize, new ForegroundScheduler());
26+
concurrentLfu = new ConcurrentLfu<long, int>(1, cacheSize, new ForegroundScheduler());
2727
}
2828

2929
public int CacheSize => concurrentLru.Capacity;

BitFaster.Caching.HitRateAnalysis/Glimpse/Analysis.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public Analysis(int cacheSize)
2323
{
2424
this.concurrentLru = new ConcurrentLru<long, int>(1, cacheSize, EqualityComparer<long>.Default);
2525
this.classicLru = new ClassicLru<long, int>(1, cacheSize, EqualityComparer<long>.Default);
26-
concurrentLfu = new ConcurrentLfu<long, int>(cacheSize, new ForegroundScheduler());
26+
concurrentLfu = new ConcurrentLfu<long, int>(1, cacheSize, new ForegroundScheduler());
2727
}
2828

2929
public int CacheSize => this.concurrentLru.Capacity;

BitFaster.Caching.ThroughputAnalysis/Program.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ static void Main(string[] args)
8585
for (int i = 0; i < warmup + runs; i++)
8686
{
8787
var scheduler = new BackgroundThreadScheduler();
88-
results[i] = MeasureThroughput(new ConcurrentLfu<int, int>(capacity, scheduler), tc);
88+
results[i] = MeasureThroughput(new ConcurrentLfu<int, int>(concurrencyLevel: tc, capacity: capacity, scheduler: scheduler), tc);
8989
scheduler.Dispose();
9090
}
9191
avg = AverageLast(results, runs) / 1000000;

BitFaster.Caching.UnitTests/BoundedBufferTests.cs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Collections.Concurrent;
23
using System.Collections.Generic;
34
using System.Linq;
45
using System.Text;
@@ -43,12 +44,12 @@ public void WhenBufferHasOneItemCountIsOne()
4344
[Fact]
4445
public void WhenBufferHas15ItemCountIs15()
4546
{
46-
buffer.TryAdd(0).Should().BeTrue();
47-
buffer.TryTake(out var _).Should().BeTrue();
47+
buffer.TryAdd(0).Should().Be(BufferStatus.Success);
48+
buffer.TryTake(out var _).Should().Be(BufferStatus.Success);
4849

4950
for (int i = 0; i < 15; i++)
5051
{
51-
buffer.TryAdd(0).Should().BeTrue();
52+
buffer.TryAdd(0).Should().Be(BufferStatus.Success);
5253
}
5354

5455
// head = 1, tail = 0 : head > tail
@@ -60,23 +61,23 @@ public void WhenBufferIsFullTryAddIsFalse()
6061
{
6162
for (int i = 0; i < 16; i++)
6263
{
63-
buffer.TryAdd(i).Should().BeTrue();
64+
buffer.TryAdd(i).Should().Be(BufferStatus.Success);
6465
}
6566

66-
buffer.TryAdd(666).Should().BeFalse();
67+
buffer.TryAdd(666).Should().Be(BufferStatus.Full);
6768
}
6869

6970
[Fact]
7071
public void WhenBufferIsEmptyTryTakeIsFalse()
7172
{
72-
buffer.TryTake(out var _).Should().BeFalse();
73+
buffer.TryTake(out var _).Should().Be(BufferStatus.Empty);
7374
}
7475

7576
[Fact]
7677
public void WhenItemAddedItCanBeTaken()
7778
{
78-
buffer.TryAdd(123).Should().BeTrue();
79-
buffer.TryTake(out var item).Should().BeTrue();
79+
buffer.TryAdd(123).Should().Be(BufferStatus.Success);
80+
buffer.TryTake(out var item).Should().Be(BufferStatus.Success);
8081
item.Should().Be(123);
8182
}
8283

@@ -91,7 +92,7 @@ public void WhenItemsAreAddedClearRemovesItems()
9192
buffer.Clear();
9293

9394
buffer.Count.Should().Be(0);
94-
buffer.TryTake(out var _).Should().BeFalse();
95+
buffer.TryTake(out var _).Should().Be(BufferStatus.Empty);
9596
}
9697
}
9798
}

BitFaster.Caching.UnitTests/Lfu/ConcurrentLfuTests.cs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public class ConcurrentLfuTests
2121
{
2222
private readonly ITestOutputHelper output;
2323

24-
private ConcurrentLfu<int, int> cache = new ConcurrentLfu<int, int>(20, new BackgroundThreadScheduler());
24+
private ConcurrentLfu<int, int> cache = new ConcurrentLfu<int, int>(1, 20, new BackgroundThreadScheduler());
2525

2626
public ConcurrentLfuTests(ITestOutputHelper output)
2727
{
@@ -160,7 +160,7 @@ public void WriteUpdateProtectedLruOrder()
160160
public void ReadSchedulesMaintenanceWhenBufferIsFull()
161161
{
162162
var scheduler = new TestScheduler();
163-
cache = new ConcurrentLfu<int, int>(20, scheduler);
163+
cache = new ConcurrentLfu<int, int>(1, 20, scheduler);
164164

165165
cache.GetOrAdd(1, k => k);
166166
scheduler.RunCount.Should().Be(1);
@@ -182,7 +182,7 @@ public void WhenReadBufferIsFullReadsAreDropped()
182182
{
183183
int bufferSize = ConcurrentLfu<int, int>.BufferSize;
184184
var scheduler = new TestScheduler();
185-
cache = new ConcurrentLfu<int, int>(20, scheduler);
185+
cache = new ConcurrentLfu<int, int>(1, 20, scheduler);
186186

187187
cache.GetOrAdd(1, k => k);
188188
scheduler.RunCount.Should().Be(1);
@@ -202,7 +202,7 @@ public void WhenReadBufferIsFullReadsAreDropped()
202202
public void WhenWriteBufferIsFullAddDoesMaintenance()
203203
{
204204
var scheduler = new TestScheduler();
205-
cache = new ConcurrentLfu<int, int>(ConcurrentLfu<int, int>.BufferSize * 2, scheduler);
205+
cache = new ConcurrentLfu<int, int>(1, ConcurrentLfu<int, int>.BufferSize * 2, scheduler);
206206

207207
// add an item, flush write buffer
208208
cache.GetOrAdd(-1, k => k);
@@ -231,7 +231,7 @@ public void WhenWriteBufferIsFullUpdatesAreDropped()
231231
{
232232
int bufferSize = ConcurrentLfu<int, int>.BufferSize;
233233
var scheduler = new TestScheduler();
234-
cache = new ConcurrentLfu<int, int>(20, scheduler);
234+
cache = new ConcurrentLfu<int, int>(1, 20, scheduler);
235235

236236
cache.GetOrAdd(-1, k => k);
237237
scheduler.RunCount.Should().Be(1);
@@ -243,7 +243,7 @@ public void WhenWriteBufferIsFullUpdatesAreDropped()
243243
}
244244

245245
cache.PendingMaintenance();
246-
246+
247247
// TODO: how to verify this? There is no counter for updates.
248248
}
249249

@@ -460,7 +460,7 @@ public void VerifyHitsWithBackgroundScheduler()
460460
public void VerifyHitsWithThreadPoolScheduler()
461461
{
462462
// when running all tests in parallel, sample count drops significantly: set low bar for stability.
463-
cache = new ConcurrentLfu<int, int>(20, new ThreadPoolScheduler());
463+
cache = new ConcurrentLfu<int, int>(1, 20, new ThreadPoolScheduler());
464464
VerifyHits(iterations: 10000000, minSamples: 500000);
465465
}
466466

@@ -470,7 +470,7 @@ public void VerifyHitsWithThreadPoolScheduler()
470470
[Fact]
471471
public void VerifyHitsWithNullScheduler()
472472
{
473-
cache = new ConcurrentLfu<int, int>(20, new NullScheduler());
473+
cache = new ConcurrentLfu<int, int>(1, 20, new NullScheduler());
474474
VerifyHits(iterations: 10000000, minSamples: -1);
475475
}
476476

@@ -481,7 +481,7 @@ public void VerifyHitsWithNullScheduler()
481481
[Fact]
482482
public void VerifyHitsWithForegroundScheduler()
483483
{
484-
cache = new ConcurrentLfu<int, int>(20, new ForegroundScheduler());
484+
cache = new ConcurrentLfu<int, int>(1, 20, new ForegroundScheduler());
485485

486486
// Note: TryAdd will drop 1 read per full read buffer, since TryAdd will return false
487487
// before TryScheduleDrain is called. This serves as sanity check.
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Text;
5+
using System.Threading.Tasks;
6+
using FluentAssertions;
7+
using Xunit;
8+
9+
namespace BitFaster.Caching.UnitTests
10+
{
11+
public class StripedBufferTests
12+
{
13+
const int bufferSize = 16;
14+
const int stripeCount = 2;
15+
private readonly StripedBuffer<int> buffer = new StripedBuffer<int>(stripeCount, bufferSize);
16+
17+
[Fact]
18+
public void WhenBufferIsFullTryAddReturnsFull()
19+
{
20+
for (int i = 0; i < stripeCount; i++)
21+
{
22+
for (int j = 0; j < bufferSize; j++)
23+
{
24+
buffer.TryAdd(1).Should().Be(BufferStatus.Success);
25+
}
26+
}
27+
28+
buffer.TryAdd(1).Should().Be(BufferStatus.Full);
29+
}
30+
31+
[Fact]
32+
public void WhenBufferIsEmptyDrainReturnsZero()
33+
{
34+
var array = new int[bufferSize];
35+
buffer.DrainTo(array).Should().Be(0);
36+
}
37+
38+
[Fact]
39+
public void WhenBufferIsFullDrainReturnsItemCount()
40+
{
41+
for (int i = 0; i < stripeCount; i++)
42+
{
43+
for (int j = 0; j < bufferSize; j++)
44+
{
45+
buffer.TryAdd(1);
46+
}
47+
}
48+
49+
var array = new int[bufferSize * stripeCount];
50+
buffer.DrainTo(array).Should().Be(stripeCount * bufferSize);
51+
}
52+
53+
[Fact]
54+
public void WhenDrainBufferIsSmallerThanStripedBufferDrainReturnsBufferItemCount()
55+
{
56+
for (int i = 0; i < stripeCount; i++)
57+
{
58+
for (int j = 0; j < bufferSize; j++)
59+
{
60+
buffer.TryAdd(1);
61+
}
62+
}
63+
64+
var array = new int[bufferSize];
65+
buffer.DrainTo(array).Should().Be(bufferSize);
66+
}
67+
68+
[Fact]
69+
public void WhenBufferIsPartFullDrainReturnsItems()
70+
{
71+
for (int j = 0; j < bufferSize; j++)
72+
{
73+
buffer.TryAdd(1);
74+
}
75+
76+
var array = new int[bufferSize * stripeCount];
77+
buffer.DrainTo(array).Should().Be(bufferSize);
78+
}
79+
80+
[Fact]
81+
public void WhenBufferIsClearedDrainReturns0()
82+
{
83+
for (int i = 0; i < stripeCount; i++)
84+
{
85+
for (int j = 0; j < bufferSize; j++)
86+
{
87+
buffer.TryAdd(1);
88+
}
89+
}
90+
91+
buffer.Clear();
92+
93+
var array = new int[bufferSize * stripeCount];
94+
buffer.DrainTo(array).Should().Be(0);
95+
}
96+
}
97+
}

0 commit comments

Comments
 (0)