diff --git a/pkg/kubelet/eviction/BUILD b/pkg/kubelet/eviction/BUILD index 08c744ba87d..a6e6c847694 100644 --- a/pkg/kubelet/eviction/BUILD +++ b/pkg/kubelet/eviction/BUILD @@ -11,6 +11,8 @@ go_test( srcs = [ "eviction_manager_test.go", "helpers_test.go", + "memory_threshold_notifier_test.go", + "mock_threshold_notifier_test.go", ], embed = [":go_default_library"], deps = [ @@ -20,6 +22,7 @@ go_test( "//pkg/kubelet/eviction/api:go_default_library", "//pkg/kubelet/lifecycle:go_default_library", "//pkg/kubelet/types:go_default_library", + "//vendor/github.com/stretchr/testify/mock:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", @@ -36,6 +39,7 @@ go_library( "doc.go", "eviction_manager.go", "helpers.go", + "memory_threshold_notifier.go", "types.go", ] + select({ "@io_bazel_rules_go//go/platform:android": [ diff --git a/pkg/kubelet/eviction/eviction_manager.go b/pkg/kubelet/eviction/eviction_manager.go index cadc6afaa6c..0f5738bf515 100644 --- a/pkg/kubelet/eviction/eviction_manager.go +++ b/pkg/kubelet/eviction/eviction_manager.go @@ -19,7 +19,6 @@ package eviction import ( "fmt" "sort" - "strconv" "sync" "time" @@ -35,7 +34,6 @@ import ( v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos" "k8s.io/kubernetes/pkg/features" statsapi "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1" - "k8s.io/kubernetes/pkg/kubelet/cm" evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api" "k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/metrics" @@ -85,10 +83,12 @@ type managerImpl struct { signalToNodeReclaimFuncs map[evictionapi.Signal]nodeReclaimFuncs // last observations from synchronize lastObservations signalObservations - // notifierStopCh is a channel used to stop all thresholdNotifiers - notifierStopCh ThresholdStopCh // dedicatedImageFs indicates if imagefs is on a separate device from the rootfs dedicatedImageFs *bool + // thresholdNotifiers is a list of memory threshold notifiers which each notify for a memory eviction threshold + thresholdNotifiers []ThresholdNotifier + // thresholdsLastUpdated is the last time the thresholdNotifiers were updated. + thresholdsLastUpdated time.Time } // ensure it implements the required interface @@ -116,8 +116,8 @@ func NewManager( nodeRef: nodeRef, nodeConditionsLastObservedAt: nodeConditionsObservedAt{}, thresholdsFirstObservedAt: thresholdsObservedAt{}, - notifierStopCh: NewInitialStopCh(clock), dedicatedImageFs: nil, + thresholdNotifiers: []ThresholdNotifier{}, } return manager, manager } @@ -163,6 +163,23 @@ func (m *managerImpl) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAd // Start starts the control loop to observe and response to low compute resources. func (m *managerImpl) Start(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc, podCleanedUpFunc PodCleanedUpFunc, monitoringInterval time.Duration) { + thresholdHandler := func(message string) { + glog.Infof(message) + m.synchronize(diskInfoProvider, podFunc) + } + if m.config.KernelMemcgNotification { + for _, threshold := range m.config.Thresholds { + if threshold.Signal == evictionapi.SignalMemoryAvailable || threshold.Signal == evictionapi.SignalAllocatableMemoryAvailable { + notifier, err := NewMemoryThresholdNotifier(threshold, m.config.PodCgroupRoot, &CgroupNotifierFactory{}, thresholdHandler) + if err != nil { + glog.Warningf("eviction manager: failed to create memory threshold notifier: %v", err) + } else { + go notifier.Start() + m.thresholdNotifiers = append(m.thresholdNotifiers, notifier) + } + } + } + } // start the eviction manager monitoring go func() { for { @@ -197,51 +214,6 @@ func (m *managerImpl) IsUnderPIDPressure() bool { return hasNodeCondition(m.nodeConditions, v1.NodePIDPressure) } -func (m *managerImpl) startMemoryThresholdNotifier(summary *statsapi.Summary, hard, allocatable bool, handler thresholdNotifierHandlerFunc) error { - for _, threshold := range m.config.Thresholds { - if threshold.Signal != evictionapi.SignalMemoryAvailable || hard != isHardEvictionThreshold(threshold) { - continue - } - cgroups, err := cm.GetCgroupSubsystems() - if err != nil { - return err - } - cgpath, found := cgroups.MountPoints["memory"] - if !found || len(cgpath) == 0 { - return fmt.Errorf("memory cgroup mount point not found") - } - attribute := "memory.usage_in_bytes" - memoryStats := summary.Node.Memory - if allocatable { - cgpath += m.config.PodCgroupRoot - allocatableContainer, err := getSysContainer(summary.Node.SystemContainers, statsapi.SystemContainerPods) - if err != nil { - return err - } - memoryStats = allocatableContainer.Memory - } - if memoryStats == nil || memoryStats.UsageBytes == nil || memoryStats.WorkingSetBytes == nil || memoryStats.AvailableBytes == nil { - return fmt.Errorf("summary was incomplete") - } - // Set threshold on usage to capacity - eviction_hard + inactive_file, - // since we want to be notified when working_set = capacity - eviction_hard - inactiveFile := resource.NewQuantity(int64(*memoryStats.UsageBytes-*memoryStats.WorkingSetBytes), resource.BinarySI) - capacity := resource.NewQuantity(int64(*memoryStats.AvailableBytes+*memoryStats.WorkingSetBytes), resource.BinarySI) - evictionThresholdQuantity := evictionapi.GetThresholdQuantity(threshold.Value, capacity) - memcgThreshold := capacity.DeepCopy() - memcgThreshold.Sub(*evictionThresholdQuantity) - memcgThreshold.Add(*inactiveFile) - description := fmt.Sprintf("<%s available", formatThresholdValue(threshold.Value)) - memcgThresholdNotifier, err := NewMemCGThresholdNotifier(cgpath, attribute, strconv.FormatInt(memcgThreshold.Value(), 10), description, handler) - if err != nil { - return err - } - go memcgThresholdNotifier.Start(m.notifierStopCh) - return nil - } - return nil -} - // synchronize is the main control loop that enforces eviction thresholds. // Returns the pod that was killed, or nil if no pod was killed. func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc) []*v1.Pod { @@ -272,41 +244,12 @@ func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc Act return nil } - // attempt to create a threshold notifier to improve eviction response time - if m.config.KernelMemcgNotification && m.notifierStopCh.Reset() { - glog.V(4).Infof("eviction manager attempting to integrate with kernel memcg notification api") - // start soft memory notification - err = m.startMemoryThresholdNotifier(summary, false, false, func(desc string) { - glog.Infof("soft memory eviction threshold crossed at %s", desc) - // TODO wait grace period for soft memory limit - m.synchronize(diskInfoProvider, podFunc) - }) - if err != nil { - glog.Warningf("eviction manager: failed to create soft memory threshold notifier: %v", err) - } // start soft memory notification - err = m.startMemoryThresholdNotifier(summary, false, true, func(desc string) { - glog.Infof("soft allocatable memory eviction threshold crossed at %s", desc) - // TODO wait grace period for soft memory limit - m.synchronize(diskInfoProvider, podFunc) - }) - if err != nil { - glog.Warningf("eviction manager: failed to create allocatable soft memory threshold notifier: %v", err) - } - // start hard memory notification - err = m.startMemoryThresholdNotifier(summary, true, false, func(desc string) { - glog.Infof("hard memory eviction threshold crossed at %s", desc) - m.synchronize(diskInfoProvider, podFunc) - }) - if err != nil { - glog.Warningf("eviction manager: failed to create hard memory threshold notifier: %v", err) - } - // start hard memory notification - err = m.startMemoryThresholdNotifier(summary, true, true, func(desc string) { - glog.Infof("hard allocatable memory eviction threshold crossed at %s", desc) - m.synchronize(diskInfoProvider, podFunc) - }) - if err != nil { - glog.Warningf("eviction manager: failed to create hard allocatable memory threshold notifier: %v", err) + if m.clock.Since(m.thresholdsLastUpdated) > notifierRefreshInterval { + m.thresholdsLastUpdated = m.clock.Now() + for _, notifier := range m.thresholdNotifiers { + if err := notifier.UpdateThreshold(summary); err != nil { + glog.Warningf("eviction manager: failed to update %s: %v", notifier.Description(), err) + } } } diff --git a/pkg/kubelet/eviction/eviction_manager_test.go b/pkg/kubelet/eviction/eviction_manager_test.go index 9ec64c4089a..e8b4a4e36d3 100644 --- a/pkg/kubelet/eviction/eviction_manager_test.go +++ b/pkg/kubelet/eviction/eviction_manager_test.go @@ -17,6 +17,7 @@ limitations under the License. package eviction import ( + "fmt" "testing" "time" @@ -1434,3 +1435,74 @@ func TestAllocatableMemoryPressure(t *testing.T) { } } } + +func TestUpdateMemcgThreshold(t *testing.T) { + activePodsFunc := func() []*v1.Pod { + return []*v1.Pod{} + } + + fakeClock := clock.NewFakeClock(time.Now()) + podKiller := &mockPodKiller{} + diskInfoProvider := &mockDiskInfoProvider{dedicatedImageFs: false} + diskGC := &mockDiskGC{err: nil} + nodeRef := &v1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""} + + config := Config{ + MaxPodGracePeriodSeconds: 5, + PressureTransitionPeriod: time.Minute * 5, + Thresholds: []evictionapi.Threshold{ + { + Signal: evictionapi.SignalMemoryAvailable, + Operator: evictionapi.OpLessThan, + Value: evictionapi.ThresholdValue{ + Quantity: quantityMustParse("1Gi"), + }, + }, + }, + PodCgroupRoot: "kubepods", + } + summaryProvider := &fakeSummaryProvider{result: makeMemoryStats("2Gi", map[*v1.Pod]statsapi.PodStats{})} + + thresholdNotifier := &MockThresholdNotifier{} + thresholdNotifier.On("UpdateThreshold", summaryProvider.result).Return(nil).Twice() + + manager := &managerImpl{ + clock: fakeClock, + killPodFunc: podKiller.killPodNow, + imageGC: diskGC, + containerGC: diskGC, + config: config, + recorder: &record.FakeRecorder{}, + summaryProvider: summaryProvider, + nodeRef: nodeRef, + nodeConditionsLastObservedAt: nodeConditionsObservedAt{}, + thresholdsFirstObservedAt: thresholdsObservedAt{}, + thresholdNotifiers: []ThresholdNotifier{thresholdNotifier}, + } + + manager.synchronize(diskInfoProvider, activePodsFunc) + // The UpdateThreshold method should have been called once, since this is the first run. + thresholdNotifier.AssertNumberOfCalls(t, "UpdateThreshold", 1) + + manager.synchronize(diskInfoProvider, activePodsFunc) + // The UpdateThreshold method should not have been called again, since not enough time has passed + thresholdNotifier.AssertNumberOfCalls(t, "UpdateThreshold", 1) + + fakeClock.Step(2 * notifierRefreshInterval) + manager.synchronize(diskInfoProvider, activePodsFunc) + // The UpdateThreshold method should be called again since enough time has passed + thresholdNotifier.AssertNumberOfCalls(t, "UpdateThreshold", 2) + + // new memory threshold notifier that returns an error + thresholdNotifier = &MockThresholdNotifier{} + thresholdNotifier.On("UpdateThreshold", summaryProvider.result).Return(fmt.Errorf("error updating threshold")) + thresholdNotifier.On("Description").Return("mock thresholdNotifier").Once() + manager.thresholdNotifiers = []ThresholdNotifier{thresholdNotifier} + + fakeClock.Step(2 * notifierRefreshInterval) + manager.synchronize(diskInfoProvider, activePodsFunc) + // The UpdateThreshold method should be called because at least notifierRefreshInterval time has passed. + // The Description method should be called because UpdateThreshold returned an error + thresholdNotifier.AssertNumberOfCalls(t, "UpdateThreshold", 1) + thresholdNotifier.AssertNumberOfCalls(t, "Description", 1) +} diff --git a/pkg/kubelet/eviction/helpers.go b/pkg/kubelet/eviction/helpers.go index 1c309360a39..752d62329b1 100644 --- a/pkg/kubelet/eviction/helpers.go +++ b/pkg/kubelet/eviction/helpers.go @@ -21,13 +21,11 @@ import ( "sort" "strconv" "strings" - "sync" "time" "github.com/golang/glog" "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" - "k8s.io/apimachinery/pkg/util/clock" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/kubernetes/pkg/features" statsapi "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1" @@ -52,9 +50,6 @@ const ( emptyDirMessage = "Usage of EmptyDir volume %q exceeds the limit %q. " // inodes, number. internal to this module, used to account for local disk inode consumption. resourceInodes v1.ResourceName = "inodes" - // this prevents constantly updating the memcg notifier if synchronize - // is run frequently. - notifierRefreshInterval = 10 * time.Second // OffendingContainersKey is the key in eviction event annotations for the list of container names which exceeded their requests OffendingContainersKey = "offending_containers" // OffendingContainersUsageKey is the key in eviction event annotations for the list of usage of containers which exceeded their requests @@ -1007,6 +1002,10 @@ func isHardEvictionThreshold(threshold evictionapi.Threshold) bool { return threshold.GracePeriod == time.Duration(0) } +func isAllocatableEvictionThreshold(threshold evictionapi.Threshold) bool { + return threshold.Signal == evictionapi.SignalAllocatableMemoryAvailable +} + // buildSignalToRankFunc returns ranking functions associated with resources func buildSignalToRankFunc(withImageFs bool) map[evictionapi.Signal]rankFunc { signalToRankFunc := map[evictionapi.Signal]rankFunc{ @@ -1097,38 +1096,3 @@ func evictionMessage(resourceToReclaim v1.ResourceName, pod *v1.Pod, stats stats annotations[StarvedResourceKey] = string(resourceToReclaim) return } - -// thresholdStopCh is a ThresholdStopCh which can only be closed after notifierRefreshInterval time has passed -type thresholdStopCh struct { - lock sync.Mutex - ch chan struct{} - startTime time.Time - // used to track time - clock clock.Clock -} - -// NewInitialStopCh returns a ThresholdStopCh which can be closed immediately -func NewInitialStopCh(clock clock.Clock) ThresholdStopCh { - return &thresholdStopCh{ch: make(chan struct{}), clock: clock} -} - -// implements ThresholdStopCh.Reset -func (t *thresholdStopCh) Reset() (closed bool) { - t.lock.Lock() - defer t.lock.Unlock() - closed = t.clock.Since(t.startTime) > notifierRefreshInterval - if closed { - // close the old channel and reopen a new one - close(t.ch) - t.startTime = t.clock.Now() - t.ch = make(chan struct{}) - } - return -} - -// implements ThresholdStopCh.Ch -func (t *thresholdStopCh) Ch() <-chan struct{} { - t.lock.Lock() - defer t.lock.Unlock() - return t.ch -} diff --git a/pkg/kubelet/eviction/helpers_test.go b/pkg/kubelet/eviction/helpers_test.go index d7ce3c8586f..0ed5a1b673f 100644 --- a/pkg/kubelet/eviction/helpers_test.go +++ b/pkg/kubelet/eviction/helpers_test.go @@ -20,7 +20,6 @@ import ( "fmt" "reflect" "sort" - "sync" "testing" "time" @@ -28,7 +27,6 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/clock" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/kubernetes/pkg/features" statsapi "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1" @@ -1948,34 +1946,3 @@ func (s1 thresholdList) Equal(s2 thresholdList) bool { } return true } - -func TestThresholdStopCh(t *testing.T) { - var wg sync.WaitGroup - fakeClock := clock.NewFakeClock(time.Now()) - stop := NewInitialStopCh(fakeClock) - - // Should be able to reset the InitialStopCh right away - if !stop.Reset() { - t.Errorf("Expected to be able to close the initialStopCh, but was unsuccessful") - } - - // Need to wait notifierRefreshInterval before closing - if stop.Reset() { - t.Errorf("Expected not to be able to close the initialStopCh, but was successful") - } - - wg.Add(1) - ch := stop.Ch() - go func() { - defer wg.Done() - // wait for the channel to close - <-ch - }() - - fakeClock.Step(2 * notifierRefreshInterval) - if !stop.Reset() { - t.Errorf("Expected to be able to close the initialStopCh, but was unsuccessful") - } - // ensure the Reset() closed the channel - wg.Wait() -} diff --git a/pkg/kubelet/eviction/memory_threshold_notifier.go b/pkg/kubelet/eviction/memory_threshold_notifier.go new file mode 100644 index 00000000000..8d86944f39d --- /dev/null +++ b/pkg/kubelet/eviction/memory_threshold_notifier.go @@ -0,0 +1,135 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eviction + +import ( + "fmt" + "time" + + "github.com/golang/glog" + + "k8s.io/apimachinery/pkg/api/resource" + statsapi "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1" + "k8s.io/kubernetes/pkg/kubelet/cm" + evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api" +) + +const ( + memoryUsageAttribute = "memory.usage_in_bytes" + // this prevents constantly updating the memcg notifier if synchronize + // is run frequently. + notifierRefreshInterval = 10 * time.Second +) + +type memoryThresholdNotifier struct { + threshold evictionapi.Threshold + cgroupPath string + events chan struct{} + factory NotifierFactory + handler func(string) + notifier CgroupNotifier +} + +var _ ThresholdNotifier = &memoryThresholdNotifier{} + +// NewMemoryThresholdNotifier creates a ThresholdNotifier which is designed to respond to the given threshold. +// UpdateThreshold must be called once before the threshold will be active. +func NewMemoryThresholdNotifier(threshold evictionapi.Threshold, cgroupRoot string, factory NotifierFactory, handler func(string)) (ThresholdNotifier, error) { + cgroups, err := cm.GetCgroupSubsystems() + if err != nil { + return nil, err + } + cgpath, found := cgroups.MountPoints["memory"] + if !found || len(cgpath) == 0 { + return nil, fmt.Errorf("memory cgroup mount point not found") + } + if isAllocatableEvictionThreshold(threshold) { + // for allocatable thresholds, point the cgroup notifier at the allocatable cgroup + cgpath += cgroupRoot + } + return &memoryThresholdNotifier{ + threshold: threshold, + cgroupPath: cgpath, + events: make(chan struct{}), + handler: handler, + factory: factory, + }, nil +} + +func (m *memoryThresholdNotifier) Start() { + glog.Infof("eviction manager: created %s", m.Description()) + for range m.events { + m.handler(fmt.Sprintf("eviction manager: %s crossed", m.Description())) + } +} + +func (m *memoryThresholdNotifier) UpdateThreshold(summary *statsapi.Summary) error { + memoryStats := summary.Node.Memory + if isAllocatableEvictionThreshold(m.threshold) { + allocatableContainer, err := getSysContainer(summary.Node.SystemContainers, statsapi.SystemContainerPods) + if err != nil { + return err + } + memoryStats = allocatableContainer.Memory + } + if memoryStats == nil || memoryStats.UsageBytes == nil || memoryStats.WorkingSetBytes == nil || memoryStats.AvailableBytes == nil { + return fmt.Errorf("summary was incomplete. Expected MemoryStats and all subfields to be non-nil, but got %+v", memoryStats) + } + // Set threshold on usage to capacity - eviction_hard + inactive_file, + // since we want to be notified when working_set = capacity - eviction_hard + inactiveFile := resource.NewQuantity(int64(*memoryStats.UsageBytes-*memoryStats.WorkingSetBytes), resource.BinarySI) + capacity := resource.NewQuantity(int64(*memoryStats.AvailableBytes+*memoryStats.WorkingSetBytes), resource.BinarySI) + evictionThresholdQuantity := evictionapi.GetThresholdQuantity(m.threshold.Value, capacity) + memcgThreshold := capacity.DeepCopy() + memcgThreshold.Sub(*evictionThresholdQuantity) + memcgThreshold.Add(*inactiveFile) + + glog.V(3).Infof("eviction manager: setting %s to %s\n", m.Description(), memcgThreshold.String()) + if m.notifier != nil { + m.notifier.Stop() + } + newNotifier, err := m.factory.NewCgroupNotifier(m.cgroupPath, memoryUsageAttribute, memcgThreshold.Value()) + if err != nil { + return err + } + m.notifier = newNotifier + go m.notifier.Start(m.events) + return nil +} + +func (m *memoryThresholdNotifier) Description() string { + var hard, allocatable string + if isHardEvictionThreshold(m.threshold) { + hard = "hard " + } else { + hard = "soft " + } + if isAllocatableEvictionThreshold(m.threshold) { + allocatable = "allocatable " + } + return fmt.Sprintf("%s%smemory eviction threshold", hard, allocatable) +} + +var _ NotifierFactory = &CgroupNotifierFactory{} + +// CgroupNotifierFactory knows how to make CgroupNotifiers which integrate with the kernel +type CgroupNotifierFactory struct{} + +// NewCgroupNotifier implements the NotifierFactory interface +func (n *CgroupNotifierFactory) NewCgroupNotifier(path, attribute string, threshold int64) (CgroupNotifier, error) { + return NewCgroupNotifier(path, attribute, threshold) +} diff --git a/pkg/kubelet/eviction/memory_threshold_notifier_test.go b/pkg/kubelet/eviction/memory_threshold_notifier_test.go new file mode 100644 index 00000000000..8317e051fe6 --- /dev/null +++ b/pkg/kubelet/eviction/memory_threshold_notifier_test.go @@ -0,0 +1,270 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eviction + +import ( + "fmt" + "strings" + "sync" + "testing" + "time" + + "k8s.io/apimachinery/pkg/api/resource" + statsapi "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1" + evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api" +) + +const testCgroupPath = "/sys/fs/cgroups/memory" + +func nodeSummary(available, workingSet, usage resource.Quantity, allocatable bool) *statsapi.Summary { + availableBytes := uint64(available.Value()) + workingSetBytes := uint64(workingSet.Value()) + usageBytes := uint64(usage.Value()) + memoryStats := statsapi.MemoryStats{ + AvailableBytes: &availableBytes, + WorkingSetBytes: &workingSetBytes, + UsageBytes: &usageBytes, + } + if allocatable { + return &statsapi.Summary{ + Node: statsapi.NodeStats{ + SystemContainers: []statsapi.ContainerStats{ + { + Name: statsapi.SystemContainerPods, + Memory: &memoryStats, + }, + }, + }, + } + } + return &statsapi.Summary{ + Node: statsapi.NodeStats{ + Memory: &memoryStats, + }, + } +} + +func newTestMemoryThresholdNotifier(threshold evictionapi.Threshold, factory NotifierFactory, handler func(string)) *memoryThresholdNotifier { + return &memoryThresholdNotifier{ + threshold: threshold, + cgroupPath: testCgroupPath, + events: make(chan struct{}), + factory: factory, + handler: handler, + } +} + +func TestUpdateThreshold(t *testing.T) { + testCases := []struct { + description string + available resource.Quantity + workingSet resource.Quantity + usage resource.Quantity + evictionThreshold evictionapi.Threshold + expectedThreshold resource.Quantity + updateThresholdErr error + expectErr bool + }{ + { + description: "node level threshold", + available: resource.MustParse("3Gi"), + usage: resource.MustParse("2Gi"), + workingSet: resource.MustParse("1Gi"), + evictionThreshold: evictionapi.Threshold{ + Signal: evictionapi.SignalMemoryAvailable, + Operator: evictionapi.OpLessThan, + Value: evictionapi.ThresholdValue{ + Quantity: quantityMustParse("1Gi"), + }, + }, + expectedThreshold: resource.MustParse("4Gi"), + updateThresholdErr: nil, + expectErr: false, + }, + { + description: "allocatable threshold", + available: resource.MustParse("4Gi"), + usage: resource.MustParse("3Gi"), + workingSet: resource.MustParse("1Gi"), + evictionThreshold: evictionapi.Threshold{ + Signal: evictionapi.SignalAllocatableMemoryAvailable, + Operator: evictionapi.OpLessThan, + Value: evictionapi.ThresholdValue{ + Quantity: quantityMustParse("1Gi"), + }, + }, + expectedThreshold: resource.MustParse("6Gi"), + updateThresholdErr: nil, + expectErr: false, + }, + { + description: "error updating node level threshold", + available: resource.MustParse("3Gi"), + usage: resource.MustParse("2Gi"), + workingSet: resource.MustParse("1Gi"), + evictionThreshold: evictionapi.Threshold{ + Signal: evictionapi.SignalMemoryAvailable, + Operator: evictionapi.OpLessThan, + Value: evictionapi.ThresholdValue{ + Quantity: quantityMustParse("1Gi"), + }, + }, + expectedThreshold: resource.MustParse("4Gi"), + updateThresholdErr: fmt.Errorf("unexpected error"), + expectErr: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.description, func(t *testing.T) { + notifierFactory := &MockNotifierFactory{} + notifier := &MockCgroupNotifier{} + m := newTestMemoryThresholdNotifier(tc.evictionThreshold, notifierFactory, nil) + notifierFactory.On("NewCgroupNotifier", testCgroupPath, memoryUsageAttribute, tc.expectedThreshold.Value()).Return(notifier, tc.updateThresholdErr) + var events chan<- struct{} + events = m.events + notifier.On("Start", events).Return() + err := m.UpdateThreshold(nodeSummary(tc.available, tc.workingSet, tc.usage, isAllocatableEvictionThreshold(tc.evictionThreshold))) + if err != nil && !tc.expectErr { + t.Errorf("Unexpected error updating threshold: %v", err) + } else if err == nil && tc.expectErr { + t.Errorf("Expected error updating threshold, but got nil") + } + if !tc.expectErr { + notifierFactory.AssertNumberOfCalls(t, "NewCgroupNotifier", 1) + } + }) + } +} + +func TestStart(t *testing.T) { + noResources := resource.MustParse("0") + threshold := evictionapi.Threshold{ + Signal: evictionapi.SignalMemoryAvailable, + Operator: evictionapi.OpLessThan, + Value: evictionapi.ThresholdValue{ + Quantity: &noResources, + }, + } + notifier := &MockCgroupNotifier{} + notifierFactory := &MockNotifierFactory{} + + var wg sync.WaitGroup + wg.Add(4) + m := newTestMemoryThresholdNotifier(threshold, notifierFactory, func(string) { + wg.Done() + }) + notifierFactory.On("NewCgroupNotifier", testCgroupPath, memoryUsageAttribute, int64(0)).Return(notifier, nil) + var events chan<- struct{} + events = m.events + notifier.On("Start", events).Return() + notifier.On("Stop").Return() + + err := m.UpdateThreshold(nodeSummary(noResources, noResources, noResources, isAllocatableEvictionThreshold(threshold))) + if err != nil { + t.Errorf("Unexpected error updating threshold: %v", err) + } + notifierFactory.AssertNumberOfCalls(t, "NewCgroupNotifier", 1) + + go m.Start() + + for i := 0; i < 4; i++ { + m.events <- struct{}{} + } + wg.Wait() +} + +func TestThresholdDescription(t *testing.T) { + testCases := []struct { + description string + evictionThreshold evictionapi.Threshold + expectedSubstrings []string + omittedSubstrings []string + }{ + { + description: "hard node level threshold", + evictionThreshold: evictionapi.Threshold{ + Signal: evictionapi.SignalMemoryAvailable, + Operator: evictionapi.OpLessThan, + Value: evictionapi.ThresholdValue{ + Quantity: quantityMustParse("2Gi"), + }, + }, + expectedSubstrings: []string{"hard"}, + omittedSubstrings: []string{"allocatable", "soft"}, + }, + { + description: "soft node level threshold", + evictionThreshold: evictionapi.Threshold{ + Signal: evictionapi.SignalMemoryAvailable, + Operator: evictionapi.OpLessThan, + Value: evictionapi.ThresholdValue{ + Quantity: quantityMustParse("2Gi"), + }, + GracePeriod: time.Minute * 2, + }, + expectedSubstrings: []string{"soft"}, + omittedSubstrings: []string{"allocatable", "hard"}, + }, + { + description: "hard allocatable threshold", + evictionThreshold: evictionapi.Threshold{ + Signal: evictionapi.SignalAllocatableMemoryAvailable, + Operator: evictionapi.OpLessThan, + Value: evictionapi.ThresholdValue{ + Quantity: quantityMustParse("2Gi"), + }, + GracePeriod: time.Minute * 2, + }, + expectedSubstrings: []string{"soft", "allocatable"}, + omittedSubstrings: []string{"hard"}, + }, + { + description: "soft allocatable threshold", + evictionThreshold: evictionapi.Threshold{ + Signal: evictionapi.SignalAllocatableMemoryAvailable, + Operator: evictionapi.OpLessThan, + Value: evictionapi.ThresholdValue{ + Quantity: quantityMustParse("2Gi"), + }, + }, + expectedSubstrings: []string{"hard", "allocatable"}, + omittedSubstrings: []string{"soft"}, + }, + } + + for _, tc := range testCases { + t.Run(tc.description, func(t *testing.T) { + m := &memoryThresholdNotifier{ + notifier: &MockCgroupNotifier{}, + threshold: tc.evictionThreshold, + cgroupPath: testCgroupPath, + } + desc := m.Description() + for _, expected := range tc.expectedSubstrings { + if !strings.Contains(desc, expected) { + t.Errorf("expected description for notifier with threshold %+v to contain %s, but it did not", tc.evictionThreshold, expected) + } + } + for _, omitted := range tc.omittedSubstrings { + if strings.Contains(desc, omitted) { + t.Errorf("expected description for notifier with threshold %+v NOT to contain %s, but it did", tc.evictionThreshold, omitted) + } + } + }) + } +} diff --git a/pkg/kubelet/eviction/mock_threshold_notifier_test.go b/pkg/kubelet/eviction/mock_threshold_notifier_test.go new file mode 100644 index 00000000000..c94e6fe5f32 --- /dev/null +++ b/pkg/kubelet/eviction/mock_threshold_notifier_test.go @@ -0,0 +1,98 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eviction + +import ( + mock "github.com/stretchr/testify/mock" + statsapi "k8s.io/kubernetes/pkg/kubelet/apis/stats/v1alpha1" +) + +// MockCgroupNotifier is a mock implementation of the CgroupNotifier interface +type MockCgroupNotifier struct { + mock.Mock +} + +// Start implements the NotifierFactory interface +func (m *MockCgroupNotifier) Start(a0 chan<- struct{}) { + m.Called(a0) +} + +// Stop implements the NotifierFactory interface +func (m *MockCgroupNotifier) Stop() { + m.Called() +} + +// MockNotifierFactory is a mock of the NotifierFactory interface +type MockNotifierFactory struct { + mock.Mock +} + +// NewCgroupNotifier implements the NotifierFactory interface +func (m *MockNotifierFactory) NewCgroupNotifier(a0, a1 string, a2 int64) (CgroupNotifier, error) { + ret := m.Called(a0, a1, a2) + + var r0 CgroupNotifier + if rf, ok := ret.Get(0).(func(string, string, int64) CgroupNotifier); ok { + r0 = rf(a0, a1, a2) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(CgroupNotifier) + } + } + var r1 error + if rf, ok := ret.Get(1).(func(string, string, int64) error); ok { + r1 = rf(a0, a1, a2) + } else { + r1 = ret.Error(1) + } + return r0, r1 +} + +// MockThresholdNotifier is a mock implementation of the ThresholdNotifier interface +type MockThresholdNotifier struct { + mock.Mock +} + +// Start implements the ThresholdNotifier interface +func (m *MockThresholdNotifier) Start() { + m.Called() +} + +// UpdateThreshold implements the ThresholdNotifier interface +func (m *MockThresholdNotifier) UpdateThreshold(a0 *statsapi.Summary) error { + ret := m.Called(a0) + + var r0 error + if rf, ok := ret.Get(0).(func(*statsapi.Summary) error); ok { + r0 = rf(a0) + } else { + r0 = ret.Error(0) + } + return r0 +} + +// Description implements the ThresholdNotifier interface +func (m *MockThresholdNotifier) Description() string { + ret := m.Called() + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.String(0) + } + return r0 +} diff --git a/pkg/kubelet/eviction/threshold_notifier_linux.go b/pkg/kubelet/eviction/threshold_notifier_linux.go index 89d7c79c8d7..4fabd7345a5 100644 --- a/pkg/kubelet/eviction/threshold_notifier_linux.go +++ b/pkg/kubelet/eviction/threshold_notifier_linux.go @@ -18,43 +18,47 @@ package eviction import ( "fmt" + "sync" + "time" "github.com/golang/glog" "golang.org/x/sys/unix" ) -type memcgThresholdNotifier struct { - watchfd int - controlfd int - eventfd int - handler thresholdNotifierHandlerFunc - description string +const ( + // eventSize is the number of bytes returned by a successful read from an eventfd + // see http://man7.org/linux/man-pages/man2/eventfd.2.html for more information + eventSize = 8 + // numFdEvents is the number of events we can record at once. + // If EpollWait finds more than this, they will be missed. + numFdEvents = 6 +) + +type linuxCgroupNotifier struct { + eventfd int + epfd int + stop chan struct{} + stopLock sync.Mutex } -var _ ThresholdNotifier = &memcgThresholdNotifier{} +var _ CgroupNotifier = &linuxCgroupNotifier{} -// NewMemCGThresholdNotifier sends notifications when a cgroup threshold -// is crossed (in either direction) for a given cgroup attribute -func NewMemCGThresholdNotifier(path, attribute, threshold, description string, handler thresholdNotifierHandlerFunc) (ThresholdNotifier, error) { - watchfd, err := unix.Open(fmt.Sprintf("%s/%s", path, attribute), unix.O_RDONLY, 0) +// NewCgroupNotifier returns a linuxCgroupNotifier, which performs cgroup control operations required +// to receive notifications from the cgroup when the threshold is crossed in either direction. +func NewCgroupNotifier(path, attribute string, threshold int64) (CgroupNotifier, error) { + var watchfd, eventfd, epfd, controlfd int + var err error + watchfd, err = unix.Open(fmt.Sprintf("%s/%s", path, attribute), unix.O_RDONLY, 0) if err != nil { return nil, err } - defer func() { - if err != nil { - unix.Close(watchfd) - } - }() - controlfd, err := unix.Open(fmt.Sprintf("%s/cgroup.event_control", path), unix.O_WRONLY, 0) + defer unix.Close(watchfd) + controlfd, err = unix.Open(fmt.Sprintf("%s/cgroup.event_control", path), unix.O_WRONLY, 0) if err != nil { return nil, err } - defer func() { - if err != nil { - unix.Close(controlfd) - } - }() - eventfd, err := unix.Eventfd(0, unix.EFD_CLOEXEC) + defer unix.Close(controlfd) + eventfd, err = unix.Eventfd(0, unix.EFD_CLOEXEC) if err != nil { return nil, err } @@ -63,55 +67,119 @@ func NewMemCGThresholdNotifier(path, attribute, threshold, description string, h return nil, err } defer func() { + // Close eventfd if we get an error later in initialization if err != nil { unix.Close(eventfd) } }() - glog.V(3).Infof("eviction: setting notification threshold to %s", threshold) - config := fmt.Sprintf("%d %d %s", eventfd, watchfd, threshold) + epfd, err = unix.EpollCreate1(0) + if err != nil { + return nil, err + } + if epfd < 0 { + err = fmt.Errorf("EpollCreate1 call failed") + return nil, err + } + defer func() { + // Close epfd if we get an error later in initialization + if err != nil { + unix.Close(epfd) + } + }() + config := fmt.Sprintf("%d %d %d", eventfd, watchfd, threshold) _, err = unix.Write(controlfd, []byte(config)) if err != nil { return nil, err } - return &memcgThresholdNotifier{ - watchfd: watchfd, - controlfd: controlfd, - eventfd: eventfd, - handler: handler, - description: description, + return &linuxCgroupNotifier{ + eventfd: eventfd, + epfd: epfd, + stop: make(chan struct{}), }, nil } -func getThresholdEvents(eventfd int, eventCh chan<- struct{}, stop ThresholdStopCh) { +func (n *linuxCgroupNotifier) Start(eventCh chan<- struct{}) { + err := unix.EpollCtl(n.epfd, unix.EPOLL_CTL_ADD, n.eventfd, &unix.EpollEvent{ + Fd: int32(n.eventfd), + Events: unix.EPOLLIN, + }) + if err != nil { + glog.Warningf("eviction manager: error adding epoll eventfd: %v", err) + return + } for { - buf := make([]byte, 8) - _, err := unix.Read(eventfd, buf) + select { + case <-n.stop: + return + default: + } + event, err := wait(n.epfd, n.eventfd, notifierRefreshInterval) if err != nil { + glog.Warningf("eviction manager: error while waiting for memcg events: %v", err) + return + } else if !event { + // Timeout on wait. This is expected if the threshold was not crossed + continue + } + // Consume the event from the eventfd + buf := make([]byte, eventSize) + _, err = unix.Read(n.eventfd, buf) + if err != nil { + glog.Warningf("eviction manager: error reading memcg events: %v", err) return } - - select { - case eventCh <- struct{}{}: - case <-stop.Ch(): - return - } + eventCh <- struct{}{} } } -func (n *memcgThresholdNotifier) Start(stop ThresholdStopCh) { - eventCh := make(chan struct{}) - go getThresholdEvents(n.eventfd, eventCh, stop) - for { - select { - case <-stop.Ch(): - glog.V(3).Infof("eviction: stopping threshold notifier") - unix.Close(n.watchfd) - unix.Close(n.controlfd) - unix.Close(n.eventfd) - return - case <-eventCh: - glog.V(2).Infof("eviction: threshold crossed") - n.handler(n.description) +// wait waits up to notifierRefreshInterval for an event on the Epoll FD for the +// eventfd we are concerned about. It returns an error if one occurrs, and true +// if the consumer should read from the eventfd. +func wait(epfd, eventfd int, timeout time.Duration) (bool, error) { + events := make([]unix.EpollEvent, numFdEvents+1) + timeoutMS := int(timeout / time.Millisecond) + n, err := unix.EpollWait(epfd, events, timeoutMS) + if n == -1 { + if err == unix.EINTR { + // Interrupt, ignore the error + return false, nil + } + return false, err + } + if n == 0 { + // Timeout + return false, nil + } + if n > numFdEvents { + return false, fmt.Errorf("epoll_wait returned more events than we know what to do with") + } + for _, event := range events[:n] { + if event.Fd == int32(eventfd) { + if event.Events&unix.EPOLLHUP != 0 || event.Events&unix.EPOLLERR != 0 || event.Events&unix.EPOLLIN != 0 { + // EPOLLHUP: should not happen, but if it does, treat it as a wakeup. + + // EPOLLERR: If an error is waiting on the file descriptor, we should pretend + // something is ready to read, and let unix.Read pick up the error. + + // EPOLLIN: There is data to read. + return true, nil + } } } + // An event occurred that we don't care about. + return false, nil +} + +func (n *linuxCgroupNotifier) Stop() { + n.stopLock.Lock() + defer n.stopLock.Unlock() + select { + case <-n.stop: + // the linuxCgroupNotifier is already stopped + return + default: + } + unix.Close(n.eventfd) + unix.Close(n.epfd) + close(n.stop) } diff --git a/pkg/kubelet/eviction/threshold_notifier_unsupported.go b/pkg/kubelet/eviction/threshold_notifier_unsupported.go index 77a9b26cfc2..7078c7865a9 100644 --- a/pkg/kubelet/eviction/threshold_notifier_unsupported.go +++ b/pkg/kubelet/eviction/threshold_notifier_unsupported.go @@ -18,10 +18,16 @@ limitations under the License. package eviction -import "fmt" +import "github.com/golang/glog" -// NewMemCGThresholdNotifier sends notifications when a cgroup threshold -// is crossed (in either direction) for a given cgroup attribute -func NewMemCGThresholdNotifier(path, attribute, threshold, description string, handler thresholdNotifierHandlerFunc) (ThresholdNotifier, error) { - return nil, fmt.Errorf("threshold notification not supported") +// NewCgroupNotifier creates a cgroup notifier that does nothing because cgroups do not exist on non-linux systems. +func NewCgroupNotifier(path, attribute string, threshold int64) (CgroupNotifier, error) { + glog.V(5).Infof("cgroup notifications not supported") + return &unsupportedThresholdNotifier{}, nil } + +type unsupportedThresholdNotifier struct{} + +func (*unsupportedThresholdNotifier) Start(_ chan<- struct{}) {} + +func (*unsupportedThresholdNotifier) Stop() {} diff --git a/pkg/kubelet/eviction/types.go b/pkg/kubelet/eviction/types.go index 26e0432fd9e..d78e7e0695b 100644 --- a/pkg/kubelet/eviction/types.go +++ b/pkg/kubelet/eviction/types.go @@ -131,20 +131,30 @@ type nodeReclaimFunc func() error // nodeReclaimFuncs is an ordered list of nodeReclaimFunc type nodeReclaimFuncs []nodeReclaimFunc -// thresholdNotifierHandlerFunc is a function that takes action in response to a crossed threshold -type thresholdNotifierHandlerFunc func(thresholdDescription string) - -// ThresholdStopCh is an interface for a channel which is closed to stop waiting goroutines. -// Implementations of ThresholdStopCh must correctly handle concurrent calls to all functions. -type ThresholdStopCh interface { - // Reset closes the channel if it can be closed, and returns true if it was closed. - // Reset also creates a new channel. - Reset() bool - // Ch returns the channel that is closed when Reset() is called - Ch() <-chan struct{} +// CgroupNotifier generates events from cgroup events +type CgroupNotifier interface { + // Start causes the CgroupNotifier to begin notifying on the eventCh + Start(eventCh chan<- struct{}) + // Stop stops all processes and cleans up file descriptors associated with the CgroupNotifier + Stop() } -// ThresholdNotifier notifies the user when an attribute crosses a threshold value +// NotifierFactory creates CgroupNotifer +type NotifierFactory interface { + // NewCgroupNotifier creates a CgroupNotifier that creates events when the threshold + // on the attribute in the cgroup specified by the path is crossed. + NewCgroupNotifier(path, attribute string, threshold int64) (CgroupNotifier, error) +} + +// ThresholdNotifier manages CgroupNotifiers based on memory eviction thresholds, and performs a function +// when memory eviction thresholds are crossed type ThresholdNotifier interface { - Start(ThresholdStopCh) + // Start calls the notifier function when the CgroupNotifier notifies the ThresholdNotifier that an event occurred + Start() + // UpdateThreshold updates the memory cgroup threshold based on the metrics provided. + // Calling UpdateThreshold with recent metrics allows the ThresholdNotifier to trigger at the + // eviction threshold more accurately + UpdateThreshold(summary *statsapi.Summary) error + // Description produces a relevant string describing the Memory Threshold Notifier + Description() string }