Merge pull request #100861 from aojea/kproxy_latency

fix kube-proxy latency metrics
This commit is contained in:
Kubernetes Prow Robot 2021-04-10 19:03:55 -07:00 committed by GitHub
commit ff2a3e1147
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 42 additions and 16 deletions

View File

@ -164,6 +164,11 @@ type EndpointChangeTracker struct {
// Map from the Endpoints namespaced-name to the times of the triggers that caused the endpoints
// object to change. Used to calculate the network-programming-latency.
lastChangeTriggerTimes map[types.NamespacedName][]time.Time
// record the time when the endpointChangeTracker was created so we can ignore the endpoints
// that were generated before, because we can't estimate the network-programming-latency on those.
// This is specially problematic on restarts, because we process all the endpoints that may have been
// created hours or days before.
trackerStartTime time.Time
}
// NewEndpointChangeTracker initializes an EndpointsChangeMap
@ -175,6 +180,7 @@ func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc
ipFamily: ipFamily,
recorder: recorder,
lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time),
trackerStartTime: time.Now(),
processEndpointsMapChange: processEndpointsMapChange,
}
if endpointSlicesEnabled {
@ -216,7 +222,7 @@ func (ect *EndpointChangeTracker) Update(previous, current *v1.Endpoints) bool {
// In case of Endpoints deletion, the LastChangeTriggerTime annotation is
// by-definition coming from the time of last update, which is not what
// we want to measure. So we simply ignore it in this cases.
if t := getLastChangeTriggerTime(endpoints.Annotations); !t.IsZero() && current != nil {
if t := getLastChangeTriggerTime(endpoints.Annotations); !t.IsZero() && current != nil && t.After(ect.trackerStartTime) {
ect.lastChangeTriggerTimes[namespacedName] = append(ect.lastChangeTriggerTimes[namespacedName], t)
}
@ -276,7 +282,7 @@ func (ect *EndpointChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.E
// we want to measure. So we simply ignore it in this cases.
// TODO(wojtek-t, robscott): Address the problem for EndpointSlice deletion
// when other EndpointSlice for that service still exist.
if t := getLastChangeTriggerTime(endpointSlice.Annotations); !t.IsZero() && !removeSlice {
if t := getLastChangeTriggerTime(endpointSlice.Annotations); !t.IsZero() && !removeSlice && t.After(ect.trackerStartTime) {
ect.lastChangeTriggerTimes[namespacedName] =
append(ect.lastChangeTriggerTimes[namespacedName], t)
}

View File

@ -23,7 +23,7 @@ import (
"github.com/davecgh/go-spew/spew"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@ -1294,7 +1294,7 @@ func TestUpdateEndpointsMap(t *testing.T) {
for tci, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
fp := newFakeProxier(v1.IPv4Protocol)
fp := newFakeProxier(v1.IPv4Protocol, time.Time{})
fp.hostname = nodeName
// First check that after adding all previous versions of endpoints,
@ -1364,7 +1364,9 @@ func TestUpdateEndpointsMap(t *testing.T) {
}
func TestLastChangeTriggerTime(t *testing.T) {
t0 := time.Date(2018, 01, 01, 0, 0, 0, 0, time.UTC)
startTime := time.Date(2018, 01, 01, 0, 0, 0, 0, time.UTC)
t_1 := startTime.Add(-time.Second)
t0 := startTime.Add(time.Second)
t1 := t0.Add(time.Second)
t2 := t1.Add(time.Second)
t3 := t2.Add(time.Second)
@ -1438,6 +1440,14 @@ func TestLastChangeTriggerTime(t *testing.T) {
},
expected: map[types.NamespacedName][]time.Time{},
},
{
name: "Endpoints create before tracker started",
scenario: func(fp *FakeProxier) {
e := createEndpoints("ns", "ep1", t_1)
fp.addEndpoints(e)
},
expected: map[types.NamespacedName][]time.Time{},
},
{
name: "addEndpoints then deleteEndpoints",
scenario: func(fp *FakeProxier) {
@ -1469,7 +1479,7 @@ func TestLastChangeTriggerTime(t *testing.T) {
}
for _, tc := range testCases {
fp := newFakeProxier(v1.IPv4Protocol)
fp := newFakeProxier(v1.IPv4Protocol, startTime)
tc.scenario(fp)

View File

@ -20,10 +20,11 @@ import (
"net"
"reflect"
"testing"
"time"
"github.com/davecgh/go-spew/spew"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
@ -511,12 +512,21 @@ type FakeProxier struct {
hostname string
}
func newFakeProxier(ipFamily v1.IPFamily) *FakeProxier {
func newFakeProxier(ipFamily v1.IPFamily, t time.Time) *FakeProxier {
return &FakeProxier{
serviceMap: make(ServiceMap),
serviceChanges: NewServiceChangeTracker(nil, ipFamily, nil, nil),
endpointsMap: make(EndpointsMap),
endpointsChanges: NewEndpointChangeTracker(testHostname, nil, ipFamily, nil, false, nil),
serviceMap: make(ServiceMap),
serviceChanges: NewServiceChangeTracker(nil, ipFamily, nil, nil),
endpointsMap: make(EndpointsMap),
endpointsChanges: &EndpointChangeTracker{
hostname: testHostname,
items: make(map[types.NamespacedName]*endpointsChange),
makeEndpointInfo: nil,
ipFamily: ipFamily,
recorder: nil,
lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time),
trackerStartTime: t,
processEndpointsMapChange: nil,
},
}
}
@ -539,7 +549,7 @@ func (fake *FakeProxier) deleteService(service *v1.Service) {
}
func TestServiceMapUpdateHeadless(t *testing.T) {
fp := newFakeProxier(v1.IPv4Protocol)
fp := newFakeProxier(v1.IPv4Protocol, time.Time{})
makeServiceMap(fp,
makeTestService("ns2", "headless", func(svc *v1.Service) {
@ -570,7 +580,7 @@ func TestServiceMapUpdateHeadless(t *testing.T) {
}
func TestUpdateServiceTypeExternalName(t *testing.T) {
fp := newFakeProxier(v1.IPv4Protocol)
fp := newFakeProxier(v1.IPv4Protocol, time.Time{})
makeServiceMap(fp,
makeTestService("ns2", "external-name", func(svc *v1.Service) {
@ -595,7 +605,7 @@ func TestUpdateServiceTypeExternalName(t *testing.T) {
}
func TestBuildServiceMapAddRemove(t *testing.T) {
fp := newFakeProxier(v1.IPv4Protocol)
fp := newFakeProxier(v1.IPv4Protocol, time.Time{})
services := []*v1.Service{
makeTestService("ns2", "cluster-ip", func(svc *v1.Service) {
@ -698,7 +708,7 @@ func TestBuildServiceMapAddRemove(t *testing.T) {
}
func TestBuildServiceMapServiceUpdate(t *testing.T) {
fp := newFakeProxier(v1.IPv4Protocol)
fp := newFakeProxier(v1.IPv4Protocol, time.Time{})
servicev1 := makeTestService("ns1", "svc1", func(svc *v1.Service) {
svc.Spec.Type = v1.ServiceTypeClusterIP