Skip to content

Commit

Permalink
prototype: tweak PendingMerge representation
Browse files Browse the repository at this point in the history
This captures the invariant on pending level merges better, making it
more straight-forward to exploit when doing lookups. Also, it avoids
having to wrap a table's runs into additional STRefs to make them
MergingTrees.
  • Loading branch information
mheinzel committed Nov 28, 2024
1 parent 61ea31f commit cd7bfb5
Showing 1 changed file with 89 additions and 104 deletions.
193 changes: 89 additions & 104 deletions prototypes/ScheduledMerges.hs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ data MergingTreeState s = CompletedTreeMerge !Run
| PendingTreeMerge !(PendingMerge s)

-- | Goes via MergingTree (with its STRef) to allow sharing existing unions.
data PendingMerge s = PendingMerge !MergeType ![MergingTree s]
data PendingMerge s = PendingMerge !MergeType ![IncomingRun s] ![MergingTree s]

type Credit = Int
type Debt = Int
Expand Down Expand Up @@ -345,23 +345,23 @@ invariant (LSMContent _ levels ul) = do
MergeUnion -> assertST $ length rs == 2 -- binary union
MergeLevel _ -> assertST $ length rs > 1 -- no trivial merge

PendingTreeMerge (PendingMerge mt trees) -> do
PendingTreeMerge (PendingMerge mergeType irs trees) -> do
-- we never have to consider Deletes
assertST $ mt /= MergeLevel MergeMidLevel
case mt of
MergeUnion ->
assertST $ mergeType /= MergeLevel MergeMidLevel
case mergeType of
MergeUnion -> do
assertST $ length irs == 0
assertST $ length trees == 2
MergeLevel _ -> do
assertST $ length trees > 1
-- only the last tree can be another union
for_ (init trees) $ \t ->
assertST =<< isRunOrLevelMerge t
-- no trivial merge
assertST $ length irs + length trees > 1
-- there can only be one nested tree
assertST $ length trees <= 1
-- all merges in the IncomingRuns are level merges
for_ irs $ \case
Merging _ (MergingRun mt _) -> assertST $ mt /= MergeUnion
Single _ -> return ()
for_ trees treeInvariant
where
isRunOrLevelMerge (MergingTree ref) = readSTRef ref >>= \case
CompletedTreeMerge _ -> return True
OngoingTreeMerge (MergingRun (MergeLevel _) _) -> return True
_ -> return False

-- 'callStack' just ensures that the 'HasCallStack' constraint is not redundant
-- when compiling with debug assertions disabled.
Expand Down Expand Up @@ -641,7 +641,7 @@ union :: LSM s -> LSM s -> ST s (LSM s)
union (LSMHandle _ lsmr1) (LSMHandle _ lsmr2) = do
mt1 <- contentToMergingTree =<< readSTRef lsmr1
mt2 <- contentToMergingTree =<< readSTRef lsmr2
unionLevel <- newPendingMerge MergeUnion (catMaybes [mt1, mt2]) >>= \case
unionLevel <- newPendingMerge MergeUnion [] (catMaybes [mt1, mt2]) >>= \case
Nothing -> return NoUnion
Just tree -> do
debt <- fst <$> remainingDebtMergingTree tree
Expand Down Expand Up @@ -756,31 +756,18 @@ lookupsTree ks = go
OngoingMerge _ rs _ -> case mt of
MergeLevel _ -> return $ lookupRuns rs -- combine into batch
MergeUnion -> return $ unionLookupAccs (map (\r -> lookupRuns [r]) rs)
PendingTreeMerge (PendingMerge mt trees) -> case mt of
MergeUnion -> unionLookupAccs <$> traverse go trees
PendingTreeMerge (PendingMerge mt irs trees) -> case mt of
MergeUnion -> do
assertST (null irs)
unionLookupAccs <$> traverse go trees
MergeLevel _ -> do
(runs, remaining) <- collectLevel [] trees
runs <- concat <$> traverse flattenIncomingRun irs
let acc0 = updateLookupAcc emptyLookupAcc (submitLookups ks runs)
accs <- traverse go remaining
accs <- traverse go trees
return (mergeLookupAccs (acc0 : accs))

lookupRuns = updateLookupAcc emptyLookupAcc . submitLookups ks

-- recover the known structure of pending level merges (created in
-- 'contentToMergingTree'), which consist of all runs of a table, plus
-- potentially a single union merge at the end
collectLevel :: [[Run]] -> [MergingTree s] -> ST s ([Run], [MergingTree s])
collectLevel rss [] = return (concat (reverse rss), [])
collectLevel rss (mt@(MergingTree ref) : mts) =
readSTRef ref >>= \case
CompletedTreeMerge r ->
collectLevel ([r] : rss) mts
OngoingTreeMerge mr@(MergingRun (MergeLevel _) _) -> do
rs <- flattenMergingRun mr
collectLevel (rs : rss) mts
_ ->
return (concat (reverse rss), mt : mts)

supplyCreditsLevels :: Credit -> Levels s -> ST s ()
supplyCreditsLevels n =
traverse_ $ \(Level ir _rs) -> do
Expand Down Expand Up @@ -919,58 +906,33 @@ levellingLevelIsFull :: Int -> [Run] -> Run -> Bool
levellingLevelIsFull ln _incoming resident = levellingRunSizeToLevel resident > ln

-- | Ensures that the merge contains more than one input.
newPendingMerge :: MergeType -> [MergingTree s] -> ST s (Maybe (MergingTree s))
newPendingMerge mergeType = \case
[] -> return Nothing
[tree] -> return (Just tree)
trees -> do
let pm = PendingMerge mergeType trees
Just . MergingTree <$> newSTRef (PendingTreeMerge pm)
newPendingMerge :: MergeType -> [IncomingRun s] -> [MergingTree s]
-> ST s (Maybe (MergingTree s))
newPendingMerge _ [] [] = return Nothing
newPendingMerge _ [] [t] = return (Just t)
newPendingMerge _ [ir] [] = do
let st = case ir of
Single r -> CompletedTreeMerge r
Merging _ mr -> OngoingTreeMerge mr
Just . MergingTree <$> newSTRef st
newPendingMerge mergeType irs ts = do
let st = PendingTreeMerge (PendingMerge mergeType irs ts)
Just . MergingTree <$> newSTRef st

contentToMergingTree :: LSMContent s -> ST s (Maybe (MergingTree s))
contentToMergingTree content = do
newPendingMerge (MergeLevel MergeLastLevel) =<< fromContent content
contentToMergingTree (LSMContent wb ls ul) =
newPendingMerge (MergeLevel MergeLastLevel) (buffers ++ levels) trees
where
fromContent :: LSMContent s -> ST s [MergingTree s]
fromContent (LSMContent wb ls ul) = do
fmap concat $ sequence
[ toList <$> fromBuffer wb
, concat <$> traverse fromLevel ls
, toList <$> fromUnionLevel ul
]

-- TODO: is it okay to just flush the buffer when creating a union?
fromBuffer :: Buffer -> ST s (Maybe (MergingTree s))
fromBuffer wb
| bufferSize wb == 0 = return Nothing
| otherwise = Just <$> fromRun (bufferToRun wb)

fromLevel :: Level s -> ST s [MergingTree s]
fromLevel (Level ir rs) =
fmap concat $ sequence
[ pure <$> fromIncomingRun ir
, traverse fromRun rs
]

fromIncomingRun :: IncomingRun s -> ST s (MergingTree s)
fromIncomingRun (Single r) = fromRun r
fromIncomingRun (Merging _ mr@(MergingRun _ ref)) = do
readSTRef ref >>= \case
CompletedMerge r -> fromRun r
OngoingMerge{} -> MergingTree <$> newSTRef (OngoingTreeMerge mr)
-- TODO: This STRef is not really needed, MergingRun already shares
-- the merging work and has a notion of ongoing and completed.

fromRun :: Run -> ST s (MergingTree s)
fromRun r =
MergingTree <$> newSTRef (CompletedTreeMerge r)
-- TODO: This STRef is not really needed, no work to be shared and its
-- state will never change.

fromUnionLevel :: UnionLevel s -> ST s (Maybe (MergingTree s))
fromUnionLevel = \case
NoUnion -> return Nothing
Union t _ -> return (Just t)
buffers
| bufferSize wb == 0 = []
| otherwise = [Single (bufferToRun wb)]

levels = flip concatMap ls $ \(Level ir rs) -> ir : map Single rs

trees = case ul of
NoUnion -> []
Union t _ -> [t]

type Size = Int

Expand All @@ -984,12 +946,20 @@ remainingDebtMergingTree (MergingTree ref) =
PendingTreeMerge pm -> remainingDebtPendingMerge pm

remainingDebtPendingMerge :: PendingMerge s -> ST s (Debt, Size)
remainingDebtPendingMerge (PendingMerge _ trees) = do
(debts, sizes) <- unzip <$> traverse remainingDebtMergingTree trees
remainingDebtPendingMerge (PendingMerge _ irs trees) = do
(debts, sizes) <- unzip . concat <$> sequence
[ traverse remainingDebtIncomingRun irs
, traverse remainingDebtMergingTree trees
]
let totalSize = sum sizes
let totalDebt = sum debts + totalSize
return (totalDebt, totalSize)

remainingDebtIncomingRun :: IncomingRun s -> ST s (Debt, Size)
remainingDebtIncomingRun = \case
Single r -> return (0, runSize r)
Merging _ mr -> remainingDebtMergingRun mr

-- | Upper bound on the number of credits needed to complete this merge, as well
-- as the size of the resulting run.
remainingDebtMergingRun :: MergingRun s -> ST s (Debt, Size)
Expand Down Expand Up @@ -1054,23 +1024,29 @@ supplyCreditsMergingTreeState credits !state = do
supplyCreditsMergingTreeState c' state'

supplyCreditsPendingMerge :: Credit -> PendingMerge s -> ST s Credit
supplyCreditsPendingMerge = checked remainingDebtPendingMerge $ \credits (PendingMerge mergeType trees) -> do
supplyCreditsPendingMerge = checked remainingDebtPendingMerge $ \credits pm -> do
let PendingMerge mergeType incoming trees = pm
assertST (credits >= 0)
case mergeType of
MergeLevel _ -> leftToRight credits trees
MergeLevel _ -> leftToRight credits incoming trees
MergeUnion ->
case trees of
[t1, t2] -> splitEqually credits t1 t2
_ -> error $ "supplyCreditsPendingMerge: "
++ "expected two union merge inputs, "
++ "got " ++ show (length trees)
case (incoming, trees) of
([], [t1, t2]) -> splitEqually credits t1 t2
_ -> error $ "supplyCreditsPendingMerge: "
++ "expected two union merge inputs, "
++ "got " ++ show (length trees)
where
-- supply credit left to right until it is used up
leftToRight 0 _ = return 0
leftToRight c [] = return c
leftToRight c (mt:mts) = do
leftToRight 0 _ _ = return 0
leftToRight c [] [] = return c
leftToRight c [] (mt:mts) = do
c' <- supplyCreditsMergingTree c mt
leftToRight c' mts
leftToRight c' [] mts
leftToRight c (ir:irs) mts = do
c' <- case ir of
Single _ -> return c
Merging _ mr -> supplyCreditsMergingRun c mr
leftToRight c' irs mts

splitEqually c mt1 mt2 = do
-- supply credit roughly evenly on both sides
Expand All @@ -1090,9 +1066,14 @@ supplyCreditsPendingMerge = checked remainingDebtPendingMerge $ \credits (Pendin

-- TODO: Tracer
expectCompletedChildren :: HasCallStack => PendingMerge s -> ST s (MergeType, [Run])
expectCompletedChildren (PendingMerge mergeType trees) = do
rs <- traverse expectCompletedMergingTree trees
return (mergeType, rs)
expectCompletedChildren (PendingMerge mergeType irs trees) = do
rs1 <- traverse expectCompletedIncomingRun irs
rs2 <- traverse expectCompletedMergingTree trees
return (mergeType, rs1 ++ rs2)
where
expectCompletedIncomingRun = \case
Single r -> return r
Merging _ mr -> expectCompletedMerge nullTracer Nothing mr

expectCompletedMergingTree :: HasCallStack => MergingTree s -> ST s Run
expectCompletedMergingTree (MergingTree ref) = do
Expand Down Expand Up @@ -1123,10 +1104,12 @@ flattenLevels :: Levels s -> ST s [[Run]]
flattenLevels = mapM flattenLevel

flattenLevel :: Level s -> ST s [Run]
flattenLevel (Level ir rs) =
case ir of
Single r -> return (r : rs)
Merging _ mr -> (++ rs) <$> flattenMergingRun mr
flattenLevel (Level ir rs) = (++ rs) <$> flattenIncomingRun ir

flattenIncomingRun :: IncomingRun s -> ST s [Run]
flattenIncomingRun = \case
Single r -> return [r]
Merging _ mr -> flattenMergingRun mr

flattenMergingRun :: MergingRun s -> ST s [Run]
flattenMergingRun (MergingRun _ ref) = do
Expand All @@ -1145,8 +1128,10 @@ flattenTree (MergingTree ref) = do
readSTRef mrs >>= \case
CompletedMerge r -> return (MLeaf r)
OngoingMerge _ rs _ -> return (MNode mt (MLeaf <$> rs))
PendingTreeMerge (PendingMerge mt trees) ->
MNode mt <$> traverse flattenTree trees
PendingTreeMerge (PendingMerge mt irs trees) -> do
irs' <- map MLeaf . concat <$> traverse flattenIncomingRun irs
trees' <- traverse flattenTree trees
return (MNode mt (irs' ++ trees'))

logicalValue :: LSM s -> ST s (Map Key (Value, Maybe Blob))
logicalValue lsm = do
Expand Down

0 comments on commit cd7bfb5

Please sign in to comment.