diff --git a/pkg/kubelet/cm/dra/claiminfo.go b/pkg/kubelet/cm/dra/claiminfo.go index 7cf95d05604..9048c9bffbc 100644 --- a/pkg/kubelet/cm/dra/claiminfo.go +++ b/pkg/kubelet/cm/dra/claiminfo.go @@ -40,8 +40,8 @@ type ClaimInfo struct { // claimInfoCache is a cache of processed resource claims keyed by namespace/claimname. type claimInfoCache struct { sync.RWMutex - state state.CheckpointState - claimInfo map[string]*ClaimInfo + checkpointer state.Checkpointer + claimInfo map[string]*ClaimInfo } // newClaimInfoFromClaim creates a new claim info from a resource claim. @@ -77,12 +77,12 @@ func newClaimInfoFromState(state *state.ClaimInfoState) *ClaimInfo { } // setCDIDevices adds a set of CDI devices to the claim info. -func (info *ClaimInfo) addDevice(driverName string, device state.Device) { +func (info *ClaimInfo) addDevice(driverName string, deviceState state.Device) { if info.DriverState == nil { info.DriverState = make(map[string]state.DriverState) } driverState := info.DriverState[driverName] - driverState.Devices = append(driverState.Devices, device) + driverState.Devices = append(driverState.Devices, deviceState) info.DriverState[driverName] = driverState } @@ -113,22 +113,27 @@ func (info *ClaimInfo) isPrepared() bool { // newClaimInfoCache creates a new claim info cache object, pre-populated from a checkpoint (if present). func newClaimInfoCache(stateDir, checkpointName string) (*claimInfoCache, error) { - stateImpl, err := state.NewCheckpointState(stateDir, checkpointName) + checkpointer, err := state.NewCheckpointer(stateDir, checkpointName) if err != nil { return nil, fmt.Errorf("could not initialize checkpoint manager, please drain node and remove dra state file, err: %+v", err) } - curState, err := stateImpl.GetOrCreate() + checkpoint, err := checkpointer.GetOrCreate() if err != nil { return nil, fmt.Errorf("error calling GetOrCreate() on checkpoint state: %v", err) } cache := &claimInfoCache{ - state: stateImpl, - claimInfo: make(map[string]*ClaimInfo), + checkpointer: checkpointer, + claimInfo: make(map[string]*ClaimInfo), } - for _, entry := range curState { + entries, err := checkpoint.GetClaimInfoStateList() + if err != nil { + return nil, fmt.Errorf("error calling GetEntries() on checkpoint: %w", err) + + } + for _, entry := range entries { info := newClaimInfoFromState(&entry) cache.claimInfo[info.Namespace+"/"+info.ClaimName] = info } @@ -192,7 +197,11 @@ func (cache *claimInfoCache) syncToCheckpoint() error { for _, infoClaim := range cache.claimInfo { claimInfoStateList = append(claimInfoStateList, infoClaim.ClaimInfoState) } - return cache.state.Store(claimInfoStateList) + checkpoint, err := state.NewCheckpoint(claimInfoStateList) + if err != nil { + return err + } + return cache.checkpointer.Store(checkpoint) } // cdiDevicesAsList returns a list of CDIDevices from the provided claim info. diff --git a/pkg/kubelet/cm/dra/state/checkpoint.go b/pkg/kubelet/cm/dra/state/checkpoint.go index 7cce6118182..bfbae956bbc 100644 --- a/pkg/kubelet/cm/dra/state/checkpoint.go +++ b/pkg/kubelet/cm/dra/state/checkpoint.go @@ -18,51 +18,90 @@ package state import ( "encoding/json" + "hash/crc32" - "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" - "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" ) -var _ checkpointmanager.Checkpoint = &DRAManagerCheckpoint{} +const ( + CheckpointAPIGroup = "checkpoint.dra.kubelet.k8s.io" + CheckpointKind = "DRACheckpoint" + CheckpointAPIVersion = CheckpointAPIGroup + "/v1" +) -const checkpointVersion = "v1" - -// DRAManagerCheckpoint struct is used to store pod dynamic resources assignments in a checkpoint -type DRAManagerCheckpoint struct { - Version string `json:"version"` - Entries ClaimInfoStateList `json:"entries,omitempty"` - Checksum checksum.Checksum `json:"checksum"` +// Checkpoint represents a structure to store DRA checkpoint data +type Checkpoint struct { + // Data is a JSON serialized checkpoint data + Data string + // Checksum is a checksum of Data + Checksum uint32 } -// List of claim info to store in checkpoint -type ClaimInfoStateList []ClaimInfoState +type CheckpointData struct { + metav1.TypeMeta + ClaimInfoStateList ClaimInfoStateList +} -// NewDRAManagerCheckpoint returns an instance of Checkpoint -func NewDRAManagerCheckpoint() *DRAManagerCheckpoint { - return &DRAManagerCheckpoint{ - Version: checkpointVersion, - Entries: ClaimInfoStateList{}, +// NewCheckpoint creates a new checkpoint from a list of claim info states +func NewCheckpoint(data ClaimInfoStateList) (*Checkpoint, error) { + cpData := &CheckpointData{ + TypeMeta: metav1.TypeMeta{ + Kind: CheckpointKind, + APIVersion: CheckpointAPIVersion, + }, + ClaimInfoStateList: data, } + + cpDataBytes, err := json.Marshal(cpData) + if err != nil { + return nil, err + } + + cp := &Checkpoint{ + Data: string(cpDataBytes), + Checksum: crc32.ChecksumIEEE(cpDataBytes), + } + + return cp, nil } -// MarshalCheckpoint returns marshalled checkpoint -func (dc *DRAManagerCheckpoint) MarshalCheckpoint() ([]byte, error) { - // make sure checksum wasn't set before so it doesn't affect output checksum - dc.Checksum = 0 - dc.Checksum = checksum.New(dc) - return json.Marshal(*dc) +// MarshalCheckpoint marshals checkpoint to JSON +func (cp *Checkpoint) MarshalCheckpoint() ([]byte, error) { + return json.Marshal(cp) } -// UnmarshalCheckpoint tries to unmarshal passed bytes to checkpoint -func (dc *DRAManagerCheckpoint) UnmarshalCheckpoint(blob []byte) error { - return json.Unmarshal(blob, dc) +// UnmarshalCheckpoint unmarshals checkpoint from JSON +// and verifies its data checksum +func (cp *Checkpoint) UnmarshalCheckpoint(blob []byte) error { + if err := json.Unmarshal(blob, cp); err != nil { + return err + } + + // verify checksum + if err := cp.VerifyChecksum(); err != nil { + return err + } + + return nil } -// VerifyChecksum verifies that current checksum of checkpoint is valid -func (dc *DRAManagerCheckpoint) VerifyChecksum() error { - ck := dc.Checksum - dc.Checksum = 0 - err := ck.Verify(dc) - dc.Checksum = ck - return err +// VerifyChecksum verifies that current checksum +// of checkpointed Data is valid +func (cp *Checkpoint) VerifyChecksum() error { + expectedCS := crc32.ChecksumIEEE([]byte(cp.Data)) + if expectedCS != cp.Checksum { + return &errors.CorruptCheckpointError{ActualCS: uint64(cp.Checksum), ExpectedCS: uint64(expectedCS)} + } + return nil +} + +// GetClaimInfoStateList returns list of claim info states from checkpoint +func (cp *Checkpoint) GetClaimInfoStateList() (ClaimInfoStateList, error) { + var data CheckpointData + if err := json.Unmarshal([]byte(cp.Data), &data); err != nil { + return nil, err + } + + return data.ClaimInfoStateList, nil } diff --git a/pkg/kubelet/cm/dra/state/checkpointer.go b/pkg/kubelet/cm/dra/state/checkpointer.go new file mode 100644 index 00000000000..62f98029f85 --- /dev/null +++ b/pkg/kubelet/cm/dra/state/checkpointer.go @@ -0,0 +1,98 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package state + +import ( + "errors" + "fmt" + "sync" + + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" + checkpointerrors "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" +) + +type Checkpointer interface { + GetOrCreate() (*Checkpoint, error) + Store(*Checkpoint) error +} + +type checkpointer struct { + sync.RWMutex + checkpointManager checkpointmanager.CheckpointManager + checkpointName string +} + +// NewCheckpointer creates new checkpointer for keeping track of claim info with checkpoint backend +func NewCheckpointer(stateDir, checkpointName string) (Checkpointer, error) { + if len(checkpointName) == 0 { + return nil, fmt.Errorf("received empty string instead of checkpointName") + } + + checkpointManager, err := checkpointmanager.NewCheckpointManager(stateDir) + if err != nil { + return nil, fmt.Errorf("failed to initialize checkpoint manager: %w", err) + } + + checkpointer := &checkpointer{ + checkpointManager: checkpointManager, + checkpointName: checkpointName, + } + + return checkpointer, nil +} + +// GetOrCreate gets list of claim info states from a checkpoint +// or creates empty list if checkpoint doesn't exist +func (sc *checkpointer) GetOrCreate() (*Checkpoint, error) { + sc.Lock() + defer sc.Unlock() + + checkpoint, err := NewCheckpoint(nil) + if err != nil { + return nil, fmt.Errorf("failed to create new checkpoint: %w", err) + } + + err = sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpoint) + if errors.Is(err, checkpointerrors.ErrCheckpointNotFound) { + err = sc.store(checkpoint) + if err != nil { + return nil, fmt.Errorf("failed to store checkpoint %v: %w", sc.checkpointName, err) + } + return checkpoint, nil + } + if err != nil { + return nil, fmt.Errorf("failed to get checkpoint %v: %w", sc.checkpointName, err) + } + + return checkpoint, nil +} + +// Store stores checkpoint to the file +func (sc *checkpointer) Store(checkpoint *Checkpoint) error { + sc.Lock() + defer sc.Unlock() + + return sc.store(checkpoint) +} + +// store saves state to a checkpoint, caller is responsible for locking +func (sc *checkpointer) store(checkpoint *Checkpoint) error { + if err := sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint); err != nil { + return fmt.Errorf("could not save checkpoint %s: %w", sc.checkpointName, err) + } + return nil +} diff --git a/pkg/kubelet/cm/dra/state/checkpointer_test.go b/pkg/kubelet/cm/dra/state/checkpointer_test.go new file mode 100644 index 00000000000..a1290ae8e85 --- /dev/null +++ b/pkg/kubelet/cm/dra/state/checkpointer_test.go @@ -0,0 +1,342 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package state + +import ( + "os" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" + testutil "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state/testing" +) + +const testingCheckpoint = "dramanager_checkpoint_test" + +// TODO (https://github.com/kubernetes/kubernetes/issues/123552): reconsider what data gets stored in checkpoints and whether that is really necessary. + +func TestCheckpointGetOrCreate(t *testing.T) { + testCases := []struct { + description string + checkpointContent string + expectedError string + expectedClaimInfoStateList ClaimInfoStateList + }{ + { + description: "new-checkpoint", + expectedClaimInfoStateList: nil, + }, + { + description: "single-claim-info-state", + checkpointContent: `{"Data":"{\"kind\":\"DRACheckpoint\",\"apiVersion\":\"checkpoint.dra.kubelet.k8s.io/v1\",\"ClaimInfoStateList\":[{\"ClaimUID\":\"067798be-454e-4be4-9047-1aa06aea63f7\",\"ClaimName\":\"example\",\"Namespace\":\"default\",\"PodUIDs\":{\"139cdb46-f989-4f17-9561-ca10cfb509a6\":{}},\"DriverState\":{\"test-driver.cdi.k8s.io\":{\"Devices\":[{\"PoolName\":\"worker-1\",\"DeviceName\":\"dev-1\",\"RequestNames\":[\"test request\"],\"CDIDeviceIDs\":[\"example.com/example=cdi-example\"]}]}}}]}","Checksum":1656016162}`, + expectedClaimInfoStateList: ClaimInfoStateList{ + { + DriverState: map[string]DriverState{ + "test-driver.cdi.k8s.io": { + Devices: []Device{ + { + PoolName: "worker-1", + DeviceName: "dev-1", + RequestNames: []string{"test request"}, + CDIDeviceIDs: []string{"example.com/example=cdi-example"}, + }, + }, + }, + }, + ClaimUID: "067798be-454e-4be4-9047-1aa06aea63f7", + ClaimName: "example", + Namespace: "default", + PodUIDs: sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"), + }, + }, + }, + { + description: "claim-info-state-with-multiple-devices", + checkpointContent: `{"Data":"{\"kind\":\"DRACheckpoint\",\"apiVersion\":\"checkpoint.dra.kubelet.k8s.io/v1\",\"ClaimInfoStateList\":[{\"ClaimUID\":\"067798be-454e-4be4-9047-1aa06aea63f7\",\"ClaimName\":\"example\",\"Namespace\":\"default\",\"PodUIDs\":{\"139cdb46-f989-4f17-9561-ca10cfb509a6\":{}},\"DriverState\":{\"test-driver.cdi.k8s.io\":{\"Devices\":[{\"PoolName\":\"worker-1\",\"DeviceName\":\"dev-1\",\"RequestNames\":[\"test request\"],\"CDIDeviceIDs\":[\"example.com/example=cdi-example\"]},{\"PoolName\":\"worker-1\",\"DeviceName\":\"dev-2\",\"RequestNames\":[\"test request\"],\"CDIDeviceIDs\":[\"example.com/example=cdi-example\"]}]}}}]}","Checksum":3369508096}`, + expectedClaimInfoStateList: ClaimInfoStateList{ + { + DriverState: map[string]DriverState{ + "test-driver.cdi.k8s.io": { + Devices: []Device{ + { + PoolName: "worker-1", + DeviceName: "dev-1", + RequestNames: []string{"test request"}, + CDIDeviceIDs: []string{"example.com/example=cdi-example"}, + }, + { + PoolName: "worker-1", + DeviceName: "dev-2", + RequestNames: []string{"test request"}, + CDIDeviceIDs: []string{"example.com/example=cdi-example"}, + }, + }, + }, + }, + ClaimUID: "067798be-454e-4be4-9047-1aa06aea63f7", + ClaimName: "example", + Namespace: "default", + PodUIDs: sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"), + }, + }, + }, + { + description: "two-claim-info-states", + checkpointContent: `{"Data":"{\"kind\":\"DRACheckpoint\",\"apiVersion\":\"checkpoint.dra.kubelet.k8s.io/v1\",\"ClaimInfoStateList\":[{\"ClaimUID\":\"067798be-454e-4be4-9047-1aa06aea63f7\",\"ClaimName\":\"example-1\",\"Namespace\":\"default\",\"PodUIDs\":{\"139cdb46-f989-4f17-9561-ca10cfb509a6\":{}},\"DriverState\":{\"test-driver.cdi.k8s.io\":{\"Devices\":[{\"PoolName\":\"worker-1\",\"DeviceName\":\"dev-1\",\"RequestNames\":[\"test request\"],\"CDIDeviceIDs\":[\"example.com/example=cdi-example\"]}]}}},{\"ClaimUID\":\"4cf8db2d-06c0-7d70-1a51-e59b25b2c16c\",\"ClaimName\":\"example-2\",\"Namespace\":\"default\",\"PodUIDs\":{\"139cdb46-f989-4f17-9561-ca10cfb509a6\":{}},\"DriverState\":{\"test-driver.cdi.k8s.io\":{\"Devices\":[{\"PoolName\":\"worker-1\",\"DeviceName\":\"dev-2\",\"RequestNames\":null,\"CDIDeviceIDs\":null}]}}}]}","Checksum":1582256999}`, + expectedClaimInfoStateList: ClaimInfoStateList{ + { + DriverState: map[string]DriverState{ + "test-driver.cdi.k8s.io": { + Devices: []Device{ + { + PoolName: "worker-1", + DeviceName: "dev-1", + RequestNames: []string{"test request"}, + CDIDeviceIDs: []string{"example.com/example=cdi-example"}, + }, + }, + }, + }, + ClaimUID: "067798be-454e-4be4-9047-1aa06aea63f7", + ClaimName: "example-1", + Namespace: "default", + PodUIDs: sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"), + }, + { + DriverState: map[string]DriverState{ + "test-driver.cdi.k8s.io": { + Devices: []Device{ + { + PoolName: "worker-1", + DeviceName: "dev-2", + }, + }, + }, + }, + ClaimUID: "4cf8db2d-06c0-7d70-1a51-e59b25b2c16c", + ClaimName: "example-2", + Namespace: "default", + PodUIDs: sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"), + }, + }, + }, + { + description: "incorrect-checksum", + checkpointContent: `{"Data":"{\"kind\":\"DRACheckpoint\",\"apiVersion\":\"checkpoint.dra.kubelet.k8s.io/v1\",\"Entries\":[{\"ClaimUID\":\"067798be-454e-4be4-9047-1aa06aea63f7\",\"ClaimName\":\"example-1\",\"Namespace\":\"default\",\"PodUIDs\":{\"139cdb46-f989-4f17-9561-ca10cfb509a6\":{}},\"DriverState\":{\"test-driver.cdi.k8s.io\":{\"Devices\":[{\"PoolName\":\"worker-1\",\"DeviceName\":\"dev-1\",\"RequestNames\":[\"test request\"],\"CDIDeviceIDs\":[\"example.com/example=cdi-example\"]}]}}},{\"ClaimUID\":\"4cf8db2d-06c0-7d70-1a51-e59b25b2c16c\",\"ClaimName\":\"example-2\",\"Namespace\":\"default\",\"PodUIDs\":{\"139cdb46-f989-4f17-9561-ca10cfb509a6\":{}},\"DriverState\":{\"test-driver.cdi.k8s.io\":{\"Devices\":[{\"PoolName\":\"worker-1\",\"DeviceName\":\"dev-2\",\"RequestNames\":null,\"CDIDeviceIDs\":null}]}}}]}","Checksum":2930258365}`, + expectedError: "checkpoint is corrupted", + }, + { + description: "invalid-JSON", + checkpointContent: `{`, + expectedError: "unexpected end of JSON input", + }, + } + + // create temp dir + testingDir, err := os.MkdirTemp("", "dramanager_state_test") + if err != nil { + t.Fatal(err) + } + defer func() { + if err := os.RemoveAll(testingDir); err != nil { + t.Fatal(err) + } + }() + + // create checkpoint manager for testing + cpm, err := checkpointmanager.NewCheckpointManager(testingDir) + require.NoError(t, err, "could not create testing checkpoint manager") + + for _, tc := range testCases { + t.Run(tc.description, func(t *testing.T) { + // ensure there is no previous checkpoint + require.NoError(t, cpm.RemoveCheckpoint(testingCheckpoint), "could not remove testing checkpoint") + + // prepare checkpoint for testing + if strings.TrimSpace(tc.checkpointContent) != "" { + mock := &testutil.MockCheckpoint{Content: tc.checkpointContent} + require.NoError(t, cpm.CreateCheckpoint(testingCheckpoint, mock), "could not create testing checkpoint") + } + + checkpointer, err := NewCheckpointer(testingDir, testingCheckpoint) + require.NoError(t, err, "could not create testing checkpointer") + + checkpoint, err := checkpointer.GetOrCreate() + if strings.TrimSpace(tc.expectedError) != "" { + assert.ErrorContains(t, err, tc.expectedError) + } else { + require.NoError(t, err, "unexpected error") + stateList, err := checkpoint.GetClaimInfoStateList() + require.NoError(t, err, "could not get data entries from checkpoint") + require.NoError(t, err) + assert.Equal(t, tc.expectedClaimInfoStateList, stateList) + } + }) + } +} + +func TestCheckpointStateStore(t *testing.T) { + testCases := []struct { + description string + claimInfoStateList ClaimInfoStateList + expectedCheckpointContent string + }{ + { + description: "single-claim-info-state", + claimInfoStateList: ClaimInfoStateList{ + { + DriverState: map[string]DriverState{ + "test-driver.cdi.k8s.io": { + Devices: []Device{ + { + PoolName: "worker-1", + DeviceName: "dev-1", + RequestNames: []string{"test request"}, + CDIDeviceIDs: []string{"example.com/example=cdi-example"}, + }, + }, + }, + }, + ClaimUID: "067798be-454e-4be4-9047-1aa06aea63f7", + ClaimName: "example", + Namespace: "default", + PodUIDs: sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"), + }, + }, + expectedCheckpointContent: `{"Data":"{\"kind\":\"DRACheckpoint\",\"apiVersion\":\"checkpoint.dra.kubelet.k8s.io/v1\",\"ClaimInfoStateList\":[{\"ClaimUID\":\"067798be-454e-4be4-9047-1aa06aea63f7\",\"ClaimName\":\"example\",\"Namespace\":\"default\",\"PodUIDs\":{\"139cdb46-f989-4f17-9561-ca10cfb509a6\":{}},\"DriverState\":{\"test-driver.cdi.k8s.io\":{\"Devices\":[{\"PoolName\":\"worker-1\",\"DeviceName\":\"dev-1\",\"RequestNames\":[\"test request\"],\"CDIDeviceIDs\":[\"example.com/example=cdi-example\"]}]}}}]}","Checksum":1656016162}`, + }, + { + description: "claim-info-state-with-multiple-devices", + claimInfoStateList: ClaimInfoStateList{ + { + DriverState: map[string]DriverState{ + "test-driver.cdi.k8s.io": { + Devices: []Device{ + { + PoolName: "worker-1", + DeviceName: "dev-1", + RequestNames: []string{"test request"}, + CDIDeviceIDs: []string{"example.com/example=cdi-example"}, + }, + { + PoolName: "worker-1", + DeviceName: "dev-2", + RequestNames: []string{"test request"}, + CDIDeviceIDs: []string{"example.com/example=cdi-example"}, + }, + }, + }, + }, + ClaimUID: "067798be-454e-4be4-9047-1aa06aea63f7", + ClaimName: "example", + Namespace: "default", + PodUIDs: sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"), + }, + }, + expectedCheckpointContent: `{"Data":"{\"kind\":\"DRACheckpoint\",\"apiVersion\":\"checkpoint.dra.kubelet.k8s.io/v1\",\"ClaimInfoStateList\":[{\"ClaimUID\":\"067798be-454e-4be4-9047-1aa06aea63f7\",\"ClaimName\":\"example\",\"Namespace\":\"default\",\"PodUIDs\":{\"139cdb46-f989-4f17-9561-ca10cfb509a6\":{}},\"DriverState\":{\"test-driver.cdi.k8s.io\":{\"Devices\":[{\"PoolName\":\"worker-1\",\"DeviceName\":\"dev-1\",\"RequestNames\":[\"test request\"],\"CDIDeviceIDs\":[\"example.com/example=cdi-example\"]},{\"PoolName\":\"worker-1\",\"DeviceName\":\"dev-2\",\"RequestNames\":[\"test request\"],\"CDIDeviceIDs\":[\"example.com/example=cdi-example\"]}]}}}]}","Checksum":3369508096}`, + }, + { + description: "two-claim-info-states", + claimInfoStateList: ClaimInfoStateList{ + { + DriverState: map[string]DriverState{ + "test-driver.cdi.k8s.io": { + Devices: []Device{ + { + PoolName: "worker-1", + DeviceName: "dev-1", + RequestNames: []string{"test request"}, + CDIDeviceIDs: []string{"example.com/example=cdi-example"}, + }, + }, + }, + }, + ClaimUID: "067798be-454e-4be4-9047-1aa06aea63f7", + ClaimName: "example-1", + Namespace: "default", + PodUIDs: sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"), + }, + { + DriverState: map[string]DriverState{ + "test-driver.cdi.k8s.io": { + Devices: []Device{ + { + PoolName: "worker-1", + DeviceName: "dev-2", + }, + }, + }, + }, + ClaimUID: "4cf8db2d-06c0-7d70-1a51-e59b25b2c16c", + ClaimName: "example-2", + Namespace: "default", + PodUIDs: sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"), + }, + }, + expectedCheckpointContent: `{"Data":"{\"kind\":\"DRACheckpoint\",\"apiVersion\":\"checkpoint.dra.kubelet.k8s.io/v1\",\"ClaimInfoStateList\":[{\"ClaimUID\":\"067798be-454e-4be4-9047-1aa06aea63f7\",\"ClaimName\":\"example-1\",\"Namespace\":\"default\",\"PodUIDs\":{\"139cdb46-f989-4f17-9561-ca10cfb509a6\":{}},\"DriverState\":{\"test-driver.cdi.k8s.io\":{\"Devices\":[{\"PoolName\":\"worker-1\",\"DeviceName\":\"dev-1\",\"RequestNames\":[\"test request\"],\"CDIDeviceIDs\":[\"example.com/example=cdi-example\"]}]}}},{\"ClaimUID\":\"4cf8db2d-06c0-7d70-1a51-e59b25b2c16c\",\"ClaimName\":\"example-2\",\"Namespace\":\"default\",\"PodUIDs\":{\"139cdb46-f989-4f17-9561-ca10cfb509a6\":{}},\"DriverState\":{\"test-driver.cdi.k8s.io\":{\"Devices\":[{\"PoolName\":\"worker-1\",\"DeviceName\":\"dev-2\",\"RequestNames\":null,\"CDIDeviceIDs\":null}]}}}]}","Checksum":1582256999}`, + }, + } + + // Should return an error, stateDir cannot be an empty string + if _, err := NewCheckpointer("", testingCheckpoint); err == nil { + t.Fatal("expected error but got nil") + } + + // create temp dir + testingDir, err := os.MkdirTemp("", "dramanager_state_test") + if err != nil { + t.Fatal(err) + } + defer func() { + if err := os.RemoveAll(testingDir); err != nil { + t.Fatal(err) + } + }() + + // NewCheckpointState with an empty checkpointName should return an error + if _, err = NewCheckpointer(testingDir, ""); err == nil { + t.Fatal("expected error but got nil") + } + + cpm, err := checkpointmanager.NewCheckpointManager(testingDir) + require.NoError(t, err, "could not create testing checkpoint manager") + require.NoError(t, cpm.RemoveCheckpoint(testingCheckpoint), "could not remove testing checkpoint") + + cs, err := NewCheckpointer(testingDir, testingCheckpoint) + require.NoError(t, err, "could not create testing checkpointState instance") + + for _, tc := range testCases { + t.Run(tc.description, func(t *testing.T) { + checkpoint, err := NewCheckpoint(tc.claimInfoStateList) + require.NoError(t, err, "could not create Checkpoint") + + err = cs.Store(checkpoint) + require.NoError(t, err, "could not store checkpoint") + + err = cpm.GetCheckpoint(testingCheckpoint, checkpoint) + require.NoError(t, err, "could not get checkpoint") + + checkpointContent, err := checkpoint.MarshalCheckpoint() + require.NoError(t, err, "could not Marshal Checkpoint") + assert.Equal(t, tc.expectedCheckpointContent, string(checkpointContent)) + }) + } +} diff --git a/pkg/kubelet/cm/dra/state/state.go b/pkg/kubelet/cm/dra/state/state.go new file mode 100644 index 00000000000..045a9b6b2f4 --- /dev/null +++ b/pkg/kubelet/cm/dra/state/state.go @@ -0,0 +1,59 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package state + +import ( + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" +) + +type ClaimInfoStateList []ClaimInfoState + +// +k8s:deepcopy-gen=true +type ClaimInfoState struct { + // ClaimUID is the UID of a resource claim + ClaimUID types.UID + + // ClaimName is the name of a resource claim + ClaimName string + + // Namespace is a claim namespace + Namespace string + + // PodUIDs is a set of pod UIDs that reference a resource + PodUIDs sets.Set[string] + + // DriverState contains information about all drivers which have allocation + // results in the claim, even if they don't provide devices for their results. + DriverState map[string]DriverState +} + +// DriverState is used to store per-device claim info state in a checkpoint +// +k8s:deepcopy-gen=true +type DriverState struct { + Devices []Device +} + +// Device is how a DRA driver described an allocated device in a claim +// to kubelet. RequestName and CDI device IDs are optional. +// +k8s:deepcopy-gen=true +type Device struct { + PoolName string + DeviceName string + RequestNames []string + CDIDeviceIDs []string +} diff --git a/pkg/kubelet/cm/dra/state/state_checkpoint.go b/pkg/kubelet/cm/dra/state/state_checkpoint.go deleted file mode 100644 index ce20e1ffdf9..00000000000 --- a/pkg/kubelet/cm/dra/state/state_checkpoint.go +++ /dev/null @@ -1,133 +0,0 @@ -/* -Copyright 2023 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package state - -import ( - "fmt" - "sync" - - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" - "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" -) - -var _ CheckpointState = &stateCheckpoint{} - -// CheckpointState interface provides to get and store state -type CheckpointState interface { - GetOrCreate() (ClaimInfoStateList, error) - Store(ClaimInfoStateList) error -} - -// ClaimInfoState is used to store claim info state in a checkpoint -// +k8s:deepcopy-gen=true -type ClaimInfoState struct { - // ClaimUID is an UID of the resource claim - ClaimUID types.UID - - // ClaimName is a name of the resource claim - ClaimName string - - // Namespace is a claim namespace - Namespace string - - // PodUIDs is a set of pod UIDs that reference a resource - PodUIDs sets.Set[string] - - // DriverState contains information about all drivers which have allocation - // results in the claim, even if they don't provide devices for their results. - DriverState map[string]DriverState -} - -// DriverState is used to store per-device claim info state in a checkpoint -// +k8s:deepcopy-gen=true -type DriverState struct { - Devices []Device -} - -// Device is how a DRA driver described an allocated device in a claim -// to kubelet. RequestName and CDI device IDs are optional. -// +k8s:deepcopy-gen=true -type Device struct { - PoolName string - DeviceName string - RequestNames []string - CDIDeviceIDs []string -} - -type stateCheckpoint struct { - sync.RWMutex - checkpointManager checkpointmanager.CheckpointManager - checkpointName string -} - -// NewCheckpointState creates new State for keeping track of claim info with checkpoint backend -func NewCheckpointState(stateDir, checkpointName string) (*stateCheckpoint, error) { - if len(checkpointName) == 0 { - return nil, fmt.Errorf("received empty string instead of checkpointName") - } - - checkpointManager, err := checkpointmanager.NewCheckpointManager(stateDir) - if err != nil { - return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err) - } - stateCheckpoint := &stateCheckpoint{ - checkpointManager: checkpointManager, - checkpointName: checkpointName, - } - - return stateCheckpoint, nil -} - -// get state from a checkpoint and creates it if it doesn't exist -func (sc *stateCheckpoint) GetOrCreate() (ClaimInfoStateList, error) { - sc.Lock() - defer sc.Unlock() - - checkpoint := NewDRAManagerCheckpoint() - err := sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpoint) - if err == errors.ErrCheckpointNotFound { - sc.store(ClaimInfoStateList{}) - return ClaimInfoStateList{}, nil - } - if err != nil { - return nil, fmt.Errorf("failed to get checkpoint %v: %w", sc.checkpointName, err) - } - - return checkpoint.Entries, nil -} - -// saves state to a checkpoint -func (sc *stateCheckpoint) Store(claimInfoStateList ClaimInfoStateList) error { - sc.Lock() - defer sc.Unlock() - - return sc.store(claimInfoStateList) -} - -// saves state to a checkpoint, caller is responsible for locking -func (sc *stateCheckpoint) store(claimInfoStateList ClaimInfoStateList) error { - checkpoint := NewDRAManagerCheckpoint() - checkpoint.Entries = claimInfoStateList - - err := sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint) - if err != nil { - return fmt.Errorf("could not save checkpoint %s: %v", sc.checkpointName, err) - } - return nil -} diff --git a/pkg/kubelet/cm/dra/state/state_checkpoint_test.go b/pkg/kubelet/cm/dra/state/state_checkpoint_test.go deleted file mode 100644 index 4668235d40b..00000000000 --- a/pkg/kubelet/cm/dra/state/state_checkpoint_test.go +++ /dev/null @@ -1,289 +0,0 @@ -/* -Copyright 2020 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package state - -import ( - "errors" - "os" - "strings" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" - cmerrors "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" - testutil "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state/testing" -) - -const testingCheckpoint = "dramanager_checkpoint_test" - -// assertStateEqual marks provided test as failed if provided states differ -func assertStateEqual(t *testing.T, restoredState, expectedState ClaimInfoStateList) { - assert.Equal(t, expectedState, restoredState, "expected ClaimInfoState does not equal to restored one") -} - -// TODO (https://github.com/kubernetes/kubernetes/issues/123552): reconsider what data gets stored in checkpoints and whether that is really necessary. -// -// As it stands now, a "v1" checkpoint contains data for types like the resourceapi.ResourceHandle -// which may change over time as new fields get added in a backward-compatible way (not unusual -// for API types). That breaks checksuming with pkg/util/hash because it is based on spew output. -// That output includes those new fields. - -func TestCheckpointGetOrCreate(t *testing.T) { - testCases := []struct { - description string - checkpointContent string - expectedError string - expectedState ClaimInfoStateList - }{ - { - description: "Create non-existing checkpoint", - checkpointContent: "", - expectedError: "", - expectedState: []ClaimInfoState{}, - }, - { - description: "Restore checkpoint - single claim", - checkpointContent: `{"version":"v1","entries":[{"ClaimUID":"067798be-454e-4be4-9047-1aa06aea63f7","ClaimName":"example","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"DriverState":{"test-driver.cdi.k8s.io":{"Devices":[{"PoolName":"worker-1","DeviceName":"dev-1","RequestNames":["test request"],"CDIDeviceIDs":["example.com/example=cdi-example"]}]}}}],"checksum":1925941526}`, - expectedState: []ClaimInfoState{ - { - DriverState: map[string]DriverState{ - "test-driver.cdi.k8s.io": { - Devices: []Device{ - { - PoolName: "worker-1", - DeviceName: "dev-1", - RequestNames: []string{"test request"}, - CDIDeviceIDs: []string{"example.com/example=cdi-example"}, - }, - }, - }, - }, - ClaimUID: "067798be-454e-4be4-9047-1aa06aea63f7", - ClaimName: "example", - Namespace: "default", - PodUIDs: sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"), - }, - }, - }, - { - description: "Restore checkpoint - single claim - multiple devices", - checkpointContent: `{"version":"v1","entries":[{"ClaimUID":"067798be-454e-4be4-9047-1aa06aea63f7","ClaimName":"example","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"DriverState":{"test-driver.cdi.k8s.io":{"Devices":[{"PoolName":"worker-1","DeviceName":"dev-1","RequestNames":["test request"],"CDIDeviceIDs":["example.com/example=cdi-example"]},{"PoolName":"worker-1","DeviceName":"dev-2","RequestNames":["test request2"],"CDIDeviceIDs":["example.com/example=cdi-example2"]}]}}}],"checksum":3560752542}`, - expectedError: "", - expectedState: []ClaimInfoState{ - { - DriverState: map[string]DriverState{ - "test-driver.cdi.k8s.io": { - Devices: []Device{ - { - PoolName: "worker-1", - DeviceName: "dev-1", - RequestNames: []string{"test request"}, - CDIDeviceIDs: []string{"example.com/example=cdi-example"}, - }, - { - PoolName: "worker-1", - DeviceName: "dev-2", - RequestNames: []string{"test request2"}, - CDIDeviceIDs: []string{"example.com/example=cdi-example2"}, - }, - }, - }, - }, - ClaimUID: "067798be-454e-4be4-9047-1aa06aea63f7", - ClaimName: "example", - Namespace: "default", - PodUIDs: sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"), - }, - }, - }, - { - description: "Restore checkpoint - multiple claims", - checkpointContent: `{"version":"v1","entries":[{"ClaimUID":"067798be-454e-4be4-9047-1aa06aea63f7","ClaimName":"example-1","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"DriverState":{"test-driver.cdi.k8s.io":{"Devices":[{"PoolName":"worker-1","DeviceName":"dev-1","RequestNames":["test request"],"CDIDeviceIDs":["example.com/example=cdi-example"]}]}}},{"ClaimUID":"4cf8db2d-06c0-7d70-1a51-e59b25b2c16c","ClaimName":"example-2","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"DriverState":{"test-driver.cdi.k8s.io":{"Devices":[{"PoolName":"worker-1","DeviceName":"dev-1","RequestNames":["test request"],"CDIDeviceIDs":["example.com/example=cdi-example"]}]}}}],"checksum":351581974}`, - expectedError: "", - expectedState: []ClaimInfoState{ - { - DriverState: map[string]DriverState{ - "test-driver.cdi.k8s.io": { - Devices: []Device{ - { - PoolName: "worker-1", - DeviceName: "dev-1", - RequestNames: []string{"test request"}, - CDIDeviceIDs: []string{"example.com/example=cdi-example"}, - }, - }, - }, - }, - ClaimUID: "067798be-454e-4be4-9047-1aa06aea63f7", - ClaimName: "example-1", - Namespace: "default", - PodUIDs: sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"), - }, - { - DriverState: map[string]DriverState{ - "test-driver.cdi.k8s.io": { - Devices: []Device{ - { - PoolName: "worker-1", - DeviceName: "dev-1", - RequestNames: []string{"test request"}, - CDIDeviceIDs: []string{"example.com/example=cdi-example"}, - }, - }, - }, - }, - ClaimUID: "4cf8db2d-06c0-7d70-1a51-e59b25b2c16c", - ClaimName: "example-2", - Namespace: "default", - PodUIDs: sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"), - }, - }, - }, - { - description: "Restore checkpoint - invalid checksum", - checkpointContent: `{"version":"v1","entries":[{"DriverName":"test-driver.cdi.k8s.io","ClassName":"class-name","ClaimUID":"067798be-454e-4be4-9047-1aa06aea63f7","ClaimName":"example","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"CDIDevices":{"test-driver.cdi.k8s.io":["example.com/example=cdi-example"]}}],"checksum":1988120168}`, - expectedError: "checkpoint is corrupted", - expectedState: []ClaimInfoState{}, - }, - { - description: "Restore checkpoint with invalid JSON", - checkpointContent: `{`, - expectedError: "unexpected end of JSON input", - expectedState: []ClaimInfoState{}, - }, - } - - // create temp dir - testingDir, err := os.MkdirTemp("", "dramanager_state_test") - if err != nil { - t.Fatal(err) - } - defer os.RemoveAll(testingDir) - - // create checkpoint manager for testing - cpm, err := checkpointmanager.NewCheckpointManager(testingDir) - assert.NoError(t, err, "could not create testing checkpoint manager") - - for _, tc := range testCases { - t.Run(tc.description, func(t *testing.T) { - // ensure there is no previous checkpoint - assert.NoError(t, cpm.RemoveCheckpoint(testingCheckpoint), "could not remove testing checkpoint") - - // prepare checkpoint for testing - if strings.TrimSpace(tc.checkpointContent) != "" { - checkpoint := &testutil.MockCheckpoint{Content: tc.checkpointContent} - assert.NoError(t, cpm.CreateCheckpoint(testingCheckpoint, checkpoint), "could not create testing checkpoint") - } - - var state ClaimInfoStateList - - checkpointState, err := NewCheckpointState(testingDir, testingCheckpoint) - - if err == nil { - state, err = checkpointState.GetOrCreate() - } - if strings.TrimSpace(tc.expectedError) != "" { - assert.ErrorContains(t, err, tc.expectedError) - } else { - requireNoCheckpointError(t, err) - // compare state after restoration with the one expected - assertStateEqual(t, state, tc.expectedState) - } - }) - } -} - -func TestCheckpointStateStore(t *testing.T) { - claimInfoStateList := ClaimInfoStateList{ - { - DriverState: map[string]DriverState{ - "test-driver.cdi.k8s.io": { - Devices: []Device{{ - PoolName: "worker-1", - DeviceName: "dev-1", - RequestNames: []string{"test request"}, - CDIDeviceIDs: []string{"example.com/example=cdi-example"}, - }}, - }, - }, - ClaimUID: "067798be-454e-4be4-9047-1aa06aea63f7", - ClaimName: "example-1", - Namespace: "default", - PodUIDs: sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"), - }, - { - DriverState: map[string]DriverState{ - "test-driver.cdi.k8s.io": { - Devices: []Device{{ - PoolName: "worker-1", - DeviceName: "dev-2", - }}, - }, - }, - ClaimUID: "4cf8db2d-06c0-7d70-1a51-e59b25b2c16c", - ClaimName: "example-2", - Namespace: "default", - PodUIDs: sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"), - }, - } - - expectedCheckpoint := `{"version":"v1","entries":[{"ClaimUID":"067798be-454e-4be4-9047-1aa06aea63f7","ClaimName":"example-1","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"DriverState":{"test-driver.cdi.k8s.io":{"Devices":[{"PoolName":"worker-1","DeviceName":"dev-1","RequestNames":["test request"],"CDIDeviceIDs":["example.com/example=cdi-example"]}]}}},{"ClaimUID":"4cf8db2d-06c0-7d70-1a51-e59b25b2c16c","ClaimName":"example-2","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"DriverState":{"test-driver.cdi.k8s.io":{"Devices":[{"PoolName":"worker-1","DeviceName":"dev-2","RequestNames":null,"CDIDeviceIDs":null}]}}}],"checksum":1191151426}` - - // Should return an error, stateDir cannot be an empty string - if _, err := NewCheckpointState("", testingCheckpoint); err == nil { - t.Fatal("expected error but got nil") - } - - // create temp dir - testingDir, err := os.MkdirTemp("", "dramanager_state_test") - if err != nil { - t.Fatal(err) - } - defer os.RemoveAll(testingDir) - - cpm, err := checkpointmanager.NewCheckpointManager(testingDir) - assert.NoError(t, err, "could not create testing checkpoint manager") - assert.NoError(t, cpm.RemoveCheckpoint(testingCheckpoint), "could not remove testing checkpoint") - - cs, err := NewCheckpointState(testingDir, testingCheckpoint) - assert.NoError(t, err, "could not create testing checkpointState instance") - err = cs.Store(claimInfoStateList) - assert.NoError(t, err, "could not store ClaimInfoState") - checkpoint := NewDRAManagerCheckpoint() - cpm.GetCheckpoint(testingCheckpoint, checkpoint) - checkpointData, err := checkpoint.MarshalCheckpoint() - assert.NoError(t, err, "could not Marshal Checkpoint") - assert.Equal(t, expectedCheckpoint, string(checkpointData), "expected ClaimInfoState does not equal to restored one") - - // NewCheckpointState with an empty checkpointName should return an error - if _, err = NewCheckpointState(testingDir, ""); err == nil { - t.Fatal("expected error but got nil") - } -} - -func requireNoCheckpointError(t *testing.T, err error) { - t.Helper() - var cksumErr *cmerrors.CorruptCheckpointError - if errors.As(err, &cksumErr) { - t.Fatalf("unexpected corrupt checkpoint, expected checksum %d, got %d", cksumErr.ExpectedCS, cksumErr.ActualCS) - } else { - require.NoError(t, err, "could not restore checkpoint") - } -}