Skip to content

Commit

Permalink
Get server information and round-robin calls to servers when exporting
Browse files Browse the repository at this point in the history
Close #8129
  • Loading branch information
wrdhub committed Dec 19, 2024
1 parent 41933f8 commit 4d40737
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 53 deletions.
2 changes: 2 additions & 0 deletions packages/tuta-wasm-loader/lib/WasmHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ async function generateWasm(command: string, options: WasmGeneratorOptions) {
...process.env,
...options.env,
},
maxBuffer: Infinity,
cwd: options.workingDir ?? process.cwd(),
})
promise.child.stdout?.on("data", (data) => {
Expand All @@ -37,6 +38,7 @@ async function generateWasmFallback(wasmFilePath: string, options: WasmGenerator
...process.env,
...options.env,
},
maxBuffer: Infinity,
})
return result.stdout
}
Expand Down
24 changes: 18 additions & 6 deletions src/common/api/worker/facades/lazy/MailExportFacade.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import { BlobFacade } from "./BlobFacade.js"
import { CryptoFacade } from "../../crypto/CryptoFacade.js"
import { createReferencingInstance } from "../../../common/utils/BlobUtils.js"
import { MailExportTokenFacade } from "./MailExportTokenFacade.js"
import { BlobAccessTokenFacade } from "../BlobAccessTokenFacade"
import { BlobServerUrl } from "../../../entities/storage/TypeRefs"
import { Group } from "../../../entities/sys/TypeRefs"

assertWorkerOrNode()

Expand All @@ -28,31 +31,40 @@ export class MailExportFacade {
private readonly bulkMailLoader: BulkMailLoader,
private readonly blobFacade: BlobFacade,
private readonly cryptoFacade: CryptoFacade,
private readonly blobAccessTokenFacade: BlobAccessTokenFacade,
) {}

async loadFixedNumberOfMailsWithCache(mailListId: Id, startId: Id): Promise<Mail[]> {
/**
* Returns a list of servers that can be used to request data from.
*/
async getExportServers(group: Group): Promise<BlobServerUrl[]> {
const blobServerAccessInfo = await this.blobAccessTokenFacade.requestWriteToken(ArchiveDataType.Attachments, group._id)
return blobServerAccessInfo.servers
}

async loadFixedNumberOfMailsWithCache(mailListId: Id, startId: Id, baseUrl: string): Promise<Mail[]> {
return this.mailExportTokenFacade.loadWithToken((token) =>
this.bulkMailLoader.loadFixedNumberOfMailsWithCache(mailListId, startId, this.options(token)),
this.bulkMailLoader.loadFixedNumberOfMailsWithCache(mailListId, startId, { baseUrl, ...this.options(token) }),
)
}

async loadMailDetails(mails: readonly Mail[]): Promise<MailWithMailDetails[]> {
return this.mailExportTokenFacade.loadWithToken((token) => this.bulkMailLoader.loadMailDetails(mails, this.options(token)))
}

async loadAttachments(mails: readonly Mail[]): Promise<TutanotaFile[]> {
return this.mailExportTokenFacade.loadWithToken((token) => this.bulkMailLoader.loadAttachments(mails, this.options(token)))
async loadAttachments(mails: readonly Mail[], baseUrl: string): Promise<TutanotaFile[]> {
return this.mailExportTokenFacade.loadWithToken((token) => this.bulkMailLoader.loadAttachments(mails, { baseUrl, ...this.options(token) }))
}

async loadAttachmentData(mail: Mail, attachments: readonly TutanotaFile[]): Promise<DataFile[]> {
async loadAttachmentData(mail: Mail, attachments: readonly TutanotaFile[], baseUrl: string): Promise<DataFile[]> {
const attachmentsWithKeys = await this.cryptoFacade.enforceSessionKeyUpdateIfNeeded(mail, attachments)
// TODO: download attachments efficiently.
// - download multiple blobs at once if possible
// - use file references instead of data files (introduce a similar type to MailBundle or change MailBundle)
const attachmentData = await promiseMap(attachmentsWithKeys, async (attachment) => {
try {
const bytes = await this.mailExportTokenFacade.loadWithToken((token) =>
this.blobFacade.downloadAndDecrypt(ArchiveDataType.Attachments, createReferencingInstance(attachment), this.options(token)),
this.blobFacade.downloadAndDecrypt(ArchiveDataType.Attachments, createReferencingInstance(attachment), { baseUrl, ...this.options(token) }),
)
return convertToDataFile(attachment, bytes)
} catch (e) {
Expand Down
4 changes: 4 additions & 0 deletions src/common/api/worker/rest/EntityRestClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ export interface EntityRestClientLoadOptions {
ownerKeyProvider?: OwnerKeyProvider
/** Defaults to {@link CacheMode.ReadAndWrite }*/
cacheMode?: CacheMode
baseUrl?: string
}

export interface OwnerEncSessionKeyProvider {
Expand Down Expand Up @@ -200,6 +201,7 @@ export class EntityRestClient implements EntityRestInterface {
queryParams,
headers,
responseType: MediaType.Json,
baseUrl: opts.baseUrl,
})
const entity = JSON.parse(json)
const migratedEntity = await this._crypto.applyMigrations(typeRef, entity)
Expand Down Expand Up @@ -254,6 +256,7 @@ export class EntityRestClient implements EntityRestInterface {
queryParams,
headers,
responseType: MediaType.Json,
baseUrl: opts.baseUrl,
})
return this._handleLoadMultipleResult(typeRef, JSON.parse(json))
}
Expand Down Expand Up @@ -281,6 +284,7 @@ export class EntityRestClient implements EntityRestInterface {
queryParams,
headers,
responseType: MediaType.Json,
baseUrl: opts.baseUrl,
})
}
return this._handleLoadMultipleResult(typeRef, JSON.parse(json), ownerEncSessionKeyProvider)
Expand Down
22 changes: 18 additions & 4 deletions src/mail-app/native/main/MailExportController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import type { TranslationText } from "../../../common/misc/LanguageViewModel"
import { SuspensionError } from "../../../common/api/common/error/SuspensionError"
import { Scheduler } from "../../../common/api/common/utils/Scheduler"
import { ExportError, ExportErrorReason } from "../../../common/api/common/error/ExportError"
import { BlobServerUrl } from "../../../common/api/entities/storage/TypeRefs"

export type MailExportState =
| { type: "idle" }
Expand All @@ -34,6 +35,8 @@ const TAG = "MailboxExport"
export class MailExportController {
private _state: Stream<MailExportState> = stream({ type: "idle" })
private _lastExport: Date | null = null
private servers?: BlobServerUrl[]
private serverCount: number = 0

get lastExport(): Date | null {
return this._lastExport
Expand Down Expand Up @@ -142,6 +145,7 @@ export class MailExportController {

private async runExport(mailboxDetail: MailboxDetail, mailBags: MailBag[], mailId: Id) {
const startTime = assertNotNull(this._lastExport)
this.servers = await this.mailExportFacade.getExportServers(mailboxDetail.mailGroup)
for (const mailBag of mailBags) {
await this.exportMailBag(mailBag, mailId)
if (this._state().type !== "exporting" || this._lastExport !== startTime) {
Expand All @@ -157,25 +161,24 @@ export class MailExportController {
}

private async exportMailBag(mailBag: MailBag, startId: Id): Promise<void> {
console.log(TAG, `Exporting mail bag: ${mailBag._id} ${startId}`)
let currentStartId = startId
while (true) {
try {
const downloadedMails = await this.mailExportFacade.loadFixedNumberOfMailsWithCache(mailBag.mails, currentStartId)
const downloadedMails = await this.mailExportFacade.loadFixedNumberOfMailsWithCache(mailBag.mails, currentStartId, this.getServerUrl())
if (downloadedMails.length === 0) {
break
}

const downloadedMailDetails = await this.mailExportFacade.loadMailDetails(downloadedMails)
const attachmentInfo = await this.mailExportFacade.loadAttachments(downloadedMails)
const attachmentInfo = await this.mailExportFacade.loadAttachments(downloadedMails, this.getServerUrl())
for (const { mail, mailDetails } of downloadedMailDetails) {
if (this._state().type !== "exporting") {
return
}
const mailAttachmentInfo = mail.attachments
.map((attachmentId) => attachmentInfo.find((attachment) => isSameId(attachment._id, attachmentId)))
.filter(isNotNull)
const attachments = await this.mailExportFacade.loadAttachmentData(mail, mailAttachmentInfo)
const attachments = await this.mailExportFacade.loadAttachmentData(mail, mailAttachmentInfo, this.getServerUrl())
const { makeMailBundle } = await import("../../mail/export/Bundler.js")
const mailBundle = makeMailBundle(this.sanitizer, mail, mailDetails, attachments)

Expand Down Expand Up @@ -219,4 +222,15 @@ export class MailExportController {
}
}
}

private getServerUrl(): string {
if (this.servers) {
this.serverCount += 1
if (this.serverCount >= this.servers.length) {
this.serverCount = 0
}
return this.servers[this.serverCount].url
}
throw new Error("No servers")
}
}
2 changes: 1 addition & 1 deletion src/mail-app/workerUtils/worker/WorkerLocator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ export async function initLocator(worker: WorkerImpl, browserData: BrowserData)
const { MailExportFacade } = await import("../../../common/api/worker/facades/lazy/MailExportFacade.js")
const { MailExportTokenFacade } = await import("../../../common/api/worker/facades/lazy/MailExportTokenFacade.js")
const mailExportTokenFacade = new MailExportTokenFacade(locator.serviceExecutor)
return new MailExportFacade(mailExportTokenFacade, await locator.bulkMailLoader(), await locator.blob(), locator.crypto)
return new MailExportFacade(mailExportTokenFacade, await locator.bulkMailLoader(), await locator.blob(), locator.crypto, locator.blobAccessToken)
})
}

Expand Down
30 changes: 21 additions & 9 deletions test/tests/api/worker/facades/MailExportFacadeTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import { MailExportTokenFacade } from "../../../../../src/common/api/worker/faca
import { BulkMailLoader } from "../../../../../src/mail-app/workerUtils/index/BulkMailLoader.js"
import { BlobFacade } from "../../../../../src/common/api/worker/facades/lazy/BlobFacade.js"
import { CryptoFacade } from "../../../../../src/common/api/worker/crypto/CryptoFacade.js"
import { object, when } from "testdouble"
import { instance, object, when } from "testdouble"
import { createTestEntity } from "../../../TestUtils.js"
import { FileTypeRef, MailDetailsTypeRef, MailTypeRef } from "../../../../../src/common/api/entities/tutanota/TypeRefs.js"
import { ArchiveDataType } from "../../../../../src/common/api/common/TutanotaConstants"
import { createReferencingInstance } from "../../../../../src/common/api/common/utils/BlobUtils"
import { BlobAccessTokenFacade } from "../../../../../src/common/api/worker/facades/BlobAccessTokenFacade"

o.spec("MailExportFacade", () => {
const token = "my token"
Expand All @@ -23,6 +24,7 @@ o.spec("MailExportFacade", () => {
let bulkMailLoader!: BulkMailLoader
let blobFacade!: BlobFacade
let cryptoFacade!: CryptoFacade
let blobAccessTokenFacade!: BlobAccessTokenFacade

o.beforeEach(() => {
tokenFacade = {
Expand All @@ -31,13 +33,17 @@ o.spec("MailExportFacade", () => {
bulkMailLoader = object()
blobFacade = object()
cryptoFacade = object()
facade = new MailExportFacade(tokenFacade, bulkMailLoader, blobFacade, cryptoFacade)
blobAccessTokenFacade = instance(BlobAccessTokenFacade)
facade = new MailExportFacade(tokenFacade, bulkMailLoader, blobFacade, cryptoFacade, blobAccessTokenFacade)
})

o.test("loadFixedNumberOfMailsWithCache", async () => {
when(bulkMailLoader.loadFixedNumberOfMailsWithCache("mailListId", "startId", { extraHeaders: tokenHeaders })).thenResolve([mail1, mail2])
when(bulkMailLoader.loadFixedNumberOfMailsWithCache("mailListId", "startId", { baseUrl: "baseUrl", extraHeaders: tokenHeaders })).thenResolve([
mail1,
mail2,
])

const result = await facade.loadFixedNumberOfMailsWithCache("mailListId", "startId")
const result = await facade.loadFixedNumberOfMailsWithCache("mailListId", "startId", "baseUrl")

o(result).deepEquals([mail1, mail2])
})
Expand All @@ -56,9 +62,9 @@ o.spec("MailExportFacade", () => {

o.test("loadAttachments", async () => {
const expected = [createTestEntity(FileTypeRef), createTestEntity(FileTypeRef)]
when(bulkMailLoader.loadAttachments([mail1, mail2], { extraHeaders: tokenHeaders })).thenResolve(expected)
when(bulkMailLoader.loadAttachments([mail1, mail2], { baseUrl: "baseUrl", extraHeaders: tokenHeaders })).thenResolve(expected)

const result = await facade.loadAttachments([mail1, mail2])
const result = await facade.loadAttachments([mail1, mail2], "baseUrl")

o(result).deepEquals(expected)
})
Expand All @@ -73,13 +79,19 @@ o.spec("MailExportFacade", () => {

when(cryptoFacade.enforceSessionKeyUpdateIfNeeded(mail1, mailAttachments)).thenResolve(mailAttachments)
when(
blobFacade.downloadAndDecrypt(ArchiveDataType.Attachments, createReferencingInstance(mailAttachments[0]), { extraHeaders: tokenHeaders }),
blobFacade.downloadAndDecrypt(ArchiveDataType.Attachments, createReferencingInstance(mailAttachments[0]), {
baseUrl: "baseUrl",
extraHeaders: tokenHeaders,
}),
).thenResolve(dataByteMail1)
when(
blobFacade.downloadAndDecrypt(ArchiveDataType.Attachments, createReferencingInstance(mailAttachments[1]), { extraHeaders: tokenHeaders }),
blobFacade.downloadAndDecrypt(ArchiveDataType.Attachments, createReferencingInstance(mailAttachments[1]), {
baseUrl: "baseUrl",
extraHeaders: tokenHeaders,
}),
).thenResolve(dataByteMail2)

const result = await facade.loadAttachmentData(mail1, mailAttachments)
const result = await facade.loadAttachmentData(mail1, mailAttachments, "baseUrl")

o(result).deepEquals([
{ _type: "DataFile", name: "mail1", mimeType: "img/png", data: dataByteMail1, cid: "12345", size: 3, id: ["attachment", "id1"] },
Expand Down
Loading

0 comments on commit 4d40737

Please sign in to comment.