mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-28 14:07:14 +00:00
Kubelet: fix the runtime cache to not cache the stale pods
If a pod worker sees stale pods from the runtime cache which were retrieved before their last sync finished, it may think that the pod were not started correctly, and attemp to fix that by killing/restarting containers. There are two issues that may cause runtime cache to store stale pods: 1. The timstamp is recorded *after* getting the pods from the container runtime. This may lead the consumer to think the pods are newer than they actually are. 2. The cache updates are triggered by many goroutines (pod workers, and the updating thread). There is no mechanism to enforece that the cache would only be updated to newer pods. This change fixes the above two issues by making sure one always record the timestamp before getting pods from the container runtime, and updates the cached pods only if the timestamp is newer.
This commit is contained in:
parent
b0129089da
commit
c075719f05
@ -46,6 +46,17 @@ func NewRuntimeCache(getter podsGetter) (RuntimeCache, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
// runtimeCache caches a list of pods. It records a timestamp (cacheTime) right
|
||||
// before updating the pods, so the timestamp is at most as new as the pods
|
||||
// (and can be slightly older). The timestamp always moves forward. Callers are
|
||||
// expected not to modify the pods returned from GetPods.
|
||||
// The pod updates can be triggered by a request (e.g., GetPods or
|
||||
// ForceUpdateIfOlder) if the cached pods are considered stale. These requests
|
||||
// will be blocked until the cache update is completed. To reduce the cache miss
|
||||
// penalty, upon a miss, runtimeCache would start a separate goroutine
|
||||
// (updatingThread) if one does not exist, to periodically updates the cache.
|
||||
// updatingThread would stop after a period of inactivity (no incoming requests)
|
||||
// to conserve resources.
|
||||
type runtimeCache struct {
|
||||
sync.Mutex
|
||||
// The underlying container runtime used to update the cache.
|
||||
@ -60,8 +71,8 @@ type runtimeCache struct {
|
||||
updatingThreadStopTime time.Time
|
||||
}
|
||||
|
||||
// GetPods returns the cached result for ListPods if the result is not
|
||||
// outdated, otherwise it will retrieve the newest result.
|
||||
// GetPods returns the cached pods if they are not outdated; otherwise, it
|
||||
// retrieves the latest pods and return them.
|
||||
// If the cache updating loop has stopped, this function will restart it.
|
||||
func (r *runtimeCache) GetPods() ([]*Pod, error) {
|
||||
r.Lock()
|
||||
@ -90,23 +101,37 @@ func (r *runtimeCache) ForceUpdateIfOlder(minExpectedCacheTime time.Time) error
|
||||
}
|
||||
|
||||
func (r *runtimeCache) updateCache() error {
|
||||
pods, err := r.getter.GetPods(false)
|
||||
pods, timestamp, err := r.getPodsWithTimestamp()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.pods = pods
|
||||
r.cacheTime = time.Now()
|
||||
r.writePodsIfNewer(pods, timestamp)
|
||||
return nil
|
||||
}
|
||||
|
||||
// getPodsWithTimestamp records a timestamp and retrieves pods from the getter.
|
||||
func (r *runtimeCache) getPodsWithTimestamp() ([]*Pod, time.Time, error) {
|
||||
// Always record the timestamp before getting the pods to avoid stale pods.
|
||||
timestamp := time.Now()
|
||||
pods, err := r.getter.GetPods(false)
|
||||
return pods, timestamp, err
|
||||
}
|
||||
|
||||
// writePodsIfNewer writes the pods and timestamp if they are newer than the
|
||||
// cached ones.
|
||||
func (r *runtimeCache) writePodsIfNewer(pods []*Pod, timestamp time.Time) {
|
||||
if timestamp.After(r.cacheTime) {
|
||||
r.pods, r.cacheTime = pods, timestamp
|
||||
}
|
||||
}
|
||||
|
||||
// startUpdateingCache continues to invoke GetPods to get the newest result until
|
||||
// there is no requests within the default cache period.
|
||||
func (r *runtimeCache) startUpdatingCache() {
|
||||
run := true
|
||||
for run {
|
||||
time.Sleep(defaultUpdateInterval)
|
||||
pods, err := r.getter.GetPods(false)
|
||||
cacheTime := time.Now()
|
||||
pods, timestamp, err := r.getPodsWithTimestamp()
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
@ -116,8 +141,7 @@ func (r *runtimeCache) startUpdatingCache() {
|
||||
r.updating = false
|
||||
run = false
|
||||
}
|
||||
r.pods = pods
|
||||
r.cacheTime = cacheTime
|
||||
r.writePodsIfNewer(pods, timestamp)
|
||||
r.Unlock()
|
||||
}
|
||||
}
|
||||
|
112
pkg/kubelet/container/runtime_cache_test.go
Normal file
112
pkg/kubelet/container/runtime_cache_test.go
Normal file
@ -0,0 +1,112 @@
|
||||
/*
|
||||
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package container
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// testRunTimeCache embeds runtimeCache with some additional methods for
|
||||
// testing.
|
||||
type testRuntimeCache struct {
|
||||
runtimeCache
|
||||
}
|
||||
|
||||
func (r *testRuntimeCache) updateCacheWithLock() error {
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
return r.updateCache()
|
||||
}
|
||||
|
||||
func (r *testRuntimeCache) getCachedPods() []*Pod {
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
return r.pods
|
||||
}
|
||||
|
||||
func newTestRuntimeCache(getter podsGetter) *testRuntimeCache {
|
||||
c, _ := NewRuntimeCache(getter)
|
||||
return &testRuntimeCache{*c.(*runtimeCache)}
|
||||
}
|
||||
|
||||
func TestGetPods(t *testing.T) {
|
||||
runtime := &FakeRuntime{}
|
||||
expected := []*Pod{{ID: "1111"}, {ID: "2222"}, {ID: "3333"}}
|
||||
runtime.Podlist = expected
|
||||
cache := newTestRuntimeCache(runtime)
|
||||
actual, err := cache.GetPods()
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error %v", err)
|
||||
}
|
||||
if !reflect.DeepEqual(expected, actual) {
|
||||
t.Errorf("expected %#v, got %#v", expected, actual)
|
||||
}
|
||||
}
|
||||
|
||||
func TestForceUpdateIfOlder(t *testing.T) {
|
||||
runtime := &FakeRuntime{}
|
||||
cache := newTestRuntimeCache(runtime)
|
||||
|
||||
// Cache old pods.
|
||||
oldpods := []*Pod{{ID: "1111"}}
|
||||
runtime.Podlist = oldpods
|
||||
cache.updateCacheWithLock()
|
||||
|
||||
// Update the runtime to new pods.
|
||||
newpods := []*Pod{{ID: "1111"}, {ID: "2222"}, {ID: "3333"}}
|
||||
runtime.Podlist = newpods
|
||||
|
||||
// An older timestamp should not force an update.
|
||||
cache.ForceUpdateIfOlder(time.Now().Add(-20 * time.Minute))
|
||||
actual := cache.getCachedPods()
|
||||
if !reflect.DeepEqual(oldpods, actual) {
|
||||
t.Errorf("expected %#v, got %#v", oldpods, actual)
|
||||
}
|
||||
|
||||
// A newer timestamp should force an update.
|
||||
cache.ForceUpdateIfOlder(time.Now().Add(20 * time.Second))
|
||||
actual = cache.getCachedPods()
|
||||
if !reflect.DeepEqual(newpods, actual) {
|
||||
t.Errorf("expected %#v, got %#v", newpods, actual)
|
||||
}
|
||||
}
|
||||
|
||||
func TestUpdatePodsOnlyIfNewer(t *testing.T) {
|
||||
runtime := &FakeRuntime{}
|
||||
cache := newTestRuntimeCache(runtime)
|
||||
|
||||
// Cache new pods with a future timestamp.
|
||||
newpods := []*Pod{{ID: "1111"}, {ID: "2222"}, {ID: "3333"}}
|
||||
cache.Lock()
|
||||
cache.pods = newpods
|
||||
cache.cacheTime = time.Now().Add(20 * time.Minute)
|
||||
cache.Unlock()
|
||||
|
||||
// Instruct runime to return a list of old pods.
|
||||
oldpods := []*Pod{{ID: "1111"}}
|
||||
runtime.Podlist = oldpods
|
||||
|
||||
// Try to update the cache; the attempt should not succeed because the
|
||||
// cache timestamp is newer than the current time.
|
||||
cache.updateCacheWithLock()
|
||||
actual := cache.getCachedPods()
|
||||
if !reflect.DeepEqual(newpods, actual) {
|
||||
t.Errorf("expected %#v, got %#v", newpods, actual)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user