diff --git a/README.md b/README.md index 6d848e79..d8f58023 100644 --- a/README.md +++ b/README.md @@ -48,7 +48,7 @@ See [MAINTAINERS.md](https://github.com/thanos-io/thanos/blob/main/MAINTAINERS.m The core this module is the [`Bucket` interface](objstore.go): -```go mdox-exec="sed -n '37,50p' objstore.go" +```go mdox-exec="sed -n '39,55p' objstore.go" // Bucket provides read and write access to an object storage bucket. // NOTE: We assume strong consistency for write-read flow. type Bucket interface { @@ -63,18 +63,31 @@ type Bucket interface { // If object does not exist in the moment of deletion, Delete should throw error. Delete(ctx context.Context, name string) error + // Name returns the bucket name for the provider. + Name() string +} ``` All [provider implementations](providers) have to implement `Bucket` interface that allows common read and write operations that all supported by all object providers. If you want to limit the code that will do bucket operation to only read access (smart idea, allowing to limit access permissions), you can use the [`BucketReader` interface](objstore.go): -```go mdox-exec="sed -n '68,93p' objstore.go" - +```go mdox-exec="sed -n '71,106p' objstore.go" // BucketReader provides read access to an object storage bucket. type BucketReader interface { // Iter calls f for each entry in the given directory (not recursive.). The argument to f is the full // object name including the prefix of the inspected directory. + // Entries are passed to function in sorted order. - Iter(ctx context.Context, dir string, f func(string) error, options ...IterOption) error + Iter(ctx context.Context, dir string, f func(name string) error, options ...IterOption) error + + // IterWithAttributes calls f for each entry in the given directory similar to Iter. + // In addition to Name, it also includes requested object attributes in the argument to f. + // + // Attributes can be requested using IterOption. + // Not all IterOptions are supported by all providers, requesting for an unsupported option will fail with ErrOptionNotSupported. + IterWithAttributes(ctx context.Context, dir string, f func(attrs IterObjectAttributes) error, options ...IterOption) error + + // SupportedIterOptions returns a list of supported IterOptions by the underlying provider. + SupportedIterOptions() []IterOptionType // Get returns a reader for the given object name. Get(ctx context.Context, name string) (io.ReadCloser, error) @@ -374,6 +387,7 @@ config: server_name: "" insecure_skip_verify: false disable_compression: false + chunk_size_bytes: 0 prefix: "" ``` @@ -447,6 +461,7 @@ config: storage_account: "" storage_account_key: "" storage_connection_string: "" + storage_create_container: false container: "" endpoint: "" user_assigned_id: "" diff --git a/inmem.go b/inmem.go index ed256c9c..d550e283 100644 --- a/inmem.go +++ b/inmem.go @@ -106,6 +106,20 @@ func (b *InMemBucket) Iter(_ context.Context, dir string, f func(string) error, return nil } +func (i *InMemBucket) SupportedIterOptions() []IterOptionType { + return []IterOptionType{Recursive} +} + +func (b *InMemBucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs IterObjectAttributes) error, options ...IterOption) error { + if err := ValidateIterOptions(b.SupportedIterOptions(), options...); err != nil { + return err + } + + return b.Iter(ctx, dir, func(name string) error { + return f(IterObjectAttributes{Name: name}) + }, options...) +} + // Get returns a reader for the given object name. func (b *InMemBucket) Get(_ context.Context, name string) (io.ReadCloser, error) { if name == "" { diff --git a/objstore.go b/objstore.go index 62f1c655..80916888 100644 --- a/objstore.go +++ b/objstore.go @@ -6,11 +6,13 @@ package objstore import ( "bytes" "context" + "fmt" "io" "io/fs" "os" "path" "path/filepath" + "slices" "strings" "sync" "time" @@ -70,8 +72,19 @@ type InstrumentedBucket interface { type BucketReader interface { // Iter calls f for each entry in the given directory (not recursive.). The argument to f is the full // object name including the prefix of the inspected directory. + // Entries are passed to function in sorted order. - Iter(ctx context.Context, dir string, f func(string) error, options ...IterOption) error + Iter(ctx context.Context, dir string, f func(name string) error, options ...IterOption) error + + // IterWithAttributes calls f for each entry in the given directory similar to Iter. + // In addition to Name, it also includes requested object attributes in the argument to f. + // + // Attributes can be requested using IterOption. + // Not all IterOptions are supported by all providers, requesting for an unsupported option will fail with ErrOptionNotSupported. + IterWithAttributes(ctx context.Context, dir string, f func(attrs IterObjectAttributes) error, options ...IterOption) error + + // SupportedIterOptions returns a list of supported IterOptions by the underlying provider. + SupportedIterOptions() []IterOptionType // Get returns a reader for the given object name. Get(ctx context.Context, name string) (io.ReadCloser, error) @@ -101,24 +114,66 @@ type InstrumentedBucketReader interface { ReaderWithExpectedErrs(IsOpFailureExpectedFunc) BucketReader } +var ErrOptionNotSupported = errors.New("iter option is not supported") + +// IterOptionType is used for type-safe option support checking. +type IterOptionType int + +const ( + Recursive IterOptionType = iota + UpdatedAt +) + // IterOption configures the provided params. -type IterOption func(params *IterParams) +type IterOption struct { + Type IterOptionType + Apply func(params *IterParams) +} // WithRecursiveIter is an option that can be applied to Iter() to recursively list objects // in the bucket. -func WithRecursiveIter(params *IterParams) { - params.Recursive = true +func WithRecursiveIter() IterOption { + return IterOption{ + Type: Recursive, + Apply: func(params *IterParams) { + params.Recursive = true + }, + } +} + +// WithUpdatedAt is an option that can be applied to Iter() to +// include the last modified time in the attributes. +// NB: Prefixes may not report last modified time. +// This option is currently supported for the azure, aws, bos, gcs and filesystem providers. +func WithUpdatedAt() IterOption { + return IterOption{ + Type: UpdatedAt, + Apply: func(params *IterParams) { + params.LastModified = true + }, + } } // IterParams holds the Iter() parameters and is used by objstore clients implementations. type IterParams struct { - Recursive bool + Recursive bool + LastModified bool +} + +func ValidateIterOptions(supportedOptions []IterOptionType, options ...IterOption) error { + for _, opt := range options { + if !slices.Contains(supportedOptions, opt.Type) { + return fmt.Errorf("%w: %v", ErrOptionNotSupported, opt.Type) + } + } + + return nil } func ApplyIterOptions(options ...IterOption) IterParams { out := IterParams{} for _, opt := range options { - opt(&out) + opt.Apply(&out) } return out } @@ -189,6 +244,20 @@ type ObjectAttributes struct { LastModified time.Time `json:"last_modified"` } +type IterObjectAttributes struct { + Name string + lastModified time.Time +} + +func (i *IterObjectAttributes) SetLastModified(t time.Time) { + i.lastModified = t +} + +// LastModified returns the timestamp the object was last modified. Returns false if the timestamp is not available. +func (i *IterObjectAttributes) LastModified() (time.Time, bool) { + return i.lastModified, !i.lastModified.IsZero() +} + // TryToGetSize tries to get upfront size from reader. // Some implementations may return only size of unread data in the reader, so it's best to call this method before // doing any reading. @@ -531,7 +600,7 @@ func (b *metricBucket) ReaderWithExpectedErrs(fn IsOpFailureExpectedFunc) Bucket return b.WithExpectedErrs(fn) } -func (b *metricBucket) Iter(ctx context.Context, dir string, f func(name string) error, options ...IterOption) error { +func (b *metricBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...IterOption) error { const op = OpIter b.metrics.ops.WithLabelValues(op).Inc() @@ -546,6 +615,23 @@ func (b *metricBucket) Iter(ctx context.Context, dir string, f func(name string) return err } +func (b *metricBucket) IterWithAttributes(ctx context.Context, dir string, f func(IterObjectAttributes) error, options ...IterOption) error { + const op = OpIter + b.metrics.ops.WithLabelValues(op).Inc() + + err := b.bkt.IterWithAttributes(ctx, dir, f, options...) + if err != nil { + if !b.metrics.isOpFailureExpected(err) && ctx.Err() != context.Canceled { + b.metrics.opsFailures.WithLabelValues(op).Inc() + } + } + return err +} + +func (b *metricBucket) SupportedIterOptions() []IterOptionType { + return b.bkt.SupportedIterOptions() +} + func (b *metricBucket) Attributes(ctx context.Context, name string) (ObjectAttributes, error) { const op = OpAttributes b.metrics.ops.WithLabelValues(op).Inc() diff --git a/prefixed_bucket.go b/prefixed_bucket.go index f2b71434..a76b34c3 100644 --- a/prefixed_bucket.go +++ b/prefixed_bucket.go @@ -54,6 +54,19 @@ func (p *PrefixedBucket) Iter(ctx context.Context, dir string, f func(string) er }, options...) } +func (p *PrefixedBucket) IterWithAttributes(ctx context.Context, dir string, f func(IterObjectAttributes) error, options ...IterOption) error { + pdir := withPrefix(p.prefix, dir) + + return p.bkt.IterWithAttributes(ctx, pdir, func(attrs IterObjectAttributes) error { + attrs.Name = strings.TrimPrefix(attrs.Name, p.prefix+DirDelim) + return f(attrs) + }, options...) +} + +func (p *PrefixedBucket) SupportedIterOptions() []IterOptionType { + return p.bkt.SupportedIterOptions() +} + // Get returns a reader for the given object name. func (p *PrefixedBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { return p.bkt.Get(ctx, conditionalPrefix(p.prefix, name)) diff --git a/prefixed_bucket_test.go b/prefixed_bucket_test.go index f93c8580..6252e05d 100644 --- a/prefixed_bucket_test.go +++ b/prefixed_bucket_test.go @@ -74,7 +74,7 @@ func UsesPrefixTest(t *testing.T, bkt Bucket, prefix string) { testutil.Ok(t, pBkt.Iter(context.Background(), "", func(fn string) error { seen = append(seen, fn) return nil - }, WithRecursiveIter)) + }, WithRecursiveIter())) expected := []string{"dir/file1.jpg", "file1.jpg"} sort.Strings(expected) sort.Strings(seen) diff --git a/providers/azure/azure.go b/providers/azure/azure.go index e125ca35..5689dc62 100644 --- a/providers/azure/azure.go +++ b/providers/azure/azure.go @@ -193,9 +193,15 @@ func NewBucketWithConfig(logger log.Logger, conf Config, component string, wrapR return bkt, nil } -// Iter calls f for each entry in the given directory. The argument to f is the full -// object name including the prefix of the inspected directory. -func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { +func (b *Bucket) SupportedIterOptions() []objstore.IterOptionType { + return []objstore.IterOptionType{objstore.Recursive, objstore.UpdatedAt} +} + +func (b *Bucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { + if err := objstore.ValidateIterOptions(b.SupportedIterOptions(), options...); err != nil { + return err + } + prefix := dir if prefix != "" && !strings.HasSuffix(prefix, DirDelim) { prefix += DirDelim @@ -211,7 +217,13 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt return err } for _, blob := range resp.Segment.BlobItems { - if err := f(*blob.Name); err != nil { + attrs := objstore.IterObjectAttributes{ + Name: *blob.Name, + } + if params.LastModified { + attrs.SetLastModified(*blob.Properties.LastModified) + } + if err := f(attrs); err != nil { return err } } @@ -227,12 +239,18 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt return err } for _, blobItem := range resp.Segment.BlobItems { - if err := f(*blobItem.Name); err != nil { + attrs := objstore.IterObjectAttributes{ + Name: *blobItem.Name, + } + if params.LastModified { + attrs.SetLastModified(*blobItem.Properties.LastModified) + } + if err := f(attrs); err != nil { return err } } for _, blobPrefix := range resp.Segment.BlobPrefixes { - if err := f(*blobPrefix.Name); err != nil { + if err := f(objstore.IterObjectAttributes{Name: *blobPrefix.Name}); err != nil { return err } } @@ -240,6 +258,23 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt return nil } +// Iter calls f for each entry in the given directory. The argument to f is the full +// object name including the prefix of the inspected directory. +func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opts ...objstore.IterOption) error { + // Only include recursive option since attributes are not used in this method. + var filteredOpts []objstore.IterOption + for _, opt := range opts { + if opt.Type == objstore.Recursive { + filteredOpts = append(filteredOpts, opt) + break + } + } + + return b.IterWithAttributes(ctx, dir, func(attrs objstore.IterObjectAttributes) error { + return f(attrs.Name) + }, filteredOpts...) +} + // IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations. func (b *Bucket) IsObjNotFoundErr(err error) bool { if err == nil { diff --git a/providers/bos/bos.go b/providers/bos/bos.go index 9faa93f3..20c8dd3e 100644 --- a/providers/bos/bos.go +++ b/providers/bos/bos.go @@ -176,16 +176,23 @@ func (b *Bucket) Upload(_ context.Context, name string, r io.Reader) error { return nil } -// Iter calls f for each entry in the given directory (not recursive). The argument to f is the full -// object name including the prefix of the inspected directory. -func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt ...objstore.IterOption) error { +func (b *Bucket) SupportedIterOptions() []objstore.IterOptionType { + return []objstore.IterOptionType{objstore.Recursive, objstore.UpdatedAt} +} + +func (b *Bucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { + if err := objstore.ValidateIterOptions(b.SupportedIterOptions(), options...); err != nil { + return err + } + if dir != "" { dir = strings.TrimSuffix(dir, objstore.DirDelim) + objstore.DirDelim } delimiter := objstore.DirDelim - if objstore.ApplyIterOptions(opt...).Recursive { + params := objstore.ApplyIterOptions(options...) + if params.Recursive { delimiter = "" } @@ -207,13 +214,25 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt marker = objects.NextMarker for _, object := range objects.Contents { - if err := f(object.Key); err != nil { + attrs := objstore.IterObjectAttributes{ + Name: object.Key, + } + + if params.LastModified && object.LastModified != "" { + lastModified, err := time.Parse(time.RFC1123, object.LastModified) + if err != nil { + return fmt.Errorf("iter: get last modified: %w", err) + } + attrs.SetLastModified(lastModified) + } + + if err := f(attrs); err != nil { return err } } for _, object := range objects.CommonPrefixes { - if err := f(object.Prefix); err != nil { + if err := f(objstore.IterObjectAttributes{Name: object.Prefix}); err != nil { return err } } @@ -224,6 +243,23 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt return nil } +// Iter calls f for each entry in the given directory. The argument to f is the full +// object name including the prefix of the inspected directory. +func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opts ...objstore.IterOption) error { + // Only include recursive option since attributes are not used in this method. + var filteredOpts []objstore.IterOption + for _, opt := range opts { + if opt.Type == objstore.Recursive { + filteredOpts = append(filteredOpts, opt) + break + } + } + + return b.IterWithAttributes(ctx, dir, func(attrs objstore.IterObjectAttributes) error { + return f(attrs.Name) + }, filteredOpts...) +} + // Get returns a reader for the given object name. func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { return b.getRange(ctx, b.name, name, 0, -1) diff --git a/providers/cos/cos.go b/providers/cos/cos.go index 6bd39caa..9caf567e 100644 --- a/providers/cos/cos.go +++ b/providers/cos/cos.go @@ -276,7 +276,11 @@ func (b *Bucket) Delete(ctx context.Context, name string) error { return nil } -// Iter calls f for each entry in the given directory (not recursive.). The argument to f is the full +func (b *Bucket) SupportedIterOptions() []objstore.IterOptionType { + return []objstore.IterOptionType{objstore.Recursive} +} + +// Iter calls f for each entry in the given directory. The argument to f is the full // object name including the prefix of the inspected directory. func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { if dir != "" { @@ -298,6 +302,16 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt return nil } +func (b *Bucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { + if err := objstore.ValidateIterOptions(b.SupportedIterOptions(), options...); err != nil { + return err + } + + return b.Iter(ctx, dir, func(name string) error { + return f(objstore.IterObjectAttributes{Name: name}) + }, options...) +} + func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { if name == "" { return nil, errors.New("given object name should not empty") @@ -493,7 +507,7 @@ func NewTestBucket(t testing.TB) (objstore.Bucket, func(), error) { return nil, nil, err } - if err := b.Iter(context.Background(), "", func(f string) error { + if err := b.Iter(context.Background(), "", func(_ string) error { return errors.Errorf("bucket %s is not empty", c.Bucket) }); err != nil { return nil, nil, errors.Wrapf(err, "cos check bucket %s", c.Bucket) diff --git a/providers/filesystem/filesystem.go b/providers/filesystem/filesystem.go index 2ed42ee8..01dca4bb 100644 --- a/providers/filesystem/filesystem.go +++ b/providers/filesystem/filesystem.go @@ -50,13 +50,19 @@ func NewBucket(rootDir string) (*Bucket, error) { return &Bucket{rootDir: absDir}, nil } -// Iter calls f for each entry in the given directory. The argument to f is the full -// object name including the prefix of the inspected directory. -func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { +func (b *Bucket) SupportedIterOptions() []objstore.IterOptionType { + return []objstore.IterOptionType{objstore.Recursive, objstore.UpdatedAt} +} + +func (b *Bucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { if ctx.Err() != nil { return ctx.Err() } + if err := objstore.ValidateIterOptions(b.SupportedIterOptions(), options...); err != nil { + return err + } + params := objstore.ApplyIterOptions(options...) absDir := filepath.Join(b.rootDir, dir) info, err := os.Stat(absDir) @@ -92,7 +98,7 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt if params.Recursive { // Recursively list files in the subdirectory. - if err := b.Iter(ctx, name, f, options...); err != nil { + if err := b.IterWithAttributes(ctx, name, f, options...); err != nil { return err } @@ -101,13 +107,42 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt continue } } - if err := f(name); err != nil { + + attrs := objstore.IterObjectAttributes{ + Name: name, + } + if params.LastModified { + absPath := filepath.Join(absDir, file.Name()) + stat, err := os.Stat(absPath) + if err != nil { + return errors.Wrapf(err, "stat %s", name) + } + attrs.SetLastModified(stat.ModTime()) + } + if err := f(attrs); err != nil { return err } } return nil } +// Iter calls f for each entry in the given directory. The argument to f is the full +// object name including the prefix of the inspected directory. +func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opts ...objstore.IterOption) error { + // Only include recursive option since attributes are not used in this method. + var filteredOpts []objstore.IterOption + for _, opt := range opts { + if opt.Type == objstore.Recursive { + filteredOpts = append(filteredOpts, opt) + break + } + } + + return b.IterWithAttributes(ctx, dir, func(attrs objstore.IterObjectAttributes) error { + return f(attrs.Name) + }, filteredOpts...) +} + // Get returns a reader for the given object name. func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { return b.GetRange(ctx, name, 0, -1) diff --git a/providers/filesystem/filesystem_test.go b/providers/filesystem/filesystem_test.go index c3621fe0..105aab8e 100644 --- a/providers/filesystem/filesystem_test.go +++ b/providers/filesystem/filesystem_test.go @@ -6,11 +6,15 @@ package filesystem import ( "bytes" "context" + "os" "strings" "sync" "testing" + "time" "github.com/efficientgo/core/testutil" + + "github.com/thanos-io/objstore" ) func TestDelete_EmptyDirDeletionRaceCondition(t *testing.T) { @@ -61,6 +65,60 @@ func TestIter_CancelledContext(t *testing.T) { testutil.Equals(t, context.Canceled, err) } +func TestIterWithAttributes(t *testing.T) { + dir := t.TempDir() + f, err := os.CreateTemp(dir, "test") + testutil.Ok(t, err) + defer f.Close() + + stat, err := f.Stat() + testutil.Ok(t, err) + + cases := []struct { + name string + opts []objstore.IterOption + expectedUpdatedAt time.Time + }{ + { + name: "no options", + opts: nil, + }, + { + name: "with updated at", + opts: []objstore.IterOption{ + objstore.WithUpdatedAt(), + }, + expectedUpdatedAt: stat.ModTime(), + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + b, err := NewBucket(dir) + testutil.Ok(t, err) + + var attrs objstore.IterObjectAttributes + + ctx := context.Background() + err = b.IterWithAttributes(ctx, "", func(objectAttrs objstore.IterObjectAttributes) error { + attrs = objectAttrs + return nil + }, tc.opts...) + + testutil.Ok(t, err) + + lastModified, ok := attrs.LastModified() + if zero := tc.expectedUpdatedAt.IsZero(); zero { + testutil.Equals(t, false, ok) + } else { + testutil.Equals(t, true, ok) + testutil.Equals(t, tc.expectedUpdatedAt, lastModified) + } + }) + + } +} + func TestGet_CancelledContext(t *testing.T) { b, err := NewBucket(t.TempDir()) testutil.Ok(t, err) diff --git a/providers/gcs/gcs.go b/providers/gcs/gcs.go index efb208e6..0d4690c2 100644 --- a/providers/gcs/gcs.go +++ b/providers/gcs/gcs.go @@ -181,31 +181,33 @@ func (b *Bucket) Name() string { return b.name } -// Iter calls f for each entry in the given directory. The argument to f is the full -// object name including the prefix of the inspected directory. -func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { +func (b *Bucket) SupportedIterOptions() []objstore.IterOptionType { + return []objstore.IterOptionType{objstore.Recursive, objstore.UpdatedAt} +} + +func (b *Bucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { + if err := objstore.ValidateIterOptions(b.SupportedIterOptions(), options...); err != nil { + return err + } + // Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the // object itself as one prefix item. if dir != "" { dir = strings.TrimSuffix(dir, DirDelim) + DirDelim } + appliedOpts := objstore.ApplyIterOptions(options...) + // If recursive iteration is enabled we should pass an empty delimiter. delimiter := DirDelim - if objstore.ApplyIterOptions(options...).Recursive { + if appliedOpts.Recursive { delimiter = "" } - query := &storage.Query{ + it := b.bkt.Objects(ctx, &storage.Query{ Prefix: dir, Delimiter: delimiter, - } - err := query.SetAttrSelection([]string{"Name"}) - if err != nil { - return err - } - - it := b.bkt.Objects(ctx, query) + }) for { select { case <-ctx.Done(): @@ -219,12 +221,34 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt if err != nil { return err } - if err := f(attrs.Prefix + attrs.Name); err != nil { + + objAttrs := objstore.IterObjectAttributes{Name: attrs.Prefix + attrs.Name} + if appliedOpts.LastModified { + objAttrs.SetLastModified(attrs.Updated) + } + if err := f(objAttrs); err != nil { return err } } } +// Iter calls f for each entry in the given directory. The argument to f is the full +// object name including the prefix of the inspected directory. +func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opts ...objstore.IterOption) error { + // Only include recursive option since attributes are not used in this method. + var filteredOpts []objstore.IterOption + for _, opt := range opts { + if opt.Type == objstore.Recursive { + filteredOpts = append(filteredOpts, opt) + break + } + } + + return b.IterWithAttributes(ctx, dir, func(attrs objstore.IterObjectAttributes) error { + return f(attrs.Name) + }, filteredOpts...) +} + // Get returns a reader for the given object name. func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { r, err := b.bkt.Object(name).NewReader(ctx) diff --git a/providers/obs/obs.go b/providers/obs/obs.go index 4fb17baa..4ee9a227 100644 --- a/providers/obs/obs.go +++ b/providers/obs/obs.go @@ -232,6 +232,10 @@ func (b *Bucket) multipartUpload(size int64, key, uploadId string, body io.Reade func (b *Bucket) Close() error { return nil } +func (b *Bucket) SupportedIterOptions() []objstore.IterOptionType { + return []objstore.IterOptionType{objstore.Recursive} +} + // Iter calls f for each entry in the given directory (not recursive.) func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { if dir != "" { @@ -270,6 +274,16 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt return nil } +func (b *Bucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { + if err := objstore.ValidateIterOptions(b.SupportedIterOptions(), options...); err != nil { + return err + } + + return b.Iter(ctx, dir, func(name string) error { + return f(objstore.IterObjectAttributes{Name: name}) + }, options...) +} + // Get returns a reader for the given object name. func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { return b.getRange(ctx, name, 0, -1) @@ -381,7 +395,7 @@ func NewTestBucketFromConfig(t testing.TB, c Config, reuseBucket bool, location bktToCreate := c.Bucket if c.Bucket != "" && reuseBucket { - if err := b.Iter(ctx, "", func(f string) error { + if err := b.Iter(ctx, "", func(_ string) error { return errors.Errorf("bucket %s is not empty", c.Bucket) }); err != nil { return nil, nil, err diff --git a/providers/oci/oci.go b/providers/oci/oci.go index bc8a8bd9..062da7c1 100644 --- a/providers/oci/oci.go +++ b/providers/oci/oci.go @@ -21,8 +21,9 @@ import ( "github.com/oracle/oci-go-sdk/v65/objectstorage/transfer" "github.com/pkg/errors" "github.com/prometheus/common/model" - "github.com/thanos-io/objstore" "gopkg.in/yaml.v2" + + "github.com/thanos-io/objstore" ) // DirDelim is the delimiter used to model a directory structure in an object store bucket. @@ -100,7 +101,11 @@ func (b *Bucket) Name() string { return b.name } -// Iter calls f for each entry in the given directory (not recursive). The argument to f is the full +func (b *Bucket) SupportedIterOptions() []objstore.IterOptionType { + return []objstore.IterOptionType{objstore.Recursive} +} + +// Iter calls f for each entry in the given directory. The argument to f is the full // object name including the prefix of the inspected directory. func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { // Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the @@ -120,6 +125,7 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt if objectName == "" || objectName == dir { continue } + if err := f(objectName); err != nil { return err } @@ -128,6 +134,16 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt return nil } +func (b *Bucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { + if err := objstore.ValidateIterOptions(b.SupportedIterOptions(), options...); err != nil { + return err + } + + return b.Iter(ctx, dir, func(name string) error { + return f(objstore.IterObjectAttributes{Name: name}) + }, options...) +} + // Get returns a reader for the given object name. func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { response, err := getObject(ctx, *b, name, "") diff --git a/providers/oss/oss.go b/providers/oss/oss.go index e01aff2e..aee8c623 100644 --- a/providers/oss/oss.go +++ b/providers/oss/oss.go @@ -22,10 +22,9 @@ import ( "github.com/pkg/errors" "gopkg.in/yaml.v2" + "github.com/thanos-io/objstore" "github.com/thanos-io/objstore/clientutil" "github.com/thanos-io/objstore/exthttp" - - "github.com/thanos-io/objstore" ) // PartSize is a part size for multi part upload. @@ -216,7 +215,11 @@ func validate(config Config) error { return nil } -// Iter calls f for each entry in the given directory (not recursive). The argument to f is the full +func (b *Bucket) SupportedIterOptions() []objstore.IterOptionType { + return []objstore.IterOptionType{objstore.Recursive} +} + +// Iter calls f for each entry in the given directory. The argument to f is the full // object name including the prefix of the inspected directory. func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { if dir != "" { @@ -258,6 +261,16 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt return nil } +func (b *Bucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { + if err := objstore.ValidateIterOptions(b.SupportedIterOptions(), options...); err != nil { + return err + } + + return b.Iter(ctx, dir, func(name string) error { + return f(objstore.IterObjectAttributes{Name: name}) + }, options...) +} + func (b *Bucket) Name() string { return b.name } @@ -292,7 +305,7 @@ func NewTestBucketFromConfig(t testing.TB, c Config, reuseBucket bool) (objstore } if reuseBucket { - if err := b.Iter(context.Background(), "", func(f string) error { + if err := b.Iter(context.Background(), "", func(_ string) error { return errors.Errorf("bucket %s is not empty", c.Bucket) }); err != nil { return nil, nil, errors.Wrapf(err, "oss check bucket %s", c.Bucket) diff --git a/providers/s3/s3.go b/providers/s3/s3.go index 8e5b8b56..fc8da7b3 100644 --- a/providers/s3/s3.go +++ b/providers/s3/s3.go @@ -387,18 +387,26 @@ func ValidateForTests(conf Config) error { return nil } -// Iter calls f for each entry in the given directory. The argument to f is the full -// object name including the prefix of the inspected directory. -func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { +func (b *Bucket) SupportedIterOptions() []objstore.IterOptionType { + return []objstore.IterOptionType{objstore.Recursive, objstore.UpdatedAt} +} + +func (b *Bucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { + if err := objstore.ValidateIterOptions(b.SupportedIterOptions(), options...); err != nil { + return err + } + // Ensure the object name actually ends with a dir suffix. Otherwise we'll just iterate the // object itself as one prefix item. if dir != "" { dir = strings.TrimSuffix(dir, DirDelim) + DirDelim } + appliedOpts := objstore.ApplyIterOptions(options...) + opts := minio.ListObjectsOptions{ Prefix: dir, - Recursive: objstore.ApplyIterOptions(options...).Recursive, + Recursive: appliedOpts.Recursive, UseV1: b.listObjectsV1, } @@ -415,7 +423,15 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt if object.Key == dir { continue } - if err := f(object.Key); err != nil { + + attr := objstore.IterObjectAttributes{ + Name: object.Key, + } + if appliedOpts.LastModified { + attr.SetLastModified(object.LastModified) + } + + if err := f(attr); err != nil { return err } } @@ -423,6 +439,21 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt return ctx.Err() } +func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opts ...objstore.IterOption) error { + // Only include recursive option since attributes are not used in this method. + var filteredOpts []objstore.IterOption + for _, opt := range opts { + if opt.Type == objstore.Recursive { + filteredOpts = append(filteredOpts, opt) + break + } + } + + return b.IterWithAttributes(ctx, dir, func(attrs objstore.IterObjectAttributes) error { + return f(attrs.Name) + }, filteredOpts...) +} + func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { sse, err := b.getServerSideEncryption(ctx) if err != nil { @@ -629,7 +660,7 @@ func NewTestBucketFromConfig(t testing.TB, location string, c Config, reuseBucke bktToCreate := c.Bucket if c.Bucket != "" && reuseBucket { - if err := b.Iter(ctx, "", func(f string) error { + if err := b.Iter(ctx, "", func(string) error { return errors.Errorf("bucket %s is not empty", c.Bucket) }); err != nil { return nil, nil, errors.Wrapf(err, "s3 check bucket %s", c.Bucket) diff --git a/providers/swift/swift.go b/providers/swift/swift.go index e872728e..86caa0c1 100644 --- a/providers/swift/swift.go +++ b/providers/swift/swift.go @@ -21,6 +21,7 @@ import ( "github.com/ncw/swift" "github.com/pkg/errors" "github.com/prometheus/common/model" + "github.com/thanos-io/objstore" "github.com/thanos-io/objstore/exthttp" "gopkg.in/yaml.v2" @@ -222,9 +223,13 @@ func (c *Container) Name() string { return c.name } +func (c *Container) SupportedIterOptions() []objstore.IterOptionType { + return []objstore.IterOptionType{objstore.Recursive} +} + // Iter calls f for each entry in the given directory. The argument to f is the full // object name including the prefix of the inspected directory. -func (c *Container) Iter(_ context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { +func (c *Container) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error { if dir != "" { dir = strings.TrimSuffix(dir, string(DirDelim)) + string(DirDelim) } @@ -242,6 +247,7 @@ func (c *Container) Iter(_ context.Context, dir string, f func(string) error, op if err != nil { return objects, errors.Wrap(err, "list object names") } + for _, object := range objects { if object == SegmentsDir { continue @@ -254,6 +260,16 @@ func (c *Container) Iter(_ context.Context, dir string, f func(string) error, op }) } +func (c *Container) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error { + if err := objstore.ValidateIterOptions(c.SupportedIterOptions(), options...); err != nil { + return err + } + + return c.Iter(ctx, dir, func(name string) error { + return f(objstore.IterObjectAttributes{Name: name}) + }, options...) +} + func (c *Container) get(name string, headers swift.Headers, checkHash bool) (io.ReadCloser, error) { if name == "" { return nil, errors.New("object name cannot be empty") diff --git a/testing.go b/testing.go index 28cbd658..d3fa1def 100644 --- a/testing.go +++ b/testing.go @@ -195,7 +195,7 @@ func AcceptanceTest(t *testing.T, bkt Bucket) { testutil.Ok(t, bkt.Iter(ctx, "", func(fn string) error { seen = append(seen, fn) return nil - }, WithRecursiveIter)) + }, WithRecursiveIter())) expected = []string{"id1/obj_1.some", "id1/obj_2.some", "id1/obj_3.some", "id1/sub/subobj_1.some", "id1/sub/subobj_2.some", "id2/obj_4.some", "obj_5.some"} sort.Strings(expected) sort.Strings(seen) @@ -214,7 +214,7 @@ func AcceptanceTest(t *testing.T, bkt Bucket) { testutil.Ok(t, bkt.Iter(ctx, "id1/", func(fn string) error { seen = append(seen, fn) return nil - }, WithRecursiveIter)) + }, WithRecursiveIter())) testutil.Equals(t, []string{"id1/obj_1.some", "id1/obj_2.some", "id1/obj_3.some", "id1/sub/subobj_1.some", "id1/sub/subobj_2.some"}, seen) // Can we iter over items from id1 dir? @@ -230,7 +230,7 @@ func AcceptanceTest(t *testing.T, bkt Bucket) { testutil.Ok(t, bkt.Iter(ctx, "id1", func(fn string) error { seen = append(seen, fn) return nil - }, WithRecursiveIter)) + }, WithRecursiveIter())) testutil.Equals(t, []string{"id1/obj_1.some", "id1/obj_2.some", "id1/obj_3.some", "id1/sub/subobj_1.some", "id1/sub/subobj_2.some"}, seen) // Can we iter over items from not existing dir? @@ -295,6 +295,15 @@ func (d *delayingBucket) Iter(ctx context.Context, dir string, f func(string) er return d.bkt.Iter(ctx, dir, f, options...) } +func (d *delayingBucket) IterWithAttributes(ctx context.Context, dir string, f func(IterObjectAttributes) error, options ...IterOption) error { + time.Sleep(d.delay) + return d.bkt.IterWithAttributes(ctx, dir, f, options...) +} + +func (d *delayingBucket) SupportedIterOptions() []IterOptionType { + return d.bkt.SupportedIterOptions() +} + func (d *delayingBucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { time.Sleep(d.delay) return d.bkt.GetRange(ctx, name, off, length) diff --git a/tracing/opentelemetry/opentelemetry.go b/tracing/opentelemetry/opentelemetry.go index f65b0f0e..dad71e39 100644 --- a/tracing/opentelemetry/opentelemetry.go +++ b/tracing/opentelemetry/opentelemetry.go @@ -36,6 +36,24 @@ func (t TracingBucket) Iter(ctx context.Context, dir string, f func(string) erro return t.bkt.Iter(ctx, dir, f, options...) } +func (t TracingBucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) (err error) { + ctx, span := t.tracer.Start(ctx, "bucket_iter_with_attrs") + defer span.End() + span.SetAttributes(attribute.String("dir", dir)) + + defer func() { + if err != nil { + span.RecordError(err) + } + }() + return t.bkt.IterWithAttributes(ctx, dir, f, options...) +} + +// SupportedIterOptions returns a list of supported IterOptions by the underlying provider. +func (t TracingBucket) SupportedIterOptions() []objstore.IterOptionType { + return t.bkt.SupportedIterOptions() +} + func (t TracingBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { ctx, span := t.tracer.Start(ctx, "bucket_get") defer span.End() diff --git a/tracing/opentracing/opentracing.go b/tracing/opentracing/opentracing.go index 0a26ceeb..cabe07b2 100644 --- a/tracing/opentracing/opentracing.go +++ b/tracing/opentracing/opentracing.go @@ -52,6 +52,18 @@ func (t TracingBucket) Iter(ctx context.Context, dir string, f func(string) erro return } +func (t TracingBucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) (err error) { + doWithSpan(ctx, "bucket_iter_with_attrs", func(spanCtx context.Context, span opentracing.Span) { + span.LogKV("dir", dir) + err = t.bkt.IterWithAttributes(spanCtx, dir, f, options...) + }) + return +} + +func (t TracingBucket) SupportedIterOptions() []objstore.IterOptionType { + return t.bkt.SupportedIterOptions() +} + func (t TracingBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { span, spanCtx := startSpan(ctx, "bucket_get") span.LogKV("name", name)