From 6699fa6cfcbd51c8368cfc82aa6c51059ffff4cd Mon Sep 17 00:00:00 2001 From: teor Date: Thu, 2 Jan 2025 14:19:49 +1000 Subject: [PATCH] Fetch object batches in RPC and HTTP gateway --- crates/subspace-gateway-rpc/src/lib.rs | 9 +- .../src/commands/http/server.rs | 78 +++++++----- crates/subspace-gateway/src/main.rs | 2 + .../src/object_fetcher.rs | 118 ++++++++++-------- 4 files changed, 118 insertions(+), 89 deletions(-) diff --git a/crates/subspace-gateway-rpc/src/lib.rs b/crates/subspace-gateway-rpc/src/lib.rs index c907ed844c..6dd689b46b 100644 --- a/crates/subspace-gateway-rpc/src/lib.rs +++ b/crates/subspace-gateway-rpc/src/lib.rs @@ -132,13 +132,8 @@ where return Err(Error::TooManyMappings { count }); } - let mut objects = Vec::with_capacity(count); - // TODO: fetch concurrently - for mapping in mappings.objects() { - let data = self.object_fetcher.fetch_object(*mapping).await?; - - objects.push(data.into()); - } + let objects = self.object_fetcher.fetch_objects(mappings).await?; + let objects = objects.into_iter().map(HexData::from).collect(); Ok(objects) } diff --git a/crates/subspace-gateway/src/commands/http/server.rs b/crates/subspace-gateway/src/commands/http/server.rs index 6efbd421e3..6e30883b1a 100644 --- a/crates/subspace-gateway/src/commands/http/server.rs +++ b/crates/subspace-gateway/src/commands/http/server.rs @@ -18,79 +18,101 @@ where pub(crate) http_endpoint: String, } -/// Requests the object mapping with `hash` from the indexer service. +/// Requests the object mappings for `hashes` from the indexer service. +/// Multiple hashes are separated by `+`. async fn request_object_mapping( endpoint: &str, - hash: Blake3Hash, + hashes: &Vec, ) -> anyhow::Result { let client = reqwest::Client::new(); - let object_mappings_url = format!("{}/objects/{}", endpoint, hex::encode(hash)); + let hash_list = hashes.iter().map(hex::encode).collect::>(); + let object_mappings_url = format!("{}/objects/{}", endpoint, hash_list.join("+")); - debug!(?hash, ?object_mappings_url, "Requesting object mapping..."); + debug!( + ?hashes, + ?object_mappings_url, + "Requesting object mappings..." + ); let response = client.get(&object_mappings_url).send().await?.json().await; match &response { Ok(json) => { - trace!(?hash, ?json, "Received object mapping"); + trace!(?hashes, ?json, "Received object mappings"); } Err(err) => { - error!(?hash, ?err, ?object_mappings_url, "Request failed"); + error!(?hashes, ?err, ?object_mappings_url, "Request failed"); } } response.map_err(|err| err.into()) } -/// Fetches a DSN object with `hash`, using the mapping indexer service. +/// Fetches the DSN objects with `hashes`, using the mapping indexer service. +/// Multiple hashes are separated by `+`. async fn serve_object( - hash: web::Path, + hashes: web::Path, additional_data: web::Data>>, ) -> impl Responder where PG: PieceGetter + Send + Sync + 'static, { let server_params = additional_data.into_inner(); - let hash = hash.into_inner(); + let hashes = hashes.into_inner(); + let hashes = hashes + .split('+') + .map(|s| { + let mut hash = Blake3Hash::default(); + hex::decode_to_slice(s, hash.as_mut()).map(|()| hash) + }) + .try_collect::>(); - let Ok(object_mapping) = request_object_mapping(&server_params.indexer_endpoint, hash).await - else { + let Ok(hashes) = hashes else { return HttpResponse::BadRequest().finish(); }; - // TODO: fetch multiple objects - let Some(&object_mapping) = object_mapping.objects.objects().first() else { + let Ok(object_mappings) = + request_object_mapping(&server_params.indexer_endpoint, &hashes).await + else { return HttpResponse::BadRequest().finish(); }; - if object_mapping.hash != hash { - error!( - ?object_mapping, - ?hash, - "Returned object mapping doesn't match requested hash" - ); - return HttpResponse::ServiceUnavailable().finish(); + for object_mapping in object_mappings.objects.objects() { + if !hashes.contains(&object_mapping.hash) { + error!( + ?object_mapping, + ?hashes, + "Returned object mapping wasn't in requested hashes" + ); + return HttpResponse::ServiceUnavailable().finish(); + } } let object_fetcher_result = server_params .object_fetcher - .fetch_object(object_mapping) + .fetch_objects(object_mappings.objects) .await; - let object = match object_fetcher_result { - Ok(object) => { - trace!(?hash, size = %object.len(), "Object fetched successfully"); - object + let objects = match object_fetcher_result { + Ok(objects) => { + trace!( + ?hashes, + count = %objects.len(), + sizes = ?objects.iter().map(|object| object.len()), + "Objects fetched successfully" + ); + objects } Err(err) => { - error!(?hash, ?err, "Failed to fetch object"); + error!(?hashes, ?err, "Failed to fetch objects"); return HttpResponse::ServiceUnavailable().finish(); } }; + // TODO: return a multi-part response, with one part per object HttpResponse::Ok() .content_type("application/octet-stream") - .body(object) + .body(objects.concat()) } /// Starts the DSN object HTTP server. @@ -103,7 +125,7 @@ where HttpServer::new(move || { App::new() .app_data(web::Data::new(server_params.clone())) - .route("/data/{hash}", web::get().to(serve_object::)) + .route("/data/{hashes}", web::get().to(serve_object::)) }) .bind(http_endpoint)? .run() diff --git a/crates/subspace-gateway/src/main.rs b/crates/subspace-gateway/src/main.rs index 65ae756983..70856c154a 100644 --- a/crates/subspace-gateway/src/main.rs +++ b/crates/subspace-gateway/src/main.rs @@ -1,5 +1,7 @@ //! Subspace gateway implementation. +#![feature(iterator_try_collect)] + mod commands; mod node_client; mod piece_getter; diff --git a/shared/subspace-data-retrieval/src/object_fetcher.rs b/shared/subspace-data-retrieval/src/object_fetcher.rs index 1264b9a613..84e919be8c 100644 --- a/shared/subspace-data-retrieval/src/object_fetcher.rs +++ b/shared/subspace-data-retrieval/src/object_fetcher.rs @@ -22,7 +22,7 @@ use parity_scale_codec::{Compact, CompactLen, Decode, Encode}; use std::sync::Arc; use subspace_archiving::archiver::{Segment, SegmentItem}; use subspace_core_primitives::hashes::{blake3_hash, Blake3Hash}; -use subspace_core_primitives::objects::GlobalObject; +use subspace_core_primitives::objects::{GlobalObject, GlobalObjectMapping}; use subspace_core_primitives::pieces::{Piece, PieceIndex, RawRecord}; use subspace_core_primitives::segments::{RecordedHistorySegment, SegmentIndex}; use subspace_erasure_coding::ErasureCoding; @@ -193,74 +193,84 @@ where } } - /// Assemble the object in `mapping` by fetching necessary pieces using the piece getter, and - /// putting the object's bytes together. + /// Assemble the objects in `mapping` by fetching necessary pieces using the piece getter, and + /// putting the objects' bytes together. /// - /// Checks the object's hash to make sure the correct bytes are returned. - pub async fn fetch_object(&self, mapping: GlobalObject) -> Result, Error> { - let GlobalObject { - hash, - piece_index, - offset, - } = mapping; + /// Checks the objects' hashes to make sure the correct bytes are returned. + pub async fn fetch_objects( + &self, + mappings: GlobalObjectMapping, + ) -> Result>, Error> { + let mut objects = Vec::with_capacity(mappings.objects().len()); + + // TODO: sort mappings in piece index order, and keep pieces until they're no longer needed + for &mapping in mappings.objects() { + let GlobalObject { + hash, + piece_index, + offset, + } = mapping; + + // Validate parameters + if !piece_index.is_source() { + debug!( + ?mapping, + "Invalid piece index for object: must be a source piece", + ); - // Validate parameters - if !piece_index.is_source() { - debug!( - ?mapping, - "Invalid piece index for object: must be a source piece", - ); + // Parity pieces contain effectively random data, and can't be used to fetch objects + return Err(Error::NotSourcePiece { mapping }); + } - // Parity pieces contain effectively random data, and can't be used to fetch objects - return Err(Error::NotSourcePiece { mapping }); - } + if offset >= RawRecord::SIZE as u32 { + debug!( + ?mapping, + RawRecord_SIZE = RawRecord::SIZE, + "Invalid piece offset for object: must be less than the size of a raw record", + ); - if offset >= RawRecord::SIZE as u32 { - debug!( - ?mapping, - RawRecord_SIZE = RawRecord::SIZE, - "Invalid piece offset for object: must be less than the size of a raw record", - ); + return Err(Error::PieceOffsetTooLarge { mapping }); + } - return Err(Error::PieceOffsetTooLarge { mapping }); - } + // Try fast object assembling from individual pieces, + // then regular object assembling from segments + let data = match self.fetch_object_fast(mapping).await? { + Some(data) => data, + None => { + let data = self.fetch_object_regular(mapping).await?; + + debug!( + ?mapping, + len = %data.len(), + "Fetched object using regular object assembling", - // Try fast object assembling from individual pieces, - // then regular object assembling from segments - let data = match self.fetch_object_fast(mapping).await? { - Some(data) => data, - None => { - let data = self.fetch_object_regular(mapping).await?; + ); + data + } + }; + + let data_hash = blake3_hash(&data); + if data_hash != hash { debug!( + ?data_hash, + data_size = %data.len(), ?mapping, - len = %data.len(), - "Fetched object using regular object assembling", - + "Retrieved data doesn't match requested mapping hash" ); + trace!(data = %hex::encode(&data), "Retrieved data"); - data + return Err(Error::InvalidDataHash { + data_hash, + data_size: data.len(), + mapping, + }); } - }; - - let data_hash = blake3_hash(&data); - if data_hash != hash { - debug!( - ?data_hash, - data_size = %data.len(), - ?mapping, - "Retrieved data doesn't match requested mapping hash" - ); - trace!(data = %hex::encode(&data), "Retrieved data"); - return Err(Error::InvalidDataHash { - data_hash, - data_size: data.len(), - mapping, - }); + objects.push(data); } - Ok(data) + Ok(objects) } /// Fast object fetching and assembling where the object doesn't cross piece (super fast) or