Merge pull request #6795 from yifan-gu/cache

kubelet/container: Replace DockerCache with RuntimeCache.
This commit is contained in:
Victor Marmol 2015-04-14 09:37:34 -07:00
commit ca9e209ccb
9 changed files with 43 additions and 177 deletions

View File

@ -42,15 +42,15 @@ type FakeRuntime struct {
}
type FakeRuntimeCache struct {
runtime Runtime
getter podsGetter
}
func NewFakeRuntimeCache(runtime Runtime) RuntimeCache {
return &FakeRuntimeCache{runtime}
func NewFakeRuntimeCache(getter podsGetter) RuntimeCache {
return &FakeRuntimeCache{getter}
}
func (f *FakeRuntimeCache) GetPods() ([]*Pod, error) {
return f.runtime.GetPods(false)
return f.getter.GetPods(false)
}
func (f *FakeRuntimeCache) ForceUpdateIfOlder(time.Time) error {

View File

@ -32,24 +32,24 @@ type RuntimeCache interface {
ForceUpdateIfOlder(time.Time) error
}
// TODO(yifan): This interface can be removed once docker manager has implemented
// all the runtime interfaces, (thus we can pass the runtime directly).
type podsGetter interface {
GetPods(bool) ([]*Pod, error)
}
// NewRuntimeCache creates a container runtime cache.
func NewRuntimeCache(runtime Runtime) (RuntimeCache, error) {
pods, err := runtime.GetPods(false)
if err != nil {
return nil, err
}
func NewRuntimeCache(getter podsGetter) (RuntimeCache, error) {
return &runtimeCache{
runtime: runtime,
cacheTime: time.Now(),
pods: pods,
updating: false,
getter: getter,
updating: false,
}, nil
}
type runtimeCache struct {
sync.Mutex
// The underlying container runtime used to update the cache.
runtime Runtime
getter podsGetter
// Last time when cache was updated.
cacheTime time.Time
// The content of the cache.
@ -90,7 +90,7 @@ func (r *runtimeCache) ForceUpdateIfOlder(minExpectedCacheTime time.Time) error
}
func (r *runtimeCache) updateCache() error {
pods, err := r.runtime.GetPods(false)
pods, err := r.getter.GetPods(false)
if err != nil {
return err
}
@ -105,7 +105,7 @@ func (r *runtimeCache) startUpdatingCache() {
run := true
for run {
time.Sleep(defaultUpdateInterval)
pods, err := r.runtime.GetPods(false)
pods, err := r.getter.GetPods(false)
cacheTime := time.Now()
if err != nil {
continue

View File

@ -1,115 +0,0 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dockertools
import (
"sync"
"time"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
)
type DockerCache interface {
GetPods() ([]*kubecontainer.Pod, error)
ForceUpdateIfOlder(time.Time) error
}
type podsGetter interface {
GetPods(bool) ([]*kubecontainer.Pod, error)
}
func NewDockerCache(getter podsGetter) (DockerCache, error) {
return &dockerCache{
getter: getter,
updatingCache: false,
}, nil
}
// dockerCache is a default implementation of DockerCache interface
// TODO(yifan): Use runtime cache to replace this.
type dockerCache struct {
// The narrowed interface for updating the cache.
getter podsGetter
// Mutex protecting all of the following fields.
lock sync.Mutex
// Last time when cache was updated.
cacheTime time.Time
// The content of the cache.
pods []*kubecontainer.Pod
// Whether the background thread updating the cache is running.
updatingCache bool
// Time when the background thread should be stopped.
updatingThreadStopTime time.Time
}
// Ensure that dockerCache abides by the DockerCache interface.
var _ DockerCache = new(dockerCache)
func (d *dockerCache) GetPods() ([]*kubecontainer.Pod, error) {
d.lock.Lock()
defer d.lock.Unlock()
if time.Since(d.cacheTime) > 2*time.Second {
pods, err := d.getter.GetPods(false)
if err != nil {
return pods, err
}
d.pods = pods
d.cacheTime = time.Now()
}
// Stop refreshing thread if there were no requests within last 2 seconds.
d.updatingThreadStopTime = time.Now().Add(time.Duration(2) * time.Second)
if !d.updatingCache {
d.updatingCache = true
go d.startUpdatingCache()
}
return d.pods, nil
}
func (d *dockerCache) ForceUpdateIfOlder(minExpectedCacheTime time.Time) error {
d.lock.Lock()
defer d.lock.Unlock()
if d.cacheTime.Before(minExpectedCacheTime) {
pods, err := d.getter.GetPods(false)
if err != nil {
return err
}
d.pods = pods
d.cacheTime = time.Now()
}
return nil
}
func (d *dockerCache) startUpdatingCache() {
run := true
for run {
time.Sleep(100 * time.Millisecond)
pods, err := d.getter.GetPods(false)
cacheTime := time.Now()
if err != nil {
continue
}
d.lock.Lock()
if time.Now().After(d.updatingThreadStopTime) {
d.updatingCache = false
run = false
}
d.pods = pods
d.cacheTime = cacheTime
d.lock.Unlock()
}
}

View File

@ -22,9 +22,7 @@ import (
"reflect"
"sort"
"sync"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/fsouza/go-dockerclient"
)
@ -333,19 +331,3 @@ func (f *FakeDockerPuller) IsImagePresent(name string) (bool, error) {
}
return false, nil
}
type FakeDockerCache struct {
getter podsGetter
}
func NewFakeDockerCache(getter podsGetter) DockerCache {
return &FakeDockerCache{getter: getter}
}
func (f *FakeDockerCache) GetPods() ([]*container.Pod, error) {
return f.getter.GetPods(false)
}
func (f *FakeDockerCache) ForceUpdateIfOlder(time.Time) error {
return nil
}

View File

@ -232,14 +232,14 @@ func NewMainKubelet(
klet.podManager = newBasicPodManager(klet.kubeClient)
dockerCache, err := dockertools.NewDockerCache(containerManager)
runtimeCache, err := kubecontainer.NewRuntimeCache(containerManager)
if err != nil {
return nil, err
}
klet.dockerCache = dockerCache
klet.podWorkers = newPodWorkers(dockerCache, klet.syncPod, recorder)
klet.runtimeCache = runtimeCache
klet.podWorkers = newPodWorkers(runtimeCache, klet.syncPod, recorder)
metrics.Register(dockerCache)
metrics.Register(runtimeCache)
if err = klet.setupDataDirs(); err != nil {
return nil, err
@ -274,7 +274,7 @@ type nodeLister interface {
type Kubelet struct {
hostname string
dockerClient dockertools.DockerInterface
dockerCache dockertools.DockerCache
runtimeCache kubecontainer.RuntimeCache
kubeClient client.Interface
rootDirectory string
podWorkers *podWorkers
@ -1396,7 +1396,7 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric
var err error
desiredPods := make(map[types.UID]empty)
runningPods, err := kl.dockerCache.GetPods()
runningPods, err := kl.runtimeCache.GetPods()
if err != nil {
glog.Errorf("Error listing containers: %#v", err)
return err

View File

@ -103,9 +103,9 @@ func newTestKubelet(t *testing.T) *TestKubelet {
kubelet.podManager = podManager
kubelet.containerRefManager = kubecontainer.NewRefManager()
kubelet.containerManager = dockertools.NewDockerManager(fakeDocker, fakeRecorder, dockertools.PodInfraContainerImage, 0, 0)
kubelet.dockerCache = dockertools.NewFakeDockerCache(kubelet.containerManager)
kubelet.runtimeCache = kubecontainer.NewFakeRuntimeCache(kubelet.containerManager)
kubelet.podWorkers = newPodWorkers(
kubelet.dockerCache,
kubelet.runtimeCache,
func(pod *api.Pod, mirrorPod *api.Pod, runningPod container.Pod) error {
err := kubelet.syncPod(pod, mirrorPod, runningPod)
waitGroup.Done()

View File

@ -20,7 +20,7 @@ import (
"sync"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
)
@ -70,7 +70,7 @@ var (
var registerMetrics sync.Once
// Register all metrics.
func Register(containerCache dockertools.DockerCache) {
func Register(containerCache kubecontainer.RuntimeCache) {
// Register the metrics.
registerMetrics.Do(func() {
prometheus.MustRegister(ImagePullLatency)
@ -108,7 +108,7 @@ func SinceInMicroseconds(start time.Time) float64 {
return float64(time.Since(start).Nanoseconds() / time.Microsecond.Nanoseconds())
}
func newPodAndContainerCollector(containerCache dockertools.DockerCache) *podAndContainerCollector {
func newPodAndContainerCollector(containerCache kubecontainer.RuntimeCache) *podAndContainerCollector {
return &podAndContainerCollector{
containerCache: containerCache,
}
@ -117,7 +117,7 @@ func newPodAndContainerCollector(containerCache dockertools.DockerCache) *podAnd
// Custom collector for current pod and container counts.
type podAndContainerCollector struct {
// Cache for accessing information about running containers.
containerCache dockertools.DockerCache
containerCache kubecontainer.RuntimeCache
}
// TODO(vmarmol): Split by source?

View File

@ -22,14 +22,13 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/golang/glog"
)
type syncPodFnType func(*api.Pod, *api.Pod, container.Pod) error
type syncPodFnType func(*api.Pod, *api.Pod, kubecontainer.Pod) error
type podWorkers struct {
// Protects all per worker fields.
@ -45,8 +44,8 @@ type podWorkers struct {
// Tracks the last undelivered work item for this pod - a work item is
// undelivered if it comes in while the worker is working.
lastUndeliveredWorkUpdate map[types.UID]workUpdate
// DockerCache is used for listing running containers.
dockerCache dockertools.DockerCache
// runtimeCache is used for listing running containers.
runtimeCache kubecontainer.RuntimeCache
// This function is run to sync the desired stated of pod.
// NOTE: This function has to be thread-safe - it can be called for
@ -68,43 +67,43 @@ type workUpdate struct {
updateCompleteFn func()
}
func newPodWorkers(dockerCache dockertools.DockerCache, syncPodFn syncPodFnType,
func newPodWorkers(runtimeCache kubecontainer.RuntimeCache, syncPodFn syncPodFnType,
recorder record.EventRecorder) *podWorkers {
return &podWorkers{
podUpdates: map[types.UID]chan workUpdate{},
isWorking: map[types.UID]bool{},
lastUndeliveredWorkUpdate: map[types.UID]workUpdate{},
dockerCache: dockerCache,
runtimeCache: runtimeCache,
syncPodFn: syncPodFn,
recorder: recorder,
}
}
func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
var minDockerCacheTime time.Time
var minRuntimeCacheTime time.Time
for newWork := range podUpdates {
func() {
defer p.checkForUpdates(newWork.pod.UID, newWork.updateCompleteFn)
// We would like to have the state of Docker from at least the moment
// when we finished the previous processing of that pod.
if err := p.dockerCache.ForceUpdateIfOlder(minDockerCacheTime); err != nil {
if err := p.runtimeCache.ForceUpdateIfOlder(minRuntimeCacheTime); err != nil {
glog.Errorf("Error updating docker cache: %v", err)
return
}
pods, err := p.dockerCache.GetPods()
pods, err := p.runtimeCache.GetPods()
if err != nil {
glog.Errorf("Error getting pods while syncing pod: %v", err)
return
}
err = p.syncPodFn(newWork.pod, newWork.mirrorPod,
container.Pods(pods).FindPodByID(newWork.pod.UID))
kubecontainer.Pods(pods).FindPodByID(newWork.pod.UID))
if err != nil {
glog.Errorf("Error syncing pod %s, skipping: %v", newWork.pod.UID, err)
p.recorder.Eventf(newWork.pod, "failedSync", "Error syncing pod, skipping: %v", err)
return
}
minDockerCacheTime = time.Now()
minRuntimeCacheTime = time.Now()
newWork.updateCompleteFn()
}()

View File

@ -23,7 +23,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
kubecontainer "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
)
@ -40,14 +40,14 @@ func newPod(uid, name string) *api.Pod {
func createPodWorkers() (*podWorkers, map[types.UID][]string) {
fakeDocker := &dockertools.FakeDockerClient{}
fakeRecorder := &record.FakeRecorder{}
fakeDockerCache := dockertools.NewFakeDockerCache(dockertools.NewDockerManager(fakeDocker, fakeRecorder, dockertools.PodInfraContainerImage, 0, 0))
fakeRuntimeCache := kubecontainer.NewFakeRuntimeCache(dockertools.NewDockerManager(fakeDocker, fakeRecorder, dockertools.PodInfraContainerImage, 0, 0))
lock := sync.Mutex{}
processed := make(map[types.UID][]string)
podWorkers := newPodWorkers(
fakeDockerCache,
func(pod *api.Pod, mirrorPod *api.Pod, runningPod container.Pod) error {
fakeRuntimeCache,
func(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod) error {
func() {
lock.Lock()
defer lock.Unlock()