Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add blockio-uring dependency #103

Merged
merged 1 commit into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions .github/workflows/haskell.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,25 @@ jobs:
cabal-version: ${{ matrix.cabal }}
cabal-update: true

# Builds liburing (tag liburing-2.5) from source on the Ubuntu runner, installs it system-wide, and finally checks that pkg-config can find it.
- name: Install liburing (on Linux)
id: setup-liburing
if: matrix.os == 'ubuntu-latest'
run: |
sudo apt-get update
sudo apt-get -y install pkg-config
echo "PKG_CONFIG_PATH=$PKG_CONFIG_PATH"
mkdir tmp
cd tmp
git clone https://github.com/axboe/liburing.git
cd liburing
git checkout liburing-2.5
./configure --cc=gcc --cxx=g++
make -j$(nproc)
sudo make install
cd ../..
sudo rm -rf ./tmp
pkg-config --modversion liburing

- name: Configure the build
run: |
cabal configure --enable-tests --enable-benchmark --ghc-options="-Werror" --ghc-options="-fno-ignore-asserts"
Expand Down
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,7 @@ It has a number of custom features that are primarily tailored towards performan

## System requirements

This library only supports 64-bit, little-endian systems.
This library only supports 64-bit, little-endian systems.

On Linux systems, pass the `-threaded` flag to GHC when building executables,
test suites and benchmark suites that use this library.
15 changes: 15 additions & 0 deletions cabal.project
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,18 @@ package lsm-tree
-- apply this to all components
-- relevant mostly only for development & testing
ghc-options: -fno-ignore-asserts

-- blockio-uring is only added as a source dependency when building on Linux
if(os(linux))
source-repository-package
type: git
location: https://github.com/well-typed/blockio-uring
tag: bbeb81130ec3eafd8ced81564cc8bd46d24aff08

-- fs-api with support for I/O using user-supplied buffers
source-repository-package
type: git
location: https://github.com/input-output-hk/fs-sim
tag: 6a4a456640dd1fed434ccb4cbb553482afe8e2d4
subdir:
fs-api
fs-sim
99 changes: 99 additions & 0 deletions fs-api-blockio/src-linux/System/FS/BlockIO/Async.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
{-# LANGUAGE NamedFieldPuns #-}

module System.FS.BlockIO.Async (
asyncHasBlockIO
) where

import Control.Exception
import qualified Control.Exception as E
import Control.Monad
import Foreign.C.Error
import GHC.IO.Exception
import GHC.Stack
import System.FS.API (BufferOffset (..), FsErrorPath, Handle (..),
HasFS (..), SomeHasFS (..), ioToFsError)
import qualified System.FS.BlockIO.API as API
import System.FS.BlockIO.API (IOOp (..), IOResult (..), ioopHandle)
import System.FS.IO (HandleIO)
import System.FS.IO.Internal.Handle
import qualified System.IO.BlockIO as I
import System.IO.Error (ioeSetErrorString, isResourceVanishedError)
import System.Posix.Types

-- | Build a 'API.HasBlockIO' for 'IO' that is backed by a @blockio-uring@ I\/O
-- context.
--
-- If no 'API.IOCtxParams' are supplied, 'I.defaultIOCtxParams' is used;
-- otherwise the supplied parameters are translated with 'ctxParamsConv'.
-- Closing the returned interface closes the underlying I\/O context, and
-- submitting a batch goes through this module's 'submitIO'.
asyncHasBlockIO :: HasFS IO HandleIO -> Maybe API.IOCtxParams -> IO (API.HasBlockIO IO HandleIO)
asyncHasBlockIO hasFS ctxParams = do
    ioctx <- I.initIOCtx uringParams
    pure API.HasBlockIO
      { API.close    = I.closeIOCtx ioctx
      , API.submitIO = submitIO hasFS ioctx
      }
  where
    -- Parameters in the representation expected by @blockio-uring@.
    uringParams :: I.IOCtxParams
    uringParams = case ctxParams of
      Nothing     -> I.defaultIOCtxParams
      Just params -> ctxParamsConv params

-- | Translate this library's 'API.IOCtxParams' into the equally named record
-- of @blockio-uring@, copying both limits over unchanged.
ctxParamsConv :: API.IOCtxParams -> I.IOCtxParams
ctxParamsConv (API.IOCtxParams batchSizeLimit concurrencyLimit) =
    I.IOCtxParams
      { I.ioctxBatchSizeLimit   = batchSizeLimit
      , I.ioctxConcurrencyLimit = concurrencyLimit
      }

-- | Submit a batch of I\/O operations through @blockio-uring@ and wait for the
-- results.
--
-- Every 'IOOp' is first converted with 'ioopConv', after which the whole batch
-- is handed to 'I.submitIO'. Results are then paired positionally with the
-- input operations:
--
-- * If the submission itself fails with the \"IOCtx closed\" 'IOError', the
--   exception is replaced by the one built by 'API.mkClosedError'. Any other
--   'IOError' is rethrown unchanged.
--
-- * The first result that carries an 'Errno' makes the call throw the
--   exception built by 'ioToFsError', using the path of the handle of the
--   corresponding operation.
submitIO ::
     HasFS IO HandleIO
  -> I.IOCtx
  -> [IOOp IO HandleIO]
  -> IO [IOResult]
submitIO hasFS ioctx ioops = do
    uringOps <- mapM ioopConv ioops
    results  <- I.submitIO ioctx uringOps `catch` rethrowClosedError
    zipWithM rethrowErrno ioops results
  where
    -- Recognising the error by its shape is brittle: the structure of the
    -- exception might change between versions of @blockio-uring@. It is still
    -- better than not translating the error at all.
    rethrowClosedError :: IOError -> IO a
    rethrowClosedError err@IOError{}
      | isResourceVanishedError err
      , ioe_location err == "IOCtx closed"
      = throwIO (API.mkClosedError (SomeHasFS hasFS) "submitIO")
      | otherwise
      = throwIO err

    -- Turn a single @blockio-uring@ result into an 'IOResult', or throw if the
    -- result is an error number.
    rethrowErrno ::
         HasCallStack
      => IOOp IO HandleIO
      -> I.IOResult
      -> IO IOResult
    rethrowErrno ioop res =
        case res of
          I.IOResult nbytes -> pure (IOResult nbytes)
          I.IOError errno   -> throwAsFsError errno
      where
        throwAsFsError :: HasCallStack => Errno -> IO a
        throwAsFsError errno =
            E.throwIO (ioToFsError errPath (errnoError errno))

        -- Path of the file that the failing operation was targeting.
        errPath :: FsErrorPath
        errPath = mkFsErrorPath hasFS (handlePath (ioopHandle ioop))

        -- 'IOError' for the error number, with an error string that names the
        -- kind of operation that failed.
        errnoError :: Errno -> IOError
        errnoError errno =
            errnoToIOError "submitIO" errno Nothing Nothing
              `ioeSetErrorString` ("submitIO failed: " <> opName)

        opName :: String
        opName = case ioop of
          IOOpRead{}  -> "IOOpRead"
          IOOpWrite{} -> "IOOpWrite"

-- | Translate an 'IOOp' on an @fs-api@ handle into the corresponding
-- @blockio-uring@ operation on a file descriptor.
--
-- The descriptor is obtained with 'handleFd' and the buffer offset is
-- unwrapped with 'unBufferOffset'; all other arguments are passed through
-- unchanged.
ioopConv :: IOOp IO HandleIO -> IO (I.IOOp IO)
ioopConv ioop = case ioop of
    IOOpRead h off buf bufOff c -> do
      fd <- handleFd h
      pure (I.IOOpRead fd off buf (unBufferOffset bufOff) c)
    IOOpWrite h off buf bufOff c -> do
      fd <- handleFd h
      pure (I.IOOpWrite fd off buf (unBufferOffset bufOff) c)

-- Obtain the file descriptor behind a handle.
--
-- The handle is only checked for being open at the moment it is converted to
-- an Fd. Nothing prevents the handle from being closed afterwards, while
-- blockio operations on the descriptor are still in flight.
--
-- TODO: if the handle were to have a reader/writer lock, then we could take the
-- reader lock in 'submitIO'. However, the current implementation of 'Handle'
-- only allows mutually exclusive access to the underlying file descriptor, so
-- it would require a change in @fs-api@. See [fs-sim#49].
handleFd :: Handle HandleIO -> IO Fd
handleFd h = withOpenHandle "submitIO" rawHandle pure
  where
    rawHandle = handleRaw h
15 changes: 15 additions & 0 deletions fs-api-blockio/src-linux/System/FS/BlockIO/Internal.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
module System.FS.BlockIO.Internal (
ioHasBlockIO
) where

import System.FS.API (HasBufFS, HasFS)
import System.FS.BlockIO.API (HasBlockIO, IOCtxParams)
import qualified System.FS.BlockIO.Async as I
import System.FS.IO (HandleIO)

-- | Linux instantiation of 'HasBlockIO': delegates to 'I.asyncHasBlockIO'.
--
-- The 'HasBufFS' argument is accepted for interface compatibility but is not
-- used here. The optional 'IOCtxParams' are forwarded to 'I.asyncHasBlockIO'.
ioHasBlockIO ::
     HasFS IO HandleIO
  -> HasBufFS IO HandleIO
  -> Maybe IOCtxParams
  -> IO (HasBlockIO IO HandleIO)
ioHasBlockIO hasFS _hasBufFS = I.asyncHasBlockIO hasFS
20 changes: 20 additions & 0 deletions fs-api-blockio/src-macos/System/FS/BlockIO/Internal.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
module System.FS.BlockIO.Internal (
ioHasBlockIO
) where

import System.FS.API (HasBufFS, HasFS)
import System.FS.BlockIO.API (HasBlockIO, IOCtxParams)
import qualified System.FS.BlockIO.Serial as Serial
import System.FS.IO (HandleIO)

-- | macOS instantiation of 'HasBlockIO'.
--
-- For the time being this is the portable, serial implementation from
-- "System.FS.BlockIO.Serial"; the 'IOCtxParams' are therefore ignored. A proper
-- asynchronous I\/O implementation for OSX should be plugged in here.
--
-- The recommended choice for such an implementation would be the POSIX AIO
-- API.
ioHasBlockIO ::
     HasFS IO HandleIO
  -> HasBufFS IO HandleIO
  -> Maybe IOCtxParams
  -> IO (HasBlockIO IO HandleIO)
ioHasBlockIO hfs hbfs _ctxParams = Serial.serialHasBlockIO hfs hbfs
jorisdral marked this conversation as resolved.
Show resolved Hide resolved
20 changes: 20 additions & 0 deletions fs-api-blockio/src-windows/System/FS/BlockIO/Internal.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
module System.FS.BlockIO.Internal (
ioHasBlockIO
) where

import System.FS.API (HasBufFS, HasFS)
import System.FS.BlockIO.API (HasBlockIO, IOCtxParams)
import qualified System.FS.BlockIO.Serial as Serial
import System.FS.IO (HandleIO)

-- | Windows instantiation of 'HasBlockIO'.
--
-- For the time being this is the portable, serial implementation from
-- "System.FS.BlockIO.Serial"; the 'IOCtxParams' are therefore ignored. A proper
-- asynchronous I\/O implementation for Windows should be plugged in here.
--
-- The recommended choice for such an implementation would be the Win32 IOCP
-- API.
ioHasBlockIO ::
     HasFS IO HandleIO
  -> HasBufFS IO HandleIO
  -> Maybe IOCtxParams
  -> IO (HasBlockIO IO HandleIO)
ioHasBlockIO hfs hbfs _ctxParams = Serial.serialHasBlockIO hfs hbfs
jorisdral marked this conversation as resolved.
Show resolved Hide resolved
64 changes: 64 additions & 0 deletions fs-api-blockio/src/System/FS/BlockIO/API.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
{-# LANGUAGE GADTs #-}
{-# LANGUAGE RankNTypes #-}

module System.FS.BlockIO.API (
HasBlockIO (..)
, IOCtxParams (..)
, mkClosedError
, IOOp (..)
, ioopHandle
, IOResult (..)
-- * Re-exports
, ByteCount
, FileOffset
) where

import Control.Monad.Primitive (PrimMonad (PrimState))
import Data.Primitive.ByteArray (MutableByteArray)
import GHC.IO.Exception (IOErrorType (ResourceVanished))
import System.FS.API
import System.IO.Error (ioeSetErrorString, mkIOError)
import System.Posix.Types (ByteCount, FileOffset)
import Util.CallStack

-- | Abstract interface for submitting large batches of I\/O operations.
--
-- The record is parameterised over the monad @m@ in which operations run and
-- the type @h@ of raw file handles.
data HasBlockIO m h = HasBlockIO {
-- | (Idempotent) close the interface.
--
-- Using 'submitIO' after 'close' should throw an 'FsError' exception. See
-- 'mkClosedError'.
close :: HasCallStack => m ()
-- | Submit a batch of I\/O operations and wait for the result.
--
-- Results correspond to input 'IOOp's in a pair-wise manner, i.e., one can
-- match 'IOOp's with 'IOResult's by zipping the input and output list.
--
-- If any of the I\/O operations fails, an 'FsError' exception will be thrown.
, submitIO :: HasCallStack => [IOOp m h] -> m [IOResult]
}

-- | Concurrency parameters for initialising a 'HasBlockIO'. Can be ignored by
-- serial implementations.
data IOCtxParams = IOCtxParams {
-- NOTE(review): presumably the maximum number of operations per submitted
-- batch; confirm against the @blockio-uring@ documentation.
ioctxBatchSizeLimit :: !Int,
-- NOTE(review): presumably the maximum number of operations in flight at the
-- same time; confirm against the @blockio-uring@ documentation.
ioctxConcurrencyLimit :: !Int
}

-- | Construct the 'FsError' that signals use of a 'HasBlockIO' after it was
-- closed.
--
-- The underlying 'IOError' has type 'ResourceVanished' and location @loc@, and
-- its error string is @\"HasBlockIO closed: \"@ followed by @loc@. The error
-- path is the empty 'FsPath', turned into an 'FsErrorPath' by the given file
-- system.
mkClosedError :: HasCallStack => SomeHasFS m -> String -> FsError
mkClosedError (SomeHasFS hasFS) loc = ioToFsError emptyPath closedErr
  where
    emptyPath :: FsErrorPath
    emptyPath = mkFsErrorPath hasFS (mkFsPath [])

    closedErr :: IOError
    closedErr =
        mkIOError ResourceVanished loc Nothing Nothing
          `ioeSetErrorString` ("HasBlockIO closed: " <> loc)


-- | A single I\/O operation in a batch: either a read or a write that uses a
-- mutable byte array as its buffer.
--
-- Both constructors carry, in order: the file handle, an offset into the file,
-- the buffer, an offset into the buffer, and a byte count.
data IOOp m h =
IOOpRead !(Handle h) !FileOffset !(MutableByteArray (PrimState m)) !BufferOffset !ByteCount
| IOOpWrite !(Handle h) !FileOffset !(MutableByteArray (PrimState m)) !BufferOffset !ByteCount

-- | The file handle that an 'IOOp' operates on, regardless of whether it is a
-- read or a write.
ioopHandle :: IOOp m h -> Handle h
ioopHandle ioop = case ioop of
    IOOpRead  h _ _ _ _ -> h
    IOOpWrite h _ _ _ _ -> h

-- | Number of read/written bytes.
--
-- 'submitIO' returns one 'IOResult' per submitted 'IOOp', in the same order as
-- the input list.
newtype IOResult = IOResult ByteCount
16 changes: 16 additions & 0 deletions fs-api-blockio/src/System/FS/BlockIO/IO.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
module System.FS.BlockIO.IO (
ioHasBlockIO
) where

import System.FS.API (HasBufFS, HasFS)
import System.FS.BlockIO.API (HasBlockIO, IOCtxParams)
import qualified System.FS.BlockIO.Internal as I
import System.FS.IO (HandleIO)

-- | Platform-dependent IO instantiation of 'HasBlockIO'.
--
-- This is a plain alias for @ioHasBlockIO@ from "System.FS.BlockIO.Internal".
--
-- NOTE(review): presumably the build selects one of several platform-specific
-- versions of that module (Linux, macOS, Windows) -- confirm in the cabal
-- file.
ioHasBlockIO ::
HasFS IO HandleIO
-> HasBufFS IO HandleIO
-> Maybe IOCtxParams
-> IO (HasBlockIO IO HandleIO)
ioHasBlockIO = I.ioHasBlockIO
60 changes: 60 additions & 0 deletions fs-api-blockio/src/System/FS/BlockIO/Serial.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@

module System.FS.BlockIO.Serial (
serialHasBlockIO
) where

import Control.Concurrent.Class.MonadMVar
import Control.Monad (unless)
import Control.Monad.Class.MonadThrow
import System.FS.API
import qualified System.FS.BlockIO.API as API
import System.FS.BlockIO.API (IOOp (..), IOResult (..))

-- | Instantiation of 'API.HasBlockIO' on top of an existing 'HasFS' and
-- 'HasBufFS'.
--
-- Operations in a batch are performed one after the other, so this
-- implementation does not take advantage of parallel I\/O.
serialHasBlockIO ::
     (MonadThrow m, MonadMVar m, Eq h)
  => HasFS m h
  -> HasBufFS m h
  -> m (API.HasBlockIO m h)
serialHasBlockIO hasFS hasBufFS = do
    ioctx <- initIOCtx (SomeHasFS hasFS)
    pure API.HasBlockIO
      { API.close    = close ioctx
      , API.submitIO = submitIO hasFS hasBufFS ioctx
      }

-- | State of the serial implementation: the file system that is used when
-- constructing the \"closed\" error ('ctxFS'), and a flag that records whether
-- the interface is still open ('openVar').
data IOCtx m = IOCtx { ctxFS :: SomeHasFS m, openVar :: MVar m Bool }

-- | Throw the exception built by 'API.mkClosedError' (with location
-- @\"submitIO\"@) if the context has been closed; otherwise do nothing.
guardIsOpen :: (MonadMVar m, MonadThrow m) => IOCtx m -> m ()
guardIsOpen ctx = do
    isOpen <- readMVar (openVar ctx)
    unless isOpen $
      throwIO (API.mkClosedError (ctxFS ctx) "submitIO")

-- | Create a fresh context for the given file system, initially marked as
-- open.
initIOCtx :: MonadMVar m => SomeHasFS m -> m (IOCtx m)
initIOCtx someHasFS = do
    isOpenVar <- newMVar True
    pure IOCtx { ctxFS = someHasFS, openVar = isOpenVar }

-- | Mark the context as closed. The previous state is ignored, so closing an
-- already closed context simply leaves it closed.
close :: MonadMVar m => IOCtx m -> m ()
close ctx = modifyMVar_ (openVar ctx) (\_wasOpen -> pure False)

-- | Perform a batch of I\/O operations one by one, in list order, and collect
-- their results.
--
-- Before the first operation is performed, 'guardIsOpen' checks that the
-- context has not been closed.
submitIO ::
     (MonadMVar m, MonadThrow m)
  => HasFS m h
  -> HasBufFS m h
  -> IOCtx m
  -> [IOOp m h]
  -> m [IOResult]
submitIO hasFS hasBufFS ctx ioops =
    guardIsOpen ctx >> mapM (ioop hasFS hasBufFS) ioops

-- | Execute a single 'IOOp' synchronously.
--
-- Reads go through 'hGetBufExactlyAt' and writes through 'hPutBufExactlyAt';
-- the byte count returned by either function is wrapped in an 'IOResult'.
ioop ::
     MonadThrow m
  => HasFS m h
  -> HasBufFS m h
  -> IOOp m h
  -> m IOResult
ioop hfs hbfs op = IOResult <$> case op of
    IOOpRead h off buf bufOff c ->
      hGetBufExactlyAt hfs hbfs h buf bufOff c (fromIntegral off)
    IOOpWrite h off buf bufOff c ->
      hPutBufExactlyAt hbfs h buf bufOff c (fromIntegral off)
Loading
Loading