mirror of
https://github.com/distribution/distribution.git
synced 2026-05-01 12:55:30 +00:00
fix(registry/proxy): do not re-use http request context for background writes
Signed-off-by: Chandon Pierre <cpierre@coreweave.com>
This commit is contained in:
committed by
Chandon Pierre
parent
5d3a61a3e6
commit
e0bac48375
@@ -692,6 +692,12 @@ type Proxy struct {
|
||||
// if not set, defaults to 7 * 24 hours
|
||||
// If set to zero, will never expire cache
|
||||
TTL *time.Duration `yaml:"ttl,omitempty"`
|
||||
|
||||
// CacheWriteTimeout is the maximum duration allowed for cache write operations
|
||||
// to complete when pulling blobs from the remote registry. This timeout ensures
|
||||
// that cache writes don't hang indefinitely if the storage backend is slow.
|
||||
// If not set, defaults to 5 minutes.
|
||||
CacheWriteTimeout *time.Duration `yaml:"cachewritetimeout,omitempty"`
|
||||
}
|
||||
|
||||
// ExecConfig defines the configuration for executing a command as a credential helper.
|
||||
|
||||
@@ -123,7 +123,7 @@ func (hbu *httpBlobUpload) StartedAt() time.Time {
|
||||
|
||||
func (hbu *httpBlobUpload) Commit(ctx context.Context, desc v1.Descriptor) (v1.Descriptor, error) {
|
||||
// TODO(dmcgowan): Check if already finished, if so just fetch
|
||||
req, err := http.NewRequestWithContext(hbu.ctx, http.MethodPut, hbu.location, nil)
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPut, hbu.location, nil)
|
||||
if err != nil {
|
||||
return v1.Descriptor{}, err
|
||||
}
|
||||
@@ -146,7 +146,7 @@ func (hbu *httpBlobUpload) Commit(ctx context.Context, desc v1.Descriptor) (v1.D
|
||||
}
|
||||
|
||||
func (hbu *httpBlobUpload) Cancel(ctx context.Context) error {
|
||||
req, err := http.NewRequestWithContext(hbu.ctx, http.MethodDelete, hbu.location, nil)
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodDelete, hbu.location, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -167,7 +167,9 @@ func (bsl *blobServiceListener) Open(ctx context.Context, dgst digest.Digest) (i
|
||||
func (bsl *blobServiceListener) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error {
|
||||
err := bsl.BlobStore.ServeBlob(ctx, w, r, dgst)
|
||||
if err == nil {
|
||||
if desc, err := bsl.Stat(ctx, dgst); err != nil {
|
||||
// Use a detached context for Stat() since the HTTP request context may be canceled
|
||||
// after ServeBlob completes, but we still want to send the notification.
|
||||
if desc, err := bsl.Stat(context.WithoutCancel(ctx), dgst); err != nil {
|
||||
dcontext.GetLogger(ctx).Errorf("error resolving descriptor in ServeBlob listener: %v", err)
|
||||
} else {
|
||||
if err := bsl.parent.listener.BlobPulled(bsl.parent.Repository.Named(), desc); err != nil {
|
||||
|
||||
@@ -18,12 +18,13 @@ import (
|
||||
)
|
||||
|
||||
type proxyBlobStore struct {
|
||||
localStore distribution.BlobStore
|
||||
remoteStore distribution.BlobService
|
||||
scheduler *scheduler.TTLExpirationScheduler
|
||||
ttl *time.Duration
|
||||
repositoryName reference.Named
|
||||
authChallenger authChallenger
|
||||
localStore distribution.BlobStore
|
||||
remoteStore distribution.BlobService
|
||||
scheduler *scheduler.TTLExpirationScheduler
|
||||
ttl *time.Duration
|
||||
cacheWriteTimeout time.Duration
|
||||
repositoryName reference.Named
|
||||
authChallenger authChallenger
|
||||
}
|
||||
|
||||
var _ distribution.BlobStore = &proxyBlobStore{}
|
||||
@@ -113,11 +114,28 @@ func (pbs *proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter,
|
||||
mu.Unlock()
|
||||
}()
|
||||
|
||||
bw, err := pbs.localStore.Create(ctx)
|
||||
// Create a detached context for the blob writer that won't be canceled
|
||||
// when the HTTP request context is canceled. This allows the cache write
|
||||
// to complete even if the client disconnects.
|
||||
// Use the configured timeout to prevent hanging operations.
|
||||
writerCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), pbs.cacheWriteTimeout)
|
||||
defer cancel()
|
||||
|
||||
bw, err := pbs.localStore.Create(writerCtx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
committed := false
|
||||
// Ensure the writer is canceled if we return early with an error
|
||||
defer func() {
|
||||
if !committed {
|
||||
if err := bw.Cancel(writerCtx); err != nil {
|
||||
dcontext.GetLogger(ctx).WithError(err).Errorf("Error canceling blob writer")
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Serving client and storing locally over same fetching request.
|
||||
// This can prevent a redundant blob fetching.
|
||||
multiWriter := io.MultiWriter(w, bw)
|
||||
@@ -126,11 +144,13 @@ func (pbs *proxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter,
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = bw.Commit(ctx, desc)
|
||||
_, err = bw.Commit(writerCtx, desc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
committed = true
|
||||
|
||||
blobRef, err := reference.WithDigest(pbs.repositoryName, dgst)
|
||||
if err != nil {
|
||||
dcontext.GetLogger(ctx).Errorf("Error creating reference: %s", err)
|
||||
|
||||
@@ -26,12 +26,13 @@ var repositoryTTL = 24 * 7 * time.Hour
|
||||
|
||||
// proxyingRegistry fetches content from a remote registry and caches it locally
|
||||
type proxyingRegistry struct {
|
||||
embedded distribution.Namespace // provides local registry functionality
|
||||
scheduler *scheduler.TTLExpirationScheduler
|
||||
ttl *time.Duration
|
||||
remoteURL url.URL
|
||||
authChallenger authChallenger
|
||||
basicAuth auth.CredentialStore
|
||||
embedded distribution.Namespace // provides local registry functionality
|
||||
scheduler *scheduler.TTLExpirationScheduler
|
||||
ttl *time.Duration
|
||||
cacheWriteTimeout time.Duration
|
||||
remoteURL url.URL
|
||||
authChallenger authChallenger
|
||||
basicAuth auth.CredentialStore
|
||||
}
|
||||
|
||||
// NewRegistryPullThroughCache creates a registry acting as a pull through cache
|
||||
@@ -55,6 +56,12 @@ func NewRegistryPullThroughCache(ctx context.Context, registry distribution.Name
|
||||
ttl = nil
|
||||
}
|
||||
|
||||
// Set default cache write timeout if not specified
|
||||
cacheWriteTimeout := 5 * time.Minute
|
||||
if config.CacheWriteTimeout != nil && *config.CacheWriteTimeout > 0 {
|
||||
cacheWriteTimeout = *config.CacheWriteTimeout
|
||||
}
|
||||
|
||||
if ttl != nil {
|
||||
s = scheduler.New(ctx, driver, "/scheduler-state.json")
|
||||
s.OnBlobExpire(func(ref reference.Reference) error {
|
||||
@@ -128,10 +135,11 @@ func NewRegistryPullThroughCache(ctx context.Context, registry distribution.Name
|
||||
}
|
||||
|
||||
return &proxyingRegistry{
|
||||
embedded: registry,
|
||||
scheduler: s,
|
||||
ttl: ttl,
|
||||
remoteURL: *remoteURL,
|
||||
embedded: registry,
|
||||
scheduler: s,
|
||||
ttl: ttl,
|
||||
cacheWriteTimeout: cacheWriteTimeout,
|
||||
remoteURL: *remoteURL,
|
||||
authChallenger: &remoteAuthChallenger{
|
||||
remoteURL: *remoteURL,
|
||||
cm: challenge.NewSimpleManager(),
|
||||
@@ -190,12 +198,13 @@ func (pr *proxyingRegistry) Repository(ctx context.Context, name reference.Named
|
||||
|
||||
return &proxiedRepository{
|
||||
blobStore: &proxyBlobStore{
|
||||
localStore: localRepo.Blobs(ctx),
|
||||
remoteStore: remoteRepo.Blobs(ctx),
|
||||
scheduler: pr.scheduler,
|
||||
ttl: pr.ttl,
|
||||
repositoryName: name,
|
||||
authChallenger: pr.authChallenger,
|
||||
localStore: localRepo.Blobs(ctx),
|
||||
remoteStore: remoteRepo.Blobs(ctx),
|
||||
scheduler: pr.scheduler,
|
||||
ttl: pr.ttl,
|
||||
cacheWriteTimeout: pr.cacheWriteTimeout,
|
||||
repositoryName: name,
|
||||
authChallenger: pr.authChallenger,
|
||||
},
|
||||
manifests: &proxyManifestStore{
|
||||
repositoryName: name,
|
||||
|
||||
Reference in New Issue
Block a user