added deviceIDs insertions into delegates

this changes allow Multus to parse resourceName annotation from
network CRDs then add deviceIDs into the delegate object for CNI
plugin to consume it.
This commit is contained in:
Abdul Halim
2018-08-03 13:32:28 +01:00
parent 1ad25a890d
commit f30c688775
602 changed files with 21447 additions and 6809 deletions

View File

@@ -34,11 +34,20 @@ import (
"github.com/intel/multus-cni/types"
)
const (
resourceNameAnnot = "k8s.v1.cni.cncf.io/resourceName"
)
// NoK8sNetworkError indicates error, no network in kubernetes
type NoK8sNetworkError struct {
message string
}
type ResourceInfo struct {
Index int
deviceIDs []string
}
type clientInfo struct {
Client KubeClient
Podnamespace string
@@ -66,6 +75,26 @@ func (d *defaultKubeClient) UpdatePodStatus(pod *v1.Pod) (*v1.Pod, error) {
return d.client.Core().Pods(pod.Namespace).UpdateStatus(pod)
}
// GetComputeDeviceMap returns a map of resourceName to list of device IDs
func GetComputeDeviceMap(pod *v1.Pod) map[string]*ResourceInfo {
resourceMap := make(map[string]*ResourceInfo)
for _, cntr := range pod.Spec.Containers {
for _, cd := range cntr.ComputeDevices {
entry, ok := resourceMap[cd.ResourceName]
if ok {
// already exists; append to it
entry.deviceIDs = append(entry.deviceIDs, cd.DeviceIDs...)
} else {
// new entry
resourceMap[cd.ResourceName] = &ResourceInfo{deviceIDs: cd.DeviceIDs}
}
}
}
return resourceMap
}
func setKubeClientInfo(c *clientInfo, client KubeClient, k8sArgs *types.K8sArgs) {
c.Client = client
c.Podnamespace = string(k8sArgs.K8S_POD_NAMESPACE)
@@ -127,14 +156,7 @@ func setPodNetworkAnnotation(client KubeClient, namespace string, pod *v1.Pod, n
return pod, nil
}
func getPodNetworkAnnotation(client KubeClient, k8sArgs *types.K8sArgs) (string, string, error) {
var err error
pod, err := client.GetPod(string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME))
if err != nil {
return "", "", fmt.Errorf("getPodNetworkAnnotation: failed to query the pod %v in out of cluster comm: %v", string(k8sArgs.K8S_POD_NAME), err)
}
func getPodNetworkAnnotation(pod *v1.Pod) (string, string, error) {
return pod.Annotations["k8s.v1.cni.cncf.io/networks"], pod.ObjectMeta.Namespace, nil
}
@@ -315,7 +337,7 @@ func cniConfigFromNetworkResource(customResource *types.NetworkAttachmentDefinit
return config, nil
}
func getKubernetesDelegate(client KubeClient, net *types.NetworkSelectionElement, confdir string) (*types.DelegateNetConf, error) {
func getKubernetesDelegate(client KubeClient, net *types.NetworkSelectionElement, confdir string, resourceMap map[string]*ResourceInfo) (*types.DelegateNetConf, error) {
rawPath := fmt.Sprintf("/apis/k8s.cni.cncf.io/v1/namespaces/%s/network-attachment-definitions/%s", net.Namespace, net.Name)
netData, err := client.GetRawWithPath(rawPath)
if err != nil {
@@ -327,19 +349,57 @@ func getKubernetesDelegate(client KubeClient, net *types.NetworkSelectionElement
return nil, fmt.Errorf("getKubernetesDelegate: failed to get the netplugin data: %v", err)
}
// DEVICE_ID
// Get resourceName annotation from NetDefinition
deviceID := ""
resourceName, ok := customResource.Metadata.Annotations[resourceNameAnnot]
if ok {
// ResourceName annotation is found; try to get device info from resourceMap
entry, ok := resourceMap[resourceName]
if ok {
if idCount := len(entry.deviceIDs); idCount > 0 && idCount > entry.Index {
deviceID = entry.deviceIDs[entry.Index]
entry.Index++ // increment Index for next delegate
}
}
}
configBytes, err := cniConfigFromNetworkResource(customResource, confdir)
if err != nil {
return nil, err
}
if deviceID != "" {
if configBytes, err = delegateAddDeviceID(configBytes, deviceID); err != nil {
return nil, err
}
}
delegate, err := types.LoadDelegateNetConf(configBytes, net.InterfaceRequest)
if err != nil {
return nil, err
}
// return delegate, fmt.Errorf("Debug: delegate object: %+v", string(delegate.Bytes))
return delegate, nil
}
func delegateAddDeviceID(inBytes []byte, deviceID string) ([]byte, error) {
var rawConfig map[string]interface{}
var err error
err = json.Unmarshal(inBytes, &rawConfig)
if err != nil {
return nil, fmt.Errorf("delegateAddDeviceID: failed to unmarshal inBytes: %v", err)
}
// Inject deviceID
rawConfig["deviceID"] = deviceID
configBytes, err := json.Marshal(rawConfig)
if err != nil {
return nil, fmt.Errorf("delegateAddDeviceID: failed to re-marshal Spec.Config: %v", err)
}
return configBytes, nil
}
type KubeClient interface {
GetRawWithPath(path string) ([]byte, error)
GetPod(namespace, name string) (*v1.Pod, error)
@@ -385,6 +445,8 @@ func TryLoadK8sDelegates(k8sArgs *types.K8sArgs, conf *types.NetConf, kubeClient
return 0, nil, fmt.Errorf("Multus: Err in getting k8s network from pod: %v", err)
}
// TODO: add Device information into delegates from resourceMap
if err = conf.AddDelegates(delegates); err != nil {
return 0, nil, err
}
@@ -431,11 +493,19 @@ func GetK8sClient(kubeconfig string, kubeClient KubeClient) (KubeClient, error)
func GetK8sNetwork(k8sclient KubeClient, k8sArgs *types.K8sArgs, confdir string) ([]*types.DelegateNetConf, error) {
netAnnot, defaultNamespace, err := getPodNetworkAnnotation(k8sclient, k8sArgs)
pod, err := k8sclient.GetPod(string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME))
if err != nil {
return nil, fmt.Errorf("getPodNetworkAnnotation: failed to query the pod %v in out of cluster comm: %v", string(k8sArgs.K8S_POD_NAME), err)
}
netAnnot, defaultNamespace, err := getPodNetworkAnnotation(pod)
if err != nil {
return nil, err
}
// Get Pod ComputeDevices info
resourceMap := GetComputeDeviceMap(pod)
if len(netAnnot) == 0 {
return nil, &NoK8sNetworkError{"no kubernetes network found"}
}
@@ -448,7 +518,7 @@ func GetK8sNetwork(k8sclient KubeClient, k8sArgs *types.K8sArgs, confdir string)
// Read all network objects referenced by 'networks'
var delegates []*types.DelegateNetConf
for _, net := range networks {
delegate, err := getKubernetesDelegate(k8sclient, net, confdir)
delegate, err := getKubernetesDelegate(k8sclient, net, confdir, resourceMap)
if err != nil {
return nil, fmt.Errorf("GetK8sNetwork: failed getting the delegate: %v", err)
}