From dd5f67d23c0396048b11998b585226f11e267829 Mon Sep 17 00:00:00 2001 From: jay vyas Date: Tue, 15 Jun 2021 18:46:18 -0400 Subject: [PATCH] Kube proxy for windows userspace, remove dns Mangling --- pkg/proxy/winuserspace/proxier.go | 2 - pkg/proxy/winuserspace/proxysocket.go | 351 ++------------------------ 2 files changed, 16 insertions(+), 337 deletions(-) diff --git a/pkg/proxy/winuserspace/proxier.go b/pkg/proxy/winuserspace/proxier.go index 5991d12aa8e..3a4dff95c1f 100644 --- a/pkg/proxy/winuserspace/proxier.go +++ b/pkg/proxy/winuserspace/proxier.go @@ -52,7 +52,6 @@ type serviceInfo struct { socket proxySocket timeout time.Duration activeClients *clientCache - dnsClients *dnsClientCache sessionAffinityType v1.ServiceAffinity } @@ -237,7 +236,6 @@ func (proxier *Proxier) addServicePortPortal(servicePortPortalName ServicePortPo socket: sock, timeout: timeout, activeClients: newClientCache(), - dnsClients: newDNSClientCache(), sessionAffinityType: v1.ServiceAffinityNone, // default } proxier.setServiceInfo(servicePortPortalName, si) diff --git a/pkg/proxy/winuserspace/proxysocket.go b/pkg/proxy/winuserspace/proxysocket.go index 12621cf5ec2..4ac2110d708 100644 --- a/pkg/proxy/winuserspace/proxysocket.go +++ b/pkg/proxy/winuserspace/proxysocket.go @@ -23,45 +23,13 @@ import ( "strconv" "strings" "sync" - "sync/atomic" "time" - "github.com/miekg/dns" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/proxy" - "k8s.io/kubernetes/pkg/util/ipconfig" - "k8s.io/utils/exec" -) - -const ( - // Kubernetes DNS suffix search list - // TODO: Get DNS suffix search list from docker containers. - // --dns-search option doesn't work on Windows containers and has been - // fixed recently in docker. - - // Kubernetes cluster domain - clusterDomain = "cluster.local" - - // Kubernetes service domain - serviceDomain = "svc." + clusterDomain - - // Kubernetes default namespace domain - namespaceServiceDomain = "default." + serviceDomain - - // Kubernetes DNS service port name - dnsPortName = "dns" - - // DNS TYPE value A (a host address) - dnsTypeA uint16 = 0x01 - - // DNS TYPE value AAAA (a host IPv6 address) - dnsTypeAAAA uint16 = 0x1c - - // DNS CLASS value IN (the Internet) - dnsClassInternet uint16 = 0x01 ) // Abstraction over TCP/UDP sockets which are proxied. @@ -150,7 +118,7 @@ func tryConnect(service ServicePortPortalName, srcAddr net.Addr, protocol string } return outConn, nil } - return nil, fmt.Errorf("failed to connect to an endpoint.") + return nil, fmt.Errorf("failed to connect to an endpoint") } func (tcp *tcpProxySocket) ProxyLoop(service ServicePortPortalName, myInfo *serviceInfo, proxier *Proxier) { @@ -239,277 +207,8 @@ func newClientCache() *clientCache { return &clientCache{clients: map[string]net.Conn{}} } -// DNS query client classified by address and QTYPE -type dnsClientQuery struct { - clientAddress string - dnsQType uint16 -} - -// Holds DNS client query, the value contains the index in DNS suffix search list, -// the original DNS message and length for the same client and QTYPE -type dnsClientCache struct { - mu sync.Mutex - clients map[dnsClientQuery]*dnsQueryState -} - -type dnsQueryState struct { - searchIndex int32 - msg *dns.Msg -} - -func newDNSClientCache() *dnsClientCache { - return &dnsClientCache{clients: map[dnsClientQuery]*dnsQueryState{}} -} - -func packetRequiresDNSSuffix(dnsType, dnsClass uint16) bool { - return (dnsType == dnsTypeA || dnsType == dnsTypeAAAA) && dnsClass == dnsClassInternet -} - -func isDNSService(portName string) bool { - return portName == dnsPortName -} - -func appendDNSSuffix(msg *dns.Msg, buffer []byte, length int, dnsSuffix string) (int, error) { - if msg == nil || len(msg.Question) == 0 { - return length, fmt.Errorf("DNS message parameter is invalid") - } - - // Save the original name since it will be reused for next iteration - origName := msg.Question[0].Name - if dnsSuffix != "" { - msg.Question[0].Name += dnsSuffix + "." - } - mbuf, err := msg.PackBuffer(buffer) - msg.Question[0].Name = origName - - if err != nil { - klog.Warningf("Unable to pack DNS packet. Error is: %v", err) - return length, err - } - - if &buffer[0] != &mbuf[0] { - return length, fmt.Errorf("Buffer is too small in packing DNS packet") - } - - return len(mbuf), nil -} - -func recoverDNSQuestion(origName string, msg *dns.Msg, buffer []byte, length int) (int, error) { - if msg == nil || len(msg.Question) == 0 { - return length, fmt.Errorf("DNS message parameter is invalid") - } - - if origName == msg.Question[0].Name { - return length, nil - } - - msg.Question[0].Name = origName - if len(msg.Answer) > 0 { - msg.Answer[0].Header().Name = origName - } - mbuf, err := msg.PackBuffer(buffer) - - if err != nil { - klog.Warningf("Unable to pack DNS packet. Error is: %v", err) - return length, err - } - - if &buffer[0] != &mbuf[0] { - return length, fmt.Errorf("Buffer is too small in packing DNS packet") - } - - return len(mbuf), nil -} - -func processUnpackedDNSQueryPacket( - dnsClients *dnsClientCache, - msg *dns.Msg, - host string, - dnsQType uint16, - buffer []byte, - length int, - dnsSearch []string) int { - if dnsSearch == nil || len(dnsSearch) == 0 { - klog.V(1).Infof("DNS search list is not initialized and is empty.") - return length - } - - // TODO: handle concurrent queries from a client - dnsClients.mu.Lock() - state, found := dnsClients.clients[dnsClientQuery{host, dnsQType}] - if !found { - state = &dnsQueryState{0, msg} - dnsClients.clients[dnsClientQuery{host, dnsQType}] = state - } - dnsClients.mu.Unlock() - - index := atomic.SwapInt32(&state.searchIndex, state.searchIndex+1) - // Also update message ID if the client retries due to previous query time out - state.msg.MsgHdr.Id = msg.MsgHdr.Id - - if index < 0 || index >= int32(len(dnsSearch)) { - klog.V(1).Infof("Search index %d is out of range.", index) - return length - } - - length, err := appendDNSSuffix(msg, buffer, length, dnsSearch[index]) - if err != nil { - klog.Errorf("Append DNS suffix failed: %v", err) - } - - return length -} - -func processUnpackedDNSResponsePacket( - svrConn net.Conn, - dnsClients *dnsClientCache, - msg *dns.Msg, - rcode int, - host string, - dnsQType uint16, - buffer []byte, - length int, - dnsSearch []string) (bool, int) { - var drop bool - var err error - if dnsSearch == nil || len(dnsSearch) == 0 { - klog.V(1).Infof("DNS search list is not initialized and is empty.") - return drop, length - } - - dnsClients.mu.Lock() - state, found := dnsClients.clients[dnsClientQuery{host, dnsQType}] - dnsClients.mu.Unlock() - - if found { - index := atomic.SwapInt32(&state.searchIndex, state.searchIndex+1) - if rcode != 0 && index >= 0 && index < int32(len(dnsSearch)) { - // If the response has failure and iteration through the search list has not - // reached the end, retry on behalf of the client using the original query message - drop = true - length, err = appendDNSSuffix(state.msg, buffer, length, dnsSearch[index]) - if err != nil { - klog.Errorf("Append DNS suffix failed: %v", err) - } - - _, err = svrConn.Write(buffer[0:length]) - if err != nil { - if !logTimeout(err) { - klog.Errorf("Write failed: %v", err) - } - } - } else { - length, err = recoverDNSQuestion(state.msg.Question[0].Name, msg, buffer, length) - if err != nil { - klog.Errorf("Recover DNS question failed: %v", err) - } - - dnsClients.mu.Lock() - delete(dnsClients.clients, dnsClientQuery{host, dnsQType}) - dnsClients.mu.Unlock() - } - } - - return drop, length -} - -func processDNSQueryPacket( - dnsClients *dnsClientCache, - cliAddr net.Addr, - buffer []byte, - length int, - dnsSearch []string) (int, error) { - msg := &dns.Msg{} - if err := msg.Unpack(buffer[:length]); err != nil { - klog.Warningf("Unable to unpack DNS packet. Error is: %v", err) - return length, err - } - - // Query - Response bit that specifies whether this message is a query (0) or a response (1). - if msg.MsgHdr.Response { - return length, fmt.Errorf("DNS packet should be a query message") - } - - // QDCOUNT - if len(msg.Question) != 1 { - klog.V(1).Infof("Number of entries in the question section of the DNS packet is: %d", len(msg.Question)) - klog.V(1).Infof("DNS suffix appending does not support more than one question.") - return length, nil - } - - // ANCOUNT, NSCOUNT, ARCOUNT - if len(msg.Answer) != 0 || len(msg.Ns) != 0 || len(msg.Extra) != 0 { - klog.V(1).Infof("DNS packet contains more than question section.") - return length, nil - } - - dnsQType := msg.Question[0].Qtype - dnsQClass := msg.Question[0].Qclass - if packetRequiresDNSSuffix(dnsQType, dnsQClass) { - host, _, err := net.SplitHostPort(cliAddr.String()) - if err != nil { - klog.V(1).Infof("Failed to get host from client address: %v", err) - host = cliAddr.String() - } - - length = processUnpackedDNSQueryPacket(dnsClients, msg, host, dnsQType, buffer, length, dnsSearch) - } - - return length, nil -} - -func processDNSResponsePacket( - svrConn net.Conn, - dnsClients *dnsClientCache, - cliAddr net.Addr, - buffer []byte, - length int, - dnsSearch []string) (bool, int, error) { - var drop bool - msg := &dns.Msg{} - if err := msg.Unpack(buffer[:length]); err != nil { - klog.Warningf("Unable to unpack DNS packet. Error is: %v", err) - return drop, length, err - } - - // Query - Response bit that specifies whether this message is a query (0) or a response (1). - if !msg.MsgHdr.Response { - return drop, length, fmt.Errorf("DNS packet should be a response message") - } - - // QDCOUNT - if len(msg.Question) != 1 { - klog.V(1).Infof("Number of entries in the response section of the DNS packet is: %d", len(msg.Answer)) - return drop, length, nil - } - - dnsQType := msg.Question[0].Qtype - dnsQClass := msg.Question[0].Qclass - if packetRequiresDNSSuffix(dnsQType, dnsQClass) { - host, _, err := net.SplitHostPort(cliAddr.String()) - if err != nil { - klog.V(1).Infof("Failed to get host from client address: %v", err) - host = cliAddr.String() - } - - drop, length = processUnpackedDNSResponsePacket(svrConn, dnsClients, msg, msg.MsgHdr.Rcode, host, dnsQType, buffer, length, dnsSearch) - } - - return drop, length, nil -} - func (udp *udpProxySocket) ProxyLoop(service ServicePortPortalName, myInfo *serviceInfo, proxier *Proxier) { var buffer [4096]byte // 4KiB should be enough for most whole-packets - var dnsSearch []string - if isDNSService(service.Port) { - dnsSearch = []string{"", namespaceServiceDomain, serviceDomain, clusterDomain} - execer := exec.New() - ipconfigInterface := ipconfig.New(execer) - suffixList, err := ipconfigInterface.GetDNSSuffixSearchList() - if err == nil { - dnsSearch = append(dnsSearch, suffixList...) - } - } for { if !myInfo.isAlive() { @@ -531,16 +230,8 @@ func (udp *udpProxySocket) ProxyLoop(service ServicePortPortalName, myInfo *serv break } - // If this is DNS query packet - if isDNSService(service.Port) { - n, err = processDNSQueryPacket(myInfo.dnsClients, cliAddr, buffer[:], n, dnsSearch) - if err != nil { - klog.Errorf("Process DNS query packet failed: %v", err) - } - } - // If this is a client we know already, reuse the connection and goroutine. - svrConn, err := udp.getBackendConn(myInfo.activeClients, myInfo.dnsClients, cliAddr, proxier, service, myInfo.timeout, dnsSearch) + svrConn, err := udp.getBackendConn(myInfo.activeClients, cliAddr, proxier, service, myInfo.timeout) if err != nil { continue } @@ -562,7 +253,7 @@ func (udp *udpProxySocket) ProxyLoop(service ServicePortPortalName, myInfo *serv } } -func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, dnsClients *dnsClientCache, cliAddr net.Addr, proxier *Proxier, service ServicePortPortalName, timeout time.Duration, dnsSearch []string) (net.Conn, error) { +func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr net.Addr, proxier *Proxier, service ServicePortPortalName, timeout time.Duration) (net.Conn, error) { activeClients.mu.Lock() defer activeClients.mu.Unlock() @@ -581,17 +272,17 @@ func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, dnsClients return nil, err } activeClients.clients[cliAddr.String()] = svrConn - go func(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, dnsClients *dnsClientCache, service ServicePortPortalName, timeout time.Duration, dnsSearch []string) { + go func(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, service ServicePortPortalName, timeout time.Duration) { defer runtime.HandleCrash() - udp.proxyClient(cliAddr, svrConn, activeClients, dnsClients, service, timeout, dnsSearch) - }(cliAddr, svrConn, activeClients, dnsClients, service, timeout, dnsSearch) + udp.proxyClient(cliAddr, svrConn, activeClients, service, timeout) + }(cliAddr, svrConn, activeClients, service, timeout) } return svrConn, nil } // This function is expected to be called as a goroutine. // TODO: Track and log bytes copied, like TCP -func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, dnsClients *dnsClientCache, service ServicePortPortalName, timeout time.Duration, dnsSearch []string) { +func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, service ServicePortPortalName, timeout time.Duration) { defer svrConn.Close() var buffer [4096]byte for { @@ -603,27 +294,17 @@ func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activ break } - drop := false - if isDNSService(service.Port) { - drop, n, err = processDNSResponsePacket(svrConn, dnsClients, cliAddr, buffer[:], n, dnsSearch) - if err != nil { - klog.Errorf("Process DNS response packet failed: %v", err) - } + err = svrConn.SetDeadline(time.Now().Add(timeout)) + if err != nil { + klog.Errorf("SetDeadline failed: %v", err) + break } - - if !drop { - err = svrConn.SetDeadline(time.Now().Add(timeout)) - if err != nil { - klog.Errorf("SetDeadline failed: %v", err) - break - } - _, err = udp.WriteTo(buffer[0:n], cliAddr) - if err != nil { - if !logTimeout(err) { - klog.Errorf("WriteTo failed: %v", err) - } - break + _, err = udp.WriteTo(buffer[0:n], cliAddr) + if err != nil { + if !logTimeout(err) { + klog.Errorf("WriteTo failed: %v", err) } + break } } activeClients.mu.Lock()