mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 13:37:30 +00:00
Merge pull request #26677 from Random-Liu/add-image-pull-timeout
Automatic merge from submit-queue
Add timeout for image pulling
Fix #26300.
With this PR, if image pulling makes no progress for *1 minute*, the operation will be cancelled. Docker reports progress for every 512kB block (see [here](3d13fddd2b/pkg/progress/progressreader.go#L32)), so *512kB/min* means the throughput is *<= 8.5kB/s*, which should be kind of abnormal?
It's a little hard to write a unit test for this, so I just tested it manually. If I set `defaultImagePullingStuckTimeout` to 0s and `defaultImagePullingProgressReportInterval` to 1s, image pulling will be cancelled.
```
E0601 18:48:29.026003 46185 kube_docker_client.go:274] Cancel pulling image "nginx:latest" because of no progress for 0, latest progress: "89732b811e7f: Pulling fs layer "
E0601 18:48:29.026308 46185 manager.go:2110] container start failed: ErrImagePull: net/http: request canceled
```
/cc @kubernetes/sig-node
[]()
This commit is contained in:
commit
d93f80c86b
@ -64,6 +64,12 @@ const (
|
|||||||
|
|
||||||
// defaultImagePullingProgressReportInterval is the default interval of image pulling progress reporting.
|
// defaultImagePullingProgressReportInterval is the default interval of image pulling progress reporting.
|
||||||
defaultImagePullingProgressReportInterval = 10 * time.Second
|
defaultImagePullingProgressReportInterval = 10 * time.Second
|
||||||
|
|
||||||
|
// defaultImagePullingStuckTimeout is the default timeout for image pulling stuck. If no progress
|
||||||
|
// is made for defaultImagePullingStuckTimeout, the image pulling will be cancelled.
|
||||||
|
// Docker reports image progress for every 512kB block, so normally there shouldn't be too long interval
|
||||||
|
// between progress updates.
|
||||||
|
defaultImagePullingStuckTimeout = 1 * time.Minute
|
||||||
)
|
)
|
||||||
|
|
||||||
// newKubeDockerClient creates an kubeDockerClient from an existing docker client.
|
// newKubeDockerClient creates an kubeDockerClient from an existing docker client.
|
||||||
@ -203,55 +209,76 @@ type progress struct {
|
|||||||
sync.RWMutex
|
sync.RWMutex
|
||||||
// message stores the latest docker json message.
|
// message stores the latest docker json message.
|
||||||
message *dockermessage.JSONMessage
|
message *dockermessage.JSONMessage
|
||||||
|
// timestamp of the latest update.
|
||||||
|
timestamp time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
func newProgress() *progress {
|
||||||
|
return &progress{timestamp: time.Now()}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *progress) set(msg *dockermessage.JSONMessage) {
|
func (p *progress) set(msg *dockermessage.JSONMessage) {
|
||||||
p.Lock()
|
p.Lock()
|
||||||
defer p.Unlock()
|
defer p.Unlock()
|
||||||
p.message = msg
|
p.message = msg
|
||||||
|
p.timestamp = time.Now()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *progress) get() string {
|
func (p *progress) get() (string, time.Time) {
|
||||||
p.RLock()
|
p.RLock()
|
||||||
defer p.RUnlock()
|
defer p.RUnlock()
|
||||||
if p.message == nil {
|
if p.message == nil {
|
||||||
return "No progress"
|
return "No progress", p.timestamp
|
||||||
}
|
}
|
||||||
|
// The following code is based on JSONMessage.Display
|
||||||
var prefix string
|
var prefix string
|
||||||
if p.message.ID != "" {
|
if p.message.ID != "" {
|
||||||
prefix = fmt.Sprintf("%s: ", p.message.ID)
|
prefix = fmt.Sprintf("%s: ", p.message.ID)
|
||||||
}
|
}
|
||||||
if p.message.Progress == nil {
|
if p.message.Progress == nil {
|
||||||
return fmt.Sprintf("%s%s", prefix, p.message.Status)
|
return fmt.Sprintf("%s%s", prefix, p.message.Status), p.timestamp
|
||||||
}
|
}
|
||||||
return fmt.Sprintf("%s%s %s", prefix, p.message.Status, p.message.Progress.String())
|
return fmt.Sprintf("%s%s %s", prefix, p.message.Status, p.message.Progress.String()), p.timestamp
|
||||||
}
|
}
|
||||||
|
|
||||||
// progressReporter keeps the newest image pulling progress and periodically report the newest progress.
|
// progressReporter keeps the newest image pulling progress and periodically report the newest progress.
|
||||||
type progressReporter struct {
|
type progressReporter struct {
|
||||||
progress
|
*progress
|
||||||
image string
|
image string
|
||||||
interval time.Duration
|
cancel context.CancelFunc
|
||||||
stopCh chan struct{}
|
stopCh chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// newProgressReporter creates a new progressReporter for specific image with specified reporting interval
|
// newProgressReporter creates a new progressReporter for specific image with specified reporting interval
|
||||||
func newProgressReporter(image string, interval time.Duration) *progressReporter {
|
func newProgressReporter(image string, cancel context.CancelFunc) *progressReporter {
|
||||||
return &progressReporter{image: image, interval: interval, stopCh: make(chan struct{})}
|
return &progressReporter{
|
||||||
|
progress: newProgress(),
|
||||||
|
image: image,
|
||||||
|
cancel: cancel,
|
||||||
|
stopCh: make(chan struct{}),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// start starts the progressReporter
|
// start starts the progressReporter
|
||||||
func (p *progressReporter) start() {
|
func (p *progressReporter) start() {
|
||||||
go func() {
|
go func() {
|
||||||
ticker := time.NewTicker(p.interval)
|
ticker := time.NewTicker(defaultImagePullingProgressReportInterval)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
for {
|
for {
|
||||||
// TODO(random-liu): Report as events.
|
// TODO(random-liu): Report as events.
|
||||||
select {
|
select {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
glog.V(2).Infof("Pulling image %q: %q", p.image, p.progress.get())
|
progress, timestamp := p.progress.get()
|
||||||
|
// If there is no progress for defaultImagePullingStuckTimeout, cancel the operation.
|
||||||
|
if time.Now().Sub(timestamp) > defaultImagePullingStuckTimeout {
|
||||||
|
glog.Errorf("Cancel pulling image %q because of no progress for %v, latest progress: %q", p.image, defaultImagePullingStuckTimeout, progress)
|
||||||
|
p.cancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
glog.V(2).Infof("Pulling image %q: %q", p.image, progress)
|
||||||
case <-p.stopCh:
|
case <-p.stopCh:
|
||||||
glog.V(2).Infof("Stop pulling image %q: %q", p.image, p.progress.get())
|
progress, _ := p.progress.get()
|
||||||
|
glog.V(2).Infof("Stop pulling image %q: %q", p.image, progress)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -270,14 +297,14 @@ func (d *kubeDockerClient) PullImage(image string, auth dockertypes.AuthConfig,
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
opts.RegistryAuth = base64Auth
|
opts.RegistryAuth = base64Auth
|
||||||
// Don't set timeout for the context because image pulling can be
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
// take an arbitrarily long time.
|
defer cancel()
|
||||||
resp, err := d.client.ImagePull(context.Background(), image, opts)
|
resp, err := d.client.ImagePull(ctx, image, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer resp.Close()
|
defer resp.Close()
|
||||||
reporter := newProgressReporter(image, defaultImagePullingProgressReportInterval)
|
reporter := newProgressReporter(image, cancel)
|
||||||
reporter.start()
|
reporter.start()
|
||||||
defer reporter.stop()
|
defer reporter.stop()
|
||||||
decoder := json.NewDecoder(resp)
|
decoder := json.NewDecoder(resp)
|
||||||
|
Loading…
Reference in New Issue
Block a user