Merge pull request #130254 from tallclair/allocation-manager-2

[FG:InPlacePodVerticalScaling] Move pod resource allocation management out of the status manager
Kubernetes Prow Robot 2025-02-28 11:30:56 -08:00 committed by GitHub
commit 3560950041
18 changed files with 409 additions and 349 deletions


@ -0,0 +1,164 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package allocation
import (
v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/allocation/state"
)
// podStatusManagerStateFile is the name of the state file formerly written by the
// status manager; the allocation manager keeps the name for checkpoint backwards compatibility.
const podStatusManagerStateFile = "pod_status_manager_state"
// Manager tracks pod resource allocations.
type Manager interface {
// GetContainerResourceAllocation returns the AllocatedResources value for the container
GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool)
// UpdatePodFromAllocation overwrites the pod spec with the allocation.
// This function does a deep copy only if updates are needed.
// Returns the updated (or original) pod, and whether there was an allocation stored.
UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool)
// SetPodAllocation checkpoints the resources allocated to a pod's containers.
SetPodAllocation(pod *v1.Pod) error
// DeletePodAllocation removes any stored state for the given pod UID.
DeletePodAllocation(uid types.UID)
// RemoveOrphanedPods removes the stored state for any pods not included in the set of remaining pods.
RemoveOrphanedPods(remainingPods sets.Set[types.UID])
}
type manager struct {
state state.State
}
func NewManager(checkpointDirectory string) Manager {
m := &manager{}
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
stateImpl, err := state.NewStateCheckpoint(checkpointDirectory, podStatusManagerStateFile)
if err != nil {
// This is a critical, non-recoverable failure.
klog.ErrorS(err, "Failed to initialize allocation checkpoint manager")
panic(err)
}
m.state = stateImpl
} else {
m.state = state.NewNoopStateCheckpoint()
}
return m
}
// NewInMemoryManager returns an allocation manager that doesn't persist state.
// For testing purposes only!
func NewInMemoryManager() Manager {
return &manager{
state: state.NewStateMemory(nil),
}
}
// GetContainerResourceAllocation returns the last checkpointed AllocatedResources values.
// If no allocation is checkpointed for the container, it returns an empty ResourceRequirements and false.
func (m *manager) GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool) {
return m.state.GetContainerResourceAllocation(podUID, containerName)
}
// UpdatePodFromAllocation overwrites the pod spec with the allocation.
// This function does a deep copy only if updates are needed.
func (m *manager) UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool) {
// TODO(tallclair): This clones the whole cache, but we only need 1 pod.
allocs := m.state.GetPodResourceAllocation()
return updatePodFromAllocation(pod, allocs)
}
func updatePodFromAllocation(pod *v1.Pod, allocs state.PodResourceAllocation) (*v1.Pod, bool) {
allocated, found := allocs[string(pod.UID)]
if !found {
return pod, false
}
updated := false
containerAlloc := func(c v1.Container) (v1.ResourceRequirements, bool) {
if cAlloc, ok := allocated[c.Name]; ok {
if !apiequality.Semantic.DeepEqual(c.Resources, cAlloc) {
// Allocation differs from the pod spec; return the allocated resources
if !updated {
// If this is the first update to be performed, copy the pod
pod = pod.DeepCopy()
updated = true
}
return cAlloc, true
}
}
return v1.ResourceRequirements{}, false
}
for i, c := range pod.Spec.Containers {
if cAlloc, found := containerAlloc(c); found {
// Allocation differs from pod spec, update
pod.Spec.Containers[i].Resources = cAlloc
}
}
for i, c := range pod.Spec.InitContainers {
if cAlloc, found := containerAlloc(c); found {
// Allocation differs from pod spec, update
pod.Spec.InitContainers[i].Resources = cAlloc
}
}
return pod, updated
}
// SetPodAllocation checkpoints the resources allocated to a pod's containers
func (m *manager) SetPodAllocation(pod *v1.Pod) error {
podAlloc := make(map[string]v1.ResourceRequirements)
for _, container := range pod.Spec.Containers {
alloc := *container.Resources.DeepCopy()
podAlloc[container.Name] = alloc
}
if utilfeature.DefaultFeatureGate.Enabled(features.SidecarContainers) {
for _, container := range pod.Spec.InitContainers {
if podutil.IsRestartableInitContainer(&container) {
alloc := *container.Resources.DeepCopy()
podAlloc[container.Name] = alloc
}
}
}
return m.state.SetPodResourceAllocation(string(pod.UID), podAlloc)
}
func (m *manager) DeletePodAllocation(uid types.UID) {
if err := m.state.Delete(string(uid), ""); err != nil {
// If the deletion fails, it will be retried by RemoveOrphanedPods, so we can safely ignore the error.
klog.V(3).ErrorS(err, "Failed to delete pod allocation", "podUID", uid)
}
}
func (m *manager) RemoveOrphanedPods(remainingPods sets.Set[types.UID]) {
m.state.RemoveOrphanedPods(remainingPods)
}


@ -0,0 +1,165 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package allocation
import (
"testing"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/kubelet/allocation/state"
)
func TestUpdatePodFromAllocation(t *testing.T) {
containerRestartPolicyAlways := v1.ContainerRestartPolicyAlways
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: "12345",
Name: "test",
Namespace: "default",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "c1",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(100, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI),
},
Limits: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(300, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(400, resource.DecimalSI),
},
},
},
{
Name: "c2",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(600, resource.DecimalSI),
},
Limits: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(700, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(800, resource.DecimalSI),
},
},
},
},
InitContainers: []v1.Container{
{
Name: "c1-restartable-init",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(300, resource.DecimalSI),
},
Limits: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(400, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(500, resource.DecimalSI),
},
},
RestartPolicy: &containerRestartPolicyAlways,
},
{
Name: "c1-init",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(600, resource.DecimalSI),
},
Limits: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(700, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(800, resource.DecimalSI),
},
},
},
},
},
}
resizedPod := pod.DeepCopy()
resizedPod.Spec.Containers[0].Resources.Requests[v1.ResourceCPU] = *resource.NewMilliQuantity(200, resource.DecimalSI)
resizedPod.Spec.InitContainers[0].Resources.Requests[v1.ResourceCPU] = *resource.NewMilliQuantity(300, resource.DecimalSI)
tests := []struct {
name string
pod *v1.Pod
allocs state.PodResourceAllocation
expectPod *v1.Pod
expectUpdate bool
}{{
name: "steady state",
pod: pod,
allocs: state.PodResourceAllocation{
string(pod.UID): map[string]v1.ResourceRequirements{
"c1": *pod.Spec.Containers[0].Resources.DeepCopy(),
"c2": *pod.Spec.Containers[1].Resources.DeepCopy(),
"c1-restartable-init": *pod.Spec.InitContainers[0].Resources.DeepCopy(),
"c1-init": *pod.Spec.InitContainers[1].Resources.DeepCopy(),
},
},
expectUpdate: false,
}, {
name: "no allocations",
pod: pod,
allocs: state.PodResourceAllocation{},
expectUpdate: false,
}, {
name: "missing container allocation",
pod: pod,
allocs: state.PodResourceAllocation{
string(pod.UID): map[string]v1.ResourceRequirements{
"c2": *pod.Spec.Containers[1].Resources.DeepCopy(),
},
},
expectUpdate: false,
}, {
name: "resized container",
pod: pod,
allocs: state.PodResourceAllocation{
string(pod.UID): map[string]v1.ResourceRequirements{
"c1": *resizedPod.Spec.Containers[0].Resources.DeepCopy(),
"c2": *resizedPod.Spec.Containers[1].Resources.DeepCopy(),
"c1-restartable-init": *resizedPod.Spec.InitContainers[0].Resources.DeepCopy(),
"c1-init": *resizedPod.Spec.InitContainers[1].Resources.DeepCopy(),
},
},
expectUpdate: true,
expectPod: resizedPod,
}}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
pod := test.pod.DeepCopy()
allocatedPod, updated := updatePodFromAllocation(pod, test.allocs)
if test.expectUpdate {
assert.True(t, updated, "updated")
assert.Equal(t, test.expectPod, allocatedPod)
assert.NotEqual(t, pod, allocatedPod)
} else {
assert.False(t, updated, "updated")
assert.Same(t, pod, allocatedPod)
}
})
}
}
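
Note the assert.Same in the no-update branch: updatePodFromAllocation is copy-on-write, so when the checkpointed allocation already matches the spec the original *v1.Pod pointer comes back untouched, and callers can rely on pointer identity to see that nothing changed.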


@ -0,0 +1,18 @@
/*
Copyright 2025 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package allocation handles tracking pod resource allocations.
package allocation


@ -18,6 +18,8 @@ package state
import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
)
// PodResourceAllocation type is used in tracking resources allocated to pod's containers
@ -48,6 +50,8 @@ type writer interface {
SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceRequirements) error
SetPodResourceAllocation(podUID string, alloc map[string]v1.ResourceRequirements) error
Delete(podUID string, containerName string) error
// RemoveOrphanedPods removes the stored state for any pods not included in the set of remaining pods.
RemoveOrphanedPods(remainingPods sets.Set[types.UID])
}
// State interface provides methods for tracking and setting pod resource allocation


@ -22,6 +22,8 @@ import (
"sync"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
@ -139,6 +141,12 @@ func (sc *stateCheckpoint) Delete(podUID string, containerName string) error {
return sc.storeState()
}
func (sc *stateCheckpoint) RemoveOrphanedPods(remainingPods sets.Set[types.UID]) {
sc.cache.RemoveOrphanedPods(remainingPods)
// Don't bother updating the stored state. If Kubelet is restarted before the cache is written,
// the orphaned pods will be removed the next time this method is called.
}
type noopStateCheckpoint struct{}
// NewNoopStateCheckpoint creates a dummy state checkpoint manager
@ -165,3 +173,5 @@ func (sc *noopStateCheckpoint) SetPodResourceAllocation(_ string, _ map[string]v
func (sc *noopStateCheckpoint) Delete(_ string, _ string) error {
return nil
}
func (sc *noopStateCheckpoint) RemoveOrphanedPods(_ sets.Set[types.UID]) {}


@ -20,6 +20,8 @@ import (
"sync"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
)
@ -96,3 +98,14 @@ func (s *stateMemory) Delete(podUID string, containerName string) error {
s.deleteContainer(podUID, containerName)
return nil
}
func (s *stateMemory) RemoveOrphanedPods(remainingPods sets.Set[types.UID]) {
s.Lock()
defer s.Unlock()
for podUID := range s.podAllocation {
if _, ok := remainingPods[types.UID(podUID)]; !ok {
delete(s.podAllocation, podUID)
}
}
}
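
A hedged test sketch of the sweep semantics, against the exported in-memory state (assuming, as NewInMemoryManager above does, that state.NewStateMemory accepts nil as an empty allocation map):

package state_test

import (
	"testing"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/sets"

	"k8s.io/kubernetes/pkg/kubelet/allocation/state"
)

// Pods absent from remainingPods are dropped, pods still present are kept, and
// re-running the sweep with the same set is a no-op; that is why the checkpoint
// variant above can defer persisting the removal.
func TestRemoveOrphanedPodsSweep(t *testing.T) {
	s := state.NewStateMemory(nil)
	_ = s.SetPodResourceAllocation("pod-a", map[string]v1.ResourceRequirements{"c1": {}})
	_ = s.SetPodResourceAllocation("pod-b", map[string]v1.ResourceRequirements{"c1": {}})

	s.RemoveOrphanedPods(sets.New[types.UID]("pod-a"))

	if _, ok := s.GetContainerResourceAllocation("pod-b", "c1"); ok {
		t.Error("expected pod-b to be swept as an orphan")
	}
	if _, ok := s.GetContainerResourceAllocation("pod-a", "c1"); !ok {
		t.Error("expected pod-a to be retained")
	}
}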


@ -77,6 +77,7 @@ import (
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/api/v1/resource"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/allocation"
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/apis/config/v1beta1"
"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
@ -662,7 +663,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.mirrorPodClient = kubepod.NewBasicMirrorClient(klet.kubeClient, string(nodeName), nodeLister)
klet.podManager = kubepod.NewBasicPodManager()
klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet, kubeDeps.PodStartupLatencyTracker, klet.getRootDir())
klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet, kubeDeps.PodStartupLatencyTracker)
klet.allocationManager = allocation.NewManager(klet.getRootDir())
klet.resourceAnalyzer = serverstats.NewResourceAnalyzer(klet, kubeCfg.VolumeStatsAggPeriod.Duration, kubeDeps.Recorder)
@ -1147,6 +1149,9 @@ type Kubelet struct {
// consult the pod worker.
statusManager status.Manager
// allocationManager manages allocated resources for pods.
allocationManager allocation.Manager
// resyncInterval is the interval between periodic full reconciliations of
// pods on this node.
resyncInterval time.Duration
@ -2638,7 +2643,7 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
// To handle kubelet restarts, test pod admissibility using AllocatedResources values
// (for cpu & memory) from checkpoint store. If found, that is the source of truth.
allocatedPod, _ := kl.statusManager.UpdatePodFromAllocation(pod)
allocatedPod, _ := kl.allocationManager.UpdatePodFromAllocation(pod)
// Check if we can admit the pod; if not, reject it.
if ok, reason, message := kl.canAdmitPod(allocatedPods, allocatedPod); !ok {
@ -2651,7 +2656,7 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
continue
}
// For new pod, checkpoint the resource values at which the Pod has been admitted
if err := kl.statusManager.SetPodAllocation(allocatedPod); err != nil {
if err := kl.allocationManager.SetPodAllocation(allocatedPod); err != nil {
// TODO(vinaykul,InPlacePodVerticalScaling): Can we recover from this in some way? Investigate
klog.ErrorS(err, "SetPodAllocation failed", "pod", klog.KObj(pod))
}
@ -2707,6 +2712,7 @@ func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
start := kl.clock.Now()
for _, pod := range pods {
kl.podManager.RemovePod(pod)
kl.allocationManager.DeletePodAllocation(pod.UID)
pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
if wasMirror {
@ -2870,7 +2876,7 @@ func (kl *Kubelet) canResizePod(pod *v1.Pod) (bool, v1.PodResizeStatus, string)
// calculations after this function is called. It also updates the cached ResizeStatus according to
// the allocation decision and pod status.
func (kl *Kubelet) handlePodResourcesResize(pod *v1.Pod, podStatus *kubecontainer.PodStatus) (*v1.Pod, error) {
allocatedPod, updated := kl.statusManager.UpdatePodFromAllocation(pod)
allocatedPod, updated := kl.allocationManager.UpdatePodFromAllocation(pod)
if !updated {
// Desired resources == allocated resources. Check whether a resize is in progress.
resizeInProgress := !allocatedResourcesMatchStatus(allocatedPod, podStatus)
@ -2891,7 +2897,7 @@ func (kl *Kubelet) handlePodResourcesResize(pod *v1.Pod, podStatus *kubecontaine
fit, resizeStatus, resizeMsg := kl.canResizePod(pod)
if fit {
// Update pod resource allocation checkpoint
if err := kl.statusManager.SetPodAllocation(pod); err != nil {
if err := kl.allocationManager.SetPodAllocation(pod); err != nil {
return nil, err
}
for i, container := range pod.Spec.Containers {
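
Taken together, the resize path now reads: UpdatePodFromAllocation reports whether the desired spec differs from the checkpointed allocation; if it does not, the kubelet only checks whether a previously accepted resize is still being actuated, and if it does, the new allocation is checkpointed via SetPodAllocation only once canResizePod accepts the resize.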


@ -218,7 +218,7 @@ func (kl *Kubelet) getAllocatedPods() []*v1.Pod {
allocatedPods := make([]*v1.Pod, len(activePods))
for i, pod := range activePods {
allocatedPods[i], _ = kl.statusManager.UpdatePodFromAllocation(pod)
allocatedPods[i], _ = kl.allocationManager.UpdatePodFromAllocation(pod)
}
return allocatedPods
}
@ -1170,9 +1170,9 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {
// desired pods. Pods that must be restarted due to UID reuse, or leftover
// pods from previous runs, are not known to the pod worker.
allPodsByUID := make(map[types.UID]*v1.Pod)
allPodsByUID := make(sets.Set[types.UID])
for _, pod := range allPods {
allPodsByUID[pod.UID] = pod
allPodsByUID.Insert(pod.UID)
}
// Identify the set of pods that have workers, which should be all pods
@ -1219,6 +1219,7 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {
// Remove orphaned pod statuses not in the total list of known config pods
klog.V(3).InfoS("Clean up orphaned pod statuses")
kl.removeOrphanedPodStatuses(allPods, mirrorPods)
kl.allocationManager.RemoveOrphanedPods(allPodsByUID)
// Remove orphaned pod user namespace allocations (if any).
klog.V(3).InfoS("Clean up orphaned pod user namespace allocations")
@ -2152,7 +2153,7 @@ func (kl *Kubelet) convertToAPIContainerStatuses(pod *v1.Pod, podStatus *kubecon
// Always set the status to the latest allocated resources, even if it differs from the
// allocation used by the current sync loop.
alloc, found := kl.statusManager.GetContainerResourceAllocation(string(pod.UID), cName)
alloc, found := kl.allocationManager.GetContainerResourceAllocation(string(pod.UID), cName)
if !found {
// This case is expected for non-resizable containers (ephemeral & non-restartable init containers).
// Don't set status.Resources in this case.
@ -2372,7 +2373,7 @@ func (kl *Kubelet) convertToAPIContainerStatuses(pod *v1.Pod, podStatus *kubecon
status.Resources = convertContainerStatusResources(cName, status, cStatus, oldStatuses)
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScalingAllocatedStatus) {
if alloc, found := kl.statusManager.GetContainerResourceAllocation(string(pod.UID), cName); found {
if alloc, found := kl.allocationManager.GetContainerResourceAllocation(string(pod.UID), cName); found {
status.AllocatedResources = alloc.Requests
}
}


@ -5093,7 +5093,8 @@ func TestConvertToAPIContainerStatusesForResources(t *testing.T) {
} else {
tPod.Spec.Containers[0].Resources = tc.Resources
}
kubelet.statusManager.SetPodAllocation(tPod)
err := kubelet.allocationManager.SetPodAllocation(tPod)
require.NoError(t, err)
resources := tc.ActualResources
if resources == nil {
resources = &kubecontainer.ContainerResources{


@ -62,6 +62,8 @@ import (
fakeremote "k8s.io/cri-client/pkg/fake"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/allocation"
"k8s.io/kubernetes/pkg/kubelet/allocation/state"
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
cadvisortest "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing"
"k8s.io/kubernetes/pkg/kubelet/clustertrustbundle"
@ -90,7 +92,6 @@ import (
serverstats "k8s.io/kubernetes/pkg/kubelet/server/stats"
"k8s.io/kubernetes/pkg/kubelet/stats"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/kubelet/status/state"
statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
"k8s.io/kubernetes/pkg/kubelet/sysctl"
"k8s.io/kubernetes/pkg/kubelet/token"
@ -272,7 +273,8 @@ func newTestKubeletWithImageList(
kubelet.mirrorPodClient = fakeMirrorClient
kubelet.podManager = kubepod.NewBasicPodManager()
podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker()
kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, kubelet.getRootDir())
kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker)
kubelet.allocationManager = allocation.NewInMemoryManager()
kubelet.nodeStartupLatencyTracker = kubeletutil.NewNodeStartupLatencyTracker()
kubelet.containerRuntime = fakeRuntime
@ -2566,14 +2568,14 @@ func TestPodResourceAllocationReset(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
if tc.existingPodAllocation != nil {
// when the kubelet restarts, AllocatedResources already exists before the pod is added
err := kubelet.statusManager.SetPodAllocation(tc.existingPodAllocation)
err := kubelet.allocationManager.SetPodAllocation(tc.existingPodAllocation)
if err != nil {
t.Fatalf("failed to set pod allocation: %v", err)
}
}
kubelet.HandlePodAdditions([]*v1.Pod{tc.pod})
allocatedResources, found := kubelet.statusManager.GetContainerResourceAllocation(string(tc.pod.UID), tc.pod.Spec.Containers[0].Name)
allocatedResources, found := kubelet.allocationManager.GetContainerResourceAllocation(string(tc.pod.UID), tc.pod.Spec.Containers[0].Name)
if !found {
t.Fatalf("resource allocation should exist: (pod: %#v, container: %s)", tc.pod, tc.pod.Spec.Containers[0].Name)
}
@ -2903,9 +2905,9 @@ func TestHandlePodResourcesResize(t *testing.T) {
}
if !tt.newResourcesAllocated {
require.NoError(t, kubelet.statusManager.SetPodAllocation(originalPod))
require.NoError(t, kubelet.allocationManager.SetPodAllocation(originalPod))
} else {
require.NoError(t, kubelet.statusManager.SetPodAllocation(newPod))
require.NoError(t, kubelet.allocationManager.SetPodAllocation(newPod))
}
podStatus := &kubecontainer.PodStatus{
@ -2951,7 +2953,7 @@ func TestHandlePodResourcesResize(t *testing.T) {
assert.Equal(t, tt.expectedAllocatedReqs, updatedPodCtr.Resources.Requests, "updated pod spec requests")
assert.Equal(t, tt.expectedAllocatedLims, updatedPodCtr.Resources.Limits, "updated pod spec limits")
alloc, found := kubelet.statusManager.GetContainerResourceAllocation(string(newPod.UID), updatedPodCtr.Name)
alloc, found := kubelet.allocationManager.GetContainerResourceAllocation(string(newPod.UID), updatedPodCtr.Name)
require.True(t, found, "container allocation")
assert.Equal(t, tt.expectedAllocatedReqs, alloc.Requests, "stored container request allocation")
assert.Equal(t, tt.expectedAllocatedLims, alloc.Limits, "stored container limit allocation")


@ -17,7 +17,6 @@ limitations under the License.
package prober
import (
"os"
"reflect"
"sync"
@ -114,14 +113,8 @@ func newTestManager() *manager {
podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker()
// Add test pod to pod manager, so that status manager can get the pod from pod manager if needed.
podManager.AddPod(getTestPod())
testRootDir := ""
if tempDir, err := os.MkdirTemp("", "kubelet_test."); err != nil {
return nil
} else {
testRootDir = tempDir
}
m := NewManager(
status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, testRootDir),
status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker),
results.NewManager(),
results.NewManager(),
results.NewManager(),


@ -21,7 +21,6 @@ import (
"fmt"
"net"
"net/http"
"os"
"sync"
"sync/atomic"
"testing"
@ -81,16 +80,10 @@ func TestTCPPortExhaustion(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
testRootDir := ""
if tempDir, err := os.MkdirTemp("", "kubelet_test."); err != nil {
t.Fatalf("can't make a temp rootdir: %v", err)
} else {
testRootDir = tempDir
}
podManager := kubepod.NewBasicPodManager()
podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker()
m := NewManager(
status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, testRootDir),
status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker),
results.NewManager(),
results.NewManager(),
results.NewManager(),


@ -19,7 +19,6 @@ package prober
import (
"context"
"fmt"
"os"
"testing"
"time"
@ -152,14 +151,7 @@ func TestDoProbe(t *testing.T) {
t.Errorf("[%s-%d] Expected result: %v but got %v", probeType, i, test.expectedResult, result)
}
// Clean up.
testRootDir := ""
if tempDir, err := os.MkdirTemp("", "kubelet_test."); err != nil {
t.Fatalf("can't make a temp rootdir: %v", err)
} else {
testRootDir = tempDir
}
m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(), &statustest.FakePodDeletionSafetyProvider{}, kubeletutil.NewPodStartupLatencyTracker(), testRootDir)
m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(), &statustest.FakePodDeletionSafetyProvider{}, kubeletutil.NewPodStartupLatencyTracker())
resultsManager(m, probeType).Remove(testContainerID)
}
}


@ -19,16 +19,11 @@ package status
import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/features"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/status/state"
)
type fakeManager struct {
state state.State
podResizeStatuses map[types.UID]v1.PodResizeStatus
}
@ -67,41 +62,10 @@ func (m *fakeManager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) {
return
}
func (m *fakeManager) GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool) {
klog.InfoS("GetContainerResourceAllocation()")
return m.state.GetContainerResourceAllocation(podUID, containerName)
}
func (m *fakeManager) GetPodResizeStatus(podUID types.UID) v1.PodResizeStatus {
return m.podResizeStatuses[podUID]
}
func (m *fakeManager) UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool) {
allocs := m.state.GetPodResourceAllocation()
return updatePodFromAllocation(pod, allocs)
}
func (m *fakeManager) SetPodAllocation(pod *v1.Pod) error {
klog.InfoS("SetPodAllocation()")
for _, container := range pod.Spec.Containers {
alloc := *container.Resources.DeepCopy()
if err := m.state.SetContainerResourceAllocation(string(pod.UID), container.Name, alloc); err != nil {
return err
}
}
if utilfeature.DefaultFeatureGate.Enabled(features.SidecarContainers) {
for _, container := range pod.Spec.InitContainers {
if podutil.IsRestartableInitContainer(&container) {
alloc := *container.Resources.DeepCopy()
if err := m.state.SetContainerResourceAllocation(string(pod.UID), container.Name, alloc); err != nil {
return err
}
}
}
}
return nil
}
func (m *fakeManager) SetPodResizeStatus(podUID types.UID, resizeStatus v1.PodResizeStatus) {
m.podResizeStatuses[podUID] = resizeStatus
}
@ -109,7 +73,6 @@ func (m *fakeManager) SetPodResizeStatus(podUID types.UID, resizeStatus v1.PodRe
// NewFakeManager creates empty/fake memory manager
func NewFakeManager() Manager {
return &fakeManager{
state: state.NewStateMemory(state.PodResourceAllocation{}),
podResizeStatuses: make(map[types.UID]v1.PodResizeStatus),
}
}
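
The fake status manager likewise sheds its allocation state; tests that need allocation behavior now go through allocation.NewInMemoryManager instead, as the kubelet test changes above show.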


@ -40,15 +40,11 @@ import (
"k8s.io/kubernetes/pkg/features"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/status/state"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
kubeutil "k8s.io/kubernetes/pkg/kubelet/util"
statusutil "k8s.io/kubernetes/pkg/util/pod"
)
// podStatusManagerStateFile is the file name where status manager stores its state
const podStatusManagerStateFile = "pod_status_manager_state"
// A wrapper around v1.PodStatus that includes a version to enforce that stale pod statuses are
// not sent to the API server.
type versionedPodStatus struct {
@ -82,10 +78,6 @@ type manager struct {
podDeletionSafety PodDeletionSafetyProvider
podStartupLatencyHelper PodStartupLatencyStateHelper
// state allows saving/restoring pod resource allocation and tolerating kubelet restarts.
state state.State
// stateFileDirectory holds the directory where the state file for checkpoints is held.
stateFileDirectory string
}
// PodManager is the subset of methods the manager needs to observe the actual state of the kubelet.
@ -149,28 +141,12 @@ type Manager interface {
// SetPodResizeStatus caches the last resizing decision for the pod.
SetPodResizeStatus(podUID types.UID, resize v1.PodResizeStatus)
allocationManager
}
// TODO(tallclair): Refactor allocation state handling out of the status manager.
type allocationManager interface {
// GetContainerResourceAllocation returns the checkpointed AllocatedResources value for the container
GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool)
// UpdatePodFromAllocation overwrites the pod spec with the allocation.
// This function does a deep copy only if updates are needed.
// Returns the updated (or original) pod, and whether there was an allocation stored.
UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool)
// SetPodAllocation checkpoints the resources allocated to a pod's containers.
SetPodAllocation(pod *v1.Pod) error
}
const syncPeriod = 10 * time.Second
// NewManager returns a functional Manager.
func NewManager(kubeClient clientset.Interface, podManager PodManager, podDeletionSafety PodDeletionSafetyProvider, podStartupLatencyHelper PodStartupLatencyStateHelper, stateFileDirectory string) Manager {
func NewManager(kubeClient clientset.Interface, podManager PodManager, podDeletionSafety PodDeletionSafetyProvider, podStartupLatencyHelper PodStartupLatencyStateHelper) Manager {
return &manager{
kubeClient: kubeClient,
podManager: podManager,
@ -180,7 +156,6 @@ func NewManager(kubeClient clientset.Interface, podManager PodManager, podDeleti
apiStatusVersions: make(map[kubetypes.MirrorPodUID]uint64),
podDeletionSafety: podDeletionSafety,
podStartupLatencyHelper: podStartupLatencyHelper,
stateFileDirectory: stateFileDirectory,
}
}
@ -204,20 +179,6 @@ func isPodStatusByKubeletEqual(oldStatus, status *v1.PodStatus) bool {
}
func (m *manager) Start() {
// Initialize m.state to no-op state checkpoint manager
m.state = state.NewNoopStateCheckpoint()
// Create pod allocation checkpoint manager even if client is nil so as to allow local get/set of AllocatedResources & Resize
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
stateImpl, err := state.NewStateCheckpoint(m.stateFileDirectory, podStatusManagerStateFile)
if err != nil {
// This is a critical, non-recoverable failure.
klog.ErrorS(err, "Could not initialize pod allocation checkpoint manager, please drain node and remove policy state file")
panic(err)
}
m.state = stateImpl
}
// Don't start the status manager if we don't have a client. This will happen
// on the master, where the kubelet is responsible for bootstrapping the pods
// of the master components.
@ -246,61 +207,6 @@ func (m *manager) Start() {
}, 0)
}
// GetContainerResourceAllocation returns the last checkpointed AllocatedResources values
// If checkpoint manager has not been initialized, it returns nil, false
func (m *manager) GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool) {
m.podStatusesLock.RLock()
defer m.podStatusesLock.RUnlock()
return m.state.GetContainerResourceAllocation(podUID, containerName)
}
// UpdatePodFromAllocation overwrites the pod spec with the allocation.
// This function does a deep copy only if updates are needed.
func (m *manager) UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool) {
m.podStatusesLock.RLock()
defer m.podStatusesLock.RUnlock()
// TODO(tallclair): This clones the whole cache, but we only need 1 pod.
allocs := m.state.GetPodResourceAllocation()
return updatePodFromAllocation(pod, allocs)
}
func updatePodFromAllocation(pod *v1.Pod, allocs state.PodResourceAllocation) (*v1.Pod, bool) {
allocated, found := allocs[string(pod.UID)]
if !found {
return pod, false
}
updated := false
containerAlloc := func(c v1.Container) (v1.ResourceRequirements, bool) {
if cAlloc, ok := allocated[c.Name]; ok {
if !apiequality.Semantic.DeepEqual(c.Resources, cAlloc) {
// Allocation differs from pod spec, retrieve the allocation
if !updated {
// If this is the first update to be performed, copy the pod
pod = pod.DeepCopy()
updated = true
}
return cAlloc, true
}
}
return v1.ResourceRequirements{}, false
}
for i, c := range pod.Spec.Containers {
if cAlloc, found := containerAlloc(c); found {
// Allocation differs from pod spec, update
pod.Spec.Containers[i].Resources = cAlloc
}
}
for i, c := range pod.Spec.InitContainers {
if cAlloc, found := containerAlloc(c); found {
// Allocation differs from pod spec, update
pod.Spec.InitContainers[i].Resources = cAlloc
}
}
return pod, updated
}
// GetPodResizeStatus returns the last cached ResizeStatus value.
func (m *manager) GetPodResizeStatus(podUID types.UID) v1.PodResizeStatus {
m.podStatusesLock.RLock()
@ -308,29 +214,6 @@ func (m *manager) GetPodResizeStatus(podUID types.UID) v1.PodResizeStatus {
return m.podResizeStatuses[podUID]
}
// SetPodAllocation checkpoints the resources allocated to a pod's containers
func (m *manager) SetPodAllocation(pod *v1.Pod) error {
m.podStatusesLock.RLock()
defer m.podStatusesLock.RUnlock()
podAlloc := make(map[string]v1.ResourceRequirements)
for _, container := range pod.Spec.Containers {
alloc := *container.Resources.DeepCopy()
podAlloc[container.Name] = alloc
}
if utilfeature.DefaultFeatureGate.Enabled(features.SidecarContainers) {
for _, container := range pod.Spec.InitContainers {
if podutil.IsRestartableInitContainer(&container) {
alloc := *container.Resources.DeepCopy()
podAlloc[container.Name] = alloc
}
}
}
return m.state.SetPodResourceAllocation(string(pod.UID), podAlloc)
}
// SetPodResizeStatus checkpoints the last resizing decision for the pod.
func (m *manager) SetPodResizeStatus(podUID types.UID, resizeStatus v1.PodResizeStatus) {
m.podStatusesLock.Lock()
@ -806,7 +689,6 @@ func (m *manager) deletePodStatus(uid types.UID) {
m.podStartupLatencyHelper.DeletePodStartupState(uid)
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
delete(m.podResizeStatuses, uid)
m.state.Delete(string(uid), "")
}
}
@ -820,7 +702,6 @@ func (m *manager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) {
delete(m.podStatuses, key)
if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
delete(m.podResizeStatuses, key)
m.state.Delete(string(key), "")
}
}
}
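
With these removals the status manager keeps only the pod resize status cache; checkpointing, lookup, and cleanup of allocated resources all live in the new pkg/kubelet/allocation package.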


@ -19,7 +19,6 @@ package status
import (
"fmt"
"math/rand"
"os"
"reflect"
"strconv"
"strings"
@ -32,7 +31,6 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
@ -43,7 +41,6 @@ import (
api "k8s.io/kubernetes/pkg/apis/core"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
"k8s.io/kubernetes/pkg/kubelet/status/state"
statustest "k8s.io/kubernetes/pkg/kubelet/status/testing"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util"
@ -92,13 +89,7 @@ func newTestManager(kubeClient clientset.Interface) *manager {
podManager := kubepod.NewBasicPodManager()
podManager.(mutablePodManager).AddPod(getTestPod())
podStartupLatencyTracker := util.NewPodStartupLatencyTracker()
testRootDir := ""
if tempDir, err := os.MkdirTemp("", "kubelet_test."); err != nil {
return nil
} else {
testRootDir = tempDir
}
return NewManager(kubeClient, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, testRootDir).(*manager)
return NewManager(kubeClient, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker).(*manager)
}
func generateRandomMessage() string {
@ -1088,7 +1079,7 @@ func TestTerminatePod_DefaultUnknownStatus(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
podManager := kubepod.NewBasicPodManager()
podStartupLatencyTracker := util.NewPodStartupLatencyTracker()
syncer := NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, "").(*manager)
syncer := NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker).(*manager)
original := tc.pod.DeepCopy()
syncer.SetPodStatus(original, original.Status)
@ -1174,7 +1165,7 @@ func TestTerminatePod_EnsurePodPhaseIsTerminal(t *testing.T) {
t.Run(name, func(t *testing.T) {
podManager := kubepod.NewBasicPodManager()
podStartupLatencyTracker := util.NewPodStartupLatencyTracker()
syncer := NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, "").(*manager)
syncer := NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker).(*manager)
pod := getTestPod()
pod.Status = tc.status
@ -2036,143 +2027,6 @@ func TestMergePodStatus(t *testing.T) {
}
func TestUpdatePodFromAllocation(t *testing.T) {
containerRestartPolicyAlways := v1.ContainerRestartPolicyAlways
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: "12345",
Name: "test",
Namespace: "default",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "c1",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(100, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(200, resource.DecimalSI),
},
Limits: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(300, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(400, resource.DecimalSI),
},
},
},
{
Name: "c2",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(600, resource.DecimalSI),
},
Limits: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(700, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(800, resource.DecimalSI),
},
},
},
},
InitContainers: []v1.Container{
{
Name: "c1-restartable-init",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(300, resource.DecimalSI),
},
Limits: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(400, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(500, resource.DecimalSI),
},
},
RestartPolicy: &containerRestartPolicyAlways,
},
{
Name: "c1-init",
Resources: v1.ResourceRequirements{
Requests: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(500, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(600, resource.DecimalSI),
},
Limits: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(700, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(800, resource.DecimalSI),
},
},
},
},
},
}
resizedPod := pod.DeepCopy()
resizedPod.Spec.Containers[0].Resources.Requests[v1.ResourceCPU] = *resource.NewMilliQuantity(200, resource.DecimalSI)
resizedPod.Spec.InitContainers[0].Resources.Requests[v1.ResourceCPU] = *resource.NewMilliQuantity(300, resource.DecimalSI)
tests := []struct {
name string
pod *v1.Pod
allocs state.PodResourceAllocation
expectPod *v1.Pod
expectUpdate bool
}{{
name: "steady state",
pod: pod,
allocs: state.PodResourceAllocation{
string(pod.UID): map[string]v1.ResourceRequirements{
"c1": *pod.Spec.Containers[0].Resources.DeepCopy(),
"c2": *pod.Spec.Containers[1].Resources.DeepCopy(),
"c1-restartable-init": *pod.Spec.InitContainers[0].Resources.DeepCopy(),
"c1-init": *pod.Spec.InitContainers[1].Resources.DeepCopy(),
},
},
expectUpdate: false,
}, {
name: "no allocations",
pod: pod,
allocs: state.PodResourceAllocation{},
expectUpdate: false,
}, {
name: "missing container allocation",
pod: pod,
allocs: state.PodResourceAllocation{
string(pod.UID): map[string]v1.ResourceRequirements{
"c2": *pod.Spec.Containers[1].Resources.DeepCopy(),
},
},
expectUpdate: false,
}, {
name: "resized container",
pod: pod,
allocs: state.PodResourceAllocation{
string(pod.UID): map[string]v1.ResourceRequirements{
"c1": *resizedPod.Spec.Containers[0].Resources.DeepCopy(),
"c2": *resizedPod.Spec.Containers[1].Resources.DeepCopy(),
"c1-restartable-init": *resizedPod.Spec.InitContainers[0].Resources.DeepCopy(),
"c1-init": *resizedPod.Spec.InitContainers[1].Resources.DeepCopy(),
},
},
expectUpdate: true,
expectPod: resizedPod,
}}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
pod := test.pod.DeepCopy()
allocatedPod, updated := updatePodFromAllocation(pod, test.allocs)
if test.expectUpdate {
assert.True(t, updated, "updated")
assert.Equal(t, test.expectPod, allocatedPod)
assert.NotEqual(t, pod, allocatedPod)
} else {
assert.False(t, updated, "updated")
assert.Same(t, pod, allocatedPod)
}
})
}
}
func statusEqual(left, right v1.PodStatus) bool {
left.Conditions = nil
right.Conditions = nil