Port deviceManager in windows container manager

This commit is contained in:
Anthony ARNAUD 2020-07-21 12:35:49 +02:00
parent 46d481b455
commit 8bdc3d8970
4 changed files with 65 additions and 11 deletions

View File

@ -169,6 +169,7 @@ go_library(
], ],
"@io_bazel_rules_go//go/platform:windows": [ "@io_bazel_rules_go//go/platform:windows": [
"//pkg/kubelet/cadvisor:go_default_library", "//pkg/kubelet/cadvisor:go_default_library",
"//pkg/kubelet/cm/devicemanager:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/mount-utils:go_default_library", "//staging/src/k8s.io/mount-utils:go_default_library",
], ],

View File

@ -36,6 +36,7 @@ import (
kubefeatures "k8s.io/kubernetes/pkg/features" kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/cadvisor" "k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@ -52,6 +53,10 @@ type containerManagerImpl struct {
cadvisorInterface cadvisor.Interface cadvisorInterface cadvisor.Interface
// Config of this node. // Config of this node.
nodeConfig NodeConfig nodeConfig NodeConfig
// Interface for exporting and allocating devices reported by device plugins.
deviceManager devicemanager.Manager
// Interface for Topology resource co-ordination
topologyManager topologymanager.Manager
} }
type noopWindowsResourceAllocator struct{} type noopWindowsResourceAllocator struct{}
@ -79,6 +84,11 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
} }
} }
// Starts device manager.
if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady); err != nil {
return err
}
return nil return nil
} }
@ -93,11 +103,26 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
} }
capacity := cadvisor.CapacityFromMachineInfo(machineInfo) capacity := cadvisor.CapacityFromMachineInfo(machineInfo)
return &containerManagerImpl{ cm := &containerManagerImpl{
capacity: capacity, capacity: capacity,
nodeConfig: nodeConfig, nodeConfig: nodeConfig,
cadvisorInterface: cadvisorInterface, cadvisorInterface: cadvisorInterface,
}, nil }
cm.topologyManager = topologymanager.NewFakeManager()
klog.Infof("Creating device plugin manager: %t", devicePluginEnabled)
if devicePluginEnabled {
cm.deviceManager, err = devicemanager.NewManagerImpl(nil, cm.topologyManager)
cm.topologyManager.AddHintProvider(cm.deviceManager)
} else {
cm.deviceManager, err = devicemanager.NewManagerStub()
}
if err != nil {
return nil, err
}
return cm, nil
} }
func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList { func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList {
@ -150,11 +175,11 @@ func (cm *containerManagerImpl) GetCapacity() v1.ResourceList {
} }
func (cm *containerManagerImpl) GetPluginRegistrationHandler() cache.PluginHandler { func (cm *containerManagerImpl) GetPluginRegistrationHandler() cache.PluginHandler {
return nil return cm.deviceManager.GetWatcherHandler()
} }
func (cm *containerManagerImpl) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) { func (cm *containerManagerImpl) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) {
return nil, nil, []string{} return cm.deviceManager.GetCapacity()
} }
func (cm *containerManagerImpl) NewPodContainerManager() PodContainerManager { func (cm *containerManagerImpl) NewPodContainerManager() PodContainerManager {
@ -162,11 +187,24 @@ func (cm *containerManagerImpl) NewPodContainerManager() PodContainerManager {
} }
func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error) { func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error) {
return &kubecontainer.RunContainerOptions{}, nil opts := &kubecontainer.RunContainerOptions{}
// Allocate should already be called during predicateAdmitHandler.Admit(),
// just try to fetch device runtime information from cached state here
devOpts, err := cm.deviceManager.GetDeviceRunContainerOptions(pod, container)
if err != nil {
return nil, err
} else if devOpts == nil {
return opts, nil
}
opts.Devices = append(opts.Devices, devOpts.Devices...)
opts.Mounts = append(opts.Mounts, devOpts.Mounts...)
opts.Envs = append(opts.Envs, devOpts.Envs...)
opts.Annotations = append(opts.Annotations, devOpts.Annotations...)
return opts, nil
} }
func (cm *containerManagerImpl) UpdatePluginResources(*schedulerframework.NodeInfo, *lifecycle.PodAdmitAttributes) error { func (cm *containerManagerImpl) UpdatePluginResources(node *schedulerframework.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
return nil return cm.deviceManager.UpdatePluginResources(node, attrs)
} }
func (cm *containerManagerImpl) InternalContainerLifecycle() InternalContainerLifecycle { func (cm *containerManagerImpl) InternalContainerLifecycle() InternalContainerLifecycle {
@ -177,12 +215,12 @@ func (cm *containerManagerImpl) GetPodCgroupRoot() string {
return "" return ""
} }
func (cm *containerManagerImpl) GetDevices(_, _ string) []*podresourcesapi.ContainerDevices { func (cm *containerManagerImpl) GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices {
return nil return cm.deviceManager.GetDevices(podUID, containerName)
} }
func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool { func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool {
return false return cm.deviceManager.ShouldResetExtendedResourceCapacity()
} }
func (cm *containerManagerImpl) GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler { func (cm *containerManagerImpl) GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler {

View File

@ -22,6 +22,7 @@ import (
"net" "net"
"os" "os"
"path/filepath" "path/filepath"
"runtime"
"sort" "sort"
"sync" "sync"
"time" "time"
@ -126,7 +127,11 @@ func (s *sourcesReadyStub) AllReady() bool { return true }
// NewManagerImpl creates a new manager. // NewManagerImpl creates a new manager.
func NewManagerImpl(topology []cadvisorapi.Node, topologyAffinityStore topologymanager.Store) (*ManagerImpl, error) { func NewManagerImpl(topology []cadvisorapi.Node, topologyAffinityStore topologymanager.Store) (*ManagerImpl, error) {
return newManagerImpl(pluginapi.KubeletSocket, topology, topologyAffinityStore) socketPath := pluginapi.KubeletSocket
if runtime.GOOS == "windows" {
socketPath = os.Getenv("SYSTEMDRIVE") + pluginapi.KubeletSocketWindows
}
return newManagerImpl(socketPath, topology, topologyAffinityStore)
} }
func newManagerImpl(socketPath string, topology []cadvisorapi.Node, topologyAffinityStore topologymanager.Store) (*ManagerImpl, error) { func newManagerImpl(socketPath string, topology []cadvisorapi.Node, topologyAffinityStore topologymanager.Store) (*ManagerImpl, error) {

View File

@ -30,7 +30,17 @@ const (
DevicePluginPath = "/var/lib/kubelet/device-plugins/" DevicePluginPath = "/var/lib/kubelet/device-plugins/"
// KubeletSocket is the path of the Kubelet registry socket // KubeletSocket is the path of the Kubelet registry socket
KubeletSocket = DevicePluginPath + "kubelet.sock" KubeletSocket = DevicePluginPath + "kubelet.sock"
// DevicePluginPathWindows Avoid failed to run Kubelet: bad socketPath,
// must be an absolute path: /var/lib/kubelet/device-plugins/kubelet.sock
// https://github.com/kubernetes/kubernetes/issues/93262
// https://github.com/kubernetes/kubernetes/pull/93285#discussion_r458140701
DevicePluginPathWindows = "\\var\\lib\\kubelet\\device-plugins\\"
// KubeletSocketWindows is the path of the Kubelet registry socket on windows
KubeletSocketWindows = DevicePluginPathWindows + "kubelet.sock"
// KubeletPreStartContainerRPCTimeoutInSecs is the timeout duration in secs for PreStartContainer RPC // KubeletPreStartContainerRPCTimeoutInSecs is the timeout duration in secs for PreStartContainer RPC
// Timeout duration in secs for PreStartContainer RPC
KubeletPreStartContainerRPCTimeoutInSecs = 30 KubeletPreStartContainerRPCTimeoutInSecs = 30
) )