From 98e97a475afc5058bbc0a626efae09f35b930ee6 Mon Sep 17 00:00:00 2001 From: Seth Jennings Date: Wed, 14 Sep 2016 15:46:20 -0500 Subject: [PATCH] kubelet: eviction: avoid duplicate action on stale stats --- pkg/kubelet/eviction/eviction_manager.go | 6 ++ pkg/kubelet/eviction/helpers.go | 22 ++++++ pkg/kubelet/eviction/helpers_test.go | 90 ++++++++++++++++++++++++ pkg/kubelet/eviction/types.go | 3 + 4 files changed, 121 insertions(+) diff --git a/pkg/kubelet/eviction/eviction_manager.go b/pkg/kubelet/eviction/eviction_manager.go index e2b7d5742b0..678d6df8c7e 100644 --- a/pkg/kubelet/eviction/eviction_manager.go +++ b/pkg/kubelet/eviction/eviction_manager.go @@ -62,6 +62,8 @@ type managerImpl struct { resourceToRankFunc map[api.ResourceName]rankFunc // resourceToNodeReclaimFuncs maps a resource to an ordered list of functions that know how to reclaim that resource. resourceToNodeReclaimFuncs map[api.ResourceName]nodeReclaimFuncs + // last observations from synchronize + lastObservations signalObservations } // ensure it implements the required interface @@ -182,6 +184,9 @@ func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc Act thresholds = mergeThresholds(thresholds, thresholdsNotYetResolved) } + // determine the set of thresholds whose stats have been updated since the last sync + thresholds = thresholdsUpdatedStats(thresholds, observations, m.lastObservations) + // track when a threshold was first observed thresholdsFirstObservedAt := thresholdsFirstObservedAt(thresholds, m.thresholdsFirstObservedAt, now) @@ -203,6 +208,7 @@ func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc Act m.thresholdsFirstObservedAt = thresholdsFirstObservedAt m.nodeConditionsLastObservedAt = nodeConditionsLastObservedAt m.thresholdsMet = thresholds + m.lastObservations = observations m.Unlock() // determine the set of resources under starvation diff --git a/pkg/kubelet/eviction/helpers.go b/pkg/kubelet/eviction/helpers.go index 77bf5f3ebe0..4aa37a2b767 100644 --- a/pkg/kubelet/eviction/helpers.go +++ b/pkg/kubelet/eviction/helpers.go @@ -600,6 +600,7 @@ func makeSignalObservations(summaryProvider stats.SummaryProvider) (signalObserv result[SignalMemoryAvailable] = signalObservation{ available: resource.NewQuantity(int64(*memory.AvailableBytes), resource.BinarySI), capacity: resource.NewQuantity(int64(*memory.AvailableBytes+*memory.WorkingSetBytes), resource.BinarySI), + time: memory.Time, } } if nodeFs := summary.Node.Fs; nodeFs != nil { @@ -607,12 +608,14 @@ func makeSignalObservations(summaryProvider stats.SummaryProvider) (signalObserv result[SignalNodeFsAvailable] = signalObservation{ available: resource.NewQuantity(int64(*nodeFs.AvailableBytes), resource.BinarySI), capacity: resource.NewQuantity(int64(*nodeFs.CapacityBytes), resource.BinarySI), + // TODO: add timestamp to stat (see memory stat) } } if nodeFs.InodesFree != nil && nodeFs.Inodes != nil { result[SignalNodeFsInodesFree] = signalObservation{ available: resource.NewQuantity(int64(*nodeFs.InodesFree), resource.BinarySI), capacity: resource.NewQuantity(int64(*nodeFs.Inodes), resource.BinarySI), + // TODO: add timestamp to stat (see memory stat) } } } @@ -622,11 +625,13 @@ func makeSignalObservations(summaryProvider stats.SummaryProvider) (signalObserv result[SignalImageFsAvailable] = signalObservation{ available: resource.NewQuantity(int64(*imageFs.AvailableBytes), resource.BinarySI), capacity: resource.NewQuantity(int64(*imageFs.CapacityBytes), resource.BinarySI), + // TODO: add timestamp to stat (see memory stat) } if imageFs.InodesFree != nil && imageFs.Inodes != nil { result[SignalImageFsInodesFree] = signalObservation{ available: resource.NewQuantity(int64(*imageFs.InodesFree), resource.BinarySI), capacity: resource.NewQuantity(int64(*imageFs.Inodes), resource.BinarySI), + // TODO: add timestamp to stat (see memory stat) } } } @@ -664,6 +669,23 @@ func thresholdsMet(thresholds []Threshold, observations signalObservations, enfo return results } +func thresholdsUpdatedStats(thresholds []Threshold, observations, lastObservations signalObservations) []Threshold { + results := []Threshold{} + for i := range thresholds { + threshold := thresholds[i] + observed, found := observations[threshold.Signal] + if !found { + glog.Warningf("eviction manager: no observation found for eviction signal %v", threshold.Signal) + continue + } + last, found := lastObservations[threshold.Signal] + if !found || observed.time.IsZero() || observed.time.After(last.time.Time) { + results = append(results, threshold) + } + } + return results +} + // getThresholdQuantity returns the expected quantity value for a thresholdValue func getThresholdQuantity(value ThresholdValue, capacity *resource.Quantity) *resource.Quantity { if value.Quantity != nil { diff --git a/pkg/kubelet/eviction/helpers_test.go b/pkg/kubelet/eviction/helpers_test.go index df0aa0eb4bb..61240a65528 100644 --- a/pkg/kubelet/eviction/helpers_test.go +++ b/pkg/kubelet/eviction/helpers_test.go @@ -829,6 +829,96 @@ func TestThresholdsMet(t *testing.T) { } } +func TestThresholdsUpdatedStats(t *testing.T) { + updatedThreshold := Threshold{ + Signal: SignalMemoryAvailable, + } + locationUTC, err := time.LoadLocation("UTC") + if err != nil { + t.Error(err) + return + } + testCases := map[string]struct { + thresholds []Threshold + observations signalObservations + last signalObservations + result []Threshold + }{ + "empty": { + thresholds: []Threshold{}, + observations: signalObservations{}, + last: signalObservations{}, + result: []Threshold{}, + }, + "no-time": { + thresholds: []Threshold{updatedThreshold}, + observations: signalObservations{ + SignalMemoryAvailable: signalObservation{}, + }, + last: signalObservations{}, + result: []Threshold{updatedThreshold}, + }, + "no-last-observation": { + thresholds: []Threshold{updatedThreshold}, + observations: signalObservations{ + SignalMemoryAvailable: signalObservation{ + time: unversioned.Date(2016, 1, 1, 0, 0, 0, 0, locationUTC), + }, + }, + last: signalObservations{}, + result: []Threshold{updatedThreshold}, + }, + "time-machine": { + thresholds: []Threshold{updatedThreshold}, + observations: signalObservations{ + SignalMemoryAvailable: signalObservation{ + time: unversioned.Date(2016, 1, 1, 0, 0, 0, 0, locationUTC), + }, + }, + last: signalObservations{ + SignalMemoryAvailable: signalObservation{ + time: unversioned.Date(2016, 1, 1, 0, 1, 0, 0, locationUTC), + }, + }, + result: []Threshold{}, + }, + "same-observation": { + thresholds: []Threshold{updatedThreshold}, + observations: signalObservations{ + SignalMemoryAvailable: signalObservation{ + time: unversioned.Date(2016, 1, 1, 0, 0, 0, 0, locationUTC), + }, + }, + last: signalObservations{ + SignalMemoryAvailable: signalObservation{ + time: unversioned.Date(2016, 1, 1, 0, 0, 0, 0, locationUTC), + }, + }, + result: []Threshold{}, + }, + "new-observation": { + thresholds: []Threshold{updatedThreshold}, + observations: signalObservations{ + SignalMemoryAvailable: signalObservation{ + time: unversioned.Date(2016, 1, 1, 0, 1, 0, 0, locationUTC), + }, + }, + last: signalObservations{ + SignalMemoryAvailable: signalObservation{ + time: unversioned.Date(2016, 1, 1, 0, 0, 0, 0, locationUTC), + }, + }, + result: []Threshold{updatedThreshold}, + }, + } + for testName, testCase := range testCases { + actual := thresholdsUpdatedStats(testCase.thresholds, testCase.observations, testCase.last) + if !thresholdList(actual).Equal(thresholdList(testCase.result)) { + t.Errorf("Test case: %s, expected: %v, actual: %v", testName, testCase.result, actual) + } + } +} + func TestPercentageThresholdsMet(t *testing.T) { specifiecThresholds := []Threshold{ { diff --git a/pkg/kubelet/eviction/types.go b/pkg/kubelet/eviction/types.go index 3fd00dedeab..ecdbd0b924b 100644 --- a/pkg/kubelet/eviction/types.go +++ b/pkg/kubelet/eviction/types.go @@ -21,6 +21,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/resource" + "k8s.io/kubernetes/pkg/api/unversioned" statsapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/stats" ) @@ -145,6 +146,8 @@ type signalObservation struct { capacity *resource.Quantity // The available resource available *resource.Quantity + // Time at which the observation was taken + time unversioned.Time } // signalObservations maps a signal to an observed quantity