Merge pull request #536 from mtrmac/blob-info-caching
Blob info caching + mount/reuse
runcom authored Dec 6, 2018
2 parents 63a1cbd + 3a6fc86 commit d53afe1
Showing 42 changed files with 1,874 additions and 278 deletions.
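
The heart of the PR is a new types.BlobInfoCache interface (with default implementations in the new pkg/blobinfocache package), now threaded through GetBlob, PutBlob and the new TryReusingBlob. Its declaration is not among the hunks shown below; the following sketch is reconstructed from the methods this diff calls, so treat names and comments as approximate — in particular, the two location-oriented methods backing the mount/reuse half are an assumption here:

// Approximate sketch of the BlobInfoCache interface this PR adds to the
// types package; not a verbatim copy of the declaration.
type BlobInfoCache interface {
	// UncompressedDigest returns the uncompressed digest recorded for
	// anyDigest, or "" if nothing is known.
	UncompressedDigest(anyDigest digest.Digest) digest.Digest
	// RecordDigestUncompressedPair records that uncompressed is the
	// uncompressed form of the blob with digest anyDigest.
	RecordDigestUncompressedPair(anyDigest digest.Digest, uncompressed digest.Digest)
	// RecordKnownLocation records that the blob with the given digest exists
	// at location, within the stated scope of transport.
	RecordKnownLocation(transport ImageTransport, scope BICTransportScope, digest digest.Digest, location BICLocationReference)
	// CandidateLocations returns locations likely to contain the blob, or
	// (if canSubstitute) an equivalent blob such as a differently-compressed
	// copy of the same layer.
	CandidateLocations(transport ImageTransport, scope BICTransportScope, digest digest.Digest, canSubstitute bool) []BICCandidateLocation
}
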
101 changes: 65 additions & 36 deletions copy/copy.go
@@ -13,6 +13,7 @@ import (
"time"

"github.com/containers/image/image"
"github.com/containers/image/pkg/blobinfocache"
"github.com/containers/image/pkg/compression"
"github.com/containers/image/signature"
"github.com/containers/image/transports"
@@ -24,14 +25,16 @@ import (
)

type digestingReader struct {
source io.Reader
digester digest.Digester
expectedDigest digest.Digest
validationFailed bool
source io.Reader
digester digest.Digester
expectedDigest digest.Digest
validationFailed bool
validationSucceeded bool
}

// newDigestingReader returns an io.Reader implementation with contents of source, which will eventually return a non-EOF error
// and set validationFailed to true if the source stream does not match expectedDigest.
// or set validationSucceeded/validationFailed to true if the source stream does/does not match expectedDigest.
// (neither is set if EOF is never reached).
func newDigestingReader(source io.Reader, expectedDigest digest.Digest) (*digestingReader, error) {
if err := expectedDigest.Validate(); err != nil {
return nil, errors.Errorf("Invalid digest specification %s", expectedDigest)
@@ -64,28 +67,30 @@ func (d *digestingReader) Read(p []byte) (int, error) {
d.validationFailed = true
return 0, errors.Errorf("Digest did not match, expected %s, got %s", d.expectedDigest, actualDigest)
}
d.validationSucceeded = true
}
return n, err
}

// copier allows us to keep track of diffID values for blobs, and other
// data shared across one or more images in a possible manifest list.
type copier struct {
cachedDiffIDs map[digest.Digest]digest.Digest
dest types.ImageDestination
rawSource types.ImageSource
reportWriter io.Writer
progressInterval time.Duration
progress chan types.ProgressProperties
blobInfoCache types.BlobInfoCache
}

// imageCopier tracks state specific to a single image (possibly an item of a manifest list)
type imageCopier struct {
c *copier
manifestUpdates *types.ManifestUpdateOptions
src types.Image
diffIDsAreNeeded bool
canModifyManifest bool
c *copier
manifestUpdates *types.ManifestUpdateOptions
src types.Image
diffIDsAreNeeded bool
canModifyManifest bool
canSubstituteBlobs bool
}

// Options allows supplying non-default configuration modifying the behavior of CopyImage.
@@ -141,12 +146,15 @@ func Image(ctx context.Context, policyContext *signature.PolicyContext, destRef,
}()

c := &copier{
cachedDiffIDs: make(map[digest.Digest]digest.Digest),
dest: dest,
rawSource: rawSource,
reportWriter: reportWriter,
progressInterval: options.ProgressInterval,
progress: options.Progress,
// FIXME? The cache is used for sources and destinations equally, but we only have a SourceCtx and DestinationCtx.
// For now, use DestinationCtx (because blob reuse changes the behavior of the destination side more); eventually
// we might want to add a separate CommonCtx — or would that be too confusing?
blobInfoCache: blobinfocache.DefaultCache(options.DestinationCtx),
}

unparsedToplevel := image.UnparsedInstance(rawSource, nil)
@@ -235,6 +243,13 @@ func (c *copier) copyOneImage(ctx context.Context, policyContext *signature.Poli
src: src,
// diffIDsAreNeeded is computed later
canModifyManifest: len(sigs) == 0,
// Ensure _this_ copy sees exactly the intended data when either processing a signed image or signing it.
// This may be too conservative, but for now, better safe than sorry, _especially_ on the SignBy path:
// The signature makes the content non-repudiable, so it very much matters that the signature is made over exactly what the user intended.
// We do intend the RecordDigestUncompressedPair calls to only work with reliable data, but at least there’s a risk
// that the compressed version coming from a third party may be designed to attack some other decompressor implementation,
// and we would reuse and sign it.
canSubstituteBlobs: len(sigs) == 0 && options.SignBy == "",
}

if err := ic.updateEmbeddedDockerReference(); err != nil {
@@ -498,32 +513,24 @@ type diffIDResult struct {
// copyLayer copies a layer with srcInfo (with known Digest and possibly known Size) in src to dest, perhaps compressing it if canCompress,
// and returns a complete blobInfo of the copied layer, and a value for LayerDiffIDs if diffIDIsNeeded
func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo) (types.BlobInfo, digest.Digest, error) {
// Check if we already have a blob with this digest
haveBlob, extantBlobSize, err := ic.c.dest.HasBlob(ctx, srcInfo)
if err != nil {
return types.BlobInfo{}, "", errors.Wrapf(err, "Error checking for blob %s at destination", srcInfo.Digest)
}
// If we already have a cached diffID for this blob, we don't need to compute it
diffIDIsNeeded := ic.diffIDsAreNeeded && (ic.c.cachedDiffIDs[srcInfo.Digest] == "")
// If we already have the blob, and we don't need to recompute the diffID, then we might be able to avoid reading it again
if haveBlob && !diffIDIsNeeded {
// Check the blob sizes match, if we were given a size this time
if srcInfo.Size != -1 && srcInfo.Size != extantBlobSize {
return types.BlobInfo{}, "", errors.Errorf("Error: blob %s is already present, but with size %d instead of %d", srcInfo.Digest, extantBlobSize, srcInfo.Size)
}
srcInfo.Size = extantBlobSize
// Tell the image destination that this blob's delta is being applied again. For some image destinations, this can be faster than using GetBlob/PutBlob
blobinfo, err := ic.c.dest.ReapplyBlob(ctx, srcInfo)
cachedDiffID := ic.c.blobInfoCache.UncompressedDigest(srcInfo.Digest) // May be ""
diffIDIsNeeded := ic.diffIDsAreNeeded && cachedDiffID == ""

// If we already have the blob, and we don't need to compute the diffID, then we don't need to read it from the source.
if !diffIDIsNeeded {
reused, blobInfo, err := ic.c.dest.TryReusingBlob(ctx, srcInfo, ic.c.blobInfoCache, ic.canSubstituteBlobs)
if err != nil {
return types.BlobInfo{}, "", errors.Wrapf(err, "Error reapplying blob %s at destination", srcInfo.Digest)
return types.BlobInfo{}, "", errors.Wrapf(err, "Error trying to reuse blob %s at destination", srcInfo.Digest)
}
if reused {
ic.c.Printf("Skipping fetch of repeat blob %s\n", srcInfo.Digest)
return blobInfo, cachedDiffID, nil
}
ic.c.Printf("Skipping fetch of repeat blob %s\n", srcInfo.Digest)
return blobinfo, ic.c.cachedDiffIDs[srcInfo.Digest], err
}

// Fallback: copy the layer, computing the diffID if we need to do so
ic.c.Printf("Copying blob %s\n", srcInfo.Digest)
srcStream, srcBlobSize, err := ic.c.rawSource.GetBlob(ctx, srcInfo)
srcStream, srcBlobSize, err := ic.c.rawSource.GetBlob(ctx, srcInfo, ic.c.blobInfoCache)
if err != nil {
return types.BlobInfo{}, "", errors.Wrapf(err, "Error reading blob %s", srcInfo.Digest)
}
@@ -543,11 +550,13 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo) (t
return types.BlobInfo{}, "", errors.Wrap(diffIDResult.err, "Error computing layer DiffID")
}
logrus.Debugf("Computed DiffID %s for layer %s", diffIDResult.digest, srcInfo.Digest)
ic.c.cachedDiffIDs[srcInfo.Digest] = diffIDResult.digest
// This is safe because we have just computed diffIDResult.Digest ourselves, and in the process
// we have read all of the input blob, so srcInfo.Digest must have been validated by digestingReader.
ic.c.blobInfoCache.RecordDigestUncompressedPair(srcInfo.Digest, diffIDResult.digest)
return blobInfo, diffIDResult.digest, nil
}
} else {
return blobInfo, ic.c.cachedDiffIDs[srcInfo.Digest], nil
return blobInfo, cachedDiffID, nil
}
}
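
copyLayer now consults the cache for a previously computed DiffID before deciding whether it needs to read the blob from the source at all. A minimal, hypothetical demonstration of that digest-pair half of the API, using the in-memory implementation the tests below also use (the digest values are made up):

package main

import (
	"fmt"

	"github.com/containers/image/pkg/blobinfocache"
	"github.com/opencontainers/go-digest"
)

func main() {
	cache := blobinfocache.NewMemoryCache()

	gzipped := digest.Digest("sha256:1111111111111111111111111111111111111111111111111111111111111111")
	diffID := digest.Digest("sha256:2222222222222222222222222222222222222222222222222222222222222222")

	// Nothing recorded yet: lookups return "".
	fmt.Println(cache.UncompressedDigest(gzipped)) // ""

	// After computing a layer's DiffID, copyLayer records the pair…
	cache.RecordDigestUncompressedPair(gzipped, diffID)

	// …so a later copy of the same layer can skip the recomputation, and, if
	// the destination already has the blob, the download itself.
	fmt.Println(cache.UncompressedDigest(gzipped)) // sha256:2222…
}
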

@@ -624,7 +633,7 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
// === Process input through digestingReader to validate against the expected digest.
// Be paranoid; in case PutBlob somehow managed to ignore an error from digestingReader,
// use a separate validation failure indicator.
// Note that we don't use a stronger "validationSucceeded" indicator, because
// Note that for this check we don't use the stronger "validationSucceeded" indicator, because
// dest.PutBlob may detect that the layer already exists, in which case we don't
// read stream to the end, and validation does not happen.
digestingReader, err := newDigestingReader(srcStream, srcInfo.Digest)
@@ -660,8 +669,10 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr

// === Deal with layer compression/decompression if necessary
var inputInfo types.BlobInfo
var compressionOperation types.LayerCompression
if canModifyBlob && c.dest.DesiredLayerCompression() == types.Compress && !isCompressed {
logrus.Debugf("Compressing blob on the fly")
compressionOperation = types.Compress
pipeReader, pipeWriter := io.Pipe()
defer pipeReader.Close()

@@ -674,6 +685,7 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
inputInfo.Size = -1
} else if canModifyBlob && c.dest.DesiredLayerCompression() == types.Decompress && isCompressed {
logrus.Debugf("Blob will be decompressed")
compressionOperation = types.Decompress
s, err := decompressor(destStream)
if err != nil {
return types.BlobInfo{}, err
@@ -684,6 +696,7 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
inputInfo.Size = -1
} else {
logrus.Debugf("Using original blob without modification")
compressionOperation = types.PreserveOriginal
inputInfo = srcInfo
}

@@ -699,7 +712,7 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
}

// === Finally, send the layer stream to dest.
uploadedInfo, err := c.dest.PutBlob(ctx, destStream, inputInfo, isConfig)
uploadedInfo, err := c.dest.PutBlob(ctx, destStream, inputInfo, c.blobInfoCache, isConfig)
if err != nil {
return types.BlobInfo{}, errors.Wrap(err, "Error writing blob")
}
@@ -722,6 +735,22 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
if inputInfo.Digest != "" && uploadedInfo.Digest != inputInfo.Digest {
return types.BlobInfo{}, errors.Errorf("Internal error writing blob %s, blob with digest %s saved with digest %s", srcInfo.Digest, inputInfo.Digest, uploadedInfo.Digest)
}
if digestingReader.validationSucceeded {
// If compressionOperation != types.PreserveOriginal, we now have two reliable digest values:
// srcInfo.Digest describes the pre-compressionOperation input, verified by digestingReader
// uploadedInfo.Digest describes the post-compressionOperation output, computed by PutBlob
// (because inputInfo.Digest == "", this must have been computed afresh).
switch compressionOperation {
case types.PreserveOriginal:
break // Do nothing, we have only one digest and we might not have even verified it.
case types.Compress:
c.blobInfoCache.RecordDigestUncompressedPair(uploadedInfo.Digest, srcInfo.Digest)
case types.Decompress:
c.blobInfoCache.RecordDigestUncompressedPair(srcInfo.Digest, uploadedInfo.Digest)
default:
return types.BlobInfo{}, errors.Errorf("Internal error: Unexpected compressionOperation value %#v", compressionOperation)
}
}
return uploadedInfo, nil
}

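The direction of the digest pair recorded above is easy to get backwards, so here is a concrete trace of the two conversion cases, with hypothetical digests G (gzip form) and U (uncompressed form):

// Decompress (e.g. pulling into a store that wants uncompressed layers):
//   srcInfo.Digest == G, uploadedInfo.Digest == U
//   ⇒ RecordDigestUncompressedPair(G, U)   // "the uncompressed form of G is U"
//
// Compress (e.g. pushing uncompressed layers to a registry):
//   srcInfo.Digest == U, uploadedInfo.Digest == G'
//   ⇒ RecordDigestUncompressedPair(G', U)  // the fresh gzip G' also maps to U
//
// In both cases the first argument is the compressed digest and the second
// the uncompressed one, matching the lookup direction of UncompressedDigest.
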
19 changes: 18 additions & 1 deletion copy/copy_test.go
@@ -51,6 +51,7 @@ func TestDigestingReaderRead(t *testing.T) {
assert.Equal(t, int64(len(c.input)), n, c.digest.String())
assert.Equal(t, c.input, dest.Bytes(), c.digest.String())
assert.False(t, reader.validationFailed, c.digest.String())
assert.True(t, reader.validationSucceeded, c.digest.String())
}
// Modified input
for _, c := range cases {
@@ -60,7 +61,23 @@ func TestDigestingReaderRead(t *testing.T) {
dest := bytes.Buffer{}
_, err = io.Copy(&dest, reader)
assert.Error(t, err, c.digest.String())
assert.True(t, reader.validationFailed)
assert.True(t, reader.validationFailed, c.digest.String())
assert.False(t, reader.validationSucceeded, c.digest.String())
}
// Truncated input
for _, c := range cases {
source := bytes.NewReader(c.input)
reader, err := newDigestingReader(source, c.digest)
require.NoError(t, err, c.digest.String())
if len(c.input) != 0 {
dest := bytes.Buffer{}
truncatedLen := int64(len(c.input) - 1)
n, err := io.CopyN(&dest, reader, truncatedLen)
assert.NoError(t, err, c.digest.String())
assert.Equal(t, truncatedLen, n, c.digest.String())
}
assert.False(t, reader.validationFailed, c.digest.String())
assert.False(t, reader.validationSucceeded, c.digest.String())
}
}

27 changes: 14 additions & 13 deletions directory/directory_dest.go
@@ -127,10 +127,11 @@ func (d *dirImageDestination) IgnoresEmbeddedDockerReference() bool {
// PutBlob writes contents of stream and returns data representing the result (with all data filled in).
// inputInfo.Digest can be optionally provided if known; it is not mandatory for the implementation to verify it.
// inputInfo.Size is the expected length of stream, if known.
// May update cache.
// WARNING: The contents of stream are being verified on the fly. Until stream.Read() returns io.EOF, the contents of the data SHOULD NOT be available
// to any other readers for download using the supplied digest.
// If stream.Read() at any time, ESPECIALLY at end of input, returns an error, PutBlob MUST 1) fail, and 2) delete any data stored so far.
func (d *dirImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, isConfig bool) (types.BlobInfo, error) {
func (d *dirImageDestination) PutBlob(ctx context.Context, stream io.Reader, inputInfo types.BlobInfo, cache types.BlobInfoCache, isConfig bool) (types.BlobInfo, error) {
blobFile, err := ioutil.TempFile(d.ref.path, "dir-put-blob")
if err != nil {
return types.BlobInfo{}, err
@@ -169,27 +170,27 @@ func (d *dirImageDestination) PutBlob(ctx context.Context, stream io.Reader, inp
return types.BlobInfo{Digest: computedDigest, Size: size}, nil
}

// HasBlob returns true iff the image destination already contains a blob with the matching digest which can be reapplied using ReapplyBlob.
// Unlike PutBlob, the digest can not be empty. If HasBlob returns true, the size of the blob must also be returned.
// If the destination does not contain the blob, or it is unknown, HasBlob ordinarily returns (false, -1, nil);
// it returns a non-nil error only on an unexpected failure.
func (d *dirImageDestination) HasBlob(ctx context.Context, info types.BlobInfo) (bool, int64, error) {
// TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination
// (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree).
// info.Digest must not be empty.
// If canSubstitute, TryReusingBlob can use an equivalent of the desired blob; in that case the returned info may not match the input.
// If the blob has been successfully reused, returns (true, info, nil); info must contain at least a digest and size.
// If the transport can not reuse the requested blob, TryReusingBlob returns (false, {}, nil); it returns a non-nil error only on an unexpected failure.
// May use and/or update cache.
func (d *dirImageDestination) TryReusingBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache, canSubstitute bool) (bool, types.BlobInfo, error) {
if info.Digest == "" {
return false, -1, errors.Errorf(`Can not check for a blob with unknown digest`)
return false, types.BlobInfo{}, errors.Errorf(`Can not check for a blob with unknown digest`)
}
blobPath := d.ref.layerPath(info.Digest)
finfo, err := os.Stat(blobPath)
if err != nil && os.IsNotExist(err) {
return false, -1, nil
return false, types.BlobInfo{}, nil
}
if err != nil {
return false, -1, err
return false, types.BlobInfo{}, err
}
return true, finfo.Size(), nil
}
return true, types.BlobInfo{Digest: info.Digest, Size: finfo.Size()}, nil

func (d *dirImageDestination) ReapplyBlob(ctx context.Context, info types.BlobInfo) (types.BlobInfo, error) {
return info, nil
}

// PutManifest writes manifest to the destination.
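From a caller's point of view, the TryReusingBlob contract replaces the old HasBlob + ReapplyBlob pair with a single call. A hypothetical caller-side sketch (reuseOrCopy and its parameters are invented for illustration):

package example

import (
	"context"
	"fmt"

	"github.com/containers/image/types"
)

// reuseOrCopy asks dest to reuse a blob before falling back to a full copy.
// srcInfo must carry a non-empty Digest.
func reuseOrCopy(ctx context.Context, dest types.ImageDestination, srcInfo types.BlobInfo, cache types.BlobInfoCache) error {
	reused, info, err := dest.TryReusingBlob(ctx, srcInfo, cache, true) // canSubstitute
	if err != nil {
		return err
	}
	if reused {
		// With canSubstitute, info may describe an equivalent blob whose
		// digest differs from srcInfo.Digest.
		fmt.Printf("reused blob %s (%d bytes)\n", info.Digest, info.Size)
		return nil
	}
	// Not present: fetch via ImageSource.GetBlob and upload via PutBlob.
	return nil
}
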
4 changes: 3 additions & 1 deletion directory/directory_src.go
@@ -49,7 +49,9 @@ func (s *dirImageSource) GetManifest(ctx context.Context, instanceDigest *digest
}

// GetBlob returns a stream for the specified blob, and the blob’s size (or -1 if unknown).
func (s *dirImageSource) GetBlob(ctx context.Context, info types.BlobInfo) (io.ReadCloser, int64, error) {
// The Digest field in BlobInfo is guaranteed to be provided, Size may be -1 and MediaType may be optionally provided.
// May update BlobInfoCache, preferably after it knows for certain that a blob truly exists at a specific location.
func (s *dirImageSource) GetBlob(ctx context.Context, info types.BlobInfo, cache types.BlobInfoCache) (io.ReadCloser, int64, error) {
r, err := os.Open(s.ref.layerPath(info.Digest))
if err != nil {
return nil, -1, err
9 changes: 6 additions & 3 deletions directory/directory_test.go
@@ -8,6 +8,7 @@ import (
"testing"

"github.com/containers/image/manifest"
"github.com/containers/image/pkg/blobinfocache"
"github.com/containers/image/types"
"github.com/opencontainers/go-digest"
"github.com/pkg/errors"
@@ -57,13 +58,14 @@ func TestGetPutManifest(t *testing.T) {
func TestGetPutBlob(t *testing.T) {
ref, tmpDir := refToTempDir(t)
defer os.RemoveAll(tmpDir)
cache := blobinfocache.NewMemoryCache()

blob := []byte("test-blob")
dest, err := ref.NewImageDestination(context.Background(), nil)
require.NoError(t, err)
defer dest.Close()
assert.Equal(t, types.PreserveOriginal, dest.DesiredLayerCompression())
info, err := dest.PutBlob(context.Background(), bytes.NewReader(blob), types.BlobInfo{Digest: digest.Digest("sha256:digest-test"), Size: int64(9)}, false)
info, err := dest.PutBlob(context.Background(), bytes.NewReader(blob), types.BlobInfo{Digest: digest.Digest("sha256:digest-test"), Size: int64(9)}, cache, false)
assert.NoError(t, err)
err = dest.Commit(context.Background())
assert.NoError(t, err)
@@ -73,7 +75,7 @@ func TestGetPutBlob(t *testing.T) {
src, err := ref.NewImageSource(context.Background(), nil)
require.NoError(t, err)
defer src.Close()
rc, size, err := src.GetBlob(context.Background(), info)
rc, size, err := src.GetBlob(context.Background(), info, cache)
assert.NoError(t, err)
defer rc.Close()
b, err := ioutil.ReadAll(rc)
@@ -99,6 +101,7 @@ func TestPutBlobDigestFailure(t *testing.T) {
dirRef, ok := ref.(dirReference)
require.True(t, ok)
blobPath := dirRef.layerPath(blobDigest)
cache := blobinfocache.NewMemoryCache()

firstRead := true
reader := readerFromFunc(func(p []byte) (int, error) {
@@ -120,7 +123,7 @@ func TestPutBlobDigestFailure(t *testing.T) {
dest, err := ref.NewImageDestination(context.Background(), nil)
require.NoError(t, err)
defer dest.Close()
_, err = dest.PutBlob(context.Background(), reader, types.BlobInfo{Digest: blobDigest, Size: -1}, false)
_, err = dest.PutBlob(context.Background(), reader, types.BlobInfo{Digest: blobDigest, Size: -1}, cache, false)
assert.Error(t, err)
assert.Contains(t, digestErrorString, err.Error())
err = dest.Commit(context.Background())
23 changes: 23 additions & 0 deletions docker/cache.go
@@ -0,0 +1,23 @@
package docker

import (
"github.com/containers/image/docker/reference"
"github.com/containers/image/types"
)

// bicTransportScope returns a BICTransportScope appropriate for ref.
func bicTransportScope(ref dockerReference) types.BICTransportScope {
// Blobs can be reused across the whole registry.
return types.BICTransportScope{Opaque: reference.Domain(ref.ref)}
}

// newBICLocationReference returns a BICLocationReference appropriate for ref.
func newBICLocationReference(ref dockerReference) types.BICLocationReference {
// Blobs are scoped to repositories (the tag/digest are not necessary to reuse a blob).
return types.BICLocationReference{Opaque: ref.ref.Name()}
}

// parseBICLocationReference returns a repository for encoded lr.
func parseBICLocationReference(lr types.BICLocationReference) (reference.Named, error) {
return reference.ParseNormalizedNamed(lr.Opaque)
}
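
These helpers give the docker transport a consistent vocabulary for the location half of the cache: blobs are scoped to a whole registry, and locations are recorded and parsed as repositories, which is exactly what a cross-repo blob mount needs. A hypothetical sketch of how they might combine with the interface methods (this exact call site is not part of the diff, and the CandidateLocations usage is an assumption):

// As if inside package docker: record that ref's repository holds d, then ask
// for repositories worth attempting a mount or reuse from.
func recordAndQueryLocations(cache types.BlobInfoCache, ref dockerReference, d digest.Digest) []types.BICCandidateLocation {
	cache.RecordKnownLocation(ref.Transport(), bicTransportScope(ref), d, newBICLocationReference(ref))
	return cache.CandidateLocations(ref.Transport(), bicTransportScope(ref), d, true) // canSubstitute
}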