fix InPlacePodVerticalScaling restore bug: the content written to the pod_status file differs from the content read back from it

This commit is contained in:
yunwang 2024-08-10 18:11:25 +08:00 committed by yunwang0911
parent 1148e5ee5f
commit cf8cdaf5a7
3 changed files with 241 additions and 45 deletions

View File

@ -18,48 +18,62 @@ package state
import ( import (
"encoding/json" "encoding/json"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
) )
var _ checkpointmanager.Checkpoint = &PodResourceAllocationCheckpoint{} var _ checkpointmanager.Checkpoint = &Checkpoint{}
// PodResourceAllocationCheckpoint is used to store resources allocated to a pod in checkpoint type PodResourceAllocationInfo struct {
type PodResourceAllocationCheckpoint struct {
AllocationEntries map[string]map[string]v1.ResourceRequirements `json:"allocationEntries,omitempty"` AllocationEntries map[string]map[string]v1.ResourceRequirements `json:"allocationEntries,omitempty"`
ResizeStatusEntries map[string]v1.PodResizeStatus `json:"resizeStatusEntries,omitempty"` ResizeStatusEntries map[string]v1.PodResizeStatus `json:"resizeStatusEntries,omitempty"`
Checksum checksum.Checksum `json:"checksum"`
} }
// NewPodResourceAllocationCheckpoint returns an instance of Checkpoint // Checkpoint represents a structure to store pod resource allocation checkpoint data
func NewPodResourceAllocationCheckpoint() *PodResourceAllocationCheckpoint { type Checkpoint struct {
//lint:ignore unexported-type-in-api user-facing error message // Data is a JSON serialized checkpoint data
return &PodResourceAllocationCheckpoint{ Data string `json:"data"`
AllocationEntries: make(map[string]map[string]v1.ResourceRequirements), // Checksum is a checksum of Data
ResizeStatusEntries: make(map[string]v1.PodResizeStatus), Checksum checksum.Checksum `json:"checksum"`
}
// NewCheckpoint creates a new checkpoint from a list of claim info states
func NewCheckpoint(data *PodResourceAllocationInfo) (*Checkpoint, error) {
praData, err := json.Marshal(data)
if err != nil {
return nil, err
} }
cp := &Checkpoint{
Data: string(praData),
}
cp.Checksum = checksum.New(cp.Data)
return cp, nil
} }
// MarshalCheckpoint returns marshalled checkpoint func (cp *Checkpoint) MarshalCheckpoint() ([]byte, error) {
func (prc *PodResourceAllocationCheckpoint) MarshalCheckpoint() ([]byte, error) { return json.Marshal(cp)
// make sure checksum wasn't set before so it doesn't affect output checksum
prc.Checksum = 0
prc.Checksum = checksum.New(prc)
return json.Marshal(*prc)
} }
// UnmarshalCheckpoint tries to unmarshal passed bytes to checkpoint // UnmarshalCheckpoint unmarshals checkpoint from JSON
func (prc *PodResourceAllocationCheckpoint) UnmarshalCheckpoint(blob []byte) error { func (cp *Checkpoint) UnmarshalCheckpoint(blob []byte) error {
return json.Unmarshal(blob, prc) return json.Unmarshal(blob, cp)
} }
// VerifyChecksum verifies that current checksum of checkpoint is valid // VerifyChecksum verifies that current checksum
func (prc *PodResourceAllocationCheckpoint) VerifyChecksum() error { // of checkpointed Data is valid
ck := prc.Checksum func (cp *Checkpoint) VerifyChecksum() error {
prc.Checksum = 0 return cp.Checksum.Verify(cp.Data)
err := ck.Verify(prc) }
prc.Checksum = ck
return err // GetPodResourceAllocationInfo returns Pod Resource Allocation info states from checkpoint
func (cp *Checkpoint) GetPodResourceAllocationInfo() (*PodResourceAllocationInfo, error) {
var data PodResourceAllocationInfo
if err := json.Unmarshal([]byte(cp.Data), &data); err != nil {
return nil, err
}
return &data, nil
} }

View File

@ -61,7 +61,10 @@ func (sc *stateCheckpoint) restoreState() error {
defer sc.mux.Unlock() defer sc.mux.Unlock()
var err error var err error
checkpoint := NewPodResourceAllocationCheckpoint() checkpoint, err := NewCheckpoint(nil)
if err != nil {
return fmt.Errorf("failed to create new checkpoint: %w", err)
}
if err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpoint); err != nil { if err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpoint); err != nil {
if err == errors.ErrCheckpointNotFound { if err == errors.ErrCheckpointNotFound {
@ -69,32 +72,36 @@ func (sc *stateCheckpoint) restoreState() error {
} }
return err return err
} }
praInfo, err := checkpoint.GetPodResourceAllocationInfo()
sc.cache.SetPodResourceAllocation(checkpoint.AllocationEntries) if err != nil {
sc.cache.SetResizeStatus(checkpoint.ResizeStatusEntries) return fmt.Errorf("failed to get pod resource allocation info: %w", err)
}
err = sc.cache.SetPodResourceAllocation(praInfo.AllocationEntries)
if err != nil {
return fmt.Errorf("failed to set pod resource allocation: %w", err)
}
err = sc.cache.SetResizeStatus(praInfo.ResizeStatusEntries)
if err != nil {
return fmt.Errorf("failed to set resize status: %w", err)
}
klog.V(2).InfoS("State checkpoint: restored pod resource allocation state from checkpoint") klog.V(2).InfoS("State checkpoint: restored pod resource allocation state from checkpoint")
return nil return nil
} }
// saves state to a checkpoint, caller is responsible for locking // saves state to a checkpoint, caller is responsible for locking
func (sc *stateCheckpoint) storeState() error { func (sc *stateCheckpoint) storeState() error {
checkpoint := NewPodResourceAllocationCheckpoint()
podAllocation := sc.cache.GetPodResourceAllocation() podAllocation := sc.cache.GetPodResourceAllocation()
for pod := range podAllocation {
checkpoint.AllocationEntries[pod] = make(map[string]v1.ResourceRequirements)
for container, alloc := range podAllocation[pod] {
checkpoint.AllocationEntries[pod][container] = alloc
}
}
podResizeStatus := sc.cache.GetResizeStatus() podResizeStatus := sc.cache.GetResizeStatus()
checkpoint.ResizeStatusEntries = make(map[string]v1.PodResizeStatus) checkpoint, err := NewCheckpoint(&PodResourceAllocationInfo{
for pUID, rStatus := range podResizeStatus { AllocationEntries: podAllocation,
checkpoint.ResizeStatusEntries[pUID] = rStatus ResizeStatusEntries: podResizeStatus,
})
if err != nil {
klog.ErrorS(err, "Failed to create checkpoint")
return err
} }
err = sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
err := sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
if err != nil { if err != nil {
klog.ErrorS(err, "Failed to save pod allocation checkpoint") klog.ErrorS(err, "Failed to save pod allocation checkpoint")
return err return err

View File

@ -0,0 +1,175 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package state
import (
"fmt"
"os"
"testing"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
)
// newTestStateCheckpoint builds a stateCheckpoint for tests, wired to a fresh
// in-memory cache and a checkpoint manager rooted in a new temporary directory.
//
// The temporary directory is removed via t.Cleanup, i.e. only after the calling
// test and all of its subtests have finished. Removing it with a defer inside
// this helper would delete the directory before the caller ever used the
// returned stateCheckpoint.
func newTestStateCheckpoint(t *testing.T) *stateCheckpoint {
	t.Helper()

	// create temp dir
	testingDir, err := os.MkdirTemp("", "pod_resource_allocation_state_test")
	if err != nil {
		t.Fatal(err)
	}
	t.Cleanup(func() {
		if err := os.RemoveAll(testingDir); err != nil {
			t.Errorf("failed to remove temp dir %q: %v", testingDir, err)
		}
	})

	checkpointManager, err := checkpointmanager.NewCheckpointManager(testingDir)
	require.NoError(t, err, "failed to create checkpoint manager")

	return &stateCheckpoint{
		cache:             NewStateMemory(),
		checkpointManager: checkpointManager,
		checkpointName:    "pod_state_checkpoint",
	}
}
// Test_stateCheckpoint_storeState stores a pod resource allocation through
// storeState, clears the cache, restores it through restoreState and checks
// that every original request quantity is equal to the restored one. The cases
// are generated from the cross product of a set of numeric factors and
// quantity suffixes.
func Test_stateCheckpoint_storeState(t *testing.T) {
	sc := newTestStateCheckpoint(t)

	type args struct {
		podResourceAllocation PodResourceAllocation
	}
	type testCase struct {
		name string
		args args
	}

	suffixes := []string{"Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "n", "u", "m", "k", "M", "G", "T", "P", "E", ""}
	factors := []string{"1", "0.1", "0.03", "10", "100", "512", "1000", "1024", "700", "10000"}

	var tests []testCase
	for _, fact := range factors {
		for _, suf := range suffixes {
			overflows := (fact == "1000" || fact == "10000") && (suf == "E" || suf == "Ei")
			if overflows {
				// A factor of 1000 or 10000 combined with the "E" or "Ei" suffix
				// overflows the quantity value, see
				// https://github.com/kubernetes/apimachinery/blob/95b78024e3feada7739b40426690b4f287933fd8/pkg/api/resource/quantity.go#L301
				continue
			}

			raw := fmt.Sprintf("%s%s", fact, suf)
			requests := v1.ResourceList{
				v1.ResourceCPU:    resource.MustParse(raw),
				v1.ResourceMemory: resource.MustParse(raw),
			}
			tests = append(tests, testCase{
				name: fmt.Sprintf("resource - %s%s", fact, suf),
				args: args{
					podResourceAllocation: PodResourceAllocation{
						"pod1": {
							"container1": {Requests: requests},
						},
					},
				},
			})
		}
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			require.NoError(t, sc.cache.ClearState(), "failed to clear state")

			// Every case reuses the same checkpoint name, so drop the
			// checkpoint file once the case is done.
			defer func() {
				removeErr := sc.checkpointManager.RemoveCheckpoint(sc.checkpointName)
				require.NoError(t, removeErr, "failed to remove checkpoint")
			}()

			require.NoError(t, sc.cache.SetPodResourceAllocation(tc.args.podResourceAllocation), "failed to set pod resource allocation")
			require.NoError(t, sc.storeState(), "failed to store state")

			// Keep the stored allocation in a separate cache so it can be
			// compared after the checkpointed cache is cleared and restored.
			originCache := NewStateMemory()
			require.NoError(t, originCache.SetPodResourceAllocation(sc.cache.GetPodResourceAllocation()), "failed to set pod resource allocation")

			require.NoError(t, sc.cache.ClearState(), "failed to clear state")
			require.NoError(t, sc.restoreState(), "failed to restore state")

			const mismatch = "restored pod resource allocation is not equal to original pod resource allocation"
			want := originCache.GetPodResourceAllocation()
			got := sc.cache.GetPodResourceAllocation()

			require.Equal(t, len(want), len(got), mismatch)
			for podUID, wantContainers := range want {
				gotContainers := got[podUID]
				require.Equal(t, len(wantContainers), len(gotContainers), mismatch)
				for containerName, wantResources := range wantContainers {
					gotRequests := gotContainers[containerName].Requests
					for name, quantity := range wantResources.Requests {
						if quantity.Equal(gotRequests[name]) {
							continue
						}
						t.Errorf(mismatch)
					}
				}
			}
		})
	}
}
// Test_stateCheckpoint_formatUpgraded emulates loading a checkpoint written by
// an older version whose payload lacks a field the current code knows about.
//
// New fields are most likely to be added to PodResourceAllocationInfo itself
// rather than inside its AllocationEntries values, so the test pretends that
// ResizeStatusEntries is such a new field: the checkpoint content below does
// not contain it, and that must not prevent the checkpoint from being restored.
func Test_stateCheckpoint_formatUpgraded(t *testing.T) {
	sc := newTestStateCheckpoint(t)

	// Old-format checkpoint: the serialized data carries only allocationEntries.
	// NOTE(review): the checksum value is presumably the precomputed checksum of
	// the "data" string; it has to be updated if the payload changes — confirm.
	checkpointContent := `{"data":"{\"allocationEntries\":{\"pod1\":{\"container1\":{\"requests\":{\"cpu\":\"1Ki\",\"memory\":\"1Ki\"}}}}}","checksum":1555601526}`
	checkpoint := &Checkpoint{}
	err := checkpoint.UnmarshalCheckpoint([]byte(checkpointContent))
	require.NoError(t, err, "failed to unmarshal checkpoint")

	err = sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
	require.NoError(t, err, "failed to create old checkpoint")

	err = sc.restoreState()
	require.NoError(t, err, "failed to restore state")

	// The allocation must be restored as written; the resize status is
	// expected to be an empty (non-nil) map because the old checkpoint had none.
	expectedPodResourceAllocationInfo := &PodResourceAllocationInfo{
		AllocationEntries: map[string]map[string]v1.ResourceRequirements{
			"pod1": {
				"container1": {
					Requests: v1.ResourceList{
						v1.ResourceCPU:    resource.MustParse("1Ki"),
						v1.ResourceMemory: resource.MustParse("1Ki"),
					},
				},
			},
		},
		ResizeStatusEntries: map[string]v1.PodResizeStatus{},
	}

	actualPodResourceAllocationInfo := &PodResourceAllocationInfo{
		AllocationEntries:   sc.cache.GetPodResourceAllocation(),
		ResizeStatusEntries: sc.cache.GetResizeStatus(),
	}
	require.Equal(t, expectedPodResourceAllocationInfo, actualPodResourceAllocationInfo, "pod resource allocation info is not equal")
}