Merge pull request #74357 from lmdaly/topology-manager-container-manager

Updates to container manager and internal container lifecycle to accommodate TopologyManager
This commit is contained in:
Kubernetes Prow Robot 2019-08-01 11:52:17 -07:00 committed by GitHub
commit 93e6fb30f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 67 additions and 7 deletions

View File

@ -693,6 +693,7 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies, stopCh <-chan
ExperimentalPodPidsLimit: s.PodPidsLimit, ExperimentalPodPidsLimit: s.PodPidsLimit,
EnforceCPULimits: s.CPUCFSQuota, EnforceCPULimits: s.CPUCFSQuota,
CPUCFSQuotaPeriod: s.CPUCFSQuotaPeriod.Duration, CPUCFSQuotaPeriod: s.CPUCFSQuotaPeriod.Duration,
ExperimentalTopologyManagerPolicy: s.TopologyManagerPolicy,
}, },
s.FailSwapOn, s.FailSwapOn,
devicePluginEnabled, devicePluginEnabled,

View File

@ -28,6 +28,7 @@ go_library(
"//pkg/features:go_default_library", "//pkg/features:go_default_library",
"//pkg/kubelet/apis/podresources/v1alpha1:go_default_library", "//pkg/kubelet/apis/podresources/v1alpha1:go_default_library",
"//pkg/kubelet/cm/cpumanager:go_default_library", "//pkg/kubelet/cm/cpumanager:go_default_library",
"//pkg/kubelet/cm/topologymanager:go_default_library",
"//pkg/kubelet/config:go_default_library", "//pkg/kubelet/config:go_default_library",
"//pkg/kubelet/container:go_default_library", "//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/eviction/api:go_default_library", "//pkg/kubelet/eviction/api:go_default_library",

View File

@ -24,6 +24,7 @@ import (
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
internalapi "k8s.io/cri-api/pkg/apis" internalapi "k8s.io/cri-api/pkg/apis"
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1" podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api" evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
@ -108,6 +109,9 @@ type ContainerManager interface {
// ShouldResetExtendedResourceCapacity returns whether or not the extended resources should be zeroed, // ShouldResetExtendedResourceCapacity returns whether or not the extended resources should be zeroed,
// due to node recreation. // due to node recreation.
ShouldResetExtendedResourceCapacity() bool ShouldResetExtendedResourceCapacity() bool
// GetTopologyPodAdmitHandler returns an instance of the TopologyManager for Pod Admission
GetTopologyPodAdmitHandler() topologymanager.Manager
} }
type NodeConfig struct { type NodeConfig struct {
@ -127,6 +131,7 @@ type NodeConfig struct {
ExperimentalPodPidsLimit int64 ExperimentalPodPidsLimit int64
EnforceCPULimits bool EnforceCPULimits bool
CPUCFSQuotaPeriod time.Duration CPUCFSQuotaPeriod time.Duration
ExperimentalTopologyManagerPolicy string
} }
type NodeAllocatableConfig struct { type NodeAllocatableConfig struct {

View File

@ -48,6 +48,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cadvisor" "k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager" "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
cmutil "k8s.io/kubernetes/pkg/kubelet/cm/util" cmutil "k8s.io/kubernetes/pkg/kubelet/cm/util"
"k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@ -139,6 +140,8 @@ type containerManagerImpl struct {
deviceManager devicemanager.Manager deviceManager devicemanager.Manager
// Interface for CPU affinity management. // Interface for CPU affinity management.
cpuManager cpumanager.Manager cpuManager cpumanager.Manager
// Interface for Topology resource co-ordination
topologyManager topologymanager.Manager
} }
type features struct { type features struct {
@ -284,6 +287,20 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
qosContainerManager: qosContainerManager, qosContainerManager: qosContainerManager,
} }
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager) {
cm.topologyManager, err = topologymanager.NewManager(
nodeConfig.ExperimentalTopologyManagerPolicy,
)
if err != nil {
return nil, err
}
klog.Infof("[topologymanager] Initilizing Topology Manager with %s policy", nodeConfig.ExperimentalTopologyManagerPolicy)
} else {
cm.topologyManager = topologymanager.NewFakeManager()
}
klog.Infof("Creating device plugin manager: %t", devicePluginEnabled) klog.Infof("Creating device plugin manager: %t", devicePluginEnabled)
if devicePluginEnabled { if devicePluginEnabled {
cm.deviceManager, err = devicemanager.NewManagerImpl() cm.deviceManager, err = devicemanager.NewManagerImpl()
@ -332,7 +349,7 @@ func (cm *containerManagerImpl) NewPodContainerManager() PodContainerManager {
} }
func (cm *containerManagerImpl) InternalContainerLifecycle() InternalContainerLifecycle { func (cm *containerManagerImpl) InternalContainerLifecycle() InternalContainerLifecycle {
return &internalContainerLifecycleImpl{cm.cpuManager} return &internalContainerLifecycleImpl{cm.cpuManager, cm.topologyManager}
} }
// Create a cgroup container manager. // Create a cgroup container manager.
@ -644,6 +661,10 @@ func (cm *containerManagerImpl) UpdatePluginResources(node *schedulernodeinfo.No
return cm.deviceManager.Allocate(node, attrs) return cm.deviceManager.Allocate(node, attrs)
} }
func (cm *containerManagerImpl) GetTopologyPodAdmitHandler() topologymanager.Manager {
return cm.topologyManager
}
func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList { func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList {
cpuLimit := int64(0) cpuLimit := int64(0)

View File

@ -24,6 +24,7 @@ import (
internalapi "k8s.io/cri-api/pkg/apis" internalapi "k8s.io/cri-api/pkg/apis"
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1" podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/lifecycle"
@ -101,7 +102,7 @@ func (cm *containerManagerStub) UpdatePluginResources(*schedulernodeinfo.NodeInf
} }
func (cm *containerManagerStub) InternalContainerLifecycle() InternalContainerLifecycle { func (cm *containerManagerStub) InternalContainerLifecycle() InternalContainerLifecycle {
return &internalContainerLifecycleImpl{cpumanager.NewFakeManager()} return &internalContainerLifecycleImpl{cpumanager.NewFakeManager(), topologymanager.NewFakeManager()}
} }
func (cm *containerManagerStub) GetPodCgroupRoot() string { func (cm *containerManagerStub) GetPodCgroupRoot() string {
@ -116,6 +117,10 @@ func (cm *containerManagerStub) ShouldResetExtendedResourceCapacity() bool {
return cm.shouldResetExtendedResourceCapacity return cm.shouldResetExtendedResourceCapacity
} }
func (cm *containerManagerStub) GetTopologyPodAdmitHandler() topologymanager.Manager {
return nil
}
func NewStubContainerManager() ContainerManager { func NewStubContainerManager() ContainerManager {
return &containerManagerStub{shouldResetExtendedResourceCapacity: false} return &containerManagerStub{shouldResetExtendedResourceCapacity: false}
} }

View File

@ -34,6 +34,7 @@ import (
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1" podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/cadvisor" "k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
"k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/lifecycle"
@ -161,7 +162,7 @@ func (cm *containerManagerImpl) UpdatePluginResources(*schedulernodeinfo.NodeInf
} }
func (cm *containerManagerImpl) InternalContainerLifecycle() InternalContainerLifecycle { func (cm *containerManagerImpl) InternalContainerLifecycle() InternalContainerLifecycle {
return &internalContainerLifecycleImpl{cpumanager.NewFakeManager()} return &internalContainerLifecycleImpl{cpumanager.NewFakeManager(), topologymanager.NewFakeManager()}
} }
func (cm *containerManagerImpl) GetPodCgroupRoot() string { func (cm *containerManagerImpl) GetPodCgroupRoot() string {
@ -175,3 +176,7 @@ func (cm *containerManagerImpl) GetDevices(_, _ string) []*podresourcesapi.Conta
func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool { func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool {
return false return false
} }
func (cm *containerManagerImpl) GetTopologyPodAdmitHandler() topologymanager.Manager {
return nil
}

View File

@ -22,6 +22,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature" utilfeature "k8s.io/apiserver/pkg/util/feature"
kubefeatures "k8s.io/kubernetes/pkg/features" kubefeatures "k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
"k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
) )
type InternalContainerLifecycle interface { type InternalContainerLifecycle interface {
@ -33,11 +34,21 @@ type InternalContainerLifecycle interface {
// Implements InternalContainerLifecycle interface. // Implements InternalContainerLifecycle interface.
type internalContainerLifecycleImpl struct { type internalContainerLifecycleImpl struct {
cpuManager cpumanager.Manager cpuManager cpumanager.Manager
topologyManager topologymanager.Manager
} }
func (i *internalContainerLifecycleImpl) PreStartContainer(pod *v1.Pod, container *v1.Container, containerID string) error { func (i *internalContainerLifecycleImpl) PreStartContainer(pod *v1.Pod, container *v1.Container, containerID string) error {
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) { if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
return i.cpuManager.AddContainer(pod, container, containerID) err := i.cpuManager.AddContainer(pod, container, containerID)
if err != nil {
return err
}
}
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager) {
err := i.topologyManager.AddContainer(pod, containerID)
if err != nil {
return err
}
} }
return nil return nil
} }
@ -51,7 +62,16 @@ func (i *internalContainerLifecycleImpl) PreStopContainer(containerID string) er
func (i *internalContainerLifecycleImpl) PostStopContainer(containerID string) error { func (i *internalContainerLifecycleImpl) PostStopContainer(containerID string) error {
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) { if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
return i.cpuManager.RemoveContainer(containerID) err := i.cpuManager.RemoveContainer(containerID)
if err != nil {
return err
}
}
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager) {
err := i.topologyManager.RemoveContainer(containerID)
if err != nil {
return err
}
} }
return nil return nil
} }

View File

@ -859,7 +859,9 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
} }
klet.AddPodSyncLoopHandler(activeDeadlineHandler) klet.AddPodSyncLoopHandler(activeDeadlineHandler)
klet.AddPodSyncHandler(activeDeadlineHandler) klet.AddPodSyncHandler(activeDeadlineHandler)
if utilfeature.DefaultFeatureGate.Enabled(features.TopologyManager) {
klet.admitHandlers.AddPodAdmitHandler(klet.containerManager.GetTopologyPodAdmitHandler())
}
criticalPodAdmissionHandler := preemption.NewCriticalPodAdmissionHandler(klet.GetActivePods, killPodNow(klet.podWorkers, kubeDeps.Recorder), kubeDeps.Recorder) criticalPodAdmissionHandler := preemption.NewCriticalPodAdmissionHandler(klet.GetActivePods, killPodNow(klet.podWorkers, kubeDeps.Recorder), kubeDeps.Recorder)
klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler, klet.containerManager.UpdatePluginResources)) klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler, klet.containerManager.UpdatePluginResources))
// apply functional Option's // apply functional Option's