From 3e89239478bf8b693c22d9219246244af8ccc4b0 Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Fri, 4 Jan 2019 11:41:50 -0800 Subject: [PATCH] Remove resuming multipart uploads (#267) ListParts is never fully complete in its output, there is always a possibility that the List is partial where another PutObjectPart() is being uploaded in parallel. While it may seem like ListParts and ListMultipartUploads provide the possibility of resuming an upload, it would be a mistake as these APIs were inherently never meant to do these things. This is one of the reasons why Amazon recommends that one should avoid using these API calls to complete or add parts to an upload - they instead recommend remembering the parts being uploaded at the client. --- Docs/API.md | 4 +- Minio.Functional.Tests/FunctionalTest.cs | 72 +++---------------- Minio/ApiEndpoints/ObjectOperations.cs | 91 ++++++++---------------- Minio/MinioClient.cs | 19 ++--- 4 files changed, 52 insertions(+), 134 deletions(-) diff --git a/Docs/API.md b/Docs/API.md index da6563ca9..63fc845a9 100644 --- a/Docs/API.md +++ b/Docs/API.md @@ -803,7 +803,7 @@ __Parameters__ __Example__ -The maximum size of a single object is limited to 5TB. putObject transparently uploads objects larger than 5MiB in multiple parts. This allows failed uploads to resume safely by only uploading the missing parts. Uploaded data is carefully verified using MD5SUM signatures. +The maximum size of a single object is limited to 5TB. putObject transparently uploads objects larger than 5MiB in multiple parts. Uploaded data is carefully verified using MD5SUM signatures. ```cs @@ -864,7 +864,7 @@ __Parameters__ __Example__ -The maximum size of a single object is limited to 5TB. putObject transparently uploads objects larger than 5MiB in multiple parts. This allows failed uploads to resume safely by only uploading the missing parts. Uploaded data is carefully verified using MD5SUM signatures. +The maximum size of a single object is limited to 5TB. 
putObject transparently uploads objects larger than 5MiB in multiple parts. Uploaded data is carefully verified using MD5SUM signatures. ```cs diff --git a/Minio.Functional.Tests/FunctionalTest.cs b/Minio.Functional.Tests/FunctionalTest.cs index 2a9072fab..47d73d105 100644 --- a/Minio.Functional.Tests/FunctionalTest.cs +++ b/Minio.Functional.Tests/FunctionalTest.cs @@ -187,13 +187,11 @@ public static void Main(string[] args) // Test Putobject function PutObject_Test1(minioClient).Wait(); PutObject_Test2(minioClient).Wait(); - PutObject_Test3(minioClient).Wait(); PutObject_Test4(minioClient).Wait(); PutObject_Test5(minioClient).Wait(); PutObject_Test6(minioClient).Wait(); PutObject_Test7(minioClient).Wait(); - PutObject_Test8(minioClient).Wait(); // Test StatObject function StatObject_Test1(minioClient).Wait(); @@ -558,53 +556,6 @@ private async static Task PutObject_Test3(MinioClient minio) } } private async static Task PutObject_Test4(MinioClient minio) - { - DateTime startTime = DateTime.Now; - string bucketName = GetRandomName(15); - string objectName = GetRandomName(10); - string contentType = "application/octet-stream"; - Dictionary args = new Dictionary - { - { "bucketName", bucketName}, - {"objectName",objectName}, - {"contentType", contentType}, - {"data","1MB"}, - {"size","4MB"}, - }; - try - { - // Putobject call with incorrect size of stream. 
See if PutObjectAsync call resumes - await Setup_Test(minio, bucketName); - using (System.IO.MemoryStream filestream = rsg.GenerateStreamFromSeed(1 * MB)) - { - try - { - long size = 4 * MB; - long file_write_size = filestream.Length; - - await minio.PutObjectAsync(bucketName, - objectName, - filestream, - size, - contentType); - } - catch (UnexpectedShortReadException) - { - // PutObject failed as expected since the stream size is incorrect - // default to actual stream size and complete the upload - await PutObject_Tester(minio, bucketName, objectName, null, contentType, 0, null,rsg.GenerateStreamFromSeed(1 * MB)); - } - } - await TearDown(minio, bucketName); - new MintLogger("PutObject_Test4",putObjectSignature1,"Tests whether PutObject with incorrect stream-size passes",TestStatus.PASS,(DateTime.Now - startTime),args:args).Log(); - } - catch (Exception ex) - { - new MintLogger("PutObject_Test4",putObjectSignature1,"Tests whether PutObject with incorrect stream-size passes",TestStatus.FAIL,(DateTime.Now - startTime),"",ex.Message, ex.ToString(),args).Log(); - } - } - - private async static Task PutObject_Test5(MinioClient minio) { DateTime startTime = DateTime.Now; string bucketName = GetRandomName(15); @@ -634,19 +585,18 @@ private async static Task PutObject_Test5(MinioClient minio) Assert.IsTrue(statMeta.ContainsKey("x-amz-meta-customheader")); Assert.IsTrue(statObject.metaData.ContainsKey("Content-Type") && statObject.metaData["Content-Type"].Equals("custom/contenttype")); await TearDown(minio, bucketName); - new MintLogger("PutObject_Test5",putObjectSignature1,"Tests whether PutObject with different content-type passes",TestStatus.PASS,(DateTime.Now - startTime), args:args).Log(); + new MintLogger("PutObject_Test4",putObjectSignature1,"Tests whether PutObject with different content-type passes",TestStatus.PASS,(DateTime.Now - startTime), args:args).Log(); } catch (MinioException ex) { - new MintLogger("PutObject_Test5",putObjectSignature1,"Tests whether 
PutObject with different content-type passes",TestStatus.FAIL,(DateTime.Now - startTime),"",ex.Message, ex.ToString(),args).Log(); + new MintLogger("PutObject_Test4",putObjectSignature1,"Tests whether PutObject with different content-type passes",TestStatus.FAIL,(DateTime.Now - startTime),"",ex.Message, ex.ToString(),args).Log(); } if (!IsMintEnv()) { File.Delete(fileName); } } - - private async static Task PutObject_Test6(MinioClient minio) + private async static Task PutObject_Test5(MinioClient minio) { DateTime startTime = DateTime.Now; string bucketName = GetRandomName(15); @@ -663,14 +613,14 @@ private async static Task PutObject_Test6(MinioClient minio) await Setup_Test(minio, bucketName); await PutObject_Tester(minio, bucketName, objectName, null, null, 0, null, rsg.GenerateStreamFromSeed(1 * MB)); await TearDown(minio, bucketName); - new MintLogger("PutObject_Test6",putObjectSignature1,"Tests whether PutObject with no content-type passes for small object",TestStatus.PASS,(DateTime.Now - startTime),args:args).Log(); + new MintLogger("PutObject_Test5",putObjectSignature1,"Tests whether PutObject with no content-type passes for small object",TestStatus.PASS,(DateTime.Now - startTime),args:args).Log(); } catch (Exception ex) { - new MintLogger("PutObject_Test6",putObjectSignature1,"Tests whether PutObject with no content-type passes for small object",TestStatus.FAIL,(DateTime.Now - startTime),"",ex.Message, ex.ToString(), args).Log(); + new MintLogger("PutObject_Test5",putObjectSignature1,"Tests whether PutObject with no content-type passes for small object",TestStatus.FAIL,(DateTime.Now - startTime),"",ex.Message, ex.ToString(), args).Log(); } } - private async static Task PutObject_Test7(MinioClient minio) + private async static Task PutObject_Test6(MinioClient minio) { DateTime startTime = DateTime.Now; string bucketName = GetRandomName(15); @@ -692,14 +642,14 @@ private async static Task PutObject_Test7(MinioClient minio) await Task.WhenAll(tasks); await 
minio.RemoveObjectAsync(bucketName, objectName); await TearDown(minio, bucketName); - new MintLogger("PutObject_Test7",putObjectSignature1,"Tests thread safety of minioclient on a parallel put operation",TestStatus.PASS,(DateTime.Now - startTime), args:args).Log(); + new MintLogger("PutObject_Test6",putObjectSignature1,"Tests thread safety of minioclient on a parallel put operation",TestStatus.PASS,(DateTime.Now - startTime), args:args).Log(); } catch (Exception ex) { - new MintLogger("PutObject_Test7",putObjectSignature1,"Tests thread safety of minioclient on a parallel put operation",TestStatus.FAIL,(DateTime.Now - startTime),"",ex.Message, ex.ToString(), args).Log(); + new MintLogger("PutObject_Test6",putObjectSignature1,"Tests thread safety of minioclient on a parallel put operation",TestStatus.FAIL,(DateTime.Now - startTime),"",ex.Message, ex.ToString(), args).Log(); } } - private async static Task PutObject_Test8(MinioClient minio) + private async static Task PutObject_Test7(MinioClient minio) { DateTime startTime = DateTime.Now; string bucketName = GetRandomName(15); @@ -731,11 +681,11 @@ await minio.PutObjectAsync(bucketName, await minio.RemoveObjectAsync(bucketName, objectName); await TearDown(minio, bucketName); } - new MintLogger("PutObject_Test8",putObjectSignature1,"Tests whether PutObject with unknown stream-size passes",TestStatus.PASS,(DateTime.Now - startTime), args:args).Log(); + new MintLogger("PutObject_Test7",putObjectSignature1,"Tests whether PutObject with unknown stream-size passes",TestStatus.PASS,(DateTime.Now - startTime), args:args).Log(); } catch (Exception ex) { - new MintLogger("PutObject_Test8",putObjectSignature1,"Tests whether PutObject with unknown stream-size passes",TestStatus.FAIL,(DateTime.Now - startTime),"",ex.Message, ex.ToString(), args).Log(); + new MintLogger("PutObject_Test7",putObjectSignature1,"Tests whether PutObject with unknown stream-size passes",TestStatus.FAIL,(DateTime.Now - startTime),"",ex.Message, 
ex.ToString(), args).Log(); } } private async static Task PutGetStatEncryptedObject_Test1(MinioClient minio) diff --git a/Minio/ApiEndpoints/ObjectOperations.cs b/Minio/ApiEndpoints/ObjectOperations.cs index 20b723693..933b1f895 100644 --- a/Minio/ApiEndpoints/ObjectOperations.cs +++ b/Minio/ApiEndpoints/ObjectOperations.cs @@ -51,10 +51,10 @@ public partial class MinioClient : IObjectOperations await StatObjectAsync(bucketName, objectName, cancellationToken:cancellationToken).ConfigureAwait(false); var headers = new Dictionary(); - if (sse != null && sse.GetType().Equals(EncryptionType.SSE_C)) + if (sse != null && sse.GetType().Equals(EncryptionType.SSE_C)) { sse.Marshal(headers); - } + } var request = await this.CreateRequest(Method.GET, bucketName, objectName: objectName, @@ -90,10 +90,10 @@ public partial class MinioClient : IObjectOperations if (length > 0) headerMap.Add("Range", "bytes=" + offset.ToString() + "-" + (offset + length - 1).ToString()); - if (sse != null && sse.GetType().Equals(EncryptionType.SSE_C)) + if (sse != null && sse.GetType().Equals(EncryptionType.SSE_C)) { sse.Marshal(headerMap); - } + } var request = await this.CreateRequest(Method.GET, bucketName, objectName: objectName, @@ -251,7 +251,7 @@ await GetObjectAsync(bucketName, objectName, (stream) => { throw new UnexpectedShortReadException("Data read " + bytes.Length + " is shorter than the size " + size + " of input buffer."); } - await this.PutObjectAsync(bucketName, objectName, null, 0, bytes, metaData, sseHeaders,cancellationToken).ConfigureAwait(false); + await this.PutObjectAsync(bucketName, objectName, null, 0, bytes, metaData, sseHeaders, cancellationToken).ConfigureAwait(false); return; } // For all sizes greater than 5MiB do multipart. 
@@ -261,21 +261,11 @@ await GetObjectAsync(bucketName, objectName, (stream) => double partCount = multiPartInfo.partCount; double lastPartSize = multiPartInfo.lastPartSize; Part[] totalParts = new Part[(int)partCount]; - Part part = null; - Part[] existingParts = null; - string uploadId = await this.getLatestIncompleteUploadIdAsync(bucketName, objectName, cancellationToken).ConfigureAwait(false); + string uploadId = await this.NewMultipartUploadAsync(bucketName, objectName, metaData, sseHeaders, cancellationToken).ConfigureAwait(false); - if (uploadId == null) - { - uploadId = await this.NewMultipartUploadAsync(bucketName, objectName, metaData,sseHeaders,cancellationToken).ConfigureAwait(false); - } - else - { - existingParts = await this.ListParts(bucketName, objectName, uploadId, cancellationToken).ToArray(); - } - - if (sse != null && + // Remove SSE-S3 and KMS headers during PutObjectPart operations. + if (sse != null && (sse.GetType().Equals(EncryptionType.SSE_S3) || sse.GetType().Equals(EncryptionType.SSE_KMS))) { @@ -283,10 +273,10 @@ await GetObjectAsync(bucketName, objectName, (stream) => sseHeaders.Remove(Constants.SSEKMSContext); sseHeaders.Remove(Constants.SSEKMSKeyId); } + double expectedReadSize = partSize; int partNumber; int numPartsUploaded = 0; - bool skipUpload = false; for (partNumber = 1; partNumber <= partCount; partNumber++) { byte[] dataToCopy = ReadFull(data, (int)partSize); @@ -294,40 +284,16 @@ await GetObjectAsync(bucketName, objectName, (stream) => { break; } - + if (partNumber == partCount) { expectedReadSize = lastPartSize; } - if (existingParts != null && partNumber <= existingParts.Length) - { - part = existingParts[partNumber - 1]; - if (part != null && partNumber == part.PartNumber && expectedReadSize == part.partSize()) - { - System.Security.Cryptography.MD5 md5 = System.Security.Cryptography.MD5.Create(); - byte[] hash = md5.ComputeHash(dataToCopy); - string etag = BitConverter.ToString(hash).Replace("-", 
string.Empty).ToLower(); - if (etag.Equals(part.ETag)) - { - totalParts[partNumber - 1] = new Part() { PartNumber = part.PartNumber, ETag = part.ETag, size = part.partSize() }; - skipUpload = true; - - } - - } - } - else - { - skipUpload = false; - } - if (!skipUpload) - { - numPartsUploaded += 1; - string etag = await this.PutObjectAsync(bucketName, objectName, uploadId, partNumber, dataToCopy, metaData,sseHeaders, cancellationToken).ConfigureAwait(false); - totalParts[partNumber - 1] = new Part() { PartNumber = partNumber, ETag = etag, size = (long)expectedReadSize }; - } - + numPartsUploaded += 1; + string etag = await this.PutObjectAsync(bucketName, objectName, uploadId, partNumber, dataToCopy, metaData, sseHeaders, cancellationToken).ConfigureAwait(false); + totalParts[partNumber - 1] = new Part() { PartNumber = partNumber, ETag = etag, size = (long)expectedReadSize }; } + // This shouldn't happen where stream size is known. if (partCount != numPartsUploaded && size != -1) { @@ -384,7 +350,7 @@ private async Task CompleteMultipartUploadAsync(string bucketName, string object /// - /// Returns an async observable of parts corresponding to a uploadId for a specific bucket and objectName + /// Returns an async observable of parts corresponding to a uploadId for a specific bucket and objectName /// /// /// @@ -808,7 +774,7 @@ private async Task> removeObjectsAsync(string bucketName, List objectList.Add(new DeleteObject(objectName)); i++; if (i % 1000 == 0) - break; + break; } if (objectList.Count() > 0) { @@ -851,7 +817,7 @@ private async Task> removeObjectsAsync(string bucketName, List string etag = ""; string contentType = null; Dictionary metaData = new Dictionary(); - + //Supported headers for object. 
List supportedHeaders = new List { "cache-control", "content-encoding", "content-type" }; @@ -878,7 +844,7 @@ private async Task> removeObjectsAsync(string bucketName, List { metaData[parameter.Name] = parameter.Value.ToString(); } - + } return new ObjectStat(objectName, size, lastModified, etag, contentType, metaData); @@ -956,18 +922,19 @@ internal byte[] ReadFull(Stream data, int currentPartSize) { destObjectName = objectName; } + ServerSideEncryption sseGet = sseSrc; if (sseSrc is SSECopy) { SSECopy sseCpy = (SSECopy)sseSrc; sseGet = sseCpy.CloneToSSEC(); } - // Get Stats on the source object + // Get Stats on the source object ObjectStat srcStats = await this.StatObjectAsync(bucketName, objectName, sse:sseGet, cancellationToken:cancellationToken).ConfigureAwait(false); // Copy metadata from the source object if no metadata replace directive Dictionary meta = new Dictionary(); Dictionary m = metadata; - if (copyConditions != null && !copyConditions.HasReplaceMetadataDirective()) + if (copyConditions != null && !copyConditions.HasReplaceMetadataDirective()) { m = srcStats.metaData; } @@ -1009,7 +976,7 @@ internal byte[] ReadFull(Stream data, int currentPartSize) } } /// - /// Create the copy request,execute it and + /// Create the copy request,execute it and /// /// Bucket name where the object to be copied exists. /// Object name source to be copied. @@ -1054,7 +1021,7 @@ private async Task CopyObjectRequestAsync(string bucketName, string obje // Just read the result and parse content. 
var contentBytes = System.Text.Encoding.UTF8.GetBytes(response.Content); - + object copyResult = null; using (var stream = new MemoryStream(contentBytes)) { @@ -1063,7 +1030,7 @@ private async Task CopyObjectRequestAsync(string bucketName, string obje if (type == typeof(CopyPartResult)) copyResult = (CopyPartResult)(new XmlSerializer(typeof(CopyPartResult)).Deserialize(stream)); } - + return copyResult; } /// @@ -1082,7 +1049,7 @@ private async Task CopyObjectRequestAsync(string bucketName, string obje /// private async Task MultipartCopyUploadAsync(string bucketName, string objectName, string destBucketName, string destObjectName, CopyConditions copyConditions, long copySize,Dictionary metadata = null, ServerSideEncryption sseSrc = null, ServerSideEncryption sseDest = null, CancellationToken cancellationToken=default(CancellationToken)) { - // For all sizes greater than 5GB or if Copy byte range specified in conditions and byte range larger + // For all sizes greater than 5GB or if Copy byte range specified in conditions and byte range larger // than minimum part size (5 MB) do multipart. dynamic multiPartInfo = utils.CalculateMultiPartSize(copySize); @@ -1142,7 +1109,7 @@ private async Task MultipartCopyUploadAsync(string bucketName, string objectName /// - /// Presigned get url - returns a presigned url to access an object's data without credentials.URL can have a maximum expiry of + /// Presigned get url - returns a presigned url to access an object's data without credentials.URL can have a maximum expiry of /// upto 7 days or a minimum of 1 second.Additionally, you can override a set of response headers using reqParams. 
/// /// Bucket to retrieve object from @@ -1165,7 +1132,7 @@ public async Task PresignedGetObjectAsync(string bucketName, string obje } /// - /// Presigned Put url -returns a presigned url to upload an object without credentials.URL can have a maximum expiry of + /// Presigned Put url -returns a presigned url to upload an object without credentials.URL can have a maximum expiry of /// upto 7 days or a minimum of 1 second. /// /// Bucket to retrieve object from @@ -1210,7 +1177,7 @@ public async Task>> PresignedPostPolicy { region = await BucketRegionCache.Instance.Update(this, policy.Bucket).ConfigureAwait(false); } - if (region == null) + if (region == null) { region = BucketRegionCache.Instance.Region(policy.Bucket); } @@ -1232,4 +1199,4 @@ public async Task>> PresignedPostPolicy return new Tuple>(this.restClient.BaseUrl.AbsoluteUri, policy.GetFormData()); } } -} \ No newline at end of file +} diff --git a/Minio/MinioClient.cs b/Minio/MinioClient.cs index 1ece43d22..ed0348ad9 100644 --- a/Minio/MinioClient.cs +++ b/Minio/MinioClient.cs @@ -126,7 +126,7 @@ private async Task getRegion(string bucketName) } /// - /// Constructs a RestRequest. For AWS, this function has the side-effect of overriding the baseUrl + /// Constructs a RestRequest. For AWS, this function has the side-effect of overriding the baseUrl /// in the RestClient with region specific host path or virtual style path. 
/// /// HTTP method @@ -162,7 +162,8 @@ internal async Task CreateRequest(Method method, string bucketName { this.restClient.Authenticator = new V4Authenticator(this.Secure, this.AccessKey, this.SecretKey); } - else { + else + { this.restClient.Authenticator = new V4Authenticator(this.Secure, this.AccessKey, this.SecretKey, this.Region); } @@ -213,7 +214,7 @@ internal async Task CreateRequest(Method method, string bucketName resource += utils.EncodePath(objectName); } - // Append query string passed in + // Append query string passed in if (resourcePath != null) { resource += resourcePath; @@ -303,7 +304,7 @@ public MinioClient(string endpoint, string accessKey = "", string secretKey = "" this.AccessKey = accessKey; this.SecretKey = secretKey; this.Region = region; - // Instantiate a region cache + // Instantiate a region cache this.regionCache = BucketRegionCache.Instance; initClient(); @@ -436,7 +437,7 @@ internal static void ParseError(IRestResponse response) BucketRegionCache.Instance.Remove(resource); e = new BucketNotFoundException(resource, "Not found."); } - + } else { @@ -470,11 +471,11 @@ internal static void ParseError(IRestResponse response) if (response.StatusCode.Equals(HttpStatusCode.NotFound) && response.Request.Resource.EndsWith("?policy") && response.Request.Method.Equals(Method.GET) && (errResponse.Code.Equals("NoSuchBucketPolicy"))) { - + ErrorResponseException ErrorException = new ErrorResponseException(errResponse.Message,errResponse.Code); ErrorException.Response = errResponse; ErrorException.XmlError = response.Content; - throw ErrorException; + throw ErrorException; } MinioException MinioException = new MinioException(errResponse.Message); @@ -505,7 +506,7 @@ private void HandleIfErrorResponse(IRestResponse response, IEnumerable - /// Logs the request sent to server and corresponding response + /// Logs the request sent to server and corresponding response /// /// ///