Merge pull request #24344 from derekwaynecarr/kubelet-lifecycle-callouts

Automatic merge from submit-queue

Define interfaces for kubelet pod admission and eviction

There is too much code and logic in `kubelet.go` that makes it hard to test functions in discrete pieces.

I propose an interface that an internal module can implement that will let it make an admission decision for a pod.  If folks are ok with the pattern, I want to move the a) predicate checking, b) out of disk, c) eviction preventing best-effort pods being admitted into their own dedicated handlers that would be easier for us to mock test.  We can then just write tests to ensure that the `Kubelet` calls a call-out, and we can write easier unit tests to ensure that dedicated handlers do the right thing.

The second interface I propose was a `PodEvictor` that is invoked in the main kubelet sync loop to know if pods should be pro-actively evicted from the machine.  The current active deadline check should move into a simple evictor implementation, and I want to plug the out of resource killer code path as an implementation of the same interface.

 @vishh @timothysc - if you guys can ack on this, I will add some unit testing to ensure we do the call-outs.

/cc @kubernetes/sig-node @kubernetes/rh-cluster-infra
This commit is contained in:
k8s-merge-robot 2016-05-06 08:53:35 -07:00
commit 16159b8bd0
4 changed files with 360 additions and 2 deletions

View File

@ -52,6 +52,7 @@ import (
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/kubelet/envvars"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/pleg"
@ -777,6 +778,16 @@ type Kubelet struct {
// handlers called during the tryUpdateNodeStatus cycle
setNodeStatusFuncs []func(*api.Node) error
// TODO: think about moving this to be centralized in PodWorkers in follow-on.
// the list of handlers to call during pod admission.
lifecycle.PodAdmitHandlers
// the list of handlers to call during pod sync loop.
lifecycle.PodSyncLoopHandlers
// the list of handlers to call during pod sync.
lifecycle.PodSyncHandlers
}
// Validate given node IP belongs to the current host
@ -1722,7 +1733,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, podStatus *kubecont
kl.statusManager.SetPodStatus(pod, apiPodStatus)
// Kill pod if it should not be running
if err := canRunPod(pod); err != nil || pod.DeletionTimestamp != nil {
if err := canRunPod(pod); err != nil || pod.DeletionTimestamp != nil || apiPodStatus.Phase == api.PodFailed {
if err := kl.killPod(pod, nil, podStatus); err != nil {
utilruntime.HandleError(err)
}
@ -2025,6 +2036,7 @@ func (kl *Kubelet) pastActiveDeadline(pod *api.Pod) bool {
// Get pods which should be resynchronized. Currently, the following pod should be resynchronized:
// * pod whose work is ready.
// * pod past the active deadline.
// * internal modules that request sync of a pod.
func (kl *Kubelet) getPodsToSync() []*api.Pod {
allPods := kl.podManager.GetPods()
podUIDs := kl.workQueue.GetWork()
@ -2034,6 +2046,7 @@ func (kl *Kubelet) getPodsToSync() []*api.Pod {
}
var podsToSync []*api.Pod
for _, pod := range allPods {
// TODO: move active deadline code into a sync/evict pattern
if kl.pastActiveDeadline(pod) {
// The pod has passed the active deadline
podsToSync = append(podsToSync, pod)
@ -2042,6 +2055,13 @@ func (kl *Kubelet) getPodsToSync() []*api.Pod {
if podUIDSet.Has(string(pod.UID)) {
// The work of the pod is ready
podsToSync = append(podsToSync, pod)
continue
}
for _, podSyncLoopHandler := range kl.PodSyncLoopHandlers {
if podSyncLoopHandler.ShouldSync(pod) {
podsToSync = append(podsToSync, pod)
break
}
}
}
return podsToSync
@ -2337,6 +2357,18 @@ func (kl *Kubelet) canAdmitPod(pods []*api.Pod, pod *api.Pod) (bool, string, str
otherPods = append(otherPods, p)
}
}
// the kubelet will invoke each pod admit handler in sequence
// if any handler rejects, the pod is rejected.
// TODO: move predicate check into a pod admitter
// TODO: move out of disk check into a pod admitter
// TODO: out of resource eviction should have a pod admitter call-out
attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: otherPods}
for _, podAdmitHandler := range kl.PodAdmitHandlers {
if result := podAdmitHandler.Admit(attrs); !result.Admit {
return false, result.Reason, result.Message
}
}
nodeInfo := schedulercache.NewNodeInfo(otherPods...)
nodeInfo.SetNode(node)
fit, err := predicates.GeneralPredicates(pod, nodeInfo)
@ -2364,6 +2396,7 @@ func (kl *Kubelet) canAdmitPod(pods []*api.Pod, pod *api.Pod) (bool, string, str
glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), "predicate fails due to isOutOfDisk")
return false, "OutOfDisk", "cannot be started due to lack of disk space."
}
return true, "", ""
}
@ -3286,7 +3319,20 @@ func GetPhase(spec *api.PodSpec, info []api.ContainerStatus) api.PodPhase {
// internal pod status.
func (kl *Kubelet) generateAPIPodStatus(pod *api.Pod, podStatus *kubecontainer.PodStatus) api.PodStatus {
glog.V(3).Infof("Generating status for %q", format.Pod(pod))
// check if an internal module has requested the pod is evicted.
for _, podSyncHandler := range kl.PodSyncHandlers {
if result := podSyncHandler.ShouldEvict(pod); result.Evict {
return api.PodStatus{
Phase: api.PodFailed,
Reason: result.Reason,
Message: result.Message,
}
}
}
// TODO: Consider include the container information.
// TODO: move this into a sync/evictor
if kl.pastActiveDeadline(pod) {
reason := "DeadlineExceeded"
kl.recorder.Eventf(pod, api.EventTypeNormal, reason, "Pod was active on the node longer than specified deadline")

View File

@ -45,6 +45,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/network"
nettest "k8s.io/kubernetes/pkg/kubelet/network/testing"
"k8s.io/kubernetes/pkg/kubelet/pleg"
@ -4538,3 +4539,191 @@ func TestGenerateAPIPodStatusWithReasonCache(t *testing.T) {
verifyStatus(t, i, apiStatus, test.expectedState, test.expectedLastTerminationState)
}
}
// testPodAdmitHandler is a lifecycle.PodAdmitHandler for testing.
type testPodAdmitHandler struct {
// list of pods to reject.
podsToReject []*api.Pod
}
// Admit rejects all pods in the podsToReject list with a matching UID.
func (a *testPodAdmitHandler) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
for _, podToReject := range a.podsToReject {
if podToReject.UID == attrs.Pod.UID {
return lifecycle.PodAdmitResult{Admit: false, Reason: "Rejected", Message: "Pod is rejected"}
}
}
return lifecycle.PodAdmitResult{Admit: true}
}
// Test verifies that the kubelet invokes an admission handler during HandlePodAdditions.
func TestHandlePodAdditionsInvokesPodAdmitHandlers(t *testing.T) {
testKubelet := newTestKubelet(t)
kl := testKubelet.kubelet
kl.nodeLister = testNodeLister{nodes: []api.Node{
{
ObjectMeta: api.ObjectMeta{Name: kl.nodeName},
Status: api.NodeStatus{
Allocatable: api.ResourceList{
api.ResourcePods: *resource.NewQuantity(110, resource.DecimalSI),
},
},
},
}}
kl.nodeInfo = testNodeInfo{nodes: []api.Node{
{
ObjectMeta: api.ObjectMeta{Name: kl.nodeName},
Status: api.NodeStatus{
Allocatable: api.ResourceList{
api.ResourcePods: *resource.NewQuantity(110, resource.DecimalSI),
},
},
},
}}
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorapi.MachineInfo{}, nil)
testKubelet.fakeCadvisor.On("DockerImagesFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
testKubelet.fakeCadvisor.On("RootFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
pods := []*api.Pod{
{
ObjectMeta: api.ObjectMeta{
UID: "123456789",
Name: "podA",
Namespace: "foo",
},
},
{
ObjectMeta: api.ObjectMeta{
UID: "987654321",
Name: "podB",
Namespace: "foo",
},
},
}
podToReject := pods[0]
podToAdmit := pods[1]
podsToReject := []*api.Pod{podToReject}
kl.AddPodAdmitHandler(&testPodAdmitHandler{podsToReject: podsToReject})
kl.HandlePodAdditions(pods)
// Check pod status stored in the status map.
// podToReject should be Failed
status, found := kl.statusManager.GetPodStatus(podToReject.UID)
if !found {
t.Fatalf("status of pod %q is not found in the status map", podToReject.UID)
}
if status.Phase != api.PodFailed {
t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
}
// podToAdmit should be Pending
status, found = kl.statusManager.GetPodStatus(podToAdmit.UID)
if !found {
t.Fatalf("status of pod %q is not found in the status map", podToAdmit.UID)
}
if status.Phase != api.PodPending {
t.Fatalf("expected pod status %q. Got %q.", api.PodPending, status.Phase)
}
}
// testPodSyncLoopHandler is a lifecycle.PodSyncLoopHandler that is used for testing.
type testPodSyncLoopHandler struct {
// list of pods to sync
podsToSync []*api.Pod
}
// ShouldSync evaluates if the pod should be synced from the kubelet.
func (a *testPodSyncLoopHandler) ShouldSync(pod *api.Pod) bool {
for _, podToSync := range a.podsToSync {
if podToSync.UID == pod.UID {
return true
}
}
return false
}
// TestGetPodsToSyncInvokesPodSyncLoopHandlers ensures that the get pods to sync routine invokes the handler.
func TestGetPodsToSyncInvokesPodSyncLoopHandlers(t *testing.T) {
testKubelet := newTestKubelet(t)
kubelet := testKubelet.kubelet
pods := newTestPods(5)
podUIDs := []types.UID{}
for _, pod := range pods {
podUIDs = append(podUIDs, pod.UID)
}
podsToSync := []*api.Pod{pods[0]}
kubelet.AddPodSyncLoopHandler(&testPodSyncLoopHandler{podsToSync})
kubelet.podManager.SetPods(pods)
expectedPodsUID := []types.UID{pods[0].UID}
podsToSync = kubelet.getPodsToSync()
if len(podsToSync) == len(expectedPodsUID) {
var rightNum int
for _, podUID := range expectedPodsUID {
for _, podToSync := range podsToSync {
if podToSync.UID == podUID {
rightNum++
break
}
}
}
if rightNum != len(expectedPodsUID) {
// Just for report error
podsToSyncUID := []types.UID{}
for _, podToSync := range podsToSync {
podsToSyncUID = append(podsToSyncUID, podToSync.UID)
}
t.Errorf("expected pods %v to sync, got %v", expectedPodsUID, podsToSyncUID)
}
} else {
t.Errorf("expected %d pods to sync, got %d", 3, len(podsToSync))
}
}
// testPodSyncHandler is a lifecycle.PodSyncHandler that is used for testing.
type testPodSyncHandler struct {
// list of pods to evict.
podsToEvict []*api.Pod
// the reason for the eviction
reason string
// the mesage for the eviction
message string
}
// ShouldEvict evaluates if the pod should be evicted from the kubelet.
func (a *testPodSyncHandler) ShouldEvict(pod *api.Pod) lifecycle.ShouldEvictResponse {
for _, podToEvict := range a.podsToEvict {
if podToEvict.UID == pod.UID {
return lifecycle.ShouldEvictResponse{Evict: true, Reason: a.reason, Message: a.message}
}
}
return lifecycle.ShouldEvictResponse{Evict: false}
}
// TestGenerateAPIPodStatusInvokesPodSyncHandlers invokes the handlers and reports the proper status
func TestGenerateAPIPodStatusInvokesPodSyncHandlers(t *testing.T) {
testKubelet := newTestKubelet(t)
kubelet := testKubelet.kubelet
pod := newTestPods(1)[0]
podsToEvict := []*api.Pod{pod}
kubelet.AddPodSyncHandler(&testPodSyncHandler{podsToEvict, "Evicted", "because"})
status := &kubecontainer.PodStatus{
ID: pod.UID,
Name: pod.Name,
Namespace: pod.Namespace,
}
apiStatus := kubelet.generateAPIPodStatus(pod, status)
if apiStatus.Phase != api.PodFailed {
t.Fatalf("Expected phase %v, but got %v", api.PodFailed, apiStatus.Phase)
}
if apiStatus.Reason != "Evicted" {
t.Fatalf("Expected reason %v, but got %v", "Evicted", apiStatus.Reason)
}
if apiStatus.Message != "because" {
t.Fatalf("Expected message %v, but got %v", "because", apiStatus.Message)
}
}

View File

@ -14,5 +14,6 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// Handlers for pod lifecycle events.
// Handlers for pod lifecycle events and interfaces to integrate
// with kubelet admission, synchronization, and eviction of pods.
package lifecycle

View File

@ -0,0 +1,122 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package lifecycle
import "k8s.io/kubernetes/pkg/api"
// PodAdmitAttributes is the context for a pod admission decision.
// The member fields of this struct should never be mutated.
type PodAdmitAttributes struct {
// the pod to evaluate for admission
Pod *api.Pod
// all pods bound to the kubelet excluding the pod being evaluated
OtherPods []*api.Pod
}
// PodAdmitResult provides the result of a pod admission decision.
type PodAdmitResult struct {
// if true, the pod should be admitted.
Admit bool
// a brief single-word reason why the pod could not be admitted.
Reason string
// a brief message explaining why the pod could not be admitted.
Message string
}
// PodAdmitHandler is notified during pod admission.
type PodAdmitHandler interface {
// Admit evaluates if a pod can be admitted.
Admit(attrs *PodAdmitAttributes) PodAdmitResult
}
// PodAdmitTarget maintains a list of handlers to invoke.
type PodAdmitTarget interface {
// AddPodAdmitHandler adds the specified handler.
AddPodAdmitHandler(a PodAdmitHandler)
}
// PodSyncLoopHandler is invoked during each sync loop iteration.
type PodSyncLoopHandler interface {
// ShouldSync returns true if the pod needs to be synced.
// This operation must return immediately as its called for each pod.
// The provided pod should never be modified.
ShouldSync(pod *api.Pod) bool
}
// PodSyncLoopTarget maintains a list of handlers to pod sync loop.
type PodSyncLoopTarget interface {
// AddPodSyncLoopHandler adds the specified handler.
AddPodSyncLoopHandler(a PodSyncLoopHandler)
}
// ShouldEvictResponse provides the result of a should evict request.
type ShouldEvictResponse struct {
// if true, the pod should be evicted.
Evict bool
// a brief CamelCase reason why the pod should be evicted.
Reason string
// a brief message why the pod should be evicted.
Message string
}
// PodSyncHandler is invoked during each sync pod operation.
type PodSyncHandler interface {
// ShouldEvict is invoked during each sync pod operation to determine
// if the pod should be evicted from the kubelet. If so, the pod status
// is updated to mark its phase as failed with the provided reason and message,
// and the pod is immediately killed.
// This operation must return immediately as its called for each sync pod.
// The provided pod should never be modified.
ShouldEvict(pod *api.Pod) ShouldEvictResponse
}
// PodSyncTarget maintains a list of handlers to pod sync.
type PodSyncTarget interface {
// AddPodSyncHandler adds the specified handler
AddPodSyncHandler(a PodSyncHandler)
}
// PodLifecycleTarget groups a set of lifecycle interfaces for convenience.
type PodLifecycleTarget interface {
PodAdmitTarget
PodSyncLoopTarget
PodSyncTarget
}
// PodAdmitHandlers maintains a list of handlers to pod admission.
type PodAdmitHandlers []PodAdmitHandler
// AddPodAdmitHandler adds the specified observer.
func (handlers *PodAdmitHandlers) AddPodAdmitHandler(a PodAdmitHandler) {
*handlers = append(*handlers, a)
}
// PodSyncLoopHandlers maintains a list of handlers to pod sync loop.
type PodSyncLoopHandlers []PodSyncLoopHandler
// AddPodSyncLoopHandler adds the specified observer.
func (handlers *PodSyncLoopHandlers) AddPodSyncLoopHandler(a PodSyncLoopHandler) {
*handlers = append(*handlers, a)
}
// PodSyncHandlers maintains a list of handlers to pod sync.
type PodSyncHandlers []PodSyncHandler
// AddPodSyncHandler adds the specified handler.
func (handlers *PodSyncHandlers) AddPodSyncHandler(a PodSyncHandler) {
*handlers = append(*handlers, a)
}