integrate bandwidth shaping and the kubelet.

This commit is contained in:
Brendan Burns 2015-08-10 15:08:31 -07:00
parent de60651cc0
commit 9f3ef68ebc
6 changed files with 831 additions and 50 deletions

View File

@ -58,6 +58,7 @@ import (
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/bandwidth"
utilErrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/mount"
nodeutil "k8s.io/kubernetes/pkg/util/node"
@ -552,6 +553,9 @@ type Kubelet struct {
// DNS resolver configuration file. This can be used in conjunction with
// clusterDomain and clusterDNS.
resolverConfig string
// Optionally shape the bandwidth of a pod
shaper bandwidth.BandwidthShaper
}
// getRootDir returns the full path to the directory under which kubelet can
@ -1314,6 +1318,31 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
return err
}
ingress, egress, err := extractBandwidthResources(pod)
if err != nil {
return err
}
if egress != nil || ingress != nil {
if pod.Spec.HostNetwork {
kl.recorder.Event(pod, "host network not supported", "Bandwidth shaping is not currently supported on the host network")
} else if kl.shaper != nil {
status, found := kl.statusManager.GetPodStatus(pod.UID)
if !found {
statusPtr, err := kl.containerRuntime.GetPodStatus(pod)
if err != nil {
glog.Errorf("Error getting pod for bandwidth shaping")
return err
}
status = *statusPtr
}
if len(status.PodIP) > 0 {
err = kl.shaper.ReconcileCIDR(fmt.Sprintf("%s/32", status.PodIP), egress, ingress)
}
} else {
kl.recorder.Event(pod, "nil shaper", "Pod requests bandwidth shaping, but the shaper is undefined")
}
}
if isStaticPod(pod) {
if mirrorPod != nil && !kl.podManager.IsMirrorPodOf(mirrorPod, pod) {
// The mirror pod is semantically different from the static pod. Remove
@ -1391,6 +1420,48 @@ func (kl *Kubelet) cleanupOrphanedPodDirs(pods []*api.Pod, runningPods []*kubeco
return utilErrors.NewAggregate(errlist)
}
func (kl *Kubelet) cleanupBandwidthLimits(allPods []*api.Pod) error {
if kl.shaper == nil {
return nil
}
currentCIDRs, err := kl.shaper.GetCIDRs()
if err != nil {
return err
}
possibleCIDRs := util.StringSet{}
for ix := range allPods {
pod := allPods[ix]
ingress, egress, err := extractBandwidthResources(pod)
if err != nil {
return err
}
if ingress == nil && egress == nil {
glog.V(8).Infof("Not a bandwidth limited container...")
continue
}
status, found := kl.statusManager.GetPodStatus(pod.UID)
if !found {
statusPtr, err := kl.containerRuntime.GetPodStatus(pod)
if err != nil {
return err
}
status = *statusPtr
}
if status.Phase == api.PodRunning {
possibleCIDRs.Insert(fmt.Sprintf("%s/32", status.PodIP))
}
}
for _, cidr := range currentCIDRs {
if !possibleCIDRs.Has(cidr) {
glog.V(2).Infof("Removing CIDR: %s (%v)", cidr, possibleCIDRs)
if err := kl.shaper.Reset(cidr); err != nil {
return err
}
}
}
return nil
}
// Compares the map of current volumes to the map of desired volumes.
// If an active volume does not have a respective desired volume, clean it up.
func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, runningPods []*kubecontainer.Pod) error {
@ -1623,6 +1694,11 @@ func (kl *Kubelet) HandlePodCleanups() error {
glog.Errorf("Failed to cleanup terminated pods: %v", err)
}
// Clear out any old bandwith rules
if err = kl.cleanupBandwidthLimits(allPods); err != nil {
return err
}
kl.backOff.GC()
return err
}
@ -2071,7 +2147,11 @@ func (kl *Kubelet) reconcileCBR0(podCIDR string) error {
if err := ensureCbr0(cidr); err != nil {
return err
}
return nil
if kl.shaper == nil {
glog.V(5).Info("Shaper is nil, creating")
kl.shaper = bandwidth.NewTCShaper("cbr0")
}
return kl.shaper.ReconcileInterface()
}
// updateNodeStatus updates node status to master with retries.
@ -2642,3 +2722,38 @@ func (kl *Kubelet) ListenAndServeReadOnly(address net.IP, port uint) {
func (kl *Kubelet) GetRuntime() kubecontainer.Runtime {
return kl.containerRuntime
}
var minRsrc = resource.MustParse("1k")
var maxRsrc = resource.MustParse("1P")
func validateBandwidthIsReasonable(rsrc *resource.Quantity) error {
if rsrc.Value() < minRsrc.Value() {
return fmt.Errorf("resource is unreasonably small (< 1kbit)")
}
if rsrc.Value() > maxRsrc.Value() {
return fmt.Errorf("resoruce is unreasonably large (> 1Pbit)")
}
return nil
}
func extractBandwidthResources(pod *api.Pod) (ingress, egress *resource.Quantity, err error) {
str, found := pod.Annotations["kubernetes.io/ingress-bandwidth"]
if found {
if ingress, err = resource.ParseQuantity(str); err != nil {
return nil, nil, err
}
if err := validateBandwidthIsReasonable(ingress); err != nil {
return nil, nil, err
}
}
str, found = pod.Annotations["kubernetes.io/egress-bandwidth"]
if found {
if egress, err = resource.ParseQuantity(str); err != nil {
return nil, nil, err
}
if err := validateBandwidthIsReasonable(egress); err != nil {
return nil, nil, err
}
}
return ingress, egress, nil
}

View File

@ -48,6 +48,7 @@ import (
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/bandwidth"
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/pkg/volume"
_ "k8s.io/kubernetes/pkg/volume/host_path"
@ -3387,3 +3388,247 @@ func TestDoesNotDeletePodDirsIfContainerIsRunning(t *testing.T) {
testKubelet.fakeRuntime.PodList = []*kubecontainer.Pod{}
syncAndVerifyPodDir(t, testKubelet, pods, []*api.Pod{apiPod}, false)
}
func TestCleanupBandwidthLimits(t *testing.T) {
tests := []struct {
status *api.PodStatus
pods []*api.Pod
inputCIDRs []string
expectResetCIDRs []string
cacheStatus bool
expectedCalls []string
name string
}{
{
status: &api.PodStatus{
PodIP: "1.2.3.4",
Phase: api.PodRunning,
},
pods: []*api.Pod{
{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Annotations: map[string]string{
"kubernetes.io/ingress-bandwidth": "10M",
},
},
},
{
ObjectMeta: api.ObjectMeta{
Name: "bar",
},
},
},
inputCIDRs: []string{"1.2.3.4/32", "2.3.4.5/32", "5.6.7.8/32"},
expectResetCIDRs: []string{"2.3.4.5/32", "5.6.7.8/32"},
expectedCalls: []string{"GetPodStatus"},
name: "pod running",
},
{
status: &api.PodStatus{
PodIP: "1.2.3.4",
Phase: api.PodRunning,
},
pods: []*api.Pod{
{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Annotations: map[string]string{
"kubernetes.io/ingress-bandwidth": "10M",
},
},
},
{
ObjectMeta: api.ObjectMeta{
Name: "bar",
},
},
},
inputCIDRs: []string{"1.2.3.4/32", "2.3.4.5/32", "5.6.7.8/32"},
expectResetCIDRs: []string{"2.3.4.5/32", "5.6.7.8/32"},
expectedCalls: []string{},
cacheStatus: true,
name: "pod running with cache",
},
{
status: &api.PodStatus{
PodIP: "1.2.3.4",
Phase: api.PodFailed,
},
pods: []*api.Pod{
{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Annotations: map[string]string{
"kubernetes.io/ingress-bandwidth": "10M",
},
},
},
{
ObjectMeta: api.ObjectMeta{
Name: "bar",
},
},
},
inputCIDRs: []string{"1.2.3.4/32", "2.3.4.5/32", "5.6.7.8/32"},
expectResetCIDRs: []string{"1.2.3.4/32", "2.3.4.5/32", "5.6.7.8/32"},
expectedCalls: []string{"GetPodStatus"},
name: "pod not running",
},
{
status: &api.PodStatus{
PodIP: "1.2.3.4",
Phase: api.PodFailed,
},
pods: []*api.Pod{
{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Annotations: map[string]string{
"kubernetes.io/ingress-bandwidth": "10M",
},
},
},
{
ObjectMeta: api.ObjectMeta{
Name: "bar",
},
},
},
inputCIDRs: []string{"1.2.3.4/32", "2.3.4.5/32", "5.6.7.8/32"},
expectResetCIDRs: []string{"1.2.3.4/32", "2.3.4.5/32", "5.6.7.8/32"},
expectedCalls: []string{},
cacheStatus: true,
name: "pod not running with cache",
},
{
status: &api.PodStatus{
PodIP: "1.2.3.4",
Phase: api.PodRunning,
},
pods: []*api.Pod{
{
ObjectMeta: api.ObjectMeta{
Name: "foo",
},
},
{
ObjectMeta: api.ObjectMeta{
Name: "bar",
},
},
},
inputCIDRs: []string{"1.2.3.4/32", "2.3.4.5/32", "5.6.7.8/32"},
expectResetCIDRs: []string{"1.2.3.4/32", "2.3.4.5/32", "5.6.7.8/32"},
name: "no bandwidth limits",
},
}
for _, test := range tests {
shaper := &bandwidth.FakeShaper{
CIDRs: test.inputCIDRs,
}
testKube := newTestKubelet(t)
testKube.kubelet.shaper = shaper
testKube.fakeRuntime.PodStatus = *test.status
if test.cacheStatus {
for _, pod := range test.pods {
testKube.kubelet.statusManager.SetPodStatus(pod, *test.status)
}
}
err := testKube.kubelet.cleanupBandwidthLimits(test.pods)
if err != nil {
t.Errorf("unexpected error: %v (%s)", test.name)
}
if !reflect.DeepEqual(shaper.ResetCIDRs, test.expectResetCIDRs) {
t.Errorf("[%s]\nexpected: %v, saw: %v", test.name, test.expectResetCIDRs, shaper.ResetCIDRs)
}
if test.cacheStatus {
if len(testKube.fakeRuntime.CalledFunctions) != 0 {
t.Errorf("unexpected function calls: %v", testKube.fakeRuntime.CalledFunctions)
}
} else if !reflect.DeepEqual(testKube.fakeRuntime.CalledFunctions, test.expectedCalls) {
t.Errorf("[%s], expected %v, saw %v", test.name, test.expectedCalls, testKube.fakeRuntime.CalledFunctions)
}
}
}
func TestExtractBandwidthResources(t *testing.T) {
four, _ := resource.ParseQuantity("4M")
ten, _ := resource.ParseQuantity("10M")
twenty, _ := resource.ParseQuantity("20M")
tests := []struct {
pod *api.Pod
expectedIngress *resource.Quantity
expectedEgress *resource.Quantity
expectError bool
}{
{
pod: &api.Pod{},
},
{
pod: &api.Pod{
ObjectMeta: api.ObjectMeta{
Annotations: map[string]string{
"kubernetes.io/ingress-bandwidth": "10M",
},
},
},
expectedIngress: ten,
},
{
pod: &api.Pod{
ObjectMeta: api.ObjectMeta{
Annotations: map[string]string{
"kubernetes.io/egress-bandwidth": "10M",
},
},
},
expectedEgress: ten,
},
{
pod: &api.Pod{
ObjectMeta: api.ObjectMeta{
Annotations: map[string]string{
"kubernetes.io/ingress-bandwidth": "4M",
"kubernetes.io/egress-bandwidth": "20M",
},
},
},
expectedIngress: four,
expectedEgress: twenty,
},
{
pod: &api.Pod{
ObjectMeta: api.ObjectMeta{
Annotations: map[string]string{
"kubernetes.io/ingress-bandwidth": "foo",
},
},
},
expectError: true,
},
}
for _, test := range tests {
ingress, egress, err := extractBandwidthResources(test.pod)
if test.expectError {
if err == nil {
t.Errorf("unexpected non-error")
}
continue
}
if err != nil {
t.Errorf("unexpected error: %v", err)
continue
}
if !reflect.DeepEqual(ingress, test.expectedIngress) {
t.Errorf("expected: %v, saw: %v", ingress, test.expectedIngress)
}
if !reflect.DeepEqual(egress, test.expectedEgress) {
t.Errorf("expected: %v, saw: %v", egress, test.expectedEgress)
}
}
}

View File

@ -0,0 +1,49 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package bandwidth
import (
"errors"
"k8s.io/kubernetes/pkg/api/resource"
)
type FakeShaper struct {
CIDRs []string
ResetCIDRs []string
}
func (f *FakeShaper) Limit(cidr string, egress, ingress *resource.Quantity) error {
return errors.New("unimplemented")
}
func (f *FakeShaper) Reset(cidr string) error {
f.ResetCIDRs = append(f.ResetCIDRs, cidr)
return nil
}
func (f *FakeShaper) ReconcileInterface() error {
return errors.New("unimplemented")
}
func (f *FakeShaper) ReconcileCIDR(cidr string, egress, ingress *resource.Quantity) error {
return errors.New("unimplemented")
}
func (f *FakeShaper) GetCIDRs() ([]string, error) {
return f.CIDRs, nil
}

View File

@ -26,7 +26,13 @@ type BandwidthShaper interface {
// 'ingress' bandwidth limit applies to all packets on the interface whose destination matches 'cidr'
// Limits are aggregate limits for the CIDR, not per IP address. CIDRs must be unique, but can be overlapping, traffic
// that matches multiple CIDRs counts against all limits.
Limit(cidr string, egress, ingress resource.Quantity) error
Limit(cidr string, egress, ingress *resource.Quantity) error
// Remove a bandwidth limit for a particular CIDR on a particular network interface
Reset(cidr string) error
// Reconcile the interface managed by this shaper with the state on the ground.
ReconcileInterface() error
// Reconcile a CIDR managed by this shaper with the state on the ground
ReconcileCIDR(cidr string, egress, ingress *resource.Quantity) error
// GetCIDRs returns the set of CIDRs that are being managed by this shaper
GetCIDRs() ([]string, error)
}

View File

@ -47,7 +47,6 @@ func NewTCShaper(iface string) BandwidthShaper {
e: exec.New(),
iface: iface,
}
shaper.initializeInterface()
return shaper
}
@ -105,15 +104,34 @@ func hexCIDR(cidr string) (string, error) {
return hexIP + "/" + hexMask, nil
}
func (t *tcShaper) findCIDRClass(cidr string) (class, handle string, err error) {
// Convert a CIDR from hex representation to text, opposite of the above.
func asciiCIDR(cidr string) (string, error) {
parts := strings.Split(cidr, "/")
if len(parts) != 2 {
return "", fmt.Errorf("unexpected CIDR format: %s", cidr)
}
ipData, err := hex.DecodeString(parts[0])
if err != nil {
return "", err
}
ip := net.IP(ipData)
maskData, err := hex.DecodeString(parts[1])
mask := net.IPMask(maskData)
size, _ := mask.Size()
return fmt.Sprintf("%s/%d", ip.String(), size), nil
}
func (t *tcShaper) findCIDRClass(cidr string) (class, handle string, found bool, err error) {
data, err := t.e.Command("tc", "filter", "show", "dev", t.iface).CombinedOutput()
if err != nil {
return "", "", err
return "", "", false, err
}
hex, err := hexCIDR(cidr)
if err != nil {
return "", "", err
return "", "", false, err
}
spec := fmt.Sprintf("match %s", hex)
@ -133,15 +151,15 @@ func (t *tcShaper) findCIDRClass(cidr string) (class, handle string, err error)
// expected tc line:
// filter parent 1: protocol ip pref 1 u32 fh 800::800 order 2048 key ht 800 bkt 0 flowid 1:1
if len(parts) != 19 {
return "", "", fmt.Errorf("unexpected output from tc: %s %d (%v)", filter, len(parts), parts)
return "", "", false, fmt.Errorf("unexpected output from tc: %s %d (%v)", filter, len(parts), parts)
}
return parts[18], parts[9], nil
return parts[18], parts[9], true, nil
}
}
return "", "", fmt.Errorf("Failed to find cidr: %s on interface: %s", cidr, t.iface)
return "", "", false, nil
}
func makeKBitString(rsrc resource.Quantity) string {
func makeKBitString(rsrc *resource.Quantity) string {
return fmt.Sprintf("%dkbit", (rsrc.Value() / 1000))
}
@ -150,27 +168,91 @@ func (t *tcShaper) makeNewClass(rate string) (int, error) {
if err != nil {
return -1, err
}
if err := t.execAndLog("tc", "class", "add", "dev", t.iface, "parent", "1:", "classid", fmt.Sprintf("1:%d", class), "htb", "rate", rate); err != nil {
if err := t.execAndLog("tc", "class", "add",
"dev", t.iface,
"parent", "1:",
"classid", fmt.Sprintf("1:%d", class),
"htb", "rate", rate); err != nil {
return -1, err
}
return class, nil
}
func (t *tcShaper) Limit(cidr string, upload, download resource.Quantity) (err error) {
func (t *tcShaper) Limit(cidr string, upload, download *resource.Quantity) (err error) {
var downloadClass, uploadClass int
if downloadClass, err = t.makeNewClass(makeKBitString(download)); err != nil {
return err
if download != nil {
if downloadClass, err = t.makeNewClass(makeKBitString(download)); err != nil {
return err
}
if err := t.execAndLog("tc", "filter", "add",
"dev", t.iface,
"protocol", "ip",
"parent", "1:0",
"prio", "1", "u32",
"match", "ip", "dst", cidr,
"flowid", fmt.Sprintf("1:%d", downloadClass)); err != nil {
return err
}
}
if uploadClass, err = t.makeNewClass(makeKBitString(upload)); err != nil {
return err
if upload != nil {
if uploadClass, err = t.makeNewClass(makeKBitString(upload)); err != nil {
return err
}
if err := t.execAndLog("tc", "filter", "add",
"dev", t.iface,
"protocol", "ip",
"parent", "1:0",
"prio", "1", "u32",
"match", "ip", "src", cidr,
"flowid", fmt.Sprintf("1:%d", uploadClass)); err != nil {
return err
}
}
return nil
}
if err := t.execAndLog("tc", "filter", "add", "dev", t.iface, "protocol", "ip", "parent", "1:0", "prio", "1", "u32", "match", "ip", "dst", cidr, "flowid", fmt.Sprintf("1:%d", downloadClass)); err != nil {
// tests to see if an interface exists, if it does, return true and the status line for the interface
// returns false, "", <err> if an error occurs.
func (t *tcShaper) interfaceExists() (bool, string, error) {
data, err := t.e.Command("tc", "qdisc", "show", "dev", t.iface).CombinedOutput()
if err != nil {
return false, "", err
}
value := strings.TrimSpace(string(data))
if len(value) == 0 {
return false, "", nil
}
return true, value, nil
}
func (t *tcShaper) ReconcileCIDR(cidr string, upload, download *resource.Quantity) error {
_, _, found, err := t.findCIDRClass(cidr)
if err != nil {
return err
}
if err := t.execAndLog("tc", "filter", "add", "dev", t.iface, "protocol", "ip", "parent", "1:0", "prio", "1", "u32", "match", "ip", "src", cidr, "flowid", fmt.Sprintf("1:%d", uploadClass)); err != nil {
if !found {
return t.Limit(cidr, upload, download)
}
// TODO: actually check bandwidth limits here
return nil
}
func (t *tcShaper) ReconcileInterface() error {
exists, output, err := t.interfaceExists()
if err != nil {
return err
}
if !exists {
glog.V(4).Info("Didn't find bandwidth interface, creating")
return t.initializeInterface()
}
fields := strings.Split(output, " ")
if len(fields) != 12 || fields[1] != "htb" || fields[2] != "1:" {
if err := t.deleteInterface(fields[2]); err != nil {
return err
}
return t.initializeInterface()
}
return nil
}
@ -179,12 +261,54 @@ func (t *tcShaper) initializeInterface() error {
}
func (t *tcShaper) Reset(cidr string) error {
class, handle, err := t.findCIDRClass(cidr)
class, handle, found, err := t.findCIDRClass(cidr)
if err != nil {
return err
}
if err := t.execAndLog("tc", "filter", "del", "dev", t.iface, "parent", "1:", "proto", "ip", "prio", "1", "handle", handle, "u32"); err != nil {
if !found {
return fmt.Errorf("Failed to find cidr: %s on interface: %s", cidr, t.iface)
}
if err := t.execAndLog("tc", "filter", "del",
"dev", t.iface,
"parent", "1:",
"proto", "ip",
"prio", "1",
"handle", handle, "u32"); err != nil {
return err
}
return t.execAndLog("tc", "class", "del", "dev", t.iface, "parent", "1:", "classid", class)
}
func (t *tcShaper) deleteInterface(class string) error {
return t.execAndLog("tc", "qdisc", "delete", "dev", t.iface, "root", "handle", class)
}
func (t *tcShaper) GetCIDRs() ([]string, error) {
data, err := t.e.Command("tc", "filter", "show", "dev", t.iface).CombinedOutput()
if err != nil {
return nil, err
}
result := []string{}
scanner := bufio.NewScanner(bytes.NewBuffer(data))
for scanner.Scan() {
line := strings.TrimSpace(scanner.Text())
if len(line) == 0 {
continue
}
if strings.Contains(line, "match") {
parts := strings.Split(line, " ")
// expected tc line:
// match <cidr> at <number>
if len(parts) != 4 {
return nil, fmt.Errorf("unexpected output: %v", parts)
}
cidr, err := asciiCIDR(parts[1])
if err != nil {
return nil, err
}
result = append(result, cidr)
}
}
return result, nil
}

View File

@ -18,6 +18,8 @@ package bandwidth
import (
"errors"
"reflect"
"strings"
"testing"
"k8s.io/kubernetes/pkg/api/resource"
@ -95,7 +97,7 @@ func TestHexCIDR(t *testing.T) {
expectErr bool
}{
{
input: "1.2.3.4/16",
input: "1.2.0.0/16",
output: "01020000/ffff0000",
},
{
@ -120,6 +122,13 @@ func TestHexCIDR(t *testing.T) {
if output != test.output {
t.Errorf("expected: %s, saw: %s", test.output, output)
}
input, err := asciiCIDR(output)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if input != test.input {
t.Errorf("expected: %s, saw: %s", test.input, input)
}
}
}
}
@ -137,6 +146,7 @@ func TestFindCIDRClass(t *testing.T) {
cidr string
output string
expectErr bool
expectNotFound bool
expectedClass string
expectedHandle string
err error
@ -153,6 +163,11 @@ func TestFindCIDRClass(t *testing.T) {
expectedClass: "1:2",
expectedHandle: "800::801",
},
{
cidr: "2.2.3.4/16",
output: tcFilterOutput,
expectNotFound: true,
},
{
err: errors.New("test error"),
expectErr: true,
@ -172,7 +187,7 @@ func TestFindCIDRClass(t *testing.T) {
},
}
shaper := &tcShaper{e: &fexec}
class, handle, err := shaper.findCIDRClass(test.cidr)
class, handle, found, err := shaper.findCIDRClass(test.cidr)
if test.expectErr {
if err == nil {
t.Errorf("unexpected non-error")
@ -181,28 +196,78 @@ func TestFindCIDRClass(t *testing.T) {
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if class != test.expectedClass {
t.Errorf("expected: %s, found %s", test.expectedClass, class)
}
if handle != test.expectedHandle {
t.Errorf("expected: %s, found %s", test.expectedHandle, handle)
if test.expectNotFound {
if found {
t.Errorf("unexpectedly found an interface: %s %s", class, handle)
}
} else {
if class != test.expectedClass {
t.Errorf("expected: %s, found %s", test.expectedClass, class)
}
if handle != test.expectedHandle {
t.Errorf("expected: %s, found %s", test.expectedHandle, handle)
}
}
}
}
}
func TestGetCIDRs(t *testing.T) {
fcmd := exec.FakeCmd{
CombinedOutputScript: []exec.FakeCombinedOutputAction{
func() ([]byte, error) { return []byte(tcFilterOutput), nil },
},
}
fexec := exec.FakeExec{
CommandScript: []exec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd {
return exec.InitFakeCmd(&fcmd, cmd, args...)
},
},
}
shaper := &tcShaper{e: &fexec}
cidrs, err := shaper.GetCIDRs()
if err != nil {
t.Errorf("unexpected error: %v", err)
}
expectedCidrs := []string{"172.17.0.2/32", "1.2.0.0/16"}
if !reflect.DeepEqual(cidrs, expectedCidrs) {
t.Errorf("expected: %v, saw: %v", expectedCidrs, cidrs)
}
}
func TestLimit(t *testing.T) {
tests := []struct {
cidr string
ingress *resource.Quantity
egress *resource.Quantity
expectErr bool
err error
cidr string
ingress *resource.Quantity
egress *resource.Quantity
expectErr bool
expectedCalls int
err error
}{
{
cidr: "1.2.3.4/32",
ingress: resource.NewQuantity(10, resource.DecimalSI),
egress: resource.NewQuantity(20, resource.DecimalSI),
cidr: "1.2.3.4/32",
ingress: resource.NewQuantity(10, resource.DecimalSI),
egress: resource.NewQuantity(20, resource.DecimalSI),
expectedCalls: 6,
},
{
cidr: "1.2.3.4/32",
ingress: resource.NewQuantity(10, resource.DecimalSI),
egress: nil,
expectedCalls: 3,
},
{
cidr: "1.2.3.4/32",
ingress: nil,
egress: resource.NewQuantity(20, resource.DecimalSI),
expectedCalls: 3,
},
{
cidr: "1.2.3.4/32",
ingress: nil,
egress: nil,
expectedCalls: 0,
},
{
err: errors.New("test error"),
@ -217,8 +282,8 @@ func TestLimit(t *testing.T) {
CombinedOutputScript: []exec.FakeCombinedOutputAction{
func() ([]byte, error) { return []byte(tcClassOutput), test.err },
func() ([]byte, error) { return []byte{}, test.err },
func() ([]byte, error) { return []byte(tcClassOutput2), test.err },
func() ([]byte, error) { return []byte{}, test.err },
func() ([]byte, error) { return []byte(tcClassOutput2), test.err },
func() ([]byte, error) { return []byte{}, test.err },
func() ([]byte, error) { return []byte{}, test.err },
},
@ -236,7 +301,7 @@ func TestLimit(t *testing.T) {
}
iface := "cbr0"
shaper := &tcShaper{e: &fexec, iface: iface}
if err := shaper.Limit(test.cidr, *test.ingress, *test.egress); err != nil && !test.expectErr {
if err := shaper.Limit(test.cidr, test.ingress, test.egress); err != nil && !test.expectErr {
t.Errorf("unexpected error: %v", err)
return
} else if err == nil && test.expectErr {
@ -251,8 +316,8 @@ func TestLimit(t *testing.T) {
return
}
if fcmd.CombinedOutputCalls != 6 {
t.Errorf("unexpected number of calls: %d, expected: 6", fcmd.CombinedOutputCalls)
if fcmd.CombinedOutputCalls != test.expectedCalls {
t.Errorf("unexpected number of calls: %d, expected: %d", fcmd.CombinedOutputCalls, test.expectedCalls)
}
for ix := range fcmd.CombinedOutputLog {
@ -264,22 +329,20 @@ func TestLimit(t *testing.T) {
t.Errorf("unexpected interface: %s, expected %s (%v)", output[4], iface, output)
}
if ix == 1 {
if output[11] != makeKBitString(*test.ingress) {
t.Errorf("unexpected ingress: %s, expected: %s", output[11], makeKBitString(*test.ingress))
var expectedRate string
if test.ingress != nil {
expectedRate = makeKBitString(test.ingress)
} else {
expectedRate = makeKBitString(test.egress)
}
if output[11] != expectedRate {
t.Errorf("unexpected ingress: %s, expected: %s", output[11], expectedRate)
}
if output[8] != "1:5" {
t.Errorf("unexpected class: %s, expected: %s", output[8], "1:5")
}
}
if ix == 3 {
if output[11] != makeKBitString(*test.egress) {
t.Errorf("unexpected egress: %s, expected: %s", output[11], makeKBitString(*test.egress))
}
if output[8] != "1:6" {
t.Errorf("unexpected class: %s, expected: %s", output[8], "1:6")
}
}
if ix == 4 {
if ix == 2 {
if output[15] != test.cidr {
t.Errorf("unexpected cidr: %s, expected: %s", output[15], test.cidr)
}
@ -287,6 +350,14 @@ func TestLimit(t *testing.T) {
t.Errorf("unexpected class: %s, expected: %s", output[17], "1:5")
}
}
if ix == 4 {
if output[11] != makeKBitString(test.egress) {
t.Errorf("unexpected egress: %s, expected: %s", output[11], makeKBitString(test.egress))
}
if output[8] != "1:6" {
t.Errorf("unexpected class: %s, expected: %s", output[8], "1:6")
}
}
if ix == 5 {
if output[15] != test.cidr {
t.Errorf("unexpected cidr: %s, expected: %s", output[15], test.cidr)
@ -378,3 +449,174 @@ func TestReset(t *testing.T) {
}
}
}
var tcQdisc = "qdisc htb 1: root refcnt 2 r2q 10 default 30 direct_packets_stat 0\n"
func TestReconcileInterfaceExists(t *testing.T) {
fcmd := exec.FakeCmd{
CombinedOutputScript: []exec.FakeCombinedOutputAction{
func() ([]byte, error) { return []byte(tcQdisc), nil },
},
}
fexec := exec.FakeExec{
CommandScript: []exec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
iface := "cbr0"
shaper := &tcShaper{e: &fexec, iface: iface}
err := shaper.ReconcileInterface()
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if fcmd.CombinedOutputCalls != 1 {
t.Errorf("unexpected number of calls: %d", fcmd.CombinedOutputCalls)
}
output := fcmd.CombinedOutputLog[0]
if len(output) != 5 {
t.Errorf("unexpected command: %v", output)
}
if output[0] != "tc" {
t.Errorf("unexpected command: %s", output[0])
}
if output[4] != iface {
t.Errorf("unexpected interface: %s, expected %s", output[4], iface)
}
if output[2] != "show" {
t.Errorf("unexpected action: %s", output[2])
}
}
func TestReconcileInterfaceDoesntExist(t *testing.T) {
fcmd := exec.FakeCmd{
CombinedOutputScript: []exec.FakeCombinedOutputAction{
func() ([]byte, error) { return []byte("\n"), nil },
func() ([]byte, error) { return []byte("\n"), nil },
},
}
fexec := exec.FakeExec{
CommandScript: []exec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
iface := "cbr0"
shaper := &tcShaper{e: &fexec, iface: iface}
err := shaper.ReconcileInterface()
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if fcmd.CombinedOutputCalls != 2 {
t.Errorf("unexpected number of calls: %d", fcmd.CombinedOutputCalls)
}
for ix, output := range fcmd.CombinedOutputLog {
if output[0] != "tc" {
t.Errorf("unexpected command: %s", output[0])
}
if output[4] != iface {
t.Errorf("unexpected interface: %s, expected %s", output[4], iface)
}
if ix == 0 {
if len(output) != 5 {
t.Errorf("unexpected command: %v", output)
}
if output[2] != "show" {
t.Errorf("unexpected action: %s", output[2])
}
}
if ix == 1 {
if len(output) != 11 {
t.Errorf("unexpected command: %v", output)
}
if output[2] != "add" {
t.Errorf("unexpected action: %s", output[2])
}
if output[7] != "1:" {
t.Errorf("unexpected root class: %s", output[7])
}
if output[8] != "htb" {
t.Errorf("unexpected qdisc algo: %s", output[8])
}
}
}
}
var tcQdiscWrong = []string{
"qdisc htb 2: root refcnt 2 r2q 10 default 30 direct_packets_stat 0\n",
"qdisc foo 1: root refcnt 2 r2q 10 default 30 direct_packets_stat 0\n",
}
func TestReconcileInterfaceIsWrong(t *testing.T) {
for _, test := range tcQdiscWrong {
fcmd := exec.FakeCmd{
CombinedOutputScript: []exec.FakeCombinedOutputAction{
func() ([]byte, error) { return []byte(test), nil },
func() ([]byte, error) { return []byte("\n"), nil },
func() ([]byte, error) { return []byte("\n"), nil },
},
}
fexec := exec.FakeExec{
CommandScript: []exec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
func(cmd string, args ...string) exec.Cmd { return exec.InitFakeCmd(&fcmd, cmd, args...) },
},
}
iface := "cbr0"
shaper := &tcShaper{e: &fexec, iface: iface}
err := shaper.ReconcileInterface()
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if fcmd.CombinedOutputCalls != 3 {
t.Errorf("unexpected number of calls: %d", fcmd.CombinedOutputCalls)
}
for ix, output := range fcmd.CombinedOutputLog {
if output[0] != "tc" {
t.Errorf("unexpected command: %s", output[0])
}
if output[4] != iface {
t.Errorf("unexpected interface: %s, expected %s", output[4], iface)
}
if ix == 0 {
if len(output) != 5 {
t.Errorf("unexpected command: %v", output)
}
if output[2] != "show" {
t.Errorf("unexpected action: %s", output[2])
}
}
if ix == 1 {
if len(output) != 8 {
t.Errorf("unexpected command: %v", output)
}
if output[2] != "delete" {
t.Errorf("unexpected action: %s", output[2])
}
if output[7] != strings.Split(test, " ")[2] {
t.Errorf("unexpected class: %s, expected: %s", output[7], strings.Split(test, " ")[2])
}
}
if ix == 2 {
if len(output) != 11 {
t.Errorf("unexpected command: %v", output)
}
if output[7] != "1:" {
t.Errorf("unexpected root class: %s", output[7])
}
if output[8] != "htb" {
t.Errorf("unexpected qdisc algo: %s", output[8])
}
}
}
}
}