Add timeout for image pulling

This commit is contained in:
Random-Liu 2016-06-01 18:59:46 -07:00
parent 06f742a5ea
commit 49c8683c24

View File

@ -64,6 +64,12 @@ const (
// defaultImagePullingProgressReportInterval is the default interval of image pulling progress reporting. // defaultImagePullingProgressReportInterval is the default interval of image pulling progress reporting.
defaultImagePullingProgressReportInterval = 10 * time.Second defaultImagePullingProgressReportInterval = 10 * time.Second
// defaultImagePullingStuckTimeout is the default timeout for image pulling stuck. If no progress
// is made for defaultImagePullingStuckTimeout, the image pulling will be cancelled.
// Docker reports image progress for every 512kB block, so normally there shouldn't be too long interval
// between progress updates.
defaultImagePullingStuckTimeout = 1 * time.Minute
) )
// newKubeDockerClient creates an kubeDockerClient from an existing docker client. // newKubeDockerClient creates an kubeDockerClient from an existing docker client.
@ -203,55 +209,76 @@ type progress struct {
sync.RWMutex sync.RWMutex
// message stores the latest docker json message. // message stores the latest docker json message.
message *dockermessage.JSONMessage message *dockermessage.JSONMessage
// timestamp of the latest update.
timestamp time.Time
}
func newProgress() *progress {
return &progress{timestamp: time.Now()}
} }
func (p *progress) set(msg *dockermessage.JSONMessage) { func (p *progress) set(msg *dockermessage.JSONMessage) {
p.Lock() p.Lock()
defer p.Unlock() defer p.Unlock()
p.message = msg p.message = msg
p.timestamp = time.Now()
} }
func (p *progress) get() string { func (p *progress) get() (string, time.Time) {
p.RLock() p.RLock()
defer p.RUnlock() defer p.RUnlock()
if p.message == nil { if p.message == nil {
return "No progress" return "No progress", p.timestamp
} }
// The following code is based on JSONMessage.Display
var prefix string var prefix string
if p.message.ID != "" { if p.message.ID != "" {
prefix = fmt.Sprintf("%s: ", p.message.ID) prefix = fmt.Sprintf("%s: ", p.message.ID)
} }
if p.message.Progress == nil { if p.message.Progress == nil {
return fmt.Sprintf("%s%s", prefix, p.message.Status) return fmt.Sprintf("%s%s", prefix, p.message.Status), p.timestamp
} }
return fmt.Sprintf("%s%s %s", prefix, p.message.Status, p.message.Progress.String()) return fmt.Sprintf("%s%s %s", prefix, p.message.Status, p.message.Progress.String()), p.timestamp
} }
// progressReporter keeps the newest image pulling progress and periodically report the newest progress. // progressReporter keeps the newest image pulling progress and periodically report the newest progress.
type progressReporter struct { type progressReporter struct {
progress *progress
image string image string
interval time.Duration cancel context.CancelFunc
stopCh chan struct{} stopCh chan struct{}
} }
// newProgressReporter creates a new progressReporter for specific image with specified reporting interval // newProgressReporter creates a new progressReporter for specific image with specified reporting interval
func newProgressReporter(image string, interval time.Duration) *progressReporter { func newProgressReporter(image string, cancel context.CancelFunc) *progressReporter {
return &progressReporter{image: image, interval: interval, stopCh: make(chan struct{})} return &progressReporter{
progress: newProgress(),
image: image,
cancel: cancel,
stopCh: make(chan struct{}),
}
} }
// start starts the progressReporter // start starts the progressReporter
func (p *progressReporter) start() { func (p *progressReporter) start() {
go func() { go func() {
ticker := time.NewTicker(p.interval) ticker := time.NewTicker(defaultImagePullingProgressReportInterval)
defer ticker.Stop() defer ticker.Stop()
for { for {
// TODO(random-liu): Report as events. // TODO(random-liu): Report as events.
select { select {
case <-ticker.C: case <-ticker.C:
glog.V(2).Infof("Pulling image %q: %q", p.image, p.progress.get()) progress, timestamp := p.progress.get()
// If there is no progress for defaultImagePullingStuckTimeout, cancel the operation.
if time.Now().Sub(timestamp) > defaultImagePullingStuckTimeout {
glog.Errorf("Cancel pulling image %q because of no progress for %v, latest progress: %q", p.image, defaultImagePullingStuckTimeout, progress)
p.cancel()
return
}
glog.V(2).Infof("Pulling image %q: %q", p.image, progress)
case <-p.stopCh: case <-p.stopCh:
glog.V(2).Infof("Stop pulling image %q: %q", p.image, p.progress.get()) progress, _ := p.progress.get()
glog.V(2).Infof("Stop pulling image %q: %q", p.image, progress)
return return
} }
} }
@ -270,14 +297,14 @@ func (d *kubeDockerClient) PullImage(image string, auth dockertypes.AuthConfig,
return err return err
} }
opts.RegistryAuth = base64Auth opts.RegistryAuth = base64Auth
// Don't set timeout for the context because image pulling can be ctx, cancel := context.WithCancel(context.Background())
// take an arbitrarily long time. defer cancel()
resp, err := d.client.ImagePull(context.Background(), image, opts) resp, err := d.client.ImagePull(ctx, image, opts)
if err != nil { if err != nil {
return err return err
} }
defer resp.Close() defer resp.Close()
reporter := newProgressReporter(image, defaultImagePullingProgressReportInterval) reporter := newProgressReporter(image, cancel)
reporter.start() reporter.start()
defer reporter.stop() defer reporter.stop()
decoder := json.NewDecoder(resp) decoder := json.NewDecoder(resp)