diff --git a/pkg/kubelet/container/runtime.go b/pkg/kubelet/container/runtime.go index 224a644ebfd..ea316b7821a 100644 --- a/pkg/kubelet/container/runtime.go +++ b/pkg/kubelet/container/runtime.go @@ -142,13 +142,6 @@ type ContainerCommandRunner interface { PortForward(pod *Pod, port uint16, stream io.ReadWriteCloser) error } -// ImagePuller wraps Runtime.PullImage() to pull a container image. -// It will check the presence of the image, and report the 'image pulling', -// 'image pulled' events correspondingly. -type ImagePuller interface { - PullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) -} - // Pod is a group of containers. type Pod struct { // The ID of the pod, which can be used to retrieve a particular pod diff --git a/pkg/kubelet/dockertools/docker_manager.go b/pkg/kubelet/dockertools/docker_manager.go index 3f6f891cadb..f699a5deade 100644 --- a/pkg/kubelet/dockertools/docker_manager.go +++ b/pkg/kubelet/dockertools/docker_manager.go @@ -46,6 +46,7 @@ import ( "k8s.io/kubernetes/pkg/client/record" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/events" + "k8s.io/kubernetes/pkg/kubelet/images" "k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/metrics" "k8s.io/kubernetes/pkg/kubelet/network" @@ -128,7 +129,7 @@ type DockerManager struct { dockerPuller DockerPuller // wrapped image puller. - imagePuller kubecontainer.ImagePuller + imagePuller images.ImageManager // Root of the Docker runtime. dockerRoot string @@ -261,11 +262,7 @@ func NewDockerManager( seccompProfileRoot: seccompProfileRoot, } dm.runner = lifecycle.NewHandlerRunner(httpClient, dm, dm) - if serializeImagePulls { - dm.imagePuller = kubecontainer.NewSerializedImagePuller(kubecontainer.FilterEventRecorder(recorder), dm, imageBackOff) - } else { - dm.imagePuller = kubecontainer.NewImagePuller(kubecontainer.FilterEventRecorder(recorder), dm, imageBackOff) - } + dm.imagePuller = images.NewImageManager(kubecontainer.FilterEventRecorder(recorder), dm, imageBackOff, serializeImagePulls) dm.containerGC = NewContainerGC(client, podGetter, containerLogsDir) dm.versionCache = cache.NewObjectCache( @@ -1718,7 +1715,7 @@ func (dm *DockerManager) createPodInfraContainer(pod *api.Pod) (kubecontainer.Do // No pod secrets for the infra container. // The message isn't needed for the Infra container - if err, msg := dm.imagePuller.PullImage(pod, container, nil); err != nil { + if err, msg := dm.imagePuller.EnsureImageExists(pod, container, nil); err != nil { return "", err, msg } @@ -2130,7 +2127,7 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, _ api.PodStatus, podStatus *kubec // tryContainerStart attempts to pull and start the container, returning an error and a reason string if the start // was not successful. func (dm *DockerManager) tryContainerStart(container *api.Container, pod *api.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []api.Secret, namespaceMode, pidMode, podIP string) (err error, reason string) { - err, msg := dm.imagePuller.PullImage(pod, container, pullSecrets) + err, msg := dm.imagePuller.EnsureImageExists(pod, container, pullSecrets) if err != nil { return err, msg } diff --git a/pkg/kubelet/images/doc.go b/pkg/kubelet/images/doc.go new file mode 100644 index 00000000000..9ccecb6b7a7 --- /dev/null +++ b/pkg/kubelet/images/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package images is responsible for managing lifecycle of container images. +package images diff --git a/pkg/kubelet/images/impl.go b/pkg/kubelet/images/impl.go new file mode 100644 index 00000000000..4a00a2e36f8 --- /dev/null +++ b/pkg/kubelet/images/impl.go @@ -0,0 +1,52 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package images + +import ( + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/record" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/util/flowcontrol" +) + +type imageManager struct { + recorder record.EventRecorder + runtime kubecontainer.Runtime + backOff *flowcontrol.Backoff + imagePuller imagePuller +} + +var _ ImageManager = &imageManager{} + +func NewImageManager(recorder record.EventRecorder, runtime kubecontainer.Runtime, imageBackOff *flowcontrol.Backoff, serialized bool) ImageManager { + var imagePuller imagePuller + if serialized { + imagePuller = newSerializedImagePuller(recorder, runtime, imageBackOff) + } else { + imagePuller = newParallelImagePuller(recorder, runtime, imageBackOff) + } + return &imageManager{ + recorder: recorder, + runtime: runtime, + backOff: imageBackOff, + imagePuller: imagePuller, + } +} + +func (im *imageManager) EnsureImageExists(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) { + return im.imagePuller.pullImage(pod, container, pullSecrets) +} diff --git a/pkg/kubelet/container/image_puller.go b/pkg/kubelet/images/parallel_image_puller.go similarity index 79% rename from pkg/kubelet/container/image_puller.go rename to pkg/kubelet/images/parallel_image_puller.go index d09b1685e45..c2fe7b16f36 100644 --- a/pkg/kubelet/container/image_puller.go +++ b/pkg/kubelet/images/parallel_image_puller.go @@ -1,5 +1,5 @@ /* -Copyright 2015 The Kubernetes Authors. +Copyright 2016 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package container +package images import ( "fmt" @@ -22,6 +22,7 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/record" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/events" "k8s.io/kubernetes/pkg/util/flowcontrol" ) @@ -29,19 +30,19 @@ import ( // imagePuller pulls the image using Runtime.PullImage(). // It will check the presence of the image, and report the 'image pulling', // 'image pulled' events correspondingly. -type imagePuller struct { +type parallelImagePuller struct { recorder record.EventRecorder - runtime Runtime + runtime kubecontainer.Runtime backOff *flowcontrol.Backoff } // enforce compatibility. -var _ ImagePuller = &imagePuller{} +var _ imagePuller = ¶llelImagePuller{} // NewImagePuller takes an event recorder and container runtime to create a // image puller that wraps the container runtime's PullImage interface. -func NewImagePuller(recorder record.EventRecorder, runtime Runtime, imageBackOff *flowcontrol.Backoff) ImagePuller { - return &imagePuller{ +func newParallelImagePuller(recorder record.EventRecorder, runtime kubecontainer.Runtime, imageBackOff *flowcontrol.Backoff) imagePuller { + return ¶llelImagePuller{ recorder: recorder, runtime: runtime, backOff: imageBackOff, @@ -64,7 +65,7 @@ func shouldPullImage(container *api.Container, imagePresent bool) bool { } // records an event using ref, event msg. log to glog using prefix, msg, logFn -func (puller *imagePuller) logIt(ref *api.ObjectReference, eventtype, event, prefix, msg string, logFn func(args ...interface{})) { +func (puller *parallelImagePuller) logIt(ref *api.ObjectReference, eventtype, event, prefix, msg string, logFn func(args ...interface{})) { if ref != nil { puller.recorder.Event(ref, eventtype, event, msg) } else { @@ -73,19 +74,19 @@ func (puller *imagePuller) logIt(ref *api.ObjectReference, eventtype, event, pre } // PullImage pulls the image for the specified pod and container. -func (puller *imagePuller) PullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) { +func (puller *parallelImagePuller) pullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) { logPrefix := fmt.Sprintf("%s/%s", pod.Name, container.Image) - ref, err := GenerateContainerRef(pod, container) + ref, err := kubecontainer.GenerateContainerRef(pod, container) if err != nil { glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err) } - spec := ImageSpec{container.Image} + spec := kubecontainer.ImageSpec{Image: container.Image} present, err := puller.runtime.IsImagePresent(spec) if err != nil { msg := fmt.Sprintf("Failed to inspect image %q: %v", container.Image, err) puller.logIt(ref, api.EventTypeWarning, events.FailedToInspectImage, logPrefix, msg, glog.Warning) - return ErrImageInspect, msg + return kubecontainer.ErrImageInspect, msg } if !shouldPullImage(container, present) { @@ -96,7 +97,7 @@ func (puller *imagePuller) PullImage(pod *api.Pod, container *api.Container, pul } else { msg := fmt.Sprintf("Container image %q is not present with pull policy of Never", container.Image) puller.logIt(ref, api.EventTypeWarning, events.ErrImageNeverPullPolicy, logPrefix, msg, glog.Warning) - return ErrImageNeverPull, msg + return kubecontainer.ErrImageNeverPull, msg } } @@ -104,17 +105,17 @@ func (puller *imagePuller) PullImage(pod *api.Pod, container *api.Container, pul if puller.backOff.IsInBackOffSinceUpdate(backOffKey, puller.backOff.Clock.Now()) { msg := fmt.Sprintf("Back-off pulling image %q", container.Image) puller.logIt(ref, api.EventTypeNormal, events.BackOffPullImage, logPrefix, msg, glog.Info) - return ErrImagePullBackOff, msg + return kubecontainer.ErrImagePullBackOff, msg } puller.logIt(ref, api.EventTypeNormal, events.PullingImage, logPrefix, fmt.Sprintf("pulling image %q", container.Image), glog.Info) if err := puller.runtime.PullImage(spec, pullSecrets); err != nil { puller.logIt(ref, api.EventTypeWarning, events.FailedToPullImage, logPrefix, fmt.Sprintf("Failed to pull image %q: %v", container.Image, err), glog.Warning) puller.backOff.Next(backOffKey, puller.backOff.Clock.Now()) - if err == RegistryUnavailable { + if err == kubecontainer.RegistryUnavailable { msg := fmt.Sprintf("image pull failed for %s because the registry is unavailable.", container.Image) return err, msg } else { - return ErrImagePull, err.Error() + return kubecontainer.ErrImagePull, err.Error() } } puller.logIt(ref, api.EventTypeNormal, events.PulledImage, logPrefix, fmt.Sprintf("Successfully pulled image %q", container.Image), glog.Info) diff --git a/pkg/kubelet/container/image_puller_test.go b/pkg/kubelet/images/parallel_image_puller_test.go similarity index 96% rename from pkg/kubelet/container/image_puller_test.go rename to pkg/kubelet/images/parallel_image_puller_test.go index a18b8c57aaa..c8957cc3261 100644 --- a/pkg/kubelet/container/image_puller_test.go +++ b/pkg/kubelet/images/parallel_image_puller_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package container_test +package images import ( "errors" @@ -106,7 +106,7 @@ func TestPuller(t *testing.T) { fakeRuntime := &ctest.FakeRuntime{} fakeRecorder := &record.FakeRecorder{} - puller := NewImagePuller(fakeRecorder, fakeRuntime, backOff) + puller := newParallelImagePuller(fakeRecorder, fakeRuntime, backOff) fakeRuntime.ImageList = []Image{{"present_image", nil, nil, 1}} fakeRuntime.Err = c.pullerErr @@ -114,7 +114,7 @@ func TestPuller(t *testing.T) { for tick, expected := range c.expectedErr { fakeClock.Step(time.Second) - err, _ := puller.PullImage(pod, container, nil) + err, _ := puller.pullImage(pod, container, nil) fakeRuntime.AssertCalls(c.calledFunctions) assert.Equal(t, expected, err, "in test %d tick=%d", i, tick) } diff --git a/pkg/kubelet/container/serialized_image_puller.go b/pkg/kubelet/images/serialized_image_puller.go similarity index 85% rename from pkg/kubelet/container/serialized_image_puller.go rename to pkg/kubelet/images/serialized_image_puller.go index 6049f96ce92..98cc6d06e3b 100644 --- a/pkg/kubelet/container/serialized_image_puller.go +++ b/pkg/kubelet/images/serialized_image_puller.go @@ -1,5 +1,5 @@ /* -Copyright 2015 The Kubernetes Authors. +Copyright 2016 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package container +package images import ( "fmt" @@ -23,13 +23,14 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/record" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/events" "k8s.io/kubernetes/pkg/util/flowcontrol" "k8s.io/kubernetes/pkg/util/wait" ) type imagePullRequest struct { - spec ImageSpec + spec kubecontainer.ImageSpec container *api.Container pullSecrets []api.Secret logPrefix string @@ -42,19 +43,19 @@ type imagePullRequest struct { // 'image pulled' events correspondingly. type serializedImagePuller struct { recorder record.EventRecorder - runtime Runtime + runtime kubecontainer.Runtime backOff *flowcontrol.Backoff pullRequests chan *imagePullRequest } // enforce compatibility. -var _ ImagePuller = &serializedImagePuller{} +var _ imagePuller = &serializedImagePuller{} // NewSerializedImagePuller takes an event recorder and container runtime to create a // image puller that wraps the container runtime's PullImage interface. // Pulls one image at a time. // Issue #10959 has the rationale behind serializing image pulls. -func NewSerializedImagePuller(recorder record.EventRecorder, runtime Runtime, imageBackOff *flowcontrol.Backoff) ImagePuller { +func newSerializedImagePuller(recorder record.EventRecorder, runtime kubecontainer.Runtime, imageBackOff *flowcontrol.Backoff) imagePuller { imagePuller := &serializedImagePuller{ recorder: recorder, runtime: runtime, @@ -75,19 +76,19 @@ func (puller *serializedImagePuller) logIt(ref *api.ObjectReference, eventtype, } // PullImage pulls the image for the specified pod and container. -func (puller *serializedImagePuller) PullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) { +func (puller *serializedImagePuller) pullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) { logPrefix := fmt.Sprintf("%s/%s", pod.Name, container.Image) - ref, err := GenerateContainerRef(pod, container) + ref, err := kubecontainer.GenerateContainerRef(pod, container) if err != nil { glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err) } - spec := ImageSpec{container.Image} + spec := kubecontainer.ImageSpec{Image: container.Image} present, err := puller.runtime.IsImagePresent(spec) if err != nil { msg := fmt.Sprintf("Failed to inspect image %q: %v", container.Image, err) puller.logIt(ref, api.EventTypeWarning, events.FailedToInspectImage, logPrefix, msg, glog.Warning) - return ErrImageInspect, msg + return kubecontainer.ErrImageInspect, msg } if !shouldPullImage(container, present) { @@ -98,7 +99,7 @@ func (puller *serializedImagePuller) PullImage(pod *api.Pod, container *api.Cont } else { msg := fmt.Sprintf("Container image %q is not present with pull policy of Never", container.Image) puller.logIt(ref, api.EventTypeWarning, events.ErrImageNeverPullPolicy, logPrefix, msg, glog.Warning) - return ErrImageNeverPull, msg + return kubecontainer.ErrImageNeverPull, msg } } @@ -106,7 +107,7 @@ func (puller *serializedImagePuller) PullImage(pod *api.Pod, container *api.Cont if puller.backOff.IsInBackOffSinceUpdate(backOffKey, puller.backOff.Clock.Now()) { msg := fmt.Sprintf("Back-off pulling image %q", container.Image) puller.logIt(ref, api.EventTypeNormal, events.BackOffPullImage, logPrefix, msg, glog.Info) - return ErrImagePullBackOff, msg + return kubecontainer.ErrImagePullBackOff, msg } // enqueue image pull request and wait for response. @@ -122,11 +123,11 @@ func (puller *serializedImagePuller) PullImage(pod *api.Pod, container *api.Cont if err = <-returnChan; err != nil { puller.logIt(ref, api.EventTypeWarning, events.FailedToPullImage, logPrefix, fmt.Sprintf("Failed to pull image %q: %v", container.Image, err), glog.Warning) puller.backOff.Next(backOffKey, puller.backOff.Clock.Now()) - if err == RegistryUnavailable { + if err == kubecontainer.RegistryUnavailable { msg := fmt.Sprintf("image pull failed for %s because the registry is unavailable.", container.Image) return err, msg } else { - return ErrImagePull, err.Error() + return kubecontainer.ErrImagePull, err.Error() } } puller.logIt(ref, api.EventTypeNormal, events.PulledImage, logPrefix, fmt.Sprintf("Successfully pulled image %q", container.Image), glog.Info) diff --git a/pkg/kubelet/container/serialized_image_puller_test.go b/pkg/kubelet/images/serialized_image_puller_test.go similarity index 96% rename from pkg/kubelet/container/serialized_image_puller_test.go rename to pkg/kubelet/images/serialized_image_puller_test.go index f7830665ee7..182e1270592 100644 --- a/pkg/kubelet/container/serialized_image_puller_test.go +++ b/pkg/kubelet/images/serialized_image_puller_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package container_test +package images import ( "errors" @@ -106,7 +106,7 @@ func TestSerializedPuller(t *testing.T) { fakeRuntime := &ctest.FakeRuntime{} fakeRecorder := &record.FakeRecorder{} - puller := NewSerializedImagePuller(fakeRecorder, fakeRuntime, backOff) + puller := newSerializedImagePuller(fakeRecorder, fakeRuntime, backOff) fakeRuntime.ImageList = []Image{{"present_image", nil, nil, 0}} fakeRuntime.Err = c.pullerErr @@ -114,7 +114,7 @@ func TestSerializedPuller(t *testing.T) { for tick, expected := range c.expectedErr { fakeClock.Step(time.Second) - err, _ := puller.PullImage(pod, container, nil) + err, _ := puller.pullImage(pod, container, nil) fakeRuntime.AssertCalls(c.calledFunctions) assert.Equal(t, expected, err, "in test %d tick=%d", i, tick) } diff --git a/pkg/kubelet/images/types.go b/pkg/kubelet/images/types.go new file mode 100644 index 00000000000..bc2334f42c2 --- /dev/null +++ b/pkg/kubelet/images/types.go @@ -0,0 +1,38 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package images + +import "k8s.io/kubernetes/pkg/api" + +// ImageManager provides an interface to manage the lifecycle of images. +// Implementations of this interface are expected to deal with pulling (downloading), +// managing, and deleting container images. +// Implementations are expected to abstract the underlying runtimes. +// Implementations are expected to be thread safe. +type ImageManager interface { + // EnsureImageExists ensures that image specified in `container` exists. + EnsureImageExists(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) + + // TODO(ronl): consolidating image managing and deleting operation in this interface +} + +// ImagePuller wraps Runtime.PullImage() to pull a container image. +// It will check the presence of the image, and report the 'image pulling', +// 'image pulled' events correspondingly. +type imagePuller interface { + pullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) +} diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go index 2cfbe2e0d80..593b6a07f9a 100644 --- a/pkg/kubelet/rkt/rkt.go +++ b/pkg/kubelet/rkt/rkt.go @@ -46,6 +46,7 @@ import ( "k8s.io/kubernetes/pkg/credentialprovider" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/events" + "k8s.io/kubernetes/pkg/kubelet/images" "k8s.io/kubernetes/pkg/kubelet/leaky" "k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/network" @@ -152,7 +153,7 @@ type Runtime struct { runtimeHelper kubecontainer.RuntimeHelper recorder record.EventRecorder livenessManager proberesults.Manager - imagePuller kubecontainer.ImagePuller + imagePuller images.ImageManager runner kubecontainer.HandlerRunner execer utilexec.Interface os kubecontainer.OSInterface @@ -271,11 +272,7 @@ func New( rkt.runner = lifecycle.NewHandlerRunner(httpClient, rkt, rkt) - if serializeImagePulls { - rkt.imagePuller = kubecontainer.NewSerializedImagePuller(recorder, rkt, imageBackOff) - } else { - rkt.imagePuller = kubecontainer.NewImagePuller(recorder, rkt, imageBackOff) - } + rkt.imagePuller = images.NewImageManager(recorder, rkt, imageBackOff, serializeImagePulls) if err := rkt.getVersions(); err != nil { return nil, fmt.Errorf("rkt: error getting version info: %v", err) @@ -753,7 +750,7 @@ func (r *Runtime) newAppcRuntimeApp(pod *api.Pod, podIP string, c api.Container, if requiresPrivileged && !securitycontext.HasPrivilegedRequest(&c) { return fmt.Errorf("cannot make %q: running a custom stage1 requires a privileged security context", format.Pod(pod)) } - if err, _ := r.imagePuller.PullImage(pod, &c, pullSecrets); err != nil { + if err, _ := r.imagePuller.EnsureImageExists(pod, &c, pullSecrets); err != nil { return nil } imgManifest, err := r.getImageManifest(c.Image)