From dcd6b00d8372c27c95d38db349c1932e053905d8 Mon Sep 17 00:00:00 2001 From: Brendan Burns Date: Wed, 29 Jul 2015 09:25:27 -0700 Subject: [PATCH] Add a package for bandwidth control --- pkg/util/bandwidth/doc.go | 18 ++ pkg/util/bandwidth/interfaces.go | 32 +++ pkg/util/bandwidth/linux.go | 190 ++++++++++++++++ pkg/util/bandwidth/linux_test.go | 380 +++++++++++++++++++++++++++++++ 4 files changed, 620 insertions(+) create mode 100644 pkg/util/bandwidth/doc.go create mode 100644 pkg/util/bandwidth/interfaces.go create mode 100644 pkg/util/bandwidth/linux.go create mode 100644 pkg/util/bandwidth/linux_test.go diff --git a/pkg/util/bandwidth/doc.go b/pkg/util/bandwidth/doc.go new file mode 100644 index 00000000000..5ae73e8be6d --- /dev/null +++ b/pkg/util/bandwidth/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package bandwidth provides utilities for bandwidth shaping +package bandwidth diff --git a/pkg/util/bandwidth/interfaces.go b/pkg/util/bandwidth/interfaces.go new file mode 100644 index 00000000000..aa817ceae56 --- /dev/null +++ b/pkg/util/bandwidth/interfaces.go @@ -0,0 +1,32 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package bandwidth + +import "k8s.io/kubernetes/pkg/api/resource" + +type BandwidthShaper interface { + // Limit the bandwidth for a particular CIDR on a particular interface + // * ingress and egress are in bits/second + // * cidr is expected to be a valid network CIDR (e.g. '1.2.3.4/32' or '10.20.0.1/16') + // 'egress' bandwidth limit applies to all packets on the interface whose source matches 'cidr' + // 'ingress' bandwidth limit applies to all packets on the interface whose destination matches 'cidr' + // Limits are aggregate limits for the CIDR, not per IP address. CIDRs must be unique, but can be overlapping, traffic + // that matches multiple CIDRs counts against all limits. + Limit(cidr string, egress, ingress resource.Quantity) error + // Remove a bandwidth limit for a particular CIDR on a particular network interface + Reset(cidr string) error +} diff --git a/pkg/util/bandwidth/linux.go b/pkg/util/bandwidth/linux.go new file mode 100644 index 00000000000..542d8db2c85 --- /dev/null +++ b/pkg/util/bandwidth/linux.go @@ -0,0 +1,190 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package bandwidth + +import ( + "bufio" + "bytes" + "encoding/hex" + "fmt" + "net" + "strings" + + "k8s.io/kubernetes/pkg/api/resource" + "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/exec" + + "github.com/golang/glog" +) + +// tcShaper provides an implementation of the BandwidthShaper interface on Linux using the 'tc' tool. +// In general, using this requires that the caller posses the NET_CAP_ADMIN capability, though if you +// do this within an container, it only requires the NS_CAPABLE capability for manipulations to that +// container's network namespace. +// Uses the hierarchical token bucket queueing discipline (htb), this requires Linux 2.4.20 or newer +// or a custom kernel with that queueing discipline backported. +type tcShaper struct { + e exec.Interface + iface string +} + +func NewTCShaper(iface string) BandwidthShaper { + shaper := &tcShaper{ + e: exec.New(), + iface: iface, + } + shaper.initializeInterface() + return shaper +} + +func (t *tcShaper) execAndLog(cmdStr string, args ...string) error { + glog.V(6).Infof("Running: %s %s", cmdStr, strings.Join(args, " ")) + cmd := t.e.Command(cmdStr, args...) + out, err := cmd.CombinedOutput() + glog.V(6).Infof("Output from tc: %s", string(out)) + return err +} + +func (t *tcShaper) nextClassID() (int, error) { + data, err := t.e.Command("tc", "class", "show", "dev", t.iface).CombinedOutput() + if err != nil { + return -1, err + } + + scanner := bufio.NewScanner(bytes.NewBuffer(data)) + classes := util.StringSet{} + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + // skip empty lines + if len(line) == 0 { + continue + } + parts := strings.Split(line, " ") + // expected tc line: + // class htb 1:1 root prio 0 rate 1000Kbit ceil 1000Kbit burst 1600b cburst 1600b + if len(parts) != 14 { + return -1, fmt.Errorf("unexpected output from tc: %s (%v)", scanner.Text(), parts) + } + classes.Insert(parts[2]) + } + + // Make sure it doesn't go forever + for nextClass := 1; nextClass < 10000; nextClass++ { + if !classes.Has(fmt.Sprintf("1:%d", nextClass)) { + return nextClass, nil + } + } + // This should really never happen + return -1, fmt.Errorf("exhausted class space, please try again") +} + +// Convert a CIDR from text to a hex representation +// Strips any masked parts of the IP, so 1.2.3.4/16 becomes hex(1.2.0.0)/ffffffff +func hexCIDR(cidr string) (string, error) { + ip, ipnet, err := net.ParseCIDR(cidr) + if err != nil { + return "", err + } + ip = ip.Mask(ipnet.Mask) + hexIP := hex.EncodeToString([]byte(ip.To4())) + hexMask := ipnet.Mask.String() + return hexIP + "/" + hexMask, nil +} + +func (t *tcShaper) findCIDRClass(cidr string) (class, handle string, err error) { + data, err := t.e.Command("tc", "filter", "show", "dev", t.iface).CombinedOutput() + if err != nil { + return "", "", err + } + + hex, err := hexCIDR(cidr) + if err != nil { + return "", "", err + } + spec := fmt.Sprintf("match %s", hex) + + scanner := bufio.NewScanner(bytes.NewBuffer(data)) + filter := "" + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + if len(line) == 0 { + continue + } + if strings.HasPrefix(line, "filter") { + filter = line + continue + } + if strings.Contains(line, spec) { + parts := strings.Split(filter, " ") + // expected tc line: + // filter parent 1: protocol ip pref 1 u32 fh 800::800 order 2048 key ht 800 bkt 0 flowid 1:1 + if len(parts) != 19 { + return "", "", fmt.Errorf("unexpected output from tc: %s %d (%v)", filter, len(parts), parts) + } + return parts[18], parts[9], nil + } + } + return "", "", fmt.Errorf("Failed to find cidr: %s on interface: %s", cidr, t.iface) +} + +func makeKBitString(rsrc resource.Quantity) string { + return fmt.Sprintf("%dkbit", (rsrc.Value() / 1000)) +} + +func (t *tcShaper) makeNewClass(rate string) (int, error) { + class, err := t.nextClassID() + if err != nil { + return -1, err + } + if err := t.execAndLog("tc", "class", "add", "dev", t.iface, "parent", "1:", "classid", fmt.Sprintf("1:%d", class), "htb", "rate", rate); err != nil { + return -1, err + } + return class, nil +} + +func (t *tcShaper) Limit(cidr string, upload, download resource.Quantity) (err error) { + var downloadClass, uploadClass int + if downloadClass, err = t.makeNewClass(makeKBitString(download)); err != nil { + return err + } + if uploadClass, err = t.makeNewClass(makeKBitString(upload)); err != nil { + return err + } + + if err := t.execAndLog("tc", "filter", "add", "dev", t.iface, "protocol", "ip", "parent", "1:0", "prio", "1", "u32", "match", "ip", "dst", cidr, "flowid", fmt.Sprintf("1:%d", downloadClass)); err != nil { + return err + } + if err := t.execAndLog("tc", "filter", "add", "dev", t.iface, "protocol", "ip", "parent", "1:0", "prio", "1", "u32", "match", "ip", "src", cidr, "flowid", fmt.Sprintf("1:%d", uploadClass)); err != nil { + return err + } + return nil +} + +func (t *tcShaper) initializeInterface() error { + return t.execAndLog("tc", "qdisc", "add", "dev", t.iface, "root", "handle", "1:", "htb", "default", "30") +} + +func (t *tcShaper) Reset(cidr string) error { + class, handle, err := t.findCIDRClass(cidr) + if err != nil { + return err + } + if err := t.execAndLog("tc", "filter", "del", "dev", t.iface, "parent", "1:", "proto", "ip", "prio", "1", "handle", handle, "u32"); err != nil { + return err + } + return t.execAndLog("tc", "class", "del", "dev", t.iface, "parent", "1:", "classid", class) +} diff --git a/pkg/util/bandwidth/linux_test.go b/pkg/util/bandwidth/linux_test.go new file mode 100644 index 00000000000..fb5985114d9 --- /dev/null +++ b/pkg/util/bandwidth/linux_test.go @@ -0,0 +1,380 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package bandwidth + +import ( + "errors" + "testing" + + "k8s.io/kubernetes/pkg/api/resource" + "k8s.io/kubernetes/pkg/util/exec" +) + +var tcClassOutput = `class htb 1:1 root prio 0 rate 10000bit ceil 10000bit burst 1600b cburst 1600b +class htb 1:2 root prio 0 rate 10000bit ceil 10000bit burst 1600b cburst 1600b +class htb 1:3 root prio 0 rate 10000bit ceil 10000bit burst 1600b cburst 1600b +class htb 1:4 root prio 0 rate 10000bit ceil 10000bit burst 1600b cburst 1600b +` + +var tcClassOutput2 = `class htb 1:1 root prio 0 rate 10000bit ceil 10000bit burst 1600b cburst 1600b +class htb 1:2 root prio 0 rate 10000bit ceil 10000bit burst 1600b cburst 1600b +class htb 1:3 root prio 0 rate 10000bit ceil 10000bit burst 1600b cburst 1600b +class htb 1:4 root prio 0 rate 10000bit ceil 10000bit burst 1600b cburst 1600b +class htb 1:5 root prio 0 rate 10000bit ceil 10000bit burst 1600b cburst 1600b +` + +func TestNextClassID(t *testing.T) { + tests := []struct { + output string + expectErr bool + expected int + err error + }{ + { + output: tcClassOutput, + expected: 5, + }, + { + output: "\n", + expected: 1, + }, + { + expected: -1, + expectErr: true, + err: errors.New("test error"), + }, + } + for _, test := range tests { + fcmd := exec.FakeCmd{ + CombinedOutputScript: []exec.FakeCombinedOutputAction{ + func() ([]byte, error) { return []byte(test.output), test.err }, + }, + } + fexec := exec.FakeExec{ + CommandScript: []exec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { + return exec.InitFakeCmd(&fcmd, cmd, args...) + }, + }, + } + shaper := &tcShaper{e: &fexec} + class, err := shaper.nextClassID() + if test.expectErr { + if err == nil { + t.Errorf("unexpected non-error") + } + } else { + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if class != test.expected { + t.Errorf("expected: %d, found %d", test.expected, class) + } + } + } +} + +func TestHexCIDR(t *testing.T) { + tests := []struct { + input string + output string + expectErr bool + }{ + { + input: "1.2.3.4/16", + output: "01020000/ffff0000", + }, + { + input: "172.17.0.2/32", + output: "ac110002/ffffffff", + }, + { + input: "foo", + expectErr: true, + }, + } + for _, test := range tests { + output, err := hexCIDR(test.input) + if test.expectErr { + if err == nil { + t.Error("unexpected non-error") + } + } else { + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if output != test.output { + t.Errorf("expected: %s, saw: %s", test.output, output) + } + } + } +} + +var tcFilterOutput = `filter parent 1: protocol ip pref 1 u32 +filter parent 1: protocol ip pref 1 u32 fh 800: ht divisor 1 +filter parent 1: protocol ip pref 1 u32 fh 800::800 order 2048 key ht 800 bkt 0 flowid 1:1 + match ac110002/ffffffff at 16 +filter parent 1: protocol ip pref 1 u32 fh 800::801 order 2049 key ht 800 bkt 0 flowid 1:2 + match 01020000/ffff0000 at 16 +` + +func TestFindCIDRClass(t *testing.T) { + tests := []struct { + cidr string + output string + expectErr bool + expectedClass string + expectedHandle string + err error + }{ + { + cidr: "172.17.0.2/32", + output: tcFilterOutput, + expectedClass: "1:1", + expectedHandle: "800::800", + }, + { + cidr: "1.2.3.4/16", + output: tcFilterOutput, + expectedClass: "1:2", + expectedHandle: "800::801", + }, + { + err: errors.New("test error"), + expectErr: true, + }, + } + for _, test := range tests { + fcmd := exec.FakeCmd{ + CombinedOutputScript: []exec.FakeCombinedOutputAction{ + func() ([]byte, error) { return []byte(test.output), test.err }, + }, + } + fexec := exec.FakeExec{ + CommandScript: []exec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { + return exec.InitFakeCmd(&fcmd, cmd, args...) + }, + }, + } + shaper := &tcShaper{e: &fexec} + class, handle, err := shaper.findCIDRClass(test.cidr) + if test.expectErr { + if err == nil { + t.Errorf("unexpected non-error") + } + } else { + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if class != test.expectedClass { + t.Errorf("expected: %s, found %s", test.expectedClass, class) + } + if handle != test.expectedHandle { + t.Errorf("expected: %s, found %s", test.expectedHandle, handle) + } + } + } +} + +func TestLimit(t *testing.T) { + tests := []struct { + cidr string + ingress *resource.Quantity + egress *resource.Quantity + expectErr bool + err error + }{ + { + cidr: "1.2.3.4/32", + ingress: resource.NewQuantity(10, resource.DecimalSI), + egress: resource.NewQuantity(20, resource.DecimalSI), + }, + { + err: errors.New("test error"), + ingress: resource.NewQuantity(10, resource.DecimalSI), + egress: resource.NewQuantity(20, resource.DecimalSI), + expectErr: true, + }, + } + + for _, test := range tests { + fcmd := exec.FakeCmd{ + CombinedOutputScript: []exec.FakeCombinedOutputAction{ + func() ([]byte, error) { return []byte(tcClassOutput), test.err }, + func() ([]byte, error) { return []byte{}, test.err }, + func() ([]byte, error) { return []byte(tcClassOutput2), test.err }, + func() ([]byte, error) { return []byte{}, test.err }, + func() ([]byte, error) { return []byte{}, test.err }, + func() ([]byte, error) { return []byte{}, test.err }, + }, + } + + fexec := exec.FakeExec{ + CommandScript: []exec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + }, + } + iface := "cbr0" + shaper := &tcShaper{e: &fexec, iface: iface} + if err := shaper.Limit(test.cidr, *test.ingress, *test.egress); err != nil && !test.expectErr { + t.Errorf("unexpected error: %v", err) + return + } else if err == nil && test.expectErr { + t.Error("unexpected non-error") + return + } + // No more testing in the error case + if test.expectErr { + if fcmd.CombinedOutputCalls != 1 { + t.Errorf("unexpected number of calls: %d, expected: 1", fcmd.CombinedOutputCalls) + } + return + } + + if fcmd.CombinedOutputCalls != 6 { + t.Errorf("unexpected number of calls: %d, expected: 6", fcmd.CombinedOutputCalls) + } + + for ix := range fcmd.CombinedOutputLog { + output := fcmd.CombinedOutputLog[ix] + if output[0] != "tc" { + t.Errorf("unexpected command: %s, expected tc", output[0]) + } + if output[4] != iface { + t.Errorf("unexpected interface: %s, expected %s (%v)", output[4], iface, output) + } + if ix == 1 { + if output[11] != makeKBitString(*test.ingress) { + t.Errorf("unexpected ingress: %s, expected: %s", output[11], makeKBitString(*test.ingress)) + } + if output[8] != "1:5" { + t.Errorf("unexpected class: %s, expected: %s", output[8], "1:5") + } + } + if ix == 3 { + if output[11] != makeKBitString(*test.egress) { + t.Errorf("unexpected egress: %s, expected: %s", output[11], makeKBitString(*test.egress)) + } + if output[8] != "1:6" { + t.Errorf("unexpected class: %s, expected: %s", output[8], "1:6") + } + } + if ix == 4 { + if output[15] != test.cidr { + t.Errorf("unexpected cidr: %s, expected: %s", output[15], test.cidr) + } + if output[17] != "1:5" { + t.Errorf("unexpected class: %s, expected: %s", output[17], "1:5") + } + } + if ix == 5 { + if output[15] != test.cidr { + t.Errorf("unexpected cidr: %s, expected: %s", output[15], test.cidr) + } + if output[17] != "1:6" { + t.Errorf("unexpected class: %s, expected: %s", output[17], "1:5") + } + } + } + } +} + +func TestReset(t *testing.T) { + tests := []struct { + cidr string + err error + expectErr bool + expectedHandle string + expectedClass string + }{ + { + cidr: "1.2.3.4/16", + expectedHandle: "800::801", + expectedClass: "1:2", + }, + { + cidr: "172.17.0.2/32", + expectedHandle: "800::800", + expectedClass: "1:1", + }, + { + err: errors.New("test error"), + expectErr: true, + }, + } + for _, test := range tests { + fcmd := exec.FakeCmd{ + CombinedOutputScript: []exec.FakeCombinedOutputAction{ + func() ([]byte, error) { return []byte(tcFilterOutput), test.err }, + func() ([]byte, error) { return []byte{}, test.err }, + func() ([]byte, error) { return []byte{}, test.err }, + }, + } + + fexec := exec.FakeExec{ + CommandScript: []exec.FakeCommandAction{ + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) }, + }, + } + iface := "cbr0" + shaper := &tcShaper{e: &fexec, iface: iface} + + if err := shaper.Reset(test.cidr); err != nil && !test.expectErr { + t.Errorf("unexpected error: %v", err) + return + } else if test.expectErr && err == nil { + t.Error("unexpected non-error") + return + } + + // No more testing in the error case + if test.expectErr { + if fcmd.CombinedOutputCalls != 1 { + t.Errorf("unexpected number of calls: %d, expected: 1", fcmd.CombinedOutputCalls) + } + return + } + + if fcmd.CombinedOutputCalls != 3 { + t.Errorf("unexpected number of calls: %d, expected: 3", fcmd.CombinedOutputCalls) + } + + for ix := range fcmd.CombinedOutputLog { + output := fcmd.CombinedOutputLog[ix] + if output[0] != "tc" { + t.Errorf("unexpected command: %s, expected tc", output[0]) + } + if output[4] != iface { + t.Errorf("unexpected interface: %s, expected %s (%v)", output[4], iface, output) + } + if ix == 1 && output[12] != test.expectedHandle { + t.Errorf("unexpected handle: %s, expected: %s", output[12], test.expectedHandle) + } + if ix == 2 && output[8] != test.expectedClass { + t.Errorf("unexpected class: %s, expected: %s", output[8], test.expectedClass) + } + } + } +}