Skip to content

Commit

Permalink
Re-enable disabled Podbox features (#2180)
Browse files Browse the repository at this point in the history
Re-enables features turned off during Devcon:

- CSV pipeline issuance
- CSV ticket pipeline issuance
- Pretix pipeline consumer DB and Semaphore groups
- Lemonade pipeline consumer DB and Semaphore groups
  • Loading branch information
robknight authored Dec 2, 2024
1 parent 161e1ac commit bddb8c2
Show file tree
Hide file tree
Showing 10 changed files with 1,032 additions and 1,001 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,6 @@ export class CSVPipeline implements BasePipeline {

private async issue(req: PollFeedRequest): Promise<PollFeedResponseValue> {
return traced(LOG_NAME, "issue", async (span) => {
if (Math.random() < 1000) {
return {
actions: []
};
}

logger(LOG_TAG, `issue`, req);
tracePipeline(this.definition);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,12 +193,6 @@ export class CSVTicketPipeline implements BasePipeline {

private async issue(req: PollFeedRequest): Promise<PollFeedResponseValue> {
return traced(LOG_NAME, "issue", async (span) => {
if (Math.random() < 1000) {
return {
actions: []
};
}

logger(LOG_TAG, `issue`, req);
tracePipeline(this.definition);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,9 +486,7 @@ export class LemonadePipeline implements BasePipeline {
);

if ((this.definition.options.semaphoreGroups ?? []).length > 0) {
await sqlQueryWithPool(this.context.dbPool, (client) =>
this.triggerSemaphoreGroupUpdate(client)
);
this.triggerSemaphoreGroupUpdate();
}

return {
Expand Down Expand Up @@ -542,7 +540,7 @@ export class LemonadePipeline implements BasePipeline {
* Marked as public so that it can be called from tests, but otherwise should
* not be called from outside the class.
*/
public async triggerSemaphoreGroupUpdate(client: PoolClient): Promise<void> {
public async triggerSemaphoreGroupUpdate(): Promise<void> {
return traced(LOG_NAME, "triggerSemaphoreGroupUpdate", async (_span) => {
tracePipeline(this.definition);
// Whenever an update is triggered, we want to make sure that the
Expand All @@ -555,9 +553,15 @@ export class LemonadePipeline implements BasePipeline {
// queuing update requests.
// By returning this promise, we allow the caller to await on the update
// having been processed.

return this.semaphoreUpdateQueue.add(async () => {
const data = await this.semaphoreGroupData();
await this.semaphoreGroupProvider?.update(client, data);
await sqlQueryWithPool(this.context.dbPool, async (client) => {
if (!this.semaphoreGroupProvider) {
return;
}
return this.semaphoreGroupProvider.update(client, data);
});
});
});
}
Expand Down Expand Up @@ -893,28 +897,30 @@ export class LemonadePipeline implements BasePipeline {
span?.setAttribute("emails", emails.map((e) => e.email).join(","));
span?.setAttribute("semaphore_id", semaphoreId);

// let didUpdate = false;
// for (const email of emails) {
// // Consumer is validated, so save them in the consumer list
// didUpdate =
// didUpdate ||
// (await this.consumerDB.save(
// client,
// this.id,
// email.email,
// semaphoreId,
// new Date()
// ));
// }

// if ((this.definition.options.semaphoreGroups ?? []).length > 0) {
// // If the user's Semaphore commitment has changed, `didUpdate` will be
// // true, and we need to update the Semaphore groups
// if (didUpdate) {
// span?.setAttribute("semaphore_groups_updated", true);
// await this.triggerSemaphoreGroupUpdate(client);
// }
// }
let didUpdate = false;
await sqlQueryWithPool(this.context.dbPool, async (client) => {
for (const email of emails) {
// Consumer is validated, so save them in the consumer list
didUpdate =
didUpdate ||
(await this.consumerDB.save(
client,
this.id,
email.email,
semaphoreId,
new Date()
));
}
});

if ((this.definition.options.semaphoreGroups ?? []).length > 0) {
// If the user's Semaphore commitment has changed, `didUpdate` will be
// true, and we need to update the Semaphore groups
if (didUpdate) {
span?.setAttribute("semaphore_groups_updated", true);
await this.triggerSemaphoreGroupUpdate();
}
}

const tickets = await sqlQueryWithPool(
this.context.dbPool,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1179,105 +1179,124 @@ export class PretixPipeline implements BasePipeline {
span?.setAttribute("email", emails.map((e) => e.email).join(","));
span?.setAttribute("semaphore_id", semaphoreId);

//let didUpdate = false;

return sqlQueryWithPool(this.context.dbPool, async (client) => {
// for (const e of emails) {
// didUpdate =
// didUpdate ||
// (await this.consumerDB.save(
// client,
// this.id,
// e.email,
// semaphoreId,
// new Date()
// ));
// }

// const provider = this.autoIssuanceProvider;
// if (provider) {
// const newManualTickets = (
// await Promise.all(
// emails.map(async (e) =>
// provider.maybeIssueForUser(
// e.email,
// await this.getAllManualTickets(client),
// await this.db.loadByEmail(this.id, e.email)
// )
// )
// )
// ).flat();

// await Promise.allSettled(
// newManualTickets.map((t) =>
// this.manualTicketDB.save(client, this.id, t)
// )
// );
// }
await sqlQueryWithPool(this.context.dbPool, async (client) => {
let didUpdate = false;
for (const e of emails) {
didUpdate =
didUpdate ||
(await this.consumerDB.save(
client,
this.id,
e.email,
semaphoreId,
new Date()
));
}

// If the user's Semaphore commitment has changed, `didUpdate` will be
// true, and we need to update the Semaphore groups
// if ((this.definition.options.semaphoreGroups ?? []).length > 0) {
// if (didUpdate) {
// span?.setAttribute("semaphore_groups_updated", true);
// await this.triggerSemaphoreGroupUpdate(client);
// }
// }

const tickets = (
if ((this.definition.options.semaphoreGroups ?? []).length > 0) {
if (didUpdate) {
span?.setAttribute("semaphore_groups_updated", true);
await this.triggerSemaphoreGroupUpdate(client);
}
}
});

const provider = this.autoIssuanceProvider;
if (provider) {
const allManualTickets = await sqlQueryWithPool(
this.context.dbPool,
async (client) => await this.getAllManualTickets(client)
);
const newManualTickets = (
await Promise.all(
emails.map((e) =>
this.getEdDSATicketsForEmail(client, e.email, semaphoreId)
emails.map(async (e) =>
provider.maybeIssueForUser(
e.email,
allManualTickets,
await this.db.loadByEmail(this.id, e.email)
)
)
)
).flat();

span?.setAttribute("pcds_issued", tickets.length);

const actions: PCDAction[] = [];

if (await this.db.hasLoaded(this.id)) {
actions.push({
type: PCDActionType.DeleteFolder,
folder: this.definition.options.feedOptions.feedFolder,
recursive: true
});
}

const ticketPCDs = await Promise.all(
tickets.map((t) => EdDSATicketPCDPackage.serialize(t))
await sqlQueryWithPool(this.context.dbPool, async (client) =>
Promise.allSettled(
newManualTickets.map((t) =>
this.manualTicketDB.save(client, this.id, t)
)
)
);
}

if (this.definition.options.enablePODTickets) {
const podTickets = (
const tickets = await sqlQueryWithPool(
this.context.dbPool,
async (client) => {
return (
await Promise.all(
emails.map((e) =>
e.semaphoreV4Id
? this.getPODTicketsForEmail(client, e.email, e.semaphoreV4Id)
: undefined
this.getEdDSATicketsForEmail(client, e.email, semaphoreId)
)
)
)
.flat()
.filter((t) => t !== undefined) as PODTicketPCD[];

ticketPCDs.push(
...(await Promise.all(
podTickets.map((t) => PODTicketPCDPackage.serialize(t))
))
);
).flat();
}
);

span?.setAttribute("pcds_issued", tickets.length);

const actions: PCDAction[] = [];

if (await this.db.hasLoaded(this.id)) {
actions.push({
type: PCDActionType.ReplaceInFolder,
type: PCDActionType.DeleteFolder,
folder: this.definition.options.feedOptions.feedFolder,
pcds: ticketPCDs
recursive: true
});
}

const result: PollFeedResponseValue = { actions };
const ticketPCDs = await Promise.all(
tickets.map((t) => EdDSATicketPCDPackage.serialize(t))
);

return result;
if (this.definition.options.enablePODTickets) {
const podTickets = await sqlQueryWithPool(
this.context.dbPool,
async (client) => {
return (
await Promise.all(
emails.map((e) =>
e.semaphoreV4Id
? this.getPODTicketsForEmail(
client,
e.email,
e.semaphoreV4Id
)
: undefined
)
)
)
.flat()
.filter((t) => t !== undefined) as PODTicketPCD[];
}
);

ticketPCDs.push(
...(await Promise.all(
podTickets.map((t) => PODTicketPCDPackage.serialize(t))
))
);
}

actions.push({
type: PCDActionType.ReplaceInFolder,
folder: this.definition.options.feedOptions.feedFolder,
pcds: ticketPCDs
});

const result: PollFeedResponseValue = { actions };

return result;
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import { setupTestCSVTicketPipelineDefinition } from "./setupTestCSVTicketPipeli
/**
* Tests for {@link GenericIssuanceService}, in particular the {@link CSVTicketPipeline}.
*/
describe.skip("generic issuance - CSVTicketPipeline", function () {
describe("generic issuance - CSVTicketPipeline", function () {
const nowDate = new Date();
const now = Date.now();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import { setupTestCSVPipelineDefinition } from "./setupTestCSVPipelineDefinition
/**
* Tests for {@link GenericIssuanceService}, in particular the {@link CSVPipeline}.
*/
describe.skip("generic issuance - CSVPipeline", function () {
describe("generic issuance - CSVPipeline", function () {
const nowDate = new Date();
const now = Date.now();

Expand Down
Loading

0 comments on commit bddb8c2

Please sign in to comment.