Merge pull request #129504 from felipeagger/cleanup/ckpt-pod-alloc-state-mem

Cleanup Kubelet Checkpointing: Refactor state mem
This commit is contained in:
Kubernetes Prow Robot 2025-01-30 13:05:33 -08:00 committed by GitHub
commit 0e9ca10eeb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 43 additions and 58 deletions

View File

@ -93,6 +93,6 @@ func (m *fakeManager) SetPodResizeStatus(podUID types.UID, resizeStatus v1.PodRe
// NewFakeManager creates empty/fake memory manager // NewFakeManager creates empty/fake memory manager
func NewFakeManager() Manager { func NewFakeManager() Manager {
return &fakeManager{ return &fakeManager{
state: state.NewStateMemory(), state: state.NewStateMemory(state.PodResourceAllocation{}),
} }
} }

View File

@ -47,7 +47,6 @@ type Reader interface {
type writer interface { type writer interface {
SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceRequirements) error SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceRequirements) error
SetPodResourceAllocation(PodResourceAllocation) error
SetPodResizeStatus(podUID string, resizeStatus v1.PodResizeStatus) SetPodResizeStatus(podUID string, resizeStatus v1.PodResizeStatus)
Delete(podUID string, containerName string) error Delete(podUID string, containerName string) error
ClearState() error ClearState() error

View File

@ -42,46 +42,42 @@ func NewStateCheckpoint(stateDir, checkpointName string) (State, error) {
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to initialize checkpoint manager for pod allocation tracking: %v", err) return nil, fmt.Errorf("failed to initialize checkpoint manager for pod allocation tracking: %v", err)
} }
stateCheckpoint := &stateCheckpoint{
cache: NewStateMemory(), praInfo, err := restoreState(checkpointManager, checkpointName)
checkpointManager: checkpointManager, if err != nil {
checkpointName: checkpointName, //lint:ignore ST1005 user-facing error message
return nil, fmt.Errorf("could not restore state from checkpoint: %w, please drain this node and delete pod allocation checkpoint file %q before restarting Kubelet",
err, path.Join(stateDir, checkpointName))
} }
if err := stateCheckpoint.restoreState(); err != nil { stateCheckpoint := &stateCheckpoint{
//lint:ignore ST1005 user-facing error message cache: NewStateMemory(praInfo.AllocationEntries),
return nil, fmt.Errorf("could not restore state from checkpoint: %v, please drain this node and delete pod allocation checkpoint file %q before restarting Kubelet", err, path.Join(stateDir, checkpointName)) checkpointManager: checkpointManager,
checkpointName: checkpointName,
} }
return stateCheckpoint, nil return stateCheckpoint, nil
} }
// restores state from a checkpoint and creates it if it doesn't exist // restoreState reads the pod resource allocation info from the checkpoint; if no checkpoint exists it returns empty allocation info without creating one
func (sc *stateCheckpoint) restoreState() error { func restoreState(checkpointManager checkpointmanager.CheckpointManager, checkpointName string) (*PodResourceAllocationInfo, error) {
sc.mux.Lock()
defer sc.mux.Unlock()
var err error var err error
checkpoint := &Checkpoint{}
checkpoint, err := NewCheckpoint(nil) if err = checkpointManager.GetCheckpoint(checkpointName, checkpoint); err != nil {
if err != nil {
return fmt.Errorf("failed to create new checkpoint: %w", err)
}
if err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpoint); err != nil {
if err == errors.ErrCheckpointNotFound { if err == errors.ErrCheckpointNotFound {
return sc.storeState() return &PodResourceAllocationInfo{
AllocationEntries: make(map[string]map[string]v1.ResourceRequirements),
}, nil
} }
return err return nil, err
}
praInfo, err := checkpoint.GetPodResourceAllocationInfo()
if err != nil {
return fmt.Errorf("failed to get pod resource allocation info: %w", err)
}
err = sc.cache.SetPodResourceAllocation(praInfo.AllocationEntries)
if err != nil {
return fmt.Errorf("failed to set pod resource allocation: %w", err)
} }
klog.V(2).InfoS("State checkpoint: restored pod resource allocation state from checkpoint") klog.V(2).InfoS("State checkpoint: restored pod resource allocation state from checkpoint")
return nil praInfo, err := checkpoint.GetPodResourceAllocationInfo()
if err != nil {
return nil, fmt.Errorf("failed to get pod resource allocation info: %w", err)
}
return praInfo, nil
} }
// saves state to a checkpoint, caller is responsible for locking // saves state to a checkpoint, caller is responsible for locking
@ -131,14 +127,6 @@ func (sc *stateCheckpoint) SetContainerResourceAllocation(podUID string, contain
return sc.storeState() return sc.storeState()
} }
// SetPodResourceAllocation sets pod resource allocation
func (sc *stateCheckpoint) SetPodResourceAllocation(a PodResourceAllocation) error {
sc.mux.Lock()
defer sc.mux.Unlock()
sc.cache.SetPodResourceAllocation(a)
return sc.storeState()
}
// SetPodResizeStatus sets the last resize decision for a pod // SetPodResizeStatus sets the last resize decision for a pod
func (sc *stateCheckpoint) SetPodResizeStatus(podUID string, resizeStatus v1.PodResizeStatus) { func (sc *stateCheckpoint) SetPodResizeStatus(podUID string, resizeStatus v1.PodResizeStatus) {
sc.mux.Lock() sc.mux.Lock()
@ -185,10 +173,6 @@ func (sc *noopStateCheckpoint) SetContainerResourceAllocation(_ string, _ string
return nil return nil
} }
func (sc *noopStateCheckpoint) SetPodResourceAllocation(_ PodResourceAllocation) error {
return nil
}
func (sc *noopStateCheckpoint) SetPodResizeStatus(_ string, _ v1.PodResizeStatus) {} func (sc *noopStateCheckpoint) SetPodResizeStatus(_ string, _ v1.PodResizeStatus) {}
func (sc *noopStateCheckpoint) Delete(_ string, _ string) error { func (sc *noopStateCheckpoint) Delete(_ string, _ string) error {

View File

@ -32,7 +32,7 @@ const testCheckpoint = "pod_status_manager_state"
func newTestStateCheckpoint(t *testing.T) *stateCheckpoint { func newTestStateCheckpoint(t *testing.T) *stateCheckpoint {
testingDir := getTestDir(t) testingDir := getTestDir(t)
cache := NewStateMemory() cache := NewStateMemory(PodResourceAllocation{})
checkpointManager, err := checkpointmanager.NewCheckpointManager(testingDir) checkpointManager, err := checkpointmanager.NewCheckpointManager(testingDir)
require.NoError(t, err, "failed to create checkpoint manager") require.NoError(t, err, "failed to create checkpoint manager")
checkpointName := "pod_state_checkpoint" checkpointName := "pod_state_checkpoint"
@ -110,8 +110,12 @@ func Test_stateCheckpoint_storeState(t *testing.T) {
originalSC, err := NewStateCheckpoint(testDir, testCheckpoint) originalSC, err := NewStateCheckpoint(testDir, testCheckpoint)
require.NoError(t, err) require.NoError(t, err)
err = originalSC.SetPodResourceAllocation(tt.args.podResourceAllocation) for podUID, containerAlloc := range tt.args.podResourceAllocation {
for containerName, alloc := range containerAlloc {
err = originalSC.SetContainerResourceAllocation(podUID, containerName, alloc)
require.NoError(t, err) require.NoError(t, err)
}
}
actual := originalSC.GetPodResourceAllocation() actual := originalSC.GetPodResourceAllocation()
verifyPodResourceAllocation(t, &tt.args.podResourceAllocation, &actual, "stored pod resource allocation is not equal to original pod resource allocation") verifyPodResourceAllocation(t, &tt.args.podResourceAllocation, &actual, "stored pod resource allocation is not equal to original pod resource allocation")
@ -154,11 +158,15 @@ func Test_stateCheckpoint_formatUpgraded(t *testing.T) {
err = sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint) err = sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
require.NoError(t, err, "failed to create old checkpoint") require.NoError(t, err, "failed to create old checkpoint")
err = sc.restoreState() actualPodResourceAllocationInfo, err := restoreState(sc.checkpointManager, sc.checkpointName)
require.NoError(t, err, "failed to restore state") require.NoError(t, err, "failed to restore state")
actualPodResourceAllocationInfo := &PodResourceAllocationInfo{} require.Equal(t, expectedPodResourceAllocationInfo, actualPodResourceAllocationInfo, "pod resource allocation info is not equal")
sc.cache = NewStateMemory(actualPodResourceAllocationInfo.AllocationEntries)
actualPodResourceAllocationInfo = &PodResourceAllocationInfo{}
actualPodResourceAllocationInfo.AllocationEntries = sc.cache.GetPodResourceAllocation() actualPodResourceAllocationInfo.AllocationEntries = sc.cache.GetPodResourceAllocation()
require.NoError(t, err, "failed to get pod resource allocation info")
require.Equal(t, expectedPodResourceAllocationInfo, actualPodResourceAllocationInfo, "pod resource allocation info is not equal") require.Equal(t, expectedPodResourceAllocationInfo, actualPodResourceAllocationInfo, "pod resource allocation info is not equal")
} }

View File

@ -32,10 +32,13 @@ type stateMemory struct {
var _ State = &stateMemory{} var _ State = &stateMemory{}
// NewStateMemory creates new State to track resources allocated to pods // NewStateMemory creates new State to track resources allocated to pods
func NewStateMemory() State { func NewStateMemory(alloc PodResourceAllocation) State {
if alloc == nil {
alloc = PodResourceAllocation{}
}
klog.V(2).InfoS("Initialized new in-memory state store for pod resource allocation tracking") klog.V(2).InfoS("Initialized new in-memory state store for pod resource allocation tracking")
return &stateMemory{ return &stateMemory{
podAllocation: PodResourceAllocation{}, podAllocation: alloc,
podResizeStatus: PodResizeStatus{}, podResizeStatus: PodResizeStatus{},
} }
} }
@ -74,15 +77,6 @@ func (s *stateMemory) SetContainerResourceAllocation(podUID string, containerNam
return nil return nil
} }
func (s *stateMemory) SetPodResourceAllocation(a PodResourceAllocation) error {
s.Lock()
defer s.Unlock()
s.podAllocation = a.Clone()
klog.V(3).InfoS("Updated pod resource allocation", "allocation", a)
return nil
}
func (s *stateMemory) SetPodResizeStatus(podUID string, resizeStatus v1.PodResizeStatus) { func (s *stateMemory) SetPodResizeStatus(podUID string, resizeStatus v1.PodResizeStatus) {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()