From cc1847e6ee182e191a213e86020fa44a88526baa Mon Sep 17 00:00:00 2001 From: Dan Winship Date: Fri, 30 Dec 2022 20:32:53 -0500 Subject: [PATCH] Port TestLastChangeTriggerTime from Endpoints tracker to EndpointSlice This exposed a bug in the EndpointSlice tracking code, which is that we didn't properly reset the "last change time" when a slice was deleted. (This means kube-proxy would report an erroneous value in the "endpoint programming time" metric if a service was added/updated, then deleted before kube-proxy processed the add/update, then later added again.) --- pkg/proxy/endpoints.go | 4 +- pkg/proxy/endpoints_test.go | 79 +++++++++++++++++++++++++------------ 2 files changed, 56 insertions(+), 27 deletions(-) diff --git a/pkg/proxy/endpoints.go b/pkg/proxy/endpoints.go index ca7071a9eb6..d893e08414a 100644 --- a/pkg/proxy/endpoints.go +++ b/pkg/proxy/endpoints.go @@ -293,7 +293,9 @@ func (ect *EndpointChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.E // we want to measure. So we simply ignore it in this cases. // TODO(wojtek-t, robscott): Address the problem for EndpointSlice deletion // when other EndpointSlice for that service still exist. - if t := getLastChangeTriggerTime(endpointSlice.Annotations); !t.IsZero() && !removeSlice && t.After(ect.trackerStartTime) { + if removeSlice { + delete(ect.lastChangeTriggerTimes, namespacedName) + } else if t := getLastChangeTriggerTime(endpointSlice.Annotations); !t.IsZero() && t.After(ect.trackerStartTime) { ect.lastChangeTriggerTimes[namespacedName] = append(ect.lastChangeTriggerTimes[namespacedName], t) } diff --git a/pkg/proxy/endpoints_test.go b/pkg/proxy/endpoints_test.go index 1a6db465dc6..65f5e819089 100644 --- a/pkg/proxy/endpoints_test.go +++ b/pkg/proxy/endpoints_test.go @@ -41,6 +41,18 @@ func (proxier *FakeProxier) deleteEndpoints(endpoints *v1.Endpoints) { proxier.endpointsChanges.Update(endpoints, nil) } +func (proxier *FakeProxier) addEndpointSlice(slice *discovery.EndpointSlice) { + proxier.endpointsChanges.EndpointSliceUpdate(slice, false) +} + +func (proxier *FakeProxier) updateEndpointSlice(oldSlice, slice *discovery.EndpointSlice) { + proxier.endpointsChanges.EndpointSliceUpdate(slice, false) +} + +func (proxier *FakeProxier) deleteEndpointSlice(slice *discovery.EndpointSlice) { + proxier.endpointsChanges.EndpointSliceUpdate(slice, true) +} + func TestGetLocalEndpointIPs(t *testing.T) { testCases := []struct { endpointsMap EndpointsMap @@ -1118,24 +1130,38 @@ func TestLastChangeTriggerTime(t *testing.T) { t2 := t1.Add(time.Second) t3 := t2.Add(time.Second) - createEndpoints := func(namespace, name string, triggerTime time.Time) *v1.Endpoints { - e := makeTestEndpoints(namespace, name, func(ept *v1.Endpoints) { - ept.Subsets = []v1.EndpointSubset{{ - Addresses: []v1.EndpointAddress{{IP: "1.1.1.1"}}, - Ports: []v1.EndpointPort{{Port: 11}}, - }} - }) - e.Annotations[v1.EndpointsLastChangeTriggerTime] = triggerTime.Format(time.RFC3339Nano) - return e + createEndpoints := func(namespace, name string, triggerTime time.Time) *discovery.EndpointSlice { + tcp := v1.ProtocolTCP + return &discovery.EndpointSlice{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + Annotations: map[string]string{ + v1.EndpointsLastChangeTriggerTime: triggerTime.Format(time.RFC3339Nano), + }, + Labels: map[string]string{ + discovery.LabelServiceName: name, + }, + }, + AddressType: discovery.AddressTypeIPv4, + Endpoints: []discovery.Endpoint{{ + Addresses: []string{"1.1.1.1"}, + }}, + Ports: []discovery.EndpointPort{{ + Name: pointer.String("p11"), + Port: pointer.Int32(11), + Protocol: &tcp, + }}, + } } createName := func(namespace, name string) types.NamespacedName { return types.NamespacedName{Namespace: namespace, Name: name} } - modifyEndpoints := func(endpoints *v1.Endpoints, triggerTime time.Time) *v1.Endpoints { - e := endpoints.DeepCopy() - e.Subsets[0].Ports[0].Port++ + modifyEndpoints := func(slice *discovery.EndpointSlice, triggerTime time.Time) *discovery.EndpointSlice { + e := slice.DeepCopy() + (*e.Ports[0].Port)++ e.Annotations[v1.EndpointsLastChangeTriggerTime] = triggerTime.Format(time.RFC3339Nano) return e } @@ -1149,7 +1175,7 @@ func TestLastChangeTriggerTime(t *testing.T) { name: "Single addEndpoints", scenario: func(fp *FakeProxier) { e := createEndpoints("ns", "ep1", t0) - fp.addEndpoints(e) + fp.addEndpointSlice(e) }, expected: map[types.NamespacedName][]time.Time{createName("ns", "ep1"): {t0}}, }, @@ -1157,10 +1183,10 @@ func TestLastChangeTriggerTime(t *testing.T) { name: "addEndpoints then updatedEndpoints", scenario: func(fp *FakeProxier) { e := createEndpoints("ns", "ep1", t0) - fp.addEndpoints(e) + fp.addEndpointSlice(e) e1 := modifyEndpoints(e, t1) - fp.updateEndpoints(e, e1) + fp.updateEndpointSlice(e, e1) }, expected: map[types.NamespacedName][]time.Time{createName("ns", "ep1"): {t0, t1}}, }, @@ -1168,13 +1194,13 @@ func TestLastChangeTriggerTime(t *testing.T) { name: "Add two endpoints then modify one", scenario: func(fp *FakeProxier) { e1 := createEndpoints("ns", "ep1", t1) - fp.addEndpoints(e1) + fp.addEndpointSlice(e1) e2 := createEndpoints("ns", "ep2", t2) - fp.addEndpoints(e2) + fp.addEndpointSlice(e2) e11 := modifyEndpoints(e1, t3) - fp.updateEndpoints(e1, e11) + fp.updateEndpointSlice(e1, e11) }, expected: map[types.NamespacedName][]time.Time{createName("ns", "ep1"): {t1, t3}, createName("ns", "ep2"): {t2}}, }, @@ -1183,7 +1209,7 @@ func TestLastChangeTriggerTime(t *testing.T) { scenario: func(fp *FakeProxier) { e := createEndpoints("ns", "ep1", t1) delete(e.Annotations, v1.EndpointsLastChangeTriggerTime) - fp.addEndpoints(e) + fp.addEndpointSlice(e) }, expected: map[types.NamespacedName][]time.Time{}, }, @@ -1191,7 +1217,7 @@ func TestLastChangeTriggerTime(t *testing.T) { name: "Endpoints create before tracker started", scenario: func(fp *FakeProxier) { e := createEndpoints("ns", "ep1", t_1) - fp.addEndpoints(e) + fp.addEndpointSlice(e) }, expected: map[types.NamespacedName][]time.Time{}, }, @@ -1199,8 +1225,8 @@ func TestLastChangeTriggerTime(t *testing.T) { name: "addEndpoints then deleteEndpoints", scenario: func(fp *FakeProxier) { e := createEndpoints("ns", "ep1", t1) - fp.addEndpoints(e) - fp.deleteEndpoints(e) + fp.addEndpointSlice(e) + fp.deleteEndpointSlice(e) }, expected: map[types.NamespacedName][]time.Time{}, }, @@ -1208,10 +1234,10 @@ func TestLastChangeTriggerTime(t *testing.T) { name: "add then delete then add again", scenario: func(fp *FakeProxier) { e := createEndpoints("ns", "ep1", t1) - fp.addEndpoints(e) - fp.deleteEndpoints(e) + fp.addEndpointSlice(e) + fp.deleteEndpointSlice(e) e = modifyEndpoints(e, t2) - fp.addEndpoints(e) + fp.addEndpointSlice(e) }, expected: map[types.NamespacedName][]time.Time{createName("ns", "ep1"): {t2}}, }, @@ -1219,7 +1245,7 @@ func TestLastChangeTriggerTime(t *testing.T) { name: "delete", scenario: func(fp *FakeProxier) { e := createEndpoints("ns", "ep1", t1) - fp.deleteEndpoints(e) + fp.deleteEndpointSlice(e) }, expected: map[types.NamespacedName][]time.Time{}, }, @@ -1227,6 +1253,7 @@ func TestLastChangeTriggerTime(t *testing.T) { for _, tc := range testCases { fp := newFakeProxier(v1.IPv4Protocol, startTime) + fp.endpointsChanges.endpointSliceCache = NewEndpointSliceCache(testHostname, v1.IPv4Protocol, nil, nil) tc.scenario(fp)