From e7256e08d34f391e44a97b82deae348c066f39a7 Mon Sep 17 00:00:00 2001
From: Moshe Levi
Date: Thu, 16 Feb 2023 23:27:26 +0200
Subject: [PATCH] kubelet dra: add checkpointing mechanism in the DRA Manager

The checkpointing mechanism will repopulate the DRA Manager's in-memory cache
on kubelet restart. This ensures that the information needed by the
PodResources API is available across a kubelet restart.

The ClaimInfoState struct represents the DRA Manager's in-memory cache state
in the checkpoint. It is embedded in ClaimInfo, which also includes the
annotations field. The in-memory cache and the cache state in the checkpoint
are kept separate so that we are not tied to the in-memory cache struct,
which may change in the future. ClaimInfoState stores only the minimal fields
required to restore the in-memory cache.

Signed-off-by: Moshe Levi
---
 pkg/kubelet/cm/container_manager_linux.go    |   2 +-
 pkg/kubelet/cm/dra/cdi.go                    |  22 +++
 pkg/kubelet/cm/dra/claiminfo.go              | 111 +++++++++++-----
 pkg/kubelet/cm/dra/manager.go                | 133 ++++++++++---------
 pkg/kubelet/cm/dra/state/checkpoint.go       |  68 ++++++++++
 pkg/kubelet/cm/dra/state/state_checkpoint.go | 115 ++++++++++++++++
 6 files changed, 348 insertions(+), 103 deletions(-)
 create mode 100644 pkg/kubelet/cm/dra/state/checkpoint.go
 create mode 100644 pkg/kubelet/cm/dra/state/state_checkpoint.go

diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go
index 7d735e8bfd7..6cc867a9958 100644
--- a/pkg/kubelet/cm/container_manager_linux.go
+++ b/pkg/kubelet/cm/container_manager_linux.go
@@ -315,7 +315,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
 	// initialize DRA manager
 	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) {
 		klog.InfoS("Creating Dynamic Resource Allocation (DRA) manager")
-		cm.draManager, err = dra.NewManagerImpl(kubeClient)
+		cm.draManager, err = dra.NewManagerImpl(kubeClient, nodeConfig.KubeletRootDir)
 		if err != nil {
 			return nil, err
 		}
diff --git a/pkg/kubelet/cm/dra/cdi.go b/pkg/kubelet/cm/dra/cdi.go
index 3d80891f9d2..b6118337817 100644
--- a/pkg/kubelet/cm/dra/cdi.go
+++ b/pkg/kubelet/cm/dra/cdi.go
@@ -29,6 +29,9 @@ import (
 	"errors"
 	"fmt"
 	"strings"
+
+	"k8s.io/apimachinery/pkg/types"
+	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 )
 
 const (
@@ -36,6 +39,25 @@ const (
 	annotationPrefix = "cdi.k8s.io/"
 )
 
+// generate container annotations using CDI UpdateAnnotations API.
+func generateCDIAnnotations(
+	claimUID types.UID,
+	driverName string,
+	cdiDevices []string,
+) ([]kubecontainer.Annotation, error) {
+	annotations, err := updateAnnotations(map[string]string{}, driverName, string(claimUID), cdiDevices)
+	if err != nil {
+		return nil, fmt.Errorf("can't generate CDI annotations: %+v", err)
+	}
+
+	kubeAnnotations := []kubecontainer.Annotation{}
+	for key, value := range annotations {
+		kubeAnnotations = append(kubeAnnotations, kubecontainer.Annotation{Name: key, Value: value})
+	}
+
+	return kubeAnnotations, nil
+}
+
 // updateAnnotations updates annotations with a plugin-specific CDI device
 // injection request for the given devices. Upon any error a non-nil error
 // is returned and annotations are left intact. By convention plugin should
diff --git a/pkg/kubelet/cm/dra/claiminfo.go b/pkg/kubelet/cm/dra/claiminfo.go
index 354f8c8bc43..c230e72d08f 100644
--- a/pkg/kubelet/cm/dra/claiminfo.go
+++ b/pkg/kubelet/cm/dra/claiminfo.go
@@ -22,6 +22,7 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/kubernetes/pkg/kubelet/cm/dra/state"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 )
 
@@ -29,26 +30,7 @@ import (
 // to prepare and unprepare a resource claim.
 type claimInfo struct {
 	sync.RWMutex
-
-	// name of the DRA driver
-	driverName string
-
-	// claimUID is an UID of the resource claim
-	claimUID types.UID
-
-	// claimName is a name of the resource claim
-	claimName string
-
-	// namespace is a claim namespace
-	namespace string
-
-	// podUIDs is a set of pod UIDs that reference a resource
-	podUIDs sets.Set[string]
-
-	// cdiDevices is a list of CDI devices returned by the
-	// GRPC API call NodePrepareResource
-	cdiDevices []string
-
+	state.ClaimInfoState
 	// annotations is a list of container annotations associated with
 	// a prepared resource
 	annotations []kubecontainer.Annotation
@@ -58,41 +40,86 @@ func (res *claimInfo) addPodReference(podUID types.UID) {
 	res.Lock()
 	defer res.Unlock()
 
-	res.podUIDs.Insert(string(podUID))
+	res.PodUIDs.Insert(string(podUID))
 }
 
 func (res *claimInfo) deletePodReference(podUID types.UID) {
 	res.Lock()
 	defer res.Unlock()
 
-	res.podUIDs.Delete(string(podUID))
+	res.PodUIDs.Delete(string(podUID))
 }
 
 // claimInfoCache is a cache of processed resource claims keyed by namespace + claim name.
 type claimInfoCache struct {
 	sync.RWMutex
+	state     state.CheckpointState
 	claimInfo map[string]*claimInfo
 }
 
-// newClaimInfoCache is a function that returns an instance of the claimInfoCache.
-func newClaimInfoCache() *claimInfoCache {
-	return &claimInfoCache{
-		claimInfo: make(map[string]*claimInfo),
+func newClaimInfo(driverName string, claimUID types.UID, claimName, namespace string, podUIDs sets.Set[string], cdiDevice []string) (*claimInfo, error) {
+	claimInfoState := state.ClaimInfoState{
+		DriverName: driverName,
+		ClaimUID:   claimUID,
+		ClaimName:  claimName,
+		Namespace:  namespace,
+		PodUIDs:    podUIDs,
+		CdiDevices: cdiDevice,
 	}
+	// NOTE: Passing CDI device names as annotations is a temporary solution
+	// It will be removed after all runtimes are updated
+	// to get CDI device names from the ContainerConfig.CDIDevices field
+	annotations, err := generateCDIAnnotations(claimUID, driverName, cdiDevice)
+	if err != nil {
+		return nil, fmt.Errorf("failed to generate container annotations, err: %+v", err)
+	}
+	claimInfo := claimInfo{
+		ClaimInfoState: claimInfoState,
+		annotations:    annotations,
+	}
+	return &claimInfo, nil
 }
 
-func (cache *claimInfoCache) add(claim, namespace string, res *claimInfo) error {
+// newClaimInfoCache is a function that returns an instance of the claimInfoCache.
+func newClaimInfoCache(stateDir, checkpointName string) (*claimInfoCache, error) {
+	stateImpl, err := state.NewCheckpointState(stateDir, checkpointName)
+	if err != nil {
+		return nil, fmt.Errorf("could not initialize checkpoint manager, please drain node and remove dra state file, err: %+v", err)
+	}
+
+	curState, err := stateImpl.GetOrCreate()
+	if err != nil {
+		return nil, fmt.Errorf("error calling GetOrCreate() on checkpoint state: %v", err)
+	}
+
+	cache := &claimInfoCache{
+		state:     stateImpl,
+		claimInfo: make(map[string]*claimInfo),
+	}
+
+	for _, entry := range curState {
+		info, err := newClaimInfo(
+			entry.DriverName,
+			entry.ClaimUID,
+			entry.ClaimName,
+			entry.Namespace,
+			entry.PodUIDs,
+			entry.CdiDevices,
+		)
+		if err != nil {
+			return nil, fmt.Errorf("failed to create claimInfo %+v: %+v", info, err)
+		}
+		cache.add(info)
+	}
+
+	return cache, nil
+}
+
+func (cache *claimInfoCache) add(res *claimInfo) {
 	cache.Lock()
 	defer cache.Unlock()
 
-	key := claim + namespace
-	if _, ok := cache.claimInfo[key]; ok {
-		return fmt.Errorf("claim %s, namespace %s already cached", claim, namespace)
-	}
-
-	cache.claimInfo[claim+namespace] = res
-
-	return nil
+	cache.claimInfo[res.ClaimName+res.Namespace] = res
 }
 
 func (cache *claimInfoCache) get(claimName, namespace string) *claimInfo {
@@ -118,10 +145,22 @@ func (cache *claimInfoCache) hasPodReference(UID types.UID) bool {
 	defer cache.RUnlock()
 
 	for _, claimInfo := range cache.claimInfo {
-		if claimInfo.podUIDs.Has(string(UID)) {
+		if claimInfo.PodUIDs.Has(string(UID)) {
 			return true
 		}
 	}
 
 	return false
 }
+
+func (cache *claimInfoCache) syncToCheckpoint() error {
+	cache.RLock()
+	defer cache.RUnlock()
+
+	claimInfoStateList := make(state.ClaimInfoStateList, 0, len(cache.claimInfo))
+	for _, infoClaim := range cache.claimInfo {
+		claimInfoStateList = append(claimInfoStateList, infoClaim.ClaimInfoState)
+	}
+
+	return cache.state.Store(claimInfoStateList)
+}
diff --git a/pkg/kubelet/cm/dra/manager.go b/pkg/kubelet/cm/dra/manager.go
index 2513af1b498..b3df135d803 100644
--- a/pkg/kubelet/cm/dra/manager.go
+++ b/pkg/kubelet/cm/dra/manager.go
@@ -31,6 +31,9 @@ import (
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 )
 
+// draManagerStateFileName is the file name where dra manager stores its state
+const draManagerStateFileName = "dra_manager_state"
+
 // ManagerImpl is the structure in charge of managing DRA resource Plugins.
 type ManagerImpl struct {
 	// cache contains cached claim info
@@ -41,36 +44,22 @@ type ManagerImpl struct {
 }
 
 // NewManagerImpl creates a new manager.
-func NewManagerImpl(kubeClient clientset.Interface) (*ManagerImpl, error) {
+func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string) (*ManagerImpl, error) {
 	klog.V(2).InfoS("Creating DRA manager")
 
+	claimInfoCache, err := newClaimInfoCache(stateFileDirectory, draManagerStateFileName)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create claimInfo cache: %+v", err)
+	}
+
 	manager := &ManagerImpl{
-		cache:      newClaimInfoCache(),
+		cache:      claimInfoCache,
 		kubeClient: kubeClient,
 	}
 
 	return manager, nil
 }
 
-// Generate container annotations using CDI UpdateAnnotations API.
-func generateCDIAnnotations(
-	claimUID types.UID,
-	driverName string,
-	cdiDevices []string,
-) ([]kubecontainer.Annotation, error) {
-	annotations, err := updateAnnotations(map[string]string{}, driverName, string(claimUID), cdiDevices)
-	if err != nil {
-		return nil, fmt.Errorf("can't generate CDI annotations: %+v", err)
-	}
-
-	kubeAnnotations := []kubecontainer.Annotation{}
-	for key, value := range annotations {
-		kubeAnnotations = append(kubeAnnotations, kubecontainer.Annotation{Name: key, Value: value})
-	}
-
-	return kubeAnnotations, nil
-}
-
 // PrepareResources attempts to prepare all of the required resource
 // plugin resources for the input container, issue an NodePrepareResource rpc request
 // for each new resource requirement, process their responses and update the cached
@@ -83,10 +72,14 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
 			claimName := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i])
 			klog.V(3).InfoS("Processing resource", "claim", claimName, "pod", pod.Name)
 
+			// Resource is already prepared, add pod UID to it
 			if claimInfo := m.cache.get(claimName, pod.Namespace); claimInfo != nil {
-				// resource is already prepared, add pod UID to it
+				// We delay checkpointing of this change until this call returns successfully.
+				// It is OK to do this because we will only return successfully from this call if
+				// the checkpoint has succeeded. That means if the kubelet is ever restarted
+				// before this checkpoint succeeds, the pod whose resources are being prepared
+				// would never have started, so it's OK (actually correct) to not include it in the cache.
 				claimInfo.addPodReference(pod.UID)
-
 				continue
 			}
 
@@ -126,39 +119,36 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
 
 			klog.V(3).InfoS("NodePrepareResource succeeded", "response", response)
 
-			// NOTE: Passing CDI device names as annotations is a temporary solution
-			// It will be removed after all runtimes are updated
-			// to get CDI device names from the ContainerConfig.CDIDevices field
-			annotations, err := generateCDIAnnotations(resourceClaim.UID, driverName, response.CdiDevices)
-			if err != nil {
-				return fmt.Errorf("failed to generate container annotations, err: %+v", err)
-			}
-
-			// Cache prepared resource
-			err = m.cache.add(
+			// TODO: We are adding the claimInfo struct to the cache and syncing it to the checkpoint *after* the NodePrepareResource
+			// call has completed. This will cause issues if the kubelet gets restarted between NodePrepareResource and syncToCheckpoint.
+			// It will result in not calling NodeUnprepareResource for this claim because no claimInfo will be synced back to the cache
+			// for it after the restart. We need to resolve this issue before moving to beta.
+			claimInfo, err := newClaimInfo(
+				driverName,
+				resourceClaim.UID,
 				resourceClaim.Name,
 				resourceClaim.Namespace,
-				&claimInfo{
-					driverName:  driverName,
-					claimUID:    resourceClaim.UID,
-					claimName:   resourceClaim.Name,
-					namespace:   resourceClaim.Namespace,
-					podUIDs:     sets.New(string(pod.UID)),
-					cdiDevices:  response.CdiDevices,
-					annotations: annotations,
-				})
+				sets.New(string(pod.UID)),
+				response.CdiDevices)
 			if err != nil {
-				return fmt.Errorf(
-					"failed to cache prepared resource, claim: %s(%s), err: %+v",
-					resourceClaim.Name,
-					resourceClaim.UID,
-					err,
-				)
+				return fmt.Errorf("newClaimInfo failed, claim UID: %s, claim name: %s, claim namespace: %s, err: %+v",
+					resourceClaim.UID, resourceClaim.Name, resourceClaim.Namespace, err)
+			}
+
+			m.cache.add(claimInfo)
+			// Checkpoint to reduce redundant calls to NodePrepareResource() after a kubelet restart.
+			err = m.cache.syncToCheckpoint()
+			if err != nil {
+				return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
+			}
 			}
 		}
 	}
-
+	// Checkpoint to capture all of the previous addPodReference() calls.
+	err := m.cache.syncToCheckpoint()
+	if err != nil {
+		return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
+	}
 	return nil
 }
@@ -181,9 +171,9 @@ func (m *ManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*Conta
 				return nil, fmt.Errorf("unable to get resource for namespace: %s, claim: %s", pod.Namespace, claimName)
 			}
 
-			klog.V(3).InfoS("add resource annotations", "claim", claimName, "annotations", claimInfo.annotations)
+			klog.V(3).InfoS("Add resource annotations", "claim", claimName, "annotations", claimInfo.annotations)
 			annotations = append(annotations, claimInfo.annotations...)
-			for _, cdiDevice := range claimInfo.cdiDevices {
+			for _, cdiDevice := range claimInfo.CdiDevices {
 				cdiDevices = append(cdiDevices, kubecontainer.CDIDevice{Name: cdiDevice})
 			}
 		}
@@ -208,43 +198,54 @@ func (m *ManagerImpl) UnprepareResources(pod *v1.Pod) error {
 		}
 
 		// Skip calling NodeUnprepareResource if other pods are still referencing it
-		if len(claimInfo.podUIDs) > 1 {
+		if len(claimInfo.PodUIDs) > 1 {
+			// We delay checkpointing of this change until this call returns successfully.
+			// It is OK to do this because we will only return successfully from this call if
+			// the checkpoint has succeeded. That means if the kubelet is ever restarted
+			// before this checkpoint succeeds, we will simply call into this (idempotent)
+			// function again.
 			claimInfo.deletePodReference(pod.UID)
 			continue
 		}
 
 		// Call NodeUnprepareResource only for the last pod that references the claim
-		client, err := dra.NewDRAPluginClient(claimInfo.driverName)
+		client, err := dra.NewDRAPluginClient(claimInfo.DriverName)
 		if err != nil {
-			return fmt.Errorf("failed to get DRA Plugin client for plugin name %s, err=%+v", claimInfo.driverName, err)
+			return fmt.Errorf("failed to get DRA Plugin client for plugin name %s, err=%+v", claimInfo.DriverName, err)
 		}
 		response, err := client.NodeUnprepareResource(
 			context.Background(),
-			claimInfo.namespace,
-			claimInfo.claimUID,
-			claimInfo.claimName,
-			claimInfo.cdiDevices)
+			claimInfo.Namespace,
+			claimInfo.ClaimUID,
+			claimInfo.ClaimName,
+			claimInfo.CdiDevices)
 		if err != nil {
 			return fmt.Errorf(
 				"NodeUnprepareResource failed, pod: %s, claim UID: %s, claim name: %s, CDI devices: %s, err: %+v",
-				pod.Name,
-				claimInfo.claimUID,
-				claimInfo.claimName,
-				claimInfo.cdiDevices, err)
+				pod.Name, claimInfo.ClaimUID, claimInfo.ClaimName, claimInfo.CdiDevices, err)
 		}
+		klog.V(3).InfoS("NodeUnprepareResource succeeded", "response", response)
+
 		// Delete last pod UID only if NodeUnprepareResource call succeeds.
 		// This ensures that status manager doesn't enter termination status
 		// for the pod. This logic is implemented in the m.PodMightNeedToUnprepareResources
 		// and in the claimInfo.hasPodReference.
 		claimInfo.deletePodReference(pod.UID)
-		klog.V(3).InfoS("NodeUnprepareResource succeeded", "response", response)
-		// delete resource from the cache
-		m.cache.delete(claimInfo.claimName, pod.Namespace)
+		m.cache.delete(claimInfo.ClaimName, pod.Namespace)
+		// Checkpoint to reduce redundant calls to NodeUnPrepareResource() after a kubelet restart.
+		err = m.cache.syncToCheckpoint()
+		if err != nil {
+			return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
+		}
+	}
+	// Checkpoint to capture all of the previous deletePodReference() calls.
+	err := m.cache.syncToCheckpoint()
+	if err != nil {
+		return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
 	}
-
 	return nil
 }
diff --git a/pkg/kubelet/cm/dra/state/checkpoint.go b/pkg/kubelet/cm/dra/state/checkpoint.go
new file mode 100644
index 00000000000..7cce6118182
--- /dev/null
+++ b/pkg/kubelet/cm/dra/state/checkpoint.go
@@ -0,0 +1,68 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package state
+
+import (
+	"encoding/json"
+
+	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
+	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
+)
+
+var _ checkpointmanager.Checkpoint = &DRAManagerCheckpoint{}
+
+const checkpointVersion = "v1"
+
+// DRAManagerCheckpoint struct is used to store pod dynamic resources assignments in a checkpoint
+type DRAManagerCheckpoint struct {
+	Version  string             `json:"version"`
+	Entries  ClaimInfoStateList `json:"entries,omitempty"`
+	Checksum checksum.Checksum  `json:"checksum"`
+}
+
+// List of claim info to store in checkpoint
+type ClaimInfoStateList []ClaimInfoState
+
+// NewDRAManagerCheckpoint returns an instance of Checkpoint
+func NewDRAManagerCheckpoint() *DRAManagerCheckpoint {
+	return &DRAManagerCheckpoint{
+		Version: checkpointVersion,
+		Entries: ClaimInfoStateList{},
+	}
+}
+
+// MarshalCheckpoint returns marshalled checkpoint
+func (dc *DRAManagerCheckpoint) MarshalCheckpoint() ([]byte, error) {
+	// make sure checksum wasn't set before so it doesn't affect output checksum
+	dc.Checksum = 0
+	dc.Checksum = checksum.New(dc)
+	return json.Marshal(*dc)
+}
+
+// UnmarshalCheckpoint tries to unmarshal passed bytes to checkpoint
+func (dc *DRAManagerCheckpoint) UnmarshalCheckpoint(blob []byte) error {
+	return json.Unmarshal(blob, dc)
+}
+
+// VerifyChecksum verifies that current checksum of checkpoint is valid
+func (dc *DRAManagerCheckpoint) VerifyChecksum() error {
+	ck := dc.Checksum
+	dc.Checksum = 0
+	err := ck.Verify(dc)
+	dc.Checksum = ck
+	return err
+}
diff --git a/pkg/kubelet/cm/dra/state/state_checkpoint.go b/pkg/kubelet/cm/dra/state/state_checkpoint.go
new file mode 100644
index 00000000000..387c02eef2a
--- /dev/null
+++ b/pkg/kubelet/cm/dra/state/state_checkpoint.go
@@ -0,0 +1,115 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package state
+
+import (
+	"fmt"
+	"sync"
+
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
+	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
+)
+
+var _ CheckpointState = &stateCheckpoint{}
+
+// CheckpointState interface provides to get and store state
+type CheckpointState interface {
+	GetOrCreate() (ClaimInfoStateList, error)
+	Store(ClaimInfoStateList) error
+}
+
+// ClaimInfoState is used to store claim info state in a checkpoint
+type ClaimInfoState struct {
+	// Name of the DRA driver
+	DriverName string
+
+	// ClaimUID is an UID of the resource claim
+	ClaimUID types.UID
+
+	// ClaimName is a name of the resource claim
+	ClaimName string
+
+	// Namespace is a claim namespace
+	Namespace string
+
+	// PodUIDs is a set of pod UIDs that reference a resource
+	PodUIDs sets.Set[string]
+
+	// CdiDevices is a list of CDI devices returned by the
+	// GRPC API call NodePrepareResource
+	CdiDevices []string
+}
+
+type stateCheckpoint struct {
+	sync.RWMutex
+	checkpointManager checkpointmanager.CheckpointManager
+	checkpointName    string
+}
+
+// NewCheckpointState creates new State for keeping track of claim info with checkpoint backend
+func NewCheckpointState(stateDir, checkpointName string) (*stateCheckpoint, error) {
+	checkpointManager, err := checkpointmanager.NewCheckpointManager(stateDir)
+	if err != nil {
+		return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
+	}
+	stateCheckpoint := &stateCheckpoint{
+		checkpointManager: checkpointManager,
+		checkpointName:    checkpointName,
+	}
+
+	return stateCheckpoint, nil
+}
+
+// get state from a checkpoint and creates it if it doesn't exist
+func (sc *stateCheckpoint) GetOrCreate() (ClaimInfoStateList, error) {
+	sc.Lock()
+	defer sc.Unlock()
+
+	checkpoint := NewDRAManagerCheckpoint()
+	err := sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpoint)
+	if err == errors.ErrCheckpointNotFound {
+		sc.store(ClaimInfoStateList{})
+		return ClaimInfoStateList{}, nil
+	}
+	if err != nil {
+		return nil, fmt.Errorf("failed to get checkpoint %v: %v", sc.checkpointName, err)
+	}
+
+	return checkpoint.Entries, nil
+}
+
+// saves state to a checkpoint
+func (sc *stateCheckpoint) Store(claimInfoStateList ClaimInfoStateList) error {
+	sc.Lock()
+	defer sc.Unlock()
+
+	return sc.store(claimInfoStateList)
+}
+
+// saves state to a checkpoint, caller is responsible for locking
+func (sc *stateCheckpoint) store(claimInfoStateList ClaimInfoStateList) error {
+	checkpoint := NewDRAManagerCheckpoint()
+	checkpoint.Entries = claimInfoStateList
+
+	err := sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
+	if err != nil {
+		return fmt.Errorf("could not save checkpoint %s: %v", sc.checkpointName, err)
+	}
+	return nil
+}
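
To make the restart flow concrete, here is a minimal sketch (not part of the patch) of the checkpoint round trip using the state package introduced above. Only the NewCheckpointState/Store/GetOrCreate API and the ClaimInfoState fields come from the patch; the state directory, driver name, claim and pod UIDs, and CDI device string are made-up placeholder values.

// checkpoint_roundtrip_sketch.go - illustrative only; values are placeholders.
package main

import (
	"fmt"
	"log"

	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/kubernetes/pkg/kubelet/cm/dra/state"
)

func main() {
	// Open (or create) the DRA checkpoint under a kubelet-style state directory.
	// "/var/lib/kubelet" and "dra_manager_state" mirror the values used in the patch,
	// but any writable directory works for this sketch.
	cs, err := state.NewCheckpointState("/var/lib/kubelet", "dra_manager_state")
	if err != nil {
		log.Fatalf("init checkpoint state: %v", err)
	}

	// One entry per prepared claim; these are the minimal fields the DRA manager
	// needs to repopulate its in-memory cache after a kubelet restart.
	entries := state.ClaimInfoStateList{
		{
			DriverName: "example.com/driver", // hypothetical driver name
			ClaimUID:   types.UID("067798be-454e-4be4-9047-1aa06aea63f7"),
			ClaimName:  "example-claim",
			Namespace:  "default",
			PodUIDs:    sets.New("139cdb46-f989-4f17-9561-ca10cfb509a6"),
			CdiDevices: []string{"example.com/device=exampledevice"},
		},
	}
	if err := cs.Store(entries); err != nil {
		log.Fatalf("store checkpoint: %v", err)
	}

	// After a (simulated) kubelet restart, GetOrCreate returns the stored entries,
	// which newClaimInfoCache uses to rebuild its claimInfo objects.
	restored, err := cs.GetOrCreate()
	if err != nil {
		log.Fatalf("read checkpoint: %v", err)
	}
	fmt.Printf("restored %d claim(s); first driver: %s\n", len(restored), restored[0].DriverName)
}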
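
Along the same lines, a short sketch (again illustrative, with placeholder claim values) of the DRAManagerCheckpoint marshalling path from checkpoint.go: the checksum is computed with the field zeroed, so a restored checkpoint can be verified before the manager trusts its contents.

// checkpoint_checksum_sketch.go - illustrative only; values are placeholders.
package main

import (
	"fmt"
	"log"

	"k8s.io/kubernetes/pkg/kubelet/cm/dra/state"
)

func main() {
	// MarshalCheckpoint zeroes the checksum, recomputes it over the payload,
	// and serializes the whole struct to JSON.
	cp := state.NewDRAManagerCheckpoint()
	cp.Entries = state.ClaimInfoStateList{
		{DriverName: "example.com/driver", ClaimName: "claim-a", Namespace: "default"}, // placeholder entry
	}
	blob, err := cp.MarshalCheckpoint()
	if err != nil {
		log.Fatalf("marshal: %v", err)
	}

	// On restore, UnmarshalCheckpoint plus VerifyChecksum detect a corrupted or
	// hand-edited state file before its entries are used.
	restored := state.NewDRAManagerCheckpoint()
	if err := restored.UnmarshalCheckpoint(blob); err != nil {
		log.Fatalf("unmarshal: %v", err)
	}
	if err := restored.VerifyChecksum(); err != nil {
		log.Fatalf("checksum mismatch: %v", err)
	}
	fmt.Println("checkpoint verified, entries:", len(restored.Entries))
}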