Merge pull request #26398 from euank/various-kubenet-fixes

Automatic merge from submit-queue

Various kubenet fixes (panics and bugs and cidrs, oh my)

This PR fixes the following issues:

1. Corrects an inverse error-check that prevented `shaper.Reset` from ever being called with a correct ip address
2. Fix an issue where `parseCIDR` would fail after a kubelet restart due to an IP being stored instead of a CIDR being stored in the cache.
3. Fix an issue where kubenet could panic in TearDownPod if it was called before SetUpPod (e.g. after a kubelet restart).. because of bug number 1, this didn't happen except in rare situations (see 2 for why such a rare situation might happen)

This adds a test, but more would definitely be useful.
The commits are also granular enough I could split this up more if desired.

I'm also not super-familiar with this code, so review and feedback would be welcome.

Testing done:
```
$ cat examples/egress/egress.yml
 apiVersion: v1
kind: Pod
metadata:
  labels:
    name: egress
  name: egress-output
  annotations: {"kubernetes.io/ingress-bandwidth": "300k"}
spec:
  restartPolicy: Never
  containers:
    - name: egress
      image: busybox
      command: ["sh", "-c", "sleep 60"]
$ cat kubelet.log
...
Running: tc filter add dev cbr0 protocol ip parent 1:0 prio 1 u32 match ip dst 10.0.0.5/32 flowid 1:1
# setup
...
Running: tc filter del dev cbr0 parent 1:proto ip prio 1 handle 800::800 u32
# teardown
```

I also did various other bits of manual testing and logging to hunt down the panic and other issues, but don't have anything to paste for that 

cc @dcbw @kubernetes/sig-network
This commit is contained in:
k8s-merge-robot 2016-05-29 04:04:22 -07:00
commit 98af443209
3 changed files with 143 additions and 59 deletions

View File

@ -0,0 +1,39 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// mock_cni is a mock of the `libcni.CNI` interface. It's a handwritten mock
// because there are only two functions to deal with.
package mock_cni
import (
"github.com/appc/cni/libcni"
"github.com/appc/cni/pkg/types"
"github.com/stretchr/testify/mock"
)
type MockCNI struct {
mock.Mock
}
func (m *MockCNI) AddNetwork(net *libcni.NetworkConfig, rt *libcni.RuntimeConf) (*types.Result, error) {
args := m.Called(net, rt)
return args.Get(0).(*types.Result), args.Error(1)
}
func (m *MockCNI) DelNetwork(net *libcni.NetworkConfig, rt *libcni.RuntimeConf) error {
args := m.Called(net, rt)
return args.Error(0)
}

View File

@ -64,19 +64,19 @@ const (
type kubenetNetworkPlugin struct {
network.NoopNetworkPlugin
host network.Host
netConfig *libcni.NetworkConfig
loConfig *libcni.NetworkConfig
cniConfig *libcni.CNIConfig
shaper bandwidth.BandwidthShaper
podCIDRs map[kubecontainer.ContainerID]string
MTU int
mu sync.Mutex //Mutex for protecting podCIDRs map and netConfig
execer utilexec.Interface
nsenterPath string
hairpinMode componentconfig.HairpinMode
hostPortMap map[hostport]closeable
iptables utiliptables.Interface
host network.Host
netConfig *libcni.NetworkConfig
loConfig *libcni.NetworkConfig
cniConfig libcni.CNI
bandwidthShaper bandwidth.BandwidthShaper
mu sync.Mutex //Mutex for protecting podIPs map, netConfig, and shaper initialization
podIPs map[kubecontainer.ContainerID]string
MTU int
execer utilexec.Interface
nsenterPath string
hairpinMode componentconfig.HairpinMode
hostPortMap map[hostport]closeable
iptables utiliptables.Interface
}
func NewPlugin() network.NetworkPlugin {
@ -86,7 +86,7 @@ func NewPlugin() network.NetworkPlugin {
iptInterface := utiliptables.New(execer, dbus, protocol)
return &kubenetNetworkPlugin{
podCIDRs: make(map[kubecontainer.ContainerID]string),
podIPs: make(map[kubecontainer.ContainerID]string),
hostPortMap: make(map[hostport]closeable),
MTU: 1460, //TODO: don't hardcode this
execer: utilexec.New(),
@ -317,10 +317,14 @@ func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id k
if err != nil {
return err
}
if res.IP4 == nil || res.IP4.IP.String() == "" {
if res.IP4 == nil {
return fmt.Errorf("CNI plugin reported no IPv4 address for container %v.", id)
}
plugin.podCIDRs[id] = res.IP4.IP.String()
ip4 := res.IP4.IP.IP.To4()
if ip4 == nil {
return fmt.Errorf("CNI plugin reported an invalid IPv4 address for container %v: %+v.", id, res.IP4)
}
plugin.podIPs[id] = ip4.String()
// Put the container bridge into promiscuous mode to force it to accept hairpin packets.
// TODO: Remove this once the kernel bug (#20096) is fixed.
@ -335,19 +339,13 @@ func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id k
}
}
// The first SetUpPod call creates the bridge; ensure shaping is enabled
if plugin.shaper == nil {
plugin.shaper = bandwidth.NewTCShaper(BridgeName)
if plugin.shaper == nil {
return fmt.Errorf("Failed to create bandwidth shaper!")
}
plugin.ensureBridgeTxQueueLen()
plugin.shaper.ReconcileInterface()
}
// The first SetUpPod call creates the bridge; get a shaper for the sake of
// initialization
shaper := plugin.shaper()
if egress != nil || ingress != nil {
ipAddr, _, _ := net.ParseCIDR(plugin.podCIDRs[id])
if err = plugin.shaper.ReconcileCIDR(fmt.Sprintf("%s/32", ipAddr.String()), egress, ingress); err != nil {
ipAddr := plugin.podIPs[id]
if err := shaper.ReconcileCIDR(fmt.Sprintf("%s/32", ipAddr), egress, ingress); err != nil {
return fmt.Errorf("Failed to add pod to shaper: %v", err)
}
}
@ -369,26 +367,25 @@ func (plugin *kubenetNetworkPlugin) TearDownPod(namespace string, name string, i
return fmt.Errorf("Kubenet needs a PodCIDR to tear down pods")
}
// no cached CIDR is Ok during teardown
cidr, hasCIDR := plugin.podCIDRs[id]
if hasCIDR {
glog.V(5).Infof("Removing pod CIDR %s from shaper", cidr)
// no cached IP is Ok during teardown
podIP, hasIP := plugin.podIPs[id]
if hasIP {
glog.V(5).Infof("Removing pod IP %s from shaper", podIP)
// shaper wants /32
if addr, _, err := net.ParseCIDR(cidr); err != nil {
if err = plugin.shaper.Reset(fmt.Sprintf("%s/32", addr.String())); err != nil {
glog.Warningf("Failed to remove pod CIDR %s from shaper: %v", cidr, err)
}
if err := plugin.shaper().Reset(fmt.Sprintf("%s/32", podIP)); err != nil {
// Possible bandwidth shaping wasn't enabled for this pod anyways
glog.V(4).Infof("Failed to remove pod IP %s from shaper: %v", podIP, err)
}
}
if err := plugin.delContainerFromNetwork(plugin.netConfig, network.DefaultInterfaceName, namespace, name, id); err != nil {
// This is to prevent returning error when TearDownPod is called twice on the same pod. This helps to reduce event pollution.
if !hasCIDR {
if !hasIP {
glog.Warningf("Failed to delete container from kubenet: %v", err)
return nil
}
return err
}
delete(plugin.podCIDRs, id)
delete(plugin.podIPs, id)
plugin.syncHostportsRules()
return nil
@ -400,11 +397,8 @@ func (plugin *kubenetNetworkPlugin) GetPodNetworkStatus(namespace string, name s
plugin.mu.Lock()
defer plugin.mu.Unlock()
// Assuming the ip of pod does not change. Try to retrieve ip from kubenet map first.
if cidr, ok := plugin.podCIDRs[id]; ok {
ip, _, err := net.ParseCIDR(strings.Trim(cidr, "\n"))
if err == nil {
return &network.PodNetworkStatus{IP: ip}, nil
}
if podIP, ok := plugin.podIPs[id]; ok {
return &network.PodNetworkStatus{IP: net.ParseIP(podIP)}, nil
}
netnsPath, err := plugin.host.GetRuntime().GetNetNS(id)
@ -429,7 +423,7 @@ func (plugin *kubenetNetworkPlugin) GetPodNetworkStatus(namespace string, name s
if err != nil {
return nil, fmt.Errorf("Kubenet failed to parse ip from output %s due to %v", output, err)
}
plugin.podCIDRs[id] = ip.String()
plugin.podIPs[id] = ip.String()
return &network.PodNetworkStatus{IP: ip}, nil
}
@ -549,7 +543,7 @@ func (plugin *kubenetNetworkPlugin) openPodHostports(pod *api.Pod) (map[hostport
//syncHostportMap syncs newly opened hostports to kubenet on successful pod setup. If pod setup failed, then clean up.
func (plugin *kubenetNetworkPlugin) syncHostportMap(id kubecontainer.ContainerID, hostportMap map[hostport]closeable) {
// if pod ip cannot be retrieved from podCIDR, then assume pod setup failed.
if _, ok := plugin.podCIDRs[id]; !ok {
if _, ok := plugin.podIPs[id]; !ok {
for hp, socket := range hostportMap {
err := socket.Close()
if err != nil {
@ -580,23 +574,18 @@ func (plugin *kubenetNetworkPlugin) gatherAllHostports() (map[api.ContainerPort]
}
}
// Assuming if kubenet has the pod's ip, the pod is alive and its host port should be presented.
cidr, ok := plugin.podCIDRs[podInfraContainerId]
podIP, ok := plugin.podIPs[podInfraContainerId]
if !ok {
// The POD has been delete. Ignore
continue
}
podIP, _, err := net.ParseCIDR(strings.Trim(cidr, "\n"))
if err != nil {
glog.V(3).Info("Failed to retrieve pod ip for %s-%s: %v", p.Namespace, p.Name, err)
continue
}
// Need the complete api.Pod object
pod, ok := plugin.host.GetPodByName(p.Namespace, p.Name)
if ok {
for _, container := range pod.Spec.Containers {
for _, port := range container.Ports {
if port.HostPort != 0 {
podHostportMap[port] = targetPod{podFullName: kubecontainer.GetPodFullName(pod), podIP: podIP.String()}
podHostportMap[port] = targetPod{podFullName: kubecontainer.GetPodFullName(pod), podIP: podIP}
}
}
}
@ -821,3 +810,15 @@ func (plugin *kubenetNetworkPlugin) cleanupHostportMap(containerPortMap map[api.
}
}
}
// shaper retrieves the bandwidth shaper and, if it hasn't been fetched before,
// initializes it and ensures the bridge is appropriately configured
// This function should only be called while holding the `plugin.mu` lock
func (plugin *kubenetNetworkPlugin) shaper() bandwidth.BandwidthShaper {
if plugin.bandwidthShaper == nil {
plugin.bandwidthShaper = bandwidth.NewTCShaper(BridgeName)
plugin.ensureBridgeTxQueueLen()
plugin.bandwidthShaper.ReconcileInterface()
}
return plugin.bandwidthShaper
}

View File

@ -19,26 +19,36 @@ package kubenet
import (
"fmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"testing"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/network/cni/testing"
nettest "k8s.io/kubernetes/pkg/kubelet/network/testing"
"k8s.io/kubernetes/pkg/util/bandwidth"
"k8s.io/kubernetes/pkg/util/exec"
"testing"
ipttest "k8s.io/kubernetes/pkg/util/iptables/testing"
)
func newFakeKubenetPlugin(initMap map[kubecontainer.ContainerID]string, execer exec.Interface, host network.Host) network.NetworkPlugin {
// test it fulfills the NetworkPlugin interface
var _ network.NetworkPlugin = &kubenetNetworkPlugin{}
func newFakeKubenetPlugin(initMap map[kubecontainer.ContainerID]string, execer exec.Interface, host network.Host) *kubenetNetworkPlugin {
return &kubenetNetworkPlugin{
podCIDRs: initMap,
execer: execer,
MTU: 1460,
host: host,
podIPs: initMap,
execer: execer,
MTU: 1460,
host: host,
}
}
func TestGetPodNetworkStatus(t *testing.T) {
podIPMap := make(map[kubecontainer.ContainerID]string)
podIPMap[kubecontainer.ContainerID{ID: "1"}] = "10.245.0.2/32"
podIPMap[kubecontainer.ContainerID{ID: "2"}] = "10.245.0.3/32"
podIPMap[kubecontainer.ContainerID{ID: "1"}] = "10.245.0.2"
podIPMap[kubecontainer.ContainerID{ID: "2"}] = "10.245.0.3"
testCases := []struct {
id string
@ -111,4 +121,38 @@ func TestGetPodNetworkStatus(t *testing.T) {
}
}
// TestTeardownBeforeSetUp tests that a `TearDown` call does call
// `shaper.Reset`
func TestTeardownCallsShaper(t *testing.T) {
fexec := &exec.FakeExec{
CommandScript: []exec.FakeCommandAction{},
LookPathFunc: func(file string) (string, error) {
return fmt.Sprintf("/fake-bin/%s", file), nil
},
}
fhost := nettest.NewFakeHost(nil)
fshaper := &bandwidth.FakeShaper{}
mockcni := &mock_cni.MockCNI{}
kubenet := newFakeKubenetPlugin(map[kubecontainer.ContainerID]string{}, fexec, fhost)
kubenet.cniConfig = mockcni
kubenet.iptables = ipttest.NewFake()
kubenet.bandwidthShaper = fshaper
mockcni.On("DelNetwork", mock.AnythingOfType("*libcni.NetworkConfig"), mock.AnythingOfType("*libcni.RuntimeConf")).Return(nil)
details := make(map[string]interface{})
details[network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE_DETAIL_CIDR] = "10.0.0.1/24"
kubenet.Event(network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE, details)
existingContainerID := kubecontainer.BuildContainerID("docker", "123")
kubenet.podIPs[existingContainerID] = "10.0.0.1"
if err := kubenet.TearDownPod("namespace", "name", existingContainerID); err != nil {
t.Fatalf("Unexpected error in TearDownPod: %v", err)
}
assert.Equal(t, []string{"10.0.0.1/32"}, fshaper.ResetCIDRs, "shaper.Reset should have been called")
mockcni.AssertExpectations(t)
}
//TODO: add unit test for each implementation of network plugin interface