mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-31 15:25:57 +00:00
Merge pull request #7749 from yujuhong/stale_cache
Kubelet: record the timestamp correctly in the runtime cache
This commit is contained in:
commit
da42f13941
@ -46,6 +46,17 @@ func NewRuntimeCache(getter podsGetter) (RuntimeCache, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
// runtimeCache caches a list of pods. It records a timestamp (cacheTime) right
|
||||
// before updating the pods, so the timestamp is at most as new as the pods
|
||||
// (and can be slightly older). The timestamp always moves forward. Callers are
|
||||
// expected not to modify the pods returned from GetPods.
|
||||
// The pod updates can be triggered by a request (e.g., GetPods or
|
||||
// ForceUpdateIfOlder) if the cached pods are considered stale. These requests
|
||||
// will be blocked until the cache update is completed. To reduce the cache miss
|
||||
// penalty, upon a miss, runtimeCache would start a separate goroutine
|
||||
// (updatingThread) if one does not exist, to periodically updates the cache.
|
||||
// updatingThread would stop after a period of inactivity (no incoming requests)
|
||||
// to conserve resources.
|
||||
type runtimeCache struct {
|
||||
sync.Mutex
|
||||
// The underlying container runtime used to update the cache.
|
||||
@ -60,8 +71,8 @@ type runtimeCache struct {
|
||||
updatingThreadStopTime time.Time
|
||||
}
|
||||
|
||||
// GetPods returns the cached result for ListPods if the result is not
|
||||
// outdated, otherwise it will retrieve the newest result.
|
||||
// GetPods returns the cached pods if they are not outdated; otherwise, it
|
||||
// retrieves the latest pods and return them.
|
||||
// If the cache updating loop has stopped, this function will restart it.
|
||||
func (r *runtimeCache) GetPods() ([]*Pod, error) {
|
||||
r.Lock()
|
||||
@ -90,23 +101,37 @@ func (r *runtimeCache) ForceUpdateIfOlder(minExpectedCacheTime time.Time) error
|
||||
}
|
||||
|
||||
func (r *runtimeCache) updateCache() error {
|
||||
pods, err := r.getter.GetPods(false)
|
||||
pods, timestamp, err := r.getPodsWithTimestamp()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.pods = pods
|
||||
r.cacheTime = time.Now()
|
||||
r.writePodsIfNewer(pods, timestamp)
|
||||
return nil
|
||||
}
|
||||
|
||||
// getPodsWithTimestamp records a timestamp and retrieves pods from the getter.
|
||||
func (r *runtimeCache) getPodsWithTimestamp() ([]*Pod, time.Time, error) {
|
||||
// Always record the timestamp before getting the pods to avoid stale pods.
|
||||
timestamp := time.Now()
|
||||
pods, err := r.getter.GetPods(false)
|
||||
return pods, timestamp, err
|
||||
}
|
||||
|
||||
// writePodsIfNewer writes the pods and timestamp if they are newer than the
|
||||
// cached ones.
|
||||
func (r *runtimeCache) writePodsIfNewer(pods []*Pod, timestamp time.Time) {
|
||||
if timestamp.After(r.cacheTime) {
|
||||
r.pods, r.cacheTime = pods, timestamp
|
||||
}
|
||||
}
|
||||
|
||||
// startUpdateingCache continues to invoke GetPods to get the newest result until
|
||||
// there are no requests within the default cache period.
|
||||
func (r *runtimeCache) startUpdatingCache() {
|
||||
run := true
|
||||
for run {
|
||||
time.Sleep(defaultUpdateInterval)
|
||||
pods, err := r.getter.GetPods(false)
|
||||
cacheTime := time.Now()
|
||||
pods, timestamp, err := r.getPodsWithTimestamp()
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
@ -116,8 +141,7 @@ func (r *runtimeCache) startUpdatingCache() {
|
||||
r.updating = false
|
||||
run = false
|
||||
}
|
||||
r.pods = pods
|
||||
r.cacheTime = cacheTime
|
||||
r.writePodsIfNewer(pods, timestamp)
|
||||
r.Unlock()
|
||||
}
|
||||
}
|
||||
|
112
pkg/kubelet/container/runtime_cache_test.go
Normal file
112
pkg/kubelet/container/runtime_cache_test.go
Normal file
@ -0,0 +1,112 @@
|
||||
/*
|
||||
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package container
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// testRunTimeCache embeds runtimeCache with some additional methods for
|
||||
// testing.
|
||||
type testRuntimeCache struct {
|
||||
runtimeCache
|
||||
}
|
||||
|
||||
func (r *testRuntimeCache) updateCacheWithLock() error {
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
return r.updateCache()
|
||||
}
|
||||
|
||||
func (r *testRuntimeCache) getCachedPods() []*Pod {
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
return r.pods
|
||||
}
|
||||
|
||||
func newTestRuntimeCache(getter podsGetter) *testRuntimeCache {
|
||||
c, _ := NewRuntimeCache(getter)
|
||||
return &testRuntimeCache{*c.(*runtimeCache)}
|
||||
}
|
||||
|
||||
func TestGetPods(t *testing.T) {
|
||||
runtime := &FakeRuntime{}
|
||||
expected := []*Pod{{ID: "1111"}, {ID: "2222"}, {ID: "3333"}}
|
||||
runtime.Podlist = expected
|
||||
cache := newTestRuntimeCache(runtime)
|
||||
actual, err := cache.GetPods()
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error %v", err)
|
||||
}
|
||||
if !reflect.DeepEqual(expected, actual) {
|
||||
t.Errorf("expected %#v, got %#v", expected, actual)
|
||||
}
|
||||
}
|
||||
|
||||
func TestForceUpdateIfOlder(t *testing.T) {
|
||||
runtime := &FakeRuntime{}
|
||||
cache := newTestRuntimeCache(runtime)
|
||||
|
||||
// Cache old pods.
|
||||
oldpods := []*Pod{{ID: "1111"}}
|
||||
runtime.Podlist = oldpods
|
||||
cache.updateCacheWithLock()
|
||||
|
||||
// Update the runtime to new pods.
|
||||
newpods := []*Pod{{ID: "1111"}, {ID: "2222"}, {ID: "3333"}}
|
||||
runtime.Podlist = newpods
|
||||
|
||||
// An older timestamp should not force an update.
|
||||
cache.ForceUpdateIfOlder(time.Now().Add(-20 * time.Minute))
|
||||
actual := cache.getCachedPods()
|
||||
if !reflect.DeepEqual(oldpods, actual) {
|
||||
t.Errorf("expected %#v, got %#v", oldpods, actual)
|
||||
}
|
||||
|
||||
// A newer timestamp should force an update.
|
||||
cache.ForceUpdateIfOlder(time.Now().Add(20 * time.Second))
|
||||
actual = cache.getCachedPods()
|
||||
if !reflect.DeepEqual(newpods, actual) {
|
||||
t.Errorf("expected %#v, got %#v", newpods, actual)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpdatePodsOnlyIfNewer(t *testing.T) {
|
||||
runtime := &FakeRuntime{}
|
||||
cache := newTestRuntimeCache(runtime)
|
||||
|
||||
// Cache new pods with a future timestamp.
|
||||
newpods := []*Pod{{ID: "1111"}, {ID: "2222"}, {ID: "3333"}}
|
||||
cache.Lock()
|
||||
cache.pods = newpods
|
||||
cache.cacheTime = time.Now().Add(20 * time.Minute)
|
||||
cache.Unlock()
|
||||
|
||||
// Instruct runime to return a list of old pods.
|
||||
oldpods := []*Pod{{ID: "1111"}}
|
||||
runtime.Podlist = oldpods
|
||||
|
||||
// Try to update the cache; the attempt should not succeed because the
|
||||
// cache timestamp is newer than the current time.
|
||||
cache.updateCacheWithLock()
|
||||
actual := cache.getCachedPods()
|
||||
if !reflect.DeepEqual(newpods, actual) {
|
||||
t.Errorf("expected %#v, got %#v", newpods, actual)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user