From bb1920edcc472f9eaf828bee5593c702a58a1446 Mon Sep 17 00:00:00 2001
From: Jing Xu
Date: Fri, 7 Jul 2017 13:40:13 -0700
Subject: [PATCH] Fix issues for local storage allocatable feature

This PR fixes the following issues:
1. Use ResourceStorageScratch instead of the ResourceStorage API to
   represent local storage capacity.
2. In the eviction manager, use the container manager instead of the node
   provider (kubelet) to retrieve node capacity and reserved resources.
   The node provider (kubelet) is behind a feature gate, so storage scratch
   information may not be exposed if the feature gate is not set. The
   container manager, on the other hand, has all of the capacity and
   allocatable resource information.
---
 cmd/kubelet/app/server.go                     |   7 +-
 pkg/kubelet/cm/node_container_manager.go      |   4 +
 pkg/kubelet/eviction/eviction_manager.go      |  12 +-
 pkg/kubelet/eviction/eviction_manager_test.go | 119 +++++++++---------
 pkg/kubelet/eviction/helpers.go               |  35 ++++--
 pkg/kubelet/eviction/helpers_test.go          |   4 +-
 pkg/kubelet/eviction/types.go                 |  12 +-
 pkg/kubelet/kubelet.go                        |   2 +-
 .../algorithm/predicates/predicates_test.go   |  24 ++--
 .../pkg/scheduler/schedulercache/node_info.go |   4 +-
 ...local_storage_allocatable_eviction_test.go |   3 +-
 11 files changed, 125 insertions(+), 101 deletions(-)

diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go
index 4a5ccf8141d..2ff84421521 100644
--- a/cmd/kubelet/app/server.go
+++ b/cmd/kubelet/app/server.go
@@ -942,7 +942,12 @@ func parseResourceList(m componentconfig.ConfigurationMap) (v1.ResourceList, err
 			if q.Sign() == -1 {
 				return nil, fmt.Errorf("resource quantity for %q cannot be negative: %v", k, v)
 			}
-			rl[v1.ResourceName(k)] = q
+			// storage specified in configuration map is mapped to ResourceStorageScratch API
+			if v1.ResourceName(k) == v1.ResourceStorage {
+				rl[v1.ResourceStorageScratch] = q
+			} else {
+				rl[v1.ResourceName(k)] = q
+			}
 		default:
 			return nil, fmt.Errorf("cannot reserve %q resource", k)
 		}
diff --git a/pkg/kubelet/cm/node_container_manager.go b/pkg/kubelet/cm/node_container_manager.go
index bb6359f2d4e..d0da78cc4a4 100644
--- a/pkg/kubelet/cm/node_container_manager.go
+++ b/pkg/kubelet/cm/node_container_manager.go
@@ -217,6 +217,10 @@ func hardEvictionReservation(thresholds []evictionapi.Threshold, capacity v1.Res
 			memoryCapacity := capacity[v1.ResourceMemory]
 			value := evictionapi.GetThresholdQuantity(threshold.Value, &memoryCapacity)
 			ret[v1.ResourceMemory] = *value
+		case evictionapi.SignalNodeFsAvailable:
+			storageCapacity := capacity[v1.ResourceStorageScratch]
+			value := evictionapi.GetThresholdQuantity(threshold.Value, &storageCapacity)
+			ret[v1.ResourceStorageScratch] = *value
 		}
 	}
 	return ret
diff --git a/pkg/kubelet/eviction/eviction_manager.go b/pkg/kubelet/eviction/eviction_manager.go
index 2f12390da06..0367f8ffac7 100644
--- a/pkg/kubelet/eviction/eviction_manager.go
+++ b/pkg/kubelet/eviction/eviction_manager.go
@@ -148,11 +148,11 @@ func (m *managerImpl) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAd
 }

 // Start starts the control loop to observe and response to low compute resources.
-func (m *managerImpl) Start(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc, podCleanedUpFunc PodCleanedUpFunc, nodeProvider NodeProvider, monitoringInterval time.Duration) { +func (m *managerImpl) Start(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc, podCleanedUpFunc PodCleanedUpFunc, capacityProvider CapacityProvider, monitoringInterval time.Duration) { // start the eviction manager monitoring go func() { for { - if evictedPods := m.synchronize(diskInfoProvider, podFunc, nodeProvider); evictedPods != nil { + if evictedPods := m.synchronize(diskInfoProvider, podFunc, capacityProvider); evictedPods != nil { glog.Infof("eviction manager: pods %s evicted, waiting for pod to be cleaned up", format.Pods(evictedPods)) m.waitForPodsCleanup(podCleanedUpFunc, evictedPods) } else { @@ -211,7 +211,7 @@ func startMemoryThresholdNotifier(thresholds []evictionapi.Threshold, observatio // synchronize is the main control loop that enforces eviction thresholds. // Returns the pod that was killed, or nil if no pod was killed. -func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc, nodeProvider NodeProvider) []*v1.Pod { +func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc, capacityProvider CapacityProvider) []*v1.Pod { // if we have nothing to do, just return thresholds := m.config.Thresholds if len(thresholds) == 0 { @@ -233,7 +233,7 @@ func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc Act activePods := podFunc() // make observations and get a function to derive pod usage stats relative to those observations. - observations, statsFunc, err := makeSignalObservations(m.summaryProvider, nodeProvider, activePods, *m.dedicatedImageFs) + observations, statsFunc, err := makeSignalObservations(m.summaryProvider, capacityProvider, activePods, *m.dedicatedImageFs) if err != nil { glog.Errorf("eviction manager: unexpected err: %v", err) return nil @@ -248,7 +248,7 @@ func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc Act err = startMemoryThresholdNotifier(m.config.Thresholds, observations, false, func(desc string) { glog.Infof("soft memory eviction threshold crossed at %s", desc) // TODO wait grace period for soft memory limit - m.synchronize(diskInfoProvider, podFunc, nodeProvider) + m.synchronize(diskInfoProvider, podFunc, capacityProvider) }) if err != nil { glog.Warningf("eviction manager: failed to create hard memory threshold notifier: %v", err) @@ -256,7 +256,7 @@ func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc Act // start hard memory notification err = startMemoryThresholdNotifier(m.config.Thresholds, observations, true, func(desc string) { glog.Infof("hard memory eviction threshold crossed at %s", desc) - m.synchronize(diskInfoProvider, podFunc, nodeProvider) + m.synchronize(diskInfoProvider, podFunc, capacityProvider) }) if err != nil { glog.Warningf("eviction manager: failed to create soft memory threshold notifier: %v", err) diff --git a/pkg/kubelet/eviction/eviction_manager_test.go b/pkg/kubelet/eviction/eviction_manager_test.go index 1db96af8a08..7d4eccc28b3 100644 --- a/pkg/kubelet/eviction/eviction_manager_test.go +++ b/pkg/kubelet/eviction/eviction_manager_test.go @@ -59,22 +59,25 @@ func (m *mockDiskInfoProvider) HasDedicatedImageFs() (bool, error) { return m.dedicatedImageFs, nil } -func newMockNodeProvider(allocatableCapacity v1.ResourceList) *mockNodeProvider { - return &mockNodeProvider{ - node: v1.Node{ - Status: 
v1.NodeStatus{ - Allocatable: allocatableCapacity, - }, - }, +func newMockCapacityProvider(capacity, reservation v1.ResourceList) *mockCapacityProvider { + return &mockCapacityProvider{ + capacity: capacity, + reservation: reservation, } } -type mockNodeProvider struct { - node v1.Node +type mockCapacityProvider struct { + capacity v1.ResourceList + reservation v1.ResourceList } -func (m *mockNodeProvider) GetNode() (*v1.Node, error) { - return &m.node, nil +func (m *mockCapacityProvider) GetCapacity() v1.ResourceList { + return m.capacity + +} + +func (m *mockCapacityProvider) GetNodeAllocatableReservation() v1.ResourceList { + return m.reservation } // mockDiskGC is used to simulate invoking image and container garbage collection. @@ -200,7 +203,7 @@ func TestMemoryPressure(t *testing.T) { fakeClock := clock.NewFakeClock(time.Now()) podKiller := &mockPodKiller{} diskInfoProvider := &mockDiskInfoProvider{dedicatedImageFs: false} - nodeProvider := newMockNodeProvider(v1.ResourceList{v1.ResourceMemory: *quantityMustParse("2Gi")}) + capacityProvider := newMockCapacityProvider(v1.ResourceList{v1.ResourceMemory: *quantityMustParse("3Gi")}, v1.ResourceList{v1.ResourceMemory: *quantityMustParse("1Gi")}) imageGC := &mockDiskGC{imageBytesFreed: int64(0), err: nil} nodeRef := &clientv1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""} @@ -243,7 +246,7 @@ func TestMemoryPressure(t *testing.T) { burstablePodToAdmit, _ := podMaker("burst-admit", newResourceList("100m", "100Mi"), newResourceList("200m", "200Mi"), "0Gi") // synchronize - manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider) + manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider) // we should not have memory pressure if manager.IsUnderMemoryPressure() { @@ -261,7 +264,7 @@ func TestMemoryPressure(t *testing.T) { // induce soft threshold fakeClock.Step(1 * time.Minute) summaryProvider.result = summaryStatsMaker("1500Mi", podStats) - manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider) + manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider) // we should have memory pressure if !manager.IsUnderMemoryPressure() { @@ -276,7 +279,7 @@ func TestMemoryPressure(t *testing.T) { // step forward in time pass the grace period fakeClock.Step(3 * time.Minute) summaryProvider.result = summaryStatsMaker("1500Mi", podStats) - manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider) + manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider) // we should have memory pressure if !manager.IsUnderMemoryPressure() { @@ -301,7 +304,7 @@ func TestMemoryPressure(t *testing.T) { // remove memory pressure fakeClock.Step(20 * time.Minute) summaryProvider.result = summaryStatsMaker("3Gi", podStats) - manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider) + manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider) // we should not have memory pressure if manager.IsUnderMemoryPressure() { @@ -311,7 +314,7 @@ func TestMemoryPressure(t *testing.T) { // induce memory pressure! 
fakeClock.Step(1 * time.Minute) summaryProvider.result = summaryStatsMaker("500Mi", podStats) - manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider) + manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider) // we should have memory pressure if !manager.IsUnderMemoryPressure() { @@ -339,7 +342,7 @@ func TestMemoryPressure(t *testing.T) { fakeClock.Step(1 * time.Minute) summaryProvider.result = summaryStatsMaker("2Gi", podStats) podKiller.pod = nil // reset state - manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider) + manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider) // we should have memory pressure (because transition period not yet met) if !manager.IsUnderMemoryPressure() { @@ -363,7 +366,7 @@ func TestMemoryPressure(t *testing.T) { fakeClock.Step(5 * time.Minute) summaryProvider.result = summaryStatsMaker("2Gi", podStats) podKiller.pod = nil // reset state - manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider) + manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider) // we should not have memory pressure (because transition period met) if manager.IsUnderMemoryPressure() { @@ -418,7 +421,7 @@ func TestDiskPressureNodeFs(t *testing.T) { fakeClock := clock.NewFakeClock(time.Now()) podKiller := &mockPodKiller{} diskInfoProvider := &mockDiskInfoProvider{dedicatedImageFs: false} - nodeProvider := newMockNodeProvider(v1.ResourceList{v1.ResourceMemory: *quantityMustParse("2Gi")}) + capacityProvider := newMockCapacityProvider(v1.ResourceList{v1.ResourceMemory: *quantityMustParse("3Gi")}, v1.ResourceList{v1.ResourceMemory: *quantityMustParse("1Gi")}) diskGC := &mockDiskGC{imageBytesFreed: int64(0), err: nil} nodeRef := &clientv1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""} @@ -461,7 +464,7 @@ func TestDiskPressureNodeFs(t *testing.T) { podToAdmit, _ := podMaker("pod-to-admit", newResourceList("", ""), newResourceList("", ""), "0Gi", "0Gi", "0Gi") // synchronize - manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider) + manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider) // we should not have disk pressure if manager.IsUnderDiskPressure() { @@ -476,7 +479,7 @@ func TestDiskPressureNodeFs(t *testing.T) { // induce soft threshold fakeClock.Step(1 * time.Minute) summaryProvider.result = summaryStatsMaker("1.5Gi", "200Gi", podStats) - manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider) + manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider) // we should have disk pressure if !manager.IsUnderDiskPressure() { @@ -491,7 +494,7 @@ func TestDiskPressureNodeFs(t *testing.T) { // step forward in time pass the grace period fakeClock.Step(3 * time.Minute) summaryProvider.result = summaryStatsMaker("1.5Gi", "200Gi", podStats) - manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider) + manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider) // we should have disk pressure if !manager.IsUnderDiskPressure() { @@ -516,7 +519,7 @@ func TestDiskPressureNodeFs(t *testing.T) { // remove disk pressure fakeClock.Step(20 * time.Minute) summaryProvider.result = summaryStatsMaker("16Gi", "200Gi", podStats) - manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider) + manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider) // we should not have disk pressure if manager.IsUnderDiskPressure() { @@ -526,7 +529,7 @@ func TestDiskPressureNodeFs(t *testing.T) { // induce disk 
pressure! fakeClock.Step(1 * time.Minute) summaryProvider.result = summaryStatsMaker("500Mi", "200Gi", podStats) - manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider) + manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider) // we should have disk pressure if !manager.IsUnderDiskPressure() { @@ -551,7 +554,7 @@ func TestDiskPressureNodeFs(t *testing.T) { fakeClock.Step(1 * time.Minute) summaryProvider.result = summaryStatsMaker("16Gi", "200Gi", podStats) podKiller.pod = nil // reset state - manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider) + manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider) // we should have disk pressure (because transition period not yet met) if !manager.IsUnderDiskPressure() { @@ -572,7 +575,7 @@ func TestDiskPressureNodeFs(t *testing.T) { fakeClock.Step(5 * time.Minute) summaryProvider.result = summaryStatsMaker("16Gi", "200Gi", podStats) podKiller.pod = nil // reset state - manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider) + manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider) // we should not have disk pressure (because transition period met) if manager.IsUnderDiskPressure() { @@ -617,7 +620,7 @@ func TestMinReclaim(t *testing.T) { fakeClock := clock.NewFakeClock(time.Now()) podKiller := &mockPodKiller{} diskInfoProvider := &mockDiskInfoProvider{dedicatedImageFs: false} - nodeProvider := newMockNodeProvider(v1.ResourceList{v1.ResourceMemory: *quantityMustParse("2Gi")}) + capacityProvider := newMockCapacityProvider(v1.ResourceList{v1.ResourceMemory: *quantityMustParse("3Gi")}, v1.ResourceList{v1.ResourceMemory: *quantityMustParse("1Gi")}) diskGC := &mockDiskGC{imageBytesFreed: int64(0), err: nil} nodeRef := &clientv1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""} @@ -652,7 +655,7 @@ func TestMinReclaim(t *testing.T) { } // synchronize - manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider) + manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider) // we should not have memory pressure if manager.IsUnderMemoryPressure() { @@ -662,7 +665,7 @@ func TestMinReclaim(t *testing.T) { // induce memory pressure! 
fakeClock.Step(1 * time.Minute) summaryProvider.result = summaryStatsMaker("500Mi", podStats) - manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider) + manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider) // we should have memory pressure if !manager.IsUnderMemoryPressure() { @@ -682,7 +685,7 @@ func TestMinReclaim(t *testing.T) { fakeClock.Step(1 * time.Minute) summaryProvider.result = summaryStatsMaker("1.2Gi", podStats) podKiller.pod = nil // reset state - manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider) + manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider) // we should have memory pressure (because transition period not yet met) if !manager.IsUnderMemoryPressure() { @@ -702,7 +705,7 @@ func TestMinReclaim(t *testing.T) { fakeClock.Step(1 * time.Minute) summaryProvider.result = summaryStatsMaker("2Gi", podStats) podKiller.pod = nil // reset state - manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider) + manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider) // we should have memory pressure (because transition period not yet met) if !manager.IsUnderMemoryPressure() { @@ -718,7 +721,7 @@ func TestMinReclaim(t *testing.T) { fakeClock.Step(5 * time.Minute) summaryProvider.result = summaryStatsMaker("2Gi", podStats) podKiller.pod = nil // reset state - manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider) + manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider) // we should not have memory pressure (because transition period met) if manager.IsUnderMemoryPressure() { @@ -757,7 +760,7 @@ func TestNodeReclaimFuncs(t *testing.T) { fakeClock := clock.NewFakeClock(time.Now()) podKiller := &mockPodKiller{} diskInfoProvider := &mockDiskInfoProvider{dedicatedImageFs: false} - nodeProvider := newMockNodeProvider(v1.ResourceList{v1.ResourceMemory: *quantityMustParse("2Gi")}) + capacityProvider := newMockCapacityProvider(v1.ResourceList{v1.ResourceMemory: *quantityMustParse("3Gi")}, v1.ResourceList{v1.ResourceMemory: *quantityMustParse("1Gi")}) imageGcFree := resource.MustParse("700Mi") diskGC := &mockDiskGC{imageBytesFreed: imageGcFree.Value(), err: nil} nodeRef := &clientv1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""} @@ -793,7 +796,7 @@ func TestNodeReclaimFuncs(t *testing.T) { } // synchronize - manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider) + manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider) // we should not have disk pressure if manager.IsUnderDiskPressure() { @@ -803,7 +806,7 @@ func TestNodeReclaimFuncs(t *testing.T) { // induce hard threshold fakeClock.Step(1 * time.Minute) summaryProvider.result = summaryStatsMaker(".9Gi", "200Gi", podStats) - manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider) + manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider) // we should have disk pressure if !manager.IsUnderDiskPressure() { @@ -827,7 +830,7 @@ func TestNodeReclaimFuncs(t *testing.T) { // remove disk pressure fakeClock.Step(20 * time.Minute) summaryProvider.result = summaryStatsMaker("16Gi", "200Gi", podStats) - manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider) + manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider) // we should not have disk pressure if manager.IsUnderDiskPressure() { @@ -837,7 +840,7 @@ func TestNodeReclaimFuncs(t *testing.T) { // induce disk pressure! 
fakeClock.Step(1 * time.Minute) summaryProvider.result = summaryStatsMaker("400Mi", "200Gi", podStats) - manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider) + manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider) // we should have disk pressure if !manager.IsUnderDiskPressure() { @@ -864,7 +867,7 @@ func TestNodeReclaimFuncs(t *testing.T) { diskGC.imageGCInvoked = false // reset state diskGC.containerGCInvoked = false // reset state podKiller.pod = nil // reset state - manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider) + manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider) // we should have disk pressure (because transition period not yet met) if !manager.IsUnderDiskPressure() { @@ -887,7 +890,7 @@ func TestNodeReclaimFuncs(t *testing.T) { diskGC.imageGCInvoked = false // reset state diskGC.containerGCInvoked = false // reset state podKiller.pod = nil // reset state - manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider) + manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider) // we should not have disk pressure (because transition period met) if manager.IsUnderDiskPressure() { @@ -955,7 +958,7 @@ func TestInodePressureNodeFsInodes(t *testing.T) { fakeClock := clock.NewFakeClock(time.Now()) podKiller := &mockPodKiller{} diskInfoProvider := &mockDiskInfoProvider{dedicatedImageFs: false} - nodeProvider := newMockNodeProvider(v1.ResourceList{v1.ResourceMemory: *quantityMustParse("2Gi")}) + capacityProvider := newMockCapacityProvider(v1.ResourceList{v1.ResourceMemory: *quantityMustParse("3Gi")}, v1.ResourceList{v1.ResourceMemory: *quantityMustParse("1Gi")}) diskGC := &mockDiskGC{imageBytesFreed: int64(0), err: nil} nodeRef := &clientv1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""} @@ -998,7 +1001,7 @@ func TestInodePressureNodeFsInodes(t *testing.T) { podToAdmit, _ := podMaker("pod-to-admit", newResourceList("", ""), newResourceList("", ""), "0", "0", "0") // synchronize - manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider) + manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider) // we should not have disk pressure if manager.IsUnderDiskPressure() { @@ -1013,7 +1016,7 @@ func TestInodePressureNodeFsInodes(t *testing.T) { // induce soft threshold fakeClock.Step(1 * time.Minute) summaryProvider.result = summaryStatsMaker("1.5Mi", "4Mi", podStats) - manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider) + manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider) // we should have disk pressure if !manager.IsUnderDiskPressure() { @@ -1028,7 +1031,7 @@ func TestInodePressureNodeFsInodes(t *testing.T) { // step forward in time pass the grace period fakeClock.Step(3 * time.Minute) summaryProvider.result = summaryStatsMaker("1.5Mi", "4Mi", podStats) - manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider) + manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider) // we should have disk pressure if !manager.IsUnderDiskPressure() { @@ -1053,7 +1056,7 @@ func TestInodePressureNodeFsInodes(t *testing.T) { // remove inode pressure fakeClock.Step(20 * time.Minute) summaryProvider.result = summaryStatsMaker("3Mi", "4Mi", podStats) - manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider) + manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider) // we should not have disk pressure if manager.IsUnderDiskPressure() { @@ -1063,7 +1066,7 @@ func 
TestInodePressureNodeFsInodes(t *testing.T) { // induce inode pressure! fakeClock.Step(1 * time.Minute) summaryProvider.result = summaryStatsMaker("0.5Mi", "4Mi", podStats) - manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider) + manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider) // we should have disk pressure if !manager.IsUnderDiskPressure() { @@ -1088,7 +1091,7 @@ func TestInodePressureNodeFsInodes(t *testing.T) { fakeClock.Step(1 * time.Minute) summaryProvider.result = summaryStatsMaker("3Mi", "4Mi", podStats) podKiller.pod = nil // reset state - manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider) + manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider) // we should have disk pressure (because transition period not yet met) if !manager.IsUnderDiskPressure() { @@ -1109,7 +1112,7 @@ func TestInodePressureNodeFsInodes(t *testing.T) { fakeClock.Step(5 * time.Minute) summaryProvider.result = summaryStatsMaker("3Mi", "4Mi", podStats) podKiller.pod = nil // reset state - manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider) + manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider) // we should not have disk pressure (because transition period met) if manager.IsUnderDiskPressure() { @@ -1157,7 +1160,7 @@ func TestCriticalPodsAreNotEvicted(t *testing.T) { fakeClock := clock.NewFakeClock(time.Now()) podKiller := &mockPodKiller{} diskInfoProvider := &mockDiskInfoProvider{dedicatedImageFs: false} - nodeProvider := newMockNodeProvider(v1.ResourceList{v1.ResourceMemory: *quantityMustParse("2Gi")}) + capacityProvider := newMockCapacityProvider(v1.ResourceList{v1.ResourceMemory: *quantityMustParse("3Gi")}, v1.ResourceList{v1.ResourceMemory: *quantityMustParse("1Gi")}) diskGC := &mockDiskGC{imageBytesFreed: int64(0), err: nil} nodeRef := &clientv1.ObjectReference{ Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: "", @@ -1203,7 +1206,7 @@ func TestCriticalPodsAreNotEvicted(t *testing.T) { // induce soft threshold fakeClock.Step(1 * time.Minute) summaryProvider.result = summaryStatsMaker("1500Mi", podStats) - manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider) + manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider) // we should have memory pressure if !manager.IsUnderMemoryPressure() { @@ -1218,7 +1221,7 @@ func TestCriticalPodsAreNotEvicted(t *testing.T) { // step forward in time pass the grace period fakeClock.Step(3 * time.Minute) summaryProvider.result = summaryStatsMaker("1500Mi", podStats) - manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider) + manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider) // we should have memory pressure if !manager.IsUnderMemoryPressure() { @@ -1236,7 +1239,7 @@ func TestCriticalPodsAreNotEvicted(t *testing.T) { // remove memory pressure fakeClock.Step(20 * time.Minute) summaryProvider.result = summaryStatsMaker("3Gi", podStats) - manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider) + manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider) // we should not have memory pressure if manager.IsUnderMemoryPressure() { @@ -1249,7 +1252,7 @@ func TestCriticalPodsAreNotEvicted(t *testing.T) { // induce memory pressure! 
fakeClock.Step(1 * time.Minute) summaryProvider.result = summaryStatsMaker("500Mi", podStats) - manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider) + manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider) // we should have memory pressure if !manager.IsUnderMemoryPressure() { @@ -1290,7 +1293,7 @@ func TestAllocatableMemoryPressure(t *testing.T) { fakeClock := clock.NewFakeClock(time.Now()) podKiller := &mockPodKiller{} diskInfoProvider := &mockDiskInfoProvider{dedicatedImageFs: false} - nodeProvider := newMockNodeProvider(v1.ResourceList{v1.ResourceMemory: *quantityMustParse("2Gi")}) + capacityProvider := newMockCapacityProvider(v1.ResourceList{v1.ResourceMemory: *quantityMustParse("3Gi")}, v1.ResourceList{v1.ResourceMemory: *quantityMustParse("1Gi")}) diskGC := &mockDiskGC{imageBytesFreed: int64(0), err: nil} nodeRef := &clientv1.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""} @@ -1326,7 +1329,7 @@ func TestAllocatableMemoryPressure(t *testing.T) { burstablePodToAdmit, _ := podMaker("burst-admit", newResourceList("100m", "100Mi"), newResourceList("200m", "200Mi"), "0Gi") // synchronize - manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider) + manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider) // we should not have memory pressure if manager.IsUnderMemoryPressure() { @@ -1346,7 +1349,7 @@ func TestAllocatableMemoryPressure(t *testing.T) { pod, podStat := podMaker("guaranteed-high-2", newResourceList("100m", "1Gi"), newResourceList("100m", "1Gi"), "1Gi") podStats[pod] = podStat summaryProvider.result = summaryStatsMaker(constantCapacity, podStats) - manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider) + manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider) // we should have memory pressure if !manager.IsUnderMemoryPressure() { @@ -1382,7 +1385,7 @@ func TestAllocatableMemoryPressure(t *testing.T) { } summaryProvider.result = summaryStatsMaker(constantCapacity, podStats) podKiller.pod = nil // reset state - manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider) + manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider) // we should have memory pressure (because transition period not yet met) if !manager.IsUnderMemoryPressure() { @@ -1406,7 +1409,7 @@ func TestAllocatableMemoryPressure(t *testing.T) { fakeClock.Step(5 * time.Minute) summaryProvider.result = summaryStatsMaker(constantCapacity, podStats) podKiller.pod = nil // reset state - manager.synchronize(diskInfoProvider, activePodsFunc, nodeProvider) + manager.synchronize(diskInfoProvider, activePodsFunc, capacityProvider) // we should not have memory pressure (because transition period met) if manager.IsUnderMemoryPressure() { diff --git a/pkg/kubelet/eviction/helpers.go b/pkg/kubelet/eviction/helpers.go index eb906f3023a..859b8d21a45 100644 --- a/pkg/kubelet/eviction/helpers.go +++ b/pkg/kubelet/eviction/helpers.go @@ -658,16 +658,11 @@ func (a byEvictionPriority) Less(i, j int) bool { } // makeSignalObservations derives observations using the specified summary provider. 
-func makeSignalObservations(summaryProvider stats.SummaryProvider, nodeProvider NodeProvider, pods []*v1.Pod, withImageFs bool) (signalObservations, statsFunc, error) { +func makeSignalObservations(summaryProvider stats.SummaryProvider, capacityProvider CapacityProvider, pods []*v1.Pod, withImageFs bool) (signalObservations, statsFunc, error) { summary, err := summaryProvider.Get() if err != nil { return nil, nil, err } - node, err := nodeProvider.GetNode() - if err != nil { - return nil, nil, err - } - // build the function to work against for pod stats statsFunc := cachedStatsFunc(summary.Pods) // build an evaluation context for current eviction signals @@ -714,8 +709,12 @@ func makeSignalObservations(summaryProvider stats.SummaryProvider, nodeProvider } } } - if memoryAllocatableCapacity, ok := node.Status.Allocatable[v1.ResourceMemory]; ok { - memoryAllocatableAvailable := memoryAllocatableCapacity.Copy() + + nodeCapacity := capacityProvider.GetCapacity() + allocatableReservation := capacityProvider.GetNodeAllocatableReservation() + + memoryAllocatableCapacity, memoryAllocatableAvailable, exist := getResourceAllocatable(nodeCapacity, allocatableReservation, v1.ResourceMemory) + if exist { for _, pod := range summary.Pods { mu, err := podMemoryUsage(pod) if err == nil { @@ -724,12 +723,12 @@ func makeSignalObservations(summaryProvider stats.SummaryProvider, nodeProvider } result[evictionapi.SignalAllocatableMemoryAvailable] = signalObservation{ available: memoryAllocatableAvailable, - capacity: memoryAllocatableCapacity.Copy(), + capacity: memoryAllocatableCapacity, } } - if storageScratchAllocatableCapacity, ok := node.Status.Allocatable[v1.ResourceStorage]; ok { - storageScratchAllocatable := storageScratchAllocatableCapacity.Copy() + storageScratchCapacity, storageScratchAllocatable, exist := getResourceAllocatable(nodeCapacity, allocatableReservation, v1.ResourceStorageScratch) + if exist { for _, pod := range pods { podStat, ok := statsFunc(pod) if !ok { @@ -754,13 +753,25 @@ func makeSignalObservations(summaryProvider stats.SummaryProvider, nodeProvider } result[evictionapi.SignalAllocatableNodeFsAvailable] = signalObservation{ available: storageScratchAllocatable, - capacity: storageScratchAllocatableCapacity.Copy(), + capacity: storageScratchCapacity, } } return result, statsFunc, nil } +func getResourceAllocatable(capacity v1.ResourceList, reservation v1.ResourceList, resourceName v1.ResourceName) (*resource.Quantity, *resource.Quantity, bool) { + if capacity, ok := capacity[resourceName]; ok { + allocate := capacity.Copy() + if reserved, exists := reservation[resourceName]; exists { + allocate.Sub(reserved) + } + return capacity.Copy(), allocate, true + } + glog.Errorf("Could not find capacity information for resource %v", resourceName) + return nil, nil, false +} + // thresholdsMet returns the set of thresholds that were met independent of grace period func thresholdsMet(thresholds []evictionapi.Threshold, observations signalObservations, enforceMinReclaim bool) []evictionapi.Threshold { results := []evictionapi.Threshold{} diff --git a/pkg/kubelet/eviction/helpers_test.go b/pkg/kubelet/eviction/helpers_test.go index 9edb38e0f79..4c671d28091 100644 --- a/pkg/kubelet/eviction/helpers_test.go +++ b/pkg/kubelet/eviction/helpers_test.go @@ -782,12 +782,12 @@ func TestMakeSignalObservations(t *testing.T) { fakeStats.Pods = append(fakeStats.Pods, newPodStats(pod, containerWorkingSetBytes)) } res := quantityMustParse("5Gi") - nodeProvider := 
newMockNodeProvider(v1.ResourceList{v1.ResourceMemory: *res})
+	capacityProvider := newMockCapacityProvider(v1.ResourceList{v1.ResourceMemory: *quantityMustParse("5Gi")}, v1.ResourceList{v1.ResourceMemory: *quantityMustParse("0Gi")})
 	// Allocatable thresholds are always 100%. Verify that Threshold == Capacity.
 	if res.CmpInt64(int64(allocatableMemoryCapacity)) != 0 {
 		t.Errorf("Expected Threshold %v to be equal to value %v", res.Value(), allocatableMemoryCapacity)
 	}
-	actualObservations, statsFunc, err := makeSignalObservations(provider, nodeProvider, pods, false)
+	actualObservations, statsFunc, err := makeSignalObservations(provider, capacityProvider, pods, false)
 	if err != nil {
 		t.Errorf("Unexpected err: %v", err)
 	}
diff --git a/pkg/kubelet/eviction/types.go b/pkg/kubelet/eviction/types.go
index e4149ff1038..7b97f37b061 100644
--- a/pkg/kubelet/eviction/types.go
+++ b/pkg/kubelet/eviction/types.go
@@ -53,7 +53,7 @@ type Config struct {
 // Manager evaluates when an eviction threshold for node stability has been met on the node.
 type Manager interface {
 	// Start starts the control loop to monitor eviction thresholds at specified interval.
-	Start(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc, podCleanedUpFunc PodCleanedUpFunc, nodeProvider NodeProvider, monitoringInterval time.Duration)
+	Start(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc, podCleanedUpFunc PodCleanedUpFunc, capacityProvider CapacityProvider, monitoringInterval time.Duration)

 	// IsUnderMemoryPressure returns true if the node is under memory pressure.
 	IsUnderMemoryPressure() bool
@@ -68,10 +68,12 @@ type DiskInfoProvider interface {
 	HasDedicatedImageFs() (bool, error)
 }

-// NodeProvider is responsible for providing the node api object describing this node
-type NodeProvider interface {
-	// GetNode returns the node info for this node
-	GetNode() (*v1.Node, error)
+// CapacityProvider is responsible for providing the resource capacity and reservation information
+type CapacityProvider interface {
+	// GetCapacity returns the amount of compute resources tracked by container manager available on the node.
+	GetCapacity() v1.ResourceList
+	// GetNodeAllocatableReservation returns the amount of compute resources that have to be reserved from scheduling.
+	GetNodeAllocatableReservation() v1.ResourceList
 }

 // ImageGC is responsible for performing garbage collection of unused images.
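For illustration only, and not part of the patch above: a minimal sketch of how a consumer of the new CapacityProvider interface derives allocatable values as capacity minus reservation, mirroring the getResourceAllocatable helper added to pkg/kubelet/eviction/helpers.go. The fakeCapacityProvider type and the 1.7-era import paths are assumptions made for the sketch; in the kubelet the container manager is the real provider.

// sketch.go (hypothetical, not part of this patch)
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/kubernetes/pkg/api/v1"
)

// fakeCapacityProvider is a hypothetical stand-in for the kubelet's container manager.
type fakeCapacityProvider struct {
	capacity    v1.ResourceList
	reservation v1.ResourceList
}

func (f *fakeCapacityProvider) GetCapacity() v1.ResourceList { return f.capacity }
func (f *fakeCapacityProvider) GetNodeAllocatableReservation() v1.ResourceList {
	return f.reservation
}

func main() {
	p := &fakeCapacityProvider{
		capacity: v1.ResourceList{
			v1.ResourceMemory:         resource.MustParse("4Gi"),
			v1.ResourceStorageScratch: resource.MustParse("100Gi"),
		},
		reservation: v1.ResourceList{
			v1.ResourceMemory:         resource.MustParse("1Gi"),
			v1.ResourceStorageScratch: resource.MustParse("10Gi"),
		},
	}
	// Allocatable = capacity - reservation, the same arithmetic the new
	// getResourceAllocatable helper performs per eviction signal.
	for _, name := range []v1.ResourceName{v1.ResourceMemory, v1.ResourceStorageScratch} {
		capQ := p.GetCapacity()[name]
		allocatable := capQ.Copy()
		if reserved, ok := p.GetNodeAllocatableReservation()[name]; ok {
			allocatable.Sub(reserved)
		}
		fmt.Printf("%s: capacity=%s allocatable=%s\n", name, capQ.String(), allocatable.String())
	}
}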
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 766af05eb41..d32c2ccc088 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1250,7 +1250,7 @@ func (kl *Kubelet) initializeRuntimeDependentModules() { glog.Fatalf("Failed to start cAdvisor %v", err) } // eviction manager must start after cadvisor because it needs to know if the container runtime has a dedicated imagefs - kl.evictionManager.Start(kl.cadvisor, kl.GetActivePods, kl.podResourcesAreReclaimed, kl, evictionMonitoringPeriod) + kl.evictionManager.Start(kl.cadvisor, kl.GetActivePods, kl.podResourcesAreReclaimed, kl.containerManager, evictionMonitoringPeriod) } // Run starts the kubelet reacting to config updates diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go b/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go index 26081ffc1ee..a9c8e7a42b4 100644 --- a/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go +++ b/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go @@ -80,24 +80,24 @@ var ( func makeResources(milliCPU, memory, nvidiaGPUs, pods, opaqueA, storage int64) v1.NodeResources { return v1.NodeResources{ Capacity: v1.ResourceList{ - v1.ResourceCPU: *resource.NewMilliQuantity(milliCPU, resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity(memory, resource.BinarySI), - v1.ResourcePods: *resource.NewQuantity(pods, resource.DecimalSI), - v1.ResourceNvidiaGPU: *resource.NewQuantity(nvidiaGPUs, resource.DecimalSI), - opaqueResourceA: *resource.NewQuantity(opaqueA, resource.DecimalSI), - v1.ResourceStorage: *resource.NewQuantity(storage, resource.BinarySI), + v1.ResourceCPU: *resource.NewMilliQuantity(milliCPU, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(memory, resource.BinarySI), + v1.ResourcePods: *resource.NewQuantity(pods, resource.DecimalSI), + v1.ResourceNvidiaGPU: *resource.NewQuantity(nvidiaGPUs, resource.DecimalSI), + opaqueResourceA: *resource.NewQuantity(opaqueA, resource.DecimalSI), + v1.ResourceStorageScratch: *resource.NewQuantity(storage, resource.BinarySI), }, } } func makeAllocatableResources(milliCPU, memory, nvidiaGPUs, pods, opaqueA, storage int64) v1.ResourceList { return v1.ResourceList{ - v1.ResourceCPU: *resource.NewMilliQuantity(milliCPU, resource.DecimalSI), - v1.ResourceMemory: *resource.NewQuantity(memory, resource.BinarySI), - v1.ResourcePods: *resource.NewQuantity(pods, resource.DecimalSI), - v1.ResourceNvidiaGPU: *resource.NewQuantity(nvidiaGPUs, resource.DecimalSI), - opaqueResourceA: *resource.NewQuantity(opaqueA, resource.DecimalSI), - v1.ResourceStorage: *resource.NewQuantity(storage, resource.BinarySI), + v1.ResourceCPU: *resource.NewMilliQuantity(milliCPU, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(memory, resource.BinarySI), + v1.ResourcePods: *resource.NewQuantity(pods, resource.DecimalSI), + v1.ResourceNvidiaGPU: *resource.NewQuantity(nvidiaGPUs, resource.DecimalSI), + opaqueResourceA: *resource.NewQuantity(opaqueA, resource.DecimalSI), + v1.ResourceStorageScratch: *resource.NewQuantity(storage, resource.BinarySI), } } diff --git a/plugin/pkg/scheduler/schedulercache/node_info.go b/plugin/pkg/scheduler/schedulercache/node_info.go index 5d144f69f9f..bba1c3e76bb 100644 --- a/plugin/pkg/scheduler/schedulercache/node_info.go +++ b/plugin/pkg/scheduler/schedulercache/node_info.go @@ -97,7 +97,7 @@ func (r *Resource) Add(rl v1.ResourceList) { r.NvidiaGPU += rQuant.Value() case v1.ResourcePods: r.AllowedPodNumber += int(rQuant.Value()) - case v1.ResourceStorage: + 
case v1.ResourceStorageScratch: r.StorageScratch += rQuant.Value() case v1.ResourceStorageOverlay: r.StorageOverlay += rQuant.Value() @@ -116,7 +116,7 @@ func (r *Resource) ResourceList() v1.ResourceList { v1.ResourceNvidiaGPU: *resource.NewQuantity(r.NvidiaGPU, resource.DecimalSI), v1.ResourcePods: *resource.NewQuantity(int64(r.AllowedPodNumber), resource.BinarySI), v1.ResourceStorageOverlay: *resource.NewQuantity(r.StorageOverlay, resource.BinarySI), - v1.ResourceStorage: *resource.NewQuantity(r.StorageScratch, resource.BinarySI), + v1.ResourceStorageScratch: *resource.NewQuantity(r.StorageScratch, resource.BinarySI), } for rName, rQuant := range r.OpaqueIntResources { result[rName] = *resource.NewQuantity(rQuant, resource.DecimalSI) diff --git a/test/e2e_node/local_storage_allocatable_eviction_test.go b/test/e2e_node/local_storage_allocatable_eviction_test.go index d3dde5f417f..e5360cd20d8 100644 --- a/test/e2e_node/local_storage_allocatable_eviction_test.go +++ b/test/e2e_node/local_storage_allocatable_eviction_test.go @@ -52,8 +52,7 @@ var _ = framework.KubeDescribe("LocalStorageAllocatableEviction [Slow] [Serial] diskReserve = uint64(0.8 * diskAvail / 1000000) // Reserve 0.8 * disk Capacity for kube-reserved scratch storage maxDisk := 10000000 // Set dd command to read and write up to 10MB at a time count := uint64(0.8 * diskAvail / float64(maxDisk)) - command := fmt.Sprintf("dd if=/dev/urandom of=dummy bs=%d count=%d; sleep 0.5; while true; do sleep 5; done", maxDisk, count) - + command := fmt.Sprintf("dd if=/dev/urandom of=dummy bs=%d count=%d; while true; do sleep 5; done", maxDisk, count) podTestSpecs = []podTestSpec{ { evictionPriority: 1, // This pod should be evicted before the innocent pod
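As a closing illustration, and again not part of the patch itself: a small sketch of how a kube-reserved entry keyed "storage" ends up under ResourceStorageScratch, mirroring the parseResourceList change in cmd/kubelet/app/server.go. The mapReserved helper and the import paths are assumptions for the sketch; the real mapping lives inside parseResourceList.

// sketch_reserved.go (hypothetical, not part of this patch)
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/kubernetes/pkg/api/v1"
)

// mapReserved mimics the mapping parseResourceList performs for the
// "storage" key; all other keys pass through unchanged.
func mapReserved(cfg map[string]string) (v1.ResourceList, error) {
	rl := make(v1.ResourceList)
	for k, v := range cfg {
		q, err := resource.ParseQuantity(v)
		if err != nil {
			return nil, err
		}
		if v1.ResourceName(k) == v1.ResourceStorage {
			// "storage" in the reservation config refers to scratch space,
			// so it is recorded under ResourceStorageScratch.
			rl[v1.ResourceStorageScratch] = q
		} else {
			rl[v1.ResourceName(k)] = q
		}
	}
	return rl, nil
}

func main() {
	reserved, err := mapReserved(map[string]string{"memory": "1Gi", "storage": "10Gi"})
	if err != nil {
		panic(err)
	}
	// Expected: memory reserved under v1.ResourceMemory, storage reserved
	// under v1.ResourceStorageScratch, which hardEvictionReservation and the
	// eviction manager then subtract from node capacity.
	fmt.Println(reserved)
}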