From 5389a74388388fa48471a12a611a86461add9e31 Mon Sep 17 00:00:00 2001 From: Yifan Gu Date: Wed, 24 Aug 2016 16:21:03 -0700 Subject: [PATCH] kubenet: SyncHostports for both running and ready to run pods. This fixes the race that happens in rktnetes when pod B invokes 'kubenet.SetUpPod()' before another pod A becomes actually running. The second 'kubenet.SetUpPod()' call will not pick up the pod A and thus overwrite the host port iptables rules, which breaks pod A. This PR fixes the case by listing all 'active pods' (all non-exited pods) instead of only running pods. --- pkg/kubelet/network/hostport/hostport.go | 27 +++++------ pkg/kubelet/network/hostport/hostport_test.go | 6 +-- pkg/kubelet/network/hostport/testing/fake.go | 8 ++-- pkg/kubelet/network/kubenet/kubenet_linux.go | 46 +++++++++++++------ 4 files changed, 54 insertions(+), 33 deletions(-) diff --git a/pkg/kubelet/network/hostport/hostport.go b/pkg/kubelet/network/hostport/hostport.go index bf8e498975c..35d4ac2c8ea 100644 --- a/pkg/kubelet/network/hostport/hostport.go +++ b/pkg/kubelet/network/hostport/hostport.go @@ -42,11 +42,11 @@ const ( ) type HostportHandler interface { - OpenPodHostportsAndSync(newPod *RunningPod, natInterfaceName string, runningPods []*RunningPod) error - SyncHostports(natInterfaceName string, runningPods []*RunningPod) error + OpenPodHostportsAndSync(newPod *ActivePod, natInterfaceName string, activePods []*ActivePod) error + SyncHostports(natInterfaceName string, activePods []*ActivePod) error } -type RunningPod struct { +type ActivePod struct { Pod *api.Pod IP net.IP } @@ -131,9 +131,9 @@ func (h *handler) openHostports(pod *api.Pod) error { // gatherAllHostports returns all hostports that should be presented on node, // given the list of pods running on that node and ignoring host network // pods (which don't need hostport <-> container port mapping). 
-func gatherAllHostports(runningPods []*RunningPod) (map[api.ContainerPort]targetPod, error) { +func gatherAllHostports(activePods []*ActivePod) (map[api.ContainerPort]targetPod, error) { podHostportMap := make(map[api.ContainerPort]targetPod) - for _, r := range runningPods { + for _, r := range activePods { if r.IP.To4() == nil { return nil, fmt.Errorf("Invalid or missing pod %s IP", kubecontainer.GetPodFullName(r.Pod)) } @@ -171,36 +171,37 @@ func hostportChainName(cp api.ContainerPort, podFullName string) utiliptables.Ch } // OpenPodHostportsAndSync opens hostports for a new pod, gathers all hostports on -// node, sets up iptables rules enable them. And finally clean up stale hostports -func (h *handler) OpenPodHostportsAndSync(newPod *RunningPod, natInterfaceName string, runningPods []*RunningPod) error { +// node, sets up iptables rules enable them. And finally clean up stale hostports. +// 'newPod' must also be present in 'activePods'. +func (h *handler) OpenPodHostportsAndSync(newPod *ActivePod, natInterfaceName string, activePods []*ActivePod) error { // try to open pod host port if specified if err := h.openHostports(newPod.Pod); err != nil { return err } - // Add the new pod to running pods if it's not running already (e.g. in rkt's case). + // Add the new pod to active pods if it's not present. var found bool - for _, p := range runningPods { + for _, p := range activePods { if p.Pod.UID == newPod.Pod.UID { found = true break } } if !found { - runningPods = append(runningPods, newPod) + activePods = append(activePods, newPod) } - return h.SyncHostports(natInterfaceName, runningPods) + return h.SyncHostports(natInterfaceName, activePods) } // SyncHostports gathers all hostports on node and setup iptables rules enable them. 
And finally clean up stale hostports -func (h *handler) SyncHostports(natInterfaceName string, runningPods []*RunningPod) error { +func (h *handler) SyncHostports(natInterfaceName string, activePods []*ActivePod) error { start := time.Now() defer func() { glog.V(4).Infof("syncHostportsRules took %v", time.Since(start)) }() - containerPortMap, err := gatherAllHostports(runningPods) + containerPortMap, err := gatherAllHostports(activePods) if err != nil { return err } diff --git a/pkg/kubelet/network/hostport/hostport_test.go b/pkg/kubelet/network/hostport/hostport_test.go index cbf0e7aae5a..0fc4cf85e17 100644 --- a/pkg/kubelet/network/hostport/hostport_test.go +++ b/pkg/kubelet/network/hostport/hostport_test.go @@ -158,7 +158,7 @@ func TestOpenPodHostports(t *testing.T) { }, } - runningPods := make([]*RunningPod, 0) + activePods := make([]*ActivePod, 0) // Fill in any match rules missing chain names for _, test := range tests { @@ -179,13 +179,13 @@ func TestOpenPodHostports(t *testing.T) { } } } - runningPods = append(runningPods, &RunningPod{ + activePods = append(activePods, &ActivePod{ Pod: test.pod, IP: net.ParseIP(test.ip), }) } - err := h.OpenPodHostportsAndSync(&RunningPod{Pod: tests[0].pod, IP: net.ParseIP(tests[0].ip)}, "br0", runningPods) + err := h.OpenPodHostportsAndSync(&ActivePod{Pod: tests[0].pod, IP: net.ParseIP(tests[0].ip)}, "br0", activePods) if err != nil { t.Fatalf("Failed to OpenPodHostportsAndSync: %v", err) } diff --git a/pkg/kubelet/network/hostport/testing/fake.go b/pkg/kubelet/network/hostport/testing/fake.go index 60c8e7193be..561a141f776 100644 --- a/pkg/kubelet/network/hostport/testing/fake.go +++ b/pkg/kubelet/network/hostport/testing/fake.go @@ -29,12 +29,12 @@ func NewFakeHostportHandler() hostport.HostportHandler { return &fakeHandler{} } -func (h *fakeHandler) OpenPodHostportsAndSync(newPod *hostport.RunningPod, natInterfaceName string, runningPods []*hostport.RunningPod) error { - return h.SyncHostports(natInterfaceName, 
append(runningPods, newPod)) +func (h *fakeHandler) OpenPodHostportsAndSync(newPod *hostport.ActivePod, natInterfaceName string, activePods []*hostport.ActivePod) error { + return h.SyncHostports(natInterfaceName, activePods) } -func (h *fakeHandler) SyncHostports(natInterfaceName string, runningPods []*hostport.RunningPod) error { - for _, r := range runningPods { +func (h *fakeHandler) SyncHostports(natInterfaceName string, activePods []*hostport.ActivePod) error { + for _, r := range activePods { if r.IP.To4() == nil { return fmt.Errorf("Invalid or missing pod %s IP", kubecontainer.GetPodFullName(r.Pod)) } diff --git a/pkg/kubelet/network/kubenet/kubenet_linux.go b/pkg/kubelet/network/kubenet/kubenet_linux.go index 0ca229ff34c..20dbac63c64 100644 --- a/pkg/kubelet/network/kubenet/kubenet_linux.go +++ b/pkg/kubelet/network/kubenet/kubenet_linux.go @@ -45,8 +45,9 @@ import ( utilsets "k8s.io/kubernetes/pkg/util/sets" utilsysctl "k8s.io/kubernetes/pkg/util/sysctl" - "k8s.io/kubernetes/pkg/kubelet/network/hostport" "strconv" + + "k8s.io/kubernetes/pkg/kubelet/network/hostport" ) const ( @@ -394,13 +395,13 @@ func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kube plugin.podIPs[id] = ip4.String() // Open any hostports the pod's containers want - runningPods, err := plugin.getRunningPods() + activePods, err := plugin.getActivePods() if err != nil { return err } - newPod := &hostport.RunningPod{Pod: pod, IP: ip4} - if err := plugin.hostportHandler.OpenPodHostportsAndSync(newPod, BridgeName, runningPods); err != nil { + newPod := &hostport.ActivePod{Pod: pod, IP: ip4} + if err := plugin.hostportHandler.OpenPodHostportsAndSync(newPod, BridgeName, activePods); err != nil { return err } @@ -468,9 +469,9 @@ func (plugin *kubenetNetworkPlugin) teardown(namespace string, name string, id k } } - runningPods, err := plugin.getRunningPods() + activePods, err := plugin.getActivePods() if err == nil { - err = 
plugin.hostportHandler.SyncHostports(BridgeName, runningPods) + err = plugin.hostportHandler.SyncHostports(BridgeName, activePods) } if err != nil { errList = append(errList, err) @@ -571,16 +572,20 @@ func (plugin *kubenetNetworkPlugin) checkCNIPluginInDir(dir string) bool { return true } -// Returns a list of pods running on this node and each pod's IP address. Assumes -// PodSpecs retrieved from the runtime include the name and ID of containers in +// Returns a list of pods running or ready to run on this node and each pod's IP address. +// Assumes PodSpecs retrieved from the runtime include the name and ID of containers in // each pod. -func (plugin *kubenetNetworkPlugin) getRunningPods() ([]*hostport.RunningPod, error) { - pods, err := plugin.host.GetRuntime().GetPods(false) +func (plugin *kubenetNetworkPlugin) getActivePods() ([]*hostport.ActivePod, error) { + pods, err := plugin.host.GetRuntime().GetPods(true) if err != nil { return nil, fmt.Errorf("Failed to retrieve pods from runtime: %v", err) } - runningPods := make([]*hostport.RunningPod, 0) + activePods := make([]*hostport.ActivePod, 0) for _, p := range pods { + if podIsExited(p) { + continue + } + containerID, err := plugin.host.GetRuntime().GetPodContainerID(p) if err != nil { continue @@ -594,13 +599,28 @@ func (plugin *kubenetNetworkPlugin) getRunningPods() ([]*hostport.RunningPod, er continue } if pod, ok := plugin.host.GetPodByName(p.Namespace, p.Name); ok { - runningPods = append(runningPods, &hostport.RunningPod{ + activePods = append(activePods, &hostport.ActivePod{ Pod: pod, IP: podIP, }) } } - return runningPods, nil + return activePods, nil +} + +// podIsExited returns true if the pod is exited (all containers inside are exited). 
+func podIsExited(p *kubecontainer.Pod) bool { + for _, c := range p.Containers { + if c.State != kubecontainer.ContainerStateExited { + return false + } + } + for _, c := range p.Sandboxes { + if c.State != kubecontainer.ContainerStateExited { + return false + } + } + return true } func (plugin *kubenetNetworkPlugin) buildCNIRuntimeConf(ifName string, id kubecontainer.ContainerID) (*libcni.RuntimeConf, error) {