Merge pull request #41588 from freehan/cri-traffic-shaping

Automatic merge from submit-queue (batch tested with PRs 41205, 42196, 42068, 41588, 41271)

[CRI] enable kubenet traffic shaping

ref: https://github.com/kubernetes/kubernetes/issues/37316

Another way to do this is to expose another interface in network host to allow network plugins to retrieve annotation. But that seems unnecessary and more complicated.
This commit is contained in:
Kubernetes Submit Queue 2017-02-27 21:09:52 -08:00 committed by GitHub
commit 9f9f570984
11 changed files with 35 additions and 32 deletions

View File

@ -103,7 +103,7 @@ func (ds *dockerService) RunPodSandbox(config *runtimeapi.PodSandboxConfig) (str
// on the host as well, to satisfy parts of the pod spec that aren't // on the host as well, to satisfy parts of the pod spec that aren't
// recognized by the CNI standard yet. // recognized by the CNI standard yet.
cID := kubecontainer.BuildContainerID(runtimeName, createResp.ID) cID := kubecontainer.BuildContainerID(runtimeName, createResp.ID)
err = ds.network.SetUpPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID) err = ds.network.SetUpPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID, config.Annotations)
// TODO: Do we need to teardown on failure or can we rely on a StopPodSandbox call with the given ID? // TODO: Do we need to teardown on failure or can we rely on a StopPodSandbox call with the given ID?
return createResp.ID, err return createResp.ID, err
} }

View File

@ -2249,7 +2249,7 @@ func (dm *DockerManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStatus *kubecon
setupNetworkResult := kubecontainer.NewSyncResult(kubecontainer.SetupNetwork, kubecontainer.GetPodFullName(pod)) setupNetworkResult := kubecontainer.NewSyncResult(kubecontainer.SetupNetwork, kubecontainer.GetPodFullName(pod))
result.AddSyncResult(setupNetworkResult) result.AddSyncResult(setupNetworkResult)
if !kubecontainer.IsHostNetworkPod(pod) { if !kubecontainer.IsHostNetworkPod(pod) {
if err := dm.network.SetUpPod(pod.Namespace, pod.Name, podInfraContainerID.ContainerID()); err != nil { if err := dm.network.SetUpPod(pod.Namespace, pod.Name, podInfraContainerID.ContainerID(), pod.Annotations); err != nil {
setupNetworkResult.Fail(kubecontainer.ErrSetupNetwork, err.Error()) setupNetworkResult.Fail(kubecontainer.ErrSetupNetwork, err.Error())
glog.Error(err) glog.Error(err)

View File

@ -189,7 +189,7 @@ func (plugin *cniNetworkPlugin) Name() string {
return CNIPluginName return CNIPluginName
} }
func (plugin *cniNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID) error { func (plugin *cniNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID, annotations map[string]string) error {
if err := plugin.checkInitialized(); err != nil { if err := plugin.checkInitialized(); err != nil {
return err return err
} }

View File

@ -215,7 +215,7 @@ func TestCNIPlugin(t *testing.T) {
} }
// Set up the pod // Set up the pod
err = plug.SetUpPod("podNamespace", "podName", containerID) err = plug.SetUpPod("podNamespace", "podName", containerID, map[string]string{})
if err != nil { if err != nil {
t.Errorf("Expected nil: %v", err) t.Errorf("Expected nil: %v", err)
} }

View File

@ -307,7 +307,7 @@ func (plugin *kubenetNetworkPlugin) Capabilities() utilsets.Int {
// setup sets up networking through CNI using the given ns/name and sandbox ID. // setup sets up networking through CNI using the given ns/name and sandbox ID.
// TODO: Don't pass the pod to this method, it only needs it for bandwidth // TODO: Don't pass the pod to this method, it only needs it for bandwidth
// shaping and hostport management. // shaping and hostport management.
func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kubecontainer.ContainerID, pod *v1.Pod) error { func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kubecontainer.ContainerID, pod *v1.Pod, annotations map[string]string) error {
// Bring up container loopback interface // Bring up container loopback interface
if _, err := plugin.addContainerToNetwork(plugin.loConfig, "lo", namespace, name, id); err != nil { if _, err := plugin.addContainerToNetwork(plugin.loConfig, "lo", namespace, name, id); err != nil {
return err return err
@ -359,14 +359,10 @@ func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kube
plugin.podIPs[id] = ip4.String() plugin.podIPs[id] = ip4.String()
// The host can choose to not support "legacy" features. The remote // The first SetUpPod call creates the bridge; get a shaper for the sake of initialization
// shim doesn't support it (#35457), but the kubelet does. // TODO: replace with CNI traffic shaper plugin
if plugin.host.SupportsLegacyFeatures() {
// The first SetUpPod call creates the bridge; get a shaper for the sake of
// initialization
shaper := plugin.shaper() shaper := plugin.shaper()
ingress, egress, err := bandwidth.ExtractPodBandwidthResources(annotations)
ingress, egress, err := bandwidth.ExtractPodBandwidthResources(pod.Annotations)
if err != nil { if err != nil {
return fmt.Errorf("Error reading pod bandwidth annotations: %v", err) return fmt.Errorf("Error reading pod bandwidth annotations: %v", err)
} }
@ -376,6 +372,9 @@ func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kube
} }
} }
// The host can choose to not support "legacy" features. The remote
// shim doesn't support it (#35457), but the kubelet does.
if plugin.host.SupportsLegacyFeatures() {
// Open any hostport the pod's containers want // Open any hostport the pod's containers want
activePodPortMappings, err := plugin.getPodPortMappings() activePodPortMappings, err := plugin.getPodPortMappings()
if err != nil { if err != nil {
@ -387,6 +386,7 @@ func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kube
return err return err
} }
} else { } else {
// TODO: replace with CNI port-forwarding plugin
portMappings, err := plugin.host.GetPodPortMappings(id.ID) portMappings, err := plugin.host.GetPodPortMappings(id.ID)
if err != nil { if err != nil {
return err return err
@ -406,7 +406,7 @@ func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kube
return nil return nil
} }
func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID) error { func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID, annotations map[string]string) error {
plugin.mu.Lock() plugin.mu.Lock()
defer plugin.mu.Unlock() defer plugin.mu.Unlock()
@ -425,7 +425,7 @@ func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id k
return fmt.Errorf("Kubenet cannot SetUpPod: %v", err) return fmt.Errorf("Kubenet cannot SetUpPod: %v", err)
} }
if err := plugin.setup(namespace, name, id, pod); err != nil { if err := plugin.setup(namespace, name, id, pod, annotations); err != nil {
// Make sure everything gets cleaned up on errors // Make sure everything gets cleaned up on errors
podIP, _ := plugin.podIPs[id] podIP, _ := plugin.podIPs[id]
if err := plugin.teardown(namespace, name, id, podIP); err != nil { if err := plugin.teardown(namespace, name, id, podIP); err != nil {

View File

@ -42,7 +42,7 @@ func (plugin *kubenetNetworkPlugin) Name() string {
return "kubenet" return "kubenet"
} }
func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID) error { func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID, annotations map[string]string) error {
return fmt.Errorf("Kubenet is not supported in this build") return fmt.Errorf("Kubenet is not supported in this build")
} }

View File

@ -70,7 +70,7 @@ type NetworkPlugin interface {
// the pod has been created but before the other containers of the // the pod has been created but before the other containers of the
// pod are launched. // pod are launched.
// TODO: rename podInfraContainerID to sandboxID // TODO: rename podInfraContainerID to sandboxID
SetUpPod(namespace string, name string, podInfraContainerID kubecontainer.ContainerID) error SetUpPod(namespace string, name string, podInfraContainerID kubecontainer.ContainerID, annotations map[string]string) error
// TearDownPod is the method called before a pod's infra container will be deleted // TearDownPod is the method called before a pod's infra container will be deleted
// TODO: rename podInfraContainerID to sandboxID // TODO: rename podInfraContainerID to sandboxID
@ -235,7 +235,7 @@ func (plugin *NoopNetworkPlugin) Capabilities() utilsets.Int {
return utilsets.NewInt() return utilsets.NewInt()
} }
func (plugin *NoopNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID) error { func (plugin *NoopNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID, annotations map[string]string) error {
return nil return nil
} }
@ -389,13 +389,13 @@ func (pm *PluginManager) GetPodNetworkStatus(podNamespace, podName string, id ku
return netStatus, nil return netStatus, nil
} }
func (pm *PluginManager) SetUpPod(podNamespace, podName string, id kubecontainer.ContainerID) error { func (pm *PluginManager) SetUpPod(podNamespace, podName string, id kubecontainer.ContainerID, annotations map[string]string) error {
fullPodName := kubecontainer.BuildPodFullName(podName, podNamespace) fullPodName := kubecontainer.BuildPodFullName(podName, podNamespace)
pm.podLock(fullPodName).Lock() pm.podLock(fullPodName).Lock()
defer pm.podUnlock(fullPodName) defer pm.podUnlock(fullPodName)
glog.V(3).Infof("Calling network plugin %s to set up pod %q", pm.plugin.Name(), fullPodName) glog.V(3).Infof("Calling network plugin %s to set up pod %q", pm.plugin.Name(), fullPodName)
if err := pm.plugin.SetUpPod(podNamespace, podName, id); err != nil { if err := pm.plugin.SetUpPod(podNamespace, podName, id, annotations); err != nil {
return fmt.Errorf("NetworkPlugin %s failed to set up pod %q network: %v", pm.plugin.Name(), fullPodName, err) return fmt.Errorf("NetworkPlugin %s failed to set up pod %q network: %v", pm.plugin.Name(), fullPodName, err)
} }

View File

@ -102,7 +102,7 @@ func (_mr *_MockNetworkPluginRecorder) Name() *gomock.Call {
return _mr.mock.ctrl.RecordCall(_mr.mock, "Name") return _mr.mock.ctrl.RecordCall(_mr.mock, "Name")
} }
func (_m *MockNetworkPlugin) SetUpPod(_param0 string, _param1 string, _param2 container.ContainerID) error { func (_m *MockNetworkPlugin) SetUpPod(_param0 string, _param1 string, _param2 container.ContainerID, annotations map[string]string) error {
ret := _m.ctrl.Call(_m, "SetUpPod", _param0, _param1, _param2) ret := _m.ctrl.Call(_m, "SetUpPod", _param0, _param1, _param2)
ret0, _ := ret[0].(error) ret0, _ := ret[0].(error)
return ret0 return ret0

View File

@ -77,7 +77,7 @@ func TestPluginManager(t *testing.T) {
// concurrently. // concurrently.
allCreatedWg.Wait() allCreatedWg.Wait()
if err := pm.SetUpPod("", name, id); err != nil { if err := pm.SetUpPod("", name, id, nil); err != nil {
t.Errorf("Failed to set up pod %q: %v", name, err) t.Errorf("Failed to set up pod %q: %v", name, err)
return return
} }
@ -128,7 +128,7 @@ func (p *hookableFakeNetworkPlugin) Capabilities() utilsets.Int {
return utilsets.NewInt() return utilsets.NewInt()
} }
func (p *hookableFakeNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID) error { func (p *hookableFakeNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID, annotations map[string]string) error {
if p.setupHook != nil { if p.setupHook != nil {
p.setupHook(namespace, name, id) p.setupHook(namespace, name, id)
} }
@ -179,7 +179,7 @@ func TestMultiPodParallelNetworkOps(t *testing.T) {
// Setup will block on the runner pod completing. If network // Setup will block on the runner pod completing. If network
// operations locking isn't correct (eg pod network operations // operations locking isn't correct (eg pod network operations
// block other pods) setUpPod() will never return. // block other pods) setUpPod() will never return.
if err := pm.SetUpPod("", podName, containerID); err != nil { if err := pm.SetUpPod("", podName, containerID, nil); err != nil {
t.Errorf("Failed to set up waiter pod: %v", err) t.Errorf("Failed to set up waiter pod: %v", err)
return return
} }
@ -199,7 +199,7 @@ func TestMultiPodParallelNetworkOps(t *testing.T) {
podName := "runner" podName := "runner"
containerID := kubecontainer.ContainerID{ID: podName} containerID := kubecontainer.ContainerID{ID: podName}
if err := pm.SetUpPod("", podName, containerID); err != nil { if err := pm.SetUpPod("", podName, containerID, nil); err != nil {
t.Errorf("Failed to set up runner pod: %v", err) t.Errorf("Failed to set up runner pod: %v", err)
return return
} }

View File

@ -1297,7 +1297,7 @@ func (r *Runtime) setupPodNetwork(pod *v1.Pod) (string, string, error) {
// Set up networking with the network plugin // Set up networking with the network plugin
containerID := kubecontainer.ContainerID{ID: string(pod.UID)} containerID := kubecontainer.ContainerID{ID: string(pod.UID)}
err = r.network.SetUpPod(pod.Namespace, pod.Name, containerID) err = r.network.SetUpPod(pod.Namespace, pod.Name, containerID, pod.Annotations)
if err != nil { if err != nil {
return "", "", err return "", "", err
} }

View File

@ -36,6 +36,9 @@ func validateBandwidthIsReasonable(rsrc *resource.Quantity) error {
} }
func ExtractPodBandwidthResources(podAnnotations map[string]string) (ingress, egress *resource.Quantity, err error) { func ExtractPodBandwidthResources(podAnnotations map[string]string) (ingress, egress *resource.Quantity, err error) {
if podAnnotations == nil {
return nil, nil, nil
}
str, found := podAnnotations["kubernetes.io/ingress-bandwidth"] str, found := podAnnotations["kubernetes.io/ingress-bandwidth"]
if found { if found {
ingressValue, err := resource.ParseQuantity(str) ingressValue, err := resource.ParseQuantity(str)