diff --git a/pkg/kubelet/dockertools/docker.go b/pkg/kubelet/dockertools/docker.go index aef4ebe6580..ad3e58934cc 100644 --- a/pkg/kubelet/dockertools/docker.go +++ b/pkg/kubelet/dockertools/docker.go @@ -96,8 +96,8 @@ type throttledDockerPuller struct { limiter util.RateLimiter } -// NewDockerPuller creates a new instance of the default implementation of DockerPuller. -func NewDockerPuller(client DockerInterface, qps float32, burst int) DockerPuller { +// newDockerPuller creates a new instance of the default implementation of DockerPuller. +func newDockerPuller(client DockerInterface, qps float32, burst int) DockerPuller { dp := dockerPuller{ client: client, keyring: credentialprovider.NewDockerKeyring(), diff --git a/pkg/kubelet/dockertools/docker_test.go b/pkg/kubelet/dockertools/docker_test.go index 30eae08a2e7..dec82f3bce0 100644 --- a/pkg/kubelet/dockertools/docker_test.go +++ b/pkg/kubelet/dockertools/docker_test.go @@ -396,7 +396,7 @@ func TestIsImagePresent(t *testing.T) { func TestGetRunningContainers(t *testing.T) { fakeDocker := &FakeDockerClient{Errors: make(map[string]error)} fakeRecorder := &record.FakeRecorder{} - containerManager := NewDockerManager(fakeDocker, fakeRecorder, PodInfraContainerImage) + containerManager := NewDockerManager(fakeDocker, fakeRecorder, PodInfraContainerImage, 0, 0) tests := []struct { containers map[string]*docker.Container inputIDs []string diff --git a/pkg/kubelet/dockertools/manager.go b/pkg/kubelet/dockertools/manager.go index 0aa2f1852b3..77ca5a4e017 100644 --- a/pkg/kubelet/dockertools/manager.go +++ b/pkg/kubelet/dockertools/manager.go @@ -60,18 +60,25 @@ type DockerManager struct { // means that some entries may be recycled before a pod has been // deleted. reasonCache stringCache + // TODO(yifan): We export this for testability, so when we have a fake + // container manager, then we can unexport this. Also at that time, we + // use the concrete type so that we can record the pull failure and eliminate + // the image checking in GetPodStatus(). + Puller DockerPuller } // Ensures DockerManager implements ConatinerRunner. var _ kubecontainer.ContainerRunner = new(DockerManager) -func NewDockerManager(client DockerInterface, recorder record.EventRecorder, podInfraContainerImage string) *DockerManager { +func NewDockerManager(client DockerInterface, recorder record.EventRecorder, podInfraContainerImage string, qps float32, burst int) *DockerManager { reasonCache := stringCache{cache: lru.New(maxReasonCacheEntries)} return &DockerManager{ client: client, recorder: recorder, PodInfraContainerImage: podInfraContainerImage, - reasonCache: reasonCache} + reasonCache: reasonCache, + Puller: newDockerPuller(client, qps, burst), + } } // A cache which stores strings keyed by _. @@ -569,3 +576,11 @@ func (self *DockerManager) GetPods(all bool) ([]*kubecontainer.Pod, error) { } return result, nil } + +func (self *DockerManager) Pull(image string) error { + return self.Puller.Pull(image) +} + +func (self *DockerManager) IsImagePresent(image string) (bool, error) { + return self.Puller.IsImagePresent(image) +} diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index bb57ebd8d06..e22b3d1d1ad 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -199,7 +199,7 @@ func NewMainKubelet( return nil, fmt.Errorf("failed to initialize image manager: %v", err) } statusManager := newStatusManager(kubeClient) - containerManager := dockertools.NewDockerManager(dockerClient, recorder, podInfraContainerImage) + containerManager := dockertools.NewDockerManager(dockerClient, recorder, podInfraContainerImage, pullQPS, pullBurst) klet := &Kubelet{ hostname: hostname, @@ -211,8 +211,6 @@ func NewMainKubelet( readinessManager: kubecontainer.NewReadinessManager(), runner: dockertools.NewDockerContainerCommandRunner(dockerClient), httpClient: &http.Client{}, - pullQPS: pullQPS, - pullBurst: pullBurst, sourcesReady: sourcesReady, clusterDomain: clusterDomain, clusterDNS: clusterDNS, @@ -289,18 +287,12 @@ type Kubelet struct { // Tracks references for reporting events containerRefManager *kubecontainer.RefManager - // Optional, defaults to simple Docker implementation - dockerPuller dockertools.DockerPuller // Optional, defaults to /logs/ from /var/log logServer http.Handler // Optional, defaults to simple Docker implementation runner dockertools.ContainerCommandRunner // Optional, client for http requests, defaults to empty client httpClient httpGetter - // Optional, maximum pull QPS from the docker registry, 0.0 means unlimited. - pullQPS float32 - // Optional, maximum burst QPS from the docker registry, must be positive if QPS is > 0.0 - pullBurst int // cAdvisor used for container information. cadvisor cadvisor.Interface @@ -541,9 +533,6 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) { if kl.logServer == nil { kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/"))) } - if kl.dockerPuller == nil { - kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient, kl.pullQPS, kl.pullBurst) - } if kl.kubeClient == nil { glog.Warning("No api server defined - no node status update will be sent.") } @@ -877,7 +866,7 @@ func (kl *Kubelet) createPodInfraContainer(pod *api.Pod) (dockertools.DockerID, glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err) } // TODO: make this a TTL based pull (if image older than X policy, pull) - ok, err := kl.dockerPuller.IsImagePresent(container.Image) + ok, err := kl.containerManager.IsImagePresent(container.Image) if err != nil { if ref != nil { kl.recorder.Eventf(ref, "failed", "Failed to inspect image %q: %v", container.Image, err) @@ -919,7 +908,7 @@ func (kl *Kubelet) pullImage(img string, ref *api.ObjectReference) error { metrics.ImagePullLatency.Observe(metrics.SinceInMicroseconds(start)) }() - if err := kl.dockerPuller.Pull(img); err != nil { + if err := kl.containerManager.Pull(img); err != nil { if ref != nil { kl.recorder.Eventf(ref, "failed", "Failed to pull image %q: %v", img, err) } @@ -1033,7 +1022,7 @@ func (kl *Kubelet) pullImageAndRunContainer(pod *api.Pod, container *api.Contain glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err) } if container.ImagePullPolicy != api.PullNever { - present, err := kl.dockerPuller.IsImagePresent(container.Image) + present, err := kl.containerManager.IsImagePresent(container.Image) if err != nil { if ref != nil { kl.recorder.Eventf(ref, "failed", "Failed to inspect image %q: %v", container.Image, err) diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 8c33813b335..e279594bca4 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -73,11 +73,10 @@ func newTestKubelet(t *testing.T) *TestKubelet { fakeRecorder := &record.FakeRecorder{} fakeKubeClient := &testclient.Fake{} - kubelet := &Kubelet{} kubelet.dockerClient = fakeDocker kubelet.kubeClient = fakeKubeClient - kubelet.dockerPuller = &dockertools.FakeDockerPuller{} + kubelet.hostname = "testnode" kubelet.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil)) if tempDir, err := ioutil.TempDir("/tmp", "kubelet_test."); err != nil { @@ -104,7 +103,7 @@ func newTestKubelet(t *testing.T) *TestKubelet { podManager, fakeMirrorClient := newFakePodManager() kubelet.podManager = podManager kubelet.containerRefManager = kubecontainer.NewRefManager() - kubelet.containerManager = dockertools.NewDockerManager(fakeDocker, fakeRecorder, dockertools.PodInfraContainerImage) + kubelet.containerManager = dockertools.NewDockerManager(fakeDocker, fakeRecorder, dockertools.PodInfraContainerImage, 0, 0) kubelet.dockerCache = dockertools.NewFakeDockerCache(kubelet.containerManager) kubelet.podWorkers = newPodWorkers( kubelet.dockerCache, @@ -114,6 +113,7 @@ func newTestKubelet(t *testing.T) *TestKubelet { return err }, fakeRecorder) + kubelet.containerManager.Puller = &dockertools.FakeDockerPuller{} return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, fakeMirrorClient} } @@ -593,7 +593,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) { kubelet := testKubelet.kubelet fakeDocker := testKubelet.fakeDocker waitGroup := testKubelet.waitGroup - puller := kubelet.dockerPuller.(*dockertools.FakeDockerPuller) + puller := kubelet.containerManager.Puller.(*dockertools.FakeDockerPuller) puller.HasImages = []string{} kubelet.containerManager.PodInfraContainerImage = "custom_image_name" fakeDocker.ContainerList = []docker.APIContainers{} @@ -1249,7 +1249,6 @@ func TestGetRootInfo(t *testing.T) { kubelet := Kubelet{ dockerClient: &fakeDocker, - dockerPuller: &dockertools.FakeDockerPuller{}, cadvisor: mockCadvisor, } @@ -1652,7 +1651,7 @@ func TestSyncPodsWithPullPolicy(t *testing.T) { kubelet := testKubelet.kubelet fakeDocker := testKubelet.fakeDocker waitGroup := testKubelet.waitGroup - puller := kubelet.dockerPuller.(*dockertools.FakeDockerPuller) + puller := kubelet.containerManager.Puller.(*dockertools.FakeDockerPuller) puller.HasImages = []string{"existing_one", "want:latest"} kubelet.containerManager.PodInfraContainerImage = "custom_image_name" fakeDocker.ContainerList = []docker.APIContainers{} diff --git a/pkg/kubelet/pod_workers_test.go b/pkg/kubelet/pod_workers_test.go index 5a25e5a3e8a..83f6b7c4664 100644 --- a/pkg/kubelet/pod_workers_test.go +++ b/pkg/kubelet/pod_workers_test.go @@ -40,7 +40,7 @@ func newPod(uid, name string) *api.Pod { func createPodWorkers() (*podWorkers, map[types.UID][]string) { fakeDocker := &dockertools.FakeDockerClient{} fakeRecorder := &record.FakeRecorder{} - fakeDockerCache := dockertools.NewFakeDockerCache(dockertools.NewDockerManager(fakeDocker, fakeRecorder, dockertools.PodInfraContainerImage)) + fakeDockerCache := dockertools.NewFakeDockerCache(dockertools.NewDockerManager(fakeDocker, fakeRecorder, dockertools.PodInfraContainerImage, 0, 0)) lock := sync.Mutex{} processed := make(map[types.UID][]string) diff --git a/pkg/kubelet/runonce.go b/pkg/kubelet/runonce.go index be9458a1f84..67f53de042e 100644 --- a/pkg/kubelet/runonce.go +++ b/pkg/kubelet/runonce.go @@ -22,7 +22,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" - "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" "github.com/golang/glog" ) @@ -53,9 +52,6 @@ func (kl *Kubelet) RunOnce(updates <-chan PodUpdate) ([]RunPodResult, error) { // runOnce runs a given set of pods and returns their status. func (kl *Kubelet) runOnce(pods []api.Pod, retryDelay time.Duration) (results []RunPodResult, err error) { - if kl.dockerPuller == nil { - kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient, kl.pullQPS, kl.pullBurst) - } kl.handleNotFittingPods(pods) ch := make(chan RunPodResult) diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go index c7ad3dc86c8..75ddab52b69 100644 --- a/pkg/kubelet/runonce_test.go +++ b/pkg/kubelet/runonce_test.go @@ -144,8 +144,9 @@ func TestRunOnce(t *testing.T) { }, t: t, } - kb.dockerPuller = &dockertools.FakeDockerPuller{} - kb.containerManager = dockertools.NewDockerManager(kb.dockerClient, kb.recorder, dockertools.PodInfraContainerImage) + + kb.containerManager = dockertools.NewDockerManager(kb.dockerClient, kb.recorder, dockertools.PodInfraContainerImage, 0, 0) + kb.containerManager.Puller = &dockertools.FakeDockerPuller{} pods := []api.Pod{ {