diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go
index 424aacfc224..f5d000e15be 100644
--- a/pkg/kubelet/cm/devicemanager/manager.go
+++ b/pkg/kubelet/cm/devicemanager/manager.go
@@ -51,6 +51,8 @@ import (
 	"k8s.io/kubernetes/pkg/util/selinux"
 )
 
+const nodeWithoutTopology = -1
+
 // ActivePodsFunc is a function that returns a list of pods to reconcile.
 type ActivePodsFunc func() []*v1.Pod
 
@@ -400,7 +402,6 @@ func (m *ManagerImpl) Allocate(pod *v1.Pod, container *v1.Container) error {
 	}
 	m.podDevices.removeContainerAllocatedResources(string(pod.UID), container.Name, m.devicesToReuse[string(pod.UID)])
 	return nil
-
 }
 
 // UpdatePluginResources updates node resources based on devices already allocated to pods.
@@ -780,7 +781,6 @@ func (m *ManagerImpl) filterByAffinity(podUID, contName, resource string, availa
 	// available device does not have any NUMA Nodes associated with it, add it
 	// to a list of NUMA Nodes for the fake NUMANode -1.
 	perNodeDevices := make(map[int]sets.String)
-	nodeWithoutTopology := -1
 	for d := range available {
 		if m.allDevices[resource][d].Topology == nil || len(m.allDevices[resource][d].Topology.Nodes) == 0 {
 			if _, ok := perNodeDevices[nodeWithoutTopology]; !ok {
@@ -949,7 +949,7 @@ func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Cont
 	m.mutex.Lock()
 	for dev := range allocDevices {
 		if m.allDevices[resource][dev].Topology == nil || len(m.allDevices[resource][dev].Topology.Nodes) == 0 {
-			allocDevicesWithNUMA[0] = append(allocDevicesWithNUMA[0], dev)
+			allocDevicesWithNUMA[nodeWithoutTopology] = append(allocDevicesWithNUMA[nodeWithoutTopology], dev)
 			continue
 		}
 		for idx := range m.allDevices[resource][dev].Topology.Nodes {
diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go
index 0f74dd5a488..afe4fdd9009 100644
--- a/pkg/kubelet/cm/devicemanager/manager_test.go
+++ b/pkg/kubelet/cm/devicemanager/manager_test.go
@@ -468,6 +468,7 @@ func constructAllocResp(devices, mounts, envs map[string]string) *pluginapi.Cont
 func TestCheckpoint(t *testing.T) {
 	resourceName1 := "domain1.com/resource1"
 	resourceName2 := "domain2.com/resource2"
+	resourceName3 := "domain2.com/resource3"
 	as := assert.New(t)
 	tmpDir, err := ioutil.TempDir("", "checkpoint")
 	as.Nil(err)
@@ -500,6 +501,10 @@ func TestCheckpoint(t *testing.T) {
 		constructDevices([]string{"dev4"}),
 		constructAllocResp(map[string]string{"/dev/r1dev4": "/dev/r1dev4"},
 			map[string]string{"/home/r1lib1": "/usr/r1lib1"}, map[string]string{}))
+	testManager.podDevices.insert("pod3", "con3", resourceName3,
+		checkpoint.DevicesPerNUMA{nodeWithoutTopology: []string{"dev5"}},
+		constructAllocResp(map[string]string{"/dev/r1dev5": "/dev/r1dev5"},
+			map[string]string{"/home/r1lib1": "/usr/r1lib1"}, map[string]string{}))
 
 	testManager.healthyDevices[resourceName1] = sets.NewString()
 	testManager.healthyDevices[resourceName1].Insert("dev1")
@@ -510,6 +515,8 @@ func TestCheckpoint(t *testing.T) {
 	testManager.healthyDevices[resourceName2] = sets.NewString()
 	testManager.healthyDevices[resourceName2].Insert("dev1")
 	testManager.healthyDevices[resourceName2].Insert("dev2")
+	testManager.healthyDevices[resourceName3] = sets.NewString()
+	testManager.healthyDevices[resourceName3].Insert("dev5")
 
 	expectedPodDevices := testManager.podDevices
 	expectedAllocatedDevices := testManager.podDevices.devices()
@@ -717,6 +724,9 @@ func TestFilterByAffinity(t *testing.T) {
 				},
 			},
 		},
+		"devwithouttopology": {
+			ID: "dev5",
+		},
 	},
 }
diff --git a/pkg/kubelet/cm/devicemanager/pod_devices.go b/pkg/kubelet/cm/devicemanager/pod_devices.go
index 8996930056c..36e325a5b72 100644
--- a/pkg/kubelet/cm/devicemanager/pod_devices.go
+++ b/pkg/kubelet/cm/devicemanager/pod_devices.go
@@ -344,18 +344,20 @@ func (pdev *podDevices) getContainerDevices(podUID, contName string) ResourceDev
 	devicePluginMap := make(map[string]pluginapi.Device)
 	for numaid, devlist := range allocateInfo.deviceIds {
 		for _, devId := range devlist {
-			NUMANodes := []*pluginapi.NUMANode{{ID: numaid}}
-			if pDev, ok := devicePluginMap[devId]; ok && pDev.Topology != nil {
-				if nodes := pDev.Topology.GetNodes(); nodes != nil {
-					NUMANodes = append(NUMANodes, nodes...)
+			var topology *pluginapi.TopologyInfo
+			if numaid != nodeWithoutTopology {
+				NUMANodes := []*pluginapi.NUMANode{{ID: numaid}}
+				if pDev, ok := devicePluginMap[devId]; ok && pDev.Topology != nil {
+					if nodes := pDev.Topology.GetNodes(); nodes != nil {
+						NUMANodes = append(NUMANodes, nodes...)
+					}
 				}
-			}
-			devicePluginMap[devId] = pluginapi.Device{ // ID and Healthy are not relevant here.
-				Topology: &pluginapi.TopologyInfo{
-					Nodes: NUMANodes,
-				},
+				topology = &pluginapi.TopologyInfo{Nodes: NUMANodes}
+			}
+			devicePluginMap[devId] = pluginapi.Device{ // ID and Healthy are not relevant here.
+				Topology: topology,
 			}
 		}
 	}
diff --git a/test/e2e_node/image_list.go b/test/e2e_node/image_list.go
index dbae408c251..7c6aaa6002d 100644
--- a/test/e2e_node/image_list.go
+++ b/test/e2e_node/image_list.go
@@ -84,6 +84,11 @@ func updateImageAllowList() {
 	} else {
 		framework.ImagePrePullList.Insert(gpuDevicePluginImage)
 	}
+	if kubeVirtPluginImage, err := getKubeVirtDevicePluginImage(); err != nil {
+		klog.Errorln(err)
+	} else {
+		framework.ImagePrePullList.Insert(kubeVirtPluginImage)
+	}
 }
 
 func getNodeProblemDetectorImage() string {
@@ -254,3 +259,23 @@ func getSRIOVDevicePluginImage() (string, error) {
 	}
 	return ds.Spec.Template.Spec.Containers[0].Image, nil
 }
+
+// TODO: generalize this function with getSRIOVDevicePluginImage above.
+// getKubeVirtDevicePluginImage returns the image of the KubeVirt device plugin.
+func getKubeVirtDevicePluginImage() (string, error) {
+	data, err := e2etestfiles.Read(KubeVirtDevicePluginDSYAML)
+	if err != nil {
+		return "", fmt.Errorf("failed to read the device plugin manifest: %w", err)
+	}
+	ds, err := e2emanifest.DaemonSetFromData(data)
+	if err != nil {
+		return "", fmt.Errorf("failed to parse the device plugin image: %w", err)
+	}
+	if ds == nil {
+		return "", fmt.Errorf("failed to parse the device plugin image: the extracted DaemonSet is nil")
+	}
+	if len(ds.Spec.Template.Spec.Containers) < 1 {
+		return "", fmt.Errorf("failed to parse the device plugin image: cannot extract the container from YAML")
+	}
+	return ds.Spec.Template.Spec.Containers[0].Image, nil
+}
diff --git a/test/e2e_node/podresources_test.go b/test/e2e_node/podresources_test.go
index 2384cd9d68c..5408d723999 100644
--- a/test/e2e_node/podresources_test.go
+++ b/test/e2e_node/podresources_test.go
@@ -18,8 +18,10 @@ package e2enode
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"io/ioutil"
+	"os"
 	"strings"
 	"time"
 
@@ -34,10 +36,13 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
 	"k8s.io/kubernetes/pkg/kubelet/util"
+	testutils "k8s.io/kubernetes/test/utils"
 
 	"k8s.io/kubernetes/test/e2e/framework"
 	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
+	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
 	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
+	e2etestfiles "k8s.io/kubernetes/test/e2e/framework/testfiles"
 
 	"github.com/onsi/ginkgo"
 	"github.com/onsi/gomega"
@@ -169,7 +174,27 @@ func findContainerDeviceByName(devs []*kubeletpodresourcesv1.ContainerDevices, r
 	return nil
 }
 
-func matchPodDescWithResources(expected []podDesc, found podResMap) error {
+type deviceCheckFunc func(expect podDesc, found podResMap) error
+
+func checkForTopology(expect podDesc, found podResMap) error {
+	if cnts, ok := found[KubeVirtResourceName]; ok {
+		for _, cnt := range cnts {
+			for _, cd := range cnt.GetDevices() {
+				if cd.ResourceName != KubeVirtResourceName {
+					continue
+				}
+				if cd.Topology != nil {
+					// devices exposed by the KubeVirt plugin are expected to carry no topology
+					return fmt.Errorf("expected nil topology for resource %q, got %v", cd.ResourceName, cd.Topology)
+				}
+			}
+
+		}
+	}
+	return nil
+}
+
+func matchPodDescWithResources(expected []podDesc, found podResMap, deviceCheck deviceCheckFunc) error {
 	for _, podReq := range expected {
 		framework.Logf("matching: %#v", podReq)
@@ -202,14 +227,20 @@
 				return fmt.Errorf("pod %q container %q expected no resources, got %v", podReq.podName, podReq.cntName, devs)
 			}
 		}
+		if deviceCheck != nil {
+			err := deviceCheck(podReq, found)
+			if err != nil {
+				return err
+			}
+		}
 	}
 	return nil
 }
 
-func expectPodResources(offset int, cli kubeletpodresourcesv1.PodResourcesListerClient, expected []podDesc) {
+func expectPodResources(offset int, cli kubeletpodresourcesv1.PodResourcesListerClient, expected []podDesc, cf deviceCheckFunc) {
 	gomega.EventuallyWithOffset(1+offset, func() error {
 		found := getPodResources(cli)
-		return matchPodDescWithResources(expected, found)
+		return matchPodDescWithResources(expected, found, cf)
 	}, time.Minute, 10*time.Second).Should(gomega.BeNil())
 }
@@ -253,7 +284,7 @@ func podresourcesListTests(f *framework.Framework, cli kubeletpodresourcesv1.Pod
 		},
 	}
 	tpd.createPodsForTest(f, expected)
-	expectPodResources(1, cli, expected)
+	expectPodResources(1, cli, expected, nil)
 	tpd.deletePodsForTest(f)
 
 	tpd = newTestPodData()
@@ -309,7 +340,7 @@ func podresourcesListTests(f *framework.Framework, cli kubeletpodresourcesv1.Pod
 			}
 	tpd.createPodsForTest(f, expected)
-	expectPodResources(1, cli, expected)
+	expectPodResources(1, cli, expected, nil)
 	tpd.deletePodsForTest(f)
 
 	tpd = newTestPodData()
@@ -353,7 +384,7 @@ func podresourcesListTests(f *framework.Framework, cli kubeletpodresourcesv1.Pod
 	}
 
 	tpd.createPodsForTest(f, expected)
-	expectPodResources(1, cli, expected)
+	expectPodResources(1, cli, expected, nil)
 
 	if sd != nil {
 		extra = podDesc{
@@ -377,7 +408,7 @@ func podresourcesListTests(f *framework.Framework, cli kubeletpodresourcesv1.Pod
 	})
 
 	expected = append(expected, extra)
-	expectPodResources(1, cli, expected)
+	expectPodResources(1, cli, expected, nil)
 	tpd.deletePodsForTest(f)
 
 	tpd = newTestPodData()
@@ -433,11 +464,11 @@ func podresourcesListTests(f *framework.Framework, cli kubeletpodresourcesv1.Pod
 		}
 	}
 	tpd.createPodsForTest(f, expected)
-	expectPodResources(1, cli, expected)
+	expectPodResources(1, cli, expected, nil)
 
 	tpd.deletePod(f, "pod-01")
 	expectedPostDelete := filterOutDesc(expected, "pod-01")
-	expectPodResources(1, cli, expectedPostDelete)
+	expectPodResources(1, cli, expectedPostDelete, nil)
 	tpd.deletePodsForTest(f)
 }
 
@@ -644,9 +675,86 @@ var _ = SIGDescribe("POD Resources [Serial] [Feature:PodResources][NodeFeature:P
 			defer conn.Close()
 
 			ginkgo.By("checking GetAllocatableResources fail if the feature gate is not enabled")
-			_, err = cli.GetAllocatableResources(context.TODO(), &kubeletpodresourcesv1.AllocatableResourcesRequest{})
+			allocatableRes, err := cli.GetAllocatableResources(context.TODO(), &kubeletpodresourcesv1.AllocatableResourcesRequest{})
+			framework.Logf("GetAllocatableResources result: %v, err: %v", allocatableRes, err)
 			framework.ExpectError(err, "With feature gate disabled, the call must fail")
 		})
+	})
+
+	ginkgo.Context("Use KubeVirt device plugin, which reports resources w/o hardware topology", func() {
+		ginkgo.It("should return the same podresources before and after a kubelet restart", func() {
+
+			if !utilfeature.DefaultFeatureGate.Enabled(kubefeatures.KubeletPodResources) {
+				e2eskipper.Skipf("this test is meant to run with the POD Resources Extensions feature gate enabled")
+			}
+
+			_, err := os.Stat("/dev/kvm")
+			if errors.Is(err, os.ErrNotExist) {
+				e2eskipper.Skipf("KubeVirt device plugin works only in a KVM-based environment")
+			}
+
+			_, cpuAlloc, _ := getLocalNodeCPUDetails(f)
+
+			if cpuAlloc < minCoreCount {
+				e2eskipper.Skipf("Skipping podresources test since the CPU allocatable < %d", minCoreCount)
+			}
+
+			// Make sure all the feature gates and the right settings are in place.
+			oldCfg := configurePodResourcesInKubelet(f, true, reservedSystemCPUs)
+			defer func() {
+				// restore kubelet config
+				setOldKubeletConfig(f, oldCfg)
+
+				// Delete state file to allow repeated runs
+				deleteStateFile()
+			}()
+
+			dpPod := setupKubeVirtDevicePluginOrFail(f)
+			defer teardownKubeVirtDevicePluginOrFail(f, dpPod)
+
+			waitForKubeVirtResources(f, dpPod)
+
+			endpoint, err := util.LocalEndpoint(defaultPodResourcesPath, podresources.Socket)
+			framework.ExpectNoError(err)
+
+			cli, conn, err := podresources.GetV1Client(endpoint, defaultPodResourcesTimeout, defaultPodResourcesMaxSize)
+			framework.ExpectNoError(err)
+			defer conn.Close()
+
+			ginkgo.By("checking that List and GetAllocatableResources report the kubevirt resource without topology")
+
+			allocatableResponse, err := cli.GetAllocatableResources(context.TODO(), &kubeletpodresourcesv1.AllocatableResourcesRequest{})
+			framework.ExpectNoError(err)
+			for _, dev := range allocatableResponse.GetDevices() {
+				if dev.ResourceName != KubeVirtResourceName {
+					continue
+				}
+
+				framework.ExpectEqual(dev.Topology == nil, true, "Topology is expected to be empty for kubevirt resources")
+			}
+
+			// Run a pod which requires KubeVirtResourceName
+			desc := podDesc{
+				podName:        "pod-01",
+				cntName:        "cnt-01",
+				resourceName:   KubeVirtResourceName,
+				resourceAmount: 1,
+				cpuCount:       1,
+			}
+
+			tpd := newTestPodData()
+			tpd.createPodsForTest(f, []podDesc{
+				desc,
+			})
+
+			expectPodResources(1, cli, []podDesc{desc}, checkForTopology)
+
+			ginkgo.By("Restarting Kubelet")
+			restartKubelet()
+			framework.WaitForAllNodesSchedulable(f.ClientSet, framework.TestContext.NodeSchedulableTimeout)
+			expectPodResources(1, cli, []podDesc{desc}, checkForTopology)
+			tpd.deletePodsForTest(f)
+		})
 	})
 })
 
@@ -739,3 +847,76 @@ func enablePodResourcesFeatureGateInKubelet(f *framework.Framework) (oldCfg *kub
 
 	return oldCfg
 }
+
+func setupKubeVirtDevicePluginOrFail(f *framework.Framework) *v1.Pod {
+	e2enode.WaitForNodeToBeReady(f.ClientSet, framework.TestContext.NodeName, 5*time.Minute)
+
+	dp := getKubeVirtDevicePluginPod()
+	dp.Spec.NodeName = framework.TestContext.NodeName
+
+	ginkgo.By("Create KubeVirt device plugin pod")
+
+	dpPod, err := f.ClientSet.CoreV1().Pods(metav1.NamespaceSystem).Create(context.TODO(), dp, metav1.CreateOptions{})
+	framework.ExpectNoError(err)
+
+	if err = e2epod.WaitForPodCondition(f.ClientSet, metav1.NamespaceSystem, dp.Name, "Ready", 120*time.Second, testutils.PodRunningReady); err != nil {
+		framework.Logf("KubeVirt Pod %v took too long to enter running/ready: %v", dp.Name, err)
+	}
+	framework.ExpectNoError(err)
+
+	return dpPod
+}
+
+func teardownKubeVirtDevicePluginOrFail(f *framework.Framework, pod *v1.Pod) {
+	gp := int64(0)
+	deleteOptions := metav1.DeleteOptions{
+		GracePeriodSeconds: &gp,
+	}
+	ginkgo.By(fmt.Sprintf("Delete KubeVirt device plugin pod %s/%s", pod.Namespace, pod.Name))
+	err := f.ClientSet.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, deleteOptions)
+
+	framework.ExpectNoError(err)
+}
+
+func findKubeVirtResource(node *v1.Node) int64 {
+	framework.Logf("Node status allocatable: %v", node.Status.Allocatable)
+	for key, val := range node.Status.Allocatable {
+		if string(key) == KubeVirtResourceName {
+			v := val.Value()
+			if v > 0 {
+				return v
+			}
+		}
+	}
+	return 0
+}
+
+func waitForKubeVirtResources(f *framework.Framework, pod *v1.Pod) {
+	ginkgo.By("Waiting for kubevirt resources to become available on the local node")
+
+	gomega.Eventually(func() bool {
+		node := getLocalNode(f)
+		kubeVirtResourceAmount := findKubeVirtResource(node)
+		return kubeVirtResourceAmount != 0
+	}, 2*time.Minute, framework.Poll).Should(gomega.BeTrue())
+}
+
+// getKubeVirtDevicePluginPod returns the device plugin pod that exposes kubevirt resources in e2e tests.
+func getKubeVirtDevicePluginPod() *v1.Pod {
+	data, err := e2etestfiles.Read(KubeVirtDevicePluginDSYAML)
+	if err != nil {
+		framework.Fail(err.Error())
+	}
+
+	ds := readDaemonSetV1OrDie(data)
+	p := &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      KubeVirtDevicePluginName,
+			Namespace: metav1.NamespaceSystem,
+		},
+
+		Spec: ds.Spec.Template.Spec,
+	}
+
+	return p
+}
diff --git a/test/e2e_node/testing-manifests/kubevirt-kvm-ds.yaml b/test/e2e_node/testing-manifests/kubevirt-kvm-ds.yaml
new file mode 100644
index 00000000000..d8e8b0fdc4f
--- /dev/null
+++ b/test/e2e_node/testing-manifests/kubevirt-kvm-ds.yaml
@@ -0,0 +1,28 @@
+apiVersion: apps/v1
+kind: DaemonSet
+metadata:
+  labels:
+    name: kubevirt-kvm-device-plugin
+  name: kubevirt-kvm-device-plugin
+spec:
+  selector:
+    matchLabels:
+      name: kubevirt-kvm-device-plugin
+  template:
+    metadata:
+      labels:
+        name: kubevirt-kvm-device-plugin
+    spec:
+      containers:
+      - name: kubevirt-kvm-device-plugin
+        image: quay.io/kubevirt/device-plugin-kvm
+        args: ["-v", "3", "-logtostderr"]
+        securityContext:
+          privileged: true
+        volumeMounts:
+        - name: device-plugin
+          mountPath: /var/lib/kubelet/device-plugins
+      volumes:
+      - name: device-plugin
+        hostPath:
+          path: /var/lib/kubelet/device-plugins
diff --git a/test/e2e_node/util_kubevirt.go b/test/e2e_node/util_kubevirt.go
new file mode 100644
index 00000000000..4b444138fde
--- /dev/null
+++ b/test/e2e_node/util_kubevirt.go
@@ -0,0 +1,27 @@
+/*
+Copyright 2021 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package e2enode
+
+const (
+	KubeVirtDevicePluginDSYAML = "test/e2e_node/testing-manifests/kubevirt-kvm-ds.yaml"
+
+	// KubeVirtDevicePluginName is the name of the device plugin pod
+	KubeVirtDevicePluginName = "kubevirt-device-plugin"
+
+	// KubeVirtResourceName is the name of the resource provided by the kubevirt device plugin
+	KubeVirtResourceName = "devices.kubevirt.io/kvm"
+)
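
For reference, the behavior introduced by the pod_devices.go change above can be exercised in isolation. Below is a minimal, self-contained Go sketch, using simplified stand-in types (NUMANode, TopologyInfo, Device) in place of the real pluginapi ones: device IDs recorded under the fake NUMA node -1 surface a nil topology in the podresources response, while devices with real affinity keep their NUMA nodes.

package main

import "fmt"

// Simplified stand-ins for the pluginapi types used by getContainerDevices.
type NUMANode struct{ ID int64 }
type TopologyInfo struct{ Nodes []*NUMANode }
type Device struct{ Topology *TopologyInfo }

const nodeWithoutTopology = -1

// buildDeviceMap mirrors the fixed getContainerDevices logic: device IDs keyed
// by the fake NUMA node -1 get a nil Topology instead of a node with ID -1.
func buildDeviceMap(deviceIDsPerNUMA map[int64][]string) map[string]Device {
	out := make(map[string]Device)
	for numaid, devs := range deviceIDsPerNUMA {
		for _, id := range devs {
			var topology *TopologyInfo
			if numaid != nodeWithoutTopology {
				nodes := []*NUMANode{{ID: numaid}}
				// Merge NUMA nodes already recorded for this device.
				if prev, ok := out[id]; ok && prev.Topology != nil {
					nodes = append(nodes, prev.Topology.Nodes...)
				}
				topology = &TopologyInfo{Nodes: nodes}
			}
			out[id] = Device{Topology: topology}
		}
	}
	return out
}

func main() {
	m := buildDeviceMap(map[int64][]string{
		0:                   {"dev1"},
		nodeWithoutTopology: {"dev5"}, // e.g. a devices.kubevirt.io/kvm device
	})
	fmt.Println("dev1 NUMA node:", m["dev1"].Topology.Nodes[0].ID) // 0
	fmt.Println("dev5 topology:", m["dev5"].Topology)              // <nil>
}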