diff --git a/ais/backend/oci.go b/ais/backend/oci.go index 7d03de9783..c727212dfb 100644 --- a/ais/backend/oci.go +++ b/ais/backend/oci.go @@ -7,16 +7,12 @@ package backend // Outstanding [TODO] items: -// 0) Make coding style in this file match the rest of the codebase: -// a) too long/ugly var/field names -// b) cumbersome function names -// c) line length explosions (prefer 80 col... but *not* 1-line-per-arg style) // 1) Need to parse OCI's ~/.oci/config file for non-ENV defaults (for req'd settings) // 2) Validate ListObjects() should only return Name & Size in all cases (or improve) // 3) Handle non-descending ListObjects() case (including listing of "virtual" directories) // 4) Multi-Segment-Upload utilization (for fast/large object PUTs)... if practical -// 5) Add support for object versioning // 6) Multi-Segment-Download utilization (for fast/large object GETs)... if practical +// 5) Add support for object versioning import ( "context" @@ -37,42 +33,42 @@ import ( ) const ( - maxPageSizeMin = 1 - maxPageSizeMax = 1000 - maxPageSizeDefault = maxPageSizeMax - maxDownloadSegmentSizeMin = 4 * cos.KiB - maxDownloadSegmentSizeMax = 256 * cos.MiB - maxDownloadSegmentSizeDefault = maxDownloadSegmentSizeMax - multiPartDownloadThresholdMin = 8 * cos.KiB - multiPartDownloadThresholdMax = 512 * cos.MiB - multiPartDownloadThresholdDefault = multiPartDownloadThresholdMax - multiPartDownloadMaxThreadsMin = 4 - multiPartDownloadMaxThreadsMax = 64 - multiPartDownloadMaxThreadsDefault = 16 - maxUploadSegmentSizeMin = 4 * cos.KiB - maxUploadSegmentSizeMax = 256 * cos.MiB - maxUploadSegmentSizeDefault = maxUploadSegmentSizeMax - multiPartUploadThresholdMin = 8 * cos.KiB - multiPartUploadThresholdMax = 512 * cos.MiB - multiPartUploadThresholdDefault = multiPartUploadThresholdMax - multiPartUploadMaxThreadsMin = 4 - multiPartUploadMaxThreadsMax = 64 - multiPartUploadMaxThreadsDefault = 16 + maxPageSizeMin = 1 + maxPageSizeMax = 1000 + maxPageSizeDefault = maxPageSizeMax + mpdSegmentMaxSizeMin = 4 * cos.KiB + mpdSegmentMaxSizeMax = 256 * cos.MiB + mpdSegmentMaxSizeDefault = mpdSegmentMaxSizeMax + mpdThresholdMin = 8 * cos.KiB + mpdThresholdMax = 512 * cos.MiB + mpdThresholdDefault = mpdThresholdMax + mpdMaxThreadsMin = 4 + mpdMaxThreadsMax = 64 + mpdMaxThreadsDefault = 16 + mpuSegmentMaxSizeMin = 4 * cos.KiB + mpuSegmentMaxSizeMax = 256 * cos.MiB + mpuSegmentMaxSizeDefault = mpuSegmentMaxSizeMax + mpuThresholdMin = 8 * cos.KiB + mpuThresholdMax = 512 * cos.MiB + mpuThresholdDefault = mpuThresholdMax + mpuMaxThreadsMin = 4 + mpuMaxThreadsMax = 64 + mpuMaxThreadsDefault = 16 ) type ocibp struct { - t core.TargetPut - configurationProvider ocicmn.ConfigurationProvider - compartmentOCID string - maxPageSize int64 - maxDownloadSegmentSize int64 - multiPartDownloadThreshold int64 - multiPartDownloadMaxThreads int64 - maxUploadSegmentSize int64 - multiPartUploadThreshold int64 - multiPartUploadMaxThreads int64 - client ocios.ObjectStorageClient - namespace string + t core.TargetPut + configurationProvider ocicmn.ConfigurationProvider + compartmentOCID string + maxPageSize int64 + mpdSegmentMaxSize int64 + mpdThreshold int64 + mpdMaxThreads int64 + mpuSegmentMaxSize int64 + mpuThreshold int64 + mpuMaxThreads int64 + client ocios.ObjectStorageClient + namespace string base } @@ -94,28 +90,28 @@ func NewOCI(t core.TargetPut, tstats stats.Tracker) (core.Backend, error) { if err := bp.set(env.OCI.MaxPageSize, maxPageSizeMin, maxPageSizeMax, maxPageSizeDefault, &bp.maxPageSize); err != nil { return nil, err } - if err := bp.set(env.OCI.MaxDownloadSegmentSize, maxDownloadSegmentSizeMin, maxDownloadSegmentSizeMax, - maxDownloadSegmentSizeDefault, &bp.maxDownloadSegmentSize); err != nil { + if err := bp.set(env.OCI.MaxDownloadSegmentSize, mpdSegmentMaxSizeMin, mpdSegmentMaxSizeMax, + mpdSegmentMaxSizeDefault, &bp.mpdSegmentMaxSize); err != nil { return nil, err } - if err := bp.set(env.OCI.MultiPartDownloadThreshold, multiPartDownloadThresholdMin, multiPartDownloadThresholdMax, - multiPartDownloadThresholdDefault, &bp.multiPartDownloadThreshold); err != nil { + if err := bp.set(env.OCI.MultiPartDownloadThreshold, mpdThresholdMin, mpdThresholdMax, + mpdThresholdDefault, &bp.mpdThreshold); err != nil { return nil, err } - if err := bp.set(env.OCI.MultiPartDownloadMaxThreads, multiPartDownloadMaxThreadsMin, multiPartDownloadMaxThreadsMax, - multiPartDownloadMaxThreadsDefault, &bp.multiPartDownloadMaxThreads); err != nil { + if err := bp.set(env.OCI.MultiPartDownloadMaxThreads, mpdMaxThreadsMin, mpdMaxThreadsMax, + mpdMaxThreadsDefault, &bp.mpdMaxThreads); err != nil { return nil, err } - if err := bp.set(env.OCI.MaxUploadSegmentSize, maxUploadSegmentSizeMin, maxUploadSegmentSizeMax, - maxUploadSegmentSizeDefault, &bp.maxUploadSegmentSize); err != nil { + if err := bp.set(env.OCI.MaxUploadSegmentSize, mpuSegmentMaxSizeMin, mpuSegmentMaxSizeMax, + mpuSegmentMaxSizeDefault, &bp.mpuSegmentMaxSize); err != nil { return nil, err } - if err := bp.set(env.OCI.MultiPartUploadThreshold, multiPartUploadThresholdMin, multiPartUploadThresholdMax, - multiPartUploadThresholdDefault, &bp.multiPartUploadThreshold); err != nil { + if err := bp.set(env.OCI.MultiPartUploadThreshold, mpuThresholdMin, mpuThresholdMax, + mpuThresholdDefault, &bp.mpuThreshold); err != nil { return nil, err } - if err := bp.set(env.OCI.MultiPartUploadMaxThreads, multiPartUploadMaxThreadsMin, multiPartUploadMaxThreadsMax, - multiPartUploadMaxThreadsDefault, &bp.multiPartUploadMaxThreads); err != nil { + if err := bp.set(env.OCI.MultiPartUploadMaxThreads, mpuMaxThreadsMin, mpuMaxThreadsMax, + mpuMaxThreadsDefault, &bp.mpuMaxThreads); err != nil { return nil, err } @@ -169,11 +165,10 @@ func ociStatus(rawResponse *http.Response) (ecode int) { func (bp *ocibp) ListObjects(bck *meta.Bck, msg *apc.LsoMsg, lst *cmn.LsoRes) (ecode int, err error) { var ( + cloudBck = bck.RemoteBck() delimiter = string("/") fields string limitAsInt int - req ocios.ListObjectsRequest - resp ocios.ListObjectsResponse lsoEnt *cmn.LsoEnt objectSummary ocios.ObjectSummary ) @@ -194,9 +189,9 @@ func (bp *ocibp) ListObjects(bck *meta.Bck, msg *apc.LsoMsg, lst *cmn.LsoRes) (e fields = "name,size" - req = ocios.ListObjectsRequest{ + req := ocios.ListObjectsRequest{ NamespaceName: &bp.namespace, - BucketName: &bck.Name, + BucketName: &cloudBck.Name, Limit: &limitAsInt, Fields: &fields, } @@ -211,7 +206,7 @@ func (bp *ocibp) ListObjects(bck *meta.Bck, msg *apc.LsoMsg, lst *cmn.LsoRes) (e req.Delimiter = &delimiter } - resp, err = bp.client.ListObjects(context.Background(), req) + resp, err := bp.client.ListObjects(context.Background(), req) if err != nil { ecode = ociStatus(resp.RawResponse) return @@ -256,15 +251,18 @@ func (bp *ocibp) ListBuckets(_ cmn.QueryBcks) (bcks cmn.Bcks, ecode int, _ error return bcks, 0, nil } -// [TODO] Need to implement multi-threaded PUT when "length" exceeds bp.multiPartUploadThreshold +// [TODO] Need to implement multi-threaded PUT when "length" exceeds bp.mpuThreshold func (bp *ocibp) PutObj(r io.ReadCloser, lom *core.LOM, _ *http.Request) (int, error) { + cloudBck := lom.Bck().RemoteBck() req := ocios.PutObjectRequest{ NamespaceName: &bp.namespace, - BucketName: &lom.Bck().Name, + BucketName: &cloudBck.Name, ObjectName: &lom.ObjName, PutObjectBody: r, } resp, err := bp.client.PutObject(context.Background(), req) + // Note: in case PutObject() failed to close r... + _ = r.Close() if err != nil { return ociStatus(resp.RawResponse), err } @@ -277,22 +275,18 @@ func (bp *ocibp) PutObj(r io.ReadCloser, lom *core.LOM, _ *http.Request) (int, e lom.SetCustomKey(cmn.MD5ObjMD, *resp.OpcContentMd5) } - // cos.Close(r) TODO -- FIXME: revisit return 0, nil } func (bp *ocibp) DeleteObj(lom *core.LOM) (ecode int, err error) { - var ( - req ocios.DeleteObjectRequest - resp ocios.DeleteObjectResponse - ) - req = ocios.DeleteObjectRequest{ + cloudBck := lom.Bck().RemoteBck() + req := ocios.DeleteObjectRequest{ NamespaceName: &bp.namespace, - BucketName: &lom.Bck().Name, + BucketName: &cloudBck.Name, ObjectName: &lom.ObjName, } - resp, err = bp.client.DeleteObject(context.Background(), req) + resp, err := bp.client.DeleteObject(context.Background(), req) if err != nil { ecode = ociStatus(resp.RawResponse) return @@ -302,17 +296,13 @@ func (bp *ocibp) DeleteObj(lom *core.LOM) (ecode int, err error) { } func (bp *ocibp) HeadBucket(ctx context.Context, bck *meta.Bck) (bckProps cos.StrKVs, ecode int, err error) { - var ( - req ocios.HeadBucketRequest - resp ocios.HeadBucketResponse - ) - - req = ocios.HeadBucketRequest{ + cloudBck := bck.RemoteBck() + req := ocios.HeadBucketRequest{ NamespaceName: &bp.namespace, - BucketName: &bck.Name, + BucketName: &cloudBck.Name, } - resp, err = bp.client.HeadBucket(ctx, req) + resp, err := bp.client.HeadBucket(ctx, req) if err != nil { ecode = ociStatus(resp.RawResponse) return @@ -326,17 +316,14 @@ func (bp *ocibp) HeadBucket(ctx context.Context, bck *meta.Bck) (bckProps cos.St } func (bp *ocibp) HeadObj(ctx context.Context, lom *core.LOM, _ *http.Request) (objAttrs *cmn.ObjAttrs, ecode int, err error) { - var ( - req ocios.HeadObjectRequest - resp ocios.HeadObjectResponse - ) - req = ocios.HeadObjectRequest{ + cloudBck := lom.Bck().RemoteBck() + req := ocios.HeadObjectRequest{ NamespaceName: &bp.namespace, - BucketName: &lom.Bck().Name, + BucketName: &cloudBck.Name, ObjectName: &lom.ObjName, } - resp, err = bp.client.HeadObject(ctx, req) + resp, err := bp.client.HeadObject(ctx, req) if err != nil { ecode = ociStatus(resp.RawResponse) return @@ -370,14 +357,17 @@ func (bp *ocibp) GetObj(ctx context.Context, lom *core.LOM, owt cmn.OWT, _ *http return 0, err } -// [TODO] Need to implement multi-threaded GET when "length" exceeds bp.multiPartDownloadThreshold +// [TODO] +// 1. Need to implement multi-threaded GET when "length" exceeds bp.mpdThreshold +// 2. Consider setting req.IfMatch to lom.GetCustomKey(cmn.ETag) if present func (bp *ocibp) GetObjReader(ctx context.Context, lom *core.LOM, offset, length int64) (res core.GetReaderResult) { var ( + cloudBck = lom.Bck().RemoteBck() rangeHeader string ) req := ocios.GetObjectRequest{ NamespaceName: &bp.namespace, - BucketName: &lom.Bck().Name, + BucketName: &cloudBck.Name, ObjectName: &lom.ObjName, } if length > 0 { @@ -385,12 +375,6 @@ func (bp *ocibp) GetObjReader(ctx context.Context, lom *core.LOM, offset, length req.Range = &rangeHeader } - // TODO -- FIXME: revisit - matchingETag, matchingETagDesired := lom.GetCustomKey(cmn.ETag) - if matchingETagDesired && (matchingETag != "") { - req.IfMatch = &matchingETag - } - resp, err := bp.client.GetObject(ctx, req) if err != nil { res.Err = err