|
| 1 | +{-# LANGUAGE TypeFamilies #-} |
| 2 | + |
| 3 | +-- | An incremental merge of multiple runs, preserving a bracketing structure. |
| 4 | +module Database.LSMTree.Internal.MergingTree ( |
| 5 | + -- $mergingtrees |
| 6 | + MergingTree (..) |
| 7 | + , newPendingLevelMerge |
| 8 | + , PreExistingRun (..) |
| 9 | + , newPendingUnionMerge |
| 10 | + , isStructurallyEmpty |
| 11 | + -- * Internal state |
| 12 | + , MergingTreeState (..) |
| 13 | + , PendingMerge (..) |
| 14 | + ) where |
| 15 | + |
| 16 | +import Control.Concurrent.Class.MonadMVar.Strict |
| 17 | +import Control.Monad (filterM) |
| 18 | +import Control.Monad.Class.MonadThrow (MonadMask) |
| 19 | +import Control.Monad.Primitive |
| 20 | +import Control.RefCount |
| 21 | +import Data.Foldable (traverse_) |
| 22 | +import Database.LSMTree.Internal.MergingRun (MergingRun) |
| 23 | +import Database.LSMTree.Internal.Run (Run) |
| 24 | + |
| 25 | +-- $mergingtrees Semantically, tables are key-value stores like Haskell's |
| 26 | +-- @Map@. Table unions then behave like @Map.unionWith (<>)@. If one of the |
| 27 | +-- input tables contains a value at a particular key, the result will also |
| 28 | +-- contain it. If multiple tables share that key, the values will be combined |
| 29 | +-- monoidally. |
| 30 | +-- |
| 31 | +-- Looking at the implementation, tables are not just key-value pairs, but |
| 32 | +-- consist of runs. If each table was just a single run, unioning would involve |
| 33 | +-- a run merge similar to the one used for compaction (when a level is full), |
| 34 | +-- but with a different merge type 'MergeUnion' that differs semantically: |
| 35 | +-- Here, runs don't represent updates (overwriting each other), but they each |
| 36 | +-- represent the full state of a table. There is no distinction between no |
| 37 | +-- entry and a 'Delete', between an 'Insert' and a 'Mupsert'. |
| 38 | +-- |
| 39 | +-- To union two tables, we can therefore first merge down each table into a |
| 40 | +-- single run (using regular level merges) and then union merge these. |
| 41 | +-- |
| 42 | +-- However, we want to spread out the work required and perform these merges |
| 43 | +-- incrementally. At first, we only create a new table that is empty except for |
| 44 | +-- a data structure 'MergingTree', representing the merges that need to be |
| 45 | +-- done. The usual operations can then be performed on the table while the |
| 46 | +-- merge is in progress: Inserts go into the table as usual, not affecting its |
| 47 | +-- last level ('UnionLevel'), lookups need to consider the tree (requiring some |
| 48 | +-- complexity and runtime overhead), further unions incorporate the in-progress |
| 49 | +-- tree into the resulting one, which also shares future merging work. |
| 50 | +-- |
| 51 | +-- It seems necessary to represent the suspended merges using a tree. Other |
| 52 | +-- approaches don't allow for full sharing of the incremental work (e.g. |
| 53 | +-- because they effectively \"re-bracket\" nested unions). It also seems |
| 54 | +-- necessary to first merge each input table into a single run, as there is no |
| 55 | +-- practical distributive property between level and union merges. |
| 56 | + |
| 57 | + |
| 58 | +-- | A \"merging tree\" is a mutable representation of an incremental |
| 59 | +-- tree-shaped nested merge. This allows to represent union merges of entire |
| 60 | +-- tables, each of which itself first need to be merged to become a single run. |
| 61 | +-- |
| 62 | +-- Trees have to support arbitrarily deep nesting, since each input to 'union' |
| 63 | +-- might already contain an in-progress merging tree (which then becomes shared |
| 64 | +-- between multiple tables). |
| 65 | +-- |
| 66 | +data MergingTree m h = MergingTree { |
| 67 | + mergeState :: !(StrictMVar m (MergingTreeState m h)) |
| 68 | + , mergeRefCounter :: !(RefCounter m) |
| 69 | + } |
| 70 | + |
| 71 | +instance RefCounted m (MergingTree m h) where |
| 72 | + getRefCounter = mergeRefCounter |
| 73 | + |
| 74 | +data MergingTreeState m h = |
| 75 | + CompletedTreeMerge |
| 76 | + !(Ref (Run m h)) |
| 77 | + -- ^ Output run |
| 78 | + |
| 79 | + -- | Reuses MergingRun to allow sharing existing merges. |
| 80 | + | OngoingTreeMerge |
| 81 | + !(Ref (MergingRun m h)) |
| 82 | + |
| 83 | + | PendingTreeMerge |
| 84 | + !(PendingMerge m h) |
| 85 | + |
| 86 | +-- | A merge that is waiting for its inputs to complete. |
| 87 | +data PendingMerge m h = |
| 88 | + -- | The collection of inputs is the entire contents of a table, |
| 89 | + -- i.e. its (merging) runs and finally a union merge (if that table |
| 90 | + -- already contained a union). |
| 91 | + PendingLevelMerge |
| 92 | + ![PreExistingRun m h] |
| 93 | + !(Maybe (Ref (MergingTree m h))) |
| 94 | + |
| 95 | + -- | Each input is the entire content of a table (as a merging tree). |
| 96 | + | PendingUnionMerge |
| 97 | + ![Ref (MergingTree m h)] |
| 98 | + |
| 99 | +data PreExistingRun m h = |
| 100 | + PreExistingRun !(Ref (Run m h)) |
| 101 | + | PreExistingMergingRun !(Ref (MergingRun m h)) |
| 102 | + |
| 103 | + |
| 104 | +-- | Create a new 'MergingTree' representing the merge of a sequence of |
| 105 | +-- pre-existing runs (completed or ongoing, plus a optional final tree). |
| 106 | +-- This is for merging the entire contents of a table down to a single run |
| 107 | +-- (while sharing existing ongoing merges). |
| 108 | +-- |
| 109 | +-- Shape: if the list of runs is empty and the optional input tree is |
| 110 | +-- structurally empty, the result will also be structurally empty. See |
| 111 | +-- 'isStructurallyEmpty'. |
| 112 | +-- |
| 113 | +-- Resource tracking: |
| 114 | +-- * This allocates a new 'Ref' which the caller is responsible for releasing |
| 115 | +-- eventually. |
| 116 | +-- * The ownership of all input 'Ref's remains with the caller. This action |
| 117 | +-- will create duplicate references, not adopt the given ones. |
| 118 | +-- |
| 119 | +-- ASYNC: this should be called with asynchronous exceptions masked because it |
| 120 | +-- allocates\/creates resources. |
| 121 | +newPendingLevelMerge :: |
| 122 | + forall m h. |
| 123 | + (MonadMVar m, MonadMask m, PrimMonad m) |
| 124 | + => [PreExistingRun m h] |
| 125 | + -> Maybe (Ref (MergingTree m h)) |
| 126 | + -> m (Ref (MergingTree m h)) |
| 127 | +newPendingLevelMerge [] (Just t) = dupRef t |
| 128 | +newPendingLevelMerge prs mmt = do |
| 129 | + -- There are no interruption points here, and thus provided async exceptions |
| 130 | + -- are masked then there can be no async exceptions here at all. |
| 131 | + mergeTreeState <- case (prs, mmt) of |
| 132 | + ([PreExistingRun r], Nothing) -> |
| 133 | + CompletedTreeMerge <$> dupRef r |
| 134 | + |
| 135 | + ([PreExistingMergingRun mr], Nothing) -> |
| 136 | + OngoingTreeMerge <$> dupRef mr |
| 137 | + |
| 138 | + _ -> PendingTreeMerge <$> |
| 139 | + (PendingLevelMerge <$> traverse dupPreExistingRun prs |
| 140 | + <*> dupMaybeMergingTree mmt) |
| 141 | + |
| 142 | + newMergeTree mergeTreeState |
| 143 | + where |
| 144 | + dupPreExistingRun (PreExistingRun r) = |
| 145 | + PreExistingRun <$> dupRef r |
| 146 | + dupPreExistingRun (PreExistingMergingRun mr) = |
| 147 | + PreExistingMergingRun <$> dupRef mr |
| 148 | + |
| 149 | + dupMaybeMergingTree :: Maybe (Ref (MergingTree m h)) |
| 150 | + -> m (Maybe (Ref (MergingTree m h))) |
| 151 | + dupMaybeMergingTree Nothing = return Nothing |
| 152 | + dupMaybeMergingTree (Just mt) = do |
| 153 | + isempty <- isStructurallyEmpty mt |
| 154 | + if isempty |
| 155 | + then return Nothing |
| 156 | + else Just <$> dupRef mt |
| 157 | + |
| 158 | +-- | Create a new 'MergingTree' representing the union of one or more merging |
| 159 | +-- trees. This is for unioning the content of multiple tables (represented |
| 160 | +-- themselves as merging trees). |
| 161 | +-- |
| 162 | +-- Shape: if all of the input trees are structurally empty, the result will |
| 163 | +-- also be structurally empty. See 'isStructurallyEmpty'. |
| 164 | +-- |
| 165 | +-- Resource tracking: |
| 166 | +-- * This allocates a new 'Ref' which the caller is responsible for releasing |
| 167 | +-- eventually. |
| 168 | +-- * The ownership of all input 'Ref's remains with the caller. This action |
| 169 | +-- will create duplicate references, not adopt the given ones. |
| 170 | +-- |
| 171 | +-- ASYNC: this should be called with asynchronous exceptions masked because it |
| 172 | +-- allocates\/creates resources. |
| 173 | +newPendingUnionMerge :: |
| 174 | + (MonadMVar m, MonadMask m, PrimMonad m) |
| 175 | + => [Ref (MergingTree m h)] |
| 176 | + -> m (Ref (MergingTree m h)) |
| 177 | +newPendingUnionMerge mts = do |
| 178 | + mts' <- mapM dupRef =<< filterM (fmap not . isStructurallyEmpty) mts |
| 179 | + case mts' of |
| 180 | + [mt] -> return mt |
| 181 | + _ -> newMergeTree (PendingTreeMerge (PendingUnionMerge mts')) |
| 182 | + |
| 183 | +-- | Test if a 'MergingTree' is \"obviously\" empty by virtue of its structure. |
| 184 | +-- This is not the same as being empty due to a pending or ongoing merge |
| 185 | +-- happening to produce an empty run. |
| 186 | +-- |
| 187 | +isStructurallyEmpty :: MonadMVar m => Ref (MergingTree m h) -> m Bool |
| 188 | +isStructurallyEmpty (DeRef MergingTree {mergeState}) = |
| 189 | + isEmpty <$> readMVar mergeState |
| 190 | + where |
| 191 | + isEmpty (PendingTreeMerge (PendingLevelMerge [] Nothing)) = True |
| 192 | + isEmpty (PendingTreeMerge (PendingUnionMerge [])) = True |
| 193 | + isEmpty _ = False |
| 194 | + -- It may also turn out to be useful to consider CompletedTreeMerge with |
| 195 | + -- a zero length runs as empty. |
| 196 | + |
| 197 | +-- | Constructor helper. |
| 198 | +newMergeTree :: |
| 199 | + (MonadMVar m, PrimMonad m, MonadMask m) |
| 200 | + => MergingTreeState m h |
| 201 | + -> m (Ref (MergingTree m h)) |
| 202 | +newMergeTree mergeTreeState = do |
| 203 | + mergeState <- newMVar mergeTreeState |
| 204 | + newRef (finalise mergeState) $ \mergeRefCounter -> |
| 205 | + MergingTree { |
| 206 | + mergeState |
| 207 | + , mergeRefCounter |
| 208 | + } |
| 209 | + |
| 210 | +finalise :: (MonadMVar m, PrimMonad m, MonadMask m) |
| 211 | + => StrictMVar m (MergingTreeState m h) -> m () |
| 212 | +finalise mergeState = releaseMTS =<< readMVar mergeState |
| 213 | + where |
| 214 | + releaseMTS (CompletedTreeMerge r) = releaseRef r |
| 215 | + releaseMTS (OngoingTreeMerge mr) = releaseRef mr |
| 216 | + releaseMTS (PendingTreeMerge ptm) = |
| 217 | + case ptm of |
| 218 | + PendingUnionMerge mts -> traverse_ releaseRef mts |
| 219 | + PendingLevelMerge prs mmt -> traverse_ releasePER prs |
| 220 | + >> traverse_ releaseRef mmt |
| 221 | + |
| 222 | + releasePER (PreExistingRun r) = releaseRef r |
| 223 | + releasePER (PreExistingMergingRun mr) = releaseRef mr |
| 224 | + |
0 commit comments