Change PodWorkers to have desired cache.

commit 80576dfed3
parent 6d465c4d86
@@ -23,6 +23,7 @@ import (
 
 type DockerCache interface {
 	RunningContainers() (DockerContainers, error)
+	ForceUpdateIfOlder(time.Time) error
 }
 
 func NewDockerCache(client DockerInterface) (DockerCache, error) {
@@ -49,6 +50,9 @@ type dockerCache struct {
 	updatingThreadStopTime time.Time
 }
 
+// Ensure that dockerCache abides by the DockerCache interface.
+var _ DockerCache = new(dockerCache)
+
 func (d *dockerCache) RunningContainers() (DockerContainers, error) {
 	d.lock.Lock()
 	defer d.lock.Unlock()
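The var _ DockerCache = new(dockerCache) line added here is the standard Go compile-time assertion: it forces the compiler to verify that *dockerCache implements every DockerCache method, at zero runtime cost. A minimal, self-contained illustration of the idiom, with hypothetical names rather than the kubelet's:

package main

import "fmt"

// Cache is a stand-in for the DockerCache interface.
type Cache interface {
	Refresh() error
}

// memCache is a stand-in for dockerCache.
type memCache struct{}

func (m *memCache) Refresh() error { return nil }

// Compile-time assertion: if memCache ever stops satisfying Cache
// (say, a method is renamed), this line becomes a compile error.
var _ Cache = new(memCache)

func main() {
	var c Cache = new(memCache)
	fmt.Println(c.Refresh()) // <nil>
}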
@@ -69,6 +73,20 @@ func (d *dockerCache) RunningContainers() (DockerContainers, error) {
 	return d.containers, nil
 }
 
+func (d *dockerCache) ForceUpdateIfOlder(minExpectedCacheTime time.Time) error {
+	d.lock.Lock()
+	defer d.lock.Unlock()
+	if d.cacheTime.Before(minExpectedCacheTime) {
+		containers, err := GetKubeletDockerContainers(d.client, false)
+		if err != nil {
+			return err
+		}
+		d.containers = containers
+		d.cacheTime = time.Now()
+	}
+	return nil
+}
+
 func (d *dockerCache) startUpdatingCache() {
 	run := true
 	for run {
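ForceUpdateIfOlder gives callers a lower bound on cache freshness: the expensive Docker list call runs only when the cached snapshot predates minExpectedCacheTime. A runnable sketch of the same pattern, with hypothetical names standing in for dockerCache and GetKubeletDockerContainers:

package main

import (
	"fmt"
	"sync"
	"time"
)

// listCache memoizes an expensive list call along with when it ran;
// it is a hypothetical stand-in for dockerCache.
type listCache struct {
	lock      sync.Mutex
	items     []string
	cacheTime time.Time
	list      func() ([]string, error)
}

// ForceUpdateIfOlder re-lists only if the snapshot is older than
// minExpectedCacheTime, mirroring the method added in this commit.
func (c *listCache) ForceUpdateIfOlder(minExpectedCacheTime time.Time) error {
	c.lock.Lock()
	defer c.lock.Unlock()
	if c.cacheTime.Before(minExpectedCacheTime) {
		items, err := c.list()
		if err != nil {
			return err
		}
		c.items = items
		c.cacheTime = time.Now()
	}
	return nil
}

func main() {
	calls := 0
	c := &listCache{list: func() ([]string, error) {
		calls++
		return []string{"container-a"}, nil
	}}
	c.ForceUpdateIfOlder(time.Now())  // zero cacheTime is older: re-lists
	c.ForceUpdateIfOlder(c.cacheTime) // snapshot is fresh enough: no call
	fmt.Println(calls) // 1
}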
@@ -20,6 +20,7 @@ import (
 	"fmt"
 	"reflect"
 	"sync"
+	"time"
 
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
 	"github.com/fsouza/go-dockerclient"
@@ -246,3 +247,7 @@ func NewFakeDockerCache(client DockerInterface) DockerCache {
 func (f *FakeDockerCache) RunningContainers() (DockerContainers, error) {
 	return GetKubeletDockerContainers(f.client, false)
 }
+
+func (f *FakeDockerCache) ForceUpdateIfOlder(time.Time) error {
+	return nil
+}
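The fake's ForceUpdateIfOlder is deliberately a no-op: it exists only so FakeDockerCache keeps satisfying the widened interface and tests can still inject it wherever a DockerCache is expected. A small sketch of that substitution pattern, with hypothetical names:

package main

import (
	"fmt"
	"time"
)

// Cache mirrors the widened DockerCache contract.
type Cache interface {
	Refresh(olderThan time.Time) error
}

// fakeCache is a stand-in for FakeDockerCache: a no-op that exists
// only so tests can inject it where a Cache is required.
type fakeCache struct{}

func (fakeCache) Refresh(time.Time) error { return nil }

// useCache is hypothetical consumer code, written against the interface.
func useCache(c Cache) error { return c.Refresh(time.Now()) }

func main() {
	fmt.Println(useCache(fakeCache{})) // <nil>
}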
@@ -18,6 +18,7 @@ package kubelet
 
 import (
 	"sync"
+	"time"
 
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
@@ -41,6 +42,9 @@ type podWorkers struct {
 	// Currently all update requests for a given pod coming in while another
 	// update of this pod is being processed are ignored.
 	isWorking map[types.UID]bool
+	// Tracks the last undelivered work item for this pod - a work item is
+	// undelivered if it comes in while the worker is working.
+	lastUndeliveredWorkUpdate map[types.UID]workUpdate
 	// DockerCache is used for listing running containers.
 	dockerCache dockertools.DockerCache
 
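lastUndeliveredWorkUpdate is the "desired cache" itself: because it is a plain map keyed by pod UID, assignment overwrites, so at most one update per pod is ever pending and a newer undelivered update silently replaces an older one. A tiny sketch of that last-writer-wins invariant, using hypothetical types:

package main

import "fmt"

type uid string

// update is a stand-in for workUpdate.
type update struct{ revision int }

func main() {
	pending := map[uid]update{} // like lastUndeliveredWorkUpdate

	// Three updates arrive for the same pod while its worker is busy.
	for rev := 1; rev <= 3; rev++ {
		pending["pod-1"] = update{revision: rev} // overwrite, never queue
	}

	// Only the newest state survives; intermediate updates were coalesced.
	fmt.Println(pending["pod-1"].revision) // 3
	fmt.Println(len(pending))              // 1
}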
@@ -63,22 +67,26 @@ type workUpdate struct {
 
 func newPodWorkers(dockerCache dockertools.DockerCache, syncPodFn syncPodFnType, recorder record.EventRecorder) *podWorkers {
 	return &podWorkers{
-		podUpdates:  map[types.UID]chan workUpdate{},
-		isWorking:   map[types.UID]bool{},
-		dockerCache: dockerCache,
-		syncPodFn:   syncPodFn,
-		recorder:    recorder,
+		podUpdates:                map[types.UID]chan workUpdate{},
+		isWorking:                 map[types.UID]bool{},
+		lastUndeliveredWorkUpdate: map[types.UID]workUpdate{},
+		dockerCache:               dockerCache,
+		syncPodFn:                 syncPodFn,
+		recorder:                  recorder,
 	}
 }
 
 func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
+	var minDockerCacheTime time.Time
 	for newWork := range podUpdates {
-		// Since we use docker cache, getting current state shouldn't cause
-		// performance overhead on Docker. Moreover, as long as we run syncPod
-		// no matter if it changes anything, having an old version of "containers"
-		// can cause starting unneeded containers.
 		func() {
-			defer p.setIsWorking(newWork.pod.UID, false)
+			defer p.checkForUpdates(newWork.pod.UID, newWork.updateCompleteFn)
+			// We would like to have the state of Docker from at least the moment
+			// when we finished the previous processing of that pod.
+			if err := p.dockerCache.ForceUpdateIfOlder(minDockerCacheTime); err != nil {
+				glog.Errorf("Error updating docker cache: %v", err)
+				return
+			}
 			containers, err := p.dockerCache.RunningContainers()
 			if err != nil {
 				glog.Errorf("Error listing containers while syncing pod: %v", err)
@@ -91,6 +99,7 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
 				p.recorder.Eventf(newWork.pod, "failedSync", "Error syncing pod, skipping: %v", err)
 				return
 			}
+			minDockerCacheTime = time.Now()
 
 			newWork.updateCompleteFn()
 		}()
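minDockerCacheTime carries the freshness requirement across loop iterations: after a successful sync the worker records time.Now(), and the next iteration refuses to act on a Docker snapshot taken before that moment, so a sync can never see state older than its own previous run. A condensed, hypothetical sketch of the timing logic:

package main

import (
	"fmt"
	"time"
)

// syncOnce is a stand-in for one managePodLoop iteration.
func syncOnce(cacheTime time.Time, minCacheTime *time.Time) bool {
	// Like ForceUpdateIfOlder: a snapshot taken before the previous
	// sync finished must not be reused.
	if cacheTime.Before(*minCacheTime) {
		return false // the real code would re-list here
	}
	// ... syncPodFn would run here ...
	*minCacheTime = time.Now() // next iteration demands fresher state
	return true
}

func main() {
	var minCacheTime time.Time // zero value: first iteration accepts any snapshot
	snapshot := time.Now()
	fmt.Println(syncOnce(snapshot, &minCacheTime)) // true: snapshot is fresh
	fmt.Println(syncOnce(snapshot, &minCacheTime)) // false: snapshot predates last sync
}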
@@ -106,8 +115,10 @@ func (p *podWorkers) UpdatePod(pod *api.BoundPod, updateComplete func()) {
 	p.podLock.Lock()
 	defer p.podLock.Unlock()
 	if podUpdates, exists = p.podUpdates[uid]; !exists {
-		// Currently all update requests for a given pod coming in while another
-		// update of this pod is being processed are ignored.
+		// We need to have a buffer here, because the checkForUpdates() method
+		// that puts an update into the channel is called from the same goroutine
+		// as the one consuming the channel. However, in that case the channel is
+		// guaranteed to be empty, so a buffer of size 1 is enough.
 		podUpdates = make(chan workUpdate, 1)
 		p.podUpdates[uid] = podUpdates
 		go func() {
@@ -115,24 +126,17 @@ func (p *podWorkers) UpdatePod(pod *api.BoundPod, updateComplete func()) {
 			p.managePodLoop(podUpdates)
 		}()
 	}
 
-	// TODO(wojtek-t): Consider changing to the following model:
-	// - add a cache of "desired" pod state
-	// - whenever an update of a pod comes, we update the "desired" cache
-	// - if the per-pod goroutine is currently idle, we send the update
-	//   immediately to the per-pod goroutine and clear the cache;
-	// - when the per-pod goroutine finishes processing an update, it checks
-	//   the desired cache for the next update to process
-	// - the crucial thing in this approach is that we don't accumulate multiple
-	//   updates for a given pod (at any point in time there will be at most
-	//   one update queued for a given pod, plus potentially one currently being
-	//   processed) and additionally we don't rely on the fact that an update
-	//   will be resent (because we don't drop it)
 	if !p.isWorking[pod.UID] {
 		p.isWorking[pod.UID] = true
 		podUpdates <- workUpdate{
 			pod:              pod,
 			updateCompleteFn: updateComplete,
 		}
+	} else {
+		p.lastUndeliveredWorkUpdate[pod.UID] = workUpdate{
+			pod:              pod,
+			updateCompleteFn: updateComplete,
+		}
 	}
 }
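UpdatePod now implements exactly the model the deleted TODO described: deliver the update immediately when the worker is idle, otherwise park it in the desired cache, where later updates overwrite it. Together with checkForUpdates below, the isWorking flag and the size-1 channel buffer form the whole handshake. A self-contained sketch with hypothetical names and the real code's locking elided:

package main

import "fmt"

type dispatcher struct {
	ch        chan string       // like podUpdates[uid], buffer of 1
	isWorking bool              // like isWorking[uid]
	pending   map[string]string // like lastUndeliveredWorkUpdate
}

// Update mirrors UpdatePod: send if idle, otherwise overwrite the pending slot.
func (d *dispatcher) Update(key, work string) {
	if !d.isWorking {
		d.isWorking = true
		d.ch <- work
	} else {
		d.pending[key] = work
	}
}

// Done mirrors checkForUpdates: hand over the pending item, or go idle.
func (d *dispatcher) Done(key string) {
	if work, ok := d.pending[key]; ok {
		d.ch <- work // buffer of 1 suffices: the channel is empty here
		delete(d.pending, key)
	} else {
		d.isWorking = false
	}
}

func main() {
	d := &dispatcher{ch: make(chan string, 1), pending: map[string]string{}}
	d.Update("pod-1", "v1") // delivered immediately
	d.Update("pod-1", "v2") // parked
	d.Update("pod-1", "v3") // overwrites v2: coalesced
	fmt.Println(<-d.ch) // v1
	d.Done("pod-1")     // re-queues the coalesced v3
	fmt.Println(<-d.ch) // v3
}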
@@ -143,12 +147,23 @@ func (p *podWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty
 		if _, exists := desiredPods[key]; !exists {
 			close(channel)
 			delete(p.podUpdates, key)
+			// If there is an undelivered work update for this pod, we need to remove it,
+			// since the per-pod goroutine won't be able to put it onto the already closed
+			// channel when it finishes processing the current work update.
+			if _, cached := p.lastUndeliveredWorkUpdate[key]; cached {
+				delete(p.lastUndeliveredWorkUpdate, key)
+			}
 		}
 	}
 }
 
-func (p *podWorkers) setIsWorking(uid types.UID, isWorking bool) {
+func (p *podWorkers) checkForUpdates(uid types.UID, updateComplete func()) {
 	p.podLock.Lock()
-	p.isWorking[uid] = isWorking
-	p.podLock.Unlock()
+	defer p.podLock.Unlock()
+	if workUpdate, exists := p.lastUndeliveredWorkUpdate[uid]; exists {
+		p.podUpdates[uid] <- workUpdate
+		delete(p.lastUndeliveredWorkUpdate, uid)
+	} else {
+		p.isWorking[uid] = false
+	}
 }
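The map cleanup in ForgetNonExistingPodWorkers is not optional tidiness: if an undelivered update survived, checkForUpdates would eventually send it on a channel this function has already closed, and in Go a send on a closed channel panics, killing the worker goroutine. A short demonstration of the hazard:

package main

import "fmt"

func main() {
	ch := make(chan int, 1)
	close(ch)
	defer func() {
		// Without the map cleanup, the worker would die exactly like this.
		fmt.Println("recovered:", recover()) // send on closed channel
	}()
	ch <- 1 // panics: send on closed channel
}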