mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-01 15:58:37 +00:00
Merge pull request #17545 from yujuhong/no_auto_updates
Auto commit by PR queue bot
This commit is contained in:
commit
5e53e281e5
@ -23,8 +23,7 @@ import (
|
|||||||
|
|
||||||
var (
|
var (
|
||||||
// TODO(yifan): Maybe set the them as parameters for NewCache().
|
// TODO(yifan): Maybe set the them as parameters for NewCache().
|
||||||
defaultCachePeriod = time.Second * 2
|
defaultCachePeriod = time.Second * 2
|
||||||
defaultUpdateInterval = time.Millisecond * 100
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type RuntimeCache interface {
|
type RuntimeCache interface {
|
||||||
@ -39,8 +38,7 @@ type podsGetter interface {
|
|||||||
// NewRuntimeCache creates a container runtime cache.
|
// NewRuntimeCache creates a container runtime cache.
|
||||||
func NewRuntimeCache(getter podsGetter) (RuntimeCache, error) {
|
func NewRuntimeCache(getter podsGetter) (RuntimeCache, error) {
|
||||||
return &runtimeCache{
|
return &runtimeCache{
|
||||||
getter: getter,
|
getter: getter,
|
||||||
updating: false,
|
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -48,13 +46,6 @@ func NewRuntimeCache(getter podsGetter) (RuntimeCache, error) {
|
|||||||
// before updating the pods, so the timestamp is at most as new as the pods
|
// before updating the pods, so the timestamp is at most as new as the pods
|
||||||
// (and can be slightly older). The timestamp always moves forward. Callers are
|
// (and can be slightly older). The timestamp always moves forward. Callers are
|
||||||
// expected not to modify the pods returned from GetPods.
|
// expected not to modify the pods returned from GetPods.
|
||||||
// The pod updates can be triggered by a request (e.g., GetPods or
|
|
||||||
// ForceUpdateIfOlder) if the cached pods are considered stale. These requests
|
|
||||||
// will be blocked until the cache update is completed. To reduce the cache miss
|
|
||||||
// penalty, upon a miss, runtimeCache would start a separate goroutine
|
|
||||||
// (updatingThread) if one does not exist, to periodically updates the cache.
|
|
||||||
// updatingThread would stop after a period of inactivity (no incoming requests)
|
|
||||||
// to conserve resources.
|
|
||||||
type runtimeCache struct {
|
type runtimeCache struct {
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
// The underlying container runtime used to update the cache.
|
// The underlying container runtime used to update the cache.
|
||||||
@ -63,15 +54,10 @@ type runtimeCache struct {
|
|||||||
cacheTime time.Time
|
cacheTime time.Time
|
||||||
// The content of the cache.
|
// The content of the cache.
|
||||||
pods []*Pod
|
pods []*Pod
|
||||||
// Whether the background thread updating the cache is running.
|
|
||||||
updating bool
|
|
||||||
// Time when the background thread should be stopped.
|
|
||||||
updatingThreadStopTime time.Time
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetPods returns the cached pods if they are not outdated; otherwise, it
|
// GetPods returns the cached pods if they are not outdated; otherwise, it
|
||||||
// retrieves the latest pods and return them.
|
// retrieves the latest pods and return them.
|
||||||
// If the cache updating loop has stopped, this function will restart it.
|
|
||||||
func (r *runtimeCache) GetPods() ([]*Pod, error) {
|
func (r *runtimeCache) GetPods() ([]*Pod, error) {
|
||||||
r.Lock()
|
r.Lock()
|
||||||
defer r.Unlock()
|
defer r.Unlock()
|
||||||
@ -80,12 +66,6 @@ func (r *runtimeCache) GetPods() ([]*Pod, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Stop refreshing thread if there were no requests within the default cache period
|
|
||||||
r.updatingThreadStopTime = time.Now().Add(defaultCachePeriod)
|
|
||||||
if !r.updating {
|
|
||||||
r.updating = true
|
|
||||||
go r.startUpdatingCache()
|
|
||||||
}
|
|
||||||
return r.pods, nil
|
return r.pods, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -103,7 +83,7 @@ func (r *runtimeCache) updateCache() error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
r.writePodsIfNewer(pods, timestamp)
|
r.pods, r.cacheTime = pods, timestamp
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -114,32 +94,3 @@ func (r *runtimeCache) getPodsWithTimestamp() ([]*Pod, time.Time, error) {
|
|||||||
pods, err := r.getter.GetPods(false)
|
pods, err := r.getter.GetPods(false)
|
||||||
return pods, timestamp, err
|
return pods, timestamp, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// writePodsIfNewer writes the pods and timestamp if they are newer than the
|
|
||||||
// cached ones.
|
|
||||||
func (r *runtimeCache) writePodsIfNewer(pods []*Pod, timestamp time.Time) {
|
|
||||||
if timestamp.After(r.cacheTime) {
|
|
||||||
r.pods, r.cacheTime = pods, timestamp
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// startUpdateingCache continues to invoke GetPods to get the newest result until
|
|
||||||
// there are no requests within the default cache period.
|
|
||||||
func (r *runtimeCache) startUpdatingCache() {
|
|
||||||
run := true
|
|
||||||
for run {
|
|
||||||
time.Sleep(defaultUpdateInterval)
|
|
||||||
pods, timestamp, err := r.getPodsWithTimestamp()
|
|
||||||
if err != nil {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
r.Lock()
|
|
||||||
if time.Now().After(r.updatingThreadStopTime) {
|
|
||||||
r.updating = false
|
|
||||||
run = false
|
|
||||||
}
|
|
||||||
r.writePodsIfNewer(pods, timestamp)
|
|
||||||
r.Unlock()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@ -86,27 +86,3 @@ func TestForceUpdateIfOlder(t *testing.T) {
|
|||||||
t.Errorf("expected %#v, got %#v", newpods, actual)
|
t.Errorf("expected %#v, got %#v", newpods, actual)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestUpdatePodsOnlyIfNewer(t *testing.T) {
|
|
||||||
runtime := &FakeRuntime{}
|
|
||||||
cache := newTestRuntimeCache(runtime)
|
|
||||||
|
|
||||||
// Cache new pods with a future timestamp.
|
|
||||||
newpods := []*Pod{{ID: "1111"}, {ID: "2222"}, {ID: "3333"}}
|
|
||||||
cache.Lock()
|
|
||||||
cache.pods = newpods
|
|
||||||
cache.cacheTime = time.Now().Add(20 * time.Minute)
|
|
||||||
cache.Unlock()
|
|
||||||
|
|
||||||
// Instruct runime to return a list of old pods.
|
|
||||||
oldpods := []*Pod{{ID: "1111"}}
|
|
||||||
runtime.PodList = oldpods
|
|
||||||
|
|
||||||
// Try to update the cache; the attempt should not succeed because the
|
|
||||||
// cache timestamp is newer than the current time.
|
|
||||||
cache.updateCacheWithLock()
|
|
||||||
actual := cache.getCachedPods()
|
|
||||||
if !reflect.DeepEqual(newpods, actual) {
|
|
||||||
t.Errorf("expected %#v, got %#v", newpods, actual)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
Loading…
Reference in New Issue
Block a user