Merge pull request #126303 from bart0sh/PR150-dra-refactor-checkpoint-upstream

DRA: refactor checkpointing
This commit is contained in:
Kubernetes Prow Robot 2024-07-23 18:01:53 -07:00 committed by GitHub
commit d97cf3a1eb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 590 additions and 465 deletions

View File

@ -40,8 +40,8 @@ type ClaimInfo struct {
// claimInfoCache is a cache of processed resource claims keyed by namespace/claimname.
type claimInfoCache struct {
sync.RWMutex
state state.CheckpointState
claimInfo map[string]*ClaimInfo
checkpointer state.Checkpointer
claimInfo map[string]*ClaimInfo
}
// newClaimInfoFromClaim creates a new claim info from a resource claim.
@ -77,12 +77,12 @@ func newClaimInfoFromState(state *state.ClaimInfoState) *ClaimInfo {
}
// setCDIDevices adds a set of CDI devices to the claim info.
func (info *ClaimInfo) addDevice(driverName string, device state.Device) {
func (info *ClaimInfo) addDevice(driverName string, deviceState state.Device) {
if info.DriverState == nil {
info.DriverState = make(map[string]state.DriverState)
}
driverState := info.DriverState[driverName]
driverState.Devices = append(driverState.Devices, device)
driverState.Devices = append(driverState.Devices, deviceState)
info.DriverState[driverName] = driverState
}
@ -113,22 +113,27 @@ func (info *ClaimInfo) isPrepared() bool {
// newClaimInfoCache creates a new claim info cache object, pre-populated from a checkpoint (if present).
func newClaimInfoCache(stateDir, checkpointName string) (*claimInfoCache, error) {
stateImpl, err := state.NewCheckpointState(stateDir, checkpointName)
checkpointer, err := state.NewCheckpointer(stateDir, checkpointName)
if err != nil {
return nil, fmt.Errorf("could not initialize checkpoint manager, please drain node and remove dra state file, err: %+v", err)
}
curState, err := stateImpl.GetOrCreate()
checkpoint, err := checkpointer.GetOrCreate()
if err != nil {
return nil, fmt.Errorf("error calling GetOrCreate() on checkpoint state: %v", err)
}
cache := &claimInfoCache{
state: stateImpl,
claimInfo: make(map[string]*ClaimInfo),
checkpointer: checkpointer,
claimInfo: make(map[string]*ClaimInfo),
}
for _, entry := range curState {
entries, err := checkpoint.GetClaimInfoStateList()
if err != nil {
return nil, fmt.Errorf("error calling GetEntries() on checkpoint: %w", err)
}
for _, entry := range entries {
info := newClaimInfoFromState(&entry)
cache.claimInfo[info.Namespace+"/"+info.ClaimName] = info
}
@ -192,7 +197,11 @@ func (cache *claimInfoCache) syncToCheckpoint() error {
for _, infoClaim := range cache.claimInfo {
claimInfoStateList = append(claimInfoStateList, infoClaim.ClaimInfoState)
}
return cache.state.Store(claimInfoStateList)
checkpoint, err := state.NewCheckpoint(claimInfoStateList)
if err != nil {
return err
}
return cache.checkpointer.Store(checkpoint)
}
// cdiDevicesAsList returns a list of CDIDevices from the provided claim info.

View File

@ -18,51 +18,90 @@ package state
import (
"encoding/json"
"hash/crc32"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
)
var _ checkpointmanager.Checkpoint = &DRAManagerCheckpoint{}
const (
CheckpointAPIGroup = "checkpoint.dra.kubelet.k8s.io"
CheckpointKind = "DRACheckpoint"
CheckpointAPIVersion = CheckpointAPIGroup + "/v1"
)
const checkpointVersion = "v1"
// DRAManagerCheckpoint struct is used to store pod dynamic resources assignments in a checkpoint
type DRAManagerCheckpoint struct {
Version string `json:"version"`
Entries ClaimInfoStateList `json:"entries,omitempty"`
Checksum checksum.Checksum `json:"checksum"`
// Checkpoint represents a structure to store DRA checkpoint data
type Checkpoint struct {
// Data is a JSON serialized checkpoint data
Data string
// Checksum is a checksum of Data
Checksum uint32
}
// List of claim info to store in checkpoint
type ClaimInfoStateList []ClaimInfoState
type CheckpointData struct {
metav1.TypeMeta
ClaimInfoStateList ClaimInfoStateList
}
// NewDRAManagerCheckpoint returns an instance of Checkpoint
func NewDRAManagerCheckpoint() *DRAManagerCheckpoint {
return &DRAManagerCheckpoint{
Version: checkpointVersion,
Entries: ClaimInfoStateList{},
// NewCheckpoint creates a new checkpoint from a list of claim info states
func NewCheckpoint(data ClaimInfoStateList) (*Checkpoint, error) {
cpData := &CheckpointData{
TypeMeta: metav1.TypeMeta{
Kind: CheckpointKind,
APIVersion: CheckpointAPIVersion,
},
ClaimInfoStateList: data,
}
cpDataBytes, err := json.Marshal(cpData)
if err != nil {
return nil, err
}
cp := &Checkpoint{
Data: string(cpDataBytes),
Checksum: crc32.ChecksumIEEE(cpDataBytes),
}
return cp, nil
}
// MarshalCheckpoint returns marshalled checkpoint
func (dc *DRAManagerCheckpoint) MarshalCheckpoint() ([]byte, error) {
// make sure checksum wasn't set before so it doesn't affect output checksum
dc.Checksum = 0
dc.Checksum = checksum.New(dc)
return json.Marshal(*dc)
// MarshalCheckpoint marshals checkpoint to JSON
func (cp *Checkpoint) MarshalCheckpoint() ([]byte, error) {
return json.Marshal(cp)
}
// UnmarshalCheckpoint tries to unmarshal passed bytes to checkpoint
func (dc *DRAManagerCheckpoint) UnmarshalCheckpoint(blob []byte) error {
return json.Unmarshal(blob, dc)
// UnmarshalCheckpoint unmarshals checkpoint from JSON
// and verifies its data checksum
func (cp *Checkpoint) UnmarshalCheckpoint(blob []byte) error {
if err := json.Unmarshal(blob, cp); err != nil {
return err
}
// verify checksum
if err := cp.VerifyChecksum(); err != nil {
return err
}
return nil
}
// VerifyChecksum verifies that current checksum of checkpoint is valid
func (dc *DRAManagerCheckpoint) VerifyChecksum() error {
ck := dc.Checksum
dc.Checksum = 0
err := ck.Verify(dc)
dc.Checksum = ck
return err
// VerifyChecksum verifies that current checksum
// of checkpointed Data is valid
func (cp *Checkpoint) VerifyChecksum() error {
expectedCS := crc32.ChecksumIEEE([]byte(cp.Data))
if expectedCS != cp.Checksum {
return &errors.CorruptCheckpointError{ActualCS: uint64(cp.Checksum), ExpectedCS: uint64(expectedCS)}
}
return nil
}
// GetClaimInfoStateList returns list of claim info states from checkpoint
func (cp *Checkpoint) GetClaimInfoStateList() (ClaimInfoStateList, error) {
var data CheckpointData
if err := json.Unmarshal([]byte(cp.Data), &data); err != nil {
return nil, err
}
return data.ClaimInfoStateList, nil
}

View File

@ -0,0 +1,98 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package state
import (
"errors"
"fmt"
"sync"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
checkpointerrors "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
)
type Checkpointer interface {
GetOrCreate() (*Checkpoint, error)
Store(*Checkpoint) error
}
type checkpointer struct {
sync.RWMutex
checkpointManager checkpointmanager.CheckpointManager
checkpointName string
}
// NewCheckpointer creates new checkpointer for keeping track of claim info with checkpoint backend
func NewCheckpointer(stateDir, checkpointName string) (Checkpointer, error) {
if len(checkpointName) == 0 {
return nil, fmt.Errorf("received empty string instead of checkpointName")
}
checkpointManager, err := checkpointmanager.NewCheckpointManager(stateDir)
if err != nil {
return nil, fmt.Errorf("failed to initialize checkpoint manager: %w", err)
}
checkpointer := &checkpointer{
checkpointManager: checkpointManager,
checkpointName: checkpointName,
}
return checkpointer, nil
}
// GetOrCreate gets list of claim info states from a checkpoint
// or creates empty list if checkpoint doesn't exist
func (sc *checkpointer) GetOrCreate() (*Checkpoint, error) {
sc.Lock()
defer sc.Unlock()
checkpoint, err := NewCheckpoint(nil)
if err != nil {
return nil, fmt.Errorf("failed to create new checkpoint: %w", err)
}
err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpoint)
if errors.Is(err, checkpointerrors.ErrCheckpointNotFound) {
err = sc.store(checkpoint)
if err != nil {
return nil, fmt.Errorf("failed to store checkpoint %v: %w", sc.checkpointName, err)
}
return checkpoint, nil
}
if err != nil {
return nil, fmt.Errorf("failed to get checkpoint %v: %w", sc.checkpointName, err)
}
return checkpoint, nil
}
// Store stores checkpoint to the file
func (sc *checkpointer) Store(checkpoint *Checkpoint) error {
sc.Lock()
defer sc.Unlock()
return sc.store(checkpoint)
}
// store saves state to a checkpoint, caller is responsible for locking
func (sc *checkpointer) store(checkpoint *Checkpoint) error {
if err := sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint); err != nil {
return fmt.Errorf("could not save checkpoint %s: %w", sc.checkpointName, err)
}
return nil
}

View File

@ -0,0 +1,342 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package state
import (
"os"
"strings"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
testutil "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state/testing"
)
const testingCheckpoint = "dramanager_checkpoint_test"
// TODO (https://github.com/kubernetes/kubernetes/issues/123552): reconsider what data gets stored in checkpoints and whether that is really necessary.
func TestCheckpointGetOrCreate(t *testing.T) {
testCases := []struct {
description string
checkpointContent string
expectedError string
expectedClaimInfoStateList ClaimInfoStateList
}{
{
description: "new-checkpoint",
expectedClaimInfoStateList: nil,
},
{
description: "single-claim-info-state",
checkpointContent: `{"Data":"{\"kind\":\"DRACheckpoint\",\"apiVersion\":\"checkpoint.dra.kubelet.k8s.io/v1\",\"ClaimInfoStateList\":[{\"ClaimUID\":\"067798be-454e-4be4-9047-1aa06aea63f7\",\"ClaimName\":\"example\",\"Namespace\":\"default\",\"PodUIDs\":{\"139cdb46-f989-4f17-9561-ca10cfb509a6\":{}},\"DriverState\":{\"test-driver.cdi.k8s.io\":{\"Devices\":[{\"PoolName\":\"worker-1\",\"DeviceName\":\"dev-1\",\"RequestNames\":[\"test request\"],\"CDIDeviceIDs\":[\"example.com/example=cdi-example\"]}]}}}]}","Checksum":1656016162}`,
expectedClaimInfoStateList: ClaimInfoStateList{
{
DriverState: map[string]DriverState{
"test-driver.cdi.k8s.io": {
Devices: []Device{
{
PoolName: "worker-1",
DeviceName: "dev-1",
RequestNames: []string{"test request"},
CDIDeviceIDs: []string{"example.com/example=cdi-example"},
},
},
},
},
ClaimUID: "067798be-454e-4be4-9047-1aa06aea63f7",
ClaimName: "example",
Namespace: "default",
PodUIDs: sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"),
},
},
},
{
description: "claim-info-state-with-multiple-devices",
checkpointContent: `{"Data":"{\"kind\":\"DRACheckpoint\",\"apiVersion\":\"checkpoint.dra.kubelet.k8s.io/v1\",\"ClaimInfoStateList\":[{\"ClaimUID\":\"067798be-454e-4be4-9047-1aa06aea63f7\",\"ClaimName\":\"example\",\"Namespace\":\"default\",\"PodUIDs\":{\"139cdb46-f989-4f17-9561-ca10cfb509a6\":{}},\"DriverState\":{\"test-driver.cdi.k8s.io\":{\"Devices\":[{\"PoolName\":\"worker-1\",\"DeviceName\":\"dev-1\",\"RequestNames\":[\"test request\"],\"CDIDeviceIDs\":[\"example.com/example=cdi-example\"]},{\"PoolName\":\"worker-1\",\"DeviceName\":\"dev-2\",\"RequestNames\":[\"test request\"],\"CDIDeviceIDs\":[\"example.com/example=cdi-example\"]}]}}}]}","Checksum":3369508096}`,
expectedClaimInfoStateList: ClaimInfoStateList{
{
DriverState: map[string]DriverState{
"test-driver.cdi.k8s.io": {
Devices: []Device{
{
PoolName: "worker-1",
DeviceName: "dev-1",
RequestNames: []string{"test request"},
CDIDeviceIDs: []string{"example.com/example=cdi-example"},
},
{
PoolName: "worker-1",
DeviceName: "dev-2",
RequestNames: []string{"test request"},
CDIDeviceIDs: []string{"example.com/example=cdi-example"},
},
},
},
},
ClaimUID: "067798be-454e-4be4-9047-1aa06aea63f7",
ClaimName: "example",
Namespace: "default",
PodUIDs: sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"),
},
},
},
{
description: "two-claim-info-states",
checkpointContent: `{"Data":"{\"kind\":\"DRACheckpoint\",\"apiVersion\":\"checkpoint.dra.kubelet.k8s.io/v1\",\"ClaimInfoStateList\":[{\"ClaimUID\":\"067798be-454e-4be4-9047-1aa06aea63f7\",\"ClaimName\":\"example-1\",\"Namespace\":\"default\",\"PodUIDs\":{\"139cdb46-f989-4f17-9561-ca10cfb509a6\":{}},\"DriverState\":{\"test-driver.cdi.k8s.io\":{\"Devices\":[{\"PoolName\":\"worker-1\",\"DeviceName\":\"dev-1\",\"RequestNames\":[\"test request\"],\"CDIDeviceIDs\":[\"example.com/example=cdi-example\"]}]}}},{\"ClaimUID\":\"4cf8db2d-06c0-7d70-1a51-e59b25b2c16c\",\"ClaimName\":\"example-2\",\"Namespace\":\"default\",\"PodUIDs\":{\"139cdb46-f989-4f17-9561-ca10cfb509a6\":{}},\"DriverState\":{\"test-driver.cdi.k8s.io\":{\"Devices\":[{\"PoolName\":\"worker-1\",\"DeviceName\":\"dev-2\",\"RequestNames\":null,\"CDIDeviceIDs\":null}]}}}]}","Checksum":1582256999}`,
expectedClaimInfoStateList: ClaimInfoStateList{
{
DriverState: map[string]DriverState{
"test-driver.cdi.k8s.io": {
Devices: []Device{
{
PoolName: "worker-1",
DeviceName: "dev-1",
RequestNames: []string{"test request"},
CDIDeviceIDs: []string{"example.com/example=cdi-example"},
},
},
},
},
ClaimUID: "067798be-454e-4be4-9047-1aa06aea63f7",
ClaimName: "example-1",
Namespace: "default",
PodUIDs: sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"),
},
{
DriverState: map[string]DriverState{
"test-driver.cdi.k8s.io": {
Devices: []Device{
{
PoolName: "worker-1",
DeviceName: "dev-2",
},
},
},
},
ClaimUID: "4cf8db2d-06c0-7d70-1a51-e59b25b2c16c",
ClaimName: "example-2",
Namespace: "default",
PodUIDs: sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"),
},
},
},
{
description: "incorrect-checksum",
checkpointContent: `{"Data":"{\"kind\":\"DRACheckpoint\",\"apiVersion\":\"checkpoint.dra.kubelet.k8s.io/v1\",\"Entries\":[{\"ClaimUID\":\"067798be-454e-4be4-9047-1aa06aea63f7\",\"ClaimName\":\"example-1\",\"Namespace\":\"default\",\"PodUIDs\":{\"139cdb46-f989-4f17-9561-ca10cfb509a6\":{}},\"DriverState\":{\"test-driver.cdi.k8s.io\":{\"Devices\":[{\"PoolName\":\"worker-1\",\"DeviceName\":\"dev-1\",\"RequestNames\":[\"test request\"],\"CDIDeviceIDs\":[\"example.com/example=cdi-example\"]}]}}},{\"ClaimUID\":\"4cf8db2d-06c0-7d70-1a51-e59b25b2c16c\",\"ClaimName\":\"example-2\",\"Namespace\":\"default\",\"PodUIDs\":{\"139cdb46-f989-4f17-9561-ca10cfb509a6\":{}},\"DriverState\":{\"test-driver.cdi.k8s.io\":{\"Devices\":[{\"PoolName\":\"worker-1\",\"DeviceName\":\"dev-2\",\"RequestNames\":null,\"CDIDeviceIDs\":null}]}}}]}","Checksum":2930258365}`,
expectedError: "checkpoint is corrupted",
},
{
description: "invalid-JSON",
checkpointContent: `{`,
expectedError: "unexpected end of JSON input",
},
}
// create temp dir
testingDir, err := os.MkdirTemp("", "dramanager_state_test")
if err != nil {
t.Fatal(err)
}
defer func() {
if err := os.RemoveAll(testingDir); err != nil {
t.Fatal(err)
}
}()
// create checkpoint manager for testing
cpm, err := checkpointmanager.NewCheckpointManager(testingDir)
require.NoError(t, err, "could not create testing checkpoint manager")
for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
// ensure there is no previous checkpoint
require.NoError(t, cpm.RemoveCheckpoint(testingCheckpoint), "could not remove testing checkpoint")
// prepare checkpoint for testing
if strings.TrimSpace(tc.checkpointContent) != "" {
mock := &testutil.MockCheckpoint{Content: tc.checkpointContent}
require.NoError(t, cpm.CreateCheckpoint(testingCheckpoint, mock), "could not create testing checkpoint")
}
checkpointer, err := NewCheckpointer(testingDir, testingCheckpoint)
require.NoError(t, err, "could not create testing checkpointer")
checkpoint, err := checkpointer.GetOrCreate()
if strings.TrimSpace(tc.expectedError) != "" {
assert.ErrorContains(t, err, tc.expectedError)
} else {
require.NoError(t, err, "unexpected error")
stateList, err := checkpoint.GetClaimInfoStateList()
require.NoError(t, err, "could not get data entries from checkpoint")
require.NoError(t, err)
assert.Equal(t, tc.expectedClaimInfoStateList, stateList)
}
})
}
}
func TestCheckpointStateStore(t *testing.T) {
testCases := []struct {
description string
claimInfoStateList ClaimInfoStateList
expectedCheckpointContent string
}{
{
description: "single-claim-info-state",
claimInfoStateList: ClaimInfoStateList{
{
DriverState: map[string]DriverState{
"test-driver.cdi.k8s.io": {
Devices: []Device{
{
PoolName: "worker-1",
DeviceName: "dev-1",
RequestNames: []string{"test request"},
CDIDeviceIDs: []string{"example.com/example=cdi-example"},
},
},
},
},
ClaimUID: "067798be-454e-4be4-9047-1aa06aea63f7",
ClaimName: "example",
Namespace: "default",
PodUIDs: sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"),
},
},
expectedCheckpointContent: `{"Data":"{\"kind\":\"DRACheckpoint\",\"apiVersion\":\"checkpoint.dra.kubelet.k8s.io/v1\",\"ClaimInfoStateList\":[{\"ClaimUID\":\"067798be-454e-4be4-9047-1aa06aea63f7\",\"ClaimName\":\"example\",\"Namespace\":\"default\",\"PodUIDs\":{\"139cdb46-f989-4f17-9561-ca10cfb509a6\":{}},\"DriverState\":{\"test-driver.cdi.k8s.io\":{\"Devices\":[{\"PoolName\":\"worker-1\",\"DeviceName\":\"dev-1\",\"RequestNames\":[\"test request\"],\"CDIDeviceIDs\":[\"example.com/example=cdi-example\"]}]}}}]}","Checksum":1656016162}`,
},
{
description: "claim-info-state-with-multiple-devices",
claimInfoStateList: ClaimInfoStateList{
{
DriverState: map[string]DriverState{
"test-driver.cdi.k8s.io": {
Devices: []Device{
{
PoolName: "worker-1",
DeviceName: "dev-1",
RequestNames: []string{"test request"},
CDIDeviceIDs: []string{"example.com/example=cdi-example"},
},
{
PoolName: "worker-1",
DeviceName: "dev-2",
RequestNames: []string{"test request"},
CDIDeviceIDs: []string{"example.com/example=cdi-example"},
},
},
},
},
ClaimUID: "067798be-454e-4be4-9047-1aa06aea63f7",
ClaimName: "example",
Namespace: "default",
PodUIDs: sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"),
},
},
expectedCheckpointContent: `{"Data":"{\"kind\":\"DRACheckpoint\",\"apiVersion\":\"checkpoint.dra.kubelet.k8s.io/v1\",\"ClaimInfoStateList\":[{\"ClaimUID\":\"067798be-454e-4be4-9047-1aa06aea63f7\",\"ClaimName\":\"example\",\"Namespace\":\"default\",\"PodUIDs\":{\"139cdb46-f989-4f17-9561-ca10cfb509a6\":{}},\"DriverState\":{\"test-driver.cdi.k8s.io\":{\"Devices\":[{\"PoolName\":\"worker-1\",\"DeviceName\":\"dev-1\",\"RequestNames\":[\"test request\"],\"CDIDeviceIDs\":[\"example.com/example=cdi-example\"]},{\"PoolName\":\"worker-1\",\"DeviceName\":\"dev-2\",\"RequestNames\":[\"test request\"],\"CDIDeviceIDs\":[\"example.com/example=cdi-example\"]}]}}}]}","Checksum":3369508096}`,
},
{
description: "two-claim-info-states",
claimInfoStateList: ClaimInfoStateList{
{
DriverState: map[string]DriverState{
"test-driver.cdi.k8s.io": {
Devices: []Device{
{
PoolName: "worker-1",
DeviceName: "dev-1",
RequestNames: []string{"test request"},
CDIDeviceIDs: []string{"example.com/example=cdi-example"},
},
},
},
},
ClaimUID: "067798be-454e-4be4-9047-1aa06aea63f7",
ClaimName: "example-1",
Namespace: "default",
PodUIDs: sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"),
},
{
DriverState: map[string]DriverState{
"test-driver.cdi.k8s.io": {
Devices: []Device{
{
PoolName: "worker-1",
DeviceName: "dev-2",
},
},
},
},
ClaimUID: "4cf8db2d-06c0-7d70-1a51-e59b25b2c16c",
ClaimName: "example-2",
Namespace: "default",
PodUIDs: sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"),
},
},
expectedCheckpointContent: `{"Data":"{\"kind\":\"DRACheckpoint\",\"apiVersion\":\"checkpoint.dra.kubelet.k8s.io/v1\",\"ClaimInfoStateList\":[{\"ClaimUID\":\"067798be-454e-4be4-9047-1aa06aea63f7\",\"ClaimName\":\"example-1\",\"Namespace\":\"default\",\"PodUIDs\":{\"139cdb46-f989-4f17-9561-ca10cfb509a6\":{}},\"DriverState\":{\"test-driver.cdi.k8s.io\":{\"Devices\":[{\"PoolName\":\"worker-1\",\"DeviceName\":\"dev-1\",\"RequestNames\":[\"test request\"],\"CDIDeviceIDs\":[\"example.com/example=cdi-example\"]}]}}},{\"ClaimUID\":\"4cf8db2d-06c0-7d70-1a51-e59b25b2c16c\",\"ClaimName\":\"example-2\",\"Namespace\":\"default\",\"PodUIDs\":{\"139cdb46-f989-4f17-9561-ca10cfb509a6\":{}},\"DriverState\":{\"test-driver.cdi.k8s.io\":{\"Devices\":[{\"PoolName\":\"worker-1\",\"DeviceName\":\"dev-2\",\"RequestNames\":null,\"CDIDeviceIDs\":null}]}}}]}","Checksum":1582256999}`,
},
}
// Should return an error, stateDir cannot be an empty string
if _, err := NewCheckpointer("", testingCheckpoint); err == nil {
t.Fatal("expected error but got nil")
}
// create temp dir
testingDir, err := os.MkdirTemp("", "dramanager_state_test")
if err != nil {
t.Fatal(err)
}
defer func() {
if err := os.RemoveAll(testingDir); err != nil {
t.Fatal(err)
}
}()
// NewCheckpointState with an empty checkpointName should return an error
if _, err = NewCheckpointer(testingDir, ""); err == nil {
t.Fatal("expected error but got nil")
}
cpm, err := checkpointmanager.NewCheckpointManager(testingDir)
require.NoError(t, err, "could not create testing checkpoint manager")
require.NoError(t, cpm.RemoveCheckpoint(testingCheckpoint), "could not remove testing checkpoint")
cs, err := NewCheckpointer(testingDir, testingCheckpoint)
require.NoError(t, err, "could not create testing checkpointState instance")
for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
checkpoint, err := NewCheckpoint(tc.claimInfoStateList)
require.NoError(t, err, "could not create Checkpoint")
err = cs.Store(checkpoint)
require.NoError(t, err, "could not store checkpoint")
err = cpm.GetCheckpoint(testingCheckpoint, checkpoint)
require.NoError(t, err, "could not get checkpoint")
checkpointContent, err := checkpoint.MarshalCheckpoint()
require.NoError(t, err, "could not Marshal Checkpoint")
assert.Equal(t, tc.expectedCheckpointContent, string(checkpointContent))
})
}
}

View File

@ -0,0 +1,59 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package state
import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
)
type ClaimInfoStateList []ClaimInfoState
// +k8s:deepcopy-gen=true
type ClaimInfoState struct {
// ClaimUID is the UID of a resource claim
ClaimUID types.UID
// ClaimName is the name of a resource claim
ClaimName string
// Namespace is a claim namespace
Namespace string
// PodUIDs is a set of pod UIDs that reference a resource
PodUIDs sets.Set[string]
// DriverState contains information about all drivers which have allocation
// results in the claim, even if they don't provide devices for their results.
DriverState map[string]DriverState
}
// DriverState is used to store per-device claim info state in a checkpoint
// +k8s:deepcopy-gen=true
type DriverState struct {
Devices []Device
}
// Device is how a DRA driver described an allocated device in a claim
// to kubelet. RequestName and CDI device IDs are optional.
// +k8s:deepcopy-gen=true
type Device struct {
PoolName string
DeviceName string
RequestNames []string
CDIDeviceIDs []string
}

View File

@ -1,133 +0,0 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package state
import (
"fmt"
"sync"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
)
var _ CheckpointState = &stateCheckpoint{}
// CheckpointState interface provides to get and store state
type CheckpointState interface {
GetOrCreate() (ClaimInfoStateList, error)
Store(ClaimInfoStateList) error
}
// ClaimInfoState is used to store claim info state in a checkpoint
// +k8s:deepcopy-gen=true
type ClaimInfoState struct {
// ClaimUID is an UID of the resource claim
ClaimUID types.UID
// ClaimName is a name of the resource claim
ClaimName string
// Namespace is a claim namespace
Namespace string
// PodUIDs is a set of pod UIDs that reference a resource
PodUIDs sets.Set[string]
// DriverState contains information about all drivers which have allocation
// results in the claim, even if they don't provide devices for their results.
DriverState map[string]DriverState
}
// DriverState is used to store per-device claim info state in a checkpoint
// +k8s:deepcopy-gen=true
type DriverState struct {
Devices []Device
}
// Device is how a DRA driver described an allocated device in a claim
// to kubelet. RequestName and CDI device IDs are optional.
// +k8s:deepcopy-gen=true
type Device struct {
PoolName string
DeviceName string
RequestNames []string
CDIDeviceIDs []string
}
type stateCheckpoint struct {
sync.RWMutex
checkpointManager checkpointmanager.CheckpointManager
checkpointName string
}
// NewCheckpointState creates new State for keeping track of claim info with checkpoint backend
func NewCheckpointState(stateDir, checkpointName string) (*stateCheckpoint, error) {
if len(checkpointName) == 0 {
return nil, fmt.Errorf("received empty string instead of checkpointName")
}
checkpointManager, err := checkpointmanager.NewCheckpointManager(stateDir)
if err != nil {
return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
}
stateCheckpoint := &stateCheckpoint{
checkpointManager: checkpointManager,
checkpointName: checkpointName,
}
return stateCheckpoint, nil
}
// get state from a checkpoint and creates it if it doesn't exist
func (sc *stateCheckpoint) GetOrCreate() (ClaimInfoStateList, error) {
sc.Lock()
defer sc.Unlock()
checkpoint := NewDRAManagerCheckpoint()
err := sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpoint)
if err == errors.ErrCheckpointNotFound {
sc.store(ClaimInfoStateList{})
return ClaimInfoStateList{}, nil
}
if err != nil {
return nil, fmt.Errorf("failed to get checkpoint %v: %w", sc.checkpointName, err)
}
return checkpoint.Entries, nil
}
// saves state to a checkpoint
func (sc *stateCheckpoint) Store(claimInfoStateList ClaimInfoStateList) error {
sc.Lock()
defer sc.Unlock()
return sc.store(claimInfoStateList)
}
// saves state to a checkpoint, caller is responsible for locking
func (sc *stateCheckpoint) store(claimInfoStateList ClaimInfoStateList) error {
checkpoint := NewDRAManagerCheckpoint()
checkpoint.Entries = claimInfoStateList
err := sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
if err != nil {
return fmt.Errorf("could not save checkpoint %s: %v", sc.checkpointName, err)
}
return nil
}

View File

@ -1,289 +0,0 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package state
import (
"errors"
"os"
"strings"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
cmerrors "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
testutil "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state/testing"
)
const testingCheckpoint = "dramanager_checkpoint_test"
// assertStateEqual marks provided test as failed if provided states differ
func assertStateEqual(t *testing.T, restoredState, expectedState ClaimInfoStateList) {
assert.Equal(t, expectedState, restoredState, "expected ClaimInfoState does not equal to restored one")
}
// TODO (https://github.com/kubernetes/kubernetes/issues/123552): reconsider what data gets stored in checkpoints and whether that is really necessary.
//
// As it stands now, a "v1" checkpoint contains data for types like the resourceapi.ResourceHandle
// which may change over time as new fields get added in a backward-compatible way (not unusual
// for API types). That breaks checksuming with pkg/util/hash because it is based on spew output.
// That output includes those new fields.
func TestCheckpointGetOrCreate(t *testing.T) {
testCases := []struct {
description string
checkpointContent string
expectedError string
expectedState ClaimInfoStateList
}{
{
description: "Create non-existing checkpoint",
checkpointContent: "",
expectedError: "",
expectedState: []ClaimInfoState{},
},
{
description: "Restore checkpoint - single claim",
checkpointContent: `{"version":"v1","entries":[{"ClaimUID":"067798be-454e-4be4-9047-1aa06aea63f7","ClaimName":"example","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"DriverState":{"test-driver.cdi.k8s.io":{"Devices":[{"PoolName":"worker-1","DeviceName":"dev-1","RequestNames":["test request"],"CDIDeviceIDs":["example.com/example=cdi-example"]}]}}}],"checksum":1925941526}`,
expectedState: []ClaimInfoState{
{
DriverState: map[string]DriverState{
"test-driver.cdi.k8s.io": {
Devices: []Device{
{
PoolName: "worker-1",
DeviceName: "dev-1",
RequestNames: []string{"test request"},
CDIDeviceIDs: []string{"example.com/example=cdi-example"},
},
},
},
},
ClaimUID: "067798be-454e-4be4-9047-1aa06aea63f7",
ClaimName: "example",
Namespace: "default",
PodUIDs: sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"),
},
},
},
{
description: "Restore checkpoint - single claim - multiple devices",
checkpointContent: `{"version":"v1","entries":[{"ClaimUID":"067798be-454e-4be4-9047-1aa06aea63f7","ClaimName":"example","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"DriverState":{"test-driver.cdi.k8s.io":{"Devices":[{"PoolName":"worker-1","DeviceName":"dev-1","RequestNames":["test request"],"CDIDeviceIDs":["example.com/example=cdi-example"]},{"PoolName":"worker-1","DeviceName":"dev-2","RequestNames":["test request2"],"CDIDeviceIDs":["example.com/example=cdi-example2"]}]}}}],"checksum":3560752542}`,
expectedError: "",
expectedState: []ClaimInfoState{
{
DriverState: map[string]DriverState{
"test-driver.cdi.k8s.io": {
Devices: []Device{
{
PoolName: "worker-1",
DeviceName: "dev-1",
RequestNames: []string{"test request"},
CDIDeviceIDs: []string{"example.com/example=cdi-example"},
},
{
PoolName: "worker-1",
DeviceName: "dev-2",
RequestNames: []string{"test request2"},
CDIDeviceIDs: []string{"example.com/example=cdi-example2"},
},
},
},
},
ClaimUID: "067798be-454e-4be4-9047-1aa06aea63f7",
ClaimName: "example",
Namespace: "default",
PodUIDs: sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"),
},
},
},
{
description: "Restore checkpoint - multiple claims",
checkpointContent: `{"version":"v1","entries":[{"ClaimUID":"067798be-454e-4be4-9047-1aa06aea63f7","ClaimName":"example-1","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"DriverState":{"test-driver.cdi.k8s.io":{"Devices":[{"PoolName":"worker-1","DeviceName":"dev-1","RequestNames":["test request"],"CDIDeviceIDs":["example.com/example=cdi-example"]}]}}},{"ClaimUID":"4cf8db2d-06c0-7d70-1a51-e59b25b2c16c","ClaimName":"example-2","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"DriverState":{"test-driver.cdi.k8s.io":{"Devices":[{"PoolName":"worker-1","DeviceName":"dev-1","RequestNames":["test request"],"CDIDeviceIDs":["example.com/example=cdi-example"]}]}}}],"checksum":351581974}`,
expectedError: "",
expectedState: []ClaimInfoState{
{
DriverState: map[string]DriverState{
"test-driver.cdi.k8s.io": {
Devices: []Device{
{
PoolName: "worker-1",
DeviceName: "dev-1",
RequestNames: []string{"test request"},
CDIDeviceIDs: []string{"example.com/example=cdi-example"},
},
},
},
},
ClaimUID: "067798be-454e-4be4-9047-1aa06aea63f7",
ClaimName: "example-1",
Namespace: "default",
PodUIDs: sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"),
},
{
DriverState: map[string]DriverState{
"test-driver.cdi.k8s.io": {
Devices: []Device{
{
PoolName: "worker-1",
DeviceName: "dev-1",
RequestNames: []string{"test request"},
CDIDeviceIDs: []string{"example.com/example=cdi-example"},
},
},
},
},
ClaimUID: "4cf8db2d-06c0-7d70-1a51-e59b25b2c16c",
ClaimName: "example-2",
Namespace: "default",
PodUIDs: sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"),
},
},
},
{
description: "Restore checkpoint - invalid checksum",
checkpointContent: `{"version":"v1","entries":[{"DriverName":"test-driver.cdi.k8s.io","ClassName":"class-name","ClaimUID":"067798be-454e-4be4-9047-1aa06aea63f7","ClaimName":"example","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"CDIDevices":{"test-driver.cdi.k8s.io":["example.com/example=cdi-example"]}}],"checksum":1988120168}`,
expectedError: "checkpoint is corrupted",
expectedState: []ClaimInfoState{},
},
{
description: "Restore checkpoint with invalid JSON",
checkpointContent: `{`,
expectedError: "unexpected end of JSON input",
expectedState: []ClaimInfoState{},
},
}
// create temp dir
testingDir, err := os.MkdirTemp("", "dramanager_state_test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(testingDir)
// create checkpoint manager for testing
cpm, err := checkpointmanager.NewCheckpointManager(testingDir)
assert.NoError(t, err, "could not create testing checkpoint manager")
for _, tc := range testCases {
t.Run(tc.description, func(t *testing.T) {
// ensure there is no previous checkpoint
assert.NoError(t, cpm.RemoveCheckpoint(testingCheckpoint), "could not remove testing checkpoint")
// prepare checkpoint for testing
if strings.TrimSpace(tc.checkpointContent) != "" {
checkpoint := &testutil.MockCheckpoint{Content: tc.checkpointContent}
assert.NoError(t, cpm.CreateCheckpoint(testingCheckpoint, checkpoint), "could not create testing checkpoint")
}
var state ClaimInfoStateList
checkpointState, err := NewCheckpointState(testingDir, testingCheckpoint)
if err == nil {
state, err = checkpointState.GetOrCreate()
}
if strings.TrimSpace(tc.expectedError) != "" {
assert.ErrorContains(t, err, tc.expectedError)
} else {
requireNoCheckpointError(t, err)
// compare state after restoration with the one expected
assertStateEqual(t, state, tc.expectedState)
}
})
}
}
func TestCheckpointStateStore(t *testing.T) {
claimInfoStateList := ClaimInfoStateList{
{
DriverState: map[string]DriverState{
"test-driver.cdi.k8s.io": {
Devices: []Device{{
PoolName: "worker-1",
DeviceName: "dev-1",
RequestNames: []string{"test request"},
CDIDeviceIDs: []string{"example.com/example=cdi-example"},
}},
},
},
ClaimUID: "067798be-454e-4be4-9047-1aa06aea63f7",
ClaimName: "example-1",
Namespace: "default",
PodUIDs: sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"),
},
{
DriverState: map[string]DriverState{
"test-driver.cdi.k8s.io": {
Devices: []Device{{
PoolName: "worker-1",
DeviceName: "dev-2",
}},
},
},
ClaimUID: "4cf8db2d-06c0-7d70-1a51-e59b25b2c16c",
ClaimName: "example-2",
Namespace: "default",
PodUIDs: sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"),
},
}
expectedCheckpoint := `{"version":"v1","entries":[{"ClaimUID":"067798be-454e-4be4-9047-1aa06aea63f7","ClaimName":"example-1","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"DriverState":{"test-driver.cdi.k8s.io":{"Devices":[{"PoolName":"worker-1","DeviceName":"dev-1","RequestNames":["test request"],"CDIDeviceIDs":["example.com/example=cdi-example"]}]}}},{"ClaimUID":"4cf8db2d-06c0-7d70-1a51-e59b25b2c16c","ClaimName":"example-2","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"DriverState":{"test-driver.cdi.k8s.io":{"Devices":[{"PoolName":"worker-1","DeviceName":"dev-2","RequestNames":null,"CDIDeviceIDs":null}]}}}],"checksum":1191151426}`
// Should return an error, stateDir cannot be an empty string
if _, err := NewCheckpointState("", testingCheckpoint); err == nil {
t.Fatal("expected error but got nil")
}
// create temp dir
testingDir, err := os.MkdirTemp("", "dramanager_state_test")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(testingDir)
cpm, err := checkpointmanager.NewCheckpointManager(testingDir)
assert.NoError(t, err, "could not create testing checkpoint manager")
assert.NoError(t, cpm.RemoveCheckpoint(testingCheckpoint), "could not remove testing checkpoint")
cs, err := NewCheckpointState(testingDir, testingCheckpoint)
assert.NoError(t, err, "could not create testing checkpointState instance")
err = cs.Store(claimInfoStateList)
assert.NoError(t, err, "could not store ClaimInfoState")
checkpoint := NewDRAManagerCheckpoint()
cpm.GetCheckpoint(testingCheckpoint, checkpoint)
checkpointData, err := checkpoint.MarshalCheckpoint()
assert.NoError(t, err, "could not Marshal Checkpoint")
assert.Equal(t, expectedCheckpoint, string(checkpointData), "expected ClaimInfoState does not equal to restored one")
// NewCheckpointState with an empty checkpointName should return an error
if _, err = NewCheckpointState(testingDir, ""); err == nil {
t.Fatal("expected error but got nil")
}
}
func requireNoCheckpointError(t *testing.T, err error) {
t.Helper()
var cksumErr *cmerrors.CorruptCheckpointError
if errors.As(err, &cksumErr) {
t.Fatalf("unexpected corrupt checkpoint, expected checksum %d, got %d", cksumErr.ExpectedCS, cksumErr.ActualCS)
} else {
require.NoError(t, err, "could not restore checkpoint")
}
}