dockertools: use network PluginManager to synchronize pod network operations

We need to tear down networking when garbage collecting containers too,
and GC is run from a different goroutine in kubelet.  We don't want
container network operations running for the same pod concurrently.
This commit is contained in:
Dan Williams
2016-12-06 15:58:44 -06:00
parent 4c3cc67385
commit dc2fd511ab
3 changed files with 24 additions and 28 deletions

View File

@@ -59,7 +59,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/images" "k8s.io/kubernetes/pkg/kubelet/images"
"k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/metrics" "k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/network" knetwork "k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/network/hairpin" "k8s.io/kubernetes/pkg/kubelet/network/hairpin"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
"k8s.io/kubernetes/pkg/kubelet/qos" "k8s.io/kubernetes/pkg/kubelet/qos"
@@ -152,8 +152,8 @@ type DockerManager struct {
// Directory of container logs. // Directory of container logs.
containerLogsDir string containerLogsDir string
// Network plugin. // Network plugin manager.
networkPlugin network.NetworkPlugin network *knetwork.PluginManager
// Health check results. // Health check results.
livenessManager proberesults.Manager livenessManager proberesults.Manager
@@ -226,7 +226,7 @@ func NewDockerManager(
burst int, burst int,
containerLogsDir string, containerLogsDir string,
osInterface kubecontainer.OSInterface, osInterface kubecontainer.OSInterface,
networkPlugin network.NetworkPlugin, networkPlugin knetwork.NetworkPlugin,
runtimeHelper kubecontainer.RuntimeHelper, runtimeHelper kubecontainer.RuntimeHelper,
httpClient types.HttpGetter, httpClient types.HttpGetter,
execHandler ExecHandler, execHandler ExecHandler,
@@ -267,7 +267,7 @@ func NewDockerManager(
dockerPuller: newDockerPuller(client), dockerPuller: newDockerPuller(client),
cgroupDriver: cgroupDriver, cgroupDriver: cgroupDriver,
containerLogsDir: containerLogsDir, containerLogsDir: containerLogsDir,
networkPlugin: networkPlugin, network: knetwork.NewPluginManager(networkPlugin),
livenessManager: livenessManager, livenessManager: livenessManager,
runtimeHelper: runtimeHelper, runtimeHelper: runtimeHelper,
execHandler: execHandler, execHandler: execHandler,
@@ -357,10 +357,10 @@ func (dm *DockerManager) determineContainerIP(podNamespace, podName string, cont
isHostNetwork := networkMode == namespaceModeHost isHostNetwork := networkMode == namespaceModeHost
// For host networking or default network plugin, GetPodNetworkStatus doesn't work // For host networking or default network plugin, GetPodNetworkStatus doesn't work
if !isHostNetwork && dm.networkPlugin.Name() != network.DefaultPluginName { if !isHostNetwork && dm.network.PluginName() != knetwork.DefaultPluginName {
netStatus, err := dm.networkPlugin.GetPodNetworkStatus(podNamespace, podName, kubecontainer.DockerID(container.ID).ContainerID()) netStatus, err := dm.network.GetPodNetworkStatus(podNamespace, podName, kubecontainer.DockerID(container.ID).ContainerID())
if err != nil { if err != nil {
glog.Errorf("NetworkPlugin %s failed on the status hook for pod '%s' - %v", dm.networkPlugin.Name(), podName, err) glog.Error(err)
return result, err return result, err
} else if netStatus != nil { } else if netStatus != nil {
result = netStatus.IP.String() result = netStatus.IP.String()
@@ -1058,7 +1058,7 @@ func (dm *DockerManager) podInfraContainerChanged(pod *v1.Pod, podInfraContainer
glog.V(4).Infof("host: %v, %v", pod.Spec.HostNetwork, networkMode) glog.V(4).Infof("host: %v, %v", pod.Spec.HostNetwork, networkMode)
return true, nil return true, nil
} }
} else if dm.networkPlugin.Name() != "cni" && dm.networkPlugin.Name() != "kubenet" { } else if !dm.pluginDisablesDockerNetworking() {
// Docker only exports ports from the pod infra container. Let's // Docker only exports ports from the pod infra container. Let's
// collect all of the relevant ports and export them. // collect all of the relevant ports and export them.
for _, container := range pod.Spec.InitContainers { for _, container := range pod.Spec.InitContainers {
@@ -1091,6 +1091,10 @@ func getDockerNetworkMode(container *dockertypes.ContainerJSON) string {
return "" return ""
} }
func (dm *DockerManager) pluginDisablesDockerNetworking() bool {
return dm.network.PluginName() == "cni" || dm.network.PluginName() == "kubenet"
}
// newDockerVersion returns a semantically versioned docker version value // newDockerVersion returns a semantically versioned docker version value
func newDockerVersion(version string) (*utilversion.Version, error) { func newDockerVersion(version string) (*utilversion.Version, error) {
return utilversion.ParseSemantic(version) return utilversion.ParseSemantic(version)
@@ -1508,11 +1512,9 @@ func (dm *DockerManager) killPodWithSyncResult(pod *v1.Pod, runningPod kubeconta
if getDockerNetworkMode(ins) != namespaceModeHost { if getDockerNetworkMode(ins) != namespaceModeHost {
teardownNetworkResult := kubecontainer.NewSyncResult(kubecontainer.TeardownNetwork, kubecontainer.BuildPodFullName(runningPod.Name, runningPod.Namespace)) teardownNetworkResult := kubecontainer.NewSyncResult(kubecontainer.TeardownNetwork, kubecontainer.BuildPodFullName(runningPod.Name, runningPod.Namespace))
result.AddSyncResult(teardownNetworkResult) result.AddSyncResult(teardownNetworkResult)
glog.V(3).Infof("Calling network plugin %s to tear down pod for %s", dm.networkPlugin.Name(), kubecontainer.BuildPodFullName(runningPod.Name, runningPod.Namespace)) if err := dm.network.TearDownPod(runningPod.Namespace, runningPod.Name, networkContainer.ID); err != nil {
if err := dm.networkPlugin.TearDownPod(runningPod.Namespace, runningPod.Name, networkContainer.ID); err != nil { teardownNetworkResult.Fail(kubecontainer.ErrTeardownNetwork, err.Error())
message := fmt.Sprintf("Failed to teardown network for pod %q using network plugins %q: %v", runningPod.ID, dm.networkPlugin.Name(), err) glog.Error(err)
teardownNetworkResult.Fail(kubecontainer.ErrTeardownNetwork, message)
glog.Error(message)
} }
} }
killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, networkContainer.Name) killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, networkContainer.Name)
@@ -1915,7 +1917,7 @@ func (dm *DockerManager) createPodInfraContainer(pod *v1.Pod) (kubecontainer.Doc
if kubecontainer.IsHostNetworkPod(pod) { if kubecontainer.IsHostNetworkPod(pod) {
netNamespace = namespaceModeHost netNamespace = namespaceModeHost
} else if dm.networkPlugin.Name() == "cni" || dm.networkPlugin.Name() == "kubenet" { } else if dm.pluginDisablesDockerNetworking() {
netNamespace = "none" netNamespace = "none"
} else { } else {
// Docker only exports ports from the pod infra container. Let's // Docker only exports ports from the pod infra container. Let's
@@ -2217,20 +2219,14 @@ func (dm *DockerManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStatus *kubecon
setupNetworkResult := kubecontainer.NewSyncResult(kubecontainer.SetupNetwork, kubecontainer.GetPodFullName(pod)) setupNetworkResult := kubecontainer.NewSyncResult(kubecontainer.SetupNetwork, kubecontainer.GetPodFullName(pod))
result.AddSyncResult(setupNetworkResult) result.AddSyncResult(setupNetworkResult)
if !kubecontainer.IsHostNetworkPod(pod) { if !kubecontainer.IsHostNetworkPod(pod) {
glog.V(3).Infof("Calling network plugin %s to setup pod for %s", dm.networkPlugin.Name(), format.Pod(pod)) if err := dm.network.SetUpPod(pod.Namespace, pod.Name, podInfraContainerID.ContainerID()); err != nil {
err = dm.networkPlugin.SetUpPod(pod.Namespace, pod.Name, podInfraContainerID.ContainerID()) setupNetworkResult.Fail(kubecontainer.ErrSetupNetwork, err.Error())
if err != nil { glog.Error(err)
// TODO: (random-liu) There shouldn't be "Skipping pod" in sync result message
message := fmt.Sprintf("Failed to setup network for pod %q using network plugins %q: %v; Skipping pod", format.Pod(pod), dm.networkPlugin.Name(), err)
setupNetworkResult.Fail(kubecontainer.ErrSetupNetwork, message)
glog.Error(message)
// Delete infra container // Delete infra container
killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, PodInfraContainerName) killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, PodInfraContainerName)
result.AddSyncResult(killContainerResult) result.AddSyncResult(killContainerResult)
if delErr := dm.KillContainerInPod(kubecontainer.ContainerID{ if delErr := dm.KillContainerInPod(podInfraContainerID.ContainerID(), nil, pod, err.Error(), nil); delErr != nil {
ID: string(podInfraContainerID),
Type: "docker"}, nil, pod, message, nil); delErr != nil {
killContainerResult.Fail(kubecontainer.ErrKillContainer, delErr.Error()) killContainerResult.Fail(kubecontainer.ErrKillContainer, delErr.Error())
glog.Warningf("Clear infra container failed for pod %q: %v", format.Pod(pod), delErr) glog.Warningf("Clear infra container failed for pod %q: %v", format.Pod(pod), delErr)
} }
@@ -2246,7 +2242,7 @@ func (dm *DockerManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStatus *kubecon
} }
if dm.configureHairpinMode { if dm.configureHairpinMode {
if err = hairpin.SetUpContainerPid(podInfraContainer.State.Pid, network.DefaultInterfaceName); err != nil { if err = hairpin.SetUpContainerPid(podInfraContainer.State.Pid, knetwork.DefaultInterfaceName); err != nil {
glog.Warningf("Hairpin setup failed for pod %q: %v", format.Pod(pod), err) glog.Warningf("Hairpin setup failed for pod %q: %v", format.Pod(pod), err)
} }
} }

View File

@@ -457,7 +457,7 @@ func TestGetPodStatusFromNetworkPlugin(t *testing.T) {
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
defer ctrl.Finish() defer ctrl.Finish()
fnp := nettest.NewMockNetworkPlugin(ctrl) fnp := nettest.NewMockNetworkPlugin(ctrl)
dm.networkPlugin = fnp dm.network = network.NewPluginManager(fnp)
fakeDocker.SetFakeRunningContainers([]*FakeContainer{ fakeDocker.SetFakeRunningContainers([]*FakeContainer{
{ {

View File

@@ -1878,7 +1878,7 @@ func TestSyncPodGetsPodIPFromNetworkPlugin(t *testing.T) {
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
defer ctrl.Finish() defer ctrl.Finish()
fnp := nettest.NewMockNetworkPlugin(ctrl) fnp := nettest.NewMockNetworkPlugin(ctrl)
dm.networkPlugin = fnp dm.network = network.NewPluginManager(fnp)
pod := makePod("foo", &v1.PodSpec{ pod := makePod("foo", &v1.PodSpec{
Containers: []v1.Container{ Containers: []v1.Container{