mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-26 21:17:23 +00:00
rkt: Add pre-stop lifecycle hooks for rkt.
When a pod is being terminated, the pre-stop hooks of all the containers will be run before the containers are stopped.
This commit is contained in:
parent
f9ffeb8244
commit
cc4336829d
@ -424,6 +424,7 @@ func NewMainKubelet(
|
|||||||
containerRefManager,
|
containerRefManager,
|
||||||
klet.livenessManager,
|
klet.livenessManager,
|
||||||
klet.volumeManager,
|
klet.volumeManager,
|
||||||
|
klet.httpClient,
|
||||||
imageBackOff,
|
imageBackOff,
|
||||||
serializeImagePulls,
|
serializeImagePulls,
|
||||||
)
|
)
|
||||||
|
62
pkg/kubelet/lifecycle/fake_handler_runner.go
Normal file
62
pkg/kubelet/lifecycle/fake_handler_runner.go
Normal file
@ -0,0 +1,62 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2016 The Kubernetes Authors All rights reserved.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package lifecycle
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"k8s.io/kubernetes/pkg/api"
|
||||||
|
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||||
|
"k8s.io/kubernetes/pkg/kubelet/util/format"
|
||||||
|
)
|
||||||
|
|
||||||
|
type FakeHandlerRunner struct {
|
||||||
|
sync.Mutex
|
||||||
|
HandlerRuns []string
|
||||||
|
Err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewFakeHandlerRunner() *FakeHandlerRunner {
|
||||||
|
return &FakeHandlerRunner{HandlerRuns: []string{}}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (hr *FakeHandlerRunner) Run(containerID kubecontainer.ContainerID, pod *api.Pod, container *api.Container, handler *api.Handler) error {
|
||||||
|
hr.Lock()
|
||||||
|
defer hr.Unlock()
|
||||||
|
|
||||||
|
if hr.Err != nil {
|
||||||
|
return hr.Err
|
||||||
|
}
|
||||||
|
|
||||||
|
switch {
|
||||||
|
case handler.Exec != nil:
|
||||||
|
hr.HandlerRuns = append(hr.HandlerRuns, fmt.Sprintf("exec on pod: %v, container: %v: %v", format.Pod(pod), container.Name, containerID.String()))
|
||||||
|
case handler.HTTPGet != nil:
|
||||||
|
hr.HandlerRuns = append(hr.HandlerRuns, fmt.Sprintf("http-get on pod: %v, container: %v: %v", format.Pod(pod), container.Name, containerID.String()))
|
||||||
|
case handler.TCPSocket != nil:
|
||||||
|
hr.HandlerRuns = append(hr.HandlerRuns, fmt.Sprintf("tcp-socket on pod: %v, container: %v: %v", format.Pod(pod), container.Name, containerID.String()))
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("Invalid handler: %v", handler)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (hr *FakeHandlerRunner) Reset() {
|
||||||
|
hr.HandlerRuns = []string{}
|
||||||
|
hr.Err = nil
|
||||||
|
}
|
@ -27,6 +27,7 @@ import (
|
|||||||
"path"
|
"path"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@ -41,11 +42,14 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/client/record"
|
"k8s.io/kubernetes/pkg/client/record"
|
||||||
"k8s.io/kubernetes/pkg/credentialprovider"
|
"k8s.io/kubernetes/pkg/credentialprovider"
|
||||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||||
|
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
|
||||||
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
|
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
|
||||||
|
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/util/format"
|
"k8s.io/kubernetes/pkg/kubelet/util/format"
|
||||||
"k8s.io/kubernetes/pkg/securitycontext"
|
"k8s.io/kubernetes/pkg/securitycontext"
|
||||||
"k8s.io/kubernetes/pkg/types"
|
"k8s.io/kubernetes/pkg/types"
|
||||||
"k8s.io/kubernetes/pkg/util"
|
"k8s.io/kubernetes/pkg/util"
|
||||||
|
"k8s.io/kubernetes/pkg/util/errors"
|
||||||
utilexec "k8s.io/kubernetes/pkg/util/exec"
|
utilexec "k8s.io/kubernetes/pkg/util/exec"
|
||||||
"k8s.io/kubernetes/pkg/util/flowcontrol"
|
"k8s.io/kubernetes/pkg/util/flowcontrol"
|
||||||
"k8s.io/kubernetes/pkg/util/sets"
|
"k8s.io/kubernetes/pkg/util/sets"
|
||||||
@ -100,6 +104,9 @@ const (
|
|||||||
// TODO(yifan): Import them from docker2aci. See https://github.com/appc/docker2aci/issues/133.
|
// TODO(yifan): Import them from docker2aci. See https://github.com/appc/docker2aci/issues/133.
|
||||||
appcDockerEntrypoint = "appc.io/docker/entrypoint"
|
appcDockerEntrypoint = "appc.io/docker/entrypoint"
|
||||||
appcDockerCmd = "appc.io/docker/cmd"
|
appcDockerCmd = "appc.io/docker/cmd"
|
||||||
|
|
||||||
|
// TODO(yifan): Reuse this const with Docker runtime.
|
||||||
|
minimumGracePeriodInSeconds = 2
|
||||||
)
|
)
|
||||||
|
|
||||||
// Runtime implements the Containerruntime for rkt. The implementation
|
// Runtime implements the Containerruntime for rkt. The implementation
|
||||||
@ -120,6 +127,7 @@ type Runtime struct {
|
|||||||
livenessManager proberesults.Manager
|
livenessManager proberesults.Manager
|
||||||
volumeGetter volumeGetter
|
volumeGetter volumeGetter
|
||||||
imagePuller kubecontainer.ImagePuller
|
imagePuller kubecontainer.ImagePuller
|
||||||
|
runner kubecontainer.HandlerRunner
|
||||||
|
|
||||||
versions versions
|
versions versions
|
||||||
}
|
}
|
||||||
@ -142,6 +150,7 @@ func New(
|
|||||||
containerRefManager *kubecontainer.RefManager,
|
containerRefManager *kubecontainer.RefManager,
|
||||||
livenessManager proberesults.Manager,
|
livenessManager proberesults.Manager,
|
||||||
volumeGetter volumeGetter,
|
volumeGetter volumeGetter,
|
||||||
|
httpClient kubetypes.HttpGetter,
|
||||||
imageBackOff *flowcontrol.Backoff,
|
imageBackOff *flowcontrol.Backoff,
|
||||||
serializeImagePulls bool,
|
serializeImagePulls bool,
|
||||||
) (*Runtime, error) {
|
) (*Runtime, error) {
|
||||||
@ -185,6 +194,8 @@ func New(
|
|||||||
return nil, fmt.Errorf("rkt: cannot get config from rkt api service: %v", err)
|
return nil, fmt.Errorf("rkt: cannot get config from rkt api service: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
rkt.runner = lifecycle.NewHandlerRunner(httpClient, rkt, rkt)
|
||||||
|
|
||||||
if serializeImagePulls {
|
if serializeImagePulls {
|
||||||
rkt.imagePuller = kubecontainer.NewSerializedImagePuller(recorder, rkt, imageBackOff)
|
rkt.imagePuller = kubecontainer.NewSerializedImagePuller(recorder, rkt, imageBackOff)
|
||||||
} else {
|
} else {
|
||||||
@ -944,10 +955,44 @@ func (r *Runtime) RunPod(pod *api.Pod, pullSecrets []api.Secret) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
r.generateEvents(runtimePod, "Started", nil)
|
r.generateEvents(runtimePod, "Started", nil)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *Runtime) runPreStopHook(pod *api.Pod, runtimePod *kubecontainer.Pod) error {
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
var errlist []error
|
||||||
|
errCh := make(chan error, len(pod.Spec.Containers))
|
||||||
|
|
||||||
|
wg.Add(len(pod.Spec.Containers))
|
||||||
|
|
||||||
|
for i, c := range pod.Spec.Containers {
|
||||||
|
if c.Lifecycle == nil || c.Lifecycle.PreStop == nil {
|
||||||
|
wg.Done()
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
hook := c.Lifecycle.PreStop
|
||||||
|
containerID := runtimePod.Containers[i].ID
|
||||||
|
container := &pod.Spec.Containers[i]
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
if err := r.runner.Run(containerID, pod, container, hook); err != nil {
|
||||||
|
glog.Errorf("rkt: Failed to run pre-stop hook for container %q of pod %q: %v", container.Name, format.Pod(pod), err)
|
||||||
|
errCh <- err
|
||||||
|
}
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Wait()
|
||||||
|
close(errCh)
|
||||||
|
|
||||||
|
for err := range errCh {
|
||||||
|
errlist = append(errlist, err)
|
||||||
|
}
|
||||||
|
return errors.NewAggregate(errlist)
|
||||||
|
}
|
||||||
|
|
||||||
// convertRktPod will convert a rktapi.Pod to a kubecontainer.Pod
|
// convertRktPod will convert a rktapi.Pod to a kubecontainer.Pod
|
||||||
func (r *Runtime) convertRktPod(rktpod *rktapi.Pod) (*kubecontainer.Pod, error) {
|
func (r *Runtime) convertRktPod(rktpod *rktapi.Pod) (*kubecontainer.Pod, error) {
|
||||||
manifest := &appcschema.PodManifest{}
|
manifest := &appcschema.PodManifest{}
|
||||||
@ -1066,11 +1111,48 @@ func (r *Runtime) GetPods(all bool) ([]*kubecontainer.Pod, error) {
|
|||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *Runtime) waitPreStopHooks(pod *api.Pod, runningPod *kubecontainer.Pod) {
|
||||||
|
gracePeriod := int64(minimumGracePeriodInSeconds)
|
||||||
|
switch {
|
||||||
|
case pod.DeletionGracePeriodSeconds != nil:
|
||||||
|
gracePeriod = *pod.DeletionGracePeriodSeconds
|
||||||
|
case pod.Spec.TerminationGracePeriodSeconds != nil:
|
||||||
|
gracePeriod = *pod.Spec.TerminationGracePeriodSeconds
|
||||||
|
}
|
||||||
|
|
||||||
|
errCh := make(chan error, 1)
|
||||||
|
go func() {
|
||||||
|
if err := r.runPreStopHook(pod, runningPod); err != nil {
|
||||||
|
errCh <- err
|
||||||
|
}
|
||||||
|
close(errCh)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-time.After(time.Duration(gracePeriod) * time.Second):
|
||||||
|
glog.V(2).Infof("rkt: Some pre-stop hooks did not complete in %d seconds for pod %v", gracePeriod, format.Pod(pod))
|
||||||
|
case err := <-errCh:
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("rkt: Some pre-stop hooks failed for pod %v: %v", format.Pod(pod), err)
|
||||||
|
} else {
|
||||||
|
glog.V(4).Infof("rkt: pre-stop hooks for pod %v completed", format.Pod(pod))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// KillPod invokes 'systemctl kill' to kill the unit that runs the pod.
|
// KillPod invokes 'systemctl kill' to kill the unit that runs the pod.
|
||||||
// TODO(yifan): Handle network plugin.
|
|
||||||
func (r *Runtime) KillPod(pod *api.Pod, runningPod kubecontainer.Pod) error {
|
func (r *Runtime) KillPod(pod *api.Pod, runningPod kubecontainer.Pod) error {
|
||||||
glog.V(4).Infof("Rkt is killing pod: name %q.", runningPod.Name)
|
glog.V(4).Infof("Rkt is killing pod: name %q.", runningPod.Name)
|
||||||
|
|
||||||
|
if len(runningPod.Containers) == 0 {
|
||||||
|
glog.V(4).Infof("rkt: Pod %q is already being killed, no action will be taken", runningPod.Name)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if pod != nil {
|
||||||
|
r.waitPreStopHooks(pod, &runningPod)
|
||||||
|
}
|
||||||
|
|
||||||
serviceName := makePodServiceFileName(runningPod.ID)
|
serviceName := makePodServiceFileName(runningPod.ID)
|
||||||
r.generateEvents(&runningPod, "Killing", nil)
|
r.generateEvents(&runningPod, "Killing", nil)
|
||||||
for _, c := range runningPod.Containers {
|
for _, c := range runningPod.Containers {
|
||||||
|
@ -31,6 +31,8 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/api/resource"
|
"k8s.io/kubernetes/pkg/api/resource"
|
||||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||||
|
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
|
||||||
|
"k8s.io/kubernetes/pkg/util/errors"
|
||||||
utiltesting "k8s.io/kubernetes/pkg/util/testing"
|
utiltesting "k8s.io/kubernetes/pkg/util/testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -817,6 +819,12 @@ func sortAppFields(app *appctypes.App) {
|
|||||||
sort.Sort(isolatorsByName(app.Isolators))
|
sort.Sort(isolatorsByName(app.Isolators))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type sortedStringList []string
|
||||||
|
|
||||||
|
func (s sortedStringList) Len() int { return len(s) }
|
||||||
|
func (s sortedStringList) Less(i, j int) bool { return s[i] < s[j] }
|
||||||
|
func (s sortedStringList) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
|
||||||
|
|
||||||
func TestSetApp(t *testing.T) {
|
func TestSetApp(t *testing.T) {
|
||||||
tmpDir, err := utiltesting.MkTmpdir("rkt_test")
|
tmpDir, err := utiltesting.MkTmpdir("rkt_test")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -1149,3 +1157,129 @@ func TestGenerateRunCommand(t *testing.T) {
|
|||||||
assert.Equal(t, tt.expect, result, testCaseHint)
|
assert.Equal(t, tt.expect, result, testCaseHint)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPreStopHooks(t *testing.T) {
|
||||||
|
runner := lifecycle.NewFakeHandlerRunner()
|
||||||
|
fr := newFakeRktInterface()
|
||||||
|
fs := newFakeSystemd()
|
||||||
|
|
||||||
|
rkt := &Runtime{
|
||||||
|
runner: runner,
|
||||||
|
apisvc: fr,
|
||||||
|
systemd: fs,
|
||||||
|
containerRefManager: kubecontainer.NewRefManager(),
|
||||||
|
}
|
||||||
|
|
||||||
|
tests := []struct {
|
||||||
|
pod *api.Pod
|
||||||
|
runtimePod *kubecontainer.Pod
|
||||||
|
preStopRuns []string
|
||||||
|
err error
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
// Case 0, container without any hooks.
|
||||||
|
&api.Pod{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: "pod-1",
|
||||||
|
Namespace: "ns-1",
|
||||||
|
UID: "uid-1",
|
||||||
|
},
|
||||||
|
Spec: api.PodSpec{
|
||||||
|
Containers: []api.Container{
|
||||||
|
{Name: "container-name-1"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
&kubecontainer.Pod{
|
||||||
|
Containers: []*kubecontainer.Container{
|
||||||
|
{ID: kubecontainer.BuildContainerID("rkt", "id-1")},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
[]string{},
|
||||||
|
nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// Case 1, containers with pre-stop hook.
|
||||||
|
&api.Pod{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: "pod-1",
|
||||||
|
Namespace: "ns-1",
|
||||||
|
UID: "uid-1",
|
||||||
|
},
|
||||||
|
Spec: api.PodSpec{
|
||||||
|
Containers: []api.Container{
|
||||||
|
{
|
||||||
|
Name: "container-name-1",
|
||||||
|
Lifecycle: &api.Lifecycle{
|
||||||
|
PreStop: &api.Handler{
|
||||||
|
Exec: &api.ExecAction{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Name: "container-name-2",
|
||||||
|
Lifecycle: &api.Lifecycle{
|
||||||
|
PreStop: &api.Handler{
|
||||||
|
HTTPGet: &api.HTTPGetAction{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
&kubecontainer.Pod{
|
||||||
|
Containers: []*kubecontainer.Container{
|
||||||
|
{ID: kubecontainer.BuildContainerID("rkt", "id-1")},
|
||||||
|
{ID: kubecontainer.BuildContainerID("rkt", "id-2")},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
[]string{
|
||||||
|
"exec on pod: pod-1_ns-1(uid-1), container: container-name-1: rkt://id-1",
|
||||||
|
"http-get on pod: pod-1_ns-1(uid-1), container: container-name-2: rkt://id-2",
|
||||||
|
},
|
||||||
|
nil,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
// Case 2, one container with invalid hooks.
|
||||||
|
&api.Pod{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: "pod-1",
|
||||||
|
Namespace: "ns-1",
|
||||||
|
UID: "uid-1",
|
||||||
|
},
|
||||||
|
Spec: api.PodSpec{
|
||||||
|
Containers: []api.Container{
|
||||||
|
{
|
||||||
|
Name: "container-name-1",
|
||||||
|
Lifecycle: &api.Lifecycle{
|
||||||
|
PreStop: &api.Handler{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
&kubecontainer.Pod{
|
||||||
|
Containers: []*kubecontainer.Container{
|
||||||
|
{ID: kubecontainer.BuildContainerID("rkt", "id-1")},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
[]string{},
|
||||||
|
errors.NewAggregate([]error{fmt.Errorf("Invalid handler: %v", &api.Handler{})}),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, tt := range tests {
|
||||||
|
testCaseHint := fmt.Sprintf("test case #%d", i)
|
||||||
|
|
||||||
|
// Run pre-stop hooks.
|
||||||
|
err := rkt.runPreStopHook(tt.pod, tt.runtimePod)
|
||||||
|
assert.Equal(t, tt.err, err, testCaseHint)
|
||||||
|
|
||||||
|
sort.Sort(sortedStringList(tt.preStopRuns))
|
||||||
|
sort.Sort(sortedStringList(runner.HandlerRuns))
|
||||||
|
|
||||||
|
assert.Equal(t, tt.preStopRuns, runner.HandlerRuns, testCaseHint)
|
||||||
|
|
||||||
|
runner.Reset()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user