From f6431c613868470a500c1268af4e7c5978d074ed Mon Sep 17 00:00:00 2001 From: Ed Bartosh Date: Thu, 13 Jul 2023 16:04:02 +0300 Subject: [PATCH] DRA: don't query claims from API server When a pod is force-deleted, UnprepareResources fails to get the claim from the API server. PrepareResources should cache the claim info required by UnprepareResources so that UnprepareResources can get it from the cache instead of querying the API server. --- pkg/kubelet/cm/dra/claiminfo.go | 17 +++-- pkg/kubelet/cm/dra/manager.go | 29 ++------- pkg/kubelet/cm/dra/state/checkpoint.go | 54 +++++++++++++++ pkg/kubelet/cm/dra/state/state_checkpoint.go | 30 +++++++++ .../cm/dra/state/state_checkpoint_test.go | 65 +++++++++++++++++-- 5 files changed, 161 insertions(+), 34 deletions(-) diff --git a/pkg/kubelet/cm/dra/claiminfo.go b/pkg/kubelet/cm/dra/claiminfo.go index ae4c12a67b6..7266f9e72b2 100644 --- a/pkg/kubelet/cm/dra/claiminfo.go +++ b/pkg/kubelet/cm/dra/claiminfo.go @@ -20,6 +20,7 @@ import ( "fmt" "sync" + resourcev1alpha2 "k8s.io/api/resource/v1alpha2" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/kubernetes/pkg/kubelet/cm/dra/state" @@ -80,14 +81,15 @@ type claimInfoCache struct { claimInfo map[string]*ClaimInfo } -func newClaimInfo(driverName, className string, claimUID types.UID, claimName, namespace string, podUIDs sets.Set[string]) *ClaimInfo { +func newClaimInfo(driverName, className string, claimUID types.UID, claimName, namespace string, podUIDs sets.Set[string], resourceHandles []resourcev1alpha2.ResourceHandle) *ClaimInfo { claimInfoState := state.ClaimInfoState{ - DriverName: driverName, - ClassName: className, - ClaimUID: claimUID, - ClaimName: claimName, - Namespace: namespace, - PodUIDs: podUIDs, + DriverName: driverName, + ClassName: className, + ClaimUID: claimUID, + ClaimName: claimName, + Namespace: namespace, + PodUIDs: podUIDs, + ResourceHandles: resourceHandles, } claimInfo := ClaimInfo{ ClaimInfoState: claimInfoState, @@ -120,6 +122,7
@@ func newClaimInfoCache(stateDir, checkpointName string) (*claimInfoCache, error) entry.ClaimName, entry.Namespace, entry.PodUIDs, + entry.ResourceHandles, ) for pluginName, cdiDevices := range entry.CDIDevices { err := info.addCDIDevices(pluginName, cdiDevices) diff --git a/pkg/kubelet/cm/dra/manager.go b/pkg/kubelet/cm/dra/manager.go index fcc8b30a2a3..703eae58b4f 100644 --- a/pkg/kubelet/cm/dra/manager.go +++ b/pkg/kubelet/cm/dra/manager.go @@ -140,6 +140,7 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error { resourceClaim.Name, resourceClaim.Namespace, sets.New(string(pod.UID)), + resourceHandles, ) // Loop through all plugins and prepare for calling NodePrepareResources. @@ -342,26 +343,8 @@ func (m *ManagerImpl) UnprepareResources(pod *v1.Pod) error { continue } - // Query claim object from the API server - resourceClaim, err := m.kubeClient.ResourceV1alpha2().ResourceClaims(pod.Namespace).Get( - context.TODO(), - *claimName, - metav1.GetOptions{}) - if err != nil { - return fmt.Errorf("failed to fetch ResourceClaim %s referenced by pod %s: %+v", *claimName, pod.Name, err) - } - - // Grab the allocation.resourceHandles. If there are no - // allocation.resourceHandles, create a single resourceHandle with no - // content. This will trigger processing of this claim by a single - // kubelet plugin whose name matches resourceClaim.Status.DriverName. - resourceHandles := resourceClaim.Status.Allocation.ResourceHandles - if len(resourceHandles) == 0 { - resourceHandles = make([]resourcev1alpha2.ResourceHandle, 1) - } - // Loop through all plugins and prepare for calling NodeUnprepareResources. 
- for _, resourceHandle := range resourceHandles { + for _, resourceHandle := range claimInfo.ResourceHandles { // If no DriverName is provided in the resourceHandle, we // use the DriverName from the status pluginName := resourceHandle.DriverName @@ -370,14 +353,14 @@ func (m *ManagerImpl) UnprepareResources(pod *v1.Pod) error { } claim := &drapb.Claim{ - Namespace: resourceClaim.Namespace, - Uid: string(resourceClaim.UID), - Name: resourceClaim.Name, + Namespace: claimInfo.Namespace, + Uid: string(claimInfo.ClaimUID), + Name: claimInfo.ClaimName, ResourceHandle: resourceHandle.Data, } batches[pluginName] = append(batches[pluginName], claim) } - claimInfos[resourceClaim.UID] = claimInfo + claimInfos[claimInfo.ClaimUID] = claimInfo } // Call NodeUnprepareResources for all claims in each batch. diff --git a/pkg/kubelet/cm/dra/state/checkpoint.go b/pkg/kubelet/cm/dra/state/checkpoint.go index 7cce6118182..7c44f12eea9 100644 --- a/pkg/kubelet/cm/dra/state/checkpoint.go +++ b/pkg/kubelet/cm/dra/state/checkpoint.go @@ -18,9 +18,14 @@ package state import ( "encoding/json" + "fmt" + "hash/fnv" + "strings" + "k8s.io/apimachinery/pkg/util/dump" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors" ) var _ checkpointmanager.Checkpoint = &DRAManagerCheckpoint{} @@ -34,9 +39,20 @@ type DRAManagerCheckpoint struct { Checksum checksum.Checksum `json:"checksum"` } +// DRAManagerCheckpointWithoutResourceHandles is an old implementation of the DRAManagerCheckpoint +type DRAManagerCheckpointWithoutResourceHandles struct { + Version string `json:"version"` + Entries ClaimInfoStateListWithoutResourceHandles `json:"entries,omitempty"` + Checksum checksum.Checksum `json:"checksum"` +} + // List of claim info to store in checkpoint type ClaimInfoStateList []ClaimInfoState +// List of claim info in the old checkpoint format, without the ResourceHandles field +// TODO: remove in Beta +type ClaimInfoStateListWithoutResourceHandles
[]ClaimInfoStateWithoutResourceHandles + // NewDRAManagerCheckpoint returns an instance of Checkpoint func NewDRAManagerCheckpoint() *DRAManagerCheckpoint { return &DRAManagerCheckpoint{ @@ -63,6 +79,44 @@ func (dc *DRAManagerCheckpoint) VerifyChecksum() error { ck := dc.Checksum dc.Checksum = 0 err := ck.Verify(dc) + if err == errors.ErrCorruptCheckpoint { + // Verify with old structs without ResourceHandles field + // TODO: remove in Beta + err = verifyChecksumWithoutResourceHandles(dc, ck) + } dc.Checksum = ck return err } + +// verifyChecksumWithoutResourceHandles is a helper function that verifies checksum of the +// checkpoint in the old format, without ResourceHandles field. +// TODO: remove in Beta. +func verifyChecksumWithoutResourceHandles(dc *DRAManagerCheckpoint, checkSum checksum.Checksum) error { + entries := ClaimInfoStateListWithoutResourceHandles{} + for _, entry := range dc.Entries { + entries = append(entries, ClaimInfoStateWithoutResourceHandles{ + DriverName: entry.DriverName, + ClassName: entry.ClassName, + ClaimUID: entry.ClaimUID, + ClaimName: entry.ClaimName, + Namespace: entry.Namespace, + PodUIDs: entry.PodUIDs, + CDIDevices: entry.CDIDevices, + }) + } + oldcheckpoint := &DRAManagerCheckpointWithoutResourceHandles{ + Version: checkpointVersion, + Entries: entries, + Checksum: 0, + } + // Calculate checksum for old checkpoint + object := dump.ForHash(oldcheckpoint) + object = strings.Replace(object, "DRAManagerCheckpointWithoutResourceHandles", "DRAManagerCheckpoint", 1) + object = strings.Replace(object, "ClaimInfoStateListWithoutResourceHandles", "ClaimInfoStateList", 1) + hash := fnv.New32a() + fmt.Fprintf(hash, "%v", object) + if checkSum != checksum.Checksum(hash.Sum32()) { + return errors.ErrCorruptCheckpoint + } + return nil +} diff --git a/pkg/kubelet/cm/dra/state/state_checkpoint.go b/pkg/kubelet/cm/dra/state/state_checkpoint.go index 7a3d3d6c3c3..a391f0a13ca 100644 --- a/pkg/kubelet/cm/dra/state/state_checkpoint.go +++ 
b/pkg/kubelet/cm/dra/state/state_checkpoint.go @@ -20,6 +20,7 @@ import ( "fmt" "sync" + resourcev1alpha2 "k8s.io/api/resource/v1alpha2" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" @@ -54,6 +55,35 @@ type ClaimInfoState struct { // PodUIDs is a set of pod UIDs that reference a resource PodUIDs sets.Set[string] + // ResourceHandles is a list of opaque resource data for processing by a specific kubelet plugin + ResourceHandles []resourcev1alpha2.ResourceHandle + + // CDIDevices is a map of DriverName --> CDI devices returned by the + // GRPC API call NodePrepareResource + CDIDevices map[string][]string +} + +// ClaimInfoStateWithoutResourceHandles is an old implementation of the ClaimInfoState +// TODO: remove in Beta +type ClaimInfoStateWithoutResourceHandles struct { + // Name of the DRA driver + DriverName string + + // ClassName is a resource class of the claim + ClassName string + + // ClaimUID is an UID of the resource claim + ClaimUID types.UID + + // ClaimName is a name of the resource claim + ClaimName string + + // Namespace is a claim namespace + Namespace string + + // PodUIDs is a set of pod UIDs that reference a resource + PodUIDs sets.Set[string] + // CDIDevices is a map of DriverName --> CDI devices returned by the // GRPC API call NodePrepareResource CDIDevices map[string][]string diff --git a/pkg/kubelet/cm/dra/state/state_checkpoint_test.go b/pkg/kubelet/cm/dra/state/state_checkpoint_test.go index 1229dbb3764..6e2a49ecdb6 100644 --- a/pkg/kubelet/cm/dra/state/state_checkpoint_test.go +++ b/pkg/kubelet/cm/dra/state/state_checkpoint_test.go @@ -18,11 +18,13 @@ package state import ( "os" + "path" "strings" "testing" "github.com/stretchr/testify/assert" + resourcev1alpha2 "k8s.io/api/resource/v1alpha2" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" testutil "k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state/testing" @@ -50,7 +52,7 @@ func 
TestCheckpointGetOrCreate(t *testing.T) { }, { "Restore checkpoint - single claim", - `{"version":"v1","entries":[{"DriverName":"test-driver.cdi.k8s.io","ClassName":"class-name","ClaimUID":"067798be-454e-4be4-9047-1aa06aea63f7","ClaimName":"example","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"CDIDevices":{"test-driver.cdi.k8s.io":["example.com/example=cdi-example"]}}],"checksum":153446146}`, + `{"version":"v1","entries":[{"DriverName":"test-driver.cdi.k8s.io","ClassName":"class-name","ClaimUID":"067798be-454e-4be4-9047-1aa06aea63f7","ClaimName":"example","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"ResourceHandles":[{"driverName":"test-driver.cdi.k8s.io","data":"{\"a\": \"b\"}"}],"CDIDevices":{"test-driver.cdi.k8s.io":["example.com/example=cdi-example"]}}],"checksum":4194867564}`, "", []ClaimInfoState{ { @@ -60,6 +62,12 @@ func TestCheckpointGetOrCreate(t *testing.T) { ClaimName: "example", Namespace: "default", PodUIDs: sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"), + ResourceHandles: []resourcev1alpha2.ResourceHandle{ + { + DriverName: "test-driver.cdi.k8s.io", + Data: `{"a": "b"}`, + }, + }, CDIDevices: map[string][]string{ "test-driver.cdi.k8s.io": {"example.com/example=cdi-example"}, }, @@ -68,7 +76,7 @@ func TestCheckpointGetOrCreate(t *testing.T) { }, { "Restore checkpoint - single claim - multiple devices", - `{"version":"v1","entries":[{"DriverName":"meta-test-driver.cdi.k8s.io","ClassName":"class-name","ClaimUID":"067798be-454e-4be4-9047-1aa06aea63f7","ClaimName":"example","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"CDIDevices":{"test-driver-1.cdi.k8s.io":["example-1.com/example-1=cdi-example-1"],"test-driver-2.cdi.k8s.io":["example-2.com/example-2=cdi-example-2"]}}],"checksum":1363630443}`, + 
`{"version":"v1","entries":[{"DriverName":"meta-test-driver.cdi.k8s.io","ClassName":"class-name","ClaimUID":"067798be-454e-4be4-9047-1aa06aea63f7","ClaimName":"example","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"ResourceHandles":[{"driverName":"test-driver-1.cdi.k8s.io","data":"{\"a\": \"b\"}"},{"driverName":"test-driver-2.cdi.k8s.io","data":"{\"c\": \"d\"}"}],"CDIDevices":{"test-driver-1.cdi.k8s.io":["example-1.com/example-1=cdi-example-1"],"test-driver-2.cdi.k8s.io":["example-2.com/example-2=cdi-example-2"]}}],"checksum":360176657}`, "", []ClaimInfoState{ { @@ -78,6 +86,16 @@ func TestCheckpointGetOrCreate(t *testing.T) { ClaimName: "example", Namespace: "default", PodUIDs: sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"), + ResourceHandles: []resourcev1alpha2.ResourceHandle{ + { + DriverName: "test-driver-1.cdi.k8s.io", + Data: `{"a": "b"}`, + }, + { + DriverName: "test-driver-2.cdi.k8s.io", + Data: `{"c": "d"}`, + }, + }, CDIDevices: map[string][]string{ "test-driver-1.cdi.k8s.io": {"example-1.com/example-1=cdi-example-1"}, "test-driver-2.cdi.k8s.io": {"example-2.com/example-2=cdi-example-2"}, @@ -87,7 +105,7 @@ func TestCheckpointGetOrCreate(t *testing.T) { }, { "Restore checkpoint - multiple claims", - `{"version":"v1","entries":[{"DriverName":"test-driver.cdi.k8s.io","ClassName":"class-name-1","ClaimUID":"067798be-454e-4be4-9047-1aa06aea63f7","ClaimName":"example-1","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"CDIDevices":{"test-driver.cdi.k8s.io":["example.com/example=cdi-example-1"]}},{"DriverName":"test-driver.cdi.k8s.io","ClaimUID":"4cf8db2d-06c0-7d70-1a51-e59b25b2c16c","ClassName":"class-name-2","ClaimName":"example-2","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"CDIDevices":{"test-driver.cdi.k8s.io":["example.com/example=cdi-example-2"]}}],"checksum":1978566460}`, + 
`{"version":"v1","entries":[{"DriverName":"test-driver.cdi.k8s.io","ClassName":"class-name-1","ClaimUID":"067798be-454e-4be4-9047-1aa06aea63f7","ClaimName":"example-1","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"ResourceHandles":[{"driverName":"test-driver.cdi.k8s.io","data":"{\"a\": \"b\"}"}],"CDIDevices":{"test-driver.cdi.k8s.io":["example.com/example=cdi-example-1"]}},{"DriverName":"test-driver.cdi.k8s.io","ClassName":"class-name-2","ClaimUID":"4cf8db2d-06c0-7d70-1a51-e59b25b2c16c","ClaimName":"example-2","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"ResourceHandles":[{"driverName":"test-driver.cdi.k8s.io","data":"{\"c\": \"d\"}"}],"CDIDevices":{"test-driver.cdi.k8s.io":["example.com/example=cdi-example-2"]}}],"checksum":103176902}`, "", []ClaimInfoState{ { @@ -97,6 +115,12 @@ func TestCheckpointGetOrCreate(t *testing.T) { ClaimName: "example-1", Namespace: "default", PodUIDs: sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"), + ResourceHandles: []resourcev1alpha2.ResourceHandle{ + { + DriverName: "test-driver.cdi.k8s.io", + Data: `{"a": "b"}`, + }, + }, CDIDevices: map[string][]string{ "test-driver.cdi.k8s.io": {"example.com/example=cdi-example-1"}, }, @@ -108,6 +132,12 @@ func TestCheckpointGetOrCreate(t *testing.T) { ClaimName: "example-2", Namespace: "default", PodUIDs: sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"), + ResourceHandles: []resourcev1alpha2.ResourceHandle{ + { + DriverName: "test-driver.cdi.k8s.io", + Data: `{"c": "d"}`, + }, + }, CDIDevices: map[string][]string{ "test-driver.cdi.k8s.io": {"example.com/example=cdi-example-2"}, }, @@ -153,6 +183,7 @@ func TestCheckpointGetOrCreate(t *testing.T) { var state ClaimInfoStateList checkpointState, err := NewCheckpointState(testingDir, testingCheckpoint) + if err == nil { state, err = checkpointState.GetOrCreate() } @@ -176,12 +207,18 @@ func TestCheckpointStateStore(t *testing.T) { ClaimName: "example", Namespace: "default", PodUIDs: 
sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"), + ResourceHandles: []resourcev1alpha2.ResourceHandle{ + { + DriverName: "test-driver.cdi.k8s.io", + Data: `{"a": "b"}`, + }, + }, CDIDevices: map[string][]string{ "test-driver.cdi.k8s.io": {"example.com/example=cdi-example"}, }, } - expectedCheckpoint := `{"version":"v1","entries":[{"DriverName":"test-driver.cdi.k8s.io","ClassName":"class-name","ClaimUID":"067798be-454e-4be4-9047-1aa06aea63f7","ClaimName":"example","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"CDIDevices":{"test-driver.cdi.k8s.io":["example.com/example=cdi-example"]}}],"checksum":153446146}` + expectedCheckpoint := `{"version":"v1","entries":[{"DriverName":"test-driver.cdi.k8s.io","ClassName":"class-name","ClaimUID":"067798be-454e-4be4-9047-1aa06aea63f7","ClaimName":"example","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"ResourceHandles":[{"driverName":"test-driver.cdi.k8s.io","data":"{\"a\": \"b\"}"}],"CDIDevices":{"test-driver.cdi.k8s.io":["example.com/example=cdi-example"]}}],"checksum":4194867564}` // Should return an error, stateDir cannot be an empty string if _, err := NewCheckpointState("", testingCheckpoint); err == nil { @@ -214,3 +251,23 @@ func TestCheckpointStateStore(t *testing.T) { t.Fatal("expected error but got nil") } } + +func TestOldCheckpointRestore(t *testing.T) { + testingDir := t.TempDir() + cpm, err := checkpointmanager.NewCheckpointManager(testingDir) + assert.NoError(t, err, "could not create testing checkpoint manager") + + oldCheckpointData := `{"version":"v1","entries":[{"DriverName":"test-driver.cdi.k8s.io","ClassName":"class-name","ClaimUID":"067798be-454e-4be4-9047-1aa06aea63f7","ClaimName":"example","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"CDIDevices":{"test-driver.cdi.k8s.io":["example.com/example=cdi-example"]}}],"checksum":153446146}` + err = os.WriteFile(path.Join(testingDir, testingCheckpoint), 
[]byte(oldCheckpointData), 0644) + assert.NoError(t, err, "could not store checkpoint data") + + checkpoint := NewDRAManagerCheckpoint() + err = cpm.GetCheckpoint(testingCheckpoint, checkpoint) + assert.NoError(t, err, "could not restore checkpoint") + + checkpointData, err := checkpoint.MarshalCheckpoint() + assert.NoError(t, err, "could not Marshal Checkpoint") + + expectedData := `{"version":"v1","entries":[{"DriverName":"test-driver.cdi.k8s.io","ClassName":"class-name","ClaimUID":"067798be-454e-4be4-9047-1aa06aea63f7","ClaimName":"example","Namespace":"default","PodUIDs":{"139cdb46-f989-4f17-9561-ca10cfb509a6":{}},"ResourceHandles":null,"CDIDevices":{"test-driver.cdi.k8s.io":["example.com/example=cdi-example"]}}],"checksum":453625682}` + assert.Equal(t, expectedData, string(checkpointData), "expected ClaimInfoState does not equal to restored one") +}