kubelet: refactor kubelet.RuntimeHooks to container.ImagePuller.

Yifan Gu 2015-08-10 10:28:39 -07:00
parent 6cff082918
commit d70a30c069
10 changed files with 142 additions and 184 deletions
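
In short, the three RuntimeHooks callbacks (ShouldPullImage, ReportImagePulling, ReportImagePulled) plus DockerManager's private pullImage helper collapse into a single ImagePuller.PullImage call. A minimal before/after sketch of the call site, using only names that appear in the diff below:

// Before: DockerManager pulled via its own helper, consulting the injected hooks.
err := dm.pullImage(pod, container, pullSecrets)

// After: the manager wires up a generic puller once (dm implements Runtime)...
dm.imagePuller = kubecontainer.NewImagePuller(recorder, dm)

// ...and call sites delegate the presence check, pull policy, and
// "pulling"/"pulled"/"failed" events to it.
err = dm.imagePuller.PullImage(pod, container, pullSecrets)

The presence check, policy decision, and event reporting now live in one place instead of being split between the manager and the hooks.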

View File

@@ -0,0 +1,104 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package container
import (
"fmt"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/record"
)
// imagePuller pulls the image using Runtime.PullImage().
// It checks for the presence of the image, and reports the 'image pulling'
// and 'image pulled' events accordingly.
type imagePuller struct {
recorder record.EventRecorder
runtime Runtime
}
// NewImagePuller takes an event recorder and container runtime to create an
// image puller that wraps the container runtime's PullImage interface.
func NewImagePuller(recorder record.EventRecorder, runtime Runtime) ImagePuller {
return &imagePuller{
recorder: recorder,
runtime: runtime,
}
}
// shouldPullImage returns whether we should pull an image, based on the
// container's pull policy and whether the image is already present.
func shouldPullImage(container *api.Container, imagePresent bool) bool {
if container.ImagePullPolicy == api.PullNever {
return false
}
if container.ImagePullPolicy == api.PullAlways ||
(container.ImagePullPolicy == api.PullIfNotPresent && (!imagePresent)) {
return true
}
return false
}
// reportImagePull reports a 'pulling', 'pulled' or 'failed' event for an image pull.
func (puller *imagePuller) reportImagePull(ref *api.ObjectReference, event string, image string, pullError error) {
if ref == nil {
return
}
switch event {
case "pulling":
puller.recorder.Eventf(ref, "pulling", "Pulling image %q", image)
case "pulled":
puller.recorder.Eventf(ref, "pulled", "Successfully pulled image %q", image)
case "failed":
puller.recorder.Eventf(ref, "failed", "Failed to pull image %q: %v", image, pullError)
}
}
// PullImage pulls the image for the specified pod and container.
func (puller *imagePuller) PullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) error {
ref, err := GenerateContainerRef(pod, container)
if err != nil {
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
}
spec := ImageSpec{container.Image}
present, err := puller.runtime.IsImagePresent(spec)
if err != nil {
if ref != nil {
puller.recorder.Eventf(ref, "failed", "Failed to inspect image %q: %v", container.Image, err)
}
return fmt.Errorf("failed to inspect image %q: %v", container.Image, err)
}
if !shouldPullImage(container, present) {
if present && ref != nil {
puller.recorder.Eventf(ref, "pulled", "Container image %q already present on machine", container.Image)
}
return nil
}
puller.reportImagePull(ref, "pulling", container.Image, nil)
if err = puller.runtime.PullImage(spec, pullSecrets); err != nil {
puller.reportImagePull(ref, "failed", container.Image, err)
return err
}
puller.reportImagePull(ref, "pulled", container.Image, nil)
return nil
}
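
A short usage sketch for the new API, written as if it lived inside this package. The helper name pullBusybox, the pod, and the container literal are illustrative assumptions, not code from this commit; rt stands in for any concrete Runtime (for example, the Docker manager):

// pullBusybox is a hypothetical helper showing the two calls a caller needs.
func pullBusybox(rt Runtime, recorder record.EventRecorder) error {
	puller := NewImagePuller(recorder, rt)
	pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "default"}}
	container := &api.Container{
		Name:            "bar",
		Image:           "busybox",
		ImagePullPolicy: api.PullIfNotPresent,
	}
	// PullImage checks local presence, applies the pull policy, and emits
	// "pulling"/"pulled"/"failed" events before returning.
	return puller.PullImage(pod, container, nil)
}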

View File

@@ -100,17 +100,11 @@ type ContainerCommandRunner interface {
PortForward(pod *Pod, port uint16, stream io.ReadWriteCloser) error
}
// Customizable hooks injected into container runtimes.
type RuntimeHooks interface {
// Determines whether the runtime should pull the specified container's image.
ShouldPullImage(pod *api.Pod, container *api.Container, imagePresent bool) bool
// Runs when we start to pull an image.
ReportImagePulling(pod *api.Pod, container *api.Container)
// Runs after an image is pulled reporting its status. Error may be nil
// for a successful pull.
ReportImagePulled(pod *api.Pod, container *api.Container, err error)
// ImagePuller wraps Runtime.PullImage() to pull a container image.
// It checks for the presence of the image, and reports the 'image pulling'
// and 'image pulled' events accordingly.
type ImagePuller interface {
PullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) error
}
// Pod is a group of containers, with the status of the pod.

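Because the interface is now a single method, test doubles no longer need to mirror pull-policy logic the way the deleted fakeRuntimeHooks (further down) does. A hypothetical stub, not part of this commit, sketched as if it lived next to the interface:

// fakeImagePuller records requested images and returns an injected error.
// Hypothetical: the tests in this commit keep exercising the real puller
// on top of a FakeDockerPuller instead.
type fakeImagePuller struct {
	pulled []string // images requested, recorded for assertions
	err    error    // injected pull failure, returned verbatim
}

func (f *fakeImagePuller) PullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) error {
	f.pulled = append(f.pulled, container.Image)
	return f.err
}

// Compile-time check that the stub satisfies the narrowed interface.
var _ ImagePuller = &fakeImagePuller{}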
View File

@@ -656,7 +656,7 @@ func TestFindContainersByPod(t *testing.T) {
}
fakeClient := &FakeDockerClient{}
np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
containerManager := NewFakeDockerManager(fakeClient, &record.FakeRecorder{}, nil, nil, &cadvisorApi.MachineInfo{}, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, nil, nil, nil)
containerManager := NewFakeDockerManager(fakeClient, &record.FakeRecorder{}, nil, nil, &cadvisorApi.MachineInfo{}, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, nil, nil)
for i, test := range tests {
fakeClient.ContainerList = test.containerList
fakeClient.ExitedContainerList = test.exitedContainerList

View File

@@ -40,15 +40,14 @@ func NewFakeDockerManager(
osInterface kubecontainer.OSInterface,
networkPlugin network.NetworkPlugin,
generator kubecontainer.RunContainerOptionsGenerator,
httpClient kubeletTypes.HttpGetter,
runtimeHooks kubecontainer.RuntimeHooks) *DockerManager {
httpClient kubeletTypes.HttpGetter) *DockerManager {
fakeOomAdjuster := oom.NewFakeOomAdjuster()
fakeProcFs := procfs.NewFakeProcFs()
dm := NewDockerManager(client, recorder, readinessManager, containerRefManager, machineInfo, podInfraContainerImage, qps,
burst, containerLogsDir, osInterface, networkPlugin, generator, httpClient, runtimeHooks, &NativeExecHandler{},
burst, containerLogsDir, osInterface, networkPlugin, generator, httpClient, &NativeExecHandler{},
fakeOomAdjuster, fakeProcFs)
dm.puller = &FakeDockerPuller{}
dm.dockerPuller = &FakeDockerPuller{}
dm.prober = prober.New(nil, readinessManager, containerRefManager, recorder)
return dm
}

View File

@@ -89,9 +89,13 @@ type DockerManager struct {
// means that some entries may be recycled before a pod has been
// deleted.
reasonCache stringCache
// TODO(yifan): Record the pull failure so we can eliminate the image checking
// in GetPodStatus()?
puller DockerPuller
// Lower level docker image puller.
dockerPuller DockerPuller
// Wrapped image puller.
imagePuller kubecontainer.ImagePuller
// Root of the Docker runtime.
dockerRoot string
@@ -111,9 +115,6 @@ type DockerManager struct {
// Runner of lifecycle events.
runner kubecontainer.HandlerRunner
// Hooks injected into the container runtime.
runtimeHooks kubecontainer.RuntimeHooks
// Handler used to execute commands in containers.
execHandler ExecHandler
@@ -138,7 +139,6 @@ func NewDockerManager(
networkPlugin network.NetworkPlugin,
generator kubecontainer.RunContainerOptionsGenerator,
httpClient kubeletTypes.HttpGetter,
runtimeHooks kubecontainer.RuntimeHooks,
execHandler ExecHandler,
oomAdjuster *oom.OomAdjuster,
procFs procfs.ProcFsInterface) *DockerManager {
@@ -183,19 +183,19 @@ func NewDockerManager(
machineInfo: machineInfo,
podInfraContainerImage: podInfraContainerImage,
reasonCache: reasonCache,
puller: newDockerPuller(client, qps, burst),
dockerPuller: newDockerPuller(client, qps, burst),
dockerRoot: dockerRoot,
containerLogsDir: containerLogsDir,
networkPlugin: networkPlugin,
prober: nil,
generator: generator,
runtimeHooks: runtimeHooks,
execHandler: execHandler,
oomAdjuster: oomAdjuster,
procFs: procFs,
}
dm.runner = lifecycle.NewHandlerRunner(httpClient, dm, dm)
dm.prober = prober.New(dm, readinessManager, containerRefManager, recorder)
dm.imagePuller = kubecontainer.NewImagePuller(recorder, dm)
return dm
}
@@ -829,12 +829,12 @@ func (dm *DockerManager) ListImages() ([]kubecontainer.Image, error) {
// TODO(vmarmol): Consider unexporting.
// PullImage pulls an image from network to local storage.
func (dm *DockerManager) PullImage(image kubecontainer.ImageSpec, secrets []api.Secret) error {
return dm.puller.Pull(image.Image, secrets)
return dm.dockerPuller.Pull(image.Image, secrets)
}
// IsImagePresent checks whether the container image is already in the local storage.
func (dm *DockerManager) IsImagePresent(image kubecontainer.ImageSpec) (bool, error) {
return dm.puller.IsImagePresent(image.Image)
return dm.dockerPuller.IsImagePresent(image.Image)
}
// Removes the specified image.
@@ -1368,7 +1368,7 @@ func (dm *DockerManager) createPodInfraContainer(pod *api.Pod) (kubeletTypes.Doc
}
// No pod secrets for the infra container.
if err := dm.pullImage(pod, container, nil); err != nil {
if err := dm.imagePuller.PullImage(pod, container, nil); err != nil {
return "", err
}
@@ -1525,33 +1525,6 @@ func (dm *DockerManager) clearReasonCache(pod *api.Pod, container *api.Container
dm.reasonCache.Remove(pod.UID, container.Name)
}
// Pull the image for the specified pod and container.
func (dm *DockerManager) pullImage(pod *api.Pod, container *api.Container, pullSecrets []api.Secret) error {
ref, err := kubecontainer.GenerateContainerRef(pod, container)
if err != nil {
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
}
spec := kubecontainer.ImageSpec{Image: container.Image}
present, err := dm.IsImagePresent(spec)
if err != nil {
if ref != nil {
dm.recorder.Eventf(ref, "Failed", "Failed to inspect image %q: %v", container.Image, err)
}
return fmt.Errorf("failed to inspect image %q: %v", container.Image, err)
}
if !dm.runtimeHooks.ShouldPullImage(pod, container, present) {
if present && ref != nil {
dm.recorder.Eventf(ref, "Pulled", "Container image %q already present on machine", container.Image)
}
return nil
}
dm.runtimeHooks.ReportImagePulling(pod, container)
err = dm.PullImage(spec, pullSecrets)
dm.runtimeHooks.ReportImagePulled(pod, container, err)
return err
}
// Sync the running pod to match the specified desired pod.
func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus api.PodStatus, pullSecrets []api.Secret) error {
start := time.Now()
@@ -1612,7 +1585,7 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod
for idx := range containerChanges.ContainersToStart {
container := &pod.Spec.Containers[idx]
glog.V(4).Infof("Creating container %+v in pod %v", container, podFullName)
err := dm.pullImage(pod, container, pullSecrets)
err := dm.imagePuller.PullImage(pod, container, pullSecrets)
dm.updateReasonCache(pod, container, err)
if err != nil {
glog.Warningf("Failed to pull image %q from pod %q and container %q: %v", container.Image, kubecontainer.GetPodFullName(pod), container.Name, err)

View File

@@ -54,40 +54,6 @@ func (f *fakeHTTP) Get(url string) (*http.Response, error) {
return nil, f.err
}
// TODO: Find a better way to mock the runtime hooks so that we don't have to
// duplicate the code here.
type fakeRuntimeHooks struct {
recorder record.EventRecorder
}
var _ kubecontainer.RuntimeHooks = &fakeRuntimeHooks{}
func newFakeRuntimeHooks(recorder record.EventRecorder) kubecontainer.RuntimeHooks {
return &fakeRuntimeHooks{
recorder: recorder,
}
}
func (fr *fakeRuntimeHooks) ShouldPullImage(pod *api.Pod, container *api.Container, imagePresent bool) bool {
if container.ImagePullPolicy == api.PullNever {
return false
}
if container.ImagePullPolicy == api.PullAlways ||
(container.ImagePullPolicy == api.PullIfNotPresent && (!imagePresent)) {
return true
}
return false
}
func (fr *fakeRuntimeHooks) ReportImagePulling(pod *api.Pod, container *api.Container) {
fr.recorder.Eventf(nil, "Pulling", fmt.Sprintf("%s:%s:%s", pod.Name, container.Name, container.Image))
}
func (fr *fakeRuntimeHooks) ReportImagePulled(pod *api.Pod, container *api.Container, pullError error) {
fr.recorder.Eventf(nil, "Pulled", fmt.Sprintf("%s:%s:%s", pod.Name, container.Name, container.Image))
}
type fakeOptionGenerator struct{}
var _ kubecontainer.RunContainerOptionsGenerator = &fakeOptionGenerator{}
@@ -113,7 +79,6 @@ func newTestDockerManagerWithHTTPClient(fakeHTTPClient *fakeHTTP) (*DockerManage
readinessManager := kubecontainer.NewReadinessManager()
containerRefManager := kubecontainer.NewRefManager()
networkPlugin, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
runtimeHooks := newFakeRuntimeHooks(fakeRecorder)
optionGenerator := &fakeOptionGenerator{}
dockerManager := NewFakeDockerManager(
fakeDocker,
@@ -126,8 +91,7 @@ func newTestDockerManagerWithHTTPClient(fakeHTTPClient *fakeHTTP) (*DockerManage
kubecontainer.FakeOS{},
networkPlugin,
optionGenerator,
fakeHTTPClient,
runtimeHooks)
fakeHTTPClient)
return dockerManager, fakeDocker
}
@@ -945,7 +909,7 @@ func TestSyncPodCreateNetAndContainer(t *testing.T) {
func TestSyncPodCreatesNetAndContainerPullsImage(t *testing.T) {
dm, fakeDocker := newTestDockerManager()
dm.podInfraContainerImage = "pod_infra_image"
puller := dm.puller.(*FakeDockerPuller)
puller := dm.dockerPuller.(*FakeDockerPuller)
puller.HasImages = []string{}
dm.podInfraContainerImage = "pod_infra_image"
fakeDocker.ContainerList = []docker.APIContainers{}
@@ -1306,7 +1270,7 @@ func TestSyncPodsDoesNothing(t *testing.T) {
func TestSyncPodWithPullPolicy(t *testing.T) {
api.ForTesting_ReferencesAllowBlankSelfLinks = true
dm, fakeDocker := newTestDockerManager()
puller := dm.puller.(*FakeDockerPuller)
puller := dm.dockerPuller.(*FakeDockerPuller)
puller.HasImages = []string{"existing_one", "want:latest"}
dm.podInfraContainerImage = "pod_infra_image"
fakeDocker.ContainerList = []docker.APIContainers{}
@@ -1333,22 +1297,21 @@
fakeDocker.Lock()
eventSet := []string{
"Pulling foo:POD:pod_infra_image",
"Pulled foo:POD:pod_infra_image",
"Pulling foo:bar:pull_always_image",
"Pulled foo:bar:pull_always_image",
"Pulling foo:bar2:pull_if_not_present_image",
"Pulled foo:bar2:pull_if_not_present_image",
`Pulled Container image "existing_one" already present on machine`,
`Pulled Container image "want:latest" already present on machine`,
`pulling Pulling image "pod_infra_image"`,
`pulled Successfully pulled image "pod_infra_image"`,
`pulling Pulling image "pull_always_image"`,
`pulled Successfully pulled image "pull_always_image"`,
`pulling Pulling image "pull_if_not_present_image"`,
`pulled Successfully pulled image "pull_if_not_present_image"`,
`pulled Container image "existing_one" already present on machine`,
`pulled Container image "want:latest" already present on machine`,
}
runtimeHooks := dm.runtimeHooks.(*fakeRuntimeHooks)
recorder := runtimeHooks.recorder.(*record.FakeRecorder)
recorder := dm.recorder.(*record.FakeRecorder)
var actualEvents []string
for _, ev := range recorder.Events {
if strings.HasPrefix(ev, "Pull") {
if strings.HasPrefix(ev, "pull") {
actualEvents = append(actualEvents, ev)
}
}
@@ -1699,7 +1662,7 @@ func TestGetPodPullImageFailureReason(t *testing.T) {
dm, fakeDocker := newTestDockerManager()
// Initialize the FakeDockerPuller so that it'd try to pull non-existent
// images.
puller := dm.puller.(*FakeDockerPuller)
puller := dm.dockerPuller.(*FakeDockerPuller)
puller.HasImages = []string{}
// Inject the pull image failure error.
failureReason := "pull image failure"

View File

@@ -300,7 +300,6 @@ func NewMainKubelet(
klet.networkPlugin,
klet,
klet.httpClient,
newKubeletRuntimeHooks(recorder),
dockerExecHandler,
oomAdjuster,
procFs)

View File

@@ -45,7 +45,7 @@ func newPod(uid, name string) *api.Pod {
func createFakeRuntimeCache(fakeRecorder *record.FakeRecorder) kubecontainer.RuntimeCache {
fakeDocker := &dockertools.FakeDockerClient{}
np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
dockerManager := dockertools.NewFakeDockerManager(fakeDocker, fakeRecorder, nil, nil, &cadvisorApi.MachineInfo{}, dockertools.PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, nil, nil, newKubeletRuntimeHooks(fakeRecorder))
dockerManager := dockertools.NewFakeDockerManager(fakeDocker, fakeRecorder, nil, nil, &cadvisorApi.MachineInfo{}, dockertools.PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, nil, nil)
return kubecontainer.NewFakeRuntimeCache(dockerManager)
}
@@ -225,7 +225,7 @@ func TestFakePodWorkers(t *testing.T) {
fakeDocker := &dockertools.FakeDockerClient{}
fakeRecorder := &record.FakeRecorder{}
np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
dockerManager := dockertools.NewFakeDockerManager(fakeDocker, fakeRecorder, nil, nil, &cadvisorApi.MachineInfo{}, dockertools.PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, nil, nil, newKubeletRuntimeHooks(fakeRecorder))
dockerManager := dockertools.NewFakeDockerManager(fakeDocker, fakeRecorder, nil, nil, &cadvisorApi.MachineInfo{}, dockertools.PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, nil, nil)
fakeRuntimeCache := kubecontainer.NewFakeRuntimeCache(dockerManager)
kubeletForRealWorkers := &simpleFakeKubelet{}

View File

@@ -162,8 +162,7 @@ func TestRunOnce(t *testing.T) {
kubecontainer.FakeOS{},
kb.networkPlugin,
kb,
nil,
newKubeletRuntimeHooks(kb.recorder))
nil)
pods := []*api.Pod{
{

View File

@@ -1,73 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/record"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)
// Kubelet-specific runtime hooks.
type kubeletRuntimeHooks struct {
recorder record.EventRecorder
}
var _ kubecontainer.RuntimeHooks = &kubeletRuntimeHooks{}
func newKubeletRuntimeHooks(recorder record.EventRecorder) kubecontainer.RuntimeHooks {
return &kubeletRuntimeHooks{
recorder: recorder,
}
}
func (kr *kubeletRuntimeHooks) ShouldPullImage(pod *api.Pod, container *api.Container, imagePresent bool) bool {
if container.ImagePullPolicy == api.PullNever {
return false
}
if container.ImagePullPolicy == api.PullAlways ||
(container.ImagePullPolicy == api.PullIfNotPresent && (!imagePresent)) {
return true
}
return false
}
func (kr *kubeletRuntimeHooks) ReportImagePulled(pod *api.Pod, container *api.Container, pullError error) {
ref, err := kubecontainer.GenerateContainerRef(pod, container)
if err != nil {
glog.Errorf("Couldn't make a ref to pod %q, container %q: '%v'", pod.Name, container.Name, err)
return
}
if pullError != nil {
kr.recorder.Eventf(ref, "Failed", "Failed to pull image %q: %v", container.Image, pullError)
} else {
kr.recorder.Eventf(ref, "Pulled", "Successfully pulled image %q", container.Image)
}
}
func (kr *kubeletRuntimeHooks) ReportImagePulling(pod *api.Pod, container *api.Container) {
ref, err := kubecontainer.GenerateContainerRef(pod, container)
if err != nil {
glog.Errorf("Couldn't make a ref to pod %q, container %q: '%v'", pod.Name, container.Name, err)
return
}
kr.recorder.Eventf(ref, "Pulling", "Pulling image %q", container.Image)
}