Merge pull request #107900 from smarterclayton/pr-107854

kubelet: Pods that have terminated before starting should not block startup
Kubernetes Prow Robot 2022-02-02 15:51:45 -08:00 committed by GitHub
commit baad1caee9
3 changed files with 938 additions and 58 deletions
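The behavioral change at the heart of this PR is that allowPodStart now reports two results: whether the pod can start right now, and whether it can ever start. What follows is a minimal, self-contained sketch of that contract, using placeholder names rather than the kubelet's real podWorkers types:

package main

import "fmt"

// startDecision mirrors the (canStart, canEverStart) pair returned by the
// updated allowPodStart: a pod may be deferred and retried later, or rejected
// permanently because it was terminated before it ever started.
type startDecision struct {
	canStart     bool
	canEverStart bool
}

// handleUpdate is a simplified stand-in for the pod worker loop and shows the
// three outcomes the real managePodLoop now distinguishes.
func handleUpdate(d startDecision) string {
	switch {
	case !d.canEverStart:
		return "clean up and exit the worker loop"
	case !d.canStart:
		return "wait for the next event and retry"
	default:
		return "start the pod"
	}
}

func main() {
	fmt.Println(handleUpdate(startDecision{canStart: false, canEverStart: false}))
	fmt.Println(handleUpdate(startDecision{canStart: false, canEverStart: true}))
	fmt.Println(handleUpdate(startDecision{canStart: true, canEverStart: true}))
}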


@ -392,6 +392,11 @@ type podWorkers struct {
syncTerminatingPodFn syncTerminatingPodFnType
syncTerminatedPodFn syncTerminatedPodFnType
// workerChannelFn is exposed for testing to allow unit tests to impose delays
// in channel communication. The function is invoked once each time a new worker
// goroutine starts.
workerChannelFn func(uid types.UID, in chan podWork) (out <-chan podWork)
// The EventRecorder to use
recorder record.EventRecorder
@ -699,9 +704,8 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
}
// start the pod worker goroutine if it doesn't exist
var podUpdates chan podWork
var exists bool
if podUpdates, exists = p.podUpdates[uid]; !exists {
podUpdates, exists := p.podUpdates[uid]
if !exists {
// We need to have a buffer here, because checkForUpdates() method that
// puts an update into channel is called from the same goroutine where
// the channel is consumed. However, it is guaranteed that in such case
@ -715,13 +719,21 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
append(p.waitingToStartStaticPodsByFullname[status.fullname], uid)
}
// allow testing of delays in the pod update channel
var outCh <-chan podWork
if p.workerChannelFn != nil {
outCh = p.workerChannelFn(uid, podUpdates)
} else {
outCh = podUpdates
}
// Creating a new pod worker either means this is a new pod, or that the
// kubelet just restarted. In either case the kubelet is willing to believe
// the status of the pod for the first pod worker sync. See corresponding
// comment in syncPod.
go func() {
defer runtime.HandleCrash()
p.managePodLoop(podUpdates)
p.managePodLoop(outCh)
}()
}
@ -785,28 +797,31 @@ func calculateEffectiveGracePeriod(status *podSyncStatus, pod *v1.Pod, options *
}
// allowPodStart tries to start the pod and returns true if allowed, otherwise
// it requeues the pod and returns false.
func (p *podWorkers) allowPodStart(pod *v1.Pod) bool {
// it requeues the pod and returns false. If the pod will never be able to start
// because data is missing, or the pod was terminated before start, canEverStart
// is false.
func (p *podWorkers) allowPodStart(pod *v1.Pod) (canStart bool, canEverStart bool) {
if !kubetypes.IsStaticPod(pod) {
// TBD: Do we want to allow non-static pods with the same full name?
// TODO: Do we want to allow non-static pods with the same full name?
// Note that it may disable the force deletion of pods.
return true
return true, true
}
p.podLock.Lock()
defer p.podLock.Unlock()
status, ok := p.podSyncStatuses[pod.UID]
if !ok {
klog.ErrorS(nil, "Failed to get a valid podSyncStatuses", "pod", klog.KObj(pod), "podUID", pod.UID)
p.workQueue.Enqueue(pod.UID, wait.Jitter(p.backOffPeriod, workerBackOffPeriodJitterFactor))
status.working = false
return false
klog.ErrorS(nil, "Pod sync status does not exist, the worker should not be running", "pod", klog.KObj(pod), "podUID", pod.UID)
return false, false
}
if status.IsTerminationRequested() {
return false, false
}
if !p.allowStaticPodStart(status.fullname, pod.UID) {
p.workQueue.Enqueue(pod.UID, wait.Jitter(p.backOffPeriod, workerBackOffPeriodJitterFactor))
status.working = false
return false
return false, true
}
return true
return true, true
}
// allowStaticPodStart tries to start the static pod and returns true if
@ -819,9 +834,12 @@ func (p *podWorkers) allowStaticPodStart(fullname string, uid types.UID) bool {
}
waitingPods := p.waitingToStartStaticPodsByFullname[fullname]
// TODO: This is O(N) with respect to the number of updates to static pods
// with overlapping full names, and ideally would be O(1).
for i, waitingUID := range waitingPods {
// has pod already terminated or been deleted?
if _, ok := p.podSyncStatuses[waitingUID]; !ok {
status, ok := p.podSyncStatuses[waitingUID]
if !ok || status.IsTerminationRequested() || status.IsTerminated() {
continue
}
// another pod is next in line
@ -847,8 +865,20 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan podWork) {
var podStarted bool
for update := range podUpdates {
pod := update.Options.Pod
// Decide whether to start the pod. If the pod was terminated prior to the pod being allowed
// to start, we have to clean it up and then exit the pod worker loop.
if !podStarted {
if !p.allowPodStart(pod) {
canStart, canEverStart := p.allowPodStart(pod)
if !canEverStart {
p.completeUnstartedTerminated(pod)
if start := update.Options.StartTime; !start.IsZero() {
metrics.PodWorkerDuration.WithLabelValues("terminated").Observe(metrics.SinceInSeconds(start))
}
klog.V(4).InfoS("Processing pod event done", "pod", klog.KObj(pod), "podUID", pod.UID, "updateType", update.WorkType)
return
}
if !canStart {
klog.V(4).InfoS("Pod cannot start yet", "pod", klog.KObj(pod), "podUID", pod.UID)
continue
}
@ -1027,12 +1057,7 @@ func (p *podWorkers) completeTerminatingRuntimePod(pod *v1.Pod) {
}
}
ch, ok := p.podUpdates[pod.UID]
if ok {
close(ch)
}
delete(p.podUpdates, pod.UID)
delete(p.lastUndeliveredWorkUpdate, pod.UID)
p.cleanupPodUpdates(pod.UID)
}
// completeTerminated is invoked after syncTerminatedPod completes successfully and means we
@ -1043,12 +1068,7 @@ func (p *podWorkers) completeTerminated(pod *v1.Pod) {
klog.V(4).InfoS("Pod is complete and the worker can now stop", "pod", klog.KObj(pod), "podUID", pod.UID)
ch, ok := p.podUpdates[pod.UID]
if ok {
close(ch)
}
delete(p.podUpdates, pod.UID)
delete(p.lastUndeliveredWorkUpdate, pod.UID)
p.cleanupPodUpdates(pod.UID)
if status, ok := p.podSyncStatuses[pod.UID]; ok {
if status.terminatingAt.IsZero() {
@ -1066,6 +1086,33 @@ func (p *podWorkers) completeTerminated(pod *v1.Pod) {
}
}
// completeUnstartedTerminated is invoked if a pod that has never been started receives a termination
// signal before it can be started.
func (p *podWorkers) completeUnstartedTerminated(pod *v1.Pod) {
p.podLock.Lock()
defer p.podLock.Unlock()
klog.V(4).InfoS("Pod never started and the worker can now stop", "pod", klog.KObj(pod), "podUID", pod.UID)
p.cleanupPodUpdates(pod.UID)
if status, ok := p.podSyncStatuses[pod.UID]; ok {
if status.terminatingAt.IsZero() {
klog.V(4).InfoS("Pod worker is complete but did not have terminatingAt set, likely programmer error", "pod", klog.KObj(pod), "podUID", pod.UID)
}
if !status.terminatedAt.IsZero() {
klog.V(4).InfoS("Pod worker is complete and had terminatedAt set, likely programmer error", "pod", klog.KObj(pod), "podUID", pod.UID)
}
status.finished = true
status.working = false
status.terminatedAt = time.Now()
if p.startedStaticPodsByFullname[status.fullname] == pod.UID {
delete(p.startedStaticPodsByFullname, status.fullname)
}
}
}
// completeWork requeues on error or the next sync interval and then immediately executes any pending
// work.
func (p *podWorkers) completeWork(pod *v1.Pod, syncErr error) {
@ -1150,10 +1197,10 @@ func (p *podWorkers) SyncKnownPods(desiredPods []*v1.Pod) map[types.UID]PodWorke
return workers
}
// removeTerminatedWorker cleans up and removes the worker status for a worker that
// has reached a terminal state of "finished" - has successfully exited
// syncTerminatedPod. This "forgets" a pod by UID and allows another pod to be recreated
// with the same UID.
// removeTerminatedWorker cleans up and removes the worker status for a worker
// that has reached a terminal state of "finished" - has successfully exited
// syncTerminatedPod. This "forgets" a pod by UID and allows another pod to be
// recreated with the same UID.
func (p *podWorkers) removeTerminatedWorker(uid types.UID) {
status, ok := p.podSyncStatuses[uid]
if !ok {
@ -1162,11 +1209,6 @@ func (p *podWorkers) removeTerminatedWorker(uid types.UID) {
return
}
if startedUID, started := p.startedStaticPodsByFullname[status.fullname]; started && startedUID != uid {
klog.V(4).InfoS("Pod cannot start yet but is no longer known to the kubelet, finish it", "podUID", uid)
status.finished = true
}
if !status.finished {
klog.V(4).InfoS("Pod worker has been requested for removal but is still not fully terminated", "podUID", uid)
return
@ -1178,8 +1220,7 @@ func (p *podWorkers) removeTerminatedWorker(uid types.UID) {
klog.V(4).InfoS("Pod has been terminated and is no longer known to the kubelet, remove all history", "podUID", uid)
}
delete(p.podSyncStatuses, uid)
delete(p.podUpdates, uid)
delete(p.lastUndeliveredWorkUpdate, uid)
p.cleanupPodUpdates(uid)
if p.startedStaticPodsByFullname[status.fullname] == uid {
delete(p.startedStaticPodsByFullname, status.fullname)
@ -1230,3 +1271,15 @@ func killPodNow(podWorkers PodWorkers, recorder record.EventRecorder) eviction.K
}
}
}
// cleanupPodUpdates closes the podUpdates channel and removes it from the
// podUpdates map so that the corresponding pod worker can stop. It also
// removes any undelivered work. This method must be called holding the
// pod lock.
func (p *podWorkers) cleanupPodUpdates(uid types.UID) {
if ch, ok := p.podUpdates[uid]; ok {
close(ch)
}
delete(p.podUpdates, uid)
delete(p.lastUndeliveredWorkUpdate, uid)
}
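cleanupPodUpdates stops the worker indirectly: closing a Go channel terminates the for-range loop in the goroutine that consumes it, which is how managePodLoop exits. Here is a standalone sketch of that mechanism (illustrative names only, not kubelet code):

package main

import (
	"fmt"
	"sync"
)

func main() {
	updates := make(chan string, 1)
	var wg sync.WaitGroup

	// The consumer plays the role of managePodLoop: it processes work until
	// the channel is closed, then returns.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for work := range updates {
			fmt.Println("processing:", work)
		}
		fmt.Println("channel closed, worker exits")
	}()

	updates <- "sync"
	close(updates) // analogous to cleanupPodUpdates closing podUpdates[uid]
	wg.Wait()
}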


@ -18,17 +18,20 @@ package kubelet
import (
"context"
"flag"
"reflect"
"strconv"
"sync"
"testing"
"time"
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
@ -164,6 +167,22 @@ func newStaticPod(uid, name string) *v1.Pod {
}
}
func newNamedPod(uid, namespace, name string, isStatic bool) *v1.Pod {
pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: types.UID(uid),
Namespace: namespace,
Name: name,
},
}
if isStatic {
pod.Annotations = map[string]string{
kubetypes.ConfigSourceAnnotationKey: kubetypes.FileSource,
}
}
return pod
}
// syncPodRecord is a record of a sync pod call
type syncPodRecord struct {
name string
@ -172,12 +191,63 @@ type syncPodRecord struct {
terminated bool
}
type FakeQueueItem struct {
UID types.UID
Delay time.Duration
}
type fakeQueue struct {
lock sync.Mutex
queue []FakeQueueItem
currentStart int
}
func (q *fakeQueue) Empty() bool {
q.lock.Lock()
defer q.lock.Unlock()
return (len(q.queue) - q.currentStart) == 0
}
func (q *fakeQueue) Items() []FakeQueueItem {
q.lock.Lock()
defer q.lock.Unlock()
return append(make([]FakeQueueItem, 0, len(q.queue)), q.queue...)
}
func (q *fakeQueue) Set() sets.String {
q.lock.Lock()
defer q.lock.Unlock()
work := sets.NewString()
for _, item := range q.queue[q.currentStart:] {
work.Insert(string(item.UID))
}
return work
}
func (q *fakeQueue) Enqueue(uid types.UID, delay time.Duration) {
q.lock.Lock()
defer q.lock.Unlock()
q.queue = append(q.queue, FakeQueueItem{UID: uid, Delay: delay})
}
func (q *fakeQueue) GetWork() []types.UID {
q.lock.Lock()
defer q.lock.Unlock()
work := make([]types.UID, 0, len(q.queue)-q.currentStart)
for _, item := range q.queue[q.currentStart:] {
work = append(work, item.UID)
}
q.currentStart = len(q.queue)
return work
}
func createPodWorkers() (*podWorkers, map[types.UID][]syncPodRecord) {
lock := sync.Mutex{}
processed := make(map[types.UID][]syncPodRecord)
fakeRecorder := &record.FakeRecorder{}
fakeRuntime := &containertest.FakeRuntime{}
fakeCache := containertest.NewFakeCache(fakeRuntime)
fakeQueue := &fakeQueue{}
w := newPodWorkers(
func(ctx context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) error {
func() {
@ -215,9 +285,9 @@ func createPodWorkers() (*podWorkers, map[types.UID][]syncPodRecord) {
return nil
},
fakeRecorder,
queue.NewBasicWorkQueue(&clock.RealClock{}),
time.Second,
fakeQueue,
time.Second,
time.Millisecond,
fakeCache,
)
return w.(*podWorkers), processed
@ -241,6 +311,31 @@ func drainWorkers(podWorkers *podWorkers, numPods int) {
}
}
func drainWorkersExcept(podWorkers *podWorkers, uids ...types.UID) {
set := sets.NewString()
for _, uid := range uids {
set.Insert(string(uid))
}
for {
stillWorking := false
podWorkers.podLock.Lock()
for k, v := range podWorkers.podSyncStatuses {
if set.Has(string(k)) {
continue
}
if v.working {
stillWorking = true
break
}
}
podWorkers.podLock.Unlock()
if !stillWorking {
break
}
time.Sleep(50 * time.Millisecond)
}
}
func drainAllWorkers(podWorkers *podWorkers) {
for {
stillWorking := false
@ -427,6 +522,473 @@ func TestUpdatePodDoesNotForgetSyncPodKill(t *testing.T) {
}
}
func newUIDSet(uids ...types.UID) sets.String {
set := sets.NewString()
for _, uid := range uids {
set.Insert(string(uid))
}
return set
}
func init() {
klog.InitFlags(nil)
flag.Lookup("v").Value.Set("5")
}
func TestStaticPodExclusion(t *testing.T) {
podWorkers, processed := createPodWorkers()
var channels WorkChannel
podWorkers.workerChannelFn = channels.Intercept
testPod := newNamedPod("2-static", "test1", "pod1", true)
if !kubetypes.IsStaticPod(testPod) {
t.Fatalf("unable to test static pod")
}
// start two pods with the same name, one static, one apiserver
podWorkers.UpdatePod(UpdatePodOptions{
Pod: newNamedPod("1-normal", "test1", "pod1", false),
UpdateType: kubetypes.SyncPodUpdate,
})
podWorkers.UpdatePod(UpdatePodOptions{
Pod: newNamedPod("2-static", "test1", "pod1", true),
UpdateType: kubetypes.SyncPodUpdate,
})
drainAllWorkers(podWorkers)
// should observe both pods running
pod1 := podWorkers.podSyncStatuses[types.UID("1-normal")]
if pod1.IsTerminated() {
t.Fatalf("unexpected pod state: %#v", pod1)
}
pod2 := podWorkers.podSyncStatuses[types.UID("2-static")]
if pod2.IsTerminated() {
t.Fatalf("unexpected pod state: %#v", pod2)
}
if len(processed) != 2 {
t.Fatalf("unexpected synced pods: %#v", processed)
}
if e, a :=
[]syncPodRecord{{name: "pod1", updateType: kubetypes.SyncPodUpdate}},
processed[types.UID("2-static")]; !reflect.DeepEqual(e, a) {
t.Fatalf("unexpected sync pod calls: %s", cmp.Diff(e, a))
}
if e, a := map[string]types.UID{"pod1_test1": "2-static"}, podWorkers.startedStaticPodsByFullname; !reflect.DeepEqual(e, a) {
t.Fatalf("unexpected started static pods: %s", cmp.Diff(e, a))
}
// attempt to start a second and third static pod, which should not start
podWorkers.UpdatePod(UpdatePodOptions{
Pod: newNamedPod("3-static", "test1", "pod1", true),
UpdateType: kubetypes.SyncPodUpdate,
})
podWorkers.UpdatePod(UpdatePodOptions{
Pod: newNamedPod("4-static", "test1", "pod1", true),
UpdateType: kubetypes.SyncPodUpdate,
})
drainAllWorkers(podWorkers)
// should observe both pods running but last pod shouldn't have synced
pod1 = podWorkers.podSyncStatuses[types.UID("1-normal")]
if pod1.IsTerminated() {
t.Fatalf("unexpected pod state: %#v", pod1)
}
pod2 = podWorkers.podSyncStatuses[types.UID("2-static")]
if pod2.IsTerminated() {
t.Fatalf("unexpected pod state: %#v", pod2)
}
pod3 := podWorkers.podSyncStatuses[types.UID("3-static")]
if pod3.IsTerminated() {
t.Fatalf("unexpected pod state: %#v", pod3)
}
pod4 := podWorkers.podSyncStatuses[types.UID("4-static")]
if pod4.IsTerminated() {
t.Fatalf("unexpected pod state: %#v", pod4)
}
if len(processed) != 2 {
t.Fatalf("unexpected synced pods: %#v", processed)
}
if expected, actual :=
[]syncPodRecord{{name: "pod1", updateType: kubetypes.SyncPodUpdate}},
processed[types.UID("2-static")]; !reflect.DeepEqual(expected, actual) {
t.Fatalf("unexpected sync pod calls: %s", cmp.Diff(expected, actual))
}
if expected, actual :=
[]syncPodRecord(nil),
processed[types.UID("3-static")]; !reflect.DeepEqual(expected, actual) {
t.Fatalf("unexpected sync pod calls: %s", cmp.Diff(expected, actual))
}
if expected, actual :=
[]syncPodRecord(nil),
processed[types.UID("4-static")]; !reflect.DeepEqual(expected, actual) {
t.Fatalf("unexpected sync pod calls: %s", cmp.Diff(expected, actual))
}
if e, a := map[string]types.UID{"pod1_test1": "2-static"}, podWorkers.startedStaticPodsByFullname; !reflect.DeepEqual(e, a) {
t.Fatalf("unexpected started static pods: %s", cmp.Diff(e, a))
}
if e, a := map[string][]types.UID{"pod1_test1": {"3-static", "4-static"}}, podWorkers.waitingToStartStaticPodsByFullname; !reflect.DeepEqual(e, a) {
t.Fatalf("unexpected waiting static pods: %s", cmp.Diff(e, a))
}
// verify all are enqueued
if e, a := sets.NewString("1-normal", "2-static", "4-static", "3-static"), podWorkers.workQueue.(*fakeQueue).Set(); !e.Equal(a) {
t.Fatalf("unexpected queued items: %s", cmp.Diff(e, a))
}
// send a basic update for 3-static
podWorkers.workQueue.GetWork()
podWorkers.UpdatePod(UpdatePodOptions{
Pod: newNamedPod("3-static", "test1", "pod1", true),
UpdateType: kubetypes.SyncPodUpdate,
})
drainAllWorkers(podWorkers)
// 3-static should not be started because 2-static is still running
if e, a := map[string]types.UID{"pod1_test1": "2-static"}, podWorkers.startedStaticPodsByFullname; !reflect.DeepEqual(e, a) {
t.Fatalf("unexpected started static pods: %s", cmp.Diff(e, a))
}
if e, a := map[string][]types.UID{"pod1_test1": {"3-static", "4-static"}}, podWorkers.waitingToStartStaticPodsByFullname; !reflect.DeepEqual(e, a) {
t.Fatalf("unexpected waiting static pods: %s", cmp.Diff(e, a))
}
// the queue should include a single item for 3-static (indicating we need to retry later)
if e, a := sets.NewString("3-static"), newUIDSet(podWorkers.workQueue.GetWork()...); !reflect.DeepEqual(e, a) {
t.Fatalf("unexpected queued items: %s", cmp.Diff(e, a))
}
// mark 3-static as deleted while 2-static is still running
podWorkers.workQueue.GetWork()
podWorkers.UpdatePod(UpdatePodOptions{
Pod: newNamedPod("3-static", "test1", "pod1", true),
UpdateType: kubetypes.SyncPodKill,
})
drainAllWorkers(podWorkers)
// should observe 3-static as terminated because it never started; the remaining state should be unchanged
pod3 = podWorkers.podSyncStatuses[types.UID("3-static")]
if !pod3.IsTerminated() {
t.Fatalf("unexpected pod state: %#v", pod3)
}
// the queue should be empty because the worker is now done
if e, a := sets.NewString(), newUIDSet(podWorkers.workQueue.GetWork()...); !reflect.DeepEqual(e, a) {
t.Fatalf("unexpected queued items: %s", cmp.Diff(e, a))
}
// 2-static is still running
if e, a := map[string]types.UID{"pod1_test1": "2-static"}, podWorkers.startedStaticPodsByFullname; !reflect.DeepEqual(e, a) {
t.Fatalf("unexpected started static pods: %s", cmp.Diff(e, a))
}
// 3-static and 4-static are both still queued
if e, a := map[string][]types.UID{"pod1_test1": {"3-static", "4-static"}}, podWorkers.waitingToStartStaticPodsByFullname; !reflect.DeepEqual(e, a) {
t.Fatalf("unexpected waiting static pods: %s", cmp.Diff(e, a))
}
// terminate 2-static
podWorkers.UpdatePod(UpdatePodOptions{
Pod: newNamedPod("2-static", "test1", "pod1", true),
UpdateType: kubetypes.SyncPodKill,
})
drainAllWorkers(podWorkers)
// should observe 2-static as terminated, and 2-static should no longer be reported as the started static pod
pod2 = podWorkers.podSyncStatuses[types.UID("2-static")]
if !pod2.IsTerminated() {
t.Fatalf("unexpected pod state: %#v", pod3)
}
if e, a := map[string]types.UID{}, podWorkers.startedStaticPodsByFullname; !reflect.DeepEqual(e, a) {
t.Fatalf("unexpected started static pods: %s", cmp.Diff(e, a))
}
if e, a := map[string][]types.UID{"pod1_test1": {"3-static", "4-static"}}, podWorkers.waitingToStartStaticPodsByFullname; !reflect.DeepEqual(e, a) {
t.Fatalf("unexpected waiting static pods: %s", cmp.Diff(e, a))
}
// simulate a periodic event from the work queue for 4-static
podWorkers.UpdatePod(UpdatePodOptions{
Pod: newNamedPod("4-static", "test1", "pod1", true),
UpdateType: kubetypes.SyncPodUpdate,
})
drainAllWorkers(podWorkers)
// 4-static should be started because 3-static has already terminated
pod4 = podWorkers.podSyncStatuses[types.UID("4-static")]
if pod4.IsTerminated() {
t.Fatalf("unexpected pod state: %#v", pod3)
}
if e, a := map[string]types.UID{"pod1_test1": "4-static"}, podWorkers.startedStaticPodsByFullname; !reflect.DeepEqual(e, a) {
t.Fatalf("unexpected started static pods: %s", cmp.Diff(e, a))
}
if e, a := map[string][]types.UID{}, podWorkers.waitingToStartStaticPodsByFullname; !reflect.DeepEqual(e, a) {
t.Fatalf("unexpected waiting static pods: %s", cmp.Diff(e, a))
}
// initiate a sync with all pods remaining
state := podWorkers.SyncKnownPods([]*v1.Pod{
newNamedPod("1-normal", "test1", "pod1", false),
newNamedPod("2-static", "test1", "pod1", true),
newNamedPod("3-static", "test1", "pod1", true),
newNamedPod("4-static", "test1", "pod1", true),
})
drainAllWorkers(podWorkers)
// 2-static and 3-static should both be listed as terminated
if e, a := map[types.UID]PodWorkerState{
"1-normal": SyncPod,
"2-static": TerminatedPod,
"3-static": TerminatedPod,
"4-static": SyncPod,
}, state; !reflect.DeepEqual(e, a) {
t.Fatalf("unexpected actual state: %s", cmp.Diff(e, a))
}
// 3-static is still in the config, it should still be in our status
if status, ok := podWorkers.podSyncStatuses["3-static"]; !ok || status.terminatedAt.IsZero() || !status.finished || status.working {
t.Fatalf("unexpected post termination status: %#v", status)
}
// initiate a sync with 3-static removed
state = podWorkers.SyncKnownPods([]*v1.Pod{
newNamedPod("1-normal", "test1", "pod1", false),
newNamedPod("2-static", "test1", "pod1", true),
newNamedPod("4-static", "test1", "pod1", true),
})
drainAllWorkers(podWorkers)
// expect sync to put 3-static into final state and remove the status
if e, a := map[types.UID]PodWorkerState{
"1-normal": SyncPod,
"2-static": TerminatedPod,
"3-static": TerminatedPod,
"4-static": SyncPod,
}, state; !reflect.DeepEqual(e, a) {
t.Fatalf("unexpected actual state: %s", cmp.Diff(e, a))
}
if status, ok := podWorkers.podSyncStatuses["3-static"]; ok {
t.Fatalf("unexpected post termination status: %#v", status)
}
// start a static pod, kill it, then add another one, but ensure the pod worker
// for pod 5 doesn't see the kill event (so it remains waiting to start)
podWorkers.UpdatePod(UpdatePodOptions{
Pod: newNamedPod("5-static", "test1", "pod1", true),
UpdateType: kubetypes.SyncPodUpdate,
})
channels.Channel("5-static").Hold()
podWorkers.UpdatePod(UpdatePodOptions{
Pod: newNamedPod("5-static", "test1", "pod1", true),
UpdateType: kubetypes.SyncPodKill,
})
podWorkers.UpdatePod(UpdatePodOptions{
Pod: newNamedPod("6-static", "test1", "pod1", true),
UpdateType: kubetypes.SyncPodUpdate,
})
drainWorkersExcept(podWorkers, "5-static")
// pod 5 should have termination requested, but hasn't cleaned up
pod5 := podWorkers.podSyncStatuses[types.UID("5-static")]
if !pod5.IsTerminationRequested() || pod5.IsTerminated() {
t.Fatalf("unexpected status for pod 5: %#v", pod5)
}
if e, a := map[string]types.UID{"pod1_test1": "4-static"}, podWorkers.startedStaticPodsByFullname; !reflect.DeepEqual(e, a) {
t.Fatalf("unexpected started static pods: %s", cmp.Diff(e, a))
}
if e, a := map[string][]types.UID{"pod1_test1": {"5-static", "6-static"}}, podWorkers.waitingToStartStaticPodsByFullname; !reflect.DeepEqual(e, a) {
t.Fatalf("unexpected waiting static pods: %s", cmp.Diff(e, a))
}
// terminate 4-static and wake 6-static
podWorkers.UpdatePod(UpdatePodOptions{
Pod: newNamedPod("4-static", "test1", "pod1", true),
UpdateType: kubetypes.SyncPodKill,
})
drainWorkersExcept(podWorkers, "5-static")
podWorkers.UpdatePod(UpdatePodOptions{
Pod: newNamedPod("6-static", "test1", "pod1", true),
UpdateType: kubetypes.SyncPodUpdate,
})
drainWorkersExcept(podWorkers, "5-static")
// 5-static should still be waiting, 6-static should have started and synced
pod5 = podWorkers.podSyncStatuses[types.UID("5-static")]
if !pod5.IsTerminationRequested() || pod5.IsTerminated() {
t.Fatalf("unexpected status for pod 5: %#v", pod5)
}
if e, a := map[string]types.UID{"pod1_test1": "6-static"}, podWorkers.startedStaticPodsByFullname; !reflect.DeepEqual(e, a) {
t.Fatalf("unexpected started static pods: %s", cmp.Diff(e, a))
}
// no static pods should be waiting
if e, a := map[string][]types.UID{}, podWorkers.waitingToStartStaticPodsByFullname; !reflect.DeepEqual(e, a) {
t.Fatalf("unexpected waiting static pods: %s", cmp.Diff(e, a))
}
// prove 6-static synced
if expected, actual :=
[]syncPodRecord{{name: "pod1", updateType: kubetypes.SyncPodUpdate}},
processed[types.UID("6-static")]; !reflect.DeepEqual(expected, actual) {
t.Fatalf("unexpected sync pod calls: %s", cmp.Diff(expected, actual))
}
// ensure 5-static exits when we deliver the event out of order
channels.Channel("5-static").Release()
drainAllWorkers(podWorkers)
pod5 = podWorkers.podSyncStatuses[types.UID("5-static")]
if !pod5.IsTerminated() {
t.Fatalf("unexpected status for pod 5: %#v", pod5)
}
// start three more static pods, kill the previous static pod blocking start,
// and simulate the second pod of three (8) getting to run first
podWorkers.UpdatePod(UpdatePodOptions{
Pod: newNamedPod("7-static", "test1", "pod1", true),
UpdateType: kubetypes.SyncPodUpdate,
})
podWorkers.UpdatePod(UpdatePodOptions{
Pod: newNamedPod("8-static", "test1", "pod1", true),
UpdateType: kubetypes.SyncPodUpdate,
})
podWorkers.UpdatePod(UpdatePodOptions{
Pod: newNamedPod("9-static", "test1", "pod1", true),
UpdateType: kubetypes.SyncPodUpdate,
})
drainAllWorkers(podWorkers)
podWorkers.UpdatePod(UpdatePodOptions{
Pod: newNamedPod("6-static", "test1", "pod1", true),
UpdateType: kubetypes.SyncPodKill,
})
drainAllWorkers(podWorkers)
podWorkers.UpdatePod(UpdatePodOptions{
Pod: newNamedPod("8-static", "test1", "pod1", true),
UpdateType: kubetypes.SyncPodUpdate,
})
drainAllWorkers(podWorkers)
// 7 and 8 should both be waiting still with no syncs
if e, a := map[string]types.UID{}, podWorkers.startedStaticPodsByFullname; !reflect.DeepEqual(e, a) {
t.Fatalf("unexpected started static pods: %s", cmp.Diff(e, a))
}
// only 7-static can start now, but it hasn't received an event
if e, a := map[string][]types.UID{"pod1_test1": {"7-static", "8-static", "9-static"}}, podWorkers.waitingToStartStaticPodsByFullname; !reflect.DeepEqual(e, a) {
t.Fatalf("unexpected waiting static pods: %s", cmp.Diff(e, a))
}
// none of the new pods have synced
if expected, actual :=
[]syncPodRecord(nil),
processed[types.UID("7-static")]; !reflect.DeepEqual(expected, actual) {
t.Fatalf("unexpected sync pod calls: %s", cmp.Diff(expected, actual))
}
if expected, actual :=
[]syncPodRecord(nil),
processed[types.UID("8-static")]; !reflect.DeepEqual(expected, actual) {
t.Fatalf("unexpected sync pod calls: %s", cmp.Diff(expected, actual))
}
if expected, actual :=
[]syncPodRecord(nil),
processed[types.UID("9-static")]; !reflect.DeepEqual(expected, actual) {
t.Fatalf("unexpected sync pod calls: %s", cmp.Diff(expected, actual))
}
// terminate 7-static and wake 8-static
podWorkers.UpdatePod(UpdatePodOptions{
Pod: newNamedPod("7-static", "test1", "pod1", true),
UpdateType: kubetypes.SyncPodKill,
})
drainAllWorkers(podWorkers)
podWorkers.UpdatePod(UpdatePodOptions{
Pod: newNamedPod("8-static", "test1", "pod1", true),
UpdateType: kubetypes.SyncPodUpdate,
})
drainAllWorkers(podWorkers)
// 8 should have synced
if expected, actual :=
[]syncPodRecord{{name: "pod1", updateType: kubetypes.SyncPodUpdate}},
processed[types.UID("8-static")]; !reflect.DeepEqual(expected, actual) {
t.Fatalf("unexpected sync pod calls: %s", cmp.Diff(expected, actual))
}
}
type WorkChannelItem struct {
out chan podWork
lock sync.Mutex
pause bool
queue []podWork
}
func (item *WorkChannelItem) Handle(work podWork) {
item.lock.Lock()
defer item.lock.Unlock()
if item.pause {
item.queue = append(item.queue, work)
return
}
item.out <- work
}
func (item *WorkChannelItem) Hold() {
item.lock.Lock()
defer item.lock.Unlock()
item.pause = true
}
func (item *WorkChannelItem) Close() {
item.lock.Lock()
defer item.lock.Unlock()
if item.out != nil {
close(item.out)
item.out = nil
}
}
// Release blocks until all queued work is passed on the channel
func (item *WorkChannelItem) Release() {
item.lock.Lock()
defer item.lock.Unlock()
item.pause = false
for _, work := range item.queue {
item.out <- work
}
item.queue = nil
}
// WorkChannel intercepts podWork channels between the pod worker and its child
// goroutines and allows tests to pause or release the flow of podWork to the
// workers.
type WorkChannel struct {
lock sync.Mutex
channels map[types.UID]*WorkChannelItem
}
func (w *WorkChannel) Channel(uid types.UID) *WorkChannelItem {
w.lock.Lock()
defer w.lock.Unlock()
if w.channels == nil {
w.channels = make(map[types.UID]*WorkChannelItem)
}
channel, ok := w.channels[uid]
if !ok {
channel = &WorkChannelItem{
out: make(chan podWork, 1),
}
w.channels[uid] = channel
}
return channel
}
func (w *WorkChannel) Intercept(uid types.UID, ch chan podWork) (outCh <-chan podWork) {
channel := w.Channel(uid)
w.lock.Lock()
defer w.lock.Unlock()
go func() {
defer func() {
channel.Close()
w.lock.Lock()
defer w.lock.Unlock()
delete(w.channels, uid)
}()
for w := range ch {
channel.Handle(w)
}
}()
return channel.out
}
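WorkChannel and WorkChannelItem implement a pause-and-replay relay between channels, which is what lets the tests above hold work for one pod while other updates are delivered. Here is a self-contained sketch of the same pattern (the pausableRelay type below is invented for illustration and is not part of the kubelet test code):

package main

import (
	"fmt"
	"sync"
)

// pausableRelay forwards values from an input channel to out, but can buffer
// them while paused and replay them in order on release, mirroring the idea
// behind WorkChannelItem's Hold and Release.
type pausableRelay struct {
	mu     sync.Mutex
	paused bool
	queue  []string
	out    chan string
}

func (r *pausableRelay) handle(v string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.paused {
		r.queue = append(r.queue, v)
		return
	}
	r.out <- v
}

func (r *pausableRelay) hold() {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.paused = true
}

func (r *pausableRelay) release() {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.paused = false
	for _, v := range r.queue {
		r.out <- v
	}
	r.queue = nil
}

func main() {
	in := make(chan string, 4)
	relay := &pausableRelay{out: make(chan string, 4)}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Forward everything from in through the relay, then close out so
		// the drain loop below can finish.
		for v := range in {
			relay.handle(v)
		}
		close(relay.out)
	}()

	relay.hold()
	in <- "update"
	in <- "kill"
	relay.release() // replay any buffered work in order
	close(in)
	wg.Wait()

	for v := range relay.out {
		fmt.Println("worker received:", v)
	}
}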
func TestSyncKnownPods(t *testing.T) {
podWorkers, _ := createPodWorkers()
@ -570,6 +1132,70 @@ func TestSyncKnownPods(t *testing.T) {
}
}
func Test_removeTerminatedWorker(t *testing.T) {
podUID := types.UID("pod-uid")
testCases := []struct {
desc string
podSyncStatus *podSyncStatus
startedStaticPodsByFullname map[string]types.UID
waitingToStartStaticPodsByFullname map[string][]types.UID
removed bool
}{
{
desc: "finished worker",
podSyncStatus: &podSyncStatus{
finished: true,
},
removed: true,
},
{
desc: "waiting to start worker because of another started pod with the same fullname",
podSyncStatus: &podSyncStatus{
finished: false,
fullname: "fake-fullname",
},
startedStaticPodsByFullname: map[string]types.UID{
"fake-fullname": "another-pod-uid",
},
waitingToStartStaticPodsByFullname: map[string][]types.UID{
"fake-fullname": {podUID},
},
removed: false,
},
{
desc: "not yet started worker",
podSyncStatus: &podSyncStatus{
finished: false,
fullname: "fake-fullname",
},
startedStaticPodsByFullname: make(map[string]types.UID),
waitingToStartStaticPodsByFullname: map[string][]types.UID{
"fake-fullname": {podUID},
},
removed: false,
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
podWorkers, _ := createPodWorkers()
podWorkers.podSyncStatuses[podUID] = tc.podSyncStatus
podWorkers.startedStaticPodsByFullname = tc.startedStaticPodsByFullname
podWorkers.waitingToStartStaticPodsByFullname = tc.waitingToStartStaticPodsByFullname
podWorkers.removeTerminatedWorker(podUID)
_, exists := podWorkers.podSyncStatuses[podUID]
if tc.removed && exists {
t.Errorf("Expected pod worker to be removed")
}
if !tc.removed && !exists {
t.Errorf("Expected pod worker to not be removed")
}
})
}
}
type simpleFakeKubelet struct {
pod *v1.Pod
mirrorPod *v1.Pod
@ -702,10 +1328,14 @@ func Test_allowPodStart(t *testing.T) {
podSyncStatuses map[types.UID]*podSyncStatus
startedStaticPodsByFullname map[string]types.UID
waitingToStartStaticPodsByFullname map[string][]types.UID
expectedStartedStaticPodsByFullname map[string]types.UID
expectedWaitingToStartStaticPodsByFullname map[string][]types.UID
allowed bool
allowedEver bool
}{
{
// TBD: Do we want to allow non-static pods with the same full name?
// TODO: Do we want to allow non-static pods with the same full name?
// Note that it may disable the force deletion of pods.
desc: "non-static pod",
pod: newPod("uid-0", "test"),
@ -718,9 +1348,10 @@ func Test_allowPodStart(t *testing.T) {
},
},
allowed: true,
allowedEver: true,
},
{
// TBD: Do we want to allow a non-static pod with the same full name
// TODO: Do we want to allow a non-static pod with the same full name
// as the started static pod?
desc: "non-static pod when there is a started static pod with the same full name",
pod: newPod("uid-0", "test"),
@ -735,10 +1366,14 @@ func Test_allowPodStart(t *testing.T) {
startedStaticPodsByFullname: map[string]types.UID{
"test_": types.UID("uid-1"),
},
expectedStartedStaticPodsByFullname: map[string]types.UID{
"test_": types.UID("uid-1"),
},
allowed: true,
allowedEver: true,
},
{
// TBD: Do we want to allow a static pod with the same full name as the
// TODO: Do we want to allow a static pod with the same full name as the
// started non-static pod?
desc: "static pod when there is a started non-static pod with the same full name",
pod: newPod("uid-0", "test"),
@ -750,10 +1385,8 @@ func Test_allowPodStart(t *testing.T) {
fullname: "test_",
},
},
startedStaticPodsByFullname: map[string]types.UID{
"test_": types.UID("uid-1"),
},
allowed: true,
allowedEver: true,
},
{
desc: "static pod when there are no started static pods with the same full name",
@ -769,7 +1402,12 @@ func Test_allowPodStart(t *testing.T) {
startedStaticPodsByFullname: map[string]types.UID{
"bar_": types.UID("uid-1"),
},
expectedStartedStaticPodsByFullname: map[string]types.UID{
"foo_": types.UID("uid-0"),
"bar_": types.UID("uid-1"),
},
allowed: true,
allowedEver: true,
},
{
desc: "static pod when there is a started static pod with the same full name",
@ -785,7 +1423,11 @@ func Test_allowPodStart(t *testing.T) {
startedStaticPodsByFullname: map[string]types.UID{
"foo_": types.UID("uid-1"),
},
expectedStartedStaticPodsByFullname: map[string]types.UID{
"foo_": types.UID("uid-1"),
},
allowed: false,
allowedEver: true,
},
{
desc: "static pod if the static pod has already started",
@ -798,7 +1440,11 @@ func Test_allowPodStart(t *testing.T) {
startedStaticPodsByFullname: map[string]types.UID{
"foo_": types.UID("uid-0"),
},
expectedStartedStaticPodsByFullname: map[string]types.UID{
"foo_": types.UID("uid-0"),
},
allowed: true,
allowedEver: true,
},
{
desc: "static pod if the static pod is the first pod waiting to start",
@ -813,7 +1459,12 @@ func Test_allowPodStart(t *testing.T) {
types.UID("uid-0"),
},
},
expectedStartedStaticPodsByFullname: map[string]types.UID{
"foo_": types.UID("uid-0"),
},
expectedWaitingToStartStaticPodsByFullname: make(map[string][]types.UID),
allowed: true,
allowedEver: true,
},
{
desc: "static pod if the static pod is not the first pod waiting to start",
@ -832,7 +1483,15 @@ func Test_allowPodStart(t *testing.T) {
types.UID("uid-0"),
},
},
expectedStartedStaticPodsByFullname: make(map[string]types.UID),
expectedWaitingToStartStaticPodsByFullname: map[string][]types.UID{
"foo_": {
types.UID("uid-1"),
types.UID("uid-0"),
},
},
allowed: false,
allowedEver: true,
},
{
desc: "static pod if the static pod is the first valid pod waiting to start / clean up until picking the first valid pod",
@ -854,7 +1513,113 @@ func Test_allowPodStart(t *testing.T) {
types.UID("uid-1"),
},
},
expectedStartedStaticPodsByFullname: map[string]types.UID{
"foo_": types.UID("uid-0"),
},
expectedWaitingToStartStaticPodsByFullname: map[string][]types.UID{
"foo_": {
types.UID("uid-1"),
},
},
allowed: true,
allowedEver: true,
},
{
desc: "static pod if the static pod is the first pod that is not termination requested and waiting to start",
pod: newStaticPod("uid-0", "foo"),
podSyncStatuses: map[types.UID]*podSyncStatus{
"uid-0": {
fullname: "foo_",
},
"uid-1": {
fullname: "foo_",
},
"uid-2": {
fullname: "foo_",
terminatingAt: time.Now(),
},
"uid-3": {
fullname: "foo_",
terminatedAt: time.Now(),
},
},
waitingToStartStaticPodsByFullname: map[string][]types.UID{
"foo_": {
types.UID("uid-2"),
types.UID("uid-3"),
types.UID("uid-0"),
types.UID("uid-1"),
},
},
expectedStartedStaticPodsByFullname: map[string]types.UID{
"foo_": types.UID("uid-0"),
},
expectedWaitingToStartStaticPodsByFullname: map[string][]types.UID{
"foo_": {
types.UID("uid-1"),
},
},
allowed: true,
allowedEver: true,
},
{
desc: "static pod if there is no sync status for the pod should be denied",
pod: newStaticPod("uid-0", "foo"),
podSyncStatuses: map[types.UID]*podSyncStatus{
"uid-1": {
fullname: "foo_",
},
"uid-2": {
fullname: "foo_",
terminatingAt: time.Now(),
},
"uid-3": {
fullname: "foo_",
terminatedAt: time.Now(),
},
},
waitingToStartStaticPodsByFullname: map[string][]types.UID{
"foo_": {
types.UID("uid-1"),
},
},
expectedStartedStaticPodsByFullname: map[string]types.UID{},
expectedWaitingToStartStaticPodsByFullname: map[string][]types.UID{
"foo_": {
types.UID("uid-1"),
},
},
allowed: false,
allowedEver: false,
},
{
desc: "static pod if the static pod is terminated should not be allowed",
pod: newStaticPod("uid-0", "foo"),
podSyncStatuses: map[types.UID]*podSyncStatus{
"uid-0": {
fullname: "foo_",
terminatingAt: time.Now(),
},
},
waitingToStartStaticPodsByFullname: map[string][]types.UID{
"foo_": {
types.UID("uid-2"),
types.UID("uid-3"),
types.UID("uid-0"),
types.UID("uid-1"),
},
},
expectedStartedStaticPodsByFullname: map[string]types.UID{},
expectedWaitingToStartStaticPodsByFullname: map[string][]types.UID{
"foo_": {
types.UID("uid-2"),
types.UID("uid-3"),
types.UID("uid-0"),
types.UID("uid-1"),
},
},
allowed: false,
allowedEver: false,
},
}
@ -870,13 +1635,46 @@ func Test_allowPodStart(t *testing.T) {
if tc.waitingToStartStaticPodsByFullname != nil {
podWorkers.waitingToStartStaticPodsByFullname = tc.waitingToStartStaticPodsByFullname
}
if podWorkers.allowPodStart(tc.pod) != tc.allowed {
allowed, allowedEver := podWorkers.allowPodStart(tc.pod)
if allowed != tc.allowed {
if tc.allowed {
t.Errorf("Pod should be allowed")
} else {
t.Errorf("Pod should not be allowed")
}
}
if allowedEver != tc.allowedEver {
if tc.allowedEver {
t.Errorf("Pod should be allowed ever")
} else {
t.Errorf("Pod should not be allowed ever")
}
}
// compare unless both the actual and expected maps are empty (nil and empty are treated as equal)
if len(podWorkers.startedStaticPodsByFullname) != 0 ||
len(podWorkers.startedStaticPodsByFullname) != len(tc.expectedStartedStaticPodsByFullname) {
if !reflect.DeepEqual(
podWorkers.startedStaticPodsByFullname,
tc.expectedStartedStaticPodsByFullname) {
t.Errorf("startedStaticPodsByFullname: expected %v, got %v",
tc.expectedStartedStaticPodsByFullname,
podWorkers.startedStaticPodsByFullname)
}
}
// compare unless both the actual and expected maps are empty (nil and empty are treated as equal)
if len(podWorkers.waitingToStartStaticPodsByFullname) != 0 ||
len(podWorkers.waitingToStartStaticPodsByFullname) != len(tc.expectedWaitingToStartStaticPodsByFullname) {
if !reflect.DeepEqual(
podWorkers.waitingToStartStaticPodsByFullname,
tc.expectedWaitingToStartStaticPodsByFullname) {
t.Errorf("waitingToStartStaticPodsByFullname: expected %v, got %v",
tc.expectedWaitingToStartStaticPodsByFullname,
podWorkers.waitingToStartStaticPodsByFullname)
}
}
})
}
}


@ -100,6 +100,35 @@ var _ = SIGDescribe("MirrorPodWithGracePeriod", func() {
framework.ExpectEqual(pod.Spec.Containers[0].Image, image)
})
ginkgo.It("should update a static pod when the static pod is updated multiple times during the graceful termination period [NodeConformance]", func() {
ginkgo.By("get mirror pod uid")
pod, err := f.ClientSet.CoreV1().Pods(ns).Get(context.TODO(), mirrorPodName, metav1.GetOptions{})
framework.ExpectNoError(err)
uid := pod.UID
ginkgo.By("update the pod manifest multiple times during the graceful termination period")
for i := 0; i < 300; i++ {
err = createStaticPod(podPath, staticPodName, ns,
fmt.Sprintf("image-%d", i), v1.RestartPolicyAlways)
framework.ExpectNoError(err)
time.Sleep(100 * time.Millisecond)
}
image := imageutils.GetPauseImageName()
err = createStaticPod(podPath, staticPodName, ns, image, v1.RestartPolicyAlways)
framework.ExpectNoError(err)
ginkgo.By("wait for the mirror pod to be updated")
gomega.Eventually(func() error {
return checkMirrorPodRecreatedAndRunning(f.ClientSet, mirrorPodName, ns, uid)
}, 2*time.Minute, time.Second*4).Should(gomega.BeNil())
ginkgo.By("check the mirror pod container image is updated")
pod, err = f.ClientSet.CoreV1().Pods(ns).Get(context.TODO(), mirrorPodName, metav1.GetOptions{})
framework.ExpectNoError(err)
framework.ExpectEqual(len(pod.Spec.Containers), 1)
framework.ExpectEqual(pod.Spec.Containers[0].Image, image)
})
ginkgo.AfterEach(func() {
ginkgo.By("delete the static pod")
err := deleteStaticPod(podPath, staticPodName, ns)