diff --git a/pkg/kubelet/container/image_puller.go b/pkg/kubelet/container/image_puller.go new file mode 100644 index 00000000000..4a87ae1055b --- /dev/null +++ b/pkg/kubelet/container/image_puller.go @@ -0,0 +1,104 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package container + +import ( + "fmt" + + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/record" +) + +// imagePuller pulls the image using Runtime.PullImage(). +// It will check the presence of the image, and report the 'image pulling', +// 'image pulled' events correspondingly. +type imagePuller struct { + recorder record.EventRecorder + runtime Runtime +} + +// NewImagePuller takes an event recorder and container runtime to create a +// image puller that wraps the container runtime's PullImage interface. +func NewImagePuller(recorder record.EventRecorder, runtime Runtime) ImagePuller { + return &imagePuller{ + recorder: recorder, + runtime: runtime, + } +} + +// shouldPullImage returns whether we should pull an image according to +// the presence and pull policy of the image. +func shouldPullImage(container *api.Container, imagePresent bool) bool { + if container.ImagePullPolicy == api.PullNever { + return false + } + + if container.ImagePullPolicy == api.PullAlways || + (container.ImagePullPolicy == api.PullIfNotPresent && (!imagePresent)) { + return true + } + + return false +} + +// reportImagePull reports 'image pulling', 'image pulled' or 'image pulling failed' events. +func (puller *imagePuller) reportImagePull(ref *api.ObjectReference, event string, image string, pullError error) { + if ref == nil { + return + } + + switch event { + case "pulling": + puller.recorder.Eventf(ref, "pulling", "Pulling image %q", image) + case "pulled": + puller.recorder.Eventf(ref, "pulled", "Successfully pulled image %q", image) + case "failed": + puller.recorder.Eventf(ref, "failed", "Failed to pull image %q: %v", image, pullError) + } +} + +// PullImage pulls the image for the specified pod and container. +func (puller *imagePuller) PullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) error { + ref, err := GenerateContainerRef(pod, container) + if err != nil { + glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err) + } + spec := ImageSpec{container.Image} + present, err := puller.runtime.IsImagePresent(spec) + if err != nil { + if ref != nil { + puller.recorder.Eventf(ref, "failed", "Failed to inspect image %q: %v", container.Image, err) + } + return fmt.Errorf("failed to inspect image %q: %v", container.Image, err) + } + + if !shouldPullImage(container, present) { + if present && ref != nil { + puller.recorder.Eventf(ref, "pulled", "Container image %q already present on machine", container.Image) + } + return nil + } + + puller.reportImagePull(ref, "pulling", container.Image, nil) + if err = puller.runtime.PullImage(spec, pullSecrets); err != nil { + puller.reportImagePull(ref, "failed", container.Image, err) + return err + } + puller.reportImagePull(ref, "pulled", container.Image, nil) + return nil +} diff --git a/pkg/kubelet/container/runtime.go b/pkg/kubelet/container/runtime.go index 0d7106ac73a..d50f021f6a5 100644 --- a/pkg/kubelet/container/runtime.go +++ b/pkg/kubelet/container/runtime.go @@ -100,17 +100,11 @@ type ContainerCommandRunner interface { PortForward(pod *Pod, port uint16, stream io.ReadWriteCloser) error } -// Customizable hooks injected into container runtimes. -type RuntimeHooks interface { - // Determines whether the runtime should pull the specified container's image. - ShouldPullImage(pod *api.Pod, container *api.Container, imagePresent bool) bool - - // Runs when we start to pull an image. - ReportImagePulling(pod *api.Pod, container *api.Container) - - // Runs after an image is pulled reporting its status. Error may be nil - // for a successful pull. - ReportImagePulled(pod *api.Pod, container *api.Container, err error) +// ImagePuller wraps Runtime.PullImage() to pull a container image. +// It will check the presence of the image, and report the 'image pulling', +// 'image pulled' events correspondingly. +type ImagePuller interface { + PullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) error } // Pod is a group of containers, with the status of the pod. diff --git a/pkg/kubelet/dockertools/docker_test.go b/pkg/kubelet/dockertools/docker_test.go index db914567bf3..348a7977fbc 100644 --- a/pkg/kubelet/dockertools/docker_test.go +++ b/pkg/kubelet/dockertools/docker_test.go @@ -656,7 +656,7 @@ func TestFindContainersByPod(t *testing.T) { } fakeClient := &FakeDockerClient{} np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil)) - containerManager := NewFakeDockerManager(fakeClient, &record.FakeRecorder{}, nil, nil, &cadvisorApi.MachineInfo{}, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, nil, nil, nil) + containerManager := NewFakeDockerManager(fakeClient, &record.FakeRecorder{}, nil, nil, &cadvisorApi.MachineInfo{}, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, nil, nil) for i, test := range tests { fakeClient.ContainerList = test.containerList fakeClient.ExitedContainerList = test.exitedContainerList diff --git a/pkg/kubelet/dockertools/fake_manager.go b/pkg/kubelet/dockertools/fake_manager.go index 386ae597f03..d2df9a35287 100644 --- a/pkg/kubelet/dockertools/fake_manager.go +++ b/pkg/kubelet/dockertools/fake_manager.go @@ -40,15 +40,14 @@ func NewFakeDockerManager( osInterface kubecontainer.OSInterface, networkPlugin network.NetworkPlugin, generator kubecontainer.RunContainerOptionsGenerator, - httpClient kubeletTypes.HttpGetter, - runtimeHooks kubecontainer.RuntimeHooks) *DockerManager { + httpClient kubeletTypes.HttpGetter) *DockerManager { fakeOomAdjuster := oom.NewFakeOomAdjuster() fakeProcFs := procfs.NewFakeProcFs() dm := NewDockerManager(client, recorder, readinessManager, containerRefManager, machineInfo, podInfraContainerImage, qps, - burst, containerLogsDir, osInterface, networkPlugin, generator, httpClient, runtimeHooks, &NativeExecHandler{}, + burst, containerLogsDir, osInterface, networkPlugin, generator, httpClient, &NativeExecHandler{}, fakeOomAdjuster, fakeProcFs) - dm.puller = &FakeDockerPuller{} + dm.dockerPuller = &FakeDockerPuller{} dm.prober = prober.New(nil, readinessManager, containerRefManager, recorder) return dm } diff --git a/pkg/kubelet/dockertools/manager.go b/pkg/kubelet/dockertools/manager.go index d9bc6b05ec4..b845191a824 100644 --- a/pkg/kubelet/dockertools/manager.go +++ b/pkg/kubelet/dockertools/manager.go @@ -89,9 +89,13 @@ type DockerManager struct { // means that some entries may be recycled before a pod has been // deleted. reasonCache stringCache - // TODO(yifan): Record the pull failure so we can eliminate the image checking + // TODO(yifan): Record the pull failure so we can eliminate the image checking // in GetPodStatus()? - puller DockerPuller + // Lower level docker image puller. + dockerPuller DockerPuller + + // wrapped image puller. + imagePuller kubecontainer.ImagePuller // Root of the Docker runtime. dockerRoot string @@ -111,9 +115,6 @@ type DockerManager struct { // Runner of lifecycle events. runner kubecontainer.HandlerRunner - // Hooks injected into the container runtime. - runtimeHooks kubecontainer.RuntimeHooks - // Handler used to execute commands in containers. execHandler ExecHandler @@ -138,7 +139,6 @@ func NewDockerManager( networkPlugin network.NetworkPlugin, generator kubecontainer.RunContainerOptionsGenerator, httpClient kubeletTypes.HttpGetter, - runtimeHooks kubecontainer.RuntimeHooks, execHandler ExecHandler, oomAdjuster *oom.OomAdjuster, procFs procfs.ProcFsInterface) *DockerManager { @@ -183,19 +183,19 @@ func NewDockerManager( machineInfo: machineInfo, podInfraContainerImage: podInfraContainerImage, reasonCache: reasonCache, - puller: newDockerPuller(client, qps, burst), + dockerPuller: newDockerPuller(client, qps, burst), dockerRoot: dockerRoot, containerLogsDir: containerLogsDir, networkPlugin: networkPlugin, prober: nil, generator: generator, - runtimeHooks: runtimeHooks, execHandler: execHandler, oomAdjuster: oomAdjuster, procFs: procFs, } dm.runner = lifecycle.NewHandlerRunner(httpClient, dm, dm) dm.prober = prober.New(dm, readinessManager, containerRefManager, recorder) + dm.imagePuller = kubecontainer.NewImagePuller(recorder, dm) return dm } @@ -829,12 +829,12 @@ func (dm *DockerManager) ListImages() ([]kubecontainer.Image, error) { // TODO(vmarmol): Consider unexporting. // PullImage pulls an image from network to local storage. func (dm *DockerManager) PullImage(image kubecontainer.ImageSpec, secrets []api.Secret) error { - return dm.puller.Pull(image.Image, secrets) + return dm.dockerPuller.Pull(image.Image, secrets) } // IsImagePresent checks whether the container image is already in the local storage. func (dm *DockerManager) IsImagePresent(image kubecontainer.ImageSpec) (bool, error) { - return dm.puller.IsImagePresent(image.Image) + return dm.dockerPuller.IsImagePresent(image.Image) } // Removes the specified image. @@ -1368,7 +1368,7 @@ func (dm *DockerManager) createPodInfraContainer(pod *api.Pod) (kubeletTypes.Doc } // No pod secrets for the infra container. - if err := dm.pullImage(pod, container, nil); err != nil { + if err := dm.imagePuller.PullImage(pod, container, nil); err != nil { return "", err } @@ -1525,33 +1525,6 @@ func (dm *DockerManager) clearReasonCache(pod *api.Pod, container *api.Container dm.reasonCache.Remove(pod.UID, container.Name) } -// Pull the image for the specified pod and container. -func (dm *DockerManager) pullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) error { - ref, err := kubecontainer.GenerateContainerRef(pod, container) - if err != nil { - glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err) - } - spec := kubecontainer.ImageSpec{Image: container.Image} - present, err := dm.IsImagePresent(spec) - if err != nil { - if ref != nil { - dm.recorder.Eventf(ref, "Failed", "Failed to inspect image %q: %v", container.Image, err) - } - return fmt.Errorf("failed to inspect image %q: %v", container.Image, err) - } - if !dm.runtimeHooks.ShouldPullImage(pod, container, present) { - if present && ref != nil { - dm.recorder.Eventf(ref, "Pulled", "Container image %q already present on machine", container.Image) - } - return nil - } - - dm.runtimeHooks.ReportImagePulling(pod, container) - err = dm.PullImage(spec, pullSecrets) - dm.runtimeHooks.ReportImagePulled(pod, container, err) - return err -} - // Sync the running pod to match the specified desired pod. func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus, pullSecrets []api.Secret) error { start := time.Now() @@ -1612,7 +1585,7 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod for idx := range containerChanges.ContainersToStart { container := &pod.Spec.Containers[idx] glog.V(4).Infof("Creating container %+v in pod %v", container, podFullName) - err := dm.pullImage(pod, container, pullSecrets) + err := dm.imagePuller.PullImage(pod, container, pullSecrets) dm.updateReasonCache(pod, container, err) if err != nil { glog.Warningf("Failed to pull image %q from pod %q and container %q: %v", container.Image, kubecontainer.GetPodFullName(pod), container.Name, err) diff --git a/pkg/kubelet/dockertools/manager_test.go b/pkg/kubelet/dockertools/manager_test.go index 1968b9f4b96..137988c0ae5 100644 --- a/pkg/kubelet/dockertools/manager_test.go +++ b/pkg/kubelet/dockertools/manager_test.go @@ -54,40 +54,6 @@ func (f *fakeHTTP) Get(url string) (*http.Response, error) { return nil, f.err } -// TODO: Find a better way to mock the runtime hooks so that we don't have to -// duplicate the code here. -type fakeRuntimeHooks struct { - recorder record.EventRecorder -} - -var _ kubecontainer.RuntimeHooks = &fakeRuntimeHooks{} - -func newFakeRuntimeHooks(recorder record.EventRecorder) kubecontainer.RuntimeHooks { - return &fakeRuntimeHooks{ - recorder: recorder, - } -} - -func (fr *fakeRuntimeHooks) ShouldPullImage(pod *api.Pod, container *api.Container, imagePresent bool) bool { - if container.ImagePullPolicy == api.PullNever { - return false - } - if container.ImagePullPolicy == api.PullAlways || - (container.ImagePullPolicy == api.PullIfNotPresent && (!imagePresent)) { - return true - } - - return false -} - -func (fr *fakeRuntimeHooks) ReportImagePulling(pod *api.Pod, container *api.Container) { - fr.recorder.Eventf(nil, "Pulling", fmt.Sprintf("%s:%s:%s", pod.Name, container.Name, container.Image)) -} - -func (fr *fakeRuntimeHooks) ReportImagePulled(pod *api.Pod, container *api.Container, pullError error) { - fr.recorder.Eventf(nil, "Pulled", fmt.Sprintf("%s:%s:%s", pod.Name, container.Name, container.Image)) -} - type fakeOptionGenerator struct{} var _ kubecontainer.RunContainerOptionsGenerator = &fakeOptionGenerator{} @@ -113,7 +79,6 @@ func newTestDockerManagerWithHTTPClient(fakeHTTPClient *fakeHTTP) (*DockerManage readinessManager := kubecontainer.NewReadinessManager() containerRefManager := kubecontainer.NewRefManager() networkPlugin, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil)) - runtimeHooks := newFakeRuntimeHooks(fakeRecorder) optionGenerator := &fakeOptionGenerator{} dockerManager := NewFakeDockerManager( fakeDocker, @@ -126,8 +91,7 @@ func newTestDockerManagerWithHTTPClient(fakeHTTPClient *fakeHTTP) (*DockerManage kubecontainer.FakeOS{}, networkPlugin, optionGenerator, - fakeHTTPClient, - runtimeHooks) + fakeHTTPClient) return dockerManager, fakeDocker } @@ -945,7 +909,7 @@ func TestSyncPodCreateNetAndContainer(t *testing.T) { func TestSyncPodCreatesNetAndContainerPullsImage(t *testing.T) { dm, fakeDocker := newTestDockerManager() dm.podInfraContainerImage = "pod_infra_image" - puller := dm.puller.(*FakeDockerPuller) + puller := dm.dockerPuller.(*FakeDockerPuller) puller.HasImages = []string{} dm.podInfraContainerImage = "pod_infra_image" fakeDocker.ContainerList = []docker.APIContainers{} @@ -1306,7 +1270,7 @@ func TestSyncPodsDoesNothing(t *testing.T) { func TestSyncPodWithPullPolicy(t *testing.T) { api.ForTesting_ReferencesAllowBlankSelfLinks = true dm, fakeDocker := newTestDockerManager() - puller := dm.puller.(*FakeDockerPuller) + puller := dm.dockerPuller.(*FakeDockerPuller) puller.HasImages = []string{"existing_one", "want:latest"} dm.podInfraContainerImage = "pod_infra_image" fakeDocker.ContainerList = []docker.APIContainers{} @@ -1333,22 +1297,21 @@ func TestSyncPodWithPullPolicy(t *testing.T) { fakeDocker.Lock() eventSet := []string{ - "Pulling foo:POD:pod_infra_image", - "Pulled foo:POD:pod_infra_image", - "Pulling foo:bar:pull_always_image", - "Pulled foo:bar:pull_always_image", - "Pulling foo:bar2:pull_if_not_present_image", - "Pulled foo:bar2:pull_if_not_present_image", - `Pulled Container image "existing_one" already present on machine`, - `Pulled Container image "want:latest" already present on machine`, + `pulling Pulling image "pod_infra_image"`, + `pulled Successfully pulled image "pod_infra_image"`, + `pulling Pulling image "pull_always_image"`, + `pulled Successfully pulled image "pull_always_image"`, + `pulling Pulling image "pull_if_not_present_image"`, + `pulled Successfully pulled image "pull_if_not_present_image"`, + `pulled Container image "existing_one" already present on machine`, + `pulled Container image "want:latest" already present on machine`, } - runtimeHooks := dm.runtimeHooks.(*fakeRuntimeHooks) - recorder := runtimeHooks.recorder.(*record.FakeRecorder) + recorder := dm.recorder.(*record.FakeRecorder) var actualEvents []string for _, ev := range recorder.Events { - if strings.HasPrefix(ev, "Pull") { + if strings.HasPrefix(ev, "pull") { actualEvents = append(actualEvents, ev) } } @@ -1699,7 +1662,7 @@ func TestGetPodPullImageFailureReason(t *testing.T) { dm, fakeDocker := newTestDockerManager() // Initialize the FakeDockerPuller so that it'd try to pull non-existent // images. - puller := dm.puller.(*FakeDockerPuller) + puller := dm.dockerPuller.(*FakeDockerPuller) puller.HasImages = []string{} // Inject the pull image failure error. failureReason := "pull image faiulre" diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 1f99ed910ec..2b4c3404dae 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -300,7 +300,6 @@ func NewMainKubelet( klet.networkPlugin, klet, klet.httpClient, - newKubeletRuntimeHooks(recorder), dockerExecHandler, oomAdjuster, procFs) diff --git a/pkg/kubelet/pod_workers_test.go b/pkg/kubelet/pod_workers_test.go index 8bc1e2ccba6..aa0561bd5b4 100644 --- a/pkg/kubelet/pod_workers_test.go +++ b/pkg/kubelet/pod_workers_test.go @@ -45,7 +45,7 @@ func newPod(uid, name string) *api.Pod { func createFakeRuntimeCache(fakeRecorder *record.FakeRecorder) kubecontainer.RuntimeCache { fakeDocker := &dockertools.FakeDockerClient{} np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil)) - dockerManager := dockertools.NewFakeDockerManager(fakeDocker, fakeRecorder, nil, nil, &cadvisorApi.MachineInfo{}, dockertools.PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, nil, nil, newKubeletRuntimeHooks(fakeRecorder)) + dockerManager := dockertools.NewFakeDockerManager(fakeDocker, fakeRecorder, nil, nil, &cadvisorApi.MachineInfo{}, dockertools.PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, nil, nil) return kubecontainer.NewFakeRuntimeCache(dockerManager) } @@ -225,7 +225,7 @@ func TestFakePodWorkers(t *testing.T) { fakeDocker := &dockertools.FakeDockerClient{} fakeRecorder := &record.FakeRecorder{} np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil)) - dockerManager := dockertools.NewFakeDockerManager(fakeDocker, fakeRecorder, nil, nil, &cadvisorApi.MachineInfo{}, dockertools.PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, nil, nil, newKubeletRuntimeHooks(fakeRecorder)) + dockerManager := dockertools.NewFakeDockerManager(fakeDocker, fakeRecorder, nil, nil, &cadvisorApi.MachineInfo{}, dockertools.PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, nil, nil) fakeRuntimeCache := kubecontainer.NewFakeRuntimeCache(dockerManager) kubeletForRealWorkers := &simpleFakeKubelet{} diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go index d9fec8fc531..94bbcec48c1 100644 --- a/pkg/kubelet/runonce_test.go +++ b/pkg/kubelet/runonce_test.go @@ -162,8 +162,7 @@ func TestRunOnce(t *testing.T) { kubecontainer.FakeOS{}, kb.networkPlugin, kb, - nil, - newKubeletRuntimeHooks(kb.recorder)) + nil) pods := []*api.Pod{ { diff --git a/pkg/kubelet/runtime_hooks.go b/pkg/kubelet/runtime_hooks.go deleted file mode 100644 index 5f72a7d0851..00000000000 --- a/pkg/kubelet/runtime_hooks.go +++ /dev/null @@ -1,73 +0,0 @@ -/* -Copyright 2015 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package kubelet - -import ( - "github.com/golang/glog" - "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/client/record" - kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" -) - -// Kubelet-specific runtime hooks. -type kubeletRuntimeHooks struct { - recorder record.EventRecorder -} - -var _ kubecontainer.RuntimeHooks = &kubeletRuntimeHooks{} - -func newKubeletRuntimeHooks(recorder record.EventRecorder) kubecontainer.RuntimeHooks { - return &kubeletRuntimeHooks{ - recorder: recorder, - } -} - -func (kr *kubeletRuntimeHooks) ShouldPullImage(pod *api.Pod, container *api.Container, imagePresent bool) bool { - if container.ImagePullPolicy == api.PullNever { - return false - } - - if container.ImagePullPolicy == api.PullAlways || - (container.ImagePullPolicy == api.PullIfNotPresent && (!imagePresent)) { - return true - } - - return false -} - -func (kr *kubeletRuntimeHooks) ReportImagePulled(pod *api.Pod, container *api.Container, pullError error) { - ref, err := kubecontainer.GenerateContainerRef(pod, container) - if err != nil { - glog.Errorf("Couldn't make a ref to pod %q, container %q: '%v'", pod.Name, container.Name, err) - return - } - - if pullError != nil { - kr.recorder.Eventf(ref, "Failed", "Failed to pull image %q: %v", container.Image, pullError) - } else { - kr.recorder.Eventf(ref, "Pulled", "Successfully pulled image %q", container.Image) - } -} - -func (kr *kubeletRuntimeHooks) ReportImagePulling(pod *api.Pod, container *api.Container) { - ref, err := kubecontainer.GenerateContainerRef(pod, container) - if err != nil { - glog.Errorf("Couldn't make a ref to pod %q, container %q: '%v'", pod.Name, container.Name, err) - return - } - kr.recorder.Eventf(ref, "Pulling", "Pulling image %q", container.Image) -}