Ensure runtimeCache contains all observed started containers on pod delete

Jordan Liggitt 2020-07-21 12:02:26 -04:00
parent de18bd6c89
commit d195fc2ec8
4 changed files with 142 additions and 0 deletions

pkg/kubelet/BUILD

@@ -29,6 +29,7 @@ go_library(
        "reason_cache.go",
        "runonce.go",
        "runtime.go",
        "time_cache.go",
        "util.go",
        "volume_host.go",
    ],
@@ -179,6 +180,7 @@ go_test(
        "pod_workers_test.go",
        "reason_cache_test.go",
        "runonce_test.go",
        "time_cache_test.go",
    ],
    embed = [":go_default_library"],
    deps = [
@@ -251,6 +253,7 @@ go_test(
        "//staging/src/k8s.io/client-go/util/testing:go_default_library",
        "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library",
        "//staging/src/k8s.io/component-base/version:go_default_library",
        "//vendor/github.com/golang/groupcache/lru:go_default_library",
        "//vendor/github.com/google/cadvisor/info/v1:go_default_library",
        "//vendor/github.com/google/cadvisor/info/v2:go_default_library",
        "//vendor/github.com/stretchr/testify/assert:go_default_library",

pkg/kubelet/kubelet.go

@@ -525,6 +525,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
		experimentalHostUserNamespaceDefaulting: utilfeature.DefaultFeatureGate.Enabled(features.ExperimentalHostUserNamespaceDefaultingGate),
		keepTerminatedPodVolumes:                keepTerminatedPodVolumes,
		nodeStatusMaxImages:                     nodeStatusMaxImages,
		lastContainerStartedTime:                newTimeCache(),
	}
	if klet.cloud != nil {
@@ -975,6 +976,9 @@ type Kubelet struct {
	// lastStatusReportTime is the time when node status was last reported.
	lastStatusReportTime time.Time

	// lastContainerStartedTime is the time of the last ContainerStarted event observed per pod
	lastContainerStartedTime *timeCache

	// syncNodeStatusMux is a lock on updating the node status, because this path is not thread-safe.
	// This lock is used by Kubelet.syncNodeStatus function and shouldn't be used anywhere else.
	syncNodeStatusMux sync.Mutex
@@ -1655,6 +1659,13 @@ func (kl *Kubelet) deletePod(pod *v1.Pod) error {
	}
	kl.podWorkers.ForgetWorker(pod.UID)

	// make sure our runtimeCache is at least as fresh as the last container started event we observed.
	// this ensures we correctly send graceful deletion signals to all containers we've reported started.
	if lastContainerStarted, ok := kl.lastContainerStartedTime.Get(pod.UID); ok {
		if err := kl.runtimeCache.ForceUpdateIfOlder(lastContainerStarted); err != nil {
			return fmt.Errorf("error updating containers: %v", err)
		}
	}
	// Runtime cache may not have been updated to with the pod, but it's okay
	// because the periodic cleanup routine will attempt to delete again later.
	runningPods, err := kl.runtimeCache.GetPods()
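
For context on the call above: the kubelet's runtime cache keeps a timestamped snapshot of what the container runtime reports, and ForceUpdateIfOlder re-queries the runtime only when that snapshot predates the given time. The following is a minimal sketch of that pattern, not the actual pkg/kubelet/container implementation; timestampedCache, listPods, and the pod strings are invented for illustration.

package main

import (
	"fmt"
	"sync"
	"time"
)

// timestampedCache is a stand-in for the kubelet's runtime cache: it remembers
// both the listing it fetched and when it fetched it.
type timestampedCache struct {
	mu        sync.Mutex
	pods      []string                     // cached container/pod listing
	cacheTime time.Time                    // when the listing was captured
	listPods  func() ([]string, time.Time) // queries the runtime (injected for the sketch)
}

// ForceUpdateIfOlder refreshes the snapshot only if it is older than minExpectedTime.
// Passing the last observed ContainerStarted time guarantees the caller sees a
// listing taken at or after that start event.
func (c *timestampedCache) ForceUpdateIfOlder(minExpectedTime time.Time) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.cacheTime.Before(minExpectedTime) {
		c.pods, c.cacheTime = c.listPods()
	}
	return nil
}

func main() {
	started := time.Now()
	cache := &timestampedCache{
		cacheTime: started.Add(-2 * time.Second), // stale snapshot taken before the start event
		listPods: func() ([]string, time.Time) {
			return []string{"pod-a/ctr-1"}, time.Now()
		},
	}
	_ = cache.ForceUpdateIfOlder(started) // snapshot is stale, so it re-lists
	fmt.Println(cache.pods)               // [pod-a/ctr-1]
}
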
@ -1839,6 +1850,12 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle
kl.sourcesReady.AddSource(u.Source)
case e := <-plegCh:
if e.Type == pleg.ContainerStarted {
// record the most recent time we observed a container start for this pod.
// this lets us selectively invalidate the runtimeCache when processing a delete for this pod
// to make sure we don't miss handling graceful termination for containers we reported as having started.
kl.lastContainerStartedTime.Add(e.ID, time.Now())
}
if isSyncPodWorthy(e) {
// PLEG event for a pod; sync it.
if pod, ok := kl.podManager.GetPodByUID(e.ID); ok {
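
Putting the two kubelet.go hunks together: the sync loop stamps each pod UID with the time a container start was observed, and deletePod later uses that stamp to force a runtime cache refresh before computing which containers to terminate. The sketch below compresses that flow with invented event and map types (no real PLEG or runtime cache), purely to show the ordering guarantee being relied on.

package main

import (
	"fmt"
	"sync"
	"time"
)

// event is a stand-in for a PLEG lifecycle event (pod UID plus "container started" flag).
type event struct {
	uid     string
	started bool
}

// lastStarted records, per pod UID, the most recent observed container start time.
type lastStarted struct {
	mu    sync.Mutex
	times map[string]time.Time
}

func (l *lastStarted) add(uid string, t time.Time) {
	l.mu.Lock()
	defer l.mu.Unlock()
	l.times[uid] = t
}

func (l *lastStarted) get(uid string) (time.Time, bool) {
	l.mu.Lock()
	defer l.mu.Unlock()
	t, ok := l.times[uid]
	return t, ok
}

func main() {
	observed := &lastStarted{times: map[string]time.Time{}}

	// Sync-loop side: every ContainerStarted event stamps the pod's UID with "now".
	for _, e := range []event{{uid: "pod-1234", started: true}} {
		if e.started {
			observed.add(e.uid, time.Now())
		}
	}

	// Delete side: before tearing the pod down, any pod listing used for graceful
	// termination must be at least as fresh as that stamp (the real code passes it
	// to runtimeCache.ForceUpdateIfOlder).
	if t, ok := observed.get("pod-1234"); ok {
		fmt.Println("refresh runtime cache if older than", t)
	}
}
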

pkg/kubelet/time_cache.go

@@ -0,0 +1,67 @@
/*
Copyright 2020 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubelet

import (
	"sync"
	"time"

	"github.com/golang/groupcache/lru"

	"k8s.io/apimachinery/pkg/types"
)

// timeCache stores a time keyed by uid
type timeCache struct {
	lock  sync.RWMutex
	cache *lru.Cache
}

// maxTimeCacheEntries is the cache entry number in lru cache. 1000 is a proper number
// for our 100 pods per node target. If we support more pods per node in the future, we
// may want to increase the number.
const maxTimeCacheEntries = 1000

func newTimeCache() *timeCache {
	return &timeCache{cache: lru.New(maxTimeCacheEntries)}
}

func (c *timeCache) Add(uid types.UID, t time.Time) {
	c.lock.Lock()
	defer c.lock.Unlock()
	c.cache.Add(uid, t)
}

func (c *timeCache) Remove(uid types.UID) {
	c.lock.Lock()
	defer c.lock.Unlock()
	c.cache.Remove(uid)
}

func (c *timeCache) Get(uid types.UID) (time.Time, bool) {
	c.lock.RLock()
	defer c.lock.RUnlock()
	value, ok := c.cache.Get(uid)
	if !ok {
		return time.Time{}, false
	}
	t, ok := value.(time.Time)
	if !ok {
		return time.Time{}, false
	}
	return t, true
}
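
As a quick illustration of how the new type is used (this helper is not part of the commit; it is hypothetical code that would need to be appended to the file above, with fmt added to its imports):

// exampleTimeCacheUsage is a hypothetical helper showing the Add/Get/Remove flow.
func exampleTimeCacheUsage() {
	c := newTimeCache()
	uid := types.UID("11111111-2222-3333-4444-555555555555") // illustrative pod UID

	// Recorded when a ContainerStarted event is observed for the pod.
	c.Add(uid, time.Now())

	// Read back when the pod is deleted, to decide whether the runtime cache is stale.
	if started, ok := c.Get(uid); ok {
		fmt.Printf("last observed container start for %s: %s\n", uid, started)
	}

	// Old entries are dropped by LRU eviction; Remove is available for explicit cleanup.
	c.Remove(uid)
}
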

pkg/kubelet/time_cache_test.go

@@ -0,0 +1,55 @@
/*
Copyright 2020 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubelet

import (
	"testing"
	"time"

	"github.com/golang/groupcache/lru"
)

func TestTimeCache(t *testing.T) {
	cache := &timeCache{cache: lru.New(2)}

	if a, ok := cache.Get("123"); ok {
		t.Errorf("expected cache miss, got %v, %v", a, ok)
	}

	now := time.Now()
	soon := now.Add(time.Minute)
	cache.Add("now", now)
	cache.Add("soon", soon)
	if a, ok := cache.Get("now"); !ok || !a.Equal(now) {
		t.Errorf("expected cache hit matching %v, got %v, %v", now, a, ok)
	}
	if a, ok := cache.Get("soon"); !ok || !a.Equal(soon) {
		t.Errorf("expected cache hit matching %v, got %v, %v", soon, a, ok)
	}

	then := now.Add(-time.Minute)
	cache.Add("then", then)
	if a, ok := cache.Get("now"); ok {
		t.Errorf("expected cache miss from oldest evicted value, got %v, %v", a, ok)
	}
	if a, ok := cache.Get("soon"); !ok || !a.Equal(soon) {
		t.Errorf("expected cache hit matching %v, got %v, %v", soon, a, ok)
	}
	if a, ok := cache.Get("then"); !ok || !a.Equal(then) {
		t.Errorf("expected cache hit matching %v, got %v, %v", then, a, ok)
	}
}