From c3324b88a06d9b9617b6ee15e7dcc173198bdd7e Mon Sep 17 00:00:00 2001 From: derekwaynecarr Date: Fri, 22 Jul 2016 15:23:09 -0400 Subject: [PATCH 1/4] Eviction manager observes and acts on disk pressure --- pkg/api/types.go | 2 + pkg/api/v1/types.go | 2 + pkg/kubelet/eviction/eviction_manager.go | 32 +- pkg/kubelet/eviction/eviction_manager_test.go | 237 ++++++++++++++- pkg/kubelet/eviction/helpers.go | 127 ++++++-- pkg/kubelet/eviction/helpers_test.go | 284 ++++++++++++++++-- pkg/kubelet/eviction/types.go | 19 ++ pkg/kubelet/kubelet_node_status.go | 59 ++++ pkg/kubelet/kubelet_node_status_test.go | 38 ++- 9 files changed, 750 insertions(+), 50 deletions(-) diff --git a/pkg/api/types.go b/pkg/api/types.go index 5ba2158c340..37d5152c96f 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -2069,6 +2069,8 @@ const ( NodeOutOfDisk NodeConditionType = "OutOfDisk" // NodeMemoryPressure means the kubelet is under pressure due to insufficient available memory. NodeMemoryPressure NodeConditionType = "MemoryPressure" + // NodeDiskPressure means the kubelet is under pressure due to insufficient available disk. + NodeDiskPressure NodeConditionType = "DiskPressure" // NodeNetworkUnavailable means that network for the node is not correctly configured. NodeNetworkUnavailable NodeConditionType = "NetworkUnavailable" ) diff --git a/pkg/api/v1/types.go b/pkg/api/v1/types.go index 54b6e8d6e9b..42ef5808ae3 100644 --- a/pkg/api/v1/types.go +++ b/pkg/api/v1/types.go @@ -2470,6 +2470,8 @@ const ( NodeOutOfDisk NodeConditionType = "OutOfDisk" // NodeMemoryPressure means the kubelet is under pressure due to insufficient available memory. NodeMemoryPressure NodeConditionType = "MemoryPressure" + // NodeDiskPressure means the kubelet is under pressure due to insufficient available disk. + NodeDiskPressure NodeConditionType = "DiskPressure" // NodeNetworkUnavailable means that network for the node is not correctly configured. NodeNetworkUnavailable NodeConditionType = "NetworkUnavailable" ) diff --git a/pkg/kubelet/eviction/eviction_manager.go b/pkg/kubelet/eviction/eviction_manager.go index 3aed3cea36c..3f2a1913cfc 100644 --- a/pkg/kubelet/eviction/eviction_manager.go +++ b/pkg/kubelet/eviction/eviction_manager.go @@ -54,6 +54,8 @@ type managerImpl struct { summaryProvider stats.SummaryProvider // records when a threshold was first observed thresholdsFirstObservedAt thresholdsObservedAt + // resourceToRankFunc maps a resource to a ranking function for that resource. + resourceToRankFunc map[api.ResourceName]rankFunc } // ensure it implements the required interface @@ -87,12 +89,17 @@ func (m *managerImpl) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAd if len(m.nodeConditions) == 0 { return lifecycle.PodAdmitResult{Admit: true} } - notBestEffort := qos.BestEffort != qos.GetPodQOS(attrs.Pod) - if notBestEffort { - return lifecycle.PodAdmitResult{Admit: true} + + // the node has memory pressure, admit if not best-effort + if hasNodeCondition(m.nodeConditions, api.NodeMemoryPressure) { + notBestEffort := qos.BestEffort != qos.GetPodQOS(attrs.Pod) + if notBestEffort { + return lifecycle.PodAdmitResult{Admit: true} + } } + + // reject pods when under memory pressure (if pod is best effort), or if under disk pressure. glog.Warningf("Failed to admit pod %v - node has conditions: %v", format.Pod(attrs.Pod), m.nodeConditions) - // we reject all best effort pods until we are stable. 
return lifecycle.PodAdmitResult{ Admit: false, Reason: reason, @@ -102,6 +109,14 @@ func (m *managerImpl) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAd // Start starts the control loop to observe and respond to low compute resources. func (m *managerImpl) Start(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc, monitoringInterval time.Duration) error { + // build the ranking functions now that we can know if the imagefs is dedicated or not. + hasDedicatedImageFs, err := diskInfoProvider.HasDedicatedImageFs() + if err != nil { + return err + } + m.resourceToRankFunc = buildResourceToRankFunc(hasDedicatedImageFs) + + // start the eviction manager monitoring go wait.Until(func() { m.synchronize(podFunc) }, monitoringInterval, wait.NeverStop) return nil } @@ -113,6 +128,13 @@ func (m *managerImpl) IsUnderMemoryPressure() bool { return hasNodeCondition(m.nodeConditions, api.NodeMemoryPressure) } +// IsUnderDiskPressure returns true if the node is under disk pressure. +func (m *managerImpl) IsUnderDiskPressure() bool { + m.RLock() + defer m.RUnlock() + return hasNodeCondition(m.nodeConditions, api.NodeDiskPressure) +} + // synchronize is the main control loop that enforces eviction thresholds. func (m *managerImpl) synchronize(podFunc ActivePodsFunc) { // if we have nothing to do, just return @@ -175,7 +197,7 @@ func (m *managerImpl) synchronize(podFunc ActivePodsFunc) { m.recorder.Eventf(m.nodeRef, api.EventTypeWarning, "EvictionThresholdMet", "Attempting to reclaim %s", resourceToReclaim) // rank the pods for eviction - rank, ok := resourceToRankFunc[resourceToReclaim] + rank, ok := m.resourceToRankFunc[resourceToReclaim] if !ok { glog.Errorf("eviction manager: no ranking function for resource %s", resourceToReclaim) return diff --git a/pkg/kubelet/eviction/eviction_manager_test.go b/pkg/kubelet/eviction/eviction_manager_test.go index 6da0ed1d107..8cfa546d7c3 100644 --- a/pkg/kubelet/eviction/eviction_manager_test.go +++ b/pkg/kubelet/eviction/eviction_manager_test.go @@ -48,8 +48,8 @@ func (m *mockPodKiller) killPodNow(pod *api.Pod, status api.PodStatus, gracePeri func TestMemoryPressure(t *testing.T) { podMaker := func(name string, requests api.ResourceList, limits api.ResourceList, memoryWorkingSet string) (*api.Pod, statsapi.PodStats) { pod := newPod(name, []api.Container{ - newContainer(name, requests, api.ResourceList{}), - }) + newContainer(name, requests, limits), + }, nil) podStats := newPodMemoryStats(pod, resource.MustParse(memoryWorkingSet)) return pod, podStats } @@ -124,6 +124,7 @@ func TestMemoryPressure(t *testing.T) { nodeRef: nodeRef, nodeConditionsLastObservedAt: nodeConditionsObservedAt{}, thresholdsFirstObservedAt: thresholdsObservedAt{}, + resourceToRankFunc: buildResourceToRankFunc(false), } // create a best effort pod to test admission @@ -271,3 +272,235 @@ func TestMemoryPressure(t *testing.T) { } } } + +// parseQuantity parses the specified value (if provided), otherwise returns a zero quantity +func parseQuantity(value string) resource.Quantity { + if len(value) == 0 { + return resource.MustParse("0") + } + return resource.MustParse(value) +} + +func TestDiskPressureNodeFs(t *testing.T) { + podMaker := func(name string, requests api.ResourceList, limits api.ResourceList, rootFsUsed, logsUsed, perLocalVolumeUsed string) (*api.Pod, statsapi.PodStats) { + pod := newPod(name, []api.Container{ + newContainer(name, requests, limits), + }, nil) + podStats := newPodDiskStats(pod, parseQuantity(rootFsUsed), parseQuantity(logsUsed), 
parseQuantity(perLocalVolumeUsed)) + return pod, podStats + } + summaryStatsMaker := func(rootFsAvailableBytes, imageFsAvailableBytes string, podStats map[*api.Pod]statsapi.PodStats) *statsapi.Summary { + rootFsVal := resource.MustParse(rootFsAvailableBytes) + rootFsBytes := uint64(rootFsVal.Value()) + imageFsVal := resource.MustParse(imageFsAvailableBytes) + imageFsBytes := uint64(imageFsVal.Value()) + result := &statsapi.Summary{ + Node: statsapi.NodeStats{ + Fs: &statsapi.FsStats{ + AvailableBytes: &rootFsBytes, + }, + Runtime: &statsapi.RuntimeStats{ + ImageFs: &statsapi.FsStats{ + AvailableBytes: &imageFsBytes, + }, + }, + }, + Pods: []statsapi.PodStats{}, + } + for _, podStat := range podStats { + result.Pods = append(result.Pods, podStat) + } + return result + } + podsToMake := []struct { + name string + requests api.ResourceList + limits api.ResourceList + rootFsUsed string + logsFsUsed string + perLocalVolumeUsed string + }{ + {name: "best-effort-high", requests: newResourceList("", ""), limits: newResourceList("", ""), rootFsUsed: "500Mi"}, + {name: "best-effort-low", requests: newResourceList("", ""), limits: newResourceList("", ""), perLocalVolumeUsed: "300Mi"}, + {name: "burstable-high", requests: newResourceList("100m", "100Mi"), limits: newResourceList("200m", "1Gi"), rootFsUsed: "800Mi"}, + {name: "burstable-low", requests: newResourceList("100m", "100Mi"), limits: newResourceList("200m", "1Gi"), logsFsUsed: "300Mi"}, + {name: "guaranteed-high", requests: newResourceList("100m", "1Gi"), limits: newResourceList("100m", "1Gi"), rootFsUsed: "800Mi"}, + {name: "guaranteed-low", requests: newResourceList("100m", "1Gi"), limits: newResourceList("100m", "1Gi"), rootFsUsed: "200Mi"}, + } + pods := []*api.Pod{} + podStats := map[*api.Pod]statsapi.PodStats{} + for _, podToMake := range podsToMake { + pod, podStat := podMaker(podToMake.name, podToMake.requests, podToMake.limits, podToMake.rootFsUsed, podToMake.logsFsUsed, podToMake.perLocalVolumeUsed) + pods = append(pods, pod) + podStats[pod] = podStat + } + activePodsFunc := func() []*api.Pod { + return pods + } + + fakeClock := util.NewFakeClock(time.Now()) + podKiller := &mockPodKiller{} + nodeRef := &api.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""} + + config := Config{ + MaxPodGracePeriodSeconds: 5, + PressureTransitionPeriod: time.Minute * 5, + Thresholds: []Threshold{ + { + Signal: SignalNodeFsAvailable, + Operator: OpLessThan, + Value: quantityMustParse("1Gi"), + }, + { + Signal: SignalNodeFsAvailable, + Operator: OpLessThan, + Value: quantityMustParse("2Gi"), + GracePeriod: time.Minute * 2, + }, + }, + } + summaryProvider := &fakeSummaryProvider{result: summaryStatsMaker("16Gi", "200Gi", podStats)} + manager := &managerImpl{ + clock: fakeClock, + killPodFunc: podKiller.killPodNow, + config: config, + recorder: &record.FakeRecorder{}, + summaryProvider: summaryProvider, + nodeRef: nodeRef, + nodeConditionsLastObservedAt: nodeConditionsObservedAt{}, + thresholdsFirstObservedAt: thresholdsObservedAt{}, + resourceToRankFunc: buildResourceToRankFunc(false), + } + + // create a best effort pod to test admission + podToAdmit, _ := podMaker("pod-to-admit", newResourceList("", ""), newResourceList("", ""), "0Gi", "0Gi", "0Gi") + + // synchronize + manager.synchronize(activePodsFunc) + + // we should not have disk pressure + if manager.IsUnderDiskPressure() { + t.Errorf("Manager should not report disk pressure") + } + + // try to admit our pod (should succeed) + if result := 
manager.Admit(&lifecycle.PodAdmitAttributes{Pod: podToAdmit}); !result.Admit { + t.Errorf("Admit pod: %v, expected: %v, actual: %v", podToAdmit, true, result.Admit) + } + + // induce soft threshold + fakeClock.Step(1 * time.Minute) + summaryProvider.result = summaryStatsMaker("1.5Gi", "200Gi", podStats) + manager.synchronize(activePodsFunc) + + // we should have disk pressure + if !manager.IsUnderDiskPressure() { + t.Errorf("Manager should report disk pressure since soft threshold was met") + } + + // verify no pod was killed yet because not enough time has passed. + if podKiller.pod != nil { + t.Errorf("Manager should not have killed a pod yet, but killed: %v", podKiller.pod) + } + + // step forward in time past the grace period + fakeClock.Step(3 * time.Minute) + summaryProvider.result = summaryStatsMaker("1.5Gi", "200Gi", podStats) + manager.synchronize(activePodsFunc) + + // we should have disk pressure + if !manager.IsUnderDiskPressure() { + t.Errorf("Manager should report disk pressure since soft threshold was met") + } + + // verify the right pod was killed with the right grace period. + if podKiller.pod != pods[0] { + t.Errorf("Manager chose to kill pod: %v, but should have chosen %v", podKiller.pod, pods[0]) + } + if podKiller.gracePeriodOverride == nil { + t.Errorf("Manager chose to kill pod but should have had a grace period override.") + } + observedGracePeriod := *podKiller.gracePeriodOverride + if observedGracePeriod != manager.config.MaxPodGracePeriodSeconds { + t.Errorf("Manager chose to kill pod with incorrect grace period. Expected: %d, actual: %d", manager.config.MaxPodGracePeriodSeconds, observedGracePeriod) + } + // reset state + podKiller.pod = nil + podKiller.gracePeriodOverride = nil + + // remove disk pressure + fakeClock.Step(20 * time.Minute) + summaryProvider.result = summaryStatsMaker("16Gi", "200Gi", podStats) + manager.synchronize(activePodsFunc) + + // we should not have disk pressure + if manager.IsUnderDiskPressure() { + t.Errorf("Manager should not report disk pressure") + } + + // induce disk pressure! + fakeClock.Step(1 * time.Minute) + summaryProvider.result = summaryStatsMaker("500Mi", "200Gi", podStats) + manager.synchronize(activePodsFunc) + + // we should have disk pressure + if !manager.IsUnderDiskPressure() { + t.Errorf("Manager should report disk pressure") + } + + // check the right pod was killed + if podKiller.pod != pods[0] { + t.Errorf("Manager chose to kill pod: %v, but should have chosen %v", podKiller.pod, pods[0]) + } + observedGracePeriod = *podKiller.gracePeriodOverride + if observedGracePeriod != int64(0) { + t.Errorf("Manager chose to kill pod with incorrect grace period. 
Expected: %d, actual: %d", 0, observedGracePeriod) + } + + // try to admit our pod (should fail) + if result := manager.Admit(&lifecycle.PodAdmitAttributes{Pod: podToAdmit}); result.Admit { + t.Errorf("Admit pod: %v, expected: %v, actual: %v", podToAdmit, false, result.Admit) + } + + // reduce disk pressure + fakeClock.Step(1 * time.Minute) + summaryProvider.result = summaryStatsMaker("16Gi", "200Gi", podStats) + podKiller.pod = nil // reset state + manager.synchronize(activePodsFunc) + + // we should have disk pressure (because transition period not yet met) + if !manager.IsUnderDiskPressure() { + t.Errorf("Manager should report disk pressure") + } + + // no pod should have been killed + if podKiller.pod != nil { + t.Errorf("Manager chose to kill pod: %v when no pod should have been killed", podKiller.pod) + } + + // try to admit our pod (should fail) + if result := manager.Admit(&lifecycle.PodAdmitAttributes{Pod: podToAdmit}); result.Admit { + t.Errorf("Admit pod: %v, expected: %v, actual: %v", podToAdmit, false, result.Admit) + } + + // move the clock past transition period to ensure that we stop reporting pressure + fakeClock.Step(5 * time.Minute) + summaryProvider.result = summaryStatsMaker("16Gi", "200Gi", podStats) + podKiller.pod = nil // reset state + manager.synchronize(activePodsFunc) + + // we should not have disk pressure (because transition period met) + if manager.IsUnderDiskPressure() { + t.Errorf("Manager should not report disk pressure") + } + + // no pod should have been killed + if podKiller.pod != nil { + t.Errorf("Manager chose to kill pod: %v when no pod should have been killed", podKiller.pod) + } + + // try to admit our pod (should succeed) + if result := manager.Admit(&lifecycle.PodAdmitAttributes{Pod: podToAdmit}); !result.Admit { + t.Errorf("Admit pod: %v, expected: %v, actual: %v", podToAdmit, true, result.Admit) + } +} diff --git a/pkg/kubelet/eviction/helpers.go b/pkg/kubelet/eviction/helpers.go index d10831339cc..d38d31c4410 100644 --- a/pkg/kubelet/eviction/helpers.go +++ b/pkg/kubelet/eviction/helpers.go @@ -41,21 +41,24 @@ const ( message = "The node was low on compute resources." // disk, in bytes. internal to this module, used to account for local disk usage. resourceDisk api.ResourceName = "disk" + // imagefs, in bytes. internal to this module, used to account for local image filesystem usage. + resourceImageFs api.ResourceName = "imagefs" + // nodefs, in bytes. internal to this module, used to account for local node root filesystem usage. + resourceNodeFs api.ResourceName = "nodefs" ) -// resourceToRankFunc maps a resource to ranking function for that resource. -var resourceToRankFunc = map[api.ResourceName]rankFunc{ - api.ResourceMemory: rankMemoryPressure, -} - // signalToNodeCondition maps a signal to the node condition to report if threshold is met. var signalToNodeCondition = map[Signal]api.NodeConditionType{ - SignalMemoryAvailable: api.NodeMemoryPressure, + SignalMemoryAvailable: api.NodeMemoryPressure, + SignalImageFsAvailable: api.NodeDiskPressure, + SignalNodeFsAvailable: api.NodeDiskPressure, } // signalToResource maps a Signal to its associated Resource. var signalToResource = map[Signal]api.ResourceName{ - SignalMemoryAvailable: api.ResourceMemory, + SignalMemoryAvailable: api.ResourceMemory, + SignalImageFsAvailable: resourceImageFs, + SignalNodeFsAvailable: resourceNodeFs, } // validSignal returns true if the signal is supported. 
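Both disk signals above map to the single DiskPressure node condition, so crossing either the nodefs.available or the imagefs.available threshold surfaces the same condition on the node. As a rough standalone sketch of the comparison a parsed hard threshold reduces to (illustrative only, not the manager's actual thresholdsMet logic, which also handles soft-eviction grace periods and minimum reclaim; the import path assumes the vendored location of the resource package in this tree):

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/api/resource"
)

// thresholdMet reports whether the observed available quantity has fallen
// below the configured limit; OpLessThan is the only operator defined.
func thresholdMet(observed, limit resource.Quantity) bool {
	return observed.Cmp(limit) < 0
}

func main() {
	// e.g. --eviction-hard=nodefs.available<1Gi with 500Mi observed free
	limit := resource.MustParse("1Gi")
	observed := resource.MustParse("500Mi")
	fmt.Println(thresholdMet(observed, limit)) // true -> DiskPressure is set
}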
@@ -160,7 +163,6 @@ func parseThresholdStatement(statement string) (Threshold, error) { if quantity.Sign() < 0 { return Threshold{}, fmt.Errorf("eviction threshold %v cannot be negative: %s", signal, &quantity) } - return Threshold{ Signal: signal, Operator: operator, @@ -252,14 +254,52 @@ func memoryUsage(memStats *statsapi.MemoryStats) *resource.Quantity { return resource.NewQuantity(usage, resource.BinarySI) } -// podUsage aggregates usage of compute resources. -// it supports the following memory and disk. -func podUsage(podStats statsapi.PodStats) (api.ResourceList, error) { +// localVolumeNames returns the set of volumes for the pod that are local +func localVolumeNames(pod *api.Pod) []string { + result := []string{} + for _, volume := range pod.Spec.Volumes { + if volume.HostPath != nil || + (volume.EmptyDir != nil && volume.EmptyDir.Medium != api.StorageMediumMemory) || + volume.ConfigMap != nil || + volume.GitRepo != nil { + result = append(result, volume.Name) + } + } + return result +} + +// podDiskUsage aggregates pod disk usage for the specified stats to measure. +func podDiskUsage(podStats statsapi.PodStats, pod *api.Pod, statsToMeasure []fsStats) (api.ResourceList, error) { + disk := resource.Quantity{Format: resource.BinarySI} + for _, container := range podStats.Containers { + if hasFsStats(statsToMeasure, fsStatsRoot) { + disk.Add(*diskUsage(container.Rootfs)) + } + if hasFsStats(statsToMeasure, fsStatsLogs) { + disk.Add(*diskUsage(container.Logs)) + } + } + if hasFsStats(statsToMeasure, fsStatsLocalVolumeSource) { + volumeNames := localVolumeNames(pod) + for _, volumeName := range volumeNames { + for _, volumeStats := range podStats.VolumeStats { + if volumeStats.Name == volumeName { + disk.Add(*diskUsage(&volumeStats.FsStats)) + } + } + } + } + return api.ResourceList{ + resourceDisk: disk, + }, nil +} + +// podMemoryUsage aggregates pod memory usage. +func podMemoryUsage(podStats statsapi.PodStats) (api.ResourceList, error) { disk := resource.Quantity{Format: resource.BinarySI} memory := resource.Quantity{Format: resource.BinarySI} for _, container := range podStats.Containers { // disk usage (if known) - // TODO: need to handle volumes for _, fsStats := range []*statsapi.FsStats{container.Rootfs, container.Logs} { disk.Add(*diskUsage(fsStats)) } @@ -384,12 +424,12 @@ func memory(stats statsFunc) cmpFunc { return 1 } // if we cant get usage for p1 measured, we want p2 first - p1Usage, err := podUsage(p1Stats) + p1Usage, err := podMemoryUsage(p1Stats) if err != nil { return -1 } // if we cant get usage for p2 measured, we want p1 first - p2Usage, err := podUsage(p2Stats) + p2Usage, err := podMemoryUsage(p2Stats) if err != nil { return 1 } @@ -411,7 +451,7 @@ func memory(stats statsFunc) cmpFunc { } // disk compares pods by largest consumer of disk relative to request. 
-func disk(stats statsFunc) cmpFunc { +func disk(stats statsFunc, fsStatsToMeasure []fsStats) cmpFunc { return func(p1, p2 *api.Pod) int { p1Stats, found := stats(p1) // if we have no usage stats for p1, we want p2 first @@ -424,20 +464,20 @@ func disk(stats statsFunc) cmpFunc { return 1 } // if we cant get usage for p1 measured, we want p2 first - p1Usage, err := podUsage(p1Stats) + p1Usage, err := podDiskUsage(p1Stats, p1, fsStatsToMeasure) if err != nil { return -1 } // if we cant get usage for p2 measured, we want p1 first - p2Usage, err := podUsage(p2Stats) + p2Usage, err := podDiskUsage(p2Stats, p2, fsStatsToMeasure) if err != nil { return 1 } // disk is best effort, so we don't measure relative to a request. // TODO: add disk as a guaranteed resource - p1Disk := p1Usage[api.ResourceStorage] - p2Disk := p2Usage[api.ResourceStorage] + p1Disk := p1Usage[resourceDisk] + p2Disk := p2Usage[resourceDisk] // if p2 is using more than p1, we want p2 first return p2Disk.Cmp(p1Disk) } @@ -448,9 +488,11 @@ func rankMemoryPressure(pods []*api.Pod, stats statsFunc) { orderedBy(qosComparator, memory(stats)).Sort(pods) } -// rankDiskPressure orders the input pods for eviction in response to disk pressure. -func rankDiskPressure(pods []*api.Pod, stats statsFunc) { - orderedBy(qosComparator, disk(stats)).Sort(pods) +// rankDiskPressureFunc returns a rankFunc that measures the specified fs stats. +func rankDiskPressureFunc(fsStatsToMeasure []fsStats) rankFunc { + return func(pods []*api.Pod, stats statsFunc) { + orderedBy(qosComparator, disk(stats, fsStatsToMeasure)).Sort(pods) + } } // byEvictionPriority implements sort.Interface for []api.ResourceName. @@ -474,7 +516,18 @@ func makeSignalObservations(summaryProvider stats.SummaryProvider) (signalObserv statsFunc := cachedStatsFunc(summary.Pods) // build an evaluation context for current eviction signals result := signalObservations{} - result[SignalMemoryAvailable] = resource.NewQuantity(int64(*summary.Node.Memory.AvailableBytes), resource.BinarySI) + + if memory := summary.Node.Memory; memory != nil && memory.AvailableBytes != nil { + result[SignalMemoryAvailable] = resource.NewQuantity(int64(*memory.AvailableBytes), resource.BinarySI) + } + if nodeFs := summary.Node.Fs; nodeFs != nil && nodeFs.AvailableBytes != nil { + result[SignalNodeFsAvailable] = resource.NewQuantity(int64(*nodeFs.AvailableBytes), resource.BinarySI) + } + if summary.Node.Runtime != nil { + if imageFs := summary.Node.Runtime.ImageFs; imageFs != nil && imageFs.AvailableBytes != nil { + result[SignalImageFsAvailable] = resource.NewQuantity(int64(*imageFs.AvailableBytes), resource.BinarySI) + } + } return result, statsFunc, nil } @@ -569,6 +622,16 @@ func nodeConditionsObservedSince(observedAt nodeConditionsObservedAt, period tim return results } +// hgasFsStats returns true if the fsStat is in the input list +func hasFsStats(inputs []fsStats, item fsStats) bool { + for _, input := range inputs { + if input == item { + return true + } + } + return false +} + // hasNodeCondition returns true if the node condition is in the input list func hasNodeCondition(inputs []api.NodeConditionType, item api.NodeConditionType) bool { for _, input := range inputs { @@ -612,3 +675,21 @@ func isSoftEviction(thresholds []Threshold, starvedResource api.ResourceName) bo } return true } + +// buildresourceToRankFunc returns ranking functions associated with resources +func buildResourceToRankFunc(withImageFs bool) map[api.ResourceName]rankFunc { + resourceToRankFunc := map[api.ResourceName]rankFunc{ + 
api.ResourceMemory: rankMemoryPressure, + } + // usage of an imagefs is optional + if withImageFs { + // with an imagefs, nodefs pod rank func for eviction only includes logs and local volumes + resourceToRankFunc[resourceNodeFs] = rankDiskPressureFunc([]fsStats{fsStatsLogs, fsStatsLocalVolumeSource}) + // with an imagefs, imagefs pod rank func for eviction only includes rootfs + resourceToRankFunc[resourceImageFs] = rankDiskPressureFunc([]fsStats{fsStatsRoot}) + } else { + // without an imagefs, nodefs pod rank func for eviction looks at all fs stats + resourceToRankFunc[resourceNodeFs] = rankDiskPressureFunc([]fsStats{fsStatsRoot, fsStatsLogs, fsStatsLocalVolumeSource}) + } + return resourceToRankFunc +} diff --git a/pkg/kubelet/eviction/helpers_test.go b/pkg/kubelet/eviction/helpers_test.go index b57b1842771..8e17d5e52c7 100644 --- a/pkg/kubelet/eviction/helpers_test.go +++ b/pkg/kubelet/eviction/helpers_test.go @@ -75,6 +75,41 @@ func TestParseThresholdConfig(t *testing.T) { }, }, }, + "disk flag values": { + evictionHard: "imagefs.available<150Mi,nodefs.available<100Mi", + evictionSoft: "imagefs.available<300Mi,nodefs.available<200Mi", + evictionSoftGracePeriod: "imagefs.available=30s,nodefs.available=30s", + evictionMinReclaim: "imagefs.available=2Gi,nodefs.available=1Gi", + expectErr: false, + expectThresholds: []Threshold{ + { + Signal: SignalImageFsAvailable, + Operator: OpLessThan, + Value: quantityMustParse("150Mi"), + MinReclaim: quantityMustParse("2Gi"), + }, + { + Signal: SignalNodeFsAvailable, + Operator: OpLessThan, + Value: quantityMustParse("100Mi"), + MinReclaim: quantityMustParse("1Gi"), + }, + { + Signal: SignalImageFsAvailable, + Operator: OpLessThan, + Value: quantityMustParse("300Mi"), + GracePeriod: gracePeriod, + MinReclaim: quantityMustParse("2Gi"), + }, + { + Signal: SignalNodeFsAvailable, + Operator: OpLessThan, + Value: quantityMustParse("200Mi"), + GracePeriod: gracePeriod, + MinReclaim: quantityMustParse("1Gi"), + }, + }, + }, "invalid-signal": { evictionHard: "mem.available<150Mi", evictionSoft: "", @@ -127,6 +162,7 @@ func TestParseThresholdConfig(t *testing.T) { evictionHard: "", evictionSoft: "memory.available<150Mi", evictionSoftGracePeriod: "memory.available=-30s", + evictionMinReclaim: "", expectErr: true, expectThresholds: []Threshold{}, }, @@ -199,13 +235,13 @@ func thresholdEqual(a Threshold, b Threshold) bool { func TestOrderedByQoS(t *testing.T) { bestEffort := newPod("best-effort", []api.Container{ newContainer("best-effort", newResourceList("", ""), newResourceList("", "")), - }) + }, nil) burstable := newPod("burstable", []api.Container{ newContainer("burstable", newResourceList("100m", "100Mi"), newResourceList("200m", "200Mi")), - }) + }, nil) guaranteed := newPod("guaranteed", []api.Container{ newContainer("guaranteed", newResourceList("200m", "200Mi"), newResourceList("200m", "200Mi")), - }) + }, nil) pods := []*api.Pod{guaranteed, burstable, bestEffort} orderedBy(qosComparator).Sort(pods) @@ -218,26 +254,158 @@ func TestOrderedByQoS(t *testing.T) { } } +// TestOrderedByDisk ensures we order pods by greediest disk consumer +func TestOrderedByDisk(t *testing.T) { + pod1 := newPod("best-effort-high", []api.Container{ + newContainer("best-effort-high", newResourceList("", ""), newResourceList("", "")), + }, []api.Volume{ + newVolume("local-volume", api.VolumeSource{ + EmptyDir: &api.EmptyDirVolumeSource{}, + }), + }) + pod2 := newPod("best-effort-low", []api.Container{ + newContainer("best-effort-low", newResourceList("", ""), 
newResourceList("", "")), + }, []api.Volume{ + newVolume("local-volume", api.VolumeSource{ + EmptyDir: &api.EmptyDirVolumeSource{}, + }), + }) + pod3 := newPod("burstable-high", []api.Container{ + newContainer("burstable-high", newResourceList("100m", "100Mi"), newResourceList("200m", "1Gi")), + }, []api.Volume{ + newVolume("local-volume", api.VolumeSource{ + EmptyDir: &api.EmptyDirVolumeSource{}, + }), + }) + pod4 := newPod("burstable-low", []api.Container{ + newContainer("burstable-low", newResourceList("100m", "100Mi"), newResourceList("200m", "1Gi")), + }, []api.Volume{ + newVolume("local-volume", api.VolumeSource{ + EmptyDir: &api.EmptyDirVolumeSource{}, + }), + }) + pod5 := newPod("guaranteed-high", []api.Container{ + newContainer("guaranteed-high", newResourceList("100m", "1Gi"), newResourceList("100m", "1Gi")), + }, []api.Volume{ + newVolume("local-volume", api.VolumeSource{ + EmptyDir: &api.EmptyDirVolumeSource{}, + }), + }) + pod6 := newPod("guaranteed-low", []api.Container{ + newContainer("guaranteed-low", newResourceList("100m", "1Gi"), newResourceList("100m", "1Gi")), + }, []api.Volume{ + newVolume("local-volume", api.VolumeSource{ + EmptyDir: &api.EmptyDirVolumeSource{}, + }), + }) + stats := map[*api.Pod]statsapi.PodStats{ + pod1: newPodDiskStats(pod1, resource.MustParse("50Mi"), resource.MustParse("100Mi"), resource.MustParse("50Mi")), // 200Mi + pod2: newPodDiskStats(pod2, resource.MustParse("100Mi"), resource.MustParse("150Mi"), resource.MustParse("50Mi")), // 300Mi + pod3: newPodDiskStats(pod3, resource.MustParse("200Mi"), resource.MustParse("150Mi"), resource.MustParse("50Mi")), // 400Mi + pod4: newPodDiskStats(pod4, resource.MustParse("300Mi"), resource.MustParse("100Mi"), resource.MustParse("50Mi")), // 450Mi + pod5: newPodDiskStats(pod5, resource.MustParse("400Mi"), resource.MustParse("100Mi"), resource.MustParse("50Mi")), // 550Mi + pod6: newPodDiskStats(pod6, resource.MustParse("500Mi"), resource.MustParse("100Mi"), resource.MustParse("50Mi")), // 650Mi + } + statsFn := func(pod *api.Pod) (statsapi.PodStats, bool) { + result, found := stats[pod] + return result, found + } + pods := []*api.Pod{pod1, pod2, pod3, pod4, pod5, pod6} + orderedBy(disk(statsFn, []fsStats{fsStatsRoot, fsStatsLogs, fsStatsLocalVolumeSource})).Sort(pods) + expected := []*api.Pod{pod6, pod5, pod4, pod3, pod2, pod1} + for i := range expected { + if pods[i] != expected[i] { + t.Errorf("Expected pod[%d]: %s, but got: %s", i, expected[i].Name, pods[i].Name) + } + } +} + +// TestOrderedByQoSDisk ensures we order pods by qos and then greediest disk consumer +func TestOrderedByQoSDisk(t *testing.T) { + pod1 := newPod("best-effort-high", []api.Container{ + newContainer("best-effort-high", newResourceList("", ""), newResourceList("", "")), + }, []api.Volume{ + newVolume("local-volume", api.VolumeSource{ + EmptyDir: &api.EmptyDirVolumeSource{}, + }), + }) + pod2 := newPod("best-effort-low", []api.Container{ + newContainer("best-effort-low", newResourceList("", ""), newResourceList("", "")), + }, []api.Volume{ + newVolume("local-volume", api.VolumeSource{ + EmptyDir: &api.EmptyDirVolumeSource{}, + }), + }) + pod3 := newPod("burstable-high", []api.Container{ + newContainer("burstable-high", newResourceList("100m", "100Mi"), newResourceList("200m", "1Gi")), + }, []api.Volume{ + newVolume("local-volume", api.VolumeSource{ + EmptyDir: &api.EmptyDirVolumeSource{}, + }), + }) + pod4 := newPod("burstable-low", []api.Container{ + newContainer("burstable-low", newResourceList("100m", "100Mi"), 
newResourceList("200m", "1Gi")), + }, []api.Volume{ + newVolume("local-volume", api.VolumeSource{ + EmptyDir: &api.EmptyDirVolumeSource{}, + }), + }) + pod5 := newPod("guaranteed-high", []api.Container{ + newContainer("guaranteed-high", newResourceList("100m", "1Gi"), newResourceList("100m", "1Gi")), + }, []api.Volume{ + newVolume("local-volume", api.VolumeSource{ + EmptyDir: &api.EmptyDirVolumeSource{}, + }), + }) + pod6 := newPod("guaranteed-low", []api.Container{ + newContainer("guaranteed-low", newResourceList("100m", "1Gi"), newResourceList("100m", "1Gi")), + }, []api.Volume{ + newVolume("local-volume", api.VolumeSource{ + EmptyDir: &api.EmptyDirVolumeSource{}, + }), + }) + stats := map[*api.Pod]statsapi.PodStats{ + pod1: newPodDiskStats(pod1, resource.MustParse("50Mi"), resource.MustParse("100Mi"), resource.MustParse("50Mi")), // 200Mi + pod2: newPodDiskStats(pod2, resource.MustParse("100Mi"), resource.MustParse("150Mi"), resource.MustParse("50Mi")), // 300Mi + pod3: newPodDiskStats(pod3, resource.MustParse("200Mi"), resource.MustParse("150Mi"), resource.MustParse("50Mi")), // 400Mi + pod4: newPodDiskStats(pod4, resource.MustParse("300Mi"), resource.MustParse("100Mi"), resource.MustParse("50Mi")), // 450Mi + pod5: newPodDiskStats(pod5, resource.MustParse("400Mi"), resource.MustParse("100Mi"), resource.MustParse("50Mi")), // 550Mi + pod6: newPodDiskStats(pod6, resource.MustParse("500Mi"), resource.MustParse("100Mi"), resource.MustParse("50Mi")), // 650Mi + } + statsFn := func(pod *api.Pod) (statsapi.PodStats, bool) { + result, found := stats[pod] + return result, found + } + pods := []*api.Pod{pod1, pod2, pod3, pod4, pod5, pod6} + orderedBy(qosComparator, disk(statsFn, []fsStats{fsStatsRoot, fsStatsLogs, fsStatsLocalVolumeSource})).Sort(pods) + expected := []*api.Pod{pod2, pod1, pod4, pod3, pod6, pod5} + for i := range expected { + if pods[i] != expected[i] { + t.Errorf("Expected pod[%d]: %s, but got: %s", i, expected[i].Name, pods[i].Name) + } + } +} + // TestOrderedByMemory ensures we order pods by greediest memory consumer relative to request. 
func TestOrderedByMemory(t *testing.T) { pod1 := newPod("best-effort-high", []api.Container{ newContainer("best-effort-high", newResourceList("", ""), newResourceList("", "")), - }) + }, nil) pod2 := newPod("best-effort-low", []api.Container{ newContainer("best-effort-low", newResourceList("", ""), newResourceList("", "")), - }) + }, nil) pod3 := newPod("burstable-high", []api.Container{ newContainer("burstable-high", newResourceList("100m", "100Mi"), newResourceList("200m", "1Gi")), - }) + }, nil) pod4 := newPod("burstable-low", []api.Container{ newContainer("burstable-low", newResourceList("100m", "100Mi"), newResourceList("200m", "1Gi")), - }) + }, nil) pod5 := newPod("guaranteed-high", []api.Container{ newContainer("guaranteed-high", newResourceList("100m", "1Gi"), newResourceList("100m", "1Gi")), - }) + }, nil) pod6 := newPod("guaranteed-low", []api.Container{ newContainer("guaranteed-low", newResourceList("100m", "1Gi"), newResourceList("100m", "1Gi")), - }) + }, nil) stats := map[*api.Pod]statsapi.PodStats{ pod1: newPodMemoryStats(pod1, resource.MustParse("500Mi")), // 500 relative to request pod2: newPodMemoryStats(pod2, resource.MustParse("300Mi")), // 300 relative to request @@ -264,22 +432,22 @@ func TestOrderedByMemory(t *testing.T) { func TestOrderedByQoSMemory(t *testing.T) { pod1 := newPod("best-effort-high", []api.Container{ newContainer("best-effort-high", newResourceList("", ""), newResourceList("", "")), - }) + }, nil) pod2 := newPod("best-effort-low", []api.Container{ newContainer("best-effort-low", newResourceList("", ""), newResourceList("", "")), - }) + }, nil) pod3 := newPod("burstable-high", []api.Container{ newContainer("burstable-high", newResourceList("100m", "100Mi"), newResourceList("200m", "1Gi")), - }) + }, nil) pod4 := newPod("burstable-low", []api.Container{ newContainer("burstable-low", newResourceList("100m", "100Mi"), newResourceList("200m", "1Gi")), - }) + }, nil) pod5 := newPod("guaranteed-high", []api.Container{ newContainer("guaranteed-high", newResourceList("100m", "1Gi"), newResourceList("100m", "1Gi")), - }) + }, nil) pod6 := newPod("guaranteed-low", []api.Container{ newContainer("guaranteed-low", newResourceList("100m", "1Gi"), newResourceList("100m", "1Gi")), - }) + }, nil) stats := map[*api.Pod]statsapi.PodStats{ pod1: newPodMemoryStats(pod1, resource.MustParse("500Mi")), // 500 relative to request pod2: newPodMemoryStats(pod2, resource.MustParse("50Mi")), // 50 relative to request @@ -346,11 +514,21 @@ func TestMakeSignalObservations(t *testing.T) { return pod } nodeAvailableBytes := uint64(1024 * 1024 * 1024) + imageFsAvailableBytes := uint64(1024 * 1024) + nodeFsAvailableBytes := uint64(1024) fakeStats := &statsapi.Summary{ Node: statsapi.NodeStats{ Memory: &statsapi.MemoryStats{ AvailableBytes: &nodeAvailableBytes, }, + Runtime: &statsapi.RuntimeStats{ + ImageFs: &statsapi.FsStats{ + AvailableBytes: &imageFsAvailableBytes, + }, + }, + Fs: &statsapi.FsStats{ + AvailableBytes: &nodeFsAvailableBytes, + }, }, Pods: []statsapi.PodStats{}, } @@ -370,12 +548,26 @@ func TestMakeSignalObservations(t *testing.T) { if err != nil { t.Errorf("Unexpected err: %v", err) } - quantity, found := actualObservations[SignalMemoryAvailable] + memQuantity, found := actualObservations[SignalMemoryAvailable] if !found { t.Errorf("Expected available memory observation: %v", err) } - if expectedBytes := int64(nodeAvailableBytes); quantity.Value() != expectedBytes { - t.Errorf("Expected %v, actual: %v", expectedBytes, quantity.Value()) + if expectedBytes := 
int64(nodeAvailableBytes); memQuantity.Value() != expectedBytes { + t.Errorf("Expected %v, actual: %v", expectedBytes, memQuantity.Value()) + } + nodeFsQuantity, found := actualObservations[SignalNodeFsAvailable] + if !found { + t.Errorf("Expected available nodefs observation: %v", err) + } + if expectedBytes := int64(nodeFsAvailableBytes); nodeFsQuantity.Value() != expectedBytes { + t.Errorf("Expected %v, actual: %v", expectedBytes, nodeFsQuantity.Value()) + } + imageFsQuantity, found := actualObservations[SignalImageFsAvailable] + if !found { + t.Errorf("Expected available imagefs observation: %v", err) + } + if expectedBytes := int64(imageFsAvailableBytes); imageFsQuantity.Value() != expectedBytes { + t.Errorf("Expected %v, actual: %v", expectedBytes, imageFsQuantity.Value()) } for _, pod := range pods { podStats, found := statsFunc(pod) @@ -670,6 +862,18 @@ func TestReclaimResources(t *testing.T) { }, result: []api.ResourceName{api.ResourceMemory}, }, + "imagefs.available": { + inputs: []Threshold{ + {Signal: SignalImageFsAvailable}, + }, + result: []api.ResourceName{resourceImageFs}, + }, + "nodefs.available": { + inputs: []Threshold{ + {Signal: SignalNodeFsAvailable}, + }, + result: []api.ResourceName{resourceNodeFs}, + }, } for testName, testCase := range testCases { actual := reclaimResources(testCase.inputs) @@ -681,6 +885,40 @@ func TestReclaimResources(t *testing.T) { } } +// newPodDiskStats returns stats with specified usage amounts. +func newPodDiskStats(pod *api.Pod, rootFsUsed, logsUsed, perLocalVolumeUsed resource.Quantity) statsapi.PodStats { + result := statsapi.PodStats{ + PodRef: statsapi.PodReference{ + Name: pod.Name, Namespace: pod.Namespace, UID: string(pod.UID), + }, + } + + rootFsUsedBytes := uint64(rootFsUsed.Value()) + logsUsedBytes := uint64(logsUsed.Value()) + for range pod.Spec.Containers { + result.Containers = append(result.Containers, statsapi.ContainerStats{ + Rootfs: &statsapi.FsStats{ + UsedBytes: &rootFsUsedBytes, + }, + Logs: &statsapi.FsStats{ + UsedBytes: &logsUsedBytes, + }, + }) + } + + perLocalVolumeUsedBytes := uint64(perLocalVolumeUsed.Value()) + for _, volumeName := range localVolumeNames(pod) { + result.VolumeStats = append(result.VolumeStats, statsapi.VolumeStats{ + Name: volumeName, + FsStats: statsapi.FsStats{ + UsedBytes: &perLocalVolumeUsedBytes, + }, + }) + } + + return result +} + func newPodMemoryStats(pod *api.Pod, workingSet resource.Quantity) statsapi.PodStats { result := statsapi.PodStats{ PodRef: statsapi.PodReference{ @@ -723,13 +961,21 @@ func newContainer(name string, requests api.ResourceList, limits api.ResourceLis } } -func newPod(name string, containers []api.Container) *api.Pod { +func newVolume(name string, volumeSource api.VolumeSource) api.Volume { + return api.Volume{ + Name: name, + VolumeSource: volumeSource, + } +} + +func newPod(name string, containers []api.Container, volumes []api.Volume) *api.Pod { return &api.Pod{ ObjectMeta: api.ObjectMeta{ Name: name, }, Spec: api.PodSpec{ Containers: containers, + Volumes: volumes, }, } } diff --git a/pkg/kubelet/eviction/types.go b/pkg/kubelet/eviction/types.go index 40b6f91e1a5..295cdefd812 100644 --- a/pkg/kubelet/eviction/types.go +++ b/pkg/kubelet/eviction/types.go @@ -30,6 +30,22 @@ type Signal string const ( // SignalMemoryAvailable is memory available (i.e. capacity - workingSet), in bytes. SignalMemoryAvailable Signal = "memory.available" + // SignalNodeFsAvailable is amount of storage available on filesystem that kubelet uses for volumes, daemon logs, etc. 
+ SignalNodeFsAvailable Signal = "nodefs.available" + // SignalImageFsAvailable is amount of storage available on filesystem that container runtime uses for storing images and container writable layers. + SignalImageFsAvailable Signal = "imagefs.available" +) + +// fsStats defines the types of filesystem stats to collect. +type fsStats string + +const ( + // fsStatsLocalVolumeSource identifies stats for pod local volume sources. + fsStatsLocalVolumeSource fsStats = "localVolumeSource" + // fsStatsLogs identifies stats for pod logs. + fsStatsLogs fsStats = "logs" + // fsStatsRoot identifies stats for pod container writable layers. + fsStatsRoot fsStats = "root" ) // ThresholdOperator is the operator used to express a Threshold. @@ -71,6 +87,9 @@ type Manager interface { // IsUnderMemoryPressure returns true if the node is under memory pressure. IsUnderMemoryPressure() bool + + // IsUnderDiskPressure returns true if the node is under disk pressure. + IsUnderDiskPressure() bool } // DiskInfoProvider is responsible for informing the manager how disk is configured. diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go index d128b759ff2..80182d4416b 100644 --- a/pkg/kubelet/kubelet_node_status.go +++ b/pkg/kubelet/kubelet_node_status.go @@ -587,6 +587,64 @@ func (kl *Kubelet) setNodeMemoryPressureCondition(node *api.Node) { } } +// setNodeDiskPressureCondition for the node. +// TODO: this needs to move somewhere centralized... +func (kl *Kubelet) setNodeDiskPressureCondition(node *api.Node) { + currentTime := unversioned.NewTime(kl.clock.Now()) + var condition *api.NodeCondition + + // Check if NodeDiskPressure condition already exists and if it does, just pick it up for update. + for i := range node.Status.Conditions { + if node.Status.Conditions[i].Type == api.NodeDiskPressure { + condition = &node.Status.Conditions[i] + } + } + + newCondition := false + // If the NodeDiskPressure condition doesn't exist, create one + if condition == nil { + condition = &api.NodeCondition{ + Type: api.NodeDiskPressure, + Status: api.ConditionUnknown, + } + // cannot be appended to node.Status.Conditions here because it gets + // copied to the slice. So if we append to the slice here none of the + // updates we make below are reflected in the slice. + newCondition = true + } + + // Update the heartbeat time + condition.LastHeartbeatTime = currentTime + + // Note: The conditions below take care of the case when a new NodeDiskPressure condition is + // created as well as the case when the condition already exists. When a new condition + // is created its status is set to api.ConditionUnknown which matches either + // condition.Status != api.ConditionTrue or + // condition.Status != api.ConditionFalse in the conditions below depending on whether + // the kubelet is under disk pressure or not. 
+ if kl.evictionManager.IsUnderDiskPressure() { + if condition.Status != api.ConditionTrue { + condition.Status = api.ConditionTrue + condition.Reason = "KubeletHasDiskPressure" + condition.Message = "kubelet has disk pressure" + condition.LastTransitionTime = currentTime + kl.recordNodeStatusEvent(api.EventTypeNormal, "NodeHasDiskPressure") + } + } else { + if condition.Status != api.ConditionFalse { + condition.Status = api.ConditionFalse + condition.Reason = "KubeletHasNoDiskPressure" + condition.Message = "kubelet has no disk pressure" + condition.LastTransitionTime = currentTime + kl.recordNodeStatusEvent(api.EventTypeNormal, "NodeHasNoDiskPressure") + } + } + + if newCondition { + node.Status.Conditions = append(node.Status.Conditions, *condition) + } +} + // Set OODcondition for the node. func (kl *Kubelet) setNodeOODCondition(node *api.Node) { currentTime := unversioned.NewTime(kl.clock.Now()) @@ -700,6 +758,7 @@ func (kl *Kubelet) defaultNodeStatusFuncs() []func(*api.Node) error { withoutError(kl.setNodeStatusInfo), withoutError(kl.setNodeOODCondition), withoutError(kl.setNodeMemoryPressureCondition), + withoutError(kl.setNodeDiskPressureCondition), withoutError(kl.setNodeReadyCondition), withoutError(kl.setNodeVolumesInUseStatus), withoutError(kl.recordNodeSchedulableEvent), diff --git a/pkg/kubelet/kubelet_node_status_test.go b/pkg/kubelet/kubelet_node_status_test.go index c06551d711c..e76ab6f970d 100644 --- a/pkg/kubelet/kubelet_node_status_test.go +++ b/pkg/kubelet/kubelet_node_status_test.go @@ -133,6 +133,14 @@ func TestUpdateNewNodeStatus(t *testing.T) { LastHeartbeatTime: unversioned.Time{}, LastTransitionTime: unversioned.Time{}, }, + { + Type: api.NodeDiskPressure, + Status: api.ConditionFalse, + Reason: "KubeletHasNoDiskPressure", + Message: fmt.Sprintf("kubelet has no disk pressure"), + LastHeartbeatTime: unversioned.Time{}, + LastTransitionTime: unversioned.Time{}, + }, { Type: api.NodeReady, Status: api.ConditionTrue, @@ -316,6 +324,14 @@ func TestUpdateExistingNodeStatus(t *testing.T) { LastHeartbeatTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), LastTransitionTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), }, + { + Type: api.NodeDiskPressure, + Status: api.ConditionFalse, + Reason: "KubeletHasSufficientDisk", + Message: fmt.Sprintf("kubelet has sufficient disk space available"), + LastHeartbeatTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + LastTransitionTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, { Type: api.NodeReady, Status: api.ConditionTrue, @@ -380,6 +396,14 @@ func TestUpdateExistingNodeStatus(t *testing.T) { LastHeartbeatTime: unversioned.Time{}, LastTransitionTime: unversioned.Time{}, }, + { + Type: api.NodeDiskPressure, + Status: api.ConditionFalse, + Reason: "KubeletHasSufficientDisk", + Message: fmt.Sprintf("kubelet has sufficient disk space available"), + LastHeartbeatTime: unversioned.Time{}, + LastTransitionTime: unversioned.Time{}, + }, { Type: api.NodeReady, Status: api.ConditionTrue, @@ -489,7 +513,6 @@ func TestUpdateExistingNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T) LastTransitionTime: unversioned.NewTime(clock.Now()), }, { - Type: api.NodeOutOfDisk, Status: api.ConditionTrue, Reason: "KubeletOutOfDisk", @@ -509,8 +532,13 @@ func TestUpdateExistingNodeOutOfDiskStatusWithTransitionFrequency(t *testing.T) NumCores: 2, MemoryCapacity: 1024, } + fsInfo := cadvisorapiv2.FsInfo{ + Device: "123", + } mockCadvisor.On("Start").Return(nil) mockCadvisor.On("MachineInfo").Return(machineInfo, 
nil) + mockCadvisor.On("ImagesFsInfo").Return(fsInfo, nil) + mockCadvisor.On("RootFsInfo").Return(fsInfo, nil) versionInfo := &cadvisorapi.VersionInfo{ KernelVersion: "3.16.0-0.bpo.4-amd64", ContainerOsVersion: "Debian GNU/Linux 7 (wheezy)", @@ -671,6 +699,14 @@ func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) { LastHeartbeatTime: unversioned.Time{}, LastTransitionTime: unversioned.Time{}, }, + { + Type: api.NodeDiskPressure, + Status: api.ConditionFalse, + Reason: "KubeletHasNoDiskPressure", + Message: fmt.Sprintf("kubelet has no disk pressure"), + LastHeartbeatTime: unversioned.Time{}, + LastTransitionTime: unversioned.Time{}, + }, {}, //placeholder }, NodeInfo: api.NodeSystemInfo{ From 9604b47c1348af60d173d06b8695f2390e14f439 Mon Sep 17 00:00:00 2001 From: derekwaynecarr Date: Fri, 22 Jul 2016 15:23:34 -0400 Subject: [PATCH 2/4] Scheduler does not place pods on nodes that have disk pressure --- docs/devel/scheduler_algorithm.md | 1 + .../scheduler/algorithm/predicates/error.go | 1 + .../algorithm/predicates/predicates.go | 18 +++++ .../algorithm/predicates/predicates_test.go | 69 +++++++++++++++++++ .../algorithmprovider/defaults/defaults.go | 3 + 5 files changed, 92 insertions(+) diff --git a/docs/devel/scheduler_algorithm.md b/docs/devel/scheduler_algorithm.md index 2aaa84df1d1..ab9be4a8edf 100755 --- a/docs/devel/scheduler_algorithm.md +++ b/docs/devel/scheduler_algorithm.md @@ -49,6 +49,7 @@ The purpose of filtering the nodes is to filter out the nodes that do not meet c - `MaxEBSVolumeCount`: Ensure that the number of attached ElasticBlockStore volumes does not exceed a maximum value (by default, 39, since Amazon recommends a maximum of 40 with one of those 40 reserved for the root volume -- see [Amazon's documentation](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/volume_limits.html#linux-specific-volume-limits)). The maximum value can be controlled by setting the `KUBE_MAX_PD_VOLS` environment variable. - `MaxGCEPDVolumeCount`: Ensure that the number of attached GCE PersistentDisk volumes does not exceed a maximum value (by default, 16, which is the maximum GCE allows -- see [GCE's documentation](https://cloud.google.com/compute/docs/disks/persistent-disks#limits_for_predefined_machine_types)). The maximum value can be controlled by setting the `KUBE_MAX_PD_VOLS` environment variable. - `CheckNodeMemoryPressure`: Check if a pod can be scheduled on a node reporting memory pressure condition. Currently, no ``BestEffort`` should be placed on a node under memory pressure as it gets automatically evicted by kubelet. +- `CheckNodeDiskPressure`: Check if a pod can be scheduled on a node reporting disk pressure condition. Currently, no pods should be placed on a node under disk pressure as they get automatically evicted by kubelet. The details of the above predicates can be found in [plugin/pkg/scheduler/algorithm/predicates/predicates.go](http://releases.k8s.io/HEAD/plugin/pkg/scheduler/algorithm/predicates/predicates.go). All predicates mentioned above can be used in combination to perform a sophisticated filtering policy. Kubernetes uses some, but not all, of these predicates by default. You can see which ones are used by default in [plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go](http://releases.k8s.io/HEAD/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go). 
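The kubelet admission check and the scheduler predicate added below both reduce to the same scan over node status conditions. A hedged sketch of that shared pattern (nodeHasCondition is a hypothetical helper, not part of this patch; api.Node and api.NodeDiskPressure are the types introduced above):

// nodeHasCondition reports whether the node currently reports the given
// condition with status True; it mirrors the loop inside
// CheckNodeDiskPressurePredicate below.
func nodeHasCondition(node *api.Node, want api.NodeConditionType) bool {
	for _, cond := range node.Status.Conditions {
		if cond.Type == want && cond.Status == api.ConditionTrue {
			return true
		}
	}
	return false
}

Usage would look like nodeHasCondition(node, api.NodeDiskPressure) on the scheduler side; the eviction manager keeps the equivalent state in m.nodeConditions and checks it with hasNodeCondition.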
diff --git a/plugin/pkg/scheduler/algorithm/predicates/error.go b/plugin/pkg/scheduler/algorithm/predicates/error.go index 72e86a1ba96..cd9d8b53823 100644 --- a/plugin/pkg/scheduler/algorithm/predicates/error.go +++ b/plugin/pkg/scheduler/algorithm/predicates/error.go @@ -39,6 +39,7 @@ var ( ErrServiceAffinityViolated = newPredicateFailureError("CheckServiceAffinity") ErrMaxVolumeCountExceeded = newPredicateFailureError("MaxVolumeCount") ErrNodeUnderMemoryPressure = newPredicateFailureError("NodeUnderMemoryPressure") + ErrNodeUnderDiskPressure = newPredicateFailureError("NodeUnderDiskPressure") // ErrFakePredicate is used for test only. The fake predicates returning false also returns error // as ErrFakePredicate. ErrFakePredicate = newPredicateFailureError("FakePredicateError") diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates.go b/plugin/pkg/scheduler/algorithm/predicates/predicates.go index 71b96c228f3..34b42e2bfe6 100644 --- a/plugin/pkg/scheduler/algorithm/predicates/predicates.go +++ b/plugin/pkg/scheduler/algorithm/predicates/predicates.go @@ -1041,3 +1041,21 @@ func CheckNodeMemoryPressurePredicate(pod *api.Pod, meta interface{}, nodeInfo * return true, nil } + +// CheckNodeDiskPressurePredicate checks if a pod can be scheduled on a node +// reporting disk pressure condition. +func CheckNodeDiskPressurePredicate(pod *api.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, error) { + node := nodeInfo.Node() + if node == nil { + return false, fmt.Errorf("node not found") + } + + // is node under pressure? + for _, cond := range node.Status.Conditions { + if cond.Type == api.NodeDiskPressure && cond.Status == api.ConditionTrue { + return false, ErrNodeUnderDiskPressure + } + } + + return true, nil +} diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go b/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go index 48295b01da1..b2104996ade 100755 --- a/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go +++ b/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go @@ -2906,3 +2906,72 @@ func TestPodSchedulesOnNodeWithMemoryPressureCondition(t *testing.T) { } } } + +func TestPodSchedulesOnNodeWithDiskPressureCondition(t *testing.T) { + pod := &api.Pod{ + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: "container", + Image: "image", + ImagePullPolicy: "Always", + }, + }, + }, + } + + // specify a node with no disk pressure condition on + noPressureNode := &api.Node{ + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: "Ready", + Status: "True", + }, + }, + }, + } + + // specify a node with pressure condition on + pressureNode := &api.Node{ + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: "DiskPressure", + Status: "True", + }, + }, + }, + } + + tests := []struct { + pod *api.Pod + nodeInfo *schedulercache.NodeInfo + fits bool + name string + }{ + { + pod: pod, + nodeInfo: makeEmptyNodeInfo(noPressureNode), + fits: true, + name: "pod schedulable on node without pressure condition on", + }, + { + pod: pod, + nodeInfo: makeEmptyNodeInfo(pressureNode), + fits: false, + name: "pod not schedulable on node with pressure condition on", + }, + } + + for _, test := range tests { + fits, err := CheckNodeDiskPressurePredicate(test.pod, test.nodeInfo) + if fits != test.fits { + t.Errorf("%s: expected %v got %v", test.name, test.fits, fits) + } + + if err != nil && err != ErrNodeUnderDiskPressure { + t.Errorf("%s: unexpected error: %v", test.name, err) + } + } +} diff --git 
a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go index 151adf070ff..f55f4827080 100644 --- a/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go +++ b/plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go @@ -151,6 +151,9 @@ func defaultPredicates() sets.String { // Fit is determined by node memory pressure condition. factory.RegisterFitPredicate("CheckNodeMemoryPressure", predicates.CheckNodeMemoryPressurePredicate), + + // Fit is determined by node disk pressure condition. + factory.RegisterFitPredicate("CheckNodeDiskPressure", predicates.CheckNodeDiskPressurePredicate), ) } From 0de1e62b304cc90b78b4a754612c918e68b7da4f Mon Sep 17 00:00:00 2001 From: derekwaynecarr Date: Mon, 25 Jul 2016 22:40:22 -0400 Subject: [PATCH 3/4] modify fsStats to fsStatsType to avoid confusion with cadvisor types --- pkg/kubelet/eviction/helpers.go | 26 ++++++++++--------- pkg/kubelet/eviction/helpers_test.go | 4 +-- pkg/kubelet/eviction/types.go | 10 +++---- .../algorithm/predicates/predicates_test.go | 2 +- 4 files changed, 22 insertions(+), 20 deletions(-) diff --git a/pkg/kubelet/eviction/helpers.go b/pkg/kubelet/eviction/helpers.go index d38d31c4410..ea6911240d2 100644 --- a/pkg/kubelet/eviction/helpers.go +++ b/pkg/kubelet/eviction/helpers.go @@ -255,6 +255,7 @@ func memoryUsage(memStats *statsapi.MemoryStats) *resource.Quantity { } // localVolumeNames returns the set of volumes for the pod that are local +// TODO: summary API should report what volumes consume local storage rather than hard-coding here. func localVolumeNames(pod *api.Pod) []string { result := []string{} for _, volume := range pod.Spec.Volumes { @@ -269,22 +270,23 @@ func localVolumeNames(pod *api.Pod) []string { } // podDiskUsage aggregates pod disk usage for the specified stats to measure. -func podDiskUsage(podStats statsapi.PodStats, pod *api.Pod, statsToMeasure []fsStats) (api.ResourceList, error) { +func podDiskUsage(podStats statsapi.PodStats, pod *api.Pod, statsToMeasure []fsStatsType) (api.ResourceList, error) { disk := resource.Quantity{Format: resource.BinarySI} for _, container := range podStats.Containers { - if hasFsStats(statsToMeasure, fsStatsRoot) { + if hasFsStatsType(statsToMeasure, fsStatsRoot) { disk.Add(*diskUsage(container.Rootfs)) } - if hasFsStats(statsToMeasure, fsStatsLogs) { + if hasFsStatsType(statsToMeasure, fsStatsLogs) { disk.Add(*diskUsage(container.Logs)) } } - if hasFsStats(statsToMeasure, fsStatsLocalVolumeSource) { + if hasFsStatsType(statsToMeasure, fsStatsLocalVolumeSource) { volumeNames := localVolumeNames(pod) for _, volumeName := range volumeNames { for _, volumeStats := range podStats.VolumeStats { if volumeStats.Name == volumeName { disk.Add(*diskUsage(&volumeStats.FsStats)) + break } } } @@ -451,7 +453,7 @@ func memory(stats statsFunc) cmpFunc { } // disk compares pods by largest consumer of disk relative to request. -func disk(stats statsFunc, fsStatsToMeasure []fsStats) cmpFunc { +func disk(stats statsFunc, fsStatsToMeasure []fsStatsType) cmpFunc { return func(p1, p2 *api.Pod) int { p1Stats, found := stats(p1) // if we have no usage stats for p1, we want p2 first @@ -489,7 +491,7 @@ func rankMemoryPressure(pods []*api.Pod, stats statsFunc) { } // rankDiskPressureFunc returns a rankFunc that measures the specified fs stats. 
-func rankDiskPressureFunc(fsStatsToMeasure []fsStats) rankFunc { +func rankDiskPressureFunc(fsStatsToMeasure []fsStatsType) rankFunc { return func(pods []*api.Pod, stats statsFunc) { orderedBy(qosComparator, disk(stats, fsStatsToMeasure)).Sort(pods) } @@ -622,8 +624,8 @@ func nodeConditionsObservedSince(observedAt nodeConditionsObservedAt, period tim return results } -// hgasFsStats returns true if the fsStat is in the input list -func hasFsStats(inputs []fsStats, item fsStats) bool { +// hasFsStatsType returns true if the fsStat is in the input list +func hasFsStatsType(inputs []fsStatsType, item fsStatsType) bool { for _, input := range inputs { if input == item { return true @@ -676,7 +678,7 @@ func isSoftEviction(thresholds []Threshold, starvedResource api.ResourceName) bo return true } -// buildresourceToRankFunc returns ranking functions associated with resources +// buildResourceToRankFunc returns ranking functions associated with resources func buildResourceToRankFunc(withImageFs bool) map[api.ResourceName]rankFunc { resourceToRankFunc := map[api.ResourceName]rankFunc{ api.ResourceMemory: rankMemoryPressure, @@ -684,12 +686,12 @@ func buildResourceToRankFunc(withImageFs bool) map[api.ResourceName]rankFunc { // usage of an imagefs is optional if withImageFs { // with an imagefs, nodefs pod rank func for eviction only includes logs and local volumes - resourceToRankFunc[resourceNodeFs] = rankDiskPressureFunc([]fsStats{fsStatsLogs, fsStatsLocalVolumeSource}) + resourceToRankFunc[resourceNodeFs] = rankDiskPressureFunc([]fsStatsType{fsStatsLogs, fsStatsLocalVolumeSource}) // with an imagefs, imagefs pod rank func for eviction only includes rootfs - resourceToRankFunc[resourceImageFs] = rankDiskPressureFunc([]fsStats{fsStatsRoot}) + resourceToRankFunc[resourceImageFs] = rankDiskPressureFunc([]fsStatsType{fsStatsRoot}) } else { // without an imagefs, nodefs pod rank func for eviction looks at all fs stats - resourceToRankFunc[resourceNodeFs] = rankDiskPressureFunc([]fsStats{fsStatsRoot, fsStatsLogs, fsStatsLocalVolumeSource}) + resourceToRankFunc[resourceNodeFs] = rankDiskPressureFunc([]fsStatsType{fsStatsRoot, fsStatsLogs, fsStatsLocalVolumeSource}) } return resourceToRankFunc } diff --git a/pkg/kubelet/eviction/helpers_test.go b/pkg/kubelet/eviction/helpers_test.go index 8e17d5e52c7..93713ddb7f1 100644 --- a/pkg/kubelet/eviction/helpers_test.go +++ b/pkg/kubelet/eviction/helpers_test.go @@ -311,7 +311,7 @@ func TestOrderedByDisk(t *testing.T) { return result, found } pods := []*api.Pod{pod1, pod2, pod3, pod4, pod5, pod6} - orderedBy(disk(statsFn, []fsStats{fsStatsRoot, fsStatsLogs, fsStatsLocalVolumeSource})).Sort(pods) + orderedBy(disk(statsFn, []fsStatsType{fsStatsRoot, fsStatsLogs, fsStatsLocalVolumeSource})).Sort(pods) expected := []*api.Pod{pod6, pod5, pod4, pod3, pod2, pod1} for i := range expected { if pods[i] != expected[i] { @@ -377,7 +377,7 @@ func TestOrderedByQoSDisk(t *testing.T) { return result, found } pods := []*api.Pod{pod1, pod2, pod3, pod4, pod5, pod6} - orderedBy(qosComparator, disk(statsFn, []fsStats{fsStatsRoot, fsStatsLogs, fsStatsLocalVolumeSource})).Sort(pods) + orderedBy(qosComparator, disk(statsFn, []fsStatsType{fsStatsRoot, fsStatsLogs, fsStatsLocalVolumeSource})).Sort(pods) expected := []*api.Pod{pod2, pod1, pod4, pod3, pod6, pod5} for i := range expected { if pods[i] != expected[i] { diff --git a/pkg/kubelet/eviction/types.go b/pkg/kubelet/eviction/types.go index 295cdefd812..053c47809da 100644 --- a/pkg/kubelet/eviction/types.go +++ 
@@ -36,16 +36,16 @@ const (
 	SignalImageFsAvailable Signal = "imagefs.available"
 )
 
-// fsStats defines the types of filesystem stats to collect.
-type fsStats string
+// fsStatsType defines the types of filesystem stats to collect.
+type fsStatsType string
 
 const (
 	// fsStatsLocalVolumeSource identifies stats for pod local volume sources.
-	fsStatsLocalVolumeSource fsStats = "localVolumeSource"
+	fsStatsLocalVolumeSource fsStatsType = "localVolumeSource"
 	// fsStatsLogs identifies stats for pod logs.
-	fsStatsLogs fsStats = "logs"
+	fsStatsLogs fsStatsType = "logs"
 	// fsStatsRoot identifies stats for pod container writable layers.
-	fsStatsRoot fsStats = "root"
+	fsStatsRoot fsStatsType = "root"
 )
 
 // ThresholdOperator is the operator used to express a Threshold.
diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go b/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go
index b2104996ade..91959053aa1 100755
--- a/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go
+++ b/plugin/pkg/scheduler/algorithm/predicates/predicates_test.go
@@ -2965,7 +2965,7 @@ func TestPodSchedulesOnNodeWithDiskPressureCondition(t *testing.T) {
 	}
 
 	for _, test := range tests {
-		fits, err := CheckNodeDiskPressurePredicate(test.pod, test.nodeInfo)
+		fits, err := CheckNodeDiskPressurePredicate(test.pod, nil, test.nodeInfo)
 		if fits != test.fits {
 			t.Errorf("%s: expected %v got %v", test.name, test.fits, fits)
 		}

From d37710f87b7a2497d918d8c5e51323e352dcce98 Mon Sep 17 00:00:00 2001
From: derekwaynecarr
Date: Thu, 28 Jul 2016 13:02:01 -0400
Subject: [PATCH 4/4] Allow cAdvisor to collect fs stats before building
 resource ranking funcs

---
 pkg/kubelet/eviction/eviction_manager.go      | 22 ++++++----
 pkg/kubelet/eviction/eviction_manager_test.go | 44 ++++++++++++-------
 2 files changed, 40 insertions(+), 26 deletions(-)

diff --git a/pkg/kubelet/eviction/eviction_manager.go b/pkg/kubelet/eviction/eviction_manager.go
index 3f2a1913cfc..adb4ee85bd4 100644
--- a/pkg/kubelet/eviction/eviction_manager.go
+++ b/pkg/kubelet/eviction/eviction_manager.go
@@ -109,15 +109,8 @@ func (m *managerImpl) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAd
 
 // Start starts the control loop to observe and respond to low compute resources.
 func (m *managerImpl) Start(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc, monitoringInterval time.Duration) error {
-	// build the ranking functions now that we can know if the imagefs is dedicated or not.
-	hasDedicatedImageFs, err := diskInfoProvider.HasDedicatedImageFs()
-	if err != nil {
-		return err
-	}
-	m.resourceToRankFunc = buildResourceToRankFunc(hasDedicatedImageFs)
-
 	// start the eviction manager monitoring
-	go wait.Until(func() { m.synchronize(podFunc) }, monitoringInterval, wait.NeverStop)
+	go wait.Until(func() { m.synchronize(diskInfoProvider, podFunc) }, monitoringInterval, wait.NeverStop)
 	return nil
 }
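The net effect of this commit is a lazy-initialization pattern: rather than failing Start when cAdvisor has not yet surveyed the filesystems, the manager defers building its ranking functions until a pass of synchronize succeeds. A compressed, runnable sketch of the idiom, with hypothetical names (manager, buildRankFuncs, notReady) standing in for the kubelet's actual types:

package main

import (
	"errors"
	"fmt"
)

// DiskInfoProvider may error until the underlying stats provider
// (cAdvisor, in the kubelet) completes its initial housekeeping.
type DiskInfoProvider interface {
	HasDedicatedImageFs() (bool, error)
}

type manager struct {
	rankFuncs map[string]bool // stand-in for resourceToRankFunc
}

// synchronize lazily builds the ranking functions; on error it simply
// returns and retries on the next monitoring interval.
func (m *manager) synchronize(p DiskInfoProvider) {
	if len(m.rankFuncs) == 0 {
		hasImageFs, err := p.HasDedicatedImageFs()
		if err != nil {
			return // not ready yet; try again next pass
		}
		m.rankFuncs = buildRankFuncs(hasImageFs)
	}
	// ... observe signals, compare against thresholds, evict ...
}

func buildRankFuncs(withImageFs bool) map[string]bool {
	return map[string]bool{"memory": true, "nodefs": true, "imagefs": withImageFs}
}

type notReady struct{}

func (notReady) HasDedicatedImageFs() (bool, error) {
	return false, errors.New("housekeeping incomplete")
}

func main() {
	m := &manager{}
	m.synchronize(notReady{})
	fmt.Println(len(m.rankFuncs)) // 0: construction deferred until the provider is ready
}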
@@ -136,13 +129,24 @@ func (m *managerImpl) IsUnderDiskPressure() {
 }
 
 // synchronize is the main control loop that enforces eviction thresholds.
-func (m *managerImpl) synchronize(podFunc ActivePodsFunc) {
+func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc ActivePodsFunc) {
 	// if we have nothing to do, just return
 	thresholds := m.config.Thresholds
 	if len(thresholds) == 0 {
 		return
 	}
 
+	// build the ranking functions (if not yet known)
+	// TODO: have a function in cadvisor that lets us know if global housekeeping has completed
+	if len(m.resourceToRankFunc) == 0 {
+		// this may error if cadvisor has yet to complete housekeeping, so we will just try again in next pass.
+		hasDedicatedImageFs, err := diskInfoProvider.HasDedicatedImageFs()
+		if err != nil {
+			return
+		}
+		m.resourceToRankFunc = buildResourceToRankFunc(hasDedicatedImageFs)
+	}
+
 	// make observations and get a function to derive pod usage stats relative to those observations.
 	observations, statsFunc, err := makeSignalObservations(m.summaryProvider)
 	if err != nil {
diff --git a/pkg/kubelet/eviction/eviction_manager_test.go b/pkg/kubelet/eviction/eviction_manager_test.go
index 8cfa546d7c3..0d8c365838e 100644
--- a/pkg/kubelet/eviction/eviction_manager_test.go
+++ b/pkg/kubelet/eviction/eviction_manager_test.go
@@ -44,6 +44,16 @@ func (m *mockPodKiller) killPodNow(pod *api.Pod, status api.PodStatus, gracePeri
 	return nil
 }
 
+// mockDiskInfoProvider is used to simulate a disk info provider in tests.
+type mockDiskInfoProvider struct {
+	dedicatedImageFs bool
+}
+
+// HasDedicatedImageFs returns the mocked value
+func (m *mockDiskInfoProvider) HasDedicatedImageFs() (bool, error) {
+	return m.dedicatedImageFs, nil
+}
+
 // TestMemoryPressure
 func TestMemoryPressure(t *testing.T) {
 	podMaker := func(name string, requests api.ResourceList, limits api.ResourceList, memoryWorkingSet string) (*api.Pod, statsapi.PodStats) {
@@ -95,6 +105,7 @@ func TestMemoryPressure(t *testing.T) {
 
 	fakeClock := clock.NewFakeClock(time.Now())
 	podKiller := &mockPodKiller{}
+	diskInfoProvider := &mockDiskInfoProvider{dedicatedImageFs: false}
 	nodeRef := &api.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
 
 	config := Config{
@@ -124,7 +135,6 @@ func TestMemoryPressure(t *testing.T) {
 		nodeRef:                      nodeRef,
 		nodeConditionsLastObservedAt: nodeConditionsObservedAt{},
 		thresholdsFirstObservedAt:    thresholdsObservedAt{},
-		resourceToRankFunc:           buildResourceToRankFunc(false),
 	}
 
 	// create a best effort pod to test admission
@@ -132,7 +142,7 @@ func TestMemoryPressure(t *testing.T) {
 	burstablePodToAdmit, _ := podMaker("burst-admit", newResourceList("100m", "100Mi"), newResourceList("200m", "200Mi"), "0Gi")
 
 	// synchronize
-	manager.synchronize(activePodsFunc)
+	manager.synchronize(diskInfoProvider, activePodsFunc)
 
 	// we should not have memory pressure
 	if manager.IsUnderMemoryPressure() {
@@ -150,7 +160,7 @@ func TestMemoryPressure(t *testing.T) {
 	// induce soft threshold
 	fakeClock.Step(1 * time.Minute)
 	summaryProvider.result = summaryStatsMaker("1500Mi", podStats)
-	manager.synchronize(activePodsFunc)
+	manager.synchronize(diskInfoProvider, activePodsFunc)
 
 	// we should have memory pressure
 	if !manager.IsUnderMemoryPressure() {
@@ -165,7 +175,7 @@ func TestMemoryPressure(t *testing.T) {
 	// step forward in time past the grace period
 	fakeClock.Step(3 * time.Minute)
 	summaryProvider.result = summaryStatsMaker("1500Mi", podStats)
-	manager.synchronize(activePodsFunc)
+	manager.synchronize(diskInfoProvider, activePodsFunc)
 
 	// we should have memory pressure
 	if !manager.IsUnderMemoryPressure() {
@@ -190,7 +200,7 @@ func TestMemoryPressure(t *testing.T) {
 	// remove memory pressure
 	fakeClock.Step(20 * time.Minute)
 	summaryProvider.result = summaryStatsMaker("3Gi", podStats)
-	manager.synchronize(activePodsFunc)
+	manager.synchronize(diskInfoProvider, activePodsFunc)
 
 	// we should not have memory pressure
 	if manager.IsUnderMemoryPressure() {
@@ -200,7 +210,7 @@ func TestMemoryPressure(t *testing.T) {
 	// induce memory pressure!
 	fakeClock.Step(1 * time.Minute)
 	summaryProvider.result = summaryStatsMaker("500Mi", podStats)
-	manager.synchronize(activePodsFunc)
+	manager.synchronize(diskInfoProvider, activePodsFunc)
 
 	// we should have memory pressure
 	if !manager.IsUnderMemoryPressure() {
@@ -228,7 +238,7 @@ func TestMemoryPressure(t *testing.T) {
 	fakeClock.Step(1 * time.Minute)
 	summaryProvider.result = summaryStatsMaker("2Gi", podStats)
 	podKiller.pod = nil // reset state
-	manager.synchronize(activePodsFunc)
+	manager.synchronize(diskInfoProvider, activePodsFunc)
 
 	// we should have memory pressure (because transition period not yet met)
 	if !manager.IsUnderMemoryPressure() {
@@ -252,7 +262,7 @@ func TestMemoryPressure(t *testing.T) {
 	fakeClock.Step(5 * time.Minute)
 	summaryProvider.result = summaryStatsMaker("2Gi", podStats)
 	podKiller.pod = nil // reset state
-	manager.synchronize(activePodsFunc)
+	manager.synchronize(diskInfoProvider, activePodsFunc)
 
 	// we should not have memory pressure (because transition period met)
 	if manager.IsUnderMemoryPressure() {
@@ -338,8 +348,9 @@ func TestDiskPressureNodeFs(t *testing.T) {
 		return pods
 	}
 
-	fakeClock := util.NewFakeClock(time.Now())
+	fakeClock := clock.NewFakeClock(time.Now())
 	podKiller := &mockPodKiller{}
+	diskInfoProvider := &mockDiskInfoProvider{dedicatedImageFs: false}
 	nodeRef := &api.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""}
 
 	config := Config{
@@ -369,14 +380,13 @@
 		nodeRef:                      nodeRef,
 		nodeConditionsLastObservedAt: nodeConditionsObservedAt{},
 		thresholdsFirstObservedAt:    thresholdsObservedAt{},
-		resourceToRankFunc:           buildResourceToRankFunc(false),
 	}
 
 	// create a best effort pod to test admission
 	podToAdmit, _ := podMaker("pod-to-admit", newResourceList("", ""), newResourceList("", ""), "0Gi", "0Gi", "0Gi")
 
 	// synchronize
-	manager.synchronize(activePodsFunc)
+	manager.synchronize(diskInfoProvider, activePodsFunc)
 
 	// we should not have disk pressure
 	if manager.IsUnderDiskPressure() {
@@ -391,7 +401,7 @@ func TestDiskPressureNodeFs(t *testing.T) {
 	// induce soft threshold
 	fakeClock.Step(1 * time.Minute)
 	summaryProvider.result = summaryStatsMaker("1.5Gi", "200Gi", podStats)
-	manager.synchronize(activePodsFunc)
+	manager.synchronize(diskInfoProvider, activePodsFunc)
 
 	// we should have disk pressure
 	if !manager.IsUnderDiskPressure() {
@@ -406,7 +416,7 @@ func TestDiskPressureNodeFs(t *testing.T) {
 	// step forward in time past the grace period
 	fakeClock.Step(3 * time.Minute)
 	summaryProvider.result = summaryStatsMaker("1.5Gi", "200Gi", podStats)
-	manager.synchronize(activePodsFunc)
+	manager.synchronize(diskInfoProvider, activePodsFunc)
 
 	// we should have disk pressure
 	if !manager.IsUnderDiskPressure() {
@@ -431,7 +441,7 @@ func TestDiskPressureNodeFs(t *testing.T) {
 	// remove disk pressure
 	fakeClock.Step(20 * time.Minute)
 	summaryProvider.result = summaryStatsMaker("16Gi", "200Gi", podStats)
-	manager.synchronize(activePodsFunc)
+	manager.synchronize(diskInfoProvider, activePodsFunc)
 
 	// we should not have disk pressure
 	if manager.IsUnderDiskPressure() {
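The choreography in these hunks (step the fake clock, refresh the summary stats, call synchronize) is probing soft-eviction timing: a soft threshold only triggers eviction once it has been continuously observed for its grace period. A toy illustration of that check, assuming a simplified per-threshold firstObservedAt timestamp rather than the manager's actual thresholdsFirstObservedAt bookkeeping:

package main

import (
	"fmt"
	"time"
)

// gracePeriodMet reports whether a soft threshold, first observed at
// firstObservedAt, has been met long enough at now to act on.
func gracePeriodMet(firstObservedAt, now time.Time, grace time.Duration) bool {
	return now.Sub(firstObservedAt) >= grace
}

func main() {
	start := time.Now()
	grace := 2 * time.Minute
	fmt.Println(gracePeriodMet(start, start.Add(1*time.Minute), grace)) // false: observed, but still inside the grace period
	fmt.Println(gracePeriodMet(start, start.Add(3*time.Minute), grace)) // true: grace period elapsed, eviction may proceed
}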
@@ -441,7 +451,7 @@ func TestDiskPressureNodeFs(t *testing.T) {
 	// induce disk pressure!
 	fakeClock.Step(1 * time.Minute)
 	summaryProvider.result = summaryStatsMaker("500Mi", "200Gi", podStats)
-	manager.synchronize(activePodsFunc)
+	manager.synchronize(diskInfoProvider, activePodsFunc)
 
 	// we should have disk pressure
 	if !manager.IsUnderDiskPressure() {
@@ -466,7 +476,7 @@ func TestDiskPressureNodeFs(t *testing.T) {
 	fakeClock.Step(1 * time.Minute)
 	summaryProvider.result = summaryStatsMaker("16Gi", "200Gi", podStats)
 	podKiller.pod = nil // reset state
-	manager.synchronize(activePodsFunc)
+	manager.synchronize(diskInfoProvider, activePodsFunc)
 
 	// we should have disk pressure (because transition period not yet met)
 	if !manager.IsUnderDiskPressure() {
@@ -487,7 +497,7 @@ func TestDiskPressureNodeFs(t *testing.T) {
 	fakeClock.Step(5 * time.Minute)
 	summaryProvider.result = summaryStatsMaker("16Gi", "200Gi", podStats)
 	podKiller.pod = nil // reset state
-	manager.synchronize(activePodsFunc)
+	manager.synchronize(diskInfoProvider, activePodsFunc)
 
 	// we should not have disk pressure (because transition period met)
 	if manager.IsUnderDiskPressure() {
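The closing assertions exercise the other half of the timing logic: once set, a pressure condition is held until the node has been pressure-free for the configured transition period, which keeps the scheduler from flapping pods on and off the node. A hedged sketch of that hysteresis, using a hypothetical keepCondition helper rather than the manager's actual bookkeeping:

package main

import (
	"fmt"
	"time"
)

// keepCondition reports whether a node condition last observed at
// lastObservedAt should still be reported at now: it is cleared only
// after a full transition period without a fresh observation.
func keepCondition(lastObservedAt, now time.Time, transitionPeriod time.Duration) bool {
	return now.Sub(lastObservedAt) < transitionPeriod
}

func main() {
	last := time.Now()
	period := 5 * time.Minute
	fmt.Println(keepCondition(last, last.Add(1*time.Minute), period)) // true: condition held (transition period not yet met)
	fmt.Println(keepCondition(last, last.Add(6*time.Minute), period)) // false: condition cleared (transition period met)
}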