diff --git a/pkg/kubelet/kubelet_pods.go b/pkg/kubelet/kubelet_pods.go
index 6867a4d4a1f..ff872985ce5 100644
--- a/pkg/kubelet/kubelet_pods.go
+++ b/pkg/kubelet/kubelet_pods.go
@@ -2094,6 +2094,18 @@ func (kl *Kubelet) convertToAPIContainerStatuses(pod *v1.Pod, podStatus *kubecon
 		if oldStatus.Resources == nil {
 			oldStatus.Resources = &v1.ResourceRequirements{}
 		}
+
+		convertCustomResources := func(inResources, outResources v1.ResourceList) {
+			for resourceName, resourceQuantity := range inResources {
+				if resourceName == v1.ResourceCPU || resourceName == v1.ResourceMemory ||
+					resourceName == v1.ResourceStorage || resourceName == v1.ResourceEphemeralStorage {
+					continue
+				}
+
+				outResources[resourceName] = resourceQuantity.DeepCopy()
+			}
+		}
+
 		// Convert Limits
 		if container.Resources.Limits != nil {
 			limits = make(v1.ResourceList)
@@ -2110,6 +2122,11 @@ func (kl *Kubelet) convertToAPIContainerStatuses(pod *v1.Pod, podStatus *kubecon
 			if ephemeralStorage, found := container.Resources.Limits[v1.ResourceEphemeralStorage]; found {
 				limits[v1.ResourceEphemeralStorage] = ephemeralStorage.DeepCopy()
 			}
+			if storage, found := container.Resources.Limits[v1.ResourceStorage]; found {
+				limits[v1.ResourceStorage] = storage.DeepCopy()
+			}
+
+			convertCustomResources(container.Resources.Limits, limits)
 		}
 		// Convert Requests
 		if status.AllocatedResources != nil {
@@ -2125,9 +2142,13 @@ func (kl *Kubelet) convertToAPIContainerStatuses(pod *v1.Pod, podStatus *kubecon
 			if ephemeralStorage, found := status.AllocatedResources[v1.ResourceEphemeralStorage]; found {
 				requests[v1.ResourceEphemeralStorage] = ephemeralStorage.DeepCopy()
 			}
+			if storage, found := status.AllocatedResources[v1.ResourceStorage]; found {
+				requests[v1.ResourceStorage] = storage.DeepCopy()
+			}
+
+			convertCustomResources(status.AllocatedResources, requests)
 		}
-		//TODO(vinaykul,derekwaynecarr,InPlacePodVerticalScaling): Update this to include extended resources in
-		// addition to CPU, memory, ephemeral storage. Add test case for extended resources.
+
 		resources := &v1.ResourceRequirements{
 			Limits:   limits,
 			Requests: requests,
diff --git a/pkg/kubelet/kubelet_pods_test.go b/pkg/kubelet/kubelet_pods_test.go
index c8b32aaf82d..46c25b32e49 100644
--- a/pkg/kubelet/kubelet_pods_test.go
+++ b/pkg/kubelet/kubelet_pods_test.go
@@ -4585,8 +4585,24 @@ func TestConvertToAPIContainerStatusesForResources(t *testing.T) {
 	CPU2AndMem2G := v1.ResourceList{v1.ResourceCPU: resource.MustParse("2"), v1.ResourceMemory: resource.MustParse("2Gi")}
 	CPU1AndMem1GAndStorage2G := CPU1AndMem1G.DeepCopy()
 	CPU1AndMem1GAndStorage2G[v1.ResourceEphemeralStorage] = resource.MustParse("2Gi")
+	CPU1AndMem1GAndStorage2G[v1.ResourceStorage] = resource.MustParse("2Gi")
 	CPU2AndMem2GAndStorage2G := CPU2AndMem2G.DeepCopy()
 	CPU2AndMem2GAndStorage2G[v1.ResourceEphemeralStorage] = resource.MustParse("2Gi")
+	CPU2AndMem2GAndStorage2G[v1.ResourceStorage] = resource.MustParse("2Gi")
+
+	addExtendedResource := func(list v1.ResourceList) v1.ResourceList {
+		const stubCustomResource = v1.ResourceName("dummy.io/dummy")
+
+		withExtendedResource := list.DeepCopy()
+		for _, resourceName := range []v1.ResourceName{v1.ResourceMemory, v1.ResourceCPU} {
+			if _, exists := withExtendedResource[resourceName]; !exists {
+				withExtendedResource[resourceName] = resource.MustParse("0")
+			}
+		}
+
+		withExtendedResource[stubCustomResource] = resource.MustParse("1")
+		return withExtendedResource
+	}
 
 	testKubelet := newTestKubelet(t, false)
 	defer testKubelet.Cleanup()
@@ -4734,6 +4750,98 @@
 				},
 			},
 		},
+		"BestEffort QoSPod with extended resources": {
+			Resources: []v1.ResourceRequirements{{Requests: addExtendedResource(v1.ResourceList{})}},
+			OldStatus: []v1.ContainerStatus{
+				{
+					Name:      testContainerName,
+					Image:     "img",
+					ImageID:   "img1234",
+					State:     v1.ContainerState{Running: &v1.ContainerStateRunning{}},
+					Resources: &v1.ResourceRequirements{},
+				},
+			},
+			Expected: []v1.ContainerStatus{
+				{
+					Name:               testContainerName,
+					ContainerID:        testContainerID.String(),
+					Image:              "img",
+					ImageID:            "img1234",
+					State:              v1.ContainerState{Running: &v1.ContainerStateRunning{StartedAt: metav1.NewTime(nowTime)}},
+					AllocatedResources: addExtendedResource(v1.ResourceList{}),
+					Resources:          &v1.ResourceRequirements{Requests: addExtendedResource(v1.ResourceList{})},
+				},
+			},
+		},
+		"BurstableQoSPod with extended resources": {
+			Resources: []v1.ResourceRequirements{{Requests: addExtendedResource(CPU1AndMem1G)}},
+			OldStatus: []v1.ContainerStatus{
+				{
+					Name:      testContainerName,
+					Image:     "img",
+					ImageID:   "img1234",
+					State:     v1.ContainerState{Running: &v1.ContainerStateRunning{}},
+					Resources: &v1.ResourceRequirements{},
+				},
+			},
+			Expected: []v1.ContainerStatus{
+				{
+					Name:               testContainerName,
+					ContainerID:        testContainerID.String(),
+					Image:              "img",
+					ImageID:            "img1234",
+					State:              v1.ContainerState{Running: &v1.ContainerStateRunning{StartedAt: metav1.NewTime(nowTime)}},
+					AllocatedResources: addExtendedResource(CPU1AndMem1G),
+					Resources:          &v1.ResourceRequirements{Requests: addExtendedResource(CPU1AndMem1G)},
+				},
+			},
+		},
+		"BurstableQoSPod with storage, ephemeral storage and extended resources": {
+			Resources: []v1.ResourceRequirements{{Requests: addExtendedResource(CPU1AndMem1GAndStorage2G)}},
+			OldStatus: []v1.ContainerStatus{
+				{
+					Name:      testContainerName,
+					Image:     "img",
+					ImageID:   "img1234",
+					State:     v1.ContainerState{Running: &v1.ContainerStateRunning{}},
+					Resources: &v1.ResourceRequirements{},
+				},
+			},
+			Expected: []v1.ContainerStatus{
+				{
+					Name:               testContainerName,
+					ContainerID:        testContainerID.String(),
+					Image:              "img",
+					ImageID:            "img1234",
+					State:              v1.ContainerState{Running: &v1.ContainerStateRunning{StartedAt: metav1.NewTime(nowTime)}},
+					AllocatedResources: addExtendedResource(CPU1AndMem1GAndStorage2G),
+					Resources:          &v1.ResourceRequirements{Requests: addExtendedResource(CPU1AndMem1GAndStorage2G)},
+				},
+			},
+		},
+		"GuaranteedQoSPod with extended resources": {
+			Resources: []v1.ResourceRequirements{{Requests: addExtendedResource(CPU1AndMem1G), Limits: addExtendedResource(CPU1AndMem1G)}},
+			OldStatus: []v1.ContainerStatus{
+				{
+					Name:      testContainerName,
+					Image:     "img",
+					ImageID:   "img1234",
+					State:     v1.ContainerState{Running: &v1.ContainerStateRunning{}},
+					Resources: &v1.ResourceRequirements{},
+				},
+			},
+			Expected: []v1.ContainerStatus{
+				{
+					Name:               testContainerName,
+					ContainerID:        testContainerID.String(),
+					Image:              "img",
+					ImageID:            "img1234",
+					State:              v1.ContainerState{Running: &v1.ContainerStateRunning{StartedAt: metav1.NewTime(nowTime)}},
+					AllocatedResources: addExtendedResource(CPU1AndMem1G),
+					Resources:          &v1.ResourceRequirements{Requests: addExtendedResource(CPU1AndMem1G), Limits: addExtendedResource(CPU1AndMem1G)},
+				},
+			},
+		},
 	} {
 		tPod := testPod.DeepCopy()
 		tPod.Name = fmt.Sprintf("%s-%d", testPod.Name, idx)
diff --git a/test/e2e/node/pod_resize.go b/test/e2e/node/pod_resize.go
index 96824ae5ff7..464981e3019 100644
--- a/test/e2e/node/pod_resize.go
+++ b/test/e2e/node/pod_resize.go
@@ -31,6 +31,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/strategicpatch"
 	clientset "k8s.io/client-go/kubernetes"
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
 	resourceapi "k8s.io/kubernetes/pkg/api/v1/resource"
@@ -63,14 +64,16 @@ const (
 	PollInterval time.Duration = 2 * time.Second
 	PollTimeout  time.Duration = 4 * time.Minute
+
+	fakeExtendedResource = "dummy.com/dummy"
 )
 
 type ContainerResources struct {
-	CPUReq, CPULim, MemReq, MemLim, EphStorReq, EphStorLim string
+	CPUReq, CPULim, MemReq, MemLim, EphStorReq, EphStorLim, ExtendedResourceReq, ExtendedResourceLim string
 }
 
 type ContainerAllocations struct {
-	CPUAlloc, MemAlloc, ephStorAlloc string
+	CPUAlloc, MemAlloc, ephStorAlloc, ExtendedResourceAlloc string
 }
 
 type TestContainerInfo struct {
@@ -146,6 +149,9 @@ func getTestResourceInfo(tcInfo TestContainerInfo) (v1.ResourceRequirements, v1.
 		if tcInfo.Resources.EphStorLim != "" {
 			lim[v1.ResourceEphemeralStorage] = resource.MustParse(tcInfo.Resources.EphStorLim)
 		}
+		if tcInfo.Resources.ExtendedResourceLim != "" {
+			lim[fakeExtendedResource] = resource.MustParse(tcInfo.Resources.ExtendedResourceLim)
+		}
 		if tcInfo.Resources.CPUReq != "" {
 			req[v1.ResourceCPU] = resource.MustParse(tcInfo.Resources.CPUReq)
 		}
@@ -155,6 +161,9 @@ func getTestResourceInfo(tcInfo TestContainerInfo) (v1.ResourceRequirements, v1.
 		if tcInfo.Resources.EphStorReq != "" {
 			req[v1.ResourceEphemeralStorage] = resource.MustParse(tcInfo.Resources.EphStorReq)
 		}
+		if tcInfo.Resources.ExtendedResourceReq != "" {
+			req[fakeExtendedResource] = resource.MustParse(tcInfo.Resources.ExtendedResourceReq)
+		}
 		res = v1.ResourceRequirements{Limits: lim, Requests: req}
 	}
 	if tcInfo.Allocations != nil {
@@ -168,7 +177,9 @@
 		if tcInfo.Allocations.ephStorAlloc != "" {
 			alloc[v1.ResourceEphemeralStorage] = resource.MustParse(tcInfo.Allocations.ephStorAlloc)
 		}
-
+		if tcInfo.Allocations.ExtendedResourceAlloc != "" {
+			alloc[fakeExtendedResource] = resource.MustParse(tcInfo.Allocations.ExtendedResourceAlloc)
+		}
 	}
 	if tcInfo.CPUPolicy != nil {
 		cpuPol := v1.ContainerResizePolicy{ResourceName: v1.ResourceCPU, RestartPolicy: *tcInfo.CPUPolicy}
@@ -318,7 +329,8 @@ func verifyPodAllocations(pod *v1.Pod, tcInfo []TestContainerInfo, flagError boo
 		cStatus := cStatusMap[ci.Name]
 		if ci.Allocations == nil {
 			if ci.Resources != nil {
-				alloc := &ContainerAllocations{CPUAlloc: ci.Resources.CPUReq, MemAlloc: ci.Resources.MemReq}
+				alloc := &ContainerAllocations{CPUAlloc: ci.Resources.CPUReq, MemAlloc: ci.Resources.MemReq,
+					ExtendedResourceAlloc: ci.Resources.ExtendedResourceReq}
 				ci.Allocations = alloc
 				defer func() {
 					ci.Allocations = nil
@@ -571,18 +583,92 @@ func genPatchString(containers []TestContainerInfo) (string, error) {
 	return string(patchBytes), nil
 }
 
+func patchNode(ctx context.Context, client clientset.Interface, old *v1.Node, new *v1.Node) error {
+	oldData, err := json.Marshal(old)
+	if err != nil {
+		return err
+	}
+
+	newData, err := json.Marshal(new)
+	if err != nil {
+		return err
+	}
+	patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Node{})
+	if err != nil {
+		return fmt.Errorf("failed to create merge patch for node %q: %w", old.Name, err)
+	}
+	_, err = client.CoreV1().Nodes().Patch(ctx, old.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}, "status")
+	return err
+}
+
+func addExtendedResource(clientSet clientset.Interface, nodeName, extendedResourceName string, extendedResourceQuantity resource.Quantity) {
+	extendedResource := v1.ResourceName(extendedResourceName)
+
+	ginkgo.By("Adding a custom resource")
+	OriginalNode, err := clientSet.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{})
+	framework.ExpectNoError(err)
+
+	node := OriginalNode.DeepCopy()
+	node.Status.Capacity[extendedResource] = extendedResourceQuantity
+	node.Status.Allocatable[extendedResource] = extendedResourceQuantity
+	err = patchNode(context.Background(), clientSet, OriginalNode.DeepCopy(), node)
+	framework.ExpectNoError(err)
+
+	gomega.Eventually(func() error {
+		node, err = clientSet.CoreV1().Nodes().Get(context.Background(), node.Name, metav1.GetOptions{})
+		framework.ExpectNoError(err)
+
+		fakeResourceCapacity, exists := node.Status.Capacity[extendedResource]
+		if !exists {
+			return fmt.Errorf("node %s has no %s resource capacity", node.Name, extendedResourceName)
+		}
+		if expectedResource := resource.MustParse("123"); fakeResourceCapacity.Cmp(expectedResource) != 0 {
+			return fmt.Errorf("node %s has resource capacity %s, expected: %s", node.Name, fakeResourceCapacity.String(), expectedResource.String())
+		}
+
+		return nil
+	}).WithTimeout(30 * time.Second).WithPolling(time.Second).ShouldNot(gomega.HaveOccurred())
+}
+
+func removeExtendedResource(clientSet clientset.Interface, nodeName, extendedResourceName string) {
+	extendedResource := v1.ResourceName(extendedResourceName)
+
+	ginkgo.By("Removing a custom resource")
+	originalNode, err := clientSet.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{})
+	framework.ExpectNoError(err)
+
+	node := originalNode.DeepCopy()
+	delete(node.Status.Capacity, extendedResource)
+	delete(node.Status.Allocatable, extendedResource)
+	err = patchNode(context.Background(), clientSet, originalNode.DeepCopy(), node)
+	framework.ExpectNoError(err)
+
+	gomega.Eventually(func() error {
+		node, err = clientSet.CoreV1().Nodes().Get(context.Background(), nodeName, metav1.GetOptions{})
+		framework.ExpectNoError(err)
+
+		if _, exists := node.Status.Capacity[extendedResource]; exists {
+			return fmt.Errorf("node %s has resource capacity %s which is expected to be removed", node.Name, extendedResourceName)
+		}
+
+		return nil
+	}).WithTimeout(30 * time.Second).WithPolling(time.Second).ShouldNot(gomega.HaveOccurred())
+}
+
 func doPodResizeTests() {
 	f := framework.NewDefaultFramework("pod-resize")
 	var podClient *e2epod.PodClient
+
 	ginkgo.BeforeEach(func() {
 		podClient = e2epod.NewPodClient(f)
 	})
 
 	type testCase struct {
-		name        string
-		containers  []TestContainerInfo
-		patchString string
-		expected    []TestContainerInfo
+		name                string
+		containers          []TestContainerInfo
+		patchString         string
+		expected            []TestContainerInfo
+		addExtendedResource bool
 	}
 
 	noRestart := v1.NotRequired
@@ -1284,6 +1370,31 @@
 			},
 		},
 	},
+		{
+			name: "Guaranteed QoS pod, one container - increase CPU & memory with an extended resource",
+			containers: []TestContainerInfo{
+				{
+					Name: "c1",
+					Resources: &ContainerResources{CPUReq: "100m", CPULim: "100m", MemReq: "200Mi", MemLim: "200Mi",
+						ExtendedResourceReq: "1", ExtendedResourceLim: "1"},
+					CPUPolicy: &noRestart,
+					MemPolicy: &noRestart,
+				},
+			},
+			patchString: `{"spec":{"containers":[
+				{"name":"c1", "resources":{"requests":{"cpu":"200m","memory":"400Mi"},"limits":{"cpu":"200m","memory":"400Mi"}}}
+			]}}`,
+			expected: []TestContainerInfo{
+				{
+					Name: "c1",
+					Resources: &ContainerResources{CPUReq: "200m", CPULim: "200m", MemReq: "400Mi", MemLim: "400Mi",
+						ExtendedResourceReq: "1", ExtendedResourceLim: "1"},
+					CPUPolicy: &noRestart,
+					MemPolicy: &noRestart,
+				},
+			},
+			addExtendedResource: true,
+		},
 	}
 
 	for idx := range tests {
@@ -1297,6 +1408,20 @@
 			initDefaultResizePolicy(tc.expected)
 			testPod = makeTestPod(f.Namespace.Name, "testpod", tStamp, tc.containers)
 
+			if tc.addExtendedResource {
+				nodes, err := e2enode.GetReadySchedulableNodes(context.Background(), f.ClientSet)
+				framework.ExpectNoError(err)
+
+				for _, node := range nodes.Items {
+					addExtendedResource(f.ClientSet, node.Name, fakeExtendedResource, resource.MustParse("123"))
+				}
+				defer func() {
+					for _, node := range nodes.Items {
+						removeExtendedResource(f.ClientSet, node.Name, fakeExtendedResource)
+					}
+				}()
+			}
+
 			ginkgo.By("creating pod")
 			newPod := podClient.CreateSync(ctx, testPod)