diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index 209cad19cfe..d0be95b4b7f 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -402,9 +402,6 @@ func (e *Controller) syncService(ctx context.Context, key string) error { return err } - // If the user specified the older (deprecated) annotation, we have to respect it. - tolerateUnreadyEndpoints := service.Spec.PublishNotReadyAddresses - // We call ComputeEndpointLastChangeTriggerTime here to make sure that the // state of the trigger time tracker gets updated even if the sync turns out // to be no-op and we don't update the endpoints object. @@ -416,12 +413,8 @@ func (e *Controller) syncService(ctx context.Context, key string) error { var totalNotReadyEps int for _, pod := range pods { - if len(pod.Status.PodIP) == 0 { - klog.V(5).Infof("Failed to find an IP for pod %s/%s", pod.Namespace, pod.Name) - continue - } - if !tolerateUnreadyEndpoints && pod.DeletionTimestamp != nil { - klog.V(5).Infof("Pod is being deleted %s/%s", pod.Namespace, pod.Name) + if !endpointutil.ShouldPodBeInEndpointSlice(pod, service.Spec.PublishNotReadyAddresses) { + klog.V(5).Infof("Pod %s/%s is not included on endpoints for Service %s/%s", pod.Namespace, pod.Name, service.Namespace, service.Name) continue } @@ -441,7 +434,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error { // Allow headless service not to have ports. if len(service.Spec.Ports) == 0 { if service.Spec.ClusterIP == api.ClusterIPNone { - subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(subsets, pod, epa, nil, tolerateUnreadyEndpoints) + subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(subsets, pod, epa, nil, service.Spec.PublishNotReadyAddresses) // No need to repack subsets for headless service without ports. } } else { @@ -455,7 +448,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error { epp := endpointPortFromServicePort(servicePort, portNum) var readyEps, notReadyEps int - subsets, readyEps, notReadyEps = addEndpointSubset(subsets, pod, epa, epp, tolerateUnreadyEndpoints) + subsets, readyEps, notReadyEps = addEndpointSubset(subsets, pod, epa, epp, service.Spec.PublishNotReadyAddresses) totalReadyEps = totalReadyEps + readyEps totalNotReadyEps = totalNotReadyEps + notReadyEps } @@ -591,6 +584,10 @@ func (e *Controller) checkLeftoverEndpoints() { } } +// addEndpointSubset add the endpoints addresses and ports to the EndpointSubset. +// The addresses are added to the corresponding field, ready or not ready, depending +// on the pod status and the Service PublishNotReadyAddresses field value. +// The pod passed to this function must have already been filtered through ShouldPodBeInEndpointSlice. func addEndpointSubset(subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.EndpointAddress, epp *v1.EndpointPort, tolerateUnreadyEndpoints bool) ([]v1.EndpointSubset, int, int) { var readyEps int @@ -605,7 +602,7 @@ func addEndpointSubset(subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.Endpoint Ports: ports, }) readyEps++ - } else if shouldPodBeInEndpoints(pod) { + } else { // if it is not a ready address it has to be not ready klog.V(5).Infof("Pod is out of service: %s/%s", pod.Namespace, pod.Name) subsets = append(subsets, v1.EndpointSubset{ NotReadyAddresses: []v1.EndpointAddress{epa}, @@ -616,17 +613,6 @@ func addEndpointSubset(subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.Endpoint return subsets, readyEps, notReadyEps } -func shouldPodBeInEndpoints(pod *v1.Pod) bool { - switch pod.Spec.RestartPolicy { - case v1.RestartPolicyNever: - return pod.Status.Phase != v1.PodFailed && pod.Status.Phase != v1.PodSucceeded - case v1.RestartPolicyOnFailure: - return pod.Status.Phase != v1.PodSucceeded - default: - return true - } -} - func endpointPortFromServicePort(servicePort *v1.ServicePort, portNum int) *v1.EndpointPort { epp := &v1.EndpointPort{ Name: servicePort.Name, diff --git a/pkg/controller/endpoint/endpoints_controller_test.go b/pkg/controller/endpoint/endpoints_controller_test.go index 72b7f6813dd..a6514701ab4 100644 --- a/pkg/controller/endpoint/endpoints_controller_test.go +++ b/pkg/controller/endpoint/endpoints_controller_test.go @@ -1155,97 +1155,6 @@ func TestSyncEndpointsHeadlessWithoutPort(t *testing.T) { endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints", "POST", &data) } -// There are 3*5 possibilities(3 types of RestartPolicy by 5 types of PodPhase). Not list them all here. -// Just list all of the 3 false cases and 3 of the 12 true cases. -func TestShouldPodBeInEndpoints(t *testing.T) { - testCases := []struct { - name string - pod *v1.Pod - expected bool - }{ - // Pod should not be in endpoints cases: - { - name: "Failed pod with Never RestartPolicy", - pod: &v1.Pod{ - Spec: v1.PodSpec{ - RestartPolicy: v1.RestartPolicyNever, - }, - Status: v1.PodStatus{ - Phase: v1.PodFailed, - }, - }, - expected: false, - }, - { - name: "Succeeded pod with Never RestartPolicy", - pod: &v1.Pod{ - Spec: v1.PodSpec{ - RestartPolicy: v1.RestartPolicyNever, - }, - Status: v1.PodStatus{ - Phase: v1.PodSucceeded, - }, - }, - expected: false, - }, - { - name: "Succeeded pod with OnFailure RestartPolicy", - pod: &v1.Pod{ - Spec: v1.PodSpec{ - RestartPolicy: v1.RestartPolicyOnFailure, - }, - Status: v1.PodStatus{ - Phase: v1.PodSucceeded, - }, - }, - expected: false, - }, - // Pod should be in endpoints cases: - { - name: "Failed pod with Always RestartPolicy", - pod: &v1.Pod{ - Spec: v1.PodSpec{ - RestartPolicy: v1.RestartPolicyAlways, - }, - Status: v1.PodStatus{ - Phase: v1.PodFailed, - }, - }, - expected: true, - }, - { - name: "Pending pod with Never RestartPolicy", - pod: &v1.Pod{ - Spec: v1.PodSpec{ - RestartPolicy: v1.RestartPolicyNever, - }, - Status: v1.PodStatus{ - Phase: v1.PodPending, - }, - }, - expected: true, - }, - { - name: "Unknown pod with OnFailure RestartPolicy", - pod: &v1.Pod{ - Spec: v1.PodSpec{ - RestartPolicy: v1.RestartPolicyOnFailure, - }, - Status: v1.PodStatus{ - Phase: v1.PodUnknown, - }, - }, - expected: true, - }, - } - for _, test := range testCases { - result := shouldPodBeInEndpoints(test.pod) - if result != test.expected { - t.Errorf("%s: expected : %t, got: %t", test.name, test.expected, result) - } - } -} - func TestPodToEndpointAddressForService(t *testing.T) { ipv4 := v1.IPv4Protocol ipv6 := v1.IPv6Protocol @@ -2313,6 +2222,235 @@ func TestMultipleServiceChanges(t *testing.T) { close(stopChan) } +func TestSyncServiceAddresses(t *testing.T) { + makeService := func(tolerateUnready bool) *v1.Service { + return &v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"}, + Spec: v1.ServiceSpec{ + Selector: map[string]string{"foo": "bar"}, + PublishNotReadyAddresses: tolerateUnready, + Type: v1.ServiceTypeClusterIP, + ClusterIP: "1.1.1.1", + Ports: []v1.ServicePort{{Port: 80}}, + }, + } + } + + makePod := func(phase v1.PodPhase, isReady bool, terminating bool) *v1.Pod { + statusCondition := v1.ConditionFalse + if isReady { + statusCondition = v1.ConditionTrue + } + + now := metav1.Now() + deletionTimestamp := &now + if !terminating { + deletionTimestamp = nil + } + return &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns", + Name: "fakepod", + DeletionTimestamp: deletionTimestamp, + Labels: map[string]string{"foo": "bar"}, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{{Ports: []v1.ContainerPort{ + {Name: "port1", ContainerPort: int32(8080)}, + }}}, + }, + Status: v1.PodStatus{ + Phase: phase, + Conditions: []v1.PodCondition{ + { + Type: v1.PodReady, + Status: statusCondition, + }, + }, + PodIP: "10.1.1.1", + PodIPs: []v1.PodIP{ + {IP: "10.1.1.1"}, + }, + }, + } + } + + testCases := []struct { + name string + pod *v1.Pod + service *v1.Service + expectedReady int + expectedUnready int + }{ + { + name: "pod running phase", + pod: makePod(v1.PodRunning, true, false), + service: makeService(false), + expectedReady: 1, + expectedUnready: 0, + }, + { + name: "pod running phase being deleted", + pod: makePod(v1.PodRunning, true, true), + service: makeService(false), + expectedReady: 0, + expectedUnready: 0, + }, + { + name: "pod unknown phase container ready", + pod: makePod(v1.PodUnknown, true, false), + service: makeService(false), + expectedReady: 1, + expectedUnready: 0, + }, + { + name: "pod unknown phase container ready being deleted", + pod: makePod(v1.PodUnknown, true, true), + service: makeService(false), + expectedReady: 0, + expectedUnready: 0, + }, + { + name: "pod pending phase container ready", + pod: makePod(v1.PodPending, true, false), + service: makeService(false), + expectedReady: 1, + expectedUnready: 0, + }, + { + name: "pod pending phase container ready being deleted", + pod: makePod(v1.PodPending, true, true), + service: makeService(false), + expectedReady: 0, + expectedUnready: 0, + }, + { + name: "pod unknown phase container not ready", + pod: makePod(v1.PodUnknown, false, false), + service: makeService(false), + expectedReady: 0, + expectedUnready: 1, + }, + { + name: "pod pending phase container not ready", + pod: makePod(v1.PodPending, false, false), + service: makeService(false), + expectedReady: 0, + expectedUnready: 1, + }, + { + name: "pod failed phase", + pod: makePod(v1.PodFailed, false, false), + service: makeService(false), + expectedReady: 0, + expectedUnready: 0, + }, + { + name: "pod succeeded phase", + pod: makePod(v1.PodSucceeded, false, false), + service: makeService(false), + expectedReady: 0, + expectedUnready: 0, + }, + { + name: "pod running phase and tolerate unready", + pod: makePod(v1.PodRunning, false, false), + service: makeService(true), + expectedReady: 1, + expectedUnready: 0, + }, + { + name: "pod running phase and tolerate unready being deleted", + pod: makePod(v1.PodRunning, false, true), + service: makeService(true), + expectedReady: 1, + expectedUnready: 0, + }, + { + name: "pod unknown phase and tolerate unready", + pod: makePod(v1.PodUnknown, false, false), + service: makeService(true), + expectedReady: 1, + expectedUnready: 0, + }, + { + name: "pod unknown phase and tolerate unready being deleted", + pod: makePod(v1.PodUnknown, false, true), + service: makeService(true), + expectedReady: 1, + expectedUnready: 0, + }, + { + name: "pod pending phase and tolerate unready", + pod: makePod(v1.PodPending, false, false), + service: makeService(true), + expectedReady: 1, + expectedUnready: 0, + }, + { + name: "pod pending phase and tolerate unready being deleted", + pod: makePod(v1.PodPending, false, true), + service: makeService(true), + expectedReady: 1, + expectedUnready: 0, + }, + { + name: "pod failed phase and tolerate unready", + pod: makePod(v1.PodFailed, false, false), + service: makeService(true), + expectedReady: 0, + expectedUnready: 0, + }, + { + name: "pod succeeded phase and tolerate unready endpoints", + pod: makePod(v1.PodSucceeded, false, false), + service: makeService(true), + expectedReady: 0, + expectedUnready: 0, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ns := tc.service.Namespace + client, c := newFakeController(0 * time.Second) + + err := c.podStore.Add(tc.pod) + if err != nil { + t.Errorf("Unexpected error adding pod %v", err) + } + err = c.serviceStore.Add(tc.service) + if err != nil { + t.Errorf("Unexpected error adding service %v", err) + } + err = c.syncService(context.TODO(), fmt.Sprintf("%s/%s", ns, tc.service.Name)) + if err != nil { + t.Errorf("Unexpected error syncing service %v", err) + } + + endpoints, err := client.CoreV1().Endpoints(ns).Get(context.TODO(), tc.service.Name, metav1.GetOptions{}) + if err != nil { + t.Errorf("Unexpected error %v", err) + } + + readyEndpoints := 0 + unreadyEndpoints := 0 + for _, subset := range endpoints.Subsets { + readyEndpoints += len(subset.Addresses) + unreadyEndpoints += len(subset.NotReadyAddresses) + } + + if tc.expectedReady != readyEndpoints { + t.Errorf("Expected %d ready endpoints, got %d", tc.expectedReady, readyEndpoints) + } + + if tc.expectedUnready != unreadyEndpoints { + t.Errorf("Expected %d ready endpoints, got %d", tc.expectedUnready, unreadyEndpoints) + } + }) + } +} + func TestEndpointsDeletionEvents(t *testing.T) { ns := metav1.NamespaceDefault testServer, _ := makeTestServer(t, ns) diff --git a/pkg/controller/util/endpoint/controller_utils.go b/pkg/controller/util/endpoint/controller_utils.go index 411218c5f3b..391c6fb59c9 100644 --- a/pkg/controller/util/endpoint/controller_utils.go +++ b/pkg/controller/util/endpoint/controller_utils.go @@ -135,7 +135,7 @@ func DeepHashObjectToString(objectToWrite interface{}) string { return hex.EncodeToString(hasher.Sum(nil)[0:]) } -// ShouldPodBeInEndpointSlice returns true if a specified pod should be in an EndpointSlice object. +// ShouldPodBeInEndpointSlice returns true if a specified pod should be in an Endpoint or EndpointSlice object. // Terminating pods are only included if includeTerminating is true func ShouldPodBeInEndpointSlice(pod *v1.Pod, includeTerminating bool) bool { // "Terminal" describes when a Pod is complete (in a succeeded or failed phase).