mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 03:11:40 +00:00
Merge pull request #84300 from klueska/upstream-cpu-manager-reconcile-on-container-state
Update logic in `CPUManager` `reconcileState()`
This commit is contained in:
commit
e6b5194ec1
@ -200,6 +200,8 @@ func (m *manager) Start(activePods ActivePodsFunc, sourcesReady config.SourcesRe
|
||||
if m.policy.Name() == string(PolicyNone) {
|
||||
return nil
|
||||
}
|
||||
// Periodically call m.reconcileState() to continue to keep the CPU sets of
|
||||
// all pods in sync with and guaranteed CPUs handed out among them.
|
||||
go wait.Until(func() { m.reconcileState() }, m.reconcilePeriod, wait.NeverStop)
|
||||
return nil
|
||||
}
|
||||
@ -217,19 +219,24 @@ func (m *manager) AddContainer(p *v1.Pod, c *v1.Container, containerID string) e
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Call down into the policy to assign this container CPUs if required.
|
||||
err := m.policyAddContainer(p, c, containerID)
|
||||
if err != nil {
|
||||
klog.Errorf("[cpumanager] AddContainer error: %v", err)
|
||||
m.Unlock()
|
||||
return err
|
||||
}
|
||||
|
||||
// Get the CPUs just assigned to the container (or fall back to the default
|
||||
// CPUSet if none were assigned).
|
||||
cpus := m.state.GetCPUSetOrDefault(string(p.UID), c.Name)
|
||||
m.Unlock()
|
||||
|
||||
if !cpus.IsEmpty() {
|
||||
err = m.updateContainerCPUSet(containerID, cpus)
|
||||
if err != nil {
|
||||
klog.Errorf("[cpumanager] AddContainer error: %v", err)
|
||||
klog.Errorf("[cpumanager] AddContainer error: error updating CPUSet for container (pod: %s, container: %s, container id: %s, err: %v)", p.Name, c.Name, containerID, err)
|
||||
m.Lock()
|
||||
err := m.policyRemoveContainerByID(containerID)
|
||||
if err != nil {
|
||||
@ -353,43 +360,59 @@ func (m *manager) reconcileState() (success []reconciledContainer, failure []rec
|
||||
|
||||
m.removeStaleState()
|
||||
for _, pod := range m.activePods() {
|
||||
pstatus, ok := m.podStatusProvider.GetPodStatus(pod.UID)
|
||||
if !ok {
|
||||
klog.Warningf("[cpumanager] reconcileState: skipping pod; status not found (pod: %s)", pod.Name)
|
||||
failure = append(failure, reconciledContainer{pod.Name, "", ""})
|
||||
continue
|
||||
}
|
||||
|
||||
allContainers := pod.Spec.InitContainers
|
||||
allContainers = append(allContainers, pod.Spec.Containers...)
|
||||
status, ok := m.podStatusProvider.GetPodStatus(pod.UID)
|
||||
for _, container := range allContainers {
|
||||
if !ok {
|
||||
klog.Warningf("[cpumanager] reconcileState: skipping pod; status not found (pod: %s)", pod.Name)
|
||||
failure = append(failure, reconciledContainer{pod.Name, container.Name, ""})
|
||||
break
|
||||
}
|
||||
|
||||
containerID, err := findContainerIDByName(&status, container.Name)
|
||||
containerID, err := findContainerIDByName(&pstatus, container.Name)
|
||||
if err != nil {
|
||||
klog.Warningf("[cpumanager] reconcileState: skipping container; ID not found in status (pod: %s, container: %s, error: %v)", pod.Name, container.Name, err)
|
||||
klog.Warningf("[cpumanager] reconcileState: skipping container; ID not found in pod status (pod: %s, container: %s, error: %v)", pod.Name, container.Name, err)
|
||||
failure = append(failure, reconciledContainer{pod.Name, container.Name, ""})
|
||||
continue
|
||||
}
|
||||
|
||||
// Check whether container is present in state, there may be 3 reasons why it's not present:
|
||||
// - policy does not want to track the container
|
||||
// - kubelet has just been restarted - and there is no previous state file
|
||||
// - container has been removed from state by RemoveContainer call (DeletionTimestamp is set)
|
||||
if _, ok := m.state.GetCPUSet(string(pod.UID), container.Name); !ok {
|
||||
if status.Phase == v1.PodRunning && pod.DeletionTimestamp == nil {
|
||||
klog.V(4).Infof("[cpumanager] reconcileState: container is not present in state - trying to add (pod: %s, container: %s, container id: %s)", pod.Name, container.Name, containerID)
|
||||
err := m.AddContainer(pod, &container, containerID)
|
||||
if err != nil {
|
||||
klog.Errorf("[cpumanager] reconcileState: failed to add container (pod: %s, container: %s, container id: %s, error: %v)", pod.Name, container.Name, containerID, err)
|
||||
failure = append(failure, reconciledContainer{pod.Name, container.Name, containerID})
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
// if DeletionTimestamp is set, pod has already been removed from state
|
||||
// skip the pod/container since it's not running and will be deleted soon
|
||||
continue
|
||||
}
|
||||
cstatus, err := findContainerStatusByName(&pstatus, container.Name)
|
||||
if err != nil {
|
||||
klog.Warningf("[cpumanager] reconcileState: skipping container; container status not found in pod status (pod: %s, container: %s, error: %v)", pod.Name, container.Name, err)
|
||||
failure = append(failure, reconciledContainer{pod.Name, container.Name, ""})
|
||||
continue
|
||||
}
|
||||
|
||||
if cstatus.State.Waiting != nil ||
|
||||
(cstatus.State.Waiting == nil && cstatus.State.Running == nil && cstatus.State.Terminated == nil) {
|
||||
klog.Warningf("[cpumanager] reconcileState: skipping container; container still in the waiting state (pod: %s, container: %s, error: %v)", pod.Name, container.Name, err)
|
||||
failure = append(failure, reconciledContainer{pod.Name, container.Name, ""})
|
||||
continue
|
||||
}
|
||||
|
||||
if cstatus.State.Terminated != nil {
|
||||
// Since the container is terminated, we know it is safe to
|
||||
// remove it without any reconciliation. Removing the container
|
||||
// will also remove it from the `containerMap` so that this
|
||||
// container will be skipped next time around the loop.
|
||||
_, _, err := m.containerMap.GetContainerRef(containerID)
|
||||
if err == nil {
|
||||
klog.Warningf("[cpumanager] reconcileState: skipping container; already terminated (pod: %s, container id: %s)", pod.Name, containerID)
|
||||
err := m.RemoveContainer(containerID)
|
||||
if err != nil {
|
||||
klog.Errorf("[cpumanager] reconcileState: failed to remove container (pod: %s, container id: %s, error: %v)", pod.Name, containerID, err)
|
||||
failure = append(failure, reconciledContainer{pod.Name, container.Name, containerID})
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// Once we make it here we know we have a running container.
|
||||
// Idempotently add it to the containerMap incase it is missing.
|
||||
// This can happen after a kubelet restart, for example.
|
||||
m.containerMap.Add(string(pod.UID), container.Name, containerID)
|
||||
|
||||
cset := m.state.GetCPUSetOrDefault(string(pod.UID), container.Name)
|
||||
if cset.IsEmpty() {
|
||||
// NOTE: This should not happen outside of tests.
|
||||
@ -427,6 +450,15 @@ func findContainerIDByName(status *v1.PodStatus, name string) (string, error) {
|
||||
return "", fmt.Errorf("unable to find ID for container with name %v in pod status (it may not be running)", name)
|
||||
}
|
||||
|
||||
func findContainerStatusByName(status *v1.PodStatus, name string) (*v1.ContainerStatus, error) {
|
||||
for _, status := range append(status.InitContainerStatuses, status.ContainerStatuses...) {
|
||||
if status.Name == name {
|
||||
return &status, nil
|
||||
}
|
||||
}
|
||||
return nil, fmt.Errorf("unable to find status for container with name %v in pod status (it may not be running)", name)
|
||||
}
|
||||
|
||||
func (m *manager) updateContainerCPUSet(containerID string, cpus cpuset.CPUSet) error {
|
||||
// TODO: Consider adding a `ResourceConfigForContainer` helper in
|
||||
// helpers_linux.go similar to what exists for pods.
|
||||
|
@ -702,6 +702,9 @@ func TestReconcileState(t *testing.T) {
|
||||
{
|
||||
Name: "fakeContainerName",
|
||||
ContainerID: "docker://fakeContainerID",
|
||||
State: v1.ContainerState{
|
||||
Running: &v1.ContainerStateRunning{},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
@ -738,6 +741,9 @@ func TestReconcileState(t *testing.T) {
|
||||
{
|
||||
Name: "fakeContainerName",
|
||||
ContainerID: "docker://fakeContainerID",
|
||||
State: v1.ContainerState{
|
||||
Running: &v1.ContainerStateRunning{},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
@ -753,7 +759,7 @@ func TestReconcileState(t *testing.T) {
|
||||
expectFailedContainerName: "",
|
||||
},
|
||||
{
|
||||
description: "cpu manager reconclie - pod status not found",
|
||||
description: "cpu manager reconcile - pod status not found",
|
||||
activePods: []*v1.Pod{
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
@ -775,10 +781,10 @@ func TestReconcileState(t *testing.T) {
|
||||
stDefaultCPUSet: cpuset.NewCPUSet(),
|
||||
updateErr: nil,
|
||||
expectSucceededContainerName: "",
|
||||
expectFailedContainerName: "fakeContainerName",
|
||||
expectFailedContainerName: "",
|
||||
},
|
||||
{
|
||||
description: "cpu manager reconclie - container id not found",
|
||||
description: "cpu manager reconcile - container state not found",
|
||||
activePods: []*v1.Pod{
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
@ -831,6 +837,9 @@ func TestReconcileState(t *testing.T) {
|
||||
{
|
||||
Name: "fakeContainerName",
|
||||
ContainerID: "docker://fakeContainerID",
|
||||
State: v1.ContainerState{
|
||||
Running: &v1.ContainerStateRunning{},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
@ -867,6 +876,9 @@ func TestReconcileState(t *testing.T) {
|
||||
{
|
||||
Name: "fakeContainerName",
|
||||
ContainerID: "docker://fakeContainerID",
|
||||
State: v1.ContainerState{
|
||||
Running: &v1.ContainerStateRunning{},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
Loading…
Reference in New Issue
Block a user