Skip to content

Commit

Permalink
Merge pull request #111 from TheButlah/transport-features
Browse files Browse the repository at this point in the history
Feature-gate each transport type
  • Loading branch information
Alexei-Kornienko authored Dec 28, 2020
2 parents 929ae61 + bd2a412 commit 3c65602
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 15 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/main-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: clippy
args: --all --all-targets --no-default-features --features async-std-runtime -- --deny warnings
args: --all --all-targets --no-default-features --features async-std-runtime,all-transport -- --deny warnings

test:
name: Test
Expand Down Expand Up @@ -58,7 +58,7 @@ jobs:
uses: actions-rs/cargo@v1
with:
command: test
args: --all --no-default-features --features async-std-runtime
args: --all --no-default-features --features async-std-runtime,all-transport

fmt:
name: Formatting
Expand Down
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ license = "MIT"
repository = "https://github.com/zeromq/zmq.rs"

[features]
default = ["tokio-runtime"]
default = ["tokio-runtime", "all-transport"]
tokio-runtime = ["tokio", "tokio-util"]
async-std-runtime = ["async-std"]
all-transport = ["ipc-transport", "tcp-transport"]
ipc-transport = []
tcp-transport = []

[dependencies]
thiserror = "1"
Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ Feature flags provide a way to customize the functionality provided by this libr
Features:
- (default) `tokio-runtime`: Use `tokio` as your async runtime.
- `async-std-runtime`: Use `async-std` as your async runtime.
- (default) `all-transport`: Enable all the `*-transport` flags
- `ipc-transport`: Enable IPC as a transport mechanism
- `tcp-transport`: Enable TPC as a transport mechanism

## Contributing
Contributions are welcome! See our issue tracker for a list of the things we need help with.
Expand Down
3 changes: 1 addition & 2 deletions src/codec/framed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,7 @@ impl<T> FrameableWrite for T where T: futures::AsyncWrite + Unpin + Send + Sync
pub(crate) type ZmqFramedRead = futures_codec::FramedRead<Box<dyn FrameableRead>, ZmqCodec>;
pub(crate) type ZmqFramedWrite = futures_codec::FramedWrite<Box<dyn FrameableWrite>, ZmqCodec>;

/// Equivalent to [`futures_codec::Framed<T, ZmqCodec>`] or
/// [`tokio_util::codec::Framed`]
/// Equivalent to [`futures_codec::Framed<T, ZmqCodec>`]
pub struct FramedIo {
pub read_half: ZmqFramedRead,
pub write_half: ZmqFramedWrite,
Expand Down
2 changes: 1 addition & 1 deletion src/endpoint/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use thiserror::Error;

/// Represents an error when parsing an [`Endpoint`]
/// Represents an error when parsing an [`crate::Endpoint`]
#[derive(Error, Debug)]
pub enum EndpointError {
#[error("Failed to parse IP address or port")]
Expand Down
1 change: 1 addition & 0 deletions src/task_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub struct TaskHandle<T> {
join_handle: async_rt::task::JoinHandle<ZmqResult<T>>,
}
impl<T> TaskHandle<T> {
#[allow(unused)]
pub(crate) fn new(
stop_channel: futures::channel::oneshot::Sender<()>,
join_handle: async_rt::task::JoinHandle<ZmqResult<T>>,
Expand Down
60 changes: 51 additions & 9 deletions src/transport/mod.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,44 @@
#[cfg(feature = "ipc-transport")]
mod ipc;
#[cfg(feature = "tcp-transport")]
mod tcp;

use crate::codec::FramedIo;
use crate::endpoint::Endpoint;
use crate::error::ZmqError;
use crate::task_handle::TaskHandle;
use crate::ZmqResult;

macro_rules! do_if_enabled {
($feature:literal, $body:expr) => {{
#[cfg(feature = $feature)]
{
$body
}

#[cfg(not(feature = $feature))]
panic!(format!("feature \"{}\" is not enabled", $feature))
}};
}

/// Connectes to the given endpoint
///
/// # Panics
/// Panics if the requested endpoint uses a transport type that isn't enabled
pub(crate) async fn connect(endpoint: Endpoint) -> ZmqResult<(FramedIo, Endpoint)> {
match endpoint {
Endpoint::Tcp(host, port) => tcp::connect(host, port).await,
Endpoint::Ipc(Some(path)) => ipc::connect(path).await,
Endpoint::Ipc(None) => Err(ZmqError::Socket("Cannot connect to an unnamed ipc socket")),
Endpoint::Tcp(_host, _port) => {
do_if_enabled!("tcp-transport", tcp::connect(_host, _port).await)
}
Endpoint::Ipc(_path) => do_if_enabled!(
"ipc-transport",
if let Some(path) = _path {
ipc::connect(path).await
} else {
Err(crate::error::ZmqError::Socket(
"Cannot connect to an unnamed ipc socket",
))
}
),
}
}

Expand All @@ -25,22 +52,36 @@ pub struct AcceptStopHandle(pub(crate) TaskHandle<()>);
///
/// Returns a ZmqResult, which when Ok is a tuple of the resolved bound
/// endpoint, as well as a channel to stop the async accept task
///
/// # Panics
/// Panics if the requested endpoint uses a transport type that isn't enabled
pub(crate) async fn begin_accept<T>(
endpoint: Endpoint,
cback: impl Fn(ZmqResult<(FramedIo, Endpoint)>) -> T + Send + 'static,
) -> ZmqResult<(Endpoint, AcceptStopHandle)>
where
T: std::future::Future<Output = ()> + Send + 'static,
{
let _cback = cback;
match endpoint {
Endpoint::Tcp(host, port) => tcp::begin_accept(host, port, cback).await,
Endpoint::Ipc(Some(path)) => ipc::begin_accept(path, cback).await,
Endpoint::Ipc(None) => Err(ZmqError::Socket(
"Cannot begin accepting peers at an unnamed ipc socket",
)),
Endpoint::Tcp(_host, _port) => do_if_enabled!(
"tcp-transport",
tcp::begin_accept(_host, _port, _cback).await
),
Endpoint::Ipc(_path) => do_if_enabled!(
"ipc-transport",
if let Some(path) = _path {
ipc::begin_accept(path, _cback).await
} else {
Err(crate::error::ZmqError::Socket(
"Cannot begin accepting peers at an unnamed ipc socket",
))
}
),
}
}

#[allow(unused)]
#[cfg(feature = "tokio-runtime")]
fn make_framed<T>(stream: T) -> FramedIo
where
Expand All @@ -51,6 +92,7 @@ where
FramedIo::new(Box::new(read.compat()), Box::new(write.compat_write()))
}

#[allow(unused)]
#[cfg(feature = "async-std-runtime")]
fn make_framed<T>(stream: T) -> FramedIo
where
Expand Down

0 comments on commit 3c65602

Please sign in to comment.