From 151d0ab0c16b45a997e30fa872ded29490228f0e Mon Sep 17 00:00:00 2001 From: Random-Liu Date: Tue, 24 May 2016 00:58:25 -0700 Subject: [PATCH] Periodically reporing image pulling progress in log --- pkg/kubelet/dockertools/kube_docker_client.go | 76 ++++++++++++++++++- 1 file changed, 75 insertions(+), 1 deletion(-) diff --git a/pkg/kubelet/dockertools/kube_docker_client.go b/pkg/kubelet/dockertools/kube_docker_client.go index 6e440b25279..943954f89e9 100644 --- a/pkg/kubelet/dockertools/kube_docker_client.go +++ b/pkg/kubelet/dockertools/kube_docker_client.go @@ -23,8 +23,11 @@ import ( "fmt" "io" "io/ioutil" + "sync" "time" + "github.com/golang/glog" + dockermessage "github.com/docker/docker/pkg/jsonmessage" dockerstdcopy "github.com/docker/docker/pkg/stdcopy" dockerapi "github.com/docker/engine-api/client" @@ -58,6 +61,9 @@ const ( // defaultShmSize is the default ShmSize to use (in bytes) if not specified. defaultShmSize = int64(1024 * 1024 * 64) + + // defaultImagePullingProgressReportInterval is the default interval of image pulling progress reporting. + defaultImagePullingProgressReportInterval = 10 * time.Second ) // newKubeDockerClient creates an kubeDockerClient from an existing docker client. @@ -192,6 +198,71 @@ func base64EncodeAuth(auth dockertypes.AuthConfig) (string, error) { return base64.URLEncoding.EncodeToString(buf.Bytes()), nil } +// progress is a wrapper of dockermessage.JSONMessage with a lock protecting it. +type progress struct { + sync.RWMutex + // message stores the latest docker json message. + message *dockermessage.JSONMessage +} + +func (p *progress) set(msg *dockermessage.JSONMessage) { + p.Lock() + defer p.Unlock() + p.message = msg +} + +func (p *progress) get() string { + p.RLock() + defer p.RUnlock() + if p.message == nil { + return "No progress" + } + var prefix string + if p.message.ID != "" { + prefix = fmt.Sprintf("%s: ", p.message.ID) + } + if p.message.Progress == nil { + return fmt.Sprintf("%s%s", prefix, p.message.Status) + } + return fmt.Sprintf("%s%s %s", prefix, p.message.Status, p.message.Progress.String()) +} + +// progressReporter keeps the newest image pulling progress and periodically report the newest progress. +type progressReporter struct { + progress + image string + interval time.Duration + stopCh chan struct{} +} + +// newProgressReporter creates a new progressReporter for specific image with specified reporting interval +func newProgressReporter(image string, interval time.Duration) *progressReporter { + return &progressReporter{image: image, interval: interval, stopCh: make(chan struct{})} +} + +// start starts the progressReporter +func (p *progressReporter) start() { + go func() { + ticker := time.NewTicker(p.interval) + defer ticker.Stop() + for { + // TODO(random-liu): Report as events. + select { + case <-ticker.C: + glog.V(2).Infof("Pulling image %q: %q", p.image, p.progress.get()) + case <-p.stopCh: + glog.V(2).Infof("Stop pulling image %q: %q", p.image, p.progress.get()) + return + } + } + }() +} + +// stop stops the progressReporter +func (p *progressReporter) stop() { + close(p.stopCh) +} + func (d *kubeDockerClient) PullImage(image string, auth dockertypes.AuthConfig, opts dockertypes.ImagePullOptions) error { // RegistryAuth is the base64 encoded credentials for the registry base64Auth, err := base64EncodeAuth(auth) @@ -209,7 +280,9 @@ func (d *kubeDockerClient) PullImage(image string, auth dockertypes.AuthConfig, return err } defer resp.Close() - // TODO(random-liu): Use the image pulling progress information. + reporter := newProgressReporter(image, defaultImagePullingProgressReportInterval) + reporter.start() + defer reporter.stop() decoder := json.NewDecoder(resp) for { var msg dockermessage.JSONMessage @@ -223,6 +296,7 @@ func (d *kubeDockerClient) PullImage(image string, auth dockertypes.AuthConfig, if msg.Error != nil { return msg.Error } + reporter.set(&msg) } return nil }