diff --git a/plugin-server/src/cdp/async-function-executor.ts b/plugin-server/src/cdp/async-function-executor.ts index 78a9374a78171..fe6df753dc723 100644 --- a/plugin-server/src/cdp/async-function-executor.ts +++ b/plugin-server/src/cdp/async-function-executor.ts @@ -119,16 +119,11 @@ export class AsyncFunctionExecutor { url, method, headers, - body, - }), - metadata: JSON.stringify({ - // TODO: It seems like Fetch expects metadata to have this shape, which - // I don't understand. I think `metadata` is where all the other Hog - // state is going to be stored? For now I'm just trying to make fetch - // work. - tries: 0, - trace: [], + // The body is passed in the `blob` field below. }), + metadata: JSON.stringify({}), + // Fetch bodies are passed in the binary blob column/field. + blob: toUint8Array(body), }) } catch (e) { status.error( @@ -193,3 +188,23 @@ export class AsyncFunctionExecutor { return response } } + +function toUint8Array(data: any): Uint8Array | undefined { + if (data === null || data === undefined) { + return undefined + } + + if (data instanceof Uint8Array) { + return data + } + + if (data instanceof ArrayBuffer) { + return new Uint8Array(data) + } + + if (typeof data === 'string') { + return new TextEncoder().encode(data) + } + + return new TextEncoder().encode(JSON.stringify(data)) +} diff --git a/rust/.cargo/config.toml b/rust/.cargo/config.toml index 7a657288d3f48..2b5cb3c5910a0 100644 --- a/rust/.cargo/config.toml +++ b/rust/.cargo/config.toml @@ -1,4 +1,4 @@ [env] # Force SQLX to run in offline mode for CI. Devs can change this if they want, to live code against the DB, # but we use it at the workspace level here to allow use of sqlx macros across all crates -SQLX_OFFLINE = "true" \ No newline at end of file +SQLX_OFFLINE = "true" diff --git a/rust/.env b/rust/.env index 43eda2a13040b..d37feead94dcb 100644 --- a/rust/.env +++ b/rust/.env @@ -1 +1 @@ -DATABASE_URL=postgres://posthog:posthog@localhost:15432/test_database +DATABASE_URL=postgres://posthog:posthog@localhost:15432/test_database \ No newline at end of file diff --git a/rust/cyclotron-core/.sqlx/query-1345c0c65a353ab5e6e2086e83cadb742119c89146e609b2db34edd21ca95ae5.json b/rust/cyclotron-core/.sqlx/query-1345c0c65a353ab5e6e2086e83cadb742119c89146e609b2db34edd21ca95ae5.json deleted file mode 100644 index ab9aae4aa0568..0000000000000 --- a/rust/cyclotron-core/.sqlx/query-1345c0c65a353ab5e6e2086e83cadb742119c89146e609b2db34edd21ca95ae5.json +++ /dev/null @@ -1,18 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "SELECT value FROM cyclotron_metadata WHERE key = $1", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "value", - "type_info": "Text" - } - ], - "parameters": { - "Left": ["Text"] - }, - "nullable": [false] - }, - "hash": "1345c0c65a353ab5e6e2086e83cadb742119c89146e609b2db34edd21ca95ae5" -} diff --git a/rust/cyclotron-core/.sqlx/query-16d533b5a15b0b9926a181f578b5b577efe424710b45f02e1ddeece8bca96f87.json b/rust/cyclotron-core/.sqlx/query-16d533b5a15b0b9926a181f578b5b577efe424710b45f02e1ddeece8bca96f87.json index 7a3a8b98d9da5..23b0665a2d357 100644 --- a/rust/cyclotron-core/.sqlx/query-16d533b5a15b0b9926a181f578b5b577efe424710b45f02e1ddeece8bca96f87.json +++ b/rust/cyclotron-core/.sqlx/query-16d533b5a15b0b9926a181f578b5b577efe424710b45f02e1ddeece8bca96f87.json @@ -4,7 +4,7 @@ "describe": { "columns": [], "parameters": { - "Left": ["Text", "Uuid", "Uuid"] + "Left": ["Bytea", "Uuid", "Uuid"] }, "nullable": [] }, diff --git 
a/rust/cyclotron-core/.sqlx/query-350983ef271029734aff70eb7e298bfe578ecaa8678268863bce917ced9d5d46.json b/rust/cyclotron-core/.sqlx/query-229c28c25aec24180c29e6ed636c165376f43116b85921c62b36b1b8e85562b0.json similarity index 87% rename from rust/cyclotron-core/.sqlx/query-350983ef271029734aff70eb7e298bfe578ecaa8678268863bce917ced9d5d46.json rename to rust/cyclotron-core/.sqlx/query-229c28c25aec24180c29e6ed636c165376f43116b85921c62b36b1b8e85562b0.json index d3a54ba7ef247..ffda6f4b70b26 100644 --- a/rust/cyclotron-core/.sqlx/query-350983ef271029734aff70eb7e298bfe578ecaa8678268863bce917ced9d5d46.json +++ b/rust/cyclotron-core/.sqlx/query-229c28c25aec24180c29e6ed636c165376f43116b85921c62b36b1b8e85562b0.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\nWITH available AS (\n SELECT\n id,\n state\n FROM cyclotron_jobs\n WHERE\n state = 'available'::JobState\n AND queue_name = $1\n AND scheduled <= NOW()\n ORDER BY\n priority ASC,\n scheduled ASC\n LIMIT $2\n FOR UPDATE SKIP LOCKED\n)\nUPDATE cyclotron_jobs\nSET\n state = 'running'::JobState,\n lock_id = $3,\n last_heartbeat = NOW(),\n last_transition = NOW(),\n transition_count = transition_count + 1\nFROM available\nWHERE\n cyclotron_jobs.id = available.id\nRETURNING\n cyclotron_jobs.id,\n team_id,\n available.state as \"state: JobState\",\n queue_name,\n priority,\n function_id,\n created,\n last_transition,\n scheduled,\n transition_count,\n NULL as vm_state,\n metadata,\n parameters,\n lock_id,\n last_heartbeat,\n janitor_touch_count\n ", + "query": "\nWITH available AS (\n SELECT\n id,\n state\n FROM cyclotron_jobs\n WHERE\n state = 'available'::JobState\n AND queue_name = $1\n AND scheduled <= NOW()\n ORDER BY\n priority ASC,\n scheduled ASC\n LIMIT $2\n FOR UPDATE SKIP LOCKED\n)\nUPDATE cyclotron_jobs\nSET\n state = 'running'::JobState,\n lock_id = $3,\n last_heartbeat = NOW(),\n last_transition = NOW(),\n transition_count = transition_count + 1\nFROM available\nWHERE\n cyclotron_jobs.id = available.id\nRETURNING\n cyclotron_jobs.id,\n team_id,\n available.state as \"state: JobState\",\n queue_name,\n priority,\n function_id,\n created,\n last_transition,\n scheduled,\n transition_count,\n NULL::bytea as vm_state,\n metadata,\n parameters,\n blob,\n lock_id,\n last_heartbeat,\n janitor_touch_count\n ", "describe": { "columns": [ { @@ -63,30 +63,35 @@ { "ordinal": 10, "name": "vm_state", - "type_info": "Text" + "type_info": "Bytea" }, { "ordinal": 11, "name": "metadata", - "type_info": "Text" + "type_info": "Bytea" }, { "ordinal": 12, "name": "parameters", - "type_info": "Text" + "type_info": "Bytea" }, { "ordinal": 13, + "name": "blob", + "type_info": "Bytea" + }, + { + "ordinal": 14, "name": "lock_id", "type_info": "Uuid" }, { - "ordinal": 14, + "ordinal": 15, "name": "last_heartbeat", "type_info": "Timestamptz" }, { - "ordinal": 15, + "ordinal": 16, "name": "janitor_touch_count", "type_info": "Int2" } @@ -110,8 +115,9 @@ true, true, true, + true, false ] }, - "hash": "350983ef271029734aff70eb7e298bfe578ecaa8678268863bce917ced9d5d46" + "hash": "229c28c25aec24180c29e6ed636c165376f43116b85921c62b36b1b8e85562b0" } diff --git a/rust/cyclotron-core/.sqlx/query-58dfd4671ac3497614b184384ac7f8d490dda8b27a150454e413d02f89c92050.json b/rust/cyclotron-core/.sqlx/query-58dfd4671ac3497614b184384ac7f8d490dda8b27a150454e413d02f89c92050.json new file mode 100644 index 0000000000000..5a2231c7c2fdd --- /dev/null +++ b/rust/cyclotron-core/.sqlx/query-58dfd4671ac3497614b184384ac7f8d490dda8b27a150454e413d02f89c92050.json @@ -0,0 +1,12 @@ +{ + 
"db_name": "PostgreSQL", + "query": "UPDATE cyclotron_jobs SET blob = $1 WHERE id = $2 AND lock_id = $3", + "describe": { + "columns": [], + "parameters": { + "Left": ["Bytea", "Uuid", "Uuid"] + }, + "nullable": [] + }, + "hash": "58dfd4671ac3497614b184384ac7f8d490dda8b27a150454e413d02f89c92050" +} diff --git a/rust/cyclotron-core/.sqlx/query-61e0bf6eb6d66519b347441569946d8acfb4ea86954f95a1cab71051eaffc907.json b/rust/cyclotron-core/.sqlx/query-61e0bf6eb6d66519b347441569946d8acfb4ea86954f95a1cab71051eaffc907.json deleted file mode 100644 index b890f6f8d43d5..0000000000000 --- a/rust/cyclotron-core/.sqlx/query-61e0bf6eb6d66519b347441569946d8acfb4ea86954f95a1cab71051eaffc907.json +++ /dev/null @@ -1,18 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "INSERT INTO cyclotron_metadata (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value RETURNING value", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "value", - "type_info": "Text" - } - ], - "parameters": { - "Left": ["Text", "Text"] - }, - "nullable": [false] - }, - "hash": "61e0bf6eb6d66519b347441569946d8acfb4ea86954f95a1cab71051eaffc907" -} diff --git a/rust/cyclotron-core/.sqlx/query-8ab11a89bc4720985e130c58021f46045c332cc45e69b08708b289cc933b3b5c.json b/rust/cyclotron-core/.sqlx/query-8ab11a89bc4720985e130c58021f46045c332cc45e69b08708b289cc933b3b5c.json index 8c3a3dbde8b62..66ae665232405 100644 --- a/rust/cyclotron-core/.sqlx/query-8ab11a89bc4720985e130c58021f46045c332cc45e69b08708b289cc933b3b5c.json +++ b/rust/cyclotron-core/.sqlx/query-8ab11a89bc4720985e130c58021f46045c332cc45e69b08708b289cc933b3b5c.json @@ -4,7 +4,7 @@ "describe": { "columns": [], "parameters": { - "Left": ["Text", "Uuid", "Uuid"] + "Left": ["Bytea", "Uuid", "Uuid"] }, "nullable": [] }, diff --git a/rust/cyclotron-core/.sqlx/query-aa595eaf28c1f4b872c278be407b59cc00f3125413f4032ac3647a6b5ee1a632.json b/rust/cyclotron-core/.sqlx/query-aa595eaf28c1f4b872c278be407b59cc00f3125413f4032ac3647a6b5ee1a632.json index bd8a7cdd90282..51fb1b018120b 100644 --- a/rust/cyclotron-core/.sqlx/query-aa595eaf28c1f4b872c278be407b59cc00f3125413f4032ac3647a6b5ee1a632.json +++ b/rust/cyclotron-core/.sqlx/query-aa595eaf28c1f4b872c278be407b59cc00f3125413f4032ac3647a6b5ee1a632.json @@ -6,7 +6,7 @@ { "ordinal": 0, "name": "vm_state", - "type_info": "Text" + "type_info": "Bytea" } ], "parameters": { diff --git a/rust/cyclotron-core/.sqlx/query-b160b785a0377b854341105e99e4ef7a5da523e168a5f9be47f6caaef09487d7.json b/rust/cyclotron-core/.sqlx/query-b160b785a0377b854341105e99e4ef7a5da523e168a5f9be47f6caaef09487d7.json index ea9c7f8fceb06..4364f2fee8816 100644 --- a/rust/cyclotron-core/.sqlx/query-b160b785a0377b854341105e99e4ef7a5da523e168a5f9be47f6caaef09487d7.json +++ b/rust/cyclotron-core/.sqlx/query-b160b785a0377b854341105e99e4ef7a5da523e168a5f9be47f6caaef09487d7.json @@ -4,7 +4,7 @@ "describe": { "columns": [], "parameters": { - "Left": ["Text", "Uuid", "Uuid"] + "Left": ["Bytea", "Uuid", "Uuid"] }, "nullable": [] }, diff --git a/rust/cyclotron-core/.sqlx/query-c624261597b9356ff3e7c3e392a84bb0b551e91c503e8b21c29814f1eb660a8e.json b/rust/cyclotron-core/.sqlx/query-ce036f16a37a41b9dc5a164de0b52345454cd3323568c4bef5b8480380287068.json similarity index 89% rename from rust/cyclotron-core/.sqlx/query-c624261597b9356ff3e7c3e392a84bb0b551e91c503e8b21c29814f1eb660a8e.json rename to rust/cyclotron-core/.sqlx/query-ce036f16a37a41b9dc5a164de0b52345454cd3323568c4bef5b8480380287068.json index b94965873e7d6..fe174820c3a07 100644 --- 
a/rust/cyclotron-core/.sqlx/query-c624261597b9356ff3e7c3e392a84bb0b551e91c503e8b21c29814f1eb660a8e.json +++ b/rust/cyclotron-core/.sqlx/query-ce036f16a37a41b9dc5a164de0b52345454cd3323568c4bef5b8480380287068.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\nWITH available AS (\n SELECT\n id,\n state\n FROM cyclotron_jobs\n WHERE\n state = 'available'::JobState\n AND queue_name = $1\n AND scheduled <= NOW()\n ORDER BY\n priority ASC,\n scheduled ASC\n LIMIT $2\n FOR UPDATE SKIP LOCKED\n)\nUPDATE cyclotron_jobs\nSET\n state = 'running'::JobState,\n lock_id = $3,\n last_heartbeat = NOW(),\n last_transition = NOW(),\n transition_count = transition_count + 1\nFROM available\nWHERE\n cyclotron_jobs.id = available.id\nRETURNING\n cyclotron_jobs.id,\n team_id,\n available.state as \"state: JobState\",\n queue_name,\n priority,\n function_id,\n created,\n last_transition,\n scheduled,\n transition_count,\n vm_state,\n metadata,\n parameters,\n lock_id,\n last_heartbeat,\n janitor_touch_count\n ", + "query": "\nWITH available AS (\n SELECT\n id,\n state\n FROM cyclotron_jobs\n WHERE\n state = 'available'::JobState\n AND queue_name = $1\n AND scheduled <= NOW()\n ORDER BY\n priority ASC,\n scheduled ASC\n LIMIT $2\n FOR UPDATE SKIP LOCKED\n)\nUPDATE cyclotron_jobs\nSET\n state = 'running'::JobState,\n lock_id = $3,\n last_heartbeat = NOW(),\n last_transition = NOW(),\n transition_count = transition_count + 1\nFROM available\nWHERE\n cyclotron_jobs.id = available.id\nRETURNING\n cyclotron_jobs.id,\n team_id,\n available.state as \"state: JobState\",\n queue_name,\n priority,\n function_id,\n created,\n last_transition,\n scheduled,\n transition_count,\n vm_state,\n metadata,\n parameters,\n blob,\n lock_id,\n last_heartbeat,\n janitor_touch_count\n ", "describe": { "columns": [ { @@ -63,30 +63,35 @@ { "ordinal": 10, "name": "vm_state", - "type_info": "Text" + "type_info": "Bytea" }, { "ordinal": 11, "name": "metadata", - "type_info": "Text" + "type_info": "Bytea" }, { "ordinal": 12, "name": "parameters", - "type_info": "Text" + "type_info": "Bytea" }, { "ordinal": 13, + "name": "blob", + "type_info": "Bytea" + }, + { + "ordinal": 14, "name": "lock_id", "type_info": "Uuid" }, { - "ordinal": 14, + "ordinal": 15, "name": "last_heartbeat", "type_info": "Timestamptz" }, { - "ordinal": 15, + "ordinal": 16, "name": "janitor_touch_count", "type_info": "Int2" } @@ -110,8 +115,9 @@ true, true, true, + true, false ] }, - "hash": "c624261597b9356ff3e7c3e392a84bb0b551e91c503e8b21c29814f1eb660a8e" + "hash": "ce036f16a37a41b9dc5a164de0b52345454cd3323568c4bef5b8480380287068" } diff --git a/rust/cyclotron-core/.sqlx/query-7217e766aeb53298238222c0c71a2ce446cac731845c53cb926fc47ace708dd6.json b/rust/cyclotron-core/.sqlx/query-f074766d1fc32df17f92667f412af30c682288988fc6f102e8a063be97c3e51c.json similarity index 72% rename from rust/cyclotron-core/.sqlx/query-7217e766aeb53298238222c0c71a2ce446cac731845c53cb926fc47ace708dd6.json rename to rust/cyclotron-core/.sqlx/query-f074766d1fc32df17f92667f412af30c682288988fc6f102e8a063be97c3e51c.json index 230374e98d610..6139be53026c1 100644 --- a/rust/cyclotron-core/.sqlx/query-7217e766aeb53298238222c0c71a2ce446cac731845c53cb926fc47ace708dd6.json +++ b/rust/cyclotron-core/.sqlx/query-f074766d1fc32df17f92667f412af30c682288988fc6f102e8a063be97c3e51c.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\nINSERT INTO cyclotron_jobs\n (\n id,\n team_id,\n function_id,\n created,\n lock_id,\n last_heartbeat,\n janitor_touch_count,\n transition_count,\n last_transition,\n 
queue_name,\n state,\n scheduled,\n priority,\n vm_state,\n metadata,\n parameters\n )\nVALUES\n ($1, $2, $3, NOW(), NULL, NULL, 0, 0, NOW(), $4, $5, $6, $7, $8, $9, $10)\n ", + "query": "\nINSERT INTO cyclotron_jobs\n (\n id,\n team_id,\n function_id,\n created,\n lock_id,\n last_heartbeat,\n janitor_touch_count,\n transition_count,\n last_transition,\n queue_name,\n state,\n scheduled,\n priority,\n vm_state,\n metadata,\n parameters,\n blob\n )\nVALUES\n ($1, $2, $3, NOW(), NULL, NULL, 0, 0, NOW(), $4, $5, $6, $7, $8, $9, $10, $11)\n ", "describe": { "columns": [], "parameters": { @@ -19,12 +19,13 @@ }, "Timestamptz", "Int2", - "Text", - "Text", - "Text" + "Bytea", + "Bytea", + "Bytea", + "Bytea" ] }, "nullable": [] }, - "hash": "7217e766aeb53298238222c0c71a2ce446cac731845c53cb926fc47ace708dd6" + "hash": "f074766d1fc32df17f92667f412af30c682288988fc6f102e8a063be97c3e51c" } diff --git a/rust/cyclotron-core/migrations/20240823191751_bytes_over_text.sql b/rust/cyclotron-core/migrations/20240823191751_bytes_over_text.sql new file mode 100644 index 0000000000000..cbe476e3c30e7 --- /dev/null +++ b/rust/cyclotron-core/migrations/20240823191751_bytes_over_text.sql @@ -0,0 +1,5 @@ +ALTER TABLE cyclotron_jobs + ALTER COLUMN vm_state TYPE bytea USING vm_state::bytea, + ALTER COLUMN metadata TYPE bytea USING metadata::bytea, + ALTER COLUMN parameters TYPE bytea USING parameters::bytea, + ADD COLUMN blob bytea; diff --git a/rust/cyclotron-core/src/bin/create_test_data.rs b/rust/cyclotron-core/src/bin/create_test_data.rs index ce875c676c98b..2e194378dcd24 100644 --- a/rust/cyclotron-core/src/bin/create_test_data.rs +++ b/rust/cyclotron-core/src/bin/create_test_data.rs @@ -40,6 +40,7 @@ async fn main() { vm_state: None, parameters: None, metadata: None, + blob: None, }; manager.create_job(test_job).await.unwrap(); diff --git a/rust/cyclotron-core/src/bin/load_test.rs b/rust/cyclotron-core/src/bin/load_test.rs index 16dd825c1c305..f000ab49c6e12 100644 --- a/rust/cyclotron-core/src/bin/load_test.rs +++ b/rust/cyclotron-core/src/bin/load_test.rs @@ -38,6 +38,7 @@ async fn producer_loop(manager: QueueManager, shared_context: Arc<SharedContext>) { function_id: Some(Uuid::now_v7()), vm_state: None, parameters: None, + blob: None, metadata: None, }; diff --git a/rust/cyclotron-core/src/lib.rs b/rust/cyclotron-core/src/lib.rs index 6121fea182df3..e737f38360165 100644 --- a/rust/cyclotron-core/src/lib.rs +++ b/rust/cyclotron-core/src/lib.rs @@ -5,6 +5,7 @@ mod ops; // Types mod types; pub use types::BulkInsertResult; +pub use types::Bytes; pub use types::Job; pub use types::JobInit; pub use types::JobState; diff --git a/rust/cyclotron-core/src/ops/manager.rs b/rust/cyclotron-core/src/ops/manager.rs index 8c2ec30372adf..b0a51403439b9 100644 --- a/rust/cyclotron-core/src/ops/manager.rs +++ b/rust/cyclotron-core/src/ops/manager.rs @@ -30,10 +30,11 @@ INSERT INTO cyclotron_jobs priority, vm_state, metadata, - parameters + parameters, + blob ) VALUES - ($1, $2, $3, NOW(), NULL, NULL, 0, 0, NOW(), $4, $5, $6, $7, $8, $9, $10) + ($1, $2, $3, NOW(), NULL, NULL, 0, 0, NOW(), $4, $5, $6, $7, $8, $9, $10, $11) "#, id, data.team_id, @@ -44,7 +45,8 @@ VALUES data.priority, data.vm_state, data.metadata, - data.parameters + data.parameters, + data.blob ) .execute(executor) .await?; @@ -74,6 +76,7 @@ where let mut vm_states = Vec::with_capacity(jobs.len()); let mut metadatas = Vec::with_capacity(jobs.len()); let mut parameters = Vec::with_capacity(jobs.len()); + let mut blob = Vec::with_capacity(jobs.len()); for d in jobs {
ids.push(Uuid::now_v7()); @@ -92,6 +95,7 @@ where vm_states.push(d.vm_state.clone()); metadatas.push(d.metadata.clone()); parameters.push(d.parameters.clone()); + blob.push(d.blob.clone()); } // Using the "unnest" function to turn an array of rows into a set of rows @@ -114,7 +118,8 @@ INSERT INTO cyclotron_jobs priority, vm_state, metadata, - parameters + parameters, + blob ) SELECT * FROM UNNEST( @@ -133,7 +138,8 @@ FROM UNNEST( $13, $14, $15, - $16 + $16, + $17 ) "#, ) @@ -153,6 +159,7 @@ FROM UNNEST( .bind(vm_states) .bind(metadatas) .bind(parameters) + .bind(blob) .execute(executor) .await?; diff --git a/rust/cyclotron-core/src/ops/worker.rs b/rust/cyclotron-core/src/ops/worker.rs index efff1faf2bff6..c7b0f10c86530 100644 --- a/rust/cyclotron-core/src/ops/worker.rs +++ b/rust/cyclotron-core/src/ops/worker.rs @@ -4,7 +4,7 @@ use uuid::Uuid; use crate::{ error::QueueError, - types::{Job, JobState, JobUpdate}, + types::{Bytes, Job, JobState, JobUpdate}, }; use super::meta::throw_if_no_rows; @@ -59,9 +59,10 @@ RETURNING last_transition, scheduled, transition_count, - NULL as vm_state, + NULL::bytea as vm_state, metadata, parameters, + blob, lock_id, last_heartbeat, janitor_touch_count @@ -126,6 +127,7 @@ RETURNING vm_state, metadata, parameters, + blob, lock_id, last_heartbeat, janitor_touch_count @@ -142,12 +144,12 @@ pub async fn get_vm_state<'c, E>( executor: E, job_id: Uuid, lock_id: Uuid, -) -> Result<Option<String>, QueueError> +) -> Result<Option<Bytes>, QueueError> where E: sqlx::Executor<'c, Database = sqlx::Postgres>, { struct VMState { - vm_state: Option<String>, + vm_state: Option<Bytes>, } let res = sqlx::query_as!( @@ -209,6 +211,10 @@ where set_parameters(&mut *txn, job_id, parameters, lock_id).await?; } + if let Some(blob) = updates.blob { + set_blob(&mut *txn, job_id, blob, lock_id).await?; + } + // Calling flush indicates forward progress, so we should touch the heartbeat set_heartbeat(&mut *txn, job_id, lock_id).await?; @@ -316,7 +322,7 @@ where pub async fn set_vm_state<'c, E>( executor: E, job_id: Uuid, - vm_state: Option<String>, + vm_state: Option<Bytes>, lock_id: Uuid, ) -> Result<(), QueueError> where @@ -334,7 +340,7 @@ where pub async fn set_metadata<'c, E>( executor: E, job_id: Uuid, - metadata: Option<String>, + metadata: Option<Bytes>, lock_id: Uuid, ) -> Result<(), QueueError> where @@ -352,7 +358,7 @@ where pub async fn set_parameters<'c, E>( executor: E, job_id: Uuid, - parameters: Option<String>, + parameters: Option<Bytes>, lock_id: Uuid, ) -> Result<(), QueueError> where @@ -367,6 +373,24 @@ where assert_does_update(executor, job_id, lock_id, q).await } +pub async fn set_blob<'c, E>( + executor: E, + job_id: Uuid, + blob: Option<Bytes>, + lock_id: Uuid, +) -> Result<(), QueueError> +where + E: sqlx::Executor<'c, Database = sqlx::Postgres>, +{ + let q = sqlx::query!( + "UPDATE cyclotron_jobs SET blob = $1 WHERE id = $2 AND lock_id = $3", + blob, + job_id, + lock_id + ); + assert_does_update(executor, job_id, lock_id, q).await +} + pub async fn set_heartbeat<'c, E>( executor: E, job_id: Uuid, diff --git a/rust/cyclotron-core/src/types.rs b/rust/cyclotron-core/src/types.rs index 8e0a11a6a822a..5adf86c6050b4 100644 --- a/rust/cyclotron-core/src/types.rs +++ b/rust/cyclotron-core/src/types.rs @@ -7,6 +7,8 @@ use uuid::Uuid; use crate::QueueError; +pub type Bytes = Vec<u8>; + #[derive(Debug, Deserialize, Serialize, sqlx::Type)] #[serde(rename_all = "lowercase")] #[sqlx(type_name = "JobState", rename_all = "lowercase")] @@ -47,9 +49,10 @@ pub struct JobInit { pub priority: i16, pub scheduled: DateTime<Utc>, pub function_id: Option<Uuid>, - pub vm_state: Option<String>, - pub parameters: Option<String>, - pub metadata: Option<String>, + pub vm_state: Option<Bytes>, + pub parameters: Option<Bytes>, + pub blob: Option<Bytes>, + pub metadata: Option<Bytes>, } #[derive(Debug, Deserialize, Serialize)] @@ -78,9 +81,10 @@ pub struct Job { pub scheduled: DateTime<Utc>, // Job data - pub vm_state: Option<String>, // The state of the VM this job is running on (if it exists) - pub metadata: Option<String>, // Additional fields a worker can tack onto a job, for e.g. tracking some state across retries (or number of retries in general by a given class of worker) - pub parameters: Option<String>, // The actual parameters of the job (function args for a hog function, http request for a fetch function) + pub vm_state: Option<Bytes>, // The state of the VM this job is running on (if it exists) + pub metadata: Option<Bytes>, // Additional fields a worker can tack onto a job, for e.g. tracking some state across retries (or number of retries in general by a given class of worker) + pub parameters: Option<Bytes>, // The actual parameters of the job (function args for a hog function, http request for a fetch function) + pub blob: Option<Bytes>, // An additional, binary, parameter field (for things like fetch request body) } // A struct representing a set of updates for a job. Outer none values mean "don't update this field", @@ -92,9 +96,10 @@ pub struct JobUpdate { pub queue_name: Option<String>, pub priority: Option<i16>, pub scheduled: Option<DateTime<Utc>>, - pub vm_state: Option<Option<String>>, - pub metadata: Option<Option<String>>, - pub parameters: Option<Option<String>>, + pub vm_state: Option<Option<Bytes>>, + pub metadata: Option<Option<Bytes>>, + pub parameters: Option<Option<Bytes>>, + pub blob: Option<Option<Bytes>>, } impl JobUpdate { @@ -108,6 +113,7 @@ impl JobUpdate { vm_state: None, metadata: None, parameters: None, + blob: None, } } } diff --git a/rust/cyclotron-core/src/worker.rs b/rust/cyclotron-core/src/worker.rs index f9f1ca65894d9..c1862eb9a7c7e 100644 --- a/rust/cyclotron-core/src/worker.rs +++ b/rust/cyclotron-core/src/worker.rs @@ -10,6 +10,7 @@ use crate::{ meta::{dead_letter, run_migrations}, worker::{dequeue_jobs, dequeue_with_vm_state, flush_job, get_vm_state, set_heartbeat}, }, + types::Bytes, Job, JobState, JobUpdate, PoolConfig, QueueError, }; @@ -99,7 +100,7 @@ impl Worker { /// Retrieve the VM state for a job, if, for example, you dequeued it and then realised you /// need the VM state as well. - pub async fn get_vm_state(&self, job_id: Uuid) -> Result<Option<String>, QueueError> { + pub async fn get_vm_state(&self, job_id: Uuid) -> Result<Option<Bytes>, QueueError> { let lock_id = { let pending = self.pending.lock().unwrap(); pending @@ -212,7 +213,7 @@ impl Worker { pub fn set_vm_state( &self, job_id: Uuid, - vm_state: Option<String>, // This (and the following) are Options, because the user can null them (by calling with None) + vm_state: Option<Bytes>, // This (and the following) are Options, because the user can null them (by calling with None) ) -> Result<(), QueueError> { let mut pending = self.pending.lock().unwrap(); pending @@ -223,7 +224,7 @@ impl Worker { } /// Passing None here will clear the metadata - pub fn set_metadata(&self, job_id: Uuid, metadata: Option<String>) -> Result<(), QueueError> { + pub fn set_metadata(&self, job_id: Uuid, metadata: Option<Bytes>) -> Result<(), QueueError> { let mut pending = self.pending.lock().unwrap(); pending .get_mut(&job_id) @@ -236,7 +237,7 @@ impl Worker { pub fn set_parameters( &self, job_id: Uuid, - parameters: Option<String>, + parameters: Option<Bytes>, ) -> Result<(), QueueError> { let mut pending = self.pending.lock().unwrap(); pending @@ -260,4 +261,14 @@ impl Worker { dead_letter(&self.pool, job_id, reason).await } + + /// Passing None here will clear the blob + pub fn set_blob(&self, job_id: Uuid, blob: Option<Bytes>) -> Result<(), QueueError> { + let mut pending = self.pending.lock().unwrap(); + pending + .get_mut(&job_id) + .ok_or(QueueError::UnknownJobId(job_id))? + .blob = Some(blob); + Ok(()) + } } diff --git a/rust/cyclotron-core/tests/base_ops.rs b/rust/cyclotron-core/tests/base_ops.rs index 36ceb7af4743e..35c55c7037f44 100644 --- a/rust/cyclotron-core/tests/base_ops.rs +++ b/rust/cyclotron-core/tests/base_ops.rs @@ -197,13 +197,16 @@ async fn test_queue(db: PgPool) { .set_scheduled_at(job.id, now - Duration::minutes(10)) .expect("failed to set scheduled_at"); worker - .set_vm_state(job.id, Some("test".to_string())) + .set_vm_state(job.id, Some("test".as_bytes().to_owned())) .expect("failed to set vm_state"); worker - .set_parameters(job.id, Some("test".to_string())) + .set_parameters(job.id, Some("test".as_bytes().to_owned())) .expect("failed to set parameters"); worker - .set_metadata(job.id, Some("test".to_string())) + .set_blob(job.id, Some("test".as_bytes().to_owned())) + .expect("failed to set blob"); + worker + .set_metadata(job.id, Some("test".as_bytes().to_owned())) .expect("failed to set metadata"); // Flush the job @@ -221,9 +224,9 @@ async fn test_queue(db: PgPool) { assert_eq!(job.queue_name, "test_2"); assert_eq!(job.priority, 1); assert!(dates_match(&job.scheduled, &(now - Duration::minutes(10))),); - assert_eq!(job.vm_state, Some("test".to_string())); - assert_eq!(job.parameters, Some("test".to_string())); - assert_eq!(job.metadata, Some("test".to_string())); + assert_eq!(job.vm_state, Some("test".as_bytes().to_owned())); + assert_eq!(job.parameters, Some("test".as_bytes().to_owned())); + assert_eq!(job.metadata, Some("test".as_bytes().to_owned())); } #[sqlx::test(migrations = "./migrations")] diff --git a/rust/cyclotron-core/tests/common.rs b/rust/cyclotron-core/tests/common.rs index b1e6d3e715de2..16c4cc5d2eaef 100644 --- a/rust/cyclotron-core/tests/common.rs +++ b/rust/cyclotron-core/tests/common.rs @@ -11,7 +11,9 @@ pub fn create_new_job() -> JobInit { priority: 0, scheduled: Utc::now() - Duration::minutes(1), vm_state: None, + parameters: None, + blob: None, metadata: None, } } diff --git a/rust/cyclotron-fetch/src/fetch.rs b/rust/cyclotron-fetch/src/fetch.rs index
6893861c984de..bc5082e53ebaa 100644 --- a/rust/cyclotron-fetch/src/fetch.rs +++ b/rust/cyclotron-fetch/src/fetch.rs @@ -1,7 +1,7 @@ use std::{cmp::min, collections::HashMap, fmt::Display, sync::Arc}; use chrono::{DateTime, Duration, Utc}; -use cyclotron_core::{Job, JobState, QueueError, Worker}; +use cyclotron_core::{Bytes, Job, JobState, QueueError, Worker}; use futures::StreamExt; use http::StatusCode; use reqwest::Response; @@ -68,7 +68,6 @@ pub struct FetchParameters { pub method: HttpMethod, pub return_queue: String, pub headers: Option<HashMap<String, String>>, - pub body: Option<String>, pub max_tries: Option<u32>, // Defaults to 3 pub on_finish: Option<OnFinish>, // Defaults to Return } @@ -104,6 +103,19 @@ impl FetchResult { pub fn is_success(&self) -> bool { matches!(self, FetchResult::Success { .. }) } + + pub fn take_body(self) -> (Self, Option<Bytes>) { + match self { + FetchResult::Success { mut response } => { + let body = response.body.take(); + (FetchResult::Success { response }, body) + } + FetchResult::Failure { mut trace } => { + let body = trace.last_mut().and_then(|f| f.body.take()); + (FetchResult::Failure { trace }, body) + } + } + } } // We distinguish between a "fetch failure" and a "worker failure" - @@ -117,10 +129,11 @@ impl FetchResult { pub struct FetchFailure { pub kind: FetchFailureKind, pub message: String, - pub body: Option<String>, // If we have a body, we include it in the failure pub headers: Option<HashMap<String, String>>, // If we have headers, we include them in the failure pub status: Option<u16>, // If we have a status, we include it in the failure pub timestamp: DateTime<Utc>, // Useful for users to correlate logs when debugging + #[serde(skip)] // We serialise the body separately into blob + pub body: Option<Bytes>, // If we have a body, we include it in the final failure (but not the trace) } impl FetchFailure { @@ -129,9 +142,9 @@ impl FetchFailure { kind, message: message.as_ref().to_string(), timestamp: Utc::now(), - body: None, headers: None, status: None, + body: None, } } @@ -140,29 +153,29 @@ impl FetchFailure { kind: FetchFailureKind::FailureStatus, message: format!("Received failure status: {}", status), timestamp: Utc::now(), - body: None, headers: None, status: Some(status.as_u16()), + body: None, } } - pub fn with_body(self, body: String) -> Self { + pub fn with_headers(self, headers: HashMap<String, String>) -> Self { Self { - body: Some(body), + headers: Some(headers), ..self } } - pub fn with_headers(self, headers: HashMap<String, String>) -> Self { + pub fn with_status(self, status: u16) -> Self { Self { - headers: Some(headers), + status: Some(status), ..self } } - pub fn with_status(self, status: u16) -> Self { + pub fn with_body(self, body: Bytes) -> Self { Self { - status: Some(status), + body: Some(body), ..self } } @@ -179,9 +192,9 @@ impl From<reqwest::Error> for FetchFailure { kind, message: e.to_string(), timestamp: Utc::now(), - body: None, headers: None, status: None, + body: None, } } } @@ -195,7 +208,7 @@ pub enum FetchFailureKind { InvalidParameters, RequestError, FailureStatus, - InvalidBody, // Generally means the body could not be parsed toa utf8 string + InvalidBody, // We force bodies to be a utf8 string, for the sake of callers. TODO - we should consider letting callers enforce a body schema ResponseTooLarge, } @@ -204,7 +217,8 @@ pub struct FetchResponse { pub status: u16, pub headers: HashMap<String, String>, - pub body: String, + #[serde(skip)] // We serialise the body separately into blob + pub body: Option<Bytes>, // This is only an option to let us `take` it, to avoid body copies on serialisation } #[instrument(skip_all)] @@ -273,7 +287,7 @@ impl From<&Job> for FetchMetadata { }; }; - let Ok(m) = serde_json::from_str(m) else { + let Ok(m) = serde_json::from_slice(m) else { return FetchMetadata { tries: 0, trace: vec![], @@ -288,17 +302,15 @@ impl TryFrom<&Job> for FetchParameters { type Error = FetchFailure; fn try_from(job: &Job) -> Result<Self, Self::Error> { - let Some(parameters) = &job.parameters else { - return Err(FetchFailure::new( - FetchFailureKind::MissingParameters, - "Job is missing parameters", - )); - }; + let params = job.parameters.as_ref().ok_or(FetchFailure::new( + FetchFailureKind::MissingParameters, + "Missing parameters", + ))?; - let Ok(p) = serde_json::from_str(parameters) else { + let Ok(p) = serde_json::from_slice(params) else { return Err(FetchFailure::new( FetchFailureKind::InvalidParameters, - "Failed to parse parameters", + "Invalid parameters", )); }; @@ -327,7 +339,7 @@ pub async fn run_job( .dead_letter(job.id, "Could not parse job parameters") .await; job_total - .label(OUTCOME_LABEL, "missing_parameters_dead_letter") + .label(OUTCOME_LABEL, "bad_parameters_dead_letter") .fin(); return Ok(res?); } @@ -335,7 +347,6 @@ pub async fn run_job( let method = (&params.method).into(); - // Parsing errors are always dead letters - it /will/ fail every time, so dump it let url: reqwest::Url = match (params.url).parse() { Ok(u) => u, Err(e) => { @@ -346,6 +357,7 @@ pub async fn run_job( format!("Invalid url: {} - {}", &params.url, e), ); + // We can skip retries here - this failure will happen every time let res = quick_fail_job( &context.worker, job, @@ -355,9 +367,7 @@ pub async fn run_job( ) .await; - job_total - .label(OUTCOME_LABEL, "url_parse_dead_letter") - .fin(); + job_total.label(OUTCOME_LABEL, "url_parse_failed").fin(); return res; } }; @@ -381,13 +391,13 @@ pub async fn run_job( .await; job_total - .label(OUTCOME_LABEL, "headers_parse_dead_letter") + .label(OUTCOME_LABEL, "headers_parse_failure") .fin(); return res; } }; - let body = reqwest::Body::from(params.body.unwrap_or_default()); + let body = reqwest::Body::from(job.blob.unwrap_or_default()); let mut send_fut = context .client @@ -422,7 +432,8 @@ pub async fn run_job( common_metrics::inc(RESPONSE_RECEIVED, &labels, 1); let res = handle_fetch_failure( &context, - &job, + job.id, + job.priority, &metadata, params.max_tries.unwrap_or(DEFAULT_RETRIES), params.return_queue, @@ -461,13 +472,14 @@ pub async fn run_job( // We pre-emptively get the response body, because we include it in the failure trace, even if we got a failure status let body = first_n_bytes_of_response( &context.worker, - &job, + job.id, res, context.config.max_response_bytes, ) .await?; + let body = match body { - Ok(b) => b, + Ok(b) => b.into_bytes(), Err(e) => { body_time.label(OUTCOME_LABEL, "body_fetch_error").fin(); common_metrics::inc(BODY_FETCH_FAILED, &labels, 1); let e = e.with_status(status.as_u16()).with_headers(headers); let res = handle_fetch_failure( &context, - &job, + job.id, + job.priority, &metadata, params.max_tries.unwrap_or(DEFAULT_RETRIES), params.return_queue, @@ -495,11 +508,12 @@ pub async fn run_job( // rude
(and inefficient) if !status.is_success() { let failure = FetchFailure::failure_status(status) - .with_body(body) - .with_headers(headers); + .with_headers(headers) + .with_body(body); let res = handle_fetch_failure( &context, - &job, + job.id, + job.priority, &metadata, params.max_tries.unwrap_or(DEFAULT_RETRIES), params.return_queue, @@ -515,13 +529,13 @@ pub async fn run_job( response: FetchResponse { status: status.as_u16(), headers, - body, + body: Some(body), }, }; let res = complete_job( &context.worker, - &job, + job.id, params.return_queue, params.on_finish.unwrap_or(DEFAULT_ON_FINISH), result, @@ -543,7 +557,7 @@ pub async fn quick_fail_job( let result = FetchResult::Failure { trace: vec![failure], }; - complete_job(worker, &job, return_queue, on_finish, result).await + complete_job(worker, job.id, return_queue, on_finish, result).await } // Checks if the retry limit has been reached, and does one of: @@ -552,7 +566,8 @@ #[allow(clippy::too_many_arguments)] pub async fn handle_fetch_failure( context: &AppContext, - job: &Job, + job_id: Uuid, + old_priority: i16, metadata: &FetchMetadata, max_tries: u32, return_queue: String, @@ -562,7 +577,7 @@ where F: Into<FetchFailure>, { - let failure = failure.into(); + let failure: FetchFailure = failure.into(); let mut metadata = metadata.clone(); metadata.tries += 1; metadata.trace.push(failure); @@ -579,24 +594,23 @@ where let next_available = next_available + Duration::seconds((rand::random::<u64>() % 30) as i64); - // Set us up for a retry - update metadata, reschedule, and put back in the queue we pulled from + // Set us up for a retry - update metadata, reschedule context .worker - .set_metadata(job.id, Some(serde_json::to_string(&metadata)?))?; - context.worker.set_state(job.id, JobState::Available)?; - context.worker.set_queue(job.id, &job.queue_name)?; - context.worker.set_scheduled_at(job.id, next_available)?; + .set_metadata(job_id, Some(serde_json::to_vec(&metadata)?))?; + context.worker.set_state(job_id, JobState::Available)?; + context.worker.set_scheduled_at(job_id, next_available)?; // We downgrade the priority of jobs that fail, so first attempts at jobs get better QoS - context.worker.set_priority(job.id, job.priority + 1)?; + context.worker.set_priority(job_id, old_priority + 1)?; - context.worker.flush_job(job.id).await?; + context.worker.flush_job(job_id).await?; } else { // Complete the job, with a Failed result - let result = FetchResult::Failure { - trace: metadata.trace.clone(), + let result: FetchResult = FetchResult::Failure { + trace: metadata.trace, }; - complete_job(&context.worker, job, return_queue, on_finish, result).await?; + complete_job(&context.worker, job_id, return_queue, on_finish, result).await?; } Ok(()) @@ -605,34 +619,37 @@ where // Complete the job with some result.
pub async fn complete_job( worker: &Worker, - job: &Job, + job_id: Uuid, return_queue: String, on_finish: OnFinish, result: FetchResult, ) -> Result<(), FetchError> { - worker.set_state(job.id, JobState::Available)?; - worker.set_queue(job.id, &return_queue)?; + worker.set_state(job_id, JobState::Available)?; + worker.set_queue(job_id, &return_queue)?; + let (result, body) = result.take_body(); let is_success = result.is_success(); - let result = do_or_dead_letter(worker, job.id, || serde_json::to_string(&result)).await??; + let result = do_or_dead_letter(worker, job_id, || serde_json::to_vec(&result)).await??; match (on_finish, is_success) { (OnFinish::Complete, true) => { - worker.set_state(job.id, JobState::Completed)?; + worker.set_state(job_id, JobState::Completed)?; } (OnFinish::Complete, false) => { - worker.set_state(job.id, JobState::Failed)?; + worker.set_state(job_id, JobState::Failed)?; } (OnFinish::Return, _) => { // If we're returning the job, we don't care whether it succeeded or not, the caller wants it back - worker.set_state(job.id, JobState::Available)?; + worker.set_state(job_id, JobState::Available)?; } } - worker.set_parameters(job.id, Some(result))?; - worker.set_metadata(job.id, None)?; // We're finished with the job, so clear our internal state - worker.flush_job(job.id).await?; + worker.set_priority(job_id, 0)?; // Reset job priority on completion + worker.set_parameters(job_id, Some(result))?; + worker.set_blob(job_id, body)?; + worker.set_metadata(job_id, None)?; // We're finished with the job, so clear our internal state + worker.flush_job(job_id).await?; Ok(()) } @@ -640,15 +657,15 @@ pub async fn complete_job( // Pulls the body, while maintaining the job heartbeat. pub async fn first_n_bytes_of_response( worker: &Worker, - job: &Job, + job_id: Uuid, response: Response, n: usize, ) -> Result<Result<String, FetchFailure>, FetchError> { let mut body = response.bytes_stream(); // We deserialize into a vec, and then parse to a string - let mut buffer = Vec::with_capacity(n); + let mut buffer = Vec::with_capacity(n / 4); // Assume most request responses will be significantly smaller than the max - worker.heartbeat(job.id).await?; + worker.heartbeat(job_id).await?; loop { tokio::select! { @@ -670,14 +687,20 @@ pub async fn first_n_bytes_of_response( _ = tokio::time::sleep(Duration::milliseconds(HEARTBEAT_INTERVAL_MS).to_std().unwrap()) => {} } // Heartbeat every time we get a new body chunk, or every HEARTBEAT_INTERVAL_MS - worker.heartbeat(job.id).await?; + worker.heartbeat(job_id).await?; } - let Ok(body) = String::from_utf8(buffer) else { - return Ok(Err(FetchFailure::new( - FetchFailureKind::InvalidBody, - "Body could not be parsed as utf8", - ))); + // TODO - we can handle binary data here, but for now we force response bodies to be utf8 strings + let body = match String::from_utf8(buffer) { + Ok(s) => s, + Err(e) => { + let buffer = e.into_bytes(); + return Ok(Err(FetchFailure::new( + FetchFailureKind::InvalidBody, + "Body could not be parsed as utf8", + ) + .with_body(buffer))); + } }; Ok(Ok(body)) diff --git a/rust/cyclotron-fetch/tests/fetch.rs b/rust/cyclotron-fetch/tests/fetch.rs index 18a7469c6e7fe..42657837112ad 100644 --- a/rust/cyclotron-fetch/tests/fetch.rs +++ b/rust/cyclotron-fetch/tests/fetch.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, str::FromStr, sync::Arc}; +use std::{collections::HashMap, sync::Arc}; use chrono::Duration; use cyclotron_core::{QueueManager, Worker}; @@ -34,7 +34,7 @@ pub async fn test_completes_fetch(db: PgPool) { }); let params = construct_params(server.url("/test"), HttpMethod::Get); - let job = construct_job(params); + let job = construct_job(params, None); producer.create_job(job).await.unwrap(); let started = tick(context).await.unwrap(); @@ -44,14 +44,16 @@ pub async fn test_completes_fetch(db: PgPool) { let returned = wait_on_return(&return_worker, 1, false).await.unwrap(); let response: FetchResult = - serde_json::from_str(returned[0].parameters.as_ref().unwrap()).unwrap(); + serde_json::from_slice(returned[0].parameters.as_ref().unwrap()).unwrap(); let FetchResult::Success { response } = response else { panic!("Expected success response"); }; + let body = String::from_utf8(returned[0].blob.clone().unwrap()).unwrap(); + assert_eq!(response.status, 200); - assert_eq!(response.body, "Hello, world!"); + assert_eq!(body, "Hello, world!"); mock.assert_hits(1); } @@ -71,7 +73,7 @@ pub async fn test_returns_failure_after_retries(db: PgPool) { let mut params = construct_params(server.url("/test"), HttpMethod::Get); params.max_tries = Some(2); - let job = construct_job(params); + let job = construct_job(params, None); producer.create_job(job).await.unwrap(); // Tick twice for retry @@ -86,7 +88,7 @@ pub async fn test_returns_failure_after_retries(db: PgPool) { let returned = wait_on_return(&return_worker, 1, false).await.unwrap(); let response: FetchResult = - serde_json::from_str(returned[0].parameters.as_ref().unwrap()).unwrap(); + serde_json::from_slice(returned[0].parameters.as_ref().unwrap()).unwrap(); let FetchResult::Failure { trace } = response else { panic!("Expected failure response"); }; assert!(trace.len() == 2); for attempt in trace { assert_eq!(attempt.status, Some(500)); - assert_eq!(attempt.body, Some("test server error body".to_string())); } mock.assert_hits(2); } @@ -114,8 +115,8 @@ pub fn fetch_discards_bad_metadata(db: PgPool) { }); let params = construct_params(server.url("/test"), HttpMethod::Get); - let mut job = construct_job(params); - job.metadata = Some("bad json".to_string()); + let mut job = construct_job(params, None); + job.metadata = Some("bad json".as_bytes().to_owned()); producer.create_job(job).await.unwrap(); let
started = tick(context).await.unwrap(); @@ -125,14 +126,16 @@ pub fn fetch_discards_bad_metadata(db: PgPool) { let returned = wait_on_return(&return_worker, 1, false).await.unwrap(); let response: FetchResult = - serde_json::from_str(returned[0].parameters.as_ref().unwrap()).unwrap(); + serde_json::from_slice(returned[0].parameters.as_ref().unwrap()).unwrap(); let FetchResult::Success { response } = response else { panic!("Expected success response"); }; + let body = String::from_utf8(returned[0].blob.clone().unwrap()).unwrap(); + assert_eq!(response.status, 200); - assert_eq!(response.body, "Hello, world!"); + assert_eq!(body, "Hello, world!"); mock.assert_hits(1); } @@ -150,7 +153,7 @@ pub fn fetch_with_minimum_params_works(db: PgPool) { }); let params = construct_params(server.url("/test"), HttpMethod::Get); - let mut job = construct_job(params); + let mut job = construct_job(params, None); let url = server.url("/test"); let manual_params = json!({ @@ -160,7 +163,7 @@ pub fn fetch_with_minimum_params_works(db: PgPool) { }) .to_string(); - job.parameters = Some(manual_params); + job.parameters = Some(manual_params.as_bytes().to_owned()); producer.create_job(job).await.unwrap(); @@ -171,14 +174,16 @@ pub fn fetch_with_minimum_params_works(db: PgPool) { let returned = wait_on_return(&return_worker, 1, false).await.unwrap(); let response: FetchResult = - serde_json::from_str(returned[0].parameters.as_ref().unwrap()).unwrap(); + serde_json::from_slice(returned[0].parameters.as_ref().unwrap()).unwrap(); let FetchResult::Success { response } = response else { panic!("Expected success response"); }; + let body = String::from_utf8(returned[0].blob.clone().unwrap()).unwrap(); + assert_eq!(response.status, 200); - assert_eq!(response.body, "Hello, world!"); + assert_eq!(body, "Hello, world!"); mock.assert_hits(1); } @@ -202,7 +207,7 @@ pub async fn test_completes_fetch_with_headers(db: PgPool) { headers.insert("X-Test".to_string(), "test".to_string()); params.headers = Some(headers); - let job = construct_job(params); + let job = construct_job(params, None); producer.create_job(job).await.unwrap(); let started = tick(context).await.unwrap(); @@ -212,14 +217,16 @@ pub async fn test_completes_fetch_with_headers(db: PgPool) { let returned = wait_on_return(&return_worker, 1, false).await.unwrap(); let response: FetchResult = - serde_json::from_str(returned[0].parameters.as_ref().unwrap()).unwrap(); + serde_json::from_slice(returned[0].parameters.as_ref().unwrap()).unwrap(); let FetchResult::Success { response } = response else { panic!("Expected success response"); }; + let body = String::from_utf8(returned[0].blob.clone().unwrap()).unwrap(); + assert_eq!(response.status, 200); - assert_eq!(response.body, "Hello, world!"); + assert_eq!(body, "Hello, world!"); mock.assert_hits(1); } @@ -236,10 +243,9 @@ pub async fn test_completes_fetch_with_body(db: PgPool) { then.status(200).body("Hello, world!"); }); - let mut params = construct_params(server.url("/test"), HttpMethod::Post); - params.body = Some("test body".to_string()); + let params = construct_params(server.url("/test"), HttpMethod::Post); - let job = construct_job(params); + let job = construct_job(params, Some("test body".to_string().into())); producer.create_job(job).await.unwrap(); let started = tick(context).await.unwrap(); @@ -249,14 +255,16 @@ pub async fn test_completes_fetch_with_body(db: PgPool) { let returned = wait_on_return(&return_worker, 1, false).await.unwrap(); let response: FetchResult = - 
serde_json::from_str(returned[0].parameters.as_ref().unwrap()).unwrap(); + serde_json::from_slice(returned[0].parameters.as_ref().unwrap()).unwrap(); let FetchResult::Success { response } = response else { panic!("Expected success response"); }; + let body = String::from_utf8(returned[0].blob.clone().unwrap()).unwrap(); + assert_eq!(response.status, 200); - assert_eq!(response.body, "Hello, world!"); + assert_eq!(body, "Hello, world!"); mock.assert_hits(1); } @@ -274,8 +282,8 @@ pub async fn test_completes_fetch_with_vm_state(db: PgPool) { }); let params = construct_params(server.url("/test"), HttpMethod::Get); - let mut job = construct_job(params); - job.vm_state = Some(json!({"test": "state"}).to_string()); + let mut job = construct_job(params, None); + job.vm_state = Some(json!({"test": "state"}).to_string().into_bytes()); producer.create_job(job).await.unwrap(); let started = tick(context).await.unwrap(); @@ -284,18 +292,21 @@ let returned = wait_on_return(&return_worker, 1, true).await.unwrap(); - let state = serde_json::Value::from_str(returned[0].vm_state.as_ref().unwrap()).unwrap(); + let state: serde_json::Value = + serde_json::from_slice(returned[0].vm_state.as_ref().unwrap()).unwrap(); assert_eq!(state, json!({"test": "state"})); let response: FetchResult = - serde_json::from_str(returned[0].parameters.as_ref().unwrap()).unwrap(); + serde_json::from_slice(returned[0].parameters.as_ref().unwrap()).unwrap(); let FetchResult::Success { response } = response else { panic!("Expected success response"); }; + let body = String::from_utf8(returned[0].blob.clone().unwrap()).unwrap(); + assert_eq!(response.status, 200); - assert_eq!(response.body, "Hello, world!"); + assert_eq!(body, "Hello, world!"); mock.assert_hits(1); } diff --git a/rust/cyclotron-fetch/tests/utils.rs b/rust/cyclotron-fetch/tests/utils.rs index 487fef3d6873c..6041a491d3f9b 100644 --- a/rust/cyclotron-fetch/tests/utils.rs +++ b/rust/cyclotron-fetch/tests/utils.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use chrono::{Duration, Utc}; -use cyclotron_core::{Job, JobInit, QueueError, Worker}; +use cyclotron_core::{Bytes, Job, JobInit, QueueError, Worker}; use cyclotron_fetch::{ config::AppConfig, context::AppContext, @@ -55,13 +55,12 @@ pub fn construct_params(url: String, method: HttpMethod) -> FetchParameters { method, return_queue: RETURN_QUEUE.to_string(), headers: None, - body: None, max_tries: None, on_finish: None, } } -pub fn construct_job(parameters: FetchParameters) -> JobInit { +pub fn construct_job(parameters: FetchParameters, body: Option<Bytes>) -> JobInit { JobInit { team_id: 1, queue_name: FETCH_QUEUE.to_string(), @@ -69,7 +68,8 @@ pub fn construct_job(parameters: FetchParameters) -> JobInit { scheduled: Utc::now() - Duration::seconds(1), function_id: None, vm_state: None, - parameters: Some(serde_json::to_string(&parameters).unwrap()), + parameters: Some(serde_json::to_vec(&parameters).unwrap()), + blob: body, metadata: None, } } diff --git a/rust/cyclotron-janitor/tests/janitor.rs b/rust/cyclotron-janitor/tests/janitor.rs index 494b9431c228a..32846d7f8c647 100644 --- a/rust/cyclotron-janitor/tests/janitor.rs +++ b/rust/cyclotron-janitor/tests/janitor.rs @@ -38,6 +38,7 @@ async fn janitor_test(db: PgPool) { function_id: Some(Uuid::now_v7()), vm_state: None, parameters: None, + blob: None, metadata: None, }; diff --git a/rust/cyclotron-node/examples/basic.js b/rust/cyclotron-node/examples/basic.js index 270f0e5385a87..a625dd51ca851 100644 ---
a/rust/cyclotron-node/examples/basic.js +++ b/rust/cyclotron-node/examples/basic.js @@ -44,6 +44,7 @@ async function main() { function_id: crypto.randomUUID(), // Is nullable vm_state: null, parameters: null, + blob: null, metadata: null, } @@ -55,6 +56,7 @@ async function main() { function_id: crypto.randomUUID(), // Is nullable vm_state: null, parameters: null, + blob: null, metadata: null, } diff --git a/rust/cyclotron-node/src/index.ts b/rust/cyclotron-node/src/index.ts index 5f4c38e74545e..fb8dd659d80c3 100644 --- a/rust/cyclotron-node/src/index.ts +++ b/rust/cyclotron-node/src/index.ts @@ -37,6 +37,7 @@ export interface JobInit { scheduled?: Date vmState?: string parameters?: string + blob?: Uint8Array metadata?: string } @@ -71,29 +72,10 @@ export interface Job { vmState: string | null metadata: string | null parameters: string | null + blob: Uint8Array | null } -// Type as returned by Cyclotron. -interface InternalJob { - id: string - team_id: number - function_id: string | null - created: string - lock_id: string | null - last_heartbeat: string | null - janitor_touch_count: number - transition_count: number - last_transition: string - queue_name: string - state: JobState - priority: number - scheduled: string - vm_state: string | null - metadata: string | null - parameters: string | null -} - -async function initWorker(poolConfig: PoolConfig): Promise<void> { +export async function initWorker(poolConfig: PoolConfig): Promise<void> { const initWorkerInternal: InternalPoolConfig = { db_url: poolConfig.dbUrl, max_connections: poolConfig.maxConnections, @@ -105,7 +87,7 @@ return await cyclotron.initWorker(JSON.stringify(initWorkerInternal)) } -async function initManager(managerConfig: ManagerConfig): Promise<void> { +export async function initManager(managerConfig: ManagerConfig): Promise<void> { const managerConfigInternal: InternalManagerConfig = { shards: managerConfig.shards.map((shard) => ({ db_url: shard.dbUrl, @@ -119,7 +101,7 @@ return await cyclotron.initManager(JSON.stringify(managerConfigInternal)) } -async function maybeInitWorker(poolConfig: PoolConfig): Promise<void> { +export async function maybeInitWorker(poolConfig: PoolConfig): Promise<void> { const initWorkerInternal: InternalPoolConfig = { db_url: poolConfig.dbUrl, max_connections: poolConfig.maxConnections, @@ -131,7 +113,7 @@ return await cyclotron.maybeInitWorker(JSON.stringify(initWorkerInternal)) } -async function maybeInitManager(managerConfig: ManagerConfig): Promise<void> { +export async function maybeInitManager(managerConfig: ManagerConfig): Promise<void> { const managerConfigInternal: InternalManagerConfig = { shards: managerConfig.shards.map((shard) => ({ db_url: shard.dbUrl, @@ -159,62 +141,40 @@ export async function createJob(job: JobInit): Promise<void> { parameters: job.parameters, metadata: job.metadata, } - return await cyclotron.createJob(JSON.stringify(jobInitInternal)) -} -function convertInternalJobToJob(jobInternal: InternalJob): Job { - return { - id: jobInternal.id, - teamId: jobInternal.team_id, - functionId: jobInternal.function_id, - created: new Date(jobInternal.created), - lockId: jobInternal.lock_id, - lastHeartbeat: jobInternal.last_heartbeat ?
new Date(jobInternal.last_heartbeat) : null, - janitorTouchCount: jobInternal.janitor_touch_count, - transitionCount: jobInternal.transition_count, - lastTransition: new Date(jobInternal.last_transition), - queueName: jobInternal.queue_name, - state: jobInternal.state, - priority: jobInternal.priority, - scheduled: new Date(jobInternal.scheduled), - vmState: jobInternal.vm_state, - metadata: jobInternal.metadata, - parameters: jobInternal.parameters, - } + const json = JSON.stringify(jobInitInternal) + return await cyclotron.createJob(json, job.blob ? job.blob.buffer : undefined) } -async function dequeueJobs(queueName: string, limit: number): Promise<Job[]> { - const jobsStr = await cyclotron.dequeueJobs(queueName, limit) - const jobs: InternalJob[] = JSON.parse(jobsStr) - return jobs.map(convertInternalJobToJob) +export async function dequeueJobs(queueName: string, limit: number): Promise<Job[]> { + return await cyclotron.dequeueJobs(queueName, limit) } -async function dequeueJobsWithVmState(queueName: string, limit: number): Promise<Job[]> { - const jobsStr = await cyclotron.dequeueJobsWithVmState(queueName, limit) - const jobs: InternalJob[] = JSON.parse(jobsStr) - return jobs.map(convertInternalJobToJob) + +export async function dequeueJobsWithVmState(queueName: string, limit: number): Promise<Job[]> { + return await cyclotron.dequeueJobsWithVmState(queueName, limit) } -async function flushJob(jobId: string): Promise<void> { +export async function flushJob(jobId: string): Promise<void> { return await cyclotron.flushJob(jobId) } -function setState(jobId: string, jobState: JobState): Promise<void> { +export function setState(jobId: string, jobState: JobState): Promise<void> { return cyclotron.setState(jobId, jobState) } -function setQueue(jobId: string, queueName: string): Promise<void> { +export function setQueue(jobId: string, queueName: string): Promise<void> { return cyclotron.setQueue(jobId, queueName) } -function setPriority(jobId: string, priority: number): Promise<void> { +export function setPriority(jobId: string, priority: number): Promise<void> { return cyclotron.setPriority(jobId, priority) } -function setScheduledAt(jobId: string, scheduledAt: Date): Promise<void> { +export function setScheduledAt(jobId: string, scheduledAt: Date): Promise<void> { return cyclotron.setScheduledAt(jobId, scheduledAt.toISOString()) } -function serializeObject(name: string, obj: Record<string, any> | null): string | null { +export function serializeObject(name: string, obj: Record<string, any> | null): string | null { if (obj === null) { return null } else if (typeof obj === 'object' && obj !== null) { @@ -223,21 +183,25 @@ function serializeObject(name: string, obj: Record<string, any> | null): string throw new Error(`${name} must be either an object or null`) } -function setVmState(jobId: string, vmState: Record<string, any> | null): Promise<void> { +export function setVmState(jobId: string, vmState: Record<string, any> | null): Promise<void> { const serialized = serializeObject('vmState', vmState) return cyclotron.setVmState(jobId, serialized) } -function setMetadata(jobId: string, metadata: Record<string, any> | null): Promise<void> { +export function setMetadata(jobId: string, metadata: Record<string, any> | null): Promise<void> { const serialized = serializeObject('metadata', metadata) return cyclotron.setMetadata(jobId, serialized) } -function setParameters(jobId: string, parameters: Record<string, any> | null): Promise<void> { +export function setParameters(jobId: string, parameters: Record<string, any> | null): Promise<void> { const serialized = serializeObject('parameters', parameters) return cyclotron.setParameters(jobId, serialized) } +export function setBlob(jobId: string, blob: Uint8Array | null): Promise<void> { + return
cyclotron.setBlob(jobId, blob) +} + export default { initWorker, initManager, @@ -254,4 +218,5 @@ setVmState, setMetadata, setParameters, + setBlob, } diff --git a/rust/cyclotron-node/src/lib.rs b/rust/cyclotron-node/src/lib.rs index b231317a14e16..a9071b96de856 100644 --- a/rust/cyclotron-node/src/lib.rs +++ b/rust/cyclotron-node/src/lib.rs @@ -1,14 +1,19 @@ use chrono::{DateTime, Utc}; -use cyclotron_core::{JobInit, JobState, ManagerConfig, PoolConfig, QueueManager, Worker}; +use cyclotron_core::{Job, JobInit, JobState, ManagerConfig, PoolConfig, QueueManager, Worker}; use neon::{ handle::Handle, - prelude::{Context, FunctionContext, ModuleContext}, + object::Object, + prelude::{Context, FunctionContext, ModuleContext, TaskContext}, result::{JsResult, NeonResult}, - types::{JsNull, JsNumber, JsPromise, JsString, JsValue}, + types::{ + buffer::TypedArray, JsArray, JsArrayBuffer, JsNull, JsNumber, JsObject, JsPromise, + JsString, JsUint8Array, JsUndefined, JsValue, + }, }; use once_cell::sync::OnceCell; use serde::de::DeserializeOwned; +use serde::Deserialize; use serde_json::Value; use tokio::runtime::Runtime; use uuid::Uuid; @@ -140,9 +145,45 @@ where cx.throw_error(msg) } +#[derive(Debug, Deserialize)] +pub struct JsJob { + pub team_id: i32, + pub queue_name: String, + pub priority: i16, + pub scheduled: DateTime<Utc>, + pub function_id: Option<Uuid>, + pub vm_state: Option<String>, + pub parameters: Option<String>, + pub metadata: Option<String>, +} + fn create_job(mut cx: FunctionContext) -> JsResult<JsPromise> { let arg1: Handle<JsString> = cx.argument::<JsString>(0)?; - let job: JobInit = from_json_string(&mut cx, arg1)?; + + let blob = cx.argument::<JsValue>(1)?; + let blob = if blob.is_a::<JsNull, _>(&mut cx) || blob.is_a::<JsUndefined, _>(&mut cx) { + None + } else { + Some( + blob.downcast_or_throw::<JsUint8Array, _>(&mut cx)? + .as_slice(&cx) + .to_vec(), + ) + }; + + let js_job: JsJob = from_json_string(&mut cx, arg1)?; + + let job = JobInit { + team_id: js_job.team_id, + queue_name: js_job.queue_name, + priority: js_job.priority, + scheduled: js_job.scheduled, + function_id: js_job.function_id, + vm_state: js_job.vm_state.map(|s| s.into_bytes()), + parameters: js_job.parameters.map(|s| s.into_bytes()), + metadata: js_job.metadata.map(|s| s.into_bytes()), + blob, + }; let (deferred, promise) = cx.promise(); let channel = cx.channel(); @@ -192,8 +233,8 @@ fn dequeue_jobs(mut cx: FunctionContext) -> JsResult<JsPromise> { let jobs = worker.dequeue_jobs(&queue_name, limit).await; deferred.settle_with(&channel, move |mut cx| { let jobs = jobs.or_else(|e| cx.throw_error(format!("{}", e)))?; - let jobs = to_json_string(&mut cx, jobs)?; - Ok(cx.string(jobs)) + let jobs = jobs_to_js_array(&mut cx, jobs)?; + Ok(jobs) }); }; @@ -224,8 +265,8 @@ fn dequeue_with_vm_state(mut cx: FunctionContext) -> JsResult<JsPromise> { let jobs = worker.dequeue_with_vm_state(&queue_name, limit).await; deferred.settle_with(&channel, move |mut cx| { let jobs = jobs.or_else(|e| cx.throw_error(format!("{}", e)))?; - let jobs = to_json_string(&mut cx, jobs)?; - Ok(cx.string(jobs)) + let jobs = jobs_to_js_array(&mut cx, jobs)?; + Ok(jobs) }); }; @@ -349,15 +390,17 @@ fn set_vm_state(mut cx: FunctionContext) -> JsResult<JsNull> { // Tricky - we have to support passing nulls here, because that's how you clear vm state. let vm_state = cx.argument::<JsValue>(1)?; - let vm_state = if vm_state.is_a::<JsNull, _>(&mut cx) { - None - } else { - Some( - vm_state - .downcast_or_throw::<JsString, _>(&mut cx)? - .value(&mut cx), - ) - }; + let vm_state = + if vm_state.is_a::<JsNull, _>(&mut cx) || vm_state.is_a::<JsUndefined, _>(&mut cx) { + None + } else { + Some( + vm_state + .downcast_or_throw::<JsString, _>(&mut cx)?
@@ -349,15 +390,17 @@ fn set_vm_state(mut cx: FunctionContext) -> JsResult<JsNull> {
 
     // Tricky - we have to support passing nulls here, because that's how you clear vm state.
     let vm_state = cx.argument::<JsValue>(1)?;
-    let vm_state = if vm_state.is_a::<JsNull>(&mut cx) {
-        None
-    } else {
-        Some(
-            vm_state
-                .downcast_or_throw::<JsString>(&mut cx)?
-                .value(&mut cx),
-        )
-    };
+    let vm_state =
+        if vm_state.is_a::<JsNull>(&mut cx) || vm_state.is_a::<JsUndefined>(&mut cx) {
+            None
+        } else {
+            Some(
+                vm_state
+                    .downcast_or_throw::<JsString>(&mut cx)?
+                    .value(&mut cx)
+                    .into_bytes(),
+            )
+        };
 
     WORKER
         .get()
@@ -376,15 +419,17 @@ fn set_metadata(mut cx: FunctionContext) -> JsResult<JsNull> {
 
     // Tricky - we have to support passing nulls here, because that's how you clear metadata.
     let metadata = cx.argument::<JsValue>(1)?;
-    let metadata = if metadata.is_a::<JsNull>(&mut cx) {
-        None
-    } else {
-        Some(
-            metadata
-                .downcast_or_throw::<JsString>(&mut cx)?
-                .value(&mut cx),
-        )
-    };
+    let metadata =
+        if metadata.is_a::<JsNull>(&mut cx) || metadata.is_a::<JsUndefined>(&mut cx) {
+            None
+        } else {
+            Some(
+                metadata
+                    .downcast_or_throw::<JsString>(&mut cx)?
+                    .value(&mut cx)
+                    .into_bytes(),
+            )
+        };
 
     WORKER
         .get()
@@ -403,15 +448,17 @@ fn set_parameters(mut cx: FunctionContext) -> JsResult<JsNull> {
 
     // Tricky - we have to support passing nulls here, because that's how you clear parameters.
     let parameters = cx.argument::<JsValue>(1)?;
-    let parameters = if parameters.is_a::<JsNull>(&mut cx) {
-        None
-    } else {
-        Some(
-            parameters
-                .downcast_or_throw::<JsString>(&mut cx)?
-                .value(&mut cx),
-        )
-    };
+    let parameters =
+        if parameters.is_a::<JsNull>(&mut cx) || parameters.is_a::<JsUndefined>(&mut cx) {
+            None
+        } else {
+            Some(
+                parameters
+                    .downcast_or_throw::<JsString>(&mut cx)?
+                    .value(&mut cx)
+                    .into_bytes(),
+            )
+        };
 
     WORKER
         .get()
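Seen from the TypeScript wrappers earlier in this diff, these three setters share one calling convention: an object is JSON-serialized before crossing the boundary, the Rust side stores the string as UTF-8 bytes, and `null` (or now `undefined`) clears the stored value. A small usage sketch, with the module path assumed:

```typescript
import { setMetadata, setParameters, setVmState } from '@posthog/cyclotron' // path assumed

async function updateJob(jobId: string): Promise<void> {
    await setMetadata(jobId, { tries: 1 }) // serialized via serializeObject, stored as UTF-8 bytes
    await setVmState(jobId, { stack: [] }) // same convention as metadata
    await setParameters(jobId, null) // null clears the stored parameters
}
```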
@@ -422,6 +469,144 @@ fn set_parameters(mut cx: FunctionContext) -> JsResult<JsNull> {
     Ok(cx.null())
 }
 
+fn set_blob(mut cx: FunctionContext) -> JsResult<JsNull> {
+    let arg = cx.argument::<JsString>(0)?.value(&mut cx);
+    let job_id: Uuid = arg
+        .parse()
+        .or_else(|_| cx.throw_error(format!("invalid job id: {}", arg)))?;
+
+    // Tricky - we have to support passing nulls here, because that's how you clear the blob.
+    let blob = cx.argument::<JsValue>(1)?;
+    let blob: Option<Vec<u8>> =
+        if blob.is_a::<JsNull>(&mut cx) || blob.is_a::<JsUndefined>(&mut cx) {
+            None
+        } else {
+            Some(
+                blob.downcast_or_throw::<JsUint8Array>(&mut cx)?
+                    .as_slice(&cx)
+                    .to_vec(),
+            )
+        };
+
+    WORKER
+        .get()
+        .map_or_else(|| cx.throw_error("worker not initialized"), Ok)?
+        .set_blob(job_id, blob)
+        .or_else(|e| cx.throw_error(format!("{}", e)))?;
+
+    Ok(cx.null())
+}
+
+fn jobs_to_js_array<'a>(cx: &mut TaskContext<'a>, jobs: Vec<Job>) -> JsResult<'a, JsArray> {
+    let js_array = JsArray::new(cx, jobs.len());
+
+    for (i, job) in jobs.into_iter().enumerate() {
+        let js_obj = JsObject::new(cx);
+        let null = cx.null();
+
+        let id_string = job.id.to_string();
+        let js_id = cx.string(id_string);
+        js_obj.set(cx, "id", js_id)?;
+
+        let team_id = cx.number(job.team_id as f64);
+        js_obj.set(cx, "teamId", team_id)?;
+
+        if let Some(function_id) = job.function_id {
+            let function_id_string = function_id.to_string();
+            let js_function_id = cx.string(function_id_string);
+            js_obj.set(cx, "functionId", js_function_id)?;
+        } else {
+            js_obj.set(cx, "functionId", null)?;
+        }
+
+        let js_created = cx
+            .date(job.created.timestamp_millis() as f64)
+            .expect("failed to create date");
+        js_obj.set(cx, "created", js_created)?;
+
+        if let Some(lock_id) = job.lock_id {
+            let lock_id_string = lock_id.to_string();
+            let js_lock_id = cx.string(lock_id_string);
+            js_obj.set(cx, "lockId", js_lock_id)?;
+        } else {
+            js_obj.set(cx, "lockId", null)?;
+        }
+
+        if let Some(last_heartbeat) = job.last_heartbeat {
+            let js_last_heartbeat = cx.string(last_heartbeat.to_rfc3339());
+            js_obj.set(cx, "lastHeartbeat", js_last_heartbeat)?;
+        } else {
+            js_obj.set(cx, "lastHeartbeat", null)?;
+        }
+
+        let janitor_touch_count = cx.number(job.janitor_touch_count as f64);
+        js_obj.set(cx, "janitorTouchCount", janitor_touch_count)?;
+        let transition_count = cx.number(job.transition_count as f64);
+        js_obj.set(cx, "transitionCount", transition_count)?;
+
+        let js_last_transition = cx.string(job.last_transition.to_rfc3339());
+        js_obj.set(cx, "lastTransition", js_last_transition)?;
+
+        let js_queue_name = cx.string(&job.queue_name);
+        js_obj.set(cx, "queueName", js_queue_name)?;
+
+        let js_state = cx.string(format!("{:?}", job.state));
+        js_obj.set(cx, "state", js_state)?;
+
+        let priority = cx.number(job.priority as f64);
+        js_obj.set(cx, "priority", priority)?;
+
+        let js_scheduled = cx.string(job.scheduled.to_rfc3339());
+        js_obj.set(cx, "scheduled", js_scheduled)?;
+
+        if let Some(vm_state) = job.vm_state {
+            let vm_state = match std::str::from_utf8(&vm_state) {
+                Ok(v) => v,
+                Err(e) => panic!("Invalid UTF-8 sequence in vm_state: {}", e),
+            };
+            let js_vm_state = cx.string(vm_state);
+            js_obj.set(cx, "vmState", js_vm_state)?;
+        } else {
+            js_obj.set(cx, "vmState", null)?;
+        }
+
+        if let Some(metadata) = job.metadata {
+            let metadata = match std::str::from_utf8(&metadata) {
+                Ok(v) => v,
+                Err(e) => panic!("Invalid UTF-8 sequence in metadata: {}", e),
+            };
+            let js_metadata = cx.string(metadata);
+            js_obj.set(cx, "metadata", js_metadata)?;
+        } else {
+            js_obj.set(cx, "metadata", null)?;
+        }
+
+        if let Some(parameters) = job.parameters {
+            let parameters = match std::str::from_utf8(&parameters) {
+                Ok(v) => v,
+                Err(e) => panic!("Invalid UTF-8 sequence in parameters: {}", e),
+            };
+            let js_parameters = cx.string(parameters);
+            js_obj.set(cx, "parameters", js_parameters)?;
+        } else {
+            js_obj.set(cx, "parameters", null)?;
+        }
+
+        if let Some(blob) = job.blob {
+            let mut js_blob = JsArrayBuffer::new(cx, blob.len())?;
+            let js_blob_slice = js_blob.as_mut_slice(cx);
+            js_blob_slice.copy_from_slice(&blob);
+            js_obj.set(cx, "blob", js_blob)?;
+        } else {
+            js_obj.set(cx, "blob", null)?;
+        }
+
+        js_array.set(cx, i as u32, js_obj)?;
+    }
+
+    Ok(js_array)
+}
+
 #[neon::main]
 fn main(mut cx: ModuleContext) -> NeonResult<()> {
     cx.export_function("hello", hello)?;
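Reading `jobs_to_js_array` from the JS side, a dequeued job lands as a plain object: ids as strings, `created` as a `Date`, the remaining timestamps as RFC 3339 strings, text fields decoded from UTF-8, and `blob` as an `ArrayBuffer`. A sketch of that shape and of getting the blob back out — the interface name is ours, not the library's:

```typescript
// Field names are taken from the binding above; the interface itself is illustrative.
interface DequeuedJob {
    id: string
    teamId: number
    functionId: string | null
    created: Date // built with cx.date()
    lockId: string | null
    lastHeartbeat: string | null // RFC 3339
    janitorTouchCount: number
    transitionCount: number
    lastTransition: string // RFC 3339
    queueName: string
    state: string // Debug-formatted JobState
    priority: number
    scheduled: string // RFC 3339
    vmState: string | null
    metadata: string | null
    parameters: string | null
    blob: ArrayBuffer | null
}

// The blob round-trips as raw bytes, so decoding is the caller's job.
function decodeBlobAsText(job: DequeuedJob): string | null {
    return job.blob ? new TextDecoder().decode(job.blob) : null
}
```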
@@ -440,6 +625,7 @@ fn main(mut cx: ModuleContext) -> NeonResult<()> {
     cx.export_function("setVmState", set_vm_state)?;
     cx.export_function("setMetadata", set_metadata)?;
     cx.export_function("setParameters", set_parameters)?;
+    cx.export_function("setBlob", set_blob)?;
 
     Ok(())
 }
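Finally, the new `setBlob` export follows the other setters' null-to-clear convention but takes raw bytes instead of an object. A usage sketch, with the module path assumed:

```typescript
import { setBlob } from '@posthog/cyclotron' // path assumed

async function replaceThenClearBlob(jobId: string): Promise<void> {
    await setBlob(jobId, new TextEncoder().encode('new payload')) // a Uint8Array replaces the blob
    await setBlob(jobId, null) // null clears the stored bytes
}
```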