mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 10:51:29 +00:00
Migrated pkg/proxy/userspace to structured logging (#104931)
* migrated roundrobin.go * migrated proxysocket.go * fixed typo * code formatting
This commit is contained in:
parent
9205473702
commit
4916b6cd74
@ -95,10 +95,10 @@ func TryConnectEndpoints(service proxy.ServicePortName, srcAddr net.Addr, protoc
|
|||||||
for _, dialTimeout := range EndpointDialTimeouts {
|
for _, dialTimeout := range EndpointDialTimeouts {
|
||||||
endpoint, err := loadBalancer.NextEndpoint(service, srcAddr, sessionAffinityReset)
|
endpoint, err := loadBalancer.NextEndpoint(service, srcAddr, sessionAffinityReset)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Couldn't find an endpoint for %s: %v", service, err)
|
klog.ErrorS(err, "Couldn't find an endpoint for service", "service", service)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
klog.V(3).Infof("Mapped service %q to endpoint %s", service, endpoint)
|
klog.V(3).InfoS("Mapped service to endpoint", "service", service, "endpoint", endpoint)
|
||||||
// TODO: This could spin up a new goroutine to make the outbound connection,
|
// TODO: This could spin up a new goroutine to make the outbound connection,
|
||||||
// and keep accepting inbound traffic.
|
// and keep accepting inbound traffic.
|
||||||
outConn, err := net.DialTimeout(protocol, endpoint, dialTimeout)
|
outConn, err := net.DialTimeout(protocol, endpoint, dialTimeout)
|
||||||
@ -106,7 +106,7 @@ func TryConnectEndpoints(service proxy.ServicePortName, srcAddr net.Addr, protoc
|
|||||||
if isTooManyFDsError(err) {
|
if isTooManyFDsError(err) {
|
||||||
panic("Dial failed: " + err.Error())
|
panic("Dial failed: " + err.Error())
|
||||||
}
|
}
|
||||||
klog.Errorf("Dial failed: %v", err)
|
klog.ErrorS(err, "Dial failed")
|
||||||
sessionAffinityReset = true
|
sessionAffinityReset = true
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -135,13 +135,13 @@ func (tcp *tcpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *Serv
|
|||||||
// Then the service port was just closed so the accept failure is to be expected.
|
// Then the service port was just closed so the accept failure is to be expected.
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
klog.Errorf("Accept failed: %v", err)
|
klog.ErrorS(err, "Accept failed")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
klog.V(3).Infof("Accepted TCP connection from %v to %v", inConn.RemoteAddr(), inConn.LocalAddr())
|
klog.V(3).InfoS("Accepted TCP connection from remote", "remoteAddress", inConn.RemoteAddr(), "localAddress", inConn.LocalAddr())
|
||||||
outConn, err := TryConnectEndpoints(service, inConn.(*net.TCPConn).RemoteAddr(), "tcp", loadBalancer)
|
outConn, err := TryConnectEndpoints(service, inConn.(*net.TCPConn).RemoteAddr(), "tcp", loadBalancer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Failed to connect to balancer: %v", err)
|
klog.ErrorS(err, "Failed to connect to balancer")
|
||||||
inConn.Close()
|
inConn.Close()
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@ -154,8 +154,8 @@ func (tcp *tcpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *Serv
|
|||||||
func ProxyTCP(in, out *net.TCPConn) {
|
func ProxyTCP(in, out *net.TCPConn) {
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
wg.Add(2)
|
wg.Add(2)
|
||||||
klog.V(4).Infof("Creating proxy between %v <-> %v <-> %v <-> %v",
|
klog.V(4).InfoS("Creating proxy between remote and local addresses",
|
||||||
in.RemoteAddr(), in.LocalAddr(), out.LocalAddr(), out.RemoteAddr())
|
"inRemoteAddress", in.RemoteAddr(), "inLocalAddress", in.LocalAddr(), "outLocalAddress", out.LocalAddr(), "outRemoteAddress", out.RemoteAddr())
|
||||||
go copyBytes("from backend", in, out, &wg)
|
go copyBytes("from backend", in, out, &wg)
|
||||||
go copyBytes("to backend", out, in, &wg)
|
go copyBytes("to backend", out, in, &wg)
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
@ -163,14 +163,14 @@ func ProxyTCP(in, out *net.TCPConn) {
|
|||||||
|
|
||||||
func copyBytes(direction string, dest, src *net.TCPConn, wg *sync.WaitGroup) {
|
func copyBytes(direction string, dest, src *net.TCPConn, wg *sync.WaitGroup) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
klog.V(4).Infof("Copying %s: %s -> %s", direction, src.RemoteAddr(), dest.RemoteAddr())
|
klog.V(4).InfoS("Copying remote address bytes", "direction", direction, "sourceRemoteAddress", src.RemoteAddr(), "destinationRemoteAddress", dest.RemoteAddr())
|
||||||
n, err := io.Copy(dest, src)
|
n, err := io.Copy(dest, src)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !isClosedError(err) {
|
if !isClosedError(err) {
|
||||||
klog.Errorf("I/O error: %v", err)
|
klog.ErrorS(err, "I/O error occurred")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
klog.V(4).Infof("Copied %d bytes %s: %s -> %s", n, direction, src.RemoteAddr(), dest.RemoteAddr())
|
klog.V(4).InfoS("Copied remote address bytes", "bytes", n, "direction", direction, "sourceRemoteAddress", src.RemoteAddr(), "destinationRemoteAddress", dest.RemoteAddr())
|
||||||
dest.Close()
|
dest.Close()
|
||||||
src.Close()
|
src.Close()
|
||||||
}
|
}
|
||||||
@ -215,11 +215,11 @@ func (udp *udpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *Serv
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
if e, ok := err.(net.Error); ok {
|
if e, ok := err.(net.Error); ok {
|
||||||
if e.Temporary() {
|
if e.Temporary() {
|
||||||
klog.V(1).Infof("ReadFrom had a temporary failure: %v", err)
|
klog.V(1).ErrorS(err, "ReadFrom had a temporary failure")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
klog.Errorf("ReadFrom failed, exiting ProxyLoop: %v", err)
|
klog.ErrorS(err, "ReadFrom failed, exiting ProxyLoop")
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
// If this is a client we know already, reuse the connection and goroutine.
|
// If this is a client we know already, reuse the connection and goroutine.
|
||||||
@ -232,14 +232,14 @@ func (udp *udpProxySocket) ProxyLoop(service proxy.ServicePortName, myInfo *Serv
|
|||||||
_, err = svrConn.Write(buffer[0:n])
|
_, err = svrConn.Write(buffer[0:n])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !logTimeout(err) {
|
if !logTimeout(err) {
|
||||||
klog.Errorf("Write failed: %v", err)
|
klog.ErrorS(err, "Write failed")
|
||||||
// TODO: Maybe tear down the goroutine for this client/server pair?
|
// TODO: Maybe tear down the goroutine for this client/server pair?
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
err = svrConn.SetDeadline(time.Now().Add(myInfo.Timeout))
|
err = svrConn.SetDeadline(time.Now().Add(myInfo.Timeout))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("SetDeadline failed: %v", err)
|
klog.ErrorS(err, "SetDeadline failed")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -253,14 +253,14 @@ func (udp *udpProxySocket) getBackendConn(activeClients *ClientCache, cliAddr ne
|
|||||||
if !found {
|
if !found {
|
||||||
// TODO: This could spin up a new goroutine to make the outbound connection,
|
// TODO: This could spin up a new goroutine to make the outbound connection,
|
||||||
// and keep accepting inbound traffic.
|
// and keep accepting inbound traffic.
|
||||||
klog.V(3).Infof("New UDP connection from %s", cliAddr)
|
klog.V(3).InfoS("New UDP connection from client", "address", cliAddr)
|
||||||
var err error
|
var err error
|
||||||
svrConn, err = TryConnectEndpoints(service, cliAddr, "udp", loadBalancer)
|
svrConn, err = TryConnectEndpoints(service, cliAddr, "udp", loadBalancer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if err = svrConn.SetDeadline(time.Now().Add(timeout)); err != nil {
|
if err = svrConn.SetDeadline(time.Now().Add(timeout)); err != nil {
|
||||||
klog.Errorf("SetDeadline failed: %v", err)
|
klog.ErrorS(err, "SetDeadline failed")
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
activeClients.Clients[cliAddr.String()] = svrConn
|
activeClients.Clients[cliAddr.String()] = svrConn
|
||||||
@ -281,19 +281,19 @@ func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activ
|
|||||||
n, err := svrConn.Read(buffer[0:])
|
n, err := svrConn.Read(buffer[0:])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !logTimeout(err) {
|
if !logTimeout(err) {
|
||||||
klog.Errorf("Read failed: %v", err)
|
klog.ErrorS(err, "Read failed")
|
||||||
}
|
}
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
err = svrConn.SetDeadline(time.Now().Add(timeout))
|
err = svrConn.SetDeadline(time.Now().Add(timeout))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("SetDeadline failed: %v", err)
|
klog.ErrorS(err, "SetDeadline failed")
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
_, err = udp.WriteTo(buffer[0:n], cliAddr)
|
_, err = udp.WriteTo(buffer[0:n], cliAddr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !logTimeout(err) {
|
if !logTimeout(err) {
|
||||||
klog.Errorf("WriteTo failed: %v", err)
|
klog.ErrorS(err, "WriteTo failed")
|
||||||
}
|
}
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
@ -83,7 +83,7 @@ func NewLoadBalancerRR() *LoadBalancerRR {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (lb *LoadBalancerRR) NewService(svcPort proxy.ServicePortName, affinityType v1.ServiceAffinity, ttlSeconds int) error {
|
func (lb *LoadBalancerRR) NewService(svcPort proxy.ServicePortName, affinityType v1.ServiceAffinity, ttlSeconds int) error {
|
||||||
klog.V(4).Infof("LoadBalancerRR NewService %q", svcPort)
|
klog.V(4).InfoS("LoadBalancerRR NewService", "servicePortName", svcPort)
|
||||||
lb.lock.Lock()
|
lb.lock.Lock()
|
||||||
defer lb.lock.Unlock()
|
defer lb.lock.Unlock()
|
||||||
lb.newServiceInternal(svcPort, affinityType, ttlSeconds)
|
lb.newServiceInternal(svcPort, affinityType, ttlSeconds)
|
||||||
@ -98,7 +98,7 @@ func (lb *LoadBalancerRR) newServiceInternal(svcPort proxy.ServicePortName, affi
|
|||||||
|
|
||||||
if _, exists := lb.services[svcPort]; !exists {
|
if _, exists := lb.services[svcPort]; !exists {
|
||||||
lb.services[svcPort] = &balancerState{affinity: *newAffinityPolicy(affinityType, ttlSeconds)}
|
lb.services[svcPort] = &balancerState{affinity: *newAffinityPolicy(affinityType, ttlSeconds)}
|
||||||
klog.V(4).Infof("LoadBalancerRR service %q did not exist, created", svcPort)
|
klog.V(4).InfoS("LoadBalancerRR service does not exist, created", "servicePortName", svcPort)
|
||||||
} else if affinityType != "" {
|
} else if affinityType != "" {
|
||||||
lb.services[svcPort].affinity.affinityType = affinityType
|
lb.services[svcPort].affinity.affinityType = affinityType
|
||||||
}
|
}
|
||||||
@ -106,7 +106,7 @@ func (lb *LoadBalancerRR) newServiceInternal(svcPort proxy.ServicePortName, affi
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (lb *LoadBalancerRR) DeleteService(svcPort proxy.ServicePortName) {
|
func (lb *LoadBalancerRR) DeleteService(svcPort proxy.ServicePortName) {
|
||||||
klog.V(4).Infof("LoadBalancerRR DeleteService %q", svcPort)
|
klog.V(4).InfoS("LoadBalancerRR DeleteService", "servicePortName", svcPort)
|
||||||
lb.lock.Lock()
|
lb.lock.Lock()
|
||||||
defer lb.lock.Unlock()
|
defer lb.lock.Unlock()
|
||||||
delete(lb.services, svcPort)
|
delete(lb.services, svcPort)
|
||||||
@ -146,7 +146,7 @@ func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr ne
|
|||||||
if len(state.endpoints) == 0 {
|
if len(state.endpoints) == 0 {
|
||||||
return "", ErrMissingEndpoints
|
return "", ErrMissingEndpoints
|
||||||
}
|
}
|
||||||
klog.V(4).Infof("NextEndpoint for service %q, srcAddr=%v: endpoints: %+v", svcPort, srcAddr, state.endpoints)
|
klog.V(4).InfoS("NextEndpoint for service", "servicePortName", svcPort, "address", srcAddr, "endpoints", state.endpoints)
|
||||||
|
|
||||||
sessionAffinityEnabled := isSessionAffinity(&state.affinity)
|
sessionAffinityEnabled := isSessionAffinity(&state.affinity)
|
||||||
|
|
||||||
@ -164,7 +164,7 @@ func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr ne
|
|||||||
// Affinity wins.
|
// Affinity wins.
|
||||||
endpoint := sessionAffinity.endpoint
|
endpoint := sessionAffinity.endpoint
|
||||||
sessionAffinity.lastUsed = time.Now()
|
sessionAffinity.lastUsed = time.Now()
|
||||||
klog.V(4).Infof("NextEndpoint for service %q from IP %s with sessionAffinity %#v: %s", svcPort, ipaddr, sessionAffinity, endpoint)
|
klog.V(4).InfoS("NextEndpoint for service from IP with sessionAffinity", "servicePortName", svcPort, "IP", ipaddr, "sessionAffinity", sessionAffinity, "endpoint", endpoint)
|
||||||
return endpoint, nil
|
return endpoint, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -183,7 +183,7 @@ func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr ne
|
|||||||
affinity.lastUsed = time.Now()
|
affinity.lastUsed = time.Now()
|
||||||
affinity.endpoint = endpoint
|
affinity.endpoint = endpoint
|
||||||
affinity.clientIP = ipaddr
|
affinity.clientIP = ipaddr
|
||||||
klog.V(4).Infof("Updated affinity key %s: %#v", ipaddr, state.affinity.affinityMap[ipaddr])
|
klog.V(4).InfoS("Updated affinity key", "IP", ipaddr, "affinityState", state.affinity.affinityMap[ipaddr])
|
||||||
}
|
}
|
||||||
|
|
||||||
return endpoint, nil
|
return endpoint, nil
|
||||||
@ -193,7 +193,7 @@ func (lb *LoadBalancerRR) NextEndpoint(svcPort proxy.ServicePortName, srcAddr ne
|
|||||||
func removeSessionAffinityByEndpoint(state *balancerState, svcPort proxy.ServicePortName, endpoint string) {
|
func removeSessionAffinityByEndpoint(state *balancerState, svcPort proxy.ServicePortName, endpoint string) {
|
||||||
for _, affinity := range state.affinity.affinityMap {
|
for _, affinity := range state.affinity.affinityMap {
|
||||||
if affinity.endpoint == endpoint {
|
if affinity.endpoint == endpoint {
|
||||||
klog.V(4).Infof("Removing client: %s from affinityMap for service %q", affinity.endpoint, svcPort)
|
klog.V(4).InfoS("Removing client from affinityMap for service", "endpoint", affinity.endpoint, "servicePortName", svcPort)
|
||||||
delete(state.affinity.affinityMap, affinity.clientIP)
|
delete(state.affinity.affinityMap, affinity.clientIP)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -214,7 +214,7 @@ func (lb *LoadBalancerRR) removeStaleAffinity(svcPort proxy.ServicePortName, new
|
|||||||
}
|
}
|
||||||
for _, existingEndpoint := range state.endpoints {
|
for _, existingEndpoint := range state.endpoints {
|
||||||
if !newEndpointsSet.Has(existingEndpoint) {
|
if !newEndpointsSet.Has(existingEndpoint) {
|
||||||
klog.V(2).Infof("Delete endpoint %s for service %q", existingEndpoint, svcPort)
|
klog.V(2).InfoS("Delete endpoint for service", "endpoint", existingEndpoint, "servicePortName", svcPort)
|
||||||
removeSessionAffinityByEndpoint(state, svcPort, existingEndpoint)
|
removeSessionAffinityByEndpoint(state, svcPort, existingEndpoint)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -232,7 +232,7 @@ func (lb *LoadBalancerRR) OnEndpointsAdd(endpoints *v1.Endpoints) {
|
|||||||
state, exists := lb.services[svcPort]
|
state, exists := lb.services[svcPort]
|
||||||
|
|
||||||
if !exists || state == nil || len(newEndpoints) > 0 {
|
if !exists || state == nil || len(newEndpoints) > 0 {
|
||||||
klog.V(1).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcPort, newEndpoints)
|
klog.V(1).InfoS("LoadBalancerRR: Setting endpoints service", "servicePortName", svcPort, "endpoints", newEndpoints)
|
||||||
// OnEndpointsAdd can be called without NewService being called externally.
|
// OnEndpointsAdd can be called without NewService being called externally.
|
||||||
// To be safe we will call it here. A new service will only be created
|
// To be safe we will call it here. A new service will only be created
|
||||||
// if one does not already exist.
|
// if one does not already exist.
|
||||||
@ -264,7 +264,7 @@ func (lb *LoadBalancerRR) OnEndpointsUpdate(oldEndpoints, endpoints *v1.Endpoint
|
|||||||
}
|
}
|
||||||
|
|
||||||
if !exists || state == nil || len(curEndpoints) != len(newEndpoints) || !slicesEquiv(stringslices.Clone(curEndpoints), newEndpoints) {
|
if !exists || state == nil || len(curEndpoints) != len(newEndpoints) || !slicesEquiv(stringslices.Clone(curEndpoints), newEndpoints) {
|
||||||
klog.V(1).Infof("LoadBalancerRR: Setting endpoints for %s to %+v", svcPort, newEndpoints)
|
klog.V(1).InfoS("LoadBalancerRR: Setting endpoints for service", "servicePortName", svcPort, "endpoints", newEndpoints)
|
||||||
lb.removeStaleAffinity(svcPort, newEndpoints)
|
lb.removeStaleAffinity(svcPort, newEndpoints)
|
||||||
// OnEndpointsUpdate can be called without NewService being called externally.
|
// OnEndpointsUpdate can be called without NewService being called externally.
|
||||||
// To be safe we will call it here. A new service will only be created
|
// To be safe we will call it here. A new service will only be created
|
||||||
@ -292,7 +292,7 @@ func (lb *LoadBalancerRR) resetService(svcPort proxy.ServicePortName) {
|
|||||||
// If the service is still around, reset but don't delete.
|
// If the service is still around, reset but don't delete.
|
||||||
if state, ok := lb.services[svcPort]; ok {
|
if state, ok := lb.services[svcPort]; ok {
|
||||||
if len(state.endpoints) > 0 {
|
if len(state.endpoints) > 0 {
|
||||||
klog.V(2).Infof("LoadBalancerRR: Removing endpoints for %s", svcPort)
|
klog.V(2).InfoS("LoadBalancerRR: Removing endpoints service", "servicePortName", svcPort)
|
||||||
state.endpoints = []string{}
|
state.endpoints = []string{}
|
||||||
}
|
}
|
||||||
state.index = 0
|
state.index = 0
|
||||||
@ -335,7 +335,7 @@ func (lb *LoadBalancerRR) CleanupStaleStickySessions(svcPort proxy.ServicePortNa
|
|||||||
}
|
}
|
||||||
for ip, affinity := range state.affinity.affinityMap {
|
for ip, affinity := range state.affinity.affinityMap {
|
||||||
if int(time.Since(affinity.lastUsed).Seconds()) >= state.affinity.ttlSeconds {
|
if int(time.Since(affinity.lastUsed).Seconds()) >= state.affinity.ttlSeconds {
|
||||||
klog.V(4).Infof("Removing client %s from affinityMap for service %q", affinity.clientIP, svcPort)
|
klog.V(4).InfoS("Removing client from affinityMap for service", "IP", affinity.clientIP, "servicePortName", svcPort)
|
||||||
delete(state.affinity.affinityMap, ip)
|
delete(state.affinity.affinityMap, ip)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user