Skip to content

Commit

Permalink
Get server information and round-robin calls to servers when exporting
Browse files Browse the repository at this point in the history
Close #8129
  • Loading branch information
wrdhub committed Dec 20, 2024
1 parent 153329f commit 46c805b
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 52 deletions.
2 changes: 2 additions & 0 deletions packages/tuta-wasm-loader/lib/WasmHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ async function generateWasm(command: string, options: WasmGeneratorOptions) {
...process.env,
...options.env,
},
maxBuffer: Infinity,
cwd: options.workingDir ?? process.cwd(),
})
promise.child.stdout?.on("data", (data) => {
Expand All @@ -37,6 +38,7 @@ async function generateWasmFallback(wasmFilePath: string, options: WasmGenerator
...process.env,
...options.env,
},
maxBuffer: Infinity,
})
return result.stdout
}
Expand Down
27 changes: 21 additions & 6 deletions src/common/api/worker/facades/lazy/MailExportFacade.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import { MailExportTokenFacade } from "./MailExportTokenFacade.js"
import { assertNotNull, isNotNull } from "@tutao/tutanota-utils"
import { NotFoundError } from "../../../common/error/RestError"
import { elementIdPart } from "../../../common/utils/EntityUtils"
import { BlobAccessTokenFacade } from "../BlobAccessTokenFacade"
import { BlobServerUrl } from "../../../entities/storage/TypeRefs"
import { Group } from "../../../entities/sys/TypeRefs"

assertWorkerOrNode()

Expand All @@ -29,28 +32,40 @@ export class MailExportFacade {
private readonly bulkMailLoader: BulkMailLoader,
private readonly blobFacade: BlobFacade,
private readonly cryptoFacade: CryptoFacade,
private readonly blobAccessTokenFacade: BlobAccessTokenFacade,
) {}

async loadFixedNumberOfMailsWithCache(mailListId: Id, startId: Id): Promise<Mail[]> {
/**
* Returns a list of servers that can be used to request data from.
*/
async getExportServers(group: Group): Promise<BlobServerUrl[]> {
const blobServerAccessInfo = await this.blobAccessTokenFacade.requestWriteToken(ArchiveDataType.Attachments, group._id)
return blobServerAccessInfo.servers
}

async loadFixedNumberOfMailsWithCache(mailListId: Id, startId: Id, baseUrl: string): Promise<Mail[]> {
return this.mailExportTokenFacade.loadWithToken((token) =>
this.bulkMailLoader.loadFixedNumberOfMailsWithCache(mailListId, startId, this.options(token)),
this.bulkMailLoader.loadFixedNumberOfMailsWithCache(mailListId, startId, { baseUrl, ...this.options(token) }),
)
}

async loadMailDetails(mails: readonly Mail[]): Promise<MailWithMailDetails[]> {
return this.mailExportTokenFacade.loadWithToken((token) => this.bulkMailLoader.loadMailDetails(mails, this.options(token)))
}

async loadAttachments(mails: readonly Mail[]): Promise<TutanotaFile[]> {
return this.mailExportTokenFacade.loadWithToken((token) => this.bulkMailLoader.loadAttachments(mails, this.options(token)))
async loadAttachments(mails: readonly Mail[], baseUrl: string): Promise<TutanotaFile[]> {
return this.mailExportTokenFacade.loadWithToken((token) => this.bulkMailLoader.loadAttachments(mails, { baseUrl, ...this.options(token) }))
}

async loadAttachmentData(mail: Mail, attachments: readonly TutanotaFile[]): Promise<DataFile[]> {
async loadAttachmentData(mail: Mail, attachments: readonly TutanotaFile[], baseUrl: string): Promise<DataFile[]> {
const attachmentsWithKeys = await this.cryptoFacade.enforceSessionKeyUpdateIfNeeded(mail, attachments)

const downloads = await this.mailExportTokenFacade.loadWithToken((token) => {
const referencingInstances = attachmentsWithKeys.map(createReferencingInstance)
return this.blobFacade.downloadAndDecryptBlobsOfMultipleInstances(ArchiveDataType.Attachments, referencingInstances, this.options(token))
return this.blobFacade.downloadAndDecryptBlobsOfMultipleInstances(ArchiveDataType.Attachments, referencingInstances, {
baseUrl,
...this.options(token),
})
})

const attachmentData = Array.from(downloads.entries()).map(([fileId, bytes]) => {
Expand Down
4 changes: 4 additions & 0 deletions src/common/api/worker/rest/EntityRestClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ export interface EntityRestClientLoadOptions {
ownerKeyProvider?: OwnerKeyProvider
/** Defaults to {@link CacheMode.ReadAndWrite }*/
cacheMode?: CacheMode
baseUrl?: string
}

export interface OwnerEncSessionKeyProvider {
Expand Down Expand Up @@ -204,6 +205,7 @@ export class EntityRestClient implements EntityRestInterface {
queryParams,
headers,
responseType: MediaType.Json,
baseUrl: opts.baseUrl,
})
const entity = JSON.parse(json)
const migratedEntity = await this._crypto.applyMigrations(typeRef, entity)
Expand Down Expand Up @@ -258,6 +260,7 @@ export class EntityRestClient implements EntityRestInterface {
queryParams,
headers,
responseType: MediaType.Json,
baseUrl: opts.baseUrl,
})
return this._handleLoadMultipleResult(typeRef, JSON.parse(json))
}
Expand Down Expand Up @@ -285,6 +288,7 @@ export class EntityRestClient implements EntityRestInterface {
queryParams,
headers,
responseType: MediaType.Json,
baseUrl: opts.baseUrl,
})
}
return this._handleLoadMultipleResult(typeRef, JSON.parse(json), ownerEncSessionKeyProvider)
Expand Down
23 changes: 18 additions & 5 deletions src/mail-app/native/main/MailExportController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import type { TranslationText } from "../../../common/misc/LanguageViewModel"
import { SuspensionError } from "../../../common/api/common/error/SuspensionError"
import { Scheduler } from "../../../common/api/common/utils/Scheduler"
import { ExportError, ExportErrorReason } from "../../../common/api/common/error/ExportError"
import { BlobServerUrl } from "../../../common/api/entities/storage/TypeRefs"
import { assertMainOrNode } from "../../../common/api/common/Env"

assertMainOrNode()
Expand All @@ -37,6 +38,8 @@ const TAG = "MailboxExport"
export class MailExportController {
private _state: Stream<MailExportState> = stream({ type: "idle" })
private _lastExport: Date | null = null
private servers?: BlobServerUrl[]
private serverCount: number = 0

get lastExport(): Date | null {
return this._lastExport
Expand Down Expand Up @@ -145,6 +148,7 @@ export class MailExportController {

private async runExport(mailboxDetail: MailboxDetail, mailBags: MailBag[], mailId: Id) {
const startTime = assertNotNull(this._lastExport)
this.servers = await this.mailExportFacade.getExportServers(mailboxDetail.mailGroup)
for (const mailBag of mailBags) {
await this.exportMailBag(mailBag, mailId)
if (this._state().type !== "exporting" || this._lastExport !== startTime) {
Expand All @@ -160,26 +164,24 @@ export class MailExportController {
}

private async exportMailBag(mailBag: MailBag, startId: Id): Promise<void> {
console.log(TAG, `Exporting mail bag: ${mailBag._id} ${startId}`)
let currentStartId = startId
while (true) {
try {
const downloadedMails = await this.mailExportFacade.loadFixedNumberOfMailsWithCache(mailBag.mails, currentStartId)
const downloadedMails = await this.mailExportFacade.loadFixedNumberOfMailsWithCache(mailBag.mails, currentStartId, this.getServerUrl())
if (downloadedMails.length === 0) {
break
}

const downloadedMailDetails = await this.mailExportFacade.loadMailDetails(downloadedMails)
const attachmentInfo = await this.mailExportFacade.loadAttachments(downloadedMails)

const attachmentInfo = await this.mailExportFacade.loadAttachments(downloadedMails, this.getServerUrl())
for (const { mail, mailDetails } of downloadedMailDetails) {
if (this._state().type !== "exporting") {
return
}
const mailAttachmentInfo = mail.attachments
.map((attachmentId) => attachmentInfo.find((attachment) => isSameId(attachment._id, attachmentId)))
.filter(isNotNull)
const attachments = await this.mailExportFacade.loadAttachmentData(mail, mailAttachmentInfo)
const attachments = await this.mailExportFacade.loadAttachmentData(mail, mailAttachmentInfo, this.getServerUrl())
const { makeMailBundle } = await import("../../mail/export/Bundler.js")
const mailBundle = makeMailBundle(this.sanitizer, mail, mailDetails, attachments)

Expand Down Expand Up @@ -223,4 +225,15 @@ export class MailExportController {
}
}
}

private getServerUrl(): string {
if (this.servers) {
this.serverCount += 1
if (this.serverCount >= this.servers.length) {
this.serverCount = 0
}
return this.servers[this.serverCount].url
}
throw new Error("No servers")
}
}
2 changes: 1 addition & 1 deletion src/mail-app/workerUtils/worker/WorkerLocator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ export async function initLocator(worker: WorkerImpl, browserData: BrowserData)
const { MailExportFacade } = await import("../../../common/api/worker/facades/lazy/MailExportFacade.js")
const { MailExportTokenFacade } = await import("../../../common/api/worker/facades/lazy/MailExportTokenFacade.js")
const mailExportTokenFacade = new MailExportTokenFacade(locator.serviceExecutor)
return new MailExportFacade(mailExportTokenFacade, await locator.bulkMailLoader(), await locator.blob(), locator.crypto)
return new MailExportFacade(mailExportTokenFacade, await locator.bulkMailLoader(), await locator.blob(), locator.crypto, locator.blobAccessToken)
})
}

Expand Down
21 changes: 14 additions & 7 deletions test/tests/api/worker/facades/MailExportFacadeTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import { MailExportTokenFacade } from "../../../../../src/common/api/worker/faca
import { BulkMailLoader } from "../../../../../src/mail-app/workerUtils/index/BulkMailLoader.js"
import { BlobFacade } from "../../../../../src/common/api/worker/facades/lazy/BlobFacade.js"
import { CryptoFacade } from "../../../../../src/common/api/worker/crypto/CryptoFacade.js"
import { object, when } from "testdouble"
import { instance, object, when } from "testdouble"
import { createTestEntity } from "../../../TestUtils.js"
import { FileTypeRef, MailDetailsTypeRef, MailTypeRef } from "../../../../../src/common/api/entities/tutanota/TypeRefs.js"
import { ArchiveDataType } from "../../../../../src/common/api/common/TutanotaConstants"
import { createReferencingInstance } from "../../../../../src/common/api/common/utils/BlobUtils"
import { BlobAccessTokenFacade } from "../../../../../src/common/api/worker/facades/BlobAccessTokenFacade"

o.spec("MailExportFacade", () => {
const token = "my token"
Expand All @@ -23,6 +24,7 @@ o.spec("MailExportFacade", () => {
let bulkMailLoader!: BulkMailLoader
let blobFacade!: BlobFacade
let cryptoFacade!: CryptoFacade
let blobAccessTokenFacade!: BlobAccessTokenFacade

o.beforeEach(() => {
tokenFacade = {
Expand All @@ -31,13 +33,17 @@ o.spec("MailExportFacade", () => {
bulkMailLoader = object()
blobFacade = object()
cryptoFacade = object()
facade = new MailExportFacade(tokenFacade, bulkMailLoader, blobFacade, cryptoFacade)
blobAccessTokenFacade = instance(BlobAccessTokenFacade)
facade = new MailExportFacade(tokenFacade, bulkMailLoader, blobFacade, cryptoFacade, blobAccessTokenFacade)
})

o.test("loadFixedNumberOfMailsWithCache", async () => {
when(bulkMailLoader.loadFixedNumberOfMailsWithCache("mailListId", "startId", { extraHeaders: tokenHeaders })).thenResolve([mail1, mail2])
when(bulkMailLoader.loadFixedNumberOfMailsWithCache("mailListId", "startId", { baseUrl: "baseUrl", extraHeaders: tokenHeaders })).thenResolve([
mail1,
mail2,
])

const result = await facade.loadFixedNumberOfMailsWithCache("mailListId", "startId")
const result = await facade.loadFixedNumberOfMailsWithCache("mailListId", "startId", "baseUrl")

o(result).deepEquals([mail1, mail2])
})
Expand All @@ -56,9 +62,9 @@ o.spec("MailExportFacade", () => {

o.test("loadAttachments", async () => {
const expected = [createTestEntity(FileTypeRef), createTestEntity(FileTypeRef)]
when(bulkMailLoader.loadAttachments([mail1, mail2], { extraHeaders: tokenHeaders })).thenResolve(expected)
when(bulkMailLoader.loadAttachments([mail1, mail2], { baseUrl: "baseUrl", extraHeaders: tokenHeaders })).thenResolve(expected)

const result = await facade.loadAttachments([mail1, mail2])
const result = await facade.loadAttachments([mail1, mail2], "baseUrl")

o(result).deepEquals(expected)
})
Expand All @@ -77,6 +83,7 @@ o.spec("MailExportFacade", () => {
ArchiveDataType.Attachments,
[createReferencingInstance(mailAttachments[0]), createReferencingInstance(mailAttachments[1])],
{
baseUrl: "baseUrl",
extraHeaders: tokenHeaders,
},
),
Expand All @@ -87,7 +94,7 @@ o.spec("MailExportFacade", () => {
]),
)

const result = await facade.loadAttachmentData(mail1, mailAttachments)
const result = await facade.loadAttachmentData(mail1, mailAttachments, "baseUrl")

o(result).deepEquals([
{ _type: "DataFile", name: "mail1", mimeType: "img/png", data: dataByteMail1, cid: "12345", size: 3, id: ["attachment", "id1"] },
Expand Down
12 changes: 12 additions & 0 deletions test/tests/api/worker/rest/EntityRestClientTest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ o.spec("EntityRestClient", function () {
headers: { ...authHeader, v: String(tutanotaModelInfo.version) },
responseType: MediaType.Json,
queryParams: undefined,
baseUrl: undefined,
}),
).thenResolve(JSON.stringify({ instance: "calendar" }))

Expand All @@ -139,6 +140,7 @@ o.spec("EntityRestClient", function () {
headers: { ...authHeader, v: String(sysModelInfo.version) },
responseType: MediaType.Json,
queryParams: undefined,
baseUrl: undefined,
}),
).thenResolve(JSON.stringify({ instance: "customer" }))

Expand All @@ -154,6 +156,7 @@ o.spec("EntityRestClient", function () {
headers: { ...authHeader, v: String(tutanotaModelInfo.version), baz: "quux" },
responseType: MediaType.Json,
queryParams: { foo: "bar" },
baseUrl: undefined,
}),
).thenResolve(JSON.stringify({ instance: "calendar" }))

Expand All @@ -174,6 +177,7 @@ o.spec("EntityRestClient", function () {
headers: { ...authHeader, v: String(tutanotaModelInfo.version) },
responseType: MediaType.Json,
queryParams: undefined,
baseUrl: undefined,
}),
).thenResolve(JSON.stringify({ _ownerEncSessionKey: "some key" }))

Expand Down Expand Up @@ -201,6 +205,7 @@ o.spec("EntityRestClient", function () {
headers: { ...authHeader, v: String(tutanotaModelInfo.version) },
queryParams: { start: startId, count: String(count), reverse: String(false) },
responseType: MediaType.Json,
baseUrl: undefined,
}),
).thenResolve(JSON.stringify([{ instance: 1 }, { instance: 2 }]))

Expand Down Expand Up @@ -229,6 +234,7 @@ o.spec("EntityRestClient", function () {
headers: { ...authHeader, v: String(sysModelInfo.version) },
queryParams: { ids: "0,1,2,3,4" },
responseType: MediaType.Json,
baseUrl: undefined,
}),
).thenResolve(JSON.stringify([{ instance: 1 }, { instance: 2 }]))

Expand All @@ -250,6 +256,7 @@ o.spec("EntityRestClient", function () {
headers: { ...authHeader, v: String(sysModelInfo.version) },
queryParams: { ids: ids.join(",") },
responseType: MediaType.Json,
baseUrl: undefined,
}),
{ times: 1 },
).thenResolve(JSON.stringify([{ instance: 1 }, { instance: 2 }]))
Expand All @@ -271,6 +278,7 @@ o.spec("EntityRestClient", function () {
headers: { ...authHeader, v: String(sysModelInfo.version) },
queryParams: { ids: countFrom(0, 100).join(",") },
responseType: MediaType.Json,
baseUrl: undefined,
}),
{ times: 1 },
).thenResolve(JSON.stringify([{ instance: 1 }]))
Expand All @@ -280,6 +288,7 @@ o.spec("EntityRestClient", function () {
headers: { ...authHeader, v: String(sysModelInfo.version) },
queryParams: { ids: "100" },
responseType: MediaType.Json,
baseUrl: undefined,
}),
{ times: 1 },
).thenResolve(JSON.stringify([{ instance: 2 }]))
Expand All @@ -299,6 +308,7 @@ o.spec("EntityRestClient", function () {
headers: { ...authHeader, v: String(sysModelInfo.version) },
queryParams: { ids: countFrom(0, 100).join(",") },
responseType: MediaType.Json,
baseUrl: undefined,
}),
{ times: 1 },
).thenResolve(JSON.stringify([{ instance: 1 }]))
Expand All @@ -308,6 +318,7 @@ o.spec("EntityRestClient", function () {
headers: { ...authHeader, v: String(sysModelInfo.version) },
queryParams: { ids: countFrom(100, 100).join(",") },
responseType: MediaType.Json,
baseUrl: undefined,
}),
{ times: 1 },
).thenResolve(JSON.stringify([{ instance: 2 }]))
Expand All @@ -317,6 +328,7 @@ o.spec("EntityRestClient", function () {
headers: { ...authHeader, v: String(sysModelInfo.version) },
queryParams: { ids: countFrom(200, 11).join(",") },
responseType: MediaType.Json,
baseUrl: undefined,
}),
{ times: 1 },
).thenResolve(JSON.stringify([{ instance: 3 }]))
Expand Down
Loading

0 comments on commit 46c805b

Please sign in to comment.