Merge pull request #23740 from dcbw/kubenet-shaper

Automatic merge from submit-queue

kubenet: hook pod bandwidth resources up to shaper

@bprashanth @thockin Last bit for shaping.
This commit is contained in:
k8s-merge-robot 2016-04-25 22:15:42 -07:00
commit a586177360
4 changed files with 81 additions and 39 deletions

View File

@ -1811,7 +1811,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, podStatus *kubecont
if !kl.shapingEnabled() {
return nil
}
ingress, egress, err := extractBandwidthResources(pod)
ingress, egress, err := bandwidth.ExtractPodBandwidthResources(pod.Annotations)
if err != nil {
return err
}
@ -1937,7 +1937,7 @@ func (kl *Kubelet) cleanupBandwidthLimits(allPods []*api.Pod) error {
possibleCIDRs := sets.String{}
for ix := range allPods {
pod := allPods[ix]
ingress, egress, err := extractBandwidthResources(pod)
ingress, egress, err := bandwidth.ExtractPodBandwidthResources(pod.Annotations)
if err != nil {
return err
}
@ -3619,38 +3619,3 @@ func (kl *Kubelet) shapingEnabled() bool {
func (kl *Kubelet) GetNodeConfig() cm.NodeConfig {
return kl.containerManager.GetNodeConfig()
}
var minRsrc = resource.MustParse("1k")
var maxRsrc = resource.MustParse("1P")
func validateBandwidthIsReasonable(rsrc *resource.Quantity) error {
if rsrc.Value() < minRsrc.Value() {
return fmt.Errorf("resource is unreasonably small (< 1kbit)")
}
if rsrc.Value() > maxRsrc.Value() {
return fmt.Errorf("resoruce is unreasonably large (> 1Pbit)")
}
return nil
}
func extractBandwidthResources(pod *api.Pod) (ingress, egress *resource.Quantity, err error) {
str, found := pod.Annotations["kubernetes.io/ingress-bandwidth"]
if found {
if ingress, err = resource.ParseQuantity(str); err != nil {
return nil, nil, err
}
if err := validateBandwidthIsReasonable(ingress); err != nil {
return nil, nil, err
}
}
str, found = pod.Annotations["kubernetes.io/egress-bandwidth"]
if found {
if egress, err = resource.ParseQuantity(str); err != nil {
return nil, nil, err
}
if err := validateBandwidthIsReasonable(egress); err != nil {
return nil, nil, err
}
}
return ingress, egress, nil
}

View File

@ -4395,7 +4395,7 @@ func TestExtractBandwidthResources(t *testing.T) {
},
}
for _, test := range tests {
ingress, egress, err := extractBandwidthResources(test.pod)
ingress, egress, err := bandwidth.ExtractPodBandwidthResources(test.pod.Annotations)
if test.expectError {
if err == nil {
t.Errorf("unexpected non-error")

View File

@ -33,6 +33,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/util/bandwidth"
utilexec "k8s.io/kubernetes/pkg/util/exec"
utilsets "k8s.io/kubernetes/pkg/util/sets"
utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"
)
@ -195,7 +196,20 @@ func (plugin *kubenetNetworkPlugin) Name() string {
return KubenetPluginName
}
func (plugin *kubenetNetworkPlugin) Capabilities() utilsets.Int {
return utilsets.NewInt(network.NET_PLUGIN_CAPABILITY_SHAPING)
}
func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID) error {
pod, ok := plugin.host.GetPodByName(namespace, name)
if !ok {
return fmt.Errorf("pod %q cannot be found", name)
}
ingress, egress, err := bandwidth.ExtractPodBandwidthResources(pod.Annotations)
if err != nil {
return fmt.Errorf("Error reading pod bandwidth annotations: %v", err)
}
// Can't set up pods if we don't have a PodCIDR yet
if plugin.netConfig == nil {
return fmt.Errorf("Kubenet needs a PodCIDR to set up pods")
@ -235,7 +249,12 @@ func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id k
plugin.shaper.ReconcileInterface()
}
// TODO: get ingress/egress from Pod.Spec and add pod CIDR to shaper
if egress != nil || ingress != nil {
ipAddr, _, _ := net.ParseCIDR(plugin.podCIDRs[id])
if err = plugin.shaper.ReconcileCIDR(fmt.Sprintf("%s/32", ipAddr.String()), egress, ingress); err != nil {
return fmt.Errorf("Failed to add pod to shaper: %v", err)
}
}
return nil
}

View File

@ -0,0 +1,58 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package bandwidth
import (
"fmt"
"k8s.io/kubernetes/pkg/api/resource"
)
var minRsrc = resource.MustParse("1k")
var maxRsrc = resource.MustParse("1P")
func validateBandwidthIsReasonable(rsrc *resource.Quantity) error {
if rsrc.Value() < minRsrc.Value() {
return fmt.Errorf("resource is unreasonably small (< 1kbit)")
}
if rsrc.Value() > maxRsrc.Value() {
return fmt.Errorf("resoruce is unreasonably large (> 1Pbit)")
}
return nil
}
func ExtractPodBandwidthResources(podAnnotations map[string]string) (ingress, egress *resource.Quantity, err error) {
str, found := podAnnotations["kubernetes.io/ingress-bandwidth"]
if found {
if ingress, err = resource.ParseQuantity(str); err != nil {
return nil, nil, err
}
if err := validateBandwidthIsReasonable(ingress); err != nil {
return nil, nil, err
}
}
str, found = podAnnotations["kubernetes.io/egress-bandwidth"]
if found {
if egress, err = resource.ParseQuantity(str); err != nil {
return nil, nil, err
}
if err := validateBandwidthIsReasonable(egress); err != nil {
return nil, nil, err
}
}
return ingress, egress, nil
}