Skip to content
This repository has been archived by the owner on Oct 5, 2022. It is now read-only.

Commit

Permalink
Merge pull request #14 from tbrand/read-timeout2
Browse files Browse the repository at this point in the history
Add read-timeout option and refactor about http (not TLS) requests
  • Loading branch information
tbrand authored May 1, 2019
2 parents f0e65cd + 3ffdbba commit c81b904
Show file tree
Hide file tree
Showing 11 changed files with 350 additions and 168 deletions.
7 changes: 4 additions & 3 deletions dytp-cloud/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ fn join(
Box::new(f)
}

fn process(socket: TcpStream, manager: Box<Manager + Send>) {
let origin = Origin::new(socket);
fn process(socket: TcpStream, manager: Box<Manager + Send>, read_timeout: u64) {
let origin = Origin::new_with_timeout(socket, read_timeout);
let f = origin
.into_future()
.map_err(|(e, _)| e)
Expand Down Expand Up @@ -229,6 +229,7 @@ pub fn main_inner(
addr: SocketAddr,
healthcheck_timeout: u64,
node_deletion_timeout: u64,
read_timeout: u64,
) -> Result<()> {
let manager = manager::create(ManagerType::MEM);
let manager_healthcheck = manager.clone();
Expand All @@ -237,7 +238,7 @@ pub fn main_inner(
let tasks = listener
.incoming()
.for_each(move |socket| {
process(socket, manager.clone());
process(socket, manager.clone(), read_timeout);
Ok(())
})
.map_err(|e| {
Expand Down
79 changes: 53 additions & 26 deletions dytp-connection/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use dytp_protocol as protocol;
use dytp_protocol::delim::Delim;
use failure::Error;
use futures::prelude::*;
use std::time::{Duration, Instant};
use tokio::prelude::*;

pub trait Connection {
Expand All @@ -26,6 +27,10 @@ pub trait Connection {
fn rb_mut(&mut self) -> &mut BytesMut;
fn read_delim(&self) -> &Delim;
fn read_delim_mut(&mut self) -> &mut Delim;
fn read_timeout(&self) -> &Duration;
fn read_timeout_mut(&mut self) -> &mut Duration;
fn read_since(&self) -> &Option<Instant>;
fn read_since_mut(&mut self) -> &mut Option<Instant>;
fn write_delim(&self) -> &Delim;
fn write_delim_mut(&mut self) -> &mut Delim;
fn fill(&mut self) -> Poll<(), Error>;
Expand All @@ -38,8 +43,12 @@ pub trait Connection {
*self.write_delim_mut() = delim;
}

fn remaining(&self) -> bool {
!self.wb().is_empty() || !self.rb().is_empty()
fn wb_remaining(&self) -> bool {
!self.wb().is_empty()
}

fn rb_remaining(&self) -> bool {
!self.rb().is_empty()
}

fn try_write(&mut self, buf: &[u8]) -> Result<usize, std::io::Error> {
Expand All @@ -66,43 +75,61 @@ pub trait Connection {
}

fn try_read(&mut self) -> Poll<Option<BytesMut>, Error> {
if self.read_since().is_none() {
*self.read_since_mut() = Some(Instant::now());
}

let disconnected = self.fill()?.is_ready();

if !self.rb().is_empty() {
if let Some(payload) = match self.read_delim() {
Delim::Dytp => {
if let Some(p) = protocol::parse(self.rb_mut()) {
return Ok(Async::Ready(Some((p.1).0)));
} else {
return Ok(Async::NotReady);
}
}
Delim::Http => self
.rb()
.windows(2)
.enumerate()
.find(|&(_, bytes)| bytes == b"\r\n")
.map(|(i, _)| i)
.map(|i| {
let mut p = self.rb_mut().split_to(i + 2);
p.split_off(i);
p
}),
Delim::None => {
let len = self.rb().len();
Some(self.rb_mut().split_to(len))
}
} {
if let Some(payload) = self.try_read_delim() {
*self.read_since_mut() = None;

return Ok(Async::Ready(Some(payload)));
}
}

if disconnected {
Ok(Async::Ready(None))
} else {
if Instant::now().duration_since(*self.read_since().as_ref().unwrap())
> *self.read_timeout()
{
log::debug!("read timeout");

return Ok(Async::Ready(None));
}

task::current().notify();

Ok(Async::NotReady)
}
}

fn try_read_delim(&mut self) -> Option<BytesMut> {
match self.read_delim() {
Delim::Dytp => {
if let Some(p) = protocol::parse(self.rb_mut()) {
Some((p.1).0)
} else {
None
}
}
Delim::Http => self
.rb()
.windows(2)
.enumerate()
.find(|&(_, bytes)| bytes == b"\r\n")
.map(|(i, _)| i)
.map(|i| {
let mut p = self.rb_mut().split_to(i + 2);
p.split_off(i);
p
}),
Delim::None => {
let len = self.rb().len();
Some(self.rb_mut().split_to(len))
}
}
}
}
33 changes: 33 additions & 0 deletions dytp-connection/src/origin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use failure::Error;
use futures::prelude::*;
use futures::try_ready;
use std::io::Write;
use std::time::{Duration, Instant};
use tokio::net::TcpStream;
use tokio::prelude::*;

Expand All @@ -15,6 +16,8 @@ pub struct Origin {
wb: BytesMut,
read_delim: Delim,
write_delim: Delim,
read_timeout: Duration,
read_since: Option<Instant>,
}

impl Connection for Origin {
Expand Down Expand Up @@ -42,6 +45,22 @@ impl Connection for Origin {
&mut self.read_delim
}

fn read_timeout(&self) -> &Duration {
&self.read_timeout
}

fn read_timeout_mut(&mut self) -> &mut Duration {
&mut self.read_timeout
}

fn read_since(&self) -> &Option<Instant> {
&self.read_since
}

fn read_since_mut(&mut self) -> &mut Option<Instant> {
&mut self.read_since
}

fn write_delim(&self) -> &Delim {
&self.write_delim
}
Expand Down Expand Up @@ -71,6 +90,20 @@ impl Origin {
wb: BytesMut::new(),
read_delim: Delim::Dytp,
write_delim: Delim::Dytp,
read_timeout: Duration::from_secs(5),
read_since: None,
}
}

pub fn new_with_timeout(stream: TcpStream, read_timeout: u64) -> Self {
Origin {
stream,
rb: BytesMut::new(),
wb: BytesMut::new(),
read_delim: Delim::Dytp,
write_delim: Delim::Dytp,
read_timeout: Duration::from_secs(read_timeout),
read_since: None,
}
}
}
Expand Down
37 changes: 35 additions & 2 deletions dytp-connection/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use futures::prelude::*;
use futures::try_ready;
use http::uri::Uri;
use std::io::Write;
use std::net::ToSocketAddrs;
use std::net::{IpAddr, SocketAddr};
use std::net::{IpAddr, SocketAddr, ToSocketAddrs};
use std::time::{Duration, Instant};
use tokio::net::TcpStream;
use tokio::prelude::*;

Expand Down Expand Up @@ -129,6 +129,8 @@ pub struct Request {
wb: BytesMut,
read_delim: Delim,
write_delim: Delim,
read_timeout: Duration,
read_since: Option<Instant>,
}

impl Connection for Request {
Expand Down Expand Up @@ -156,6 +158,22 @@ impl Connection for Request {
&mut self.read_delim
}

fn read_timeout(&self) -> &Duration {
&self.read_timeout
}

fn read_timeout_mut(&mut self) -> &mut Duration {
&mut self.read_timeout
}

fn read_since(&self) -> &Option<Instant> {
&self.read_since
}

fn read_since_mut(&mut self) -> &mut Option<Instant> {
&mut self.read_since
}

fn write_delim(&self) -> &Delim {
&self.write_delim
}
Expand Down Expand Up @@ -186,6 +204,21 @@ impl Request {
wb: BytesMut::new(),
read_delim: Delim::Http,
write_delim: Delim::Http,
read_timeout: Duration::from_secs(5),
read_since: None,
}
}

pub fn new_with_timeout(stream: TcpStream, read_timeout: u64) -> Self {
Request {
stream,
http_buf: BytesMut::new(),
rb: BytesMut::new(),
wb: BytesMut::new(),
read_delim: Delim::Http,
write_delim: Delim::Http,
read_timeout: Duration::from_secs(read_timeout),
read_since: None,
}
}

Expand Down
Loading

0 comments on commit c81b904

Please sign in to comment.