Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resolving TODO: PrimVar for UniqCounter #508

Merged
merged 2 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src-extras/Database/LSMTree/Extras/NoThunks.hs
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,18 @@ deriving via AllowThunksIn ["cursorSession", "cursorSessionEnv"] (CursorEnv m h)
instance (Typeable m, Typeable h, Typeable (PrimState m))
=> NoThunks (CursorEnv m h)

deriving stock instance Generic TableId
deriving anyclass instance NoThunks TableId

deriving stock instance Generic CursorId
deriving anyclass instance NoThunks CursorId

{-------------------------------------------------------------------------------
UniqCounter
-------------------------------------------------------------------------------}

deriving stock instance Generic (UniqCounter m)
deriving anyclass instance NoThunks (StrictMVar m Word64)
deriving anyclass instance (NoThunks (PrimVar (PrimState m) Int))
=> NoThunks (UniqCounter m)

{-------------------------------------------------------------------------------
Expand Down
5 changes: 2 additions & 3 deletions src-extras/Database/LSMTree/Extras/RunData.hs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import qualified Data.Map as M
import Data.Map.Strict (Map)
import qualified Data.Map.Strict as Map
import qualified Data.Vector as V
import Data.Word (Word64)
import Database.LSMTree.Extras (showPowersOf10)
import Database.LSMTree.Extras.Generators ()
import Database.LSMTree.Internal.Entry
Expand Down Expand Up @@ -172,11 +171,11 @@ serialiseRunData rd =

-- | Create a 'RunFsPaths' using an empty 'FsPath'. The empty path corresponds
-- to the "root" or "mount point" of a 'HasFS' instance.
simplePath :: Word64 -> RunFsPaths
simplePath :: Int -> RunFsPaths
simplePath n = RunFsPaths (mkFsPath []) (RunNumber n)

-- | Like 'simplePath', but for a list.
simplePaths :: [Word64] -> [RunFsPaths]
simplePaths :: [Int] -> [RunFsPaths]
simplePaths ns = fmap simplePath ns

{-------------------------------------------------------------------------------
Expand Down
39 changes: 16 additions & 23 deletions src/Database/LSMTree/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ import Data.Maybe (catMaybes)
import qualified Data.Set as Set
import Data.Typeable
import qualified Data.Vector as V
import Data.Word (Word64)
import Database.LSMTree.Internal.BlobRef (WeakBlobRef (..))
import qualified Database.LSMTree.Internal.BlobRef as BlobRef
import Database.LSMTree.Internal.Config
Expand All @@ -111,6 +110,7 @@ import Database.LSMTree.Internal.Paths (SessionRoot (..),
import qualified Database.LSMTree.Internal.Paths as Paths
import Database.LSMTree.Internal.Range (Range (..))
import Database.LSMTree.Internal.Run (Run)
import Database.LSMTree.Internal.RunNumber
import Database.LSMTree.Internal.RunReaders (OffsetKey (..))
import qualified Database.LSMTree.Internal.RunReaders as Readers
import Database.LSMTree.Internal.Serialise (SerialisedBlob (..),
Expand Down Expand Up @@ -251,18 +251,13 @@ data LSMTreeTrace =
-- Table
| TraceNewTable
| TraceOpenSnapshot SnapshotName TableConfigOverride
| TraceTable
Word64 -- ^ Table identifier
TableTrace
| TraceTable TableId TableTrace
| TraceDeleteSnapshot SnapshotName
| TraceListSnapshots
-- Cursor
| TraceCursor
Word64 -- ^ Cursor identifier
CursorTrace
| TraceCursor CursorId CursorTrace
-- Unions
| TraceUnions
(NonEmpty Word64) -- ^ Table identifiers
| TraceUnions (NonEmpty TableId)
deriving stock Show

data TableTrace =
Expand All @@ -285,8 +280,7 @@ data TableTrace =
deriving stock Show

data CursorTrace =
TraceCreateCursor
Word64 -- ^ Table identifier
TraceCreateCursor TableId
| TraceCloseCursor
| TraceReadCursor Int
deriving stock Show
Expand Down Expand Up @@ -346,10 +340,10 @@ data SessionEnv m h = SessionEnv {
-- * A table 'close' may delete its own identifier from the set of open
-- tables without restrictions, even concurrently with 'closeSession'.
-- This is safe because 'close' is idempotent'.
, sessionOpenTables :: !(StrictMVar m (Map Word64 (Table m h)))
, sessionOpenTables :: !(StrictMVar m (Map TableId (Table m h)))
-- | Similarly to tables, open cursors are tracked so they can be closed
-- once the session is closed. See 'sessionOpenTables'.
, sessionOpenCursors :: !(StrictMVar m (Map Word64 (Cursor m h)))
, sessionOpenCursors :: !(StrictMVar m (Map CursorId (Cursor m h)))
}

{-# INLINE withOpenSession #-}
Expand Down Expand Up @@ -583,7 +577,7 @@ data Table m h = Table {
--
-- INVARIANT: a table's identifier is never changed during the lifetime of
-- the table.
, tableId :: !Word64
, tableId :: !TableId

-- === Session-inherited

Expand Down Expand Up @@ -645,10 +639,10 @@ tableSessionUniqCounter :: TableEnv m h -> UniqCounter m
tableSessionUniqCounter = sessionUniqCounter . tableSessionEnv

{-# INLINE tableSessionUntrackTable #-}
{-# SPECIALISE tableSessionUntrackTable :: Word64 -> TableEnv IO h -> IO () #-}
{-# SPECIALISE tableSessionUntrackTable :: TableId -> TableEnv IO h -> IO () #-}
-- | Open tables are tracked in the corresponding session, so when a table is
-- closed it should become untracked (forgotten).
tableSessionUntrackTable :: MonadMVar m => Word64 -> TableEnv m h -> m ()
tableSessionUntrackTable :: MonadMVar m => TableId -> TableEnv m h -> m ()
tableSessionUntrackTable tableId thEnv =
modifyMVar_ (sessionOpenTables (tableSessionEnv thEnv)) $ pure . Map.delete tableId

Expand Down Expand Up @@ -738,8 +732,8 @@ newWith ::
-> TableContent m h
-> m (Table m h)
newWith reg sesh seshEnv conf !am !tc = do
tableId <- incrUniqCounter (sessionUniqCounter seshEnv)
let tr = TraceTable (uniqueToWord64 tableId) `contramap` sessionTracer sesh
tableId <- uniqueToTableId <$> incrUniqCounter (sessionUniqCounter seshEnv)
let tr = TraceTable tableId `contramap` sessionTracer sesh
traceWith tr $ TraceCreateTable conf
-- The session is kept open until we've updated the session's set of tracked
-- tables. If 'closeSession' is called by another thread while this code
Expand All @@ -750,12 +744,11 @@ newWith reg sesh seshEnv conf !am !tc = do
tableSessionEnv = seshEnv
, tableContent = contentVar
}
let !tid = uniqueToWord64 tableId
!t = Table conf tableVar am tr tid sesh
let !t = Table conf tableVar am tr tableId sesh
-- Track the current table
delayedCommit reg $
modifyMVar_ (sessionOpenTables seshEnv) $
pure . Map.insert (uniqueToWord64 tableId) t
pure . Map.insert tableId t
pure $! t

{-# SPECIALISE close :: Table IO h -> IO () #-}
Expand Down Expand Up @@ -943,7 +936,7 @@ data CursorEnv m h = CursorEnv {
-- === Cursor-specific

-- | Session-unique identifier for this cursor.
, cursorId :: !Word64
, cursorId :: !CursorId
-- | Readers are immediately discarded once they are 'Readers.Drained',
-- so if there is a 'Just', we can assume that it has further entries.
-- Note that, while the readers do retain references on the blob files
Expand Down Expand Up @@ -997,7 +990,7 @@ newCursor ::
newCursor !offsetKey t = withOpenTable t $ \thEnv -> do
let cursorSession = tableSession t
let cursorSessionEnv = tableSessionEnv thEnv
cursorId <- uniqueToWord64 <$>
cursorId <- uniqueToCursorId <$>
incrUniqCounter (sessionUniqCounter cursorSessionEnv)
let cursorTracer = TraceCursor cursorId `contramap` sessionTracer cursorSession
traceWith cursorTracer $ TraceCreateCursor (tableId t)
Expand Down
2 changes: 1 addition & 1 deletion src/Database/LSMTree/Internal/Paths.hs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ mkSnapshotName s
-- | The file name for a table's write buffer blob file
tableBlobPath :: SessionRoot -> Unique -> FsPath
tableBlobPath session n =
getActiveDir (activeDir session) </> mkFsPath [show (uniqueToWord64 n)] <.> "wbblobs"
getActiveDir (activeDir session) </> mkFsPath [show (uniqueToInt n)] <.> "wbblobs"

{-------------------------------------------------------------------------------
Run paths
Expand Down
11 changes: 9 additions & 2 deletions src/Database/LSMTree/Internal/RunNumber.hs
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
module Database.LSMTree.Internal.RunNumber (
RunNumber (..),
TableId (..),
CursorId (..),
) where

import Control.DeepSeq (NFData)
import Data.Word (Word64)

newtype RunNumber = RunNumber Word64
newtype RunNumber = RunNumber Int
deriving newtype (Eq, Ord, Show, NFData)

newtype TableId = TableId Int
deriving newtype (Eq, Ord, Show, NFData)

newtype CursorId = CursorId Int
deriving newtype (Eq, Ord, Show, NFData)
6 changes: 3 additions & 3 deletions src/Database/LSMTree/Internal/Snapshot.hs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ import Database.LSMTree.Internal.Run (Run)
import qualified Database.LSMTree.Internal.Run as Run
import Database.LSMTree.Internal.RunNumber
import Database.LSMTree.Internal.UniqCounter (UniqCounter,
incrUniqCounter, uniqueToRunNumber, uniqueToWord64)
incrUniqCounter, uniqueToInt, uniqueToRunNumber)
import Database.LSMTree.Internal.WriteBuffer (WriteBuffer)
import Database.LSMTree.Internal.WriteBufferBlobs (WriteBufferBlobs)
import qualified Database.LSMTree.Internal.WriteBufferBlobs as WBB
Expand Down Expand Up @@ -298,7 +298,7 @@ openWriteBuffer ::
-> m (WriteBuffer, Ref (WriteBufferBlobs m h))
openWriteBuffer reg resolve hfs hbio uc activeDir snapWriteBufferPaths = do
-- Copy the write buffer blobs file to the active directory and open it.
activeWriteBufferNumber <- uniqueToWord64 <$> incrUniqCounter uc
activeWriteBufferNumber <- uniqueToInt <$> incrUniqCounter uc
let activeWriteBufferBlobPath =
getActiveDir activeDir </> FS.mkFsPath [show activeWriteBufferNumber] <.> "wbblobs"
copyFile reg hfs hbio (writeBufferBlobPath snapWriteBufferPaths) activeWriteBufferBlobPath
Expand Down Expand Up @@ -365,7 +365,7 @@ snapshotRuns reg hbio0 (NamedSnapshotDir targetDir) levels = do
--
-- The result must ultimately be released using 'releaseRuns'.
openRuns ::
(MonadMask m, MonadSTM m, MonadST m, MonadMVar m)
(MonadMask m, MonadSTM m, MonadST m)
=> ActionRegistry m
-> HasFS m h
-> HasBlockIO m h
Expand Down
4 changes: 2 additions & 2 deletions src/Database/LSMTree/Internal/Snapshot/Codec.hs
Original file line number Diff line number Diff line change
Expand Up @@ -453,10 +453,10 @@ instance DecodeVersioned (V.Vector RunNumber) where
-- RunNumber

instance Encode RunNumber where
encode (RunNumber x) = encodeWord64 x
encode (RunNumber x) = encodeInt x

instance DecodeVersioned RunNumber where
decodeVersioned V0 = RunNumber <$> decodeWord64
decodeVersioned V0 = RunNumber <$> decodeInt

-- SnapIncomingRun

Expand Down
41 changes: 24 additions & 17 deletions src/Database/LSMTree/Internal/UniqCounter.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,36 +3,43 @@ module Database.LSMTree.Internal.UniqCounter (
newUniqCounter,
incrUniqCounter,
Unique,
uniqueToWord64,
uniqueToInt,
uniqueToRunNumber,
uniqueToTableId,
uniqueToCursorId,
) where

import Control.Concurrent.Class.MonadMVar.Strict
import Data.Coerce (coerce)
import Data.Word (Word64)
import Control.Monad.Primitive (PrimMonad, PrimState)
import Data.Primitive.PrimVar as P
import Database.LSMTree.Internal.RunNumber

-- | A newtype wrapper around 'Word64'.
newtype Unique = Unique Word64
-- | A unique value derived from a 'UniqCounter'.
newtype Unique = Unique Int

-- | Avoid this function, use specialised versions like 'uniqueToRunNumber' if possible.
uniqueToWord64 :: Unique -> Word64
uniqueToWord64 = coerce
-- | Use specialised versions like 'uniqueToRunNumber' where possible.
uniqueToInt :: Unique -> Int
uniqueToInt (Unique n) = n

uniqueToRunNumber :: Unique -> RunNumber
uniqueToRunNumber = coerce
uniqueToRunNumber (Unique n) = RunNumber n

uniqueToTableId :: Unique -> TableId
uniqueToTableId (Unique n) = TableId n

uniqueToCursorId :: Unique -> CursorId
uniqueToCursorId (Unique n) = CursorId n

-- | An atomic counter for producing 'Unique' values.
--
-- TODO: could we use a PrimVar here?
newtype UniqCounter m = UniqCounter (StrictMVar m Word64)
newtype UniqCounter m = UniqCounter (PrimVar (PrimState m) Int)

{-# INLINE newUniqCounter #-}
newUniqCounter :: MonadMVar m => Word64 -> m (UniqCounter m)
newUniqCounter x = UniqCounter <$> newMVar x
newUniqCounter :: PrimMonad m => Int -> m (UniqCounter m)
newUniqCounter = fmap UniqCounter . P.newPrimVar

{-# INLINE incrUniqCounter #-}
-- | Return the current state of the atomic counter, and then increment the
-- | Atomically, return the current state of the counter, and increment the
-- counter.
incrUniqCounter :: MonadMVar m => UniqCounter m -> m Unique
incrUniqCounter (UniqCounter uniqVar) = modifyMVar uniqVar (\x -> pure ((x+1), Unique x))
incrUniqCounter :: PrimMonad m => UniqCounter m -> m Unique
incrUniqCounter (UniqCounter uniqVar) =
Unique <$> P.fetchAddInt uniqVar 1
5 changes: 2 additions & 3 deletions test/Test/Database/LSMTree/Internal/RunReaders.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import Data.Foldable (toList, traverse_)
import qualified Data.Map.Strict as Map
import Data.Proxy (Proxy (..))
import qualified Data.Vector as V
import Data.Word (Word64)
import Database.LSMTree.Extras (showPowersOf)
import Database.LSMTree.Extras.Generators (KeyForIndexCompact (..))
import Database.LSMTree.Extras.RunData
Expand Down Expand Up @@ -300,7 +299,7 @@ runRealMonad hfs hbio st = (`runStateT` st) . (`runReaderT` (hfs, hbio))

data RealState =
RealState
!Word64 -- ^ number of runs created so far (to generate fresh run numbers)
!Int -- ^ number of runs created so far (to generate fresh run numbers)
!(Maybe ReadersCtx)

-- | Readers, together with the runs being read, so they can be cleaned up at the end
Expand Down Expand Up @@ -354,7 +353,7 @@ runIO act lu = case act of
return Nothing
Just readers ->
return $ Just (wbblobs, runs, readers)
put (RealState (numRuns + fromIntegral (length wbs)) newReaders)
put (RealState (numRuns + length wbs) newReaders)
return (Right ())
PeekKey -> expectReaders $ \_ r -> do
(,) HasMore <$> Readers.peekKey r
Expand Down
Loading