kubelet dra: add checkpointing mechanism in the DRA Manager

The checkpointing mechanism repopulates the DRA Manager's in-memory cache on kubelet restart.
This ensures that the information needed by the PodResources API is available across
a kubelet restart.

The ClaimInfoState struct represents the DRA Manager's in-memory cache state as it is stored
in the checkpoint. It is embedded in the ClaimInfo struct, which additionally holds the
annotations field. The in-memory cache and the checkpointed cache state are kept separate so
that the checkpoint format is not tied to the in-memory cache struct, which may change in the
future. ClaimInfoState stores only the minimal set of fields required to restore the
in-memory cache.
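
The resulting split looks like this (a condensed sketch assembled from the diff below; locking
helpers and methods omitted):

	// pkg/kubelet/cm/dra/state: the minimal, serializable state that is
	// written to the checkpoint file.
	type ClaimInfoState struct {
		DriverName string           // name of the DRA driver
		ClaimUID   types.UID        // UID of the resource claim
		ClaimName  string           // name of the resource claim
		Namespace  string           // claim namespace
		PodUIDs    sets.Set[string] // pods referencing the claim
		CdiDevices []string         // CDI devices returned by NodePrepareResource
	}

	// pkg/kubelet/cm/dra: the in-memory cache entry. The annotations are
	// derived from CdiDevices via generateCDIAnnotations, so they are
	// regenerated on restore rather than checkpointed.
	type claimInfo struct {
		sync.RWMutex
		state.ClaimInfoState
		annotations []kubecontainer.Annotation
	}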

Signed-off-by: Moshe Levi <moshele@nvidia.com>
Moshe Levi 2023-02-16 23:27:26 +02:00
parent b99fe0d5b9
commit e7256e08d3
6 changed files with 348 additions and 103 deletions


@@ -315,7 +315,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
 	// initialize DRA manager
 	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) {
 		klog.InfoS("Creating Dynamic Resource Allocation (DRA) manager")
-		cm.draManager, err = dra.NewManagerImpl(kubeClient)
+		cm.draManager, err = dra.NewManagerImpl(kubeClient, nodeConfig.KubeletRootDir)
 		if err != nil {
 			return nil, err
 		}


@@ -29,6 +29,9 @@ import (
 	"errors"
 	"fmt"
 	"strings"
+
+	"k8s.io/apimachinery/pkg/types"
+	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 )

 const (
@@ -36,6 +39,25 @@ const (
 	annotationPrefix = "cdi.k8s.io/"
 )

+// generate container annotations using CDI UpdateAnnotations API.
+func generateCDIAnnotations(
+	claimUID types.UID,
+	driverName string,
+	cdiDevices []string,
+) ([]kubecontainer.Annotation, error) {
+	annotations, err := updateAnnotations(map[string]string{}, driverName, string(claimUID), cdiDevices)
+	if err != nil {
+		return nil, fmt.Errorf("can't generate CDI annotations: %+v", err)
+	}
+
+	kubeAnnotations := []kubecontainer.Annotation{}
+	for key, value := range annotations {
+		kubeAnnotations = append(kubeAnnotations, kubecontainer.Annotation{Name: key, Value: value})
+	}
+
+	return kubeAnnotations, nil
+}
+
 // updateAnnotations updates annotations with a plugin-specific CDI device
 // injection request for the given devices. Upon any error a non-nil error
 // is returned and annotations are left intact. By convention plugin should
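
As an illustration of the output shape, with a hypothetical driver name, claim UID, and CDI
device name (the key follows the cdi.k8s.io/<plugin>_<deviceID> convention implemented by
updateAnnotations, with the claim UID serving as the device ID):

	annotations, err := generateCDIAnnotations(
		types.UID("067798be-1234"),       // claim UID, used as the CDI device ID
		"dra.example.com",                // DRA driver name (hypothetical)
		[]string{"example.com/gpu=gpu0"}, // fully-qualified CDI device names
	)
	// On success, annotations contains roughly:
	//   {Name: "cdi.k8s.io/dra.example.com_067798be-1234", Value: "example.com/gpu=gpu0"}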


@@ -22,6 +22,7 @@ import (
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/kubernetes/pkg/kubelet/cm/dra/state"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 )
@@ -29,26 +30,7 @@ import (
 // to prepare and unprepare a resource claim.
 type claimInfo struct {
 	sync.RWMutex
-	// name of the DRA driver
-	driverName string
-	// claimUID is an UID of the resource claim
-	claimUID types.UID
-	// claimName is a name of the resource claim
-	claimName string
-	// namespace is a claim namespace
-	namespace string
-	// podUIDs is a set of pod UIDs that reference a resource
-	podUIDs sets.Set[string]
-	// cdiDevices is a list of CDI devices returned by the
-	// GRPC API call NodePrepareResource
-	cdiDevices []string
+	state.ClaimInfoState
 	// annotations is a list of container annotations associated with
 	// a prepared resource
 	annotations []kubecontainer.Annotation
@@ -58,41 +40,86 @@ func (res *claimInfo) addPodReference(podUID types.UID) {
 	res.Lock()
 	defer res.Unlock()
-	res.podUIDs.Insert(string(podUID))
+	res.PodUIDs.Insert(string(podUID))
 }

 func (res *claimInfo) deletePodReference(podUID types.UID) {
 	res.Lock()
 	defer res.Unlock()
-	res.podUIDs.Delete(string(podUID))
+	res.PodUIDs.Delete(string(podUID))
 }

 // claimInfoCache is a cache of processed resource claims keyed by namespace + claim name.
 type claimInfoCache struct {
 	sync.RWMutex
+	state     state.CheckpointState
 	claimInfo map[string]*claimInfo
 }

-// newClaimInfoCache is a function that returns an instance of the claimInfoCache.
-func newClaimInfoCache() *claimInfoCache {
-	return &claimInfoCache{
-		claimInfo: make(map[string]*claimInfo),
+func newClaimInfo(driverName string, claimUID types.UID, claimName, namespace string, podUIDs sets.Set[string], cdiDevice []string) (*claimInfo, error) {
+	claimInfoState := state.ClaimInfoState{
+		DriverName: driverName,
+		ClaimUID:   claimUID,
+		ClaimName:  claimName,
+		Namespace:  namespace,
+		PodUIDs:    podUIDs,
+		CdiDevices: cdiDevice,
 	}
+	// NOTE: Passing CDI device names as annotations is a temporary solution
+	// It will be removed after all runtimes are updated
+	// to get CDI device names from the ContainerConfig.CDIDevices field
+	annotations, err := generateCDIAnnotations(claimUID, driverName, cdiDevice)
+	if err != nil {
+		return nil, fmt.Errorf("failed to generate container annotations, err: %+v", err)
+	}
+	claimInfo := claimInfo{
+		ClaimInfoState: claimInfoState,
+		annotations:    annotations,
+	}
+	return &claimInfo, nil
 }

-func (cache *claimInfoCache) add(claim, namespace string, res *claimInfo) error {
+// newClaimInfoCache is a function that returns an instance of the claimInfoCache.
+func newClaimInfoCache(stateDir, checkpointName string) (*claimInfoCache, error) {
+	stateImpl, err := state.NewCheckpointState(stateDir, checkpointName)
+	if err != nil {
+		return nil, fmt.Errorf("could not initialize checkpoint manager, please drain node and remove dra state file, err: %+v", err)
+	}
+	curState, err := stateImpl.GetOrCreate()
+	if err != nil {
+		return nil, fmt.Errorf("error calling GetOrCreate() on checkpoint state: %v", err)
+	}
+	cache := &claimInfoCache{
+		state:     stateImpl,
+		claimInfo: make(map[string]*claimInfo),
+	}
+	for _, entry := range curState {
+		info, err := newClaimInfo(
+			entry.DriverName,
+			entry.ClaimUID,
+			entry.ClaimName,
+			entry.Namespace,
+			entry.PodUIDs,
+			entry.CdiDevices,
+		)
+		if err != nil {
+			return nil, fmt.Errorf("failed to create claimInfo %+v: %+v", info, err)
+		}
+		cache.add(info)
+	}
+	return cache, nil
+}
+
+func (cache *claimInfoCache) add(res *claimInfo) {
 	cache.Lock()
 	defer cache.Unlock()

-	key := claim + namespace
-	if _, ok := cache.claimInfo[key]; ok {
-		return fmt.Errorf("claim %s, namespace %s already cached", claim, namespace)
-	}
-
-	cache.claimInfo[claim+namespace] = res
-	return nil
+	cache.claimInfo[res.ClaimName+res.Namespace] = res
 }

 func (cache *claimInfoCache) get(claimName, namespace string) *claimInfo {
@@ -118,10 +145,22 @@ func (cache *claimInfoCache) hasPodReference(UID types.UID) bool {
 	defer cache.RUnlock()

 	for _, claimInfo := range cache.claimInfo {
-		if claimInfo.podUIDs.Has(string(UID)) {
+		if claimInfo.PodUIDs.Has(string(UID)) {
 			return true
 		}
 	}

 	return false
 }
+
+func (cache *claimInfoCache) syncToCheckpoint() error {
+	cache.RLock()
+	defer cache.RUnlock()
+	claimInfoStateList := make(state.ClaimInfoStateList, 0, len(cache.claimInfo))
+	for _, infoClaim := range cache.claimInfo {
+		claimInfoStateList = append(claimInfoStateList, infoClaim.ClaimInfoState)
+	}
+	return cache.state.Store(claimInfoStateList)
+}
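
A test-style sketch of the restart flow this enables (the test itself and the sample values are
hypothetical; the functions are the ones from this file):

	func TestClaimInfoCacheCheckpointRoundTrip(t *testing.T) {
		dir := t.TempDir() // stands in for the kubelet root dir

		// First kubelet "life": prepare a claim and checkpoint it.
		cache, err := newClaimInfoCache(dir, "dra_manager_state")
		if err != nil {
			t.Fatal(err)
		}
		info, err := newClaimInfo("dra.example.com", types.UID("uid-1"), "claim-1",
			"default", sets.New("pod-uid-1"), []string{"example.com/gpu=gpu0"})
		if err != nil {
			t.Fatal(err)
		}
		cache.add(info)
		if err := cache.syncToCheckpoint(); err != nil {
			t.Fatal(err)
		}

		// Simulated restart: a fresh cache repopulates itself from the checkpoint.
		restored, err := newClaimInfoCache(dir, "dra_manager_state")
		if err != nil {
			t.Fatal(err)
		}
		if restored.get("claim-1", "default") == nil {
			t.Fatal("claim was not restored from the checkpoint")
		}
	}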


@@ -31,6 +31,9 @@ import (
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 )

+// draManagerStateFileName is the file name where dra manager stores its state
+const draManagerStateFileName = "dra_manager_state"
+
 // ManagerImpl is the structure in charge of managing DRA resource Plugins.
 type ManagerImpl struct {
 	// cache contains cached claim info
@@ -41,36 +44,22 @@ type ManagerImpl struct {
 }

 // NewManagerImpl creates a new manager.
-func NewManagerImpl(kubeClient clientset.Interface) (*ManagerImpl, error) {
+func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string) (*ManagerImpl, error) {
 	klog.V(2).InfoS("Creating DRA manager")

+	claimInfoCache, err := newClaimInfoCache(stateFileDirectory, draManagerStateFileName)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create claimInfo cache: %+v", err)
+	}
+
 	manager := &ManagerImpl{
-		cache:      newClaimInfoCache(),
+		cache:      claimInfoCache,
 		kubeClient: kubeClient,
 	}

 	return manager, nil
 }

-// Generate container annotations using CDI UpdateAnnotations API.
-func generateCDIAnnotations(
-	claimUID types.UID,
-	driverName string,
-	cdiDevices []string,
-) ([]kubecontainer.Annotation, error) {
-	annotations, err := updateAnnotations(map[string]string{}, driverName, string(claimUID), cdiDevices)
-	if err != nil {
-		return nil, fmt.Errorf("can't generate CDI annotations: %+v", err)
-	}
-
-	kubeAnnotations := []kubecontainer.Annotation{}
-	for key, value := range annotations {
-		kubeAnnotations = append(kubeAnnotations, kubecontainer.Annotation{Name: key, Value: value})
-	}
-
-	return kubeAnnotations, nil
-}
-
 // PrepareResources attempts to prepare all of the required resource
 // plugin resources for the input container, issue an NodePrepareResource rpc request
 // for each new resource requirement, process their responses and update the cached
@@ -83,10 +72,14 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
 		claimName := resourceclaim.Name(pod, &pod.Spec.ResourceClaims[i])
 		klog.V(3).InfoS("Processing resource", "claim", claimName, "pod", pod.Name)

-		// Resource is already prepared, add pod UID to it
 		if claimInfo := m.cache.get(claimName, pod.Namespace); claimInfo != nil {
+			// resource is already prepared, add pod UID to it
+			// We delay checkpointing of this change until this call returns successfully.
+			// It is OK to do this because we will only return successfully from this call if
+			// the checkpoint has succeeded. That means if the kubelet is ever restarted
+			// before this checkpoint succeeds, the pod whose resources are being prepared
+			// would never have started, so it's OK (actually correct) to not include it in the cache.
 			claimInfo.addPodReference(pod.UID)
 			continue
 		}
@@ -126,39 +119,36 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
 			klog.V(3).InfoS("NodePrepareResource succeeded", "response", response)

-			// NOTE: Passing CDI device names as annotations is a temporary solution
-			// It will be removed after all runtimes are updated
-			// to get CDI device names from the ContainerConfig.CDIDevices field
-			annotations, err := generateCDIAnnotations(resourceClaim.UID, driverName, response.CdiDevices)
-			if err != nil {
-				return fmt.Errorf("failed to generate container annotations, err: %+v", err)
-			}
-
-			// Cache prepared resource
-			err = m.cache.add(
+			// TODO: We are adding the claimInfo struct to the cache and syncing it to the checkpoint *after* the NodePrepareResource
+			// call has completed. This will cause issues if the kubelet gets restarted between NodePrepareResource and syncToCheckpoint.
+			// It will result in not calling NodeUnprepareResource for this claim because no claimInfo will be synced back to the cache
+			// for it after the restart. We need to resolve this issue before moving to beta.
+			claimInfo, err := newClaimInfo(
+				driverName,
+				resourceClaim.UID,
 				resourceClaim.Name,
 				resourceClaim.Namespace,
-				&claimInfo{
-					driverName:  driverName,
-					claimUID:    resourceClaim.UID,
-					claimName:   resourceClaim.Name,
-					namespace:   resourceClaim.Namespace,
-					podUIDs:     sets.New(string(pod.UID)),
-					cdiDevices:  response.CdiDevices,
-					annotations: annotations,
-				})
+				sets.New(string(pod.UID)),
+				response.CdiDevices)
 			if err != nil {
-				return fmt.Errorf(
-					"failed to cache prepared resource, claim: %s(%s), err: %+v",
-					resourceClaim.Name,
-					resourceClaim.UID,
-					err,
-				)
+				return fmt.Errorf("newClaimInfo failed, claim UID: %s, claim name: %s, claim namespace: %s, err: %+v",
+					resourceClaim.UID, resourceClaim.Name, resourceClaim.Namespace, err)
 			}
+			m.cache.add(claimInfo)
+
+			// Checkpoint to reduce redundant calls to NodePrepareResource() after a kubelet restart.
+			err = m.cache.syncToCheckpoint()
+			if err != nil {
+				return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
+			}
 			}
 		}
 	}
+	// Checkpoint to capture all of the previous addPodReference() calls.
+	err := m.cache.syncToCheckpoint()
+	if err != nil {
+		return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
+	}
 	return nil
 }
@@ -181,9 +171,9 @@ func (m *ManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*Conta
 			return nil, fmt.Errorf("unable to get resource for namespace: %s, claim: %s", pod.Namespace, claimName)
 		}

-		klog.V(3).InfoS("add resource annotations", "claim", claimName, "annotations", claimInfo.annotations)
+		klog.V(3).InfoS("Add resource annotations", "claim", claimName, "annotations", claimInfo.annotations)
 		annotations = append(annotations, claimInfo.annotations...)
-		for _, cdiDevice := range claimInfo.cdiDevices {
+		for _, cdiDevice := range claimInfo.CdiDevices {
 			cdiDevices = append(cdiDevices, kubecontainer.CDIDevice{Name: cdiDevice})
 		}
 	}
@@ -208,43 +198,54 @@ func (m *ManagerImpl) UnprepareResources(pod *v1.Pod) error {
 		}

 		// Skip calling NodeUnprepareResource if other pods are still referencing it
-		if len(claimInfo.podUIDs) > 1 {
+		if len(claimInfo.PodUIDs) > 1 {
+			// We delay checkpointing of this change until this call returns successfully.
+			// It is OK to do this because we will only return successfully from this call if
+			// the checkpoint has succeeded. That means if the kubelet is ever restarted
+			// before this checkpoint succeeds, we will simply call into this (idempotent)
+			// function again.
 			claimInfo.deletePodReference(pod.UID)
 			continue
 		}

 		// Call NodeUnprepareResource only for the last pod that references the claim
-		client, err := dra.NewDRAPluginClient(claimInfo.driverName)
+		client, err := dra.NewDRAPluginClient(claimInfo.DriverName)
 		if err != nil {
-			return fmt.Errorf("failed to get DRA Plugin client for plugin name %s, err=%+v", claimInfo.driverName, err)
+			return fmt.Errorf("failed to get DRA Plugin client for plugin name %s, err=%+v", claimInfo.DriverName, err)
 		}

 		response, err := client.NodeUnprepareResource(
 			context.Background(),
-			claimInfo.namespace,
-			claimInfo.claimUID,
-			claimInfo.claimName,
-			claimInfo.cdiDevices)
+			claimInfo.Namespace,
+			claimInfo.ClaimUID,
+			claimInfo.ClaimName,
+			claimInfo.CdiDevices)
 		if err != nil {
 			return fmt.Errorf(
 				"NodeUnprepareResource failed, pod: %s, claim UID: %s, claim name: %s, CDI devices: %s, err: %+v",
-				pod.Name,
-				claimInfo.claimUID,
-				claimInfo.claimName,
-				claimInfo.cdiDevices, err)
+				pod.Name, claimInfo.ClaimUID, claimInfo.ClaimName, claimInfo.CdiDevices, err)
 		}
+		klog.V(3).InfoS("NodeUnprepareResource succeeded", "response", response)

+		// Delete last pod UID only if NodeUnprepareResource call succeeds.
+		// This ensures that status manager doesn't enter termination status
+		// for the pod. This logic is implemented in the m.PodMightNeedToUnprepareResources
+		// and in the claimInfo.hasPodReference.
 		claimInfo.deletePodReference(pod.UID)
-		klog.V(3).InfoS("NodeUnprepareResource succeeded", "response", response)

 		// delete resource from the cache
-		m.cache.delete(claimInfo.claimName, pod.Namespace)
+		m.cache.delete(claimInfo.ClaimName, pod.Namespace)
+
+		// Checkpoint to reduce redundant calls to NodeUnPrepareResource() after a kubelet restart.
+		err = m.cache.syncToCheckpoint()
+		if err != nil {
+			return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
+		}
 	}
+	// Checkpoint to capture all of the previous deletePodReference() calls.
+	err := m.cache.syncToCheckpoint()
+	if err != nil {
+		return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
+	}
 	return nil
 }


@@ -0,0 +1,68 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package state
+
+import (
+	"encoding/json"
+
+	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
+	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/checksum"
+)
+
+var _ checkpointmanager.Checkpoint = &DRAManagerCheckpoint{}
+
+const checkpointVersion = "v1"
+
+// DRAManagerCheckpoint struct is used to store pod dynamic resources assignments in a checkpoint
+type DRAManagerCheckpoint struct {
+	Version  string             `json:"version"`
+	Entries  ClaimInfoStateList `json:"entries,omitempty"`
+	Checksum checksum.Checksum  `json:"checksum"`
+}
+
+// List of claim info to store in checkpoint
+type ClaimInfoStateList []ClaimInfoState
+
+// NewDRAManagerCheckpoint returns an instance of Checkpoint
+func NewDRAManagerCheckpoint() *DRAManagerCheckpoint {
+	return &DRAManagerCheckpoint{
+		Version: checkpointVersion,
+		Entries: ClaimInfoStateList{},
+	}
+}
+
+// MarshalCheckpoint returns marshalled checkpoint
+func (dc *DRAManagerCheckpoint) MarshalCheckpoint() ([]byte, error) {
+	// make sure checksum wasn't set before so it doesn't affect output checksum
+	dc.Checksum = 0
+	dc.Checksum = checksum.New(dc)
+	return json.Marshal(*dc)
+}
+
+// UnmarshalCheckpoint tries to unmarshal passed bytes to checkpoint
+func (dc *DRAManagerCheckpoint) UnmarshalCheckpoint(blob []byte) error {
+	return json.Unmarshal(blob, dc)
+}
+
+// VerifyChecksum verifies that current checksum of checkpoint is valid
+func (dc *DRAManagerCheckpoint) VerifyChecksum() error {
+	ck := dc.Checksum
+	dc.Checksum = 0
+	err := ck.Verify(dc)
+	dc.Checksum = ck
+	return err
+}
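
The zeroing in MarshalCheckpoint and VerifyChecksum is what makes the scheme self-consistent:
the checksum is always computed over a checkpoint whose Checksum field is zero, so the stored
value never includes itself. A minimal round-trip sketch (illustrative only, not part of the
commit):

	func roundTrip() error {
		cp := NewDRAManagerCheckpoint()
		cp.Entries = append(cp.Entries, ClaimInfoState{ClaimName: "claim-1", Namespace: "default"})

		// Zeroes Checksum, recomputes it over the zeroed struct, then serializes.
		blob, err := cp.MarshalCheckpoint()
		if err != nil {
			return err
		}

		read := NewDRAManagerCheckpoint()
		if err := read.UnmarshalCheckpoint(blob); err != nil {
			return err
		}
		// Repeats the zero-and-recompute dance; a corrupted or hand-edited
		// state file fails verification here.
		return read.VerifyChecksum()
	}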


@@ -0,0 +1,115 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package state
+
+import (
+	"fmt"
+	"sync"
+
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
+	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
+)
+
+var _ CheckpointState = &stateCheckpoint{}
+
+// CheckpointState interface provides methods to get and store state
+type CheckpointState interface {
+	GetOrCreate() (ClaimInfoStateList, error)
+	Store(ClaimInfoStateList) error
+}
+
+// ClaimInfoState is used to store claim info state in a checkpoint
+type ClaimInfoState struct {
+	// Name of the DRA driver
+	DriverName string
+
+	// ClaimUID is an UID of the resource claim
+	ClaimUID types.UID
+
+	// ClaimName is a name of the resource claim
+	ClaimName string
+
+	// Namespace is a claim namespace
+	Namespace string
+
+	// PodUIDs is a set of pod UIDs that reference a resource
+	PodUIDs sets.Set[string]
+
+	// CdiDevices is a list of CDI devices returned by the
+	// GRPC API call NodePrepareResource
+	CdiDevices []string
+}
+
+type stateCheckpoint struct {
+	sync.RWMutex
+	checkpointManager checkpointmanager.CheckpointManager
+	checkpointName    string
+}
+
+// NewCheckpointState creates new State for keeping track of claim info with checkpoint backend
+func NewCheckpointState(stateDir, checkpointName string) (*stateCheckpoint, error) {
+	checkpointManager, err := checkpointmanager.NewCheckpointManager(stateDir)
+	if err != nil {
+		return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
+	}
+	stateCheckpoint := &stateCheckpoint{
+		checkpointManager: checkpointManager,
+		checkpointName:    checkpointName,
+	}
+	return stateCheckpoint, nil
+}
+
+// GetOrCreate gets the state from a checkpoint and creates it if it doesn't exist
+func (sc *stateCheckpoint) GetOrCreate() (ClaimInfoStateList, error) {
+	sc.Lock()
+	defer sc.Unlock()
+
+	checkpoint := NewDRAManagerCheckpoint()
+	err := sc.checkpointManager.GetCheckpoint(sc.checkpointName, checkpoint)
+	if err == errors.ErrCheckpointNotFound {
+		sc.store(ClaimInfoStateList{})
+		return ClaimInfoStateList{}, nil
+	}
+	if err != nil {
+		return nil, fmt.Errorf("failed to get checkpoint %v: %v", sc.checkpointName, err)
+	}
+
+	return checkpoint.Entries, nil
+}
+
+// Store saves state to a checkpoint
+func (sc *stateCheckpoint) Store(claimInfoStateList ClaimInfoStateList) error {
+	sc.Lock()
+	defer sc.Unlock()
+
+	return sc.store(claimInfoStateList)
+}
+
+// store saves state to a checkpoint, caller is responsible for locking
+func (sc *stateCheckpoint) store(claimInfoStateList ClaimInfoStateList) error {
+	checkpoint := NewDRAManagerCheckpoint()
+	checkpoint.Entries = claimInfoStateList
+
+	err := sc.checkpointManager.CreateCheckpoint(sc.checkpointName, checkpoint)
+	if err != nil {
+		return fmt.Errorf("could not save checkpoint %s: %v", sc.checkpointName, err)
+	}
+	return nil
+}