Skip to content

Commit

Permalink
Add support for IterWithAttributes
Browse files Browse the repository at this point in the history
This commit adds support for an IterWithAttributes on the bucket client.
The method allows iterating through objects and getting multiple attributes
into the callback function, removing the need to do an Iter followed by Attrs.

For now, we only support getting the last updated time as an attribute, but the
implementation allows adding more in the future.

Not all buckets support this method. The client can check whether the bucket has
support by calling the SupportedIterOptions method on the client.

Co-authored-by: Ashwanth Goli <[email protected]>
Co-authored-by: Filip Petkovski <[email protected]>

Signed-off-by: Filip Petkovski <[email protected]>
  • Loading branch information
fpetkovski committed Oct 29, 2024
1 parent cfdd0e5 commit 3b23d35
Show file tree
Hide file tree
Showing 19 changed files with 520 additions and 61 deletions.
23 changes: 19 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ See [MAINTAINERS.md](https://github.com/thanos-io/thanos/blob/main/MAINTAINERS.m

The core this module is the [`Bucket` interface](objstore.go):

```go mdox-exec="sed -n '37,50p' objstore.go"
```go mdox-exec="sed -n '39,55p' objstore.go"
// Bucket provides read and write access to an object storage bucket.
// NOTE: We assume strong consistency for write-read flow.
type Bucket interface {
Expand All @@ -63,18 +63,31 @@ type Bucket interface {
// If object does not exist in the moment of deletion, Delete should throw error.
Delete(ctx context.Context, name string) error

// Name returns the bucket name for the provider.
Name() string
}
```

All [provider implementations](providers) have to implement `Bucket` interface that allows common read and write operations that all supported by all object providers. If you want to limit the code that will do bucket operation to only read access (smart idea, allowing to limit access permissions), you can use the [`BucketReader` interface](objstore.go):

```go mdox-exec="sed -n '68,93p' objstore.go"

```go mdox-exec="sed -n '71,106p' objstore.go"
// BucketReader provides read access to an object storage bucket.
type BucketReader interface {
// Iter calls f for each entry in the given directory (not recursive.). The argument to f is the full
// object name including the prefix of the inspected directory.

// Entries are passed to function in sorted order.
Iter(ctx context.Context, dir string, f func(string) error, options ...IterOption) error
Iter(ctx context.Context, dir string, f func(name string) error, options ...IterOption) error

// IterWithAttributes calls f for each entry in the given directory similar to Iter.
// In addition to Name, it also includes requested object attributes in the argument to f.
//
// Attributes can be requested using IterOption.
// Not all IterOptions are supported by all providers, requesting for an unsupported option will fail with ErrOptionNotSupported.
IterWithAttributes(ctx context.Context, dir string, f func(attrs IterObjectAttributes) error, options ...IterOption) error

// SupportedIterOptions returns a list of supported IterOptions by the underlying provider.
SupportedIterOptions() []IterOptionType

// Get returns a reader for the given object name.
Get(ctx context.Context, name string) (io.ReadCloser, error)
Expand Down Expand Up @@ -374,6 +387,7 @@ config:
server_name: ""
insecure_skip_verify: false
disable_compression: false
chunk_size_bytes: 0
prefix: ""
```

Expand Down Expand Up @@ -447,6 +461,7 @@ config:
storage_account: ""
storage_account_key: ""
storage_connection_string: ""
storage_create_container: false
container: ""
endpoint: ""
user_assigned_id: ""
Expand Down
14 changes: 14 additions & 0 deletions inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,20 @@ func (b *InMemBucket) Iter(_ context.Context, dir string, f func(string) error,
return nil
}

func (i *InMemBucket) SupportedIterOptions() []IterOptionType {
return []IterOptionType{Recursive}
}

func (b *InMemBucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs IterObjectAttributes) error, options ...IterOption) error {
if err := ValidateIterOptions(b.SupportedIterOptions(), options...); err != nil {
return err
}

return b.Iter(ctx, dir, func(name string) error {
return f(IterObjectAttributes{Name: name})
}, options...)
}

// Get returns a reader for the given object name.
func (b *InMemBucket) Get(_ context.Context, name string) (io.ReadCloser, error) {
if name == "" {
Expand Down
100 changes: 93 additions & 7 deletions objstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ package objstore
import (
"bytes"
"context"
"fmt"
"io"
"io/fs"
"os"
"path"
"path/filepath"
"slices"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -70,8 +72,19 @@ type InstrumentedBucket interface {
type BucketReader interface {
// Iter calls f for each entry in the given directory (not recursive.). The argument to f is the full
// object name including the prefix of the inspected directory.

// Entries are passed to function in sorted order.
Iter(ctx context.Context, dir string, f func(string) error, options ...IterOption) error
Iter(ctx context.Context, dir string, f func(name string) error, options ...IterOption) error

// IterWithAttributes calls f for each entry in the given directory similar to Iter.
// In addition to Name, it also includes requested object attributes in the argument to f.
//
// Attributes can be requested using IterOption.
// Not all IterOptions are supported by all providers, requesting for an unsupported option will fail with ErrOptionNotSupported.
IterWithAttributes(ctx context.Context, dir string, f func(attrs IterObjectAttributes) error, options ...IterOption) error

// SupportedIterOptions returns a list of supported IterOptions by the underlying provider.
SupportedIterOptions() []IterOptionType

// Get returns a reader for the given object name.
Get(ctx context.Context, name string) (io.ReadCloser, error)
Expand Down Expand Up @@ -101,24 +114,66 @@ type InstrumentedBucketReader interface {
ReaderWithExpectedErrs(IsOpFailureExpectedFunc) BucketReader
}

var ErrOptionNotSupported = errors.New("iter option is not supported")

// IterOptionType is used for type-safe option support checking.
type IterOptionType int

const (
Recursive IterOptionType = iota
UpdatedAt
)

// IterOption configures the provided params.
type IterOption func(params *IterParams)
type IterOption struct {
Type IterOptionType
Apply func(params *IterParams)
}

// WithRecursiveIter is an option that can be applied to Iter() to recursively list objects
// in the bucket.
func WithRecursiveIter(params *IterParams) {
params.Recursive = true
func WithRecursiveIter() IterOption {
return IterOption{
Type: Recursive,
Apply: func(params *IterParams) {
params.Recursive = true
},
}
}

// WithUpdatedAt is an option that can be applied to Iter() to
// include the last modified time in the attributes.
// NB: Prefixes may not report last modified time.
// This option is currently supported for the azure, aws, bos, gcs and filesystem providers.
func WithUpdatedAt() IterOption {
return IterOption{
Type: UpdatedAt,
Apply: func(params *IterParams) {
params.LastModified = true
},
}
}

// IterParams holds the Iter() parameters and is used by objstore clients implementations.
type IterParams struct {
Recursive bool
Recursive bool
LastModified bool
}

func ValidateIterOptions(supportedOptions []IterOptionType, options ...IterOption) error {
for _, opt := range options {
if !slices.Contains(supportedOptions, opt.Type) {
return fmt.Errorf("%w: %v", ErrOptionNotSupported, opt.Type)
}
}

return nil
}

func ApplyIterOptions(options ...IterOption) IterParams {
out := IterParams{}
for _, opt := range options {
opt(&out)
opt.Apply(&out)
}
return out
}
Expand Down Expand Up @@ -189,6 +244,20 @@ type ObjectAttributes struct {
LastModified time.Time `json:"last_modified"`
}

type IterObjectAttributes struct {
Name string
lastModified time.Time
}

func (i *IterObjectAttributes) SetLastModified(t time.Time) {
i.lastModified = t
}

// LastModified returns the timestamp the object was last modified. Returns false if the timestamp is not available.
func (i *IterObjectAttributes) LastModified() (time.Time, bool) {
return i.lastModified, !i.lastModified.IsZero()
}

// TryToGetSize tries to get upfront size from reader.
// Some implementations may return only size of unread data in the reader, so it's best to call this method before
// doing any reading.
Expand Down Expand Up @@ -531,7 +600,7 @@ func (b *metricBucket) ReaderWithExpectedErrs(fn IsOpFailureExpectedFunc) Bucket
return b.WithExpectedErrs(fn)
}

func (b *metricBucket) Iter(ctx context.Context, dir string, f func(name string) error, options ...IterOption) error {
func (b *metricBucket) Iter(ctx context.Context, dir string, f func(string) error, options ...IterOption) error {
const op = OpIter
b.metrics.ops.WithLabelValues(op).Inc()

Expand All @@ -546,6 +615,23 @@ func (b *metricBucket) Iter(ctx context.Context, dir string, f func(name string)
return err
}

func (b *metricBucket) IterWithAttributes(ctx context.Context, dir string, f func(IterObjectAttributes) error, options ...IterOption) error {
const op = OpIter
b.metrics.ops.WithLabelValues(op).Inc()

err := b.bkt.IterWithAttributes(ctx, dir, f, options...)
if err != nil {
if !b.metrics.isOpFailureExpected(err) && ctx.Err() != context.Canceled {
b.metrics.opsFailures.WithLabelValues(op).Inc()
}
}
return err
}

func (b *metricBucket) SupportedIterOptions() []IterOptionType {
return b.bkt.SupportedIterOptions()
}

func (b *metricBucket) Attributes(ctx context.Context, name string) (ObjectAttributes, error) {
const op = OpAttributes
b.metrics.ops.WithLabelValues(op).Inc()
Expand Down
13 changes: 13 additions & 0 deletions prefixed_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,19 @@ func (p *PrefixedBucket) Iter(ctx context.Context, dir string, f func(string) er
}, options...)
}

func (p *PrefixedBucket) IterWithAttributes(ctx context.Context, dir string, f func(IterObjectAttributes) error, options ...IterOption) error {
pdir := withPrefix(p.prefix, dir)

return p.bkt.IterWithAttributes(ctx, pdir, func(attrs IterObjectAttributes) error {
attrs.Name = strings.TrimPrefix(attrs.Name, p.prefix+DirDelim)
return f(attrs)
}, options...)
}

func (p *PrefixedBucket) SupportedIterOptions() []IterOptionType {
return p.bkt.SupportedIterOptions()
}

// Get returns a reader for the given object name.
func (p *PrefixedBucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
return p.bkt.Get(ctx, conditionalPrefix(p.prefix, name))
Expand Down
2 changes: 1 addition & 1 deletion prefixed_bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func UsesPrefixTest(t *testing.T, bkt Bucket, prefix string) {
testutil.Ok(t, pBkt.Iter(context.Background(), "", func(fn string) error {
seen = append(seen, fn)
return nil
}, WithRecursiveIter))
}, WithRecursiveIter()))
expected := []string{"dir/file1.jpg", "file1.jpg"}
sort.Strings(expected)
sort.Strings(seen)
Expand Down
47 changes: 41 additions & 6 deletions providers/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,9 +193,15 @@ func NewBucketWithConfig(logger log.Logger, conf Config, component string, wrapR
return bkt, nil
}

// Iter calls f for each entry in the given directory. The argument to f is the full
// object name including the prefix of the inspected directory.
func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, options ...objstore.IterOption) error {
func (b *Bucket) SupportedIterOptions() []objstore.IterOptionType {
return []objstore.IterOptionType{objstore.Recursive, objstore.UpdatedAt}
}

func (b *Bucket) IterWithAttributes(ctx context.Context, dir string, f func(attrs objstore.IterObjectAttributes) error, options ...objstore.IterOption) error {
if err := objstore.ValidateIterOptions(b.SupportedIterOptions(), options...); err != nil {
return err
}

prefix := dir
if prefix != "" && !strings.HasSuffix(prefix, DirDelim) {
prefix += DirDelim
Expand All @@ -211,7 +217,13 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt
return err
}
for _, blob := range resp.Segment.BlobItems {
if err := f(*blob.Name); err != nil {
attrs := objstore.IterObjectAttributes{
Name: *blob.Name,
}
if params.LastModified {
attrs.SetLastModified(*blob.Properties.LastModified)
}
if err := f(attrs); err != nil {
return err
}
}
Expand All @@ -227,19 +239,42 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt
return err
}
for _, blobItem := range resp.Segment.BlobItems {
if err := f(*blobItem.Name); err != nil {
attrs := objstore.IterObjectAttributes{
Name: *blobItem.Name,
}
if params.LastModified {
attrs.SetLastModified(*blobItem.Properties.LastModified)
}
if err := f(attrs); err != nil {
return err
}
}
for _, blobPrefix := range resp.Segment.BlobPrefixes {
if err := f(*blobPrefix.Name); err != nil {
if err := f(objstore.IterObjectAttributes{Name: *blobPrefix.Name}); err != nil {
return err
}
}
}
return nil
}

// Iter calls f for each entry in the given directory. The argument to f is the full
// object name including the prefix of the inspected directory.
func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opts ...objstore.IterOption) error {
// Only include recursive option since attributes are not used in this method.
var filteredOpts []objstore.IterOption
for _, opt := range opts {
if opt.Type == objstore.Recursive {
filteredOpts = append(filteredOpts, opt)
break
}
}

return b.IterWithAttributes(ctx, dir, func(attrs objstore.IterObjectAttributes) error {
return f(attrs.Name)
}, filteredOpts...)
}

// IsObjNotFoundErr returns true if error means that object is not found. Relevant to Get operations.
func (b *Bucket) IsObjNotFoundErr(err error) bool {
if err == nil {
Expand Down
Loading

0 comments on commit 3b23d35

Please sign in to comment.