Split devicemanager Allocate into two functions

Instead of having a single call for Allocate(), we now split this into two
functions Allocate() and UpdatePluginResources().

The semantics split across them:

// Allocate configures and assigns devices to a pod. From the requested
// device resources, Allocate will communicate with the owning device
// plugin to allow setup procedures to take place, and for the device
// plugin to provide runtime settings to use the device (environment
// variables, mount points and device files).
Allocate(pod *v1.Pod) error

// UpdatePluginResources updates node resources based on devices already
// allocated to pods. The node object is provided for the device manager to
// update the node capacity to reflect the currently available devices.
UpdatePluginResources(
    node *schedulernodeinfo.NodeInfo,
    attrs *lifecycle.PodAdmitAttributes) error

As we move to a model in which the TopologyManager is able to ensure
aligned allocations from the CPUManager, devicemanger, and any
other TopologManager HintProviders in the same synchronous loop, we will
need to be able to call Allocate() independently from an
UpdatePluginResources(). This commit makes that possible.
This commit is contained in:
Kevin Klues 2020-02-02 15:16:18 +00:00 committed by nolancon
parent acd97b42f3
commit a3f099ea4d
5 changed files with 39 additions and 32 deletions

View File

@ -672,7 +672,11 @@ func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Containe
} }
func (cm *containerManagerImpl) UpdatePluginResources(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error { func (cm *containerManagerImpl) UpdatePluginResources(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
return cm.deviceManager.Allocate(node, attrs) err := cm.deviceManager.Allocate(attrs.Pod)
if err != nil {
return err
}
return cm.deviceManager.UpdatePluginResources(node, attrs)
} }
func (cm *containerManagerImpl) GetTopologyPodAdmitHandler() topologymanager.Manager { func (cm *containerManagerImpl) GetTopologyPodAdmitHandler() topologymanager.Manager {

View File

@ -369,13 +369,17 @@ func (m *ManagerImpl) allocatePodResources(pod *v1.Pod) error {
// Allocate is the call that you can use to allocate a set of devices // Allocate is the call that you can use to allocate a set of devices
// from the registered device plugins. // from the registered device plugins.
func (m *ManagerImpl) Allocate(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error { func (m *ManagerImpl) Allocate(pod *v1.Pod) error {
pod := attrs.Pod
err := m.allocatePodResources(pod) err := m.allocatePodResources(pod)
if err != nil { if err != nil {
klog.Errorf("Failed to allocate device plugin resource for pod %s: %v", string(pod.UID), err) klog.Errorf("Failed to allocate device plugin resource for pod %s: %v", string(pod.UID), err)
return err return err
} }
return nil
}
func (m *ManagerImpl) UpdatePluginResources(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
pod := attrs.Pod
m.mutex.Lock() m.mutex.Lock()
defer m.mutex.Unlock() defer m.mutex.Unlock()

View File

@ -45,7 +45,12 @@ func (h *ManagerStub) Stop() error {
} }
// Allocate simply returns nil. // Allocate simply returns nil.
func (h *ManagerStub) Allocate(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error { func (h *ManagerStub) Allocate(pod *v1.Pod) error {
return nil
}
// UpdatePluginResources simply returns nil.
func (h *ManagerStub) UpdatePluginResources(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
return nil return nil
} }

View File

@ -30,6 +30,7 @@ import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
@ -648,17 +649,6 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso
return testManager, nil return testManager, nil
} }
func getTestNodeInfo(allocatable v1.ResourceList) *schedulernodeinfo.NodeInfo {
cachedNode := &v1.Node{
Status: v1.NodeStatus{
Allocatable: allocatable,
},
}
nodeInfo := &schedulernodeinfo.NodeInfo{}
nodeInfo.SetNode(cachedNode)
return nodeInfo
}
type TestResource struct { type TestResource struct {
resourceName string resourceName string
resourceQuantity resource.Quantity resourceQuantity resource.Quantity
@ -686,7 +676,6 @@ func TestPodContainerDeviceAllocation(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "checkpoint") tmpDir, err := ioutil.TempDir("", "checkpoint")
as.Nil(err) as.Nil(err)
defer os.RemoveAll(tmpDir) defer os.RemoveAll(tmpDir)
nodeInfo := getTestNodeInfo(v1.ResourceList{})
testManager, err := getTestManager(tmpDir, podsStub.getActivePods, testResources) testManager, err := getTestManager(tmpDir, podsStub.getActivePods, testResources)
as.Nil(err) as.Nil(err)
@ -738,7 +727,7 @@ func TestPodContainerDeviceAllocation(t *testing.T) {
pod := testCase.testPod pod := testCase.testPod
activePods = append(activePods, pod) activePods = append(activePods, pod)
podsStub.updateActivePods(activePods) podsStub.updateActivePods(activePods)
err := testManager.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: pod}) err := testManager.Allocate(pod)
if !reflect.DeepEqual(err, testCase.expErr) { if !reflect.DeepEqual(err, testCase.expErr) {
t.Errorf("DevicePluginManager error (%v). expected error: %v but got: %v", t.Errorf("DevicePluginManager error (%v). expected error: %v but got: %v",
testCase.description, testCase.expErr, err) testCase.description, testCase.expErr, err)
@ -780,7 +769,6 @@ func TestInitContainerDeviceAllocation(t *testing.T) {
podsStub := activePodsStub{ podsStub := activePodsStub{
activePods: []*v1.Pod{}, activePods: []*v1.Pod{},
} }
nodeInfo := getTestNodeInfo(v1.ResourceList{})
tmpDir, err := ioutil.TempDir("", "checkpoint") tmpDir, err := ioutil.TempDir("", "checkpoint")
as.Nil(err) as.Nil(err)
defer os.RemoveAll(tmpDir) defer os.RemoveAll(tmpDir)
@ -834,7 +822,7 @@ func TestInitContainerDeviceAllocation(t *testing.T) {
}, },
} }
podsStub.updateActivePods([]*v1.Pod{podWithPluginResourcesInInitContainers}) podsStub.updateActivePods([]*v1.Pod{podWithPluginResourcesInInitContainers})
err = testManager.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: podWithPluginResourcesInInitContainers}) err = testManager.Allocate(podWithPluginResourcesInInitContainers)
as.Nil(err) as.Nil(err)
podUID := string(podWithPluginResourcesInInitContainers.UID) podUID := string(podWithPluginResourcesInInitContainers.UID)
initCont1 := podWithPluginResourcesInInitContainers.Spec.InitContainers[0].Name initCont1 := podWithPluginResourcesInInitContainers.Spec.InitContainers[0].Name
@ -855,7 +843,10 @@ func TestInitContainerDeviceAllocation(t *testing.T) {
as.Equal(0, normalCont1Devices.Intersection(normalCont2Devices).Len()) as.Equal(0, normalCont1Devices.Intersection(normalCont2Devices).Len())
} }
func TestSanitizeNodeAllocatable(t *testing.T) { func TestUpdatePluginResources(t *testing.T) {
pod := &v1.Pod{}
pod.UID = types.UID("testPod")
resourceName1 := "domain1.com/resource1" resourceName1 := "domain1.com/resource1"
devID1 := "dev1" devID1 := "dev1"
@ -876,6 +867,8 @@ func TestSanitizeNodeAllocatable(t *testing.T) {
podDevices: make(podDevices), podDevices: make(podDevices),
checkpointManager: ckm, checkpointManager: ckm,
} }
testManager.podDevices[string(pod.UID)] = make(containerDevices)
// require one of resource1 and one of resource2 // require one of resource1 and one of resource2
testManager.allocatedDevices[resourceName1] = sets.NewString() testManager.allocatedDevices[resourceName1] = sets.NewString()
testManager.allocatedDevices[resourceName1].Insert(devID1) testManager.allocatedDevices[resourceName1].Insert(devID1)
@ -893,7 +886,7 @@ func TestSanitizeNodeAllocatable(t *testing.T) {
nodeInfo := &schedulernodeinfo.NodeInfo{} nodeInfo := &schedulernodeinfo.NodeInfo{}
nodeInfo.SetNode(cachedNode) nodeInfo.SetNode(cachedNode)
testManager.sanitizeNodeAllocatable(nodeInfo) testManager.UpdatePluginResources(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: pod})
allocatableScalarResources := nodeInfo.AllocatableResource().ScalarResources allocatableScalarResources := nodeInfo.AllocatableResource().ScalarResources
// allocatable in nodeInfo is less than needed, should update // allocatable in nodeInfo is less than needed, should update
@ -918,7 +911,6 @@ func TestDevicePreStartContainer(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "checkpoint") tmpDir, err := ioutil.TempDir("", "checkpoint")
as.Nil(err) as.Nil(err)
defer os.RemoveAll(tmpDir) defer os.RemoveAll(tmpDir)
nodeInfo := getTestNodeInfo(v1.ResourceList{})
testManager, err := getTestManager(tmpDir, podsStub.getActivePods, []TestResource{res1}) testManager, err := getTestManager(tmpDir, podsStub.getActivePods, []TestResource{res1})
as.Nil(err) as.Nil(err)
@ -936,7 +928,7 @@ func TestDevicePreStartContainer(t *testing.T) {
activePods := []*v1.Pod{} activePods := []*v1.Pod{}
activePods = append(activePods, pod) activePods = append(activePods, pod)
podsStub.updateActivePods(activePods) podsStub.updateActivePods(activePods)
err = testManager.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: pod}) err = testManager.Allocate(pod)
as.Nil(err) as.Nil(err)
runContainerOpts, err := testManager.GetDeviceRunContainerOptions(pod, &pod.Spec.Containers[0]) runContainerOpts, err := testManager.GetDeviceRunContainerOptions(pod, &pod.Spec.Containers[0])
as.Nil(err) as.Nil(err)

View File

@ -34,15 +34,17 @@ type Manager interface {
// Start starts device plugin registration service. // Start starts device plugin registration service.
Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error
// Allocate configures and assigns devices to pods. The pods are provided // Allocate configures and assigns devices to a pod. From the requested
// through the pod admission attributes in the attrs argument. From the // device resources, Allocate will communicate with the owning device
// requested device resources, Allocate will communicate with the owning // plugin to allow setup procedures to take place, and for the device
// device plugin to allow setup procedures to take place, and for the // plugin to provide runtime settings to use the device (environment
// device plugin to provide runtime settings to use the device (environment // variables, mount points and device files).
// variables, mount points and device files). The node object is provided Allocate(pod *v1.Pod) error
// for the device manager to update the node capacity to reflect the
// currently available devices. // UpdatePluginResources updates node resources based on devices already
Allocate(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error // allocated to pods. The node object is provided for the device manager to
// update the node capacity to reflect the currently available devices.
UpdatePluginResources(node *schedulernodeinfo.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error
// Stop stops the manager. // Stop stops the manager.
Stop() error Stop() error