Skip to content

Commit 37db0ba

Browse files
committed
A lot of improvements around compactor.
1 parent ff22bd5 commit 37db0ba

17 files changed

+905
-879
lines changed

BTDB/BTreeLib/BTreeImpl12.cs

+2-1
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,8 @@ internal IntPtr ValueReplacer(ref ValueReplacerCtx ctx, in Span<CursorItem> stac
304304
{
305305
if (ctx._afterFirst)
306306
{
307-
if (ctx._cancellation.IsCancellationRequested || DateTime.UtcNow > ctx._operationTimeout)
307+
if ((ctx._timeoutTestCounter++ & 15) == 0 && (ctx._cancellation.IsCancellationRequested ||
308+
DateTime.UtcNow > ctx._operationTimeout))
308309
{
309310
ctx._interrupted = true;
310311
if (header.HasLongKeys)

BTDB/BTreeLib/ValueReplacerCtx.cs

+1
Original file line numberDiff line numberDiff line change
@@ -14,4 +14,5 @@ public struct ValueReplacerCtx
1414
internal byte[] _interruptedKey;
1515
internal bool _afterFirst;
1616
internal uint _targetFileId;
17+
internal uint _timeoutTestCounter;
1718
}

BTDB/KVDBLayer/Implementation/BTreeKeyValueDB.cs

+8-8
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public IEnumerable<IKeyValueDBTransaction> Transactions()
6161

6262
readonly IFileCollectionWithFileInfos _fileCollection;
6363
readonly Dictionary<long, object> _subDBs = new();
64-
readonly Func<CancellationToken, bool>? _compactFunc;
64+
readonly Func<CancellationToken, ValueTask<bool>>? _compactFunc;
6565
readonly bool _readOnly;
6666
readonly bool _lenientOpen;
6767
bool _disposed = false;
@@ -413,10 +413,10 @@ internal uint CalculatePreserveKeyIndexKeyFromKeyIndexInfos(ReadOnlySpan<KeyInde
413413
return preserveKeyIndexKey;
414414
}
415415

416-
long IKeyValueDBInternal.ReplaceBTreeValues(CancellationToken cancellation,
416+
async ValueTask<long> IKeyValueDBInternal.ReplaceBTreeValues(CancellationToken cancellation,
417417
RefDictionary<ulong, uint> newPositionMap, uint targetFileId)
418418
{
419-
return ReplaceBTreeValues(cancellation, newPositionMap, targetFileId);
419+
return await ReplaceBTreeValues(cancellation, newPositionMap, targetFileId);
420420
}
421421

422422
long[] IKeyValueDBInternal.CreateIndexFile(CancellationToken cancellation, long preserveKeyIndexGeneration)
@@ -1179,9 +1179,9 @@ public string CalcStats()
11791179
return _allocator.GetStats();
11801180
}
11811181

1182-
public bool Compact(CancellationToken cancellation)
1182+
public async ValueTask<bool> Compact(CancellationToken cancellation)
11831183
{
1184-
return new Compactor(this, cancellation).Run();
1184+
return await new Compactor(this, cancellation).Run();
11851185
}
11861186

11871187
public void CreateKvi(CancellationToken cancellation)
@@ -1759,14 +1759,14 @@ internal MemWriter StartPureValuesFile(out uint fileId)
17591759
return writer;
17601760
}
17611761

1762-
long ReplaceBTreeValues(CancellationToken cancellation, RefDictionary<ulong, uint> newPositionMap,
1762+
async ValueTask<long> ReplaceBTreeValues(CancellationToken cancellation, RefDictionary<ulong, uint> newPositionMap,
17631763
uint targetFileId)
17641764
{
17651765
byte[] restartKey = null;
17661766
while (true)
17671767
{
17681768
var iterationTimeOut = DateTime.UtcNow + TimeSpan.FromMilliseconds(50);
1769-
using (var tr = StartWritingTransaction().GetAwaiter().GetResult())
1769+
using (var tr = await StartWritingTransaction())
17701770
{
17711771
var newRoot = ((BTreeKeyValueDBTransaction)tr).BTreeRoot;
17721772
var cursor = newRoot!.CreateCursor();
@@ -1797,7 +1797,7 @@ long ReplaceBTreeValues(CancellationToken cancellation, RefDictionary<ulong, uin
17971797
}
17981798
}
17991799

1800-
Thread.Sleep(10);
1800+
await Task.Delay(TimeSpan.FromMilliseconds(100), cancellation);
18011801
}
18021802
}
18031803

BTDB/KVDBLayer/Implementation/Compactor.cs

+20-15
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System.Numerics;
55
using System.Runtime.InteropServices;
66
using System.Threading;
7+
using System.Threading.Tasks;
78
using BTDB.Collections;
89
using BTDB.KVDBLayer.Implementation;
910
using BTDB.StreamLayer;
@@ -98,11 +99,11 @@ void MarkTotallyUselessFilesAsUnknown()
9899
if (toRemoveFileIds.Count > 0) _keyValueDB.MarkAsUnknown(toRemoveFileIds);
99100
}
100101

101-
internal bool Run()
102+
internal async ValueTask<bool> Run()
102103
{
103104
try
104105
{
105-
return RunCore();
106+
return await RunCore();
106107
}
107108
catch (Exception e)
108109
{
@@ -112,7 +113,7 @@ internal bool Run()
112113
return false;
113114
}
114115

115-
bool RunCore()
116+
async ValueTask<bool> RunCore()
116117
{
117118
if (_keyValueDB.FileCollection.GetCount() == 0) return false;
118119
_root = _keyValueDB.ReferenceAndGetOldestRoot();
@@ -153,15 +154,15 @@ bool RunCore()
153154
}
154155

155156
{
156-
using var flushingTransaction = _keyValueDB.StartWritingTransaction().GetAwaiter().GetResult();
157+
using var flushingTransaction = await _keyValueDB.StartWritingTransaction();
157158
flushingTransaction.NextCommitTemporaryCloseTransactionLog();
158159
flushingTransaction.Commit();
159160
}
160161
MarkTotallyUselessFilesAsUnknown();
161162
_keyValueDB.FileCollection.DeleteAllUnknownFiles();
162-
var totalWaste = CalcTotalWaste();
163+
var (totalWaste, maxInOneFile) = CalcTotalWaste();
163164
_keyValueDB.Logger?.CompactionStart(totalWaste);
164-
if (IsWasteSmall(totalWaste))
165+
if (IsWasteSmall(totalWaste, maxInOneFile))
165166
{
166167
if (_keyValueDB.DistanceFromLastKeyIndex(_root) > (ulong)(_keyValueDB.MaxTrLogFileSize / 4))
167168
_keyValueDB.CreateIndexFile(_cancellation, preserveKeyIndexGeneration);
@@ -177,16 +178,16 @@ bool RunCore()
177178
_newPositionMap = new();
178179
var pvlFileId = CompactOnePureValueFileIteration(ref toRemoveFileIds);
179180
btreesCorrectInTransactionId =
180-
_keyValueDB.ReplaceBTreeValues(_cancellation, _newPositionMap, pvlFileId);
181+
await _keyValueDB.ReplaceBTreeValues(_cancellation, _newPositionMap, pvlFileId);
181182
pvlCreated++;
182-
totalWaste = CalcTotalWaste();
183+
(totalWaste, maxInOneFile) = CalcTotalWaste();
183184
if (pvlCreated >= 20)
184185
{
185186
_keyValueDB.Logger?.LogWarning("Compactor didn't removed all waste (" + totalWaste +
186187
"), because it created 20 PVL files already. Remaining waste left to next compaction.");
187188
break;
188189
}
189-
} while (!IsWasteSmall(totalWaste));
190+
} while (!IsWasteSmall(totalWaste, maxInOneFile));
190191

191192
var usedFileGens = _keyValueDB.CreateIndexFile(_cancellation, preserveKeyIndexGeneration);
192193
for (var i = (int)toRemoveFileIds.Count - 1; i >= 0; i--)
@@ -263,9 +264,10 @@ void ForbidDeleteOfFilesUsedByStillRunningOldTransaction()
263264
}
264265
}
265266

266-
bool IsWasteSmall(ulong totalWaste)
267+
bool IsWasteSmall(ulong totalWaste, ulong maxInOneFile)
267268
{
268-
return totalWaste < (ulong)_keyValueDB.MaxTrLogFileSize / 4;
269+
return maxInOneFile < (ulong)_keyValueDB.MaxTrLogFileSize / 4 ||
270+
totalWaste < (ulong)_keyValueDB.MaxTrLogFileSize;
269271
}
270272

271273
void MoveValuesContent(ref MemWriter writerIn, uint wastefulFileId)
@@ -314,16 +316,19 @@ ref MemoryMarshal.GetReference(wasteInMemory[blockId].AsSpan((int)blockStart, (i
314316
writerIn = writer;
315317
}
316318

317-
ulong CalcTotalWaste()
319+
(ulong Total, ulong MaxInOneFile) CalcTotalWaste()
318320
{
319321
var total = 0ul;
322+
var maxInOneFile = 0ul;
320323
foreach (var fileStat in _fileStats.Index)
321324
{
322-
var waste = _fileStats.ValueRef(fileStat).CalcWasteIgnoreUseless();
323-
if (waste > 1024) total += waste;
325+
ref var stat = ref _fileStats.ValueRef(fileStat);
326+
var waste = stat.CalcWasteIgnoreUseless();
327+
if (waste > maxInOneFile) maxInOneFile = waste;
328+
if (waste > stat.CalcUsed() / 8) total += waste;
324329
}
325330

326-
return total;
331+
return (total, maxInOneFile);
327332
}
328333

329334
uint FindMostWastefulFile(long space)

BTDB/KVDBLayer/Implementation/CompactorScheduler.cs

+26-17
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,23 @@
11
using System;
22
using System.Threading;
3+
using System.Threading.Tasks;
34

45
namespace BTDB.KVDBLayer;
56

67
public class CompactorScheduler : IDisposable, ICompactorScheduler
78
{
8-
Func<CancellationToken, bool>[] _coreActions = new Func<CancellationToken, bool>[0];
9+
Func<CancellationToken, ValueTask<bool>>[] _coreActions = [];
910
readonly Timer _timer;
1011
CancellationTokenSource _cancellationSource = new CancellationTokenSource();
11-
readonly object _lock = new object();
12-
bool _running; //compacting in progress
13-
bool _advicedRunning; //was advised again during compacting
14-
bool _firstTime; //in time period before first compact (try save resource during app startup)
12+
readonly object _lock = new();
13+
bool _running; //compacting in progress
14+
bool _advisedRunning; //was advised again during compacting
15+
bool _firstTime; //in time period before first compact (try save resource during app startup)
1516
bool _timerStarted; //timer is planned
1617
bool _disposed;
1718
internal TimeSpan WaitTime { get; set; }
1819

19-
static ICompactorScheduler _instance;
20+
static ICompactorScheduler? _instance;
2021

2122
public static ICompactorScheduler Instance
2223
{
@@ -26,9 +27,10 @@ public static ICompactorScheduler Instance
2627
{
2728
Interlocked.CompareExchange(ref _instance, new CompactorScheduler(), null);
2829
}
29-
return _instance;
30+
31+
return _instance!;
3032
}
31-
set { _instance = value; }
33+
set => _instance = value;
3234
}
3335

3436
internal CompactorScheduler()
@@ -38,7 +40,8 @@ internal CompactorScheduler()
3840
WaitTime = TimeSpan.FromMinutes(10 + new Random().NextDouble() * 5);
3941
}
4042

41-
public Func<CancellationToken, bool> AddCompactAction(Func<CancellationToken, bool> compactAction)
43+
public Func<CancellationToken, ValueTask<bool>> AddCompactAction(
44+
Func<CancellationToken, ValueTask<bool>> compactAction)
4245
{
4346
if (_disposed) throw new ObjectDisposedException(nameof(CompactorScheduler));
4447
while (true)
@@ -49,10 +52,11 @@ public Func<CancellationToken, bool> AddCompactAction(Func<CancellationToken, bo
4952
newA[oldA.Length] = compactAction;
5053
if (Interlocked.CompareExchange(ref _coreActions, newA, oldA) == oldA) break;
5154
}
55+
5256
return compactAction;
5357
}
5458

55-
public void RemoveCompactAction(Func<CancellationToken, bool> compactAction)
59+
public void RemoveCompactAction(Func<CancellationToken, ValueTask<bool>> compactAction)
5660
{
5761
if (_disposed) return;
5862
lock (_lock)
@@ -62,6 +66,7 @@ public void RemoveCompactAction(Func<CancellationToken, bool> compactAction)
6266
{
6367
Monitor.Wait(_lock);
6468
}
69+
6570
_cancellationSource = new CancellationTokenSource();
6671
while (true)
6772
{
@@ -83,9 +88,10 @@ public void AdviceRunning(bool openingDb)
8388
{
8489
if (_running)
8590
{
86-
_advicedRunning = true;
91+
_advisedRunning = true;
8792
return;
8893
}
94+
8995
if (openingDb)
9096
{
9197
if (_timerStarted)
@@ -101,34 +107,36 @@ public void AdviceRunning(bool openingDb)
101107
}
102108
}
103109

104-
void OnTimer(object state)
110+
// ReSharper disable once AsyncVoidMethod
111+
async void OnTimer(object state)
105112
{
106113
lock (_lock)
107114
{
108115
_firstTime = false;
109116
_timerStarted = false;
110117
if (_running) return;
111118
_running = true;
112-
113119
}
120+
114121
try
115122
{
116123
var needed = false;
117124
do
118125
{
119-
_advicedRunning = false;
126+
_advisedRunning = false;
120127
var actions = _coreActions;
121128
for (var i = 0; i < actions.Length; i++)
122129
{
123130
if (_cancellationSource.IsCancellationRequested) break;
124-
needed |= actions[i](_cancellationSource.Token);
131+
needed |= await actions[i](_cancellationSource.Token);
125132
}
126-
} while (_advicedRunning);
133+
} while (_advisedRunning);
134+
127135
lock (_lock)
128136
{
129137
_running = false;
130138
Monitor.PulseAll(_lock);
131-
needed |= _advicedRunning;
139+
needed |= _advisedRunning;
132140
if (needed && !_cancellationSource.IsCancellationRequested)
133141
{
134142
_timer.Change(WaitTime, TimeSpan.FromMilliseconds(-1));
@@ -156,6 +164,7 @@ public void Dispose()
156164
Monitor.Wait(_lock);
157165
}
158166
}
167+
159168
_timer.Dispose();
160169
}
161170
}
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
using System;
22
using System.Threading;
3+
using System.Threading.Tasks;
34

45
namespace BTDB.KVDBLayer;
56

67
public interface ICompactorScheduler
78
{
8-
Func<CancellationToken, bool> AddCompactAction(Func<CancellationToken, bool> compactAction);
9-
void RemoveCompactAction(Func<CancellationToken, bool> compactAction);
9+
Func<CancellationToken, ValueTask<bool>> AddCompactAction(Func<CancellationToken, ValueTask<bool>> compactAction);
10+
void RemoveCompactAction(Func<CancellationToken, ValueTask<bool>> compactAction);
1011
void AdviceRunning(bool openingDb);
1112
}

BTDB/KVDBLayer/Implementation/IKeyValueDBInternal.cs

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
using System;
22
using System.Collections.Generic;
33
using System.Threading;
4+
using System.Threading.Tasks;
45
using BTDB.Collections;
56
using BTDB.StreamLayer;
67

@@ -24,7 +25,7 @@ interface IKeyValueDBInternal : IKeyValueDB
2425
IRootNodeInternal ReferenceAndGetLastCommitted();
2526
void DereferenceRootNodeInternal(IRootNodeInternal root);
2627

27-
long ReplaceBTreeValues(CancellationToken cancellation, RefDictionary<ulong, uint> newPositionMap,
28+
ValueTask<long> ReplaceBTreeValues(CancellationToken cancellation, RefDictionary<ulong, uint> newPositionMap,
2829
uint targetFileId);
2930

3031
long[] CreateIndexFile(CancellationToken cancellation, long preserveKeyIndexGeneration);

BTDB/KVDBLayer/Interface/IKeyValueDB.cs

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public interface IKeyValueDB : IDisposable
2222
(ulong AllocSize, ulong AllocCount, ulong DeallocSize, ulong DeallocCount) GetNativeMemoryStats();
2323

2424
// Returns true if there was big compaction (probably will need another one)
25-
bool Compact(CancellationToken cancellation);
25+
ValueTask<bool> Compact(CancellationToken cancellation);
2626

2727
void CreateKvi(CancellationToken cancellation);
2828

BTDB/KVDBLayer/MemoryImplementation/InMemoryKeyValueDB.cs

+3-2
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public void Dispose()
3535
_writeWaitingQueue.Dequeue().TrySetCanceled();
3636
}
3737
}
38+
3839
if (_transactions.Count > 0)
3940
throw new BTDBException("Cannot dispose KeyValueDB when transactions still running");
4041

@@ -90,9 +91,9 @@ public string CalcStats()
9091
return (0, 0, 0, 0);
9192
}
9293

93-
public bool Compact(CancellationToken cancellation)
94+
public ValueTask<bool> Compact(CancellationToken cancellation)
9495
{
95-
return false;
96+
return ValueTask.FromResult(false);
9697
}
9798

9899
public void CreateKvi(CancellationToken cancellation)

BTDB/KVDBLayer/ReadOnly/ReadOnlyKeyValueDB.cs

+2-2
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,9 @@ public string CalcStats()
6868
return (_totalLen, 1, 0, 0);
6969
}
7070

71-
public bool Compact(CancellationToken cancellation)
71+
public ValueTask<bool> Compact(CancellationToken cancellation)
7272
{
73-
return false;
73+
return ValueTask.FromResult(false);
7474
}
7575

7676
public void CreateKvi(CancellationToken cancellation)

0 commit comments

Comments
 (0)