Merge pull request #25065 from derekwaynecarr/pod_worker_updates

Automatic merge from submit-queue

PodWorkers UpdatePod takes options struct

First commit from https://github.com/kubernetes/kubernetes/pull/24843 

Second commit:
The `PodWorkers.UpdatePod` operation is updated as follows (a call sketch follows the list):
* use an options struct to pass arguments
* add a pod status func that lets callers override the pod status
* add a pod termination grace period override for sync operations that need to kill the pod
* add an error-aware callback
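
A minimal sketch of the new call shape, assuming a caller that already holds `pod`, `mirrorPod`, and a `podWorkers` instance; the struct and field names come from the diff below:

```
// Sketch only: UpdatePodOptions and kubetypes.SyncPodUpdate are defined in the
// pod worker changes below; pod, mirrorPod, and podWorkers are assumed to exist.
podWorkers.UpdatePod(&UpdatePodOptions{
	Pod:        pod,       // pod to sync
	MirrorPod:  mirrorPod, // nil unless the pod is a static pod
	UpdateType: kubetypes.SyncPodUpdate,
	// error-aware callback; err is nil when the sync succeeded
	OnCompleteFunc: func(err error) {
		if err != nil {
			glog.Errorf("error syncing pod %q: %v", pod.Name, err)
		}
	},
})
```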

Third commit:
Add a `killPodNow` function to the kubelet that makes a blocking kill-pod call and integrates properly with the pod workers.

The plan is to pass `killPodNow` as a function pointer into the out-of-resource killer.

```
// KillPodFunc kills a pod.
// The pod status is updated, and then it is killed with the specified grace period.
// This function must block until either the pod is killed or an error is encountered.
// Arguments:
// pod - the pod to kill
// status - the desired status to associate with the pod (i.e. why it's killed)
// gracePeriodOverride - the grace period override to use instead of what is on the pod spec
type KillPodFunc func(pod *api.Pod, status api.PodStatus, gracePeriodOverride *int64) error
```
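
As a rough illustration of the intended wiring (this consumer is not part of this PR, and the `managerImpl` type and its fields here are hypothetical), the out-of-resource killer could take the function as a dependency and block on it when it decides to evict a pod:

```
// Hypothetical consumer sketch; only eviction.KillPodFunc comes from this commit.
type managerImpl struct {
	killPodFunc KillPodFunc // injected by the kubelet, e.g. killPodNow(podWorkers)
}

func (m *managerImpl) evict(pod *api.Pod) error {
	status := api.PodStatus{
		Phase:   api.PodFailed,
		Reason:  "Evicted",
		Message: "The node was low on compute resources.",
	}
	gracePeriod := int64(0) // immediate kill in this sketch
	// blocks until the pod is killed, an error occurs, or the worker times out
	return m.killPodFunc(pod, status, &gracePeriod)
}
```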

You can see it being used in the WIP out-of-resource killer PR:

1344f858fb (diff-92ff0f643237f29824b4929574f84609R277)

/cc @vishh @yujuhong @pmorie
k8s-merge-robot 2016-05-12 19:50:26 -07:00
commit a503bcd78e
7 changed files with 380 additions and 74 deletions

View File

@ -19,6 +19,7 @@ package eviction
import (
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
)
@ -49,3 +50,12 @@ type Threshold struct {
// GracePeriod represents the amount of time that a threshold must be met before eviction is triggered.
GracePeriod time.Duration
}
// KillPodFunc kills a pod.
// The pod status is updated, and then it is killed with the specified grace period.
// This function must block until either the pod is killed or an error is encountered.
// Arguments:
// pod - the pod to kill
// status - the desired status to associate with the pod (i.e. why it's killed)
// gracePeriodOverride - the grace period override to use instead of what is on the pod spec
type KillPodFunc func(pod *api.Pod, status api.PodStatus, gracePeriodOverride *int64) error

View File

@ -1698,7 +1698,30 @@ func (kl *Kubelet) makePodDataDirs(pod *api.Pod) error {
//
// If any step of this workflow errors, the error is returned, and is repeated
// on the next syncPod call.
func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, podStatus *kubecontainer.PodStatus, updateType kubetypes.SyncPodType) error {
func (kl *Kubelet) syncPod(o syncPodOptions) error {
// pull out the required options
pod := o.pod
mirrorPod := o.mirrorPod
podStatus := o.podStatus
updateType := o.updateType
// if we want to kill a pod, do it now!
if updateType == kubetypes.SyncPodKill {
killPodOptions := o.killPodOptions
if killPodOptions == nil || killPodOptions.PodStatusFunc == nil {
return fmt.Errorf("kill pod options are required if update type is kill")
}
apiPodStatus := killPodOptions.PodStatusFunc(pod, podStatus)
kl.statusManager.SetPodStatus(pod, apiPodStatus)
// we kill the pod with the specified grace period since this is a termination
if err := kl.killPod(pod, nil, podStatus, killPodOptions.PodTerminationGracePeriodSecondsOverride); err != nil {
// there was an error killing the pod, so we return that error directly
utilruntime.HandleError(err)
return err
}
return nil
}
// Latency measurements for the main workflow are relative to the
// first time the pod was seen by the API server.
var firstSeenTime time.Time
@ -1733,8 +1756,11 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, podStatus *kubecont
// Kill pod if it should not be running
if err := canRunPod(pod); err != nil || pod.DeletionTimestamp != nil || apiPodStatus.Phase == api.PodFailed {
if err := kl.killPod(pod, nil, podStatus, nil); err != nil {
// there was an error killing the pod, so we return that error directly
utilruntime.HandleError(err)
return err
}
// there was no error killing the pod, but the pod cannot be run, so we return that err (if any)
return err
}
@ -2570,8 +2596,15 @@ func (kl *Kubelet) dispatchWork(pod *api.Pod, syncType kubetypes.SyncPodType, mi
return
}
// Run the sync in an async worker.
kl.podWorkers.UpdatePod(pod, mirrorPod, syncType, func() {
metrics.PodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start))
kl.podWorkers.UpdatePod(&UpdatePodOptions{
Pod: pod,
MirrorPod: mirrorPod,
UpdateType: syncType,
OnCompleteFunc: func(err error) {
if err != nil {
metrics.PodWorkerLatency.WithLabelValues(syncType.String()).Observe(metrics.SinceInMicroseconds(start))
}
},
})
// Note the number of containers for new pods.
if syncType == kubetypes.SyncPodCreate {

View File

@ -3024,7 +3024,11 @@ func TestCreateMirrorPod(t *testing.T) {
pod.Annotations[kubetypes.ConfigSourceAnnotationKey] = "file"
pods := []*api.Pod{pod}
kl.podManager.SetPods(pods)
err := kl.syncPod(pod, nil, &kubecontainer.PodStatus{}, updateType)
err := kl.syncPod(syncPodOptions{
pod: pod,
podStatus: &kubecontainer.PodStatus{},
updateType: updateType,
})
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -3065,7 +3069,12 @@ func TestDeleteOutdatedMirrorPod(t *testing.T) {
pods := []*api.Pod{pod, mirrorPod}
kl.podManager.SetPods(pods)
err := kl.syncPod(pod, mirrorPod, &kubecontainer.PodStatus{}, kubetypes.SyncPodUpdate)
err := kl.syncPod(syncPodOptions{
pod: pod,
mirrorPod: mirrorPod,
podStatus: &kubecontainer.PodStatus{},
updateType: kubetypes.SyncPodUpdate,
})
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -3223,7 +3232,11 @@ func TestHostNetworkAllowed(t *testing.T) {
pod.Annotations[kubetypes.ConfigSourceAnnotationKey] = kubetypes.FileSource
kubelet.podManager.SetPods([]*api.Pod{pod})
err := kubelet.syncPod(pod, nil, &kubecontainer.PodStatus{}, kubetypes.SyncPodUpdate)
err := kubelet.syncPod(syncPodOptions{
pod: pod,
podStatus: &kubecontainer.PodStatus{},
updateType: kubetypes.SyncPodUpdate,
})
if err != nil {
t.Errorf("expected pod infra creation to succeed: %v", err)
}
@ -3248,7 +3261,11 @@ func TestHostNetworkDisallowed(t *testing.T) {
})
pod.Annotations[kubetypes.ConfigSourceAnnotationKey] = kubetypes.FileSource
err := kubelet.syncPod(pod, nil, &kubecontainer.PodStatus{}, kubetypes.SyncPodUpdate)
err := kubelet.syncPod(syncPodOptions{
pod: pod,
podStatus: &kubecontainer.PodStatus{},
updateType: kubetypes.SyncPodUpdate,
})
if err == nil {
t.Errorf("expected pod infra creation to fail")
}
@ -3269,7 +3286,11 @@ func TestPrivilegeContainerAllowed(t *testing.T) {
})
kubelet.podManager.SetPods([]*api.Pod{pod})
err := kubelet.syncPod(pod, nil, &kubecontainer.PodStatus{}, kubetypes.SyncPodUpdate)
err := kubelet.syncPod(syncPodOptions{
pod: pod,
podStatus: &kubecontainer.PodStatus{},
updateType: kubetypes.SyncPodUpdate,
})
if err != nil {
t.Errorf("expected pod infra creation to succeed: %v", err)
}
@ -3289,7 +3310,11 @@ func TestPrivilegeContainerDisallowed(t *testing.T) {
},
})
err := kubelet.syncPod(pod, nil, &kubecontainer.PodStatus{}, kubetypes.SyncPodUpdate)
err := kubelet.syncPod(syncPodOptions{
pod: pod,
podStatus: &kubecontainer.PodStatus{},
updateType: kubetypes.SyncPodUpdate,
})
if err == nil {
t.Errorf("expected pod infra creation to fail")
}
@ -4260,3 +4285,44 @@ func TestGenerateAPIPodStatusInvokesPodSyncHandlers(t *testing.T) {
t.Fatalf("Expected message %v, but got %v", "because", apiStatus.Message)
}
}
func TestSyncPodKillPod(t *testing.T) {
testKubelet := newTestKubelet(t)
kl := testKubelet.kubelet
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
UID: "12345678",
Name: "bar",
Namespace: "foo",
},
}
pods := []*api.Pod{pod}
kl.podManager.SetPods(pods)
gracePeriodOverride := int64(0)
err := kl.syncPod(syncPodOptions{
pod: pod,
podStatus: &kubecontainer.PodStatus{},
updateType: kubetypes.SyncPodKill,
killPodOptions: &KillPodOptions{
PodStatusFunc: func(p *api.Pod, podStatus *kubecontainer.PodStatus) api.PodStatus {
return api.PodStatus{
Phase: api.PodFailed,
Reason: "reason",
Message: "message",
}
},
PodTerminationGracePeriodSecondsOverride: &gracePeriodOverride,
},
})
if err != nil {
t.Errorf("unexpected error: %v", err)
}
// Check pod status stored in the status map.
status, found := kl.statusManager.GetPodStatus(pod.UID)
if !found {
t.Fatalf("status of pod %q is not found in the status map", pod.UID)
}
if status.Phase != api.PodFailed {
t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
}
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package kubelet
import (
"fmt"
"sync"
"time"
@ -24,6 +25,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/record"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/eviction"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/kubelet/util/queue"
"k8s.io/kubernetes/pkg/types"
@ -31,14 +33,62 @@ import (
"k8s.io/kubernetes/pkg/util/wait"
)
// OnCompleteFunc is a function that is invoked when an operation completes.
// If err is non-nil, the operation did not complete successfully.
type OnCompleteFunc func(err error)
// PodStatusFunc is a function that is invoked to generate a pod status.
type PodStatusFunc func(pod *api.Pod, podStatus *kubecontainer.PodStatus) api.PodStatus
// KillPodOptions are options when performing a pod update whose update type is kill.
type KillPodOptions struct {
// PodStatusFunc is the function to invoke to set pod status in response to a kill request.
PodStatusFunc PodStatusFunc
// PodTerminationGracePeriodSecondsOverride is an optional override to use if a pod is being killed as part of a kill operation.
PodTerminationGracePeriodSecondsOverride *int64
}
// UpdatePodOptions is an options struct to pass to a UpdatePod operation.
type UpdatePodOptions struct {
// pod to update
Pod *api.Pod
// the mirror pod for the pod to update, if it is a static pod
MirrorPod *api.Pod
// the type of update (create, update, sync, kill)
UpdateType kubetypes.SyncPodType
// optional callback function when operation completes
// this callback is not guaranteed to be completed since a pod worker may
// drop update requests if it was fulfilling a previous request. this is
// only guaranteed to be invoked in response to a kill pod request which is
// always delivered.
OnCompleteFunc OnCompleteFunc
// if update type is kill, use the specified options to kill the pod.
KillPodOptions *KillPodOptions
}
// PodWorkers is an abstract interface for testability.
type PodWorkers interface {
UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateType kubetypes.SyncPodType, updateComplete func())
UpdatePod(options *UpdatePodOptions)
ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty)
ForgetWorker(uid types.UID)
}
type syncPodFnType func(*api.Pod, *api.Pod, *kubecontainer.PodStatus, kubetypes.SyncPodType) error
// syncPodOptions provides the arguments to a SyncPod operation.
type syncPodOptions struct {
// the mirror pod for the pod to sync, if it is a static pod
mirrorPod *api.Pod
// pod to sync
pod *api.Pod
// the type of update (create, update, sync)
updateType kubetypes.SyncPodType
// the current status
podStatus *kubecontainer.PodStatus
// if update type is kill, use the specified options to kill the pod.
killPodOptions *KillPodOptions
}
// the function to invoke to perform a sync.
type syncPodFnType func(options syncPodOptions) error
const (
// jitter factor for resyncInterval
@ -54,14 +104,14 @@ type podWorkers struct {
// Tracks all running per-pod goroutines - per-pod goroutine will be
// processing updates received through its corresponding channel.
podUpdates map[types.UID]chan workUpdate
podUpdates map[types.UID]chan UpdatePodOptions
// Track the current state of per-pod goroutines.
// Currently all update requests for a given pod that arrive while another
// update of that pod is being processed are ignored.
isWorking map[types.UID]bool
// Tracks the last undelivered work item for this pod - a work item is
// undelivered if it comes in while the worker is working.
lastUndeliveredWorkUpdate map[types.UID]workUpdate
lastUndeliveredWorkUpdate map[types.UID]UpdatePodOptions
workQueue queue.WorkQueue
@ -83,26 +133,12 @@ type podWorkers struct {
podCache kubecontainer.Cache
}
type workUpdate struct {
// The pod state to reflect.
pod *api.Pod
// The mirror pod of pod; nil if it does not exist.
mirrorPod *api.Pod
// Function to call when the update is complete.
updateCompleteFn func()
// A string describing the type of this update, eg: create
updateType kubetypes.SyncPodType
}
func newPodWorkers(syncPodFn syncPodFnType, recorder record.EventRecorder, workQueue queue.WorkQueue,
resyncInterval, backOffPeriod time.Duration, podCache kubecontainer.Cache) *podWorkers {
return &podWorkers{
podUpdates: map[types.UID]chan workUpdate{},
podUpdates: map[types.UID]chan UpdatePodOptions{},
isWorking: map[types.UID]bool{},
lastUndeliveredWorkUpdate: map[types.UID]workUpdate{},
lastUndeliveredWorkUpdate: map[types.UID]UpdatePodOptions{},
syncPodFn: syncPodFn,
recorder: recorder,
workQueue: workQueue,
@ -112,40 +148,52 @@ func newPodWorkers(syncPodFn syncPodFnType, recorder record.EventRecorder, workQ
}
}
func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
func (p *podWorkers) managePodLoop(podUpdates <-chan UpdatePodOptions) {
var lastSyncTime time.Time
for newWork := range podUpdates {
for update := range podUpdates {
err := func() error {
podID := newWork.pod.UID
podUID := update.Pod.UID
// This is a blocking call that would return only if the cache
// has an entry for the pod that is newer than minRuntimeCache
// Time. This ensures the worker doesn't start syncing until
// after the cache is at least newer than the finished time of
// the previous sync.
status, err := p.podCache.GetNewerThan(podID, lastSyncTime)
status, err := p.podCache.GetNewerThan(podUID, lastSyncTime)
if err != nil {
return err
}
err = p.syncPodFn(newWork.pod, newWork.mirrorPod, status, newWork.updateType)
err = p.syncPodFn(syncPodOptions{
mirrorPod: update.MirrorPod,
pod: update.Pod,
podStatus: status,
killPodOptions: update.KillPodOptions,
updateType: update.UpdateType,
})
lastSyncTime = time.Now()
if err != nil {
return err
}
newWork.updateCompleteFn()
return nil
}()
if err != nil {
glog.Errorf("Error syncing pod %s, skipping: %v", newWork.pod.UID, err)
p.recorder.Eventf(newWork.pod, api.EventTypeWarning, kubecontainer.FailedSync, "Error syncing pod, skipping: %v", err)
// notify the call-back function if the operation succeeded or not
if update.OnCompleteFunc != nil {
update.OnCompleteFunc(err)
}
p.wrapUp(newWork.pod.UID, err)
if err != nil {
glog.Errorf("Error syncing pod %s, skipping: %v", update.Pod.UID, err)
p.recorder.Eventf(update.Pod, api.EventTypeWarning, kubecontainer.FailedSync, "Error syncing pod, skipping: %v", err)
}
p.wrapUp(update.Pod.UID, err)
}
}
// Apply the new setting to the specified pod. updateComplete is called when the update is completed.
func (p *podWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateType kubetypes.SyncPodType, updateComplete func()) {
// Apply the new setting to the specified pod.
// If the options provide an OnCompleteFunc, the function is invoked if the update is accepted.
// Update requests are ignored if a kill pod request is pending.
func (p *podWorkers) UpdatePod(options *UpdatePodOptions) {
pod := options.Pod
uid := pod.UID
var podUpdates chan workUpdate
var podUpdates chan UpdatePodOptions
var exists bool
p.podLock.Lock()
@ -155,7 +203,7 @@ func (p *podWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateType kube
// puts an update into channel is called from the same goroutine where
// the channel is consumed. However, it is guaranteed that in such case
// the channel is empty, so buffer of size 1 is enough.
podUpdates = make(chan workUpdate, 1)
podUpdates = make(chan UpdatePodOptions, 1)
p.podUpdates[uid] = podUpdates
// Creating a new pod worker either means this is a new pod, or that the
@ -169,18 +217,12 @@ func (p *podWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateType kube
}
if !p.isWorking[pod.UID] {
p.isWorking[pod.UID] = true
podUpdates <- workUpdate{
pod: pod,
mirrorPod: mirrorPod,
updateCompleteFn: updateComplete,
updateType: updateType,
}
podUpdates <- *options
} else {
p.lastUndeliveredWorkUpdate[pod.UID] = workUpdate{
pod: pod,
mirrorPod: mirrorPod,
updateCompleteFn: updateComplete,
updateType: updateType,
// if a request to kill a pod is pending, we do not let anything overwrite that request.
update, found := p.lastUndeliveredWorkUpdate[pod.UID]
if !found || update.UpdateType != kubetypes.SyncPodKill {
p.lastUndeliveredWorkUpdate[pod.UID] = *options
}
}
}
@ -236,3 +278,53 @@ func (p *podWorkers) checkForUpdates(uid types.UID) {
p.isWorking[uid] = false
}
}
// killPodNow returns a KillPodFunc that can be used to kill a pod.
// It is intended to be injected into other modules that need to kill a pod.
func killPodNow(podWorkers PodWorkers) eviction.KillPodFunc {
return func(pod *api.Pod, status api.PodStatus, gracePeriodOverride *int64) error {
// determine the grace period to use when killing the pod
gracePeriod := int64(0)
if gracePeriodOverride != nil {
gracePeriod = *gracePeriodOverride
} else if pod.Spec.TerminationGracePeriodSeconds != nil {
gracePeriod = *pod.Spec.TerminationGracePeriodSeconds
}
// we time out and return an error if we don't get a callback within a reasonable time.
// the default timeout is relative to the grace period (we settle on 2s to wait for kubelet->runtime traffic to complete in sigkill)
timeout := int64(gracePeriod + (gracePeriod / 2))
minTimeout := int64(2)
if timeout < minTimeout {
timeout = minTimeout
}
timeoutDuration := time.Duration(timeout) * time.Second
// open a channel we block against until we get a result
type response struct {
err error
}
ch := make(chan response)
podWorkers.UpdatePod(&UpdatePodOptions{
Pod: pod,
UpdateType: kubetypes.SyncPodKill,
OnCompleteFunc: func(err error) {
ch <- response{err: err}
},
KillPodOptions: &KillPodOptions{
PodStatusFunc: func(p *api.Pod, podStatus *kubecontainer.PodStatus) api.PodStatus {
return status
},
PodTerminationGracePeriodSecondsOverride: gracePeriodOverride,
},
})
// wait for either a response, or a timeout
select {
case r := <-ch:
return r.err
case <-time.After(timeoutDuration):
return fmt.Errorf("timeout waiting to kill pod")
}
}
}

View File

@ -40,12 +40,18 @@ type fakePodWorkers struct {
t TestingInterface
}
func (f *fakePodWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateType kubetypes.SyncPodType, updateComplete func()) {
status, err := f.cache.Get(pod.UID)
func (f *fakePodWorkers) UpdatePod(options *UpdatePodOptions) {
status, err := f.cache.Get(options.Pod.UID)
if err != nil {
f.t.Errorf("Unexpected error: %v", err)
}
if err := f.syncPodFn(pod, mirrorPod, status, kubetypes.SyncPodUpdate); err != nil {
if err := f.syncPodFn(syncPodOptions{
mirrorPod: options.MirrorPod,
pod: options.Pod,
podStatus: status,
updateType: options.UpdateType,
killPodOptions: options.KillPodOptions,
}); err != nil {
f.t.Errorf("Unexpected error: %v", err)
}
}
@ -67,18 +73,28 @@ func newPod(uid, name string) *api.Pod {
}
}
func createPodWorkers() (*podWorkers, map[types.UID][]string) {
// syncPodRecord is a record of a sync pod call
type syncPodRecord struct {
name string
updateType kubetypes.SyncPodType
}
func createPodWorkers() (*podWorkers, map[types.UID][]syncPodRecord) {
lock := sync.Mutex{}
processed := make(map[types.UID][]string)
processed := make(map[types.UID][]syncPodRecord)
fakeRecorder := &record.FakeRecorder{}
fakeRuntime := &containertest.FakeRuntime{}
fakeCache := containertest.NewFakeCache(fakeRuntime)
podWorkers := newPodWorkers(
func(pod *api.Pod, mirrorPod *api.Pod, status *kubecontainer.PodStatus, updateType kubetypes.SyncPodType) error {
func(options syncPodOptions) error {
func() {
lock.Lock()
defer lock.Unlock()
processed[pod.UID] = append(processed[pod.UID], pod.Name)
pod := options.pod
processed[pod.UID] = append(processed[pod.UID], syncPodRecord{
name: pod.Name,
updateType: options.updateType,
})
}()
return nil
},
@ -115,12 +131,15 @@ func TestUpdatePod(t *testing.T) {
numPods := 20
for i := 0; i < numPods; i++ {
for j := i; j < numPods; j++ {
podWorkers.UpdatePod(newPod(string(j), string(i)), nil, kubetypes.SyncPodCreate, func() {})
podWorkers.UpdatePod(&UpdatePodOptions{
Pod: newPod(string(j), string(i)),
UpdateType: kubetypes.SyncPodCreate,
})
}
}
drainWorkers(podWorkers, numPods)
if len(processed) != 20 {
if len(processed) != numPods {
t.Errorf("Not all pods processed: %v", len(processed))
return
}
@ -133,22 +152,65 @@ func TestUpdatePod(t *testing.T) {
first := 0
last := len(processed[uid]) - 1
if processed[uid][first] != string(0) {
if processed[uid][first].name != string(0) {
t.Errorf("Pod %v: incorrect order %v, %v", i, first, processed[uid][first])
}
if processed[uid][last] != string(i) {
if processed[uid][last].name != string(i) {
t.Errorf("Pod %v: incorrect order %v, %v", i, last, processed[uid][last])
}
}
}
func TestUpdatePodDoesNotForgetSyncPodKill(t *testing.T) {
podWorkers, processed := createPodWorkers()
numPods := 20
for i := 0; i < numPods; i++ {
pod := newPod(string(i), string(i))
podWorkers.UpdatePod(&UpdatePodOptions{
Pod: pod,
UpdateType: kubetypes.SyncPodCreate,
})
podWorkers.UpdatePod(&UpdatePodOptions{
Pod: pod,
UpdateType: kubetypes.SyncPodKill,
})
podWorkers.UpdatePod(&UpdatePodOptions{
Pod: pod,
UpdateType: kubetypes.SyncPodUpdate,
})
}
drainWorkers(podWorkers, numPods)
if len(processed) != numPods {
t.Errorf("Not all pods processed: %v", len(processed))
return
}
for i := 0; i < numPods; i++ {
uid := types.UID(i)
// each pod should be processed two times (create, kill, but not update)
syncPodRecords := processed[uid]
if len(syncPodRecords) < 2 {
t.Errorf("Pod %v processed %v times, but expected at least 2", i, len(syncPodRecords))
continue
}
if syncPodRecords[0].updateType != kubetypes.SyncPodCreate {
t.Errorf("Pod %v event was %v, but expected %v", i, syncPodRecords[0].updateType, kubetypes.SyncPodCreate)
}
if syncPodRecords[1].updateType != kubetypes.SyncPodKill {
t.Errorf("Pod %v event was %v, but expected %v", i, syncPodRecords[1].updateType, kubetypes.SyncPodKill)
}
}
}
func TestForgetNonExistingPodWorkers(t *testing.T) {
podWorkers, _ := createPodWorkers()
numPods := 20
for i := 0; i < numPods; i++ {
podWorkers.UpdatePod(newPod(string(i), "name"), nil, kubetypes.SyncPodUpdate, func() {})
podWorkers.UpdatePod(&UpdatePodOptions{
Pod: newPod(string(i), "name"),
UpdateType: kubetypes.SyncPodUpdate,
})
}
drainWorkers(podWorkers, numPods)
@ -183,13 +245,13 @@ type simpleFakeKubelet struct {
wg sync.WaitGroup
}
func (kl *simpleFakeKubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, status *kubecontainer.PodStatus, updateType kubetypes.SyncPodType) error {
kl.pod, kl.mirrorPod, kl.podStatus = pod, mirrorPod, status
func (kl *simpleFakeKubelet) syncPod(options syncPodOptions) error {
kl.pod, kl.mirrorPod, kl.podStatus = options.pod, options.mirrorPod, options.podStatus
return nil
}
func (kl *simpleFakeKubelet) syncPodWithWaitGroup(pod *api.Pod, mirrorPod *api.Pod, status *kubecontainer.PodStatus, updateType kubetypes.SyncPodType) error {
kl.pod, kl.mirrorPod, kl.podStatus = pod, mirrorPod, status
func (kl *simpleFakeKubelet) syncPodWithWaitGroup(options syncPodOptions) error {
kl.pod, kl.mirrorPod, kl.podStatus = options.pod, options.mirrorPod, options.podStatus
kl.wg.Done()
return nil
}
@ -240,8 +302,16 @@ func TestFakePodWorkers(t *testing.T) {
for i, tt := range tests {
kubeletForRealWorkers.wg.Add(1)
realPodWorkers.UpdatePod(tt.pod, tt.mirrorPod, kubetypes.SyncPodUpdate, func() {})
fakePodWorkers.UpdatePod(tt.pod, tt.mirrorPod, kubetypes.SyncPodUpdate, func() {})
realPodWorkers.UpdatePod(&UpdatePodOptions{
Pod: tt.pod,
MirrorPod: tt.mirrorPod,
UpdateType: kubetypes.SyncPodUpdate,
})
fakePodWorkers.UpdatePod(&UpdatePodOptions{
Pod: tt.pod,
MirrorPod: tt.mirrorPod,
UpdateType: kubetypes.SyncPodUpdate,
})
kubeletForRealWorkers.wg.Wait()
@ -258,3 +328,26 @@ func TestFakePodWorkers(t *testing.T) {
}
}
}
// TestKillPodNowFunc tests that the blocking kill pod function works with pod workers as expected.
func TestKillPodNowFunc(t *testing.T) {
podWorkers, processed := createPodWorkers()
killPodFunc := killPodNow(podWorkers)
pod := newPod("test", "test")
gracePeriodOverride := int64(0)
err := killPodFunc(pod, api.PodStatus{Phase: api.PodFailed, Reason: "reason", Message: "message"}, &gracePeriodOverride)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if len(processed) != 1 {
t.Errorf("len(processed) expected: %v, actual: %v", 1, len(processed))
return
}
syncPodRecords := processed[pod.UID]
if len(syncPodRecords) != 1 {
t.Errorf("Pod processed %v times, but expected %v", len(syncPodRecords), 1)
}
if syncPodRecords[0].updateType != kubetypes.SyncPodKill {
t.Errorf("Pod update type was %v, but expected %v", syncPodRecords[0].updateType, kubetypes.SyncPodKill)
}
}

View File

@ -123,8 +123,12 @@ func (kl *Kubelet) runPod(pod *api.Pod, retryDelay time.Duration) error {
glog.Errorf("Failed creating a mirror pod %q: %v", format.Pod(pod), err)
}
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
if err = kl.syncPod(pod, mirrorPod, status, kubetypes.SyncPodUpdate); err != nil {
if err = kl.syncPod(syncPodOptions{
pod: pod,
mirrorPod: mirrorPod,
podStatus: status,
updateType: kubetypes.SyncPodUpdate,
}); err != nil {
return fmt.Errorf("error syncing pod %q: %v", format.Pod(pod), err)
}
if retry >= runOnceMaxRetries {

View File

@ -104,9 +104,15 @@ func GetPodSource(pod *api.Pod) (string, error) {
type SyncPodType int
const (
// SyncPodSync is when the pod is synced to ensure desired state
SyncPodSync SyncPodType = iota
// SyncPodUpdate is when the pod is updated from source
SyncPodUpdate
// SyncPodCreate is when the pod is created from source
SyncPodCreate
// SyncPodKill is when the pod is killed based on a trigger internal to the kubelet for eviction.
// If a SyncPodKill request is made to pod workers, the request is never dropped, and will always be processed.
SyncPodKill
)
func (sp SyncPodType) String() string {
@ -117,6 +123,8 @@ func (sp SyncPodType) String() string {
return "update"
case SyncPodSync:
return "sync"
case SyncPodKill:
return "kill"
default:
return "unknown"
}