From 033ae3e37ec0c5092fae957590e4f524bdf98ad2 Mon Sep 17 00:00:00 2001 From: derekwaynecarr Date: Fri, 15 Apr 2016 14:17:17 -0400 Subject: [PATCH] Define interfaces for kubelet observing admission, sync loop, sync pod --- pkg/kubelet/kubelet.go | 48 ++++++- pkg/kubelet/kubelet_test.go | 189 ++++++++++++++++++++++++++++ pkg/kubelet/lifecycle/doc.go | 3 +- pkg/kubelet/lifecycle/interfaces.go | 122 ++++++++++++++++++ 4 files changed, 360 insertions(+), 2 deletions(-) create mode 100644 pkg/kubelet/lifecycle/interfaces.go diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 91f8665ce98..0f0fb16ce67 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -52,6 +52,7 @@ import ( kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/dockertools" "k8s.io/kubernetes/pkg/kubelet/envvars" + "k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/metrics" "k8s.io/kubernetes/pkg/kubelet/network" "k8s.io/kubernetes/pkg/kubelet/pleg" @@ -778,6 +779,16 @@ type Kubelet struct { // handlers called during the tryUpdateNodeStatus cycle setNodeStatusFuncs []func(*api.Node) error + + // TODO: think about moving this to be centralized in PodWorkers in follow-on. + // the list of handlers to call during pod admission. + lifecycle.PodAdmitHandlers + + // the list of handlers to call during pod sync loop. + lifecycle.PodSyncLoopHandlers + + // the list of handlers to call during pod sync. + lifecycle.PodSyncHandlers } // Validate given node IP belongs to the current host @@ -1839,7 +1850,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, podStatus *kubecont kl.statusManager.SetPodStatus(pod, apiPodStatus) // Kill pod if it should not be running - if err := canRunPod(pod); err != nil || pod.DeletionTimestamp != nil { + if err := canRunPod(pod); err != nil || pod.DeletionTimestamp != nil || apiPodStatus.Phase == api.PodFailed { if err := kl.killPod(pod, nil, podStatus); err != nil { utilruntime.HandleError(err) } @@ -2142,6 +2153,7 @@ func (kl *Kubelet) pastActiveDeadline(pod *api.Pod) bool { // Get pods which should be resynchronized. Currently, the following pod should be resynchronized: // * pod whose work is ready. // * pod past the active deadline. +// * internal modules that request sync of a pod. func (kl *Kubelet) getPodsToSync() []*api.Pod { allPods := kl.podManager.GetPods() podUIDs := kl.workQueue.GetWork() @@ -2151,6 +2163,7 @@ func (kl *Kubelet) getPodsToSync() []*api.Pod { } var podsToSync []*api.Pod for _, pod := range allPods { + // TODO: move active deadline code into a sync/evict pattern if kl.pastActiveDeadline(pod) { // The pod has passed the active deadline podsToSync = append(podsToSync, pod) @@ -2159,6 +2172,13 @@ func (kl *Kubelet) getPodsToSync() []*api.Pod { if podUIDSet.Has(string(pod.UID)) { // The work of the pod is ready podsToSync = append(podsToSync, pod) + continue + } + for _, podSyncLoopHandler := range kl.PodSyncLoopHandlers { + if podSyncLoopHandler.ShouldSync(pod) { + podsToSync = append(podsToSync, pod) + break + } } } return podsToSync @@ -2468,6 +2488,18 @@ func (kl *Kubelet) canAdmitPod(pods []*api.Pod, pod *api.Pod) (bool, string, str otherPods = append(otherPods, p) } } + + // the kubelet will invoke each pod admit handler in sequence + // if any handler rejects, the pod is rejected. + // TODO: move predicate check into a pod admitter + // TODO: move out of disk check into a pod admitter + // TODO: out of resource eviction should have a pod admitter call-out + attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: otherPods} + for _, podAdmitHandler := range kl.PodAdmitHandlers { + if result := podAdmitHandler.Admit(attrs); !result.Admit { + return false, result.Reason, result.Message + } + } nodeInfo := schedulercache.NewNodeInfo(otherPods...) nodeInfo.SetNode(node) fit, err := predicates.GeneralPredicates(pod, kl.nodeName, nodeInfo) @@ -2495,6 +2527,7 @@ func (kl *Kubelet) canAdmitPod(pods []*api.Pod, pod *api.Pod) (bool, string, str glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), "predicate fails due to isOutOfDisk") return false, "OutOfDisk", "cannot be started due to lack of disk space." } + return true, "", "" } @@ -3461,7 +3494,20 @@ func GetPhase(spec *api.PodSpec, info []api.ContainerStatus) api.PodPhase { // internal pod status. func (kl *Kubelet) generateAPIPodStatus(pod *api.Pod, podStatus *kubecontainer.PodStatus) api.PodStatus { glog.V(3).Infof("Generating status for %q", format.Pod(pod)) + + // check if an internal module has requested the pod is evicted. + for _, podSyncHandler := range kl.PodSyncHandlers { + if result := podSyncHandler.ShouldEvict(pod); result.Evict { + return api.PodStatus{ + Phase: api.PodFailed, + Reason: result.Reason, + Message: result.Message, + } + } + } + // TODO: Consider include the container information. + // TODO: move this into a sync/evictor if kl.pastActiveDeadline(pod) { reason := "DeadlineExceeded" kl.recorder.Eventf(pod, api.EventTypeNormal, reason, "Pod was active on the node longer than specified deadline") diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index de92dd462c9..566675b8cf8 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -46,6 +46,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/cm" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" containertest "k8s.io/kubernetes/pkg/kubelet/container/testing" + "k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/network" nettest "k8s.io/kubernetes/pkg/kubelet/network/testing" "k8s.io/kubernetes/pkg/kubelet/pleg" @@ -4693,3 +4694,191 @@ func TestGenerateAPIPodStatusWithReasonCache(t *testing.T) { verifyStatus(t, i, apiStatus, test.expectedState, test.expectedLastTerminationState) } } + +// testPodAdmitHandler is a lifecycle.PodAdmitHandler for testing. +type testPodAdmitHandler struct { + // list of pods to reject. + podsToReject []*api.Pod +} + +// Admit rejects all pods in the podsToReject list with a matching UID. +func (a *testPodAdmitHandler) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult { + for _, podToReject := range a.podsToReject { + if podToReject.UID == attrs.Pod.UID { + return lifecycle.PodAdmitResult{Admit: false, Reason: "Rejected", Message: "Pod is rejected"} + } + } + return lifecycle.PodAdmitResult{Admit: true} +} + +// Test verifies that the kubelet invokes an admission handler during HandlePodAdditions. +func TestHandlePodAdditionsInvokesPodAdmitHandlers(t *testing.T) { + testKubelet := newTestKubelet(t) + kl := testKubelet.kubelet + kl.nodeLister = testNodeLister{nodes: []api.Node{ + { + ObjectMeta: api.ObjectMeta{Name: kl.nodeName}, + Status: api.NodeStatus{ + Allocatable: api.ResourceList{ + api.ResourcePods: *resource.NewQuantity(110, resource.DecimalSI), + }, + }, + }, + }} + kl.nodeInfo = testNodeInfo{nodes: []api.Node{ + { + ObjectMeta: api.ObjectMeta{Name: kl.nodeName}, + Status: api.NodeStatus{ + Allocatable: api.ResourceList{ + api.ResourcePods: *resource.NewQuantity(110, resource.DecimalSI), + }, + }, + }, + }} + testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorapi.MachineInfo{}, nil) + testKubelet.fakeCadvisor.On("DockerImagesFsInfo").Return(cadvisorapiv2.FsInfo{}, nil) + testKubelet.fakeCadvisor.On("RootFsInfo").Return(cadvisorapiv2.FsInfo{}, nil) + + pods := []*api.Pod{ + { + ObjectMeta: api.ObjectMeta{ + UID: "123456789", + Name: "podA", + Namespace: "foo", + }, + }, + { + ObjectMeta: api.ObjectMeta{ + UID: "987654321", + Name: "podB", + Namespace: "foo", + }, + }, + } + podToReject := pods[0] + podToAdmit := pods[1] + podsToReject := []*api.Pod{podToReject} + + kl.AddPodAdmitHandler(&testPodAdmitHandler{podsToReject: podsToReject}) + + kl.HandlePodAdditions(pods) + // Check pod status stored in the status map. + // podToReject should be Failed + status, found := kl.statusManager.GetPodStatus(podToReject.UID) + if !found { + t.Fatalf("status of pod %q is not found in the status map", podToReject.UID) + } + if status.Phase != api.PodFailed { + t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase) + } + // podToAdmit should be Pending + status, found = kl.statusManager.GetPodStatus(podToAdmit.UID) + if !found { + t.Fatalf("status of pod %q is not found in the status map", podToAdmit.UID) + } + if status.Phase != api.PodPending { + t.Fatalf("expected pod status %q. Got %q.", api.PodPending, status.Phase) + } +} + +// testPodSyncLoopHandler is a lifecycle.PodSyncLoopHandler that is used for testing. +type testPodSyncLoopHandler struct { + // list of pods to sync + podsToSync []*api.Pod +} + +// ShouldSync evaluates if the pod should be synced from the kubelet. +func (a *testPodSyncLoopHandler) ShouldSync(pod *api.Pod) bool { + for _, podToSync := range a.podsToSync { + if podToSync.UID == pod.UID { + return true + } + } + return false +} + +// TestGetPodsToSyncInvokesPodSyncLoopHandlers ensures that the get pods to sync routine invokes the handler. +func TestGetPodsToSyncInvokesPodSyncLoopHandlers(t *testing.T) { + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + pods := newTestPods(5) + podUIDs := []types.UID{} + for _, pod := range pods { + podUIDs = append(podUIDs, pod.UID) + } + podsToSync := []*api.Pod{pods[0]} + kubelet.AddPodSyncLoopHandler(&testPodSyncLoopHandler{podsToSync}) + + kubelet.podManager.SetPods(pods) + + expectedPodsUID := []types.UID{pods[0].UID} + + podsToSync = kubelet.getPodsToSync() + + if len(podsToSync) == len(expectedPodsUID) { + var rightNum int + for _, podUID := range expectedPodsUID { + for _, podToSync := range podsToSync { + if podToSync.UID == podUID { + rightNum++ + break + } + } + } + if rightNum != len(expectedPodsUID) { + // Just for report error + podsToSyncUID := []types.UID{} + for _, podToSync := range podsToSync { + podsToSyncUID = append(podsToSyncUID, podToSync.UID) + } + t.Errorf("expected pods %v to sync, got %v", expectedPodsUID, podsToSyncUID) + } + + } else { + t.Errorf("expected %d pods to sync, got %d", 3, len(podsToSync)) + } +} + +// testPodSyncHandler is a lifecycle.PodSyncHandler that is used for testing. +type testPodSyncHandler struct { + // list of pods to evict. + podsToEvict []*api.Pod + // the reason for the eviction + reason string + // the mesage for the eviction + message string +} + +// ShouldEvict evaluates if the pod should be evicted from the kubelet. +func (a *testPodSyncHandler) ShouldEvict(pod *api.Pod) lifecycle.ShouldEvictResponse { + for _, podToEvict := range a.podsToEvict { + if podToEvict.UID == pod.UID { + return lifecycle.ShouldEvictResponse{Evict: true, Reason: a.reason, Message: a.message} + } + } + return lifecycle.ShouldEvictResponse{Evict: false} +} + +// TestGenerateAPIPodStatusInvokesPodSyncHandlers invokes the handlers and reports the proper status +func TestGenerateAPIPodStatusInvokesPodSyncHandlers(t *testing.T) { + testKubelet := newTestKubelet(t) + kubelet := testKubelet.kubelet + pod := newTestPods(1)[0] + podsToEvict := []*api.Pod{pod} + kubelet.AddPodSyncHandler(&testPodSyncHandler{podsToEvict, "Evicted", "because"}) + status := &kubecontainer.PodStatus{ + ID: pod.UID, + Name: pod.Name, + Namespace: pod.Namespace, + } + apiStatus := kubelet.generateAPIPodStatus(pod, status) + if apiStatus.Phase != api.PodFailed { + t.Fatalf("Expected phase %v, but got %v", api.PodFailed, apiStatus.Phase) + } + if apiStatus.Reason != "Evicted" { + t.Fatalf("Expected reason %v, but got %v", "Evicted", apiStatus.Reason) + } + if apiStatus.Message != "because" { + t.Fatalf("Expected message %v, but got %v", "because", apiStatus.Message) + } +} diff --git a/pkg/kubelet/lifecycle/doc.go b/pkg/kubelet/lifecycle/doc.go index d10db1bca5e..f398ca060cc 100644 --- a/pkg/kubelet/lifecycle/doc.go +++ b/pkg/kubelet/lifecycle/doc.go @@ -14,5 +14,6 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Handlers for pod lifecycle events. +// Handlers for pod lifecycle events and interfaces to integrate +// with kubelet admission, synchronization, and eviction of pods. package lifecycle diff --git a/pkg/kubelet/lifecycle/interfaces.go b/pkg/kubelet/lifecycle/interfaces.go new file mode 100644 index 00000000000..0dedd4510a7 --- /dev/null +++ b/pkg/kubelet/lifecycle/interfaces.go @@ -0,0 +1,122 @@ +/* +Copyright 2016 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package lifecycle + +import "k8s.io/kubernetes/pkg/api" + +// PodAdmitAttributes is the context for a pod admission decision. +// The member fields of this struct should never be mutated. +type PodAdmitAttributes struct { + // the pod to evaluate for admission + Pod *api.Pod + // all pods bound to the kubelet excluding the pod being evaluated + OtherPods []*api.Pod +} + +// PodAdmitResult provides the result of a pod admission decision. +type PodAdmitResult struct { + // if true, the pod should be admitted. + Admit bool + // a brief single-word reason why the pod could not be admitted. + Reason string + // a brief message explaining why the pod could not be admitted. + Message string +} + +// PodAdmitHandler is notified during pod admission. +type PodAdmitHandler interface { + // Admit evaluates if a pod can be admitted. + Admit(attrs *PodAdmitAttributes) PodAdmitResult +} + +// PodAdmitTarget maintains a list of handlers to invoke. +type PodAdmitTarget interface { + // AddPodAdmitHandler adds the specified handler. + AddPodAdmitHandler(a PodAdmitHandler) +} + +// PodSyncLoopHandler is invoked during each sync loop iteration. +type PodSyncLoopHandler interface { + // ShouldSync returns true if the pod needs to be synced. + // This operation must return immediately as its called for each pod. + // The provided pod should never be modified. + ShouldSync(pod *api.Pod) bool +} + +// PodSyncLoopTarget maintains a list of handlers to pod sync loop. +type PodSyncLoopTarget interface { + // AddPodSyncLoopHandler adds the specified handler. + AddPodSyncLoopHandler(a PodSyncLoopHandler) +} + +// ShouldEvictResponse provides the result of a should evict request. +type ShouldEvictResponse struct { + // if true, the pod should be evicted. + Evict bool + // a brief CamelCase reason why the pod should be evicted. + Reason string + // a brief message why the pod should be evicted. + Message string +} + +// PodSyncHandler is invoked during each sync pod operation. +type PodSyncHandler interface { + // ShouldEvict is invoked during each sync pod operation to determine + // if the pod should be evicted from the kubelet. If so, the pod status + // is updated to mark its phase as failed with the provided reason and message, + // and the pod is immediately killed. + // This operation must return immediately as its called for each sync pod. + // The provided pod should never be modified. + ShouldEvict(pod *api.Pod) ShouldEvictResponse +} + +// PodSyncTarget maintains a list of handlers to pod sync. +type PodSyncTarget interface { + // AddPodSyncHandler adds the specified handler + AddPodSyncHandler(a PodSyncHandler) +} + +// PodLifecycleTarget groups a set of lifecycle interfaces for convenience. +type PodLifecycleTarget interface { + PodAdmitTarget + PodSyncLoopTarget + PodSyncTarget +} + +// PodAdmitHandlers maintains a list of handlers to pod admission. +type PodAdmitHandlers []PodAdmitHandler + +// AddPodAdmitHandler adds the specified observer. +func (handlers *PodAdmitHandlers) AddPodAdmitHandler(a PodAdmitHandler) { + *handlers = append(*handlers, a) +} + +// PodSyncLoopHandlers maintains a list of handlers to pod sync loop. +type PodSyncLoopHandlers []PodSyncLoopHandler + +// AddPodSyncLoopHandler adds the specified observer. +func (handlers *PodSyncLoopHandlers) AddPodSyncLoopHandler(a PodSyncLoopHandler) { + *handlers = append(*handlers, a) +} + +// PodSyncHandlers maintains a list of handlers to pod sync. +type PodSyncHandlers []PodSyncHandler + +// AddPodSyncHandler adds the specified handler. +func (handlers *PodSyncHandlers) AddPodSyncHandler(a PodSyncHandler) { + *handlers = append(*handlers, a) +}