diff --git a/CHANGELOG.md b/CHANGELOG.md index f0904faa..d0779d62 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,7 +25,6 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#79](https://github.com/thanos-io/objstore/pull/79) Metrics: Fix `objstore_bucket_operation_duration_seconds` for `iter` operations. ### Added -- [#63](https://github.com/thanos-io/objstore/pull/63) Implement a `IterWithAttributes` method on the bucket client. - [#15](https://github.com/thanos-io/objstore/pull/15) Add Oracle Cloud Infrastructure Object Storage Bucket support. - [#25](https://github.com/thanos-io/objstore/pull/25) S3: Support specifying S3 storage class. - [#32](https://github.com/thanos-io/objstore/pull/32) Swift: Support authentication using application credentials. @@ -56,6 +55,9 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#130](https://github.com/thanos-io/objstore/pull/130) feat: Decouple creating bucket metrics from instrumenting the bucket - [#147](https://github.com/thanos-io/objstore/pull/147) feat: Add MaxRetries config to cos, gcs and obs. - [#150](https://github.com/thanos-io/objstore/pull/150) Add support for roundtripper wrapper. +- [#63](https://github.com/thanos-io/objstore/pull/63) Implement a `IterWithAttributes` method on the bucket client. +- [#155](https://github.com/thanos-io/objstore/pull/155) Add a `Provider` method on `objstore.Client`. + ### Changed - [#38](https://github.com/thanos-io/objstore/pull/38) *: Upgrade minio-go version to `v7.0.45`. diff --git a/README.md b/README.md index d8f58023..516d1070 100644 --- a/README.md +++ b/README.md @@ -48,13 +48,15 @@ See [MAINTAINERS.md](https://github.com/thanos-io/thanos/blob/main/MAINTAINERS.m The core this module is the [`Bucket` interface](objstore.go): -```go mdox-exec="sed -n '39,55p' objstore.go" +```go mdox-exec="sed -n '55,73p' objstore.go" // Bucket provides read and write access to an object storage bucket. // NOTE: We assume strong consistency for write-read flow. type Bucket interface { io.Closer BucketReader + Provider() ObjProvider + // Upload the contents of the reader as an object into the bucket. // Upload should be idempotent. Upload(ctx context.Context, name string, r io.Reader) error @@ -70,7 +72,7 @@ type Bucket interface { All [provider implementations](providers) have to implement `Bucket` interface that allows common read and write operations that all supported by all object providers. If you want to limit the code that will do bucket operation to only read access (smart idea, allowing to limit access permissions), you can use the [`BucketReader` interface](objstore.go): -```go mdox-exec="sed -n '71,106p' objstore.go" +```go mdox-exec="sed -n '89,124p' objstore.go" // BucketReader provides read access to an object storage bucket. type BucketReader interface { // Iter calls f for each entry in the given directory (not recursive.). The argument to f is the full diff --git a/client/factory.go b/client/factory.go index 5fe5a741..089fd843 100644 --- a/client/factory.go +++ b/client/factory.go @@ -27,25 +27,10 @@ import ( "gopkg.in/yaml.v2" ) -type ObjProvider string - -const ( - FILESYSTEM ObjProvider = "FILESYSTEM" - GCS ObjProvider = "GCS" - S3 ObjProvider = "S3" - AZURE ObjProvider = "AZURE" - SWIFT ObjProvider = "SWIFT" - COS ObjProvider = "COS" - ALIYUNOSS ObjProvider = "ALIYUNOSS" - BOS ObjProvider = "BOS" - OCI ObjProvider = "OCI" - OBS ObjProvider = "OBS" -) - type BucketConfig struct { - Type ObjProvider `yaml:"type"` - Config interface{} `yaml:"config"` - Prefix string `yaml:"prefix" default:""` + Type objstore.ObjProvider `yaml:"type"` + Config interface{} `yaml:"config"` + Prefix string `yaml:"prefix" default:""` } // NewBucket initializes and returns new object storage clients. @@ -64,25 +49,25 @@ func NewBucket(logger log.Logger, confContentYaml []byte, component string, wrap var bucket objstore.Bucket switch strings.ToUpper(string(bucketConf.Type)) { - case string(GCS): + case string(objstore.GCS): bucket, err = gcs.NewBucket(context.Background(), logger, config, component, wrapRoundtripper) - case string(S3): + case string(objstore.S3): bucket, err = s3.NewBucket(logger, config, component, wrapRoundtripper) - case string(AZURE): + case string(objstore.AZURE): bucket, err = azure.NewBucket(logger, config, component, wrapRoundtripper) - case string(SWIFT): + case string(objstore.SWIFT): bucket, err = swift.NewContainer(logger, config, wrapRoundtripper) - case string(COS): + case string(objstore.COS): bucket, err = cos.NewBucket(logger, config, component, wrapRoundtripper) - case string(ALIYUNOSS): + case string(objstore.ALIYUNOSS): bucket, err = oss.NewBucket(logger, config, component, wrapRoundtripper) - case string(FILESYSTEM): + case string(objstore.FILESYSTEM): bucket, err = filesystem.NewBucketFromConfig(config) - case string(BOS): + case string(objstore.BOS): bucket, err = bos.NewBucket(logger, config, component) - case string(OCI): + case string(objstore.OCI): bucket, err = oci.NewBucket(logger, config, wrapRoundtripper) - case string(OBS): + case string(objstore.OBS): bucket, err = obs.NewBucket(logger, config) default: return nil, errors.Errorf("bucket with type %s is not supported", bucketConf.Type) diff --git a/inmem.go b/inmem.go index d550e283..6a344066 100644 --- a/inmem.go +++ b/inmem.go @@ -34,6 +34,8 @@ func NewInMemBucket() *InMemBucket { } } +func (b *InMemBucket) Provider() ObjProvider { return MEMORY } + // Objects returns a copy of the internally stored objects. // NOTE: For assert purposes. func (b *InMemBucket) Objects() map[string][]byte { diff --git a/objstore.go b/objstore.go index 33c6e5e8..86ecfa26 100644 --- a/objstore.go +++ b/objstore.go @@ -26,6 +26,22 @@ import ( "golang.org/x/sync/errgroup" ) +type ObjProvider string + +const ( + MEMORY ObjProvider = "MEMORY" + FILESYSTEM ObjProvider = "FILESYSTEM" + GCS ObjProvider = "GCS" + S3 ObjProvider = "S3" + AZURE ObjProvider = "AZURE" + SWIFT ObjProvider = "SWIFT" + COS ObjProvider = "COS" + ALIYUNOSS ObjProvider = "ALIYUNOSS" + BOS ObjProvider = "BOS" + OCI ObjProvider = "OCI" + OBS ObjProvider = "OBS" +) + const ( OpIter = "iter" OpGet = "get" @@ -42,6 +58,8 @@ type Bucket interface { io.Closer BucketReader + Provider() ObjProvider + // Upload the contents of the reader as an object into the bucket. // Upload should be idempotent. Upload(ctx context.Context, name string, r io.Reader) error @@ -583,6 +601,10 @@ type metricBucket struct { metrics *Metrics } +func (b *metricBucket) Provider() ObjProvider { + return b.bkt.Provider() +} + func (b *metricBucket) WithExpectedErrs(fn IsOpFailureExpectedFunc) Bucket { return &metricBucket{ bkt: b.bkt, diff --git a/objtesting/foreach.go b/objtesting/foreach.go index 87d9ed1b..29d16c39 100644 --- a/objtesting/foreach.go +++ b/objtesting/foreach.go @@ -9,7 +9,6 @@ import ( "testing" "github.com/thanos-io/objstore" - "github.com/thanos-io/objstore/client" "github.com/thanos-io/objstore/providers/azure" "github.com/thanos-io/objstore/providers/bos" "github.com/thanos-io/objstore/providers/cos" @@ -26,7 +25,7 @@ import ( // IsObjStoreSkipped returns true if given provider ID is found in THANOS_TEST_OBJSTORE_SKIP array delimited by comma e.g: // THANOS_TEST_OBJSTORE_SKIP=GCS,S3,AZURE,SWIFT,COS,ALIYUNOSS,BOS,OCI. -func IsObjStoreSkipped(t *testing.T, provider client.ObjProvider) bool { +func IsObjStoreSkipped(t *testing.T, provider objstore.ObjProvider) bool { if e, ok := os.LookupEnv("THANOS_TEST_OBJSTORE_SKIP"); ok { obstores := strings.Split(e, ",") for _, objstore := range obstores { @@ -69,7 +68,7 @@ func ForeachStore(t *testing.T, testFn func(t *testing.T, bkt objstore.Bucket)) }) // Optional GCS. - if !IsObjStoreSkipped(t, client.GCS) { + if !IsObjStoreSkipped(t, objstore.GCS) { t.Run("gcs", func(t *testing.T) { bkt, closeFn, err := gcs.NewTestBucket(t, os.Getenv("GCP_PROJECT")) testutil.Ok(t, err) @@ -84,7 +83,7 @@ func ForeachStore(t *testing.T, testFn func(t *testing.T, bkt objstore.Bucket)) } // Optional S3. - if !IsObjStoreSkipped(t, client.S3) { + if !IsObjStoreSkipped(t, objstore.S3) { t.Run("aws s3", func(t *testing.T) { // TODO(bwplotka): Allow taking location from envvar. bkt, closeFn, err := s3.NewTestBucket(t, "us-west-2") @@ -103,7 +102,7 @@ func ForeachStore(t *testing.T, testFn func(t *testing.T, bkt objstore.Bucket)) } // Optional Azure. - if !IsObjStoreSkipped(t, client.AZURE) { + if !IsObjStoreSkipped(t, objstore.AZURE) { t.Run("azure", func(t *testing.T) { bkt, closeFn, err := azure.NewTestBucket(t, "e2e-tests") testutil.Ok(t, err) @@ -117,7 +116,7 @@ func ForeachStore(t *testing.T, testFn func(t *testing.T, bkt objstore.Bucket)) } // Optional SWIFT. - if !IsObjStoreSkipped(t, client.SWIFT) { + if !IsObjStoreSkipped(t, objstore.SWIFT) { t.Run("swift", func(t *testing.T) { container, closeFn, err := swift.NewTestContainer(t) testutil.Ok(t, err) @@ -131,7 +130,7 @@ func ForeachStore(t *testing.T, testFn func(t *testing.T, bkt objstore.Bucket)) } // Optional COS. - if !IsObjStoreSkipped(t, client.COS) { + if !IsObjStoreSkipped(t, objstore.COS) { t.Run("Tencent cos", func(t *testing.T) { bkt, closeFn, err := cos.NewTestBucket(t) testutil.Ok(t, err) @@ -145,7 +144,7 @@ func ForeachStore(t *testing.T, testFn func(t *testing.T, bkt objstore.Bucket)) } // Optional OSS. - if !IsObjStoreSkipped(t, client.ALIYUNOSS) { + if !IsObjStoreSkipped(t, objstore.ALIYUNOSS) { t.Run("AliYun oss", func(t *testing.T) { bkt, closeFn, err := oss.NewTestBucket(t) testutil.Ok(t, err) @@ -159,7 +158,7 @@ func ForeachStore(t *testing.T, testFn func(t *testing.T, bkt objstore.Bucket)) } // Optional BOS. - if !IsObjStoreSkipped(t, client.BOS) { + if !IsObjStoreSkipped(t, objstore.BOS) { t.Run("Baidu BOS", func(t *testing.T) { bkt, closeFn, err := bos.NewTestBucket(t) testutil.Ok(t, err) @@ -173,7 +172,7 @@ func ForeachStore(t *testing.T, testFn func(t *testing.T, bkt objstore.Bucket)) } // Optional OCI. - if !IsObjStoreSkipped(t, client.OCI) { + if !IsObjStoreSkipped(t, objstore.OCI) { t.Run("oci", func(t *testing.T) { bkt, closeFn, err := oci.NewTestBucket(t) testutil.Ok(t, err) @@ -186,7 +185,7 @@ func ForeachStore(t *testing.T, testFn func(t *testing.T, bkt objstore.Bucket)) } // Optional OBS. - if !IsObjStoreSkipped(t, client.OBS) { + if !IsObjStoreSkipped(t, objstore.OBS) { t.Run("obs", func(t *testing.T) { bkt, closeFn, err := obs.NewTestBucket(t, "cn-south-1") testutil.Ok(t, err) diff --git a/prefixed_bucket.go b/prefixed_bucket.go index a76b34c3..a37450ca 100644 --- a/prefixed_bucket.go +++ b/prefixed_bucket.go @@ -39,6 +39,8 @@ func withPrefix(prefix, name string) string { return prefix + DirDelim + name } +func (p *PrefixedBucket) Provider() ObjProvider { return p.bkt.Provider() } + func (p *PrefixedBucket) Close() error { return p.bkt.Close() } @@ -93,7 +95,7 @@ func (p *PrefixedBucket) IsAccessDeniedErr(err error) bool { } // Attributes returns information about the specified object. -func (p PrefixedBucket) Attributes(ctx context.Context, name string) (ObjectAttributes, error) { +func (p *PrefixedBucket) Attributes(ctx context.Context, name string) (ObjectAttributes, error) { return p.bkt.Attributes(ctx, conditionalPrefix(p.prefix, name)) } diff --git a/providers/azure/azure.go b/providers/azure/azure.go index 5689dc62..8d055e77 100644 --- a/providers/azure/azure.go +++ b/providers/azure/azure.go @@ -193,6 +193,8 @@ func NewBucketWithConfig(logger log.Logger, conf Config, component string, wrapR return bkt, nil } +func (b *Bucket) Provider() objstore.ObjProvider { return objstore.AZURE } + func (b *Bucket) SupportedIterOptions() []objstore.IterOptionType { return []objstore.IterOptionType{objstore.Recursive, objstore.UpdatedAt} } diff --git a/providers/bos/bos.go b/providers/bos/bos.go index 20c8dd3e..0cc4352c 100644 --- a/providers/bos/bos.go +++ b/providers/bos/bos.go @@ -100,6 +100,8 @@ func NewBucketWithConfig(logger log.Logger, config Config, component string) (*B return bkt, nil } +func (b *Bucket) Provider() objstore.ObjProvider { return objstore.BOS } + // Name returns the bucket name for the provider. func (b *Bucket) Name() string { return b.name diff --git a/providers/cos/cos.go b/providers/cos/cos.go index d81150b1..1851c984 100644 --- a/providers/cos/cos.go +++ b/providers/cos/cos.go @@ -159,6 +159,8 @@ func NewBucketWithConfig(logger log.Logger, config Config, component string, wra return bkt, nil } +func (b *Bucket) Provider() objstore.ObjProvider { return objstore.COS } + // Name returns the bucket name for COS. func (b *Bucket) Name() string { return b.name diff --git a/providers/filesystem/filesystem.go b/providers/filesystem/filesystem.go index 01dca4bb..df602877 100644 --- a/providers/filesystem/filesystem.go +++ b/providers/filesystem/filesystem.go @@ -50,6 +50,8 @@ func NewBucket(rootDir string) (*Bucket, error) { return &Bucket{rootDir: absDir}, nil } +func (b *Bucket) Provider() objstore.ObjProvider { return objstore.FILESYSTEM } + func (b *Bucket) SupportedIterOptions() []objstore.IterOptionType { return []objstore.IterOptionType{objstore.Recursive, objstore.UpdatedAt} } diff --git a/providers/gcs/gcs.go b/providers/gcs/gcs.go index d54a6782..b89f8735 100644 --- a/providers/gcs/gcs.go +++ b/providers/gcs/gcs.go @@ -186,6 +186,8 @@ func newBucket(ctx context.Context, logger log.Logger, gc Config, opts []option. return bkt, nil } +func (b *Bucket) Provider() objstore.ObjProvider { return objstore.GCS } + // Name returns the bucket name for gcs. func (b *Bucket) Name() string { return b.name diff --git a/providers/obs/obs.go b/providers/obs/obs.go index ecd235c9..20294cc6 100644 --- a/providers/obs/obs.go +++ b/providers/obs/obs.go @@ -122,6 +122,8 @@ func NewBucketWithConfig(logger log.Logger, config Config) (*Bucket, error) { return bkt, nil } +func (b *Bucket) Provider() objstore.ObjProvider { return objstore.OBS } + // Name returns the bucket name for the provider. func (b *Bucket) Name() string { return b.name diff --git a/providers/oci/oci.go b/providers/oci/oci.go index 062da7c1..7b4230ec 100644 --- a/providers/oci/oci.go +++ b/providers/oci/oci.go @@ -96,6 +96,8 @@ type Bucket struct { requestMetadata common.RequestMetadata } +func (b *Bucket) Provider() objstore.ObjProvider { return objstore.OCI } + // Name returns the bucket name for the provider. func (b *Bucket) Name() string { return b.name diff --git a/providers/oss/oss.go b/providers/oss/oss.go index aee8c623..2a6cb219 100644 --- a/providers/oss/oss.go +++ b/providers/oss/oss.go @@ -68,6 +68,8 @@ func NewTestBucket(t testing.TB) (objstore.Bucket, func(), error) { return NewTestBucketFromConfig(t, c, false) } +func (b *Bucket) Provider() objstore.ObjProvider { return objstore.ALIYUNOSS } + // Upload the contents of the reader as an object into the bucket. func (b *Bucket) Upload(_ context.Context, name string, r io.Reader) error { // TODO(https://github.com/thanos-io/thanos/issues/678): Remove guessing length when minio provider will support multipart upload without this. diff --git a/providers/s3/s3.go b/providers/s3/s3.go index 27e82ffb..58590486 100644 --- a/providers/s3/s3.go +++ b/providers/s3/s3.go @@ -345,6 +345,8 @@ func NewBucketWithConfig(logger log.Logger, config Config, component string, wra return bkt, nil } +func (b *Bucket) Provider() objstore.ObjProvider { return objstore.S3 } + // Name returns the bucket name for s3. func (b *Bucket) Name() string { return b.name diff --git a/providers/swift/swift.go b/providers/swift/swift.go index 86caa0c1..19eb0d45 100644 --- a/providers/swift/swift.go +++ b/providers/swift/swift.go @@ -218,6 +218,8 @@ func NewContainerFromConfig(logger log.Logger, sc *Config, createContainer bool, }, nil } +func (c *Container) Provider() objstore.ObjProvider { return objstore.SWIFT } + // Name returns the container name for swift. func (c *Container) Name() string { return c.name diff --git a/scripts/cfggen/main.go b/scripts/cfggen/main.go index 424bf9b0..e1bc6410 100644 --- a/scripts/cfggen/main.go +++ b/scripts/cfggen/main.go @@ -5,6 +5,7 @@ package main import ( "fmt" + "github.com/thanos-io/objstore" "io" "os" "path/filepath" @@ -35,17 +36,17 @@ var ( configs map[string]interface{} possibleValues []string - bucketConfigs = map[client.ObjProvider]interface{}{ - client.AZURE: azure.Config{}, - client.GCS: gcs.Config{}, - client.S3: s3.DefaultConfig, - client.SWIFT: swift.DefaultConfig, - client.COS: cos.DefaultConfig, - client.ALIYUNOSS: oss.Config{}, - client.FILESYSTEM: filesystem.Config{}, - client.BOS: bos.Config{}, - client.OCI: oci.Config{}, - client.OBS: obs.DefaultConfig, + bucketConfigs = map[objstore.ObjProvider]interface{}{ + objstore.AZURE: azure.Config{}, + objstore.GCS: gcs.Config{}, + objstore.S3: s3.DefaultConfig, + objstore.SWIFT: swift.DefaultConfig, + objstore.COS: cos.DefaultConfig, + objstore.ALIYUNOSS: oss.Config{}, + objstore.FILESYSTEM: filesystem.Config{}, + objstore.BOS: bos.Config{}, + objstore.OCI: oci.Config{}, + objstore.OBS: obs.DefaultConfig, } ) diff --git a/testing.go b/testing.go index d3fa1def..80f1e198 100644 --- a/testing.go +++ b/testing.go @@ -280,6 +280,8 @@ func WithDelay(bkt Bucket, delay time.Duration) Bucket { return &delayingBucket{bkt: bkt, delay: delay} } +func (d *delayingBucket) Provider() ObjProvider { return d.bkt.Provider() } + func (d *delayingBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { time.Sleep(d.delay) return d.bkt.Get(ctx, name) diff --git a/tracing/opentelemetry/opentelemetry.go b/tracing/opentelemetry/opentelemetry.go index dad71e39..d5f9bd8c 100644 --- a/tracing/opentelemetry/opentelemetry.go +++ b/tracing/opentelemetry/opentelemetry.go @@ -23,6 +23,8 @@ func WrapWithTraces(bkt objstore.Bucket, tracer trace.Tracer) objstore.Instrumen return TracingBucket{tracer: tracer, bkt: bkt} } +func (t TracingBucket) Provider() objstore.ObjProvider { return t.bkt.Provider() } + func (t TracingBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) (err error) { ctx, span := t.tracer.Start(ctx, "bucket_iter") defer span.End() diff --git a/tracing/opentracing/opentracing.go b/tracing/opentracing/opentracing.go index cabe07b2..58bdea07 100644 --- a/tracing/opentracing/opentracing.go +++ b/tracing/opentracing/opentracing.go @@ -44,6 +44,8 @@ func WrapWithTraces(bkt objstore.Bucket) objstore.InstrumentedBucket { return TracingBucket{bkt: bkt} } +func (t TracingBucket) Provider() objstore.ObjProvider { return t.bkt.Provider() } + func (t TracingBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) (err error) { doWithSpan(ctx, "bucket_iter", func(spanCtx context.Context, span opentracing.Span) { span.LogKV("dir", dir)