From 8bdc3d897018862c654a7fb0b4094abd7df45eb2 Mon Sep 17 00:00:00 2001 From: Anthony ARNAUD Date: Tue, 21 Jul 2020 12:35:49 +0200 Subject: [PATCH] Port deviceManager in windows container manager --- pkg/kubelet/cm/BUILD | 1 + pkg/kubelet/cm/container_manager_windows.go | 58 +++++++++++++++---- pkg/kubelet/cm/devicemanager/manager.go | 7 ++- .../apis/deviceplugin/v1beta1/constants.go | 10 ++++ 4 files changed, 65 insertions(+), 11 deletions(-) diff --git a/pkg/kubelet/cm/BUILD b/pkg/kubelet/cm/BUILD index 6935ba92d09..1122bba86f7 100644 --- a/pkg/kubelet/cm/BUILD +++ b/pkg/kubelet/cm/BUILD @@ -169,6 +169,7 @@ go_library( ], "@io_bazel_rules_go//go/platform:windows": [ "//pkg/kubelet/cadvisor:go_default_library", + "//pkg/kubelet/cm/devicemanager:go_default_library", "//staging/src/k8s.io/client-go/tools/record:go_default_library", "//staging/src/k8s.io/mount-utils:go_default_library", ], diff --git a/pkg/kubelet/cm/container_manager_windows.go b/pkg/kubelet/cm/container_manager_windows.go index e6637b4d443..072ea63fabe 100644 --- a/pkg/kubelet/cm/container_manager_windows.go +++ b/pkg/kubelet/cm/container_manager_windows.go @@ -36,6 +36,7 @@ import ( kubefeatures "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/kubelet/cadvisor" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" + "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" @@ -52,6 +53,10 @@ type containerManagerImpl struct { cadvisorInterface cadvisor.Interface // Config of this node. nodeConfig NodeConfig + // Interface for exporting and allocating devices reported by device plugins. + deviceManager devicemanager.Manager + // Interface for Topology resource co-ordination + topologyManager topologymanager.Manager } type noopWindowsResourceAllocator struct{} @@ -79,6 +84,11 @@ func (cm *containerManagerImpl) Start(node *v1.Node, } } + // Starts device manager. 
+ if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady); err != nil { + return err + } + return nil } @@ -93,11 +103,26 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I } capacity := cadvisor.CapacityFromMachineInfo(machineInfo) - return &containerManagerImpl{ + cm := &containerManagerImpl{ capacity: capacity, nodeConfig: nodeConfig, cadvisorInterface: cadvisorInterface, - }, nil + } + + cm.topologyManager = topologymanager.NewFakeManager() + + klog.Infof("Creating device plugin manager: %t", devicePluginEnabled) + if devicePluginEnabled { + cm.deviceManager, err = devicemanager.NewManagerImpl(nil, cm.topologyManager) + cm.topologyManager.AddHintProvider(cm.deviceManager) + } else { + cm.deviceManager, err = devicemanager.NewManagerStub() + } + if err != nil { + return nil, err + } + + return cm, nil } func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList { @@ -150,11 +175,11 @@ func (cm *containerManagerImpl) GetCapacity() v1.ResourceList { } func (cm *containerManagerImpl) GetPluginRegistrationHandler() cache.PluginHandler { - return nil + return cm.deviceManager.GetWatcherHandler() } func (cm *containerManagerImpl) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) { - return nil, nil, []string{} + return cm.deviceManager.GetCapacity() } func (cm *containerManagerImpl) NewPodContainerManager() PodContainerManager { @@ -162,11 +187,24 @@ func (cm *containerManagerImpl) NewPodContainerManager() PodContainerManager { } func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error) { - return &kubecontainer.RunContainerOptions{}, nil + opts := &kubecontainer.RunContainerOptions{} + // Allocate should already be called during predicateAdmitHandler.Admit(), + // just try to fetch device runtime information from cached state here + devOpts, err := 
cm.deviceManager.GetDeviceRunContainerOptions(pod, container) + if err != nil { + return nil, err + } else if devOpts == nil { + return opts, nil + } + opts.Devices = append(opts.Devices, devOpts.Devices...) + opts.Mounts = append(opts.Mounts, devOpts.Mounts...) + opts.Envs = append(opts.Envs, devOpts.Envs...) + opts.Annotations = append(opts.Annotations, devOpts.Annotations...) + return opts, nil } -func (cm *containerManagerImpl) UpdatePluginResources(*schedulerframework.NodeInfo, *lifecycle.PodAdmitAttributes) error { - return nil +func (cm *containerManagerImpl) UpdatePluginResources(node *schedulerframework.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error { + return cm.deviceManager.UpdatePluginResources(node, attrs) } func (cm *containerManagerImpl) InternalContainerLifecycle() InternalContainerLifecycle { @@ -177,12 +215,12 @@ func (cm *containerManagerImpl) GetPodCgroupRoot() string { return "" } -func (cm *containerManagerImpl) GetDevices(_, _ string) []*podresourcesapi.ContainerDevices { - return nil +func (cm *containerManagerImpl) GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices { + return cm.deviceManager.GetDevices(podUID, containerName) } func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool { - return false + return cm.deviceManager.ShouldResetExtendedResourceCapacity() } func (cm *containerManagerImpl) GetAllocateResourcesPodAdmitHandler() lifecycle.PodAdmitHandler { diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go index 95cf058f1a7..e103b875bda 100644 --- a/pkg/kubelet/cm/devicemanager/manager.go +++ b/pkg/kubelet/cm/devicemanager/manager.go @@ -22,6 +22,7 @@ import ( "net" "os" "path/filepath" + "runtime" "sort" "sync" "time" @@ -126,7 +127,11 @@ func (s *sourcesReadyStub) AllReady() bool { return true } // NewManagerImpl creates a new manager. 
func NewManagerImpl(topology []cadvisorapi.Node, topologyAffinityStore topologymanager.Store) (*ManagerImpl, error) { - return newManagerImpl(pluginapi.KubeletSocket, topology, topologyAffinityStore) + socketPath := pluginapi.KubeletSocket + if runtime.GOOS == "windows" { + socketPath = os.Getenv("SYSTEMDRIVE") + pluginapi.KubeletSocketWindows + } + return newManagerImpl(socketPath, topology, topologyAffinityStore) } func newManagerImpl(socketPath string, topology []cadvisorapi.Node, topologyAffinityStore topologymanager.Store) (*ManagerImpl, error) { diff --git a/staging/src/k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1/constants.go b/staging/src/k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1/constants.go index 4c40f21240e..3bed2148033 100644 --- a/staging/src/k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1/constants.go +++ b/staging/src/k8s.io/kubelet/pkg/apis/deviceplugin/v1beta1/constants.go @@ -30,7 +30,17 @@ const ( DevicePluginPath = "/var/lib/kubelet/device-plugins/" // KubeletSocket is the path of the Kubelet registry socket KubeletSocket = DevicePluginPath + "kubelet.sock" + + // DevicePluginPathWindows is the device plugin directory on Windows, without the drive prefix. + // It avoids "failed to run Kubelet: bad socketPath, must be an absolute path: /var/lib/kubelet/device-plugins/kubelet.sock". + // https://github.com/kubernetes/kubernetes/issues/93262 + // https://github.com/kubernetes/kubernetes/pull/93285#discussion_r458140701 + DevicePluginPathWindows = "\\var\\lib\\kubelet\\device-plugins\\" + // KubeletSocketWindows is the path of the Kubelet registry socket on Windows + KubeletSocketWindows = DevicePluginPathWindows + "kubelet.sock" + // KubeletPreStartContainerRPCTimeoutInSecs is the timeout duration in secs for PreStartContainer RPC + // Timeout duration in secs for PreStartContainer RPC KubeletPreStartContainerRPCTimeoutInSecs = 30 )