From 0ad13055a45d27314a690191cca99ba8dbacfdf7 Mon Sep 17 00:00:00 2001 From: "jizhuozhi.george" Date: Fri, 11 Oct 2024 02:54:34 +0800 Subject: [PATCH 01/11] Add Promise support for http callout Signed-off-by: jizhuozhi.george --- README.md | 1 + examples/http_parallel_call/Cargo.toml | 22 ++ examples/http_parallel_call/README.md | 27 ++ .../http_parallel_call/docker-compose.yaml | 36 +++ examples/http_parallel_call/envoy.yaml | 68 ++++ examples/http_parallel_call/src/lib.rs | 129 ++++++++ src/lib.rs | 1 + src/promise.rs | 291 ++++++++++++++++++ 8 files changed, 575 insertions(+) create mode 100644 examples/http_parallel_call/Cargo.toml create mode 100644 examples/http_parallel_call/README.md create mode 100644 examples/http_parallel_call/docker-compose.yaml create mode 100644 examples/http_parallel_call/envoy.yaml create mode 100644 examples/http_parallel_call/src/lib.rs create mode 100644 src/promise.rs diff --git a/README.md b/README.md index 464be212..99ba33ec 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,7 @@ - [HTTP Headers](./examples/http_headers/) - [HTTP Response body](./examples/http_body/) - [HTTP Configuration](./examples/http_config/) +- [HTTP Parallel Call](./examples/http_parallel_call/) - [gRPC Auth (random)](./examples/grpc_auth_random/) ## Articles & blog posts from the community diff --git a/examples/http_parallel_call/Cargo.toml b/examples/http_parallel_call/Cargo.toml new file mode 100644 index 00000000..5328fcf6 --- /dev/null +++ b/examples/http_parallel_call/Cargo.toml @@ -0,0 +1,22 @@ +[package] +publish = false +name = "proxy-wasm-example-http-parallel-call" +version = "0.0.1" +authors = ["Zhuozhi Ji "] +description = "Proxy-Wasm plugin example: HTTP parallel call" +license = "Apache-2.0" +edition = "2018" + +[lib] +crate-type = ["cdylib"] + +[dependencies] +log = "0.4" +proxy-wasm = { path = "../../" } + +[profile.release] +lto = true +opt-level = 3 +codegen-units = 1 +panic = "abort" +strip = "debuginfo" diff --git a/examples/http_parallel_call/README.md b/examples/http_parallel_call/README.md new file mode 100644 index 00000000..3ff62d5f --- /dev/null +++ b/examples/http_parallel_call/README.md @@ -0,0 +1,27 @@ +## Proxy-Wasm plugin example: HTTP parallel call + +Proxy-Wasm plugin that makes multiply HTTP callout and combine responses as final response . + +### Building + +```sh +$ cargo build --target wasm32-wasi --release +``` + +### Using in Envoy + +This example can be run with [`docker compose`](https://docs.docker.com/compose/install/) +and has a matching Envoy configuration. + +```sh +$ docker compose up +``` + +#### Access granted. + +Send HTTP request to `localhost:10000/headers`: + +```sh +$ curl localhost:10000/headers +Hello, World!\n +``` diff --git a/examples/http_parallel_call/docker-compose.yaml b/examples/http_parallel_call/docker-compose.yaml new file mode 100644 index 00000000..6a188511 --- /dev/null +++ b/examples/http_parallel_call/docker-compose.yaml @@ -0,0 +1,36 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +services: + envoy: + image: envoyproxy/envoy:v1.31-latest + hostname: envoy + ports: + - "10000:10000" + volumes: + - ./envoy.yaml:/etc/envoy/envoy.yaml + - ./target/wasm32-wasi/release:/etc/envoy/proxy-wasm-plugins + networks: + - envoymesh + depends_on: + - httpbin + httpbin: + image: mccutchen/go-httpbin + hostname: httpbin + ports: + - "8080:8080" + networks: + - envoymesh +networks: + envoymesh: {} diff --git a/examples/http_parallel_call/envoy.yaml b/examples/http_parallel_call/envoy.yaml new file mode 100644 index 00000000..61fdf8da --- /dev/null +++ b/examples/http_parallel_call/envoy.yaml @@ -0,0 +1,68 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +static_resources: + listeners: + address: + socket_address: + address: 0.0.0.0 + port_value: 10000 + filter_chains: + - filters: + - name: envoy.filters.network.http_connection_manager + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + stat_prefix: ingress_http + codec_type: AUTO + route_config: + name: local_routes + virtual_hosts: + - name: local_service + domains: + - "*" + routes: + - match: + prefix: "/" + route: + cluster: httpbin + http_filters: + - name: envoy.filters.http.wasm + typed_config: + "@type": type.googleapis.com/udpa.type.v1.TypedStruct + type_url: type.googleapis.com/envoy.extensions.filters.http.wasm.v3.Wasm + value: + config: + name: "http_parallel_call" + vm_config: + runtime: "envoy.wasm.runtime.v8" + code: + local: + filename: "/etc/envoy/proxy-wasm-plugins/proxy_wasm_example_http_parallel_call.wasm" + - name: envoy.filters.http.router + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router + clusters: + - name: httpbin + connect_timeout: 5s + type: STRICT_DNS + lb_policy: ROUND_ROBIN + load_assignment: + cluster_name: httpbin + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: httpbin + port_value: 8080 diff --git a/examples/http_parallel_call/src/lib.rs b/examples/http_parallel_call/src/lib.rs new file mode 100644 index 00000000..6616c59a --- /dev/null +++ b/examples/http_parallel_call/src/lib.rs @@ -0,0 +1,129 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use proxy_wasm::hostcalls; +use proxy_wasm::promise::Promise; +use proxy_wasm::traits::*; +use proxy_wasm::types::*; +use std::collections::HashMap; +use std::rc::Rc; +use std::time::Duration; + +proxy_wasm::main! {{ + proxy_wasm::set_log_level(LogLevel::Trace); + proxy_wasm::set_http_context(|_, _| -> Box { Box::new(HttpParallelCall::default()) }); +}} + +#[derive(Default)] +struct HttpParallelCall { + m: HashMap>>, +} + +impl HttpContext for HttpParallelCall { + fn on_http_request_headers(&mut self, _: usize, _: bool) -> Action { + // "Hello, " + let token1 = self + .dispatch_http_call( + "httpbin", + vec![ + (":method", "GET"), + (":path", "/base64/SGVsbG8sIAo="), + (":authority", "httpbin.org"), + ], + None, + vec![], + Duration::from_secs(1), + ) + .unwrap(); + + // "World!" + let token2 = self + .dispatch_http_call( + "httpbin", + vec![ + (":method", "GET"), + (":path", "/base64/V29ybGQhCg=="), + (":authority", "httpbin.org"), + ], + None, + vec![], + Duration::from_secs(1), + ) + .unwrap(); + + let promise1 = Promise::new(); + let promise2 = Promise::new(); + self.m.insert(token1, promise1.clone()); + self.m.insert(token2, promise2.clone()); + + Promise::all_of(vec![ + promise1 + .then(|(_, _, _body_size, _)| get_http_call_response_body_string(0, _body_size)) + .then(|body| body.unwrap_or_else(|| "".to_string())), + promise2 + .then(|(_, _, _body_size, _)| get_http_call_response_body_string(0, _body_size)) + .then(|body| body.unwrap_or_else(|| "".to_string())), + ]) + .then(|results| { + send_http_response( + 200, + vec![], + Some( + format!( + "{}{}\n", + results[0].strip_suffix("\n").unwrap(), + results[1].strip_suffix("\n").unwrap() + ) + .as_bytes(), + ), + ); + }); + + Action::Pause + } + + fn on_http_response_headers(&mut self, _: usize, _: bool) -> Action { + self.set_http_response_header("Powered-By", Some("proxy-wasm")); + Action::Continue + } +} + +impl Context for HttpParallelCall { + fn on_http_call_response( + &mut self, + _token_id: u32, + _num_headers: usize, + _body_size: usize, + _num_trailers: usize, + ) { + let promise = self.m.remove(&_token_id); + promise + .unwrap() + .fulfill((_token_id, _num_headers, _body_size, _num_trailers)); + } +} + +fn get_http_call_response_body_string(start: usize, max_size: usize) -> Option { + match hostcalls::get_buffer(BufferType::HttpCallResponseBody, start, max_size).unwrap() { + None => None, + Some(bytes) => { + let body_string = String::from_utf8(bytes.to_vec()).unwrap(); + Some(body_string) + } + } +} + +fn send_http_response(status_code: u32, headers: Vec<(&str, &str)>, body: Option<&[u8]>) { + hostcalls::send_http_response(status_code, headers, body).unwrap() +} diff --git a/src/lib.rs b/src/lib.rs index a8f42651..05d23579 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,6 +13,7 @@ // limitations under the License. pub mod hostcalls; +pub mod promise; pub mod traits; pub mod types; diff --git a/src/promise.rs b/src/promise.rs new file mode 100644 index 00000000..cf0f05d8 --- /dev/null +++ b/src/promise.rs @@ -0,0 +1,291 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::cell::RefCell; +use std::rc::Rc; + +enum PromiseState { + Pending, + Fulfilled(T), + Rejected(String), +} + +pub struct Promise { + state: RefCell>, + then_callback: RefCell>>, + catch_callback: RefCell>>, +} + +impl Promise +where + T: 'static + Clone, +{ + pub fn new() -> Rc { + Rc::new(Self { + state: RefCell::new(PromiseState::Pending), + then_callback: RefCell::new(None), + catch_callback: RefCell::new(None), + }) + } + + pub fn fulfill(self: &Rc, value: T) { + *self.state.borrow_mut() = PromiseState::Fulfilled(value.clone()); + if let Some(callback) = self.then_callback.borrow_mut().take() { + callback(value); + } + } + + pub fn reject(self: &Rc, reason: String) { + *self.state.borrow_mut() = PromiseState::Rejected(reason.clone()); + if let Some(callback) = self.catch_callback.borrow_mut().take() { + callback(reason); + } + } + + pub fn then(self: &Rc, f: F) -> Rc> + where + F: FnOnce(T) -> R + 'static, + R: 'static + Clone, + { + let new_promise = Promise::new(); + let new_promise_clone = new_promise.clone(); + match &*self.state.borrow() { + PromiseState::Pending => { + *self.then_callback.borrow_mut() = Some(Box::new(move |value| { + let result = f(value.clone()); + new_promise_clone.fulfill(result); + })); + let new_promise_for_catch = new_promise.clone(); + *self.catch_callback.borrow_mut() = Some(Box::new(move |reason| { + new_promise_for_catch.reject(reason); + })); + } + PromiseState::Fulfilled(value) => { + let result = f(value.clone()); + new_promise.fulfill(result); + } + PromiseState::Rejected(reason) => new_promise.reject(reason.clone()), + } + new_promise + } + + pub fn catch(self: &Rc, f: F) -> Rc + where + F: FnOnce(String) + 'static, + { + match &*self.state.borrow() { + PromiseState::Pending => *self.catch_callback.borrow_mut() = Some(Box::new(f)), + PromiseState::Fulfilled(_) => {} + PromiseState::Rejected(reason) => f(reason.clone()), + } + self.clone() + } + + pub fn all_of(promises: Vec>) -> Rc>> { + let next_promise = Promise::new(); + let total = promises.len(); + let results = Rc::new(RefCell::new(vec![None; total])); + let remaining = Rc::new(RefCell::new(total)); + let rejected = Rc::new(RefCell::new(false)); + + for (i, promise) in promises.iter().enumerate() { + let next_promise_clone = next_promise.clone(); + let next_promise_clone_for_catch = next_promise.clone(); + let results_clone = results.clone(); + let remaining_clone = remaining.clone(); + let rejected_clone = rejected.clone(); + let rejected_clone_for_catch = rejected.clone(); + promise + .then(move |result| { + if *rejected_clone.borrow() { + return; + } + results_clone.borrow_mut()[i] = Some(result); + *remaining_clone.borrow_mut() -= 1; + + if *remaining_clone.borrow() == 0 { + let final_results: Vec = results_clone + .borrow_mut() + .iter_mut() + .map(|res| res.take().unwrap()) + .collect(); + next_promise_clone.fulfill(final_results); + } + }) + .catch(move |reason| { + if !*rejected_clone_for_catch.borrow() { + *rejected_clone_for_catch.borrow_mut() = true; + next_promise_clone_for_catch.reject(reason.clone()); + } + }); + } + next_promise + } + + pub fn any_of(promises: Vec>) -> Rc> { + let next_promise = Promise::new(); + let total = promises.len(); + let remaining = Rc::new(RefCell::new(total)); + let first_error = Rc::new(RefCell::new(None)); // 用来保存第一个错误 + + for promise in promises { + let next_promise_clone = next_promise.clone(); + let next_promise_clone_for_catch = next_promise.clone(); + let remaining_clone = remaining.clone(); + let remaining_clone_for_catch = remaining.clone(); + let first_error_clone = first_error.clone(); + + promise + .then(move |result| { + if *remaining_clone.borrow() > 0 { + next_promise_clone.fulfill(result); + *remaining_clone.borrow_mut() = 0; + } + }) + .catch(move |err| { + if first_error_clone.borrow().is_none() { + *first_error_clone.borrow_mut() = Some(err.clone()); + } + + *remaining_clone_for_catch.borrow_mut() -= 1; + + if *remaining_clone_for_catch.borrow() == 0 { + if let Some(first_err) = first_error_clone.borrow().clone() { + next_promise_clone_for_catch.reject(first_err); + } + } + }); + } + + next_promise + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_promise_new() { + let promise = Promise::::new(); + assert!(matches!(*promise.state.borrow(), PromiseState::Pending)); + assert!(promise.then_callback.borrow().is_none()); + assert!(promise.catch_callback.borrow().is_none()); + } + + #[test] + fn test_promise_fulfill() { + let promise = Promise::::new(); + let next_promise = promise.then(|result| { + assert_eq!(result, 42); + }); + + promise.fulfill(42); + } + + #[test] + fn test_promise_reject() { + let promise = Promise::::new(); + let next_promise = promise.catch(|err| { + assert_eq!(err, "Error"); + }); + + promise.reject("Error".to_string()); + } + + #[test] + fn test_promise_chain() { + let promise = Promise::::new(); + let next_promise = promise.then(|result| { + assert_eq!(result, 10); + 20 + }); + + next_promise.then(|result| { + assert_eq!(result, 20); + }); + + promise.fulfill(10); + } + + #[test] + fn test_all_of_success() { + let promise1 = Promise::::new(); + let promise2 = Promise::::new(); + + let all_promise = Promise::all_of(vec![promise1.clone(), promise2.clone()]); + + promise1.fulfill(42); + promise2.fulfill(100); + + all_promise + .then(|results| { + assert_eq!(results.len(), 2); + assert_eq!(results[0], 42); + assert_eq!(results[1], 100); + }) + .catch(|_err| { + panic!("Should not reach here"); + }); + } + + #[test] + fn test_all_of_failure() { + let promise1 = Promise::::new(); + let promise2 = Promise::::new(); + + let all_promise = Promise::all_of(vec![promise1.clone(), promise2.clone()]); + + promise1.reject("Error 1".to_string()); + promise2.reject("Error 2".to_string()); + + all_promise + .then(|_results| { + panic!("Should not reach here"); + }) + .catch(|err| { + assert_eq!(err, "Error 1"); + }); + } + + #[test] + fn test_all_of_mixed_results() { + let promise1 = Promise::::new(); + let promise2 = Promise::::new(); + + let all_promise = Promise::all_of(vec![promise1.clone(), promise2.clone()]); + + promise1.reject("Error".to_string()); + promise2.fulfill(100); + + all_promise + .then(|_| { + panic!("Should not reach here"); + }) + .catch(|reason| assert_eq!(reason, "Error".to_string())); + } + + #[test] + fn test_all_of_empty() { + let all_promise = Promise::::all_of(vec![]); + + all_promise + .then(|results| { + assert!(results.is_empty()); + }) + .catch(|_err| { + panic!("Should not reach here"); + }); + } +} From cfefbcfd790d50068e487c029223c64966599c53 Mon Sep 17 00:00:00 2001 From: "jizhuozhi.george" Date: Mon, 14 Oct 2024 13:06:40 +0800 Subject: [PATCH 02/11] Add Promise support for http callout fix ci Signed-off-by: jizhuozhi.george --- src/promise.rs | 61 ++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 54 insertions(+), 7 deletions(-) diff --git a/src/promise.rs b/src/promise.rs index cf0f05d8..3678b554 100644 --- a/src/promise.rs +++ b/src/promise.rs @@ -94,6 +94,12 @@ where pub fn all_of(promises: Vec>) -> Rc>> { let next_promise = Promise::new(); + + if promises.is_empty() { + next_promise.fulfill(vec![]); + return next_promise; + } + let total = promises.len(); let results = Rc::new(RefCell::new(vec![None; total])); let remaining = Rc::new(RefCell::new(total)); @@ -186,41 +192,59 @@ mod tests { #[test] fn test_promise_fulfill() { + let touched = Rc::new(RefCell::new(false)); + let touched_clone = touched.clone(); + let promise = Promise::::new(); - let next_promise = promise.then(|result| { + let _next_promise = promise.then(move |result| { assert_eq!(result, 42); + *touched_clone.borrow_mut() = true; }); promise.fulfill(42); + assert_eq!(true, touched.take()) } #[test] fn test_promise_reject() { + let touched = Rc::new(RefCell::new(false)); + let touched_clone = touched.clone(); + let promise = Promise::::new(); - let next_promise = promise.catch(|err| { + let _next_promise = promise.catch(move |err| { assert_eq!(err, "Error"); + *touched_clone.borrow_mut() = true; }); promise.reject("Error".to_string()); + assert_eq!(true, touched.take()) } #[test] fn test_promise_chain() { + let touched = Rc::new(RefCell::new(false)); + let touched_clone = touched.clone(); + let promise = Promise::::new(); let next_promise = promise.then(|result| { assert_eq!(result, 10); 20 }); - next_promise.then(|result| { + next_promise.then(move |result| { assert_eq!(result, 20); + *touched_clone.borrow_mut() = true; }); promise.fulfill(10); + assert_eq!(true, touched.take()) } #[test] fn test_all_of_success() { + let touched = Rc::new(RefCell::new(false)); + let touched_clone = touched.clone(); + let promise1 = Promise::::new(); let promise2 = Promise::::new(); @@ -230,18 +254,24 @@ mod tests { promise2.fulfill(100); all_promise - .then(|results| { + .then(move |results| { assert_eq!(results.len(), 2); assert_eq!(results[0], 42); assert_eq!(results[1], 100); + *touched_clone.borrow_mut() = true; }) .catch(|_err| { panic!("Should not reach here"); }); + + assert_eq!(true, touched.take()) } #[test] fn test_all_of_failure() { + let touched = Rc::new(RefCell::new(false)); + let touched_clone = touched.clone(); + let promise1 = Promise::::new(); let promise2 = Promise::::new(); @@ -254,13 +284,19 @@ mod tests { .then(|_results| { panic!("Should not reach here"); }) - .catch(|err| { + .catch(move |err| { assert_eq!(err, "Error 1"); + *touched_clone.borrow_mut() = true; }); + + assert_eq!(true, touched.take()) } #[test] fn test_all_of_mixed_results() { + let touched = Rc::new(RefCell::new(false)); + let touched_clone = touched.clone(); + let promise1 = Promise::::new(); let promise2 = Promise::::new(); @@ -273,19 +309,30 @@ mod tests { .then(|_| { panic!("Should not reach here"); }) - .catch(|reason| assert_eq!(reason, "Error".to_string())); + .catch(move |reason| { + assert_eq!(reason, "Error".to_string()); + *touched_clone.borrow_mut() = true; + }); + + assert_eq!(true, touched.take()) } #[test] fn test_all_of_empty() { + let touched = Rc::new(RefCell::new(false)); + let touched_clone = touched.clone(); + let all_promise = Promise::::all_of(vec![]); all_promise - .then(|results| { + .then(move |results| { assert!(results.is_empty()); + *touched_clone.borrow_mut() = true; }) .catch(|_err| { panic!("Should not reach here"); }); + + assert_eq!(true, touched.take()) } } From a1e4f9ef9a79e2d3a7aa9b009d50fce20ab1bf58 Mon Sep 17 00:00:00 2001 From: "jizhuozhi.george" Date: Mon, 14 Oct 2024 13:14:40 +0800 Subject: [PATCH 03/11] Add Promise support for http callout ci add `http_parallel_call` Signed-off-by: jizhuozhi.george --- .github/workflows/rust.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 11de901a..2e6cad39 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -287,6 +287,7 @@ jobs: - 'http_body' - 'http_config' - 'http_headers' + - 'http_parallel_call' - 'grpc_auth_random' defaults: @@ -351,6 +352,7 @@ jobs: - 'http_body' - 'http_config' - 'http_headers' + - 'http_parallel_call' - 'grpc_auth_random' defaults: From b4150bc62124b83e6eb675df18c29c0050d32a79 Mon Sep 17 00:00:00 2001 From: "jizhuozhi.george" Date: Mon, 14 Oct 2024 13:26:10 +0800 Subject: [PATCH 04/11] Add Promise support for http callout fix ci Signed-off-by: jizhuozhi.george --- examples/http_parallel_call/src/lib.rs | 8 +++++--- src/promise.rs | 21 ++++++++++++--------- 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/examples/http_parallel_call/src/lib.rs b/examples/http_parallel_call/src/lib.rs index 6616c59a..fb7600be 100644 --- a/examples/http_parallel_call/src/lib.rs +++ b/examples/http_parallel_call/src/lib.rs @@ -25,9 +25,11 @@ proxy_wasm::main! {{ proxy_wasm::set_http_context(|_, _| -> Box { Box::new(HttpParallelCall::default()) }); }} +type OnHttpResponseArgs = (u32, usize, usize, usize); + #[derive(Default)] struct HttpParallelCall { - m: HashMap>>, + m: HashMap>>, } impl HttpContext for HttpParallelCall { @@ -70,10 +72,10 @@ impl HttpContext for HttpParallelCall { Promise::all_of(vec![ promise1 .then(|(_, _, _body_size, _)| get_http_call_response_body_string(0, _body_size)) - .then(|body| body.unwrap_or_else(|| "".to_string())), + .then(|body| body.unwrap_or_default()), promise2 .then(|(_, _, _body_size, _)| get_http_call_response_body_string(0, _body_size)) - .then(|body| body.unwrap_or_else(|| "".to_string())), + .then(|body| body.unwrap_or_default()), ]) .then(|results| { send_http_response( diff --git a/src/promise.rs b/src/promise.rs index 3678b554..3c645573 100644 --- a/src/promise.rs +++ b/src/promise.rs @@ -21,10 +21,13 @@ enum PromiseState { Rejected(String), } +type ThenCallbackRef = RefCell>>; +type CatchCallbackRef = RefCell>>; + pub struct Promise { state: RefCell>, - then_callback: RefCell>>, - catch_callback: RefCell>>, + then_callback: ThenCallbackRef, + catch_callback: CatchCallbackRef, } impl Promise @@ -202,7 +205,7 @@ mod tests { }); promise.fulfill(42); - assert_eq!(true, touched.take()) + assert!(touched.take()) } #[test] @@ -217,7 +220,7 @@ mod tests { }); promise.reject("Error".to_string()); - assert_eq!(true, touched.take()) + assert!(touched.take()) } #[test] @@ -237,7 +240,7 @@ mod tests { }); promise.fulfill(10); - assert_eq!(true, touched.take()) + assert!(touched.take()) } #[test] @@ -264,7 +267,7 @@ mod tests { panic!("Should not reach here"); }); - assert_eq!(true, touched.take()) + assert!(touched.take()) } #[test] @@ -289,7 +292,7 @@ mod tests { *touched_clone.borrow_mut() = true; }); - assert_eq!(true, touched.take()) + assert!(touched.take()) } #[test] @@ -314,7 +317,7 @@ mod tests { *touched_clone.borrow_mut() = true; }); - assert_eq!(true, touched.take()) + assert!(touched.take()) } #[test] @@ -333,6 +336,6 @@ mod tests { panic!("Should not reach here"); }); - assert_eq!(true, touched.take()) + assert!(touched.take()) } } From de19425e39d908f1a68c454cd1249084e93c23e0 Mon Sep 17 00:00:00 2001 From: "jizhuozhi.george" Date: Mon, 14 Oct 2024 16:26:21 +0800 Subject: [PATCH 05/11] Add Promise support for http callout move promise and dispatch http request to callout folder Signed-off-by: jizhuozhi.george --- examples/http_parallel_call/Cargo.toml | 2 +- examples/http_parallel_call/src/lib.rs | 89 ++++++++++---------------- src/callout/http.rs | 42 ++++++++++++ src/callout/mod.rs | 2 + src/{ => callout}/promise.rs | 0 src/lib.rs | 9 ++- 6 files changed, 83 insertions(+), 61 deletions(-) create mode 100644 src/callout/http.rs create mode 100644 src/callout/mod.rs rename src/{ => callout}/promise.rs (100%) diff --git a/examples/http_parallel_call/Cargo.toml b/examples/http_parallel_call/Cargo.toml index 5328fcf6..29899f51 100644 --- a/examples/http_parallel_call/Cargo.toml +++ b/examples/http_parallel_call/Cargo.toml @@ -2,7 +2,7 @@ publish = false name = "proxy-wasm-example-http-parallel-call" version = "0.0.1" -authors = ["Zhuozhi Ji "] +authors = ["Zhuozhi Ji "] description = "Proxy-Wasm plugin example: HTTP parallel call" license = "Apache-2.0" edition = "2018" diff --git a/examples/http_parallel_call/src/lib.rs b/examples/http_parallel_call/src/lib.rs index fb7600be..805baaa6 100644 --- a/examples/http_parallel_call/src/lib.rs +++ b/examples/http_parallel_call/src/lib.rs @@ -12,12 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. +use proxy_wasm::callout::http::HttpClient; use proxy_wasm::hostcalls; -use proxy_wasm::promise::Promise; +use proxy_wasm::callout::promise::Promise; use proxy_wasm::traits::*; use proxy_wasm::types::*; -use std::collections::HashMap; -use std::rc::Rc; use std::time::Duration; proxy_wasm::main! {{ @@ -25,70 +24,52 @@ proxy_wasm::main! {{ proxy_wasm::set_http_context(|_, _| -> Box { Box::new(HttpParallelCall::default()) }); }} -type OnHttpResponseArgs = (u32, usize, usize, usize); - #[derive(Default)] struct HttpParallelCall { - m: HashMap>>, + client: HttpClient, } impl HttpContext for HttpParallelCall { fn on_http_request_headers(&mut self, _: usize, _: bool) -> Action { // "Hello, " - let token1 = self - .dispatch_http_call( - "httpbin", - vec![ - (":method", "GET"), - (":path", "/base64/SGVsbG8sIAo="), - (":authority", "httpbin.org"), - ], - None, - vec![], - Duration::from_secs(1), - ) - .unwrap(); + let promise1 = self.client.dispatch( + "httpbin", + vec![ + (":method", "GET"), + (":path", "/base64/SGVsbG8sIA=="), + (":authority", "httpbin.org"), + ], + None, + vec![], + Duration::from_secs(1), + ); // "World!" - let token2 = self - .dispatch_http_call( - "httpbin", - vec![ - (":method", "GET"), - (":path", "/base64/V29ybGQhCg=="), - (":authority", "httpbin.org"), - ], - None, - vec![], - Duration::from_secs(1), - ) - .unwrap(); - - let promise1 = Promise::new(); - let promise2 = Promise::new(); - self.m.insert(token1, promise1.clone()); - self.m.insert(token2, promise2.clone()); + let promise2 = self.client.dispatch( + "httpbin", + vec![ + (":method", "GET"), + (":path", "/base64/V29ybGQh"), + (":authority", "httpbin.org"), + ], + None, + vec![], + Duration::from_secs(1), + ); Promise::all_of(vec![ promise1 - .then(|(_, _, _body_size, _)| get_http_call_response_body_string(0, _body_size)) + .then(|(_, _, body_size, _)| get_http_call_response_body_string(0, body_size)) .then(|body| body.unwrap_or_default()), promise2 - .then(|(_, _, _body_size, _)| get_http_call_response_body_string(0, _body_size)) + .then(|(_, _, body_size, _)| get_http_call_response_body_string(0, body_size)) .then(|body| body.unwrap_or_default()), ]) .then(|results| { send_http_response( 200, vec![], - Some( - format!( - "{}{}\n", - results[0].strip_suffix("\n").unwrap(), - results[1].strip_suffix("\n").unwrap() - ) - .as_bytes(), - ), + Some(format!("{}{}\n", results[0], results[1]).as_bytes()), ); }); @@ -104,15 +85,13 @@ impl HttpContext for HttpParallelCall { impl Context for HttpParallelCall { fn on_http_call_response( &mut self, - _token_id: u32, - _num_headers: usize, - _body_size: usize, - _num_trailers: usize, + token_id: u32, + num_headers: usize, + body_size: usize, + num_trailers: usize, ) { - let promise = self.m.remove(&_token_id); - promise - .unwrap() - .fulfill((_token_id, _num_headers, _body_size, _num_trailers)); + self.client + .callback(token_id, num_headers, body_size, num_trailers) } } diff --git a/src/callout/http.rs b/src/callout/http.rs new file mode 100644 index 00000000..04d345af --- /dev/null +++ b/src/callout/http.rs @@ -0,0 +1,42 @@ +use crate::callout::promise::Promise; +use crate::hostcalls; +use std::collections::HashMap; +use std::rc::Rc; +use std::time::Duration; + +type OnHttpResponseArgs = (u32, usize, usize, usize); + +#[derive(Default)] +pub struct HttpClient { + m: HashMap>>, +} + +impl HttpClient { + pub fn dispatch( + &mut self, + upstream: &str, + headers: Vec<(&str, &str)>, + body: Option<&[u8]>, + trailers: Vec<(&str, &str)>, + timeout: Duration, + ) -> Rc> { + let token = + hostcalls::dispatch_http_call(upstream, headers, body, trailers, timeout).unwrap(); + let promise = Promise::new(); + self.m.insert(token, promise.clone()); + promise + } + + pub fn callback( + &mut self, + token_id: u32, + num_headers: usize, + body_size: usize, + num_trailers: usize, + ) { + let promise = self.m.remove(&token_id); + promise + .unwrap() + .fulfill((token_id, num_headers, body_size, num_trailers)) + } +} diff --git a/src/callout/mod.rs b/src/callout/mod.rs new file mode 100644 index 00000000..2eb5e9c2 --- /dev/null +++ b/src/callout/mod.rs @@ -0,0 +1,2 @@ +pub mod http; +pub mod promise; diff --git a/src/promise.rs b/src/callout/promise.rs similarity index 100% rename from src/promise.rs rename to src/callout/promise.rs diff --git a/src/lib.rs b/src/lib.rs index 05d23579..974e42f9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,14 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. -pub mod hostcalls; -pub mod promise; -pub mod traits; -pub mod types; - mod allocator; +pub mod callout; mod dispatcher; +pub mod hostcalls; mod logger; +pub mod traits; +pub mod types; // For crate-type="cdylib". #[cfg(not(wasi_exec_model_reactor))] From 5c95bb4af5f1dc10af935322455732e38fd575e6 Mon Sep 17 00:00:00 2001 From: "jizhuozhi.george" Date: Mon, 14 Oct 2024 16:31:22 +0800 Subject: [PATCH 06/11] Add Promise support for http callout remove Chinese comments Signed-off-by: jizhuozhi.george --- src/callout/promise.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/callout/promise.rs b/src/callout/promise.rs index 3c645573..badb5f5b 100644 --- a/src/callout/promise.rs +++ b/src/callout/promise.rs @@ -146,7 +146,7 @@ where let next_promise = Promise::new(); let total = promises.len(); let remaining = Rc::new(RefCell::new(total)); - let first_error = Rc::new(RefCell::new(None)); // 用来保存第一个错误 + let first_error = Rc::new(RefCell::new(None)); for promise in promises { let next_promise_clone = next_promise.clone(); From 95f4d063e037ce497105df6c75b68d9616e7489d Mon Sep 17 00:00:00 2001 From: "jizhuozhi.george" Date: Mon, 14 Oct 2024 17:07:18 +0800 Subject: [PATCH 07/11] Add Promise support for http callout simplify http_parallel_call example Signed-off-by: jizhuozhi.george --- examples/http_parallel_call/README.md | 4 +- examples/http_parallel_call/src/lib.rs | 64 +++++++++++++------------- 2 files changed, 34 insertions(+), 34 deletions(-) diff --git a/examples/http_parallel_call/README.md b/examples/http_parallel_call/README.md index 3ff62d5f..a837b2c3 100644 --- a/examples/http_parallel_call/README.md +++ b/examples/http_parallel_call/README.md @@ -19,9 +19,9 @@ $ docker compose up #### Access granted. -Send HTTP request to `localhost:10000/headers`: +Send HTTP request to `localhost:10000/`: ```sh -$ curl localhost:10000/headers +$ curl localhost:10000/ Hello, World!\n ``` diff --git a/examples/http_parallel_call/src/lib.rs b/examples/http_parallel_call/src/lib.rs index 805baaa6..1e5e1599 100644 --- a/examples/http_parallel_call/src/lib.rs +++ b/examples/http_parallel_call/src/lib.rs @@ -13,8 +13,8 @@ // limitations under the License. use proxy_wasm::callout::http::HttpClient; -use proxy_wasm::hostcalls; use proxy_wasm::callout::promise::Promise; +use proxy_wasm::hostcalls; use proxy_wasm::traits::*; use proxy_wasm::types::*; use std::time::Duration; @@ -32,40 +32,40 @@ struct HttpParallelCall { impl HttpContext for HttpParallelCall { fn on_http_request_headers(&mut self, _: usize, _: bool) -> Action { // "Hello, " - let promise1 = self.client.dispatch( - "httpbin", - vec![ - (":method", "GET"), - (":path", "/base64/SGVsbG8sIA=="), - (":authority", "httpbin.org"), - ], - None, - vec![], - Duration::from_secs(1), - ); + let promise1 = self + .client + .dispatch( + "httpbin", + vec![ + (":method", "GET"), + (":path", "/base64/SGVsbG8sIA=="), + (":authority", "httpbin.org"), + ], + None, + vec![], + Duration::from_secs(1), + ) + .then(|(_, _, body_size, _)| get_http_call_response_body_string(0, body_size)) + .then(|body| body.unwrap_or_default()); // "World!" - let promise2 = self.client.dispatch( - "httpbin", - vec![ - (":method", "GET"), - (":path", "/base64/V29ybGQh"), - (":authority", "httpbin.org"), - ], - None, - vec![], - Duration::from_secs(1), - ); + let promise2 = self + .client + .dispatch( + "httpbin", + vec![ + (":method", "GET"), + (":path", "/base64/V29ybGQh"), + (":authority", "httpbin.org"), + ], + None, + vec![], + Duration::from_secs(1), + ) + .then(|(_, _, body_size, _)| get_http_call_response_body_string(0, body_size)) + .then(|body| body.unwrap_or_default()); - Promise::all_of(vec![ - promise1 - .then(|(_, _, body_size, _)| get_http_call_response_body_string(0, body_size)) - .then(|body| body.unwrap_or_default()), - promise2 - .then(|(_, _, body_size, _)| get_http_call_response_body_string(0, body_size)) - .then(|body| body.unwrap_or_default()), - ]) - .then(|results| { + Promise::all_of(vec![promise1, promise2]).then(|results| { send_http_response( 200, vec![], From a5ff216f0fe026edb2fd0a75f57ae23f9b01c23d Mon Sep 17 00:00:00 2001 From: "jizhuozhi.george" Date: Mon, 14 Oct 2024 18:52:43 +0800 Subject: [PATCH 08/11] Add Promise support for http callout fix ci Signed-off-by: jizhuozhi.george --- src/callout/promise.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/callout/promise.rs b/src/callout/promise.rs index badb5f5b..873940ca 100644 --- a/src/callout/promise.rs +++ b/src/callout/promise.rs @@ -66,7 +66,7 @@ where match &*self.state.borrow() { PromiseState::Pending => { *self.then_callback.borrow_mut() = Some(Box::new(move |value| { - let result = f(value.clone()); + let result = f(value); new_promise_clone.fulfill(result); })); let new_promise_for_catch = new_promise.clone(); @@ -135,7 +135,7 @@ where .catch(move |reason| { if !*rejected_clone_for_catch.borrow() { *rejected_clone_for_catch.borrow_mut() = true; - next_promise_clone_for_catch.reject(reason.clone()); + next_promise_clone_for_catch.reject(reason); } }); } @@ -164,7 +164,7 @@ where }) .catch(move |err| { if first_error_clone.borrow().is_none() { - *first_error_clone.borrow_mut() = Some(err.clone()); + *first_error_clone.borrow_mut() = Some(err); } *remaining_clone_for_catch.borrow_mut() -= 1; From dc6e39f140dcef3e2e7d63de1857a1761262e32a Mon Sep 17 00:00:00 2001 From: "jizhuozhi.george" Date: Wed, 16 Oct 2024 13:20:39 +0800 Subject: [PATCH 09/11] Add Promise support for http callout fix example cloning self to move but not using hostcalls Signed-off-by: jizhuozhi.george --- examples/http_parallel_call/src/lib.rs | 48 ++++++++++++++------------ 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/examples/http_parallel_call/src/lib.rs b/examples/http_parallel_call/src/lib.rs index 1e5e1599..68c2529f 100644 --- a/examples/http_parallel_call/src/lib.rs +++ b/examples/http_parallel_call/src/lib.rs @@ -14,9 +14,10 @@ use proxy_wasm::callout::http::HttpClient; use proxy_wasm::callout::promise::Promise; -use proxy_wasm::hostcalls; use proxy_wasm::traits::*; use proxy_wasm::types::*; +use std::cell::RefCell; +use std::rc::Rc; use std::time::Duration; proxy_wasm::main! {{ @@ -24,16 +25,21 @@ proxy_wasm::main! {{ proxy_wasm::set_http_context(|_, _| -> Box { Box::new(HttpParallelCall::default()) }); }} -#[derive(Default)] +#[derive(Default, Clone)] struct HttpParallelCall { - client: HttpClient, + client: Rc>, } impl HttpContext for HttpParallelCall { fn on_http_request_headers(&mut self, _: usize, _: bool) -> Action { + let self_clone_for_promise1 = self.clone(); + let self_clone_for_promise2 = self.clone(); + let self_clone_for_join = self.clone(); + // "Hello, " let promise1 = self .client + .borrow_mut() .dispatch( "httpbin", vec![ @@ -45,12 +51,17 @@ impl HttpContext for HttpParallelCall { vec![], Duration::from_secs(1), ) - .then(|(_, _, body_size, _)| get_http_call_response_body_string(0, body_size)) - .then(|body| body.unwrap_or_default()); + .then(move |(_, _, body_size, _)| { + match self_clone_for_promise1.get_http_call_response_body(0, body_size) { + None => "".to_owned(), + Some(bytes) => String::from_utf8(bytes.to_vec()).unwrap(), + } + }); // "World!" let promise2 = self .client + .borrow_mut() .dispatch( "httpbin", vec![ @@ -62,11 +73,15 @@ impl HttpContext for HttpParallelCall { vec![], Duration::from_secs(1), ) - .then(|(_, _, body_size, _)| get_http_call_response_body_string(0, body_size)) - .then(|body| body.unwrap_or_default()); + .then(move |(_, _, body_size, _)| { + match self_clone_for_promise2.get_http_call_response_body(0, body_size) { + None => "".to_owned(), + Some(bytes) => String::from_utf8(bytes.to_vec()).unwrap(), + } + }); - Promise::all_of(vec![promise1, promise2]).then(|results| { - send_http_response( + Promise::all_of(vec![promise1, promise2]).then(move |results| { + self_clone_for_join.send_http_response( 200, vec![], Some(format!("{}{}\n", results[0], results[1]).as_bytes()), @@ -91,20 +106,7 @@ impl Context for HttpParallelCall { num_trailers: usize, ) { self.client + .borrow_mut() .callback(token_id, num_headers, body_size, num_trailers) } } - -fn get_http_call_response_body_string(start: usize, max_size: usize) -> Option { - match hostcalls::get_buffer(BufferType::HttpCallResponseBody, start, max_size).unwrap() { - None => None, - Some(bytes) => { - let body_string = String::from_utf8(bytes.to_vec()).unwrap(); - Some(body_string) - } - } -} - -fn send_http_response(status_code: u32, headers: Vec<(&str, &str)>, body: Option<&[u8]>) { - hostcalls::send_http_response(status_code, headers, body).unwrap() -} From d79799940c53a55e9555cddaed4a30344a061ab7 Mon Sep 17 00:00:00 2001 From: "jizhuozhi.george" Date: Sat, 19 Oct 2024 23:18:01 +0800 Subject: [PATCH 10/11] Add Promise support for http callout fix licenses and bazel (maybe) Signed-off-by: jizhuozhi.george --- BUILD | 2 +- src/callout/http.rs | 14 ++++++++++++++ src/callout/mod.rs | 14 ++++++++++++++ src/lib.rs | 7 ++++--- 4 files changed, 33 insertions(+), 4 deletions(-) diff --git a/BUILD b/BUILD index 7f611e4f..3cc708e3 100644 --- a/BUILD +++ b/BUILD @@ -29,7 +29,7 @@ cargo_build_script( rust_library( name = "proxy_wasm", - srcs = glob(["src/*.rs"]), + srcs = glob(["src/*.rs", "src/callout/*.rs"]), edition = "2018", visibility = ["//visibility:public"], deps = [ diff --git a/src/callout/http.rs b/src/callout/http.rs index 04d345af..90ebdb2a 100644 --- a/src/callout/http.rs +++ b/src/callout/http.rs @@ -1,3 +1,17 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + use crate::callout::promise::Promise; use crate::hostcalls; use std::collections::HashMap; diff --git a/src/callout/mod.rs b/src/callout/mod.rs index 2eb5e9c2..b67a869d 100644 --- a/src/callout/mod.rs +++ b/src/callout/mod.rs @@ -1,2 +1,16 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + pub mod http; pub mod promise; diff --git a/src/lib.rs b/src/lib.rs index 974e42f9..14f37989 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,14 +12,15 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod allocator; pub mod callout; -mod dispatcher; pub mod hostcalls; -mod logger; pub mod traits; pub mod types; +mod allocator; +mod dispatcher; +mod logger; + // For crate-type="cdylib". #[cfg(not(wasi_exec_model_reactor))] #[macro_export] From 9833aa6e08de804697e89e0306edf61f0a6e6639 Mon Sep 17 00:00:00 2001 From: jizhuozhi Date: Mon, 4 Nov 2024 00:44:26 +0800 Subject: [PATCH 11/11] Add Promise support for http callout Signed-off-by: jizhuozhi.george --- BUILD | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/BUILD b/BUILD index 3cc708e3..1f5f914e 100644 --- a/BUILD +++ b/BUILD @@ -29,7 +29,10 @@ cargo_build_script( rust_library( name = "proxy_wasm", - srcs = glob(["src/*.rs", "src/callout/*.rs"]), + srcs = glob([ + "src/*.rs", + "src/callout/*.rs", + ]), edition = "2018", visibility = ["//visibility:public"], deps = [