Skip to content

Commit

Permalink
feat(sike): pipe created ids back up to node (#25926)
Browse files Browse the repository at this point in the history
Co-authored-by: Ben White <[email protected]>
  • Loading branch information
oliverb123 and benjackwhite authored Oct 31, 2024
1 parent a2818cb commit f5cf024
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 25 deletions.
2 changes: 1 addition & 1 deletion plugin-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,6 @@
},
"cyclotron": {
"//This is a short term workaround to ensure that cyclotron changes trigger a rebuild": true,
"version": "0.1.6"
"version": "0.1.7"
}
}
17 changes: 9 additions & 8 deletions rust/cyclotron-core/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::sync::atomic::AtomicUsize;
use chrono::{DateTime, Duration, Utc};
use sqlx::PgPool;
use tokio::sync::RwLock;
use uuid::Uuid;

use crate::{
config::{DEFAULT_QUEUE_DEPTH_LIMIT, DEFAULT_SHARD_HEALTH_CHECK_INTERVAL},
Expand Down Expand Up @@ -59,7 +60,7 @@ impl QueueManager {
}
}

pub async fn create_job(&self, init: JobInit) -> Result<(), QueueError> {
pub async fn create_job(&self, init: JobInit) -> Result<Uuid, QueueError> {
// TODO - here is where a lot of shard health and failover logic will go, eventually.
let next = self
.next_shard
Expand All @@ -73,7 +74,7 @@ impl QueueManager {
&self,
init: JobInit,
timeout: Option<Duration>,
) -> Result<(), QueueError> {
) -> Result<Uuid, QueueError> {
let next = self
.next_shard
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
Expand All @@ -82,7 +83,7 @@ impl QueueManager {
shard.create_job_blocking(init, timeout).await
}

pub async fn bulk_create_jobs(&self, inits: Vec<JobInit>) -> Result<(), QueueError> {
pub async fn bulk_create_jobs(&self, inits: Vec<JobInit>) -> Result<Vec<Uuid>, QueueError> {
let next = self
.next_shard
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
Expand All @@ -94,7 +95,7 @@ impl QueueManager {
&self,
inits: Vec<JobInit>,
timeout: Option<Duration>,
) -> Result<(), QueueError> {
) -> Result<Vec<Uuid>, QueueError> {
let next = self
.next_shard
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
Expand All @@ -116,15 +117,15 @@ impl Shard {
}

// Inserts a job, failing if the shard is at capacity
pub async fn create_job(&self, init: JobInit) -> Result<(), QueueError> {
pub async fn create_job(&self, init: JobInit) -> Result<Uuid, QueueError> {
self.insert_guard().await?;
create_job(&self.pool, init).await
}

// Inserts a vec of jobs, failing if the shard is at capacity. Note "capacity" here just
// means "it isn't totally full" - if there's "capacity" for 1 job, and this is a vec of
// 1000, we still insert all 1000.
pub async fn bulk_create_jobs(&self, inits: &[JobInit]) -> Result<(), QueueError> {
pub async fn bulk_create_jobs(&self, inits: &[JobInit]) -> Result<Vec<Uuid>, QueueError> {
self.insert_guard().await?;
bulk_create_jobs(&self.pool, inits).await
}
Expand All @@ -134,7 +135,7 @@ impl Shard {
&self,
init: JobInit,
timeout: Option<Duration>,
) -> Result<(), QueueError> {
) -> Result<Uuid, QueueError> {
let start = Utc::now();
while self.is_full().await? {
tokio::time::sleep(Duration::milliseconds(100).to_std().unwrap()).await;
Expand All @@ -153,7 +154,7 @@ impl Shard {
&self,
inits: &[JobInit],
timeout: Option<Duration>,
) -> Result<(), QueueError> {
) -> Result<Vec<Uuid>, QueueError> {
let start = Utc::now();
while self.is_full().await? {
tokio::time::sleep(Duration::milliseconds(100).to_std().unwrap()).await;
Expand Down
10 changes: 5 additions & 5 deletions rust/cyclotron-core/src/ops/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
types::{JobInit, JobState},
};

pub async fn create_job<'c, E>(executor: E, data: JobInit) -> Result<(), QueueError>
pub async fn create_job<'c, E>(executor: E, data: JobInit) -> Result<Uuid, QueueError>
where
E: sqlx::Executor<'c, Database = sqlx::Postgres>,
{
Expand Down Expand Up @@ -51,10 +51,10 @@ VALUES
.execute(executor)
.await?;

Ok(())
Ok(id)
}

pub async fn bulk_create_jobs<'c, E>(executor: E, jobs: &[JobInit]) -> Result<(), QueueError>
pub async fn bulk_create_jobs<'c, E>(executor: E, jobs: &[JobInit]) -> Result<Vec<Uuid>, QueueError>
where
E: sqlx::Executor<'c, Database = sqlx::Postgres>,
{
Expand Down Expand Up @@ -143,7 +143,7 @@ FROM UNNEST(
)
"#,
)
.bind(ids)
.bind(&ids)
.bind(team_ids)
.bind(function_ids)
.bind(created_at)
Expand All @@ -163,5 +163,5 @@ FROM UNNEST(
.execute(executor)
.await?;

Ok(())
Ok(ids)
}
2 changes: 1 addition & 1 deletion rust/cyclotron-node/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@posthog/cyclotron",
"version": "0.1.6",
"version": "0.1.7",
"description": "Node bindings for cyclotron",
"main": "dist/index.js",
"types": "dist/index.d.ts",
Expand Down
15 changes: 10 additions & 5 deletions rust/cyclotron-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,10 @@ fn create_job(mut cx: FunctionContext) -> JsResult<JsPromise> {
return;
}
};
let job = manager.create_job(job).await;
let res = manager.create_job(job).await;
deferred.settle_with(&channel, move |mut cx| {
job.or_else(|e| cx.throw_error(format!("{}", e)))?;
Ok(cx.null())
let id = res.or_else(|e| cx.throw_error(format!("{}", e)))?;
Ok(cx.string(id.to_string()))
});
};

Expand Down Expand Up @@ -273,8 +273,13 @@ fn bulk_create_jobs(mut cx: FunctionContext) -> JsResult<JsPromise> {

let res = manager.bulk_create_jobs(jobs).await;
deferred.settle_with(&channel, move |mut cx| {
res.or_else(|e| cx.throw_error(format!("{}", e)))?;
Ok(cx.null())
let ids = res.or_else(|e| cx.throw_error(format!("{}", e)))?;
let returned = JsArray::new(&mut cx, ids.len());
for (i, id) in ids.iter().enumerate() {
let id = cx.string(id.to_string());
returned.set(&mut cx, i as u32, id)?;
}
Ok(returned)
});
};

Expand Down
10 changes: 5 additions & 5 deletions rust/cyclotron-node/src/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { convertToInternalPoolConfig, serializeObject } from './helpers'
import { CyclotronJobInit, CyclotronPoolConfig } from './types'

export class CyclotronManager {
constructor(private config: { shards: CyclotronPoolConfig[], shardDepthLimit: number }) {
constructor(private config: { shards: CyclotronPoolConfig[]; shardDepthLimit: number }) {
this.config = config
}

Expand All @@ -18,7 +18,7 @@ export class CyclotronManager {
)
}

async createJob(job: CyclotronJobInit): Promise<void> {
async createJob(job: CyclotronJobInit): Promise<string> {
job.priority ??= 1
job.scheduled ??= new Date()

Expand All @@ -38,7 +38,7 @@ export class CyclotronManager {
return await cyclotron.createJob(json, job.blob ? job.blob : undefined)
}

async bulkCreateJobs(jobs: CyclotronJobInit[]): Promise<void> {
async bulkCreateJobs(jobs: CyclotronJobInit[]): Promise<string[]> {
const jobInitsInternal = jobs.map((job) => {
job.priority ??= 1
job.scheduled ??= new Date()
Expand All @@ -63,9 +63,9 @@ export class CyclotronManager {
const blobs = new Uint8Array(totalBytes)
const blobLengths = new Uint32Array(jobs.length)

let offset = 0;
let offset = 0
for (let i = 0; i < jobs.length; i++) {
let blob = jobs[i].blob
const blob = jobs[i].blob
if (blob) {
blobLengths[i] = blob.byteLength
blobs.set(blob, offset)
Expand Down

0 comments on commit f5cf024

Please sign in to comment.