mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Merge pull request #37036 from dcbw/docker-gc-teardown-pods
Automatic merge from submit-queue (batch tested with PRs 40505, 34664, 37036, 40726, 41595) dockertools: call TearDownPod when GC-ing infra pods The docker runtime doesn't tear down networking when GC-ing pods. rkt already does so make docker do it too. To ensure this happens, infra pods are now always GC-ed rather than gating them by containersToKeep. This prevents IPAM from leaking when the pod gets killed for some reason outside kubelet (like docker restart) or when pods are killed while kubelet isn't running. Fixes: https://github.com/kubernetes/kubernetes/issues/14940 Related: https://github.com/kubernetes/kubernetes/pull/35572
This commit is contained in:
commit
98d1cffe05
@ -89,7 +89,7 @@ go_test(
|
||||
"//pkg/kubelet/dockertools:go_default_library",
|
||||
"//pkg/kubelet/dockertools/securitycontext:go_default_library",
|
||||
"//pkg/kubelet/network:go_default_library",
|
||||
"//pkg/kubelet/network/mock_network:go_default_library",
|
||||
"//pkg/kubelet/network/testing:go_default_library",
|
||||
"//pkg/kubelet/types:go_default_library",
|
||||
"//pkg/kubelet/util/cache:go_default_library",
|
||||
"//pkg/security/apparmor:go_default_library",
|
||||
|
@ -103,7 +103,7 @@ func (ds *dockerService) RunPodSandbox(config *runtimeapi.PodSandboxConfig) (str
|
||||
// on the host as well, to satisfy parts of the pod spec that aren't
|
||||
// recognized by the CNI standard yet.
|
||||
cID := kubecontainer.BuildContainerID(runtimeName, createResp.ID)
|
||||
err = ds.networkPlugin.SetUpPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID)
|
||||
err = ds.network.SetUpPod(config.GetMetadata().Namespace, config.GetMetadata().Name, cID)
|
||||
// TODO: Do we need to teardown on failure or can we rely on a StopPodSandbox call with the given ID?
|
||||
return createResp.ID, err
|
||||
}
|
||||
@ -162,8 +162,8 @@ func (ds *dockerService) StopPodSandbox(podSandboxID string) error {
|
||||
errList := []error{}
|
||||
if needNetworkTearDown {
|
||||
cID := kubecontainer.BuildContainerID(runtimeName, podSandboxID)
|
||||
if err := ds.networkPlugin.TearDownPod(namespace, name, cID); err != nil {
|
||||
errList = append(errList, fmt.Errorf("failed to teardown sandbox %q for pod %s/%s: %v", podSandboxID, namespace, name, err))
|
||||
if err := ds.network.TearDownPod(namespace, name, cID); err != nil {
|
||||
errList = append(errList, err)
|
||||
}
|
||||
}
|
||||
if err := ds.client.StopContainer(podSandboxID, defaultSandboxGracePeriod); err != nil {
|
||||
@ -199,12 +199,12 @@ func (ds *dockerService) getIPFromPlugin(sandbox *dockertypes.ContainerJSON) (st
|
||||
}
|
||||
msg := fmt.Sprintf("Couldn't find network status for %s/%s through plugin", metadata.Namespace, metadata.Name)
|
||||
cID := kubecontainer.BuildContainerID(runtimeName, sandbox.ID)
|
||||
networkStatus, err := ds.networkPlugin.GetPodNetworkStatus(metadata.Namespace, metadata.Name, cID)
|
||||
networkStatus, err := ds.network.GetPodNetworkStatus(metadata.Namespace, metadata.Name, cID)
|
||||
if err != nil {
|
||||
// This might be a sandbox that somehow ended up without a default
|
||||
// interface (eth0). We can't distinguish this from a more serious
|
||||
// error, so callers should probably treat it as non-fatal.
|
||||
return "", fmt.Errorf("%v: %v", msg, err)
|
||||
return "", err
|
||||
}
|
||||
if networkStatus == nil {
|
||||
return "", fmt.Errorf("%v: invalid network status for", msg)
|
||||
@ -408,7 +408,7 @@ func (ds *dockerService) applySandboxLinuxOptions(hc *dockercontainer.HostConfig
|
||||
}
|
||||
hc.CgroupParent = cgroupParent
|
||||
// Apply security context.
|
||||
applySandboxSecurityContext(lc, createConfig.Config, hc, ds.networkPlugin, separator)
|
||||
applySandboxSecurityContext(lc, createConfig.Config, hc, ds.network, separator)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -26,6 +26,7 @@ import (
|
||||
|
||||
runtimeapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
"k8s.io/kubernetes/pkg/kubelet/network"
|
||||
"k8s.io/kubernetes/pkg/kubelet/types"
|
||||
)
|
||||
|
||||
@ -146,7 +147,7 @@ func TestSandboxStatus(t *testing.T) {
|
||||
func TestNetworkPluginInvocation(t *testing.T) {
|
||||
ds, _, _ := newTestDockerService()
|
||||
mockPlugin := newTestNetworkPlugin(t)
|
||||
ds.networkPlugin = mockPlugin
|
||||
ds.network = network.NewPluginManager(mockPlugin)
|
||||
defer mockPlugin.Finish()
|
||||
|
||||
name := "foo0"
|
||||
@ -158,6 +159,7 @@ func TestNetworkPluginInvocation(t *testing.T) {
|
||||
)
|
||||
cID := kubecontainer.ContainerID{Type: runtimeName, ID: fmt.Sprintf("/%v", makeSandboxName(c))}
|
||||
|
||||
mockPlugin.EXPECT().Name().Return("mockNetworkPlugin").AnyTimes()
|
||||
setup := mockPlugin.EXPECT().SetUpPod(ns, name, cID)
|
||||
// StopPodSandbox performs a lookup on status to figure out if the sandbox
|
||||
// is running with hostnetworking, as all its given is the ID.
|
||||
@ -175,7 +177,7 @@ func TestNetworkPluginInvocation(t *testing.T) {
|
||||
func TestHostNetworkPluginInvocation(t *testing.T) {
|
||||
ds, _, _ := newTestDockerService()
|
||||
mockPlugin := newTestNetworkPlugin(t)
|
||||
ds.networkPlugin = mockPlugin
|
||||
ds.network = network.NewPluginManager(mockPlugin)
|
||||
defer mockPlugin.Finish()
|
||||
|
||||
name := "foo0"
|
||||
|
@ -178,7 +178,7 @@ func NewDockerService(client dockertools.DockerInterface, seccompProfileRoot str
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("didn't find compatible CNI plugin with given settings %+v: %v", pluginSettings, err)
|
||||
}
|
||||
ds.networkPlugin = plug
|
||||
ds.network = network.NewPluginManager(plug)
|
||||
glog.Infof("Docker cri networking managed by %v", plug.Name())
|
||||
|
||||
// NOTE: cgroup driver is only detectable in docker 1.11+
|
||||
@ -224,7 +224,7 @@ type dockerService struct {
|
||||
podSandboxImage string
|
||||
streamingRuntime *streamingRuntime
|
||||
streamingServer streaming.Server
|
||||
networkPlugin network.NetworkPlugin
|
||||
network *network.PluginManager
|
||||
containerManager cm.ContainerManager
|
||||
// cgroup driver used by Docker runtime.
|
||||
cgroupDriver string
|
||||
@ -270,10 +270,10 @@ func (ds *dockerService) UpdateRuntimeConfig(runtimeConfig *runtimeapi.RuntimeCo
|
||||
return
|
||||
}
|
||||
glog.Infof("docker cri received runtime config %+v", runtimeConfig)
|
||||
if ds.networkPlugin != nil && runtimeConfig.NetworkConfig.PodCidr != "" {
|
||||
if ds.network != nil && runtimeConfig.NetworkConfig.PodCidr != "" {
|
||||
event := make(map[string]interface{})
|
||||
event[network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE_DETAIL_CIDR] = runtimeConfig.NetworkConfig.PodCidr
|
||||
ds.networkPlugin.Event(network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE, event)
|
||||
ds.network.Event(network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE, event)
|
||||
}
|
||||
return
|
||||
}
|
||||
@ -339,7 +339,7 @@ func (ds *dockerService) Status() (*runtimeapi.RuntimeStatus, error) {
|
||||
runtimeReady.Reason = "DockerDaemonNotReady"
|
||||
runtimeReady.Message = fmt.Sprintf("docker: failed to get docker version: %v", err)
|
||||
}
|
||||
if err := ds.networkPlugin.Status(); err != nil {
|
||||
if err := ds.network.Status(); err != nil {
|
||||
networkReady.Status = false
|
||||
networkReady.Reason = "NetworkPluginNotReady"
|
||||
networkReady.Message = fmt.Sprintf("docker: network plugin is not ready: %v", err)
|
||||
|
@ -32,20 +32,21 @@ import (
|
||||
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
|
||||
"k8s.io/kubernetes/pkg/kubelet/dockertools"
|
||||
"k8s.io/kubernetes/pkg/kubelet/network"
|
||||
"k8s.io/kubernetes/pkg/kubelet/network/mock_network"
|
||||
nettest "k8s.io/kubernetes/pkg/kubelet/network/testing"
|
||||
"k8s.io/kubernetes/pkg/kubelet/util/cache"
|
||||
)
|
||||
|
||||
// newTestNetworkPlugin returns a mock plugin that implements network.NetworkPlugin
|
||||
func newTestNetworkPlugin(t *testing.T) *mock_network.MockNetworkPlugin {
|
||||
func newTestNetworkPlugin(t *testing.T) *nettest.MockNetworkPlugin {
|
||||
ctrl := gomock.NewController(t)
|
||||
return mock_network.NewMockNetworkPlugin(ctrl)
|
||||
return nettest.NewMockNetworkPlugin(ctrl)
|
||||
}
|
||||
|
||||
func newTestDockerService() (*dockerService, *dockertools.FakeDockerClient, *clock.FakeClock) {
|
||||
fakeClock := clock.NewFakeClock(time.Time{})
|
||||
c := dockertools.NewFakeDockerClient().WithClock(fakeClock).WithVersion("1.11.2", "1.23")
|
||||
return &dockerService{client: c, os: &containertest.FakeOS{}, networkPlugin: &network.NoopNetworkPlugin{},
|
||||
pm := network.NewPluginManager(&network.NoopNetworkPlugin{})
|
||||
return &dockerService{client: c, os: &containertest.FakeOS{}, network: pm,
|
||||
legacyCleanup: legacyCleanupFlag{done: 1}, checkpointHandler: NewTestPersistentCheckpointHandler()}, c, fakeClock
|
||||
}
|
||||
|
||||
@ -95,7 +96,7 @@ func TestStatus(t *testing.T) {
|
||||
|
||||
// Should not report ready status is network plugin returns error.
|
||||
mockPlugin := newTestNetworkPlugin(t)
|
||||
ds.networkPlugin = mockPlugin
|
||||
ds.network = network.NewPluginManager(mockPlugin)
|
||||
defer mockPlugin.Finish()
|
||||
mockPlugin.EXPECT().Status().Return(errors.New("network error"))
|
||||
status, err = ds.Status()
|
||||
|
@ -25,11 +25,11 @@ import (
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
runtimeapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
|
||||
"k8s.io/kubernetes/pkg/kubelet/dockertools/securitycontext"
|
||||
"k8s.io/kubernetes/pkg/kubelet/network"
|
||||
knetwork "k8s.io/kubernetes/pkg/kubelet/network"
|
||||
)
|
||||
|
||||
// applySandboxSecurityContext updates docker sandbox options according to security context.
|
||||
func applySandboxSecurityContext(lc *runtimeapi.LinuxPodSandboxConfig, config *dockercontainer.Config, hc *dockercontainer.HostConfig, networkPlugin network.NetworkPlugin, separator rune) {
|
||||
func applySandboxSecurityContext(lc *runtimeapi.LinuxPodSandboxConfig, config *dockercontainer.Config, hc *dockercontainer.HostConfig, network *knetwork.PluginManager, separator rune) {
|
||||
if lc == nil {
|
||||
return
|
||||
}
|
||||
@ -47,8 +47,7 @@ func applySandboxSecurityContext(lc *runtimeapi.LinuxPodSandboxConfig, config *d
|
||||
|
||||
modifyContainerConfig(sc, config)
|
||||
modifyHostConfig(sc, hc, separator)
|
||||
modifySandboxNamespaceOptions(sc.GetNamespaceOptions(), hc, networkPlugin)
|
||||
|
||||
modifySandboxNamespaceOptions(sc.GetNamespaceOptions(), hc, network)
|
||||
}
|
||||
|
||||
// applyContainerSecurityContext updates docker container options according to security context.
|
||||
@ -109,9 +108,9 @@ func modifyHostConfig(sc *runtimeapi.LinuxContainerSecurityContext, hostConfig *
|
||||
}
|
||||
|
||||
// modifySandboxNamespaceOptions apply namespace options for sandbox
|
||||
func modifySandboxNamespaceOptions(nsOpts *runtimeapi.NamespaceOption, hostConfig *dockercontainer.HostConfig, networkPlugin network.NetworkPlugin) {
|
||||
func modifySandboxNamespaceOptions(nsOpts *runtimeapi.NamespaceOption, hostConfig *dockercontainer.HostConfig, network *knetwork.PluginManager) {
|
||||
modifyCommonNamespaceOptions(nsOpts, hostConfig)
|
||||
modifyHostNetworkOptionForSandbox(nsOpts.HostNetwork, networkPlugin, hostConfig)
|
||||
modifyHostNetworkOptionForSandbox(nsOpts.HostNetwork, network, hostConfig)
|
||||
}
|
||||
|
||||
// modifyContainerNamespaceOptions apply namespace options for container
|
||||
@ -137,18 +136,18 @@ func modifyCommonNamespaceOptions(nsOpts *runtimeapi.NamespaceOption, hostConfig
|
||||
}
|
||||
|
||||
// modifyHostNetworkOptionForSandbox applies NetworkMode/UTSMode to sandbox's dockercontainer.HostConfig.
|
||||
func modifyHostNetworkOptionForSandbox(hostNetwork bool, networkPlugin network.NetworkPlugin, hc *dockercontainer.HostConfig) {
|
||||
func modifyHostNetworkOptionForSandbox(hostNetwork bool, network *knetwork.PluginManager, hc *dockercontainer.HostConfig) {
|
||||
if hostNetwork {
|
||||
hc.NetworkMode = namespaceModeHost
|
||||
return
|
||||
}
|
||||
|
||||
if networkPlugin == nil {
|
||||
if network == nil {
|
||||
hc.NetworkMode = "default"
|
||||
return
|
||||
}
|
||||
|
||||
switch networkPlugin.Name() {
|
||||
switch network.PluginName() {
|
||||
case "cni":
|
||||
fallthrough
|
||||
case "kubenet":
|
||||
|
@ -112,7 +112,6 @@ go_test(
|
||||
"//pkg/kubelet/events:go_default_library",
|
||||
"//pkg/kubelet/images:go_default_library",
|
||||
"//pkg/kubelet/network:go_default_library",
|
||||
"//pkg/kubelet/network/mock_network:go_default_library",
|
||||
"//pkg/kubelet/network/testing:go_default_library",
|
||||
"//pkg/kubelet/prober/results:go_default_library",
|
||||
"//pkg/kubelet/types:go_default_library",
|
||||
|
@ -28,18 +28,21 @@ import (
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
knetwork "k8s.io/kubernetes/pkg/kubelet/network"
|
||||
)
|
||||
|
||||
type containerGC struct {
|
||||
client DockerInterface
|
||||
podGetter podGetter
|
||||
network *knetwork.PluginManager
|
||||
containerLogsDir string
|
||||
}
|
||||
|
||||
func NewContainerGC(client DockerInterface, podGetter podGetter, containerLogsDir string) *containerGC {
|
||||
func NewContainerGC(client DockerInterface, podGetter podGetter, network *knetwork.PluginManager, containerLogsDir string) *containerGC {
|
||||
return &containerGC{
|
||||
client: client,
|
||||
podGetter: podGetter,
|
||||
network: network,
|
||||
containerLogsDir: containerLogsDir,
|
||||
}
|
||||
}
|
||||
@ -50,7 +53,7 @@ type containerGCInfo struct {
|
||||
id string
|
||||
|
||||
// Docker name of the container.
|
||||
name string
|
||||
dockerName string
|
||||
|
||||
// Creation time for the container.
|
||||
createTime time.Time
|
||||
@ -59,8 +62,14 @@ type containerGCInfo struct {
|
||||
// This comes from dockertools.ParseDockerName(...)
|
||||
podNameWithNamespace string
|
||||
|
||||
// Kubernetes pod UID
|
||||
podUID types.UID
|
||||
|
||||
// Container name in pod
|
||||
containerName string
|
||||
|
||||
// Container network mode
|
||||
isHostNetwork bool
|
||||
}
|
||||
|
||||
// Containers are considered for eviction as units of (UID, container name) pair.
|
||||
@ -111,22 +120,45 @@ func (cgc *containerGC) removeOldestN(containers []containerGCInfo, toRemove int
|
||||
// Remove from oldest to newest (last to first).
|
||||
numToKeep := len(containers) - toRemove
|
||||
for i := numToKeep; i < len(containers); i++ {
|
||||
cgc.removeContainer(containers[i].id, containers[i].podNameWithNamespace, containers[i].containerName)
|
||||
cgc.removeContainer(containers[i])
|
||||
}
|
||||
|
||||
// Assume we removed the containers so that we're not too aggressive.
|
||||
return containers[:numToKeep]
|
||||
}
|
||||
|
||||
// Returns a full GC info structure on success, or a partial one on failure
|
||||
func newContainerGCInfo(id string, inspectResult *dockertypes.ContainerJSON, created time.Time) (containerGCInfo, error) {
|
||||
containerName, _, err := ParseDockerName(inspectResult.Name)
|
||||
if err != nil {
|
||||
return containerGCInfo{
|
||||
id: id,
|
||||
dockerName: inspectResult.Name,
|
||||
}, fmt.Errorf("failed to parse docker name %q: %v", inspectResult.Name, err)
|
||||
}
|
||||
|
||||
networkMode := getDockerNetworkMode(inspectResult)
|
||||
return containerGCInfo{
|
||||
id: id,
|
||||
dockerName: inspectResult.Name,
|
||||
podNameWithNamespace: containerName.PodFullName,
|
||||
podUID: containerName.PodUID,
|
||||
containerName: containerName.ContainerName,
|
||||
createTime: created,
|
||||
isHostNetwork: networkMode == namespaceModeHost,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Get all containers that are evictable. Evictable containers are: not running
|
||||
// and created more than MinAge ago.
|
||||
func (cgc *containerGC) evictableContainers(minAge time.Duration) (containersByEvictUnit, []containerGCInfo, error) {
|
||||
func (cgc *containerGC) evictableContainers(minAge time.Duration) (containersByEvictUnit, []containerGCInfo, []containerGCInfo, error) {
|
||||
containers, err := GetKubeletDockerContainers(cgc.client, true)
|
||||
if err != nil {
|
||||
return containersByEvictUnit{}, []containerGCInfo{}, err
|
||||
return containersByEvictUnit{}, []containerGCInfo{}, []containerGCInfo{}, err
|
||||
}
|
||||
|
||||
unidentifiedContainers := make([]containerGCInfo, 0)
|
||||
netContainers := make([]containerGCInfo, 0)
|
||||
evictUnits := make(containersByEvictUnit)
|
||||
newestGCTime := time.Now().Add(-minAge)
|
||||
for _, container := range containers {
|
||||
@ -147,23 +179,19 @@ func (cgc *containerGC) evictableContainers(minAge time.Duration) (containersByE
|
||||
continue
|
||||
}
|
||||
|
||||
containerInfo := containerGCInfo{
|
||||
id: container.ID,
|
||||
name: container.Names[0],
|
||||
createTime: created,
|
||||
}
|
||||
|
||||
containerName, _, err := ParseDockerName(container.Names[0])
|
||||
|
||||
containerInfo, err := newContainerGCInfo(container.ID, data, created)
|
||||
if err != nil {
|
||||
unidentifiedContainers = append(unidentifiedContainers, containerInfo)
|
||||
} else {
|
||||
key := evictUnit{
|
||||
uid: containerName.PodUID,
|
||||
name: containerName.ContainerName,
|
||||
// Track net containers for special cleanup
|
||||
if containerIsNetworked(containerInfo.containerName) {
|
||||
netContainers = append(netContainers, containerInfo)
|
||||
}
|
||||
|
||||
key := evictUnit{
|
||||
uid: containerInfo.podUID,
|
||||
name: containerInfo.containerName,
|
||||
}
|
||||
containerInfo.podNameWithNamespace = containerName.PodFullName
|
||||
containerInfo.containerName = containerName.ContainerName
|
||||
evictUnits[key] = append(evictUnits[key], containerInfo)
|
||||
}
|
||||
}
|
||||
@ -173,26 +201,34 @@ func (cgc *containerGC) evictableContainers(minAge time.Duration) (containersByE
|
||||
sort.Sort(byCreated(evictUnits[uid]))
|
||||
}
|
||||
|
||||
return evictUnits, unidentifiedContainers, nil
|
||||
return evictUnits, netContainers, unidentifiedContainers, nil
|
||||
}
|
||||
|
||||
// GarbageCollect removes dead containers using the specified container gc policy
|
||||
func (cgc *containerGC) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy, allSourcesReady bool) error {
|
||||
// Separate containers by evict units.
|
||||
evictUnits, unidentifiedContainers, err := cgc.evictableContainers(gcPolicy.MinAge)
|
||||
evictUnits, netContainers, unidentifiedContainers, err := cgc.evictableContainers(gcPolicy.MinAge)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Remove unidentified containers.
|
||||
for _, container := range unidentifiedContainers {
|
||||
glog.Infof("Removing unidentified dead container %q with ID %q", container.name, container.id)
|
||||
glog.Infof("Removing unidentified dead container %q", container.dockerName)
|
||||
err = cgc.client.RemoveContainer(container.id, dockertypes.ContainerRemoveOptions{RemoveVolumes: true})
|
||||
if err != nil {
|
||||
glog.Warningf("Failed to remove unidentified dead container %q: %v", container.name, err)
|
||||
glog.Warningf("Failed to remove unidentified dead container %q: %v", container.dockerName, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Always clean up net containers to ensure network resources are released
|
||||
// TODO: this may tear down networking again if the container doesn't get
|
||||
// removed in this GC cycle, but that already happens elsewhere...
|
||||
for _, container := range netContainers {
|
||||
glog.Infof("Cleaning up dead net container %q", container.dockerName)
|
||||
cgc.netContainerCleanup(container)
|
||||
}
|
||||
|
||||
// Remove deleted pod containers if all sources are ready.
|
||||
if allSourcesReady {
|
||||
for key, unit := range evictUnits {
|
||||
@ -245,35 +281,56 @@ func (cgc *containerGC) GarbageCollect(gcPolicy kubecontainer.ContainerGCPolicy,
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cgc *containerGC) removeContainer(id string, podNameWithNamespace string, containerName string) {
|
||||
glog.V(4).Infof("Removing container %q name %q", id, containerName)
|
||||
err := cgc.client.RemoveContainer(id, dockertypes.ContainerRemoveOptions{RemoveVolumes: true})
|
||||
if err != nil {
|
||||
glog.Warningf("Failed to remove container %q: %v", id, err)
|
||||
func (cgc *containerGC) netContainerCleanup(containerInfo containerGCInfo) {
|
||||
if containerInfo.isHostNetwork {
|
||||
return
|
||||
}
|
||||
symlinkPath := LogSymlink(cgc.containerLogsDir, podNameWithNamespace, containerName, id)
|
||||
|
||||
podName, podNamespace, err := kubecontainer.ParsePodFullName(containerInfo.podNameWithNamespace)
|
||||
if err != nil {
|
||||
glog.Warningf("failed to parse container %q pod full name: %v", containerInfo.dockerName, err)
|
||||
return
|
||||
}
|
||||
|
||||
containerID := kubecontainer.DockerID(containerInfo.id).ContainerID()
|
||||
if err := cgc.network.TearDownPod(podNamespace, podName, containerID); err != nil {
|
||||
glog.Warningf("failed to tear down container %q network: %v", containerInfo.dockerName, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (cgc *containerGC) removeContainer(containerInfo containerGCInfo) {
|
||||
glog.V(4).Infof("Removing container %q", containerInfo.dockerName)
|
||||
err := cgc.client.RemoveContainer(containerInfo.id, dockertypes.ContainerRemoveOptions{RemoveVolumes: true})
|
||||
if err != nil {
|
||||
glog.Warningf("Failed to remove container %q: %v", containerInfo.dockerName, err)
|
||||
}
|
||||
symlinkPath := LogSymlink(cgc.containerLogsDir, containerInfo.podNameWithNamespace, containerInfo.containerName, containerInfo.id)
|
||||
err = os.Remove(symlinkPath)
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
glog.Warningf("Failed to remove container %q log symlink %q: %v", id, symlinkPath, err)
|
||||
glog.Warningf("Failed to remove container %q log symlink %q: %v", containerInfo.dockerName, symlinkPath, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (cgc *containerGC) deleteContainer(id string) error {
|
||||
containerInfo, err := cgc.client.InspectContainer(id)
|
||||
data, err := cgc.client.InspectContainer(id)
|
||||
if err != nil {
|
||||
glog.Warningf("Failed to inspect container %q: %v", id, err)
|
||||
return err
|
||||
}
|
||||
if containerInfo.State.Running {
|
||||
if data.State.Running {
|
||||
return fmt.Errorf("container %q is still running", id)
|
||||
}
|
||||
|
||||
containerName, _, err := ParseDockerName(containerInfo.Name)
|
||||
containerInfo, err := newContainerGCInfo(id, data, time.Now())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cgc.removeContainer(id, containerName.PodFullName, containerName.ContainerName)
|
||||
if containerIsNetworked(containerInfo.containerName) {
|
||||
cgc.netContainerCleanup(containerInfo)
|
||||
}
|
||||
|
||||
cgc.removeContainer(containerInfo)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -23,18 +23,23 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/golang/mock/gomock"
|
||||
"github.com/stretchr/testify/assert"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
knetwork "k8s.io/kubernetes/pkg/kubelet/network"
|
||||
nettest "k8s.io/kubernetes/pkg/kubelet/network/testing"
|
||||
)
|
||||
|
||||
func newTestContainerGC(t *testing.T) (*containerGC, *FakeDockerClient) {
|
||||
func newTestContainerGC(t *testing.T) (*containerGC, *FakeDockerClient, *nettest.MockNetworkPlugin) {
|
||||
fakeDocker := NewFakeDockerClient()
|
||||
fakePodGetter := newFakePodGetter()
|
||||
gc := NewContainerGC(fakeDocker, fakePodGetter, "")
|
||||
return gc, fakeDocker
|
||||
fakePlugin := nettest.NewMockNetworkPlugin(gomock.NewController(t))
|
||||
fakePlugin.EXPECT().Name().Return("someNetworkPlugin").AnyTimes()
|
||||
gc := NewContainerGC(fakeDocker, fakePodGetter, knetwork.NewPluginManager(fakePlugin), "")
|
||||
return gc, fakeDocker, fakePlugin
|
||||
}
|
||||
|
||||
// Makes a stable time object, lower id is earlier time.
|
||||
@ -91,7 +96,7 @@ func verifyStringArrayEqualsAnyOrder(t *testing.T, actual, expected []string) {
|
||||
}
|
||||
|
||||
func TestDeleteContainerSkipRunningContainer(t *testing.T) {
|
||||
gc, fakeDocker := newTestContainerGC(t)
|
||||
gc, fakeDocker, _ := newTestContainerGC(t)
|
||||
fakeDocker.SetFakeContainers([]*FakeContainer{
|
||||
makeContainer("1876", "foo", "POD", true, makeTime(0)),
|
||||
})
|
||||
@ -102,29 +107,65 @@ func TestDeleteContainerSkipRunningContainer(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestDeleteContainerRemoveDeadContainer(t *testing.T) {
|
||||
gc, fakeDocker := newTestContainerGC(t)
|
||||
gc, fakeDocker, fakePlugin := newTestContainerGC(t)
|
||||
defer fakePlugin.Finish()
|
||||
fakeDocker.SetFakeContainers([]*FakeContainer{
|
||||
makeContainer("1876", "foo", "POD", false, makeTime(0)),
|
||||
})
|
||||
addPods(gc.podGetter, "foo")
|
||||
|
||||
fakePlugin.EXPECT().TearDownPod(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
|
||||
|
||||
assert.Nil(t, gc.deleteContainer("1876"))
|
||||
assert.Len(t, fakeDocker.Removed, 1)
|
||||
}
|
||||
|
||||
func TestGarbageCollectNetworkTeardown(t *testing.T) {
|
||||
// Ensure infra container gets teardown called
|
||||
gc, fakeDocker, fakePlugin := newTestContainerGC(t)
|
||||
defer fakePlugin.Finish()
|
||||
id := kubecontainer.DockerID("1867").ContainerID()
|
||||
fakeDocker.SetFakeContainers([]*FakeContainer{
|
||||
makeContainer(id.ID, "foo", "POD", false, makeTime(0)),
|
||||
})
|
||||
addPods(gc.podGetter, "foo")
|
||||
|
||||
fakePlugin.EXPECT().TearDownPod(gomock.Any(), gomock.Any(), id).Return(nil)
|
||||
|
||||
assert.Nil(t, gc.deleteContainer(id.ID))
|
||||
assert.Len(t, fakeDocker.Removed, 1)
|
||||
|
||||
// Ensure non-infra container does not have teardown called
|
||||
gc, fakeDocker, fakePlugin = newTestContainerGC(t)
|
||||
id = kubecontainer.DockerID("1877").ContainerID()
|
||||
fakeDocker.SetFakeContainers([]*FakeContainer{
|
||||
makeContainer(id.ID, "foo", "adsfasdfasdf", false, makeTime(0)),
|
||||
})
|
||||
fakePlugin.EXPECT().SetUpPod(gomock.Any(), gomock.Any(), id).Return(nil)
|
||||
|
||||
addPods(gc.podGetter, "foo")
|
||||
|
||||
assert.Nil(t, gc.deleteContainer(id.ID))
|
||||
assert.Len(t, fakeDocker.Removed, 1)
|
||||
}
|
||||
|
||||
func TestGarbageCollectZeroMaxContainers(t *testing.T) {
|
||||
gc, fakeDocker := newTestContainerGC(t)
|
||||
gc, fakeDocker, fakePlugin := newTestContainerGC(t)
|
||||
defer fakePlugin.Finish()
|
||||
fakeDocker.SetFakeContainers([]*FakeContainer{
|
||||
makeContainer("1876", "foo", "POD", false, makeTime(0)),
|
||||
})
|
||||
addPods(gc.podGetter, "foo")
|
||||
|
||||
fakePlugin.EXPECT().TearDownPod(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
|
||||
|
||||
assert.Nil(t, gc.GarbageCollect(kubecontainer.ContainerGCPolicy{MinAge: time.Minute, MaxPerPodContainer: 1, MaxContainers: 0}, true))
|
||||
assert.Len(t, fakeDocker.Removed, 1)
|
||||
}
|
||||
|
||||
func TestGarbageCollectNoMaxPerPodContainerLimit(t *testing.T) {
|
||||
gc, fakeDocker := newTestContainerGC(t)
|
||||
gc, fakeDocker, fakePlugin := newTestContainerGC(t)
|
||||
defer fakePlugin.Finish()
|
||||
fakeDocker.SetFakeContainers([]*FakeContainer{
|
||||
makeContainer("1876", "foo", "POD", false, makeTime(0)),
|
||||
makeContainer("2876", "foo1", "POD", false, makeTime(1)),
|
||||
@ -134,12 +175,15 @@ func TestGarbageCollectNoMaxPerPodContainerLimit(t *testing.T) {
|
||||
})
|
||||
addPods(gc.podGetter, "foo", "foo1", "foo2", "foo3", "foo4")
|
||||
|
||||
fakePlugin.EXPECT().TearDownPod(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(5)
|
||||
|
||||
assert.Nil(t, gc.GarbageCollect(kubecontainer.ContainerGCPolicy{MinAge: time.Minute, MaxPerPodContainer: -1, MaxContainers: 4}, true))
|
||||
assert.Len(t, fakeDocker.Removed, 1)
|
||||
}
|
||||
|
||||
func TestGarbageCollectNoMaxLimit(t *testing.T) {
|
||||
gc, fakeDocker := newTestContainerGC(t)
|
||||
gc, fakeDocker, fakePlugin := newTestContainerGC(t)
|
||||
defer fakePlugin.Finish()
|
||||
fakeDocker.SetFakeContainers([]*FakeContainer{
|
||||
makeContainer("1876", "foo", "POD", false, makeTime(0)),
|
||||
makeContainer("2876", "foo1", "POD", false, makeTime(0)),
|
||||
@ -149,6 +193,8 @@ func TestGarbageCollectNoMaxLimit(t *testing.T) {
|
||||
})
|
||||
addPods(gc.podGetter, "foo", "foo1", "foo2", "foo3", "foo4")
|
||||
|
||||
fakePlugin.EXPECT().TearDownPod(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).Times(5)
|
||||
|
||||
assert.Nil(t, gc.GarbageCollect(kubecontainer.ContainerGCPolicy{MinAge: time.Minute, MaxPerPodContainer: -1, MaxContainers: -1}, true))
|
||||
assert.Len(t, fakeDocker.Removed, 0)
|
||||
}
|
||||
@ -261,10 +307,12 @@ func TestGarbageCollect(t *testing.T) {
|
||||
}
|
||||
for i, test := range tests {
|
||||
t.Logf("Running test case with index %d", i)
|
||||
gc, fakeDocker := newTestContainerGC(t)
|
||||
gc, fakeDocker, fakePlugin := newTestContainerGC(t)
|
||||
fakeDocker.SetFakeContainers(test.containers)
|
||||
addPods(gc.podGetter, "foo", "foo1", "foo2", "foo3", "foo4", "foo5", "foo6", "foo7")
|
||||
fakePlugin.EXPECT().TearDownPod(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
|
||||
assert.Nil(t, gc.GarbageCollect(kubecontainer.ContainerGCPolicy{MinAge: time.Hour, MaxPerPodContainer: 2, MaxContainers: 6}, true))
|
||||
verifyStringArrayEqualsAnyOrder(t, fakeDocker.Removed, test.expectedRemoved)
|
||||
fakePlugin.Finish()
|
||||
}
|
||||
}
|
||||
|
@ -59,7 +59,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/kubelet/images"
|
||||
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
|
||||
"k8s.io/kubernetes/pkg/kubelet/metrics"
|
||||
"k8s.io/kubernetes/pkg/kubelet/network"
|
||||
knetwork "k8s.io/kubernetes/pkg/kubelet/network"
|
||||
"k8s.io/kubernetes/pkg/kubelet/network/hairpin"
|
||||
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
|
||||
"k8s.io/kubernetes/pkg/kubelet/qos"
|
||||
@ -152,8 +152,8 @@ type DockerManager struct {
|
||||
// Directory of container logs.
|
||||
containerLogsDir string
|
||||
|
||||
// Network plugin.
|
||||
networkPlugin network.NetworkPlugin
|
||||
// Network plugin manager.
|
||||
network *knetwork.PluginManager
|
||||
|
||||
// Health check results.
|
||||
livenessManager proberesults.Manager
|
||||
@ -226,7 +226,7 @@ func NewDockerManager(
|
||||
burst int,
|
||||
containerLogsDir string,
|
||||
osInterface kubecontainer.OSInterface,
|
||||
networkPlugin network.NetworkPlugin,
|
||||
networkPlugin knetwork.NetworkPlugin,
|
||||
runtimeHelper kubecontainer.RuntimeHelper,
|
||||
httpClient types.HttpGetter,
|
||||
execHandler ExecHandler,
|
||||
@ -267,7 +267,7 @@ func NewDockerManager(
|
||||
dockerPuller: newDockerPuller(client),
|
||||
cgroupDriver: cgroupDriver,
|
||||
containerLogsDir: containerLogsDir,
|
||||
networkPlugin: networkPlugin,
|
||||
network: knetwork.NewPluginManager(networkPlugin),
|
||||
livenessManager: livenessManager,
|
||||
runtimeHelper: runtimeHelper,
|
||||
execHandler: execHandler,
|
||||
@ -282,7 +282,7 @@ func NewDockerManager(
|
||||
cmdRunner := kubecontainer.DirectStreamingRunner(dm)
|
||||
dm.runner = lifecycle.NewHandlerRunner(httpClient, cmdRunner, dm)
|
||||
dm.imagePuller = images.NewImageManager(kubecontainer.FilterEventRecorder(recorder), dm, imageBackOff, serializeImagePulls, qps, burst)
|
||||
dm.containerGC = NewContainerGC(client, podGetter, containerLogsDir)
|
||||
dm.containerGC = NewContainerGC(client, podGetter, dm.network, containerLogsDir)
|
||||
|
||||
dm.versionCache = cache.NewObjectCache(
|
||||
func() (interface{}, error) {
|
||||
@ -357,10 +357,10 @@ func (dm *DockerManager) determineContainerIP(podNamespace, podName string, cont
|
||||
isHostNetwork := networkMode == namespaceModeHost
|
||||
|
||||
// For host networking or default network plugin, GetPodNetworkStatus doesn't work
|
||||
if !isHostNetwork && dm.networkPlugin.Name() != network.DefaultPluginName {
|
||||
netStatus, err := dm.networkPlugin.GetPodNetworkStatus(podNamespace, podName, kubecontainer.DockerID(container.ID).ContainerID())
|
||||
if !isHostNetwork && dm.network.PluginName() != knetwork.DefaultPluginName {
|
||||
netStatus, err := dm.network.GetPodNetworkStatus(podNamespace, podName, kubecontainer.DockerID(container.ID).ContainerID())
|
||||
if err != nil {
|
||||
glog.Errorf("NetworkPlugin %s failed on the status hook for pod '%s' - %v", dm.networkPlugin.Name(), podName, err)
|
||||
glog.Error(err)
|
||||
return result, err
|
||||
} else if netStatus != nil {
|
||||
result = netStatus.IP.String()
|
||||
@ -436,7 +436,7 @@ func (dm *DockerManager) inspectContainer(id string, podName, podNamespace strin
|
||||
// Container that are running, restarting and paused
|
||||
status.State = kubecontainer.ContainerStateRunning
|
||||
status.StartedAt = startedAt
|
||||
if containerProvidesPodIP(dockerName) {
|
||||
if containerProvidesPodIP(dockerName.ContainerName) {
|
||||
ip, err = dm.determineContainerIP(podNamespace, podName, iResult)
|
||||
// Kubelet doesn't handle the network error scenario
|
||||
if err != nil {
|
||||
@ -1058,7 +1058,7 @@ func (dm *DockerManager) podInfraContainerChanged(pod *v1.Pod, podInfraContainer
|
||||
glog.V(4).Infof("host: %v, %v", pod.Spec.HostNetwork, networkMode)
|
||||
return true, nil
|
||||
}
|
||||
} else if dm.networkPlugin.Name() != "cni" && dm.networkPlugin.Name() != "kubenet" {
|
||||
} else if !dm.pluginDisablesDockerNetworking() {
|
||||
// Docker only exports ports from the pod infra container. Let's
|
||||
// collect all of the relevant ports and export them.
|
||||
for _, container := range pod.Spec.InitContainers {
|
||||
@ -1091,6 +1091,10 @@ func getDockerNetworkMode(container *dockertypes.ContainerJSON) string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func (dm *DockerManager) pluginDisablesDockerNetworking() bool {
|
||||
return dm.network.PluginName() == "cni" || dm.network.PluginName() == "kubenet"
|
||||
}
|
||||
|
||||
// newDockerVersion returns a semantically versioned docker version value
|
||||
func newDockerVersion(version string) (*utilversion.Version, error) {
|
||||
return utilversion.ParseSemantic(version)
|
||||
@ -1436,21 +1440,22 @@ func (dm *DockerManager) KillPod(pod *v1.Pod, runningPod kubecontainer.Pod, grac
|
||||
}
|
||||
|
||||
// NOTE(random-liu): The pod passed in could be *nil* when kubelet restarted.
|
||||
func (dm *DockerManager) killPodWithSyncResult(pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) (result kubecontainer.PodSyncResult) {
|
||||
// runtimePod may contain either running or exited containers
|
||||
func (dm *DockerManager) killPodWithSyncResult(pod *v1.Pod, runtimePod kubecontainer.Pod, gracePeriodOverride *int64) (result kubecontainer.PodSyncResult) {
|
||||
// Short circuit if there's nothing to kill.
|
||||
if len(runningPod.Containers) == 0 {
|
||||
if len(runtimePod.Containers) == 0 {
|
||||
return
|
||||
}
|
||||
// Send the kills in parallel since they may take a long time.
|
||||
// There may be len(runningPod.Containers) or len(runningPod.Containers)-1 of result in the channel
|
||||
containerResults := make(chan *kubecontainer.SyncResult, len(runningPod.Containers))
|
||||
// There may be len(runtimePod.Containers) or len(runtimePod.Containers)-1 of result in the channel
|
||||
containerResults := make(chan *kubecontainer.SyncResult, len(runtimePod.Containers))
|
||||
wg := sync.WaitGroup{}
|
||||
var (
|
||||
networkContainer *kubecontainer.Container
|
||||
networkSpec *v1.Container
|
||||
networkContainers []*kubecontainer.Container
|
||||
networkSpecs []*v1.Container
|
||||
)
|
||||
wg.Add(len(runningPod.Containers))
|
||||
for _, container := range runningPod.Containers {
|
||||
wg.Add(len(runtimePod.Containers))
|
||||
for _, container := range runtimePod.Containers {
|
||||
go func(container *kubecontainer.Container) {
|
||||
defer utilruntime.HandleCrash()
|
||||
defer wg.Done()
|
||||
@ -1475,21 +1480,20 @@ func (dm *DockerManager) killPodWithSyncResult(pod *v1.Pod, runningPod kubeconta
|
||||
|
||||
// TODO: Handle this without signaling the pod infra container to
|
||||
// adapt to the generic container runtime.
|
||||
if container.Name == PodInfraContainerName {
|
||||
if containerIsNetworked(container.Name) {
|
||||
// Store the container runtime for later deletion.
|
||||
// We do this so that PreStop handlers can run in the network namespace.
|
||||
networkContainer = container
|
||||
networkSpec = containerSpec
|
||||
return
|
||||
networkContainers = append(networkContainers, container)
|
||||
networkSpecs = append(networkSpecs, containerSpec)
|
||||
} else {
|
||||
killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, container.Name)
|
||||
err := dm.KillContainerInPod(container.ID, containerSpec, pod, "Need to kill pod.", gracePeriodOverride)
|
||||
if err != nil {
|
||||
killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
|
||||
glog.Errorf("Failed to delete container %v: %v; Skipping pod %q", container.ID.ID, err, runtimePod.ID)
|
||||
}
|
||||
containerResults <- killContainerResult
|
||||
}
|
||||
|
||||
killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, container.Name)
|
||||
err := dm.KillContainerInPod(container.ID, containerSpec, pod, "Need to kill pod.", gracePeriodOverride)
|
||||
if err != nil {
|
||||
killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
|
||||
glog.Errorf("Failed to delete container %v: %v; Skipping pod %q", container.ID.ID, err, runningPod.ID)
|
||||
}
|
||||
containerResults <- killContainerResult
|
||||
}(container)
|
||||
}
|
||||
wg.Wait()
|
||||
@ -1497,29 +1501,37 @@ func (dm *DockerManager) killPodWithSyncResult(pod *v1.Pod, runningPod kubeconta
|
||||
for containerResult := range containerResults {
|
||||
result.AddSyncResult(containerResult)
|
||||
}
|
||||
if networkContainer != nil {
|
||||
|
||||
// Tear down any dead or running network/infra containers, but only kill
|
||||
// those that are still running.
|
||||
for i := range networkContainers {
|
||||
networkContainer := networkContainers[i]
|
||||
networkSpec := networkSpecs[i]
|
||||
|
||||
teardownNetworkResult := kubecontainer.NewSyncResult(kubecontainer.TeardownNetwork, kubecontainer.BuildPodFullName(runtimePod.Name, runtimePod.Namespace))
|
||||
result.AddSyncResult(teardownNetworkResult)
|
||||
|
||||
ins, err := dm.client.InspectContainer(networkContainer.ID.ID)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("Error inspecting container %v: %v", networkContainer.ID.ID, err)
|
||||
glog.Error(err)
|
||||
result.Fail(err)
|
||||
return
|
||||
teardownNetworkResult.Fail(kubecontainer.ErrTeardownNetwork, err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
if getDockerNetworkMode(ins) != namespaceModeHost {
|
||||
teardownNetworkResult := kubecontainer.NewSyncResult(kubecontainer.TeardownNetwork, kubecontainer.BuildPodFullName(runningPod.Name, runningPod.Namespace))
|
||||
result.AddSyncResult(teardownNetworkResult)
|
||||
glog.V(3).Infof("Calling network plugin %s to tear down pod for %s", dm.networkPlugin.Name(), kubecontainer.BuildPodFullName(runningPod.Name, runningPod.Namespace))
|
||||
if err := dm.networkPlugin.TearDownPod(runningPod.Namespace, runningPod.Name, networkContainer.ID); err != nil {
|
||||
message := fmt.Sprintf("Failed to teardown network for pod %q using network plugins %q: %v", runningPod.ID, dm.networkPlugin.Name(), err)
|
||||
teardownNetworkResult.Fail(kubecontainer.ErrTeardownNetwork, message)
|
||||
glog.Error(message)
|
||||
if err := dm.network.TearDownPod(runtimePod.Namespace, runtimePod.Name, networkContainer.ID); err != nil {
|
||||
teardownNetworkResult.Fail(kubecontainer.ErrTeardownNetwork, err.Error())
|
||||
glog.Error(err)
|
||||
}
|
||||
}
|
||||
killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, networkContainer.Name)
|
||||
result.AddSyncResult(killContainerResult)
|
||||
if err := dm.KillContainerInPod(networkContainer.ID, networkSpec, pod, "Need to kill pod.", gracePeriodOverride); err != nil {
|
||||
killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
|
||||
glog.Errorf("Failed to delete container %v: %v; Skipping pod %q", networkContainer.ID.ID, err, runningPod.ID)
|
||||
if networkContainer.State == kubecontainer.ContainerStateRunning {
|
||||
killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, networkContainer.Name)
|
||||
result.AddSyncResult(killContainerResult)
|
||||
if err := dm.KillContainerInPod(networkContainer.ID, networkSpec, pod, "Need to kill pod.", gracePeriodOverride); err != nil {
|
||||
killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
|
||||
glog.Errorf("Failed to delete container %v: %v; Skipping pod %q", networkContainer.ID.ID, err, runtimePod.ID)
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
@ -1915,7 +1927,7 @@ func (dm *DockerManager) createPodInfraContainer(pod *v1.Pod) (kubecontainer.Doc
|
||||
|
||||
if kubecontainer.IsHostNetworkPod(pod) {
|
||||
netNamespace = namespaceModeHost
|
||||
} else if dm.networkPlugin.Name() == "cni" || dm.networkPlugin.Name() == "kubenet" {
|
||||
} else if dm.pluginDisablesDockerNetworking() {
|
||||
netNamespace = "none"
|
||||
} else {
|
||||
// Docker only exports ports from the pod infra container. Let's
|
||||
@ -2148,9 +2160,29 @@ func (dm *DockerManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStatus *kubecon
|
||||
glog.V(4).Infof("Killing Infra Container for %q, will start new one", format.Pod(pod))
|
||||
}
|
||||
|
||||
// Get list of running container(s) to kill
|
||||
podToKill := kubecontainer.ConvertPodStatusToRunningPod(dm.Type(), podStatus)
|
||||
|
||||
// If there are dead network containers, also kill them to ensure
|
||||
// their network resources get released and are available to be
|
||||
// re-used by new net containers
|
||||
for _, containerStatus := range podStatus.ContainerStatuses {
|
||||
if containerIsNetworked(containerStatus.Name) && containerStatus.State == kubecontainer.ContainerStateExited {
|
||||
container := &kubecontainer.Container{
|
||||
ID: containerStatus.ID,
|
||||
Name: containerStatus.Name,
|
||||
Image: containerStatus.Image,
|
||||
ImageID: containerStatus.ImageID,
|
||||
Hash: containerStatus.Hash,
|
||||
State: containerStatus.State,
|
||||
}
|
||||
podToKill.Containers = append(podToKill.Containers, container)
|
||||
}
|
||||
}
|
||||
|
||||
// Killing phase: if we want to start new infra container, or nothing is running kill everything (including infra container)
|
||||
// TODO(random-liu): We'll use pod status directly in the future
|
||||
killResult := dm.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(dm.Type(), podStatus), nil)
|
||||
killResult := dm.killPodWithSyncResult(pod, podToKill, nil)
|
||||
result.AddPodSyncResult(killResult)
|
||||
if killResult.Error() != nil {
|
||||
return
|
||||
@ -2217,20 +2249,14 @@ func (dm *DockerManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStatus *kubecon
|
||||
setupNetworkResult := kubecontainer.NewSyncResult(kubecontainer.SetupNetwork, kubecontainer.GetPodFullName(pod))
|
||||
result.AddSyncResult(setupNetworkResult)
|
||||
if !kubecontainer.IsHostNetworkPod(pod) {
|
||||
glog.V(3).Infof("Calling network plugin %s to setup pod for %s", dm.networkPlugin.Name(), format.Pod(pod))
|
||||
err = dm.networkPlugin.SetUpPod(pod.Namespace, pod.Name, podInfraContainerID.ContainerID())
|
||||
if err != nil {
|
||||
// TODO: (random-liu) There shouldn't be "Skipping pod" in sync result message
|
||||
message := fmt.Sprintf("Failed to setup network for pod %q using network plugins %q: %v; Skipping pod", format.Pod(pod), dm.networkPlugin.Name(), err)
|
||||
setupNetworkResult.Fail(kubecontainer.ErrSetupNetwork, message)
|
||||
glog.Error(message)
|
||||
if err := dm.network.SetUpPod(pod.Namespace, pod.Name, podInfraContainerID.ContainerID()); err != nil {
|
||||
setupNetworkResult.Fail(kubecontainer.ErrSetupNetwork, err.Error())
|
||||
glog.Error(err)
|
||||
|
||||
// Delete infra container
|
||||
killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, PodInfraContainerName)
|
||||
result.AddSyncResult(killContainerResult)
|
||||
if delErr := dm.KillContainerInPod(kubecontainer.ContainerID{
|
||||
ID: string(podInfraContainerID),
|
||||
Type: "docker"}, nil, pod, message, nil); delErr != nil {
|
||||
if delErr := dm.KillContainerInPod(podInfraContainerID.ContainerID(), nil, pod, err.Error(), nil); delErr != nil {
|
||||
killContainerResult.Fail(kubecontainer.ErrKillContainer, delErr.Error())
|
||||
glog.Warningf("Clear infra container failed for pod %q: %v", format.Pod(pod), delErr)
|
||||
}
|
||||
@ -2246,7 +2272,7 @@ func (dm *DockerManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStatus *kubecon
|
||||
}
|
||||
|
||||
if dm.configureHairpinMode {
|
||||
if err = hairpin.SetUpContainerPid(podInfraContainer.State.Pid, network.DefaultInterfaceName); err != nil {
|
||||
if err = hairpin.SetUpContainerPid(podInfraContainer.State.Pid, knetwork.DefaultInterfaceName); err != nil {
|
||||
glog.Warningf("Hairpin setup failed for pod %q: %v", format.Pod(pod), err)
|
||||
}
|
||||
}
|
||||
@ -2679,7 +2705,7 @@ func (dm *DockerManager) GetPodStatus(uid kubetypes.UID, name, namespace string)
|
||||
}
|
||||
}
|
||||
containerStatuses = append(containerStatuses, result)
|
||||
if containerProvidesPodIP(dockerName) && ip != "" {
|
||||
if containerProvidesPodIP(dockerName.ContainerName) && ip != "" {
|
||||
podStatus.IP = ip
|
||||
}
|
||||
}
|
||||
|
@ -52,8 +52,13 @@ func getContainerIP(container *dockertypes.ContainerJSON) string {
|
||||
func getNetworkingMode() string { return "" }
|
||||
|
||||
// Returns true if the container name matches the infrastructure's container name
|
||||
func containerProvidesPodIP(name *KubeletContainerName) bool {
|
||||
return name.ContainerName == PodInfraContainerName
|
||||
func containerProvidesPodIP(containerName string) bool {
|
||||
return containerName == PodInfraContainerName
|
||||
}
|
||||
|
||||
// Only the infrastructure container needs network setup/teardown
|
||||
func containerIsNetworked(containerName string) bool {
|
||||
return containerName == PodInfraContainerName
|
||||
}
|
||||
|
||||
// Returns Seccomp and AppArmor Security options
|
||||
|
@ -33,7 +33,7 @@ import (
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
"k8s.io/kubernetes/pkg/kubelet/events"
|
||||
"k8s.io/kubernetes/pkg/kubelet/network"
|
||||
"k8s.io/kubernetes/pkg/kubelet/network/mock_network"
|
||||
nettest "k8s.io/kubernetes/pkg/kubelet/network/testing"
|
||||
"k8s.io/kubernetes/pkg/security/apparmor"
|
||||
utilstrings "k8s.io/kubernetes/pkg/util/strings"
|
||||
)
|
||||
@ -455,8 +455,9 @@ func TestGetPodStatusFromNetworkPlugin(t *testing.T) {
|
||||
for _, test := range cases {
|
||||
dm, fakeDocker := newTestDockerManager()
|
||||
ctrl := gomock.NewController(t)
|
||||
fnp := mock_network.NewMockNetworkPlugin(ctrl)
|
||||
dm.networkPlugin = fnp
|
||||
defer ctrl.Finish()
|
||||
fnp := nettest.NewMockNetworkPlugin(ctrl)
|
||||
dm.network = network.NewPluginManager(fnp)
|
||||
|
||||
fakeDocker.SetFakeRunningContainers([]*FakeContainer{
|
||||
{
|
||||
|
@ -54,7 +54,6 @@ import (
|
||||
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
|
||||
"k8s.io/kubernetes/pkg/kubelet/images"
|
||||
"k8s.io/kubernetes/pkg/kubelet/network"
|
||||
"k8s.io/kubernetes/pkg/kubelet/network/mock_network"
|
||||
nettest "k8s.io/kubernetes/pkg/kubelet/network/testing"
|
||||
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
|
||||
"k8s.io/kubernetes/pkg/kubelet/types"
|
||||
@ -1829,6 +1828,55 @@ func TestGetPodStatusNoSuchContainer(t *testing.T) {
|
||||
// Verify that we will try to start new contrainers even if the inspections
|
||||
// failed.
|
||||
verifyCalls(t, fakeDocker, []string{
|
||||
// Inspect dead infra container for possible network teardown
|
||||
"inspect_container",
|
||||
// Start a new infra container.
|
||||
"create", "start", "inspect_container", "inspect_container",
|
||||
// Start a new container.
|
||||
"create", "start", "inspect_container",
|
||||
})
|
||||
}
|
||||
|
||||
func TestSyncPodDeadInfraContainerTeardown(t *testing.T) {
|
||||
const (
|
||||
noSuchContainerID = "nosuchcontainer"
|
||||
infraContainerID = "9876"
|
||||
)
|
||||
dm, fakeDocker := newTestDockerManager()
|
||||
dm.podInfraContainerImage = "pod_infra_image"
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
fnp := nettest.NewMockNetworkPlugin(ctrl)
|
||||
dm.network = network.NewPluginManager(fnp)
|
||||
|
||||
pod := makePod("foo", &v1.PodSpec{
|
||||
Containers: []v1.Container{{Name: noSuchContainerID}},
|
||||
})
|
||||
|
||||
fakeDocker.SetFakeContainers([]*FakeContainer{
|
||||
{
|
||||
ID: infraContainerID,
|
||||
Name: "/k8s_POD." + strconv.FormatUint(generatePodInfraContainerHash(pod), 16) + "_foo_new_12345678_42",
|
||||
ExitCode: 0,
|
||||
StartedAt: time.Now(),
|
||||
FinishedAt: time.Now(),
|
||||
Running: false,
|
||||
},
|
||||
})
|
||||
|
||||
// Can be called multiple times due to GetPodStatus
|
||||
fnp.EXPECT().Name().Return("someNetworkPlugin").AnyTimes()
|
||||
fnp.EXPECT().TearDownPod("new", "foo", gomock.Any()).Return(nil)
|
||||
fnp.EXPECT().GetPodNetworkStatus("new", "foo", gomock.Any()).Return(&network.PodNetworkStatus{IP: net.ParseIP("1.1.1.1")}, nil).AnyTimes()
|
||||
fnp.EXPECT().SetUpPod("new", "foo", gomock.Any()).Return(nil)
|
||||
|
||||
runSyncPod(t, dm, fakeDocker, pod, nil, false)
|
||||
|
||||
// Verify that we will try to start new contrainers even if the inspections
|
||||
// failed.
|
||||
verifyCalls(t, fakeDocker, []string{
|
||||
// Inspect dead infra container for possible network teardown
|
||||
"inspect_container",
|
||||
// Start a new infra container.
|
||||
"create", "start", "inspect_container", "inspect_container",
|
||||
// Start a new container.
|
||||
@ -1878,8 +1926,8 @@ func TestSyncPodGetsPodIPFromNetworkPlugin(t *testing.T) {
|
||||
dm.podInfraContainerImage = "pod_infra_image"
|
||||
ctrl := gomock.NewController(t)
|
||||
defer ctrl.Finish()
|
||||
fnp := mock_network.NewMockNetworkPlugin(ctrl)
|
||||
dm.networkPlugin = fnp
|
||||
fnp := nettest.NewMockNetworkPlugin(ctrl)
|
||||
dm.network = network.NewPluginManager(fnp)
|
||||
|
||||
pod := makePod("foo", &v1.PodSpec{
|
||||
Containers: []v1.Container{
|
||||
|
@ -42,7 +42,11 @@ func getNetworkingMode() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func containerProvidesPodIP(name *KubeletContainerName) bool {
|
||||
func containerProvidesPodIP(containerName string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func containerIsNetworked(containerName string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
|
@ -65,8 +65,13 @@ func getNetworkingMode() string {
|
||||
|
||||
// Infrastructure containers are not supported on Windows. For this reason, we
|
||||
// make sure to not grab the infra container's IP for the pod.
|
||||
func containerProvidesPodIP(name *KubeletContainerName) bool {
|
||||
return name.ContainerName != PodInfraContainerName
|
||||
func containerProvidesPodIP(containerName string) bool {
|
||||
return containerName != PodInfraContainerName
|
||||
}
|
||||
|
||||
// All containers in Windows need networking setup/teardown
|
||||
func containerIsNetworked(containerName string) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// Returns nil as both Seccomp and AppArmor security options are not valid on Windows
|
||||
|
@ -5,7 +5,6 @@ licenses(["notice"])
|
||||
load(
|
||||
"@io_bazel_rules_go//go:def.bzl",
|
||||
"go_library",
|
||||
"go_test",
|
||||
)
|
||||
|
||||
go_library(
|
||||
@ -31,17 +30,6 @@ go_library(
|
||||
],
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = ["plugins_test.go"],
|
||||
library = ":go_default_library",
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//pkg/apis/componentconfig:go_default_library",
|
||||
"//pkg/kubelet/network/testing:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "package-srcs",
|
||||
srcs = glob(["**"]),
|
||||
@ -57,7 +45,6 @@ filegroup(
|
||||
"//pkg/kubelet/network/hairpin:all-srcs",
|
||||
"//pkg/kubelet/network/hostport:all-srcs",
|
||||
"//pkg/kubelet/network/kubenet:all-srcs",
|
||||
"//pkg/kubelet/network/mock_network:all-srcs",
|
||||
"//pkg/kubelet/network/testing:all-srcs",
|
||||
],
|
||||
tags = ["automanaged"],
|
||||
|
@ -1,34 +0,0 @@
|
||||
package(default_visibility = ["//visibility:public"])
|
||||
|
||||
licenses(["notice"])
|
||||
|
||||
load(
|
||||
"@io_bazel_rules_go//go:def.bzl",
|
||||
"go_library",
|
||||
)
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["network_plugins.go"],
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//pkg/apis/componentconfig:go_default_library",
|
||||
"//pkg/kubelet/container:go_default_library",
|
||||
"//pkg/kubelet/network:go_default_library",
|
||||
"//vendor:github.com/golang/mock/gomock",
|
||||
"//vendor:k8s.io/apimachinery/pkg/util/sets",
|
||||
],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "package-srcs",
|
||||
srcs = glob(["**"]),
|
||||
tags = ["automanaged"],
|
||||
visibility = ["//visibility:private"],
|
||||
)
|
||||
|
||||
filegroup(
|
||||
name = "all-srcs",
|
||||
srcs = [":package-srcs"],
|
||||
tags = ["automanaged"],
|
||||
)
|
@ -20,6 +20,7 @@ import (
|
||||
"fmt"
|
||||
"net"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/golang/glog"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
@ -293,3 +294,123 @@ type NoopPortMappingGetter struct{}
|
||||
func (*NoopPortMappingGetter) GetPodPortMappings(containerID string) ([]*hostport.PortMapping, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// The PluginManager wraps a kubelet network plugin and provides synchronization
|
||||
// for a given pod's network operations. Each pod's setup/teardown/status operations
|
||||
// are synchronized against each other, but network operations of other pods can
|
||||
// proceed in parallel.
|
||||
type PluginManager struct {
|
||||
// Network plugin being wrapped
|
||||
plugin NetworkPlugin
|
||||
|
||||
// Pod list and lock
|
||||
podsLock sync.Mutex
|
||||
pods map[string]*podLock
|
||||
}
|
||||
|
||||
func NewPluginManager(plugin NetworkPlugin) *PluginManager {
|
||||
return &PluginManager{
|
||||
plugin: plugin,
|
||||
pods: make(map[string]*podLock),
|
||||
}
|
||||
}
|
||||
|
||||
func (pm *PluginManager) PluginName() string {
|
||||
return pm.plugin.Name()
|
||||
}
|
||||
|
||||
func (pm *PluginManager) Event(name string, details map[string]interface{}) {
|
||||
pm.plugin.Event(name, details)
|
||||
}
|
||||
|
||||
func (pm *PluginManager) Status() error {
|
||||
return pm.plugin.Status()
|
||||
}
|
||||
|
||||
type podLock struct {
|
||||
// Count of in-flight operations for this pod; when this reaches zero
|
||||
// the lock can be removed from the pod map
|
||||
refcount uint
|
||||
|
||||
// Lock to synchronize operations for this specific pod
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
// Lock network operations for a specific pod. If that pod is not yet in
|
||||
// the pod map, it will be added. The reference count for the pod will
|
||||
// be increased.
|
||||
func (pm *PluginManager) podLock(fullPodName string) *sync.Mutex {
|
||||
pm.podsLock.Lock()
|
||||
defer pm.podsLock.Unlock()
|
||||
|
||||
lock, ok := pm.pods[fullPodName]
|
||||
if !ok {
|
||||
lock = &podLock{}
|
||||
pm.pods[fullPodName] = lock
|
||||
}
|
||||
lock.refcount++
|
||||
return &lock.mu
|
||||
}
|
||||
|
||||
// Unlock network operations for a specific pod. The reference count for the
|
||||
// pod will be decreased. If the reference count reaches zero, the pod will be
|
||||
// removed from the pod map.
|
||||
func (pm *PluginManager) podUnlock(fullPodName string) {
|
||||
pm.podsLock.Lock()
|
||||
defer pm.podsLock.Unlock()
|
||||
|
||||
lock, ok := pm.pods[fullPodName]
|
||||
if !ok {
|
||||
glog.Warningf("Unbalanced pod lock unref for %s", fullPodName)
|
||||
return
|
||||
} else if lock.refcount == 0 {
|
||||
// This should never ever happen, but handle it anyway
|
||||
delete(pm.pods, fullPodName)
|
||||
glog.Warningf("Pod lock for %s still in map with zero refcount", fullPodName)
|
||||
return
|
||||
}
|
||||
lock.refcount--
|
||||
lock.mu.Unlock()
|
||||
if lock.refcount == 0 {
|
||||
delete(pm.pods, fullPodName)
|
||||
}
|
||||
}
|
||||
|
||||
func (pm *PluginManager) GetPodNetworkStatus(podNamespace, podName string, id kubecontainer.ContainerID) (*PodNetworkStatus, error) {
|
||||
fullPodName := kubecontainer.BuildPodFullName(podName, podNamespace)
|
||||
pm.podLock(fullPodName).Lock()
|
||||
defer pm.podUnlock(fullPodName)
|
||||
|
||||
netStatus, err := pm.plugin.GetPodNetworkStatus(podNamespace, podName, id)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("NetworkPlugin %s failed on the status hook for pod %q: %v", pm.plugin.Name(), fullPodName, err)
|
||||
}
|
||||
|
||||
return netStatus, nil
|
||||
}
|
||||
|
||||
func (pm *PluginManager) SetUpPod(podNamespace, podName string, id kubecontainer.ContainerID) error {
|
||||
fullPodName := kubecontainer.BuildPodFullName(podName, podNamespace)
|
||||
pm.podLock(fullPodName).Lock()
|
||||
defer pm.podUnlock(fullPodName)
|
||||
|
||||
glog.V(3).Infof("Calling network plugin %s to set up pod %q", pm.plugin.Name(), fullPodName)
|
||||
if err := pm.plugin.SetUpPod(podNamespace, podName, id); err != nil {
|
||||
return fmt.Errorf("NetworkPlugin %s failed to set up pod %q network: %v", pm.plugin.Name(), fullPodName, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pm *PluginManager) TearDownPod(podNamespace, podName string, id kubecontainer.ContainerID) error {
|
||||
fullPodName := kubecontainer.BuildPodFullName(podName, podNamespace)
|
||||
pm.podLock(fullPodName).Lock()
|
||||
defer pm.podUnlock(fullPodName)
|
||||
|
||||
glog.V(3).Infof("Calling network plugin %s to tear down pod %q", pm.plugin.Name(), fullPodName)
|
||||
if err := pm.plugin.TearDownPod(podNamespace, podName, id); err != nil {
|
||||
return fmt.Errorf("NetworkPlugin %s failed to teardown pod %q network: %v", pm.plugin.Name(), fullPodName, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -1,38 +0,0 @@
|
||||
/*
|
||||
Copyright 2014 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package network
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"k8s.io/kubernetes/pkg/apis/componentconfig"
|
||||
nettest "k8s.io/kubernetes/pkg/kubelet/network/testing"
|
||||
)
|
||||
|
||||
func TestSelectDefaultPlugin(t *testing.T) {
|
||||
all_plugins := []NetworkPlugin{}
|
||||
plug, err := InitNetworkPlugin(all_plugins, "", nettest.NewFakeHost(nil), componentconfig.HairpinNone, "10.0.0.0/8", UseDefaultMTU)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error in selecting default plugin: %v", err)
|
||||
}
|
||||
if plug == nil {
|
||||
t.Fatalf("Failed to select the default plugin.")
|
||||
}
|
||||
if plug.Name() != DefaultPluginName {
|
||||
t.Errorf("Failed to select the default plugin. Expected %s. Got %s", DefaultPluginName, plug.Name())
|
||||
}
|
||||
}
|
@ -5,18 +5,40 @@ licenses(["notice"])
|
||||
load(
|
||||
"@io_bazel_rules_go//go:def.bzl",
|
||||
"go_library",
|
||||
"go_test",
|
||||
)
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["fake_host.go"],
|
||||
srcs = [
|
||||
"fake_host.go",
|
||||
"mock_network_plugin.go",
|
||||
],
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//pkg/api/v1:go_default_library",
|
||||
"//pkg/apis/componentconfig:go_default_library",
|
||||
"//pkg/client/clientset_generated/clientset:go_default_library",
|
||||
"//pkg/kubelet/container:go_default_library",
|
||||
"//pkg/kubelet/container/testing:go_default_library",
|
||||
"//pkg/kubelet/network:go_default_library",
|
||||
"//pkg/kubelet/network/hostport:go_default_library",
|
||||
"//vendor:github.com/golang/mock/gomock",
|
||||
"//vendor:k8s.io/apimachinery/pkg/util/sets",
|
||||
],
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = ["plugins_test.go"],
|
||||
library = ":go_default_library",
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//pkg/apis/componentconfig:go_default_library",
|
||||
"//pkg/kubelet/container:go_default_library",
|
||||
"//pkg/kubelet/network:go_default_library",
|
||||
"//vendor:github.com/golang/mock/gomock",
|
||||
"//vendor:k8s.io/apimachinery/pkg/util/sets",
|
||||
],
|
||||
)
|
||||
|
||||
|
@ -14,11 +14,11 @@ See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// Generated code, generated via: `mockgen k8s.io/kubernetes/pkg/kubelet/network NetworkPlugin > $GOPATH/src/k8s.io/kubernetes/pkg/kubelet/network/mock_network/network_plugins.go`
|
||||
// Generated code, generated via: `mockgen k8s.io/kubernetes/pkg/kubelet/network NetworkPlugin > $GOPATH/src/k8s.io/kubernetes/pkg/kubelet/network/testing/mock_network_plugin.go`
|
||||
// Edited by hand for boilerplate and gofmt.
|
||||
// TODO, this should be autogenerated/autoupdated by scripts.
|
||||
|
||||
package mock_network
|
||||
package testing
|
||||
|
||||
import (
|
||||
gomock "github.com/golang/mock/gomock"
|
218
pkg/kubelet/network/testing/plugins_test.go
Normal file
218
pkg/kubelet/network/testing/plugins_test.go
Normal file
@ -0,0 +1,218 @@
|
||||
/*
|
||||
Copyright 2014 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package testing
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
utilsets "k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/kubernetes/pkg/apis/componentconfig"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
"k8s.io/kubernetes/pkg/kubelet/network"
|
||||
|
||||
"github.com/golang/mock/gomock"
|
||||
)
|
||||
|
||||
func TestSelectDefaultPlugin(t *testing.T) {
|
||||
all_plugins := []network.NetworkPlugin{}
|
||||
plug, err := network.InitNetworkPlugin(all_plugins, "", NewFakeHost(nil), componentconfig.HairpinNone, "10.0.0.0/8", network.UseDefaultMTU)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error in selecting default plugin: %v", err)
|
||||
}
|
||||
if plug == nil {
|
||||
t.Fatalf("Failed to select the default plugin.")
|
||||
}
|
||||
if plug.Name() != network.DefaultPluginName {
|
||||
t.Errorf("Failed to select the default plugin. Expected %s. Got %s", network.DefaultPluginName, plug.Name())
|
||||
}
|
||||
}
|
||||
|
||||
func TestPluginManager(t *testing.T) {
|
||||
ctrl := gomock.NewController(t)
|
||||
fnp := NewMockNetworkPlugin(ctrl)
|
||||
defer fnp.Finish()
|
||||
pm := network.NewPluginManager(fnp)
|
||||
|
||||
fnp.EXPECT().Name().Return("someNetworkPlugin").AnyTimes()
|
||||
|
||||
allCreatedWg := sync.WaitGroup{}
|
||||
allCreatedWg.Add(1)
|
||||
allDoneWg := sync.WaitGroup{}
|
||||
|
||||
// 10 pods, 4 setup/status/teardown runs each. Ensure that network locking
|
||||
// works and the pod map isn't concurrently accessed
|
||||
for i := 0; i < 10; i++ {
|
||||
podName := fmt.Sprintf("pod%d", i)
|
||||
containerID := kubecontainer.ContainerID{ID: podName}
|
||||
|
||||
fnp.EXPECT().SetUpPod("", podName, containerID).Return(nil).Times(4)
|
||||
fnp.EXPECT().GetPodNetworkStatus("", podName, containerID).Return(&network.PodNetworkStatus{IP: net.ParseIP("1.2.3.4")}, nil).Times(4)
|
||||
fnp.EXPECT().TearDownPod("", podName, containerID).Return(nil).Times(4)
|
||||
|
||||
for x := 0; x < 4; x++ {
|
||||
allDoneWg.Add(1)
|
||||
go func(name string, id kubecontainer.ContainerID, num int) {
|
||||
defer allDoneWg.Done()
|
||||
|
||||
// Block all goroutines from running until all have
|
||||
// been created and are ready. This ensures we
|
||||
// have more pod network operations running
|
||||
// concurrently.
|
||||
allCreatedWg.Wait()
|
||||
|
||||
if err := pm.SetUpPod("", name, id); err != nil {
|
||||
t.Errorf("Failed to set up pod %q: %v", name, err)
|
||||
return
|
||||
}
|
||||
|
||||
if _, err := pm.GetPodNetworkStatus("", name, id); err != nil {
|
||||
t.Errorf("Failed to inspect pod %q: %v", name, err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := pm.TearDownPod("", name, id); err != nil {
|
||||
t.Errorf("Failed to tear down pod %q: %v", name, err)
|
||||
return
|
||||
}
|
||||
}(podName, containerID, x)
|
||||
}
|
||||
}
|
||||
// Block all goroutines from running until all have been created and started
|
||||
allCreatedWg.Done()
|
||||
|
||||
// Wait for them all to finish
|
||||
allDoneWg.Wait()
|
||||
}
|
||||
|
||||
type hookableFakeNetworkPluginSetupHook func(namespace, name string, id kubecontainer.ContainerID)
|
||||
|
||||
type hookableFakeNetworkPlugin struct {
|
||||
setupHook hookableFakeNetworkPluginSetupHook
|
||||
}
|
||||
|
||||
func newHookableFakeNetworkPlugin(setupHook hookableFakeNetworkPluginSetupHook) *hookableFakeNetworkPlugin {
|
||||
return &hookableFakeNetworkPlugin{
|
||||
setupHook: setupHook,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *hookableFakeNetworkPlugin) Init(host network.Host, hairpinMode componentconfig.HairpinMode, nonMasqueradeCIDR string, mtu int) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *hookableFakeNetworkPlugin) Event(name string, details map[string]interface{}) {
|
||||
}
|
||||
|
||||
func (p *hookableFakeNetworkPlugin) Name() string {
|
||||
return "fakeplugin"
|
||||
}
|
||||
|
||||
func (p *hookableFakeNetworkPlugin) Capabilities() utilsets.Int {
|
||||
return utilsets.NewInt()
|
||||
}
|
||||
|
||||
func (p *hookableFakeNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID) error {
|
||||
if p.setupHook != nil {
|
||||
p.setupHook(namespace, name, id)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *hookableFakeNetworkPlugin) TearDownPod(string, string, kubecontainer.ContainerID) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *hookableFakeNetworkPlugin) GetPodNetworkStatus(string, string, kubecontainer.ContainerID) (*network.PodNetworkStatus, error) {
|
||||
return &network.PodNetworkStatus{IP: net.ParseIP("10.1.2.3")}, nil
|
||||
}
|
||||
|
||||
func (p *hookableFakeNetworkPlugin) Status() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Ensure that one pod's network operations don't block another's. If the
|
||||
// test is successful (eg, first pod doesn't block on second) the test
|
||||
// will complete. If unsuccessful, it will hang and get killed.
|
||||
func TestMultiPodParallelNetworkOps(t *testing.T) {
|
||||
podWg := sync.WaitGroup{}
|
||||
podWg.Add(1)
|
||||
|
||||
// Can't do this with MockNetworkPlugin because the gomock controller
|
||||
// has its own locks which don't allow the parallel network operation
|
||||
// to proceed.
|
||||
didWait := false
|
||||
fakePlugin := newHookableFakeNetworkPlugin(func(podNamespace, podName string, id kubecontainer.ContainerID) {
|
||||
if podName == "waiter" {
|
||||
podWg.Wait()
|
||||
didWait = true
|
||||
}
|
||||
})
|
||||
pm := network.NewPluginManager(fakePlugin)
|
||||
|
||||
opsWg := sync.WaitGroup{}
|
||||
|
||||
// Start the pod that will wait for the other to complete
|
||||
opsWg.Add(1)
|
||||
go func() {
|
||||
defer opsWg.Done()
|
||||
|
||||
podName := "waiter"
|
||||
containerID := kubecontainer.ContainerID{ID: podName}
|
||||
|
||||
// Setup will block on the runner pod completing. If network
|
||||
// operations locking isn't correct (eg pod network operations
|
||||
// block other pods) setUpPod() will never return.
|
||||
if err := pm.SetUpPod("", podName, containerID); err != nil {
|
||||
t.Errorf("Failed to set up waiter pod: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := pm.TearDownPod("", podName, containerID); err != nil {
|
||||
t.Errorf("Failed to tear down waiter pod: %v", err)
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
opsWg.Add(1)
|
||||
go func() {
|
||||
defer opsWg.Done()
|
||||
// Let other pod proceed
|
||||
defer podWg.Done()
|
||||
|
||||
podName := "runner"
|
||||
containerID := kubecontainer.ContainerID{ID: podName}
|
||||
|
||||
if err := pm.SetUpPod("", podName, containerID); err != nil {
|
||||
t.Errorf("Failed to set up runner pod: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := pm.TearDownPod("", podName, containerID); err != nil {
|
||||
t.Errorf("Failed to tear down runner pod: %v", err)
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
opsWg.Wait()
|
||||
|
||||
if !didWait {
|
||||
t.Errorf("waiter pod didn't wait for runner pod!")
|
||||
}
|
||||
}
|
@ -76,7 +76,7 @@ go_test(
|
||||
"//pkg/kubelet/lifecycle:go_default_library",
|
||||
"//pkg/kubelet/network:go_default_library",
|
||||
"//pkg/kubelet/network/kubenet:go_default_library",
|
||||
"//pkg/kubelet/network/mock_network:go_default_library",
|
||||
"//pkg/kubelet/network/testing:go_default_library",
|
||||
"//pkg/kubelet/types:go_default_library",
|
||||
"//pkg/util/exec:go_default_library",
|
||||
"//vendor:github.com/appc/spec/schema",
|
||||
|
@ -163,8 +163,8 @@ type Runtime struct {
|
||||
execer utilexec.Interface
|
||||
os kubecontainer.OSInterface
|
||||
|
||||
// Network plugin.
|
||||
networkPlugin network.NetworkPlugin
|
||||
// Network plugin manager.
|
||||
network *network.PluginManager
|
||||
|
||||
// If true, the "hairpin mode" flag is set on container interfaces.
|
||||
// A false value means the kubelet just backs off from setting it,
|
||||
@ -266,7 +266,7 @@ func New(
|
||||
runtimeHelper: runtimeHelper,
|
||||
recorder: recorder,
|
||||
livenessManager: livenessManager,
|
||||
networkPlugin: networkPlugin,
|
||||
network: network.NewPluginManager(networkPlugin),
|
||||
execer: execer,
|
||||
touchPath: touchPath,
|
||||
nsenterPath: nsenterPath,
|
||||
@ -946,7 +946,7 @@ func serviceFilePath(serviceName string) string {
|
||||
// The pod does not run in host network. And
|
||||
// The pod runs inside a netns created outside of rkt.
|
||||
func (r *Runtime) shouldCreateNetns(pod *v1.Pod) bool {
|
||||
return !kubecontainer.IsHostNetworkPod(pod) && r.networkPlugin.Name() != network.DefaultPluginName
|
||||
return !kubecontainer.IsHostNetworkPod(pod) && r.network.PluginName() != network.DefaultPluginName
|
||||
}
|
||||
|
||||
// usesRktHostNetwork returns true if:
|
||||
@ -1047,18 +1047,17 @@ func (r *Runtime) generateRunCommand(pod *v1.Pod, uuid, netnsName string) (strin
|
||||
}
|
||||
|
||||
func (r *Runtime) cleanupPodNetwork(pod *v1.Pod) error {
|
||||
glog.V(3).Infof("Calling network plugin %s to tear down pod for %s", r.networkPlugin.Name(), format.Pod(pod))
|
||||
glog.V(3).Infof("Calling network plugin %s to tear down pod for %s", r.network.PluginName(), format.Pod(pod))
|
||||
|
||||
// No-op if the pod is not running in a created netns.
|
||||
if !r.shouldCreateNetns(pod) {
|
||||
return nil
|
||||
}
|
||||
|
||||
var teardownErr error
|
||||
containerID := kubecontainer.ContainerID{ID: string(pod.UID)}
|
||||
if err := r.networkPlugin.TearDownPod(pod.Namespace, pod.Name, containerID); err != nil {
|
||||
teardownErr = fmt.Errorf("rkt: failed to tear down network for pod %s: %v", format.Pod(pod), err)
|
||||
glog.Errorf("%v", teardownErr)
|
||||
teardownErr := r.network.TearDownPod(pod.Namespace, pod.Name, containerID)
|
||||
if teardownErr != nil {
|
||||
glog.Error(teardownErr)
|
||||
}
|
||||
|
||||
if _, err := r.execer.Command("ip", "netns", "del", makePodNetnsName(pod.UID)).Output(); err != nil {
|
||||
@ -1265,7 +1264,7 @@ func netnsPathFromName(netnsName string) string {
|
||||
//
|
||||
// If the pod is running in host network or is running using the no-op plugin, then nothing will be done.
|
||||
func (r *Runtime) setupPodNetwork(pod *v1.Pod) (string, string, error) {
|
||||
glog.V(3).Infof("Calling network plugin %s to set up pod for %s", r.networkPlugin.Name(), format.Pod(pod))
|
||||
glog.V(3).Infof("Calling network plugin %s to set up pod for %s", r.network.PluginName(), format.Pod(pod))
|
||||
|
||||
// No-op if the pod is not running in a created netns.
|
||||
if !r.shouldCreateNetns(pod) {
|
||||
@ -1282,15 +1281,14 @@ func (r *Runtime) setupPodNetwork(pod *v1.Pod) (string, string, error) {
|
||||
}
|
||||
|
||||
// Set up networking with the network plugin
|
||||
glog.V(3).Infof("Calling network plugin %s to setup pod for %s", r.networkPlugin.Name(), format.Pod(pod))
|
||||
containerID := kubecontainer.ContainerID{ID: string(pod.UID)}
|
||||
err = r.networkPlugin.SetUpPod(pod.Namespace, pod.Name, containerID)
|
||||
err = r.network.SetUpPod(pod.Namespace, pod.Name, containerID)
|
||||
if err != nil {
|
||||
return "", "", fmt.Errorf("failed to set up pod network: %v", err)
|
||||
return "", "", err
|
||||
}
|
||||
status, err := r.networkPlugin.GetPodNetworkStatus(pod.Namespace, pod.Name, containerID)
|
||||
status, err := r.network.GetPodNetworkStatus(pod.Namespace, pod.Name, containerID)
|
||||
if err != nil {
|
||||
return "", "", fmt.Errorf("failed to get status of pod network: %v", err)
|
||||
return "", "", err
|
||||
}
|
||||
|
||||
if r.configureHairpinMode {
|
||||
@ -2329,7 +2327,7 @@ func (r *Runtime) GetPodStatus(uid kubetypes.UID, name, namespace string) (*kube
|
||||
}
|
||||
|
||||
// If we are running no-op network plugin, then get the pod IP from the rkt pod status.
|
||||
if r.networkPlugin.Name() == network.DefaultPluginName {
|
||||
if r.network.PluginName() == network.DefaultPluginName {
|
||||
if latestPod != nil {
|
||||
for _, n := range latestPod.Networks {
|
||||
if n.Name == defaultNetworkName {
|
||||
@ -2340,9 +2338,9 @@ func (r *Runtime) GetPodStatus(uid kubetypes.UID, name, namespace string) (*kube
|
||||
}
|
||||
} else {
|
||||
containerID := kubecontainer.ContainerID{ID: string(uid)}
|
||||
status, err := r.networkPlugin.GetPodNetworkStatus(namespace, name, containerID)
|
||||
status, err := r.network.GetPodNetworkStatus(namespace, name, containerID)
|
||||
if err != nil {
|
||||
glog.Warningf("rkt: Failed to get pod network status for pod (UID %q, name %q, namespace %q): %v", uid, name, namespace, err)
|
||||
glog.Warningf("rkt: %v", err)
|
||||
} else if status != nil {
|
||||
// status can be nil when the pod is running on the host network, in which case the pod IP
|
||||
// will be populated by the upper layer.
|
||||
|
@ -43,7 +43,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
|
||||
"k8s.io/kubernetes/pkg/kubelet/network"
|
||||
"k8s.io/kubernetes/pkg/kubelet/network/kubenet"
|
||||
"k8s.io/kubernetes/pkg/kubelet/network/mock_network"
|
||||
nettest "k8s.io/kubernetes/pkg/kubelet/network/testing"
|
||||
"k8s.io/kubernetes/pkg/kubelet/types"
|
||||
utilexec "k8s.io/kubernetes/pkg/util/exec"
|
||||
)
|
||||
@ -583,7 +583,7 @@ func TestGetPodStatus(t *testing.T) {
|
||||
defer ctrl.Finish()
|
||||
fr := newFakeRktInterface()
|
||||
fs := newFakeSystemd()
|
||||
fnp := mock_network.NewMockNetworkPlugin(ctrl)
|
||||
fnp := nettest.NewMockNetworkPlugin(ctrl)
|
||||
fos := &containertesting.FakeOS{}
|
||||
frh := &fakeRuntimeHelper{}
|
||||
r := &Runtime{
|
||||
@ -591,7 +591,7 @@ func TestGetPodStatus(t *testing.T) {
|
||||
systemd: fs,
|
||||
runtimeHelper: frh,
|
||||
os: fos,
|
||||
networkPlugin: fnp,
|
||||
network: network.NewPluginManager(fnp),
|
||||
}
|
||||
|
||||
ns := func(seconds int64) int64 {
|
||||
@ -826,6 +826,8 @@ func TestGetPodStatus(t *testing.T) {
|
||||
} else {
|
||||
fnp.EXPECT().GetPodNetworkStatus("default", "guestbook", kubecontainer.ContainerID{ID: "42"}).
|
||||
Return(nil, fmt.Errorf("no such network"))
|
||||
// Plugin name only requested again in error case
|
||||
fnp.EXPECT().Name().Return(tt.networkPluginName)
|
||||
}
|
||||
}
|
||||
|
||||
@ -1388,7 +1390,7 @@ func TestGenerateRunCommand(t *testing.T) {
|
||||
|
||||
for i, tt := range tests {
|
||||
testCaseHint := fmt.Sprintf("test case #%d", i)
|
||||
rkt.networkPlugin = tt.networkPlugin
|
||||
rkt.network = network.NewPluginManager(tt.networkPlugin)
|
||||
rkt.runtimeHelper = &fakeRuntimeHelper{tt.dnsServers, tt.dnsSearches, tt.hostName, "", tt.err}
|
||||
rkt.execer = &utilexec.FakeExec{CommandScript: []utilexec.FakeCommandAction{func(cmd string, args ...string) utilexec.Cmd {
|
||||
return utilexec.InitFakeCmd(&utilexec.FakeCmd{}, cmd, args...)
|
||||
|
Loading…
Reference in New Issue
Block a user