diff --git a/pkg/kubelet/cm/BUILD b/pkg/kubelet/cm/BUILD
index 7b57d09c324..edcc7f53bf6 100644
--- a/pkg/kubelet/cm/BUILD
+++ b/pkg/kubelet/cm/BUILD
@@ -50,6 +50,7 @@ go_library(
         "//pkg/apis/core/v1/helper:go_default_library",
         "//pkg/apis/core/v1/helper/qos:go_default_library",
         "//pkg/kubelet/cadvisor:go_default_library",
+        "//pkg/kubelet/cm/cpumanager/containermap:go_default_library",
         "//pkg/kubelet/cm/cpumanager/topology:go_default_library",
         "//pkg/kubelet/cm/devicemanager:go_default_library",
         "//pkg/kubelet/cm/util:go_default_library",
@@ -100,6 +101,7 @@ go_library(
         "//pkg/apis/core/v1/helper:go_default_library",
         "//pkg/apis/core/v1/helper/qos:go_default_library",
         "//pkg/kubelet/cadvisor:go_default_library",
+        "//pkg/kubelet/cm/cpumanager/containermap:go_default_library",
         "//pkg/kubelet/cm/cpumanager/topology:go_default_library",
         "//pkg/kubelet/cm/devicemanager:go_default_library",
        "//pkg/kubelet/cm/util:go_default_library",
diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go
index 81d3a015639..90ad0c1bb49 100644
--- a/pkg/kubelet/cm/container_manager_linux.go
+++ b/pkg/kubelet/cm/container_manager_linux.go
@@ -50,6 +50,7 @@ import (
     podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
     "k8s.io/kubernetes/pkg/kubelet/cadvisor"
     "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager"
+    "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/containermap"
     cputopology "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
     "k8s.io/kubernetes/pkg/kubelet/cm/devicemanager"
     "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager"
@@ -574,7 +575,11 @@ func (cm *containerManagerImpl) Start(node *v1.Node,

     // Initialize CPU manager
     if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
-        cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService)
+        containerMap, err := buildContainerMapFromRuntime(runtimeService)
+        if err != nil {
+            return fmt.Errorf("failed to build map of initial containers from runtime: %v", err)
+        }
+        cm.cpuManager.Start(cpumanager.ActivePodsFunc(activePods), sourcesReady, podStatusProvider, runtimeService, containerMap)
     }

     // cache the node Info including resource capacity and
@@ -686,6 +691,25 @@ func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList {
     }
 }

+func buildContainerMapFromRuntime(runtimeService internalapi.RuntimeService) (containermap.ContainerMap, error) {
+    podSandboxMap := make(map[string]string)
+    podSandboxList, _ := runtimeService.ListPodSandbox(nil)
+    for _, p := range podSandboxList {
+        podSandboxMap[p.Id] = p.Metadata.Uid
+    }
+
+    containerMap := containermap.NewContainerMap()
+    containerList, _ := runtimeService.ListContainers(nil)
+    for _, c := range containerList {
+        if _, exists := podSandboxMap[c.PodSandboxId]; !exists {
+            return nil, fmt.Errorf("no PodSandbox found with Id '%s'", c.PodSandboxId)
+        }
+        containerMap.Add(podSandboxMap[c.PodSandboxId], c.Metadata.Name, c.Id)
+    }
+
+    return containerMap, nil
+}
+
 func isProcessRunningInHost(pid int) (bool, error) {
     // Get init pid namespace.
     initPidNs, err := os.Readlink("/proc/1/ns/pid")
diff --git a/pkg/kubelet/cm/cpumanager/BUILD b/pkg/kubelet/cm/cpumanager/BUILD
index e63627c59df..45092994ef0 100644
--- a/pkg/kubelet/cm/cpumanager/BUILD
+++ b/pkg/kubelet/cm/cpumanager/BUILD
@@ -3,7 +3,6 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
 go_library(
     name = "go_default_library",
     srcs = [
-        "container_map.go",
         "cpu_assignment.go",
         "cpu_manager.go",
         "fake_cpu_manager.go",
@@ -15,6 +14,7 @@ go_library(
     visibility = ["//visibility:public"],
     deps = [
         "//pkg/apis/core/v1/helper/qos:go_default_library",
+        "//pkg/kubelet/cm/cpumanager/containermap:go_default_library",
         "//pkg/kubelet/cm/cpumanager/state:go_default_library",
         "//pkg/kubelet/cm/cpumanager/topology:go_default_library",
         "//pkg/kubelet/cm/cpuset:go_default_library",
@@ -34,7 +34,6 @@ go_library(
 go_test(
     name = "go_default_test",
     srcs = [
-        "container_map_test.go",
         "cpu_assignment_test.go",
         "cpu_manager_test.go",
         "policy_none_test.go",
@@ -44,6 +43,7 @@
     ],
     embed = [":go_default_library"],
     deps = [
+        "//pkg/kubelet/cm/cpumanager/containermap:go_default_library",
         "//pkg/kubelet/cm/cpumanager/state:go_default_library",
         "//pkg/kubelet/cm/cpumanager/topology:go_default_library",
         "//pkg/kubelet/cm/cpuset:go_default_library",
@@ -69,6 +69,7 @@ filegroup(
     name = "all-srcs",
     srcs = [
         ":package-srcs",
+        "//pkg/kubelet/cm/cpumanager/containermap:all-srcs",
         "//pkg/kubelet/cm/cpumanager/state:all-srcs",
         "//pkg/kubelet/cm/cpumanager/topology:all-srcs",
     ],
diff --git a/pkg/kubelet/cm/cpumanager/container_map.go b/pkg/kubelet/cm/cpumanager/container_map.go
deleted file mode 100644
index 2da2c931b60..00000000000
--- a/pkg/kubelet/cm/cpumanager/container_map.go
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
-Copyright 2019 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package cpumanager
-
-import (
-    "fmt"
-
-    "k8s.io/api/core/v1"
-)
-
-// containerMap maps (podUID, containerName) -> containerID
-type containerMap map[string]map[string]string
-
-func newContainerMap() containerMap {
-    return make(containerMap)
-}
-
-func (cm containerMap) Add(p *v1.Pod, c *v1.Container, containerID string) {
-    podUID := string(p.UID)
-    if _, exists := cm[podUID]; !exists {
-        cm[podUID] = make(map[string]string)
-    }
-    cm[podUID][c.Name] = containerID
-}
-
-func (cm containerMap) Remove(containerID string) {
-    found := false
-    for podUID := range cm {
-        for containerName := range cm[podUID] {
-            if containerID == cm[podUID][containerName] {
-                delete(cm[podUID], containerName)
-                found = true
-                break
-            }
-        }
-        if len(cm[podUID]) == 0 {
-            delete(cm, podUID)
-        }
-        if found {
-            break
-        }
-    }
-}
-
-func (cm containerMap) Get(p *v1.Pod, c *v1.Container) (string, error) {
-    podUID := string(p.UID)
-    if _, exists := cm[podUID]; !exists {
-        return "", fmt.Errorf("pod %s not in containerMap", podUID)
-    }
-    if _, exists := cm[podUID][c.Name]; !exists {
-        return "", fmt.Errorf("container %s not in containerMap for pod %s", c.Name, podUID)
-    }
-    return cm[podUID][c.Name], nil
-}
diff --git a/pkg/kubelet/cm/cpumanager/containermap/BUILD b/pkg/kubelet/cm/cpumanager/containermap/BUILD
new file mode 100644
index 00000000000..a7f53443c77
--- /dev/null
+++ b/pkg/kubelet/cm/cpumanager/containermap/BUILD
@@ -0,0 +1,28 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+    name = "go_default_library",
+    srcs = ["container_map.go"],
+    importpath = "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/containermap",
+    visibility = ["//visibility:public"],
+)
+
+go_test(
+    name = "go_default_test",
+    srcs = ["container_map_test.go"],
+    embed = [":go_default_library"],
+)
+
+filegroup(
+    name = "package-srcs",
+    srcs = glob(["**"]),
+    tags = ["automanaged"],
+    visibility = ["//visibility:private"],
+)
+
+filegroup(
+    name = "all-srcs",
+    srcs = [":package-srcs"],
+    tags = ["automanaged"],
+    visibility = ["//visibility:public"],
+)
diff --git a/pkg/kubelet/cm/cpumanager/containermap/container_map.go b/pkg/kubelet/cm/cpumanager/containermap/container_map.go
new file mode 100644
index 00000000000..c324e028b4f
--- /dev/null
+++ b/pkg/kubelet/cm/cpumanager/containermap/container_map.go
@@ -0,0 +1,71 @@
+/*
+Copyright 2019 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package containermap
+
+import (
+    "fmt"
+)
+
+// ContainerMap maps (containerID)->(podUID, containerName)
+type ContainerMap map[string]struct {
+    podUID        string
+    containerName string
+}
+
+// NewContainerMap creates a new ContainerMap struct
+func NewContainerMap() ContainerMap {
+    return make(ContainerMap)
+}
+
+// Add adds a mapping of (containerID)->(podUID, containerName) to the ContainerMap
+func (cm ContainerMap) Add(podUID, containerName, containerID string) {
+    cm[containerID] = struct {
+        podUID        string
+        containerName string
+    }{podUID, containerName}
+}
+
+// RemoveByContainerID removes a mapping of (containerID)->(podUID, containerName) from the ContainerMap
+func (cm ContainerMap) RemoveByContainerID(containerID string) {
+    delete(cm, containerID)
+}
+
+// RemoveByContainerRef removes a mapping of (containerID)->(podUID, containerName) from the ContainerMap
+func (cm ContainerMap) RemoveByContainerRef(podUID, containerName string) {
+    containerID, err := cm.GetContainerID(podUID, containerName)
+    if err == nil {
+        cm.RemoveByContainerID(containerID)
+    }
+}
+
+// GetContainerID retrieves a ContainerID from the ContainerMap
+func (cm ContainerMap) GetContainerID(podUID, containerName string) (string, error) {
+    for key, val := range cm {
+        if val.podUID == podUID && val.containerName == containerName {
+            return key, nil
+        }
+    }
+    return "", fmt.Errorf("container %s not in ContainerMap for pod %s", containerName, podUID)
+}
+
+// GetContainerRef retrieves a (podUID, containerName) pair from the ContainerMap
+func (cm ContainerMap) GetContainerRef(containerID string) (string, string, error) {
+    if _, exists := cm[containerID]; !exists {
+        return "", "", fmt.Errorf("containerID %s not in ContainerMap", containerID)
+    }
+    return cm[containerID].podUID, cm[containerID].containerName, nil
+}
diff --git a/pkg/kubelet/cm/cpumanager/container_map_test.go b/pkg/kubelet/cm/cpumanager/containermap/container_map_test.go
similarity index 59%
rename from pkg/kubelet/cm/cpumanager/container_map_test.go
rename to pkg/kubelet/cm/cpumanager/containermap/container_map_test.go
index 6e0f43fb652..8e58e2f2901 100644
--- a/pkg/kubelet/cm/cpumanager/container_map_test.go
+++ b/pkg/kubelet/cm/cpumanager/containermap/container_map_test.go
@@ -14,13 +14,10 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */

-package cpumanager
+package containermap

 import (
     "testing"
-
-    "k8s.io/api/core/v1"
-    apimachinery "k8s.io/apimachinery/pkg/types"
 )

 func TestContainerMap(t *testing.T) {
@@ -37,34 +34,48 @@ func TestContainerMap(t *testing.T) {
     }

     for _, tc := range testCases {
-        pod := v1.Pod{}
-        pod.UID = apimachinery.UID(tc.podUID)
-
         // Build a new containerMap from the testCases, checking proper
         // addition, retrieval along the way.
-        cm := newContainerMap()
+        cm := NewContainerMap()
         for i := range tc.containerNames {
-            container := v1.Container{Name: tc.containerNames[i]}
+            cm.Add(tc.podUID, tc.containerNames[i], tc.containerIDs[i])

-            cm.Add(&pod, &container, tc.containerIDs[i])
-            containerID, err := cm.Get(&pod, &container)
+            containerID, err := cm.GetContainerID(tc.podUID, tc.containerNames[i])
             if err != nil {
-                t.Errorf("error adding and retrieving container: %v", err)
+                t.Errorf("error adding and retrieving containerID: %v", err)
             }
             if containerID != tc.containerIDs[i] {
                 t.Errorf("mismatched containerIDs %v, %v", containerID, tc.containerIDs[i])
             }
+
+            podUID, containerName, err := cm.GetContainerRef(containerID)
+            if err != nil {
+                t.Errorf("error retrieving container reference: %v", err)
+            }
+            if podUID != tc.podUID {
+                t.Errorf("mismatched pod UID %v, %v", tc.podUID, podUID)
+            }
+            if containerName != tc.containerNames[i] {
+                t.Errorf("mismatched container Name %v, %v", tc.containerNames[i], containerName)
+            }
         }

         // Remove all entries from the containerMap, checking proper removal of
         // each along the way.
         for i := range tc.containerNames {
-            container := v1.Container{Name: tc.containerNames[i]}
-            cm.Remove(tc.containerIDs[i])
-            containerID, err := cm.Get(&pod, &container)
+            cm.RemoveByContainerID(tc.containerIDs[i])
+            containerID, err := cm.GetContainerID(tc.podUID, tc.containerNames[i])
             if err == nil {
                 t.Errorf("unexpected retrieval of containerID after removal: %v", containerID)
             }
+
+            cm.Add(tc.podUID, tc.containerNames[i], tc.containerIDs[i])
+
+            cm.RemoveByContainerRef(tc.podUID, tc.containerNames[i])
+            podUID, containerName, err := cm.GetContainerRef(tc.containerIDs[i])
+            if err == nil {
+                t.Errorf("unexpected retrieval of container reference after removal: (%v, %v)", podUID, containerName)
+            }
         }

         // Verify containerMap now empty.
diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go
index d93f0db3c24..ec9ae4333be 100644
--- a/pkg/kubelet/cm/cpumanager/cpu_manager.go
+++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go
@@ -28,6 +28,7 @@ import (
     "k8s.io/klog"

     runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
+    "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/containermap"
     "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
     "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
     "k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
@@ -52,7 +53,7 @@ const cpuManagerStateFileName = "cpu_manager_state"
 // Manager interface provides methods for Kubelet to manage pod cpus.
 type Manager interface {
     // Start is called during Kubelet initialization.
-    Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService)
+    Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap)

     // AddContainer is called between container create and container start
     // so that initial CPU affinity settings can be written through to the
@@ -96,6 +97,10 @@ type manager struct {
     // and the containerID of their containers
     podStatusProvider status.PodStatusProvider

+    // containerMap provides a mapping from (pod, container) -> containerID
+    // for all containers in a pod
+    containerMap containermap.ContainerMap
+
     topology *topology.CPUTopology

     nodeAllocatableReservation v1.ResourceList
@@ -103,6 +108,9 @@ type manager struct {
     // sourcesReady provides the readiness of kubelet configuration sources such as apiserver update readiness.
     // We use it to determine when we can purge inactive pods from checkpointed state.
     sourcesReady config.SourcesReady
+
+    // stateFileDirectory holds the directory where the state file for checkpoints is held.
+    stateFileDirectory string
 }

 var _ Manager = &manager{}
@@ -153,23 +161,19 @@ func NewManager(cpuPolicyName string, reconcilePeriod time.Duration, machineInfo
         return nil, fmt.Errorf("unknown policy: \"%s\"", cpuPolicyName)
     }

-    stateImpl, err := state.NewCheckpointState(stateFileDirectory, cpuManagerStateFileName, policy.Name())
-    if err != nil {
-        return nil, fmt.Errorf("could not initialize checkpoint manager: %v", err)
-    }
-
     manager := &manager{
         policy:                     policy,
         reconcilePeriod:            reconcilePeriod,
-        state:                      stateImpl,
+        containerMap:               containermap.NewContainerMap(),
         topology:                   topo,
         nodeAllocatableReservation: nodeAllocatableReservation,
+        stateFileDirectory:         stateFileDirectory,
     }
     manager.sourcesReady = &sourcesReadyStub{}
     return manager, nil
 }

-func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService) {
+func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) {
     klog.Infof("[cpumanager] starting with %s policy", m.policy.Name())
     klog.Infof("[cpumanager] reconciling every %v", m.reconcilePeriod)
     m.sourcesReady = sourcesReady
@@ -177,6 +181,13 @@ func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesRe
     m.podStatusProvider = podStatusProvider
     m.containerRuntime = containerRuntime

+    stateImpl, err := state.NewCheckpointState(m.stateFileDirectory, cpuManagerStateFileName, m.policy.Name(), initialContainers)
+    if err != nil {
+        klog.Errorf("[cpumanager] could not initialize checkpoint manager: %v\n", err)
+        panic("[cpumanager] - please drain node and remove policy state file")
+    }
+    m.state = stateImpl
+
     m.policy.Start(m.state)
     if m.policy.Name() == string(PolicyNone) {
         return
@@ -186,13 +197,24 @@

 func (m *manager) AddContainer(p *v1.Pod, c *v1.Container, containerID string) error {
     m.Lock()
-    err := m.policy.AddContainer(m.state, p, c, containerID)
+    // Proactively remove CPUs from init containers that have already run.
+    // They are guaranteed to have run to completion before any other
+    // container is run.
+    for _, initContainer := range p.Spec.InitContainers {
+        if c.Name != initContainer.Name {
+            err := m.policyRemoveContainerByRef(string(p.UID), initContainer.Name)
+            if err != nil {
+                klog.Warningf("[cpumanager] unable to remove init container (pod: %s, container: %s, error: %v)", string(p.UID), initContainer.Name, err)
+            }
+        }
+    }
+
+    err := m.policyAddContainer(p, c, containerID)
     if err != nil {
         klog.Errorf("[cpumanager] AddContainer error: %v", err)
         m.Unlock()
         return err
     }
-    cpus := m.state.GetCPUSetOrDefault(containerID)
+    cpus := m.state.GetCPUSetOrDefault(string(p.UID), c.Name)
     m.Unlock()

     if !cpus.IsEmpty() {
@@ -200,7 +222,7 @@ func (m *manager) AddContainer(p *v1.Pod, c *v1.Container, containerID string) e
         if err != nil {
             klog.Errorf("[cpumanager] AddContainer error: %v", err)
             m.Lock()
-            err := m.policy.RemoveContainer(m.state, containerID)
+            err := m.policyRemoveContainerByID(containerID)
             if err != nil {
                 klog.Errorf("[cpumanager] AddContainer rollback state error: %v", err)
             }
@@ -216,14 +238,46 @@ func (m *manager) RemoveContainer(containerID string) error {
     m.Lock()
     defer m.Unlock()

-    err := m.policy.RemoveContainer(m.state, containerID)
+    err := m.policyRemoveContainerByID(containerID)
     if err != nil {
         klog.Errorf("[cpumanager] RemoveContainer error: %v", err)
         return err
     }
+
     return nil
 }

+func (m *manager) policyAddContainer(p *v1.Pod, c *v1.Container, containerID string) error {
+    err := m.policy.AddContainer(m.state, p, c)
+    if err == nil {
+        m.containerMap.Add(string(p.UID), c.Name, containerID)
+    }
+    return err
+}
+
+func (m *manager) policyRemoveContainerByID(containerID string) error {
+    podUID, containerName, err := m.containerMap.GetContainerRef(containerID)
+    if err != nil {
+        return nil
+    }
+
+    err = m.policy.RemoveContainer(m.state, podUID, containerName)
+    if err == nil {
+        m.containerMap.RemoveByContainerID(containerID)
+    }
+
+    return err
+}
+
+func (m *manager) policyRemoveContainerByRef(podUID string, containerName string) error {
+    err := m.policy.RemoveContainer(m.state, podUID, containerName)
+    if err == nil {
+        m.containerMap.RemoveByContainerRef(podUID, containerName)
+    }
+
+    return err
+}
+
 func (m *manager) State() state.Reader {
     return m.state
 }
@@ -256,43 +310,35 @@ func (m *manager) removeStaleState() {
     m.Lock()
     defer m.Unlock()

-    // We remove stale state very conservatively, only removing *any* state
-    // once we know for sure that we wont be accidentally removing state that
-    // is still valid. Since this function is called periodically, we will just
-    // try again next time this function is called.
+    // Get the list of active pods.
     activePods := m.activePods()
     if len(activePods) == 0 {
         // If there are no active pods, skip the removal of stale state.
+        // Since this function is called periodically, we will just try again
+        // next time this function is called.
         return
     }

-    // Build a list of containerIDs for all containers in all active Pods.
-    activeContainers := make(map[string]struct{})
+    // Build a list of (podUID, containerName) pairs for all containers in all active Pods.
+    activeContainers := make(map[string]map[string]struct{})
     for _, pod := range activePods {
-        pstatus, ok := m.podStatusProvider.GetPodStatus(pod.UID)
-        if !ok {
-            // If even one pod does not have it's status set, skip state removal.
-            return
-        }
+        activeContainers[string(pod.UID)] = make(map[string]struct{})
         for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
-            containerID, err := findContainerIDByName(&pstatus, container.Name)
-            if err != nil {
-                // If even one container does not have it's containerID set, skip state removal.
-                return
-            }
-            activeContainers[containerID] = struct{}{}
+            activeContainers[string(pod.UID)][container.Name] = struct{}{}
         }
     }

     // Loop through the CPUManager state. Remove any state for containers not
-    // in the `activeContainers` list built above. The shortcircuits in place
-    // above ensure that no erroneous state will ever be removed.
-    for containerID := range m.state.GetCPUAssignments() {
-        if _, ok := activeContainers[containerID]; !ok {
-            klog.Errorf("[cpumanager] removeStaleState: removing container: %s)", containerID)
-            err := m.policy.RemoveContainer(m.state, containerID)
-            if err != nil {
-                klog.Errorf("[cpumanager] removeStaleState: failed to remove container %s, error: %v)", containerID, err)
+    // in the `activeContainers` list built above.
+    assignments := m.state.GetCPUAssignments()
+    for podUID := range assignments {
+        for containerName := range assignments[podUID] {
+            if _, ok := activeContainers[podUID][containerName]; !ok {
+                klog.Errorf("[cpumanager] removeStaleState: removing (pod %s, container: %s)", podUID, containerName)
+                err := m.policyRemoveContainerByRef(podUID, containerName)
+                if err != nil {
+                    klog.Errorf("[cpumanager] removeStaleState: failed to remove (pod %s, container %s), error: %v)", podUID, containerName, err)
+                }
             }
         }
     }
@@ -325,7 +371,7 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec
             // - policy does not want to track the container
             // - kubelet has just been restarted - and there is no previous state file
             // - container has been removed from state by RemoveContainer call (DeletionTimestamp is set)
-            if _, ok := m.state.GetCPUSet(containerID); !ok {
+            if _, ok := m.state.GetCPUSet(string(pod.UID), container.Name); !ok {
                 if status.Phase == v1.PodRunning && pod.DeletionTimestamp == nil {
                     klog.V(4).Infof("[cpumanager] reconcileState: container is not present in state - trying to add (pod: %s, container: %s, container id: %s)", pod.Name, container.Name, containerID)
                     err := m.AddContainer(pod, &container, containerID)
@@ -341,7 +387,7 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec
                 }
             }

-            cset := m.state.GetCPUSetOrDefault(containerID)
+            cset := m.state.GetCPUSetOrDefault(string(pod.UID), container.Name)
             if cset.IsEmpty() {
                 // NOTE: This should not happen outside of tests.
klog.Infof("[cpumanager] reconcileState: skipping container; assigned cpuset is empty (pod: %s, container: %s)", pod.Name, container.Name) diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go index 429e9981a55..c1c494583dc 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go @@ -19,6 +19,7 @@ package cpumanager import ( "fmt" "reflect" + "strconv" "strings" "testing" "time" @@ -32,6 +33,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2" + "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/containermap" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" @@ -43,8 +45,8 @@ type mockState struct { defaultCPUSet cpuset.CPUSet } -func (s *mockState) GetCPUSet(containerID string) (cpuset.CPUSet, bool) { - res, ok := s.assignments[containerID] +func (s *mockState) GetCPUSet(podUID string, containerName string) (cpuset.CPUSet, bool) { + res, ok := s.assignments[podUID][containerName] return res.Clone(), ok } @@ -52,23 +54,29 @@ func (s *mockState) GetDefaultCPUSet() cpuset.CPUSet { return s.defaultCPUSet.Clone() } -func (s *mockState) GetCPUSetOrDefault(containerID string) cpuset.CPUSet { - if res, ok := s.GetCPUSet(containerID); ok { +func (s *mockState) GetCPUSetOrDefault(podUID string, containerName string) cpuset.CPUSet { + if res, ok := s.GetCPUSet(podUID, containerName); ok { return res } return s.GetDefaultCPUSet() } -func (s *mockState) SetCPUSet(containerID string, cset cpuset.CPUSet) { - s.assignments[containerID] = cset +func (s *mockState) SetCPUSet(podUID string, containerName string, cset cpuset.CPUSet) { + if _, exists := s.assignments[podUID]; !exists { + s.assignments[podUID] = make(map[string]cpuset.CPUSet) + } + s.assignments[podUID][containerName] = cset } func (s *mockState) SetDefaultCPUSet(cset cpuset.CPUSet) { s.defaultCPUSet = cset } -func (s *mockState) Delete(containerID string) { - delete(s.assignments, containerID) +func (s *mockState) Delete(podUID string, containerName string) { + delete(s.assignments[podUID], containerName) + if len(s.assignments[podUID]) == 0 { + delete(s.assignments, podUID) + } } func (s *mockState) ClearState() { @@ -95,11 +103,11 @@ func (p *mockPolicy) Name() string { func (p *mockPolicy) Start(s state.State) { } -func (p *mockPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container, containerID string) error { +func (p *mockPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container) error { return p.err } -func (p *mockPolicy) RemoveContainer(s state.State, containerID string) error { +func (p *mockPolicy) RemoveContainer(s state.State, podUID string, containerName string) error { return p.err } @@ -124,8 +132,8 @@ func (psp mockPodStatusProvider) GetPodStatus(uid types.UID) (v1.PodStatus, bool return psp.podStatus, psp.found } -func makePod(cpuRequest, cpuLimit string) *v1.Pod { - return &v1.Pod{ +func makePod(podUID, containerName, cpuRequest, cpuLimit string) *v1.Pod { + pod := &v1.Pod{ Spec: v1.PodSpec{ Containers: []v1.Container{ { @@ -143,10 +151,19 @@ func makePod(cpuRequest, cpuLimit string) *v1.Pod { }, }, } + + pod.UID = types.UID(podUID) + pod.Spec.Containers[0].Name = containerName + + return pod } func makeMultiContainerPod(initCPUs, appCPUs []struct{ request, limit string }) *v1.Pod { pod := &v1.Pod{ + 
ObjectMeta: metav1.ObjectMeta{ + Name: "pod", + UID: "podUID", + }, Spec: v1.PodSpec{ InitContainers: []v1.Container{}, Containers: []v1.Container{}, @@ -155,7 +172,7 @@ func makeMultiContainerPod(initCPUs, appCPUs []struct{ request, limit string }) for i, cpu := range initCPUs { pod.Spec.InitContainers = append(pod.Spec.InitContainers, v1.Container{ - Name: "initContainer-" + string(i), + Name: "initContainer-" + strconv.Itoa(i), Resources: v1.ResourceRequirements{ Requests: v1.ResourceList{ v1.ResourceName(v1.ResourceCPU): resource.MustParse(cpu.request), @@ -171,7 +188,7 @@ func makeMultiContainerPod(initCPUs, appCPUs []struct{ request, limit string }) for i, cpu := range appCPUs { pod.Spec.Containers = append(pod.Spec.Containers, v1.Container{ - Name: "appContainer-" + string(i), + Name: "appContainer-" + strconv.Itoa(i), Resources: v1.ResourceRequirements{ Requests: v1.ResourceList{ v1.ResourceName(v1.ResourceCPU): resource.MustParse(cpu.request), @@ -246,11 +263,12 @@ func TestCPUManagerAdd(t *testing.T) { containerRuntime: mockRuntimeService{ err: testCase.updateErr, }, + containerMap: containermap.NewContainerMap(), activePods: func() []*v1.Pod { return nil }, podStatusProvider: mockPodStatusProvider{}, } - pod := makePod("2", "2") + pod := makePod("fakePod", "fakeContainer", "2", "2") container := &pod.Spec.Containers[0] err := mgr.AddContainer(pod, container, "fakeID") if !reflect.DeepEqual(err, testCase.expErr) { @@ -264,6 +282,237 @@ func TestCPUManagerAdd(t *testing.T) { } } +func TestCPUManagerAddWithInitContainers(t *testing.T) { + testCases := []struct { + description string + topo *topology.CPUTopology + numReservedCPUs int + initContainerIDs []string + containerIDs []string + stAssignments state.ContainerCPUAssignments + stDefaultCPUSet cpuset.CPUSet + pod *v1.Pod + expInitCSets []cpuset.CPUSet + expCSets []cpuset.CPUSet + }{ + { + description: "No Guaranteed Init CPUs", + topo: topoSingleSocketHT, + numReservedCPUs: 0, + stAssignments: state.ContainerCPUAssignments{}, + stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), + initContainerIDs: []string{"initFakeID"}, + containerIDs: []string{"appFakeID"}, + pod: makeMultiContainerPod( + []struct{ request, limit string }{{"100m", "100m"}}, + []struct{ request, limit string }{{"4000m", "4000m"}}), + expInitCSets: []cpuset.CPUSet{ + cpuset.NewCPUSet()}, + expCSets: []cpuset.CPUSet{ + cpuset.NewCPUSet(0, 4, 1, 5)}, + }, + { + description: "Equal Number of Guaranteed CPUs", + topo: topoSingleSocketHT, + numReservedCPUs: 0, + stAssignments: state.ContainerCPUAssignments{}, + stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), + initContainerIDs: []string{"initFakeID"}, + containerIDs: []string{"appFakeID"}, + pod: makeMultiContainerPod( + []struct{ request, limit string }{{"4000m", "4000m"}}, + []struct{ request, limit string }{{"4000m", "4000m"}}), + expInitCSets: []cpuset.CPUSet{ + cpuset.NewCPUSet(0, 4, 1, 5)}, + expCSets: []cpuset.CPUSet{ + cpuset.NewCPUSet(0, 4, 1, 5)}, + }, + { + description: "More Init Container Guaranteed CPUs", + topo: topoSingleSocketHT, + numReservedCPUs: 0, + stAssignments: state.ContainerCPUAssignments{}, + stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), + initContainerIDs: []string{"initFakeID"}, + containerIDs: []string{"appFakeID"}, + pod: makeMultiContainerPod( + []struct{ request, limit string }{{"6000m", "6000m"}}, + []struct{ request, limit string }{{"4000m", "4000m"}}), + expInitCSets: []cpuset.CPUSet{ + cpuset.NewCPUSet(0, 4, 1, 5, 2, 6)}, + expCSets: 
[]cpuset.CPUSet{ + cpuset.NewCPUSet(0, 4, 1, 5)}, + }, + { + description: "Less Init Container Guaranteed CPUs", + topo: topoSingleSocketHT, + numReservedCPUs: 0, + stAssignments: state.ContainerCPUAssignments{}, + stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), + initContainerIDs: []string{"initFakeID"}, + containerIDs: []string{"appFakeID"}, + pod: makeMultiContainerPod( + []struct{ request, limit string }{{"2000m", "2000m"}}, + []struct{ request, limit string }{{"4000m", "4000m"}}), + expInitCSets: []cpuset.CPUSet{ + cpuset.NewCPUSet(0, 4)}, + expCSets: []cpuset.CPUSet{ + cpuset.NewCPUSet(0, 4, 1, 5)}, + }, + { + description: "Multi Init Container Equal CPUs", + topo: topoSingleSocketHT, + numReservedCPUs: 0, + stAssignments: state.ContainerCPUAssignments{}, + stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), + initContainerIDs: []string{"initFakeID-1", "initFakeID-2"}, + containerIDs: []string{"appFakeID"}, + pod: makeMultiContainerPod( + []struct{ request, limit string }{ + {"2000m", "2000m"}, + {"2000m", "2000m"}}, + []struct{ request, limit string }{ + {"2000m", "2000m"}}), + expInitCSets: []cpuset.CPUSet{ + cpuset.NewCPUSet(0, 4), + cpuset.NewCPUSet(0, 4)}, + expCSets: []cpuset.CPUSet{ + cpuset.NewCPUSet(0, 4)}, + }, + { + description: "Multi Init Container Less CPUs", + topo: topoSingleSocketHT, + numReservedCPUs: 0, + stAssignments: state.ContainerCPUAssignments{}, + stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), + initContainerIDs: []string{"initFakeID-1", "initFakeID-2"}, + containerIDs: []string{"appFakeID"}, + pod: makeMultiContainerPod( + []struct{ request, limit string }{ + {"4000m", "4000m"}, + {"4000m", "4000m"}}, + []struct{ request, limit string }{ + {"2000m", "2000m"}}), + expInitCSets: []cpuset.CPUSet{ + cpuset.NewCPUSet(0, 4, 1, 5), + cpuset.NewCPUSet(0, 4, 1, 5)}, + expCSets: []cpuset.CPUSet{ + cpuset.NewCPUSet(0, 4)}, + }, + { + description: "Multi Init Container More CPUs", + topo: topoSingleSocketHT, + numReservedCPUs: 0, + stAssignments: state.ContainerCPUAssignments{}, + stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), + initContainerIDs: []string{"initFakeID-1", "initFakeID-2"}, + containerIDs: []string{"appFakeID"}, + pod: makeMultiContainerPod( + []struct{ request, limit string }{ + {"2000m", "2000m"}, + {"2000m", "2000m"}}, + []struct{ request, limit string }{ + {"4000m", "4000m"}}), + expInitCSets: []cpuset.CPUSet{ + cpuset.NewCPUSet(0, 4), + cpuset.NewCPUSet(0, 4)}, + expCSets: []cpuset.CPUSet{ + cpuset.NewCPUSet(0, 4, 1, 5)}, + }, + { + description: "Multi Init Container Increasing CPUs", + topo: topoSingleSocketHT, + numReservedCPUs: 0, + stAssignments: state.ContainerCPUAssignments{}, + stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), + initContainerIDs: []string{"initFakeID-1", "initFakeID-2"}, + containerIDs: []string{"appFakeID"}, + pod: makeMultiContainerPod( + []struct{ request, limit string }{ + {"2000m", "2000m"}, + {"4000m", "4000m"}}, + []struct{ request, limit string }{ + {"6000m", "6000m"}}), + expInitCSets: []cpuset.CPUSet{ + cpuset.NewCPUSet(0, 4), + cpuset.NewCPUSet(0, 4, 1, 5)}, + expCSets: []cpuset.CPUSet{ + cpuset.NewCPUSet(0, 4, 1, 5, 2, 6)}, + }, + { + description: "Multi Init, Multi App Container Split CPUs", + topo: topoSingleSocketHT, + numReservedCPUs: 0, + stAssignments: state.ContainerCPUAssignments{}, + stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), + initContainerIDs: []string{"initFakeID-1", "initFakeID-2"}, + containerIDs: []string{"appFakeID-1", 
"appFakeID-2"}, + pod: makeMultiContainerPod( + []struct{ request, limit string }{ + {"2000m", "2000m"}, + {"4000m", "4000m"}}, + []struct{ request, limit string }{ + {"2000m", "2000m"}, + {"2000m", "2000m"}}), + expInitCSets: []cpuset.CPUSet{ + cpuset.NewCPUSet(0, 4), + cpuset.NewCPUSet(0, 4, 1, 5)}, + expCSets: []cpuset.CPUSet{ + cpuset.NewCPUSet(0, 4), + cpuset.NewCPUSet(1, 5)}, + }, + } + + for _, testCase := range testCases { + policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager()) + + state := &mockState{ + assignments: testCase.stAssignments, + defaultCPUSet: testCase.stDefaultCPUSet, + } + + mgr := &manager{ + policy: policy, + state: state, + containerRuntime: mockRuntimeService{}, + containerMap: containermap.NewContainerMap(), + activePods: func() []*v1.Pod { return nil }, + podStatusProvider: mockPodStatusProvider{}, + } + + containers := append( + testCase.pod.Spec.InitContainers, + testCase.pod.Spec.Containers...) + + containerIDs := append( + testCase.initContainerIDs, + testCase.containerIDs...) + + expCSets := append( + testCase.expInitCSets, + testCase.expCSets...) + + for i := range containers { + err := mgr.AddContainer(testCase.pod, &containers[i], containerIDs[i]) + if err != nil { + t.Errorf("StaticPolicy AddContainer() error (%v). unexpected error for container id: %v: %v", + testCase.description, containerIDs[i], err) + } + + cset, found := state.assignments[string(testCase.pod.UID)][containers[i].Name] + if !expCSets[i].IsEmpty() && !found { + t.Errorf("StaticPolicy AddContainer() error (%v). expected container %v to be present in assignments %v", + testCase.description, containers[i].Name, state.assignments) + } + + if found && !cset.Equals(expCSets[i]) { + t.Errorf("StaticPolicy AddContainer() error (%v). expected cpuset %v for container %v but got %v", + testCase.description, expCSets[i], containers[i].Name, cset) + } + } + } +} + func TestCPUManagerGenerate(t *testing.T) { testCases := []struct { description string @@ -377,6 +626,9 @@ func TestCPUManagerGenerate(t *testing.T) { } func TestCPUManagerRemove(t *testing.T) { + containerID := "fakeID" + containerMap := containermap.NewContainerMap() + mgr := &manager{ policy: &mockPolicy{ err: nil, @@ -386,11 +638,13 @@ func TestCPUManagerRemove(t *testing.T) { defaultCPUSet: cpuset.NewCPUSet(), }, containerRuntime: mockRuntimeService{}, + containerMap: containerMap, activePods: func() []*v1.Pod { return nil }, podStatusProvider: mockPodStatusProvider{}, } - err := mgr.RemoveContainer("fakeID") + containerMap.Add("", "", containerID) + err := mgr.RemoveContainer(containerID) if err != nil { t.Errorf("CPU Manager RemoveContainer() error. expected error to be nil but got: %v", err) } @@ -401,11 +655,13 @@ func TestCPUManagerRemove(t *testing.T) { }, state: state.NewMemoryState(), containerRuntime: mockRuntimeService{}, + containerMap: containerMap, activePods: func() []*v1.Pod { return nil }, podStatusProvider: mockPodStatusProvider{}, } - err = mgr.RemoveContainer("fakeID") + containerMap.Add("", "", containerID) + err = mgr.RemoveContainer(containerID) if !reflect.DeepEqual(err, fmt.Errorf("fake error")) { t.Errorf("CPU Manager RemoveContainer() error. 
expected error: fake error but got: %v", err) } @@ -429,12 +685,12 @@ func TestReconcileState(t *testing.T) { { ObjectMeta: metav1.ObjectMeta{ Name: "fakePodName", - UID: "fakeUID", + UID: "fakePodUID", }, Spec: v1.PodSpec{ Containers: []v1.Container{ { - Name: "fakeName", + Name: "fakeContainerName", }, }, }, @@ -443,18 +699,20 @@ func TestReconcileState(t *testing.T) { pspPS: v1.PodStatus{ ContainerStatuses: []v1.ContainerStatus{ { - Name: "fakeName", - ContainerID: "docker://fakeID", + Name: "fakeContainerName", + ContainerID: "docker://fakeContainerID", }, }, }, pspFound: true, stAssignments: state.ContainerCPUAssignments{ - "fakeID": cpuset.NewCPUSet(1, 2), + "fakePodUID": map[string]cpuset.CPUSet{ + "fakeContainerName": cpuset.NewCPUSet(1, 2), + }, }, stDefaultCPUSet: cpuset.NewCPUSet(3, 4, 5, 6, 7), updateErr: nil, - expectSucceededContainerName: "fakeName", + expectSucceededContainerName: "fakeContainerName", expectFailedContainerName: "", }, { @@ -463,12 +721,12 @@ func TestReconcileState(t *testing.T) { { ObjectMeta: metav1.ObjectMeta{ Name: "fakePodName", - UID: "fakeUID", + UID: "fakePodUID", }, Spec: v1.PodSpec{ InitContainers: []v1.Container{ { - Name: "fakeName", + Name: "fakeContainerName", }, }, }, @@ -477,18 +735,20 @@ func TestReconcileState(t *testing.T) { pspPS: v1.PodStatus{ InitContainerStatuses: []v1.ContainerStatus{ { - Name: "fakeName", - ContainerID: "docker://fakeID", + Name: "fakeContainerName", + ContainerID: "docker://fakeContainerID", }, }, }, pspFound: true, stAssignments: state.ContainerCPUAssignments{ - "fakeID": cpuset.NewCPUSet(1, 2), + "fakePodUID": map[string]cpuset.CPUSet{ + "fakeContainerName": cpuset.NewCPUSet(1, 2), + }, }, stDefaultCPUSet: cpuset.NewCPUSet(3, 4, 5, 6, 7), updateErr: nil, - expectSucceededContainerName: "fakeName", + expectSucceededContainerName: "fakeContainerName", expectFailedContainerName: "", }, { @@ -497,12 +757,12 @@ func TestReconcileState(t *testing.T) { { ObjectMeta: metav1.ObjectMeta{ Name: "fakePodName", - UID: "fakeUID", + UID: "fakePodUID", }, Spec: v1.PodSpec{ Containers: []v1.Container{ { - Name: "fakeName", + Name: "fakeContainerName", }, }, }, @@ -514,7 +774,7 @@ func TestReconcileState(t *testing.T) { stDefaultCPUSet: cpuset.NewCPUSet(), updateErr: nil, expectSucceededContainerName: "", - expectFailedContainerName: "fakeName", + expectFailedContainerName: "fakeContainerName", }, { description: "cpu manager reconclie - container id not found", @@ -522,12 +782,12 @@ func TestReconcileState(t *testing.T) { { ObjectMeta: metav1.ObjectMeta{ Name: "fakePodName", - UID: "fakeUID", + UID: "fakePodUID", }, Spec: v1.PodSpec{ Containers: []v1.Container{ { - Name: "fakeName", + Name: "fakeContainerName", }, }, }, @@ -536,8 +796,8 @@ func TestReconcileState(t *testing.T) { pspPS: v1.PodStatus{ ContainerStatuses: []v1.ContainerStatus{ { - Name: "fakeName1", - ContainerID: "docker://fakeID", + Name: "fakeContainerName1", + ContainerID: "docker://fakeContainerID", }, }, }, @@ -546,7 +806,7 @@ func TestReconcileState(t *testing.T) { stDefaultCPUSet: cpuset.NewCPUSet(), updateErr: nil, expectSucceededContainerName: "", - expectFailedContainerName: "fakeName", + expectFailedContainerName: "fakeContainerName", }, { description: "cpu manager reconclie - cpuset is empty", @@ -554,12 +814,12 @@ func TestReconcileState(t *testing.T) { { ObjectMeta: metav1.ObjectMeta{ Name: "fakePodName", - UID: "fakeUID", + UID: "fakePodUID", }, Spec: v1.PodSpec{ Containers: []v1.Container{ { - Name: "fakeName", + Name: "fakeContainerName", }, }, }, @@ 
-568,19 +828,21 @@ func TestReconcileState(t *testing.T) { pspPS: v1.PodStatus{ ContainerStatuses: []v1.ContainerStatus{ { - Name: "fakeName", - ContainerID: "docker://fakeID", + Name: "fakeContainerName", + ContainerID: "docker://fakeContainerID", }, }, }, pspFound: true, stAssignments: state.ContainerCPUAssignments{ - "fakeID": cpuset.NewCPUSet(), + "fakePodUID": map[string]cpuset.CPUSet{ + "fakeContainerName": cpuset.NewCPUSet(), + }, }, stDefaultCPUSet: cpuset.NewCPUSet(1, 2, 3, 4, 5, 6, 7), updateErr: nil, expectSucceededContainerName: "", - expectFailedContainerName: "fakeName", + expectFailedContainerName: "fakeContainerName", }, { description: "cpu manager reconclie - container update error", @@ -588,12 +850,12 @@ func TestReconcileState(t *testing.T) { { ObjectMeta: metav1.ObjectMeta{ Name: "fakePodName", - UID: "fakeUID", + UID: "fakePodUID", }, Spec: v1.PodSpec{ Containers: []v1.Container{ { - Name: "fakeName", + Name: "fakeContainerName", }, }, }, @@ -602,19 +864,21 @@ func TestReconcileState(t *testing.T) { pspPS: v1.PodStatus{ ContainerStatuses: []v1.ContainerStatus{ { - Name: "fakeName", - ContainerID: "docker://fakeID", + Name: "fakeContainerName", + ContainerID: "docker://fakeContainerID", }, }, }, pspFound: true, stAssignments: state.ContainerCPUAssignments{ - "fakeID": cpuset.NewCPUSet(1, 2), + "fakePodUID": map[string]cpuset.CPUSet{ + "fakeContainerName": cpuset.NewCPUSet(1, 2), + }, }, stDefaultCPUSet: cpuset.NewCPUSet(3, 4, 5, 6, 7), updateErr: fmt.Errorf("fake container update error"), expectSucceededContainerName: "", - expectFailedContainerName: "fakeName", + expectFailedContainerName: "fakeContainerName", }, } @@ -630,6 +894,7 @@ func TestReconcileState(t *testing.T) { containerRuntime: mockRuntimeService{ err: testCase.updateErr, }, + containerMap: containermap.NewContainerMap(), activePods: func() []*v1.Pod { return testCase.activePods }, @@ -724,11 +989,12 @@ func TestCPUManagerAddWithResvList(t *testing.T) { containerRuntime: mockRuntimeService{ err: testCase.updateErr, }, + containerMap: containermap.NewContainerMap(), activePods: func() []*v1.Pod { return nil }, podStatusProvider: mockPodStatusProvider{}, } - pod := makePod("2", "2") + pod := makePod("fakePod", "fakeContainer", "2", "2") container := &pod.Spec.Containers[0] err := mgr.AddContainer(pod, container, "fakeID") if !reflect.DeepEqual(err, testCase.expErr) { diff --git a/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go b/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go index 94ef3dc83e1..7adfc2ba59e 100644 --- a/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go +++ b/pkg/kubelet/cm/cpumanager/fake_cpu_manager.go @@ -19,6 +19,7 @@ package cpumanager import ( "k8s.io/api/core/v1" "k8s.io/klog" + "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/containermap" "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state" "k8s.io/kubernetes/pkg/kubelet/cm/topologymanager" "k8s.io/kubernetes/pkg/kubelet/config" @@ -29,7 +30,7 @@ type fakeManager struct { state state.State } -func (m *fakeManager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService) { +func (m *fakeManager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService, initialContainers containermap.ContainerMap) { klog.Info("[fake cpumanager] Start()") } diff --git a/pkg/kubelet/cm/cpumanager/policy.go b/pkg/kubelet/cm/cpumanager/policy.go index 83b5d07eedc..aead99ba5c4 100644 --- 
a/pkg/kubelet/cm/cpumanager/policy.go +++ b/pkg/kubelet/cm/cpumanager/policy.go @@ -27,9 +27,9 @@ type Policy interface { Name() string Start(s state.State) // AddContainer call is idempotent - AddContainer(s state.State, pod *v1.Pod, container *v1.Container, containerID string) error + AddContainer(s state.State, pod *v1.Pod, container *v1.Container) error // RemoveContainer call is idempotent - RemoveContainer(s state.State, containerID string) error + RemoveContainer(s state.State, podUID string, containerName string) error // GetTopologyHints implements the topologymanager.HintProvider Interface // and is consulted to achieve NUMA aware resource alignment among this // and other resource controllers. diff --git a/pkg/kubelet/cm/cpumanager/policy_none.go b/pkg/kubelet/cm/cpumanager/policy_none.go index fd56d08f89a..8f769f62374 100644 --- a/pkg/kubelet/cm/cpumanager/policy_none.go +++ b/pkg/kubelet/cm/cpumanager/policy_none.go @@ -43,11 +43,11 @@ func (p *nonePolicy) Start(s state.State) { klog.Info("[cpumanager] none policy: Start") } -func (p *nonePolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container, containerID string) error { +func (p *nonePolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container) error { return nil } -func (p *nonePolicy) RemoveContainer(s state.State, containerID string) error { +func (p *nonePolicy) RemoveContainer(s state.State, podUID string, containerName string) error { return nil } diff --git a/pkg/kubelet/cm/cpumanager/policy_none_test.go b/pkg/kubelet/cm/cpumanager/policy_none_test.go index 99e61fdbbe2..a28dfa6a0e9 100644 --- a/pkg/kubelet/cm/cpumanager/policy_none_test.go +++ b/pkg/kubelet/cm/cpumanager/policy_none_test.go @@ -41,10 +41,10 @@ func TestNonePolicyAdd(t *testing.T) { defaultCPUSet: cpuset.NewCPUSet(1, 2, 3, 4, 5, 6, 7), } - testPod := makePod("1000m", "1000m") + testPod := makePod("fakePod", "fakeContainer", "1000m", "1000m") container := &testPod.Spec.Containers[0] - err := policy.AddContainer(st, testPod, container, "fakeID") + err := policy.AddContainer(st, testPod, container) if err != nil { t.Errorf("NonePolicy AddContainer() error. expected no error but got: %v", err) } @@ -58,7 +58,10 @@ func TestNonePolicyRemove(t *testing.T) { defaultCPUSet: cpuset.NewCPUSet(1, 2, 3, 4, 5, 6, 7), } - err := policy.RemoveContainer(st, "fakeID") + testPod := makePod("fakePod", "fakeContainer", "1000m", "1000m") + + container := &testPod.Spec.Containers[0] + err := policy.RemoveContainer(st, string(testPod.UID), container.Name) if err != nil { t.Errorf("NonePolicy RemoveContainer() error. 
expected no error but got %v", err) } diff --git a/pkg/kubelet/cm/cpumanager/policy_static.go b/pkg/kubelet/cm/cpumanager/policy_static.go index dd71c026dfb..61fe3cd8a63 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static.go +++ b/pkg/kubelet/cm/cpumanager/policy_static.go @@ -75,10 +75,6 @@ type staticPolicy struct { topology *topology.CPUTopology // set of CPUs that is not available for exclusive assignment reserved cpuset.CPUSet - // containerMap provides a mapping from - // (pod, container) -> containerID - // for all containers a pod - containerMap containerMap // topology manager reference to get container Topology affinity affinity topologymanager.Store } @@ -110,10 +106,9 @@ func NewStaticPolicy(topology *topology.CPUTopology, numReservedCPUs int, reserv klog.Infof("[cpumanager] reserved %d CPUs (\"%s\") not available for exclusive assignment", reserved.Size(), reserved) return &staticPolicy{ - topology: topology, - reserved: reserved, - containerMap: newContainerMap(), - affinity: affinity, + topology: topology, + reserved: reserved, + affinity: affinity, } } @@ -153,11 +148,13 @@ func (p *staticPolicy) validateState(s state.State) error { } // 2. Check if state for static policy is consistent - for cID, cset := range tmpAssignments { - // None of the cpu in DEFAULT cset should be in s.assignments - if !tmpDefaultCPUset.Intersection(cset).IsEmpty() { - return fmt.Errorf("container id: %s cpuset: \"%s\" overlaps with default cpuset \"%s\"", - cID, cset.String(), tmpDefaultCPUset.String()) + for pod := range tmpAssignments { + for container, cset := range tmpAssignments[pod] { + // None of the cpu in DEFAULT cset should be in s.assignments + if !tmpDefaultCPUset.Intersection(cset).IsEmpty() { + return fmt.Errorf("pod: %s, container: %s cpuset: \"%s\" overlaps with default cpuset \"%s\"", + pod, container, cset.String(), tmpDefaultCPUset.String()) + } } } @@ -170,8 +167,10 @@ func (p *staticPolicy) validateState(s state.State) error { // the set of CPUs stored in the state. totalKnownCPUs := tmpDefaultCPUset.Clone() tmpCPUSets := []cpuset.CPUSet{} - for _, cset := range tmpAssignments { - tmpCPUSets = append(tmpCPUSets, cset) + for pod := range tmpAssignments { + for _, cset := range tmpAssignments[pod] { + tmpCPUSets = append(tmpCPUSets, cset) + } } totalKnownCPUs = totalKnownCPUs.UnionAll(tmpCPUSets) if !totalKnownCPUs.Equals(p.topology.CPUDetails.CPUs()) { @@ -187,40 +186,16 @@ func (p *staticPolicy) assignableCPUs(s state.State) cpuset.CPUSet { return s.GetDefaultCPUSet().Difference(p.reserved) } -func (p *staticPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container, containerID string) (rerr error) { - // So long as this function does not return an error, - // add (pod, container, containerID) to the containerMap. 
- defer func() { - if rerr == nil { - p.containerMap.Add(pod, container, containerID) - } - }() - +func (p *staticPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Container) error { if numCPUs := p.guaranteedCPUs(pod, container); numCPUs != 0 { - klog.Infof("[cpumanager] static policy: AddContainer (pod: %s, container: %s, container id: %s)", pod.Name, container.Name, containerID) + klog.Infof("[cpumanager] static policy: AddContainer (pod: %s, container: %s)", pod.Name, container.Name) // container belongs in an exclusively allocated pool - if _, ok := s.GetCPUSet(containerID); ok { - klog.Infof("[cpumanager] static policy: container already present in state, skipping (container: %s, container id: %s)", container.Name, containerID) + if _, ok := s.GetCPUSet(string(pod.UID), container.Name); ok { + klog.Infof("[cpumanager] static policy: container already present in state, skipping (pod: %s, container: %s)", pod.Name, container.Name) return nil } - // Proactively remove CPUs from init containers that have already run. - // They are guaranteed to have run to completion before any other - // container is run. - for _, initContainer := range pod.Spec.InitContainers { - if container.Name != initContainer.Name { - initContainerID, err := p.containerMap.Get(pod, &initContainer) - if err != nil { - continue - } - err = p.RemoveContainer(s, initContainerID) - if err != nil { - klog.Warningf("[cpumanager] unable to remove init container (container id: %s, error: %v)", initContainerID, err) - } - } - } - // Call Topology Manager to get the aligned socket affinity across all hint providers. hint := p.affinity.GetAffinity(string(pod.UID), container.Name) klog.Infof("[cpumanager] Pod %v, Container %v Topology Affinity is: %v", pod.UID, container.Name, hint) @@ -228,27 +203,19 @@ func (p *staticPolicy) AddContainer(s state.State, pod *v1.Pod, container *v1.Co // Allocate CPUs according to the NUMA affinity contained in the hint. cpuset, err := p.allocateCPUs(s, numCPUs, hint.NUMANodeAffinity) if err != nil { - klog.Errorf("[cpumanager] unable to allocate %d CPUs (container id: %s, error: %v)", numCPUs, containerID, err) + klog.Errorf("[cpumanager] unable to allocate %d CPUs (pod: %s, container: %s, error: %v)", numCPUs, pod.Name, container.Name, err) return err } - s.SetCPUSet(containerID, cpuset) + s.SetCPUSet(string(pod.UID), container.Name, cpuset) } // container belongs in the shared pool (nothing to do; use default cpuset) return nil } -func (p *staticPolicy) RemoveContainer(s state.State, containerID string) (rerr error) { - // So long as this function does not return an error, - // remove containerID from the containerMap. - defer func() { - if rerr == nil { - p.containerMap.Remove(containerID) - } - }() - - klog.Infof("[cpumanager] static policy: RemoveContainer (container id: %s)", containerID) - if toRelease, ok := s.GetCPUSet(containerID); ok { - s.Delete(containerID) +func (p *staticPolicy) RemoveContainer(s state.State, podUID string, containerName string) error { + klog.Infof("[cpumanager] static policy: RemoveContainer (pod: %s, container: %s)", podUID, containerName) + if toRelease, ok := s.GetCPUSet(podUID, containerName); ok { + s.Delete(podUID, containerName) // Mutate the shared pool, adding released cpus. s.SetDefaultCPUSet(s.GetDefaultCPUSet().Union(toRelease)) } @@ -328,8 +295,7 @@ func (p *staticPolicy) GetTopologyHints(s state.State, pod v1.Pod, container v1. 
// Short circuit to regenerate the same hints if there are already // guaranteed CPUs allocated to the Container. This might happen after a // kubelet restart, for example. - containerID, _ := findContainerIDByName(&pod.Status, container.Name) - if allocated, exists := s.GetCPUSet(containerID); exists { + if allocated, exists := s.GetCPUSet(string(pod.UID), container.Name); exists { if allocated.Size() != requested { klog.Errorf("[cpumanager] CPUs already allocated to (pod %v, container %v) with different number than request: requested: %d, allocated: %d", string(pod.UID), container.Name, requested, allocated.Size()) return map[string][]topologymanager.TopologyHint{ diff --git a/pkg/kubelet/cm/cpumanager/policy_static_test.go b/pkg/kubelet/cm/cpumanager/policy_static_test.go index a2df92f34d7..f43c52a0a90 100644 --- a/pkg/kubelet/cm/cpumanager/policy_static_test.go +++ b/pkg/kubelet/cm/cpumanager/policy_static_test.go @@ -33,7 +33,8 @@ type staticPolicyTest struct { description string topo *topology.CPUTopology numReservedCPUs int - containerID string + podUID string + containerName string stAssignments state.ContainerCPUAssignments stDefaultCPUSet cpuset.CPUSet pod *v1.Pod @@ -43,19 +44,6 @@ type staticPolicyTest struct { expPanic bool } -type staticPolicyMultiContainerTest struct { - description string - topo *topology.CPUTopology - numReservedCPUs int - initContainerIDs []string - containerIDs []string - stAssignments state.ContainerCPUAssignments - stDefaultCPUSet cpuset.CPUSet - pod *v1.Pod - expInitCSets []cpuset.CPUSet - expCSets []cpuset.CPUSet -} - func TestStaticPolicyName(t *testing.T) { policy := NewStaticPolicy(topoSingleSocketHT, 1, cpuset.NewCPUSet(), topologymanager.NewFakeManager()) @@ -72,7 +60,9 @@ func TestStaticPolicyStart(t *testing.T) { description: "non-corrupted state", topo: topoDualSocketHT, stAssignments: state.ContainerCPUAssignments{ - "0": cpuset.NewCPUSet(0), + "fakePod": map[string]cpuset.CPUSet{ + "0": cpuset.NewCPUSet(0), + }, }, stDefaultCPUSet: cpuset.NewCPUSet(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11), expCSet: cpuset.NewCPUSet(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11), @@ -97,7 +87,9 @@ func TestStaticPolicyStart(t *testing.T) { description: "assigned core 2 is still present in available cpuset", topo: topoDualSocketHT, stAssignments: state.ContainerCPUAssignments{ - "0": cpuset.NewCPUSet(0, 1, 2), + "fakePod": map[string]cpuset.CPUSet{ + "0": cpuset.NewCPUSet(0, 1, 2), + }, }, stDefaultCPUSet: cpuset.NewCPUSet(2, 3, 4, 5, 6, 7, 8, 9, 10, 11), expPanic: true, @@ -106,8 +98,10 @@ func TestStaticPolicyStart(t *testing.T) { description: "core 12 is not present in topology but is in state cpuset", topo: topoDualSocketHT, stAssignments: state.ContainerCPUAssignments{ - "0": cpuset.NewCPUSet(0, 1, 2), - "1": cpuset.NewCPUSet(3, 4), + "fakePod": map[string]cpuset.CPUSet{ + "0": cpuset.NewCPUSet(0, 1, 2), + "1": cpuset.NewCPUSet(3, 4), + }, }, stDefaultCPUSet: cpuset.NewCPUSet(5, 6, 7, 8, 9, 10, 11, 12), expPanic: true, @@ -116,8 +110,10 @@ func TestStaticPolicyStart(t *testing.T) { description: "core 11 is present in topology but is not in state cpuset", topo: topoDualSocketHT, stAssignments: state.ContainerCPUAssignments{ - "0": cpuset.NewCPUSet(0, 1, 2), - "1": cpuset.NewCPUSet(3, 4), + "fakePod": map[string]cpuset.CPUSet{ + "0": cpuset.NewCPUSet(0, 1, 2), + "1": cpuset.NewCPUSet(3, 4), + }, }, stDefaultCPUSet: cpuset.NewCPUSet(5, 6, 7, 8, 9, 10), expPanic: true, @@ -179,10 +175,9 @@ func TestStaticPolicyAdd(t *testing.T) { description: "GuPodSingleCore, SingleSocketHT, 
ExpectError", topo: topoSingleSocketHT, numReservedCPUs: 1, - containerID: "fakeID2", stAssignments: state.ContainerCPUAssignments{}, stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), - pod: makePod("8000m", "8000m"), + pod: makePod("fakePod", "fakeContainer2", "8000m", "8000m"), expErr: fmt.Errorf("not enough cpus available to satisfy request"), expCPUAlloc: false, expCSet: cpuset.NewCPUSet(), @@ -191,10 +186,9 @@ func TestStaticPolicyAdd(t *testing.T) { description: "GuPodSingleCore, SingleSocketHT, ExpectAllocOneCPU", topo: topoSingleSocketHT, numReservedCPUs: 1, - containerID: "fakeID2", stAssignments: state.ContainerCPUAssignments{}, stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), - pod: makePod("1000m", "1000m"), + pod: makePod("fakePod", "fakeContainer2", "1000m", "1000m"), expErr: nil, expCPUAlloc: true, expCSet: cpuset.NewCPUSet(4), // expect sibling of partial core @@ -203,12 +197,13 @@ func TestStaticPolicyAdd(t *testing.T) { description: "GuPodMultipleCores, SingleSocketHT, ExpectAllocOneCore", topo: topoSingleSocketHT, numReservedCPUs: 1, - containerID: "fakeID3", stAssignments: state.ContainerCPUAssignments{ - "fakeID100": cpuset.NewCPUSet(2, 3, 6, 7), + "fakePod": map[string]cpuset.CPUSet{ + "fakeContainer100": cpuset.NewCPUSet(2, 3, 6, 7), + }, }, stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 4, 5), - pod: makePod("2000m", "2000m"), + pod: makePod("fakePod", "fakeContainer3", "2000m", "2000m"), expErr: nil, expCPUAlloc: true, expCSet: cpuset.NewCPUSet(1, 5), @@ -217,12 +212,13 @@ func TestStaticPolicyAdd(t *testing.T) { description: "GuPodMultipleCores, SingleSocketHT, ExpectSameAllocation", topo: topoSingleSocketHT, numReservedCPUs: 1, - containerID: "fakeID3", stAssignments: state.ContainerCPUAssignments{ - "fakeID3": cpuset.NewCPUSet(2, 3, 6, 7), + "fakePod": map[string]cpuset.CPUSet{ + "fakeContainer3": cpuset.NewCPUSet(2, 3, 6, 7), + }, }, stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 4, 5), - pod: makePod("4000m", "4000m"), + pod: makePod("fakePod", "fakeContainer3", "4000m", "4000m"), expErr: nil, expCPUAlloc: true, expCSet: cpuset.NewCPUSet(2, 3, 6, 7), @@ -231,12 +227,13 @@ func TestStaticPolicyAdd(t *testing.T) { description: "GuPodMultipleCores, DualSocketHT, ExpectAllocOneSocket", topo: topoDualSocketHT, numReservedCPUs: 1, - containerID: "fakeID3", stAssignments: state.ContainerCPUAssignments{ - "fakeID100": cpuset.NewCPUSet(2), + "fakePod": map[string]cpuset.CPUSet{ + "fakeContainer100": cpuset.NewCPUSet(2), + }, }, stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 3, 4, 5, 6, 7, 8, 9, 10, 11), - pod: makePod("6000m", "6000m"), + pod: makePod("fakePod", "fakeContainer3", "6000m", "6000m"), expErr: nil, expCPUAlloc: true, expCSet: cpuset.NewCPUSet(1, 3, 5, 7, 9, 11), @@ -245,12 +242,13 @@ func TestStaticPolicyAdd(t *testing.T) { description: "GuPodMultipleCores, DualSocketHT, ExpectAllocThreeCores", topo: topoDualSocketHT, numReservedCPUs: 1, - containerID: "fakeID3", stAssignments: state.ContainerCPUAssignments{ - "fakeID100": cpuset.NewCPUSet(1, 5), + "fakePod": map[string]cpuset.CPUSet{ + "fakeContainer100": cpuset.NewCPUSet(1, 5), + }, }, stDefaultCPUSet: cpuset.NewCPUSet(0, 2, 3, 4, 6, 7, 8, 9, 10, 11), - pod: makePod("6000m", "6000m"), + pod: makePod("fakePod", "fakeContainer3", "6000m", "6000m"), expErr: nil, expCPUAlloc: true, expCSet: cpuset.NewCPUSet(2, 3, 4, 8, 9, 10), @@ -259,12 +257,13 @@ func TestStaticPolicyAdd(t *testing.T) { description: "GuPodMultipleCores, DualSocketNoHT, ExpectAllocOneSocket", topo: topoDualSocketNoHT, numReservedCPUs: 1, - 
containerID: "fakeID1", stAssignments: state.ContainerCPUAssignments{ - "fakeID100": cpuset.NewCPUSet(), + "fakePod": map[string]cpuset.CPUSet{ + "fakeContainer100": cpuset.NewCPUSet(), + }, }, stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 3, 4, 5, 6, 7), - pod: makePod("4000m", "4000m"), + pod: makePod("fakePod", "fakeContainer1", "4000m", "4000m"), expErr: nil, expCPUAlloc: true, expCSet: cpuset.NewCPUSet(4, 5, 6, 7), @@ -273,12 +272,13 @@ func TestStaticPolicyAdd(t *testing.T) { description: "GuPodMultipleCores, DualSocketNoHT, ExpectAllocFourCores", topo: topoDualSocketNoHT, numReservedCPUs: 1, - containerID: "fakeID1", stAssignments: state.ContainerCPUAssignments{ - "fakeID100": cpuset.NewCPUSet(4, 5), + "fakePod": map[string]cpuset.CPUSet{ + "fakeContainer100": cpuset.NewCPUSet(4, 5), + }, }, stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 3, 6, 7), - pod: makePod("4000m", "4000m"), + pod: makePod("fakePod", "fakeContainer1", "4000m", "4000m"), expErr: nil, expCPUAlloc: true, expCSet: cpuset.NewCPUSet(1, 3, 6, 7), @@ -287,12 +287,13 @@ func TestStaticPolicyAdd(t *testing.T) { description: "GuPodMultipleCores, DualSocketHT, ExpectAllocOneSocketOneCore", topo: topoDualSocketHT, numReservedCPUs: 1, - containerID: "fakeID3", stAssignments: state.ContainerCPUAssignments{ - "fakeID100": cpuset.NewCPUSet(2), + "fakePod": map[string]cpuset.CPUSet{ + "fakeContainer100": cpuset.NewCPUSet(2), + }, }, stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 3, 4, 5, 6, 7, 8, 9, 10, 11), - pod: makePod("8000m", "8000m"), + pod: makePod("fakePod", "fakeContainer3", "8000m", "8000m"), expErr: nil, expCPUAlloc: true, expCSet: cpuset.NewCPUSet(1, 3, 4, 5, 7, 9, 10, 11), @@ -301,10 +302,9 @@ func TestStaticPolicyAdd(t *testing.T) { description: "NonGuPod, SingleSocketHT, NoAlloc", topo: topoSingleSocketHT, numReservedCPUs: 1, - containerID: "fakeID1", stAssignments: state.ContainerCPUAssignments{}, stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), - pod: makePod("1000m", "2000m"), + pod: makePod("fakePod", "fakeContainer1", "1000m", "2000m"), expErr: nil, expCPUAlloc: false, expCSet: cpuset.NewCPUSet(), @@ -313,10 +313,9 @@ func TestStaticPolicyAdd(t *testing.T) { description: "GuPodNonIntegerCore, SingleSocketHT, NoAlloc", topo: topoSingleSocketHT, numReservedCPUs: 1, - containerID: "fakeID4", stAssignments: state.ContainerCPUAssignments{}, stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), - pod: makePod("977m", "977m"), + pod: makePod("fakePod", "fakeContainer4", "977m", "977m"), expErr: nil, expCPUAlloc: false, expCSet: cpuset.NewCPUSet(), @@ -325,12 +324,13 @@ func TestStaticPolicyAdd(t *testing.T) { description: "GuPodMultipleCores, SingleSocketHT, NoAllocExpectError", topo: topoSingleSocketHT, numReservedCPUs: 1, - containerID: "fakeID5", stAssignments: state.ContainerCPUAssignments{ - "fakeID100": cpuset.NewCPUSet(1, 2, 3, 4, 5, 6), + "fakePod": map[string]cpuset.CPUSet{ + "fakeContainer100": cpuset.NewCPUSet(1, 2, 3, 4, 5, 6), + }, }, stDefaultCPUSet: cpuset.NewCPUSet(0, 7), - pod: makePod("2000m", "2000m"), + pod: makePod("fakePod", "fakeContainer5", "2000m", "2000m"), expErr: fmt.Errorf("not enough cpus available to satisfy request"), expCPUAlloc: false, expCSet: cpuset.NewCPUSet(), @@ -339,12 +339,13 @@ func TestStaticPolicyAdd(t *testing.T) { description: "GuPodMultipleCores, DualSocketHT, NoAllocExpectError", topo: topoDualSocketHT, numReservedCPUs: 1, - containerID: "fakeID5", stAssignments: state.ContainerCPUAssignments{ - "fakeID100": cpuset.NewCPUSet(1, 2, 3), + "fakePod": 
map[string]cpuset.CPUSet{ + "fakeContainer100": cpuset.NewCPUSet(1, 2, 3), + }, }, stDefaultCPUSet: cpuset.NewCPUSet(0, 4, 5, 6, 7, 8, 9, 10, 11), - pod: makePod("10000m", "10000m"), + pod: makePod("fakePod", "fakeContainer5", "10000m", "10000m"), expErr: fmt.Errorf("not enough cpus available to satisfy request"), expCPUAlloc: false, expCSet: cpuset.NewCPUSet(), @@ -355,12 +356,13 @@ // Expect all CPUs from Socket 0. description: "GuPodMultipleCores, topoQuadSocketFourWayHT, ExpectAllocSock0", topo: topoQuadSocketFourWayHT, - containerID: "fakeID5", stAssignments: state.ContainerCPUAssignments{ - "fakeID100": cpuset.NewCPUSet(3, 11, 4, 5, 6, 7), + "fakePod": map[string]cpuset.CPUSet{ + "fakeContainer100": cpuset.NewCPUSet(3, 11, 4, 5, 6, 7), + }, }, stDefaultCPUSet: largeTopoCPUSet.Difference(cpuset.NewCPUSet(3, 11, 4, 5, 6, 7)), - pod: makePod("72000m", "72000m"), + pod: makePod("fakePod", "fakeContainer5", "72000m", "72000m"), expErr: nil, expCPUAlloc: true, expCSet: largeTopoSock0CPUSet, @@ -370,13 +372,14 @@ // Expect CPUs from the 2 full cores available from the three Sockets. description: "GuPodMultipleCores, topoQuadSocketFourWayHT, ExpectAllocAllFullCoresFromThreeSockets", topo: topoQuadSocketFourWayHT, - containerID: "fakeID5", stAssignments: state.ContainerCPUAssignments{ - "fakeID100": largeTopoCPUSet.Difference(cpuset.NewCPUSet(1, 25, 13, 38, 2, 9, 11, 35, 23, 48, 12, 51, - 53, 173, 113, 233, 54, 61)), + "fakePod": map[string]cpuset.CPUSet{ + "fakeContainer100": largeTopoCPUSet.Difference(cpuset.NewCPUSet(1, 25, 13, 38, 2, 9, 11, 35, 23, 48, 12, 51, + 53, 173, 113, 233, 54, 61)), + }, }, stDefaultCPUSet: cpuset.NewCPUSet(1, 25, 13, 38, 2, 9, 11, 35, 23, 48, 12, 51, 53, 173, 113, 233, 54, 61), - pod: makePod("12000m", "12000m"), + pod: makePod("fakePod", "fakeContainer5", "12000m", "12000m"), expErr: nil, expCPUAlloc: true, expCSet: cpuset.NewCPUSet(1, 25, 13, 38, 11, 35, 23, 48, 53, 173, 113, 233), @@ -386,14 +389,15 @@ // Expect all CPUs from Socket 1 and the hyper-threads from the full core. description: "GuPodMultipleCores, topoQuadSocketFourWayHT, ExpectAllocAllSock1+FullCore", topo: topoQuadSocketFourWayHT, - containerID: "fakeID5", stAssignments: state.ContainerCPUAssignments{ - "fakeID100": largeTopoCPUSet.Difference(largeTopoSock1CPUSet.Union(cpuset.NewCPUSet(10, 34, 22, 47, 53, - 173, 61, 181, 108, 228, 115, 235))), + "fakePod": map[string]cpuset.CPUSet{ + "fakeContainer100": largeTopoCPUSet.Difference(largeTopoSock1CPUSet.Union(cpuset.NewCPUSet(10, 34, 22, 47, 53, + 173, 61, 181, 108, 228, 115, 235))), + }, }, stDefaultCPUSet: largeTopoSock1CPUSet.Union(cpuset.NewCPUSet(10, 34, 22, 47, 53, 173, 61, 181, 108, 228, 115, 235)), - pod: makePod("76000m", "76000m"), + pod: makePod("fakePod", "fakeContainer5", "76000m", "76000m"), expErr: nil, expCPUAlloc: true, expCSet: largeTopoSock1CPUSet.Union(cpuset.NewCPUSet(10, 34, 22, 47)),
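The test-case updates above all follow the same pattern: the containerID field is dropped, and makePod (defined in cpu_manager_test.go, outside this diff; the extra pod and container name arguments are assumed from its call sites here) now identifies the workload by name. A simplified sketch, not the real test harness, of how the updated assertions index the mock state:

package main

import "fmt"

// mockState stands in for the tests' mockState: assignments are now keyed by
// pod UID first, container name second.
type mockState struct {
	assignments map[string]map[string][]int
}

func main() {
	st := &mockState{assignments: map[string]map[string][]int{
		"fakePod": {"fakeContainer3": {2, 3, 6, 7}},
	}}
	// Old lookup: st.assignments[testCase.containerID]
	// New lookup: st.assignments[string(pod.UID)][container.Name]
	if cset, found := st.assignments["fakePod"]["fakeContainer3"]; found {
		fmt.Printf("allocated cpuset: %v\n", cset)
	}
}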
description: "GuPodMultipleCores, topoQuadSocketFourWayHT, ExpectAllocCPUs", topo: topoQuadSocketFourWayHT, - containerID: "fakeID5", stAssignments: state.ContainerCPUAssignments{ - "fakeID100": largeTopoCPUSet.Difference(cpuset.NewCPUSet(10, 11, 53, 37, 55, 67, 52)), + "fakePod": map[string]cpuset.CPUSet{ + "fakeContainer100": largeTopoCPUSet.Difference(cpuset.NewCPUSet(10, 11, 53, 37, 55, 67, 52)), + }, }, stDefaultCPUSet: cpuset.NewCPUSet(10, 11, 53, 67, 52), - pod: makePod("5000m", "5000m"), + pod: makePod("fakePod", "fakeContainer5", "5000m", "5000m"), expErr: nil, expCPUAlloc: true, expCSet: cpuset.NewCPUSet(10, 11, 53, 67, 52), @@ -419,12 +424,13 @@ func TestStaticPolicyAdd(t *testing.T) { // Error is expect since available CPUs are less than the request. description: "GuPodMultipleCores, topoQuadSocketFourWayHT, NoAlloc", topo: topoQuadSocketFourWayHT, - containerID: "fakeID5", stAssignments: state.ContainerCPUAssignments{ - "fakeID100": largeTopoCPUSet.Difference(cpuset.NewCPUSet(10, 11, 53, 37, 55, 67, 52)), + "fakePod": map[string]cpuset.CPUSet{ + "fakeContainer100": largeTopoCPUSet.Difference(cpuset.NewCPUSet(10, 11, 53, 37, 55, 67, 52)), + }, }, stDefaultCPUSet: cpuset.NewCPUSet(10, 11, 53, 37, 55, 67, 52), - pod: makePod("76000m", "76000m"), + pod: makePod("fakePod", "fakeContainer5", "76000m", "76000m"), expErr: fmt.Errorf("not enough cpus available to satisfy request"), expCPUAlloc: false, expCSet: cpuset.NewCPUSet(), @@ -440,17 +446,17 @@ func TestStaticPolicyAdd(t *testing.T) { } container := &testCase.pod.Spec.Containers[0] - err := policy.AddContainer(st, testCase.pod, container, testCase.containerID) + err := policy.AddContainer(st, testCase.pod, container) if !reflect.DeepEqual(err, testCase.expErr) { t.Errorf("StaticPolicy AddContainer() error (%v). expected add error: %v but got: %v", testCase.description, testCase.expErr, err) } if testCase.expCPUAlloc { - cset, found := st.assignments[testCase.containerID] + cset, found := st.assignments[string(testCase.pod.UID)][container.Name] if !found { - t.Errorf("StaticPolicy AddContainer() error (%v). expected container id %v to be present in assignments %v", - testCase.description, testCase.containerID, st.assignments) + t.Errorf("StaticPolicy AddContainer() error (%v). expected container %v to be present in assignments %v", + testCase.description, container.Name, st.assignments) } if !reflect.DeepEqual(cset, testCase.expCSet) { @@ -465,221 +471,10 @@ func TestStaticPolicyAdd(t *testing.T) { } if !testCase.expCPUAlloc { - _, found := st.assignments[testCase.containerID] + _, found := st.assignments[string(testCase.pod.UID)][container.Name] if found { - t.Errorf("StaticPolicy AddContainer() error (%v). 
Did not expect container id %v to be present in assignments %v", - testCase.description, testCase.containerID, st.assignments) - } - } - } -} - -func TestStaticPolicyAddWithInitContainers(t *testing.T) { - testCases := []staticPolicyMultiContainerTest{ - { - description: "No Guaranteed Init CPUs", - topo: topoSingleSocketHT, - numReservedCPUs: 0, - stAssignments: state.ContainerCPUAssignments{}, - stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), - initContainerIDs: []string{"initFakeID"}, - containerIDs: []string{"appFakeID"}, - pod: makeMultiContainerPod( - []struct{ request, limit string }{{"100m", "100m"}}, - []struct{ request, limit string }{{"4000m", "4000m"}}), - expInitCSets: []cpuset.CPUSet{ - cpuset.NewCPUSet()}, - expCSets: []cpuset.CPUSet{ - cpuset.NewCPUSet(0, 4, 1, 5)}, - }, - { - description: "Equal Number of Guaranteed CPUs", - topo: topoSingleSocketHT, - numReservedCPUs: 0, - stAssignments: state.ContainerCPUAssignments{}, - stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), - initContainerIDs: []string{"initFakeID"}, - containerIDs: []string{"appFakeID"}, - pod: makeMultiContainerPod( - []struct{ request, limit string }{{"4000m", "4000m"}}, - []struct{ request, limit string }{{"4000m", "4000m"}}), - expInitCSets: []cpuset.CPUSet{ - cpuset.NewCPUSet(0, 4, 1, 5)}, - expCSets: []cpuset.CPUSet{ - cpuset.NewCPUSet(0, 4, 1, 5)}, - }, - { - description: "More Init Container Guaranteed CPUs", - topo: topoSingleSocketHT, - numReservedCPUs: 0, - stAssignments: state.ContainerCPUAssignments{}, - stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), - initContainerIDs: []string{"initFakeID"}, - containerIDs: []string{"appFakeID"}, - pod: makeMultiContainerPod( - []struct{ request, limit string }{{"6000m", "6000m"}}, - []struct{ request, limit string }{{"4000m", "4000m"}}), - expInitCSets: []cpuset.CPUSet{ - cpuset.NewCPUSet(0, 4, 1, 5, 2, 6)}, - expCSets: []cpuset.CPUSet{ - cpuset.NewCPUSet(0, 4, 1, 5)}, - }, - { - description: "Less Init Container Guaranteed CPUs", - topo: topoSingleSocketHT, - numReservedCPUs: 0, - stAssignments: state.ContainerCPUAssignments{}, - stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), - initContainerIDs: []string{"initFakeID"}, - containerIDs: []string{"appFakeID"}, - pod: makeMultiContainerPod( - []struct{ request, limit string }{{"2000m", "2000m"}}, - []struct{ request, limit string }{{"4000m", "4000m"}}), - expInitCSets: []cpuset.CPUSet{ - cpuset.NewCPUSet(0, 4)}, - expCSets: []cpuset.CPUSet{ - cpuset.NewCPUSet(0, 4, 1, 5)}, - }, - { - description: "Multi Init Container Equal CPUs", - topo: topoSingleSocketHT, - numReservedCPUs: 0, - stAssignments: state.ContainerCPUAssignments{}, - stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), - initContainerIDs: []string{"initFakeID-1", "initFakeID-2"}, - containerIDs: []string{"appFakeID"}, - pod: makeMultiContainerPod( - []struct{ request, limit string }{ - {"2000m", "2000m"}, - {"2000m", "2000m"}}, - []struct{ request, limit string }{ - {"2000m", "2000m"}}), - expInitCSets: []cpuset.CPUSet{ - cpuset.NewCPUSet(0, 4), - cpuset.NewCPUSet(0, 4)}, - expCSets: []cpuset.CPUSet{ - cpuset.NewCPUSet(0, 4)}, - }, - { - description: "Multi Init Container Less CPUs", - topo: topoSingleSocketHT, - numReservedCPUs: 0, - stAssignments: state.ContainerCPUAssignments{}, - stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), - initContainerIDs: []string{"initFakeID-1", "initFakeID-2"}, - containerIDs: []string{"appFakeID"}, - pod: makeMultiContainerPod( - []struct{ request, limit 
string }{ - {"4000m", "4000m"}, - {"4000m", "4000m"}}, - []struct{ request, limit string }{ - {"2000m", "2000m"}}), - expInitCSets: []cpuset.CPUSet{ - cpuset.NewCPUSet(0, 4, 1, 5), - cpuset.NewCPUSet(0, 4, 1, 5)}, - expCSets: []cpuset.CPUSet{ - cpuset.NewCPUSet(0, 4)}, - }, - { - description: "Multi Init Container More CPUs", - topo: topoSingleSocketHT, - numReservedCPUs: 0, - stAssignments: state.ContainerCPUAssignments{}, - stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), - initContainerIDs: []string{"initFakeID-1", "initFakeID-2"}, - containerIDs: []string{"appFakeID"}, - pod: makeMultiContainerPod( - []struct{ request, limit string }{ - {"2000m", "2000m"}, - {"2000m", "2000m"}}, - []struct{ request, limit string }{ - {"4000m", "4000m"}}), - expInitCSets: []cpuset.CPUSet{ - cpuset.NewCPUSet(0, 4), - cpuset.NewCPUSet(0, 4)}, - expCSets: []cpuset.CPUSet{ - cpuset.NewCPUSet(0, 4, 1, 5)}, - }, - { - description: "Multi Init Container Increasing CPUs", - topo: topoSingleSocketHT, - numReservedCPUs: 0, - stAssignments: state.ContainerCPUAssignments{}, - stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), - initContainerIDs: []string{"initFakeID-1", "initFakeID-2"}, - containerIDs: []string{"appFakeID"}, - pod: makeMultiContainerPod( - []struct{ request, limit string }{ - {"2000m", "2000m"}, - {"4000m", "4000m"}}, - []struct{ request, limit string }{ - {"6000m", "6000m"}}), - expInitCSets: []cpuset.CPUSet{ - cpuset.NewCPUSet(0, 4), - cpuset.NewCPUSet(0, 4, 1, 5)}, - expCSets: []cpuset.CPUSet{ - cpuset.NewCPUSet(0, 4, 1, 5, 2, 6)}, - }, - { - description: "Multi Init, Multi App Container Split CPUs", - topo: topoSingleSocketHT, - numReservedCPUs: 0, - stAssignments: state.ContainerCPUAssignments{}, - stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), - initContainerIDs: []string{"initFakeID-1", "initFakeID-2"}, - containerIDs: []string{"appFakeID-1", "appFakeID-2"}, - pod: makeMultiContainerPod( - []struct{ request, limit string }{ - {"2000m", "2000m"}, - {"4000m", "4000m"}}, - []struct{ request, limit string }{ - {"2000m", "2000m"}, - {"2000m", "2000m"}}), - expInitCSets: []cpuset.CPUSet{ - cpuset.NewCPUSet(0, 4), - cpuset.NewCPUSet(0, 4, 1, 5)}, - expCSets: []cpuset.CPUSet{ - cpuset.NewCPUSet(0, 4), - cpuset.NewCPUSet(1, 5)}, - }, - } - - for _, testCase := range testCases { - policy := NewStaticPolicy(testCase.topo, testCase.numReservedCPUs, cpuset.NewCPUSet(), topologymanager.NewFakeManager()) - - st := &mockState{ - assignments: testCase.stAssignments, - defaultCPUSet: testCase.stDefaultCPUSet, - } - - containers := append( - testCase.pod.Spec.InitContainers, - testCase.pod.Spec.Containers...) - - containerIDs := append( - testCase.initContainerIDs, - testCase.containerIDs...) - - expCSets := append( - testCase.expInitCSets, - testCase.expCSets...) - - for i := range containers { - err := policy.AddContainer(st, testCase.pod, &containers[i], containerIDs[i]) - if err != nil { - t.Errorf("StaticPolicy AddContainer() error (%v). unexpected error for container id: %v: %v", - testCase.description, containerIDs[i], err) - } - - cset, found := st.assignments[containerIDs[i]] - if !expCSets[i].IsEmpty() && !found { - t.Errorf("StaticPolicy AddContainer() error (%v). expected container id %v to be present in assignments %v", - testCase.description, containerIDs[i], st.assignments) - } - - if found && !cset.Equals(expCSets[i]) { - t.Errorf("StaticPolicy AddContainer() error (%v). 
expected cpuset %v for container %v but got %v", - testCase.description, expCSets[i], containerIDs[i], cset) + t.Errorf("StaticPolicy AddContainer() error (%v). Did not expect container %v to be present in assignments %v", + testCase.description, container.Name, st.assignments) } } } @@ -688,43 +483,55 @@ func TestStaticPolicyAddWithInitContainers(t *testing.T) { func TestStaticPolicyRemove(t *testing.T) { testCases := []staticPolicyTest{ { - description: "SingleSocketHT, DeAllocOneContainer", - topo: topoSingleSocketHT, - containerID: "fakeID1", + description: "SingleSocketHT, DeAllocOneContainer", + topo: topoSingleSocketHT, + podUID: "fakePod", + containerName: "fakeContainer1", stAssignments: state.ContainerCPUAssignments{ - "fakeID1": cpuset.NewCPUSet(1, 2, 3), + "fakePod": map[string]cpuset.CPUSet{ + "fakeContainer1": cpuset.NewCPUSet(1, 2, 3), + }, }, stDefaultCPUSet: cpuset.NewCPUSet(4, 5, 6, 7), expCSet: cpuset.NewCPUSet(1, 2, 3, 4, 5, 6, 7), }, { - description: "SingleSocketHT, DeAllocOneContainer, BeginEmpty", - topo: topoSingleSocketHT, - containerID: "fakeID1", + description: "SingleSocketHT, DeAllocOneContainer, BeginEmpty", + topo: topoSingleSocketHT, + podUID: "fakePod", + containerName: "fakeContainer1", stAssignments: state.ContainerCPUAssignments{ - "fakeID1": cpuset.NewCPUSet(1, 2, 3), - "fakeID2": cpuset.NewCPUSet(4, 5, 6, 7), + "fakePod": map[string]cpuset.CPUSet{ + "fakeContainer1": cpuset.NewCPUSet(1, 2, 3), + "fakeContainer2": cpuset.NewCPUSet(4, 5, 6, 7), + }, }, stDefaultCPUSet: cpuset.NewCPUSet(), expCSet: cpuset.NewCPUSet(1, 2, 3), }, { - description: "SingleSocketHT, DeAllocTwoContainer", - topo: topoSingleSocketHT, - containerID: "fakeID1", + description: "SingleSocketHT, DeAllocTwoContainer", + topo: topoSingleSocketHT, + podUID: "fakePod", + containerName: "fakeContainer1", stAssignments: state.ContainerCPUAssignments{ - "fakeID1": cpuset.NewCPUSet(1, 3, 5), - "fakeID2": cpuset.NewCPUSet(2, 4), + "fakePod": map[string]cpuset.CPUSet{ + "fakeContainer1": cpuset.NewCPUSet(1, 3, 5), + "fakeContainer2": cpuset.NewCPUSet(2, 4), + }, }, stDefaultCPUSet: cpuset.NewCPUSet(6, 7), expCSet: cpuset.NewCPUSet(1, 3, 5, 6, 7), }, { - description: "SingleSocketHT, NoDeAlloc", - topo: topoSingleSocketHT, - containerID: "fakeID2", + description: "SingleSocketHT, NoDeAlloc", + topo: topoSingleSocketHT, + podUID: "fakePod", + containerName: "fakeContainer2", stAssignments: state.ContainerCPUAssignments{ - "fakeID1": cpuset.NewCPUSet(1, 3, 5), + "fakePod": map[string]cpuset.CPUSet{ + "fakeContainer1": cpuset.NewCPUSet(1, 3, 5), + }, }, stDefaultCPUSet: cpuset.NewCPUSet(2, 4, 6, 7), expCSet: cpuset.NewCPUSet(2, 4, 6, 7), @@ -739,16 +546,16 @@ func TestStaticPolicyRemove(t *testing.T) { defaultCPUSet: testCase.stDefaultCPUSet, } - policy.RemoveContainer(st, testCase.containerID) + policy.RemoveContainer(st, testCase.podUID, testCase.containerName) if !reflect.DeepEqual(st.defaultCPUSet, testCase.expCSet) { t.Errorf("StaticPolicy RemoveContainer() error (%v). expected default cpuset %v but got %v", testCase.description, testCase.expCSet, st.defaultCPUSet) } - if _, found := st.assignments[testCase.containerID]; found { - t.Errorf("StaticPolicy RemoveContainer() error (%v). expected containerID %v not be in assignments %v", - testCase.description, testCase.containerID, st.assignments) + if _, found := st.assignments[testCase.podUID][testCase.containerName]; found { + t.Errorf("StaticPolicy RemoveContainer() error (%v). 
expected (pod %v, container %v) not to be in assignments %v", + testCase.description, testCase.podUID, testCase.containerName, st.assignments) } } } @@ -850,7 +657,6 @@ type staticPolicyTestWithResvList struct { topo *topology.CPUTopology numReservedCPUs int reserved cpuset.CPUSet - containerID string stAssignments state.ContainerCPUAssignments stDefaultCPUSet cpuset.CPUSet pod *v1.Pod @@ -925,10 +731,9 @@ func TestStaticPolicyAddWithResvList(t *testing.T) { topo: topoSingleSocketHT, numReservedCPUs: 1, reserved: cpuset.NewCPUSet(0), - containerID: "fakeID2", stAssignments: state.ContainerCPUAssignments{}, stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), - pod: makePod("8000m", "8000m"), + pod: makePod("fakePod", "fakeContainer2", "8000m", "8000m"), expErr: fmt.Errorf("not enough cpus available to satisfy request"), expCPUAlloc: false, expCSet: cpuset.NewCPUSet(), @@ -938,10 +743,9 @@ topo: topoSingleSocketHT, numReservedCPUs: 2, reserved: cpuset.NewCPUSet(0, 1), - containerID: "fakeID2", stAssignments: state.ContainerCPUAssignments{}, stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7), - pod: makePod("1000m", "1000m"), + pod: makePod("fakePod", "fakeContainer2", "1000m", "1000m"), expErr: nil, expCPUAlloc: true, expCSet: cpuset.NewCPUSet(4), // expect sibling of partial core @@ -951,12 +755,13 @@ topo: topoSingleSocketHT, numReservedCPUs: 2, reserved: cpuset.NewCPUSet(0, 1), - containerID: "fakeID3", stAssignments: state.ContainerCPUAssignments{ - "fakeID100": cpuset.NewCPUSet(2, 3, 6, 7), + "fakePod": map[string]cpuset.CPUSet{ + "fakeContainer100": cpuset.NewCPUSet(2, 3, 6, 7), + }, }, stDefaultCPUSet: cpuset.NewCPUSet(0, 1, 4, 5), - pod: makePod("2000m", "2000m"), + pod: makePod("fakePod", "fakeContainer3", "2000m", "2000m"), expErr: nil, expCPUAlloc: true, expCSet: cpuset.NewCPUSet(4, 5), @@ -972,17 +777,17 @@ } container := &testCase.pod.Spec.Containers[0] - err := policy.AddContainer(st, testCase.pod, container, testCase.containerID) + err := policy.AddContainer(st, testCase.pod, container) if !reflect.DeepEqual(err, testCase.expErr) { t.Errorf("StaticPolicy AddContainer() error (%v). expected add error: %v but got: %v", testCase.description, testCase.expErr, err) } if testCase.expCPUAlloc { - cset, found := st.assignments[testCase.containerID] + cset, found := st.assignments[string(testCase.pod.UID)][container.Name] if !found { - t.Errorf("StaticPolicy AddContainer() error (%v). expected container id %v to be present in assignments %v", - testCase.description, testCase.containerID, st.assignments) + t.Errorf("StaticPolicy AddContainer() error (%v). expected container %v to be present in assignments %v", + testCase.description, container.Name, st.assignments) } if !reflect.DeepEqual(cset, testCase.expCSet) { @@ -997,10 +802,10 @@ } if !testCase.expCPUAlloc { - _, found := st.assignments[testCase.containerID] + _, found := st.assignments[string(testCase.pod.UID)][container.Name] if found { - t.Errorf("StaticPolicy AddContainer() error (%v). Did not expect container id %v to be present in assignments %v", - testCase.description, testCase.containerID, st.assignments) + t.Errorf("StaticPolicy AddContainer() error (%v).
Did not expect container %v to be present in assignments %v", + testCase.description, container.Name, st.assignments) } } } diff --git a/pkg/kubelet/cm/cpumanager/state/BUILD b/pkg/kubelet/cm/cpumanager/state/BUILD index d670c795b42..37850de90c3 100644 --- a/pkg/kubelet/cm/cpumanager/state/BUILD +++ b/pkg/kubelet/cm/cpumanager/state/BUILD @@ -15,6 +15,7 @@ go_library( "//pkg/kubelet/checkpointmanager:go_default_library", "//pkg/kubelet/checkpointmanager/checksum:go_default_library", "//pkg/kubelet/checkpointmanager/errors:go_default_library", + "//pkg/kubelet/cm/cpumanager/containermap:go_default_library", "//pkg/kubelet/cm/cpuset:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], @@ -30,6 +31,7 @@ go_test( embed = [":go_default_library"], deps = [ "//pkg/kubelet/checkpointmanager:go_default_library", + "//pkg/kubelet/cm/cpumanager/containermap:go_default_library", "//pkg/kubelet/cm/cpumanager/state/testing:go_default_library", "//pkg/kubelet/cm/cpuset:go_default_library", "//vendor/github.com/stretchr/testify/require:go_default_library", diff --git a/pkg/kubelet/cm/cpumanager/state/checkpoint.go b/pkg/kubelet/cm/cpumanager/state/checkpoint.go index 40e0fc81ad3..3683e66aedc 100644 --- a/pkg/kubelet/cm/cpumanager/state/checkpoint.go +++ b/pkg/kubelet/cm/cpumanager/state/checkpoint.go @@ -23,38 +23,87 @@ import ( "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum" ) -var _ checkpointmanager.Checkpoint = &CPUManagerCheckpoint{} - // CPUManagerCheckpoint struct is used to store cpu/pod assignments in a checkpoint -type CPUManagerCheckpoint struct { +type CPUManagerCheckpoint = CPUManagerCheckpointV2 + +var _ checkpointmanager.Checkpoint = &CPUManagerCheckpointV1{} +var _ checkpointmanager.Checkpoint = &CPUManagerCheckpointV2{} + +// CPUManagerCheckpointV1 struct is used to store cpu/pod assignments in a checkpoint in v1 format +type CPUManagerCheckpointV1 struct { PolicyName string `json:"policyName"` DefaultCPUSet string `json:"defaultCpuSet"` Entries map[string]string `json:"entries,omitempty"` Checksum checksum.Checksum `json:"checksum"` } +// CPUManagerCheckpointV2 struct is used to store cpu/pod assignments in a checkpoint in v2 format +type CPUManagerCheckpointV2 struct { + PolicyName string `json:"policyName"` + DefaultCPUSet string `json:"defaultCpuSet"` + Entries map[string]map[string]string `json:"entries,omitempty"` + Checksum checksum.Checksum `json:"checksum"` +} + // NewCPUManagerCheckpoint returns an instance of Checkpoint func NewCPUManagerCheckpoint() *CPUManagerCheckpoint { - return &CPUManagerCheckpoint{ + //lint:ignore unexported-type-in-api user-facing error message + return newCPUManagerCheckpointV2() +} + +func newCPUManagerCheckpointV1() *CPUManagerCheckpointV1 { + return &CPUManagerCheckpointV1{ Entries: make(map[string]string), } } -// MarshalCheckpoint returns marshalled checkpoint -func (cp *CPUManagerCheckpoint) MarshalCheckpoint() ([]byte, error) { +func newCPUManagerCheckpointV2() *CPUManagerCheckpointV2 { + return &CPUManagerCheckpointV2{ + Entries: make(map[string]map[string]string), + } +} + +// MarshalCheckpoint returns marshalled checkpoint in v1 format +func (cp *CPUManagerCheckpointV1) MarshalCheckpoint() ([]byte, error) { // make sure checksum wasn't set before so it doesn't affect output checksum cp.Checksum = 0 cp.Checksum = checksum.New(cp) return json.Marshal(*cp) } -// UnmarshalCheckpoint tries to unmarshal passed bytes to checkpoint -func (cp *CPUManagerCheckpoint) UnmarshalCheckpoint(blob []byte) error { +// MarshalCheckpoint 
returns marshalled checkpoint in v2 format +func (cp *CPUManagerCheckpointV2) MarshalCheckpoint() ([]byte, error) { + // make sure checksum wasn't set before so it doesn't affect output checksum + cp.Checksum = 0 + cp.Checksum = checksum.New(cp) + return json.Marshal(*cp) +} + +// UnmarshalCheckpoint tries to unmarshal passed bytes to checkpoint in v1 format +func (cp *CPUManagerCheckpointV1) UnmarshalCheckpoint(blob []byte) error { return json.Unmarshal(blob, cp) } -// VerifyChecksum verifies that current checksum of checkpoint is valid -func (cp *CPUManagerCheckpoint) VerifyChecksum() error { +// UnmarshalCheckpoint tries to unmarshal passed bytes to checkpoint in v2 format +func (cp *CPUManagerCheckpointV2) UnmarshalCheckpoint(blob []byte) error { + return json.Unmarshal(blob, cp) +} + +// VerifyChecksum verifies that current checksum of checkpoint is valid in v1 format +func (cp *CPUManagerCheckpointV1) VerifyChecksum() error { + if cp.Checksum == 0 { + // accept empty checksum for compatibility with old file backend + return nil + } + ck := cp.Checksum + cp.Checksum = 0 + err := ck.Verify(cp) + cp.Checksum = ck + return err +} + +// VerifyChecksum verifies that current checksum of checkpoint is valid in v2 format +func (cp *CPUManagerCheckpointV2) VerifyChecksum() error { if cp.Checksum == 0 { // accept empty checksum for compatibility with old file backend return nil diff --git a/pkg/kubelet/cm/cpumanager/state/state.go b/pkg/kubelet/cm/cpumanager/state/state.go index be32509279b..a9bd906fcb2 100644 --- a/pkg/kubelet/cm/cpumanager/state/state.go +++ b/pkg/kubelet/cm/cpumanager/state/state.go @@ -21,30 +21,33 @@ import ( ) // ContainerCPUAssignments type used in cpu manager state -type ContainerCPUAssignments map[string]cpuset.CPUSet +type ContainerCPUAssignments map[string]map[string]cpuset.CPUSet // Clone returns a copy of ContainerCPUAssignments func (as ContainerCPUAssignments) Clone() ContainerCPUAssignments { ret := make(ContainerCPUAssignments) - for key, val := range as { - ret[key] = val + for pod := range as { + ret[pod] = make(map[string]cpuset.CPUSet) + for container, cset := range as[pod] { + ret[pod][container] = cset + } } return ret } // Reader interface used to read current cpu/pod assignment state type Reader interface { - GetCPUSet(containerID string) (cpuset.CPUSet, bool) + GetCPUSet(podUID string, containerName string) (cpuset.CPUSet, bool) GetDefaultCPUSet() cpuset.CPUSet - GetCPUSetOrDefault(containerID string) cpuset.CPUSet + GetCPUSetOrDefault(podUID string, containerName string) cpuset.CPUSet GetCPUAssignments() ContainerCPUAssignments } type writer interface { - SetCPUSet(containerID string, cpuset cpuset.CPUSet) + SetCPUSet(podUID string, containerName string, cpuset cpuset.CPUSet) SetDefaultCPUSet(cpuset cpuset.CPUSet) SetCPUAssignments(ContainerCPUAssignments) - Delete(containerID string) + Delete(podUID string, containerName string) ClearState() }
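Since ContainerCPUAssignments is now a two-level map, Clone must copy the inner per-pod maps as well: a shallow copy of only the outer map would leave both copies sharing (and mutating) the same inner maps. A self-contained sketch of that aliasing pitfall, with cpuset.CPUSet simplified to an int slice:

package main

import "fmt"

// containerCPUAssignments is a simplified stand-in for the new
// state.ContainerCPUAssignments: podUID -> containerName -> CPUs.
type containerCPUAssignments map[string]map[string][]int

// clone deep-copies both map levels, mirroring the updated Clone above.
func (as containerCPUAssignments) clone() containerCPUAssignments {
	ret := make(containerCPUAssignments)
	for pod := range as {
		ret[pod] = make(map[string][]int)
		for container, cpus := range as[pod] {
			ret[pod][container] = cpus
		}
	}
	return ret
}

func main() {
	orig := containerCPUAssignments{"pod": {"c1": {0, 1}}}
	cp := orig.clone()
	cp["pod"]["c2"] = []int{2, 3} // mutates only the copy's inner map
	fmt.Println(len(orig["pod"]), len(cp["pod"])) // prints: 1 2
}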
diff --git a/pkg/kubelet/cm/cpumanager/state/state_checkpoint.go b/pkg/kubelet/cm/cpumanager/state/state_checkpoint.go index b9fe0f46374..cd8bd994a69 100644 --- a/pkg/kubelet/cm/cpumanager/state/state_checkpoint.go +++ b/pkg/kubelet/cm/cpumanager/state/state_checkpoint.go @@ -24,6 +24,7 @@ import ( "k8s.io/klog" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" + "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/containermap" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" ) @@ -35,10 +36,11 @@ type stateCheckpoint struct { cache State checkpointManager checkpointmanager.CheckpointManager checkpointName string + initialContainers containermap.ContainerMap } // NewCheckpointState creates new State for keeping track of cpu/pod assignment with checkpoint backend -func NewCheckpointState(stateDir, checkpointName, policyName string) (State, error) { +func NewCheckpointState(stateDir, checkpointName, policyName string, initialContainers containermap.ContainerMap) (State, error) { checkpointManager, err := checkpointmanager.NewCheckpointManager(stateDir) if err != nil { return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err) @@ -48,6 +50,7 @@ policyName: policyName, checkpointManager: checkpointManager, checkpointName: checkpointName, + initialContainers: initialContainers, } if err := stateCheckpoint.restoreState(); err != nil { @@ -60,6 +63,30 @@ return stateCheckpoint, nil } +// migrateV1CheckpointToV2Checkpoint() converts checkpoints from the v1 format to the v2 format +func (sc *stateCheckpoint) migrateV1CheckpointToV2Checkpoint(src *CPUManagerCheckpointV1, dst *CPUManagerCheckpointV2) error { + if src.PolicyName != "" { + dst.PolicyName = src.PolicyName + } + if src.DefaultCPUSet != "" { + dst.DefaultCPUSet = src.DefaultCPUSet + } + for containerID, cset := range src.Entries { + podUID, containerName, err := sc.initialContainers.GetContainerRef(containerID) + if err != nil { + return fmt.Errorf("containerID '%v' not found in initial containers list", containerID) + } + if dst.Entries == nil { + dst.Entries = make(map[string]map[string]string) + } + if _, exists := dst.Entries[podUID]; !exists { + dst.Entries[podUID] = make(map[string]string) + } + dst.Entries[podUID][containerName] = cset + } + return nil +} + // restores state from a checkpoint and creates it if it doesn't exist func (sc *stateCheckpoint) restoreState() error { sc.mux.Lock() @@ -71,28 +98,40 @@ tmpDefaultCPUSet := cpuset.NewCPUSet() tmpContainerCPUSet := cpuset.NewCPUSet() - checkpoint := NewCPUManagerCheckpoint() - if err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpoint); err != nil { - if err == errors.ErrCheckpointNotFound { - sc.storeState() - return nil + checkpointV1 := newCPUManagerCheckpointV1() + checkpointV2 := newCPUManagerCheckpointV2() + + if err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpointV1); err != nil { + checkpointV1 = &CPUManagerCheckpointV1{} // reset it back to 0 + if err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpointV2); err != nil { + if err == errors.ErrCheckpointNotFound { + sc.storeState() + return nil + } + return err } - return err } - if sc.policyName != checkpoint.PolicyName { - return fmt.Errorf("configured policy %q differs from state checkpoint policy %q", sc.policyName, checkpoint.PolicyName) + if err = sc.migrateV1CheckpointToV2Checkpoint(checkpointV1, checkpointV2); err != nil { + return fmt.Errorf("error migrating v1 checkpoint state to v2 checkpoint state: %s", err) } - if tmpDefaultCPUSet, err = cpuset.Parse(checkpoint.DefaultCPUSet); err != nil { - return fmt.Errorf("could not parse default cpu set %q: %v", checkpoint.DefaultCPUSet, err) + if sc.policyName != checkpointV2.PolicyName { + return fmt.Errorf("configured policy %q differs from state checkpoint policy %q", sc.policyName, checkpointV2.PolicyName) } - for containerID, cpuString := range
checkpoint.Entries { - if tmpContainerCPUSet, err = cpuset.Parse(cpuString); err != nil { - return fmt.Errorf("could not parse cpuset %q for container id %q: %v", cpuString, containerID, err) + if tmpDefaultCPUSet, err = cpuset.Parse(checkpointV2.DefaultCPUSet); err != nil { + return fmt.Errorf("could not parse default cpu set %q: %v", checkpointV2.DefaultCPUSet, err) + } + + for pod := range checkpointV2.Entries { + tmpAssignments[pod] = make(map[string]cpuset.CPUSet) + for container, cpuString := range checkpointV2.Entries[pod] { + if tmpContainerCPUSet, err = cpuset.Parse(cpuString); err != nil { + return fmt.Errorf("could not parse cpuset %q for container %q in pod %q: %v", cpuString, container, pod, err) + } + tmpAssignments[pod][container] = tmpContainerCPUSet } - tmpAssignments[containerID] = tmpContainerCPUSet } sc.cache.SetDefaultCPUSet(tmpDefaultCPUSet) @@ -110,8 +149,12 @@ func (sc *stateCheckpoint) storeState() { checkpoint.PolicyName = sc.policyName checkpoint.DefaultCPUSet = sc.cache.GetDefaultCPUSet().String() - for containerID, cset := range sc.cache.GetCPUAssignments() { - checkpoint.Entries[containerID] = cset.String() + assignments := sc.cache.GetCPUAssignments() + for pod := range assignments { + checkpoint.Entries[pod] = make(map[string]string) + for container, cset := range assignments[pod] { + checkpoint.Entries[pod][container] = cset.String() + } } err := sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint) @@ -122,11 +165,11 @@ func (sc *stateCheckpoint) storeState() { } // GetCPUSet returns current CPU set -func (sc *stateCheckpoint) GetCPUSet(containerID string) (cpuset.CPUSet, bool) { +func (sc *stateCheckpoint) GetCPUSet(podUID string, containerName string) (cpuset.CPUSet, bool) { sc.mux.RLock() defer sc.mux.RUnlock() - res, ok := sc.cache.GetCPUSet(containerID) + res, ok := sc.cache.GetCPUSet(podUID, containerName) return res, ok } @@ -139,11 +182,11 @@ func (sc *stateCheckpoint) GetDefaultCPUSet() cpuset.CPUSet { } // GetCPUSetOrDefault returns current CPU set, or default one if it wasn't changed -func (sc *stateCheckpoint) GetCPUSetOrDefault(containerID string) cpuset.CPUSet { +func (sc *stateCheckpoint) GetCPUSetOrDefault(podUID string, containerName string) cpuset.CPUSet { sc.mux.RLock() defer sc.mux.RUnlock() - return sc.cache.GetCPUSetOrDefault(containerID) + return sc.cache.GetCPUSetOrDefault(podUID, containerName) } // GetCPUAssignments returns current CPU to pod assignments @@ -155,10 +198,10 @@ func (sc *stateCheckpoint) GetCPUAssignments() ContainerCPUAssignments { } // SetCPUSet sets CPU set -func (sc *stateCheckpoint) SetCPUSet(containerID string, cset cpuset.CPUSet) { +func (sc *stateCheckpoint) SetCPUSet(podUID string, containerName string, cset cpuset.CPUSet) { sc.mux.Lock() defer sc.mux.Unlock() - sc.cache.SetCPUSet(containerID, cset) + sc.cache.SetCPUSet(podUID, containerName, cset) sc.storeState() } @@ -179,10 +222,10 @@ func (sc *stateCheckpoint) SetCPUAssignments(a ContainerCPUAssignments) { } // Delete deletes assignment for specified pod -func (sc *stateCheckpoint) Delete(containerID string) { +func (sc *stateCheckpoint) Delete(podUID string, containerName string) { sc.mux.Lock() defer sc.mux.Unlock() - sc.cache.Delete(containerID) + sc.cache.Delete(podUID, containerName) sc.storeState() } diff --git a/pkg/kubelet/cm/cpumanager/state/state_checkpoint_test.go b/pkg/kubelet/cm/cpumanager/state/state_checkpoint_test.go index 2471ecdf341..34d40c43174 100644 --- a/pkg/kubelet/cm/cpumanager/state/state_checkpoint_test.go +++ 
b/pkg/kubelet/cm/cpumanager/state/state_checkpoint_test.go @@ -22,6 +22,7 @@ import ( "testing" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" + "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/containermap" testutil "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state/testing" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" ) @@ -35,6 +36,7 @@ func TestCheckpointStateRestore(t *testing.T) { description string checkpointContent string policyName string + initialContainers containermap.ContainerMap expectedError string expectedState *stateMemory }{ @@ -42,6 +44,7 @@ func TestCheckpointStateRestore(t *testing.T) { "Restore non-existing checkpoint", "", "none", + containermap.ContainerMap{}, "", &stateMemory{}, }, @@ -51,9 +54,10 @@ func TestCheckpointStateRestore(t *testing.T) { "policyName": "none", "defaultCPUSet": "4-6", "entries": {}, - "checksum": 2912033808 + "checksum": 2655485041 }`, "none", + containermap.ContainerMap{}, "", &stateMemory{ defaultCPUSet: cpuset.NewCPUSet(4, 5, 6), @@ -65,17 +69,22 @@ func TestCheckpointStateRestore(t *testing.T) { "policyName": "none", "defaultCPUSet": "1-3", "entries": { - "container1": "4-6", - "container2": "1-3" + "pod": { + "container1": "4-6", + "container2": "1-3" + } }, - "checksum": 1535905563 + "checksum": 3415933391 }`, "none", + containermap.ContainerMap{}, "", &stateMemory{ assignments: ContainerCPUAssignments{ - "container1": cpuset.NewCPUSet(4, 5, 6), - "container2": cpuset.NewCPUSet(1, 2, 3), + "pod": map[string]cpuset.CPUSet{ + "container1": cpuset.NewCPUSet(4, 5, 6), + "container2": cpuset.NewCPUSet(1, 2, 3), + }, }, defaultCPUSet: cpuset.NewCPUSet(1, 2, 3), }, @@ -89,6 +98,7 @@ func TestCheckpointStateRestore(t *testing.T) { "checksum": 1337 }`, "none", + containermap.ContainerMap{}, "checkpoint is corrupted", &stateMemory{}, }, @@ -96,6 +106,7 @@ func TestCheckpointStateRestore(t *testing.T) { "Restore checkpoint with invalid JSON", `{`, "none", + containermap.ContainerMap{}, "unexpected end of JSON input", &stateMemory{}, }, @@ -105,9 +116,10 @@ func TestCheckpointStateRestore(t *testing.T) { "policyName": "other", "defaultCPUSet": "1-3", "entries": {}, - "checksum": 4195836012 + "checksum": 698611581 }`, "none", + containermap.ContainerMap{}, `configured policy "none" differs from state checkpoint policy "other"`, &stateMemory{}, }, @@ -117,9 +129,10 @@ func TestCheckpointStateRestore(t *testing.T) { "policyName": "none", "defaultCPUSet": "1.3", "entries": {}, - "checksum": 1025273327 + "checksum": 1966990140 }`, "none", + containermap.ContainerMap{}, `could not parse default cpu set "1.3": strconv.Atoi: parsing "1.3": invalid syntax`, &stateMemory{}, }, @@ -129,15 +142,47 @@ func TestCheckpointStateRestore(t *testing.T) { "policyName": "none", "defaultCPUSet": "1-3", "entries": { - "container1": "4-6", - "container2": "asd" + "pod": { + "container1": "4-6", + "container2": "asd" + } }, - "checksum": 2764213924 + "checksum": 3082925826 }`, "none", - `could not parse cpuset "asd" for container id "container2": strconv.Atoi: parsing "asd": invalid syntax`, + containermap.ContainerMap{}, + `could not parse cpuset "asd" for container "container2" in pod "pod": strconv.Atoi: parsing "asd": invalid syntax`, &stateMemory{}, }, + { + "Restore checkpoint with migration", + `{ + "policyName": "none", + "defaultCPUSet": "1-3", + "entries": { + "containerID1": "4-6", + "containerID2": "1-3" + }, + "checksum": 2832947348 + }`, + "none", + func() containermap.ContainerMap { + cm := containermap.NewContainerMap() + cm.Add("pod", "container1", 
"containerID1") + cm.Add("pod", "container2", "containerID2") + return cm + }(), + "", + &stateMemory{ + assignments: ContainerCPUAssignments{ + "pod": map[string]cpuset.CPUSet{ + "container1": cpuset.NewCPUSet(4, 5, 6), + "container2": cpuset.NewCPUSet(1, 2, 3), + }, + }, + defaultCPUSet: cpuset.NewCPUSet(1, 2, 3), + }, + }, } // create checkpoint manager for testing @@ -159,7 +204,7 @@ func TestCheckpointStateRestore(t *testing.T) { } } - restoredState, err := NewCheckpointState(testingDir, testingCheckpoint, tc.policyName) + restoredState, err := NewCheckpointState(testingDir, testingCheckpoint, tc.policyName, tc.initialContainers) if err != nil { if strings.TrimSpace(tc.expectedError) != "" { tc.expectedError = "could not restore state from checkpoint: " + tc.expectedError @@ -189,8 +234,10 @@ func TestCheckpointStateStore(t *testing.T) { { "Store assignments", &stateMemory{ - assignments: map[string]cpuset.CPUSet{ - "container1": cpuset.NewCPUSet(1, 5, 8), + assignments: map[string]map[string]cpuset.CPUSet{ + "pod": { + "container1": cpuset.NewCPUSet(1, 5, 8), + }, }, }, }, @@ -206,7 +253,7 @@ func TestCheckpointStateStore(t *testing.T) { // ensure there is no previous checkpoint cpm.RemoveCheckpoint(testingCheckpoint) - cs1, err := NewCheckpointState(testingDir, testingCheckpoint, "none") + cs1, err := NewCheckpointState(testingDir, testingCheckpoint, "none", nil) if err != nil { t.Fatalf("could not create testing checkpointState instance: %v", err) } @@ -216,7 +263,7 @@ func TestCheckpointStateStore(t *testing.T) { cs1.SetCPUAssignments(tc.expectedState.assignments) // restore checkpoint with previously stored values - cs2, err := NewCheckpointState(testingDir, testingCheckpoint, "none") + cs2, err := NewCheckpointState(testingDir, testingCheckpoint, "none", nil) if err != nil { t.Fatalf("could not create testing checkpointState instance: %v", err) } @@ -230,28 +277,34 @@ func TestCheckpointStateHelpers(t *testing.T) { testCases := []struct { description string defaultCPUset cpuset.CPUSet - containers map[string]cpuset.CPUSet + assignments map[string]map[string]cpuset.CPUSet }{ { description: "One container", defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8), - containers: map[string]cpuset.CPUSet{ - "c1": cpuset.NewCPUSet(0, 1), + assignments: map[string]map[string]cpuset.CPUSet{ + "pod": { + "c1": cpuset.NewCPUSet(0, 1), + }, }, }, { description: "Two containers", defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8), - containers: map[string]cpuset.CPUSet{ - "c1": cpuset.NewCPUSet(0, 1), - "c2": cpuset.NewCPUSet(2, 3, 4, 5), + assignments: map[string]map[string]cpuset.CPUSet{ + "pod": { + "c1": cpuset.NewCPUSet(0, 1), + "c2": cpuset.NewCPUSet(2, 3, 4, 5), + }, }, }, { description: "Container without assigned cpus", defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8), - containers: map[string]cpuset.CPUSet{ - "c1": cpuset.NewCPUSet(), + assignments: map[string]map[string]cpuset.CPUSet{ + "pod": { + "c1": cpuset.NewCPUSet(), + }, }, }, } @@ -266,21 +319,23 @@ func TestCheckpointStateHelpers(t *testing.T) { // ensure there is no previous checkpoint cpm.RemoveCheckpoint(testingCheckpoint) - state, err := NewCheckpointState(testingDir, testingCheckpoint, "none") + state, err := NewCheckpointState(testingDir, testingCheckpoint, "none", nil) if err != nil { t.Fatalf("could not create testing checkpointState instance: %v", err) } state.SetDefaultCPUSet(tc.defaultCPUset) - for container, set := range tc.containers { - state.SetCPUSet(container, set) - if cpus, _ := 
state.GetCPUSet(container); !cpus.Equals(set) { - t.Fatalf("state inconsistent, got %q instead of %q", set, cpus) - } + for pod := range tc.assignments { + for container, set := range tc.assignments[pod] { + state.SetCPUSet(pod, container, set) + if cpus, _ := state.GetCPUSet(pod, container); !cpus.Equals(set) { + t.Fatalf("state inconsistent, got %q instead of %q", set, cpus) + } - state.Delete(container) - if _, ok := state.GetCPUSet(container); ok { - t.Fatal("deleted container still existing in state") + state.Delete(pod, container) + if _, ok := state.GetCPUSet(pod, container); ok { + t.Fatal("deleted container still existing in state") + } } } }) @@ -291,34 +346,38 @@ func TestCheckpointStateClear(t *testing.T) { testCases := []struct { description string defaultCPUset cpuset.CPUSet - containers map[string]cpuset.CPUSet + assignments map[string]map[string]cpuset.CPUSet }{ { "Valid state", cpuset.NewCPUSet(1, 5, 10), - map[string]cpuset.CPUSet{ - "container1": cpuset.NewCPUSet(1, 4), + map[string]map[string]cpuset.CPUSet{ + "pod": { + "container1": cpuset.NewCPUSet(1, 4), + }, }, }, } for _, tc := range testCases { t.Run(tc.description, func(t *testing.T) { - state, err := NewCheckpointState(testingDir, testingCheckpoint, "none") + state, err := NewCheckpointState(testingDir, testingCheckpoint, "none", nil) if err != nil { t.Fatalf("could not create testing checkpointState instance: %v", err) } state.SetDefaultCPUSet(tc.defaultCPUset) - state.SetCPUAssignments(tc.containers) + state.SetCPUAssignments(tc.assignments) state.ClearState() if !cpuset.NewCPUSet().Equals(state.GetDefaultCPUSet()) { t.Fatal("cleared state with non-empty default cpu set") } - for container := range tc.containers { - if _, ok := state.GetCPUSet(container); ok { - t.Fatalf("container %q with non-default cpu set in cleared state", container) + for pod := range tc.assignments { + for container := range tc.assignments[pod] { + if _, ok := state.GetCPUSet(pod, container); ok { + t.Fatalf("container %q in pod %q with non-default cpu set in cleared state", container, pod) + } } } }) diff --git a/pkg/kubelet/cm/cpumanager/state/state_compatibility_test.go b/pkg/kubelet/cm/cpumanager/state/state_compatibility_test.go index e15406f0914..a02bf82e4a5 100644 --- a/pkg/kubelet/cm/cpumanager/state/state_compatibility_test.go +++ b/pkg/kubelet/cm/cpumanager/state/state_compatibility_test.go @@ -30,8 +30,10 @@ const compatibilityTestingCheckpoint = "cpumanager_state_compatibility_test" var state = &stateMemory{ assignments: ContainerCPUAssignments{ - "container1": cpuset.NewCPUSet(4, 5, 6), - "container2": cpuset.NewCPUSet(1, 2, 3), + "pod": map[string]cpuset.CPUSet{ + "container1": cpuset.NewCPUSet(4, 5, 6), + "container2": cpuset.NewCPUSet(1, 2, 3), + }, }, defaultCPUSet: cpuset.NewCPUSet(1, 2, 3), } @@ -44,12 +46,12 @@ func TestFileToCheckpointCompatibility(t *testing.T) { // ensure testing state is removed after testing defer os.Remove(statePath) - fileState := NewFileState(statePath, "none") + fileState := NewFileState(statePath, "none", nil) fileState.SetDefaultCPUSet(state.defaultCPUSet) fileState.SetCPUAssignments(state.assignments) - restoredState, err := NewCheckpointState(testingDir, compatibilityTestingCheckpoint, "none") + restoredState, err := NewCheckpointState(testingDir, compatibilityTestingCheckpoint, "none", nil) if err != nil { t.Fatalf("could not restore file state: %v", err) } @@ -68,13 +70,13 @@ func TestCheckpointToFileCompatibility(t *testing.T) { // ensure testing checkpoint is removed after testing 
defer cpm.RemoveCheckpoint(compatibilityTestingCheckpoint) - checkpointState, err := NewCheckpointState(testingDir, compatibilityTestingCheckpoint, "none") + checkpointState, err := NewCheckpointState(testingDir, compatibilityTestingCheckpoint, "none", nil) require.NoError(t, err) checkpointState.SetDefaultCPUSet(state.defaultCPUSet) checkpointState.SetCPUAssignments(state.assignments) - restoredState := NewFileState(path.Join(testingDir, compatibilityTestingCheckpoint), "none") + restoredState := NewFileState(path.Join(testingDir, compatibilityTestingCheckpoint), "none", nil) AssertStateEqual(t, restoredState, state) } diff --git a/pkg/kubelet/cm/cpumanager/state/state_file.go b/pkg/kubelet/cm/cpumanager/state/state_file.go index 603467c1c03..f3b8e70470d 100644 --- a/pkg/kubelet/cm/cpumanager/state/state_file.go +++ b/pkg/kubelet/cm/cpumanager/state/state_file.go @@ -24,30 +24,39 @@ import ( "sync" "k8s.io/klog" + "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/containermap" "k8s.io/kubernetes/pkg/kubelet/cm/cpuset" ) -type stateFileData struct { +type stateFileDataV1 struct { PolicyName string `json:"policyName"` DefaultCPUSet string `json:"defaultCpuSet"` Entries map[string]string `json:"entries,omitempty"` } +type stateFileDataV2 struct { + PolicyName string `json:"policyName"` + DefaultCPUSet string `json:"defaultCpuSet"` + Entries map[string]map[string]string `json:"entries,omitempty"` +} + var _ State = &stateFile{} type stateFile struct { sync.RWMutex - stateFilePath string - policyName string - cache State + stateFilePath string + policyName string + cache State + initialContainers containermap.ContainerMap } // NewFileState creates new State for keeping track of cpu/pod assignment with file backend -func NewFileState(filePath string, policyName string) State { +func NewFileState(filePath string, policyName string, initialContainers containermap.ContainerMap) State { stateFile := &stateFile{ - stateFilePath: filePath, - cache: NewMemoryState(), - policyName: policyName, + stateFilePath: filePath, + cache: NewMemoryState(), + policyName: policyName, + initialContainers: initialContainers, } if err := stateFile.tryRestoreState(); err != nil { @@ -61,6 +70,30 @@ return stateFile } +// migrateV1StateToV2State() converts state from the v1 format to the v2 format +func (sf *stateFile) migrateV1StateToV2State(src *stateFileDataV1, dst *stateFileDataV2) error { + if src.PolicyName != "" { + dst.PolicyName = src.PolicyName + } + if src.DefaultCPUSet != "" { + dst.DefaultCPUSet = src.DefaultCPUSet + } + for containerID, cset := range src.Entries { + podUID, containerName, err := sf.initialContainers.GetContainerRef(containerID) + if err != nil { + return fmt.Errorf("containerID '%v' not found in initial containers list", containerID) + } + if dst.Entries == nil { + dst.Entries = make(map[string]map[string]string) + } + if _, exists := dst.Entries[podUID]; !exists { + dst.Entries[podUID] = make(map[string]string) + } + dst.Entries[podUID][containerName] = cset + } + return nil +} +
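migrateV1StateToV2State mirrors migrateV1CheckpointToV2Checkpoint above: v1 entries are keyed by containerID, so the ContainerMap built from the runtime at kubelet startup is what recovers each entry's (podUID, containerName) pair. A standalone sketch of that translation, with a plain map and a hypothetical ref pair standing in for containermap.ContainerMap and its GetContainerRef method:

package main

import "fmt"

// ref is a hypothetical pair type; the real code gets these two values back
// from containermap.ContainerMap.GetContainerRef(containerID).
type ref struct{ podUID, containerName string }

func main() {
	// containerID -> (podUID, containerName), as discovered from the runtime.
	initialContainers := map[string]ref{"containerID1": {"pod", "container1"}}

	v1Entries := map[string]string{"containerID1": "4-6"}
	v2Entries := make(map[string]map[string]string)

	for id, cset := range v1Entries {
		r, ok := initialContainers[id]
		if !ok {
			// Mirrors the migration's error path: an unknown ID aborts it.
			fmt.Printf("containerID %q not found in initial containers list\n", id)
			return
		}
		if _, exists := v2Entries[r.podUID]; !exists {
			v2Entries[r.podUID] = make(map[string]string)
		}
		v2Entries[r.podUID][r.containerName] = cset
	}
	fmt.Println(v2Entries) // map[pod:map[container1:4-6]]
}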
// tryRestoreState tries to read state file, upon any error, // err message is logged and state is left clean. un-initialized func (sf *stateFile) tryRestoreState() error { @@ -90,28 +123,40 @@ } // File exists; try to read it. - var readState stateFileData + var readStateV1 stateFileDataV1 + var readStateV2 stateFileDataV2 - if err = json.Unmarshal(content, &readState); err != nil { - klog.Errorf("[cpumanager] state file: could not unmarshal, corrupted state file - \"%s\"", sf.stateFilePath) - return err - } - - if sf.policyName != readState.PolicyName { - return fmt.Errorf("policy configured \"%s\" != policy from state file \"%s\"", sf.policyName, readState.PolicyName) - } - - if tmpDefaultCPUSet, err = cpuset.Parse(readState.DefaultCPUSet); err != nil { - klog.Errorf("[cpumanager] state file: could not parse state file - [defaultCpuSet:\"%s\"]", readState.DefaultCPUSet) - return err - } - - for containerID, cpuString := range readState.Entries { - if tmpContainerCPUSet, err = cpuset.Parse(cpuString); err != nil { - klog.Errorf("[cpumanager] state file: could not parse state file - container id: %s, cpuset: \"%s\"", containerID, cpuString) + if err = json.Unmarshal(content, &readStateV1); err != nil { + readStateV1 = stateFileDataV1{} // reset it back to 0 + if err = json.Unmarshal(content, &readStateV2); err != nil { + klog.Errorf("[cpumanager] state file: could not unmarshal, corrupted state file - \"%s\"", sf.stateFilePath) return err } - tmpAssignments[containerID] = tmpContainerCPUSet + } + + if err = sf.migrateV1StateToV2State(&readStateV1, &readStateV2); err != nil { + klog.Errorf("[cpumanager] state file: could not migrate v1 state to v2 state - \"%s\"", sf.stateFilePath) + return err + } + + if sf.policyName != readStateV2.PolicyName { + return fmt.Errorf("policy configured \"%s\" != policy from state file \"%s\"", sf.policyName, readStateV2.PolicyName) + } + + if tmpDefaultCPUSet, err = cpuset.Parse(readStateV2.DefaultCPUSet); err != nil { + klog.Errorf("[cpumanager] state file: could not parse state file - [defaultCpuSet:\"%s\"]", readStateV2.DefaultCPUSet) + return err + } + + for pod := range readStateV2.Entries { + tmpAssignments[pod] = make(map[string]cpuset.CPUSet) + for container, cpuString := range readStateV2.Entries[pod] { + if tmpContainerCPUSet, err = cpuset.Parse(cpuString); err != nil { + klog.Errorf("[cpumanager] state file: could not parse state file - pod: %s, container: %s, cpuset: \"%s\"", pod, container, cpuString) + return err + } + tmpAssignments[pod][container] = tmpContainerCPUSet + } } sf.cache.SetDefaultCPUSet(tmpDefaultCPUSet) @@ -128,14 +173,18 @@ var content []byte var err error - data := stateFileData{ + data := stateFileDataV2{ PolicyName: sf.policyName, DefaultCPUSet: sf.cache.GetDefaultCPUSet().String(), - Entries: map[string]string{}, + Entries: map[string]map[string]string{}, } - for containerID, cset := range sf.cache.GetCPUAssignments() { - data.Entries[containerID] = cset.String() + assignments := sf.cache.GetCPUAssignments() + for pod := range assignments { + data.Entries[pod] = map[string]string{} + for container, cset := range assignments[pod] { + data.Entries[pod][container] = cset.String() + } } if content, err = json.Marshal(data); err != nil { @@ -147,11 +196,11 @@ } } -func (sf *stateFile) GetCPUSet(containerID string) (cpuset.CPUSet, bool) { +func (sf *stateFile) GetCPUSet(podUID string, containerName string) (cpuset.CPUSet, bool) { sf.RLock() defer sf.RUnlock() - res, ok := sf.cache.GetCPUSet(containerID) + res, ok := sf.cache.GetCPUSet(podUID, containerName) return res, ok } @@ -162,11 +211,11 @@ func (sf *stateFile) GetDefaultCPUSet() cpuset.CPUSet { return
diff --git a/pkg/kubelet/cm/cpumanager/state/state_file_test.go b/pkg/kubelet/cm/cpumanager/state/state_file_test.go
index fb83eac5966..a928e9529a9 100644
--- a/pkg/kubelet/cm/cpumanager/state/state_file_test.go
+++ b/pkg/kubelet/cm/cpumanager/state/state_file_test.go
@@ -27,6 +27,7 @@ import (
 	"strings"
 	"testing"
 
+	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/containermap"
 	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
 )
 
@@ -70,17 +71,19 @@ func stderrCapture(t *testing.T, f func() State) (bytes.Buffer, State) {
 
 func TestFileStateTryRestore(t *testing.T) {
 	testCases := []struct {
-		description      string
-		stateFileContent string
-		policyName       string
-		expErr           string
-		expPanic         bool
-		expectedState    *stateMemory
+		description       string
+		stateFileContent  string
+		policyName        string
+		initialContainers containermap.ContainerMap
+		expErr            string
+		expPanic          bool
+		expectedState     *stateMemory
 	}{
 		{
 			"Invalid JSON - one byte file",
 			"\n",
 			"none",
+			containermap.ContainerMap{},
 			"[cpumanager] state file: unable to restore state from disk (unexpected end of JSON input)",
 			true,
 			&stateMemory{},
@@ -89,6 +92,7 @@ func TestFileStateTryRestore(t *testing.T) {
 			"Invalid JSON - invalid content",
 			"{",
 			"none",
+			containermap.ContainerMap{},
 			"[cpumanager] state file: unable to restore state from disk (unexpected end of JSON input)",
 			true,
 			&stateMemory{},
@@ -97,6 +101,7 @@ func TestFileStateTryRestore(t *testing.T) {
 			"Try restore defaultCPUSet only",
 			`{"policyName": "none", "defaultCpuSet": "4-6"}`,
 			"none",
+			containermap.ContainerMap{},
 			"",
 			false,
 			&stateMemory{
@@ -108,6 +113,7 @@ func TestFileStateTryRestore(t *testing.T) {
 			"Try restore defaultCPUSet only - invalid name",
 			`{"policyName": "none", "defaultCpuSet" "4-6"}`,
 			"none",
+			containermap.ContainerMap{},
 			`[cpumanager] state file: unable to restore state from disk (invalid character '"' after object key)`,
 			true,
 			&stateMemory{},
@@ -117,17 +123,22 @@ func TestFileStateTryRestore(t *testing.T) {
 			`{
 				"policyName": "none",
 				"entries": {
-					"container1": "4-6",
-					"container2": "1-3"
+					"pod": {
+						"container1": "4-6",
+						"container2": "1-3"
+					}
 				}
 			}`,
 			"none",
+			containermap.ContainerMap{},
 			"",
 			false,
 			&stateMemory{
 				assignments: ContainerCPUAssignments{
-					"container1": cpuset.NewCPUSet(4, 5, 6),
-					"container2": cpuset.NewCPUSet(1, 2, 3),
+					"pod": map[string]cpuset.CPUSet{
+						"container1": cpuset.NewCPUSet(4, 5, 6),
+						"container2": cpuset.NewCPUSet(1, 2, 3),
+					},
 				},
 				defaultCPUSet: cpuset.NewCPUSet(),
 			},
@@ -140,6 +151,7 @@ func TestFileStateTryRestore(t *testing.T) {
 				"entries": {}
 			}`,
 			"B",
+			containermap.ContainerMap{},
 			`[cpumanager] state file: unable to restore state from disk (policy configured "B" != policy from state file "A")`,
 			true,
 			&stateMemory{},
@@ -148,6 +160,7 @@ func TestFileStateTryRestore(t *testing.T) {
 			"Try restore invalid assignments",
 			`{"entries": }`,
 			"none",
+			containermap.ContainerMap{},
 			"[cpumanager] state file: unable to restore state from disk (invalid character '}' looking for beginning of value)",
 			true,
 			&stateMemory{},
@@ -158,17 +171,22 @@ func TestFileStateTryRestore(t *testing.T) {
 			`{
 				"policyName": "none",
 				"defaultCpuSet": "23-24",
 				"entries": {
-					"container1": "4-6",
-					"container2": "1-3"
+					"pod": {
+						"container1": "4-6",
+						"container2": "1-3"
+					}
 				}
 			}`,
 			"none",
+			containermap.ContainerMap{},
 			"",
 			false,
 			&stateMemory{
 				assignments: ContainerCPUAssignments{
-					"container1": cpuset.NewCPUSet(4, 5, 6),
-					"container2": cpuset.NewCPUSet(1, 2, 3),
+					"pod": map[string]cpuset.CPUSet{
+						"container1": cpuset.NewCPUSet(4, 5, 6),
+						"container2": cpuset.NewCPUSet(1, 2, 3),
+					},
 				},
 				defaultCPUSet: cpuset.NewCPUSet(23, 24),
 			},
@@ -180,6 +198,7 @@ func TestFileStateTryRestore(t *testing.T) {
 				"defaultCpuSet": "2-sd"
 			}`,
 			"none",
+			containermap.ContainerMap{},
 			`[cpumanager] state file: unable to restore state from disk (strconv.Atoi: parsing "sd": invalid syntax)`,
 			true,
 			&stateMemory{},
@@ -190,11 +209,14 @@ func TestFileStateTryRestore(t *testing.T) {
 				"policyName": "none",
 				"defaultCpuSet": "23-24",
 				"entries": {
-					"container1": "p-6",
-					"container2": "1-3"
+					"pod": {
+						"container1": "p-6",
+						"container2": "1-3"
+					}
 				}
 			}`,
 			"none",
+			containermap.ContainerMap{},
 			`[cpumanager] state file: unable to restore state from disk (strconv.Atoi: parsing "p": invalid syntax)`,
 			true,
 			&stateMemory{},
@@ -203,6 +225,7 @@ func TestFileStateTryRestore(t *testing.T) {
 			"tryRestoreState creates empty state file",
 			"",
 			"none",
+			containermap.ContainerMap{},
 			"",
 			false,
 			&stateMemory{
@@ -210,6 +233,35 @@ func TestFileStateTryRestore(t *testing.T) {
 				defaultCPUSet: cpuset.NewCPUSet(),
 			},
 		},
+		{
+			"Try restore with migration",
+			`{
+				"policyName": "none",
+				"defaultCpuSet": "23-24",
+				"entries": {
+					"containerID1": "4-6",
+					"containerID2": "1-3"
+				}
+			}`,
+			"none",
+			func() containermap.ContainerMap {
+				cm := containermap.NewContainerMap()
+				cm.Add("pod", "container1", "containerID1")
+				cm.Add("pod", "container2", "containerID2")
+				return cm
+			}(),
+			"",
+			false,
+			&stateMemory{
+				assignments: ContainerCPUAssignments{
+					"pod": map[string]cpuset.CPUSet{
+						"container1": cpuset.NewCPUSet(4, 5, 6),
+						"container2": cpuset.NewCPUSet(1, 2, 3),
+					},
+				},
+				defaultCPUSet: cpuset.NewCPUSet(23, 24),
+			},
+		},
 	}
 
 	for idx, tc := range testCases {
@@ -239,7 +291,7 @@ func TestFileStateTryRestore(t *testing.T) {
 			defer os.Remove(sfilePath.Name())
 
 			logData, fileState := stderrCapture(t, func() State {
-				return NewFileState(sfilePath.Name(), tc.policyName)
+				return NewFileState(sfilePath.Name(), tc.policyName, tc.initialContainers)
 			})
 
 			if tc.expErr != "" {
@@ -284,7 +336,7 @@ func TestFileStateTryRestorePanic(t *testing.T) {
 			}
 		}
 	}()
-	NewFileState(sfilePath, "static")
+	NewFileState(sfilePath, "static", nil)
 })
@@ -315,8 +367,10 @@ func TestUpdateStateFile(t *testing.T) {
 			"",
 			&stateMemory{
 				assignments: ContainerCPUAssignments{
-					"container1": cpuset.NewCPUSet(4, 5, 6),
-					"container2": cpuset.NewCPUSet(1, 2, 3),
+					"pod": map[string]cpuset.CPUSet{
+						"container1": cpuset.NewCPUSet(4, 5, 6),
+						"container2": cpuset.NewCPUSet(1, 2, 3),
+					},
 				},
 				defaultCPUSet: cpuset.NewCPUSet(),
 			},
@@ -363,7 +417,7 @@ func TestUpdateStateFile(t *testing.T) {
 					return
 				}
 			}
-			newFileState := NewFileState(sfilePath.Name(), "static")
+			newFileState := NewFileState(sfilePath.Name(), "static", nil)
 			AssertStateEqual(t, newFileState, tc.expectedState)
 		})
 	}
 }
 
@@ -373,35 +427,43 @@ func TestHelpersStateFile(t *testing.T) {
 	testCases := []struct {
 		description   string
 		defaultCPUset cpuset.CPUSet
-		containers    map[string]cpuset.CPUSet
+		assignments   map[string]map[string]cpuset.CPUSet
 	}{
 		{
 			description:   "one container",
 			defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8),
-			containers: map[string]cpuset.CPUSet{
-				"c1": cpuset.NewCPUSet(0, 1),
+			assignments: map[string]map[string]cpuset.CPUSet{
+				"pod": {
+					"c1": cpuset.NewCPUSet(0, 1),
+				},
 			},
 		},
 		{
 			description:   "two containers",
 			defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8),
-			containers: map[string]cpuset.CPUSet{
-				"c1": cpuset.NewCPUSet(0, 1),
-				"c2": cpuset.NewCPUSet(2, 3, 4, 5),
+			assignments: map[string]map[string]cpuset.CPUSet{
+				"pod": {
+					"c1": cpuset.NewCPUSet(0, 1),
+					"c2": cpuset.NewCPUSet(2, 3, 4, 5),
+				},
 			},
 		},
 		{
 			description:   "container with more cpus than is possible",
 			defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8),
-			containers: map[string]cpuset.CPUSet{
-				"c1": cpuset.NewCPUSet(0, 10),
+			assignments: map[string]map[string]cpuset.CPUSet{
+				"pod": {
+					"c1": cpuset.NewCPUSet(0, 10),
+				},
 			},
 		},
 		{
 			description:   "container without assigned cpus",
 			defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8),
-			containers: map[string]cpuset.CPUSet{
-				"c1": cpuset.NewCPUSet(),
+			assignments: map[string]map[string]cpuset.CPUSet{
+				"pod": {
+					"c1": cpuset.NewCPUSet(),
+				},
 			},
 		},
 	}
@@ -414,19 +476,21 @@ func TestHelpersStateFile(t *testing.T) {
 				t.Errorf("cannot create temporary test file: %q", err.Error())
 			}
 
-			state := NewFileState(sfFile.Name(), "static")
+			state := NewFileState(sfFile.Name(), "static", nil)
 			state.SetDefaultCPUSet(tc.defaultCPUset)
 
-			for containerName, containerCPUs := range tc.containers {
-				state.SetCPUSet(containerName, containerCPUs)
-				if cpus, _ := state.GetCPUSet(containerName); !cpus.Equals(containerCPUs) {
-					t.Errorf("state is inconsistent. Wants = %q Have = %q", containerCPUs, cpus)
-				}
-				state.Delete(containerName)
-				if cpus := state.GetCPUSetOrDefault(containerName); !cpus.Equals(tc.defaultCPUset) {
-					t.Error("deleted container still existing in state")
-				}
+			for podUID := range tc.assignments {
+				for containerName, containerCPUs := range tc.assignments[podUID] {
+					state.SetCPUSet(podUID, containerName, containerCPUs)
+					if cpus, _ := state.GetCPUSet(podUID, containerName); !cpus.Equals(containerCPUs) {
+						t.Errorf("state is inconsistent. Wants = %q Have = %q", containerCPUs, cpus)
+					}
+					state.Delete(podUID, containerName)
+					if cpus := state.GetCPUSetOrDefault(podUID, containerName); !cpus.Equals(tc.defaultCPUset) {
+						t.Error("deleted container still exists in state")
+					}
+				}
 			}
 		})
@@ -437,15 +501,17 @@ func TestClearStateStateFile(t *testing.T) {
 	testCases := []struct {
 		description   string
 		defaultCPUset cpuset.CPUSet
-		containers    map[string]cpuset.CPUSet
+		assignments   map[string]map[string]cpuset.CPUSet
 	}{
 		{
 			description:   "valid file",
 			defaultCPUset: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8),
-			containers: map[string]cpuset.CPUSet{
-				"c1": cpuset.NewCPUSet(0, 1),
-				"c2": cpuset.NewCPUSet(2, 3),
-				"c3": cpuset.NewCPUSet(4, 5),
+			assignments: map[string]map[string]cpuset.CPUSet{
+				"pod": {
+					"c1": cpuset.NewCPUSet(0, 1),
+					"c2": cpuset.NewCPUSet(2, 3),
+					"c3": cpuset.NewCPUSet(4, 5),
+				},
 			},
 		},
 	}
@@ -457,19 +523,23 @@ func TestClearStateStateFile(t *testing.T) {
 				t.Errorf("cannot create temporary test file: %q", err.Error())
 			}
 
-			state := NewFileState(sfFile.Name(), "static")
+			state := NewFileState(sfFile.Name(), "static", nil)
 			state.SetDefaultCPUSet(testCase.defaultCPUset)
-			for containerName, containerCPUs := range testCase.containers {
-				state.SetCPUSet(containerName, containerCPUs)
+			for podUID := range testCase.assignments {
+				for containerName, containerCPUs := range testCase.assignments[podUID] {
+					state.SetCPUSet(podUID, containerName, containerCPUs)
+				}
 			}
 
 			state.ClearState()
 			if !cpuset.NewCPUSet().Equals(state.GetDefaultCPUSet()) {
 				t.Error("cleared state shouldn't has got information about available cpuset")
 			}
-			for containerName := range testCase.containers {
-				if !cpuset.NewCPUSet().Equals(state.GetCPUSetOrDefault(containerName)) {
-					t.Error("cleared state shouldn't has got information about containers")
+			for podUID := range testCase.assignments {
+				for containerName := range testCase.assignments[podUID] {
+					if !cpuset.NewCPUSet().Equals(state.GetCPUSetOrDefault(podUID, containerName)) {
+						t.Error("cleared state shouldn't have information about containers")
+					}
 				}
 			}
 		})
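The "Try restore with migration" case above doubles as the clearest usage example for the new NewFileState contract: a ContainerMap seeded with (podUID, containerName, containerID) triples lets a v1-format file keyed by container IDs be restored under pod/container keys. A hedged sketch of the same flow outside the test harness; the state-file path and the pod/container names are placeholders, not real defaults:

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/containermap"
	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
)

func main() {
	// Seed the map the way the kubelet does from the runtime, so entries in
	// a v1-format file keyed by container ID can be re-keyed on restore.
	initialContainers := containermap.NewContainerMap()
	initialContainers.Add("pod", "container1", "containerID1")
	initialContainers.Add("pod", "container2", "containerID2")

	// "/tmp/cpu_manager_state" is a placeholder path for this sketch.
	st := state.NewFileState("/tmp/cpu_manager_state", "none", initialContainers)

	// After restore, all lookups are by (podUID, containerName).
	fmt.Println(st.GetCPUSetOrDefault("pod", "container1"))
}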
diff --git a/pkg/kubelet/cm/cpumanager/state/state_mem.go b/pkg/kubelet/cm/cpumanager/state/state_mem.go
index 77c5f4a525c..25ab6889bc8 100644
--- a/pkg/kubelet/cm/cpumanager/state/state_mem.go
+++ b/pkg/kubelet/cm/cpumanager/state/state_mem.go
@@ -40,11 +40,11 @@ func NewMemoryState() State {
 	}
 }
 
-func (s *stateMemory) GetCPUSet(containerID string) (cpuset.CPUSet, bool) {
+func (s *stateMemory) GetCPUSet(podUID string, containerName string) (cpuset.CPUSet, bool) {
 	s.RLock()
 	defer s.RUnlock()
 
-	res, ok := s.assignments[containerID]
+	res, ok := s.assignments[podUID][containerName]
 	return res.Clone(), ok
 }
 
@@ -55,8 +55,8 @@ func (s *stateMemory) GetDefaultCPUSet() cpuset.CPUSet {
 	return s.defaultCPUSet.Clone()
 }
 
-func (s *stateMemory) GetCPUSetOrDefault(containerID string) cpuset.CPUSet {
-	if res, ok := s.GetCPUSet(containerID); ok {
+func (s *stateMemory) GetCPUSetOrDefault(podUID string, containerName string) cpuset.CPUSet {
+	if res, ok := s.GetCPUSet(podUID, containerName); ok {
 		return res
 	}
 	return s.GetDefaultCPUSet()
@@ -68,12 +68,16 @@ func (s *stateMemory) GetCPUAssignments() ContainerCPUAssignments {
 	return s.assignments.Clone()
 }
 
-func (s *stateMemory) SetCPUSet(containerID string, cset cpuset.CPUSet) {
+func (s *stateMemory) SetCPUSet(podUID string, containerName string, cset cpuset.CPUSet) {
 	s.Lock()
 	defer s.Unlock()
 
-	s.assignments[containerID] = cset
-	klog.Infof("[cpumanager] updated desired cpuset (container id: %s, cpuset: \"%s\")", containerID, cset)
+	if _, ok := s.assignments[podUID]; !ok {
+		s.assignments[podUID] = make(map[string]cpuset.CPUSet)
+	}
+
+	s.assignments[podUID][containerName] = cset
+	klog.Infof("[cpumanager] updated desired cpuset (pod: %s, container: %s, cpuset: \"%s\")", podUID, containerName, cset)
 }
 
 func (s *stateMemory) SetDefaultCPUSet(cset cpuset.CPUSet) {
@@ -92,12 +96,15 @@ func (s *stateMemory) SetCPUAssignments(a ContainerCPUAssignments) {
 	klog.Infof("[cpumanager] updated cpuset assignments: \"%v\"", a)
 }
 
-func (s *stateMemory) Delete(containerID string) {
+func (s *stateMemory) Delete(podUID string, containerName string) {
 	s.Lock()
 	defer s.Unlock()
 
-	delete(s.assignments, containerID)
-	klog.V(2).Infof("[cpumanager] deleted cpuset assignment (container id: %s)", containerID)
+	delete(s.assignments[podUID], containerName)
+	if len(s.assignments[podUID]) == 0 {
+		delete(s.assignments, podUID)
+	}
+	klog.V(2).Infof("[cpumanager] deleted cpuset assignment (pod: %s, container: %s)", podUID, containerName)
 }
 
 func (s *stateMemory) ClearState() {
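One behavioral detail of the memory-backed state worth calling out: SetCPUSet lazily allocates the per-pod bucket, and Delete prunes the pod key once its last container is removed, so GetCPUAssignments never reports empty pod entries. A small sketch against the State interface shown above; the pod and container names are invented:

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
)

func main() {
	s := state.NewMemoryState()

	// SetCPUSet creates the "pod-a" bucket on first use.
	s.SetCPUSet("pod-a", "c1", cpuset.NewCPUSet(0, 1))
	s.SetCPUSet("pod-a", "c2", cpuset.NewCPUSet(2, 3))

	// Removing the last container of a pod also drops the pod key, so the
	// assignments map does not accumulate empty pods.
	s.Delete("pod-a", "c1")
	s.Delete("pod-a", "c2")
	fmt.Println(len(s.GetCPUAssignments())) // prints 0
}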
diff --git a/pkg/kubelet/cm/cpumanager/topology_hints_test.go b/pkg/kubelet/cm/cpumanager/topology_hints_test.go
index 0b70150da06..62d91e9cf98 100644
--- a/pkg/kubelet/cm/cpumanager/topology_hints_test.go
+++ b/pkg/kubelet/cm/cpumanager/topology_hints_test.go
@@ -31,13 +31,13 @@ import (
 )
 
 func TestGetTopologyHints(t *testing.T) {
-	testPod1 := makePod("2", "2")
+	testPod1 := makePod("fakePod", "fakeContainer", "2", "2")
 	testContainer1 := &testPod1.Spec.Containers[0]
-	testPod2 := makePod("5", "5")
+	testPod2 := makePod("fakePod", "fakeContainer", "5", "5")
 	testContainer2 := &testPod2.Spec.Containers[0]
-	testPod3 := makePod("7", "7")
+	testPod3 := makePod("fakePod", "fakeContainer", "7", "7")
 	testContainer3 := &testPod3.Spec.Containers[0]
-	testPod4 := makePod("11", "11")
+	testPod4 := makePod("fakePod", "fakeContainer", "11", "11")
 	testContainer4 := &testPod4.Spec.Containers[0]
 
 	firstSocketMask, _ := bitmask.NewBitMask(0)
@@ -156,7 +156,9 @@ func TestGetTopologyHints(t *testing.T) {
 			pod:       *testPod1,
 			container: *testContainer1,
 			assignments: state.ContainerCPUAssignments{
-				"": cpuset.NewCPUSet(0, 6),
+				string(testPod1.UID): map[string]cpuset.CPUSet{
+					testContainer1.Name: cpuset.NewCPUSet(0, 6),
+				},
 			},
 			defaultCPUSet: cpuset.NewCPUSet(),
 			expectedHints: []topologymanager.TopologyHint{
@@ -175,7 +177,9 @@ func TestGetTopologyHints(t *testing.T) {
 			pod:       *testPod1,
 			container: *testContainer1,
 			assignments: state.ContainerCPUAssignments{
-				"": cpuset.NewCPUSet(3, 9),
+				string(testPod1.UID): map[string]cpuset.CPUSet{
+					testContainer1.Name: cpuset.NewCPUSet(3, 9),
+				},
 			},
 			defaultCPUSet: cpuset.NewCPUSet(),
 			expectedHints: []topologymanager.TopologyHint{
@@ -194,7 +198,9 @@ func TestGetTopologyHints(t *testing.T) {
 			pod:       *testPod4,
 			container: *testContainer4,
 			assignments: state.ContainerCPUAssignments{
-				"": cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
+				string(testPod4.UID): map[string]cpuset.CPUSet{
+					testContainer4.Name: cpuset.NewCPUSet(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10),
+				},
 			},
 			defaultCPUSet: cpuset.NewCPUSet(),
 			expectedHints: []topologymanager.TopologyHint{
@@ -209,7 +215,9 @@ func TestGetTopologyHints(t *testing.T) {
 			pod:       *testPod1,
 			container: *testContainer1,
 			assignments: state.ContainerCPUAssignments{
-				"": cpuset.NewCPUSet(0, 6, 3, 9),
+				string(testPod1.UID): map[string]cpuset.CPUSet{
+					testContainer1.Name: cpuset.NewCPUSet(0, 6, 3, 9),
+				},
 			},
 			defaultCPUSet: cpuset.NewCPUSet(),
 			expectedHints: []topologymanager.TopologyHint{},
@@ -219,7 +227,9 @@ func TestGetTopologyHints(t *testing.T) {
 			pod:       *testPod4,
 			container: *testContainer4,
 			assignments: state.ContainerCPUAssignments{
-				"": cpuset.NewCPUSet(0, 6, 3, 9),
+				string(testPod4.UID): map[string]cpuset.CPUSet{
+					testContainer4.Name: cpuset.NewCPUSet(0, 6, 3, 9),
+				},
 			},
 			defaultCPUSet: cpuset.NewCPUSet(),
 			expectedHints: []topologymanager.TopologyHint{},
diff --git a/test/e2e_node/cpu_manager_test.go b/test/e2e_node/cpu_manager_test.go
index 203d983d274..e5dcd64782f 100644
--- a/test/e2e_node/cpu_manager_test.go
+++ b/test/e2e_node/cpu_manager_test.go
@@ -124,7 +124,7 @@ func waitForContainerRemoval(containerName, podName, podNS string) {
 
 func waitForStateFileCleanedUp() {
 	gomega.Eventually(func() bool {
-		restoredState, err := cpumanagerstate.NewCheckpointState("/var/lib/kubelet", "cpu_manager_state", "static")
+		restoredState, err := cpumanagerstate.NewCheckpointState("/var/lib/kubelet", "cpu_manager_state", "static", nil)
 		framework.ExpectNoError(err, "failed to create testing cpumanager state instance")
 		assignments := restoredState.GetCPUAssignments()
 		if len(assignments) == 0 {