Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-08-09 12:07:47 +00:00)
Merge pull request #128194 from AnishShah/extended-resource
test: refactor logic to add/remove extended resources
Commit 833ee8502e
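The refactor replaces per-test copies of these helpers with shared ones in test/e2e/framework/node, so each call site shrinks to a single e2enode call. A minimal usage sketch, assuming a hypothetical wrapper withFakeResource and the placeholder resource name example.com/dummy (only the two e2enode calls and their signatures come from this diff):

```go
package example

import (
	"context"

	"k8s.io/apimachinery/pkg/api/resource"
	clientset "k8s.io/client-go/kubernetes"
	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
)

// withFakeResource advertises a fake extended resource on a node, runs fn,
// and always removes the resource again, mirroring the add/defer-remove
// pattern the tests below use.
func withFakeResource(ctx context.Context, cs clientset.Interface, nodeName string, fn func()) {
	e2enode.AddExtendedResource(ctx, cs, nodeName, "example.com/dummy", resource.MustParse("5"))
	defer e2enode.RemoveExtendedResource(ctx, cs, nodeName, "example.com/dummy")
	fn()
}
```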
@@ -18,7 +18,6 @@ package node

 import (
 	"context"
-	"encoding/json"
 	"fmt"
 	"strconv"
 	"time"
@@ -26,8 +25,6 @@ import (
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/types"
-	"k8s.io/apimachinery/pkg/util/strategicpatch"
 	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/kubernetes/test/e2e/feature"
 	"k8s.io/kubernetes/test/e2e/framework"
 	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
@@ -43,78 +40,6 @@ const (
 	fakeExtendedResource = "dummy.com/dummy"
 )

-func patchNode(ctx context.Context, client clientset.Interface, old *v1.Node, new *v1.Node) error {
-	oldData, err := json.Marshal(old)
-	if err != nil {
-		return err
-	}
-
-	newData, err := json.Marshal(new)
-	if err != nil {
-		return err
-	}
-	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Node{})
-	if err != nil {
-		return fmt.Errorf("failed to create merge patch for node %q: %w", old.Name, err)
-	}
-	_, err = client.CoreV1().Nodes().Patch(ctx, old.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
-	return err
-}
-
-func addExtendedResource(clientSet clientset.Interface, nodeName, extendedResourceName string, extendedResourceQuantity resource.Quantity) {
-	extendedResource := v1.ResourceName(extendedResourceName)
-
-	ginkgo.By("Adding a custom resource")
-	OriginalNode, err := clientSet.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{})
-	framework.ExpectNoError(err)
-
-	node := OriginalNode.DeepCopy()
-	node.Status.Capacity[extendedResource] = extendedResourceQuantity
-	node.Status.Allocatable[extendedResource] = extendedResourceQuantity
-	err = patchNode(context.Background(), clientSet, OriginalNode.DeepCopy(), node)
-	framework.ExpectNoError(err)
-
-	gomega.Eventually(func() error {
-		node, err = clientSet.CoreV1().Nodes().Get(context.Background(), node.Name, metav1.GetOptions{})
-		framework.ExpectNoError(err)
-
-		fakeResourceCapacity, exists := node.Status.Capacity[extendedResource]
-		if !exists {
-			return fmt.Errorf("node %s has no %s resource capacity", node.Name, extendedResourceName)
-		}
-		if expectedResource := resource.MustParse("123"); fakeResourceCapacity.Cmp(expectedResource) != 0 {
-			return fmt.Errorf("node %s has resource capacity %s, expected: %s", node.Name, fakeResourceCapacity.String(), expectedResource.String())
-		}
-
-		return nil
-	}).WithTimeout(30 * time.Second).WithPolling(time.Second).ShouldNot(gomega.HaveOccurred())
-}
-
-func removeExtendedResource(clientSet clientset.Interface, nodeName, extendedResourceName string) {
-	extendedResource := v1.ResourceName(extendedResourceName)
-
-	ginkgo.By("Removing a custom resource")
-	originalNode, err := clientSet.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{})
-	framework.ExpectNoError(err)
-
-	node := originalNode.DeepCopy()
-	delete(node.Status.Capacity, extendedResource)
-	delete(node.Status.Allocatable, extendedResource)
-	err = patchNode(context.Background(), clientSet, originalNode.DeepCopy(), node)
-	framework.ExpectNoError(err)
-
-	gomega.Eventually(func() error {
-		node, err = clientSet.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{})
-		framework.ExpectNoError(err)
-
-		if _, exists := node.Status.Capacity[extendedResource]; exists {
-			return fmt.Errorf("node %s has resource capacity %s which is expected to be removed", node.Name, extendedResourceName)
-		}
-
-		return nil
-	}).WithTimeout(30 * time.Second).WithPolling(time.Second).ShouldNot(gomega.HaveOccurred())
-}
-
 func doPodResizeTests(f *framework.Framework) {
 	type testCase struct {
 		name string
@@ -879,11 +804,11 @@ func doPodResizeTests(f *framework.Framework) {
 			framework.ExpectNoError(err)

 			for _, node := range nodes.Items {
-				addExtendedResource(f.ClientSet, node.Name, fakeExtendedResource, resource.MustParse("123"))
+				e2enode.AddExtendedResource(ctx, f.ClientSet, node.Name, fakeExtendedResource, resource.MustParse("123"))
 			}
 			defer func() {
 				for _, node := range nodes.Items {
-					removeExtendedResource(f.ClientSet, node.Name, fakeExtendedResource)
+					e2enode.RemoveExtendedResource(ctx, f.ClientSet, node.Name, fakeExtendedResource)
 				}
 			}()
 		}
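One behavioral difference in the call-site change above: the removed addExtendedResource polled with gomega.Eventually until the node actually reported the new capacity, while e2enode.AddExtendedResource (added later in this diff) returns as soon as the status patch is accepted. If a test needed the old wait back, a sketch reconstructed from the deleted polling logic (helper name hypothetical):

```go
import (
	"context"
	"fmt"
	"time"

	"github.com/onsi/gomega"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clientset "k8s.io/client-go/kubernetes"
)

// waitForExtendedResource polls until the node reports the given capacity,
// mirroring the verification the removed addExtendedResource performed.
func waitForExtendedResource(ctx context.Context, cs clientset.Interface, nodeName string, name v1.ResourceName, want resource.Quantity) {
	gomega.Eventually(func() error {
		node, err := cs.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
		if err != nil {
			return err
		}
		got, exists := node.Status.Capacity[name]
		if !exists {
			return fmt.Errorf("node %s has no %s capacity", nodeName, name)
		}
		if got.Cmp(want) != 0 {
			return fmt.Errorf("node %s has capacity %s, want %s", nodeName, got.String(), want.String())
		}
		return nil
	}).WithTimeout(30 * time.Second).WithPolling(time.Second).Should(gomega.Succeed())
}
```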
@@ -18,13 +18,16 @@ package node

 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"time"

+	"github.com/onsi/ginkgo/v2"
 	"github.com/onsi/gomega"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/wait"
 	clientset "k8s.io/client-go/kubernetes"
@@ -175,3 +178,36 @@ func IsARM64(node *v1.Node) bool {

 	return false
 }
+
+// AddExtendedResource adds a fake resource to the Node.
+func AddExtendedResource(ctx context.Context, clientSet clientset.Interface, nodeName string, extendedResourceName v1.ResourceName, extendedResourceQuantity resource.Quantity) {
+	extendedResource := v1.ResourceName(extendedResourceName)
+
+	ginkgo.By("Adding a custom resource")
+	extendedResourceList := v1.ResourceList{
+		extendedResource: extendedResourceQuantity,
+	}
+	patchPayload, err := json.Marshal(v1.Node{
+		Status: v1.NodeStatus{
+			Capacity:    extendedResourceList,
+			Allocatable: extendedResourceList,
+		},
+	})
+	framework.ExpectNoError(err, "Failed to marshal node JSON")
+
+	_, err = clientSet.CoreV1().Nodes().Patch(ctx, nodeName, types.StrategicMergePatchType, []byte(patchPayload), metav1.PatchOptions{}, "status")
+	framework.ExpectNoError(err)
+}
+
+// RemoveExtendedResource removes a fake resource from the Node.
+func RemoveExtendedResource(ctx context.Context, clientSet clientset.Interface, nodeName string, extendedResourceName v1.ResourceName) {
+	extendedResource := v1.ResourceName(extendedResourceName)
+
+	ginkgo.By("Removing a custom resource")
+	node, err := clientSet.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
+	framework.ExpectNoError(err)
+	delete(node.Status.Capacity, extendedResource)
+	delete(node.Status.Allocatable, extendedResource)
+	_, err = clientSet.CoreV1().Nodes().UpdateStatus(ctx, node, metav1.UpdateOptions{})
+	framework.ExpectNoError(err)
+}
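Note the asymmetry between the two new helpers: AddExtendedResource sends a strategic merge patch to the node's status subresource, while RemoveExtendedResource does a read-modify-write through UpdateStatus. Marshaling a typed v1.Node can only produce add/overwrite entries; deleting a key from capacity would need an explicit null in the patch, which a ResourceList of Quantity values cannot express, and that likely motivates the Get/UpdateStatus path. A standalone sketch of the payload the Add helper builds (resource name and quantity are placeholders):

```go
package main

import (
	"encoding/json"
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	rl := v1.ResourceList{"example.com/dummy": resource.MustParse("5")}
	payload, err := json.Marshal(v1.Node{
		Status: v1.NodeStatus{Capacity: rl, Allocatable: rl},
	})
	if err != nil {
		panic(err)
	}
	// Alongside some zero-value fields from the typed struct, the payload
	// carries the part the server merges:
	//   "status":{"capacity":{"example.com/dummy":"5"},
	//             "allocatable":{"example.com/dummy":"5"}}
	fmt.Println(string(payload))
}
```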
@@ -83,11 +83,7 @@ var _ = SIGDescribe("SchedulerPreemption", framework.WithSerial(), func() {
 			_ = cs.SchedulingV1().PriorityClasses().Delete(ctx, pair.name, *metav1.NewDeleteOptions(0))
 		}
 		for _, node := range nodeList.Items {
-			nodeCopy := node.DeepCopy()
-			delete(nodeCopy.Status.Capacity, testExtendedResource)
-			delete(nodeCopy.Status.Allocatable, testExtendedResource)
-			err := patchNode(ctx, cs, &node, nodeCopy)
-			framework.ExpectNoError(err)
+			e2enode.RemoveExtendedResource(ctx, cs, node.Name, testExtendedResource)
 		}
 	})
@@ -134,11 +130,7 @@ var _ = SIGDescribe("SchedulerPreemption", framework.WithSerial(), func() {
 		// One of them has low priority, making it the victim for preemption.
 		for i, node := range nodeList.Items {
 			// Update each node to advertise 3 available extended resources
-			nodeCopy := node.DeepCopy()
-			nodeCopy.Status.Capacity[testExtendedResource] = resource.MustParse("5")
-			nodeCopy.Status.Allocatable[testExtendedResource] = resource.MustParse("5")
-			err := patchNode(ctx, cs, &node, nodeCopy)
-			framework.ExpectNoError(err)
+			e2enode.AddExtendedResource(ctx, cs, node.Name, testExtendedResource, resource.MustParse("5"))

 			for j := 0; j < 2; j++ {
 				// Request 2 of the available resources for the victim pods
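For context on the victim pods created here: extended resources are consumed through ordinary resource requests, with the constraint that requests must equal limits for such resources. A hedged sketch of what a pod requesting 2 units could look like (pod name, image, and the example.com/fakecpu resource are placeholders, not the test's actual pod factory):

```go
package main

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// victimPod sketches a pod that consumes 2 units of a fake extended resource.
func victimPod() *v1.Pod {
	req := v1.ResourceList{"example.com/fakecpu": resource.MustParse("2")}
	return &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "victim"},
		Spec: v1.PodSpec{
			Containers: []v1.Container{{
				Name:  "pause",
				Image: "registry.k8s.io/pause:3.9",
				// For extended resources, requests and limits must match.
				Resources: v1.ResourceRequirements{Requests: req, Limits: req},
			}},
		},
	}
}
```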
@@ -225,11 +217,7 @@ var _ = SIGDescribe("SchedulerPreemption", framework.WithSerial(), func() {
 		pods := make([]*v1.Pod, 0, len(nodeList.Items))
 		for i, node := range nodeList.Items {
 			// Update each node to advertise 3 available extended resources
-			nodeCopy := node.DeepCopy()
-			nodeCopy.Status.Capacity[testExtendedResource] = resource.MustParse("5")
-			nodeCopy.Status.Allocatable[testExtendedResource] = resource.MustParse("5")
-			err := patchNode(ctx, cs, &node, nodeCopy)
-			framework.ExpectNoError(err)
+			e2enode.AddExtendedResource(ctx, cs, node.Name, testExtendedResource, resource.MustParse("5"))

 			for j := 0; j < 2; j++ {
 				// Request 2 of the available resources for the victim pods
@@ -332,11 +320,7 @@ var _ = SIGDescribe("SchedulerPreemption", framework.WithSerial(), func() {
 		ginkgo.By("Select a node to run the lower and higher priority pods")
 		gomega.Expect(nodeList.Items).ToNot(gomega.BeEmpty(), "We need at least one node for the test to run")
 		node := nodeList.Items[0]
-		nodeCopy := node.DeepCopy()
-		nodeCopy.Status.Capacity[testExtendedResource] = resource.MustParse("1")
-		nodeCopy.Status.Allocatable[testExtendedResource] = resource.MustParse("1")
-		err := patchNode(ctx, cs, &node, nodeCopy)
-		framework.ExpectNoError(err)
+		e2enode.AddExtendedResource(ctx, cs, node.Name, testExtendedResource, resource.MustParse("1"))

 		// prepare node affinity to make sure both the lower and higher priority pods are scheduled on the same node
 		testNodeAffinity := v1.Affinity{
@@ -385,7 +369,7 @@ var _ = SIGDescribe("SchedulerPreemption", framework.WithSerial(), func() {
 		framework.Logf("Created pod: %v", preemptorPod.Name)

 		ginkgo.By("Waiting for the victim pod to be terminating")
-		err = e2epod.WaitForPodTerminatingInNamespaceTimeout(ctx, f.ClientSet, victimPod.Name, victimPod.Namespace, framework.PodDeleteTimeout)
+		err := e2epod.WaitForPodTerminatingInNamespaceTimeout(ctx, f.ClientSet, victimPod.Name, victimPod.Namespace, framework.PodDeleteTimeout)
 		framework.ExpectNoError(err)

 		ginkgo.By("Verifying the pod has the pod disruption condition")
@@ -412,11 +396,7 @@ var _ = SIGDescribe("SchedulerPreemption", framework.WithSerial(), func() {
 			framework.ExpectNoError(err)
 			// update Node API object with a fake resource
 			ginkgo.By(fmt.Sprintf("Apply 10 fake resource to node %v.", node.Name))
-			nodeCopy := node.DeepCopy()
-			nodeCopy.Status.Capacity[fakeRes] = resource.MustParse("10")
-			nodeCopy.Status.Allocatable[fakeRes] = resource.MustParse("10")
-			err = patchNode(ctx, cs, node, nodeCopy)
-			framework.ExpectNoError(err)
+			e2enode.AddExtendedResource(ctx, cs, node.Name, fakeRes, resource.MustParse("10"))
 			nodes = append(nodes, node)
 		}
 	})
@@ -425,11 +405,7 @@ var _ = SIGDescribe("SchedulerPreemption", framework.WithSerial(), func() {
 			e2enode.RemoveLabelOffNode(cs, nodeName, topologyKey)
 		}
 		for _, node := range nodes {
-			nodeCopy := node.DeepCopy()
-			delete(nodeCopy.Status.Capacity, fakeRes)
-			delete(nodeCopy.Status.Allocatable, fakeRes)
-			err := patchNode(ctx, cs, node, nodeCopy)
-			framework.ExpectNoError(err)
+			e2enode.RemoveExtendedResource(ctx, cs, node.Name, fakeRes)
 		}
 	})
@@ -564,11 +540,7 @@ var _ = SIGDescribe("SchedulerPreemption", framework.WithSerial(), func() {
 		}

 		if node != nil {
-			nodeCopy := node.DeepCopy()
-			delete(nodeCopy.Status.Capacity, fakecpu)
-			delete(nodeCopy.Status.Allocatable, fakecpu)
-			err := patchNode(ctx, cs, node, nodeCopy)
-			framework.ExpectNoError(err)
+			e2enode.RemoveExtendedResource(ctx, cs, node.Name, fakecpu)
 		}
 		for _, pair := range priorityPairs {
 			_ = cs.SchedulingV1().PriorityClasses().Delete(ctx, pair.name, *metav1.NewDeleteOptions(0))
@@ -597,11 +569,7 @@ var _ = SIGDescribe("SchedulerPreemption", framework.WithSerial(), func() {
 		}

 		// update Node API object with a fake resource
-		nodeCopy := node.DeepCopy()
-		nodeCopy.Status.Capacity[fakecpu] = resource.MustParse("1000")
-		nodeCopy.Status.Allocatable[fakecpu] = resource.MustParse("1000")
-		err = patchNode(ctx, cs, node, nodeCopy)
-		framework.ExpectNoError(err)
+		e2enode.AddExtendedResource(ctx, cs, node.Name, fakecpu, resource.MustParse("1000"))

 		// create four PriorityClass: p1, p2, p3, p4
 		for i := 1; i <= 4; i++ {
@@ -920,24 +888,6 @@ func waitForPreemptingWithTimeout(ctx context.Context, f *framework.Framework, p
 	framework.ExpectNoError(err, "pod %v/%v failed to preempt other pods", pod.Namespace, pod.Name)
 }

-func patchNode(ctx context.Context, client clientset.Interface, old *v1.Node, new *v1.Node) error {
-	oldData, err := json.Marshal(old)
-	if err != nil {
-		return err
-	}
-
-	newData, err := json.Marshal(new)
-	if err != nil {
-		return err
-	}
-	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Node{})
-	if err != nil {
-		return fmt.Errorf("failed to create merge patch for node %q: %w", old.Name, err)
-	}
-	_, err = client.CoreV1().Nodes().Patch(ctx, old.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
-	return err
-}
-
 func patchPriorityClass(ctx context.Context, cs clientset.Interface, old, new *schedulingv1.PriorityClass) error {
 	oldData, err := json.Marshal(old)
 	if err != nil {
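The deleted patchNode above and the surviving patchPriorityClass are both built on strategicpatch.CreateTwoWayMergePatch, which diffs two marshaled objects into a minimal patch. A self-contained sketch of that pattern (the label change is a placeholder):

```go
package main

import (
	"encoding/json"
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
)

func main() {
	oldNode := v1.Node{}
	newNode := v1.Node{}
	newNode.Labels = map[string]string{"example": "true"}

	oldData, err := json.Marshal(oldNode)
	if err != nil {
		panic(err)
	}
	newData, err := json.Marshal(newNode)
	if err != nil {
		panic(err)
	}
	// The &v1.Node{} argument tells the patcher which type's
	// strategic-merge-patch tags to honor when computing the diff.
	patch, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Node{})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(patch)) // roughly {"metadata":{"labels":{"example":"true"}}}
}
```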