Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(lib-storage): set ChecksumAlgorithm when calling CreateMPU #6802

Merged
merged 9 commits into from
Jan 15, 2025
4 changes: 4 additions & 0 deletions lib/lib-storage/src/Upload.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ vi.mock("@aws-sdk/client-s3", async () => {
send: sendMock,
config: {
endpoint: endpointMock,
requestChecksumCalculation: () => Promise.resolve("WHEN_SUPPORTED"),
},
}),
S3Client: vi.fn().mockReturnValue({
send: sendMock,
config: {
endpoint: endpointMock,
requestHandler: mockRequestHandler,
requestChecksumCalculation: () => Promise.resolve("WHEN_SUPPORTED"),
},
}),
CreateMultipartUploadCommand: vi.fn().mockReturnValue({
Expand Down Expand Up @@ -392,6 +394,7 @@ describe(Upload.name, () => {
expect(CreateMultipartUploadCommand).toHaveBeenCalledWith({
...actionParams,
Body: undefined,
ChecksumAlgorithm: "CRC32",
});
// upload parts is called correctly.
expect(UploadPartCommand).toHaveBeenCalledTimes(2);
Expand Down Expand Up @@ -457,6 +460,7 @@ describe(Upload.name, () => {
expect(CreateMultipartUploadCommand).toHaveBeenCalledWith({
...actionParams,
Body: undefined,
ChecksumAlgorithm: "CRC32",
});

// upload parts is called correctly.
Expand Down
5 changes: 5 additions & 0 deletions lib/lib-storage/src/Upload.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {
AbortMultipartUploadCommand,
ChecksumAlgorithm,
CompletedPart,
CompleteMultipartUploadCommand,
CompleteMultipartUploadCommandOutput,
Expand Down Expand Up @@ -199,8 +200,12 @@ export class Upload extends EventEmitter {
}

private async __createMultipartUpload(): Promise<CreateMultipartUploadCommandOutput> {
const requestChecksumCalculation = await this.client.config.requestChecksumCalculation();
if (!this.createMultiPartPromise) {
const createCommandParams = { ...this.params, Body: undefined };
if (requestChecksumCalculation === "WHEN_SUPPORTED") {
createCommandParams.ChecksumAlgorithm = this.params.ChecksumAlgorithm || ChecksumAlgorithm.CRC32;
}
this.createMultiPartPromise = this.client
.send(new CreateMultipartUploadCommand(createCommandParams))
.then((createMpuResponse) => {
Expand Down
271 changes: 124 additions & 147 deletions lib/lib-storage/src/lib-storage.e2e.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { S3 } from "@aws-sdk/client-s3";
import { ChecksumAlgorithm, S3 } from "@aws-sdk/client-s3";
import { Upload } from "@aws-sdk/lib-storage";
import { randomBytes } from "crypto";
import { Readable } from "stream";
Expand All @@ -7,153 +7,130 @@ import { afterAll, beforeAll, describe, expect, test as it } from "vitest";
import { getIntegTestResources } from "../../../tests/e2e/get-integ-test-resources";

describe("@aws-sdk/lib-storage", () => {
let Key: string;
let client: S3;
let data: Uint8Array;
let dataString: string;
let Bucket: string;
let region: string;

beforeAll(async () => {
const integTestResourcesEnv = await getIntegTestResources();
Object.assign(process.env, integTestResourcesEnv);

region = process?.env?.AWS_SMOKE_TEST_REGION as string;
Bucket = process?.env?.AWS_SMOKE_TEST_BUCKET as string;

Key = ``;
data = randomBytes(20_240_000);
dataString = data.toString();

client = new S3({
region,
// ToDo(JS-5678): Remove this when default checksum is supported by Upload.
requestChecksumCalculation: "WHEN_REQUIRED",
});
});

describe("Upload", () => {
beforeAll(() => {
Key = `multi-part-file-${Date.now()}`;
});
afterAll(async () => {
await client.deleteObject({ Bucket, Key });
});

it("should upload in parts for input type bytes", async () => {
const s3Upload = new Upload({
client,
params: {
Bucket,
Key,
Body: data,
},
});
await s3Upload.done();

const object = await client.getObject({
Bucket,
Key,
});

expect(await object.Body?.transformToString()).toEqual(dataString);
});

it("should upload in parts for input type string", async () => {
const s3Upload = new Upload({
client,
params: {
Bucket,
Key,
Body: dataString,
},
});
await s3Upload.done();

const object = await client.getObject({
Bucket,
Key,
});

expect(await object.Body?.transformToString()).toEqual(dataString);
});

it("should upload in parts for input type Readable", async () => {
const s3Upload = new Upload({
client,
params: {
Bucket,
Key,
Body: Readable.from(data),
},
});
await s3Upload.done();

const object = await client.getObject({
Bucket,
Key,
});

expect(await object.Body?.transformToString()).toEqual(dataString);
});

it("should call AbortMultipartUpload if unable to complete a multipart upload.", async () => {
class MockFailureS3 extends S3 {
public counter = 0;
async send(command: any, ...rest: any[]) {
if (command?.constructor?.name === "UploadPartCommand" && this.counter++ % 3 === 0) {
throw new Error("simulated upload part error");
describe.each([undefined, "WHEN_REQUIRED", "WHEN_SUPPORTED"])(
"requestChecksumCalculation: %s",
(requestChecksumCalculation) => {
describe.each([
undefined,
ChecksumAlgorithm.SHA1,
ChecksumAlgorithm.SHA256,
ChecksumAlgorithm.CRC32,
ChecksumAlgorithm.CRC32C,
])("ChecksumAlgorithm: %s", (ChecksumAlgorithm) => {
let Key: string;
let client: S3;
let data: Uint8Array;
let dataString: string;
let Bucket: string;
let region: string;

beforeAll(async () => {
const integTestResourcesEnv = await getIntegTestResources();
Object.assign(process.env, integTestResourcesEnv);

region = process?.env?.AWS_SMOKE_TEST_REGION as string;
Bucket = process?.env?.AWS_SMOKE_TEST_BUCKET as string;

Key = ``;
data = randomBytes(20_240_000);
dataString = data.toString();

// @ts-expect-error: Types of property 'requestChecksumCalculation' are incompatible
client = new S3({
region,
requestChecksumCalculation,
});
Key = `multi-part-file-${requestChecksumCalculation}-${ChecksumAlgorithm}-${Date.now()}`;
});

afterAll(async () => {
await client.deleteObject({ Bucket, Key });
});

it("should upload in parts for input type bytes", async () => {
const s3Upload = new Upload({
client,
params: { Bucket, Key, Body: data, ChecksumAlgorithm },
});
await s3Upload.done();

const object = await client.getObject({ Bucket, Key });
expect(await object.Body?.transformToString()).toEqual(dataString);
});

it("should upload in parts for input type string", async () => {
const s3Upload = new Upload({
client,
params: { Bucket, Key, Body: dataString, ChecksumAlgorithm },
});
await s3Upload.done();

const object = await client.getObject({ Bucket, Key });
expect(await object.Body?.transformToString()).toEqual(dataString);
});

it("should upload in parts for input type Readable", async () => {
const s3Upload = new Upload({
client,
params: { Bucket, Key, Body: Readable.from(data), ChecksumAlgorithm },
});
await s3Upload.done();

const object = await client.getObject({ Bucket, Key });
expect(await object.Body?.transformToString()).toEqual(dataString);
});

it("should call AbortMultipartUpload if unable to complete a multipart upload.", async () => {
class MockFailureS3 extends S3 {
public counter = 0;
async send(command: any, ...rest: any[]) {
if (command?.constructor?.name === "UploadPartCommand" && this.counter++ % 3 === 0) {
throw new Error("simulated upload part error");
}
return super.send(command, ...rest);
}
}
return super.send(command, ...rest);
}
}

const client = new MockFailureS3({
region,
});

const requestLog = [] as string[];

client.middlewareStack.add(
(next, context) => async (args) => {
const result = await next(args);
requestLog.push([context.clientName, context.commandName, result.output.$metadata.httpStatusCode].join(" "));
return result;
},
{
name: "E2eRequestLog",
step: "build",
override: true,
}
);

const s3Upload = new Upload({
client,
params: {
Bucket,
Key,
Body: data,
},
const client = new MockFailureS3({ region });

const requestLog = [] as string[];

client.middlewareStack.add(
(next, context) => async (args) => {
const result = await next(args);
requestLog.push(
[context.clientName, context.commandName, result.output.$metadata.httpStatusCode].join(" ")
);
return result;
},
{
name: "E2eRequestLog",
step: "build",
override: true,
}
);

const s3Upload = new Upload({
client,
params: { Bucket, Key, Body: data, ChecksumAlgorithm },
});
// eslint-disable-next-line @typescript-eslint/no-unused-vars
await s3Upload.done().catch((ignored) => {});

const uploadStatus = await client
.listParts({ Bucket, Key, UploadId: s3Upload.uploadId })
.then((listParts) => listParts.$metadata.httpStatusCode)
.catch((err) => err.toString());

expect(uploadStatus).toMatch(/NoSuchUpload:(.*?)aborted or completed\./);
expect(requestLog).toEqual([
"S3Client CreateMultipartUploadCommand 200",
"S3Client UploadPartCommand 200",
"S3Client UploadPartCommand 200",
"S3Client AbortMultipartUploadCommand 204",
]);
});
});
// eslint-disable-next-line @typescript-eslint/no-unused-vars
await s3Upload.done().catch((ignored) => {});

const uploadStatus = await client
.listParts({
Bucket,
Key,
UploadId: s3Upload.uploadId,
})
.then((listParts) => listParts.$metadata.httpStatusCode)
.catch((err) => err.toString());

expect(uploadStatus).toMatch(/NoSuchUpload:(.*?)aborted or completed\./);
expect(requestLog).toEqual([
"S3Client CreateMultipartUploadCommand 200",
"S3Client UploadPartCommand 200",
"S3Client UploadPartCommand 200",
"S3Client AbortMultipartUploadCommand 204",
]);
});
});
}
);
}, 45_000);
Loading