diff --git a/pkg/kubelet/config/config.go b/pkg/kubelet/config/config.go
index 99ebcfd7545..93b48519ffa 100644
--- a/pkg/kubelet/config/config.go
+++ b/pkg/kubelet/config/config.go
@@ -217,9 +217,8 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
 	for _, ref := range filtered {
 		name := kubecontainer.GetPodFullName(ref)
 		if existing, found := pods[name]; found {
-			if !reflect.DeepEqual(existing.Spec, ref.Spec) {
+			if checkAndUpdatePod(existing, ref) {
 				// this is an update
-				existing.Spec = ref.Spec
 				updates.Pods = append(updates.Pods, existing)
 				continue
 			}
@@ -261,9 +260,8 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
 			name := kubecontainer.GetPodFullName(ref)
 			if existing, found := oldPods[name]; found {
 				pods[name] = existing
-				if !reflect.DeepEqual(existing.Spec, ref.Spec) {
+				if checkAndUpdatePod(existing, ref) {
 					// this is an update
-					existing.Spec = ref.Spec
 					updates.Pods = append(updates.Pods, existing)
 					continue
 				}
@@ -335,6 +333,23 @@ func filterInvalidPods(pods []*api.Pod, source string, recorder record.EventReco
 	return
 }
 
+// checkAndUpdatePod updates existing if ref makes a meaningful change and returns true, or
+// returns false if there was no update.
+func checkAndUpdatePod(existing, ref *api.Pod) bool {
+	// TODO: it would be better to update the whole object and only preserve certain things
+	// like the source annotation or the UID (to ensure safety)
+	if reflect.DeepEqual(existing.Spec, ref.Spec) &&
+		reflect.DeepEqual(existing.DeletionTimestamp, ref.DeletionTimestamp) &&
+		reflect.DeepEqual(existing.DeletionGracePeriodSeconds, ref.DeletionGracePeriodSeconds) {
+		return false
+	}
+	// this is an update
+	existing.Spec = ref.Spec
+	existing.DeletionTimestamp = ref.DeletionTimestamp
+	existing.DeletionGracePeriodSeconds = ref.DeletionGracePeriodSeconds
+	return true
+}
+
 // Sync sends a copy of the current state through the update channel.
 func (s *podStorage) Sync() {
 	s.updateLock.Lock()
diff --git a/pkg/kubelet/container/fake_runtime.go b/pkg/kubelet/container/fake_runtime.go
index 03d65e50a33..324d7ae6bf3 100644
--- a/pkg/kubelet/container/fake_runtime.go
+++ b/pkg/kubelet/container/fake_runtime.go
@@ -163,13 +163,13 @@ func (f *FakeRuntime) SyncPod(pod *api.Pod, _ Pod, _ api.PodStatus, _ []api.Secr
 	return f.Err
 }
 
-func (f *FakeRuntime) KillPod(pod Pod) error {
+func (f *FakeRuntime) KillPod(pod *api.Pod, runningPod Pod) error {
 	f.Lock()
 	defer f.Unlock()
 
 	f.CalledFunctions = append(f.CalledFunctions, "KillPod")
-	f.KilledPods = append(f.KilledPods, string(pod.ID))
-	for _, c := range pod.Containers {
+	f.KilledPods = append(f.KilledPods, string(runningPod.ID))
+	for _, c := range runningPod.Containers {
 		f.KilledContainers = append(f.KilledContainers, c.Name)
 	}
 	return f.Err
diff --git a/pkg/kubelet/container/runtime.go b/pkg/kubelet/container/runtime.go
index be206f61f78..70ee3983152 100644
--- a/pkg/kubelet/container/runtime.go
+++ b/pkg/kubelet/container/runtime.go
@@ -54,8 +54,8 @@ type Runtime interface {
 	GetPods(all bool) ([]*Pod, error)
 	// Syncs the running pod into the desired pod.
 	SyncPod(pod *api.Pod, runningPod Pod, podStatus api.PodStatus, pullSecrets []api.Secret) error
-	// KillPod kills all the containers of a pod.
-	KillPod(pod Pod) error
+	// KillPod kills all the containers of a pod. Pod may be nil, running pod must not be.
+	KillPod(pod *api.Pod, runningPod Pod) error
 	// GetPodStatus retrieves the status of the pod, including the information of
 	// all containers in the pod. Clients of this interface assume the containers
 	// statuses in a pod always have a deterministic ordering (eg: sorted by name).
diff --git a/pkg/kubelet/dockertools/manager.go b/pkg/kubelet/dockertools/manager.go
index 3a32ee95b55..fe395ac9bcf 100644
--- a/pkg/kubelet/dockertools/manager.go
+++ b/pkg/kubelet/dockertools/manager.go
@@ -56,12 +56,21 @@ import (
 const (
 	maxReasonCacheEntries = 200
 
-	kubernetesPodLabel       = "io.kubernetes.pod.data"
-	kubernetesContainerLabel = "io.kubernetes.container.name"
 	// ndots specifies the minimum number of dots that a domain name must contain for the resolver to consider it as FQDN (fully-qualified)
 	// we want to able to consider SRV lookup names like _dns._udp.kube-dns.default.svc to be considered relative.
 	// hence, setting ndots to be 5.
 	ndotsDNSOption = "options ndots:5\n"
+
+	// In order to avoid unnecessary SIGKILLs, give every container a minimum grace
+	// period after SIGTERM. Docker will guarantee the termination, but SIGTERM is
+	// potentially dangerous.
+	// TODO: evaluate whether there are scenarios in which SIGKILL is preferable to
+	// SIGTERM for certain process types, which may justify setting this to 0.
+	minimumGracePeriodInSeconds = 2
+
+	kubernetesNameLabel                   = "io.kubernetes.pod.name"
+	kubernetesPodLabel                    = "io.kubernetes.pod.data"
+	kubernetesTerminationGracePeriodLabel = "io.kubernetes.pod.terminationGracePeriod"
+	kubernetesContainerLabel              = "io.kubernetes.container.name"
 )
 
 // DockerManager implements the Runtime interface.
@@ -589,12 +598,19 @@ func (dm *DockerManager) runContainer(
 	if len(containerHostname) > hostnameMaxLen {
 		containerHostname = containerHostname[:hostnameMaxLen]
 	}
+
+	// Pod information is recorded on the container as labels to preserve it in the event the pod is deleted
+	// while the Kubelet is down and there is no information available to recover the pod. This includes
+	// termination information like the termination grace period and the pre stop hooks.
+	// TODO: keep these labels up to date if the pod changes
 	namespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
 	labels := map[string]string{
-		"io.kubernetes.pod.name": namespacedName.String(),
+		kubernetesNameLabel: namespacedName.String(),
+	}
+	if pod.Spec.TerminationGracePeriodSeconds != nil {
+		labels[kubernetesTerminationGracePeriodLabel] = strconv.FormatInt(*pod.Spec.TerminationGracePeriodSeconds, 10)
 	}
 	if container.Lifecycle != nil && container.Lifecycle.PreStop != nil {
-		glog.V(1).Infof("Setting preStop hook")
 		// TODO: This is kind of hacky, we should really just encode the bits we need.
 		data, err := latest.Codec.Encode(pod)
 		if err != nil {
@@ -1106,40 +1122,56 @@ func (dm *DockerManager) PortForward(pod *kubecontainer.Pod, port uint16, stream
 }
 
 // Kills all containers in the specified pod
-func (dm *DockerManager) KillPod(pod kubecontainer.Pod) error {
+func (dm *DockerManager) KillPod(pod *api.Pod, runningPod kubecontainer.Pod) error {
 	// Send the kills in parallel since they may take a long time. Len + 1 since there
 	// can be Len errors + the networkPlugin teardown error.
-	errs := make(chan error, len(pod.Containers)+1)
+	errs := make(chan error, len(runningPod.Containers)+1)
 	wg := sync.WaitGroup{}
-	var networkID types.UID
-	for _, container := range pod.Containers {
+	var (
+		networkContainer *kubecontainer.Container
+		networkSpec      *api.Container
+	)
+	for _, container := range runningPod.Containers {
 		wg.Add(1)
 		go func(container *kubecontainer.Container) {
 			defer util.HandleCrash()
 			defer wg.Done()
+
+			var containerSpec *api.Container
+			if pod != nil {
+				for i, c := range pod.Spec.Containers {
+					if c.Name == container.Name {
+						containerSpec = &pod.Spec.Containers[i]
+						break
+					}
+				}
+			}
+
 			// TODO: Handle this without signaling the pod infra container to
 			// adapt to the generic container runtime.
 			if container.Name == PodInfraContainerName {
 				// Store the container runtime for later deletion.
 				// We do this so that PreStop handlers can run in the network namespace.
-				networkID = container.ID
+				networkContainer = container
+				networkSpec = containerSpec
 				return
 			}
-			if err := dm.killContainer(container.ID); err != nil {
-				glog.Errorf("Failed to delete container: %v; Skipping pod %q", err, pod.ID)
+
+			err := dm.KillContainerInPod(container.ID, containerSpec, pod)
+			if err != nil {
+				glog.Errorf("Failed to delete container: %v; Skipping pod %q", err, runningPod.ID)
 				errs <- err
 			}
 		}(container)
 	}
 	wg.Wait()
-	if len(networkID) > 0 {
-		if err := dm.networkPlugin.TearDownPod(pod.Namespace, pod.Name, kubeletTypes.DockerID(networkID)); err != nil {
+	if networkContainer != nil {
+		if err := dm.networkPlugin.TearDownPod(runningPod.Namespace, runningPod.Name, kubeletTypes.DockerID(networkContainer.ID)); err != nil {
 			glog.Errorf("Failed tearing down the infra container: %v", err)
 			errs <- err
 		}
-		if err := dm.killContainer(networkID); err != nil {
-			glog.Errorf("Failed to delete container: %v; Skipping pod %q", err, pod.ID)
+		if err := dm.KillContainerInPod(networkContainer.ID, networkSpec, pod); err != nil {
+			glog.Errorf("Failed to delete container: %v; Skipping pod %q", err, runningPod.ID)
 			errs <- err
 		}
 	}
@@ -1154,75 +1186,152 @@ func (dm *DockerManager) KillPod(pod kubecontainer.Pod) error {
 	return nil
 }
 
-// KillContainerInPod kills a container in the pod.
-func (dm *DockerManager) KillContainerInPod(container api.Container, pod *api.Pod) error {
-	// Locate the container.
-	pods, err := dm.GetPods(false)
-	if err != nil {
-		return err
-	}
-	targetPod := kubecontainer.Pods(pods).FindPod(kubecontainer.GetPodFullName(pod), pod.UID)
-	targetContainer := targetPod.FindContainerByName(container.Name)
-	if targetContainer == nil {
-		return fmt.Errorf("unable to find container %q in pod %q", container.Name, targetPod.Name)
-	}
-	return dm.killContainer(targetContainer.ID)
-}
-
-// TODO(vmarmol): Unexport this as it is no longer used externally.
-// KillContainer kills a container identified by containerID.
-// Internally, it invokes docker's StopContainer API with a timeout of 10s.
-// TODO: Deprecate this function in favor of KillContainerInPod.
-func (dm *DockerManager) KillContainer(containerID types.UID) error {
-	return dm.killContainer(containerID)
-}
-
-func (dm *DockerManager) killContainer(containerID types.UID) error {
-	ID := string(containerID)
-	glog.V(2).Infof("Killing container with id %q", ID)
-	inspect, err := dm.client.InspectContainer(ID)
-	if err != nil {
-		return err
-	}
-	var found bool
-	var preStop string
-	if inspect != nil && inspect.Config != nil && inspect.Config.Labels != nil {
-		preStop, found = inspect.Config.Labels[kubernetesPodLabel]
-	}
-	if found {
-		var pod api.Pod
-		err := latest.Codec.DecodeInto([]byte(preStop), &pod)
+// KillContainerInPod kills a container in the pod. It must be passed either a container ID or a container and pod,
+// and will attempt to lookup the other information if missing.
+func (dm *DockerManager) KillContainerInPod(containerID types.UID, container *api.Container, pod *api.Pod) error {
+	switch {
+	case len(containerID) == 0:
+		// Locate the container.
+		pods, err := dm.GetPods(false)
 		if err != nil {
-			glog.Errorf("Failed to decode prestop: %s, %s", preStop, ID)
-		} else {
-			name := inspect.Config.Labels[kubernetesContainerLabel]
-			var container *api.Container
+			return err
+		}
+		targetPod := kubecontainer.Pods(pods).FindPod(kubecontainer.GetPodFullName(pod), pod.UID)
+		targetContainer := targetPod.FindContainerByName(container.Name)
+		if targetContainer == nil {
+			return fmt.Errorf("unable to find container %q in pod %q", container.Name, targetPod.Name)
+		}
+		containerID = targetContainer.ID
+
+	case container == nil || pod == nil:
+		// Read information about the container from labels
+		inspect, err := dm.client.InspectContainer(string(containerID))
+		if err != nil {
+			return err
+		}
+		storedPod, storedContainer, cerr := containerAndPodFromLabels(inspect)
+		if cerr != nil {
+			glog.Errorf("unable to access pod data from container: %v", cerr)
+		}
+		if container == nil {
+			container = storedContainer
+		}
+		if pod == nil {
+			pod = storedPod
+		}
+	}
+	return dm.killContainer(containerID, container, pod)
+}
+
+// killContainer accepts a containerID and an optional container or pod containing shutdown policies. Invoke
+// KillContainerInPod if information must be retrieved first.
+func (dm *DockerManager) killContainer(containerID types.UID, container *api.Container, pod *api.Pod) error {
+	ID := string(containerID)
+	name := ID
+	if container != nil {
+		name = fmt.Sprintf("%s %s", name, container.Name)
+	}
+	if pod != nil {
+		name = fmt.Sprintf("%s %s/%s", name, pod.Namespace, pod.Name)
+	}
+
+	gracePeriod := int64(minimumGracePeriodInSeconds)
+	if pod != nil {
+		switch {
+		case pod.DeletionGracePeriodSeconds != nil:
+			gracePeriod = *pod.DeletionGracePeriodSeconds
+		case pod.Spec.TerminationGracePeriodSeconds != nil:
+			gracePeriod = *pod.Spec.TerminationGracePeriodSeconds
+		}
+	}
+	glog.V(2).Infof("Killing container %q with %d second grace period", name, gracePeriod)
+	start := util.Now()
+
+	if pod != nil && container != nil && container.Lifecycle != nil && container.Lifecycle.PreStop != nil {
+		glog.V(4).Infof("Running preStop hook for container %q", name)
+		done := make(chan struct{})
+		go func() {
+			defer close(done)
+			defer util.HandleCrash()
+			if err := dm.runner.Run(ID, pod, container, container.Lifecycle.PreStop); err != nil {
+				glog.Errorf("preStop hook for container %q failed: %v", name, err)
+			}
+		}()
+		select {
+		case <-time.After(time.Duration(gracePeriod) * time.Second):
+			glog.V(2).Infof("preStop hook for container %q did not complete in %d seconds", name, gracePeriod)
+		case <-done:
+			glog.V(4).Infof("preStop hook for container %q completed", name)
+		}
+		gracePeriod -= int64(util.Now().Sub(start.Time).Seconds())
+	}
+
+	dm.readinessManager.RemoveReadiness(ID)
+
+	// always give containers a minimal shutdown window to avoid unnecessary SIGKILLs
+	if gracePeriod < minimumGracePeriodInSeconds {
+		gracePeriod = minimumGracePeriodInSeconds
+	}
+	err := dm.client.StopContainer(ID, uint(gracePeriod))
+	if _, ok := err.(*docker.ContainerNotRunning); ok && err != nil {
+		glog.V(4).Infof("Container %q has already exited", name)
+		return nil
+	}
+	if err == nil {
+		glog.V(2).Infof("Container %q exited after %s", name, util.Now().Sub(start.Time))
+	} else {
+		glog.V(2).Infof("Container %q termination failed after %s: %v", name, util.Now().Sub(start.Time), err)
+	}
+	ref, ok := dm.containerRefManager.GetRef(ID)
+	if !ok {
+		glog.Warningf("No ref for pod '%q'", name)
+	} else {
+		// TODO: pass reason down here, and state, or move this call up the stack.
+		dm.recorder.Eventf(ref, "Killing", "Killing with docker id %v", util.ShortenString(ID, 12))
+	}
+	return err
+}
+
+var errNoPodOnContainer = fmt.Errorf("no pod information labels on Docker container")
+
+// containerAndPodFromLabels tries to load the appropriate container info off of a Docker container's labels
+func containerAndPodFromLabels(inspect *docker.Container) (pod *api.Pod, container *api.Container, err error) {
+	if inspect == nil || inspect.Config == nil || inspect.Config.Labels == nil {
+		return nil, nil, errNoPodOnContainer
+	}
+	labels := inspect.Config.Labels
+
+	// the pod data may not be set
+	if body, found := labels[kubernetesPodLabel]; found {
+		pod = &api.Pod{}
+		if err = latest.Codec.DecodeInto([]byte(body), pod); err == nil {
+			name := labels[kubernetesContainerLabel]
 			for ix := range pod.Spec.Containers {
 				if pod.Spec.Containers[ix].Name == name {
 					container = &pod.Spec.Containers[ix]
 					break
 				}
 			}
-			if container != nil {
-				glog.V(1).Infof("Running preStop hook")
-				if err := dm.runner.Run(ID, &pod, container, container.Lifecycle.PreStop); err != nil {
-					glog.Errorf("failed to run preStop hook: %v", err)
-				}
-			} else {
-				glog.Errorf("unable to find container %v, %s", pod, name)
+			if container == nil {
+				err = fmt.Errorf("unable to find container %s in pod %v", name, pod)
+			}
+		} else {
+			pod = nil
+		}
+	}
+
+	// attempt to find the default grace period if we didn't commit a pod, but set the generic metadata
+	// field (the one used by kill)
+	if pod == nil {
+		if period, ok := labels[kubernetesTerminationGracePeriodLabel]; ok {
+			if seconds, err := strconv.ParseInt(period, 10, 64); err == nil {
+				pod = &api.Pod{}
+				pod.DeletionGracePeriodSeconds = &seconds
 			}
 		}
 	}
-	dm.readinessManager.RemoveReadiness(ID)
-	err = dm.client.StopContainer(ID, 10)
-	ref, ok := dm.containerRefManager.GetRef(ID)
-	if !ok {
-		glog.Warningf("No ref for pod '%v'", ID)
-	} else {
-		// TODO: pass reason down here, and state, or move this call up the stack.
-		dm.recorder.Eventf(ref, "Killing", "Killing with docker id %v", util.ShortenString(ID, 12))
-	}
-	return err
+
+	return
 }
 
 // Run a single container from a pod. Returns the docker container ID
@@ -1259,7 +1368,7 @@ func (dm *DockerManager) runContainerInPod(pod *api.Pod, container *api.Containe
 	if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
 		handlerErr := dm.runner.Run(id, pod, container, container.Lifecycle.PostStart)
 		if handlerErr != nil {
-			dm.killContainer(types.UID(id))
+			dm.KillContainerInPod(types.UID(id), container, pod)
 			return kubeletTypes.DockerID(""), fmt.Errorf("failed to call event handler: %v", handlerErr)
 		}
 	}
@@ -1540,10 +1649,10 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod
 	podFullName := kubecontainer.GetPodFullName(pod)
 	containerChanges, err := dm.computePodContainerChanges(pod, runningPod, podStatus)
-	glog.V(3).Infof("Got container changes for pod %q: %+v", podFullName, containerChanges)
 	if err != nil {
 		return err
 	}
+	glog.V(3).Infof("Got container changes for pod %q: %+v", podFullName, containerChanges)
 
 	if containerChanges.StartInfraContainer || (len(containerChanges.ContainersToKeep) == 0 && len(containerChanges.ContainersToStart) == 0) {
 		if len(containerChanges.ContainersToKeep) == 0 && len(containerChanges.ContainersToStart) == 0 {
@@ -1553,7 +1662,7 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod
 		}
 
 		// Killing phase: if we want to start new infra container, or nothing is running kill everything (including infra container)
-		err = dm.KillPod(runningPod)
+		err = dm.KillPod(pod, runningPod)
 		if err != nil {
 			return err
 		}
@@ -1563,7 +1672,15 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod
 			_, keep := containerChanges.ContainersToKeep[kubeletTypes.DockerID(container.ID)]
 			if !keep {
 				glog.V(3).Infof("Killing unwanted container %+v", container)
-				err = dm.KillContainer(container.ID)
+				// attempt to find the appropriate container policy
+				var podContainer *api.Container
+				for i, c := range pod.Spec.Containers {
+					if c.Name == container.Name {
+						podContainer = &pod.Spec.Containers[i]
+						break
+					}
+				}
+				err = dm.KillContainerInPod(container.ID, podContainer, pod)
 				if err != nil {
 					glog.Errorf("Error killing container: %v", err)
 				}
diff --git a/pkg/kubelet/dockertools/manager_test.go b/pkg/kubelet/dockertools/manager_test.go
index 3f76372a6e2..78597c464f8 100644
--- a/pkg/kubelet/dockertools/manager_test.go
+++ b/pkg/kubelet/dockertools/manager_test.go
@@ -405,7 +405,7 @@ func TestKillContainerInPod(t *testing.T) {
 		manager.readinessManager.SetReadiness(c.ID, true)
 	}
 
-	if err := manager.KillContainerInPod(pod.Spec.Containers[0], pod); err != nil {
+	if err := manager.KillContainerInPod("", &pod.Spec.Containers[0], pod); err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
 	// Assert the container has been stopped.
@@ -478,14 +478,14 @@ func TestKillContainerInPodWithPreStop(t *testing.T) {
 		manager.readinessManager.SetReadiness(c.ID, true)
 	}
 
-	if err := manager.KillContainerInPod(pod.Spec.Containers[0], pod); err != nil {
+	if err := manager.KillContainerInPod("", &pod.Spec.Containers[0], pod); err != nil {
 		t.Errorf("unexpected error: %v", err)
 	}
 	// Assert the container has been stopped.
 	if err := fakeDocker.AssertStopped([]string{containerToKill.ID}); err != nil {
 		t.Errorf("container was not stopped correctly: %v", err)
 	}
-	verifyCalls(t, fakeDocker, []string{"list", "inspect_container", "create_exec", "start_exec", "stop"})
+	verifyCalls(t, fakeDocker, []string{"list", "create_exec", "start_exec", "stop"})
 	if !reflect.DeepEqual(expectedCmd, fakeDocker.execCmd) {
 		t.Errorf("expected: %v, got %v", expectedCmd, fakeDocker.execCmd)
 	}
@@ -522,7 +522,7 @@ func TestKillContainerInPodWithError(t *testing.T) {
 		manager.readinessManager.SetReadiness(c.ID, true)
 	}
 
-	if err := manager.KillContainerInPod(pod.Spec.Containers[0], pod); err == nil {
+	if err := manager.KillContainerInPod("", &pod.Spec.Containers[0], pod); err == nil {
 		t.Errorf("expected error, found nil")
 	}
 
@@ -1021,7 +1021,7 @@ func TestSyncPodDeletesWithNoPodInfraContainer(t *testing.T) {
 
 	verifyCalls(t, fakeDocker, []string{
 		// Kill the container since pod infra container is not running.
-		"inspect_container", "stop",
+		"stop",
 		// Create pod infra container.
 		"create", "start", "inspect_container",
 		// Create container.
@@ -1096,7 +1096,7 @@ func TestSyncPodDeletesDuplicate(t *testing.T) {
 		// Check the pod infra container.
 		"inspect_container",
 		// Kill the duplicated container.
-		"inspect_container", "stop",
+		"stop",
 	})
 	// Expect one of the duplicates to be killed.
 	if len(fakeDocker.Stopped) != 1 || (fakeDocker.Stopped[0] != "1234" && fakeDocker.Stopped[0] != "4567") {
@@ -1150,7 +1150,7 @@ func TestSyncPodBadHash(t *testing.T) {
 		// Check the pod infra container.
 		"inspect_container",
 		// Kill and restart the bad hash container.
-		"inspect_container", "stop", "create", "start", "inspect_container",
+		"stop", "create", "start", "inspect_container",
 	})
 
 	if err := fakeDocker.AssertStopped([]string{"1234"}); err != nil {
@@ -1208,7 +1208,7 @@ func TestSyncPodsUnhealthy(t *testing.T) {
 		// Check the pod infra container.
 		"inspect_container",
 		// Kill the unhealthy container.
-		"inspect_container", "stop",
+		"stop",
 		// Restart the unhealthy container.
 		"create", "start", "inspect_container",
 	})
@@ -1441,9 +1441,9 @@ func TestSyncPodWithRestartPolicy(t *testing.T) {
 			api.RestartPolicyNever,
 			[]string{
 				// Check the pod infra container.
-				"inspect_container",
+				"inspect_container", "inspect_container",
 				// Stop the last pod infra container.
-				"inspect_container", "stop",
+				"stop",
 			},
 			[]string{},
 			[]string{"9876"},
@@ -1910,7 +1910,7 @@ func TestSyncPodEventHandlerFails(t *testing.T) {
 		// Create the container.
 		"create", "start",
 		// Kill the container since event handler fails.
-		"inspect_container", "stop",
+		"stop",
 	})
 
 	// TODO(yifan): Check the stopped container's name.
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index cf56785e954..c8c278157e7 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -1124,8 +1124,8 @@ func parseResolvConf(reader io.Reader) (nameservers []string, searches []string,
 }
 
 // Kill all running containers in a pod (includes the pod infra container).
-func (kl *Kubelet) killPod(pod kubecontainer.Pod) error {
-	return kl.containerRuntime.KillPod(pod)
+func (kl *Kubelet) killPod(pod *api.Pod, runningPod kubecontainer.Pod) error {
+	return kl.containerRuntime.KillPod(pod, runningPod)
 }
 
 type empty struct{}
@@ -1181,9 +1181,10 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
 	}()
 
 	// Kill pods we can't run.
-	err := canRunPod(pod)
-	if err != nil {
-		kl.killPod(runningPod)
+	if err := canRunPod(pod); err != nil || pod.DeletionTimestamp != nil {
+		if err := kl.killPod(pod, runningPod); err != nil {
+			util.HandleError(err)
+		}
 		return err
 	}
 
@@ -1370,6 +1371,32 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []*api.Pod, runningPods []*kubeco
 	return nil
 }
 
+// Delete any pods that are no longer running and are marked for deletion.
+func (kl *Kubelet) cleanupTerminatedPods(pods []*api.Pod, runningPods []*kubecontainer.Pod) error {
+	var terminating []*api.Pod
+	for _, pod := range pods {
+		if pod.DeletionTimestamp != nil {
+			found := false
+			for _, runningPod := range runningPods {
+				if runningPod.ID == pod.UID {
+					found = true
+					break
+				}
+			}
+			if found {
+				podFullName := kubecontainer.GetPodFullName(pod)
+				glog.V(5).Infof("Keeping terminated pod %q and uid %q, still running", podFullName, pod.UID)
+				continue
+			}
+			terminating = append(terminating, pod)
+		}
+	}
+	if !kl.statusManager.TerminatePods(terminating) {
+		return errors.New("not all pods were successfully terminated")
+	}
+	return nil
+}
+
 // pastActiveDeadline returns true if the pod has been active for more than
 // ActiveDeadlineSeconds.
 func (kl *Kubelet) pastActiveDeadline(pod *api.Pod) bool {
@@ -1532,6 +1559,10 @@ func (kl *Kubelet) cleanupPods(allPods []*api.Pod, admittedPods []*api.Pod) erro
 	// Remove any orphaned mirror pods.
 	kl.podManager.DeleteOrphanedMirrorPods()
 
+	if err := kl.cleanupTerminatedPods(allPods, runningPods); err != nil {
+		glog.Errorf("Failed to cleanup terminated pods: %v", err)
+	}
+
 	return err
 }
 
@@ -1554,7 +1585,7 @@ func (kl *Kubelet) killUnwantedPods(desiredPods map[types.UID]empty,
 			}()
 			glog.V(1).Infof("Killing unwanted pod %q", pod.Name)
 			// Stop the containers.
-			err = kl.killPod(*pod)
+			err = kl.killPod(nil, *pod)
 			if err != nil {
 				glog.Errorf("Failed killing the pod %q: %v", pod.Name, err)
 				return
diff --git a/pkg/kubelet/mirror_client.go b/pkg/kubelet/mirror_client.go
index c6367120c2d..e53cf51f63e 100644
--- a/pkg/kubelet/mirror_client.go
+++ b/pkg/kubelet/mirror_client.go
@@ -64,7 +64,7 @@ func (mc *basicMirrorClient) DeleteMirrorPod(podFullName string) error {
 		return err
 	}
 	glog.V(4).Infof("Deleting a mirror pod %q", podFullName)
-	if err := mc.apiserverClient.Pods(namespace).Delete(name, nil); err != nil {
+	if err := mc.apiserverClient.Pods(namespace).Delete(name, api.NewDeleteOptions(0)); err != nil {
 		glog.Errorf("Failed deleting a mirror pod %q: %v", podFullName, err)
 	}
 	return nil
diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go
index 43535a58d45..8eed23beeda 100644
--- a/pkg/kubelet/rkt/rkt.go
+++ b/pkg/kubelet/rkt/rkt.go
@@ -693,11 +693,11 @@ func (r *runtime) GetPods(all bool) ([]*kubecontainer.Pod, error) {
 }
 
 // KillPod invokes 'systemctl kill' to kill the unit that runs the pod.
-func (r *runtime) KillPod(pod kubecontainer.Pod) error {
-	glog.V(4).Infof("Rkt is killing pod: name %q.", pod.Name)
+func (r *runtime) KillPod(pod *api.Pod, runningPod kubecontainer.Pod) error {
+	glog.V(4).Infof("Rkt is killing pod: name %q.", runningPod.Name)
 
 	// TODO(yifan): More graceful stop. Replace with StopUnit and wait for a timeout.
-	r.systemd.KillUnit(makePodServiceFileName(pod.ID), int32(syscall.SIGKILL))
+	r.systemd.KillUnit(makePodServiceFileName(runningPod.ID), int32(syscall.SIGKILL))
 	return r.systemd.Reload()
 }
 
@@ -966,7 +966,7 @@ func (r *runtime) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, podStatus
 
 	if restartPod {
 		// TODO(yifan): Handle network plugin.
-		if err := r.KillPod(runningPod); err != nil {
+		if err := r.KillPod(pod, runningPod); err != nil {
 			return err
 		}
 		if err := r.RunPod(pod, pullSecrets); err != nil {
diff --git a/pkg/kubelet/status_manager.go b/pkg/kubelet/status_manager.go
index 4553619c5a1..5a6e53e7723 100644
--- a/pkg/kubelet/status_manager.go
+++ b/pkg/kubelet/status_manager.go
@@ -24,6 +24,7 @@ import (
 	"github.com/golang/glog"
 
 	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/api/errors"
 	client "k8s.io/kubernetes/pkg/client/unversioned"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
 	kubeletTypes "k8s.io/kubernetes/pkg/kubelet/types"
@@ -122,7 +123,7 @@ func (s *statusManager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
 	// Currently this routine is not called for the same pod from multiple
 	// workers and/or the kubelet but dropping the lock before sending the
 	// status down the channel feels like an easy way to get a bullet in foot.
-	if !found || !isStatusEqual(&oldStatus, &status) {
+	if !found || !isStatusEqual(&oldStatus, &status) || pod.DeletionTimestamp != nil {
 		s.podStatuses[podFullName] = status
 		s.podStatusChannel <- podStatusSyncRequest{pod, status}
 	} else {
@@ -130,6 +131,29 @@ func (s *statusManager) SetPodStatus(pod *api.Pod, status api.PodStatus) {
 	}
 }
 
+// TerminatePods resets the container status for the provided pods to terminated and triggers
+// a status update. This function may not enqueue all the provided pods, in which case it will
+// return false
+func (s *statusManager) TerminatePods(pods []*api.Pod) bool {
+	sent := true
+	s.podStatusesLock.Lock()
+	defer s.podStatusesLock.Unlock()
+	for _, pod := range pods {
+		for i := range pod.Status.ContainerStatuses {
+			pod.Status.ContainerStatuses[i].State = api.ContainerState{
+				Terminated: &api.ContainerStateTerminated{},
+			}
+		}
+		select {
+		case s.podStatusChannel <- podStatusSyncRequest{pod, pod.Status}:
+		default:
+			sent = false
+			glog.V(4).Infof("Termination notice for %q was dropped because the status channel is full", kubeletUtil.FormatPodName(pod))
+		}
+	}
+	return sent
+}
+
 func (s *statusManager) DeletePodStatus(podFullName string) {
 	s.podStatusesLock.Lock()
 	defer s.podStatusesLock.Unlock()
@@ -161,13 +185,33 @@ func (s *statusManager) syncBatch() error {
 	}
 	// TODO: make me easier to express from client code
 	statusPod, err = s.kubeClient.Pods(statusPod.Namespace).Get(statusPod.Name)
+	if errors.IsNotFound(err) {
+		glog.V(3).Infof("Pod %q was deleted on the server", pod.Name)
+		return nil
+	}
 	if err == nil {
+		if len(pod.UID) > 0 && statusPod.UID != pod.UID {
+			glog.V(3).Infof("Pod %q was deleted and then recreated, skipping status update", kubeletUtil.FormatPodName(pod))
+			return nil
+		}
 		statusPod.Status = status
-		_, err = s.kubeClient.Pods(pod.Namespace).UpdateStatus(statusPod)
 		// TODO: handle conflict as a retry, make that easier too.
+		statusPod, err = s.kubeClient.Pods(pod.Namespace).UpdateStatus(statusPod)
 		if err == nil {
 			glog.V(3).Infof("Status for pod %q updated successfully", kubeletUtil.FormatPodName(pod))
-			return nil
+
+			if pod.DeletionTimestamp == nil {
+				return nil
+			}
+			if !notRunning(pod.Status.ContainerStatuses) {
+				glog.V(3).Infof("Pod %q is terminated, but some containers are still running", pod.Name)
+				return nil
+			}
+			if err := s.kubeClient.Pods(statusPod.Namespace).Delete(statusPod.Name, api.NewDeleteOptions(0)); err == nil {
+				glog.V(3).Infof("Pod %q fully terminated and removed from etcd", statusPod.Name)
+				s.DeletePodStatus(podFullName)
+				return nil
+			}
 		}
 	}
 
@@ -181,3 +225,14 @@ func (s *statusManager) syncBatch() error {
 	go s.DeletePodStatus(podFullName)
 	return fmt.Errorf("error updating status for pod %q: %v", kubeletUtil.FormatPodName(pod), err)
 }
+
+// notRunning returns true if every status is terminated or waiting, or the status list
+// is empty.
+func notRunning(statuses []api.ContainerStatus) bool {
+	for _, status := range statuses {
+		if status.State.Terminated == nil && status.State.Waiting == nil {
+			return false
+		}
+	}
+	return true
+}
diff --git a/pkg/kubelet/status_manager_test.go b/pkg/kubelet/status_manager_test.go
index 0c30c764219..84da4f3d376 100644
--- a/pkg/kubelet/status_manager_test.go
+++ b/pkg/kubelet/status_manager_test.go
@@ -153,8 +153,21 @@ func TestUnchangedStatus(t *testing.T) {
 	verifyUpdates(t, syncer, 1)
 }
 
+func TestSyncBatchIgnoresNotFound(t *testing.T) {
+	syncer := newTestStatusManager()
+	syncer.SetPodStatus(testPod, getRandomPodStatus())
+	err := syncer.syncBatch()
+	if err != nil {
+		t.Errorf("unexpected syncing error: %v", err)
+	}
+	verifyActions(t, syncer.kubeClient, []testclient.Action{
+		testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
+	})
+}
+
 func TestSyncBatch(t *testing.T) {
 	syncer := newTestStatusManager()
+	syncer.kubeClient = testclient.NewSimpleFake(testPod)
 	syncer.SetPodStatus(testPod, getRandomPodStatus())
 	err := syncer.syncBatch()
 	if err != nil {
@@ -167,6 +180,22 @@ func TestSyncBatch(t *testing.T) {
 	)
 }
 
+func TestSyncBatchChecksMismatchedUID(t *testing.T) {
+	syncer := newTestStatusManager()
+	testPod.UID = "first"
+	differentPod := *testPod
+	differentPod.UID = "second"
+	syncer.kubeClient = testclient.NewSimpleFake(testPod)
+	syncer.SetPodStatus(&differentPod, getRandomPodStatus())
+	err := syncer.syncBatch()
+	if err != nil {
+		t.Errorf("unexpected syncing error: %v", err)
+	}
+	verifyActions(t, syncer.kubeClient, []testclient.Action{
+		testclient.GetActionImpl{ActionImpl: testclient.ActionImpl{Verb: "get", Resource: "pods"}},
+	})
+}
+
 // shuffle returns a new shuffled list of container statuses.
 func shuffle(statuses []api.ContainerStatus) []api.ContainerStatus {
 	numStatuses := len(statuses)