mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
endpoints controller: don't consider terminal endpoints
Terminal pods, whose phase its Failed or Succeeded, are guaranteed to never regress and to be stopped, so their IPs never should be published on the Endpoints.
This commit is contained in:
parent
b905c2870b
commit
aa35f6f160
@ -402,9 +402,6 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// If the user specified the older (deprecated) annotation, we have to respect it.
|
||||
tolerateUnreadyEndpoints := service.Spec.PublishNotReadyAddresses
|
||||
|
||||
// We call ComputeEndpointLastChangeTriggerTime here to make sure that the
|
||||
// state of the trigger time tracker gets updated even if the sync turns out
|
||||
// to be no-op and we don't update the endpoints object.
|
||||
@ -416,12 +413,8 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
|
||||
var totalNotReadyEps int
|
||||
|
||||
for _, pod := range pods {
|
||||
if len(pod.Status.PodIP) == 0 {
|
||||
klog.V(5).Infof("Failed to find an IP for pod %s/%s", pod.Namespace, pod.Name)
|
||||
continue
|
||||
}
|
||||
if !tolerateUnreadyEndpoints && pod.DeletionTimestamp != nil {
|
||||
klog.V(5).Infof("Pod is being deleted %s/%s", pod.Namespace, pod.Name)
|
||||
if !endpointutil.ShouldPodBeInEndpointSlice(pod, service.Spec.PublishNotReadyAddresses) {
|
||||
klog.V(5).Infof("Pod %s/%s is not included on endpoints for Service %s/%s", pod.Namespace, pod.Name, service.Namespace, service.Name)
|
||||
continue
|
||||
}
|
||||
|
||||
@ -441,7 +434,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
|
||||
// Allow headless service not to have ports.
|
||||
if len(service.Spec.Ports) == 0 {
|
||||
if service.Spec.ClusterIP == api.ClusterIPNone {
|
||||
subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(subsets, pod, epa, nil, tolerateUnreadyEndpoints)
|
||||
subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(subsets, pod, epa, nil, service.Spec.PublishNotReadyAddresses)
|
||||
// No need to repack subsets for headless service without ports.
|
||||
}
|
||||
} else {
|
||||
@ -455,7 +448,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
|
||||
epp := endpointPortFromServicePort(servicePort, portNum)
|
||||
|
||||
var readyEps, notReadyEps int
|
||||
subsets, readyEps, notReadyEps = addEndpointSubset(subsets, pod, epa, epp, tolerateUnreadyEndpoints)
|
||||
subsets, readyEps, notReadyEps = addEndpointSubset(subsets, pod, epa, epp, service.Spec.PublishNotReadyAddresses)
|
||||
totalReadyEps = totalReadyEps + readyEps
|
||||
totalNotReadyEps = totalNotReadyEps + notReadyEps
|
||||
}
|
||||
@ -591,6 +584,10 @@ func (e *Controller) checkLeftoverEndpoints() {
|
||||
}
|
||||
}
|
||||
|
||||
// addEndpointSubset add the endpoints addresses and ports to the EndpointSubset.
|
||||
// The addresses are added to the corresponding field, ready or not ready, depending
|
||||
// on the pod status and the Service PublishNotReadyAddresses field value.
|
||||
// The pod passed to this function must have already been filtered through ShouldPodBeInEndpointSlice.
|
||||
func addEndpointSubset(subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.EndpointAddress,
|
||||
epp *v1.EndpointPort, tolerateUnreadyEndpoints bool) ([]v1.EndpointSubset, int, int) {
|
||||
var readyEps int
|
||||
@ -605,7 +602,7 @@ func addEndpointSubset(subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.Endpoint
|
||||
Ports: ports,
|
||||
})
|
||||
readyEps++
|
||||
} else if shouldPodBeInEndpoints(pod) {
|
||||
} else { // if it is not a ready address it has to be not ready
|
||||
klog.V(5).Infof("Pod is out of service: %s/%s", pod.Namespace, pod.Name)
|
||||
subsets = append(subsets, v1.EndpointSubset{
|
||||
NotReadyAddresses: []v1.EndpointAddress{epa},
|
||||
@ -616,17 +613,6 @@ func addEndpointSubset(subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.Endpoint
|
||||
return subsets, readyEps, notReadyEps
|
||||
}
|
||||
|
||||
func shouldPodBeInEndpoints(pod *v1.Pod) bool {
|
||||
switch pod.Spec.RestartPolicy {
|
||||
case v1.RestartPolicyNever:
|
||||
return pod.Status.Phase != v1.PodFailed && pod.Status.Phase != v1.PodSucceeded
|
||||
case v1.RestartPolicyOnFailure:
|
||||
return pod.Status.Phase != v1.PodSucceeded
|
||||
default:
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
func endpointPortFromServicePort(servicePort *v1.ServicePort, portNum int) *v1.EndpointPort {
|
||||
epp := &v1.EndpointPort{
|
||||
Name: servicePort.Name,
|
||||
|
@ -1155,97 +1155,6 @@ func TestSyncEndpointsHeadlessWithoutPort(t *testing.T) {
|
||||
endpointsHandler.ValidateRequest(t, "/api/v1/namespaces/"+ns+"/endpoints", "POST", &data)
|
||||
}
|
||||
|
||||
// There are 3*5 possibilities(3 types of RestartPolicy by 5 types of PodPhase). Not list them all here.
|
||||
// Just list all of the 3 false cases and 3 of the 12 true cases.
|
||||
func TestShouldPodBeInEndpoints(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
pod *v1.Pod
|
||||
expected bool
|
||||
}{
|
||||
// Pod should not be in endpoints cases:
|
||||
{
|
||||
name: "Failed pod with Never RestartPolicy",
|
||||
pod: &v1.Pod{
|
||||
Spec: v1.PodSpec{
|
||||
RestartPolicy: v1.RestartPolicyNever,
|
||||
},
|
||||
Status: v1.PodStatus{
|
||||
Phase: v1.PodFailed,
|
||||
},
|
||||
},
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "Succeeded pod with Never RestartPolicy",
|
||||
pod: &v1.Pod{
|
||||
Spec: v1.PodSpec{
|
||||
RestartPolicy: v1.RestartPolicyNever,
|
||||
},
|
||||
Status: v1.PodStatus{
|
||||
Phase: v1.PodSucceeded,
|
||||
},
|
||||
},
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
name: "Succeeded pod with OnFailure RestartPolicy",
|
||||
pod: &v1.Pod{
|
||||
Spec: v1.PodSpec{
|
||||
RestartPolicy: v1.RestartPolicyOnFailure,
|
||||
},
|
||||
Status: v1.PodStatus{
|
||||
Phase: v1.PodSucceeded,
|
||||
},
|
||||
},
|
||||
expected: false,
|
||||
},
|
||||
// Pod should be in endpoints cases:
|
||||
{
|
||||
name: "Failed pod with Always RestartPolicy",
|
||||
pod: &v1.Pod{
|
||||
Spec: v1.PodSpec{
|
||||
RestartPolicy: v1.RestartPolicyAlways,
|
||||
},
|
||||
Status: v1.PodStatus{
|
||||
Phase: v1.PodFailed,
|
||||
},
|
||||
},
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "Pending pod with Never RestartPolicy",
|
||||
pod: &v1.Pod{
|
||||
Spec: v1.PodSpec{
|
||||
RestartPolicy: v1.RestartPolicyNever,
|
||||
},
|
||||
Status: v1.PodStatus{
|
||||
Phase: v1.PodPending,
|
||||
},
|
||||
},
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "Unknown pod with OnFailure RestartPolicy",
|
||||
pod: &v1.Pod{
|
||||
Spec: v1.PodSpec{
|
||||
RestartPolicy: v1.RestartPolicyOnFailure,
|
||||
},
|
||||
Status: v1.PodStatus{
|
||||
Phase: v1.PodUnknown,
|
||||
},
|
||||
},
|
||||
expected: true,
|
||||
},
|
||||
}
|
||||
for _, test := range testCases {
|
||||
result := shouldPodBeInEndpoints(test.pod)
|
||||
if result != test.expected {
|
||||
t.Errorf("%s: expected : %t, got: %t", test.name, test.expected, result)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPodToEndpointAddressForService(t *testing.T) {
|
||||
ipv4 := v1.IPv4Protocol
|
||||
ipv6 := v1.IPv6Protocol
|
||||
@ -2313,6 +2222,235 @@ func TestMultipleServiceChanges(t *testing.T) {
|
||||
close(stopChan)
|
||||
}
|
||||
|
||||
func TestSyncServiceAddresses(t *testing.T) {
|
||||
makeService := func(tolerateUnready bool) *v1.Service {
|
||||
return &v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: "ns"},
|
||||
Spec: v1.ServiceSpec{
|
||||
Selector: map[string]string{"foo": "bar"},
|
||||
PublishNotReadyAddresses: tolerateUnready,
|
||||
Type: v1.ServiceTypeClusterIP,
|
||||
ClusterIP: "1.1.1.1",
|
||||
Ports: []v1.ServicePort{{Port: 80}},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
makePod := func(phase v1.PodPhase, isReady bool, terminating bool) *v1.Pod {
|
||||
statusCondition := v1.ConditionFalse
|
||||
if isReady {
|
||||
statusCondition = v1.ConditionTrue
|
||||
}
|
||||
|
||||
now := metav1.Now()
|
||||
deletionTimestamp := &now
|
||||
if !terminating {
|
||||
deletionTimestamp = nil
|
||||
}
|
||||
return &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: "ns",
|
||||
Name: "fakepod",
|
||||
DeletionTimestamp: deletionTimestamp,
|
||||
Labels: map[string]string{"foo": "bar"},
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
Containers: []v1.Container{{Ports: []v1.ContainerPort{
|
||||
{Name: "port1", ContainerPort: int32(8080)},
|
||||
}}},
|
||||
},
|
||||
Status: v1.PodStatus{
|
||||
Phase: phase,
|
||||
Conditions: []v1.PodCondition{
|
||||
{
|
||||
Type: v1.PodReady,
|
||||
Status: statusCondition,
|
||||
},
|
||||
},
|
||||
PodIP: "10.1.1.1",
|
||||
PodIPs: []v1.PodIP{
|
||||
{IP: "10.1.1.1"},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
pod *v1.Pod
|
||||
service *v1.Service
|
||||
expectedReady int
|
||||
expectedUnready int
|
||||
}{
|
||||
{
|
||||
name: "pod running phase",
|
||||
pod: makePod(v1.PodRunning, true, false),
|
||||
service: makeService(false),
|
||||
expectedReady: 1,
|
||||
expectedUnready: 0,
|
||||
},
|
||||
{
|
||||
name: "pod running phase being deleted",
|
||||
pod: makePod(v1.PodRunning, true, true),
|
||||
service: makeService(false),
|
||||
expectedReady: 0,
|
||||
expectedUnready: 0,
|
||||
},
|
||||
{
|
||||
name: "pod unknown phase container ready",
|
||||
pod: makePod(v1.PodUnknown, true, false),
|
||||
service: makeService(false),
|
||||
expectedReady: 1,
|
||||
expectedUnready: 0,
|
||||
},
|
||||
{
|
||||
name: "pod unknown phase container ready being deleted",
|
||||
pod: makePod(v1.PodUnknown, true, true),
|
||||
service: makeService(false),
|
||||
expectedReady: 0,
|
||||
expectedUnready: 0,
|
||||
},
|
||||
{
|
||||
name: "pod pending phase container ready",
|
||||
pod: makePod(v1.PodPending, true, false),
|
||||
service: makeService(false),
|
||||
expectedReady: 1,
|
||||
expectedUnready: 0,
|
||||
},
|
||||
{
|
||||
name: "pod pending phase container ready being deleted",
|
||||
pod: makePod(v1.PodPending, true, true),
|
||||
service: makeService(false),
|
||||
expectedReady: 0,
|
||||
expectedUnready: 0,
|
||||
},
|
||||
{
|
||||
name: "pod unknown phase container not ready",
|
||||
pod: makePod(v1.PodUnknown, false, false),
|
||||
service: makeService(false),
|
||||
expectedReady: 0,
|
||||
expectedUnready: 1,
|
||||
},
|
||||
{
|
||||
name: "pod pending phase container not ready",
|
||||
pod: makePod(v1.PodPending, false, false),
|
||||
service: makeService(false),
|
||||
expectedReady: 0,
|
||||
expectedUnready: 1,
|
||||
},
|
||||
{
|
||||
name: "pod failed phase",
|
||||
pod: makePod(v1.PodFailed, false, false),
|
||||
service: makeService(false),
|
||||
expectedReady: 0,
|
||||
expectedUnready: 0,
|
||||
},
|
||||
{
|
||||
name: "pod succeeded phase",
|
||||
pod: makePod(v1.PodSucceeded, false, false),
|
||||
service: makeService(false),
|
||||
expectedReady: 0,
|
||||
expectedUnready: 0,
|
||||
},
|
||||
{
|
||||
name: "pod running phase and tolerate unready",
|
||||
pod: makePod(v1.PodRunning, false, false),
|
||||
service: makeService(true),
|
||||
expectedReady: 1,
|
||||
expectedUnready: 0,
|
||||
},
|
||||
{
|
||||
name: "pod running phase and tolerate unready being deleted",
|
||||
pod: makePod(v1.PodRunning, false, true),
|
||||
service: makeService(true),
|
||||
expectedReady: 1,
|
||||
expectedUnready: 0,
|
||||
},
|
||||
{
|
||||
name: "pod unknown phase and tolerate unready",
|
||||
pod: makePod(v1.PodUnknown, false, false),
|
||||
service: makeService(true),
|
||||
expectedReady: 1,
|
||||
expectedUnready: 0,
|
||||
},
|
||||
{
|
||||
name: "pod unknown phase and tolerate unready being deleted",
|
||||
pod: makePod(v1.PodUnknown, false, true),
|
||||
service: makeService(true),
|
||||
expectedReady: 1,
|
||||
expectedUnready: 0,
|
||||
},
|
||||
{
|
||||
name: "pod pending phase and tolerate unready",
|
||||
pod: makePod(v1.PodPending, false, false),
|
||||
service: makeService(true),
|
||||
expectedReady: 1,
|
||||
expectedUnready: 0,
|
||||
},
|
||||
{
|
||||
name: "pod pending phase and tolerate unready being deleted",
|
||||
pod: makePod(v1.PodPending, false, true),
|
||||
service: makeService(true),
|
||||
expectedReady: 1,
|
||||
expectedUnready: 0,
|
||||
},
|
||||
{
|
||||
name: "pod failed phase and tolerate unready",
|
||||
pod: makePod(v1.PodFailed, false, false),
|
||||
service: makeService(true),
|
||||
expectedReady: 0,
|
||||
expectedUnready: 0,
|
||||
},
|
||||
{
|
||||
name: "pod succeeded phase and tolerate unready endpoints",
|
||||
pod: makePod(v1.PodSucceeded, false, false),
|
||||
service: makeService(true),
|
||||
expectedReady: 0,
|
||||
expectedUnready: 0,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
ns := tc.service.Namespace
|
||||
client, c := newFakeController(0 * time.Second)
|
||||
|
||||
err := c.podStore.Add(tc.pod)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error adding pod %v", err)
|
||||
}
|
||||
err = c.serviceStore.Add(tc.service)
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error adding service %v", err)
|
||||
}
|
||||
err = c.syncService(context.TODO(), fmt.Sprintf("%s/%s", ns, tc.service.Name))
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error syncing service %v", err)
|
||||
}
|
||||
|
||||
endpoints, err := client.CoreV1().Endpoints(ns).Get(context.TODO(), tc.service.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
|
||||
readyEndpoints := 0
|
||||
unreadyEndpoints := 0
|
||||
for _, subset := range endpoints.Subsets {
|
||||
readyEndpoints += len(subset.Addresses)
|
||||
unreadyEndpoints += len(subset.NotReadyAddresses)
|
||||
}
|
||||
|
||||
if tc.expectedReady != readyEndpoints {
|
||||
t.Errorf("Expected %d ready endpoints, got %d", tc.expectedReady, readyEndpoints)
|
||||
}
|
||||
|
||||
if tc.expectedUnready != unreadyEndpoints {
|
||||
t.Errorf("Expected %d ready endpoints, got %d", tc.expectedUnready, unreadyEndpoints)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestEndpointsDeletionEvents(t *testing.T) {
|
||||
ns := metav1.NamespaceDefault
|
||||
testServer, _ := makeTestServer(t, ns)
|
||||
|
@ -135,7 +135,7 @@ func DeepHashObjectToString(objectToWrite interface{}) string {
|
||||
return hex.EncodeToString(hasher.Sum(nil)[0:])
|
||||
}
|
||||
|
||||
// ShouldPodBeInEndpointSlice returns true if a specified pod should be in an EndpointSlice object.
|
||||
// ShouldPodBeInEndpointSlice returns true if a specified pod should be in an Endpoint or EndpointSlice object.
|
||||
// Terminating pods are only included if includeTerminating is true
|
||||
func ShouldPodBeInEndpointSlice(pod *v1.Pod, includeTerminating bool) bool {
|
||||
// "Terminal" describes when a Pod is complete (in a succeeded or failed phase).
|
||||
|
Loading…
Reference in New Issue
Block a user