From 39d9fa60e85ee606e276d6f4506b791fe71768bf Mon Sep 17 00:00:00 2001 From: David Ashpole Date: Tue, 6 Mar 2018 15:14:05 -0800 Subject: [PATCH] refresh eviction interval periodically --- pkg/kubelet/eviction/BUILD | 1 - pkg/kubelet/eviction/eviction_manager.go | 21 +++++----- pkg/kubelet/eviction/helpers.go | 40 +++++++++++++++++++ pkg/kubelet/eviction/helpers_test.go | 33 +++++++++++++++ .../eviction/threshold_notifier_linux.go | 14 +++---- pkg/kubelet/eviction/types.go | 12 +++++- 6 files changed, 101 insertions(+), 20 deletions(-) diff --git a/pkg/kubelet/eviction/BUILD b/pkg/kubelet/eviction/BUILD index caa68996fc6..03d662a5d05 100644 --- a/pkg/kubelet/eviction/BUILD +++ b/pkg/kubelet/eviction/BUILD @@ -94,7 +94,6 @@ go_library( "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/clock:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library", "//vendor/k8s.io/client-go/tools/record:go_default_library", ] + select({ diff --git a/pkg/kubelet/eviction/eviction_manager.go b/pkg/kubelet/eviction/eviction_manager.go index e4b19e5aecd..f580e2951b6 100644 --- a/pkg/kubelet/eviction/eviction_manager.go +++ b/pkg/kubelet/eviction/eviction_manager.go @@ -27,7 +27,6 @@ import ( "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/util/clock" - "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/record" apiv1resource "k8s.io/kubernetes/pkg/api/v1/resource" @@ -83,8 +82,8 @@ type managerImpl struct { resourceToNodeReclaimFuncs map[v1.ResourceName]nodeReclaimFuncs // last observations from synchronize lastObservations signalObservations - // notifiersInitialized indicates if the threshold notifiers have been initialized (i.e. synchronize() has been called once) - notifiersInitialized bool + // notifierStopCh is a channel used to stop all thresholdNotifiers + notifierStopCh ThresholdStopCh // dedicatedImageFs indicates if imagefs is on a separate device from the rootfs dedicatedImageFs *bool } @@ -114,6 +113,7 @@ func NewManager( nodeRef: nodeRef, nodeConditionsLastObservedAt: nodeConditionsObservedAt{}, thresholdsFirstObservedAt: thresholdsObservedAt{}, + notifierStopCh: NewInitialStopCh(clock), dedicatedImageFs: nil, } return manager, manager @@ -184,8 +184,8 @@ func (m *managerImpl) IsUnderPIDPressure() bool { return hasNodeCondition(m.nodeConditions, v1.NodePIDPressure) } -func startMemoryThresholdNotifier(thresholds []evictionapi.Threshold, summary *statsapi.Summary, hard bool, handler thresholdNotifierHandlerFunc) error { - for _, threshold := range thresholds { +func (m *managerImpl) startMemoryThresholdNotifier(summary *statsapi.Summary, hard bool, handler thresholdNotifierHandlerFunc) error { + for _, threshold := range m.config.Thresholds { if threshold.Signal != evictionapi.SignalMemoryAvailable || hard != isHardEvictionThreshold(threshold) { continue } @@ -215,7 +215,7 @@ func startMemoryThresholdNotifier(thresholds []evictionapi.Threshold, summary *s if err != nil { return err } - go memcgThresholdNotifier.Start(wait.NeverStop) + go memcgThresholdNotifier.Start(m.notifierStopCh) return nil } return nil @@ -252,11 +252,10 @@ func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc Act } // attempt to create a threshold notifier to improve eviction response time - if m.config.KernelMemcgNotification && !m.notifiersInitialized { - glog.Infof("eviction manager attempting to integrate with kernel memcg notification api") - m.notifiersInitialized = true + if m.config.KernelMemcgNotification && m.notifierStopCh.Reset() { + glog.V(4).Infof("eviction manager attempting to integrate with kernel memcg notification api") // start soft memory notification - err = startMemoryThresholdNotifier(m.config.Thresholds, summary, false, func(desc string) { + err = m.startMemoryThresholdNotifier(summary, false, func(desc string) { glog.Infof("soft memory eviction threshold crossed at %s", desc) // TODO wait grace period for soft memory limit m.synchronize(diskInfoProvider, podFunc) @@ -265,7 +264,7 @@ func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc Act glog.Warningf("eviction manager: failed to create soft memory threshold notifier: %v", err) } // start hard memory notification - err = startMemoryThresholdNotifier(m.config.Thresholds, summary, true, func(desc string) { + err = m.startMemoryThresholdNotifier(summary, true, func(desc string) { glog.Infof("hard memory eviction threshold crossed at %s", desc) m.synchronize(diskInfoProvider, podFunc) }) diff --git a/pkg/kubelet/eviction/helpers.go b/pkg/kubelet/eviction/helpers.go index 92c10ad6f55..444dbbd1729 100644 --- a/pkg/kubelet/eviction/helpers.go +++ b/pkg/kubelet/eviction/helpers.go @@ -21,11 +21,13 @@ import ( "sort" "strconv" "strings" + "sync" "time" "github.com/golang/glog" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/util/clock" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/kubernetes/pkg/features" statsapi "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1" @@ -52,6 +54,9 @@ const ( resourceNodeFs v1.ResourceName = "nodefs" // nodefs inodes, number. internal to this module, used to account for local node root filesystem inodes. resourceNodeFsInodes v1.ResourceName = "nodefsInodes" + // this prevents constantly updating the memcg notifier if synchronize + // is run frequently. + notifierRefreshInterval = 10 * time.Second ) var ( @@ -1080,3 +1085,38 @@ func buildResourceToNodeReclaimFuncs(imageGC ImageGC, containerGC ContainerGC, w } return resourceToReclaimFunc } + +// thresholdStopCh is a ThresholdStopCh which can only be closed after notifierRefreshInterval time has passed +type thresholdStopCh struct { + lock sync.Mutex + ch chan struct{} + startTime time.Time + // used to track time + clock clock.Clock +} + +// NewInitialStopCh returns a ThresholdStopCh which can be closed immediately +func NewInitialStopCh(clock clock.Clock) ThresholdStopCh { + return &thresholdStopCh{ch: make(chan struct{}), clock: clock} +} + +// implements ThresholdStopCh.Reset +func (t *thresholdStopCh) Reset() (closed bool) { + t.lock.Lock() + defer t.lock.Unlock() + closed = t.clock.Since(t.startTime) > notifierRefreshInterval + if closed { + // close the old channel and reopen a new one + close(t.ch) + t.startTime = t.clock.Now() + t.ch = make(chan struct{}) + } + return +} + +// implements ThresholdStopCh.Ch +func (t *thresholdStopCh) Ch() <-chan struct{} { + t.lock.Lock() + defer t.lock.Unlock() + return t.ch +} diff --git a/pkg/kubelet/eviction/helpers_test.go b/pkg/kubelet/eviction/helpers_test.go index a61d19fee78..094940bea98 100644 --- a/pkg/kubelet/eviction/helpers_test.go +++ b/pkg/kubelet/eviction/helpers_test.go @@ -19,6 +19,7 @@ package eviction import ( "fmt" "reflect" + "sync" "testing" "time" @@ -26,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/clock" utilfeature "k8s.io/apiserver/pkg/util/feature" api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/features" @@ -1914,3 +1916,34 @@ func (s1 thresholdList) Equal(s2 thresholdList) bool { } return true } + +func TestThresholdStopCh(t *testing.T) { + var wg sync.WaitGroup + fakeClock := clock.NewFakeClock(time.Now()) + stop := NewInitialStopCh(fakeClock) + + // Should be able to reset the InitialStopCh right away + if !stop.Reset() { + t.Errorf("Expected to be able to close the initialStopCh, but was unsuccessful") + } + + // Need to wait notifierRefreshInterval before closing + if stop.Reset() { + t.Errorf("Expected not to be able to close the initialStopCh, but was successful") + } + + wg.Add(1) + ch := stop.Ch() + go func() { + defer wg.Done() + // wait for the channel to close + <-ch + }() + + fakeClock.Step(2 * notifierRefreshInterval) + if !stop.Reset() { + t.Errorf("Expected to be able to close the initialStopCh, but was unsuccessful") + } + // ensure the Reset() closed the channel + wg.Wait() +} diff --git a/pkg/kubelet/eviction/threshold_notifier_linux.go b/pkg/kubelet/eviction/threshold_notifier_linux.go index 1709cac8b79..89d7c79c8d7 100644 --- a/pkg/kubelet/eviction/threshold_notifier_linux.go +++ b/pkg/kubelet/eviction/threshold_notifier_linux.go @@ -67,7 +67,7 @@ func NewMemCGThresholdNotifier(path, attribute, threshold, description string, h unix.Close(eventfd) } }() - glog.V(2).Infof("eviction: setting notification threshold to %s", threshold) + glog.V(3).Infof("eviction: setting notification threshold to %s", threshold) config := fmt.Sprintf("%d %d %s", eventfd, watchfd, threshold) _, err = unix.Write(controlfd, []byte(config)) if err != nil { @@ -82,7 +82,7 @@ func NewMemCGThresholdNotifier(path, attribute, threshold, description string, h }, nil } -func getThresholdEvents(eventfd int, eventCh chan<- struct{}, stopCh <-chan struct{}) { +func getThresholdEvents(eventfd int, eventCh chan<- struct{}, stop ThresholdStopCh) { for { buf := make([]byte, 8) _, err := unix.Read(eventfd, buf) @@ -92,19 +92,19 @@ func getThresholdEvents(eventfd int, eventCh chan<- struct{}, stopCh <-chan stru select { case eventCh <- struct{}{}: - case <-stopCh: + case <-stop.Ch(): return } } } -func (n *memcgThresholdNotifier) Start(stopCh <-chan struct{}) { +func (n *memcgThresholdNotifier) Start(stop ThresholdStopCh) { eventCh := make(chan struct{}) - go getThresholdEvents(n.eventfd, eventCh, stopCh) + go getThresholdEvents(n.eventfd, eventCh, stop) for { select { - case <-stopCh: - glog.V(2).Infof("eviction: stopping threshold notifier") + case <-stop.Ch(): + glog.V(3).Infof("eviction: stopping threshold notifier") unix.Close(n.watchfd) unix.Close(n.controlfd) unix.Close(n.eventfd) diff --git a/pkg/kubelet/eviction/types.go b/pkg/kubelet/eviction/types.go index e4fca84d7a0..cc192b8b5e7 100644 --- a/pkg/kubelet/eviction/types.go +++ b/pkg/kubelet/eviction/types.go @@ -132,7 +132,17 @@ type nodeReclaimFuncs []nodeReclaimFunc // thresholdNotifierHandlerFunc is a function that takes action in response to a crossed threshold type thresholdNotifierHandlerFunc func(thresholdDescription string) +// ThresholdStopCh is an interface for a channel which is closed to stop waiting goroutines. +// Implementations of ThresholdStopCh must correctly handle concurrent calls to all functions. +type ThresholdStopCh interface { + // Reset closes the channel if it can be closed, and returns true if it was closed. + // Reset also creates a new channel. + Reset() bool + // Ch returns the channel that is closed when Reset() is called + Ch() <-chan struct{} +} + // ThresholdNotifier notifies the user when an attribute crosses a threshold value type ThresholdNotifier interface { - Start(stopCh <-chan struct{}) + Start(ThresholdStopCh) }