Added stub support for Topology Manager to Device Manager

Co-authored-by: Conor Nolan <conor.nolan@intel.com>
Co-authored-by: Sreemanti Ghosh <sreemanti.ghosh@intel.com>
Co-authored-by: Kevin Klues <kklues@nvidia.com>
This commit is contained in:
Louise Daly 2019-07-24 09:40:51 -07:00 committed by Kevin Klues
parent 1c1f19c61c
commit 9a118ceac4
7 changed files with 96 additions and 23 deletions

View File

@ -304,7 +304,8 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
klog.Infof("Creating device plugin manager: %t", devicePluginEnabled) klog.Infof("Creating device plugin manager: %t", devicePluginEnabled)
if devicePluginEnabled { if devicePluginEnabled {
cm.deviceManager, err = devicemanager.NewManagerImpl() cm.deviceManager, err = devicemanager.NewManagerImpl(numaNodeInfo, cm.topologyManager)
cm.topologyManager.AddHintProvider(cm.deviceManager)
} else { } else {
cm.deviceManager, err = devicemanager.NewManagerStub() cm.deviceManager, err = devicemanager.NewManagerStub()
} }

View File

@ -8,6 +8,7 @@ go_library(
"manager.go", "manager.go",
"manager_stub.go", "manager_stub.go",
"pod_devices.go", "pod_devices.go",
"topology_hints.go",
"types.go", "types.go",
], ],
importpath = "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager", importpath = "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager",
@ -20,7 +21,9 @@ go_library(
"//pkg/kubelet/apis/podresources/v1alpha1:go_default_library", "//pkg/kubelet/apis/podresources/v1alpha1:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library", "//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/checkpointmanager/errors:go_default_library", "//pkg/kubelet/checkpointmanager/errors:go_default_library",
"//pkg/kubelet/cm/cpumanager/topology:go_default_library",
"//pkg/kubelet/cm/devicemanager/checkpoint:go_default_library", "//pkg/kubelet/cm/devicemanager/checkpoint:go_default_library",
"//pkg/kubelet/cm/topologymanager:go_default_library",
"//pkg/kubelet/config:go_default_library", "//pkg/kubelet/config:go_default_library",
"//pkg/kubelet/container:go_default_library", "//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/lifecycle:go_default_library", "//pkg/kubelet/lifecycle:go_default_library",
@ -48,6 +51,7 @@ go_test(
"//pkg/kubelet/apis/deviceplugin/v1beta1:go_default_library", "//pkg/kubelet/apis/deviceplugin/v1beta1:go_default_library",
"//pkg/kubelet/apis/pluginregistration/v1:go_default_library", "//pkg/kubelet/apis/pluginregistration/v1:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library", "//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/cm/topologymanager:go_default_library",
"//pkg/kubelet/config:go_default_library", "//pkg/kubelet/config:go_default_library",
"//pkg/kubelet/lifecycle:go_default_library", "//pkg/kubelet/lifecycle:go_default_library",
"//pkg/kubelet/pluginmanager:go_default_library", "//pkg/kubelet/pluginmanager:go_default_library",

View File

@ -38,7 +38,9 @@ import (
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1" podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
cputopology "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint" "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/metrics" "k8s.io/kubernetes/pkg/kubelet/metrics"
@ -79,6 +81,9 @@ type ManagerImpl struct {
// e.g. a new device is advertised, two old devices are deleted and a running device fails. // e.g. a new device is advertised, two old devices are deleted and a running device fails.
callback monitorCallback callback monitorCallback
// allDevices is a map by resource name of all the devices currently registered to the device manager
allDevices map[string]map[string]pluginapi.Device
// healthyDevices contains all of the registered healthy resourceNames and their exported device IDs. // healthyDevices contains all of the registered healthy resourceNames and their exported device IDs.
healthyDevices map[string]sets.String healthyDevices map[string]sets.String
@ -91,6 +96,12 @@ type ManagerImpl struct {
// podDevices contains pod to allocated device mapping. // podDevices contains pod to allocated device mapping.
podDevices podDevices podDevices podDevices
checkpointManager checkpointmanager.CheckpointManager checkpointManager checkpointmanager.CheckpointManager
// List of NUMA Nodes available on the underlying machine
numaNodes []int
// Store of Topology Affinties that the Device Manager can query.
topologyAffinityStore topologymanager.Store
} }
type endpointInfo struct { type endpointInfo struct {
@ -104,27 +115,35 @@ func (s *sourcesReadyStub) AddSource(source string) {}
func (s *sourcesReadyStub) AllReady() bool { return true } func (s *sourcesReadyStub) AllReady() bool { return true }
// NewManagerImpl creates a new manager. // NewManagerImpl creates a new manager.
func NewManagerImpl() (*ManagerImpl, error) { func NewManagerImpl(numaNodeInfo cputopology.NUMANodeInfo, topologyAffinityStore topologymanager.Store) (*ManagerImpl, error) {
return newManagerImpl(pluginapi.KubeletSocket) return newManagerImpl(pluginapi.KubeletSocket, numaNodeInfo, topologyAffinityStore)
} }
func newManagerImpl(socketPath string) (*ManagerImpl, error) { func newManagerImpl(socketPath string, numaNodeInfo cputopology.NUMANodeInfo, topologyAffinityStore topologymanager.Store) (*ManagerImpl, error) {
klog.V(2).Infof("Creating Device Plugin manager at %s", socketPath) klog.V(2).Infof("Creating Device Plugin manager at %s", socketPath)
if socketPath == "" || !filepath.IsAbs(socketPath) { if socketPath == "" || !filepath.IsAbs(socketPath) {
return nil, fmt.Errorf(errBadSocket+" %s", socketPath) return nil, fmt.Errorf(errBadSocket+" %s", socketPath)
} }
var numaNodes []int
for node := range numaNodeInfo {
numaNodes = append(numaNodes, node)
}
dir, file := filepath.Split(socketPath) dir, file := filepath.Split(socketPath)
manager := &ManagerImpl{ manager := &ManagerImpl{
endpoints: make(map[string]endpointInfo), endpoints: make(map[string]endpointInfo),
socketname: file, socketname: file,
socketdir: dir, socketdir: dir,
healthyDevices: make(map[string]sets.String), allDevices: make(map[string]map[string]pluginapi.Device),
unhealthyDevices: make(map[string]sets.String), healthyDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]sets.String), unhealthyDevices: make(map[string]sets.String),
podDevices: make(podDevices), allocatedDevices: make(map[string]sets.String),
podDevices: make(podDevices),
numaNodes: numaNodes,
topologyAffinityStore: topologyAffinityStore,
} }
manager.callback = manager.genericDeviceUpdateCallback manager.callback = manager.genericDeviceUpdateCallback
@ -145,7 +164,9 @@ func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices [
m.mutex.Lock() m.mutex.Lock()
m.healthyDevices[resourceName] = sets.NewString() m.healthyDevices[resourceName] = sets.NewString()
m.unhealthyDevices[resourceName] = sets.NewString() m.unhealthyDevices[resourceName] = sets.NewString()
m.allDevices[resourceName] = make(map[string]pluginapi.Device)
for _, dev := range devices { for _, dev := range devices {
m.allDevices[resourceName][dev.ID] = dev
if dev.Health == pluginapi.Healthy { if dev.Health == pluginapi.Healthy {
m.healthyDevices[resourceName].Insert(dev.ID) m.healthyDevices[resourceName].Insert(dev.ID)
} else { } else {

View File

@ -19,6 +19,7 @@ package devicemanager
import ( import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1" podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache" "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
@ -63,6 +64,11 @@ func (h *ManagerStub) GetWatcherHandler() cache.PluginHandler {
return nil return nil
} }
// GetTopologyHints returns an empty TopologyHint map
func (h *ManagerStub) GetTopologyHints(pod v1.Pod, container v1.Container) map[string][]topologymanager.TopologyHint {
return map[string][]topologymanager.TopologyHint{}
}
// GetDevices returns nil // GetDevices returns nil
func (h *ManagerStub) GetDevices(_, _ string) []*podresourcesapi.ContainerDevices { func (h *ManagerStub) GetDevices(_, _ string) []*podresourcesapi.ContainerDevices {
return nil return nil

View File

@ -37,6 +37,7 @@ import (
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
watcherapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1" watcherapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager" "k8s.io/kubernetes/pkg/kubelet/pluginmanager"
@ -60,9 +61,10 @@ func tmpSocketDir() (socketDir, socketName, pluginSocketName string, err error)
func TestNewManagerImpl(t *testing.T) { func TestNewManagerImpl(t *testing.T) {
socketDir, socketName, _, err := tmpSocketDir() socketDir, socketName, _, err := tmpSocketDir()
topologyStore := topologymanager.NewFakeManager()
require.NoError(t, err) require.NoError(t, err)
defer os.RemoveAll(socketDir) defer os.RemoveAll(socketDir)
_, err = newManagerImpl(socketName) _, err = newManagerImpl(socketName, nil, topologyStore)
require.NoError(t, err) require.NoError(t, err)
os.RemoveAll(socketDir) os.RemoveAll(socketDir)
} }
@ -221,7 +223,8 @@ func TestDevicePluginReRegistrationProbeMode(t *testing.T) {
} }
func setupDeviceManager(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string) (Manager, <-chan interface{}) { func setupDeviceManager(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string) (Manager, <-chan interface{}) {
m, err := newManagerImpl(socketName) topologyStore := topologymanager.NewFakeManager()
m, err := newManagerImpl(socketName, nil, topologyStore)
require.NoError(t, err) require.NoError(t, err)
updateChan := make(chan interface{}) updateChan := make(chan interface{})
@ -288,9 +291,10 @@ func cleanup(t *testing.T, m Manager, p *Stub) {
func TestUpdateCapacityAllocatable(t *testing.T) { func TestUpdateCapacityAllocatable(t *testing.T) {
socketDir, socketName, _, err := tmpSocketDir() socketDir, socketName, _, err := tmpSocketDir()
topologyStore := topologymanager.NewFakeManager()
require.NoError(t, err) require.NoError(t, err)
defer os.RemoveAll(socketDir) defer os.RemoveAll(socketDir)
testManager, err := newManagerImpl(socketName) testManager, err := newManagerImpl(socketName, nil, topologyStore)
as := assert.New(t) as := assert.New(t)
as.NotNil(testManager) as.NotNil(testManager)
as.Nil(err) as.Nil(err)
@ -594,16 +598,17 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso
return nil, err return nil, err
} }
testManager := &ManagerImpl{ testManager := &ManagerImpl{
socketdir: tmpDir, socketdir: tmpDir,
callback: monitorCallback, callback: monitorCallback,
healthyDevices: make(map[string]sets.String), healthyDevices: make(map[string]sets.String),
unhealthyDevices: make(map[string]sets.String), unhealthyDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]sets.String), allocatedDevices: make(map[string]sets.String),
endpoints: make(map[string]endpointInfo), endpoints: make(map[string]endpointInfo),
podDevices: make(podDevices), podDevices: make(podDevices),
activePods: activePods, topologyAffinityStore: topologymanager.NewFakeManager(),
sourcesReady: &sourcesReadyStub{}, activePods: activePods,
checkpointManager: ckm, sourcesReady: &sourcesReadyStub{},
checkpointManager: ckm,
} }
for _, res := range testRes { for _, res := range testRes {

View File

@ -0,0 +1,31 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package devicemanager
import (
"k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
)
// GetTopologyHints implements the TopologyManager HintProvider Interface which
// ensures the Device Manager is consulted when Topology Aware Hints for each
// container are created.
func (m *ManagerImpl) GetTopologyHints(pod v1.Pod, container v1.Container) map[string][]topologymanager.TopologyHint {
deviceHints := make(map[string][]topologymanager.TopologyHint)
// TODO: Fill in deviceHints with proper TopologyHints
return deviceHints
}

View File

@ -21,6 +21,7 @@ import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1" podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/lifecycle"
@ -63,6 +64,10 @@ type Manager interface {
// depending on the checkpoint file availability. Absence of the checkpoint file strongly indicates // depending on the checkpoint file availability. Absence of the checkpoint file strongly indicates
// the node has been recreated. // the node has been recreated.
ShouldResetExtendedResourceCapacity() bool ShouldResetExtendedResourceCapacity() bool
// TopologyManager HintProvider provider indicates the Device Manager implements the Topology Manager Interface
// and is consulted to make Topology aware resource alignments
GetTopologyHints(pod v1.Pod, container v1.Container) map[string][]topologymanager.TopologyHint
} }
// DeviceRunContainerOptions contains the combined container runtime settings to consume its allocated devices. // DeviceRunContainerOptions contains the combined container runtime settings to consume its allocated devices.