Migrated pkg/proxy/winuserspace to structured logging (#105035)

* migrated roundrobin.go

* migrated proxysocket.go

* used KRef in service
This commit is contained in:
Shivanshu Raj Shrivastava 2021-09-17 00:06:02 +05:30 committed by GitHub
parent 51e39a45d9
commit f06ce08d63
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 33 additions and 34 deletions

View File

@ -101,10 +101,10 @@ func tryConnect(service ServicePortPortalName, srcAddr net.Addr, protocol string
} }
endpoint, err := proxier.loadBalancer.NextEndpoint(servicePortName, srcAddr, sessionAffinityReset) endpoint, err := proxier.loadBalancer.NextEndpoint(servicePortName, srcAddr, sessionAffinityReset)
if err != nil { if err != nil {
klog.Errorf("Couldn't find an endpoint for %s: %v", service, err) klog.ErrorS(err, "Couldn't find an endpoint for service", "service", klog.KRef(service.Namespace, service.Name))
return nil, err return nil, err
} }
klog.V(3).Infof("Mapped service %q to endpoint %s", service, endpoint) klog.V(3).InfoS("Mapped service to endpoint", "service", klog.KRef(service.Namespace, service.Name), "endpoint", endpoint)
// TODO: This could spin up a new goroutine to make the outbound connection, // TODO: This could spin up a new goroutine to make the outbound connection,
// and keep accepting inbound traffic. // and keep accepting inbound traffic.
outConn, err := net.DialTimeout(protocol, endpoint, dialTimeout) outConn, err := net.DialTimeout(protocol, endpoint, dialTimeout)
@ -112,7 +112,7 @@ func tryConnect(service ServicePortPortalName, srcAddr net.Addr, protocol string
if isTooManyFDsError(err) { if isTooManyFDsError(err) {
panic("Dial failed: " + err.Error()) panic("Dial failed: " + err.Error())
} }
klog.Errorf("Dial failed: %v", err) klog.ErrorS(err, "Dial failed")
sessionAffinityReset = true sessionAffinityReset = true
continue continue
} }
@ -141,13 +141,13 @@ func (tcp *tcpProxySocket) ProxyLoop(service ServicePortPortalName, myInfo *serv
// Then the service port was just closed so the accept failure is to be expected. // Then the service port was just closed so the accept failure is to be expected.
return return
} }
klog.Errorf("Accept failed: %v", err) klog.ErrorS(err, "Accept failed")
continue continue
} }
klog.V(3).Infof("Accepted TCP connection from %v to %v", inConn.RemoteAddr(), inConn.LocalAddr()) klog.V(3).InfoS("Accepted TCP connection from remote", "remoteAddress", inConn.RemoteAddr(), "localAddress", inConn.LocalAddr())
outConn, err := tryConnect(service, inConn.(*net.TCPConn).RemoteAddr(), "tcp", proxier) outConn, err := tryConnect(service, inConn.(*net.TCPConn).RemoteAddr(), "tcp", proxier)
if err != nil { if err != nil {
klog.Errorf("Failed to connect to balancer: %v", err) klog.ErrorS(err, "Failed to connect to balancer")
inConn.Close() inConn.Close()
continue continue
} }
@ -160,8 +160,8 @@ func (tcp *tcpProxySocket) ProxyLoop(service ServicePortPortalName, myInfo *serv
func proxyTCP(in, out *net.TCPConn) { func proxyTCP(in, out *net.TCPConn) {
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(2) wg.Add(2)
klog.V(4).Infof("Creating proxy between %v <-> %v <-> %v <-> %v", klog.V(4).InfoS("Creating proxy between remote and local addresses",
in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr()) "inRemoteAddress", in.RemoteAddr(), "inLocalAddress", in.LocalAddr(), "outLocalAddress", out.LocalAddr(), "outRemoteAddress", out.RemoteAddr())
go copyBytes("from backend", in, out, &wg) go copyBytes("from backend", in, out, &wg)
go copyBytes("to backend", out, in, &wg) go copyBytes("to backend", out, in, &wg)
wg.Wait() wg.Wait()
@ -169,14 +169,14 @@ func proxyTCP(in, out *net.TCPConn) {
func copyBytes(direction string, dest, src *net.TCPConn, wg *sync.WaitGroup) { func copyBytes(direction string, dest, src *net.TCPConn, wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
klog.V(4).Infof("Copying %s: %s -> %s", direction, src.RemoteAddr(), dest.RemoteAddr()) klog.V(4).InfoS("Copying remote address bytes", "direction", direction, "sourceRemoteAddress", src.RemoteAddr(), "destinationRemoteAddress", dest.RemoteAddr())
n, err := io.Copy(dest, src) n, err := io.Copy(dest, src)
if err != nil { if err != nil {
if !isClosedError(err) { if !isClosedError(err) {
klog.Errorf("I/O error: %v", err) klog.ErrorS(err, "I/O error occurred")
} }
} }
klog.V(4).Infof("Copied %d bytes %s: %s -> %s", n, direction, src.RemoteAddr(), dest.RemoteAddr()) klog.V(4).InfoS("Copied remote address bytes", "bytes", n, "direction", direction, "sourceRemoteAddress", src.RemoteAddr(), "destinationRemoteAddress", dest.RemoteAddr())
dest.Close() dest.Close()
src.Close() src.Close()
} }
@ -222,11 +222,11 @@ func (udp *udpProxySocket) ProxyLoop(service ServicePortPortalName, myInfo *serv
if err != nil { if err != nil {
if e, ok := err.(net.Error); ok { if e, ok := err.(net.Error); ok {
if e.Temporary() { if e.Temporary() {
klog.V(1).Infof("ReadFrom had a temporary failure: %v", err) klog.V(1).ErrorS(err, "ReadFrom had a temporary failure")
continue continue
} }
} }
klog.Errorf("ReadFrom failed, exiting ProxyLoop: %v", err) klog.ErrorS(err, "ReadFrom failed, exiting ProxyLoop")
break break
} }
@ -240,14 +240,14 @@ func (udp *udpProxySocket) ProxyLoop(service ServicePortPortalName, myInfo *serv
_, err = svrConn.Write(buffer[0:n]) _, err = svrConn.Write(buffer[0:n])
if err != nil { if err != nil {
if !logTimeout(err) { if !logTimeout(err) {
klog.Errorf("Write failed: %v", err) klog.ErrorS(err, "Write failed")
// TODO: Maybe tear down the goroutine for this client/server pair? // TODO: Maybe tear down the goroutine for this client/server pair?
} }
continue continue
} }
err = svrConn.SetDeadline(time.Now().Add(myInfo.timeout)) err = svrConn.SetDeadline(time.Now().Add(myInfo.timeout))
if err != nil { if err != nil {
klog.Errorf("SetDeadline failed: %v", err) klog.ErrorS(err, "SetDeadline failed")
continue continue
} }
} }
@ -261,14 +261,14 @@ func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr ne
if !found { if !found {
// TODO: This could spin up a new goroutine to make the outbound connection, // TODO: This could spin up a new goroutine to make the outbound connection,
// and keep accepting inbound traffic. // and keep accepting inbound traffic.
klog.V(3).Infof("New UDP connection from %s", cliAddr) klog.V(3).InfoS("New UDP connection from client", "address", cliAddr)
var err error var err error
svrConn, err = tryConnect(service, cliAddr, "udp", proxier) svrConn, err = tryConnect(service, cliAddr, "udp", proxier)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err = svrConn.SetDeadline(time.Now().Add(timeout)); err != nil { if err = svrConn.SetDeadline(time.Now().Add(timeout)); err != nil {
klog.Errorf("SetDeadline failed: %v", err) klog.ErrorS(err, "SetDeadline failed")
return nil, err return nil, err
} }
activeClients.clients[cliAddr.String()] = svrConn activeClients.clients[cliAddr.String()] = svrConn
@ -289,20 +289,20 @@ func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activ
n, err := svrConn.Read(buffer[0:]) n, err := svrConn.Read(buffer[0:])
if err != nil { if err != nil {
if !logTimeout(err) { if !logTimeout(err) {
klog.Errorf("Read failed: %v", err) klog.ErrorS(err, "Read failed")
} }
break break
} }
err = svrConn.SetDeadline(time.Now().Add(timeout)) err = svrConn.SetDeadline(time.Now().Add(timeout))
if err != nil { if err != nil {
klog.Errorf("SetDeadline failed: %v", err) klog.ErrorS(err, "SetDeadline failed")
break break
} }
_, err = udp.WriteTo(buffer[0:n], cliAddr) _, err = udp.WriteTo(buffer[0:n], cliAddr)
if err != nil { if err != nil {
if !logTimeout(err) { if !logTimeout(err) {
klog.Errorf("WriteTo failed: %v", err) klog.ErrorS(err, "WriteTo failed")
} }
break break
} }

View File

@ -82,7 +82,7 @@ func NewLoadBalancerRR() *LoadBalancerRR {
} }
func (lb *LoadBalancerRR) NewService(svcPort proxy.ServicePortName, affinityType v1.ServiceAffinity, ttlSeconds int) error { func (lb *LoadBalancerRR) NewService(svcPort proxy.ServicePortName, affinityType v1.ServiceAffinity, ttlSeconds int) error {
klog.V(4).Infof("LoadBalancerRR NewService %q", svcPort) klog.V(4).InfoS("LoadBalancerRR NewService", "servicePortName", svcPort)
lb.lock.Lock() lb.lock.Lock()
defer lb.lock.Unlock() defer lb.lock.Unlock()
lb.newServiceInternal(svcPort, affinityType, ttlSeconds) lb.newServiceInternal(svcPort, affinityType, ttlSeconds)
@ -97,7 +97,7 @@ func (lb *LoadBalancerRR) newServiceInternal(svcPort proxy.ServicePortName, affi
if _, exists := lb.services[svcPort]; !exists { if _, exists := lb.services[svcPort]; !exists {
lb.services[svcPort] = &balancerState{affinity: *newAffinityPolicy(affinityType, ttlSeconds)} lb.services[svcPort] = &balancerState{affinity: *newAffinityPolicy(affinityType, ttlSeconds)}
klog.V(4).Infof("LoadBalancerRR service %q did not exist, created", svcPort) klog.V(4).InfoS("LoadBalancerRR service did not exist, created", "servicePortName", svcPort)
} else if affinityType != "" { } else if affinityType != "" {
lb.services[svcPort].affinity.affinityType = affinityType lb.services[svcPort].affinity.affinityType = affinityType
} }
@ -105,7 +105,7 @@ func (lb *LoadBalancerRR) newServiceInternal(svcPort proxy.ServicePortName, affi
} }
func (lb *LoadBalancerRR) DeleteService(svcPort proxy.ServicePortName) { func (lb *LoadBalancerRR) DeleteService(svcPort proxy.ServicePortName) {
klog.V(4).Infof("LoadBalancerRR DeleteService %q", svcPort) klog.V(4).InfoS("LoadBalancerRR DeleteService", "servicePortName", svcPort)
lb.lock.Lock() lb.lock.Lock()
defer lb.lock.Unlock() defer lb.lock.Unlock()
delete(lb.services, svcPort) delete(lb.services, svcPort)
@ -135,8 +135,7 @@ func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr ne
if len(state.endpoints) == 0 { if len(state.endpoints) == 0 {
return "", ErrMissingEndpoints return "", ErrMissingEndpoints
} }
klog.V(4).Infof("NextEndpoint for service %q, srcAddr=%v: endpoints: %+v", svcPort, srcAddr, state.endpoints) klog.V(4).InfoS("NextEndpoint for service", "servicePortName", svcPort, "address", srcAddr, "endpoints", state.endpoints)
sessionAffinityEnabled := isSessionAffinity(&state.affinity) sessionAffinityEnabled := isSessionAffinity(&state.affinity)
var ipaddr string var ipaddr string
@ -153,7 +152,7 @@ func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr ne
// Affinity wins. // Affinity wins.
endpoint := sessionAffinity.endpoint endpoint := sessionAffinity.endpoint
sessionAffinity.lastUsed = time.Now() sessionAffinity.lastUsed = time.Now()
klog.V(4).Infof("NextEndpoint for service %q from IP %s with sessionAffinity %#v: %s", svcPort, ipaddr, sessionAffinity, endpoint) klog.V(4).InfoS("NextEndpoint for service from IP with sessionAffinity", "servicePortName", svcPort, "IP", ipaddr, "sessionAffinity", sessionAffinity, "endpoint", endpoint)
return endpoint, nil return endpoint, nil
} }
} }
@ -172,7 +171,7 @@ func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr ne
affinity.lastUsed = time.Now() affinity.lastUsed = time.Now()
affinity.endpoint = endpoint affinity.endpoint = endpoint
affinity.clientIP = ipaddr affinity.clientIP = ipaddr
klog.V(4).Infof("Updated affinity key %s: %#v", ipaddr, state.affinity.affinityMap[ipaddr]) klog.V(4).InfoS("Updated affinity key", "IP", ipaddr, "affinityState", state.affinity.affinityMap[ipaddr])
} }
return endpoint, nil return endpoint, nil
@ -182,7 +181,7 @@ func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr ne
func removeSessionAffinityByEndpoint(state *balancerState, svcPort proxy.ServicePortName, endpoint string) { func removeSessionAffinityByEndpoint(state *balancerState, svcPort proxy.ServicePortName, endpoint string) {
for _, affinity := range state.affinity.affinityMap { for _, affinity := range state.affinity.affinityMap {
if affinity.endpoint == endpoint { if affinity.endpoint == endpoint {
klog.V(4).Infof("Removing client: %s from affinityMap for service %q", affinity.endpoint, svcPort) klog.V(4).InfoS("Removing client from affinityMap for service", "endpoint", affinity.endpoint, "servicePortName", svcPort)
delete(state.affinity.affinityMap, affinity.clientIP) delete(state.affinity.affinityMap, affinity.clientIP)
} }
} }
@ -205,7 +204,7 @@ func (lb *LoadBalancerRR) updateAffinityMap(svcPort proxy.ServicePortName, newEn
} }
for mKey, mVal := range allEndpoints { for mKey, mVal := range allEndpoints {
if mVal == 1 { if mVal == 1 {
klog.V(2).Infof("Delete endpoint %s for service %q", mKey, svcPort) klog.V(2).InfoS("Delete endpoint for service", "endpoint", mKey, "servicePortName", svcPort)
removeSessionAffinityByEndpoint(state, svcPort, mKey) removeSessionAffinityByEndpoint(state, svcPort, mKey)
} }
} }
@ -223,7 +222,7 @@ func (lb *LoadBalancerRR) OnEndpointsAdd(endpoints *v1.Endpoints) {
state, exists := lb.services[svcPort] state, exists := lb.services[svcPort]
if !exists || state == nil || len(newEndpoints) > 0 { if !exists || state == nil || len(newEndpoints) > 0 {
klog.V(1).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcPort, newEndpoints) klog.V(1).InfoS("LoadBalancerRR: Setting endpoints service", "servicePortName", svcPort, "endpoints", newEndpoints)
lb.updateAffinityMap(svcPort, newEndpoints) lb.updateAffinityMap(svcPort, newEndpoints)
// OnEndpointsAdd can be called without NewService being called externally. // OnEndpointsAdd can be called without NewService being called externally.
// To be safe we will call it here. A new service will only be created // To be safe we will call it here. A new service will only be created
@ -257,7 +256,7 @@ func (lb *LoadBalancerRR) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoint
} }
if !exists || state == nil || len(curEndpoints) != len(newEndpoints) || !slicesEquiv(stringslices.Clone(curEndpoints), newEndpoints) { if !exists || state == nil || len(curEndpoints) != len(newEndpoints) || !slicesEquiv(stringslices.Clone(curEndpoints), newEndpoints) {
klog.V(1).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcPort, newEndpoints) klog.V(1).InfoS("LoadBalancerRR: Setting endpoints for service", "servicePortName", svcPort, "endpoints", newEndpoints)
lb.updateAffinityMap(svcPort, newEndpoints) lb.updateAffinityMap(svcPort, newEndpoints)
// OnEndpointsUpdate can be called without NewService being called externally. // OnEndpointsUpdate can be called without NewService being called externally.
// To be safe we will call it here. A new service will only be created // To be safe we will call it here. A new service will only be created
@ -275,7 +274,7 @@ func (lb *LoadBalancerRR) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoint
for portname := range oldPortsToEndpoints { for portname := range oldPortsToEndpoints {
svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname} svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname}
if _, exists := registeredEndpoints[svcPort]; !exists { if _, exists := registeredEndpoints[svcPort]; !exists {
klog.V(2).Infof("LoadBalancerRR: Removing endpoints for %s", svcPort) klog.V(2).InfoS("LoadBalancerRR: Removing endpoints service", "servicePortName", svcPort)
// Reset but don't delete. // Reset but don't delete.
state := lb.services[svcPort] state := lb.services[svcPort]
state.endpoints = []string{} state.endpoints = []string{}
@ -293,7 +292,7 @@ func (lb *LoadBalancerRR) OnEndpointsDelete(endpoints *v1.Endpoints) {
for portname := range portsToEndpoints { for portname := range portsToEndpoints {
svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname} svcPort := proxy.ServicePortName{NamespacedName: types.NamespacedName{Namespace: endpoints.Namespace, Name: endpoints.Name}, Port: portname}
klog.V(2).Infof("LoadBalancerRR: Removing endpoints for %s", svcPort) klog.V(2).InfoS("LoadBalancerRR: Removing endpoints service", "servicePortName", svcPort)
// If the service is still around, reset but don't delete. // If the service is still around, reset but don't delete.
if state, ok := lb.services[svcPort]; ok { if state, ok := lb.services[svcPort]; ok {
state.endpoints = []string{} state.endpoints = []string{}
@ -326,7 +325,7 @@ func (lb *LoadBalancerRR) CleanupStaleStickySessions(svcPort proxy.ServicePortNa
} }
for ip, affinity := range state.affinity.affinityMap { for ip, affinity := range state.affinity.affinityMap {
if int(time.Since(affinity.lastUsed).Seconds()) >= state.affinity.ttlSeconds { if int(time.Since(affinity.lastUsed).Seconds()) >= state.affinity.ttlSeconds {
klog.V(4).Infof("Removing client %s from affinityMap for service %q", affinity.clientIP, svcPort) klog.V(4).InfoS("Removing client from affinityMap for service", "IP", affinity.clientIP, "servicePortName", svcPort)
delete(state.affinity.affinityMap, ip) delete(state.affinity.affinityMap, ip)
} }
} }