From 947e0e1bf500d30559913a1c0f28c41a47ab7f2b Mon Sep 17 00:00:00 2001 From: Minhan Xia Date: Thu, 16 Feb 2017 11:37:54 -0800 Subject: [PATCH 1/2] pass pod annotation to SetUpPod --- pkg/kubelet/dockershim/docker_sandbox.go | 2 +- pkg/kubelet/dockertools/docker_manager.go | 2 +- pkg/kubelet/network/cni/cni.go | 2 +- pkg/kubelet/network/cni/cni_test.go | 2 +- pkg/kubelet/network/kubenet/kubenet_linux.go | 2 +- pkg/kubelet/network/kubenet/kubenet_unsupported.go | 2 +- pkg/kubelet/network/plugins.go | 8 ++++---- pkg/kubelet/network/testing/mock_network_plugin.go | 2 +- pkg/kubelet/network/testing/plugins_test.go | 8 ++++---- pkg/kubelet/rkt/rkt.go | 2 +- 10 files changed, 16 insertions(+), 16 deletions(-) diff --git a/pkg/kubelet/dockershim/docker_sandbox.go b/pkg/kubelet/dockershim/docker_sandbox.go index 1c8fa11f941..f529a065d46 100644 --- a/pkg/kubelet/dockershim/docker_sandbox.go +++ b/pkg/kubelet/dockershim/docker_sandbox.go @@ -103,7 +103,7 @@ func (ds *dockerService) RunPodSandbox(config *runtimeapi.PodSandboxConfig) (str // on the host as well, to satisfy parts of the pod spec that aren't // recognized by the CNI standard yet. cID := kubecontainer.BuildContainerID(runtimeName, createResp.ID) - err = ds.network.SetUpPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID) + err = ds.network.SetUpPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID, config.Annotations) // TODO: Do we need to teardown on failure or can we rely on a StopPodSandbox call with the given ID? return createResp.ID, err } diff --git a/pkg/kubelet/dockertools/docker_manager.go b/pkg/kubelet/dockertools/docker_manager.go index e97f0777c11..388ea65a53a 100644 --- a/pkg/kubelet/dockertools/docker_manager.go +++ b/pkg/kubelet/dockertools/docker_manager.go @@ -2249,7 +2249,7 @@ func (dm *DockerManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStatus *kubecon setupNetworkResult := kubecontainer.NewSyncResult(kubecontainer.SetupNetwork, kubecontainer.GetPodFullName(pod)) result.AddSyncResult(setupNetworkResult) if !kubecontainer.IsHostNetworkPod(pod) { - if err := dm.network.SetUpPod(pod.Namespace, pod.Name, podInfraContainerID.ContainerID()); err != nil { + if err := dm.network.SetUpPod(pod.Namespace, pod.Name, podInfraContainerID.ContainerID(), pod.Annotations); err != nil { setupNetworkResult.Fail(kubecontainer.ErrSetupNetwork, err.Error()) glog.Error(err) diff --git a/pkg/kubelet/network/cni/cni.go b/pkg/kubelet/network/cni/cni.go index 9c5b72a5d53..8e14802bbc6 100644 --- a/pkg/kubelet/network/cni/cni.go +++ b/pkg/kubelet/network/cni/cni.go @@ -189,7 +189,7 @@ func (plugin *cniNetworkPlugin) Name() string { return CNIPluginName } -func (plugin *cniNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID) error { +func (plugin *cniNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID, annotations map[string]string) error { if err := plugin.checkInitialized(); err != nil { return err } diff --git a/pkg/kubelet/network/cni/cni_test.go b/pkg/kubelet/network/cni/cni_test.go index 1d76236fea8..76c2e00fa83 100644 --- a/pkg/kubelet/network/cni/cni_test.go +++ b/pkg/kubelet/network/cni/cni_test.go @@ -215,7 +215,7 @@ func TestCNIPlugin(t *testing.T) { } // Set up the pod - err = plug.SetUpPod("podNamespace", "podName", containerID) + err = plug.SetUpPod("podNamespace", "podName", containerID, map[string]string{}) if err != nil { t.Errorf("Expected nil: %v", err) } diff --git a/pkg/kubelet/network/kubenet/kubenet_linux.go b/pkg/kubelet/network/kubenet/kubenet_linux.go index 8ac40d44b7e..f21aa929b85 100644 --- a/pkg/kubelet/network/kubenet/kubenet_linux.go +++ b/pkg/kubelet/network/kubenet/kubenet_linux.go @@ -406,7 +406,7 @@ func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kube return nil } -func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID) error { +func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID, annotations map[string]string) error { plugin.mu.Lock() defer plugin.mu.Unlock() diff --git a/pkg/kubelet/network/kubenet/kubenet_unsupported.go b/pkg/kubelet/network/kubenet/kubenet_unsupported.go index 34bc73581d3..ea73b00bf13 100644 --- a/pkg/kubelet/network/kubenet/kubenet_unsupported.go +++ b/pkg/kubelet/network/kubenet/kubenet_unsupported.go @@ -42,7 +42,7 @@ func (plugin *kubenetNetworkPlugin) Name() string { return "kubenet" } -func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID) error { +func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID, annotations map[string]string) error { return fmt.Errorf("Kubenet is not supported in this build") } diff --git a/pkg/kubelet/network/plugins.go b/pkg/kubelet/network/plugins.go index 5b13ef86efb..8d8845e30a7 100644 --- a/pkg/kubelet/network/plugins.go +++ b/pkg/kubelet/network/plugins.go @@ -70,7 +70,7 @@ type NetworkPlugin interface { // the pod has been created but before the other containers of the // pod are launched. // TODO: rename podInfraContainerID to sandboxID - SetUpPod(namespace string, name string, podInfraContainerID kubecontainer.ContainerID) error + SetUpPod(namespace string, name string, podInfraContainerID kubecontainer.ContainerID, annotations map[string]string) error // TearDownPod is the method called before a pod's infra container will be deleted // TODO: rename podInfraContainerID to sandboxID @@ -235,7 +235,7 @@ func (plugin *NoopNetworkPlugin) Capabilities() utilsets.Int { return utilsets.NewInt() } -func (plugin *NoopNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID) error { +func (plugin *NoopNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID, annotations map[string]string) error { return nil } @@ -389,13 +389,13 @@ func (pm *PluginManager) GetPodNetworkStatus(podNamespace, podName string, id ku return netStatus, nil } -func (pm *PluginManager) SetUpPod(podNamespace, podName string, id kubecontainer.ContainerID) error { +func (pm *PluginManager) SetUpPod(podNamespace, podName string, id kubecontainer.ContainerID, annotations map[string]string) error { fullPodName := kubecontainer.BuildPodFullName(podName, podNamespace) pm.podLock(fullPodName).Lock() defer pm.podUnlock(fullPodName) glog.V(3).Infof("Calling network plugin %s to set up pod %q", pm.plugin.Name(), fullPodName) - if err := pm.plugin.SetUpPod(podNamespace, podName, id); err != nil { + if err := pm.plugin.SetUpPod(podNamespace, podName, id, annotations); err != nil { return fmt.Errorf("NetworkPlugin %s failed to set up pod %q network: %v", pm.plugin.Name(), fullPodName, err) } diff --git a/pkg/kubelet/network/testing/mock_network_plugin.go b/pkg/kubelet/network/testing/mock_network_plugin.go index 3eaa3fefe59..9015c360a39 100644 --- a/pkg/kubelet/network/testing/mock_network_plugin.go +++ b/pkg/kubelet/network/testing/mock_network_plugin.go @@ -102,7 +102,7 @@ func (_mr *_MockNetworkPluginRecorder) Name() *gomock.Call { return _mr.mock.ctrl.RecordCall(_mr.mock, "Name") } -func (_m *MockNetworkPlugin) SetUpPod(_param0 string, _param1 string, _param2 container.ContainerID) error { +func (_m *MockNetworkPlugin) SetUpPod(_param0 string, _param1 string, _param2 container.ContainerID, annotations map[string]string) error { ret := _m.ctrl.Call(_m, "SetUpPod", _param0, _param1, _param2) ret0, _ := ret[0].(error) return ret0 diff --git a/pkg/kubelet/network/testing/plugins_test.go b/pkg/kubelet/network/testing/plugins_test.go index 96f46013ea4..fd12c0ef42d 100644 --- a/pkg/kubelet/network/testing/plugins_test.go +++ b/pkg/kubelet/network/testing/plugins_test.go @@ -77,7 +77,7 @@ func TestPluginManager(t *testing.T) { // concurrently. allCreatedWg.Wait() - if err := pm.SetUpPod("", name, id); err != nil { + if err := pm.SetUpPod("", name, id, nil); err != nil { t.Errorf("Failed to set up pod %q: %v", name, err) return } @@ -128,7 +128,7 @@ func (p *hookableFakeNetworkPlugin) Capabilities() utilsets.Int { return utilsets.NewInt() } -func (p *hookableFakeNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID) error { +func (p *hookableFakeNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID, annotations map[string]string) error { if p.setupHook != nil { p.setupHook(namespace, name, id) } @@ -179,7 +179,7 @@ func TestMultiPodParallelNetworkOps(t *testing.T) { // Setup will block on the runner pod completing. If network // operations locking isn't correct (eg pod network operations // block other pods) setUpPod() will never return. - if err := pm.SetUpPod("", podName, containerID); err != nil { + if err := pm.SetUpPod("", podName, containerID, nil); err != nil { t.Errorf("Failed to set up waiter pod: %v", err) return } @@ -199,7 +199,7 @@ func TestMultiPodParallelNetworkOps(t *testing.T) { podName := "runner" containerID := kubecontainer.ContainerID{ID: podName} - if err := pm.SetUpPod("", podName, containerID); err != nil { + if err := pm.SetUpPod("", podName, containerID, nil); err != nil { t.Errorf("Failed to set up runner pod: %v", err) return } diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go index 2e7b89fe255..4a125306e38 100644 --- a/pkg/kubelet/rkt/rkt.go +++ b/pkg/kubelet/rkt/rkt.go @@ -1297,7 +1297,7 @@ func (r *Runtime) setupPodNetwork(pod *v1.Pod) (string, string, error) { // Set up networking with the network plugin containerID := kubecontainer.ContainerID{ID: string(pod.UID)} - err = r.network.SetUpPod(pod.Namespace, pod.Name, containerID) + err = r.network.SetUpPod(pod.Namespace, pod.Name, containerID, pod.Annotations) if err != nil { return "", "", err } From f006c8bcd39b3dbfee1299bf60acbfb85fdfaf34 Mon Sep 17 00:00:00 2001 From: Minhan Xia Date: Thu, 16 Feb 2017 11:41:11 -0800 Subject: [PATCH 2/2] teach kubenet to use annotation instead of pod object for traffic shaper --- pkg/kubelet/network/kubenet/kubenet_linux.go | 32 ++++++++++---------- pkg/util/bandwidth/utils.go | 3 ++ 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/pkg/kubelet/network/kubenet/kubenet_linux.go b/pkg/kubelet/network/kubenet/kubenet_linux.go index f21aa929b85..166dcabc8ad 100644 --- a/pkg/kubelet/network/kubenet/kubenet_linux.go +++ b/pkg/kubelet/network/kubenet/kubenet_linux.go @@ -307,7 +307,7 @@ func (plugin *kubenetNetworkPlugin) Capabilities() utilsets.Int { // setup sets up networking through CNI using the given ns/name and sandbox ID. // TODO: Don't pass the pod to this method, it only needs it for bandwidth // shaping and hostport management. -func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kubecontainer.ContainerID, pod *v1.Pod) error { +func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kubecontainer.ContainerID, pod *v1.Pod, annotations map[string]string) error { // Bring up container loopback interface if _, err := plugin.addContainerToNetwork(plugin.loConfig, "lo", namespace, name, id); err != nil { return err @@ -359,23 +359,22 @@ func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kube plugin.podIPs[id] = ip4.String() + // The first SetUpPod call creates the bridge; get a shaper for the sake of initialization + // TODO: replace with CNI traffic shaper plugin + shaper := plugin.shaper() + ingress, egress, err := bandwidth.ExtractPodBandwidthResources(annotations) + if err != nil { + return fmt.Errorf("Error reading pod bandwidth annotations: %v", err) + } + if egress != nil || ingress != nil { + if err := shaper.ReconcileCIDR(fmt.Sprintf("%s/32", ip4.String()), egress, ingress); err != nil { + return fmt.Errorf("Failed to add pod to shaper: %v", err) + } + } + // The host can choose to not support "legacy" features. The remote // shim doesn't support it (#35457), but the kubelet does. if plugin.host.SupportsLegacyFeatures() { - // The first SetUpPod call creates the bridge; get a shaper for the sake of - // initialization - shaper := plugin.shaper() - - ingress, egress, err := bandwidth.ExtractPodBandwidthResources(pod.Annotations) - if err != nil { - return fmt.Errorf("Error reading pod bandwidth annotations: %v", err) - } - if egress != nil || ingress != nil { - if err := shaper.ReconcileCIDR(fmt.Sprintf("%s/32", ip4.String()), egress, ingress); err != nil { - return fmt.Errorf("Failed to add pod to shaper: %v", err) - } - } - // Open any hostport the pod's containers want activePodPortMappings, err := plugin.getPodPortMappings() if err != nil { @@ -387,6 +386,7 @@ func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kube return err } } else { + // TODO: replace with CNI port-forwarding plugin portMappings, err := plugin.host.GetPodPortMappings(id.ID) if err != nil { return err @@ -425,7 +425,7 @@ func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id k return fmt.Errorf("Kubenet cannot SetUpPod: %v", err) } - if err := plugin.setup(namespace, name, id, pod); err != nil { + if err := plugin.setup(namespace, name, id, pod, annotations); err != nil { // Make sure everything gets cleaned up on errors podIP, _ := plugin.podIPs[id] if err := plugin.teardown(namespace, name, id, podIP); err != nil { diff --git a/pkg/util/bandwidth/utils.go b/pkg/util/bandwidth/utils.go index bcd64f528bb..451ab68836c 100644 --- a/pkg/util/bandwidth/utils.go +++ b/pkg/util/bandwidth/utils.go @@ -36,6 +36,9 @@ func validateBandwidthIsReasonable(rsrc *resource.Quantity) error { } func ExtractPodBandwidthResources(podAnnotations map[string]string) (ingress, egress *resource.Quantity, err error) { + if podAnnotations == nil { + return nil, nil, nil + } str, found := podAnnotations["kubernetes.io/ingress-bandwidth"] if found { ingressValue, err := resource.ParseQuantity(str)