Update to enabled containers/image version

Signed-off-by: Silvio Moioli <moio@suse.com>
This commit is contained in:
Silvio Moioli
2021-06-17 16:06:13 +02:00
parent 64dc748e5e
commit 76eb9bc9e9
33 changed files with 2209 additions and 194 deletions

View File

@@ -20,6 +20,7 @@ import (
"github.com/containers/image/v5/manifest"
"github.com/containers/image/v5/pkg/blobinfocache"
"github.com/containers/image/v5/pkg/compression"
compressiontypes "github.com/containers/image/v5/pkg/compression/types"
"github.com/containers/image/v5/signature"
"github.com/containers/image/v5/transports"
"github.com/containers/image/v5/types"
@@ -57,7 +58,7 @@ var compressionBufferSize = 1048576
// expectedCompressionFormats is used to check if a blob with a specified media type is compressed
// using the algorithm that the media type says it should be compressed with
var expectedCompressionFormats = map[string]*compression.Algorithm{
var expectedCompressionFormats = map[string]*compressiontypes.Algorithm{
imgspecv1.MediaTypeImageLayerGzip: &compression.Gzip,
imgspecv1.MediaTypeImageLayerZstd: &compression.Zstd,
manifest.DockerV2Schema2LayerMediaType: &compression.Gzip,
@@ -117,12 +118,13 @@ type copier struct {
progress chan types.ProgressProperties
blobInfoCache internalblobinfocache.BlobInfoCache2
copyInParallel bool
compressionFormat compression.Algorithm
compressionFormat compressiontypes.Algorithm
compressionLevel *int
ociDecryptConfig *encconfig.DecryptConfig
ociEncryptConfig *encconfig.EncryptConfig
maxParallelDownloads uint
downloadForeignLayers bool
fetchPartialBlobs bool
}
// imageCopier tracks state specific to a single image (possibly an item of a manifest list)
@@ -194,15 +196,21 @@ type Options struct {
// OciDecryptConfig contains the config that can be used to decrypt an image if it is
// encrypted if non-nil. If nil, it does not attempt to decrypt an image.
OciDecryptConfig *encconfig.DecryptConfig
// MaxParallelDownloads indicates the maximum layers to pull at the same time. A reasonable default is used if this is left as 0.
MaxParallelDownloads uint
// When OptimizeDestinationImageAlreadyExists is set, optimize the copy assuming that the destination image already
// exists (and is equivalent). Making the eventual (no-op) copy more performant for this case. Enabling the option
// is slightly pessimistic if the destination image doesn't exist, or is not equivalent.
OptimizeDestinationImageAlreadyExists bool
// Download layer contents with "nondistributable" media types ("foreign" layers) and translate the layer media type
// to not indicate "nondistributable".
DownloadForeignLayers bool
// FetchPartialBlobs indicates whether to attempt to fetch the blob partially. Experimental.
FetchPartialBlobs bool
}
// validateImageListSelection returns an error if the passed-in value is not one that we recognize as a valid ImageListSelection value
@@ -283,14 +291,11 @@ func Image(ctx context.Context, policyContext *signature.PolicyContext, destRef,
ociEncryptConfig: options.OciEncryptConfig,
maxParallelDownloads: options.MaxParallelDownloads,
downloadForeignLayers: options.DownloadForeignLayers,
fetchPartialBlobs: options.FetchPartialBlobs,
}
// Default to using gzip compression unless specified otherwise.
if options.DestinationCtx == nil || options.DestinationCtx.CompressionFormat == nil {
algo, err := compression.AlgorithmByName("gzip")
if err != nil {
return nil, err
}
c.compressionFormat = algo
c.compressionFormat = compression.Gzip
} else {
c.compressionFormat = *options.DestinationCtx.CompressionFormat
}
@@ -1072,9 +1077,25 @@ func (c *copier) newProgressPool(ctx context.Context) (*mpb.Progress, func()) {
}
}
// customPartialBlobCounter provides a decorator function for the partial blobs retrieval progress bar
func customPartialBlobCounter(filler interface{}, wcc ...decor.WC) decor.Decorator {
producer := func(filler interface{}) decor.DecorFunc {
return func(s decor.Statistics) string {
if s.Total == 0 {
pairFmt := "%.1f / %.1f (skipped: %.1f)"
return fmt.Sprintf(pairFmt, decor.SizeB1024(s.Current), decor.SizeB1024(s.Total), decor.SizeB1024(s.Refill))
}
pairFmt := "%.1f / %.1f (skipped: %.1f = %.2f%%)"
percentage := 100.0 * float64(s.Refill) / float64(s.Total)
return fmt.Sprintf(pairFmt, decor.SizeB1024(s.Current), decor.SizeB1024(s.Total), decor.SizeB1024(s.Refill), percentage)
}
}
return decor.Any(producer(filler), wcc...)
}
// createProgressBar creates a mpb.Bar in pool. Note that if the copier's reportWriter
// is ioutil.Discard, the progress bar's output will be discarded
func (c *copier) createProgressBar(pool *mpb.Progress, info types.BlobInfo, kind string, onComplete string) *mpb.Bar {
func (c *copier) createProgressBar(pool *mpb.Progress, partial bool, info types.BlobInfo, kind string, onComplete string) *mpb.Bar {
// shortDigestLen is the length of the digest used for blobs.
const shortDigestLen = 12
@@ -1091,18 +1112,30 @@ func (c *copier) createProgressBar(pool *mpb.Progress, info types.BlobInfo, kind
// Use a normal progress bar when we know the size (i.e., size > 0).
// Otherwise, use a spinner to indicate that something's happening.
var bar *mpb.Bar
sstyle := mpb.SpinnerStyle(".", "..", "...", "....", "").PositionLeft()
if info.Size > 0 {
bar = pool.AddBar(info.Size,
mpb.BarFillerClearOnComplete(),
mpb.PrependDecorators(
decor.OnComplete(decor.Name(prefix), onComplete),
),
mpb.AppendDecorators(
decor.OnComplete(decor.CountersKibiByte("%.1f / %.1f"), ""),
),
)
if partial {
bar = pool.AddBar(info.Size,
mpb.BarFillerClearOnComplete(),
mpb.PrependDecorators(
decor.OnComplete(decor.Name(prefix), onComplete),
),
mpb.AppendDecorators(
customPartialBlobCounter(sstyle.Build()),
),
)
} else {
bar = pool.AddBar(info.Size,
mpb.BarFillerClearOnComplete(),
mpb.PrependDecorators(
decor.OnComplete(decor.Name(prefix), onComplete),
),
mpb.AppendDecorators(
decor.OnComplete(decor.CountersKibiByte("%.1f / %.1f"), ""),
),
)
}
} else {
sstyle := mpb.SpinnerStyle(".", "..", "...", "....", "").PositionLeft()
bar = pool.Add(0,
sstyle.Build(),
mpb.BarFillerClearOnComplete(),
@@ -1129,7 +1162,7 @@ func (c *copier) copyConfig(ctx context.Context, src types.Image) error {
destInfo, err := func() (types.BlobInfo, error) { // A scope for defer
progressPool, progressCleanup := c.newProgressPool(ctx)
defer progressCleanup()
bar := c.createProgressBar(progressPool, srcInfo, "config", "done")
bar := c.createProgressBar(progressPool, false, srcInfo, "config", "done")
destInfo, err := c.copyBlobFromStream(ctx, bytes.NewReader(configBlob), srcInfo, nil, false, true, false, bar, -1, false)
if err != nil {
return types.BlobInfo{}, err
@@ -1217,7 +1250,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to
}
if reused {
logrus.Debugf("Skipping blob %s (already present):", srcInfo.Digest)
bar := ic.c.createProgressBar(pool, srcInfo, "blob", "skipped: already exists")
bar := ic.c.createProgressBar(pool, false, srcInfo, "blob", "skipped: already exists")
bar.SetTotal(0, true)
// Throw an event that the layer has been skipped
@@ -1244,6 +1277,49 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to
}
}
// A partial pull is managed by the destination storage, that decides what portions
// of the source file are not known yet and must be fetched.
// Attempt a partial only when the source allows to retrieve a blob partially and
// the destination has support for it.
imgSource, okSource := ic.c.rawSource.(internalTypes.ImageSourceSeekable)
imgDest, okDest := ic.c.dest.(internalTypes.ImageDestinationPartial)
if ic.c.fetchPartialBlobs && okSource && okDest && !diffIDIsNeeded {
bar := ic.c.createProgressBar(pool, true, srcInfo, "blob", "done")
progress := make(chan int64)
terminate := make(chan interface{})
defer close(terminate)
defer close(progress)
proxy := imageSourceSeekableProxy{
source: imgSource,
progress: progress,
}
go func() {
for {
select {
case written := <-progress:
bar.IncrInt64(written)
case <-terminate:
return
}
}
}()
bar.SetTotal(srcInfo.Size, false)
info, err := imgDest.PutBlobPartial(ctx, proxy, srcInfo, ic.c.blobInfoCache)
if err == nil {
bar.SetRefill(srcInfo.Size - bar.Current())
bar.SetCurrent(srcInfo.Size)
bar.SetTotal(srcInfo.Size, true)
logrus.Debugf("Retrieved partial blob %v", srcInfo.Digest)
return info, cachedDiffID, nil
}
bar.Abort(true)
logrus.Errorf("Failed to retrieve partial blob: %v", err)
}
// Fallback: copy the layer, computing the diffID if we need to do so
srcStream, srcBlobSize, err := ic.c.rawSource.GetBlob(ctx, srcInfo, ic.c.blobInfoCache)
if err != nil {
@@ -1251,7 +1327,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to
}
defer srcStream.Close()
bar := ic.c.createProgressBar(pool, srcInfo, "blob", "done")
bar := ic.c.createProgressBar(pool, false, srcInfo, "blob", "done")
blobInfo, diffIDChan, err := ic.copyLayerFromStream(ctx, srcStream, types.BlobInfo{Digest: srcInfo.Digest, Size: srcBlobSize, MediaType: srcInfo.MediaType, Annotations: srcInfo.Annotations}, diffIDIsNeeded, toEncrypt, bar, layerIndex, emptyLayer)
if err != nil {
@@ -1285,7 +1361,7 @@ func (ic *imageCopier) copyLayer(ctx context.Context, srcInfo types.BlobInfo, to
// and returns a complete blobInfo of the copied blob and perhaps a <-chan diffIDResult if diffIDIsNeeded, to be read by the caller.
func (ic *imageCopier) copyLayerFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo,
diffIDIsNeeded bool, toEncrypt bool, bar *mpb.Bar, layerIndex int, emptyLayer bool) (types.BlobInfo, <-chan diffIDResult, error) {
var getDiffIDRecorder func(compression.DecompressorFunc) io.Writer // = nil
var getDiffIDRecorder func(compressiontypes.DecompressorFunc) io.Writer // = nil
var diffIDChan chan diffIDResult
err := errors.New("Internal error: unexpected panic in copyLayer") // For pipeWriter.CloseWithbelow
@@ -1296,7 +1372,7 @@ func (ic *imageCopier) copyLayerFromStream(ctx context.Context, srcStream io.Rea
_ = pipeWriter.CloseWithError(err) // CloseWithError(nil) is equivalent to Close(), always returns nil
}()
getDiffIDRecorder = func(decompressor compression.DecompressorFunc) io.Writer {
getDiffIDRecorder = func(decompressor compressiontypes.DecompressorFunc) io.Writer {
// If this fails, e.g. because we have exited and due to pipeWriter.CloseWithError() above further
// reading from the pipe has failed, we dont really care.
// We only read from diffIDChan if the rest of the flow has succeeded, and when we do read from it,
@@ -1315,7 +1391,7 @@ func (ic *imageCopier) copyLayerFromStream(ctx context.Context, srcStream io.Rea
}
// diffIDComputationGoroutine reads all input from layerStream, uncompresses using decompressor if necessary, and sends its digest, and status, if any, to dest.
func diffIDComputationGoroutine(dest chan<- diffIDResult, layerStream io.ReadCloser, decompressor compression.DecompressorFunc) {
func diffIDComputationGoroutine(dest chan<- diffIDResult, layerStream io.ReadCloser, decompressor compressiontypes.DecompressorFunc) {
result := diffIDResult{
digest: "",
err: errors.New("Internal error: unexpected panic in diffIDComputationGoroutine"),
@@ -1327,7 +1403,7 @@ func diffIDComputationGoroutine(dest chan<- diffIDResult, layerStream io.ReadClo
}
// computeDiffID reads all input from layerStream, uncompresses it using decompressor if necessary, and returns its digest.
func computeDiffID(stream io.Reader, decompressor compression.DecompressorFunc) (digest.Digest, error) {
func computeDiffID(stream io.Reader, decompressor compressiontypes.DecompressorFunc) (digest.Digest, error) {
if decompressor != nil {
s, err := decompressor(stream)
if err != nil {
@@ -1360,7 +1436,7 @@ func (r errorAnnotationReader) Read(b []byte) (n int, err error) {
// perhaps (de/re/)compressing it if canModifyBlob,
// and returns a complete blobInfo of the copied blob.
func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, srcInfo types.BlobInfo,
getOriginalLayerCopyWriter func(decompressor compression.DecompressorFunc) io.Writer,
getOriginalLayerCopyWriter func(decompressor compressiontypes.DecompressorFunc) io.Writer,
canModifyBlob bool, isConfig bool, toEncrypt bool, bar *mpb.Bar, layerIndex int, emptyLayer bool) (types.BlobInfo, error) {
if isConfig { // This is guaranteed by the caller, but set it here to be explicit.
canModifyBlob = false
@@ -1425,6 +1501,7 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
originalLayerReader = destStream
}
compressionMetadata := map[string]string{}
// === Deal with layer compression/decompression if necessary
// WARNING: If you are adding new reasons to change the blob, update also the OptimizeDestinationImageAlreadyExists
// short-circuit conditions
@@ -1453,7 +1530,7 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
// If this fails while writing data, it will do pipeWriter.CloseWithError(); if it fails otherwise,
// e.g. because we have exited and due to pipeReader.Close() above further writing to the pipe has failed,
// we dont care.
go c.compressGoroutine(pipeWriter, destStream, *uploadCompressionFormat) // Closes pipeWriter
go c.compressGoroutine(pipeWriter, destStream, compressionMetadata, *uploadCompressionFormat) // Closes pipeWriter
destStream = pipeReader
inputInfo.Digest = ""
inputInfo.Size = -1
@@ -1473,7 +1550,7 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
pipeReader, pipeWriter := io.Pipe()
defer pipeReader.Close()
go c.compressGoroutine(pipeWriter, s, *uploadCompressionFormat) // Closes pipeWriter
go c.compressGoroutine(pipeWriter, s, compressionMetadata, *uploadCompressionFormat) // Closes pipeWriter
destStream = pipeReader
inputInfo.Digest = ""
@@ -1640,23 +1717,42 @@ func (c *copier) copyBlobFromStream(ctx context.Context, srcStream io.Reader, sr
c.blobInfoCache.RecordDigestCompressorName(srcInfo.Digest, srcCompressorName)
}
}
// Copy all the metadata generated by the compressor into the annotations.
if uploadedInfo.Annotations == nil {
uploadedInfo.Annotations = map[string]string{}
}
for k, v := range compressionMetadata {
uploadedInfo.Annotations[k] = v
}
return uploadedInfo, nil
}
// doCompression reads all input from src and writes its compressed equivalent to dest.
func doCompression(dest io.Writer, src io.Reader, metadata map[string]string, compressionFormat compressiontypes.Algorithm, compressionLevel *int) error {
compressor, err := compression.CompressStreamWithMetadata(dest, metadata, compressionFormat, compressionLevel)
if err != nil {
return err
}
buf := make([]byte, compressionBufferSize)
_, err = io.CopyBuffer(compressor, src, buf) // Sets err to nil, i.e. causes dest.Close()
if err != nil {
compressor.Close()
return err
}
return compressor.Close()
}
// compressGoroutine reads all input from src and writes its compressed equivalent to dest.
func (c *copier) compressGoroutine(dest *io.PipeWriter, src io.Reader, compressionFormat compression.Algorithm) {
func (c *copier) compressGoroutine(dest *io.PipeWriter, src io.Reader, metadata map[string]string, compressionFormat compressiontypes.Algorithm) {
err := errors.New("Internal error: unexpected panic in compressGoroutine")
defer func() { // Note that this is not the same as {defer dest.CloseWithError(err)}; we need err to be evaluated lazily.
_ = dest.CloseWithError(err) // CloseWithError(nil) is equivalent to Close(), always returns nil
}()
compressor, err := compression.CompressStream(dest, compressionFormat, c.compressionLevel)
if err != nil {
return
}
defer compressor.Close()
buf := make([]byte, compressionBufferSize)
_, err = io.CopyBuffer(compressor, src, buf) // Sets err to nil, i.e. causes dest.Close()
err = doCompression(dest, src, metadata, compressionFormat, c.compressionLevel)
}

View File

@@ -1,9 +1,11 @@
package copy
import (
"context"
"io"
"time"
internalTypes "github.com/containers/image/v5/internal/types"
"github.com/containers/image/v5/types"
)
@@ -77,3 +79,26 @@ func (r *progressReader) Read(p []byte) (int, error) {
}
return n, err
}
// imageSourceSeekableProxy wraps ImageSourceSeekable and keeps track of how many bytes
// are received.
type imageSourceSeekableProxy struct {
// source is the seekable input to read from.
source internalTypes.ImageSourceSeekable
// progress is the chan where the total number of bytes read so far are reported.
progress chan int64
}
// GetBlobAt reads from the ImageSourceSeekable and report how many bytes were received
// to the progress chan.
func (s imageSourceSeekableProxy) GetBlobAt(ctx context.Context, bInfo types.BlobInfo, chunks []internalTypes.ImageSourceChunk) (chan io.ReadCloser, chan error, error) {
rc, errs, err := s.source.GetBlobAt(ctx, bInfo, chunks)
if err == nil {
total := int64(0)
for _, c := range chunks {
total += int64(c.Length)
}
s.progress <- total
}
return rc, errs, err
}

View File

@@ -21,13 +21,26 @@ const version = "Directory Transport Version: 1.1\n"
var ErrNotContainerImageDir = errors.New("not a containers image directory, don't want to overwrite important data")
type dirImageDestination struct {
ref dirReference
compress bool
ref dirReference
desiredLayerCompression types.LayerCompression
}
// newImageDestination returns an ImageDestination for writing to a directory.
func newImageDestination(ref dirReference, compress bool) (types.ImageDestination, error) {
d := &dirImageDestination{ref: ref, compress: compress}
func newImageDestination(sys *types.SystemContext, ref dirReference) (types.ImageDestination, error) {
desiredLayerCompression := types.PreserveOriginal
if sys != nil {
if sys.DirForceCompress {
desiredLayerCompression = types.Compress
if sys.DirForceDecompress {
return nil, errors.Errorf("Cannot compress and decompress at the same time")
}
}
if sys.DirForceDecompress {
desiredLayerCompression = types.Decompress
}
}
d := &dirImageDestination{ref: ref, desiredLayerCompression: desiredLayerCompression}
// If directory exists check if it is empty
// if not empty, check whether the contents match that of a container image directory and overwrite the contents
@@ -101,10 +114,7 @@ func (d *dirImageDestination) SupportsSignatures(ctx context.Context) error {
}
func (d *dirImageDestination) DesiredLayerCompression() types.LayerCompression {
if d.compress {
return types.Compress
}
return types.PreserveOriginal
return d.desiredLayerCompression
}
// AcceptsForeignLayerURLs returns false iff foreign layers in manifest should be actually

View File

@@ -153,11 +153,7 @@ func (ref dirReference) NewImageSource(ctx context.Context, sys *types.SystemCon
// NewImageDestination returns a types.ImageDestination for this reference.
// The caller must call .Close() on the returned ImageDestination.
func (ref dirReference) NewImageDestination(ctx context.Context, sys *types.SystemContext) (types.ImageDestination, error) {
compress := false
if sys != nil {
compress = sys.DirForceCompress
}
return newImageDestination(ref, compress)
return newImageDestination(sys, ref)
}
// DeleteImage deletes the named image from the registry, if supported.

View File

@@ -304,7 +304,7 @@ func CheckAuth(ctx context.Context, sys *types.SystemContext, username, password
Password: password,
}
resp, err := client.makeRequest(ctx, "GET", "/v2/", nil, nil, v2Auth, nil)
resp, err := client.makeRequest(ctx, http.MethodGet, "/v2/", nil, nil, v2Auth, nil)
if err != nil {
return err
}
@@ -343,8 +343,8 @@ func SearchRegistry(ctx context.Context, sys *types.SystemContext, registry, ima
v1Res := &V1Results{}
// Get credentials from authfile for the underlying hostname
// lint:ignore SA1019 We can't use GetCredentialsForRef because we want to search the whole registry.
auth, err := config.GetCredentials(sys, registry) // nolint:staticcheck // https://github.com/golangci/golangci-lint/issues/741
// We can't use GetCredentialsForRef here because we want to search the whole registry.
auth, err := config.GetCredentials(sys, registry)
if err != nil {
return nil, errors.Wrapf(err, "getting username and password")
}
@@ -380,7 +380,7 @@ func SearchRegistry(ctx context.Context, sys *types.SystemContext, registry, ima
u.RawQuery = q.Encode()
logrus.Debugf("trying to talk to v1 search endpoint")
resp, err := client.makeRequest(ctx, "GET", u.String(), nil, nil, noAuth, nil)
resp, err := client.makeRequest(ctx, http.MethodGet, u.String(), nil, nil, noAuth, nil)
if err != nil {
logrus.Debugf("error getting search results from v1 endpoint %q: %v", registry, err)
} else {
@@ -400,14 +400,15 @@ func SearchRegistry(ctx context.Context, sys *types.SystemContext, registry, ima
searchRes := []SearchResult{}
path := "/v2/_catalog"
for len(searchRes) < limit {
resp, err := client.makeRequest(ctx, "GET", path, nil, nil, v2Auth, nil)
resp, err := client.makeRequest(ctx, http.MethodGet, path, nil, nil, v2Auth, nil)
if err != nil {
logrus.Debugf("error getting search results from v2 endpoint %q: %v", registry, err)
return nil, errors.Wrapf(err, "couldn't search registry %q", registry)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
logrus.Errorf("error getting search results from v2 endpoint %q: %v", registry, httpResponseToError(resp, ""))
err := httpResponseToError(resp, "")
logrus.Errorf("error getting search results from v2 endpoint %q: %v", registry, err)
return nil, errors.Wrapf(err, "couldn't search registry %q", registry)
}
v2Res := &V2Results{}
@@ -423,7 +424,14 @@ func SearchRegistry(ctx context.Context, sys *types.SystemContext, registry, ima
res := SearchResult{
Name: repo,
}
searchRes = append(searchRes, res)
// bugzilla.redhat.com/show_bug.cgi?id=1976283
// If we have a full match, make sure it's listed as the first result.
// (Note there might be a full match we never see if we reach the result limit first.)
if repo == image {
searchRes = append([]SearchResult{res}, searchRes...)
} else {
searchRes = append(searchRes, res)
}
}
}
@@ -526,11 +534,10 @@ func (c *dockerClient) makeRequestToResolvedURL(ctx context.Context, method, url
// makeRequest should generally be preferred.
// Note that no exponential back off is performed when receiving an http 429 status code.
func (c *dockerClient) makeRequestToResolvedURLOnce(ctx context.Context, method, url string, headers map[string][]string, stream io.Reader, streamLen int64, auth sendAuth, extraScope *authScope) (*http.Response, error) {
req, err := http.NewRequest(method, url, stream)
req, err := http.NewRequestWithContext(ctx, method, url, stream)
if err != nil {
return nil, err
}
req = req.WithContext(ctx)
if streamLen != -1 { // Do not blindly overwrite if streamLen == -1, http.NewRequest above can figure out the length of bytes.Reader and similar objects without us having to compute it.
req.ContentLength = streamLen
}
@@ -623,13 +630,11 @@ func (c *dockerClient) getBearerTokenOAuth2(ctx context.Context, challenge chall
return nil, errors.Errorf("missing realm in bearer auth challenge")
}
authReq, err := http.NewRequest(http.MethodPost, realm, nil)
authReq, err := http.NewRequestWithContext(ctx, http.MethodPost, realm, nil)
if err != nil {
return nil, err
}
authReq = authReq.WithContext(ctx)
// Make the form data required against the oauth2 authentication
// More details here: https://docs.docker.com/registry/spec/auth/oauth/
params := authReq.URL.Query()
@@ -673,12 +678,11 @@ func (c *dockerClient) getBearerToken(ctx context.Context, challenge challenge,
return nil, errors.Errorf("missing realm in bearer auth challenge")
}
authReq, err := http.NewRequest(http.MethodGet, realm, nil)
authReq, err := http.NewRequestWithContext(ctx, http.MethodGet, realm, nil)
if err != nil {
return nil, err
}
authReq = authReq.WithContext(ctx)
params := authReq.URL.Query()
if c.auth.Username != "" {
params.Add("account", c.auth.Username)
@@ -732,7 +736,7 @@ func (c *dockerClient) detectPropertiesHelper(ctx context.Context) error {
ping := func(scheme string) error {
url := fmt.Sprintf(resolvedPingV2URL, scheme, c.registry)
resp, err := c.makeRequestToResolvedURL(ctx, "GET", url, nil, nil, -1, noAuth, nil)
resp, err := c.makeRequestToResolvedURL(ctx, http.MethodGet, url, nil, nil, -1, noAuth, nil)
if err != nil {
logrus.Debugf("Ping %s err %s (%#v)", url, err.Error(), err)
return err
@@ -759,7 +763,7 @@ func (c *dockerClient) detectPropertiesHelper(ctx context.Context) error {
// best effort to understand if we're talking to a V1 registry
pingV1 := func(scheme string) bool {
url := fmt.Sprintf(resolvedPingV1URL, scheme, c.registry)
resp, err := c.makeRequestToResolvedURL(ctx, "GET", url, nil, nil, -1, noAuth, nil)
resp, err := c.makeRequestToResolvedURL(ctx, http.MethodGet, url, nil, nil, -1, noAuth, nil)
if err != nil {
logrus.Debugf("Ping %s err %s (%#v)", url, err.Error(), err)
return false
@@ -793,7 +797,7 @@ func (c *dockerClient) detectProperties(ctx context.Context) error {
// using the original data structures.
func (c *dockerClient) getExtensionsSignatures(ctx context.Context, ref dockerReference, manifestDigest digest.Digest) (*extensionSignatureList, error) {
path := fmt.Sprintf(extensionsSignaturePath, reference.Path(ref.ref), manifestDigest)
res, err := c.makeRequest(ctx, "GET", path, nil, nil, v2Auth, nil)
res, err := c.makeRequest(ctx, http.MethodGet, path, nil, nil, v2Auth, nil)
if err != nil {
return nil, err
}

View File

@@ -68,7 +68,7 @@ func GetRepositoryTags(ctx context.Context, sys *types.SystemContext, ref types.
tags := make([]string, 0)
for {
res, err := client.makeRequest(ctx, "GET", path, nil, nil, v2Auth, nil)
res, err := client.makeRequest(ctx, http.MethodGet, path, nil, nil, v2Auth, nil)
if err != nil {
return nil, err
}
@@ -134,7 +134,7 @@ func GetDigest(ctx context.Context, sys *types.SystemContext, ref types.ImageRef
"Accept": manifest.DefaultRequestedManifestMIMETypes,
}
res, err := client.makeRequest(ctx, "HEAD", path, headers, nil, v2Auth, nil)
res, err := client.makeRequest(ctx, http.MethodHead, path, headers, nil, v2Auth, nil)
if err != nil {
return "", err
}

View File

@@ -147,7 +147,7 @@ func (d *dockerImageDestination) PutBlob(ctx context.Context, stream io.Reader,
// FIXME? Chunked upload, progress reporting, etc.
uploadPath := fmt.Sprintf(blobUploadPath, reference.Path(d.ref.ref))
logrus.Debugf("Uploading %s", uploadPath)
res, err := d.c.makeRequest(ctx, "POST", uploadPath, nil, nil, v2Auth, nil)
res, err := d.c.makeRequest(ctx, http.MethodPost, uploadPath, nil, nil, v2Auth, nil)
if err != nil {
return types.BlobInfo{}, err
}
@@ -168,7 +168,7 @@ func (d *dockerImageDestination) PutBlob(ctx context.Context, stream io.Reader,
// This error text should never be user-visible, we terminate only after makeRequestToResolvedURL
// returns, so there isnt a way for the error text to be provided to any of our callers.
defer uploadReader.Terminate(errors.New("Reading data from an already terminated upload"))
res, err = d.c.makeRequestToResolvedURL(ctx, "PATCH", uploadLocation.String(), map[string][]string{"Content-Type": {"application/octet-stream"}}, uploadReader, inputInfo.Size, v2Auth, nil)
res, err = d.c.makeRequestToResolvedURL(ctx, http.MethodPatch, uploadLocation.String(), map[string][]string{"Content-Type": {"application/octet-stream"}}, uploadReader, inputInfo.Size, v2Auth, nil)
if err != nil {
logrus.Debugf("Error uploading layer chunked %v", err)
return nil, err
@@ -194,7 +194,7 @@ func (d *dockerImageDestination) PutBlob(ctx context.Context, stream io.Reader,
// TODO: check inputInfo.Digest == computedDigest https://github.com/containers/image/pull/70#discussion_r77646717
locationQuery.Set("digest", computedDigest.String())
uploadLocation.RawQuery = locationQuery.Encode()
res, err = d.c.makeRequestToResolvedURL(ctx, "PUT", uploadLocation.String(), map[string][]string{"Content-Type": {"application/octet-stream"}}, nil, -1, v2Auth, nil)
res, err = d.c.makeRequestToResolvedURL(ctx, http.MethodPut, uploadLocation.String(), map[string][]string{"Content-Type": {"application/octet-stream"}}, nil, -1, v2Auth, nil)
if err != nil {
return types.BlobInfo{}, err
}
@@ -215,7 +215,7 @@ func (d *dockerImageDestination) PutBlob(ctx context.Context, stream io.Reader,
func (d *dockerImageDestination) blobExists(ctx context.Context, repo reference.Named, digest digest.Digest, extraScope *authScope) (bool, int64, error) {
checkPath := fmt.Sprintf(blobsPath, reference.Path(repo), digest.String())
logrus.Debugf("Checking %s", checkPath)
res, err := d.c.makeRequest(ctx, "HEAD", checkPath, nil, nil, v2Auth, extraScope)
res, err := d.c.makeRequest(ctx, http.MethodHead, checkPath, nil, nil, v2Auth, extraScope)
if err != nil {
return false, -1, err
}
@@ -246,7 +246,7 @@ func (d *dockerImageDestination) mountBlob(ctx context.Context, srcRepo referenc
}
mountPath := u.String()
logrus.Debugf("Trying to mount %s", mountPath)
res, err := d.c.makeRequest(ctx, "POST", mountPath, nil, nil, v2Auth, extraScope)
res, err := d.c.makeRequest(ctx, http.MethodPost, mountPath, nil, nil, v2Auth, extraScope)
if err != nil {
return err
}
@@ -264,7 +264,7 @@ func (d *dockerImageDestination) mountBlob(ctx context.Context, srcRepo referenc
return errors.Wrap(err, "determining upload URL after a mount attempt")
}
logrus.Debugf("... started an upload instead of mounting, trying to cancel at %s", uploadLocation.String())
res2, err := d.c.makeRequestToResolvedURL(ctx, "DELETE", uploadLocation.String(), nil, nil, -1, v2Auth, extraScope)
res2, err := d.c.makeRequestToResolvedURL(ctx, http.MethodDelete, uploadLocation.String(), nil, nil, -1, v2Auth, extraScope)
if err != nil {
logrus.Debugf("Error trying to cancel an inadvertent upload: %s", err)
} else {
@@ -424,7 +424,7 @@ func (d *dockerImageDestination) PutManifest(ctx context.Context, m []byte, inst
if mimeType != "" {
headers["Content-Type"] = []string{mimeType}
}
res, err := d.c.makeRequest(ctx, "PUT", path, headers, bytes.NewReader(m), v2Auth, nil)
res, err := d.c.makeRequest(ctx, http.MethodPut, path, headers, bytes.NewReader(m), v2Auth, nil)
if err != nil {
return err
}
@@ -640,7 +640,7 @@ sigExists:
}
path := fmt.Sprintf(extensionsSignaturePath, reference.Path(d.ref.ref), manifestDigest.String())
res, err := d.c.makeRequest(ctx, "PUT", path, nil, bytes.NewReader(body), v2Auth, nil)
res, err := d.c.makeRequest(ctx, http.MethodPut, path, nil, bytes.NewReader(body), v2Auth, nil)
if err != nil {
return err
}

View File

@@ -6,14 +6,17 @@ import (
"io"
"io/ioutil"
"mime"
"mime/multipart"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"sync"
"github.com/containers/image/v5/docker/reference"
"github.com/containers/image/v5/internal/iolimits"
internalTypes "github.com/containers/image/v5/internal/types"
"github.com/containers/image/v5/manifest"
"github.com/containers/image/v5/pkg/sysregistriesv2"
"github.com/containers/image/v5/types"
@@ -189,7 +192,7 @@ func (s *dockerImageSource) fetchManifest(ctx context.Context, tagOrDigest strin
headers := map[string][]string{
"Accept": manifest.DefaultRequestedManifestMIMETypes,
}
res, err := s.c.makeRequest(ctx, "GET", path, headers, nil, v2Auth, nil)
res, err := s.c.makeRequest(ctx, http.MethodGet, path, headers, nil, v2Auth, nil)
if err != nil {
return nil, "", err
}
@@ -245,7 +248,7 @@ func (s *dockerImageSource) getExternalBlob(ctx context.Context, urls []string)
// NOTE: we must not authenticate on additional URLs as those
// can be abused to leak credentials or tokens. Please
// refer to CVE-2020-15157 for more information.
resp, err = s.c.makeRequestToResolvedURL(ctx, "GET", url, nil, nil, -1, noAuth, nil)
resp, err = s.c.makeRequestToResolvedURL(ctx, http.MethodGet, url, nil, nil, -1, noAuth, nil)
if err == nil {
if resp.StatusCode != http.StatusOK {
err = errors.Errorf("error fetching external blob from %q: %d (%s)", url, resp.StatusCode, http.StatusText(resp.StatusCode))
@@ -275,6 +278,82 @@ func (s *dockerImageSource) HasThreadSafeGetBlob() bool {
return true
}
// GetBlobAt returns a stream for the specified blob.
func (s *dockerImageSource) GetBlobAt(ctx context.Context, info types.BlobInfo, chunks []internalTypes.ImageSourceChunk) (chan io.ReadCloser, chan error, error) {
headers := make(map[string][]string)
var rangeVals []string
for _, c := range chunks {
rangeVals = append(rangeVals, fmt.Sprintf("%d-%d", c.Offset, c.Offset+c.Length-1))
}
headers["Range"] = []string{fmt.Sprintf("bytes=%s", strings.Join(rangeVals, ","))}
if len(info.URLs) != 0 {
return nil, nil, fmt.Errorf("external URLs not supported with GetBlobAt")
}
path := fmt.Sprintf(blobsPath, reference.Path(s.physicalRef.ref), info.Digest.String())
logrus.Debugf("Downloading %s", path)
res, err := s.c.makeRequest(ctx, http.MethodGet, path, headers, nil, v2Auth, nil)
if err != nil {
return nil, nil, err
}
if err := httpResponseToError(res, "Error fetching partial blob"); err != nil {
if res.Body != nil {
res.Body.Close()
}
return nil, nil, err
}
if res.StatusCode != http.StatusPartialContent {
res.Body.Close()
return nil, nil, errors.Errorf("invalid status code returned when fetching blob %d (%s)", res.StatusCode, http.StatusText(res.StatusCode))
}
mediaType, params, err := mime.ParseMediaType(res.Header.Get("Content-Type"))
if err != nil {
return nil, nil, err
}
streams := make(chan io.ReadCloser)
errs := make(chan error)
go func() {
defer close(streams)
defer close(errs)
if !strings.HasPrefix(mediaType, "multipart/") {
streams <- res.Body
return
}
boundary, found := params["boundary"]
if !found {
errs <- errors.Errorf("could not find boundary")
return
}
buffered := makeBufferedNetworkReader(res.Body, 64, 16384)
defer buffered.Close()
mr := multipart.NewReader(buffered, boundary)
for {
p, err := mr.NextPart()
if err != nil {
if err != io.EOF {
errs <- err
}
return
}
s := signalCloseReader{
Closed: make(chan interface{}),
Stream: p,
}
streams <- s
// NextPart() cannot be called while the current part
// is being read, so wait until it is closed
<-s.Closed
}
}()
return streams, errs, nil
}
// GetBlob returns a stream for the specified blob, and the blobs size (or -1 if unknown).
// The Digest field in BlobInfo is guaranteed to be provided, Size may be -1 and MediaType may be optionally provided.
// May update BlobInfoCache, preferably after it knows for certain that a blob truly exists at a specific location.
@@ -285,7 +364,7 @@ func (s *dockerImageSource) GetBlob(ctx context.Context, info types.BlobInfo, ca
path := fmt.Sprintf(blobsPath, reference.Path(s.physicalRef.ref), info.Digest.String())
logrus.Debugf("Downloading %s", path)
res, err := s.c.makeRequest(ctx, "GET", path, nil, nil, v2Auth, nil)
res, err := s.c.makeRequest(ctx, http.MethodGet, path, nil, nil, v2Auth, nil)
if err != nil {
return nil, 0, err
}
@@ -375,11 +454,10 @@ func (s *dockerImageSource) getOneSignature(ctx context.Context, url *url.URL) (
case "http", "https":
logrus.Debugf("GET %s", url)
req, err := http.NewRequest("GET", url.String(), nil)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url.String(), nil)
if err != nil {
return nil, false, err
}
req = req.WithContext(ctx)
res, err := s.c.client.Do(req)
if err != nil {
return nil, false, err
@@ -444,7 +522,7 @@ func deleteImage(ctx context.Context, sys *types.SystemContext, ref dockerRefere
return err
}
getPath := fmt.Sprintf(manifestPath, reference.Path(ref.ref), refTail)
get, err := c.makeRequest(ctx, "GET", getPath, headers, nil, v2Auth, nil)
get, err := c.makeRequest(ctx, http.MethodGet, getPath, headers, nil, v2Auth, nil)
if err != nil {
return err
}
@@ -466,7 +544,7 @@ func deleteImage(ctx context.Context, sys *types.SystemContext, ref dockerRefere
// When retrieving the digest from a registry >= 2.3 use the following header:
// "Accept": "application/vnd.docker.distribution.manifest.v2+json"
delete, err := c.makeRequest(ctx, "DELETE", deletePath, headers, nil, v2Auth, nil)
delete, err := c.makeRequest(ctx, http.MethodDelete, deletePath, headers, nil, v2Auth, nil)
if err != nil {
return err
}
@@ -498,3 +576,119 @@ func deleteImage(ctx context.Context, sys *types.SystemContext, ref dockerRefere
return nil
}
type bufferedNetworkReaderBuffer struct {
data []byte
len int
consumed int
err error
}
type bufferedNetworkReader struct {
stream io.Reader
emptyBuffer chan *bufferedNetworkReaderBuffer
readyBuffer chan *bufferedNetworkReaderBuffer
terminate chan bool
current *bufferedNetworkReaderBuffer
mutex sync.Mutex
gotEOF bool
}
// handleBufferedNetworkReader runs in a goroutine
func handleBufferedNetworkReader(br *bufferedNetworkReader) {
defer close(br.readyBuffer)
for {
select {
case b := <-br.emptyBuffer:
b.len, b.err = br.stream.Read(b.data)
br.readyBuffer <- b
if b.err != nil {
return
}
case <-br.terminate:
return
}
}
}
func (n *bufferedNetworkReader) Close() {
close(n.terminate)
close(n.emptyBuffer)
}
func (n *bufferedNetworkReader) read(p []byte) (int, error) {
if n.current != nil {
copied := copy(p, n.current.data[n.current.consumed:n.current.len])
n.current.consumed += copied
if n.current.consumed == n.current.len {
n.emptyBuffer <- n.current
n.current = nil
}
if copied > 0 {
return copied, nil
}
}
if n.gotEOF {
return 0, io.EOF
}
var b *bufferedNetworkReaderBuffer
select {
case b = <-n.readyBuffer:
if b.err != nil {
if b.err != io.EOF {
return b.len, b.err
}
n.gotEOF = true
}
b.consumed = 0
n.current = b
return n.read(p)
case <-n.terminate:
return 0, io.EOF
}
}
func (n *bufferedNetworkReader) Read(p []byte) (int, error) {
n.mutex.Lock()
defer n.mutex.Unlock()
return n.read(p)
}
func makeBufferedNetworkReader(stream io.Reader, nBuffers, bufferSize uint) *bufferedNetworkReader {
br := bufferedNetworkReader{
stream: stream,
emptyBuffer: make(chan *bufferedNetworkReaderBuffer, nBuffers),
readyBuffer: make(chan *bufferedNetworkReaderBuffer, nBuffers),
terminate: make(chan bool),
}
go func() {
handleBufferedNetworkReader(&br)
}()
for i := uint(0); i < nBuffers; i++ {
b := bufferedNetworkReaderBuffer{
data: make([]byte, bufferSize),
}
br.emptyBuffer <- &b
}
return &br
}
type signalCloseReader struct {
Closed chan interface{}
Stream io.ReadCloser
}
func (s signalCloseReader) Read(p []byte) (int, error) {
return s.Stream.Read(p)
}
func (s signalCloseReader) Close() error {
defer close(s.Closed)
return s.Stream.Close()
}

View File

@@ -5,6 +5,7 @@ import (
"fmt"
"net/http"
internalTypes "github.com/containers/image/v5/internal/types"
"github.com/docker/distribution/registry/client"
perrors "github.com/pkg/errors"
)
@@ -32,11 +33,15 @@ func httpResponseToError(res *http.Response, context string) error {
switch res.StatusCode {
case http.StatusOK:
return nil
case http.StatusPartialContent:
return nil
case http.StatusTooManyRequests:
return ErrTooManyRequests
case http.StatusUnauthorized:
err := client.HandleErrorResponse(res)
return ErrUnauthorizedForCredentials{Err: err}
case http.StatusBadRequest:
return internalTypes.BadPartialRequestError{Status: res.Status}
default:
if context != "" {
context = context + ": "

View File

@@ -2,6 +2,7 @@ package blobinfocache
import (
"github.com/containers/image/v5/pkg/compression"
compressiontypes "github.com/containers/image/v5/pkg/compression/types"
"github.com/containers/image/v5/types"
digest "github.com/opencontainers/go-digest"
)
@@ -47,7 +48,7 @@ func CandidateLocationsFromV2(v2candidates []BICReplacementCandidate2) []types.B
// compression algorithm, or Uncompressed, or UnknownCompression. This is typically used by
// TryReusingBlob() implementations to set values in the BlobInfo structure that they return
// upon success.
func OperationAndAlgorithmForCompressor(compressorName string) (types.LayerCompression, *compression.Algorithm, error) {
func OperationAndAlgorithmForCompressor(compressorName string) (types.LayerCompression, *compressiontypes.Algorithm, error) {
switch compressorName {
case Uncompressed:
return types.Decompress, nil, nil

View File

@@ -58,3 +58,33 @@ type TryReusingBlobOptions struct {
// The reference of the image that contains the target blob.
SrcRef reference.Named
}
// ImageSourceChunk is a portion of a blob.
// This API is experimental and can be changed without bumping the major version number.
type ImageSourceChunk struct {
Offset uint64
Length uint64
}
// ImageSourceSeekable is an image source that permits to fetch chunks of the entire blob.
// This API is experimental and can be changed without bumping the major version number.
type ImageSourceSeekable interface {
// GetBlobAt returns a stream for the specified blob.
GetBlobAt(context.Context, publicTypes.BlobInfo, []ImageSourceChunk) (chan io.ReadCloser, chan error, error)
}
// ImageDestinationPartial is a service to store a blob by requesting the missing chunks to a ImageSourceSeekable.
// This API is experimental and can be changed without bumping the major version number.
type ImageDestinationPartial interface {
// PutBlobPartial writes contents of stream and returns data representing the result.
PutBlobPartial(ctx context.Context, stream ImageSourceSeekable, srcInfo publicTypes.BlobInfo, cache publicTypes.BlobInfoCache) (publicTypes.BlobInfo, error)
}
// BadPartialRequestError is returned by ImageSourceSeekable.GetBlobAt on an invalid request.
type BadPartialRequestError struct {
Status string
}
func (e BadPartialRequestError) Error() string {
return e.Status
}

View File

@@ -3,7 +3,7 @@ package manifest
import (
"fmt"
"github.com/containers/image/v5/pkg/compression"
compressiontypes "github.com/containers/image/v5/pkg/compression/types"
"github.com/containers/image/v5/types"
"github.com/sirupsen/logrus"
)
@@ -44,7 +44,7 @@ func layerInfosToStrings(infos []LayerInfo) []string {
// compressionMIMETypeSet describes a set of MIME type “variants” that represent differently-compressed
// versions of “the same kind of content”.
// The map key is the return value of compression.Algorithm.Name(), or mtsUncompressed;
// The map key is the return value of compressiontypes.Algorithm.Name(), or mtsUncompressed;
// the map value is a MIME type, or mtsUnsupportedMIMEType to mean "recognized but unsupported".
type compressionMIMETypeSet map[string]string
@@ -59,7 +59,7 @@ const mtsUnsupportedMIMEType = "" // A value in compressionMIMETypeSet that mean
// If the compression algorithm is unrecognized, or mimeType is not known to have variants that
// differ from it only in what type of compression has been applied, the returned error will not be
// a ManifestLayerCompressionIncompatibilityError.
func compressionVariantMIMEType(variantTable []compressionMIMETypeSet, mimeType string, algorithm *compression.Algorithm) (string, error) {
func compressionVariantMIMEType(variantTable []compressionMIMETypeSet, mimeType string, algorithm *compressiontypes.Algorithm) (string, error) {
if mimeType == mtsUnsupportedMIMEType { // Prevent matching against the {algo:mtsUnsupportedMIMEType} entries
return "", fmt.Errorf("cannot update unknown MIME type")
}
@@ -68,7 +68,7 @@ func compressionVariantMIMEType(variantTable []compressionMIMETypeSet, mimeType
if mt == mimeType { // Found the variant
name := mtsUncompressed
if algorithm != nil {
name = algorithm.Name()
name = algorithm.InternalUnstableUndocumentedMIMEQuestionMark()
}
if res, ok := variants[name]; ok {
if res != mtsUnsupportedMIMEType {

View File

@@ -5,7 +5,7 @@ import (
"fmt"
"time"
"github.com/containers/image/v5/pkg/compression"
compressiontypes "github.com/containers/image/v5/pkg/compression/types"
"github.com/containers/image/v5/pkg/strslice"
"github.com/containers/image/v5/types"
"github.com/opencontainers/go-digest"
@@ -214,14 +214,14 @@ func (m *Schema2) LayerInfos() []LayerInfo {
var schema2CompressionMIMETypeSets = []compressionMIMETypeSet{
{
mtsUncompressed: DockerV2Schema2ForeignLayerMediaType,
compression.Gzip.Name(): DockerV2Schema2ForeignLayerMediaTypeGzip,
compression.Zstd.Name(): mtsUnsupportedMIMEType,
mtsUncompressed: DockerV2Schema2ForeignLayerMediaType,
compressiontypes.GzipAlgorithmName: DockerV2Schema2ForeignLayerMediaTypeGzip,
compressiontypes.ZstdAlgorithmName: mtsUnsupportedMIMEType,
},
{
mtsUncompressed: DockerV2SchemaLayerMediaTypeUncompressed,
compression.Gzip.Name(): DockerV2Schema2LayerMediaType,
compression.Zstd.Name(): mtsUnsupportedMIMEType,
mtsUncompressed: DockerV2SchemaLayerMediaTypeUncompressed,
compressiontypes.GzipAlgorithmName: DockerV2Schema2LayerMediaType,
compressiontypes.ZstdAlgorithmName: mtsUnsupportedMIMEType,
},
}

View File

@@ -5,7 +5,7 @@ import (
"fmt"
"strings"
"github.com/containers/image/v5/pkg/compression"
compressiontypes "github.com/containers/image/v5/pkg/compression/types"
"github.com/containers/image/v5/types"
ociencspec "github.com/containers/ocicrypt/spec"
"github.com/opencontainers/go-digest"
@@ -96,14 +96,14 @@ func (m *OCI1) LayerInfos() []LayerInfo {
var oci1CompressionMIMETypeSets = []compressionMIMETypeSet{
{
mtsUncompressed: imgspecv1.MediaTypeImageLayerNonDistributable,
compression.Gzip.Name(): imgspecv1.MediaTypeImageLayerNonDistributableGzip,
compression.Zstd.Name(): imgspecv1.MediaTypeImageLayerNonDistributableZstd,
mtsUncompressed: imgspecv1.MediaTypeImageLayerNonDistributable,
compressiontypes.GzipAlgorithmName: imgspecv1.MediaTypeImageLayerNonDistributableGzip,
compressiontypes.ZstdAlgorithmName: imgspecv1.MediaTypeImageLayerNonDistributableZstd,
},
{
mtsUncompressed: imgspecv1.MediaTypeImageLayer,
compression.Gzip.Name(): imgspecv1.MediaTypeImageLayerGzip,
compression.Zstd.Name(): imgspecv1.MediaTypeImageLayerZstd,
mtsUncompressed: imgspecv1.MediaTypeImageLayer,
compressiontypes.GzipAlgorithmName: imgspecv1.MediaTypeImageLayerGzip,
compressiontypes.ZstdAlgorithmName: imgspecv1.MediaTypeImageLayerZstd,
},
}

View File

@@ -148,13 +148,13 @@ func (s *ociImageSource) getExternalBlob(ctx context.Context, urls []string) (io
errWrap := errors.New("failed fetching external blob from all urls")
for _, url := range urls {
req, err := http.NewRequest("GET", url, nil)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
errWrap = errors.Wrapf(errWrap, "fetching %s failed %s", url, err.Error())
continue
}
resp, err := s.client.Do(req.WithContext(ctx))
resp, err := s.client.Do(req)
if err != nil {
errWrap = errors.Wrapf(errWrap, "fetching %s failed %s", url, err.Error())
continue

View File

@@ -79,11 +79,10 @@ func (c *openshiftClient) doRequest(ctx context.Context, method, path string, re
logrus.Debugf("Will send body: %s", requestBody)
requestBodyReader = bytes.NewReader(requestBody)
}
req, err := http.NewRequest(method, url.String(), requestBodyReader)
req, err := http.NewRequestWithContext(ctx, method, url.String(), requestBodyReader)
if err != nil {
return nil, err
}
req = req.WithContext(ctx)
if len(c.bearerToken) != 0 {
req.Header.Set("Authorization", "Bearer "+c.bearerToken)
@@ -137,7 +136,7 @@ func (c *openshiftClient) doRequest(ctx context.Context, method, path string, re
func (c *openshiftClient) getImage(ctx context.Context, imageStreamImageName string) (*image, error) {
// FIXME: validate components per validation.IsValidPathSegmentName?
path := fmt.Sprintf("/oapi/v1/namespaces/%s/imagestreamimages/%s@%s", c.ref.namespace, c.ref.stream, imageStreamImageName)
body, err := c.doRequest(ctx, "GET", path, nil)
body, err := c.doRequest(ctx, http.MethodGet, path, nil)
if err != nil {
return nil, err
}
@@ -273,7 +272,7 @@ func (s *openshiftImageSource) ensureImageIsResolved(ctx context.Context) error
// FIXME: validate components per validation.IsValidPathSegmentName?
path := fmt.Sprintf("/oapi/v1/namespaces/%s/imagestreams/%s", s.client.ref.namespace, s.client.ref.stream)
body, err := s.client.doRequest(ctx, "GET", path, nil)
body, err := s.client.doRequest(ctx, http.MethodGet, path, nil)
if err != nil {
return err
}
@@ -496,7 +495,7 @@ sigExists:
if err != nil {
return err
}
_, err = d.client.doRequest(ctx, "POST", "/oapi/v1/imagesignatures", body)
_, err = d.client.doRequest(ctx, http.MethodPost, "/oapi/v1/imagesignatures", body)
if err != nil {
return err
}

View File

@@ -9,6 +9,7 @@ import (
"github.com/containers/image/v5/pkg/compression/internal"
"github.com/containers/image/v5/pkg/compression/types"
"github.com/containers/storage/pkg/chunked/compressor"
"github.com/klauspost/pgzip"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@@ -20,19 +21,27 @@ type Algorithm = types.Algorithm
var (
// Gzip compression.
Gzip = internal.NewAlgorithm("gzip", []byte{0x1F, 0x8B, 0x08}, GzipDecompressor, gzipCompressor)
Gzip = internal.NewAlgorithm(types.GzipAlgorithmName, types.GzipAlgorithmName,
[]byte{0x1F, 0x8B, 0x08}, GzipDecompressor, gzipCompressor)
// Bzip2 compression.
Bzip2 = internal.NewAlgorithm("bzip2", []byte{0x42, 0x5A, 0x68}, Bzip2Decompressor, bzip2Compressor)
Bzip2 = internal.NewAlgorithm(types.Bzip2AlgorithmName, types.Bzip2AlgorithmName,
[]byte{0x42, 0x5A, 0x68}, Bzip2Decompressor, bzip2Compressor)
// Xz compression.
Xz = internal.NewAlgorithm("Xz", []byte{0xFD, 0x37, 0x7A, 0x58, 0x5A, 0x00}, XzDecompressor, xzCompressor)
Xz = internal.NewAlgorithm(types.XzAlgorithmName, types.XzAlgorithmName,
[]byte{0xFD, 0x37, 0x7A, 0x58, 0x5A, 0x00}, XzDecompressor, xzCompressor)
// Zstd compression.
Zstd = internal.NewAlgorithm("zstd", []byte{0x28, 0xb5, 0x2f, 0xfd}, ZstdDecompressor, zstdCompressor)
Zstd = internal.NewAlgorithm(types.ZstdAlgorithmName, types.ZstdAlgorithmName,
[]byte{0x28, 0xb5, 0x2f, 0xfd}, ZstdDecompressor, zstdCompressor)
// Zstd:chunked compression.
ZstdChunked = internal.NewAlgorithm(types.ZstdChunkedAlgorithmName, types.ZstdAlgorithmName, /* Note: InternalUnstableUndocumentedMIMEQuestionMark is not ZstdChunkedAlgorithmName */
nil, ZstdDecompressor, compressor.ZstdCompressor)
compressionAlgorithms = map[string]Algorithm{
Gzip.Name(): Gzip,
Bzip2.Name(): Bzip2,
Xz.Name(): Xz,
Zstd.Name(): Zstd,
Gzip.Name(): Gzip,
Bzip2.Name(): Bzip2,
Xz.Name(): Xz,
Zstd.Name(): Zstd,
ZstdChunked.Name(): ZstdChunked,
}
)
@@ -69,7 +78,7 @@ func XzDecompressor(r io.Reader) (io.ReadCloser, error) {
}
// gzipCompressor is a CompressorFunc for the gzip compression algorithm.
func gzipCompressor(r io.Writer, level *int) (io.WriteCloser, error) {
func gzipCompressor(r io.Writer, metadata map[string]string, level *int) (io.WriteCloser, error) {
if level != nil {
return pgzip.NewWriterLevel(r, *level)
}
@@ -77,18 +86,25 @@ func gzipCompressor(r io.Writer, level *int) (io.WriteCloser, error) {
}
// bzip2Compressor is a CompressorFunc for the bzip2 compression algorithm.
func bzip2Compressor(r io.Writer, level *int) (io.WriteCloser, error) {
func bzip2Compressor(r io.Writer, metadata map[string]string, level *int) (io.WriteCloser, error) {
return nil, fmt.Errorf("bzip2 compression not supported")
}
// xzCompressor is a CompressorFunc for the xz compression algorithm.
func xzCompressor(r io.Writer, level *int) (io.WriteCloser, error) {
func xzCompressor(r io.Writer, metadata map[string]string, level *int) (io.WriteCloser, error) {
return xz.NewWriter(r)
}
// CompressStream returns the compressor by its name
func CompressStream(dest io.Writer, algo Algorithm, level *int) (io.WriteCloser, error) {
return internal.AlgorithmCompressor(algo)(dest, level)
m := map[string]string{}
return internal.AlgorithmCompressor(algo)(dest, m, level)
}
// CompressStreamWithMetadata returns the compressor by its name. If the compression
// generates any metadata, it is written to the provided metadata map.
func CompressStreamWithMetadata(dest io.Writer, metadata map[string]string, algo Algorithm, level *int) (io.WriteCloser, error) {
return internal.AlgorithmCompressor(algo)(dest, metadata, level)
}
// DetectCompressionFormat returns an Algorithm and DecompressorFunc if the input is recognized as a compressed format, an invalid
@@ -107,7 +123,8 @@ func DetectCompressionFormat(input io.Reader) (Algorithm, DecompressorFunc, io.R
var retAlgo Algorithm
var decompressor DecompressorFunc
for _, algo := range compressionAlgorithms {
if bytes.HasPrefix(buffer[:n], internal.AlgorithmPrefix(algo)) {
prefix := internal.AlgorithmPrefix(algo)
if len(prefix) > 0 && bytes.HasPrefix(buffer[:n], prefix) {
logrus.Debugf("Detected compression format %s", algo.Name())
retAlgo = algo
decompressor = internal.AlgorithmDecompressor(algo)

View File

@@ -4,7 +4,7 @@ import "io"
// CompressorFunc writes the compressed stream to the given writer using the specified compression level.
// The caller must call Close() on the stream (even if the input stream does not need closing!).
type CompressorFunc func(io.Writer, *int) (io.WriteCloser, error)
type CompressorFunc func(io.Writer, map[string]string, *int) (io.WriteCloser, error)
// DecompressorFunc returns the decompressed stream, given a compressed stream.
// The caller must call Close() on the decompressed stream (even if the compressed input stream does not need closing!).
@@ -13,7 +13,8 @@ type DecompressorFunc func(io.Reader) (io.ReadCloser, error)
// Algorithm is a compression algorithm that can be used for CompressStream.
type Algorithm struct {
name string
prefix []byte
mime string
prefix []byte // Initial bytes of a stream compressed using this algorithm, or empty to disable detection.
decompressor DecompressorFunc
compressor CompressorFunc
}
@@ -21,9 +22,10 @@ type Algorithm struct {
// NewAlgorithm creates an Algorithm instance.
// This function exists so that Algorithm instances can only be created by code that
// is allowed to import this internal subpackage.
func NewAlgorithm(name string, prefix []byte, decompressor DecompressorFunc, compressor CompressorFunc) Algorithm {
func NewAlgorithm(name, mime string, prefix []byte, decompressor DecompressorFunc, compressor CompressorFunc) Algorithm {
return Algorithm{
name: name,
mime: mime,
prefix: prefix,
decompressor: decompressor,
compressor: compressor,
@@ -35,6 +37,12 @@ func (c Algorithm) Name() string {
return c.name
}
// InternalUnstableUndocumentedMIMEQuestionMark ???
// DO NOT USE THIS anywhere outside of c/image until it is properly documented.
func (c Algorithm) InternalUnstableUndocumentedMIMEQuestionMark() string {
return c.mime
}
// AlgorithmCompressor returns the compressor field of algo.
// This is a function instead of a public method so that it is only callable from by code
// that is allowed to import this internal subpackage.

View File

@@ -11,3 +11,31 @@ type DecompressorFunc = internal.DecompressorFunc
// Algorithm is a compression algorithm provided and supported by pkg/compression.
// It cant be supplied from the outside.
type Algorithm = internal.Algorithm
const (
// GzipAlgorithmName is the name used by pkg/compression.Gzip.
// NOTE: Importing only this /types package does not inherently guarantee a Gzip algorithm
// will actually be available. (In fact it is intended for this types package not to depend
// on any of the implementations.)
GzipAlgorithmName = "gzip"
// Bzip2AlgorithmName is the name used by pkg/compression.Bzip2.
// NOTE: Importing only this /types package does not inherently guarantee a Bzip2 algorithm
// will actually be available. (In fact it is intended for this types package not to depend
// on any of the implementations.)
Bzip2AlgorithmName = "bzip2"
// XzAlgorithmName is the name used by pkg/compression.Xz.
// NOTE: Importing only this /types package does not inherently guarantee a Xz algorithm
// will actually be available. (In fact it is intended for this types package not to depend
// on any of the implementations.)
XzAlgorithmName = "Xz"
// ZstdAlgorithmName is the name used by pkg/compression.Zstd.
// NOTE: Importing only this /types package does not inherently guarantee a Zstd algorithm
// will actually be available. (In fact it is intended for this types package not to depend
// on any of the implementations.)
ZstdAlgorithmName = "zstd"
// ZstdChunkedAlgorithmName is the name used by pkg/compression.ZstdChunked.
// NOTE: Importing only this /types package does not inherently guarantee a ZstdChunked algorithm
// will actually be available. (In fact it is intended for this types package not to depend
// on any of the implementations.)
ZstdChunkedAlgorithmName = "zstd:chunked"
)

View File

@@ -40,13 +40,13 @@ func zstdWriter(dest io.Writer) (io.WriteCloser, error) {
return zstd.NewWriter(dest)
}
func zstdWriterWithLevel(dest io.Writer, level int) (io.WriteCloser, error) {
func zstdWriterWithLevel(dest io.Writer, level int) (*zstd.Encoder, error) {
el := zstd.EncoderLevelFromZstd(level)
return zstd.NewWriter(dest, zstd.WithEncoderLevel(el))
}
// zstdCompressor is a CompressorFunc for the zstd compression algorithm.
func zstdCompressor(r io.Writer, level *int) (io.WriteCloser, error) {
func zstdCompressor(r io.Writer, metadata map[string]string, level *int) (io.WriteCloser, error) {
if level == nil {
return zstdWriter(r)
}

View File

@@ -52,12 +52,19 @@ var (
ErrNotSupported = errors.New("not supported")
)
// SetCredentials stores the username and password in the credential helper or file
// and returns path to file or helper name in format (helper:%s).
// SetCredentials stores the username and password in a location
// appropriate for sys and the users configuration.
// A valid key can be either a registry hostname or additionally a namespace if
// the AuthenticationFileHelper is being unsed.
// Returns a human-redable description of the location that was updated.
// NOTE: The return value is only intended to be read by humans; its form is not an API,
// it may change (or new forms can be added) any time.
func SetCredentials(sys *types.SystemContext, registry, username, password string) (string, error) {
func SetCredentials(sys *types.SystemContext, key, username, password string) (string, error) {
isNamespaced, err := validateKey(key)
if err != nil {
return "", err
}
helpers, err := sysregistriesv2.CredentialHelpers(sys)
if err != nil {
return "", err
@@ -72,33 +79,45 @@ func SetCredentials(sys *types.SystemContext, registry, username, password strin
// Special-case the built-in helpers for auth files.
case sysregistriesv2.AuthenticationFileHelper:
desc, err = modifyJSON(sys, func(auths *dockerConfigFile) (bool, error) {
if ch, exists := auths.CredHelpers[registry]; exists {
return false, setAuthToCredHelper(ch, registry, username, password)
if ch, exists := auths.CredHelpers[key]; exists {
if isNamespaced {
return false, unsupportedNamespaceErr(ch)
}
return false, setAuthToCredHelper(ch, key, username, password)
}
creds := base64.StdEncoding.EncodeToString([]byte(username + ":" + password))
newCreds := dockerAuthConfig{Auth: creds}
auths.AuthConfigs[registry] = newCreds
auths.AuthConfigs[key] = newCreds
return true, nil
})
// External helpers.
default:
desc = fmt.Sprintf("credential helper: %s", helper)
err = setAuthToCredHelper(helper, registry, username, password)
if isNamespaced {
err = unsupportedNamespaceErr(helper)
} else {
desc = fmt.Sprintf("credential helper: %s", helper)
err = setAuthToCredHelper(helper, key, username, password)
}
}
if err != nil {
multiErr = multierror.Append(multiErr, err)
logrus.Debugf("Error storing credentials for %s in credential helper %s: %v", registry, helper, err)
logrus.Debugf("Error storing credentials for %s in credential helper %s: %v", key, helper, err)
continue
}
logrus.Debugf("Stored credentials for %s in credential helper %s", registry, helper)
logrus.Debugf("Stored credentials for %s in credential helper %s", key, helper)
return desc, nil
}
return "", multiErr
}
func unsupportedNamespaceErr(helper string) error {
return errors.Errorf("namespaced key is not supported for credential helper %s", helper)
}
// SetAuthentication stores the username and password in the credential helper or file
func SetAuthentication(sys *types.SystemContext, registry, username, password string) error {
_, err := SetCredentials(sys, registry, username, password)
// See the documentation of SetCredentials for format of "key"
func SetAuthentication(sys *types.SystemContext, key, username, password string) error {
_, err := SetCredentials(sys, key, username, password)
return err
}
@@ -217,9 +236,8 @@ func getAuthFilePaths(sys *types.SystemContext, homeDir string) []authPath {
// file or .docker/config.json, including support for OAuth2 and IdentityToken.
// If an entry is not found, an empty struct is returned.
//
// Deprecated: GetCredentialsForRef should be used in favor of this API
// because it allows different credentials for different repositories on the
// same registry.
// GetCredentialsForRef should almost always be used in favor of this API to
// allow different credentials for different repositories on the same registry.
func GetCredentials(sys *types.SystemContext, registry string) (types.DockerAuthConfig, error) {
return getCredentialsWithHomeDir(sys, nil, registry, homedir.Get())
}
@@ -326,9 +344,16 @@ func getAuthenticationWithHomeDir(sys *types.SystemContext, registry, homeDir st
return auth.Username, auth.Password, nil
}
// RemoveAuthentication removes credentials for `registry` from all possible
// RemoveAuthentication removes credentials for `key` from all possible
// sources such as credential helpers and auth files.
func RemoveAuthentication(sys *types.SystemContext, registry string) error {
// A valid key can be either a registry hostname or additionally a namespace if
// the AuthenticationFileHelper is being unsed.
func RemoveAuthentication(sys *types.SystemContext, key string) error {
isNamespaced, err := validateKey(key)
if err != nil {
return err
}
helpers, err := sysregistriesv2.CredentialHelpers(sys)
if err != nil {
return err
@@ -338,17 +363,22 @@ func RemoveAuthentication(sys *types.SystemContext, registry string) error {
isLoggedIn := false
removeFromCredHelper := func(helper string) {
err := deleteAuthFromCredHelper(helper, registry)
if err == nil {
logrus.Debugf("Credentials for %q were deleted from credential helper %s", registry, helper)
isLoggedIn = true
if isNamespaced {
logrus.Debugf("Not removing credentials because namespaced keys are not supported for the credential helper: %s", helper)
return
} else {
err := deleteAuthFromCredHelper(helper, key)
if err == nil {
logrus.Debugf("Credentials for %q were deleted from credential helper %s", key, helper)
isLoggedIn = true
return
}
if credentials.IsErrCredentialsNotFoundMessage(err.Error()) {
logrus.Debugf("Not logged in to %s with credential helper %s", key, helper)
return
}
}
if credentials.IsErrCredentialsNotFoundMessage(err.Error()) {
logrus.Debugf("Not logged in to %s with credential helper %s", registry, helper)
return
}
multiErr = multierror.Append(multiErr, errors.Wrapf(err, "removing credentials for %s from credential helper %s", registry, helper))
multiErr = multierror.Append(multiErr, errors.Wrapf(err, "removing credentials for %s from credential helper %s", key, helper))
}
for _, helper := range helpers {
@@ -357,15 +387,12 @@ func RemoveAuthentication(sys *types.SystemContext, registry string) error {
// Special-case the built-in helper for auth files.
case sysregistriesv2.AuthenticationFileHelper:
_, err = modifyJSON(sys, func(auths *dockerConfigFile) (bool, error) {
if innerHelper, exists := auths.CredHelpers[registry]; exists {
if innerHelper, exists := auths.CredHelpers[key]; exists {
removeFromCredHelper(innerHelper)
}
if _, ok := auths.AuthConfigs[registry]; ok {
if _, ok := auths.AuthConfigs[key]; ok {
isLoggedIn = true
delete(auths.AuthConfigs, registry)
} else if _, ok := auths.AuthConfigs[normalizeRegistry(registry)]; ok {
isLoggedIn = true
delete(auths.AuthConfigs, normalizeRegistry(registry))
delete(auths.AuthConfigs, key)
}
return true, multiErr
})
@@ -638,13 +665,10 @@ func findAuthentication(ref reference.Named, registry, path string, legacyFormat
// The docker.io registry still uses the /v1/ key with a special host name,
// so account for that as well.
registry = normalizeRegistry(registry)
normalizedAuths := map[string]dockerAuthConfig{}
for k, v := range auths.AuthConfigs {
normalizedAuths[normalizeRegistry(k)] = v
}
if val, exists := normalizedAuths[registry]; exists {
return decodeDockerAuth(val)
if normalizeAuthFileKey(k, legacyFormat) == registry {
return decodeDockerAuth(v)
}
}
return types.DockerAuthConfig{}, nil
@@ -695,27 +719,36 @@ func decodeDockerAuth(conf dockerAuthConfig) (types.DockerAuthConfig, error) {
}, nil
}
// convertToHostname converts a registry url which has http|https prepended
// to just an hostname.
// Copied from github.com/docker/docker/registry/auth.go
func convertToHostname(url string) string {
stripped := url
if strings.HasPrefix(url, "http://") {
stripped = strings.TrimPrefix(url, "http://")
} else if strings.HasPrefix(url, "https://") {
stripped = strings.TrimPrefix(url, "https://")
// normalizeAuthFileKey takes a key, converts it to a host name and normalizes
// the resulting registry.
func normalizeAuthFileKey(key string, legacyFormat bool) string {
stripped := strings.TrimPrefix(key, "http://")
stripped = strings.TrimPrefix(stripped, "https://")
if legacyFormat || stripped != key {
stripped = strings.SplitN(stripped, "/", 2)[0]
}
nameParts := strings.SplitN(stripped, "/", 2)
return nameParts[0]
return normalizeRegistry(stripped)
}
// normalizeRegistry converts the provided registry if a known docker.io host
// is provided.
func normalizeRegistry(registry string) string {
normalized := convertToHostname(registry)
switch normalized {
switch registry {
case "registry-1.docker.io", "docker.io":
return "index.docker.io"
}
return normalized
return registry
}
// validateKey verifies that the input key does not have a prefix that is not
// allowed and returns an indicator if the key is namespaced.
func validateKey(key string) (isNamespaced bool, err error) {
if strings.HasPrefix(key, "http://") || strings.HasPrefix(key, "https://") {
return isNamespaced, errors.Errorf("key %s contains http[s]:// prefix", key)
}
// check if the provided key contains one or more subpaths.
return strings.ContainsRune(key, '/'), nil
}

View File

@@ -23,7 +23,9 @@ import (
"github.com/containers/image/v5/pkg/blobinfocache/none"
"github.com/containers/image/v5/types"
"github.com/containers/storage"
"github.com/containers/storage/drivers"
"github.com/containers/storage/pkg/archive"
"github.com/containers/storage/pkg/chunked"
"github.com/containers/storage/pkg/ioutils"
digest "github.com/opencontainers/go-digest"
imgspecv1 "github.com/opencontainers/image-spec/specs-go/v1"
@@ -77,12 +79,13 @@ type storageImageDestination struct {
indexToStorageID map[int]*string
// All accesses to below data are protected by `lock` which is made
// *explicit* in the code.
blobDiffIDs map[digest.Digest]digest.Digest // Mapping from layer blobsums to their corresponding DiffIDs
fileSizes map[digest.Digest]int64 // Mapping from layer blobsums to their sizes
filenames map[digest.Digest]string // Mapping from layer blobsums to names of files we used to hold them
currentIndex int // The index of the layer to be committed (i.e., lower indices have already been committed)
indexToPulledLayerInfo map[int]*manifest.LayerInfo // Mapping from layer (by index) to pulled down blob
blobAdditionalLayer map[digest.Digest]storage.AdditionalLayer // Mapping from layer blobsums to their corresponding additional layer
blobDiffIDs map[digest.Digest]digest.Digest // Mapping from layer blobsums to their corresponding DiffIDs
fileSizes map[digest.Digest]int64 // Mapping from layer blobsums to their sizes
filenames map[digest.Digest]string // Mapping from layer blobsums to names of files we used to hold them
currentIndex int // The index of the layer to be committed (i.e., lower indices have already been committed)
indexToPulledLayerInfo map[int]*manifest.LayerInfo // Mapping from layer (by index) to pulled down blob
blobAdditionalLayer map[digest.Digest]storage.AdditionalLayer // Mapping from layer blobsums to their corresponding additional layer
diffOutputs map[digest.Digest]*graphdriver.DriverWithDifferOutput // Mapping from digest to differ output
}
type storageImageCloser struct {
@@ -404,6 +407,7 @@ func newImageDestination(sys *types.SystemContext, imageRef storageReference) (*
SignaturesSizes: make(map[digest.Digest][]int),
indexToStorageID: make(map[int]*string),
indexToPulledLayerInfo: make(map[int]*manifest.LayerInfo),
diffOutputs: make(map[digest.Digest]*graphdriver.DriverWithDifferOutput),
}
return image, nil
}
@@ -419,6 +423,11 @@ func (s *storageImageDestination) Close() error {
for _, al := range s.blobAdditionalLayer {
al.Release()
}
for _, v := range s.diffOutputs {
if v.Target != "" {
_ = s.imageRef.transport.store.CleanupStagingDirectory(v.Target)
}
}
return os.RemoveAll(s.directory)
}
@@ -573,6 +582,61 @@ func (s *storageImageDestination) tryReusingBlobWithSrcRef(ctx context.Context,
return s.tryReusingBlobLocked(ctx, blobinfo, cache, canSubstitute)
}
type zstdFetcher struct {
stream internalTypes.ImageSourceSeekable
ctx context.Context
blobInfo types.BlobInfo
}
// GetBlobAt converts from chunked.GetBlobAt to ImageSourceSeekable.GetBlobAt.
func (f *zstdFetcher) GetBlobAt(chunks []chunked.ImageSourceChunk) (chan io.ReadCloser, chan error, error) {
var newChunks []internalTypes.ImageSourceChunk
for _, v := range chunks {
i := internalTypes.ImageSourceChunk{
Offset: v.Offset,
Length: v.Length,
}
newChunks = append(newChunks, i)
}
rc, errs, err := f.stream.GetBlobAt(f.ctx, f.blobInfo, newChunks)
if _, ok := err.(internalTypes.BadPartialRequestError); ok {
err = chunked.ErrBadRequest{}
}
return rc, errs, err
}
// PutBlobPartial attempts to create a blob using the data that is already present at the destination storage. stream is accessed
// in a non-sequential way to retrieve the missing chunks.
func (s *storageImageDestination) PutBlobPartial(ctx context.Context, stream internalTypes.ImageSourceSeekable, srcInfo types.BlobInfo, cache types.BlobInfoCache) (types.BlobInfo, error) {
fetcher := zstdFetcher{
stream: stream,
ctx: ctx,
blobInfo: srcInfo,
}
differ, err := chunked.GetDiffer(ctx, s.imageRef.transport.store, srcInfo.Size, srcInfo.Annotations, &fetcher)
if err != nil {
return srcInfo, err
}
out, err := s.imageRef.transport.store.ApplyDiffWithDiffer("", nil, differ)
if err != nil {
return srcInfo, err
}
blobDigest := srcInfo.Digest
s.lock.Lock()
s.blobDiffIDs[blobDigest] = blobDigest
s.fileSizes[blobDigest] = 0
s.filenames[blobDigest] = ""
s.diffOutputs[blobDigest] = out
s.lock.Unlock()
return srcInfo, nil
}
// TryReusingBlob checks whether the transport already contains, or can efficiently reuse, a blob, and if so, applies it to the current destination
// (e.g. if the blob is a filesystem layer, this signifies that the changes it describes need to be applied again when composing a filesystem tree).
// info.Digest must not be empty.
@@ -844,6 +908,27 @@ func (s *storageImageDestination) commitLayer(ctx context.Context, blob manifest
return nil
}
s.lock.Lock()
diffOutput, ok := s.diffOutputs[blob.Digest]
s.lock.Unlock()
if ok {
layer, err := s.imageRef.transport.store.CreateLayer(id, lastLayer, nil, "", false, nil)
if err != nil {
return err
}
// FIXME: what to do with the uncompressed digest?
diffOutput.UncompressedDigest = blob.Digest
if err := s.imageRef.transport.store.ApplyDiffFromStagingDirectory(layer.ID, diffOutput.Target, diffOutput, nil); err != nil {
_ = s.imageRef.transport.store.Delete(layer.ID)
return err
}
s.indexToStorageID[index] = &layer.ID
return nil
}
s.lock.Lock()
al, ok := s.blobAdditionalLayer[blob.Digest]
s.lock.Unlock()
@@ -969,6 +1054,7 @@ func (s *storageImageDestination) Commit(ctx context.Context, unparsedToplevel t
return errors.Wrapf(err, "parsing manifest")
}
layerBlobs := man.LayerInfos()
// Extract, commit, or find the layers.
for i, blob := range layerBlobs {
if err := s.commitLayer(ctx, blob, i); err != nil {

View File

@@ -636,6 +636,8 @@ type SystemContext struct {
// === dir.Transport overrides ===
// DirForceCompress compresses the image layers if set to true
DirForceCompress bool
// DirForceDecompress decompresses the image layers if set to true
DirForceDecompress bool
// CompressionFormat is the format to use for the compression of the blobs
CompressionFormat *compression.Algorithm

View File

@@ -6,9 +6,9 @@ const (
// VersionMajor is for an API incompatible changes
VersionMajor = 5
// VersionMinor is for functionality in a backwards-compatible manner
VersionMinor = 13
VersionMinor = 14
// VersionPatch is for backwards-compatible bug fixes
VersionPatch = 3
VersionPatch = 1
// VersionDev indicates development branch. Releases will be empty string.
VersionDev = "-dev"

View File

@@ -0,0 +1,169 @@
package chunked
import (
"bytes"
"encoding/binary"
"fmt"
"io"
"github.com/containers/storage/pkg/chunked/compressor"
"github.com/containers/storage/pkg/chunked/internal"
"github.com/klauspost/compress/zstd"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"github.com/vbatts/tar-split/archive/tar"
)
const (
TypeReg = internal.TypeReg
TypeChunk = internal.TypeChunk
TypeLink = internal.TypeLink
TypeChar = internal.TypeChar
TypeBlock = internal.TypeBlock
TypeDir = internal.TypeDir
TypeFifo = internal.TypeFifo
TypeSymlink = internal.TypeSymlink
)
var typesToTar = map[string]byte{
TypeReg: tar.TypeReg,
TypeLink: tar.TypeLink,
TypeChar: tar.TypeChar,
TypeBlock: tar.TypeBlock,
TypeDir: tar.TypeDir,
TypeFifo: tar.TypeFifo,
TypeSymlink: tar.TypeSymlink,
}
func typeToTarType(t string) (byte, error) {
r, found := typesToTar[t]
if !found {
return 0, fmt.Errorf("unknown type: %v", t)
}
return r, nil
}
func isZstdChunkedFrameMagic(data []byte) bool {
if len(data) < 8 {
return false
}
return bytes.Equal(internal.ZstdChunkedFrameMagic, data[:8])
}
// readZstdChunkedManifest reads the zstd:chunked manifest from the seekable stream blobStream. The blob total size must
// be specified.
// This function uses the io.containers.zstd-chunked. annotations when specified.
func readZstdChunkedManifest(blobStream ImageSourceSeekable, blobSize int64, annotations map[string]string) ([]byte, error) {
footerSize := int64(internal.FooterSizeSupported)
if blobSize <= footerSize {
return nil, errors.New("blob too small")
}
manifestChecksumAnnotation := annotations[internal.ManifestChecksumKey]
if manifestChecksumAnnotation == "" {
return nil, fmt.Errorf("manifest checksum annotation %q not found", internal.ManifestChecksumKey)
}
var offset, length, lengthUncompressed, manifestType uint64
if offsetMetadata := annotations[internal.ManifestInfoKey]; offsetMetadata != "" {
if _, err := fmt.Sscanf(offsetMetadata, "%d:%d:%d:%d", &offset, &length, &lengthUncompressed, &manifestType); err != nil {
return nil, err
}
} else {
chunk := ImageSourceChunk{
Offset: uint64(blobSize - footerSize),
Length: uint64(footerSize),
}
parts, errs, err := blobStream.GetBlobAt([]ImageSourceChunk{chunk})
if err != nil {
return nil, err
}
var reader io.ReadCloser
select {
case r := <-parts:
reader = r
case err := <-errs:
return nil, err
}
footer := make([]byte, footerSize)
if _, err := io.ReadFull(reader, footer); err != nil {
return nil, err
}
offset = binary.LittleEndian.Uint64(footer[0:8])
length = binary.LittleEndian.Uint64(footer[8:16])
lengthUncompressed = binary.LittleEndian.Uint64(footer[16:24])
manifestType = binary.LittleEndian.Uint64(footer[24:32])
if !isZstdChunkedFrameMagic(footer[32:40]) {
return nil, errors.New("invalid magic number")
}
}
if manifestType != internal.ManifestTypeCRFS {
return nil, errors.New("invalid manifest type")
}
// set a reasonable limit
if length > (1<<20)*50 {
return nil, errors.New("manifest too big")
}
if lengthUncompressed > (1<<20)*50 {
return nil, errors.New("manifest too big")
}
chunk := ImageSourceChunk{
Offset: offset,
Length: length,
}
parts, errs, err := blobStream.GetBlobAt([]ImageSourceChunk{chunk})
if err != nil {
return nil, err
}
var reader io.ReadCloser
select {
case r := <-parts:
reader = r
case err := <-errs:
return nil, err
}
manifest := make([]byte, length)
if _, err := io.ReadFull(reader, manifest); err != nil {
return nil, err
}
manifestDigester := digest.Canonical.Digester()
manifestChecksum := manifestDigester.Hash()
if _, err := manifestChecksum.Write(manifest); err != nil {
return nil, err
}
d, err := digest.Parse(manifestChecksumAnnotation)
if err != nil {
return nil, err
}
if manifestDigester.Digest() != d {
return nil, errors.New("invalid manifest checksum")
}
decoder, err := zstd.NewReader(nil)
if err != nil {
return nil, err
}
defer decoder.Close()
b := make([]byte, 0, lengthUncompressed)
if decoded, err := decoder.DecodeAll(manifest, b); err == nil {
return decoded, nil
}
return manifest, nil
}
// ZstdCompressor is a CompressorFunc for the zstd compression algorithm.
// Deprecated: Use pkg/chunked/compressor.ZstdCompressor.
func ZstdCompressor(r io.Writer, metadata map[string]string, level *int) (io.WriteCloser, error) {
return compressor.ZstdCompressor(r, metadata, level)
}

View File

@@ -0,0 +1,220 @@
package compressor
// NOTE: This is used from github.com/containers/image by callers that
// don't otherwise use containers/storage, so don't make this depend on any
// larger software like the graph drivers.
import (
"encoding/base64"
"io"
"io/ioutil"
"github.com/containers/storage/pkg/chunked/internal"
"github.com/containers/storage/pkg/ioutils"
"github.com/opencontainers/go-digest"
"github.com/vbatts/tar-split/archive/tar"
)
func writeZstdChunkedStream(destFile io.Writer, outMetadata map[string]string, reader io.Reader, level int) error {
// total written so far. Used to retrieve partial offsets in the file
dest := ioutils.NewWriteCounter(destFile)
tr := tar.NewReader(reader)
tr.RawAccounting = true
buf := make([]byte, 4096)
zstdWriter, err := internal.ZstdWriterWithLevel(dest, level)
if err != nil {
return err
}
defer func() {
if zstdWriter != nil {
zstdWriter.Close()
zstdWriter.Flush()
}
}()
restartCompression := func() (int64, error) {
var offset int64
if zstdWriter != nil {
if err := zstdWriter.Close(); err != nil {
return 0, err
}
if err := zstdWriter.Flush(); err != nil {
return 0, err
}
offset = dest.Count
zstdWriter.Reset(dest)
}
return offset, nil
}
var metadata []internal.ZstdFileMetadata
for {
hdr, err := tr.Next()
if err != nil {
if err == io.EOF {
break
}
return err
}
rawBytes := tr.RawBytes()
if _, err := zstdWriter.Write(rawBytes); err != nil {
return err
}
payloadDigester := digest.Canonical.Digester()
payloadChecksum := payloadDigester.Hash()
payloadDest := io.MultiWriter(payloadChecksum, zstdWriter)
// Now handle the payload, if any
var startOffset, endOffset int64
checksum := ""
for {
read, errRead := tr.Read(buf)
if errRead != nil && errRead != io.EOF {
return err
}
// restart the compression only if there is
// a payload.
if read > 0 {
if startOffset == 0 {
startOffset, err = restartCompression()
if err != nil {
return err
}
}
_, err := payloadDest.Write(buf[:read])
if err != nil {
return err
}
}
if errRead == io.EOF {
if startOffset > 0 {
endOffset, err = restartCompression()
if err != nil {
return err
}
checksum = payloadDigester.Digest().String()
}
break
}
}
typ, err := internal.GetType(hdr.Typeflag)
if err != nil {
return err
}
xattrs := make(map[string]string)
for k, v := range hdr.Xattrs {
xattrs[k] = base64.StdEncoding.EncodeToString([]byte(v))
}
m := internal.ZstdFileMetadata{
Type: typ,
Name: hdr.Name,
Linkname: hdr.Linkname,
Mode: hdr.Mode,
Size: hdr.Size,
UID: hdr.Uid,
GID: hdr.Gid,
ModTime: hdr.ModTime,
AccessTime: hdr.AccessTime,
ChangeTime: hdr.ChangeTime,
Devmajor: hdr.Devmajor,
Devminor: hdr.Devminor,
Xattrs: xattrs,
Digest: checksum,
Offset: startOffset,
EndOffset: endOffset,
// ChunkSize is 0 for the last chunk
ChunkSize: 0,
ChunkOffset: 0,
ChunkDigest: checksum,
}
metadata = append(metadata, m)
}
rawBytes := tr.RawBytes()
if _, err := zstdWriter.Write(rawBytes); err != nil {
return err
}
if err := zstdWriter.Flush(); err != nil {
return err
}
if err := zstdWriter.Close(); err != nil {
return err
}
zstdWriter = nil
return internal.WriteZstdChunkedManifest(dest, outMetadata, uint64(dest.Count), metadata, level)
}
type zstdChunkedWriter struct {
tarSplitOut *io.PipeWriter
tarSplitErr chan error
}
func (w zstdChunkedWriter) Close() error {
err := <-w.tarSplitErr
if err != nil {
w.tarSplitOut.Close()
return err
}
return w.tarSplitOut.Close()
}
func (w zstdChunkedWriter) Write(p []byte) (int, error) {
select {
case err := <-w.tarSplitErr:
w.tarSplitOut.Close()
return 0, err
default:
return w.tarSplitOut.Write(p)
}
}
// zstdChunkedWriterWithLevel writes a zstd compressed tarball where each file is
// compressed separately so it can be addressed separately. Idea based on CRFS:
// https://github.com/google/crfs
// The difference with CRFS is that the zstd compression is used instead of gzip.
// The reason for it is that zstd supports embedding metadata ignored by the decoder
// as part of the compressed stream.
// A manifest json file with all the metadata is appended at the end of the tarball
// stream, using zstd skippable frames.
// The final file will look like:
// [FILE_1][FILE_2]..[FILE_N][SKIPPABLE FRAME 1][SKIPPABLE FRAME 2]
// Where:
// [FILE_N]: [ZSTD HEADER][TAR HEADER][PAYLOAD FILE_N][ZSTD FOOTER]
// [SKIPPABLE FRAME 1]: [ZSTD SKIPPABLE FRAME, SIZE=MANIFEST LENGTH][MANIFEST]
// [SKIPPABLE FRAME 2]: [ZSTD SKIPPABLE FRAME, SIZE=16][MANIFEST_OFFSET][MANIFEST_LENGTH][MANIFEST_LENGTH_UNCOMPRESSED][MANIFEST_TYPE][CHUNKED_ZSTD_MAGIC_NUMBER]
// MANIFEST_OFFSET, MANIFEST_LENGTH, MANIFEST_LENGTH_UNCOMPRESSED and CHUNKED_ZSTD_MAGIC_NUMBER are 64 bits unsigned in little endian format.
func zstdChunkedWriterWithLevel(out io.Writer, metadata map[string]string, level int) (io.WriteCloser, error) {
ch := make(chan error, 1)
r, w := io.Pipe()
go func() {
ch <- writeZstdChunkedStream(out, metadata, r, level)
io.Copy(ioutil.Discard, r)
r.Close()
close(ch)
}()
return zstdChunkedWriter{
tarSplitOut: w,
tarSplitErr: ch,
}, nil
}
// ZstdCompressor is a CompressorFunc for the zstd compression algorithm.
func ZstdCompressor(r io.Writer, metadata map[string]string, level *int) (io.WriteCloser, error) {
if level == nil {
l := 3
level = &l
}
return zstdChunkedWriterWithLevel(r, metadata, *level)
}

View File

@@ -0,0 +1,172 @@
package internal
// NOTE: This is used from github.com/containers/image by callers that
// don't otherwise use containers/storage, so don't make this depend on any
// larger software like the graph drivers.
import (
"archive/tar"
"bytes"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"time"
"github.com/klauspost/compress/zstd"
"github.com/opencontainers/go-digest"
)
type ZstdTOC struct {
Version int `json:"version"`
Entries []ZstdFileMetadata `json:"entries"`
}
type ZstdFileMetadata struct {
Type string `json:"type"`
Name string `json:"name"`
Linkname string `json:"linkName,omitempty"`
Mode int64 `json:"mode,omitempty"`
Size int64 `json:"size"`
UID int `json:"uid"`
GID int `json:"gid"`
ModTime time.Time `json:"modtime"`
AccessTime time.Time `json:"accesstime"`
ChangeTime time.Time `json:"changetime"`
Devmajor int64 `json:"devMajor"`
Devminor int64 `json:"devMinor"`
Xattrs map[string]string `json:"xattrs,omitempty"`
Digest string `json:"digest,omitempty"`
Offset int64 `json:"offset,omitempty"`
EndOffset int64 `json:"endOffset,omitempty"`
// Currently chunking is not supported.
ChunkSize int64 `json:"chunkSize,omitempty"`
ChunkOffset int64 `json:"chunkOffset,omitempty"`
ChunkDigest string `json:"chunkDigest,omitempty"`
}
const (
TypeReg = "reg"
TypeChunk = "chunk"
TypeLink = "hardlink"
TypeChar = "char"
TypeBlock = "block"
TypeDir = "dir"
TypeFifo = "fifo"
TypeSymlink = "symlink"
)
var TarTypes = map[byte]string{
tar.TypeReg: TypeReg,
tar.TypeRegA: TypeReg,
tar.TypeLink: TypeLink,
tar.TypeChar: TypeChar,
tar.TypeBlock: TypeBlock,
tar.TypeDir: TypeDir,
tar.TypeFifo: TypeFifo,
tar.TypeSymlink: TypeSymlink,
}
func GetType(t byte) (string, error) {
r, found := TarTypes[t]
if !found {
return "", fmt.Errorf("unknown tarball type: %v", t)
}
return r, nil
}
const (
ManifestChecksumKey = "io.containers.zstd-chunked.manifest-checksum"
ManifestInfoKey = "io.containers.zstd-chunked.manifest-position"
// ManifestTypeCRFS is a manifest file compatible with the CRFS TOC file.
ManifestTypeCRFS = 1
// FooterSizeSupported is the footer size supported by this implementation.
// Newer versions of the image format might increase this value, so reject
// any version that is not supported.
FooterSizeSupported = 40
)
var (
// when the zstd decoder encounters a skippable frame + 1 byte for the size, it
// will ignore it.
// https://tools.ietf.org/html/rfc8478#section-3.1.2
skippableFrameMagic = []byte{0x50, 0x2a, 0x4d, 0x18}
ZstdChunkedFrameMagic = []byte{0x47, 0x6e, 0x55, 0x6c, 0x49, 0x6e, 0x55, 0x78}
)
func appendZstdSkippableFrame(dest io.Writer, data []byte) error {
if _, err := dest.Write(skippableFrameMagic); err != nil {
return err
}
var size []byte = make([]byte, 4)
binary.LittleEndian.PutUint32(size, uint32(len(data)))
if _, err := dest.Write(size); err != nil {
return err
}
if _, err := dest.Write(data); err != nil {
return err
}
return nil
}
func WriteZstdChunkedManifest(dest io.Writer, outMetadata map[string]string, offset uint64, metadata []ZstdFileMetadata, level int) error {
// 8 is the size of the zstd skippable frame header + the frame size
manifestOffset := offset + 8
toc := ZstdTOC{
Version: 1,
Entries: metadata,
}
// Generate the manifest
manifest, err := json.Marshal(toc)
if err != nil {
return err
}
var compressedBuffer bytes.Buffer
zstdWriter, err := ZstdWriterWithLevel(&compressedBuffer, level)
if err != nil {
return err
}
if _, err := zstdWriter.Write(manifest); err != nil {
zstdWriter.Close()
return err
}
if err := zstdWriter.Close(); err != nil {
return err
}
compressedManifest := compressedBuffer.Bytes()
manifestDigester := digest.Canonical.Digester()
manifestChecksum := manifestDigester.Hash()
if _, err := manifestChecksum.Write(compressedManifest); err != nil {
return err
}
outMetadata[ManifestChecksumKey] = manifestDigester.Digest().String()
outMetadata[ManifestInfoKey] = fmt.Sprintf("%d:%d:%d:%d", manifestOffset, len(compressedManifest), len(manifest), ManifestTypeCRFS)
if err := appendZstdSkippableFrame(dest, compressedManifest); err != nil {
return err
}
// Store the offset to the manifest and its size in LE order
var manifestDataLE []byte = make([]byte, FooterSizeSupported)
binary.LittleEndian.PutUint64(manifestDataLE, manifestOffset)
binary.LittleEndian.PutUint64(manifestDataLE[8:], uint64(len(compressedManifest)))
binary.LittleEndian.PutUint64(manifestDataLE[16:], uint64(len(manifest)))
binary.LittleEndian.PutUint64(manifestDataLE[24:], uint64(ManifestTypeCRFS))
copy(manifestDataLE[32:], ZstdChunkedFrameMagic)
return appendZstdSkippableFrame(dest, manifestDataLE)
}
func ZstdWriterWithLevel(dest io.Writer, level int) (*zstd.Encoder, error) {
el := zstd.EncoderLevelFromZstd(level)
return zstd.NewWriter(dest, zstd.WithEncoderLevel(el))
}

View File

@@ -0,0 +1,26 @@
package chunked
import (
"fmt"
"io"
)
// ImageSourceChunk is a portion of a blob.
type ImageSourceChunk struct {
Offset uint64
Length uint64
}
// ImageSourceSeekable is an image source that permits to fetch chunks of the entire blob.
type ImageSourceSeekable interface {
// GetBlobAt returns a stream for the specified blob.
GetBlobAt([]ImageSourceChunk) (chan io.ReadCloser, chan error, error)
}
// ErrBadRequest is returned when the request is not valid
type ErrBadRequest struct {
}
func (e ErrBadRequest) Error() string {
return fmt.Sprintf("bad request")
}

View File

@@ -0,0 +1,875 @@
package chunked
import (
archivetar "archive/tar"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"sort"
"strings"
"syscall"
"time"
storage "github.com/containers/storage"
graphdriver "github.com/containers/storage/drivers"
driversCopy "github.com/containers/storage/drivers/copy"
"github.com/containers/storage/pkg/archive"
"github.com/containers/storage/pkg/chunked/internal"
"github.com/containers/storage/pkg/idtools"
"github.com/containers/storage/types"
"github.com/klauspost/compress/zstd"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/vbatts/tar-split/archive/tar"
"golang.org/x/sys/unix"
)
const (
maxNumberMissingChunks = 1024
newFileFlags = (unix.O_CREAT | unix.O_TRUNC | unix.O_WRONLY | unix.O_EXCL)
containersOverrideXattr = "user.containers.override_stat"
bigDataKey = "zstd-chunked-manifest"
)
type chunkedZstdDiffer struct {
stream ImageSourceSeekable
manifest []byte
layersMetadata map[string][]internal.ZstdFileMetadata
layersTarget map[string]string
}
func timeToTimespec(time time.Time) (ts unix.Timespec) {
if time.IsZero() {
// Return UTIME_OMIT special value
ts.Sec = 0
ts.Nsec = ((1 << 30) - 2)
return
}
return unix.NsecToTimespec(time.UnixNano())
}
func copyFileContent(src, destFile, root string, dirfd int, missingDirsMode, mode os.FileMode) (*os.File, int64, error) {
st, err := os.Stat(src)
if err != nil {
return nil, -1, err
}
copyWithFileRange, copyWithFileClone := true, true
// If the destination file already exists, we shouldn't blow it away
dstFile, err := openFileUnderRoot(destFile, root, dirfd, newFileFlags, mode)
if err != nil {
return nil, -1, err
}
err = driversCopy.CopyRegularToFile(src, dstFile, st, &copyWithFileRange, &copyWithFileClone)
if err != nil {
dstFile.Close()
return nil, -1, err
}
return dstFile, st.Size(), err
}
func prepareOtherLayersCache(layersMetadata map[string][]internal.ZstdFileMetadata) map[string]map[string]*internal.ZstdFileMetadata {
maps := make(map[string]map[string]*internal.ZstdFileMetadata)
for layerID, v := range layersMetadata {
r := make(map[string]*internal.ZstdFileMetadata)
for i := range v {
r[v[i].Digest] = &v[i]
}
maps[layerID] = r
}
return maps
}
func getLayersCache(store storage.Store) (map[string][]internal.ZstdFileMetadata, map[string]string, error) {
allLayers, err := store.Layers()
if err != nil {
return nil, nil, err
}
layersMetadata := make(map[string][]internal.ZstdFileMetadata)
layersTarget := make(map[string]string)
for _, r := range allLayers {
manifestReader, err := store.LayerBigData(r.ID, bigDataKey)
if err != nil {
continue
}
defer manifestReader.Close()
manifest, err := ioutil.ReadAll(manifestReader)
if err != nil {
return nil, nil, err
}
var toc internal.ZstdTOC
if err := json.Unmarshal(manifest, &toc); err != nil {
continue
}
layersMetadata[r.ID] = toc.Entries
target, err := store.DifferTarget(r.ID)
if err != nil {
return nil, nil, err
}
layersTarget[r.ID] = target
}
return layersMetadata, layersTarget, nil
}
// GetDiffer returns a differ than can be used with ApplyDiffWithDiffer.
func GetDiffer(ctx context.Context, store storage.Store, blobSize int64, annotations map[string]string, iss ImageSourceSeekable) (graphdriver.Differ, error) {
if _, ok := annotations[internal.ManifestChecksumKey]; ok {
return makeZstdChunkedDiffer(ctx, store, blobSize, annotations, iss)
}
return nil, errors.New("blob type not supported for partial retrieval")
}
func makeZstdChunkedDiffer(ctx context.Context, store storage.Store, blobSize int64, annotations map[string]string, iss ImageSourceSeekable) (*chunkedZstdDiffer, error) {
manifest, err := readZstdChunkedManifest(iss, blobSize, annotations)
if err != nil {
return nil, err
}
layersMetadata, layersTarget, err := getLayersCache(store)
if err != nil {
return nil, err
}
return &chunkedZstdDiffer{
stream: iss,
manifest: manifest,
layersMetadata: layersMetadata,
layersTarget: layersTarget,
}, nil
}
func findFileInOtherLayers(file internal.ZstdFileMetadata, root string, dirfd int, layersMetadata map[string]map[string]*internal.ZstdFileMetadata, layersTarget map[string]string, missingDirsMode os.FileMode) (*os.File, int64, error) {
// this is ugly, needs to be indexed
for layerID, checksums := range layersMetadata {
m, found := checksums[file.Digest]
if !found {
continue
}
source, ok := layersTarget[layerID]
if !ok {
continue
}
srcDirfd, err := unix.Open(source, unix.O_RDONLY, 0)
if err != nil {
continue
}
defer unix.Close(srcDirfd)
srcFile, err := openFileUnderRoot(m.Name, source, srcDirfd, unix.O_RDONLY, 0)
if err != nil {
continue
}
defer srcFile.Close()
srcPath := fmt.Sprintf("/proc/self/fd/%d", srcFile.Fd())
dstFile, written, err := copyFileContent(srcPath, file.Name, root, dirfd, missingDirsMode, 0)
if err != nil {
continue
}
return dstFile, written, nil
}
return nil, 0, nil
}
func getFileDigest(f *os.File) (digest.Digest, error) {
digester := digest.Canonical.Digester()
if _, err := io.Copy(digester.Hash(), f); err != nil {
return "", err
}
return digester.Digest(), nil
}
// findFileOnTheHost checks whether the requested file already exist on the host and copies the file content from there if possible.
// It is currently implemented to look only at the file with the same path. Ideally it can detect the same content also at different
// paths.
func findFileOnTheHost(file internal.ZstdFileMetadata, root string, dirfd int, missingDirsMode os.FileMode) (*os.File, int64, error) {
sourceFile := filepath.Clean(filepath.Join("/", file.Name))
if !strings.HasPrefix(sourceFile, "/usr/") {
// limit host deduplication to files under /usr.
return nil, 0, nil
}
st, err := os.Stat(sourceFile)
if err != nil || !st.Mode().IsRegular() {
return nil, 0, nil
}
if st.Size() != file.Size {
return nil, 0, nil
}
fd, err := unix.Open(sourceFile, unix.O_RDONLY|unix.O_NONBLOCK, 0)
if err != nil {
return nil, 0, nil
}
f := os.NewFile(uintptr(fd), "fd")
defer f.Close()
manifestChecksum, err := digest.Parse(file.Digest)
if err != nil {
return nil, 0, err
}
checksum, err := getFileDigest(f)
if err != nil {
return nil, 0, err
}
if checksum != manifestChecksum {
return nil, 0, nil
}
dstFile, written, err := copyFileContent(fmt.Sprintf("/proc/self/fd/%d", fd), file.Name, root, dirfd, missingDirsMode, 0)
if err != nil {
return nil, 0, nil
}
// calculate the checksum again to make sure the file wasn't modified while it was copied
if _, err := f.Seek(0, 0); err != nil {
return nil, 0, err
}
checksum, err = getFileDigest(f)
if err != nil {
return nil, 0, err
}
if checksum != manifestChecksum {
return nil, 0, nil
}
return dstFile, written, nil
}
func maybeDoIDRemap(manifest []internal.ZstdFileMetadata, options *archive.TarOptions) error {
if options.ChownOpts == nil && len(options.UIDMaps) == 0 || len(options.GIDMaps) == 0 {
return nil
}
idMappings := idtools.NewIDMappingsFromMaps(options.UIDMaps, options.GIDMaps)
for i := range manifest {
if options.ChownOpts != nil {
manifest[i].UID = options.ChownOpts.UID
manifest[i].GID = options.ChownOpts.GID
} else {
pair := idtools.IDPair{
UID: manifest[i].UID,
GID: manifest[i].GID,
}
var err error
manifest[i].UID, manifest[i].GID, err = idMappings.ToContainer(pair)
if err != nil {
return err
}
}
}
return nil
}
type missingFile struct {
File *internal.ZstdFileMetadata
Gap int64
}
func (m missingFile) Length() int64 {
return m.File.EndOffset - m.File.Offset
}
type missingChunk struct {
RawChunk ImageSourceChunk
Files []missingFile
}
func setFileAttrs(file *os.File, mode os.FileMode, metadata *internal.ZstdFileMetadata, options *archive.TarOptions) error {
if file == nil || file.Fd() < 0 {
return errors.Errorf("invalid file")
}
fd := int(file.Fd())
t, err := typeToTarType(metadata.Type)
if err != nil {
return err
}
if t == tar.TypeSymlink {
return nil
}
if err := unix.Fchown(fd, metadata.UID, metadata.GID); err != nil {
if !options.IgnoreChownErrors {
return err
}
}
for k, v := range metadata.Xattrs {
data, err := base64.StdEncoding.DecodeString(v)
if err != nil {
return err
}
if err := unix.Fsetxattr(fd, k, data, 0); err != nil {
return err
}
}
ts := []unix.Timespec{timeToTimespec(metadata.AccessTime), timeToTimespec(metadata.ModTime)}
if err := unix.UtimesNanoAt(fd, "", ts, 0); err != nil && errors.Is(err, unix.ENOSYS) {
return err
}
if err := unix.Fchmod(fd, uint32(mode)); err != nil {
return err
}
return nil
}
func openFileUnderRoot(name, root string, dirfd int, flags uint64, mode os.FileMode) (*os.File, error) {
how := unix.OpenHow{
Flags: flags,
Mode: uint64(mode & 07777),
Resolve: unix.RESOLVE_IN_ROOT,
}
fd, err := unix.Openat2(dirfd, name, &how)
if err != nil {
return nil, err
}
return os.NewFile(uintptr(fd), name), nil
}
func createFileFromZstdStream(dest string, dirfd int, reader io.Reader, missingDirsMode, mode os.FileMode, metadata *internal.ZstdFileMetadata, options *archive.TarOptions) (err error) {
file, err := openFileUnderRoot(metadata.Name, dest, dirfd, newFileFlags, 0)
if err != nil {
return err
}
defer func() {
err2 := file.Close()
if err == nil {
err = err2
}
}()
z, err := zstd.NewReader(reader)
if err != nil {
return err
}
defer z.Close()
digester := digest.Canonical.Digester()
checksum := digester.Hash()
_, err = z.WriteTo(io.MultiWriter(file, checksum))
if err != nil {
return err
}
manifestChecksum, err := digest.Parse(metadata.Digest)
if err != nil {
return err
}
if digester.Digest() != manifestChecksum {
return fmt.Errorf("checksum mismatch for %q", dest)
}
return setFileAttrs(file, mode, metadata, options)
}
func storeMissingFiles(streams chan io.ReadCloser, errs chan error, dest string, dirfd int, missingChunks []missingChunk, missingDirsMode os.FileMode, options *archive.TarOptions) error {
for mc := 0; ; mc++ {
var part io.ReadCloser
select {
case p := <-streams:
part = p
case err := <-errs:
return err
}
if part == nil {
if mc == len(missingChunks) {
break
}
return errors.Errorf("invalid stream returned %d %d", mc, len(missingChunks))
}
if mc == len(missingChunks) {
return errors.Errorf("too many chunks returned")
}
for _, mf := range missingChunks[mc].Files {
if mf.Gap > 0 {
limitReader := io.LimitReader(part, mf.Gap)
_, err := io.Copy(ioutil.Discard, limitReader)
if err != nil {
return err
}
continue
}
limitReader := io.LimitReader(part, mf.Length())
if err := createFileFromZstdStream(dest, dirfd, limitReader, missingDirsMode, os.FileMode(mf.File.Mode), mf.File, options); err != nil {
part.Close()
return err
}
}
part.Close()
}
return nil
}
func mergeMissingChunks(missingChunks []missingChunk, target int) []missingChunk {
if len(missingChunks) <= target {
return missingChunks
}
getGap := func(missingChunks []missingChunk, i int) int {
prev := missingChunks[i-1].RawChunk.Offset + missingChunks[i-1].RawChunk.Length
return int(missingChunks[i].RawChunk.Offset - prev)
}
// this implementation doesn't account for duplicates, so it could merge
// more than necessary to reach the specified target. Since target itself
// is a heuristic value, it doesn't matter.
var gaps []int
for i := 1; i < len(missingChunks); i++ {
gaps = append(gaps, getGap(missingChunks, i))
}
sort.Ints(gaps)
toShrink := len(missingChunks) - target
targetValue := gaps[toShrink-1]
newMissingChunks := missingChunks[0:1]
for i := 1; i < len(missingChunks); i++ {
gap := getGap(missingChunks, i)
if gap > targetValue {
newMissingChunks = append(newMissingChunks, missingChunks[i])
} else {
prev := &newMissingChunks[len(newMissingChunks)-1]
gapFile := missingFile{
Gap: int64(gap),
}
prev.RawChunk.Length += uint64(gap) + missingChunks[i].RawChunk.Length
prev.Files = append(append(prev.Files, gapFile), missingChunks[i].Files...)
}
}
return newMissingChunks
}
func retrieveMissingFiles(input *chunkedZstdDiffer, dest string, dirfd int, missingChunks []missingChunk, missingDirsMode os.FileMode, options *archive.TarOptions) error {
var chunksToRequest []ImageSourceChunk
for _, c := range missingChunks {
chunksToRequest = append(chunksToRequest, c.RawChunk)
}
// There are some missing files. Prepare a multirange request for the missing chunks.
var streams chan io.ReadCloser
var err error
var errs chan error
for {
streams, errs, err = input.stream.GetBlobAt(chunksToRequest)
if err == nil {
break
}
if _, ok := err.(ErrBadRequest); ok {
requested := len(missingChunks)
// If the server cannot handle at least 64 chunks in a single request, just give up.
if requested < 64 {
return err
}
// Merge more chunks to request
missingChunks = mergeMissingChunks(missingChunks, requested/2)
continue
}
return err
}
if err := storeMissingFiles(streams, errs, dest, dirfd, missingChunks, missingDirsMode, options); err != nil {
return err
}
return nil
}
func safeMkdir(target string, dirfd int, mode os.FileMode, metadata *internal.ZstdFileMetadata, options *archive.TarOptions) error {
parent := filepath.Dir(metadata.Name)
base := filepath.Base(metadata.Name)
parentFd := dirfd
if parent != "." {
parentFile, err := openFileUnderRoot(parent, target, dirfd, unix.O_DIRECTORY|unix.O_PATH|unix.O_RDONLY, 0)
if err != nil {
return err
}
defer parentFile.Close()
parentFd = int(parentFile.Fd())
}
if err := unix.Mkdirat(parentFd, base, uint32(mode)); err != nil {
if !os.IsExist(err) {
return err
}
}
file, err := openFileUnderRoot(metadata.Name, target, dirfd, unix.O_RDONLY, 0)
if err != nil {
return err
}
defer file.Close()
return setFileAttrs(file, mode, metadata, options)
}
func safeLink(target string, dirfd int, mode os.FileMode, metadata *internal.ZstdFileMetadata, options *archive.TarOptions) error {
sourceFile, err := openFileUnderRoot(metadata.Linkname, target, dirfd, unix.O_RDONLY, 0)
if err != nil {
return err
}
defer sourceFile.Close()
destDir, destBase := filepath.Dir(metadata.Name), filepath.Base(metadata.Name)
destDirFd := dirfd
if destDir != "." {
f, err := openFileUnderRoot(destDir, target, dirfd, unix.O_RDONLY, 0)
if err != nil {
return err
}
defer f.Close()
destDirFd = int(f.Fd())
}
err = unix.Linkat(int(sourceFile.Fd()), "", destDirFd, destBase, unix.AT_EMPTY_PATH)
if err != nil {
return err
}
newFile, err := openFileUnderRoot(metadata.Name, target, dirfd, unix.O_WRONLY, 0)
if err != nil {
return err
}
defer newFile.Close()
return setFileAttrs(newFile, mode, metadata, options)
}
func safeSymlink(target string, dirfd int, mode os.FileMode, metadata *internal.ZstdFileMetadata, options *archive.TarOptions) error {
destDir, destBase := filepath.Dir(metadata.Name), filepath.Base(metadata.Name)
destDirFd := dirfd
if destDir != "." {
f, err := openFileUnderRoot(destDir, target, dirfd, unix.O_RDONLY, 0)
if err != nil {
return err
}
defer f.Close()
destDirFd = int(f.Fd())
}
return unix.Symlinkat(metadata.Linkname, destDirFd, destBase)
}
type whiteoutHandler struct {
Dirfd int
Root string
}
func (d whiteoutHandler) Setxattr(path, name string, value []byte) error {
file, err := openFileUnderRoot(path, d.Root, d.Dirfd, unix.O_RDONLY, 0)
if err != nil {
return err
}
defer file.Close()
return unix.Fsetxattr(int(file.Fd()), name, value, 0)
}
func (d whiteoutHandler) Mknod(path string, mode uint32, dev int) error {
dir := filepath.Dir(path)
base := filepath.Base(path)
dirfd := d.Dirfd
if dir != "" {
dir, err := openFileUnderRoot(dir, d.Root, d.Dirfd, unix.O_RDONLY, 0)
if err != nil {
return err
}
defer dir.Close()
dirfd = int(dir.Fd())
}
return unix.Mknodat(dirfd, base, mode, dev)
}
func checkChownErr(err error, name string, uid, gid int) error {
if errors.Is(err, syscall.EINVAL) {
return errors.Wrapf(err, "potentially insufficient UIDs or GIDs available in user namespace (requested %d:%d for %s): Check /etc/subuid and /etc/subgid", uid, gid, name)
}
return err
}
func (d whiteoutHandler) Chown(path string, uid, gid int) error {
file, err := openFileUnderRoot(path, d.Root, d.Dirfd, unix.O_PATH, 0)
if err != nil {
return err
}
defer file.Close()
if err := unix.Fchownat(int(file.Fd()), "", uid, gid, unix.AT_EMPTY_PATH); err != nil {
var stat unix.Stat_t
if unix.Fstat(int(file.Fd()), &stat) == nil {
if stat.Uid == uint32(uid) && stat.Gid == uint32(gid) {
return nil
}
}
return checkChownErr(err, path, uid, gid)
}
return nil
}
type hardLinkToCreate struct {
dest string
dirfd int
mode os.FileMode
metadata *internal.ZstdFileMetadata
}
func (d *chunkedZstdDiffer) ApplyDiff(dest string, options *archive.TarOptions) (graphdriver.DriverWithDifferOutput, error) {
bigData := map[string][]byte{
bigDataKey: d.manifest,
}
output := graphdriver.DriverWithDifferOutput{
Differ: d,
BigData: bigData,
}
storeOpts, err := types.DefaultStoreOptionsAutoDetectUID()
if err != nil {
return output, err
}
enableHostDedup := false
if value := storeOpts.PullOptions["enable_host_deduplication"]; strings.ToLower(value) == "true" {
enableHostDedup = true
}
// Generate the manifest
var toc internal.ZstdTOC
if err := json.Unmarshal(d.manifest, &toc); err != nil {
return output, err
}
whiteoutConverter := archive.GetWhiteoutConverter(options.WhiteoutFormat, options.WhiteoutData)
var missingChunks []missingChunk
var mergedEntries []internal.ZstdFileMetadata
if err := maybeDoIDRemap(toc.Entries, options); err != nil {
return output, err
}
for _, e := range toc.Entries {
if e.Type == TypeChunk {
l := len(mergedEntries)
if l == 0 || mergedEntries[l-1].Type != TypeReg {
return output, errors.New("chunk type without a regular file")
}
mergedEntries[l-1].EndOffset = e.EndOffset
continue
}
mergedEntries = append(mergedEntries, e)
}
if options.ForceMask != nil {
uid, gid, mode, err := archive.GetFileOwner(dest)
if err == nil {
value := fmt.Sprintf("%d:%d:0%o", uid, gid, mode)
if err := unix.Setxattr(dest, containersOverrideXattr, []byte(value), 0); err != nil {
return output, err
}
}
}
dirfd, err := unix.Open(dest, unix.O_RDONLY|unix.O_PATH, 0)
if err != nil {
return output, err
}
defer unix.Close(dirfd)
otherLayersCache := prepareOtherLayersCache(d.layersMetadata)
missingDirsMode := os.FileMode(0700)
if options.ForceMask != nil {
missingDirsMode = *options.ForceMask
}
// hardlinks can point to missing files. So create them after all files
// are retrieved
var hardLinks []hardLinkToCreate
missingChunksSize, totalChunksSize := int64(0), int64(0)
for i, r := range mergedEntries {
if options.ForceMask != nil {
value := fmt.Sprintf("%d:%d:0%o", r.UID, r.GID, r.Mode&07777)
r.Xattrs[containersOverrideXattr] = base64.StdEncoding.EncodeToString([]byte(value))
r.Mode = int64(*options.ForceMask)
}
mode := os.FileMode(r.Mode)
r.Name = filepath.Clean(r.Name)
r.Linkname = filepath.Clean(r.Linkname)
t, err := typeToTarType(r.Type)
if err != nil {
return output, err
}
if whiteoutConverter != nil {
hdr := archivetar.Header{
Typeflag: t,
Name: r.Name,
Linkname: r.Linkname,
Size: r.Size,
Mode: r.Mode,
Uid: r.UID,
Gid: r.GID,
}
handler := whiteoutHandler{
Dirfd: dirfd,
Root: dest,
}
writeFile, err := whiteoutConverter.ConvertReadWithHandler(&hdr, r.Name, &handler)
if err != nil {
return output, err
}
if !writeFile {
continue
}
}
switch t {
case tar.TypeReg:
// Create directly empty files.
if r.Size == 0 {
// Used to have a scope for cleanup.
createEmptyFile := func() error {
file, err := openFileUnderRoot(r.Name, dest, dirfd, newFileFlags, 0)
if err != nil {
return err
}
defer file.Close()
if err := setFileAttrs(file, mode, &r, options); err != nil {
return err
}
return nil
}
if err := createEmptyFile(); err != nil {
return output, err
}
continue
}
case tar.TypeDir:
if err := safeMkdir(dest, dirfd, mode, &r, options); err != nil {
return output, err
}
continue
case tar.TypeLink:
dest := dest
dirfd := dirfd
mode := mode
r := r
hardLinks = append(hardLinks, hardLinkToCreate{
dest: dest,
dirfd: dirfd,
mode: mode,
metadata: &r,
})
continue
case tar.TypeSymlink:
if err := safeSymlink(dest, dirfd, mode, &r, options); err != nil {
return output, err
}
continue
case tar.TypeChar:
case tar.TypeBlock:
case tar.TypeFifo:
/* Ignore. */
default:
return output, fmt.Errorf("invalid type %q", t)
}
totalChunksSize += r.Size
dstFile, _, err := findFileInOtherLayers(r, dest, dirfd, otherLayersCache, d.layersTarget, missingDirsMode)
if err != nil {
return output, err
}
if dstFile != nil {
if err := setFileAttrs(dstFile, mode, &r, options); err != nil {
dstFile.Close()
return output, err
}
dstFile.Close()
continue
}
if enableHostDedup {
dstFile, _, err = findFileOnTheHost(r, dest, dirfd, missingDirsMode)
if err != nil {
return output, err
}
if dstFile != nil {
if err := setFileAttrs(dstFile, mode, &r, options); err != nil {
dstFile.Close()
return output, err
}
dstFile.Close()
continue
}
}
missingChunksSize += r.Size
if t == tar.TypeReg {
rawChunk := ImageSourceChunk{
Offset: uint64(r.Offset),
Length: uint64(r.EndOffset - r.Offset),
}
file := missingFile{
File: &toc.Entries[i],
}
missingChunks = append(missingChunks, missingChunk{
RawChunk: rawChunk,
Files: []missingFile{
file,
},
})
}
}
// There are some missing files. Prepare a multirange request for the missing chunks.
if len(missingChunks) > 0 {
missingChunks = mergeMissingChunks(missingChunks, maxNumberMissingChunks)
if err := retrieveMissingFiles(d, dest, dirfd, missingChunks, missingDirsMode, options); err != nil {
return output, err
}
}
for _, m := range hardLinks {
if err := safeLink(m.dest, m.dirfd, m.mode, m.metadata, options); err != nil {
return output, err
}
}
if totalChunksSize > 0 {
logrus.Debugf("Missing %d bytes out of %d (%.2f %%)", missingChunksSize, totalChunksSize, float32(missingChunksSize*100.0)/float32(totalChunksSize))
}
return output, nil
}

View File

@@ -0,0 +1,16 @@
// +build !linux
package chunked
import (
"context"
storage "github.com/containers/storage"
graphdriver "github.com/containers/storage/drivers"
"github.com/pkg/errors"
)
// GetDiffer returns a differ than can be used with ApplyDiffWithDiffer.
func GetDiffer(ctx context.Context, store storage.Store, blobSize int64, annotations map[string]string, iss ImageSourceSeekable) (graphdriver.Differ, error) {
return nil, errors.New("format not supported on this architecture")
}