diff --git a/pkg/kubelet/kubelet_node_status.go b/pkg/kubelet/kubelet_node_status.go
index 791effef0fd..e07596f6e59 100644
--- a/pkg/kubelet/kubelet_node_status.go
+++ b/pkg/kubelet/kubelet_node_status.go
@@ -529,6 +529,14 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) {
 	if node.Status.Allocatable == nil {
 		node.Status.Allocatable = make(v1.ResourceList)
 	}
+	// Remove opaque integer resources from allocatable that are no longer
+	// present in capacity.
+	for k := range node.Status.Allocatable {
+		_, found := node.Status.Capacity[k]
+		if !found && v1.IsOpaqueIntResourceName(k) {
+			delete(node.Status.Allocatable, k)
+		}
+	}
 	allocatableReservation := kl.containerManager.GetNodeAllocatableReservation()
 	for k, v := range node.Status.Capacity {
 		value := *(v.Copy())
diff --git a/plugin/pkg/scheduler/algorithm/predicates/predicates.go b/plugin/pkg/scheduler/algorithm/predicates/predicates.go
index 4f1387ca085..d255374d91a 100644
--- a/plugin/pkg/scheduler/algorithm/predicates/predicates.go
+++ b/plugin/pkg/scheduler/algorithm/predicates/predicates.go
@@ -468,6 +468,30 @@ func (c *VolumeZoneChecker) predicate(pod *v1.Pod, meta interface{}, nodeInfo *s
 	return true, nil, nil
 }
 
+// GetResourceRequest returns a *schedulercache.Resource that covers the largest
+// width in each resource dimension. Because init containers run sequentially,
+// we collect the max in each dimension iteratively. In contrast, we sum the
+// resource vectors for regular containers since they run simultaneously.
+//
+// Example:
+//
+// Pod:
+//   InitContainers
+//     IC1:
+//       CPU: 2
+//       Memory: 1G
+//     IC2:
+//       CPU: 2
+//       Memory: 3G
+//   Containers
+//     C1:
+//       CPU: 2
+//       Memory: 1G
+//     C2:
+//       CPU: 1
+//       Memory: 1G
+//
+// Result: CPU: 3, Memory: 3G
 func GetResourceRequest(pod *v1.Pod) *schedulercache.Resource {
 	result := schedulercache.Resource{}
 	for _, container := range pod.Spec.Containers {
@@ -505,10 +529,8 @@ func GetResourceRequest(pod *v1.Pod) *schedulercache.Resource {
 		default:
 			if v1.IsOpaqueIntResourceName(rName) {
 				value := rQuantity.Value()
-				// Ensure the opaque resource map is initialized in the result.
-				result.AddOpaque(rName, int64(0))
 				if value > result.OpaqueIntResources[rName] {
-					result.OpaqueIntResources[rName] = value
+					result.SetOpaque(rName, value)
 				}
 			}
 		}
diff --git a/plugin/pkg/scheduler/schedulercache/node_info.go b/plugin/pkg/scheduler/schedulercache/node_info.go
index 4fca801e419..3e3d417b66e 100644
--- a/plugin/pkg/scheduler/schedulercache/node_info.go
+++ b/plugin/pkg/scheduler/schedulercache/node_info.go
@@ -83,11 +83,15 @@ func (r *Resource) ResourceList() v1.ResourceList {
 }
 
 func (r *Resource) AddOpaque(name v1.ResourceName, quantity int64) {
+	r.SetOpaque(name, r.OpaqueIntResources[name]+quantity)
+}
+
+func (r *Resource) SetOpaque(name v1.ResourceName, quantity int64) {
 	// Lazily allocate opaque integer resource map.
 	if r.OpaqueIntResources == nil {
 		r.OpaqueIntResources = map[v1.ResourceName]int64{}
 	}
-	r.OpaqueIntResources[name] += quantity
+	r.OpaqueIntResources[name] = quantity
 }
 
 // NewNodeInfo returns a ready to use empty NodeInfo object.
@@ -333,7 +337,7 @@ func (n *NodeInfo) SetNode(node *v1.Node) error {
 			n.allowedPodNumber = int(rQuant.Value())
 		default:
 			if v1.IsOpaqueIntResourceName(rName) {
-				n.allocatableResource.AddOpaque(rName, rQuant.Value())
+				n.allocatableResource.SetOpaque(rName, rQuant.Value())
 			}
 		}
 	}
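A note on the AddOpaque/SetOpaque split in node_info.go: AddOpaque accumulates while SetOpaque overwrites, which matters if the same Resource is filled in more than once (my reading of why the SetNode call site switched; the patch itself does not say). A minimal runnable sketch with a stripped-down Resource type:

```go
package main

import "fmt"

// Resource is a stripped-down stand-in for schedulercache.Resource.
type Resource struct {
	OpaqueIntResources map[string]int64
}

func (r *Resource) SetOpaque(name string, quantity int64) {
	// Lazily allocate the opaque integer resource map, as in the patch.
	if r.OpaqueIntResources == nil {
		r.OpaqueIntResources = map[string]int64{}
	}
	r.OpaqueIntResources[name] = quantity
}

func (r *Resource) AddOpaque(name string, quantity int64) {
	// Reading a missing key from a (possibly nil) map yields 0, so this
	// is safe before the lazy allocation has happened.
	r.SetOpaque(name, r.OpaqueIntResources[name]+quantity)
}

func main() {
	var added, set Resource
	// Simulate the same node allocatable being applied twice.
	for i := 0; i < 2; i++ {
		added.AddOpaque("foo", 5)
		set.SetOpaque("foo", 5)
	}
	fmt.Println(added.OpaqueIntResources["foo"]) // 10 -- accumulates
	fmt.Println(set.OpaqueIntResources["foo"])   // 5  -- idempotent
}
```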
diff --git a/test/e2e/opaque_resource.go b/test/e2e/opaque_resource.go
index 5bf734524f0..f4fd235b8db 100644
--- a/test/e2e/opaque_resource.go
+++ b/test/e2e/opaque_resource.go
@@ -38,7 +38,7 @@ import (
 	. "github.com/onsi/gomega"
 )
 
-var _ = framework.KubeDescribe("Opaque resources [Feature:OpaqueResources]", func() {
+var _ = framework.KubeDescribe("Opaque resources", func() {
 	f := framework.NewDefaultFramework("opaque-resource")
 	opaqueResName := v1.OpaqueIntResourceName("foo")
 	var node *v1.Node
@@ -59,11 +59,19 @@ var _ = framework.KubeDescribe("Opaque resources [Feature:OpaqueResources]", fun
 			}
 		}
 
-		removeOpaqueResource(f, node.Name, opaqueResName)
 		addOpaqueResource(f, node.Name, opaqueResName)
 	})
 
+	// TODO: The suite times out if removeOpaqueResource is called as part of
+	// an AfterEach closure. For now, it is deferred at the top of each It
+	// block instead.
+	// AfterEach(func() {
+	// 	removeOpaqueResource(f, node.Name, opaqueResName)
+	// })
+
 	It("should not break pods that do not consume opaque integer resources.", func() {
+		defer removeOpaqueResource(f, node.Name, opaqueResName)
+
 		By("Creating a vanilla pod")
 		requests := v1.ResourceList{v1.ResourceCPU: resource.MustParse("0.1")}
 		limits := v1.ResourceList{v1.ResourceCPU: resource.MustParse("0.2")}
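The opaqueResName declared in this suite goes through v1.OpaqueIntResourceName, which namespaces the short name "foo". A hedged sketch of the expansion (the prefix below is my recollection for this Kubernetes release, not quoted from the patch; verify against the pkg/api/v1 helpers):

```go
package main

import "fmt"

// opaqueIntResourceName mimics what v1.OpaqueIntResourceName is believed to
// do here: prepend the opaque-int-resource prefix. The prefix is an assumption.
func opaqueIntResourceName(name string) string {
	return "pod.alpha.kubernetes.io/opaque-int-resource-" + name
}

func main() {
	fmt.Println(opaqueIntResourceName("foo"))
	// Output: pod.alpha.kubernetes.io/opaque-int-resource-foo
}
```

The slash in that full name is also why the JSON patches later in this file need escapeForJSONPatch.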
@@ -74,19 +82,17 @@ var _ = framework.KubeDescribe("Opaque resources [Feature:OpaqueResources]", fun
 			_, err := f.ClientSet.Core().Pods(f.Namespace.Name).Create(pod)
 			return err
 		}
-		predicate := func(e *v1.Event) bool {
-			return e.Type == v1.EventTypeNormal &&
-				e.Reason == "Scheduled" &&
-				// Here we don't check for the bound node name since it can land on
-				// any one (this pod doesn't require any of the opaque resource.)
-				strings.Contains(e.Message, fmt.Sprintf("Successfully assigned %v", pod.Name))
-		}
+		// Here we don't check for the bound node name since it can land on
+		// any one (this pod doesn't require any of the opaque resource).
+		predicate := scheduleSuccess(pod.Name, "")
 		success, err := observeEventAfterAction(f, predicate, action)
 		Expect(err).NotTo(HaveOccurred())
 		Expect(success).To(Equal(true))
 	})
 
 	It("should schedule pods that do consume opaque integer resources.", func() {
+		defer removeOpaqueResource(f, node.Name, opaqueResName)
+
 		By("Creating a pod that requires less of the opaque resource than is allocatable on a node.")
 		requests := v1.ResourceList{
 			v1.ResourceCPU: resource.MustParse("0.1"),
@@ -103,17 +109,15 @@ var _ = framework.KubeDescribe("Opaque resources [Feature:OpaqueResources]", fun
 			_, err := f.ClientSet.Core().Pods(f.Namespace.Name).Create(pod)
 			return err
 		}
-		predicate := func(e *v1.Event) bool {
-			return e.Type == v1.EventTypeNormal &&
-				e.Reason == "Scheduled" &&
-				strings.Contains(e.Message, fmt.Sprintf("Successfully assigned %v to %v", pod.Name, node.Name))
-		}
+		predicate := scheduleSuccess(pod.Name, node.Name)
 		success, err := observeEventAfterAction(f, predicate, action)
 		Expect(err).NotTo(HaveOccurred())
 		Expect(success).To(Equal(true))
 	})
 
 	It("should not schedule pods that exceed the available amount of opaque integer resource.", func() {
+		defer removeOpaqueResource(f, node.Name, opaqueResName)
+
 		By("Creating a pod that requires more of the opaque resource than is allocatable on any node")
 		requests := v1.ResourceList{opaqueResName: resource.MustParse("6")}
 		limits := v1.ResourceList{}
@@ -123,17 +127,15 @@ var _ = framework.KubeDescribe("Opaque resources [Feature:OpaqueResources]", fun
 			_, err := f.ClientSet.Core().Pods(f.Namespace.Name).Create(newTestPod(f, "over-max-oir", requests, limits))
 			return err
 		}
-		predicate := func(e *v1.Event) bool {
-			return e.Type == "Warning" &&
-				e.Reason == "FailedScheduling" &&
-				strings.Contains(e.Message, "failed to fit in any node")
-		}
+		predicate := scheduleFailure("over-max-oir")
 		success, err := observeEventAfterAction(f, predicate, action)
 		Expect(err).NotTo(HaveOccurred())
 		Expect(success).To(Equal(true))
 	})
 
 	It("should account opaque integer resources in pods with multiple containers.", func() {
+		defer removeOpaqueResource(f, node.Name, opaqueResName)
+
 		By("Creating a pod with two containers that together require less of the opaque resource than is allocatable on a node")
 		requests := v1.ResourceList{opaqueResName: resource.MustParse("1")}
 		limits := v1.ResourceList{}
@@ -170,11 +172,7 @@ var _ = framework.KubeDescribe("Opaque resources [Feature:OpaqueResources]", fun
 			_, err := f.ClientSet.Core().Pods(f.Namespace.Name).Create(pod)
 			return err
 		}
-		predicate := func(e *v1.Event) bool {
-			return e.Type == v1.EventTypeNormal &&
-				e.Reason == "Scheduled" &&
-				strings.Contains(e.Message, fmt.Sprintf("Successfully assigned %v to %v", pod.Name, node.Name))
-		}
+		predicate := scheduleSuccess(pod.Name, node.Name)
 		success, err := observeEventAfterAction(f, predicate, action)
 		Expect(err).NotTo(HaveOccurred())
 		Expect(success).To(Equal(true))
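These cases hinge on resource.Quantity arithmetic: the node is patched with capacity "5", the over-max-oir pod requests "6", and the node predicates in the helpers below compare milli-values. A small sketch of those comparisons (import path per the apimachinery layout, which may differ on older release branches):

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	capacity := resource.MustParse("5")
	request := resource.MustParse("6")

	// addOpaqueResource waits until capacity and allocatable both read 5.
	fmt.Println(capacity.MilliValue()) // 5000

	// A request of 6 cannot fit into a capacity of 5, so the scheduler
	// emits the FailedScheduling event the test waits for.
	fmt.Println(request.Value() > capacity.Value()) // true
}
```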
@@ -214,11 +212,53 @@ var _ = framework.KubeDescribe("Opaque resources [Feature:OpaqueResources]", fun
 			_, err = f.ClientSet.Core().Pods(f.Namespace.Name).Create(pod)
 			return err
 		}
-		predicate = func(e *v1.Event) bool {
-			return e.Type == "Warning" &&
-				e.Reason == "FailedScheduling" &&
-				strings.Contains(e.Message, "failed to fit in any node")
+		predicate = scheduleFailure(pod.Name)
+		success, err = observeEventAfterAction(f, predicate, action)
+		Expect(err).NotTo(HaveOccurred())
+		Expect(success).To(Equal(true))
+	})
+
+	It("should schedule pods that initially do not fit after enough opaque integer resources are freed.", func() {
+		defer removeOpaqueResource(f, node.Name, opaqueResName)
+
+		By("Creating a pod that requires less of the opaque resource than is allocatable on a node.")
+		requests := v1.ResourceList{
+			v1.ResourceCPU: resource.MustParse("0.1"),
+			opaqueResName:  resource.MustParse("3"),
 		}
+		limits := v1.ResourceList{
+			v1.ResourceCPU: resource.MustParse("0.2"),
+			opaqueResName:  resource.MustParse("3"),
+		}
+		pod1 := newTestPod(f, "oir-1", requests, limits)
+		pod2 := newTestPod(f, "oir-2", requests, limits)
+
+		By("Observing an event that indicates one pod was scheduled")
+		action := func() error {
+			_, err := f.ClientSet.Core().Pods(f.Namespace.Name).Create(pod1)
+			return err
+		}
+		predicate := scheduleSuccess(pod1.Name, node.Name)
+		success, err := observeEventAfterAction(f, predicate, action)
+		Expect(err).NotTo(HaveOccurred())
+		Expect(success).To(Equal(true))
+
+		By("Observing an event that indicates a subsequent pod was not scheduled")
+		action = func() error {
+			_, err := f.ClientSet.Core().Pods(f.Namespace.Name).Create(pod2)
+			return err
+		}
+		predicate = scheduleFailure(pod2.Name)
+		success, err = observeEventAfterAction(f, predicate, action)
+		Expect(err).NotTo(HaveOccurred())
+		Expect(success).To(Equal(true))
+
+		By("Observing an event that indicates the second pod was scheduled after deleting the first pod")
+		action = func() error {
+			err := f.ClientSet.Core().Pods(f.Namespace.Name).Delete(pod1.Name, nil)
+			return err
+		}
+		predicate = scheduleSuccess(pod2.Name, node.Name)
 		success, err = observeEventAfterAction(f, predicate, action)
 		Expect(err).NotTo(HaveOccurred())
 		Expect(success).To(Equal(true))
@@ -228,12 +268,14 @@ var _ = framework.KubeDescribe("Opaque resources [Feature:OpaqueResources]", fun
 // Adds the opaque resource to a node.
 func addOpaqueResource(f *framework.Framework, nodeName string, opaqueResName v1.ResourceName) {
 	action := func() error {
+		By(fmt.Sprintf("Adding OIR to node [%s]", nodeName))
 		patch := []byte(fmt.Sprintf(`[{"op": "add", "path": "/status/capacity/%s", "value": "5"}]`, escapeForJSONPatch(opaqueResName)))
 		return f.ClientSet.Core().RESTClient().Patch(types.JSONPatchType).Resource("nodes").Name(nodeName).SubResource("status").Body(patch).Do().Error()
 	}
 	predicate := func(n *v1.Node) bool {
 		capacity, foundCap := n.Status.Capacity[opaqueResName]
 		allocatable, foundAlloc := n.Status.Allocatable[opaqueResName]
+		By(fmt.Sprintf("Node [%s] has OIR capacity: [%t] (%s), has OIR allocatable: [%t] (%s)", n.Name, foundCap, capacity.String(), foundAlloc, allocatable.String()))
 		return foundCap && capacity.MilliValue() == int64(5000) &&
 			foundAlloc && allocatable.MilliValue() == int64(5000)
 	}
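The %s spliced into the patch path above is why escapeForJSONPatch exists: the full opaque resource name contains a "/", and RFC 6901 JSON Pointer segments encode "/" as "~1" (and "~" as "~0"). The helper's body is not part of this diff, so the implementation below is an assumed sketch, used only to show the resulting patch document:

```go
package main

import (
	"fmt"
	"strings"
)

// escapeForJSONPatch here is a plausible implementation, not the one in the
// repository. Escape order matters: "~" must be rewritten before "/".
func escapeForJSONPatch(name string) string {
	return strings.Replace(strings.Replace(name, "~", "~0", -1), "/", "~1", -1)
}

func main() {
	name := "pod.alpha.kubernetes.io/opaque-int-resource-foo"
	patch := fmt.Sprintf(`[{"op": "add", "path": "/status/capacity/%s", "value": "5"}]`,
		escapeForJSONPatch(name))
	fmt.Println(patch)
	// [{"op": "add", "path": "/status/capacity/pod.alpha.kubernetes.io~1opaque-int-resource-foo", "value": "5"}]
}
```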
@@ -245,14 +287,16 @@ func addOpaqueResource(f *framework.Framework, nodeName string, opaqueResName v1
 // Removes the opaque resource from a node.
 func removeOpaqueResource(f *framework.Framework, nodeName string, opaqueResName v1.ResourceName) {
 	action := func() error {
+		By(fmt.Sprintf("Removing OIR from node [%s]", nodeName))
 		patch := []byte(fmt.Sprintf(`[{"op": "remove", "path": "/status/capacity/%s"}]`, escapeForJSONPatch(opaqueResName)))
 		f.ClientSet.Core().RESTClient().Patch(types.JSONPatchType).Resource("nodes").Name(nodeName).SubResource("status").Body(patch).Do()
 		return nil // Ignore error -- the opaque resource may not exist.
 	}
 	predicate := func(n *v1.Node) bool {
-		_, foundCap := n.Status.Capacity[opaqueResName]
-		_, foundAlloc := n.Status.Allocatable[opaqueResName]
-		return !foundCap && !foundAlloc
+		capacity, foundCap := n.Status.Capacity[opaqueResName]
+		allocatable, foundAlloc := n.Status.Allocatable[opaqueResName]
+		By(fmt.Sprintf("Node [%s] has OIR capacity: [%t] (%s), has OIR allocatable: [%t] (%s)", n.Name, foundCap, capacity.String(), foundAlloc, allocatable.String()))
+		return (!foundCap || capacity.IsZero()) && (!foundAlloc || allocatable.IsZero())
 	}
 	success, err := observeNodeUpdateAfterAction(f, nodeName, predicate, action)
 	Expect(err).NotTo(HaveOccurred())
@@ -345,7 +389,7 @@ func observeEventAfterAction(f *framework.Framework, eventPredicate func(*v1.Eve
 		cache.ResourceEventHandlerFuncs{
 			AddFunc: func(obj interface{}) {
 				e, ok := obj.(*v1.Event)
-				By(fmt.Sprintf("Considering event: \nType = [%s], Reason = [%s], Message = [%s]", e.Type, e.Reason, e.Message))
+				By(fmt.Sprintf("Considering event: \nType = [%s], Name = [%s], Reason = [%s], Message = [%s]", e.Type, e.Name, e.Reason, e.Message))
 				Expect(ok).To(Equal(true))
 				if ok && eventPredicate(e) {
 					observedMatchingEvent = true
@@ -373,3 +417,20 @@ func observeEventAfterAction(f *framework.Framework, eventPredicate func(*v1.Eve
 	})
 	return err == nil, err
 }
+
+func scheduleSuccess(podName, nodeName string) func(*v1.Event) bool {
+	return func(e *v1.Event) bool {
+		return e.Type == v1.EventTypeNormal &&
+			e.Reason == "Scheduled" &&
+			strings.HasPrefix(e.Name, podName) &&
+			strings.Contains(e.Message, fmt.Sprintf("Successfully assigned %v to %v", podName, nodeName))
+	}
+}
+
+func scheduleFailure(podName string) func(*v1.Event) bool {
+	return func(e *v1.Event) bool {
+		return strings.HasPrefix(e.Name, podName) &&
+			e.Type == v1.EventTypeWarning &&
+			e.Reason == "FailedScheduling"
+	}
+}
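A subtlety worth noting: the vanilla-pod test calls scheduleSuccess(pod.Name, "") and still matches whatever node the pod lands on, because with an empty node name the Sprintf collapses into a strict prefix of every real "Scheduled" message. A quick check (sample event message assumed):

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// A representative scheduler event message (node name is arbitrary).
	msg := "Successfully assigned vanilla to some-node"
	want := fmt.Sprintf("Successfully assigned %v to %v", "vanilla", "")
	fmt.Printf("%q\n", want)                 // "Successfully assigned vanilla to "
	fmt.Println(strings.Contains(msg, want)) // true for any bound node
}
```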