@@ -63,6 +63,8 @@ module ScheduledMerges (
63
63
LevelMergeType (.. ),
64
64
MergeCredit (.. ),
65
65
MergeDebt (.. ),
66
+ NominalCredit (.. ),
67
+ NominalDebt (.. ),
66
68
Run ,
67
69
runSize ,
68
70
supplyCreditsMergingTree ,
@@ -123,7 +125,9 @@ data Level s = Level !(IncomingRun s) ![Run]
123
125
-- single run without having to read the 'STRef', and secondly to make it easier
124
126
-- to avoid supplying merge credits. It's not essential, but simplifies things
125
127
-- somewhat.
126
- data IncomingRun s = Merging ! MergePolicy ! (MergingRun LevelMergeType s )
128
+ data IncomingRun s = Merging ! MergePolicy
129
+ ! NominalDebt ! (STRef s NominalCredit )
130
+ ! (MergingRun LevelMergeType s )
127
131
| Single ! Run
128
132
129
133
-- | The merge policy for a LSM level can be either tiering or levelling.
@@ -321,7 +325,7 @@ invariant (LSMContent _ levels ul) = do
321
325
mrs <- case ir of
322
326
Single r ->
323
327
return (CompletedMerge r)
324
- Merging mp (MergingRun mt _ ref) -> do
328
+ Merging mp _ _ (MergingRun mt _ ref) -> do
325
329
assertST $ mp == mergePolicyForLevel ln ls ul
326
330
&& mt == mergeTypeForLevel ls ul
327
331
readSTRef ref
@@ -520,7 +524,9 @@ assertST p = assert p $ return ()
520
524
--
521
525
522
526
-- | Credits for keeping track of merge progress. These credits
523
- -- correspond directly to merge steps performed.
527
+ -- correspond directly to merge steps performed. We also call these \"physical\"
528
+ -- credits (since they correspond to steps done), and as opposed to \"nominal\"
529
+ -- credits in 'NominalCredit' and 'NominalDebt'.
524
530
type Credit = Int
525
531
526
532
-- | Debt for keeping track of the total merge work to do.
@@ -586,6 +592,8 @@ paydownMergeDebt :: MergeDebt -> MergeCredit -> Credit -> MergeDebtPaydown
586
592
paydownMergeDebt MergeDebt {totalDebt}
587
593
MergeCredit {spentCredits, unspentCredits}
588
594
c
595
+ | assert (c >= 0 ) False = undefined
596
+
589
597
| let ! suppliedCredits' = suppliedCredits + c
590
598
, suppliedCredits' >= totalDebt
591
599
, let ! leftover = suppliedCredits' - totalDebt
@@ -633,19 +641,18 @@ mergeBatchSize = 32
633
641
-- Merging run abstraction
634
642
--
635
643
636
- newMergingRun :: IsMergeType t => Maybe Debt -> t -> [Run ] -> ST s (MergingRun t s )
637
- newMergingRun mdebt mergeType runs = do
644
+ newMergingRun :: IsMergeType t => t -> [Run ] -> ST s (MergingRun t s )
645
+ newMergingRun mergeType runs = do
638
646
assertST $ length runs > 1
639
647
-- in some cases, no merging is required at all
640
648
(debt, state) <- case filter (\ r -> runSize r > 0 ) runs of
641
649
[] -> let (r: _) = runs -- just re-use the empty input
642
650
in return (runSize r, CompletedMerge r)
643
651
[r] -> return (runSize r, CompletedMerge r)
644
652
rs -> do
645
- let ! cost = sum (map runSize rs)
646
- ! debt = case mdebt of
647
- Nothing -> cost
648
- Just d -> assert (d >= cost) d
653
+ -- The (physical) debt is always exactly the cost (merge steps),
654
+ -- which is the sum of run lengths in elements.
655
+ let ! debt = sum (map runSize rs)
649
656
let merged = mergek mergeType rs -- deliberately lazy
650
657
return (debt, OngoingMerge zeroMergeCredit rs merged)
651
658
MergingRun mergeType (MergeDebt debt) <$> newSTRef state
@@ -708,6 +715,15 @@ supplyCreditsMergingRun =
708
715
writeSTRef ref (OngoingMerge mergeCredit' rs r)
709
716
return 0
710
717
718
+ suppliedCreditMergingRun :: MergingRun t s -> ST s Credit
719
+ suppliedCreditMergingRun (MergingRun _ d ref) =
720
+ readSTRef ref >>= \ case
721
+ CompletedMerge {} ->
722
+ let MergeDebt { totalDebt } = d in
723
+ return totalDebt
724
+ OngoingMerge MergeCredit {spentCredits, unspentCredits} _ _ ->
725
+ return (spentCredits + unspentCredits)
726
+
711
727
-------------------------------------------------------------------------------
712
728
-- LSM handle
713
729
--
@@ -750,7 +766,7 @@ update tr (LSMHandle scr lsmr) k op = do
750
766
sc <- readSTRef scr
751
767
content@ (LSMContent wb ls unionLevel) <- readSTRef lsmr
752
768
modifySTRef' scr (+ 1 )
753
- supplyCreditsLevels 1 ls
769
+ supplyCreditsLevels ( NominalCredit 1 ) ls
754
770
invariant content
755
771
let wb' = Map. insertWith combine k op wb
756
772
if bufferSize wb' >= maxBufferSize
@@ -762,7 +778,7 @@ update tr (LSMHandle scr lsmr) k op = do
762
778
else
763
779
writeSTRef lsmr (LSMContent wb' ls unionLevel)
764
780
765
- supplyMergeCredits :: LSM s -> Credit -> ST s ()
781
+ supplyMergeCredits :: LSM s -> NominalCredit -> ST s ()
766
782
supplyMergeCredits (LSMHandle scr lsmr) credits = do
767
783
content@ (LSMContent _ ls _) <- readSTRef lsmr
768
784
modifySTRef' scr (+ 1 )
@@ -965,48 +981,79 @@ lookupsTree k = go
965
981
lookupBatch' = lookupBatch Nothing k
966
982
967
983
-------------------------------------------------------------------------------
968
- -- Updates
984
+ -- Nominal credits
969
985
--
970
986
987
+ newtype NominalCredit = NominalCredit Credit
988
+ deriving stock Show
989
+
990
+ newtype NominalDebt = NominalDebt Credit
991
+ deriving stock Show
992
+
971
993
-- TODO: If there is a UnionLevel, there is no (more expensive) last level merge
972
994
-- in the regular levels, so a little less merging work is required than if
973
995
-- there was no UnionLevel. It might be a good idea to spend this "saved" work
974
996
-- on the UnionLevel instead. This makes future lookups cheaper and ensures that
975
997
-- we can get rid of the UnionLevel at some point, even if a user just keeps
976
998
-- inserting without calling 'supplyUnionCredits'.
977
- supplyCreditsLevels :: Credit -> Levels s -> ST s ()
978
- supplyCreditsLevels unscaled =
999
+ supplyCreditsLevels :: NominalCredit -> Levels s -> ST s ()
1000
+ supplyCreditsLevels nominalDeposit =
979
1001
traverse_ $ \ (Level ir _rs) -> do
980
1002
case ir of
981
1003
Single {} -> return ()
982
- Merging mp mr -> do
983
- factor <- creditsForMerge mp mr
984
- let credits = ceiling (fromIntegral unscaled * factor)
985
- when (credits > 0 ) $ do
986
- _ <- supplyCreditsMergingRun credits mr
987
- -- we don't mind leftover credits, each level completes independently
988
- return ()
989
-
990
- -- | The general case (and thus worst case) of how many merge credits we need
991
- -- for a level. This is based on the merging policy at the level.
992
- --
993
- creditsForMerge :: MergePolicy -> MergingRun t s -> ST s Rational
994
-
995
- -- A levelling merge has 1 input run and one resident run, which is (up to) 4x
996
- -- bigger than the others.
997
- -- It needs to be completed before another run comes in.
998
- creditsForMerge MergePolicyLevelling _ =
999
- return $ (1 + 4 ) / 1
1004
+ Merging _mp nominalDebt nominalCreditVar
1005
+ mr@ (MergingRun _ physicalDebt _) -> do
1006
+
1007
+ nominalCredit <- depositNominalCredit
1008
+ nominalDebt nominalCreditVar nominalDeposit
1009
+ physicalCredit <- suppliedCreditMergingRun mr
1010
+ let ! physicalCredit' = scaleNominalToPhysicalCredit
1011
+ nominalDebt physicalDebt nominalCredit
1012
+ -- Our target physicalCredit' could actually be less than the
1013
+ -- actual current physicalCredit if other tables were contributing
1014
+ -- credits to the shared merge.
1015
+ ! physicalDeposit = physicalCredit' - physicalCredit
1016
+
1017
+ -- So we may have a zero or negative deposit, which we ignore.
1018
+ when (physicalDeposit > 0 ) $ do
1019
+ leftoverCredits <- supplyCreditsMergingRun physicalDeposit mr
1020
+ -- For merges at ordinary levels (not unions) we expect to hit the
1021
+ -- debt limit exactly and never exceed it.
1022
+ assert (leftoverCredits == 0 ) $ return ()
1023
+
1024
+ scaleNominalToPhysicalCredit ::
1025
+ NominalDebt
1026
+ -> MergeDebt
1027
+ -> NominalCredit
1028
+ -> Credit
1029
+ scaleNominalToPhysicalCredit (NominalDebt nominalDebt)
1030
+ MergeDebt { totalDebt = physicalDebt }
1031
+ (NominalCredit nominalCredit) =
1032
+ floor $ toRational nominalCredit * toRational physicalDebt
1033
+ / toRational nominalDebt
1034
+ -- This specification using Rational as an intermediate representation can
1035
+ -- be implemented efficiently using only integer operations.
1036
+
1037
+ depositNominalCredit ::
1038
+ NominalDebt
1039
+ -> STRef s NominalCredit
1040
+ -> NominalCredit
1041
+ -> ST s NominalCredit
1042
+ depositNominalCredit (NominalDebt nominalDebt)
1043
+ nominalCreditVar
1044
+ (NominalCredit deposit) = do
1045
+ NominalCredit before <- readSTRef nominalCreditVar
1046
+ -- Depositing _could_ leave the credit higher than the debt, because
1047
+ -- sometimes under-full runs mean we don't shuffle runs down the levels
1048
+ -- as quickly as the worst case. So here we do just drop excess nominal
1049
+ -- credits.
1050
+ let ! after = NominalCredit (min (before + deposit) nominalDebt)
1051
+ writeSTRef nominalCreditVar after
1052
+ return after
1000
1053
1001
- -- A tiering merge has 5 runs at most (once could be held back to merged again)
1002
- -- and must be completed before the level is full (once 4 more runs come in).
1003
- creditsForMerge MergePolicyTiering (MergingRun _ _ ref) = do
1004
- readSTRef ref >>= \ case
1005
- CompletedMerge _ -> return 0
1006
- OngoingMerge _ rs _ -> do
1007
- let numRuns = length rs
1008
- assertST $ numRuns `elem` [4 , 5 ]
1009
- return $ fromIntegral numRuns / 4
1054
+ -------------------------------------------------------------------------------
1055
+ -- Updates
1056
+ --
1010
1057
1011
1058
increment :: forall s . Tracer (ST s ) Event
1012
1059
-> Counter -> Run -> Levels s -> UnionLevel s -> ST s (Levels s )
@@ -1028,7 +1075,7 @@ increment tr sc run0 ls0 ul = do
1028
1075
go ! ln incoming (Level ir rs : ls) = do
1029
1076
r <- case ir of
1030
1077
Single r -> return r
1031
- Merging mergePolicy mr -> do
1078
+ Merging mergePolicy _ _ mr -> do
1032
1079
r <- expectCompletedMergingRun mr
1033
1080
traceWith tr' MergeCompletedEvent {
1034
1081
mergePolicy,
@@ -1087,26 +1134,37 @@ newLevelMerge :: Tracer (ST s) EventDetail
1087
1134
-> [Run ] -> ST s (IncomingRun s )
1088
1135
newLevelMerge _ _ _ _ [r] = return (Single r)
1089
1136
newLevelMerge tr level mergePolicy mergeType rs = do
1137
+ assertST (length rs `elem` [4 , 5 ])
1138
+ mergingRun@ (MergingRun _ physicalDebt _) <- newMergingRun mergeType rs
1139
+ assertST (totalDebt physicalDebt <= maxPhysicalDebt)
1090
1140
traceWith tr MergeStartedEvent {
1091
1141
mergePolicy,
1092
1142
mergeType,
1093
- mergeDebt = debt ,
1143
+ mergeDebt = totalDebt physicalDebt ,
1094
1144
mergeRunsSize = map runSize rs
1095
1145
}
1096
- assertST ( length rs `elem` [ 4 , 5 ] )
1097
- Merging mergePolicy <$> newMergingRun ( Just debt) mergeType rs
1146
+ nominalCreditVar <- newSTRef ( NominalCredit 0 )
1147
+ pure ( Merging mergePolicy nominalDebt nominalCreditVar mergingRun)
1098
1148
where
1099
- -- How much we need to discharge before the merge can be guaranteed
1100
- -- complete. More precisely, this is the maximum amount a merge at this
1101
- -- level could need. While the real @cost@ of a merge would lead to merges
1102
- -- finishing early, the overestimation @debt@ means that in this prototype
1103
- -- merges will only complete at the last possible moment.
1104
- -- Note that for levelling this is includes the single run in the current
1105
- -- level.
1106
- debt = case mergePolicy of
1107
- MergePolicyLevelling -> 4 * tieringRunSize (level- 1 )
1108
- + levellingRunSize level
1109
- MergePolicyTiering -> length rs * tieringRunSize (level- 1 )
1149
+ -- The nominal debt equals the minimum of credits we will supply before we
1150
+ -- expect the merge to complete. This is the same as the number of updates.
1151
+ nominalDebt = NominalDebt (tieringRunSize level)
1152
+
1153
+ -- The physical debt is the number of actual merge steps we will need to
1154
+ -- perform before the merge is complete. This is always the sum of the
1155
+ -- lengths of the input runs.
1156
+ --
1157
+ -- As we supply nominal credit, we scale them and supply physical credits,
1158
+ -- such that we pay off the physical and nominal debts at the same time.
1159
+ --
1160
+ -- We can bound the worst case physical debt: this is the maximum amount of
1161
+ -- steps a merge at this level could need. Note that for levelling this is
1162
+ -- includes the single run in the current level.
1163
+ maxPhysicalDebt =
1164
+ case mergePolicy of
1165
+ MergePolicyLevelling -> 4 * tieringRunSize (level- 1 )
1166
+ + levellingRunSize level
1167
+ MergePolicyTiering -> length rs * tieringRunSize (level- 1 )
1110
1168
1111
1169
-- | Only based on run count, not their sizes.
1112
1170
tieringLevelIsFull :: Int -> [Run ] -> [Run ] -> Bool
@@ -1172,8 +1230,8 @@ newPendingLevelMerge irs tree = do
1172
1230
st = PendingTreeMerge (PendingLevelMerge prs tree)
1173
1231
Just . MergingTree <$> newSTRef st
1174
1232
where
1175
- incomingToPreExistingRun (Single r) = PreExistingRun r
1176
- incomingToPreExistingRun (Merging _ mr) = PreExistingMergingRun mr
1233
+ incomingToPreExistingRun (Single r) = PreExistingRun r
1234
+ incomingToPreExistingRun (Merging _ _ _ mr) = PreExistingMergingRun mr
1177
1235
1178
1236
-- | Ensures that the merge contains more than one input.
1179
1237
newPendingUnionMerge :: [MergingTree s ] -> ST s (Maybe (MergingTree s ))
@@ -1286,12 +1344,10 @@ supplyCreditsMergingTreeState credits !state = do
1286
1344
else do
1287
1345
-- all children must be done, create new merge!
1288
1346
(mergeType, rs) <- expectCompletedChildren pm
1289
- -- no reason to claim a larger debt than sum of run sizes
1290
- let debt = Nothing
1291
1347
case rs of
1292
1348
[r] -> return (c', CompletedTreeMerge r)
1293
1349
_ -> do
1294
- state' <- OngoingTreeMerge <$> newMergingRun debt mergeType rs
1350
+ state' <- OngoingTreeMerge <$> newMergingRun mergeType rs
1295
1351
-- use any remaining credits to progress the new merge
1296
1352
supplyCreditsMergingTreeState c' state'
1297
1353
@@ -1368,8 +1424,8 @@ flattenLevel (Level ir rs) = (++ rs) <$> flattenIncomingRun ir
1368
1424
1369
1425
flattenIncomingRun :: IncomingRun s -> ST s [Run ]
1370
1426
flattenIncomingRun = \ case
1371
- Single r -> return [r]
1372
- Merging _ mr -> flattenMergingRun mr
1427
+ Single r -> return [r]
1428
+ Merging _ _ _ mr -> flattenMergingRun mr
1373
1429
1374
1430
flattenMergingRun :: MergingRun t s -> ST s [Run ]
1375
1431
flattenMergingRun (MergingRun _ _ ref) = do
@@ -1431,7 +1487,7 @@ dumpRepresentation (LSMHandle _ lsmr) = do
1431
1487
dumpLevel :: Level s -> ST s LevelRepresentation
1432
1488
dumpLevel (Level (Single r) rs) =
1433
1489
return (Nothing , (r: rs))
1434
- dumpLevel (Level (Merging mp (MergingRun mt _ ref)) rs) = do
1490
+ dumpLevel (Level (Merging mp _nd _nc (MergingRun mt _ ref)) rs) = do
1435
1491
mrs <- readSTRef ref
1436
1492
return (Just (mp, mt, mrs), rs)
1437
1493
0 commit comments