diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go index aea7ba73dd7..097cd486873 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go @@ -200,6 +200,8 @@ func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesRe if m.policy.Name() == string(PolicyNone) { return nil } + // Periodically call m.reconcileState() to continue to keep the CPU sets of + // all pods in sync with the guaranteed CPUs handed out among them. go wait.Until(func() { m.reconcileState() }, m.reconcilePeriod, wait.NeverStop) return nil } @@ -217,19 +219,24 @@ func (m *manager) AddContainer(p *v1.Pod, c *v1.Container, containerID string) e } } } + + // Call down into the policy to assign this container CPUs if required. err := m.policyAddContainer(p, c, containerID) if err != nil { klog.Errorf("[cpumanager] AddContainer error: %v", err) m.Unlock() return err } + + // Get the CPUs just assigned to the container (or fall back to the default + // CPUSet if none were assigned). 
cpus := m.state.GetCPUSetOrDefault(string(p.UID), c.Name) m.Unlock() if !cpus.IsEmpty() { err = m.updateContainerCPUSet(containerID, cpus) if err != nil { - klog.Errorf("[cpumanager] AddContainer error: %v", err) + klog.Errorf("[cpumanager] AddContainer error: error updating CPUSet for container (pod: %s, container: %s, container id: %s, err: %v)", p.Name, c.Name, containerID, err) m.Lock() err := m.policyRemoveContainerByID(containerID) if err != nil { @@ -353,43 +360,59 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec m.removeStaleState() for _, pod := range m.activePods() { + pstatus, ok := m.podStatusProvider.GetPodStatus(pod.UID) + if !ok { + klog.Warningf("[cpumanager] reconcileState: skipping pod; status not found (pod: %s)", pod.Name) + failure = append(failure, reconciledContainer{pod.Name, "", ""}) + continue + } + allContainers := pod.Spec.InitContainers allContainers = append(allContainers, pod.Spec.Containers...) - status, ok := m.podStatusProvider.GetPodStatus(pod.UID) for _, container := range allContainers { - if !ok { - klog.Warningf("[cpumanager] reconcileState: skipping pod; status not found (pod: %s)", pod.Name) - failure = append(failure, reconciledContainer{pod.Name, container.Name, ""}) - break - } - - containerID, err := findContainerIDByName(&status, container.Name) + containerID, err := findContainerIDByName(&pstatus, container.Name) if err != nil { - klog.Warningf("[cpumanager] reconcileState: skipping container; ID not found in status (pod: %s, container: %s, error: %v)", pod.Name, container.Name, err) + klog.Warningf("[cpumanager] reconcileState: skipping container; ID not found in pod status (pod: %s, container: %s, error: %v)", pod.Name, container.Name, err) failure = append(failure, reconciledContainer{pod.Name, container.Name, ""}) continue } - // Check whether container is present in state, there may be 3 reasons why it's not present: - // - policy does not want to track the container - // - 
kubelet has just been restarted - and there is no previous state file - // - container has been removed from state by RemoveContainer call (DeletionTimestamp is set) - if _, ok := m.state.GetCPUSet(string(pod.UID), container.Name); !ok { - if status.Phase == v1.PodRunning && pod.DeletionTimestamp == nil { - klog.V(4).Infof("[cpumanager] reconcileState: container is not present in state - trying to add (pod: %s, container: %s, container id: %s)", pod.Name, container.Name, containerID) - err := m.AddContainer(pod, &container, containerID) - if err != nil { - klog.Errorf("[cpumanager] reconcileState: failed to add container (pod: %s, container: %s, container id: %s, error: %v)", pod.Name, container.Name, containerID, err) - failure = append(failure, reconciledContainer{pod.Name, container.Name, containerID}) - continue - } - } else { - // if DeletionTimestamp is set, pod has already been removed from state - // skip the pod/container since it's not running and will be deleted soon - continue - } + cstatus, err := findContainerStatusByName(&pstatus, container.Name) + if err != nil { + klog.Warningf("[cpumanager] reconcileState: skipping container; container status not found in pod status (pod: %s, container: %s, error: %v)", pod.Name, container.Name, err) + failure = append(failure, reconciledContainer{pod.Name, container.Name, ""}) + continue + } + if cstatus.State.Waiting != nil || + (cstatus.State.Waiting == nil && cstatus.State.Running == nil && cstatus.State.Terminated == nil) { + klog.Warningf("[cpumanager] reconcileState: skipping container; container still in the waiting state (pod: %s, container: %s)", pod.Name, container.Name) + failure = append(failure, reconciledContainer{pod.Name, container.Name, ""}) + continue + } + + if cstatus.State.Terminated != nil { + // Since the container is terminated, we know it is safe to + // remove it without any reconciliation. 
Removing the container + // will also remove it from the `containerMap` so that this + // container will be skipped next time around the loop. + _, _, err := m.containerMap.GetContainerRef(containerID) + if err == nil { + klog.Warningf("[cpumanager] reconcileState: skipping container; already terminated (pod: %s, container id: %s)", pod.Name, containerID) + err := m.RemoveContainer(containerID) + if err != nil { + klog.Errorf("[cpumanager] reconcileState: failed to remove container (pod: %s, container id: %s, error: %v)", pod.Name, containerID, err) + failure = append(failure, reconciledContainer{pod.Name, container.Name, containerID}) + } + } + continue + } + + // Once we make it here we know we have a running container. + // Idempotently add it to the containerMap in case it is missing. + // This can happen after a kubelet restart, for example. + m.containerMap.Add(string(pod.UID), container.Name, containerID) + cset := m.state.GetCPUSetOrDefault(string(pod.UID), container.Name) if cset.IsEmpty() { // NOTE: This should not happen outside of tests. @@ -427,6 +450,15 @@ func findContainerIDByName(status *v1.PodStatus, name string) (string, error) { return "", fmt.Errorf("unable to find ID for container with name %v in pod status (it may not be running)", name) } +func findContainerStatusByName(status *v1.PodStatus, name string) (*v1.ContainerStatus, error) { + for _, status := range append(status.InitContainerStatuses, status.ContainerStatuses...) { + if status.Name == name { + return &status, nil + } + } + return nil, fmt.Errorf("unable to find status for container with name %v in pod status (it may not be running)", name) +} + func (m *manager) updateContainerCPUSet(containerID string, cpus cpuset.CPUSet) error { // TODO: Consider adding a `ResourceConfigForContainer` helper in // helpers_linux.go similar to what exists for pods. 
diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go index 84a094a17f3..2b21ace5463 100644 --- a/pkg/kubelet/cm/cpumanager/cpu_manager_test.go +++ b/pkg/kubelet/cm/cpumanager/cpu_manager_test.go @@ -702,6 +702,9 @@ func TestReconcileState(t *testing.T) { { Name: "fakeContainerName", ContainerID: "docker://fakeContainerID", + State: v1.ContainerState{ + Running: &v1.ContainerStateRunning{}, + }, }, }, }, @@ -738,6 +741,9 @@ func TestReconcileState(t *testing.T) { { Name: "fakeContainerName", ContainerID: "docker://fakeContainerID", + State: v1.ContainerState{ + Running: &v1.ContainerStateRunning{}, + }, }, }, }, @@ -753,7 +759,7 @@ func TestReconcileState(t *testing.T) { expectFailedContainerName: "", }, { - description: "cpu manager reconclie - pod status not found", + description: "cpu manager reconcile - pod status not found", activePods: []*v1.Pod{ { ObjectMeta: metav1.ObjectMeta{ @@ -775,10 +781,10 @@ func TestReconcileState(t *testing.T) { stDefaultCPUSet: cpuset.NewCPUSet(), updateErr: nil, expectSucceededContainerName: "", - expectFailedContainerName: "fakeContainerName", + expectFailedContainerName: "", }, { - description: "cpu manager reconclie - container id not found", + description: "cpu manager reconcile - container state not found", activePods: []*v1.Pod{ { ObjectMeta: metav1.ObjectMeta{ @@ -831,6 +837,9 @@ func TestReconcileState(t *testing.T) { { Name: "fakeContainerName", ContainerID: "docker://fakeContainerID", + State: v1.ContainerState{ + Running: &v1.ContainerStateRunning{}, + }, }, }, }, @@ -867,6 +876,9 @@ func TestReconcileState(t *testing.T) { { Name: "fakeContainerName", ContainerID: "docker://fakeContainerID", + State: v1.ContainerState{ + Running: &v1.ContainerStateRunning{}, + }, }, }, },