mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Merge pull request #23532 from yifan-gu/lifecycle
Automatic merge from submit-queue rkt: Add pre-stop lifecycle hooks for rkt. When a pod is being terminated, the pre-stop hooks of all the containers will be run before the containers are stopped. cc @yujuhong @Random-Liu @sjpotter
This commit is contained in:
commit
9c0a0833b3
@ -425,6 +425,7 @@ func NewMainKubelet(
|
||||
containerRefManager,
|
||||
klet.livenessManager,
|
||||
klet.volumeManager,
|
||||
klet.httpClient,
|
||||
imageBackOff,
|
||||
serializeImagePulls,
|
||||
)
|
||||
|
62
pkg/kubelet/lifecycle/fake_handler_runner.go
Normal file
62
pkg/kubelet/lifecycle/fake_handler_runner.go
Normal file
@ -0,0 +1,62 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package lifecycle
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
"k8s.io/kubernetes/pkg/kubelet/util/format"
|
||||
)
|
||||
|
||||
type FakeHandlerRunner struct {
|
||||
sync.Mutex
|
||||
HandlerRuns []string
|
||||
Err error
|
||||
}
|
||||
|
||||
func NewFakeHandlerRunner() *FakeHandlerRunner {
|
||||
return &FakeHandlerRunner{HandlerRuns: []string{}}
|
||||
}
|
||||
|
||||
func (hr *FakeHandlerRunner) Run(containerID kubecontainer.ContainerID, pod *api.Pod, container *api.Container, handler *api.Handler) error {
|
||||
hr.Lock()
|
||||
defer hr.Unlock()
|
||||
|
||||
if hr.Err != nil {
|
||||
return hr.Err
|
||||
}
|
||||
|
||||
switch {
|
||||
case handler.Exec != nil:
|
||||
hr.HandlerRuns = append(hr.HandlerRuns, fmt.Sprintf("exec on pod: %v, container: %v: %v", format.Pod(pod), container.Name, containerID.String()))
|
||||
case handler.HTTPGet != nil:
|
||||
hr.HandlerRuns = append(hr.HandlerRuns, fmt.Sprintf("http-get on pod: %v, container: %v: %v", format.Pod(pod), container.Name, containerID.String()))
|
||||
case handler.TCPSocket != nil:
|
||||
hr.HandlerRuns = append(hr.HandlerRuns, fmt.Sprintf("tcp-socket on pod: %v, container: %v: %v", format.Pod(pod), container.Name, containerID.String()))
|
||||
default:
|
||||
return fmt.Errorf("Invalid handler: %v", handler)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (hr *FakeHandlerRunner) Reset() {
|
||||
hr.HandlerRuns = []string{}
|
||||
hr.Err = nil
|
||||
}
|
@ -27,6 +27,7 @@ import (
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
@ -41,11 +42,14 @@ import (
|
||||
"k8s.io/kubernetes/pkg/client/record"
|
||||
"k8s.io/kubernetes/pkg/credentialprovider"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
|
||||
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
|
||||
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
|
||||
"k8s.io/kubernetes/pkg/kubelet/util/format"
|
||||
"k8s.io/kubernetes/pkg/securitycontext"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
"k8s.io/kubernetes/pkg/util/errors"
|
||||
utilexec "k8s.io/kubernetes/pkg/util/exec"
|
||||
"k8s.io/kubernetes/pkg/util/flowcontrol"
|
||||
"k8s.io/kubernetes/pkg/util/sets"
|
||||
@ -100,6 +104,9 @@ const (
|
||||
// TODO(yifan): Import them from docker2aci. See https://github.com/appc/docker2aci/issues/133.
|
||||
appcDockerEntrypoint = "appc.io/docker/entrypoint"
|
||||
appcDockerCmd = "appc.io/docker/cmd"
|
||||
|
||||
// TODO(yifan): Reuse this const with Docker runtime.
|
||||
minimumGracePeriodInSeconds = 2
|
||||
)
|
||||
|
||||
// Runtime implements the Containerruntime for rkt. The implementation
|
||||
@ -120,6 +127,7 @@ type Runtime struct {
|
||||
livenessManager proberesults.Manager
|
||||
volumeGetter volumeGetter
|
||||
imagePuller kubecontainer.ImagePuller
|
||||
runner kubecontainer.HandlerRunner
|
||||
|
||||
versions versions
|
||||
}
|
||||
@ -142,6 +150,7 @@ func New(
|
||||
containerRefManager *kubecontainer.RefManager,
|
||||
livenessManager proberesults.Manager,
|
||||
volumeGetter volumeGetter,
|
||||
httpClient kubetypes.HttpGetter,
|
||||
imageBackOff *flowcontrol.Backoff,
|
||||
serializeImagePulls bool,
|
||||
) (*Runtime, error) {
|
||||
@ -185,6 +194,8 @@ func New(
|
||||
return nil, fmt.Errorf("rkt: cannot get config from rkt api service: %v", err)
|
||||
}
|
||||
|
||||
rkt.runner = lifecycle.NewHandlerRunner(httpClient, rkt, rkt)
|
||||
|
||||
if serializeImagePulls {
|
||||
rkt.imagePuller = kubecontainer.NewSerializedImagePuller(recorder, rkt, imageBackOff)
|
||||
} else {
|
||||
@ -944,10 +955,44 @@ func (r *Runtime) RunPod(pod *api.Pod, pullSecrets []api.Secret) error {
|
||||
}
|
||||
|
||||
r.generateEvents(runtimePod, "Started", nil)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Runtime) runPreStopHook(pod *api.Pod, runtimePod *kubecontainer.Pod) error {
|
||||
var wg sync.WaitGroup
|
||||
var errlist []error
|
||||
errCh := make(chan error, len(pod.Spec.Containers))
|
||||
|
||||
wg.Add(len(pod.Spec.Containers))
|
||||
|
||||
for i, c := range pod.Spec.Containers {
|
||||
if c.Lifecycle == nil || c.Lifecycle.PreStop == nil {
|
||||
wg.Done()
|
||||
continue
|
||||
}
|
||||
|
||||
hook := c.Lifecycle.PreStop
|
||||
containerID := runtimePod.Containers[i].ID
|
||||
container := &pod.Spec.Containers[i]
|
||||
|
||||
go func() {
|
||||
if err := r.runner.Run(containerID, pod, container, hook); err != nil {
|
||||
glog.Errorf("rkt: Failed to run pre-stop hook for container %q of pod %q: %v", container.Name, format.Pod(pod), err)
|
||||
errCh <- err
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
close(errCh)
|
||||
|
||||
for err := range errCh {
|
||||
errlist = append(errlist, err)
|
||||
}
|
||||
return errors.NewAggregate(errlist)
|
||||
}
|
||||
|
||||
// convertRktPod will convert a rktapi.Pod to a kubecontainer.Pod
|
||||
func (r *Runtime) convertRktPod(rktpod *rktapi.Pod) (*kubecontainer.Pod, error) {
|
||||
manifest := &appcschema.PodManifest{}
|
||||
@ -1066,11 +1111,48 @@ func (r *Runtime) GetPods(all bool) ([]*kubecontainer.Pod, error) {
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (r *Runtime) waitPreStopHooks(pod *api.Pod, runningPod *kubecontainer.Pod) {
|
||||
gracePeriod := int64(minimumGracePeriodInSeconds)
|
||||
switch {
|
||||
case pod.DeletionGracePeriodSeconds != nil:
|
||||
gracePeriod = *pod.DeletionGracePeriodSeconds
|
||||
case pod.Spec.TerminationGracePeriodSeconds != nil:
|
||||
gracePeriod = *pod.Spec.TerminationGracePeriodSeconds
|
||||
}
|
||||
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
if err := r.runPreStopHook(pod, runningPod); err != nil {
|
||||
errCh <- err
|
||||
}
|
||||
close(errCh)
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-time.After(time.Duration(gracePeriod) * time.Second):
|
||||
glog.V(2).Infof("rkt: Some pre-stop hooks did not complete in %d seconds for pod %v", gracePeriod, format.Pod(pod))
|
||||
case err := <-errCh:
|
||||
if err != nil {
|
||||
glog.Errorf("rkt: Some pre-stop hooks failed for pod %v: %v", format.Pod(pod), err)
|
||||
} else {
|
||||
glog.V(4).Infof("rkt: pre-stop hooks for pod %v completed", format.Pod(pod))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// KillPod invokes 'systemctl kill' to kill the unit that runs the pod.
|
||||
// TODO(yifan): Handle network plugin.
|
||||
func (r *Runtime) KillPod(pod *api.Pod, runningPod kubecontainer.Pod) error {
|
||||
glog.V(4).Infof("Rkt is killing pod: name %q.", runningPod.Name)
|
||||
|
||||
if len(runningPod.Containers) == 0 {
|
||||
glog.V(4).Infof("rkt: Pod %q is already being killed, no action will be taken", runningPod.Name)
|
||||
return nil
|
||||
}
|
||||
|
||||
if pod != nil {
|
||||
r.waitPreStopHooks(pod, &runningPod)
|
||||
}
|
||||
|
||||
serviceName := makePodServiceFileName(runningPod.ID)
|
||||
r.generateEvents(&runningPod, "Killing", nil)
|
||||
for _, c := range runningPod.Containers {
|
||||
|
@ -31,6 +31,8 @@ import (
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/resource"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
|
||||
"k8s.io/kubernetes/pkg/util/errors"
|
||||
utiltesting "k8s.io/kubernetes/pkg/util/testing"
|
||||
)
|
||||
|
||||
@ -817,6 +819,12 @@ func sortAppFields(app *appctypes.App) {
|
||||
sort.Sort(isolatorsByName(app.Isolators))
|
||||
}
|
||||
|
||||
type sortedStringList []string
|
||||
|
||||
func (s sortedStringList) Len() int { return len(s) }
|
||||
func (s sortedStringList) Less(i, j int) bool { return s[i] < s[j] }
|
||||
func (s sortedStringList) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
|
||||
|
||||
func TestSetApp(t *testing.T) {
|
||||
tmpDir, err := utiltesting.MkTmpdir("rkt_test")
|
||||
if err != nil {
|
||||
@ -1149,3 +1157,129 @@ func TestGenerateRunCommand(t *testing.T) {
|
||||
assert.Equal(t, tt.expect, result, testCaseHint)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPreStopHooks(t *testing.T) {
|
||||
runner := lifecycle.NewFakeHandlerRunner()
|
||||
fr := newFakeRktInterface()
|
||||
fs := newFakeSystemd()
|
||||
|
||||
rkt := &Runtime{
|
||||
runner: runner,
|
||||
apisvc: fr,
|
||||
systemd: fs,
|
||||
containerRefManager: kubecontainer.NewRefManager(),
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
pod *api.Pod
|
||||
runtimePod *kubecontainer.Pod
|
||||
preStopRuns []string
|
||||
err error
|
||||
}{
|
||||
{
|
||||
// Case 0, container without any hooks.
|
||||
&api.Pod{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "pod-1",
|
||||
Namespace: "ns-1",
|
||||
UID: "uid-1",
|
||||
},
|
||||
Spec: api.PodSpec{
|
||||
Containers: []api.Container{
|
||||
{Name: "container-name-1"},
|
||||
},
|
||||
},
|
||||
},
|
||||
&kubecontainer.Pod{
|
||||
Containers: []*kubecontainer.Container{
|
||||
{ID: kubecontainer.BuildContainerID("rkt", "id-1")},
|
||||
},
|
||||
},
|
||||
[]string{},
|
||||
nil,
|
||||
},
|
||||
{
|
||||
// Case 1, containers with pre-stop hook.
|
||||
&api.Pod{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "pod-1",
|
||||
Namespace: "ns-1",
|
||||
UID: "uid-1",
|
||||
},
|
||||
Spec: api.PodSpec{
|
||||
Containers: []api.Container{
|
||||
{
|
||||
Name: "container-name-1",
|
||||
Lifecycle: &api.Lifecycle{
|
||||
PreStop: &api.Handler{
|
||||
Exec: &api.ExecAction{},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "container-name-2",
|
||||
Lifecycle: &api.Lifecycle{
|
||||
PreStop: &api.Handler{
|
||||
HTTPGet: &api.HTTPGetAction{},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
&kubecontainer.Pod{
|
||||
Containers: []*kubecontainer.Container{
|
||||
{ID: kubecontainer.BuildContainerID("rkt", "id-1")},
|
||||
{ID: kubecontainer.BuildContainerID("rkt", "id-2")},
|
||||
},
|
||||
},
|
||||
[]string{
|
||||
"exec on pod: pod-1_ns-1(uid-1), container: container-name-1: rkt://id-1",
|
||||
"http-get on pod: pod-1_ns-1(uid-1), container: container-name-2: rkt://id-2",
|
||||
},
|
||||
nil,
|
||||
},
|
||||
{
|
||||
// Case 2, one container with invalid hooks.
|
||||
&api.Pod{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "pod-1",
|
||||
Namespace: "ns-1",
|
||||
UID: "uid-1",
|
||||
},
|
||||
Spec: api.PodSpec{
|
||||
Containers: []api.Container{
|
||||
{
|
||||
Name: "container-name-1",
|
||||
Lifecycle: &api.Lifecycle{
|
||||
PreStop: &api.Handler{},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
&kubecontainer.Pod{
|
||||
Containers: []*kubecontainer.Container{
|
||||
{ID: kubecontainer.BuildContainerID("rkt", "id-1")},
|
||||
},
|
||||
},
|
||||
[]string{},
|
||||
errors.NewAggregate([]error{fmt.Errorf("Invalid handler: %v", &api.Handler{})}),
|
||||
},
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
testCaseHint := fmt.Sprintf("test case #%d", i)
|
||||
|
||||
// Run pre-stop hooks.
|
||||
err := rkt.runPreStopHook(tt.pod, tt.runtimePod)
|
||||
assert.Equal(t, tt.err, err, testCaseHint)
|
||||
|
||||
sort.Sort(sortedStringList(tt.preStopRuns))
|
||||
sort.Sort(sortedStringList(runner.HandlerRuns))
|
||||
|
||||
assert.Equal(t, tt.preStopRuns, runner.HandlerRuns, testCaseHint)
|
||||
|
||||
runner.Reset()
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user