mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-07 19:23:40 +00:00
kubelet: remove background updating thread in RuntimeCache
This feature is no longer useful pods don't sync as often. For batch creation/deletions/syncs, the cache will be up-to-date for most pods since it will be updated frequently. For other cases, continue updating for two more seconds don't usually help, as temporal locality doesn't hold across pod syncs.
This commit is contained in:
parent
b12550273e
commit
dc42d25f4a
@ -23,8 +23,7 @@ import (
|
||||
|
||||
var (
|
||||
// TODO(yifan): Maybe set the them as parameters for NewCache().
|
||||
defaultCachePeriod = time.Second * 2
|
||||
defaultUpdateInterval = time.Millisecond * 100
|
||||
defaultCachePeriod = time.Second * 2
|
||||
)
|
||||
|
||||
type RuntimeCache interface {
|
||||
@ -39,8 +38,7 @@ type podsGetter interface {
|
||||
// NewRuntimeCache creates a container runtime cache.
|
||||
func NewRuntimeCache(getter podsGetter) (RuntimeCache, error) {
|
||||
return &runtimeCache{
|
||||
getter: getter,
|
||||
updating: false,
|
||||
getter: getter,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -48,13 +46,6 @@ func NewRuntimeCache(getter podsGetter) (RuntimeCache, error) {
|
||||
// before updating the pods, so the timestamp is at most as new as the pods
|
||||
// (and can be slightly older). The timestamp always moves forward. Callers are
|
||||
// expected not to modify the pods returned from GetPods.
|
||||
// The pod updates can be triggered by a request (e.g., GetPods or
|
||||
// ForceUpdateIfOlder) if the cached pods are considered stale. These requests
|
||||
// will be blocked until the cache update is completed. To reduce the cache miss
|
||||
// penalty, upon a miss, runtimeCache would start a separate goroutine
|
||||
// (updatingThread) if one does not exist, to periodically updates the cache.
|
||||
// updatingThread would stop after a period of inactivity (no incoming requests)
|
||||
// to conserve resources.
|
||||
type runtimeCache struct {
|
||||
sync.Mutex
|
||||
// The underlying container runtime used to update the cache.
|
||||
@ -63,15 +54,10 @@ type runtimeCache struct {
|
||||
cacheTime time.Time
|
||||
// The content of the cache.
|
||||
pods []*Pod
|
||||
// Whether the background thread updating the cache is running.
|
||||
updating bool
|
||||
// Time when the background thread should be stopped.
|
||||
updatingThreadStopTime time.Time
|
||||
}
|
||||
|
||||
// GetPods returns the cached pods if they are not outdated; otherwise, it
|
||||
// retrieves the latest pods and return them.
|
||||
// If the cache updating loop has stopped, this function will restart it.
|
||||
func (r *runtimeCache) GetPods() ([]*Pod, error) {
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
@ -80,12 +66,6 @@ func (r *runtimeCache) GetPods() ([]*Pod, error) {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
// Stop refreshing thread if there were no requests within the default cache period
|
||||
r.updatingThreadStopTime = time.Now().Add(defaultCachePeriod)
|
||||
if !r.updating {
|
||||
r.updating = true
|
||||
go r.startUpdatingCache()
|
||||
}
|
||||
return r.pods, nil
|
||||
}
|
||||
|
||||
@ -103,7 +83,7 @@ func (r *runtimeCache) updateCache() error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.writePodsIfNewer(pods, timestamp)
|
||||
r.pods, r.cacheTime = pods, timestamp
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -114,32 +94,3 @@ func (r *runtimeCache) getPodsWithTimestamp() ([]*Pod, time.Time, error) {
|
||||
pods, err := r.getter.GetPods(false)
|
||||
return pods, timestamp, err
|
||||
}
|
||||
|
||||
// writePodsIfNewer writes the pods and timestamp if they are newer than the
|
||||
// cached ones.
|
||||
func (r *runtimeCache) writePodsIfNewer(pods []*Pod, timestamp time.Time) {
|
||||
if timestamp.After(r.cacheTime) {
|
||||
r.pods, r.cacheTime = pods, timestamp
|
||||
}
|
||||
}
|
||||
|
||||
// startUpdateingCache continues to invoke GetPods to get the newest result until
|
||||
// there are no requests within the default cache period.
|
||||
func (r *runtimeCache) startUpdatingCache() {
|
||||
run := true
|
||||
for run {
|
||||
time.Sleep(defaultUpdateInterval)
|
||||
pods, timestamp, err := r.getPodsWithTimestamp()
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
r.Lock()
|
||||
if time.Now().After(r.updatingThreadStopTime) {
|
||||
r.updating = false
|
||||
run = false
|
||||
}
|
||||
r.writePodsIfNewer(pods, timestamp)
|
||||
r.Unlock()
|
||||
}
|
||||
}
|
||||
|
@ -86,27 +86,3 @@ func TestForceUpdateIfOlder(t *testing.T) {
|
||||
t.Errorf("expected %#v, got %#v", newpods, actual)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpdatePodsOnlyIfNewer(t *testing.T) {
|
||||
runtime := &FakeRuntime{}
|
||||
cache := newTestRuntimeCache(runtime)
|
||||
|
||||
// Cache new pods with a future timestamp.
|
||||
newpods := []*Pod{{ID: "1111"}, {ID: "2222"}, {ID: "3333"}}
|
||||
cache.Lock()
|
||||
cache.pods = newpods
|
||||
cache.cacheTime = time.Now().Add(20 * time.Minute)
|
||||
cache.Unlock()
|
||||
|
||||
// Instruct runime to return a list of old pods.
|
||||
oldpods := []*Pod{{ID: "1111"}}
|
||||
runtime.PodList = oldpods
|
||||
|
||||
// Try to update the cache; the attempt should not succeed because the
|
||||
// cache timestamp is newer than the current time.
|
||||
cache.updateCacheWithLock()
|
||||
actual := cache.getCachedPods()
|
||||
if !reflect.DeepEqual(newpods, actual) {
|
||||
t.Errorf("expected %#v, got %#v", newpods, actual)
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user