forked from github/multus-cni
Use NPWG client library to manipulate net-attach-defs
This change introduce NPWG client library functions to manipulate net-attach-defs. This also requires to change k8sclient and unit test code as well.
This commit is contained in:
committed by
Tomofumi Hayashi
parent
cb3f59e7e7
commit
f4f2f65d1d
@@ -36,6 +36,10 @@ import (
|
||||
"github.com/intel/multus-cni/logging"
|
||||
"github.com/intel/multus-cni/types"
|
||||
nettypes "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/apis/k8s.cni.cncf.io/v1"
|
||||
netclient "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/clientset/versioned/typed/k8s.cni.cncf.io/v1"
|
||||
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
netfake "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/clientset/versioned/fake"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -51,48 +55,45 @@ type NoK8sNetworkError struct {
|
||||
|
||||
// ClientInfo contains information given from k8s client
|
||||
type ClientInfo struct {
|
||||
Client KubeClient
|
||||
Podnamespace string
|
||||
Podname string
|
||||
Client kubernetes.Interface
|
||||
NetClient netclient.K8sCniCncfIoV1Interface
|
||||
}
|
||||
|
||||
func (c *ClientInfo) AddPod(pod *v1.Pod) (*v1.Pod, error) {
|
||||
return c.Client.Core().Pods(pod.ObjectMeta.Namespace).Create(pod)
|
||||
}
|
||||
|
||||
func (c *ClientInfo) GetPod(namespace, name string) (*v1.Pod, error) {
|
||||
return c.Client.Core().Pods(namespace).Get(name, metav1.GetOptions{})
|
||||
}
|
||||
|
||||
func (c *ClientInfo) DeletePod(namespace, name string) error {
|
||||
return c.Client.Core().Pods(namespace).Delete(name, &metav1.DeleteOptions{})
|
||||
}
|
||||
|
||||
func (c *ClientInfo) AddNetAttachDef(netattach *nettypes.NetworkAttachmentDefinition) (*nettypes.NetworkAttachmentDefinition, error) {
|
||||
return c.NetClient.NetworkAttachmentDefinitions(netattach.ObjectMeta.Namespace).Create(netattach)
|
||||
}
|
||||
|
||||
func NewFakeClientInfo() *ClientInfo {
|
||||
return &ClientInfo{
|
||||
Client: fake.NewSimpleClientset(),
|
||||
NetClient: netfake.NewSimpleClientset().K8sCniCncfIoV1(),
|
||||
}
|
||||
}
|
||||
|
||||
func (e *NoK8sNetworkError) Error() string { return string(e.message) }
|
||||
|
||||
type defaultKubeClient struct {
|
||||
client kubernetes.Interface
|
||||
}
|
||||
|
||||
// defaultKubeClient implements KubeClient
|
||||
var _ KubeClient = &defaultKubeClient{}
|
||||
|
||||
func (d *defaultKubeClient) GetRawWithPath(path string) ([]byte, error) {
|
||||
return d.client.ExtensionsV1beta1().RESTClient().Get().AbsPath(path).DoRaw()
|
||||
}
|
||||
|
||||
func (d *defaultKubeClient) GetPod(namespace, name string) (*v1.Pod, error) {
|
||||
return d.client.Core().Pods(namespace).Get(name, metav1.GetOptions{})
|
||||
}
|
||||
|
||||
func (d *defaultKubeClient) UpdatePodStatus(pod *v1.Pod) (*v1.Pod, error) {
|
||||
return d.client.Core().Pods(pod.Namespace).UpdateStatus(pod)
|
||||
}
|
||||
|
||||
func setKubeClientInfo(c *ClientInfo, client KubeClient, k8sArgs *types.K8sArgs) {
|
||||
logging.Debugf("setKubeClientInfo: %v, %v, %v", c, client, k8sArgs)
|
||||
c.Client = client
|
||||
c.Podnamespace = string(k8sArgs.K8S_POD_NAMESPACE)
|
||||
c.Podname = string(k8sArgs.K8S_POD_NAME)
|
||||
}
|
||||
|
||||
// SetNetworkStatus sets network status into Pod annotation
|
||||
func SetNetworkStatus(client KubeClient, k8sArgs *types.K8sArgs, netStatus []*types.NetworkStatus, conf *types.NetConf) error {
|
||||
func SetNetworkStatus(client *ClientInfo, k8sArgs *types.K8sArgs, netStatus []*types.NetworkStatus, conf *types.NetConf) error {
|
||||
var err error
|
||||
logging.Debugf("SetNetworkStatus: %v, %v, %v, %v", client, k8sArgs, netStatus, conf)
|
||||
|
||||
client, err := GetK8sClient(conf.Kubeconfig, client)
|
||||
client, err = GetK8sClient(conf.Kubeconfig, client)
|
||||
if err != nil {
|
||||
return logging.Errorf("SetNetworkStatus: %v", err)
|
||||
}
|
||||
if client == nil {
|
||||
if client == nil || client.Client == nil {
|
||||
if len(conf.Delegates) == 0 {
|
||||
// No available kube client and no delegates, we can't do anything
|
||||
return logging.Errorf("SetNetworkStatus: must have either Kubernetes config or delegates")
|
||||
@@ -103,7 +104,7 @@ func SetNetworkStatus(client KubeClient, k8sArgs *types.K8sArgs, netStatus []*ty
|
||||
|
||||
podName := string(k8sArgs.K8S_POD_NAME)
|
||||
podNamespace := string(k8sArgs.K8S_POD_NAMESPACE)
|
||||
pod, err := client.GetPod(podNamespace, podName)
|
||||
pod, err := client.Client.Core().Pods(podNamespace).Get(podName, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return logging.Errorf("SetNetworkStatus: failed to query the pod %v in out of cluster comm: %v", podName, err)
|
||||
}
|
||||
@@ -143,7 +144,7 @@ func SetNetworkStatus(client KubeClient, k8sArgs *types.K8sArgs, netStatus []*ty
|
||||
return nil
|
||||
}
|
||||
|
||||
func setPodNetworkAnnotation(client KubeClient, namespace string, pod *v1.Pod, networkstatus string) (*v1.Pod, error) {
|
||||
func setPodNetworkAnnotation(client *ClientInfo, namespace string, pod *v1.Pod, networkstatus string) (*v1.Pod, error) {
|
||||
logging.Debugf("setPodNetworkAnnotation: %v, %s, %v, %s", client, namespace, pod, networkstatus)
|
||||
//if pod annotations is empty, make sure it allocatable
|
||||
if len(pod.Annotations) == 0 {
|
||||
@@ -157,13 +158,13 @@ func setPodNetworkAnnotation(client KubeClient, namespace string, pod *v1.Pod, n
|
||||
if resultErr := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
|
||||
if err != nil {
|
||||
// Re-get the pod unless it's the first attempt to update
|
||||
pod, err = client.GetPod(pod.Namespace, pod.Name)
|
||||
pod, err = client.Client.Core().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
pod, err = client.UpdatePodStatus(pod)
|
||||
pod, err = client.Client.Core().Pods(pod.Namespace).UpdateStatus(pod)
|
||||
return err
|
||||
}); resultErr != nil {
|
||||
return nil, logging.Errorf("setPodNetworkAnnotation: status update failed for pod %s/%s: %v", pod.Namespace, pod.Name, resultErr)
|
||||
@@ -376,20 +377,14 @@ func cniConfigFromNetworkResource(customResource *nettypes.NetworkAttachmentDefi
|
||||
return config, nil
|
||||
}
|
||||
|
||||
func getKubernetesDelegate(client KubeClient, net *types.NetworkSelectionElement, confdir string, pod *v1.Pod, resourceMap map[string]*types.ResourceInfo) (*types.DelegateNetConf, map[string]*types.ResourceInfo, error) {
|
||||
func getKubernetesDelegate(client *ClientInfo, net *types.NetworkSelectionElement, confdir string, pod *v1.Pod, resourceMap map[string]*types.ResourceInfo) (*types.DelegateNetConf, map[string]*types.ResourceInfo, error) {
|
||||
|
||||
logging.Debugf("getKubernetesDelegate: %v, %v, %s", client, net, confdir)
|
||||
rawPath := fmt.Sprintf("/apis/k8s.cni.cncf.io/v1/namespaces/%s/network-attachment-definitions/%s", net.Namespace, net.Name)
|
||||
netData, err := client.GetRawWithPath(rawPath)
|
||||
logging.Debugf("getKubernetesDelegate: %v, %v, %s, %v, %v", client, net, confdir, pod, resourceMap)
|
||||
customResource, err := client.NetClient.NetworkAttachmentDefinitions(net.Namespace).Get(net.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return nil, resourceMap, logging.Errorf("getKubernetesDelegate: cannot find get a network-attachment-definition (%s) in namespace (%s): %v", net.Name, net.Namespace, err)
|
||||
}
|
||||
|
||||
customResource := &nettypes.NetworkAttachmentDefinition{}
|
||||
if err := json.Unmarshal(netData, customResource); err != nil {
|
||||
return nil, resourceMap, logging.Errorf("getKubernetesDelegate: failed to parse the network-attachment-definition: %v", err)
|
||||
}
|
||||
|
||||
// Get resourceName annotation from NetworkAttachmentDefinition
|
||||
deviceID := ""
|
||||
resourceName, ok := customResource.GetAnnotations()[resourceNameAnnot]
|
||||
@@ -432,13 +427,6 @@ func getKubernetesDelegate(client KubeClient, net *types.NetworkSelectionElement
|
||||
return delegate, resourceMap, nil
|
||||
}
|
||||
|
||||
// KubeClient is abstraction layer for k8s client (used testing package)
|
||||
type KubeClient interface {
|
||||
GetRawWithPath(path string) ([]byte, error)
|
||||
GetPod(namespace, name string) (*v1.Pod, error)
|
||||
UpdatePodStatus(pod *v1.Pod) (*v1.Pod, error)
|
||||
}
|
||||
|
||||
// GetK8sArgs gets k8s related args from CNI args
|
||||
func GetK8sArgs(args *skel.CmdArgs) (*types.K8sArgs, error) {
|
||||
k8sArgs := &types.K8sArgs{}
|
||||
@@ -454,17 +442,16 @@ func GetK8sArgs(args *skel.CmdArgs) (*types.K8sArgs, error) {
|
||||
|
||||
// TryLoadPodDelegates attempts to load Kubernetes-defined delegates and add them to the Multus config.
|
||||
// Returns the number of Kubernetes-defined delegates added or an error.
|
||||
func TryLoadPodDelegates(k8sArgs *types.K8sArgs, conf *types.NetConf, kubeClient KubeClient) (int, *ClientInfo, error) {
|
||||
func TryLoadPodDelegates(k8sArgs *types.K8sArgs, conf *types.NetConf, clientInfo *ClientInfo) (int, *ClientInfo, error) {
|
||||
var err error
|
||||
clientInfo := &ClientInfo{}
|
||||
|
||||
logging.Debugf("TryLoadPodDelegates: %v, %v, %v", k8sArgs, conf, kubeClient)
|
||||
kubeClient, err = GetK8sClient(conf.Kubeconfig, kubeClient)
|
||||
logging.Debugf("TryLoadPodDelegates: %v, %v, %v", k8sArgs, conf, clientInfo)
|
||||
clientInfo, err = GetK8sClient(conf.Kubeconfig, clientInfo)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
|
||||
if kubeClient == nil {
|
||||
if clientInfo == nil {
|
||||
if len(conf.Delegates) == 0 {
|
||||
// No available kube client and no delegates, we can't do anything
|
||||
return 0, nil, logging.Errorf("TryLoadPodDelegates: must have either Kubernetes config or delegates")
|
||||
@@ -472,15 +459,14 @@ func TryLoadPodDelegates(k8sArgs *types.K8sArgs, conf *types.NetConf, kubeClient
|
||||
return 0, nil, nil
|
||||
}
|
||||
|
||||
setKubeClientInfo(clientInfo, kubeClient, k8sArgs)
|
||||
// Get the pod info. If cannot get it, we use cached delegates
|
||||
pod, err := kubeClient.GetPod(string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME))
|
||||
pod, err := clientInfo.Client.Core().Pods(string(k8sArgs.K8S_POD_NAMESPACE)).Get(string(k8sArgs.K8S_POD_NAME), metav1.GetOptions{})
|
||||
if err != nil {
|
||||
logging.Debugf("TryLoadPodDelegates: Err in loading K8s cluster default network from pod annotation: %v, use cached delegates", err)
|
||||
return 0, nil, nil
|
||||
}
|
||||
|
||||
delegate, err := tryLoadK8sPodDefaultNetwork(kubeClient, pod, conf)
|
||||
delegate, err := tryLoadK8sPodDefaultNetwork(clientInfo, pod, conf)
|
||||
if err != nil {
|
||||
return 0, nil, logging.Errorf("TryLoadPodDelegates: error in loading K8s cluster default network from pod annotation: %v", err)
|
||||
}
|
||||
@@ -492,7 +478,7 @@ func TryLoadPodDelegates(k8sArgs *types.K8sArgs, conf *types.NetConf, kubeClient
|
||||
|
||||
networks, err := GetPodNetwork(pod)
|
||||
if networks != nil {
|
||||
delegates, err := GetNetworkDelegates(kubeClient, pod, networks, conf.ConfDir, conf.NamespaceIsolation)
|
||||
delegates, err := GetNetworkDelegates(clientInfo, pod, networks, conf.ConfDir, conf.NamespaceIsolation)
|
||||
|
||||
if err != nil {
|
||||
if _, ok := err.(*NoK8sNetworkError); ok {
|
||||
@@ -526,7 +512,7 @@ func TryLoadPodDelegates(k8sArgs *types.K8sArgs, conf *types.NetConf, kubeClient
|
||||
}
|
||||
|
||||
// GetK8sClient gets client info from kubeconfig
|
||||
func GetK8sClient(kubeconfig string, kubeClient KubeClient) (KubeClient, error) {
|
||||
func GetK8sClient(kubeconfig string, kubeClient *ClientInfo) (*ClientInfo, error) {
|
||||
logging.Debugf("GetK8sClient: %s, %v", kubeconfig, kubeClient)
|
||||
// If we get a valid kubeClient (eg from testcases) just return that
|
||||
// one.
|
||||
@@ -565,7 +551,15 @@ func GetK8sClient(kubeconfig string, kubeClient KubeClient) (KubeClient, error)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &defaultKubeClient{client: client}, nil
|
||||
netclient, err := netclient.NewForConfig(config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &ClientInfo{
|
||||
Client: client,
|
||||
NetClient: netclient,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// GetPodNetwork gets net-attach-def annotation from pod
|
||||
@@ -587,7 +581,7 @@ func GetPodNetwork(pod *v1.Pod) ([]*types.NetworkSelectionElement, error) {
|
||||
}
|
||||
|
||||
// GetNetworkDelegates returns delegatenetconf from net-attach-def annotation in pod
|
||||
func GetNetworkDelegates(k8sclient KubeClient, pod *v1.Pod, networks []*types.NetworkSelectionElement, confdir string, confnamespaceIsolation bool) ([]*types.DelegateNetConf, error) {
|
||||
func GetNetworkDelegates(k8sclient *ClientInfo, pod *v1.Pod, networks []*types.NetworkSelectionElement, confdir string, confnamespaceIsolation bool) ([]*types.DelegateNetConf, error) {
|
||||
logging.Debugf("GetNetworkDelegates: %v, %v, %v, %v, %v", k8sclient, pod, networks, confdir, confnamespaceIsolation)
|
||||
// resourceMap holds Pod device allocation information; only initizized if CRD contains 'resourceName' annotation.
|
||||
// This will only be initialized once and all delegate objects can reference this to look up device info.
|
||||
@@ -618,19 +612,13 @@ func GetNetworkDelegates(k8sclient KubeClient, pod *v1.Pod, networks []*types.Ne
|
||||
return delegates, nil
|
||||
}
|
||||
|
||||
func getDefaultNetDelegateCRD(client KubeClient, net, confdir, namespace string) (*types.DelegateNetConf, error) {
|
||||
func getDefaultNetDelegateCRD(client *ClientInfo, net, confdir, namespace string) (*types.DelegateNetConf, error) {
|
||||
logging.Debugf("getDefaultNetDelegateCRD: %v, %v, %s, %s", client, net, confdir, namespace)
|
||||
rawPath := fmt.Sprintf("/apis/k8s.cni.cncf.io/v1/namespaces/%s/network-attachment-definitions/%s", namespace, net)
|
||||
netData, err := client.GetRawWithPath(rawPath)
|
||||
customResource, err := client.NetClient.NetworkAttachmentDefinitions(namespace).Get(net, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return nil, logging.Errorf("getDefaultNetDelegateCRD: failed to get network resource: %v", err)
|
||||
}
|
||||
|
||||
customResource := &nettypes.NetworkAttachmentDefinition{}
|
||||
if err := json.Unmarshal(netData, customResource); err != nil {
|
||||
return nil, logging.Errorf("getDefaultNetDelegateCRD: failed to get the netplugin data: %v", err)
|
||||
}
|
||||
|
||||
configBytes, err := cniConfigFromNetworkResource(customResource, confdir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -644,7 +632,7 @@ func getDefaultNetDelegateCRD(client KubeClient, net, confdir, namespace string)
|
||||
return delegate, nil
|
||||
}
|
||||
|
||||
func getNetDelegate(client KubeClient, netname, confdir, namespace string) (*types.DelegateNetConf, error) {
|
||||
func getNetDelegate(client *ClientInfo, netname, confdir, namespace string) (*types.DelegateNetConf, error) {
|
||||
logging.Debugf("getNetDelegate: %v, %v, %v, %s", client, netname, confdir, namespace)
|
||||
// option1) search CRD object for the network
|
||||
delegate, err := getDefaultNetDelegateCRD(client, netname, confdir, namespace)
|
||||
@@ -689,7 +677,7 @@ func getNetDelegate(client KubeClient, netname, confdir, namespace string) (*typ
|
||||
}
|
||||
|
||||
// GetDefaultNetworks parses 'defaultNetwork' config, gets network json and put it into netconf.Delegates.
|
||||
func GetDefaultNetworks(k8sArgs *types.K8sArgs, conf *types.NetConf, kubeClient KubeClient) error {
|
||||
func GetDefaultNetworks(k8sArgs *types.K8sArgs, conf *types.NetConf, kubeClient *ClientInfo) error {
|
||||
logging.Debugf("GetDefaultNetworks: %v, %v, %v", k8sArgs, conf, kubeClient)
|
||||
var delegates []*types.DelegateNetConf
|
||||
|
||||
@@ -731,7 +719,7 @@ func GetDefaultNetworks(k8sArgs *types.K8sArgs, conf *types.NetConf, kubeClient
|
||||
}
|
||||
|
||||
// tryLoadK8sPodDefaultNetwork get pod default network from annotations
|
||||
func tryLoadK8sPodDefaultNetwork(kubeClient KubeClient, pod *v1.Pod, conf *types.NetConf) (*types.DelegateNetConf, error) {
|
||||
func tryLoadK8sPodDefaultNetwork(kubeClient *ClientInfo, pod *v1.Pod, conf *types.NetConf) (*types.DelegateNetConf, error) {
|
||||
var netAnnot string
|
||||
logging.Debugf("tryLoadK8sPodDefaultNetwork: %v, %v, %v", kubeClient, pod, conf)
|
||||
|
||||
|
Reference in New Issue
Block a user