Skip to content

Commit

Permalink
simplify pipeline retry logic (#2118)
Browse files Browse the repository at this point in the history
- now, when you edit a pipeline (e.g. by changing its name or by
unpausing it), the system no longer reloads it immediately; instead, it
waits until all the existing pipelines have finished loading (whether
successfully or unsuccessfully)
- this simplifies some of the pipeline scheduling logic, which should
prevent issues [like this
one](https://linear.app/0xparc-pcd/issue/0XP-1634/investigate-podbox-error-causing-other-pipelines-not-to-load)
- pipeline loads are now capped at 10 minutes - after that, the load
promise rejects with a timeout error
  • Loading branch information
ichub authored Nov 25, 2024
1 parent 18b6847 commit e55ff23
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ export class PipelineAPISubservice {
(await this.localFileService?.hasCachedLoad(pipelineId)) ?? false,
cachedBytes:
(await this.localFileService?.getCachedLoadSize(pipelineId)) ?? 0,
loading: !!pipelineSlot.loadPromise,
loading: pipelineSlot.loading,
latestAtoms,
lastLoad,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@ import {
PipelineLoadSummary
} from "@pcd/passport-interface";
import { RollbarService } from "@pcd/server-shared";
import { sleep, str } from "@pcd/util";
import { str } from "@pcd/util";
import _ from "lodash";
import { PoolClient } from "postgres-pool";
import { IPipelineAtomDB } from "../../../database/queries/pipelineAtomDB";
import { IPipelineDefinitionDB } from "../../../database/queries/pipelineDefinitionDB";
import { sqlQueryWithPool } from "../../../database/sqlQuery";
import { ApplicationContext } from "../../../types";
import { logger } from "../../../util/logger";
import { isAbortError } from "../../../util/util";
import { DiscordService } from "../../discordService";
import {
LocalFileService,
Expand Down Expand Up @@ -48,7 +47,7 @@ export class PipelineExecutorSubservice {
* 3. wait until the time is at least {@link PIPELINE_REFRESH_INTERVAL_MS} milliseconds after the last load started
* 4. go back to step one
*/
private static readonly PIPELINE_REFRESH_INTERVAL_MS = 60_000;
private static readonly PIPELINE_REFRESH_INTERVAL_MS = 60_000; // 1 minute

/**
* Podbox maintains an instance of a {@link PipelineSlot} for each pipeline
Expand Down Expand Up @@ -188,7 +187,8 @@ export class PipelineExecutorSubservice {
owner: await this.userSubservice.getUserById(
client,
definition.ownerUserId
)
),
loading: false
};
this.pipelineSlots.push(pipelineSlot);
} else {
Expand All @@ -201,13 +201,10 @@ export class PipelineExecutorSubservice {
tracePipeline(pipelineSlot.definition);
traceUser(pipelineSlot.owner);

const existingInstance = pipelineSlot.instance;
const stopPipeline = async (): Promise<void> => {
if (existingInstance && !existingInstance.isStopped()) {
span?.setAttribute("stopping", true);
await existingInstance.stop();
}
};
if (pipelineSlot.instance && !pipelineSlot.instance.isStopped()) {
span?.setAttribute("stopping", true);
await pipelineSlot.instance.stop();
}

pipelineSlot.instance = await instantiatePipeline(
this.context,
Expand All @@ -218,9 +215,9 @@ export class PipelineExecutorSubservice {
pipelineSlot.definition = definition;

if (dontLoad !== true) {
await this.performPipelineLoad(pipelineSlot, stopPipeline);
} else {
await stopPipeline();
this.performPipelineLoad(pipelineSlot).catch((e) => {
logger(LOG_TAG, "failed to perform pipeline load", e);
});
}
});
}
Expand Down Expand Up @@ -251,6 +248,7 @@ export class PipelineExecutorSubservice {
this.pipelineSlots.map(async (entry) => {
if (entry.instance && !entry.instance.isStopped()) {
await entry.instance.stop();
entry.loading = false;
}
})
);
Expand All @@ -266,7 +264,8 @@ export class PipelineExecutorSubservice {
owner: await this.userSubservice.getUserById(
client,
pipelineDefinition.ownerUserId
)
),
loading: false
});

// attempt to instantiate a {@link Pipeline}
Expand Down Expand Up @@ -310,8 +309,7 @@ export class PipelineExecutorSubservice {
* If load result caching is enabled globally and for this pipeline, takes care of that too.
*/
public async performPipelineLoad(
pipelineSlot: PipelineSlot,
loadStarted?: () => Promise<void>
pipelineSlot: PipelineSlot
): Promise<PipelineLoadSummary> {
return traced<PipelineLoadSummary>(
SERVICE_NAME,
Expand Down Expand Up @@ -360,8 +358,7 @@ export class PipelineExecutorSubservice {
this.userSubservice,
this.discordService,
this.pagerdutyService,
this.rollbarService,
loadStarted
this.rollbarService
);

if (cachedLoadFromDisk) {
Expand Down Expand Up @@ -435,40 +432,7 @@ export class PipelineExecutorSubservice {

await Promise.allSettled(
this.pipelineSlots.map(async (slot: PipelineSlot): Promise<void> => {
try {
if (slot.loadPromise) {
await slot.loadPromise;
} else {
await this.performPipelineLoad(slot);
}
} catch (e) {
// an abort means a podbox user either deleted or edited (and thus restarted)
// this pipeline. in the case that it was deleted, `slot.loadPromise` will be
// set to `undefined`. In the case that it was edited, Podbox will restart the
// pipeline, and set `slot.loadPromise` to a new promise representing the
// pipeline's load operation.
if (isAbortError(e)) {
while (slot.loadPromise) {
try {
await slot.loadPromise;
slot.loadPromise = undefined;
break;
} catch (e) {
if (isAbortError(e)) {
await sleep(50);
}
}
}
// no load promise means we're done here
return;
}

logger(
LOG_TAG,
`failed to perform pipeline load for pipeline ${slot.definition.id}`,
e
);
}
await this.performPipelineLoad(slot);
})
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ export class PipelineSubservice {
await this.pipelineDB.deleteDefinition(client, pipelineId);
await this.pipelineDB.saveLoadSummary(pipelineId, undefined);
await this.pipelineAtomDB.clear(pipelineId);
await this.executorSubservice.restartPipeline(client, pipelineId);
await this.executorSubservice.restartPipeline(client, pipelineId, true);
await this.localFileService?.clearPipelineCache(pipelineId);
});
}
Expand All @@ -308,7 +308,7 @@ export class PipelineSubservice {
await this.localFileService?.clearPipelineCache(pipelineId);
await this.pipelineDB.saveLoadSummary(pipelineId, undefined);
await this.pipelineAtomDB.clear(pipelineId);
await this.executorSubservice.restartPipeline(client, pipelineId);
await this.executorSubservice.restartPipeline(client, pipelineId, true);
}

public async getPipelineEditHistory(
Expand Down Expand Up @@ -429,7 +429,7 @@ export class PipelineSubservice {
return {
extraInfo: {
ownerEmail: owner?.email,
loading: !!slot.loadPromise,
loading: slot.loading,
lastLoad: summary,
hasCachedLoad:
(await this.localFileService?.hasCachedLoad(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { getActiveSpan } from "@opentelemetry/api/build/src/trace/context-utils";
import { PipelineLoadSummary } from "@pcd/passport-interface";
import { RollbarService } from "@pcd/server-shared";
import { sleep } from "@pcd/util";
import { Pool } from "postgres-pool";
import { sqlQueryWithPool } from "../../../../database/sqlQuery";
import { logger } from "../../../../util/logger";
Expand All @@ -21,6 +22,7 @@ import { UserSubservice } from "../UserSubservice";
import { maybeAlertForPipelineRun } from "./maybeAlertForPipelineRun";

const LOG_TAG = `[performPipelineLoad]`;
const PIPELINE_LOAD_TIMEOUT_MS = 1000 * 60 * 10; // 10 minutes

/**
* Performs a {@link Pipeline#load} for the given {@link Pipeline}, and
Expand All @@ -34,8 +36,7 @@ export async function performPipelineLoad(
userSubservice: UserSubservice,
discordService: DiscordService | null,
pagerdutyService: PagerDutyService | null,
rollbarService: RollbarService | null,
loadStarted?: () => Promise<void>
rollbarService: RollbarService | null
): Promise<PipelineLoadSummary> {
const startTime = new Date();
const pipelineId = pipelineSlot.definition.id;
Expand Down Expand Up @@ -110,20 +111,18 @@ export async function performPipelineLoad(
`loading data for pipeline with id '${pipelineId}'` +
` of type '${pipelineSlot.definition.type}'`
);
const loadPromise = pipeline.load();
pipelineSlot.loadPromise = loadPromise;
try {
// we `.stop()` the previous instance of this pipeline
// here, so that by the time the `AbortError` is thrown/caught,
// we're ready for it with the above `pipelineSlot.loadPromise`
// which can be awaited again, until the load either succeeds
// or fails with an error other than `AbortError`.
await loadStarted?.();
} catch (e) {
logger(LOG_TAG, "failed to run loadStarted callback", e);
}
const summary = await loadPromise;
pipelineSlot.loadPromise = undefined;

pipelineSlot.loading = true;

// in case a pipeline hangs, continue after 10 minutes.
const summary = await Promise.race([
pipeline.load(),
(async (): Promise<PipelineLoadSummary> => {
await sleep(PIPELINE_LOAD_TIMEOUT_MS);
throw new Error(`TIME_OUT: ${pipeline.id}`);
})()
]);

logger(
LOG_TAG,
`successfully loaded data for pipeline with id '${pipelineId}'` +
Expand Down Expand Up @@ -170,5 +169,7 @@ export async function performPipelineLoad(
discordService
);
return summary;
} finally {
pipelineSlot.loading = false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,8 @@ export async function upsertPipelineDefinition(
// so that the `AbortError` that is thrown by the `stop()` method can be
// handled properly upstream.
if (existingSlot.instance && !existingSlot.instance.isStopped()) {
existingSlot.loadPromise = undefined;
await existingSlot.instance?.stop();
existingSlot.loading = false;
}

if (validatedNewDefinition.options.disableCache) {
Expand All @@ -213,6 +213,7 @@ export async function upsertPipelineDefinition(
}
}

existingSlot.loading = true;
existingSlot.owner = await userSubservice.getUserById(
client,
validatedNewDefinition.ownerUserId
Expand All @@ -223,7 +224,8 @@ export async function upsertPipelineDefinition(
// which can take an arbitrary amount of time.
const restartPromise = executorSubservice.restartPipeline(
client,
validatedNewDefinition.id
validatedNewDefinition.id,
true
);

// To get accurate timestamps, we need to load the pipeline definition
Expand Down
7 changes: 2 additions & 5 deletions apps/passport-server/src/services/generic-issuance/types.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
import {
PipelineDefinition,
PipelineLoadSummary
} from "@pcd/passport-interface";
import { PipelineDefinition } from "@pcd/passport-interface";
import { PipelineCapability } from "./capabilities/types";
import { Pipeline, PipelineUser } from "./pipelines/types";

Expand Down Expand Up @@ -41,5 +38,5 @@ export interface PipelineSlot {
owner?: PipelineUser;
loadIncidentId?: string;
lastLoadDiscordMsgTimestamp?: Date;
loadPromise?: Promise<PipelineLoadSummary>;
loading: boolean;
}

0 comments on commit e55ff23

Please sign in to comment.