CPU Manager initialization and lifecycle calls.

This commit is contained in:
Connor Doyle 2017-08-28 15:02:01 -07:00
parent 5dee682796
commit 7c6e31617d
15 changed files with 204 additions and 47 deletions

View File

@ -446,7 +446,8 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) {
SystemReserved: systemReserved,
HardEvictionThresholds: hardEvictionThresholds,
},
ExperimentalQOSReserved: *experimentalQOSReserved,
ExperimentalQOSReserved: *experimentalQOSReserved,
ExperimentalCPUManagerPolicy: s.CPUManagerPolicy,
},
s.FailSwapOn,
kubeDeps.Recorder)

View File

@ -7,7 +7,9 @@ go_library(
"container_manager.go",
"container_manager_stub.go",
"container_manager_unsupported.go",
"fake_internal_container_lifecycle.go",
"helpers_unsupported.go",
"internal_container_lifecycle.go",
"pod_container_manager_stub.go",
"pod_container_manager_unsupported.go",
"types.go",
@ -27,9 +29,11 @@ go_library(
}),
visibility = ["//visibility:public"],
deps = [
"//pkg/kubelet/apis/cri:go_default_library",
"//pkg/kubelet/apis/kubeletconfig:go_default_library",
"//pkg/kubelet/cadvisor:go_default_library",
"//pkg/kubelet/eviction/api:go_default_library",
"//pkg/kubelet/status:go_default_library",
"//pkg/util/mount:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
@ -41,6 +45,8 @@ go_library(
"//pkg/api:go_default_library",
"//pkg/api/v1/helper/qos:go_default_library",
"//pkg/api/v1/resource:go_default_library",
"//pkg/features:go_default_library",
"//pkg/kubelet/cm/cpumanager:go_default_library",
"//pkg/kubelet/cm/util:go_default_library",
"//pkg/kubelet/events:go_default_library",
"//pkg/kubelet/metrics:go_default_library",
@ -57,6 +63,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
],
"//conditions:default": [],
}),

View File

@ -20,8 +20,10 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
// TODO: Migrate kubelet to either use its own internal objects or client library.
"k8s.io/api/core/v1"
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
"k8s.io/kubernetes/pkg/kubelet/status"
"fmt"
"strconv"
@ -35,7 +37,7 @@ type ContainerManager interface {
// Runs the container manager's housekeeping.
// - Ensures that the Docker daemon is in a container.
// - Creates the system container where all non-containerized processes run.
Start(*v1.Node, ActivePodsFunc) error
Start(*v1.Node, ActivePodsFunc, status.PodStatusProvider, internalapi.RuntimeService) error
// Returns resources allocated to system cgroups in the machine.
// These cgroups include the system and Kubernetes services.
@ -66,6 +68,8 @@ type ContainerManager interface {
// UpdateQOSCgroups performs housekeeping updates to ensure that the top
// level QoS containers have their desired state in a thread-safe way
UpdateQOSCgroups() error
InternalContainerLifecycle() InternalContainerLifecycle
}
type NodeConfig struct {
@ -78,7 +82,8 @@ type NodeConfig struct {
CgroupDriver string
ProtectKernelDefaults bool
NodeAllocatableConfig
ExperimentalQOSReserved map[v1.ResourceName]int64
ExperimentalQOSReserved map[v1.ResourceName]int64
ExperimentalCPUManagerPolicy string
}
type NodeAllocatableConfig struct {

View File

@ -33,15 +33,21 @@ import (
"github.com/opencontainers/runc/libcontainer/cgroups"
"github.com/opencontainers/runc/libcontainer/cgroups/fs"
"github.com/opencontainers/runc/libcontainer/configs"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
kubefeatures "k8s.io/kubernetes/pkg/features"
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
cmutil "k8s.io/kubernetes/pkg/kubelet/cm/util"
"k8s.io/kubernetes/pkg/kubelet/qos"
"k8s.io/kubernetes/pkg/kubelet/status"
utilfile "k8s.io/kubernetes/pkg/util/file"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/util/oom"
@ -117,6 +123,8 @@ type containerManagerImpl struct {
recorder record.EventRecorder
// Interface for QoS cgroup management
qosContainerManager QOSContainerManager
// Interface for CPU affinity management.
cpuManager cpumanager.Manager
}
type features struct {
@ -216,11 +224,11 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
// It is safe to invoke `MachineInfo` on cAdvisor before logically initializing cAdvisor here because
// machine info is computed and cached once as part of cAdvisor object creation.
// But `RootFsInfo` and `ImagesFsInfo` are not available at this moment so they will be called later during manager starts
if info, err := cadvisorInterface.MachineInfo(); err == nil {
capacity = cadvisor.CapacityFromMachineInfo(info)
} else {
machineInfo, err := cadvisorInterface.MachineInfo()
if err != nil {
return nil, err
}
capacity = cadvisor.CapacityFromMachineInfo(machineInfo)
cgroupRoot := nodeConfig.CgroupRoot
cgroupManager := NewCgroupManager(subsystems, nodeConfig.CgroupDriver)
@ -250,7 +258,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
return nil, err
}
return &containerManagerImpl{
cm := &containerManagerImpl{
cadvisorInterface: cadvisorInterface,
mountUtil: mountUtil,
NodeConfig: nodeConfig,
@ -260,7 +268,22 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
cgroupRoot: cgroupRoot,
recorder: recorder,
qosContainerManager: qosContainerManager,
}, nil
}
// Initialize CPU manager
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
cm.cpuManager, err = cpumanager.NewManager(
nodeConfig.ExperimentalCPUManagerPolicy,
machineInfo,
cm.GetNodeAllocatableReservation(),
)
if err != nil {
glog.Errorf("failed to initialize cpu manager: %v", err)
return nil, err
}
}
return cm, nil
}
// NewPodContainerManager is a factory method that returns a PodContainerManager object
@ -279,6 +302,36 @@ func (cm *containerManagerImpl) NewPodContainerManager() PodContainerManager {
}
}
// InternalContainerLifecycle returns a lifecycle handler that forwards
// container start/stop notifications to this manager's CPU manager.
// cm.cpuManager is only assigned in NewContainerManager when the CPUManager
// feature gate is enabled; the handler's methods check the same gate before
// touching it.
func (cm *containerManagerImpl) InternalContainerLifecycle() InternalContainerLifecycle {
	lifecycle := &internalContainerLifecycleImpl{cpuManager: cm.cpuManager}
	return lifecycle
}
// internalContainerLifecycleImpl implements the InternalContainerLifecycle
// interface by translating container lifecycle events into calls on the CPU
// manager.
type internalContainerLifecycleImpl struct {
// cpuManager receives AddContainer / RemoveContainer calls; it is only
// dereferenced when the CPUManager feature gate is enabled.
cpuManager cpumanager.Manager
}
// PreStartContainer registers the container with the CPU manager.
// It is a no-op returning nil when the CPUManager feature gate is disabled;
// otherwise it returns whatever AddContainer reports.
func (i *internalContainerLifecycleImpl) PreStartContainer(pod *v1.Pod, container *v1.Container, containerID string) error {
	if !utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
		// Feature disabled: nothing to register.
		return nil
	}
	return i.cpuManager.AddContainer(pod, container, containerID)
}
// PreStopContainer removes the container from the CPU manager ahead of the
// container being stopped. It is a no-op returning nil when the CPUManager
// feature gate is disabled; otherwise it returns whatever RemoveContainer
// reports.
func (i *internalContainerLifecycleImpl) PreStopContainer(containerID string) error {
	if !utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
		// Feature disabled: nothing to release.
		return nil
	}
	return i.cpuManager.RemoveContainer(containerID)
}
// PostStopContainer removes the container from the CPU manager after the
// container has stopped. It issues the same RemoveContainer call as
// PreStopContainer and is a no-op returning nil when the CPUManager feature
// gate is disabled.
func (i *internalContainerLifecycleImpl) PostStopContainer(containerID string) error {
	if !utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
		// Feature disabled: nothing to release.
		return nil
	}
	return i.cpuManager.RemoveContainer(containerID)
}
// Create a cgroup container manager.
func createManager(containerName string) *fs.Manager {
allowAllDevices := true
@ -485,7 +538,16 @@ func (cm *containerManagerImpl) Status() Status {
return cm.status
}
func (cm *containerManagerImpl) Start(node *v1.Node, activePods ActivePodsFunc) error {
func (cm *containerManagerImpl) Start(node *v1.Node,
activePods ActivePodsFunc,
podStatusProvider status.PodStatusProvider,
runtimeService internalapi.RuntimeService) error {
// Initialize CPU manager
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), podStatusProvider, runtimeService)
}
// cache the node Info including resource capacity and
// allocatable of the node
cm.nodeInfo = node

View File

@ -19,13 +19,16 @@ package cm
import (
"github.com/golang/glog"
"k8s.io/api/core/v1"
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
"k8s.io/kubernetes/pkg/kubelet/status"
)
type containerManagerStub struct{}
var _ ContainerManager = &containerManagerStub{}
func (cm *containerManagerStub) Start(_ *v1.Node, _ ActivePodsFunc) error {
// Start satisfies ContainerManager.Start for the stub implementation.
// All four arguments are ignored; the call only emits a verbosity-2 log line
// and always returns nil.
func (cm *containerManagerStub) Start(_ *v1.Node, _ ActivePodsFunc, _ status.PodStatusProvider, _ internalapi.RuntimeService) error {
glog.V(2).Infof("Starting stub container manager")
return nil
}
@ -66,6 +69,10 @@ func (cm *containerManagerStub) NewPodContainerManager() PodContainerManager {
return &podContainerManagerStub{}
}
// InternalContainerLifecycle returns a no-op lifecycle handler.
//
// The result is handed to the generic runtime manager, which invokes
// PreStartContainer, PreStopContainer and PostStopContainer on it without a
// nil check. Returning a nil interface here would therefore panic the first
// time a container is started or removed, so the stub hands out the package's
// fake handler, whose hooks all succeed without doing anything.
func (cm *containerManagerStub) InternalContainerLifecycle() InternalContainerLifecycle {
	return NewFakeInternalContainerLifecycle()
}
// NewStubContainerManager returns a ContainerManager backed by the stub
// implementation in this file.
func NewStubContainerManager() ContainerManager {
	return new(containerManagerStub)
}

View File

@ -23,7 +23,9 @@ import (
"k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/util/mount"
)
@ -32,7 +34,7 @@ type unsupportedContainerManager struct {
var _ ContainerManager = &unsupportedContainerManager{}
func (unsupportedContainerManager) Start(_ *v1.Node, _ ActivePodsFunc) error {
// Start satisfies ContainerManager.Start for builds without container manager
// support. All arguments are ignored and the call always returns an error
// stating that the container manager is unsupported in this build.
func (unsupportedContainerManager) Start(_ *v1.Node, _ ActivePodsFunc, _ status.PodStatusProvider, _ internalapi.RuntimeService) error {
return fmt.Errorf("Container Manager is unsupported in this build")
}
@ -72,6 +74,10 @@ func (cm *unsupportedContainerManager) NewPodContainerManager() PodContainerMana
return &unsupportedPodContainerManager{}
}
// InternalContainerLifecycle returns a no-op lifecycle handler.
//
// The result is handed to the generic runtime manager, which invokes
// PreStartContainer, PreStopContainer and PostStopContainer on it without a
// nil check. Returning a nil interface here would therefore panic on the
// first container start or removal, so the package's fake handler (whose
// hooks all succeed without doing anything) is returned instead.
func (cm *unsupportedContainerManager) InternalContainerLifecycle() InternalContainerLifecycle {
	return NewFakeInternalContainerLifecycle()
}
// NewContainerManager returns the unsupported container manager. Every
// argument is ignored and the returned error is always nil.
func NewContainerManager(_ mount.Interface, _ cadvisor.Interface, _ NodeConfig, failSwapOn bool, recorder record.EventRecorder) (ContainerManager, error) {
	return new(unsupportedContainerManager), nil
}

View File

@ -33,7 +33,6 @@ go_test(
"//pkg/kubelet/apis/cri/v1alpha1/runtime:go_default_library",
"//pkg/kubelet/cm/cpumanager/state:go_default_library",
"//pkg/kubelet/cm/cpuset:go_default_library",
"//vendor/github.com/google/cadvisor/info/v1:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
@ -59,22 +58,3 @@ filegroup(
tags = ["automanaged"],
visibility = ["//visibility:public"],
)
go_test(
name = "go_default_test",
srcs = [
"cpu_manager_test.go",
"policy_none_test.go",
],
library = ":go_default_library",
deps = [
"//pkg/kubelet/apis/cri/v1alpha1/runtime:go_default_library",
"//pkg/kubelet/cm/cpumanager/state:go_default_library",
"//pkg/kubelet/cm/cpuset:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
],
)

View File

@ -1,25 +1,16 @@
package(default_visibility = ["//visibility:public"])
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
go_library(
name = "go_default_library",
srcs = ["cpuset.go"],
visibility = ["//visibility:public"],
deps = ["//vendor/github.com/golang/glog:go_default_library"],
)
go_test(
name = "go_default_test",
srcs = ["cpuset_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
)
go_library(
name = "go_default_library",
srcs = ["cpuset.go"],
tags = ["automanaged"],
deps = ["//vendor/github.com/golang/glog:go_default_library"],
)
filegroup(
@ -33,4 +24,5 @@ filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,39 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cm
import (
"k8s.io/api/core/v1"
)
// fakeInternalContainerLifecycle is a stateless lifecycle handler whose hooks
// all return nil.
type fakeInternalContainerLifecycle struct{}

// NewFakeInternalContainerLifecycle returns a new, non-nil fake lifecycle
// handler.
func NewFakeInternalContainerLifecycle() *fakeInternalContainerLifecycle {
	return new(fakeInternalContainerLifecycle)
}
// PreStartContainer ignores its arguments and always returns nil.
func (f *fakeInternalContainerLifecycle) PreStartContainer(pod *v1.Pod, container *v1.Container, containerID string) error {
return nil
}
// PreStopContainer ignores its argument and always returns nil.
func (f *fakeInternalContainerLifecycle) PreStopContainer(containerID string) error {
return nil
}
// PostStopContainer ignores its argument and always returns nil.
func (f *fakeInternalContainerLifecycle) PostStopContainer(containerID string) error {
return nil
}

View File

@ -0,0 +1,27 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cm
import (
"k8s.io/api/core/v1"
)
// InternalContainerLifecycle is the set of hooks the generic runtime manager
// calls around container creation, termination and removal so that
// kubelet-internal resource management can react to those events.
type InternalContainerLifecycle interface {
// PreStartContainer is called from the runtime manager's startContainer
// after the container has been created and has an ID. A non-nil error is
// recorded as a FailedToStartContainer event and aborts startContainer.
PreStartContainer(pod *v1.Pod, container *v1.Container, containerID string) error
// PreStopContainer is called from killContainer before the container's own
// pre-stop lifecycle hook is considered. A non-nil error aborts killContainer.
PreStopContainer(containerID string) error
// PostStopContainer is called from removeContainer before the container
// log is removed. A non-nil error aborts removeContainer.
PostStopContainer(containerID string) error
}

View File

@ -622,6 +622,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
if err != nil {
return nil, err
}
klet.runtimeService = runtimeService
runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
klet.livenessManager,
@ -639,6 +640,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
kubeCfg.CPUCFSQuota,
runtimeService,
imageService,
kubeDeps.ContainerManager.InternalContainerLifecycle(),
)
if err != nil {
return nil, err
@ -979,6 +981,11 @@ type Kubelet struct {
// Container runtime.
containerRuntime kubecontainer.Runtime
// Container runtime service (needed by container manager Start()).
// TODO(CD): try to make this available without holding a reference in this
// struct. For example, by adding a getter to generic runtime.
runtimeService internalapi.RuntimeService
// reasonCache caches the failure reason of the last creation of all containers, which is
// used for generating ContainerStatus.
reasonCache *ReasonCache
@ -1240,7 +1247,7 @@ func (kl *Kubelet) initializeModules() error {
return fmt.Errorf("Kubelet failed to get node info: %v", err)
}
if err := kl.containerManager.Start(node, kl.GetActivePods); err != nil {
if err := kl.containerManager.Start(node, kl.GetActivePods, kl.statusManager, kl.runtimeService); err != nil {
return fmt.Errorf("Failed to start ContainerManager %v", err)
}

View File

@ -29,6 +29,7 @@ go_library(
"//pkg/credentialprovider:go_default_library",
"//pkg/kubelet/apis/cri:go_default_library",
"//pkg/kubelet/apis/cri/v1alpha1/runtime:go_default_library",
"//pkg/kubelet/cm:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/events:go_default_library",
"//pkg/kubelet/images:go_default_library",

View File

@ -27,6 +27,7 @@ import (
"k8s.io/client-go/util/flowcontrol"
"k8s.io/kubernetes/pkg/credentialprovider"
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
"k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/images"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
@ -69,6 +70,7 @@ func NewFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageS
runtimeService: runtimeService,
imageService: imageService,
keyring: keyring,
internalLifecycle: cm.NewFakeInternalContainerLifecycle(),
}
typedVersion, err := runtimeService.Version(kubeRuntimeAPIVersion)

View File

@ -116,6 +116,11 @@ func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandb
m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err))
return grpc.ErrorDesc(err), ErrCreateContainer
}
err = m.internalLifecycle.PreStartContainer(pod, container, containerID)
if err != nil {
m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedToStartContainer, "Internal PreStartContainer hook failed: %v", err)
return "Internal PreStartContainer hook failed", err
}
m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.CreatedContainer, "Created container")
if ref != nil {
@ -574,6 +579,11 @@ func (m *kubeGenericRuntimeManager) killContainer(pod *v1.Pod, containerID kubec
glog.V(2).Infof("Killing container %q with %d second grace period", containerID.String(), gracePeriod)
// Run internal pre-stop lifecycle hook
if err := m.internalLifecycle.PreStopContainer(containerID.ID); err != nil {
return err
}
// Run the pre-stop lifecycle hooks if applicable and if there is enough time to run it
if containerSpec.Lifecycle != nil && containerSpec.Lifecycle.PreStop != nil && gracePeriod > 0 {
gracePeriod = gracePeriod - m.executePreStopHook(pod, containerID, containerSpec, gracePeriod)
@ -805,6 +815,11 @@ func (m *kubeGenericRuntimeManager) RunInContainer(id kubecontainer.ContainerID,
// it will not write container logs anymore in that state.
func (m *kubeGenericRuntimeManager) removeContainer(containerID string) error {
glog.V(4).Infof("Removing container %q", containerID)
// Call internal container post-stop lifecycle hook.
if err := m.internalLifecycle.PostStopContainer(containerID); err != nil {
return err
}
// Remove the container log.
// TODO: Separate log and container lifecycle management.
if err := m.removeContainerLog(containerID); err != nil {

View File

@ -36,6 +36,7 @@ import (
"k8s.io/kubernetes/pkg/credentialprovider"
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
"k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/images"
@ -108,6 +109,9 @@ type kubeGenericRuntimeManager struct {
// The directory path for seccomp profiles.
seccompProfileRoot string
// Internal lifecycle event handlers for container resource management.
internalLifecycle cm.InternalContainerLifecycle
}
type KubeGenericRuntime interface {
@ -134,6 +138,7 @@ func NewKubeGenericRuntimeManager(
cpuCFSQuota bool,
runtimeService internalapi.RuntimeService,
imageService internalapi.ImageManagerService,
internalLifecycle cm.InternalContainerLifecycle,
) (KubeGenericRuntime, error) {
kubeRuntimeManager := &kubeGenericRuntimeManager{
recorder: recorder,
@ -147,6 +152,7 @@ func NewKubeGenericRuntimeManager(
runtimeService: newInstrumentedRuntimeService(runtimeService),
imageService: newInstrumentedImageManagerService(imageService),
keyring: credentialprovider.NewDockerKeyring(),
internalLifecycle: internalLifecycle,
}
typedVersion, err := kubeRuntimeManager.runtimeService.Version(kubeRuntimeAPIVersion)