Skip to content

Commit

Permalink
Add support for passing context
Browse files Browse the repository at this point in the history
When serving data from remote location, one might need to pass
the request context to backend storage of entries, for example
for distributed tracing to work.
  • Loading branch information
martin-sucha committed Oct 24, 2020
1 parent bc5258f commit 0b4c75e
Show file tree
Hide file tree
Showing 8 changed files with 424 additions and 60 deletions.
13 changes: 7 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ zipserve
Package zipserve implements serving virtual zip archives over HTTP,
with support for range queries and resumable downloads. Zipserve keeps only the
archive headers in memory (similar to archive/zip when streaming).
The actual file data is fetched on demand from user-provided ReaderAt,
so the file data can be fetched from a remote location.
Zipserve needs to know CRC32 of the uncompressed data, compressed and uncompressed size of files in advance,
which must be supplied by the user.
Zipserve fetches file data on demand from user-provided `io.ReaderAt` or `zipserve.ReaderAt`,
so the file data can be fetched from a remote location.
`zipserve.ReaderAt` supports passing request context to the backing store.

The user has to provide CRC32 of the uncompressed data, compressed and uncompressed size of files in advance.
These can be computed for example during file uploads.

Differences to archive/zip
--------------------------
Expand All @@ -35,8 +37,7 @@ so there aren't many commits. I update the module when a new version of Go is re
License
-------

Three clause BSD (same as Go) for files in this package (see [LICENSE](LICENSE)),
Apache 2.0 for readerutil package from go4.org which is used as a dependency.
Three clause BSD (same as Go), see [LICENSE](LICENSE).

Alternatives
------------
Expand Down
87 changes: 42 additions & 45 deletions archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ package zipserve

import (
"bytes"
"context"
"crypto/md5"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"go4.org/readerutil"
"io"
"net/http"
"strings"
Expand All @@ -34,6 +34,9 @@ type Template struct {
// Prefix is the content at the beginning of the file before ZIP entries.
//
// It may be used to create self-extracting archives, for example.
//
// Prefix may implement ReaderAt interface from this package, in that case
// Prefix's ReadAtContext method will be called instead of ReadAt.
Prefix io.ReaderAt

// PrefixSize is size of Prefix in bytes.
Expand All @@ -54,25 +57,11 @@ type Template struct {
CreateTime time.Time
}

type partsBuilder struct {
parts []readerutil.SizeReaderAt
offset int64
}

func (pb *partsBuilder) add(r readerutil.SizeReaderAt) {
size := r.Size()
if size == 0 {
return
}
pb.parts = append(pb.parts, r)
pb.offset += size
}

// Archive represents the ZIP file data to be downloaded by the user.
//
// It is a ReaderAt, so allows concurrent access to different byte ranges of the archive.
type Archive struct {
data readerutil.SizeReaderAt
parts multiReaderAt
createTime time.Time
etag string
}
Expand All @@ -89,9 +78,9 @@ func NewArchive(t *Template) (*Archive, error) {
return newArchive(t, bufferView, nil)
}

type bufferViewFunc func(content func(w io.Writer) error) (readerutil.SizeReaderAt, error)
type bufferViewFunc func(content func(w io.Writer) error) (sizeReaderAt, error)

func bufferView(content func(w io.Writer) error) (readerutil.SizeReaderAt, error) {
func bufferView(content func(w io.Writer) error) (sizeReaderAt, error) {
var buf bytes.Buffer

err := content(&buf)
Expand All @@ -101,17 +90,24 @@ func bufferView(content func(w io.Writer) error) (readerutil.SizeReaderAt, error
return bytes.NewReader(buf.Bytes()), nil
}

// readerAt adapts an io.ReaderAt into this package's ReaderAt.
//
// A value that already implements ReaderAt is returned as-is; any other
// io.ReaderAt is wrapped so that the context passed to reads is ignored.
func readerAt(r io.ReaderAt) ReaderAt {
	switch v := r.(type) {
	case ReaderAt:
		return v
	default:
		return ignoreContext{r: v}
	}
}

func newArchive(t *Template, view bufferViewFunc, testHookCloseSizeOffset func(size, offset uint64)) (*Archive, error) {
if len(t.Comment) > uint16max {
return nil, errors.New("comment too long")
}

ar := new(Archive)
dir := make([]*header, 0, len(t.Entries))
var pb partsBuilder
etagHash := md5.New()

if t.Prefix != nil {
pb.add(&addsize{size: t.PrefixSize, source: t.Prefix})
ar.parts.add(readerAt(t.Prefix), t.PrefixSize)

var buf [8]byte
binary.LittleEndian.PutUint64(buf[:], uint64(t.PrefixSize))
Expand All @@ -122,28 +118,28 @@ func newArchive(t *Template, view bufferViewFunc, testHookCloseSizeOffset func(s

for _, entry := range t.Entries {
prepareEntry(entry)
dir = append(dir, &header{FileHeader: entry, offset: uint64(pb.offset)})
dir = append(dir, &header{FileHeader: entry, offset: uint64(ar.parts.size)})
header, err := view(func(w io.Writer) error {
return writeHeader(w, entry)
})
if err != nil {
return nil, err
}
pb.add(header)
ar.parts.addSizeReaderAt(header)
io.Copy(etagHash, io.NewSectionReader(header, 0, header.Size()))
if strings.HasSuffix(entry.Name, "/") {
if entry.Content != nil {
return nil, errors.New("directory entry non-nil content")
}
} else {
if entry.Content != nil {
pb.add(&addsize{size: int64(entry.CompressedSize64), source: entry.Content})
ar.parts.add(readerAt(entry.Content), int64(entry.CompressedSize64))
} else if entry.CompressedSize64 != 0 {
return nil, errors.New("empty entry with nonzero length")
}
// data descriptor
dataDescriptor := makeDataDescriptor(entry)
pb.add(bytes.NewReader(dataDescriptor))
ar.parts.addSizeReaderAt(bytes.NewReader(dataDescriptor))
etagHash.Write(dataDescriptor)
}
if entry.Modified.After(maxTime) {
Expand All @@ -153,37 +149,46 @@ func newArchive(t *Template, view bufferViewFunc, testHookCloseSizeOffset func(s

// capture central directory offset and comment so that content func for central directory
// may be called multiple times and we don't store reference to t in the closure
centralDirectoryOffset := pb.offset
centralDirectoryOffset := ar.parts.size
comment := t.Comment
centralDirectory, err := view(func(w io.Writer) error {
return writeCentralDirectory(centralDirectoryOffset, dir, w, comment, testHookCloseSizeOffset)
})
if err != nil {
return nil, err
}
pb.add(centralDirectory)
ar.parts.addSizeReaderAt(centralDirectory)
io.Copy(etagHash, io.NewSectionReader(centralDirectory, 0, centralDirectory.Size()))

createTime := t.CreateTime
if createTime.IsZero() {
createTime = maxTime
ar.createTime = t.CreateTime
if ar.createTime.IsZero() {
ar.createTime = maxTime
}

etag := fmt.Sprintf("\"%s\"", hex.EncodeToString(etagHash.Sum(nil)))
ar.etag = fmt.Sprintf("\"%s\"", hex.EncodeToString(etagHash.Sum(nil)))

return &Archive{
data: readerutil.NewMultiReaderAt(pb.parts...),
createTime: createTime,
etag: etag}, nil
return ar, nil
}

// Size returns the size of the archive in bytes.
func (ar *Archive) Size() int64 { return ar.data.Size() }
func (ar *Archive) Size() int64 { return ar.parts.Size() }

// ReadAt provides the data of the file.
//
// This is the same as calling ReadAtContext with context.TODO().
//
// See io.ReaderAt for the interface.
func (ar *Archive) ReadAt(p []byte, off int64) (int, error) { return ar.data.ReadAt(p, off) }
func (ar *Archive) ReadAt(p []byte, off int64) (int, error) { return ar.parts.ReadAtContext(context.TODO(), p, off) }

// ReadAtContext provides the data of the file at the given offset.
//
// This method implements the ReaderAt interface of this package.
//
// The context is passed to ReadAtContext of individual entries, if they implement it. The context is ignored if an
// entry implements just io.ReaderAt.
func (ar *Archive) ReadAtContext(ctx context.Context, p []byte, off int64) (int, error) {
	return ar.parts.ReadAtContext(ctx, p, off)
}

// ServeHTTP serves the archive over HTTP.
//
Expand All @@ -202,14 +207,6 @@ func (ar *Archive) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Etag", ar.etag)
}

readseeker := io.NewSectionReader(ar.data, 0, ar.data.Size())
readseeker := io.NewSectionReader(withContext{r: &ar.parts, ctx: r.Context()}, 0, ar.parts.Size())
http.ServeContent(w, r, "", ar.createTime, readseeker)
}

type addsize struct {
size int64
source io.ReaderAt
}

func (as *addsize) Size() int64 { return as.size }
func (as *addsize) ReadAt(p []byte, off int64) (int, error) { return as.source.ReadAt(p, off) }
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
module github.com/martin-sucha/zipserve

go 1.12

require go4.org v0.0.0-20180417224846-9599cf28b011
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,2 +0,0 @@
go4.org v0.0.0-20180417224846-9599cf28b011 h1:i0QTVNl3j6yciHiQIHxz+mnsSQqo/xi78EGN7yNpMVw=
go4.org v0.0.0-20180417224846-9599cf28b011/go.mod h1:MkTOUMDaeVYJUOUsaDXIhWPZYa1yOyC1qaOBpL57BhE=
123 changes: 123 additions & 0 deletions io.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package zipserve

import (
"context"
"fmt"
"io"
"sort"
)

// ReaderAt is like io.ReaderAt, but also takes context.
//
// Implementations may use the context, for example, to propagate cancellation
// or distributed-tracing information to a remote backing store.
type ReaderAt interface {
	// ReadAtContext has same semantics as ReadAt from io.ReaderAt, but takes context.
	ReadAtContext(ctx context.Context, p []byte, off int64) (n int, err error)
}

// sizeReaderAt is an io.ReaderAt that knows its total size in bytes.
type sizeReaderAt interface {
	io.ReaderAt
	Size() int64
}

// offsetAndData is a single part of a multiReaderAt: the part's data together
// with the absolute offset at which the part starts in the joined stream.
type offsetAndData struct {
	offset int64
	data   ReaderAt
}

// multiReaderAt is a ReaderAt that joins multiple ReaderAt sequentially together.
//
// The zero value is an empty reader; parts are appended with add or addSizeReaderAt.
type multiReaderAt struct {
	parts []offsetAndData
	size  int64
}

// add appends a part of the given size to the multiReaderAt.
// add can be used only before the reader is read from.
func (mcr *multiReaderAt) add(data ReaderAt, size int64) {
	switch {
	case size < 0:
		panic(fmt.Sprintf("size cannot be negative: %v", size))
	case size == 0:
		// Zero-length parts contribute no data, so don't store them.
		return
	}
	mcr.parts = append(mcr.parts, offsetAndData{
		offset: mcr.size,
		data:   data,
	})
	mcr.size += size
}

// addSizeReaderAt is like add, but takes a sizeReaderAt; the context is
// ignored when reading from it.
func (mcr *multiReaderAt) addSizeReaderAt(r sizeReaderAt) {
	mcr.add(ignoreContext{r: r}, r.Size())
}

// endOffset is the absolute offset (exclusive) where the given part ends.
func (mcr *multiReaderAt) endOffset(partIndex int) int64 {
	if partIndex == len(mcr.parts)-1 {
		return mcr.size
	}
	return mcr.parts[partIndex+1].offset
}

// ReadAtContext reads from the concatenation of all parts, forwarding ctx to
// each part that overlaps p.
//
// It follows io.ReaderAt semantics: it returns io.EOF only when the read
// extends past the total size of the joined stream.
func (mcr *multiReaderAt) ReadAtContext(ctx context.Context, p []byte, off int64) (n int, err error) {
	if len(p) == 0 {
		return 0, nil
	}
	if off >= mcr.size {
		return 0, io.EOF
	}
	// find first part that has data for p
	firstPartIndex := sort.Search(len(mcr.parts), func(i int) bool {
		return mcr.endOffset(i) > off
	})
	for partIndex := firstPartIndex; partIndex < len(mcr.parts) && len(p) > 0; partIndex++ {
		if partIndex > firstPartIndex {
			// Parts after the first one are read from their beginning.
			off = mcr.parts[partIndex].offset
		}
		partRemainingBytes := mcr.endOffset(partIndex) - off
		sizeToRead := int64(len(p))
		if sizeToRead > partRemainingBytes {
			sizeToRead = partRemainingBytes
		}
		n2, err2 := mcr.parts[partIndex].data.ReadAtContext(ctx, p[0:sizeToRead], off-mcr.parts[partIndex].offset)
		n += n2
		if err2 != nil {
			// io.ReaderAt allows a reader to return io.EOF alongside a full
			// read that ends exactly at its end. A part's end is not the end
			// of the joined stream, so continue with the next part in that
			// case instead of reporting a premature EOF.
			if err2 != io.EOF || int64(n2) < sizeToRead {
				return n, err2
			}
		} else if int64(n2) < sizeToRead {
			// A short read with a nil error violates the io.ReaderAt
			// contract; report it rather than silently skipping bytes.
			return n, io.ErrUnexpectedEOF
		}
		p = p[n2:]
	}
	if len(p) > 0 {
		// tried reading beyond size
		return n, io.EOF
	}
	return n, nil
}

// ReadAt implements io.ReaderAt. It is the same as calling ReadAtContext
// with context.TODO().
func (mcr *multiReaderAt) ReadAt(p []byte, off int64) (n int, err error) {
	return mcr.ReadAtContext(context.TODO(), p, off)
}

// Size returns the total size of all parts in bytes.
func (mcr *multiReaderAt) Size() int64 {
	return mcr.size
}

// ignoreContext converts io.ReaderAt to ReaderAt by discarding the context.
type ignoreContext struct {
	r io.ReaderAt
}

// ReadAtContext implements ReaderAt; the context is ignored.
func (a ignoreContext) ReadAtContext(_ context.Context, p []byte, off int64) (n int, err error) {
	return a.r.ReadAt(p, off)
}

// withContext converts ReaderAt to io.ReaderAt by supplying a fixed context
// to every read.
//
// While usually we shouldn't store context in a structure, a withContext
// value only lives within a single request, so this is safe here.
type withContext struct {
	ctx context.Context
	r   ReaderAt
}

// ReadAt implements io.ReaderAt by forwarding to ReadAtContext with the
// stored context.
func (c withContext) ReadAt(p []byte, off int64) (n int, err error) {
	return c.r.ReadAtContext(c.ctx, p, off)
}
Loading

0 comments on commit 0b4c75e

Please sign in to comment.