Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include content length in the response of Get and GetRange #145

Merged
merged 7 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

## Unreleased
- [#38](https://github.com/thanos-io/objstore/pull/38) GCS: Upgrade cloud.google.com/go/storage version to `v1.43.0`.
- [#145](https://github.com/thanos-io/objstore/pull/145) Include content length in the response of Get and GetRange.

### Fixed
- [#117](https://github.com/thanos-io/objstore/pull/117) Metrics: Fix `objstore_bucket_operation_failures_total` incorrectly incremented if context is cancelled while reading object contents.
Expand Down
32 changes: 27 additions & 5 deletions inmem.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,12 @@ func (b *InMemBucket) Get(_ context.Context, name string) (io.ReadCloser, error)
return nil, errNotFound
}

return io.NopCloser(bytes.NewReader(file)), nil
return ObjectSizerReadCloser{
ReadCloser: io.NopCloser(bytes.NewReader(file)),
Size: func() (int64, error) {
return int64(len(file)), nil
},
}, nil
}

// GetRange returns a new range reader for the given object name and range.
Expand All @@ -136,23 +141,40 @@ func (b *InMemBucket) GetRange(_ context.Context, name string, off, length int64
}

if int64(len(file)) < off {
return io.NopCloser(bytes.NewReader(nil)), nil
return ObjectSizerReadCloser{
ReadCloser: io.NopCloser(bytes.NewReader(nil)),
Size: func() (int64, error) { return 0, nil },
}, nil
}

if length == -1 {
return io.NopCloser(bytes.NewReader(file[off:])), nil
return ObjectSizerReadCloser{
ReadCloser: io.NopCloser(bytes.NewReader(file[off:])),
Size: func() (int64, error) {
return int64(len(file[off:])), nil
},
}, nil
}

if length <= 0 {
return io.NopCloser(bytes.NewReader(nil)), errors.New("length cannot be smaller or equal 0")
// wrap with ObjectSizerReadCloser to return 0 size.
return ObjectSizerReadCloser{
ReadCloser: io.NopCloser(bytes.NewReader(nil)),
Size: func() (int64, error) { return 0, nil },
}, errors.New("length cannot be smaller or equal 0")
}

if int64(len(file)) <= off+length {
// Just return maximum of what we have.
length = int64(len(file)) - off
}

return io.NopCloser(bytes.NewReader(file[off : off+length])), nil
return ObjectSizerReadCloser{
ReadCloser: io.NopCloser(bytes.NewReader(file[off : off+length])),
Size: func() (int64, error) {
return length, nil
},
}, nil
}

// Exists checks if the given directory exists in memory.
Expand Down
14 changes: 14 additions & 0 deletions objstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -829,3 +829,17 @@ func (t *timingReaderWriterTo) WriteTo(w io.Writer) (n int64, err error) {
t.timingReader.updateMetrics(int(n), err)
return n, err
}

type ObjectSizerReadCloser struct {
io.ReadCloser
Size func() (int64, error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I see, we always return nil as the error, so I wonder why we need to have it as a return argument.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

swift could potentially return err if the HEAD call fails

}

// ObjectSize implement ObjectSizer.
func (o ObjectSizerReadCloser) ObjectSize() (int64, error) {
if o.Size == nil {
return 0, errors.New("unknown size")
}

return o.Size()
}
8 changes: 7 additions & 1 deletion providers/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,13 @@ func (b *Bucket) getBlobReader(ctx context.Context, name string, httpRange blob.
return nil, errors.Wrapf(err, "cannot download blob, address: %s", blobClient.URL())
}
retryOpts := azblob.RetryReaderOptions{MaxRetries: int32(b.readerMaxRetries)}
return resp.NewRetryReader(ctx, &retryOpts), nil

return objstore.ObjectSizerReadCloser{
ReadCloser: resp.NewRetryReader(ctx, &retryOpts),
Size: func() (int64, error) {
return *resp.ContentLength, nil
},
}, nil
}

// Get returns a reader for the given object name.
Expand Down
7 changes: 6 additions & 1 deletion providers/bos/bos.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,12 @@ func (b *Bucket) getRange(_ context.Context, bucketName, objectKey string, off,
return nil, err
}

return obj.Body, nil
return objstore.ObjectSizerReadCloser{
ReadCloser: obj.Body,
Size: func() (int64, error) {
return obj.ContentLength, nil
},
}, err
}

func configFromEnv() Config {
Expand Down
18 changes: 6 additions & 12 deletions providers/cos/cos.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,18 +320,12 @@ func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) (
return nil, err
}
// Add size info into reader to pass it to Upload function.
r := objectSizerReadCloser{ReadCloser: resp.Body, size: resp.ContentLength}
return r, nil
}

type objectSizerReadCloser struct {
io.ReadCloser
size int64
}

// ObjectSize implement objstore.ObjectSizer.
func (o objectSizerReadCloser) ObjectSize() (int64, error) {
return o.size, nil
return objstore.ObjectSizerReadCloser{
ReadCloser: resp.Body,
Size: func() (int64, error) {
return resp.ContentLength, nil
},
}, nil
}

// Get returns a reader for the given object name.
Expand Down
29 changes: 24 additions & 5 deletions providers/filesystem/filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,12 @@ func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) (
return nil, errors.New("object name is empty")
}

file := filepath.Join(b.rootDir, name)
if _, err := os.Stat(file); err != nil {
var (
file = filepath.Join(b.rootDir, name)
stat os.FileInfo
err error
)
if stat, err = os.Stat(file); err != nil {
return nil, errors.Wrapf(err, "stat %s", file)
}

Expand All @@ -160,18 +164,33 @@ func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) (
return nil, err
}

var newOffset int64
if off > 0 {
_, err := f.Seek(off, 0)
newOffset, err = f.Seek(off, 0)
if err != nil {
return nil, errors.Wrapf(err, "seek %v", off)
}
}

size := stat.Size() - newOffset
if length == -1 {
return f, nil
return objstore.ObjectSizerReadCloser{
ReadCloser: f,
Size: func() (int64, error) {
return size, nil
},
}, nil
}

return &rangeReaderCloser{Reader: io.LimitReader(f, length), f: f}, nil
return objstore.ObjectSizerReadCloser{
ReadCloser: &rangeReaderCloser{
Reader: io.LimitReader(f, length),
f: f,
},
Size: func() (int64, error) {
return min(length, size), nil
},
}, nil
}

// Exists checks if the given directory exists in memory.
Expand Down
25 changes: 23 additions & 2 deletions providers/gcs/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,12 +226,33 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt

// Get returns a reader for the given object name.
func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
return b.bkt.Object(name).NewReader(ctx)
r, err := b.bkt.Object(name).NewReader(ctx)
if err != nil {
return r, err
}

return objstore.ObjectSizerReadCloser{
ReadCloser: r,
Size: func() (int64, error) {
return r.Attrs.Size, nil
},
}, nil
}

// GetRange returns a new range reader for the given object name and range.
func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) {
return b.bkt.Object(name).NewRangeReader(ctx, off, length)
r, err := b.bkt.Object(name).NewRangeReader(ctx, off, length)
if err != nil {
return r, err
}

sz := r.Remain()
return objstore.ObjectSizerReadCloser{
ReadCloser: r,
Size: func() (int64, error) {
return sz, nil
},
}, nil
}

// Attributes returns information about the specified object.
Expand Down
7 changes: 6 additions & 1 deletion providers/obs/obs.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,12 @@ func (b *Bucket) getRange(_ context.Context, name string, off, length int64) (io
if err != nil {
return nil, errors.Wrap(err, "failed to get object")
}
return output.Body, nil
return objstore.ObjectSizerReadCloser{
ReadCloser: output.Body,
Size: func() (int64, error) {
return output.ContentLength, nil
},
}, nil
}

// Exists checks if the given object exists in the bucket.
Expand Down
13 changes: 11 additions & 2 deletions providers/oci/oci.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,12 @@ func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) {
if err != nil {
return nil, err
}
return response.Content, nil
return objstore.ObjectSizerReadCloser{
ReadCloser: response.Content,
Size: func() (int64, error) {
return *response.ContentLength, nil
},
}, nil
}

// GetRange returns a new range reader for the given object name and range.
Expand Down Expand Up @@ -164,7 +169,11 @@ func (b *Bucket) GetRange(ctx context.Context, name string, offset, length int64
if err != nil {
return nil, err
}
return response.Content, nil
return objstore.ObjectSizerReadCloser{ReadCloser: response.Content,
Size: func() (int64, error) {
return *response.ContentLength, nil
},
}, nil
}

// Upload the contents of the reader as an object into the bucket.
Expand Down
15 changes: 13 additions & 2 deletions providers/oss/oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"testing"
"time"

"github.com/aliyun/aliyun-oss-go-sdk/oss"
alioss "github.com/aliyun/aliyun-oss-go-sdk/oss"
"github.com/go-kit/log"
"github.com/pkg/errors"
Expand Down Expand Up @@ -342,12 +343,22 @@ func (b *Bucket) getRange(_ context.Context, name string, off, length int64) (io
opts = append(opts, opt)
}

resp, err := b.bucket.GetObject(name, opts...)
resp, err := b.bucket.DoGetObject(&oss.GetObjectRequest{ObjectKey: name}, opts)
if err != nil {
return nil, err
}

return resp, nil
size, err := clientutil.ParseContentLength(resp.Response.Headers)
if err == nil {
return objstore.ObjectSizerReadCloser{
ReadCloser: resp.Response,
Size: func() (int64, error) {
return size, nil
},
}, nil
}

return resp.Response, nil
}

// Get returns a reader for the given object name.
Expand Down
12 changes: 11 additions & 1 deletion providers/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,17 @@ func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) (
return nil, err
}

return r, nil
return objstore.ObjectSizerReadCloser{
ReadCloser: r,
Size: func() (int64, error) {
stat, err := r.Stat()
if err != nil {
return 0, err
}

return stat.Size, nil
},
}, nil
}

// Get returns a reader for the given object name.
Expand Down
6 changes: 5 additions & 1 deletion providers/swift/swift.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,11 @@ func (c *Container) get(name string, headers swift.Headers, checkHash bool) (io.
if err != nil {
return nil, errors.Wrap(err, "open object")
}
return file, err

return objstore.ObjectSizerReadCloser{
ReadCloser: file,
Size: file.Length,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

file.Length() returns the value of Content-Length response header if it is set, else it makes a HEAD call to the store

}, nil
}

// Get returns a reader for the given object name.
Expand Down
20 changes: 20 additions & 0 deletions testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ func AcceptanceTest(t *testing.T, bkt Bucket) {
rc1, err := bkt.Get(ctx, "id1/obj_1.some")
testutil.Ok(t, err)
defer func() { testutil.Ok(t, rc1.Close()) }()

sz, err := TryToGetSize(rc1)
testutil.Ok(t, err)
testutil.Equals(t, int64(11), sz, "expected size to be equal to 11")

content, err := io.ReadAll(rc1)
testutil.Ok(t, err)
testutil.Equals(t, "@test-data@", string(content))
Expand All @@ -118,6 +123,11 @@ func AcceptanceTest(t *testing.T, bkt Bucket) {
rc2, err := bkt.GetRange(ctx, "id1/obj_1.some", 1, 3)
testutil.Ok(t, err)
defer func() { testutil.Ok(t, rc2.Close()) }()

sz, err = TryToGetSize(rc2)
testutil.Ok(t, err)
testutil.Equals(t, int64(3), sz, "expected size to be equal to 3")

content, err = io.ReadAll(rc2)
testutil.Ok(t, err)
testutil.Equals(t, "tes", string(content))
Expand All @@ -126,6 +136,11 @@ func AcceptanceTest(t *testing.T, bkt Bucket) {
rcUnspecifiedLen, err := bkt.GetRange(ctx, "id1/obj_1.some", 1, -1)
testutil.Ok(t, err)
defer func() { testutil.Ok(t, rcUnspecifiedLen.Close()) }()

sz, err = TryToGetSize(rcUnspecifiedLen)
testutil.Ok(t, err)
testutil.Equals(t, int64(10), sz, "expected size to be equal to 10")

content, err = io.ReadAll(rcUnspecifiedLen)
testutil.Ok(t, err)
testutil.Equals(t, "test-data@", string(content))
Expand All @@ -141,6 +156,11 @@ func AcceptanceTest(t *testing.T, bkt Bucket) {
rcLength, err := bkt.GetRange(ctx, "id1/obj_1.some", 3, 9999)
testutil.Ok(t, err)
defer func() { testutil.Ok(t, rcLength.Close()) }()

sz, err = TryToGetSize(rcLength)
testutil.Ok(t, err)
testutil.Equals(t, int64(8), sz, "expected size to be equal to 8")

content, err = io.ReadAll(rcLength)
testutil.Ok(t, err)
testutil.Equals(t, "st-data@", string(content))
Expand Down
Loading