Port TestLastChangeTriggerTime from Endpoints tracker to EndpointSlice

This exposed a bug in the EndpointSlice tracking code, which is that
we didn't properly reset the "last change time" when a slice was
deleted. (This means kube-proxy would report an erroneous value in the
"endpoint programming time" metric if a service was added/updated,
then deleted before kube-proxy processed the add/update, then later
added again.)
This commit is contained in:
Dan Winship 2022-12-30 20:32:53 -05:00
parent 886c4b0cf2
commit cc1847e6ee
2 changed files with 56 additions and 27 deletions

View File

@ -293,7 +293,9 @@ func (ect *EndpointChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.E
// we want to measure. So we simply ignore it in this cases. // we want to measure. So we simply ignore it in this cases.
// TODO(wojtek-t, robscott): Address the problem for EndpointSlice deletion // TODO(wojtek-t, robscott): Address the problem for EndpointSlice deletion
// when other EndpointSlice for that service still exist. // when other EndpointSlice for that service still exist.
if t := getLastChangeTriggerTime(endpointSlice.Annotations); !t.IsZero() && !removeSlice && t.After(ect.trackerStartTime) { if removeSlice {
delete(ect.lastChangeTriggerTimes, namespacedName)
} else if t := getLastChangeTriggerTime(endpointSlice.Annotations); !t.IsZero() && t.After(ect.trackerStartTime) {
ect.lastChangeTriggerTimes[namespacedName] = ect.lastChangeTriggerTimes[namespacedName] =
append(ect.lastChangeTriggerTimes[namespacedName], t) append(ect.lastChangeTriggerTimes[namespacedName], t)
} }

View File

@ -41,6 +41,18 @@ func (proxier *FakeProxier) deleteEndpoints(endpoints *v1.Endpoints) {
proxier.endpointsChanges.Update(endpoints, nil) proxier.endpointsChanges.Update(endpoints, nil)
} }
func (proxier *FakeProxier) addEndpointSlice(slice *discovery.EndpointSlice) {
proxier.endpointsChanges.EndpointSliceUpdate(slice, false)
}
func (proxier *FakeProxier) updateEndpointSlice(oldSlice, slice *discovery.EndpointSlice) {
proxier.endpointsChanges.EndpointSliceUpdate(slice, false)
}
func (proxier *FakeProxier) deleteEndpointSlice(slice *discovery.EndpointSlice) {
proxier.endpointsChanges.EndpointSliceUpdate(slice, true)
}
func TestGetLocalEndpointIPs(t *testing.T) { func TestGetLocalEndpointIPs(t *testing.T) {
testCases := []struct { testCases := []struct {
endpointsMap EndpointsMap endpointsMap EndpointsMap
@ -1118,24 +1130,38 @@ func TestLastChangeTriggerTime(t *testing.T) {
t2 := t1.Add(time.Second) t2 := t1.Add(time.Second)
t3 := t2.Add(time.Second) t3 := t2.Add(time.Second)
createEndpoints := func(namespace, name string, triggerTime time.Time) *v1.Endpoints { createEndpoints := func(namespace, name string, triggerTime time.Time) *discovery.EndpointSlice {
e := makeTestEndpoints(namespace, name, func(ept *v1.Endpoints) { tcp := v1.ProtocolTCP
ept.Subsets = []v1.EndpointSubset{{ return &discovery.EndpointSlice{
Addresses: []v1.EndpointAddress{{IP: "1.1.1.1"}}, ObjectMeta: metav1.ObjectMeta{
Ports: []v1.EndpointPort{{Port: 11}}, Name: name,
}} Namespace: namespace,
}) Annotations: map[string]string{
e.Annotations[v1.EndpointsLastChangeTriggerTime] = triggerTime.Format(time.RFC3339Nano) v1.EndpointsLastChangeTriggerTime: triggerTime.Format(time.RFC3339Nano),
return e },
Labels: map[string]string{
discovery.LabelServiceName: name,
},
},
AddressType: discovery.AddressTypeIPv4,
Endpoints: []discovery.Endpoint{{
Addresses: []string{"1.1.1.1"},
}},
Ports: []discovery.EndpointPort{{
Name: pointer.String("p11"),
Port: pointer.Int32(11),
Protocol: &tcp,
}},
}
} }
createName := func(namespace, name string) types.NamespacedName { createName := func(namespace, name string) types.NamespacedName {
return types.NamespacedName{Namespace: namespace, Name: name} return types.NamespacedName{Namespace: namespace, Name: name}
} }
modifyEndpoints := func(endpoints *v1.Endpoints, triggerTime time.Time) *v1.Endpoints { modifyEndpoints := func(slice *discovery.EndpointSlice, triggerTime time.Time) *discovery.EndpointSlice {
e := endpoints.DeepCopy() e := slice.DeepCopy()
e.Subsets[0].Ports[0].Port++ (*e.Ports[0].Port)++
e.Annotations[v1.EndpointsLastChangeTriggerTime] = triggerTime.Format(time.RFC3339Nano) e.Annotations[v1.EndpointsLastChangeTriggerTime] = triggerTime.Format(time.RFC3339Nano)
return e return e
} }
@ -1149,7 +1175,7 @@ func TestLastChangeTriggerTime(t *testing.T) {
name: "Single addEndpoints", name: "Single addEndpoints",
scenario: func(fp *FakeProxier) { scenario: func(fp *FakeProxier) {
e := createEndpoints("ns", "ep1", t0) e := createEndpoints("ns", "ep1", t0)
fp.addEndpoints(e) fp.addEndpointSlice(e)
}, },
expected: map[types.NamespacedName][]time.Time{createName("ns", "ep1"): {t0}}, expected: map[types.NamespacedName][]time.Time{createName("ns", "ep1"): {t0}},
}, },
@ -1157,10 +1183,10 @@ func TestLastChangeTriggerTime(t *testing.T) {
name: "addEndpoints then updatedEndpoints", name: "addEndpoints then updatedEndpoints",
scenario: func(fp *FakeProxier) { scenario: func(fp *FakeProxier) {
e := createEndpoints("ns", "ep1", t0) e := createEndpoints("ns", "ep1", t0)
fp.addEndpoints(e) fp.addEndpointSlice(e)
e1 := modifyEndpoints(e, t1) e1 := modifyEndpoints(e, t1)
fp.updateEndpoints(e, e1) fp.updateEndpointSlice(e, e1)
}, },
expected: map[types.NamespacedName][]time.Time{createName("ns", "ep1"): {t0, t1}}, expected: map[types.NamespacedName][]time.Time{createName("ns", "ep1"): {t0, t1}},
}, },
@ -1168,13 +1194,13 @@ func TestLastChangeTriggerTime(t *testing.T) {
name: "Add two endpoints then modify one", name: "Add two endpoints then modify one",
scenario: func(fp *FakeProxier) { scenario: func(fp *FakeProxier) {
e1 := createEndpoints("ns", "ep1", t1) e1 := createEndpoints("ns", "ep1", t1)
fp.addEndpoints(e1) fp.addEndpointSlice(e1)
e2 := createEndpoints("ns", "ep2", t2) e2 := createEndpoints("ns", "ep2", t2)
fp.addEndpoints(e2) fp.addEndpointSlice(e2)
e11 := modifyEndpoints(e1, t3) e11 := modifyEndpoints(e1, t3)
fp.updateEndpoints(e1, e11) fp.updateEndpointSlice(e1, e11)
}, },
expected: map[types.NamespacedName][]time.Time{createName("ns", "ep1"): {t1, t3}, createName("ns", "ep2"): {t2}}, expected: map[types.NamespacedName][]time.Time{createName("ns", "ep1"): {t1, t3}, createName("ns", "ep2"): {t2}},
}, },
@ -1183,7 +1209,7 @@ func TestLastChangeTriggerTime(t *testing.T) {
scenario: func(fp *FakeProxier) { scenario: func(fp *FakeProxier) {
e := createEndpoints("ns", "ep1", t1) e := createEndpoints("ns", "ep1", t1)
delete(e.Annotations, v1.EndpointsLastChangeTriggerTime) delete(e.Annotations, v1.EndpointsLastChangeTriggerTime)
fp.addEndpoints(e) fp.addEndpointSlice(e)
}, },
expected: map[types.NamespacedName][]time.Time{}, expected: map[types.NamespacedName][]time.Time{},
}, },
@ -1191,7 +1217,7 @@ func TestLastChangeTriggerTime(t *testing.T) {
name: "Endpoints create before tracker started", name: "Endpoints create before tracker started",
scenario: func(fp *FakeProxier) { scenario: func(fp *FakeProxier) {
e := createEndpoints("ns", "ep1", t_1) e := createEndpoints("ns", "ep1", t_1)
fp.addEndpoints(e) fp.addEndpointSlice(e)
}, },
expected: map[types.NamespacedName][]time.Time{}, expected: map[types.NamespacedName][]time.Time{},
}, },
@ -1199,8 +1225,8 @@ func TestLastChangeTriggerTime(t *testing.T) {
name: "addEndpoints then deleteEndpoints", name: "addEndpoints then deleteEndpoints",
scenario: func(fp *FakeProxier) { scenario: func(fp *FakeProxier) {
e := createEndpoints("ns", "ep1", t1) e := createEndpoints("ns", "ep1", t1)
fp.addEndpoints(e) fp.addEndpointSlice(e)
fp.deleteEndpoints(e) fp.deleteEndpointSlice(e)
}, },
expected: map[types.NamespacedName][]time.Time{}, expected: map[types.NamespacedName][]time.Time{},
}, },
@ -1208,10 +1234,10 @@ func TestLastChangeTriggerTime(t *testing.T) {
name: "add then delete then add again", name: "add then delete then add again",
scenario: func(fp *FakeProxier) { scenario: func(fp *FakeProxier) {
e := createEndpoints("ns", "ep1", t1) e := createEndpoints("ns", "ep1", t1)
fp.addEndpoints(e) fp.addEndpointSlice(e)
fp.deleteEndpoints(e) fp.deleteEndpointSlice(e)
e = modifyEndpoints(e, t2) e = modifyEndpoints(e, t2)
fp.addEndpoints(e) fp.addEndpointSlice(e)
}, },
expected: map[types.NamespacedName][]time.Time{createName("ns", "ep1"): {t2}}, expected: map[types.NamespacedName][]time.Time{createName("ns", "ep1"): {t2}},
}, },
@ -1219,7 +1245,7 @@ func TestLastChangeTriggerTime(t *testing.T) {
name: "delete", name: "delete",
scenario: func(fp *FakeProxier) { scenario: func(fp *FakeProxier) {
e := createEndpoints("ns", "ep1", t1) e := createEndpoints("ns", "ep1", t1)
fp.deleteEndpoints(e) fp.deleteEndpointSlice(e)
}, },
expected: map[types.NamespacedName][]time.Time{}, expected: map[types.NamespacedName][]time.Time{},
}, },
@ -1227,6 +1253,7 @@ func TestLastChangeTriggerTime(t *testing.T) {
for _, tc := range testCases { for _, tc := range testCases {
fp := newFakeProxier(v1.IPv4Protocol, startTime) fp := newFakeProxier(v1.IPv4Protocol, startTime)
fp.endpointsChanges.endpointSliceCache = NewEndpointSliceCache(testHostname, v1.IPv4Protocol, nil, nil)
tc.scenario(fp) tc.scenario(fp)