@@ -19,7 +19,9 @@ module Database.LSMTree.Internal.MergingRun (
19
19
, CreditThreshold (.. )
20
20
, UnspentCreditsVar (.. )
21
21
, SpentCreditsVar (.. )
22
- , TotalStepsVar (.. )
22
+ , StepsPerformedVar (.. )
23
+ , readUnspentCredits
24
+ , readSpentCredits
23
25
24
26
-- * Internal state
25
27
, MergingRunState (.. )
@@ -54,28 +56,28 @@ import System.FS.API (HasFS)
54
56
import System.FS.BlockIO.API (HasBlockIO )
55
57
56
58
data MergingRun m h = MergingRun {
57
- mergeNumRuns :: ! NumRuns
59
+ mergeNumRuns :: ! NumRuns
58
60
-- | Sum of number of entries in the input runs
59
- , mergeNumEntries :: ! NumEntries
61
+ , mergeNumEntries :: ! NumEntries
60
62
61
63
-- See $credittracking
62
64
63
65
-- | The current number of credits supplied but as yet /unspent/.
64
- , mergeUnspentCredits :: ! (UnspentCreditsVar (PrimState m ))
66
+ , mergeUnspentCreditsVar :: ! (UnspentCreditsVar (PrimState m ))
65
67
-- | The current number of credits supplied but already spent. Note that
66
68
-- the total number of credits supplied is this plus the unspent credits.
67
- , mergeSpentCredits :: ! (SpentCreditsVar (PrimState m ))
69
+ , mergeSpentCreditsVar :: ! (SpentCreditsVar (PrimState m ))
68
70
-- | The current number of merging steps actually performed. This is
69
71
-- always at least as big as the total number of credits supplied.
70
- , mergeStepsPerformed :: ! (TotalStepsVar (PrimState m ))
72
+ , mergeStepsPerformedVar :: ! (StepsPerformedVar (PrimState m ))
71
73
72
74
-- | A variable that caches knowledge about whether the merge has been
73
75
-- completed. If 'MergeKnownCompleted', then we are sure the merge has
74
76
-- been completed, otherwise if 'MergeMaybeCompleted' we have to check the
75
77
-- 'MergingRunState'.
76
- , mergeKnownCompleted :: ! (MutVar (PrimState m ) MergeKnownCompleted )
77
- , mergeState :: ! (StrictMVar m (MergingRunState m h ))
78
- , mergeRefCounter :: ! (RefCounter m )
78
+ , mergeKnownCompleted :: ! (MutVar (PrimState m ) MergeKnownCompleted )
79
+ , mergeState :: ! (StrictMVar m (MergingRunState m h ))
80
+ , mergeRefCounter :: ! (RefCounter m )
79
81
}
80
82
81
83
instance RefCounted m (MergingRun m h ) where
@@ -172,9 +174,9 @@ unsafeNew ::
172
174
-> MergingRunState m h
173
175
-> m (Ref (MergingRun m h ))
174
176
unsafeNew mergeNumRuns mergeNumEntries knownCompleted state = do
175
- mergeUnspentCredits <- UnspentCreditsVar <$> newPrimVar 0
176
- mergeSpentCredits <- SpentCreditsVar <$> newPrimVar 0
177
- mergeStepsPerformed <- TotalStepsVar <$> newPrimVar 0
177
+ mergeUnspentCreditsVar <- UnspentCreditsVar <$> newPrimVar 0
178
+ mergeSpentCreditsVar <- SpentCreditsVar <$> newPrimVar 0
179
+ mergeStepsPerformedVar <- StepsPerformedVar <$> newPrimVar 0
178
180
case state of
179
181
OngoingMerge {} -> assert (knownCompleted == MergeMaybeCompleted ) (pure () )
180
182
CompletedMerge {} -> pure ()
@@ -184,9 +186,9 @@ unsafeNew mergeNumRuns mergeNumEntries knownCompleted state = do
184
186
MergingRun {
185
187
mergeNumRuns
186
188
, mergeNumEntries
187
- , mergeUnspentCredits
188
- , mergeSpentCredits
189
- , mergeStepsPerformed
189
+ , mergeUnspentCreditsVar
190
+ , mergeSpentCreditsVar
191
+ , mergeStepsPerformedVar
190
192
, mergeKnownCompleted
191
193
, mergeState
192
194
, mergeRefCounter
@@ -245,7 +247,7 @@ duplicateRuns (DeRef mr) =
245
247
246
248
* credits unspent ('UnspentCreditsVar')
247
249
* credits spent ('SpentCreditsVar')
248
- * steps performed ('TotalStepsVar ')
250
+ * steps performed ('StepsPerformedVar ')
249
251
250
252
The credits supplied is the sum of the credits spent and unspent. The credits
251
253
supplied and the steps performed will be close but not exactly the same in
@@ -326,9 +328,26 @@ newtype Credits = Credits Int
326
328
-- achieving good (concurrent) performance.
327
329
newtype CreditThreshold = CreditThreshold { getCreditThreshold :: Int }
328
330
329
- newtype UnspentCreditsVar s = UnspentCreditsVar { getUnspentCreditsVar :: PrimVar s Int }
330
- newtype SpentCreditsVar s = SpentCreditsVar { getSpentCreditsVar :: PrimVar s Int }
331
- newtype TotalStepsVar s = TotalStepsVar { getTotalStepsVar :: PrimVar s Int }
331
+ newtype UnspentCreditsVar s = UnspentCreditsVar (PrimVar s Int )
332
+ newtype SpentCreditsVar s = SpentCreditsVar (PrimVar s Int )
333
+ newtype StepsPerformedVar s = StepsPerformedVar (PrimVar s Int )
334
+
335
+ {-# INLINE readUnspentCredits #-}
336
+ {-# INLINE readSpentCredits #-}
337
+ {-# INLINE readStepsPerformed #-}
338
+ readUnspentCredits :: PrimMonad m => UnspentCreditsVar (PrimState m ) -> m Int
339
+ readSpentCredits :: PrimMonad m => SpentCreditsVar (PrimState m ) -> m Int
340
+ readStepsPerformed :: PrimMonad m => StepsPerformedVar (PrimState m ) -> m Int
341
+ readUnspentCredits (UnspentCreditsVar v) = readPrimVar v
342
+ readSpentCredits (SpentCreditsVar v) = readPrimVar v
343
+ readStepsPerformed (StepsPerformedVar v) = readPrimVar v
344
+
345
+ {-# INLINE writeSpentCredits #-}
346
+ {-# INLINE writeStepsPerformed #-}
347
+ writeSpentCredits :: PrimMonad m => SpentCreditsVar (PrimState m ) -> Int -> m ()
348
+ writeStepsPerformed :: PrimMonad m => StepsPerformedVar (PrimState m ) -> Int -> m ()
349
+ writeSpentCredits (SpentCreditsVar v) x = writePrimVar v x
350
+ writeStepsPerformed (StepsPerformedVar v) x = writePrimVar v x
332
351
333
352
{-# SPECIALISE supplyCredits ::
334
353
Credits
@@ -352,15 +371,15 @@ supplyCredits (Credits c) creditsThresh (DeRef MergingRun {..}) = do
352
371
else do
353
372
-- unspentCredits' is our /estimate/ of what the new total of unspent
354
373
-- credits is.
355
- Credits unspentCredits' <- addUnspentCredits mergeUnspentCredits (Credits c)
356
- totalSteps <- readPrimVar (getTotalStepsVar mergeStepsPerformed)
374
+ Credits unspentCredits' <- addUnspentCredits mergeUnspentCreditsVar (Credits c)
375
+ stepsPerformed <- readStepsPerformed mergeStepsPerformedVar
357
376
358
- if totalSteps + unspentCredits' >= unNumEntries mergeNumEntries then do
377
+ if stepsPerformed + unspentCredits' >= unNumEntries mergeNumEntries then do
359
378
-- We can finish the merge immediately
360
379
isMergeDone <-
361
- bracketOnError (takeAllUnspentCredits mergeUnspentCredits )
362
- (putBackUnspentCredits mergeUnspentCredits )
363
- (stepMerge mergeSpentCredits mergeStepsPerformed
380
+ bracketOnError (takeAllUnspentCredits mergeUnspentCreditsVar )
381
+ (putBackUnspentCredits mergeUnspentCreditsVar )
382
+ (stepMerge mergeSpentCreditsVar mergeStepsPerformedVar
364
383
mergeState)
365
384
when isMergeDone $ completeMerge mergeState mergeKnownCompleted
366
385
else if unspentCredits' >= getCreditThreshold creditsThresh then do
@@ -373,10 +392,10 @@ supplyCredits (Credits c) creditsThresh (DeRef MergingRun {..}) = do
373
392
-- credits as we took, even if the merge has progressed. See Note
374
393
-- [Merge Batching] to see why this is okay.
375
394
bracketOnError
376
- (tryTakeUnspentCredits mergeUnspentCredits creditsThresh (Credits unspentCredits'))
377
- (mapM_ (putBackUnspentCredits mergeUnspentCredits )) $ \ case
395
+ (tryTakeUnspentCredits mergeUnspentCreditsVar creditsThresh (Credits unspentCredits'))
396
+ (mapM_ (putBackUnspentCredits mergeUnspentCreditsVar )) $ \ case
378
397
Nothing -> pure False
379
- Just c' -> stepMerge mergeSpentCredits mergeStepsPerformed
398
+ Just c' -> stepMerge mergeSpentCreditsVar mergeStepsPerformedVar
380
399
mergeState c'
381
400
382
401
-- If we just finished the merge, then we convert the output of the
@@ -419,7 +438,7 @@ addUnspentCredits (UnspentCreditsVar !var) (Credits c) =
419
438
--
420
439
-- Nothing can be returned if the variable has already gone below the threshold,
421
440
-- which may happen if another thread is concurrently doing the same loop on
422
- -- 'mergeUnspentCredits '.
441
+ -- 'mergeUnspentCreditsVar '.
423
442
tryTakeUnspentCredits ::
424
443
PrimMonad m
425
444
=> UnspentCreditsVar (PrimState m )
@@ -463,42 +482,41 @@ takeAllUnspentCredits ::
463
482
PrimMonad m
464
483
=> UnspentCreditsVar (PrimState m )
465
484
-> m Credits
466
- takeAllUnspentCredits (UnspentCreditsVar ! unspentCreditsVar) = do
467
- prev <- readPrimVar unspentCreditsVar
485
+ takeAllUnspentCredits
486
+ unspentCreditsVar@ (UnspentCreditsVar ! var) = do
487
+ prev <- readUnspentCredits unspentCreditsVar
468
488
casLoop prev
469
489
where
470
490
casLoop ! prev = do
471
- prev' <- casInt unspentCreditsVar prev 0
491
+ prev' <- casInt var prev 0
472
492
if prev' == prev then
473
493
pure (Credits prev)
474
494
else
475
495
casLoop prev'
476
496
477
497
{-# SPECIALISE stepMerge ::
478
498
SpentCreditsVar RealWorld
479
- -> TotalStepsVar RealWorld
499
+ -> StepsPerformedVar RealWorld
480
500
-> StrictMVar IO (MergingRunState IO h)
481
501
-> Credits
482
502
-> IO Bool #-}
483
503
stepMerge ::
484
504
(MonadMVar m , MonadMask m , MonadSTM m , MonadST m )
485
505
=> SpentCreditsVar (PrimState m )
486
- -> TotalStepsVar (PrimState m )
506
+ -> StepsPerformedVar (PrimState m )
487
507
-> StrictMVar m (MergingRunState m h )
488
508
-> Credits
489
509
-> m Bool
490
- stepMerge (SpentCreditsVar spentCreditsVar)
491
- (TotalStepsVar totalStepsVar)
492
- mergeVar (Credits c) =
510
+ stepMerge spentCreditsVar stepsPerformedVar mergeVar (Credits c) =
493
511
withMVar mergeVar $ \ case
494
512
CompletedMerge {} -> pure False
495
513
(OngoingMerge _rs m) -> do
496
- totalSteps <- readPrimVar totalStepsVar
497
- spentCredits <- readPrimVar spentCreditsVar
514
+ stepsPerformed <- readStepsPerformed stepsPerformedVar
515
+ spentCredits <- readSpentCredits spentCreditsVar
498
516
499
517
-- If we previously performed too many merge steps, then we perform
500
518
-- fewer now.
501
- let stepsToDo = max 0 (spentCredits + c - totalSteps )
519
+ let stepsToDo = max 0 (spentCredits + c - stepsPerformed )
502
520
-- Merge.steps guarantees that @stepsDone >= stepsToDo@ /unless/ the
503
521
-- merge was just now finished.
504
522
(stepsDone, stepResult) <- Merge. steps m stepsToDo
@@ -509,17 +527,17 @@ stepMerge (SpentCreditsVar spentCreditsVar)
509
527
510
528
-- This should be the only point at which we write to these variables.
511
529
--
512
- -- It is guaranteed that @totalSteps ' >= spentCredits'@ /unless/ the
530
+ -- It is guaranteed that @stepsPerformed ' >= spentCredits'@ /unless/ the
513
531
-- merge was just now finished.
514
- let totalSteps ' = totalSteps + stepsDone
515
- let spentCredits' = spentCredits + c
532
+ let ! stepsPerformed ' = stepsPerformed + stepsDone
533
+ let ! spentCredits' = spentCredits + c
516
534
-- It is guaranteed that
517
- -- @readPrimVar totalStepsVar >= readPrimVar spentCreditsVar@,
535
+ -- @readStepsPerformed stepsPerformedVar >= readSpentCredits spentCreditsVar@,
518
536
-- /unless/ the merge was just now finished.
519
- writePrimVar totalStepsVar $! totalSteps '
520
- writePrimVar spentCreditsVar $! spentCredits'
537
+ writeStepsPerformed stepsPerformedVar stepsPerformed '
538
+ writeSpentCredits spentCreditsVar spentCredits'
521
539
assert (case stepResult of
522
- MergeInProgress -> totalSteps ' >= spentCredits'
540
+ MergeInProgress -> stepsPerformed ' >= spentCredits'
523
541
MergeDone -> True
524
542
) $ pure ()
525
543
@@ -559,9 +577,9 @@ expectCompleted (DeRef MergingRun {..}) = do
559
577
knownCompleted <- readMutVar mergeKnownCompleted
560
578
-- The merge is not guaranteed to be complete, so we do the remaining steps
561
579
when (knownCompleted == MergeMaybeCompleted ) $ do
562
- totalSteps <- readPrimVar (getTotalStepsVar mergeStepsPerformed)
563
- let ! credits = Credits (unNumEntries mergeNumEntries - totalSteps )
564
- isMergeDone <- stepMerge mergeSpentCredits mergeStepsPerformed
580
+ stepsPerformed <- readStepsPerformed mergeStepsPerformedVar
581
+ let ! credits = Credits (unNumEntries mergeNumEntries - stepsPerformed )
582
+ isMergeDone <- stepMerge mergeSpentCreditsVar mergeStepsPerformedVar
565
583
mergeState credits
566
584
when isMergeDone $ completeMerge mergeState mergeKnownCompleted
567
585
-- TODO: can we think of a check to see if we did not do too much work
0 commit comments