Adapt pod killing and cleanup for generic container runtime

This change removes docker-specific code in killUnwantedPods. It
also moves the cleanup code away from interacting with containers
directly; cleanup should always deal with the pod-level abstraction
if at all possible.
Yu-Ju Hong 2015-04-29 10:47:25 -07:00
parent ba1140a54f
commit d81ecc58f8
4 changed files with 45 additions and 184 deletions
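
Below is a minimal, self-contained Go sketch of the pod-level cleanup pattern this change adopts: build a set of running pod UIDs from the runtime's pod list and only remove volumes whose pod is neither desired nor still running. The types and helpers here (Pod, cleanupOrphanedVolumes, the maps) are illustrative stand-ins, not the kubelet's actual API.

package main

import "fmt"

// Pod is a stand-in for the runtime-agnostic kubecontainer.Pod.
type Pod struct {
	ID   string // pod UID
	Name string
}

// cleanupOrphanedVolumes mirrors the new shape in the diff: it consumes
// pods reported by the container runtime instead of docker containers,
// so no docker container names have to be parsed to recover pod UIDs.
func cleanupOrphanedVolumes(desired map[string]bool, runningPods []*Pod, volumesByPodUID map[string]string) {
	runningSet := map[string]bool{}
	for _, pod := range runningPods {
		runningSet[pod.ID] = true
	}
	for uid, vol := range volumesByPodUID {
		if desired[uid] {
			continue // pod is still wanted; keep its volumes
		}
		if runningSet[uid] {
			// Unwanted but some containers are still running; defer
			// cleanup until the pod has actually been killed.
			fmt.Printf("keeping %s: pod %s still running\n", vol, uid)
			continue
		}
		fmt.Printf("cleaning up orphaned volume %s for pod %s\n", vol, uid)
	}
}

func main() {
	desired := map[string]bool{"uid-1": true}
	running := []*Pod{{ID: "uid-1", Name: "web"}, {ID: "uid-2", Name: "stale"}}
	volumes := map[string]string{
		"uid-1": "/var/lib/kubelet/pods/uid-1/volumes/data",
		"uid-2": "/var/lib/kubelet/pods/uid-2/volumes/data",
		"uid-3": "/var/lib/kubelet/pods/uid-3/volumes/data",
	}
	cleanupOrphanedVolumes(desired, running, volumes)
}

Compare with the loop removed from cleanupOrphanedVolumes in the diff below, which had to call dockertools.ParseDockerName on each running docker container just to recover the pod UID.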

View File

@@ -391,114 +391,6 @@ func TestIsImagePresent(t *testing.T) {
}
}
func TestGetRunningContainers(t *testing.T) {
fakeDocker := &FakeDockerClient{Errors: make(map[string]error)}
fakeRecorder := &record.FakeRecorder{}
np, _ := network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
containerManager := NewDockerManager(fakeDocker, fakeRecorder, nil, nil, PodInfraContainerImage, 0, 0, "", kubecontainer.FakeOS{}, np, &kubeletProber.FakeProber{})
tests := []struct {
containers map[string]*docker.Container
inputIDs []string
expectedIDs []string
err error
}{
{
containers: map[string]*docker.Container{
"foobar": {
ID: "foobar",
State: docker.State{
Running: false,
},
},
"baz": {
ID: "baz",
State: docker.State{
Running: true,
},
},
},
inputIDs: []string{"foobar", "baz"},
expectedIDs: []string{"baz"},
},
{
containers: map[string]*docker.Container{
"foobar": {
ID: "foobar",
State: docker.State{
Running: true,
},
},
"baz": {
ID: "baz",
State: docker.State{
Running: true,
},
},
},
inputIDs: []string{"foobar", "baz"},
expectedIDs: []string{"foobar", "baz"},
},
{
containers: map[string]*docker.Container{
"foobar": {
ID: "foobar",
State: docker.State{
Running: false,
},
},
"baz": {
ID: "baz",
State: docker.State{
Running: false,
},
},
},
inputIDs: []string{"foobar", "baz"},
expectedIDs: []string{},
},
{
containers: map[string]*docker.Container{
"foobar": {
ID: "foobar",
State: docker.State{
Running: false,
},
},
"baz": {
ID: "baz",
State: docker.State{
Running: false,
},
},
},
inputIDs: []string{"foobar", "baz"},
err: fmt.Errorf("test error"),
},
}
for _, test := range tests {
fakeDocker.ContainerMap = test.containers
if test.err != nil {
fakeDocker.Errors["inspect_container"] = test.err
}
if results, err := containerManager.GetRunningContainers(test.inputIDs); err == nil {
resultIDs := []string{}
for _, result := range results {
resultIDs = append(resultIDs, result.ID)
}
if !reflect.DeepEqual(resultIDs, test.expectedIDs) {
t.Errorf("expected: %#v, saw: %#v", test.expectedIDs, resultIDs)
}
if err != nil {
t.Errorf("unexpected error: %v", err)
}
} else {
if err != test.err {
t.Errorf("unexpected error: %v", err)
}
}
}
}
type podsByID []*kubecontainer.Pod
func (b podsByID) Len() int { return len(b) }

View File

@@ -458,23 +458,6 @@ func (dm *DockerManager) GetPodInfraContainer(pod kubecontainer.Pod) (kubecontai
return kubecontainer.Container{}, fmt.Errorf("unable to find pod infra container for pod %v", pod.ID)
}
func (dm *DockerManager) GetRunningContainers(ids []string) ([]*docker.Container, error) {
var result []*docker.Container
if dm.client == nil {
return nil, fmt.Errorf("unexpected nil docker client.")
}
for ix := range ids {
status, err := dm.client.InspectContainer(ids[ix])
if err != nil {
return nil, err
}
if status != nil && status.State.Running {
result = append(result, status)
}
}
return result, nil
}
func (dm *DockerManager) runContainerRecordErrorReason(pod *api.Pod, container *api.Container, opts *kubecontainer.RunContainerOptions, ref *api.ObjectReference) (string, error) {
dockerID, err := dm.runContainer(pod, container, opts, ref)
if err != nil {

View File

@@ -54,7 +54,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/version"
"github.com/GoogleCloudPlatform/kubernetes/pkg/volume"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
docker "github.com/fsouza/go-dockerclient"
"github.com/golang/glog"
cadvisorApi "github.com/google/cadvisor/info/v1"
)
@@ -1109,20 +1108,15 @@ func (kl *Kubelet) cleanupOrphanedPodDirs(pods []*api.Pod) error {
// Compares the map of current volumes to the map of desired volumes.
// If an active volume does not have a respective desired volume, clean it up.
func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, running []*docker.Container) error {
func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, runningPods []*kubecontainer.Pod) error {
desiredVolumes := getDesiredVolumes(pods)
currentVolumes := kl.getPodVolumesFromDisk()
runningSet := util.StringSet{}
for ix := range running {
if len(running[ix].Name) == 0 {
glog.V(2).Infof("Found running container ix=%d with info: %+v", ix, running[ix])
}
containerName, _, err := dockertools.ParseDockerName(running[ix].Name)
if err != nil {
continue
}
runningSet.Insert(string(containerName.PodUID))
for _, pod := range runningPods {
runningSet.Insert(string(pod.ID))
}
for name, vol := range currentVolumes {
if _, ok := desiredVolumes[name]; !ok {
parts := strings.Split(name, "/")
@@ -1232,16 +1226,24 @@ func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]metri
return nil
}
// Kill containers associated with unwanted pods and get a list of
// unwanted containers that are still running.
running, err := kl.killUnwantedPods(desiredPods, runningPods)
// Kill containers associated with unwanted pods.
err = kl.killUnwantedPods(desiredPods, runningPods)
if err != nil {
glog.Errorf("Failed killing unwanted containers: %v", err)
}
// Note that we just killed the unwanted pods. This may not have been
// reflected in the cache yet. We need to bypass the cache to get the
// latest set of running pods to clean up the volumes.
// TODO: Evaluate the performance impact of bypassing the runtime cache.
runningPods, err = kl.containerManager.GetPods(false)
if err != nil {
glog.Errorf("Error listing containers: %#v", err)
return err
}
// Remove any orphaned volumes.
err = kl.cleanupOrphanedVolumes(pods, running)
err = kl.cleanupOrphanedVolumes(pods, runningPods)
if err != nil {
glog.Errorf("Failed cleaning up orphaned volumes: %v", err)
return err
@@ -1260,15 +1262,10 @@ func (kl *Kubelet) SyncPods(allPods []*api.Pod, podSyncTypes map[types.UID]metri
return err
}
// killUnwantedPods kills the unwanted, running pods in parallel, and returns
// containers in those pods that it failed to terminate.
// killUnwantedPods kills the unwanted, running pods in parallel.
func (kl *Kubelet) killUnwantedPods(desiredPods map[types.UID]empty,
runningPods []*kubecontainer.Pod) ([]*docker.Container, error) {
type result struct {
containers []*docker.Container
err error
}
ch := make(chan result, len(runningPods))
runningPods []*kubecontainer.Pod) error {
ch := make(chan error, len(runningPods))
defer close(ch)
numWorkers := 0
for _, pod := range runningPods {
@@ -1277,15 +1274,14 @@ func (kl *Kubelet) killUnwantedPods(desiredPods map[types.UID]empty,
continue
}
numWorkers++
go func(pod *kubecontainer.Pod, ch chan result) {
go func(pod *kubecontainer.Pod, ch chan error) {
var err error = nil
defer func() {
// Send the IDs of the containers that we failed to kill.
containers, err := kl.getRunningContainersByPod(pod)
ch <- result{containers: containers, err: err}
ch <- err
}()
glog.V(1).Infof("Killing unwanted pod %q", pod.Name)
// Stop the containers.
err := kl.killPod(*pod)
err = kl.killPod(*pod)
if err != nil {
glog.Errorf("Failed killing the pod %q: %v", pod.Name, err)
return
@@ -1293,26 +1289,15 @@ func (kl *Kubelet) killUnwantedPods(desiredPods map[types.UID]empty,
}(pod, ch)
}
// Aggregate results from the pod killing workers.
// Aggregate errors from the pod killing workers.
var errs []error
var running []*docker.Container
for i := 0; i < numWorkers; i++ {
m := <-ch
if m.err != nil {
errs = append(errs, m.err)
continue
err := <-ch
if err != nil {
errs = append(errs, err)
}
running = append(running, m.containers...)
}
return running, utilErrors.NewAggregate(errs)
}
func (kl *Kubelet) getRunningContainersByPod(pod *kubecontainer.Pod) ([]*docker.Container, error) {
containerIDs := make([]string, len(pod.Containers))
for i, c := range pod.Containers {
containerIDs[i] = string(c.ID)
}
return kl.containerManager.GetRunningContainers(containerIDs)
return utilErrors.NewAggregate(errs)
}
type podsByCreationTime []*api.Pod
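
The reworked killUnwantedPods above reduces each worker to reporting a single error. A standalone sketch of that fan-out/aggregate pattern follows; names such as pod, killPod, and the use of errors.Join (standing in for utilErrors.NewAggregate) are assumptions for illustration, not kubelet code.

package main

import (
	"errors"
	"fmt"
)

type pod struct {
	UID  string
	Name string
}

// killPod is a stub standing in for stopping all of a pod's containers.
func killPod(p pod) error {
	if p.Name == "stale-broken" {
		return fmt.Errorf("failed to stop containers of %q", p.Name)
	}
	return nil
}

// killUnwantedPods kills, in parallel, every running pod whose UID is not
// in the desired set, and returns the workers' errors aggregated into one.
func killUnwantedPods(desired map[string]bool, running []pod) error {
	ch := make(chan error, len(running)) // buffered so workers never block
	workers := 0
	for _, p := range running {
		if desired[p.UID] {
			continue // still desired; leave it alone
		}
		workers++
		go func(p pod) {
			ch <- killPod(p)
		}(p)
	}
	var errs []error
	for i := 0; i < workers; i++ {
		if err := <-ch; err != nil {
			errs = append(errs, err)
		}
	}
	return errors.Join(errs...) // nil when every worker succeeded
}

func main() {
	desired := map[string]bool{"uid-1": true}
	running := []pod{{"uid-1", "web"}, {"uid-2", "stale-broken"}, {"uid-3", "stale"}}
	fmt.Println("kill result:", killUnwantedPods(desired, running))
}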

View File

@@ -528,7 +528,7 @@ func TestSyncPodsDoesNothing(t *testing.T) {
}
waitGroup.Wait()
verifyCalls(t, fakeDocker, []string{
"list", "list",
"list", "list", "list",
// Get pod status.
"list", "inspect_container", "inspect_container",
// Check the pod infra container.
@@ -570,7 +570,7 @@ func TestSyncPodsWithTerminationLog(t *testing.T) {
}
waitGroup.Wait()
verifyCalls(t, fakeDocker, []string{
"list", "list",
"list", "list", "list",
// Get pod status.
"list", "inspect_image",
// Create pod infra container.
@@ -630,7 +630,7 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
waitGroup.Wait()
verifyCalls(t, fakeDocker, []string{
"list", "list",
"list", "list", "list",
// Get pod status.
"list", "inspect_image",
// Create pod infra container.
@@ -693,7 +693,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
waitGroup.Wait()
verifyCalls(t, fakeDocker, []string{
"list", "list",
"list", "list", "list",
// Get pod status.
"list", "inspect_image",
// Create pod infra container.
@@ -760,7 +760,7 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) {
waitGroup.Wait()
verifyCalls(t, fakeDocker, []string{
"list", "list",
"list", "list", "list",
// Get pod status.
"list", "inspect_container", "inspect_image",
// Check the pod infra container.
@@ -835,7 +835,7 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
waitGroup.Wait()
verifyCalls(t, fakeDocker, []string{
"list", "list",
"list", "list", "list",
// Get pod status.
"list", "inspect_container", "inspect_image",
// Check the pod infra container.
@@ -936,6 +936,7 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) {
"list",
// foo1
"list",
"list",
// Get pod status.
"list", "inspect_container",
// Kill the container since pod infra container is not running.
@@ -999,7 +1000,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
if err := kubelet.SyncPods([]*api.Pod{}, emptyPodUIDs, map[string]*api.Pod{}, time.Now()); err != nil {
t.Errorf("unexpected error: %v", err)
}
verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "inspect_container", "inspect_container"})
verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "list"})
// A map iteration is used to delete containers, so must not depend on
// order here.
@@ -1040,7 +1041,7 @@ func TestSyncPodsDeletes(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}
verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "inspect_container", "inspect_container"})
verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "list"})
// A map iteration is used to delete containers, so must not depend on
// order here.
@@ -1121,7 +1122,7 @@ func TestSyncPodsDeletesDuplicate(t *testing.T) {
waitGroup.Wait()
verifyCalls(t, fakeDocker, []string{
"list", "list",
"list", "list", "list",
// Get pod status.
"list", "inspect_container", "inspect_container", "inspect_container",
// Check the pod infra container.
@@ -1192,7 +1193,7 @@ func TestSyncPodsBadHash(t *testing.T) {
waitGroup.Wait()
verifyCalls(t, fakeDocker, []string{
"list", "list",
"list", "list", "list",
// Get pod status.
"list", "inspect_container", "inspect_container",
// Check the pod infra container.
@@ -1266,7 +1267,7 @@ func TestSyncPodsUnhealthy(t *testing.T) {
waitGroup.Wait()
verifyCalls(t, fakeDocker, []string{
"list", "list",
"list", "list", "list",
// Get pod status.
"list", "inspect_container", "inspect_container",
// Check the pod infra container.
@@ -1915,7 +1916,7 @@ func TestSyncPodEventHandlerFails(t *testing.T) {
waitGroup.Wait()
verifyCalls(t, fakeDocker, []string{
"list", "list",
"list", "list", "list",
// Get pod status.
"list", "inspect_container", "inspect_image",
// Check the pod infra container.
@@ -3958,7 +3959,7 @@ func TestSyncPodsWithRestartPolicy(t *testing.T) {
}{
{
api.RestartPolicyAlways,
[]string{"list", "list",
[]string{"list", "list", "list",
// Get pod status.
"list", "inspect_container", "inspect_container", "inspect_container",
// Check the pod infra container.
@@ -3972,7 +3973,7 @@ func TestSyncPodsWithRestartPolicy(t *testing.T) {
},
{
api.RestartPolicyOnFailure,
[]string{"list", "list",
[]string{"list", "list", "list",
// Get pod status.
"list", "inspect_container", "inspect_container", "inspect_container",
// Check the pod infra container.
@@ -3986,7 +3987,7 @@ func TestSyncPodsWithRestartPolicy(t *testing.T) {
},
{
api.RestartPolicyNever,
[]string{"list", "list",
[]string{"list", "list", "list",
// Get pod status.
"list", "inspect_container", "inspect_container", "inspect_container",
// Check the pod infra container.