Skip to content

Commit 4e68cd3

Browse files
committed
Better encapsulate MergingRun internals
Provide a couple accessor functions and hide the rest (except the constructor needed for deriving Generic in the tests).
1 parent 1ad6513 commit 4e68cd3

File tree

3 files changed

+30
-21
lines changed

3 files changed

+30
-21
lines changed

src/Database/LSMTree/Internal/MergeSchedule.hs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -818,7 +818,7 @@ scaleCreditsForMerge LevelTiering _ (Credits c) =
818818
-- runs come in).
819819
MR.Credits (c * (1 + 4))
820820

821-
scaleCreditsForMerge LevelLevelling (DeRef mr) (Credits c) =
821+
scaleCreditsForMerge LevelLevelling mr (Credits c) =
822822
-- A levelling merge has 1 input run and one resident run, which is (up
823823
-- to) 4x bigger than the others. It needs to be completed before
824824
-- another run comes in.
@@ -828,7 +828,7 @@ scaleCreditsForMerge LevelLevelling (DeRef mr) (Credits c) =
828828
-- worst-case upper bound by looking at the sizes of the input runs.
829829
-- As as result, merge work would/could be more evenly distributed over
830830
-- time when the resident run is smaller than the worst case.
831-
let NumRuns n = MR.mergeNumRuns mr
831+
let NumRuns n = MR.numRuns mr
832832
-- same as division rounding up: ceiling (c * n / 4)
833833
in MR.Credits ((c * n + 3) `div` 4)
834834

src/Database/LSMTree/Internal/MergingRun.hs

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,25 +8,27 @@
88
-- | An incremental merge of multiple runs.
99
module Database.LSMTree.Internal.MergingRun (
1010
-- * Merging run
11-
MergingRun (..)
11+
MergingRun
1212
, NumRuns (..)
1313
, new
1414
, newCompleted
1515
, duplicateRuns
1616
, supplyCredits
1717
, expectCompleted
18+
, snapshot
19+
, numRuns
1820

1921
-- * Credit tracking
2022
-- $credittracking
2123
, Credits (..)
2224
, CreditThreshold (..)
2325
, SuppliedCredits (..)
24-
, atomicReadSuppliedCredits
2526

2627
-- * Concurrency
2728
-- $concurrency
2829

2930
-- * Internal state
31+
, pattern MergingRun
3032
, MergingRunState (..)
3133
, MergeKnownCompleted (..)
3234
, CreditsVar (..)
@@ -220,6 +222,24 @@ duplicateRuns (DeRef mr) =
220222
OngoingMerge rs _ -> withActionRegistry $ \reg ->
221223
V.mapM (\r -> withRollback reg (dupRef r) releaseRef) rs
222224

225+
-- | Take a snapshot of the state of a merging run.
226+
snapshot ::
227+
(PrimMonad m, MonadMVar m)
228+
=> Ref (MergingRun m h)
229+
-> m (MergingRunState m h,
230+
SuppliedCredits,
231+
NumRuns,
232+
NumEntries)
233+
snapshot (DeRef MergingRun {..}) = do
234+
state <- readMVar mergeState
235+
(SpentCredits spent,
236+
UnspentCredits unspent) <- atomicReadCredits mergeCreditsVar
237+
let supplied = SuppliedCredits (spent + unspent)
238+
return (state, supplied, mergeNumRuns, mergeNumEntries)
239+
240+
numRuns :: Ref (MergingRun m h) -> NumRuns
241+
numRuns (DeRef MergingRun {mergeNumRuns}) = mergeNumRuns
242+
223243
{-------------------------------------------------------------------------------
224244
Credits
225245
-------------------------------------------------------------------------------}
@@ -476,19 +496,6 @@ atomicReadCredits ::
476496
atomicReadCredits (CreditsVar v) =
477497
unpackCreditsPair <$> atomicReadInt v
478498

479-
{-# INLINE atomicReadSuppliedCredits #-}
480-
atomicReadSuppliedCredits ::
481-
PrimMonad m
482-
=> CreditsVar (PrimState m)
483-
-> m SuppliedCredits
484-
atomicReadSuppliedCredits (CreditsVar v) = do
485-
cp <- atomicReadInt v
486-
let !supplied =
487-
case cp of
488-
CreditsPair (SpentCredits spent)
489-
(UnspentCredits unspent) -> spent + unspent
490-
return (SuppliedCredits supplied)
491-
492499
{-# INLINE atomicModifyInt #-}
493500
-- | Atomically modify a single mutable integer variable, using a CAS loop.
494501
atomicModifyInt ::

src/Database/LSMTree/Internal/Snapshot.hs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -193,14 +193,16 @@ toSnapIncomingRun ::
193193
=> IncomingRun m h
194194
-> m (SnapIncomingRun (Ref (Run m h)))
195195
toSnapIncomingRun (Single r) = pure (SnapSingleRun r)
196-
toSnapIncomingRun (Merging mergePolicy (DeRef MR.MergingRun {..})) = do
196+
toSnapIncomingRun (Merging mergePolicy mergingRun) = do
197197
-- We need to know how many credits were spend and yet unspent so we can
198198
-- restore merge work on snapshot load. No need to snapshot the contents
199199
-- of totalStepsVar here, since we still start counting from 0 again when
200200
-- loading the snapshot.
201-
MR.SuppliedCredits (MR.Credits suppliedCredits)
202-
<- MR.atomicReadSuppliedCredits mergeCreditsVar
203-
smrs <- toSnapMergingRunState <$> readMVar mergeState
201+
(mergingRunState,
202+
MR.SuppliedCredits (MR.Credits suppliedCredits),
203+
mergeNumRuns,
204+
mergeNumEntries) <- MR.snapshot mergingRun
205+
let smrs = toSnapMergingRunState mergingRunState
204206
pure $
205207
SnapMergingRun
206208
mergePolicy

0 commit comments

Comments
 (0)