Merge pull request #126620 from yunwang0911/master

[InPlacePodVerticalScaling] fix restore checkpoint bug: failed to verify pod status checkpoint checksum because of different behaviors of func Quantity.Marshal and Quantity.Unmarshal
This commit is contained in:
Kubernetes Prow Robot 2024-11-02 00:33:27 +00:00 committed by GitHub
commit 2c4a863bf9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 231 additions and 43 deletions

View File

@ -18,48 +18,64 @@ package state
import (
"encoding/json"
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
)
var _ checkpointmanager.Checkpoint = &PodResourceAllocationCheckpoint{}
var _ checkpointmanager.Checkpoint = &Checkpoint{}
// PodResourceAllocationCheckpoint is used to store resources allocated to a pod in checkpoint
type PodResourceAllocationCheckpoint struct {
type PodResourceAllocationInfo struct {
AllocationEntries map[string]map[string]v1.ResourceRequirements `json:"allocationEntries,omitempty"`
ResizeStatusEntries map[string]v1.PodResizeStatus `json:"resizeStatusEntries,omitempty"`
Checksum checksum.Checksum `json:"checksum"`
}
// NewPodResourceAllocationCheckpoint returns an instance of Checkpoint
func NewPodResourceAllocationCheckpoint() *PodResourceAllocationCheckpoint {
//lint:ignore unexported-type-in-api user-facing error message
return &PodResourceAllocationCheckpoint{
AllocationEntries: make(map[string]map[string]v1.ResourceRequirements),
ResizeStatusEntries: make(map[string]v1.PodResizeStatus),
// Checkpoint represents a structure to store pod resource allocation checkpoint data
type Checkpoint struct {
// Data is a serialized PodResourceAllocationInfo
Data string `json:"data"`
// Checksum is a checksum of Data
Checksum checksum.Checksum `json:"checksum"`
}
// NewCheckpoint creates a new checkpoint from a list of claim info states
func NewCheckpoint(allocations *PodResourceAllocationInfo) (*Checkpoint, error) {
serializedAllocations, err := json.Marshal(allocations)
if err != nil {
return nil, fmt.Errorf("failed to serialize allocations for checkpointing: %w", err)
}
cp := &Checkpoint{
Data: string(serializedAllocations),
}
cp.Checksum = checksum.New(cp.Data)
return cp, nil
}
// MarshalCheckpoint returns marshalled checkpoint
func (prc *PodResourceAllocationCheckpoint) MarshalCheckpoint() ([]byte, error) {
// make sure checksum wasn't set before so it doesn't affect output checksum
prc.Checksum = 0
prc.Checksum = checksum.New(prc)
return json.Marshal(*prc)
func (cp *Checkpoint) MarshalCheckpoint() ([]byte, error) {
return json.Marshal(cp)
}
// UnmarshalCheckpoint tries to unmarshal passed bytes to checkpoint
func (prc *PodResourceAllocationCheckpoint) UnmarshalCheckpoint(blob []byte) error {
return json.Unmarshal(blob, prc)
// UnmarshalCheckpoint unmarshals checkpoint from JSON
func (cp *Checkpoint) UnmarshalCheckpoint(blob []byte) error {
return json.Unmarshal(blob, cp)
}
// VerifyChecksum verifies that current checksum of checkpoint is valid
func (prc *PodResourceAllocationCheckpoint) VerifyChecksum() error {
ck := prc.Checksum
prc.Checksum = 0
err := ck.Verify(prc)
prc.Checksum = ck
return err
// VerifyChecksum verifies that current checksum
// of checkpointed Data is valid
func (cp *Checkpoint) VerifyChecksum() error {
return cp.Checksum.Verify(cp.Data)
}
// GetPodResourceAllocationInfo returns Pod Resource Allocation info states from checkpoint
func (cp *Checkpoint) GetPodResourceAllocationInfo() (*PodResourceAllocationInfo, error) {
var data PodResourceAllocationInfo
if err := json.Unmarshal([]byte(cp.Data), &data); err != nil {
return nil, err
}
return &data, nil
}

View File

@ -61,7 +61,10 @@ func (sc *stateCheckpoint) restoreState() error {
defer sc.mux.Unlock()
var err error
checkpoint := NewPodResourceAllocationCheckpoint()
checkpoint, err := NewCheckpoint(nil)
if err != nil {
return fmt.Errorf("failed to create new checkpoint: %w", err)
}
if err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpoint); err != nil {
if err == errors.ErrCheckpointNotFound {
@ -69,32 +72,35 @@ func (sc *stateCheckpoint) restoreState() error {
}
return err
}
sc.cache.SetPodResourceAllocation(checkpoint.AllocationEntries)
sc.cache.SetResizeStatus(checkpoint.ResizeStatusEntries)
praInfo, err := checkpoint.GetPodResourceAllocationInfo()
if err != nil {
return fmt.Errorf("failed to get pod resource allocation info: %w", err)
}
err = sc.cache.SetPodResourceAllocation(praInfo.AllocationEntries)
if err != nil {
return fmt.Errorf("failed to set pod resource allocation: %w", err)
}
err = sc.cache.SetResizeStatus(praInfo.ResizeStatusEntries)
if err != nil {
return fmt.Errorf("failed to set resize status: %w", err)
}
klog.V(2).InfoS("State checkpoint: restored pod resource allocation state from checkpoint")
return nil
}
// saves state to a checkpoint, caller is responsible for locking
func (sc *stateCheckpoint) storeState() error {
checkpoint := NewPodResourceAllocationCheckpoint()
podAllocation := sc.cache.GetPodResourceAllocation()
for pod := range podAllocation {
checkpoint.AllocationEntries[pod] = make(map[string]v1.ResourceRequirements)
for container, alloc := range podAllocation[pod] {
checkpoint.AllocationEntries[pod][container] = alloc
}
}
podResizeStatus := sc.cache.GetResizeStatus()
checkpoint.ResizeStatusEntries = make(map[string]v1.PodResizeStatus)
for pUID, rStatus := range podResizeStatus {
checkpoint.ResizeStatusEntries[pUID] = rStatus
checkpoint, err := NewCheckpoint(&PodResourceAllocationInfo{
AllocationEntries: podAllocation,
ResizeStatusEntries: podResizeStatus,
})
if err != nil {
return fmt.Errorf("failed to create checkpoint: %w", err)
}
err := sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
err = sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
if err != nil {
klog.ErrorS(err, "Failed to save pod allocation checkpoint")
return err

View File

@ -0,0 +1,166 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package state
import (
"fmt"
"os"
"testing"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
)
const testCheckpoint = "pod_status_manager_state"
func newTestStateCheckpoint(t *testing.T) *stateCheckpoint {
testingDir := getTestDir(t)
cache := NewStateMemory()
checkpointManager, err := checkpointmanager.NewCheckpointManager(testingDir)
require.NoError(t, err, "failed to create checkpoint manager")
checkpointName := "pod_state_checkpoint"
sc := &stateCheckpoint{
cache: cache,
checkpointManager: checkpointManager,
checkpointName: checkpointName,
}
return sc
}
func getTestDir(t *testing.T) string {
testingDir, err := os.MkdirTemp("", "pod_resource_allocation_state_test")
require.NoError(t, err, "failed to create temp dir")
t.Cleanup(func() {
if err := os.RemoveAll(testingDir); err != nil {
t.Fatal(err)
}
})
return testingDir
}
func verifyPodResourceAllocation(t *testing.T, expected, actual *PodResourceAllocation, msgAndArgs string) {
for podUID, containerResourceList := range *expected {
require.Equal(t, len(containerResourceList), len((*actual)[podUID]), msgAndArgs)
for containerName, resourceList := range containerResourceList {
for name, quantity := range resourceList.Requests {
require.True(t, quantity.Equal((*actual)[podUID][containerName].Requests[name]), msgAndArgs)
}
}
}
}
func Test_stateCheckpoint_storeState(t *testing.T) {
type args struct {
podResourceAllocation PodResourceAllocation
}
tests := []struct {
name string
args args
}{}
suffix := []string{"Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "n", "u", "m", "k", "M", "G", "T", "P", "E", ""}
factor := []string{"1", "0.1", "0.03", "10", "100", "512", "1000", "1024", "700", "10000"}
for _, fact := range factor {
for _, suf := range suffix {
if (suf == "E" || suf == "Ei") && (fact == "1000" || fact == "10000") {
// when fact is 1000 or 10000, suffix "E" or "Ei", the quantity value is overflow
// see detail https://github.com/kubernetes/apimachinery/blob/95b78024e3feada7739b40426690b4f287933fd8/pkg/api/resource/quantity.go#L301
continue
}
tests = append(tests, struct {
name string
args args
}{
name: fmt.Sprintf("resource - %s%s", fact, suf),
args: args{
podResourceAllocation: PodResourceAllocation{
"pod1": {
"container1": {
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse(fmt.Sprintf("%s%s", fact, suf)),
v1.ResourceMemory: resource.MustParse(fmt.Sprintf("%s%s", fact, suf)),
},
},
},
},
},
})
}
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
testDir := getTestDir(t)
originalSC, err := NewStateCheckpoint(testDir, testCheckpoint)
require.NoError(t, err)
err = originalSC.SetPodResourceAllocation(tt.args.podResourceAllocation)
require.NoError(t, err)
actual := originalSC.GetPodResourceAllocation()
verifyPodResourceAllocation(t, &tt.args.podResourceAllocation, &actual, "stored pod resource allocation is not equal to original pod resource allocation")
newSC, err := NewStateCheckpoint(testDir, testCheckpoint)
require.NoError(t, err)
actual = newSC.GetPodResourceAllocation()
verifyPodResourceAllocation(t, &tt.args.podResourceAllocation, &actual, "restored pod resource allocation is not equal to original pod resource allocation")
})
}
}
func Test_stateCheckpoint_formatUpgraded(t *testing.T) {
// Based on the PodResourceAllocationInfo struct, it's mostly possible that new field will be added
// in struct PodResourceAllocationInfo, rather than in struct PodResourceAllocationInfo.AllocationEntries.
// Emulate upgrade scenario by pretending that `ResizeStatusEntries` is a new field.
// The checkpoint content doesn't have it and that shouldn't prevent the checkpoint from being loaded.
sc := newTestStateCheckpoint(t)
// prepare old checkpoint, ResizeStatusEntries is unset,
// pretend that the old checkpoint is unaware for the field ResizeStatusEntries
const checkpointContent = `{"data":"{\"allocationEntries\":{\"pod1\":{\"container1\":{\"requests\":{\"cpu\":\"1Ki\",\"memory\":\"1Ki\"}}}}}","checksum":1555601526}`
expectedPodResourceAllocationInfo := &PodResourceAllocationInfo{
AllocationEntries: map[string]map[string]v1.ResourceRequirements{
"pod1": {
"container1": {
Requests: v1.ResourceList{
v1.ResourceCPU: resource.MustParse("1Ki"),
v1.ResourceMemory: resource.MustParse("1Ki"),
},
},
},
},
ResizeStatusEntries: map[string]v1.PodResizeStatus{},
}
checkpoint := &Checkpoint{}
err := checkpoint.UnmarshalCheckpoint([]byte(checkpointContent))
require.NoError(t, err, "failed to unmarshal checkpoint")
err = sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
require.NoError(t, err, "failed to create old checkpoint")
err = sc.restoreState()
require.NoError(t, err, "failed to restore state")
actualPodResourceAllocationInfo := &PodResourceAllocationInfo{}
actualPodResourceAllocationInfo.AllocationEntries = sc.cache.GetPodResourceAllocation()
actualPodResourceAllocationInfo.ResizeStatusEntries = sc.cache.GetResizeStatus()
require.NoError(t, err, "failed to get pod resource allocation info")
require.Equal(t, expectedPodResourceAllocationInfo, actualPodResourceAllocationInfo, "pod resource allocation info is not equal")
}