From f06ce08d63db87da3d5cf6aaf8a50c858e5d8a35 Mon Sep 17 00:00:00 2001 From: Shivanshu Raj Shrivastava Date: Fri, 17 Sep 2021 00:06:02 +0530 Subject: [PATCH] Migrated pkg/proxy/winuserspace to structured logging (#105035) * migrated roundrobin.go * migrated proxysocket.go * used KRef in service --- pkg/proxy/winuserspace/proxysocket.go | 40 +++++++++++++-------------- pkg/proxy/winuserspace/roundrobin.go | 27 +++++++++--------- 2 files changed, 33 insertions(+), 34 deletions(-) diff --git a/pkg/proxy/winuserspace/proxysocket.go b/pkg/proxy/winuserspace/proxysocket.go index 4ac2110d708..a788894ce2b 100644 --- a/pkg/proxy/winuserspace/proxysocket.go +++ b/pkg/proxy/winuserspace/proxysocket.go @@ -101,10 +101,10 @@ func tryConnect(service ServicePortPortalName, srcAddr net.Addr, protocol string } endpoint, err := proxier.loadBalancer.NextEndpoint(servicePortName, srcAddr, sessionAffinityReset) if err != nil { - klog.Errorf("Couldn't find an endpoint for %s: %v", service, err) + klog.ErrorS(err, "Couldn't find an endpoint for service", "service", klog.KRef(service.Namespace, service.Name)) return nil, err } - klog.V(3).Infof("Mapped service %q to endpoint %s", service, endpoint) + klog.V(3).InfoS("Mapped service to endpoint", "service", klog.KRef(service.Namespace, service.Name), "endpoint", endpoint) // TODO: This could spin up a new goroutine to make the outbound connection, // and keep accepting inbound traffic. outConn, err := net.DialTimeout(protocol, endpoint, dialTimeout) @@ -112,7 +112,7 @@ func tryConnect(service ServicePortPortalName, srcAddr net.Addr, protocol string if isTooManyFDsError(err) { panic("Dial failed: " + err.Error()) } - klog.Errorf("Dial failed: %v", err) + klog.ErrorS(err, "Dial failed") sessionAffinityReset = true continue } @@ -141,13 +141,13 @@ func (tcp *tcpProxySocket) ProxyLoop(service ServicePortPortalName, myInfo *serv // Then the service port was just closed so the accept failure is to be expected. return } - klog.Errorf("Accept failed: %v", err) + klog.ErrorS(err, "Accept failed") continue } - klog.V(3).Infof("Accepted TCP connection from %v to %v", inConn.RemoteAddr(), inConn.LocalAddr()) + klog.V(3).InfoS("Accepted TCP connection from remote", "remoteAddress", inConn.RemoteAddr(), "localAddress", inConn.LocalAddr()) outConn, err := tryConnect(service, inConn.(*net.TCPConn).RemoteAddr(), "tcp", proxier) if err != nil { - klog.Errorf("Failed to connect to balancer: %v", err) + klog.ErrorS(err, "Failed to connect to balancer") inConn.Close() continue } @@ -160,8 +160,8 @@ func (tcp *tcpProxySocket) ProxyLoop(service ServicePortPortalName, myInfo *serv func proxyTCP(in, out *net.TCPConn) { var wg sync.WaitGroup wg.Add(2) - klog.V(4).Infof("Creating proxy between %v <-> %v <-> %v <-> %v", - in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr()) + klog.V(4).InfoS("Creating proxy between remote and local addresses", + "inRemoteAddress", in.RemoteAddr(), "inLocalAddress", in.LocalAddr(), "outLocalAddress", out.LocalAddr(), "outRemoteAddress", out.RemoteAddr()) go copyBytes("from backend", in, out, &wg) go copyBytes("to backend", out, in, &wg) wg.Wait() @@ -169,14 +169,14 @@ func proxyTCP(in, out *net.TCPConn) { func copyBytes(direction string, dest, src *net.TCPConn, wg *sync.WaitGroup) { defer wg.Done() - klog.V(4).Infof("Copying %s: %s -> %s", direction, src.RemoteAddr(), dest.RemoteAddr()) + klog.V(4).InfoS("Copying remote address bytes", "direction", direction, "sourceRemoteAddress", src.RemoteAddr(), "destinationRemoteAddress", dest.RemoteAddr()) n, err := io.Copy(dest, src) if err != nil { if !isClosedError(err) { - klog.Errorf("I/O error: %v", err) + klog.ErrorS(err, "I/O error occurred") } } - klog.V(4).Infof("Copied %d bytes %s: %s -> %s", n, direction, src.RemoteAddr(), dest.RemoteAddr()) + klog.V(4).InfoS("Copied remote address bytes", "bytes", n, "direction", direction, "sourceRemoteAddress", src.RemoteAddr(), "destinationRemoteAddress", dest.RemoteAddr()) dest.Close() src.Close() } @@ -222,11 +222,11 @@ func (udp *udpProxySocket) ProxyLoop(service ServicePortPortalName, myInfo *serv if err != nil { if e, ok := err.(net.Error); ok { if e.Temporary() { - klog.V(1).Infof("ReadFrom had a temporary failure: %v", err) + klog.V(1).ErrorS(err, "ReadFrom had a temporary failure") continue } } - klog.Errorf("ReadFrom failed, exiting ProxyLoop: %v", err) + klog.ErrorS(err, "ReadFrom failed, exiting ProxyLoop") break } @@ -240,14 +240,14 @@ func (udp *udpProxySocket) ProxyLoop(service ServicePortPortalName, myInfo *serv _, err = svrConn.Write(buffer[0:n]) if err != nil { if !logTimeout(err) { - klog.Errorf("Write failed: %v", err) + klog.ErrorS(err, "Write failed") // TODO: Maybe tear down the goroutine for this client/server pair? } continue } err = svrConn.SetDeadline(time.Now().Add(myInfo.timeout)) if err != nil { - klog.Errorf("SetDeadline failed: %v", err) + klog.ErrorS(err, "SetDeadline failed") continue } } @@ -261,14 +261,14 @@ func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr ne if !found { // TODO: This could spin up a new goroutine to make the outbound connection, // and keep accepting inbound traffic. - klog.V(3).Infof("New UDP connection from %s", cliAddr) + klog.V(3).InfoS("New UDP connection from client", "address", cliAddr) var err error svrConn, err = tryConnect(service, cliAddr, "udp", proxier) if err != nil { return nil, err } if err = svrConn.SetDeadline(time.Now().Add(timeout)); err != nil { - klog.Errorf("SetDeadline failed: %v", err) + klog.ErrorS(err, "SetDeadline failed") return nil, err } activeClients.clients[cliAddr.String()] = svrConn @@ -289,20 +289,20 @@ func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activ n, err := svrConn.Read(buffer[0:]) if err != nil { if !logTimeout(err) { - klog.Errorf("Read failed: %v", err) + klog.ErrorS(err, "Read failed") } break } err = svrConn.SetDeadline(time.Now().Add(timeout)) if err != nil { - klog.Errorf("SetDeadline failed: %v", err) + klog.ErrorS(err, "SetDeadline failed") break } _, err = udp.WriteTo(buffer[0:n], cliAddr) if err != nil { if !logTimeout(err) { - klog.Errorf("WriteTo failed: %v", err) + klog.ErrorS(err, "WriteTo failed") } break } diff --git a/pkg/proxy/winuserspace/roundrobin.go b/pkg/proxy/winuserspace/roundrobin.go index f725c05a48c..041be3e0198 100644 --- a/pkg/proxy/winuserspace/roundrobin.go +++ b/pkg/proxy/winuserspace/roundrobin.go @@ -82,7 +82,7 @@ func NewLoadBalancerRR() *LoadBalancerRR { } func (lb *LoadBalancerRR) NewService(svcPort proxy.ServicePortName, affinityType v1.ServiceAffinity, ttlSeconds int) error { - klog.V(4).Infof("LoadBalancerRR NewService %q", svcPort) + klog.V(4).InfoS("LoadBalancerRR NewService", "servicePortName", svcPort) lb.lock.Lock() defer lb.lock.Unlock() lb.newServiceInternal(svcPort, affinityType, ttlSeconds) @@ -97,7 +97,7 @@ func (lb *LoadBalancerRR) newServiceInternal(svcPort proxy.ServicePortName, affi if _, exists := lb.services[svcPort]; !exists { lb.services[svcPort] = &balancerState{affinity: *newAffinityPolicy(affinityType, ttlSeconds)} - klog.V(4).Infof("LoadBalancerRR service %q did not exist, created", svcPort) + klog.V(4).InfoS("LoadBalancerRR service did not exist, created", "servicePortName", svcPort) } else if affinityType != "" { lb.services[svcPort].affinity.affinityType = affinityType } @@ -105,7 +105,7 @@ func (lb *LoadBalancerRR) newServiceInternal(svcPort proxy.ServicePortName, affi } func (lb *LoadBalancerRR) DeleteService(svcPort proxy.ServicePortName) { - klog.V(4).Infof("LoadBalancerRR DeleteService %q", svcPort) + klog.V(4).InfoS("LoadBalancerRR DeleteService", "servicePortName", svcPort) lb.lock.Lock() defer lb.lock.Unlock() delete(lb.services, svcPort) @@ -135,8 +135,7 @@ func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr ne if len(state.endpoints) == 0 { return "", ErrMissingEndpoints } - klog.V(4).Infof("NextEndpoint for service %q, srcAddr=%v: endpoints: %+v", svcPort, srcAddr, state.endpoints) - + klog.V(4).InfoS("NextEndpoint for service", "servicePortName", svcPort, "address", srcAddr, "endpoints", state.endpoints) sessionAffinityEnabled := isSessionAffinity(&state.affinity) var ipaddr string @@ -153,7 +152,7 @@ func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr ne // Affinity wins. endpoint := sessionAffinity.endpoint sessionAffinity.lastUsed = time.Now() - klog.V(4).Infof("NextEndpoint for service %q from IP %s with sessionAffinity %#v: %s", svcPort, ipaddr, sessionAffinity, endpoint) + klog.V(4).InfoS("NextEndpoint for service from IP with sessionAffinity", "servicePortName", svcPort, "IP", ipaddr, "sessionAffinity", sessionAffinity, "endpoint", endpoint) return endpoint, nil } } @@ -172,7 +171,7 @@ func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr ne affinity.lastUsed = time.Now() affinity.endpoint = endpoint affinity.clientIP = ipaddr - klog.V(4).Infof("Updated affinity key %s: %#v", ipaddr, state.affinity.affinityMap[ipaddr]) + klog.V(4).InfoS("Updated affinity key", "IP", ipaddr, "affinityState", state.affinity.affinityMap[ipaddr]) } return endpoint, nil @@ -182,7 +181,7 @@ func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr ne func removeSessionAffinityByEndpoint(state *balancerState, svcPort proxy.ServicePortName, endpoint string) { for _, affinity := range state.affinity.affinityMap { if affinity.endpoint == endpoint { - klog.V(4).Infof("Removing client: %s from affinityMap for service %q", affinity.endpoint, svcPort) + klog.V(4).InfoS("Removing client from affinityMap for service", "endpoint", affinity.endpoint, "servicePortName", svcPort) delete(state.affinity.affinityMap, affinity.clientIP) } } @@ -205,7 +204,7 @@ func (lb *LoadBalancerRR) updateAffinityMap(svcPort proxy.ServicePortName, newEn } for mKey, mVal := range allEndpoints { if mVal == 1 { - klog.V(2).Infof("Delete endpoint %s for service %q", mKey, svcPort) + klog.V(2).InfoS("Delete endpoint for service", "endpoint", mKey, "servicePortName", svcPort) removeSessionAffinityByEndpoint(state, svcPort, mKey) } } @@ -223,7 +222,7 @@ func (lb *LoadBalancerRR) OnEndpointsAdd(endpoints *v1.Endpoints) { state, exists := lb.services[svcPort] if !exists || state == nil || len(newEndpoints) > 0 { - klog.V(1).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcPort, newEndpoints) + klog.V(1).InfoS("LoadBalancerRR: Setting endpoints service", "servicePortName", svcPort, "endpoints", newEndpoints) lb.updateAffinityMap(svcPort, newEndpoints) // OnEndpointsAdd can be called without NewService being called externally. // To be safe we will call it here. A new service will only be created @@ -257,7 +256,7 @@ func (lb *LoadBalancerRR) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoint } if !exists || state == nil || len(curEndpoints) != len(newEndpoints) || !slicesEquiv(stringslices.Clone(curEndpoints), newEndpoints) { - klog.V(1).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcPort, newEndpoints) + klog.V(1).InfoS("LoadBalancerRR: Setting endpoints for service", "servicePortName", svcPort, "endpoints", newEndpoints) lb.updateAffinityMap(svcPort, newEndpoints) // OnEndpointsUpdate can be called without NewService being called externally. // To be safe we will call it here. A new service will only be created @@ -275,7 +274,7 @@ func (lb *LoadBalancerRR) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoint for portname := range oldPortsToEndpoints { svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname} if _, exists := registeredEndpoints[svcPort]; !exists { - klog.V(2).Infof("LoadBalancerRR: Removing endpoints for %s", svcPort) + klog.V(2).InfoS("LoadBalancerRR: Removing endpoints service", "servicePortName", svcPort) // Reset but don't delete. state := lb.services[svcPort] state.endpoints = []string{} @@ -293,7 +292,7 @@ func (lb *LoadBalancerRR) OnEndpointsDelete(endpoints *v1.Endpoints) { for portname := range portsToEndpoints { svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname} - klog.V(2).Infof("LoadBalancerRR: Removing endpoints for %s", svcPort) + klog.V(2).InfoS("LoadBalancerRR: Removing endpoints service", "servicePortName", svcPort) // If the service is still around, reset but don't delete. if state, ok := lb.services[svcPort]; ok { state.endpoints = []string{} @@ -326,7 +325,7 @@ func (lb *LoadBalancerRR) CleanupStaleStickySessions(svcPort proxy.ServicePortNa } for ip, affinity := range state.affinity.affinityMap { if int(time.Since(affinity.lastUsed).Seconds()) >= state.affinity.ttlSeconds { - klog.V(4).Infof("Removing client %s from affinityMap for service %q", affinity.clientIP, svcPort) + klog.V(4).InfoS("Removing client from affinityMap for service", "IP", affinity.clientIP, "servicePortName", svcPort) delete(state.affinity.affinityMap, ip) } }