From cc4336829d1a480e285ea76430a9bf2554141d80 Mon Sep 17 00:00:00 2001 From: Yifan Gu Date: Fri, 4 Mar 2016 14:52:45 -0800 Subject: [PATCH] rkt: Add pre-stop lifecycle hooks for rkt. When a pod is being terminated, the pre-stop hooks of all the containers will be run before the containers are stopped. --- pkg/kubelet/kubelet.go | 1 + pkg/kubelet/lifecycle/fake_handler_runner.go | 62 +++++++++ pkg/kubelet/rkt/rkt.go | 86 +++++++++++- pkg/kubelet/rkt/rkt_test.go | 134 +++++++++++++++++++ 4 files changed, 281 insertions(+), 2 deletions(-) create mode 100644 pkg/kubelet/lifecycle/fake_handler_runner.go diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index a03afe0d1ab..3d5815f4186 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -424,6 +424,7 @@ func NewMainKubelet( containerRefManager, klet.livenessManager, klet.volumeManager, + klet.httpClient, imageBackOff, serializeImagePulls, ) diff --git a/pkg/kubelet/lifecycle/fake_handler_runner.go b/pkg/kubelet/lifecycle/fake_handler_runner.go new file mode 100644 index 00000000000..90304632e65 --- /dev/null +++ b/pkg/kubelet/lifecycle/fake_handler_runner.go @@ -0,0 +1,62 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package lifecycle + +import ( + "fmt" + "sync" + + "k8s.io/kubernetes/pkg/api" + kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/kubelet/util/format" +) + +type FakeHandlerRunner struct { + sync.Mutex + HandlerRuns []string + Err error +} + +func NewFakeHandlerRunner() *FakeHandlerRunner { + return &FakeHandlerRunner{HandlerRuns: []string{}} +} + +func (hr *FakeHandlerRunner) Run(containerID kubecontainer.ContainerID, pod *api.Pod, container *api.Container, handler *api.Handler) error { + hr.Lock() + defer hr.Unlock() + + if hr.Err != nil { + return hr.Err + } + + switch { + case handler.Exec != nil: + hr.HandlerRuns = append(hr.HandlerRuns, fmt.Sprintf("exec on pod: %v, container: %v: %v", format.Pod(pod), container.Name, containerID.String())) + case handler.HTTPGet != nil: + hr.HandlerRuns = append(hr.HandlerRuns, fmt.Sprintf("http-get on pod: %v, container: %v: %v", format.Pod(pod), container.Name, containerID.String())) + case handler.TCPSocket != nil: + hr.HandlerRuns = append(hr.HandlerRuns, fmt.Sprintf("tcp-socket on pod: %v, container: %v: %v", format.Pod(pod), container.Name, containerID.String())) + default: + return fmt.Errorf("Invalid handler: %v", handler) + } + return nil +} + +func (hr *FakeHandlerRunner) Reset() { + hr.HandlerRuns = []string{} + hr.Err = nil +} diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go index 1229491e486..4934acab146 100644 --- a/pkg/kubelet/rkt/rkt.go +++ b/pkg/kubelet/rkt/rkt.go @@ -27,6 +27,7 @@ import ( "path" "strconv" "strings" + "sync" "syscall" "time" @@ -41,11 +42,14 @@ import ( "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/credentialprovider" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/kubelet/lifecycle" proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results" + kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/kubelet/util/format" "k8s.io/kubernetes/pkg/securitycontext" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/util/errors" utilexec "k8s.io/kubernetes/pkg/util/exec" "k8s.io/kubernetes/pkg/util/flowcontrol" "k8s.io/kubernetes/pkg/util/sets" @@ -100,6 +104,9 @@ const ( // TODO(yifan): Import them from docker2aci. See https://github.com/appc/docker2aci/issues/133. appcDockerEntrypoint = "appc.io/docker/entrypoint" appcDockerCmd = "appc.io/docker/cmd" + + // TODO(yifan): Reuse this const with Docker runtime. + minimumGracePeriodInSeconds = 2 ) // Runtime implements the Containerruntime for rkt. The implementation @@ -120,6 +127,7 @@ type Runtime struct { livenessManager proberesults.Manager volumeGetter volumeGetter imagePuller kubecontainer.ImagePuller + runner kubecontainer.HandlerRunner versions versions } @@ -142,6 +150,7 @@ func New( containerRefManager *kubecontainer.RefManager, livenessManager proberesults.Manager, volumeGetter volumeGetter, + httpClient kubetypes.HttpGetter, imageBackOff *flowcontrol.Backoff, serializeImagePulls bool, ) (*Runtime, error) { @@ -185,6 +194,8 @@ func New( return nil, fmt.Errorf("rkt: cannot get config from rkt api service: %v", err) } + rkt.runner = lifecycle.NewHandlerRunner(httpClient, rkt, rkt) + if serializeImagePulls { rkt.imagePuller = kubecontainer.NewSerializedImagePuller(recorder, rkt, imageBackOff) } else { @@ -944,10 +955,44 @@ func (r *Runtime) RunPod(pod *api.Pod, pullSecrets []api.Secret) error { } r.generateEvents(runtimePod, "Started", nil) - return nil } +func (r *Runtime) runPreStopHook(pod *api.Pod, runtimePod *kubecontainer.Pod) error { + var wg sync.WaitGroup + var errlist []error + errCh := make(chan error, len(pod.Spec.Containers)) + + wg.Add(len(pod.Spec.Containers)) + + for i, c := range pod.Spec.Containers { + if c.Lifecycle == nil || c.Lifecycle.PreStop == nil { + wg.Done() + continue + } + + hook := c.Lifecycle.PreStop + containerID := runtimePod.Containers[i].ID + container := &pod.Spec.Containers[i] + + go func() { + if err := r.runner.Run(containerID, pod, container, hook); err != nil { + glog.Errorf("rkt: Failed to run pre-stop hook for container %q of pod %q: %v", container.Name, format.Pod(pod), err) + errCh <- err + } + wg.Done() + }() + } + + wg.Wait() + close(errCh) + + for err := range errCh { + errlist = append(errlist, err) + } + return errors.NewAggregate(errlist) +} + // convertRktPod will convert a rktapi.Pod to a kubecontainer.Pod func (r *Runtime) convertRktPod(rktpod *rktapi.Pod) (*kubecontainer.Pod, error) { manifest := &appcschema.PodManifest{} @@ -1066,11 +1111,48 @@ func (r *Runtime) GetPods(all bool) ([]*kubecontainer.Pod, error) { return result, nil } +func (r *Runtime) waitPreStopHooks(pod *api.Pod, runningPod *kubecontainer.Pod) { + gracePeriod := int64(minimumGracePeriodInSeconds) + switch { + case pod.DeletionGracePeriodSeconds != nil: + gracePeriod = *pod.DeletionGracePeriodSeconds + case pod.Spec.TerminationGracePeriodSeconds != nil: + gracePeriod = *pod.Spec.TerminationGracePeriodSeconds + } + + errCh := make(chan error, 1) + go func() { + if err := r.runPreStopHook(pod, runningPod); err != nil { + errCh <- err + } + close(errCh) + }() + + select { + case <-time.After(time.Duration(gracePeriod) * time.Second): + glog.V(2).Infof("rkt: Some pre-stop hooks did not complete in %d seconds for pod %v", gracePeriod, format.Pod(pod)) + case err := <-errCh: + if err != nil { + glog.Errorf("rkt: Some pre-stop hooks failed for pod %v: %v", format.Pod(pod), err) + } else { + glog.V(4).Infof("rkt: pre-stop hooks for pod %v completed", format.Pod(pod)) + } + } +} + // KillPod invokes 'systemctl kill' to kill the unit that runs the pod. -// TODO(yifan): Handle network plugin. func (r *Runtime) KillPod(pod *api.Pod, runningPod kubecontainer.Pod) error { glog.V(4).Infof("Rkt is killing pod: name %q.", runningPod.Name) + if len(runningPod.Containers) == 0 { + glog.V(4).Infof("rkt: Pod %q is already being killed, no action will be taken", runningPod.Name) + return nil + } + + if pod != nil { + r.waitPreStopHooks(pod, &runningPod) + } + serviceName := makePodServiceFileName(runningPod.ID) r.generateEvents(&runningPod, "Killing", nil) for _, c := range runningPod.Containers { diff --git a/pkg/kubelet/rkt/rkt_test.go b/pkg/kubelet/rkt/rkt_test.go index bad8d5ea365..33d9174d478 100644 --- a/pkg/kubelet/rkt/rkt_test.go +++ b/pkg/kubelet/rkt/rkt_test.go @@ -31,6 +31,8 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/resource" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" + "k8s.io/kubernetes/pkg/kubelet/lifecycle" + "k8s.io/kubernetes/pkg/util/errors" utiltesting "k8s.io/kubernetes/pkg/util/testing" ) @@ -817,6 +819,12 @@ func sortAppFields(app *appctypes.App) { sort.Sort(isolatorsByName(app.Isolators)) } +type sortedStringList []string + +func (s sortedStringList) Len() int { return len(s) } +func (s sortedStringList) Less(i, j int) bool { return s[i] < s[j] } +func (s sortedStringList) Swap(i, j int) { s[i], s[j] = s[j], s[i] } + func TestSetApp(t *testing.T) { tmpDir, err := utiltesting.MkTmpdir("rkt_test") if err != nil { @@ -1149,3 +1157,129 @@ func TestGenerateRunCommand(t *testing.T) { assert.Equal(t, tt.expect, result, testCaseHint) } } + +func TestPreStopHooks(t *testing.T) { + runner := lifecycle.NewFakeHandlerRunner() + fr := newFakeRktInterface() + fs := newFakeSystemd() + + rkt := &Runtime{ + runner: runner, + apisvc: fr, + systemd: fs, + containerRefManager: kubecontainer.NewRefManager(), + } + + tests := []struct { + pod *api.Pod + runtimePod *kubecontainer.Pod + preStopRuns []string + err error + }{ + { + // Case 0, container without any hooks. + &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "pod-1", + Namespace: "ns-1", + UID: "uid-1", + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + {Name: "container-name-1"}, + }, + }, + }, + &kubecontainer.Pod{ + Containers: []*kubecontainer.Container{ + {ID: kubecontainer.BuildContainerID("rkt", "id-1")}, + }, + }, + []string{}, + nil, + }, + { + // Case 1, containers with pre-stop hook. + &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "pod-1", + Namespace: "ns-1", + UID: "uid-1", + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: "container-name-1", + Lifecycle: &api.Lifecycle{ + PreStop: &api.Handler{ + Exec: &api.ExecAction{}, + }, + }, + }, + { + Name: "container-name-2", + Lifecycle: &api.Lifecycle{ + PreStop: &api.Handler{ + HTTPGet: &api.HTTPGetAction{}, + }, + }, + }, + }, + }, + }, + &kubecontainer.Pod{ + Containers: []*kubecontainer.Container{ + {ID: kubecontainer.BuildContainerID("rkt", "id-1")}, + {ID: kubecontainer.BuildContainerID("rkt", "id-2")}, + }, + }, + []string{ + "exec on pod: pod-1_ns-1(uid-1), container: container-name-1: rkt://id-1", + "http-get on pod: pod-1_ns-1(uid-1), container: container-name-2: rkt://id-2", + }, + nil, + }, + { + // Case 2, one container with invalid hooks. + &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "pod-1", + Namespace: "ns-1", + UID: "uid-1", + }, + Spec: api.PodSpec{ + Containers: []api.Container{ + { + Name: "container-name-1", + Lifecycle: &api.Lifecycle{ + PreStop: &api.Handler{}, + }, + }, + }, + }, + }, + &kubecontainer.Pod{ + Containers: []*kubecontainer.Container{ + {ID: kubecontainer.BuildContainerID("rkt", "id-1")}, + }, + }, + []string{}, + errors.NewAggregate([]error{fmt.Errorf("Invalid handler: %v", &api.Handler{})}), + }, + } + + for i, tt := range tests { + testCaseHint := fmt.Sprintf("test case #%d", i) + + // Run pre-stop hooks. + err := rkt.runPreStopHook(tt.pod, tt.runtimePod) + assert.Equal(t, tt.err, err, testCaseHint) + + sort.Sort(sortedStringList(tt.preStopRuns)) + sort.Sort(sortedStringList(runner.HandlerRuns)) + + assert.Equal(t, tt.preStopRuns, runner.HandlerRuns, testCaseHint) + + runner.Reset() + } +}