Ensure there is one running static pod with the same full name

This commit is contained in:
Gunju Kim 2021-09-01 23:33:14 +09:00
parent 0f2c94d842
commit 3bce245279
No known key found for this signature in database
GPG Key ID: AA73776FF098A5BD
2 changed files with 320 additions and 30 deletions

View File

@ -246,6 +246,8 @@ type podSyncStatus struct {
cancelFn context.CancelFunc
// working is true if a pod worker is currently in a sync method.
working bool
// fullname of the pod
fullname string
// syncedAt is the time at which the pod worker first observed this pod.
syncedAt time.Time
@ -375,9 +377,10 @@ type podWorkers struct {
// Tracks by UID the termination status of a pod - syncing, terminating,
// terminated, and evicted.
podSyncStatuses map[types.UID]*podSyncStatus
// Tracks when a static pod is being killed and is removed when the
// static pod transitions to the killed state.
terminatingStaticPodFullnames map[string]struct{}
// Tracks all uids for started static pods by full name
startedStaticPodsByFullname map[string]types.UID
// Tracks all uids for static pods that are waiting to start by full name
waitingToStartStaticPodsByFullname map[string][]types.UID
workQueue queue.WorkQueue
@ -412,18 +415,19 @@ func newPodWorkers(
podCache kubecontainer.Cache,
) PodWorkers {
return &podWorkers{
podSyncStatuses: map[types.UID]*podSyncStatus{},
podUpdates: map[types.UID]chan podWork{},
lastUndeliveredWorkUpdate: map[types.UID]podWork{},
terminatingStaticPodFullnames: map[string]struct{}{},
syncPodFn: syncPodFn,
syncTerminatingPodFn: syncTerminatingPodFn,
syncTerminatedPodFn: syncTerminatedPodFn,
recorder: recorder,
workQueue: workQueue,
resyncInterval: resyncInterval,
backOffPeriod: backOffPeriod,
podCache: podCache,
podSyncStatuses: map[types.UID]*podSyncStatus{},
podUpdates: map[types.UID]chan podWork{},
lastUndeliveredWorkUpdate: map[types.UID]podWork{},
startedStaticPodsByFullname: map[string]types.UID{},
waitingToStartStaticPodsByFullname: map[string][]types.UID{},
syncPodFn: syncPodFn,
syncTerminatingPodFn: syncTerminatingPodFn,
syncTerminatedPodFn: syncTerminatedPodFn,
recorder: recorder,
workQueue: workQueue,
resyncInterval: resyncInterval,
backOffPeriod: backOffPeriod,
podCache: podCache,
}
}
@ -497,8 +501,19 @@ func (p *podWorkers) ShouldPodContentBeRemoved(uid types.UID) bool {
func (p *podWorkers) IsPodForMirrorPodTerminatingByFullName(podFullName string) bool {
p.podLock.Lock()
defer p.podLock.Unlock()
_, ok := p.terminatingStaticPodFullnames[podFullName]
return ok
uid, started := p.startedStaticPodsByFullname[podFullName]
if !started {
return false
}
status, exists := p.podSyncStatuses[uid]
if !exists {
return false
}
if !status.IsTerminationRequested() || status.IsTerminated() {
return false
}
return true
}
func isPodStatusCacheTerminal(status *kubecontainer.PodStatus) bool {
@ -551,6 +566,7 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
klog.V(4).InfoS("Pod is being synced for the first time", "pod", klog.KObj(pod), "podUID", pod.UID)
status = &podSyncStatus{
syncedAt: now,
fullname: kubecontainer.GetPodFullName(pod),
}
// if this pod is being synced for the first time, we need to make sure it is an active pod
if !isRuntimePod && (pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded) {
@ -663,11 +679,6 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
// will never be zero.
options.KillPodOptions.PodTerminationGracePeriodSecondsOverride = &gracePeriod
// if a static pod comes through, start tracking it explicitly (cleared by the pod worker loop)
if kubetypes.IsStaticPod(pod) {
p.terminatingStaticPodFullnames[kubecontainer.GetPodFullName(pod)] = struct{}{}
}
default:
workType = SyncPodWork
@ -697,6 +708,12 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
podUpdates = make(chan podWork, 1)
p.podUpdates[uid] = podUpdates
// ensure that static pods start in the order they are received by UpdatePod
if kubetypes.IsStaticPod(pod) {
p.waitingToStartStaticPodsByFullname[status.fullname] =
append(p.waitingToStartStaticPodsByFullname[status.fullname], uid)
}
// Creating a new pod worker either means this is a new pod, or that the
// kubelet just restarted. In either case the kubelet is willing to believe
// the status of the pod for the first pod worker sync. See corresponding
@ -766,10 +783,76 @@ func calculateEffectiveGracePeriod(status *podSyncStatus, pod *v1.Pod, options *
return gracePeriod, status.gracePeriod != 0 && status.gracePeriod != gracePeriod
}
// allowPodStart tries to start the pod and returns true if allowed, otherwise
// it requeues the pod and returns false.
func (p *podWorkers) allowPodStart(pod *v1.Pod) bool {
if !kubetypes.IsStaticPod(pod) {
// TBD: Do we want to allow non-static pods with the same full name?
// Note that it may disable the force deletion of pods.
return true
}
p.podLock.Lock()
defer p.podLock.Unlock()
status, ok := p.podSyncStatuses[pod.UID]
if !ok {
klog.ErrorS(nil, "Failed to get a valid podSyncStatuses", "pod", klog.KObj(pod), "podUID", pod.UID)
p.workQueue.Enqueue(pod.UID, wait.Jitter(p.backOffPeriod, workerBackOffPeriodJitterFactor))
status.working = false
return false
}
if !p.allowStaticPodStart(status.fullname, pod.UID) {
p.workQueue.Enqueue(pod.UID, wait.Jitter(p.backOffPeriod, workerBackOffPeriodJitterFactor))
status.working = false
return false
}
return true
}
// allowStaticPodStart tries to start the static pod and returns true if
// 1. there are no other started static pods with the same fullname
// 2. the uid matches that of the first valid static pod waiting to start
func (p *podWorkers) allowStaticPodStart(fullname string, uid types.UID) bool {
startedUID, started := p.startedStaticPodsByFullname[fullname]
if started {
return startedUID == uid
}
waitingPods := p.waitingToStartStaticPodsByFullname[fullname]
for i, waitingUID := range waitingPods {
// has pod already terminated or been deleted?
if _, ok := p.podSyncStatuses[waitingUID]; !ok {
continue
}
// another pod is next in line
if waitingUID != uid {
p.waitingToStartStaticPodsByFullname[fullname] = waitingPods[i:]
return false
}
// we are up next, remove ourselves
waitingPods = waitingPods[i+1:]
break
}
if len(waitingPods) != 0 {
p.waitingToStartStaticPodsByFullname[fullname] = waitingPods
} else {
delete(p.waitingToStartStaticPodsByFullname, fullname)
}
p.startedStaticPodsByFullname[fullname] = uid
return true
}
func (p *podWorkers) managePodLoop(podUpdates <-chan podWork) {
var lastSyncTime time.Time
var podStarted bool
for update := range podUpdates {
pod := update.Options.Pod
if !podStarted {
if !p.allowPodStart(pod) {
klog.V(4).InfoS("Pod cannot start yet", "pod", klog.KObj(pod), "podUID", pod.UID)
continue
}
podStarted = true
}
klog.V(4).InfoS("Processing pod event", "pod", klog.KObj(pod), "podUID", pod.UID, "updateType", update.WorkType)
err := func() error {
@ -899,9 +982,6 @@ func (p *podWorkers) completeTerminating(pod *v1.Pod) {
klog.V(4).InfoS("Pod terminated all containers successfully", "pod", klog.KObj(pod), "podUID", pod.UID)
// if a static pod is being tracked, forget it
delete(p.terminatingStaticPodFullnames, kubecontainer.GetPodFullName(pod))
if status, ok := p.podSyncStatuses[pod.UID]; ok {
if status.terminatingAt.IsZero() {
klog.V(4).InfoS("Pod worker was terminated but did not have terminatingAt set, likely programmer error", "pod", klog.KObj(pod), "podUID", pod.UID)
@ -933,9 +1013,6 @@ func (p *podWorkers) completeTerminatingRuntimePod(pod *v1.Pod) {
klog.V(4).InfoS("Pod terminated all orphaned containers successfully and worker can now stop", "pod", klog.KObj(pod), "podUID", pod.UID)
// if a static pod is being tracked, forget it
delete(p.terminatingStaticPodFullnames, kubecontainer.GetPodFullName(pod))
if status, ok := p.podSyncStatuses[pod.UID]; ok {
if status.terminatingAt.IsZero() {
klog.V(4).InfoS("Pod worker was terminated but did not have terminatingAt set, likely programmer error", "pod", klog.KObj(pod), "podUID", pod.UID)
@ -943,6 +1020,10 @@ func (p *podWorkers) completeTerminatingRuntimePod(pod *v1.Pod) {
status.terminatedAt = time.Now()
status.finished = true
status.working = false
if p.startedStaticPodsByFullname[status.fullname] == pod.UID {
delete(p.startedStaticPodsByFullname, status.fullname)
}
}
ch, ok := p.podUpdates[pod.UID]
@ -951,7 +1032,6 @@ func (p *podWorkers) completeTerminatingRuntimePod(pod *v1.Pod) {
}
delete(p.podUpdates, pod.UID)
delete(p.lastUndeliveredWorkUpdate, pod.UID)
delete(p.terminatingStaticPodFullnames, kubecontainer.GetPodFullName(pod))
}
// completeTerminated is invoked after syncTerminatedPod completes successfully and means we
@ -968,7 +1048,6 @@ func (p *podWorkers) completeTerminated(pod *v1.Pod) {
}
delete(p.podUpdates, pod.UID)
delete(p.lastUndeliveredWorkUpdate, pod.UID)
delete(p.terminatingStaticPodFullnames, kubecontainer.GetPodFullName(pod))
if status, ok := p.podSyncStatuses[pod.UID]; ok {
if status.terminatingAt.IsZero() {
@ -979,6 +1058,10 @@ func (p *podWorkers) completeTerminated(pod *v1.Pod) {
}
status.finished = true
status.working = false
if p.startedStaticPodsByFullname[status.fullname] == pod.UID {
delete(p.startedStaticPodsByFullname, status.fullname)
}
}
}
@ -1078,6 +1161,11 @@ func (p *podWorkers) removeTerminatedWorker(uid types.UID) {
return
}
if startedUID, started := p.startedStaticPodsByFullname[status.fullname]; started && startedUID != uid {
klog.V(4).InfoS("Pod cannot start yet but is no longer known to the kubelet, finish it", "podUID", uid)
status.finished = true
}
if !status.finished {
klog.V(4).InfoS("Pod worker has been requested for removal but is still not fully terminated", "podUID", uid)
return
@ -1091,6 +1179,10 @@ func (p *podWorkers) removeTerminatedWorker(uid types.UID) {
delete(p.podSyncStatuses, uid)
delete(p.podUpdates, uid)
delete(p.lastUndeliveredWorkUpdate, uid)
if p.startedStaticPodsByFullname[status.fullname] == uid {
delete(p.startedStaticPodsByFullname, status.fullname)
}
}
// killPodNow returns a KillPodFunc that can be used to kill a pod.

View File

@ -152,6 +152,18 @@ func newPodWithPhase(uid, name string, phase v1.PodPhase) *v1.Pod {
}
}
func newStaticPod(uid, name string) *v1.Pod {
return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
UID: types.UID(uid),
Name: name,
Annotations: map[string]string{
kubetypes.ConfigSourceAnnotationKey: kubetypes.FileSource,
},
},
}
}
// syncPodRecord is a record of a sync pod call
type syncPodRecord struct {
name string
@ -682,3 +694,189 @@ func TestKillPodNowFunc(t *testing.T) {
t.Errorf("Pod terminated %v, but expected %v", syncPodRecords[1].terminated, true)
}
}
func Test_allowPodStart(t *testing.T) {
testCases := []struct {
desc string
pod *v1.Pod
podSyncStatuses map[types.UID]*podSyncStatus
startedStaticPodsByFullname map[string]types.UID
waitingToStartStaticPodsByFullname map[string][]types.UID
allowed bool
}{
{
// TBD: Do we want to allow non-static pods with the same full name?
// Note that it may disable the force deletion of pods.
desc: "non-static pod",
pod: newPod("uid-0", "test"),
podSyncStatuses: map[types.UID]*podSyncStatus{
"uid-0": {
fullname: "test_",
},
"uid-1": {
fullname: "test_",
},
},
allowed: true,
},
{
// TBD: Do we want to allow a non-static pod with the same full name
// as the started static pod?
desc: "non-static pod when there is a started static pod with the same full name",
pod: newPod("uid-0", "test"),
podSyncStatuses: map[types.UID]*podSyncStatus{
"uid-0": {
fullname: "test_",
},
"uid-1": {
fullname: "test_",
},
},
startedStaticPodsByFullname: map[string]types.UID{
"test_": types.UID("uid-1"),
},
allowed: true,
},
{
// TBD: Do we want to allow a static pod with the same full name as the
// started non-static pod?
desc: "static pod when there is a started non-static pod with the same full name",
pod: newPod("uid-0", "test"),
podSyncStatuses: map[types.UID]*podSyncStatus{
"uid-0": {
fullname: "test_",
},
"uid-1": {
fullname: "test_",
},
},
startedStaticPodsByFullname: map[string]types.UID{
"test_": types.UID("uid-1"),
},
allowed: true,
},
{
desc: "static pod when there are no started static pods with the same full name",
pod: newStaticPod("uid-0", "foo"),
podSyncStatuses: map[types.UID]*podSyncStatus{
"uid-0": {
fullname: "foo_",
},
"uid-1": {
fullname: "bar_",
},
},
startedStaticPodsByFullname: map[string]types.UID{
"bar_": types.UID("uid-1"),
},
allowed: true,
},
{
desc: "static pod when there is a started static pod with the same full name",
pod: newStaticPod("uid-0", "foo"),
podSyncStatuses: map[types.UID]*podSyncStatus{
"uid-0": {
fullname: "foo_",
},
"uid-1": {
fullname: "foo_",
},
},
startedStaticPodsByFullname: map[string]types.UID{
"foo_": types.UID("uid-1"),
},
allowed: false,
},
{
desc: "static pod if the static pod has already started",
pod: newStaticPod("uid-0", "foo"),
podSyncStatuses: map[types.UID]*podSyncStatus{
"uid-0": {
fullname: "foo_",
},
},
startedStaticPodsByFullname: map[string]types.UID{
"foo_": types.UID("uid-0"),
},
allowed: true,
},
{
desc: "static pod if the static pod is the first pod waiting to start",
pod: newStaticPod("uid-0", "foo"),
podSyncStatuses: map[types.UID]*podSyncStatus{
"uid-0": {
fullname: "foo_",
},
},
waitingToStartStaticPodsByFullname: map[string][]types.UID{
"foo_": {
types.UID("uid-0"),
},
},
allowed: true,
},
{
desc: "static pod if the static pod is not the first pod waiting to start",
pod: newStaticPod("uid-0", "foo"),
podSyncStatuses: map[types.UID]*podSyncStatus{
"uid-0": {
fullname: "foo_",
},
"uid-1": {
fullname: "foo_",
},
},
waitingToStartStaticPodsByFullname: map[string][]types.UID{
"foo_": {
types.UID("uid-1"),
types.UID("uid-0"),
},
},
allowed: false,
},
{
desc: "static pod if the static pod is the first valid pod waiting to start / clean up until picking the first valid pod",
pod: newStaticPod("uid-0", "foo"),
podSyncStatuses: map[types.UID]*podSyncStatus{
"uid-0": {
fullname: "foo_",
},
"uid-1": {
fullname: "foo_",
},
},
waitingToStartStaticPodsByFullname: map[string][]types.UID{
"foo_": {
types.UID("uid-2"),
types.UID("uid-2"),
types.UID("uid-3"),
types.UID("uid-0"),
types.UID("uid-1"),
},
},
allowed: true,
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
podWorkers, _ := createPodWorkers()
if tc.podSyncStatuses != nil {
podWorkers.podSyncStatuses = tc.podSyncStatuses
}
if tc.startedStaticPodsByFullname != nil {
podWorkers.startedStaticPodsByFullname = tc.startedStaticPodsByFullname
}
if tc.waitingToStartStaticPodsByFullname != nil {
podWorkers.waitingToStartStaticPodsByFullname = tc.waitingToStartStaticPodsByFullname
}
if podWorkers.allowPodStart(tc.pod) != tc.allowed {
if tc.allowed {
t.Errorf("Pod should be allowed")
} else {
t.Errorf("Pod should not be allowed")
}
}
})
}
}