From 5ac37880e60c245e442ccabd0b35496f7b7054d1 Mon Sep 17 00:00:00 2001 From: Ivan Chub Date: Tue, 12 Nov 2024 01:54:28 -0800 Subject: [PATCH] remove transactionality from non-critical services and read-only operations (#2165) --- apps/passport-server/src/database/sqlQuery.ts | 12 + .../routing/routes/genericIssuanceRoutes.ts | 8 +- .../src/routing/routes/miscRoutes.ts | 4 +- .../src/routing/routes/semaphoreRoutes.ts | 8 +- .../src/routing/routes/telegramRoutes.ts | 68 +- .../pipelines/CSVPipeline/CSVPipeline.ts | 8 +- .../CSVTicketPipeline/CSVTicketPipeline.ts | 8 +- .../pipelines/LemonadePipeline.ts | 233 +++-- .../pipelines/PODPipeline/PODPipeline.ts | 4 +- .../pipelines/PretixPipeline.ts | 140 ++- .../subservices/PipelineAPISubservice.ts | 4 +- .../subservices/PipelineExecutorSubservice.ts | 4 +- .../src/services/issuanceService.ts | 182 ++-- .../src/services/metricsService.ts | 22 +- .../src/services/persistentCacheService.ts | 24 +- .../src/services/rateLimitService.ts | 6 +- .../src/services/telegramService.ts | 960 +++++++++--------- .../src/util/telegramHelpers.ts | 16 +- .../test/semaphore/checkSemaphore.ts | 4 +- apps/passport-server/test/user/testLogin.ts | 4 +- apps/passport-server/test/util/rateLimit.ts | 4 +- 21 files changed, 822 insertions(+), 901 deletions(-) diff --git a/apps/passport-server/src/database/sqlQuery.ts b/apps/passport-server/src/database/sqlQuery.ts index 87e3ea364a..c6c54971ad 100644 --- a/apps/passport-server/src/database/sqlQuery.ts +++ b/apps/passport-server/src/database/sqlQuery.ts @@ -63,6 +63,18 @@ export function sqlTransaction( ); } +export async function sqlQueryWithPool( + pool: Pool, + func: (client: PoolClient) => Promise +): Promise { + const client = await pool.connect(); + try { + return await func(client); + } finally { + client.release(); + } +} + /** * Executes a given function inside a transaction against the database, and * traces its performance. Retries queries that fail due to a connection error. diff --git a/apps/passport-server/src/routing/routes/genericIssuanceRoutes.ts b/apps/passport-server/src/routing/routes/genericIssuanceRoutes.ts index e9eef3c401..13a986a73c 100644 --- a/apps/passport-server/src/routing/routes/genericIssuanceRoutes.ts +++ b/apps/passport-server/src/routing/routes/genericIssuanceRoutes.ts @@ -34,7 +34,7 @@ import path from "path"; import * as QRCode from "qrcode"; import urljoin from "url-join"; import { PipelineCheckinDB } from "../../database/queries/pipelineCheckinDB"; -import { namedSqlTransaction, sqlTransaction } from "../../database/sqlQuery"; +import { namedSqlTransaction, sqlQueryWithPool } from "../../database/sqlQuery"; import { getAllGenericIssuanceHTTPQuery, getAllGenericIssuanceQuery, @@ -74,7 +74,7 @@ export function initGenericIssuanceRoutes( */ app.post("/generic-issuance/api/self", async (req, res) => { checkExistsForRoute(genericIssuanceService); - const user = await sqlTransaction(context.dbPool, (client) => + const user = await sqlQueryWithPool(context.dbPool, (client) => genericIssuanceService.authSession(client, req) ); @@ -110,7 +110,7 @@ export function initGenericIssuanceRoutes( const checkinDb = new PipelineCheckinDB(); const tickets = await pipeline.getAllTickets(); - const checkins = await sqlTransaction(context.dbPool, (client) => + const checkins = await sqlQueryWithPool(context.dbPool, (client) => checkinDb.getByPipelineId(client, pragueId) ); @@ -499,7 +499,7 @@ export function initGenericIssuanceRoutes( "/generic-issuance/api/fetch-pretix-products", async (req: express.Request, res: express.Response) => { checkExistsForRoute(genericIssuanceService); - const user = await sqlTransaction(context.dbPool, (client) => + const user = await sqlQueryWithPool(context.dbPool, (client) => genericIssuanceService.authSession(client, req) ); traceUser(user); diff --git a/apps/passport-server/src/routing/routes/miscRoutes.ts b/apps/passport-server/src/routing/routes/miscRoutes.ts index eb4869916a..3ac5db36e9 100644 --- a/apps/passport-server/src/routing/routes/miscRoutes.ts +++ b/apps/passport-server/src/routing/routes/miscRoutes.ts @@ -1,7 +1,7 @@ import { bigintToPseudonymNumber, emailToBigint } from "@pcd/util"; import express, { Request, Response } from "express"; import { kvGetByPrefix } from "../../database/queries/kv"; -import { sqlTransaction } from "../../database/sqlQuery"; +import { sqlQueryWithPool } from "../../database/sqlQuery"; import { ApplicationContext, GlobalServices } from "../../types"; import { logger } from "../../util/logger"; @@ -15,7 +15,7 @@ export function initMiscRoutes( app.get( "/misc/protocol-worlds-scoreboard", async (req: Request, res: Response) => { - const scores = (await sqlTransaction(context.dbPool, (client) => + const scores = (await sqlQueryWithPool(context.dbPool, (client) => kvGetByPrefix(client, "protocol_worlds_score:") )) as Array<{ email: string; score: number }>; diff --git a/apps/passport-server/src/routing/routes/semaphoreRoutes.ts b/apps/passport-server/src/routing/routes/semaphoreRoutes.ts index 7c3b1072d7..39ea6eaeb2 100644 --- a/apps/passport-server/src/routing/routes/semaphoreRoutes.ts +++ b/apps/passport-server/src/routing/routes/semaphoreRoutes.ts @@ -4,7 +4,7 @@ import { serializeSemaphoreGroup } from "@pcd/semaphore-group-pcd"; import express, { Request, Response } from "express"; -import { sqlTransaction } from "../../database/sqlQuery"; +import { sqlQueryWithPool } from "../../database/sqlQuery"; import { ApplicationContext, GlobalServices } from "../../types"; import { logger } from "../../util/logger"; import { checkExistsForRoute } from "../../util/util"; @@ -40,7 +40,7 @@ export function initSemaphoreRoutes( const groupId = checkUrlParam(req, "id"); const roothash = checkUrlParam(req, "root"); - const historicGroupValid = await sqlTransaction( + const historicGroupValid = await sqlQueryWithPool( context.dbPool, (client) => semaphoreService.getHistoricSemaphoreGroupValid( @@ -70,7 +70,7 @@ export function initSemaphoreRoutes( "/semaphore/historic/:id/:root", async (req: Request, res: Response) => { checkExistsForRoute(semaphoreService); - const historicGroup = await sqlTransaction(context.dbPool, (client) => + const historicGroup = await sqlQueryWithPool(context.dbPool, (client) => semaphoreService.getHistoricSemaphoreGroup( client, checkUrlParam(req, "id"), @@ -99,7 +99,7 @@ export function initSemaphoreRoutes( app.get("/semaphore/latest-root/:id", async (req: Request, res: Response) => { checkExistsForRoute(semaphoreService); const id = checkUrlParam(req, "id"); - const latestGroups = await sqlTransaction(context.dbPool, (client) => + const latestGroups = await sqlQueryWithPool(context.dbPool, (client) => semaphoreService.getLatestSemaphoreGroups(client) ); const matchingGroup = latestGroups.find((g) => g.groupId.toString() === id); diff --git a/apps/passport-server/src/routing/routes/telegramRoutes.ts b/apps/passport-server/src/routing/routes/telegramRoutes.ts index 43ee85ff72..c54886cc5a 100644 --- a/apps/passport-server/src/routing/routes/telegramRoutes.ts +++ b/apps/passport-server/src/routing/routes/telegramRoutes.ts @@ -1,6 +1,6 @@ import { AnonWebAppPayload, PayloadType } from "@pcd/passport-interface"; import express, { Request, Response } from "express"; -import { namedSqlTransaction, sqlTransaction } from "../../database/sqlQuery"; +import { sqlQueryWithPool } from "../../database/sqlQuery"; import { startTelegramService } from "../../services/telegramService"; import { ApplicationContext, GlobalServices } from "../../types"; import { logger } from "../../util/logger"; @@ -66,17 +66,14 @@ export function initTelegramRoutes( if (!telegramService) { throw new Error("Telegram service not initialized"); } - await namedSqlTransaction( - context.dbPool, - "/telegram/verify", - (client) => - telegramService.handleVerification( - client, - proof, - parseInt(telegram_user_id), - telegram_chat_id, - telegram_username - ) + await sqlQueryWithPool(context.dbPool, (client) => + telegramService.handleVerification( + client, + proof, + parseInt(telegram_user_id), + telegram_chat_id, + telegram_username + ) ); logger( `[TELEGRAM] Redirecting to telegram for user id ${telegram_user_id}` + @@ -214,7 +211,7 @@ export function initTelegramRoutes( } case PayloadType.ReactData: { - const proofUrl = await sqlTransaction(context.dbPool, (client) => + const proofUrl = await sqlQueryWithPool(context.dbPool, (client) => telegramService.handleRequestReactProofLink(client, anonPayload) ); res.redirect(proofUrl); @@ -248,21 +245,17 @@ export function initTelegramRoutes( "nullifierHash field needs to be a string and be non-empty" ); } - await namedSqlTransaction( - context.dbPool, - "/telegram/anonget/:nullifier", - async (client) => { - const messages = await telegramService.handleGetAnonMessages( - client, - nullifierHash - ); - const totalKarma = await telegramService.handleGetAnonTotalKarma( - client, - nullifierHash - ); - res.json({ messages, totalKarma }); - } - ); + await sqlQueryWithPool(context.dbPool, async (client) => { + const messages = await telegramService.handleGetAnonMessages( + client, + nullifierHash + ); + const totalKarma = await telegramService.handleGetAnonTotalKarma( + client, + nullifierHash + ); + res.json({ messages, totalKarma }); + }); } catch (e) { logger("[TELEGRAM] failed to get posts", e); } @@ -280,17 +273,14 @@ export function initTelegramRoutes( throw new Error("Telegram service not initialized"); } - await namedSqlTransaction( - context.dbPool, - "/telegram/anonreact", - (client) => - telegramService.handleReactAnonymousMessage( - client, - proof, - chatId, - anonMessageId, - reaction - ) + await sqlQueryWithPool(context.dbPool, (client) => + telegramService.handleReactAnonymousMessage( + client, + proof, + chatId, + anonMessageId, + reaction + ) ); res.setHeader("Content-Type", "text/html"); diff --git a/apps/passport-server/src/services/generic-issuance/pipelines/CSVPipeline/CSVPipeline.ts b/apps/passport-server/src/services/generic-issuance/pipelines/CSVPipeline/CSVPipeline.ts index fa22151cf4..09042ac646 100644 --- a/apps/passport-server/src/services/generic-issuance/pipelines/CSVPipeline/CSVPipeline.ts +++ b/apps/passport-server/src/services/generic-issuance/pipelines/CSVPipeline/CSVPipeline.ts @@ -25,7 +25,7 @@ import { } from "../../../../database/queries/pipelineAtomDB"; import { IPipelineConsumerDB } from "../../../../database/queries/pipelineConsumerDB"; import { IPipelineSemaphoreHistoryDB } from "../../../../database/queries/pipelineSemaphoreHistoryDB"; -import { sqlTransaction } from "../../../../database/sqlQuery"; +import { sqlQueryWithPool } from "../../../../database/sqlQuery"; import { ApplicationContext } from "../../../../types"; import { logger } from "../../../../util/logger"; import { setError, traced } from "../../../telemetryService"; @@ -110,7 +110,7 @@ export class CSVPipeline implements BasePipeline { getSerializedLatestGroup: async ( groupId: string ): Promise => { - return sqlTransaction( + return sqlQueryWithPool( this.context.dbPool, async (client) => this.semaphoreGroupProvider?.getSerializedLatestGroup( @@ -122,7 +122,7 @@ export class CSVPipeline implements BasePipeline { getLatestGroupRoot: async ( groupId: string ): Promise => { - return sqlTransaction( + return sqlQueryWithPool( this.context.dbPool, async (client) => this.semaphoreGroupProvider?.getLatestGroupRoot(client, groupId) @@ -132,7 +132,7 @@ export class CSVPipeline implements BasePipeline { groupId: string, rootHash: string ): Promise => { - return sqlTransaction( + return sqlQueryWithPool( this.context.dbPool, async (client) => this.semaphoreGroupProvider?.getSerializedHistoricalGroup( diff --git a/apps/passport-server/src/services/generic-issuance/pipelines/CSVTicketPipeline/CSVTicketPipeline.ts b/apps/passport-server/src/services/generic-issuance/pipelines/CSVTicketPipeline/CSVTicketPipeline.ts index f86fee51d0..4e22175935 100644 --- a/apps/passport-server/src/services/generic-issuance/pipelines/CSVTicketPipeline/CSVTicketPipeline.ts +++ b/apps/passport-server/src/services/generic-issuance/pipelines/CSVTicketPipeline/CSVTicketPipeline.ts @@ -33,7 +33,7 @@ import { } from "../../../../database/queries/pipelineAtomDB"; import { IPipelineConsumerDB } from "../../../../database/queries/pipelineConsumerDB"; import { IPipelineSemaphoreHistoryDB } from "../../../../database/queries/pipelineSemaphoreHistoryDB"; -import { sqlTransaction } from "../../../../database/sqlQuery"; +import { sqlQueryWithPool } from "../../../../database/sqlQuery"; import { ApplicationContext } from "../../../../types"; import { logger } from "../../../../util/logger"; import { PersistentCacheService } from "../../../persistentCacheService"; @@ -133,7 +133,7 @@ export class CSVTicketPipeline implements BasePipeline { getSerializedLatestGroup: async ( groupId: string ): Promise => { - return sqlTransaction( + return sqlQueryWithPool( this.context.dbPool, async (client) => this.semaphoreGroupProvider?.getSerializedLatestGroup( @@ -145,7 +145,7 @@ export class CSVTicketPipeline implements BasePipeline { getLatestGroupRoot: async ( groupId: string ): Promise => { - return sqlTransaction( + return sqlQueryWithPool( this.context.dbPool, async (client) => this.semaphoreGroupProvider?.getLatestGroupRoot(client, groupId) @@ -155,7 +155,7 @@ export class CSVTicketPipeline implements BasePipeline { groupId: string, rootHash: string ): Promise => { - return sqlTransaction( + return sqlQueryWithPool( this.context.dbPool, async (client) => this.semaphoreGroupProvider?.getSerializedHistoricalGroup( diff --git a/apps/passport-server/src/services/generic-issuance/pipelines/LemonadePipeline.ts b/apps/passport-server/src/services/generic-issuance/pipelines/LemonadePipeline.ts index a546bdac4a..cc29d07a2d 100644 --- a/apps/passport-server/src/services/generic-issuance/pipelines/LemonadePipeline.ts +++ b/apps/passport-server/src/services/generic-issuance/pipelines/LemonadePipeline.ts @@ -63,6 +63,7 @@ import { } from "../../../database/queries/ticketActionDBs"; import { namedSqlTransaction, + sqlQueryWithPool, sqlTransaction } from "../../../database/sqlQuery"; import { PCDHTTPError } from "../../../routing/pcdHttpError"; @@ -221,7 +222,7 @@ export class LemonadePipeline implements BasePipeline { getSerializedLatestGroup: async ( groupId: string ): Promise => { - return sqlTransaction( + return sqlQueryWithPool( this.context.dbPool, async (client) => this.semaphoreGroupProvider?.getSerializedLatestGroup( @@ -233,7 +234,7 @@ export class LemonadePipeline implements BasePipeline { getLatestGroupRoot: async ( groupId: string ): Promise => { - return sqlTransaction( + return sqlQueryWithPool( this.context.dbPool, async (client) => this.semaphoreGroupProvider?.getLatestGroupRoot(client, groupId) @@ -243,7 +244,7 @@ export class LemonadePipeline implements BasePipeline { groupId: string, rootHash: string ): Promise => { - return sqlTransaction( + return sqlQueryWithPool( this.context.dbPool, async (client) => this.semaphoreGroupProvider?.getSerializedHistoricalGroup( @@ -829,139 +830,133 @@ export class LemonadePipeline implements BasePipeline { req: PollFeedRequest ): Promise { return traced(LOG_NAME, "issueLemonadeTicketPCDs", async (span) => { - return namedSqlTransaction( - this.context.dbPool, - "issueLemonadeTicketPCDs", - async (client) => { - tracePipeline(this.definition); + return sqlQueryWithPool(this.context.dbPool, async (client) => { + tracePipeline(this.definition); - if (!req.pcd) { - throw new Error("missing credential pcd"); - } + if (!req.pcd) { + throw new Error("missing credential pcd"); + } - if ( - this.definition.options.paused && - !(await this.db.hasLoaded(this.id)) - ) { - return { actions: [] }; - } + if ( + this.definition.options.paused && + !(await this.db.hasLoaded(this.id)) + ) { + return { actions: [] }; + } - const credential = - await this.credentialSubservice.verifyAndExpectZupassEmail(req.pcd); + const credential = + await this.credentialSubservice.verifyAndExpectZupassEmail(req.pcd); - const { emails, semaphoreId } = credential; + const { emails, semaphoreId } = credential; - if (!emails || emails.length === 0) { - throw new Error("missing emails in credential"); - } + if (!emails || emails.length === 0) { + throw new Error("missing emails in credential"); + } - span?.setAttribute("emails", emails.map((e) => e.email).join(",")); - span?.setAttribute("semaphore_id", semaphoreId); - - // let didUpdate = false; - // for (const email of emails) { - // // Consumer is validated, so save them in the consumer list - // didUpdate = - // didUpdate || - // (await this.consumerDB.save( - // client, - // this.id, - // email.email, - // semaphoreId, - // new Date() - // )); - // } - - // if ((this.definition.options.semaphoreGroups ?? []).length > 0) { - // // If the user's Semaphore commitment has changed, `didUpdate` will be - // // true, and we need to update the Semaphore groups - // if (didUpdate) { - // span?.setAttribute("semaphore_groups_updated", true); - // await this.triggerSemaphoreGroupUpdate(client); - // } - // } - - const tickets = ( - await Promise.all( - emails.map((e) => - this.getTicketsForEmail(client, e.email, semaphoreId) - ) + span?.setAttribute("emails", emails.map((e) => e.email).join(",")); + span?.setAttribute("semaphore_id", semaphoreId); + + // let didUpdate = false; + // for (const email of emails) { + // // Consumer is validated, so save them in the consumer list + // didUpdate = + // didUpdate || + // (await this.consumerDB.save( + // client, + // this.id, + // email.email, + // semaphoreId, + // new Date() + // )); + // } + + // if ((this.definition.options.semaphoreGroups ?? []).length > 0) { + // // If the user's Semaphore commitment has changed, `didUpdate` will be + // // true, and we need to update the Semaphore groups + // if (didUpdate) { + // span?.setAttribute("semaphore_groups_updated", true); + // await this.triggerSemaphoreGroupUpdate(client); + // } + // } + + const tickets = ( + await Promise.all( + emails.map((e) => + this.getTicketsForEmail(client, e.email, semaphoreId) ) - ).flat(); - - const ticketActions: PCDAction[] = []; - - if (await this.db.hasLoaded(this.id)) { - ticketActions.push({ - type: PCDActionType.DeleteFolder, - folder: this.definition.options.feedOptions.feedFolder, - recursive: true - }); - } + ) + ).flat(); - const ticketPCDs = await Promise.all( - tickets.map((t) => EdDSATicketPCDPackage.serialize(t)) - ); + const ticketActions: PCDAction[] = []; + if (await this.db.hasLoaded(this.id)) { ticketActions.push({ - type: PCDActionType.ReplaceInFolder, + type: PCDActionType.DeleteFolder, folder: this.definition.options.feedOptions.feedFolder, - pcds: ticketPCDs + recursive: true }); + } - const contactsFolder = `${this.definition.options.feedOptions.feedFolder}/contacts`; - const contacts = ( - await Promise.all( - emails.map((e) => - this.getReceivedContactsForEmail(client, e.email) - ) - ) - ).flat(); - const contactActions: PCDAction[] = [ - { - type: PCDActionType.DeleteFolder, - folder: contactsFolder, - recursive: true - }, - { - type: PCDActionType.ReplaceInFolder, - folder: contactsFolder, - pcds: contacts - } - ]; + const ticketPCDs = await Promise.all( + tickets.map((t) => EdDSATicketPCDPackage.serialize(t)) + ); - const badgeFolder = `${this.definition.options.feedOptions.feedFolder}/badges`; + ticketActions.push({ + type: PCDActionType.ReplaceInFolder, + folder: this.definition.options.feedOptions.feedFolder, + pcds: ticketPCDs + }); - const badges = ( - await Promise.all( - emails.map((e) => this.getReceivedBadgesForEmail(client, e.email)) - ) - ).flat(); - const badgeActions: PCDAction[] = [ - { - type: PCDActionType.DeleteFolder, - folder: badgeFolder, - recursive: true - }, - { - type: PCDActionType.ReplaceInFolder, - folder: badgeFolder, - pcds: badges - } - ]; + const contactsFolder = `${this.definition.options.feedOptions.feedFolder}/contacts`; + const contacts = ( + await Promise.all( + emails.map((e) => this.getReceivedContactsForEmail(client, e.email)) + ) + ).flat(); + const contactActions: PCDAction[] = [ + { + type: PCDActionType.DeleteFolder, + folder: contactsFolder, + recursive: true + }, + { + type: PCDActionType.ReplaceInFolder, + folder: contactsFolder, + pcds: contacts + } + ]; - traceFlattenedObject(span, { - pcds_issued: tickets.length + badges.length + contacts.length, - tickets_issued: tickets.length, - badges_issued: badges.length, - contacts_issued: contacts.length - }); + const badgeFolder = `${this.definition.options.feedOptions.feedFolder}/badges`; - return { - actions: [...ticketActions, ...contactActions, ...badgeActions] - }; - } - ); + const badges = ( + await Promise.all( + emails.map((e) => this.getReceivedBadgesForEmail(client, e.email)) + ) + ).flat(); + const badgeActions: PCDAction[] = [ + { + type: PCDActionType.DeleteFolder, + folder: badgeFolder, + recursive: true + }, + { + type: PCDActionType.ReplaceInFolder, + folder: badgeFolder, + pcds: badges + } + ]; + + traceFlattenedObject(span, { + pcds_issued: tickets.length + badges.length + contacts.length, + tickets_issued: tickets.length, + badges_issued: badges.length, + contacts_issued: contacts.length + }); + + return { + actions: [...ticketActions, ...contactActions, ...badgeActions] + }; + }); }); } diff --git a/apps/passport-server/src/services/generic-issuance/pipelines/PODPipeline/PODPipeline.ts b/apps/passport-server/src/services/generic-issuance/pipelines/PODPipeline/PODPipeline.ts index b88a3af142..64d9c0fb40 100644 --- a/apps/passport-server/src/services/generic-issuance/pipelines/PODPipeline/PODPipeline.ts +++ b/apps/passport-server/src/services/generic-issuance/pipelines/PODPipeline/PODPipeline.ts @@ -25,7 +25,7 @@ import { PipelineAtom } from "../../../../database/queries/pipelineAtomDB"; import { IPipelineConsumerDB } from "../../../../database/queries/pipelineConsumerDB"; -import { sqlTransaction } from "../../../../database/sqlQuery"; +import { sqlQueryWithPool } from "../../../../database/sqlQuery"; import { ApplicationContext } from "../../../../types"; import { logger } from "../../../../util/logger"; import { PersistentCacheService } from "../../../persistentCacheService"; @@ -343,7 +343,7 @@ export class PODPipeline implements BasePipeline { span?.setAttribute("email", emails?.map((e) => e.email)?.join(",") ?? ""); span?.setAttribute("semaphore_id", semaphoreId); - await sqlTransaction(this.context.dbPool, async (client) => { + await sqlQueryWithPool(this.context.dbPool, async (client) => { for (const e of emails ?? []) { await this.consumerDB.save( client, diff --git a/apps/passport-server/src/services/generic-issuance/pipelines/PretixPipeline.ts b/apps/passport-server/src/services/generic-issuance/pipelines/PretixPipeline.ts index b9b0b98983..fc03b4221d 100644 --- a/apps/passport-server/src/services/generic-issuance/pipelines/PretixPipeline.ts +++ b/apps/passport-server/src/services/generic-issuance/pipelines/PretixPipeline.ts @@ -62,6 +62,7 @@ import { IPipelineManualTicketDB } from "../../../database/queries/pipelineManua import { IPipelineSemaphoreHistoryDB } from "../../../database/queries/pipelineSemaphoreHistoryDB"; import { namedSqlTransaction, + sqlQueryWithPool, sqlTransaction } from "../../../database/sqlQuery"; import { PCDHTTPError } from "../../../routing/pcdHttpError"; @@ -253,7 +254,7 @@ export class PretixPipeline implements BasePipeline { getSerializedLatestGroup: async ( groupId: string ): Promise => { - return sqlTransaction( + return sqlQueryWithPool( this.context.dbPool, async (client) => this.semaphoreGroupProvider?.getSerializedLatestGroup( @@ -265,7 +266,7 @@ export class PretixPipeline implements BasePipeline { getLatestGroupRoot: async ( groupId: string ): Promise => { - return sqlTransaction( + return sqlQueryWithPool( this.context.dbPool, async (client) => this.semaphoreGroupProvider?.getLatestGroupRoot(client, groupId) @@ -275,7 +276,7 @@ export class PretixPipeline implements BasePipeline { groupId: string, rootHash: string ): Promise => { - return sqlTransaction( + return sqlQueryWithPool( this.context.dbPool, async (client) => this.semaphoreGroupProvider?.getSerializedHistoricalGroup( @@ -457,85 +458,80 @@ export class PretixPipeline implements BasePipeline { `saving ${atomsToSave.length} atoms for pipeline id '${this.id}' of type ${this.type}` ); - return namedSqlTransaction( - this.context.dbPool, - "load", - async (client) => { - if (this.autoIssuanceProvider) { - const newManualTickets = - await this.autoIssuanceProvider.dripNewManualTickets( - client, - this.consumerDB, - await this.getAllManualTickets(client), - atomsToSave - ); - - await Promise.allSettled( - newManualTickets.map((t) => - this.manualTicketDB.save(client, this.id, t) - ) + return sqlQueryWithPool(this.context.dbPool, async (client) => { + if (this.autoIssuanceProvider) { + const newManualTickets = + await this.autoIssuanceProvider.dripNewManualTickets( + client, + this.consumerDB, + await this.getAllManualTickets(client), + atomsToSave ); - } - await this.db.clear(this.definition.id); - await this.db.save(this.definition.id, atomsToSave); - await this.db.markAsLoaded(this.id); + await Promise.allSettled( + newManualTickets.map((t) => + this.manualTicketDB.save(client, this.id, t) + ) + ); + } - logs.push(makePLogInfo(`saved ${atomsToSave.length} items`)); + await this.db.clear(this.definition.id); + await this.db.save(this.definition.id, atomsToSave); + await this.db.markAsLoaded(this.id); - const loadEnd = Date.now(); + logs.push(makePLogInfo(`saved ${atomsToSave.length} items`)); - logger( - LOG_TAG, - `loaded ${atomsToSave.length} atoms for pipeline id ${ - this.id - } in ${loadEnd - startTime.getTime()}ms` - ); + const loadEnd = Date.now(); - span?.setAttribute("atoms_saved", atomsToSave.length); - - // Remove any pending check-ins that succeeded before loading started. - // Those that succeeded after loading started might not be represented in - // the data we fetched, so we can remove them on the next run. - // Pending checkins with the "Pending" status should not be removed, as - // they are still in-progress. - this.pendingCheckIns.forEach((value, key) => { - if ( - value.status === CheckinStatus.Success && - value.timestamp <= startTime.getTime() - ) { - this.pendingCheckIns.delete(key); - } - }); - - const end = new Date(); - logs.push( - makePLogInfo( - `load finished in ${end.getTime() - startTime.getTime()}ms` - ) - ); + logger( + LOG_TAG, + `loaded ${atomsToSave.length} atoms for pipeline id ${this.id} in ${ + loadEnd - startTime.getTime() + }ms` + ); - if ((this.definition.options.semaphoreGroups ?? []).length > 0) { - await this.triggerSemaphoreGroupUpdate(client); + span?.setAttribute("atoms_saved", atomsToSave.length); + + // Remove any pending check-ins that succeeded before loading started. + // Those that succeeded after loading started might not be represented in + // the data we fetched, so we can remove them on the next run. + // Pending checkins with the "Pending" status should not be removed, as + // they are still in-progress. + this.pendingCheckIns.forEach((value, key) => { + if ( + value.status === CheckinStatus.Success && + value.timestamp <= startTime.getTime() + ) { + this.pendingCheckIns.delete(key); } + }); - const loadSummary: PipelineLoadSummary = { - paused: false, - fromCache: false, - lastRunEndTimestamp: end.toISOString(), - lastRunStartTimestamp: startTime.toISOString(), - latestLogs: logs, - atomsLoaded: atomsToSave.length, - atomsExpected: atomsToSave.length, - errorMessage: undefined, - semaphoreGroups: - this.semaphoreGroupProvider?.getSupportedGroups(), - success: true - }; + const end = new Date(); + logs.push( + makePLogInfo( + `load finished in ${end.getTime() - startTime.getTime()}ms` + ) + ); - return loadSummary; + if ((this.definition.options.semaphoreGroups ?? []).length > 0) { + await this.triggerSemaphoreGroupUpdate(client); } - ); + + const loadSummary: PipelineLoadSummary = { + paused: false, + fromCache: false, + lastRunEndTimestamp: end.toISOString(), + lastRunStartTimestamp: startTime.toISOString(), + latestLogs: logs, + atomsLoaded: atomsToSave.length, + atomsExpected: atomsToSave.length, + errorMessage: undefined, + semaphoreGroups: this.semaphoreGroupProvider?.getSupportedGroups(), + success: true + }; + + return loadSummary; + }); } ); } @@ -1189,7 +1185,7 @@ export class PretixPipeline implements BasePipeline { //let didUpdate = false; - return namedSqlTransaction(this.context.dbPool, "", async (client) => { + return sqlQueryWithPool(this.context.dbPool, async (client) => { // for (const e of emails) { // didUpdate = // didUpdate || diff --git a/apps/passport-server/src/services/generic-issuance/subservices/PipelineAPISubservice.ts b/apps/passport-server/src/services/generic-issuance/subservices/PipelineAPISubservice.ts index 8a99630984..ff123795e4 100644 --- a/apps/passport-server/src/services/generic-issuance/subservices/PipelineAPISubservice.ts +++ b/apps/passport-server/src/services/generic-issuance/subservices/PipelineAPISubservice.ts @@ -20,7 +20,7 @@ import { str } from "@pcd/util"; import _ from "lodash"; import { PoolClient } from "postgres-pool"; import { IPipelineConsumerDB } from "../../../database/queries/pipelineConsumerDB"; -import { sqlTransaction } from "../../../database/sqlQuery"; +import { sqlQueryWithPool } from "../../../database/sqlQuery"; import { PCDHTTPError } from "../../../routing/pcdHttpError"; import { ApplicationContext } from "../../../types"; import { logger } from "../../../util/logger"; @@ -152,7 +152,7 @@ export class PipelineAPISubservice { pipelineHasSemaphoreGroups = true; } - const latestConsumers = await sqlTransaction( + const latestConsumers = await sqlQueryWithPool( this.context.dbPool, (client) => this.consumerDB.loadAll(client, pipelineInstance.id) ); diff --git a/apps/passport-server/src/services/generic-issuance/subservices/PipelineExecutorSubservice.ts b/apps/passport-server/src/services/generic-issuance/subservices/PipelineExecutorSubservice.ts index fc2739bfe8..69e11a6380 100644 --- a/apps/passport-server/src/services/generic-issuance/subservices/PipelineExecutorSubservice.ts +++ b/apps/passport-server/src/services/generic-issuance/subservices/PipelineExecutorSubservice.ts @@ -10,7 +10,7 @@ import { IPipelineAtomDB } from "../../../database/queries/pipelineAtomDB"; import { IPipelineDefinitionDB } from "../../../database/queries/pipelineDefinitionDB"; import { namedSqlTransaction, - sqlTransaction + sqlQueryWithPool } from "../../../database/sqlQuery"; import { ApplicationContext } from "../../../types"; import { logger } from "../../../util/logger"; @@ -101,7 +101,7 @@ export class PipelineExecutorSubservice { * schedules a load loop which loads data for each {@link Pipeline} once per minute. */ public async start(startLoadLoop?: boolean): Promise { - await sqlTransaction(this.context.internalPool, (client) => + await sqlQueryWithPool(this.context.internalPool, (client) => this.loadAndInstantiatePipelines(client) ); diff --git a/apps/passport-server/src/services/issuanceService.ts b/apps/passport-server/src/services/issuanceService.ts index d961f83c89..c797e1867f 100644 --- a/apps/passport-server/src/services/issuanceService.ts +++ b/apps/passport-server/src/services/issuanceService.ts @@ -64,7 +64,7 @@ import { import { fetchUserByV3Commitment } from "../database/queries/users"; import { fetchZuconnectTicketsByEmail } from "../database/queries/zuconnect/fetchZuconnectTickets"; import { fetchAllUsersWithZuzaluTickets } from "../database/queries/zuzalu_pretix_tickets/fetchZuzaluUser"; -import { namedSqlTransaction, sqlTransaction } from "../database/sqlQuery"; +import { sqlQueryWithPool } from "../database/sqlQuery"; import { PCDHTTPError } from "../routing/pcdHttpError"; import { ApplicationContext } from "../types"; import { logger } from "../util/logger"; @@ -168,37 +168,31 @@ export class IssuanceService { const actions: PCDAction[] = []; try { - await namedSqlTransaction( - this.context.dbPool, - "issueEmailPCDs", - async (client) => { - if (req.pcd === undefined) { - throw new Error(`Missing credential`); - } - const verifiedCredential = await this.verifyCredential( - req.pcd - ); - const pcds = await this.issueEmailPCDs( - client, - verifiedCredential - ); - - // Clear out the folder - actions.push({ - type: PCDActionType.DeleteFolder, - folder: "Email", - recursive: false - }); - - actions.push({ - type: PCDActionType.ReplaceInFolder, - folder: "Email", - pcds: await Promise.all( - pcds.map((pcd) => EmailPCDPackage.serialize(pcd)) - ) - }); + await sqlQueryWithPool(this.context.dbPool, async (client) => { + if (req.pcd === undefined) { + throw new Error(`Missing credential`); } - ); + const verifiedCredential = await this.verifyCredential(req.pcd); + const pcds = await this.issueEmailPCDs( + client, + verifiedCredential + ); + + // Clear out the folder + actions.push({ + type: PCDActionType.DeleteFolder, + folder: "Email", + recursive: false + }); + + actions.push({ + type: PCDActionType.ReplaceInFolder, + folder: "Email", + pcds: await Promise.all( + pcds.map((pcd) => EmailPCDPackage.serialize(pcd)) + ) + }); + }); } catch (e) { logger(`Error encountered while serving feed:`, e); this.rollbarService?.reportError(e); @@ -215,38 +209,32 @@ export class IssuanceService { const actions: PCDAction[] = []; try { - await namedSqlTransaction( - this.context.dbPool, - "issueZuzaluTicketPCDs", - async (client) => { - if (req.pcd === undefined) { - throw new Error(`Missing credential`); - } - - const verifiedCredential = await this.verifyCredential( - req.pcd - ); - const pcds = await this.issueZuzaluTicketPCDs( - client, - verifiedCredential - ); - - // Clear out the folder - actions.push({ - type: PCDActionType.DeleteFolder, - folder: "Zuzalu '23", - recursive: false - }); - - actions.push({ - type: PCDActionType.ReplaceInFolder, - folder: "Zuzalu '23", - pcds: await Promise.all( - pcds.map((pcd) => EdDSATicketPCDPackage.serialize(pcd)) - ) - }); + await sqlQueryWithPool(this.context.dbPool, async (client) => { + if (req.pcd === undefined) { + throw new Error(`Missing credential`); } - ); + + const verifiedCredential = await this.verifyCredential(req.pcd); + const pcds = await this.issueZuzaluTicketPCDs( + client, + verifiedCredential + ); + + // Clear out the folder + actions.push({ + type: PCDActionType.DeleteFolder, + folder: "Zuzalu '23", + recursive: false + }); + + actions.push({ + type: PCDActionType.ReplaceInFolder, + folder: "Zuzalu '23", + pcds: await Promise.all( + pcds.map((pcd) => EdDSATicketPCDPackage.serialize(pcd)) + ) + }); + }); } catch (e) { logger(`Error encountered while serving feed:`, e); this.rollbarService?.reportError(e); @@ -263,44 +251,38 @@ export class IssuanceService { const actions: PCDAction[] = []; try { - await namedSqlTransaction( - this.context.dbPool, - "issueZuconnectTicketPCDs", - async (client) => { - if (req.pcd === undefined) { - throw new Error(`Missing credential`); - } - const verifiedCredential = await this.verifyCredential( - req.pcd - ); - const pcds = await this.issueZuconnectTicketPCDs( - client, - verifiedCredential - ); - - // Clear out the old folder - actions.push({ - type: PCDActionType.DeleteFolder, - folder: "Zuconnect", - recursive: false - }); - - // Clear out the folder - actions.push({ - type: PCDActionType.DeleteFolder, - folder: "ZuConnect", - recursive: false - }); - - actions.push({ - type: PCDActionType.ReplaceInFolder, - folder: "ZuConnect", - pcds: await Promise.all( - pcds.map((pcd) => EdDSATicketPCDPackage.serialize(pcd)) - ) - }); + await sqlQueryWithPool(this.context.dbPool, async (client) => { + if (req.pcd === undefined) { + throw new Error(`Missing credential`); } - ); + const verifiedCredential = await this.verifyCredential(req.pcd); + const pcds = await this.issueZuconnectTicketPCDs( + client, + verifiedCredential + ); + + // Clear out the old folder + actions.push({ + type: PCDActionType.DeleteFolder, + folder: "Zuconnect", + recursive: false + }); + + // Clear out the folder + actions.push({ + type: PCDActionType.DeleteFolder, + folder: "ZuConnect", + recursive: false + }); + + actions.push({ + type: PCDActionType.ReplaceInFolder, + folder: "ZuConnect", + pcds: await Promise.all( + pcds.map((pcd) => EdDSATicketPCDPackage.serialize(pcd)) + ) + }); + }); } catch (e) { logger(`Error encountered while serving feed:`, e); this.rollbarService?.reportError(e); @@ -975,7 +957,7 @@ export async function startIssuanceService( return null; } - await sqlTransaction(context.dbPool, async (client) => + await sqlQueryWithPool(context.dbPool, async (client) => setupKnownTicketTypes(client, await getEdDSAPublicKey(zupassEddsaKey)) ); diff --git a/apps/passport-server/src/services/metricsService.ts b/apps/passport-server/src/services/metricsService.ts index b439e92ea1..4daa18180b 100644 --- a/apps/passport-server/src/services/metricsService.ts +++ b/apps/passport-server/src/services/metricsService.ts @@ -2,7 +2,7 @@ import { RollbarService } from "@pcd/server-shared"; import { getCacheSize } from "../database/queries/cache"; import { fetchE2EEStorageCount } from "../database/queries/e2ee"; import { fetchUserCount } from "../database/queries/users"; -import { namedSqlTransaction } from "../database/sqlQuery"; +import { sqlQueryWithPool } from "../database/sqlQuery"; import { ApplicationContext, ServerMode } from "../types"; import { logger } from "../util/logger"; import { traced } from "./telemetryService"; @@ -54,19 +54,15 @@ export class MetricsService { } private async collectMetrics(): Promise { - return await namedSqlTransaction( - this.context.dbPool, - "collectMetrics", - async (client) => { - const metrics: Metrics = { - usersCount: await fetchUserCount(client), - e2eeCount: await fetchE2EEStorageCount(client), - cacheSize: await getCacheSize(client) - }; + return await sqlQueryWithPool(this.context.dbPool, async (client) => { + const metrics: Metrics = { + usersCount: await fetchUserCount(client), + e2eeCount: await fetchE2EEStorageCount(client), + cacheSize: await getCacheSize(client) + }; - return metrics; - } - ); + return metrics; + }); } private async reportMetrics(metrics: Metrics): Promise { diff --git a/apps/passport-server/src/services/persistentCacheService.ts b/apps/passport-server/src/services/persistentCacheService.ts index 8fa8b8c499..1a4a9fc174 100644 --- a/apps/passport-server/src/services/persistentCacheService.ts +++ b/apps/passport-server/src/services/persistentCacheService.ts @@ -6,7 +6,7 @@ import { getCacheValue, setCacheValue } from "../database/queries/cache"; -import { namedSqlTransaction } from "../database/sqlQuery"; +import { sqlQueryWithPool } from "../database/sqlQuery"; import { logger } from "../util/logger"; import { traced } from "./telemetryService"; @@ -61,7 +61,7 @@ export class PersistentCacheService { public async setValue(key: string, value: string): Promise { return traced("Cache", "setValue", async (span) => { span?.setAttribute("cache_key", key); - await namedSqlTransaction(this.pool, "setValue", (client) => + await sqlQueryWithPool(this.pool, async (client) => setCacheValue(client, key, value) ); }); @@ -70,7 +70,7 @@ export class PersistentCacheService { public async getValue(key: string): Promise { return traced("Cache", "getValue", async (span) => { span?.setAttribute("cache_key", key); - return await namedSqlTransaction(this.pool, "getValue", (client) => { + return await sqlQueryWithPool(this.pool, async (client) => { const value = getCacheValue(client, key); span?.setAttribute("hit", !!value); return value; @@ -79,18 +79,14 @@ export class PersistentCacheService { } private async tryExpireOldEntries(): Promise { - return namedSqlTransaction( - this.pool, - "tryExpireOldEntries", - async (client): Promise => { - try { - return this.expireOldEntries(client); - } catch (e) { - logger("failed to expire old cache entries", e); - this.rollbarService?.reportError(e); - } + return sqlQueryWithPool(this.pool, async (client): Promise => { + try { + return this.expireOldEntries(client); + } catch (e) { + logger("failed to expire old cache entries", e); + this.rollbarService?.reportError(e); } - ); + }); } private async expireOldEntries(client: PoolClient): Promise { diff --git a/apps/passport-server/src/services/rateLimitService.ts b/apps/passport-server/src/services/rateLimitService.ts index 52447075db..e274020750 100644 --- a/apps/passport-server/src/services/rateLimitService.ts +++ b/apps/passport-server/src/services/rateLimitService.ts @@ -6,7 +6,7 @@ import { deleteUnsupportedRateLimitBuckets, pruneRateLimitBuckets } from "../database/queries/rateLimit"; -import { sqlTransaction } from "../database/sqlQuery"; +import { sqlQueryWithPool, sqlTransaction } from "../database/sqlQuery"; import { ApplicationContext } from "../types"; import { logger } from "../util/logger"; import { traced } from "./telemetryService"; @@ -40,13 +40,13 @@ export class RateLimitService { this.context = context; this.rollbarService = rollbarService; const pruneExpiredBuckets = async (): Promise => { - await sqlTransaction(this.context.dbPool, (client) => + await sqlQueryWithPool(this.context.dbPool, (client) => this.pruneBuckets(client) ); this.pruneTimeout = setTimeout(() => pruneExpiredBuckets(), ONE_HOUR_MS); }; pruneExpiredBuckets(); - sqlTransaction(this.context.dbPool, (client) => + sqlQueryWithPool(this.context.dbPool, (client) => this.removeUnsupportedBuckets(client) ); this.disabled = disabled; diff --git a/apps/passport-server/src/services/telegramService.ts b/apps/passport-server/src/services/telegramService.ts index eadba4c3dc..213c5d346e 100644 --- a/apps/passport-server/src/services/telegramService.ts +++ b/apps/passport-server/src/services/telegramService.ts @@ -72,7 +72,7 @@ import { insertTelegramVerification } from "../database/queries/telegram/insertTelegramConversation"; import { insertTelegramReaction } from "../database/queries/telegram/insertTelegramReaction"; -import { namedSqlTransaction, sqlTransaction } from "../database/sqlQuery"; +import { sqlQueryWithPool } from "../database/sqlQuery"; import { ApplicationContext, ServerMode } from "../types"; import { handleFrogVerification } from "../util/frogTelegramHelpers"; import { logger } from "../util/logger"; @@ -182,7 +182,7 @@ export class TelegramService { `[TELEGRAM] Got chat join request for ${chatId} from ${userId}` ); // Check if this user is verified for the chat in question - const isVerified = await sqlTransaction( + const isVerified = await sqlQueryWithPool( this.context.dbPool, (client) => fetchTelegramVerificationStatus(client, userId, chatId) ); @@ -243,43 +243,36 @@ export class TelegramService { this.authBot.on("chat_member", async (ctx) => { return traced("telegram", "chat_member", async (span) => { try { - namedSqlTransaction( - this.context.dbPool, - "chat_member", - async (client) => { - const newMember = ctx.update.chat_member.new_chat_member; - span?.setAttribute("userId", newMember.user.id.toString()); - span?.setAttribute("status", newMember.status); - - if ( - newMember.status === "left" || - newMember.status === "kicked" - ) { - logger( - `[TELEGRAM] Deleting verification for user leaving ${ - newMember.user.username || newMember.user.first_name - } in chat ${ctx.chat.id}` - ); - await deleteTelegramVerification( - client, - newMember.user.id, - ctx.chat.id - ); - const chat = await getGroupChat(ctx.api, ctx.chat.id); - span?.setAttribute("chatId", chat.id); - span?.setAttribute("chatTitle", chat.title); + sqlQueryWithPool(this.context.dbPool, async (client) => { + const newMember = ctx.update.chat_member.new_chat_member; + span?.setAttribute("userId", newMember.user.id.toString()); + span?.setAttribute("status", newMember.status); - const userId = newMember.user.id; - if (!newMember.user.is_bot) { - await this.authBot.api.sendMessage( - userId, - `You left ${chat?.title}. To join again, you must re-verify by typing /start.`, - { parse_mode: "HTML" } - ); - } + if (newMember.status === "left" || newMember.status === "kicked") { + logger( + `[TELEGRAM] Deleting verification for user leaving ${ + newMember.user.username || newMember.user.first_name + } in chat ${ctx.chat.id}` + ); + await deleteTelegramVerification( + client, + newMember.user.id, + ctx.chat.id + ); + const chat = await getGroupChat(ctx.api, ctx.chat.id); + span?.setAttribute("chatId", chat.id); + span?.setAttribute("chatTitle", chat.title); + + const userId = newMember.user.id; + if (!newMember.user.is_bot) { + await this.authBot.api.sendMessage( + userId, + `You left ${chat?.title}. To join again, you must re-verify by typing /start.`, + { parse_mode: "HTML" } + ); } } - ); + }); } catch (e) { logger("[TELEGRAM] chat_member error", e); this.rollbarService?.reportError(e); @@ -302,7 +295,7 @@ export class TelegramService { const name = firstName || username; ctx.session.directLinkMode = false; if (ctx.match && Number.isInteger(Number(ctx.match))) { - const [chatWithMembership] = await sqlTransaction( + const [chatWithMembership] = await sqlQueryWithPool( ctx.session.dbPool, (client) => getChatsWithMembershipStatus( @@ -455,7 +448,7 @@ export class TelegramService { userId, `Loading tickets and events...` ); - const events = await sqlTransaction(ctx.session.dbPool, (client) => + const events = await sqlQueryWithPool(ctx.session.dbPool, (client) => fetchEventsWithTelegramChats(client, false) ); const eventsWithChats = await chatIDsToChats(ctx, events); @@ -500,75 +493,67 @@ export class TelegramService { this.authBot.on(":forum_topic_created", async (ctx) => { return traced("telegram", "forum_topic_created", async (span) => { - await namedSqlTransaction( - this.context.dbPool, - "forum_topic_created", - async (client) => { - const topicName = ctx.update?.message?.forum_topic_created.name; - const messageThreadId = ctx.update.message?.message_thread_id; - const chatId = ctx.chat.id; - span?.setAttributes({ topicName, messageThreadId, chatId }); + await sqlQueryWithPool(this.context.dbPool, async (client) => { + const topicName = ctx.update?.message?.forum_topic_created.name; + const messageThreadId = ctx.update.message?.message_thread_id; + const chatId = ctx.chat.id; + span?.setAttributes({ topicName, messageThreadId, chatId }); - if (!chatId || !topicName) - throw new Error(`Missing chatId or topic name`); + if (!chatId || !topicName) + throw new Error(`Missing chatId or topic name`); - await insertTelegramChat(client, chatId); - await insertTelegramTopic( - client, - chatId, - topicName, - messageThreadId, - false - ); + await insertTelegramChat(client, chatId); + await insertTelegramTopic( + client, + chatId, + topicName, + messageThreadId, + false + ); - logger(`[TELEGRAM CREATED]`, topicName, messageThreadId, chatId); - } - ); + logger(`[TELEGRAM CREATED]`, topicName, messageThreadId, chatId); + }); }); }); this.authBot.on(":forum_topic_edited", async (ctx) => { return traced("telegram", "forum_topic_edited", async (span) => { - await namedSqlTransaction( - this.context.dbPool, - "forum_topic_edited", - async (client) => { - const topicName = ctx.update?.message?.forum_topic_edited.name; - const chatId = ctx.chat.id; - const messageThreadId = ctx.update.message?.message_thread_id; - span?.setAttributes({ topicName, messageThreadId, chatId }); + await sqlQueryWithPool(this.context.dbPool, async (client) => { + const topicName = ctx.update?.message?.forum_topic_edited.name; + const chatId = ctx.chat.id; + const messageThreadId = ctx.update.message?.message_thread_id; + span?.setAttributes({ topicName, messageThreadId, chatId }); - if (!chatId || !topicName) - throw new Error(`Missing chatId or topic name`); + if (!chatId || !topicName) + throw new Error(`Missing chatId or topic name`); - const topic = await fetchTelegramTopic( + const topic = await fetchTelegramTopic( + client, + chatId, + messageThreadId + ); + + if (!topic) { + logger(`[TELEGRAM] adding topic ${topicName} to db`); + await insertTelegramChat(client, chatId); + await insertTelegramTopic( client, chatId, - messageThreadId + topicName, + messageThreadId, + false + ); + } else { + logger(`[TELEGRAM] updating topic ${topicName} in db`); + await insertTelegramTopic( + client, + topic.telegramChatID, + topicName, + topic.topic_id, + topic.is_anon_topic ); - - if (!topic) { - logger(`[TELEGRAM] adding topic ${topicName} to db`); - await insertTelegramChat(client, chatId); - await insertTelegramTopic( - client, - chatId, - topicName, - messageThreadId, - false - ); - } else { - logger(`[TELEGRAM] updating topic ${topicName} in db`); - await insertTelegramTopic( - client, - topic.telegramChatID, - topicName, - topic.topic_id, - topic.is_anon_topic - ); - } } - ); + }); }); }); @@ -597,86 +582,82 @@ export class TelegramService { }); try { - await namedSqlTransaction( - ctx.session.dbPool, - "incognito", - async (client) => { - const telegramEvents = await fetchTelegramEventsByChatId( - client, - ctx.chat.id - ); - - const hasLinked = telegramEvents.length > 0; - if (!hasLinked) { - await ctx.reply( - "This group is not linked to an event. If you're an admin, use /manage to link this group to an event.", - { message_thread_id: messageThreadId } - ); - return; - } + await sqlQueryWithPool(ctx.session.dbPool, async (client) => { + const telegramEvents = await fetchTelegramEventsByChatId( + client, + ctx.chat.id + ); - const topicToUpdate = await fetchTelegramTopic( - client, - ctx.chat.id, - messageThreadId + const hasLinked = telegramEvents.length > 0; + if (!hasLinked) { + await ctx.reply( + "This group is not linked to an event. If you're an admin, use /manage to link this group to an event.", + { message_thread_id: messageThreadId } ); + return; + } - if (!topicToUpdate) - throw new Error(`Couldn't find this topic in the db.`); + const topicToUpdate = await fetchTelegramTopic( + client, + ctx.chat.id, + messageThreadId + ); - if (topicToUpdate.is_anon_topic) { - await ctx.reply(`This topic is already anonymous.`, { - message_thread_id: messageThreadId - }); - return; - } + if (!topicToUpdate) + throw new Error(`Couldn't find this topic in the db.`); - const topicName = - topicToUpdate?.topic_name || - ctx.message?.reply_to_message?.forum_topic_created?.name; - if (!topicName) throw new Error(`No topic name found`); + if (topicToUpdate.is_anon_topic) { + await ctx.reply(`This topic is already anonymous.`, { + message_thread_id: messageThreadId + }); + return; + } - await insertTelegramTopic( - client, - ctx.chat.id, - topicName, - messageThreadId, - true - ); + const topicName = + topicToUpdate?.topic_name || + ctx.message?.reply_to_message?.forum_topic_created?.name; + if (!topicName) throw new Error(`No topic name found`); - await ctx.reply( - `Linked with topic name ${topicName}.\nIf this name is incorrect, edit this topic name to update`, - { - message_thread_id: messageThreadId, - parse_mode: "HTML" - } - ); + await insertTelegramTopic( + client, + ctx.chat.id, + topicName, + messageThreadId, + true + ); - const directLinkParams: RedirectTopicDataPayload = { - type: PayloadType.RedirectTopicData, - value: { - topicId: messageThreadId, - chatId: ctx.chat.id - } - }; + await ctx.reply( + `Linked with topic name ${topicName}.\nIf this name is incorrect, edit this topic name to update`, + { + message_thread_id: messageThreadId, + parse_mode: "HTML" + } + ); - const encodedPayload = Buffer.from( - JSON.stringify(directLinkParams), - "utf-8" - ).toString("base64"); + const directLinkParams: RedirectTopicDataPayload = { + type: PayloadType.RedirectTopicData, + value: { + topicId: messageThreadId, + chatId: ctx.chat.id + } + }; - const messageToPin = await ctx.reply("Click to post", { - message_thread_id: messageThreadId, - reply_markup: new InlineKeyboard().url( - "Post Anonymously", - // NOTE: The order and casing of the direct link params is VERY IMPORTANT. https://github.com/TelegramMessenger/Telegram-iOS/issues/1091 - `${process.env.TELEGRAM_ANON_BOT_DIRECT_LINK}?startApp=${encodedPayload}&startapp=${encodedPayload}` - ) - }); - ctx.pinChatMessage(messageToPin.message_id); - ctx.api.closeForumTopic(ctx.chat.id, messageThreadId); - } - ); + const encodedPayload = Buffer.from( + JSON.stringify(directLinkParams), + "utf-8" + ).toString("base64"); + + const messageToPin = await ctx.reply("Click to post", { + message_thread_id: messageThreadId, + reply_markup: new InlineKeyboard().url( + "Post Anonymously", + // NOTE: The order and casing of the direct link params is VERY IMPORTANT. https://github.com/TelegramMessenger/Telegram-iOS/issues/1091 + `${process.env.TELEGRAM_ANON_BOT_DIRECT_LINK}?startApp=${encodedPayload}&startapp=${encodedPayload}` + ) + }); + ctx.pinChatMessage(messageToPin.message_id); + ctx.api.closeForumTopic(ctx.chat.id, messageThreadId); + }); } catch (error) { logger(`[ERROR] ${error}`); await ctx.reply(`Failed to link anonymous chat. ${error} `, { @@ -709,109 +690,98 @@ export class TelegramService { if (this.forwardBot) { this.forwardBot.command("receive", async (ctx) => { - await namedSqlTransaction( - ctx.session.dbPool, - "receive", - async (client) => { - const messageThreadId = ctx.message?.message_thread_id; - try { - logger(`[TELEGRAM] running receive`); - if (isDirectMessage(ctx)) - return ctx.reply(`/receive can only be run in a group chat`); - - if (!ctx.from?.username) - return ctx.reply(`No username found`, { - reply_to_message_id: messageThreadId - }); + await sqlQueryWithPool(ctx.session.dbPool, async (client) => { + const messageThreadId = ctx.message?.message_thread_id; + try { + logger(`[TELEGRAM] running receive`); + if (isDirectMessage(ctx)) + return ctx.reply(`/receive can only be run in a group chat`); - if (!ALLOWED_TICKET_MANAGERS.includes(ctx.from.username)) - return ctx.reply( - `Only Zupass team members are allowed to run this command.`, - { reply_to_message_id: messageThreadId } - ); + if (!ctx.from?.username) + return ctx.reply(`No username found`, { + reply_to_message_id: messageThreadId + }); - // Look up topic. - const topic = await fetchTelegramTopic( - client, - ctx.chat.id, - messageThreadId + if (!ALLOWED_TICKET_MANAGERS.includes(ctx.from.username)) + return ctx.reply( + `Only Zupass team members are allowed to run this command.`, + { reply_to_message_id: messageThreadId } ); - if (!topic) - throw new Error( - `No topic found. Edit the topic name and try again!` - ); + // Look up topic. + const topic = await fetchTelegramTopic( + client, + ctx.chat.id, + messageThreadId + ); - await insertTelegramForward(client, null, topic.id); - logger(`[TELEGRAM] ${topic.topic_name} can receive messages`); - await ctx.reply( - `${topic.topic_name} can receive messages`, - { - reply_to_message_id: messageThreadId, - parse_mode: "HTML" - } + if (!topic) + throw new Error( + `No topic found. Edit the topic name and try again!` ); - } catch (error) { - ctx.reply(`${error}`, { - reply_to_message_id: messageThreadId - }); - } + + await insertTelegramForward(client, null, topic.id); + logger(`[TELEGRAM] ${topic.topic_name} can receive messages`); + await ctx.reply(`${topic.topic_name} can receive messages`, { + reply_to_message_id: messageThreadId, + parse_mode: "HTML" + }); + } catch (error) { + ctx.reply(`${error}`, { + reply_to_message_id: messageThreadId + }); } - ); + }); }); this.forwardBot.command("stopreceive", async (ctx) => { - await namedSqlTransaction( - ctx.session.dbPool, - "stopreceive", - async (client) => { - const messageThreadId = ctx.message?.message_thread_id; - try { - logger(`[TELEGRAM] running receive`); - if (isDirectMessage(ctx)) - return ctx.reply(`/receive can only be run in a group chat`); - - if (!ctx.from?.username) - return ctx.reply(`No username found`, { - reply_to_message_id: messageThreadId - }); + await sqlQueryWithPool(ctx.session.dbPool, async (client) => { + const messageThreadId = ctx.message?.message_thread_id; + try { + logger(`[TELEGRAM] running receive`); + if (isDirectMessage(ctx)) + return ctx.reply(`/receive can only be run in a group chat`); - if (!ALLOWED_TICKET_MANAGERS.includes(ctx.from.username)) - return ctx.reply( - `Only Zupass team members are allowed to run this command.`, - { reply_to_message_id: messageThreadId } - ); + if (!ctx.from?.username) + return ctx.reply(`No username found`, { + reply_to_message_id: messageThreadId + }); - // Look up topic. - const topic = await fetchTelegramTopic( - client, - ctx.chat.id, - messageThreadId + if (!ALLOWED_TICKET_MANAGERS.includes(ctx.from.username)) + return ctx.reply( + `Only Zupass team members are allowed to run this command.`, + { reply_to_message_id: messageThreadId } ); - if (!topic) - throw new Error( - `No topic found. Edit the topic name and try again!` - ); + // Look up topic. + const topic = await fetchTelegramTopic( + client, + ctx.chat.id, + messageThreadId + ); - await deleteTelegramForward(client, topic.id, null); - logger( - `[TELEGRAM] ${topic.topic_name} can no longer receive messages` + if (!topic) + throw new Error( + `No topic found. Edit the topic name and try again!` ); - await ctx.reply( - `${topic.topic_name} can no longer receive messages`, - { - reply_to_message_id: messageThreadId, - parse_mode: "HTML" - } - ); - } catch (error) { - ctx.reply(`${error}`, { - reply_to_message_id: messageThreadId - }); - } + + await deleteTelegramForward(client, topic.id, null); + logger( + `[TELEGRAM] ${topic.topic_name} can no longer receive messages` + ); + await ctx.reply( + `${topic.topic_name} can no longer receive messages`, + { + reply_to_message_id: messageThreadId, + parse_mode: "HTML" + } + ); + } catch (error) { + ctx.reply(`${error}`, { + reply_to_message_id: messageThreadId + }); } - ); + }); }); this.forwardBot.command("forward", async (ctx) => { @@ -839,96 +809,88 @@ export class TelegramService { this.forwardBot.on(":forum_topic_edited", async (ctx) => { return traced("telegram", "forum_topic_edited", async (span) => { - await namedSqlTransaction( - ctx.session.dbPool, - "forum_topic_edited", - async (client) => { - const topicName = ctx.update?.message?.forum_topic_edited.name; - const chatId = ctx.chat.id; - const messageThreadId = ctx.update.message?.message_thread_id; - span?.setAttributes({ topicName, messageThreadId, chatId }); - - if (!chatId || !topicName) - throw new Error(`Missing chatId or topic name`); - - const topic = await fetchTelegramTopic( + await sqlQueryWithPool(ctx.session.dbPool, async (client) => { + const topicName = ctx.update?.message?.forum_topic_edited.name; + const chatId = ctx.chat.id; + const messageThreadId = ctx.update.message?.message_thread_id; + span?.setAttributes({ topicName, messageThreadId, chatId }); + + if (!chatId || !topicName) + throw new Error(`Missing chatId or topic name`); + + const topic = await fetchTelegramTopic( + client, + chatId, + messageThreadId + ); + + if (!topic) { + logger(`[TELEGRAM] adding topic ${topicName} to db`); + await insertTelegramChat(client, chatId); + await insertTelegramTopic( client, chatId, - messageThreadId + topicName, + messageThreadId, + false + ); + } else { + logger(`[TELEGRAM] updating topic ${topicName} in db`); + await insertTelegramTopic( + client, + topic.telegramChatID, + topicName, + topic.topic_id, + topic.is_anon_topic ); - - if (!topic) { - logger(`[TELEGRAM] adding topic ${topicName} to db`); - await insertTelegramChat(client, chatId); - await insertTelegramTopic( - client, - chatId, - topicName, - messageThreadId, - false - ); - } else { - logger(`[TELEGRAM] updating topic ${topicName} in db`); - await insertTelegramTopic( - client, - topic.telegramChatID, - topicName, - topic.topic_id, - topic.is_anon_topic - ); - } } - ); + }); }); }); this.forwardBot.on("message", async (ctx) => { - await namedSqlTransaction( - ctx.session.dbPool, - "message", - async (client) => { - const text = ctx.message.text; - - if (isDirectMessage(ctx)) { - return await ctx.reply( - `I can only forward messages in a group chat` + await sqlQueryWithPool(ctx.session.dbPool, async (client) => { + const text = ctx.message.text; + + if (isDirectMessage(ctx)) { + return await ctx.reply( + `I can only forward messages in a group chat` + ); + } else { + const messageThreadId = ctx.message?.message_thread_id; + + try { + // Check to see if message is from a topic in the forwarding table + const forwardResults = await fetchTelegramTopicForwarding( + client, + ctx.chat.id, + messageThreadId ); - } else { - const messageThreadId = ctx.message?.message_thread_id; - - try { - // Check to see if message is from a topic in the forwarding table - const forwardResults = await fetchTelegramTopicForwarding( - client, - ctx.chat.id, - messageThreadId - ); - if (forwardResults?.length > 0) { - const sentMessages = forwardResults.map((f) => { - const destinationTopicID = f.receiverTopicID - ? parseInt(f.receiverTopicID) - : undefined; - logger( - `[TElEGRAM] forwarding message ${text} to ${f.receiverTopicName}` - ); - return ctx.api.forwardMessage( - f.receiverChatID, - f.senderChatID, - ctx.message.message_id, - { - message_thread_id: destinationTopicID - } - ); - }); - await Promise.allSettled(sentMessages); - } - } catch (error) { - ctx.reply(`${error}`, { reply_to_message_id: messageThreadId }); + if (forwardResults?.length > 0) { + const sentMessages = forwardResults.map((f) => { + const destinationTopicID = f.receiverTopicID + ? parseInt(f.receiverTopicID) + : undefined; + logger( + `[TElEGRAM] forwarding message ${text} to ${f.receiverTopicName}` + ); + return ctx.api.forwardMessage( + f.receiverChatID, + f.senderChatID, + ctx.message.message_id, + { + message_thread_id: destinationTopicID + } + ); + }); + await Promise.allSettled(sentMessages); } + } catch (error) { + ctx.reply(`${error}`, { reply_to_message_id: messageThreadId }); } } - ); + }); }); } @@ -1467,140 +1429,137 @@ export class TelegramService { topicId: string ): Promise { return traced("telegram", "handleSendAnonymousMessage", async (span) => { - return namedSqlTransaction( - this.context.dbPool, - "handleSendAnonymousMessage", - async (client) => { - span?.setAttribute("topicId", topicId); - span?.setAttribute("message", rawMessage); + return sqlQueryWithPool(this.context.dbPool, async (client) => { + span?.setAttribute("topicId", topicId); + span?.setAttribute("message", rawMessage); - logger("[TELEGRAM] Verifying anonymous message"); + logger("[TELEGRAM] Verifying anonymous message"); - const pcd = await this.verifyZKEdDSAEventTicketPCD( - serializedZKEdDSATicket - ); + const pcd = await this.verifyZKEdDSAEventTicketPCD( + serializedZKEdDSATicket + ); - if (!pcd) { - throw new Error("Could not verify PCD for anonymous message"); - } + if (!pcd) { + throw new Error("Could not verify PCD for anonymous message"); + } - const { watermark, validEventIds, externalNullifier, nullifierHash } = - pcd.claim; + const { watermark, validEventIds, externalNullifier, nullifierHash } = + pcd.claim; - if (!validEventIds) { - throw new Error(`User did not submit any valid event ids`); - } - - const eventsByChat = await fetchTelegramEventsByChatId( - client, - telegramChatId - ); - if (eventsByChat.length === 0) - throw new Error(`No valid events found for given chat`); - if (!verifyUserEventIds(eventsByChat, validEventIds)) { - throw new Error(`User submitted event Ids are invalid `); - } + if (!validEventIds) { + throw new Error(`User did not submit any valid event ids`); + } - span?.setAttribute("chatId", telegramChatId); + const eventsByChat = await fetchTelegramEventsByChatId( + client, + telegramChatId + ); + if (eventsByChat.length === 0) + throw new Error(`No valid events found for given chat`); + if (!verifyUserEventIds(eventsByChat, validEventIds)) { + throw new Error(`User submitted event Ids are invalid `); + } - if (!watermark) { - throw new Error("Anonymous message PCD did not contain watermark"); - } + span?.setAttribute("chatId", telegramChatId); - if ( - getMessageWatermark(rawMessage).toString() !== watermark.toString() - ) { - throw new Error( - `Anonymous message string ${rawMessage} didn't match watermark. got ${watermark} and expected ${getMessageWatermark( - rawMessage - ).toString()}` - ); - } + if (!watermark) { + throw new Error("Anonymous message PCD did not contain watermark"); + } - logger( - `[TELEGRAM] Verified PCD for anonynmous message with events ${validEventIds}` + if ( + getMessageWatermark(rawMessage).toString() !== watermark.toString() + ) { + throw new Error( + `Anonymous message string ${rawMessage} didn't match watermark. got ${watermark} and expected ${getMessageWatermark( + rawMessage + ).toString()}` ); + } - const topic = await fetchTelegramTopic( - client, - parseInt(telegramChatId), - parseInt(topicId) - ); + logger( + `[TELEGRAM] Verified PCD for anonynmous message with events ${validEventIds}` + ); - if (!topic || !topic.is_anon_topic || !topic.topic_id) { - throw new Error(`this group doesn't have any anon topics`); - } + const topic = await fetchTelegramTopic( + client, + parseInt(telegramChatId), + parseInt(topicId) + ); - // The event is linked to a chat. Make sure we can access it. - const chat = await getGroupChat(this.anonBot.api, telegramChatId); - span?.setAttribute("chatTitle", chat.title); + if (!topic || !topic.is_anon_topic || !topic.topic_id) { + throw new Error(`this group doesn't have any anon topics`); + } - if (!nullifierHash) throw new Error(`Nullifier hash not found`); + // The event is linked to a chat. Make sure we can access it. + const chat = await getGroupChat(this.anonBot.api, telegramChatId); + span?.setAttribute("chatTitle", chat.title); - const expectedExternalNullifier = getAnonTopicNullifier().toString(); + if (!nullifierHash) throw new Error(`Nullifier hash not found`); + + const expectedExternalNullifier = getAnonTopicNullifier().toString(); + + if (externalNullifier !== expectedExternalNullifier) + throw new Error("Nullifier mismatch - try proving again."); + + const nullifierData = await fetchAnonTopicNullifier( + client, + nullifierHash, + topic.id + ); - if (externalNullifier !== expectedExternalNullifier) - throw new Error("Nullifier mismatch - try proving again."); + const currentTime = new Date(); + const timestamp = currentTime.toISOString(); - const nullifierData = await fetchAnonTopicNullifier( + if (!nullifierData) { + await insertOrUpdateTelegramNullifier( client, nullifierHash, + [timestamp], topic.id ); + } else { + const timestamps = nullifierData.message_timestamps.map((t) => + new Date(t).getTime() + ); + const maxDailyPostsPerTopic = parseInt( + process.env.MAX_DAILY_ANON_TOPIC_POSTS_PER_USER ?? "3" + ); + const rlDuration = ONE_HOUR_MS * 24; + const { rateLimitExceeded, newTimestamps } = + checkSlidingWindowRateLimit( + timestamps, + maxDailyPostsPerTopic, + rlDuration + ); + span?.setAttribute("rateLimitExceeded", rateLimitExceeded); - const currentTime = new Date(); - const timestamp = currentTime.toISOString(); - - if (!nullifierData) { + if (!rateLimitExceeded) { await insertOrUpdateTelegramNullifier( client, nullifierHash, - [timestamp], + newTimestamps, topic.id ); } else { - const timestamps = nullifierData.message_timestamps.map((t) => - new Date(t).getTime() - ); - const maxDailyPostsPerTopic = parseInt( - process.env.MAX_DAILY_ANON_TOPIC_POSTS_PER_USER ?? "3" + const rlError = new Error( + `You have exceeded the daily limit of ${maxDailyPostsPerTopic} messages for this topic.` ); - const rlDuration = ONE_HOUR_MS * 24; - const { rateLimitExceeded, newTimestamps } = - checkSlidingWindowRateLimit( - timestamps, - maxDailyPostsPerTopic, - rlDuration - ); - span?.setAttribute("rateLimitExceeded", rateLimitExceeded); - - if (!rateLimitExceeded) { - await insertOrUpdateTelegramNullifier( - client, - nullifierHash, - newTimestamps, - topic.id - ); - } else { - const rlError = new Error( - `You have exceeded the daily limit of ${maxDailyPostsPerTopic} messages for this topic.` - ); - rlError.name = "Rate limit exceeded"; - throw rlError; - } + rlError.name = "Rate limit exceeded"; + throw rlError; } + } - const payloadData: NullifierHashPayload = { - type: PayloadType.NullifierHash, - value: BigInt(nullifierHash).toString() - }; + const payloadData: NullifierHashPayload = { + type: PayloadType.NullifierHash, + value: BigInt(nullifierHash).toString() + }; - const encodedPayload = Buffer.from( - JSON.stringify(payloadData), - "utf-8" - ).toString("base64"); + const encodedPayload = Buffer.from( + JSON.stringify(payloadData), + "utf-8" + ).toString("base64"); - const formattedMessage = ` + const formattedMessage = ` ${bigIntToPseudonymEmoji(BigInt(nullifierHash))} ${bigIntToPseudonymName( @@ -1609,46 +1568,45 @@ export class TelegramService { "en-GB" )}\n----------------------------------------------------------`; - const anonMessageId = uuidV1(); - - const payloads = getDisplayEmojis().map((emoji) => { - return { - emoji, - payload: encodePayload(buildReactPayload(emoji, anonMessageId)) - }; - }); - const link = process.env.TELEGRAM_ANON_BOT_DIRECT_LINK; - const buttons: InlineKeyboardButton[] = payloads.map((p) => { - return { - text: `${p.emoji}`, - url: `${link}?startApp=${p.payload}&startapp=${p.payload}` - }; - }); + const anonMessageId = uuidV1(); - const replyMarkup: InlineKeyboardMarkup = { - inline_keyboard: [buttons] + const payloads = getDisplayEmojis().map((emoji) => { + return { + emoji, + payload: encodePayload(buildReactPayload(emoji, anonMessageId)) }; - const message = await this.sendToAnonymousChannel( - client, - chat.id, - parseInt(topic.topic_id), - formattedMessage, - replyMarkup - ); - if (!message) throw new Error(`Failed to send telegram message`); + }); + const link = process.env.TELEGRAM_ANON_BOT_DIRECT_LINK; + const buttons: InlineKeyboardButton[] = payloads.map((p) => { + return { + text: `${p.emoji}`, + url: `${link}?startApp=${p.payload}&startapp=${p.payload}` + }; + }); - await insertTelegramAnonMessage( - client, - anonMessageId, - nullifierHash, - topic.id, - rawMessage, - serializedZKEdDSATicket, - timestamp, - message.message_id - ); - } - ); + const replyMarkup: InlineKeyboardMarkup = { + inline_keyboard: [buttons] + }; + const message = await this.sendToAnonymousChannel( + client, + chat.id, + parseInt(topic.topic_id), + formattedMessage, + replyMarkup + ); + if (!message) throw new Error(`Failed to send telegram message`); + + await insertTelegramAnonMessage( + client, + anonMessageId, + nullifierHash, + topic.id, + rawMessage, + serializedZKEdDSATicket, + timestamp, + message.message_id + ); + }); }); } @@ -1660,49 +1618,45 @@ export class TelegramService { "telegram", "handleRequestAnonymousMessageLink", async (span) => { - return namedSqlTransaction( - this.context.dbPool, - "handleRequestAnonymousMessageLink", - async (client) => { - // Confirm that topicId exists and is anonymous - const topic = await fetchTelegramTopic( - client, - telegramChatId, - topicId - ); + return sqlQueryWithPool(this.context.dbPool, async (client) => { + // Confirm that topicId exists and is anonymous + const topic = await fetchTelegramTopic( + client, + telegramChatId, + topicId + ); - if (!topic || !topic.is_anon_topic || !topic.topic_id) - throw new Error(`No anonymous topic found`); + if (!topic || !topic.is_anon_topic || !topic.topic_id) + throw new Error(`No anonymous topic found`); - // Get valid eventIds for this chat - const telegramEvents = await fetchTelegramEventsByChatId( - client, - telegramChatId - ); - if (telegramEvents.length === 0) - throw new Error(`No events associated with this group`); - - const validEventIds = telegramEvents.map((e) => e.ticket_event_id); - - const encodedTopicData = encodeTopicData({ - type: PayloadType.AnonTopicDataPayload, - value: { - chatId: telegramChatId, - topicName: topic.topic_name, - topicId: parseInt(topic.topic_id), - validEventIds - } - }); + // Get valid eventIds for this chat + const telegramEvents = await fetchTelegramEventsByChatId( + client, + telegramChatId + ); + if (telegramEvents.length === 0) + throw new Error(`No events associated with this group`); + + const validEventIds = telegramEvents.map((e) => e.ticket_event_id); + + const encodedTopicData = encodeTopicData({ + type: PayloadType.AnonTopicDataPayload, + value: { + chatId: telegramChatId, + topicName: topic.topic_name, + topicId: parseInt(topic.topic_id), + validEventIds + } + }); - const url = `${process.env.TELEGRAM_ANON_WEBSITE}?tgWebAppStartParam=${encodedTopicData}`; - span?.setAttribute(`redirect url`, url); - logger( - `[TELEGRAM] generated redirect url to ${process.env.TELEGRAM_ANON_WEBSITE}` - ); + const url = `${process.env.TELEGRAM_ANON_WEBSITE}?tgWebAppStartParam=${encodedTopicData}`; + span?.setAttribute(`redirect url`, url); + logger( + `[TELEGRAM] generated redirect url to ${process.env.TELEGRAM_ANON_WEBSITE}` + ); - return url; - } - ); + return url; + }); } ); } diff --git a/apps/passport-server/src/util/telegramHelpers.ts b/apps/passport-server/src/util/telegramHelpers.ts index 2624e42fea..61fb64fc3b 100644 --- a/apps/passport-server/src/util/telegramHelpers.ts +++ b/apps/passport-server/src/util/telegramHelpers.ts @@ -52,7 +52,7 @@ import { insertTelegramEvent, insertTelegramForward } from "../database/queries/telegram/insertTelegramConversation"; -import { namedSqlTransaction, sqlTransaction } from "../database/sqlQuery"; +import { sqlQueryWithPool } from "../database/sqlQuery"; import { traced } from "../services/telemetryService"; import { generateFrogProofUrl } from "./frogTelegramHelpers"; import { logger } from "./logger"; @@ -622,7 +622,7 @@ export const eventsToLink = async ( .text( `Yes, ${event.isLinkedToCurrentChat ? "remove" : "add"}`, async (ctx) => { - await sqlTransaction(ctx.session.dbPool, async (client) => { + await sqlQueryWithPool(ctx.session.dbPool, async (client) => { let replyText = ""; if (!(await senderIsAdmin(ctx))) return; @@ -645,7 +645,7 @@ export const eventsToLink = async ( } // Otherwise, display all events to add or remove. else { - const events = await sqlTransaction(ctx.session.dbPool, (client) => + const events = await sqlQueryWithPool(ctx.session.dbPool, (client) => fetchEventsWithTelegramChats(client, true, chatId) ); @@ -688,7 +688,7 @@ export const chatsToJoin = async ( } span?.setAttribute("userId", userId?.toString()); - const chatsWithMembership = await sqlTransaction( + const chatsWithMembership = await sqlQueryWithPool( ctx.session.dbPool, (client) => getChatsWithMembershipStatus(client, ctx, userId) ); @@ -771,7 +771,7 @@ export const chatsToJoinV2 = async ( }); } } else { - const chatsWithMembership = await sqlTransaction( + const chatsWithMembership = await sqlQueryWithPool( ctx.session.dbPool, (client) => getChatsWithMembershipStatus(client, ctx, userId) ); @@ -834,7 +834,7 @@ export const chatsToPostIn = async ( const chat = ctx.session.selectedChat; // Fetch anon topics for the selected chat - const { topics, telegramEvents } = await sqlTransaction( + const { topics, telegramEvents } = await sqlQueryWithPool( ctx.session.dbPool, async (client) => { const topics = await fetchTelegramAnonTopicsByChatId( @@ -896,7 +896,7 @@ export const chatsToPostIn = async ( // Otherwise, give the user a list of chats that they are members of. else { // Only show chats a user is in - const chatsWithMembership = await sqlTransaction( + const chatsWithMembership = await sqlQueryWithPool( ctx.session.dbPool, async (client) => (await getChatsWithMembershipStatus(client, ctx, userId)).filter( @@ -971,7 +971,7 @@ export const chatsToForwardTo = async ( return; } - await namedSqlTransaction(db, "chatsToForwardTo", async (client) => { + await sqlQueryWithPool(db, async (client) => { const userId = ctx.from?.id; if (!userId) { range.text(`User not found. Try again...`); diff --git a/apps/passport-server/test/semaphore/checkSemaphore.ts b/apps/passport-server/test/semaphore/checkSemaphore.ts index a81e6778d6..75daa50277 100644 --- a/apps/passport-server/test/semaphore/checkSemaphore.ts +++ b/apps/passport-server/test/semaphore/checkSemaphore.ts @@ -2,7 +2,7 @@ import { deserializeSemaphoreGroup } from "@pcd/semaphore-group-pcd"; import { BigNumberish, Group } from "@semaphore-protocol/group"; import { expect } from "chai"; import { fetchLatestHistoricSemaphoreGroups } from "../../src/database/queries/historicSemaphore"; -import { sqlTransaction } from "../../src/database/sqlQuery"; +import { sqlQueryWithPool } from "../../src/database/sqlQuery"; import { Zupass } from "../../src/types"; export interface SemaphoreGroups { @@ -97,7 +97,7 @@ function getCurrentSemaphoreServiceGroups( async function getLatestHistoricalSemaphoreGroups( application: Zupass ): Promise { - const latestGroups = await sqlTransaction( + const latestGroups = await sqlQueryWithPool( application.context.dbPool, (client) => fetchLatestHistoricSemaphoreGroups(client) ); diff --git a/apps/passport-server/test/user/testLogin.ts b/apps/passport-server/test/user/testLogin.ts index f8e35c500c..0ae1c4b456 100644 --- a/apps/passport-server/test/user/testLogin.ts +++ b/apps/passport-server/test/user/testLogin.ts @@ -15,7 +15,7 @@ import { randomUUID } from "@pcd/util"; import { Identity } from "@semaphore-protocol/identity"; import { expect } from "chai"; import { randomBytes } from "crypto"; -import { sqlTransaction } from "../../src/database/sqlQuery"; +import { sqlQueryWithPool } from "../../src/database/sqlQuery"; import { Zupass } from "../../src/types"; export async function testLogin( @@ -75,7 +75,7 @@ export async function testLogin( } token = confirmationEmailResult.value.devToken; } else { - const serverToken = await sqlTransaction( + const serverToken = await sqlQueryWithPool( application.context.dbPool, (client) => emailTokenService.getTokenForEmail(client, email) ); diff --git a/apps/passport-server/test/util/rateLimit.ts b/apps/passport-server/test/util/rateLimit.ts index 322f35b179..28d05df738 100644 --- a/apps/passport-server/test/util/rateLimit.ts +++ b/apps/passport-server/test/util/rateLimit.ts @@ -1,11 +1,11 @@ import { Pool } from "postgres-pool"; -import { sqlQuery, sqlTransaction } from "../../src/database/sqlQuery"; +import { sqlQuery, sqlQueryWithPool } from "../../src/database/sqlQuery"; /** * Deletes the rate limit buckets, resetting all rate limits. */ export async function resetRateLimitBuckets(pool: Pool): Promise { - await sqlTransaction(pool, (client) => + await sqlQueryWithPool(pool, (client) => sqlQuery(client, "DELETE FROM rate_limit_buckets") ); }