diff --git a/pkg/kubelet/cm/cpumanager/cpu_manager.go b/pkg/kubelet/cm/cpumanager/cpu_manager.go
index 2ab8b2b37ad..ac8a009ae05 100644
--- a/pkg/kubelet/cm/cpumanager/cpu_manager.go
+++ b/pkg/kubelet/cm/cpumanager/cpu_manager.go
@@ -239,15 +239,68 @@ type reconciledContainer struct {
 	containerID string
 }
 
-func (m *manager) reconcileState() (success []reconciledContainer, failure []reconciledContainer) {
+func (m *manager) removeStaleState() {
+	// Only once all sources are ready do we attempt to remove any stale state.
+	// This ensures that the call to `m.activePods()` below will succeed with
+	// the actual active pods list.
 	if !m.sourcesReady.AllReady() {
 		return
 	}
+
+	// We grab the lock to ensure that no new containers will grab CPUs while
+	// executing the code below. Without this lock, it's possible that we end up
+	// removing state that is newly added by an asynchronous call to
+	// AddContainer() during the execution of this code.
+	m.Lock()
+	defer m.Unlock()
+
+	// We remove stale state very conservatively, only removing *any* state
+	// once we know for sure that we won't be accidentally removing state that
+	// is still valid. Since this function is called periodically, we will just
+	// try again next time this function is called.
+	activePods := m.activePods()
+	if len(activePods) == 0 {
+		// If there are no active pods, skip the removal of stale state.
+		return
+	}
+
+	// Build a set of containerIDs for all containers in all active Pods.
+	activeContainers := make(map[string]struct{})
+	for _, pod := range activePods {
+		pstatus, ok := m.podStatusProvider.GetPodStatus(pod.UID)
+		if !ok {
+			// If even one pod does not have its status set, skip state removal.
+			return
+		}
+		for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
+			containerID, err := findContainerIDByName(&pstatus, container.Name)
+			if err != nil {
+				// If even one container does not have its containerID set, skip state removal.
+				return
+			}
+			activeContainers[containerID] = struct{}{}
+		}
+	}
+
+	// Loop through the CPUManager state. Remove any state for containers not
+	// in the `activeContainers` set built above. The short-circuits in place
+	// above ensure that no erroneous state will ever be removed.
+	for containerID := range m.state.GetCPUAssignments() {
+		if _, ok := activeContainers[containerID]; !ok {
+			klog.Errorf("[cpumanager] removeStaleState: removing container: %s", containerID)
+			err := m.policy.RemoveContainer(m.state, containerID)
+			if err != nil {
+				klog.Errorf("[cpumanager] removeStaleState: failed to remove container %s, error: %v", containerID, err)
+			}
+		}
+	}
+}
+
+func (m *manager) reconcileState() (success []reconciledContainer, failure []reconciledContainer) {
 	success = []reconciledContainer{}
 	failure = []reconciledContainer{}
 
-	activeContainers := make(map[string]*v1.Pod)
-
+	m.removeStaleState()
 	for _, pod := range m.activePods() {
 		allContainers := pod.Spec.InitContainers
 		allContainers = append(allContainers, pod.Spec.Containers...)
@@ -286,8 +339,6 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec
 				}
 			}
 
-			activeContainers[containerID] = pod
-
 			cset := m.state.GetCPUSetOrDefault(containerID)
 			if cset.IsEmpty() {
 				// NOTE: This should not happen outside of tests.
@@ -306,16 +357,6 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec
 			success = append(success, reconciledContainer{pod.Name, container.Name, containerID})
 		}
 	}
-
-	for containerID := range m.state.GetCPUAssignments() {
-		if pod, ok := activeContainers[containerID]; !ok {
-			err := m.RemoveContainer(containerID)
-			if err != nil {
-				klog.Errorf("[cpumanager] reconcileState: failed to remove container (pod: %s, container id: %s, error: %v)", pod.Name, containerID, err)
-				failure = append(failure, reconciledContainer{pod.Name, "", containerID})
-			}
-		}
-	}
 	return success, failure
 }
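
For context: the new removeStaleState() code leans on the findContainerIDByName helper that already lives elsewhere in cpu_manager.go and is not shown in this diff. A minimal sketch of what such a helper can look like, assuming the kubecontainer.ContainerID parser; the exact implementation in the tree may differ:

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)

// Sketch only, not necessarily the tree's version: resolve a container name
// to its runtime ID via the pod status. Returning an error whenever no ID is
// available yet is exactly what lets removeStaleState() short-circuit
// conservatively in the diff above.
func findContainerIDByName(status *v1.PodStatus, name string) (string, error) {
	for _, containerStatus := range append(status.InitContainerStatuses, status.ContainerStatuses...) {
		if containerStatus.Name != name || containerStatus.ContainerID == "" {
			continue
		}
		cid := &kubecontainer.ContainerID{}
		if err := cid.ParseString(containerStatus.ContainerID); err != nil {
			return "", err
		}
		return cid.ID, nil
	}
	return "", fmt.Errorf("unable to find ID for container with name %v in pod status (it may not be running)", name)
}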
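The shape of the fix is easy to state outside of kubelet types: first resolve every live container to an ID, bail out entirely on any gap, and only then sweep state entries not in the live set. A self-contained sketch with hypothetical names (sweepStale is not kubelet code):

package main

import "fmt"

// sweepStale removes entries from state whose IDs are not live. resolve maps
// a container name to its runtime ID; ok=false means the ID is unknown (the
// container may not have started yet).
func sweepStale(state map[string]string, live []string, resolve func(name string) (string, bool)) {
	liveIDs := make(map[string]struct{})
	for _, name := range live {
		id, ok := resolve(name)
		if !ok {
			// Conservative short-circuit: with incomplete information we
			// might delete valid state, so do nothing and let the next
			// periodic run try again.
			return
		}
		liveIDs[id] = struct{}{}
	}
	for id := range state {
		if _, ok := liveIDs[id]; !ok {
			fmt.Printf("removing stale state for container %s\n", id)
			delete(state, id)
		}
	}
}

func main() {
	state := map[string]string{"abc123": "cpus 1-2", "dead99": "cpus 3-4"}
	ids := map[string]string{"web": "abc123"}
	resolve := func(name string) (string, bool) {
		id, ok := ids[name]
		return id, ok
	}
	sweepStale(state, []string{"web"}, resolve)
	fmt.Println(state) // only abc123 remains; dead99 was stale
}

This also makes the behavioral trade-off of the patch visible: because the sweep moved out of reconcileState() into removeStaleState(), failed removals are now only logged rather than appended to the reconcile failure list.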