Convert podDevices to struct

podDevices now carries its own guard: an embedded sync.RWMutex serializes access, so callers no longer need to hold ManagerImpl's mutex around it.

Signed-off-by: Alexey Perevalov <alexey.perevalov@huawei.com>
Alexey Perevalov 2020-10-25 23:59:49 +03:00
parent db0a515be0
commit 62326a1846
4 changed files with 94 additions and 53 deletions
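
The change follows a standard Go pattern: convert a bare map type into a struct that embeds sync.RWMutex, and have every method take the lock itself. A minimal sketch of the pattern (guardedSet and newGuardedSet are illustrative names, not the kubelet's actual types):

package main

import "sync"

// guardedSet wraps a map together with the RWMutex that guards it,
// so callers never need to hold an external lock.
type guardedSet struct {
	sync.RWMutex
	items map[string]struct{}
}

func newGuardedSet() *guardedSet {
	return &guardedSet{items: make(map[string]struct{})}
}

// insert takes the write lock internally.
func (g *guardedSet) insert(k string) {
	g.Lock()
	defer g.Unlock()
	g.items[k] = struct{}{}
}

// has takes the read lock internally, allowing concurrent readers.
func (g *guardedSet) has(k string) bool {
	g.RLock()
	defer g.RUnlock()
	_, ok := g.items[k]
	return ok
}

Methods must be declared on the pointer receiver (*guardedSet here, *podDevices below): copying a struct that embeds a sync.RWMutex would copy the lock, which go vet's copylocks check flags.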

--- a/pkg/kubelet/cm/devicemanager/manager.go
+++ b/pkg/kubelet/cm/devicemanager/manager.go

@@ -96,7 +96,7 @@ type ManagerImpl struct {
 	allocatedDevices map[string]sets.String

 	// podDevices contains pod to allocated device mapping.
-	podDevices podDevices
+	podDevices *podDevices
 	checkpointManager checkpointmanager.CheckpointManager

 	// List of NUMA Nodes available on the underlying machine
@@ -150,7 +150,7 @@ func newManagerImpl(socketPath string, topology []cadvisorapi.Node, topologyAffi
 		healthyDevices: make(map[string]sets.String),
 		unhealthyDevices: make(map[string]sets.String),
 		allocatedDevices: make(map[string]sets.String),
-		podDevices: make(podDevices),
+		podDevices: newPodDevices(),
 		numaNodes: numaNodes,
 		topologyAffinityStore: topologyAffinityStore,
 		devicesToReuse: make(PodReusableDevices),
@@ -393,11 +393,8 @@ func (m *ManagerImpl) Allocate(pod *v1.Pod, container *v1.Container) error {
 func (m *ManagerImpl) UpdatePluginResources(node *schedulerframework.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
 	pod := attrs.Pod

-	m.mutex.Lock()
-	defer m.mutex.Unlock()
-
 	// quick return if no pluginResources requested
-	if _, podRequireDevicePluginResource := m.podDevices[string(pod.UID)]; !podRequireDevicePluginResource {
+	if !m.podDevices.hasPod(string(pod.UID)) {
 		return nil
 	}
@@ -904,9 +901,7 @@ func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Cont
 		}

 		// Update internal cached podDevices state.
-		m.mutex.Lock()
 		m.podDevices.insert(podUID, contName, resource, allocDevices, resp.ContainerResponses[0])
-		m.mutex.Unlock()
 	}

 	if needsUpdateCheckpoint {
@@ -945,8 +940,6 @@ func (m *ManagerImpl) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Co
 			return nil, err
 		}
 	}
-	m.mutex.Lock()
-	defer m.mutex.Unlock()
 	return m.podDevices.deviceRunContainerOptions(string(pod.UID), container.Name), nil
 }
@@ -1019,6 +1012,9 @@ func (m *ManagerImpl) sanitizeNodeAllocatable(node *schedulerframework.NodeInfo)
 	if allocatableResource.ScalarResources == nil {
 		allocatableResource.ScalarResources = make(map[v1.ResourceName]int64)
 	}
+
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
 	for resource, devices := range m.allocatedDevices {
 		needed := devices.Len()
 		quant, ok := allocatableResource.ScalarResources[v1.ResourceName(resource)]
@@ -1038,6 +1034,8 @@ func (m *ManagerImpl) sanitizeNodeAllocatable(node *schedulerframework.NodeInfo)
 }

 func (m *ManagerImpl) isDevicePluginResource(resource string) bool {
+	m.mutex.Lock()
+	defer m.mutex.Unlock()
 	_, registeredResource := m.healthyDevices[resource]
 	_, allocatedResource := m.allocatedDevices[resource]
 	// Return true if this is either an active device plugin resource or
@@ -1050,8 +1048,6 @@ func (m *ManagerImpl) isDevicePluginResource(resource string) bool {

 // GetDevices returns the devices used by the specified container
 func (m *ManagerImpl) GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices {
-	m.mutex.Lock()
-	defer m.mutex.Unlock()
 	return m.podDevices.getContainerDevices(podUID, containerName)
 }

--- a/pkg/kubelet/cm/devicemanager/manager_test.go
+++ b/pkg/kubelet/cm/devicemanager/manager_test.go

@@ -477,7 +477,7 @@ func TestCheckpoint(t *testing.T) {
 		healthyDevices: make(map[string]sets.String),
 		unhealthyDevices: make(map[string]sets.String),
 		allocatedDevices: make(map[string]sets.String),
-		podDevices: make(podDevices),
+		podDevices: newPodDevices(),
 		checkpointManager: ckm,
 	}
@@ -516,12 +516,12 @@ func TestCheckpoint(t *testing.T) {
 	err = testManager.writeCheckpoint()
 	as.Nil(err)
-	testManager.podDevices = make(podDevices)
+	testManager.podDevices = newPodDevices()
 	err = testManager.readCheckpoint()
 	as.Nil(err)

-	as.Equal(len(expectedPodDevices), len(testManager.podDevices))
-	for podUID, containerDevices := range expectedPodDevices {
+	as.Equal(expectedPodDevices.size(), testManager.podDevices.size())
+	for podUID, containerDevices := range expectedPodDevices.devs {
 		for conName, resources := range containerDevices {
 			for resource := range resources {
 				expDevices := expectedPodDevices.containerDevices(podUID, conName, resource)
@@ -615,7 +615,7 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso
 		unhealthyDevices: make(map[string]sets.String),
 		allocatedDevices: make(map[string]sets.String),
 		endpoints: make(map[string]endpointInfo),
-		podDevices: make(podDevices),
+		podDevices: newPodDevices(),
 		devicesToReuse: make(PodReusableDevices),
 		topologyAffinityStore: topologymanager.NewFakeManager(),
 		activePods: activePods,
@@ -882,10 +882,10 @@ func TestUpdatePluginResources(t *testing.T) {
 		callback: monitorCallback,
 		allocatedDevices: make(map[string]sets.String),
 		healthyDevices: make(map[string]sets.String),
-		podDevices: make(podDevices),
+		podDevices: newPodDevices(),
 		checkpointManager: ckm,
 	}
-	testManager.podDevices[string(pod.UID)] = make(containerDevices)
+	testManager.podDevices.devs[string(pod.UID)] = make(containerDevices)

 	// require one of resource1 and one of resource2
 	testManager.allocatedDevices[resourceName1] = sets.NewString()
@@ -983,7 +983,7 @@ func TestResetExtendedResource(t *testing.T) {
 		healthyDevices: make(map[string]sets.String),
 		unhealthyDevices: make(map[string]sets.String),
 		allocatedDevices: make(map[string]sets.String),
-		podDevices: make(podDevices),
+		podDevices: newPodDevices(),
 		checkpointManager: ckm,
 	}

--- a/pkg/kubelet/cm/devicemanager/pod_devices.go
+++ b/pkg/kubelet/cm/devicemanager/pod_devices.go

@@ -17,6 +17,8 @@ limitations under the License.
 package devicemanager

 import (
+	"sync"
+
 	"k8s.io/klog/v2"

 	"k8s.io/apimachinery/pkg/util/sets"
@@ -35,45 +37,74 @@ type deviceAllocateInfo struct {
 type resourceAllocateInfo map[string]deviceAllocateInfo // Keyed by resourceName.
 type containerDevices map[string]resourceAllocateInfo // Keyed by containerName.
-type podDevices map[string]containerDevices // Keyed by podUID.
+type podDevices struct {
+	sync.RWMutex
+	devs map[string]containerDevices // Keyed by podUID.
+}

-func (pdev podDevices) pods() sets.String {
+// newPodDevices returns a podDevices instance guarded by its own RWMutex:
+// a map keyed by pod UID whose values hold the per-container device
+// allocation info (containerDevices).
+func newPodDevices() *podDevices {
+	return &podDevices{devs: make(map[string]containerDevices)}
+}
+
+func (pdev *podDevices) pods() sets.String {
+	pdev.RLock()
+	defer pdev.RUnlock()
 	ret := sets.NewString()
-	for k := range pdev {
+	for k := range pdev.devs {
 		ret.Insert(k)
 	}
 	return ret
 }

-func (pdev podDevices) insert(podUID, contName, resource string, devices sets.String, resp *pluginapi.ContainerAllocateResponse) {
-	if _, podExists := pdev[podUID]; !podExists {
-		pdev[podUID] = make(containerDevices)
+func (pdev *podDevices) size() int {
+	pdev.RLock()
+	defer pdev.RUnlock()
+	return len(pdev.devs)
+}
+
+func (pdev *podDevices) hasPod(podUID string) bool {
+	pdev.RLock()
+	defer pdev.RUnlock()
+	_, podExists := pdev.devs[podUID]
+	return podExists
+}
+
+func (pdev *podDevices) insert(podUID, contName, resource string, devices sets.String, resp *pluginapi.ContainerAllocateResponse) {
+	pdev.Lock()
+	defer pdev.Unlock()
+	if _, podExists := pdev.devs[podUID]; !podExists {
+		pdev.devs[podUID] = make(containerDevices)
 	}
-	if _, contExists := pdev[podUID][contName]; !contExists {
-		pdev[podUID][contName] = make(resourceAllocateInfo)
+	if _, contExists := pdev.devs[podUID][contName]; !contExists {
+		pdev.devs[podUID][contName] = make(resourceAllocateInfo)
 	}
-	pdev[podUID][contName][resource] = deviceAllocateInfo{
+	pdev.devs[podUID][contName][resource] = deviceAllocateInfo{
 		deviceIds: devices,
 		allocResp: resp,
 	}
 }

-func (pdev podDevices) delete(pods []string) {
+func (pdev *podDevices) delete(pods []string) {
+	pdev.Lock()
+	defer pdev.Unlock()
 	for _, uid := range pods {
-		delete(pdev, uid)
+		delete(pdev.devs, uid)
 	}
 }

 // Returns list of device Ids allocated to the given container for the given resource.
 // Returns nil if we don't have cached state for the given <podUID, contName, resource>.
-func (pdev podDevices) containerDevices(podUID, contName, resource string) sets.String {
-	if _, podExists := pdev[podUID]; !podExists {
+func (pdev *podDevices) containerDevices(podUID, contName, resource string) sets.String {
+	pdev.RLock()
+	defer pdev.RUnlock()
+	if _, podExists := pdev.devs[podUID]; !podExists {
 		return nil
 	}
-	if _, contExists := pdev[podUID][contName]; !contExists {
+	if _, contExists := pdev.devs[podUID][contName]; !contExists {
 		return nil
 	}
-	devs, resourceExists := pdev[podUID][contName][resource]
+	devs, resourceExists := pdev.devs[podUID][contName][resource]
 	if !resourceExists {
 		return nil
 	}
@@ -81,8 +112,10 @@ func (pdev podDevices) containerDevices(podUID, contName, resource string) sets.
 }

 // Populates allocatedResources with the device resources allocated to the specified <podUID, contName>.
-func (pdev podDevices) addContainerAllocatedResources(podUID, contName string, allocatedResources map[string]sets.String) {
-	containers, exists := pdev[podUID]
+func (pdev *podDevices) addContainerAllocatedResources(podUID, contName string, allocatedResources map[string]sets.String) {
+	pdev.RLock()
+	defer pdev.RUnlock()
+	containers, exists := pdev.devs[podUID]
 	if !exists {
 		return
 	}
@@ -96,8 +129,10 @@ func (pdev podDevices) addContainerAllocatedResources(podUID, contName string, a
 }

 // Removes the device resources allocated to the specified <podUID, contName> from allocatedResources.
-func (pdev podDevices) removeContainerAllocatedResources(podUID, contName string, allocatedResources map[string]sets.String) {
-	containers, exists := pdev[podUID]
+func (pdev *podDevices) removeContainerAllocatedResources(podUID, contName string, allocatedResources map[string]sets.String) {
+	pdev.RLock()
+	defer pdev.RUnlock()
+	containers, exists := pdev.devs[podUID]
 	if !exists {
 		return
 	}
@@ -111,9 +146,11 @@ func (pdev podDevices) removeContainerAllocatedResources(podUID, contName string
 }

 // Returns all of devices allocated to the pods being tracked, keyed by resourceName.
-func (pdev podDevices) devices() map[string]sets.String {
+func (pdev *podDevices) devices() map[string]sets.String {
 	ret := make(map[string]sets.String)
-	for _, containerDevices := range pdev {
+	pdev.RLock()
+	defer pdev.RUnlock()
+	for _, containerDevices := range pdev.devs {
 		for _, resources := range containerDevices {
 			for resource, devices := range resources {
 				if _, exists := ret[resource]; !exists {
@@ -129,9 +166,11 @@ func (pdev podDevices) devices() map[string]sets.String {
 }

 // Turns podDevices to checkpointData.
-func (pdev podDevices) toCheckpointData() []checkpoint.PodDevicesEntry {
+func (pdev *podDevices) toCheckpointData() []checkpoint.PodDevicesEntry {
 	var data []checkpoint.PodDevicesEntry
-	for podUID, containerDevices := range pdev {
+	pdev.RLock()
+	defer pdev.RUnlock()
+	for podUID, containerDevices := range pdev.devs {
 		for conName, resources := range containerDevices {
 			for resource, devices := range resources {
 				devIds := devices.deviceIds.UnsortedList()
}
// Populates podDevices from the passed in checkpointData.
func (pdev podDevices) fromCheckpointData(data []checkpoint.PodDevicesEntry) {
func (pdev *podDevices) fromCheckpointData(data []checkpoint.PodDevicesEntry) {
for _, entry := range data {
klog.V(2).Infof("Get checkpoint entry: %v %v %v %v %v\n",
entry.PodUID, entry.ContainerName, entry.ResourceName, entry.DeviceIDs, entry.AllocResp)
@ -177,8 +216,11 @@ func (pdev podDevices) fromCheckpointData(data []checkpoint.PodDevicesEntry) {
}
// Returns combined container runtime settings to consume the container's allocated devices.
func (pdev podDevices) deviceRunContainerOptions(podUID, contName string) *DeviceRunContainerOptions {
containers, exists := pdev[podUID]
func (pdev *podDevices) deviceRunContainerOptions(podUID, contName string) *DeviceRunContainerOptions {
pdev.RLock()
defer pdev.RUnlock()
containers, exists := pdev.devs[podUID]
if !exists {
return nil
}
@@ -274,15 +316,18 @@ func (pdev podDevices) deviceRunContainerOptions(podUID, contName string) *Devic
 }

 // getContainerDevices returns the devices assigned to the provided container for all ResourceNames
-func (pdev podDevices) getContainerDevices(podUID, contName string) []*podresourcesapi.ContainerDevices {
-	if _, podExists := pdev[podUID]; !podExists {
+func (pdev *podDevices) getContainerDevices(podUID, contName string) []*podresourcesapi.ContainerDevices {
+	pdev.RLock()
+	defer pdev.RUnlock()
+	if _, podExists := pdev.devs[podUID]; !podExists {
 		return nil
 	}
-	if _, contExists := pdev[podUID][contName]; !contExists {
+	if _, contExists := pdev.devs[podUID][contName]; !contExists {
 		return nil
 	}
+
 	cDev := []*podresourcesapi.ContainerDevices{}
-	for resource, allocateInfo := range pdev[podUID][contName] {
+	for resource, allocateInfo := range pdev.devs[podUID][contName] {
 		cDev = append(cDev, &podresourcesapi.ContainerDevices{
 			ResourceName: resource,
 			DeviceIds: allocateInfo.deviceIds.UnsortedList(),
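
Assuming the API above, a hypothetical test-style sketch (not part of this commit) of the concurrency the new guard enables: goroutines share a *podDevices without any external mutex. It relies only on newPodDevices, insert, hasPod, and size from this file, plus the fmt, sync, and testing imports:

func TestPodDevicesConcurrentAccess(t *testing.T) {
	pdev := newPodDevices()

	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			uid := fmt.Sprintf("pod-%d", i)
			// insert takes the write lock internally.
			pdev.insert(uid, "cont", "example.com/dev", sets.NewString("dev0"), nil)
		}(i)
	}
	wg.Wait()

	// size and hasPod take the read lock internally.
	if pdev.size() != 8 || !pdev.hasPod("pod-0") {
		t.Fatalf("unexpected podDevices state: size=%d", pdev.size())
	}
}

Running such a test with go test -race would let the race detector confirm that no access bypasses the guard.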

--- a/pkg/kubelet/cm/devicemanager/topology_hints_test.go
+++ b/pkg/kubelet/cm/devicemanager/topology_hints_test.go

@@ -385,7 +385,7 @@ func TestGetTopologyHints(t *testing.T) {
 		allDevices: make(map[string]map[string]pluginapi.Device),
 		healthyDevices: make(map[string]sets.String),
 		allocatedDevices: make(map[string]sets.String),
-		podDevices: make(podDevices),
+		podDevices: newPodDevices(),
 		sourcesReady: &sourcesReadyStub{},
 		activePods: func() []*v1.Pod { return []*v1.Pod{pod} },
 		numaNodes: []int{0, 1},
@@ -739,7 +739,7 @@ func TestTopologyAlignedAllocation(t *testing.T) {
 		healthyDevices: make(map[string]sets.String),
 		allocatedDevices: make(map[string]sets.String),
 		endpoints: make(map[string]endpointInfo),
-		podDevices: make(podDevices),
+		podDevices: newPodDevices(),
 		sourcesReady: &sourcesReadyStub{},
 		activePods: func() []*v1.Pod { return []*v1.Pod{} },
 		topologyAffinityStore: &mockAffinityStore{tc.hint},
@@ -928,7 +928,7 @@ func TestGetPreferredAllocationParameters(t *testing.T) {
 		healthyDevices: make(map[string]sets.String),
 		allocatedDevices: make(map[string]sets.String),
 		endpoints: make(map[string]endpointInfo),
-		podDevices: make(podDevices),
+		podDevices: newPodDevices(),
 		sourcesReady: &sourcesReadyStub{},
 		activePods: func() []*v1.Pod { return []*v1.Pod{} },
 		topologyAffinityStore: &mockAffinityStore{tc.hint},