From cd7bfb585abb4cc199d30acf9a86fa08bece5780 Mon Sep 17 00:00:00 2001 From: Matthias Heinzel Date: Mon, 25 Nov 2024 18:52:22 +0100 Subject: [PATCH] prototype: tweak PendingMerge representation This captures the invariant on pending level merges better, making it more straight-forward to exploit when doing lookups. Also, it avoids having to wrap a table's runs into additional STRefs to make them MergingTrees. --- prototypes/ScheduledMerges.hs | 193 ++++++++++++++++------------------ 1 file changed, 89 insertions(+), 104 deletions(-) diff --git a/prototypes/ScheduledMerges.hs b/prototypes/ScheduledMerges.hs index af5fcced8..03f0cd4f3 100644 --- a/prototypes/ScheduledMerges.hs +++ b/prototypes/ScheduledMerges.hs @@ -140,7 +140,7 @@ data MergingTreeState s = CompletedTreeMerge !Run | PendingTreeMerge !(PendingMerge s) -- | Goes via MergingTree (with its STRef) to allow sharing existing unions. -data PendingMerge s = PendingMerge !MergeType ![MergingTree s] +data PendingMerge s = PendingMerge !MergeType ![IncomingRun s] ![MergingTree s] type Credit = Int type Debt = Int @@ -345,23 +345,23 @@ invariant (LSMContent _ levels ul) = do MergeUnion -> assertST $ length rs == 2 -- binary union MergeLevel _ -> assertST $ length rs > 1 -- no trivial merge - PendingTreeMerge (PendingMerge mt trees) -> do + PendingTreeMerge (PendingMerge mergeType irs trees) -> do -- we never have to consider Deletes - assertST $ mt /= MergeLevel MergeMidLevel - case mt of - MergeUnion -> + assertST $ mergeType /= MergeLevel MergeMidLevel + case mergeType of + MergeUnion -> do + assertST $ length irs == 0 assertST $ length trees == 2 MergeLevel _ -> do - assertST $ length trees > 1 - -- only the last tree can be another union - for_ (init trees) $ \t -> - assertST =<< isRunOrLevelMerge t + -- no trivial merge + assertST $ length irs + length trees > 1 + -- there can only be one nested tree + assertST $ length trees <= 1 + -- all merges in the IncomingRuns are level merges + for_ irs $ \case + Merging _ (MergingRun mt _) -> assertST $ mt /= MergeUnion + Single _ -> return () for_ trees treeInvariant - where - isRunOrLevelMerge (MergingTree ref) = readSTRef ref >>= \case - CompletedTreeMerge _ -> return True - OngoingTreeMerge (MergingRun (MergeLevel _) _) -> return True - _ -> return False -- 'callStack' just ensures that the 'HasCallStack' constraint is not redundant -- when compiling with debug assertions disabled. @@ -641,7 +641,7 @@ union :: LSM s -> LSM s -> ST s (LSM s) union (LSMHandle _ lsmr1) (LSMHandle _ lsmr2) = do mt1 <- contentToMergingTree =<< readSTRef lsmr1 mt2 <- contentToMergingTree =<< readSTRef lsmr2 - unionLevel <- newPendingMerge MergeUnion (catMaybes [mt1, mt2]) >>= \case + unionLevel <- newPendingMerge MergeUnion [] (catMaybes [mt1, mt2]) >>= \case Nothing -> return NoUnion Just tree -> do debt <- fst <$> remainingDebtMergingTree tree @@ -756,31 +756,18 @@ lookupsTree ks = go OngoingMerge _ rs _ -> case mt of MergeLevel _ -> return $ lookupRuns rs -- combine into batch MergeUnion -> return $ unionLookupAccs (map (\r -> lookupRuns [r]) rs) - PendingTreeMerge (PendingMerge mt trees) -> case mt of - MergeUnion -> unionLookupAccs <$> traverse go trees + PendingTreeMerge (PendingMerge mt irs trees) -> case mt of + MergeUnion -> do + assertST (null irs) + unionLookupAccs <$> traverse go trees MergeLevel _ -> do - (runs, remaining) <- collectLevel [] trees + runs <- concat <$> traverse flattenIncomingRun irs let acc0 = updateLookupAcc emptyLookupAcc (submitLookups ks runs) - accs <- traverse go remaining + accs <- traverse go trees return (mergeLookupAccs (acc0 : accs)) lookupRuns = updateLookupAcc emptyLookupAcc . submitLookups ks - -- recover the known structure of pending level merges (created in - -- 'contentToMergingTree'), which consist of all runs of a table, plus - -- potentially a single union merge at the end - collectLevel :: [[Run]] -> [MergingTree s] -> ST s ([Run], [MergingTree s]) - collectLevel rss [] = return (concat (reverse rss), []) - collectLevel rss (mt@(MergingTree ref) : mts) = - readSTRef ref >>= \case - CompletedTreeMerge r -> - collectLevel ([r] : rss) mts - OngoingTreeMerge mr@(MergingRun (MergeLevel _) _) -> do - rs <- flattenMergingRun mr - collectLevel (rs : rss) mts - _ -> - return (concat (reverse rss), mt : mts) - supplyCreditsLevels :: Credit -> Levels s -> ST s () supplyCreditsLevels n = traverse_ $ \(Level ir _rs) -> do @@ -919,58 +906,33 @@ levellingLevelIsFull :: Int -> [Run] -> Run -> Bool levellingLevelIsFull ln _incoming resident = levellingRunSizeToLevel resident > ln -- | Ensures that the merge contains more than one input. -newPendingMerge :: MergeType -> [MergingTree s] -> ST s (Maybe (MergingTree s)) -newPendingMerge mergeType = \case - [] -> return Nothing - [tree] -> return (Just tree) - trees -> do - let pm = PendingMerge mergeType trees - Just . MergingTree <$> newSTRef (PendingTreeMerge pm) +newPendingMerge :: MergeType -> [IncomingRun s] -> [MergingTree s] + -> ST s (Maybe (MergingTree s)) +newPendingMerge _ [] [] = return Nothing +newPendingMerge _ [] [t] = return (Just t) +newPendingMerge _ [ir] [] = do + let st = case ir of + Single r -> CompletedTreeMerge r + Merging _ mr -> OngoingTreeMerge mr + Just . MergingTree <$> newSTRef st +newPendingMerge mergeType irs ts = do + let st = PendingTreeMerge (PendingMerge mergeType irs ts) + Just . MergingTree <$> newSTRef st contentToMergingTree :: LSMContent s -> ST s (Maybe (MergingTree s)) -contentToMergingTree content = do - newPendingMerge (MergeLevel MergeLastLevel) =<< fromContent content +contentToMergingTree (LSMContent wb ls ul) = + newPendingMerge (MergeLevel MergeLastLevel) (buffers ++ levels) trees where - fromContent :: LSMContent s -> ST s [MergingTree s] - fromContent (LSMContent wb ls ul) = do - fmap concat $ sequence - [ toList <$> fromBuffer wb - , concat <$> traverse fromLevel ls - , toList <$> fromUnionLevel ul - ] - -- TODO: is it okay to just flush the buffer when creating a union? - fromBuffer :: Buffer -> ST s (Maybe (MergingTree s)) - fromBuffer wb - | bufferSize wb == 0 = return Nothing - | otherwise = Just <$> fromRun (bufferToRun wb) - - fromLevel :: Level s -> ST s [MergingTree s] - fromLevel (Level ir rs) = - fmap concat $ sequence - [ pure <$> fromIncomingRun ir - , traverse fromRun rs - ] - - fromIncomingRun :: IncomingRun s -> ST s (MergingTree s) - fromIncomingRun (Single r) = fromRun r - fromIncomingRun (Merging _ mr@(MergingRun _ ref)) = do - readSTRef ref >>= \case - CompletedMerge r -> fromRun r - OngoingMerge{} -> MergingTree <$> newSTRef (OngoingTreeMerge mr) - -- TODO: This STRef is not really needed, MergingRun already shares - -- the merging work and has a notion of ongoing and completed. - - fromRun :: Run -> ST s (MergingTree s) - fromRun r = - MergingTree <$> newSTRef (CompletedTreeMerge r) - -- TODO: This STRef is not really needed, no work to be shared and its - -- state will never change. - - fromUnionLevel :: UnionLevel s -> ST s (Maybe (MergingTree s)) - fromUnionLevel = \case - NoUnion -> return Nothing - Union t _ -> return (Just t) + buffers + | bufferSize wb == 0 = [] + | otherwise = [Single (bufferToRun wb)] + + levels = flip concatMap ls $ \(Level ir rs) -> ir : map Single rs + + trees = case ul of + NoUnion -> [] + Union t _ -> [t] type Size = Int @@ -984,12 +946,20 @@ remainingDebtMergingTree (MergingTree ref) = PendingTreeMerge pm -> remainingDebtPendingMerge pm remainingDebtPendingMerge :: PendingMerge s -> ST s (Debt, Size) -remainingDebtPendingMerge (PendingMerge _ trees) = do - (debts, sizes) <- unzip <$> traverse remainingDebtMergingTree trees +remainingDebtPendingMerge (PendingMerge _ irs trees) = do + (debts, sizes) <- unzip . concat <$> sequence + [ traverse remainingDebtIncomingRun irs + , traverse remainingDebtMergingTree trees + ] let totalSize = sum sizes let totalDebt = sum debts + totalSize return (totalDebt, totalSize) +remainingDebtIncomingRun :: IncomingRun s -> ST s (Debt, Size) +remainingDebtIncomingRun = \case + Single r -> return (0, runSize r) + Merging _ mr -> remainingDebtMergingRun mr + -- | Upper bound on the number of credits needed to complete this merge, as well -- as the size of the resulting run. remainingDebtMergingRun :: MergingRun s -> ST s (Debt, Size) @@ -1054,23 +1024,29 @@ supplyCreditsMergingTreeState credits !state = do supplyCreditsMergingTreeState c' state' supplyCreditsPendingMerge :: Credit -> PendingMerge s -> ST s Credit -supplyCreditsPendingMerge = checked remainingDebtPendingMerge $ \credits (PendingMerge mergeType trees) -> do +supplyCreditsPendingMerge = checked remainingDebtPendingMerge $ \credits pm -> do + let PendingMerge mergeType incoming trees = pm assertST (credits >= 0) case mergeType of - MergeLevel _ -> leftToRight credits trees + MergeLevel _ -> leftToRight credits incoming trees MergeUnion -> - case trees of - [t1, t2] -> splitEqually credits t1 t2 - _ -> error $ "supplyCreditsPendingMerge: " - ++ "expected two union merge inputs, " - ++ "got " ++ show (length trees) + case (incoming, trees) of + ([], [t1, t2]) -> splitEqually credits t1 t2 + _ -> error $ "supplyCreditsPendingMerge: " + ++ "expected two union merge inputs, " + ++ "got " ++ show (length trees) where -- supply credit left to right until it is used up - leftToRight 0 _ = return 0 - leftToRight c [] = return c - leftToRight c (mt:mts) = do + leftToRight 0 _ _ = return 0 + leftToRight c [] [] = return c + leftToRight c [] (mt:mts) = do c' <- supplyCreditsMergingTree c mt - leftToRight c' mts + leftToRight c' [] mts + leftToRight c (ir:irs) mts = do + c' <- case ir of + Single _ -> return c + Merging _ mr -> supplyCreditsMergingRun c mr + leftToRight c' irs mts splitEqually c mt1 mt2 = do -- supply credit roughly evenly on both sides @@ -1090,9 +1066,14 @@ supplyCreditsPendingMerge = checked remainingDebtPendingMerge $ \credits (Pendin -- TODO: Tracer expectCompletedChildren :: HasCallStack => PendingMerge s -> ST s (MergeType, [Run]) -expectCompletedChildren (PendingMerge mergeType trees) = do - rs <- traverse expectCompletedMergingTree trees - return (mergeType, rs) +expectCompletedChildren (PendingMerge mergeType irs trees) = do + rs1 <- traverse expectCompletedIncomingRun irs + rs2 <- traverse expectCompletedMergingTree trees + return (mergeType, rs1 ++ rs2) + where + expectCompletedIncomingRun = \case + Single r -> return r + Merging _ mr -> expectCompletedMerge nullTracer Nothing mr expectCompletedMergingTree :: HasCallStack => MergingTree s -> ST s Run expectCompletedMergingTree (MergingTree ref) = do @@ -1123,10 +1104,12 @@ flattenLevels :: Levels s -> ST s [[Run]] flattenLevels = mapM flattenLevel flattenLevel :: Level s -> ST s [Run] -flattenLevel (Level ir rs) = - case ir of - Single r -> return (r : rs) - Merging _ mr -> (++ rs) <$> flattenMergingRun mr +flattenLevel (Level ir rs) = (++ rs) <$> flattenIncomingRun ir + +flattenIncomingRun :: IncomingRun s -> ST s [Run] +flattenIncomingRun = \case + Single r -> return [r] + Merging _ mr -> flattenMergingRun mr flattenMergingRun :: MergingRun s -> ST s [Run] flattenMergingRun (MergingRun _ ref) = do @@ -1145,8 +1128,10 @@ flattenTree (MergingTree ref) = do readSTRef mrs >>= \case CompletedMerge r -> return (MLeaf r) OngoingMerge _ rs _ -> return (MNode mt (MLeaf <$> rs)) - PendingTreeMerge (PendingMerge mt trees) -> - MNode mt <$> traverse flattenTree trees + PendingTreeMerge (PendingMerge mt irs trees) -> do + irs' <- map MLeaf . concat <$> traverse flattenIncomingRun irs + trees' <- traverse flattenTree trees + return (MNode mt (irs' ++ trees')) logicalValue :: LSM s -> ST s (Map Key (Value, Maybe Blob)) logicalValue lsm = do