Get server information and round-robin calls to servers when exporting #8173

Merged
merged 3 commits on Dec 23, 2024
2 changes: 2 additions & 0 deletions packages/tuta-wasm-loader/lib/WasmHandler.ts
@@ -19,6 +19,7 @@ async function generateWasm(command: string, options: WasmGeneratorOptions) {
...process.env,
...options.env,
},
maxBuffer: Infinity,
cwd: options.workingDir ?? process.cwd(),
})
promise.child.stdout?.on("data", (data) => {
@@ -37,6 +38,7 @@ async function generateWasmFallback(wasmFilePath: string, options: WasmGeneratorOptions) {
...process.env,
...options.env,
},
maxBuffer: Infinity,
})
return result.stdout
}
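
Both wasm build helpers now pass maxBuffer: Infinity so that large compiler output is not truncated: Node's promisified exec/execFile rejects once the captured stdout/stderr exceeds maxBuffer (1 MiB by default). A standalone sketch of the same option (the command string is a placeholder, not the project's actual build command):

import { exec } from "node:child_process"
import { promisify } from "node:util"

const execPromise = promisify(exec)
// With the default maxBuffer (1 MiB), a chatty build would reject with ERR_CHILD_PROCESS_STDIO_MAXBUFFER.
const { stdout } = await execPromise("wasm-pack build --release", { maxBuffer: Infinity })
console.log(stdout.length, "bytes of build output captured")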
23 changes: 17 additions & 6 deletions src/common/api/worker/facades/lazy/BlobFacade.ts
Expand Up @@ -27,7 +27,7 @@ import type { AesApp } from "../../../../native/worker/AesApp.js"
import { InstanceMapper } from "../../crypto/InstanceMapper.js"
import { Blob, BlobReferenceTokenWrapper, createBlobReferenceTokenWrapper } from "../../../entities/sys/TypeRefs.js"
import { FileReference } from "../../../common/utils/FileUtils.js"
import { handleRestError, NotFoundError } from "../../../common/error/RestError.js"
import { handleRestError } from "../../../common/error/RestError.js"
import { ProgrammingError } from "../../../common/error/ProgrammingError.js"
import { IServiceExecutor } from "../../../common/ServiceRequest.js"
import { BlobGetInTypeRef, BlobPostOut, BlobPostOutTypeRef, BlobServerAccessInfo, createBlobGetIn, createBlobId } from "../../../entities/storage/TypeRefs.js"
@@ -454,23 +454,34 @@ export class BlobFacade {

/**
* Deserializes a list of BlobWrappers that are in the following binary format
* element [ blobId ] [ blobHash ] [blobSize] [blob] [ . . . ] [ blobNId ] [ blobNHash ] [blobNSize] [blobN]
* bytes 9 6 4 blobSize 9 6 4 blobSize
* element [ #blobs ] [ blobId ] [ blobHash ] [blobSize] [blob] [ . . . ] [ blobNId ] [ blobNHash ] [blobNSize] [blobN]
* bytes 4 9 6 4 blobSize 9 6 4 blobSize
*
* @return a map from blobId to the binary data
*/
export function parseMultipleBlobsResponse(concatBinaryData: Uint8Array): Map<Id, Uint8Array> {
let offset = 0
const dataView = new DataView(concatBinaryData.buffer)
const result = new Map<Id, Uint8Array>()
const blobCount = dataView.getInt32(0)
if (blobCount <= 0) {
throw new Error(`Invalid blob count: ${blobCount}`)
}
let offset = 4
while (offset < concatBinaryData.length) {
const blobIdBytes = concatBinaryData.slice(offset, offset + 9)
const blobId = base64ToBase64Ext(uint8ArrayToBase64(blobIdBytes))

const blobSize = dataView.getInt32(offset + 15)
const contents = concatBinaryData.slice(offset + 19, offset + 19 + blobSize)
const dataStartOffset = offset + 19
if (blobSize < 0 || dataStartOffset + blobSize > concatBinaryData.length) {
throw new Error(`Invalid blob size: ${blobSize}. Remaining length: ${concatBinaryData.length - dataStartOffset}`)
}
const contents = concatBinaryData.slice(dataStartOffset, dataStartOffset + blobSize)
result.set(blobId, contents)
offset += 9 + 6 + 4 + blobSize
offset = dataStartOffset + blobSize
}
if (blobCount !== result.size) {
throw new Error(`Parsed wrong number of blobs: ${result.size}. Expected: ${blobCount}`)
}
return result
}
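
For reference, the inverse of this parser can be sketched as follows; serializeMultipleBlobsResponse is a hypothetical helper (handy for building test payloads), not part of this change:

// Builds a payload in the documented layout: 4-byte big-endian blob count,
// then per blob a 9-byte id, 6-byte hash field, 4-byte big-endian size, and the blob bytes.
function serializeMultipleBlobsResponse(blobs: Array<{ id: Uint8Array; hash: Uint8Array; data: Uint8Array }>): Uint8Array {
	const totalSize = 4 + blobs.reduce((sum, b) => sum + 9 + 6 + 4 + b.data.length, 0)
	const out = new Uint8Array(totalSize)
	const view = new DataView(out.buffer)
	view.setInt32(0, blobs.length) // blob count (DataView defaults to big-endian)
	let offset = 4
	for (const blob of blobs) {
		out.set(blob.id, offset) // 9-byte blob id
		out.set(blob.hash, offset + 9) // 6-byte hash field
		view.setInt32(offset + 15, blob.data.length) // blob size
		out.set(blob.data, offset + 19)
		offset += 19 + blob.data.length
	}
	return out
}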
24 changes: 19 additions & 5 deletions src/common/api/worker/facades/lazy/MailExportFacade.ts
@@ -10,6 +10,9 @@ import { MailExportTokenFacade } from "./MailExportTokenFacade.js"
import { assertNotNull, isNotNull } from "@tutao/tutanota-utils"
import { NotFoundError } from "../../../common/error/RestError"
import { elementIdPart } from "../../../common/utils/EntityUtils"
import { BlobAccessTokenFacade } from "../BlobAccessTokenFacade"
import { BlobServerUrl } from "../../../entities/storage/TypeRefs"
import { Group } from "../../../entities/sys/TypeRefs"

assertWorkerOrNode()

@@ -29,28 +32,39 @@ export class MailExportFacade {
private readonly bulkMailLoader: BulkMailLoader,
private readonly blobFacade: BlobFacade,
private readonly cryptoFacade: CryptoFacade,
private readonly blobAccessTokenFacade: BlobAccessTokenFacade,
) {}

async loadFixedNumberOfMailsWithCache(mailListId: Id, startId: Id): Promise<Mail[]> {
/**
* Returns a list of servers that can be used to request data from.
*/
async getExportServers(group: Group): Promise<BlobServerUrl[]> {
const blobServerAccessInfo = await this.blobAccessTokenFacade.requestWriteToken(ArchiveDataType.Attachments, group._id)
return blobServerAccessInfo.servers
}

async loadFixedNumberOfMailsWithCache(mailListId: Id, startId: Id, baseUrl: string): Promise<Mail[]> {
return this.mailExportTokenFacade.loadWithToken((token) =>
this.bulkMailLoader.loadFixedNumberOfMailsWithCache(mailListId, startId, this.options(token)),
this.bulkMailLoader.loadFixedNumberOfMailsWithCache(mailListId, startId, { baseUrl, ...this.options(token) }),
)
}

async loadMailDetails(mails: readonly Mail[]): Promise<MailWithMailDetails[]> {
return this.mailExportTokenFacade.loadWithToken((token) => this.bulkMailLoader.loadMailDetails(mails, this.options(token)))
}

async loadAttachments(mails: readonly Mail[]): Promise<TutanotaFile[]> {
return this.mailExportTokenFacade.loadWithToken((token) => this.bulkMailLoader.loadAttachments(mails, this.options(token)))
async loadAttachments(mails: readonly Mail[], baseUrl: string): Promise<TutanotaFile[]> {
return this.mailExportTokenFacade.loadWithToken((token) => this.bulkMailLoader.loadAttachments(mails, { baseUrl, ...this.options(token) }))
}

async loadAttachmentData(mail: Mail, attachments: readonly TutanotaFile[]): Promise<DataFile[]> {
const attachmentsWithKeys = await this.cryptoFacade.enforceSessionKeyUpdateIfNeeded(mail, attachments)

const downloads = await this.mailExportTokenFacade.loadWithToken((token) => {
const referencingInstances = attachmentsWithKeys.map(createReferencingInstance)
return this.blobFacade.downloadAndDecryptBlobsOfMultipleInstances(ArchiveDataType.Attachments, referencingInstances, this.options(token))
return this.blobFacade.downloadAndDecryptBlobsOfMultipleInstances(ArchiveDataType.Attachments, referencingInstances, {
...this.options(token),
})
})

const attachmentData = Array.from(downloads.entries()).map(([fileId, bytes]) => {
4 changes: 4 additions & 0 deletions src/common/api/worker/rest/EntityRestClient.ts
@@ -94,6 +94,7 @@ export interface EntityRestClientLoadOptions {
ownerKeyProvider?: OwnerKeyProvider
/** Defaults to {@link CacheMode.ReadAndWrite }*/
cacheMode?: CacheMode
baseUrl?: string
}

export interface OwnerEncSessionKeyProvider {
@@ -204,6 +205,7 @@ export class EntityRestClient implements EntityRestInterface {
queryParams,
headers,
responseType: MediaType.Json,
baseUrl: opts.baseUrl,
})
const entity = JSON.parse(json)
const migratedEntity = await this._crypto.applyMigrations(typeRef, entity)
@@ -258,6 +260,7 @@
queryParams,
headers,
responseType: MediaType.Json,
baseUrl: opts.baseUrl,
})
return this._handleLoadMultipleResult(typeRef, JSON.parse(json))
}
@@ -285,6 +288,7 @@
queryParams,
headers,
responseType: MediaType.Json,
baseUrl: opts.baseUrl,
})
}
return this._handleLoadMultipleResult(typeRef, JSON.parse(json), ownerEncSessionKeyProvider)
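
The optional baseUrl is forwarded to the REST client for each of the load paths above, so individual requests can be pointed at a specific blob server. A hedged usage sketch (the load signature is inferred from the options interface above; the type ref, ids, and server value are placeholders):

// Hypothetical call site: direct one load at a specific server, e.g. an entry returned by getExportServers().
const mail = await entityRestClient.load(MailTypeRef, [mailListId, mailElementId], {
	baseUrl: someBlobServer.url,
})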
21 changes: 17 additions & 4 deletions src/mail-app/native/main/MailExportController.ts
@@ -15,6 +15,7 @@ import type { TranslationText } from "../../../common/misc/LanguageViewModel"
import { SuspensionError } from "../../../common/api/common/error/SuspensionError"
import { Scheduler } from "../../../common/api/common/utils/Scheduler"
import { ExportError, ExportErrorReason } from "../../../common/api/common/error/ExportError"
import { BlobServerUrl } from "../../../common/api/entities/storage/TypeRefs"
import { assertMainOrNode } from "../../../common/api/common/Env"

assertMainOrNode()
@@ -37,6 +38,8 @@ const TAG = "MailboxExport"
export class MailExportController {
private _state: Stream<MailExportState> = stream({ type: "idle" })
private _lastExport: Date | null = null
private servers?: BlobServerUrl[]
private serverIndex: number = 0

get lastExport(): Date | null {
return this._lastExport
@@ -145,6 +148,7 @@

private async runExport(mailboxDetail: MailboxDetail, mailBags: MailBag[], mailId: Id) {
const startTime = assertNotNull(this._lastExport)
this.servers = await this.mailExportFacade.getExportServers(mailboxDetail.mailGroup)
for (const mailBag of mailBags) {
await this.exportMailBag(mailBag, mailId)
if (this._state().type !== "exporting" || this._lastExport !== startTime) {
@@ -160,18 +164,16 @@ }
}

private async exportMailBag(mailBag: MailBag, startId: Id): Promise<void> {
console.log(TAG, `Exporting mail bag: ${mailBag._id} ${startId}`)
let currentStartId = startId
while (true) {
try {
const downloadedMails = await this.mailExportFacade.loadFixedNumberOfMailsWithCache(mailBag.mails, currentStartId)
const downloadedMails = await this.mailExportFacade.loadFixedNumberOfMailsWithCache(mailBag.mails, currentStartId, this.getServerUrl())
if (downloadedMails.length === 0) {
break
}

const downloadedMailDetails = await this.mailExportFacade.loadMailDetails(downloadedMails)
const attachmentInfo = await this.mailExportFacade.loadAttachments(downloadedMails)

const attachmentInfo = await this.mailExportFacade.loadAttachments(downloadedMails, this.getServerUrl())
for (const { mail, mailDetails } of downloadedMailDetails) {
if (this._state().type !== "exporting") {
return
@@ -223,4 +225,15 @@ }
}
}
}

private getServerUrl(): string {
if (this.servers) {
this.serverIndex += 1
if (this.serverIndex >= this.servers.length) {
this.serverIndex = 0
}
return this.servers[this.serverIndex].url
}
throw new Error("No servers")
}
}
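
getServerUrl cycles through the server list fetched at the start of runExport, so successive batches are spread across the available blob servers. The same round-robin selection as a standalone sketch (class and field names here are illustrative, not from the PR):

// Minimal round-robin picker over a fixed list of items.
class RoundRobin<T> {
	private index = 0
	constructor(private readonly items: readonly T[]) {
		if (items.length === 0) throw new Error("No servers")
	}
	next(): T {
		this.index = (this.index + 1) % this.items.length
		return this.items[this.index]
	}
}

// const picker = new RoundRobin(servers.map((s) => s.url))
// picker.next() // returns a different server URL on each call, wrapping around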
2 changes: 1 addition & 1 deletion src/mail-app/workerUtils/worker/WorkerLocator.ts
@@ -531,7 +531,7 @@ export async function initLocator(worker: WorkerImpl, browserData: BrowserData)
const { MailExportFacade } = await import("../../../common/api/worker/facades/lazy/MailExportFacade.js")
const { MailExportTokenFacade } = await import("../../../common/api/worker/facades/lazy/MailExportTokenFacade.js")
const mailExportTokenFacade = new MailExportTokenFacade(locator.serviceExecutor)
return new MailExportFacade(mailExportTokenFacade, await locator.bulkMailLoader(), await locator.blob(), locator.crypto)
return new MailExportFacade(mailExportTokenFacade, await locator.bulkMailLoader(), await locator.blob(), locator.crypto, locator.blobAccessToken)
})
}

40 changes: 29 additions & 11 deletions test/tests/api/worker/facades/BlobFacadeTest.ts
@@ -182,6 +182,8 @@ o.spec("BlobFacade test", function () {
// data size is 65 (16 data block, 16 iv, 32 hmac, 1 byte for mac marking)
const blobSizeBinary = new Uint8Array([0, 0, 0, 65])
const blobResponse = concat(
// number of blobs
new Uint8Array([0, 0, 0, 1]),
// blob id
base64ToUint8Array(base64ExtToBase64(blobId)),
// blob hash
@@ -230,6 +232,8 @@
// data size is 65 (16 data block, 16 iv, 32 hmac, 1 byte for mac marking)
const blobSizeBinary = new Uint8Array([0, 0, 0, 65])
const blobResponse = concat(
// number of blobs
new Uint8Array([0, 0, 0, 2]),
// blob id
base64ToUint8Array(base64ExtToBase64(blobId1)),
// blob hash
@@ -374,6 +378,8 @@
// data size is 65 (16 data block, 16 iv, 32 hmac, 1 byte for mac marking)
const blobSizeBinary = new Uint8Array([0, 0, 0, 65])
const blobResponse = concat(
// number of blobs
new Uint8Array([0, 0, 0, 3]),
// blob id
base64ToUint8Array(base64ExtToBase64(blobId1)),
// blob hash
@@ -463,6 +469,8 @@
// data size is 65 (16 data block, 16 iv, 32 hmac, 1 byte for mac marking)
const blobSizeBinary = new Uint8Array([0, 0, 0, 65])
const blobResponse1 = concat(
// number of blobs
new Uint8Array([0, 0, 0, 2]),
// blob id
base64ToUint8Array(base64ExtToBase64(blobId1)),
// blob hash
@@ -482,6 +490,8 @@ )
)

const blobResponse2 = concat(
// number of blobs
new Uint8Array([0, 0, 0, 1]),
// blob id
base64ToUint8Array(base64ExtToBase64(blobId3)),
// blob hash
@@ -553,6 +563,8 @@
// data size is 65 (16 data block, 16 iv, 32 hmac, 1 byte for mac marking)
const blobSizeBinary = new Uint8Array([0, 0, 0, 65])
const blobResponse = concat(
// number of blobs
new Uint8Array([0, 0, 0, 2]),
// blob id
base64ToUint8Array(base64ExtToBase64(blobId1)),
// blob hash
@@ -583,13 +595,15 @@
// Blob id OETv4XP----0 hash [3, -112, 88, -58, -14, -64] bytes [1, 2, 3]
// Blob id OETv4XS----0 hash [113, -110, 56, 92, 60, 6] bytes [1, 2, 3, 4, 5, 6]
const binaryData = new Int8Array([
// blob id 1 [0-8]
// number of blobs [0-3] 2
0, 0, 0, 2,
// blob id 1 [4-12]
100, -9, -69, 22, 38, -128, 0, 0, 1,
// blob hash 1 [9-14]
// blob hash 1 [13-18]
3, -112, 88, -58, -14, -64,
// blob size 1 [15-18]
// blob size 1 [19-22]
0, 0, 0, 3,
// blob data 1 [19-21]
// blob data 1 [23-25]
1, 2, 3,
// blob id 2
100, -9, -69, 22, 39, 64, 0, 0, 1,
@@ -613,13 +627,15 @@
o.test("parses one blob", function () {
// Blob id OETv4XP----0 hash [3, -112, 88, -58, -14, -64] bytes [1, 2, 3]
const binaryData = new Int8Array([
// blob id 1 [0-8]
// number of blobs [0-3]
0, 0, 0, 1,
// blob id 1 [4-12]
100, -9, -69, 22, 38, -128, 0, 0, 1,
// blob hash 1 [9-14]
// blob hash 1 [13-18]
3, -112, 88, -58, -14, -64,
// blob size 1 [15-18]
// blob size 1 [19-22]
0, 0, 0, 3,
// blob data 1 [19-21]
// blob data 1 [23-25]
1, 2, 3,
])

@@ -632,11 +648,13 @@
const blobDataNumbers = Array(384).fill(1)
const binaryData = new Int8Array(
[
// blob id 1 [0-8]
// number of blobs [0-3]
0, 0, 0, 1,
// blob id 1 [4-12]
100, -9, -69, 22, 38, -128, 0, 0, 1,
// blob hash 1 [9-14]
// blob hash 1 [13-18]
3, -112, 88, -58, -14, -64,
// blob size 1 [15-18] 384
// blob size 1 [19-22]
0, 0, 1, 128,
].concat(blobDataNumbers),
)