kubelet/container: Replace DockerCache with RuntimeCache.

This commit is contained in:
Yifan Gu 2015-04-13 18:04:11 -07:00
parent a5e6bea9b5
commit e1feed9a8b
8 changed files with 29 additions and 163 deletions

View File

@ -32,8 +32,8 @@ type RuntimeCache interface {
ForceUpdateIfOlder(time.Time) error ForceUpdateIfOlder(time.Time) error
} }
// TODO(yifan): The duplication of this type (another definition is in dockertools) // TODO(yifan): This interface can be removed once docker manager has implemented
// will be resolved when we removed the docker cache. // all the runtime interfaces, (thus we can pass the runtime directly).
type podsGetter interface { type podsGetter interface {
GetPods(bool) ([]*Pod, error) GetPods(bool) ([]*Pod, error)
} }

View File

@ -1,115 +0,0 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dockertools
import (
"sync"
"time"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
)
type DockerCache interface {
GetPods() ([]*kubecontainer.Pod, error)
ForceUpdateIfOlder(time.Time) error
}
type podsGetter interface {
GetPods(bool) ([]*kubecontainer.Pod, error)
}
func NewDockerCache(getter podsGetter) (DockerCache, error) {
return &dockerCache{
getter: getter,
updatingCache: false,
}, nil
}
// dockerCache is a default implementation of DockerCache interface
// TODO(yifan): Use runtime cache to replace this.
type dockerCache struct {
// The narrowed interface for updating the cache.
getter podsGetter
// Mutex protecting all of the following fields.
lock sync.Mutex
// Last time when cache was updated.
cacheTime time.Time
// The content of the cache.
pods []*kubecontainer.Pod
// Whether the background thread updating the cache is running.
updatingCache bool
// Time when the background thread should be stopped.
updatingThreadStopTime time.Time
}
// Ensure that dockerCache abides by the DockerCache interface.
var _ DockerCache = new(dockerCache)
func (d *dockerCache) GetPods() ([]*kubecontainer.Pod, error) {
d.lock.Lock()
defer d.lock.Unlock()
if time.Since(d.cacheTime) > 2*time.Second {
pods, err := d.getter.GetPods(false)
if err != nil {
return pods, err
}
d.pods = pods
d.cacheTime = time.Now()
}
// Stop refreshing thread if there were no requests within last 2 seconds.
d.updatingThreadStopTime = time.Now().Add(time.Duration(2) * time.Second)
if !d.updatingCache {
d.updatingCache = true
go d.startUpdatingCache()
}
return d.pods, nil
}
func (d *dockerCache) ForceUpdateIfOlder(minExpectedCacheTime time.Time) error {
d.lock.Lock()
defer d.lock.Unlock()
if d.cacheTime.Before(minExpectedCacheTime) {
pods, err := d.getter.GetPods(false)
if err != nil {
return err
}
d.pods = pods
d.cacheTime = time.Now()
}
return nil
}
func (d *dockerCache) startUpdatingCache() {
run := true
for run {
time.Sleep(100 * time.Millisecond)
pods, err := d.getter.GetPods(false)
cacheTime := time.Now()
if err != nil {
continue
}
d.lock.Lock()
if time.Now().After(d.updatingThreadStopTime) {
d.updatingCache = false
run = false
}
d.pods = pods
d.cacheTime = cacheTime
d.lock.Unlock()
}
}

View File

@ -22,9 +22,7 @@ import (
"reflect" "reflect"
"sort" "sort"
"sync" "sync"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/fsouza/go-dockerclient" "github.com/fsouza/go-dockerclient"
) )
@ -331,19 +329,3 @@ func (f *FakeDockerPuller) IsImagePresent(name string) (bool, error) {
} }
return false, nil return false, nil
} }
type FakeDockerCache struct {
getter podsGetter
}
func NewFakeDockerCache(getter podsGetter) DockerCache {
return &FakeDockerCache{getter: getter}
}
func (f *FakeDockerCache) GetPods() ([]*container.Pod, error) {
return f.getter.GetPods(false)
}
func (f *FakeDockerCache) ForceUpdateIfOlder(time.Time) error {
return nil
}

View File

@ -232,14 +232,14 @@ func NewMainKubelet(
klet.podManager = newBasicPodManager(klet.kubeClient) klet.podManager = newBasicPodManager(klet.kubeClient)
dockerCache, err := dockertools.NewDockerCache(containerManager) runtimeCache, err := kubecontainer.NewRuntimeCache(containerManager)
if err != nil { if err != nil {
return nil, err return nil, err
} }
klet.dockerCache = dockerCache klet.runtimeCache = runtimeCache
klet.podWorkers = newPodWorkers(dockerCache, klet.syncPod, recorder) klet.podWorkers = newPodWorkers(runtimeCache, klet.syncPod, recorder)
metrics.Register(dockerCache) metrics.Register(runtimeCache)
if err = klet.setupDataDirs(); err != nil { if err = klet.setupDataDirs(); err != nil {
return nil, err return nil, err
@ -274,7 +274,7 @@ type nodeLister interface {
type Kubelet struct { type Kubelet struct {
hostname string hostname string
dockerClient dockertools.DockerInterface dockerClient dockertools.DockerInterface
dockerCache dockertools.DockerCache runtimeCache kubecontainer.RuntimeCache
kubeClient client.Interface kubeClient client.Interface
rootDirectory string rootDirectory string
podWorkers *podWorkers podWorkers *podWorkers
@ -1406,7 +1406,7 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric
var err error var err error
desiredPods := make(map[types.UID]empty) desiredPods := make(map[types.UID]empty)
runningPods, err := kl.dockerCache.GetPods() runningPods, err := kl.runtimeCache.GetPods()
if err != nil { if err != nil {
glog.Errorf("Error listing containers: %#v", err) glog.Errorf("Error listing containers: %#v", err)
return err return err

View File

@ -104,9 +104,9 @@ func newTestKubelet(t *testing.T) *TestKubelet {
kubelet.podManager = podManager kubelet.podManager = podManager
kubelet.containerRefManager = kubecontainer.NewRefManager() kubelet.containerRefManager = kubecontainer.NewRefManager()
kubelet.containerManager = dockertools.NewDockerManager(fakeDocker, fakeRecorder, dockertools.PodInfraContainerImage, 0, 0) kubelet.containerManager = dockertools.NewDockerManager(fakeDocker, fakeRecorder, dockertools.PodInfraContainerImage, 0, 0)
kubelet.dockerCache = dockertools.NewFakeDockerCache(kubelet.containerManager) kubelet.runtimeCache = kubecontainer.NewFakeRuntimeCache(kubelet.containerManager)
kubelet.podWorkers = newPodWorkers( kubelet.podWorkers = newPodWorkers(
kubelet.dockerCache, kubelet.runtimeCache,
func(pod *api.Pod, mirrorPod *api.Pod, runningPod container.Pod) error { func(pod *api.Pod, mirrorPod *api.Pod, runningPod container.Pod) error {
err := kubelet.syncPod(pod, mirrorPod, runningPod) err := kubelet.syncPod(pod, mirrorPod, runningPod)
waitGroup.Done() waitGroup.Done()

View File

@ -20,7 +20,7 @@ import (
"sync" "sync"
"time" "time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/golang/glog" "github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
) )
@ -70,7 +70,7 @@ var (
var registerMetrics sync.Once var registerMetrics sync.Once
// Register all metrics. // Register all metrics.
func Register(containerCache dockertools.DockerCache) { func Register(containerCache kubecontainer.RuntimeCache) {
// Register the metrics. // Register the metrics.
registerMetrics.Do(func() { registerMetrics.Do(func() {
prometheus.MustRegister(ImagePullLatency) prometheus.MustRegister(ImagePullLatency)
@ -108,7 +108,7 @@ func SinceInMicroseconds(start time.Time) float64 {
return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds()) return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds())
} }
func newPodAndContainerCollector(containerCache dockertools.DockerCache) *podAndContainerCollector { func newPodAndContainerCollector(containerCache kubecontainer.RuntimeCache) *podAndContainerCollector {
return &podAndContainerCollector{ return &podAndContainerCollector{
containerCache: containerCache, containerCache: containerCache,
} }
@ -117,7 +117,7 @@ func newPodAndContainerCollector(containerCache dockertools.DockerCache) *podAnd
// Custom collector for current pod and container counts. // Custom collector for current pod and container counts.
type podAndContainerCollector struct { type podAndContainerCollector struct {
// Cache for accessing information about running containers. // Cache for accessing information about running containers.
containerCache dockertools.DockerCache containerCache kubecontainer.RuntimeCache
} }
// TODO(vmarmol): Split by source? // TODO(vmarmol): Split by source?

View File

@ -22,14 +22,13 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog" "github.com/golang/glog"
) )
type syncPodFnType func(*api.Pod, *api.Pod, container.Pod) error type syncPodFnType func(*api.Pod, *api.Pod, kubecontainer.Pod) error
type podWorkers struct { type podWorkers struct {
// Protects all per worker fields. // Protects all per worker fields.
@ -45,8 +44,8 @@ type podWorkers struct {
// Tracks the last undelivered work item for this pod - a work item is // Tracks the last undelivered work item for this pod - a work item is
// undelivered if it comes in while the worker is working. // undelivered if it comes in while the worker is working.
lastUndeliveredWorkUpdate map[types.UID]workUpdate lastUndeliveredWorkUpdate map[types.UID]workUpdate
// DockerCache is used for listing running containers. // runtimeCache is used for listing running containers.
dockerCache dockertools.DockerCache runtimeCache kubecontainer.RuntimeCache
// This function is run to sync the desired stated of pod. // This function is run to sync the desired stated of pod.
// NOTE: This function has to be thread-safe - it can be called for // NOTE: This function has to be thread-safe - it can be called for
@ -68,43 +67,43 @@ type workUpdate struct {
updateCompleteFn func() updateCompleteFn func()
} }
func newPodWorkers(dockerCache dockertools.DockerCache, syncPodFn syncPodFnType, func newPodWorkers(runtimeCache kubecontainer.RuntimeCache, syncPodFn syncPodFnType,
recorder record.EventRecorder) *podWorkers { recorder record.EventRecorder) *podWorkers {
return &podWorkers{ return &podWorkers{
podUpdates: map[types.UID]chan workUpdate{}, podUpdates: map[types.UID]chan workUpdate{},
isWorking: map[types.UID]bool{}, isWorking: map[types.UID]bool{},
lastUndeliveredWorkUpdate: map[types.UID]workUpdate{}, lastUndeliveredWorkUpdate: map[types.UID]workUpdate{},
dockerCache: dockerCache, runtimeCache: runtimeCache,
syncPodFn: syncPodFn, syncPodFn: syncPodFn,
recorder: recorder, recorder: recorder,
} }
} }
func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) { func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
var minDockerCacheTime time.Time var minRuntimeCacheTime time.Time
for newWork := range podUpdates { for newWork := range podUpdates {
func() { func() {
defer p.checkForUpdates(newWork.pod.UID, newWork.updateCompleteFn) defer p.checkForUpdates(newWork.pod.UID, newWork.updateCompleteFn)
// We would like to have the state of Docker from at least the moment // We would like to have the state of Docker from at least the moment
// when we finished the previous processing of that pod. // when we finished the previous processing of that pod.
if err := p.dockerCache.ForceUpdateIfOlder(minDockerCacheTime); err != nil { if err := p.runtimeCache.ForceUpdateIfOlder(minRuntimeCacheTime); err != nil {
glog.Errorf("Error updating docker cache: %v", err) glog.Errorf("Error updating docker cache: %v", err)
return return
} }
pods, err := p.dockerCache.GetPods() pods, err := p.runtimeCache.GetPods()
if err != nil { if err != nil {
glog.Errorf("Error getting pods while syncing pod: %v", err) glog.Errorf("Error getting pods while syncing pod: %v", err)
return return
} }
err = p.syncPodFn(newWork.pod, newWork.mirrorPod, err = p.syncPodFn(newWork.pod, newWork.mirrorPod,
container.Pods(pods).FindPodByID(newWork.pod.UID)) kubecontainer.Pods(pods).FindPodByID(newWork.pod.UID))
if err != nil { if err != nil {
glog.Errorf("Error syncing pod %s, skipping: %v", newWork.pod.UID, err) glog.Errorf("Error syncing pod %s, skipping: %v", newWork.pod.UID, err)
p.recorder.Eventf(newWork.pod, "failedSync", "Error syncing pod, skipping: %v", err) p.recorder.Eventf(newWork.pod, "failedSync", "Error syncing pod, skipping: %v", err)
return return
} }
minDockerCacheTime = time.Now() minRuntimeCacheTime = time.Now()
newWork.updateCompleteFn() newWork.updateCompleteFn()
}() }()

View File

@ -23,7 +23,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container" kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types" "github.com/GoogleCloudPlatform/kubernetes/pkg/types"
) )
@ -40,14 +40,14 @@ func newPod(uid, name string) *api.Pod {
func createPodWorkers() (*podWorkers, map[types.UID][]string) { func createPodWorkers() (*podWorkers, map[types.UID][]string) {
fakeDocker := &dockertools.FakeDockerClient{} fakeDocker := &dockertools.FakeDockerClient{}
fakeRecorder := &record.FakeRecorder{} fakeRecorder := &record.FakeRecorder{}
fakeDockerCache := dockertools.NewFakeDockerCache(dockertools.NewDockerManager(fakeDocker, fakeRecorder, dockertools.PodInfraContainerImage, 0, 0)) fakeRuntimeCache := kubecontainer.NewFakeRuntimeCache(dockertools.NewDockerManager(fakeDocker, fakeRecorder, dockertools.PodInfraContainerImage, 0, 0))
lock := sync.Mutex{} lock := sync.Mutex{}
processed := make(map[types.UID][]string) processed := make(map[types.UID][]string)
podWorkers := newPodWorkers( podWorkers := newPodWorkers(
fakeDockerCache, fakeRuntimeCache,
func(pod *api.Pod, mirrorPod *api.Pod, runningPod container.Pod) error { func(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod) error {
func() { func() {
lock.Lock() lock.Lock()
defer lock.Unlock() defer lock.Unlock()