diff --git a/pkg/api/types.go b/pkg/api/types.go index 911ad13ad71..2d0e72caca9 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -1889,6 +1889,8 @@ const ( // NodeOutOfDisk means the kubelet will not accept new pods due to insufficient free disk // space on the node. NodeOutOfDisk NodeConditionType = "OutOfDisk" + // NodeMemoryPressure means the kubelet is under pressure due to insufficient available memory. + NodeMemoryPressure NodeConditionType = "MemoryPressure" ) type NodeCondition struct { diff --git a/pkg/api/v1/types.go b/pkg/api/v1/types.go index b1fa56f0578..7d9e6002fa6 100644 --- a/pkg/api/v1/types.go +++ b/pkg/api/v1/types.go @@ -2273,6 +2273,8 @@ const ( // NodeOutOfDisk means the kubelet will not accept new pods due to insufficient free disk // space on the node. NodeOutOfDisk NodeConditionType = "OutOfDisk" + // NodeMemoryPressure means the kubelet is under pressure due to insufficient available memory. + NodeMemoryPressure NodeConditionType = "MemoryPressure" ) // NodeCondition contains condition infromation for a node. diff --git a/pkg/kubelet/eviction/helpers.go b/pkg/kubelet/eviction/helpers.go index 4b689c967b6..e7077be1a16 100644 --- a/pkg/kubelet/eviction/helpers.go +++ b/pkg/kubelet/eviction/helpers.go @@ -18,18 +18,41 @@ package eviction import ( "fmt" + "sort" "strings" "time" + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/resource" + statsapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/stats" + qosutil "k8s.io/kubernetes/pkg/kubelet/qos/util" + "k8s.io/kubernetes/pkg/kubelet/server/stats" + "k8s.io/kubernetes/pkg/quota/evaluator/core" "k8s.io/kubernetes/pkg/util/sets" ) const ( unsupportedEvictionSignal = "unsupported eviction signal %v" + // the reason reported back in status. + reason = "Evicted" + // the message associated with the reason. + message = "The node was low on compute resources." + // disk, in bytes. internal to this module, used to account for local disk usage. + resourceDisk api.ResourceName = "disk" ) +// resourceToRankFunc maps a resource to ranking function for that resource. +var resourceToRankFunc = map[api.ResourceName]rankFunc{ + api.ResourceMemory: rankMemoryPressure, +} + +// signalToNodeCondition maps a signal to the node condition to report if threshold is met. +var signalToNodeCondition = map[Signal]api.NodeConditionType{ + SignalMemoryAvailable: api.NodeMemoryPressure, +} + // signalToResource maps a Signal to its associated Resource. var signalToResource = map[Signal]api.ResourceName{ SignalMemoryAvailable: api.ResourceMemory, @@ -162,3 +185,373 @@ func parseGracePeriods(expr string) (map[Signal]time.Duration, error) { } return results, nil } + +// diskUsage converts used bytes into a resource quantity. +func diskUsage(fsStats *statsapi.FsStats) *resource.Quantity { + if fsStats == nil || fsStats.UsedBytes == nil { + return &resource.Quantity{Format: resource.BinarySI} + } + usage := int64(*fsStats.UsedBytes) + return resource.NewQuantity(usage, resource.BinarySI) +} + +// memoryUsage converts working set into a resource quantity. +func memoryUsage(memStats *statsapi.MemoryStats) *resource.Quantity { + if memStats == nil || memStats.WorkingSetBytes == nil { + return &resource.Quantity{Format: resource.BinarySI} + } + usage := int64(*memStats.WorkingSetBytes) + return resource.NewQuantity(usage, resource.BinarySI) +} + +// podUsage aggregates usage of compute resources. +// it supports the following memory and disk. 
+func podUsage(podStats statsapi.PodStats) (api.ResourceList, error) {
+	disk := resource.Quantity{Format: resource.BinarySI}
+	memory := resource.Quantity{Format: resource.BinarySI}
+	for _, container := range podStats.Containers {
+		// disk usage (if known)
+		// TODO: need to handle volumes
+		for _, fsStats := range []*statsapi.FsStats{container.Rootfs, container.Logs} {
+			if err := disk.Add(*diskUsage(fsStats)); err != nil {
+				return nil, err
+			}
+		}
+		// memory usage (if known)
+		if err := memory.Add(*memoryUsage(container.Memory)); err != nil {
+			return nil, err
+		}
+	}
+	return api.ResourceList{
+		api.ResourceMemory: memory,
+		resourceDisk:       disk,
+	}, nil
+}
+
+// formatThreshold formats a threshold for logging.
+func formatThreshold(threshold Threshold) string {
+	return fmt.Sprintf("threshold(signal=%v, operator=%v, value=%v, gracePeriod=%v)", threshold.Signal, threshold.Operator, threshold.Value.String(), threshold.GracePeriod)
+}
+
+// cachedStatsFunc returns a statsFunc based on the provided pod stats.
+func cachedStatsFunc(podStats []statsapi.PodStats) statsFunc {
+	uid2PodStats := map[string]statsapi.PodStats{}
+	for i := range podStats {
+		uid2PodStats[podStats[i].PodRef.UID] = podStats[i]
+	}
+	return func(pod *api.Pod) (statsapi.PodStats, bool) {
+		stats, found := uid2PodStats[string(pod.UID)]
+		return stats, found
+	}
+}
+
+// cmpFunc compares p1 and p2 and returns:
+//
+// -1 if p1 < p2
+//  0 if p1 == p2
+// +1 if p1 > p2
+//
+type cmpFunc func(p1, p2 *api.Pod) int
+
+// multiSorter implements the Sort interface, sorting the pods within.
+type multiSorter struct {
+	pods []*api.Pod
+	cmp  []cmpFunc
+}
+
+// Sort sorts the argument slice according to the less functions passed to orderedBy.
+func (ms *multiSorter) Sort(pods []*api.Pod) {
+	ms.pods = pods
+	sort.Sort(ms)
+}
+
+// orderedBy returns a Sorter that sorts using the cmp functions, in order.
+// Call its Sort method to sort the data.
+func orderedBy(cmp ...cmpFunc) *multiSorter {
+	return &multiSorter{
+		cmp: cmp,
+	}
+}
+
+// Len is part of sort.Interface.
+func (ms *multiSorter) Len() int {
+	return len(ms.pods)
+}
+
+// Swap is part of sort.Interface.
+func (ms *multiSorter) Swap(i, j int) {
+	ms.pods[i], ms.pods[j] = ms.pods[j], ms.pods[i]
+}
+
+// Less is part of sort.Interface.
+func (ms *multiSorter) Less(i, j int) bool {
+	p1, p2 := ms.pods[i], ms.pods[j]
+	var k int
+	for k = 0; k < len(ms.cmp)-1; k++ {
+		cmpResult := ms.cmp[k](p1, p2)
+		// p1 is less than p2
+		if cmpResult < 0 {
+			return true
+		}
+		// p1 is greater than p2
+		if cmpResult > 0 {
+			return false
+		}
+		// we don't know yet
+	}
+	// the last cmp func is the final decider
+	return ms.cmp[k](p1, p2) < 0
+}
+
+// qos compares pods by QoS (BestEffort < Burstable < Guaranteed)
+func qos(p1, p2 *api.Pod) int {
+	qosP1 := qosutil.GetPodQos(p1)
+	qosP2 := qosutil.GetPodQos(p2)
+	// it's a tie
+	if qosP1 == qosP2 {
+		return 0
+	}
+	// if p1 is best effort, we know p2 is burstable or guaranteed
+	if qosP1 == qosutil.BestEffort {
+		return -1
+	}
+	// we know p1 and p2 are not best effort, so if p1 is burstable, p2 must be guaranteed
+	if qosP1 == qosutil.Burstable {
+		if qosP2 == qosutil.Guaranteed {
+			return -1
+		}
+		return 1
+	}
+	// ok, p1 must be guaranteed.
+	return 1
+}
+
+// memory compares pods by largest consumer of memory relative to request.
+func memory(stats statsFunc) cmpFunc {
+	return func(p1, p2 *api.Pod) int {
+		p1Stats, found := stats(p1)
+		// if we have no usage stats for p1, we want p2 first
+		if !found {
+			return -1
+		}
+		// if we have no usage stats for p2, but p1 has usage, we want p1 first.
+		p2Stats, found := stats(p2)
+		if !found {
+			return 1
+		}
+		// if we can't measure usage for p1, we want p2 first
+		p1Usage, err := podUsage(p1Stats)
+		if err != nil {
+			return -1
+		}
+		// if we can't measure usage for p2, we want p1 first
+		p2Usage, err := podUsage(p2Stats)
+		if err != nil {
+			return 1
+		}
+
+		// adjust p1, p2 usage relative to the request (if any)
+		p1Memory := p1Usage[api.ResourceMemory]
+		p1Spec := core.PodUsageFunc(p1)
+		p1Request := p1Spec[api.ResourceRequestsMemory]
+		p1Memory.Sub(p1Request)
+
+		p2Memory := p2Usage[api.ResourceMemory]
+		p2Spec := core.PodUsageFunc(p2)
+		p2Request := p2Spec[api.ResourceRequestsMemory]
+		p2Memory.Sub(p2Request)
+
+		// if p2 is using more than p1, we want p2 first
+		return p2Memory.Cmp(p1Memory)
+	}
+}
+
+// disk compares pods by largest consumer of disk relative to request.
+func disk(stats statsFunc) cmpFunc {
+	return func(p1, p2 *api.Pod) int {
+		p1Stats, found := stats(p1)
+		// if we have no usage stats for p1, we want p2 first
+		if !found {
+			return -1
+		}
+		// if we have no usage stats for p2, but p1 has usage, we want p1 first.
+		p2Stats, found := stats(p2)
+		if !found {
+			return 1
+		}
+		// if we can't measure usage for p1, we want p2 first
+		p1Usage, err := podUsage(p1Stats)
+		if err != nil {
+			return -1
+		}
+		// if we can't measure usage for p2, we want p1 first
+		p2Usage, err := podUsage(p2Stats)
+		if err != nil {
+			return 1
+		}
+
+		// disk is best effort, so we don't measure relative to a request.
+		// TODO: add disk as a guaranteed resource
+		p1Disk := p1Usage[resourceDisk]
+		p2Disk := p2Usage[resourceDisk]
+		// if p2 is using more than p1, we want p2 first
+		return p2Disk.Cmp(p1Disk)
+	}
+}
+
+// rankMemoryPressure orders the input pods for eviction in response to memory pressure.
+func rankMemoryPressure(pods []*api.Pod, stats statsFunc) {
+	orderedBy(qos, memory(stats)).Sort(pods)
+}
+
+// rankDiskPressure orders the input pods for eviction in response to disk pressure.
+func rankDiskPressure(pods []*api.Pod, stats statsFunc) {
+	orderedBy(qos, disk(stats)).Sort(pods)
+}
+
+// byEvictionPriority implements sort.Interface for []api.ResourceName.
+type byEvictionPriority []api.ResourceName
+
+func (a byEvictionPriority) Len() int      { return len(a) }
+func (a byEvictionPriority) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
+
+// Less ranks memory before all other resources.
+func (a byEvictionPriority) Less(i, j int) bool {
+	return a[i] == api.ResourceMemory
+}
+
+// makeSignalObservations derives observations using the specified summary provider.
+func makeSignalObservations(summaryProvider stats.SummaryProvider) (signalObservations, statsFunc, error) { + summary, err := summaryProvider.Get() + if err != nil { + return nil, nil, err + } + // build the function to work against for pod stats + statsFunc := cachedStatsFunc(summary.Pods) + // build an evaluation context for current eviction signals + result := signalObservations{} + result[SignalMemoryAvailable] = *resource.NewQuantity(int64(*summary.Node.Memory.AvailableBytes), resource.BinarySI) + return result, statsFunc, nil +} + +// thresholdsMet returns the set of thresholds that were met independent of grace period +func thresholdsMet(thresholds []Threshold, observations signalObservations) []Threshold { + results := []Threshold{} + for i := range thresholds { + threshold := thresholds[i] + observed, found := observations[threshold.Signal] + if !found { + glog.Warningf("eviction manager: no observation found for eviction signal %v", threshold.Signal) + continue + } + // determine if we have met the specified threshold + thresholdMet := false + thresholdResult := threshold.Value.Cmp(observed) + switch threshold.Operator { + case OpLessThan: + thresholdMet = thresholdResult > 0 + } + if thresholdMet { + results = append(results, threshold) + } + } + return results +} + +// thresholdsFirstObservedAt merges the input set of thresholds with the previous observation to determine when active set of thresholds were initially met. +func thresholdsFirstObservedAt(thresholds []Threshold, lastObservedAt thresholdsObservedAt, now time.Time) thresholdsObservedAt { + results := thresholdsObservedAt{} + for i := range thresholds { + observedAt, found := lastObservedAt[thresholds[i]] + if !found { + observedAt = now + } + results[thresholds[i]] = observedAt + } + return results +} + +// thresholdsMetGracePeriod returns the set of thresholds that have satisfied associated grace period +func thresholdsMetGracePeriod(observedAt thresholdsObservedAt, now time.Time) []Threshold { + results := []Threshold{} + for threshold, at := range observedAt { + duration := now.Sub(at) + if duration < threshold.GracePeriod { + glog.V(2).Infof("eviction manager: eviction criteria not yet met for %v, duration: %v", formatThreshold(threshold), duration) + continue + } + results = append(results, threshold) + } + return results +} + +// nodeConditions returns the set of node conditions associated with a threshold +func nodeConditions(thresholds []Threshold) []api.NodeConditionType { + results := []api.NodeConditionType{} + for _, threshold := range thresholds { + if nodeCondition, found := signalToNodeCondition[threshold.Signal]; found { + results = append(results, nodeCondition) + } + } + return results +} + +// nodeConditionsLastObservedAt merges the input with the previous observation to determine when a condition was most recently met. 
+func nodeConditionsLastObservedAt(nodeConditions []api.NodeConditionType, lastObservedAt nodeConditionsObservedAt, now time.Time) nodeConditionsObservedAt { + results := nodeConditionsObservedAt{} + // the input conditions were observed "now" + for i := range nodeConditions { + results[nodeConditions[i]] = now + } + // the conditions that were not observed now are merged in with their old time + for key, value := range lastObservedAt { + _, found := results[key] + if !found { + results[key] = value + } + } + return results +} + +// nodeConditionsObservedSince returns the set of conditions that have been observed within the specified period +func nodeConditionsObservedSince(observedAt nodeConditionsObservedAt, period time.Duration, now time.Time) []api.NodeConditionType { + results := []api.NodeConditionType{} + for nodeCondition, at := range observedAt { + duration := now.Sub(at) + if duration < period { + results = append(results, nodeCondition) + } + } + return results +} + +// hasNodeCondition returns true if the node condition is in the input list +func hasNodeCondition(inputs []api.NodeConditionType, item api.NodeConditionType) bool { + for _, input := range inputs { + if input == item { + return true + } + } + return false +} + +// hasThreshold returns true if the node condition is in the input list +func hasThreshold(inputs []Threshold, item Threshold) bool { + for _, input := range inputs { + if input.GracePeriod == item.GracePeriod && input.Operator == item.Operator && input.Signal == item.Signal && input.Value.Cmp(item.Value) == 0 { + return true + } + } + return false +} + +// reclaimResources returns the set of resources that are starved based on thresholds met. +func reclaimResources(thresholds []Threshold) []api.ResourceName { + results := []api.ResourceName{} + for _, threshold := range thresholds { + if starvedResource, found := signalToResource[threshold.Signal]; found { + results = append(results, starvedResource) + } + } + return results +} diff --git a/pkg/kubelet/eviction/helpers_test.go b/pkg/kubelet/eviction/helpers_test.go index ad0983a2195..b908b734652 100644 --- a/pkg/kubelet/eviction/helpers_test.go +++ b/pkg/kubelet/eviction/helpers_test.go @@ -17,10 +17,17 @@ limitations under the License. 
package eviction import ( + "fmt" + "reflect" "testing" "time" + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/resource" + "k8s.io/kubernetes/pkg/api/unversioned" + statsapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/stats" + "k8s.io/kubernetes/pkg/quota" + "k8s.io/kubernetes/pkg/types" ) func TestParseThresholdConfig(t *testing.T) { @@ -140,3 +147,574 @@ func thresholdEqual(a Threshold, b Threshold) bool { a.Signal == b.Signal && a.Value.Cmp(b.Value) == 0 } + +// TestOrderedByQoS ensures we order BestEffort < Burstable < Guaranteed +func TestOrderedByQoS(t *testing.T) { + bestEffort := newPod("best-effort", []api.Container{ + newContainer("best-effort", newResourceList("", ""), newResourceList("", "")), + }) + burstable := newPod("burstable", []api.Container{ + newContainer("burstable", newResourceList("100m", "100Mi"), newResourceList("200m", "200Mi")), + }) + guaranteed := newPod("guaranteed", []api.Container{ + newContainer("guaranteed", newResourceList("200m", "200Mi"), newResourceList("200m", "200Mi")), + }) + + pods := []*api.Pod{guaranteed, burstable, bestEffort} + orderedBy(qos).Sort(pods) + + expected := []*api.Pod{bestEffort, burstable, guaranteed} + for i := range expected { + if pods[i] != expected[i] { + t.Errorf("Expected pod: %s, but got: %s", expected[i].Name, pods[i].Name) + } + } +} + +// TestOrderedByMemory ensures we order pods by greediest memory consumer relative to request. +func TestOrderedByMemory(t *testing.T) { + pod1 := newPod("best-effort-high", []api.Container{ + newContainer("best-effort-high", newResourceList("", ""), newResourceList("", "")), + }) + pod2 := newPod("best-effort-low", []api.Container{ + newContainer("best-effort-low", newResourceList("", ""), newResourceList("", "")), + }) + pod3 := newPod("burstable-high", []api.Container{ + newContainer("burstable-high", newResourceList("100m", "100Mi"), newResourceList("200m", "1Gi")), + }) + pod4 := newPod("burstable-low", []api.Container{ + newContainer("burstable-low", newResourceList("100m", "100Mi"), newResourceList("200m", "1Gi")), + }) + pod5 := newPod("guaranteed-high", []api.Container{ + newContainer("guaranteed-high", newResourceList("100m", "1Gi"), newResourceList("100m", "1Gi")), + }) + pod6 := newPod("guaranteed-low", []api.Container{ + newContainer("guaranteed-low", newResourceList("100m", "1Gi"), newResourceList("100m", "1Gi")), + }) + stats := map[*api.Pod]statsapi.PodStats{ + pod1: newPodMemoryStats(pod1, resource.MustParse("500Mi")), // 500 relative to request + pod2: newPodMemoryStats(pod2, resource.MustParse("300Mi")), // 300 relative to request + pod3: newPodMemoryStats(pod3, resource.MustParse("800Mi")), // 700 relative to request + pod4: newPodMemoryStats(pod4, resource.MustParse("300Mi")), // 200 relative to request + pod5: newPodMemoryStats(pod5, resource.MustParse("800Mi")), // -200 relative to request + pod6: newPodMemoryStats(pod6, resource.MustParse("200Mi")), // -800 relative to request + } + statsFn := func(pod *api.Pod) (statsapi.PodStats, bool) { + result, found := stats[pod] + return result, found + } + pods := []*api.Pod{pod1, pod2, pod3, pod4, pod5, pod6} + orderedBy(memory(statsFn)).Sort(pods) + expected := []*api.Pod{pod3, pod1, pod2, pod4, pod5, pod6} + for i := range expected { + if pods[i] != expected[i] { + t.Errorf("Expected pod[%d]: %s, but got: %s", i, expected[i].Name, pods[i].Name) + } + } +} + +// TestOrderedByQoSMemory ensures we order by qos and then memory consumption relative to request. 
+func TestOrderedByQoSMemory(t *testing.T) { + pod1 := newPod("best-effort-high", []api.Container{ + newContainer("best-effort-high", newResourceList("", ""), newResourceList("", "")), + }) + pod2 := newPod("best-effort-low", []api.Container{ + newContainer("best-effort-low", newResourceList("", ""), newResourceList("", "")), + }) + pod3 := newPod("burstable-high", []api.Container{ + newContainer("burstable-high", newResourceList("100m", "100Mi"), newResourceList("200m", "1Gi")), + }) + pod4 := newPod("burstable-low", []api.Container{ + newContainer("burstable-low", newResourceList("100m", "100Mi"), newResourceList("200m", "1Gi")), + }) + pod5 := newPod("guaranteed-high", []api.Container{ + newContainer("guaranteed-high", newResourceList("100m", "1Gi"), newResourceList("100m", "1Gi")), + }) + pod6 := newPod("guaranteed-low", []api.Container{ + newContainer("guaranteed-low", newResourceList("100m", "1Gi"), newResourceList("100m", "1Gi")), + }) + stats := map[*api.Pod]statsapi.PodStats{ + pod1: newPodMemoryStats(pod1, resource.MustParse("500Mi")), // 500 relative to request + pod2: newPodMemoryStats(pod2, resource.MustParse("50Mi")), // 50 relative to request + pod3: newPodMemoryStats(pod3, resource.MustParse("50Mi")), // -50 relative to request + pod4: newPodMemoryStats(pod4, resource.MustParse("300Mi")), // 200 relative to request + pod5: newPodMemoryStats(pod5, resource.MustParse("800Mi")), // -200 relative to request + pod6: newPodMemoryStats(pod6, resource.MustParse("200Mi")), // -800 relative to request + } + statsFn := func(pod *api.Pod) (statsapi.PodStats, bool) { + result, found := stats[pod] + return result, found + } + pods := []*api.Pod{pod1, pod2, pod3, pod4, pod5, pod6} + expected := []*api.Pod{pod1, pod2, pod4, pod3, pod5, pod6} + orderedBy(qos, memory(statsFn)).Sort(pods) + for i := range expected { + if pods[i] != expected[i] { + t.Errorf("Expected pod[%d]: %s, but got: %s", i, expected[i].Name, pods[i].Name) + } + } +} + +type fakeSummaryProvider struct { + result *statsapi.Summary +} + +func (f *fakeSummaryProvider) Get() (*statsapi.Summary, error) { + return f.result, nil +} + +// newPodStats returns a pod stat where each container is using the specified working set +// each pod must have a Name, UID, Namespace +func newPodStats(pod *api.Pod, containerWorkingSetBytes int64) statsapi.PodStats { + result := statsapi.PodStats{ + PodRef: statsapi.PodReference{ + Name: pod.Name, + Namespace: pod.Namespace, + UID: string(pod.UID), + }, + } + val := uint64(containerWorkingSetBytes) + for range pod.Spec.Containers { + result.Containers = append(result.Containers, statsapi.ContainerStats{ + Memory: &statsapi.MemoryStats{ + WorkingSetBytes: &val, + }, + }) + } + return result +} + +func TestMakeSignalObservations(t *testing.T) { + podMaker := func(name, namespace, uid string, numContainers int) *api.Pod { + pod := &api.Pod{} + pod.Name = name + pod.Namespace = namespace + pod.UID = types.UID(uid) + pod.Spec = api.PodSpec{} + for i := 0; i < numContainers; i++ { + pod.Spec.Containers = append(pod.Spec.Containers, api.Container{ + Name: fmt.Sprintf("ctr%v", i), + }) + } + return pod + } + nodeAvailableBytes := uint64(1024 * 1024 * 1024) + fakeStats := &statsapi.Summary{ + Node: statsapi.NodeStats{ + Memory: &statsapi.MemoryStats{ + AvailableBytes: &nodeAvailableBytes, + }, + }, + Pods: []statsapi.PodStats{}, + } + provider := &fakeSummaryProvider{ + result: fakeStats, + } + pods := []*api.Pod{ + podMaker("pod1", "ns1", "uuid1", 1), + podMaker("pod1", "ns2", "uuid2", 1), + 
podMaker("pod3", "ns3", "uuid3", 1), + } + containerWorkingSetBytes := int64(1024 * 1024) + for _, pod := range pods { + fakeStats.Pods = append(fakeStats.Pods, newPodStats(pod, containerWorkingSetBytes)) + } + actualObservations, statsFunc, err := makeSignalObservations(provider) + if err != nil { + t.Errorf("Unexpected err: %v", err) + } + quantity, found := actualObservations[SignalMemoryAvailable] + if !found { + t.Errorf("Expected available memory observation: %v", err) + } + if expectedBytes := int64(nodeAvailableBytes); quantity.Value() != expectedBytes { + t.Errorf("Expected %v, actual: %v", expectedBytes, quantity.Value()) + } + for _, pod := range pods { + podStats, found := statsFunc(pod) + if !found { + t.Errorf("Pod stats were not found for pod %v", pod.UID) + } + for _, container := range podStats.Containers { + actual := int64(*container.Memory.WorkingSetBytes) + if containerWorkingSetBytes != actual { + t.Errorf("Container working set expected %v, actual: %v", containerWorkingSetBytes, actual) + } + } + } +} + +func TestThresholdsMet(t *testing.T) { + hardThreshold := Threshold{ + Signal: SignalMemoryAvailable, + Operator: OpLessThan, + Value: resource.MustParse("1Gi"), + } + testCases := map[string]struct { + thresholds []Threshold + observations signalObservations + result []Threshold + }{ + "empty": { + thresholds: []Threshold{}, + observations: signalObservations{}, + result: []Threshold{}, + }, + "threshold-met": { + thresholds: []Threshold{hardThreshold}, + observations: signalObservations{ + SignalMemoryAvailable: resource.MustParse("500Mi"), + }, + result: []Threshold{hardThreshold}, + }, + "threshold-not-met": { + thresholds: []Threshold{hardThreshold}, + observations: signalObservations{ + SignalMemoryAvailable: resource.MustParse("2Gi"), + }, + result: []Threshold{}, + }, + } + for testName, testCase := range testCases { + actual := thresholdsMet(testCase.thresholds, testCase.observations) + if !thresholdList(actual).Equal(thresholdList(testCase.result)) { + t.Errorf("Test case: %s, expected: %v, actual: %v", testName, testCase.result, actual) + } + } +} + +func TestThresholdsFirstObservedAt(t *testing.T) { + hardThreshold := Threshold{ + Signal: SignalMemoryAvailable, + Operator: OpLessThan, + Value: resource.MustParse("1Gi"), + } + now := unversioned.Now() + oldTime := unversioned.NewTime(now.Time.Add(-1 * time.Minute)) + testCases := map[string]struct { + thresholds []Threshold + lastObservedAt thresholdsObservedAt + now time.Time + result thresholdsObservedAt + }{ + "empty": { + thresholds: []Threshold{}, + lastObservedAt: thresholdsObservedAt{}, + now: now.Time, + result: thresholdsObservedAt{}, + }, + "no-previous-observation": { + thresholds: []Threshold{hardThreshold}, + lastObservedAt: thresholdsObservedAt{}, + now: now.Time, + result: thresholdsObservedAt{ + hardThreshold: now.Time, + }, + }, + "previous-observation": { + thresholds: []Threshold{hardThreshold}, + lastObservedAt: thresholdsObservedAt{ + hardThreshold: oldTime.Time, + }, + now: now.Time, + result: thresholdsObservedAt{ + hardThreshold: oldTime.Time, + }, + }, + } + for testName, testCase := range testCases { + actual := thresholdsFirstObservedAt(testCase.thresholds, testCase.lastObservedAt, testCase.now) + if !reflect.DeepEqual(actual, testCase.result) { + t.Errorf("Test case: %s, expected: %v, actual: %v", testName, testCase.result, actual) + } + } +} + +func TestThresholdsMetGracePeriod(t *testing.T) { + now := unversioned.Now() + hardThreshold := Threshold{ + Signal: 
SignalMemoryAvailable, + Operator: OpLessThan, + Value: resource.MustParse("1Gi"), + } + softThreshold := Threshold{ + Signal: SignalMemoryAvailable, + Operator: OpLessThan, + Value: resource.MustParse("2Gi"), + GracePeriod: 1 * time.Minute, + } + oldTime := unversioned.NewTime(now.Time.Add(-2 * time.Minute)) + testCases := map[string]struct { + observedAt thresholdsObservedAt + now time.Time + result []Threshold + }{ + "empty": { + observedAt: thresholdsObservedAt{}, + now: now.Time, + result: []Threshold{}, + }, + "hard-threshold-met": { + observedAt: thresholdsObservedAt{ + hardThreshold: now.Time, + }, + now: now.Time, + result: []Threshold{hardThreshold}, + }, + "soft-threshold-not-met": { + observedAt: thresholdsObservedAt{ + softThreshold: now.Time, + }, + now: now.Time, + result: []Threshold{}, + }, + "soft-threshold-met": { + observedAt: thresholdsObservedAt{ + softThreshold: oldTime.Time, + }, + now: now.Time, + result: []Threshold{softThreshold}, + }, + } + for testName, testCase := range testCases { + actual := thresholdsMetGracePeriod(testCase.observedAt, now.Time) + if !thresholdList(actual).Equal(thresholdList(testCase.result)) { + t.Errorf("Test case: %s, expected: %v, actual: %v", testName, testCase.result, actual) + } + } +} + +func TestNodeConditions(t *testing.T) { + testCases := map[string]struct { + inputs []Threshold + result []api.NodeConditionType + }{ + "empty-list": { + inputs: []Threshold{}, + result: []api.NodeConditionType{}, + }, + "memory.available": { + inputs: []Threshold{ + {Signal: SignalMemoryAvailable}, + }, + result: []api.NodeConditionType{api.NodeMemoryPressure}, + }, + } + for testName, testCase := range testCases { + actual := nodeConditions(testCase.inputs) + if !nodeConditionList(actual).Equal(nodeConditionList(testCase.result)) { + t.Errorf("Test case: %s, expected: %v, actual: %v", testName, testCase.result, actual) + } + } +} + +func TestNodeConditionsLastObservedAt(t *testing.T) { + now := unversioned.Now() + oldTime := unversioned.NewTime(now.Time.Add(-1 * time.Minute)) + testCases := map[string]struct { + nodeConditions []api.NodeConditionType + lastObservedAt nodeConditionsObservedAt + now time.Time + result nodeConditionsObservedAt + }{ + "no-previous-observation": { + nodeConditions: []api.NodeConditionType{api.NodeMemoryPressure}, + lastObservedAt: nodeConditionsObservedAt{}, + now: now.Time, + result: nodeConditionsObservedAt{ + api.NodeMemoryPressure: now.Time, + }, + }, + "previous-observation": { + nodeConditions: []api.NodeConditionType{api.NodeMemoryPressure}, + lastObservedAt: nodeConditionsObservedAt{ + api.NodeMemoryPressure: oldTime.Time, + }, + now: now.Time, + result: nodeConditionsObservedAt{ + api.NodeMemoryPressure: now.Time, + }, + }, + "old-observation": { + nodeConditions: []api.NodeConditionType{}, + lastObservedAt: nodeConditionsObservedAt{ + api.NodeMemoryPressure: oldTime.Time, + }, + now: now.Time, + result: nodeConditionsObservedAt{ + api.NodeMemoryPressure: oldTime.Time, + }, + }, + } + for testName, testCase := range testCases { + actual := nodeConditionsLastObservedAt(testCase.nodeConditions, testCase.lastObservedAt, testCase.now) + if !reflect.DeepEqual(actual, testCase.result) { + t.Errorf("Test case: %s, expected: %v, actual: %v", testName, testCase.result, actual) + } + } +} + +func TestNodeConditionsObservedSince(t *testing.T) { + now := unversioned.Now() + observedTime := unversioned.NewTime(now.Time.Add(-1 * time.Minute)) + testCases := map[string]struct { + observedAt nodeConditionsObservedAt + period 
time.Duration + now time.Time + result []api.NodeConditionType + }{ + "in-period": { + observedAt: nodeConditionsObservedAt{ + api.NodeMemoryPressure: observedTime.Time, + }, + period: 2 * time.Minute, + now: now.Time, + result: []api.NodeConditionType{api.NodeMemoryPressure}, + }, + "out-of-period": { + observedAt: nodeConditionsObservedAt{ + api.NodeMemoryPressure: observedTime.Time, + }, + period: 30 * time.Second, + now: now.Time, + result: []api.NodeConditionType{}, + }, + } + for testName, testCase := range testCases { + actual := nodeConditionsObservedSince(testCase.observedAt, testCase.period, testCase.now) + if !nodeConditionList(actual).Equal(nodeConditionList(testCase.result)) { + t.Errorf("Test case: %s, expected: %v, actual: %v", testName, testCase.result, actual) + } + } +} + +func TestHasNodeConditions(t *testing.T) { + testCases := map[string]struct { + inputs []api.NodeConditionType + item api.NodeConditionType + result bool + }{ + "has-condition": { + inputs: []api.NodeConditionType{api.NodeReady, api.NodeOutOfDisk, api.NodeMemoryPressure}, + item: api.NodeMemoryPressure, + result: true, + }, + "does-not-have-condition": { + inputs: []api.NodeConditionType{api.NodeReady, api.NodeOutOfDisk}, + item: api.NodeMemoryPressure, + result: false, + }, + } + for testName, testCase := range testCases { + if actual := hasNodeCondition(testCase.inputs, testCase.item); actual != testCase.result { + t.Errorf("Test case: %s, expected: %v, actual: %v", testName, testCase.result, actual) + } + } +} + +func TestReclaimResources(t *testing.T) { + testCases := map[string]struct { + inputs []Threshold + result []api.ResourceName + }{ + "memory.available": { + inputs: []Threshold{ + {Signal: SignalMemoryAvailable}, + }, + result: []api.ResourceName{api.ResourceMemory}, + }, + } + for testName, testCase := range testCases { + actual := reclaimResources(testCase.inputs) + actualSet := quota.ToSet(actual) + expectedSet := quota.ToSet(testCase.result) + if !actualSet.Equal(expectedSet) { + t.Errorf("Test case: %s, expected: %v, actual: %v", testName, expectedSet, actualSet) + } + } +} + +func newPodMemoryStats(pod *api.Pod, workingSet resource.Quantity) statsapi.PodStats { + result := statsapi.PodStats{ + PodRef: statsapi.PodReference{ + Name: pod.Name, Namespace: pod.Namespace, UID: string(pod.UID), + }, + } + for range pod.Spec.Containers { + workingSetBytes := uint64(workingSet.Value()) + result.Containers = append(result.Containers, statsapi.ContainerStats{ + Memory: &statsapi.MemoryStats{ + WorkingSetBytes: &workingSetBytes, + }, + }) + } + return result +} + +func newResourceList(cpu, memory string) api.ResourceList { + res := api.ResourceList{} + if cpu != "" { + res[api.ResourceCPU] = resource.MustParse(cpu) + } + if memory != "" { + res[api.ResourceMemory] = resource.MustParse(memory) + } + return res +} + +func newResourceRequirements(requests, limits api.ResourceList) api.ResourceRequirements { + res := api.ResourceRequirements{} + res.Requests = requests + res.Limits = limits + return res +} + +func newContainer(name string, requests api.ResourceList, limits api.ResourceList) api.Container { + return api.Container{ + Name: name, + Resources: newResourceRequirements(requests, limits), + } +} + +func newPod(name string, containers []api.Container) *api.Pod { + return &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: name, + }, + Spec: api.PodSpec{ + Containers: containers, + }, + } +} + +// nodeConditionList is a simple alias to support equality checking independent of order +type 
nodeConditionList []api.NodeConditionType + +// Equal adds the ability to check equality between two lists of node conditions. +func (s1 nodeConditionList) Equal(s2 nodeConditionList) bool { + if len(s1) != len(s2) { + return false + } + for _, item := range s1 { + if !hasNodeCondition(s2, item) { + return false + } + } + return true +} + +// thresholdList is a simple alias to support equality checking independent of order +type thresholdList []Threshold + +// Equal adds the ability to check equality between two lists of node conditions. +func (s1 thresholdList) Equal(s2 thresholdList) bool { + if len(s1) != len(s2) { + return false + } + for _, item := range s1 { + if !hasThreshold(s2, item) { + return false + } + } + return true +} diff --git a/pkg/kubelet/eviction/manager.go b/pkg/kubelet/eviction/manager.go new file mode 100644 index 00000000000..8a6afbc3cfb --- /dev/null +++ b/pkg/kubelet/eviction/manager.go @@ -0,0 +1,215 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package eviction + +import ( + "sort" + "sync" + "time" + + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/record" + "k8s.io/kubernetes/pkg/kubelet/lifecycle" + qosutil "k8s.io/kubernetes/pkg/kubelet/qos/util" + "k8s.io/kubernetes/pkg/kubelet/server/stats" + "k8s.io/kubernetes/pkg/kubelet/util/format" + "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/wait" +) + +// managerImpl implements NodeStabilityManager +type managerImpl struct { + // used to track time + clock util.Clock + // config is how the manager is configured + config Config + // the function to invoke to kill a pod + killPodFunc KillPodFunc + // protects access to internal state + sync.RWMutex + // node conditions are the set of conditions present + nodeConditions []api.NodeConditionType + // captures when a node condition was last observed based on a threshold being met + nodeConditionsLastObservedAt nodeConditionsObservedAt + // nodeRef is a reference to the node + nodeRef *api.ObjectReference + // used to record events about the node + recorder record.EventRecorder + // used to measure usage stats on system + summaryProvider stats.SummaryProvider + // records when a threshold was first observed + thresholdsFirstObservedAt thresholdsObservedAt +} + +// ensure it implements the required interface +var _ Manager = &managerImpl{} + +// NewManager returns a configured Manager and an associated admission handler to enforce eviction configuration. 
+func NewManager(
+	summaryProvider stats.SummaryProvider,
+	config Config,
+	killPodFunc KillPodFunc,
+	recorder record.EventRecorder,
+	nodeRef *api.ObjectReference,
+	clock util.Clock) (Manager, lifecycle.PodAdmitHandler, error) {
+	manager := &managerImpl{
+		clock:                        clock,
+		killPodFunc:                  killPodFunc,
+		config:                       config,
+		recorder:                     recorder,
+		summaryProvider:              summaryProvider,
+		nodeRef:                      nodeRef,
+		nodeConditionsLastObservedAt: nodeConditionsObservedAt{},
+		thresholdsFirstObservedAt:    thresholdsObservedAt{},
+	}
+	return manager, manager, nil
+}
+
+// Admit rejects a pod if it is not safe to admit for node stability.
+func (m *managerImpl) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
+	m.RLock()
+	defer m.RUnlock()
+	if len(m.nodeConditions) == 0 {
+		return lifecycle.PodAdmitResult{Admit: true}
+	}
+	notBestEffort := qosutil.BestEffort != qosutil.GetPodQos(attrs.Pod)
+	if notBestEffort {
+		return lifecycle.PodAdmitResult{Admit: true}
+	}
+	glog.Warningf("Failed to admit pod %v - node has conditions: %v", format.Pod(attrs.Pod), m.nodeConditions)
+	// we reject all best effort pods until we are stable.
+	return lifecycle.PodAdmitResult{
+		Admit:   false,
+		Reason:  reason,
+		Message: message,
+	}
+}
+
+// Start starts the control loop to observe and respond to low compute resources.
+func (m *managerImpl) Start(podFunc ActivePodsFunc, monitoringInterval time.Duration) {
+	go wait.Until(func() { m.synchronize(podFunc) }, monitoringInterval, wait.NeverStop)
+}
+
+// IsUnderMemoryPressure returns true if the node is under memory pressure.
+func (m *managerImpl) IsUnderMemoryPressure() bool {
+	m.RLock()
+	defer m.RUnlock()
+	return hasNodeCondition(m.nodeConditions, api.NodeMemoryPressure)
+}
+
+// synchronize is the main control loop that enforces eviction thresholds.
+func (m *managerImpl) synchronize(podFunc ActivePodsFunc) {
+	// if we have nothing to do, just return
+	thresholds := m.config.Thresholds
+	if len(thresholds) == 0 {
+		return
+	}
+
+	// make observations and get a function to derive pod usage stats relative to those observations.
+	observations, statsFunc, err := makeSignalObservations(m.summaryProvider)
+	if err != nil {
+		glog.Errorf("eviction manager: unexpected err: %v", err)
+		return
+	}
+
+	// capture the time at which these observations were made
+	now := m.clock.Now()
+
+	// determine the set of thresholds met independent of grace period
+	thresholds = thresholdsMet(thresholds, observations)
+
+	// track when a threshold was first observed
+	thresholdsFirstObservedAt := thresholdsFirstObservedAt(thresholds, m.thresholdsFirstObservedAt, now)
+
+	// the set of node conditions that are triggered by currently observed thresholds
+	nodeConditions := nodeConditions(thresholds)
+
+	// track when a node condition was last observed
+	nodeConditionsLastObservedAt := nodeConditionsLastObservedAt(nodeConditions, m.nodeConditionsLastObservedAt, now)
+
+	// node conditions report true if they have been observed within the transition period window
+	nodeConditions = nodeConditionsObservedSince(nodeConditionsLastObservedAt, m.config.PressureTransitionPeriod, now)
+
+	// determine the set of thresholds we need to drive eviction behavior (i.e.
all grace periods are met) + thresholds = thresholdsMetGracePeriod(thresholdsFirstObservedAt, now) + + // update internal state + m.Lock() + m.nodeConditions = nodeConditions + m.thresholdsFirstObservedAt = thresholdsFirstObservedAt + m.nodeConditionsLastObservedAt = nodeConditionsLastObservedAt + m.Unlock() + + // determine the set of resources under starvation + starvedResources := reclaimResources(thresholds) + if len(starvedResources) == 0 { + glog.Infof("eviction manager: no resources are starved") + return + } + + // rank the resources to reclaim by eviction priority + sort.Sort(byEvictionPriority(starvedResources)) + resourceToReclaim := starvedResources[0] + glog.Warningf("eviction manager: attempting to reclaim %v", resourceToReclaim) + + // record an event about the resources we are now attempting to reclaim via eviction + m.recorder.Eventf(m.nodeRef, api.EventTypeWarning, "EvictionThresholdMet", "Attempting to reclaim %s", resourceToReclaim) + + // rank the pods for eviction + rank, ok := resourceToRankFunc[resourceToReclaim] + if !ok { + glog.Errorf("eviction manager: no ranking function for resource %s", resourceToReclaim) + return + } + + // the only candidates viable for eviction are those pods that had anything running. + activePods := podFunc() + if len(activePods) == 0 { + glog.Errorf("eviction manager: eviction thresholds have been met, but no pods are active to evict") + return + } + + // rank the running pods for eviction for the specified resource + rank(activePods, statsFunc) + + glog.Infof("eviction manager: pods ranked for eviction: %s", format.Pods(activePods)) + + // we kill at most a single pod during each eviction interval + for i := range activePods { + pod := activePods[i] + status := api.PodStatus{ + Phase: api.PodFailed, + Message: message, + Reason: reason, + } + // record that we are evicting the pod + m.recorder.Eventf(pod, api.EventTypeWarning, reason, message) + // TODO this needs to be based on soft or hard eviction threshold being met, soft eviction will allow a configured value. + gracePeriodOverride := int64(0) + // this is a blocking call and should only return when the pod and its containers are killed. + err := m.killPodFunc(pod, status, &gracePeriodOverride) + if err != nil { + glog.Infof("eviction manager: pod %s failed to evict %v", format.Pod(pod), err) + continue + } + // success, so we return until the next housekeeping interval + glog.Infof("eviction manager: pod %s evicted successfully", format.Pod(pod)) + return + } + glog.Infof("eviction manager: unable to evict any pods from the node") +} diff --git a/pkg/kubelet/eviction/manager_test.go b/pkg/kubelet/eviction/manager_test.go new file mode 100644 index 00000000000..0d5fa77291e --- /dev/null +++ b/pkg/kubelet/eviction/manager_test.go @@ -0,0 +1,212 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package eviction + +import ( + "testing" + "time" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/resource" + "k8s.io/kubernetes/pkg/client/record" + statsapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/stats" + "k8s.io/kubernetes/pkg/kubelet/lifecycle" + "k8s.io/kubernetes/pkg/types" + "k8s.io/kubernetes/pkg/util" +) + +// mockPodKiller is used to testing which pod is killed +type mockPodKiller struct { + pod *api.Pod + status api.PodStatus + gracePeriodOverride *int64 +} + +// killPodNow records the pod that was killed +func (m *mockPodKiller) killPodNow(pod *api.Pod, status api.PodStatus, gracePeriodOverride *int64) error { + m.pod = pod + m.status = status + m.gracePeriodOverride = gracePeriodOverride + return nil +} + +// TestMemoryPressure +func TestMemoryPressure(t *testing.T) { + podMaker := func(name string, requests api.ResourceList, limits api.ResourceList, memoryWorkingSet string) (*api.Pod, statsapi.PodStats) { + pod := newPod(name, []api.Container{ + newContainer(name, requests, api.ResourceList{}), + }) + podStats := newPodMemoryStats(pod, resource.MustParse(memoryWorkingSet)) + return pod, podStats + } + summaryStatsMaker := func(nodeAvailableBytes string, podStats map[*api.Pod]statsapi.PodStats) *statsapi.Summary { + val := resource.MustParse(nodeAvailableBytes) + availableBytes := uint64(val.Value()) + result := &statsapi.Summary{ + Node: statsapi.NodeStats{ + Memory: &statsapi.MemoryStats{ + AvailableBytes: &availableBytes, + }, + }, + Pods: []statsapi.PodStats{}, + } + for _, podStat := range podStats { + result.Pods = append(result.Pods, podStat) + } + return result + } + podsToMake := []struct { + name string + requests api.ResourceList + limits api.ResourceList + memoryWorkingSet string + }{ + {name: "best-effort-high", requests: newResourceList("", ""), limits: newResourceList("", ""), memoryWorkingSet: "500Mi"}, + {name: "best-effort-low", requests: newResourceList("", ""), limits: newResourceList("", ""), memoryWorkingSet: "300Mi"}, + {name: "burstable-high", requests: newResourceList("100m", "100Mi"), limits: newResourceList("200m", "1Gi"), memoryWorkingSet: "800Mi"}, + {name: "burstable-low", requests: newResourceList("100m", "100Mi"), limits: newResourceList("200m", "1Gi"), memoryWorkingSet: "300Mi"}, + {name: "guaranteed-high", requests: newResourceList("100m", "1Gi"), limits: newResourceList("100m", "1Gi"), memoryWorkingSet: "800Mi"}, + {name: "guaranteed-low", requests: newResourceList("100m", "1Gi"), limits: newResourceList("100m", "1Gi"), memoryWorkingSet: "200Mi"}, + } + pods := []*api.Pod{} + podStats := map[*api.Pod]statsapi.PodStats{} + for _, podToMake := range podsToMake { + pod, podStat := podMaker(podToMake.name, podToMake.requests, podToMake.limits, podToMake.memoryWorkingSet) + pods = append(pods, pod) + podStats[pod] = podStat + } + activePodsFunc := func() []*api.Pod { + return pods + } + + fakeClock := util.NewFakeClock(time.Now()) + podKiller := &mockPodKiller{} + nodeRef := &api.ObjectReference{Kind: "Node", Name: "test", UID: types.UID("test"), Namespace: ""} + + config := Config{ + PressureTransitionPeriod: time.Minute * 5, + Thresholds: []Threshold{ + { + Signal: SignalMemoryAvailable, + Operator: OpLessThan, + Value: resource.MustParse("1Gi"), + }, + }, + } + summaryProvider := &fakeSummaryProvider{result: summaryStatsMaker("2Gi", podStats)} + manager := &managerImpl{ + clock: fakeClock, + killPodFunc: podKiller.killPodNow, + config: config, + recorder: &record.FakeRecorder{}, + summaryProvider: summaryProvider, + 
nodeRef: nodeRef, + nodeConditionsLastObservedAt: nodeConditionsObservedAt{}, + thresholdsFirstObservedAt: thresholdsObservedAt{}, + } + + // create a best effort pod to test admission + bestEffortPodToAdmit, _ := podMaker("best-admit", newResourceList("", ""), newResourceList("", ""), "0Gi") + burstablePodToAdmit, _ := podMaker("burst-admit", newResourceList("100m", "100Mi"), newResourceList("200m", "200Mi"), "0Gi") + + // synchronize + manager.synchronize(activePodsFunc) + + // we should not have memory pressure + if manager.IsUnderMemoryPressure() { + t.Errorf("Manager should not report memory pressure") + } + + // try to admit our pods (they should succeed) + expected := []bool{true, true} + for i, pod := range []*api.Pod{bestEffortPodToAdmit, burstablePodToAdmit} { + if result := manager.Admit(&lifecycle.PodAdmitAttributes{Pod: pod}); expected[i] != result.Admit { + t.Errorf("Admit pod: %v, expected: %v, actual: %v", pod, expected[i], result.Admit) + } + } + + // induce memory pressure! + fakeClock.Step(1 * time.Minute) + summaryProvider.result = summaryStatsMaker("500Mi", podStats) + manager.synchronize(activePodsFunc) + + // we should have memory pressure + if !manager.IsUnderMemoryPressure() { + t.Errorf("Manager should report memory pressure") + } + + // check the right pod was killed + if podKiller.pod != pods[0] { + t.Errorf("Manager chose to kill pod: %v, but should have chosen %v", podKiller.pod, pods[0]) + } + + // the best-effort pod should not admit, burstable should + expected = []bool{false, true} + for i, pod := range []*api.Pod{bestEffortPodToAdmit, burstablePodToAdmit} { + if result := manager.Admit(&lifecycle.PodAdmitAttributes{Pod: pod}); expected[i] != result.Admit { + t.Errorf("Admit pod: %v, expected: %v, actual: %v", pod, expected[i], result.Admit) + } + } + + // reduce memory pressure + fakeClock.Step(1 * time.Minute) + summaryProvider.result = summaryStatsMaker("2Gi", podStats) + podKiller.pod = nil // reset state + manager.synchronize(activePodsFunc) + + // we should have memory pressure (because transition period not yet met) + if !manager.IsUnderMemoryPressure() { + t.Errorf("Manager should report memory pressure") + } + + // no pod should have been killed + if podKiller.pod != nil { + t.Errorf("Manager chose to kill pod: %v when no pod should have been killed", podKiller.pod) + } + + // the best-effort pod should not admit, burstable should + expected = []bool{false, true} + for i, pod := range []*api.Pod{bestEffortPodToAdmit, burstablePodToAdmit} { + if result := manager.Admit(&lifecycle.PodAdmitAttributes{Pod: pod}); expected[i] != result.Admit { + t.Errorf("Admit pod: %v, expected: %v, actual: %v", pod, expected[i], result.Admit) + } + } + + // move the clock past transition period to ensure that we stop reporting pressure + fakeClock.Step(5 * time.Minute) + summaryProvider.result = summaryStatsMaker("2Gi", podStats) + podKiller.pod = nil // reset state + manager.synchronize(activePodsFunc) + + // we should not have memory pressure (because transition period met) + if manager.IsUnderMemoryPressure() { + t.Errorf("Manager should not report memory pressure") + } + + // no pod should have been killed + if podKiller.pod != nil { + t.Errorf("Manager chose to kill pod: %v when no pod should have been killed", podKiller.pod) + } + + // all pods should admit now + expected = []bool{true, true} + for i, pod := range []*api.Pod{bestEffortPodToAdmit, burstablePodToAdmit} { + if result := manager.Admit(&lifecycle.PodAdmitAttributes{Pod: pod}); expected[i] != 
result.Admit { + t.Errorf("Admit pod: %v, expected: %v, actual: %v", pod, expected[i], result.Admit) + } + } +} diff --git a/pkg/kubelet/eviction/types.go b/pkg/kubelet/eviction/types.go index 9794cbf54a1..a39a30394f9 100644 --- a/pkg/kubelet/eviction/types.go +++ b/pkg/kubelet/eviction/types.go @@ -21,6 +21,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/resource" + statsapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/stats" ) // Signal defines a signal that can trigger eviction of pods on a node. @@ -59,6 +60,15 @@ type Threshold struct { GracePeriod time.Duration } +// Manager evaluates when an eviction threshold for node stability has been met on the node. +type Manager interface { + // Start starts the control loop to monitor eviction thresholds at specified interval. + Start(podFunc ActivePodsFunc, monitoringInterval time.Duration) + + // IsUnderMemoryPressure returns true if the node is under memory pressure. + IsUnderMemoryPressure() bool +} + // KillPodFunc kills a pod. // The pod status is updated, and then it is killed with the specified grace period. // This function must block until either the pod is killed or an error is encountered. @@ -67,3 +77,21 @@ type Threshold struct { // status - the desired status to associate with the pod (i.e. why its killed) // gracePeriodOverride - the grace period override to use instead of what is on the pod spec type KillPodFunc func(pod *api.Pod, status api.PodStatus, gracePeriodOverride *int64) error + +// ActivePodsFunc returns pods bound to the kubelet that are active (i.e. non-terminal state) +type ActivePodsFunc func() []*api.Pod + +// statsFunc returns the usage stats if known for an input pod. +type statsFunc func(pod *api.Pod) (statsapi.PodStats, bool) + +// rankFunc sorts the pods in eviction order +type rankFunc func(pods []*api.Pod, stats statsFunc) + +// signalObservations maps a signal to an observed quantity +type signalObservations map[Signal]resource.Quantity + +// thresholdsObservedAt maps a threshold to a time that it was observed +type thresholdsObservedAt map[Threshold]time.Time + +// nodeConditionsObservedAt maps a node condition to a time that it was observed +type nodeConditionsObservedAt map[api.NodeConditionType]time.Time diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 82e49538cb6..a850126e61d 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -118,6 +118,10 @@ const ( // Period for performing global cleanup tasks. housekeepingPeriod = time.Second * 2 + // Period for performing eviction monitoring. + // TODO ensure this is in sync with internal cadvisor housekeeping. + evictionMonitoringPeriod = time.Second * 10 + // The path in containers' filesystems where the hosts file is mounted. etcHostsPath = "/etc/hosts" @@ -496,6 +500,14 @@ func NewMainKubelet( klet.podKillingCh = make(chan *kubecontainer.PodPair, podKillingChannelCapacity) klet.setNodeStatusFuncs = klet.defaultNodeStatusFuncs() + // setup eviction manager + evictionManager, evictionAdmitHandler, err := eviction.NewManager(klet.resourceAnalyzer, evictionConfig, killPodNow(klet.podWorkers), recorder, nodeRef, klet.clock) + if err != nil { + return nil, fmt.Errorf("failed to initialize eviction manager: %v", err) + } + klet.evictionManager = evictionManager + klet.AddPodAdmitHandler(evictionAdmitHandler) + // apply functional Option's for _, opt := range kubeOptions { opt(klet) @@ -568,6 +580,9 @@ type Kubelet struct { // this Kubelet services. 
podManager kubepod.Manager + // Needed to observe and respond to situations that could impact node stability + evictionManager eviction.Manager + // Needed to report events for containers belonging to deleted/modified pods. // Tracks references for reporting events containerRefManager *kubecontainer.RefManager @@ -961,12 +976,20 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) { // Start component sync loops. kl.statusManager.Start() kl.probeManager.Start() + kl.evictionManager.Start(kl.getActivePods, evictionMonitoringPeriod) // Start the pod lifecycle event generator. kl.pleg.Start() kl.syncLoop(updates, kl) } +// getActivePods returns non-terminal pods +func (kl *Kubelet) getActivePods() []*api.Pod { + allPods := kl.podManager.GetPods() + activePods := kl.filterOutTerminatedPods(allPods) + return activePods +} + // initialNodeStatus determines the initial node status, incorporating node // labels and information from the cloud provider. func (kl *Kubelet) initialNodeStatus() (*api.Node, error) { @@ -3152,6 +3175,64 @@ func (kl *Kubelet) setNodeReadyCondition(node *api.Node) { } } +// setNodeMemoryPressureCondition for the node. +// TODO: this needs to move somewhere centralized... +func (kl *Kubelet) setNodeMemoryPressureCondition(node *api.Node) { + currentTime := unversioned.NewTime(kl.clock.Now()) + var condition *api.NodeCondition + + // Check if NodeMemoryPressure condition already exists and if it does, just pick it up for update. + for i := range node.Status.Conditions { + if node.Status.Conditions[i].Type == api.NodeMemoryPressure { + condition = &node.Status.Conditions[i] + } + } + + newCondition := false + // If the NodeMemoryPressure condition doesn't exist, create one + if condition == nil { + condition = &api.NodeCondition{ + Type: api.NodeMemoryPressure, + Status: api.ConditionUnknown, + } + // cannot be appended to node.Status.Conditions here because it gets + // copied to the slice. So if we append to the slice here none of the + // updates we make below are reflected in the slice. + newCondition = true + } + + // Update the heartbeat time + condition.LastHeartbeatTime = currentTime + + // Note: The conditions below take care of the case when a new NodeMemoryPressure condition is + // created and as well as the case when the condition already exists. When a new condition + // is created its status is set to api.ConditionUnknown which matches either + // condition.Status != api.ConditionTrue or + // condition.Status != api.ConditionFalse in the conditions below depending on whether + // the kubelet is under memory pressure or not. + if kl.evictionManager.IsUnderMemoryPressure() { + if condition.Status != api.ConditionTrue { + condition.Status = api.ConditionTrue + condition.Reason = "KubeletHasInsufficientMemory" + condition.Message = "kubelet has insufficient memory available" + condition.LastTransitionTime = currentTime + kl.recordNodeStatusEvent(api.EventTypeNormal, "NodeHasInsufficientMemory") + } + } else { + if condition.Status != api.ConditionFalse { + condition.Status = api.ConditionFalse + condition.Reason = "KubeletHasSufficientMemory" + condition.Message = "kubelet has sufficient memory available" + condition.LastTransitionTime = currentTime + kl.recordNodeStatusEvent(api.EventTypeNormal, "NodeHasSufficientMemory") + } + } + + if newCondition { + node.Status.Conditions = append(node.Status.Conditions, *condition) + } +} + // Set OODcondition for the node. 
func (kl *Kubelet) setNodeOODCondition(node *api.Node) { currentTime := unversioned.NewTime(kl.clock.Now()) @@ -3257,6 +3338,7 @@ func (kl *Kubelet) defaultNodeStatusFuncs() []func(*api.Node) error { kl.setNodeAddress, withoutError(kl.setNodeStatusInfo), withoutError(kl.setNodeOODCondition), + withoutError(kl.setNodeMemoryPressureCondition), withoutError(kl.setNodeReadyCondition), withoutError(kl.recordNodeSchedulableEvent), } diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index afb1d3f5ba3..9955269eed0 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -47,6 +47,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" + "k8s.io/kubernetes/pkg/kubelet/eviction" "k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/network" nettest "k8s.io/kubernetes/pkg/kubelet/network/testing" @@ -55,6 +56,7 @@ import ( podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing" proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" probetest "k8s.io/kubernetes/pkg/kubelet/prober/testing" + "k8s.io/kubernetes/pkg/kubelet/server/stats" "k8s.io/kubernetes/pkg/kubelet/status" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/kubelet/util/queue" @@ -202,6 +204,24 @@ func newTestKubelet(t *testing.T) *TestKubelet { kubelet.pleg = pleg.NewGenericPLEG(fakeRuntime, 100, time.Hour, nil, util.RealClock{}) kubelet.clock = fakeClock kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs() + + // TODO: Factor out "StatsProvider" from Kubelet so we don't have a cyclic dependency + volumeStatsAggPeriod := time.Second * 10 + kubelet.resourceAnalyzer = stats.NewResourceAnalyzer(kubelet, volumeStatsAggPeriod, kubelet.containerRuntime) + nodeRef := &api.ObjectReference{ + Kind: "Node", + Name: kubelet.nodeName, + UID: types.UID(kubelet.nodeName), + Namespace: "", + } + // setup eviction manager + evictionManager, evictionAdmitHandler, err := eviction.NewManager(kubelet.resourceAnalyzer, eviction.Config{}, killPodNow(kubelet.podWorkers), fakeRecorder, nodeRef, kubelet.clock) + if err != nil { + t.Fatalf("failed to initialize eviction manager: %v", err) + } + kubelet.evictionManager = evictionManager + kubelet.AddPodAdmitHandler(evictionAdmitHandler) + return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient, fakeClock, nil} } @@ -2369,6 +2389,14 @@ func TestUpdateNewNodeStatus(t *testing.T) { LastHeartbeatTime: unversioned.Time{}, LastTransitionTime: unversioned.Time{}, }, + { + Type: api.NodeMemoryPressure, + Status: api.ConditionFalse, + Reason: "KubeletHasSufficientMemory", + Message: fmt.Sprintf("kubelet has sufficient memory available"), + LastHeartbeatTime: unversioned.Time{}, + LastTransitionTime: unversioned.Time{}, + }, { Type: api.NodeReady, Status: api.ConditionTrue, @@ -2548,6 +2576,14 @@ func TestUpdateExistingNodeStatus(t *testing.T) { LastHeartbeatTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), LastTransitionTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), }, + { + Type: api.NodeMemoryPressure, + Status: api.ConditionFalse, + Reason: "KubeletHasSufficientMemory", + Message: fmt.Sprintf("kubelet has sufficient memory available"), + LastHeartbeatTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + LastTransitionTime: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, { Type: api.NodeReady, Status: api.ConditionTrue, @@ -2604,6 +2640,14 @@ func 
TestUpdateExistingNodeStatus(t *testing.T) { LastHeartbeatTime: unversioned.Time{}, // placeholder LastTransitionTime: unversioned.Time{}, // placeholder }, + { + Type: api.NodeMemoryPressure, + Status: api.ConditionFalse, + Reason: "KubeletHasSufficientMemory", + Message: fmt.Sprintf("kubelet has sufficient memory available"), + LastHeartbeatTime: unversioned.Time{}, + LastTransitionTime: unversioned.Time{}, + }, { Type: api.NodeReady, Status: api.ConditionTrue, @@ -2886,6 +2930,14 @@ func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) { LastHeartbeatTime: unversioned.Time{}, LastTransitionTime: unversioned.Time{}, }, + { + Type: api.NodeMemoryPressure, + Status: api.ConditionFalse, + Reason: "KubeletHasSufficientMemory", + Message: fmt.Sprintf("kubelet has sufficient memory available"), + LastHeartbeatTime: unversioned.Time{}, + LastTransitionTime: unversioned.Time{}, + }, {}, //placeholder }, NodeInfo: api.NodeSystemInfo{ @@ -2958,10 +3010,11 @@ func TestUpdateNodeStatusWithRuntimeStateError(t *testing.T) { } // Version skew workaround. See: https://github.com/kubernetes/kubernetes/issues/16961 - if updatedNode.Status.Conditions[len(updatedNode.Status.Conditions)-1].Type != api.NodeReady { + lastIndex := len(updatedNode.Status.Conditions) - 1 + if updatedNode.Status.Conditions[lastIndex].Type != api.NodeReady { t.Errorf("unexpected node condition order. NodeReady should be last.") } - expectedNode.Status.Conditions[1] = api.NodeCondition{ + expectedNode.Status.Conditions[lastIndex] = api.NodeCondition{ Type: api.NodeReady, Status: status, Reason: reason, diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go index b4b11e957e7..31739030f16 100644 --- a/pkg/kubelet/runonce_test.go +++ b/pkg/kubelet/runonce_test.go @@ -31,11 +31,14 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cm" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" + "k8s.io/kubernetes/pkg/kubelet/eviction" "k8s.io/kubernetes/pkg/kubelet/network" nettest "k8s.io/kubernetes/pkg/kubelet/network/testing" kubepod "k8s.io/kubernetes/pkg/kubelet/pod" podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing" + "k8s.io/kubernetes/pkg/kubelet/server/stats" "k8s.io/kubernetes/pkg/kubelet/status" + "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util" utiltesting "k8s.io/kubernetes/pkg/util/testing" ) @@ -76,10 +79,30 @@ func TestRunOnce(t *testing.T) { reasonCache: NewReasonCache(), clock: util.RealClock{}, kubeClient: &fake.Clientset{}, + hostname: testKubeletHostname, + nodeName: testKubeletHostname, } kb.containerManager = cm.NewStubContainerManager() kb.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", nettest.NewFakeHost(nil), componentconfig.HairpinNone) + // TODO: Factor out "StatsProvider" from Kubelet so we don't have a cyclic dependency + volumeStatsAggPeriod := time.Second * 10 + kb.resourceAnalyzer = stats.NewResourceAnalyzer(kb, volumeStatsAggPeriod, kb.containerRuntime) + nodeRef := &api.ObjectReference{ + Kind: "Node", + Name: kb.nodeName, + UID: types.UID(kb.nodeName), + Namespace: "", + } + fakeKillPodFunc := func(pod *api.Pod, podStatus api.PodStatus, gracePeriodOverride *int64) error { + return nil + } + evictionManager, evictionAdmitHandler, err := eviction.NewManager(kb.resourceAnalyzer, eviction.Config{}, fakeKillPodFunc, kb.recorder, nodeRef, kb.clock) + if err != nil { + t.Fatalf("failed to initialize eviction manager: %v", err) + } + kb.evictionManager = evictionManager + 
kb.AddPodAdmitHandler(evictionAdmitHandler) if err := kb.setupDataDirs(); err != nil { t.Errorf("Failed to init data dirs: %v", err) }
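
For reference, a minimal usage sketch of the new eviction package, showing how a kubelet-like caller is expected to wire it up based only on the APIs visible in this diff (eviction.NewManager, eviction.Config, eviction.Threshold, Manager.Start, Manager.IsUnderMemoryPressure). This is illustrative and not part of the patch: the helper name setupEviction and the 1Gi / 5-minute / 10-second values are assumptions chosen for the example.

package example

import (
	"fmt"
	"time"

	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/api/resource"
	"k8s.io/kubernetes/pkg/client/record"
	"k8s.io/kubernetes/pkg/kubelet/eviction"
	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
	"k8s.io/kubernetes/pkg/kubelet/server/stats"
	"k8s.io/kubernetes/pkg/util"
)

// setupEviction (hypothetical helper) mirrors what NewMainKubelet does in this patch:
// build a Config, construct the manager plus its admission handler, and start the loop.
func setupEviction(summaryProvider stats.SummaryProvider, killPod eviction.KillPodFunc,
	recorder record.EventRecorder, nodeRef *api.ObjectReference,
	activePods eviction.ActivePodsFunc) (eviction.Manager, lifecycle.PodAdmitHandler, error) {
	config := eviction.Config{
		// keep reporting MemoryPressure for 5 minutes after the last threshold observation
		PressureTransitionPeriod: 5 * time.Minute,
		Thresholds: []eviction.Threshold{
			{
				// hard threshold (no grace period): act once memory.available drops below 1Gi
				Signal:   eviction.SignalMemoryAvailable,
				Operator: eviction.OpLessThan,
				Value:    resource.MustParse("1Gi"),
			},
		},
	}
	manager, admitHandler, err := eviction.NewManager(summaryProvider, config, killPod, recorder, nodeRef, util.RealClock{})
	if err != nil {
		return nil, nil, fmt.Errorf("failed to initialize eviction manager: %v", err)
	}
	// sample summary stats every 10s (evictionMonitoringPeriod in this patch);
	// at most one pod is evicted per interval when thresholds are met
	manager.Start(activePods, 10*time.Second)
	return manager, admitHandler, nil
}

The returned admission handler is what gets registered via AddPodAdmitHandler so BestEffort pods are rejected while the node reports pressure, and IsUnderMemoryPressure feeds the new NodeMemoryPressure condition during node status updates.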