mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-05 10:19:50 +00:00
kubelet checkpoint: refactor state memory
refactor state mem constructor to accept the state as parameter and SetPodAllocation to update a single pod.
This commit is contained in:
parent
8e872978e8
commit
efdd6bea2e
@ -93,6 +93,6 @@ func (m *fakeManager) SetPodResizeStatus(podUID types.UID, resizeStatus v1.PodRe
|
||||
// NewFakeManager creates empty/fake memory manager
|
||||
func NewFakeManager() Manager {
|
||||
return &fakeManager{
|
||||
state: state.NewStateMemory(),
|
||||
state: state.NewStateMemory(state.PodResourceAllocation{}, state.PodResizeStatus{}),
|
||||
}
|
||||
}
|
||||
|
@ -47,7 +47,7 @@ type Reader interface {
|
||||
|
||||
type writer interface {
|
||||
SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceRequirements) error
|
||||
SetPodResourceAllocation(PodResourceAllocation) error
|
||||
SetPodResourceAllocation(podUID string, alloc map[string]v1.ResourceRequirements) error
|
||||
SetPodResizeStatus(podUID string, resizeStatus v1.PodResizeStatus)
|
||||
Delete(podUID string, containerName string) error
|
||||
ClearState() error
|
||||
|
@ -43,7 +43,7 @@ func NewStateCheckpoint(stateDir, checkpointName string) (State, error) {
|
||||
return nil, fmt.Errorf("failed to initialize checkpoint manager for pod allocation tracking: %v", err)
|
||||
}
|
||||
stateCheckpoint := &stateCheckpoint{
|
||||
cache: NewStateMemory(),
|
||||
cache: NewStateMemory(PodResourceAllocation{}, PodResizeStatus{}),
|
||||
checkpointManager: checkpointManager,
|
||||
checkpointName: checkpointName,
|
||||
}
|
||||
@ -76,10 +76,14 @@ func (sc *stateCheckpoint) restoreState() error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get pod resource allocation info: %w", err)
|
||||
}
|
||||
err = sc.cache.SetPodResourceAllocation(praInfo.AllocationEntries)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to set pod resource allocation: %w", err)
|
||||
|
||||
for podUID, alloc := range praInfo.AllocationEntries {
|
||||
err = sc.cache.SetPodResourceAllocation(podUID, alloc)
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "failed to set pod resource allocation")
|
||||
}
|
||||
}
|
||||
|
||||
klog.V(2).InfoS("State checkpoint: restored pod resource allocation state from checkpoint")
|
||||
return nil
|
||||
}
|
||||
@ -132,10 +136,15 @@ func (sc *stateCheckpoint) SetContainerResourceAllocation(podUID string, contain
|
||||
}
|
||||
|
||||
// SetPodResourceAllocation sets pod resource allocation
|
||||
func (sc *stateCheckpoint) SetPodResourceAllocation(a PodResourceAllocation) error {
|
||||
func (sc *stateCheckpoint) SetPodResourceAllocation(podUID string, alloc map[string]v1.ResourceRequirements) error {
|
||||
sc.mux.Lock()
|
||||
defer sc.mux.Unlock()
|
||||
sc.cache.SetPodResourceAllocation(a)
|
||||
|
||||
err := sc.cache.SetPodResourceAllocation(podUID, alloc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return sc.storeState()
|
||||
}
|
||||
|
||||
@ -185,7 +194,7 @@ func (sc *noopStateCheckpoint) SetContainerResourceAllocation(_ string, _ string
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sc *noopStateCheckpoint) SetPodResourceAllocation(_ PodResourceAllocation) error {
|
||||
func (sc *noopStateCheckpoint) SetPodResourceAllocation(_ string, _ map[string]v1.ResourceRequirements) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -32,7 +32,7 @@ const testCheckpoint = "pod_status_manager_state"
|
||||
|
||||
func newTestStateCheckpoint(t *testing.T) *stateCheckpoint {
|
||||
testingDir := getTestDir(t)
|
||||
cache := NewStateMemory()
|
||||
cache := NewStateMemory(PodResourceAllocation{}, PodResizeStatus{})
|
||||
checkpointManager, err := checkpointmanager.NewCheckpointManager(testingDir)
|
||||
require.NoError(t, err, "failed to create checkpoint manager")
|
||||
checkpointName := "pod_state_checkpoint"
|
||||
@ -110,8 +110,10 @@ func Test_stateCheckpoint_storeState(t *testing.T) {
|
||||
originalSC, err := NewStateCheckpoint(testDir, testCheckpoint)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = originalSC.SetPodResourceAllocation(tt.args.podResourceAllocation)
|
||||
require.NoError(t, err)
|
||||
for podUID, alloc := range tt.args.podResourceAllocation {
|
||||
err = originalSC.SetPodResourceAllocation(podUID, alloc)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
actual := originalSC.GetPodResourceAllocation()
|
||||
verifyPodResourceAllocation(t, &tt.args.podResourceAllocation, &actual, "stored pod resource allocation is not equal to original pod resource allocation")
|
||||
|
@ -32,11 +32,11 @@ type stateMemory struct {
|
||||
var _ State = &stateMemory{}
|
||||
|
||||
// NewStateMemory creates new State to track resources allocated to pods
|
||||
func NewStateMemory() State {
|
||||
func NewStateMemory(alloc PodResourceAllocation, stats PodResizeStatus) State {
|
||||
klog.V(2).InfoS("Initialized new in-memory state store for pod resource allocation tracking")
|
||||
return &stateMemory{
|
||||
podAllocation: PodResourceAllocation{},
|
||||
podResizeStatus: PodResizeStatus{},
|
||||
podAllocation: alloc,
|
||||
podResizeStatus: stats,
|
||||
}
|
||||
}
|
||||
|
||||
@ -74,12 +74,15 @@ func (s *stateMemory) SetContainerResourceAllocation(podUID string, containerNam
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *stateMemory) SetPodResourceAllocation(a PodResourceAllocation) error {
|
||||
func (s *stateMemory) SetPodResourceAllocation(podUID string, alloc map[string]v1.ResourceRequirements) error {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
|
||||
s.podAllocation = a.Clone()
|
||||
klog.V(3).InfoS("Updated pod resource allocation", "allocation", a)
|
||||
for containerName, containerAlloc := range alloc {
|
||||
s.podAllocation[podUID][containerName] = containerAlloc
|
||||
}
|
||||
|
||||
klog.V(3).InfoS("Updated pod resource allocation", "podUID", podUID, "allocation", alloc)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -297,16 +297,14 @@ func (m *manager) SetPodAllocation(pod *v1.Pod) error {
|
||||
m.podStatusesLock.RLock()
|
||||
defer m.podStatusesLock.RUnlock()
|
||||
|
||||
podUID := string(pod.UID)
|
||||
podAlloc := state.PodResourceAllocation{}
|
||||
podAlloc[podUID] = make(map[string]v1.ResourceRequirements)
|
||||
podAlloc := make(map[string]v1.ResourceRequirements)
|
||||
|
||||
for _, container := range pod.Spec.Containers {
|
||||
alloc := *container.Resources.DeepCopy()
|
||||
podAlloc[podUID][container.Name] = alloc
|
||||
podAlloc[container.Name] = alloc
|
||||
}
|
||||
|
||||
return m.state.SetPodResourceAllocation(podUID, podAlloc)
|
||||
return m.state.SetPodResourceAllocation(string(pod.UID), podAlloc)
|
||||
}
|
||||
|
||||
// SetPodResizeStatus checkpoints the last resizing decision for the pod.
|
||||
|
Loading…
Reference in New Issue
Block a user