diff --git a/k8sclient/k8sclient.go b/k8sclient/k8sclient.go index 0821af29b..435ce2ce9 100644 --- a/k8sclient/k8sclient.go +++ b/k8sclient/k8sclient.go @@ -37,8 +37,9 @@ import ( ) const ( - resourceNameAnnot = "k8s.v1.cni.cncf.io/resourceName" - defaultNetAnnot = "v1.multus-cni.io/default-network" + resourceNameAnnot = "k8s.v1.cni.cncf.io/resourceName" + defaultNetAnnot = "v1.multus-cni.io/default-network" + networkAttachmentAnnot = "k8s.v1.cni.cncf.io/networks" ) // NoK8sNetworkError indicates error, no network in kubernetes @@ -80,12 +81,14 @@ func setKubeClientInfo(c *clientInfo, client KubeClient, k8sArgs *types.K8sArgs) c.Podname = string(k8sArgs.K8S_POD_NAME) } -func SetNetworkStatus(k *clientInfo, netStatus []*types.NetworkStatus) error { +func SetNetworkStatus(client KubeClient, k8sArgs *types.K8sArgs, netStatus []*types.NetworkStatus) error { - logging.Debugf("SetNetworkStatus: %v, %v", k, netStatus) - pod, err := k.Client.GetPod(k.Podnamespace, k.Podname) + logging.Debugf("SetNetworkStatus: %v, %v, %v", client, k8sArgs, netStatus) + podName := string(k8sArgs.K8S_POD_NAME) + podNamespace := string(k8sArgs.K8S_POD_NAMESPACE) + pod, err := client.GetPod(podNamespace, podName) if err != nil { - return logging.Errorf("SetNetworkStatus: failed to query the pod %v in out of cluster comm: %v", k.Podname, err) + return logging.Errorf("SetNetworkStatus: failed to query the pod %v in out of cluster comm: %v", podName, err) } var networkStatuses string @@ -101,9 +104,9 @@ func SetNetworkStatus(k *clientInfo, netStatus []*types.NetworkStatus) error { networkStatuses = fmt.Sprintf("[%s]", strings.Join(networkStatus, ",")) } - _, err = setPodNetworkAnnotation(k.Client, k.Podnamespace, pod, networkStatuses) + _, err = setPodNetworkAnnotation(client, podNamespace, pod, networkStatuses) if err != nil { - return logging.Errorf("SetNetworkStatus: failed to update the pod %v in out of cluster comm: %v", k.Podname, err) + return logging.Errorf("SetNetworkStatus: failed to update the pod %v in out of cluster comm: %v", podName, err) } return nil @@ -137,18 +140,6 @@ func setPodNetworkAnnotation(client KubeClient, namespace string, pod *v1.Pod, n return pod, nil } -func getPodNetworkAnnotation(client KubeClient, k8sArgs *types.K8sArgs) (string, string, string, error) { - var err error - - logging.Debugf("getPodNetworkAnnotation: %v, %v", client, k8sArgs) - pod, err := client.GetPod(string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME)) - if err != nil { - return "", "", "", logging.Errorf("getPodNetworkAnnotation: failed to query the pod %v in out of cluster comm: %v", string(k8sArgs.K8S_POD_NAME), err) - } - - return pod.Annotations["k8s.v1.cni.cncf.io/networks"], pod.ObjectMeta.Namespace, string(pod.UID), nil -} - func parsePodNetworkObjectName(podnetwork string) (string, string, string, error) { var netNsName string var netIfName string @@ -427,8 +418,14 @@ func TryLoadPodDelegates(k8sArgs *types.K8sArgs, conf *types.NetConf, kubeClient } setKubeClientInfo(clientInfo, kubeClient, k8sArgs) + // Get the pod info. If cannot get it, we use cached delegates + pod, err := kubeClient.GetPod(string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME)) + if err != nil { + logging.Debugf("tryLoadK8sDelegates: Err in loading K8s cluster default network from pod annotation: %v, use cached delegates", err) + return 0, nil, nil + } - delegate, err := tryLoadK8sPodDefaultNetwork(k8sArgs, conf, kubeClient) + delegate, err := tryLoadK8sPodDefaultNetwork(kubeClient, pod, conf) if err != nil { return 0, nil, logging.Errorf("tryLoadK8sDelegates: Err in loading K8s cluster default network from pod annotation: %v", err) } @@ -438,19 +435,23 @@ func TryLoadPodDelegates(k8sArgs *types.K8sArgs, conf *types.NetConf, kubeClient conf.Delegates[0] = delegate } - delegates, err := GetPodNetwork(kubeClient, k8sArgs, conf.ConfDir, conf.NamespaceIsolation) - if err != nil { - if _, ok := err.(*NoK8sNetworkError); ok { - return 0, clientInfo, nil + networks, err := GetPodNetwork(pod) + if networks != nil { + delegates, err := GetNetworkDelegates(kubeClient, pod, networks, conf.ConfDir, conf.NamespaceIsolation) + + if err != nil { + if _, ok := err.(*NoK8sNetworkError); ok { + return 0, clientInfo, nil + } + return 0, nil, logging.Errorf("Multus: Err in getting k8s network from pod: %v", err) } - return 0, nil, logging.Errorf("Multus: Err in getting k8s network from pod: %v", err) - } - if err = conf.AddDelegates(delegates); err != nil { - return 0, nil, err + if err = conf.AddDelegates(delegates); err != nil { + return 0, nil, err + } + return len(delegates), clientInfo, nil } - - return len(delegates), clientInfo, nil + return 0, clientInfo, nil } func GetK8sClient(kubeconfig string, kubeClient KubeClient) (KubeClient, error) { @@ -495,17 +496,11 @@ func GetK8sClient(kubeconfig string, kubeClient KubeClient) (KubeClient, error) return &defaultKubeClient{client: client}, nil } -func GetPodNetwork(k8sclient KubeClient, k8sArgs *types.K8sArgs, confdir string, confnamespaceisolation bool) ([]*types.DelegateNetConf, error) { - logging.Debugf("GetPodNetwork: %v, %v, %v", k8sclient, k8sArgs, confdir) +func GetPodNetwork(pod *v1.Pod) ([]*types.NetworkSelectionElement, error) { + logging.Debugf("GetPodNetwork: %v", pod) - netAnnot, defaultNamespace, podID, err := getPodNetworkAnnotation(k8sclient, k8sArgs) - if err != nil { - return nil, err - } - - if err != nil { - return nil, logging.Errorf("GetK8sNetwork: failed to get resourceMap for PodUID: %v %v", podID, err) - } + netAnnot := pod.Annotations[networkAttachmentAnnot] + defaultNamespace := pod.ObjectMeta.Namespace if len(netAnnot) == 0 { return nil, &NoK8sNetworkError{"no kubernetes network found"} @@ -515,24 +510,31 @@ func GetPodNetwork(k8sclient KubeClient, k8sArgs *types.K8sArgs, confdir string, if err != nil { return nil, err } + return networks, nil +} +func GetNetworkDelegates(k8sclient KubeClient, pod *v1.Pod, networks []*types.NetworkSelectionElement, confdir string, confnamespaceIsolation bool) ([]*types.DelegateNetConf, error) { + logging.Debugf("GetNetworkDelegates: %v, %v, %v, %v, %v", k8sclient, pod, networks, confdir, confnamespaceIsolation) // resourceMap holds Pod device allocation information; only initizized if CRD contains 'resourceName' annotation. // This will only be initialized once and all delegate objects can reference this to look up device info. var resourceMap map[string]*types.ResourceInfo // Read all network objects referenced by 'networks' var delegates []*types.DelegateNetConf + defaultNamespace := pod.ObjectMeta.Namespace + + podID := pod.UID for _, net := range networks { // The pods namespace (stored as defaultNamespace, does not equal the annotation's target namespace in net.Namespace) // In the case that this is a mismatch when namespaceisolation is enabled, this should be an error. - if confnamespaceisolation { + if confnamespaceIsolation { if defaultNamespace != net.Namespace { return nil, logging.Errorf("GetPodNetwork: namespace isolation violation: podnamespace: %v / target namespace: %v", defaultNamespace, net.Namespace) } } - delegate, updatedResourceMap, err := getKubernetesDelegate(k8sclient, net, confdir, podID, resourceMap) + delegate, updatedResourceMap, err := getKubernetesDelegate(k8sclient, net, confdir, string(podID), resourceMap) if err != nil { return nil, logging.Errorf("GetPodNetwork: failed getting the delegate: %v", err) } @@ -655,28 +657,13 @@ func GetDefaultNetworks(k8sArgs *types.K8sArgs, conf *types.NetConf, kubeClient return nil } -func getPodDefaultNetworkAnnotation(client KubeClient, k8sArgs *types.K8sArgs) (string, error) { - logging.Debugf("getPodDefaultNetworkAnnotation: %v, %v", client, k8sArgs) - pod, err := client.GetPod(string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME)) - if err != nil { - return "", logging.Errorf("getPodDefaultNetworkAnnotation: failed to query the pod %v in out of cluster comm: %v", string(k8sArgs.K8S_POD_NAME), err) - } - - if v, ok := pod.Annotations[defaultNetAnnot]; ok { - return v, nil - } - return "", nil -} - // tryLoadK8sPodDefaultNetwork get pod default network from annotations -func tryLoadK8sPodDefaultNetwork(k8sArgs *types.K8sArgs, conf *types.NetConf, kubeClient KubeClient) (*types.DelegateNetConf, error) { - logging.Debugf("tryLoadK8sPodDefaultNetwork: %v, %v", kubeClient, k8sArgs) +func tryLoadK8sPodDefaultNetwork(kubeClient KubeClient, pod *v1.Pod, conf *types.NetConf) (*types.DelegateNetConf, error) { + var netAnnot string + logging.Debugf("tryLoadK8sPodDefaultNetwork: %v, %v, %v", kubeClient, pod, conf) - netAnnot, err := getPodDefaultNetworkAnnotation(kubeClient, k8sArgs) - if err != nil { - return nil, logging.Errorf("tryLoadK8sPodDefaultNetwork: failed to get pod annotation: %v", err) - } - if netAnnot == "" { + netAnnot, ok := pod.Annotations[defaultNetAnnot] + if !ok { logging.Debugf("tryLoadK8sPodDefaultNetwork: Pod default network annotation is not defined") return nil, nil } diff --git a/k8sclient/k8sclient_test.go b/k8sclient/k8sclient_test.go index 323d891cd..58dbdd619 100644 --- a/k8sclient/k8sclient_test.go +++ b/k8sclient/k8sclient_test.go @@ -82,7 +82,10 @@ var _ = Describe("k8sclient operations", func() { Expect(err).NotTo(HaveOccurred()) k8sArgs, err := GetK8sArgs(args) Expect(err).NotTo(HaveOccurred()) - delegates, err := GetPodNetwork(kubeClient, k8sArgs, tmpDir, false) + pod, err := kubeClient.GetPod(string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME)) + networks, err := GetPodNetwork(pod) + Expect(err).NotTo(HaveOccurred()) + delegates, err := GetNetworkDelegates(kubeClient, pod, networks, tmpDir, false) Expect(err).NotTo(HaveOccurred()) Expect(fKubeClient.PodCount).To(Equal(1)) Expect(fKubeClient.NetCount).To(Equal(2)) @@ -115,7 +118,10 @@ var _ = Describe("k8sclient operations", func() { Expect(err).NotTo(HaveOccurred()) k8sArgs, err := GetK8sArgs(args) Expect(err).NotTo(HaveOccurred()) - delegates, err := GetPodNetwork(kubeClient, k8sArgs, tmpDir, false) + pod, err := kubeClient.GetPod(string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME)) + networks, err := GetPodNetwork(pod) + Expect(err).NotTo(HaveOccurred()) + delegates, err := GetNetworkDelegates(kubeClient, pod, networks, tmpDir, false) Expect(len(delegates)).To(Equal(0)) Expect(err).To(MatchError("GetPodNetwork: failed getting the delegate: getKubernetesDelegate: failed to get network resource, refer Multus README.md for the usage guide: resource not found")) }) @@ -159,7 +165,10 @@ var _ = Describe("k8sclient operations", func() { Expect(err).NotTo(HaveOccurred()) k8sArgs, err := GetK8sArgs(args) Expect(err).NotTo(HaveOccurred()) - delegates, err := GetPodNetwork(kubeClient, k8sArgs, tmpDir, false) + pod, err := kubeClient.GetPod(string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME)) + networks, err := GetPodNetwork(pod) + Expect(err).NotTo(HaveOccurred()) + delegates, err := GetNetworkDelegates(kubeClient, pod, networks, tmpDir, false) Expect(err).NotTo(HaveOccurred()) Expect(fKubeClient.PodCount).To(Equal(1)) Expect(fKubeClient.NetCount).To(Equal(3)) @@ -186,8 +195,9 @@ var _ = Describe("k8sclient operations", func() { Expect(err).NotTo(HaveOccurred()) k8sArgs, err := GetK8sArgs(args) Expect(err).NotTo(HaveOccurred()) - delegates, err := GetPodNetwork(kubeClient, k8sArgs, tmpDir, false) - Expect(len(delegates)).To(Equal(0)) + pod, err := kubeClient.GetPod(string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME)) + networks, err := GetPodNetwork(pod) + Expect(len(networks)).To(Equal(0)) Expect(err).To(MatchError("parsePodNetworkAnnotation: failed to parse pod Network Attachment Selection Annotation JSON format: invalid character 'a' looking for beginning of value")) }) @@ -216,7 +226,10 @@ var _ = Describe("k8sclient operations", func() { Expect(err).NotTo(HaveOccurred()) k8sArgs, err := GetK8sArgs(args) Expect(err).NotTo(HaveOccurred()) - delegates, err := GetPodNetwork(kubeClient, k8sArgs, tmpDir, false) + pod, err := kubeClient.GetPod(string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME)) + networks, err := GetPodNetwork(pod) + Expect(err).NotTo(HaveOccurred()) + delegates, err := GetNetworkDelegates(kubeClient, pod, networks, tmpDir, false) Expect(err).NotTo(HaveOccurred()) Expect(fKubeClient.PodCount).To(Equal(1)) Expect(fKubeClient.NetCount).To(Equal(2)) @@ -242,7 +255,10 @@ var _ = Describe("k8sclient operations", func() { Expect(err).NotTo(HaveOccurred()) k8sArgs, err := GetK8sArgs(args) Expect(err).NotTo(HaveOccurred()) - delegates, err := GetPodNetwork(kubeClient, k8sArgs, tmpDir, false) + pod, err := kubeClient.GetPod(string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME)) + networks, err := GetPodNetwork(pod) + Expect(err).NotTo(HaveOccurred()) + delegates, err := GetNetworkDelegates(kubeClient, pod, networks, tmpDir, false) Expect(err).NotTo(HaveOccurred()) Expect(fKubeClient.PodCount).To(Equal(1)) Expect(fKubeClient.NetCount).To(Equal(1)) @@ -273,7 +289,10 @@ var _ = Describe("k8sclient operations", func() { Expect(err).NotTo(HaveOccurred()) k8sArgs, err := GetK8sArgs(args) Expect(err).NotTo(HaveOccurred()) - delegates, err := GetPodNetwork(kubeClient, k8sArgs, tmpDir, false) + pod, err := kubeClient.GetPod(string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME)) + networks, err := GetPodNetwork(pod) + Expect(err).NotTo(HaveOccurred()) + delegates, err := GetNetworkDelegates(kubeClient, pod, networks, tmpDir, false) Expect(len(delegates)).To(Equal(0)) Expect(err).To(MatchError(fmt.Sprintf("GetPodNetwork: failed getting the delegate: cniConfigFromNetworkResource: err in getCNIConfigFromFile: Error loading CNI config file %s: error parsing configuration: invalid character 'a' looking for beginning of value", net2Name))) }) @@ -584,7 +603,10 @@ var _ = Describe("k8sclient operations", func() { k8sArgs, err := GetK8sArgs(args) Expect(err).NotTo(HaveOccurred()) - _, err = GetPodNetwork(kubeClient, k8sArgs, tmpDir, netConf.NamespaceIsolation) + pod, err := kubeClient.GetPod(string(k8sArgs.K8S_POD_NAMESPACE), string(k8sArgs.K8S_POD_NAME)) + networks, err := GetPodNetwork(pod) + Expect(err).NotTo(HaveOccurred()) + _, err = GetNetworkDelegates(kubeClient, pod, networks, tmpDir, netConf.NamespaceIsolation) Expect(err).To(HaveOccurred()) Expect(err).To(MatchError("GetPodNetwork: namespace isolation violation: podnamespace: test / target namespace: kube-system")) diff --git a/multus/multus.go b/multus/multus.go index d12c4250f..e2e346281 100644 --- a/multus/multus.go +++ b/multus/multus.go @@ -74,12 +74,12 @@ func saveScratchNetConf(containerID, dataDir string, netconf []byte) error { return err } -func consumeScratchNetConf(containerID, dataDir string) ([]byte, error) { +func consumeScratchNetConf(containerID, dataDir string) ([]byte, string, error) { logging.Debugf("consumeScratchNetConf: %s, %s", containerID, dataDir) path := filepath.Join(dataDir, containerID) - defer os.Remove(path) - return ioutil.ReadFile(path) + b, err := ioutil.ReadFile(path) + return b, path, err } func getIfname(delegate *types.DelegateNetConf, argif string, idx int) string { @@ -105,12 +105,23 @@ func saveDelegates(containerID, dataDir string, delegates []*types.DelegateNetCo } if err = saveScratchNetConf(containerID, dataDir, delegatesBytes); err != nil { - return logging.Errorf("error in saving the delegates : %v", err) + return logging.Errorf("error in saving the delegates : %v", err) } return err } +func deleteDelegates(containerID, dataDir string) error { + logging.Debugf("deleteDelegates: %s, %s", containerID, dataDir) + + path := filepath.Join(dataDir, containerID) + if err := os.Remove(path); err != nil { + return logging.Errorf("error in deleting the delegates : %v", err) + } + + return nil +} + func validateIfName(nsname string, ifname string) error { logging.Debugf("validateIfName: %s, %s", nsname, ifname) podNs, err := ns.GetNS(nsname) @@ -282,8 +293,8 @@ func delPlugins(exec invoke.Exec, argIfname string, delegates []*types.DelegateN } func cmdAdd(args *skel.CmdArgs, exec invoke.Exec, kubeClient k8s.KubeClient) (cnitypes.Result, error) { - logging.Debugf("cmdAdd: %v, %v, %v", args, exec, kubeClient) n, err := types.LoadNetConf(args.StdinData) + logging.Debugf("cmdAdd: %v, %v, %v", args, exec, kubeClient) if err != nil { return nil, logging.Errorf("err in loading netconf: %v", err) } @@ -312,16 +323,14 @@ func cmdAdd(args *skel.CmdArgs, exec invoke.Exec, kubeClient k8s.KubeClient) (cn n.Delegates[0].MasterPlugin = true } - numK8sDelegates, kc, err := k8s.TryLoadPodDelegates(k8sArgs, n, kubeClient) + _, kc, err := k8s.TryLoadPodDelegates(k8sArgs, n, kubeClient) if err != nil { return nil, logging.Errorf("Multus: Err in loading K8s Delegates k8s args: %v", err) } - if numK8sDelegates == 0 { - // cache the multus config if we have only Multus delegates - if err := saveDelegates(args.ContainerID, n.CNIDir, n.Delegates); err != nil { - return nil, logging.Errorf("Multus: Err in saving the delegates: %v", err) - } + // cache the multus config + if err := saveDelegates(args.ContainerID, n.CNIDir, n.Delegates); err != nil { + return nil, logging.Errorf("Multus: Err in saving the delegates: %v", err) } var result, tmpResult cnitypes.Result @@ -363,7 +372,7 @@ func cmdAdd(args *skel.CmdArgs, exec invoke.Exec, kubeClient k8s.KubeClient) (cn //set the network status annotation in apiserver, only in case Multus as kubeconfig if n.Kubeconfig != "" && kc != nil { if !types.CheckSystemNamespaces(kc.Podnamespace, n.SystemNamespaces) { - err = k8s.SetNetworkStatus(kc, netStatus) + err = k8s.SetNetworkStatus(kubeClient, k8sArgs, netStatus) if err != nil { return nil, logging.Errorf("Multus: Err set the networks status: %v", err) } @@ -386,8 +395,8 @@ func cmdGet(args *skel.CmdArgs, exec invoke.Exec, kubeClient k8s.KubeClient) (cn } func cmdDel(args *skel.CmdArgs, exec invoke.Exec, kubeClient k8s.KubeClient) error { - logging.Debugf("cmdDel: %v, %v, %v", args, exec, kubeClient) in, err := types.LoadNetConf(args.StdinData) + logging.Debugf("cmdDel: %v, %v, %v", args, exec, kubeClient) if err != nil { return err } @@ -397,7 +406,7 @@ func cmdDel(args *skel.CmdArgs, exec invoke.Exec, kubeClient k8s.KubeClient) err } netns, err := ns.GetNS(args.Netns) if err != nil { - // if NetNs is passed down by the Cloud Orchestration Engine, or if it called multiple times + // if NetNs is passed down by the Cloud Orchestration Engine, or if it called multiple times // so don't return an error if the device is already removed. // https://github.com/kubernetes/kubernetes/issues/43014#issuecomment-287164444 _, ok := err.(ns.NSPathNotExistErr) @@ -417,42 +426,49 @@ func cmdDel(args *skel.CmdArgs, exec invoke.Exec, kubeClient k8s.KubeClient) err return logging.Errorf("Multus: Err in getting k8s args: %v", err) } - if in.ClusterNetwork != "" { - err = k8s.GetDefaultNetworks(k8sArgs, in, kubeClient) - if err != nil { - return logging.Errorf("Multus: Failed to get clusterNetwork/defaultNetworks: %v", err) + // Read the cache to get delegates json for the pod + netconfBytes, path, err := consumeScratchNetConf(args.ContainerID, in.CNIDir) + if err != nil { + // Fetch delegates again if cache is not exist + if os.IsNotExist(err) { + if in.ClusterNetwork != "" { + err = k8s.GetDefaultNetworks(k8sArgs, in, kubeClient) + if err != nil { + return logging.Errorf("Multus: Failed to get clusterNetwork/defaultNetworks: %v", err) + } + // First delegate is always the master plugin + in.Delegates[0].MasterPlugin = true + } + + // Get pod annotation and so on + _, _, err := k8s.TryLoadPodDelegates(k8sArgs, in, kubeClient) + if err != nil { + if len(in.Delegates) == 0 { + // No delegate available so send error + return logging.Errorf("Multus: failed to get delegates: %v", err) + } + // Get clusterNetwork before, so continue to delete + logging.Errorf("Multus: failed to get delegates: %v, but continue to delete clusterNetwork", err) + } + } else { + return logging.Errorf("Multus: Err in reading the delegates: %v", err) + } + } else { + defer os.Remove(path) + if err := json.Unmarshal(netconfBytes, &in.Delegates); err != nil { + return logging.Errorf("Multus: failed to load netconf: %v", err) } // First delegate is always the master plugin in.Delegates[0].MasterPlugin = true } - numK8sDelegates, kc, err := k8s.TryLoadPodDelegates(k8sArgs, in, kubeClient) - if err != nil { - return err - } - - if numK8sDelegates == 0 { - // re-read the scratch multus config if we have only Multus delegates - netconfBytes, err := consumeScratchNetConf(args.ContainerID, in.CNIDir) - if err != nil { - if os.IsNotExist(err) { - // Per spec should ignore error if resources are missing / already removed - return nil - } - return logging.Errorf("Multus: Err in reading the delegates: %v", err) - } - - if err := json.Unmarshal(netconfBytes, &in.Delegates); err != nil { - return logging.Errorf("Multus: failed to load netconf: %v", err) - } - } - - //unset the network status annotation in apiserver, only in case Multus as kubeconfig - if in.Kubeconfig != "" && kc != nil { - if !types.CheckSystemNamespaces(kc.Podnamespace, in.SystemNamespaces) { - err := k8s.SetNetworkStatus(kc, nil) + // unset the network status annotation in apiserver, only in case Multus as kubeconfig + if in.Kubeconfig != "" { + if !types.CheckSystemNamespaces(string(k8sArgs.K8S_POD_NAMESPACE), in.SystemNamespaces) { + err := k8s.SetNetworkStatus(kubeClient, k8sArgs, nil) if err != nil { - return logging.Errorf("Multus: Err unset the networks status: %v", err) + // error happen but continue to delete + logging.Errorf("Multus: Err unset the networks status: %v", err) } } } diff --git a/multus/multus_test.go b/multus/multus_test.go index 0aa8f61df..961b383a2 100644 --- a/multus/multus_test.go +++ b/multus/multus_test.go @@ -373,7 +373,7 @@ var _ = Describe("multus operations", func() { result, err := cmdAdd(args, fExec, fKubeClient) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) - Expect(fKubeClient.PodCount).To(Equal(3)) + Expect(fKubeClient.PodCount).To(Equal(2)) Expect(fKubeClient.NetCount).To(Equal(2)) r := result.(*types020.Result) // plugin 1 is the masterplugin @@ -452,13 +452,81 @@ var _ = Describe("multus operations", func() { result, err := cmdAdd(args, fExec, fKubeClient) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) - Expect(fKubeClient.PodCount).To(Equal(3)) + Expect(fKubeClient.PodCount).To(Equal(2)) Expect(fKubeClient.NetCount).To(Equal(2)) r := result.(*types020.Result) // plugin 1 is the masterplugin Expect(reflect.DeepEqual(r, expectedResult1)).To(BeTrue()) }) + It("executes kubernetes networks and delete it after pod removal", func() { + fakePod := testhelpers.NewFakePod("testpod", "net1", "") + net1 := `{ + "name": "net1", + "type": "mynet", + "cniVersion": "0.2.0" +}` + args := &skel.CmdArgs{ + ContainerID: "123456789", + Netns: testNS.Path(), + IfName: "eth0", + Args: fmt.Sprintf("K8S_POD_NAME=%s;K8S_POD_NAMESPACE=%s", fakePod.ObjectMeta.Name, fakePod.ObjectMeta.Namespace), + StdinData: []byte(`{ + "name": "node-cni-network", + "type": "multus", + "kubeconfig": "/etc/kubernetes/node-kubeconfig.yaml", + "delegates": [{ + "name": "weave1", + "cniVersion": "0.2.0", + "type": "weave-net" + }] +}`), + } + + fExec := &fakeExec{} + expectedResult1 := &types020.Result{ + CNIVersion: "0.2.0", + IP4: &types020.IPConfig{ + IP: *testhelpers.EnsureCIDR("1.1.1.2/24"), + }, + } + expectedConf1 := `{ + "name": "weave1", + "cniVersion": "0.2.0", + "type": "weave-net" +}` + fExec.addPlugin(nil, "eth0", expectedConf1, expectedResult1, nil) + fExec.addPlugin(nil, "net1", net1, &types020.Result{ + CNIVersion: "0.2.0", + IP4: &types020.IPConfig{ + IP: *testhelpers.EnsureCIDR("1.1.1.3/24"), + }, + }, nil) + + fKubeClient := testhelpers.NewFakeKubeClient() + fKubeClient.AddPod(fakePod) + fKubeClient.AddNetConfig(fakePod.ObjectMeta.Namespace, "net1", net1) + + os.Setenv("CNI_COMMAND", "ADD") + os.Setenv("CNI_IFNAME", "eth0") + result, err := cmdAdd(args, fExec, fKubeClient) + Expect(err).NotTo(HaveOccurred()) + Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) + Expect(fKubeClient.PodCount).To(Equal(2)) + Expect(fKubeClient.NetCount).To(Equal(1)) + r := result.(*types020.Result) + // plugin 1 is the masterplugin + Expect(reflect.DeepEqual(r, expectedResult1)).To(BeTrue()) + + os.Setenv("CNI_COMMAND", "DEL") + os.Setenv("CNI_IFNAME", "eth0") + // set fKubeClient to nil to emulate no pod info + fKubeClient.DeletePod(fakePod) + err = cmdDel(args, fExec, fKubeClient) + Expect(err).NotTo(HaveOccurred()) + Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) + }) + It("ensure delegates get portmap runtime config", func() { args := &skel.CmdArgs{ ContainerID: "123456789", @@ -525,9 +593,9 @@ var _ = Describe("multus operations", func() { StdinData: []byte(`{ "name": "node-cni-network", "type": "multus", - "kubeconfig": "/etc/kubernetes/node-kubeconfig.yaml", - "defaultNetworks": [], - "clusterNetwork": "net1", + "kubeconfig": "/etc/kubernetes/node-kubeconfig.yaml", + "defaultNetworks": [], + "clusterNetwork": "net1", "delegates": [] }`), } @@ -544,7 +612,7 @@ var _ = Describe("multus operations", func() { result, err := cmdAdd(args, fExec, fKubeClient) Expect(err).NotTo(HaveOccurred()) Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) - Expect(fKubeClient.PodCount).To(Equal(3)) + Expect(fKubeClient.PodCount).To(Equal(2)) Expect(fKubeClient.NetCount).To(Equal(2)) r := result.(*types020.Result) Expect(reflect.DeepEqual(r, expectedResult1)).To(BeTrue()) @@ -555,4 +623,163 @@ var _ = Describe("multus operations", func() { Expect(err).NotTo(HaveOccurred()) Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) }) + + It("Verify the cache is created in dataDir", func() { + tmpCNIDir := tmpDir + "/cniData" + err := os.Mkdir(tmpCNIDir, 0777) + Expect(err).NotTo(HaveOccurred()) + + fakePod := testhelpers.NewFakePod("testpod", "net1", "") + net1 := `{ + "name": "net1", + "type": "mynet", + "cniVersion": "0.2.0" +}` + args := &skel.CmdArgs{ + ContainerID: "123456789", + Netns: testNS.Path(), + IfName: "eth0", + Args: fmt.Sprintf("K8S_POD_NAME=%s;K8S_POD_NAMESPACE=%s", fakePod.ObjectMeta.Name, fakePod.ObjectMeta.Namespace), + StdinData: []byte(fmt.Sprintf(`{ + "name": "node-cni-network", + "type": "multus", + "kubeconfig": "/etc/kubernetes/node-kubeconfig.yaml", + "cniDir": "%s", + "delegates": [{ + "name": "weave1", + "cniVersion": "0.2.0", + "type": "weave-net" + }] +}`, tmpCNIDir)), + } + + fExec := &fakeExec{} + expectedResult1 := &types020.Result{ + CNIVersion: "0.2.0", + IP4: &types020.IPConfig{ + IP: *testhelpers.EnsureCIDR("1.1.1.2/24"), + }, + } + expectedConf1 := `{ + "name": "weave1", + "cniVersion": "0.2.0", + "type": "weave-net" +}` + fExec.addPlugin(nil, "eth0", expectedConf1, expectedResult1, nil) + fExec.addPlugin(nil, "net1", net1, &types020.Result{ + CNIVersion: "0.2.0", + IP4: &types020.IPConfig{ + IP: *testhelpers.EnsureCIDR("1.1.1.3/24"), + }, + }, nil) + + fKubeClient := testhelpers.NewFakeKubeClient() + fKubeClient.AddPod(fakePod) + fKubeClient.AddNetConfig(fakePod.ObjectMeta.Namespace, "net1", net1) + os.Setenv("CNI_COMMAND", "ADD") + os.Setenv("CNI_IFNAME", "eth0") + result, err := cmdAdd(args, fExec, fKubeClient) + Expect(err).NotTo(HaveOccurred()) + Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) + Expect(fKubeClient.PodCount).To(Equal(2)) + Expect(fKubeClient.NetCount).To(Equal(1)) + r := result.(*types020.Result) + // plugin 1 is the masterplugin + Expect(reflect.DeepEqual(r, expectedResult1)).To(BeTrue()) + + By("Verify cache file existence") + cacheFilePath := fmt.Sprintf("%s/%s", tmpCNIDir, "123456789") + _, err = os.Stat(cacheFilePath) + Expect(err).NotTo(HaveOccurred()) + + By("Delete and check net count is not incremented") + os.Setenv("CNI_COMMAND", "DEL") + os.Setenv("CNI_IFNAME", "eth0") + err = cmdDel(args, fExec, fKubeClient) + Expect(err).NotTo(HaveOccurred()) + Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) + Expect(fKubeClient.PodCount).To(Equal(3)) + Expect(fKubeClient.NetCount).To(Equal(1)) + }) + + It("Delete pod without cache", func() { + tmpCNIDir := tmpDir + "/cniData" + err := os.Mkdir(tmpCNIDir, 0777) + Expect(err).NotTo(HaveOccurred()) + + fakePod := testhelpers.NewFakePod("testpod", "net1", "") + net1 := `{ + "name": "net1", + "type": "mynet", + "cniVersion": "0.2.0" +}` + args := &skel.CmdArgs{ + ContainerID: "123456789", + Netns: testNS.Path(), + IfName: "eth0", + Args: fmt.Sprintf("K8S_POD_NAME=%s;K8S_POD_NAMESPACE=%s", fakePod.ObjectMeta.Name, fakePod.ObjectMeta.Namespace), + StdinData: []byte(fmt.Sprintf(`{ + "name": "node-cni-network", + "type": "multus", + "kubeconfig": "/etc/kubernetes/node-kubeconfig.yaml", + "cniDir": "%s", + "delegates": [{ + "name": "weave1", + "cniVersion": "0.2.0", + "type": "weave-net" + }] +}`, tmpCNIDir)), + } + + fExec := &fakeExec{} + expectedResult1 := &types020.Result{ + CNIVersion: "0.2.0", + IP4: &types020.IPConfig{ + IP: *testhelpers.EnsureCIDR("1.1.1.2/24"), + }, + } + expectedConf1 := `{ + "name": "weave1", + "cniVersion": "0.2.0", + "type": "weave-net" +}` + fExec.addPlugin(nil, "eth0", expectedConf1, expectedResult1, nil) + fExec.addPlugin(nil, "net1", net1, &types020.Result{ + CNIVersion: "0.2.0", + IP4: &types020.IPConfig{ + IP: *testhelpers.EnsureCIDR("1.1.1.3/24"), + }, + }, nil) + + fKubeClient := testhelpers.NewFakeKubeClient() + fKubeClient.AddPod(fakePod) + fKubeClient.AddNetConfig(fakePod.ObjectMeta.Namespace, "net1", net1) + os.Setenv("CNI_COMMAND", "ADD") + os.Setenv("CNI_IFNAME", "eth0") + result, err := cmdAdd(args, fExec, fKubeClient) + Expect(err).NotTo(HaveOccurred()) + Expect(fExec.addIndex).To(Equal(len(fExec.plugins))) + Expect(fKubeClient.PodCount).To(Equal(2)) + Expect(fKubeClient.NetCount).To(Equal(1)) + r := result.(*types020.Result) + // plugin 1 is the masterplugin + Expect(reflect.DeepEqual(r, expectedResult1)).To(BeTrue()) + + By("Verify cache file existence") + cacheFilePath := fmt.Sprintf("%s/%s", tmpCNIDir, "123456789") + _, err = os.Stat(cacheFilePath) + Expect(err).NotTo(HaveOccurred()) + + err = os.Remove(cacheFilePath) + Expect(err).NotTo(HaveOccurred()) + + By("Delete and check pod/net count is incremented") + os.Setenv("CNI_COMMAND", "DEL") + os.Setenv("CNI_IFNAME", "eth0") + err = cmdDel(args, fExec, fKubeClient) + Expect(err).NotTo(HaveOccurred()) + Expect(fExec.delIndex).To(Equal(len(fExec.plugins))) + Expect(fKubeClient.PodCount).To(Equal(4)) + Expect(fKubeClient.NetCount).To(Equal(2)) + }) }) diff --git a/testing/testing.go b/testing/testing.go index 56bd7f792..35652e121 100644 --- a/testing/testing.go +++ b/testing/testing.go @@ -103,6 +103,11 @@ func (f *FakeKubeClient) AddPod(pod *v1.Pod) { f.pods[key] = pod } +func (f *FakeKubeClient) DeletePod(pod *v1.Pod) { + key := fmt.Sprintf("%s/%s", pod.ObjectMeta.Namespace, pod.ObjectMeta.Name) + delete(f.pods, key) +} + func NewFakePod(name string, netAnnotation string, defaultNetAnnotation string) *v1.Pod { pod := &v1.Pod{ ObjectMeta: metav1.ObjectMeta{