Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-08-08 03:33:56 +00:00)

Merge pull request #104743 from gjkim42/ensure-pod-uniqueness

Ensure there is one running static pod with the same full name

Commit 5f0a94b23c
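For orientation: the "full name" used throughout this change is the pod name joined to its namespace with an underscore (the convention of kubecontainer.GetPodFullName), which is why the unit tests below use values like "foo_" for pods built without a namespace. A trivial standalone sketch of the convention, for illustration only:

package main

import "fmt"

// fullName mirrors the name_namespace convention of
// kubecontainer.GetPodFullName; this sketch is not kubelet code.
func fullName(name, namespace string) string {
	return name + "_" + namespace
}

func main() {
	fmt.Println(fullName("foo", ""))             // "foo_", as in the unit tests below
	fmt.Println(fullName("etcd", "kube-system")) // "etcd_kube-system"
}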
pkg/kubelet/pod_workers.go

@@ -246,6 +246,8 @@ type podSyncStatus struct {
 	cancelFn context.CancelFunc
 	// working is true if a pod worker is currently in a sync method.
 	working bool
+	// fullname of the pod
+	fullname string
 
 	// syncedAt is the time at which the pod worker first observed this pod.
 	syncedAt time.Time
@@ -375,9 +377,10 @@ type podWorkers struct {
 	// Tracks by UID the termination status of a pod - syncing, terminating,
 	// terminated, and evicted.
 	podSyncStatuses map[types.UID]*podSyncStatus
-	// Tracks when a static pod is being killed and is removed when the
-	// static pod transitions to the killed state.
-	terminatingStaticPodFullnames map[string]struct{}
+	// Tracks all uids for started static pods by full name
+	startedStaticPodsByFullname map[string]types.UID
+	// Tracks all uids for static pods that are waiting to start by full name
+	waitingToStartStaticPodsByFullname map[string][]types.UID
 
 	workQueue queue.WorkQueue
 
@@ -412,18 +415,19 @@ func newPodWorkers(
 	podCache kubecontainer.Cache,
 ) PodWorkers {
 	return &podWorkers{
 		podSyncStatuses:           map[types.UID]*podSyncStatus{},
 		podUpdates:                map[types.UID]chan podWork{},
 		lastUndeliveredWorkUpdate: map[types.UID]podWork{},
-		terminatingStaticPodFullnames: map[string]struct{}{},
-		syncPodFn:                     syncPodFn,
-		syncTerminatingPodFn:          syncTerminatingPodFn,
-		syncTerminatedPodFn:           syncTerminatedPodFn,
-		recorder:                      recorder,
-		workQueue:                     workQueue,
-		resyncInterval:                resyncInterval,
-		backOffPeriod:                 backOffPeriod,
-		podCache:                      podCache,
+		startedStaticPodsByFullname:        map[string]types.UID{},
+		waitingToStartStaticPodsByFullname: map[string][]types.UID{},
+		syncPodFn:                          syncPodFn,
+		syncTerminatingPodFn:               syncTerminatingPodFn,
+		syncTerminatedPodFn:                syncTerminatedPodFn,
+		recorder:                           recorder,
+		workQueue:                          workQueue,
+		resyncInterval:                     resyncInterval,
+		backOffPeriod:                      backOffPeriod,
+		podCache:                           podCache,
 	}
 }
 
@@ -497,8 +501,19 @@ func (p *podWorkers) ShouldPodContentBeRemoved(uid types.UID) bool {
 func (p *podWorkers) IsPodForMirrorPodTerminatingByFullName(podFullName string) bool {
 	p.podLock.Lock()
 	defer p.podLock.Unlock()
-	_, ok := p.terminatingStaticPodFullnames[podFullName]
-	return ok
+	uid, started := p.startedStaticPodsByFullname[podFullName]
+	if !started {
+		return false
+	}
+	status, exists := p.podSyncStatuses[uid]
+	if !exists {
+		return false
+	}
+	if !status.IsTerminationRequested() || status.IsTerminated() {
+		return false
+	}
+
+	return true
 }
 
 func isPodStatusCacheTerminal(status *kubecontainer.PodStatus) bool {
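The rewritten check derives "terminating" from the pod's sync status instead of the old standalone set. Assuming the podSyncStatus helpers keep their documented meaning (IsTerminationRequested becomes true once terminatingAt is set, IsTerminated once terminatedAt is set), the window it reports can be sketched as follows; this is a simplified model, not the kubelet's types:

package main

import (
	"fmt"
	"time"
)

// status models just the two timestamps the check above consults.
type status struct {
	terminatingAt time.Time // set when termination is requested
	terminatedAt  time.Time // set when termination completes
}

// terminating reports the same window as the rewritten check:
// termination has been requested but has not yet completed.
func (s status) terminating() bool {
	return !s.terminatingAt.IsZero() && s.terminatedAt.IsZero()
}

func main() {
	now := time.Now()
	fmt.Println(status{}.terminating())                                      // false: nothing requested
	fmt.Println(status{terminatingAt: now}.terminating())                    // true: in the termination window
	fmt.Println(status{terminatingAt: now, terminatedAt: now}.terminating()) // false: already terminated
}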
@@ -551,6 +566,7 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
 		klog.V(4).InfoS("Pod is being synced for the first time", "pod", klog.KObj(pod), "podUID", pod.UID)
 		status = &podSyncStatus{
 			syncedAt: now,
+			fullname: kubecontainer.GetPodFullName(pod),
 		}
 		// if this pod is being synced for the first time, we need to make sure it is an active pod
 		if !isRuntimePod && (pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded) {
@@ -663,11 +679,6 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
 		// will never be zero.
 		options.KillPodOptions.PodTerminationGracePeriodSecondsOverride = &gracePeriod
 
-		// if a static pod comes through, start tracking it explicitly (cleared by the pod worker loop)
-		if kubetypes.IsStaticPod(pod) {
-			p.terminatingStaticPodFullnames[kubecontainer.GetPodFullName(pod)] = struct{}{}
-		}
-
 	default:
 		workType = SyncPodWork
 
@@ -697,6 +708,12 @@ func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
 		podUpdates = make(chan podWork, 1)
 		p.podUpdates[uid] = podUpdates
 
+		// ensure that static pods start in the order they are received by UpdatePod
+		if kubetypes.IsStaticPod(pod) {
+			p.waitingToStartStaticPodsByFullname[status.fullname] =
+				append(p.waitingToStartStaticPodsByFullname[status.fullname], uid)
+		}
+
 		// Creating a new pod worker either means this is a new pod, or that the
 		// kubelet just restarted. In either case the kubelet is willing to believe
 		// the status of the pod for the first pod worker sync. See corresponding
@@ -766,10 +783,76 @@ func calculateEffectiveGracePeriod(status *podSyncStatus, pod *v1.Pod, options *
 	return gracePeriod, status.gracePeriod != 0 && status.gracePeriod != gracePeriod
 }
 
+// allowPodStart tries to start the pod and returns true if allowed, otherwise
+// it requeues the pod and returns false.
+func (p *podWorkers) allowPodStart(pod *v1.Pod) bool {
+	if !kubetypes.IsStaticPod(pod) {
+		// TBD: Do we want to allow non-static pods with the same full name?
+		// Note that it may disable the force deletion of pods.
+		return true
+	}
+	p.podLock.Lock()
+	defer p.podLock.Unlock()
+	status, ok := p.podSyncStatuses[pod.UID]
+	if !ok {
+		klog.ErrorS(nil, "Failed to get a valid podSyncStatuses", "pod", klog.KObj(pod), "podUID", pod.UID)
+		p.workQueue.Enqueue(pod.UID, wait.Jitter(p.backOffPeriod, workerBackOffPeriodJitterFactor))
+		status.working = false
+		return false
+	}
+	if !p.allowStaticPodStart(status.fullname, pod.UID) {
+		p.workQueue.Enqueue(pod.UID, wait.Jitter(p.backOffPeriod, workerBackOffPeriodJitterFactor))
+		status.working = false
+		return false
+	}
+	return true
+}
+
+// allowStaticPodStart tries to start the static pod and returns true if
+// 1. there are no other started static pods with the same fullname
+// 2. the uid matches that of the first valid static pod waiting to start
+func (p *podWorkers) allowStaticPodStart(fullname string, uid types.UID) bool {
+	startedUID, started := p.startedStaticPodsByFullname[fullname]
+	if started {
+		return startedUID == uid
+	}
+
+	waitingPods := p.waitingToStartStaticPodsByFullname[fullname]
+	for i, waitingUID := range waitingPods {
+		// has pod already terminated or been deleted?
+		if _, ok := p.podSyncStatuses[waitingUID]; !ok {
+			continue
+		}
+		// another pod is next in line
+		if waitingUID != uid {
+			p.waitingToStartStaticPodsByFullname[fullname] = waitingPods[i:]
+			return false
+		}
+		// we are up next, remove ourselves
+		waitingPods = waitingPods[i+1:]
+		break
+	}
+	if len(waitingPods) != 0 {
+		p.waitingToStartStaticPodsByFullname[fullname] = waitingPods
+	} else {
+		delete(p.waitingToStartStaticPodsByFullname, fullname)
+	}
+	p.startedStaticPodsByFullname[fullname] = uid
+	return true
+}
+
 func (p *podWorkers) managePodLoop(podUpdates <-chan podWork) {
 	var lastSyncTime time.Time
+	var podStarted bool
 	for update := range podUpdates {
 		pod := update.Options.Pod
+		if !podStarted {
+			if !p.allowPodStart(pod) {
+				klog.V(4).InfoS("Pod cannot start yet", "pod", klog.KObj(pod), "podUID", pod.UID)
+				continue
+			}
+			podStarted = true
+		}
+
 		klog.V(4).InfoS("Processing pod event", "pod", klog.KObj(pod), "podUID", pod.UID, "updateType", update.WorkType)
 		err := func() error {
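Taken together, the two maps act as a per-full-name admission gate: at most one UID may be "started" at a time, and waiters are admitted in the order UpdatePod saw them. A simplified standalone sketch of that rule, using plain string UIDs and omitting the real function's skip-over of waiters that have already been deleted:

package main

import "fmt"

// tracker is a toy stand-in for startedStaticPodsByFullname and
// waitingToStartStaticPodsByFullname; it is illustration, not kubelet code.
type tracker struct {
	started map[string]string   // full name -> UID of the started static pod
	waiting map[string][]string // full name -> UIDs waiting to start, FIFO order
}

// allowStart mirrors the decision in allowStaticPodStart: an already-started
// UID may proceed, otherwise only the head of the waiting queue is admitted.
func (t *tracker) allowStart(fullname, uid string) bool {
	if startedUID, ok := t.started[fullname]; ok {
		return startedUID == uid
	}
	queue := t.waiting[fullname]
	if len(queue) == 0 || queue[0] != uid {
		return false // someone else is ahead in line (or nothing is queued)
	}
	if len(queue) == 1 {
		delete(t.waiting, fullname)
	} else {
		t.waiting[fullname] = queue[1:]
	}
	t.started[fullname] = uid
	return true
}

func main() {
	t := &tracker{
		started: map[string]string{},
		waiting: map[string][]string{"foo_": {"uid-0", "uid-1"}},
	}
	fmt.Println(t.allowStart("foo_", "uid-1")) // false: uid-0 is first in line
	fmt.Println(t.allowStart("foo_", "uid-0")) // true: admitted and marked started
	fmt.Println(t.allowStart("foo_", "uid-1")) // false: uid-0 holds the full name
}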
@@ -899,9 +982,6 @@ func (p *podWorkers) completeTerminating(pod *v1.Pod) {
 
 	klog.V(4).InfoS("Pod terminated all containers successfully", "pod", klog.KObj(pod), "podUID", pod.UID)
 
-	// if a static pod is being tracked, forget it
-	delete(p.terminatingStaticPodFullnames, kubecontainer.GetPodFullName(pod))
-
 	if status, ok := p.podSyncStatuses[pod.UID]; ok {
 		if status.terminatingAt.IsZero() {
 			klog.V(4).InfoS("Pod worker was terminated but did not have terminatingAt set, likely programmer error", "pod", klog.KObj(pod), "podUID", pod.UID)
@@ -933,9 +1013,6 @@ func (p *podWorkers) completeTerminatingRuntimePod(pod *v1.Pod) {
 
 	klog.V(4).InfoS("Pod terminated all orphaned containers successfully and worker can now stop", "pod", klog.KObj(pod), "podUID", pod.UID)
 
-	// if a static pod is being tracked, forget it
-	delete(p.terminatingStaticPodFullnames, kubecontainer.GetPodFullName(pod))
-
 	if status, ok := p.podSyncStatuses[pod.UID]; ok {
 		if status.terminatingAt.IsZero() {
 			klog.V(4).InfoS("Pod worker was terminated but did not have terminatingAt set, likely programmer error", "pod", klog.KObj(pod), "podUID", pod.UID)
@@ -943,6 +1020,10 @@ func (p *podWorkers) completeTerminatingRuntimePod(pod *v1.Pod) {
 		status.terminatedAt = time.Now()
 		status.finished = true
 		status.working = false
+
+		if p.startedStaticPodsByFullname[status.fullname] == pod.UID {
+			delete(p.startedStaticPodsByFullname, status.fullname)
+		}
 	}
 
 	ch, ok := p.podUpdates[pod.UID]
@@ -951,7 +1032,6 @@ func (p *podWorkers) completeTerminatingRuntimePod(pod *v1.Pod) {
 	}
 	delete(p.podUpdates, pod.UID)
 	delete(p.lastUndeliveredWorkUpdate, pod.UID)
-	delete(p.terminatingStaticPodFullnames, kubecontainer.GetPodFullName(pod))
 }
 
 // completeTerminated is invoked after syncTerminatedPod completes successfully and means we
@@ -968,7 +1048,6 @@ func (p *podWorkers) completeTerminated(pod *v1.Pod) {
 	}
 	delete(p.podUpdates, pod.UID)
 	delete(p.lastUndeliveredWorkUpdate, pod.UID)
-	delete(p.terminatingStaticPodFullnames, kubecontainer.GetPodFullName(pod))
 
 	if status, ok := p.podSyncStatuses[pod.UID]; ok {
 		if status.terminatingAt.IsZero() {
@@ -979,6 +1058,10 @@ func (p *podWorkers) completeTerminated(pod *v1.Pod) {
 		}
 		status.finished = true
 		status.working = false
+
+		if p.startedStaticPodsByFullname[status.fullname] == pod.UID {
+			delete(p.startedStaticPodsByFullname, status.fullname)
+		}
 	}
 }
 
@@ -1078,6 +1161,11 @@ func (p *podWorkers) removeTerminatedWorker(uid types.UID) {
 		return
 	}
 
+	if startedUID, started := p.startedStaticPodsByFullname[status.fullname]; started && startedUID != uid {
+		klog.V(4).InfoS("Pod cannot start yet but is no longer known to the kubelet, finish it", "podUID", uid)
+		status.finished = true
+	}
+
 	if !status.finished {
 		klog.V(4).InfoS("Pod worker has been requested for removal but is still not fully terminated", "podUID", uid)
 		return
@@ -1091,6 +1179,10 @@ func (p *podWorkers) removeTerminatedWorker(uid types.UID) {
 	delete(p.podSyncStatuses, uid)
 	delete(p.podUpdates, uid)
 	delete(p.lastUndeliveredWorkUpdate, uid)
+
+	if p.startedStaticPodsByFullname[status.fullname] == uid {
+		delete(p.startedStaticPodsByFullname, status.fullname)
+	}
 }
 
 // killPodNow returns a KillPodFunc that can be used to kill a pod.
pkg/kubelet/pod_workers_test.go

@@ -152,6 +152,18 @@ func newPodWithPhase(uid, name string, phase v1.PodPhase) *v1.Pod {
 	}
 }
 
+func newStaticPod(uid, name string) *v1.Pod {
+	return &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			UID:  types.UID(uid),
+			Name: name,
+			Annotations: map[string]string{
+				kubetypes.ConfigSourceAnnotationKey: kubetypes.FileSource,
+			},
+		},
+	}
+}
+
 // syncPodRecord is a record of a sync pod call
 type syncPodRecord struct {
 	name string
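newStaticPod marks the pod as file-sourced, which is what makes kubetypes.IsStaticPod treat it as static. As a rough paraphrase (an assumption about the helper's internals, not the verbatim source), the check boils down to:

package example

import v1 "k8s.io/api/core/v1"

// isStaticPod paraphrases kubetypes.IsStaticPod: a pod is static when its
// config-source annotation is present and is not the apiserver source.
func isStaticPod(pod *v1.Pod) bool {
	source := pod.Annotations["kubernetes.io/config.source"] // kubetypes.ConfigSourceAnnotationKey
	return source != "" && source != "api"                   // "api" is kubetypes.ApiserverSource
}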
@@ -682,3 +694,189 @@ func TestKillPodNowFunc(t *testing.T) {
 		t.Errorf("Pod terminated %v, but expected %v", syncPodRecords[1].terminated, true)
 	}
 }
+
+func Test_allowPodStart(t *testing.T) {
+	testCases := []struct {
+		desc                               string
+		pod                                *v1.Pod
+		podSyncStatuses                    map[types.UID]*podSyncStatus
+		startedStaticPodsByFullname        map[string]types.UID
+		waitingToStartStaticPodsByFullname map[string][]types.UID
+		allowed                            bool
+	}{
+		{
+			// TBD: Do we want to allow non-static pods with the same full name?
+			// Note that it may disable the force deletion of pods.
+			desc: "non-static pod",
+			pod:  newPod("uid-0", "test"),
+			podSyncStatuses: map[types.UID]*podSyncStatus{
+				"uid-0": {
+					fullname: "test_",
+				},
+				"uid-1": {
+					fullname: "test_",
+				},
+			},
+			allowed: true,
+		},
+		{
+			// TBD: Do we want to allow a non-static pod with the same full name
+			// as the started static pod?
+			desc: "non-static pod when there is a started static pod with the same full name",
+			pod:  newPod("uid-0", "test"),
+			podSyncStatuses: map[types.UID]*podSyncStatus{
+				"uid-0": {
+					fullname: "test_",
+				},
+				"uid-1": {
+					fullname: "test_",
+				},
+			},
+			startedStaticPodsByFullname: map[string]types.UID{
+				"test_": types.UID("uid-1"),
+			},
+			allowed: true,
+		},
+		{
+			// TBD: Do we want to allow a static pod with the same full name as the
+			// started non-static pod?
+			desc: "static pod when there is a started non-static pod with the same full name",
+			pod:  newPod("uid-0", "test"),
+			podSyncStatuses: map[types.UID]*podSyncStatus{
+				"uid-0": {
+					fullname: "test_",
+				},
+				"uid-1": {
+					fullname: "test_",
+				},
+			},
+			startedStaticPodsByFullname: map[string]types.UID{
+				"test_": types.UID("uid-1"),
+			},
+			allowed: true,
+		},
+		{
+			desc: "static pod when there are no started static pods with the same full name",
+			pod:  newStaticPod("uid-0", "foo"),
+			podSyncStatuses: map[types.UID]*podSyncStatus{
+				"uid-0": {
+					fullname: "foo_",
+				},
+				"uid-1": {
+					fullname: "bar_",
+				},
+			},
+			startedStaticPodsByFullname: map[string]types.UID{
+				"bar_": types.UID("uid-1"),
+			},
+			allowed: true,
+		},
+		{
+			desc: "static pod when there is a started static pod with the same full name",
+			pod:  newStaticPod("uid-0", "foo"),
+			podSyncStatuses: map[types.UID]*podSyncStatus{
+				"uid-0": {
+					fullname: "foo_",
+				},
+				"uid-1": {
+					fullname: "foo_",
+				},
+			},
+			startedStaticPodsByFullname: map[string]types.UID{
+				"foo_": types.UID("uid-1"),
+			},
+			allowed: false,
+		},
+		{
+			desc: "static pod if the static pod has already started",
+			pod:  newStaticPod("uid-0", "foo"),
+			podSyncStatuses: map[types.UID]*podSyncStatus{
+				"uid-0": {
+					fullname: "foo_",
+				},
+			},
+			startedStaticPodsByFullname: map[string]types.UID{
+				"foo_": types.UID("uid-0"),
+			},
+			allowed: true,
+		},
+		{
+			desc: "static pod if the static pod is the first pod waiting to start",
+			pod:  newStaticPod("uid-0", "foo"),
+			podSyncStatuses: map[types.UID]*podSyncStatus{
+				"uid-0": {
+					fullname: "foo_",
+				},
+			},
+			waitingToStartStaticPodsByFullname: map[string][]types.UID{
+				"foo_": {
+					types.UID("uid-0"),
+				},
+			},
+			allowed: true,
+		},
+		{
+			desc: "static pod if the static pod is not the first pod waiting to start",
+			pod:  newStaticPod("uid-0", "foo"),
+			podSyncStatuses: map[types.UID]*podSyncStatus{
+				"uid-0": {
+					fullname: "foo_",
+				},
+				"uid-1": {
+					fullname: "foo_",
+				},
+			},
+			waitingToStartStaticPodsByFullname: map[string][]types.UID{
+				"foo_": {
+					types.UID("uid-1"),
+					types.UID("uid-0"),
+				},
+			},
+			allowed: false,
+		},
+		{
+			desc: "static pod if the static pod is the first valid pod waiting to start / clean up until picking the first valid pod",
+			pod:  newStaticPod("uid-0", "foo"),
+			podSyncStatuses: map[types.UID]*podSyncStatus{
+				"uid-0": {
+					fullname: "foo_",
+				},
+				"uid-1": {
+					fullname: "foo_",
+				},
+			},
+			waitingToStartStaticPodsByFullname: map[string][]types.UID{
+				"foo_": {
+					types.UID("uid-2"),
+					types.UID("uid-2"),
+					types.UID("uid-3"),
+					types.UID("uid-0"),
+					types.UID("uid-1"),
+				},
+			},
+			allowed: true,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.desc, func(t *testing.T) {
+			podWorkers, _ := createPodWorkers()
+			if tc.podSyncStatuses != nil {
+				podWorkers.podSyncStatuses = tc.podSyncStatuses
+			}
+			if tc.startedStaticPodsByFullname != nil {
+				podWorkers.startedStaticPodsByFullname = tc.startedStaticPodsByFullname
+			}
+			if tc.waitingToStartStaticPodsByFullname != nil {
+				podWorkers.waitingToStartStaticPodsByFullname = tc.waitingToStartStaticPodsByFullname
+			}
+			if podWorkers.allowPodStart(tc.pod) != tc.allowed {
+				if tc.allowed {
+					t.Errorf("Pod should be allowed")
+				} else {
+					t.Errorf("Pod should not be allowed")
+				}
+			}
+		})
+	}
+}
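To exercise just this table in a checkout of the repository, something along the lines of `go test ./pkg/kubelet -run Test_allowPodStart -v` should work; the exact package path is an assumption based on where newPodWithPhase and TestKillPodNowFunc live.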
test/e2e_node/mirror_pod_grace_period_test.go

@@ -26,8 +26,11 @@ import (
 	"github.com/onsi/gomega"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/uuid"
+	clientset "k8s.io/client-go/kubernetes"
 	"k8s.io/kubernetes/test/e2e/framework"
+	imageutils "k8s.io/kubernetes/test/utils/image"
 )
 
 var _ = SIGDescribe("MirrorPodWithGracePeriod", func() {
@@ -53,10 +56,9 @@ var _ = SIGDescribe("MirrorPodWithGracePeriod", func() {
 
 	ginkgo.It("mirror pod termination should satisfy grace period when static pod is deleted [NodeConformance]", func() {
 		ginkgo.By("get mirror pod uid")
-		_, err := f.ClientSet.CoreV1().Pods(ns).Get(context.TODO(), mirrorPodName, metav1.GetOptions{})
+		pod, err := f.ClientSet.CoreV1().Pods(ns).Get(context.TODO(), mirrorPodName, metav1.GetOptions{})
 		framework.ExpectNoError(err)
-		start := time.Now()
+		uid := pod.UID
 
 		ginkgo.By("delete the static pod")
 		file := staticPodPath(podPath, staticPodName, ns)
@@ -64,25 +66,51 @@ var _ = SIGDescribe("MirrorPodWithGracePeriod", func() {
 		err = os.Remove(file)
 		framework.ExpectNoError(err)
 
-		for {
-			if time.Now().Sub(start).Seconds() > 19 {
-				break
-			}
-			pod, err := f.ClientSet.CoreV1().Pods(ns).Get(context.TODO(), mirrorPodName, metav1.GetOptions{})
-			framework.ExpectNoError(err)
-			if pod.Status.Phase != v1.PodRunning {
-				framework.Failf("expected the mirror pod %q to be running, got %q", mirrorPodName, pod.Status.Phase)
-			}
-			// have some pause in between the API server queries to avoid throttling
-			time.Sleep(time.Duration(200) * time.Millisecond)
-		}
+		ginkgo.By("wait for the mirror pod to be running for grace period")
+		gomega.Consistently(func() error {
+			return checkMirrorPodRunningWithUID(f.ClientSet, mirrorPodName, ns, uid)
+		}, 19*time.Second, 200*time.Millisecond).Should(gomega.BeNil())
+	})
+
+	ginkgo.It("mirror pod termination should satisfy grace period when static pod is updated [NodeConformance]", func() {
+		ginkgo.By("get mirror pod uid")
+		pod, err := f.ClientSet.CoreV1().Pods(ns).Get(context.TODO(), mirrorPodName, metav1.GetOptions{})
+		framework.ExpectNoError(err)
+		uid := pod.UID
+
+		ginkgo.By("update the static pod container image")
+		image := imageutils.GetPauseImageName()
+		err = createStaticPod(podPath, staticPodName, ns, image, v1.RestartPolicyAlways)
+		framework.ExpectNoError(err)
+
+		ginkgo.By("wait for the mirror pod to be running for grace period")
+		gomega.Consistently(func() error {
+			return checkMirrorPodRunningWithUID(f.ClientSet, mirrorPodName, ns, uid)
+		}, 19*time.Second, 200*time.Millisecond).Should(gomega.BeNil())
+
+		ginkgo.By("wait for the mirror pod to be updated")
+		gomega.Eventually(func() error {
+			return checkMirrorPodRecreatedAndRunning(f.ClientSet, mirrorPodName, ns, uid)
+		}, 2*time.Minute, time.Second*4).Should(gomega.BeNil())
+
+		ginkgo.By("check the mirror pod container image is updated")
+		pod, err = f.ClientSet.CoreV1().Pods(ns).Get(context.TODO(), mirrorPodName, metav1.GetOptions{})
+		framework.ExpectNoError(err)
+		framework.ExpectEqual(len(pod.Spec.Containers), 1)
+		framework.ExpectEqual(pod.Spec.Containers[0].Image, image)
 	})
 
 	ginkgo.AfterEach(func() {
+		ginkgo.By("delete the static pod")
+		err := deleteStaticPod(podPath, staticPodName, ns)
+		if !os.IsNotExist(err) {
+			framework.ExpectNoError(err)
+		}
+
 		ginkgo.By("wait for the mirror pod to disappear")
 		gomega.Eventually(func() error {
 			return checkMirrorPodDisappear(f.ClientSet, mirrorPodName, ns)
-		}, time.Second*19, time.Second).Should(gomega.BeNil())
+		}, 2*time.Minute, time.Second*4).Should(gomega.BeNil())
 	})
 })
})
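The rewritten tests lean on two gomega polling primitives: Consistently asserts the mirror pod stays running with the original UID for the whole grace window, while Eventually waits until the recreated pod appears. A minimal self-contained sketch of those semantics outside the e2e framework (package and test names here are invented for illustration):

package example

import (
	"errors"
	"testing"
	"time"

	"github.com/onsi/gomega"
)

// TestPollingSemantics illustrates the two assertions used above:
// Consistently fails if the condition ever breaks during the window,
// Eventually fails only if the condition never holds before the timeout.
func TestPollingSemantics(t *testing.T) {
	g := gomega.NewWithT(t)
	start := time.Now()

	// Must keep returning nil for the full 2s window (polled every 100ms).
	g.Consistently(func() error {
		return nil
	}, 2*time.Second, 100*time.Millisecond).Should(gomega.BeNil())

	// Passes as soon as the function first returns nil.
	g.Eventually(func() error {
		if time.Since(start) > 3*time.Second {
			return nil
		}
		return errors.New("not yet")
	}, 10*time.Second, 100*time.Millisecond).Should(gomega.BeNil())
}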
@@ -124,3 +152,17 @@ spec:
 	framework.Logf("has written %v", file)
 	return err
 }
+
+func checkMirrorPodRunningWithUID(cl clientset.Interface, name, namespace string, oUID types.UID) error {
+	pod, err := cl.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{})
+	if err != nil {
+		return fmt.Errorf("expected the mirror pod %q to appear: %v", name, err)
+	}
+	if pod.UID != oUID {
+		return fmt.Errorf("expected the uid of mirror pod %q to be same, got %q", name, pod.UID)
+	}
+	if pod.Status.Phase != v1.PodRunning {
+		return fmt.Errorf("expected the mirror pod %q to be running, got %q", name, pod.Status.Phase)
+	}
+	return nil
+}