 
 -- | An incremental merge of multiple runs.
 module Database.LSMTree.Internal.MergingRun (
+    -- * Merging run
     MergingRun (..)
+  , NumRuns (..)
   , new
   , newCompleted
   , duplicateRuns
   , supplyCredits
   , expectCompleted
-    -- * Useful types
+
+    -- * Credit tracking
+    -- $credittracking
   , Credits (..)
   , CreditThreshold (..)
-  , NumRuns (..)
-    -- * Internal state
   , UnspentCreditsVar (..)
+  , SpentCreditsVar (..)
   , TotalStepsVar (..)
+
+    -- * Internal state
   , MergingRunState (..)
-  , SpentCreditsVar (..)
   , MergeKnownCompleted (..)
   ) where
 
@@ -53,12 +57,18 @@ data MergingRun m h = MergingRun {
       mergeNumRuns :: !NumRuns
       -- | Sum of number of entries in the input runs
     , mergeNumEntries :: !NumEntries
-      -- | The number of currently /unspent/ credits
+
+      -- See $credittracking
+
+      -- | The current number of credits supplied but as yet /unspent/.
     , mergeUnspentCredits :: !(UnspentCreditsVar (PrimState m))
-      -- | The total number of spent credits.
+      -- | The current number of credits supplied but already spent. Note that
+      -- the total number of credits supplied is this plus the unspent credits.
     , mergeSpentCredits :: !(SpentCreditsVar (PrimState m))
-      -- | The total number of performed merging steps.
+      -- | The current number of merging steps actually performed. This is
+      -- always at least as big as the total number of credits supplied.
     , mergeStepsPerformed :: !(TotalStepsVar (PrimState m))
+
       -- | A variable that caches knowledge about whether the merge has been
       -- completed. If 'MergeKnownCompleted', then we are sure the merge has
       -- been completed, otherwise if 'MergeMaybeCompleted' we have to check the
@@ -75,18 +85,6 @@ newtype NumRuns = NumRuns { unNumRuns :: Int }
   deriving stock (Show, Eq)
   deriving newtype NFData
 
-newtype UnspentCreditsVar s = UnspentCreditsVar {
-    getUnspentCreditsVar :: PrimVar s Int
-  }
-
-newtype SpentCreditsVar s = SpentCreditsVar {
-    getSpentCreditsVar :: PrimVar s Int
-  }
-
-newtype TotalStepsVar s = TotalStepsVar {
-    getTotalStepsVar :: PrimVar s Int
-  }
-
 data MergingRunState m h =
     CompletedMerge
       !(Ref (Run m h))
@@ -220,16 +218,45 @@ duplicateRuns (DeRef mr) =
   Credits
 -------------------------------------------------------------------------------}
 
-{-
-Note [Merge Batching]
-~~~~~~~~~~~~~~
-
-Merge work is done in batches based on accumulated, unspent credits and a
-threshold value. Moreover, merging runs can be shared across tables, which
-means that multiple threads can contribute to the same merge concurrently.
-The design to contribute credits to the same merging run is largely lock-free.
-It ensures consistency of the unspent credits and the merge state, while
-allowing threads to progress without waiting on other threads.
+{- $credittracking
+
+The credits concept we use here comes from amortised analysis of data
+structures (see the Banker's Method from Okasaki), though here we use it for
+tracking the scheduled (i.e. incremental) merge.
+
+In the prototype things are relatively simple: we simulate performing merge
+work in batches (based on a threshold) and the credit tracking reflects this
+by tracking unspent credits (and the debt corresponding to the remaining
+merge work to do). The implementation does this too: we accumulate unspent
+credits until they reach a threshold, at which point we do a batch of merging
+work. Unlike the prototype, the implementation tracks both credits supplied
+but as yet unspent and also tracks credits supplied and spent.
+
+In the prototype, the credits spent equals the merge steps performed. The
+real implementation is more complicated: we distinguish credit supplied from
+merge steps actually performed. When we spend credits on merging work, the
+number of steps we perform is not guaranteed to be the same as the credits
+supplied. For example, we may ask to do 100 credits of merging work, but the
+merge code (for perfectly sensible efficiency reasons) will decide to do 102
+units of merging work. The rule is that we may do (slightly) more work than
+the credits supplied but not less.
+
+Thus we track three things:
+
+* credits unspent ('UnspentCreditsVar')
+* credits spent ('SpentCreditsVar')
+* steps performed ('TotalStepsVar')
+
+The credits supplied is the sum of the credits spent and unspent. The credits
+supplied and the steps performed will be close but not exactly the same in
+general, though the steps performed may never be less than the credits
+supplied.
+
+Merging runs can be shared across tables, which means that multiple threads
+can contribute to the same merge concurrently. The design to contribute
+credits to the same merging run is largely lock-free. It ensures consistency
+of the unspent credits and the merge state, while allowing threads to
+progress without waiting on other threads.
 
 First, credits are added atomically to a PrimVar that holds the current total
 of unspent credits. If this addition exceeded the threshold, then credits are
@@ -240,6 +267,14 @@ duplicateRuns (DeRef mr) =
 some point, doing the merge work resulted in the merge being done, then the
 merge is converted into a new run.
 
+Concurrency:
+
+* 'SpentCreditsVar' is only read and modified with the 'mergeState' lock held.
+* 'TotalStepsVar' is only modified with the 'mergeState' lock held, but
+  is read without the lock.
+* 'UnspentCreditsVar' is read and (atomically) modified without the
+  'mergeState' lock held.
+
 In the presence of async exceptions, we offer a weaker guarantee regarding
 consistency of the accumulated, unspent credits and the merge state: a merge
 /may/ progress more than the number of credits that were taken. If an async
@@ -249,8 +284,8 @@ duplicateRuns (DeRef mr) =
 because then a merge might not finish in time, which will mess up the shape of
 the levels tree.
 
-The implementation also tracks the total of spent credits, and the number of
-perfomed merge steps. These are the use cases:
+As mentioned above, the implementation also tracks the total of spent credits,
+and the number of merge steps performed. These are the use cases:
 
 * The total of spent credits + the total of unspent credits is used by the
   snapshot feature to restore merge work on snapshot load that was lost during
@@ -259,7 +294,7 @@ duplicateRuns (DeRef mr) =
 * For simplicity, merges are allowed to do more steps than requested. However,
   it does mean that once we do more steps next time a batch of work is done,
   then we should account for the surplus of steps performed by the previous
-  batch. The total of spent credits + the number of performed merge steps is
+  batch. The total of spent credits - the number of performed merge steps is
   used to compute this surplus, and adjust for it.
 
 TODO: we should reconsider at some later point in time whether this surplus
@@ -275,6 +310,12 @@ duplicateRuns (DeRef mr) =
 There is an important invariant that we maintain, even in the presence of
 async exceptions: @merge steps actually performed >= recorded merge steps
 performed >= recorded spent credits@. TODO: and this makes it correct (?).
+
+Plausibly we could rationalise down to just two counters. The idea would be
+to track spent and unspent credits as before. When more merging work is done
+than requested, the surplus can be subtracted from the unspent credits. This
+may result in the unspent credits being negative for a while, which is ok.
+
 -}
 
 newtype Credits = Credits Int
@@ -285,6 +326,10 @@ newtype Credits = Credits Int
 -- achieving good (concurrent) performance.
 newtype CreditThreshold = CreditThreshold { getCreditThreshold :: Int }
 
+newtype UnspentCreditsVar s = UnspentCreditsVar { getUnspentCreditsVar :: PrimVar s Int }
+newtype SpentCreditsVar s = SpentCreditsVar { getSpentCreditsVar :: PrimVar s Int }
+newtype TotalStepsVar s = TotalStepsVar { getTotalStepsVar :: PrimVar s Int }
+
 {-# SPECIALISE supplyCredits ::
      Credits
   -> CreditThreshold
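The three-counter scheme described in the `$credittracking` comment can be illustrated with a small, single-threaded sketch. This is not the module's code. It assumes plain `IORef`s in place of `PrimVar`, omits the 'mergeState' lock, and assumes that reaching the threshold takes the whole unspent balance. The names `Counters`, `newCounters`, `supplyCreditsSketch`, `snapshot` and the `doSteps` callback are hypothetical.

```haskell
import Control.Monad (when)
import Data.IORef

-- Hypothetical stand-ins for the three counters (the real code uses PrimVar).
data Counters = Counters
  { unspentRef :: IORef Int  -- credits supplied but not yet spent
  , spentRef   :: IORef Int  -- credits already spent on merge work
  , stepsRef   :: IORef Int  -- merge steps actually performed
  }

newCounters :: IO Counters
newCounters = Counters <$> newIORef 0 <*> newIORef 0 <*> newIORef 0

-- | Supply credits. @doSteps n@ is an assumed callback that performs at
-- least @n@ merge steps and returns the number actually performed.
supplyCreditsSketch :: Int -> (Int -> IO Int) -> Counters -> Int -> IO ()
supplyCreditsSketch threshold doSteps cs credits = do
  -- Atomically add the credits; if the threshold is reached, take the
  -- whole balance (an assumption of this sketch), otherwise take nothing.
  taken <- atomicModifyIORef' (unspentRef cs) $ \u ->
    let u' = u + credits
    in if u' >= threshold then (0, u') else (u', 0)
  when (taken > 0) $ do
    -- The real code holds the 'mergeState' lock here; the sketch does not.
    spent <- readIORef (spentRef cs)
    steps <- readIORef (stepsRef cs)
    let surplus = steps - spent            -- extra steps from earlier batches
        needed  = max 0 (taken - surplus)  -- steps still owed for this batch
    done <- if needed > 0 then doSteps needed else pure 0
    -- Record steps before spent credits, so that
    -- recorded steps performed >= recorded spent credits always holds.
    writeIORef (stepsRef cs) (steps + done)
    writeIORef (spentRef cs) (spent + taken)

-- | Read (unspent, spent, steps performed).
snapshot :: Counters -> IO (Int, Int, Int)
snapshot cs =
  (,,) <$> readIORef (unspentRef cs)
       <*> readIORef (spentRef cs)
       <*> readIORef (stepsRef cs)
```

With a threshold of 100 and a `doSteps` that always overshoots by 2, supplying 60, 50 and then 100 credits leaves 0 unspent, 210 spent and 212 steps performed. The second batch only asks for 98 steps, because the first batch left a surplus of 2.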