diff --git a/pkg/kubelet/network/cni/testing/mock_cni.go b/pkg/kubelet/network/cni/testing/mock_cni.go new file mode 100644 index 00000000000..622edefd577 --- /dev/null +++ b/pkg/kubelet/network/cni/testing/mock_cni.go @@ -0,0 +1,39 @@ +/* +Copyright 2014 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// mock_cni is a mock of the `libcni.CNI` interface. It's a handwritten mock +// because there are only two functions to deal with. +package mock_cni + +import ( + "github.com/appc/cni/libcni" + "github.com/appc/cni/pkg/types" + "github.com/stretchr/testify/mock" +) + +type MockCNI struct { + mock.Mock +} + +func (m *MockCNI) AddNetwork(net *libcni.NetworkConfig, rt *libcni.RuntimeConf) (*types.Result, error) { + args := m.Called(net, rt) + return args.Get(0).(*types.Result), args.Error(1) +} + +func (m *MockCNI) DelNetwork(net *libcni.NetworkConfig, rt *libcni.RuntimeConf) error { + args := m.Called(net, rt) + return args.Error(0) +} diff --git a/pkg/kubelet/network/kubenet/kubenet_linux.go b/pkg/kubelet/network/kubenet/kubenet_linux.go index cdc83c54cca..db22b3afd7b 100644 --- a/pkg/kubelet/network/kubenet/kubenet_linux.go +++ b/pkg/kubelet/network/kubenet/kubenet_linux.go @@ -64,19 +64,19 @@ const ( type kubenetNetworkPlugin struct { network.NoopNetworkPlugin - host network.Host - netConfig *libcni.NetworkConfig - loConfig *libcni.NetworkConfig - cniConfig *libcni.CNIConfig - shaper bandwidth.BandwidthShaper - podCIDRs map[kubecontainer.ContainerID]string - MTU int - mu sync.Mutex //Mutex for protecting podCIDRs map and netConfig - execer utilexec.Interface - nsenterPath string - hairpinMode componentconfig.HairpinMode - hostPortMap map[hostport]closeable - iptables utiliptables.Interface + host network.Host + netConfig *libcni.NetworkConfig + loConfig *libcni.NetworkConfig + cniConfig libcni.CNI + bandwidthShaper bandwidth.BandwidthShaper + mu sync.Mutex //Mutex for protecting podIPs map, netConfig, and shaper initialization + podIPs map[kubecontainer.ContainerID]string + MTU int + execer utilexec.Interface + nsenterPath string + hairpinMode componentconfig.HairpinMode + hostPortMap map[hostport]closeable + iptables utiliptables.Interface } func NewPlugin() network.NetworkPlugin { @@ -86,7 +86,7 @@ func NewPlugin() network.NetworkPlugin { iptInterface := utiliptables.New(execer, dbus, protocol) return &kubenetNetworkPlugin{ - podCIDRs: make(map[kubecontainer.ContainerID]string), + podIPs: make(map[kubecontainer.ContainerID]string), hostPortMap: make(map[hostport]closeable), MTU: 1460, //TODO: don't hardcode this execer: utilexec.New(), @@ -317,10 +317,14 @@ func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id k if err != nil { return err } - if res.IP4 == nil || res.IP4.IP.String() == "" { + if res.IP4 == nil { return fmt.Errorf("CNI plugin reported no IPv4 address for container %v.", id) } - plugin.podCIDRs[id] = res.IP4.IP.String() + ip4 := res.IP4.IP.IP.To4() + if ip4 == nil { + return fmt.Errorf("CNI plugin reported an invalid IPv4 address for container %v: %+v.", id, res.IP4) + } + plugin.podIPs[id] = ip4.String() // Put the container bridge into promiscuous mode to force it to accept hairpin packets. // TODO: Remove this once the kernel bug (#20096) is fixed. @@ -335,19 +339,13 @@ func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id k } } - // The first SetUpPod call creates the bridge; ensure shaping is enabled - if plugin.shaper == nil { - plugin.shaper = bandwidth.NewTCShaper(BridgeName) - if plugin.shaper == nil { - return fmt.Errorf("Failed to create bandwidth shaper!") - } - plugin.ensureBridgeTxQueueLen() - plugin.shaper.ReconcileInterface() - } + // The first SetUpPod call creates the bridge; get a shaper for the sake of + // initialization + shaper := plugin.shaper() if egress != nil || ingress != nil { - ipAddr, _, _ := net.ParseCIDR(plugin.podCIDRs[id]) - if err = plugin.shaper.ReconcileCIDR(fmt.Sprintf("%s/32", ipAddr.String()), egress, ingress); err != nil { + ipAddr := plugin.podIPs[id] + if err := shaper.ReconcileCIDR(fmt.Sprintf("%s/32", ipAddr), egress, ingress); err != nil { return fmt.Errorf("Failed to add pod to shaper: %v", err) } } @@ -369,26 +367,25 @@ func (plugin *kubenetNetworkPlugin) TearDownPod(namespace string, name string, i return fmt.Errorf("Kubenet needs a PodCIDR to tear down pods") } - // no cached CIDR is Ok during teardown - cidr, hasCIDR := plugin.podCIDRs[id] - if hasCIDR { - glog.V(5).Infof("Removing pod CIDR %s from shaper", cidr) + // no cached IP is Ok during teardown + podIP, hasIP := plugin.podIPs[id] + if hasIP { + glog.V(5).Infof("Removing pod IP %s from shaper", podIP) // shaper wants /32 - if addr, _, err := net.ParseCIDR(cidr); err != nil { - if err = plugin.shaper.Reset(fmt.Sprintf("%s/32", addr.String())); err != nil { - glog.Warningf("Failed to remove pod CIDR %s from shaper: %v", cidr, err) - } + if err := plugin.shaper().Reset(fmt.Sprintf("%s/32", podIP)); err != nil { + // Possible bandwidth shaping wasn't enabled for this pod anyways + glog.V(4).Infof("Failed to remove pod IP %s from shaper: %v", podIP, err) } } if err := plugin.delContainerFromNetwork(plugin.netConfig, network.DefaultInterfaceName, namespace, name, id); err != nil { // This is to prevent returning error when TearDownPod is called twice on the same pod. This helps to reduce event pollution. - if !hasCIDR { + if !hasIP { glog.Warningf("Failed to delete container from kubenet: %v", err) return nil } return err } - delete(plugin.podCIDRs, id) + delete(plugin.podIPs, id) plugin.syncHostportsRules() return nil @@ -400,11 +397,8 @@ func (plugin *kubenetNetworkPlugin) GetPodNetworkStatus(namespace string, name s plugin.mu.Lock() defer plugin.mu.Unlock() // Assuming the ip of pod does not change. Try to retrieve ip from kubenet map first. - if cidr, ok := plugin.podCIDRs[id]; ok { - ip, _, err := net.ParseCIDR(strings.Trim(cidr, "\n")) - if err == nil { - return &network.PodNetworkStatus{IP: ip}, nil - } + if podIP, ok := plugin.podIPs[id]; ok { + return &network.PodNetworkStatus{IP: net.ParseIP(podIP)}, nil } netnsPath, err := plugin.host.GetRuntime().GetNetNS(id) @@ -429,7 +423,7 @@ func (plugin *kubenetNetworkPlugin) GetPodNetworkStatus(namespace string, name s if err != nil { return nil, fmt.Errorf("Kubenet failed to parse ip from output %s due to %v", output, err) } - plugin.podCIDRs[id] = ip.String() + plugin.podIPs[id] = ip.String() return &network.PodNetworkStatus{IP: ip}, nil } @@ -549,7 +543,7 @@ func (plugin *kubenetNetworkPlugin) openPodHostports(pod *api.Pod) (map[hostport //syncHostportMap syncs newly opened hostports to kubenet on successful pod setup. If pod setup failed, then clean up. func (plugin *kubenetNetworkPlugin) syncHostportMap(id kubecontainer.ContainerID, hostportMap map[hostport]closeable) { // if pod ip cannot be retrieved from podCIDR, then assume pod setup failed. - if _, ok := plugin.podCIDRs[id]; !ok { + if _, ok := plugin.podIPs[id]; !ok { for hp, socket := range hostportMap { err := socket.Close() if err != nil { @@ -580,23 +574,18 @@ func (plugin *kubenetNetworkPlugin) gatherAllHostports() (map[api.ContainerPort] } } // Assuming if kubenet has the pod's ip, the pod is alive and its host port should be presented. - cidr, ok := plugin.podCIDRs[podInfraContainerId] + podIP, ok := plugin.podIPs[podInfraContainerId] if !ok { // The POD has been delete. Ignore continue } - podIP, _, err := net.ParseCIDR(strings.Trim(cidr, "\n")) - if err != nil { - glog.V(3).Info("Failed to retrieve pod ip for %s-%s: %v", p.Namespace, p.Name, err) - continue - } // Need the complete api.Pod object pod, ok := plugin.host.GetPodByName(p.Namespace, p.Name) if ok { for _, container := range pod.Spec.Containers { for _, port := range container.Ports { if port.HostPort != 0 { - podHostportMap[port] = targetPod{podFullName: kubecontainer.GetPodFullName(pod), podIP: podIP.String()} + podHostportMap[port] = targetPod{podFullName: kubecontainer.GetPodFullName(pod), podIP: podIP} } } } @@ -821,3 +810,15 @@ func (plugin *kubenetNetworkPlugin) cleanupHostportMap(containerPortMap map[api. } } } + +// shaper retrieves the bandwidth shaper and, if it hasn't been fetched before, +// initializes it and ensures the bridge is appropriately configured +// This function should only be called while holding the `plugin.mu` lock +func (plugin *kubenetNetworkPlugin) shaper() bandwidth.BandwidthShaper { + if plugin.bandwidthShaper == nil { + plugin.bandwidthShaper = bandwidth.NewTCShaper(BridgeName) + plugin.ensureBridgeTxQueueLen() + plugin.bandwidthShaper.ReconcileInterface() + } + return plugin.bandwidthShaper +} diff --git a/pkg/kubelet/network/kubenet/kubenet_linux_test.go b/pkg/kubelet/network/kubenet/kubenet_linux_test.go index a16e11e7fa7..051a0f2494f 100644 --- a/pkg/kubelet/network/kubenet/kubenet_linux_test.go +++ b/pkg/kubelet/network/kubenet/kubenet_linux_test.go @@ -19,26 +19,36 @@ package kubenet import ( "fmt" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + + "testing" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/network" + "k8s.io/kubernetes/pkg/kubelet/network/cni/testing" nettest "k8s.io/kubernetes/pkg/kubelet/network/testing" + "k8s.io/kubernetes/pkg/util/bandwidth" "k8s.io/kubernetes/pkg/util/exec" - "testing" + ipttest "k8s.io/kubernetes/pkg/util/iptables/testing" ) -func newFakeKubenetPlugin(initMap map[kubecontainer.ContainerID]string, execer exec.Interface, host network.Host) network.NetworkPlugin { +// test it fulfills the NetworkPlugin interface +var _ network.NetworkPlugin = &kubenetNetworkPlugin{} + +func newFakeKubenetPlugin(initMap map[kubecontainer.ContainerID]string, execer exec.Interface, host network.Host) *kubenetNetworkPlugin { return &kubenetNetworkPlugin{ - podCIDRs: initMap, - execer: execer, - MTU: 1460, - host: host, + podIPs: initMap, + execer: execer, + MTU: 1460, + host: host, } } func TestGetPodNetworkStatus(t *testing.T) { podIPMap := make(map[kubecontainer.ContainerID]string) - podIPMap[kubecontainer.ContainerID{ID: "1"}] = "10.245.0.2/32" - podIPMap[kubecontainer.ContainerID{ID: "2"}] = "10.245.0.3/32" + podIPMap[kubecontainer.ContainerID{ID: "1"}] = "10.245.0.2" + podIPMap[kubecontainer.ContainerID{ID: "2"}] = "10.245.0.3" testCases := []struct { id string @@ -111,4 +121,38 @@ func TestGetPodNetworkStatus(t *testing.T) { } } +// TestTeardownBeforeSetUp tests that a `TearDown` call does call +// `shaper.Reset` +func TestTeardownCallsShaper(t *testing.T) { + fexec := &exec.FakeExec{ + CommandScript: []exec.FakeCommandAction{}, + LookPathFunc: func(file string) (string, error) { + return fmt.Sprintf("/fake-bin/%s", file), nil + }, + } + fhost := nettest.NewFakeHost(nil) + fshaper := &bandwidth.FakeShaper{} + mockcni := &mock_cni.MockCNI{} + kubenet := newFakeKubenetPlugin(map[kubecontainer.ContainerID]string{}, fexec, fhost) + kubenet.cniConfig = mockcni + kubenet.iptables = ipttest.NewFake() + kubenet.bandwidthShaper = fshaper + + mockcni.On("DelNetwork", mock.AnythingOfType("*libcni.NetworkConfig"), mock.AnythingOfType("*libcni.RuntimeConf")).Return(nil) + + details := make(map[string]interface{}) + details[network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE_DETAIL_CIDR] = "10.0.0.1/24" + kubenet.Event(network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE, details) + + existingContainerID := kubecontainer.BuildContainerID("docker", "123") + kubenet.podIPs[existingContainerID] = "10.0.0.1" + + if err := kubenet.TearDownPod("namespace", "name", existingContainerID); err != nil { + t.Fatalf("Unexpected error in TearDownPod: %v", err) + } + assert.Equal(t, []string{"10.0.0.1/32"}, fshaper.ResetCIDRs, "shaper.Reset should have been called") + + mockcni.AssertExpectations(t) +} + //TODO: add unit test for each implementation of network plugin interface