Caches all pod delegates json for pods deletion without k8s info

This fixes #243 with following changes:
 + Optimize to fetch Pod from k8s client
 + Change to use cache always in DEL.
 + If failed to fetch the pod info from k8s clinet in deletion,
  use cached delegates as emergency bailout
 + Add test cases for cache
This commit is contained in:
Tomofumi Hayashi
2019-01-25 17:49:16 +09:00
committed by Tomofumi Hayashi
parent f0a43ca0a5
commit 5dc774a547
5 changed files with 379 additions and 122 deletions

View File

@@ -37,8 +37,9 @@ import (
) )
const ( const (
resourceNameAnnot = "k8s.v1.cni.cncf.io/resourceName" resourceNameAnnot = "k8s.v1.cni.cncf.io/resourceName"
defaultNetAnnot = "v1.multus-cni.io/default-network" defaultNetAnnot = "v1.multus-cni.io/default-network"
networkAttachmentAnnot = "k8s.v1.cni.cncf.io/networks"
) )
// NoK8sNetworkError indicates error, no network in kubernetes // NoK8sNetworkError indicates error, no network in kubernetes
@@ -80,12 +81,14 @@ func setKubeClientInfo(c *clientInfo, client KubeClient, k8sArgs *types.K8sArgs)
c.Podname = string(k8sArgs.K8S_POD_NAME) c.Podname = string(k8sArgs.K8S_POD_NAME)
} }
func SetNetworkStatus(k *clientInfo, netStatus []*types.NetworkStatus) error { func SetNetworkStatus(client KubeClient, k8sArgs *types.K8sArgs, netStatus []*types.NetworkStatus) error {
logging.Debugf("SetNetworkStatus: %v, %v", k, netStatus) logging.Debugf("SetNetworkStatus: %v, %v, %v", client, k8sArgs, netStatus)
pod, err := k.Client.GetPod(k.Podnamespace, k.Podname) podName := string(k8sArgs.K8S_POD_NAME)
podNamespace := string(k8sArgs.K8S_POD_NAMESPACE)
pod, err := client.GetPod(podNamespace, podName)
if err != nil { if err != nil {
return logging.Errorf("SetNetworkStatus: failed to query the pod %v in out of cluster comm: %v", k.Podname, err) return logging.Errorf("SetNetworkStatus: failed to query the pod %v in out of cluster comm: %v", podName, err)
} }
var networkStatuses string var networkStatuses string
@@ -101,9 +104,9 @@ func SetNetworkStatus(k *clientInfo, netStatus []*types.NetworkStatus) error {
networkStatuses = fmt.Sprintf("[%s]", strings.Join(networkStatus, ",")) networkStatuses = fmt.Sprintf("[%s]", strings.Join(networkStatus, ","))
} }
_, err = setPodNetworkAnnotation(k.Client, k.Podnamespace, pod, networkStatuses) _, err = setPodNetworkAnnotation(client, podNamespace, pod, networkStatuses)
if err != nil { if err != nil {
return logging.Errorf("SetNetworkStatus: failed to update the pod %v in out of cluster comm: %v", k.Podname, err) return logging.Errorf("SetNetworkStatus: failed to update the pod %v in out of cluster comm: %v", podName, err)
} }
return nil return nil
@@ -137,18 +140,6 @@ func setPodNetworkAnnotation(client KubeClient, namespace string, pod *v1.Pod, n
return pod, nil return pod, nil
} }
func getPodNetworkAnnotation(client KubeClient, k8sArgs *types.K8sArgs) (string, string, string, error) {
var err error
logging.Debugf("getPodNetworkAnnotation: %v, %v", client, k8sArgs)
pod, err := client.GetPod(string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME))
if err != nil {
return "", "", "", logging.Errorf("getPodNetworkAnnotation: failed to query the pod %v in out of cluster comm: %v", string(k8sArgs.K8S_POD_NAME), err)
}
return pod.Annotations["k8s.v1.cni.cncf.io/networks"], pod.ObjectMeta.Namespace, string(pod.UID), nil
}
func parsePodNetworkObjectName(podnetwork string) (string, string, string, error) { func parsePodNetworkObjectName(podnetwork string) (string, string, string, error) {
var netNsName string var netNsName string
var netIfName string var netIfName string
@@ -427,8 +418,14 @@ func TryLoadPodDelegates(k8sArgs *types.K8sArgs, conf *types.NetConf, kubeClient
} }
setKubeClientInfo(clientInfo, kubeClient, k8sArgs) setKubeClientInfo(clientInfo, kubeClient, k8sArgs)
// Get the pod info. If cannot get it, we use cached delegates
pod, err := kubeClient.GetPod(string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME))
if err != nil {
logging.Debugf("tryLoadK8sDelegates: Err in loading K8s cluster default network from pod annotation: %v, use cached delegates", err)
return 0, nil, nil
}
delegate, err := tryLoadK8sPodDefaultNetwork(k8sArgs, conf, kubeClient) delegate, err := tryLoadK8sPodDefaultNetwork(kubeClient, pod, conf)
if err != nil { if err != nil {
return 0, nil, logging.Errorf("tryLoadK8sDelegates: Err in loading K8s cluster default network from pod annotation: %v", err) return 0, nil, logging.Errorf("tryLoadK8sDelegates: Err in loading K8s cluster default network from pod annotation: %v", err)
} }
@@ -438,19 +435,23 @@ func TryLoadPodDelegates(k8sArgs *types.K8sArgs, conf *types.NetConf, kubeClient
conf.Delegates[0] = delegate conf.Delegates[0] = delegate
} }
delegates, err := GetPodNetwork(kubeClient, k8sArgs, conf.ConfDir, conf.NamespaceIsolation) networks, err := GetPodNetwork(pod)
if err != nil { if networks != nil {
if _, ok := err.(*NoK8sNetworkError); ok { delegates, err := GetNetworkDelegates(kubeClient, pod, networks, conf.ConfDir, conf.NamespaceIsolation)
return 0, clientInfo, nil
if err != nil {
if _, ok := err.(*NoK8sNetworkError); ok {
return 0, clientInfo, nil
}
return 0, nil, logging.Errorf("Multus: Err in getting k8s network from pod: %v", err)
} }
return 0, nil, logging.Errorf("Multus: Err in getting k8s network from pod: %v", err)
}
if err = conf.AddDelegates(delegates); err != nil { if err = conf.AddDelegates(delegates); err != nil {
return 0, nil, err return 0, nil, err
}
return len(delegates), clientInfo, nil
} }
return 0, clientInfo, nil
return len(delegates), clientInfo, nil
} }
func GetK8sClient(kubeconfig string, kubeClient KubeClient) (KubeClient, error) { func GetK8sClient(kubeconfig string, kubeClient KubeClient) (KubeClient, error) {
@@ -495,17 +496,11 @@ func GetK8sClient(kubeconfig string, kubeClient KubeClient) (KubeClient, error)
return &defaultKubeClient{client: client}, nil return &defaultKubeClient{client: client}, nil
} }
func GetPodNetwork(k8sclient KubeClient, k8sArgs *types.K8sArgs, confdir string, confnamespaceisolation bool) ([]*types.DelegateNetConf, error) { func GetPodNetwork(pod *v1.Pod) ([]*types.NetworkSelectionElement, error) {
logging.Debugf("GetPodNetwork: %v, %v, %v", k8sclient, k8sArgs, confdir) logging.Debugf("GetPodNetwork: %v", pod)
netAnnot, defaultNamespace, podID, err := getPodNetworkAnnotation(k8sclient, k8sArgs) netAnnot := pod.Annotations[networkAttachmentAnnot]
if err != nil { defaultNamespace := pod.ObjectMeta.Namespace
return nil, err
}
if err != nil {
return nil, logging.Errorf("GetK8sNetwork: failed to get resourceMap for PodUID: %v %v", podID, err)
}
if len(netAnnot) == 0 { if len(netAnnot) == 0 {
return nil, &NoK8sNetworkError{"no kubernetes network found"} return nil, &NoK8sNetworkError{"no kubernetes network found"}
@@ -515,24 +510,31 @@ func GetPodNetwork(k8sclient KubeClient, k8sArgs *types.K8sArgs, confdir string,
if err != nil { if err != nil {
return nil, err return nil, err
} }
return networks, nil
}
func GetNetworkDelegates(k8sclient KubeClient, pod *v1.Pod, networks []*types.NetworkSelectionElement, confdir string, confnamespaceIsolation bool) ([]*types.DelegateNetConf, error) {
logging.Debugf("GetNetworkDelegates: %v, %v, %v, %v, %v", k8sclient, pod, networks, confdir, confnamespaceIsolation)
// resourceMap holds Pod device allocation information; only initizized if CRD contains 'resourceName' annotation. // resourceMap holds Pod device allocation information; only initizized if CRD contains 'resourceName' annotation.
// This will only be initialized once and all delegate objects can reference this to look up device info. // This will only be initialized once and all delegate objects can reference this to look up device info.
var resourceMap map[string]*types.ResourceInfo var resourceMap map[string]*types.ResourceInfo
// Read all network objects referenced by 'networks' // Read all network objects referenced by 'networks'
var delegates []*types.DelegateNetConf var delegates []*types.DelegateNetConf
defaultNamespace := pod.ObjectMeta.Namespace
podID := pod.UID
for _, net := range networks { for _, net := range networks {
// The pods namespace (stored as defaultNamespace, does not equal the annotation's target namespace in net.Namespace) // The pods namespace (stored as defaultNamespace, does not equal the annotation's target namespace in net.Namespace)
// In the case that this is a mismatch when namespaceisolation is enabled, this should be an error. // In the case that this is a mismatch when namespaceisolation is enabled, this should be an error.
if confnamespaceisolation { if confnamespaceIsolation {
if defaultNamespace != net.Namespace { if defaultNamespace != net.Namespace {
return nil, logging.Errorf("GetPodNetwork: namespace isolation violation: podnamespace: %v / target namespace: %v", defaultNamespace, net.Namespace) return nil, logging.Errorf("GetPodNetwork: namespace isolation violation: podnamespace: %v / target namespace: %v", defaultNamespace, net.Namespace)
} }
} }
delegate, updatedResourceMap, err := getKubernetesDelegate(k8sclient, net, confdir, podID, resourceMap) delegate, updatedResourceMap, err := getKubernetesDelegate(k8sclient, net, confdir, string(podID), resourceMap)
if err != nil { if err != nil {
return nil, logging.Errorf("GetPodNetwork: failed getting the delegate: %v", err) return nil, logging.Errorf("GetPodNetwork: failed getting the delegate: %v", err)
} }
@@ -655,28 +657,13 @@ func GetDefaultNetworks(k8sArgs *types.K8sArgs, conf *types.NetConf, kubeClient
return nil return nil
} }
func getPodDefaultNetworkAnnotation(client KubeClient, k8sArgs *types.K8sArgs) (string, error) {
logging.Debugf("getPodDefaultNetworkAnnotation: %v, %v", client, k8sArgs)
pod, err := client.GetPod(string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME))
if err != nil {
return "", logging.Errorf("getPodDefaultNetworkAnnotation: failed to query the pod %v in out of cluster comm: %v", string(k8sArgs.K8S_POD_NAME), err)
}
if v, ok := pod.Annotations[defaultNetAnnot]; ok {
return v, nil
}
return "", nil
}
// tryLoadK8sPodDefaultNetwork get pod default network from annotations // tryLoadK8sPodDefaultNetwork get pod default network from annotations
func tryLoadK8sPodDefaultNetwork(k8sArgs *types.K8sArgs, conf *types.NetConf, kubeClient KubeClient) (*types.DelegateNetConf, error) { func tryLoadK8sPodDefaultNetwork(kubeClient KubeClient, pod *v1.Pod, conf *types.NetConf) (*types.DelegateNetConf, error) {
logging.Debugf("tryLoadK8sPodDefaultNetwork: %v, %v", kubeClient, k8sArgs) var netAnnot string
logging.Debugf("tryLoadK8sPodDefaultNetwork: %v, %v, %v", kubeClient, pod, conf)
netAnnot, err := getPodDefaultNetworkAnnotation(kubeClient, k8sArgs) netAnnot, ok := pod.Annotations[defaultNetAnnot]
if err != nil { if !ok {
return nil, logging.Errorf("tryLoadK8sPodDefaultNetwork: failed to get pod annotation: %v", err)
}
if netAnnot == "" {
logging.Debugf("tryLoadK8sPodDefaultNetwork: Pod default network annotation is not defined") logging.Debugf("tryLoadK8sPodDefaultNetwork: Pod default network annotation is not defined")
return nil, nil return nil, nil
} }

View File

@@ -82,7 +82,10 @@ var _ = Describe("k8sclient operations", func() {
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
k8sArgs, err := GetK8sArgs(args) k8sArgs, err := GetK8sArgs(args)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
delegates, err := GetPodNetwork(kubeClient, k8sArgs, tmpDir, false) pod, err := kubeClient.GetPod(string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME))
networks, err := GetPodNetwork(pod)
Expect(err).NotTo(HaveOccurred())
delegates, err := GetNetworkDelegates(kubeClient, pod, networks, tmpDir, false)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(fKubeClient.PodCount).To(Equal(1)) Expect(fKubeClient.PodCount).To(Equal(1))
Expect(fKubeClient.NetCount).To(Equal(2)) Expect(fKubeClient.NetCount).To(Equal(2))
@@ -115,7 +118,10 @@ var _ = Describe("k8sclient operations", func() {
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
k8sArgs, err := GetK8sArgs(args) k8sArgs, err := GetK8sArgs(args)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
delegates, err := GetPodNetwork(kubeClient, k8sArgs, tmpDir, false) pod, err := kubeClient.GetPod(string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME))
networks, err := GetPodNetwork(pod)
Expect(err).NotTo(HaveOccurred())
delegates, err := GetNetworkDelegates(kubeClient, pod, networks, tmpDir, false)
Expect(len(delegates)).To(Equal(0)) Expect(len(delegates)).To(Equal(0))
Expect(err).To(MatchError("GetPodNetwork: failed getting the delegate: getKubernetesDelegate: failed to get network resource, refer Multus README.md for the usage guide: resource not found")) Expect(err).To(MatchError("GetPodNetwork: failed getting the delegate: getKubernetesDelegate: failed to get network resource, refer Multus README.md for the usage guide: resource not found"))
}) })
@@ -159,7 +165,10 @@ var _ = Describe("k8sclient operations", func() {
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
k8sArgs, err := GetK8sArgs(args) k8sArgs, err := GetK8sArgs(args)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
delegates, err := GetPodNetwork(kubeClient, k8sArgs, tmpDir, false) pod, err := kubeClient.GetPod(string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME))
networks, err := GetPodNetwork(pod)
Expect(err).NotTo(HaveOccurred())
delegates, err := GetNetworkDelegates(kubeClient, pod, networks, tmpDir, false)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(fKubeClient.PodCount).To(Equal(1)) Expect(fKubeClient.PodCount).To(Equal(1))
Expect(fKubeClient.NetCount).To(Equal(3)) Expect(fKubeClient.NetCount).To(Equal(3))
@@ -186,8 +195,9 @@ var _ = Describe("k8sclient operations", func() {
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
k8sArgs, err := GetK8sArgs(args) k8sArgs, err := GetK8sArgs(args)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
delegates, err := GetPodNetwork(kubeClient, k8sArgs, tmpDir, false) pod, err := kubeClient.GetPod(string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME))
Expect(len(delegates)).To(Equal(0)) networks, err := GetPodNetwork(pod)
Expect(len(networks)).To(Equal(0))
Expect(err).To(MatchError("parsePodNetworkAnnotation: failed to parse pod Network Attachment Selection Annotation JSON format: invalid character 'a' looking for beginning of value")) Expect(err).To(MatchError("parsePodNetworkAnnotation: failed to parse pod Network Attachment Selection Annotation JSON format: invalid character 'a' looking for beginning of value"))
}) })
@@ -216,7 +226,10 @@ var _ = Describe("k8sclient operations", func() {
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
k8sArgs, err := GetK8sArgs(args) k8sArgs, err := GetK8sArgs(args)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
delegates, err := GetPodNetwork(kubeClient, k8sArgs, tmpDir, false) pod, err := kubeClient.GetPod(string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME))
networks, err := GetPodNetwork(pod)
Expect(err).NotTo(HaveOccurred())
delegates, err := GetNetworkDelegates(kubeClient, pod, networks, tmpDir, false)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(fKubeClient.PodCount).To(Equal(1)) Expect(fKubeClient.PodCount).To(Equal(1))
Expect(fKubeClient.NetCount).To(Equal(2)) Expect(fKubeClient.NetCount).To(Equal(2))
@@ -242,7 +255,10 @@ var _ = Describe("k8sclient operations", func() {
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
k8sArgs, err := GetK8sArgs(args) k8sArgs, err := GetK8sArgs(args)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
delegates, err := GetPodNetwork(kubeClient, k8sArgs, tmpDir, false) pod, err := kubeClient.GetPod(string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME))
networks, err := GetPodNetwork(pod)
Expect(err).NotTo(HaveOccurred())
delegates, err := GetNetworkDelegates(kubeClient, pod, networks, tmpDir, false)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(fKubeClient.PodCount).To(Equal(1)) Expect(fKubeClient.PodCount).To(Equal(1))
Expect(fKubeClient.NetCount).To(Equal(1)) Expect(fKubeClient.NetCount).To(Equal(1))
@@ -273,7 +289,10 @@ var _ = Describe("k8sclient operations", func() {
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
k8sArgs, err := GetK8sArgs(args) k8sArgs, err := GetK8sArgs(args)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
delegates, err := GetPodNetwork(kubeClient, k8sArgs, tmpDir, false) pod, err := kubeClient.GetPod(string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME))
networks, err := GetPodNetwork(pod)
Expect(err).NotTo(HaveOccurred())
delegates, err := GetNetworkDelegates(kubeClient, pod, networks, tmpDir, false)
Expect(len(delegates)).To(Equal(0)) Expect(len(delegates)).To(Equal(0))
Expect(err).To(MatchError(fmt.Sprintf("GetPodNetwork: failed getting the delegate: cniConfigFromNetworkResource: err in getCNIConfigFromFile: Error loading CNI config file %s: error parsing configuration: invalid character 'a' looking for beginning of value", net2Name))) Expect(err).To(MatchError(fmt.Sprintf("GetPodNetwork: failed getting the delegate: cniConfigFromNetworkResource: err in getCNIConfigFromFile: Error loading CNI config file %s: error parsing configuration: invalid character 'a' looking for beginning of value", net2Name)))
}) })
@@ -584,7 +603,10 @@ var _ = Describe("k8sclient operations", func() {
k8sArgs, err := GetK8sArgs(args) k8sArgs, err := GetK8sArgs(args)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
_, err = GetPodNetwork(kubeClient, k8sArgs, tmpDir, netConf.NamespaceIsolation) pod, err := kubeClient.GetPod(string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME))
networks, err := GetPodNetwork(pod)
Expect(err).NotTo(HaveOccurred())
_, err = GetNetworkDelegates(kubeClient, pod, networks, tmpDir, netConf.NamespaceIsolation)
Expect(err).To(HaveOccurred()) Expect(err).To(HaveOccurred())
Expect(err).To(MatchError("GetPodNetwork: namespace isolation violation: podnamespace: test / target namespace: kube-system")) Expect(err).To(MatchError("GetPodNetwork: namespace isolation violation: podnamespace: test / target namespace: kube-system"))

View File

@@ -74,12 +74,12 @@ func saveScratchNetConf(containerID, dataDir string, netconf []byte) error {
return err return err
} }
func consumeScratchNetConf(containerID, dataDir string) ([]byte, error) { func consumeScratchNetConf(containerID, dataDir string) ([]byte, string, error) {
logging.Debugf("consumeScratchNetConf: %s, %s", containerID, dataDir) logging.Debugf("consumeScratchNetConf: %s, %s", containerID, dataDir)
path := filepath.Join(dataDir, containerID) path := filepath.Join(dataDir, containerID)
defer os.Remove(path)
return ioutil.ReadFile(path) b, err := ioutil.ReadFile(path)
return b, path, err
} }
func getIfname(delegate *types.DelegateNetConf, argif string, idx int) string { func getIfname(delegate *types.DelegateNetConf, argif string, idx int) string {
@@ -105,12 +105,23 @@ func saveDelegates(containerID, dataDir string, delegates []*types.DelegateNetCo
} }
if err = saveScratchNetConf(containerID, dataDir, delegatesBytes); err != nil { if err = saveScratchNetConf(containerID, dataDir, delegatesBytes); err != nil {
return logging.Errorf("error in saving the delegates : %v", err) return logging.Errorf("error in saving the delegates : %v", err)
} }
return err return err
} }
func deleteDelegates(containerID, dataDir string) error {
logging.Debugf("deleteDelegates: %s, %s", containerID, dataDir)
path := filepath.Join(dataDir, containerID)
if err := os.Remove(path); err != nil {
return logging.Errorf("error in deleting the delegates : %v", err)
}
return nil
}
func validateIfName(nsname string, ifname string) error { func validateIfName(nsname string, ifname string) error {
logging.Debugf("validateIfName: %s, %s", nsname, ifname) logging.Debugf("validateIfName: %s, %s", nsname, ifname)
podNs, err := ns.GetNS(nsname) podNs, err := ns.GetNS(nsname)
@@ -282,8 +293,8 @@ func delPlugins(exec invoke.Exec, argIfname string, delegates []*types.DelegateN
} }
func cmdAdd(args *skel.CmdArgs, exec invoke.Exec, kubeClient k8s.KubeClient) (cnitypes.Result, error) { func cmdAdd(args *skel.CmdArgs, exec invoke.Exec, kubeClient k8s.KubeClient) (cnitypes.Result, error) {
logging.Debugf("cmdAdd: %v, %v, %v", args, exec, kubeClient)
n, err := types.LoadNetConf(args.StdinData) n, err := types.LoadNetConf(args.StdinData)
logging.Debugf("cmdAdd: %v, %v, %v", args, exec, kubeClient)
if err != nil { if err != nil {
return nil, logging.Errorf("err in loading netconf: %v", err) return nil, logging.Errorf("err in loading netconf: %v", err)
} }
@@ -312,16 +323,14 @@ func cmdAdd(args *skel.CmdArgs, exec invoke.Exec, kubeClient k8s.KubeClient) (cn
n.Delegates[0].MasterPlugin = true n.Delegates[0].MasterPlugin = true
} }
numK8sDelegates, kc, err := k8s.TryLoadPodDelegates(k8sArgs, n, kubeClient) _, kc, err := k8s.TryLoadPodDelegates(k8sArgs, n, kubeClient)
if err != nil { if err != nil {
return nil, logging.Errorf("Multus: Err in loading K8s Delegates k8s args: %v", err) return nil, logging.Errorf("Multus: Err in loading K8s Delegates k8s args: %v", err)
} }
if numK8sDelegates == 0 { // cache the multus config
// cache the multus config if we have only Multus delegates if err := saveDelegates(args.ContainerID, n.CNIDir, n.Delegates); err != nil {
if err := saveDelegates(args.ContainerID, n.CNIDir, n.Delegates); err != nil { return nil, logging.Errorf("Multus: Err in saving the delegates: %v", err)
return nil, logging.Errorf("Multus: Err in saving the delegates: %v", err)
}
} }
var result, tmpResult cnitypes.Result var result, tmpResult cnitypes.Result
@@ -363,7 +372,7 @@ func cmdAdd(args *skel.CmdArgs, exec invoke.Exec, kubeClient k8s.KubeClient) (cn
//set the network status annotation in apiserver, only in case Multus as kubeconfig //set the network status annotation in apiserver, only in case Multus as kubeconfig
if n.Kubeconfig != "" && kc != nil { if n.Kubeconfig != "" && kc != nil {
if !types.CheckSystemNamespaces(kc.Podnamespace, n.SystemNamespaces) { if !types.CheckSystemNamespaces(kc.Podnamespace, n.SystemNamespaces) {
err = k8s.SetNetworkStatus(kc, netStatus) err = k8s.SetNetworkStatus(kubeClient, k8sArgs, netStatus)
if err != nil { if err != nil {
return nil, logging.Errorf("Multus: Err set the networks status: %v", err) return nil, logging.Errorf("Multus: Err set the networks status: %v", err)
} }
@@ -386,8 +395,8 @@ func cmdGet(args *skel.CmdArgs, exec invoke.Exec, kubeClient k8s.KubeClient) (cn
} }
func cmdDel(args *skel.CmdArgs, exec invoke.Exec, kubeClient k8s.KubeClient) error { func cmdDel(args *skel.CmdArgs, exec invoke.Exec, kubeClient k8s.KubeClient) error {
logging.Debugf("cmdDel: %v, %v, %v", args, exec, kubeClient)
in, err := types.LoadNetConf(args.StdinData) in, err := types.LoadNetConf(args.StdinData)
logging.Debugf("cmdDel: %v, %v, %v", args, exec, kubeClient)
if err != nil { if err != nil {
return err return err
} }
@@ -397,7 +406,7 @@ func cmdDel(args *skel.CmdArgs, exec invoke.Exec, kubeClient k8s.KubeClient) err
} }
netns, err := ns.GetNS(args.Netns) netns, err := ns.GetNS(args.Netns)
if err != nil { if err != nil {
// if NetNs is passed down by the Cloud Orchestration Engine, or if it called multiple times // if NetNs is passed down by the Cloud Orchestration Engine, or if it called multiple times
// so don't return an error if the device is already removed. // so don't return an error if the device is already removed.
// https://github.com/kubernetes/kubernetes/issues/43014#issuecomment-287164444 // https://github.com/kubernetes/kubernetes/issues/43014#issuecomment-287164444
_, ok := err.(ns.NSPathNotExistErr) _, ok := err.(ns.NSPathNotExistErr)
@@ -417,42 +426,49 @@ func cmdDel(args *skel.CmdArgs, exec invoke.Exec, kubeClient k8s.KubeClient) err
return logging.Errorf("Multus: Err in getting k8s args: %v", err) return logging.Errorf("Multus: Err in getting k8s args: %v", err)
} }
if in.ClusterNetwork != "" { // Read the cache to get delegates json for the pod
err = k8s.GetDefaultNetworks(k8sArgs, in, kubeClient) netconfBytes, path, err := consumeScratchNetConf(args.ContainerID, in.CNIDir)
if err != nil { if err != nil {
return logging.Errorf("Multus: Failed to get clusterNetwork/defaultNetworks: %v", err) // Fetch delegates again if cache is not exist
if os.IsNotExist(err) {
if in.ClusterNetwork != "" {
err = k8s.GetDefaultNetworks(k8sArgs, in, kubeClient)
if err != nil {
return logging.Errorf("Multus: Failed to get clusterNetwork/defaultNetworks: %v", err)
}
// First delegate is always the master plugin
in.Delegates[0].MasterPlugin = true
}
// Get pod annotation and so on
_, _, err := k8s.TryLoadPodDelegates(k8sArgs, in, kubeClient)
if err != nil {
if len(in.Delegates) == 0 {
// No delegate available so send error
return logging.Errorf("Multus: failed to get delegates: %v", err)
}
// Get clusterNetwork before, so continue to delete
logging.Errorf("Multus: failed to get delegates: %v, but continue to delete clusterNetwork", err)
}
} else {
return logging.Errorf("Multus: Err in reading the delegates: %v", err)
}
} else {
defer os.Remove(path)
if err := json.Unmarshal(netconfBytes, &in.Delegates); err != nil {
return logging.Errorf("Multus: failed to load netconf: %v", err)
} }
// First delegate is always the master plugin // First delegate is always the master plugin
in.Delegates[0].MasterPlugin = true in.Delegates[0].MasterPlugin = true
} }
numK8sDelegates, kc, err := k8s.TryLoadPodDelegates(k8sArgs, in, kubeClient) // unset the network status annotation in apiserver, only in case Multus as kubeconfig
if err != nil { if in.Kubeconfig != "" {
return err if !types.CheckSystemNamespaces(string(k8sArgs.K8S_POD_NAMESPACE), in.SystemNamespaces) {
} err := k8s.SetNetworkStatus(kubeClient, k8sArgs, nil)
if numK8sDelegates == 0 {
// re-read the scratch multus config if we have only Multus delegates
netconfBytes, err := consumeScratchNetConf(args.ContainerID, in.CNIDir)
if err != nil {
if os.IsNotExist(err) {
// Per spec should ignore error if resources are missing / already removed
return nil
}
return logging.Errorf("Multus: Err in reading the delegates: %v", err)
}
if err := json.Unmarshal(netconfBytes, &in.Delegates); err != nil {
return logging.Errorf("Multus: failed to load netconf: %v", err)
}
}
//unset the network status annotation in apiserver, only in case Multus as kubeconfig
if in.Kubeconfig != "" && kc != nil {
if !types.CheckSystemNamespaces(kc.Podnamespace, in.SystemNamespaces) {
err := k8s.SetNetworkStatus(kc, nil)
if err != nil { if err != nil {
return logging.Errorf("Multus: Err unset the networks status: %v", err) // error happen but continue to delete
logging.Errorf("Multus: Err unset the networks status: %v", err)
} }
} }
} }

View File

@@ -373,7 +373,7 @@ var _ = Describe("multus operations", func() {
result, err := cmdAdd(args, fExec, fKubeClient) result, err := cmdAdd(args, fExec, fKubeClient)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) Expect(fExec.addIndex).To(Equal(len(fExec.plugins)))
Expect(fKubeClient.PodCount).To(Equal(3)) Expect(fKubeClient.PodCount).To(Equal(2))
Expect(fKubeClient.NetCount).To(Equal(2)) Expect(fKubeClient.NetCount).To(Equal(2))
r := result.(*types020.Result) r := result.(*types020.Result)
// plugin 1 is the masterplugin // plugin 1 is the masterplugin
@@ -452,13 +452,81 @@ var _ = Describe("multus operations", func() {
result, err := cmdAdd(args, fExec, fKubeClient) result, err := cmdAdd(args, fExec, fKubeClient)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) Expect(fExec.addIndex).To(Equal(len(fExec.plugins)))
Expect(fKubeClient.PodCount).To(Equal(3)) Expect(fKubeClient.PodCount).To(Equal(2))
Expect(fKubeClient.NetCount).To(Equal(2)) Expect(fKubeClient.NetCount).To(Equal(2))
r := result.(*types020.Result) r := result.(*types020.Result)
// plugin 1 is the masterplugin // plugin 1 is the masterplugin
Expect(reflect.DeepEqual(r, expectedResult1)).To(BeTrue()) Expect(reflect.DeepEqual(r, expectedResult1)).To(BeTrue())
}) })
It("executes kubernetes networks and delete it after pod removal", func() {
fakePod := testhelpers.NewFakePod("testpod", "net1", "")
net1 := `{
"name": "net1",
"type": "mynet",
"cniVersion": "0.2.0"
}`
args := &skel.CmdArgs{
ContainerID: "123456789",
Netns: testNS.Path(),
IfName: "eth0",
Args: fmt.Sprintf("K8S_POD_NAME=%s;K8S_POD_NAMESPACE=%s", fakePod.ObjectMeta.Name, fakePod.ObjectMeta.Namespace),
StdinData: []byte(`{
"name": "node-cni-network",
"type": "multus",
"kubeconfig": "/etc/kubernetes/node-kubeconfig.yaml",
"delegates": [{
"name": "weave1",
"cniVersion": "0.2.0",
"type": "weave-net"
}]
}`),
}
fExec := &fakeExec{}
expectedResult1 := &types020.Result{
CNIVersion: "0.2.0",
IP4: &types020.IPConfig{
IP: *testhelpers.EnsureCIDR("1.1.1.2/24"),
},
}
expectedConf1 := `{
"name": "weave1",
"cniVersion": "0.2.0",
"type": "weave-net"
}`
fExec.addPlugin(nil, "eth0", expectedConf1, expectedResult1, nil)
fExec.addPlugin(nil, "net1", net1, &types020.Result{
CNIVersion: "0.2.0",
IP4: &types020.IPConfig{
IP: *testhelpers.EnsureCIDR("1.1.1.3/24"),
},
}, nil)
fKubeClient := testhelpers.NewFakeKubeClient()
fKubeClient.AddPod(fakePod)
fKubeClient.AddNetConfig(fakePod.ObjectMeta.Namespace, "net1", net1)
os.Setenv("CNI_COMMAND", "ADD")
os.Setenv("CNI_IFNAME", "eth0")
result, err := cmdAdd(args, fExec, fKubeClient)
Expect(err).NotTo(HaveOccurred())
Expect(fExec.addIndex).To(Equal(len(fExec.plugins)))
Expect(fKubeClient.PodCount).To(Equal(2))
Expect(fKubeClient.NetCount).To(Equal(1))
r := result.(*types020.Result)
// plugin 1 is the masterplugin
Expect(reflect.DeepEqual(r, expectedResult1)).To(BeTrue())
os.Setenv("CNI_COMMAND", "DEL")
os.Setenv("CNI_IFNAME", "eth0")
// set fKubeClient to nil to emulate no pod info
fKubeClient.DeletePod(fakePod)
err = cmdDel(args, fExec, fKubeClient)
Expect(err).NotTo(HaveOccurred())
Expect(fExec.delIndex).To(Equal(len(fExec.plugins)))
})
It("ensure delegates get portmap runtime config", func() { It("ensure delegates get portmap runtime config", func() {
args := &skel.CmdArgs{ args := &skel.CmdArgs{
ContainerID: "123456789", ContainerID: "123456789",
@@ -525,9 +593,9 @@ var _ = Describe("multus operations", func() {
StdinData: []byte(`{ StdinData: []byte(`{
"name": "node-cni-network", "name": "node-cni-network",
"type": "multus", "type": "multus",
"kubeconfig": "/etc/kubernetes/node-kubeconfig.yaml", "kubeconfig": "/etc/kubernetes/node-kubeconfig.yaml",
"defaultNetworks": [], "defaultNetworks": [],
"clusterNetwork": "net1", "clusterNetwork": "net1",
"delegates": [] "delegates": []
}`), }`),
} }
@@ -544,7 +612,7 @@ var _ = Describe("multus operations", func() {
result, err := cmdAdd(args, fExec, fKubeClient) result, err := cmdAdd(args, fExec, fKubeClient)
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) Expect(fExec.addIndex).To(Equal(len(fExec.plugins)))
Expect(fKubeClient.PodCount).To(Equal(3)) Expect(fKubeClient.PodCount).To(Equal(2))
Expect(fKubeClient.NetCount).To(Equal(2)) Expect(fKubeClient.NetCount).To(Equal(2))
r := result.(*types020.Result) r := result.(*types020.Result)
Expect(reflect.DeepEqual(r, expectedResult1)).To(BeTrue()) Expect(reflect.DeepEqual(r, expectedResult1)).To(BeTrue())
@@ -555,4 +623,163 @@ var _ = Describe("multus operations", func() {
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) Expect(fExec.delIndex).To(Equal(len(fExec.plugins)))
}) })
It("Verify the cache is created in dataDir", func() {
tmpCNIDir := tmpDir + "/cniData"
err := os.Mkdir(tmpCNIDir, 0777)
Expect(err).NotTo(HaveOccurred())
fakePod := testhelpers.NewFakePod("testpod", "net1", "")
net1 := `{
"name": "net1",
"type": "mynet",
"cniVersion": "0.2.0"
}`
args := &skel.CmdArgs{
ContainerID: "123456789",
Netns: testNS.Path(),
IfName: "eth0",
Args: fmt.Sprintf("K8S_POD_NAME=%s;K8S_POD_NAMESPACE=%s", fakePod.ObjectMeta.Name, fakePod.ObjectMeta.Namespace),
StdinData: []byte(fmt.Sprintf(`{
"name": "node-cni-network",
"type": "multus",
"kubeconfig": "/etc/kubernetes/node-kubeconfig.yaml",
"cniDir": "%s",
"delegates": [{
"name": "weave1",
"cniVersion": "0.2.0",
"type": "weave-net"
}]
}`, tmpCNIDir)),
}
fExec := &fakeExec{}
expectedResult1 := &types020.Result{
CNIVersion: "0.2.0",
IP4: &types020.IPConfig{
IP: *testhelpers.EnsureCIDR("1.1.1.2/24"),
},
}
expectedConf1 := `{
"name": "weave1",
"cniVersion": "0.2.0",
"type": "weave-net"
}`
fExec.addPlugin(nil, "eth0", expectedConf1, expectedResult1, nil)
fExec.addPlugin(nil, "net1", net1, &types020.Result{
CNIVersion: "0.2.0",
IP4: &types020.IPConfig{
IP: *testhelpers.EnsureCIDR("1.1.1.3/24"),
},
}, nil)
fKubeClient := testhelpers.NewFakeKubeClient()
fKubeClient.AddPod(fakePod)
fKubeClient.AddNetConfig(fakePod.ObjectMeta.Namespace, "net1", net1)
os.Setenv("CNI_COMMAND", "ADD")
os.Setenv("CNI_IFNAME", "eth0")
result, err := cmdAdd(args, fExec, fKubeClient)
Expect(err).NotTo(HaveOccurred())
Expect(fExec.addIndex).To(Equal(len(fExec.plugins)))
Expect(fKubeClient.PodCount).To(Equal(2))
Expect(fKubeClient.NetCount).To(Equal(1))
r := result.(*types020.Result)
// plugin 1 is the masterplugin
Expect(reflect.DeepEqual(r, expectedResult1)).To(BeTrue())
By("Verify cache file existence")
cacheFilePath := fmt.Sprintf("%s/%s", tmpCNIDir, "123456789")
_, err = os.Stat(cacheFilePath)
Expect(err).NotTo(HaveOccurred())
By("Delete and check net count is not incremented")
os.Setenv("CNI_COMMAND", "DEL")
os.Setenv("CNI_IFNAME", "eth0")
err = cmdDel(args, fExec, fKubeClient)
Expect(err).NotTo(HaveOccurred())
Expect(fExec.delIndex).To(Equal(len(fExec.plugins)))
Expect(fKubeClient.PodCount).To(Equal(3))
Expect(fKubeClient.NetCount).To(Equal(1))
})
It("Delete pod without cache", func() {
tmpCNIDir := tmpDir + "/cniData"
err := os.Mkdir(tmpCNIDir, 0777)
Expect(err).NotTo(HaveOccurred())
fakePod := testhelpers.NewFakePod("testpod", "net1", "")
net1 := `{
"name": "net1",
"type": "mynet",
"cniVersion": "0.2.0"
}`
args := &skel.CmdArgs{
ContainerID: "123456789",
Netns: testNS.Path(),
IfName: "eth0",
Args: fmt.Sprintf("K8S_POD_NAME=%s;K8S_POD_NAMESPACE=%s", fakePod.ObjectMeta.Name, fakePod.ObjectMeta.Namespace),
StdinData: []byte(fmt.Sprintf(`{
"name": "node-cni-network",
"type": "multus",
"kubeconfig": "/etc/kubernetes/node-kubeconfig.yaml",
"cniDir": "%s",
"delegates": [{
"name": "weave1",
"cniVersion": "0.2.0",
"type": "weave-net"
}]
}`, tmpCNIDir)),
}
fExec := &fakeExec{}
expectedResult1 := &types020.Result{
CNIVersion: "0.2.0",
IP4: &types020.IPConfig{
IP: *testhelpers.EnsureCIDR("1.1.1.2/24"),
},
}
expectedConf1 := `{
"name": "weave1",
"cniVersion": "0.2.0",
"type": "weave-net"
}`
fExec.addPlugin(nil, "eth0", expectedConf1, expectedResult1, nil)
fExec.addPlugin(nil, "net1", net1, &types020.Result{
CNIVersion: "0.2.0",
IP4: &types020.IPConfig{
IP: *testhelpers.EnsureCIDR("1.1.1.3/24"),
},
}, nil)
fKubeClient := testhelpers.NewFakeKubeClient()
fKubeClient.AddPod(fakePod)
fKubeClient.AddNetConfig(fakePod.ObjectMeta.Namespace, "net1", net1)
os.Setenv("CNI_COMMAND", "ADD")
os.Setenv("CNI_IFNAME", "eth0")
result, err := cmdAdd(args, fExec, fKubeClient)
Expect(err).NotTo(HaveOccurred())
Expect(fExec.addIndex).To(Equal(len(fExec.plugins)))
Expect(fKubeClient.PodCount).To(Equal(2))
Expect(fKubeClient.NetCount).To(Equal(1))
r := result.(*types020.Result)
// plugin 1 is the masterplugin
Expect(reflect.DeepEqual(r, expectedResult1)).To(BeTrue())
By("Verify cache file existence")
cacheFilePath := fmt.Sprintf("%s/%s", tmpCNIDir, "123456789")
_, err = os.Stat(cacheFilePath)
Expect(err).NotTo(HaveOccurred())
err = os.Remove(cacheFilePath)
Expect(err).NotTo(HaveOccurred())
By("Delete and check pod/net count is incremented")
os.Setenv("CNI_COMMAND", "DEL")
os.Setenv("CNI_IFNAME", "eth0")
err = cmdDel(args, fExec, fKubeClient)
Expect(err).NotTo(HaveOccurred())
Expect(fExec.delIndex).To(Equal(len(fExec.plugins)))
Expect(fKubeClient.PodCount).To(Equal(4))
Expect(fKubeClient.NetCount).To(Equal(2))
})
}) })

View File

@@ -103,6 +103,11 @@ func (f *FakeKubeClient) AddPod(pod *v1.Pod) {
f.pods[key] = pod f.pods[key] = pod
} }
func (f *FakeKubeClient) DeletePod(pod *v1.Pod) {
key := fmt.Sprintf("%s/%s", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name)
delete(f.pods, key)
}
func NewFakePod(name string, netAnnotation string, defaultNetAnnotation string) *v1.Pod { func NewFakePod(name string, netAnnotation string, defaultNetAnnotation string) *v1.Pod {
pod := &v1.Pod{ pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{