diff --git a/pkg/kubelet/eviction/types.go b/pkg/kubelet/eviction/types.go
index 63d39fe875f..5725560a065 100644
--- a/pkg/kubelet/eviction/types.go
+++ b/pkg/kubelet/eviction/types.go
@@ -19,6 +19,7 @@ package eviction
 import (
     "time"
 
+    "k8s.io/kubernetes/pkg/api"
     "k8s.io/kubernetes/pkg/api/resource"
 )
 
@@ -49,3 +50,12 @@ type Threshold struct {
     // GracePeriod represents the amount of time that a threshold must be met before eviction is triggered.
     GracePeriod time.Duration
 }
+
+// KillPodFunc kills a pod.
+// The pod status is updated, and then it is killed with the specified grace period.
+// This function must block until either the pod is killed or an error is encountered.
+// Arguments:
+// pod - the pod to kill
+// status - the desired status to associate with the pod (i.e. why it's killed)
+// gracePeriodOverride - the grace period override to use instead of what is on the pod spec
+type KillPodFunc func(pod *api.Pod, status api.PodStatus, gracePeriodOverride *int64) error
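The KillPodFunc type above is the seam between the eviction subsystem and the kubelet. A minimal sketch of a consumer — the `evictionManager` type and its wiring are hypothetical illustrations, not part of this diff; the function itself is supplied by the kubelet (e.g. the killPodNow helper introduced later in this diff):

```go
package eviction

import "k8s.io/kubernetes/pkg/api"

// evictionManager is a hypothetical consumer of KillPodFunc.
type evictionManager struct {
	// killPodFunc is injected by the kubelet (e.g. killPodNow).
	killPodFunc KillPodFunc
}

// evictPod records why the pod died and kills it immediately.
func (m *evictionManager) evictPod(pod *api.Pod) error {
	status := api.PodStatus{
		Phase:   api.PodFailed,
		Reason:  "Evicted",
		Message: "Pod evicted to reclaim node resources.",
	}
	gracePeriodOverride := int64(0) // override the pod spec; kill immediately
	return m.killPodFunc(pod, status, &gracePeriodOverride)
}
```

diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 1e66d39e4c9..d2ffe86f11e 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -1698,7 +1698,30 @@ func (kl *Kubelet) makePodDataDirs(pod *api.Pod) error {
 //
 // If any step of this workflow errors, the error is returned, and is repeated
 // on the next syncPod call.
-func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, podStatus *kubecontainer.PodStatus, updateType kubetypes.SyncPodType) error {
+func (kl *Kubelet) syncPod(o syncPodOptions) error {
+    // pull out the required options
+    pod := o.pod
+    mirrorPod := o.mirrorPod
+    podStatus := o.podStatus
+    updateType := o.updateType
+
+    // if we want to kill a pod, do it now!
+    if updateType == kubetypes.SyncPodKill {
+        killPodOptions := o.killPodOptions
+        if killPodOptions == nil || killPodOptions.PodStatusFunc == nil {
+            return fmt.Errorf("kill pod options are required if update type is kill")
+        }
+        apiPodStatus := killPodOptions.PodStatusFunc(pod, podStatus)
+        kl.statusManager.SetPodStatus(pod, apiPodStatus)
+        // we kill the pod with the specified grace period since this is a termination
+        if err := kl.killPod(pod, nil, podStatus, killPodOptions.PodTerminationGracePeriodSecondsOverride); err != nil {
+            // there was an error killing the pod, so we return that error directly
+            utilruntime.HandleError(err)
+            return err
+        }
+        return nil
+    }
+
     // Latency measurements for the main workflow are relative to the
     // first time the pod was seen by the API server.
     var firstSeenTime time.Time
@@ -1733,8 +1756,11 @@
     // Kill pod if it should not be running
     if err := canRunPod(pod); err != nil || pod.DeletionTimestamp != nil || apiPodStatus.Phase == api.PodFailed {
         if err := kl.killPod(pod, nil, podStatus, nil); err != nil {
+            // there was an error killing the pod, so we return that error directly
             utilruntime.HandleError(err)
+            return err
         }
+        // there was no error killing the pod, but the pod cannot be run, so we return that err (if any)
         return err
     }
 
@@ -2570,8 +2596,15 @@ func (kl *Kubelet) dispatchWork(pod *api.Pod, syncType kubetypes.SyncPodType, mi
         return
     }
     // Run the sync in an async worker.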
-    kl.podWorkers.UpdatePod(pod, mirrorPod, syncType, func() {
-        metrics.PodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start))
+    kl.podWorkers.UpdatePod(&UpdatePodOptions{
+        Pod:        pod,
+        MirrorPod:  mirrorPod,
+        UpdateType: syncType,
+        OnCompleteFunc: func(err error) {
+            if err != nil {
+                metrics.PodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start))
+            }
+        },
     })
     // Note the number of containers for new pods.
     if syncType == kubetypes.SyncPodCreate {
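Call sites migrate from the removed positional signature to the new options struct; a minimal sketch of the mechanical change (variable names assumed to be in scope):

```go
// before: err := kl.syncPod(pod, mirrorPod, podStatus, kubetypes.SyncPodUpdate)
// after:
err := kl.syncPod(syncPodOptions{
	pod:        pod,
	mirrorPod:  mirrorPod,
	podStatus:  podStatus,
	updateType: kubetypes.SyncPodUpdate,
})
```

diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index 03ea72ff813..7c9b7856900 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -3024,7 +3024,11 @@ func TestCreateMirrorPod(t *testing.T) {
         pod.Annotations[kubetypes.ConfigSourceAnnotationKey] = "file"
         pods := []*api.Pod{pod}
         kl.podManager.SetPods(pods)
-        err := kl.syncPod(pod, nil, &kubecontainer.PodStatus{}, updateType)
+        err := kl.syncPod(syncPodOptions{
+            pod:        pod,
+            podStatus:  &kubecontainer.PodStatus{},
+            updateType: updateType,
+        })
         if err != nil {
             t.Errorf("unexpected error: %v", err)
         }
@@ -3065,7 +3069,12 @@
 
     pods := []*api.Pod{pod, mirrorPod}
     kl.podManager.SetPods(pods)
-    err := kl.syncPod(pod, mirrorPod, &kubecontainer.PodStatus{}, kubetypes.SyncPodUpdate)
+    err := kl.syncPod(syncPodOptions{
+        pod:        pod,
+        mirrorPod:  mirrorPod,
+        podStatus:  &kubecontainer.PodStatus{},
+        updateType: kubetypes.SyncPodUpdate,
+    })
     if err != nil {
         t.Errorf("unexpected error: %v", err)
     }
@@ -3223,7 +3232,11 @@ func TestHostNetworkAllowed(t *testing.T) {
     pod.Annotations[kubetypes.ConfigSourceAnnotationKey] = kubetypes.FileSource
 
     kubelet.podManager.SetPods([]*api.Pod{pod})
-    err := kubelet.syncPod(pod, nil, &kubecontainer.PodStatus{}, kubetypes.SyncPodUpdate)
+    err := kubelet.syncPod(syncPodOptions{
+        pod:        pod,
+        podStatus:  &kubecontainer.PodStatus{},
+        updateType: kubetypes.SyncPodUpdate,
+    })
     if err != nil {
         t.Errorf("expected pod infra creation to succeed: %v", err)
     }
@@ -3248,7 +3261,11 @@
     })
     pod.Annotations[kubetypes.ConfigSourceAnnotationKey] = kubetypes.FileSource
 
-    err := kubelet.syncPod(pod, nil, &kubecontainer.PodStatus{}, kubetypes.SyncPodUpdate)
+    err := kubelet.syncPod(syncPodOptions{
+        pod:        pod,
+        podStatus:  &kubecontainer.PodStatus{},
+        updateType: kubetypes.SyncPodUpdate,
+    })
     if err == nil {
         t.Errorf("expected pod infra creation to fail")
     }
@@ -3269,7 +3286,11 @@
     })
 
     kubelet.podManager.SetPods([]*api.Pod{pod})
-    err := kubelet.syncPod(pod, nil, &kubecontainer.PodStatus{}, kubetypes.SyncPodUpdate)
+    err := kubelet.syncPod(syncPodOptions{
+        pod:        pod,
+        podStatus:  &kubecontainer.PodStatus{},
+        updateType: kubetypes.SyncPodUpdate,
+    })
     if err != nil {
         t.Errorf("expected pod infra creation to succeed: %v", err)
     }
@@ -3289,7 +3310,11 @@
         },
     })
 
-    err := kubelet.syncPod(pod, nil, &kubecontainer.PodStatus{}, kubetypes.SyncPodUpdate)
+    err := kubelet.syncPod(syncPodOptions{
+        pod:        pod,
+        podStatus:  &kubecontainer.PodStatus{},
+        updateType: kubetypes.SyncPodUpdate,
+    })
     if err == nil {
         t.Errorf("expected pod infra creation to fail")
     }
@@ -4260,3 +4285,44 @@ func TestGenerateAPIPodStatusInvokesPodSyncHandlers(t *testing.T) {
         t.Fatalf("Expected message %v, but got %v", "because", apiStatus.Message)
     }
 }
+
+func TestSyncPodKillPod(t *testing.T) {
+    testKubelet := newTestKubelet(t)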
+    kl := testKubelet.kubelet
+    pod := &api.Pod{
+        ObjectMeta: api.ObjectMeta{
+            UID:       "12345678",
+            Name:      "bar",
+            Namespace: "foo",
+        },
+    }
+    pods := []*api.Pod{pod}
+    kl.podManager.SetPods(pods)
+    gracePeriodOverride := int64(0)
+    err := kl.syncPod(syncPodOptions{
+        pod:        pod,
+        podStatus:  &kubecontainer.PodStatus{},
+        updateType: kubetypes.SyncPodKill,
+        killPodOptions: &KillPodOptions{
+            PodStatusFunc: func(p *api.Pod, podStatus *kubecontainer.PodStatus) api.PodStatus {
+                return api.PodStatus{
+                    Phase:   api.PodFailed,
+                    Reason:  "reason",
+                    Message: "message",
+                }
+            },
+            PodTerminationGracePeriodSecondsOverride: &gracePeriodOverride,
+        },
+    })
+    if err != nil {
+        t.Errorf("unexpected error: %v", err)
+    }
+    // Check pod status stored in the status map.
+    status, found := kl.statusManager.GetPodStatus(pod.UID)
+    if !found {
+        t.Fatalf("status of pod %q is not found in the status map", pod.UID)
+    }
+    if status.Phase != api.PodFailed {
+        t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
+    }
+}
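The tests above drive syncPod directly; the next file defines the UpdatePodOptions API that asynchronous callers use. A sketch of a fire-and-forget update with a completion callback, mirroring the dispatchWork call site earlier (for non-kill updates the callback may be dropped if the worker is busy):

```go
podWorkers.UpdatePod(&UpdatePodOptions{
	Pod:        pod,
	MirrorPod:  mirrorPod,
	UpdateType: kubetypes.SyncPodUpdate,
	OnCompleteFunc: func(err error) {
		if err != nil {
			glog.Errorf("error syncing pod %q: %v", pod.Name, err)
		}
	},
})
```

diff --git a/pkg/kubelet/pod_workers.go b/pkg/kubelet/pod_workers.go
index 15e7cf668b4..90d1c2c3d84 100644
--- a/pkg/kubelet/pod_workers.go
+++ b/pkg/kubelet/pod_workers.go
@@ -17,6 +17,7 @@ limitations under the License.
 package kubelet
 
 import (
+    "fmt"
     "sync"
     "time"
 
@@ -24,6 +25,7 @@ import (
     "k8s.io/kubernetes/pkg/api"
     "k8s.io/kubernetes/pkg/client/record"
     kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
+    "k8s.io/kubernetes/pkg/kubelet/eviction"
     kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
     "k8s.io/kubernetes/pkg/kubelet/util/queue"
     "k8s.io/kubernetes/pkg/types"
@@ -31,14 +33,62 @@ import (
     "k8s.io/kubernetes/pkg/util/wait"
 )
 
+// OnCompleteFunc is a function that is invoked when an operation completes.
+// If err is non-nil, the operation did not complete successfully.
+type OnCompleteFunc func(err error)
+
+// PodStatusFunc is a function that is invoked to generate a pod status.
+type PodStatusFunc func(pod *api.Pod, podStatus *kubecontainer.PodStatus) api.PodStatus
+
+// KillPodOptions are options when performing a pod update whose update type is kill.
+type KillPodOptions struct {
+    // PodStatusFunc is the function to invoke to set pod status in response to a kill request.
+    PodStatusFunc PodStatusFunc
+    // PodTerminationGracePeriodSecondsOverride is an optional override to use if a pod is being killed as part of a kill operation.
+    PodTerminationGracePeriodSecondsOverride *int64
+}
+
+// UpdatePodOptions is an options struct to pass to an UpdatePod operation.
+type UpdatePodOptions struct {
+    // pod to update
+    Pod *api.Pod
+    // the mirror pod for the pod to update, if it is a static pod
+    MirrorPod *api.Pod
+    // the type of update (create, update, sync, kill)
+    UpdateType kubetypes.SyncPodType
+    // optional callback function when operation completes
+    // this callback is not guaranteed to be invoked since a pod worker may
+    // drop update requests if it was fulfilling a previous request. this is
+    // only guaranteed to be invoked in response to a kill pod request which is
+    // always delivered.
+    OnCompleteFunc OnCompleteFunc
+    // if update type is kill, use the specified options to kill the pod.
+    KillPodOptions *KillPodOptions
+}
+
 // PodWorkers is an abstract interface for testability.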
 type PodWorkers interface {
-    UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateType kubetypes.SyncPodType, updateComplete func())
+    UpdatePod(options *UpdatePodOptions)
     ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty)
     ForgetWorker(uid types.UID)
 }
 
-type syncPodFnType func(*api.Pod, *api.Pod, *kubecontainer.PodStatus, kubetypes.SyncPodType) error
+// syncPodOptions provides the arguments to a SyncPod operation.
+type syncPodOptions struct {
+    // the mirror pod for the pod to sync, if it is a static pod
+    mirrorPod *api.Pod
+    // pod to sync
+    pod *api.Pod
+    // the type of update (create, update, sync)
+    updateType kubetypes.SyncPodType
+    // the current status
+    podStatus *kubecontainer.PodStatus
+    // if update type is kill, use the specified options to kill the pod.
+    killPodOptions *KillPodOptions
+}
+
+// the function to invoke to perform a sync.
+type syncPodFnType func(options syncPodOptions) error
 
 const (
     // jitter factor for resyncInterval
@@ -54,14 +104,14 @@ type podWorkers struct {
     // Tracks all running per-pod goroutines - per-pod goroutine will be
     // processing updates received through its corresponding channel.
-    podUpdates map[types.UID]chan workUpdate
+    podUpdates map[types.UID]chan UpdatePodOptions
     // Track the current state of per-pod goroutines.
     // Currently all update requests for a given pod coming when another
     // update of this pod is being processed are ignored.
     isWorking map[types.UID]bool
     // Tracks the last undelivered work item for this pod - a work item is
     // undelivered if it comes in while the worker is working.
-    lastUndeliveredWorkUpdate map[types.UID]workUpdate
+    lastUndeliveredWorkUpdate map[types.UID]UpdatePodOptions
 
     workQueue queue.WorkQueue
 
@@ -83,26 +133,12 @@ type podWorkers struct {
     podCache kubecontainer.Cache
 }
 
-type workUpdate struct {
-    // The pod state to reflect.
-    pod *api.Pod
-
-    // The mirror pod of pod; nil if it does not exist.
-    mirrorPod *api.Pod
-
-    // Function to call when the update is complete.
-    updateCompleteFn func()
-
-    // A string describing the type of this update, eg: create
-    updateType kubetypes.SyncPodType
-}
-
 func newPodWorkers(syncPodFn syncPodFnType, recorder record.EventRecorder, workQueue queue.WorkQueue,
     resyncInterval, backOffPeriod time.Duration, podCache kubecontainer.Cache) *podWorkers {
     return &podWorkers{
-        podUpdates:                map[types.UID]chan workUpdate{},
+        podUpdates:                map[types.UID]chan UpdatePodOptions{},
         isWorking:                 map[types.UID]bool{},
-        lastUndeliveredWorkUpdate: map[types.UID]workUpdate{},
+        lastUndeliveredWorkUpdate: map[types.UID]UpdatePodOptions{},
         syncPodFn:                 syncPodFn,
         recorder:                  recorder,
         workQueue:                 workQueue,
@@ -112,40 +148,52 @@ func newPodWorkers(syncPodFn syncPodFnType, recorder record.EventRecorder, workQ
     }
 }
 
-func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
+func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
     var lastSyncTime time.Time
-    for newWork := range podUpdates {
+    for update := range podUpdates {
         err := func() error {
-            podID := newWork.pod.UID
+            podUID := update.Pod.UID
             // This is a blocking call that would return only if the cache
             // has an entry for the pod that is newer than minRuntimeCache
             // Time. This ensures the worker doesn't start syncing until
             // after the cache is at least newer than the finished time of
             // the previous sync.
-            status, err := p.podCache.GetNewerThan(podID, lastSyncTime)
+            status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
             if err != nil {
                 return err
             }
-            err = p.syncPodFn(newWork.pod, newWork.mirrorPod, status, newWork.updateType)
+            err = p.syncPodFn(syncPodOptions{
+                mirrorPod:      update.MirrorPod,
+                pod:            update.Pod,
+                podStatus:      status,
+                killPodOptions: update.KillPodOptions,
+                updateType:     update.UpdateType,
+            })
             lastSyncTime = time.Now()
             if err != nil {
                 return err
             }
-            newWork.updateCompleteFn()
             return nil
         }()
-        if err != nil {
-            glog.Errorf("Error syncing pod %s, skipping: %v", newWork.pod.UID, err)
-            p.recorder.Eventf(newWork.pod, api.EventTypeWarning, kubecontainer.FailedSync, "Error syncing pod, skipping: %v", err)
+        // notify the callback function whether the operation succeeded or not
+        if update.OnCompleteFunc != nil {
+            update.OnCompleteFunc(err)
         }
-        p.wrapUp(newWork.pod.UID, err)
+        if err != nil {
+            glog.Errorf("Error syncing pod %s, skipping: %v", update.Pod.UID, err)
+            p.recorder.Eventf(update.Pod, api.EventTypeWarning, kubecontainer.FailedSync, "Error syncing pod, skipping: %v", err)
+        }
+        p.wrapUp(update.Pod.UID, err)
     }
 }
 
-// Apply the new setting to the specified pod. updateComplete is called when the update is completed.
-func (p *podWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateType kubetypes.SyncPodType, updateComplete func()) {
+// Apply the new setting to the specified pod.
+// If the options provide an OnCompleteFunc, the function is invoked if the update is accepted.
+// Update requests are ignored if a kill pod request is pending.
+func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
+    pod := options.Pod
     uid := pod.UID
-    var podUpdates chan workUpdate
+    var podUpdates chan UpdatePodOptions
     var exists bool
 
     p.podLock.Lock()
@@ -155,7 +203,7 @@
         // puts an update into channel is called from the same goroutine where
         // the channel is consumed. However, it is guaranteed that in such case
         // the channel is empty, so buffer of size 1 is enough.
-        podUpdates = make(chan workUpdate, 1)
+        podUpdates = make(chan UpdatePodOptions, 1)
         p.podUpdates[uid] = podUpdates
 
         // Creating a new pod worker either means this is a new pod, or that the
@@ -169,18 +217,12 @@
     }
     if !p.isWorking[pod.UID] {
         p.isWorking[pod.UID] = true
-        podUpdates <- workUpdate{
-            pod:              pod,
-            mirrorPod:        mirrorPod,
-            updateCompleteFn: updateComplete,
-            updateType:       updateType,
-        }
+        podUpdates <- *options
     } else {
-        p.lastUndeliveredWorkUpdate[pod.UID] = workUpdate{
-            pod:              pod,
-            mirrorPod:        mirrorPod,
-            updateCompleteFn: updateComplete,
-            updateType:       updateType,
+        // if a request to kill a pod is pending, we do not let anything overwrite that request.
+        update, found := p.lastUndeliveredWorkUpdate[pod.UID]
+        if !found || update.UpdateType != kubetypes.SyncPodKill {
+            p.lastUndeliveredWorkUpdate[pod.UID] = *options
         }
     }
 }
@@ -236,3 +278,53 @@
         p.isWorking[uid] = false
     }
 }
+
+// killPodNow returns a KillPodFunc that can be used to kill a pod.
+// It is intended to be injected into other modules that need to kill a pod.
+func killPodNow(podWorkers PodWorkers) eviction.KillPodFunc {
+    return func(pod *api.Pod, status api.PodStatus, gracePeriodOverride *int64) error {
+        // determine the grace period to use when killing the pod
+        gracePeriod := int64(0)
+        if gracePeriodOverride != nil {
+            gracePeriod = *gracePeriodOverride
+        } else if pod.Spec.TerminationGracePeriodSeconds != nil {
+            gracePeriod = *pod.Spec.TerminationGracePeriodSeconds
+        }
+
+        // we time out and return an error if we don't get a callback within a reasonable time.
+        // the default timeout is relative to the grace period (we settle on 2s to wait for kubelet->runtime traffic to complete in sigkill)
+        timeout := int64(gracePeriod + (gracePeriod / 2))
+        minTimeout := int64(2)
+        if timeout < minTimeout {
+            timeout = minTimeout
+        }
+        timeoutDuration := time.Duration(timeout) * time.Second
+
+        // open a channel we block against until we get a result
+        type response struct {
+            err error
+        }
+        ch := make(chan response)
+        podWorkers.UpdatePod(&UpdatePodOptions{
+            Pod:        pod,
+            UpdateType: kubetypes.SyncPodKill,
+            OnCompleteFunc: func(err error) {
+                ch <- response{err: err}
+            },
+            KillPodOptions: &KillPodOptions{
+                PodStatusFunc: func(p *api.Pod, podStatus *kubecontainer.PodStatus) api.PodStatus {
+                    return status
+                },
+                PodTerminationGracePeriodSecondsOverride: gracePeriodOverride,
+            },
+        })
+
+        // wait for either a response, or a timeout
+        select {
+        case r := <-ch:
+            return r.err
+        case <-time.After(timeoutDuration):
+            return fmt.Errorf("timeout waiting to kill pod")
+        }
+    }
+}
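killPodNow adapts the asynchronous pod-worker machinery into the blocking KillPodFunc contract. A sketch of the intended wiring (the injection site is an assumption; it is not shown in this diff):

```go
// hand the eviction code a blocking kill function backed by the pod workers
killPodFunc := killPodNow(kl.podWorkers)

// later, a caller blocks until the pod is killed or the call times out
gracePeriodOverride := int64(0)
err := killPodFunc(pod, api.PodStatus{
	Phase:   api.PodFailed,
	Reason:  "Evicted",
	Message: "Pod evicted by the kubelet.",
}, &gracePeriodOverride)
```

The unbuffered response channel pairs with the guarantee that kill requests are always delivered; without that guarantee and the timeout, a dropped OnCompleteFunc would leave the caller blocked forever.

diff --git a/pkg/kubelet/pod_workers_test.go b/pkg/kubelet/pod_workers_test.go
index a81dbb6a439..655c835a55c 100644
--- a/pkg/kubelet/pod_workers_test.go
+++ b/pkg/kubelet/pod_workers_test.go
@@ -40,12 +40,18 @@ type fakePodWorkers struct {
     t TestingInterface
 }
 
-func (f *fakePodWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateType kubetypes.SyncPodType, updateComplete func()) {
-    status, err := f.cache.Get(pod.UID)
+func (f *fakePodWorkers) UpdatePod(options *UpdatePodOptions) {
+    status, err := f.cache.Get(options.Pod.UID)
     if err != nil {
         f.t.Errorf("Unexpected error: %v", err)
     }
-    if err := f.syncPodFn(pod, mirrorPod, status, kubetypes.SyncPodUpdate); err != nil {
+    if err := f.syncPodFn(syncPodOptions{
+        mirrorPod:      options.MirrorPod,
+        pod:            options.Pod,
+        podStatus:      status,
+        updateType:     options.UpdateType,
+        killPodOptions: options.KillPodOptions,
+    }); err != nil {
         f.t.Errorf("Unexpected error: %v", err)
     }
 }
@@ -67,18 +73,28 @@ func newPod(uid, name string) *api.Pod {
     }
 }
 
-func createPodWorkers() (*podWorkers, map[types.UID][]string) {
+// syncPodRecord is a record of a sync pod call
+type syncPodRecord struct {
+    name       string
+    updateType kubetypes.SyncPodType
+}
+
+func createPodWorkers() (*podWorkers, map[types.UID][]syncPodRecord) {
     lock := sync.Mutex{}
-    processed := make(map[types.UID][]string)
+    processed := make(map[types.UID][]syncPodRecord)
     fakeRecorder := &record.FakeRecorder{}
     fakeRuntime := &containertest.FakeRuntime{}
     fakeCache := containertest.NewFakeCache(fakeRuntime)
     podWorkers := newPodWorkers(
-        func(pod *api.Pod, mirrorPod *api.Pod, status *kubecontainer.PodStatus, updateType kubetypes.SyncPodType) error {
+        func(options syncPodOptions) error {
             func() {
                 lock.Lock()
                 defer lock.Unlock()
-                processed[pod.UID] = append(processed[pod.UID], pod.Name)
+                pod := options.pod
+                processed[pod.UID] = append(processed[pod.UID], syncPodRecord{
+                    name:       pod.Name,
+                    updateType: options.updateType,
+                })
             }()
             return nil
         },
@@ -115,12 +131,15 @@ func TestUpdatePod(t *testing.T) {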
     numPods := 20
     for i := 0; i < numPods; i++ {
         for j := i; j < numPods; j++ {
-            podWorkers.UpdatePod(newPod(string(j), string(i)), nil, kubetypes.SyncPodCreate, func() {})
+            podWorkers.UpdatePod(&UpdatePodOptions{
+                Pod:        newPod(string(j), string(i)),
+                UpdateType: kubetypes.SyncPodCreate,
+            })
         }
     }
     drainWorkers(podWorkers, numPods)
 
-    if len(processed) != 20 {
+    if len(processed) != numPods {
         t.Errorf("Not all pods processed: %v", len(processed))
         return
     }
@@ -133,22 +152,65 @@ func TestUpdatePod(t *testing.T) {
         first := 0
         last := len(processed[uid]) - 1
-        if processed[uid][first] != string(0) {
+        if processed[uid][first].name != string(0) {
             t.Errorf("Pod %v: incorrect order %v, %v", i, first, processed[uid][first])
         }
-        if processed[uid][last] != string(i) {
+        if processed[uid][last].name != string(i) {
             t.Errorf("Pod %v: incorrect order %v, %v", i, last, processed[uid][last])
         }
     }
 }
 
+func TestUpdatePodDoesNotForgetSyncPodKill(t *testing.T) {
+    podWorkers, processed := createPodWorkers()
+    numPods := 20
+    for i := 0; i < numPods; i++ {
+        pod := newPod(string(i), string(i))
+        podWorkers.UpdatePod(&UpdatePodOptions{
+            Pod:        pod,
+            UpdateType: kubetypes.SyncPodCreate,
+        })
+        podWorkers.UpdatePod(&UpdatePodOptions{
+            Pod:        pod,
+            UpdateType: kubetypes.SyncPodKill,
+        })
+        podWorkers.UpdatePod(&UpdatePodOptions{
+            Pod:        pod,
+            UpdateType: kubetypes.SyncPodUpdate,
+        })
+    }
+    drainWorkers(podWorkers, numPods)
+    if len(processed) != numPods {
+        t.Errorf("Not all pods processed: %v", len(processed))
+        return
+    }
+    for i := 0; i < numPods; i++ {
+        uid := types.UID(i)
+        // each pod should be processed two times (create, kill, but not update)
+        syncPodRecords := processed[uid]
+        if len(syncPodRecords) < 2 {
+            t.Errorf("Pod %v processed %v times, but expected at least 2", i, len(syncPodRecords))
+            continue
+        }
+        if syncPodRecords[0].updateType != kubetypes.SyncPodCreate {
+            t.Errorf("Pod %v event was %v, but expected %v", i, syncPodRecords[0].updateType, kubetypes.SyncPodCreate)
+        }
+        if syncPodRecords[1].updateType != kubetypes.SyncPodKill {
+            t.Errorf("Pod %v event was %v, but expected %v", i, syncPodRecords[1].updateType, kubetypes.SyncPodKill)
+        }
+    }
+}
+
 func TestForgetNonExistingPodWorkers(t *testing.T) {
     podWorkers, _ := createPodWorkers()
 
     numPods := 20
     for i := 0; i < numPods; i++ {
-        podWorkers.UpdatePod(newPod(string(i), "name"), nil, kubetypes.SyncPodUpdate, func() {})
+        podWorkers.UpdatePod(&UpdatePodOptions{
+            Pod:        newPod(string(i), "name"),
+            UpdateType: kubetypes.SyncPodUpdate,
+        })
     }
     drainWorkers(podWorkers, numPods)
 
@@ -183,13 +245,13 @@ type simpleFakeKubelet struct {
     wg sync.WaitGroup
 }
 
-func (kl *simpleFakeKubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, status *kubecontainer.PodStatus, updateType kubetypes.SyncPodType) error {
-    kl.pod, kl.mirrorPod, kl.podStatus = pod, mirrorPod, status
+func (kl *simpleFakeKubelet) syncPod(options syncPodOptions) error {
+    kl.pod, kl.mirrorPod, kl.podStatus = options.pod, options.mirrorPod, options.podStatus
     return nil
 }
 
-func (kl *simpleFakeKubelet) syncPodWithWaitGroup(pod *api.Pod, mirrorPod *api.Pod, status *kubecontainer.PodStatus, updateType kubetypes.SyncPodType) error {
-    kl.pod, kl.mirrorPod, kl.podStatus = pod, mirrorPod, status
+func (kl *simpleFakeKubelet) syncPodWithWaitGroup(options syncPodOptions) error {
+    kl.pod, kl.mirrorPod, kl.podStatus = options.pod, options.mirrorPod, options.podStatus
     kl.wg.Done()
     return nil
 }
@@ -240,8 +302,16 @@ func TestFakePodWorkers(t *testing.T) {
     for i, tt := range tests {
         kubeletForRealWorkers.wg.Add(1)
-        realPodWorkers.UpdatePod(tt.pod, tt.mirrorPod, kubetypes.SyncPodUpdate, func() {})
-        fakePodWorkers.UpdatePod(tt.pod, tt.mirrorPod, kubetypes.SyncPodUpdate, func() {})
+        realPodWorkers.UpdatePod(&UpdatePodOptions{
+            Pod:        tt.pod,
+            MirrorPod:  tt.mirrorPod,
+            UpdateType: kubetypes.SyncPodUpdate,
+        })
+        fakePodWorkers.UpdatePod(&UpdatePodOptions{
+            Pod:        tt.pod,
+            MirrorPod:  tt.mirrorPod,
+            UpdateType: kubetypes.SyncPodUpdate,
+        })
 
         kubeletForRealWorkers.wg.Wait()
 
@@ -258,3 +328,26 @@ func TestFakePodWorkers(t *testing.T) {
         }
     }
 }
+
+// TestKillPodNowFunc tests that the blocking kill pod function works with pod workers as expected.
+func TestKillPodNowFunc(t *testing.T) {
+    podWorkers, processed := createPodWorkers()
+    killPodFunc := killPodNow(podWorkers)
+    pod := newPod("test", "test")
+    gracePeriodOverride := int64(0)
+    err := killPodFunc(pod, api.PodStatus{Phase: api.PodFailed, Reason: "reason", Message: "message"}, &gracePeriodOverride)
+    if err != nil {
+        t.Errorf("Unexpected error: %v", err)
+    }
+    if len(processed) != 1 {
+        t.Errorf("len(processed) expected: %v, actual: %v", 1, len(processed))
+        return
+    }
+    syncPodRecords := processed[pod.UID]
+    if len(syncPodRecords) != 1 {
+        t.Errorf("Pod processed %v times, but expected %v", len(syncPodRecords), 1)
+    }
+    if syncPodRecords[0].updateType != kubetypes.SyncPodKill {
+        t.Errorf("Pod update type was %v, but expected %v", syncPodRecords[0].updateType, kubetypes.SyncPodKill)
+    }
+}
diff --git a/pkg/kubelet/runonce.go b/pkg/kubelet/runonce.go
index c959dee3b2b..2e14d02c6d1 100644
--- a/pkg/kubelet/runonce.go
+++ b/pkg/kubelet/runonce.go
@@ -123,8 +123,12 @@ func (kl *Kubelet) runPod(pod *api.Pod, retryDelay time.Duration) error {
             glog.Errorf("Failed creating a mirror pod %q: %v", format.Pod(pod), err)
         }
         mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
-
-        if err = kl.syncPod(pod, mirrorPod, status, kubetypes.SyncPodUpdate); err != nil {
+        if err = kl.syncPod(syncPodOptions{
+            pod:        pod,
+            mirrorPod:  mirrorPod,
+            podStatus:  status,
+            updateType: kubetypes.SyncPodUpdate,
+        }); err != nil {
             return fmt.Errorf("error syncing pod %q: %v", format.Pod(pod), err)
         }
         if retry >= runOnceMaxRetries {
diff --git a/pkg/kubelet/types/pod_update.go b/pkg/kubelet/types/pod_update.go
index f93576445f4..88bcb54b32e 100644
--- a/pkg/kubelet/types/pod_update.go
+++ b/pkg/kubelet/types/pod_update.go
@@ -104,9 +104,15 @@ func GetPodSource(pod *api.Pod) (string, error) {
 type SyncPodType int
 
 const (
+    // SyncPodSync is when the pod is synced to ensure desired state
     SyncPodSync SyncPodType = iota
+    // SyncPodUpdate is when the pod is updated from source
     SyncPodUpdate
+    // SyncPodCreate is when the pod is created from source
     SyncPodCreate
+    // SyncPodKill is when the pod is killed based on a trigger internal to the kubelet for eviction.
+    // If a SyncPodKill request is made to pod workers, the request is never dropped, and will always be processed.
+    SyncPodKill
 )
 
 func (sp SyncPodType) String() string {
@@ -117,6 +123,8 @@ func (sp SyncPodType) String() string {
         return "update"
     case SyncPodSync:
         return "sync"
+    case SyncPodKill:
+        return "kill"
     default:
         return "unknown"
     }
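Taken together, the pod worker change and the new SyncPodKill type give kill requests at-least-once delivery. A sketch of the guarantee exercised by TestUpdatePodDoesNotForgetSyncPodKill above (pw and pod are assumed to be in scope):

```go
pw.UpdatePod(&UpdatePodOptions{Pod: pod, UpdateType: kubetypes.SyncPodCreate})
pw.UpdatePod(&UpdatePodOptions{Pod: pod, UpdateType: kubetypes.SyncPodKill})
pw.UpdatePod(&UpdatePodOptions{Pod: pod, UpdateType: kubetypes.SyncPodUpdate})
// the create is delivered immediately; while the worker is busy the kill
// occupies the single undelivered-work slot and cannot be overwritten, so
// the trailing update may be dropped but the kill never is
```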