@@ -63,6 +63,8 @@ module ScheduledMerges (
63
63
LevelMergeType (.. ),
64
64
MergeCredit (.. ),
65
65
MergeDebt (.. ),
66
+ NominalCredit (.. ),
67
+ NominalDebt (.. ),
66
68
Run ,
67
69
runSize ,
68
70
supplyCreditsMergingTree ,
@@ -123,7 +125,9 @@ data Level s = Level !(IncomingRun s) ![Run]
123
125
-- single run without having to read the 'STRef', and secondly to make it easier
124
126
-- to avoid supplying merge credits. It's not essential, but simplifies things
125
127
-- somewhat.
126
- data IncomingRun s = Merging ! MergePolicy ! (MergingRun LevelMergeType s )
128
+ data IncomingRun s = Merging ! MergePolicy
129
+ ! NominalDebt ! (STRef s NominalCredit )
130
+ ! (MergingRun LevelMergeType s )
127
131
| Single ! Run
128
132
129
133
-- | The merge policy for a LSM level can be either tiering or levelling.
@@ -321,7 +325,7 @@ invariant (LSMContent _ levels ul) = do
321
325
mrs <- case ir of
322
326
Single r ->
323
327
return (CompletedMerge r)
324
- Merging mp (MergingRun mt _ ref) -> do
328
+ Merging mp _ _ (MergingRun mt _ ref) -> do
325
329
assertST $ mp == mergePolicyForLevel ln ls ul
326
330
&& mt == mergeTypeForLevel ls ul
327
331
readSTRef ref
@@ -520,7 +524,9 @@ assertST p = assert p $ return ()
520
524
--
521
525
522
526
-- | Credits for keeping track of merge progress. These credits
523
- -- correspond directly to merge steps performed.
527
+ -- correspond directly to merge steps performed. We also call these \"physical\"
528
+ -- credits (since they correspond to steps done), and as opposed to \"nominal\"
529
+ -- credits in 'NominalCredit' and 'NominalDebt'.
524
530
type Credit = Int
525
531
526
532
-- | Debt for keeping track of the total merge work to do.
@@ -640,19 +646,18 @@ mergeBatchSize = 32
640
646
-- Merging run abstraction
641
647
--
642
648
643
- newMergingRun :: IsMergeType t => Maybe Debt -> t -> [Run ] -> ST s (MergingRun t s )
644
- newMergingRun mdebt mergeType runs = do
649
+ newMergingRun :: IsMergeType t => t -> [Run ] -> ST s (MergingRun t s )
650
+ newMergingRun mergeType runs = do
645
651
assertST $ length runs > 1
646
652
-- in some cases, no merging is required at all
647
653
(debt, state) <- case filter (\ r -> runSize r > 0 ) runs of
648
654
[] -> let (r: _) = runs -- just re-use the empty input
649
655
in return (runSize r, CompletedMerge r)
650
656
[r] -> return (runSize r, CompletedMerge r)
651
657
rs -> do
652
- let ! cost = sum (map runSize rs)
653
- ! debt = case mdebt of
654
- Nothing -> cost
655
- Just d -> assert (d >= cost) d
658
+ -- The (physical) debt is always exactly the cost (merge steps),
659
+ -- which is the sum of run lengths in elements.
660
+ let ! debt = sum (map runSize rs)
656
661
let merged = mergek mergeType rs -- deliberately lazy
657
662
return (debt, OngoingMerge zeroMergeCredit rs merged)
658
663
MergingRun mergeType (MergeDebt debt) <$> newSTRef state
@@ -715,6 +720,15 @@ supplyCreditsMergingRun =
715
720
writeSTRef ref (OngoingMerge mergeCredit' rs r)
716
721
return 0
717
722
723
+ suppliedCreditMergingRun :: MergingRun t s -> ST s Credit
724
+ suppliedCreditMergingRun (MergingRun _ d ref) =
725
+ readSTRef ref >>= \ case
726
+ CompletedMerge {} ->
727
+ let MergeDebt { totalDebt } = d in
728
+ return totalDebt
729
+ OngoingMerge MergeCredit {spentCredits, unspentCredits} _ _ ->
730
+ return (spentCredits + unspentCredits)
731
+
718
732
-------------------------------------------------------------------------------
719
733
-- LSM handle
720
734
--
@@ -757,7 +771,7 @@ update tr (LSMHandle scr lsmr) k op = do
757
771
sc <- readSTRef scr
758
772
content@ (LSMContent wb ls unionLevel) <- readSTRef lsmr
759
773
modifySTRef' scr (+ 1 )
760
- supplyCreditsLevels 1 ls
774
+ supplyCreditsLevels ( NominalCredit 1 ) ls
761
775
invariant content
762
776
let wb' = Map. insertWith combine k op wb
763
777
if bufferSize wb' >= maxBufferSize
@@ -769,7 +783,7 @@ update tr (LSMHandle scr lsmr) k op = do
769
783
else
770
784
writeSTRef lsmr (LSMContent wb' ls unionLevel)
771
785
772
- supplyMergeCredits :: LSM s -> Credit -> ST s ()
786
+ supplyMergeCredits :: LSM s -> NominalCredit -> ST s ()
773
787
supplyMergeCredits (LSMHandle scr lsmr) credits = do
774
788
content@ (LSMContent _ ls _) <- readSTRef lsmr
775
789
modifySTRef' scr (+ 1 )
@@ -972,48 +986,79 @@ lookupsTree k = go
972
986
lookupBatch' = lookupBatch Nothing k
973
987
974
988
-------------------------------------------------------------------------------
975
- -- Updates
989
+ -- Nominal credits
976
990
--
977
991
992
+ newtype NominalCredit = NominalCredit Credit
993
+ deriving stock Show
994
+
995
+ newtype NominalDebt = NominalDebt Credit
996
+ deriving stock Show
997
+
978
998
-- TODO: If there is a UnionLevel, there is no (more expensive) last level merge
979
999
-- in the regular levels, so a little less merging work is required than if
980
1000
-- there was no UnionLevel. It might be a good idea to spend this "saved" work
981
1001
-- on the UnionLevel instead. This makes future lookups cheaper and ensures that
982
1002
-- we can get rid of the UnionLevel at some point, even if a user just keeps
983
1003
-- inserting without calling 'supplyUnionCredits'.
984
- supplyCreditsLevels :: Credit -> Levels s -> ST s ()
985
- supplyCreditsLevels unscaled =
1004
+ supplyCreditsLevels :: NominalCredit -> Levels s -> ST s ()
1005
+ supplyCreditsLevels nominalDeposit =
986
1006
traverse_ $ \ (Level ir _rs) -> do
987
1007
case ir of
988
1008
Single {} -> return ()
989
- Merging mp mr -> do
990
- factor <- creditsForMerge mp mr
991
- let credits = ceiling (fromIntegral unscaled * factor)
992
- when (credits > 0 ) $ do
993
- _ <- supplyCreditsMergingRun credits mr
994
- -- we don't mind leftover credits, each level completes independently
995
- return ()
996
-
997
- -- | The general case (and thus worst case) of how many merge credits we need
998
- -- for a level. This is based on the merging policy at the level.
999
- --
1000
- creditsForMerge :: MergePolicy -> MergingRun t s -> ST s Rational
1009
+ Merging _mp nominalDebt nominalCreditVar
1010
+ mr@ (MergingRun _ physicalDebt _) -> do
1011
+
1012
+ nominalCredit <- depositNominalCredit
1013
+ nominalDebt nominalCreditVar nominalDeposit
1014
+ physicalCredit <- suppliedCreditMergingRun mr
1015
+ let ! physicalCredit' = scaleNominalToPhysicalCredit
1016
+ nominalDebt physicalDebt nominalCredit
1017
+ -- Our target physicalCredit' could actually be less than the
1018
+ -- actual current physicalCredit if other tables were contributing
1019
+ -- credits to the shared merge.
1020
+ ! physicalDeposit = physicalCredit' - physicalCredit
1021
+
1022
+ -- So we may have a zero or negative deposit, which we ignore.
1023
+ when (physicalDeposit > 0 ) $ do
1024
+ leftoverCredits <- supplyCreditsMergingRun physicalDeposit mr
1025
+ -- For merges at ordinary levels (not unions) we expect to hit the
1026
+ -- debt limit exactly and never exceed it.
1027
+ assert (leftoverCredits == 0 ) $ return ()
1028
+
1029
+ scaleNominalToPhysicalCredit ::
1030
+ NominalDebt
1031
+ -> MergeDebt
1032
+ -> NominalCredit
1033
+ -> Credit
1034
+ scaleNominalToPhysicalCredit (NominalDebt nominalDebt)
1035
+ MergeDebt { totalDebt = physicalDebt }
1036
+ (NominalCredit nominalCredit) =
1037
+ floor $ toRational nominalCredit * toRational physicalDebt
1038
+ / toRational nominalDebt
1039
+ -- This specification using Rational as an intermediate representation can
1040
+ -- be implemented efficiently using only integer operations.
1041
+
1042
+ depositNominalCredit ::
1043
+ NominalDebt
1044
+ -> STRef s NominalCredit
1045
+ -> NominalCredit
1046
+ -> ST s NominalCredit
1047
+ depositNominalCredit (NominalDebt nominalDebt)
1048
+ nominalCreditVar
1049
+ (NominalCredit deposit) = do
1050
+ NominalCredit before <- readSTRef nominalCreditVar
1051
+ -- Depositing _could_ leave the credit higher than the debt, because
1052
+ -- sometimes under-full runs mean we don't shuffle runs down the levels
1053
+ -- as quickly as the worst case. So here we do just drop excess nominal
1054
+ -- credits.
1055
+ let ! after = NominalCredit (min (before + deposit) nominalDebt)
1056
+ writeSTRef nominalCreditVar after
1057
+ return after
1001
1058
1002
- -- A levelling merge has 1 input run and one resident run, which is (up to) 4x
1003
- -- bigger than the others.
1004
- -- It needs to be completed before another run comes in.
1005
- creditsForMerge MergePolicyLevelling _ =
1006
- return $ (1 + 4 ) / 1
1007
-
1008
- -- A tiering merge has 5 runs at most (once could be held back to merged again)
1009
- -- and must be completed before the level is full (once 4 more runs come in).
1010
- creditsForMerge MergePolicyTiering (MergingRun _ _ ref) = do
1011
- readSTRef ref >>= \ case
1012
- CompletedMerge _ -> return 0
1013
- OngoingMerge _ rs _ -> do
1014
- let numRuns = length rs
1015
- assertST $ numRuns `elem` [4 , 5 ]
1016
- return $ fromIntegral numRuns / 4
1059
+ -------------------------------------------------------------------------------
1060
+ -- Updates
1061
+ --
1017
1062
1018
1063
increment :: forall s . Tracer (ST s ) Event
1019
1064
-> Counter -> Run -> Levels s -> UnionLevel s -> ST s (Levels s )
@@ -1035,7 +1080,7 @@ increment tr sc run0 ls0 ul = do
1035
1080
go ! ln incoming (Level ir rs : ls) = do
1036
1081
r <- case ir of
1037
1082
Single r -> return r
1038
- Merging mergePolicy mr -> do
1083
+ Merging mergePolicy _ _ mr -> do
1039
1084
r <- expectCompletedMergingRun mr
1040
1085
traceWith tr' MergeCompletedEvent {
1041
1086
mergePolicy,
@@ -1094,26 +1139,38 @@ newLevelMerge :: Tracer (ST s) EventDetail
1094
1139
-> [Run ] -> ST s (IncomingRun s )
1095
1140
newLevelMerge _ _ _ _ [r] = return (Single r)
1096
1141
newLevelMerge tr level mergePolicy mergeType rs = do
1142
+ assertST (length rs `elem` [4 , 5 ])
1143
+ mergingRun@ (MergingRun _ physicalDebt _) <- newMergingRun mergeType rs
1144
+ assertST (totalDebt physicalDebt <= maxPhysicalDebt)
1097
1145
traceWith tr MergeStartedEvent {
1098
1146
mergePolicy,
1099
1147
mergeType,
1100
- mergeDebt = debt ,
1148
+ mergeDebt = totalDebt physicalDebt ,
1101
1149
mergeRunsSize = map runSize rs
1102
1150
}
1103
- assertST ( length rs `elem` [ 4 , 5 ] )
1104
- Merging mergePolicy <$> newMergingRun ( Just debt) mergeType rs
1151
+ nominalCreditVar <- newSTRef ( NominalCredit 0 )
1152
+ pure ( Merging mergePolicy nominalDebt nominalCreditVar mergingRun)
1105
1153
where
1106
- -- How much we need to discharge before the merge can be guaranteed
1107
- -- complete. More precisely, this is the maximum amount a merge at this
1108
- -- level could need. While the real @cost@ of a merge would lead to merges
1109
- -- finishing early, the overestimation @debt@ means that in this prototype
1110
- -- merges will only complete at the last possible moment.
1111
- -- Note that for levelling this is includes the single run in the current
1112
- -- level.
1113
- debt = case mergePolicy of
1114
- MergePolicyLevelling -> 4 * tieringRunSize (level- 1 )
1115
- + levellingRunSize level
1116
- MergePolicyTiering -> length rs * tieringRunSize (level- 1 )
1154
+ -- The nominal debt equals the minimum of credits we will supply before we
1155
+ -- expect the merge to complete. This is the same as the number of updates
1156
+ -- in a run that gets moved to this level.
1157
+ nominalDebt = NominalDebt (tieringRunSize level)
1158
+
1159
+ -- The physical debt is the number of actual merge steps we will need to
1160
+ -- perform before the merge is complete. This is always the sum of the
1161
+ -- lengths of the input runs.
1162
+ --
1163
+ -- As we supply nominal credit, we scale them and supply physical credits,
1164
+ -- such that we pay off the physical and nominal debts at the same time.
1165
+ --
1166
+ -- We can bound the worst case physical debt: this is the maximum amount of
1167
+ -- steps a merge at this level could need. Note that for levelling this is
1168
+ -- includes the single run in the current level.
1169
+ maxPhysicalDebt =
1170
+ case mergePolicy of
1171
+ MergePolicyLevelling -> 4 * tieringRunSize (level- 1 )
1172
+ + levellingRunSize level
1173
+ MergePolicyTiering -> length rs * tieringRunSize (level- 1 )
1117
1174
1118
1175
-- | Only based on run count, not their sizes.
1119
1176
tieringLevelIsFull :: Int -> [Run ] -> [Run ] -> Bool
@@ -1179,8 +1236,8 @@ newPendingLevelMerge irs tree = do
1179
1236
st = PendingTreeMerge (PendingLevelMerge prs tree)
1180
1237
Just . MergingTree <$> newSTRef st
1181
1238
where
1182
- incomingToPreExistingRun (Single r) = PreExistingRun r
1183
- incomingToPreExistingRun (Merging _ mr) = PreExistingMergingRun mr
1239
+ incomingToPreExistingRun (Single r) = PreExistingRun r
1240
+ incomingToPreExistingRun (Merging _ _ _ mr) = PreExistingMergingRun mr
1184
1241
1185
1242
-- | Ensures that the merge contains more than one input.
1186
1243
newPendingUnionMerge :: [MergingTree s ] -> ST s (Maybe (MergingTree s ))
@@ -1293,12 +1350,10 @@ supplyCreditsMergingTreeState credits !state = do
1293
1350
else do
1294
1351
-- all children must be done, create new merge!
1295
1352
(mergeType, rs) <- expectCompletedChildren pm
1296
- -- no reason to claim a larger debt than sum of run sizes
1297
- let debt = Nothing
1298
1353
case rs of
1299
1354
[r] -> return (c', CompletedTreeMerge r)
1300
1355
_ -> do
1301
- state' <- OngoingTreeMerge <$> newMergingRun debt mergeType rs
1356
+ state' <- OngoingTreeMerge <$> newMergingRun mergeType rs
1302
1357
-- use any remaining credits to progress the new merge
1303
1358
supplyCreditsMergingTreeState c' state'
1304
1359
@@ -1375,8 +1430,8 @@ flattenLevel (Level ir rs) = (++ rs) <$> flattenIncomingRun ir
1375
1430
1376
1431
flattenIncomingRun :: IncomingRun s -> ST s [Run ]
1377
1432
flattenIncomingRun = \ case
1378
- Single r -> return [r]
1379
- Merging _ mr -> flattenMergingRun mr
1433
+ Single r -> return [r]
1434
+ Merging _ _ _ mr -> flattenMergingRun mr
1380
1435
1381
1436
flattenMergingRun :: MergingRun t s -> ST s [Run ]
1382
1437
flattenMergingRun (MergingRun _ _ ref) = do
@@ -1438,7 +1493,7 @@ dumpRepresentation (LSMHandle _ lsmr) = do
1438
1493
dumpLevel :: Level s -> ST s LevelRepresentation
1439
1494
dumpLevel (Level (Single r) rs) =
1440
1495
return (Nothing , (r: rs))
1441
- dumpLevel (Level (Merging mp (MergingRun mt _ ref)) rs) = do
1496
+ dumpLevel (Level (Merging mp _nd _nc (MergingRun mt _ ref)) rs) = do
1442
1497
mrs <- readSTRef ref
1443
1498
return (Just (mp, mt, mrs), rs)
1444
1499
0 commit comments