diff --git a/CHANGELOG.md b/CHANGELOG.md index 12375cbf..4686a470 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ## Unreleased - [#38](https://github.com/thanos-io/objstore/pull/38) GCS: Upgrade cloud.google.com/go/storage version to `v1.43.0`. +- [#145](https://github.com/thanos-io/objstore/pull/145) Include content length in the response of Get and GetRange. ### Fixed - [#117](https://github.com/thanos-io/objstore/pull/117) Metrics: Fix `objstore_bucket_operation_failures_total` incorrectly incremented if context is cancelled while reading object contents. diff --git a/inmem.go b/inmem.go index 3f6f35e9..ed256c9c 100644 --- a/inmem.go +++ b/inmem.go @@ -119,7 +119,12 @@ func (b *InMemBucket) Get(_ context.Context, name string) (io.ReadCloser, error) return nil, errNotFound } - return io.NopCloser(bytes.NewReader(file)), nil + return ObjectSizerReadCloser{ + ReadCloser: io.NopCloser(bytes.NewReader(file)), + Size: func() (int64, error) { + return int64(len(file)), nil + }, + }, nil } // GetRange returns a new range reader for the given object name and range. @@ -136,15 +141,27 @@ func (b *InMemBucket) GetRange(_ context.Context, name string, off, length int64 } if int64(len(file)) < off { - return io.NopCloser(bytes.NewReader(nil)), nil + return ObjectSizerReadCloser{ + ReadCloser: io.NopCloser(bytes.NewReader(nil)), + Size: func() (int64, error) { return 0, nil }, + }, nil } if length == -1 { - return io.NopCloser(bytes.NewReader(file[off:])), nil + return ObjectSizerReadCloser{ + ReadCloser: io.NopCloser(bytes.NewReader(file[off:])), + Size: func() (int64, error) { + return int64(len(file[off:])), nil + }, + }, nil } if length <= 0 { - return io.NopCloser(bytes.NewReader(nil)), errors.New("length cannot be smaller or equal 0") + // wrap with ObjectSizerReadCloser to return 0 size. + return ObjectSizerReadCloser{ + ReadCloser: io.NopCloser(bytes.NewReader(nil)), + Size: func() (int64, error) { return 0, nil }, + }, errors.New("length cannot be smaller or equal 0") } if int64(len(file)) <= off+length { @@ -152,7 +169,12 @@ func (b *InMemBucket) GetRange(_ context.Context, name string, off, length int64 length = int64(len(file)) - off } - return io.NopCloser(bytes.NewReader(file[off : off+length])), nil + return ObjectSizerReadCloser{ + ReadCloser: io.NopCloser(bytes.NewReader(file[off : off+length])), + Size: func() (int64, error) { + return length, nil + }, + }, nil } // Exists checks if the given directory exists in memory. diff --git a/objstore.go b/objstore.go index bfee7503..5bce3ef6 100644 --- a/objstore.go +++ b/objstore.go @@ -829,3 +829,17 @@ func (t *timingReaderWriterTo) WriteTo(w io.Writer) (n int64, err error) { t.timingReader.updateMetrics(int(n), err) return n, err } + +type ObjectSizerReadCloser struct { + io.ReadCloser + Size func() (int64, error) +} + +// ObjectSize implement ObjectSizer. +func (o ObjectSizerReadCloser) ObjectSize() (int64, error) { + if o.Size == nil { + return 0, errors.New("unknown size") + } + + return o.Size() +} diff --git a/providers/azure/azure.go b/providers/azure/azure.go index c2f3adb5..9a4e8518 100644 --- a/providers/azure/azure.go +++ b/providers/azure/azure.go @@ -273,7 +273,13 @@ func (b *Bucket) getBlobReader(ctx context.Context, name string, httpRange blob. return nil, errors.Wrapf(err, "cannot download blob, address: %s", blobClient.URL()) } retryOpts := azblob.RetryReaderOptions{MaxRetries: int32(b.readerMaxRetries)} - return resp.NewRetryReader(ctx, &retryOpts), nil + + return objstore.ObjectSizerReadCloser{ + ReadCloser: resp.NewRetryReader(ctx, &retryOpts), + Size: func() (int64, error) { + return *resp.ContentLength, nil + }, + }, nil } // Get returns a reader for the given object name. diff --git a/providers/bos/bos.go b/providers/bos/bos.go index 74f9688e..1f81e920 100644 --- a/providers/bos/bos.go +++ b/providers/bos/bos.go @@ -308,7 +308,12 @@ func (b *Bucket) getRange(_ context.Context, bucketName, objectKey string, off, return nil, err } - return obj.Body, nil + return objstore.ObjectSizerReadCloser{ + ReadCloser: obj.Body, + Size: func() (int64, error) { + return obj.ContentLength, nil + }, + }, err } func configFromEnv() Config { diff --git a/providers/cos/cos.go b/providers/cos/cos.go index a8b853e3..f88a8e76 100644 --- a/providers/cos/cos.go +++ b/providers/cos/cos.go @@ -320,18 +320,12 @@ func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) ( return nil, err } // Add size info into reader to pass it to Upload function. - r := objectSizerReadCloser{ReadCloser: resp.Body, size: resp.ContentLength} - return r, nil -} - -type objectSizerReadCloser struct { - io.ReadCloser - size int64 -} - -// ObjectSize implement objstore.ObjectSizer. -func (o objectSizerReadCloser) ObjectSize() (int64, error) { - return o.size, nil + return objstore.ObjectSizerReadCloser{ + ReadCloser: resp.Body, + Size: func() (int64, error) { + return resp.ContentLength, nil + }, + }, nil } // Get returns a reader for the given object name. diff --git a/providers/filesystem/filesystem.go b/providers/filesystem/filesystem.go index 21c70485..2ed42ee8 100644 --- a/providers/filesystem/filesystem.go +++ b/providers/filesystem/filesystem.go @@ -150,8 +150,12 @@ func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) ( return nil, errors.New("object name is empty") } - file := filepath.Join(b.rootDir, name) - if _, err := os.Stat(file); err != nil { + var ( + file = filepath.Join(b.rootDir, name) + stat os.FileInfo + err error + ) + if stat, err = os.Stat(file); err != nil { return nil, errors.Wrapf(err, "stat %s", file) } @@ -160,18 +164,33 @@ func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) ( return nil, err } + var newOffset int64 if off > 0 { - _, err := f.Seek(off, 0) + newOffset, err = f.Seek(off, 0) if err != nil { return nil, errors.Wrapf(err, "seek %v", off) } } + size := stat.Size() - newOffset if length == -1 { - return f, nil + return objstore.ObjectSizerReadCloser{ + ReadCloser: f, + Size: func() (int64, error) { + return size, nil + }, + }, nil } - return &rangeReaderCloser{Reader: io.LimitReader(f, length), f: f}, nil + return objstore.ObjectSizerReadCloser{ + ReadCloser: &rangeReaderCloser{ + Reader: io.LimitReader(f, length), + f: f, + }, + Size: func() (int64, error) { + return min(length, size), nil + }, + }, nil } // Exists checks if the given directory exists in memory. diff --git a/providers/gcs/gcs.go b/providers/gcs/gcs.go index b5dae200..e022b14f 100644 --- a/providers/gcs/gcs.go +++ b/providers/gcs/gcs.go @@ -226,12 +226,33 @@ func (b *Bucket) Iter(ctx context.Context, dir string, f func(string) error, opt // Get returns a reader for the given object name. func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { - return b.bkt.Object(name).NewReader(ctx) + r, err := b.bkt.Object(name).NewReader(ctx) + if err != nil { + return r, err + } + + return objstore.ObjectSizerReadCloser{ + ReadCloser: r, + Size: func() (int64, error) { + return r.Attrs.Size, nil + }, + }, nil } // GetRange returns a new range reader for the given object name and range. func (b *Bucket) GetRange(ctx context.Context, name string, off, length int64) (io.ReadCloser, error) { - return b.bkt.Object(name).NewRangeReader(ctx, off, length) + r, err := b.bkt.Object(name).NewRangeReader(ctx, off, length) + if err != nil { + return r, err + } + + sz := r.Remain() + return objstore.ObjectSizerReadCloser{ + ReadCloser: r, + Size: func() (int64, error) { + return sz, nil + }, + }, nil } // Attributes returns information about the specified object. diff --git a/providers/obs/obs.go b/providers/obs/obs.go index 7de70633..cb450365 100644 --- a/providers/obs/obs.go +++ b/providers/obs/obs.go @@ -299,7 +299,12 @@ func (b *Bucket) getRange(_ context.Context, name string, off, length int64) (io if err != nil { return nil, errors.Wrap(err, "failed to get object") } - return output.Body, nil + return objstore.ObjectSizerReadCloser{ + ReadCloser: output.Body, + Size: func() (int64, error) { + return output.ContentLength, nil + }, + }, nil } // Exists checks if the given object exists in the bucket. diff --git a/providers/oci/oci.go b/providers/oci/oci.go index e2f9e98b..3bdf80f3 100644 --- a/providers/oci/oci.go +++ b/providers/oci/oci.go @@ -134,7 +134,12 @@ func (b *Bucket) Get(ctx context.Context, name string) (io.ReadCloser, error) { if err != nil { return nil, err } - return response.Content, nil + return objstore.ObjectSizerReadCloser{ + ReadCloser: response.Content, + Size: func() (int64, error) { + return *response.ContentLength, nil + }, + }, nil } // GetRange returns a new range reader for the given object name and range. @@ -164,7 +169,11 @@ func (b *Bucket) GetRange(ctx context.Context, name string, offset, length int64 if err != nil { return nil, err } - return response.Content, nil + return objstore.ObjectSizerReadCloser{ReadCloser: response.Content, + Size: func() (int64, error) { + return *response.ContentLength, nil + }, + }, nil } // Upload the contents of the reader as an object into the bucket. diff --git a/providers/oss/oss.go b/providers/oss/oss.go index 0a4cc76e..d6e1bbf5 100644 --- a/providers/oss/oss.go +++ b/providers/oss/oss.go @@ -16,6 +16,7 @@ import ( "testing" "time" + "github.com/aliyun/aliyun-oss-go-sdk/oss" alioss "github.com/aliyun/aliyun-oss-go-sdk/oss" "github.com/go-kit/log" "github.com/pkg/errors" @@ -342,12 +343,22 @@ func (b *Bucket) getRange(_ context.Context, name string, off, length int64) (io opts = append(opts, opt) } - resp, err := b.bucket.GetObject(name, opts...) + resp, err := b.bucket.DoGetObject(&oss.GetObjectRequest{ObjectKey: name}, opts) if err != nil { return nil, err } - return resp, nil + size, err := clientutil.ParseContentLength(resp.Response.Headers) + if err == nil { + return objstore.ObjectSizerReadCloser{ + ReadCloser: resp.Response, + Size: func() (int64, error) { + return size, nil + }, + }, nil + } + + return resp.Response, nil } // Get returns a reader for the given object name. diff --git a/providers/s3/s3.go b/providers/s3/s3.go index 2f0447a7..eac8191a 100644 --- a/providers/s3/s3.go +++ b/providers/s3/s3.go @@ -452,7 +452,17 @@ func (b *Bucket) getRange(ctx context.Context, name string, off, length int64) ( return nil, err } - return r, nil + return objstore.ObjectSizerReadCloser{ + ReadCloser: r, + Size: func() (int64, error) { + stat, err := r.Stat() + if err != nil { + return 0, err + } + + return stat.Size, nil + }, + }, nil } // Get returns a reader for the given object name. diff --git a/providers/swift/swift.go b/providers/swift/swift.go index 682c494c..44fa6ed0 100644 --- a/providers/swift/swift.go +++ b/providers/swift/swift.go @@ -262,7 +262,11 @@ func (c *Container) get(name string, headers swift.Headers, checkHash bool) (io. if err != nil { return nil, errors.Wrap(err, "open object") } - return file, err + + return objstore.ObjectSizerReadCloser{ + ReadCloser: file, + Size: file.Length, + }, nil } // Get returns a reader for the given object name. diff --git a/testing.go b/testing.go index b8e3744c..28cbd658 100644 --- a/testing.go +++ b/testing.go @@ -106,6 +106,11 @@ func AcceptanceTest(t *testing.T, bkt Bucket) { rc1, err := bkt.Get(ctx, "id1/obj_1.some") testutil.Ok(t, err) defer func() { testutil.Ok(t, rc1.Close()) }() + + sz, err := TryToGetSize(rc1) + testutil.Ok(t, err) + testutil.Equals(t, int64(11), sz, "expected size to be equal to 11") + content, err := io.ReadAll(rc1) testutil.Ok(t, err) testutil.Equals(t, "@test-data@", string(content)) @@ -118,6 +123,11 @@ func AcceptanceTest(t *testing.T, bkt Bucket) { rc2, err := bkt.GetRange(ctx, "id1/obj_1.some", 1, 3) testutil.Ok(t, err) defer func() { testutil.Ok(t, rc2.Close()) }() + + sz, err = TryToGetSize(rc2) + testutil.Ok(t, err) + testutil.Equals(t, int64(3), sz, "expected size to be equal to 3") + content, err = io.ReadAll(rc2) testutil.Ok(t, err) testutil.Equals(t, "tes", string(content)) @@ -126,6 +136,11 @@ func AcceptanceTest(t *testing.T, bkt Bucket) { rcUnspecifiedLen, err := bkt.GetRange(ctx, "id1/obj_1.some", 1, -1) testutil.Ok(t, err) defer func() { testutil.Ok(t, rcUnspecifiedLen.Close()) }() + + sz, err = TryToGetSize(rcUnspecifiedLen) + testutil.Ok(t, err) + testutil.Equals(t, int64(10), sz, "expected size to be equal to 10") + content, err = io.ReadAll(rcUnspecifiedLen) testutil.Ok(t, err) testutil.Equals(t, "test-data@", string(content)) @@ -141,6 +156,11 @@ func AcceptanceTest(t *testing.T, bkt Bucket) { rcLength, err := bkt.GetRange(ctx, "id1/obj_1.some", 3, 9999) testutil.Ok(t, err) defer func() { testutil.Ok(t, rcLength.Close()) }() + + sz, err = TryToGetSize(rcLength) + testutil.Ok(t, err) + testutil.Equals(t, int64(8), sz, "expected size to be equal to 8") + content, err = io.ReadAll(rcLength) testutil.Ok(t, err) testutil.Equals(t, "st-data@", string(content))