Define interfaces for kubelet observing admission, sync loop, sync pod

This commit is contained in:
derekwaynecarr 2016-04-15 14:17:17 -04:00
parent 46a6cae14e
commit 033ae3e37e
4 changed files with 360 additions and 2 deletions

View File

@ -52,6 +52,7 @@ import (
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/kubelet/envvars"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/pleg"
@ -778,6 +779,16 @@ type Kubelet struct {
// handlers called during the tryUpdateNodeStatus cycle
setNodeStatusFuncs []func(*api.Node) error
// TODO: think about moving this to be centralized in PodWorkers in follow-on.
// the list of handlers to call during pod admission.
lifecycle.PodAdmitHandlers
// the list of handlers to call during pod sync loop.
lifecycle.PodSyncLoopHandlers
// the list of handlers to call during pod sync.
lifecycle.PodSyncHandlers
}
// Validate given node IP belongs to the current host
@ -1839,7 +1850,7 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, podStatus *kubecont
kl.statusManager.SetPodStatus(pod, apiPodStatus)
// Kill pod if it should not be running
if err := canRunPod(pod); err != nil || pod.DeletionTimestamp != nil {
if err := canRunPod(pod); err != nil || pod.DeletionTimestamp != nil || apiPodStatus.Phase == api.PodFailed {
if err := kl.killPod(pod, nil, podStatus); err != nil {
utilruntime.HandleError(err)
}
@ -2142,6 +2153,7 @@ func (kl *Kubelet) pastActiveDeadline(pod *api.Pod) bool {
// Get pods which should be resynchronized. Currently, the following pod should be resynchronized:
// * pod whose work is ready.
// * pod past the active deadline.
// * internal modules that request sync of a pod.
func (kl *Kubelet) getPodsToSync() []*api.Pod {
allPods := kl.podManager.GetPods()
podUIDs := kl.workQueue.GetWork()
@ -2151,6 +2163,7 @@ func (kl *Kubelet) getPodsToSync() []*api.Pod {
}
var podsToSync []*api.Pod
for _, pod := range allPods {
// TODO: move active deadline code into a sync/evict pattern
if kl.pastActiveDeadline(pod) {
// The pod has passed the active deadline
podsToSync = append(podsToSync, pod)
@ -2159,6 +2172,13 @@ func (kl *Kubelet) getPodsToSync() []*api.Pod {
if podUIDSet.Has(string(pod.UID)) {
// The work of the pod is ready
podsToSync = append(podsToSync, pod)
continue
}
for _, podSyncLoopHandler := range kl.PodSyncLoopHandlers {
if podSyncLoopHandler.ShouldSync(pod) {
podsToSync = append(podsToSync, pod)
break
}
}
}
return podsToSync
@ -2468,6 +2488,18 @@ func (kl *Kubelet) canAdmitPod(pods []*api.Pod, pod *api.Pod) (bool, string, str
otherPods = append(otherPods, p)
}
}
// the kubelet will invoke each pod admit handler in sequence
// if any handler rejects, the pod is rejected.
// TODO: move predicate check into a pod admitter
// TODO: move out of disk check into a pod admitter
// TODO: out of resource eviction should have a pod admitter call-out
attrs := &lifecycle.PodAdmitAttributes{Pod: pod, OtherPods: otherPods}
for _, podAdmitHandler := range kl.PodAdmitHandlers {
if result := podAdmitHandler.Admit(attrs); !result.Admit {
return false, result.Reason, result.Message
}
}
nodeInfo := schedulercache.NewNodeInfo(otherPods...)
nodeInfo.SetNode(node)
fit, err := predicates.GeneralPredicates(pod, kl.nodeName, nodeInfo)
@ -2495,6 +2527,7 @@ func (kl *Kubelet) canAdmitPod(pods []*api.Pod, pod *api.Pod) (bool, string, str
glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), "predicate fails due to isOutOfDisk")
return false, "OutOfDisk", "cannot be started due to lack of disk space."
}
return true, "", ""
}
@ -3461,7 +3494,20 @@ func GetPhase(spec *api.PodSpec, info []api.ContainerStatus) api.PodPhase {
// internal pod status.
func (kl *Kubelet) generateAPIPodStatus(pod *api.Pod, podStatus *kubecontainer.PodStatus) api.PodStatus {
glog.V(3).Infof("Generating status for %q", format.Pod(pod))
// check if an internal module has requested the pod is evicted.
for _, podSyncHandler := range kl.PodSyncHandlers {
if result := podSyncHandler.ShouldEvict(pod); result.Evict {
return api.PodStatus{
Phase: api.PodFailed,
Reason: result.Reason,
Message: result.Message,
}
}
}
// TODO: Consider include the container information.
// TODO: move this into a sync/evictor
if kl.pastActiveDeadline(pod) {
reason := "DeadlineExceeded"
kl.recorder.Eventf(pod, api.EventTypeNormal, reason, "Pod was active on the node longer than specified deadline")

View File

@ -46,6 +46,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/network"
nettest "k8s.io/kubernetes/pkg/kubelet/network/testing"
"k8s.io/kubernetes/pkg/kubelet/pleg"
@ -4693,3 +4694,191 @@ func TestGenerateAPIPodStatusWithReasonCache(t *testing.T) {
verifyStatus(t, i, apiStatus, test.expectedState, test.expectedLastTerminationState)
}
}
// testPodAdmitHandler is a lifecycle.PodAdmitHandler for testing.
type testPodAdmitHandler struct {
// list of pods to reject.
podsToReject []*api.Pod
}
// Admit rejects all pods in the podsToReject list with a matching UID.
func (a *testPodAdmitHandler) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
for _, podToReject := range a.podsToReject {
if podToReject.UID == attrs.Pod.UID {
return lifecycle.PodAdmitResult{Admit: false, Reason: "Rejected", Message: "Pod is rejected"}
}
}
return lifecycle.PodAdmitResult{Admit: true}
}
// Test verifies that the kubelet invokes an admission handler during HandlePodAdditions.
func TestHandlePodAdditionsInvokesPodAdmitHandlers(t *testing.T) {
testKubelet := newTestKubelet(t)
kl := testKubelet.kubelet
kl.nodeLister = testNodeLister{nodes: []api.Node{
{
ObjectMeta: api.ObjectMeta{Name: kl.nodeName},
Status: api.NodeStatus{
Allocatable: api.ResourceList{
api.ResourcePods: *resource.NewQuantity(110, resource.DecimalSI),
},
},
},
}}
kl.nodeInfo = testNodeInfo{nodes: []api.Node{
{
ObjectMeta: api.ObjectMeta{Name: kl.nodeName},
Status: api.NodeStatus{
Allocatable: api.ResourceList{
api.ResourcePods: *resource.NewQuantity(110, resource.DecimalSI),
},
},
},
}}
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorapi.MachineInfo{}, nil)
testKubelet.fakeCadvisor.On("DockerImagesFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
testKubelet.fakeCadvisor.On("RootFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
pods := []*api.Pod{
{
ObjectMeta: api.ObjectMeta{
UID: "123456789",
Name: "podA",
Namespace: "foo",
},
},
{
ObjectMeta: api.ObjectMeta{
UID: "987654321",
Name: "podB",
Namespace: "foo",
},
},
}
podToReject := pods[0]
podToAdmit := pods[1]
podsToReject := []*api.Pod{podToReject}
kl.AddPodAdmitHandler(&testPodAdmitHandler{podsToReject: podsToReject})
kl.HandlePodAdditions(pods)
// Check pod status stored in the status map.
// podToReject should be Failed
status, found := kl.statusManager.GetPodStatus(podToReject.UID)
if !found {
t.Fatalf("status of pod %q is not found in the status map", podToReject.UID)
}
if status.Phase != api.PodFailed {
t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
}
// podToAdmit should be Pending
status, found = kl.statusManager.GetPodStatus(podToAdmit.UID)
if !found {
t.Fatalf("status of pod %q is not found in the status map", podToAdmit.UID)
}
if status.Phase != api.PodPending {
t.Fatalf("expected pod status %q. Got %q.", api.PodPending, status.Phase)
}
}
// testPodSyncLoopHandler is a lifecycle.PodSyncLoopHandler that is used for testing.
type testPodSyncLoopHandler struct {
// list of pods to sync
podsToSync []*api.Pod
}
// ShouldSync evaluates if the pod should be synced from the kubelet.
func (a *testPodSyncLoopHandler) ShouldSync(pod *api.Pod) bool {
for _, podToSync := range a.podsToSync {
if podToSync.UID == pod.UID {
return true
}
}
return false
}
// TestGetPodsToSyncInvokesPodSyncLoopHandlers ensures that the get pods to sync routine invokes the handler.
func TestGetPodsToSyncInvokesPodSyncLoopHandlers(t *testing.T) {
testKubelet := newTestKubelet(t)
kubelet := testKubelet.kubelet
pods := newTestPods(5)
podUIDs := []types.UID{}
for _, pod := range pods {
podUIDs = append(podUIDs, pod.UID)
}
podsToSync := []*api.Pod{pods[0]}
kubelet.AddPodSyncLoopHandler(&testPodSyncLoopHandler{podsToSync})
kubelet.podManager.SetPods(pods)
expectedPodsUID := []types.UID{pods[0].UID}
podsToSync = kubelet.getPodsToSync()
if len(podsToSync) == len(expectedPodsUID) {
var rightNum int
for _, podUID := range expectedPodsUID {
for _, podToSync := range podsToSync {
if podToSync.UID == podUID {
rightNum++
break
}
}
}
if rightNum != len(expectedPodsUID) {
// Just for report error
podsToSyncUID := []types.UID{}
for _, podToSync := range podsToSync {
podsToSyncUID = append(podsToSyncUID, podToSync.UID)
}
t.Errorf("expected pods %v to sync, got %v", expectedPodsUID, podsToSyncUID)
}
} else {
t.Errorf("expected %d pods to sync, got %d", 3, len(podsToSync))
}
}
// testPodSyncHandler is a lifecycle.PodSyncHandler that is used for testing.
type testPodSyncHandler struct {
// list of pods to evict.
podsToEvict []*api.Pod
// the reason for the eviction
reason string
// the mesage for the eviction
message string
}
// ShouldEvict evaluates if the pod should be evicted from the kubelet.
func (a *testPodSyncHandler) ShouldEvict(pod *api.Pod) lifecycle.ShouldEvictResponse {
for _, podToEvict := range a.podsToEvict {
if podToEvict.UID == pod.UID {
return lifecycle.ShouldEvictResponse{Evict: true, Reason: a.reason, Message: a.message}
}
}
return lifecycle.ShouldEvictResponse{Evict: false}
}
// TestGenerateAPIPodStatusInvokesPodSyncHandlers invokes the handlers and reports the proper status
func TestGenerateAPIPodStatusInvokesPodSyncHandlers(t *testing.T) {
testKubelet := newTestKubelet(t)
kubelet := testKubelet.kubelet
pod := newTestPods(1)[0]
podsToEvict := []*api.Pod{pod}
kubelet.AddPodSyncHandler(&testPodSyncHandler{podsToEvict, "Evicted", "because"})
status := &kubecontainer.PodStatus{
ID: pod.UID,
Name: pod.Name,
Namespace: pod.Namespace,
}
apiStatus := kubelet.generateAPIPodStatus(pod, status)
if apiStatus.Phase != api.PodFailed {
t.Fatalf("Expected phase %v, but got %v", api.PodFailed, apiStatus.Phase)
}
if apiStatus.Reason != "Evicted" {
t.Fatalf("Expected reason %v, but got %v", "Evicted", apiStatus.Reason)
}
if apiStatus.Message != "because" {
t.Fatalf("Expected message %v, but got %v", "because", apiStatus.Message)
}
}

View File

@ -14,5 +14,6 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// Handlers for pod lifecycle events.
// Handlers for pod lifecycle events and interfaces to integrate
// with kubelet admission, synchronization, and eviction of pods.
package lifecycle

View File

@ -0,0 +1,122 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package lifecycle
import "k8s.io/kubernetes/pkg/api"
// PodAdmitAttributes is the context for a pod admission decision.
// The member fields of this struct should never be mutated.
type PodAdmitAttributes struct {
// the pod to evaluate for admission
Pod *api.Pod
// all pods bound to the kubelet excluding the pod being evaluated
OtherPods []*api.Pod
}
// PodAdmitResult provides the result of a pod admission decision.
type PodAdmitResult struct {
// if true, the pod should be admitted.
Admit bool
// a brief single-word reason why the pod could not be admitted.
Reason string
// a brief message explaining why the pod could not be admitted.
Message string
}
// PodAdmitHandler is notified during pod admission.
type PodAdmitHandler interface {
// Admit evaluates if a pod can be admitted.
Admit(attrs *PodAdmitAttributes) PodAdmitResult
}
// PodAdmitTarget maintains a list of handlers to invoke.
type PodAdmitTarget interface {
// AddPodAdmitHandler adds the specified handler.
AddPodAdmitHandler(a PodAdmitHandler)
}
// PodSyncLoopHandler is invoked during each sync loop iteration.
type PodSyncLoopHandler interface {
// ShouldSync returns true if the pod needs to be synced.
// This operation must return immediately as its called for each pod.
// The provided pod should never be modified.
ShouldSync(pod *api.Pod) bool
}
// PodSyncLoopTarget maintains a list of handlers to pod sync loop.
type PodSyncLoopTarget interface {
// AddPodSyncLoopHandler adds the specified handler.
AddPodSyncLoopHandler(a PodSyncLoopHandler)
}
// ShouldEvictResponse provides the result of a should evict request.
type ShouldEvictResponse struct {
// if true, the pod should be evicted.
Evict bool
// a brief CamelCase reason why the pod should be evicted.
Reason string
// a brief message why the pod should be evicted.
Message string
}
// PodSyncHandler is invoked during each sync pod operation.
type PodSyncHandler interface {
// ShouldEvict is invoked during each sync pod operation to determine
// if the pod should be evicted from the kubelet. If so, the pod status
// is updated to mark its phase as failed with the provided reason and message,
// and the pod is immediately killed.
// This operation must return immediately as its called for each sync pod.
// The provided pod should never be modified.
ShouldEvict(pod *api.Pod) ShouldEvictResponse
}
// PodSyncTarget maintains a list of handlers to pod sync.
type PodSyncTarget interface {
// AddPodSyncHandler adds the specified handler
AddPodSyncHandler(a PodSyncHandler)
}
// PodLifecycleTarget groups a set of lifecycle interfaces for convenience.
type PodLifecycleTarget interface {
PodAdmitTarget
PodSyncLoopTarget
PodSyncTarget
}
// PodAdmitHandlers maintains a list of handlers to pod admission.
type PodAdmitHandlers []PodAdmitHandler
// AddPodAdmitHandler adds the specified observer.
func (handlers *PodAdmitHandlers) AddPodAdmitHandler(a PodAdmitHandler) {
*handlers = append(*handlers, a)
}
// PodSyncLoopHandlers maintains a list of handlers to pod sync loop.
type PodSyncLoopHandlers []PodSyncLoopHandler
// AddPodSyncLoopHandler adds the specified observer.
func (handlers *PodSyncLoopHandlers) AddPodSyncLoopHandler(a PodSyncLoopHandler) {
*handlers = append(*handlers, a)
}
// PodSyncHandlers maintains a list of handlers to pod sync.
type PodSyncHandlers []PodSyncHandler
// AddPodSyncHandler adds the specified handler.
func (handlers *PodSyncHandlers) AddPodSyncHandler(a PodSyncHandler) {
*handlers = append(*handlers, a)
}