Handle graceful termination in the Kubelet

Pods that are gracefully deleted on the server are now handled by the Kubelet.

The preStop hook's execution is limited to the pod's grace period.
Clayton Coleman 2015-08-19 21:57:58 -04:00
parent 65f4ebd927
commit f5c4a3e7a6
10 changed files with 359 additions and 112 deletions
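
The heart of the change is how the Docker runtime picks an effective grace period before stopping a container: a server-assigned DeletionGracePeriodSeconds wins, otherwise the spec's TerminationGracePeriodSeconds is used, time spent in the preStop hook is subtracted before the stop call, and a two-second floor keeps the container from being killed without any SIGTERM window. The following is a minimal, self-contained sketch of that selection logic only; podTermination, effectiveGracePeriod, and remainingAfterPreStop are hypothetical names introduced here for illustration, not the Kubelet's actual types.

package main

import (
	"fmt"
	"time"
)

// podTermination mirrors just the two fields consulted when killing a container;
// the real api.Pod carries them on the object metadata and the pod spec.
type podTermination struct {
	deletionGracePeriodSeconds    *int64 // set by the apiserver on graceful delete
	terminationGracePeriodSeconds *int64 // declared in the pod spec
}

// floor used by the Docker runtime in this commit so SIGTERM always precedes SIGKILL
const minimumGracePeriodInSeconds = 2

// effectiveGracePeriod picks the grace period: server-assigned value first,
// then the spec default, otherwise the minimum floor.
func effectiveGracePeriod(p podTermination) int64 {
	grace := int64(minimumGracePeriodInSeconds)
	switch {
	case p.deletionGracePeriodSeconds != nil:
		grace = *p.deletionGracePeriodSeconds
	case p.terminationGracePeriodSeconds != nil:
		grace = *p.terminationGracePeriodSeconds
	}
	return grace
}

// remainingAfterPreStop subtracts the time a preStop hook consumed and
// re-applies the minimum so the container still gets a short SIGTERM window.
func remainingAfterPreStop(grace int64, preStopDuration time.Duration) int64 {
	grace -= int64(preStopDuration.Seconds())
	if grace < minimumGracePeriodInSeconds {
		grace = minimumGracePeriodInSeconds
	}
	return grace
}

func main() {
	thirty := int64(30)
	pod := podTermination{terminationGracePeriodSeconds: &thirty}
	grace := effectiveGracePeriod(pod)
	fmt.Println("grace period:", grace)                                              // 30
	fmt.Println("after 25s preStop:", remainingAfterPreStop(grace, 25*time.Second))  // 5
	fmt.Println("after 29s preStop:", remainingAfterPreStop(grace, 29*time.Second))  // clamped to 2
}

This mirrors why the commit message says preStop is bounded by the grace period: the hook runs under a timeout of that same length, and whatever it consumes is deducted from the window passed to docker stop.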

View File

@@ -217,9 +217,8 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
for _, ref := range filtered {
name := kubecontainer.GetPodFullName(ref)
if existing, found := pods[name]; found {
-if !reflect.DeepEqual(existing.Spec, ref.Spec) {
+if checkAndUpdatePod(existing, ref) {
// this is an update
-existing.Spec = ref.Spec
updates.Pods = append(updates.Pods, existing)
continue
}

@@ -261,9 +260,8 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
name := kubecontainer.GetPodFullName(ref)
if existing, found := oldPods[name]; found {
pods[name] = existing
-if !reflect.DeepEqual(existing.Spec, ref.Spec) {
+if checkAndUpdatePod(existing, ref) {
// this is an update
-existing.Spec = ref.Spec
updates.Pods = append(updates.Pods, existing)
continue
}

@@ -335,6 +333,23 @@ func filterInvalidPods(pods []*api.Pod, source string, recorder record.EventReco
return
}

+// checkAndUpdatePod updates existing if ref makes a meaningful change and returns true, or
+// returns false if there was no update.
+func checkAndUpdatePod(existing, ref *api.Pod) bool {
+// TODO: it would be better to update the whole object and only preserve certain things
+// like the source annotation or the UID (to ensure safety)
+if reflect.DeepEqual(existing.Spec, ref.Spec) &&
+reflect.DeepEqual(existing.DeletionTimestamp, ref.DeletionTimestamp) &&
+reflect.DeepEqual(existing.DeletionGracePeriodSeconds, ref.DeletionGracePeriodSeconds) {
+return false
+}
+// this is an update
+existing.Spec = ref.Spec
+existing.DeletionTimestamp = ref.DeletionTimestamp
+existing.DeletionGracePeriodSeconds = ref.DeletionGracePeriodSeconds
+return true
+}
+
// Sync sends a copy of the current state through the update channel.
func (s *podStorage) Sync() {
s.updateLock.Lock()

View File

@@ -163,13 +163,13 @@ func (f *FakeRuntime) SyncPod(pod *api.Pod, _ Pod, _ api.PodStatus, _ []api.Secr
return f.Err
}

-func (f *FakeRuntime) KillPod(pod Pod) error {
+func (f *FakeRuntime) KillPod(pod *api.Pod, runningPod Pod) error {
f.Lock()
defer f.Unlock()
f.CalledFunctions = append(f.CalledFunctions, "KillPod")
-f.KilledPods = append(f.KilledPods, string(pod.ID))
-for _, c := range pod.Containers {
+f.KilledPods = append(f.KilledPods, string(runningPod.ID))
+for _, c := range runningPod.Containers {
f.KilledContainers = append(f.KilledContainers, c.Name)
}
return f.Err

View File

@@ -54,8 +54,8 @@ type Runtime interface {
GetPods(all bool) ([]*Pod, error)
// Syncs the running pod into the desired pod.
SyncPod(pod *api.Pod, runningPod Pod, podStatus api.PodStatus, pullSecrets []api.Secret) error
-// KillPod kills all the containers of a pod.
-KillPod(pod Pod) error
+// KillPod kills all the containers of a pod. Pod may be nil, running pod must not be.
+KillPod(pod *api.Pod, runningPod Pod) error
// GetPodStatus retrieves the status of the pod, including the information of
// all containers in the pod. Clients of this interface assume the containers
// statuses in a pod always have a deterministic ordering (eg: sorted by name).

View File

@@ -56,12 +56,21 @@ import (
const (
maxReasonCacheEntries = 200
-kubernetesPodLabel = "io.kubernetes.pod.data"
-kubernetesContainerLabel = "io.kubernetes.container.name"
// ndots specifies the minimum number of dots that a domain name must contain for the resolver to consider it as FQDN (fully-qualified)
// we want to able to consider SRV lookup names like _dns._udp.kube-dns.default.svc to be considered relative.
// hence, setting ndots to be 5.
ndotsDNSOption = "options ndots:5\n"
+// In order to avoid unnecessary SIGKILLs, give every container a minimum grace
+// period after SIGTERM. Docker will guarantee the termination, but SIGTERM is
+// potentially dangerous.
+// TODO: evaluate whether there are scenarios in which SIGKILL is preferable to
+// SIGTERM for certain process types, which may justify setting this to 0.
+minimumGracePeriodInSeconds = 2
+kubernetesNameLabel = "io.kubernetes.pod.name"
+kubernetesPodLabel = "io.kubernetes.pod.data"
+kubernetesTerminationGracePeriodLabel = "io.kubernetes.pod.terminationGracePeriod"
+kubernetesContainerLabel = "io.kubernetes.container.name"
)

// DockerManager implements the Runtime interface.
@@ -589,12 +598,19 @@ func (dm *DockerManager) runContainer(
if len(containerHostname) > hostnameMaxLen {
containerHostname = containerHostname[:hostnameMaxLen]
}
+// Pod information is recorded on the container as labels to preserve it in the event the pod is deleted
+// while the Kubelet is down and there is no information available to recover the pod. This includes
+// termination information like the termination grace period and the pre stop hooks.
+// TODO: keep these labels up to date if the pod changes
namespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
labels := map[string]string{
-"io.kubernetes.pod.name": namespacedName.String(),
+kubernetesNameLabel: namespacedName.String(),
+}
+if pod.Spec.TerminationGracePeriodSeconds != nil {
+labels[kubernetesTerminationGracePeriodLabel] = strconv.FormatInt(*pod.Spec.TerminationGracePeriodSeconds, 10)
}
if container.Lifecycle != nil && container.Lifecycle.PreStop != nil {
-glog.V(1).Infof("Setting preStop hook")
// TODO: This is kind of hacky, we should really just encode the bits we need.
data, err := latest.Codec.Encode(pod)
if err != nil {
@@ -1106,40 +1122,56 @@ func (dm *DockerManager) PortForward(pod *kubecontainer.Pod, port uint16, stream
}

// Kills all containers in the specified pod
-func (dm *DockerManager) KillPod(pod kubecontainer.Pod) error {
+func (dm *DockerManager) KillPod(pod *api.Pod, runningPod kubecontainer.Pod) error {
// Send the kills in parallel since they may take a long time. Len + 1 since there
// can be Len errors + the networkPlugin teardown error.
-errs := make(chan error, len(pod.Containers)+1)
+errs := make(chan error, len(runningPod.Containers)+1)
wg := sync.WaitGroup{}
-var networkID types.UID
-for _, container := range pod.Containers {
+var (
+networkContainer *kubecontainer.Container
+networkSpec *api.Container
+)
+for _, container := range runningPod.Containers {
wg.Add(1)
go func(container *kubecontainer.Container) {
defer util.HandleCrash()
defer wg.Done()
+var containerSpec *api.Container
+if pod != nil {
+for i, c := range pod.Spec.Containers {
+if c.Name == container.Name {
+containerSpec = &pod.Spec.Containers[i]
+break
+}
+}
+}
// TODO: Handle this without signaling the pod infra container to
// adapt to the generic container runtime.
if container.Name == PodInfraContainerName {
// Store the container runtime for later deletion.
// We do this so that PreStop handlers can run in the network namespace.
-networkID = container.ID
+networkContainer = container
+networkSpec = containerSpec
return
}
-if err := dm.killContainer(container.ID); err != nil {
-glog.Errorf("Failed to delete container: %v; Skipping pod %q", err, pod.ID)
+err := dm.KillContainerInPod(container.ID, containerSpec, pod)
+if err != nil {
+glog.Errorf("Failed to delete container: %v; Skipping pod %q", err, runningPod.ID)
errs <- err
}
}(container)
}
wg.Wait()
-if len(networkID) > 0 {
-if err := dm.networkPlugin.TearDownPod(pod.Namespace, pod.Name, kubeletTypes.DockerID(networkID)); err != nil {
+if networkContainer != nil {
+if err := dm.networkPlugin.TearDownPod(runningPod.Namespace, runningPod.Name, kubeletTypes.DockerID(networkContainer.ID)); err != nil {
glog.Errorf("Failed tearing down the infra container: %v", err)
errs <- err
}
-if err := dm.killContainer(networkID); err != nil {
-glog.Errorf("Failed to delete container: %v; Skipping pod %q", err, pod.ID)
+if err := dm.KillContainerInPod(networkContainer.ID, networkSpec, pod); err != nil {
+glog.Errorf("Failed to delete container: %v; Skipping pod %q", err, runningPod.ID)
errs <- err
}
}
@@ -1154,75 +1186,152 @@ func (dm *DockerManager) KillPod(pod kubecontainer.Pod) error {
return nil
}

-// KillContainerInPod kills a container in the pod.
-func (dm *DockerManager) KillContainerInPod(container api.Container, pod *api.Pod) error {
-// Locate the container.
-pods, err := dm.GetPods(false)
-if err != nil {
-return err
-}
-targetPod := kubecontainer.Pods(pods).FindPod(kubecontainer.GetPodFullName(pod), pod.UID)
-targetContainer := targetPod.FindContainerByName(container.Name)
-if targetContainer == nil {
-return fmt.Errorf("unable to find container %q in pod %q", container.Name, targetPod.Name)
-}
-return dm.killContainer(targetContainer.ID)
-}
-
-// TODO(vmarmol): Unexport this as it is no longer used externally.
-// KillContainer kills a container identified by containerID.
-// Internally, it invokes docker's StopContainer API with a timeout of 10s.
-// TODO: Deprecate this function in favor of KillContainerInPod.
-func (dm *DockerManager) KillContainer(containerID types.UID) error {
-return dm.killContainer(containerID)
-}
-
-func (dm *DockerManager) killContainer(containerID types.UID) error {
-ID := string(containerID)
-glog.V(2).Infof("Killing container with id %q", ID)
-inspect, err := dm.client.InspectContainer(ID)
-if err != nil {
-return err
-}
-var found bool
-var preStop string
-if inspect != nil && inspect.Config != nil && inspect.Config.Labels != nil {
-preStop, found = inspect.Config.Labels[kubernetesPodLabel]
-}
-if found {
-var pod api.Pod
-err := latest.Codec.DecodeInto([]byte(preStop), &pod)
-if err != nil {
-glog.Errorf("Failed to decode prestop: %s, %s", preStop, ID)
-} else {
-name := inspect.Config.Labels[kubernetesContainerLabel]
-var container *api.Container
-for ix := range pod.Spec.Containers {
-if pod.Spec.Containers[ix].Name == name {
-container = &pod.Spec.Containers[ix]
-break
-}
-}
-if container != nil {
-glog.V(1).Infof("Running preStop hook")
-if err := dm.runner.Run(ID, &pod, container, container.Lifecycle.PreStop); err != nil {
-glog.Errorf("failed to run preStop hook: %v", err)
-}
-} else {
-glog.Errorf("unable to find container %v, %s", pod, name)
-}
-}
-}
-dm.readinessManager.RemoveReadiness(ID)
-err = dm.client.StopContainer(ID, 10)
-ref, ok := dm.containerRefManager.GetRef(ID)
-if !ok {
-glog.Warningf("No ref for pod '%v'", ID)
-} else {
-// TODO: pass reason down here, and state, or move this call up the stack.
-dm.recorder.Eventf(ref, "Killing", "Killing with docker id %v", util.ShortenString(ID, 12))
-}
-return err
+// KillContainerInPod kills a container in the pod. It must be passed either a container ID or a container and pod,
+// and will attempt to lookup the other information if missing.
+func (dm *DockerManager) KillContainerInPod(containerID types.UID, container *api.Container, pod *api.Pod) error {
+switch {
+case len(containerID) == 0:
+// Locate the container.
+pods, err := dm.GetPods(false)
+if err != nil {
+return err
+}
+targetPod := kubecontainer.Pods(pods).FindPod(kubecontainer.GetPodFullName(pod), pod.UID)
+targetContainer := targetPod.FindContainerByName(container.Name)
+if targetContainer == nil {
+return fmt.Errorf("unable to find container %q in pod %q", container.Name, targetPod.Name)
+}
+containerID = targetContainer.ID
+case container == nil || pod == nil:
+// Read information about the container from labels
+inspect, err := dm.client.InspectContainer(string(containerID))
+if err != nil {
+return err
+}
+storedPod, storedContainer, cerr := containerAndPodFromLabels(inspect)
+if cerr != nil {
+glog.Errorf("unable to access pod data from container: %v", err)
+}
+if container == nil {
+container = storedContainer
+}
+if pod == nil {
+pod = storedPod
+}
+}
+return dm.killContainer(containerID, container, pod)
+}
+
+// killContainer accepts a containerID and an optional container or pod containing shutdown policies. Invoke
+// KillContainerInPod if information must be retrieved first.
+func (dm *DockerManager) killContainer(containerID types.UID, container *api.Container, pod *api.Pod) error {
+ID := string(containerID)
+name := ID
+if container != nil {
+name = fmt.Sprintf("%s %s", name, container.Name)
+}
+if pod != nil {
+name = fmt.Sprintf("%s %s/%s", name, pod.Namespace, pod.Name)
+}
+gracePeriod := int64(minimumGracePeriodInSeconds)
+if pod != nil {
+switch {
+case pod.DeletionGracePeriodSeconds != nil:
+gracePeriod = *pod.DeletionGracePeriodSeconds
+case pod.Spec.TerminationGracePeriodSeconds != nil:
+gracePeriod = *pod.Spec.TerminationGracePeriodSeconds
+}
+}
+glog.V(2).Infof("Killing container %q with %d second grace period", name, gracePeriod)
+start := util.Now()
+
+if pod != nil && container != nil && container.Lifecycle != nil && container.Lifecycle.PreStop != nil {
+glog.V(4).Infof("Running preStop hook for container %q", name)
+done := make(chan struct{})
+go func() {
+defer close(done)
+defer util.HandleCrash()
+if err := dm.runner.Run(ID, pod, container, container.Lifecycle.PreStop); err != nil {
+glog.Errorf("preStop hook for container %q failed: %v", name, err)
+}
+}()
+select {
+case <-time.After(time.Duration(gracePeriod) * time.Second):
+glog.V(2).Infof("preStop hook for container %q did not complete in %d seconds", name, gracePeriod)
+case <-done:
+glog.V(4).Infof("preStop hook for container %q completed", name)
+}
+gracePeriod -= int64(util.Now().Sub(start.Time).Seconds())
+}
+
+dm.readinessManager.RemoveReadiness(ID)
+
+// always give containers a minimal shutdown window to avoid unnecessary SIGKILLs
+if gracePeriod < minimumGracePeriodInSeconds {
+gracePeriod = minimumGracePeriodInSeconds
+}
+err := dm.client.StopContainer(ID, uint(gracePeriod))
+if _, ok := err.(*docker.ContainerNotRunning); ok && err != nil {
+glog.V(4).Infof("Container %q has already exited", name)
+return nil
+}
+if err == nil {
+glog.V(2).Infof("Container %q exited after %s", name, util.Now().Sub(start.Time))
+} else {
+glog.V(2).Infof("Container %q termination failed after %s: %v", name, util.Now().Sub(start.Time), err)
+}
+ref, ok := dm.containerRefManager.GetRef(ID)
+if !ok {
+glog.Warningf("No ref for pod '%q'", name)
+} else {
+// TODO: pass reason down here, and state, or move this call up the stack.
+dm.recorder.Eventf(ref, "Killing", "Killing with docker id %v", util.ShortenString(ID, 12))
+}
+return err
+}
+
+var errNoPodOnContainer = fmt.Errorf("no pod information labels on Docker container")
+
+// containerAndPodFromLabels tries to load the appropriate container info off of a Docker container's labels
+func containerAndPodFromLabels(inspect *docker.Container) (pod *api.Pod, container *api.Container, err error) {
+if inspect == nil && inspect.Config == nil && inspect.Config.Labels == nil {
+return nil, nil, errNoPodOnContainer
+}
+labels := inspect.Config.Labels
+
+// the pod data may not be set
+if body, found := labels[kubernetesPodLabel]; found {
+pod = &api.Pod{}
+if err = latest.Codec.DecodeInto([]byte(body), pod); err == nil {
+name := labels[kubernetesContainerLabel]
+for ix := range pod.Spec.Containers {
+if pod.Spec.Containers[ix].Name == name {
+container = &pod.Spec.Containers[ix]
+break
+}
+}
+if container == nil {
+err = fmt.Errorf("unable to find container %s in pod %v", name, pod)
+}
+} else {
+pod = nil
+}
+}
+
+// attempt to find the default grace period if we didn't commit a pod, but set the generic metadata
+// field (the one used by kill)
+if pod == nil {
+if period, ok := labels[kubernetesTerminationGracePeriodLabel]; ok {
+if seconds, err := strconv.ParseInt(period, 10, 64); err == nil {
+pod = &api.Pod{}
+pod.DeletionGracePeriodSeconds = &seconds
+}
+}
+}
+
+return
}

// Run a single container from a pod. Returns the docker container ID
@@ -1259,7 +1368,7 @@ func (dm *DockerManager) runContainerInPod(pod *api.Pod, container *api.Containe
if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
handlerErr := dm.runner.Run(id, pod, container, container.Lifecycle.PostStart)
if handlerErr != nil {
-dm.killContainer(types.UID(id))
+dm.KillContainerInPod(types.UID(id), container, pod)
return kubeletTypes.DockerID(""), fmt.Errorf("failed to call event handler: %v", handlerErr)
}
}

@@ -1540,10 +1649,10 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod
podFullName := kubecontainer.GetPodFullName(pod)
containerChanges, err := dm.computePodContainerChanges(pod, runningPod, podStatus)
-glog.V(3).Infof("Got container changes for pod %q: %+v", podFullName, containerChanges)
if err != nil {
return err
}
+glog.V(3).Infof("Got container changes for pod %q: %+v", podFullName, containerChanges)

if containerChanges.StartInfraContainer || (len(containerChanges.ContainersToKeep) == 0 && len(containerChanges.ContainersToStart) == 0) {
if len(containerChanges.ContainersToKeep) == 0 && len(containerChanges.ContainersToStart) == 0 {

@@ -1553,7 +1662,7 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod
}

// Killing phase: if we want to start new infra container, or nothing is running kill everything (including infra container)
-err = dm.KillPod(runningPod)
+err = dm.KillPod(pod, runningPod)
if err != nil {
return err
}

@@ -1563,7 +1672,15 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod
_, keep := containerChanges.ContainersToKeep[kubeletTypes.DockerID(container.ID)]
if !keep {
glog.V(3).Infof("Killing unwanted container %+v", container)
-err = dm.KillContainer(container.ID)
+// attempt to find the appropriate container policy
+var podContainer *api.Container
+for i, c := range pod.Spec.Containers {
+if c.Name == container.Name {
+podContainer = &pod.Spec.Containers[i]
+break
+}
+}
+err = dm.KillContainerInPod(container.ID, podContainer, pod)
if err != nil {
glog.Errorf("Error killing container: %v", err)
}

View File

@@ -405,7 +405,7 @@ func TestKillContainerInPod(t *testing.T) {
manager.readinessManager.SetReadiness(c.ID, true)
}

-if err := manager.KillContainerInPod(pod.Spec.Containers[0], pod); err != nil {
+if err := manager.KillContainerInPod("", &pod.Spec.Containers[0], pod); err != nil {
t.Errorf("unexpected error: %v", err)
}
// Assert the container has been stopped.

@@ -478,14 +478,14 @@ func TestKillContainerInPodWithPreStop(t *testing.T) {
manager.readinessManager.SetReadiness(c.ID, true)
}

-if err := manager.KillContainerInPod(pod.Spec.Containers[0], pod); err != nil {
+if err := manager.KillContainerInPod("", &pod.Spec.Containers[0], pod); err != nil {
t.Errorf("unexpected error: %v", err)
}
// Assert the container has been stopped.
if err := fakeDocker.AssertStopped([]string{containerToKill.ID}); err != nil {
t.Errorf("container was not stopped correctly: %v", err)
}
-verifyCalls(t, fakeDocker, []string{"list", "inspect_container", "create_exec", "start_exec", "stop"})
+verifyCalls(t, fakeDocker, []string{"list", "create_exec", "start_exec", "stop"})
if !reflect.DeepEqual(expectedCmd, fakeDocker.execCmd) {
t.Errorf("expected: %v, got %v", expectedCmd, fakeDocker.execCmd)
}

@@ -522,7 +522,7 @@ func TestKillContainerInPodWithError(t *testing.T) {
manager.readinessManager.SetReadiness(c.ID, true)
}

-if err := manager.KillContainerInPod(pod.Spec.Containers[0], pod); err == nil {
+if err := manager.KillContainerInPod("", &pod.Spec.Containers[0], pod); err == nil {
t.Errorf("expected error, found nil")
}

@@ -1021,7 +1021,7 @@ func TestSyncPodDeletesWithNoPodInfraContainer(t *testing.T) {
verifyCalls(t, fakeDocker, []string{
// Kill the container since pod infra container is not running.
-"inspect_container", "stop",
+"stop",
// Create pod infra container.
"create", "start", "inspect_container",
// Create container.

@@ -1096,7 +1096,7 @@ func TestSyncPodDeletesDuplicate(t *testing.T) {
// Check the pod infra container.
"inspect_container",
// Kill the duplicated container.
-"inspect_container", "stop",
+"stop",
})
// Expect one of the duplicates to be killed.
if len(fakeDocker.Stopped) != 1 || (fakeDocker.Stopped[0] != "1234" && fakeDocker.Stopped[0] != "4567") {

@@ -1150,7 +1150,7 @@ func TestSyncPodBadHash(t *testing.T) {
// Check the pod infra container.
"inspect_container",
// Kill and restart the bad hash container.
-"inspect_container", "stop", "create", "start", "inspect_container",
+"stop", "create", "start", "inspect_container",
})
if err := fakeDocker.AssertStopped([]string{"1234"}); err != nil {

@@ -1208,7 +1208,7 @@ func TestSyncPodsUnhealthy(t *testing.T) {
// Check the pod infra container.
"inspect_container",
// Kill the unhealthy container.
-"inspect_container", "stop",
+"stop",
// Restart the unhealthy container.
"create", "start", "inspect_container",
})
@ -1441,9 +1441,9 @@ func TestSyncPodWithRestartPolicy(t *testing.T) {
api.RestartPolicyNever, api.RestartPolicyNever,
[]string{ []string{
// Check the pod infra container. // Check the pod infra container.
"inspect_container", "inspect_container", "inspect_container",
// Stop the last pod infra container. // Stop the last pod infra container.
"inspect_container", "stop", "stop",
}, },
[]string{}, []string{},
[]string{"9876"}, []string{"9876"},
@@ -1910,7 +1910,7 @@ func TestSyncPodEventHandlerFails(t *testing.T) {
// Create the container.
"create", "start",
// Kill the container since event handler fails.
-"inspect_container", "stop",
+"stop",
})
// TODO(yifan): Check the stopped container's name.

View File

@@ -1124,8 +1124,8 @@ func parseResolvConf(reader io.Reader) (nameservers []string, searches []string,
}

// Kill all running containers in a pod (includes the pod infra container).
-func (kl *Kubelet) killPod(pod kubecontainer.Pod) error {
-return kl.containerRuntime.KillPod(pod)
+func (kl *Kubelet) killPod(pod *api.Pod, runningPod kubecontainer.Pod) error {
+return kl.containerRuntime.KillPod(pod, runningPod)
}

type empty struct{}

@@ -1181,9 +1181,10 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
}()

// Kill pods we can't run.
-err := canRunPod(pod)
-if err != nil {
-kl.killPod(runningPod)
+if err := canRunPod(pod); err != nil || pod.DeletionTimestamp != nil {
+if err := kl.killPod(pod, runningPod); err != nil {
+util.HandleError(err)
+}
return err
}

@@ -1370,6 +1371,32 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, runningPods []*kubeco
return nil
}

+// Delete any pods that are no longer running and are marked for deletion.
+func (kl *Kubelet) cleanupTerminatedPods(pods []*api.Pod, runningPods []*kubecontainer.Pod) error {
+var terminating []*api.Pod
+for _, pod := range pods {
+if pod.DeletionTimestamp != nil {
+found := false
+for _, runningPod := range runningPods {
+if runningPod.ID == pod.UID {
+found = true
+break
+}
+}
+if found {
+podFullName := kubecontainer.GetPodFullName(pod)
+glog.V(5).Infof("Keeping terminated pod %q and uid %q, still running", podFullName, pod.UID)
+continue
+}
+terminating = append(terminating, pod)
+}
+}
+if !kl.statusManager.TerminatePods(terminating) {
+return errors.New("not all pods were successfully terminated")
+}
+return nil
+}
+
// pastActiveDeadline returns true if the pod has been active for more than
// ActiveDeadlineSeconds.
func (kl *Kubelet) pastActiveDeadline(pod *api.Pod) bool {

@@ -1532,6 +1559,10 @@ func (kl *Kubelet) cleanupPods(allPods []*api.Pod, admittedPods []*api.Pod) erro
// Remove any orphaned mirror pods.
kl.podManager.DeleteOrphanedMirrorPods()

+if err := kl.cleanupTerminatedPods(allPods, runningPods); err != nil {
+glog.Errorf("Failed to cleanup terminated pods: %v", err)
+}
+
return err
}

@@ -1554,7 +1585,7 @@ func (kl *Kubelet) killUnwantedPods(desiredPods map[types.UID]empty,
}()
glog.V(1).Infof("Killing unwanted pod %q", pod.Name)
// Stop the containers.
-err = kl.killPod(*pod)
+err = kl.killPod(nil, *pod)
if err != nil {
glog.Errorf("Failed killing the pod %q: %v", pod.Name, err)
return

View File

@@ -64,7 +64,7 @@ func (mc *basicMirrorClient) DeleteMirrorPod(podFullName string) error {
return err
}
glog.V(4).Infof("Deleting a mirror pod %q", podFullName)
-if err := mc.apiserverClient.Pods(namespace).Delete(name, nil); err != nil {
+if err := mc.apiserverClient.Pods(namespace).Delete(name, api.NewDeleteOptions(0)); err != nil {
glog.Errorf("Failed deleting a mirror pod %q: %v", podFullName, err)
}
return nil

View File

@@ -693,11 +693,11 @@ func (r *runtime) GetPods(all bool) ([]*kubecontainer.Pod, error) {
}

// KillPod invokes 'systemctl kill' to kill the unit that runs the pod.
-func (r *runtime) KillPod(pod kubecontainer.Pod) error {
-glog.V(4).Infof("Rkt is killing pod: name %q.", pod.Name)
+func (r *runtime) KillPod(pod *api.Pod, runningPod kubecontainer.Pod) error {
+glog.V(4).Infof("Rkt is killing pod: name %q.", runningPod.Name)

// TODO(yifan): More graceful stop. Replace with StopUnit and wait for a timeout.
-r.systemd.KillUnit(makePodServiceFileName(pod.ID), int32(syscall.SIGKILL))
+r.systemd.KillUnit(makePodServiceFileName(runningPod.ID), int32(syscall.SIGKILL))
return r.systemd.Reload()
}

@@ -966,7 +966,7 @@ func (r *runtime) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus
if restartPod {
// TODO(yifan): Handle network plugin.
-if err := r.KillPod(runningPod); err != nil {
+if err := r.KillPod(pod, runningPod); err != nil {
return err
}
if err := r.RunPod(pod, pullSecrets); err != nil {

View File

@@ -24,6 +24,7 @@ import (
"github.com/golang/glog"

"k8s.io/kubernetes/pkg/api"
+"k8s.io/kubernetes/pkg/api/errors"
client "k8s.io/kubernetes/pkg/client/unversioned"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
kubeletTypes "k8s.io/kubernetes/pkg/kubelet/types"

@@ -122,7 +123,7 @@ func (s *statusManager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
// Currently this routine is not called for the same pod from multiple
// workers and/or the kubelet but dropping the lock before sending the
// status down the channel feels like an easy way to get a bullet in foot.
-if !found || !isStatusEqual(&oldStatus, &status) {
+if !found || !isStatusEqual(&oldStatus, &status) || pod.DeletionTimestamp != nil {
s.podStatuses[podFullName] = status
s.podStatusChannel <- podStatusSyncRequest{pod, status}
} else {

@@ -130,6 +131,29 @@ func (s *statusManager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
}
}

+// TerminatePods resets the container status for the provided pods to terminated and triggers
+// a status update. This function may not enqueue all the provided pods, in which case it will
+// return false
+func (s *statusManager) TerminatePods(pods []*api.Pod) bool {
+sent := true
+s.podStatusesLock.Lock()
+defer s.podStatusesLock.Unlock()
+for _, pod := range pods {
+for i := range pod.Status.ContainerStatuses {
+pod.Status.ContainerStatuses[i].State = api.ContainerState{
+Terminated: &api.ContainerStateTerminated{},
+}
+}
+select {
+case s.podStatusChannel <- podStatusSyncRequest{pod, pod.Status}:
+default:
+sent = false
+glog.V(4).Infof("Termination notice for %q was dropped because the status channel is full", kubeletUtil.FormatPodName(pod))
+}
+}
+return sent
+}
+
func (s *statusManager) DeletePodStatus(podFullName string) {
s.podStatusesLock.Lock()
defer s.podStatusesLock.Unlock()

@@ -161,13 +185,33 @@ func (s *statusManager) syncBatch() error {
}
// TODO: make me easier to express from client code
statusPod, err = s.kubeClient.Pods(statusPod.Namespace).Get(statusPod.Name)
+if errors.IsNotFound(err) {
+glog.V(3).Infof("Pod %q was deleted on the server", pod.Name)
+return nil
+}
if err == nil {
+if len(pod.UID) > 0 && statusPod.UID != pod.UID {
+glog.V(3).Infof("Pod %q was deleted and then recreated, skipping status update", kubeletUtil.FormatPodName(pod))
+return nil
+}
statusPod.Status = status
-_, err = s.kubeClient.Pods(pod.Namespace).UpdateStatus(statusPod)
// TODO: handle conflict as a retry, make that easier too.
+statusPod, err = s.kubeClient.Pods(pod.Namespace).UpdateStatus(statusPod)
if err == nil {
glog.V(3).Infof("Status for pod %q updated successfully", kubeletUtil.FormatPodName(pod))
-return nil
+
+if pod.DeletionTimestamp == nil {
+return nil
+}
+if !notRunning(pod.Status.ContainerStatuses) {
+glog.V(3).Infof("Pod %q is terminated, but some pods are still running", pod.Name)
+return nil
+}
+if err := s.kubeClient.Pods(statusPod.Namespace).Delete(statusPod.Name, api.NewDeleteOptions(0)); err == nil {
+glog.V(3).Infof("Pod %q fully terminated and removed from etcd", statusPod.Name)
+s.DeletePodStatus(podFullName)
+return nil
+}
}
}

@@ -181,3 +225,14 @@ func (s *statusManager) syncBatch() error {
go s.DeletePodStatus(podFullName)
return fmt.Errorf("error updating status for pod %q: %v", kubeletUtil.FormatPodName(pod), err)
}
+
+// notRunning returns true if every status is terminated or waiting, or the status list
+// is empty.
+func notRunning(statuses []api.ContainerStatus) bool {
+for _, status := range statuses {
+if status.State.Terminated == nil && status.State.Waiting == nil {
+return false
+}
+}
+return true
+}

View File

@@ -153,8 +153,21 @@ func TestUnchangedStatus(t *testing.T) {
verifyUpdates(t, syncer, 1)
}

+func TestSyncBatchIgnoresNotFound(t *testing.T) {
+syncer := newTestStatusManager()
+syncer.SetPodStatus(testPod, getRandomPodStatus())
+err := syncer.syncBatch()
+if err != nil {
+t.Errorf("unexpected syncing error: %v", err)
+}
+
+verifyActions(t, syncer.kubeClient, []testclient.Action{
+testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
+})
+}
+
func TestSyncBatch(t *testing.T) {
syncer := newTestStatusManager()
+syncer.kubeClient = testclient.NewSimpleFake(testPod)
syncer.SetPodStatus(testPod, getRandomPodStatus())
err := syncer.syncBatch()
if err != nil {

@@ -167,6 +180,22 @@ func TestSyncBatch(t *testing.T) {
)
}

+func TestSyncBatchChecksMismatchedUID(t *testing.T) {
+syncer := newTestStatusManager()
+testPod.UID = "first"
+differentPod := *testPod
+differentPod.UID = "second"
+syncer.kubeClient = testclient.NewSimpleFake(testPod)
+syncer.SetPodStatus(&differentPod, getRandomPodStatus())
+err := syncer.syncBatch()
+if err != nil {
+t.Errorf("unexpected syncing error: %v", err)
+}
+
+verifyActions(t, syncer.kubeClient, []testclient.Action{
+testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
+})
+}
+
// shuffle returns a new shuffled list of container statuses.
func shuffle(statuses []api.ContainerStatus) []api.ContainerStatus {
numStatuses := len(statuses)