From 62326a1846eb18dae2a378648e6d3462b86344dd Mon Sep 17 00:00:00 2001
From: Alexey Perevalov
Date: Sun, 25 Oct 2020 23:59:49 +0300
Subject: [PATCH] Convert podDevices to struct

podDevices now carries its own guard: an embedded sync.RWMutex protects the
map, so ManagerImpl.mutex no longer has to be held around podDevices accesses.

Signed-off-by: Alexey Perevalov
---
 pkg/kubelet/cm/devicemanager/manager.go      |  20 ++--
 pkg/kubelet/cm/devicemanager/manager_test.go |  16 +--
 pkg/kubelet/cm/devicemanager/pod_devices.go  | 107 +++++++++++++-----
 .../cm/devicemanager/topology_hints_test.go  |   6 +-
 4 files changed, 96 insertions(+), 53 deletions(-)

diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go
index 59ecf7f2a60..c374e1a7c7a 100644
--- a/pkg/kubelet/cm/devicemanager/manager.go
+++ b/pkg/kubelet/cm/devicemanager/manager.go
@@ -96,7 +96,7 @@ type ManagerImpl struct {
 	allocatedDevices map[string]sets.String
 
 	// podDevices contains pod to allocated device mapping.
-	podDevices        podDevices
+	podDevices        *podDevices
 	checkpointManager checkpointmanager.CheckpointManager
 
 	// List of NUMA Nodes available on the underlying machine
@@ -150,7 +150,7 @@ func newManagerImpl(socketPath string, topology []cadvisorapi.Node, topologyAffi
 		healthyDevices:        make(map[string]sets.String),
 		unhealthyDevices:      make(map[string]sets.String),
 		allocatedDevices:      make(map[string]sets.String),
-		podDevices:            make(podDevices),
+		podDevices:            newPodDevices(),
 		numaNodes:             numaNodes,
 		topologyAffinityStore: topologyAffinityStore,
 		devicesToReuse:        make(PodReusableDevices),
@@ -393,11 +393,8 @@ func (m *ManagerImpl) Allocate(pod *v1.Pod, container *v1.Container) error {
 func (m *ManagerImpl) UpdatePluginResources(node *schedulerframework.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
 	pod := attrs.Pod
 
-	m.mutex.Lock()
-	defer m.mutex.Unlock()
-
 	// quick return if no pluginResources requested
-	if _, podRequireDevicePluginResource := m.podDevices[string(pod.UID)]; !podRequireDevicePluginResource {
+	if !m.podDevices.hasPod(string(pod.UID)) {
 		return nil
 	}
 
@@ -904,9 +901,7 @@ func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Cont
 		}
 
 		// Update internal cached podDevices state.
-		m.mutex.Lock()
 		m.podDevices.insert(podUID, contName, resource, allocDevices, resp.ContainerResponses[0])
-		m.mutex.Unlock()
 	}
 
 	if needsUpdateCheckpoint {
@@ -945,8 +940,6 @@ func (m *ManagerImpl) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Co
 			return nil, err
 		}
 	}
-	m.mutex.Lock()
-	defer m.mutex.Unlock()
 	return m.podDevices.deviceRunContainerOptions(string(pod.UID), container.Name), nil
 }
 
@@ -1019,6 +1012,9 @@ func (m *ManagerImpl) sanitizeNodeAllocatable(node *schedulerframework.NodeInfo)
 	if allocatableResource.ScalarResources == nil {
 		allocatableResource.ScalarResources = make(map[v1.ResourceName]int64)
 	}
+
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
 	for resource, devices := range m.allocatedDevices {
 		needed := devices.Len()
 		quant, ok := allocatableResource.ScalarResources[v1.ResourceName(resource)]
@@ -1038,6 +1034,8 @@ func (m *ManagerImpl) sanitizeNodeAllocatable(node *schedulerframework.NodeInfo)
 }
 
 func (m *ManagerImpl) isDevicePluginResource(resource string) bool {
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
 	_, registeredResource := m.healthyDevices[resource]
 	_, allocatedResource := m.allocatedDevices[resource]
 	// Return true if this is either an active device plugin resource or
@@ -1050,8 +1048,6 @@ func (m *ManagerImpl) isDevicePluginResource(resource string) bool {
 
 // GetDevices returns the devices used by the specified container
 func (m *ManagerImpl) GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices {
-	m.mutex.Lock()
-	defer m.mutex.Unlock()
 	return m.podDevices.getContainerDevices(podUID, containerName)
 }
 
diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go
index 6b40f1f9ef1..ca45ebc87e8 100644
--- a/pkg/kubelet/cm/devicemanager/manager_test.go
+++ b/pkg/kubelet/cm/devicemanager/manager_test.go
@@ -477,7 +477,7 @@ func TestCheckpoint(t *testing.T) {
 		healthyDevices:    make(map[string]sets.String),
 		unhealthyDevices:  make(map[string]sets.String),
 		allocatedDevices:  make(map[string]sets.String),
-		podDevices:        make(podDevices),
+		podDevices:        newPodDevices(),
 		checkpointManager: ckm,
 	}
 
@@ -516,12 +516,12 @@ func TestCheckpoint(t *testing.T) {
 	err = testManager.writeCheckpoint()
 	as.Nil(err)
 
-	testManager.podDevices = make(podDevices)
+	testManager.podDevices = newPodDevices()
 	err = testManager.readCheckpoint()
 	as.Nil(err)
 
-	as.Equal(len(expectedPodDevices), len(testManager.podDevices))
-	for podUID, containerDevices := range expectedPodDevices {
+	as.Equal(expectedPodDevices.size(), testManager.podDevices.size())
+	for podUID, containerDevices := range expectedPodDevices.devs {
 		for conName, resources := range containerDevices {
 			for resource := range resources {
 				expDevices := expectedPodDevices.containerDevices(podUID, conName, resource)
@@ -615,7 +615,7 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso
 		unhealthyDevices:      make(map[string]sets.String),
 		allocatedDevices:      make(map[string]sets.String),
 		endpoints:             make(map[string]endpointInfo),
-		podDevices:            make(podDevices),
+		podDevices:            newPodDevices(),
 		devicesToReuse:        make(PodReusableDevices),
 		topologyAffinityStore: topologymanager.NewFakeManager(),
 		activePods:            activePods,
@@ -882,10 +882,10 @@ func TestUpdatePluginResources(t *testing.T) {
 		callback:          monitorCallback,
 		allocatedDevices:  make(map[string]sets.String),
 		healthyDevices:    make(map[string]sets.String),
-		podDevices:        make(podDevices),
+		podDevices:        newPodDevices(),
 		checkpointManager: ckm,
 	}
-	testManager.podDevices[string(pod.UID)] = make(containerDevices)
+	testManager.podDevices.devs[string(pod.UID)] = make(containerDevices)
 
 	// require one of resource1 and one of resource2
 	testManager.allocatedDevices[resourceName1] = sets.NewString()
@@ -983,7 +983,7 @@ func TestResetExtendedResource(t *testing.T) {
 		healthyDevices:    make(map[string]sets.String),
 		unhealthyDevices:  make(map[string]sets.String),
 		allocatedDevices:  make(map[string]sets.String),
-		podDevices:        make(podDevices),
+		podDevices:        newPodDevices(),
 		checkpointManager: ckm,
 	}
 
diff --git a/pkg/kubelet/cm/devicemanager/pod_devices.go b/pkg/kubelet/cm/devicemanager/pod_devices.go
index f8625c8a06c..d3ffab80da4 100644
--- a/pkg/kubelet/cm/devicemanager/pod_devices.go
+++ b/pkg/kubelet/cm/devicemanager/pod_devices.go
@@ -17,6 +17,8 @@ limitations under the License.
 package devicemanager
 
 import (
+	"sync"
+
 	"k8s.io/klog/v2"
 
 	"k8s.io/apimachinery/pkg/util/sets"
@@ -35,45 +37,76 @@ type deviceAllocateInfo struct {
 type resourceAllocateInfo map[string]deviceAllocateInfo // Keyed by resourceName.
 type containerDevices map[string]resourceAllocateInfo   // Keyed by containerName.
-type podDevices map[string]containerDevices             // Keyed by podUID.
+type podDevices struct {
+	sync.RWMutex
+	devs map[string]containerDevices // Keyed by podUID.
+}
 
-func (pdev podDevices) pods() sets.String {
+// newPodDevices returns a podDevices instance guarded by its own RWMutex:
+// a map keyed by pod UID whose values hold the per-container device
+// allocation information (containerDevices).
+func newPodDevices() *podDevices {
+	return &podDevices{devs: make(map[string]containerDevices)}
+}
+
+func (pdev *podDevices) pods() sets.String {
+	pdev.RLock()
+	defer pdev.RUnlock()
 	ret := sets.NewString()
-	for k := range pdev {
+	for k := range pdev.devs {
 		ret.Insert(k)
 	}
 	return ret
 }
 
-func (pdev podDevices) insert(podUID, contName, resource string, devices sets.String, resp *pluginapi.ContainerAllocateResponse) {
-	if _, podExists := pdev[podUID]; !podExists {
-		pdev[podUID] = make(containerDevices)
+func (pdev *podDevices) size() int {
+	pdev.RLock()
+	defer pdev.RUnlock()
+	return len(pdev.devs)
+}
+
+func (pdev *podDevices) hasPod(podUID string) bool {
+	pdev.RLock()
+	defer pdev.RUnlock()
+	_, podExists := pdev.devs[podUID]
+	return podExists
+}
+
+func (pdev *podDevices) insert(podUID, contName, resource string, devices sets.String, resp *pluginapi.ContainerAllocateResponse) {
+	pdev.Lock()
+	defer pdev.Unlock()
+	if _, podExists := pdev.devs[podUID]; !podExists {
+		pdev.devs[podUID] = make(containerDevices)
 	}
-	if _, contExists := pdev[podUID][contName]; !contExists {
-		pdev[podUID][contName] = make(resourceAllocateInfo)
+	if _, contExists := pdev.devs[podUID][contName]; !contExists {
+		pdev.devs[podUID][contName] = make(resourceAllocateInfo)
 	}
-	pdev[podUID][contName][resource] = deviceAllocateInfo{
+	pdev.devs[podUID][contName][resource] = deviceAllocateInfo{
 		deviceIds: devices,
 		allocResp: resp,
 	}
 }
 
-func (pdev podDevices) delete(pods []string) {
+func (pdev *podDevices) delete(pods []string) {
+	pdev.Lock()
+	defer pdev.Unlock()
 	for _, uid := range pods {
-		delete(pdev, uid)
+		delete(pdev.devs, uid)
 	}
 }
 
 // Returns list of device Ids allocated to the given container for the given resource.
 // Returns nil if we don't have cached state for the given <podUID, contName, resource>.
-func (pdev podDevices) containerDevices(podUID, contName, resource string) sets.String {
-	if _, podExists := pdev[podUID]; !podExists {
+func (pdev *podDevices) containerDevices(podUID, contName, resource string) sets.String {
+	pdev.RLock()
+	defer pdev.RUnlock()
+	if _, podExists := pdev.devs[podUID]; !podExists {
 		return nil
 	}
-	if _, contExists := pdev[podUID][contName]; !contExists {
+	if _, contExists := pdev.devs[podUID][contName]; !contExists {
 		return nil
 	}
-	devs, resourceExists := pdev[podUID][contName][resource]
+	devs, resourceExists := pdev.devs[podUID][contName][resource]
 	if !resourceExists {
 		return nil
 	}
 	return devs.deviceIds
@@ -81,8 +114,10 @@ func (pdev podDevices) containerDevices(podUID, contName, resource string) sets.
 }
 
 // Populates allocatedResources with the device resources allocated to the specified <podUID, contName>.
-func (pdev podDevices) addContainerAllocatedResources(podUID, contName string, allocatedResources map[string]sets.String) {
-	containers, exists := pdev[podUID]
+func (pdev *podDevices) addContainerAllocatedResources(podUID, contName string, allocatedResources map[string]sets.String) {
+	pdev.RLock()
+	defer pdev.RUnlock()
+	containers, exists := pdev.devs[podUID]
 	if !exists {
 		return
 	}
@@ -96,8 +131,10 @@ func (pdev podDevices) addContainerAllocatedResources(podUID, contName string, a
 }
 
 // Removes the device resources allocated to the specified <podUID, contName> from allocatedResources.
-func (pdev podDevices) removeContainerAllocatedResources(podUID, contName string, allocatedResources map[string]sets.String) {
-	containers, exists := pdev[podUID]
+func (pdev *podDevices) removeContainerAllocatedResources(podUID, contName string, allocatedResources map[string]sets.String) {
+	pdev.RLock()
+	defer pdev.RUnlock()
+	containers, exists := pdev.devs[podUID]
 	if !exists {
 		return
 	}
@@ -111,9 +148,11 @@ func (pdev podDevices) removeContainerAllocatedResources(podUID, contName string
 }
 
 // Returns all of devices allocated to the pods being tracked, keyed by resourceName.
-func (pdev podDevices) devices() map[string]sets.String {
+func (pdev *podDevices) devices() map[string]sets.String {
 	ret := make(map[string]sets.String)
-	for _, containerDevices := range pdev {
+	pdev.RLock()
+	defer pdev.RUnlock()
+	for _, containerDevices := range pdev.devs {
 		for _, resources := range containerDevices {
 			for resource, devices := range resources {
 				if _, exists := ret[resource]; !exists {
@@ -129,9 +168,11 @@ func (pdev podDevices) devices() map[string]sets.String {
 }
 
 // Turns podDevices to checkpointData.
-func (pdev podDevices) toCheckpointData() []checkpoint.PodDevicesEntry {
+func (pdev *podDevices) toCheckpointData() []checkpoint.PodDevicesEntry {
 	var data []checkpoint.PodDevicesEntry
-	for podUID, containerDevices := range pdev {
+	pdev.RLock()
+	defer pdev.RUnlock()
+	for podUID, containerDevices := range pdev.devs {
 		for conName, resources := range containerDevices {
 			for resource, devices := range resources {
 				devIds := devices.deviceIds.UnsortedList()
@@ -158,7 +199,7 @@
 }
 
 // Populates podDevices from the passed in checkpointData.
-func (pdev podDevices) fromCheckpointData(data []checkpoint.PodDevicesEntry) {
+func (pdev *podDevices) fromCheckpointData(data []checkpoint.PodDevicesEntry) {
 	for _, entry := range data {
 		klog.V(2).Infof("Get checkpoint entry: %v %v %v %v %v\n",
 			entry.PodUID, entry.ContainerName, entry.ResourceName, entry.DeviceIDs, entry.AllocResp)
@@ -177,8 +218,11 @@
 }
 
 // Returns combined container runtime settings to consume the container's allocated devices.
-func (pdev podDevices) deviceRunContainerOptions(podUID, contName string) *DeviceRunContainerOptions {
-	containers, exists := pdev[podUID]
+func (pdev *podDevices) deviceRunContainerOptions(podUID, contName string) *DeviceRunContainerOptions {
+	pdev.RLock()
+	defer pdev.RUnlock()
+
+	containers, exists := pdev.devs[podUID]
 	if !exists {
 		return nil
 	}
@@ -274,15 +318,18 @@ func (pdev podDevices) deviceRunContainerOptions(podUID, contName string) *Devic
 }
 
 // getContainerDevices returns the devices assigned to the provided container for all ResourceNames
-func (pdev podDevices) getContainerDevices(podUID, contName string) []*podresourcesapi.ContainerDevices {
-	if _, podExists := pdev[podUID]; !podExists {
+func (pdev *podDevices) getContainerDevices(podUID, contName string) []*podresourcesapi.ContainerDevices {
+	pdev.RLock()
+	defer pdev.RUnlock()
+
+	if _, podExists := pdev.devs[podUID]; !podExists {
 		return nil
 	}
-	if _, contExists := pdev[podUID][contName]; !contExists {
+	if _, contExists := pdev.devs[podUID][contName]; !contExists {
 		return nil
 	}
 	cDev := []*podresourcesapi.ContainerDevices{}
-	for resource, allocateInfo := range pdev[podUID][contName] {
+	for resource, allocateInfo := range pdev.devs[podUID][contName] {
 		cDev = append(cDev, &podresourcesapi.ContainerDevices{
 			ResourceName: resource,
 			DeviceIds:    allocateInfo.deviceIds.UnsortedList(),
diff --git a/pkg/kubelet/cm/devicemanager/topology_hints_test.go b/pkg/kubelet/cm/devicemanager/topology_hints_test.go
index 3cd072183e8..5f2db322953 100644
--- a/pkg/kubelet/cm/devicemanager/topology_hints_test.go
+++ b/pkg/kubelet/cm/devicemanager/topology_hints_test.go
@@ -385,7 +385,7 @@ func TestGetTopologyHints(t *testing.T) {
 		allDevices:       make(map[string]map[string]pluginapi.Device),
 		healthyDevices:   make(map[string]sets.String),
 		allocatedDevices: make(map[string]sets.String),
-		podDevices:       make(podDevices),
+		podDevices:       newPodDevices(),
 		sourcesReady:     &sourcesReadyStub{},
 		activePods:       func() []*v1.Pod { return []*v1.Pod{pod} },
 		numaNodes:        []int{0, 1},
@@ -739,7 +739,7 @@ func TestTopologyAlignedAllocation(t *testing.T) {
 			healthyDevices:        make(map[string]sets.String),
 			allocatedDevices:      make(map[string]sets.String),
 			endpoints:             make(map[string]endpointInfo),
-			podDevices:            make(podDevices),
+			podDevices:            newPodDevices(),
 			sourcesReady:          &sourcesReadyStub{},
 			activePods:            func() []*v1.Pod { return []*v1.Pod{} },
 			topologyAffinityStore: &mockAffinityStore{tc.hint},
@@ -928,7 +928,7 @@ func TestGetPreferredAllocationParameters(t *testing.T) {
 			healthyDevices:        make(map[string]sets.String),
 			allocatedDevices:      make(map[string]sets.String),
 			endpoints:             make(map[string]endpointInfo),
-			podDevices:            make(podDevices),
+			podDevices:            newPodDevices(),
 			sourcesReady:          &sourcesReadyStub{},
 			activePods:            func() []*v1.Pod { return []*v1.Pod{} },
 			topologyAffinityStore: &mockAffinityStore{tc.hint},
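
For reference, below is a minimal, self-contained sketch of the locking pattern
this patch applies: a struct embeds a sync.RWMutex and guards its own map, so
callers no longer need to hold an external lock (previously ManagerImpl.mutex)
around every access. All names here (store, newStore, insert, hasPod) are
illustrative stand-ins that merely mirror the podDevices methods above; this is
not kubelet code.

package main

import (
	"fmt"
	"sync"
)

type store struct {
	sync.RWMutex                     // embedded guard, as in podDevices
	devs         map[string][]string // pod UID -> device IDs (simplified)
}

func newStore() *store {
	return &store{devs: make(map[string][]string)}
}

// insert takes the write lock, mirroring podDevices.insert.
func (s *store) insert(podUID string, ids ...string) {
	s.Lock()
	defer s.Unlock()
	s.devs[podUID] = append(s.devs[podUID], ids...)
}

// hasPod takes only the read lock, so concurrent readers do not serialize.
func (s *store) hasPod(podUID string) bool {
	s.RLock()
	defer s.RUnlock()
	_, ok := s.devs[podUID]
	return ok
}

func main() {
	s := newStore()
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(n int) { // concurrent writers are safe without a caller-held mutex
			defer wg.Done()
			s.insert(fmt.Sprintf("pod-%d", n), "dev0")
		}(i)
	}
	wg.Wait()
	fmt.Println(s.hasPod("pod-1")) // prints: true
}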
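
A note on the design: embedding sync.RWMutex makes Lock/Unlock/RLock/RUnlock
part of the type's method set, which reads tersely inside the methods
(pdev.RLock()) but also exposes the guard to other code in the package; a named
field such as mu sync.RWMutex would keep it private to the type's own methods.
Because this change replaces externally held lock sites with per-method
locking, running the package tests under the race detector (go test -race
./pkg/kubelet/cm/devicemanager/) is a cheap way to confirm that no access to
pdev.devs slipped through without the guard; a method that touched the map
without first taking the lock, e.g. a hasPod without the RLock, would surface
there as a data race.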