diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index cc15d2f0aca..0afc400bc3d 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -1811,7 +1811,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, podStatus *kubecont if !kl.shapingEnabled() { return nil } - ingress, egress, err := extractBandwidthResources(pod) + ingress, egress, err := bandwidth.ExtractPodBandwidthResources(pod.Annotations) if err != nil { return err } @@ -1937,7 +1937,7 @@ func (kl *Kubelet) cleanupBandwidthLimits(allPods []*api.Pod) error { possibleCIDRs := sets.String{} for ix := range allPods { pod := allPods[ix] - ingress, egress, err := extractBandwidthResources(pod) + ingress, egress, err := bandwidth.ExtractPodBandwidthResources(pod.Annotations) if err != nil { return err } @@ -3619,38 +3619,3 @@ func (kl *Kubelet) shapingEnabled() bool { func (kl *Kubelet) GetNodeConfig() cm.NodeConfig { return kl.containerManager.GetNodeConfig() } - -var minRsrc = resource.MustParse("1k") -var maxRsrc = resource.MustParse("1P") - -func validateBandwidthIsReasonable(rsrc *resource.Quantity) error { - if rsrc.Value() < minRsrc.Value() { - return fmt.Errorf("resource is unreasonably small (< 1kbit)") - } - if rsrc.Value() > maxRsrc.Value() { - return fmt.Errorf("resoruce is unreasonably large (> 1Pbit)") - } - return nil -} - -func extractBandwidthResources(pod *api.Pod) (ingress, egress *resource.Quantity, err error) { - str, found := pod.Annotations["kubernetes.io/ingress-bandwidth"] - if found { - if ingress, err = resource.ParseQuantity(str); err != nil { - return nil, nil, err - } - if err := validateBandwidthIsReasonable(ingress); err != nil { - return nil, nil, err - } - } - str, found = pod.Annotations["kubernetes.io/egress-bandwidth"] - if found { - if egress, err = resource.ParseQuantity(str); err != nil { - return nil, nil, err - } - if err := validateBandwidthIsReasonable(egress); err != nil { - return nil, nil, err - } - } - return ingress, egress, nil -} diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 0ffb0557a02..de92dd462c9 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -4395,7 +4395,7 @@ func TestExtractBandwidthResources(t *testing.T) { }, } for _, test := range tests { - ingress, egress, err := extractBandwidthResources(test.pod) + ingress, egress, err := bandwidth.ExtractPodBandwidthResources(test.pod.Annotations) if test.expectError { if err == nil { t.Errorf("unexpected non-error") diff --git a/pkg/kubelet/network/kubenet/kubenet_linux.go b/pkg/kubelet/network/kubenet/kubenet_linux.go index 65a73667d2b..ef804911774 100644 --- a/pkg/kubelet/network/kubenet/kubenet_linux.go +++ b/pkg/kubelet/network/kubenet/kubenet_linux.go @@ -33,6 +33,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/network" "k8s.io/kubernetes/pkg/util/bandwidth" utilexec "k8s.io/kubernetes/pkg/util/exec" + utilsets "k8s.io/kubernetes/pkg/util/sets" utilsysctl "k8s.io/kubernetes/pkg/util/sysctl" ) @@ -195,7 +196,20 @@ func (plugin *kubenetNetworkPlugin) Name() string { return KubenetPluginName } +func (plugin *kubenetNetworkPlugin) Capabilities() utilsets.Int { + return utilsets.NewInt(network.NET_PLUGIN_CAPABILITY_SHAPING) +} + func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID) error { + pod, ok := plugin.host.GetPodByName(namespace, name) + if !ok { + return fmt.Errorf("pod %q cannot be found", name) + } + ingress, egress, err := bandwidth.ExtractPodBandwidthResources(pod.Annotations) + if err != nil { + return fmt.Errorf("Error reading pod bandwidth annotations: %v", err) + } + // Can't set up pods if we don't have a PodCIDR yet if plugin.netConfig == nil { return fmt.Errorf("Kubenet needs a PodCIDR to set up pods") @@ -235,7 +249,12 @@ func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id k plugin.shaper.ReconcileInterface() } - // TODO: get ingress/egress from Pod.Spec and add pod CIDR to shaper + if egress != nil || ingress != nil { + ipAddr, _, _ := net.ParseCIDR(plugin.podCIDRs[id]) + if err = plugin.shaper.ReconcileCIDR(fmt.Sprintf("%s/32", ipAddr.String()), egress, ingress); err != nil { + return fmt.Errorf("Failed to add pod to shaper: %v", err) + } + } return nil } diff --git a/pkg/util/bandwidth/utils.go b/pkg/util/bandwidth/utils.go new file mode 100644 index 00000000000..989096f9346 --- /dev/null +++ b/pkg/util/bandwidth/utils.go @@ -0,0 +1,58 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package bandwidth + +import ( + "fmt" + + "k8s.io/kubernetes/pkg/api/resource" +) + +var minRsrc = resource.MustParse("1k") +var maxRsrc = resource.MustParse("1P") + +func validateBandwidthIsReasonable(rsrc *resource.Quantity) error { + if rsrc.Value() < minRsrc.Value() { + return fmt.Errorf("resource is unreasonably small (< 1kbit)") + } + if rsrc.Value() > maxRsrc.Value() { + return fmt.Errorf("resoruce is unreasonably large (> 1Pbit)") + } + return nil +} + +func ExtractPodBandwidthResources(podAnnotations map[string]string) (ingress, egress *resource.Quantity, err error) { + str, found := podAnnotations["kubernetes.io/ingress-bandwidth"] + if found { + if ingress, err = resource.ParseQuantity(str); err != nil { + return nil, nil, err + } + if err := validateBandwidthIsReasonable(ingress); err != nil { + return nil, nil, err + } + } + str, found = podAnnotations["kubernetes.io/egress-bandwidth"] + if found { + if egress, err = resource.ParseQuantity(str); err != nil { + return nil, nil, err + } + if err := validateBandwidthIsReasonable(egress); err != nil { + return nil, nil, err + } + } + return ingress, egress, nil +}