From 5633d7423a5227d1830f0e663dfd20e7dc6cd4e8 Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Mon, 12 Dec 2016 18:44:03 -0600 Subject: [PATCH 1/7] kubelet: add network plugin manager with per-pod operation locking The PluginManager almost duplicates the network plugin interface, but not quite since the Init() function should be called by whatever actually finds and creates the network plugin instance. Only then does it get passed off to the PluginManager. The Manager synchronizes pod-specific network operations like setup, teardown, and pod network status. It passes through all other operations so that runtimes don't have to cache the network plugin directly, but can use the PluginManager as a wrapper. --- pkg/kubelet/network/BUILD | 13 -- pkg/kubelet/network/plugins.go | 121 +++++++++++ pkg/kubelet/network/plugins_test.go | 38 ---- pkg/kubelet/network/testing/BUILD | 15 ++ pkg/kubelet/network/testing/plugins_test.go | 218 ++++++++++++++++++++ 5 files changed, 354 insertions(+), 51 deletions(-) delete mode 100644 pkg/kubelet/network/plugins_test.go create mode 100644 pkg/kubelet/network/testing/plugins_test.go diff --git a/pkg/kubelet/network/BUILD b/pkg/kubelet/network/BUILD index 1e493f99dc4..36974c2601b 100644 --- a/pkg/kubelet/network/BUILD +++ b/pkg/kubelet/network/BUILD @@ -5,7 +5,6 @@ licenses(["notice"]) load( "@io_bazel_rules_go//go:def.bzl", "go_library", - "go_test", ) go_library( @@ -31,17 +30,6 @@ go_library( ], ) -go_test( - name = "go_default_test", - srcs = ["plugins_test.go"], - library = ":go_default_library", - tags = ["automanaged"], - deps = [ - "//pkg/apis/componentconfig:go_default_library", - "//pkg/kubelet/network/testing:go_default_library", - ], -) - filegroup( name = "package-srcs", srcs = glob(["**"]), @@ -57,7 +45,6 @@ filegroup( "//pkg/kubelet/network/hairpin:all-srcs", "//pkg/kubelet/network/hostport:all-srcs", "//pkg/kubelet/network/kubenet:all-srcs", - "//pkg/kubelet/network/mock_network:all-srcs", "//pkg/kubelet/network/testing:all-srcs", ], tags = ["automanaged"], diff --git a/pkg/kubelet/network/plugins.go b/pkg/kubelet/network/plugins.go index ed925562e1c..5b13ef86efb 100644 --- a/pkg/kubelet/network/plugins.go +++ b/pkg/kubelet/network/plugins.go @@ -20,6 +20,7 @@ import ( "fmt" "net" "strings" + "sync" "github.com/golang/glog" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -293,3 +294,123 @@ type NoopPortMappingGetter struct{} func (*NoopPortMappingGetter) GetPodPortMappings(containerID string) ([]*hostport.PortMapping, error) { return nil, nil } + +// The PluginManager wraps a kubelet network plugin and provides synchronization +// for a given pod's network operations. Each pod's setup/teardown/status operations +// are synchronized against each other, but network operations of other pods can +// proceed in parallel. 
+type PluginManager struct { + // Network plugin being wrapped + plugin NetworkPlugin + + // Pod list and lock + podsLock sync.Mutex + pods map[string]*podLock +} + +func NewPluginManager(plugin NetworkPlugin) *PluginManager { + return &PluginManager{ + plugin: plugin, + pods: make(map[string]*podLock), + } +} + +func (pm *PluginManager) PluginName() string { + return pm.plugin.Name() +} + +func (pm *PluginManager) Event(name string, details map[string]interface{}) { + pm.plugin.Event(name, details) +} + +func (pm *PluginManager) Status() error { + return pm.plugin.Status() +} + +type podLock struct { + // Count of in-flight operations for this pod; when this reaches zero + // the lock can be removed from the pod map + refcount uint + + // Lock to synchronize operations for this specific pod + mu sync.Mutex +} + +// Lock network operations for a specific pod. If that pod is not yet in +// the pod map, it will be added. The reference count for the pod will +// be increased. +func (pm *PluginManager) podLock(fullPodName string) *sync.Mutex { + pm.podsLock.Lock() + defer pm.podsLock.Unlock() + + lock, ok := pm.pods[fullPodName] + if !ok { + lock = &podLock{} + pm.pods[fullPodName] = lock + } + lock.refcount++ + return &lock.mu +} + +// Unlock network operations for a specific pod. The reference count for the +// pod will be decreased. If the reference count reaches zero, the pod will be +// removed from the pod map. +func (pm *PluginManager) podUnlock(fullPodName string) { + pm.podsLock.Lock() + defer pm.podsLock.Unlock() + + lock, ok := pm.pods[fullPodName] + if !ok { + glog.Warningf("Unbalanced pod lock unref for %s", fullPodName) + return + } else if lock.refcount == 0 { + // This should never ever happen, but handle it anyway + delete(pm.pods, fullPodName) + glog.Warningf("Pod lock for %s still in map with zero refcount", fullPodName) + return + } + lock.refcount-- + lock.mu.Unlock() + if lock.refcount == 0 { + delete(pm.pods, fullPodName) + } +} + +func (pm *PluginManager) GetPodNetworkStatus(podNamespace, podName string, id kubecontainer.ContainerID) (*PodNetworkStatus, error) { + fullPodName := kubecontainer.BuildPodFullName(podName, podNamespace) + pm.podLock(fullPodName).Lock() + defer pm.podUnlock(fullPodName) + + netStatus, err := pm.plugin.GetPodNetworkStatus(podNamespace, podName, id) + if err != nil { + return nil, fmt.Errorf("NetworkPlugin %s failed on the status hook for pod %q: %v", pm.plugin.Name(), fullPodName, err) + } + + return netStatus, nil +} + +func (pm *PluginManager) SetUpPod(podNamespace, podName string, id kubecontainer.ContainerID) error { + fullPodName := kubecontainer.BuildPodFullName(podName, podNamespace) + pm.podLock(fullPodName).Lock() + defer pm.podUnlock(fullPodName) + + glog.V(3).Infof("Calling network plugin %s to set up pod %q", pm.plugin.Name(), fullPodName) + if err := pm.plugin.SetUpPod(podNamespace, podName, id); err != nil { + return fmt.Errorf("NetworkPlugin %s failed to set up pod %q network: %v", pm.plugin.Name(), fullPodName, err) + } + + return nil +} + +func (pm *PluginManager) TearDownPod(podNamespace, podName string, id kubecontainer.ContainerID) error { + fullPodName := kubecontainer.BuildPodFullName(podName, podNamespace) + pm.podLock(fullPodName).Lock() + defer pm.podUnlock(fullPodName) + + glog.V(3).Infof("Calling network plugin %s to tear down pod %q", pm.plugin.Name(), fullPodName) + if err := pm.plugin.TearDownPod(podNamespace, podName, id); err != nil { + return fmt.Errorf("NetworkPlugin %s failed to teardown pod %q network: %v", 
pm.plugin.Name(), fullPodName, err) + } + + return nil +} diff --git a/pkg/kubelet/network/plugins_test.go b/pkg/kubelet/network/plugins_test.go deleted file mode 100644 index bbb6afe51fb..00000000000 --- a/pkg/kubelet/network/plugins_test.go +++ /dev/null @@ -1,38 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package network - -import ( - "testing" - - "k8s.io/kubernetes/pkg/apis/componentconfig" - nettest "k8s.io/kubernetes/pkg/kubelet/network/testing" -) - -func TestSelectDefaultPlugin(t *testing.T) { - all_plugins := []NetworkPlugin{} - plug, err := InitNetworkPlugin(all_plugins, "", nettest.NewFakeHost(nil), componentconfig.HairpinNone, "10.0.0.0/8", UseDefaultMTU) - if err != nil { - t.Fatalf("Unexpected error in selecting default plugin: %v", err) - } - if plug == nil { - t.Fatalf("Failed to select the default plugin.") - } - if plug.Name() != DefaultPluginName { - t.Errorf("Failed to select the default plugin. Expected %s. Got %s", DefaultPluginName, plug.Name()) - } -} diff --git a/pkg/kubelet/network/testing/BUILD b/pkg/kubelet/network/testing/BUILD index b4082c77ddf..5a61b6d0ace 100644 --- a/pkg/kubelet/network/testing/BUILD +++ b/pkg/kubelet/network/testing/BUILD @@ -5,6 +5,7 @@ licenses(["notice"]) load( "@io_bazel_rules_go//go:def.bzl", "go_library", + "go_test", ) go_library( @@ -20,6 +21,20 @@ go_library( ], ) +go_test( + name = "go_default_test", + srcs = ["plugins_test.go"], + library = ":go_default_library", + tags = ["automanaged"], + deps = [ + "//pkg/apis/componentconfig:go_default_library", + "//pkg/kubelet/container:go_default_library", + "//pkg/kubelet/network:go_default_library", + "//vendor:github.com/golang/mock/gomock", + "//vendor:k8s.io/apimachinery/pkg/util/sets", + ], +) + filegroup( name = "package-srcs", srcs = glob(["**"]), diff --git a/pkg/kubelet/network/testing/plugins_test.go b/pkg/kubelet/network/testing/plugins_test.go new file mode 100644 index 00000000000..96f46013ea4 --- /dev/null +++ b/pkg/kubelet/network/testing/plugins_test.go @@ -0,0 +1,218 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package testing + +import ( + "fmt" + "net" + "sync" + "testing" + + utilsets "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/kubernetes/pkg/apis/componentconfig" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/kubelet/network" + + "github.com/golang/mock/gomock" +) + +func TestSelectDefaultPlugin(t *testing.T) { + all_plugins := []network.NetworkPlugin{} + plug, err := network.InitNetworkPlugin(all_plugins, "", NewFakeHost(nil), componentconfig.HairpinNone, "10.0.0.0/8", network.UseDefaultMTU) + if err != nil { + t.Fatalf("Unexpected error in selecting default plugin: %v", err) + } + if plug == nil { + t.Fatalf("Failed to select the default plugin.") + } + if plug.Name() != network.DefaultPluginName { + t.Errorf("Failed to select the default plugin. Expected %s. Got %s", network.DefaultPluginName, plug.Name()) + } +} + +func TestPluginManager(t *testing.T) { + ctrl := gomock.NewController(t) + fnp := NewMockNetworkPlugin(ctrl) + defer fnp.Finish() + pm := network.NewPluginManager(fnp) + + fnp.EXPECT().Name().Return("someNetworkPlugin").AnyTimes() + + allCreatedWg := sync.WaitGroup{} + allCreatedWg.Add(1) + allDoneWg := sync.WaitGroup{} + + // 10 pods, 4 setup/status/teardown runs each. Ensure that network locking + // works and the pod map isn't concurrently accessed + for i := 0; i < 10; i++ { + podName := fmt.Sprintf("pod%d", i) + containerID := kubecontainer.ContainerID{ID: podName} + + fnp.EXPECT().SetUpPod("", podName, containerID).Return(nil).Times(4) + fnp.EXPECT().GetPodNetworkStatus("", podName, containerID).Return(&network.PodNetworkStatus{IP: net.ParseIP("1.2.3.4")}, nil).Times(4) + fnp.EXPECT().TearDownPod("", podName, containerID).Return(nil).Times(4) + + for x := 0; x < 4; x++ { + allDoneWg.Add(1) + go func(name string, id kubecontainer.ContainerID, num int) { + defer allDoneWg.Done() + + // Block all goroutines from running until all have + // been created and are ready. This ensures we + // have more pod network operations running + // concurrently. 
+ allCreatedWg.Wait() + + if err := pm.SetUpPod("", name, id); err != nil { + t.Errorf("Failed to set up pod %q: %v", name, err) + return + } + + if _, err := pm.GetPodNetworkStatus("", name, id); err != nil { + t.Errorf("Failed to inspect pod %q: %v", name, err) + return + } + + if err := pm.TearDownPod("", name, id); err != nil { + t.Errorf("Failed to tear down pod %q: %v", name, err) + return + } + }(podName, containerID, x) + } + } + // Block all goroutines from running until all have been created and started + allCreatedWg.Done() + + // Wait for them all to finish + allDoneWg.Wait() +} + +type hookableFakeNetworkPluginSetupHook func(namespace, name string, id kubecontainer.ContainerID) + +type hookableFakeNetworkPlugin struct { + setupHook hookableFakeNetworkPluginSetupHook +} + +func newHookableFakeNetworkPlugin(setupHook hookableFakeNetworkPluginSetupHook) *hookableFakeNetworkPlugin { + return &hookableFakeNetworkPlugin{ + setupHook: setupHook, + } +} + +func (p *hookableFakeNetworkPlugin) Init(host network.Host, hairpinMode componentconfig.HairpinMode, nonMasqueradeCIDR string, mtu int) error { + return nil +} + +func (p *hookableFakeNetworkPlugin) Event(name string, details map[string]interface{}) { +} + +func (p *hookableFakeNetworkPlugin) Name() string { + return "fakeplugin" +} + +func (p *hookableFakeNetworkPlugin) Capabilities() utilsets.Int { + return utilsets.NewInt() +} + +func (p *hookableFakeNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID) error { + if p.setupHook != nil { + p.setupHook(namespace, name, id) + } + return nil +} + +func (p *hookableFakeNetworkPlugin) TearDownPod(string, string, kubecontainer.ContainerID) error { + return nil +} + +func (p *hookableFakeNetworkPlugin) GetPodNetworkStatus(string, string, kubecontainer.ContainerID) (*network.PodNetworkStatus, error) { + return &network.PodNetworkStatus{IP: net.ParseIP("10.1.2.3")}, nil +} + +func (p *hookableFakeNetworkPlugin) Status() error { + return nil +} + +// Ensure that one pod's network operations don't block another's. If the +// test is successful (eg, first pod doesn't block on second) the test +// will complete. If unsuccessful, it will hang and get killed. +func TestMultiPodParallelNetworkOps(t *testing.T) { + podWg := sync.WaitGroup{} + podWg.Add(1) + + // Can't do this with MockNetworkPlugin because the gomock controller + // has its own locks which don't allow the parallel network operation + // to proceed. + didWait := false + fakePlugin := newHookableFakeNetworkPlugin(func(podNamespace, podName string, id kubecontainer.ContainerID) { + if podName == "waiter" { + podWg.Wait() + didWait = true + } + }) + pm := network.NewPluginManager(fakePlugin) + + opsWg := sync.WaitGroup{} + + // Start the pod that will wait for the other to complete + opsWg.Add(1) + go func() { + defer opsWg.Done() + + podName := "waiter" + containerID := kubecontainer.ContainerID{ID: podName} + + // Setup will block on the runner pod completing. If network + // operations locking isn't correct (eg pod network operations + // block other pods) setUpPod() will never return. 
+ if err := pm.SetUpPod("", podName, containerID); err != nil { + t.Errorf("Failed to set up waiter pod: %v", err) + return + } + + if err := pm.TearDownPod("", podName, containerID); err != nil { + t.Errorf("Failed to tear down waiter pod: %v", err) + return + } + }() + + opsWg.Add(1) + go func() { + defer opsWg.Done() + // Let other pod proceed + defer podWg.Done() + + podName := "runner" + containerID := kubecontainer.ContainerID{ID: podName} + + if err := pm.SetUpPod("", podName, containerID); err != nil { + t.Errorf("Failed to set up runner pod: %v", err) + return + } + + if err := pm.TearDownPod("", podName, containerID); err != nil { + t.Errorf("Failed to tear down runner pod: %v", err) + return + } + }() + + opsWg.Wait() + + if !didWait { + t.Errorf("waiter pod didn't wait for runner pod!") + } +} From 60525801c1ba9573f15d5dbb41570e01db2ae9b6 Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Tue, 13 Dec 2016 13:54:08 -0600 Subject: [PATCH 2/7] kubelet/network: move mock network plugin to pkg/kubelet/network/testing --- pkg/kubelet/dockershim/BUILD | 2 +- pkg/kubelet/dockershim/docker_service_test.go | 6 ++-- pkg/kubelet/dockertools/BUILD | 1 - .../dockertools/docker_manager_linux_test.go | 5 +-- .../dockertools/docker_manager_test.go | 3 +- pkg/kubelet/network/mock_network/BUILD | 34 ------------------- pkg/kubelet/network/testing/BUILD | 9 ++++- .../mock_network_plugin.go} | 4 +-- pkg/kubelet/rkt/BUILD | 2 +- pkg/kubelet/rkt/rkt_test.go | 4 +-- 10 files changed, 21 insertions(+), 49 deletions(-) delete mode 100644 pkg/kubelet/network/mock_network/BUILD rename pkg/kubelet/network/{mock_network/network_plugins.go => testing/mock_network_plugin.go} (98%) diff --git a/pkg/kubelet/dockershim/BUILD b/pkg/kubelet/dockershim/BUILD index 5571efe085f..84d9298d55d 100644 --- a/pkg/kubelet/dockershim/BUILD +++ b/pkg/kubelet/dockershim/BUILD @@ -89,7 +89,7 @@ go_test( "//pkg/kubelet/dockertools:go_default_library", "//pkg/kubelet/dockertools/securitycontext:go_default_library", "//pkg/kubelet/network:go_default_library", - "//pkg/kubelet/network/mock_network:go_default_library", + "//pkg/kubelet/network/testing:go_default_library", "//pkg/kubelet/types:go_default_library", "//pkg/kubelet/util/cache:go_default_library", "//pkg/security/apparmor:go_default_library", diff --git a/pkg/kubelet/dockershim/docker_service_test.go b/pkg/kubelet/dockershim/docker_service_test.go index 3e6d8c8bd8f..8d57f0a2eb8 100644 --- a/pkg/kubelet/dockershim/docker_service_test.go +++ b/pkg/kubelet/dockershim/docker_service_test.go @@ -32,14 +32,14 @@ import ( containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" "k8s.io/kubernetes/pkg/kubelet/dockertools" "k8s.io/kubernetes/pkg/kubelet/network" - "k8s.io/kubernetes/pkg/kubelet/network/mock_network" + nettest "k8s.io/kubernetes/pkg/kubelet/network/testing" "k8s.io/kubernetes/pkg/kubelet/util/cache" ) // newTestNetworkPlugin returns a mock plugin that implements network.NetworkPlugin -func newTestNetworkPlugin(t *testing.T) *mock_network.MockNetworkPlugin { +func newTestNetworkPlugin(t *testing.T) *nettest.MockNetworkPlugin { ctrl := gomock.NewController(t) - return mock_network.NewMockNetworkPlugin(ctrl) + return nettest.NewMockNetworkPlugin(ctrl) } func newTestDockerService() (*dockerService, *dockertools.FakeDockerClient, *clock.FakeClock) { diff --git a/pkg/kubelet/dockertools/BUILD b/pkg/kubelet/dockertools/BUILD index 3bf5402ad5c..397edb53051 100644 --- a/pkg/kubelet/dockertools/BUILD +++ b/pkg/kubelet/dockertools/BUILD @@ -112,7 +112,6 @@ go_test( 
"//pkg/kubelet/events:go_default_library", "//pkg/kubelet/images:go_default_library", "//pkg/kubelet/network:go_default_library", - "//pkg/kubelet/network/mock_network:go_default_library", "//pkg/kubelet/network/testing:go_default_library", "//pkg/kubelet/prober/results:go_default_library", "//pkg/kubelet/types:go_default_library", diff --git a/pkg/kubelet/dockertools/docker_manager_linux_test.go b/pkg/kubelet/dockertools/docker_manager_linux_test.go index 0c61496281c..7f739f614a1 100644 --- a/pkg/kubelet/dockertools/docker_manager_linux_test.go +++ b/pkg/kubelet/dockertools/docker_manager_linux_test.go @@ -33,7 +33,7 @@ import ( kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/events" "k8s.io/kubernetes/pkg/kubelet/network" - "k8s.io/kubernetes/pkg/kubelet/network/mock_network" + nettest "k8s.io/kubernetes/pkg/kubelet/network/testing" "k8s.io/kubernetes/pkg/security/apparmor" utilstrings "k8s.io/kubernetes/pkg/util/strings" ) @@ -455,7 +455,8 @@ func TestGetPodStatusFromNetworkPlugin(t *testing.T) { for _, test := range cases { dm, fakeDocker := newTestDockerManager() ctrl := gomock.NewController(t) - fnp := mock_network.NewMockNetworkPlugin(ctrl) + defer ctrl.Finish() + fnp := nettest.NewMockNetworkPlugin(ctrl) dm.networkPlugin = fnp fakeDocker.SetFakeRunningContainers([]*FakeContainer{ diff --git a/pkg/kubelet/dockertools/docker_manager_test.go b/pkg/kubelet/dockertools/docker_manager_test.go index f3f484bf291..16a8d4ba45d 100644 --- a/pkg/kubelet/dockertools/docker_manager_test.go +++ b/pkg/kubelet/dockertools/docker_manager_test.go @@ -54,7 +54,6 @@ import ( containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" "k8s.io/kubernetes/pkg/kubelet/images" "k8s.io/kubernetes/pkg/kubelet/network" - "k8s.io/kubernetes/pkg/kubelet/network/mock_network" nettest "k8s.io/kubernetes/pkg/kubelet/network/testing" proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/kubelet/types" @@ -1878,7 +1877,7 @@ func TestSyncPodGetsPodIPFromNetworkPlugin(t *testing.T) { dm.podInfraContainerImage = "pod_infra_image" ctrl := gomock.NewController(t) defer ctrl.Finish() - fnp := mock_network.NewMockNetworkPlugin(ctrl) + fnp := nettest.NewMockNetworkPlugin(ctrl) dm.networkPlugin = fnp pod := makePod("foo", &v1.PodSpec{ diff --git a/pkg/kubelet/network/mock_network/BUILD b/pkg/kubelet/network/mock_network/BUILD deleted file mode 100644 index 8f06ba40be0..00000000000 --- a/pkg/kubelet/network/mock_network/BUILD +++ /dev/null @@ -1,34 +0,0 @@ -package(default_visibility = ["//visibility:public"]) - -licenses(["notice"]) - -load( - "@io_bazel_rules_go//go:def.bzl", - "go_library", -) - -go_library( - name = "go_default_library", - srcs = ["network_plugins.go"], - tags = ["automanaged"], - deps = [ - "//pkg/apis/componentconfig:go_default_library", - "//pkg/kubelet/container:go_default_library", - "//pkg/kubelet/network:go_default_library", - "//vendor:github.com/golang/mock/gomock", - "//vendor:k8s.io/apimachinery/pkg/util/sets", - ], -) - -filegroup( - name = "package-srcs", - srcs = glob(["**"]), - tags = ["automanaged"], - visibility = ["//visibility:private"], -) - -filegroup( - name = "all-srcs", - srcs = [":package-srcs"], - tags = ["automanaged"], -) diff --git a/pkg/kubelet/network/testing/BUILD b/pkg/kubelet/network/testing/BUILD index 5a61b6d0ace..30f594a6aee 100644 --- a/pkg/kubelet/network/testing/BUILD +++ b/pkg/kubelet/network/testing/BUILD @@ -10,14 +10,21 @@ load( go_library( name = "go_default_library", - srcs = 
["fake_host.go"], + srcs = [ + "fake_host.go", + "mock_network_plugin.go", + ], tags = ["automanaged"], deps = [ "//pkg/api/v1:go_default_library", + "//pkg/apis/componentconfig:go_default_library", "//pkg/client/clientset_generated/clientset:go_default_library", "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/container/testing:go_default_library", + "//pkg/kubelet/network:go_default_library", "//pkg/kubelet/network/hostport:go_default_library", + "//vendor:github.com/golang/mock/gomock", + "//vendor:k8s.io/apimachinery/pkg/util/sets", ], ) diff --git a/pkg/kubelet/network/mock_network/network_plugins.go b/pkg/kubelet/network/testing/mock_network_plugin.go similarity index 98% rename from pkg/kubelet/network/mock_network/network_plugins.go rename to pkg/kubelet/network/testing/mock_network_plugin.go index fd7259d4a86..3eaa3fefe59 100644 --- a/pkg/kubelet/network/mock_network/network_plugins.go +++ b/pkg/kubelet/network/testing/mock_network_plugin.go @@ -14,11 +14,11 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Generated code, generated via: `mockgen k8s.io/kubernetes/pkg/kubelet/network NetworkPlugin > $GOPATH/src/k8s.io/kubernetes/pkg/kubelet/network/mock_network/network_plugins.go` +// Generated code, generated via: `mockgen k8s.io/kubernetes/pkg/kubelet/network NetworkPlugin > $GOPATH/src/k8s.io/kubernetes/pkg/kubelet/network/testing/mock_network_plugin.go` // Edited by hand for boilerplate and gofmt. // TODO, this should be autogenerated/autoupdated by scripts. -package mock_network +package testing import ( gomock "github.com/golang/mock/gomock" diff --git a/pkg/kubelet/rkt/BUILD b/pkg/kubelet/rkt/BUILD index b2ce390c7af..679bbeb3467 100644 --- a/pkg/kubelet/rkt/BUILD +++ b/pkg/kubelet/rkt/BUILD @@ -76,7 +76,7 @@ go_test( "//pkg/kubelet/lifecycle:go_default_library", "//pkg/kubelet/network:go_default_library", "//pkg/kubelet/network/kubenet:go_default_library", - "//pkg/kubelet/network/mock_network:go_default_library", + "//pkg/kubelet/network/testing:go_default_library", "//pkg/kubelet/types:go_default_library", "//pkg/util/exec:go_default_library", "//vendor:github.com/appc/spec/schema", diff --git a/pkg/kubelet/rkt/rkt_test.go b/pkg/kubelet/rkt/rkt_test.go index e712aaba0bd..8902e2a671e 100644 --- a/pkg/kubelet/rkt/rkt_test.go +++ b/pkg/kubelet/rkt/rkt_test.go @@ -43,7 +43,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/network" "k8s.io/kubernetes/pkg/kubelet/network/kubenet" - "k8s.io/kubernetes/pkg/kubelet/network/mock_network" + nettest "k8s.io/kubernetes/pkg/kubelet/network/testing" "k8s.io/kubernetes/pkg/kubelet/types" utilexec "k8s.io/kubernetes/pkg/util/exec" ) @@ -583,7 +583,7 @@ func TestGetPodStatus(t *testing.T) { defer ctrl.Finish() fr := newFakeRktInterface() fs := newFakeSystemd() - fnp := mock_network.NewMockNetworkPlugin(ctrl) + fnp := nettest.NewMockNetworkPlugin(ctrl) fos := &containertesting.FakeOS{} frh := &fakeRuntimeHelper{} r := &Runtime{ From aafd5c9ef69acec18cc29149e61213f45f406a10 Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Tue, 13 Dec 2016 16:19:04 -0600 Subject: [PATCH 3/7] dockershim: use network PluginManager to synchronize pod network operations --- pkg/kubelet/dockershim/docker_sandbox.go | 12 ++++++------ pkg/kubelet/dockershim/docker_sandbox_test.go | 6 ++++-- pkg/kubelet/dockershim/docker_service.go | 10 +++++----- pkg/kubelet/dockershim/docker_service_test.go | 5 +++-- pkg/kubelet/dockershim/security_context.go | 17 
++++++++--------- 5 files changed, 26 insertions(+), 24 deletions(-) diff --git a/pkg/kubelet/dockershim/docker_sandbox.go b/pkg/kubelet/dockershim/docker_sandbox.go index 2a4ec8b7671..b1e0e1dfbb7 100644 --- a/pkg/kubelet/dockershim/docker_sandbox.go +++ b/pkg/kubelet/dockershim/docker_sandbox.go @@ -103,7 +103,7 @@ func (ds *dockerService) RunPodSandbox(config *runtimeapi.PodSandboxConfig) (str // on the host as well, to satisfy parts of the pod spec that aren't // recognized by the CNI standard yet. cID := kubecontainer.BuildContainerID(runtimeName, createResp.ID) - err = ds.networkPlugin.SetUpPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID) + err = ds.network.SetUpPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID) // TODO: Do we need to teardown on failure or can we rely on a StopPodSandbox call with the given ID? return createResp.ID, err } @@ -162,8 +162,8 @@ func (ds *dockerService) StopPodSandbox(podSandboxID string) error { errList := []error{} if needNetworkTearDown { cID := kubecontainer.BuildContainerID(runtimeName, podSandboxID) - if err := ds.networkPlugin.TearDownPod(namespace, name, cID); err != nil { - errList = append(errList, fmt.Errorf("failed to teardown sandbox %q for pod %s/%s: %v", podSandboxID, namespace, name, err)) + if err := ds.network.TearDownPod(namespace, name, cID); err != nil { + errList = append(errList, err) } } if err := ds.client.StopContainer(podSandboxID, defaultSandboxGracePeriod); err != nil { @@ -199,12 +199,12 @@ func (ds *dockerService) getIPFromPlugin(sandbox *dockertypes.ContainerJSON) (st } msg := fmt.Sprintf("Couldn't find network status for %s/%s through plugin", metadata.Namespace, metadata.Name) cID := kubecontainer.BuildContainerID(runtimeName, sandbox.ID) - networkStatus, err := ds.networkPlugin.GetPodNetworkStatus(metadata.Namespace, metadata.Name, cID) + networkStatus, err := ds.network.GetPodNetworkStatus(metadata.Namespace, metadata.Name, cID) if err != nil { // This might be a sandbox that somehow ended up without a default // interface (eth0). We can't distinguish this from a more serious // error, so callers should probably treat it as non-fatal. - return "", fmt.Errorf("%v: %v", msg, err) + return "", err } if networkStatus == nil { return "", fmt.Errorf("%v: invalid network status for", msg) @@ -408,7 +408,7 @@ func (ds *dockerService) applySandboxLinuxOptions(hc *dockercontainer.HostConfig } hc.CgroupParent = cgroupParent // Apply security context. 
- applySandboxSecurityContext(lc, createConfig.Config, hc, ds.networkPlugin, separator) + applySandboxSecurityContext(lc, createConfig.Config, hc, ds.network, separator) return nil } diff --git a/pkg/kubelet/dockershim/docker_sandbox_test.go b/pkg/kubelet/dockershim/docker_sandbox_test.go index 085a7b7e720..855dd7454fc 100644 --- a/pkg/kubelet/dockershim/docker_sandbox_test.go +++ b/pkg/kubelet/dockershim/docker_sandbox_test.go @@ -26,6 +26,7 @@ import ( runtimeapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/kubelet/network" "k8s.io/kubernetes/pkg/kubelet/types" ) @@ -146,7 +147,7 @@ func TestSandboxStatus(t *testing.T) { func TestNetworkPluginInvocation(t *testing.T) { ds, _, _ := newTestDockerService() mockPlugin := newTestNetworkPlugin(t) - ds.networkPlugin = mockPlugin + ds.network = network.NewPluginManager(mockPlugin) defer mockPlugin.Finish() name := "foo0" @@ -158,6 +159,7 @@ func TestNetworkPluginInvocation(t *testing.T) { ) cID := kubecontainer.ContainerID{Type: runtimeName, ID: fmt.Sprintf("/%v", makeSandboxName(c))} + mockPlugin.EXPECT().Name().Return("mockNetworkPlugin").AnyTimes() setup := mockPlugin.EXPECT().SetUpPod(ns, name, cID) // StopPodSandbox performs a lookup on status to figure out if the sandbox // is running with hostnetworking, as all its given is the ID. @@ -175,7 +177,7 @@ func TestNetworkPluginInvocation(t *testing.T) { func TestHostNetworkPluginInvocation(t *testing.T) { ds, _, _ := newTestDockerService() mockPlugin := newTestNetworkPlugin(t) - ds.networkPlugin = mockPlugin + ds.network = network.NewPluginManager(mockPlugin) defer mockPlugin.Finish() name := "foo0" diff --git a/pkg/kubelet/dockershim/docker_service.go b/pkg/kubelet/dockershim/docker_service.go index 1bb60131889..db7e8568eae 100644 --- a/pkg/kubelet/dockershim/docker_service.go +++ b/pkg/kubelet/dockershim/docker_service.go @@ -178,7 +178,7 @@ func NewDockerService(client dockertools.DockerInterface, seccompProfileRoot str if err != nil { return nil, fmt.Errorf("didn't find compatible CNI plugin with given settings %+v: %v", pluginSettings, err) } - ds.networkPlugin = plug + ds.network = network.NewPluginManager(plug) glog.Infof("Docker cri networking managed by %v", plug.Name()) // NOTE: cgroup driver is only detectable in docker 1.11+ @@ -224,7 +224,7 @@ type dockerService struct { podSandboxImage string streamingRuntime *streamingRuntime streamingServer streaming.Server - networkPlugin network.NetworkPlugin + network *network.PluginManager containerManager cm.ContainerManager // cgroup driver used by Docker runtime. 
cgroupDriver string @@ -270,10 +270,10 @@ func (ds *dockerService) UpdateRuntimeConfig(runtimeConfig *runtimeapi.RuntimeCo return } glog.Infof("docker cri received runtime config %+v", runtimeConfig) - if ds.networkPlugin != nil && runtimeConfig.NetworkConfig.PodCidr != "" { + if ds.network != nil && runtimeConfig.NetworkConfig.PodCidr != "" { event := make(map[string]interface{}) event[network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE_DETAIL_CIDR] = runtimeConfig.NetworkConfig.PodCidr - ds.networkPlugin.Event(network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE, event) + ds.network.Event(network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE, event) } return } @@ -339,7 +339,7 @@ func (ds *dockerService) Status() (*runtimeapi.RuntimeStatus, error) { runtimeReady.Reason = "DockerDaemonNotReady" runtimeReady.Message = fmt.Sprintf("docker: failed to get docker version: %v", err) } - if err := ds.networkPlugin.Status(); err != nil { + if err := ds.network.Status(); err != nil { networkReady.Status = false networkReady.Reason = "NetworkPluginNotReady" networkReady.Message = fmt.Sprintf("docker: network plugin is not ready: %v", err) diff --git a/pkg/kubelet/dockershim/docker_service_test.go b/pkg/kubelet/dockershim/docker_service_test.go index 8d57f0a2eb8..5bc92925d78 100644 --- a/pkg/kubelet/dockershim/docker_service_test.go +++ b/pkg/kubelet/dockershim/docker_service_test.go @@ -45,7 +45,8 @@ func newTestNetworkPlugin(t *testing.T) *nettest.MockNetworkPlugin { func newTestDockerService() (*dockerService, *dockertools.FakeDockerClient, *clock.FakeClock) { fakeClock := clock.NewFakeClock(time.Time{}) c := dockertools.NewFakeDockerClient().WithClock(fakeClock).WithVersion("1.11.2", "1.23") - return &dockerService{client: c, os: &containertest.FakeOS{}, networkPlugin: &network.NoopNetworkPlugin{}, + pm := network.NewPluginManager(&network.NoopNetworkPlugin{}) + return &dockerService{client: c, os: &containertest.FakeOS{}, network: pm, legacyCleanup: legacyCleanupFlag{done: 1}, checkpointHandler: NewTestPersistentCheckpointHandler()}, c, fakeClock } @@ -95,7 +96,7 @@ func TestStatus(t *testing.T) { // Should not report ready status is network plugin returns error. mockPlugin := newTestNetworkPlugin(t) - ds.networkPlugin = mockPlugin + ds.network = network.NewPluginManager(mockPlugin) defer mockPlugin.Finish() mockPlugin.EXPECT().Status().Return(errors.New("network error")) status, err = ds.Status() diff --git a/pkg/kubelet/dockershim/security_context.go b/pkg/kubelet/dockershim/security_context.go index 159a171a1b9..0b9f3392a5b 100644 --- a/pkg/kubelet/dockershim/security_context.go +++ b/pkg/kubelet/dockershim/security_context.go @@ -25,11 +25,11 @@ import ( "k8s.io/kubernetes/pkg/api/v1" runtimeapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" "k8s.io/kubernetes/pkg/kubelet/dockertools/securitycontext" - "k8s.io/kubernetes/pkg/kubelet/network" + knetwork "k8s.io/kubernetes/pkg/kubelet/network" ) // applySandboxSecurityContext updates docker sandbox options according to security context. 
-func applySandboxSecurityContext(lc *runtimeapi.LinuxPodSandboxConfig, config *dockercontainer.Config, hc *dockercontainer.HostConfig, networkPlugin network.NetworkPlugin, separator rune) { +func applySandboxSecurityContext(lc *runtimeapi.LinuxPodSandboxConfig, config *dockercontainer.Config, hc *dockercontainer.HostConfig, network *knetwork.PluginManager, separator rune) { if lc == nil { return } @@ -47,8 +47,7 @@ func applySandboxSecurityContext(lc *runtimeapi.LinuxPodSandboxConfig, config *d modifyContainerConfig(sc, config) modifyHostConfig(sc, hc, separator) - modifySandboxNamespaceOptions(sc.GetNamespaceOptions(), hc, networkPlugin) - + modifySandboxNamespaceOptions(sc.GetNamespaceOptions(), hc, network) } // applyContainerSecurityContext updates docker container options according to security context. @@ -109,9 +108,9 @@ func modifyHostConfig(sc *runtimeapi.LinuxContainerSecurityContext, hostConfig * } // modifySandboxNamespaceOptions apply namespace options for sandbox -func modifySandboxNamespaceOptions(nsOpts *runtimeapi.NamespaceOption, hostConfig *dockercontainer.HostConfig, networkPlugin network.NetworkPlugin) { +func modifySandboxNamespaceOptions(nsOpts *runtimeapi.NamespaceOption, hostConfig *dockercontainer.HostConfig, network *knetwork.PluginManager) { modifyCommonNamespaceOptions(nsOpts, hostConfig) - modifyHostNetworkOptionForSandbox(nsOpts.HostNetwork, networkPlugin, hostConfig) + modifyHostNetworkOptionForSandbox(nsOpts.HostNetwork, network, hostConfig) } // modifyContainerNamespaceOptions apply namespace options for container @@ -137,18 +136,18 @@ func modifyCommonNamespaceOptions(nsOpts *runtimeapi.NamespaceOption, hostConfig } // modifyHostNetworkOptionForSandbox applies NetworkMode/UTSMode to sandbox's dockercontainer.HostConfig. -func modifyHostNetworkOptionForSandbox(hostNetwork bool, networkPlugin network.NetworkPlugin, hc *dockercontainer.HostConfig) { +func modifyHostNetworkOptionForSandbox(hostNetwork bool, network *knetwork.PluginManager, hc *dockercontainer.HostConfig) { if hostNetwork { hc.NetworkMode = namespaceModeHost return } - if networkPlugin == nil { + if network == nil { hc.NetworkMode = "default" return } - switch networkPlugin.Name() { + switch network.PluginName() { case "cni": fallthrough case "kubenet": From 4c3cc67385aa543a5cf6263650b1040c91ec69b3 Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Tue, 13 Dec 2016 17:00:34 -0600 Subject: [PATCH 4/7] rkt: use network PluginManager to synchronize pod network operations --- pkg/kubelet/rkt/rkt.go | 34 ++++++++++++++++------------------ pkg/kubelet/rkt/rkt_test.go | 6 ++++-- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go index b06729cde52..9668c80be13 100644 --- a/pkg/kubelet/rkt/rkt.go +++ b/pkg/kubelet/rkt/rkt.go @@ -163,8 +163,8 @@ type Runtime struct { execer utilexec.Interface os kubecontainer.OSInterface - // Network plugin. - networkPlugin network.NetworkPlugin + // Network plugin manager. + network *network.PluginManager // If true, the "hairpin mode" flag is set on container interfaces. // A false value means the kubelet just backs off from setting it, @@ -266,7 +266,7 @@ func New( runtimeHelper: runtimeHelper, recorder: recorder, livenessManager: livenessManager, - networkPlugin: networkPlugin, + network: network.NewPluginManager(networkPlugin), execer: execer, touchPath: touchPath, nsenterPath: nsenterPath, @@ -946,7 +946,7 @@ func serviceFilePath(serviceName string) string { // The pod does not run in host network. 
And // The pod runs inside a netns created outside of rkt. func (r *Runtime) shouldCreateNetns(pod *v1.Pod) bool { - return !kubecontainer.IsHostNetworkPod(pod) && r.networkPlugin.Name() != network.DefaultPluginName + return !kubecontainer.IsHostNetworkPod(pod) && r.network.PluginName() != network.DefaultPluginName } // usesRktHostNetwork returns true if: @@ -1047,18 +1047,17 @@ func (r *Runtime) generateRunCommand(pod *v1.Pod, uuid, netnsName string) (strin } func (r *Runtime) cleanupPodNetwork(pod *v1.Pod) error { - glog.V(3).Infof("Calling network plugin %s to tear down pod for %s", r.networkPlugin.Name(), format.Pod(pod)) + glog.V(3).Infof("Calling network plugin %s to tear down pod for %s", r.network.PluginName(), format.Pod(pod)) // No-op if the pod is not running in a created netns. if !r.shouldCreateNetns(pod) { return nil } - var teardownErr error containerID := kubecontainer.ContainerID{ID: string(pod.UID)} - if err := r.networkPlugin.TearDownPod(pod.Namespace, pod.Name, containerID); err != nil { - teardownErr = fmt.Errorf("rkt: failed to tear down network for pod %s: %v", format.Pod(pod), err) - glog.Errorf("%v", teardownErr) + teardownErr := r.network.TearDownPod(pod.Namespace, pod.Name, containerID) + if teardownErr != nil { + glog.Error(teardownErr) } if _, err := r.execer.Command("ip", "netns", "del", makePodNetnsName(pod.UID)).Output(); err != nil { @@ -1265,7 +1264,7 @@ func netnsPathFromName(netnsName string) string { // // If the pod is running in host network or is running using the no-op plugin, then nothing will be done. func (r *Runtime) setupPodNetwork(pod *v1.Pod) (string, string, error) { - glog.V(3).Infof("Calling network plugin %s to set up pod for %s", r.networkPlugin.Name(), format.Pod(pod)) + glog.V(3).Infof("Calling network plugin %s to set up pod for %s", r.network.PluginName(), format.Pod(pod)) // No-op if the pod is not running in a created netns. if !r.shouldCreateNetns(pod) { @@ -1282,15 +1281,14 @@ func (r *Runtime) setupPodNetwork(pod *v1.Pod) (string, string, error) { } // Set up networking with the network plugin - glog.V(3).Infof("Calling network plugin %s to setup pod for %s", r.networkPlugin.Name(), format.Pod(pod)) containerID := kubecontainer.ContainerID{ID: string(pod.UID)} - err = r.networkPlugin.SetUpPod(pod.Namespace, pod.Name, containerID) + err = r.network.SetUpPod(pod.Namespace, pod.Name, containerID) if err != nil { - return "", "", fmt.Errorf("failed to set up pod network: %v", err) + return "", "", err } - status, err := r.networkPlugin.GetPodNetworkStatus(pod.Namespace, pod.Name, containerID) + status, err := r.network.GetPodNetworkStatus(pod.Namespace, pod.Name, containerID) if err != nil { - return "", "", fmt.Errorf("failed to get status of pod network: %v", err) + return "", "", err } if r.configureHairpinMode { @@ -2329,7 +2327,7 @@ func (r *Runtime) GetPodStatus(uid kubetypes.UID, name, namespace string) (*kube } // If we are running no-op network plugin, then get the pod IP from the rkt pod status. 
- if r.networkPlugin.Name() == network.DefaultPluginName { + if r.network.PluginName() == network.DefaultPluginName { if latestPod != nil { for _, n := range latestPod.Networks { if n.Name == defaultNetworkName { @@ -2340,9 +2338,9 @@ func (r *Runtime) GetPodStatus(uid kubetypes.UID, name, namespace string) (*kube } } else { containerID := kubecontainer.ContainerID{ID: string(uid)} - status, err := r.networkPlugin.GetPodNetworkStatus(namespace, name, containerID) + status, err := r.network.GetPodNetworkStatus(namespace, name, containerID) if err != nil { - glog.Warningf("rkt: Failed to get pod network status for pod (UID %q, name %q, namespace %q): %v", uid, name, namespace, err) + glog.Warningf("rkt: %v", err) } else if status != nil { // status can be nil when the pod is running on the host network, in which case the pod IP // will be populated by the upper layer. diff --git a/pkg/kubelet/rkt/rkt_test.go b/pkg/kubelet/rkt/rkt_test.go index 8902e2a671e..57a6e382ee7 100644 --- a/pkg/kubelet/rkt/rkt_test.go +++ b/pkg/kubelet/rkt/rkt_test.go @@ -591,7 +591,7 @@ func TestGetPodStatus(t *testing.T) { systemd: fs, runtimeHelper: frh, os: fos, - networkPlugin: fnp, + network: network.NewPluginManager(fnp), } ns := func(seconds int64) int64 { @@ -826,6 +826,8 @@ func TestGetPodStatus(t *testing.T) { } else { fnp.EXPECT().GetPodNetworkStatus("default", "guestbook", kubecontainer.ContainerID{ID: "42"}). Return(nil, fmt.Errorf("no such network")) + // Plugin name only requested again in error case + fnp.EXPECT().Name().Return(tt.networkPluginName) } } @@ -1388,7 +1390,7 @@ func TestGenerateRunCommand(t *testing.T) { for i, tt := range tests { testCaseHint := fmt.Sprintf("test case #%d", i) - rkt.networkPlugin = tt.networkPlugin + rkt.network = network.NewPluginManager(tt.networkPlugin) rkt.runtimeHelper = &fakeRuntimeHelper{tt.dnsServers, tt.dnsSearches, tt.hostName, "", tt.err} rkt.execer = &utilexec.FakeExec{CommandScript: []utilexec.FakeCommandAction{func(cmd string, args ...string) utilexec.Cmd { return utilexec.InitFakeCmd(&utilexec.FakeCmd{}, cmd, args...) From dc2fd511ab6668e5bdd0ebd595d9e87377f9ff58 Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Tue, 6 Dec 2016 15:58:44 -0600 Subject: [PATCH 5/7] dockertools: use network PluginManager to synchronize pod network operations We need to tear down networking when garbage collecting containers too, and GC is run from a different goroutine in kubelet. We don't want container network operations running for the same pod concurrently. --- pkg/kubelet/dockertools/docker_manager.go | 48 +++++++++---------- .../dockertools/docker_manager_linux_test.go | 2 +- .../dockertools/docker_manager_test.go | 2 +- 3 files changed, 24 insertions(+), 28 deletions(-) diff --git a/pkg/kubelet/dockertools/docker_manager.go b/pkg/kubelet/dockertools/docker_manager.go index 318293dd21e..42e8bc59856 100644 --- a/pkg/kubelet/dockertools/docker_manager.go +++ b/pkg/kubelet/dockertools/docker_manager.go @@ -59,7 +59,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/images" "k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/metrics" - "k8s.io/kubernetes/pkg/kubelet/network" + knetwork "k8s.io/kubernetes/pkg/kubelet/network" "k8s.io/kubernetes/pkg/kubelet/network/hairpin" proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" "k8s.io/kubernetes/pkg/kubelet/qos" @@ -152,8 +152,8 @@ type DockerManager struct { // Directory of container logs. containerLogsDir string - // Network plugin. - networkPlugin network.NetworkPlugin + // Network plugin manager. 
+ network *knetwork.PluginManager // Health check results. livenessManager proberesults.Manager @@ -226,7 +226,7 @@ func NewDockerManager( burst int, containerLogsDir string, osInterface kubecontainer.OSInterface, - networkPlugin network.NetworkPlugin, + networkPlugin knetwork.NetworkPlugin, runtimeHelper kubecontainer.RuntimeHelper, httpClient types.HttpGetter, execHandler ExecHandler, @@ -267,7 +267,7 @@ func NewDockerManager( dockerPuller: newDockerPuller(client), cgroupDriver: cgroupDriver, containerLogsDir: containerLogsDir, - networkPlugin: networkPlugin, + network: knetwork.NewPluginManager(networkPlugin), livenessManager: livenessManager, runtimeHelper: runtimeHelper, execHandler: execHandler, @@ -357,10 +357,10 @@ func (dm *DockerManager) determineContainerIP(podNamespace, podName string, cont isHostNetwork := networkMode == namespaceModeHost // For host networking or default network plugin, GetPodNetworkStatus doesn't work - if !isHostNetwork && dm.networkPlugin.Name() != network.DefaultPluginName { - netStatus, err := dm.networkPlugin.GetPodNetworkStatus(podNamespace, podName, kubecontainer.DockerID(container.ID).ContainerID()) + if !isHostNetwork && dm.network.PluginName() != knetwork.DefaultPluginName { + netStatus, err := dm.network.GetPodNetworkStatus(podNamespace, podName, kubecontainer.DockerID(container.ID).ContainerID()) if err != nil { - glog.Errorf("NetworkPlugin %s failed on the status hook for pod '%s' - %v", dm.networkPlugin.Name(), podName, err) + glog.Error(err) return result, err } else if netStatus != nil { result = netStatus.IP.String() @@ -1058,7 +1058,7 @@ func (dm *DockerManager) podInfraContainerChanged(pod *v1.Pod, podInfraContainer glog.V(4).Infof("host: %v, %v", pod.Spec.HostNetwork, networkMode) return true, nil } - } else if dm.networkPlugin.Name() != "cni" && dm.networkPlugin.Name() != "kubenet" { + } else if !dm.pluginDisablesDockerNetworking() { // Docker only exports ports from the pod infra container. Let's // collect all of the relevant ports and export them. 
for _, container := range pod.Spec.InitContainers { @@ -1091,6 +1091,10 @@ func getDockerNetworkMode(container *dockertypes.ContainerJSON) string { return "" } +func (dm *DockerManager) pluginDisablesDockerNetworking() bool { + return dm.network.PluginName() == "cni" || dm.network.PluginName() == "kubenet" +} + // newDockerVersion returns a semantically versioned docker version value func newDockerVersion(version string) (*utilversion.Version, error) { return utilversion.ParseSemantic(version) @@ -1508,11 +1512,9 @@ func (dm *DockerManager) killPodWithSyncResult(pod *v1.Pod, runningPod kubeconta if getDockerNetworkMode(ins) != namespaceModeHost { teardownNetworkResult := kubecontainer.NewSyncResult(kubecontainer.TeardownNetwork, kubecontainer.BuildPodFullName(runningPod.Name, runningPod.Namespace)) result.AddSyncResult(teardownNetworkResult) - glog.V(3).Infof("Calling network plugin %s to tear down pod for %s", dm.networkPlugin.Name(), kubecontainer.BuildPodFullName(runningPod.Name, runningPod.Namespace)) - if err := dm.networkPlugin.TearDownPod(runningPod.Namespace, runningPod.Name, networkContainer.ID); err != nil { - message := fmt.Sprintf("Failed to teardown network for pod %q using network plugins %q: %v", runningPod.ID, dm.networkPlugin.Name(), err) - teardownNetworkResult.Fail(kubecontainer.ErrTeardownNetwork, message) - glog.Error(message) + if err := dm.network.TearDownPod(runningPod.Namespace, runningPod.Name, networkContainer.ID); err != nil { + teardownNetworkResult.Fail(kubecontainer.ErrTeardownNetwork, err.Error()) + glog.Error(err) } } killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, networkContainer.Name) @@ -1915,7 +1917,7 @@ func (dm *DockerManager) createPodInfraContainer(pod *v1.Pod) (kubecontainer.Doc if kubecontainer.IsHostNetworkPod(pod) { netNamespace = namespaceModeHost - } else if dm.networkPlugin.Name() == "cni" || dm.networkPlugin.Name() == "kubenet" { + } else if dm.pluginDisablesDockerNetworking() { netNamespace = "none" } else { // Docker only exports ports from the pod infra container. 
Let's @@ -2217,20 +2219,14 @@ func (dm *DockerManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStatus *kubecon setupNetworkResult := kubecontainer.NewSyncResult(kubecontainer.SetupNetwork, kubecontainer.GetPodFullName(pod)) result.AddSyncResult(setupNetworkResult) if !kubecontainer.IsHostNetworkPod(pod) { - glog.V(3).Infof("Calling network plugin %s to setup pod for %s", dm.networkPlugin.Name(), format.Pod(pod)) - err = dm.networkPlugin.SetUpPod(pod.Namespace, pod.Name, podInfraContainerID.ContainerID()) - if err != nil { - // TODO: (random-liu) There shouldn't be "Skipping pod" in sync result message - message := fmt.Sprintf("Failed to setup network for pod %q using network plugins %q: %v; Skipping pod", format.Pod(pod), dm.networkPlugin.Name(), err) - setupNetworkResult.Fail(kubecontainer.ErrSetupNetwork, message) - glog.Error(message) + if err := dm.network.SetUpPod(pod.Namespace, pod.Name, podInfraContainerID.ContainerID()); err != nil { + setupNetworkResult.Fail(kubecontainer.ErrSetupNetwork, err.Error()) + glog.Error(err) // Delete infra container killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, PodInfraContainerName) result.AddSyncResult(killContainerResult) - if delErr := dm.KillContainerInPod(kubecontainer.ContainerID{ - ID: string(podInfraContainerID), - Type: "docker"}, nil, pod, message, nil); delErr != nil { + if delErr := dm.KillContainerInPod(podInfraContainerID.ContainerID(), nil, pod, err.Error(), nil); delErr != nil { killContainerResult.Fail(kubecontainer.ErrKillContainer, delErr.Error()) glog.Warningf("Clear infra container failed for pod %q: %v", format.Pod(pod), delErr) } @@ -2246,7 +2242,7 @@ func (dm *DockerManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStatus *kubecon } if dm.configureHairpinMode { - if err = hairpin.SetUpContainerPid(podInfraContainer.State.Pid, network.DefaultInterfaceName); err != nil { + if err = hairpin.SetUpContainerPid(podInfraContainer.State.Pid, knetwork.DefaultInterfaceName); err != nil { glog.Warningf("Hairpin setup failed for pod %q: %v", format.Pod(pod), err) } } diff --git a/pkg/kubelet/dockertools/docker_manager_linux_test.go b/pkg/kubelet/dockertools/docker_manager_linux_test.go index 7f739f614a1..b749d8f955f 100644 --- a/pkg/kubelet/dockertools/docker_manager_linux_test.go +++ b/pkg/kubelet/dockertools/docker_manager_linux_test.go @@ -457,7 +457,7 @@ func TestGetPodStatusFromNetworkPlugin(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() fnp := nettest.NewMockNetworkPlugin(ctrl) - dm.networkPlugin = fnp + dm.network = network.NewPluginManager(fnp) fakeDocker.SetFakeRunningContainers([]*FakeContainer{ { diff --git a/pkg/kubelet/dockertools/docker_manager_test.go b/pkg/kubelet/dockertools/docker_manager_test.go index 16a8d4ba45d..41b3386532d 100644 --- a/pkg/kubelet/dockertools/docker_manager_test.go +++ b/pkg/kubelet/dockertools/docker_manager_test.go @@ -1878,7 +1878,7 @@ func TestSyncPodGetsPodIPFromNetworkPlugin(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() fnp := nettest.NewMockNetworkPlugin(ctrl) - dm.networkPlugin = fnp + dm.network = network.NewPluginManager(fnp) pod := makePod("foo", &v1.PodSpec{ Containers: []v1.Container{ From 4d7d7faa81f04c2df0161aaa8487f17ba66819d1 Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Thu, 17 Nov 2016 16:06:44 -0600 Subject: [PATCH 6/7] dockertools: clean up networking when garbage-collecting pods The docker runtime doesn't tear down networking when GC-ing pods. rkt already does so make docker do it too. 
To ensure this happens, networking is always torn down for the container even if the container itself is not deleted. This prevents IPAM from leaking when the pod gets killed for some reason outside kubelet (like docker restart) or when pods are killed while kubelet isn't running. Fixes: https://github.com/kubernetes/kubernetes/issues/14940 Related: https://github.com/kubernetes/kubernetes/pull/35572 --- pkg/kubelet/dockertools/container_gc.go | 123 +++++++++++++----- pkg/kubelet/dockertools/container_gc_test.go | 66 ++++++++-- pkg/kubelet/dockertools/docker_manager.go | 6 +- .../dockertools/docker_manager_linux.go | 9 +- .../dockertools/docker_manager_unsupported.go | 6 +- .../dockertools/docker_manager_windows.go | 9 +- 6 files changed, 169 insertions(+), 50 deletions(-) diff --git a/pkg/kubelet/dockertools/container_gc.go b/pkg/kubelet/dockertools/container_gc.go index 65adddf378c..8d902a7be4b 100644 --- a/pkg/kubelet/dockertools/container_gc.go +++ b/pkg/kubelet/dockertools/container_gc.go @@ -28,18 +28,21 @@ import ( "github.com/golang/glog" "k8s.io/apimachinery/pkg/types" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + knetwork "k8s.io/kubernetes/pkg/kubelet/network" ) type containerGC struct { client DockerInterface podGetter podGetter + network *knetwork.PluginManager containerLogsDir string } -func NewContainerGC(client DockerInterface, podGetter podGetter, containerLogsDir string) *containerGC { +func NewContainerGC(client DockerInterface, podGetter podGetter, network *knetwork.PluginManager, containerLogsDir string) *containerGC { return &containerGC{ client: client, podGetter: podGetter, + network: network, containerLogsDir: containerLogsDir, } } @@ -50,7 +53,7 @@ type containerGCInfo struct { id string // Docker name of the container. - name string + dockerName string // Creation time for the container. createTime time.Time @@ -59,8 +62,14 @@ type containerGCInfo struct { // This comes from dockertools.ParseDockerName(...) podNameWithNamespace string + // Kubernetes pod UID + podUID types.UID + // Container name in pod containerName string + + // Container network mode + isHostNetwork bool } // Containers are considered for eviction as units of (UID, container name) pair. @@ -111,22 +120,45 @@ func (cgc *containerGC) removeOldestN(containers []containerGCInfo, toRemove int // Remove from oldest to newest (last to first). numToKeep := len(containers) - toRemove for i := numToKeep; i < len(containers); i++ { - cgc.removeContainer(containers[i].id, containers[i].podNameWithNamespace, containers[i].containerName) + cgc.removeContainer(containers[i]) } // Assume we removed the containers so that we're not too aggressive. return containers[:numToKeep] } +// Returns a full GC info structure on success, or a partial one on failure +func newContainerGCInfo(id string, inspectResult *dockertypes.ContainerJSON, created time.Time) (containerGCInfo, error) { + containerName, _, err := ParseDockerName(inspectResult.Name) + if err != nil { + return containerGCInfo{ + id: id, + dockerName: inspectResult.Name, + }, fmt.Errorf("failed to parse docker name %q: %v", inspectResult.Name, err) + } + + networkMode := getDockerNetworkMode(inspectResult) + return containerGCInfo{ + id: id, + dockerName: inspectResult.Name, + podNameWithNamespace: containerName.PodFullName, + podUID: containerName.PodUID, + containerName: containerName.ContainerName, + createTime: created, + isHostNetwork: networkMode == namespaceModeHost, + }, nil +} + // Get all containers that are evictable. 
Evictable containers are: not running // and created more than MinAge ago. -func (cgc *containerGC) evictableContainers(minAge time.Duration) (containersByEvictUnit, []containerGCInfo, error) { +func (cgc *containerGC) evictableContainers(minAge time.Duration) (containersByEvictUnit, []containerGCInfo, []containerGCInfo, error) { containers, err := GetKubeletDockerContainers(cgc.client, true) if err != nil { - return containersByEvictUnit{}, []containerGCInfo{}, err + return containersByEvictUnit{}, []containerGCInfo{}, []containerGCInfo{}, err } unidentifiedContainers := make([]containerGCInfo, 0) + netContainers := make([]containerGCInfo, 0) evictUnits := make(containersByEvictUnit) newestGCTime := time.Now().Add(-minAge) for _, container := range containers { @@ -147,23 +179,19 @@ func (cgc *containerGC) evictableContainers(minAge time.Duration) (containersByE continue } - containerInfo := containerGCInfo{ - id: container.ID, - name: container.Names[0], - createTime: created, - } - - containerName, _, err := ParseDockerName(container.Names[0]) - + containerInfo, err := newContainerGCInfo(container.ID, data, created) if err != nil { unidentifiedContainers = append(unidentifiedContainers, containerInfo) } else { - key := evictUnit{ - uid: containerName.PodUID, - name: containerName.ContainerName, + // Track net containers for special cleanup + if containerIsNetworked(containerInfo.containerName) { + netContainers = append(netContainers, containerInfo) + } + + key := evictUnit{ + uid: containerInfo.podUID, + name: containerInfo.containerName, } - containerInfo.podNameWithNamespace = containerName.PodFullName - containerInfo.containerName = containerName.ContainerName evictUnits[key] = append(evictUnits[key], containerInfo) } } @@ -173,26 +201,34 @@ func (cgc *containerGC) evictableContainers(minAge time.Duration) (containersByE sort.Sort(byCreated(evictUnits[uid])) } - return evictUnits, unidentifiedContainers, nil + return evictUnits, netContainers, unidentifiedContainers, nil } // GarbageCollect removes dead containers using the specified container gc policy func (cgc *containerGC) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy, allSourcesReady bool) error { // Separate containers by evict units. - evictUnits, unidentifiedContainers, err := cgc.evictableContainers(gcPolicy.MinAge) + evictUnits, netContainers, unidentifiedContainers, err := cgc.evictableContainers(gcPolicy.MinAge) if err != nil { return err } // Remove unidentified containers. for _, container := range unidentifiedContainers { - glog.Infof("Removing unidentified dead container %q with ID %q", container.name, container.id) + glog.Infof("Removing unidentified dead container %q", container.dockerName) err = cgc.client.RemoveContainer(container.id, dockertypes.ContainerRemoveOptions{RemoveVolumes: true}) if err != nil { - glog.Warningf("Failed to remove unidentified dead container %q: %v", container.name, err) + glog.Warningf("Failed to remove unidentified dead container %q: %v", container.dockerName, err) } } + // Always clean up net containers to ensure network resources are released + // TODO: this may tear down networking again if the container doesn't get + // removed in this GC cycle, but that already happens elsewhere... + for _, container := range netContainers { + glog.Infof("Cleaning up dead net container %q", container.dockerName) + cgc.netContainerCleanup(container) + } + // Remove deleted pod containers if all sources are ready. 
if allSourcesReady { for key, unit := range evictUnits { @@ -245,35 +281,56 @@ func (cgc *containerGC) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy, return nil } -func (cgc *containerGC) removeContainer(id string, podNameWithNamespace string, containerName string) { - glog.V(4).Infof("Removing container %q name %q", id, containerName) - err := cgc.client.RemoveContainer(id, dockertypes.ContainerRemoveOptions{RemoveVolumes: true}) - if err != nil { - glog.Warningf("Failed to remove container %q: %v", id, err) +func (cgc *containerGC) netContainerCleanup(containerInfo containerGCInfo) { + if containerInfo.isHostNetwork { + return } - symlinkPath := LogSymlink(cgc.containerLogsDir, podNameWithNamespace, containerName, id) + + podName, podNamespace, err := kubecontainer.ParsePodFullName(containerInfo.podNameWithNamespace) + if err != nil { + glog.Warningf("failed to parse container %q pod full name: %v", containerInfo.dockerName, err) + return + } + + containerID := kubecontainer.DockerID(containerInfo.id).ContainerID() + if err := cgc.network.TearDownPod(podNamespace, podName, containerID); err != nil { + glog.Warningf("failed to tear down container %q network: %v", containerInfo.dockerName, err) + } +} + +func (cgc *containerGC) removeContainer(containerInfo containerGCInfo) { + glog.V(4).Infof("Removing container %q", containerInfo.dockerName) + err := cgc.client.RemoveContainer(containerInfo.id, dockertypes.ContainerRemoveOptions{RemoveVolumes: true}) + if err != nil { + glog.Warningf("Failed to remove container %q: %v", containerInfo.dockerName, err) + } + symlinkPath := LogSymlink(cgc.containerLogsDir, containerInfo.podNameWithNamespace, containerInfo.containerName, containerInfo.id) err = os.Remove(symlinkPath) if err != nil && !os.IsNotExist(err) { - glog.Warningf("Failed to remove container %q log symlink %q: %v", id, symlinkPath, err) + glog.Warningf("Failed to remove container %q log symlink %q: %v", containerInfo.dockerName, symlinkPath, err) } } func (cgc *containerGC) deleteContainer(id string) error { - containerInfo, err := cgc.client.InspectContainer(id) + data, err := cgc.client.InspectContainer(id) if err != nil { glog.Warningf("Failed to inspect container %q: %v", id, err) return err } - if containerInfo.State.Running { + if data.State.Running { return fmt.Errorf("container %q is still running", id) } - containerName, _, err := ParseDockerName(containerInfo.Name) + containerInfo, err := newContainerGCInfo(id, data, time.Now()) if err != nil { return err } - cgc.removeContainer(id, containerName.PodFullName, containerName.ContainerName) + if containerIsNetworked(containerInfo.containerName) { + cgc.netContainerCleanup(containerInfo) + } + + cgc.removeContainer(containerInfo) return nil } diff --git a/pkg/kubelet/dockertools/container_gc_test.go b/pkg/kubelet/dockertools/container_gc_test.go index 9393d794291..c0eeecb2296 100644 --- a/pkg/kubelet/dockertools/container_gc_test.go +++ b/pkg/kubelet/dockertools/container_gc_test.go @@ -23,18 +23,23 @@ import ( "testing" "time" + "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/kubernetes/pkg/api/v1" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + knetwork "k8s.io/kubernetes/pkg/kubelet/network" + nettest "k8s.io/kubernetes/pkg/kubelet/network/testing" ) -func newTestContainerGC(t *testing.T) (*containerGC, *FakeDockerClient) { +func newTestContainerGC(t *testing.T) (*containerGC, *FakeDockerClient, 
*nettest.MockNetworkPlugin) { fakeDocker := NewFakeDockerClient() fakePodGetter := newFakePodGetter() - gc := NewContainerGC(fakeDocker, fakePodGetter, "") - return gc, fakeDocker + fakePlugin := nettest.NewMockNetworkPlugin(gomock.NewController(t)) + fakePlugin.EXPECT().Name().Return("someNetworkPlugin").AnyTimes() + gc := NewContainerGC(fakeDocker, fakePodGetter, knetwork.NewPluginManager(fakePlugin), "") + return gc, fakeDocker, fakePlugin } // Makes a stable time object, lower id is earlier time. @@ -91,7 +96,7 @@ func verifyStringArrayEqualsAnyOrder(t *testing.T, actual, expected []string) { } func TestDeleteContainerSkipRunningContainer(t *testing.T) { - gc, fakeDocker := newTestContainerGC(t) + gc, fakeDocker, _ := newTestContainerGC(t) fakeDocker.SetFakeContainers([]*FakeContainer{ makeContainer("1876", "foo", "POD", true, makeTime(0)), }) @@ -102,29 +107,65 @@ func TestDeleteContainerSkipRunningContainer(t *testing.T) { } func TestDeleteContainerRemoveDeadContainer(t *testing.T) { - gc, fakeDocker := newTestContainerGC(t) + gc, fakeDocker, fakePlugin := newTestContainerGC(t) + defer fakePlugin.Finish() fakeDocker.SetFakeContainers([]*FakeContainer{ makeContainer("1876", "foo", "POD", false, makeTime(0)), }) addPods(gc.podGetter, "foo") + fakePlugin.EXPECT().TearDownPod(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) + assert.Nil(t, gc.deleteContainer("1876")) assert.Len(t, fakeDocker.Removed, 1) } +func TestGarbageCollectNetworkTeardown(t *testing.T) { + // Ensure infra container gets teardown called + gc, fakeDocker, fakePlugin := newTestContainerGC(t) + defer fakePlugin.Finish() + id := kubecontainer.DockerID("1867").ContainerID() + fakeDocker.SetFakeContainers([]*FakeContainer{ + makeContainer(id.ID, "foo", "POD", false, makeTime(0)), + }) + addPods(gc.podGetter, "foo") + + fakePlugin.EXPECT().TearDownPod(gomock.Any(), gomock.Any(), id).Return(nil) + + assert.Nil(t, gc.deleteContainer(id.ID)) + assert.Len(t, fakeDocker.Removed, 1) + + // Ensure non-infra container does not have teardown called + gc, fakeDocker, fakePlugin = newTestContainerGC(t) + id = kubecontainer.DockerID("1877").ContainerID() + fakeDocker.SetFakeContainers([]*FakeContainer{ + makeContainer(id.ID, "foo", "adsfasdfasdf", false, makeTime(0)), + }) + fakePlugin.EXPECT().SetUpPod(gomock.Any(), gomock.Any(), id).Return(nil) + + addPods(gc.podGetter, "foo") + + assert.Nil(t, gc.deleteContainer(id.ID)) + assert.Len(t, fakeDocker.Removed, 1) +} + func TestGarbageCollectZeroMaxContainers(t *testing.T) { - gc, fakeDocker := newTestContainerGC(t) + gc, fakeDocker, fakePlugin := newTestContainerGC(t) + defer fakePlugin.Finish() fakeDocker.SetFakeContainers([]*FakeContainer{ makeContainer("1876", "foo", "POD", false, makeTime(0)), }) addPods(gc.podGetter, "foo") + fakePlugin.EXPECT().TearDownPod(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) + assert.Nil(t, gc.GarbageCollect(kubecontainer.ContainerGCPolicy{MinAge: time.Minute, MaxPerPodContainer: 1, MaxContainers: 0}, true)) assert.Len(t, fakeDocker.Removed, 1) } func TestGarbageCollectNoMaxPerPodContainerLimit(t *testing.T) { - gc, fakeDocker := newTestContainerGC(t) + gc, fakeDocker, fakePlugin := newTestContainerGC(t) + defer fakePlugin.Finish() fakeDocker.SetFakeContainers([]*FakeContainer{ makeContainer("1876", "foo", "POD", false, makeTime(0)), makeContainer("2876", "foo1", "POD", false, makeTime(1)), @@ -134,12 +175,15 @@ func TestGarbageCollectNoMaxPerPodContainerLimit(t *testing.T) { }) addPods(gc.podGetter, "foo", "foo1", "foo2", "foo3", "foo4") + 
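// Aside, not part of the original patch: the five pods registered above (foo through foo4)
// each have one dead infra ("POD") container in this test, so the expectation below requires
// exactly five TearDownPod calls even though this GC policy ends up removing only one container.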
fakePlugin.EXPECT().TearDownPod(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(5) + assert.Nil(t, gc.GarbageCollect(kubecontainer.ContainerGCPolicy{MinAge: time.Minute, MaxPerPodContainer: -1, MaxContainers: 4}, true)) assert.Len(t, fakeDocker.Removed, 1) } func TestGarbageCollectNoMaxLimit(t *testing.T) { - gc, fakeDocker := newTestContainerGC(t) + gc, fakeDocker, fakePlugin := newTestContainerGC(t) + defer fakePlugin.Finish() fakeDocker.SetFakeContainers([]*FakeContainer{ makeContainer("1876", "foo", "POD", false, makeTime(0)), makeContainer("2876", "foo1", "POD", false, makeTime(0)), @@ -149,6 +193,8 @@ func TestGarbageCollectNoMaxLimit(t *testing.T) { }) addPods(gc.podGetter, "foo", "foo1", "foo2", "foo3", "foo4") + fakePlugin.EXPECT().TearDownPod(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(5) + assert.Nil(t, gc.GarbageCollect(kubecontainer.ContainerGCPolicy{MinAge: time.Minute, MaxPerPodContainer: -1, MaxContainers: -1}, true)) assert.Len(t, fakeDocker.Removed, 0) } @@ -261,10 +307,12 @@ func TestGarbageCollect(t *testing.T) { } for i, test := range tests { t.Logf("Running test case with index %d", i) - gc, fakeDocker := newTestContainerGC(t) + gc, fakeDocker, fakePlugin := newTestContainerGC(t) fakeDocker.SetFakeContainers(test.containers) addPods(gc.podGetter, "foo", "foo1", "foo2", "foo3", "foo4", "foo5", "foo6", "foo7") + fakePlugin.EXPECT().TearDownPod(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() assert.Nil(t, gc.GarbageCollect(kubecontainer.ContainerGCPolicy{MinAge: time.Hour, MaxPerPodContainer: 2, MaxContainers: 6}, true)) verifyStringArrayEqualsAnyOrder(t, fakeDocker.Removed, test.expectedRemoved) + fakePlugin.Finish() } } diff --git a/pkg/kubelet/dockertools/docker_manager.go b/pkg/kubelet/dockertools/docker_manager.go index 42e8bc59856..51d34f14198 100644 --- a/pkg/kubelet/dockertools/docker_manager.go +++ b/pkg/kubelet/dockertools/docker_manager.go @@ -282,7 +282,7 @@ func NewDockerManager( cmdRunner := kubecontainer.DirectStreamingRunner(dm) dm.runner = lifecycle.NewHandlerRunner(httpClient, cmdRunner, dm) dm.imagePuller = images.NewImageManager(kubecontainer.FilterEventRecorder(recorder), dm, imageBackOff, serializeImagePulls, qps, burst) - dm.containerGC = NewContainerGC(client, podGetter, containerLogsDir) + dm.containerGC = NewContainerGC(client, podGetter, dm.network, containerLogsDir) dm.versionCache = cache.NewObjectCache( func() (interface{}, error) { @@ -436,7 +436,7 @@ func (dm *DockerManager) inspectContainer(id string, podName, podNamespace strin // Container that are running, restarting and paused status.State = kubecontainer.ContainerStateRunning status.StartedAt = startedAt - if containerProvidesPodIP(dockerName) { + if containerProvidesPodIP(dockerName.ContainerName) { ip, err = dm.determineContainerIP(podNamespace, podName, iResult) // Kubelet doesn't handle the network error scenario if err != nil { @@ -2675,7 +2675,7 @@ func (dm *DockerManager) GetPodStatus(uid kubetypes.UID, name, namespace string) } } containerStatuses = append(containerStatuses, result) - if containerProvidesPodIP(dockerName) && ip != "" { + if containerProvidesPodIP(dockerName.ContainerName) && ip != "" { podStatus.IP = ip } } diff --git a/pkg/kubelet/dockertools/docker_manager_linux.go b/pkg/kubelet/dockertools/docker_manager_linux.go index 333ab4673b8..539ae0a2ca4 100644 --- a/pkg/kubelet/dockertools/docker_manager_linux.go +++ b/pkg/kubelet/dockertools/docker_manager_linux.go @@ -52,8 +52,13 @@ func getContainerIP(container 
*dockertypes.ContainerJSON) string { func getNetworkingMode() string { return "" } // Returns true if the container name matches the infrastructure's container name -func containerProvidesPodIP(name *KubeletContainerName) bool { - return name.ContainerName == PodInfraContainerName +func containerProvidesPodIP(containerName string) bool { + return containerName == PodInfraContainerName +} + +// Only the infrastructure container needs network setup/teardown +func containerIsNetworked(containerName string) bool { + return containerName == PodInfraContainerName } // Returns Seccomp and AppArmor Security options diff --git a/pkg/kubelet/dockertools/docker_manager_unsupported.go b/pkg/kubelet/dockertools/docker_manager_unsupported.go index ecfe5fce112..5bbcff7ac82 100644 --- a/pkg/kubelet/dockertools/docker_manager_unsupported.go +++ b/pkg/kubelet/dockertools/docker_manager_unsupported.go @@ -42,7 +42,11 @@ func getNetworkingMode() string { return "" } -func containerProvidesPodIP(name *KubeletContainerName) bool { +func containerProvidesPodIP(containerName string) bool { + return false +} + +func containerIsNetworked(containerName string) bool { return false } diff --git a/pkg/kubelet/dockertools/docker_manager_windows.go b/pkg/kubelet/dockertools/docker_manager_windows.go index 4a9ab80693e..8b29d964e19 100644 --- a/pkg/kubelet/dockertools/docker_manager_windows.go +++ b/pkg/kubelet/dockertools/docker_manager_windows.go @@ -65,8 +65,13 @@ func getNetworkingMode() string { // Infrastructure containers are not supported on Windows. For this reason, we // make sure to not grab the infra container's IP for the pod. -func containerProvidesPodIP(name *KubeletContainerName) bool { - return name.ContainerName != PodInfraContainerName +func containerProvidesPodIP(containerName string) bool { + return containerName != PodInfraContainerName +} + +// All containers in Windows need networking setup/teardown +func containerIsNetworked(containerName string) bool { + return true } // Returns nil as both Seccomp and AppArmor security options are not valid on Windows From 20e1cdb97c7399225a5a1db623c77faade73527a Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Mon, 16 Jan 2017 13:26:38 -0600 Subject: [PATCH 7/7] dockertools: tear down dead infra containers in SyncPod() Dead infra containers may still have network resources allocated to them and may not be GC-ed for a long time. But allowing SyncPod() to restart an infra container before the old one is destroyed prevents network plugins from carrying the old network details (eg IPAM) over to the new infra container. --- pkg/kubelet/dockertools/docker_manager.go | 94 ++++++++++++------- .../dockertools/docker_manager_test.go | 49 ++++++++++ 2 files changed, 111 insertions(+), 32 deletions(-) diff --git a/pkg/kubelet/dockertools/docker_manager.go b/pkg/kubelet/dockertools/docker_manager.go index 51d34f14198..e97f0777c11 100644 --- a/pkg/kubelet/dockertools/docker_manager.go +++ b/pkg/kubelet/dockertools/docker_manager.go @@ -1440,21 +1440,22 @@ func (dm *DockerManager) KillPod(pod *v1.Pod, runningPod kubecontainer.Pod, grac } // NOTE(random-liu): The pod passed in could be *nil* when kubelet restarted. 
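// Illustrative sketch, not part of the original patch: with the changes in the hunks below,
// the infra-container handling in killPodWithSyncResult() reduces to roughly this loop
// (sync-result bookkeeping and error logging trimmed), reusing identifiers that appear in
// the surrounding diff:
//
//	for i, nc := range networkContainers {
//		if ins, err := dm.client.InspectContainer(nc.ID.ID); err == nil &&
//			getDockerNetworkMode(ins) != namespaceModeHost {
//			// Release IPAM and other plugin state even for exited containers.
//			dm.network.TearDownPod(runtimePod.Namespace, runtimePod.Name, nc.ID)
//		}
//		if nc.State == kubecontainer.ContainerStateRunning {
//			// Only containers that are still alive need an explicit kill.
//			dm.KillContainerInPod(nc.ID, networkSpecs[i], pod, "Need to kill pod.", gracePeriodOverride)
//		}
//	}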
-func (dm *DockerManager) killPodWithSyncResult(pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) (result kubecontainer.PodSyncResult) { +// runtimePod may contain either running or exited containers +func (dm *DockerManager) killPodWithSyncResult(pod *v1.Pod, runtimePod kubecontainer.Pod, gracePeriodOverride *int64) (result kubecontainer.PodSyncResult) { // Short circuit if there's nothing to kill. - if len(runningPod.Containers) == 0 { + if len(runtimePod.Containers) == 0 { return } // Send the kills in parallel since they may take a long time. - // There may be len(runningPod.Containers) or len(runningPod.Containers)-1 of result in the channel - containerResults := make(chan *kubecontainer.SyncResult, len(runningPod.Containers)) + // There may be len(runtimePod.Containers) or len(runtimePod.Containers)-1 of result in the channel + containerResults := make(chan *kubecontainer.SyncResult, len(runtimePod.Containers)) wg := sync.WaitGroup{} var ( - networkContainer *kubecontainer.Container - networkSpec *v1.Container + networkContainers []*kubecontainer.Container + networkSpecs []*v1.Container ) - wg.Add(len(runningPod.Containers)) - for _, container := range runningPod.Containers { + wg.Add(len(runtimePod.Containers)) + for _, container := range runtimePod.Containers { go func(container *kubecontainer.Container) { defer utilruntime.HandleCrash() defer wg.Done() @@ -1479,21 +1480,20 @@ func (dm *DockerManager) killPodWithSyncResult(pod *v1.Pod, runningPod kubeconta // TODO: Handle this without signaling the pod infra container to // adapt to the generic container runtime. - if container.Name == PodInfraContainerName { + if containerIsNetworked(container.Name) { // Store the container runtime for later deletion. // We do this so that PreStop handlers can run in the network namespace. - networkContainer = container - networkSpec = containerSpec - return + networkContainers = append(networkContainers, container) + networkSpecs = append(networkSpecs, containerSpec) + } else { + killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, container.Name) + err := dm.KillContainerInPod(container.ID, containerSpec, pod, "Need to kill pod.", gracePeriodOverride) + if err != nil { + killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error()) + glog.Errorf("Failed to delete container %v: %v; Skipping pod %q", container.ID.ID, err, runtimePod.ID) + } + containerResults <- killContainerResult } - - killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, container.Name) - err := dm.KillContainerInPod(container.ID, containerSpec, pod, "Need to kill pod.", gracePeriodOverride) - if err != nil { - killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error()) - glog.Errorf("Failed to delete container %v: %v; Skipping pod %q", container.ID.ID, err, runningPod.ID) - } - containerResults <- killContainerResult }(container) } wg.Wait() @@ -1501,27 +1501,37 @@ func (dm *DockerManager) killPodWithSyncResult(pod *v1.Pod, runningPod kubeconta for containerResult := range containerResults { result.AddSyncResult(containerResult) } - if networkContainer != nil { + + // Tear down any dead or running network/infra containers, but only kill + // those that are still running. 
+ for i := range networkContainers { + networkContainer := networkContainers[i] + networkSpec := networkSpecs[i] + + teardownNetworkResult := kubecontainer.NewSyncResult(kubecontainer.TeardownNetwork, kubecontainer.BuildPodFullName(runtimePod.Name, runtimePod.Namespace)) + result.AddSyncResult(teardownNetworkResult) + ins, err := dm.client.InspectContainer(networkContainer.ID.ID) if err != nil { err = fmt.Errorf("Error inspecting container %v: %v", networkContainer.ID.ID, err) glog.Error(err) - result.Fail(err) - return + teardownNetworkResult.Fail(kubecontainer.ErrTeardownNetwork, err.Error()) + continue } + if getDockerNetworkMode(ins) != namespaceModeHost { - teardownNetworkResult := kubecontainer.NewSyncResult(kubecontainer.TeardownNetwork, kubecontainer.BuildPodFullName(runningPod.Name, runningPod.Namespace)) - result.AddSyncResult(teardownNetworkResult) - if err := dm.network.TearDownPod(runningPod.Namespace, runningPod.Name, networkContainer.ID); err != nil { + if err := dm.network.TearDownPod(runtimePod.Namespace, runtimePod.Name, networkContainer.ID); err != nil { teardownNetworkResult.Fail(kubecontainer.ErrTeardownNetwork, err.Error()) glog.Error(err) } } - killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, networkContainer.Name) - result.AddSyncResult(killContainerResult) - if err := dm.KillContainerInPod(networkContainer.ID, networkSpec, pod, "Need to kill pod.", gracePeriodOverride); err != nil { - killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error()) - glog.Errorf("Failed to delete container %v: %v; Skipping pod %q", networkContainer.ID.ID, err, runningPod.ID) + if networkContainer.State == kubecontainer.ContainerStateRunning { + killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, networkContainer.Name) + result.AddSyncResult(killContainerResult) + if err := dm.KillContainerInPod(networkContainer.ID, networkSpec, pod, "Need to kill pod.", gracePeriodOverride); err != nil { + killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error()) + glog.Errorf("Failed to delete container %v: %v; Skipping pod %q", networkContainer.ID.ID, err, runtimePod.ID) + } } } return @@ -2150,9 +2160,29 @@ func (dm *DockerManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStatus *kubecon glog.V(4).Infof("Killing Infra Container for %q, will start new one", format.Pod(pod)) } + // Get list of running container(s) to kill + podToKill := kubecontainer.ConvertPodStatusToRunningPod(dm.Type(), podStatus) + + // If there are dead network containers, also kill them to ensure + // their network resources get released and are available to be + // re-used by new net containers + for _, containerStatus := range podStatus.ContainerStatuses { + if containerIsNetworked(containerStatus.Name) && containerStatus.State == kubecontainer.ContainerStateExited { + container := &kubecontainer.Container{ + ID: containerStatus.ID, + Name: containerStatus.Name, + Image: containerStatus.Image, + ImageID: containerStatus.ImageID, + Hash: containerStatus.Hash, + State: containerStatus.State, + } + podToKill.Containers = append(podToKill.Containers, container) + } + } + // Killing phase: if we want to start new infra container, or nothing is running kill everything (including infra container) // TODO(random-liu): We'll use pod status directly in the future - killResult := dm.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(dm.Type(), podStatus), nil) + killResult := dm.killPodWithSyncResult(pod, podToKill, nil) 
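// Aside, not part of the original patch: podToKill (assembled just above) now also lists exited
// infra containers, so killResult can include TeardownNetwork results for containers that were
// already dead; killPodWithSyncResult() releases their network resources but only sends a kill
// to containers that are still running.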
result.AddPodSyncResult(killResult) if killResult.Error() != nil { return diff --git a/pkg/kubelet/dockertools/docker_manager_test.go b/pkg/kubelet/dockertools/docker_manager_test.go index 41b3386532d..da0ca6bf15c 100644 --- a/pkg/kubelet/dockertools/docker_manager_test.go +++ b/pkg/kubelet/dockertools/docker_manager_test.go @@ -1828,6 +1828,55 @@ func TestGetPodStatusNoSuchContainer(t *testing.T) { // Verify that we will try to start new contrainers even if the inspections // failed. verifyCalls(t, fakeDocker, []string{ + // Inspect dead infra container for possible network teardown + "inspect_container", + // Start a new infra container. + "create", "start", "inspect_container", "inspect_container", + // Start a new container. + "create", "start", "inspect_container", + }) +} + +func TestSyncPodDeadInfraContainerTeardown(t *testing.T) { + const ( + noSuchContainerID = "nosuchcontainer" + infraContainerID = "9876" + ) + dm, fakeDocker := newTestDockerManager() + dm.podInfraContainerImage = "pod_infra_image" + ctrl := gomock.NewController(t) + defer ctrl.Finish() + fnp := nettest.NewMockNetworkPlugin(ctrl) + dm.network = network.NewPluginManager(fnp) + + pod := makePod("foo", &v1.PodSpec{ + Containers: []v1.Container{{Name: noSuchContainerID}}, + }) + + fakeDocker.SetFakeContainers([]*FakeContainer{ + { + ID: infraContainerID, + Name: "/k8s_POD." + strconv.FormatUint(generatePodInfraContainerHash(pod), 16) + "_foo_new_12345678_42", + ExitCode: 0, + StartedAt: time.Now(), + FinishedAt: time.Now(), + Running: false, + }, + }) + + // Can be called multiple times due to GetPodStatus + fnp.EXPECT().Name().Return("someNetworkPlugin").AnyTimes() + fnp.EXPECT().TearDownPod("new", "foo", gomock.Any()).Return(nil) + fnp.EXPECT().GetPodNetworkStatus("new", "foo", gomock.Any()).Return(&network.PodNetworkStatus{IP: net.ParseIP("1.1.1.1")}, nil).AnyTimes() + fnp.EXPECT().SetUpPod("new", "foo", gomock.Any()).Return(nil) + + runSyncPod(t, dm, fakeDocker, pod, nil, false) + + // Verify that the dead infra container is inspected for network teardown + // before the new infra and app containers are started. + verifyCalls(t, fakeDocker, []string{ + // Inspect dead infra container for possible network teardown + "inspect_container", // Start a new infra container. "create", "start", "inspect_container", "inspect_container", // Start a new container.