mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-31 08:36:16 +00:00
Merge pull request #42609 from intelsdi-x/test-out-of-oir
Automatic merge from submit-queue (batch tested with PRs 41890, 42593, 42633, 42626, 42609). Pods pending due to insufficient opaque integer resources (OIR) should get scheduled once sufficient OIR becomes available (e2e disabled). #41870 was reverted because it introduced an e2e test flake. This is the same code, with the e2e tests for OIR disabled again. We can attempt to enable the e2e test cases one-by-one in follow-up PRs, but it would be preferable to get the main fix merged in time for 1.6, since OIR is broken on master (see #41861). cc @timothysc
This commit is contained in:
@@ -529,6 +529,14 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) {
|
||||
if node.Status.Allocatable == nil {
|
||||
node.Status.Allocatable = make(v1.ResourceList)
|
||||
}
|
||||
// Remove opaque integer resources from allocatable that are no longer
|
||||
// present in capacity.
|
||||
for k := range node.Status.Allocatable {
|
||||
_, found := node.Status.Capacity[k]
|
||||
if !found && v1.IsOpaqueIntResourceName(k) {
|
||||
delete(node.Status.Allocatable, k)
|
||||
}
|
||||
}
|
||||
allocatableReservation := kl.containerManager.GetNodeAllocatableReservation()
|
||||
for k, v := range node.Status.Capacity {
|
||||
value := *(v.Copy())
|
||||
|
@@ -468,6 +468,30 @@ func (c *VolumeZoneChecker) predicate(pod *v1.Pod, meta interface{}, nodeInfo *s
|
||||
return true, nil, nil
|
||||
}
|
||||
|
||||
// GetResourceRequest returns a *schedulercache.Resource that covers the largest width in each
|
||||
// resource dimension. Because init-containers run sequentially, we collect the
|
||||
// max in each dimension iteratively. In contrast, we sum the resource vectors
|
||||
// for regular containers since they run simultaneously.
|
||||
//
|
||||
// Example:
|
||||
//
|
||||
// Pod:
|
||||
// InitContainers
|
||||
// IC1:
|
||||
// CPU: 2
|
||||
// Memory: 1G
|
||||
// IC2:
|
||||
// CPU: 2
|
||||
// Memory: 3G
|
||||
// Containers
|
||||
// C1:
|
||||
// CPU: 2
|
||||
// Memory: 1G
|
||||
// C2:
|
||||
// CPU: 1
|
||||
// Memory: 1G
|
||||
//
|
||||
// Result: CPU: 3, Memory: 3G
|
||||
func GetResourceRequest(pod *v1.Pod) *schedulercache.Resource {
|
||||
result := schedulercache.Resource{}
|
||||
for _, container := range pod.Spec.Containers {
|
||||
@@ -505,10 +529,8 @@ func GetResourceRequest(pod *v1.Pod) *schedulercache.Resource {
|
||||
default:
|
||||
if v1.IsOpaqueIntResourceName(rName) {
|
||||
value := rQuantity.Value()
|
||||
// Ensure the opaque resource map is initialized in the result.
|
||||
result.AddOpaque(rName, int64(0))
|
||||
if value > result.OpaqueIntResources[rName] {
|
||||
result.OpaqueIntResources[rName] = value
|
||||
result.SetOpaque(rName, value)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -83,11 +83,15 @@ func (r *Resource) ResourceList() v1.ResourceList {
|
||||
}
|
||||
|
||||
func (r *Resource) AddOpaque(name v1.ResourceName, quantity int64) {
|
||||
r.SetOpaque(name, r.OpaqueIntResources[name]+quantity)
|
||||
}
|
||||
|
||||
func (r *Resource) SetOpaque(name v1.ResourceName, quantity int64) {
|
||||
// Lazily allocate opaque integer resource map.
|
||||
if r.OpaqueIntResources == nil {
|
||||
r.OpaqueIntResources = map[v1.ResourceName]int64{}
|
||||
}
|
||||
r.OpaqueIntResources[name] += quantity
|
||||
r.OpaqueIntResources[name] = quantity
|
||||
}
|
||||
|
||||
// NewNodeInfo returns a ready to use empty NodeInfo object.
|
||||
@@ -333,7 +337,7 @@ func (n *NodeInfo) SetNode(node *v1.Node) error {
|
||||
n.allowedPodNumber = int(rQuant.Value())
|
||||
default:
|
||||
if v1.IsOpaqueIntResourceName(rName) {
|
||||
n.allocatableResource.AddOpaque(rName, rQuant.Value())
|
||||
n.allocatableResource.SetOpaque(rName, rQuant.Value())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -59,11 +59,19 @@ var _ = framework.KubeDescribe("Opaque resources [Feature:OpaqueResources]", fun
|
||||
}
|
||||
}
|
||||
|
||||
removeOpaqueResource(f, node.Name, opaqueResName)
|
||||
addOpaqueResource(f, node.Name, opaqueResName)
|
||||
})
|
||||
|
||||
// TODO: The suite times out if removeOpaqueResource is called as part of
|
||||
// an AfterEach closure. For now, it is the last statement in each
|
||||
// It block.
|
||||
// AfterEach(func() {
|
||||
// removeOpaqueResource(f, node.Name, opaqueResName)
|
||||
// })
|
||||
|
||||
It("should not break pods that do not consume opaque integer resources.", func() {
|
||||
defer removeOpaqueResource(f, node.Name, opaqueResName)
|
||||
|
||||
By("Creating a vanilla pod")
|
||||
requests := v1.ResourceList{v1.ResourceCPU: resource.MustParse("0.1")}
|
||||
limits := v1.ResourceList{v1.ResourceCPU: resource.MustParse("0.2")}
|
||||
@@ -74,19 +82,17 @@ var _ = framework.KubeDescribe("Opaque resources [Feature:OpaqueResources]", fun
|
||||
_, err := f.ClientSet.Core().Pods(f.Namespace.Name).Create(pod)
|
||||
return err
|
||||
}
|
||||
predicate := func(e *v1.Event) bool {
|
||||
return e.Type == v1.EventTypeNormal &&
|
||||
e.Reason == "Scheduled" &&
|
||||
// Here we don't check for the bound node name since it can land on
|
||||
// any one (this pod doesn't require any of the opaque resource.)
|
||||
strings.Contains(e.Message, fmt.Sprintf("Successfully assigned %v", pod.Name))
|
||||
}
|
||||
// Here we don't check for the bound node name since it can land on
|
||||
// any one (this pod doesn't require any of the opaque resource.)
|
||||
predicate := scheduleSuccess(pod.Name, "")
|
||||
success, err := observeEventAfterAction(f, predicate, action)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(success).To(Equal(true))
|
||||
})
|
||||
|
||||
It("should schedule pods that do consume opaque integer resources.", func() {
|
||||
defer removeOpaqueResource(f, node.Name, opaqueResName)
|
||||
|
||||
By("Creating a pod that requires less of the opaque resource than is allocatable on a node.")
|
||||
requests := v1.ResourceList{
|
||||
v1.ResourceCPU: resource.MustParse("0.1"),
|
||||
@@ -103,17 +109,15 @@ var _ = framework.KubeDescribe("Opaque resources [Feature:OpaqueResources]", fun
|
||||
_, err := f.ClientSet.Core().Pods(f.Namespace.Name).Create(pod)
|
||||
return err
|
||||
}
|
||||
predicate := func(e *v1.Event) bool {
|
||||
return e.Type == v1.EventTypeNormal &&
|
||||
e.Reason == "Scheduled" &&
|
||||
strings.Contains(e.Message, fmt.Sprintf("Successfully assigned %v to %v", pod.Name, node.Name))
|
||||
}
|
||||
predicate := scheduleSuccess(pod.Name, node.Name)
|
||||
success, err := observeEventAfterAction(f, predicate, action)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(success).To(Equal(true))
|
||||
})
|
||||
|
||||
It("should not schedule pods that exceed the available amount of opaque integer resource.", func() {
|
||||
defer removeOpaqueResource(f, node.Name, opaqueResName)
|
||||
|
||||
By("Creating a pod that requires more of the opaque resource than is allocatable on any node")
|
||||
requests := v1.ResourceList{opaqueResName: resource.MustParse("6")}
|
||||
limits := v1.ResourceList{}
|
||||
@@ -123,17 +127,15 @@ var _ = framework.KubeDescribe("Opaque resources [Feature:OpaqueResources]", fun
|
||||
_, err := f.ClientSet.Core().Pods(f.Namespace.Name).Create(newTestPod(f, "over-max-oir", requests, limits))
|
||||
return err
|
||||
}
|
||||
predicate := func(e *v1.Event) bool {
|
||||
return e.Type == "Warning" &&
|
||||
e.Reason == "FailedScheduling" &&
|
||||
strings.Contains(e.Message, "failed to fit in any node")
|
||||
}
|
||||
predicate := scheduleFailure("over-max-oir")
|
||||
success, err := observeEventAfterAction(f, predicate, action)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(success).To(Equal(true))
|
||||
})
|
||||
|
||||
It("should account opaque integer resources in pods with multiple containers.", func() {
|
||||
defer removeOpaqueResource(f, node.Name, opaqueResName)
|
||||
|
||||
By("Creating a pod with two containers that together require less of the opaque resource than is allocatable on a node")
|
||||
requests := v1.ResourceList{opaqueResName: resource.MustParse("1")}
|
||||
limits := v1.ResourceList{}
|
||||
@@ -170,11 +172,7 @@ var _ = framework.KubeDescribe("Opaque resources [Feature:OpaqueResources]", fun
|
||||
_, err := f.ClientSet.Core().Pods(f.Namespace.Name).Create(pod)
|
||||
return err
|
||||
}
|
||||
predicate := func(e *v1.Event) bool {
|
||||
return e.Type == v1.EventTypeNormal &&
|
||||
e.Reason == "Scheduled" &&
|
||||
strings.Contains(e.Message, fmt.Sprintf("Successfully assigned %v to %v", pod.Name, node.Name))
|
||||
}
|
||||
predicate := scheduleSuccess(pod.Name, node.Name)
|
||||
success, err := observeEventAfterAction(f, predicate, action)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(success).To(Equal(true))
|
||||
@@ -214,11 +212,53 @@ var _ = framework.KubeDescribe("Opaque resources [Feature:OpaqueResources]", fun
|
||||
_, err = f.ClientSet.Core().Pods(f.Namespace.Name).Create(pod)
|
||||
return err
|
||||
}
|
||||
predicate = func(e *v1.Event) bool {
|
||||
return e.Type == "Warning" &&
|
||||
e.Reason == "FailedScheduling" &&
|
||||
strings.Contains(e.Message, "failed to fit in any node")
|
||||
predicate = scheduleFailure(pod.Name)
|
||||
success, err = observeEventAfterAction(f, predicate, action)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(success).To(Equal(true))
|
||||
})
|
||||
|
||||
It("should schedule pods that initially do not fit after enough opaque integer resources are freed.", func() {
|
||||
defer removeOpaqueResource(f, node.Name, opaqueResName)
|
||||
|
||||
By("Creating a pod that requires less of the opaque resource than is allocatable on a node.")
|
||||
requests := v1.ResourceList{
|
||||
v1.ResourceCPU: resource.MustParse("0.1"),
|
||||
opaqueResName: resource.MustParse("3"),
|
||||
}
|
||||
limits := v1.ResourceList{
|
||||
v1.ResourceCPU: resource.MustParse("0.2"),
|
||||
opaqueResName: resource.MustParse("3"),
|
||||
}
|
||||
pod1 := newTestPod(f, "oir-1", requests, limits)
|
||||
pod2 := newTestPod(f, "oir-2", requests, limits)
|
||||
|
||||
By("Observing an event that indicates one pod was scheduled")
|
||||
action := func() error {
|
||||
_, err := f.ClientSet.Core().Pods(f.Namespace.Name).Create(pod1)
|
||||
return err
|
||||
}
|
||||
predicate := scheduleSuccess(pod1.Name, node.Name)
|
||||
success, err := observeEventAfterAction(f, predicate, action)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(success).To(Equal(true))
|
||||
|
||||
By("Observing an event that indicates a subsequent pod was not scheduled")
|
||||
action = func() error {
|
||||
_, err := f.ClientSet.Core().Pods(f.Namespace.Name).Create(pod2)
|
||||
return err
|
||||
}
|
||||
predicate = scheduleFailure(pod2.Name)
|
||||
success, err = observeEventAfterAction(f, predicate, action)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(success).To(Equal(true))
|
||||
|
||||
By("Observing an event that indicates the second pod was scheduled after deleting the first pod")
|
||||
action = func() error {
|
||||
err := f.ClientSet.Core().Pods(f.Namespace.Name).Delete(pod1.Name, nil)
|
||||
return err
|
||||
}
|
||||
predicate = scheduleSuccess(pod2.Name, node.Name)
|
||||
success, err = observeEventAfterAction(f, predicate, action)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(success).To(Equal(true))
|
||||
@@ -228,12 +268,14 @@ var _ = framework.KubeDescribe("Opaque resources [Feature:OpaqueResources]", fun
|
||||
// Adds the opaque resource to a node.
|
||||
func addOpaqueResource(f *framework.Framework, nodeName string, opaqueResName v1.ResourceName) {
|
||||
action := func() error {
|
||||
By(fmt.Sprintf("Adding OIR to node [%s]", nodeName))
|
||||
patch := []byte(fmt.Sprintf(`[{"op": "add", "path": "/status/capacity/%s", "value": "5"}]`, escapeForJSONPatch(opaqueResName)))
|
||||
return f.ClientSet.Core().RESTClient().Patch(types.JSONPatchType).Resource("nodes").Name(nodeName).SubResource("status").Body(patch).Do().Error()
|
||||
}
|
||||
predicate := func(n *v1.Node) bool {
|
||||
capacity, foundCap := n.Status.Capacity[opaqueResName]
|
||||
allocatable, foundAlloc := n.Status.Allocatable[opaqueResName]
|
||||
By(fmt.Sprintf("Node [%s] has OIR capacity: [%t] (%s), has OIR allocatable: [%t] (%s)", n.Name, foundCap, capacity.String(), foundAlloc, allocatable.String()))
|
||||
return foundCap && capacity.MilliValue() == int64(5000) &&
|
||||
foundAlloc && allocatable.MilliValue() == int64(5000)
|
||||
}
|
||||
@@ -245,14 +287,16 @@ func addOpaqueResource(f *framework.Framework, nodeName string, opaqueResName v1
|
||||
// Removes the opaque resource from a node.
|
||||
func removeOpaqueResource(f *framework.Framework, nodeName string, opaqueResName v1.ResourceName) {
|
||||
action := func() error {
|
||||
By(fmt.Sprintf("Removing OIR from node [%s]", nodeName))
|
||||
patch := []byte(fmt.Sprintf(`[{"op": "remove", "path": "/status/capacity/%s"}]`, escapeForJSONPatch(opaqueResName)))
|
||||
f.ClientSet.Core().RESTClient().Patch(types.JSONPatchType).Resource("nodes").Name(nodeName).SubResource("status").Body(patch).Do()
|
||||
return nil // Ignore error -- the opaque resource may not exist.
|
||||
}
|
||||
predicate := func(n *v1.Node) bool {
|
||||
_, foundCap := n.Status.Capacity[opaqueResName]
|
||||
_, foundAlloc := n.Status.Allocatable[opaqueResName]
|
||||
return !foundCap && !foundAlloc
|
||||
capacity, foundCap := n.Status.Capacity[opaqueResName]
|
||||
allocatable, foundAlloc := n.Status.Allocatable[opaqueResName]
|
||||
By(fmt.Sprintf("Node [%s] has OIR capacity: [%t] (%s), has OIR allocatable: [%t] (%s)", n.Name, foundCap, capacity.String(), foundAlloc, allocatable.String()))
|
||||
return (!foundCap || capacity.IsZero()) && (!foundAlloc || allocatable.IsZero())
|
||||
}
|
||||
success, err := observeNodeUpdateAfterAction(f, nodeName, predicate, action)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
@@ -345,7 +389,7 @@ func observeEventAfterAction(f *framework.Framework, eventPredicate func(*v1.Eve
|
||||
cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
e, ok := obj.(*v1.Event)
|
||||
By(fmt.Sprintf("Considering event: \nType = [%s], Reason = [%s], Message = [%s]", e.Type, e.Reason, e.Message))
|
||||
By(fmt.Sprintf("Considering event: \nType = [%s], Name = [%s], Reason = [%s], Message = [%s]", e.Type, e.Name, e.Reason, e.Message))
|
||||
Expect(ok).To(Equal(true))
|
||||
if ok && eventPredicate(e) {
|
||||
observedMatchingEvent = true
|
||||
@@ -373,3 +417,20 @@ func observeEventAfterAction(f *framework.Framework, eventPredicate func(*v1.Eve
|
||||
})
|
||||
return err == nil, err
|
||||
}
|
||||
|
||||
func scheduleSuccess(podName, nodeName string) func(*v1.Event) bool {
|
||||
return func(e *v1.Event) bool {
|
||||
return e.Type == v1.EventTypeNormal &&
|
||||
e.Reason == "Scheduled" &&
|
||||
strings.HasPrefix(e.Name, podName) &&
|
||||
strings.Contains(e.Message, fmt.Sprintf("Successfully assigned %v to %v", podName, nodeName))
|
||||
}
|
||||
}
|
||||
|
||||
func scheduleFailure(podName string) func(*v1.Event) bool {
|
||||
return func(e *v1.Event) bool {
|
||||
return strings.HasPrefix(e.Name, podName) &&
|
||||
e.Type == "Warning" &&
|
||||
e.Reason == "FailedScheduling"
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user