From 49c8683c24fad4f7e182d979a3bba606cad20f21 Mon Sep 17 00:00:00 2001 From: Random-Liu Date: Wed, 1 Jun 2016 18:59:46 -0700 Subject: [PATCH] Add timeout for image pulling --- pkg/kubelet/dockertools/kube_docker_client.go | 61 +++++++++++++------ 1 file changed, 44 insertions(+), 17 deletions(-) diff --git a/pkg/kubelet/dockertools/kube_docker_client.go b/pkg/kubelet/dockertools/kube_docker_client.go index 8a0905269a6..29123a83b78 100644 --- a/pkg/kubelet/dockertools/kube_docker_client.go +++ b/pkg/kubelet/dockertools/kube_docker_client.go @@ -64,6 +64,12 @@ const ( // defaultImagePullingProgressReportInterval is the default interval of image pulling progress reporting. defaultImagePullingProgressReportInterval = 10 * time.Second + + // defaultImagePullingStuckTimeout is the default timeout for image pulling stuck. If no progress + // is made for defaultImagePullingStuckTimeout, the image pulling will be cancelled. + // Docker reports image progress for every 512kB block, so normally there shouldn't be too long interval + // between progress updates. + defaultImagePullingStuckTimeout = 1 * time.Minute ) // newKubeDockerClient creates an kubeDockerClient from an existing docker client. @@ -203,55 +209,76 @@ type progress struct { sync.RWMutex // message stores the latest docker json message. message *dockermessage.JSONMessage + // timestamp of the latest update. + timestamp time.Time +} + +func newProgress() *progress { + return &progress{timestamp: time.Now()} } func (p *progress) set(msg *dockermessage.JSONMessage) { p.Lock() defer p.Unlock() p.message = msg + p.timestamp = time.Now() } -func (p *progress) get() string { +func (p *progress) get() (string, time.Time) { p.RLock() defer p.RUnlock() if p.message == nil { - return "No progress" + return "No progress", p.timestamp } + // The following code is based on JSONMessage.Display var prefix string if p.message.ID != "" { prefix = fmt.Sprintf("%s: ", p.message.ID) } if p.message.Progress == nil { - return fmt.Sprintf("%s%s", prefix, p.message.Status) + return fmt.Sprintf("%s%s", prefix, p.message.Status), p.timestamp } - return fmt.Sprintf("%s%s %s", prefix, p.message.Status, p.message.Progress.String()) + return fmt.Sprintf("%s%s %s", prefix, p.message.Status, p.message.Progress.String()), p.timestamp } // progressReporter keeps the newest image pulling progress and periodically report the newest progress. type progressReporter struct { - progress - image string - interval time.Duration - stopCh chan struct{} + *progress + image string + cancel context.CancelFunc + stopCh chan struct{} } // newProgressReporter creates a new progressReporter for specific image with specified reporting interval -func newProgressReporter(image string, interval time.Duration) *progressReporter { - return &progressReporter{image: image, interval: interval, stopCh: make(chan struct{})} +func newProgressReporter(image string, cancel context.CancelFunc) *progressReporter { + return &progressReporter{ + progress: newProgress(), + image: image, + cancel: cancel, + stopCh: make(chan struct{}), + } } // start starts the progressReporter func (p *progressReporter) start() { go func() { - ticker := time.NewTicker(p.interval) + ticker := time.NewTicker(defaultImagePullingProgressReportInterval) defer ticker.Stop() for { // TODO(random-liu): Report as events. select { case <-ticker.C: - glog.V(2).Infof("Pulling image %q: %q", p.image, p.progress.get()) + progress, timestamp := p.progress.get() + // If there is no progress for defaultImagePullingStuckTimeout, cancel the operation. + if time.Now().Sub(timestamp) > defaultImagePullingStuckTimeout { + glog.Errorf("Cancel pulling image %q because of no progress for %v, latest progress: %q", p.image, defaultImagePullingStuckTimeout, progress) + p.cancel() + return + } + glog.V(2).Infof("Pulling image %q: %q", p.image, progress) case <-p.stopCh: - glog.V(2).Infof("Stop pulling image %q: %q", p.image, p.progress.get()) + progress, _ := p.progress.get() + glog.V(2).Infof("Stop pulling image %q: %q", p.image, progress) return } } @@ -270,14 +297,14 @@ func (d *kubeDockerClient) PullImage(image string, auth dockertypes.AuthConfig, return err } opts.RegistryAuth = base64Auth - // Don't set timeout for the context because image pulling can be - // take an arbitrarily long time. - resp, err := d.client.ImagePull(context.Background(), image, opts) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + resp, err := d.client.ImagePull(ctx, image, opts) if err != nil { return err } defer resp.Close() - reporter := newProgressReporter(image, defaultImagePullingProgressReportInterval) + reporter := newProgressReporter(image, cancel) reporter.start() defer reporter.stop() decoder := json.NewDecoder(resp)