From d195fc2ec864988d1375f8a14bc7dbaacc6a1b95 Mon Sep 17 00:00:00 2001 From: Jordan Liggitt Date: Tue, 21 Jul 2020 12:02:26 -0400 Subject: [PATCH] Ensure runtimeCache contains all observed started containers on pod delete --- pkg/kubelet/BUILD | 3 ++ pkg/kubelet/kubelet.go | 17 +++++++++ pkg/kubelet/time_cache.go | 67 ++++++++++++++++++++++++++++++++++ pkg/kubelet/time_cache_test.go | 55 ++++++++++++++++++++++++++++ 4 files changed, 142 insertions(+) create mode 100644 pkg/kubelet/time_cache.go create mode 100644 pkg/kubelet/time_cache_test.go diff --git a/pkg/kubelet/BUILD b/pkg/kubelet/BUILD index e1f24fffc6f..7efbf4a6927 100644 --- a/pkg/kubelet/BUILD +++ b/pkg/kubelet/BUILD @@ -29,6 +29,7 @@ go_library( "reason_cache.go", "runonce.go", "runtime.go", + "time_cache.go", "util.go", "volume_host.go", ], @@ -179,6 +180,7 @@ go_test( "pod_workers_test.go", "reason_cache_test.go", "runonce_test.go", + "time_cache_test.go", ], embed = [":go_default_library"], deps = [ @@ -251,6 +253,7 @@ go_test( "//staging/src/k8s.io/client-go/util/testing:go_default_library", "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library", "//staging/src/k8s.io/component-base/version:go_default_library", + "//vendor/github.com/golang/groupcache/lru:go_default_library", "//vendor/github.com/google/cadvisor/info/v1:go_default_library", "//vendor/github.com/google/cadvisor/info/v2:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library", diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 5c735c68176..051671a9fc8 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -525,6 +525,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, experimentalHostUserNamespaceDefaulting: utilfeature.DefaultFeatureGate.Enabled(features.ExperimentalHostUserNamespaceDefaultingGate), keepTerminatedPodVolumes: keepTerminatedPodVolumes, nodeStatusMaxImages: nodeStatusMaxImages, + lastContainerStartedTime: 
newTimeCache(), } if klet.cloud != nil { @@ -975,6 +976,9 @@ type Kubelet struct { // lastStatusReportTime is the time when node status was last reported. lastStatusReportTime time.Time + // lastContainerStartedTime is the time of the last ContainerStarted event observed per pod + lastContainerStartedTime *timeCache + // syncNodeStatusMux is a lock on updating the node status, because this path is not thread-safe. // This lock is used by Kubelet.syncNodeStatus function and shouldn't be used anywhere else. syncNodeStatusMux sync.Mutex @@ -1655,6 +1659,13 @@ func (kl *Kubelet) deletePod(pod *v1.Pod) error { } kl.podWorkers.ForgetWorker(pod.UID) + // make sure our runtimeCache is at least as fresh as the last container started event we observed. + // this ensures we correctly send graceful deletion signals to all containers we've reported started. + if lastContainerStarted, ok := kl.lastContainerStartedTime.Get(pod.UID); ok { + if err := kl.runtimeCache.ForceUpdateIfOlder(lastContainerStarted); err != nil { + return fmt.Errorf("error updating containers: %v", err) + } + } // Runtime cache may not have been updated to with the pod, but it's okay // because the periodic cleanup routine will attempt to delete again later. runningPods, err := kl.runtimeCache.GetPods() @@ -1839,6 +1850,12 @@ func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handle kl.sourcesReady.AddSource(u.Source) case e := <-plegCh: + if e.Type == pleg.ContainerStarted { + // record the most recent time we observed a container start for this pod. + // this lets us selectively invalidate the runtimeCache when processing a delete for this pod + // to make sure we don't miss handling graceful termination for containers we reported as having started. + kl.lastContainerStartedTime.Add(e.ID, time.Now()) + } if isSyncPodWorthy(e) { // PLEG event for a pod; sync it. 
if pod, ok := kl.podManager.GetPodByUID(e.ID); ok { diff --git a/pkg/kubelet/time_cache.go b/pkg/kubelet/time_cache.go new file mode 100644 index 00000000000..66528e25729 --- /dev/null +++ b/pkg/kubelet/time_cache.go @@ -0,0 +1,67 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubelet + +import ( + "sync" + "time" + + "github.com/golang/groupcache/lru" + + "k8s.io/apimachinery/pkg/types" +) + +// timeCache stores a time keyed by uid +type timeCache struct { + lock sync.RWMutex + cache *lru.Cache +} + +// maxTimeCacheEntries is the cache entry number in lru cache. 1000 is a proper number +// for our 100 pods per node target. If we support more pods per node in the future, we +// may want to increase the number. 
+const maxTimeCacheEntries = 1000 + +func newTimeCache() *timeCache { + return &timeCache{cache: lru.New(maxTimeCacheEntries)} +} + +func (c *timeCache) Add(uid types.UID, t time.Time) { + c.lock.Lock() + defer c.lock.Unlock() + c.cache.Add(uid, t) +} + +func (c *timeCache) Remove(uid types.UID) { + c.lock.Lock() + defer c.lock.Unlock() + c.cache.Remove(uid) +} + +func (c *timeCache) Get(uid types.UID) (time.Time, bool) { + c.lock.Lock() + defer c.lock.Unlock() + value, ok := c.cache.Get(uid) + if !ok { + return time.Time{}, false + } + t, ok := value.(time.Time) + if !ok { + return time.Time{}, false + } + return t, true +} diff --git a/pkg/kubelet/time_cache_test.go b/pkg/kubelet/time_cache_test.go new file mode 100644 index 00000000000..1d91e3391b5 --- /dev/null +++ b/pkg/kubelet/time_cache_test.go @@ -0,0 +1,55 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package kubelet + +import ( + "testing" + "time" + + "github.com/golang/groupcache/lru" +) + +func TestTimeCache(t *testing.T) { + cache := &timeCache{cache: lru.New(2)} + if a, ok := cache.Get("123"); ok { + t.Errorf("expected cache miss, got %v, %v", a, ok) + } + + now := time.Now() + soon := now.Add(time.Minute) + cache.Add("now", now) + cache.Add("soon", soon) + + if a, ok := cache.Get("now"); !ok || !a.Equal(now) { + t.Errorf("expected cache hit matching %v, got %v, %v", now, a, ok) + } + if a, ok := cache.Get("soon"); !ok || !a.Equal(soon) { + t.Errorf("expected cache hit matching %v, got %v, %v", soon, a, ok) + } + + then := now.Add(-time.Minute) + cache.Add("then", then) + if a, ok := cache.Get("now"); ok { + t.Errorf("expected cache miss from oldest evicted value, got %v, %v", a, ok) + } + if a, ok := cache.Get("soon"); !ok || !a.Equal(soon) { + t.Errorf("expected cache hit matching %v, got %v, %v", soon, a, ok) + } + if a, ok := cache.Get("then"); !ok || !a.Equal(then) { + t.Errorf("expected cache hit matching %v, got %v, %v", then, a, ok) + } +}