diff --git a/pkg/kubelet/rkt/container_id.go b/pkg/kubelet/rkt/container_id.go index bb8a23284d3..73810ed70a5 100644 --- a/pkg/kubelet/rkt/container_id.go +++ b/pkg/kubelet/rkt/container_id.go @@ -46,7 +46,7 @@ func buildContainerID(c *containerID) kubecontainer.ContainerID { func parseContainerID(id kubecontainer.ContainerID) (*containerID, error) { tuples := strings.Split(id.ID, ":") if len(tuples) != 2 { - return nil, fmt.Errorf("rkt: cannot parse container ID for: %v", id) + return nil, fmt.Errorf("rkt: cannot parse container ID for: %q, required format is [UUID:APPNAME]", id) } return &containerID{ uuid: tuples[0], diff --git a/pkg/kubelet/rkt/fake_rkt_interface_test.go b/pkg/kubelet/rkt/fake_rkt_interface_test.go index d028d9efc30..83b3874f7af 100644 --- a/pkg/kubelet/rkt/fake_rkt_interface_test.go +++ b/pkg/kubelet/rkt/fake_rkt_interface_test.go @@ -79,7 +79,7 @@ func (f *fakeRktInterface) InspectPod(ctx context.Context, in *rktapi.InspectPod return &rktapi.InspectPodResponse{Pod: pod}, f.err } } - return &rktapi.InspectPodResponse{Pod: nil}, f.err + return &rktapi.InspectPodResponse{}, fmt.Errorf("pod %q not found", in.Id) } func (f *fakeRktInterface) ListImages(ctx context.Context, in *rktapi.ListImagesRequest, opts ...grpc.CallOption) (*rktapi.ListImagesResponse, error) { diff --git a/pkg/kubelet/rkt/rkt.go b/pkg/kubelet/rkt/rkt.go index 306dd6a6048..a29cc08d69e 100644 --- a/pkg/kubelet/rkt/rkt.go +++ b/pkg/kubelet/rkt/rkt.go @@ -55,6 +55,7 @@ import ( "k8s.io/kubernetes/pkg/util/flowcontrol" "k8s.io/kubernetes/pkg/util/sets" utilstrings "k8s.io/kubernetes/pkg/util/strings" + utilwait "k8s.io/kubernetes/pkg/util/wait" ) const ( @@ -1031,10 +1032,63 @@ func (r *Runtime) RunPod(pod *api.Pod, pullSecrets []api.Secret) error { } r.generateEvents(runtimePod, "Started", nil) + + // This is a temporary solution until we have a clean design on how + // kubelet handles events. See https://github.com/kubernetes/kubernetes/issues/23084. + if err := r.runLifecycleHooks(pod, runtimePod, lifecyclePostStartHook); err != nil { + if errKill := r.KillPod(pod, *runtimePod); errKill != nil { + return errors.NewAggregate([]error{err, errKill}) + } + return err + } + return nil } -func (r *Runtime) runPreStopHook(pod *api.Pod, runtimePod *kubecontainer.Pod) error { +func (r *Runtime) runPreStopHook(containerID kubecontainer.ContainerID, pod *api.Pod, container *api.Container) error { + glog.V(4).Infof("rkt: Running pre-stop hook for container %q of pod %q", container.Name, format.Pod(pod)) + return r.runner.Run(containerID, pod, container, container.Lifecycle.PreStop) +} + +func (r *Runtime) runPostStartHook(containerID kubecontainer.ContainerID, pod *api.Pod, container *api.Container) error { + glog.V(4).Infof("rkt: Running post-start hook for container %q of pod %q", container.Name, format.Pod(pod)) + cid, err := parseContainerID(containerID) + if err != nil { + return fmt.Errorf("cannot parse container ID %v", containerID) + } + + isContainerRunning := func() (done bool, err error) { + resp, err := r.apisvc.InspectPod(context.Background(), &rktapi.InspectPodRequest{Id: cid.uuid}) + if err != nil { + return false, fmt.Errorf("failed to inspect rkt pod %q for pod %q", cid.uuid, format.Pod(pod)) + } + + for _, app := range resp.Pod.Apps { + if app.Name == cid.appName { + return app.State == rktapi.AppState_APP_STATE_RUNNING, nil + } + } + return false, fmt.Errorf("failed to find container %q in rkt pod %q", cid.appName, cid.uuid) + } + + // TODO(yifan): Polling the pod's state for now. + timeout := time.Second * 5 + pollInterval := time.Millisecond * 500 + if err := utilwait.Poll(pollInterval, timeout, isContainerRunning); err != nil { + return fmt.Errorf("rkt: Pod %q doesn't become running in %v: %v", format.Pod(pod), timeout, err) + } + + return r.runner.Run(containerID, pod, container, container.Lifecycle.PostStart) +} + +type lifecycleHookType string + +const ( + lifecyclePostStartHook lifecycleHookType = "post-start" + lifecyclePreStopHook lifecycleHookType = "pre-stop" +) + +func (r *Runtime) runLifecycleHooks(pod *api.Pod, runtimePod *kubecontainer.Pod, typ lifecycleHookType) error { var wg sync.WaitGroup var errlist []error errCh := make(chan error, len(pod.Spec.Containers)) @@ -1042,21 +1096,43 @@ func (r *Runtime) runPreStopHook(pod *api.Pod, runtimePod *kubecontainer.Pod) er wg.Add(len(pod.Spec.Containers)) for i, c := range pod.Spec.Containers { - if c.Lifecycle == nil || c.Lifecycle.PreStop == nil { + var hookFunc func(kubecontainer.ContainerID, *api.Pod, *api.Container) error + + switch typ { + case lifecyclePostStartHook: + if c.Lifecycle != nil && c.Lifecycle.PostStart != nil { + hookFunc = r.runPostStartHook + } + case lifecyclePreStopHook: + if c.Lifecycle != nil && c.Lifecycle.PreStop != nil { + hookFunc = r.runPreStopHook + } + default: + errCh <- fmt.Errorf("Unrecognized lifecycle hook type %q for container %q in pod %q", typ, c.Name, format.Pod(pod)) + } + + if hookFunc == nil { wg.Done() continue } - hook := c.Lifecycle.PreStop - containerID := runtimePod.Containers[i].ID container := &pod.Spec.Containers[i] + runtimeContainer := runtimePod.FindContainerByName(container.Name) + if runtimeContainer == nil { + // Container already gone. + wg.Done() + continue + } + containerID := runtimeContainer.ID go func() { - if err := r.runner.Run(containerID, pod, container, hook); err != nil { - glog.Errorf("rkt: Failed to run pre-stop hook for container %q of pod %q: %v", container.Name, format.Pod(pod), err) + defer wg.Done() + if err := hookFunc(containerID, pod, container); err != nil { + glog.Errorf("rkt: Failed to run %s hook for container %q of pod %q: %v", typ, container.Name, format.Pod(pod), err) errCh <- err + } else { + glog.V(4).Infof("rkt: %s hook completed successfully for container %q of pod %q", typ, container.Name, format.Pod(pod)) } - wg.Done() }() } @@ -1188,23 +1264,18 @@ func (r *Runtime) waitPreStopHooks(pod *api.Pod, runningPod *kubecontainer.Pod) gracePeriod = *pod.Spec.TerminationGracePeriodSeconds } - errCh := make(chan error, 1) + done := make(chan struct{}) go func() { - if err := r.runPreStopHook(pod, runningPod); err != nil { - errCh <- err + if err := r.runLifecycleHooks(pod, runningPod, lifecyclePreStopHook); err != nil { + glog.Errorf("rkt: Some pre-stop hooks failed for pod %q: %v", format.Pod(pod), err) } - close(errCh) + close(done) }() select { case <-time.After(time.Duration(gracePeriod) * time.Second): - glog.V(2).Infof("rkt: Some pre-stop hooks did not complete in %d seconds for pod %v", gracePeriod, format.Pod(pod)) - case err := <-errCh: - if err != nil { - glog.Errorf("rkt: Some pre-stop hooks failed for pod %v: %v", format.Pod(pod), err) - } else { - glog.V(4).Infof("rkt: pre-stop hooks for pod %v completed", format.Pod(pod)) - } + glog.V(2).Infof("rkt: Some pre-stop hooks did not complete in %d seconds for pod %q", gracePeriod, format.Pod(pod)) + case <-done: } } diff --git a/pkg/kubelet/rkt/rkt_test.go b/pkg/kubelet/rkt/rkt_test.go index a721dcb7833..29edb0d2017 100644 --- a/pkg/kubelet/rkt/rkt_test.go +++ b/pkg/kubelet/rkt/rkt_test.go @@ -1204,7 +1204,7 @@ func TestGenerateRunCommand(t *testing.T) { } } -func TestPreStopHooks(t *testing.T) { +func TestLifeCycleHooks(t *testing.T) { runner := lifecycle.NewFakeHandlerRunner() fr := newFakeRktInterface() fs := newFakeSystemd() @@ -1217,10 +1217,11 @@ func TestPreStopHooks(t *testing.T) { } tests := []struct { - pod *api.Pod - runtimePod *kubecontainer.Pod - preStopRuns []string - err error + pod *api.Pod + runtimePod *kubecontainer.Pod + postStartRuns []string + preStopRuns []string + err error }{ { // Case 0, container without any hooks. @@ -1242,10 +1243,11 @@ func TestPreStopHooks(t *testing.T) { }, }, []string{}, + []string{}, nil, }, { - // Case 1, containers with pre-stop hook. + // Case 1, containers with post-start and pre-stop hooks. &api.Pod{ ObjectMeta: api.ObjectMeta{ Name: "pod-1", @@ -1257,13 +1259,29 @@ func TestPreStopHooks(t *testing.T) { { Name: "container-name-1", Lifecycle: &api.Lifecycle{ - PreStop: &api.Handler{ + PostStart: &api.Handler{ Exec: &api.ExecAction{}, }, }, }, { Name: "container-name-2", + Lifecycle: &api.Lifecycle{ + PostStart: &api.Handler{ + HTTPGet: &api.HTTPGetAction{}, + }, + }, + }, + { + Name: "container-name-3", + Lifecycle: &api.Lifecycle{ + PreStop: &api.Handler{ + Exec: &api.ExecAction{}, + }, + }, + }, + { + Name: "container-name-4", Lifecycle: &api.Lifecycle{ PreStop: &api.Handler{ HTTPGet: &api.HTTPGetAction{}, @@ -1275,13 +1293,31 @@ func TestPreStopHooks(t *testing.T) { }, &kubecontainer.Pod{ Containers: []*kubecontainer.Container{ - {ID: kubecontainer.BuildContainerID("rkt", "id-1")}, - {ID: kubecontainer.BuildContainerID("rkt", "id-2")}, + { + ID: kubecontainer.ParseContainerID("rkt://uuid:container-name-4"), + Name: "container-name-4", + }, + { + ID: kubecontainer.ParseContainerID("rkt://uuid:container-name-3"), + Name: "container-name-3", + }, + { + ID: kubecontainer.ParseContainerID("rkt://uuid:container-name-2"), + Name: "container-name-2", + }, + { + ID: kubecontainer.ParseContainerID("rkt://uuid:container-name-1"), + Name: "container-name-1", + }, }, }, []string{ - "exec on pod: pod-1_ns-1(uid-1), container: container-name-1: rkt://id-1", - "http-get on pod: pod-1_ns-1(uid-1), container: container-name-2: rkt://id-2", + "exec on pod: pod-1_ns-1(uid-1), container: container-name-1: rkt://uuid:container-name-1", + "http-get on pod: pod-1_ns-1(uid-1), container: container-name-2: rkt://uuid:container-name-2", + }, + []string{ + "exec on pod: pod-1_ns-1(uid-1), container: container-name-3: rkt://uuid:container-name-3", + "http-get on pod: pod-1_ns-1(uid-1), container: container-name-4: rkt://uuid:container-name-4", }, nil, }, @@ -1298,7 +1334,8 @@ func TestPreStopHooks(t *testing.T) { { Name: "container-name-1", Lifecycle: &api.Lifecycle{ - PreStop: &api.Handler{}, + PostStart: &api.Handler{}, + PreStop: &api.Handler{}, }, }, }, @@ -1306,10 +1343,14 @@ func TestPreStopHooks(t *testing.T) { }, &kubecontainer.Pod{ Containers: []*kubecontainer.Container{ - {ID: kubecontainer.BuildContainerID("rkt", "id-1")}, + { + ID: kubecontainer.ParseContainerID("rkt://uuid:container-name-1"), + Name: "container-name-1", + }, }, }, []string{}, + []string{}, errors.NewAggregate([]error{fmt.Errorf("Invalid handler: %v", &api.Handler{})}), }, } @@ -1317,8 +1358,28 @@ func TestPreStopHooks(t *testing.T) { for i, tt := range tests { testCaseHint := fmt.Sprintf("test case #%d", i) + pod := &rktapi.Pod{Id: "uuid"} + for _, c := range tt.runtimePod.Containers { + pod.Apps = append(pod.Apps, &rktapi.App{ + Name: c.Name, + State: rktapi.AppState_APP_STATE_RUNNING, + }) + } + fr.pods = []*rktapi.Pod{pod} + + // Run post-start hooks + err := rkt.runLifecycleHooks(tt.pod, tt.runtimePod, lifecyclePostStartHook) + assert.Equal(t, tt.err, err, testCaseHint) + + sort.Sort(sortedStringList(tt.postStartRuns)) + sort.Sort(sortedStringList(runner.HandlerRuns)) + + assert.Equal(t, tt.postStartRuns, runner.HandlerRuns, testCaseHint) + + runner.Reset() + // Run pre-stop hooks. - err := rkt.runPreStopHook(tt.pod, tt.runtimePod) + err = rkt.runLifecycleHooks(tt.pod, tt.runtimePod, lifecyclePreStopHook) assert.Equal(t, tt.err, err, testCaseHint) sort.Sort(sortedStringList(tt.preStopRuns))