mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 20:53:33 +00:00
Kube proxy for windows userspace, remove dns Mangling
This commit is contained in:
parent
3a47ddccdf
commit
dd5f67d23c
@ -52,7 +52,6 @@ type serviceInfo struct {
|
||||
socket proxySocket
|
||||
timeout time.Duration
|
||||
activeClients *clientCache
|
||||
dnsClients *dnsClientCache
|
||||
sessionAffinityType v1.ServiceAffinity
|
||||
}
|
||||
|
||||
@ -237,7 +236,6 @@ func (proxier *Proxier) addServicePortPortal(servicePortPortalName ServicePortPo
|
||||
socket: sock,
|
||||
timeout: timeout,
|
||||
activeClients: newClientCache(),
|
||||
dnsClients: newDNSClientCache(),
|
||||
sessionAffinityType: v1.ServiceAffinityNone, // default
|
||||
}
|
||||
proxier.setServiceInfo(servicePortPortalName, si)
|
||||
|
@ -23,45 +23,13 @@ import (
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/miekg/dns"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/kubernetes/pkg/proxy"
|
||||
"k8s.io/kubernetes/pkg/util/ipconfig"
|
||||
"k8s.io/utils/exec"
|
||||
)
|
||||
|
||||
const (
|
||||
// Kubernetes DNS suffix search list
|
||||
// TODO: Get DNS suffix search list from docker containers.
|
||||
// --dns-search option doesn't work on Windows containers and has been
|
||||
// fixed recently in docker.
|
||||
|
||||
// Kubernetes cluster domain
|
||||
clusterDomain = "cluster.local"
|
||||
|
||||
// Kubernetes service domain
|
||||
serviceDomain = "svc." + clusterDomain
|
||||
|
||||
// Kubernetes default namespace domain
|
||||
namespaceServiceDomain = "default." + serviceDomain
|
||||
|
||||
// Kubernetes DNS service port name
|
||||
dnsPortName = "dns"
|
||||
|
||||
// DNS TYPE value A (a host address)
|
||||
dnsTypeA uint16 = 0x01
|
||||
|
||||
// DNS TYPE value AAAA (a host IPv6 address)
|
||||
dnsTypeAAAA uint16 = 0x1c
|
||||
|
||||
// DNS CLASS value IN (the Internet)
|
||||
dnsClassInternet uint16 = 0x01
|
||||
)
|
||||
|
||||
// Abstraction over TCP/UDP sockets which are proxied.
|
||||
@ -150,7 +118,7 @@ func tryConnect(service ServicePortPortalName, srcAddr net.Addr, protocol string
|
||||
}
|
||||
return outConn, nil
|
||||
}
|
||||
return nil, fmt.Errorf("failed to connect to an endpoint.")
|
||||
return nil, fmt.Errorf("failed to connect to an endpoint")
|
||||
}
|
||||
|
||||
func (tcp *tcpProxySocket) ProxyLoop(service ServicePortPortalName, myInfo *serviceInfo, proxier *Proxier) {
|
||||
@ -239,277 +207,8 @@ func newClientCache() *clientCache {
|
||||
return &clientCache{clients: map[string]net.Conn{}}
|
||||
}
|
||||
|
||||
// DNS query client classified by address and QTYPE
|
||||
type dnsClientQuery struct {
|
||||
clientAddress string
|
||||
dnsQType uint16
|
||||
}
|
||||
|
||||
// Holds DNS client query, the value contains the index in DNS suffix search list,
|
||||
// the original DNS message and length for the same client and QTYPE
|
||||
type dnsClientCache struct {
|
||||
mu sync.Mutex
|
||||
clients map[dnsClientQuery]*dnsQueryState
|
||||
}
|
||||
|
||||
type dnsQueryState struct {
|
||||
searchIndex int32
|
||||
msg *dns.Msg
|
||||
}
|
||||
|
||||
func newDNSClientCache() *dnsClientCache {
|
||||
return &dnsClientCache{clients: map[dnsClientQuery]*dnsQueryState{}}
|
||||
}
|
||||
|
||||
func packetRequiresDNSSuffix(dnsType, dnsClass uint16) bool {
|
||||
return (dnsType == dnsTypeA || dnsType == dnsTypeAAAA) && dnsClass == dnsClassInternet
|
||||
}
|
||||
|
||||
func isDNSService(portName string) bool {
|
||||
return portName == dnsPortName
|
||||
}
|
||||
|
||||
func appendDNSSuffix(msg *dns.Msg, buffer []byte, length int, dnsSuffix string) (int, error) {
|
||||
if msg == nil || len(msg.Question) == 0 {
|
||||
return length, fmt.Errorf("DNS message parameter is invalid")
|
||||
}
|
||||
|
||||
// Save the original name since it will be reused for next iteration
|
||||
origName := msg.Question[0].Name
|
||||
if dnsSuffix != "" {
|
||||
msg.Question[0].Name += dnsSuffix + "."
|
||||
}
|
||||
mbuf, err := msg.PackBuffer(buffer)
|
||||
msg.Question[0].Name = origName
|
||||
|
||||
if err != nil {
|
||||
klog.Warningf("Unable to pack DNS packet. Error is: %v", err)
|
||||
return length, err
|
||||
}
|
||||
|
||||
if &buffer[0] != &mbuf[0] {
|
||||
return length, fmt.Errorf("Buffer is too small in packing DNS packet")
|
||||
}
|
||||
|
||||
return len(mbuf), nil
|
||||
}
|
||||
|
||||
func recoverDNSQuestion(origName string, msg *dns.Msg, buffer []byte, length int) (int, error) {
|
||||
if msg == nil || len(msg.Question) == 0 {
|
||||
return length, fmt.Errorf("DNS message parameter is invalid")
|
||||
}
|
||||
|
||||
if origName == msg.Question[0].Name {
|
||||
return length, nil
|
||||
}
|
||||
|
||||
msg.Question[0].Name = origName
|
||||
if len(msg.Answer) > 0 {
|
||||
msg.Answer[0].Header().Name = origName
|
||||
}
|
||||
mbuf, err := msg.PackBuffer(buffer)
|
||||
|
||||
if err != nil {
|
||||
klog.Warningf("Unable to pack DNS packet. Error is: %v", err)
|
||||
return length, err
|
||||
}
|
||||
|
||||
if &buffer[0] != &mbuf[0] {
|
||||
return length, fmt.Errorf("Buffer is too small in packing DNS packet")
|
||||
}
|
||||
|
||||
return len(mbuf), nil
|
||||
}
|
||||
|
||||
func processUnpackedDNSQueryPacket(
|
||||
dnsClients *dnsClientCache,
|
||||
msg *dns.Msg,
|
||||
host string,
|
||||
dnsQType uint16,
|
||||
buffer []byte,
|
||||
length int,
|
||||
dnsSearch []string) int {
|
||||
if dnsSearch == nil || len(dnsSearch) == 0 {
|
||||
klog.V(1).Infof("DNS search list is not initialized and is empty.")
|
||||
return length
|
||||
}
|
||||
|
||||
// TODO: handle concurrent queries from a client
|
||||
dnsClients.mu.Lock()
|
||||
state, found := dnsClients.clients[dnsClientQuery{host, dnsQType}]
|
||||
if !found {
|
||||
state = &dnsQueryState{0, msg}
|
||||
dnsClients.clients[dnsClientQuery{host, dnsQType}] = state
|
||||
}
|
||||
dnsClients.mu.Unlock()
|
||||
|
||||
index := atomic.SwapInt32(&state.searchIndex, state.searchIndex+1)
|
||||
// Also update message ID if the client retries due to previous query time out
|
||||
state.msg.MsgHdr.Id = msg.MsgHdr.Id
|
||||
|
||||
if index < 0 || index >= int32(len(dnsSearch)) {
|
||||
klog.V(1).Infof("Search index %d is out of range.", index)
|
||||
return length
|
||||
}
|
||||
|
||||
length, err := appendDNSSuffix(msg, buffer, length, dnsSearch[index])
|
||||
if err != nil {
|
||||
klog.Errorf("Append DNS suffix failed: %v", err)
|
||||
}
|
||||
|
||||
return length
|
||||
}
|
||||
|
||||
func processUnpackedDNSResponsePacket(
|
||||
svrConn net.Conn,
|
||||
dnsClients *dnsClientCache,
|
||||
msg *dns.Msg,
|
||||
rcode int,
|
||||
host string,
|
||||
dnsQType uint16,
|
||||
buffer []byte,
|
||||
length int,
|
||||
dnsSearch []string) (bool, int) {
|
||||
var drop bool
|
||||
var err error
|
||||
if dnsSearch == nil || len(dnsSearch) == 0 {
|
||||
klog.V(1).Infof("DNS search list is not initialized and is empty.")
|
||||
return drop, length
|
||||
}
|
||||
|
||||
dnsClients.mu.Lock()
|
||||
state, found := dnsClients.clients[dnsClientQuery{host, dnsQType}]
|
||||
dnsClients.mu.Unlock()
|
||||
|
||||
if found {
|
||||
index := atomic.SwapInt32(&state.searchIndex, state.searchIndex+1)
|
||||
if rcode != 0 && index >= 0 && index < int32(len(dnsSearch)) {
|
||||
// If the response has failure and iteration through the search list has not
|
||||
// reached the end, retry on behalf of the client using the original query message
|
||||
drop = true
|
||||
length, err = appendDNSSuffix(state.msg, buffer, length, dnsSearch[index])
|
||||
if err != nil {
|
||||
klog.Errorf("Append DNS suffix failed: %v", err)
|
||||
}
|
||||
|
||||
_, err = svrConn.Write(buffer[0:length])
|
||||
if err != nil {
|
||||
if !logTimeout(err) {
|
||||
klog.Errorf("Write failed: %v", err)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
length, err = recoverDNSQuestion(state.msg.Question[0].Name, msg, buffer, length)
|
||||
if err != nil {
|
||||
klog.Errorf("Recover DNS question failed: %v", err)
|
||||
}
|
||||
|
||||
dnsClients.mu.Lock()
|
||||
delete(dnsClients.clients, dnsClientQuery{host, dnsQType})
|
||||
dnsClients.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
return drop, length
|
||||
}
|
||||
|
||||
func processDNSQueryPacket(
|
||||
dnsClients *dnsClientCache,
|
||||
cliAddr net.Addr,
|
||||
buffer []byte,
|
||||
length int,
|
||||
dnsSearch []string) (int, error) {
|
||||
msg := &dns.Msg{}
|
||||
if err := msg.Unpack(buffer[:length]); err != nil {
|
||||
klog.Warningf("Unable to unpack DNS packet. Error is: %v", err)
|
||||
return length, err
|
||||
}
|
||||
|
||||
// Query - Response bit that specifies whether this message is a query (0) or a response (1).
|
||||
if msg.MsgHdr.Response {
|
||||
return length, fmt.Errorf("DNS packet should be a query message")
|
||||
}
|
||||
|
||||
// QDCOUNT
|
||||
if len(msg.Question) != 1 {
|
||||
klog.V(1).Infof("Number of entries in the question section of the DNS packet is: %d", len(msg.Question))
|
||||
klog.V(1).Infof("DNS suffix appending does not support more than one question.")
|
||||
return length, nil
|
||||
}
|
||||
|
||||
// ANCOUNT, NSCOUNT, ARCOUNT
|
||||
if len(msg.Answer) != 0 || len(msg.Ns) != 0 || len(msg.Extra) != 0 {
|
||||
klog.V(1).Infof("DNS packet contains more than question section.")
|
||||
return length, nil
|
||||
}
|
||||
|
||||
dnsQType := msg.Question[0].Qtype
|
||||
dnsQClass := msg.Question[0].Qclass
|
||||
if packetRequiresDNSSuffix(dnsQType, dnsQClass) {
|
||||
host, _, err := net.SplitHostPort(cliAddr.String())
|
||||
if err != nil {
|
||||
klog.V(1).Infof("Failed to get host from client address: %v", err)
|
||||
host = cliAddr.String()
|
||||
}
|
||||
|
||||
length = processUnpackedDNSQueryPacket(dnsClients, msg, host, dnsQType, buffer, length, dnsSearch)
|
||||
}
|
||||
|
||||
return length, nil
|
||||
}
|
||||
|
||||
func processDNSResponsePacket(
|
||||
svrConn net.Conn,
|
||||
dnsClients *dnsClientCache,
|
||||
cliAddr net.Addr,
|
||||
buffer []byte,
|
||||
length int,
|
||||
dnsSearch []string) (bool, int, error) {
|
||||
var drop bool
|
||||
msg := &dns.Msg{}
|
||||
if err := msg.Unpack(buffer[:length]); err != nil {
|
||||
klog.Warningf("Unable to unpack DNS packet. Error is: %v", err)
|
||||
return drop, length, err
|
||||
}
|
||||
|
||||
// Query - Response bit that specifies whether this message is a query (0) or a response (1).
|
||||
if !msg.MsgHdr.Response {
|
||||
return drop, length, fmt.Errorf("DNS packet should be a response message")
|
||||
}
|
||||
|
||||
// QDCOUNT
|
||||
if len(msg.Question) != 1 {
|
||||
klog.V(1).Infof("Number of entries in the response section of the DNS packet is: %d", len(msg.Answer))
|
||||
return drop, length, nil
|
||||
}
|
||||
|
||||
dnsQType := msg.Question[0].Qtype
|
||||
dnsQClass := msg.Question[0].Qclass
|
||||
if packetRequiresDNSSuffix(dnsQType, dnsQClass) {
|
||||
host, _, err := net.SplitHostPort(cliAddr.String())
|
||||
if err != nil {
|
||||
klog.V(1).Infof("Failed to get host from client address: %v", err)
|
||||
host = cliAddr.String()
|
||||
}
|
||||
|
||||
drop, length = processUnpackedDNSResponsePacket(svrConn, dnsClients, msg, msg.MsgHdr.Rcode, host, dnsQType, buffer, length, dnsSearch)
|
||||
}
|
||||
|
||||
return drop, length, nil
|
||||
}
|
||||
|
||||
func (udp *udpProxySocket) ProxyLoop(service ServicePortPortalName, myInfo *serviceInfo, proxier *Proxier) {
|
||||
var buffer [4096]byte // 4KiB should be enough for most whole-packets
|
||||
var dnsSearch []string
|
||||
if isDNSService(service.Port) {
|
||||
dnsSearch = []string{"", namespaceServiceDomain, serviceDomain, clusterDomain}
|
||||
execer := exec.New()
|
||||
ipconfigInterface := ipconfig.New(execer)
|
||||
suffixList, err := ipconfigInterface.GetDNSSuffixSearchList()
|
||||
if err == nil {
|
||||
dnsSearch = append(dnsSearch, suffixList...)
|
||||
}
|
||||
}
|
||||
|
||||
for {
|
||||
if !myInfo.isAlive() {
|
||||
@ -531,16 +230,8 @@ func (udp *udpProxySocket) ProxyLoop(service ServicePortPortalName, myInfo *serv
|
||||
break
|
||||
}
|
||||
|
||||
// If this is DNS query packet
|
||||
if isDNSService(service.Port) {
|
||||
n, err = processDNSQueryPacket(myInfo.dnsClients, cliAddr, buffer[:], n, dnsSearch)
|
||||
if err != nil {
|
||||
klog.Errorf("Process DNS query packet failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// If this is a client we know already, reuse the connection and goroutine.
|
||||
svrConn, err := udp.getBackendConn(myInfo.activeClients, myInfo.dnsClients, cliAddr, proxier, service, myInfo.timeout, dnsSearch)
|
||||
svrConn, err := udp.getBackendConn(myInfo.activeClients, cliAddr, proxier, service, myInfo.timeout)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
@ -562,7 +253,7 @@ func (udp *udpProxySocket) ProxyLoop(service ServicePortPortalName, myInfo *serv
|
||||
}
|
||||
}
|
||||
|
||||
func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, dnsClients *dnsClientCache, cliAddr net.Addr, proxier *Proxier, service ServicePortPortalName, timeout time.Duration, dnsSearch []string) (net.Conn, error) {
|
||||
func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, cliAddr net.Addr, proxier *Proxier, service ServicePortPortalName, timeout time.Duration) (net.Conn, error) {
|
||||
activeClients.mu.Lock()
|
||||
defer activeClients.mu.Unlock()
|
||||
|
||||
@ -581,17 +272,17 @@ func (udp *udpProxySocket) getBackendConn(activeClients *clientCache, dnsClients
|
||||
return nil, err
|
||||
}
|
||||
activeClients.clients[cliAddr.String()] = svrConn
|
||||
go func(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, dnsClients *dnsClientCache, service ServicePortPortalName, timeout time.Duration, dnsSearch []string) {
|
||||
go func(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, service ServicePortPortalName, timeout time.Duration) {
|
||||
defer runtime.HandleCrash()
|
||||
udp.proxyClient(cliAddr, svrConn, activeClients, dnsClients, service, timeout, dnsSearch)
|
||||
}(cliAddr, svrConn, activeClients, dnsClients, service, timeout, dnsSearch)
|
||||
udp.proxyClient(cliAddr, svrConn, activeClients, service, timeout)
|
||||
}(cliAddr, svrConn, activeClients, service, timeout)
|
||||
}
|
||||
return svrConn, nil
|
||||
}
|
||||
|
||||
// This function is expected to be called as a goroutine.
|
||||
// TODO: Track and log bytes copied, like TCP
|
||||
func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, dnsClients *dnsClientCache, service ServicePortPortalName, timeout time.Duration, dnsSearch []string) {
|
||||
func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activeClients *clientCache, service ServicePortPortalName, timeout time.Duration) {
|
||||
defer svrConn.Close()
|
||||
var buffer [4096]byte
|
||||
for {
|
||||
@ -603,27 +294,17 @@ func (udp *udpProxySocket) proxyClient(cliAddr net.Addr, svrConn net.Conn, activ
|
||||
break
|
||||
}
|
||||
|
||||
drop := false
|
||||
if isDNSService(service.Port) {
|
||||
drop, n, err = processDNSResponsePacket(svrConn, dnsClients, cliAddr, buffer[:], n, dnsSearch)
|
||||
if err != nil {
|
||||
klog.Errorf("Process DNS response packet failed: %v", err)
|
||||
}
|
||||
err = svrConn.SetDeadline(time.Now().Add(timeout))
|
||||
if err != nil {
|
||||
klog.Errorf("SetDeadline failed: %v", err)
|
||||
break
|
||||
}
|
||||
|
||||
if !drop {
|
||||
err = svrConn.SetDeadline(time.Now().Add(timeout))
|
||||
if err != nil {
|
||||
klog.Errorf("SetDeadline failed: %v", err)
|
||||
break
|
||||
}
|
||||
_, err = udp.WriteTo(buffer[0:n], cliAddr)
|
||||
if err != nil {
|
||||
if !logTimeout(err) {
|
||||
klog.Errorf("WriteTo failed: %v", err)
|
||||
}
|
||||
break
|
||||
_, err = udp.WriteTo(buffer[0:n], cliAddr)
|
||||
if err != nil {
|
||||
if !logTimeout(err) {
|
||||
klog.Errorf("WriteTo failed: %v", err)
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
activeClients.mu.Lock()
|
||||
|
Loading…
Reference in New Issue
Block a user