Start exporting the in-cluster network programming latency metric.

This commit is contained in:
Matt Matejczyk
2018-11-05 15:38:50 -05:00
parent 9c72a33685
commit 7141ece4bf
5 changed files with 197 additions and 10 deletions

View File

@@ -21,6 +21,7 @@ import (
"reflect"
"strconv"
"sync"
"time"
"k8s.io/klog"
@@ -92,16 +93,20 @@ type EndpointChangeTracker struct {
// isIPv6Mode indicates if change tracker is under IPv6/IPv4 mode. Nil means not applicable.
isIPv6Mode *bool
recorder record.EventRecorder
// Map from the Endpoints namespaced-name to the times of the triggers that caused the endpoints
// object to change. Used to calculate the network-programming-latency.
lastChangeTriggerTimes map[types.NamespacedName][]time.Time
}
// NewEndpointChangeTracker initializes an EndpointsChangeMap
func NewEndpointChangeTracker(hostname string, makeEndpointInfo makeEndpointFunc, isIPv6Mode *bool, recorder record.EventRecorder) *EndpointChangeTracker {
return &EndpointChangeTracker{
hostname: hostname,
items: make(map[types.NamespacedName]*endpointsChange),
makeEndpointInfo: makeEndpointInfo,
isIPv6Mode: isIPv6Mode,
recorder: recorder,
hostname: hostname,
items: make(map[types.NamespacedName]*endpointsChange),
makeEndpointInfo: makeEndpointInfo,
isIPv6Mode: isIPv6Mode,
recorder: recorder,
lastChangeTriggerTimes: make(map[types.NamespacedName][]time.Time),
}
}
@@ -133,14 +138,38 @@ func (ect *EndpointChangeTracker) Update(previous, current *v1.Endpoints) bool {
change.previous = ect.endpointsToEndpointsMap(previous)
ect.items[namespacedName] = change
}
if t := getLastChangeTriggerTime(endpoints); !t.IsZero() {
ect.lastChangeTriggerTimes[namespacedName] =
append(ect.lastChangeTriggerTimes[namespacedName], t)
}
change.current = ect.endpointsToEndpointsMap(current)
// if change.previous equal to change.current, it means no change
if reflect.DeepEqual(change.previous, change.current) {
delete(ect.items, namespacedName)
// Reset the lastChangeTriggerTimes for the Endpoints object. Given that the network programming
// SLI is defined as the duration between a time of an event and a time when the network was
// programmed to incorporate that event, if there are events that happened between two
// consecutive syncs and that canceled each other out, e.g. pod A added -> pod A deleted,
// there will be no network programming for them and thus no network programming latency metric
// should be exported.
delete(ect.lastChangeTriggerTimes, namespacedName)
}
return len(ect.items) > 0
}
// getLastChangeTriggerTime returns the time.Time value of the EndpointsLastChangeTriggerTime
// annotation stored in the given endpoints object or the "zero" time if the annotation wasn't set
// or was set incorrectly.
func getLastChangeTriggerTime(endpoints *v1.Endpoints) time.Time {
val, err := time.Parse(time.RFC3339Nano, endpoints.Annotations[v1.EndpointsLastChangeTriggerTime])
if err != nil {
klog.Warningf("Error while parsing EndpointsLastChangeTriggerTimeAnnotation: '%s'. Error is %v",
endpoints.Annotations[v1.EndpointsLastChangeTriggerTime], err)
// In case of error val = time.Zero, which is ignored in the upstream code.
}
return val
}
// endpointsChange contains all changes to endpoints that happened since proxy rules were synced. For a single object,
// changes are accumulated, i.e. previous is state from before applying the changes,
// current is state after applying the changes.
@@ -157,14 +186,19 @@ type UpdateEndpointMapResult struct {
StaleEndpoints []ServiceEndpoint
// StaleServiceNames identifies if a service is stale.
StaleServiceNames []ServicePortName
// List of the trigger times for all endpoints objects that changed. It's used to export the
// network programming latency.
LastChangeTriggerTimes []time.Time
}
// UpdateEndpointsMap updates endpointsMap base on the given changes.
func UpdateEndpointsMap(endpointsMap EndpointsMap, changes *EndpointChangeTracker) (result UpdateEndpointMapResult) {
result.StaleEndpoints = make([]ServiceEndpoint, 0)
result.StaleServiceNames = make([]ServicePortName, 0)
result.LastChangeTriggerTimes = make([]time.Time, 0)
endpointsMap.apply(changes, &result.StaleEndpoints, &result.StaleServiceNames)
endpointsMap.apply(
changes, &result.StaleEndpoints, &result.StaleServiceNames, &result.LastChangeTriggerTimes)
// TODO: If this will appear to be computationally expensive, consider
// computing this incrementally similarly to endpointsMap.
@@ -241,7 +275,10 @@ func (ect *EndpointChangeTracker) endpointsToEndpointsMap(endpoints *v1.Endpoint
// apply the changes to EndpointsMap and updates stale endpoints and service-endpoints pair. The `staleEndpoints` argument
// is passed in to store the stale udp endpoints and `staleServiceNames` argument is passed in to store the stale udp service.
// The changes map is cleared after applying them.
func (endpointsMap EndpointsMap) apply(changes *EndpointChangeTracker, staleEndpoints *[]ServiceEndpoint, staleServiceNames *[]ServicePortName) {
// In addition it returns (via argument) and resets the lastChangeTriggerTimes for all endpoints
// that were changed and will result in syncing the proxy rules.
func (endpointsMap EndpointsMap) apply(changes *EndpointChangeTracker, staleEndpoints *[]ServiceEndpoint,
staleServiceNames *[]ServicePortName, lastChangeTriggerTimes *[]time.Time) {
if changes == nil {
return
}
@@ -253,6 +290,10 @@ func (endpointsMap EndpointsMap) apply(changes *EndpointChangeTracker, staleEndp
detectStaleConnections(change.previous, change.current, staleEndpoints, staleServiceNames)
}
changes.items = make(map[types.NamespacedName]*endpointsChange)
for _, lastChangeTriggerTime := range changes.lastChangeTriggerTimes {
*lastChangeTriggerTimes = append(*lastChangeTriggerTimes, lastChangeTriggerTime...)
}
changes.lastChangeTriggerTimes = make(map[types.NamespacedName][]time.Time)
}
// Merge ensures that the current EndpointsMap contains all <service, endpoints> pairs from the EndpointsMap passed in.