diff --git a/src/sinks/gcp_chronicle/chronicle_unstructured.rs b/src/sinks/gcp_chronicle/chronicle_unstructured.rs index aba02d742452e..7ca16fe88385c 100644 --- a/src/sinks/gcp_chronicle/chronicle_unstructured.rs +++ b/src/sinks/gcp_chronicle/chronicle_unstructured.rs @@ -5,7 +5,8 @@ use bytes::{Bytes, BytesMut}; use futures_util::{future::BoxFuture, task::Poll}; use goauth::scopes::Scope; -use http::{header::HeaderValue, Request, StatusCode, Uri}; +use http::header::{self, HeaderName, HeaderValue}; +use http::{Request, StatusCode, Uri}; use hyper::Body; use indoc::indoc; use serde::Serialize; @@ -318,6 +319,7 @@ impl ChronicleUnstructuredConfig { pub struct ChronicleRequest { pub body: Bytes, pub finalizers: EventFinalizers, + pub headers: HashMap, metadata: RequestMetadata, } @@ -471,7 +473,33 @@ impl RequestBuilder<(ChroniclePartitionKey, Vec)> for ChronicleRequestBui metadata: RequestMetadata, payload: EncodeResult, ) -> Self::Request { - ChronicleRequest { + let mut headers = HashMap::new(); + headers.insert( + header::CONTENT_TYPE, + HeaderValue::from_static("application/json"), + ); + + match payload.compressed_byte_size { + Some(compressed_byte_size) => { + headers.insert( + header::CONTENT_LENGTH, + HeaderValue::from_str(&compressed_byte_size.to_string()).unwrap(), + ); + headers.insert( + header::CONTENT_ENCODING, + HeaderValue::from_str(&self.compression.content_encoding().unwrap()).unwrap(), + ); + } + None => { + headers.insert( + header::CONTENT_LENGTH, + HeaderValue::from_str(&payload.uncompressed_byte_size.to_string()).unwrap(), + ); + } + } + + return ChronicleRequest { + headers: headers, body: payload.into_payload().bytes, finalizers, metadata, @@ -547,18 +575,13 @@ impl Service for ChronicleService { fn call(&mut self, request: ChronicleRequest) -> Self::Future { let mut builder = Request::post(&self.base_url); - let headers = builder.headers_mut().unwrap(); - headers.insert( - "content-type", - HeaderValue::from_str("application/json").unwrap(), - ); - headers.insert( - "content-length", - HeaderValue::from_str(&request.body.len().to_string()).unwrap(), - ); - let metadata = request.get_metadata().clone(); + let headers = builder.headers_mut().unwrap(); + for (name, value) in request.headers { + headers.insert(name, value); + } + let mut http_request = builder.body(Body::from(request.body)).unwrap(); self.creds.apply(&mut http_request);