Update DRAManager to allow multiple plugins to process a single claim

Right now, the v1alpha1 API only passes enough information for a single plugin to
process a claim, but the upcoming v1alpha2 API will allow multiple plugins to
process the same claim. This commit prepares the code for that change.

Signed-off-by: Kevin Klues <kklues@nvidia.com>
Kevin Klues, 2023-03-11 14:17:37 +00:00
commit 685688c703, parent 569ed33d78
2 changed files with 127 additions and 81 deletions
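
The core of the change is visible in the diff below: instead of storing a single flat list of CDI devices per claim, the cached claim info now keeps a map of CDI devices keyed by kubelet plugin name, populated one plugin at a time via addCDIDevices(). The following is a minimal, self-contained sketch of that bookkeeping, not the kubelet's actual types; the claimInfoSketch struct and the plugin names are hypothetical stand-ins used only to illustrate the shape of the data.

package main

import "fmt"

// claimInfoSketch mirrors the idea behind the kubelet's claimInfo after this
// change: CDI devices are grouped per kubelet plugin rather than kept in a
// single flat slice.
type claimInfoSketch struct {
	claimName  string
	cdiDevices map[string][]string // plugin name -> CDI device names
}

// addCDIDevices records the devices one plugin returned from
// NodePrepareResource (annotation generation and error handling from the real
// claimInfo.addCDIDevices are omitted here).
func (c *claimInfoSketch) addCDIDevices(pluginName string, devices []string) {
	if c.cdiDevices == nil {
		c.cdiDevices = make(map[string][]string)
	}
	c.cdiDevices[pluginName] = devices
}

func main() {
	info := &claimInfoSketch{claimName: "example-claim"}

	// Today only the plugin matching the claim's DriverName contributes an
	// entry; once the v1alpha2 API lands, a second plugin could add its own.
	info.addCDIDevices("gpu.example.com", []string{"example.com/gpu=gpu0"})
	info.addCDIDevices("nic.example.com", []string{"example.com/nic=vf1"})

	// Consumers such as GetResources flatten the map across all plugins.
	for plugin, devices := range info.cdiDevices {
		fmt.Println(plugin, devices)
	}
}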

@@ -57,27 +57,37 @@ type claimInfoCache struct {
 	claimInfo map[string]*claimInfo
 }
 
-func newClaimInfo(driverName string, claimUID types.UID, claimName, namespace string, podUIDs sets.Set[string], cdiDevice []string) (*claimInfo, error) {
+func newClaimInfo(driverName string, claimUID types.UID, claimName, namespace string, podUIDs sets.Set[string]) *claimInfo {
 	claimInfoState := state.ClaimInfoState{
 		DriverName: driverName,
 		ClaimUID:   claimUID,
 		ClaimName:  claimName,
 		Namespace:  namespace,
 		PodUIDs:    podUIDs,
-		CdiDevices: cdiDevice,
-	}
-
-	// NOTE: Passing CDI device names as annotations is a temporary solution
-	// It will be removed after all runtimes are updated
-	// to get CDI device names from the ContainerConfig.CDIDevices field
-	annotations, err := generateCDIAnnotations(claimUID, driverName, cdiDevice)
-	if err != nil {
-		return nil, fmt.Errorf("failed to generate container annotations, err: %+v", err)
 	}
 	claimInfo := claimInfo{
 		ClaimInfoState: claimInfoState,
-		annotations:    annotations,
 	}
-	return &claimInfo, nil
+	return &claimInfo
+}
+
+func (info *claimInfo) addCDIDevices(pluginName string, cdiDevices []string) error {
+	// NOTE: Passing CDI device names as annotations is a temporary solution
+	// It will be removed after all runtimes are updated
+	// to get CDI device names from the ContainerConfig.CDIDevices field
+	annotations, err := generateCDIAnnotations(info.ClaimUID, info.DriverName, cdiDevices)
+	if err != nil {
+		return fmt.Errorf("failed to generate container annotations, err: %+v", err)
+	}
+
+	if info.CDIDevices == nil {
+		info.CDIDevices = make(map[string][]string)
+	}
+
+	info.CDIDevices[pluginName] = cdiDevices
+	info.annotations = append(info.annotations, annotations...)
+
+	return nil
 }
 
 // newClaimInfoCache is a function that returns an instance of the claimInfoCache.
@@ -98,16 +108,18 @@ func newClaimInfoCache(stateDir, checkpointName string) (*claimInfoCache, error)
 	}
 
 	for _, entry := range curState {
-		info, err := newClaimInfo(
+		info := newClaimInfo(
 			entry.DriverName,
 			entry.ClaimUID,
 			entry.ClaimName,
 			entry.Namespace,
 			entry.PodUIDs,
-			entry.CdiDevices,
 		)
-		if err != nil {
-			return nil, fmt.Errorf("failed to create claimInfo %+v: %+v", info, err)
+		for pluginName, cdiDevices := range entry.CDIDevices {
+			err := info.addCDIDevices(pluginName, cdiDevices)
+			if err != nil {
+				return nil, fmt.Errorf("failed to add CDIDevices to claimInfo %+v: %+v", info, err)
+			}
 		}
 		cache.add(info)
 	}

@@ -74,11 +74,14 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
 
 			// Resource is already prepared, add pod UID to it
 			if claimInfo := m.cache.get(claimName, pod.Namespace); claimInfo != nil {
-				// We delay checkpointing of this change until this call returns successfully.
-				// It is OK to do this because we will only return successfully from this call if
-				// the checkpoint has succeeded. That means if the kubelet is ever restarted
-				// before this checkpoint succeeds, the pod whose resources are being prepared
-				// would never have started, so it's OK (actually correct) to not include it in the cache.
+				// We delay checkpointing of this change until this call
+				// returns successfully. It is OK to do this because we
+				// will only return successfully from this call if the
+				// checkpoint has succeeded. That means if the kubelet is
+				// ever restarted before this checkpoint succeeds, the pod
+				// whose resources are being prepared would never have
+				// started, so it's OK (actually correct) to not include it
+				// in the cache.
				claimInfo.addPodReference(pod.UID)
 				continue
 			}
@@ -98,48 +101,76 @@ func (m *ManagerImpl) PrepareResources(pod *v1.Pod) error {
 					pod.Name, pod.UID, podResourceClaim.Name, resourceClaim.UID)
 			}
 
-			// Call NodePrepareResource RPC
-			driverName := resourceClaim.Status.DriverName
-
-			client, err := dra.NewDRAPluginClient(driverName)
-			if err != nil {
-				return fmt.Errorf("failed to get DRA Plugin client for plugin name %s, err=%+v", driverName, err)
-			}
-
-			response, err := client.NodePrepareResource(
-				context.Background(),
-				resourceClaim.Namespace,
-				resourceClaim.UID,
-				resourceClaim.Name,
-				resourceClaim.Status.Allocation.ResourceHandle)
-			if err != nil {
-				return fmt.Errorf("NodePrepareResource failed, claim UID: %s, claim name: %s, resource handle: %s, err: %+v",
-					resourceClaim.UID, resourceClaim.Name, resourceClaim.Status.Allocation.ResourceHandle, err)
-			}
-			klog.V(3).InfoS("NodePrepareResource succeeded", "response", response)
-
-			// TODO: We are adding the claimInfo struct to the cache and syncing it to the checkpoint *after* the NodePrepareResource
-			// call has completed. This will cause issues if the kubelet gets restarted between NodePrepareResource and syncToCheckpoint.
-			// It will result in not calling NodeUnprepareResource for this claim because no claimInfo will be synced back to the cache
-			// for it after the restart. We need to resolve this issue before moving to beta.
-			claimInfo, err := newClaimInfo(
-				driverName,
-				resourceClaim.UID,
-				resourceClaim.Name,
-				resourceClaim.Namespace,
-				sets.New(string(pod.UID)),
-				response.CdiDevices)
-			if err != nil {
-				return fmt.Errorf("newClaimInfo failed, claim UID: %s, claim name: %s, claim namespace: %s, err: %+v",
-					resourceClaim.UID, resourceClaim.Name, resourceClaim.Namespace, err)
-			}
-			m.cache.add(claimInfo)
-
-			// Checkpoint to reduce redundant calls to NodePrepareResource() after a kubelet restart.
-			err = m.cache.syncToCheckpoint()
-			if err != nil {
-				return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
+			// Build a slice of "resourceHandles" to group the name of the
+			// kubelet plugin to call NodePrepareResources() on and the
+			// ResourceHandle data to be processed by that plugin. For now
+			// this slice will only have a single entry, where the name of
+			// the kubelet plugin matches the DriverName. In the future we
+			// plan to allow each claim to be processed by multiple plugins
+			// (each with their own ResourceHandle) so this code is being
+			// written in a way to accommodate this.
+			resourceHandles := []struct {
+				KubeletPluginName string
+				Data              string
+			}{
+				{
+					KubeletPluginName: resourceClaim.Status.DriverName,
+					Data:              resourceClaim.Status.Allocation.ResourceHandle,
+				},
+			}
+
+			// Create a claimInfo object to store the relevant claim info.
+			claimInfo := newClaimInfo(
+				resourceClaim.Status.DriverName,
+				resourceClaim.UID,
+				resourceClaim.Name,
+				resourceClaim.Namespace,
+				sets.New(string(pod.UID)),
+			)
+
+			// Walk through each resourceHandle
+			for _, resourceHandle := range resourceHandles {
+				// Call NodePrepareResource RPC for each resourceHandle
+				client, err := dra.NewDRAPluginClient(resourceHandle.KubeletPluginName)
+				if err != nil {
+					return fmt.Errorf("failed to get DRA Plugin client for plugin name %s, err=%+v", resourceHandle.KubeletPluginName, err)
+				}
+				response, err := client.NodePrepareResource(
+					context.Background(),
+					resourceClaim.Namespace,
+					resourceClaim.UID,
+					resourceClaim.Name,
+					resourceHandle.Data)
+				if err != nil {
+					return fmt.Errorf("NodePrepareResource failed, claim UID: %s, claim name: %s, resource handle: %s, err: %+v",
+						resourceClaim.UID, resourceClaim.Name, resourceHandle.Data, err)
+				}
+				klog.V(3).InfoS("NodePrepareResource succeeded", "pluginName", resourceHandle.KubeletPluginName, "response", response)
+
+				// Add the CDI Devices returned by NodePrepareResource to
+				// the claimInfo object.
+				err = claimInfo.addCDIDevices(resourceHandle.KubeletPluginName, response.CdiDevices)
+				if err != nil {
+					return fmt.Errorf("failed to add CDIDevices to claimInfo %+v: %+v", claimInfo, err)
+				}
+
+				// TODO: We (re)add the claimInfo object to the cache and
+				// sync it to the checkpoint *after* the
+				// NodePrepareResource call has completed. This will cause
+				// issues if the kubelet gets restarted between
+				// NodePrepareResource and syncToCheckpoint. It will result
+				// in not calling NodeUnprepareResource for this claim
+				// because no claimInfo will be synced back to the cache
+				// for it after the restart. We need to resolve this issue
+				// before moving to beta.
+				m.cache.add(claimInfo)
+
+				// Checkpoint to reduce redundant calls to
+				// NodePrepareResource() after a kubelet restart.
+				err = m.cache.syncToCheckpoint()
+				if err != nil {
+					return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
+				}
 			}
 		}
 	}
@@ -173,8 +204,10 @@ func (m *ManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*Conta
 
 			klog.V(3).InfoS("Add resource annotations", "claim", claimName, "annotations", claimInfo.annotations)
 			annotations = append(annotations, claimInfo.annotations...)
-			for _, cdiDevice := range claimInfo.CdiDevices {
-				cdiDevices = append(cdiDevices, kubecontainer.CDIDevice{Name: cdiDevice})
+			for _, devices := range claimInfo.CDIDevices {
+				for _, device := range devices {
+					cdiDevices = append(cdiDevices, kubecontainer.CDIDevice{Name: device})
+				}
 			}
 		}
 	}
@@ -208,35 +241,36 @@ func (m *ManagerImpl) UnprepareResources(pod *v1.Pod) error {
 			continue
 		}
 
-		// Call NodeUnprepareResource only for the last pod that references the claim
-		client, err := dra.NewDRAPluginClient(claimInfo.DriverName)
-		if err != nil {
-			return fmt.Errorf("failed to get DRA Plugin client for plugin name %s, err=%+v", claimInfo.DriverName, err)
-		}
-
-		response, err := client.NodeUnprepareResource(
-			context.Background(),
-			claimInfo.Namespace,
-			claimInfo.ClaimUID,
-			claimInfo.ClaimName,
-			claimInfo.CdiDevices)
-		if err != nil {
-			return fmt.Errorf(
-				"NodeUnprepareResource failed, pod: %s, claim UID: %s, claim name: %s, CDI devices: %s, err: %+v",
-				pod.Name, claimInfo.ClaimUID, claimInfo.ClaimName, claimInfo.CdiDevices, err)
-		}
-		klog.V(3).InfoS("NodeUnprepareResource succeeded", "response", response)
-
-		// Delete last pod UID only if NodeUnprepareResource call succeeds.
-		// This ensures that status manager doesn't enter termination status
-		// for the pod. This logic is implemented in the m.PodMightNeedToUnprepareResources
-		// and in the claimInfo.hasPodReference.
+		// Loop through all plugins and call NodeUnprepareResource only for the
+		// last pod that references the claim
+		for pluginName, cdiDevices := range claimInfo.CDIDevices {
+			client, err := dra.NewDRAPluginClient(pluginName)
+			if err != nil {
+				return fmt.Errorf("failed to get DRA Plugin client for plugin name %s, err=%+v", claimInfo.DriverName, err)
+			}
+
+			response, err := client.NodeUnprepareResource(
+				context.Background(),
+				claimInfo.Namespace,
+				claimInfo.ClaimUID,
+				claimInfo.ClaimName,
+				cdiDevices)
+			if err != nil {
+				return fmt.Errorf(
+					"NodeUnprepareResource failed, pod: %s, claim UID: %s, claim name: %s, CDI devices: %s, err: %+v",
+					pod.Name, claimInfo.ClaimUID, claimInfo.ClaimName, claimInfo.CDIDevices, err)
+			}
+			klog.V(3).InfoS("NodeUnprepareResource succeeded", "response", response)
+		}
+
+		// Delete last pod UID only if all NodeUnprepareResource calls succeed.
+		// This ensures that the status manager doesn't enter termination status
+		// for the pod. This logic is implemented in
+		// m.PodMightNeedToUnprepareResources and claimInfo.hasPodReference.
 		claimInfo.deletePodReference(pod.UID)
 		m.cache.delete(claimInfo.ClaimName, pod.Namespace)
 
 		// Checkpoint to reduce redundant calls to NodeUnPrepareResource() after a kubelet restart.
-		err = m.cache.syncToCheckpoint()
+		err := m.cache.syncToCheckpoint()
 		if err != nil {
 			return fmt.Errorf("failed to checkpoint claimInfo state, err: %+v", err)
 		}