Skip to content

Commit

Permalink
Merge pull request #17 from well-typed/big-batch
Browse files Browse the repository at this point in the history
Big batch
  • Loading branch information
dcoutts authored Jun 17, 2024
2 parents 4c5c933 + 385c84a commit 3bfff11
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 31 deletions.
100 changes: 70 additions & 30 deletions src/System/IO/BlockIO.hs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ import System.IO.BlockIO.URing (IOResult(..))
-- | IO context: a handle used by threads submitting IO batches.
--
data IOCtx = IOCtx {
-- | This is initialised from the 'ioctxBatchSizeLimit' from the 'IOCtxParams'.
ioctxBatchSizeLimit' :: !Int,

-- | IO concurrency control: used by writers to reserve the
-- right to submit an IO batch of a given size, and by the
-- completion thread to return it on batch completion.
Expand Down Expand Up @@ -111,6 +114,7 @@ initIOCtx IOCtxParams {ioctxBatchSizeLimit, ioctxConcurrencyLimit} = do
initialBatchIxs = [0 .. ioctxConcurrencyLimit-1]
writeList2Chan ioctxChanIOBatchIx initialBatchIxs
return IOCtx {
ioctxBatchSizeLimit' = ioctxBatchSizeLimit,
ioctxQSemN,
ioctxURing,
ioctxChanIOBatch,
Expand Down Expand Up @@ -184,33 +188,67 @@ data IOOp m = IOOpRead !Fd !FileOffset !(MutableByteArray (PrimState m)) !Int !
--
submitIO :: IOCtx -> V.Vector (IOOp IO) -> IO (VU.Vector IOResult)
submitIO IOCtx {
ioctxBatchSizeLimit',
ioctxQSemN,
ioctxURing,
ioctxChanIOBatch,
ioctxChanIOBatchIx
}
ioops = do
let !iobatchOpCount = V.length ioops
waitQSemN ioctxQSemN iobatchOpCount
iobatchIx <- readChan ioctxChanIOBatchIx
iobatchCompletion <- newEmptyMVar
let iobatchKeepAlives = ioops
writeChan ioctxChanIOBatch
IOBatch {
iobatchIx,
iobatchOpCount,
iobatchCompletion,
iobatchKeepAlives
}
withMVar ioctxURing $ \case
Nothing -> throwIO closed
Just uring -> do
-- print ("submitIO", iobatchOpCount)
V.iforM_ ioops $ \ioopix ioop ->
let !ioopid = packIOOpId iobatchIx ioopix
in
--print ioop >>
case ioop of
ioops
| iobatchOpCount == 0 = return VU.empty

| iobatchOpCount > ioctxBatchSizeLimit' = do
-- create completion mvars for each sub-batch
batches <- forM (chunksOf ioctxBatchSizeLimit' ioops) $ \b -> do
iobatchCompletion <- newEmptyMVar
return (b, iobatchCompletion)

forM_ batches $ \(batch, iobatchCompletion) -> do
let !iobatchOpCount' = V.length batch
waitQSemN ioctxQSemN iobatchOpCount'
iobatchIx <- readChan ioctxChanIOBatchIx
let iobatchKeepAlives = batch
writeChan ioctxChanIOBatch
IOBatch {
iobatchIx,
iobatchOpCount = iobatchOpCount',
iobatchCompletion,
iobatchKeepAlives
}

submitBatch iobatchIx batch

waitAndCombine batches

| otherwise = do
waitQSemN ioctxQSemN iobatchOpCount
iobatchIx <- readChan ioctxChanIOBatchIx
iobatchCompletion <- newEmptyMVar
let iobatchKeepAlives = ioops
writeChan ioctxChanIOBatch
IOBatch {
iobatchIx,
iobatchOpCount,
iobatchCompletion,
iobatchKeepAlives
}
submitBatch iobatchIx ioops
takeMVar iobatchCompletion
where
!iobatchOpCount = V.length ioops

guardPinned mba = unless (isMutableByteArrayPinned mba) $ throwIO notPinned
closed = mkIOError ResourceVanished "IOCtx closed" Nothing Nothing
notPinned = mkIOError InvalidArgument "MutableByteArray is unpinned" Nothing Nothing

{-# INLINE submitBatch #-}
submitBatch iobatchIx batch =
withMVar ioctxURing $ \case
Nothing -> throwIO closed
Just uring -> do
V.iforM_ batch $ \ioopix ioop ->
let !ioopid = packIOOpId iobatchIx ioopix in
case ioop of
IOOpRead fd off buf bufOff cnt -> do
guardPinned buf
URing.prepareRead uring fd off
Expand All @@ -221,14 +259,16 @@ submitIO IOCtx {
URing.prepareWrite uring fd off
(mutableByteArrayContents buf `plusPtr` bufOff)
cnt ioopid
URing.submitIO uring
-- print ("submitIO", "submitting done")
takeMVar iobatchCompletion
where
closed = mkIOError ResourceVanished "IOCtx closed" Nothing Nothing
guardPinned mba = do
unless (isMutableByteArrayPinned mba) $ throwIO notPinned
notPinned = mkIOError InvalidArgument "MutableByteArray is unpinned" Nothing Nothing
URing.submitIO uring

waitAndCombine :: [(a, MVar (VU.Vector IOResult))]
-> IO (VU.Vector IOResult)
waitAndCombine xs = VU.concat <$!> forM xs (takeMVar . snd)

chunksOf :: Int -> V.Vector a -> [V.Vector a]
chunksOf n xs
| V.length xs == 0 = []
| otherwise = V.take n xs : chunksOf n (V.drop n xs)

data IOBatch = IOBatch {
iobatchIx :: !IOBatchIx,
Expand Down
29 changes: 28 additions & 1 deletion test/test.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
{-# LANGUAGE ViewPatterns #-}
module Main (main) where

import Control.Exception (SomeException, try)
import Control.Exception (SomeException, try)
import qualified Data.Primitive.ByteArray as P
import qualified Data.Vector as V
import GHC.IO.FD (FD (..))
import GHC.IO.Handle.FD (handleToFd)
import System.IO
import System.IO.BlockIO
import Test.Tasty
import Test.Tasty.HUnit
Expand All @@ -11,6 +17,10 @@ main = defaultMain tests
tests :: TestTree
tests = testGroup "test"
[ testCase "example_initClose" example_initClose
, testCase "example_initReadClose 32" $ example_initReadClose 32
, testCase "example_initReadClose 96" $ example_initReadClose 96
, testCase "example_initReadClose 200" $ example_initReadClose 200
, testCase "example_initEmptyClose" example_initEmptyClose
, testCase "example_closeIsIdempotent" example_closeIsIdempotent
]

Expand All @@ -19,6 +29,23 @@ example_initClose = do
ctx <- initIOCtx defaultIOCtxParams
closeIOCtx ctx

example_initReadClose :: Int -> Assertion
example_initReadClose size = do
ctx <- initIOCtx defaultIOCtxParams
withFile "blockio-uring.cabal" ReadMode $ \hdl -> do
-- handleToFd is available since base-4.16.0.0
FD { fdFD = fromIntegral -> fd } <- handleToFd hdl
mba <- P.newPinnedByteArray 10 -- TODO: shouldn't use the same array for all ops :)
submitIO ctx $ V.replicate size $
IOOpRead fd 0 mba 0 10
closeIOCtx ctx

example_initEmptyClose :: Assertion
example_initEmptyClose = do
ctx <- initIOCtx defaultIOCtxParams
_ <- submitIO ctx V.empty
closeIOCtx ctx

example_closeIsIdempotent :: Assertion
example_closeIsIdempotent = do
ctx <- initIOCtx defaultIOCtxParams
Expand Down

0 comments on commit 3bfff11

Please sign in to comment.