diff --git a/pkg/proxy/endpoints.go b/pkg/proxy/endpoints.go index 6cd334e72e2..ebc4c09ffbf 100644 --- a/pkg/proxy/endpoints.go +++ b/pkg/proxy/endpoints.go @@ -202,14 +202,15 @@ type UpdateEndpointMapResult struct { StaleServiceNames []ServicePortName // List of the trigger times for all endpoints objects that changed. It's used to export the // network programming latency. - LastChangeTriggerTimes []time.Time + // NOTE(oxddr): this can be simplified to []time.Time if memory consumption becomes an issue. + LastChangeTriggerTimes map[types.NamespacedName][]time.Time } // Update updates endpointsMap base on the given changes. func (em EndpointsMap) Update(changes *EndpointChangeTracker) (result UpdateEndpointMapResult) { result.StaleEndpoints = make([]ServiceEndpoint, 0) result.StaleServiceNames = make([]ServicePortName, 0) - result.LastChangeTriggerTimes = make([]time.Time, 0) + result.LastChangeTriggerTimes = make(map[types.NamespacedName][]time.Time) em.apply( changes, &result.StaleEndpoints, &result.StaleServiceNames, &result.LastChangeTriggerTimes) @@ -292,7 +293,7 @@ func (ect *EndpointChangeTracker) endpointsToEndpointsMap(endpoints *v1.Endpoint // In addition it returns (via argument) and resets the lastChangeTriggerTimes for all endpoints // that were changed and will result in syncing the proxy rules. func (em EndpointsMap) apply(changes *EndpointChangeTracker, staleEndpoints *[]ServiceEndpoint, - staleServiceNames *[]ServicePortName, lastChangeTriggerTimes *[]time.Time) { + staleServiceNames *[]ServicePortName, lastChangeTriggerTimes *map[types.NamespacedName][]time.Time) { if changes == nil { return } @@ -305,8 +306,13 @@ func (em EndpointsMap) apply(changes *EndpointChangeTracker, staleEndpoints *[]S } changes.items = make(map[types.NamespacedName]*endpointsChange) metrics.EndpointChangesPending.Set(0) - for _, lastChangeTriggerTime := range changes.lastChangeTriggerTimes { - *lastChangeTriggerTimes = append(*lastChangeTriggerTimes, lastChangeTriggerTime...) + for k, v := range changes.lastChangeTriggerTimes { + prev, ok := (*lastChangeTriggerTimes)[k] + if !ok { + (*lastChangeTriggerTimes)[k] = v + } else { + (*lastChangeTriggerTimes)[k] = append(prev, v...) + } } changes.lastChangeTriggerTimes = make(map[types.NamespacedName][]time.Time) } diff --git a/pkg/proxy/endpoints_test.go b/pkg/proxy/endpoints_test.go index 68227425fef..3d10e8000a7 100644 --- a/pkg/proxy/endpoints_test.go +++ b/pkg/proxy/endpoints_test.go @@ -18,13 +18,12 @@ package proxy import ( "reflect" - "sort" "testing" "time" "github.com/davecgh/go-spew/spew" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" @@ -1288,6 +1287,10 @@ func TestLastChangeTriggerTime(t *testing.T) { return e } + createName := func(namespace, name string) types.NamespacedName { + return types.NamespacedName{Namespace: namespace, Name: name} + } + modifyEndpoints := func(endpoints *v1.Endpoints, triggerTime time.Time) *v1.Endpoints { e := endpoints.DeepCopy() e.Subsets[0].Ports[0].Port++ @@ -1295,14 +1298,10 @@ func TestLastChangeTriggerTime(t *testing.T) { return e } - sortTimeSlice := func(data []time.Time) { - sort.Slice(data, func(i, j int) bool { return data[i].Before(data[j]) }) - } - testCases := []struct { name string scenario func(fp *FakeProxier) - expected []time.Time + expected map[types.NamespacedName][]time.Time }{ { name: "Single addEndpoints", @@ -1310,7 +1309,7 @@ func TestLastChangeTriggerTime(t *testing.T) { e := createEndpoints("ns", "ep1", t0) fp.addEndpoints(e) }, - expected: []time.Time{t0}, + expected: map[types.NamespacedName][]time.Time{createName("ns", "ep1"): {t0}}, }, { name: "addEndpoints then updatedEndpoints", @@ -1321,7 +1320,7 @@ func TestLastChangeTriggerTime(t *testing.T) { e1 := modifyEndpoints(e, t1) fp.updateEndpoints(e, e1) }, - expected: []time.Time{t0, t1}, + expected: map[types.NamespacedName][]time.Time{createName("ns", "ep1"): {t0, t1}}, }, { name: "Add two endpoints then modify one", @@ -1335,7 +1334,7 @@ func TestLastChangeTriggerTime(t *testing.T) { e11 := modifyEndpoints(e1, t3) fp.updateEndpoints(e1, e11) }, - expected: []time.Time{t1, t2, t3}, + expected: map[types.NamespacedName][]time.Time{createName("ns", "ep1"): {t1, t3}, createName("ns", "ep2"): {t2}}, }, { name: "Endpoints without annotation set", @@ -1344,7 +1343,7 @@ func TestLastChangeTriggerTime(t *testing.T) { delete(e.Annotations, v1.EndpointsLastChangeTriggerTime) fp.addEndpoints(e) }, - expected: []time.Time{}, + expected: map[types.NamespacedName][]time.Time{}, }, { name: "addEndpoints then deleteEndpoints", @@ -1353,7 +1352,7 @@ func TestLastChangeTriggerTime(t *testing.T) { fp.addEndpoints(e) fp.deleteEndpoints(e) }, - expected: []time.Time{}, + expected: map[types.NamespacedName][]time.Time{}, }, { name: "add then delete then add again", @@ -1364,7 +1363,7 @@ func TestLastChangeTriggerTime(t *testing.T) { e = modifyEndpoints(e, t2) fp.addEndpoints(e) }, - expected: []time.Time{t2}, + expected: map[types.NamespacedName][]time.Time{createName("ns", "ep1"): {t2}}, }, } @@ -1375,8 +1374,6 @@ func TestLastChangeTriggerTime(t *testing.T) { result := fp.endpointsMap.Update(fp.endpointsChanges) got := result.LastChangeTriggerTimes - sortTimeSlice(got) - sortTimeSlice(tc.expected) if !reflect.DeepEqual(got, tc.expected) { t.Errorf("%s: Invalid LastChangeTriggerTimes, expected: %v, got: %v", diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index b6eddfeb5d3..d4fe596e945 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -1374,10 +1374,12 @@ func (proxier *Proxier) syncProxyRules() { utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap) return } - for _, lastChangeTriggerTime := range endpointUpdateResult.LastChangeTriggerTimes { - latency := metrics.SinceInSeconds(lastChangeTriggerTime) - metrics.NetworkProgrammingLatency.Observe(latency) - klog.V(4).Infof("Network programming took %f seconds", latency) + for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes { + for _, lastChangeTriggerTime := range lastChangeTriggerTimes { + latency := metrics.SinceInSeconds(lastChangeTriggerTime) + metrics.NetworkProgrammingLatency.Observe(latency) + klog.V(4).Infof("Network programming of %s took %f seconds", name, latency) + } } // Close old local ports and save new ones. diff --git a/pkg/proxy/ipvs/proxier.go b/pkg/proxy/ipvs/proxier.go index f3eae341512..e2129d944f9 100644 --- a/pkg/proxy/ipvs/proxier.go +++ b/pkg/proxy/ipvs/proxier.go @@ -1317,10 +1317,12 @@ func (proxier *Proxier) syncProxyRules() { utilproxy.RevertPorts(replacementPortsMap, proxier.portsMap) return } - for _, lastChangeTriggerTime := range endpointUpdateResult.LastChangeTriggerTimes { - latency := metrics.SinceInSeconds(lastChangeTriggerTime) - metrics.NetworkProgrammingLatency.Observe(latency) - klog.V(4).Infof("Network programming took %f seconds", latency) + for name, lastChangeTriggerTimes := range endpointUpdateResult.LastChangeTriggerTimes { + for _, lastChangeTriggerTime := range lastChangeTriggerTimes { + latency := metrics.SinceInSeconds(lastChangeTriggerTime) + metrics.NetworkProgrammingLatency.Observe(latency) + klog.V(4).Infof("Network programming of %s took %f seconds", name, latency) + } } // Close old local ports and save new ones.