Skip to content

Commit

Permalink
More OCI source clean-up; pass tests
Browse files Browse the repository at this point in the history
* fix bucket => cloud bucket indirection
* refactor
* pass short-test

Signed-off-by: Alex Aizman <[email protected]>
  • Loading branch information
edmc-ss authored and alex-aizman committed Dec 17, 2024
1 parent 6e5d997 commit d44e2f2
Showing 1 changed file with 72 additions and 88 deletions.
160 changes: 72 additions & 88 deletions ais/backend/oci.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,12 @@
package backend

// Outstanding [TODO] items:
// 0) Make coding style in this file match the rest of the codebase:
// a) too long/ugly var/field names
// b) cumbersome function names
// c) line length explosions (prefer 80 col... but *not* 1-line-per-arg style)
// 1) Need to parse OCI's ~/.oci/config file for non-ENV defaults (for req'd settings)
// 2) Validate ListObjects() should only return Name & Size in all cases (or improve)
// 3) Handle non-descending ListObjects() case (including listing of "virtual" directories)
// 4) Multi-Segment-Upload utilization (for fast/large object PUTs)... if practical
// 5) Add support for object versioning
// 6) Multi-Segment-Download utilization (for fast/large object GETs)... if practical
// 5) Add support for object versioning

import (
"context"
Expand All @@ -37,42 +33,42 @@ import (
)

const (
maxPageSizeMin = 1
maxPageSizeMax = 1000
maxPageSizeDefault = maxPageSizeMax
maxDownloadSegmentSizeMin = 4 * cos.KiB
maxDownloadSegmentSizeMax = 256 * cos.MiB
maxDownloadSegmentSizeDefault = maxDownloadSegmentSizeMax
multiPartDownloadThresholdMin = 8 * cos.KiB
multiPartDownloadThresholdMax = 512 * cos.MiB
multiPartDownloadThresholdDefault = multiPartDownloadThresholdMax
multiPartDownloadMaxThreadsMin = 4
multiPartDownloadMaxThreadsMax = 64
multiPartDownloadMaxThreadsDefault = 16
maxUploadSegmentSizeMin = 4 * cos.KiB
maxUploadSegmentSizeMax = 256 * cos.MiB
maxUploadSegmentSizeDefault = maxUploadSegmentSizeMax
multiPartUploadThresholdMin = 8 * cos.KiB
multiPartUploadThresholdMax = 512 * cos.MiB
multiPartUploadThresholdDefault = multiPartUploadThresholdMax
multiPartUploadMaxThreadsMin = 4
multiPartUploadMaxThreadsMax = 64
multiPartUploadMaxThreadsDefault = 16
maxPageSizeMin = 1
maxPageSizeMax = 1000
maxPageSizeDefault = maxPageSizeMax
mpdSegmentMaxSizeMin = 4 * cos.KiB
mpdSegmentMaxSizeMax = 256 * cos.MiB
mpdSegmentMaxSizeDefault = mpdSegmentMaxSizeMax
mpdThresholdMin = 8 * cos.KiB
mpdThresholdMax = 512 * cos.MiB
mpdThresholdDefault = mpdThresholdMax
mpdMaxThreadsMin = 4
mpdMaxThreadsMax = 64
mpdMaxThreadsDefault = 16
mpuSegmentMaxSizeMin = 4 * cos.KiB
mpuSegmentMaxSizeMax = 256 * cos.MiB
mpuSegmentMaxSizeDefault = mpuSegmentMaxSizeMax
mpuThresholdMin = 8 * cos.KiB
mpuThresholdMax = 512 * cos.MiB
mpuThresholdDefault = mpuThresholdMax
mpuMaxThreadsMin = 4
mpuMaxThreadsMax = 64
mpuMaxThreadsDefault = 16
)

type ocibp struct {
t core.TargetPut
configurationProvider ocicmn.ConfigurationProvider
compartmentOCID string
maxPageSize int64
maxDownloadSegmentSize int64
multiPartDownloadThreshold int64
multiPartDownloadMaxThreads int64
maxUploadSegmentSize int64
multiPartUploadThreshold int64
multiPartUploadMaxThreads int64
client ocios.ObjectStorageClient
namespace string
t core.TargetPut
configurationProvider ocicmn.ConfigurationProvider
compartmentOCID string
maxPageSize int64
mpdSegmentMaxSize int64
mpdThreshold int64
mpdMaxThreads int64
mpuSegmentMaxSize int64
mpuThreshold int64
mpuMaxThreads int64
client ocios.ObjectStorageClient
namespace string
base
}

Expand All @@ -94,28 +90,28 @@ func NewOCI(t core.TargetPut, tstats stats.Tracker) (core.Backend, error) {
if err := bp.set(env.OCI.MaxPageSize, maxPageSizeMin, maxPageSizeMax, maxPageSizeDefault, &bp.maxPageSize); err != nil {
return nil, err
}
if err := bp.set(env.OCI.MaxDownloadSegmentSize, maxDownloadSegmentSizeMin, maxDownloadSegmentSizeMax,
maxDownloadSegmentSizeDefault, &bp.maxDownloadSegmentSize); err != nil {
if err := bp.set(env.OCI.MaxDownloadSegmentSize, mpdSegmentMaxSizeMin, mpdSegmentMaxSizeMax,
mpdSegmentMaxSizeDefault, &bp.mpdSegmentMaxSize); err != nil {
return nil, err
}
if err := bp.set(env.OCI.MultiPartDownloadThreshold, multiPartDownloadThresholdMin, multiPartDownloadThresholdMax,
multiPartDownloadThresholdDefault, &bp.multiPartDownloadThreshold); err != nil {
if err := bp.set(env.OCI.MultiPartDownloadThreshold, mpdThresholdMin, mpdThresholdMax,
mpdThresholdDefault, &bp.mpdThreshold); err != nil {
return nil, err
}
if err := bp.set(env.OCI.MultiPartDownloadMaxThreads, multiPartDownloadMaxThreadsMin, multiPartDownloadMaxThreadsMax,
multiPartDownloadMaxThreadsDefault, &bp.multiPartDownloadMaxThreads); err != nil {
if err := bp.set(env.OCI.MultiPartDownloadMaxThreads, mpdMaxThreadsMin, mpdMaxThreadsMax,
mpdMaxThreadsDefault, &bp.mpdMaxThreads); err != nil {
return nil, err
}
if err := bp.set(env.OCI.MaxUploadSegmentSize, maxUploadSegmentSizeMin, maxUploadSegmentSizeMax,
maxUploadSegmentSizeDefault, &bp.maxUploadSegmentSize); err != nil {
if err := bp.set(env.OCI.MaxUploadSegmentSize, mpuSegmentMaxSizeMin, mpuSegmentMaxSizeMax,
mpuSegmentMaxSizeDefault, &bp.mpuSegmentMaxSize); err != nil {
return nil, err
}
if err := bp.set(env.OCI.MultiPartUploadThreshold, multiPartUploadThresholdMin, multiPartUploadThresholdMax,
multiPartUploadThresholdDefault, &bp.multiPartUploadThreshold); err != nil {
if err := bp.set(env.OCI.MultiPartUploadThreshold, mpuThresholdMin, mpuThresholdMax,
mpuThresholdDefault, &bp.mpuThreshold); err != nil {
return nil, err
}
if err := bp.set(env.OCI.MultiPartUploadMaxThreads, multiPartUploadMaxThreadsMin, multiPartUploadMaxThreadsMax,
multiPartUploadMaxThreadsDefault, &bp.multiPartUploadMaxThreads); err != nil {
if err := bp.set(env.OCI.MultiPartUploadMaxThreads, mpuMaxThreadsMin, mpuMaxThreadsMax,
mpuMaxThreadsDefault, &bp.mpuMaxThreads); err != nil {
return nil, err
}

Expand Down Expand Up @@ -169,11 +165,10 @@ func ociStatus(rawResponse *http.Response) (ecode int) {

func (bp *ocibp) ListObjects(bck *meta.Bck, msg *apc.LsoMsg, lst *cmn.LsoRes) (ecode int, err error) {
var (
cloudBck = bck.RemoteBck()
delimiter = string("/")
fields string
limitAsInt int
req ocios.ListObjectsRequest
resp ocios.ListObjectsResponse
lsoEnt *cmn.LsoEnt
objectSummary ocios.ObjectSummary
)
Expand All @@ -194,9 +189,9 @@ func (bp *ocibp) ListObjects(bck *meta.Bck, msg *apc.LsoMsg, lst *cmn.LsoRes) (e

fields = "name,size"

req = ocios.ListObjectsRequest{
req := ocios.ListObjectsRequest{
NamespaceName: &bp.namespace,
BucketName: &bck.Name,
BucketName: &cloudBck.Name,
Limit: &limitAsInt,
Fields: &fields,
}
Expand All @@ -211,7 +206,7 @@ func (bp *ocibp) ListObjects(bck *meta.Bck, msg *apc.LsoMsg, lst *cmn.LsoRes) (e
req.Delimiter = &delimiter
}

resp, err = bp.client.ListObjects(context.Background(), req)
resp, err := bp.client.ListObjects(context.Background(), req)
if err != nil {
ecode = ociStatus(resp.RawResponse)
return
Expand Down Expand Up @@ -256,15 +251,18 @@ func (bp *ocibp) ListBuckets(_ cmn.QueryBcks) (bcks cmn.Bcks, ecode int, _ error
return bcks, 0, nil
}

// [TODO] Need to implement multi-threaded PUT when "length" exceeds bp.multiPartUploadThreshold
// [TODO] Need to implement multi-threaded PUT when "length" exceeds bp.mpuThreshold
func (bp *ocibp) PutObj(r io.ReadCloser, lom *core.LOM, _ *http.Request) (int, error) {
cloudBck := lom.Bck().RemoteBck()
req := ocios.PutObjectRequest{
NamespaceName: &bp.namespace,
BucketName: &lom.Bck().Name,
BucketName: &cloudBck.Name,
ObjectName: &lom.ObjName,
PutObjectBody: r,
}
resp, err := bp.client.PutObject(context.Background(), req)
// Note: in case PutObject() failed to close r...
_ = r.Close()
if err != nil {
return ociStatus(resp.RawResponse), err
}
Expand All @@ -277,22 +275,18 @@ func (bp *ocibp) PutObj(r io.ReadCloser, lom *core.LOM, _ *http.Request) (int, e
lom.SetCustomKey(cmn.MD5ObjMD, *resp.OpcContentMd5)
}

// cos.Close(r) TODO -- FIXME: revisit
return 0, nil
}

func (bp *ocibp) DeleteObj(lom *core.LOM) (ecode int, err error) {
var (
req ocios.DeleteObjectRequest
resp ocios.DeleteObjectResponse
)
req = ocios.DeleteObjectRequest{
cloudBck := lom.Bck().RemoteBck()
req := ocios.DeleteObjectRequest{
NamespaceName: &bp.namespace,
BucketName: &lom.Bck().Name,
BucketName: &cloudBck.Name,
ObjectName: &lom.ObjName,
}

resp, err = bp.client.DeleteObject(context.Background(), req)
resp, err := bp.client.DeleteObject(context.Background(), req)
if err != nil {
ecode = ociStatus(resp.RawResponse)
return
Expand All @@ -302,17 +296,13 @@ func (bp *ocibp) DeleteObj(lom *core.LOM) (ecode int, err error) {
}

func (bp *ocibp) HeadBucket(ctx context.Context, bck *meta.Bck) (bckProps cos.StrKVs, ecode int, err error) {
var (
req ocios.HeadBucketRequest
resp ocios.HeadBucketResponse
)

req = ocios.HeadBucketRequest{
cloudBck := bck.RemoteBck()
req := ocios.HeadBucketRequest{
NamespaceName: &bp.namespace,
BucketName: &bck.Name,
BucketName: &cloudBck.Name,
}

resp, err = bp.client.HeadBucket(ctx, req)
resp, err := bp.client.HeadBucket(ctx, req)
if err != nil {
ecode = ociStatus(resp.RawResponse)
return
Expand All @@ -326,17 +316,14 @@ func (bp *ocibp) HeadBucket(ctx context.Context, bck *meta.Bck) (bckProps cos.St
}

func (bp *ocibp) HeadObj(ctx context.Context, lom *core.LOM, _ *http.Request) (objAttrs *cmn.ObjAttrs, ecode int, err error) {
var (
req ocios.HeadObjectRequest
resp ocios.HeadObjectResponse
)
req = ocios.HeadObjectRequest{
cloudBck := lom.Bck().RemoteBck()
req := ocios.HeadObjectRequest{
NamespaceName: &bp.namespace,
BucketName: &lom.Bck().Name,
BucketName: &cloudBck.Name,
ObjectName: &lom.ObjName,
}

resp, err = bp.client.HeadObject(ctx, req)
resp, err := bp.client.HeadObject(ctx, req)
if err != nil {
ecode = ociStatus(resp.RawResponse)
return
Expand Down Expand Up @@ -370,27 +357,24 @@ func (bp *ocibp) GetObj(ctx context.Context, lom *core.LOM, owt cmn.OWT, _ *http
return 0, err
}

// [TODO] Need to implement multi-threaded GET when "length" exceeds bp.multiPartDownloadThreshold
// [TODO]
// 1. Need to implement multi-threaded GET when "length" exceeds bp.mpdThreshold
// 2. Consider setting req.IfMatch to lom.GetCustomKey(cmn.ETag) if present
func (bp *ocibp) GetObjReader(ctx context.Context, lom *core.LOM, offset, length int64) (res core.GetReaderResult) {
var (
cloudBck = lom.Bck().RemoteBck()
rangeHeader string
)
req := ocios.GetObjectRequest{
NamespaceName: &bp.namespace,
BucketName: &lom.Bck().Name,
BucketName: &cloudBck.Name,
ObjectName: &lom.ObjName,
}
if length > 0 {
rangeHeader = cmn.MakeRangeHdr(offset, length)
req.Range = &rangeHeader
}

// TODO -- FIXME: revisit
matchingETag, matchingETagDesired := lom.GetCustomKey(cmn.ETag)
if matchingETagDesired && (matchingETag != "") {
req.IfMatch = &matchingETag
}

resp, err := bp.client.GetObject(ctx, req)
if err != nil {
res.Err = err
Expand Down

0 comments on commit d44e2f2

Please sign in to comment.