Merge pull request #12959 from smarterclayton/handle_pods_in_kubelet

Support graceful termination in the Kubelet (5/7)
Zach Loafman 2015-08-21 16:13:21 -07:00
commit 73d105e22c
10 changed files with 359 additions and 112 deletions

View File

@@ -217,9 +217,8 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
    for _, ref := range filtered {
        name := kubecontainer.GetPodFullName(ref)
        if existing, found := pods[name]; found {
-           if !reflect.DeepEqual(existing.Spec, ref.Spec) {
+           if checkAndUpdatePod(existing, ref) {
                // this is an update
-               existing.Spec = ref.Spec
                updates.Pods = append(updates.Pods, existing)
                continue
            }
@@ -261,9 +260,8 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
        name := kubecontainer.GetPodFullName(ref)
        if existing, found := oldPods[name]; found {
            pods[name] = existing
-           if !reflect.DeepEqual(existing.Spec, ref.Spec) {
+           if checkAndUpdatePod(existing, ref) {
                // this is an update
-               existing.Spec = ref.Spec
                updates.Pods = append(updates.Pods, existing)
                continue
            }
@@ -335,6 +333,23 @@ func filterInvalidPods(pods []*api.Pod, source string, recorder record.EventReco
    return
}

+// checkAndUpdatePod updates existing if ref makes a meaningful change and returns true, or
+// returns false if there was no update.
+func checkAndUpdatePod(existing, ref *api.Pod) bool {
+   // TODO: it would be better to update the whole object and only preserve certain things
+   // like the source annotation or the UID (to ensure safety)
+   if reflect.DeepEqual(existing.Spec, ref.Spec) &&
+       reflect.DeepEqual(existing.DeletionTimestamp, ref.DeletionTimestamp) &&
+       reflect.DeepEqual(existing.DeletionGracePeriodSeconds, ref.DeletionGracePeriodSeconds) {
+       return false
+   }
+   // this is an update
+   existing.Spec = ref.Spec
+   existing.DeletionTimestamp = ref.DeletionTimestamp
+   existing.DeletionGracePeriodSeconds = ref.DeletionGracePeriodSeconds
+   return true
+}
+
// Sync sends a copy of the current state through the update channel.
func (s *podStorage) Sync() {
    s.updateLock.Lock()
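Why this hunk matters: previously the config merge only compared Spec, so a pod update whose only change was a newly set DeletionTimestamp or DeletionGracePeriodSeconds was silently dropped and graceful deletion never reached the pod workers. The sketch below illustrates that behavior with stand-in types (not the kubelet's api.Pod); it is a minimal, self-contained example, not code from the commit.

package main

import (
	"fmt"
	"reflect"
	"time"
)

// Stand-in for api.Pod with only the fields the merge step now compares.
type pod struct {
	Spec                       string
	DeletionTimestamp          *time.Time
	DeletionGracePeriodSeconds *int64
}

// checkAndUpdatePod mirrors the logic added in this commit: copy the fields that
// matter and report whether anything meaningful changed.
func checkAndUpdatePod(existing, ref *pod) bool {
	if reflect.DeepEqual(existing.Spec, ref.Spec) &&
		reflect.DeepEqual(existing.DeletionTimestamp, ref.DeletionTimestamp) &&
		reflect.DeepEqual(existing.DeletionGracePeriodSeconds, ref.DeletionGracePeriodSeconds) {
		return false
	}
	existing.Spec = ref.Spec
	existing.DeletionTimestamp = ref.DeletionTimestamp
	existing.DeletionGracePeriodSeconds = ref.DeletionGracePeriodSeconds
	return true
}

func main() {
	now := time.Now()
	grace := int64(30)
	existing := &pod{Spec: "nginx"}
	ref := &pod{Spec: "nginx", DeletionTimestamp: &now, DeletionGracePeriodSeconds: &grace}

	// The Spec is unchanged, so a DeepEqual(Spec, Spec) check alone would have
	// dropped this update; the field-aware check reports it as an update.
	fmt.Println(checkAndUpdatePod(existing, ref)) // true
}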

View File

@@ -163,13 +163,13 @@ func (f *FakeRuntime) SyncPod(pod *api.Pod, _ Pod, _ api.PodStatus, _ []api.Secr
    return f.Err
}

-func (f *FakeRuntime) KillPod(pod Pod) error {
+func (f *FakeRuntime) KillPod(pod *api.Pod, runningPod Pod) error {
    f.Lock()
    defer f.Unlock()
    f.CalledFunctions = append(f.CalledFunctions, "KillPod")
-   f.KilledPods = append(f.KilledPods, string(pod.ID))
-   for _, c := range pod.Containers {
+   f.KilledPods = append(f.KilledPods, string(runningPod.ID))
+   for _, c := range runningPod.Containers {
        f.KilledContainers = append(f.KilledContainers, c.Name)
    }
    return f.Err

View File

@@ -54,8 +54,8 @@ type Runtime interface {
    GetPods(all bool) ([]*Pod, error)
    // Syncs the running pod into the desired pod.
    SyncPod(pod *api.Pod, runningPod Pod, podStatus api.PodStatus, pullSecrets []api.Secret) error
-   // KillPod kills all the containers of a pod.
-   KillPod(pod Pod) error
+   // KillPod kills all the containers of a pod. Pod may be nil, running pod must not be.
+   KillPod(pod *api.Pod, runningPod Pod) error
    // GetPodStatus retrieves the status of the pod, including the information of
    // all containers in the pod. Clients of this interface assume the containers
    // statuses in a pod always have a deterministic ordering (eg: sorted by name).

View File

@@ -56,12 +56,21 @@ import (
const (
    maxReasonCacheEntries = 200

-   kubernetesPodLabel       = "io.kubernetes.pod.data"
-   kubernetesContainerLabel = "io.kubernetes.container.name"
-
    // ndots specifies the minimum number of dots that a domain name must contain for the resolver to consider it as FQDN (fully-qualified)
    // we want to able to consider SRV lookup names like _dns._udp.kube-dns.default.svc to be considered relative.
    // hence, setting ndots to be 5.
    ndotsDNSOption = "options ndots:5\n"
+
+   // In order to avoid unnecessary SIGKILLs, give every container a minimum grace
+   // period after SIGTERM. Docker will guarantee the termination, but SIGTERM is
+   // potentially dangerous.
+   // TODO: evaluate whether there are scenarios in which SIGKILL is preferable to
+   // SIGTERM for certain process types, which may justify setting this to 0.
+   minimumGracePeriodInSeconds = 2
+
+   kubernetesNameLabel                   = "io.kubernetes.pod.name"
+   kubernetesPodLabel                    = "io.kubernetes.pod.data"
+   kubernetesTerminationGracePeriodLabel = "io.kubernetes.pod.terminationGracePeriod"
+   kubernetesContainerLabel              = "io.kubernetes.container.name"
)

// DockerManager implements the Runtime interface.
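The new io.kubernetes.pod.terminationGracePeriod label stores the grace period as a decimal string on the Docker container, so the kubelet can still honor it after a restart even when no pod object is available. A minimal, self-contained sketch of that string round trip (only the label key comes from the diff; everything else is illustrative):

package main

import (
	"fmt"
	"strconv"
)

// Label key introduced in this commit; the surrounding code is a sketch, not the kubelet's.
const kubernetesTerminationGracePeriodLabel = "io.kubernetes.pod.terminationGracePeriod"

func main() {
	labels := map[string]string{}

	// At container creation: persist the pod's termination grace period (e.g. 30s).
	grace := int64(30)
	labels[kubernetesTerminationGracePeriodLabel] = strconv.FormatInt(grace, 10)

	// After a kubelet restart: recover the grace period from the label alone, which is
	// the fallback containerAndPodFromLabels uses when the full pod data label is
	// missing or cannot be decoded.
	if period, ok := labels[kubernetesTerminationGracePeriodLabel]; ok {
		if seconds, err := strconv.ParseInt(period, 10, 64); err == nil {
			fmt.Println("recovered grace period:", seconds) // 30
		}
	}
}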
@@ -589,12 +598,19 @@ func (dm *DockerManager) runContainer(
    if len(containerHostname) > hostnameMaxLen {
        containerHostname = containerHostname[:hostnameMaxLen]
    }
+   // Pod information is recorded on the container as labels to preserve it in the event the pod is deleted
+   // while the Kubelet is down and there is no information available to recover the pod. This includes
+   // termination information like the termination grace period and the pre stop hooks.
+   // TODO: keep these labels up to date if the pod changes
    namespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
    labels := map[string]string{
-       "io.kubernetes.pod.name": namespacedName.String(),
+       kubernetesNameLabel: namespacedName.String(),
+   }
+   if pod.Spec.TerminationGracePeriodSeconds != nil {
+       labels[kubernetesTerminationGracePeriodLabel] = strconv.FormatInt(*pod.Spec.TerminationGracePeriodSeconds, 10)
    }
    if container.Lifecycle != nil && container.Lifecycle.PreStop != nil {
+       glog.V(1).Infof("Setting preStop hook")
        // TODO: This is kind of hacky, we should really just encode the bits we need.
        data, err := latest.Codec.Encode(pod)
        if err != nil {
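The full pod object is also serialized into the io.kubernetes.pod.data label when a preStop hook is present, so termination policy and the hook survive a kubelet restart. The commit uses latest.Codec for this; the sketch below shows only the shape of the round trip using encoding/json and stand-in types, and is illustrative rather than the DockerManager code.

package main

import (
	"encoding/json"
	"fmt"
)

// Stand-ins for api.Pod / api.Container; the real code encodes the API object with
// latest.Codec rather than encoding/json.
type container struct {
	Name    string `json:"name"`
	PreStop string `json:"preStop,omitempty"`
}

type pod struct {
	Name                          string      `json:"name"`
	TerminationGracePeriodSeconds *int64      `json:"terminationGracePeriodSeconds,omitempty"`
	Containers                    []container `json:"containers"`
}

const kubernetesPodLabel = "io.kubernetes.pod.data"

func main() {
	grace := int64(30)
	p := pod{
		Name:                          "web",
		TerminationGracePeriodSeconds: &grace,
		Containers:                    []container{{Name: "nginx", PreStop: "nginx -s quit"}},
	}

	// At container creation: serialize the pod and attach it as a Docker label.
	data, _ := json.Marshal(p)
	labels := map[string]string{kubernetesPodLabel: string(data)}

	// At kill time (possibly after a kubelet restart): recover the pod and look up the
	// container by name, which is what the kill path does with the stored labels.
	var recovered pod
	if err := json.Unmarshal([]byte(labels[kubernetesPodLabel]), &recovered); err == nil {
		fmt.Println(recovered.Name, *recovered.TerminationGracePeriodSeconds, recovered.Containers[0].PreStop)
	}
}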
@@ -1106,40 +1122,56 @@ func (dm *DockerManager) PortForward(pod *kubecontainer.Pod, port uint16, stream
}

// Kills all containers in the specified pod
-func (dm *DockerManager) KillPod(pod kubecontainer.Pod) error {
+func (dm *DockerManager) KillPod(pod *api.Pod, runningPod kubecontainer.Pod) error {
    // Send the kills in parallel since they may take a long time. Len + 1 since there
    // can be Len errors + the networkPlugin teardown error.
-   errs := make(chan error, len(pod.Containers)+1)
+   errs := make(chan error, len(runningPod.Containers)+1)
    wg := sync.WaitGroup{}
-   var networkID types.UID
-   for _, container := range pod.Containers {
+   var (
+       networkContainer *kubecontainer.Container
+       networkSpec      *api.Container
+   )
+   for _, container := range runningPod.Containers {
        wg.Add(1)
        go func(container *kubecontainer.Container) {
            defer util.HandleCrash()
            defer wg.Done()
+
+           var containerSpec *api.Container
+           if pod != nil {
+               for i, c := range pod.Spec.Containers {
+                   if c.Name == container.Name {
+                       containerSpec = &pod.Spec.Containers[i]
+                       break
+                   }
+               }
+           }
+
            // TODO: Handle this without signaling the pod infra container to
            // adapt to the generic container runtime.
            if container.Name == PodInfraContainerName {
                // Store the container runtime for later deletion.
                // We do this so that PreStop handlers can run in the network namespace.
-               networkID = container.ID
+               networkContainer = container
+               networkSpec = containerSpec
                return
            }
-           if err := dm.killContainer(container.ID); err != nil {
-               glog.Errorf("Failed to delete container: %v; Skipping pod %q", err, pod.ID)
+
+           err := dm.KillContainerInPod(container.ID, containerSpec, pod)
+           if err != nil {
+               glog.Errorf("Failed to delete container: %v; Skipping pod %q", err, runningPod.ID)
                errs <- err
            }
        }(container)
    }
    wg.Wait()
-   if len(networkID) > 0 {
-       if err := dm.networkPlugin.TearDownPod(pod.Namespace, pod.Name, kubeletTypes.DockerID(networkID)); err != nil {
+   if networkContainer != nil {
+       if err := dm.networkPlugin.TearDownPod(runningPod.Namespace, runningPod.Name, kubeletTypes.DockerID(networkContainer.ID)); err != nil {
            glog.Errorf("Failed tearing down the infra container: %v", err)
            errs <- err
        }
-       if err := dm.killContainer(networkID); err != nil {
-           glog.Errorf("Failed to delete container: %v; Skipping pod %q", err, pod.ID)
+       if err := dm.KillContainerInPod(networkContainer.ID, networkSpec, pod); err != nil {
+           glog.Errorf("Failed to delete container: %v; Skipping pod %q", err, runningPod.ID)
            errs <- err
        }
    }
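KillPod stops the regular containers in parallel and deliberately leaves the pod infra (network) container for last so that preStop hooks can still run inside the pod's network namespace. The following is a minimal, self-contained sketch of that fan-out pattern using stand-in names (infraName, killPod, kill), not the DockerManager API:

package main

import (
	"fmt"
	"sync"
)

const infraName = "POD" // stand-in for PodInfraContainerName

// killPod stops every container except the infra container in parallel, then stops
// the infra container last, collecting errors on a buffered channel.
func killPod(containers []string, kill func(string) error) []error {
	errs := make(chan error, len(containers)+1) // +1 for a possible network teardown error
	wg := sync.WaitGroup{}
	var infra string
	for _, name := range containers {
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			if name == infraName {
				infra = name // keep the infra container for last
				return
			}
			if err := kill(name); err != nil {
				errs <- err
			}
		}(name)
	}
	wg.Wait()
	if infra != "" {
		// Network teardown would happen here, then the infra container is stopped
		// only after every other container (and its preStop hook) has finished.
		if err := kill(infra); err != nil {
			errs <- err
		}
	}
	close(errs)
	var out []error
	for err := range errs {
		out = append(out, err)
	}
	return out
}

func main() {
	errs := killPod([]string{"POD", "web", "sidecar"}, func(name string) error {
		fmt.Println("stopping", name)
		return nil
	})
	fmt.Println("errors:", len(errs))
}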
@@ -1154,75 +1186,152 @@ func (dm *DockerManager) KillPod(pod kubecontainer.Pod) error {
    return nil
}

-// KillContainerInPod kills a container in the pod.
-func (dm *DockerManager) KillContainerInPod(container api.Container, pod *api.Pod) error {
-   // Locate the container.
-   pods, err := dm.GetPods(false)
-   if err != nil {
-       return err
-   }
-   targetPod := kubecontainer.Pods(pods).FindPod(kubecontainer.GetPodFullName(pod), pod.UID)
-   targetContainer := targetPod.FindContainerByName(container.Name)
-   if targetContainer == nil {
-       return fmt.Errorf("unable to find container %q in pod %q", container.Name, targetPod.Name)
-   }
-   return dm.killContainer(targetContainer.ID)
-}
-
-// TODO(vmarmol): Unexport this as it is no longer used externally.
-// KillContainer kills a container identified by containerID.
-// Internally, it invokes docker's StopContainer API with a timeout of 10s.
-// TODO: Deprecate this function in favor of KillContainerInPod.
-func (dm *DockerManager) KillContainer(containerID types.UID) error {
-   return dm.killContainer(containerID)
-}
-
-func (dm *DockerManager) killContainer(containerID types.UID) error {
-   ID := string(containerID)
-   glog.V(2).Infof("Killing container with id %q", ID)
-   inspect, err := dm.client.InspectContainer(ID)
-   if err != nil {
-       return err
-   }
-   var found bool
-   var preStop string
-   if inspect != nil && inspect.Config != nil && inspect.Config.Labels != nil {
-       preStop, found = inspect.Config.Labels[kubernetesPodLabel]
-   }
-   if found {
-       var pod api.Pod
-       err := latest.Codec.DecodeInto([]byte(preStop), &pod)
-       if err != nil {
-           glog.Errorf("Failed to decode prestop: %s, %s", preStop, ID)
-       } else {
-           name := inspect.Config.Labels[kubernetesContainerLabel]
-           var container *api.Container
-           for ix := range pod.Spec.Containers {
-               if pod.Spec.Containers[ix].Name == name {
-                   container = &pod.Spec.Containers[ix]
-                   break
-               }
-           }
-           if container != nil {
-               glog.V(1).Infof("Running preStop hook")
-               if err := dm.runner.Run(ID, &pod, container, container.Lifecycle.PreStop); err != nil {
-                   glog.Errorf("failed to run preStop hook: %v", err)
-               }
-           } else {
-               glog.Errorf("unable to find container %v, %s", pod, name)
-           }
-       }
-   }
-   dm.readinessManager.RemoveReadiness(ID)
-   err = dm.client.StopContainer(ID, 10)
-   ref, ok := dm.containerRefManager.GetRef(ID)
-   if !ok {
-       glog.Warningf("No ref for pod '%v'", ID)
-   } else {
-       // TODO: pass reason down here, and state, or move this call up the stack.
-       dm.recorder.Eventf(ref, "Killing", "Killing with docker id %v", util.ShortenString(ID, 12))
-   }
-   return err
-}
+// KillContainerInPod kills a container in the pod. It must be passed either a container ID or a container and pod,
+// and will attempt to lookup the other information if missing.
+func (dm *DockerManager) KillContainerInPod(containerID types.UID, container *api.Container, pod *api.Pod) error {
+   switch {
+   case len(containerID) == 0:
+       // Locate the container.
+       pods, err := dm.GetPods(false)
+       if err != nil {
+           return err
+       }
+       targetPod := kubecontainer.Pods(pods).FindPod(kubecontainer.GetPodFullName(pod), pod.UID)
+       targetContainer := targetPod.FindContainerByName(container.Name)
+       if targetContainer == nil {
+           return fmt.Errorf("unable to find container %q in pod %q", container.Name, targetPod.Name)
+       }
+       containerID = targetContainer.ID
+   case container == nil || pod == nil:
+       // Read information about the container from labels
+       inspect, err := dm.client.InspectContainer(string(containerID))
+       if err != nil {
+           return err
+       }
+       storedPod, storedContainer, cerr := containerAndPodFromLabels(inspect)
+       if cerr != nil {
+           glog.Errorf("unable to access pod data from container: %v", err)
+       }
+       if container == nil {
+           container = storedContainer
+       }
+       if pod == nil {
+           pod = storedPod
+       }
+   }
+   return dm.killContainer(containerID, container, pod)
+}
+
+// killContainer accepts a containerID and an optional container or pod containing shutdown policies. Invoke
+// KillContainerInPod if information must be retrieved first.
+func (dm *DockerManager) killContainer(containerID types.UID, container *api.Container, pod *api.Pod) error {
+   ID := string(containerID)
+   name := ID
+   if container != nil {
+       name = fmt.Sprintf("%s %s", name, container.Name)
+   }
+   if pod != nil {
+       name = fmt.Sprintf("%s %s/%s", name, pod.Namespace, pod.Name)
+   }
+
+   gracePeriod := int64(minimumGracePeriodInSeconds)
+   if pod != nil {
+       switch {
+       case pod.DeletionGracePeriodSeconds != nil:
+           gracePeriod = *pod.DeletionGracePeriodSeconds
+       case pod.Spec.TerminationGracePeriodSeconds != nil:
+           gracePeriod = *pod.Spec.TerminationGracePeriodSeconds
+       }
+   }
+   glog.V(2).Infof("Killing container %q with %d second grace period", name, gracePeriod)
+   start := util.Now()
+
+   if pod != nil && container != nil && container.Lifecycle != nil && container.Lifecycle.PreStop != nil {
+       glog.V(4).Infof("Running preStop hook for container %q", name)
+       done := make(chan struct{})
+       go func() {
+           defer close(done)
+           defer util.HandleCrash()
+           if err := dm.runner.Run(ID, pod, container, container.Lifecycle.PreStop); err != nil {
+               glog.Errorf("preStop hook for container %q failed: %v", name, err)
+           }
+       }()
+       select {
+       case <-time.After(time.Duration(gracePeriod) * time.Second):
+           glog.V(2).Infof("preStop hook for container %q did not complete in %d seconds", name, gracePeriod)
+       case <-done:
+           glog.V(4).Infof("preStop hook for container %q completed", name)
+       }
+       gracePeriod -= int64(util.Now().Sub(start.Time).Seconds())
+   }
+
+   dm.readinessManager.RemoveReadiness(ID)
+
+   // always give containers a minimal shutdown window to avoid unnecessary SIGKILLs
+   if gracePeriod < minimumGracePeriodInSeconds {
+       gracePeriod = minimumGracePeriodInSeconds
+   }
+   err := dm.client.StopContainer(ID, uint(gracePeriod))
+   if _, ok := err.(*docker.ContainerNotRunning); ok && err != nil {
+       glog.V(4).Infof("Container %q has already exited", name)
+       return nil
+   }
+   if err == nil {
+       glog.V(2).Infof("Container %q exited after %s", name, util.Now().Sub(start.Time))
+   } else {
+       glog.V(2).Infof("Container %q termination failed after %s: %v", name, util.Now().Sub(start.Time), err)
+   }
+   ref, ok := dm.containerRefManager.GetRef(ID)
+   if !ok {
+       glog.Warningf("No ref for pod '%q'", name)
+   } else {
+       // TODO: pass reason down here, and state, or move this call up the stack.
+       dm.recorder.Eventf(ref, "Killing", "Killing with docker id %v", util.ShortenString(ID, 12))
+   }
+   return err
+}
+
+var errNoPodOnContainer = fmt.Errorf("no pod information labels on Docker container")
+
+// containerAndPodFromLabels tries to load the appropriate container info off of a Docker container's labels
+func containerAndPodFromLabels(inspect *docker.Container) (pod *api.Pod, container *api.Container, err error) {
+   if inspect == nil && inspect.Config == nil && inspect.Config.Labels == nil {
+       return nil, nil, errNoPodOnContainer
+   }
+   labels := inspect.Config.Labels
+
+   // the pod data may not be set
+   if body, found := labels[kubernetesPodLabel]; found {
+       pod = &api.Pod{}
+       if err = latest.Codec.DecodeInto([]byte(body), pod); err == nil {
+           name := labels[kubernetesContainerLabel]
+           for ix := range pod.Spec.Containers {
+               if pod.Spec.Containers[ix].Name == name {
+                   container = &pod.Spec.Containers[ix]
+                   break
+               }
+           }
+           if container == nil {
+               err = fmt.Errorf("unable to find container %s in pod %v", name, pod)
+           }
+       } else {
+           pod = nil
+       }
+   }
+
+   // attempt to find the default grace period if we didn't commit a pod, but set the generic metadata
+   // field (the one used by kill)
+   if pod == nil {
+       if period, ok := labels[kubernetesTerminationGracePeriodLabel]; ok {
+           if seconds, err := strconv.ParseInt(period, 10, 64); err == nil {
+               pod = &api.Pod{}
+               pod.DeletionGracePeriodSeconds = &seconds
+           }
+       }
+   }
+
+   return
+}
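The core of the new killContainer is budgeting a single grace period across two phases: the preStop hook runs with the grace period as its deadline, the time it consumed is subtracted, and whatever remains (clamped to the 2-second minimum) is passed to Docker's stop as the SIGTERM-to-SIGKILL window. Below is a minimal, self-contained sketch of that budgeting with stand-in hook and stop functions, not the DockerManager API:

package main

import (
	"fmt"
	"time"
)

const minimumGracePeriodInSeconds = 2 // mirrors the constant added in this commit

// killWithBudget runs preStop under a deadline, then spends the remaining grace
// period on the stop call (the step that would send SIGTERM, then SIGKILL).
func killWithBudget(gracePeriod int64, preStop func(), stop func(timeoutSeconds uint)) {
	start := time.Now()

	done := make(chan struct{})
	go func() {
		defer close(done)
		preStop()
	}()
	select {
	case <-time.After(time.Duration(gracePeriod) * time.Second):
		fmt.Println("preStop hook did not complete in", gracePeriod, "seconds")
	case <-done:
		fmt.Println("preStop hook completed")
	}
	gracePeriod -= int64(time.Since(start).Seconds())

	// Always give the container a minimal shutdown window to avoid an immediate SIGKILL.
	if gracePeriod < minimumGracePeriodInSeconds {
		gracePeriod = minimumGracePeriodInSeconds
	}
	stop(uint(gracePeriod))
}

func main() {
	killWithBudget(5,
		func() { time.Sleep(1 * time.Second) }, // hook uses roughly 1s of the 5s budget
		func(timeout uint) { fmt.Println("stopping container with", timeout, "second timeout") },
	)
}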
// Run a single container from a pod. Returns the docker container ID // Run a single container from a pod. Returns the docker container ID
@@ -1259,7 +1368,7 @@ func (dm *DockerManager) runContainerInPod(pod *api.Pod, container *api.Containe
    if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
        handlerErr := dm.runner.Run(id, pod, container, container.Lifecycle.PostStart)
        if handlerErr != nil {
-           dm.killContainer(types.UID(id))
+           dm.KillContainerInPod(types.UID(id), container, pod)
            return kubeletTypes.DockerID(""), fmt.Errorf("failed to call event handler: %v", handlerErr)
        }
    }
@@ -1540,10 +1649,10 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod
    podFullName := kubecontainer.GetPodFullName(pod)
    containerChanges, err := dm.computePodContainerChanges(pod, runningPod, podStatus)
-   glog.V(3).Infof("Got container changes for pod %q: %+v", podFullName, containerChanges)
    if err != nil {
        return err
    }
+   glog.V(3).Infof("Got container changes for pod %q: %+v", podFullName, containerChanges)

    if containerChanges.StartInfraContainer || (len(containerChanges.ContainersToKeep) == 0 && len(containerChanges.ContainersToStart) == 0) {
        if len(containerChanges.ContainersToKeep) == 0 && len(containerChanges.ContainersToStart) == 0 {
@@ -1553,7 +1662,7 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod
        }

        // Killing phase: if we want to start new infra container, or nothing is running kill everything (including infra container)
-       err = dm.KillPod(runningPod)
+       err = dm.KillPod(pod, runningPod)
        if err != nil {
            return err
        }
@@ -1563,7 +1672,15 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod
            _, keep := containerChanges.ContainersToKeep[kubeletTypes.DockerID(container.ID)]
            if !keep {
                glog.V(3).Infof("Killing unwanted container %+v", container)
-               err = dm.KillContainer(container.ID)
+               // attempt to find the appropriate container policy
+               var podContainer *api.Container
+               for i, c := range pod.Spec.Containers {
+                   if c.Name == container.Name {
+                       podContainer = &pod.Spec.Containers[i]
+                       break
+                   }
+               }
+               err = dm.KillContainerInPod(container.ID, podContainer, pod)
                if err != nil {
                    glog.Errorf("Error killing container: %v", err)
                }

View File

@@ -405,7 +405,7 @@ func TestKillContainerInPod(t *testing.T) {
        manager.readinessManager.SetReadiness(c.ID, true)
    }

-   if err := manager.KillContainerInPod(pod.Spec.Containers[0], pod); err != nil {
+   if err := manager.KillContainerInPod("", &pod.Spec.Containers[0], pod); err != nil {
        t.Errorf("unexpected error: %v", err)
    }
    // Assert the container has been stopped.
@@ -478,14 +478,14 @@ func TestKillContainerInPodWithPreStop(t *testing.T) {
        manager.readinessManager.SetReadiness(c.ID, true)
    }

-   if err := manager.KillContainerInPod(pod.Spec.Containers[0], pod); err != nil {
+   if err := manager.KillContainerInPod("", &pod.Spec.Containers[0], pod); err != nil {
        t.Errorf("unexpected error: %v", err)
    }
    // Assert the container has been stopped.
    if err := fakeDocker.AssertStopped([]string{containerToKill.ID}); err != nil {
        t.Errorf("container was not stopped correctly: %v", err)
    }
-   verifyCalls(t, fakeDocker, []string{"list", "inspect_container", "create_exec", "start_exec", "stop"})
+   verifyCalls(t, fakeDocker, []string{"list", "create_exec", "start_exec", "stop"})
    if !reflect.DeepEqual(expectedCmd, fakeDocker.execCmd) {
        t.Errorf("expected: %v, got %v", expectedCmd, fakeDocker.execCmd)
    }
@@ -522,7 +522,7 @@ func TestKillContainerInPodWithError(t *testing.T) {
        manager.readinessManager.SetReadiness(c.ID, true)
    }

-   if err := manager.KillContainerInPod(pod.Spec.Containers[0], pod); err == nil {
+   if err := manager.KillContainerInPod("", &pod.Spec.Containers[0], pod); err == nil {
        t.Errorf("expected error, found nil")
    }
@@ -1021,7 +1021,7 @@ func TestSyncPodDeletesWithNoPodInfraContainer(t *testing.T) {
    verifyCalls(t, fakeDocker, []string{
        // Kill the container since pod infra container is not running.
-       "inspect_container", "stop",
+       "stop",
        // Create pod infra container.
        "create", "start", "inspect_container",
        // Create container.
@@ -1096,7 +1096,7 @@ func TestSyncPodDeletesDuplicate(t *testing.T) {
        // Check the pod infra container.
        "inspect_container",
        // Kill the duplicated container.
-       "inspect_container", "stop",
+       "stop",
    })
    // Expect one of the duplicates to be killed.
    if len(fakeDocker.Stopped) != 1 || (fakeDocker.Stopped[0] != "1234" && fakeDocker.Stopped[0] != "4567") {
@@ -1150,7 +1150,7 @@ func TestSyncPodBadHash(t *testing.T) {
        // Check the pod infra container.
        "inspect_container",
        // Kill and restart the bad hash container.
-       "inspect_container", "stop", "create", "start", "inspect_container",
+       "stop", "create", "start", "inspect_container",
    })

    if err := fakeDocker.AssertStopped([]string{"1234"}); err != nil {
@@ -1208,7 +1208,7 @@ func TestSyncPodsUnhealthy(t *testing.T) {
        // Check the pod infra container.
        "inspect_container",
        // Kill the unhealthy container.
-       "inspect_container", "stop",
+       "stop",
        // Restart the unhealthy container.
        "create", "start", "inspect_container",
    })
@@ -1441,9 +1441,9 @@ func TestSyncPodWithRestartPolicy(t *testing.T) {
            api.RestartPolicyNever,
            []string{
                // Check the pod infra container.
-               "inspect_container", "inspect_container",
+               "inspect_container",
                // Stop the last pod infra container.
-               "inspect_container", "stop",
+               "stop",
            },
            []string{},
            []string{"9876"},
@@ -1910,7 +1910,7 @@ func TestSyncPodEventHandlerFails(t *testing.T) {
        // Create the container.
        "create", "start",
        // Kill the container since event handler fails.
-       "inspect_container", "stop",
+       "stop",
    })
    // TODO(yifan): Check the stopped container's name.

View File

@@ -1124,8 +1124,8 @@ func parseResolvConf(reader io.Reader) (nameservers []string, searches []string,
}

// Kill all running containers in a pod (includes the pod infra container).
-func (kl *Kubelet) killPod(pod kubecontainer.Pod) error {
-   return kl.containerRuntime.KillPod(pod)
+func (kl *Kubelet) killPod(pod *api.Pod, runningPod kubecontainer.Pod) error {
+   return kl.containerRuntime.KillPod(pod, runningPod)
}

type empty struct{}
@@ -1181,9 +1181,10 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
    }()

    // Kill pods we can't run.
-   err := canRunPod(pod)
-   if err != nil {
-       kl.killPod(runningPod)
+   if err := canRunPod(pod); err != nil || pod.DeletionTimestamp != nil {
+       if err := kl.killPod(pod, runningPod); err != nil {
+           util.HandleError(err)
+       }
        return err
    }
@@ -1370,6 +1371,32 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, runningPods []*kubeco
    return nil
}

+// Delete any pods that are no longer running and are marked for deletion.
+func (kl *Kubelet) cleanupTerminatedPods(pods []*api.Pod, runningPods []*kubecontainer.Pod) error {
+   var terminating []*api.Pod
+   for _, pod := range pods {
+       if pod.DeletionTimestamp != nil {
+           found := false
+           for _, runningPod := range runningPods {
+               if runningPod.ID == pod.UID {
+                   found = true
+                   break
+               }
+           }
+           if found {
+               podFullName := kubecontainer.GetPodFullName(pod)
+               glog.V(5).Infof("Keeping terminated pod %q and uid %q, still running", podFullName, pod.UID)
+               continue
+           }
+           terminating = append(terminating, pod)
+       }
+   }
+   if !kl.statusManager.TerminatePods(terminating) {
+       return errors.New("not all pods were successfully terminated")
+   }
+   return nil
+}
+
// pastActiveDeadline returns true if the pod has been active for more than
// ActiveDeadlineSeconds.
func (kl *Kubelet) pastActiveDeadline(pod *api.Pod) bool {
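cleanupTerminatedPods is essentially a set difference: pods carrying a DeletionTimestamp are only handed to the status manager for final termination once nothing with their UID is still running on the node. A self-contained sketch of that selection with stand-in types (illustrative, not the kubelet's api.Pod or kubecontainer.Pod):

package main

import "fmt"

type apiPod struct {
	UID     string
	Deleted bool // stands in for DeletionTimestamp != nil
}

type runningPod struct {
	ID string
}

// terminatedPods returns the deleted pods that no longer have anything running,
// i.e. the ones the kubelet can report as fully terminated.
func terminatedPods(pods []apiPod, running []runningPod) []apiPod {
	runningSet := map[string]bool{}
	for _, r := range running {
		runningSet[r.ID] = true
	}
	var out []apiPod
	for _, p := range pods {
		if p.Deleted && !runningSet[p.UID] {
			out = append(out, p)
		}
	}
	return out
}

func main() {
	pods := []apiPod{{UID: "a", Deleted: true}, {UID: "b", Deleted: true}, {UID: "c"}}
	running := []runningPod{{ID: "a"}} // "a" still has containers, so it is kept for now
	for _, p := range terminatedPods(pods, running) {
		fmt.Println("terminate:", p.UID) // only "b"
	}
}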
@@ -1532,6 +1559,10 @@ func (kl *Kubelet) cleanupPods(allPods []*api.Pod, admittedPods []*api.Pod) erro
    // Remove any orphaned mirror pods.
    kl.podManager.DeleteOrphanedMirrorPods()

+   if err := kl.cleanupTerminatedPods(allPods, runningPods); err != nil {
+       glog.Errorf("Failed to cleanup terminated pods: %v", err)
+   }
+
    return err
}
@@ -1554,7 +1585,7 @@ func (kl *Kubelet) killUnwantedPods(desiredPods map[types.UID]empty,
            }()
            glog.V(1).Infof("Killing unwanted pod %q", pod.Name)
            // Stop the containers.
-           err = kl.killPod(*pod)
+           err = kl.killPod(nil, *pod)
            if err != nil {
                glog.Errorf("Failed killing the pod %q: %v", pod.Name, err)
                return

View File

@@ -64,7 +64,7 @@ func (mc *basicMirrorClient) DeleteMirrorPod(podFullName string) error {
        return err
    }
    glog.V(4).Infof("Deleting a mirror pod %q", podFullName)
-   if err := mc.apiserverClient.Pods(namespace).Delete(name, nil); err != nil {
+   if err := mc.apiserverClient.Pods(namespace).Delete(name, api.NewDeleteOptions(0)); err != nil {
        glog.Errorf("Failed deleting a mirror pod %q: %v", podFullName, err)
    }
    return nil
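Passing api.NewDeleteOptions(0) instead of nil asks the API server to remove the mirror pod with an explicit zero grace period rather than the default graceful window. The sketch below uses a stand-in DeleteOptions type to illustrate the nil-versus-zero distinction; it is not the Kubernetes client API.

package main

import "fmt"

// Stand-in for api.DeleteOptions: a nil GracePeriodSeconds means "use the default
// graceful window", while an explicit 0 means "delete immediately".
type deleteOptions struct {
	GracePeriodSeconds *int64
}

func newDeleteOptions(grace int64) *deleteOptions {
	return &deleteOptions{GracePeriodSeconds: &grace}
}

func describe(opts *deleteOptions) string {
	if opts == nil || opts.GracePeriodSeconds == nil {
		return "graceful delete with the default grace period"
	}
	return fmt.Sprintf("delete with %d second grace period", *opts.GracePeriodSeconds)
}

func main() {
	fmt.Println(describe(nil))                 // old call: default graceful delete
	fmt.Println(describe(newDeleteOptions(0))) // new call: immediate delete
}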

View File

@@ -693,11 +693,11 @@ func (r *runtime) GetPods(all bool) ([]*kubecontainer.Pod, error) {
}

// KillPod invokes 'systemctl kill' to kill the unit that runs the pod.
-func (r *runtime) KillPod(pod kubecontainer.Pod) error {
-   glog.V(4).Infof("Rkt is killing pod: name %q.", pod.Name)
+func (r *runtime) KillPod(pod *api.Pod, runningPod kubecontainer.Pod) error {
+   glog.V(4).Infof("Rkt is killing pod: name %q.", runningPod.Name)

    // TODO(yifan): More graceful stop. Replace with StopUnit and wait for a timeout.
-   r.systemd.KillUnit(makePodServiceFileName(pod.ID), int32(syscall.SIGKILL))
+   r.systemd.KillUnit(makePodServiceFileName(runningPod.ID), int32(syscall.SIGKILL))
    return r.systemd.Reload()
}
@@ -966,7 +966,7 @@ func (r *runtime) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus
        if restartPod {
            // TODO(yifan): Handle network plugin.
-           if err := r.KillPod(runningPod); err != nil {
+           if err := r.KillPod(pod, runningPod); err != nil {
                return err
            }
            if err := r.RunPod(pod, pullSecrets); err != nil {

View File

@@ -24,6 +24,7 @@ import (
    "github.com/golang/glog"

    "k8s.io/kubernetes/pkg/api"
+   "k8s.io/kubernetes/pkg/api/errors"
    client "k8s.io/kubernetes/pkg/client/unversioned"
    kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
    kubeletTypes "k8s.io/kubernetes/pkg/kubelet/types"
@@ -122,7 +123,7 @@ func (s *statusManager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
    // Currently this routine is not called for the same pod from multiple
    // workers and/or the kubelet but dropping the lock before sending the
    // status down the channel feels like an easy way to get a bullet in foot.
-   if !found || !isStatusEqual(&oldStatus, &status) {
+   if !found || !isStatusEqual(&oldStatus, &status) || pod.DeletionTimestamp != nil {
        s.podStatuses[podFullName] = status
        s.podStatusChannel <- podStatusSyncRequest{pod, status}
    } else {
@@ -130,6 +131,29 @@ func (s *statusManager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
    }
}

+// TerminatePods resets the container status for the provided pods to terminated and triggers
+// a status update. This function may not enqueue all the provided pods, in which case it will
+// return false
+func (s *statusManager) TerminatePods(pods []*api.Pod) bool {
+   sent := true
+   s.podStatusesLock.Lock()
+   defer s.podStatusesLock.Unlock()
+   for _, pod := range pods {
+       for i := range pod.Status.ContainerStatuses {
+           pod.Status.ContainerStatuses[i].State = api.ContainerState{
+               Terminated: &api.ContainerStateTerminated{},
+           }
+       }
+       select {
+       case s.podStatusChannel <- podStatusSyncRequest{pod, pod.Status}:
+       default:
+           sent = false
+           glog.V(4).Infof("Termination notice for %q was dropped because the status channel is full", kubeletUtil.FormatPodName(pod))
+       }
+   }
+   return sent
+}
+
func (s *statusManager) DeletePodStatus(podFullName string) {
    s.podStatusesLock.Lock()
    defer s.podStatusesLock.Unlock()
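TerminatePods uses a select with a default case so enqueueing a termination notice never blocks when the status channel is full; it simply reports false, and the caller (cleanupTerminatedPods) treats the batch as incomplete and retries on the next cleanup pass. A small, self-contained sketch of that non-blocking send pattern (illustrative, not the statusManager types):

package main

import "fmt"

// trySend enqueues msg without blocking; it reports whether the message was accepted.
func trySend(ch chan string, msg string) bool {
	select {
	case ch <- msg:
		return true
	default:
		return false // channel full: drop and let the caller retry later
	}
}

func main() {
	ch := make(chan string, 1)
	fmt.Println(trySend(ch, "pod-a terminated")) // true
	fmt.Println(trySend(ch, "pod-b terminated")) // false, buffer already full
}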
@@ -161,13 +185,33 @@ func (s *statusManager) syncBatch() error {
    }
    // TODO: make me easier to express from client code
    statusPod, err = s.kubeClient.Pods(statusPod.Namespace).Get(statusPod.Name)
+   if errors.IsNotFound(err) {
+       glog.V(3).Infof("Pod %q was deleted on the server", pod.Name)
+       return nil
+   }
    if err == nil {
+       if len(pod.UID) > 0 && statusPod.UID != pod.UID {
+           glog.V(3).Infof("Pod %q was deleted and then recreated, skipping status update", kubeletUtil.FormatPodName(pod))
+           return nil
+       }
        statusPod.Status = status
-       _, err = s.kubeClient.Pods(pod.Namespace).UpdateStatus(statusPod)
        // TODO: handle conflict as a retry, make that easier too.
+       statusPod, err = s.kubeClient.Pods(pod.Namespace).UpdateStatus(statusPod)
        if err == nil {
            glog.V(3).Infof("Status for pod %q updated successfully", kubeletUtil.FormatPodName(pod))
-           return nil
+
+           if pod.DeletionTimestamp == nil {
+               return nil
+           }
+           if !notRunning(pod.Status.ContainerStatuses) {
+               glog.V(3).Infof("Pod %q is terminated, but some pods are still running", pod.Name)
+               return nil
+           }
+           if err := s.kubeClient.Pods(statusPod.Namespace).Delete(statusPod.Name, api.NewDeleteOptions(0)); err == nil {
+               glog.V(3).Infof("Pod %q fully terminated and removed from etcd", statusPod.Name)
+               s.DeletePodStatus(podFullName)
+               return nil
+           }
        }
    }
@@ -181,3 +225,14 @@ func (s *statusManager) syncBatch() error {
    go s.DeletePodStatus(podFullName)
    return fmt.Errorf("error updating status for pod %q: %v", kubeletUtil.FormatPodName(pod), err)
}
+
+// notRunning returns true if every status is terminated or waiting, or the status list
+// is empty.
+func notRunning(statuses []api.ContainerStatus) bool {
+   for _, status := range statuses {
+       if status.State.Terminated == nil && status.State.Waiting == nil {
+           return false
+       }
+   }
+   return true
+}
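Taken together, the syncBatch changes only remove a deleted pod from the API server once its status update has landed and every container is terminated or waiting (notRunning); the final delete uses a zero grace period because graceful shutdown already happened on the node. A compact, self-contained sketch of that decision with a stand-in status type (illustrative only):

package main

import "fmt"

type containerState struct {
	Running, Terminated, Waiting bool
}

// notRunning mirrors the helper added in this commit: true when no container is running.
func notRunning(statuses []containerState) bool {
	for _, s := range statuses {
		if !s.Terminated && !s.Waiting {
			return false
		}
	}
	return true
}

// shouldFinalize reports whether the kubelet can ask the API server to remove the pod.
func shouldFinalize(deleted bool, statuses []containerState) bool {
	return deleted && notRunning(statuses)
}

func main() {
	statuses := []containerState{{Running: true}}
	fmt.Println(shouldFinalize(true, statuses)) // false: a container is still running

	statuses = []containerState{{Terminated: true}, {Waiting: true}}
	fmt.Println(shouldFinalize(true, statuses)) // true: safe to delete with grace period 0
}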

View File

@@ -153,8 +153,21 @@ func TestUnchangedStatus(t *testing.T) {
    verifyUpdates(t, syncer, 1)
}

+func TestSyncBatchIgnoresNotFound(t *testing.T) {
+   syncer := newTestStatusManager()
+   syncer.SetPodStatus(testPod, getRandomPodStatus())
+   err := syncer.syncBatch()
+   if err != nil {
+       t.Errorf("unexpected syncing error: %v", err)
+   }
+   verifyActions(t, syncer.kubeClient, []testclient.Action{
+       testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
+   })
+}
+
func TestSyncBatch(t *testing.T) {
    syncer := newTestStatusManager()
+   syncer.kubeClient = testclient.NewSimpleFake(testPod)
    syncer.SetPodStatus(testPod, getRandomPodStatus())
    err := syncer.syncBatch()
    if err != nil {
@@ -167,6 +180,22 @@ func TestSyncBatch(t *testing.T) {
    )
}

+func TestSyncBatchChecksMismatchedUID(t *testing.T) {
+   syncer := newTestStatusManager()
+   testPod.UID = "first"
+   differentPod := *testPod
+   differentPod.UID = "second"
+   syncer.kubeClient = testclient.NewSimpleFake(testPod)
+   syncer.SetPodStatus(&differentPod, getRandomPodStatus())
+   err := syncer.syncBatch()
+   if err != nil {
+       t.Errorf("unexpected syncing error: %v", err)
+   }
+   verifyActions(t, syncer.kubeClient, []testclient.Action{
+       testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
+   })
+}
+
// shuffle returns a new shuffled list of container statuses.
func shuffle(statuses []api.ContainerStatus) []api.ContainerStatus {
    numStatuses := len(statuses)