From 367a683273342654d84c94f57cbd09c549ac535f Mon Sep 17 00:00:00 2001 From: Ron Lai Date: Thu, 14 Jul 2016 15:54:02 -0700 Subject: [PATCH] Wrapping image pullers inside ImageManager --- pkg/kubelet/images/doc.go | 3 ++ pkg/kubelet/images/impl.go | 45 ++++++++++++------- pkg/kubelet/images/parallel_image_puller.go | 23 +++++----- .../images/parallel_image_puller_test.go | 6 +-- pkg/kubelet/images/serialized_image_puller.go | 23 +++++----- .../images/serialized_image_puller_test.go | 6 +-- pkg/kubelet/images/types.go | 5 ++- 7 files changed, 65 insertions(+), 46 deletions(-) diff --git a/pkg/kubelet/images/doc.go b/pkg/kubelet/images/doc.go index 69322ec3098..9ccecb6b7a7 100644 --- a/pkg/kubelet/images/doc.go +++ b/pkg/kubelet/images/doc.go @@ -1,9 +1,12 @@ /* Copyright 2016 The Kubernetes Authors. + Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/pkg/kubelet/images/impl.go b/pkg/kubelet/images/impl.go index 2aba1655ea8..4a00a2e36f8 100644 --- a/pkg/kubelet/images/impl.go +++ b/pkg/kubelet/images/impl.go @@ -1,10 +1,12 @@ - /* -Copyright 2016 The Kubernetes Authors All. +Copyright 2016 The Kubernetes Authors. + Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -14,28 +16,37 @@ limitations under the License. package images -type ImageManager struct { - recorder record.EventRecorder - runtime container.Runtime - backOff *flowcontrol.Backoff +import ( + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/record" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/util/flowcontrol" +) + +type imageManager struct { + recorder record.EventRecorder + runtime kubecontainer.Runtime + backOff *flowcontrol.Backoff imagePuller imagePuller } -func NewImageManager(recorder record.EventRecorder, runtime Runtime, imageBackOff *flowcontrol.Backoff, serialized bool) ImageManager { +var _ ImageManager = &imageManager{} + +func NewImageManager(recorder record.EventRecorder, runtime kubecontainer.Runtime, imageBackOff *flowcontrol.Backoff, serialized bool) ImageManager { var imagePuller imagePuller if serialized { - imagePuller = NewSerializedImagePuller(recorder, runtime, imageBackOff) + imagePuller = newSerializedImagePuller(recorder, runtime, imageBackOff) } else { - imagePuller = NewParallelImagePuller(recorder, runtime, imageBackOff) + imagePuller = newParallelImagePuller(recorder, runtime, imageBackOff) } return &imageManager{ - recorder: recorder, - runtime: runtime, - backOff: backOff, - imagePuller: imagePuller, - } + recorder: recorder, + runtime: runtime, + backOff: imageBackOff, + imagePuller: imagePuller, + } } -func (*) EnsureImageExists(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) { - return imagePuller.pullImage(pod , container pullSecrets) -} \ No newline at end of file +func (im *imageManager) EnsureImageExists(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) { + return im.imagePuller.pullImage(pod, container, pullSecrets) +} diff --git a/pkg/kubelet/images/parallel_image_puller.go b/pkg/kubelet/images/parallel_image_puller.go index 1594ccfd876..c2fe7b16f36 100644 --- a/pkg/kubelet/images/parallel_image_puller.go +++ b/pkg/kubelet/images/parallel_image_puller.go @@ -22,6 +22,7 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/record" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/events" "k8s.io/kubernetes/pkg/util/flowcontrol" ) @@ -31,7 +32,7 @@ import ( // 'image pulled' events correspondingly. type parallelImagePuller struct { recorder record.EventRecorder - runtime Runtime + runtime kubecontainer.Runtime backOff *flowcontrol.Backoff } @@ -40,8 +41,8 @@ var _ imagePuller = ¶llelImagePuller{} // NewImagePuller takes an event recorder and container runtime to create a // image puller that wraps the container runtime's PullImage interface. -func NewImagePuller(recorder record.EventRecorder, runtime Runtime, imageBackOff *flowcontrol.Backoff) imagePuller { - return &imagePuller{ +func newParallelImagePuller(recorder record.EventRecorder, runtime kubecontainer.Runtime, imageBackOff *flowcontrol.Backoff) imagePuller { + return ¶llelImagePuller{ recorder: recorder, runtime: runtime, backOff: imageBackOff, @@ -73,19 +74,19 @@ func (puller *parallelImagePuller) logIt(ref *api.ObjectReference, eventtype, ev } // PullImage pulls the image for the specified pod and container. -func (puller *parallelImagePuller) PullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) { +func (puller *parallelImagePuller) pullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) { logPrefix := fmt.Sprintf("%s/%s", pod.Name, container.Image) - ref, err := GenerateContainerRef(pod, container) + ref, err := kubecontainer.GenerateContainerRef(pod, container) if err != nil { glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err) } - spec := ImageSpec{container.Image} + spec := kubecontainer.ImageSpec{Image: container.Image} present, err := puller.runtime.IsImagePresent(spec) if err != nil { msg := fmt.Sprintf("Failed to inspect image %q: %v", container.Image, err) puller.logIt(ref, api.EventTypeWarning, events.FailedToInspectImage, logPrefix, msg, glog.Warning) - return ErrImageInspect, msg + return kubecontainer.ErrImageInspect, msg } if !shouldPullImage(container, present) { @@ -96,7 +97,7 @@ func (puller *parallelImagePuller) PullImage(pod *api.Pod, container *api.Contai } else { msg := fmt.Sprintf("Container image %q is not present with pull policy of Never", container.Image) puller.logIt(ref, api.EventTypeWarning, events.ErrImageNeverPullPolicy, logPrefix, msg, glog.Warning) - return ErrImageNeverPull, msg + return kubecontainer.ErrImageNeverPull, msg } } @@ -104,17 +105,17 @@ func (puller *parallelImagePuller) PullImage(pod *api.Pod, container *api.Contai if puller.backOff.IsInBackOffSinceUpdate(backOffKey, puller.backOff.Clock.Now()) { msg := fmt.Sprintf("Back-off pulling image %q", container.Image) puller.logIt(ref, api.EventTypeNormal, events.BackOffPullImage, logPrefix, msg, glog.Info) - return ErrImagePullBackOff, msg + return kubecontainer.ErrImagePullBackOff, msg } puller.logIt(ref, api.EventTypeNormal, events.PullingImage, logPrefix, fmt.Sprintf("pulling image %q", container.Image), glog.Info) if err := puller.runtime.PullImage(spec, pullSecrets); err != nil { puller.logIt(ref, api.EventTypeWarning, events.FailedToPullImage, logPrefix, fmt.Sprintf("Failed to pull image %q: %v", container.Image, err), glog.Warning) puller.backOff.Next(backOffKey, puller.backOff.Clock.Now()) - if err == RegistryUnavailable { + if err == kubecontainer.RegistryUnavailable { msg := fmt.Sprintf("image pull failed for %s because the registry is unavailable.", container.Image) return err, msg } else { - return ErrImagePull, err.Error() + return kubecontainer.ErrImagePull, err.Error() } } puller.logIt(ref, api.EventTypeNormal, events.PulledImage, logPrefix, fmt.Sprintf("Successfully pulled image %q", container.Image), glog.Info) diff --git a/pkg/kubelet/images/parallel_image_puller_test.go b/pkg/kubelet/images/parallel_image_puller_test.go index a18b8c57aaa..c8957cc3261 100644 --- a/pkg/kubelet/images/parallel_image_puller_test.go +++ b/pkg/kubelet/images/parallel_image_puller_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package container_test +package images import ( "errors" @@ -106,7 +106,7 @@ func TestPuller(t *testing.T) { fakeRuntime := &ctest.FakeRuntime{} fakeRecorder := &record.FakeRecorder{} - puller := NewImagePuller(fakeRecorder, fakeRuntime, backOff) + puller := newParallelImagePuller(fakeRecorder, fakeRuntime, backOff) fakeRuntime.ImageList = []Image{{"present_image", nil, nil, 1}} fakeRuntime.Err = c.pullerErr @@ -114,7 +114,7 @@ func TestPuller(t *testing.T) { for tick, expected := range c.expectedErr { fakeClock.Step(time.Second) - err, _ := puller.PullImage(pod, container, nil) + err, _ := puller.pullImage(pod, container, nil) fakeRuntime.AssertCalls(c.calledFunctions) assert.Equal(t, expected, err, "in test %d tick=%d", i, tick) } diff --git a/pkg/kubelet/images/serialized_image_puller.go b/pkg/kubelet/images/serialized_image_puller.go index dd2efe178f3..98cc6d06e3b 100644 --- a/pkg/kubelet/images/serialized_image_puller.go +++ b/pkg/kubelet/images/serialized_image_puller.go @@ -23,13 +23,14 @@ import ( "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/client/record" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/events" "k8s.io/kubernetes/pkg/util/flowcontrol" "k8s.io/kubernetes/pkg/util/wait" ) type imagePullRequest struct { - spec ImageSpec + spec kubecontainer.ImageSpec container *api.Container pullSecrets []api.Secret logPrefix string @@ -42,7 +43,7 @@ type imagePullRequest struct { // 'image pulled' events correspondingly. type serializedImagePuller struct { recorder record.EventRecorder - runtime Runtime + runtime kubecontainer.Runtime backOff *flowcontrol.Backoff pullRequests chan *imagePullRequest } @@ -54,7 +55,7 @@ var _ imagePuller = &serializedImagePuller{} // image puller that wraps the container runtime's PullImage interface. // Pulls one image at a time. // Issue #10959 has the rationale behind serializing image pulls. -func NewSerializedImagePuller(recorder record.EventRecorder, runtime Runtime, imageBackOff *flowcontrol.Backoff) imagePuller { +func newSerializedImagePuller(recorder record.EventRecorder, runtime kubecontainer.Runtime, imageBackOff *flowcontrol.Backoff) imagePuller { imagePuller := &serializedImagePuller{ recorder: recorder, runtime: runtime, @@ -75,19 +76,19 @@ func (puller *serializedImagePuller) logIt(ref *api.ObjectReference, eventtype, } // PullImage pulls the image for the specified pod and container. -func (puller *serializedImagePuller) PullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) { +func (puller *serializedImagePuller) pullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) (error, string) { logPrefix := fmt.Sprintf("%s/%s", pod.Name, container.Image) - ref, err := GenerateContainerRef(pod, container) + ref, err := kubecontainer.GenerateContainerRef(pod, container) if err != nil { glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err) } - spec := ImageSpec{container.Image} + spec := kubecontainer.ImageSpec{Image: container.Image} present, err := puller.runtime.IsImagePresent(spec) if err != nil { msg := fmt.Sprintf("Failed to inspect image %q: %v", container.Image, err) puller.logIt(ref, api.EventTypeWarning, events.FailedToInspectImage, logPrefix, msg, glog.Warning) - return ErrImageInspect, msg + return kubecontainer.ErrImageInspect, msg } if !shouldPullImage(container, present) { @@ -98,7 +99,7 @@ func (puller *serializedImagePuller) PullImage(pod *api.Pod, container *api.Cont } else { msg := fmt.Sprintf("Container image %q is not present with pull policy of Never", container.Image) puller.logIt(ref, api.EventTypeWarning, events.ErrImageNeverPullPolicy, logPrefix, msg, glog.Warning) - return ErrImageNeverPull, msg + return kubecontainer.ErrImageNeverPull, msg } } @@ -106,7 +107,7 @@ func (puller *serializedImagePuller) PullImage(pod *api.Pod, container *api.Cont if puller.backOff.IsInBackOffSinceUpdate(backOffKey, puller.backOff.Clock.Now()) { msg := fmt.Sprintf("Back-off pulling image %q", container.Image) puller.logIt(ref, api.EventTypeNormal, events.BackOffPullImage, logPrefix, msg, glog.Info) - return ErrImagePullBackOff, msg + return kubecontainer.ErrImagePullBackOff, msg } // enqueue image pull request and wait for response. @@ -122,11 +123,11 @@ func (puller *serializedImagePuller) PullImage(pod *api.Pod, container *api.Cont if err = <-returnChan; err != nil { puller.logIt(ref, api.EventTypeWarning, events.FailedToPullImage, logPrefix, fmt.Sprintf("Failed to pull image %q: %v", container.Image, err), glog.Warning) puller.backOff.Next(backOffKey, puller.backOff.Clock.Now()) - if err == RegistryUnavailable { + if err == kubecontainer.RegistryUnavailable { msg := fmt.Sprintf("image pull failed for %s because the registry is unavailable.", container.Image) return err, msg } else { - return ErrImagePull, err.Error() + return kubecontainer.ErrImagePull, err.Error() } } puller.logIt(ref, api.EventTypeNormal, events.PulledImage, logPrefix, fmt.Sprintf("Successfully pulled image %q", container.Image), glog.Info) diff --git a/pkg/kubelet/images/serialized_image_puller_test.go b/pkg/kubelet/images/serialized_image_puller_test.go index f7830665ee7..182e1270592 100644 --- a/pkg/kubelet/images/serialized_image_puller_test.go +++ b/pkg/kubelet/images/serialized_image_puller_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package container_test +package images import ( "errors" @@ -106,7 +106,7 @@ func TestSerializedPuller(t *testing.T) { fakeRuntime := &ctest.FakeRuntime{} fakeRecorder := &record.FakeRecorder{} - puller := NewSerializedImagePuller(fakeRecorder, fakeRuntime, backOff) + puller := newSerializedImagePuller(fakeRecorder, fakeRuntime, backOff) fakeRuntime.ImageList = []Image{{"present_image", nil, nil, 0}} fakeRuntime.Err = c.pullerErr @@ -114,7 +114,7 @@ func TestSerializedPuller(t *testing.T) { for tick, expected := range c.expectedErr { fakeClock.Step(time.Second) - err, _ := puller.PullImage(pod, container, nil) + err, _ := puller.pullImage(pod, container, nil) fakeRuntime.AssertCalls(c.calledFunctions) assert.Equal(t, expected, err, "in test %d tick=%d", i, tick) } diff --git a/pkg/kubelet/images/types.go b/pkg/kubelet/images/types.go index 2161563c53a..bc2334f42c2 100644 --- a/pkg/kubelet/images/types.go +++ b/pkg/kubelet/images/types.go @@ -1,9 +1,12 @@ /* -Copyright 2016 The Kubernetes Authors All. +Copyright 2016 The Kubernetes Authors. + Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.