diff --git a/pkg/kubelet/dockertools/docker_cache.go b/pkg/kubelet/dockertools/docker_cache.go
index ba01df3de2c..ed22fcb0b1c 100644
--- a/pkg/kubelet/dockertools/docker_cache.go
+++ b/pkg/kubelet/dockertools/docker_cache.go
@@ -23,6 +23,7 @@ import (
 
 type DockerCache interface {
 	RunningContainers() (DockerContainers, error)
+	ForceUpdateIfOlder(time.Time) error
 }
 
 func NewDockerCache(client DockerInterface) (DockerCache, error) {
@@ -49,6 +50,9 @@ type dockerCache struct {
 	updatingThreadStopTime time.Time
 }
 
+// Ensure that dockerCache abides by the DockerCache interface.
+var _ DockerCache = new(dockerCache)
+
 func (d *dockerCache) RunningContainers() (DockerContainers, error) {
 	d.lock.Lock()
 	defer d.lock.Unlock()
@@ -69,6 +73,20 @@ func (d *dockerCache) RunningContainers() (DockerContainers, error) {
 	return d.containers, nil
 }
 
+func (d *dockerCache) ForceUpdateIfOlder(minExpectedCacheTime time.Time) error {
+	d.lock.Lock()
+	defer d.lock.Unlock()
+	if d.cacheTime.Before(minExpectedCacheTime) {
+		containers, err := GetKubeletDockerContainers(d.client, false)
+		if err != nil {
+			return err
+		}
+		d.containers = containers
+		d.cacheTime = time.Now()
+	}
+	return nil
+}
+
 func (d *dockerCache) startUpdatingCache() {
 	run := true
 	for run {
diff --git a/pkg/kubelet/dockertools/fake_docker_client.go b/pkg/kubelet/dockertools/fake_docker_client.go
index d65074920ca..01d14cb2503 100644
--- a/pkg/kubelet/dockertools/fake_docker_client.go
+++ b/pkg/kubelet/dockertools/fake_docker_client.go
@@ -20,6 +20,7 @@ import (
 	"fmt"
 	"reflect"
 	"sync"
+	"time"
 
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
 	"github.com/fsouza/go-dockerclient"
@@ -246,3 +247,7 @@ func NewFakeDockerCache(client DockerInterface) DockerCache {
 func (f *FakeDockerCache) RunningContainers() (DockerContainers, error) {
 	return GetKubeletDockerContainers(f.client, false)
 }
+
+func (f *FakeDockerCache) ForceUpdateIfOlder(time.Time) error {
+	return nil
+}
diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go
index 52b526fbc3f..80c8eb59d15 100644
--- a/pkg/kubelet/pod_workers.go
+++ b/pkg/kubelet/pod_workers.go
@@ -18,6 +18,7 @@ package kubelet
 
 import (
 	"sync"
+	"time"
 
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
@@ -41,6 +42,9 @@ type podWorkers struct {
 	// Currently all update request for a given pod coming when another
 	// update of this pod is being processed are ignored.
 	isWorking map[types.UID]bool
+	// Tracks the last undelivered work item for each pod; a work item is
+	// undelivered if it arrives while the worker is busy with an earlier update.
+	lastUndeliveredWorkUpdate map[types.UID]workUpdate
 
 	// DockerCache is used for listing running containers.
 	dockerCache dockertools.DockerCache
@@ -63,22 +67,26 @@ type workUpdate struct {
 func newPodWorkers(dockerCache dockertools.DockerCache, syncPodFn syncPodFnType,
 	recorder record.EventRecorder) *podWorkers {
 	return &podWorkers{
-		podUpdates:  map[types.UID]chan workUpdate{},
-		isWorking:   map[types.UID]bool{},
-		dockerCache: dockerCache,
-		syncPodFn:   syncPodFn,
-		recorder:    recorder,
+		podUpdates:                map[types.UID]chan workUpdate{},
+		isWorking:                 map[types.UID]bool{},
+		lastUndeliveredWorkUpdate: map[types.UID]workUpdate{},
+		dockerCache:               dockerCache,
+		syncPodFn:                 syncPodFn,
+		recorder:                  recorder,
 	}
 }
 
 func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
+	var minDockerCacheTime time.Time
 	for newWork := range podUpdates {
-		// Since we use docker cache, getting current state shouldn't cause
-		// performance overhead on Docker. Moreover, as long as we run syncPod
-		// no matter if it changes anything, having an old version of "containers"
-		// can cause starting eunended containers.
 		func() {
-			defer p.setIsWorking(newWork.pod.UID, false)
+			defer p.checkForUpdates(newWork.pod.UID, newWork.updateCompleteFn)
+			// Make sure the Docker state we see is at least as fresh as the
+			// moment we finished the previous processing of this pod.
+			if err := p.dockerCache.ForceUpdateIfOlder(minDockerCacheTime); err != nil {
+				glog.Errorf("Error updating docker cache: %v", err)
+				return
+			}
 			containers, err := p.dockerCache.RunningContainers()
 			if err != nil {
 				glog.Errorf("Error listing containers while syncing pod: %v", err)
@@ -91,6 +99,7 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
 				p.recorder.Eventf(newWork.pod, "failedSync", "Error syncing pod, skipping: %v", err)
 				return
 			}
+			minDockerCacheTime = time.Now()
 			newWork.updateCompleteFn()
 		}()
 
@@ -106,8 +115,10 @@ func (p *podWorkers) UpdatePod(pod *api.BoundPod, updateComplete func()) {
 	p.podLock.Lock()
 	defer p.podLock.Unlock()
 	if podUpdates, exists = p.podUpdates[uid]; !exists {
-		// Currently all update request for a given pod coming when another
-		// update of this pod is being processed are ignored.
+		// The channel needs a buffer because checkForUpdates(), which puts an
+		// update into the channel, is called from the same goroutine that
+		// consumes the channel. However, in that case the channel is guaranteed
+		// to be empty, so a buffer of size 1 is enough.
 		podUpdates = make(chan workUpdate, 1)
 		p.podUpdates[uid] = podUpdates
 		go func() {
@@ -115,24 +126,17 @@ func (p *podWorkers) UpdatePod(pod *api.BoundPod, updateComplete func()) {
 			p.managePodLoop(podUpdates)
 		}()
 	}
-	// TODO(wojtek-t): Consider changing to the following model:
-	// - add a cache of "desired" pod state
-	// - whenever an update of a pod comes, we update the "desired" cache
-	// - if per-pod goroutine is currently iddle, we send the it immediately
-	//   to the per-pod goroutine and clear the cache;
-	// - when per-pod goroutine finishes processing an update it checks the
-	//   desired cache for next update to proces
-	// - the crucial thing in this approach is that we don't accumulate multiple
-	//   updates for a given pod (at any point in time there will be at most
-	//   one update queued for a given pod, plus potentially one currently being
-	//   processed) and additionally don't rely on the fact that an update will
-	//   be resend (because we don't drop it)
 	if !p.isWorking[pod.UID] {
 		p.isWorking[pod.UID] = true
 		podUpdates <- workUpdate{
 			pod:              pod,
 			updateCompleteFn: updateComplete,
 		}
+	} else {
+		p.lastUndeliveredWorkUpdate[pod.UID] = workUpdate{
+			pod:              pod,
+			updateCompleteFn: updateComplete,
+		}
 	}
 }
 
@@ -143,12 +147,23 @@ func (p *podWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty
 		if _, exists := desiredPods[key]; !exists {
 			close(channel)
 			delete(p.podUpdates, key)
+			// If there is an undelivered work update for this pod, remove it,
+			// since the per-pod goroutine won't be able to put it into the
+			// already closed channel once it finishes the current work update.
+			if _, cached := p.lastUndeliveredWorkUpdate[key]; cached {
+				delete(p.lastUndeliveredWorkUpdate, key)
+			}
 		}
 	}
 }
 
-func (p *podWorkers) setIsWorking(uid types.UID, isWorking bool) {
+func (p *podWorkers) checkForUpdates(uid types.UID, updateComplete func()) {
 	p.podLock.Lock()
-	p.isWorking[uid] = isWorking
-	p.podLock.Unlock()
+	defer p.podLock.Unlock()
+	if workUpdate, exists := p.lastUndeliveredWorkUpdate[uid]; exists {
+		p.podUpdates[uid] <- workUpdate
+		delete(p.lastUndeliveredWorkUpdate, uid)
+	} else {
+		p.isWorking[uid] = false
+	}
 }
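The `var _ DockerCache = new(dockerCache)` line added to docker_cache.go is Go's compile-time interface-conformance check. Below is a minimal, self-contained sketch of the same idiom; the `Lister` and `memLister` names are invented for illustration and are not part of the Kubernetes code.

```go
// Sketch of the compile-time interface check idiom.
package main

import "fmt"

type Lister interface {
	List() []string
}

type memLister struct{ items []string }

func (m *memLister) List() []string { return m.items }

// Compile-time assertion that *memLister satisfies Lister, mirroring
// `var _ DockerCache = new(dockerCache)` above. If the method set ever
// stops matching the interface, the build fails on this line.
var _ Lister = new(memLister)

func main() {
	var l Lister = &memLister{items: []string{"a", "b"}}
	fmt.Println(l.List())
}
```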
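The new `ForceUpdateIfOlder` method gives callers a freshness guarantee from a time-stamped cache: a worker records when it finished its previous sync and, on the next sync, forces a refresh only if the cached snapshot predates that moment. The sketch below reproduces that pattern in isolation, under the assumption of a simple string-list cache with an injected `refresh` function (both invented for the example); the real `dockerCache` is additionally refreshed by a background thread, which is omitted here.

```go
// Sketch of the ForceUpdateIfOlder pattern with a generic string-list cache.
// The refresh func stands in for GetKubeletDockerContainers; names are invented.
package main

import (
	"fmt"
	"sync"
	"time"
)

type listCache struct {
	lock      sync.Mutex
	cacheTime time.Time // when the current snapshot was taken
	items     []string
	refresh   func() ([]string, error) // fetches a fresh snapshot
}

// ForceUpdateIfOlder refreshes the snapshot only if it predates
// minExpectedCacheTime, mirroring dockerCache.ForceUpdateIfOlder.
func (c *listCache) ForceUpdateIfOlder(minExpectedCacheTime time.Time) error {
	c.lock.Lock()
	defer c.lock.Unlock()
	if c.cacheTime.Before(minExpectedCacheTime) {
		items, err := c.refresh()
		if err != nil {
			return err
		}
		c.items = items
		c.cacheTime = time.Now()
	}
	return nil
}

func (c *listCache) Items() []string {
	c.lock.Lock()
	defer c.lock.Unlock()
	return c.items
}

func main() {
	refreshes := 0
	cache := &listCache{refresh: func() ([]string, error) {
		refreshes++
		return []string{fmt.Sprintf("snapshot-%d", refreshes)}, nil
	}}

	// The worker keeps the time it finished its previous sync, just like
	// minDockerCacheTime in managePodLoop, and demands that freshness next time.
	// The first iteration has no freshness requirement, so the zero-value
	// (empty) snapshot is accepted without a refresh.
	var minCacheTime time.Time
	for i := 0; i < 3; i++ {
		if err := cache.ForceUpdateIfOlder(minCacheTime); err != nil {
			fmt.Println("refresh error:", err)
			return
		}
		fmt.Println("sync", i, "sees", cache.Items())
		minCacheTime = time.Now() // the next sync must see state at least this new
	}
	fmt.Println("total refreshes:", refreshes)
}
```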
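The pod_workers.go changes implement the model the deleted TODO described: at most one queued update per pod, held in `lastUndeliveredWorkUpdate`, re-delivered by `checkForUpdates` when the worker finishes its current item, with a channel buffer of one so the worker can hand itself the next item without blocking. The following standalone sketch shows that coalescing pattern with invented names (`coalescer`, `Push`, `workerLoop`); it illustrates the technique and is not the kubelet code.

```go
// Sketch of per-key update coalescing: a buffered channel of size 1 per key
// plus a map that remembers only the most recent undelivered update.
package main

import (
	"fmt"
	"sync"
	"time"
)

type update struct {
	key  string
	data int
}

type coalescer struct {
	lock sync.Mutex
	// updates holds the per-key work channels (buffer of 1, see Push).
	updates map[string]chan update
	// isWorking marks keys whose worker is currently processing an update.
	isWorking map[string]bool
	// lastUndelivered keeps only the newest update that arrived while the
	// worker was busy; intermediate updates are intentionally dropped.
	lastUndelivered map[string]update
	wg              sync.WaitGroup
}

func newCoalescer() *coalescer {
	return &coalescer{
		updates:         map[string]chan update{},
		isWorking:       map[string]bool{},
		lastUndelivered: map[string]update{},
	}
}

// Push delivers u immediately if the worker for u.key is idle; otherwise it
// overwrites the single pending slot, so at most one update is ever queued.
func (c *coalescer) Push(u update) {
	c.lock.Lock()
	defer c.lock.Unlock()
	ch, ok := c.updates[u.key]
	if !ok {
		// Buffer of 1: checkForUpdates sends into this channel from the
		// worker goroutine itself, and at that moment the channel is empty.
		ch = make(chan update, 1)
		c.updates[u.key] = ch
		c.wg.Add(1)
		go c.workerLoop(ch)
	}
	if !c.isWorking[u.key] {
		c.isWorking[u.key] = true
		ch <- u
	} else {
		c.lastUndelivered[u.key] = u
	}
}

func (c *coalescer) workerLoop(ch <-chan update) {
	defer c.wg.Done()
	for u := range ch {
		fmt.Println("processing", u.key, u.data) // stand-in for syncPod
		c.checkForUpdates(u.key)
	}
}

// checkForUpdates re-arms the worker: deliver the pending update if there is
// one, otherwise mark the worker idle again.
func (c *coalescer) checkForUpdates(key string) {
	c.lock.Lock()
	defer c.lock.Unlock()
	if u, ok := c.lastUndelivered[key]; ok {
		c.updates[key] <- u
		delete(c.lastUndelivered, key)
	} else {
		c.isWorking[key] = false
	}
}

func main() {
	c := newCoalescer()
	for i := 0; i < 5; i++ {
		c.Push(update{key: "pod-a", data: i}) // intermediate updates may be coalesced
	}
	time.Sleep(100 * time.Millisecond) // demo only: let the worker drain

	// Shut down the same way ForgetNonExistingPodWorkers does: drop any
	// pending update and close the channel under the lock, so the worker
	// never sends into a closed channel.
	c.lock.Lock()
	delete(c.lastUndelivered, "pod-a")
	close(c.updates["pod-a"])
	c.lock.Unlock()
	c.wg.Wait()
}
```

The key invariant, as in the patch, is that only an idle worker's channel is sent to from `Push`, and `checkForUpdates` only sends right after the worker has drained its single buffered slot, so the size-1 buffer can never block either sender.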