diff --git a/pkg/kubelet/cm/cpumanager/BUILD b/pkg/kubelet/cm/cpumanager/BUILD index c496c8837d3..b84719d1f8c 100644 --- a/pkg/kubelet/cm/cpumanager/BUILD +++ b/pkg/kubelet/cm/cpumanager/BUILD @@ -5,13 +5,19 @@ go_library( srcs = [ "cpu_manager.go", "policy.go", + "policy_none.go", ], visibility = ["//visibility:public"], deps = [ "//pkg/kubelet/apis/cri/v1alpha1/runtime:go_default_library", "//pkg/kubelet/cm/cpumanager/state:go_default_library", + "//pkg/kubelet/cm/cpuset:go_default_library", + "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/status:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/github.com/google/cadvisor/info/v1:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", ], ) diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go index 5da92348ffc..4e599813817 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go @@ -17,10 +17,19 @@ limitations under the License. package cpumanager import ( + "fmt" + "sync" + "time" + + "github.com/golang/glog" + cadvisorapi "github.com/google/cadvisor/info/v1" "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/wait" runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" + "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/status" ) @@ -51,3 +60,181 @@ type Manager interface { // State returns a read-only interface to the internal CPU manager state. State() state.Reader } + +type manager struct { + sync.Mutex + policy Policy + + // state allows pluggable CPU assignment policies while sharing a common + // representation of state for the system to inspect and reconcile. + state state.State + + // containerRuntime is the container runtime service interface needed + // to make UpdateContainerResources() calls against the containers. + containerRuntime runtimeService + + // activePods is a method for listing active pods on the node + // so all the containers can be updated in the reconciliation loop. + activePods ActivePodsFunc + + // podStatusProvider provides a method for obtaining pod statuses + // and the containerID of their containers + podStatusProvider status.PodStatusProvider + + machineInfo *cadvisorapi.MachineInfo + + nodeAllocatableReservation v1.ResourceList +} + +var _ Manager = &manager{} + +// NewManager creates new cpu manager based on provided policy +func NewManager( + cpuPolicyName string, + machineInfo *cadvisorapi.MachineInfo, + nodeAllocatableReservation v1.ResourceList, +) (Manager, error) { + var policy Policy + + switch policyName(cpuPolicyName) { + + case PolicyNone: + policy = NewNonePolicy() + + default: + glog.Warningf("[cpumanager] Unknown policy (\"%s\"), falling back to \"%s\" policy (\"%s\")", cpuPolicyName, PolicyNone) + policy = NewNonePolicy() + } + + manager := &manager{ + policy: policy, + state: state.NewMemoryState(), + machineInfo: machineInfo, + nodeAllocatableReservation: nodeAllocatableReservation, + } + return manager, nil +} + +func (m *manager) Start(activePods ActivePodsFunc, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService) { + glog.Infof("[cpumanger] starting with %s policy", m.policy.Name()) + + m.activePods = activePods + m.podStatusProvider = podStatusProvider + m.containerRuntime = containerRuntime + + m.policy.Start(m.state) + if m.policy.Name() == string(PolicyNone) { + return + } + go wait.Until(func() { m.reconcileState() }, time.Second, wait.NeverStop) +} + +func (m *manager) AddContainer(p *v1.Pod, c *v1.Container, containerID string) error { + m.Lock() + err := m.policy.AddContainer(m.state, p, c, containerID) + if err != nil { + glog.Errorf("[cpumanager] AddContainer error: %v", err) + m.Unlock() + return err + } + cpus := m.state.GetCPUSetOrDefault(containerID) + m.Unlock() + + err = m.updateContainerCPUSet(containerID, cpus) + if err != nil { + glog.Errorf("[cpumanager] AddContainer error: %v", err) + return err + } + return nil +} + +func (m *manager) RemoveContainer(containerID string) error { + m.Lock() + defer m.Unlock() + + err := m.policy.RemoveContainer(m.state, containerID) + if err != nil { + glog.Errorf("[cpumanager] RemoveContainer error: %v", err) + return err + } + return nil +} + +func (m *manager) State() state.Reader { + return m.state +} + +type reconciledContainer struct { + podName string + containerName string + containerID string +} + +func (m *manager) reconcileState() (success []reconciledContainer, failure []reconciledContainer) { + success = []reconciledContainer{} + failure = []reconciledContainer{} + + for _, pod := range m.activePods() { + allContainers := pod.Spec.InitContainers + allContainers = append(allContainers, pod.Spec.Containers...) + for _, container := range allContainers { + status, ok := m.podStatusProvider.GetPodStatus(pod.UID) + if !ok { + glog.Warningf("[cpumanager] reconcileState: skipping pod; status not found (pod: %s, container: %s)", pod.Name, container.Name) + failure = append(failure, reconciledContainer{pod.Name, container.Name, ""}) + break + } + + containerID, err := findContainerIDByName(&status, container.Name) + if err != nil { + glog.Warningf("[cpumanager] reconcileState: skipping container; ID not found in status (pod: %s, container: %s, error: %v)", pod.Name, container.Name, err) + failure = append(failure, reconciledContainer{pod.Name, container.Name, ""}) + continue + } + + cset := m.state.GetCPUSetOrDefault(containerID) + if cset.IsEmpty() { + // NOTE: This should not happen outside of tests. + glog.Infof("[cpumanager] reconcileState: skipping container; assigned cpuset is empty (pod: %s, container: %s)", pod.Name, container.Name) + failure = append(failure, reconciledContainer{pod.Name, container.Name, containerID}) + continue + } + + glog.Infof("[cpumanager] reconcileState: updating container (pod: %s, container: %s, container id: %s, cpuset: \"%v\")", pod.Name, container.Name, containerID, cset) + err = m.updateContainerCPUSet(containerID, cset) + if err != nil { + glog.Errorf("[cpumanager] reconcileState: failed to update container (pod: %s, container: %s, container id: %s, cpuset: \"%v\", error: %v)", pod.Name, container.Name, containerID, cset, err) + failure = append(failure, reconciledContainer{pod.Name, container.Name, containerID}) + continue + } + success = append(success, reconciledContainer{pod.Name, container.Name, containerID}) + } + } + return success, failure +} + +func findContainerIDByName(status *v1.PodStatus, name string) (string, error) { + for _, container := range status.ContainerStatuses { + if container.Name == name && container.ContainerID != "" { + cid := &kubecontainer.ContainerID{} + err := cid.ParseString(container.ContainerID) + if err != nil { + return "", err + } + return cid.ID, nil + } + } + return "", fmt.Errorf("unable to find ID for container with name %v in pod status (it may not be running)", name) +} + +func (m *manager) updateContainerCPUSet(containerID string, cpus cpuset.CPUSet) error { + // TODO: Consider adding a `ResourceConfigForContainer` helper in + // helpers_linux.go similar to what exists for pods. + // It would be better to pass the full container resources here instead of + // this patch-like partial resources. + return m.containerRuntime.UpdateContainerResources( + containerID, + &runtimeapi.LinuxContainerResources{ + CpusetCpus: cpus.String(), + }) +} diff --git a/pkg/kubelet/cm/cpumanager/policy_none.go b/pkg/kubelet/cm/cpumanager/policy_none.go new file mode 100644 index 00000000000..26e3335f73a --- /dev/null +++ b/pkg/kubelet/cm/cpumanager/policy_none.go @@ -0,0 +1,51 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cpumanager + +import ( + "github.com/golang/glog" + "k8s.io/api/core/v1" + "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" +) + +type nonePolicy struct{} + +var _ Policy = &nonePolicy{} + +// PolicyNone name of none policy +const PolicyNone policyName = "none" + +// NewNonePolicy returns a cupset manager policy that does nothing +func NewNonePolicy() Policy { + return &nonePolicy{} +} + +func (p *nonePolicy) Name() string { + return string(PolicyNone) +} + +func (p *nonePolicy) Start(s state.State) { + glog.Info("[cpumanager] none policy: Start") +} + +func (p *nonePolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container, containerID string) error { + return nil +} + +func (p *nonePolicy) RemoveContainer(s state.State, containerID string) error { + return nil +} diff --git a/pkg/kubelet/cm/cpumanager/state/BUILD b/pkg/kubelet/cm/cpumanager/state/BUILD index f90e24c96e5..7731bfb78c9 100644 --- a/pkg/kubelet/cm/cpumanager/state/BUILD +++ b/pkg/kubelet/cm/cpumanager/state/BUILD @@ -2,9 +2,15 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "go_default_library", - srcs = ["state.go"], + srcs = [ + "state.go", + "state_mem.go", + ], visibility = ["//visibility:public"], - deps = ["//pkg/kubelet/cm/cpuset:go_default_library"], + deps = [ + "//pkg/kubelet/cm/cpuset:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + ], ) filegroup(