diff --git a/pkg/kubelet/container/runtime_cache.go b/pkg/kubelet/container/runtime_cache.go index 0c02e0a4b58..b5e860cb2a5 100644 --- a/pkg/kubelet/container/runtime_cache.go +++ b/pkg/kubelet/container/runtime_cache.go @@ -32,8 +32,8 @@ type RuntimeCache interface { ForceUpdateIfOlder(time.Time) error } -// TODO(yifan): The duplication of this type (another definition is in dockertools) -// will be resolved when we removed the docker cache. +// TODO(yifan): This interface can be removed once docker manager has implemented +// all the runtime interfaces, (thus we can pass the runtime directly). type podsGetter interface { GetPods(bool) ([]*Pod, error) } diff --git a/pkg/kubelet/dockertools/docker_cache.go b/pkg/kubelet/dockertools/docker_cache.go deleted file mode 100644 index 2bc9b1b4aca..00000000000 --- a/pkg/kubelet/dockertools/docker_cache.go +++ /dev/null @@ -1,115 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package dockertools - -import ( - "sync" - "time" - - kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" -) - -type DockerCache interface { - GetPods() ([]*kubecontainer.Pod, error) - ForceUpdateIfOlder(time.Time) error -} - -type podsGetter interface { - GetPods(bool) ([]*kubecontainer.Pod, error) -} - -func NewDockerCache(getter podsGetter) (DockerCache, error) { - return &dockerCache{ - getter: getter, - updatingCache: false, - }, nil -} - -// dockerCache is a default implementation of DockerCache interface -// TODO(yifan): Use runtime cache to replace this. -type dockerCache struct { - // The narrowed interface for updating the cache. - getter podsGetter - // Mutex protecting all of the following fields. - lock sync.Mutex - // Last time when cache was updated. - cacheTime time.Time - // The content of the cache. - pods []*kubecontainer.Pod - // Whether the background thread updating the cache is running. - updatingCache bool - // Time when the background thread should be stopped. - updatingThreadStopTime time.Time -} - -// Ensure that dockerCache abides by the DockerCache interface. -var _ DockerCache = new(dockerCache) - -func (d *dockerCache) GetPods() ([]*kubecontainer.Pod, error) { - d.lock.Lock() - defer d.lock.Unlock() - if time.Since(d.cacheTime) > 2*time.Second { - pods, err := d.getter.GetPods(false) - if err != nil { - return pods, err - } - d.pods = pods - d.cacheTime = time.Now() - } - // Stop refreshing thread if there were no requests within last 2 seconds. - d.updatingThreadStopTime = time.Now().Add(time.Duration(2) * time.Second) - if !d.updatingCache { - d.updatingCache = true - go d.startUpdatingCache() - } - return d.pods, nil -} - -func (d *dockerCache) ForceUpdateIfOlder(minExpectedCacheTime time.Time) error { - d.lock.Lock() - defer d.lock.Unlock() - if d.cacheTime.Before(minExpectedCacheTime) { - pods, err := d.getter.GetPods(false) - if err != nil { - return err - } - d.pods = pods - d.cacheTime = time.Now() - } - return nil -} - -func (d *dockerCache) startUpdatingCache() { - run := true - for run { - time.Sleep(100 * time.Millisecond) - pods, err := d.getter.GetPods(false) - cacheTime := time.Now() - if err != nil { - continue - } - - d.lock.Lock() - if time.Now().After(d.updatingThreadStopTime) { - d.updatingCache = false - run = false - } - d.pods = pods - d.cacheTime = cacheTime - d.lock.Unlock() - } -} diff --git a/pkg/kubelet/dockertools/fake_docker_client.go b/pkg/kubelet/dockertools/fake_docker_client.go index c0df853a741..e67365f08ed 100644 --- a/pkg/kubelet/dockertools/fake_docker_client.go +++ b/pkg/kubelet/dockertools/fake_docker_client.go @@ -22,9 +22,7 @@ import ( "reflect" "sort" "sync" - "time" - "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/fsouza/go-dockerclient" ) @@ -331,19 +329,3 @@ func (f *FakeDockerPuller) IsImagePresent(name string) (bool, error) { } return false, nil } - -type FakeDockerCache struct { - getter podsGetter -} - -func NewFakeDockerCache(getter podsGetter) DockerCache { - return &FakeDockerCache{getter: getter} -} - -func (f *FakeDockerCache) GetPods() ([]*container.Pod, error) { - return f.getter.GetPods(false) -} - -func (f *FakeDockerCache) ForceUpdateIfOlder(time.Time) error { - return nil -} diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index e22b3d1d1ad..2f36d6682da 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -232,14 +232,14 @@ func NewMainKubelet( klet.podManager = newBasicPodManager(klet.kubeClient) - dockerCache, err := dockertools.NewDockerCache(containerManager) + runtimeCache, err := kubecontainer.NewRuntimeCache(containerManager) if err != nil { return nil, err } - klet.dockerCache = dockerCache - klet.podWorkers = newPodWorkers(dockerCache, klet.syncPod, recorder) + klet.runtimeCache = runtimeCache + klet.podWorkers = newPodWorkers(runtimeCache, klet.syncPod, recorder) - metrics.Register(dockerCache) + metrics.Register(runtimeCache) if err = klet.setupDataDirs(); err != nil { return nil, err @@ -274,7 +274,7 @@ type nodeLister interface { type Kubelet struct { hostname string dockerClient dockertools.DockerInterface - dockerCache dockertools.DockerCache + runtimeCache kubecontainer.RuntimeCache kubeClient client.Interface rootDirectory string podWorkers *podWorkers @@ -1406,7 +1406,7 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric var err error desiredPods := make(map[types.UID]empty) - runningPods, err := kl.dockerCache.GetPods() + runningPods, err := kl.runtimeCache.GetPods() if err != nil { glog.Errorf("Error listing containers: %#v", err) return err diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index e279594bca4..739ec86ece0 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -104,9 +104,9 @@ func newTestKubelet(t *testing.T) *TestKubelet { kubelet.podManager = podManager kubelet.containerRefManager = kubecontainer.NewRefManager() kubelet.containerManager = dockertools.NewDockerManager(fakeDocker, fakeRecorder, dockertools.PodInfraContainerImage, 0, 0) - kubelet.dockerCache = dockertools.NewFakeDockerCache(kubelet.containerManager) + kubelet.runtimeCache = kubecontainer.NewFakeRuntimeCache(kubelet.containerManager) kubelet.podWorkers = newPodWorkers( - kubelet.dockerCache, + kubelet.runtimeCache, func(pod *api.Pod, mirrorPod *api.Pod, runningPod container.Pod) error { err := kubelet.syncPod(pod, mirrorPod, runningPod) waitGroup.Done() diff --git a/pkg/kubelet/metrics/metrics.go b/pkg/kubelet/metrics/metrics.go index 70b626a8d3d..c4debd45b7e 100644 --- a/pkg/kubelet/metrics/metrics.go +++ b/pkg/kubelet/metrics/metrics.go @@ -20,7 +20,7 @@ import ( "sync" "time" - "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" + kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" "github.com/golang/glog" "github.com/prometheus/client_golang/prometheus" ) @@ -70,7 +70,7 @@ var ( var registerMetrics sync.Once // Register all metrics. -func Register(containerCache dockertools.DockerCache) { +func Register(containerCache kubecontainer.RuntimeCache) { // Register the metrics. registerMetrics.Do(func() { prometheus.MustRegister(ImagePullLatency) @@ -108,7 +108,7 @@ func SinceInMicroseconds(start time.Time) float64 { return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds()) } -func newPodAndContainerCollector(containerCache dockertools.DockerCache) *podAndContainerCollector { +func newPodAndContainerCollector(containerCache kubecontainer.RuntimeCache) *podAndContainerCollector { return &podAndContainerCollector{ containerCache: containerCache, } @@ -117,7 +117,7 @@ func newPodAndContainerCollector(containerCache dockertools.DockerCache) *podAnd // Custom collector for current pod and container counts. type podAndContainerCollector struct { // Cache for accessing information about running containers. - containerCache dockertools.DockerCache + containerCache kubecontainer.RuntimeCache } // TODO(vmarmol): Split by source? diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go index 9c980dac448..8828307d7cd 100644 --- a/pkg/kubelet/pod_workers.go +++ b/pkg/kubelet/pod_workers.go @@ -22,14 +22,13 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" - "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" - "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" + kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" "github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/golang/glog" ) -type syncPodFnType func(*api.Pod, *api.Pod, container.Pod) error +type syncPodFnType func(*api.Pod, *api.Pod, kubecontainer.Pod) error type podWorkers struct { // Protects all per worker fields. @@ -45,8 +44,8 @@ type podWorkers struct { // Tracks the last undelivered work item for this pod - a work item is // undelivered if it comes in while the worker is working. lastUndeliveredWorkUpdate map[types.UID]workUpdate - // DockerCache is used for listing running containers. - dockerCache dockertools.DockerCache + // runtimeCache is used for listing running containers. + runtimeCache kubecontainer.RuntimeCache // This function is run to sync the desired stated of pod. // NOTE: This function has to be thread-safe - it can be called for @@ -68,43 +67,43 @@ type workUpdate struct { updateCompleteFn func() } -func newPodWorkers(dockerCache dockertools.DockerCache, syncPodFn syncPodFnType, +func newPodWorkers(runtimeCache kubecontainer.RuntimeCache, syncPodFn syncPodFnType, recorder record.EventRecorder) *podWorkers { return &podWorkers{ podUpdates: map[types.UID]chan workUpdate{}, isWorking: map[types.UID]bool{}, lastUndeliveredWorkUpdate: map[types.UID]workUpdate{}, - dockerCache: dockerCache, + runtimeCache: runtimeCache, syncPodFn: syncPodFn, recorder: recorder, } } func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) { - var minDockerCacheTime time.Time + var minRuntimeCacheTime time.Time for newWork := range podUpdates { func() { defer p.checkForUpdates(newWork.pod.UID, newWork.updateCompleteFn) // We would like to have the state of Docker from at least the moment // when we finished the previous processing of that pod. - if err := p.dockerCache.ForceUpdateIfOlder(minDockerCacheTime); err != nil { + if err := p.runtimeCache.ForceUpdateIfOlder(minRuntimeCacheTime); err != nil { glog.Errorf("Error updating docker cache: %v", err) return } - pods, err := p.dockerCache.GetPods() + pods, err := p.runtimeCache.GetPods() if err != nil { glog.Errorf("Error getting pods while syncing pod: %v", err) return } err = p.syncPodFn(newWork.pod, newWork.mirrorPod, - container.Pods(pods).FindPodByID(newWork.pod.UID)) + kubecontainer.Pods(pods).FindPodByID(newWork.pod.UID)) if err != nil { glog.Errorf("Error syncing pod %s, skipping: %v", newWork.pod.UID, err) p.recorder.Eventf(newWork.pod, "failedSync", "Error syncing pod, skipping: %v", err) return } - minDockerCacheTime = time.Now() + minRuntimeCacheTime = time.Now() newWork.updateCompleteFn() }() diff --git a/pkg/kubelet/pod_workers_test.go b/pkg/kubelet/pod_workers_test.go index 83f6b7c4664..c9199849648 100644 --- a/pkg/kubelet/pod_workers_test.go +++ b/pkg/kubelet/pod_workers_test.go @@ -23,7 +23,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" - "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" + kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" "github.com/GoogleCloudPlatform/kubernetes/pkg/types" ) @@ -40,14 +40,14 @@ func newPod(uid, name string) *api.Pod { func createPodWorkers() (*podWorkers, map[types.UID][]string) { fakeDocker := &dockertools.FakeDockerClient{} fakeRecorder := &record.FakeRecorder{} - fakeDockerCache := dockertools.NewFakeDockerCache(dockertools.NewDockerManager(fakeDocker, fakeRecorder, dockertools.PodInfraContainerImage, 0, 0)) + fakeRuntimeCache := kubecontainer.NewFakeRuntimeCache(dockertools.NewDockerManager(fakeDocker, fakeRecorder, dockertools.PodInfraContainerImage, 0, 0)) lock := sync.Mutex{} processed := make(map[types.UID][]string) podWorkers := newPodWorkers( - fakeDockerCache, - func(pod *api.Pod, mirrorPod *api.Pod, runningPod container.Pod) error { + fakeRuntimeCache, + func(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod) error { func() { lock.Lock() defer lock.Unlock()