Skip to content

Commit

Permalink
Start refactoring from top
Browse files Browse the repository at this point in the history
  • Loading branch information
izderadicka committed Oct 22, 2023
1 parent bbc25c4 commit ff91759
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 48 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "audioserve"
version = "0.27.1"
version = "0.27.2"
authors = ["Ivan <[email protected]>"]
edition = "2021"
rust-version = "1.70"
Expand Down
104 changes: 59 additions & 45 deletions src/services/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use self::auth::{AuthResult, Authenticator};
use self::request::{QueryParams, RequestWrapper};
use self::response::ResponseFuture;
use self::response::{ResponseFuture, ResponseResult};
use self::search::Search;
use self::transcode::QualityLevel;
use crate::config::get_config;
Expand All @@ -27,7 +27,6 @@ use std::{
convert::Infallible,
net::SocketAddr,
path::{Path, PathBuf},
pin::Pin,
sync::{atomic::AtomicUsize, Arc},
task::Poll,
};
Expand Down Expand Up @@ -87,24 +86,33 @@ impl<T> ServiceFactory<T> {
is_ssl: bool,
) -> impl Future<Output = Result<MainService<T>, Infallible>> {
future::ok(MainService {
state: ServiceComponents {
search: self.search.clone(),
transcoding: self.transcoding.clone(),
collections: self.collections.clone(),
},
authenticator: self.authenticator.clone(),
rate_limitter: self.rate_limitter.clone(),
search: self.search.clone(),
transcoding: self.transcoding.clone(),
collections: self.collections.clone(),
remote_addr,
is_ssl,
})
}
}

#[derive(Clone)]
pub struct MainService<T> {
pub authenticator: Option<Arc<dyn Authenticator<Credentials = T>>>,
pub rate_limitter: Option<Arc<Leaky>>,
pub struct ServiceComponents {
pub search: Search<String>,
pub transcoding: TranscodingDetails,
pub collections: Arc<Collections>,
}

type OptionalAuthenticatorType<T> = Option<Arc<dyn Authenticator<Credentials = T>>>;

#[derive(Clone)]
pub struct MainService<T> {
pub state: ServiceComponents,
pub authenticator: OptionalAuthenticatorType<T>,
pub rate_limitter: Option<Arc<Leaky>>,
pub remote_addr: SocketAddr,
pub is_ssl: bool,
}
Expand Down Expand Up @@ -177,31 +185,24 @@ fn is_static_file(path: &str) -> bool {
impl<C: 'static> Service<Request<Body>> for MainService<C> {
type Response = Response<Body>;
type Error = error::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
type Future = ResponseFuture;

fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, req: Request<Body>) -> Self::Future {
Box::pin(self.process_request(req).or_else(|e| {
error!("Request processing error: {}", e);
future::ok(response::internal_error())
}))
}
}
let state = self.state.clone();

impl<C: 'static> MainService<C> {
fn process_request(&mut self, req: Request<Body>) -> ResponseFuture {
//Limit rate of requests if configured
if let Some(limiter) = self.rate_limitter.as_ref() {
if let Some(ref limiter) = self.rate_limitter {
if limiter.start_one().is_err() {
debug!("Rejecting request due to rate limit");
return response::fut(response::too_many_requests);
}
}

// handle OPTIONS method for CORS preflightAtomicUsize
// handle OPTIONS method for CORS preflight
if req.method() == Method::OPTIONS && RequestWrapper::is_cors_enabled_for_request(&req) {
debug!(
"Got OPTIONS request in CORS mode : {} {:?}",
Expand All @@ -223,61 +224,74 @@ impl<C: 'static> MainService<C> {
return response::fut(response::bad_request);
}
};

Box::pin(
MainService::<C>::process_request(state, self.authenticator.clone(), req).or_else(
|e| {
error!("Request processing error: {}", e);
future::ok(response::internal_error())
},
),
)
}
}

impl<C: 'static> MainService<C> {
async fn process_request(
subservices: ServiceComponents,
authenticator: OptionalAuthenticatorType<C>,
req: RequestWrapper,
) -> ResponseResult {
//static files
if req.method() == Method::GET {
if req.path() == "/" || req.path() == "/index.html" {
return files::send_static_file(
&get_config().client_dir,
"index.html",
get_config().static_resource_cache_age,
);
)
.await;
} else if is_static_file(req.path()) {
return files::send_static_file(
&get_config().client_dir,
&req.path()[1..],
get_config().static_resource_cache_age,
);
)
.await;
}
}
// from here everything must be authenticated
let searcher = self.search.clone();
let transcoding = self.transcoding.clone();
let cors = req.is_cors_enabled();
let origin = req.headers().typed_get::<Origin>();

let resp = match self.authenticator {
let resp = match authenticator {
Some(ref auth) => {
let collections = self.collections.clone();
Box::pin(auth.authenticate(req).and_then(move |result| match result {
AuthResult::Authenticated { request, .. } => MainService::<C>::process_checked(
request,
searcher,
transcoding,
collections,
),
AuthResult::Authenticated { request, .. } => {
MainService::<C>::process_authenticated(request, subservices)
}
AuthResult::LoggedIn(resp) | AuthResult::Rejected(resp) => {
Box::pin(future::ok(resp))
}
}))
}
None => MainService::<C>::process_checked(
req,
searcher,
transcoding,
self.collections.clone(),
),
None => MainService::<C>::process_authenticated(req, subservices),
};
Box::pin(resp.map_ok(move |r| add_cors_headers(r, origin, cors)))
resp.map_ok(move |r| add_cors_headers(r, origin, cors))
.await
}

fn process_checked(
#[allow(unused_mut)] mut req: RequestWrapper,
searcher: Search<String>,
transcoding: TranscodingDetails,
collections: Arc<Collections>,
fn process_authenticated(
mut req: RequestWrapper,
subservices: ServiceComponents,
) -> ResponseFuture {
let params = req.params();
let path = req.path();
let ServiceComponents {
search,
transcoding,
collections,
} = subservices;
match *req.method() {
Method::GET => {
if path.starts_with("/collections") {
Expand Down Expand Up @@ -402,7 +416,7 @@ impl<C: 'static> MainService<C> {
let group = params.get_string("group");
api::search(
colllection_index,
searcher,
search,
search_string,
ord,
group,
Expand All @@ -414,7 +428,7 @@ impl<C: 'static> MainService<C> {
}
} else if path.starts_with("/recent") {
let group = params.get_string("group");
api::recent(colllection_index, searcher, group, req.can_compress())
api::recent(colllection_index, search, group, req.can_compress())
} else if path.starts_with("/cover/") {
files::send_cover(
base_dir,
Expand Down
3 changes: 2 additions & 1 deletion src/services/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ const NOT_IMPLEMENTED_MSG: &str = "Not Implemented";
const INTERNAL_SERVER_ERROR: &str = "Internal server error";
const UNPROCESSABLE_ENTITY: &str = "Ignored";

pub type ResponseFuture = Pin<Box<dyn Future<Output = Result<Response<Body>, Error>> + Send>>;
pub type ResponseResult = Result<Response<Body>, Error>;
pub type ResponseFuture = Pin<Box<dyn Future<Output = ResponseResult> + Send>>;

fn short_response(status: StatusCode, msg: &'static str) -> Response<Body> {
Response::builder()
Expand Down

0 comments on commit ff91759

Please sign in to comment.