Skip to content

Commit 99de0ef

Browse files
committed
Refactor: move spent credits var up from MergingRunState to MergingRun
The MergingRun contains the other two credit tracking vars. It makes more sense for all three to live together in once place. And there's no need for one of the vars to disappear when we move from the OngoingMerge state to the CompletedMerge state.
1 parent 6f99a27 commit 99de0ef

File tree

4 files changed

+64
-60
lines changed

4 files changed

+64
-60
lines changed

src/Database/LSMTree/Internal/MergingRun.hs

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ data MergingRun m h = MergingRun {
5555
, mergeNumEntries :: !NumEntries
5656
-- | The number of currently /unspent/ credits
5757
, mergeUnspentCredits :: !(UnspentCreditsVar (PrimState m))
58+
-- | The total number of spent credits.
59+
, mergeSpentCredits :: !(SpentCreditsVar (PrimState m))
5860
-- | The total number of performed merging steps.
5961
, mergeStepsPerformed :: !(TotalStepsVar (PrimState m))
6062
-- | A variable that caches knowledge about whether the merge has been
@@ -77,8 +79,12 @@ newtype UnspentCreditsVar s = UnspentCreditsVar {
7779
getUnspentCreditsVar :: PrimVar s Int
7880
}
7981

82+
newtype SpentCreditsVar s = SpentCreditsVar {
83+
getSpentCreditsVar :: PrimVar s Int
84+
}
85+
8086
newtype TotalStepsVar s = TotalStepsVar {
81-
getTotalStepsVar :: PrimVar s Int
87+
getTotalStepsVar :: PrimVar s Int
8288
}
8389

8490
data MergingRunState m h =
@@ -88,14 +94,8 @@ data MergingRunState m h =
8894
| OngoingMerge
8995
!(V.Vector (Ref (Run m h)))
9096
-- ^ Input runs
91-
!(SpentCreditsVar (PrimState m))
92-
-- ^ The total number of spent credits.
9397
!(Merge m h)
9498

95-
newtype SpentCreditsVar s = SpentCreditsVar {
96-
getSpentCreditsVar :: PrimVar s Int
97-
}
98-
9999
data MergeKnownCompleted = MergeKnownCompleted | MergeMaybeCompleted
100100
deriving stock (Show, Eq, Read)
101101

@@ -139,9 +139,8 @@ new hfs hbio resolve caching alloc mergeType runPaths inputRuns =
139139
<$> Merge.new hfs hbio caching alloc mergeType resolve runPaths runs
140140
let numInputRuns = NumRuns $ V.length runs
141141
let numInputEntries = V.foldMap' Run.size runs
142-
spentCreditsVar <- SpentCreditsVar <$> newPrimVar 0
143142
unsafeNew numInputRuns numInputEntries MergeMaybeCompleted $
144-
OngoingMerge runs spentCreditsVar merge
143+
OngoingMerge runs merge
145144

146145
{-# SPECIALISE newCompleted ::
147146
NumRuns
@@ -176,6 +175,7 @@ unsafeNew ::
176175
-> m (Ref (MergingRun m h))
177176
unsafeNew mergeNumRuns mergeNumEntries knownCompleted state = do
178177
mergeUnspentCredits <- UnspentCreditsVar <$> newPrimVar 0
178+
mergeSpentCredits <- SpentCreditsVar <$> newPrimVar 0
179179
mergeStepsPerformed <- TotalStepsVar <$> newPrimVar 0
180180
case state of
181181
OngoingMerge{} -> assert (knownCompleted == MergeMaybeCompleted) (pure ())
@@ -187,6 +187,7 @@ unsafeNew mergeNumRuns mergeNumEntries knownCompleted state = do
187187
mergeNumRuns
188188
, mergeNumEntries
189189
, mergeUnspentCredits
190+
, mergeSpentCredits
190191
, mergeStepsPerformed
191192
, mergeKnownCompleted
192193
, mergeState
@@ -196,7 +197,7 @@ unsafeNew mergeNumRuns mergeNumEntries knownCompleted state = do
196197
finalise var = withMVar var $ \case
197198
CompletedMerge r ->
198199
releaseRef r
199-
OngoingMerge rs _ m -> do
200+
OngoingMerge rs m -> do
200201
V.forM_ rs releaseRef
201202
Merge.abort m
202203

@@ -211,8 +212,8 @@ duplicateRuns (DeRef mr) =
211212
-- We take the references while holding the MVar to make sure the MergingRun
212213
-- does not get completed concurrently before we are done.
213214
withMVar (mergeState mr) $ \case
214-
CompletedMerge r -> V.singleton <$> dupRef r
215-
OngoingMerge rs _ _ -> withActionRegistry $ \reg ->
215+
CompletedMerge r -> V.singleton <$> dupRef r
216+
OngoingMerge rs _ -> withActionRegistry $ \reg ->
216217
V.mapM (\r -> withRollback reg (dupRef r) releaseRef) rs
217218

218219
{-------------------------------------------------------------------------------
@@ -314,7 +315,8 @@ supplyCredits (Credits c) creditsThresh (DeRef MergingRun {..}) = do
314315
isMergeDone <-
315316
bracketOnError (takeAllUnspentCredits mergeUnspentCredits)
316317
(putBackUnspentCredits mergeUnspentCredits)
317-
(stepMerge mergeState mergeStepsPerformed)
318+
(stepMerge mergeSpentCredits mergeStepsPerformed
319+
mergeState)
318320
when isMergeDone $ completeMerge mergeState mergeKnownCompleted
319321
else if unspentCredits' >= getCreditThreshold creditsThresh then do
320322
-- We can do some merging work without finishing the merge immediately
@@ -329,7 +331,8 @@ supplyCredits (Credits c) creditsThresh (DeRef MergingRun {..}) = do
329331
(tryTakeUnspentCredits mergeUnspentCredits creditsThresh (Credits unspentCredits'))
330332
(mapM_ (putBackUnspentCredits mergeUnspentCredits)) $ \case
331333
Nothing -> pure False
332-
Just c' -> stepMerge mergeState mergeStepsPerformed c'
334+
Just c' -> stepMerge mergeSpentCredits mergeStepsPerformed
335+
mergeState c'
333336

334337
-- If we just finished the merge, then we convert the output of the
335338
-- merge into a new run. i.e., we complete the merge.
@@ -427,20 +430,24 @@ takeAllUnspentCredits (UnspentCreditsVar !unspentCreditsVar) = do
427430
casLoop prev'
428431

429432
{-# SPECIALISE stepMerge ::
430-
StrictMVar IO (MergingRunState IO h)
433+
SpentCreditsVar RealWorld
431434
-> TotalStepsVar RealWorld
435+
-> StrictMVar IO (MergingRunState IO h)
432436
-> Credits
433437
-> IO Bool #-}
434438
stepMerge ::
435439
(MonadMVar m, MonadMask m, MonadSTM m, MonadST m)
436-
=> StrictMVar m (MergingRunState m h)
440+
=> SpentCreditsVar (PrimState m)
437441
-> TotalStepsVar (PrimState m)
442+
-> StrictMVar m (MergingRunState m h)
438443
-> Credits
439444
-> m Bool
440-
stepMerge mergeVar (TotalStepsVar totalStepsVar) (Credits c) =
445+
stepMerge (SpentCreditsVar spentCreditsVar)
446+
(TotalStepsVar totalStepsVar)
447+
mergeVar (Credits c) =
441448
withMVar mergeVar $ \case
442449
CompletedMerge{} -> pure False
443-
(OngoingMerge _rs (SpentCreditsVar spentCreditsVar) m) -> do
450+
(OngoingMerge _rs m) -> do
444451
totalSteps <- readPrimVar totalStepsVar
445452
spentCredits <- readPrimVar spentCreditsVar
446453

@@ -486,7 +493,7 @@ completeMerge ::
486493
completeMerge mergeVar mergeKnownCompletedVar = do
487494
modifyMVarMasked_ mergeVar $ \case
488495
mrs@CompletedMerge{} -> pure $! mrs
489-
(OngoingMerge rs _spentCreditsVar m) -> do
496+
(OngoingMerge rs m) -> do
490497
-- first try to complete the merge before performing other side effects,
491498
-- in case the completion fails
492499
r <- Merge.complete m
@@ -509,7 +516,8 @@ expectCompleted (DeRef MergingRun {..}) = do
509516
when (knownCompleted == MergeMaybeCompleted) $ do
510517
totalSteps <- readPrimVar (getTotalStepsVar mergeStepsPerformed)
511518
let !credits = Credits (unNumEntries mergeNumEntries - totalSteps)
512-
isMergeDone <- stepMerge mergeState mergeStepsPerformed credits
519+
isMergeDone <- stepMerge mergeSpentCredits mergeStepsPerformed
520+
mergeState credits
513521
when isMergeDone $ completeMerge mergeState mergeKnownCompleted
514522
-- TODO: can we think of a check to see if we did not do too much work
515523
-- here?

src/Database/LSMTree/Internal/Snapshot.hs

Lines changed: 20 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -138,13 +138,13 @@ instance NFData r => NFData (SnapLevel r) where
138138
rnf (SnapLevel a b) = rnf a `seq` rnf b
139139

140140
data SnapIncomingRun r =
141-
SnapMergingRun !MergePolicyForLevel !NumRuns !NumEntries !UnspentCredits !(SnapMergingRunState r)
141+
SnapMergingRun !MergePolicyForLevel !NumRuns !NumEntries !UnspentCredits !SpentCredits !(SnapMergingRunState r)
142142
| SnapSingleRun !r
143143
deriving stock (Show, Eq, Functor, Foldable, Traversable)
144144

145145
instance NFData r => NFData (SnapIncomingRun r) where
146-
rnf (SnapMergingRun a b c d e) =
147-
rnf a `seq` rnf b `seq` rnf c `seq` rnf d `seq` rnf e
146+
rnf (SnapMergingRun a b c d e f) =
147+
rnf a `seq` rnf b `seq` rnf c `seq` rnf d `seq` rnf e `seq` rnf f
148148
rnf (SnapSingleRun a) = rnf a
149149

150150
-- | The total number of unspent credits. This total is used in combination with
@@ -156,12 +156,12 @@ newtype UnspentCredits = UnspentCredits { getUnspentCredits :: Int }
156156

157157
data SnapMergingRunState r =
158158
SnapCompletedMerge !r
159-
| SnapOngoingMerge !(V.Vector r) !SpentCredits !MergeType
159+
| SnapOngoingMerge !(V.Vector r) !MergeType
160160
deriving stock (Show, Eq, Functor, Foldable, Traversable)
161161

162162
instance NFData r => NFData (SnapMergingRunState r) where
163-
rnf (SnapCompletedMerge a) = rnf a
164-
rnf (SnapOngoingMerge a b c) = rnf a `seq` rnf b `seq` rnf c
163+
rnf (SnapCompletedMerge a) = rnf a
164+
rnf (SnapOngoingMerge a b) = rnf a `seq` rnf b
165165

166166
-- | The total number of spent credits. This total is used in combination with
167167
-- 'UnspentCedits' on snapshot load to restore merging work that was lost when
@@ -196,33 +196,28 @@ toSnapIncomingRun ::
196196
=> IncomingRun m h
197197
-> m (SnapIncomingRun (Ref (Run m h)))
198198
toSnapIncomingRun (Single r) = pure (SnapSingleRun r)
199-
-- We need to know how many credits were yet unspent so we can restore merge
200-
-- work on snapshot load. No need to snapshot the contents of totalStepsVar
201-
-- here, since we still start counting from 0 again when loading the snapshot.
202199
toSnapIncomingRun (Merging mergePolicy (DeRef MR.MergingRun {..})) = do
200+
-- We need to know how many credits were spend and yet unspent so we can
201+
-- restore merge work on snapshot load. No need to snapshot the contents
202+
-- of totalStepsVar here, since we still start counting from 0 again when
203+
-- loading the snapshot.
203204
unspentCredits <- readPrimVar (MR.getUnspentCreditsVar mergeUnspentCredits)
204-
smrs <- withMVar mergeState $ \mrs -> toSnapMergingRunState mrs
205+
spentCredits <- readPrimVar (MR.getSpentCreditsVar mergeSpentCredits)
206+
smrs <- toSnapMergingRunState <$> readMVar mergeState
205207
pure $
206208
SnapMergingRun
207209
mergePolicy
208210
mergeNumRuns
209211
mergeNumEntries
210212
(UnspentCredits unspentCredits)
213+
(SpentCredits spentCredits)
211214
smrs
212215

213-
{-# SPECIALISE toSnapMergingRunState ::
214-
MR.MergingRunState IO h
215-
-> IO (SnapMergingRunState (Ref (Run IO h))) #-}
216216
toSnapMergingRunState ::
217-
PrimMonad m
218-
=> MR.MergingRunState m h
219-
-> m (SnapMergingRunState (Ref (Run m h)))
220-
toSnapMergingRunState (MR.CompletedMerge r) = pure (SnapCompletedMerge r)
221-
-- We need to know how many credits were spent already so we can restore merge
222-
-- work on snapshot load.
223-
toSnapMergingRunState (MR.OngoingMerge rs (MR.SpentCreditsVar spentCreditsVar) m) = do
224-
spentCredits <- readPrimVar spentCreditsVar
225-
pure (SnapOngoingMerge rs (SpentCredits spentCredits) (Merge.mergeType m))
217+
MR.MergingRunState m h
218+
-> SnapMergingRunState (Ref (Run m h))
219+
toSnapMergingRunState (MR.CompletedMerge r) = SnapCompletedMerge r
220+
toSnapMergingRunState (MR.OngoingMerge rs m) = SnapOngoingMerge rs (Merge.mergeType m)
226221

227222
{-------------------------------------------------------------------------------
228223
Write Buffer
@@ -454,12 +449,13 @@ fromSnapLevels reg hfs hbio conf@TableConfig{..} uc resolve dir (SnapLevels leve
454449
-> m (IncomingRun m h)
455450
fromSnapIncomingRun (SnapSingleRun run) = do
456451
Single <$> dupRun run
457-
fromSnapIncomingRun (SnapMergingRun mpfl nr ne unspentCredits smrs) = do
452+
fromSnapIncomingRun (SnapMergingRun mpfl nr ne unspentCredits
453+
spentCredits smrs) = do
458454
Merging mpfl <$> case smrs of
459455
SnapCompletedMerge run ->
460456
withRollback reg (MR.newCompleted nr ne run) releaseRef
461457

462-
SnapOngoingMerge runs spentCredits mt -> do
458+
SnapOngoingMerge runs mt -> do
463459
rn <- uniqueToRunNumber <$> incrUniqCounter uc
464460
mr <- withRollback reg
465461
(MR.new hfs hbio resolve caching alloc mt (mkPath rn) runs)

src/Database/LSMTree/Internal/Snapshot/Codec.hs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -459,13 +459,14 @@ instance DecodeVersioned RunNumber where
459459
-- SnapIncomingRun
460460

461461
instance Encode (SnapIncomingRun RunNumber) where
462-
encode (SnapMergingRun mpfl nr ne uc smrs) =
463-
encodeListLen 6
462+
encode (SnapMergingRun mpfl nr ne uc sc smrs) =
463+
encodeListLen 7
464464
<> encodeWord 0
465465
<> encode mpfl
466466
<> encode nr
467467
<> encode ne
468468
<> encode uc
469+
<> encode sc
469470
<> encode smrs
470471
encode (SnapSingleRun x) =
471472
encodeListLen 2
@@ -477,9 +478,9 @@ instance DecodeVersioned (SnapIncomingRun RunNumber) where
477478
n <- decodeListLen
478479
tag <- decodeWord
479480
case (n, tag) of
480-
(6, 0) -> SnapMergingRun <$>
481+
(7, 0) -> SnapMergingRun <$>
481482
decodeVersioned v <*> decodeVersioned v <*> decodeVersioned v <*>
482-
decodeVersioned v <*> decodeVersioned v
483+
decodeVersioned v <*> decodeVersioned v <*> decodeVersioned v
483484
(2, 1) -> SnapSingleRun <$> decodeVersioned v
484485
_ -> fail ("[SnapMergingRun] Unexpected combination of list length and tag: " <> show (n, tag))
485486

@@ -520,11 +521,10 @@ instance Encode (SnapMergingRunState RunNumber) where
520521
encodeListLen 2
521522
<> encodeWord 0
522523
<> encode x
523-
encode (SnapOngoingMerge rs tc l) =
524-
encodeListLen 4
524+
encode (SnapOngoingMerge rs l) =
525+
encodeListLen 3
525526
<> encodeWord 1
526527
<> encode rs
527-
<> encode tc
528528
<> encode l
529529

530530
instance DecodeVersioned (SnapMergingRunState RunNumber) where
@@ -533,8 +533,8 @@ instance DecodeVersioned (SnapMergingRunState RunNumber) where
533533
tag <- decodeWord
534534
case (n, tag) of
535535
(2, 0) -> SnapCompletedMerge <$> decodeVersioned v
536-
(4, 1) -> SnapOngoingMerge <$>
537-
decodeVersioned v <*> decodeVersioned v <*> decodeVersioned v
536+
(3, 1) -> SnapOngoingMerge <$> decodeVersioned v
537+
<*> decodeVersioned v
538538
_ -> fail ("[SnapMergingRunState] Unexpected combination of list length and tag: " <> show (n, tag))
539539

540540
-- SpentCredits

test/Test/Database/LSMTree/Internal/Snapshot/Codec.hs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -278,12 +278,12 @@ deriving newtype instance Arbitrary RunNumber
278278
instance Arbitrary (SnapIncomingRun RunNumber) where
279279
arbitrary = oneof [
280280
SnapMergingRun <$> arbitrary <*> arbitrary <*> arbitrary
281-
<*> arbitrary <*> arbitrary
281+
<*> arbitrary <*> arbitrary <*> arbitrary
282282
, SnapSingleRun <$> arbitrary
283283
]
284-
shrink (SnapMergingRun a b c d e) =
285-
[ SnapMergingRun a' b' c' d' e'
286-
| (a', b', c', d', e') <- shrink (a, b, c, d, e) ]
284+
shrink (SnapMergingRun a b c d e f) =
285+
[ SnapMergingRun a' b' c' d' e' f'
286+
| (a', b', c', d', e', f') <- shrink (a, b, c, d, e, f) ]
287287
shrink (SnapSingleRun a) = SnapSingleRun <$> shrink a
288288

289289
deriving newtype instance Arbitrary NumRuns
@@ -297,11 +297,11 @@ deriving newtype instance Arbitrary UnspentCredits
297297
instance Arbitrary (SnapMergingRunState RunNumber) where
298298
arbitrary = oneof [
299299
SnapCompletedMerge <$> arbitrary
300-
, SnapOngoingMerge <$> arbitrary <*> arbitrary <*> arbitrary
300+
, SnapOngoingMerge <$> arbitrary <*> arbitrary
301301
]
302302
shrink (SnapCompletedMerge x) = SnapCompletedMerge <$> shrink x
303-
shrink (SnapOngoingMerge x y z) =
304-
[ SnapOngoingMerge x' y' z' | (x', y', z') <- shrink (x, y, z) ]
303+
shrink (SnapOngoingMerge x y) =
304+
[ SnapOngoingMerge x' y' | (x', y') <- shrink (x, y) ]
305305

306306
deriving newtype instance Arbitrary SpentCredits
307307

0 commit comments

Comments
 (0)