diff --git a/pkg/kubelet/container/sync_result.go b/pkg/kubelet/container/sync_result.go
index 5a976115585..61e115a9826 100644
--- a/pkg/kubelet/container/sync_result.go
+++ b/pkg/kubelet/container/sync_result.go
@@ -39,6 +39,8 @@ var (
 	ErrKillContainer    = errors.New("KillContainerError")
 	ErrVerifyNonRoot    = errors.New("VerifyNonRootError")
 	ErrRunInitContainer = errors.New("RunInitContainerError")
+	ErrCreatePodSandbox = errors.New("CreatePodSandboxError")
+	ErrKillPodSandbox   = errors.New("KillPodSandboxError")
 )
 
 var (
@@ -51,11 +53,13 @@ var (
 type SyncAction string
 
 const (
-	StartContainer  SyncAction = "StartContainer"
-	KillContainer   SyncAction = "KillContainer"
-	SetupNetwork    SyncAction = "SetupNetwork"
-	TeardownNetwork SyncAction = "TeardownNetwork"
-	InitContainer   SyncAction = "InitContainer"
+	StartContainer   SyncAction = "StartContainer"
+	KillContainer    SyncAction = "KillContainer"
+	SetupNetwork     SyncAction = "SetupNetwork"
+	TeardownNetwork  SyncAction = "TeardownNetwork"
+	InitContainer    SyncAction = "InitContainer"
+	CreatePodSandbox SyncAction = "CreatePodSandbox"
+	KillPodSandbox   SyncAction = "KillPodSandbox"
 )
 
 // SyncResult is the result of sync action.
diff --git a/pkg/kubelet/kuberuntime/kuberuntime_container.go b/pkg/kubelet/kuberuntime/kuberuntime_container.go
index 1c0d2d94bc1..61df94a2fd6 100644
--- a/pkg/kubelet/kuberuntime/kuberuntime_container.go
+++ b/pkg/kubelet/kuberuntime/kuberuntime_container.go
@@ -24,13 +24,17 @@ import (
 	"os"
 	"path"
 	"sort"
+	"sync"
 	"time"
 
 	"github.com/golang/glog"
 
 	"k8s.io/kubernetes/pkg/api"
+	"k8s.io/kubernetes/pkg/api/unversioned"
 	runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
+	"k8s.io/kubernetes/pkg/kubelet/events"
 	"k8s.io/kubernetes/pkg/types"
+	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/pkg/util/term"
 )
 
@@ -302,6 +306,125 @@ func (m *kubeGenericRuntimeManager) getKubeletContainerStatuses(podSandboxID str
 	return statuses, nil
 }
 
+// generateContainerEvent generates an API event for the container, resolving its
+// object reference via containerRefManager; if no ref is recorded it logs a
+// warning and emits nothing.
+func (m *kubeGenericRuntimeManager) generateContainerEvent(containerID kubecontainer.ContainerID, eventType, reason, message string) {
+	ref, ok := m.containerRefManager.GetRef(containerID)
+	if !ok {
+		glog.Warningf("No ref for container %q", containerID)
+		return
+	}
+	m.recorder.Event(ref, eventType, reason, message)
+}
+
+// executePreStopHook runs the container's pre-stop lifecycle hook, waiting at most
+// gracePeriod seconds for it to complete, and returns the number of seconds elapsed.
+func (m *kubeGenericRuntimeManager) executePreStopHook(pod *api.Pod, containerID kubecontainer.ContainerID, containerSpec *api.Container, gracePeriod int64) int64 {
+	glog.V(3).Infof("Running preStop hook for container %q", containerID.String())
+
+	start := unversioned.Now()
+	done := make(chan struct{})
+	go func() {
+		defer close(done)
+		defer utilruntime.HandleCrash()
+		if msg, err := m.runner.Run(containerID, pod, containerSpec, containerSpec.Lifecycle.PreStop); err != nil {
+			glog.Errorf("preStop hook for container %q failed: %v", containerSpec.Name, err)
+			m.generateContainerEvent(containerID, api.EventTypeWarning, events.FailedPreStopHook, msg)
+		}
+	}()
+
+	select {
+	case <-time.After(time.Duration(gracePeriod) * time.Second):
+		glog.V(2).Infof("preStop hook for container %q did not complete in %d seconds", containerID, gracePeriod)
+	case <-done:
+		glog.V(3).Infof("preStop hook for container %q completed", containerID)
+	}
+
+	return int64(unversioned.Now().Sub(start.Time).Seconds())
+}
+
+// killContainer kills a container through the following steps:
+// * Run the pre-stop lifecycle hooks (if applicable).
+// * Stop the container.
+func (m *kubeGenericRuntimeManager) killContainer(pod *api.Pod, containerID kubecontainer.ContainerID, containerSpec *api.Container, reason string, gracePeriodOverride *int64) error {
+	gracePeriod := int64(minimumGracePeriodInSeconds)
+	if pod != nil {
+		switch {
+		case pod.DeletionGracePeriodSeconds != nil:
+			gracePeriod = *pod.DeletionGracePeriodSeconds
+		case pod.Spec.TerminationGracePeriodSeconds != nil:
+			gracePeriod = *pod.Spec.TerminationGracePeriodSeconds
+		}
+	}
+
+	glog.V(2).Infof("Killing container %q with %d second grace period", containerID.String(), gracePeriod)
+
+	// Run the pre-stop lifecycle hook if present, deducting its runtime from the remaining grace period.
+	if pod != nil && containerSpec != nil && containerSpec.Lifecycle != nil && containerSpec.Lifecycle.PreStop != nil {
+		gracePeriod = gracePeriod - m.executePreStopHook(pod, containerID, containerSpec, gracePeriod)
+	}
+	if gracePeriodOverride == nil {
+		// no override: always give containers a minimal shutdown window to avoid unnecessary SIGKILLs
+		if gracePeriod < minimumGracePeriodInSeconds {
+			gracePeriod = minimumGracePeriodInSeconds
+		}
+	} else {
+		gracePeriod = *gracePeriodOverride
+		glog.V(3).Infof("Killing container %q, but using %d second grace period override", containerID, gracePeriod)
+	}
+
+	err := m.runtimeService.StopContainer(containerID.ID, gracePeriod)
+	if err != nil {
+		glog.Errorf("Container %q termination failed with gracePeriod %d: %v", containerID.String(), gracePeriod, err)
+	} else {
+		glog.V(3).Infof("Container %q exited normally", containerID.String())
+	}
+
+	message := fmt.Sprintf("Killing container with id %s", containerID.String())
+	if reason != "" {
+		message = fmt.Sprint(message, ":", reason)
+	}
+	m.generateContainerEvent(containerID, api.EventTypeNormal, events.KillingContainer, message)
+	m.containerRefManager.ClearRef(containerID)
+
+	return err
+}
+
+// killContainersWithSyncResult kills all of the pod's running containers in parallel and returns one SyncResult per container.
+func (m *kubeGenericRuntimeManager) killContainersWithSyncResult(pod *api.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) (syncResults []*kubecontainer.SyncResult) {
+	containerResults := make(chan *kubecontainer.SyncResult, len(runningPod.Containers))
+	wg := sync.WaitGroup{}
+
+	wg.Add(len(runningPod.Containers))
+	for _, container := range runningPod.Containers {
+		go func(container *kubecontainer.Container) {
+			defer utilruntime.HandleCrash()
+			defer wg.Done()
+
+			var containerSpec *api.Container
+			if pod != nil {
+				for i, c := range pod.Spec.Containers {
+					if container.Name == c.Name {
+						containerSpec = &pod.Spec.Containers[i]
+						break
+					}
+				}
+			}
+
+			killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, container.Name)
+			if err := m.killContainer(pod, container.ID, containerSpec, "Need to kill Pod", gracePeriodOverride); err != nil {
+				killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
+			}
+			containerResults <- killContainerResult
+		}(container)
+	}
+	wg.Wait()
+	close(containerResults)
+
+	for containerResult := range containerResults {
+		syncResults = append(syncResults, containerResult)
+	}
+	return
+}
+
 // AttachContainer attaches to the container's console
 func (m *kubeGenericRuntimeManager) AttachContainer(id kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) (err error) {
 	return fmt.Errorf("not implemented")
diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager.go b/pkg/kubelet/kuberuntime/kuberuntime_manager.go
index 636e0f6c7dc..690944713b3 100644
--- a/pkg/kubelet/kuberuntime/kuberuntime_manager.go
+++ b/pkg/kubelet/kuberuntime/kuberuntime_manager.go
@@ -36,6 +36,7 @@ import (
 	"k8s.io/kubernetes/pkg/kubelet/network"
 	proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
 	"k8s.io/kubernetes/pkg/kubelet/types"
+	"k8s.io/kubernetes/pkg/kubelet/util/format"
 	kubetypes "k8s.io/kubernetes/pkg/types"
 	"k8s.io/kubernetes/pkg/util/flowcontrol"
 )
 
@@ -45,6 +46,8 @@ const (
 	kubeRuntimeAPIVersion = "0.1.0"
 	// The root directory for pod logs
 	podLogsRootDirectory = "/var/log/pods"
+	// The minimum grace period (seconds) granted to a stopping container, so it is never SIGKILLed immediately
+	minimumGracePeriodInSeconds = 2
 )
 
 var (
@@ -307,7 +310,77 @@ func (m *kubeGenericRuntimeManager) SyncPod(pod *api.Pod, _ api.PodStatus,
 // only hard kill paths are allowed to specify a gracePeriodOverride in the kubelet in order to not corrupt user data.
 // it is useful when doing SIGKILL for hard eviction scenarios, or max grace period during soft eviction scenarios.
 func (m *kubeGenericRuntimeManager) KillPod(pod *api.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) error {
-	return fmt.Errorf("not implemented")
+	err := m.killPodWithSyncResult(pod, runningPod, gracePeriodOverride)
+	return err.Error()
+}
+
+// killPodWithSyncResult kills the runningPod's containers, tears down its network,
+// stops all its sandboxes, and returns the aggregated PodSyncResult.
+// Note: The pod passed in could be *nil* when kubelet restarted.
+func (m *kubeGenericRuntimeManager) killPodWithSyncResult(pod *api.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) (result kubecontainer.PodSyncResult) {
+	killContainerResults := m.killContainersWithSyncResult(pod, runningPod, gracePeriodOverride)
+	for _, containerResult := range killContainerResults {
+		result.AddSyncResult(containerResult)
+	}
+
+	// Teardown network plugin
+	if len(runningPod.Sandboxes) == 0 {
+		glog.V(4).Infof("Can not find pod sandbox by UID %q, assuming already removed.", runningPod.ID)
+		return
+	}
+
+	sandboxID := runningPod.Sandboxes[0].ID.ID
+	isHostNetwork, err := m.isHostNetwork(sandboxID, pod)
+	if err != nil {
+		result.Fail(err)
+		return
+	}
+	if !isHostNetwork {
+		teardownNetworkResult := kubecontainer.NewSyncResult(kubecontainer.TeardownNetwork, runningPod.ID)
+		result.AddSyncResult(teardownNetworkResult)
+		// Tear down network plugin with sandbox id. pod may be nil here (e.g. after a kubelet restart), so only runningPod is safe to dereference.
+		if err := m.networkPlugin.TearDownPod(runningPod.Namespace, runningPod.Name, kubecontainer.ContainerID{
+			Type: m.runtimeName,
+			ID:   sandboxID,
+		}); err != nil {
+			message := fmt.Sprintf("Failed to teardown network for pod %q using network plugins %q: %v",
+				format.Pod(&api.Pod{ObjectMeta: api.ObjectMeta{Name: runningPod.Name, Namespace: runningPod.Namespace, UID: runningPod.ID}}), m.networkPlugin.Name(), err)
+			teardownNetworkResult.Fail(kubecontainer.ErrTeardownNetwork, message)
+			glog.Error(message)
+		}
+	}
+
+	// stop sandbox, the sandbox will be removed in GarbageCollect
+	killSandboxResult := kubecontainer.NewSyncResult(kubecontainer.KillPodSandbox, runningPod.ID)
+	result.AddSyncResult(killSandboxResult)
+	// Stop all sandboxes belongs to same pod
+	for _, podSandbox := range runningPod.Sandboxes {
+		if err := m.runtimeService.StopPodSandbox(podSandbox.ID.ID); err != nil {
+			killSandboxResult.Fail(kubecontainer.ErrKillPodSandbox, err.Error())
+			glog.Errorf("Failed to stop sandbox %q: %v", podSandbox.ID, err)
+		}
+	}
+
+	return
+}
+
+// isHostNetwork checks whether the pod is running in host-network mode.
+func (m *kubeGenericRuntimeManager) isHostNetwork(podSandBoxID string, pod *api.Pod) (bool, error) {
+	if pod != nil {
+		return kubecontainer.IsHostNetworkPod(pod), nil
+	}
+
+	podStatus, err := m.runtimeService.PodSandboxStatus(podSandBoxID)
+	if err != nil {
+		return false, err
+	}
+
+	if podStatus.Linux != nil && podStatus.Linux.Namespaces != nil && podStatus.Linux.Namespaces.Options != nil {
+		if podStatus.Linux.Namespaces.Options.HostNetwork != nil {
+			return podStatus.Linux.Namespaces.Options.GetHostNetwork(), nil
+		}
+	}
+
+	return false, nil
 }
 
 // GetPodStatus retrieves the status of the pod, including the
diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go b/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go
index 33003652550..0672dfa675c 100644
--- a/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go
+++ b/pkg/kubelet/kuberuntime/kuberuntime_manager_test.go
@@ -321,3 +321,75 @@ func TestGetNetNS(t *testing.T) {
 	assert.Equal(t, "", actual)
 	assert.Equal(t, "not supported", err.Error())
 }
+
+func TestKillPod(t *testing.T) {
+	fakeRuntime, _, m, err := createTestRuntimeManager()
+	assert.NoError(t, err)
+
+	pod := &api.Pod{
+		ObjectMeta: api.ObjectMeta{
+			UID:       "12345678",
+			Name:      "foo",
+			Namespace: "new",
+		},
+		Spec: api.PodSpec{
+			Containers: []api.Container{
+				{
+					Name:  "foo1",
+					Image: "busybox",
+				},
+				{
+					Name:  "foo2",
+					Image: "busybox",
+				},
+			},
+		},
+	}
+
+	// Set fake sandbox and fake containers to fakeRuntime.
+	fakeSandbox, fakeContainers, err := makeAndSetFakePod(m, fakeRuntime, pod)
+	assert.NoError(t, err)
+
+	// Convert the fakeContainers to kubecontainer.Container
+	containers := make([]*kubecontainer.Container, len(fakeContainers))
+	for i := range containers {
+		fakeContainer := fakeContainers[i]
+		c, err := m.toKubeContainer(&runtimeApi.Container{
+			Id:       fakeContainer.Id,
+			Metadata: fakeContainer.Metadata,
+			State:    fakeContainer.State,
+			Image:    fakeContainer.Image,
+			ImageRef: fakeContainer.ImageRef,
+			Labels:   fakeContainer.Labels,
+		})
+		if err != nil {
+			t.Fatalf("unexpected error %v", err)
+		}
+		containers[i] = c
+	}
+	runningPod := kubecontainer.Pod{
+		ID:         pod.UID,
+		Name:       pod.Name,
+		Namespace:  pod.Namespace,
+		Containers: []*kubecontainer.Container{containers[0], containers[1]},
+		Sandboxes: []*kubecontainer.Container{
+			{
+				ID: kubecontainer.ContainerID{
+					ID:   fakeSandbox.GetId(),
+					Type: apitest.FakeRuntimeName,
+				},
+			},
+		},
+	}
+
+	err = m.KillPod(pod, runningPod, nil)
+	assert.NoError(t, err)
+	assert.Equal(t, 2, len(fakeRuntime.Containers))
+	assert.Equal(t, 1, len(fakeRuntime.Sandboxes))
+	for _, sandbox := range fakeRuntime.Sandboxes {
+		assert.Equal(t, runtimeApi.PodSandBoxState_NOTREADY, sandbox.GetState())
+	}
+	for _, c := range fakeRuntime.Containers {
+		assert.Equal(t, runtimeApi.ContainerState_EXITED, c.GetState())
+	}
+}
diff --git a/pkg/kubelet/network/plugins.go b/pkg/kubelet/network/plugins.go
index 58fbf396122..c5e6ae26c28 100644
--- a/pkg/kubelet/network/plugins.go
+++ b/pkg/kubelet/network/plugins.go
@@ -68,12 +68,15 @@ type NetworkPlugin interface {
 	// SetUpPod is the method called after the infra container of
 	// the pod has been created but before the other containers of the
 	// pod are launched.
+	// TODO: rename podInfraContainerID to sandboxID
 	SetUpPod(namespace string, name string, podInfraContainerID kubecontainer.ContainerID) error
 
 	// TearDownPod is the method called before a pod's infra container will be deleted
+	// TODO: rename podInfraContainerID to sandboxID
 	TearDownPod(namespace string, name string, podInfraContainerID kubecontainer.ContainerID) error
 
 	// Status is the method called to obtain the ipv4 or ipv6 addresses of the container
+	// TODO: rename podInfraContainerID to sandboxID
 	GetPodNetworkStatus(namespace string, name string, podInfraContainerID kubecontainer.ContainerID) (*PodNetworkStatus, error)
 
 	// NetworkStatus returns error if the network plugin is in error state