🔧 Update modules

This commit is contained in:
Ettore Di Giacinto
2021-12-28 18:56:13 +01:00
parent 196cdc5cfc
commit 96aaf5235b
355 changed files with 10547 additions and 4907 deletions

View File

@@ -91,9 +91,10 @@ func Catalog(ctx context.Context, target name.Registry, options ...Option) ([]st
Scheme: target.Scheme(),
Host: target.RegistryStr(),
Path: "/v2/_catalog",
// ECR returns an error if n > 1000:
// https://github.com/google/go-containerregistry/issues/1091
RawQuery: "n=1000",
}
if o.pageSize > 0 {
uri.RawQuery = fmt.Sprintf("n=%d", o.pageSize)
}
client := http.Client{Transport: tr}

View File

@@ -20,13 +20,13 @@ import (
func CheckPushPermission(ref name.Reference, kc authn.Keychain, t http.RoundTripper) error {
auth, err := kc.Resolve(ref.Context().Registry)
if err != nil {
return fmt.Errorf("resolving authorization for %v failed: %v", ref.Context().Registry, err)
return fmt.Errorf("resolving authorization for %v failed: %w", ref.Context().Registry, err)
}
scopes := []string{ref.Scope(transport.PushScope)}
tr, err := transport.New(ref.Context().Registry, auth, t, scopes)
if err != nil {
return fmt.Errorf("creating push check transport for %v failed: %v", ref.Context().Registry, err)
return fmt.Errorf("creating push check transport for %v failed: %w", ref.Context().Registry, err)
}
// TODO(jasonhall): Against GCR, just doing the token handshake is
// enough, but this doesn't extend to Dockerhub

View File

@@ -147,6 +147,40 @@ func (r *remoteIndex) Layer(h v1.Hash) (v1.Layer, error) {
return nil, fmt.Errorf("layer not found: %s", h)
}
// Experiment with a better API for v1.ImageIndex. We might want to move this
// to partial?
func (r *remoteIndex) Manifests() ([]partial.Describable, error) {
m, err := r.IndexManifest()
if err != nil {
return nil, err
}
manifests := []partial.Describable{}
for _, desc := range m.Manifests {
switch {
case desc.MediaType.IsImage():
img, err := r.Image(desc.Digest)
if err != nil {
return nil, err
}
manifests = append(manifests, img)
case desc.MediaType.IsIndex():
idx, err := r.ImageIndex(desc.Digest)
if err != nil {
return nil, err
}
manifests = append(manifests, idx)
default:
layer, err := r.Layer(desc.Digest)
if err != nil {
return nil, err
}
manifests = append(manifests, layer)
}
}
return manifests, nil
}
func (r *remoteIndex) imageByPlatform(platform v1.Platform) (v1.Image, error) {
desc, err := r.childByPlatform(platform)
if err != nil {
@@ -180,7 +214,7 @@ func (r *remoteIndex) childByPlatform(platform v1.Platform) (*Descriptor, error)
return r.childDescriptor(childDesc, platform)
}
}
return nil, fmt.Errorf("no child with platform %s/%s in index %s", platform.OS, platform.Architecture, r.Ref)
return nil, fmt.Errorf("no child with platform %+v in index %s", platform, r.Ref)
}
func (r *remoteIndex) childByHash(h v1.Hash) (*Descriptor, error) {

View File

@@ -55,9 +55,10 @@ func List(repo name.Repository, options ...Option) ([]string, error) {
Scheme: repo.Registry.Scheme(),
Host: repo.Registry.RegistryStr(),
Path: fmt.Sprintf("/v2/%s/tags/list", repo.RepositoryStr()),
// ECR returns an error if n > 1000:
// https://github.com/google/go-containerregistry/issues/681
RawQuery: "n=1000",
}
if o.pageSize > 0 {
uri.RawQuery = fmt.Sprintf("n=%d", o.pageSize)
}
client := http.Client{Transport: tr}

View File

@@ -15,6 +15,7 @@
package remote
import (
"context"
"fmt"
"net/http"
@@ -91,12 +92,14 @@ func MultiWrite(m map[name.Reference]Taggable, options ...Option) (rerr error) {
context: o.context,
updates: o.updates,
lastUpdate: &v1.Update{},
backoff: o.retryBackoff,
predicate: o.retryPredicate,
}
// Collect the total size of blobs and manifests we're about to write.
if o.updates != nil {
defer close(o.updates)
defer func() { sendError(o.updates, rerr) }()
defer func() { _ = sendError(o.updates, rerr) }()
for _, b := range blobs {
size, err := b.Size()
if err != nil {
@@ -133,12 +136,13 @@ func MultiWrite(m map[name.Reference]Taggable, options ...Option) (rerr error) {
// Upload individual blobs and collect any errors.
blobChan := make(chan v1.Layer, 2*o.jobs)
g, ctx := errgroup.WithContext(o.context)
ctx := o.context
g, gctx := errgroup.WithContext(o.context)
for i := 0; i < o.jobs; i++ {
// Start N workers consuming blobs to upload.
g.Go(func() error {
for b := range blobChan {
if err := w.uploadOne(b); err != nil {
if err := w.uploadOne(gctx, b); err != nil {
return err
}
}
@@ -150,8 +154,8 @@ func MultiWrite(m map[name.Reference]Taggable, options ...Option) (rerr error) {
for _, b := range blobs {
select {
case blobChan <- b:
case <-ctx.Done():
return ctx.Err()
case <-gctx.Done():
return gctx.Err()
}
}
return nil
@@ -160,7 +164,8 @@ func MultiWrite(m map[name.Reference]Taggable, options ...Option) (rerr error) {
return err
}
commitMany := func(m map[name.Reference]Taggable) error {
commitMany := func(ctx context.Context, m map[name.Reference]Taggable) error {
g, ctx := errgroup.WithContext(ctx)
// With all of the constituent elements uploaded, upload the manifests
// to commit the images and indexes, and collect any errors.
type task struct {
@@ -172,7 +177,7 @@ func MultiWrite(m map[name.Reference]Taggable, options ...Option) (rerr error) {
// Start N workers consuming tasks to upload manifests.
g.Go(func() error {
for t := range taskChan {
if err := w.commitManifest(t.i, t.ref); err != nil {
if err := w.commitManifest(ctx, t.i, t.ref); err != nil {
return err
}
}
@@ -189,19 +194,19 @@ func MultiWrite(m map[name.Reference]Taggable, options ...Option) (rerr error) {
}
// Push originally requested image manifests. These have no
// dependencies.
if err := commitMany(images); err != nil {
if err := commitMany(ctx, images); err != nil {
return err
}
// Push new manifests from lowest levels up.
for i := len(newManifests) - 1; i >= 0; i-- {
if err := commitMany(newManifests[i]); err != nil {
if err := commitMany(ctx, newManifests[i]); err != nil {
return err
}
}
// Push originally requested index manifests, which might depend on
// newly discovered manifests.
return commitMany(indexes)
return commitMany(ctx, indexes)
}
// addIndexBlobs adds blobs to the set of blobs we intend to upload, and

View File

@@ -17,8 +17,13 @@ package remote
import (
"context"
"errors"
"io"
"net"
"net/http"
"syscall"
"time"
"github.com/google/go-containerregistry/internal/retry"
"github.com/google/go-containerregistry/pkg/authn"
"github.com/google/go-containerregistry/pkg/logs"
v1 "github.com/google/go-containerregistry/pkg/v1"
@@ -38,6 +43,9 @@ type options struct {
userAgent string
allowNondistributableArtifacts bool
updates chan<- v1.Update
pageSize int
retryBackoff Backoff
retryPredicate retry.Predicate
}
var defaultPlatform = v1.Platform{
@@ -45,15 +53,63 @@ var defaultPlatform = v1.Platform{
OS: "linux",
}
const defaultJobs = 4
// Backoff is an alias of retry.Backoff to expose this configuration option to consumers of this lib
type Backoff = retry.Backoff
var defaultRetryPredicate retry.Predicate = func(err error) bool {
// Various failure modes here, as we're often reading from and writing to
// the network.
if retry.IsTemporary(err) || errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, syscall.EPIPE) {
logs.Warn.Printf("retrying %v", err)
return true
}
return false
}
// Try this three times, waiting 1s after first failure, 3s after second.
var defaultRetryBackoff = Backoff{
Duration: 1.0 * time.Second,
Factor: 3.0,
Jitter: 0.1,
Steps: 3,
}
const (
defaultJobs = 4
// ECR returns an error if n > 1000:
// https://github.com/google/go-containerregistry/issues/1091
defaultPageSize = 1000
)
// DefaultTransport is based on http.DefaultTransport with modifications
// documented inline below.
var DefaultTransport = &http.Transport{
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
// By default we wrap the transport in retries, so reduce the
// default dial timeout to 5s to avoid 5x 30s of connection
// timeouts when doing the "ping" on certain http registries.
Timeout: 5 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
ForceAttemptHTTP2: true,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
func makeOptions(target authn.Resource, opts ...Option) (*options, error) {
o := &options{
auth: authn.Anonymous,
transport: http.DefaultTransport,
platform: defaultPlatform,
context: context.Background(),
jobs: defaultJobs,
auth: authn.Anonymous,
transport: DefaultTransport,
platform: defaultPlatform,
context: context.Background(),
jobs: defaultJobs,
pageSize: defaultPageSize,
retryPredicate: defaultRetryPredicate,
retryBackoff: defaultRetryBackoff,
}
for _, option := range opts {
@@ -70,19 +126,23 @@ func makeOptions(target authn.Resource, opts ...Option) (*options, error) {
o.auth = auth
}
// Wrap the transport in something that logs requests and responses.
// It's expensive to generate the dumps, so skip it if we're writing
// to nothing.
if logs.Enabled(logs.Debug) {
o.transport = transport.NewLogger(o.transport)
}
// transport.Wrapper is a signal that consumers are opt-ing into providing their own transport without any additional wrapping.
// This is to allow consumers full control over the transports logic, such as providing retry logic.
if _, ok := o.transport.(*transport.Wrapper); !ok {
// Wrap the transport in something that logs requests and responses.
// It's expensive to generate the dumps, so skip it if we're writing
// to nothing.
if logs.Enabled(logs.Debug) {
o.transport = transport.NewLogger(o.transport)
}
// Wrap the transport in something that can retry network flakes.
o.transport = transport.NewRetry(o.transport)
// Wrap the transport in something that can retry network flakes.
o.transport = transport.NewRetry(o.transport)
// Wrap this last to prevent transport.New from double-wrapping.
if o.userAgent != "" {
o.transport = transport.NewUserAgent(o.transport, o.userAgent)
// Wrap this last to prevent transport.New from double-wrapping.
if o.userAgent != "" {
o.transport = transport.NewUserAgent(o.transport, o.userAgent)
}
}
return o, nil
@@ -90,8 +150,10 @@ func makeOptions(target authn.Resource, opts ...Option) (*options, error) {
// WithTransport is a functional option for overriding the default transport
// for remote operations.
// If transport.Wrapper is provided, this signals that the consumer does *not* want any further wrapping to occur.
// i.e. logging, retry and useragent
//
// The default transport its http.DefaultTransport.
// The default transport is DefaultTransport.
func WithTransport(t http.RoundTripper) Option {
return func(o *options) error {
o.transport = t
@@ -193,3 +255,30 @@ func WithProgress(updates chan<- v1.Update) Option {
return nil
}
}
// WithPageSize sets the given size as the value of parameter 'n' in the request.
//
// To omit the `n` parameter entirely, use WithPageSize(0).
// The default value is 1000.
func WithPageSize(size int) Option {
return func(o *options) error {
o.pageSize = size
return nil
}
}
// WithRetryBackoff sets the httpBackoff for retry HTTP operations.
func WithRetryBackoff(backoff Backoff) Option {
return func(o *options) error {
o.retryBackoff = backoff
return nil
}
}
// WithRetryPredicate sets the predicate for retry HTTP operations.
func WithRetryPredicate(predicate retry.Predicate) Option {
return func(o *options) error {
o.retryPredicate = predicate
return nil
}
}

View File

@@ -17,6 +17,7 @@ package transport
import (
"context"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net"
@@ -139,7 +140,8 @@ func (bt *bearerTransport) refresh(ctx context.Context) error {
// the Username should be set to <token>, which indicates
// we are using an oauth flow.
content, err = bt.refreshOauth(ctx)
if terr, ok := err.(*Error); ok && terr.StatusCode == http.StatusNotFound {
var terr *Error
if errors.As(err, &terr) && terr.StatusCode == http.StatusNotFound {
// Note: Not all token servers implement oauth2.
// If the request to the endpoint returns 404 using the HTTP POST method,
// refer to Token Documentation for using the HTTP GET method supported by all token servers.

View File

@@ -46,10 +46,10 @@ type Error struct {
Errors []Diagnostic `json:"errors,omitempty"`
// The http status code returned.
StatusCode int
// The request that failed.
Request *http.Request
// The raw body if we couldn't understand it.
rawBody string
// The request that failed.
request *http.Request
}
// Check that Error implements error
@@ -58,8 +58,8 @@ var _ error = (*Error)(nil)
// Error implements error
func (e *Error) Error() string {
prefix := ""
if e.request != nil {
prefix = fmt.Sprintf("%s %s: ", e.request.Method, redactURL(e.request.URL))
if e.Request != nil {
prefix = fmt.Sprintf("%s %s: ", e.Request.Method, redactURL(e.Request.URL))
}
return prefix + e.responseErr()
}
@@ -68,7 +68,7 @@ func (e *Error) responseErr() string {
switch len(e.Errors) {
case 0:
if len(e.rawBody) == 0 {
if e.request != nil && e.request.Method == http.MethodHead {
if e.Request != nil && e.Request.Method == http.MethodHead {
return fmt.Sprintf("unexpected status code %d %s (HEAD responses have no body, use GET for details)", e.StatusCode, http.StatusText(e.StatusCode))
}
return fmt.Sprintf("unexpected status code %d %s", e.StatusCode, http.StatusText(e.StatusCode))
@@ -154,12 +154,14 @@ const (
DeniedErrorCode ErrorCode = "DENIED"
UnsupportedErrorCode ErrorCode = "UNSUPPORTED"
TooManyRequestsErrorCode ErrorCode = "TOOMANYREQUESTS"
UnknownErrorCode ErrorCode = "UNKNOWN"
)
// TODO: Include other error types.
var temporaryErrorCodes = map[ErrorCode]struct{}{
BlobUploadInvalidErrorCode: {},
TooManyRequestsErrorCode: {},
UnknownErrorCode: {},
}
var temporaryStatusCodes = map[int]struct{}{
@@ -167,6 +169,7 @@ var temporaryStatusCodes = map[int]struct{}{
http.StatusInternalServerError: {},
http.StatusBadGateway: {},
http.StatusServiceUnavailable: {},
http.StatusGatewayTimeout: {},
}
// CheckError returns a structured error if the response status is not in codes.
@@ -191,7 +194,7 @@ func CheckError(resp *http.Response, codes ...int) error {
structuredError.rawBody = string(b)
structuredError.StatusCode = resp.StatusCode
structuredError.request = resp.Request
structuredError.Request = resp.Request
return structuredError
}

View File

@@ -129,7 +129,6 @@ func ping(ctx context.Context, reg name.Registry, t http.RoundTripper) (*pingRes
}
func pickFromMultipleChallenges(challenges []authchallenge.Challenge) authchallenge.Challenge {
// It might happen there are multiple www-authenticate headers, e.g. `Negotiate` and `Basic`.
// Picking simply the first one could result eventually in `unrecognized challenge` error,
// that's why we're looping through the challenges in search for one that can be handled.

View File

@@ -46,8 +46,11 @@ type options struct {
predicate retry.Predicate
}
// Backoff is an alias of retry.Backoff to expose this configuration option to consumers of this lib
type Backoff = retry.Backoff
// WithRetryBackoff sets the backoff for retry operations.
func WithRetryBackoff(backoff retry.Backoff) Option {
func WithRetryBackoff(backoff Backoff) Option {
return func(o *options) {
o.backoff = backoff
}

View File

@@ -69,9 +69,9 @@ func NewWithContext(ctx context.Context, reg name.Registry, auth authn.Authentic
switch pr.challenge.Canonical() {
case anonymous:
return t, nil
return &Wrapper{t}, nil
case basic:
return &basicTransport{inner: t, auth: auth, target: reg.RegistryStr()}, nil
return &Wrapper{&basicTransport{inner: t, auth: auth, target: reg.RegistryStr()}}, nil
case bearer:
// We require the realm, which tells us where to send our Basic auth to turn it into Bearer auth.
realm, ok := pr.parameters["realm"]
@@ -96,8 +96,19 @@ func NewWithContext(ctx context.Context, reg name.Registry, auth authn.Authentic
if err := bt.refresh(ctx); err != nil {
return nil, err
}
return bt, nil
return &Wrapper{bt}, nil
default:
return nil, fmt.Errorf("unrecognized challenge: %s", pr.challenge)
}
}
// Wrapper results in *not* wrapping supplied transport with additional logic such as retries, useragent and debug logging
// Consumers are opt-ing into providing their own transport without any additional wrapping.
type Wrapper struct {
inner http.RoundTripper
}
// RoundTrip delegates to the inner RoundTripper
func (w *Wrapper) RoundTrip(in *http.Request) (*http.Response, error) {
return w.inner.RoundTrip(in)
}

View File

@@ -24,8 +24,6 @@ import (
"net/url"
"strings"
"sync/atomic"
"syscall"
"time"
"github.com/google/go-containerregistry/internal/redact"
"github.com/google/go-containerregistry/internal/retry"
@@ -59,12 +57,12 @@ func Write(ref name.Reference, img v1.Image, options ...Option) (rerr error) {
return err
}
defer close(o.updates)
defer func() { sendError(o.updates, rerr) }()
defer func() { _ = sendError(o.updates, rerr) }()
}
return writeImage(ref, img, o, lastUpdate)
return writeImage(o.context, ref, img, o, lastUpdate)
}
func writeImage(ref name.Reference, img v1.Image, o *options, lastUpdate *v1.Update) error {
func writeImage(ctx context.Context, ref name.Reference, img v1.Image, o *options, lastUpdate *v1.Update) error {
ls, err := img.Layers()
if err != nil {
return err
@@ -77,19 +75,21 @@ func writeImage(ref name.Reference, img v1.Image, o *options, lastUpdate *v1.Upd
w := writer{
repo: ref.Context(),
client: &http.Client{Transport: tr},
context: o.context,
context: ctx,
updates: o.updates,
lastUpdate: lastUpdate,
backoff: o.retryBackoff,
predicate: o.retryPredicate,
}
// Upload individual blobs and collect any errors.
blobChan := make(chan v1.Layer, 2*o.jobs)
g, ctx := errgroup.WithContext(o.context)
g, gctx := errgroup.WithContext(ctx)
for i := 0; i < o.jobs; i++ {
// Start N workers consuming blobs to upload.
g.Go(func() error {
for b := range blobChan {
if err := w.uploadOne(b); err != nil {
if err := w.uploadOne(gctx, b); err != nil {
return err
}
}
@@ -128,15 +128,12 @@ func writeImage(ref name.Reference, img v1.Image, o *options, lastUpdate *v1.Upd
}
select {
case blobChan <- l:
case <-ctx.Done():
return ctx.Err()
case <-gctx.Done():
return gctx.Err()
}
}
return nil
})
if err := g.Wait(); err != nil {
return err
}
if l, err := partial.ConfigLayer(img); err != nil {
// We can't read the ConfigLayer, possibly because of streaming layers,
@@ -151,13 +148,13 @@ func writeImage(ref name.Reference, img v1.Image, o *options, lastUpdate *v1.Upd
if err != nil {
return err
}
if err := w.uploadOne(l); err != nil {
if err := w.uploadOne(ctx, l); err != nil {
return err
}
} else {
// We *can* read the ConfigLayer, so upload it concurrently with the layers.
g.Go(func() error {
return w.uploadOne(l)
return w.uploadOne(gctx, l)
})
// Wait for the layers + config.
@@ -168,7 +165,7 @@ func writeImage(ref name.Reference, img v1.Image, o *options, lastUpdate *v1.Upd
// With all of the constituent elements uploaded, upload the manifest
// to commit the image.
return w.commitManifest(img, ref)
return w.commitManifest(ctx, img, ref)
}
// writer writes the elements of an image to a remote image reference.
@@ -179,6 +176,8 @@ type writer struct {
updates chan<- v1.Update
lastUpdate *v1.Update
backoff Backoff
predicate retry.Predicate
}
func sendError(ch chan<- v1.Update, err error) error {
@@ -405,30 +404,12 @@ func (w *writer) incrProgress(written int64) {
}
w.updates <- v1.Update{
Total: w.lastUpdate.Total,
Complete: atomic.AddInt64(&w.lastUpdate.Complete, int64(written)),
Complete: atomic.AddInt64(&w.lastUpdate.Complete, written),
}
}
var shouldRetry retry.Predicate = func(err error) bool {
// Various failure modes here, as we're often reading from and writing to
// the network.
if retry.IsTemporary(err) || errors.Is(err, io.ErrUnexpectedEOF) || errors.Is(err, syscall.EPIPE) {
logs.Warn.Printf("retrying %v", err)
return true
}
return false
}
// Try this three times, waiting 1s after first failure, 3s after second.
var backoff = retry.Backoff{
Duration: 1.0 * time.Second,
Factor: 3.0,
Jitter: 0.1,
Steps: 3,
}
// uploadOne performs a complete upload of a single layer.
func (w *writer) uploadOne(l v1.Layer) error {
func (w *writer) uploadOne(ctx context.Context, l v1.Layer) error {
var from, mount string
if h, err := l.Digest(); err == nil {
// If we know the digest, this isn't a streaming layer. Do an existence
@@ -455,8 +436,6 @@ func (w *writer) uploadOne(l v1.Layer) error {
}
}
ctx := w.context
tryUpload := func() error {
location, mounted, err := w.initiateUpload(from, mount)
if err != nil {
@@ -508,14 +487,14 @@ func (w *writer) uploadOne(l v1.Layer) error {
return nil
}
return retry.Retry(tryUpload, shouldRetry, backoff)
return retry.Retry(tryUpload, w.predicate, w.backoff)
}
type withLayer interface {
Layer(v1.Hash) (v1.Layer, error)
}
func (w *writer) writeIndex(ref name.Reference, ii v1.ImageIndex, options ...Option) error {
func (w *writer) writeIndex(ctx context.Context, ref name.Reference, ii v1.ImageIndex, options ...Option) error {
index, err := ii.IndexManifest()
if err != nil {
return err
@@ -544,7 +523,7 @@ func (w *writer) writeIndex(ref name.Reference, ii v1.ImageIndex, options ...Opt
if err != nil {
return err
}
if err := w.writeIndex(ref, ii); err != nil {
if err := w.writeIndex(ctx, ref, ii, options...); err != nil {
return err
}
case types.OCIManifestSchema1, types.DockerManifestSchema2:
@@ -552,7 +531,7 @@ func (w *writer) writeIndex(ref name.Reference, ii v1.ImageIndex, options ...Opt
if err != nil {
return err
}
if err := writeImage(ref, img, o, w.lastUpdate); err != nil {
if err := writeImage(ctx, ref, img, o, w.lastUpdate); err != nil {
return err
}
default:
@@ -562,7 +541,7 @@ func (w *writer) writeIndex(ref name.Reference, ii v1.ImageIndex, options ...Opt
if err != nil {
return err
}
if err := w.uploadOne(layer); err != nil {
if err := w.uploadOne(ctx, layer); err != nil {
return err
}
}
@@ -571,7 +550,7 @@ func (w *writer) writeIndex(ref name.Reference, ii v1.ImageIndex, options ...Opt
// With all of the constituent elements uploaded, upload the manifest
// to commit the image.
return w.commitManifest(ii, ref)
return w.commitManifest(ctx, ii, ref)
}
type withMediaType interface {
@@ -617,7 +596,7 @@ func unpackTaggable(t Taggable) ([]byte, *v1.Descriptor, error) {
}
// commitManifest does a PUT of the image's manifest.
func (w *writer) commitManifest(t Taggable, ref name.Reference) error {
func (w *writer) commitManifest(ctx context.Context, t Taggable, ref name.Reference) error {
tryUpload := func() error {
raw, desc, err := unpackTaggable(t)
if err != nil {
@@ -633,7 +612,7 @@ func (w *writer) commitManifest(t Taggable, ref name.Reference) error {
}
req.Header.Set("Content-Type", string(desc.MediaType))
resp, err := w.client.Do(req.WithContext(w.context))
resp, err := w.client.Do(req.WithContext(ctx))
if err != nil {
return err
}
@@ -649,7 +628,7 @@ func (w *writer) commitManifest(t Taggable, ref name.Reference) error {
return nil
}
return retry.Retry(tryUpload, shouldRetry, backoff)
return retry.Retry(tryUpload, w.predicate, w.backoff)
}
func scopesForUploadingImage(repo name.Repository, layers []v1.Layer) []string {
@@ -692,10 +671,12 @@ func WriteIndex(ref name.Reference, ii v1.ImageIndex, options ...Option) (rerr e
return err
}
w := writer{
repo: ref.Context(),
client: &http.Client{Transport: tr},
context: o.context,
updates: o.updates,
repo: ref.Context(),
client: &http.Client{Transport: tr},
context: o.context,
updates: o.updates,
backoff: o.retryBackoff,
predicate: o.retryPredicate,
}
if o.updates != nil {
@@ -708,7 +689,7 @@ func WriteIndex(ref name.Reference, ii v1.ImageIndex, options ...Option) (rerr e
defer func() { sendError(o.updates, rerr) }()
}
return w.writeIndex(ref, ii, options...)
return w.writeIndex(o.context, ref, ii, options...)
}
// countImage counts the total size of all layers + config blob + manifest for
@@ -831,10 +812,12 @@ func WriteLayer(repo name.Repository, layer v1.Layer, options ...Option) (rerr e
return err
}
w := writer{
repo: repo,
client: &http.Client{Transport: tr},
context: o.context,
updates: o.updates,
repo: repo,
client: &http.Client{Transport: tr},
context: o.context,
updates: o.updates,
backoff: o.retryBackoff,
predicate: o.retryPredicate,
}
if o.updates != nil {
@@ -851,7 +834,7 @@ func WriteLayer(repo name.Repository, layer v1.Layer, options ...Option) (rerr e
}
w.lastUpdate = &v1.Update{Total: size}
}
return w.uploadOne(layer)
return w.uploadOne(o.context, layer)
}
// Tag adds a tag to the given Taggable via PUT /v2/.../manifests/<tag>
@@ -898,10 +881,12 @@ func Put(ref name.Reference, t Taggable, options ...Option) error {
return err
}
w := writer{
repo: ref.Context(),
client: &http.Client{Transport: tr},
context: o.context,
repo: ref.Context(),
client: &http.Client{Transport: tr},
context: o.context,
backoff: o.retryBackoff,
predicate: o.retryPredicate,
}
return w.commitManifest(t, ref)
return w.commitManifest(o.context, t, ref)
}