diff --git a/configuration/configuration.go b/configuration/configuration.go index 5d6456ea5..bb79d94e5 100644 --- a/configuration/configuration.go +++ b/configuration/configuration.go @@ -692,6 +692,12 @@ type Proxy struct { // if not set, defaults to 7 * 24 hours // If set to zero, will never expire cache TTL *time.Duration `yaml:"ttl,omitempty"` + + // CacheWriteTimeout is the maximum duration allowed for cache write operations + // to complete when pulling blobs from the remote registry. This timeout ensures + // that cache writes don't hang indefinitely if the storage backend is slow. + // If not set, defaults to 5 minutes. + CacheWriteTimeout *time.Duration `yaml:"cachewritetimeout,omitempty"` } // ExecConfig defines the configuration for executing a command as a credential helper. diff --git a/internal/client/blob_writer.go b/internal/client/blob_writer.go index a4a9cdec5..ce4425582 100644 --- a/internal/client/blob_writer.go +++ b/internal/client/blob_writer.go @@ -123,7 +123,7 @@ func (hbu *httpBlobUpload) StartedAt() time.Time { func (hbu *httpBlobUpload) Commit(ctx context.Context, desc v1.Descriptor) (v1.Descriptor, error) { // TODO(dmcgowan): Check if already finished, if so just fetch - req, err := http.NewRequestWithContext(hbu.ctx, http.MethodPut, hbu.location, nil) + req, err := http.NewRequestWithContext(ctx, http.MethodPut, hbu.location, nil) if err != nil { return v1.Descriptor{}, err } @@ -146,7 +146,7 @@ func (hbu *httpBlobUpload) Commit(ctx context.Context, desc v1.Descriptor) (v1.D } func (hbu *httpBlobUpload) Cancel(ctx context.Context) error { - req, err := http.NewRequestWithContext(hbu.ctx, http.MethodDelete, hbu.location, nil) + req, err := http.NewRequestWithContext(ctx, http.MethodDelete, hbu.location, nil) if err != nil { return err } diff --git a/notifications/listener.go b/notifications/listener.go index 8a36067ef..31ae0bdff 100644 --- a/notifications/listener.go +++ b/notifications/listener.go @@ -167,7 +167,9 @@ func (bsl *blobServiceListener) Open(ctx context.Context, dgst digest.Digest) (i func (bsl *blobServiceListener) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error { err := bsl.BlobStore.ServeBlob(ctx, w, r, dgst) if err == nil { - if desc, err := bsl.Stat(ctx, dgst); err != nil { + // Use a detached context for Stat() since the HTTP request context may be canceled + // after ServeBlob completes, but we still want to send the notification. + if desc, err := bsl.Stat(context.WithoutCancel(ctx), dgst); err != nil { dcontext.GetLogger(ctx).Errorf("error resolving descriptor in ServeBlob listener: %v", err) } else { if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository.Named(), desc); err != nil { diff --git a/registry/proxy/proxyblobstore.go b/registry/proxy/proxyblobstore.go index 7726f4280..5fca10456 100644 --- a/registry/proxy/proxyblobstore.go +++ b/registry/proxy/proxyblobstore.go @@ -18,12 +18,13 @@ import ( ) type proxyBlobStore struct { - localStore distribution.BlobStore - remoteStore distribution.BlobService - scheduler *scheduler.TTLExpirationScheduler - ttl *time.Duration - repositoryName reference.Named - authChallenger authChallenger + localStore distribution.BlobStore + remoteStore distribution.BlobService + scheduler *scheduler.TTLExpirationScheduler + ttl *time.Duration + cacheWriteTimeout time.Duration + repositoryName reference.Named + authChallenger authChallenger } var _ distribution.BlobStore = &proxyBlobStore{} @@ -113,11 +114,28 @@ func (pbs *proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, mu.Unlock() }() - bw, err := pbs.localStore.Create(ctx) + // Create a detached context for the blob writer that won't be canceled + // when the HTTP request context is canceled. This allows the cache write + // to complete even if the client disconnects. + // Use the configured timeout to prevent hanging operations. + writerCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), pbs.cacheWriteTimeout) + defer cancel() + + bw, err := pbs.localStore.Create(writerCtx) if err != nil { return err } + committed := false + // Ensure the writer is canceled if we return early with an error + defer func() { + if !committed { + if err := bw.Cancel(writerCtx); err != nil { + dcontext.GetLogger(ctx).WithError(err).Errorf("Error canceling blob writer") + } + } + }() + // Serving client and storing locally over same fetching request. // This can prevent a redundant blob fetching. multiWriter := io.MultiWriter(w, bw) @@ -126,11 +144,13 @@ func (pbs *proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, return err } - _, err = bw.Commit(ctx, desc) + _, err = bw.Commit(writerCtx, desc) if err != nil { return err } + committed = true + blobRef, err := reference.WithDigest(pbs.repositoryName, dgst) if err != nil { dcontext.GetLogger(ctx).Errorf("Error creating reference: %s", err) diff --git a/registry/proxy/proxyregistry.go b/registry/proxy/proxyregistry.go index b417ee38f..29650c394 100644 --- a/registry/proxy/proxyregistry.go +++ b/registry/proxy/proxyregistry.go @@ -26,12 +26,13 @@ var repositoryTTL = 24 * 7 * time.Hour // proxyingRegistry fetches content from a remote registry and caches it locally type proxyingRegistry struct { - embedded distribution.Namespace // provides local registry functionality - scheduler *scheduler.TTLExpirationScheduler - ttl *time.Duration - remoteURL url.URL - authChallenger authChallenger - basicAuth auth.CredentialStore + embedded distribution.Namespace // provides local registry functionality + scheduler *scheduler.TTLExpirationScheduler + ttl *time.Duration + cacheWriteTimeout time.Duration + remoteURL url.URL + authChallenger authChallenger + basicAuth auth.CredentialStore } // NewRegistryPullThroughCache creates a registry acting as a pull through cache @@ -55,6 +56,12 @@ func NewRegistryPullThroughCache(ctx context.Context, registry distribution.Name ttl = nil } + // Set default cache write timeout if not specified + cacheWriteTimeout := 5 * time.Minute + if config.CacheWriteTimeout != nil && *config.CacheWriteTimeout > 0 { + cacheWriteTimeout = *config.CacheWriteTimeout + } + if ttl != nil { s = scheduler.New(ctx, driver, "/scheduler-state.json") s.OnBlobExpire(func(ref reference.Reference) error { @@ -128,10 +135,11 @@ func NewRegistryPullThroughCache(ctx context.Context, registry distribution.Name } return &proxyingRegistry{ - embedded: registry, - scheduler: s, - ttl: ttl, - remoteURL: *remoteURL, + embedded: registry, + scheduler: s, + ttl: ttl, + cacheWriteTimeout: cacheWriteTimeout, + remoteURL: *remoteURL, authChallenger: &remoteAuthChallenger{ remoteURL: *remoteURL, cm: challenge.NewSimpleManager(), @@ -190,12 +198,13 @@ func (pr *proxyingRegistry) Repository(ctx context.Context, name reference.Named return &proxiedRepository{ blobStore: &proxyBlobStore{ - localStore: localRepo.Blobs(ctx), - remoteStore: remoteRepo.Blobs(ctx), - scheduler: pr.scheduler, - ttl: pr.ttl, - repositoryName: name, - authChallenger: pr.authChallenger, + localStore: localRepo.Blobs(ctx), + remoteStore: remoteRepo.Blobs(ctx), + scheduler: pr.scheduler, + ttl: pr.ttl, + cacheWriteTimeout: pr.cacheWriteTimeout, + repositoryName: name, + authChallenger: pr.authChallenger, }, manifests: &proxyManifestStore{ repositoryName: name,