mirror of
https://github.com/k8snetworkplumbingwg/multus-cni.git
synced 2025-09-17 06:55:48 +00:00
[increment][stub] Adds stubs for the driver; untested, but it compiles. The docs are probably inadequate and need updating.
This commit is contained in:
2
cmd/dra-multus-driver/README.md
Normal file
2
cmd/dra-multus-driver/README.md
Normal file
@@ -0,0 +1,2 @@
|
||||
# DRA Multus Driver.
|
||||
|
@@ -29,7 +29,6 @@ const (
|
||||
cdiVendor = "k8s." + DriverName
|
||||
cdiClass = "gpu"
|
||||
cdiKind = cdiVendor + "/" + cdiClass
|
||||
DriverName = "gpu.multus-cni.io"
|
||||
cdiCommonDeviceName = "common"
|
||||
)
|
||||
|
||||
|
@@ -1,86 +0,0 @@
|
||||
/*
|
||||
* Copyright 2023 The Kubernetes Authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"os"
|
||||
|
||||
resourceapi "k8s.io/api/resource/v1beta1"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
"k8s.io/utils/ptr"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// enumerateAllPossibleDevices builds numGPUs mock GPU devices and returns
// them keyed by device name ("gpu-0", "gpu-1", ...).
//
// Each device carries "index", "uuid", "model" and "driverVersion" attributes
// plus a fixed "memory" capacity of 80Gi. The UUIDs come from generateUUIDs,
// seeded with the NODE_NAME environment variable, so the same node name and
// device count yield the same UUIDs. The returned error is always nil.
func enumerateAllPossibleDevices(numGPUs int) (AllocatableDevices, error) {
|
||||
// An unset NODE_NAME yields the empty string as the seed.
seed := os.Getenv("NODE_NAME")
|
||||
uuids := generateUUIDs(seed, numGPUs)
|
||||
|
||||
alldevices := make(AllocatableDevices)
|
||||
for i, uuid := range uuids {
|
||||
device := resourceapi.Device{
|
||||
Name: fmt.Sprintf("gpu-%d", i),
|
||||
Basic: &resourceapi.BasicDevice{
|
||||
Attributes: map[resourceapi.QualifiedName]resourceapi.DeviceAttribute{
|
||||
"index": {
|
||||
IntValue: ptr.To(int64(i)),
|
||||
},
|
||||
"uuid": {
|
||||
StringValue: ptr.To(uuid),
|
||||
},
|
||||
"model": {
|
||||
StringValue: ptr.To("LATEST-GPU-MODEL"),
|
||||
},
|
||||
"driverVersion": {
|
||||
VersionValue: ptr.To("1.0.0"),
|
||||
},
|
||||
},
|
||||
Capacity: map[resourceapi.QualifiedName]resourceapi.DeviceCapacity{
|
||||
"memory": {
|
||||
Value: resource.MustParse("80Gi"),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
// The device name doubles as the map key.
alldevices[device.Name] = device
|
||||
}
|
||||
return alldevices, nil
|
||||
}
|
||||
|
||||
// generateUUIDs returns count strings of the form "gpu-<uuid>". The UUID
// bytes are drawn from a math/rand generator seeded with hash(seed), so the
// same seed and count always produce the same list.
func generateUUIDs(seed string, count int) []string {
|
||||
// NOTE: this local shadows the math/rand package for the rest of the function.
rand := rand.New(rand.NewSource(hash(seed)))
|
||||
|
||||
uuids := make([]string, count)
|
||||
for i := 0; i < count; i++ {
|
||||
// 16 bytes of generator output become the raw UUID bytes.
charset := make([]byte, 16)
|
||||
rand.Read(charset)
|
||||
// NOTE: the local uuid shadows the uuid package inside the loop body. The
// FromBytes error is discarded; charset is always 16 bytes, presumably the
// only length FromBytes accepts — confirm.
uuid, _ := uuid.FromBytes(charset)
|
||||
uuids[i] = "gpu-" + uuid.String()
|
||||
}
|
||||
|
||||
return uuids
|
||||
}
|
||||
|
||||
// hash folds the runes of s into an int64 with a base-31 polynomial
// (h = 31*h + rune), starting from 0, so the empty string hashes to 0.
// generateUUIDs uses the result as the seed for its random source.
func hash(s string) int64 {
|
||||
h := int64(0)
|
||||
// Ranging over a string yields runes, not bytes.
for _, c := range s {
|
||||
// Signed overflow wraps in Go, so long inputs still produce a value.
h = 31*h + int64(c)
|
||||
}
|
||||
return h
|
||||
}
|
@@ -1,19 +1,3 @@
|
||||
/*
|
||||
* Copyright 2023 The Kubernetes Authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
@@ -41,12 +25,14 @@ func NewDriver(ctx context.Context, config *Config) (*driver, error) {
|
||||
client: config.coreclient,
|
||||
}
|
||||
|
||||
// Initialize device state (we'll repurpose this to handle claim -> NAD resolution)
|
||||
state, err := NewDeviceState(config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
driver.state = state
|
||||
|
||||
// Start the plugin and register it with the kubelet
|
||||
plugin, err := kubeletplugin.Start(
|
||||
ctx,
|
||||
[]any{driver},
|
||||
@@ -61,15 +47,7 @@ func NewDriver(ctx context.Context, config *Config) (*driver, error) {
|
||||
}
|
||||
driver.plugin = plugin
|
||||
|
||||
var resources kubeletplugin.Resources
|
||||
for _, device := range state.allocatable {
|
||||
resources.Devices = append(resources.Devices, device)
|
||||
}
|
||||
|
||||
if err := plugin.PublishResources(ctx, resources); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// PublishResources is intentionally not called: resources are resolved dynamically via claims. (This comment is a tombstone for the removed publishing code.)
|
||||
return driver, nil
|
||||
}
|
||||
|
||||
@@ -79,14 +57,16 @@ func (d *driver) Shutdown(ctx context.Context) error {
|
||||
}
|
||||
|
||||
func (d *driver) NodePrepareResources(ctx context.Context, req *drapbv1.NodePrepareResourcesRequest) (*drapbv1.NodePrepareResourcesResponse, error) {
|
||||
klog.Infof("NodePrepareResource is called: number of claims: %d", len(req.Claims))
|
||||
preparedResources := &drapbv1.NodePrepareResourcesResponse{Claims: map[string]*drapbv1.NodePrepareResourceResponse{}}
|
||||
|
||||
for _, claim := range req.Claims {
|
||||
preparedResources.Claims[claim.UID] = d.nodePrepareResource(ctx, claim)
|
||||
klog.Infof("NodePrepareResources called: number of claims: %d", len(req.Claims))
|
||||
prepared := &drapbv1.NodePrepareResourcesResponse{
|
||||
Claims: map[string]*drapbv1.NodePrepareResourceResponse{},
|
||||
}
|
||||
|
||||
return preparedResources, nil
|
||||
for _, claim := range req.Claims {
|
||||
prepared.Claims[claim.UID] = d.nodePrepareResource(ctx, claim)
|
||||
}
|
||||
|
||||
return prepared, nil
|
||||
}
|
||||
|
||||
func (d *driver) nodePrepareResource(ctx context.Context, claim *drapbv1.Claim) *drapbv1.NodePrepareResourceResponse {
|
||||
@@ -96,36 +76,40 @@ func (d *driver) nodePrepareResource(ctx context.Context, claim *drapbv1.Claim)
|
||||
metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return &drapbv1.NodePrepareResourceResponse{
|
||||
Error: fmt.Sprintf("failed to fetch ResourceClaim %s in namespace %s", claim.Name, claim.Namespace),
|
||||
Error: fmt.Sprintf("failed to fetch ResourceClaim %q in namespace %q: %v", claim.Name, claim.Namespace, err),
|
||||
}
|
||||
}
|
||||
|
||||
prepared, err := d.state.Prepare(resourceClaim)
|
||||
devices, err := d.state.Prepare(resourceClaim)
|
||||
if err != nil {
|
||||
return &drapbv1.NodePrepareResourceResponse{
|
||||
Error: fmt.Sprintf("error preparing devices for claim %v: %v", claim.UID, err),
|
||||
Error: fmt.Sprintf("error preparing devices for claim %q: %v", claim.UID, err),
|
||||
}
|
||||
}
|
||||
|
||||
klog.Infof("Returning newly prepared devices for claim '%v': %v", claim.UID, prepared)
|
||||
return &drapbv1.NodePrepareResourceResponse{Devices: prepared}
|
||||
klog.Infof("Prepared NAD-based device for claim %q: %+v", claim.UID, devices)
|
||||
return &drapbv1.NodePrepareResourceResponse{
|
||||
Devices: devices,
|
||||
}
|
||||
}
|
||||
|
||||
func (d *driver) NodeUnprepareResources(ctx context.Context, req *drapbv1.NodeUnprepareResourcesRequest) (*drapbv1.NodeUnprepareResourcesResponse, error) {
|
||||
klog.Infof("NodeUnPrepareResource is called: number of claims: %d", len(req.Claims))
|
||||
unpreparedResources := &drapbv1.NodeUnprepareResourcesResponse{Claims: map[string]*drapbv1.NodeUnprepareResourceResponse{}}
|
||||
|
||||
for _, claim := range req.Claims {
|
||||
unpreparedResources.Claims[claim.UID] = d.nodeUnprepareResource(ctx, claim)
|
||||
klog.Infof("NodeUnprepareResources called: number of claims: %d", len(req.Claims))
|
||||
unprepared := &drapbv1.NodeUnprepareResourcesResponse{
|
||||
Claims: map[string]*drapbv1.NodeUnprepareResourceResponse{},
|
||||
}
|
||||
|
||||
return unpreparedResources, nil
|
||||
for _, claim := range req.Claims {
|
||||
unprepared.Claims[claim.UID] = d.nodeUnprepareResource(ctx, claim)
|
||||
}
|
||||
|
||||
return unprepared, nil
|
||||
}
|
||||
|
||||
func (d *driver) nodeUnprepareResource(ctx context.Context, claim *drapbv1.Claim) *drapbv1.NodeUnprepareResourceResponse {
|
||||
if err := d.state.Unprepare(claim.UID); err != nil {
|
||||
return &drapbv1.NodeUnprepareResourceResponse{
|
||||
Error: fmt.Sprintf("error unpreparing devices for claim %v: %v", claim.UID, err),
|
||||
Error: fmt.Sprintf("error unpreparing devices for claim %q: %v", claim.UID, err),
|
||||
}
|
||||
}
|
||||
|
||||
|
67
cmd/dra-multus-driver/example.yaml
Normal file
67
cmd/dra-multus-driver/example.yaml
Normal file
@@ -0,0 +1,67 @@
|
||||
---
|
||||
apiVersion: "k8s.cni.cncf.io/v1"
|
||||
kind: NetworkAttachmentDefinition
|
||||
metadata:
|
||||
name: bridge-net
|
||||
namespace: default
|
||||
spec:
|
||||
config: '{
|
||||
"cniVersion": "0.3.1",
|
||||
"type": "bridge",
|
||||
"bridge": "br0",
|
||||
"ipam": {
|
||||
"type": "static",
|
||||
"addresses": [
|
||||
{
|
||||
"address": "10.10.0.2/24",
|
||||
"gateway": "10.10.0.1"
|
||||
}
|
||||
]
|
||||
}
|
||||
}'
|
||||
---
|
||||
apiVersion: resource.k8s.io/v1beta1
|
||||
kind: DeviceClass
|
||||
metadata:
|
||||
name: multus-dra.k8s.cni.cncf.io
|
||||
spec:
|
||||
selectors:
|
||||
- cel:
|
||||
expression: "device.driver == 'multus-dra.k8s.cni.cncf.io'"
|
||||
---
|
||||
# ResourceClaimTemplate that the example Pod references through
# resourceClaimTemplateName: bridge-net. It requests one device from the
# multus-dra.k8s.cni.cncf.io DeviceClass and attaches driver parameters.
apiVersion: resource.k8s.io/v1beta1
|
||||
kind: ResourceClaimTemplate
|
||||
metadata:
|
||||
name: bridge-net
|
||||
namespace: default
|
||||
spec:
|
||||
spec:
|
||||
devices:
|
||||
requests:
|
||||
- name: bridge-net
|
||||
deviceClassName: multus-dra.k8s.cni.cncf.io
|
||||
# NOTE(review): the driver decodes the claim's opaque parameters into a
# NetConfig and reads its Networks field; confirm that this config stanza
# (name/parameters/nadName/nadNamespace) matches that schema.
config:
|
||||
name: netattachdef
|
||||
parameters:
|
||||
driver: multus-dra.k8s.cni.cncf.io
|
||||
# NOTE(review): the only NetworkAttachmentDefinition in this example is
# named "bridge-net"; "macvlan-conf" is not defined here — confirm which
# name is intended.
nadName: macvlan-conf
|
||||
nadNamespace: default
|
||||
---
|
||||
apiVersion: v1
|
||||
kind: Pod
|
||||
metadata:
|
||||
name: bridge-test
|
||||
namespace: default
|
||||
spec:
|
||||
containers:
|
||||
- name: app
|
||||
image: ubuntu:22.04
|
||||
command: ["bash", "-c"]
|
||||
args: ["export; trap 'exit 0' TERM; sleep 9999 & wait"]
|
||||
resources:
|
||||
claims:
|
||||
- name: net
|
||||
resourceClaims:
|
||||
- name: net
|
||||
resourceClaimTemplateName: bridge-net
|
||||
|
@@ -1,19 +1,3 @@
|
||||
/*
|
||||
* Copyright 2023 The Kubernetes Authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
@@ -24,7 +8,6 @@ import (
|
||||
"syscall"
|
||||
|
||||
"github.com/urfave/cli/v2"
|
||||
|
||||
coreclientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
@@ -32,6 +15,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
DriverName = "multus-dra.k8s.cni.cncf.io"
|
||||
PluginRegistrationPath = "/var/lib/kubelet/plugins_registry/" + DriverName + ".sock"
|
||||
DriverPluginPath = "/var/lib/kubelet/plugins/" + DriverName
|
||||
DriverPluginSocketPath = DriverPluginPath + "/plugin.sock"
|
||||
@@ -39,12 +23,11 @@ const (
|
||||
)
|
||||
|
||||
type Flags struct {
|
||||
kubeClientConfig flags.KubeClientConfig
|
||||
loggingConfig *flags.LoggingConfig
|
||||
|
||||
nodeName string
|
||||
cdiRoot string
|
||||
numDevices int
|
||||
cdiRoot string
|
||||
kubeClientConfig flags.KubeClientConfig
|
||||
loggingConfig *flags.LoggingConfig
|
||||
nodeName string
|
||||
resolvedConfigDir string
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
@@ -78,21 +61,13 @@ func newApp() *cli.App {
|
||||
Destination: &flags.cdiRoot,
|
||||
EnvVars: []string{"CDI_ROOT"},
|
||||
},
|
||||
&cli.IntFlag{
|
||||
Name: "num-devices",
|
||||
Usage: "The number of devices to be generated.",
|
||||
Value: 8,
|
||||
Destination: &flags.numDevices,
|
||||
EnvVars: []string{"NUM_DEVICES"},
|
||||
},
|
||||
}
|
||||
cliFlags = append(cliFlags, flags.kubeClientConfig.Flags()...)
|
||||
cliFlags = append(cliFlags, flags.loggingConfig.Flags()...)
|
||||
|
||||
app := &cli.App{
|
||||
Name: "dra-example-kubeletplugin",
|
||||
Usage: "dra-example-kubeletplugin implements a DRA driver plugin.",
|
||||
ArgsUsage: " ",
|
||||
Name: "multus-dra-driver",
|
||||
Usage: "Multus-integrated DRA driver for resolving NetworkAttachmentDefinitions at scheduling time",
|
||||
HideHelpCommand: true,
|
||||
Flags: cliFlags,
|
||||
Before: func(c *cli.Context) error {
|
||||
@@ -126,17 +101,9 @@ func StartPlugin(ctx context.Context, config *Config) error {
|
||||
return err
|
||||
}
|
||||
|
||||
info, err := os.Stat(config.flags.cdiRoot)
|
||||
switch {
|
||||
case err != nil && os.IsNotExist(err):
|
||||
err := os.MkdirAll(config.flags.cdiRoot, 0750)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case err != nil:
|
||||
err = os.MkdirAll(config.flags.resolvedConfigDir, 0750)
|
||||
if err != nil {
|
||||
return err
|
||||
case !info.IsDir():
|
||||
return fmt.Errorf("path for cdi file generation is not a directory: '%v'", err)
|
||||
}
|
||||
|
||||
driver, err := NewDriver(ctx, config)
|
||||
@@ -144,6 +111,7 @@ func StartPlugin(ctx context.Context, config *Config) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Watch for shutdown signals
|
||||
sigc := make(chan os.Signal, 1)
|
||||
signal.Notify(sigc, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
|
||||
<-sigc
|
||||
|
@@ -1,24 +1,7 @@
|
||||
/*
|
||||
* Copyright 2023 The Kubernetes Authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"slices"
|
||||
"sync"
|
||||
|
||||
resourceapi "k8s.io/api/resource/v1beta1"
|
||||
@@ -26,7 +9,9 @@ import (
|
||||
drapbv1 "k8s.io/kubelet/pkg/apis/dra/v1beta1"
|
||||
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
|
||||
|
||||
configapi "gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/dra/api/multus-cni.io/resource/gpu/v1alpha1"
|
||||
netclientset "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/clientset/versioned"
|
||||
configapi "gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/dra/api/multus-cni.io/resource/net/v1alpha1"
|
||||
multusk8sutils "gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/k8sclient"
|
||||
|
||||
cdiapi "tags.cncf.io/container-device-interface/pkg/cdi"
|
||||
cdispec "tags.cncf.io/container-device-interface/specs-go"
|
||||
@@ -58,24 +43,18 @@ func (pds PreparedDevices) GetDevices() []*drapbv1.Device {
|
||||
type DeviceState struct {
|
||||
sync.Mutex
|
||||
cdi *CDIHandler
|
||||
allocatable AllocatableDevices
|
||||
checkpointManager checkpointmanager.CheckpointManager
|
||||
nadClient netclientset.Interface
|
||||
}
|
||||
|
||||
func NewDeviceState(config *Config) (*DeviceState, error) {
|
||||
allocatable, err := enumerateAllPossibleDevices(config.flags.numDevices)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error enumerating all possible devices: %v", err)
|
||||
}
|
||||
|
||||
cdi, err := NewCDIHandler(config)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to create CDI handler: %v", err)
|
||||
}
|
||||
|
||||
err = cdi.CreateCommonSpecFile()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to create CDI spec file for common edits: %v", err)
|
||||
if err := cdi.CreateCommonSpecFile(); err != nil {
|
||||
return nil, fmt.Errorf("unable to create CDI common spec file: %v", err)
|
||||
}
|
||||
|
||||
checkpointManager, err := checkpointmanager.NewCheckpointManager(DriverPluginPath)
|
||||
@@ -83,29 +62,20 @@ func NewDeviceState(config *Config) (*DeviceState, error) {
|
||||
return nil, fmt.Errorf("unable to create checkpoint manager: %v", err)
|
||||
}
|
||||
|
||||
state := &DeviceState{
|
||||
cdi: cdi,
|
||||
allocatable: allocatable,
|
||||
checkpointManager: checkpointManager,
|
||||
}
|
||||
|
||||
checkpoints, err := state.checkpointManager.ListCheckpoints()
|
||||
clientconfig, err := config.flags.kubeClientConfig.NewClientSetConfig()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to list checkpoints: %v", err)
|
||||
return nil, fmt.Errorf("unable to create clientset config: %v", err)
|
||||
}
|
||||
nadClient, err := netclientset.NewForConfig(clientconfig)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to create NAD client: %v", err)
|
||||
}
|
||||
|
||||
for _, c := range checkpoints {
|
||||
if c == DriverPluginCheckpointFile {
|
||||
return state, nil
|
||||
}
|
||||
}
|
||||
|
||||
checkpoint := newCheckpoint()
|
||||
if err := state.checkpointManager.CreateCheckpoint(DriverPluginCheckpointFile, checkpoint); err != nil {
|
||||
return nil, fmt.Errorf("unable to sync to checkpoint: %v", err)
|
||||
}
|
||||
|
||||
return state, nil
|
||||
return &DeviceState{
|
||||
cdi: cdi,
|
||||
checkpointManager: checkpointManager,
|
||||
nadClient: nadClient,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *DeviceState) Prepare(claim *resourceapi.ResourceClaim) ([]*drapbv1.Device, error) {
|
||||
@@ -177,131 +147,73 @@ func (s *DeviceState) prepareDevices(claim *resourceapi.ResourceClaim) (Prepared
|
||||
return nil, fmt.Errorf("claim not yet allocated")
|
||||
}
|
||||
|
||||
// Retrieve the full set of device configs for the driver.
|
||||
configs, err := GetOpaqueDeviceConfigs(
|
||||
configapi.Decoder,
|
||||
DriverName,
|
||||
claim.Status.Allocation.Devices.Config,
|
||||
)
|
||||
// Sanity: the claim must carry at least one config; only the first entry is examined (a single opaque config per claim is expected)
|
||||
if len(claim.Status.Allocation.Devices.Config) == 0 {
|
||||
return nil, fmt.Errorf("no config provided in claim allocation")
|
||||
}
|
||||
|
||||
opaque := claim.Status.Allocation.Devices.Config[0].DeviceConfiguration.Opaque
|
||||
if opaque == nil || opaque.Driver != DriverName {
|
||||
return nil, fmt.Errorf("claim does not contain expected opaque config for driver %q", DriverName)
|
||||
}
|
||||
|
||||
obj, err := runtime.Decode(configapi.Decoder, opaque.Parameters.Raw)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error getting opaque device configs: %v", err)
|
||||
return nil, fmt.Errorf("failed to decode opaque config: %w", err)
|
||||
}
|
||||
|
||||
// Add the default GPU Config to the front of the config list with the
|
||||
// lowest precedence. This guarantees there will be at least one config in
|
||||
// the list with len(Requests) == 0 for the lookup below.
|
||||
configs = slices.Insert(configs, 0, &OpaqueDeviceConfig{
|
||||
Requests: []string{},
|
||||
Config: configapi.DefaultGpuConfig(),
|
||||
})
|
||||
|
||||
// Look through the configs and figure out which one will be applied to
|
||||
// each device allocation result based on their order of precedence.
|
||||
configResultsMap := make(map[runtime.Object][]*resourceapi.DeviceRequestAllocationResult)
|
||||
for _, result := range claim.Status.Allocation.Devices.Results {
|
||||
if _, exists := s.allocatable[result.Device]; !exists {
|
||||
return nil, fmt.Errorf("requested GPU is not allocatable: %v", result.Device)
|
||||
}
|
||||
for _, c := range slices.Backward(configs) {
|
||||
if len(c.Requests) == 0 || slices.Contains(c.Requests, result.Request) {
|
||||
configResultsMap[c.Config] = append(configResultsMap[c.Config], &result)
|
||||
break
|
||||
}
|
||||
}
|
||||
netConfig, ok := obj.(*configapi.NetConfig)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("decoded opaque config is not a *NetConfig")
|
||||
}
|
||||
|
||||
// Normalize, validate, and apply all configs associated with devices that
|
||||
// need to be prepared. Track container edits generated from applying the
|
||||
// config to the set of device allocation results.
|
||||
perDeviceCDIContainerEdits := make(PerDeviceCDIContainerEdits)
|
||||
for c, results := range configResultsMap {
|
||||
// Cast the opaque config to a GpuConfig
|
||||
var config *configapi.GpuConfig
|
||||
switch castConfig := c.(type) {
|
||||
case *configapi.GpuConfig:
|
||||
config = castConfig
|
||||
default:
|
||||
return nil, fmt.Errorf("runtime object is not a regognized configuration")
|
||||
}
|
||||
|
||||
// Normalize the config to set any implied defaults.
|
||||
if err := config.Normalize(); err != nil {
|
||||
return nil, fmt.Errorf("error normalizing GPU config: %w", err)
|
||||
}
|
||||
|
||||
// Validate the config to ensure its integrity.
|
||||
if err := config.Validate(); err != nil {
|
||||
return nil, fmt.Errorf("error validating GPU config: %w", err)
|
||||
}
|
||||
|
||||
// Apply the config to the list of results associated with it.
|
||||
containerEdits, err := s.applyConfig(config, results)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error applying GPU config: %w", err)
|
||||
}
|
||||
|
||||
// Merge any new container edits with the overall per device map.
|
||||
for k, v := range containerEdits {
|
||||
perDeviceCDIContainerEdits[k] = v
|
||||
}
|
||||
// Apply CDI edits to all results using this config
|
||||
var results []*resourceapi.DeviceRequestAllocationResult
|
||||
for i := range claim.Status.Allocation.Devices.Results {
|
||||
results = append(results, &claim.Status.Allocation.Devices.Results[i])
|
||||
}
|
||||
|
||||
perDeviceCDIContainerEdits, err := s.applyConfig(netConfig, results, claim.Namespace)
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to apply CDI container edits: %w", err)
|
||||
}
|
||||
|
||||
// Walk through each config and its associated device allocation results
|
||||
// and construct the list of prepared devices to return.
|
||||
var preparedDevices PreparedDevices
|
||||
for _, results := range configResultsMap {
|
||||
for _, result := range results {
|
||||
device := &PreparedDevice{
|
||||
Device: drapbv1.Device{
|
||||
RequestNames: []string{result.Request},
|
||||
PoolName: result.Pool,
|
||||
DeviceName: result.Device,
|
||||
CDIDeviceIDs: s.cdi.GetClaimDevices(string(claim.UID), []string{result.Device}),
|
||||
},
|
||||
ContainerEdits: perDeviceCDIContainerEdits[result.Device],
|
||||
}
|
||||
preparedDevices = append(preparedDevices, device)
|
||||
for _, result := range claim.Status.Allocation.Devices.Results {
|
||||
device := &PreparedDevice{
|
||||
Device: drapbv1.Device{
|
||||
RequestNames: []string{result.Request},
|
||||
PoolName: result.Pool,
|
||||
DeviceName: result.Device,
|
||||
CDIDeviceIDs: s.cdi.GetClaimDevices(string(claim.UID), []string{result.Device}),
|
||||
},
|
||||
ContainerEdits: perDeviceCDIContainerEdits[result.Device],
|
||||
}
|
||||
preparedDevices = append(preparedDevices, device)
|
||||
}
|
||||
|
||||
return preparedDevices, nil
|
||||
}
|
||||
|
||||
// unprepareDevices is currently a no-op: it ignores claimUID and devices and
// always returns nil.
func (s *DeviceState) unprepareDevices(claimUID string, devices PreparedDevices) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// applyConfig applies a configuration to a set of device allocation results.
|
||||
//
|
||||
// In this example driver there is no actual configuration applied. We simply
|
||||
// define a set of environment variables to be injected into the containers
|
||||
// that include a given device. A real driver would likely need to do some sort
|
||||
// of hardware configuration as well, based on the config passed in.
|
||||
func (s *DeviceState) applyConfig(config *configapi.GpuConfig, results []*resourceapi.DeviceRequestAllocationResult) (PerDeviceCDIContainerEdits, error) {
|
||||
func (s *DeviceState) applyConfig(config *configapi.NetConfig, results []*resourceapi.DeviceRequestAllocationResult, podNamespace string) (PerDeviceCDIContainerEdits, error) {
|
||||
perDeviceEdits := make(PerDeviceCDIContainerEdits)
|
||||
|
||||
parsedNets, err := multusk8sutils.ParsePodNetworkAnnotation(config.Networks, podNamespace)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to parse networks string: %w", err)
|
||||
}
|
||||
|
||||
for _, result := range results {
|
||||
envs := []string{
|
||||
fmt.Sprintf("GPU_DEVICE_%s=%s", result.Device[4:], result.Device),
|
||||
fmt.Sprintf("MULTUS_DRA_DEVICE_NAME=%s", result.Device),
|
||||
fmt.Sprintf("MULTUS_DRA_NETWORKS=%s", config.Networks),
|
||||
}
|
||||
|
||||
if config.Sharing != nil {
|
||||
envs = append(envs, fmt.Sprintf("GPU_DEVICE_%s_SHARING_STRATEGY=%s", result.Device[4:], config.Sharing.Strategy))
|
||||
}
|
||||
|
||||
switch {
|
||||
case config.Sharing.IsTimeSlicing():
|
||||
tsconfig, err := config.Sharing.GetTimeSlicingConfig()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to get time slicing config for device %v: %w", result.Device, err)
|
||||
}
|
||||
envs = append(envs, fmt.Sprintf("GPU_DEVICE_%s_TIMESLICE_INTERVAL=%v", result.Device[4:], tsconfig.Interval))
|
||||
case config.Sharing.IsSpacePartitioning():
|
||||
spconfig, err := config.Sharing.GetSpacePartitioningConfig()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to get space partitioning config for device %v: %w", result.Device, err)
|
||||
}
|
||||
envs = append(envs, fmt.Sprintf("GPU_DEVICE_%s_PARTITION_COUNT=%v", result.Device[4:], spconfig.PartitionCount))
|
||||
for i, net := range parsedNets {
|
||||
envs = append(envs, fmt.Sprintf("MULTUS_DRA_NET_%d_NAME=%s", i, net.Name))
|
||||
envs = append(envs, fmt.Sprintf("MULTUS_DRA_NET_%d_NAMESPACE=%s", i, net.Namespace))
|
||||
envs = append(envs, fmt.Sprintf("MULTUS_DRA_NET_%d_IFNAME=%s", i, net.InterfaceRequest))
|
||||
}
|
||||
|
||||
edits := &cdispec.ContainerEdits{
|
||||
@@ -314,68 +226,31 @@ func (s *DeviceState) applyConfig(config *configapi.GpuConfig, results []*resour
|
||||
return perDeviceEdits, nil
|
||||
}
|
||||
|
||||
// GetOpaqueDeviceConfigs returns an ordered list of the configs contained in possibleConfigs for this driver.
|
||||
//
|
||||
// Configs can either come from the resource claim itself or from the device
|
||||
// class associated with the request. Configs coming directly from the resource
|
||||
// claim take precedence over configs coming from the device class. Moreover,
|
||||
// configs found later in the list of configs attached to its source take
|
||||
// precedence over configs found earlier in the list for that source.
|
||||
//
|
||||
// All of the configs relevant to the driver from the list of possibleConfigs
|
||||
// will be returned in order of precedence (from lowest to highest). If no
|
||||
// configs are found, nil is returned.
|
||||
func GetOpaqueDeviceConfigs(
|
||||
func GetOpaqueDeviceConfig(
|
||||
decoder runtime.Decoder,
|
||||
driverName string,
|
||||
possibleConfigs []resourceapi.DeviceAllocationConfiguration,
|
||||
) ([]*OpaqueDeviceConfig, error) {
|
||||
// Collect all configs in order of reverse precedence.
|
||||
var classConfigs []resourceapi.DeviceAllocationConfiguration
|
||||
var claimConfigs []resourceapi.DeviceAllocationConfiguration
|
||||
var candidateConfigs []resourceapi.DeviceAllocationConfiguration
|
||||
) (*configapi.NetConfig, error) {
|
||||
for _, config := range possibleConfigs {
|
||||
switch config.Source {
|
||||
case resourceapi.AllocationConfigSourceClass:
|
||||
classConfigs = append(classConfigs, config)
|
||||
case resourceapi.AllocationConfigSourceClaim:
|
||||
claimConfigs = append(claimConfigs, config)
|
||||
default:
|
||||
return nil, fmt.Errorf("invalid config source: %v", config.Source)
|
||||
}
|
||||
}
|
||||
candidateConfigs = append(candidateConfigs, classConfigs...)
|
||||
candidateConfigs = append(candidateConfigs, claimConfigs...)
|
||||
|
||||
// Decode all configs that are relevant for the driver.
|
||||
var resultConfigs []*OpaqueDeviceConfig
|
||||
for _, config := range candidateConfigs {
|
||||
// If this is nil, the driver doesn't support some future API extension
|
||||
// and needs to be updated.
|
||||
if config.DeviceConfiguration.Opaque == nil {
|
||||
return nil, fmt.Errorf("only opaque parameters are supported by this driver")
|
||||
continue
|
||||
}
|
||||
|
||||
// Configs for different drivers may have been specified because a
|
||||
// single request can be satisfied by different drivers. This is not
|
||||
// an error -- drivers must skip over other driver's configs in order
|
||||
// to support this.
|
||||
if config.DeviceConfiguration.Opaque.Driver != driverName {
|
||||
continue
|
||||
}
|
||||
|
||||
decodedConfig, err := runtime.Decode(decoder, config.DeviceConfiguration.Opaque.Parameters.Raw)
|
||||
decoded, err := runtime.Decode(decoder, config.DeviceConfiguration.Opaque.Parameters.Raw)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error decoding config parameters: %w", err)
|
||||
return nil, fmt.Errorf("error decoding opaque config: %w", err)
|
||||
}
|
||||
|
||||
resultConfig := &OpaqueDeviceConfig{
|
||||
Requests: config.Requests,
|
||||
Config: decodedConfig,
|
||||
netConfig, ok := decoded.(*configapi.NetConfig)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("decoded config is not of type *NetConfig")
|
||||
}
|
||||
|
||||
resultConfigs = append(resultConfigs, resultConfig)
|
||||
return netConfig, nil
|
||||
}
|
||||
|
||||
return resultConfigs, nil
|
||||
return nil, fmt.Errorf("no matching opaque config found for driver %q", driverName)
|
||||
}
|
||||
|
||||
// unprepareDevices is currently a no-op: it ignores claimUID and devices and
// always returns nil.
func (s *DeviceState) unprepareDevices(claimUID string, devices PreparedDevices) error {
|
||||
return nil
|
||||
}
|
||||
|
@@ -15,60 +15,3 @@
|
||||
*/
|
||||
|
||||
package v1alpha1
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// Validate returns nil when s is TimeSlicingStrategy or
// SpacePartitioningStrategy, and an "unknown GPU sharing strategy" error for
// any other value.
|
||||
func (s GpuSharingStrategy) Validate() error {
|
||||
switch s {
|
||||
case TimeSlicingStrategy, SpacePartitioningStrategy:
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("unknown GPU sharing strategy: %v", s)
|
||||
}
|
||||
|
||||
// Validate returns nil when d is one of DefaultTimeSlice, ShortTimeSlice,
// MediumTimeSlice or LongTimeSlice, and an "unknown time-slice interval"
// error for any other value.
|
||||
func (d TimeSliceInterval) Validate() error {
|
||||
switch d {
|
||||
case DefaultTimeSlice, ShortTimeSlice, MediumTimeSlice, LongTimeSlice:
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("unknown time-slice interval: %v", d)
|
||||
}
|
||||
|
||||
// Validate checks TimeSlicingConfig by delegating to the Validate method of
// its Interval field.
//
// NOTE(review): neither c nor c.Interval is nil-checked here — confirm that
// callers guarantee both are set before calling.
|
||||
func (c *TimeSlicingConfig) Validate() error {
|
||||
return c.Interval.Validate()
|
||||
}
|
||||
|
||||
// Validate rejects a negative PartitionCount with an "invalid partition
// count" error. Zero and positive counts are accepted.
|
||||
func (c *SpacePartitioningConfig) Validate() error {
|
||||
if c.PartitionCount < 0 {
|
||||
return fmt.Errorf("invalid partition count: %v", c.PartitionCount)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Validate checks GpuSharing in two steps: the Strategy must be valid, and
// then the config belonging to the selected strategy (TimeSlicingConfig or
// SpacePartitioningConfig) must validate. If neither IsTimeSlicing nor
// IsSpacePartitioning reports true, an "invalid GPU sharing settings" error
// is returned.
//
// NOTE(review): the strategy-specific config is used without a nil check
// here; the caller visible in this change runs Normalize before Validate,
// which presumably populates it — confirm.
|
||||
func (s *GpuSharing) Validate() error {
|
||||
if err := s.Strategy.Validate(); err != nil {
|
||||
return err
|
||||
}
|
||||
switch {
|
||||
case s.IsTimeSlicing():
|
||||
return s.TimeSlicingConfig.Validate()
|
||||
case s.IsSpacePartitioning():
|
||||
return s.SpacePartitioningConfig.Validate()
|
||||
}
|
||||
return fmt.Errorf("invalid GPU sharing settings: %v", s)
|
||||
}
|
||||
|
||||
// Validate ensures that GpuConfig has a valid set of values.
|
||||
func (c *GpuConfig) Validate() error {
|
||||
if c.Sharing == nil {
|
||||
return fmt.Errorf("no sharing strategy set")
|
||||
}
|
||||
return c.Sharing.Validate()
|
||||
}
|
||||
|
@@ -217,17 +217,18 @@ func parsePodNetworkObjectName(podnetwork string) (string, string, string, error
|
||||
return netNsName, networkName, netIfName, nil
|
||||
}
|
||||
|
||||
func parsePodNetworkAnnotation(podNetworks, defaultNamespace string) ([]*types.NetworkSelectionElement, error) {
|
||||
// ParsePodNetworkAnnotation parses the pod network annotation into a list of network selection elements.
|
||||
func ParsePodNetworkAnnotation(podNetworks, defaultNamespace string) ([]*types.NetworkSelectionElement, error) {
|
||||
var networks []*types.NetworkSelectionElement
|
||||
|
||||
logging.Debugf("parsePodNetworkAnnotation: %s, %s", podNetworks, defaultNamespace)
|
||||
logging.Debugf("ParsePodNetworkAnnotation: %s, %s", podNetworks, defaultNamespace)
|
||||
if podNetworks == "" {
|
||||
return nil, logging.Errorf("parsePodNetworkAnnotation: pod annotation does not have \"network\" as key")
|
||||
return nil, logging.Errorf("ParsePodNetworkAnnotation: pod annotation does not have \"network\" as key")
|
||||
}
|
||||
|
||||
if strings.ContainsAny(podNetworks, "[{\"") {
|
||||
if err := json.Unmarshal([]byte(podNetworks), &networks); err != nil {
|
||||
return nil, logging.Errorf("parsePodNetworkAnnotation: failed to parse pod Network Attachment Selection Annotation JSON format: %v", err)
|
||||
return nil, logging.Errorf("ParsePodNetworkAnnotation: failed to parse pod Network Attachment Selection Annotation JSON format: %v", err)
|
||||
}
|
||||
} else {
|
||||
// Comma-delimited list of network attachment object names
|
||||
@@ -238,7 +239,7 @@ func parsePodNetworkAnnotation(podNetworks, defaultNamespace string) ([]*types.N
|
||||
// Parse network name (i.e. <namespace>/<network name>@<ifname>)
|
||||
netNsName, networkName, netIfName, err := parsePodNetworkObjectName(item)
|
||||
if err != nil {
|
||||
return nil, logging.Errorf("parsePodNetworkAnnotation: %v", err)
|
||||
return nil, logging.Errorf("ParsePodNetworkAnnotation: %v", err)
|
||||
}
|
||||
|
||||
networks = append(networks, &types.NetworkSelectionElement{
|
||||
@@ -256,13 +257,13 @@ func parsePodNetworkAnnotation(podNetworks, defaultNamespace string) ([]*types.N
|
||||
if n.MacRequest != "" {
|
||||
// validate MAC address
|
||||
if _, err := net.ParseMAC(n.MacRequest); err != nil {
|
||||
return nil, logging.Errorf("parsePodNetworkAnnotation: failed to mac: %v", err)
|
||||
return nil, logging.Errorf("ParsePodNetworkAnnotation: failed to mac: %v", err)
|
||||
}
|
||||
}
|
||||
if n.InfinibandGUIDRequest != "" {
|
||||
// validate GUID address
|
||||
if _, err := net.ParseMAC(n.InfinibandGUIDRequest); err != nil {
|
||||
return nil, logging.Errorf("parsePodNetworkAnnotation: failed to validate infiniband GUID: %v", err)
|
||||
return nil, logging.Errorf("ParsePodNetworkAnnotation: failed to validate infiniband GUID: %v", err)
|
||||
}
|
||||
}
|
||||
if n.IPRequest != nil {
|
||||
@@ -435,7 +436,7 @@ func GetPodNetwork(pod *v1.Pod) ([]*types.NetworkSelectionElement, error) {
|
||||
return nil, &NoK8sNetworkError{"no kubernetes network found"}
|
||||
}
|
||||
|
||||
networks, err := parsePodNetworkAnnotation(netAnnot, defaultNamespace)
|
||||
networks, err := ParsePodNetworkAnnotation(netAnnot, defaultNamespace)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -618,7 +619,7 @@ func tryLoadK8sPodDefaultNetwork(kubeClient *ClientInfo, pod *v1.Pod, conf *type
|
||||
}
|
||||
|
||||
// The CRD object of default network should only be defined in multusNamespace
|
||||
networks, err := parsePodNetworkAnnotation(netAnnot, conf.MultusNamespace)
|
||||
networks, err := ParsePodNetworkAnnotation(netAnnot, conf.MultusNamespace)
|
||||
if err != nil {
|
||||
return nil, logging.Errorf("tryLoadK8sPodDefaultNetwork: failed to parse CRD object: %v", err)
|
||||
}
|
||||
|
Reference in New Issue
Block a user