kubelet: eviction: avoid duplicate action on stale stats

This commit is contained in:
Seth Jennings 2016-09-14 15:46:20 -05:00
parent 4077c864bf
commit 98e97a475a
4 changed files with 121 additions and 0 deletions

View File

@ -62,6 +62,8 @@ type managerImpl struct {
resourceToRankFunc map[api.ResourceName]rankFunc
// resourceToNodeReclaimFuncs maps a resource to an ordered list of functions that know how to reclaim that resource.
resourceToNodeReclaimFuncs map[api.ResourceName]nodeReclaimFuncs
// last observations from synchronize
lastObservations signalObservations
}
// ensure it implements the required interface
@ -182,6 +184,9 @@ func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc Act
thresholds = mergeThresholds(thresholds, thresholdsNotYetResolved)
}
// determine the set of thresholds whose stats have been updated since the last sync
thresholds = thresholdsUpdatedStats(thresholds, observations, m.lastObservations)
// track when a threshold was first observed
thresholdsFirstObservedAt := thresholdsFirstObservedAt(thresholds, m.thresholdsFirstObservedAt, now)
@ -203,6 +208,7 @@ func (m *managerImpl) synchronize(diskInfoProvider DiskInfoProvider, podFunc Act
m.thresholdsFirstObservedAt = thresholdsFirstObservedAt
m.nodeConditionsLastObservedAt = nodeConditionsLastObservedAt
m.thresholdsMet = thresholds
m.lastObservations = observations
m.Unlock()
// determine the set of resources under starvation

View File

@ -600,6 +600,7 @@ func makeSignalObservations(summaryProvider stats.SummaryProvider) (signalObserv
result[SignalMemoryAvailable] = signalObservation{
available: resource.NewQuantity(int64(*memory.AvailableBytes), resource.BinarySI),
capacity: resource.NewQuantity(int64(*memory.AvailableBytes+*memory.WorkingSetBytes), resource.BinarySI),
time: memory.Time,
}
}
if nodeFs := summary.Node.Fs; nodeFs != nil {
@ -607,12 +608,14 @@ func makeSignalObservations(summaryProvider stats.SummaryProvider) (signalObserv
result[SignalNodeFsAvailable] = signalObservation{
available: resource.NewQuantity(int64(*nodeFs.AvailableBytes), resource.BinarySI),
capacity: resource.NewQuantity(int64(*nodeFs.CapacityBytes), resource.BinarySI),
// TODO: add timestamp to stat (see memory stat)
}
}
if nodeFs.InodesFree != nil && nodeFs.Inodes != nil {
result[SignalNodeFsInodesFree] = signalObservation{
available: resource.NewQuantity(int64(*nodeFs.InodesFree), resource.BinarySI),
capacity: resource.NewQuantity(int64(*nodeFs.Inodes), resource.BinarySI),
// TODO: add timestamp to stat (see memory stat)
}
}
}
@ -622,11 +625,13 @@ func makeSignalObservations(summaryProvider stats.SummaryProvider) (signalObserv
result[SignalImageFsAvailable] = signalObservation{
available: resource.NewQuantity(int64(*imageFs.AvailableBytes), resource.BinarySI),
capacity: resource.NewQuantity(int64(*imageFs.CapacityBytes), resource.BinarySI),
// TODO: add timestamp to stat (see memory stat)
}
if imageFs.InodesFree != nil && imageFs.Inodes != nil {
result[SignalImageFsInodesFree] = signalObservation{
available: resource.NewQuantity(int64(*imageFs.InodesFree), resource.BinarySI),
capacity: resource.NewQuantity(int64(*imageFs.Inodes), resource.BinarySI),
// TODO: add timestamp to stat (see memory stat)
}
}
}
@ -664,6 +669,23 @@ func thresholdsMet(thresholds []Threshold, observations signalObservations, enfo
return results
}
func thresholdsUpdatedStats(thresholds []Threshold, observations, lastObservations signalObservations) []Threshold {
results := []Threshold{}
for i := range thresholds {
threshold := thresholds[i]
observed, found := observations[threshold.Signal]
if !found {
glog.Warningf("eviction manager: no observation found for eviction signal %v", threshold.Signal)
continue
}
last, found := lastObservations[threshold.Signal]
if !found || observed.time.IsZero() || observed.time.After(last.time.Time) {
results = append(results, threshold)
}
}
return results
}
// getThresholdQuantity returns the expected quantity value for a thresholdValue
func getThresholdQuantity(value ThresholdValue, capacity *resource.Quantity) *resource.Quantity {
if value.Quantity != nil {

View File

@ -829,6 +829,96 @@ func TestThresholdsMet(t *testing.T) {
}
}
func TestThresholdsUpdatedStats(t *testing.T) {
updatedThreshold := Threshold{
Signal: SignalMemoryAvailable,
}
locationUTC, err := time.LoadLocation("UTC")
if err != nil {
t.Error(err)
return
}
testCases := map[string]struct {
thresholds []Threshold
observations signalObservations
last signalObservations
result []Threshold
}{
"empty": {
thresholds: []Threshold{},
observations: signalObservations{},
last: signalObservations{},
result: []Threshold{},
},
"no-time": {
thresholds: []Threshold{updatedThreshold},
observations: signalObservations{
SignalMemoryAvailable: signalObservation{},
},
last: signalObservations{},
result: []Threshold{updatedThreshold},
},
"no-last-observation": {
thresholds: []Threshold{updatedThreshold},
observations: signalObservations{
SignalMemoryAvailable: signalObservation{
time: unversioned.Date(2016, 1, 1, 0, 0, 0, 0, locationUTC),
},
},
last: signalObservations{},
result: []Threshold{updatedThreshold},
},
"time-machine": {
thresholds: []Threshold{updatedThreshold},
observations: signalObservations{
SignalMemoryAvailable: signalObservation{
time: unversioned.Date(2016, 1, 1, 0, 0, 0, 0, locationUTC),
},
},
last: signalObservations{
SignalMemoryAvailable: signalObservation{
time: unversioned.Date(2016, 1, 1, 0, 1, 0, 0, locationUTC),
},
},
result: []Threshold{},
},
"same-observation": {
thresholds: []Threshold{updatedThreshold},
observations: signalObservations{
SignalMemoryAvailable: signalObservation{
time: unversioned.Date(2016, 1, 1, 0, 0, 0, 0, locationUTC),
},
},
last: signalObservations{
SignalMemoryAvailable: signalObservation{
time: unversioned.Date(2016, 1, 1, 0, 0, 0, 0, locationUTC),
},
},
result: []Threshold{},
},
"new-observation": {
thresholds: []Threshold{updatedThreshold},
observations: signalObservations{
SignalMemoryAvailable: signalObservation{
time: unversioned.Date(2016, 1, 1, 0, 1, 0, 0, locationUTC),
},
},
last: signalObservations{
SignalMemoryAvailable: signalObservation{
time: unversioned.Date(2016, 1, 1, 0, 0, 0, 0, locationUTC),
},
},
result: []Threshold{updatedThreshold},
},
}
for testName, testCase := range testCases {
actual := thresholdsUpdatedStats(testCase.thresholds, testCase.observations, testCase.last)
if !thresholdList(actual).Equal(thresholdList(testCase.result)) {
t.Errorf("Test case: %s, expected: %v, actual: %v", testName, testCase.result, actual)
}
}
}
func TestPercentageThresholdsMet(t *testing.T) {
specifiecThresholds := []Threshold{
{

View File

@ -21,6 +21,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/kubernetes/pkg/api/unversioned"
statsapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/stats"
)
@ -145,6 +146,8 @@ type signalObservation struct {
capacity *resource.Quantity
// The available resource
available *resource.Quantity
// Time at which the observation was taken
time unversioned.Time
}
// signalObservations maps a signal to an observed quantity