Call allocationManager directly

Tim Allclair 2025-02-18 19:34:47 -08:00
parent 84ec78ede7
commit fe4671356c
14 changed files with 52 additions and 91 deletions

View File

@@ -20,6 +20,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	apiequality "k8s.io/apimachinery/pkg/api/equality"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/sets"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	"k8s.io/klog/v2"
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
@@ -44,10 +45,10 @@ type Manager interface {
 	SetPodAllocation(pod *v1.Pod) error
 	// DeletePodAllocation removes any stored state for the given pod UID.
-	DeletePodAllocation(uid types.UID) error
-	// RemoveOrphanedPods removes the stored state for any pods not included in the list of remaining pod UIDs.
-	RemoveOrphanedPods(remainingPods map[types.UID]bool)
+	DeletePodAllocation(uid types.UID)
+	// RemoveOrphanedPods removes the stored state for any pods not included in the set of remaining pods.
+	RemoveOrphanedPods(remainingPods sets.Set[types.UID])
 }
 type manager struct {
@@ -151,10 +152,13 @@ func (m *manager) SetPodAllocation(pod *v1.Pod) error {
 	return m.state.SetPodResourceAllocation(string(pod.UID), podAlloc)
 }
-func (m *manager) DeletePodAllocation(uid types.UID) error {
-	return m.state.Delete(string(uid), "")
+func (m *manager) DeletePodAllocation(uid types.UID) {
+	if err := m.state.Delete(string(uid), ""); err != nil {
+		// If the deletion fails, it will be retried by RemoveOrphanedPods, so we can safely ignore the error.
+		klog.V(3).ErrorS(err, "Failed to delete pod allocation", "podUID", uid)
+	}
 }
-func (m *manager) RemoveOrphanedPods(remainingPods map[types.UID]bool) {
+func (m *manager) RemoveOrphanedPods(remainingPods sets.Set[types.UID]) {
 	m.state.RemoveOrphanedPods(remainingPods)
 }
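For orientation, a minimal caller-side sketch of the revised allocation.Manager contract (the package name, helper function, and its arguments are hypothetical; only the interface methods and the sets usage come from this commit): DeletePodAllocation is now fire-and-forget, and RemoveOrphanedPods takes a sets.Set of the pod UIDs to keep.

// Sketch only; not part of the commit. The helper and its arguments are invented.
package allocationsketch

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/kubelet/allocation"
)

// reconcileAllocations shows the three call patterns touched by this commit.
func reconcileAllocations(am allocation.Manager, admitted *v1.Pod, removedUID types.UID, activePods []*v1.Pod) {
	// Checkpoint the resources the pod was admitted with.
	if err := am.SetPodAllocation(admitted); err != nil {
		klog.ErrorS(err, "SetPodAllocation failed", "pod", klog.KObj(admitted))
	}
	// Deletion no longer returns an error; a failed delete is retried by the
	// next RemoveOrphanedPods pass.
	am.DeletePodAllocation(removedUID)
	// Drop stored state for every pod UID not in the remaining set.
	remaining := sets.New[types.UID]()
	for _, p := range activePods {
		remaining.Insert(p.UID)
	}
	am.RemoveOrphanedPods(remaining)
}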

View File

@@ -19,6 +19,7 @@ package state
 import (
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/sets"
 )
 // PodResourceAllocation type is used in tracking resources allocated to pod's containers
@@ -49,8 +50,8 @@ type writer interface {
 	SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceRequirements) error
 	SetPodResourceAllocation(podUID string, alloc map[string]v1.ResourceRequirements) error
 	Delete(podUID string, containerName string) error
-	// RemoveOrphanedPods removes the stored state for any pods not included in the list of remaining pod UIDs.
-	RemoveOrphanedPods(remainingPods map[types.UID]bool)
+	// RemoveOrphanedPods removes the stored state for any pods not included in the set of remaining pods.
+	RemoveOrphanedPods(remainingPods sets.Set[types.UID])
 }
 // State interface provides methods for tracking and setting pod resource allocation

View File

@@ -23,6 +23,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/klog/v2"
 	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
 	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
@@ -140,7 +141,7 @@ func (sc *stateCheckpoint) Delete(podUID string, containerName string) error {
 	return sc.storeState()
 }
-func (sc *stateCheckpoint) RemoveOrphanedPods(remainingPods map[types.UID]bool) {
+func (sc *stateCheckpoint) RemoveOrphanedPods(remainingPods sets.Set[types.UID]) {
 	sc.cache.RemoveOrphanedPods(remainingPods)
 	// Don't bother updating the stored state. If Kubelet is restarted before the cache is written,
 	// the orphaned pods will be removed the next time this method is called.
@@ -173,4 +174,4 @@ func (sc *noopStateCheckpoint) Delete(_ string, _ string) error {
 	return nil
 }
-func (sc *noopStateCheckpoint) RemoveOrphanedPods(_ map[types.UID]bool) {}
+func (sc *noopStateCheckpoint) RemoveOrphanedPods(_ sets.Set[types.UID]) {}

View File

@@ -21,6 +21,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/klog/v2"
 )
@@ -98,7 +99,7 @@ func (s *stateMemory) Delete(podUID string, containerName string) error {
 	return nil
 }
-func (s *stateMemory) RemoveOrphanedPods(remainingPods map[types.UID]bool) {
+func (s *stateMemory) RemoveOrphanedPods(remainingPods sets.Set[types.UID]) {
 	s.Lock()
 	defer s.Unlock()

View File

@@ -77,6 +77,7 @@ import (
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
 	"k8s.io/kubernetes/pkg/api/v1/resource"
 	"k8s.io/kubernetes/pkg/features"
+	"k8s.io/kubernetes/pkg/kubelet/allocation"
 	kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
 	"k8s.io/kubernetes/pkg/kubelet/apis/config/v1beta1"
 	"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
@@ -662,7 +663,8 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
 	klet.mirrorPodClient = kubepod.NewBasicMirrorClient(klet.kubeClient, string(nodeName), nodeLister)
 	klet.podManager = kubepod.NewBasicPodManager()
-	klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet, kubeDeps.PodStartupLatencyTracker, klet.getRootDir())
+	klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet, kubeDeps.PodStartupLatencyTracker)
+	klet.allocationManager = allocation.NewManager(klet.getRootDir())
 	klet.resourceAnalyzer = serverstats.NewResourceAnalyzer(klet, kubeCfg.VolumeStatsAggPeriod.Duration, kubeDeps.Recorder)
@@ -1147,6 +1149,9 @@ type Kubelet struct {
 	// consult the pod worker.
 	statusManager status.Manager
+	// allocationManager manages allocated resources for pods.
+	allocationManager allocation.Manager
 	// resyncInterval is the interval between periodic full reconciliations of
 	// pods on this node.
 	resyncInterval time.Duration
@@ -2644,7 +2649,7 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
 		if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
 			// To handle kubelet restarts, test pod admissibility using AllocatedResources values
 			// (for cpu & memory) from checkpoint store. If found, that is the source of truth.
-			allocatedPod, _ := kl.statusManager.UpdatePodFromAllocation(pod)
+			allocatedPod, _ := kl.allocationManager.UpdatePodFromAllocation(pod)
 			// Check if we can admit the pod; if not, reject it.
 			if ok, reason, message := kl.canAdmitPod(allocatedPods, allocatedPod); !ok {
@@ -2657,7 +2662,7 @@ func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
 				continue
 			}
 			// For new pod, checkpoint the resource values at which the Pod has been admitted
-			if err := kl.statusManager.SetPodAllocation(allocatedPod); err != nil {
+			if err := kl.allocationManager.SetPodAllocation(allocatedPod); err != nil {
 				//TODO(vinaykul,InPlacePodVerticalScaling): Can we recover from this in some way? Investigate
 				klog.ErrorS(err, "SetPodAllocation failed", "pod", klog.KObj(pod))
 			}
@@ -2713,6 +2718,7 @@ func (kl *Kubelet) HandlePodRemoves(pods []*v1.Pod) {
 	start := kl.clock.Now()
 	for _, pod := range pods {
 		kl.podManager.RemovePod(pod)
+		kl.allocationManager.DeletePodAllocation(pod.UID)
 		pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod)
 		if wasMirror {
@@ -2876,7 +2882,7 @@ func (kl *Kubelet) canResizePod(pod *v1.Pod) (bool, v1.PodResizeStatus, string)
 // calculations after this function is called. It also updates the cached ResizeStatus according to
 // the allocation decision and pod status.
 func (kl *Kubelet) handlePodResourcesResize(pod *v1.Pod, podStatus *kubecontainer.PodStatus) (*v1.Pod, error) {
-	allocatedPod, updated := kl.statusManager.UpdatePodFromAllocation(pod)
+	allocatedPod, updated := kl.allocationManager.UpdatePodFromAllocation(pod)
 	if !updated {
 		// Desired resources == allocated resources. Check whether a resize is in progress.
 		resizeInProgress := !allocatedResourcesMatchStatus(allocatedPod, podStatus)
@@ -2897,7 +2903,7 @@ func (kl *Kubelet) handlePodResourcesResize(pod *v1.Pod, podStatus *kubecontaine
 	fit, resizeStatus, resizeMsg := kl.canResizePod(pod)
 	if fit {
 		// Update pod resource allocation checkpoint
-		if err := kl.statusManager.SetPodAllocation(pod); err != nil {
+		if err := kl.allocationManager.SetPodAllocation(pod); err != nil {
 			return nil, err
 		}
 		for i, container := range pod.Spec.Containers {
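The admission and resize call sites above both lean on UpdatePodFromAllocation, whose second return value reports whether the pod spec was overwritten with a differing stored allocation. A condensed, hypothetical sketch of that decision (imports as in the earlier sketch; canResizePod, status updates, and the rest of handlePodResourcesResize are omitted):

// Sketch only; resizeSketch is not a real kubelet function.
func resizeSketch(am allocation.Manager, pod *v1.Pod) (*v1.Pod, error) {
	// allocatedPod is pod with its spec overwritten by the stored allocation,
	// deep-copied only when the two differ; updated reports whether they did.
	allocatedPod, updated := am.UpdatePodFromAllocation(pod)
	if !updated {
		// Desired resources already match the allocation; nothing new to record.
		return allocatedPod, nil
	}
	// The spec requests different resources: checkpoint the new allocation
	// once the resize has been accepted (admission checks omitted here).
	if err := am.SetPodAllocation(pod); err != nil {
		return nil, err
	}
	return pod, nil
}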

View File

@@ -217,7 +217,7 @@ func (kl *Kubelet) getAllocatedPods() []*v1.Pod {
 	allocatedPods := make([]*v1.Pod, len(activePods))
 	for i, pod := range activePods {
-		allocatedPods[i], _ = kl.statusManager.UpdatePodFromAllocation(pod)
+		allocatedPods[i], _ = kl.allocationManager.UpdatePodFromAllocation(pod)
 	}
 	return allocatedPods
 }
@@ -1169,9 +1169,9 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {
 	// desired pods. Pods that must be restarted due to UID reuse, or leftover
 	// pods from previous runs, are not known to the pod worker.
-	allPodsByUID := make(map[types.UID]*v1.Pod)
+	allPodsByUID := make(sets.Set[types.UID])
 	for _, pod := range allPods {
-		allPodsByUID[pod.UID] = pod
+		allPodsByUID.Insert(pod.UID)
 	}
 	// Identify the set of pods that have workers, which should be all pods
@@ -1218,6 +1218,7 @@ func (kl *Kubelet) HandlePodCleanups(ctx context.Context) error {
 	// Remove orphaned pod statuses not in the total list of known config pods
 	klog.V(3).InfoS("Clean up orphaned pod statuses")
 	kl.removeOrphanedPodStatuses(allPods, mirrorPods)
+	kl.allocationManager.RemoveOrphanedPods(allPodsByUID)
 	// Remove orphaned pod user namespace allocations (if any).
 	klog.V(3).InfoS("Clean up orphaned pod user namespace allocations")
@@ -2147,7 +2148,7 @@ func (kl *Kubelet) convertToAPIContainerStatuses(pod *v1.Pod, podStatus *kubecon
 		// Always set the status to the latest allocated resources, even if it differs from the
 		// allocation used by the current sync loop.
-		alloc, found := kl.statusManager.GetContainerResourceAllocation(string(pod.UID), cName)
+		alloc, found := kl.allocationManager.GetContainerResourceAllocation(string(pod.UID), cName)
 		if !found {
 			// This case is expected for non-resizable containers (ephemeral & non-restartable init containers).
 			// Don't set status.Resources in this case.
@@ -2367,7 +2368,7 @@ func (kl *Kubelet) convertToAPIContainerStatuses(pod *v1.Pod, podStatus *kubecon
 		status.Resources = convertContainerStatusResources(cName, status, cStatus, oldStatuses)
 		if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScalingAllocatedStatus) {
-			if alloc, found := kl.statusManager.GetContainerResourceAllocation(string(pod.UID), cName); found {
+			if alloc, found := kl.allocationManager.GetContainerResourceAllocation(string(pod.UID), cName); found {
 				status.AllocatedResources = alloc.Requests
 			}
 		}

View File

@@ -5087,7 +5087,8 @@ func TestConvertToAPIContainerStatusesForResources(t *testing.T) {
 		} else {
 			tPod.Spec.Containers[0].Resources = tc.Resources
 		}
-		kubelet.statusManager.SetPodAllocation(tPod)
+		err := kubelet.allocationManager.SetPodAllocation(tPod)
+		require.NoError(t, err)
 		resources := tc.ActualResources
 		if resources == nil {
 			resources = &kubecontainer.ContainerResources{

View File

@@ -62,6 +62,7 @@ import (
 	fakeremote "k8s.io/cri-client/pkg/fake"
 	"k8s.io/klog/v2"
 	"k8s.io/kubernetes/pkg/features"
+	"k8s.io/kubernetes/pkg/kubelet/allocation"
 	"k8s.io/kubernetes/pkg/kubelet/allocation/state"
 	kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/config"
 	cadvisortest "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing"
@@ -272,7 +273,8 @@ func newTestKubeletWithImageList(
 	kubelet.mirrorPodClient = fakeMirrorClient
 	kubelet.podManager = kubepod.NewBasicPodManager()
 	podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker()
-	kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, kubelet.getRootDir())
+	kubelet.statusManager = status.NewManager(fakeKubeClient, kubelet.podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker)
+	kubelet.allocationManager = allocation.NewInMemoryManager()
 	kubelet.nodeStartupLatencyTracker = kubeletutil.NewNodeStartupLatencyTracker()
 	kubelet.containerRuntime = fakeRuntime
@@ -2566,14 +2568,14 @@ func TestPodResourceAllocationReset(t *testing.T) {
 		t.Run(tc.name, func(t *testing.T) {
 			if tc.existingPodAllocation != nil {
 				// when kubelet restarts, AllocatedResources has already existed before adding pod
-				err := kubelet.statusManager.SetPodAllocation(tc.existingPodAllocation)
+				err := kubelet.allocationManager.SetPodAllocation(tc.existingPodAllocation)
 				if err != nil {
 					t.Fatalf("failed to set pod allocation: %v", err)
 				}
 			}
 			kubelet.HandlePodAdditions([]*v1.Pod{tc.pod})
-			allocatedResources, found := kubelet.statusManager.GetContainerResourceAllocation(string(tc.pod.UID), tc.pod.Spec.Containers[0].Name)
+			allocatedResources, found := kubelet.allocationManager.GetContainerResourceAllocation(string(tc.pod.UID), tc.pod.Spec.Containers[0].Name)
 			if !found {
 				t.Fatalf("resource allocation should exist: (pod: %#v, container: %s)", tc.pod, tc.pod.Spec.Containers[0].Name)
 			}
@@ -2903,9 +2905,9 @@ func TestHandlePodResourcesResize(t *testing.T) {
 			}
 			if !tt.newResourcesAllocated {
-				require.NoError(t, kubelet.statusManager.SetPodAllocation(originalPod))
+				require.NoError(t, kubelet.allocationManager.SetPodAllocation(originalPod))
 			} else {
-				require.NoError(t, kubelet.statusManager.SetPodAllocation(newPod))
+				require.NoError(t, kubelet.allocationManager.SetPodAllocation(newPod))
 			}
 			podStatus := &kubecontainer.PodStatus{
@@ -2951,7 +2953,7 @@ func TestHandlePodResourcesResize(t *testing.T) {
 			assert.Equal(t, tt.expectedAllocatedReqs, updatedPodCtr.Resources.Requests, "updated pod spec requests")
 			assert.Equal(t, tt.expectedAllocatedLims, updatedPodCtr.Resources.Limits, "updated pod spec limits")
-			alloc, found := kubelet.statusManager.GetContainerResourceAllocation(string(newPod.UID), updatedPodCtr.Name)
+			alloc, found := kubelet.allocationManager.GetContainerResourceAllocation(string(newPod.UID), updatedPodCtr.Name)
 			require.True(t, found, "container allocation")
 			assert.Equal(t, tt.expectedAllocatedReqs, alloc.Requests, "stored container request allocation")
 			assert.Equal(t, tt.expectedAllocatedLims, alloc.Limits, "stored container limit allocation")
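The test wiring above swaps in allocation.NewInMemoryManager() so no checkpoint directory is needed. A self-contained, hypothetical round-trip test in that style (the test name and pod literal are invented; the store-then-lookup behavior is inferred from the call sites in this commit, so treat it as a sketch rather than a guaranteed-passing test):

// Sketch only; not part of the commit.
package allocation_test

import (
	"testing"

	"github.com/stretchr/testify/require"
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/kubernetes/pkg/kubelet/allocation"
)

func TestInMemoryAllocationRoundTrip(t *testing.T) {
	am := allocation.NewInMemoryManager()
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "sketch-pod", UID: types.UID("sketch-uid")},
		Spec: v1.PodSpec{Containers: []v1.Container{{
			Name: "c1",
			Resources: v1.ResourceRequirements{
				Requests: v1.ResourceList{v1.ResourceCPU: resource.MustParse("100m")},
			},
		}}},
	}
	// Store the pod's container resources in memory, then read them back.
	require.NoError(t, am.SetPodAllocation(pod))
	alloc, found := am.GetContainerResourceAllocation(string(pod.UID), "c1")
	require.True(t, found)
	require.Equal(t, "100m", alloc.Requests.Cpu().String())
}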

View File

@@ -17,7 +17,6 @@ limitations under the License.
 package prober
 import (
-	"os"
 	"reflect"
 	"sync"
@@ -114,14 +113,8 @@ func newTestManager() *manager {
 	podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker()
 	// Add test pod to pod manager, so that status manager can get the pod from pod manager if needed.
 	podManager.AddPod(getTestPod())
-	testRootDir := ""
-	if tempDir, err := os.MkdirTemp("", "kubelet_test."); err != nil {
-		return nil
-	} else {
-		testRootDir = tempDir
-	}
 	m := NewManager(
-		status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, testRootDir),
+		status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker),
 		results.NewManager(),
 		results.NewManager(),
 		results.NewManager(),

View File

@@ -21,7 +21,6 @@ import (
 	"fmt"
 	"net"
 	"net/http"
-	"os"
 	"sync"
 	"sync/atomic"
 	"testing"
@@ -81,16 +80,10 @@ func TestTCPPortExhaustion(t *testing.T) {
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
-			testRootDir := ""
-			if tempDir, err := os.MkdirTemp("", "kubelet_test."); err != nil {
-				t.Fatalf("can't make a temp rootdir: %v", err)
-			} else {
-				testRootDir = tempDir
-			}
 			podManager := kubepod.NewBasicPodManager()
 			podStartupLatencyTracker := kubeletutil.NewPodStartupLatencyTracker()
 			m := NewManager(
-				status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, testRootDir),
+				status.NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker),
 				results.NewManager(),
 				results.NewManager(),
 				results.NewManager(),

View File

@@ -19,7 +19,6 @@ package prober
 import (
 	"context"
 	"fmt"
-	"os"
 	"testing"
 	"time"
@@ -152,14 +151,7 @@ func TestDoProbe(t *testing.T) {
 			t.Errorf("[%s-%d] Expected result: %v but got %v", probeType, i, test.expectedResult, result)
 		}
-		// Clean up.
-		testRootDir := ""
-		if tempDir, err := os.MkdirTemp("", "kubelet_test."); err != nil {
-			t.Fatalf("can't make a temp rootdir: %v", err)
-		} else {
-			testRootDir = tempDir
-		}
-		m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(), &statustest.FakePodDeletionSafetyProvider{}, kubeletutil.NewPodStartupLatencyTracker(), testRootDir)
+		m.statusManager = status.NewManager(&fake.Clientset{}, kubepod.NewBasicPodManager(), &statustest.FakePodDeletionSafetyProvider{}, kubeletutil.NewPodStartupLatencyTracker())
 		resultsManager(m, probeType).Remove(testContainerID)
 	}
 }

View File

@@ -20,13 +20,11 @@ import (
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/klog/v2"
-	"k8s.io/kubernetes/pkg/kubelet/allocation"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 )
 type fakeManager struct {
 	podResizeStatuses map[types.UID]v1.PodResizeStatus
-	allocation.Manager
 }
 func (m *fakeManager) Start() {
@@ -75,7 +73,6 @@ func (m *fakeManager) SetPodResizeStatus(podUID types.UID, resizeStatus v1.PodRe
 // NewFakeManager creates empty/fake memory manager
 func NewFakeManager() Manager {
 	return &fakeManager{
-		Manager: allocation.NewInMemoryManager(),
 		podResizeStatuses: make(map[types.UID]v1.PodResizeStatus),
 	}
 }

View File

@@ -38,7 +38,6 @@ import (
 	"k8s.io/klog/v2"
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
 	"k8s.io/kubernetes/pkg/features"
-	"k8s.io/kubernetes/pkg/kubelet/allocation"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/kubelet/metrics"
 	kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
@@ -79,8 +78,6 @@ type manager struct {
 	podDeletionSafety PodDeletionSafetyProvider
 	podStartupLatencyHelper PodStartupLatencyStateHelper
-	allocation.Manager
 }
@@ -144,28 +141,12 @@ type Manager interface {
 	// SetPodResizeStatus caches the last resizing decision for the pod.
 	SetPodResizeStatus(podUID types.UID, resize v1.PodResizeStatus)
-	allocationManager
-}
-// TODO(tallclair): Refactor allocation state handling out of the status manager.
-type allocationManager interface {
-	// GetContainerResourceAllocation returns the checkpointed AllocatedResources value for the container
-	GetContainerResourceAllocation(podUID string, containerName string) (v1.ResourceRequirements, bool)
-	// UpdatePodFromAllocation overwrites the pod spec with the allocation.
-	// This function does a deep copy only if updates are needed.
-	// Returns the updated (or original) pod, and whether there was an allocation stored.
-	UpdatePodFromAllocation(pod *v1.Pod) (*v1.Pod, bool)
-	// SetPodAllocation checkpoints the resources allocated to a pod's containers.
-	SetPodAllocation(pod *v1.Pod) error
 }
 const syncPeriod = 10 * time.Second
 // NewManager returns a functional Manager.
-func NewManager(kubeClient clientset.Interface, podManager PodManager, podDeletionSafety PodDeletionSafetyProvider, podStartupLatencyHelper PodStartupLatencyStateHelper, stateFileDirectory string) Manager {
+func NewManager(kubeClient clientset.Interface, podManager PodManager, podDeletionSafety PodDeletionSafetyProvider, podStartupLatencyHelper PodStartupLatencyStateHelper) Manager {
 	return &manager{
 		kubeClient: kubeClient,
 		podManager: podManager,
@@ -175,7 +156,6 @@ func NewManager(kubeClient clientset.Interface, podManager PodManager, podDeleti
 		apiStatusVersions: make(map[kubetypes.MirrorPodUID]uint64),
 		podDeletionSafety: podDeletionSafety,
 		podStartupLatencyHelper: podStartupLatencyHelper,
-		Manager: allocation.NewManager(stateFileDirectory),
 	}
 }
@@ -709,7 +689,6 @@ func (m *manager) deletePodStatus(uid types.UID) {
 	m.podStartupLatencyHelper.DeletePodStartupState(uid)
 	if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
 		delete(m.podResizeStatuses, uid)
-		m.Manager.DeletePodAllocation(uid)
 	}
 }
@@ -726,9 +705,6 @@ func (m *manager) RemoveOrphanedStatuses(podUIDs map[types.UID]bool) {
 			}
 		}
 	}
-	if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
-		m.Manager.RemoveOrphanedPods(podUIDs)
-	}
 }
 // syncBatch syncs pods statuses with the apiserver. Returns the number of syncs

View File

@@ -19,7 +19,6 @@ package status
 import (
 	"fmt"
 	"math/rand"
-	"os"
 	"reflect"
 	"strconv"
 	"strings"
@@ -90,13 +89,7 @@ func newTestManager(kubeClient clientset.Interface) *manager {
 	podManager := kubepod.NewBasicPodManager()
 	podManager.(mutablePodManager).AddPod(getTestPod())
 	podStartupLatencyTracker := util.NewPodStartupLatencyTracker()
-	testRootDir := ""
-	if tempDir, err := os.MkdirTemp("", "kubelet_test."); err != nil {
-		return nil
-	} else {
-		testRootDir = tempDir
-	}
-	return NewManager(kubeClient, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, testRootDir).(*manager)
+	return NewManager(kubeClient, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker).(*manager)
 }
 func generateRandomMessage() string {
@@ -1086,7 +1079,7 @@ func TestTerminatePod_DefaultUnknownStatus(t *testing.T) {
 		t.Run(tc.name, func(t *testing.T) {
 			podManager := kubepod.NewBasicPodManager()
 			podStartupLatencyTracker := util.NewPodStartupLatencyTracker()
-			syncer := NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, "").(*manager)
+			syncer := NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker).(*manager)
 			original := tc.pod.DeepCopy()
 			syncer.SetPodStatus(original, original.Status)
@@ -1172,7 +1165,7 @@ func TestTerminatePod_EnsurePodPhaseIsTerminal(t *testing.T) {
 		t.Run(name, func(t *testing.T) {
 			podManager := kubepod.NewBasicPodManager()
 			podStartupLatencyTracker := util.NewPodStartupLatencyTracker()
-			syncer := NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker, "").(*manager)
+			syncer := NewManager(&fake.Clientset{}, podManager, &statustest.FakePodDeletionSafetyProvider{}, podStartupLatencyTracker).(*manager)
 			pod := getTestPod()
 			pod.Status = tc.status