From 865c3c56706d131c3164d16812fec58454a77020 Mon Sep 17 00:00:00 2001 From: Miciah Masters Date: Mon, 16 Sep 2019 21:51:23 -0400 Subject: [PATCH 1/2] TestGetPodsToDelete: Use field names in test cases * pkg/controller/replicaset/replica_set_test.go (TestGetPodsToDelete): Use explicit field names in declarations of test cases. --- pkg/controller/replicaset/replica_set_test.go | 64 +++++++++---------- 1 file changed, 32 insertions(+), 32 deletions(-) diff --git a/pkg/controller/replicaset/replica_set_test.go b/pkg/controller/replicaset/replica_set_test.go index 99847bc7a92..9b468657845 100644 --- a/pkg/controller/replicaset/replica_set_test.go +++ b/pkg/controller/replicaset/replica_set_test.go @@ -1468,39 +1468,39 @@ func TestGetPodsToDelete(t *testing.T) { // a scheduled, running, ready pod // Note that a pending pod cannot be ready { - "len(pods) = 0 (i.e., diff = 0 too)", - []*v1.Pod{}, - 0, - []*v1.Pod{}, + name: "len(pods) = 0 (i.e., diff = 0 too)", + pods: []*v1.Pod{}, + diff: 0, + expectedPodsToDelete: []*v1.Pod{}, }, { - "diff = len(pods)", - []*v1.Pod{ + name: "diff = len(pods)", + pods: []*v1.Pod{ scheduledRunningNotReadyPod, scheduledRunningReadyPod, }, - 2, - []*v1.Pod{scheduledRunningNotReadyPod, scheduledRunningReadyPod}, + diff: 2, + expectedPodsToDelete: []*v1.Pod{scheduledRunningNotReadyPod, scheduledRunningReadyPod}, }, { - "diff < len(pods)", - []*v1.Pod{ + name: "diff < len(pods)", + pods: []*v1.Pod{ scheduledRunningReadyPod, scheduledRunningNotReadyPod, }, - 1, - []*v1.Pod{scheduledRunningNotReadyPod}, + diff: 1, + expectedPodsToDelete: []*v1.Pod{scheduledRunningNotReadyPod}, }, { - "various pod phases and conditions, diff = len(pods)", - []*v1.Pod{ + name: "various pod phases and conditions, diff = len(pods)", + pods: []*v1.Pod{ scheduledRunningReadyPod, scheduledRunningNotReadyPod, scheduledPendingPod, unscheduledPendingPod, }, - 4, - []*v1.Pod{ + diff: 4, + expectedPodsToDelete: []*v1.Pod{ scheduledRunningReadyPod, scheduledRunningNotReadyPod, scheduledPendingPod, @@ -1508,50 +1508,50 @@ func TestGetPodsToDelete(t *testing.T) { }, }, { - "scheduled vs unscheduled, diff < len(pods)", - []*v1.Pod{ + name: "scheduled vs unscheduled, diff < len(pods)", + pods: []*v1.Pod{ scheduledPendingPod, unscheduledPendingPod, }, - 1, - []*v1.Pod{ + diff: 1, + expectedPodsToDelete: []*v1.Pod{ unscheduledPendingPod, }, }, { - "ready vs not-ready, diff < len(pods)", - []*v1.Pod{ + name: "ready vs not-ready, diff < len(pods)", + pods: []*v1.Pod{ scheduledRunningReadyPod, scheduledRunningNotReadyPod, scheduledRunningNotReadyPod, }, - 2, - []*v1.Pod{ + diff: 2, + expectedPodsToDelete: []*v1.Pod{ scheduledRunningNotReadyPod, scheduledRunningNotReadyPod, }, }, { - "pending vs running, diff < len(pods)", - []*v1.Pod{ + name: "pending vs running, diff < len(pods)", + pods: []*v1.Pod{ scheduledPendingPod, scheduledRunningNotReadyPod, }, - 1, - []*v1.Pod{ + diff: 1, + expectedPodsToDelete: []*v1.Pod{ scheduledPendingPod, }, }, { - "various pod phases and conditions, diff < len(pods)", - []*v1.Pod{ + name: "various pod phases and conditions, diff < len(pods)", + pods: []*v1.Pod{ scheduledRunningReadyPod, scheduledRunningNotReadyPod, scheduledPendingPod, unscheduledPendingPod, }, - 3, - []*v1.Pod{ + diff: 3, + expectedPodsToDelete: []*v1.Pod{ unscheduledPendingPod, scheduledPendingPod, scheduledRunningNotReadyPod, From 980b6406b24b41bc0ef061624385f1f420708819 Mon Sep 17 00:00:00 2001 From: Miciah Masters Date: Wed, 10 Jul 2019 18:56:19 -0400 Subject: [PATCH 2/2] Prefer to delete doubled-up pods of a ReplicaSet When scaling down a ReplicaSet, delete doubled up replicas first, where a "doubled up replica" is defined as one that is on the same node as an active replica belonging to a related ReplicaSet. ReplicaSets are considered "related" if they have a common controller (typically a Deployment). The intention of this change is to make a rolling update of a Deployment scale down the old ReplicaSet as it scales up the new ReplicaSet by deleting pods from the old ReplicaSet that are colocated with ready pods of the new ReplicaSet. This change in the behavior of rolling updates can be combined with pod affinity rules to preserve the locality of a Deployment's pods over rollout. A specific scenario that benefits from this change is when a Deployment's pods are exposed by a Service that has type "LoadBalancer" and external traffic policy "Local". In this scenario, the load balancer uses health checks to determine whether it should forward traffic for the Service to a particular node. If the node has no local endpoints for the Service, the health check will fail for that node. Eventually, the load balancer will stop forwarding traffic to that node. In the meantime, the service proxy drops traffic for that Service. Thus, in order to reduce risk of dropping traffic during a rolling update, it is desirable preserve node locality of endpoints. * pkg/controller/controller_utils.go (ActivePodsWithRanks): New type to sort pods using a given ranking. * pkg/controller/controller_utils_test.go (TestSortingActivePodsWithRanks): New test for ActivePodsWithRanks. * pkg/controller/replicaset/replica_set.go (getReplicaSetsWithSameController): New method. Given a ReplicaSet, return all ReplicaSets that have the same owner. (manageReplicas): Call getIndirectlyRelatedPods, and pass its result to getPodsToDelete. (getIndirectlyRelatedPods): New method. Given a ReplicaSet, return all pods that are owned by any ReplicaSet with the same owner. (getPodsToDelete): Add an argument for related pods. Use related pods and the new getPodsRankedByRelatedPodsOnSameNode function to take into account whether a pod is doubled up when sorting pods for deletion. (getPodsRankedByRelatedPodsOnSameNode): New function. Return an ActivePodsWithRanks value that wraps the given slice of pods and computes ranks where each pod's rank is equal to the number of active related pods that are colocated on the same node. * pkg/controller/replicaset/replica_set_test.go (newReplicaSet): Set OwnerReferences on the ReplicaSet. (newPod): Set a unique UID on the pod. (byName): New type to sort pods by name. (TestGetReplicaSetsWithSameController): New test for getReplicaSetsWithSameController. (TestRelatedPodsLookup): New test for getIndirectlyRelatedPods. (TestGetPodsToDelete): Augment the "various pod phases and conditions, diff = len(pods)" test case to ensure that scale-down still selects doubled-up pods if there are not enough other pods to scale down. Add a "various pod phases and conditions, diff = len(pods), relatedPods empty" test case to verify that getPodsToDelete works even if related pods could not be determined. Add a "ready and colocated with another ready pod vs not colocated, diff < len(pods)" test case to verify that a doubled-up pod gets preferred for deletion. Augment the "various pod phases and conditions, diff < len(pods)" test case to ensure that not-ready pods are preferred over ready but doubled-up pods. * pkg/controller/replicaset/BUILD: Regenerate. * test/e2e/apps/deployment.go (testRollingUpdateDeploymentWithLocalTrafficLoadBalancer): New end-to-end test. Create a deployment with a rolling update strategy and affinity rules and a load balancer with "Local" external traffic policy, and verify that set of nodes with local endponts for the service remains unchanged during rollouts. (setAffinity): New helper, used by testRollingUpdateDeploymentWithLocalTrafficLoadBalancer. * test/e2e/framework/service/jig.go (GetEndpointNodes): Factor building the set of node names out... (GetEndpointNodeNames): ...into this new method. --- pkg/controller/controller_utils.go | 88 +++++++ pkg/controller/controller_utils_test.go | 90 +++++++ pkg/controller/replicaset/BUILD | 1 + pkg/controller/replicaset/replica_set.go | 102 +++++++- pkg/controller/replicaset/replica_set_test.go | 245 ++++++++++++++++-- test/e2e/apps/deployment.go | 153 +++++++++++ test/e2e/framework/service/jig.go | 21 +- 7 files changed, 668 insertions(+), 32 deletions(-) diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go index 025bf9d5ace..17fcf98f9a7 100644 --- a/pkg/controller/controller_utils.go +++ b/pkg/controller/controller_utils.go @@ -771,6 +771,94 @@ func (s ActivePods) Less(i, j int) bool { return false } +// ActivePodsWithRanks is a sortable list of pods and a list of corresponding +// ranks which will be considered during sorting. The two lists must have equal +// length. After sorting, the pods will be ordered as follows, applying each +// rule in turn until one matches: +// +// 1. If only one of the pods is assigned to a node, the pod that is not +// assigned comes before the pod that is. +// 2. If the pods' phases differ, a pending pod comes before a pod whose phase +// is unknown, and a pod whose phase is unknown comes before a running pod. +// 3. If exactly one of the pods is ready, the pod that is not ready comes +// before the ready pod. +// 4. If the pods' ranks differ, the pod with greater rank comes before the pod +// with lower rank. +// 5. If both pods are ready but have not been ready for the same amount of +// time, the pod that has been ready for a shorter amount of time comes +// before the pod that has been ready for longer. +// 6. If one pod has a container that has restarted more than any container in +// the other pod, the pod with the container with more restarts comes +// before the other pod. +// 7. If the pods' creation times differ, the pod that was created more recently +// comes before the older pod. +// +// If none of these rules matches, the second pod comes before the first pod. +// +// The intention of this ordering is to put pods that should be preferred for +// deletion first in the list. +type ActivePodsWithRanks struct { + // Pods is a list of pods. + Pods []*v1.Pod + + // Rank is a ranking of pods. This ranking is used during sorting when + // comparing two pods that are both scheduled, in the same phase, and + // having the same ready status. + Rank []int +} + +func (s ActivePodsWithRanks) Len() int { + return len(s.Pods) +} + +func (s ActivePodsWithRanks) Swap(i, j int) { + s.Pods[i], s.Pods[j] = s.Pods[j], s.Pods[i] + s.Rank[i], s.Rank[j] = s.Rank[j], s.Rank[i] +} + +// Less compares two pods with corresponding ranks and returns true if the first +// one should be preferred for deletion. +func (s ActivePodsWithRanks) Less(i, j int) bool { + // 1. Unassigned < assigned + // If only one of the pods is unassigned, the unassigned one is smaller + if s.Pods[i].Spec.NodeName != s.Pods[j].Spec.NodeName && (len(s.Pods[i].Spec.NodeName) == 0 || len(s.Pods[j].Spec.NodeName) == 0) { + return len(s.Pods[i].Spec.NodeName) == 0 + } + // 2. PodPending < PodUnknown < PodRunning + m := map[v1.PodPhase]int{v1.PodPending: 0, v1.PodUnknown: 1, v1.PodRunning: 2} + if m[s.Pods[i].Status.Phase] != m[s.Pods[j].Status.Phase] { + return m[s.Pods[i].Status.Phase] < m[s.Pods[j].Status.Phase] + } + // 3. Not ready < ready + // If only one of the pods is not ready, the not ready one is smaller + if podutil.IsPodReady(s.Pods[i]) != podutil.IsPodReady(s.Pods[j]) { + return !podutil.IsPodReady(s.Pods[i]) + } + // 4. Doubled up < not doubled up + // If one of the two pods is on the same node as one or more additional + // ready pods that belong to the same replicaset, whichever pod has more + // colocated ready pods is less + if s.Rank[i] != s.Rank[j] { + return s.Rank[i] > s.Rank[j] + } + // TODO: take availability into account when we push minReadySeconds information from deployment into pods, + // see https://github.com/kubernetes/kubernetes/issues/22065 + // 5. Been ready for empty time < less time < more time + // If both pods are ready, the latest ready one is smaller + if podutil.IsPodReady(s.Pods[i]) && podutil.IsPodReady(s.Pods[j]) && !podReadyTime(s.Pods[i]).Equal(podReadyTime(s.Pods[j])) { + return afterOrZero(podReadyTime(s.Pods[i]), podReadyTime(s.Pods[j])) + } + // 6. Pods with containers with higher restart counts < lower restart counts + if maxContainerRestarts(s.Pods[i]) != maxContainerRestarts(s.Pods[j]) { + return maxContainerRestarts(s.Pods[i]) > maxContainerRestarts(s.Pods[j]) + } + // 7. Empty creation time pods < newer pods < older pods + if !s.Pods[i].CreationTimestamp.Equal(&s.Pods[j].CreationTimestamp) { + return afterOrZero(&s.Pods[i].CreationTimestamp, &s.Pods[j].CreationTimestamp) + } + return false +} + // afterOrZero checks if time t1 is after time t2; if one of them // is zero, the zero time is seen as after non-zero time. func afterOrZero(t1, t2 *metav1.Time) bool { diff --git a/pkg/controller/controller_utils_test.go b/pkg/controller/controller_utils_test.go index 3bdda84a662..837a10b22a8 100644 --- a/pkg/controller/controller_utils_test.go +++ b/pkg/controller/controller_utils_test.go @@ -429,6 +429,96 @@ func TestSortingActivePods(t *testing.T) { } } +func TestSortingActivePodsWithRanks(t *testing.T) { + now := metav1.Now() + then := metav1.Time{Time: now.AddDate(0, -1, 0)} + zeroTime := metav1.Time{} + pod := func(podName, nodeName string, phase v1.PodPhase, ready bool, restarts int32, readySince metav1.Time, created metav1.Time) *v1.Pod { + var conditions []v1.PodCondition + var containerStatuses []v1.ContainerStatus + if ready { + conditions = []v1.PodCondition{{Type: v1.PodReady, Status: v1.ConditionTrue, LastTransitionTime: readySince}} + containerStatuses = []v1.ContainerStatus{{RestartCount: restarts}} + } + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + CreationTimestamp: created, + Name: podName, + }, + Spec: v1.PodSpec{NodeName: nodeName}, + Status: v1.PodStatus{ + Conditions: conditions, + ContainerStatuses: containerStatuses, + Phase: phase, + }, + } + } + var ( + unscheduledPod = pod("unscheduled", "", v1.PodPending, false, 0, zeroTime, zeroTime) + scheduledPendingPod = pod("pending", "node", v1.PodPending, false, 0, zeroTime, zeroTime) + unknownPhasePod = pod("unknown-phase", "node", v1.PodUnknown, false, 0, zeroTime, zeroTime) + runningNotReadyPod = pod("not-ready", "node", v1.PodRunning, false, 0, zeroTime, zeroTime) + runningReadyNoLastTransitionTimePod = pod("ready-no-last-transition-time", "node", v1.PodRunning, true, 0, zeroTime, zeroTime) + runningReadyNow = pod("ready-now", "node", v1.PodRunning, true, 0, now, now) + runningReadyThen = pod("ready-then", "node", v1.PodRunning, true, 0, then, then) + runningReadyNowHighRestarts = pod("ready-high-restarts", "node", v1.PodRunning, true, 9001, now, now) + runningReadyNowCreatedThen = pod("ready-now-created-then", "node", v1.PodRunning, true, 0, now, then) + ) + equalityTests := []*v1.Pod{ + unscheduledPod, + scheduledPendingPod, + unknownPhasePod, + runningNotReadyPod, + runningReadyNowCreatedThen, + runningReadyNow, + runningReadyThen, + runningReadyNowHighRestarts, + runningReadyNowCreatedThen, + } + for _, pod := range equalityTests { + podsWithRanks := ActivePodsWithRanks{ + Pods: []*v1.Pod{pod, pod}, + Rank: []int{1, 1}, + } + if podsWithRanks.Less(0, 1) || podsWithRanks.Less(1, 0) { + t.Errorf("expected pod %q not to be less than than itself", pod.Name) + } + } + type podWithRank struct { + pod *v1.Pod + rank int + } + inequalityTests := []struct { + lesser, greater podWithRank + }{ + {podWithRank{unscheduledPod, 1}, podWithRank{scheduledPendingPod, 2}}, + {podWithRank{unscheduledPod, 2}, podWithRank{scheduledPendingPod, 1}}, + {podWithRank{scheduledPendingPod, 1}, podWithRank{unknownPhasePod, 2}}, + {podWithRank{unknownPhasePod, 1}, podWithRank{runningNotReadyPod, 2}}, + {podWithRank{runningNotReadyPod, 1}, podWithRank{runningReadyNoLastTransitionTimePod, 1}}, + {podWithRank{runningReadyNoLastTransitionTimePod, 1}, podWithRank{runningReadyNow, 1}}, + {podWithRank{runningReadyNow, 2}, podWithRank{runningReadyNoLastTransitionTimePod, 1}}, + {podWithRank{runningReadyNow, 1}, podWithRank{runningReadyThen, 1}}, + {podWithRank{runningReadyNow, 2}, podWithRank{runningReadyThen, 1}}, + {podWithRank{runningReadyNowHighRestarts, 1}, podWithRank{runningReadyNow, 1}}, + {podWithRank{runningReadyNow, 2}, podWithRank{runningReadyNowHighRestarts, 1}}, + {podWithRank{runningReadyNow, 1}, podWithRank{runningReadyNowCreatedThen, 1}}, + {podWithRank{runningReadyNowCreatedThen, 2}, podWithRank{runningReadyNow, 1}}, + } + for _, test := range inequalityTests { + podsWithRanks := ActivePodsWithRanks{ + Pods: []*v1.Pod{test.lesser.pod, test.greater.pod}, + Rank: []int{test.lesser.rank, test.greater.rank}, + } + if !podsWithRanks.Less(0, 1) { + t.Errorf("expected pod %q with rank %v to be less than %q with rank %v", podsWithRanks.Pods[0].Name, podsWithRanks.Rank[0], podsWithRanks.Pods[1].Name, podsWithRanks.Rank[1]) + } + if podsWithRanks.Less(1, 0) { + t.Errorf("expected pod %q with rank %v not to be less than %v with rank %v", podsWithRanks.Pods[1].Name, podsWithRanks.Rank[1], podsWithRanks.Pods[0].Name, podsWithRanks.Rank[0]) + } + } +} + func TestActiveReplicaSetsFiltering(t *testing.T) { var replicaSets []*apps.ReplicaSet replicaSets = append(replicaSets, newReplicaSet("zero", 0)) diff --git a/pkg/controller/replicaset/BUILD b/pkg/controller/replicaset/BUILD index abdef8aeb93..f2be3ee7f53 100644 --- a/pkg/controller/replicaset/BUILD +++ b/pkg/controller/replicaset/BUILD @@ -23,6 +23,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/client-go/informers/apps/v1:go_default_library", diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go index 4e3773aff72..6727df358bb 100644 --- a/pkg/controller/replicaset/replica_set.go +++ b/pkg/controller/replicaset/replica_set.go @@ -41,6 +41,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" appsinformers "k8s.io/client-go/informers/apps/v1" @@ -193,6 +194,43 @@ func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) { <-stopCh } +// getReplicaSetsWithSameController returns a list of ReplicaSets with the same +// owner as the given ReplicaSet. +func (rsc *ReplicaSetController) getReplicaSetsWithSameController(rs *apps.ReplicaSet) []*apps.ReplicaSet { + controllerRef := metav1.GetControllerOf(rs) + if controllerRef == nil { + utilruntime.HandleError(fmt.Errorf("ReplicaSet has no controller: %v", rs)) + return nil + } + + allRSs, err := rsc.rsLister.ReplicaSets(rs.Namespace).List(labels.Everything()) + if err != nil { + utilruntime.HandleError(err) + return nil + } + + var relatedRSs []*apps.ReplicaSet + for _, r := range allRSs { + if ref := metav1.GetControllerOf(r); ref != nil && ref.UID == controllerRef.UID { + relatedRSs = append(relatedRSs, r) + } + } + + if klog.V(2) { + var related string + if len(relatedRSs) > 0 { + var relatedNames []string + for _, r := range relatedRSs { + relatedNames = append(relatedNames, r.Name) + } + related = ": " + strings.Join(relatedNames, ", ") + } + klog.Infof("Found %d related %vs for %v %s/%s%s", len(relatedRSs), rsc.Kind, rsc.Kind, rs.Namespace, rs.Name, related) + } + + return relatedRSs +} + // getPodReplicaSets returns a list of ReplicaSets matching the given pod. func (rsc *ReplicaSetController) getPodReplicaSets(pod *v1.Pod) []*apps.ReplicaSet { rss, err := rsc.rsLister.GetPodReplicaSets(pod) @@ -515,8 +553,11 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps } klog.V(2).Infof("Too many replicas for %v %s/%s, need %d, deleting %d", rsc.Kind, rs.Namespace, rs.Name, *(rs.Spec.Replicas), diff) + relatedPods, err := rsc.getIndirectlyRelatedPods(rs) + utilruntime.HandleError(err) + // Choose which Pods to delete, preferring those in earlier phases of startup. - podsToDelete := getPodsToDelete(filteredPods, diff) + podsToDelete := getPodsToDelete(filteredPods, relatedPods, diff) // Snapshot the UIDs (ns/name) of the pods we're expecting to see // deleted, so we know to record their expectations exactly once either @@ -681,18 +722,67 @@ func slowStartBatch(count int, initialBatchSize int, fn func() error) (int, erro return successes, nil } -func getPodsToDelete(filteredPods []*v1.Pod, diff int) []*v1.Pod { +// getIndirectlyRelatedPods returns all pods that are owned by any ReplicaSet +// that is owned by the given ReplicaSet's owner. +func (rsc *ReplicaSetController) getIndirectlyRelatedPods(rs *apps.ReplicaSet) ([]*v1.Pod, error) { + var relatedPods []*v1.Pod + seen := make(map[types.UID]*apps.ReplicaSet) + for _, relatedRS := range rsc.getReplicaSetsWithSameController(rs) { + selector, err := metav1.LabelSelectorAsSelector(relatedRS.Spec.Selector) + if err != nil { + return nil, err + } + pods, err := rsc.podLister.Pods(relatedRS.Namespace).List(selector) + if err != nil { + return nil, err + } + for _, pod := range pods { + if otherRS, found := seen[pod.UID]; found { + klog.V(5).Infof("Pod %s/%s is owned by both %v %s/%s and %v %s/%s", pod.Namespace, pod.Name, rsc.Kind, otherRS.Namespace, otherRS.Name, rsc.Kind, relatedRS.Namespace, relatedRS.Name) + continue + } + seen[pod.UID] = relatedRS + relatedPods = append(relatedPods, pod) + } + } + if klog.V(4) { + var relatedNames []string + for _, related := range relatedPods { + relatedNames = append(relatedNames, related.Name) + } + klog.Infof("Found %d related pods for %v %s/%s: %v", len(relatedPods), rsc.Kind, rs.Namespace, rs.Name, strings.Join(relatedNames, ", ")) + } + return relatedPods, nil +} + +func getPodsToDelete(filteredPods, relatedPods []*v1.Pod, diff int) []*v1.Pod { // No need to sort pods if we are about to delete all of them. // diff will always be <= len(filteredPods), so not need to handle > case. if diff < len(filteredPods) { - // Sort the pods in the order such that not-ready < ready, unscheduled - // < scheduled, and pending < running. This ensures that we delete pods - // in the earlier stages whenever possible. - sort.Sort(controller.ActivePods(filteredPods)) + podsWithRanks := getPodsRankedByRelatedPodsOnSameNode(filteredPods, relatedPods) + sort.Sort(podsWithRanks) } return filteredPods[:diff] } +// getPodsRankedByRelatedPodsOnSameNode returns an ActivePodsWithRanks value +// that wraps podsToRank and assigns each pod a rank equal to the number of +// active pods in relatedPods that are colocated on the same node with the pod. +// relatedPods generally should be a superset of podsToRank. +func getPodsRankedByRelatedPodsOnSameNode(podsToRank, relatedPods []*v1.Pod) controller.ActivePodsWithRanks { + podsOnNode := make(map[string]int) + for _, pod := range relatedPods { + if controller.IsPodActive(pod) { + podsOnNode[pod.Spec.NodeName]++ + } + } + ranks := make([]int, len(podsToRank)) + for i, pod := range podsToRank { + ranks[i] = podsOnNode[pod.Spec.NodeName] + } + return controller.ActivePodsWithRanks{Pods: podsToRank, Rank: ranks} +} + func getPodKeys(pods []*v1.Pod) []string { podKeys := make([]string, 0, len(pods)) for _, pod := range pods { diff --git a/pkg/controller/replicaset/replica_set_test.go b/pkg/controller/replicaset/replica_set_test.go index 9b468657845..4f6dafe3eba 100644 --- a/pkg/controller/replicaset/replica_set_test.go +++ b/pkg/controller/replicaset/replica_set_test.go @@ -23,6 +23,7 @@ import ( "net/http/httptest" "net/url" "reflect" + "sort" "strings" "sync" "testing" @@ -80,12 +81,16 @@ func skipListerFunc(verb string, url url.URL) bool { var alwaysReady = func() bool { return true } func newReplicaSet(replicas int, selectorMap map[string]string) *apps.ReplicaSet { + isController := true rs := &apps.ReplicaSet{ TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: "ReplicaSet"}, ObjectMeta: metav1.ObjectMeta{ - UID: uuid.NewUUID(), - Name: "foobar", - Namespace: metav1.NamespaceDefault, + UID: uuid.NewUUID(), + Name: "foobar", + Namespace: metav1.NamespaceDefault, + OwnerReferences: []metav1.OwnerReference{ + {UID: "123", Controller: &isController}, + }, ResourceVersion: "18", }, Spec: apps.ReplicaSetSpec{ @@ -136,6 +141,7 @@ func newPod(name string, rs *apps.ReplicaSet, status v1.PodPhase, lastTransition } return &v1.Pod{ ObjectMeta: metav1.ObjectMeta{ + UID: uuid.NewUUID(), Name: name, Namespace: rs.Namespace, Labels: rs.Spec.Selector.MatchLabels, @@ -342,6 +348,68 @@ func TestSyncReplicaSetDormancy(t *testing.T) { fakeHandler.ValidateRequestCount(t, 2) } +func TestGetReplicaSetsWithSameController(t *testing.T) { + someRS := newReplicaSet(1, map[string]string{"foo": "bar"}) + someRS.Name = "rs1" + relatedRS := newReplicaSet(1, map[string]string{"foo": "baz"}) + relatedRS.Name = "rs2" + unrelatedRS := newReplicaSet(1, map[string]string{"foo": "quux"}) + unrelatedRS.Name = "rs3" + unrelatedRS.ObjectMeta.OwnerReferences[0].UID = "456" + pendingDeletionRS := newReplicaSet(1, map[string]string{"foo": "xyzzy"}) + pendingDeletionRS.Name = "rs4" + pendingDeletionRS.ObjectMeta.OwnerReferences[0].UID = "789" + now := metav1.Now() + pendingDeletionRS.DeletionTimestamp = &now + + stopCh := make(chan struct{}) + defer close(stopCh) + manager, informers := testNewReplicaSetControllerFromClient(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}), stopCh, BurstReplicas) + testCases := []struct { + name string + rss []*apps.ReplicaSet + rs *apps.ReplicaSet + expectedRSs []*apps.ReplicaSet + }{ + { + name: "expect to get back a ReplicaSet that is pending deletion", + rss: []*apps.ReplicaSet{pendingDeletionRS, unrelatedRS}, + rs: pendingDeletionRS, + expectedRSs: []*apps.ReplicaSet{pendingDeletionRS}, + }, + { + name: "expect to get back only the given ReplicaSet if there is no related ReplicaSet", + rss: []*apps.ReplicaSet{someRS, unrelatedRS}, + rs: someRS, + expectedRSs: []*apps.ReplicaSet{someRS}, + }, + { + name: "expect to get back the given ReplicaSet as well as any related ReplicaSet but not an unrelated ReplicaSet", + rss: []*apps.ReplicaSet{someRS, relatedRS, unrelatedRS}, + rs: someRS, + expectedRSs: []*apps.ReplicaSet{someRS, relatedRS}, + }, + } + for _, c := range testCases { + for _, r := range c.rss { + informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(r) + } + actualRSs := manager.getReplicaSetsWithSameController(c.rs) + var actualRSNames, expectedRSNames []string + for _, r := range actualRSs { + actualRSNames = append(actualRSNames, r.Name) + } + for _, r := range c.expectedRSs { + expectedRSNames = append(expectedRSNames, r.Name) + } + sort.Strings(actualRSNames) + sort.Strings(expectedRSNames) + if !reflect.DeepEqual(actualRSNames, expectedRSNames) { + t.Errorf("Got [%s]; expected [%s]", strings.Join(actualRSNames, ", "), strings.Join(expectedRSNames, ", ")) + } + } +} + func TestPodControllerLookup(t *testing.T) { stopCh := make(chan struct{}) defer close(stopCh) @@ -408,6 +476,87 @@ func TestPodControllerLookup(t *testing.T) { } } +// byName sorts pods by their names. +type byName []*v1.Pod + +func (pods byName) Len() int { return len(pods) } +func (pods byName) Swap(i, j int) { pods[i], pods[j] = pods[j], pods[i] } +func (pods byName) Less(i, j int) bool { return pods[i].Name < pods[j].Name } + +func TestRelatedPodsLookup(t *testing.T) { + someRS := newReplicaSet(1, map[string]string{"foo": "bar"}) + someRS.Name = "foo1" + relatedRS := newReplicaSet(1, map[string]string{"foo": "baz"}) + relatedRS.Name = "foo2" + unrelatedRS := newReplicaSet(1, map[string]string{"foo": "quux"}) + unrelatedRS.Name = "bar1" + unrelatedRS.ObjectMeta.OwnerReferences[0].UID = "456" + pendingDeletionRS := newReplicaSet(1, map[string]string{"foo": "xyzzy"}) + pendingDeletionRS.Name = "foo3" + pendingDeletionRS.ObjectMeta.OwnerReferences[0].UID = "789" + now := metav1.Now() + pendingDeletionRS.DeletionTimestamp = &now + pod1 := newPod("pod1", someRS, v1.PodRunning, nil, true) + pod2 := newPod("pod2", someRS, v1.PodRunning, nil, true) + pod3 := newPod("pod3", relatedRS, v1.PodRunning, nil, true) + pod4 := newPod("pod4", unrelatedRS, v1.PodRunning, nil, true) + + stopCh := make(chan struct{}) + defer close(stopCh) + manager, informers := testNewReplicaSetControllerFromClient(clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: &schema.GroupVersion{Group: "", Version: "v1"}}}), stopCh, BurstReplicas) + testCases := []struct { + name string + rss []*apps.ReplicaSet + pods []*v1.Pod + rs *apps.ReplicaSet + expectedPodNames []string + }{ + { + name: "expect to get a pod even if its owning ReplicaSet is pending deletion", + rss: []*apps.ReplicaSet{pendingDeletionRS, unrelatedRS}, + rs: pendingDeletionRS, + pods: []*v1.Pod{newPod("pod", pendingDeletionRS, v1.PodRunning, nil, true)}, + expectedPodNames: []string{"pod"}, + }, + { + name: "expect to get only the ReplicaSet's own pods if there is no related ReplicaSet", + rss: []*apps.ReplicaSet{someRS, unrelatedRS}, + rs: someRS, + pods: []*v1.Pod{pod1, pod2, pod4}, + expectedPodNames: []string{"pod1", "pod2"}, + }, + { + name: "expect to get own pods as well as any related ReplicaSet's but not an unrelated ReplicaSet's", + rss: []*apps.ReplicaSet{someRS, relatedRS, unrelatedRS}, + rs: someRS, + pods: []*v1.Pod{pod1, pod2, pod3, pod4}, + expectedPodNames: []string{"pod1", "pod2", "pod3"}, + }, + } + for _, c := range testCases { + for _, r := range c.rss { + informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(r) + } + for _, pod := range c.pods { + informers.Core().V1().Pods().Informer().GetIndexer().Add(pod) + manager.addPod(pod) + } + actualPods, err := manager.getIndirectlyRelatedPods(c.rs) + if err != nil { + t.Errorf("Unexpected error from getIndirectlyRelatedPods: %v", err) + } + var actualPodNames []string + for _, pod := range actualPods { + actualPodNames = append(actualPodNames, pod.Name) + } + sort.Strings(actualPodNames) + sort.Strings(c.expectedPodNames) + if !reflect.DeepEqual(actualPodNames, c.expectedPodNames) { + t.Errorf("Got [%s]; expected [%s]", strings.Join(actualPodNames, ", "), strings.Join(c.expectedPodNames, ", ")) + } + } +} + func TestWatchControllers(t *testing.T) { fakeWatch := watch.NewFake() client := fake.NewSimpleClientset() @@ -1445,10 +1594,19 @@ func TestGetPodsToDelete(t *testing.T) { Status: v1.ConditionFalse, }, } - // a scheduled, running, ready pod - scheduledRunningReadyPod := newPod("scheduled-running-ready-pod", rs, v1.PodRunning, nil, true) - scheduledRunningReadyPod.Spec.NodeName = "fake-node" - scheduledRunningReadyPod.Status.Conditions = []v1.PodCondition{ + // a scheduled, running, ready pod on fake-node-1 + scheduledRunningReadyPodOnNode1 := newPod("scheduled-running-ready-pod-on-node-1", rs, v1.PodRunning, nil, true) + scheduledRunningReadyPodOnNode1.Spec.NodeName = "fake-node-1" + scheduledRunningReadyPodOnNode1.Status.Conditions = []v1.PodCondition{ + { + Type: v1.PodReady, + Status: v1.ConditionTrue, + }, + } + // a scheduled, running, ready pod on fake-node-2 + scheduledRunningReadyPodOnNode2 := newPod("scheduled-running-ready-pod-on-node-2", rs, v1.PodRunning, nil, true) + scheduledRunningReadyPodOnNode2.Spec.NodeName = "fake-node-2" + scheduledRunningReadyPodOnNode2.Status.Conditions = []v1.PodCondition{ { Type: v1.PodReady, Status: v1.ConditionTrue, @@ -1456,8 +1614,10 @@ func TestGetPodsToDelete(t *testing.T) { } tests := []struct { - name string - pods []*v1.Pod + name string + pods []*v1.Pod + // related defaults to pods if nil. + related []*v1.Pod diff int expectedPodsToDelete []*v1.Pod }{ @@ -1465,7 +1625,8 @@ func TestGetPodsToDelete(t *testing.T) { // an unscheduled, pending pod // a scheduled, pending pod // a scheduled, running, not-ready pod - // a scheduled, running, ready pod + // a scheduled, running, ready pod on same node as a related pod + // a scheduled, running, ready pod not on node with related pods // Note that a pending pod cannot be ready { name: "len(pods) = 0 (i.e., diff = 0 too)", @@ -1477,15 +1638,15 @@ func TestGetPodsToDelete(t *testing.T) { name: "diff = len(pods)", pods: []*v1.Pod{ scheduledRunningNotReadyPod, - scheduledRunningReadyPod, + scheduledRunningReadyPodOnNode1, }, diff: 2, - expectedPodsToDelete: []*v1.Pod{scheduledRunningNotReadyPod, scheduledRunningReadyPod}, + expectedPodsToDelete: []*v1.Pod{scheduledRunningNotReadyPod, scheduledRunningReadyPodOnNode1}, }, { name: "diff < len(pods)", pods: []*v1.Pod{ - scheduledRunningReadyPod, + scheduledRunningReadyPodOnNode1, scheduledRunningNotReadyPod, }, diff: 1, @@ -1494,14 +1655,39 @@ func TestGetPodsToDelete(t *testing.T) { { name: "various pod phases and conditions, diff = len(pods)", pods: []*v1.Pod{ - scheduledRunningReadyPod, + scheduledRunningReadyPodOnNode1, + scheduledRunningReadyPodOnNode1, + scheduledRunningReadyPodOnNode2, scheduledRunningNotReadyPod, scheduledPendingPod, unscheduledPendingPod, }, - diff: 4, + diff: 6, expectedPodsToDelete: []*v1.Pod{ - scheduledRunningReadyPod, + scheduledRunningReadyPodOnNode1, + scheduledRunningReadyPodOnNode1, + scheduledRunningReadyPodOnNode2, + scheduledRunningNotReadyPod, + scheduledPendingPod, + unscheduledPendingPod, + }, + }, + { + name: "various pod phases and conditions, diff = len(pods), relatedPods empty", + pods: []*v1.Pod{ + scheduledRunningReadyPodOnNode1, + scheduledRunningReadyPodOnNode1, + scheduledRunningReadyPodOnNode2, + scheduledRunningNotReadyPod, + scheduledPendingPod, + unscheduledPendingPod, + }, + related: []*v1.Pod{}, + diff: 6, + expectedPodsToDelete: []*v1.Pod{ + scheduledRunningReadyPodOnNode1, + scheduledRunningReadyPodOnNode1, + scheduledRunningReadyPodOnNode2, scheduledRunningNotReadyPod, scheduledPendingPod, unscheduledPendingPod, @@ -1521,7 +1707,7 @@ func TestGetPodsToDelete(t *testing.T) { { name: "ready vs not-ready, diff < len(pods)", pods: []*v1.Pod{ - scheduledRunningReadyPod, + scheduledRunningReadyPodOnNode1, scheduledRunningNotReadyPod, scheduledRunningNotReadyPod, }, @@ -1531,6 +1717,22 @@ func TestGetPodsToDelete(t *testing.T) { scheduledRunningNotReadyPod, }, }, + { + name: "ready and colocated with another ready pod vs not colocated, diff < len(pods)", + pods: []*v1.Pod{ + scheduledRunningReadyPodOnNode1, + scheduledRunningReadyPodOnNode2, + }, + related: []*v1.Pod{ + scheduledRunningReadyPodOnNode1, + scheduledRunningReadyPodOnNode2, + scheduledRunningReadyPodOnNode2, + }, + diff: 1, + expectedPodsToDelete: []*v1.Pod{ + scheduledRunningReadyPodOnNode2, + }, + }, { name: "pending vs running, diff < len(pods)", pods: []*v1.Pod{ @@ -1545,7 +1747,8 @@ func TestGetPodsToDelete(t *testing.T) { { name: "various pod phases and conditions, diff < len(pods)", pods: []*v1.Pod{ - scheduledRunningReadyPod, + scheduledRunningReadyPodOnNode1, + scheduledRunningReadyPodOnNode2, scheduledRunningNotReadyPod, scheduledPendingPod, unscheduledPendingPod, @@ -1560,7 +1763,11 @@ func TestGetPodsToDelete(t *testing.T) { } for _, test := range tests { - podsToDelete := getPodsToDelete(test.pods, test.diff) + related := test.related + if related == nil { + related = test.pods + } + podsToDelete := getPodsToDelete(test.pods, related, test.diff) if len(podsToDelete) != len(test.expectedPodsToDelete) { t.Errorf("%s: unexpected pods to delete, expected %v, got %v", test.name, test.expectedPodsToDelete, podsToDelete) } diff --git a/test/e2e/apps/deployment.go b/test/e2e/apps/deployment.go index 67e0087c8cc..94c375fe252 100644 --- a/test/e2e/apps/deployment.go +++ b/test/e2e/apps/deployment.go @@ -42,6 +42,7 @@ import ( e2edeploy "k8s.io/kubernetes/test/e2e/framework/deployment" e2epod "k8s.io/kubernetes/test/e2e/framework/pod" "k8s.io/kubernetes/test/e2e/framework/replicaset" + e2eservice "k8s.io/kubernetes/test/e2e/framework/service" testutil "k8s.io/kubernetes/test/utils" utilpointer "k8s.io/utils/pointer" ) @@ -119,6 +120,10 @@ var _ = SIGDescribe("Deployment", func() { framework.ConformanceIt("deployment should support proportional scaling", func() { testProportionalScalingDeployment(f) }) + ginkgo.It("should not disrupt a cloud load-balancer's connectivity during rollout", func() { + framework.SkipUnlessProviderIs("aws", "azure", "gce", "gke") + testRollingUpdateDeploymentWithLocalTrafficLoadBalancer(f) + }) // TODO: add tests that cover deployment.Spec.MinReadySeconds once we solved clock-skew issues // See https://github.com/kubernetes/kubernetes/issues/29229 }) @@ -856,3 +861,151 @@ func orphanDeploymentReplicaSets(c clientset.Interface, d *appsv1.Deployment) er deleteOptions.Preconditions = metav1.NewUIDPreconditions(string(d.UID)) return c.AppsV1().Deployments(d.Namespace).Delete(d.Name, deleteOptions) } + +func testRollingUpdateDeploymentWithLocalTrafficLoadBalancer(f *framework.Framework) { + ns := f.Namespace.Name + c := f.ClientSet + + name := "test-rolling-update-with-lb" + framework.Logf("Creating Deployment %q", name) + podLabels := map[string]string{"name": name} + replicas := int32(3) + d := e2edeploy.NewDeployment(name, replicas, podLabels, AgnhostImageName, AgnhostImage, appsv1.RollingUpdateDeploymentStrategyType) + // NewDeployment assigned the same value to both d.Spec.Selector and + // d.Spec.Template.Labels, so mutating the one would mutate the other. + // Thus we need to set d.Spec.Template.Labels to a new value if we want + // to mutate it alone. + d.Spec.Template.Labels = map[string]string{ + "iteration": "0", + "name": name, + } + d.Spec.Template.Spec.Containers[0].Args = []string{"netexec", "--http-port=80", "--udp-port=80"} + // To ensure that a node that had a local endpoint prior to a rolling + // update continues to have a local endpoint throughout the rollout, we + // need an affinity policy that will cause pods to be scheduled on the + // same nodes as old pods, and we need the deployment to scale up a new + // pod before deleting an old pod. This affinity policy will define + // inter-pod affinity for pods of different rollouts and anti-affinity + // for pods of the same rollout, so it will need to be updated when + // performing a rollout. + setAffinity(d) + d.Spec.Strategy.RollingUpdate = &appsv1.RollingUpdateDeployment{ + MaxSurge: intOrStrP(1), + MaxUnavailable: intOrStrP(0), + } + deployment, err := c.AppsV1().Deployments(ns).Create(d) + framework.ExpectNoError(err) + err = e2edeploy.WaitForDeploymentComplete(c, deployment) + framework.ExpectNoError(err) + + framework.Logf("Creating a service %s with type=LoadBalancer and externalTrafficPolicy=Local in namespace %s", name, ns) + jig := e2eservice.NewTestJig(c, name) + jig.Labels = podLabels + service := jig.CreateLoadBalancerService(ns, name, e2eservice.LoadBalancerCreateTimeoutDefault, func(svc *v1.Service) { + svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyTypeLocal + }) + + lbNameOrAddress := e2eservice.GetIngressPoint(&service.Status.LoadBalancer.Ingress[0]) + svcPort := int(service.Spec.Ports[0].Port) + + framework.Logf("Hitting the replica set's pods through the service's load balancer") + timeout := e2eservice.LoadBalancerLagTimeoutDefault + if framework.ProviderIs("aws") { + timeout = e2eservice.LoadBalancerLagTimeoutAWS + } + e2eservice.TestReachableHTTP(lbNameOrAddress, svcPort, timeout) + + framework.Logf("Starting a goroutine to watch the service's endpoints in the background") + done := make(chan struct{}) + failed := make(chan struct{}) + defer close(done) + go func() { + defer ginkgo.GinkgoRecover() + expectedNodes := jig.GetEndpointNodeNames(service) + // The affinity policy should ensure that before an old pod is + // deleted, a new pod will have been created on the same node. + // Thus the set of nodes with local endpoints for the service + // should remain unchanged. + wait.Until(func() { + actualNodes := jig.GetEndpointNodeNames(service) + if !actualNodes.Equal(expectedNodes) { + framework.Logf("The set of nodes with local endpoints changed; started with %v, now have %v", expectedNodes.List(), actualNodes.List()) + failed <- struct{}{} + } + }, framework.Poll, done) + }() + + framework.Logf("Triggering a rolling deployment several times") + for i := 1; i <= 3; i++ { + framework.Logf("Updating label deployment %q pod spec (iteration #%d)", name, i) + deployment, err = e2edeploy.UpdateDeploymentWithRetries(c, ns, d.Name, func(update *appsv1.Deployment) { + update.Spec.Template.Labels["iteration"] = fmt.Sprintf("%d", i) + setAffinity(update) + }) + framework.ExpectNoError(err) + + framework.Logf("Waiting for observed generation %d", deployment.Generation) + err = e2edeploy.WaitForObservedDeployment(c, ns, name, deployment.Generation) + framework.ExpectNoError(err) + + framework.Logf("Make sure deployment %q is complete", name) + err = e2edeploy.WaitForDeploymentCompleteAndCheckRolling(c, deployment) + framework.ExpectNoError(err) + } + + select { + case <-failed: + framework.Failf("Connectivity to the load balancer was interrupted") + case <-time.After(1 * time.Minute): + } +} + +func setAffinity(d *appsv1.Deployment) { + d.Spec.Template.Spec.Affinity = &v1.Affinity{ + PodAffinity: &v1.PodAffinity{ + PreferredDuringSchedulingIgnoredDuringExecution: []v1.WeightedPodAffinityTerm{ + { + Weight: int32(100), + PodAffinityTerm: v1.PodAffinityTerm{ + TopologyKey: "kubernetes.io/hostname", + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "name", + Operator: metav1.LabelSelectorOpIn, + Values: []string{d.Spec.Template.Labels["name"]}, + }, + { + Key: "iteration", + Operator: metav1.LabelSelectorOpNotIn, + Values: []string{d.Spec.Template.Labels["iteration"]}, + }, + }, + }, + }, + }, + }, + }, + PodAntiAffinity: &v1.PodAntiAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{ + { + TopologyKey: "kubernetes.io/hostname", + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "name", + Operator: metav1.LabelSelectorOpIn, + Values: []string{d.Spec.Template.Labels["name"]}, + }, + { + Key: "iteration", + Operator: metav1.LabelSelectorOpIn, + Values: []string{d.Spec.Template.Labels["iteration"]}, + }, + }, + }, + }, + }, + }, + } +} diff --git a/test/e2e/framework/service/jig.go b/test/e2e/framework/service/jig.go index 49fae155d8d..ec1196d8dd1 100644 --- a/test/e2e/framework/service/jig.go +++ b/test/e2e/framework/service/jig.go @@ -266,6 +266,19 @@ func (j *TestJig) CreateLoadBalancerService(namespace, serviceName string, timeo func (j *TestJig) GetEndpointNodes(svc *v1.Service) map[string][]string { nodes, err := e2enode.GetBoundedReadySchedulableNodes(j.Client, MaxNodesForEndpointsTests) framework.ExpectNoError(err) + epNodes := j.GetEndpointNodeNames(svc) + nodeMap := map[string][]string{} + for _, n := range nodes.Items { + if epNodes.Has(n.Name) { + nodeMap[n.Name] = e2enode.GetAddresses(&n, v1.NodeExternalIP) + } + } + return nodeMap +} + +// GetEndpointNodeNames returns a string set of node names on which the +// endpoints of the given Service are running. +func (j *TestJig) GetEndpointNodeNames(svc *v1.Service) sets.String { endpoints, err := j.Client.CoreV1().Endpoints(svc.Namespace).Get(svc.Name, metav1.GetOptions{}) if err != nil { framework.Failf("Get endpoints for service %s/%s failed (%s)", svc.Namespace, svc.Name, err) @@ -281,13 +294,7 @@ func (j *TestJig) GetEndpointNodes(svc *v1.Service) map[string][]string { } } } - nodeMap := map[string][]string{} - for _, n := range nodes.Items { - if epNodes.Has(n.Name) { - nodeMap[n.Name] = e2enode.GetAddresses(&n, v1.NodeExternalIP) - } - } - return nodeMap + return epNodes } // WaitForEndpointOnNode waits for a service endpoint on the given node.