Skip to content

Commit

Permalink
feat(crates/engine): improve fetchers with OpenDAL
Browse files Browse the repository at this point in the history
  • Loading branch information
xrelkd committed Dec 24, 2023
1 parent 97308a8 commit fc4f2ad
Show file tree
Hide file tree
Showing 5 changed files with 101 additions and 118 deletions.
15 changes: 12 additions & 3 deletions crates/engine/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ pub type Result<T> = std::result::Result<T, Error>;
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Fetching directory is not supported"))]
FetchingDirectory,

#[snafu(display("The chunk size is invalid, value: {value}"))]
BadChunkSize { value: u64 },

Expand Down Expand Up @@ -44,9 +47,6 @@ pub enum Error {
#[snafu(display("Error occurs while writing file `{}`, error: {source}", file_path.display()))]
WriteFile { file_path: PathBuf, source: std::io::Error },

#[snafu(display("Error occurs while reading file `{}`, error: {source}", file_path.display()))]
ReadFile { file_path: PathBuf, source: std::io::Error },

#[snafu(display("Error occurs while flushing file `{}`, error: {source}", file_path.display()))]
FlushFile { file_path: PathBuf, source: std::io::Error },

Expand All @@ -56,6 +56,15 @@ pub enum Error {
#[snafu(display("Error occurs while resizing file `{}`, error: {source}", file_path.display()))]
ResizeFile { file_path: PathBuf, source: std::io::Error },

#[snafu(display("Error occurs while creating reader, error: {source}"))]
CreateReader { source: opendal::Error },

#[snafu(display("Error occurs while reading from reader, error: {source}"))]
ReadFromReader { source: std::io::Error },

#[snafu(display("Error occurs while seeking in reader, error: {source}"))]
SeekReader { source: std::io::Error },

#[snafu(display("Error occurs while cloning file instance `{}`, error: {source}", file_path.display()))]
CloneFileInstance { file_path: PathBuf, source: std::io::Error },

Expand Down
101 changes: 32 additions & 69 deletions crates/engine/src/fetcher/fs.rs
Original file line number Diff line number Diff line change
@@ -1,42 +1,42 @@
use std::{
io::SeekFrom,
path::{Path, PathBuf},
sync::Arc,
};
use std::path::PathBuf;

use bytes::{Bytes, BytesMut};
use opendal::{services, Operator};
use snafu::ResultExt;
use tokio::{
fs::{File, OpenOptions},
io::{AsyncReadExt, AsyncSeekExt},
sync::Mutex,
};

use crate::{error, error::Result, fetcher::Metadata};

const MAX_BUFFER_SIZE: usize = 1 << 16;
use crate::{
error,
error::{Error, Result},
fetcher::{generic::ByteStream, Metadata},
};

#[derive(Clone, Debug)]
pub struct Fetcher {
file: Arc<Mutex<File>>,
operator: Operator,

file_path: PathBuf,

length: u64,
}

impl Fetcher {
pub async fn new(url: reqwest::Url) -> Result<Self> {
let mut builder = services::Fs::default();
let _ = builder.root("/");
let file_path = PathBuf::from(url.path());
let file = OpenOptions::new()
.read(true)
.open(&file_path)
.await
.with_context(|_| error::OpenFileSnafu { file_path: file_path.clone() })?;
let metadata = file
.metadata()

let operator =
Operator::new(builder).with_context(|_| error::BuildOpenDALOperatorSnafu)?.finish();

let metadata = operator
.stat(&file_path.to_string_lossy())
.await
.with_context(|_| error::GetFileLengthSnafu { file_path: file_path.clone() })?;
.with_context(|_| error::GetMetadataFromMinioSnafu)?;

Ok(Self { file: Arc::new(Mutex::new(file)), file_path, length: metadata.len() })
if metadata.is_dir() {
return Err(Error::FetchingDirectory);
}

Ok(Self { operator, file_path, length: metadata.content_length() })
}

pub fn fetch_metadata(&self) -> Metadata {
Expand All @@ -49,53 +49,16 @@ impl Fetcher {
}
}

pub fn fetch_all(&mut self) -> ByteStream { self.fetch_bytes(0, self.length - 1) }

pub fn fetch_bytes(&mut self, start: u64, end: u64) -> ByteStream {
ByteStream::new(&self.file_path, self.file.clone(), start, end.min(self.length))
}
}

pub struct ByteStream {
file_path: PathBuf,
file: Arc<Mutex<File>>,
start: u64,
end: u64,
}

impl ByteStream {
pub fn new<P>(file_path: P, file: Arc<Mutex<File>>, start: u64, end: u64) -> Self
where
P: AsRef<Path>,
{
Self { file_path: file_path.as_ref().to_path_buf(), file, start, end }
pub async fn fetch_all(&mut self) -> Result<ByteStream> {
self.fetch_bytes(0, self.length - 1).await
}

pub async fn bytes(&mut self) -> Result<Option<Bytes>> {
if self.start > self.end {
return Ok(None);
}

let capacity = MAX_BUFFER_SIZE
.min(usize::try_from(self.end - self.start + 1).unwrap_or(MAX_BUFFER_SIZE));
let mut buf = BytesMut::zeroed(capacity);

let mut file = self.file.lock().await;
let _ = file
.seek(SeekFrom::Start(self.start))
pub async fn fetch_bytes(&mut self, start: u64, end: u64) -> Result<ByteStream> {
let reader = self
.operator
.reader(&self.file_path.to_string_lossy())
.await
.with_context(|_| error::SeekFileSnafu { file_path: self.file_path.clone() })?;
let n = file
.read_exact(buf.as_mut())
.await
.with_context(|_| error::ReadFileSnafu { file_path: self.file_path.clone() })?;
drop(file);

if n == 0 {
Ok(None)
} else {
self.start += n as u64;
Ok(Some(buf.freeze()))
}
.context(error::CreateReaderSnafu)?;
Ok(ByteStream::new(reader, start, end))
}
}
42 changes: 42 additions & 0 deletions crates/engine/src/fetcher/generic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use std::io::SeekFrom;

use bytes::{Bytes, BytesMut};
use snafu::ResultExt;
use tokio::io::{AsyncReadExt, AsyncSeekExt};

use crate::{error, error::Result};

const MAX_BUFFER_SIZE: usize = 1 << 16;

pub struct ByteStream {
reader: opendal::Reader,
start: u64,
end: u64,
}

impl ByteStream {
pub const fn new(reader: opendal::Reader, start: u64, end: u64) -> Self {
Self { reader, start, end }
}

pub async fn bytes(&mut self) -> Result<Option<Bytes>> {
if self.start > self.end {
return Ok(None);
}

let capacity = MAX_BUFFER_SIZE
.min(usize::try_from(self.end - self.start + 1).unwrap_or(MAX_BUFFER_SIZE));
let mut buf = BytesMut::zeroed(capacity);

let _ =
self.reader.seek(SeekFrom::Start(self.start)).await.context(error::SeekReaderSnafu)?;
let n = self.reader.read_exact(buf.as_mut()).await.context(error::ReadFromReaderSnafu)?;

if n == 0 {
Ok(None)
} else {
self.start += n as u64;
Ok(Some(buf.freeze()))
}
}
}
44 changes: 6 additions & 38 deletions crates/engine/src/fetcher/minio.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::{fmt, io::SeekFrom, path::PathBuf};
use std::{fmt, path::PathBuf};

use bytes::{Bytes, BytesMut};
use opendal::{services, Operator};
use snafu::ResultExt;
use tokio::io::{AsyncReadExt, AsyncSeekExt};

use crate::{error, error::Result, fetcher::Metadata};

const MAX_BUFFER_SIZE: usize = 1 << 16;
use crate::{
error,
error::Result,
fetcher::{generic::ByteStream, Metadata},
};

#[derive(Clone, Debug)]
pub struct Fetcher {
Expand Down Expand Up @@ -77,35 +77,3 @@ impl Fetcher {
self.fetch_bytes(0, length - 1).await
}
}

pub struct ByteStream {
reader: opendal::Reader,
start: u64,
end: u64,
}

impl ByteStream {
pub const fn new(reader: opendal::Reader, start: u64, end: u64) -> Self {
Self { reader, start, end }
}

pub async fn bytes(&mut self) -> Result<Option<Bytes>> {
if self.start > self.end {
return Ok(None);
}

let capacity = MAX_BUFFER_SIZE
.min(usize::try_from(self.end - self.start + 1).unwrap_or(MAX_BUFFER_SIZE));
let mut buf = BytesMut::zeroed(capacity);

let _ = self.reader.seek(SeekFrom::Start(self.start)).await.unwrap();
let n = self.reader.read_exact(buf.as_mut()).await.unwrap();

if n == 0 {
Ok(None)
} else {
self.start += n as u64;
Ok(Some(buf.freeze()))
}
}
}
17 changes: 9 additions & 8 deletions crates/engine/src/fetcher/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod fs;
mod generic;
mod http;
mod minio;

Expand Down Expand Up @@ -65,33 +66,33 @@ impl Fetcher {
pub async fn fetch_bytes(&mut self, start: u64, end: u64) -> Result<ByteStream> {
debug_assert!(start <= end);
match self {
Self::FileSystem(client) => Ok(ByteStream::FileSystem(client.fetch_bytes(start, end))),
Self::Http(client) => client.fetch_bytes(start, end).await.map(ByteStream::Http),
Self::Minio(client) => client.fetch_bytes(start, end).await.map(ByteStream::Minio),
Self::Minio(client) => client.fetch_bytes(start, end).await.map(ByteStream::Generic),
Self::FileSystem(client) => {
client.fetch_bytes(start, end).await.map(ByteStream::Generic)
}
}
}

pub async fn fetch_all(&mut self) -> Result<ByteStream> {
match self {
Self::FileSystem(client) => Ok(ByteStream::FileSystem(client.fetch_all())),
Self::Http(client) => client.fetch_all().await.map(ByteStream::Http),
Self::Minio(client) => client.fetch_all().await.map(ByteStream::Minio),
Self::Minio(client) => client.fetch_all().await.map(ByteStream::Generic),
Self::FileSystem(client) => client.fetch_all().await.map(ByteStream::Generic),
}
}
}

pub enum ByteStream {
FileSystem(fs::ByteStream),
Http(http::ByteStream),
Minio(minio::ByteStream),
Generic(generic::ByteStream),
}

impl ByteStream {
pub async fn bytes(&mut self) -> Result<Option<Bytes>> {
match self {
Self::FileSystem(stream) => stream.bytes().await,
Self::Http(stream) => stream.bytes().await,
Self::Minio(stream) => stream.bytes().await,
Self::Generic(stream) => stream.bytes().await,
}
}
}

0 comments on commit fc4f2ad

Please sign in to comment.