diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go index 5681b2174f8..1baa280768f 100644 --- a/pkg/kubelet/cm/container_manager_linux.go +++ b/pkg/kubelet/cm/container_manager_linux.go @@ -304,7 +304,8 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I klog.Infof("Creating device plugin manager: %t", devicePluginEnabled) if devicePluginEnabled { - cm.deviceManager, err = devicemanager.NewManagerImpl() + cm.deviceManager, err = devicemanager.NewManagerImpl(numaNodeInfo, cm.topologyManager) + cm.topologyManager.AddHintProvider(cm.deviceManager) } else { cm.deviceManager, err = devicemanager.NewManagerStub() } diff --git a/pkg/kubelet/cm/devicemanager/BUILD b/pkg/kubelet/cm/devicemanager/BUILD index f0e320252b5..3e948d61592 100644 --- a/pkg/kubelet/cm/devicemanager/BUILD +++ b/pkg/kubelet/cm/devicemanager/BUILD @@ -8,6 +8,7 @@ go_library( "manager.go", "manager_stub.go", "pod_devices.go", + "topology_hints.go", "types.go", ], importpath = "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager", @@ -20,7 +21,9 @@ go_library( "//pkg/kubelet/apis/podresources/v1alpha1:go_default_library", "//pkg/kubelet/checkpointmanager:go_default_library", "//pkg/kubelet/checkpointmanager/errors:go_default_library", + "//pkg/kubelet/cm/cpumanager/topology:go_default_library", "//pkg/kubelet/cm/devicemanager/checkpoint:go_default_library", + "//pkg/kubelet/cm/topologymanager:go_default_library", "//pkg/kubelet/config:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/lifecycle:go_default_library", @@ -48,6 +51,7 @@ go_test( "//pkg/kubelet/apis/deviceplugin/v1beta1:go_default_library", "//pkg/kubelet/apis/pluginregistration/v1:go_default_library", "//pkg/kubelet/checkpointmanager:go_default_library", + "//pkg/kubelet/cm/topologymanager:go_default_library", "//pkg/kubelet/config:go_default_library", "//pkg/kubelet/lifecycle:go_default_library", "//pkg/kubelet/pluginmanager:go_default_library", diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go index 00ac669dffc..964a89bc3e2 100644 --- a/pkg/kubelet/cm/devicemanager/manager.go +++ b/pkg/kubelet/cm/devicemanager/manager.go @@ -38,7 +38,9 @@ import ( podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" + cputopology "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology" "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint" + "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/metrics" @@ -79,6 +81,9 @@ type ManagerImpl struct { // e.g. a new device is advertised, two old devices are deleted and a running device fails. callback monitorCallback + // allDevices is a map by resource name of all the devices currently registered to the device manager + allDevices map[string]map[string]pluginapi.Device + // healthyDevices contains all of the registered healthy resourceNames and their exported device IDs. healthyDevices map[string]sets.String @@ -91,6 +96,12 @@ type ManagerImpl struct { // podDevices contains pod to allocated device mapping. podDevices podDevices checkpointManager checkpointmanager.CheckpointManager + + // List of NUMA Nodes available on the underlying machine + numaNodes []int + + // Store of Topology Affinties that the Device Manager can query. + topologyAffinityStore topologymanager.Store } type endpointInfo struct { @@ -104,27 +115,35 @@ func (s *sourcesReadyStub) AddSource(source string) {} func (s *sourcesReadyStub) AllReady() bool { return true } // NewManagerImpl creates a new manager. -func NewManagerImpl() (*ManagerImpl, error) { - return newManagerImpl(pluginapi.KubeletSocket) +func NewManagerImpl(numaNodeInfo cputopology.NUMANodeInfo, topologyAffinityStore topologymanager.Store) (*ManagerImpl, error) { + return newManagerImpl(pluginapi.KubeletSocket, numaNodeInfo, topologyAffinityStore) } -func newManagerImpl(socketPath string) (*ManagerImpl, error) { +func newManagerImpl(socketPath string, numaNodeInfo cputopology.NUMANodeInfo, topologyAffinityStore topologymanager.Store) (*ManagerImpl, error) { klog.V(2).Infof("Creating Device Plugin manager at %s", socketPath) if socketPath == "" || !filepath.IsAbs(socketPath) { return nil, fmt.Errorf(errBadSocket+" %s", socketPath) } + var numaNodes []int + for node := range numaNodeInfo { + numaNodes = append(numaNodes, node) + } + dir, file := filepath.Split(socketPath) manager := &ManagerImpl{ endpoints: make(map[string]endpointInfo), - socketname: file, - socketdir: dir, - healthyDevices: make(map[string]sets.String), - unhealthyDevices: make(map[string]sets.String), - allocatedDevices: make(map[string]sets.String), - podDevices: make(podDevices), + socketname: file, + socketdir: dir, + allDevices: make(map[string]map[string]pluginapi.Device), + healthyDevices: make(map[string]sets.String), + unhealthyDevices: make(map[string]sets.String), + allocatedDevices: make(map[string]sets.String), + podDevices: make(podDevices), + numaNodes: numaNodes, + topologyAffinityStore: topologyAffinityStore, } manager.callback = manager.genericDeviceUpdateCallback @@ -145,7 +164,9 @@ func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices [ m.mutex.Lock() m.healthyDevices[resourceName] = sets.NewString() m.unhealthyDevices[resourceName] = sets.NewString() + m.allDevices[resourceName] = make(map[string]pluginapi.Device) for _, dev := range devices { + m.allDevices[resourceName][dev.ID] = dev if dev.Health == pluginapi.Healthy { m.healthyDevices[resourceName].Insert(dev.ID) } else { diff --git a/pkg/kubelet/cm/devicemanager/manager_stub.go b/pkg/kubelet/cm/devicemanager/manager_stub.go index 1a63bc8c151..a22e01b29fa 100644 --- a/pkg/kubelet/cm/devicemanager/manager_stub.go +++ b/pkg/kubelet/cm/devicemanager/manager_stub.go @@ -19,6 +19,7 @@ package devicemanager import ( v1 "k8s.io/api/core/v1" podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1" + "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache" @@ -63,6 +64,11 @@ func (h *ManagerStub) GetWatcherHandler() cache.PluginHandler { return nil } +// GetTopologyHints returns an empty TopologyHint map +func (h *ManagerStub) GetTopologyHints(pod v1.Pod, container v1.Container) map[string][]topologymanager.TopologyHint { + return map[string][]topologymanager.TopologyHint{} +} + // GetDevices returns nil func (h *ManagerStub) GetDevices(_, _ string) []*podresourcesapi.ContainerDevices { return nil diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go index aa169aefeb6..b41e3a95d32 100644 --- a/pkg/kubelet/cm/devicemanager/manager_test.go +++ b/pkg/kubelet/cm/devicemanager/manager_test.go @@ -37,6 +37,7 @@ import ( pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1" watcherapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" + "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/pluginmanager" @@ -60,9 +61,10 @@ func tmpSocketDir() (socketDir, socketName, pluginSocketName string, err error) func TestNewManagerImpl(t *testing.T) { socketDir, socketName, _, err := tmpSocketDir() + topologyStore := topologymanager.NewFakeManager() require.NoError(t, err) defer os.RemoveAll(socketDir) - _, err = newManagerImpl(socketName) + _, err = newManagerImpl(socketName, nil, topologyStore) require.NoError(t, err) os.RemoveAll(socketDir) } @@ -221,7 +223,8 @@ func TestDevicePluginReRegistrationProbeMode(t *testing.T) { } func setupDeviceManager(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string) (Manager, <-chan interface{}) { - m, err := newManagerImpl(socketName) + topologyStore := topologymanager.NewFakeManager() + m, err := newManagerImpl(socketName, nil, topologyStore) require.NoError(t, err) updateChan := make(chan interface{}) @@ -288,9 +291,10 @@ func cleanup(t *testing.T, m Manager, p *Stub) { func TestUpdateCapacityAllocatable(t *testing.T) { socketDir, socketName, _, err := tmpSocketDir() + topologyStore := topologymanager.NewFakeManager() require.NoError(t, err) defer os.RemoveAll(socketDir) - testManager, err := newManagerImpl(socketName) + testManager, err := newManagerImpl(socketName, nil, topologyStore) as := assert.New(t) as.NotNil(testManager) as.Nil(err) @@ -594,16 +598,17 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso return nil, err } testManager := &ManagerImpl{ - socketdir: tmpDir, - callback: monitorCallback, - healthyDevices: make(map[string]sets.String), - unhealthyDevices: make(map[string]sets.String), - allocatedDevices: make(map[string]sets.String), - endpoints: make(map[string]endpointInfo), - podDevices: make(podDevices), - activePods: activePods, - sourcesReady: &sourcesReadyStub{}, - checkpointManager: ckm, + socketdir: tmpDir, + callback: monitorCallback, + healthyDevices: make(map[string]sets.String), + unhealthyDevices: make(map[string]sets.String), + allocatedDevices: make(map[string]sets.String), + endpoints: make(map[string]endpointInfo), + podDevices: make(podDevices), + topologyAffinityStore: topologymanager.NewFakeManager(), + activePods: activePods, + sourcesReady: &sourcesReadyStub{}, + checkpointManager: ckm, } for _, res := range testRes { diff --git a/pkg/kubelet/cm/devicemanager/topology_hints.go b/pkg/kubelet/cm/devicemanager/topology_hints.go new file mode 100644 index 00000000000..aeed76a3e64 --- /dev/null +++ b/pkg/kubelet/cm/devicemanager/topology_hints.go @@ -0,0 +1,31 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package devicemanager + +import ( + "k8s.io/api/core/v1" + "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" +) + +// GetTopologyHints implements the TopologyManager HintProvider Interface which +// ensures the Device Manager is consulted when Topology Aware Hints for each +// container are created. +func (m *ManagerImpl) GetTopologyHints(pod v1.Pod, container v1.Container) map[string][]topologymanager.TopologyHint { + deviceHints := make(map[string][]topologymanager.TopologyHint) + // TODO: Fill in deviceHints with proper TopologyHints + return deviceHints +} diff --git a/pkg/kubelet/cm/devicemanager/types.go b/pkg/kubelet/cm/devicemanager/types.go index 12ac5e94310..4d7c9b8af9e 100644 --- a/pkg/kubelet/cm/devicemanager/types.go +++ b/pkg/kubelet/cm/devicemanager/types.go @@ -21,6 +21,7 @@ import ( v1 "k8s.io/api/core/v1" podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1" + "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/lifecycle" @@ -63,6 +64,10 @@ type Manager interface { // depending on the checkpoint file availability. Absence of the checkpoint file strongly indicates // the node has been recreated. ShouldResetExtendedResourceCapacity() bool + + // TopologyManager HintProvider provider indicates the Device Manager implements the Topology Manager Interface + // and is consulted to make Topology aware resource alignments + GetTopologyHints(pod v1.Pod, container v1.Container) map[string][]topologymanager.TopologyHint } // DeviceRunContainerOptions contains the combined container runtime settings to consume its allocated devices.