From 9f3ef68ebc3afe285b2b6fffb01a9137ba3e2e91 Mon Sep 17 00:00:00 2001 From: Brendan Burns Date: Mon, 10 Aug 2015 15:08:31 -0700 Subject: [PATCH] integrate bandwidth shaping and the kubelet. --- pkg/kubelet/kubelet.go | 117 +++++++++++- pkg/kubelet/kubelet_test.go | 245 ++++++++++++++++++++++++ pkg/util/bandwidth/fake_shaper.go | 49 +++++ pkg/util/bandwidth/interfaces.go | 8 +- pkg/util/bandwidth/linux.go | 160 ++++++++++++++-- pkg/util/bandwidth/linux_test.go | 302 +++++++++++++++++++++++++++--- 6 files changed, 831 insertions(+), 50 deletions(-) create mode 100644 pkg/util/bandwidth/fake_shaper.go diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 093c3279abe..b5d0eb4b25e 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -58,6 +58,7 @@ import ( "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/bandwidth" utilErrors "k8s.io/kubernetes/pkg/util/errors" "k8s.io/kubernetes/pkg/util/mount" nodeutil "k8s.io/kubernetes/pkg/util/node" @@ -552,6 +553,9 @@ type Kubelet struct { // DNS resolver configuration file. This can be used in conjunction with // clusterDomain and clusterDNS. resolverConfig string + + // Optionally shape the bandwidth of a pod + shaper bandwidth.BandwidthShaper } // getRootDir returns the full path to the directory under which kubelet can @@ -1314,6 +1318,31 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont return err } + ingress, egress, err := extractBandwidthResources(pod) + if err != nil { + return err + } + if egress != nil || ingress != nil { + if pod.Spec.HostNetwork { + kl.recorder.Event(pod, "host network not supported", "Bandwidth shaping is not currently supported on the host network") + } else if kl.shaper != nil { + status, found := kl.statusManager.GetPodStatus(pod.UID) + if !found { + statusPtr, err := kl.containerRuntime.GetPodStatus(pod) + if err != nil { + glog.Errorf("Error getting pod for bandwidth shaping") + return err + } + status = *statusPtr + } + if len(status.PodIP) > 0 { + err = kl.shaper.ReconcileCIDR(fmt.Sprintf("%s/32", status.PodIP), egress, ingress) + } + } else { + kl.recorder.Event(pod, "nil shaper", "Pod requests bandwidth shaping, but the shaper is undefined") + } + } + if isStaticPod(pod) { if mirrorPod != nil && !kl.podManager.IsMirrorPodOf(mirrorPod, pod) { // The mirror pod is semantically different from the static pod. Remove @@ -1391,6 +1420,48 @@ func (kl *Kubelet) cleanupOrphanedPodDirs(pods []*api.Pod, runningPods []*kubeco return utilErrors.NewAggregate(errlist) } +func (kl *Kubelet) cleanupBandwidthLimits(allPods []*api.Pod) error { + if kl.shaper == nil { + return nil + } + currentCIDRs, err := kl.shaper.GetCIDRs() + if err != nil { + return err + } + possibleCIDRs := util.StringSet{} + for ix := range allPods { + pod := allPods[ix] + ingress, egress, err := extractBandwidthResources(pod) + if err != nil { + return err + } + if ingress == nil && egress == nil { + glog.V(8).Infof("Not a bandwidth limited container...") + continue + } + status, found := kl.statusManager.GetPodStatus(pod.UID) + if !found { + statusPtr, err := kl.containerRuntime.GetPodStatus(pod) + if err != nil { + return err + } + status = *statusPtr + } + if status.Phase == api.PodRunning { + possibleCIDRs.Insert(fmt.Sprintf("%s/32", status.PodIP)) + } + } + for _, cidr := range currentCIDRs { + if !possibleCIDRs.Has(cidr) { + glog.V(2).Infof("Removing CIDR: %s (%v)", cidr, possibleCIDRs) + if err := kl.shaper.Reset(cidr); err != nil { + return err + } + } + } + return nil +} + // Compares the map of current volumes to the map of desired volumes. // If an active volume does not have a respective desired volume, clean it up. func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, runningPods []*kubecontainer.Pod) error { @@ -1623,6 +1694,11 @@ func (kl *Kubelet) HandlePodCleanups() error { glog.Errorf("Failed to cleanup terminated pods: %v", err) } + // Clear out any old bandwith rules + if err = kl.cleanupBandwidthLimits(allPods); err != nil { + return err + } + kl.backOff.GC() return err } @@ -2071,7 +2147,11 @@ func (kl *Kubelet) reconcileCBR0(podCIDR string) error { if err := ensureCbr0(cidr); err != nil { return err } - return nil + if kl.shaper == nil { + glog.V(5).Info("Shaper is nil, creating") + kl.shaper = bandwidth.NewTCShaper("cbr0") + } + return kl.shaper.ReconcileInterface() } // updateNodeStatus updates node status to master with retries. @@ -2642,3 +2722,38 @@ func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint) { func (kl *Kubelet) GetRuntime() kubecontainer.Runtime { return kl.containerRuntime } + +var minRsrc = resource.MustParse("1k") +var maxRsrc = resource.MustParse("1P") + +func validateBandwidthIsReasonable(rsrc *resource.Quantity) error { + if rsrc.Value() < minRsrc.Value() { + return fmt.Errorf("resource is unreasonably small (< 1kbit)") + } + if rsrc.Value() > maxRsrc.Value() { + return fmt.Errorf("resoruce is unreasonably large (> 1Pbit)") + } + return nil +} + +func extractBandwidthResources(pod *api.Pod) (ingress, egress *resource.Quantity, err error) { + str, found := pod.Annotations["kubernetes.io/ingress-bandwidth"] + if found { + if ingress, err = resource.ParseQuantity(str); err != nil { + return nil, nil, err + } + if err := validateBandwidthIsReasonable(ingress); err != nil { + return nil, nil, err + } + } + str, found = pod.Annotations["kubernetes.io/egress-bandwidth"] + if found { + if egress, err = resource.ParseQuantity(str); err != nil { + return nil, nil, err + } + if err := validateBandwidthIsReasonable(egress); err != nil { + return nil, nil, err + } + } + return ingress, egress, nil +} diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 900fbaa5e73..d3f76bfc914 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -48,6 +48,7 @@ import ( "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/bandwidth" "k8s.io/kubernetes/pkg/version" "k8s.io/kubernetes/pkg/volume" _ "k8s.io/kubernetes/pkg/volume/host_path" @@ -3387,3 +3388,247 @@ func TestDoesNotDeletePodDirsIfContainerIsRunning(t *testing.T) { testKubelet.fakeRuntime.PodList = []*kubecontainer.Pod{} syncAndVerifyPodDir(t, testKubelet, pods, []*api.Pod{apiPod}, false) } + +func TestCleanupBandwidthLimits(t *testing.T) { + tests := []struct { + status *api.PodStatus + pods []*api.Pod + inputCIDRs []string + expectResetCIDRs []string + cacheStatus bool + expectedCalls []string + name string + }{ + { + status: &api.PodStatus{ + PodIP: "1.2.3.4", + Phase: api.PodRunning, + }, + pods: []*api.Pod{ + { + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Annotations: map[string]string{ + "kubernetes.io/ingress-bandwidth": "10M", + }, + }, + }, + { + ObjectMeta: api.ObjectMeta{ + Name: "bar", + }, + }, + }, + inputCIDRs: []string{"1.2.3.4/32", "2.3.4.5/32", "5.6.7.8/32"}, + expectResetCIDRs: []string{"2.3.4.5/32", "5.6.7.8/32"}, + expectedCalls: []string{"GetPodStatus"}, + name: "pod running", + }, + { + status: &api.PodStatus{ + PodIP: "1.2.3.4", + Phase: api.PodRunning, + }, + pods: []*api.Pod{ + { + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Annotations: map[string]string{ + "kubernetes.io/ingress-bandwidth": "10M", + }, + }, + }, + { + ObjectMeta: api.ObjectMeta{ + Name: "bar", + }, + }, + }, + inputCIDRs: []string{"1.2.3.4/32", "2.3.4.5/32", "5.6.7.8/32"}, + expectResetCIDRs: []string{"2.3.4.5/32", "5.6.7.8/32"}, + expectedCalls: []string{}, + cacheStatus: true, + name: "pod running with cache", + }, + { + status: &api.PodStatus{ + PodIP: "1.2.3.4", + Phase: api.PodFailed, + }, + pods: []*api.Pod{ + { + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Annotations: map[string]string{ + "kubernetes.io/ingress-bandwidth": "10M", + }, + }, + }, + { + ObjectMeta: api.ObjectMeta{ + Name: "bar", + }, + }, + }, + inputCIDRs: []string{"1.2.3.4/32", "2.3.4.5/32", "5.6.7.8/32"}, + expectResetCIDRs: []string{"1.2.3.4/32", "2.3.4.5/32", "5.6.7.8/32"}, + expectedCalls: []string{"GetPodStatus"}, + name: "pod not running", + }, + { + status: &api.PodStatus{ + PodIP: "1.2.3.4", + Phase: api.PodFailed, + }, + pods: []*api.Pod{ + { + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Annotations: map[string]string{ + "kubernetes.io/ingress-bandwidth": "10M", + }, + }, + }, + { + ObjectMeta: api.ObjectMeta{ + Name: "bar", + }, + }, + }, + inputCIDRs: []string{"1.2.3.4/32", "2.3.4.5/32", "5.6.7.8/32"}, + expectResetCIDRs: []string{"1.2.3.4/32", "2.3.4.5/32", "5.6.7.8/32"}, + expectedCalls: []string{}, + cacheStatus: true, + name: "pod not running with cache", + }, + { + status: &api.PodStatus{ + PodIP: "1.2.3.4", + Phase: api.PodRunning, + }, + pods: []*api.Pod{ + { + ObjectMeta: api.ObjectMeta{ + Name: "foo", + }, + }, + { + ObjectMeta: api.ObjectMeta{ + Name: "bar", + }, + }, + }, + inputCIDRs: []string{"1.2.3.4/32", "2.3.4.5/32", "5.6.7.8/32"}, + expectResetCIDRs: []string{"1.2.3.4/32", "2.3.4.5/32", "5.6.7.8/32"}, + name: "no bandwidth limits", + }, + } + for _, test := range tests { + shaper := &bandwidth.FakeShaper{ + CIDRs: test.inputCIDRs, + } + + testKube := newTestKubelet(t) + testKube.kubelet.shaper = shaper + testKube.fakeRuntime.PodStatus = *test.status + + if test.cacheStatus { + for _, pod := range test.pods { + testKube.kubelet.statusManager.SetPodStatus(pod, *test.status) + } + } + + err := testKube.kubelet.cleanupBandwidthLimits(test.pods) + if err != nil { + t.Errorf("unexpected error: %v (%s)", test.name) + } + if !reflect.DeepEqual(shaper.ResetCIDRs, test.expectResetCIDRs) { + t.Errorf("[%s]\nexpected: %v, saw: %v", test.name, test.expectResetCIDRs, shaper.ResetCIDRs) + } + + if test.cacheStatus { + if len(testKube.fakeRuntime.CalledFunctions) != 0 { + t.Errorf("unexpected function calls: %v", testKube.fakeRuntime.CalledFunctions) + } + } else if !reflect.DeepEqual(testKube.fakeRuntime.CalledFunctions, test.expectedCalls) { + t.Errorf("[%s], expected %v, saw %v", test.name, test.expectedCalls, testKube.fakeRuntime.CalledFunctions) + } + } +} + +func TestExtractBandwidthResources(t *testing.T) { + four, _ := resource.ParseQuantity("4M") + ten, _ := resource.ParseQuantity("10M") + twenty, _ := resource.ParseQuantity("20M") + tests := []struct { + pod *api.Pod + expectedIngress *resource.Quantity + expectedEgress *resource.Quantity + expectError bool + }{ + { + pod: &api.Pod{}, + }, + { + pod: &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Annotations: map[string]string{ + "kubernetes.io/ingress-bandwidth": "10M", + }, + }, + }, + expectedIngress: ten, + }, + { + pod: &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Annotations: map[string]string{ + "kubernetes.io/egress-bandwidth": "10M", + }, + }, + }, + expectedEgress: ten, + }, + { + pod: &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Annotations: map[string]string{ + "kubernetes.io/ingress-bandwidth": "4M", + "kubernetes.io/egress-bandwidth": "20M", + }, + }, + }, + expectedIngress: four, + expectedEgress: twenty, + }, + { + pod: &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Annotations: map[string]string{ + "kubernetes.io/ingress-bandwidth": "foo", + }, + }, + }, + expectError: true, + }, + } + for _, test := range tests { + ingress, egress, err := extractBandwidthResources(test.pod) + if test.expectError { + if err == nil { + t.Errorf("unexpected non-error") + } + continue + } + if err != nil { + t.Errorf("unexpected error: %v", err) + continue + } + if !reflect.DeepEqual(ingress, test.expectedIngress) { + t.Errorf("expected: %v, saw: %v", ingress, test.expectedIngress) + } + if !reflect.DeepEqual(egress, test.expectedEgress) { + t.Errorf("expected: %v, saw: %v", egress, test.expectedEgress) + } + } +} diff --git a/pkg/util/bandwidth/fake_shaper.go b/pkg/util/bandwidth/fake_shaper.go new file mode 100644 index 00000000000..12798bde18e --- /dev/null +++ b/pkg/util/bandwidth/fake_shaper.go @@ -0,0 +1,49 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package bandwidth + +import ( + "errors" + + "k8s.io/kubernetes/pkg/api/resource" +) + +type FakeShaper struct { + CIDRs []string + ResetCIDRs []string +} + +func (f *FakeShaper) Limit(cidr string, egress, ingress *resource.Quantity) error { + return errors.New("unimplemented") +} + +func (f *FakeShaper) Reset(cidr string) error { + f.ResetCIDRs = append(f.ResetCIDRs, cidr) + return nil +} + +func (f *FakeShaper) ReconcileInterface() error { + return errors.New("unimplemented") +} + +func (f *FakeShaper) ReconcileCIDR(cidr string, egress, ingress *resource.Quantity) error { + return errors.New("unimplemented") +} + +func (f *FakeShaper) GetCIDRs() ([]string, error) { + return f.CIDRs, nil +} diff --git a/pkg/util/bandwidth/interfaces.go b/pkg/util/bandwidth/interfaces.go index aa817ceae56..62b20d87d0c 100644 --- a/pkg/util/bandwidth/interfaces.go +++ b/pkg/util/bandwidth/interfaces.go @@ -26,7 +26,13 @@ type BandwidthShaper interface { // 'ingress' bandwidth limit applies to all packets on the interface whose destination matches 'cidr' // Limits are aggregate limits for the CIDR, not per IP address. CIDRs must be unique, but can be overlapping, traffic // that matches multiple CIDRs counts against all limits. - Limit(cidr string, egress, ingress resource.Quantity) error + Limit(cidr string, egress, ingress *resource.Quantity) error // Remove a bandwidth limit for a particular CIDR on a particular network interface Reset(cidr string) error + // Reconcile the interface managed by this shaper with the state on the ground. + ReconcileInterface() error + // Reconcile a CIDR managed by this shaper with the state on the ground + ReconcileCIDR(cidr string, egress, ingress *resource.Quantity) error + // GetCIDRs returns the set of CIDRs that are being managed by this shaper + GetCIDRs() ([]string, error) } diff --git a/pkg/util/bandwidth/linux.go b/pkg/util/bandwidth/linux.go index 542d8db2c85..e1ebd687b1a 100644 --- a/pkg/util/bandwidth/linux.go +++ b/pkg/util/bandwidth/linux.go @@ -47,7 +47,6 @@ func NewTCShaper(iface string) BandwidthShaper { e: exec.New(), iface: iface, } - shaper.initializeInterface() return shaper } @@ -105,15 +104,34 @@ func hexCIDR(cidr string) (string, error) { return hexIP + "/" + hexMask, nil } -func (t *tcShaper) findCIDRClass(cidr string) (class, handle string, err error) { +// Convert a CIDR from hex representation to text, opposite of the above. +func asciiCIDR(cidr string) (string, error) { + parts := strings.Split(cidr, "/") + if len(parts) != 2 { + return "", fmt.Errorf("unexpected CIDR format: %s", cidr) + } + ipData, err := hex.DecodeString(parts[0]) + if err != nil { + return "", err + } + ip := net.IP(ipData) + + maskData, err := hex.DecodeString(parts[1]) + mask := net.IPMask(maskData) + size, _ := mask.Size() + + return fmt.Sprintf("%s/%d", ip.String(), size), nil +} + +func (t *tcShaper) findCIDRClass(cidr string) (class, handle string, found bool, err error) { data, err := t.e.Command("tc", "filter", "show", "dev", t.iface).CombinedOutput() if err != nil { - return "", "", err + return "", "", false, err } hex, err := hexCIDR(cidr) if err != nil { - return "", "", err + return "", "", false, err } spec := fmt.Sprintf("match %s", hex) @@ -133,15 +151,15 @@ func (t *tcShaper) findCIDRClass(cidr string) (class, handle string, err error) // expected tc line: // filter parent 1: protocol ip pref 1 u32 fh 800::800 order 2048 key ht 800 bkt 0 flowid 1:1 if len(parts) != 19 { - return "", "", fmt.Errorf("unexpected output from tc: %s %d (%v)", filter, len(parts), parts) + return "", "", false, fmt.Errorf("unexpected output from tc: %s %d (%v)", filter, len(parts), parts) } - return parts[18], parts[9], nil + return parts[18], parts[9], true, nil } } - return "", "", fmt.Errorf("Failed to find cidr: %s on interface: %s", cidr, t.iface) + return "", "", false, nil } -func makeKBitString(rsrc resource.Quantity) string { +func makeKBitString(rsrc *resource.Quantity) string { return fmt.Sprintf("%dkbit", (rsrc.Value() / 1000)) } @@ -150,27 +168,91 @@ func (t *tcShaper) makeNewClass(rate string) (int, error) { if err != nil { return -1, err } - if err := t.execAndLog("tc", "class", "add", "dev", t.iface, "parent", "1:", "classid", fmt.Sprintf("1:%d", class), "htb", "rate", rate); err != nil { + if err := t.execAndLog("tc", "class", "add", + "dev", t.iface, + "parent", "1:", + "classid", fmt.Sprintf("1:%d", class), + "htb", "rate", rate); err != nil { return -1, err } return class, nil } -func (t *tcShaper) Limit(cidr string, upload, download resource.Quantity) (err error) { +func (t *tcShaper) Limit(cidr string, upload, download *resource.Quantity) (err error) { var downloadClass, uploadClass int - if downloadClass, err = t.makeNewClass(makeKBitString(download)); err != nil { - return err + if download != nil { + if downloadClass, err = t.makeNewClass(makeKBitString(download)); err != nil { + return err + } + if err := t.execAndLog("tc", "filter", "add", + "dev", t.iface, + "protocol", "ip", + "parent", "1:0", + "prio", "1", "u32", + "match", "ip", "dst", cidr, + "flowid", fmt.Sprintf("1:%d", downloadClass)); err != nil { + return err + } } - if uploadClass, err = t.makeNewClass(makeKBitString(upload)); err != nil { - return err + if upload != nil { + if uploadClass, err = t.makeNewClass(makeKBitString(upload)); err != nil { + return err + } + if err := t.execAndLog("tc", "filter", "add", + "dev", t.iface, + "protocol", "ip", + "parent", "1:0", + "prio", "1", "u32", + "match", "ip", "src", cidr, + "flowid", fmt.Sprintf("1:%d", uploadClass)); err != nil { + return err + } } + return nil +} - if err := t.execAndLog("tc", "filter", "add", "dev", t.iface, "protocol", "ip", "parent", "1:0", "prio", "1", "u32", "match", "ip", "dst", cidr, "flowid", fmt.Sprintf("1:%d", downloadClass)); err != nil { +// tests to see if an interface exists, if it does, return true and the status line for the interface +// returns false, "", if an error occurs. +func (t *tcShaper) interfaceExists() (bool, string, error) { + data, err := t.e.Command("tc", "qdisc", "show", "dev", t.iface).CombinedOutput() + if err != nil { + return false, "", err + } + value := strings.TrimSpace(string(data)) + if len(value) == 0 { + return false, "", nil + } + return true, value, nil +} + +func (t *tcShaper) ReconcileCIDR(cidr string, upload, download *resource.Quantity) error { + _, _, found, err := t.findCIDRClass(cidr) + if err != nil { return err } - if err := t.execAndLog("tc", "filter", "add", "dev", t.iface, "protocol", "ip", "parent", "1:0", "prio", "1", "u32", "match", "ip", "src", cidr, "flowid", fmt.Sprintf("1:%d", uploadClass)); err != nil { + if !found { + return t.Limit(cidr, upload, download) + } + // TODO: actually check bandwidth limits here + return nil +} + +func (t *tcShaper) ReconcileInterface() error { + exists, output, err := t.interfaceExists() + if err != nil { return err } + if !exists { + glog.V(4).Info("Didn't find bandwidth interface, creating") + return t.initializeInterface() + } + fields := strings.Split(output, " ") + if len(fields) != 12 || fields[1] != "htb" || fields[2] != "1:" { + if err := t.deleteInterface(fields[2]); err != nil { + return err + } + return t.initializeInterface() + } return nil } @@ -179,12 +261,54 @@ func (t *tcShaper) initializeInterface() error { } func (t *tcShaper) Reset(cidr string) error { - class, handle, err := t.findCIDRClass(cidr) + class, handle, found, err := t.findCIDRClass(cidr) if err != nil { return err } - if err := t.execAndLog("tc", "filter", "del", "dev", t.iface, "parent", "1:", "proto", "ip", "prio", "1", "handle", handle, "u32"); err != nil { + if !found { + return fmt.Errorf("Failed to find cidr: %s on interface: %s", cidr, t.iface) + } + if err := t.execAndLog("tc", "filter", "del", + "dev", t.iface, + "parent", "1:", + "proto", "ip", + "prio", "1", + "handle", handle, "u32"); err != nil { return err } return t.execAndLog("tc", "class", "del", "dev", t.iface, "parent", "1:", "classid", class) } + +func (t *tcShaper) deleteInterface(class string) error { + return t.execAndLog("tc", "qdisc", "delete", "dev", t.iface, "root", "handle", class) +} + +func (t *tcShaper) GetCIDRs() ([]string, error) { + data, err := t.e.Command("tc", "filter", "show", "dev", t.iface).CombinedOutput() + if err != nil { + return nil, err + } + + result := []string{} + scanner := bufio.NewScanner(bytes.NewBuffer(data)) + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + if len(line) == 0 { + continue + } + if strings.Contains(line, "match") { + parts := strings.Split(line, " ") + // expected tc line: + // match at + if len(parts) != 4 { + return nil, fmt.Errorf("unexpected output: %v", parts) + } + cidr, err := asciiCIDR(parts[1]) + if err != nil { + return nil, err + } + result = append(result, cidr) + } + } + return result, nil +} diff --git a/pkg/util/bandwidth/linux_test.go b/pkg/util/bandwidth/linux_test.go index fb5985114d9..a1ac3e216b3 100644 --- a/pkg/util/bandwidth/linux_test.go +++ b/pkg/util/bandwidth/linux_test.go @@ -18,6 +18,8 @@ package bandwidth import ( "errors" + "reflect" + "strings" "testing" "k8s.io/kubernetes/pkg/api/resource" @@ -95,7 +97,7 @@ func TestHexCIDR(t *testing.T) { expectErr bool }{ { - input: "1.2.3.4/16", + input: "1.2.0.0/16", output: "01020000/ffff0000", }, { @@ -120,6 +122,13 @@ func TestHexCIDR(t *testing.T) { if output != test.output { t.Errorf("expected: %s, saw: %s", test.output, output) } + input, err := asciiCIDR(output) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if input != test.input { + t.Errorf("expected: %s, saw: %s", test.input, input) + } } } } @@ -137,6 +146,7 @@ func TestFindCIDRClass(t *testing.T) { cidr string output string expectErr bool + expectNotFound bool expectedClass string expectedHandle string err error @@ -153,6 +163,11 @@ func TestFindCIDRClass(t *testing.T) { expectedClass: "1:2", expectedHandle: "800::801", }, + { + cidr: "2.2.3.4/16", + output: tcFilterOutput, + expectNotFound: true, + }, { err: errors.New("test error"), expectErr: true, @@ -172,7 +187,7 @@ func TestFindCIDRClass(t *testing.T) { }, } shaper := &tcShaper{e: &fexec} - class, handle, err := shaper.findCIDRClass(test.cidr) + class, handle, found, err := shaper.findCIDRClass(test.cidr) if test.expectErr { if err == nil { t.Errorf("unexpected non-error") @@ -181,28 +196,78 @@ func TestFindCIDRClass(t *testing.T) { if err != nil { t.Errorf("unexpected error: %v", err) } - if class != test.expectedClass { - t.Errorf("expected: %s, found %s", test.expectedClass, class) - } - if handle != test.expectedHandle { - t.Errorf("expected: %s, found %s", test.expectedHandle, handle) + if test.expectNotFound { + if found { + t.Errorf("unexpectedly found an interface: %s %s", class, handle) + } + } else { + if class != test.expectedClass { + t.Errorf("expected: %s, found %s", test.expectedClass, class) + } + if handle != test.expectedHandle { + t.Errorf("expected: %s, found %s", test.expectedHandle, handle) + } } } } } +func TestGetCIDRs(t *testing.T) { + fcmd := exec.FakeCmd{ + CombinedOutputScript: []exec.FakeCombinedOutputAction{ + func() ([]byte, error) { return []byte(tcFilterOutput), nil }, + }, + } + fexec := exec.FakeExec{ + CommandScript: []exec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { + return exec.InitFakeCmd(&fcmd, cmd, args...) + }, + }, + } + shaper := &tcShaper{e: &fexec} + cidrs, err := shaper.GetCIDRs() + if err != nil { + t.Errorf("unexpected error: %v", err) + } + expectedCidrs := []string{"172.17.0.2/32", "1.2.0.0/16"} + if !reflect.DeepEqual(cidrs, expectedCidrs) { + t.Errorf("expected: %v, saw: %v", expectedCidrs, cidrs) + } +} + func TestLimit(t *testing.T) { tests := []struct { - cidr string - ingress *resource.Quantity - egress *resource.Quantity - expectErr bool - err error + cidr string + ingress *resource.Quantity + egress *resource.Quantity + expectErr bool + expectedCalls int + err error }{ { - cidr: "1.2.3.4/32", - ingress: resource.NewQuantity(10, resource.DecimalSI), - egress: resource.NewQuantity(20, resource.DecimalSI), + cidr: "1.2.3.4/32", + ingress: resource.NewQuantity(10, resource.DecimalSI), + egress: resource.NewQuantity(20, resource.DecimalSI), + expectedCalls: 6, + }, + { + cidr: "1.2.3.4/32", + ingress: resource.NewQuantity(10, resource.DecimalSI), + egress: nil, + expectedCalls: 3, + }, + { + cidr: "1.2.3.4/32", + ingress: nil, + egress: resource.NewQuantity(20, resource.DecimalSI), + expectedCalls: 3, + }, + { + cidr: "1.2.3.4/32", + ingress: nil, + egress: nil, + expectedCalls: 0, }, { err: errors.New("test error"), @@ -217,8 +282,8 @@ func TestLimit(t *testing.T) { CombinedOutputScript: []exec.FakeCombinedOutputAction{ func() ([]byte, error) { return []byte(tcClassOutput), test.err }, func() ([]byte, error) { return []byte{}, test.err }, - func() ([]byte, error) { return []byte(tcClassOutput2), test.err }, func() ([]byte, error) { return []byte{}, test.err }, + func() ([]byte, error) { return []byte(tcClassOutput2), test.err }, func() ([]byte, error) { return []byte{}, test.err }, func() ([]byte, error) { return []byte{}, test.err }, }, @@ -236,7 +301,7 @@ func TestLimit(t *testing.T) { } iface := "cbr0" shaper := &tcShaper{e: &fexec, iface: iface} - if err := shaper.Limit(test.cidr, *test.ingress, *test.egress); err != nil && !test.expectErr { + if err := shaper.Limit(test.cidr, test.ingress, test.egress); err != nil && !test.expectErr { t.Errorf("unexpected error: %v", err) return } else if err == nil && test.expectErr { @@ -251,8 +316,8 @@ func TestLimit(t *testing.T) { return } - if fcmd.CombinedOutputCalls != 6 { - t.Errorf("unexpected number of calls: %d, expected: 6", fcmd.CombinedOutputCalls) + if fcmd.CombinedOutputCalls != test.expectedCalls { + t.Errorf("unexpected number of calls: %d, expected: %d", fcmd.CombinedOutputCalls, test.expectedCalls) } for ix := range fcmd.CombinedOutputLog { @@ -264,22 +329,20 @@ func TestLimit(t *testing.T) { t.Errorf("unexpected interface: %s, expected %s (%v)", output[4], iface, output) } if ix == 1 { - if output[11] != makeKBitString(*test.ingress) { - t.Errorf("unexpected ingress: %s, expected: %s", output[11], makeKBitString(*test.ingress)) + var expectedRate string + if test.ingress != nil { + expectedRate = makeKBitString(test.ingress) + } else { + expectedRate = makeKBitString(test.egress) + } + if output[11] != expectedRate { + t.Errorf("unexpected ingress: %s, expected: %s", output[11], expectedRate) } if output[8] != "1:5" { t.Errorf("unexpected class: %s, expected: %s", output[8], "1:5") } } - if ix == 3 { - if output[11] != makeKBitString(*test.egress) { - t.Errorf("unexpected egress: %s, expected: %s", output[11], makeKBitString(*test.egress)) - } - if output[8] != "1:6" { - t.Errorf("unexpected class: %s, expected: %s", output[8], "1:6") - } - } - if ix == 4 { + if ix == 2 { if output[15] != test.cidr { t.Errorf("unexpected cidr: %s, expected: %s", output[15], test.cidr) } @@ -287,6 +350,14 @@ func TestLimit(t *testing.T) { t.Errorf("unexpected class: %s, expected: %s", output[17], "1:5") } } + if ix == 4 { + if output[11] != makeKBitString(test.egress) { + t.Errorf("unexpected egress: %s, expected: %s", output[11], makeKBitString(test.egress)) + } + if output[8] != "1:6" { + t.Errorf("unexpected class: %s, expected: %s", output[8], "1:6") + } + } if ix == 5 { if output[15] != test.cidr { t.Errorf("unexpected cidr: %s, expected: %s", output[15], test.cidr) @@ -378,3 +449,174 @@ func TestReset(t *testing.T) { } } } + +var tcQdisc = "qdisc htb 1: root refcnt 2 r2q 10 default 30 direct_packets_stat 0\n" + +func TestReconcileInterfaceExists(t *testing.T) { + fcmd := exec.FakeCmd{ + CombinedOutputScript: []exec.FakeCombinedOutputAction{ + func() ([]byte, error) { return []byte(tcQdisc), nil }, + }, + } + + fexec := exec.FakeExec{ + CommandScript: []exec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + }, + } + iface := "cbr0" + shaper := &tcShaper{e: &fexec, iface: iface} + err := shaper.ReconcileInterface() + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + if fcmd.CombinedOutputCalls != 1 { + t.Errorf("unexpected number of calls: %d", fcmd.CombinedOutputCalls) + } + + output := fcmd.CombinedOutputLog[0] + if len(output) != 5 { + t.Errorf("unexpected command: %v", output) + } + if output[0] != "tc" { + t.Errorf("unexpected command: %s", output[0]) + } + if output[4] != iface { + t.Errorf("unexpected interface: %s, expected %s", output[4], iface) + } + if output[2] != "show" { + t.Errorf("unexpected action: %s", output[2]) + } +} + +func TestReconcileInterfaceDoesntExist(t *testing.T) { + fcmd := exec.FakeCmd{ + CombinedOutputScript: []exec.FakeCombinedOutputAction{ + func() ([]byte, error) { return []byte("\n"), nil }, + func() ([]byte, error) { return []byte("\n"), nil }, + }, + } + + fexec := exec.FakeExec{ + CommandScript: []exec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + }, + } + iface := "cbr0" + shaper := &tcShaper{e: &fexec, iface: iface} + err := shaper.ReconcileInterface() + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + if fcmd.CombinedOutputCalls != 2 { + t.Errorf("unexpected number of calls: %d", fcmd.CombinedOutputCalls) + } + + for ix, output := range fcmd.CombinedOutputLog { + if output[0] != "tc" { + t.Errorf("unexpected command: %s", output[0]) + } + if output[4] != iface { + t.Errorf("unexpected interface: %s, expected %s", output[4], iface) + } + if ix == 0 { + if len(output) != 5 { + t.Errorf("unexpected command: %v", output) + } + if output[2] != "show" { + t.Errorf("unexpected action: %s", output[2]) + } + } + if ix == 1 { + if len(output) != 11 { + t.Errorf("unexpected command: %v", output) + } + if output[2] != "add" { + t.Errorf("unexpected action: %s", output[2]) + } + if output[7] != "1:" { + t.Errorf("unexpected root class: %s", output[7]) + } + if output[8] != "htb" { + t.Errorf("unexpected qdisc algo: %s", output[8]) + } + } + } +} + +var tcQdiscWrong = []string{ + "qdisc htb 2: root refcnt 2 r2q 10 default 30 direct_packets_stat 0\n", + "qdisc foo 1: root refcnt 2 r2q 10 default 30 direct_packets_stat 0\n", +} + +func TestReconcileInterfaceIsWrong(t *testing.T) { + for _, test := range tcQdiscWrong { + fcmd := exec.FakeCmd{ + CombinedOutputScript: []exec.FakeCombinedOutputAction{ + func() ([]byte, error) { return []byte(test), nil }, + func() ([]byte, error) { return []byte("\n"), nil }, + func() ([]byte, error) { return []byte("\n"), nil }, + }, + } + + fexec := exec.FakeExec{ + CommandScript: []exec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + }, + } + iface := "cbr0" + shaper := &tcShaper{e: &fexec, iface: iface} + err := shaper.ReconcileInterface() + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + if fcmd.CombinedOutputCalls != 3 { + t.Errorf("unexpected number of calls: %d", fcmd.CombinedOutputCalls) + } + + for ix, output := range fcmd.CombinedOutputLog { + if output[0] != "tc" { + t.Errorf("unexpected command: %s", output[0]) + } + if output[4] != iface { + t.Errorf("unexpected interface: %s, expected %s", output[4], iface) + } + if ix == 0 { + if len(output) != 5 { + t.Errorf("unexpected command: %v", output) + } + if output[2] != "show" { + t.Errorf("unexpected action: %s", output[2]) + } + } + if ix == 1 { + if len(output) != 8 { + t.Errorf("unexpected command: %v", output) + } + if output[2] != "delete" { + t.Errorf("unexpected action: %s", output[2]) + } + if output[7] != strings.Split(test, " ")[2] { + t.Errorf("unexpected class: %s, expected: %s", output[7], strings.Split(test, " ")[2]) + } + } + if ix == 2 { + if len(output) != 11 { + t.Errorf("unexpected command: %v", output) + } + if output[7] != "1:" { + t.Errorf("unexpected root class: %s", output[7]) + } + if output[8] != "htb" { + t.Errorf("unexpected qdisc algo: %s", output[8]) + } + } + } + } +}