Merge pull request #31388 from yifan-gu/fix_kubenet_getRunningPods

Automatic merge from submit-queue

kubenet: SyncHostports for both running and ready to run pods.

This fixes a race in rktnetes that happens when pod B invokes
'kubenet.SetUpPod()' before another pod A is actually running.

The second 'kubenet.SetUpPod()' call then does not pick up pod A
and overwrites the hostport iptables rules, breaking pod A.

This PR fixes that case by listing all 'active pods' (all non-exited
pods) instead of only the running pods.
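
To make the idea concrete, here is a minimal sketch of the "non-exited" filter, assuming the kubecontainer package path used by the kubelet; the 'isActive' name and the standalone snippet are illustrative only and simply invert the 'podIsExited' helper added to kubenet below.

```go
package hostportsketch // illustrative package, not part of this PR

import kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"

// isActive reports whether a pod should keep its hostport iptables rules:
// it counts as active as long as any of its containers or sandboxes has not
// exited, even if none of them is running yet.
func isActive(p *kubecontainer.Pod) bool {
	for _, c := range p.Containers {
		if c.State != kubecontainer.ContainerStateExited {
			return true
		}
	}
	for _, s := range p.Sandboxes {
		if s.State != kubecontainer.ContainerStateExited {
			return true
		}
	}
	return false
}
```

SyncHostports then operates on this active set, so a pod that has been set up but has not started running yet still keeps its hostport rules when a later pod triggers a sync.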

Fixes https://github.com/kubernetes/kubernetes/issues/27975

Originally discussed in https://github.com/kubernetes/kubernetes/pull/27914#issuecomment-228140108

cc @euank @freehan @dcbw
Kubernetes Submit Queue 2016-09-20 04:24:41 -07:00 committed by GitHub
commit 1070a51830
4 changed files with 54 additions and 33 deletions


@@ -42,11 +42,11 @@ const (
)
type HostportHandler interface {
OpenPodHostportsAndSync(newPod *RunningPod, natInterfaceName string, runningPods []*RunningPod) error
SyncHostports(natInterfaceName string, runningPods []*RunningPod) error
OpenPodHostportsAndSync(newPod *ActivePod, natInterfaceName string, activePods []*ActivePod) error
SyncHostports(natInterfaceName string, activePods []*ActivePod) error
}
type RunningPod struct {
type ActivePod struct {
Pod *api.Pod
IP net.IP
}
@@ -131,9 +131,9 @@ func (h *handler) openHostports(pod *api.Pod) error {
// gatherAllHostports returns all hostports that should be presented on node,
// given the list of pods running on that node and ignoring host network
// pods (which don't need hostport <-> container port mapping).
func gatherAllHostports(runningPods []*RunningPod) (map[api.ContainerPort]targetPod, error) {
func gatherAllHostports(activePods []*ActivePod) (map[api.ContainerPort]targetPod, error) {
podHostportMap := make(map[api.ContainerPort]targetPod)
for _, r := range runningPods {
for _, r := range activePods {
if r.IP.To4() == nil {
return nil, fmt.Errorf("Invalid or missing pod %s IP", kubecontainer.GetPodFullName(r.Pod))
}
@@ -171,36 +171,37 @@ func hostportChainName(cp api.ContainerPort, podFullName string) utiliptables.Ch
}
// OpenPodHostportsAndSync opens hostports for a new pod, gathers all hostports on
// node, sets up iptables rules enable them. And finally clean up stale hostports
func (h *handler) OpenPodHostportsAndSync(newPod *RunningPod, natInterfaceName string, runningPods []*RunningPod) error {
// node, sets up iptables rules enable them. And finally clean up stale hostports.
// 'newPod' must also be present in 'activePods'.
func (h *handler) OpenPodHostportsAndSync(newPod *ActivePod, natInterfaceName string, activePods []*ActivePod) error {
// try to open pod host port if specified
if err := h.openHostports(newPod.Pod); err != nil {
return err
}
// Add the new pod to running pods if it's not running already (e.g. in rkt's case).
// Add the new pod to active pods if it's not present.
var found bool
for _, p := range runningPods {
for _, p := range activePods {
if p.Pod.UID == newPod.Pod.UID {
found = true
break
}
}
if !found {
runningPods = append(runningPods, newPod)
activePods = append(activePods, newPod)
}
return h.SyncHostports(natInterfaceName, runningPods)
return h.SyncHostports(natInterfaceName, activePods)
}
// SyncHostports gathers all hostports on node and setup iptables rules enable them. And finally clean up stale hostports
func (h *handler) SyncHostports(natInterfaceName string, runningPods []*RunningPod) error {
func (h *handler) SyncHostports(natInterfaceName string, activePods []*ActivePod) error {
start := time.Now()
defer func() {
glog.V(4).Infof("syncHostportsRules took %v", time.Since(start))
}()
containerPortMap, err := gatherAllHostports(runningPods)
containerPortMap, err := gatherAllHostports(activePods)
if err != nil {
return err
}


@@ -158,7 +158,7 @@ func TestOpenPodHostports(t *testing.T) {
},
}
runningPods := make([]*RunningPod, 0)
activePods := make([]*ActivePod, 0)
// Fill in any match rules missing chain names
for _, test := range tests {
@@ -179,13 +179,13 @@ func TestOpenPodHostports(t *testing.T) {
}
}
}
runningPods = append(runningPods, &RunningPod{
activePods = append(activePods, &ActivePod{
Pod: test.pod,
IP: net.ParseIP(test.ip),
})
}
err := h.OpenPodHostportsAndSync(&RunningPod{Pod: tests[0].pod, IP: net.ParseIP(tests[0].ip)}, "br0", runningPods)
err := h.OpenPodHostportsAndSync(&ActivePod{Pod: tests[0].pod, IP: net.ParseIP(tests[0].ip)}, "br0", activePods)
if err != nil {
t.Fatalf("Failed to OpenPodHostportsAndSync: %v", err)
}


@@ -29,12 +29,12 @@ func NewFakeHostportHandler() hostport.HostportHandler {
return &fakeHandler{}
}
func (h *fakeHandler) OpenPodHostportsAndSync(newPod *hostport.RunningPod, natInterfaceName string, runningPods []*hostport.RunningPod) error {
return h.SyncHostports(natInterfaceName, append(runningPods, newPod))
func (h *fakeHandler) OpenPodHostportsAndSync(newPod *hostport.ActivePod, natInterfaceName string, activePods []*hostport.ActivePod) error {
return h.SyncHostports(natInterfaceName, activePods)
}
func (h *fakeHandler) SyncHostports(natInterfaceName string, runningPods []*hostport.RunningPod) error {
for _, r := range runningPods {
func (h *fakeHandler) SyncHostports(natInterfaceName string, activePods []*hostport.ActivePod) error {
for _, r := range activePods {
if r.IP.To4() == nil {
return fmt.Errorf("Invalid or missing pod %s IP", kubecontainer.GetPodFullName(r.Pod))
}


@@ -45,8 +45,9 @@ import (
utilsets "k8s.io/kubernetes/pkg/util/sets"
utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
"k8s.io/kubernetes/pkg/kubelet/network/hostport"
"strconv"
"k8s.io/kubernetes/pkg/kubelet/network/hostport"
)
const (
@@ -394,13 +395,13 @@ func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kube
plugin.podIPs[id] = ip4.String()
// Open any hostports the pod's containers want
runningPods, err := plugin.getRunningPods()
activePods, err := plugin.getActivePods()
if err != nil {
return err
}
newPod := &hostport.RunningPod{Pod: pod, IP: ip4}
if err := plugin.hostportHandler.OpenPodHostportsAndSync(newPod, BridgeName, runningPods); err != nil {
newPod := &hostport.ActivePod{Pod: pod, IP: ip4}
if err := plugin.hostportHandler.OpenPodHostportsAndSync(newPod, BridgeName, activePods); err != nil {
return err
}
@@ -468,9 +469,9 @@ func (plugin *kubenetNetworkPlugin) teardown(namespace string, name string, id k
}
}
runningPods, err := plugin.getRunningPods()
activePods, err := plugin.getActivePods()
if err == nil {
err = plugin.hostportHandler.SyncHostports(BridgeName, runningPods)
err = plugin.hostportHandler.SyncHostports(BridgeName, activePods)
}
if err != nil {
errList = append(errList, err)
@@ -571,16 +572,20 @@ func (plugin *kubenetNetworkPlugin) checkCNIPluginInDir(dir string) bool {
return true
}
// Returns a list of pods running on this node and each pod's IP address. Assumes
// PodSpecs retrieved from the runtime include the name and ID of containers in
// Returns a list of pods running or ready to run on this node and each pod's IP address.
// Assumes PodSpecs retrieved from the runtime include the name and ID of containers in
// each pod.
func (plugin *kubenetNetworkPlugin) getRunningPods() ([]*hostport.RunningPod, error) {
pods, err := plugin.host.GetRuntime().GetPods(false)
func (plugin *kubenetNetworkPlugin) getActivePods() ([]*hostport.ActivePod, error) {
pods, err := plugin.host.GetRuntime().GetPods(true)
if err != nil {
return nil, fmt.Errorf("Failed to retrieve pods from runtime: %v", err)
}
runningPods := make([]*hostport.RunningPod, 0)
activePods := make([]*hostport.ActivePod, 0)
for _, p := range pods {
if podIsExited(p) {
continue
}
containerID, err := plugin.host.GetRuntime().GetPodContainerID(p)
if err != nil {
continue
@@ -594,13 +599,28 @@ func (plugin *kubenetNetworkPlugin) getRunningPods() ([]*hostport.RunningPod, er
continue
}
if pod, ok := plugin.host.GetPodByName(p.Namespace, p.Name); ok {
runningPods = append(runningPods, &hostport.RunningPod{
activePods = append(activePods, &hostport.ActivePod{
Pod: pod,
IP: podIP,
})
}
}
return runningPods, nil
return activePods, nil
}
// podIsExited returns true if the pod is exited (all containers inside are exited).
func podIsExited(p *kubecontainer.Pod) bool {
for _, c := range p.Containers {
if c.State != kubecontainer.ContainerStateExited {
return false
}
}
for _, c := range p.Sandboxes {
if c.State != kubecontainer.ContainerStateExited {
return false
}
}
return true
}
func (plugin *kubenetNetworkPlugin) buildCNIRuntimeConf(ifName string, id kubecontainer.ContainerID) (*libcni.RuntimeConf, error) {