diff --git a/pkg/proxy/endpoints.go b/pkg/proxy/endpoints.go index 8224f30954d..af1be874932 100644 --- a/pkg/proxy/endpoints.go +++ b/pkg/proxy/endpoints.go @@ -164,6 +164,11 @@ type EndpointChangeTracker struct { // Map from the Endpoints namespaced-name to the times of the triggers that caused the endpoints // object to change. Used to calculate the network-programming-latency. lastChangeTriggerTimes map[types.NamespacedName][]time.Time + // record the time when the endpointChangeTracker was created so we can ignore the endpoints + // that were generated before, because we can't estimate the network-programming-latency on those. + // This is specially problematic on restarts, because we process all the endpoints that may have been + // created hours or days before. + trackerStartTime time.Time } // NewEndpointChangeTracker initializes an EndpointsChangeMap @@ -175,6 +180,7 @@ func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc ipFamily: ipFamily, recorder: recorder, lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time), + trackerStartTime: time.Now(), processEndpointsMapChange: processEndpointsMapChange, } if endpointSlicesEnabled { @@ -216,7 +222,7 @@ func (ect *EndpointChangeTracker) Update(previous, current *v1.Endpoints) bool { // In case of Endpoints deletion, the LastChangeTriggerTime annotation is // by-definition coming from the time of last update, which is not what // we want to measure. So we simply ignore it in this cases. - if t := getLastChangeTriggerTime(endpoints.Annotations); !t.IsZero() && current != nil { + if t := getLastChangeTriggerTime(endpoints.Annotations); !t.IsZero() && current != nil && t.After(ect.trackerStartTime) { ect.lastChangeTriggerTimes[namespacedName] = append(ect.lastChangeTriggerTimes[namespacedName], t) } @@ -276,7 +282,7 @@ func (ect *EndpointChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.E // we want to measure. So we simply ignore it in this cases. // TODO(wojtek-t, robscott): Address the problem for EndpointSlice deletion // when other EndpointSlice for that service still exist. - if t := getLastChangeTriggerTime(endpointSlice.Annotations); !t.IsZero() && !removeSlice { + if t := getLastChangeTriggerTime(endpointSlice.Annotations); !t.IsZero() && !removeSlice && t.After(ect.trackerStartTime) { ect.lastChangeTriggerTimes[namespacedName] = append(ect.lastChangeTriggerTimes[namespacedName], t) } diff --git a/pkg/proxy/endpoints_test.go b/pkg/proxy/endpoints_test.go index 2f75a0d6d87..d736cd15eb8 100644 --- a/pkg/proxy/endpoints_test.go +++ b/pkg/proxy/endpoints_test.go @@ -23,7 +23,7 @@ import ( "github.com/davecgh/go-spew/spew" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -1294,7 +1294,7 @@ func TestUpdateEndpointsMap(t *testing.T) { for tci, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - fp := newFakeProxier(v1.IPv4Protocol) + fp := newFakeProxier(v1.IPv4Protocol, time.Time{}) fp.hostname = nodeName // First check that after adding all previous versions of endpoints, @@ -1364,7 +1364,9 @@ func TestUpdateEndpointsMap(t *testing.T) { } func TestLastChangeTriggerTime(t *testing.T) { - t0 := time.Date(2018, 01, 01, 0, 0, 0, 0, time.UTC) + startTime := time.Date(2018, 01, 01, 0, 0, 0, 0, time.UTC) + t_1 := startTime.Add(-time.Second) + t0 := startTime.Add(time.Second) t1 := t0.Add(time.Second) t2 := t1.Add(time.Second) t3 := t2.Add(time.Second) @@ -1438,6 +1440,14 @@ func TestLastChangeTriggerTime(t *testing.T) { }, expected: map[types.NamespacedName][]time.Time{}, }, + { + name: "Endpoints create before tracker started", + scenario: func(fp *FakeProxier) { + e := createEndpoints("ns", "ep1", t_1) + fp.addEndpoints(e) + }, + expected: map[types.NamespacedName][]time.Time{}, + }, { name: "addEndpoints then deleteEndpoints", scenario: func(fp *FakeProxier) { @@ -1469,7 +1479,7 @@ func TestLastChangeTriggerTime(t *testing.T) { } for _, tc := range testCases { - fp := newFakeProxier(v1.IPv4Protocol) + fp := newFakeProxier(v1.IPv4Protocol, startTime) tc.scenario(fp) diff --git a/pkg/proxy/service_test.go b/pkg/proxy/service_test.go index ebf4002c787..dbb98bf0e01 100644 --- a/pkg/proxy/service_test.go +++ b/pkg/proxy/service_test.go @@ -20,10 +20,11 @@ import ( "net" "reflect" "testing" + "time" "github.com/davecgh/go-spew/spew" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" @@ -511,12 +512,21 @@ type FakeProxier struct { hostname string } -func newFakeProxier(ipFamily v1.IPFamily) *FakeProxier { +func newFakeProxier(ipFamily v1.IPFamily, t time.Time) *FakeProxier { return &FakeProxier{ - serviceMap: make(ServiceMap), - serviceChanges: NewServiceChangeTracker(nil, ipFamily, nil, nil), - endpointsMap: make(EndpointsMap), - endpointsChanges: NewEndpointChangeTracker(testHostname, nil, ipFamily, nil, false, nil), + serviceMap: make(ServiceMap), + serviceChanges: NewServiceChangeTracker(nil, ipFamily, nil, nil), + endpointsMap: make(EndpointsMap), + endpointsChanges: &EndpointChangeTracker{ + hostname: testHostname, + items: make(map[types.NamespacedName]*endpointsChange), + makeEndpointInfo: nil, + ipFamily: ipFamily, + recorder: nil, + lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time), + trackerStartTime: t, + processEndpointsMapChange: nil, + }, } } @@ -539,7 +549,7 @@ func (fake *FakeProxier) deleteService(service *v1.Service) { } func TestServiceMapUpdateHeadless(t *testing.T) { - fp := newFakeProxier(v1.IPv4Protocol) + fp := newFakeProxier(v1.IPv4Protocol, time.Time{}) makeServiceMap(fp, makeTestService("ns2", "headless", func(svc *v1.Service) { @@ -570,7 +580,7 @@ func TestServiceMapUpdateHeadless(t *testing.T) { } func TestUpdateServiceTypeExternalName(t *testing.T) { - fp := newFakeProxier(v1.IPv4Protocol) + fp := newFakeProxier(v1.IPv4Protocol, time.Time{}) makeServiceMap(fp, makeTestService("ns2", "external-name", func(svc *v1.Service) { @@ -595,7 +605,7 @@ func TestUpdateServiceTypeExternalName(t *testing.T) { } func TestBuildServiceMapAddRemove(t *testing.T) { - fp := newFakeProxier(v1.IPv4Protocol) + fp := newFakeProxier(v1.IPv4Protocol, time.Time{}) services := []*v1.Service{ makeTestService("ns2", "cluster-ip", func(svc *v1.Service) { @@ -698,7 +708,7 @@ func TestBuildServiceMapAddRemove(t *testing.T) { } func TestBuildServiceMapServiceUpdate(t *testing.T) { - fp := newFakeProxier(v1.IPv4Protocol) + fp := newFakeProxier(v1.IPv4Protocol, time.Time{}) servicev1 := makeTestService("ns1", "svc1", func(svc *v1.Service) { svc.Spec.Type = v1.ServiceTypeClusterIP