Migrated pkg/proxy to structured logging (#104891)

* migrated service.go to structured logging

* fixing capital letter in starting

* migrated topology.go

* migrated endpointslicecache.go

* migrated endpoints.go

* nit typo

* nit plural to singular

* fixed format

* code formatting

* resolving review comment for key ipFamily

* resolving review comment for key endpoints.go

* code formating

* Converted Warningf to ErrorS, wherever applicable

* included review changes

* included review changes
This commit is contained in:
Shivanshu Raj Shrivastava 2021-10-14 22:17:17 +05:30 committed by GitHub
parent dea052ceba
commit daf5af2917
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 31 additions and 29 deletions

View File

@ -246,7 +246,7 @@ func (ect *EndpointChangeTracker) Update(previous, current *v1.Endpoints) bool {
delete(ect.lastChangeTriggerTimes, namespacedName) delete(ect.lastChangeTriggerTimes, namespacedName)
} else { } else {
for spn, eps := range change.current { for spn, eps := range change.current {
klog.V(2).Infof("Service port %s updated: %d endpoints", spn, len(eps)) klog.V(2).InfoS("Service port endpoints update", "servicePort", spn, "endpoints", len(eps))
} }
} }
@ -259,19 +259,19 @@ func (ect *EndpointChangeTracker) Update(previous, current *v1.Endpoints) bool {
// If removeSlice is true, slice will be removed, otherwise it will be added or updated. // If removeSlice is true, slice will be removed, otherwise it will be added or updated.
func (ect *EndpointChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.EndpointSlice, removeSlice bool) bool { func (ect *EndpointChangeTracker) EndpointSliceUpdate(endpointSlice *discovery.EndpointSlice, removeSlice bool) bool {
if !supportedEndpointSliceAddressTypes.Has(string(endpointSlice.AddressType)) { if !supportedEndpointSliceAddressTypes.Has(string(endpointSlice.AddressType)) {
klog.V(4).Infof("EndpointSlice address type not supported by kube-proxy: %s", endpointSlice.AddressType) klog.V(4).InfoS("EndpointSlice address type not supported by kube-proxy", "addressType", endpointSlice.AddressType)
return false return false
} }
// This should never happen // This should never happen
if endpointSlice == nil { if endpointSlice == nil {
klog.Error("Nil endpointSlice passed to EndpointSliceUpdate") klog.ErrorS(nil, "Nil endpointSlice passed to EndpointSliceUpdate")
return false return false
} }
namespacedName, _, err := endpointSliceCacheKeys(endpointSlice) namespacedName, _, err := endpointSliceCacheKeys(endpointSlice)
if err != nil { if err != nil {
klog.Warningf("Error getting endpoint slice cache keys: %v", err) klog.InfoS("Error getting endpoint slice cache keys", "err", err)
return false return false
} }
@ -349,8 +349,8 @@ func getLastChangeTriggerTime(annotations map[string]string) time.Time {
} }
val, err := time.Parse(time.RFC3339Nano, annotations[v1.EndpointsLastChangeTriggerTime]) val, err := time.Parse(time.RFC3339Nano, annotations[v1.EndpointsLastChangeTriggerTime])
if err != nil { if err != nil {
klog.Warningf("Error while parsing EndpointsLastChangeTriggerTimeAnnotation: '%s'. Error is %v", klog.ErrorS(err, "Error while parsing EndpointsLastChangeTriggerTimeAnnotation",
annotations[v1.EndpointsLastChangeTriggerTime], err) "value", annotations[v1.EndpointsLastChangeTriggerTime])
// In case of error val = time.Zero, which is ignored in the upstream code. // In case of error val = time.Zero, which is ignored in the upstream code.
} }
return val return val
@ -419,7 +419,7 @@ func (ect *EndpointChangeTracker) endpointsToEndpointsMap(endpoints *v1.Endpoint
for i := range ss.Ports { for i := range ss.Ports {
port := &ss.Ports[i] port := &ss.Ports[i]
if port.Port == 0 { if port.Port == 0 {
klog.Warningf("ignoring invalid endpoint port %s", port.Name) klog.ErrorS(nil, "Ignoring invalid endpoint port", "portName", port.Name)
continue continue
} }
svcPortName := ServicePortName{ svcPortName := ServicePortName{
@ -430,7 +430,7 @@ func (ect *EndpointChangeTracker) endpointsToEndpointsMap(endpoints *v1.Endpoint
for i := range ss.Addresses { for i := range ss.Addresses {
addr := &ss.Addresses[i] addr := &ss.Addresses[i]
if addr.IP == "" { if addr.IP == "" {
klog.Warningf("ignoring invalid endpoint port %s with empty host", port.Name) klog.ErrorS(nil, "Ignoring invalid endpoint port with empty host", "portName", port.Name)
continue continue
} }
@ -466,7 +466,7 @@ func (ect *EndpointChangeTracker) endpointsToEndpointsMap(endpoints *v1.Endpoint
} }
} }
klog.V(3).Infof("Setting endpoints for %q to %+v", svcPortName, formatEndpointsList(endpointsMap[svcPortName])) klog.V(3).InfoS("Setting endpoints for service port", "portName", svcPortName, "endpoints", formatEndpointsList(endpointsMap[svcPortName]))
} }
} }
return endpointsMap return endpointsMap
@ -550,7 +550,7 @@ func detectStaleConnections(oldEndpointsMap, newEndpointsMap EndpointsMap, stale
} }
} }
if stale { if stale {
klog.V(4).Infof("Stale endpoint %v -> %v", svcPortName, ep.String()) klog.V(4).InfoS("Stale endpoint", "portName", svcPortName, "endpoint", ep)
*staleEndpoints = append(*staleEndpoints, ServiceEndpoint{Endpoint: ep.String(), ServicePortName: svcPortName}) *staleEndpoints = append(*staleEndpoints, ServiceEndpoint{Endpoint: ep.String(), ServicePortName: svcPortName})
} }
} }

View File

@ -166,7 +166,7 @@ func standardEndpointInfo(ep *BaseEndpointInfo) Endpoint {
func (cache *EndpointSliceCache) updatePending(endpointSlice *discovery.EndpointSlice, remove bool) bool { func (cache *EndpointSliceCache) updatePending(endpointSlice *discovery.EndpointSlice, remove bool) bool {
serviceKey, sliceKey, err := endpointSliceCacheKeys(endpointSlice) serviceKey, sliceKey, err := endpointSliceCacheKeys(endpointSlice)
if err != nil { if err != nil {
klog.Warningf("Error getting endpoint slice cache keys: %v", err) klog.ErrorS(err, "Error getting endpoint slice cache keys")
return false return false
} }
@ -235,12 +235,12 @@ func (cache *EndpointSliceCache) endpointInfoByServicePort(serviceNN types.Names
for _, sliceInfo := range sliceInfoByName { for _, sliceInfo := range sliceInfoByName {
for _, port := range sliceInfo.Ports { for _, port := range sliceInfo.Ports {
if port.Name == nil { if port.Name == nil {
klog.Warningf("ignoring port with nil name %v", port) klog.ErrorS(nil, "Ignoring port with nil name", "portName", port.Name)
continue continue
} }
// TODO: handle nil ports to mean "all" // TODO: handle nil ports to mean "all"
if port.Port == nil || *port.Port == int32(0) { if port.Port == nil || *port.Port == int32(0) {
klog.Warningf("ignoring invalid endpoint port %s", *port.Name) klog.ErrorS(nil, "Ignoring invalid endpoint port", "portName", *port.Name)
continue continue
} }
@ -266,7 +266,7 @@ func (cache *EndpointSliceCache) addEndpoints(serviceNN types.NamespacedName, po
// iterate through endpoints to add them to endpointSet. // iterate through endpoints to add them to endpointSet.
for _, endpoint := range endpoints { for _, endpoint := range endpoints {
if len(endpoint.Addresses) == 0 { if len(endpoint.Addresses) == 0 {
klog.Warningf("ignoring invalid endpoint port %s with empty addresses", endpoint) klog.ErrorS(nil, "Ignoring invalid endpoint port with empty address", "endpoint", endpoint)
continue continue
} }
@ -355,7 +355,7 @@ func endpointsMapFromEndpointInfo(endpointInfoBySP map[ServicePortName]map[strin
// Ensure endpoints are always returned in the same order to simplify diffing. // Ensure endpoints are always returned in the same order to simplify diffing.
sort.Sort(byEndpoint(endpointsMap[svcPortName])) sort.Sort(byEndpoint(endpointsMap[svcPortName]))
klog.V(3).Infof("Setting endpoints for %q to %+v", svcPortName, formatEndpointsList(endpointsMap[svcPortName])) klog.V(3).InfoS("Setting endpoints for service port name", "portName", svcPortName, "endpoints", formatEndpointsList(endpointsMap[svcPortName]))
} }
} }

View File

@ -182,14 +182,16 @@ func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, servic
// Log the IPs not matching the ipFamily // Log the IPs not matching the ipFamily
if ips, ok := ipFamilyMap[utilproxy.OtherIPFamily(sct.ipFamily)]; ok && len(ips) > 0 { if ips, ok := ipFamilyMap[utilproxy.OtherIPFamily(sct.ipFamily)]; ok && len(ips) > 0 {
klog.V(4).Infof("service change tracker(%v) ignored the following external IPs(%s) for service %v/%v as they don't match IPFamily", sct.ipFamily, strings.Join(ips, ","), service.Namespace, service.Name) klog.V(4).InfoS("Service change tracker ignored the following external IPs for given service as they don't match IP Family",
"ipFamily", sct.ipFamily, "externalIPs", strings.Join(ips, ","), "service", klog.KObj(service))
} }
ipFamilyMap = utilproxy.MapCIDRsByIPFamily(loadBalancerSourceRanges) ipFamilyMap = utilproxy.MapCIDRsByIPFamily(loadBalancerSourceRanges)
info.loadBalancerSourceRanges = ipFamilyMap[sct.ipFamily] info.loadBalancerSourceRanges = ipFamilyMap[sct.ipFamily]
// Log the CIDRs not matching the ipFamily // Log the CIDRs not matching the ipFamily
if cidrs, ok := ipFamilyMap[utilproxy.OtherIPFamily(sct.ipFamily)]; ok && len(cidrs) > 0 { if cidrs, ok := ipFamilyMap[utilproxy.OtherIPFamily(sct.ipFamily)]; ok && len(cidrs) > 0 {
klog.V(4).Infof("service change tracker(%v) ignored the following load balancer source ranges(%s) for service %v/%v as they don't match IPFamily", sct.ipFamily, strings.Join(cidrs, ","), service.Namespace, service.Name) klog.V(4).InfoS("Service change tracker ignored the following load balancer source ranges for given Service as they don't match IP Family",
"ipFamily", sct.ipFamily, "loadBalancerSourceRanges", strings.Join(cidrs, ","), "service", klog.KObj(service))
} }
// Obtain Load Balancer Ingress IPs // Obtain Load Balancer Ingress IPs
@ -204,8 +206,8 @@ func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, servic
ipFamilyMap = utilproxy.MapIPsByIPFamily(ips) ipFamilyMap = utilproxy.MapIPsByIPFamily(ips)
if ipList, ok := ipFamilyMap[utilproxy.OtherIPFamily(sct.ipFamily)]; ok && len(ipList) > 0 { if ipList, ok := ipFamilyMap[utilproxy.OtherIPFamily(sct.ipFamily)]; ok && len(ipList) > 0 {
klog.V(4).Infof("service change tracker(%v) ignored the following load balancer(%s) ingress ips for service %v/%v as they don't match IPFamily", sct.ipFamily, strings.Join(ipList, ","), service.Namespace, service.Name) klog.V(4).InfoS("Service change tracker ignored the following load balancer ingress IPs for given Service as they don't match the IP Family",
"ipFamily", sct.ipFamily, "loadBalancerIngressIps", strings.Join(ipList, ","), "service", klog.KObj(service))
} }
// Create the LoadBalancerStatus with the filtered IPs // Create the LoadBalancerStatus with the filtered IPs
for _, ip := range ipFamilyMap[sct.ipFamily] { for _, ip := range ipFamilyMap[sct.ipFamily] {
@ -216,7 +218,7 @@ func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, servic
if apiservice.NeedsHealthCheck(service) { if apiservice.NeedsHealthCheck(service) {
p := service.Spec.HealthCheckNodePort p := service.Spec.HealthCheckNodePort
if p == 0 { if p == 0 {
klog.Errorf("Service %s/%s has no healthcheck nodeport", service.Namespace, service.Name) klog.ErrorS(nil, "Service has no healthcheck nodeport", "service", klog.KObj(service))
} else { } else {
info.healthCheckNodePort = int(p) info.healthCheckNodePort = int(p)
} }
@ -299,7 +301,7 @@ func (sct *ServiceChangeTracker) Update(previous, current *v1.Service) bool {
if reflect.DeepEqual(change.previous, change.current) { if reflect.DeepEqual(change.previous, change.current) {
delete(sct.items, namespacedName) delete(sct.items, namespacedName)
} else { } else {
klog.V(2).Infof("Service %s updated: %d ports", namespacedName, len(change.current)) klog.V(2).InfoS("Service updated ports", "service", klog.KObj(svc), "portCount", len(change.current))
} }
metrics.ServiceChangesPending.Set(float64(len(sct.items))) metrics.ServiceChangesPending.Set(float64(len(sct.items)))
return len(sct.items) > 0 return len(sct.items) > 0
@ -414,9 +416,9 @@ func (sm *ServiceMap) merge(other ServiceMap) sets.String {
existingPorts.Insert(svcPortName.String()) existingPorts.Insert(svcPortName.String())
_, exists := (*sm)[svcPortName] _, exists := (*sm)[svcPortName]
if !exists { if !exists {
klog.V(1).Infof("Adding new service port %q at %s", svcPortName, info.String()) klog.V(1).InfoS("Adding new service port", "portName", svcPortName, "servicePort", info)
} else { } else {
klog.V(1).Infof("Updating existing service port %q at %s", svcPortName, info.String()) klog.V(1).InfoS("Updating existing service port", "portName", svcPortName, "servicePort", info)
} }
(*sm)[svcPortName] = info (*sm)[svcPortName] = info
} }
@ -439,13 +441,13 @@ func (sm *ServiceMap) unmerge(other ServiceMap, UDPStaleClusterIP sets.String) {
for svcPortName := range other { for svcPortName := range other {
info, exists := (*sm)[svcPortName] info, exists := (*sm)[svcPortName]
if exists { if exists {
klog.V(1).Infof("Removing service port %q", svcPortName) klog.V(1).InfoS("Removing service port", "portName", svcPortName)
if info.Protocol() == v1.ProtocolUDP { if info.Protocol() == v1.ProtocolUDP {
UDPStaleClusterIP.Insert(info.ClusterIP().String()) UDPStaleClusterIP.Insert(info.ClusterIP().String())
} }
delete(*sm, svcPortName) delete(*sm, svcPortName)
} else { } else {
klog.Errorf("Service port %q doesn't exists", svcPortName) klog.ErrorS(nil, "Service port does not exists", "portName", svcPortName)
} }
} }
} }

View File

@ -54,14 +54,14 @@ func FilterEndpoints(endpoints []Endpoint, svcInfo ServicePort, nodeLabels map[s
func filterEndpointsWithHints(endpoints []Endpoint, hintsAnnotation string, nodeLabels map[string]string) []Endpoint { func filterEndpointsWithHints(endpoints []Endpoint, hintsAnnotation string, nodeLabels map[string]string) []Endpoint {
if hintsAnnotation != "Auto" && hintsAnnotation != "auto" { if hintsAnnotation != "Auto" && hintsAnnotation != "auto" {
if hintsAnnotation != "" && hintsAnnotation != "Disabled" && hintsAnnotation != "disabled" { if hintsAnnotation != "" && hintsAnnotation != "Disabled" && hintsAnnotation != "disabled" {
klog.Warningf("Skipping topology aware endpoint filtering since Service has unexpected value for %s annotation: %s", v1.AnnotationTopologyAwareHints, hintsAnnotation) klog.InfoS("Skipping topology aware endpoint filtering since Service has unexpected value", "annotationTopologyAwareHints", v1.AnnotationTopologyAwareHints, "hints", hintsAnnotation)
} }
return endpoints return endpoints
} }
zone, ok := nodeLabels[v1.LabelTopologyZone] zone, ok := nodeLabels[v1.LabelTopologyZone]
if !ok || zone == "" { if !ok || zone == "" {
klog.Warningf("Skipping topology aware endpoint filtering since node is missing %s label", v1.LabelTopologyZone) klog.InfoS("Skipping topology aware endpoint filtering since node is missing label", "label", v1.LabelTopologyZone)
return endpoints return endpoints
} }
@ -69,7 +69,7 @@ func filterEndpointsWithHints(endpoints []Endpoint, hintsAnnotation string, node
for _, endpoint := range endpoints { for _, endpoint := range endpoints {
if endpoint.GetZoneHints().Len() == 0 { if endpoint.GetZoneHints().Len() == 0 {
klog.Warningf("Skipping topology aware endpoint filtering since one or more endpoints is missing a zone hint") klog.InfoS("Skipping topology aware endpoint filtering since one or more endpoints is missing a zone hint")
return endpoints return endpoints
} }
if endpoint.GetZoneHints().Has(zone) { if endpoint.GetZoneHints().Has(zone) {
@ -78,7 +78,7 @@ func filterEndpointsWithHints(endpoints []Endpoint, hintsAnnotation string, node
} }
if len(filteredEndpoints) == 0 { if len(filteredEndpoints) == 0 {
klog.Warningf("Skipping topology aware endpoint filtering since no hints were provided for zone %s", zone) klog.InfoS("Skipping topology aware endpoint filtering since no hints were provided for zone", "zone", zone)
return endpoints return endpoints
} }