diff --git a/pkg/kubelet/cm/deviceplugin/manager.go b/pkg/kubelet/cm/deviceplugin/manager.go index 6535479fe43..db28e36d5d6 100644 --- a/pkg/kubelet/cm/deviceplugin/manager.go +++ b/pkg/kubelet/cm/deviceplugin/manager.go @@ -240,16 +240,19 @@ func (m *ManagerImpl) Devices() map[string][]pluginapi.Device { // from the registered device plugins. func (m *ManagerImpl) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error { pod := attrs.Pod + devicesToReuse := make(map[string]sets.String) // TODO: Reuse devices between init containers and regular containers. for _, container := range pod.Spec.InitContainers { - if err := m.allocateContainerResources(pod, &container); err != nil { + if err := m.allocateContainerResources(pod, &container, devicesToReuse); err != nil { return err } + m.podDevices.addContainerAllocatedResources(string(pod.UID), container.Name, devicesToReuse) } for _, container := range pod.Spec.Containers { - if err := m.allocateContainerResources(pod, &container); err != nil { + if err := m.allocateContainerResources(pod, &container, devicesToReuse); err != nil { return err } + m.podDevices.removeContainerAllocatedResources(string(pod.UID), container.Name, devicesToReuse) } m.mutex.Lock() @@ -471,7 +474,7 @@ func (m *ManagerImpl) updateAllocatedDevices(activePods []*v1.Pod) { // Returns list of device Ids we need to allocate with Allocate rpc call. // Returns empty list in case we don't need to issue the Allocate rpc call. 
-func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, required int) (sets.String, error) { +func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, required int, reusableDevices sets.String) (sets.String, error) { m.mutex.Lock() defer m.mutex.Unlock() needed := required @@ -497,6 +500,14 @@ func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, requi return nil, fmt.Errorf("can't allocate unregistered device %v", resource) } devices = sets.NewString() + // Allocates from reusableDevices list first. + for device := range reusableDevices { + devices.Insert(device) + needed-- + if needed == 0 { + return devices, nil + } + } // Needs to allocate additional devices. if m.allocatedDevices[resource] == nil { m.allocatedDevices[resource] = sets.NewString() @@ -523,7 +534,7 @@ func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, requi // plugin resources for the input container, issues an Allocate rpc request // for each new device resource requirement, processes their AllocateResponses, // and updates the cached containerDevices on success. 
-func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Container) error { +func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Container, devicesToReuse map[string]sets.String) error { podUID := string(pod.UID) contName := container.Name allocatedDevicesUpdated := false @@ -544,7 +555,7 @@ func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Cont m.updateAllocatedDevices(m.activePods()) allocatedDevicesUpdated = true } - allocDevices, err := m.devicesToAllocate(podUID, contName, resource, needed) + allocDevices, err := m.devicesToAllocate(podUID, contName, resource, needed, devicesToReuse[resource]) if err != nil { return err } diff --git a/pkg/kubelet/cm/deviceplugin/manager_test.go b/pkg/kubelet/cm/deviceplugin/manager_test.go index 9a74ec93b50..7cd9a1aa9f1 100644 --- a/pkg/kubelet/cm/deviceplugin/manager_test.go +++ b/pkg/kubelet/cm/deviceplugin/manager_test.go @@ -539,6 +539,70 @@ func TestPodContainerDeviceAllocation(t *testing.T) { as.Nil(err) runContainerOpts3 := testManager.GetDeviceRunContainerOptions(newPod, &newPod.Spec.Containers[0]) as.Equal(1, len(runContainerOpts3.Envs)) + + // Requesting to create a pod that requests resourceName1 in init containers and normal containers + // should succeed with devices allocated to init containers reallocated to normal containers. 
+ podWithPluginResourcesInInitContainers := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: uuid.NewUUID(), + }, + Spec: v1.PodSpec{ + InitContainers: []v1.Container{ + { + Name: string(uuid.NewUUID()), + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceName(resourceName1): resourceQuantity2, + }, + }, + }, + { + Name: string(uuid.NewUUID()), + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceName(resourceName1): resourceQuantity1, + }, + }, + }, + }, + Containers: []v1.Container{ + { + Name: string(uuid.NewUUID()), + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceName(resourceName1): resourceQuantity2, + v1.ResourceName(resourceName2): resourceQuantity2, + }, + }, + }, + { + Name: string(uuid.NewUUID()), + Resources: v1.ResourceRequirements{ + Limits: v1.ResourceList{ + v1.ResourceName(resourceName1): resourceQuantity2, + v1.ResourceName(resourceName2): resourceQuantity2, + }, + }, + }, + }, + }, + } + podsStub.updateActivePods([]*v1.Pod{podWithPluginResourcesInInitContainers}) + err = testManager.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: podWithPluginResourcesInInitContainers}) + as.Nil(err) + podUID := string(podWithPluginResourcesInInitContainers.UID) + initCont1 := podWithPluginResourcesInInitContainers.Spec.InitContainers[0].Name + initCont2 := podWithPluginResourcesInInitContainers.Spec.InitContainers[1].Name + normalCont1 := podWithPluginResourcesInInitContainers.Spec.Containers[0].Name + normalCont2 := podWithPluginResourcesInInitContainers.Spec.Containers[1].Name + initCont1Devices := testManager.podDevices.containerDevices(podUID, initCont1, resourceName1) + initCont2Devices := testManager.podDevices.containerDevices(podUID, initCont2, resourceName1) + normalCont1Devices := testManager.podDevices.containerDevices(podUID, normalCont1, resourceName1) + normalCont2Devices := testManager.podDevices.containerDevices(podUID, normalCont2, resourceName1) + 
as.True(initCont2Devices.IsSuperset(initCont1Devices)) + as.True(initCont2Devices.IsSuperset(normalCont1Devices)) + as.True(initCont2Devices.IsSuperset(normalCont2Devices)) + as.Equal(0, normalCont1Devices.Intersection(normalCont2Devices).Len()) } func TestSanitizeNodeAllocatable(t *testing.T) { diff --git a/pkg/kubelet/cm/deviceplugin/pod_devices.go b/pkg/kubelet/cm/deviceplugin/pod_devices.go index 495a5729879..311c8d0c60f 100644 --- a/pkg/kubelet/cm/deviceplugin/pod_devices.go +++ b/pkg/kubelet/cm/deviceplugin/pod_devices.go @@ -78,6 +78,36 @@ func (pdev podDevices) containerDevices(podUID, contName, resource string) sets. return devs.deviceIds } +// Populates allocatedResources with the device resources allocated to the specified container. +func (pdev podDevices) addContainerAllocatedResources(podUID, contName string, allocatedResources map[string]sets.String) { + containers, exists := pdev[podUID] + if !exists { + return + } + resources, exists := containers[contName] + if !exists { + return + } + for resource, devices := range resources { + allocatedResources[resource] = allocatedResources[resource].Union(devices.deviceIds) + } +} + +// Removes the device resources allocated to the specified container from allocatedResources. +func (pdev podDevices) removeContainerAllocatedResources(podUID, contName string, allocatedResources map[string]sets.String) { + containers, exists := pdev[podUID] + if !exists { + return + } + resources, exists := containers[contName] + if !exists { + return + } + for resource, devices := range resources { + allocatedResources[resource] = allocatedResources[resource].Difference(devices.deviceIds) + } +} + // Returns all of devices allocated to the pods being tracked, keyed by resourceName. func (pdev podDevices) devices() map[string]sets.String { ret := make(map[string]sets.String)