diff --git a/cmd/kubelet/app/options/options.go b/cmd/kubelet/app/options/options.go index b9fa9286908..920b8573453 100644 --- a/cmd/kubelet/app/options/options.go +++ b/cmd/kubelet/app/options/options.go @@ -327,6 +327,8 @@ func AddKubeletConfigFlags(fs *pflag.FlagSet, c *kubeletconfig.KubeletConfigurat fs.BoolVar(&c.CgroupsPerQOS, "cgroups-per-qos", c.CgroupsPerQOS, "Enable creation of QoS cgroup hierarchy, if true top level QoS and pod cgroups are created.") fs.StringVar(&c.CgroupDriver, "cgroup-driver", c.CgroupDriver, "Driver that the kubelet uses to manipulate cgroups on the host. Possible values: 'cgroupfs', 'systemd'") fs.StringVar(&c.CgroupRoot, "cgroup-root", c.CgroupRoot, "Optional root cgroup to use for pods. This is handled by the container runtime on a best effort basis. Default: '', which means use the container runtime default.") + fs.StringVar(&c.CPUManagerPolicy, "cpu-manager-policy", c.CPUManagerPolicy, " CPU Manager policy to use. Possible values: 'none'. Default: 'none'") + fs.DurationVar(&c.CPUManagerReconcilePeriod.Duration, "cpu-manager-reconcile-period", c.CPUManagerReconcilePeriod.Duration, " CPU Manager reconciliation period. Examples: '10s', or '1m'. If not supplied, defaults to 'NodeStatusUpdateFrequency'") fs.StringVar(&c.ContainerRuntime, "container-runtime", c.ContainerRuntime, "The container runtime to use. Possible values: 'docker', 'rkt'.") fs.DurationVar(&c.RuntimeRequestTimeout.Duration, "runtime-request-timeout", c.RuntimeRequestTimeout.Duration, "Timeout of all runtime requests except long running request - pull, logs, exec and attach. 
When timeout exceeded, kubelet will cancel the request, throw out an error and retry later.") fs.StringVar(&c.LockFilePath, "lock-file", c.LockFilePath, " The path to file for kubelet to use as a lock file.") diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 2eb641d9201..2910ae9a2c1 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -450,7 +450,9 @@ func run(s *options.KubeletServer, kubeDeps *kubelet.Dependencies) (err error) { SystemReserved: systemReserved, HardEvictionThresholds: hardEvictionThresholds, }, - ExperimentalQOSReserved: *experimentalQOSReserved, + ExperimentalQOSReserved: *experimentalQOSReserved, + ExperimentalCPUManagerPolicy: s.CPUManagerPolicy, + ExperimentalCPUManagerReconcilePeriod: s.CPUManagerReconcilePeriod.Duration, }, s.FailSwapOn, devicePluginEnabled, diff --git a/pkg/features/kube_features.go b/pkg/features/kube_features.go index 29b5cc2b46b..6c6ee2895ee 100644 --- a/pkg/features/kube_features.go +++ b/pkg/features/kube_features.go @@ -155,6 +155,12 @@ const ( // // Enable mount propagation of volumes. MountPropagation utilfeature.Feature = "MountPropagation" + + // owner: @ConnorDoyle + // alpha: v1.8 + // + // Alternative container-level CPU affinity policies. 
+ CPUManager utilfeature.Feature = "CPUManager" ) func init() { @@ -185,6 +191,7 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS TaintNodesByCondition: {Default: false, PreRelease: utilfeature.Alpha}, MountPropagation: {Default: false, PreRelease: utilfeature.Alpha}, ExpandPersistentVolumes: {Default: false, PreRelease: utilfeature.Alpha}, + CPUManager: {Default: false, PreRelease: utilfeature.Alpha}, // inherited features from generic apiserver, relisted here to get a conflict if it is changed // unintentionally on either side: diff --git a/pkg/kubelet/apis/kubeletconfig/types.go b/pkg/kubelet/apis/kubeletconfig/types.go index 2f44149bcf7..338791c2fb7 100644 --- a/pkg/kubelet/apis/kubeletconfig/types.go +++ b/pkg/kubelet/apis/kubeletconfig/types.go @@ -214,6 +214,10 @@ type KubeletConfiguration struct { RemoteRuntimeEndpoint string // remoteImageEndpoint is the endpoint of remote image service RemoteImageEndpoint string + // CPUManagerPolicy is the name of the policy to use. + CPUManagerPolicy string + // CPU Manager reconciliation period. + CPUManagerReconcilePeriod metav1.Duration // runtimeRequestTimeout is the timeout for all runtime requests except long running // requests - pull, logs, exec and attach. 
// +optional diff --git a/pkg/kubelet/apis/kubeletconfig/v1alpha1/defaults.go b/pkg/kubelet/apis/kubeletconfig/v1alpha1/defaults.go index 83c18224aad..26ead1f7ca4 100644 --- a/pkg/kubelet/apis/kubeletconfig/v1alpha1/defaults.go +++ b/pkg/kubelet/apis/kubeletconfig/v1alpha1/defaults.go @@ -171,6 +171,12 @@ func SetDefaults_KubeletConfiguration(obj *KubeletConfiguration) { if obj.NodeStatusUpdateFrequency == zeroDuration { obj.NodeStatusUpdateFrequency = metav1.Duration{Duration: 10 * time.Second} } + if obj.CPUManagerPolicy == "" { + obj.CPUManagerPolicy = "none" + } + if obj.CPUManagerReconcilePeriod == zeroDuration { + obj.CPUManagerReconcilePeriod = obj.NodeStatusUpdateFrequency + } if obj.OOMScoreAdj == nil { temp := int32(qos.KubeletOOMScoreAdj) obj.OOMScoreAdj = &temp diff --git a/pkg/kubelet/apis/kubeletconfig/v1alpha1/types.go b/pkg/kubelet/apis/kubeletconfig/v1alpha1/types.go index 679ffa21e21..0e6172e6932 100644 --- a/pkg/kubelet/apis/kubeletconfig/v1alpha1/types.go +++ b/pkg/kubelet/apis/kubeletconfig/v1alpha1/types.go @@ -206,6 +206,10 @@ type KubeletConfiguration struct { RemoteRuntimeEndpoint string `json:"remoteRuntimeEndpoint"` // remoteImageEndpoint is the endpoint of remote image service RemoteImageEndpoint string `json:"remoteImageEndpoint"` + // CPUManagerPolicy is the name of the policy to use. + CPUManagerPolicy string `json:"cpuManagerPolicy"` + // CPU Manager reconciliation period. + CPUManagerReconcilePeriod metav1.Duration `json:"cpuManagerReconcilePeriod"` // runtimeRequestTimeout is the timeout for all runtime requests except long running // requests - pull, logs, exec and attach. 
RuntimeRequestTimeout metav1.Duration `json:"runtimeRequestTimeout"` diff --git a/pkg/kubelet/apis/kubeletconfig/v1alpha1/zz_generated.conversion.go b/pkg/kubelet/apis/kubeletconfig/v1alpha1/zz_generated.conversion.go index 36bd5b92318..87cf779c0db 100644 --- a/pkg/kubelet/apis/kubeletconfig/v1alpha1/zz_generated.conversion.go +++ b/pkg/kubelet/apis/kubeletconfig/v1alpha1/zz_generated.conversion.go @@ -227,6 +227,8 @@ func autoConvert_v1alpha1_KubeletConfiguration_To_kubeletconfig_KubeletConfigura out.ContainerRuntime = in.ContainerRuntime out.RemoteRuntimeEndpoint = in.RemoteRuntimeEndpoint out.RemoteImageEndpoint = in.RemoteImageEndpoint + out.CPUManagerPolicy = in.CPUManagerPolicy + out.CPUManagerReconcilePeriod = in.CPUManagerReconcilePeriod out.RuntimeRequestTimeout = in.RuntimeRequestTimeout out.ExperimentalMounterPath = in.ExperimentalMounterPath if err := v1.Convert_Pointer_string_To_string(&in.LockFilePath, &out.LockFilePath, s); err != nil { @@ -390,6 +392,8 @@ func autoConvert_kubeletconfig_KubeletConfiguration_To_v1alpha1_KubeletConfigura out.ContainerRuntime = in.ContainerRuntime out.RemoteRuntimeEndpoint = in.RemoteRuntimeEndpoint out.RemoteImageEndpoint = in.RemoteImageEndpoint + out.CPUManagerPolicy = in.CPUManagerPolicy + out.CPUManagerReconcilePeriod = in.CPUManagerReconcilePeriod out.RuntimeRequestTimeout = in.RuntimeRequestTimeout out.ExperimentalMounterPath = in.ExperimentalMounterPath if err := v1.Convert_string_To_Pointer_string(&in.LockFilePath, &out.LockFilePath, s); err != nil { diff --git a/pkg/kubelet/apis/kubeletconfig/v1alpha1/zz_generated.deepcopy.go b/pkg/kubelet/apis/kubeletconfig/v1alpha1/zz_generated.deepcopy.go index a6ba484b4f2..eff621971c0 100644 --- a/pkg/kubelet/apis/kubeletconfig/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/kubelet/apis/kubeletconfig/v1alpha1/zz_generated.deepcopy.go @@ -299,6 +299,7 @@ func (in *KubeletConfiguration) DeepCopyInto(out *KubeletConfiguration) { **out = **in } } + out.CPUManagerReconcilePeriod = 
in.CPUManagerReconcilePeriod out.RuntimeRequestTimeout = in.RuntimeRequestTimeout if in.LockFilePath != nil { in, out := &in.LockFilePath, &out.LockFilePath diff --git a/pkg/kubelet/apis/kubeletconfig/zz_generated.deepcopy.go b/pkg/kubelet/apis/kubeletconfig/zz_generated.deepcopy.go index 3f9c00c2dad..3f37b5d303b 100644 --- a/pkg/kubelet/apis/kubeletconfig/zz_generated.deepcopy.go +++ b/pkg/kubelet/apis/kubeletconfig/zz_generated.deepcopy.go @@ -164,6 +164,7 @@ func (in *KubeletConfiguration) DeepCopyInto(out *KubeletConfiguration) { out.NodeStatusUpdateFrequency = in.NodeStatusUpdateFrequency out.ImageMinimumGCAge = in.ImageMinimumGCAge out.VolumeStatsAggPeriod = in.VolumeStatsAggPeriod + out.CPUManagerReconcilePeriod = in.CPUManagerReconcilePeriod out.RuntimeRequestTimeout = in.RuntimeRequestTimeout if in.RegisterWithTaints != nil { in, out := &in.RegisterWithTaints, &out.RegisterWithTaints diff --git a/pkg/kubelet/cm/BUILD b/pkg/kubelet/cm/BUILD index 6578b3541cb..46421b9efb4 100644 --- a/pkg/kubelet/cm/BUILD +++ b/pkg/kubelet/cm/BUILD @@ -9,7 +9,9 @@ go_library( "container_manager_unsupported.go", "device_plugin_handler.go", "device_plugin_handler_stub.go", + "fake_internal_container_lifecycle.go", "helpers_unsupported.go", + "internal_container_lifecycle.go", "pod_container_manager_stub.go", "pod_container_manager_unsupported.go", "types.go", @@ -29,12 +31,14 @@ go_library( }), visibility = ["//visibility:public"], deps = [ + "//pkg/kubelet/apis/cri:go_default_library", "//pkg/kubelet/apis/deviceplugin/v1alpha1:go_default_library", "//pkg/kubelet/apis/kubeletconfig:go_default_library", "//pkg/kubelet/cadvisor:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/deviceplugin:go_default_library", "//pkg/kubelet/eviction/api:go_default_library", + "//pkg/kubelet/status:go_default_library", "//pkg/util/mount:go_default_library", "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", @@ 
-47,6 +51,8 @@ go_library( "//pkg/api:go_default_library", "//pkg/api/v1/helper/qos:go_default_library", "//pkg/api/v1/resource:go_default_library", + "//pkg/features:go_default_library", + "//pkg/kubelet/cm/cpumanager:go_default_library", "//pkg/kubelet/cm/util:go_default_library", "//pkg/kubelet/events:go_default_library", "//pkg/kubelet/metrics:go_default_library", @@ -62,6 +68,7 @@ go_library( "//vendor/github.com/opencontainers/runc/libcontainer/configs:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library", ], "//conditions:default": [], }), diff --git a/pkg/kubelet/cm/container_manager.go b/pkg/kubelet/cm/container_manager.go index ed5312e2e73..36039bdbc0d 100644 --- a/pkg/kubelet/cm/container_manager.go +++ b/pkg/kubelet/cm/container_manager.go @@ -17,12 +17,16 @@ limitations under the License. package cm import ( + "time" + "k8s.io/apimachinery/pkg/util/sets" // TODO: Migrate kubelet to either use its own internal objects or client library. "k8s.io/api/core/v1" + internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri" "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api" + "k8s.io/kubernetes/pkg/kubelet/status" "fmt" "strconv" @@ -36,7 +40,7 @@ type ContainerManager interface { // Runs the container manager's housekeeping. // - Ensures that the Docker daemon is in a container. // - Creates the system container where all non-containerized processes run. - Start(*v1.Node, ActivePodsFunc) error + Start(*v1.Node, ActivePodsFunc, status.PodStatusProvider, internalapi.RuntimeService) error // Returns resources allocated to system cgroups in the machine. // These cgroups include the system and Kubernetes services. 
@@ -71,6 +75,8 @@ type ContainerManager interface { // Returns RunContainerOptions with devices, mounts, and env fields populated for // extended resources required by container. GetResources(pod *v1.Pod, container *v1.Container, activePods []*v1.Pod) (*kubecontainer.RunContainerOptions, error) + + InternalContainerLifecycle() InternalContainerLifecycle } type NodeConfig struct { @@ -83,7 +89,9 @@ type NodeConfig struct { CgroupDriver string ProtectKernelDefaults bool NodeAllocatableConfig - ExperimentalQOSReserved map[v1.ResourceName]int64 + ExperimentalQOSReserved map[v1.ResourceName]int64 + ExperimentalCPUManagerPolicy string + ExperimentalCPUManagerReconcilePeriod time.Duration } type NodeAllocatableConfig struct { diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go index f3ff471bdd1..f12a27d0897 100644 --- a/pkg/kubelet/cm/container_manager_linux.go +++ b/pkg/kubelet/cm/container_manager_linux.go @@ -33,16 +33,22 @@ import ( "github.com/opencontainers/runc/libcontainer/cgroups" "github.com/opencontainers/runc/libcontainer/cgroups/fs" "github.com/opencontainers/runc/libcontainer/configs" + "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/record" + kubefeatures "k8s.io/kubernetes/pkg/features" + internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri" "k8s.io/kubernetes/pkg/kubelet/cadvisor" + "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager" cmutil "k8s.io/kubernetes/pkg/kubelet/cm/util" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/qos" + "k8s.io/kubernetes/pkg/kubelet/status" utilfile "k8s.io/kubernetes/pkg/util/file" "k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/util/oom" @@ -120,6 +126,8 @@ type containerManagerImpl struct { qosContainerManager QOSContainerManager 
// Interface for exporting and allocating devices reported by device plugins. devicePluginHandler DevicePluginHandler + // Interface for CPU affinity management. + cpuManager cpumanager.Manager } type features struct { @@ -219,11 +227,11 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I // It is safe to invoke `MachineInfo` on cAdvisor before logically initializing cAdvisor here because // machine info is computed and cached once as part of cAdvisor object creation. // But `RootFsInfo` and `ImagesFsInfo` are not available at this moment so they will be called later during manager starts - if info, err := cadvisorInterface.MachineInfo(); err == nil { - capacity = cadvisor.CapacityFromMachineInfo(info) - } else { + machineInfo, err := cadvisorInterface.MachineInfo() + if err != nil { return nil, err } + capacity = cadvisor.CapacityFromMachineInfo(machineInfo) cgroupRoot := nodeConfig.CgroupRoot cgroupManager := NewCgroupManager(subsystems, nodeConfig.CgroupDriver) @@ -287,6 +295,20 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I return nil, err } + // Initialize CPU manager + if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) { + cm.cpuManager, err = cpumanager.NewManager( + nodeConfig.ExperimentalCPUManagerPolicy, + nodeConfig.ExperimentalCPUManagerReconcilePeriod, + machineInfo, + cm.GetNodeAllocatableReservation(), + ) + if err != nil { + glog.Errorf("failed to initialize cpu manager: %v", err) + return nil, err + } + } + return cm, nil } @@ -306,6 +328,36 @@ func (cm *containerManagerImpl) NewPodContainerManager() PodContainerManager { } } +func (cm *containerManagerImpl) InternalContainerLifecycle() InternalContainerLifecycle { + return &internalContainerLifecycleImpl{cm.cpuManager} +} + +// Implements InternalContainerLifecycle interface. 
+type internalContainerLifecycleImpl struct { + cpuManager cpumanager.Manager +} + +func (i *internalContainerLifecycleImpl) PreStartContainer(pod *v1.Pod, container *v1.Container, containerID string) error { + if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) { + return i.cpuManager.AddContainer(pod, container, containerID) + } + return nil +} + +func (i *internalContainerLifecycleImpl) PreStopContainer(containerID string) error { + if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) { + return i.cpuManager.RemoveContainer(containerID) + } + return nil +} + +func (i *internalContainerLifecycleImpl) PostStopContainer(containerID string) error { + if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) { + return i.cpuManager.RemoveContainer(containerID) + } + return nil +} + // Create a cgroup container manager. func createManager(containerName string) *fs.Manager { allowAllDevices := true @@ -512,7 +564,16 @@ func (cm *containerManagerImpl) Status() Status { return cm.status } -func (cm *containerManagerImpl) Start(node *v1.Node, activePods ActivePodsFunc) error { +func (cm *containerManagerImpl) Start(node *v1.Node, + activePods ActivePodsFunc, + podStatusProvider status.PodStatusProvider, + runtimeService internalapi.RuntimeService) error { + + // Initialize CPU manager + if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) { + cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), podStatusProvider, runtimeService) + } + // cache the node Info including resource capacity and // allocatable of the node cm.nodeInfo = node diff --git a/pkg/kubelet/cm/container_manager_stub.go b/pkg/kubelet/cm/container_manager_stub.go index 203b682a1bc..d551ef691fe 100644 --- a/pkg/kubelet/cm/container_manager_stub.go +++ b/pkg/kubelet/cm/container_manager_stub.go @@ -20,14 +20,16 @@ import ( "github.com/golang/glog" "k8s.io/api/core/v1" + internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri" kubecontainer 
"k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/kubelet/status" ) type containerManagerStub struct{} var _ ContainerManager = &containerManagerStub{} -func (cm *containerManagerStub) Start(_ *v1.Node, _ ActivePodsFunc) error { +func (cm *containerManagerStub) Start(_ *v1.Node, _ ActivePodsFunc, _ status.PodStatusProvider, _ internalapi.RuntimeService) error { glog.V(2).Infof("Starting stub container manager") return nil } @@ -72,6 +74,10 @@ func (cm *containerManagerStub) GetResources(pod *v1.Pod, container *v1.Containe return &kubecontainer.RunContainerOptions{}, nil } +func (cm *containerManagerStub) InternalContainerLifecycle() InternalContainerLifecycle { + return nil +} + func NewStubContainerManager() ContainerManager { return &containerManagerStub{} } diff --git a/pkg/kubelet/cm/container_manager_unsupported.go b/pkg/kubelet/cm/container_manager_unsupported.go index b97a6473a2b..3410dd4bbb3 100644 --- a/pkg/kubelet/cm/container_manager_unsupported.go +++ b/pkg/kubelet/cm/container_manager_unsupported.go @@ -23,8 +23,10 @@ import ( "k8s.io/api/core/v1" "k8s.io/client-go/tools/record" + internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri" "k8s.io/kubernetes/pkg/kubelet/cadvisor" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/kubelet/status" "k8s.io/kubernetes/pkg/util/mount" ) @@ -33,7 +35,7 @@ type unsupportedContainerManager struct { var _ ContainerManager = &unsupportedContainerManager{} -func (unsupportedContainerManager) Start(_ *v1.Node, _ ActivePodsFunc) error { +func (unsupportedContainerManager) Start(_ *v1.Node, _ ActivePodsFunc, _ status.PodStatusProvider, _ internalapi.RuntimeService) error { return fmt.Errorf("Container Manager is unsupported in this build") } @@ -77,6 +79,10 @@ func (cm *unsupportedContainerManager) GetResources(pod *v1.Pod, container *v1.C return &kubecontainer.RunContainerOptions{}, nil } +func (cm *unsupportedContainerManager) InternalContainerLifecycle() 
InternalContainerLifecycle { + return nil +} + func NewContainerManager(_ mount.Interface, _ cadvisor.Interface, _ NodeConfig, failSwapOn bool, devicePluginEnabled bool, recorder record.EventRecorder) (ContainerManager, error) { return &unsupportedContainerManager{}, nil } diff --git a/pkg/kubelet/cm/cpumanager/BUILD b/pkg/kubelet/cm/cpumanager/BUILD index c496c8837d3..8065b5231de 100644 --- a/pkg/kubelet/cm/cpumanager/BUILD +++ b/pkg/kubelet/cm/cpumanager/BUILD @@ -1,17 +1,43 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", srcs = [ "cpu_manager.go", + "fake_cpu_manager.go", "policy.go", + "policy_none.go", ], visibility = ["//visibility:public"], deps = [ "//pkg/kubelet/apis/cri/v1alpha1/runtime:go_default_library", "//pkg/kubelet/cm/cpumanager/state:go_default_library", + "//pkg/kubelet/cm/cpuset:go_default_library", + "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/status:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/github.com/google/cadvisor/info/v1:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = [ + "cpu_manager_test.go", + "policy_none_test.go", + ], + library = ":go_default_library", + deps = [ + "//pkg/kubelet/apis/cri/v1alpha1/runtime:go_default_library", + "//pkg/kubelet/cm/cpumanager/state:go_default_library", + "//pkg/kubelet/cm/cpuset:go_default_library", + "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", ], ) diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go 
b/pkg/kubelet/cm/cpumanager/cpu_manager.go index 5da92348ffc..0ee042cbe4a 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go @@ -17,10 +17,19 @@ limitations under the License. package cpumanager import ( + "fmt" + "sync" + "time" + + "github.com/golang/glog" + cadvisorapi "github.com/google/cadvisor/info/v1" "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/wait" runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" + "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/status" ) @@ -51,3 +60,187 @@ type Manager interface { // State returns a read-only interface to the internal CPU manager state. State() state.Reader } + +type manager struct { + sync.Mutex + policy Policy + + // reconcilePeriod is the duration between calls to reconcileState. + reconcilePeriod time.Duration + + // state allows pluggable CPU assignment policies while sharing a common + // representation of state for the system to inspect and reconcile. + state state.State + + // containerRuntime is the container runtime service interface needed + // to make UpdateContainerResources() calls against the containers. + containerRuntime runtimeService + + // activePods is a method for listing active pods on the node + // so all the containers can be updated in the reconciliation loop. 
+ activePods ActivePodsFunc + + // podStatusProvider provides a method for obtaining pod statuses + // and the containerID of their containers + podStatusProvider status.PodStatusProvider + + machineInfo *cadvisorapi.MachineInfo + + nodeAllocatableReservation v1.ResourceList +} + +var _ Manager = &manager{} + +// NewManager creates a new CPU manager for the named policy; an unrecognized policy name is logged and replaced by the none policy. +func NewManager( + cpuPolicyName string, + reconcilePeriod time.Duration, + machineInfo *cadvisorapi.MachineInfo, + nodeAllocatableReservation v1.ResourceList, +) (Manager, error) { + var policy Policy + + switch policyName(cpuPolicyName) { + + case PolicyNone: + policy = NewNonePolicy() + + default: + glog.Warningf("[cpumanager] Unknown policy \"%s\", falling back to default policy \"%s\"", cpuPolicyName, PolicyNone) + policy = NewNonePolicy() + } + + manager := &manager{ + policy: policy, + reconcilePeriod: reconcilePeriod, + state: state.NewMemoryState(), + machineInfo: machineInfo, + nodeAllocatableReservation: nodeAllocatableReservation, + } + return manager, nil +} + +func (m *manager) Start(activePods ActivePodsFunc, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService) { + glog.Infof("[cpumanager] starting with %s policy", m.policy.Name()) + glog.Infof("[cpumanager] reconciling every %v", m.reconcilePeriod) + + m.activePods = activePods + m.podStatusProvider = podStatusProvider + m.containerRuntime = containerRuntime + + m.policy.Start(m.state) + if m.policy.Name() == string(PolicyNone) { + return + } + go wait.Until(func() { m.reconcileState() }, m.reconcilePeriod, wait.NeverStop) +} + +func (m *manager) AddContainer(p *v1.Pod, c *v1.Container, containerID string) error { + m.Lock() + err := m.policy.AddContainer(m.state, p, c, containerID) + if err != nil { + glog.Errorf("[cpumanager] AddContainer error: %v", err) + m.Unlock() + return err + } + cpus := m.state.GetCPUSetOrDefault(containerID) + m.Unlock() + + err = m.updateContainerCPUSet(containerID, cpus) + if 
err != nil { + glog.Errorf("[cpumanager] AddContainer error: %v", err) + return err + } + return nil +} + +func (m *manager) RemoveContainer(containerID string) error { + m.Lock() + defer m.Unlock() + + err := m.policy.RemoveContainer(m.state, containerID) + if err != nil { + glog.Errorf("[cpumanager] RemoveContainer error: %v", err) + return err + } + return nil +} + +func (m *manager) State() state.Reader { + return m.state +} + +type reconciledContainer struct { + podName string + containerName string + containerID string +} + +func (m *manager) reconcileState() (success []reconciledContainer, failure []reconciledContainer) { + success = []reconciledContainer{} + failure = []reconciledContainer{} + + for _, pod := range m.activePods() { + allContainers := make([]v1.Container, 0, len(pod.Spec.InitContainers)+len(pod.Spec.Containers)) + allContainers = append(append(allContainers, pod.Spec.InitContainers...), pod.Spec.Containers...) + for _, container := range allContainers { + podStatus, ok := m.podStatusProvider.GetPodStatus(pod.UID) + if !ok { + glog.Warningf("[cpumanager] reconcileState: skipping pod; status not found (pod: %s, container: %s)", pod.Name, container.Name) + failure = append(failure, reconciledContainer{pod.Name, container.Name, ""}) + break + } + + containerID, err := findContainerIDByName(&podStatus, container.Name) + if err != nil { + glog.Warningf("[cpumanager] reconcileState: skipping container; ID not found in status (pod: %s, container: %s, error: %v)", pod.Name, container.Name, err) + failure = append(failure, reconciledContainer{pod.Name, container.Name, ""}) + continue + } + + cset := m.state.GetCPUSetOrDefault(containerID) + if cset.IsEmpty() { + // NOTE: This should not happen outside of tests. 
+ glog.Infof("[cpumanager] reconcileState: skipping container; assigned cpuset is empty (pod: %s, container: %s)", pod.Name, container.Name) + failure = append(failure, reconciledContainer{pod.Name, container.Name, containerID}) + continue + } + + glog.Infof("[cpumanager] reconcileState: updating container (pod: %s, container: %s, container id: %s, cpuset: \"%v\")", pod.Name, container.Name, containerID, cset) + err = m.updateContainerCPUSet(containerID, cset) + if err != nil { + glog.Errorf("[cpumanager] reconcileState: failed to update container (pod: %s, container: %s, container id: %s, cpuset: \"%v\", error: %v)", pod.Name, container.Name, containerID, cset, err) + failure = append(failure, reconciledContainer{pod.Name, container.Name, containerID}) + continue + } + success = append(success, reconciledContainer{pod.Name, container.Name, containerID}) + } + } + return success, failure +} + +func findContainerIDByName(status *v1.PodStatus, name string) (string, error) { + for _, container := range status.ContainerStatuses { + if container.Name == name && container.ContainerID != "" { + cid := &kubecontainer.ContainerID{} + err := cid.ParseString(container.ContainerID) + if err != nil { + return "", err + } + return cid.ID, nil + } + } + return "", fmt.Errorf("unable to find ID for container with name %v in pod status (it may not be running)", name) +} + +func (m *manager) updateContainerCPUSet(containerID string, cpus cpuset.CPUSet) error { + // TODO: Consider adding a `ResourceConfigForContainer` helper in + // helpers_linux.go similar to what exists for pods. + // It would be better to pass the full container resources here instead of + // this patch-like partial resources. 
+ return m.containerRuntime.UpdateContainerResources( + containerID, + &runtimeapi.LinuxContainerResources{ + CpusetCpus: cpus.String(), + }) +} diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go new file mode 100644 index 00000000000..e432de0360b --- /dev/null +++ b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go @@ -0,0 +1,452 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cpumanager + +import ( + "fmt" + "reflect" + "testing" + + "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" + "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" + "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" +) + +type mockState struct { + assignments map[string]cpuset.CPUSet + defaultCPUSet cpuset.CPUSet +} + +func (s *mockState) GetCPUSet(containerID string) (cpuset.CPUSet, bool) { + res, ok := s.assignments[containerID] + return res.Clone(), ok +} + +func (s *mockState) GetDefaultCPUSet() cpuset.CPUSet { + return s.defaultCPUSet.Clone() +} + +func (s *mockState) GetCPUSetOrDefault(containerID string) cpuset.CPUSet { + if res, ok := s.GetCPUSet(containerID); ok { + return res + } + return s.GetDefaultCPUSet() +} + +func (s *mockState) SetCPUSet(containerID string, cset cpuset.CPUSet) { + 
s.assignments[containerID] = cset +} + +func (s *mockState) SetDefaultCPUSet(cset cpuset.CPUSet) { + s.defaultCPUSet = cset +} + +func (s *mockState) Delete(containerID string) { + delete(s.assignments, containerID) +} + +type mockPolicy struct { + err error +} + +func (p *mockPolicy) Name() string { + return "mock" +} + +func (p *mockPolicy) Start(s state.State) { +} + +func (p *mockPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container, containerID string) error { + return p.err +} + +func (p *mockPolicy) RemoveContainer(s state.State, containerID string) error { + return p.err +} + +type mockRuntimeService struct { + err error +} + +func (rt mockRuntimeService) UpdateContainerResources(id string, resources *runtimeapi.LinuxContainerResources) error { + return rt.err +} + +type mockPodStatusProvider struct { + podStatus v1.PodStatus + found bool +} + +func (psp mockPodStatusProvider) GetPodStatus(uid types.UID) (v1.PodStatus, bool) { + return psp.podStatus, psp.found +} + +type mockPodKiller struct { + killedPods []*v1.Pod +} + +func (f *mockPodKiller) killPodNow(pod *v1.Pod, status v1.PodStatus, gracePeriodOverride *int64) error { + f.killedPods = append(f.killedPods, pod) + return nil +} + +type mockPodProvider struct { + pods []*v1.Pod +} + +func (f *mockPodProvider) getPods() []*v1.Pod { + return f.pods +} + +type mockRecorder struct{} + +func (r *mockRecorder) Eventf(object runtime.Object, eventtype, reason, messageFmt string, args ...interface{}) { +} + +func makePod(cpuRequest, cpuLimit string) *v1.Pod { + return &v1.Pod{ + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Resources: v1.ResourceRequirements{ + Requests: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse(cpuRequest), + v1.ResourceName(v1.ResourceMemory): resource.MustParse("1G"), + }, + Limits: v1.ResourceList{ + v1.ResourceName(v1.ResourceCPU): resource.MustParse(cpuLimit), + v1.ResourceName(v1.ResourceMemory): resource.MustParse("1G"), + }, + }, + 
}, + }, + }, + } +} + +// CpuAllocatable must be <= CpuCapacity +func prepareCPUNodeStatus(CPUCapacity, CPUAllocatable string) v1.NodeStatus { + nodestatus := v1.NodeStatus{ + Capacity: make(v1.ResourceList, 1), + Allocatable: make(v1.ResourceList, 1), + } + cpucap, _ := resource.ParseQuantity(CPUCapacity) + cpuall, _ := resource.ParseQuantity(CPUAllocatable) + + nodestatus.Capacity[v1.ResourceCPU] = cpucap + nodestatus.Allocatable[v1.ResourceCPU] = cpuall + return nodestatus +} + +func TestCPUManagerAdd(t *testing.T) { + testCases := []struct { + description string + regErr error + updateErr error + expErr error + }{ + { + description: "cpu manager add - no error", + regErr: nil, + updateErr: nil, + expErr: nil, + }, + { + description: "cpu manager add - policy add container error", + regErr: fmt.Errorf("fake reg error"), + updateErr: nil, + expErr: fmt.Errorf("fake reg error"), + }, + { + description: "cpu manager add - container update error", + regErr: nil, + updateErr: fmt.Errorf("fake update error"), + expErr: fmt.Errorf("fake update error"), + }, + } + + for _, testCase := range testCases { + mgr := &manager{ + policy: &mockPolicy{ + err: testCase.regErr, + }, + state: &mockState{ + assignments: map[string]cpuset.CPUSet{}, + defaultCPUSet: cpuset.NewCPUSet(), + }, + containerRuntime: mockRuntimeService{ + err: testCase.updateErr, + }, + activePods: func() []*v1.Pod { return nil }, + podStatusProvider: mockPodStatusProvider{}, + } + + pod := makePod("1000", "1000") + container := &pod.Spec.Containers[0] + err := mgr.AddContainer(pod, container, "fakeID") + if !reflect.DeepEqual(err, testCase.expErr) { + t.Errorf("CPU Manager AddContainer() error (%v). 
expected error: %v but got: %v", + testCase.description, testCase.expErr, err) + } + } +} + +func TestCPUManagerRemove(t *testing.T) { + mgr := &manager{ + policy: &mockPolicy{ + err: nil, + }, + state: &mockState{ + assignments: map[string]cpuset.CPUSet{}, + defaultCPUSet: cpuset.NewCPUSet(), + }, + containerRuntime: mockRuntimeService{}, + activePods: func() []*v1.Pod { return nil }, + podStatusProvider: mockPodStatusProvider{}, + } + + err := mgr.RemoveContainer("fakeID") + if err != nil { + t.Errorf("CPU Manager RemoveContainer() error. expected error to be nil but got: %v", err) + } + + mgr = &manager{ + policy: &mockPolicy{ + err: fmt.Errorf("fake error"), + }, + state: state.NewMemoryState(), + containerRuntime: mockRuntimeService{}, + activePods: func() []*v1.Pod { return nil }, + podStatusProvider: mockPodStatusProvider{}, + } + + err = mgr.RemoveContainer("fakeID") + if !reflect.DeepEqual(err, fmt.Errorf("fake error")) { + t.Errorf("CPU Manager RemoveContainer() error. expected error: fake error but got: %v", err) + } +} + +func TestReconcileState(t *testing.T) { + testCases := []struct { + description string + activePods []*v1.Pod + pspPS v1.PodStatus + pspFound bool + stAssignments map[string]cpuset.CPUSet + stDefaultCPUSet cpuset.CPUSet + updateErr error + expectFailedContainerName string + }{ + { + description: "cpu manager reconclie - no error", + activePods: []*v1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "fakePodName", + UID: "fakeUID", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "fakeName", + }, + }, + }, + }, + }, + pspPS: v1.PodStatus{ + ContainerStatuses: []v1.ContainerStatus{ + { + Name: "fakeName", + ContainerID: "docker://fakeID", + }, + }, + }, + pspFound: true, + stAssignments: map[string]cpuset.CPUSet{ + "fakeID": cpuset.NewCPUSet(1, 2), + }, + stDefaultCPUSet: cpuset.NewCPUSet(3, 4, 5, 6, 7), + updateErr: nil, + expectFailedContainerName: "", + }, + { + description: "cpu manager reconclie - pod status not 
found", + activePods: []*v1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "fakePodName", + UID: "fakeUID", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "fakeName", + }, + }, + }, + }, + }, + pspPS: v1.PodStatus{}, + pspFound: false, + stAssignments: map[string]cpuset.CPUSet{}, + stDefaultCPUSet: cpuset.NewCPUSet(), + updateErr: nil, + expectFailedContainerName: "fakeName", + }, + { + description: "cpu manager reconclie - container id not found", + activePods: []*v1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "fakePodName", + UID: "fakeUID", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "fakeName", + }, + }, + }, + }, + }, + pspPS: v1.PodStatus{ + ContainerStatuses: []v1.ContainerStatus{ + { + Name: "fakeName1", + ContainerID: "docker://fakeID", + }, + }, + }, + pspFound: true, + stAssignments: map[string]cpuset.CPUSet{}, + stDefaultCPUSet: cpuset.NewCPUSet(), + updateErr: nil, + expectFailedContainerName: "fakeName", + }, + { + description: "cpu manager reconclie - cpuset is empty", + activePods: []*v1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "fakePodName", + UID: "fakeUID", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "fakeName", + }, + }, + }, + }, + }, + pspPS: v1.PodStatus{ + ContainerStatuses: []v1.ContainerStatus{ + { + Name: "fakeName", + ContainerID: "docker://fakeID", + }, + }, + }, + pspFound: true, + stAssignments: map[string]cpuset.CPUSet{ + "fakeID": cpuset.NewCPUSet(), + }, + stDefaultCPUSet: cpuset.NewCPUSet(1, 2, 3, 4, 5, 6, 7), + updateErr: nil, + expectFailedContainerName: "fakeName", + }, + { + description: "cpu manager reconclie - container update error", + activePods: []*v1.Pod{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "fakePodName", + UID: "fakeUID", + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: "fakeName", + }, + }, + }, + }, + }, + pspPS: v1.PodStatus{ + ContainerStatuses: []v1.ContainerStatus{ + { + Name: "fakeName", + ContainerID: 
"docker://fakeID", + }, + }, + }, + pspFound: true, + stAssignments: map[string]cpuset.CPUSet{ + "fakeID": cpuset.NewCPUSet(1, 2), + }, + stDefaultCPUSet: cpuset.NewCPUSet(3, 4, 5, 6, 7), + updateErr: fmt.Errorf("fake container update error"), + expectFailedContainerName: "fakeName", + }, + } + + for _, testCase := range testCases { + mgr := &manager{ + policy: &mockPolicy{ + err: nil, + }, + state: &mockState{ + assignments: testCase.stAssignments, + defaultCPUSet: testCase.stDefaultCPUSet, + }, + containerRuntime: mockRuntimeService{ + err: testCase.updateErr, + }, + activePods: func() []*v1.Pod { + return testCase.activePods + }, + podStatusProvider: mockPodStatusProvider{ + podStatus: testCase.pspPS, + found: testCase.pspFound, + }, + } + + _, failure := mgr.reconcileState() + + if testCase.expectFailedContainerName != "" { + // Search failed reconciled containers for the supplied name. + foundFailedContainer := false + for _, reconciled := range failure { + if reconciled.containerName == testCase.expectFailedContainerName { + foundFailedContainer = true + break + } + } + if !foundFailedContainer { + t.Errorf("Expected reconciliation failure for container: %s", testCase.expectFailedContainerName) + } + } + } +} diff --git a/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go b/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go new file mode 100644 index 00000000000..f4af5667ea6 --- /dev/null +++ b/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go @@ -0,0 +1,58 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cpumanager + +import ( + "github.com/golang/glog" + "k8s.io/api/core/v1" + "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" + "k8s.io/kubernetes/pkg/kubelet/status" +) + +type fakeManager struct { + state state.State +} + +func (m *fakeManager) Start(activePods ActivePodsFunc, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService) { + glog.Info("[fake cpumanager] Start()") +} + +func (m *fakeManager) Policy() Policy { + glog.Info("[fake cpumanager] Policy()") + return NewNonePolicy() +} + +func (m *fakeManager) AddContainer(pod *v1.Pod, container *v1.Container, containerID string) error { + glog.Infof("[fake cpumanager] AddContainer (pod: %s, container: %s, container id: %s)", pod.Name, container.Name, containerID) + return nil +} + +func (m *fakeManager) RemoveContainer(containerID string) error { + glog.Infof("[fake cpumanager] RemoveContainer (container id: %s)", containerID) + return nil +} + +func (m *fakeManager) State() state.Reader { + return m.state +} + +// NewFakeManager creates empty/fake cpu manager +func NewFakeManager() Manager { + return &fakeManager{ + state: state.NewMemoryState(), + } +} diff --git a/pkg/kubelet/cm/cpumanager/policy_none.go b/pkg/kubelet/cm/cpumanager/policy_none.go new file mode 100644 index 00000000000..26e3335f73a --- /dev/null +++ b/pkg/kubelet/cm/cpumanager/policy_none.go @@ -0,0 +1,51 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cpumanager + +import ( + "github.com/golang/glog" + "k8s.io/api/core/v1" + "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" +) + +type nonePolicy struct{} + +var _ Policy = &nonePolicy{} + +// PolicyNone is the name of the none policy +const PolicyNone policyName = "none" + +// NewNonePolicy returns a cpuset manager policy that does nothing +func NewNonePolicy() Policy { + return &nonePolicy{} +} + +func (p *nonePolicy) Name() string { + return string(PolicyNone) +} + +func (p *nonePolicy) Start(s state.State) { + glog.Info("[cpumanager] none policy: Start") +} + +func (p *nonePolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container, containerID string) error { + return nil +} + +func (p *nonePolicy) RemoveContainer(s state.State, containerID string) error { + return nil +} diff --git a/pkg/kubelet/cm/cpumanager/policy_none_test.go b/pkg/kubelet/cm/cpumanager/policy_none_test.go new file mode 100644 index 00000000000..e20f9bdbbc1 --- /dev/null +++ b/pkg/kubelet/cm/cpumanager/policy_none_test.go @@ -0,0 +1,64 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cpumanager + +import ( + "testing" + + "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" +) + +func TestNonePolicyName(t *testing.T) { + policy := &nonePolicy{} + + policyName := policy.Name() + if policyName != "none" { + t.Errorf("NonePolicy Name() error.
expected: none, returned: %v", + policyName) + } +} + +func TestNonePolicyAdd(t *testing.T) { + policy := &nonePolicy{} + + st := &mockState{ + assignments: map[string]cpuset.CPUSet{}, + defaultCPUSet: cpuset.NewCPUSet(1, 2, 3, 4, 5, 6, 7), + } + + testPod := makePod("1000m", "1000m") + + container := &testPod.Spec.Containers[0] + err := policy.AddContainer(st, testPod, container, "fakeID") + if err != nil { + t.Errorf("NonePolicy AddContainer() error. expected no error but got: %v", err) + } +} + +func TestNonePolicyRemove(t *testing.T) { + policy := &nonePolicy{} + + st := &mockState{ + assignments: map[string]cpuset.CPUSet{}, + defaultCPUSet: cpuset.NewCPUSet(1, 2, 3, 4, 5, 6, 7), + } + + err := policy.RemoveContainer(st, "fakeID") + if err != nil { + t.Errorf("NonePolicy RemoveContainer() error. expected no error but got %v", err) + } +} diff --git a/pkg/kubelet/cm/cpumanager/state/BUILD b/pkg/kubelet/cm/cpumanager/state/BUILD index f90e24c96e5..7731bfb78c9 100644 --- a/pkg/kubelet/cm/cpumanager/state/BUILD +++ b/pkg/kubelet/cm/cpumanager/state/BUILD @@ -2,9 +2,15 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "go_default_library", - srcs = ["state.go"], + srcs = [ + "state.go", + "state_mem.go", + ], visibility = ["//visibility:public"], - deps = ["//pkg/kubelet/cm/cpuset:go_default_library"], + deps = [ + "//pkg/kubelet/cm/cpuset:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + ], ) filegroup( diff --git a/pkg/kubelet/cm/cpumanager/state/state_mem.go b/pkg/kubelet/cm/cpumanager/state/state_mem.go new file mode 100644 index 00000000000..751b1726aae --- /dev/null +++ b/pkg/kubelet/cm/cpumanager/state/state_mem.go @@ -0,0 +1,90 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package state + +import ( + "sync" + + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" +) + +type stateMemory struct { + sync.RWMutex + assignments map[string]cpuset.CPUSet + defaultCPUSet cpuset.CPUSet +} + +var _ State = &stateMemory{} + +// NewMemoryState creates new State for keeping track of cpu/pod assignment +func NewMemoryState() State { + glog.Infof("[cpumanager] initializing new in-memory state store") + return &stateMemory{ + assignments: map[string]cpuset.CPUSet{}, + defaultCPUSet: cpuset.NewCPUSet(), + } +} + +func (s *stateMemory) GetCPUSet(containerID string) (cpuset.CPUSet, bool) { + s.RLock() + defer s.RUnlock() + + res, ok := s.assignments[containerID] + return res.Clone(), ok +} + +func (s *stateMemory) GetDefaultCPUSet() cpuset.CPUSet { + s.RLock() + defer s.RUnlock() + + return s.defaultCPUSet.Clone() +} + +func (s *stateMemory) GetCPUSetOrDefault(containerID string) cpuset.CPUSet { + s.RLock() + defer s.RUnlock() + // Read the fields directly: GetCPUSet/GetDefaultCPUSet re-acquire the read lock, and a recursive RLock deadlocks once a writer is queued in between. + if res, ok := s.assignments[containerID]; ok { + return res.Clone() + } + return s.defaultCPUSet.Clone() +} + +func (s *stateMemory) SetCPUSet(containerID string, cset cpuset.CPUSet) { + s.Lock() + defer s.Unlock() + + s.assignments[containerID] = cset + glog.Infof("[cpumanager] updated desired cpuset (container id: %s, cpuset: \"%s\")", containerID, cset) +} + +func (s *stateMemory) SetDefaultCPUSet(cset cpuset.CPUSet) { + s.Lock() + defer s.Unlock() + + s.defaultCPUSet = cset + glog.Infof("[cpumanager] updated default cpuset: \"%s\"", cset) +} + +func (s *stateMemory) Delete(containerID string) { + s.Lock() + defer
s.Unlock() + + delete(s.assignments, containerID) + glog.V(2).Infof("[cpumanager] deleted cpuset assignment (container id: %s)", containerID) +} diff --git a/pkg/kubelet/cm/cpuset/BUILD b/pkg/kubelet/cm/cpuset/BUILD index 84f1beebd15..24ee7753220 100644 --- a/pkg/kubelet/cm/cpuset/BUILD +++ b/pkg/kubelet/cm/cpuset/BUILD @@ -1,25 +1,16 @@ -package(default_visibility = ["//visibility:public"]) +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") -licenses(["notice"]) - -load( - "@io_bazel_rules_go//go:def.bzl", - "go_library", - "go_test", +go_library( + name = "go_default_library", + srcs = ["cpuset.go"], + visibility = ["//visibility:public"], + deps = ["//vendor/github.com/golang/glog:go_default_library"], ) go_test( name = "go_default_test", srcs = ["cpuset_test.go"], library = ":go_default_library", - tags = ["automanaged"], -) - -go_library( - name = "go_default_library", - srcs = ["cpuset.go"], - tags = ["automanaged"], - deps = ["//vendor/github.com/golang/glog:go_default_library"], ) filegroup( @@ -33,4 +24,5 @@ filegroup( name = "all-srcs", srcs = [":package-srcs"], tags = ["automanaged"], + visibility = ["//visibility:public"], ) diff --git a/pkg/kubelet/cm/fake_internal_container_lifecycle.go b/pkg/kubelet/cm/fake_internal_container_lifecycle.go new file mode 100644 index 00000000000..4709a904020 --- /dev/null +++ b/pkg/kubelet/cm/fake_internal_container_lifecycle.go @@ -0,0 +1,39 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cm + +import ( + "k8s.io/api/core/v1" +) + +func NewFakeInternalContainerLifecycle() *fakeInternalContainerLifecycle { + return &fakeInternalContainerLifecycle{} +} + +type fakeInternalContainerLifecycle struct{} + +func (f *fakeInternalContainerLifecycle) PreStartContainer(pod *v1.Pod, container *v1.Container, containerID string) error { + return nil +} + +func (f *fakeInternalContainerLifecycle) PreStopContainer(containerID string) error { + return nil +} + +func (f *fakeInternalContainerLifecycle) PostStopContainer(containerID string) error { + return nil +} diff --git a/pkg/kubelet/cm/internal_container_lifecycle.go b/pkg/kubelet/cm/internal_container_lifecycle.go new file mode 100644 index 00000000000..141f0b891f8 --- /dev/null +++ b/pkg/kubelet/cm/internal_container_lifecycle.go @@ -0,0 +1,27 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package cm + +import ( + "k8s.io/api/core/v1" +) + +type InternalContainerLifecycle interface { + PreStartContainer(pod *v1.Pod, container *v1.Container, containerID string) error + PreStopContainer(containerID string) error + PostStopContainer(containerID string) error +} diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 991ac33dced..c9b6b6239d3 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -622,6 +622,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, if err != nil { return nil, err } + klet.runtimeService = runtimeService runtime, err := kuberuntime.NewKubeGenericRuntimeManager( kubecontainer.FilterEventRecorder(kubeDeps.Recorder), klet.livenessManager, @@ -639,6 +640,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeCfg.CPUCFSQuota, runtimeService, imageService, + kubeDeps.ContainerManager.InternalContainerLifecycle(), ) if err != nil { return nil, err @@ -979,6 +981,11 @@ type Kubelet struct { // Container runtime. containerRuntime kubecontainer.Runtime + // Container runtime service (needed by container runtime Start()). + // TODO(CD): try to make this available without holding a reference in this + // struct. For example, by adding a getter to generic runtime. + runtimeService internalapi.RuntimeService + // reasonCache caches the failure reason of the last creation of all containers, which is // used for generating ContainerStatus. 
reasonCache *ReasonCache @@ -1243,7 +1250,7 @@ func (kl *Kubelet) initializeModules() error { return fmt.Errorf("Kubelet failed to get node info: %v", err) } - if err := kl.containerManager.Start(node, kl.GetActivePods); err != nil { + if err := kl.containerManager.Start(node, kl.GetActivePods, kl.statusManager, kl.runtimeService); err != nil { return fmt.Errorf("Failed to start ContainerManager %v", err) } diff --git a/pkg/kubelet/kuberuntime/BUILD b/pkg/kubelet/kuberuntime/BUILD index bef4f1a062f..a2599346fc9 100644 --- a/pkg/kubelet/kuberuntime/BUILD +++ b/pkg/kubelet/kuberuntime/BUILD @@ -29,6 +29,7 @@ go_library( "//pkg/credentialprovider:go_default_library", "//pkg/kubelet/apis/cri:go_default_library", "//pkg/kubelet/apis/cri/v1alpha1/runtime:go_default_library", + "//pkg/kubelet/cm:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/events:go_default_library", "//pkg/kubelet/images:go_default_library", diff --git a/pkg/kubelet/kuberuntime/fake_kuberuntime_manager.go b/pkg/kubelet/kuberuntime/fake_kuberuntime_manager.go index f872707eae0..8e654c5a393 100644 --- a/pkg/kubelet/kuberuntime/fake_kuberuntime_manager.go +++ b/pkg/kubelet/kuberuntime/fake_kuberuntime_manager.go @@ -27,6 +27,7 @@ import ( "k8s.io/client-go/util/flowcontrol" "k8s.io/kubernetes/pkg/credentialprovider" internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri" + "k8s.io/kubernetes/pkg/kubelet/cm" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/images" "k8s.io/kubernetes/pkg/kubelet/lifecycle" @@ -69,6 +70,7 @@ func NewFakeKubeRuntimeManager(runtimeService internalapi.RuntimeService, imageS runtimeService: runtimeService, imageService: imageService, keyring: keyring, + internalLifecycle: cm.NewFakeInternalContainerLifecycle(), } typedVersion, err := runtimeService.Version(kubeRuntimeAPIVersion) diff --git a/pkg/kubelet/kuberuntime/kuberuntime_container.go b/pkg/kubelet/kuberuntime/kuberuntime_container.go index 
74b0e1b354b..e3733f15c79 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_container.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_container.go @@ -116,6 +116,11 @@ func (m *kubeGenericRuntimeManager) startContainer(podSandboxID string, podSandb m.recordContainerEvent(pod, container, containerID, v1.EventTypeWarning, events.FailedToCreateContainer, "Error: %v", grpc.ErrorDesc(err)) return grpc.ErrorDesc(err), ErrCreateContainer } + err = m.internalLifecycle.PreStartContainer(pod, container, containerID) + if err != nil { + m.recorder.Eventf(ref, v1.EventTypeWarning, events.FailedToStartContainer, "Internal PreStartContainer hook failed: %v", err) + return "Internal PreStartContainer hook failed", err + } m.recordContainerEvent(pod, container, containerID, v1.EventTypeNormal, events.CreatedContainer, "Created container") if ref != nil { @@ -575,6 +580,11 @@ func (m *kubeGenericRuntimeManager) killContainer(pod *v1.Pod, containerID kubec glog.V(2).Infof("Killing container %q with %d second grace period", containerID.String(), gracePeriod) + // Run internal pre-stop lifecycle hook + if err := m.internalLifecycle.PreStopContainer(containerID.ID); err != nil { + return err + } + // Run the pre-stop lifecycle hooks if applicable and if there is enough time to run it if containerSpec.Lifecycle != nil && containerSpec.Lifecycle.PreStop != nil && gracePeriod > 0 { gracePeriod = gracePeriod - m.executePreStopHook(pod, containerID, containerSpec, gracePeriod) @@ -806,6 +816,11 @@ func (m *kubeGenericRuntimeManager) RunInContainer(id kubecontainer.ContainerID, // it will not write container logs anymore in that state. func (m *kubeGenericRuntimeManager) removeContainer(containerID string) error { glog.V(4).Infof("Removing container %q", containerID) + // Call internal container post-stop lifecycle hook. + if err := m.internalLifecycle.PostStopContainer(containerID); err != nil { + return err + } + // Remove the container log. 
// TODO: Separate log and container lifecycle management. if err := m.removeContainerLog(containerID); err != nil { diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager.go b/pkg/kubelet/kuberuntime/kuberuntime_manager.go index 0be5b25d38d..c4facfbbbfd 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_manager.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_manager.go @@ -36,6 +36,7 @@ import ( "k8s.io/kubernetes/pkg/credentialprovider" internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri" runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" + "k8s.io/kubernetes/pkg/kubelet/cm" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/events" "k8s.io/kubernetes/pkg/kubelet/images" @@ -108,6 +109,9 @@ type kubeGenericRuntimeManager struct { // The directory path for seccomp profiles. seccompProfileRoot string + + // Internal lifecycle event handlers for container resource management. + internalLifecycle cm.InternalContainerLifecycle } type KubeGenericRuntime interface { @@ -134,6 +138,7 @@ func NewKubeGenericRuntimeManager( cpuCFSQuota bool, runtimeService internalapi.RuntimeService, imageService internalapi.ImageManagerService, + internalLifecycle cm.InternalContainerLifecycle, ) (KubeGenericRuntime, error) { kubeRuntimeManager := &kubeGenericRuntimeManager{ recorder: recorder, @@ -147,6 +152,7 @@ func NewKubeGenericRuntimeManager( runtimeService: newInstrumentedRuntimeService(runtimeService), imageService: newInstrumentedImageManagerService(imageService), keyring: credentialprovider.NewDockerKeyring(), + internalLifecycle: internalLifecycle, } typedVersion, err := kubeRuntimeManager.runtimeService.Version(kubeRuntimeAPIVersion)