From a3f099ea4d7c759b9cd014aec8e3011b2363efde Mon Sep 17 00:00:00 2001 From: Kevin Klues Date: Sun, 2 Feb 2020 15:16:18 +0000 Subject: [PATCH] Split devicemanager Allocate into two functions Instead of having a single call for Allocate(), we now split this into two functions, Allocate() and UpdatePluginResources(). The semantics are split across them as follows: // Allocate configures and assigns devices to a pod. From the requested // device resources, Allocate will communicate with the owning device // plugin to allow setup procedures to take place, and for the device // plugin to provide runtime settings to use the device (environment // variables, mount points and device files). Allocate(pod *v1.Pod) error // UpdatePluginResources updates node resources based on devices already // allocated to pods. The node object is provided for the device manager to // update the node capacity to reflect the currently available devices. UpdatePluginResources( node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error As we move to a model in which the TopologyManager is able to ensure aligned allocations from the CPUManager, devicemanager, and any other TopologyManager HintProviders in the same synchronous loop, we will need to be able to call Allocate() independently of UpdatePluginResources(). This commit makes that possible. 
--- pkg/kubelet/cm/container_manager_linux.go | 6 +++- pkg/kubelet/cm/devicemanager/manager.go | 8 ++++-- pkg/kubelet/cm/devicemanager/manager_stub.go | 7 ++++- pkg/kubelet/cm/devicemanager/manager_test.go | 30 +++++++------------- pkg/kubelet/cm/devicemanager/types.go | 20 +++++++------ 5 files changed, 39 insertions(+), 32 deletions(-) diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go index d5df6f18c50..5de1d0a25d3 100644 --- a/pkg/kubelet/cm/container_manager_linux.go +++ b/pkg/kubelet/cm/container_manager_linux.go @@ -672,7 +672,11 @@ func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Containe } func (cm *containerManagerImpl) UpdatePluginResources(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error { - return cm.deviceManager.Allocate(node, attrs) + err := cm.deviceManager.Allocate(attrs.Pod) + if err != nil { + return err + } + return cm.deviceManager.UpdatePluginResources(node, attrs) } func (cm *containerManagerImpl) GetTopologyPodAdmitHandler() topologymanager.Manager { diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go index 4b5838ae033..86f04c587d8 100644 --- a/pkg/kubelet/cm/devicemanager/manager.go +++ b/pkg/kubelet/cm/devicemanager/manager.go @@ -369,13 +369,17 @@ func (m *ManagerImpl) allocatePodResources(pod *v1.Pod) error { // Allocate is the call that you can use to allocate a set of devices // from the registered device plugins. 
-func (m *ManagerImpl) Allocate(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error { - pod := attrs.Pod +func (m *ManagerImpl) Allocate(pod *v1.Pod) error { err := m.allocatePodResources(pod) if err != nil { klog.Errorf("Failed to allocate device plugin resource for pod %s: %v", string(pod.UID), err) return err } + return nil +} + +func (m *ManagerImpl) UpdatePluginResources(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error { + pod := attrs.Pod m.mutex.Lock() defer m.mutex.Unlock() diff --git a/pkg/kubelet/cm/devicemanager/manager_stub.go b/pkg/kubelet/cm/devicemanager/manager_stub.go index 4301c7dc3b9..6cb6aed62ef 100644 --- a/pkg/kubelet/cm/devicemanager/manager_stub.go +++ b/pkg/kubelet/cm/devicemanager/manager_stub.go @@ -45,7 +45,12 @@ func (h *ManagerStub) Stop() error { } // Allocate simply returns nil. -func (h *ManagerStub) Allocate(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error { +func (h *ManagerStub) Allocate(pod *v1.Pod) error { + return nil +} + +// UpdatePluginResources simply returns nil. 
+func (h *ManagerStub) UpdatePluginResources(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error { return nil } diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go index 5b7e8700594..c5df4f08154 100644 --- a/pkg/kubelet/cm/devicemanager/manager_test.go +++ b/pkg/kubelet/cm/devicemanager/manager_test.go @@ -30,6 +30,7 @@ import ( v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" @@ -648,17 +649,6 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso return testManager, nil } -func getTestNodeInfo(allocatable v1.ResourceList) *schedulernodeinfo.NodeInfo { - cachedNode := &v1.Node{ - Status: v1.NodeStatus{ - Allocatable: allocatable, - }, - } - nodeInfo := &schedulernodeinfo.NodeInfo{} - nodeInfo.SetNode(cachedNode) - return nodeInfo -} - type TestResource struct { resourceName string resourceQuantity resource.Quantity @@ -686,7 +676,6 @@ func TestPodContainerDeviceAllocation(t *testing.T) { tmpDir, err := ioutil.TempDir("", "checkpoint") as.Nil(err) defer os.RemoveAll(tmpDir) - nodeInfo := getTestNodeInfo(v1.ResourceList{}) testManager, err := getTestManager(tmpDir, podsStub.getActivePods, testResources) as.Nil(err) @@ -738,7 +727,7 @@ func TestPodContainerDeviceAllocation(t *testing.T) { pod := testCase.testPod activePods = append(activePods, pod) podsStub.updateActivePods(activePods) - err := testManager.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: pod}) + err := testManager.Allocate(pod) if !reflect.DeepEqual(err, testCase.expErr) { t.Errorf("DevicePluginManager error (%v). 
expected error: %v but got: %v", testCase.description, testCase.expErr, err) @@ -780,7 +769,6 @@ func TestInitContainerDeviceAllocation(t *testing.T) { podsStub := activePodsStub{ activePods: []*v1.Pod{}, } - nodeInfo := getTestNodeInfo(v1.ResourceList{}) tmpDir, err := ioutil.TempDir("", "checkpoint") as.Nil(err) defer os.RemoveAll(tmpDir) @@ -834,7 +822,7 @@ func TestInitContainerDeviceAllocation(t *testing.T) { }, } podsStub.updateActivePods([]*v1.Pod{podWithPluginResourcesInInitContainers}) - err = testManager.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: podWithPluginResourcesInInitContainers}) + err = testManager.Allocate(podWithPluginResourcesInInitContainers) as.Nil(err) podUID := string(podWithPluginResourcesInInitContainers.UID) initCont1 := podWithPluginResourcesInInitContainers.Spec.InitContainers[0].Name @@ -855,7 +843,10 @@ func TestInitContainerDeviceAllocation(t *testing.T) { as.Equal(0, normalCont1Devices.Intersection(normalCont2Devices).Len()) } -func TestSanitizeNodeAllocatable(t *testing.T) { +func TestUpdatePluginResources(t *testing.T) { + pod := &v1.Pod{} + pod.UID = types.UID("testPod") + resourceName1 := "domain1.com/resource1" devID1 := "dev1" @@ -876,6 +867,8 @@ func TestSanitizeNodeAllocatable(t *testing.T) { podDevices: make(podDevices), checkpointManager: ckm, } + testManager.podDevices[string(pod.UID)] = make(containerDevices) + // require one of resource1 and one of resource2 testManager.allocatedDevices[resourceName1] = sets.NewString() testManager.allocatedDevices[resourceName1].Insert(devID1) @@ -893,7 +886,7 @@ func TestSanitizeNodeAllocatable(t *testing.T) { nodeInfo := &schedulernodeinfo.NodeInfo{} nodeInfo.SetNode(cachedNode) - testManager.sanitizeNodeAllocatable(nodeInfo) + testManager.UpdatePluginResources(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: pod}) allocatableScalarResources := nodeInfo.AllocatableResource().ScalarResources // allocatable in nodeInfo is less than needed, should update @@ -918,7 +911,6 @@ func 
TestDevicePreStartContainer(t *testing.T) { tmpDir, err := ioutil.TempDir("", "checkpoint") as.Nil(err) defer os.RemoveAll(tmpDir) - nodeInfo := getTestNodeInfo(v1.ResourceList{}) testManager, err := getTestManager(tmpDir, podsStub.getActivePods, []TestResource{res1}) as.Nil(err) @@ -936,7 +928,7 @@ func TestDevicePreStartContainer(t *testing.T) { activePods := []*v1.Pod{} activePods = append(activePods, pod) podsStub.updateActivePods(activePods) - err = testManager.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: pod}) + err = testManager.Allocate(pod) as.Nil(err) runContainerOpts, err := testManager.GetDeviceRunContainerOptions(pod, &pod.Spec.Containers[0]) as.Nil(err) diff --git a/pkg/kubelet/cm/devicemanager/types.go b/pkg/kubelet/cm/devicemanager/types.go index 1b76a89a1df..114b36cc7c7 100644 --- a/pkg/kubelet/cm/devicemanager/types.go +++ b/pkg/kubelet/cm/devicemanager/types.go @@ -34,15 +34,17 @@ type Manager interface { // Start starts device plugin registration service. Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error - // Allocate configures and assigns devices to pods. The pods are provided - // through the pod admission attributes in the attrs argument. From the - // requested device resources, Allocate will communicate with the owning - // device plugin to allow setup procedures to take place, and for the - // device plugin to provide runtime settings to use the device (environment - // variables, mount points and device files). The node object is provided - // for the device manager to update the node capacity to reflect the - // currently available devices. - Allocate(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error + // Allocate configures and assigns devices to a pod. 
From the requested + // device resources, Allocate will communicate with the owning device + // plugin to allow setup procedures to take place, and for the device + // plugin to provide runtime settings to use the device (environment + // variables, mount points and device files). + Allocate(pod *v1.Pod) error + + // UpdatePluginResources updates node resources based on devices already + // allocated to pods. The node object is provided for the device manager to + // update the node capacity to reflect the currently available devices. + UpdatePluginResources(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error // Stop stops the manager. Stop() error