diff --git a/pkg/kubelet/apis/podresources/server_v1.go b/pkg/kubelet/apis/podresources/server_v1.go index fe4666a969c..8482f51a19d 100644 --- a/pkg/kubelet/apis/podresources/server_v1.go +++ b/pkg/kubelet/apis/podresources/server_v1.go @@ -28,14 +28,16 @@ import ( type v1PodResourcesServer struct { podsProvider PodsProvider devicesProvider DevicesProvider + cpusProvider CPUsProvider } // NewV1PodResourcesServer returns a PodResourcesListerServer which lists pods provided by the PodsProvider // with device information provided by the DevicesProvider -func NewV1PodResourcesServer(podsProvider PodsProvider, devicesProvider DevicesProvider) v1.PodResourcesListerServer { +func NewV1PodResourcesServer(podsProvider PodsProvider, devicesProvider DevicesProvider, cpusProvider CPUsProvider) v1.PodResourcesListerServer { return &v1PodResourcesServer{ podsProvider: podsProvider, devicesProvider: devicesProvider, + cpusProvider: cpusProvider, } } @@ -58,6 +60,7 @@ func (p *v1PodResourcesServer) List(ctx context.Context, req *v1.ListPodResource pRes.Containers[j] = &v1.ContainerResources{ Name: container.Name, Devices: p.devicesProvider.GetDevices(string(pod.UID), container.Name), + CpuIds: p.cpusProvider.GetCPUs(string(pod.UID), container.Name), } } podResources[i] = &pRes diff --git a/pkg/kubelet/apis/podresources/server_v1_test.go b/pkg/kubelet/apis/podresources/server_v1_test.go index 40ace68f351..c25912ee3ba 100644 --- a/pkg/kubelet/apis/podresources/server_v1_test.go +++ b/pkg/kubelet/apis/podresources/server_v1_test.go @@ -31,24 +31,30 @@ func TestListPodResourcesV1(t *testing.T) { podNamespace := "pod-namespace" podUID := types.UID("pod-uid") containerName := "container-name" + numaID := int64(1) devs := []*podresourcesapi.ContainerDevices{ { ResourceName: "resource", DeviceIds: []string{"dev0", "dev1"}, + Topology: &podresourcesapi.TopologyInfo{Nodes: []*podresourcesapi.NUMANode{{ID: numaID}}}, }, } + cpus := []int64{12, 23, 30} + for _, tc := range []struct { desc string pods []*v1.Pod devices []*podresourcesapi.ContainerDevices + cpus []int64 expectedResponse *podresourcesapi.ListPodResourcesResponse }{ { desc: "no pods", pods: []*v1.Pod{}, devices: []*podresourcesapi.ContainerDevices{}, + cpus: []int64{}, expectedResponse: &podresourcesapi.ListPodResourcesResponse{}, }, { @@ -70,6 +76,7 @@ func TestListPodResourcesV1(t *testing.T) { }, }, devices: []*podresourcesapi.ContainerDevices{}, + cpus: []int64{}, expectedResponse: &podresourcesapi.ListPodResourcesResponse{ PodResources: []*podresourcesapi.PodResources{ { @@ -104,6 +111,7 @@ func TestListPodResourcesV1(t *testing.T) { }, }, devices: devs, + cpus: cpus, expectedResponse: &podresourcesapi.ListPodResourcesResponse{ PodResources: []*podresourcesapi.PodResources{ { @@ -113,6 +121,7 @@ func TestListPodResourcesV1(t *testing.T) { { Name: containerName, Devices: devs, + CpuIds: cpus, }, }, }, @@ -124,8 +133,9 @@ func TestListPodResourcesV1(t *testing.T) { m := new(mockProvider) m.On("GetPods").Return(tc.pods) m.On("GetDevices", string(podUID), containerName).Return(tc.devices) + m.On("GetCPUs", string(podUID), containerName).Return(tc.cpus) m.On("UpdateAllocatedDevices").Return() - server := NewV1PodResourcesServer(m, m) + server := NewV1PodResourcesServer(m, m, m) resp, err := server.List(context.TODO(), &podresourcesapi.ListPodResourcesRequest{}) if err != nil { t.Errorf("want err = %v, got %q", nil, err) diff --git a/pkg/kubelet/apis/podresources/server_v1alpha1_test.go b/pkg/kubelet/apis/podresources/server_v1alpha1_test.go index b87ac10fa8c..5fe6f966e9d 100644 --- a/pkg/kubelet/apis/podresources/server_v1alpha1_test.go +++ b/pkg/kubelet/apis/podresources/server_v1alpha1_test.go @@ -43,6 +43,11 @@ func (m *mockProvider) GetDevices(podUID, containerName string) []*podresourcesv return args.Get(0).([]*podresourcesv1.ContainerDevices) } +func (m *mockProvider) GetCPUs(podUID, containerName string) []int64 { + args := m.Called(podUID, containerName) + return args.Get(0).([]int64) +} + func (m *mockProvider) UpdateAllocatedDevices() { m.Called() } diff --git a/pkg/kubelet/apis/podresources/types.go b/pkg/kubelet/apis/podresources/types.go index 266c0c8af70..433d92c5996 100644 --- a/pkg/kubelet/apis/podresources/types.go +++ b/pkg/kubelet/apis/podresources/types.go @@ -31,3 +31,8 @@ type DevicesProvider interface { type PodsProvider interface { GetPods() []*v1.Pod } + +// CPUsProvider knows how to provide the cpus used by the given container +type CPUsProvider interface { + GetCPUs(podUID, containerName string) []int64 +} diff --git a/pkg/kubelet/cm/container_manager.go b/pkg/kubelet/cm/container_manager.go index 603a150a26a..bf8122610a2 100644 --- a/pkg/kubelet/cm/container_manager.go +++ b/pkg/kubelet/cm/container_manager.go @@ -106,6 +106,9 @@ type ContainerManager interface { // GetDevices returns information about the devices assigned to pods and containers GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices + // GetCPUs returns information about the cpus assigned to pods and containers + GetCPUs(podUID, containerName string) []int64 + // ShouldResetExtendedResourceCapacity returns whether or not the extended resources should be zeroed, // due to node recreation. ShouldResetExtendedResourceCapacity() bool diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go index f0c0e33cf1d..1c95b652918 100644 --- a/pkg/kubelet/cm/container_manager_linux.go +++ b/pkg/kubelet/cm/container_manager_linux.go @@ -1027,6 +1027,10 @@ func (cm *containerManagerImpl) GetDevices(podUID, containerName string) []*podr return cm.deviceManager.GetDevices(podUID, containerName) } +func (cm *containerManagerImpl) GetCPUs(podUID, containerName string) []int64 { + return cm.cpuManager.GetCPUs(podUID, containerName) +} + func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool { return cm.deviceManager.ShouldResetExtendedResourceCapacity() } diff --git a/pkg/kubelet/cm/container_manager_stub.go b/pkg/kubelet/cm/container_manager_stub.go index 6d8fa08dcab..65eac7aadcf 100644 --- a/pkg/kubelet/cm/container_manager_stub.go +++ b/pkg/kubelet/cm/container_manager_stub.go @@ -125,6 +125,10 @@ func (cm *containerManagerStub) UpdateAllocatedDevices() { return } +func (cm *containerManagerStub) GetCPUs(_, _ string) []int64 { + return nil +} + func NewStubContainerManager() ContainerManager { return &containerManagerStub{shouldResetExtendedResourceCapacity: false} } diff --git a/pkg/kubelet/cm/container_manager_windows.go b/pkg/kubelet/cm/container_manager_windows.go index 3bc0f00e2a6..e6637b4d443 100644 --- a/pkg/kubelet/cm/container_manager_windows.go +++ b/pkg/kubelet/cm/container_manager_windows.go @@ -192,3 +192,7 @@ func (cm *containerManagerImpl) GetAllocateResourcesPodAdmitHandler() lifecycle. func (cm *containerManagerImpl) UpdateAllocatedDevices() { return } + +func (cm *containerManagerImpl) GetCPUs(_, _ string) []int64 { + return nil +} diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go index 1e6cc936d82..5f68c5957cd 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go @@ -77,6 +77,10 @@ type Manager interface { // and is consulted to achieve NUMA aware resource alignment among this // and other resource controllers. GetTopologyHints(*v1.Pod, *v1.Container) map[string][]topologymanager.TopologyHint + + // GetCPUs implements the podresources.CPUsProvider interface to provide allocated + // cpus for the container + GetCPUs(podUID, containerName string) []int64 } type manager struct { @@ -461,3 +465,12 @@ func (m *manager) updateContainerCPUSet(containerID string, cpus cpuset.CPUSet) CpusetCpus: cpus.String(), }) } + +func (m *manager) GetCPUs(podUID, containerName string) []int64 { + cpus := m.state.GetCPUSetOrDefault(string(podUID), containerName) + result := []int64{} + for _, cpu := range cpus.ToSliceNoSort() { + result = append(result, int64(cpu)) + } + return result +} diff --git a/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go b/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go index d8d707173bd..f73d038298f 100644 --- a/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go +++ b/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go @@ -64,6 +64,11 @@ func (m *fakeManager) State() state.Reader { return m.state } +func (m *fakeManager) GetCPUs(podUID, containerName string) []int64 { + klog.Infof("[fake cpumanager] GetCPUs(podUID: %s, containerName: %s)", podUID, containerName) + return nil +} + // NewFakeManager creates empty/fake cpu manager func NewFakeManager() Manager { return &fakeManager{ diff --git a/pkg/kubelet/cm/devicemanager/BUILD b/pkg/kubelet/cm/devicemanager/BUILD index a2a58b0b363..571f8b37389 100644 --- a/pkg/kubelet/cm/devicemanager/BUILD +++ b/pkg/kubelet/cm/devicemanager/BUILD @@ -48,11 +48,13 @@ go_test( srcs = [ "endpoint_test.go", "manager_test.go", + "pod_devices_test.go", "topology_hints_test.go", ], embed = [":go_default_library"], deps = [ "//pkg/kubelet/checkpointmanager:go_default_library", + "//pkg/kubelet/cm/devicemanager/checkpoint:go_default_library", "//pkg/kubelet/cm/topologymanager:go_default_library", "//pkg/kubelet/cm/topologymanager/bitmask:go_default_library", "//pkg/kubelet/config:go_default_library", diff --git a/pkg/kubelet/cm/devicemanager/checkpoint/BUILD b/pkg/kubelet/cm/devicemanager/checkpoint/BUILD index 91506a71f7c..afdace3db4f 100644 --- a/pkg/kubelet/cm/devicemanager/checkpoint/BUILD +++ b/pkg/kubelet/cm/devicemanager/checkpoint/BUILD @@ -8,6 +8,7 @@ go_library( deps = [ "//pkg/kubelet/checkpointmanager:go_default_library", "//pkg/kubelet/checkpointmanager/checksum:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", ], ) diff --git a/pkg/kubelet/cm/devicemanager/checkpoint/checkpoint.go b/pkg/kubelet/cm/devicemanager/checkpoint/checkpoint.go index 441eebd6684..0430fcd1d2b 100644 --- a/pkg/kubelet/cm/devicemanager/checkpoint/checkpoint.go +++ b/pkg/kubelet/cm/devicemanager/checkpoint/checkpoint.go @@ -19,6 +19,7 @@ package checkpoint import ( "encoding/json" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum" ) @@ -29,12 +30,15 @@ type DeviceManagerCheckpoint interface { GetData() ([]PodDevicesEntry, map[string][]string) } +// DevicesPerNUMA represents device ids obtained from device plugin per NUMA node id +type DevicesPerNUMA map[int64][]string + // PodDevicesEntry connects pod information to devices type PodDevicesEntry struct { PodUID string ContainerName string ResourceName string - DeviceIDs []string + DeviceIDs DevicesPerNUMA AllocResp []byte } @@ -52,6 +56,22 @@ type Data struct { Checksum checksum.Checksum } +// NewDevicesPerNUMA is a function that creates DevicesPerNUMA map +func NewDevicesPerNUMA() DevicesPerNUMA { + return make(DevicesPerNUMA) +} + +// Devices is a function that returns all device ids for all NUMA nodes +// and represent it as sets.String +func (dev DevicesPerNUMA) Devices() sets.String { + result := sets.NewString() + + for _, devs := range dev { + result.Insert(devs...) + } + return result +} + // New returns an instance of Checkpoint func New(devEntries []PodDevicesEntry, devices map[string][]string) DeviceManagerCheckpoint { diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go index c374e1a7c7a..626f25da9ef 100644 --- a/pkg/kubelet/cm/devicemanager/manager.go +++ b/pkg/kubelet/cm/devicemanager/manager.go @@ -900,8 +900,17 @@ func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Cont return fmt.Errorf("no containers return in allocation response %v", resp) } + allocDevicesWithNUMA := checkpoint.NewDevicesPerNUMA() // Update internal cached podDevices state. - m.podDevices.insert(podUID, contName, resource, allocDevices, resp.ContainerResponses[0]) + m.mutex.Lock() + for dev := range allocDevices { + for idx := range m.allDevices[resource][dev].Topology.Nodes { + node := m.allDevices[resource][dev].Topology.Nodes[idx] + allocDevicesWithNUMA[node.ID] = append(allocDevicesWithNUMA[node.ID], dev) + } + } + m.mutex.Unlock() + m.podDevices.insert(podUID, contName, resource, allocDevicesWithNUMA, resp.ContainerResponses[0]) } if needsUpdateCheckpoint { diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go index ca45ebc87e8..8f1ec0bd362 100644 --- a/pkg/kubelet/cm/devicemanager/manager_test.go +++ b/pkg/kubelet/cm/devicemanager/manager_test.go @@ -38,6 +38,7 @@ import ( pluginapi "k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1" watcherapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" + "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/lifecycle" @@ -432,10 +433,10 @@ func TestUpdateCapacityAllocatable(t *testing.T) { as.True(testManager.isDevicePluginResource(resourceName2)) } -func constructDevices(devices []string) sets.String { - ret := sets.NewString() +func constructDevices(devices []string) checkpoint.DevicesPerNUMA { + ret := checkpoint.DevicesPerNUMA{} for _, dev := range devices { - ret.Insert(dev) + ret[0] = append(ret[0], dev) } return ret } @@ -621,13 +622,11 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso activePods: activePods, sourcesReady: &sourcesReadyStub{}, checkpointManager: ckm, + allDevices: make(map[string]map[string]pluginapi.Device), } for _, res := range testRes { - testManager.healthyDevices[res.resourceName] = sets.NewString() - for _, dev := range res.devs { - testManager.healthyDevices[res.resourceName].Insert(dev) - } + testManager.healthyDevices[res.resourceName] = sets.NewString(res.devs.Devices().UnsortedList()...) if res.resourceName == "domain1.com/resource1" { testManager.endpoints[res.resourceName] = endpointInfo{ e: &MockEndpoint{allocateFunc: allocateStubFunc()}, @@ -657,6 +656,8 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso opts: nil, } } + testManager.allDevices[res.resourceName] = makeDevice(res.devs) + } return testManager, nil } @@ -664,19 +665,19 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso type TestResource struct { resourceName string resourceQuantity resource.Quantity - devs []string + devs checkpoint.DevicesPerNUMA } func TestPodContainerDeviceAllocation(t *testing.T) { res1 := TestResource{ resourceName: "domain1.com/resource1", resourceQuantity: *resource.NewQuantity(int64(2), resource.DecimalSI), - devs: []string{"dev1", "dev2"}, + devs: checkpoint.DevicesPerNUMA{0: []string{"dev1", "dev2"}}, } res2 := TestResource{ resourceName: "domain2.com/resource2", resourceQuantity: *resource.NewQuantity(int64(1), resource.DecimalSI), - devs: []string{"dev3", "dev4"}, + devs: checkpoint.DevicesPerNUMA{0: []string{"dev3", "dev4"}}, } testResources := make([]TestResource, 2) testResources = append(testResources, res1) @@ -767,12 +768,12 @@ func TestInitContainerDeviceAllocation(t *testing.T) { res1 := TestResource{ resourceName: "domain1.com/resource1", resourceQuantity: *resource.NewQuantity(int64(2), resource.DecimalSI), - devs: []string{"dev1", "dev2"}, + devs: checkpoint.DevicesPerNUMA{0: []string{"dev1", "dev2"}}, } res2 := TestResource{ resourceName: "domain2.com/resource2", resourceQuantity: *resource.NewQuantity(int64(1), resource.DecimalSI), - devs: []string{"dev3", "dev4"}, + devs: checkpoint.DevicesPerNUMA{0: []string{"dev3", "dev4"}}, } testResources := make([]TestResource, 2) testResources = append(testResources, res1) @@ -920,7 +921,7 @@ func TestDevicePreStartContainer(t *testing.T) { res1 := TestResource{ resourceName: "domain1.com/resource1", resourceQuantity: *resource.NewQuantity(int64(2), resource.DecimalSI), - devs: []string{"dev1", "dev2"}, + devs: checkpoint.DevicesPerNUMA{0: []string{"dev1", "dev2"}}, } as := require.New(t) podsStub := activePodsStub{ @@ -960,7 +961,7 @@ func TestDevicePreStartContainer(t *testing.T) { as.Contains(initializedDevs, "dev1") as.Contains(initializedDevs, "dev2") - as.Equal(len(initializedDevs), len(res1.devs)) + as.Equal(len(initializedDevs), res1.devs.Devices().Len()) expectedResps, err := allocateStubFunc()([]string{"dev1", "dev2"}) as.Nil(err) @@ -1057,3 +1058,13 @@ func allocateStubFunc() func(devs []string) (*pluginapi.AllocateResponse, error) return resps, nil } } + +func makeDevice(devOnNUMA checkpoint.DevicesPerNUMA) map[string]pluginapi.Device { + res := make(map[string]pluginapi.Device) + for node, devs := range devOnNUMA { + for idx := range devs { + res[devs[idx]] = pluginapi.Device{ID: devs[idx], Topology: &pluginapi.TopologyInfo{Nodes: []*pluginapi.NUMANode{{ID: node}}}} + } + } + return res +} diff --git a/pkg/kubelet/cm/devicemanager/pod_devices.go b/pkg/kubelet/cm/devicemanager/pod_devices.go index d3ffab80da4..6f640666211 100644 --- a/pkg/kubelet/cm/devicemanager/pod_devices.go +++ b/pkg/kubelet/cm/devicemanager/pod_devices.go @@ -30,7 +30,7 @@ import ( type deviceAllocateInfo struct { // deviceIds contains device Ids allocated to this container for the given resourceName. - deviceIds sets.String + deviceIds checkpoint.DevicesPerNUMA // allocResp contains cached rpc AllocateResponse. allocResp *pluginapi.ContainerAllocateResponse } @@ -70,7 +70,7 @@ func (pdev *podDevices) hasPod(podUID string) bool { return podExists } -func (pdev *podDevices) insert(podUID, contName, resource string, devices sets.String, resp *pluginapi.ContainerAllocateResponse) { +func (pdev *podDevices) insert(podUID, contName, resource string, devices checkpoint.DevicesPerNUMA, resp *pluginapi.ContainerAllocateResponse) { pdev.Lock() defer pdev.Unlock() if _, podExists := pdev.devs[podUID]; !podExists { @@ -108,7 +108,7 @@ func (pdev *podDevices) containerDevices(podUID, contName, resource string) sets if !resourceExists { return nil } - return devs.deviceIds + return devs.deviceIds.Devices() } // Populates allocatedResources with the device resources allocated to the specified . @@ -124,7 +124,7 @@ func (pdev *podDevices) addContainerAllocatedResources(podUID, contName string, return } for resource, devices := range resources { - allocatedResources[resource] = allocatedResources[resource].Union(devices.deviceIds) + allocatedResources[resource] = allocatedResources[resource].Union(devices.deviceIds.Devices()) } } @@ -141,7 +141,7 @@ func (pdev *podDevices) removeContainerAllocatedResources(podUID, contName strin return } for resource, devices := range resources { - allocatedResources[resource] = allocatedResources[resource].Difference(devices.deviceIds) + allocatedResources[resource] = allocatedResources[resource].Difference(devices.deviceIds.Devices()) } } @@ -157,7 +157,7 @@ func (pdev *podDevices) devices() map[string]sets.String { ret[resource] = sets.NewString() } if devices.allocResp != nil { - ret[resource] = ret[resource].Union(devices.deviceIds) + ret[resource] = ret[resource].Union(devices.deviceIds.Devices()) } } } @@ -173,7 +173,6 @@ func (pdev *podDevices) toCheckpointData() []checkpoint.PodDevicesEntry { for podUID, containerDevices := range pdev.devs { for conName, resources := range containerDevices { for resource, devices := range resources { - devIds := devices.deviceIds.UnsortedList() if devices.allocResp == nil { klog.Errorf("Can't marshal allocResp for %v %v %v: allocation response is missing", podUID, conName, resource) continue @@ -188,7 +187,7 @@ func (pdev *podDevices) toCheckpointData() []checkpoint.PodDevicesEntry { PodUID: podUID, ContainerName: conName, ResourceName: resource, - DeviceIDs: devIds, + DeviceIDs: devices.deviceIds, AllocResp: allocResp}) } } @@ -201,17 +200,13 @@ func (pdev *podDevices) fromCheckpointData(data []checkpoint.PodDevicesEntry) { for _, entry := range data { klog.V(2).Infof("Get checkpoint entry: %v %v %v %v %v\n", entry.PodUID, entry.ContainerName, entry.ResourceName, entry.DeviceIDs, entry.AllocResp) - devIDs := sets.NewString() - for _, devID := range entry.DeviceIDs { - devIDs.Insert(devID) - } allocResp := &pluginapi.ContainerAllocateResponse{} err := allocResp.Unmarshal(entry.AllocResp) if err != nil { klog.Errorf("Can't unmarshal allocResp for %v %v %v: %v", entry.PodUID, entry.ContainerName, entry.ResourceName, err) continue } - pdev.insert(entry.PodUID, entry.ContainerName, entry.ResourceName, devIDs, allocResp) + pdev.insert(entry.PodUID, entry.ContainerName, entry.ResourceName, entry.DeviceIDs, allocResp) } } @@ -328,10 +323,13 @@ func (pdev *podDevices) getContainerDevices(podUID, contName string) []*podresou } cDev := []*podresourcesapi.ContainerDevices{} for resource, allocateInfo := range pdev.devs[podUID][contName] { - cDev = append(cDev, &podresourcesapi.ContainerDevices{ - ResourceName: resource, - DeviceIds: allocateInfo.deviceIds.UnsortedList(), - }) + for numaid, devlist := range allocateInfo.deviceIds { + cDev = append(cDev, &podresourcesapi.ContainerDevices{ + ResourceName: resource, + DeviceIds: devlist, + Topology: &podresourcesapi.TopologyInfo{Nodes: []*podresourcesapi.NUMANode{{ID: numaid}}}, + }) + } } return cDev } diff --git a/pkg/kubelet/cm/devicemanager/pod_devices_test.go b/pkg/kubelet/cm/devicemanager/pod_devices_test.go new file mode 100644 index 00000000000..b2ff8376a73 --- /dev/null +++ b/pkg/kubelet/cm/devicemanager/pod_devices_test.go @@ -0,0 +1,47 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package devicemanager + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint" +) + +func TestGetContainerDevices(t *testing.T) { + podDevices := newPodDevices() + resourceName1 := "domain1.com/resource1" + podID := "pod1" + contID := "con1" + devices := checkpoint.DevicesPerNUMA{0: []string{"dev1"}, 1: []string{"dev1"}} + + podDevices.insert(podID, contID, resourceName1, + devices, + constructAllocResp(map[string]string{"/dev/r1dev1": "/dev/r1dev1", "/dev/r1dev2": "/dev/r1dev2"}, map[string]string{"/home/r1lib1": "/usr/r1lib1"}, map[string]string{})) + + contDevices := podDevices.getContainerDevices(podID, contID) + require.Equal(t, len(devices), len(contDevices), "Incorrect container devices") + for _, contDev := range contDevices { + for _, node := range contDev.Topology.Nodes { + dev, ok := devices[node.ID] + require.True(t, ok, "NUMA id %v doesn't exist in result", node.ID) + require.Equal(t, contDev.DeviceIds[0], dev[0], "Can't find device %s in result", dev[0]) + } + } +} diff --git a/pkg/kubelet/cm/devicemanager/topology_hints_test.go b/pkg/kubelet/cm/devicemanager/topology_hints_test.go index 5f2db322953..84f000a8cc6 100644 --- a/pkg/kubelet/cm/devicemanager/topology_hints_test.go +++ b/pkg/kubelet/cm/devicemanager/topology_hints_test.go @@ -404,7 +404,7 @@ func TestGetTopologyHints(t *testing.T) { for p := range tc.allocatedDevices { for c := range tc.allocatedDevices[p] { for r, devices := range tc.allocatedDevices[p][c] { - m.podDevices.insert(p, c, r, sets.NewString(devices...), nil) + m.podDevices.insert(p, c, r, constructDevices(devices), nil) m.allocatedDevices[r] = sets.NewString() for _, d := range devices { diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index d6d620790c0..f8f0fc0a0ef 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -2193,7 +2193,7 @@ func (kl *Kubelet) ListenAndServePodResources() { klog.V(2).Infof("Failed to get local endpoint for PodResources endpoint: %v", err) return } - server.ListenAndServePodResources(socket, kl.podManager, kl.containerManager) + server.ListenAndServePodResources(socket, kl.podManager, kl.containerManager, kl.containerManager) } // Delete the eligible dead container instances in a pod. Depending on the configuration, the latest dead containers may be kept around. diff --git a/pkg/kubelet/server/server.go b/pkg/kubelet/server/server.go index 23e7c362afc..fb8c7e12e7c 100644 --- a/pkg/kubelet/server/server.go +++ b/pkg/kubelet/server/server.go @@ -179,10 +179,10 @@ func ListenAndServeKubeletReadOnlyServer(host HostInterface, resourceAnalyzer st } // ListenAndServePodResources initializes a gRPC server to serve the PodResources service -func ListenAndServePodResources(socket string, podsProvider podresources.PodsProvider, devicesProvider podresources.DevicesProvider) { +func ListenAndServePodResources(socket string, podsProvider podresources.PodsProvider, devicesProvider podresources.DevicesProvider, cpusProvider podresources.CPUsProvider) { server := grpc.NewServer() podresourcesapiv1alpha1.RegisterPodResourcesListerServer(server, podresources.NewV1alpha1PodResourcesServer(podsProvider, devicesProvider)) - podresourcesapi.RegisterPodResourcesListerServer(server, podresources.NewV1PodResourcesServer(podsProvider, devicesProvider)) + podresourcesapi.RegisterPodResourcesListerServer(server, podresources.NewV1PodResourcesServer(podsProvider, devicesProvider, cpusProvider)) l, err := util.CreateListener(socket) if err != nil { klog.Fatalf("Failed to create listener for podResources endpoint: %v", err)