diff --git a/pkg/kubelet/network/BUILD b/pkg/kubelet/network/BUILD index 1e493f99dc4..36974c2601b 100644 --- a/pkg/kubelet/network/BUILD +++ b/pkg/kubelet/network/BUILD @@ -5,7 +5,6 @@ licenses(["notice"]) load( "@io_bazel_rules_go//go:def.bzl", "go_library", - "go_test", ) go_library( @@ -31,17 +30,6 @@ go_library( ], ) -go_test( - name = "go_default_test", - srcs = ["plugins_test.go"], - library = ":go_default_library", - tags = ["automanaged"], - deps = [ - "//pkg/apis/componentconfig:go_default_library", - "//pkg/kubelet/network/testing:go_default_library", - ], -) - filegroup( name = "package-srcs", srcs = glob(["**"]), @@ -57,7 +45,6 @@ filegroup( "//pkg/kubelet/network/hairpin:all-srcs", "//pkg/kubelet/network/hostport:all-srcs", "//pkg/kubelet/network/kubenet:all-srcs", - "//pkg/kubelet/network/mock_network:all-srcs", "//pkg/kubelet/network/testing:all-srcs", ], tags = ["automanaged"], diff --git a/pkg/kubelet/network/plugins.go b/pkg/kubelet/network/plugins.go index ed925562e1c..5b13ef86efb 100644 --- a/pkg/kubelet/network/plugins.go +++ b/pkg/kubelet/network/plugins.go @@ -20,6 +20,7 @@ import ( "fmt" "net" "strings" + "sync" "github.com/golang/glog" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -293,3 +294,123 @@ type NoopPortMappingGetter struct{} func (*NoopPortMappingGetter) GetPodPortMappings(containerID string) ([]*hostport.PortMapping, error) { return nil, nil } + +// The PluginManager wraps a kubelet network plugin and provides synchronization +// for a given pod's network operations. Each pod's setup/teardown/status operations +// are synchronized against each other, but network operations of other pods can +// proceed in parallel. +type PluginManager struct { + // Network plugin being wrapped + plugin NetworkPlugin + + // Pod list and lock + podsLock sync.Mutex + pods map[string]*podLock +} + +func NewPluginManager(plugin NetworkPlugin) *PluginManager { + return &PluginManager{ + plugin: plugin, + pods: make(map[string]*podLock), + } +} + +func (pm *PluginManager) PluginName() string { + return pm.plugin.Name() +} + +func (pm *PluginManager) Event(name string, details map[string]interface{}) { + pm.plugin.Event(name, details) +} + +func (pm *PluginManager) Status() error { + return pm.plugin.Status() +} + +type podLock struct { + // Count of in-flight operations for this pod; when this reaches zero + // the lock can be removed from the pod map + refcount uint + + // Lock to synchronize operations for this specific pod + mu sync.Mutex +} + +// Lock network operations for a specific pod. If that pod is not yet in +// the pod map, it will be added. The reference count for the pod will +// be increased. +func (pm *PluginManager) podLock(fullPodName string) *sync.Mutex { + pm.podsLock.Lock() + defer pm.podsLock.Unlock() + + lock, ok := pm.pods[fullPodName] + if !ok { + lock = &podLock{} + pm.pods[fullPodName] = lock + } + lock.refcount++ + return &lock.mu +} + +// Unlock network operations for a specific pod. The reference count for the +// pod will be decreased. If the reference count reaches zero, the pod will be +// removed from the pod map. +func (pm *PluginManager) podUnlock(fullPodName string) { + pm.podsLock.Lock() + defer pm.podsLock.Unlock() + + lock, ok := pm.pods[fullPodName] + if !ok { + glog.Warningf("Unbalanced pod lock unref for %s", fullPodName) + return + } else if lock.refcount == 0 { + // This should never ever happen, but handle it anyway + delete(pm.pods, fullPodName) + glog.Warningf("Pod lock for %s still in map with zero refcount", fullPodName) + return + } + lock.refcount-- + lock.mu.Unlock() + if lock.refcount == 0 { + delete(pm.pods, fullPodName) + } +} + +func (pm *PluginManager) GetPodNetworkStatus(podNamespace, podName string, id kubecontainer.ContainerID) (*PodNetworkStatus, error) { + fullPodName := kubecontainer.BuildPodFullName(podName, podNamespace) + pm.podLock(fullPodName).Lock() + defer pm.podUnlock(fullPodName) + + netStatus, err := pm.plugin.GetPodNetworkStatus(podNamespace, podName, id) + if err != nil { + return nil, fmt.Errorf("NetworkPlugin %s failed on the status hook for pod %q: %v", pm.plugin.Name(), fullPodName, err) + } + + return netStatus, nil +} + +func (pm *PluginManager) SetUpPod(podNamespace, podName string, id kubecontainer.ContainerID) error { + fullPodName := kubecontainer.BuildPodFullName(podName, podNamespace) + pm.podLock(fullPodName).Lock() + defer pm.podUnlock(fullPodName) + + glog.V(3).Infof("Calling network plugin %s to set up pod %q", pm.plugin.Name(), fullPodName) + if err := pm.plugin.SetUpPod(podNamespace, podName, id); err != nil { + return fmt.Errorf("NetworkPlugin %s failed to set up pod %q network: %v", pm.plugin.Name(), fullPodName, err) + } + + return nil +} + +func (pm *PluginManager) TearDownPod(podNamespace, podName string, id kubecontainer.ContainerID) error { + fullPodName := kubecontainer.BuildPodFullName(podName, podNamespace) + pm.podLock(fullPodName).Lock() + defer pm.podUnlock(fullPodName) + + glog.V(3).Infof("Calling network plugin %s to tear down pod %q", pm.plugin.Name(), fullPodName) + if err := pm.plugin.TearDownPod(podNamespace, podName, id); err != nil { + return fmt.Errorf("NetworkPlugin %s failed to teardown pod %q network: %v", pm.plugin.Name(), fullPodName, err) + } + + return nil +} diff --git a/pkg/kubelet/network/plugins_test.go b/pkg/kubelet/network/plugins_test.go deleted file mode 100644 index bbb6afe51fb..00000000000 --- a/pkg/kubelet/network/plugins_test.go +++ /dev/null @@ -1,38 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package network - -import ( - "testing" - - "k8s.io/kubernetes/pkg/apis/componentconfig" - nettest "k8s.io/kubernetes/pkg/kubelet/network/testing" -) - -func TestSelectDefaultPlugin(t *testing.T) { - all_plugins := []NetworkPlugin{} - plug, err := InitNetworkPlugin(all_plugins, "", nettest.NewFakeHost(nil), componentconfig.HairpinNone, "10.0.0.0/8", UseDefaultMTU) - if err != nil { - t.Fatalf("Unexpected error in selecting default plugin: %v", err) - } - if plug == nil { - t.Fatalf("Failed to select the default plugin.") - } - if plug.Name() != DefaultPluginName { - t.Errorf("Failed to select the default plugin. Expected %s. Got %s", DefaultPluginName, plug.Name()) - } -} diff --git a/pkg/kubelet/network/testing/BUILD b/pkg/kubelet/network/testing/BUILD index b4082c77ddf..5a61b6d0ace 100644 --- a/pkg/kubelet/network/testing/BUILD +++ b/pkg/kubelet/network/testing/BUILD @@ -5,6 +5,7 @@ licenses(["notice"]) load( "@io_bazel_rules_go//go:def.bzl", "go_library", + "go_test", ) go_library( @@ -20,6 +21,20 @@ go_library( ], ) +go_test( + name = "go_default_test", + srcs = ["plugins_test.go"], + library = ":go_default_library", + tags = ["automanaged"], + deps = [ + "//pkg/apis/componentconfig:go_default_library", + "//pkg/kubelet/container:go_default_library", + "//pkg/kubelet/network:go_default_library", + "//vendor:github.com/golang/mock/gomock", + "//vendor:k8s.io/apimachinery/pkg/util/sets", + ], +) + filegroup( name = "package-srcs", srcs = glob(["**"]), diff --git a/pkg/kubelet/network/testing/plugins_test.go b/pkg/kubelet/network/testing/plugins_test.go new file mode 100644 index 00000000000..96f46013ea4 --- /dev/null +++ b/pkg/kubelet/network/testing/plugins_test.go @@ -0,0 +1,218 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package testing + +import ( + "fmt" + "net" + "sync" + "testing" + + utilsets "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/kubernetes/pkg/apis/componentconfig" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/kubelet/network" + + "github.com/golang/mock/gomock" +) + +func TestSelectDefaultPlugin(t *testing.T) { + all_plugins := []network.NetworkPlugin{} + plug, err := network.InitNetworkPlugin(all_plugins, "", NewFakeHost(nil), componentconfig.HairpinNone, "10.0.0.0/8", network.UseDefaultMTU) + if err != nil { + t.Fatalf("Unexpected error in selecting default plugin: %v", err) + } + if plug == nil { + t.Fatalf("Failed to select the default plugin.") + } + if plug.Name() != network.DefaultPluginName { + t.Errorf("Failed to select the default plugin. Expected %s. Got %s", network.DefaultPluginName, plug.Name()) + } +} + +func TestPluginManager(t *testing.T) { + ctrl := gomock.NewController(t) + fnp := NewMockNetworkPlugin(ctrl) + defer fnp.Finish() + pm := network.NewPluginManager(fnp) + + fnp.EXPECT().Name().Return("someNetworkPlugin").AnyTimes() + + allCreatedWg := sync.WaitGroup{} + allCreatedWg.Add(1) + allDoneWg := sync.WaitGroup{} + + // 10 pods, 4 setup/status/teardown runs each. Ensure that network locking + // works and the pod map isn't concurrently accessed + for i := 0; i < 10; i++ { + podName := fmt.Sprintf("pod%d", i) + containerID := kubecontainer.ContainerID{ID: podName} + + fnp.EXPECT().SetUpPod("", podName, containerID).Return(nil).Times(4) + fnp.EXPECT().GetPodNetworkStatus("", podName, containerID).Return(&network.PodNetworkStatus{IP: net.ParseIP("1.2.3.4")}, nil).Times(4) + fnp.EXPECT().TearDownPod("", podName, containerID).Return(nil).Times(4) + + for x := 0; x < 4; x++ { + allDoneWg.Add(1) + go func(name string, id kubecontainer.ContainerID, num int) { + defer allDoneWg.Done() + + // Block all goroutines from running until all have + // been created and are ready. This ensures we + // have more pod network operations running + // concurrently. + allCreatedWg.Wait() + + if err := pm.SetUpPod("", name, id); err != nil { + t.Errorf("Failed to set up pod %q: %v", name, err) + return + } + + if _, err := pm.GetPodNetworkStatus("", name, id); err != nil { + t.Errorf("Failed to inspect pod %q: %v", name, err) + return + } + + if err := pm.TearDownPod("", name, id); err != nil { + t.Errorf("Failed to tear down pod %q: %v", name, err) + return + } + }(podName, containerID, x) + } + } + // Block all goroutines from running until all have been created and started + allCreatedWg.Done() + + // Wait for them all to finish + allDoneWg.Wait() +} + +type hookableFakeNetworkPluginSetupHook func(namespace, name string, id kubecontainer.ContainerID) + +type hookableFakeNetworkPlugin struct { + setupHook hookableFakeNetworkPluginSetupHook +} + +func newHookableFakeNetworkPlugin(setupHook hookableFakeNetworkPluginSetupHook) *hookableFakeNetworkPlugin { + return &hookableFakeNetworkPlugin{ + setupHook: setupHook, + } +} + +func (p *hookableFakeNetworkPlugin) Init(host network.Host, hairpinMode componentconfig.HairpinMode, nonMasqueradeCIDR string, mtu int) error { + return nil +} + +func (p *hookableFakeNetworkPlugin) Event(name string, details map[string]interface{}) { +} + +func (p *hookableFakeNetworkPlugin) Name() string { + return "fakeplugin" +} + +func (p *hookableFakeNetworkPlugin) Capabilities() utilsets.Int { + return utilsets.NewInt() +} + +func (p *hookableFakeNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID) error { + if p.setupHook != nil { + p.setupHook(namespace, name, id) + } + return nil +} + +func (p *hookableFakeNetworkPlugin) TearDownPod(string, string, kubecontainer.ContainerID) error { + return nil +} + +func (p *hookableFakeNetworkPlugin) GetPodNetworkStatus(string, string, kubecontainer.ContainerID) (*network.PodNetworkStatus, error) { + return &network.PodNetworkStatus{IP: net.ParseIP("10.1.2.3")}, nil +} + +func (p *hookableFakeNetworkPlugin) Status() error { + return nil +} + +// Ensure that one pod's network operations don't block another's. If the +// test is successful (eg, first pod doesn't block on second) the test +// will complete. If unsuccessful, it will hang and get killed. +func TestMultiPodParallelNetworkOps(t *testing.T) { + podWg := sync.WaitGroup{} + podWg.Add(1) + + // Can't do this with MockNetworkPlugin because the gomock controller + // has its own locks which don't allow the parallel network operation + // to proceed. + didWait := false + fakePlugin := newHookableFakeNetworkPlugin(func(podNamespace, podName string, id kubecontainer.ContainerID) { + if podName == "waiter" { + podWg.Wait() + didWait = true + } + }) + pm := network.NewPluginManager(fakePlugin) + + opsWg := sync.WaitGroup{} + + // Start the pod that will wait for the other to complete + opsWg.Add(1) + go func() { + defer opsWg.Done() + + podName := "waiter" + containerID := kubecontainer.ContainerID{ID: podName} + + // Setup will block on the runner pod completing. If network + // operations locking isn't correct (eg pod network operations + // block other pods) setUpPod() will never return. + if err := pm.SetUpPod("", podName, containerID); err != nil { + t.Errorf("Failed to set up waiter pod: %v", err) + return + } + + if err := pm.TearDownPod("", podName, containerID); err != nil { + t.Errorf("Failed to tear down waiter pod: %v", err) + return + } + }() + + opsWg.Add(1) + go func() { + defer opsWg.Done() + // Let other pod proceed + defer podWg.Done() + + podName := "runner" + containerID := kubecontainer.ContainerID{ID: podName} + + if err := pm.SetUpPod("", podName, containerID); err != nil { + t.Errorf("Failed to set up runner pod: %v", err) + return + } + + if err := pm.TearDownPod("", podName, containerID); err != nil { + t.Errorf("Failed to tear down runner pod: %v", err) + return + } + }() + + opsWg.Wait() + + if !didWait { + t.Errorf("waiter pod didn't wait for runner pod!") + } +}