kubelet: cm: refactor QoS logic into separate interface

Seth Jennings 2017-02-21 14:10:45 -06:00
parent 49d1814b3a
commit b9adb66426
9 changed files with 194 additions and 69 deletions


@@ -19,6 +19,7 @@ go_library(
         "node_container_manager.go",
         "pod_container_manager_linux.go",
         "pod_container_manager_stub.go",
+        "qos_container_manager_linux.go",
         "types.go",
     ],
     tags = ["automanaged"],


@@ -23,12 +23,14 @@ import (
 	evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
 )
 
+type ActivePodsFunc func() []*v1.Pod
+
 // Manages the containers running on a machine.
 type ContainerManager interface {
 	// Runs the container manager's housekeeping.
 	// - Ensures that the Docker daemon is in a container.
 	// - Creates the system container where all non-containerized processes run.
-	Start(*v1.Node) error
+	Start(*v1.Node, ActivePodsFunc) error
 
 	// Returns resources allocated to system cgroups in the machine.
 	// These cgroups include the system and Kubernetes services.
@@ -52,6 +54,10 @@ type ContainerManager interface {
 	// GetNodeAllocatable returns the amount of compute resources that have to be reserved from scheduling.
 	GetNodeAllocatableReservation() v1.ResourceList
+
+	// UpdateQOSCgroups performs housekeeping updates to ensure that the top
+	// level QoS containers have their desired state in a thread-safe way
+	UpdateQOSCgroups() error
 }
 
 type NodeConfig struct {

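For orientation, a minimal, self-contained sketch of how the reshaped interface is meant to be wired: Start receives a callback that reports the currently active pods, and UpdateQOSCgroups is the thread-safe housekeeping hook that pod-lifecycle code can call later. The types below are simplified stand-ins (a plain node name instead of *v1.Node), not the kubelet's real implementation.

package main

import (
	"fmt"
	"sync"
)

// Simplified stand-ins for the kubelet types used in the real interface.
type Pod struct{ Name string }
type ActivePodsFunc func() []*Pod

type qosManager struct {
	mu         sync.Mutex
	activePods ActivePodsFunc
}

// Start caches the callback so later housekeeping can inspect the live pod set.
func (m *qosManager) Start(nodeName string, activePods ActivePodsFunc) error {
	m.activePods = activePods
	fmt.Println("container manager started on", nodeName)
	return nil
}

// UpdateQOSCgroups can be called from any pod-lifecycle path; the mutex keeps it safe.
func (m *qosManager) UpdateQOSCgroups() error {
	m.mu.Lock()
	defer m.mu.Unlock()
	fmt.Printf("reconciling QoS cgroups for %d active pods\n", len(m.activePods()))
	return nil
}

func main() {
	pods := []*Pod{{Name: "web"}, {Name: "batch"}}
	m := &qosManager{}
	_ = m.Start("node-1", func() []*Pod { return pods })
	_ = m.UpdateQOSCgroups()
}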

@@ -116,6 +116,8 @@ type containerManagerImpl struct {
 	cgroupRoot string
 	// Event recorder interface.
 	recorder record.EventRecorder
+	// Interface for QoS cgroup management
+	qosContainerManager QOSContainerManager
 }
 
 type features struct {
@@ -246,15 +248,22 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
 		cgroupRoot = path.Join(cgroupRoot, defaultNodeAllocatableCgroupName)
 	}
 	glog.Infof("Creating Container Manager object based on Node Config: %+v", nodeConfig)
+
+	qosContainerManager, err := NewQOSContainerManager(subsystems, cgroupRoot, nodeConfig)
+	if err != nil {
+		return nil, err
+	}
+
 	return &containerManagerImpl{
 		cadvisorInterface:   cadvisorInterface,
 		mountUtil:           mountUtil,
 		NodeConfig:          nodeConfig,
 		subsystems:          subsystems,
 		cgroupManager:       cgroupManager,
 		capacity:            capacity,
 		cgroupRoot:          cgroupRoot,
 		recorder:            recorder,
+		qosContainerManager: qosContainerManager,
 	}, nil
 }
@@ -264,7 +273,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
 func (cm *containerManagerImpl) NewPodContainerManager() PodContainerManager {
 	if cm.NodeConfig.CgroupsPerQOS {
 		return &podContainerManagerImpl{
-			qosContainersInfo: cm.qosContainers,
+			qosContainersInfo: cm.GetQOSContainersInfo(),
 			nodeInfo:          cm.nodeInfo,
 			subsystems:        cm.subsystems,
 			cgroupManager:     cm.cgroupManager,
@@ -297,53 +306,6 @@ const (
 	KernelTunableModify KernelTunableBehavior = "modify"
 )
 
-// InitQOS creates the top level qos cgroup containers
-// We create top level QoS containers for only Burstable and Best Effort
-// and not Guaranteed QoS class. All guaranteed pods are nested under the
-// RootContainer by default. InitQOS is called only once during kubelet bootstrapping.
-func InitQOS(cgroupDriver, rootContainer string, subsystems *CgroupSubsystems) (QOSContainersInfo, error) {
-	cm := NewCgroupManager(subsystems, cgroupDriver)
-	// Top level for Qos containers are created only for Burstable
-	// and Best Effort classes
-	qosClasses := [2]v1.PodQOSClass{v1.PodQOSBurstable, v1.PodQOSBestEffort}
-	// Create containers for both qos classes
-	for _, qosClass := range qosClasses {
-		// get the container's absolute name
-		absoluteContainerName := CgroupName(path.Join(rootContainer, string(qosClass)))
-		resourceParameters := &ResourceConfig{}
-		// the BestEffort QoS class has a statically configured minShares value
-		if qosClass == v1.PodQOSBestEffort {
-			minShares := int64(MinShares)
-			resourceParameters.CpuShares = &minShares
-		}
-		// containerConfig object stores the cgroup specifications
-		containerConfig := &CgroupConfig{
-			Name:               absoluteContainerName,
-			ResourceParameters: resourceParameters,
-		}
-		// check if it exists
-		if !cm.Exists(absoluteContainerName) {
-			if err := cm.Create(containerConfig); err != nil {
-				return QOSContainersInfo{}, fmt.Errorf("failed to create top level %v QOS cgroup : %v", qosClass, err)
-			}
-		} else {
-			// to ensure we actually have the right state, we update the config on startup
-			if err := cm.Update(containerConfig); err != nil {
-				return QOSContainersInfo{}, fmt.Errorf("failed to update top level %v QOS cgroup : %v", qosClass, err)
-			}
-		}
-	}
-	// Store the top level qos container names
-	qosContainersInfo := QOSContainersInfo{
-		Guaranteed: rootContainer,
-		Burstable:  path.Join(rootContainer, string(v1.PodQOSBurstable)),
-		BestEffort: path.Join(rootContainer, string(v1.PodQOSBestEffort)),
-	}
-	return qosContainersInfo, nil
-}
-
 // setupKernelTunables validates kernel tunable flags are set as expected
 // depending upon the specified option, it will either warn, error, or modify the kernel tunable flags
 func setupKernelTunables(option KernelTunableBehavior) error {
@@ -383,7 +345,7 @@ func setupKernelTunables(option KernelTunableBehavior) error {
 	return utilerrors.NewAggregate(errList)
 }
 
-func (cm *containerManagerImpl) setupNode() error {
+func (cm *containerManagerImpl) setupNode(activePods ActivePodsFunc) error {
 	f, err := validateSystemRequirements(cm.mountUtil)
 	if err != nil {
 		return err
@@ -404,11 +366,10 @@ func (cm *containerManagerImpl) setupNode() error {
 		if err := cm.createNodeAllocatableCgroups(); err != nil {
 			return err
 		}
-		qosContainersInfo, err := InitQOS(cm.NodeConfig.CgroupDriver, cm.cgroupRoot, cm.subsystems)
+		err = cm.qosContainerManager.Start(cm.nodeInfo, activePods)
 		if err != nil {
 			return fmt.Errorf("failed to initialise top level QOS containers: %v", err)
 		}
-		cm.qosContainers = qosContainersInfo
 	}
 
 	// Enforce Node Allocatable (if required)
@@ -559,7 +520,11 @@ func (cm *containerManagerImpl) GetMountedSubsystems() *CgroupSubsystems {
 }
 
 func (cm *containerManagerImpl) GetQOSContainersInfo() QOSContainersInfo {
-	return cm.qosContainers
+	return cm.qosContainerManager.GetQOSContainersInfo()
+}
+
+func (cm *containerManagerImpl) UpdateQOSCgroups() error {
+	return cm.qosContainerManager.UpdateCgroups()
 }
 
 func (cm *containerManagerImpl) Status() Status {
@@ -568,12 +533,12 @@ func (cm *containerManagerImpl) Status() Status {
 	return cm.status
 }
 
-func (cm *containerManagerImpl) Start(node *v1.Node) error {
+func (cm *containerManagerImpl) Start(node *v1.Node, activePods ActivePodsFunc) error {
 	// cache the node Info including resource capacity and
 	// allocatable of the node
 	cm.nodeInfo = node
 
 	// Setup the node
-	if err := cm.setupNode(); err != nil {
+	if err := cm.setupNode(activePods); err != nil {
 		return err
 	}
 
 	// Ensure that node allocatable configuration is valid.


@@ -25,7 +25,7 @@ type containerManagerStub struct{}
 
 var _ ContainerManager = &containerManagerStub{}
 
-func (cm *containerManagerStub) Start(_ *v1.Node) error {
+func (cm *containerManagerStub) Start(_ *v1.Node, _ ActivePodsFunc) error {
 	glog.V(2).Infof("Starting stub container manager")
 	return nil
 }
@@ -46,6 +46,10 @@ func (cm *containerManagerStub) GetQOSContainersInfo() QOSContainersInfo {
 	return QOSContainersInfo{}
 }
 
+func (cm *containerManagerStub) UpdateQOSCgroups() error {
+	return nil
+}
+
 func (cm *containerManagerStub) Status() Status {
 	return Status{}
 }


@@ -32,7 +32,7 @@ type unsupportedContainerManager struct {
 
 var _ ContainerManager = &unsupportedContainerManager{}
 
-func (unsupportedContainerManager) Start(_ *v1.Node) error {
+func (unsupportedContainerManager) Start(_ *v1.Node, _ ActivePodsFunc) error {
 	return fmt.Errorf("Container Manager is unsupported in this build")
 }
@@ -52,6 +52,10 @@ func (unsupportedContainerManager) GetQOSContainersInfo() QOSContainersInfo {
 	return QOSContainersInfo{}
 }
 
+func (unsupportedContainerManager) UpdateQOSCgroups() error {
+	return nil
+}
+
 func (cm *unsupportedContainerManager) Status() Status {
 	return Status{}
 }


@@ -33,7 +33,7 @@ type containerManagerImpl struct {
 
 var _ ContainerManager = &containerManagerImpl{}
 
-func (cm *containerManagerImpl) Start(_ *v1.Node) error {
+func (cm *containerManagerImpl) Start(_ *v1.Node, _ ActivePodsFunc) error {
 	glog.V(2).Infof("Starting Windows stub container manager")
 	return nil
 }


@@ -0,0 +1,137 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cm

import (
	"fmt"
	"path"
	"sync"

	"k8s.io/kubernetes/pkg/api/v1"
)

type QOSContainerManager interface {
	Start(*v1.Node, ActivePodsFunc) error
	GetQOSContainersInfo() QOSContainersInfo
	UpdateCgroups() error
}

type qosContainerManagerImpl struct {
	sync.Mutex
	nodeInfo          *v1.Node
	qosContainersInfo QOSContainersInfo
	subsystems        *CgroupSubsystems
	cgroupManager     CgroupManager
	activePods        ActivePodsFunc
	cgroupRoot        string
}

func NewQOSContainerManager(subsystems *CgroupSubsystems, cgroupRoot string, nodeConfig NodeConfig) (QOSContainerManager, error) {
	if !nodeConfig.CgroupsPerQOS {
		return &qosContainerManagerNoop{
			cgroupRoot: CgroupName(nodeConfig.CgroupRoot),
		}, nil
	}

	return &qosContainerManagerImpl{
		subsystems:    subsystems,
		cgroupManager: NewCgroupManager(subsystems, nodeConfig.CgroupDriver),
		cgroupRoot:    cgroupRoot,
	}, nil
}

func (m *qosContainerManagerImpl) GetQOSContainersInfo() QOSContainersInfo {
	return m.qosContainersInfo
}

func (m *qosContainerManagerImpl) Start(nodeInfo *v1.Node, activePods ActivePodsFunc) error {
	cm := m.cgroupManager
	rootContainer := m.cgroupRoot
	if !cm.Exists(CgroupName(rootContainer)) {
		return fmt.Errorf("root container %s doesn't exist", rootContainer)
	}

	// Top level for Qos containers are created only for Burstable
	// and Best Effort classes
	qosClasses := [2]v1.PodQOSClass{v1.PodQOSBurstable, v1.PodQOSBestEffort}

	// Create containers for both qos classes
	for _, qosClass := range qosClasses {
		// get the container's absolute name
		absoluteContainerName := CgroupName(path.Join(rootContainer, string(qosClass)))
		resourceParameters := &ResourceConfig{}
		// the BestEffort QoS class has a statically configured minShares value
		if qosClass == v1.PodQOSBestEffort {
			minShares := int64(MinShares)
			resourceParameters.CpuShares = &minShares
		}
		// containerConfig object stores the cgroup specifications
		containerConfig := &CgroupConfig{
			Name:               absoluteContainerName,
			ResourceParameters: resourceParameters,
		}
		// check if it exists
		if !cm.Exists(absoluteContainerName) {
			if err := cm.Create(containerConfig); err != nil {
				return fmt.Errorf("failed to create top level %v QOS cgroup : %v", qosClass, err)
			}
		} else {
			// to ensure we actually have the right state, we update the config on startup
			if err := cm.Update(containerConfig); err != nil {
				return fmt.Errorf("failed to update top level %v QOS cgroup : %v", qosClass, err)
			}
		}
	}

	// Store the top level qos container names
	m.qosContainersInfo = QOSContainersInfo{
		Guaranteed: rootContainer,
		Burstable:  path.Join(rootContainer, string(v1.PodQOSBurstable)),
		BestEffort: path.Join(rootContainer, string(v1.PodQOSBestEffort)),
	}
	m.nodeInfo = nodeInfo
	m.activePods = activePods

	return nil
}

func (m *qosContainerManagerImpl) UpdateCgroups() error {
	m.Lock()
	defer m.Unlock()

	// TODO: Update cgroups
	return nil
}

type qosContainerManagerNoop struct {
	cgroupRoot CgroupName
}

var _ QOSContainerManager = &qosContainerManagerNoop{}

func (m *qosContainerManagerNoop) GetQOSContainersInfo() QOSContainersInfo {
	return QOSContainersInfo{}
}

func (m *qosContainerManagerNoop) Start(_ *v1.Node, _ ActivePodsFunc) error {
	return nil
}

func (m *qosContainerManagerNoop) UpdateCgroups() error {
	return nil
}

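The constructor above hides the CgroupsPerQOS split behind the interface: when per-QoS cgroups are disabled, callers get a no-op manager and never have to branch on the flag themselves. A rough, self-contained sketch of that selection pattern (the names here are illustrative, not the kubelet's):

package main

import "fmt"

// QOSManager mirrors the shape of the QOSContainerManager interface above.
type QOSManager interface {
	Start() error
	UpdateCgroups() error
}

type realManager struct{ cgroupRoot string }

func (m *realManager) Start() error {
	// The real implementation creates the Burstable and BestEffort cgroups here.
	fmt.Println("creating QoS cgroups under", m.cgroupRoot)
	return nil
}

func (m *realManager) UpdateCgroups() error { return nil }

type noopManager struct{}

func (noopManager) Start() error         { return nil }
func (noopManager) UpdateCgroups() error { return nil }

// newQOSManager picks the implementation once, so everything downstream
// only ever sees the interface.
func newQOSManager(cgroupsPerQOS bool, cgroupRoot string) QOSManager {
	if !cgroupsPerQOS {
		return noopManager{}
	}
	return &realManager{cgroupRoot: cgroupRoot}
}

func main() {
	m := newQOSManager(true, "/")
	_ = m.Start()
	_ = m.UpdateCgroups()
}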

@@ -1173,7 +1173,7 @@ func (kl *Kubelet) initializeModules() error {
 		return fmt.Errorf("Kubelet failed to get node info: %v", err)
 	}
 
-	if err := kl.containerManager.Start(node); err != nil {
+	if err := kl.containerManager.Start(node, kl.getActivePods); err != nil {
 		return fmt.Errorf("Failed to start ContainerManager %v", err)
 	}
@@ -1468,8 +1468,13 @@ func (kl *Kubelet) syncPod(o syncPodOptions) error {
 		// they are not expected to run again.
 		// We don't create and apply updates to cgroup if its a run once pod and was killed above
 		if !(podKilled && pod.Spec.RestartPolicy == v1.RestartPolicyNever) {
-			if err := pcm.EnsureExists(pod); err != nil {
-				return fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err)
+			if !pcm.Exists(pod) {
+				if err := kl.containerManager.UpdateQOSCgroups(); err != nil {
+					glog.V(2).Infof("Failed to update QoS cgroups while syncing pod: %v", err)
+				}
+				if err := pcm.EnsureExists(pod); err != nil {
+					return fmt.Errorf("failed to ensure that the pod: %v cgroups exist and are correctly applied: %v", pod.UID, err)
+				}
 			}
 		}
 	}

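The syncPod change above encodes an ordering worth noting: the QoS-level refresh is attempted only when the pod's cgroup does not exist yet, a failure there is logged and tolerated, while a failure to create the pod cgroup itself still fails the sync. A compressed, self-contained sketch of that control flow, with stand-in types:

package main

import (
	"errors"
	"fmt"
	"log"
)

type Pod struct{ UID string }

// Stand-in for the pod container manager used in syncPod.
type podCgroups struct{ existing map[string]bool }

func (p *podCgroups) Exists(pod *Pod) bool        { return p.existing[pod.UID] }
func (p *podCgroups) EnsureExists(pod *Pod) error { p.existing[pod.UID] = true; return nil }

// updateQOSCgroups stands in for containerManager.UpdateQOSCgroups.
func updateQOSCgroups() error { return errors.New("cgroup driver busy") }

// ensurePodCgroup mirrors the ordering in syncPod: refresh the QoS parents only
// for a pod that is new to the node, tolerate that failing, but fail the sync if
// the pod's own cgroup cannot be created.
func ensurePodCgroup(pcm *podCgroups, pod *Pod) error {
	if !pcm.Exists(pod) {
		if err := updateQOSCgroups(); err != nil {
			log.Printf("Failed to update QoS cgroups while syncing pod: %v", err) // non-fatal
		}
		if err := pcm.EnsureExists(pod); err != nil {
			return fmt.Errorf("failed to ensure that the pod: %v cgroups exist: %v", pod.UID, err)
		}
	}
	return nil
}

func main() {
	pcm := &podCgroups{existing: map[string]bool{}}
	if err := ensurePodCgroup(pcm, &Pod{UID: "1234"}); err != nil {
		log.Fatal(err)
	}
	fmt.Println("pod cgroup ready:", pcm.Exists(&Pod{UID: "1234"}))
}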

@@ -654,6 +654,9 @@ func (kl *Kubelet) killPod(pod *v1.Pod, runningPod *kubecontainer.Pod, status *k
 			glog.Warningf("Failed to reduce the CPU values to the minimum amount of shares: %v", err)
 		}
 	}
+	if err := kl.containerManager.UpdateQOSCgroups(); err != nil {
+		glog.V(2).Infof("Failed to update QoS cgroups while killing pod: %v", err)
+	}
 	return nil
 }