mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-05 10:19:50 +00:00
fix state mem constructor and adjust restoreState
This commit is contained in:
parent
efdd6bea2e
commit
fd35f652d4
@ -2922,7 +2922,7 @@ func (kl *Kubelet) handlePodResourcesResize(pod *v1.Pod, podStatus *kubecontaine
|
|||||||
if fit {
|
if fit {
|
||||||
// Update pod resource allocation checkpoint
|
// Update pod resource allocation checkpoint
|
||||||
if err := kl.statusManager.SetPodAllocation(pod); err != nil {
|
if err := kl.statusManager.SetPodAllocation(pod); err != nil {
|
||||||
klog.ErrorS(err, "SetPodAllocation failed", "pod", klog.KObj(pod))
|
return nil, err
|
||||||
}
|
}
|
||||||
for i, container := range pod.Spec.Containers {
|
for i, container := range pod.Spec.Containers {
|
||||||
if !apiequality.Semantic.DeepEqual(container.Resources, allocatedPod.Spec.Containers[i].Resources) {
|
if !apiequality.Semantic.DeepEqual(container.Resources, allocatedPod.Spec.Containers[i].Resources) {
|
||||||
|
@ -93,6 +93,6 @@ func (m *fakeManager) SetPodResizeStatus(podUID types.UID, resizeStatus v1.PodRe
|
|||||||
// NewFakeManager creates empty/fake memory manager
|
// NewFakeManager creates empty/fake memory manager
|
||||||
func NewFakeManager() Manager {
|
func NewFakeManager() Manager {
|
||||||
return &fakeManager{
|
return &fakeManager{
|
||||||
state: state.NewStateMemory(state.PodResourceAllocation{}, state.PodResizeStatus{}),
|
state: state.NewStateMemory(state.PodResourceAllocation{}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -47,7 +47,6 @@ type Reader interface {
|
|||||||
|
|
||||||
type writer interface {
|
type writer interface {
|
||||||
SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceRequirements) error
|
SetContainerResourceAllocation(podUID string, containerName string, alloc v1.ResourceRequirements) error
|
||||||
SetPodResourceAllocation(podUID string, alloc map[string]v1.ResourceRequirements) error
|
|
||||||
SetPodResizeStatus(podUID string, resizeStatus v1.PodResizeStatus)
|
SetPodResizeStatus(podUID string, resizeStatus v1.PodResizeStatus)
|
||||||
Delete(podUID string, containerName string) error
|
Delete(podUID string, containerName string) error
|
||||||
ClearState() error
|
ClearState() error
|
||||||
|
@ -42,50 +42,43 @@ func NewStateCheckpoint(stateDir, checkpointName string) (State, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to initialize checkpoint manager for pod allocation tracking: %v", err)
|
return nil, fmt.Errorf("failed to initialize checkpoint manager for pod allocation tracking: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
praInfo, err := restoreState(checkpointManager, checkpointName)
|
||||||
|
if err != nil {
|
||||||
|
//lint:ignore ST1005 user-facing error message
|
||||||
|
return nil, fmt.Errorf("could not restore state from checkpoint: %w, please drain this node and delete pod allocation checkpoint file %q before restarting Kubelet",
|
||||||
|
err, path.Join(stateDir, checkpointName))
|
||||||
|
}
|
||||||
|
|
||||||
stateCheckpoint := &stateCheckpoint{
|
stateCheckpoint := &stateCheckpoint{
|
||||||
cache: NewStateMemory(PodResourceAllocation{}, PodResizeStatus{}),
|
cache: NewStateMemory(praInfo.AllocationEntries),
|
||||||
checkpointManager: checkpointManager,
|
checkpointManager: checkpointManager,
|
||||||
checkpointName: checkpointName,
|
checkpointName: checkpointName,
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := stateCheckpoint.restoreState(); err != nil {
|
klog.V(2).InfoS("State checkpoint: restored pod resource allocation state from checkpoint")
|
||||||
//lint:ignore ST1005 user-facing error message
|
|
||||||
return nil, fmt.Errorf("could not restore state from checkpoint: %v, please drain this node and delete pod allocation checkpoint file %q before restarting Kubelet", err, path.Join(stateDir, checkpointName))
|
|
||||||
}
|
|
||||||
return stateCheckpoint, nil
|
return stateCheckpoint, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// restores state from a checkpoint and creates it if it doesn't exist
|
// restores state from a checkpoint and creates it if it doesn't exist
|
||||||
func (sc *stateCheckpoint) restoreState() error {
|
func restoreState(checkpointManager checkpointmanager.CheckpointManager, checkpointName string) (*PodResourceAllocationInfo, error) {
|
||||||
sc.mux.Lock()
|
|
||||||
defer sc.mux.Unlock()
|
|
||||||
var err error
|
var err error
|
||||||
|
checkpoint := &Checkpoint{}
|
||||||
|
|
||||||
checkpoint, err := NewCheckpoint(nil)
|
if err = checkpointManager.GetCheckpoint(checkpointName, checkpoint); err != nil {
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to create new checkpoint: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpoint); err != nil {
|
|
||||||
if err == errors.ErrCheckpointNotFound {
|
if err == errors.ErrCheckpointNotFound {
|
||||||
return sc.storeState()
|
return &PodResourceAllocationInfo{
|
||||||
|
AllocationEntries: make(map[string]map[string]v1.ResourceRequirements),
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
praInfo, err := checkpoint.GetPodResourceAllocationInfo()
|
praInfo, err := checkpoint.GetPodResourceAllocationInfo()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to get pod resource allocation info: %w", err)
|
return nil, fmt.Errorf("failed to get pod resource allocation info: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
for podUID, alloc := range praInfo.AllocationEntries {
|
return praInfo, nil
|
||||||
err = sc.cache.SetPodResourceAllocation(podUID, alloc)
|
|
||||||
if err != nil {
|
|
||||||
klog.ErrorS(err, "failed to set pod resource allocation")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
klog.V(2).InfoS("State checkpoint: restored pod resource allocation state from checkpoint")
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// saves state to a checkpoint, caller is responsible for locking
|
// saves state to a checkpoint, caller is responsible for locking
|
||||||
@ -135,19 +128,6 @@ func (sc *stateCheckpoint) SetContainerResourceAllocation(podUID string, contain
|
|||||||
return sc.storeState()
|
return sc.storeState()
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetPodResourceAllocation sets pod resource allocation
|
|
||||||
func (sc *stateCheckpoint) SetPodResourceAllocation(podUID string, alloc map[string]v1.ResourceRequirements) error {
|
|
||||||
sc.mux.Lock()
|
|
||||||
defer sc.mux.Unlock()
|
|
||||||
|
|
||||||
err := sc.cache.SetPodResourceAllocation(podUID, alloc)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return sc.storeState()
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetPodResizeStatus sets the last resize decision for a pod
|
// SetPodResizeStatus sets the last resize decision for a pod
|
||||||
func (sc *stateCheckpoint) SetPodResizeStatus(podUID string, resizeStatus v1.PodResizeStatus) {
|
func (sc *stateCheckpoint) SetPodResizeStatus(podUID string, resizeStatus v1.PodResizeStatus) {
|
||||||
sc.mux.Lock()
|
sc.mux.Lock()
|
||||||
@ -194,10 +174,6 @@ func (sc *noopStateCheckpoint) SetContainerResourceAllocation(_ string, _ string
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (sc *noopStateCheckpoint) SetPodResourceAllocation(_ string, _ map[string]v1.ResourceRequirements) error {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (sc *noopStateCheckpoint) SetPodResizeStatus(_ string, _ v1.PodResizeStatus) {}
|
func (sc *noopStateCheckpoint) SetPodResizeStatus(_ string, _ v1.PodResizeStatus) {}
|
||||||
|
|
||||||
func (sc *noopStateCheckpoint) Delete(_ string, _ string) error {
|
func (sc *noopStateCheckpoint) Delete(_ string, _ string) error {
|
||||||
|
@ -32,7 +32,7 @@ const testCheckpoint = "pod_status_manager_state"
|
|||||||
|
|
||||||
func newTestStateCheckpoint(t *testing.T) *stateCheckpoint {
|
func newTestStateCheckpoint(t *testing.T) *stateCheckpoint {
|
||||||
testingDir := getTestDir(t)
|
testingDir := getTestDir(t)
|
||||||
cache := NewStateMemory(PodResourceAllocation{}, PodResizeStatus{})
|
cache := NewStateMemory(PodResourceAllocation{})
|
||||||
checkpointManager, err := checkpointmanager.NewCheckpointManager(testingDir)
|
checkpointManager, err := checkpointmanager.NewCheckpointManager(testingDir)
|
||||||
require.NoError(t, err, "failed to create checkpoint manager")
|
require.NoError(t, err, "failed to create checkpoint manager")
|
||||||
checkpointName := "pod_state_checkpoint"
|
checkpointName := "pod_state_checkpoint"
|
||||||
@ -110,9 +110,11 @@ func Test_stateCheckpoint_storeState(t *testing.T) {
|
|||||||
originalSC, err := NewStateCheckpoint(testDir, testCheckpoint)
|
originalSC, err := NewStateCheckpoint(testDir, testCheckpoint)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
for podUID, alloc := range tt.args.podResourceAllocation {
|
for podUID, containerAlloc := range tt.args.podResourceAllocation {
|
||||||
err = originalSC.SetPodResourceAllocation(podUID, alloc)
|
for containerName, alloc := range containerAlloc {
|
||||||
require.NoError(t, err)
|
err = originalSC.SetContainerResourceAllocation(podUID, containerName, alloc)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
actual := originalSC.GetPodResourceAllocation()
|
actual := originalSC.GetPodResourceAllocation()
|
||||||
@ -156,11 +158,15 @@ func Test_stateCheckpoint_formatUpgraded(t *testing.T) {
|
|||||||
err = sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
|
err = sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
|
||||||
require.NoError(t, err, "failed to create old checkpoint")
|
require.NoError(t, err, "failed to create old checkpoint")
|
||||||
|
|
||||||
err = sc.restoreState()
|
actualPodResourceAllocationInfo, err := restoreState(sc.checkpointManager, sc.checkpointName)
|
||||||
require.NoError(t, err, "failed to restore state")
|
require.NoError(t, err, "failed to restore state")
|
||||||
|
|
||||||
actualPodResourceAllocationInfo := &PodResourceAllocationInfo{}
|
require.Equal(t, expectedPodResourceAllocationInfo, actualPodResourceAllocationInfo, "pod resource allocation info is not equal")
|
||||||
|
|
||||||
|
sc.cache = NewStateMemory(actualPodResourceAllocationInfo.AllocationEntries)
|
||||||
|
|
||||||
|
actualPodResourceAllocationInfo = &PodResourceAllocationInfo{}
|
||||||
actualPodResourceAllocationInfo.AllocationEntries = sc.cache.GetPodResourceAllocation()
|
actualPodResourceAllocationInfo.AllocationEntries = sc.cache.GetPodResourceAllocation()
|
||||||
require.NoError(t, err, "failed to get pod resource allocation info")
|
|
||||||
require.Equal(t, expectedPodResourceAllocationInfo, actualPodResourceAllocationInfo, "pod resource allocation info is not equal")
|
require.Equal(t, expectedPodResourceAllocationInfo, actualPodResourceAllocationInfo, "pod resource allocation info is not equal")
|
||||||
}
|
}
|
||||||
|
@ -32,11 +32,14 @@ type stateMemory struct {
|
|||||||
var _ State = &stateMemory{}
|
var _ State = &stateMemory{}
|
||||||
|
|
||||||
// NewStateMemory creates new State to track resources allocated to pods
|
// NewStateMemory creates new State to track resources allocated to pods
|
||||||
func NewStateMemory(alloc PodResourceAllocation, stats PodResizeStatus) State {
|
func NewStateMemory(alloc PodResourceAllocation) State {
|
||||||
|
if alloc == nil {
|
||||||
|
alloc = PodResourceAllocation{}
|
||||||
|
}
|
||||||
klog.V(2).InfoS("Initialized new in-memory state store for pod resource allocation tracking")
|
klog.V(2).InfoS("Initialized new in-memory state store for pod resource allocation tracking")
|
||||||
return &stateMemory{
|
return &stateMemory{
|
||||||
podAllocation: alloc,
|
podAllocation: alloc,
|
||||||
podResizeStatus: stats,
|
podResizeStatus: PodResizeStatus{},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -74,18 +77,6 @@ func (s *stateMemory) SetContainerResourceAllocation(podUID string, containerNam
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *stateMemory) SetPodResourceAllocation(podUID string, alloc map[string]v1.ResourceRequirements) error {
|
|
||||||
s.Lock()
|
|
||||||
defer s.Unlock()
|
|
||||||
|
|
||||||
for containerName, containerAlloc := range alloc {
|
|
||||||
s.podAllocation[podUID][containerName] = containerAlloc
|
|
||||||
}
|
|
||||||
|
|
||||||
klog.V(3).InfoS("Updated pod resource allocation", "podUID", podUID, "allocation", alloc)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *stateMemory) SetPodResizeStatus(podUID string, resizeStatus v1.PodResizeStatus) {
|
func (s *stateMemory) SetPodResizeStatus(podUID string, resizeStatus v1.PodResizeStatus) {
|
||||||
s.Lock()
|
s.Lock()
|
||||||
defer s.Unlock()
|
defer s.Unlock()
|
||||||
|
@ -296,15 +296,13 @@ func (m *manager) GetPodResizeStatus(podUID types.UID) v1.PodResizeStatus {
|
|||||||
func (m *manager) SetPodAllocation(pod *v1.Pod) error {
|
func (m *manager) SetPodAllocation(pod *v1.Pod) error {
|
||||||
m.podStatusesLock.RLock()
|
m.podStatusesLock.RLock()
|
||||||
defer m.podStatusesLock.RUnlock()
|
defer m.podStatusesLock.RUnlock()
|
||||||
|
|
||||||
podAlloc := make(map[string]v1.ResourceRequirements)
|
|
||||||
|
|
||||||
for _, container := range pod.Spec.Containers {
|
for _, container := range pod.Spec.Containers {
|
||||||
alloc := *container.Resources.DeepCopy()
|
alloc := *container.Resources.DeepCopy()
|
||||||
podAlloc[container.Name] = alloc
|
if err := m.state.SetContainerResourceAllocation(string(pod.UID), container.Name, alloc); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
return m.state.SetPodResourceAllocation(string(pod.UID), podAlloc)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetPodResizeStatus checkpoints the last resizing decision for the pod.
|
// SetPodResizeStatus checkpoints the last resizing decision for the pod.
|
||||||
|
Loading…
Reference in New Issue
Block a user