Merge pull request #35572 from bprashanth/ip_gc

Automatic merge from submit-queue

GC pod ips

Finally managed to write a *failing* test. 
Supersedes https://github.com/kubernetes/kubernetes/pull/34373

```release-note
GC pod ips
```
This commit is contained in:
Kubernetes Submit Queue 2016-10-28 14:44:28 -07:00 committed by GitHub
commit cf7178d7c3
5 changed files with 240 additions and 5 deletions

View File

@ -21,6 +21,7 @@ package kubenet
import (
"fmt"
"net"
"path/filepath"
"strings"
"sync"
"syscall"
@ -69,6 +70,10 @@ const (
// ebtables Chain to store dedup rules
dedupChain = utilebtables.Chain("KUBE-DEDUP")
// defaultIPAMDir is the default location for the checkpoint files stored by host-local ipam
// https://github.com/containernetworking/cni/tree/master/plugins/ipam/host-local#backends
defaultIPAMDir = "/var/lib/cni/networks"
)
// CNI plugins required by kubenet in /opt/cni/bin or vendor directory
@ -434,6 +439,16 @@ func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id k
// Not a hard error or warning
glog.V(4).Infof("Failed to clean up %s/%s after SetUpPod failure: %v", namespace, name, err)
}
// TODO: Remove this hack once we've figured out how to retrieve the netns
// of an exited container. Currently, restarting docker will leak a bunch of
// ips. This will exhaust available ip space unless we cleanup old ips. At the
// same time we don't want to try GC'ing them periodically as that could lead
// to a performance regression in starting pods. So on each setup failure, try
// GC on the assumption that the kubelet is going to retry pod creation, and
// when it does, there will be ips.
plugin.ipamGarbageCollection()
return err
}
@ -572,20 +587,32 @@ func (plugin *kubenetNetworkPlugin) checkCNIPluginInDir(dir string) bool {
return true
}
// Returns a list of pods running or ready to run on this node and each pod's IP address.
// Assumes PodSpecs retrieved from the runtime include the name and ID of containers in
// each pod.
func (plugin *kubenetNetworkPlugin) getActivePods() ([]*hostport.ActivePod, error) {
// getNonExitedPods returns a list of pods that have at least one running container.
func (plugin *kubenetNetworkPlugin) getNonExitedPods() ([]*kubecontainer.Pod, error) {
ret := []*kubecontainer.Pod{}
pods, err := plugin.host.GetRuntime().GetPods(true)
if err != nil {
return nil, fmt.Errorf("Failed to retrieve pods from runtime: %v", err)
}
activePods := make([]*hostport.ActivePod, 0)
for _, p := range pods {
if podIsExited(p) {
continue
}
ret = append(ret, p)
}
return ret, nil
}
// Returns a list of pods running or ready to run on this node and each pod's IP address.
// Assumes PodSpecs retrieved from the runtime include the name and ID of containers in
// each pod.
func (plugin *kubenetNetworkPlugin) getActivePods() ([]*hostport.ActivePod, error) {
pods, err := plugin.getNonExitedPods()
if err != nil {
return nil, err
}
activePods := make([]*hostport.ActivePod, 0)
for _, p := range pods {
containerID, err := plugin.host.GetRuntime().GetPodContainerID(p)
if err != nil {
continue
@ -608,6 +635,77 @@ func (plugin *kubenetNetworkPlugin) getActivePods() ([]*hostport.ActivePod, erro
return activePods, nil
}
// ipamGarbageCollection will release unused IP.
// kubenet uses the CNI bridge plugin, which stores allocated ips on file. Each
// file created under defaultIPAMDir has the format: ip/container-hash. So this
// routine looks for hashes that are not reported by the currently running docker,
// and invokes DelNetwork on each one. Note that this will only work for the
// current CNI bridge plugin, because we have no way of finding the NetNs.
func (plugin *kubenetNetworkPlugin) ipamGarbageCollection() {
glog.V(2).Infof("Starting IP garbage collection")
ipamDir := filepath.Join(defaultIPAMDir, KubenetPluginName)
files, err := ioutil.ReadDir(ipamDir)
if err != nil {
glog.Errorf("Failed to list files in %q: %v", ipamDir, err)
return
}
// gather containerIDs for allocated ips
ipContainerIdMap := make(map[string]string)
for _, file := range files {
// skip non checkpoint file
if ip := net.ParseIP(file.Name()); ip == nil {
continue
}
content, err := ioutil.ReadFile(filepath.Join(ipamDir, file.Name()))
if err != nil {
glog.Errorf("Failed to read file %v: %v", file, err)
}
ipContainerIdMap[file.Name()] = strings.TrimSpace(string(content))
}
// gather infra container IDs of current running Pods
runningContainerIDs := utilsets.String{}
pods, err := plugin.getNonExitedPods()
if err != nil {
glog.Errorf("Failed to get pods: %v", err)
return
}
for _, pod := range pods {
containerID, err := plugin.host.GetRuntime().GetPodContainerID(pod)
if err != nil {
glog.Warningf("Failed to get infra containerID of %q/%q: %v", pod.Namespace, pod.Name, err)
continue
}
runningContainerIDs.Insert(strings.TrimSpace(containerID.ID))
}
// release leaked ips
for ip, containerID := range ipContainerIdMap {
// if the container is not running, release IP
if runningContainerIDs.Has(containerID) {
continue
}
// CNI requires all config to be presented, although only containerID is needed in this case
rt := &libcni.RuntimeConf{
ContainerID: containerID,
IfName: network.DefaultInterfaceName,
// TODO: How do we find the NetNs of an exited container? docker inspect
// doesn't show us the pid, so we probably need to checkpoint
NetNS: "",
}
glog.V(2).Infof("Releasing IP %q allocated to %q.", ip, containerID)
// CNI bridge plugin should try to release IP and then return
if err := plugin.cniConfig.DelNetwork(plugin.netConfig, rt); err != nil {
glog.Errorf("Error while releasing IP: %v", err)
}
}
}
// podIsExited returns true if the pod is exited (all containers inside are exited).
func podIsExited(p *kubecontainer.Pod) bool {
for _, c := range p.Containers {

View File

@ -66,6 +66,7 @@ go_test(
"memory_eviction_test.go",
"mirror_pod_test.go",
"resource_usage_test.go",
"restart_test.go",
"runtime_conformance_test.go",
"summary_test.go",
],
@ -95,6 +96,7 @@ go_test(
"//test/e2e/common:go_default_library",
"//test/e2e/framework:go_default_library",
"//test/e2e_node/services:go_default_library",
"//test/utils:go_default_library",
"//vendor:github.com/davecgh/go-spew/spew",
"//vendor:github.com/golang/glog",
"//vendor:github.com/onsi/ginkgo",

View File

@ -0,0 +1,117 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e_node
import (
"k8s.io/kubernetes/test/e2e/framework"
"time"
"fmt"
. "github.com/onsi/ginkgo"
"k8s.io/kubernetes/pkg/api"
testutils "k8s.io/kubernetes/test/utils"
"os/exec"
)
// waitForPods waits for timeout duration, for pod_count.
// If the timeout is hit, it returns the list of currently running pods.
func waitForPods(f *framework.Framework, pod_count int, timeout time.Duration) (runningPods []*api.Pod) {
for start := time.Now(); time.Since(start) < timeout; time.Sleep(10 * time.Second) {
podList, err := f.PodClient().List(api.ListOptions{})
if err != nil {
framework.Logf("Failed to list pods on node: %v", err)
continue
}
runningPods = []*api.Pod{}
for _, pod := range podList.Items {
if r, err := testutils.PodRunningReady(&pod); err != nil || !r {
continue
}
runningPods = append(runningPods, &pod)
}
framework.Logf("Running pod count %d", len(runningPods))
if len(runningPods) >= pod_count {
break
}
}
return runningPods
}
var _ = framework.KubeDescribe("Restart [Serial] [Slow] [Disruptive]", func() {
const (
// Saturate the node. It's not necessary that all these pods enter
// Running/Ready, because we don't know the number of cores in the
// test node or default limits applied (if any). It's is essential
// that no containers end up in terminated. 100 was chosen because
// it's the max pods per node.
podCount = 100
podCreationInterval = 100 * time.Millisecond
recoverTimeout = 5 * time.Minute
startTimeout = 3 * time.Minute
// restartCount is chosen so even with minPods we exhaust the default
// allocation of a /24.
minPods = 50
restartCount = 6
)
f := framework.NewDefaultFramework("restart-test")
Context("Docker Daemon", func() {
Context("Network", func() {
It("should recover from ip leak", func() {
pods := newTestPods(podCount, framework.GetPauseImageNameForHostArch(), "restart-docker-test")
By(fmt.Sprintf("Trying to create %d pods on node", len(pods)))
createBatchPodWithRateControl(f, pods, podCreationInterval)
defer deletePodsSync(f, pods)
// Give the node some time to stabilize, assume pods that enter RunningReady within
// startTimeout fit on the node and the node is now saturated.
runningPods := waitForPods(f, podCount, startTimeout)
if len(runningPods) < minPods {
framework.Failf("Failed to start %d pods, cannot test that restarting docker doesn't leak IPs", minPods)
}
for i := 0; i < restartCount; i += 1 {
By(fmt.Sprintf("Restarting Docker Daemon iteration %d", i))
// TODO: Find a uniform way to deal with systemctl/initctl/service operations. #34494
if stdout, err := exec.Command("sudo", "systemctl", "restart", "docker").CombinedOutput(); err != nil {
framework.Logf("Failed to trigger docker restart with systemd/systemctl: %v, stdout: %q", err, string(stdout))
if stdout, err = exec.Command("sudo", "service", "docker", "restart").CombinedOutput(); err != nil {
framework.Failf("Failed to trigger docker restart with upstart/service: %v, stdout: %q", err, string(stdout))
}
}
time.Sleep(20 * time.Second)
}
By("Checking currently Running/Ready pods")
postRestartRunningPods := waitForPods(f, len(runningPods), recoverTimeout)
if len(postRestartRunningPods) == 0 {
framework.Failf("Failed to start *any* pods after docker restart, this might indicate an IP leak")
}
By("Confirm no containers have terminated")
for _, pod := range postRestartRunningPods {
if c := testutils.TerminatedContainers(pod); len(c) != 0 {
framework.Failf("Pod %q has failed containers %+v after docker restart, this might indicate an IP leak", pod.Name, c)
}
}
By(fmt.Sprintf("Docker restart test passed with %d pods", len(postRestartRunningPods)))
})
})
})
})

View File

@ -343,6 +343,7 @@ ResourceQuota should create a ResourceQuota and capture the life of a service.,t
ResourceQuota should create a ResourceQuota and ensure its status is promptly calculated.,krousey,1
ResourceQuota should verify ResourceQuota with best effort scope.,mml,1
ResourceQuota should verify ResourceQuota with terminating scopes.,ncdc,1
Restart Docker Daemon Network should recover from ip leak,bprashanth,0
Restart should restart all nodes and ensure all nodes and pods recover,andyzheng0831,1
RethinkDB should create and stop rethinkdb servers,mwielgus,1
SSH should SSH to all nodes and run commands,quinton-hoole,0

1 name owner auto-assigned
343 ResourceQuota should create a ResourceQuota and ensure its status is promptly calculated. krousey 1
344 ResourceQuota should verify ResourceQuota with best effort scope. mml 1
345 ResourceQuota should verify ResourceQuota with terminating scopes. ncdc 1
346 Restart Docker Daemon Network should recover from ip leak bprashanth 0
347 Restart should restart all nodes and ensure all nodes and pods recover andyzheng0831 1
348 RethinkDB should create and stop rethinkdb servers mwielgus 1
349 SSH should SSH to all nodes and run commands quinton-hoole 0

View File

@ -82,6 +82,23 @@ func FailedContainers(pod *api.Pod) map[string]ContainerFailures {
return states
}
// TerminatedContainers inspects all containers in a pod and returns a map
// of "container name: termination reason", for all currently terminated
// containers.
func TerminatedContainers(pod *api.Pod) map[string]string {
states := make(map[string]string)
statuses := pod.Status.ContainerStatuses
if len(statuses) == 0 {
return states
}
for _, status := range statuses {
if status.State.Terminated != nil {
states[status.Name] = status.State.Terminated.Reason
}
}
return states
}
// PodNotReady checks whether pod p's has a ready condition of status false.
func PodNotReady(p *api.Pod) (bool, error) {
// Check the ready condition is false.