From a519e8a403f4a9106bfe729ab685e84daea26c11 Mon Sep 17 00:00:00 2001
From: Dan Williams
Date: Mon, 13 Jun 2016 17:19:04 -0500
Subject: [PATCH] kubenet: clean up networking when setup errors occur

Relying on the runtime to later call cleanup is fragile, so make sure
that everything gets nicely cleaned up when setup errors occur.
---
 pkg/kubelet/network/kubenet/kubenet_linux.go | 132 ++++++++++++-------
 1 file changed, 83 insertions(+), 49 deletions(-)

diff --git a/pkg/kubelet/network/kubenet/kubenet_linux.go b/pkg/kubelet/network/kubenet/kubenet_linux.go
index 3e942234f32..8ced3032dd6 100644
--- a/pkg/kubelet/network/kubenet/kubenet_linux.go
+++ b/pkg/kubelet/network/kubenet/kubenet_linux.go
@@ -31,6 +31,7 @@ import (
 	"github.com/golang/glog"
 	"github.com/vishvananda/netlink"
 	"github.com/vishvananda/netlink/nl"
+	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/apis/componentconfig"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	"k8s.io/kubernetes/pkg/kubelet/dockertools"
@@ -290,29 +291,7 @@ func (plugin *kubenetNetworkPlugin) Capabilities() utilsets.Int {
 	return utilsets.NewInt(network.NET_PLUGIN_CAPABILITY_SHAPING)
 }
 
-func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID) error {
-	plugin.mu.Lock()
-	defer plugin.mu.Unlock()
-
-	start := time.Now()
-	defer func() {
-		glog.V(4).Infof("SetUpPod took %v for %s/%s", time.Since(start), namespace, name)
-	}()
-
-	pod, ok := plugin.host.GetPodByName(namespace, name)
-	if !ok {
-		return fmt.Errorf("pod %q cannot be found", name)
-	}
-
-	ingress, egress, err := bandwidth.ExtractPodBandwidthResources(pod.Annotations)
-	if err != nil {
-		return fmt.Errorf("Error reading pod bandwidth annotations: %v", err)
-	}
-
-	if err := plugin.Status(); err != nil {
-		return fmt.Errorf("Kubenet cannot SetUpPod: %v", err)
-	}
-
+func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kubecontainer.ContainerID, pod *api.Pod) error {
 	// Bring up container loopback interface
 	if _, err := plugin.addContainerToNetwork(plugin.loConfig, "lo", namespace, name, id); err != nil {
 		return err
@@ -348,6 +327,10 @@ func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id k
 	// initialization
 	shaper := plugin.shaper()
 
+	ingress, egress, err := bandwidth.ExtractPodBandwidthResources(pod.Annotations)
+	if err != nil {
+		return fmt.Errorf("Error reading pod bandwidth annotations: %v", err)
+	}
 	if egress != nil || ingress != nil {
 		if err := shaper.ReconcileCIDR(fmt.Sprintf("%s/32", ip4.String()), egress, ingress); err != nil {
 			return fmt.Errorf("Failed to add pod to shaper: %v", err)
@@ -358,16 +341,86 @@ func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id k
 
 	// Open any hostports the pod's containers want
 	runningPods, err := plugin.getRunningPods()
-	if err == nil {
-		err = plugin.hostportHandler.OpenPodHostportsAndSync(pod, BridgeName, runningPods)
+	if err != nil {
+		return err
+	}
+	if err := plugin.hostportHandler.OpenPodHostportsAndSync(pod, BridgeName, runningPods); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID) error {
+	plugin.mu.Lock()
+	defer plugin.mu.Unlock()
+
+	start := time.Now()
+	defer func() {
+		glog.V(4).Infof("SetUpPod took %v for %s/%s", time.Since(start), namespace, name)
+	}()
+
+	pod, ok := plugin.host.GetPodByName(namespace, name)
+	if !ok {
+		return fmt.Errorf("pod %q cannot be found", name)
+	}
+
+	if err := plugin.Status(); err != nil {
+		return fmt.Errorf("Kubenet cannot SetUpPod: %v", err)
+	}
+
+	if err := plugin.setup(namespace, name, id, pod); err != nil {
+		// Make sure everything gets cleaned up on errors
+		podIP := plugin.podIPs[id]
+		if err := plugin.teardown(namespace, name, id, podIP); err != nil {
+			// Not a hard error or warning
+			glog.V(4).Infof("Failed to clean up %s/%s after SetUpPod failure: %v", namespace, name, err)
+		}
+		return err
 	}
 
 	// Need to SNAT outbound traffic from cluster
-	if err = plugin.ensureMasqRule(); err != nil {
+	if err := plugin.ensureMasqRule(); err != nil {
 		glog.Errorf("Failed to ensure MASQ rule: %v", err)
 	}
 
-	return err
+	return nil
+}
+
+// teardown tears down as much of a pod's network as it can even if errors occur.
+// It returns an aggregate error composed of all errors encountered during the teardown.
+func (plugin *kubenetNetworkPlugin) teardown(namespace string, name string, id kubecontainer.ContainerID, podIP string) error {
+	errList := []error{}
+
+	if podIP != "" {
+		glog.V(5).Infof("Removing pod IP %s from shaper", podIP)
+		// shaper wants /32
+		if err := plugin.shaper().Reset(fmt.Sprintf("%s/32", podIP)); err != nil {
+			// Possible bandwidth shaping wasn't enabled for this pod anyways
+			glog.V(4).Infof("Failed to remove pod IP %s from shaper: %v", podIP, err)
+		}
+
+		delete(plugin.podIPs, id)
+	}
+
+	if err := plugin.delContainerFromNetwork(plugin.netConfig, network.DefaultInterfaceName, namespace, name, id); err != nil {
+		// No cached IP means the pod was already torn down (e.g. TearDownPod called twice); only warn then, to reduce event pollution.
+		if podIP == "" {
+			glog.Warningf("Failed to delete container from kubenet: %v", err)
+		} else {
+			errList = append(errList, err)
+		}
+	}
+
+	runningPods, err := plugin.getRunningPods()
+	if err == nil {
+		err = plugin.hostportHandler.SyncHostports(BridgeName, runningPods)
+	}
+	if err != nil {
+		errList = append(errList, err)
+	}
+
+	return utilerrors.NewAggregate(errList)
 }
 
 func (plugin *kubenetNetworkPlugin) TearDownPod(namespace string, name string, id kubecontainer.ContainerID) error {
@@ -384,36 +437,17 @@ func (plugin *kubenetNetworkPlugin) TearDownPod(namespace string, name string, i
 	}
 
 	// no cached IP is Ok during teardown
-	podIP, hasIP := plugin.podIPs[id]
-	if hasIP {
-		glog.V(5).Infof("Removing pod IP %s from shaper", podIP)
-		// shaper wants /32
-		if err := plugin.shaper().Reset(fmt.Sprintf("%s/32", podIP)); err != nil {
-			// Possible bandwidth shaping wasn't enabled for this pod anyways
-			glog.V(4).Infof("Failed to remove pod IP %s from shaper: %v", podIP, err)
-		}
-	}
-	if err := plugin.delContainerFromNetwork(plugin.netConfig, network.DefaultInterfaceName, namespace, name, id); err != nil {
-		// This is to prevent returning error when TearDownPod is called twice on the same pod. This helps to reduce event pollution.
-		if !hasIP {
-			glog.Warningf("Failed to delete container from kubenet: %v", err)
-			return nil
-		}
+	podIP := plugin.podIPs[id]
+	if err := plugin.teardown(namespace, name, id, podIP); err != nil {
 		return err
 	}
-	delete(plugin.podIPs, id)
-
-	runningPods, err := plugin.getRunningPods()
-	if err == nil {
-		err = plugin.hostportHandler.SyncHostports(BridgeName, runningPods)
-	}
 
 	// Need to SNAT outbound traffic from cluster
 	if err := plugin.ensureMasqRule(); err != nil {
 		glog.Errorf("Failed to ensure MASQ rule: %v", err)
 	}
 
-	return err
+	return nil
 }
 
 // TODO: Use the addToNetwork function to obtain the IP of the Pod. That will assume idempotent ADD call to the plugin.