Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-07-24 20:24:09 +00:00)
Merge pull request #27708 from nikhiljindal/dnsHealthCheck
Automatic merge from submit-queue

federation: Update KubeDNS to try finding a local service first for a federation query

Ref https://github.com/kubernetes/kubernetes/issues/26762

This updates KubeDNS to try to find a local service first when answering a federation query. Without this change, KubeDNS always returns the federation DNS hostname, even if a local service exists. The code now first removes the federation name from the query path if it is present, so that the default search for a local service happens; only if no local service is found does it return the federation DNS hostname. I will appreciate a thorough review since this is my first change to KubeDNS. https://github.com/kubernetes/kubernetes/pull/25727 was the original PR that added federation support to KubeDNS.

cc @kubernetes/sig-cluster-federation @quinton-hoole @madhusudancs @bprashanth @mml
This commit is contained in: a27fd4b01e
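As a quick illustration of the path manipulation described above, here is a minimal, self-contained sketch (names and helper are illustrative, not from the commit): for a federation query, the federation label is the third DNS segment, and dropping it yields the name used for the ordinary local-service lookup.

package main

import (
	"fmt"
	"strings"
)

// localQueryName drops the federation label (third segment) from a federation
// query so the ordinary local-service lookup can run first. The full-slice
// expression keeps the input slice intact; the actual commit appends in place
// on its own copy of the segments.
func localQueryName(name string) string {
	segments := strings.Split(strings.TrimRight(name, "."), ".")
	local := append(segments[:2:2], segments[3:]...)
	return strings.Join(local, ".") + "."
}

func main() {
	fmt.Println(localQueryName("myservice.myns.myfederation.svc.cluster.local."))
	// Output: myservice.myns.svc.cluster.local.
}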
@@ -6,3 +6,6 @@
 ## Version 1.4 (Tue June 21 2016 Nikhil Jindal <nikhiljindal@google.com>)
 - Initialising nodesStore (issue #27820)
 
+## Version 1.5 (Thu June 23 2016 Nikhil Jindal <nikhiljindal@google.com>)
+- Adding support to return local service (pr #27708)
+
@@ -17,12 +17,12 @@
 # If you update this image please bump the tag value before pushing.
 #
 # Usage:
-# [ARCH=amd64] [TAG=1.4] [REGISTRY=gcr.io/google_containers] [BASEIMAGE=busybox] make (container|push)
+# [ARCH=amd64] [TAG=1.5] [REGISTRY=gcr.io/google_containers] [BASEIMAGE=busybox] make (container|push)
 
 # Default registry, arch and tag. This can be overwritten by arguments to make
 PLATFORM?=linux
 ARCH?=amd64
-TAG?=1.4
+TAG?=1.5
 REGISTRY?=gcr.io/google_containers
 
 GOLANG_VERSION=1.6
@@ -1,27 +1,27 @@
 apiVersion: v1
 kind: ReplicationController
 metadata:
-  name: kube-dns-v14
+  name: kube-dns-v15
   namespace: kube-system
   labels:
     k8s-app: kube-dns
-    version: v14
+    version: v15
     kubernetes.io/cluster-service: "true"
 spec:
   replicas: ${DNS_REPLICAS}
   selector:
     k8s-app: kube-dns
-    version: v14
+    version: v15
   template:
     metadata:
       labels:
         k8s-app: kube-dns
-        version: v14
+        version: v15
         kubernetes.io/cluster-service: "true"
     spec:
       containers:
       - name: kubedns
-        image: gcr.io/google_containers/kubedns-amd64:1.3
+        image: gcr.io/google_containers/kubedns-amd64:1.5
         resources:
           # TODO: Set memory limits when we've profiled the container for large
           # clusters, then set request = limit to keep this container in
@@ -21,27 +21,27 @@
 apiVersion: v1
 kind: ReplicationController
 metadata:
-  name: kube-dns-v16
+  name: kube-dns-v17
   namespace: kube-system
   labels:
     k8s-app: kube-dns
-    version: v16
+    version: v17
     kubernetes.io/cluster-service: "true"
 spec:
   replicas: __PILLAR__DNS__REPLICAS__
   selector:
     k8s-app: kube-dns
-    version: v16
+    version: v17
   template:
     metadata:
       labels:
         k8s-app: kube-dns
-        version: v16
+        version: v17
         kubernetes.io/cluster-service: "true"
     spec:
       containers:
       - name: kubedns
-        image: gcr.io/google_containers/kubedns-amd64:1.4
+        image: gcr.io/google_containers/kubedns-amd64:1.5
         resources:
           # TODO: Set memory limits when we've profiled the container for large
           # clusters, then set request = limit to keep this container in
@@ -21,27 +21,27 @@
 apiVersion: v1
 kind: ReplicationController
 metadata:
-  name: kube-dns-v16
+  name: kube-dns-v17
   namespace: kube-system
   labels:
     k8s-app: kube-dns
-    version: v16
+    version: v17
     kubernetes.io/cluster-service: "true"
 spec:
   replicas: {{ pillar['dns_replicas'] }}
   selector:
     k8s-app: kube-dns
-    version: v16
+    version: v17
   template:
     metadata:
       labels:
         k8s-app: kube-dns
-        version: v16
+        version: v17
         kubernetes.io/cluster-service: "true"
     spec:
       containers:
       - name: kubedns
-        image: gcr.io/google_containers/kubedns-amd64:1.4
+        image: gcr.io/google_containers/kubedns-amd64:1.5
         resources:
           # TODO: Set memory limits when we've profiled the container for large
           # clusters, then set request = limit to keep this container in
@@ -21,27 +21,27 @@
 apiVersion: v1
 kind: ReplicationController
 metadata:
-  name: kube-dns-v16
+  name: kube-dns-v17
   namespace: kube-system
   labels:
     k8s-app: kube-dns
-    version: v16
+    version: v17
     kubernetes.io/cluster-service: "true"
 spec:
   replicas: $DNS_REPLICAS
   selector:
     k8s-app: kube-dns
-    version: v16
+    version: v17
   template:
     metadata:
       labels:
         k8s-app: kube-dns
-        version: v16
+        version: v17
         kubernetes.io/cluster-service: "true"
     spec:
       containers:
       - name: kubedns
-        image: gcr.io/google_containers/kubedns-amd64:1.4
+        image: gcr.io/google_containers/kubedns-amd64:1.5
         resources:
           # TODO: Set memory limits when we've profiled the container for large
           # clusters, then set request = limit to keep this container in
@@ -53,7 +53,10 @@ func NewKubeDNSServerDefault(config *options.KubeDNSConfig) *KubeDNSServer {
     }
     ks.healthzPort = config.HealthzPort
     ks.dnsPort = config.DNSPort
-    ks.kd = kdns.NewKubeDNS(kubeClient, config.ClusterDomain, config.Federations)
+    ks.kd, err = kdns.NewKubeDNS(kubeClient, config.ClusterDomain, config.Federations)
+    if err != nil {
+        glog.Fatalf("Failed to start kubeDNS: %v", err)
+    }
     return &ks
 }
pkg/dns/dns.go: 154 changed lines
@@ -92,8 +92,15 @@ type KubeDNS struct {
     // A Records and SRV Records for (regular) services and headless Services.
     cache *TreeCache
 
+    // TODO(nikhiljindal): Remove this. It can be recreated using clusterIPServiceMap.
     reverseRecordMap map[string]*skymsg.Service
 
+    // Map of cluster IP to service object. Headless services are not part of this map.
+    // Used to get a service when given its cluster IP.
+    // Access to this is coordinated using cacheLock. We use the same lock for cache and this map
+    // to ensure that they don't get out of sync.
+    clusterIPServiceMap map[string]*kapi.Service
+
     // caller is responsible for using the cacheLock before invoking methods on cache
     // the cache is not thread-safe, and the caller can guarantee thread safety by using
     // the cacheLock
@@ -119,20 +126,28 @@ type KubeDNS struct {
     nodesStore kcache.Store
 }
 
-func NewKubeDNS(client clientset.Interface, domain string, federations map[string]string) *KubeDNS {
+func NewKubeDNS(client clientset.Interface, domain string, federations map[string]string) (*KubeDNS, error) {
+    // Verify that federation names do not contain dots ('.').
+    // We cannot allow dots since we use them as the separator for path segments (svcname.nsname.fedname.svc.domain).
+    for key := range federations {
+        if strings.ContainsAny(key, ".") {
+            return nil, fmt.Errorf("invalid federation name: %s, cannot have '.'", key)
+        }
+    }
     kd := &KubeDNS{
         kubeClient:       client,
         domain:           domain,
         cache:            NewTreeCache(),
         cacheLock:        sync.RWMutex{},
         nodesStore:       kcache.NewStore(kcache.MetaNamespaceKeyFunc),
         reverseRecordMap: make(map[string]*skymsg.Service),
+        clusterIPServiceMap: make(map[string]*kapi.Service),
         domainPath:       reverseArray(strings.Split(strings.TrimRight(domain, "."), ".")),
         federations:      federations,
     }
     kd.setEndpointsStore()
     kd.setServicesStore()
-    return kd
+    return kd, nil
 }
 
 func (kd *KubeDNS) Start() {
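Since NewKubeDNS now returns an error, callers must check it before starting the server, as the server.go hunk above does. A minimal sketch of the error path, assuming the pkg/dns import path of this tree (a nil client is enough to exercise the validation, as the unit test later in this commit does):

package main

import (
	"fmt"

	kdns "k8s.io/kubernetes/pkg/dns"
)

func main() {
	// A dotted federation name is now rejected up front instead of being
	// misparsed later as extra DNS path segments.
	_, err := kdns.NewKubeDNS(nil, "cluster.local.", map[string]string{"bad.name": "example.com"})
	fmt.Println(err) // invalid federation name: bad.name, cannot have '.'
}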
@@ -245,6 +260,7 @@ func (kd *KubeDNS) removeService(obj interface{}) {
         defer kd.cacheLock.Unlock()
         kd.cache.deletePath(subCachePath...)
         delete(kd.reverseRecordMap, s.Spec.ClusterIP)
+        delete(kd.clusterIPServiceMap, s.Spec.ClusterIP)
     }
 }
 
@@ -319,6 +335,7 @@ func (kd *KubeDNS) newPortalService(service *kapi.Service) {
     defer kd.cacheLock.Unlock()
     kd.cache.setSubCache(service.Name, subCache, subCachePath...)
     kd.reverseRecordMap[service.Spec.ClusterIP] = reverseRecord
+    kd.clusterIPServiceMap[service.Spec.ClusterIP] = service
 }
 
 func (kd *KubeDNS) generateRecordsForHeadlessService(e *kapi.Endpoints, svc *kapi.Service) error {
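A small illustration of why this bookkeeping matters (hypothetical types and values, not from the commit): for a ClusterIP service, the skymsg record's Host is the cluster IP, so a map keyed by cluster IP lets later code recognize ClusterIP-backed records and find their service objects, while anything else is treated as headless.

package main

import "fmt"

type service struct{ name, clusterIP string }

func main() {
	// Mirrors clusterIPServiceMap: cluster IP -> service object.
	byIP := map[string]*service{}
	svc := &service{name: "myservice", clusterIP: "10.0.0.10"}
	byIP[svc.clusterIP] = svc // newPortalService adds the entry

	// A record whose host is a known cluster IP belongs to a ClusterIP
	// service; an endpoint IP or a name will not be in the map.
	_, ok := byIP["10.0.0.10"]
	fmt.Println("headless:", !ok) // headless: false

	delete(byIP, svc.clusterIP) // removeService deletes the entry
}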
@@ -422,7 +439,74 @@ func (kd *KubeDNS) Records(name string, exact bool) (retval []skymsg.Service, err error) {
     glog.Infof("Received DNS Request:%s, exact:%v", name, exact)
     trimmed := strings.TrimRight(name, ".")
     segments := strings.Split(trimmed, ".")
+    isFederationQuery := false
+    federationSegments := []string{}
+    if !exact && kd.isFederationQuery(segments) {
+        glog.Infof("federation service query: Received federation query. Going to try to find local service first")
+        // Try querying the non-federation (local) service first.
+        // Will try the federation one later, if this fails.
+        isFederationQuery = true
+        federationSegments = append(federationSegments, segments...)
+        // To try the local service, remove the federation name from segments.
+        // The federation name is the 3rd segment (after service name and namespace).
+        segments = append(segments[:2], segments[3:]...)
+    }
     path := reverseArray(segments)
+    records, err := kd.getRecordsForPath(path, exact)
+    if err != nil {
+        return nil, err
+    }
+    if !isFederationQuery {
+        if len(records) > 0 {
+            return records, nil
+        }
+        return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound}
+    }
+
+    // For a federation query, verify that the local service has endpoints.
+    validRecord := false
+    for _, val := range records {
+        // We know that a headless service has endpoints for sure if a record was returned for it.
+        // The record contains endpoint IPs. So there is nothing to check for headless services.
+        if !kd.isHeadlessServiceRecord(&val) {
+            ok, err := kd.serviceWithClusterIPHasEndpoints(&val)
+            if err != nil {
+                glog.Infof("federation service query: unexpected error while trying to find if service has endpoint: %v", err)
+                continue
+            }
+            if !ok {
+                glog.Infof("federation service query: skipping record since service has no endpoint: %v", val)
+                continue
+            }
+        }
+        validRecord = true
+        break
+    }
+    if validRecord {
+        // There is a local service with valid endpoints, return its CNAME.
+        name := strings.Join(reverseArray(path), ".")
+        // Ensure that this name that we are returning as a CNAME response is a fully qualified
+        // domain name so that the client's resolver library doesn't have to go through its
+        // search list all over again.
+        if !strings.HasSuffix(name, ".") {
+            name = name + "."
+        }
+        glog.Infof("federation service query: Returning CNAME for local service: %s", name)
+        return []skymsg.Service{{Host: name}}, nil
+    }
+
+    // If the name query is not an exact query and does not match any records in the local store,
+    // attempt to send a federation redirect (CNAME) response.
+    if !exact {
+        glog.Infof("federation service query: Did not find a local service. Trying federation redirect (CNAME) response")
+        return kd.federationRecords(reverseArray(federationSegments))
+    }
+
+    return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound}
+}
+
+func (kd *KubeDNS) getRecordsForPath(path []string, exact bool) ([]skymsg.Service, error) {
+    retval := []skymsg.Service{}
     if kd.isPodRecord(path) {
         ip, err := kd.getPodIP(path)
         if err == nil {
@@ -448,21 +532,50 @@ func (kd *KubeDNS) Records(name string, exact bool) (retval []skymsg.Service, err error) {
     kd.cacheLock.RLock()
     defer kd.cacheLock.RUnlock()
     records := kd.cache.getValuesForPathWithWildcards(path...)
+    glog.V(2).Infof("Received %d records from cache", len(records))
     for _, val := range records {
         retval = append(retval, *val)
     }
     glog.Infof("records:%v, retval:%v, path:%v", records, retval, path)
-    if len(retval) > 0 {
-        return retval, nil
-    }
+    return retval, nil
+}
 
-    // If the name query is not an exact query and does not match any records in the local store,
-    // attempt to send a federation redirect (CNAME) response.
-    if !exact {
-        return kd.federationRecords(path)
-    }
+// Returns true if the given record corresponds to a headless service.
+// Important: Assumes that we already have the cacheLock. It is the caller's responsibility to acquire it.
+// This is because the code will panic if we try to acquire a lock that we already hold.
+func (kd *KubeDNS) isHeadlessServiceRecord(msg *skymsg.Service) bool {
+    // If it is not a headless service, then msg.Host will be the cluster IP.
+    // So we can check whether msg.Host exists in our clusterIPServiceMap.
+    _, ok := kd.clusterIPServiceMap[msg.Host]
+    // It is a headless service if no record was found.
+    return !ok
+}
 
-    return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound}
+// Returns true if the service corresponding to the given message has endpoints.
+// Note: Works only for services with a cluster IP. Returns an error for a headless service (a service without a cluster IP).
+// Important: Assumes that we already have the cacheLock. It is the caller's responsibility to acquire it.
+// This is because the code will panic if we try to acquire a lock that we already hold.
+func (kd *KubeDNS) serviceWithClusterIPHasEndpoints(msg *skymsg.Service) (bool, error) {
+    svc, ok := kd.clusterIPServiceMap[msg.Host]
+    if !ok {
+        // It is a headless service.
+        return false, fmt.Errorf("method not expected to be called for headless service")
+    }
+    key, err := kcache.MetaNamespaceKeyFunc(svc)
+    if err != nil {
+        return false, err
+    }
+    e, exists, err := kd.endpointsStore.GetByKey(key)
+    if err != nil {
+        return false, fmt.Errorf("failed to get endpoints object from endpoints store - %v", err)
+    }
+    if !exists {
+        return false, nil
+    }
+    if e, ok := e.(*kapi.Endpoints); ok {
+        return len(e.Subsets) > 0, nil
+    }
+    return false, fmt.Errorf("unexpected: found non-endpoint object in endpoint store: %v", e)
 }
 
 // ReverseRecords performs a reverse lookup for the given name.
@@ -558,6 +671,9 @@ func getSkyMsg(ip string, port int) (*skymsg.Service, string) {
 // 5. Fourth segment is exactly "svc"
 // 6. The remaining segments match kd.domainPath.
 // 7. And federation must be one of the listed federations in the config.
+// Note: Because of the above conditions, this method will treat wildcard queries such as
+// *.mysvc.myns.myfederation.svc.domain.path as non-federation queries.
+// We can add support for wildcard queries later, if needed.
 func (kd *KubeDNS) isFederationQuery(path []string) bool {
     if len(path) == 4+len(kd.domainPath) &&
         len(validation.IsDNS952Label(path[0])) == 0 &&
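The hunk above cuts off after the first two checks. For the whole picture, here is a hedged sketch of how the seven documented conditions could be checked; it is a reconstruction from the comment list, not the function body from this commit, and it assumes the IsDNS952Label/IsDNS1123Label helpers from k8s.io/kubernetes/pkg/util/validation, which return a list of validation errors (empty means valid).

package dnssketch

import "k8s.io/kubernetes/pkg/util/validation"

// isFederationQuerySketch reconstructs the documented conditions. `path` is the
// query in segment order (svcname, nsname, federation, "svc", domain parts);
// domainPath holds the cluster domain components stored reversed.
func isFederationQuerySketch(path, domainPath []string, federations map[string]string) bool {
	if len(path) != 4+len(domainPath) || // 1. exactly service+namespace+federation+"svc"+domain segments
		len(validation.IsDNS952Label(path[0])) != 0 || // 2. first segment is a valid service name
		len(validation.IsDNS1123Label(path[1])) != 0 || // 3. second segment is a valid namespace name
		len(validation.IsDNS1123Label(path[2])) != 0 || // 4. third segment is a valid federation name
		path[3] != "svc" { // 5. fourth segment is exactly "svc"
		return false
	}
	// 6. The remaining segments must match domainPath; since domainPath is
	// reversed, compare against `path` from the end.
	for i, domComp := range domainPath {
		if domComp != path[len(path)-i-1] {
			return false
		}
	}
	// 7. The federation must be one of the listed federations in the config.
	_, ok := federations[path[2]]
	return ok
}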
@@ -46,18 +46,27 @@ const (
 
 func newKubeDNS() *KubeDNS {
     kd := &KubeDNS{
         domain:           testDomain,
         endpointsStore:   cache.NewStore(cache.MetaNamespaceKeyFunc),
         servicesStore:    cache.NewStore(cache.MetaNamespaceKeyFunc),
         cache:            NewTreeCache(),
         reverseRecordMap: make(map[string]*skymsg.Service),
+        clusterIPServiceMap: make(map[string]*kapi.Service),
         cacheLock:        sync.RWMutex{},
         domainPath:       reverseArray(strings.Split(strings.TrimRight(testDomain, "."), ".")),
         nodesStore:       cache.NewStore(cache.MetaNamespaceKeyFunc),
     }
     return kd
 }
 
+func TestNewKubeDNS(t *testing.T) {
+    // Verify that it returns an error for invalid federation names.
+    _, err := NewKubeDNS(nil, "domainName", map[string]string{"invalid.name.with.dot": "example.com"})
+    if err == nil {
+        t.Errorf("Expected an error due to invalid federation name")
+    }
+}
+
 func TestPodDns(t *testing.T) {
     const (
         testPodIP = "1.2.3.4"
@@ -350,6 +359,98 @@ func TestHeadlessServiceWithDelayedEndpointsAddition(t *testing.T) {
     assertNoDNSForHeadlessService(t, kd, service)
 }
 
+// Verifies that a single record with host "a" is returned for query "q".
+func verifyRecord(q, a string, t *testing.T, kd *KubeDNS) {
+    records, err := kd.Records(q, false)
+    require.NoError(t, err)
+    assert.Equal(t, 1, len(records))
+    assert.Equal(t, a, records[0].Host)
+}
+
+// Verifies that querying KubeDNS for a headless federation service returns the DNS hostname
+// when a local service does not exist and returns the endpoint IP when a local service exists.
+func TestFederationHeadlessService(t *testing.T) {
+    kd := newKubeDNS()
+    kd.federations = map[string]string{
+        "myfederation": "example.com",
+    }
+    kd.kubeClient = fake.NewSimpleClientset(newNodes())
+
+    // Verify that querying for the federation service returns the federation domain name.
+    verifyRecord("testservice.default.myfederation.svc.cluster.local.",
+        "testservice.default.myfederation.svc.testcontinent-testreg-testzone.testcontinent-testreg.example.com.",
+        t, kd)
+
+    // Add a local service without any endpoint.
+    s := newHeadlessService()
+    assert.NoError(t, kd.servicesStore.Add(s))
+    kd.newService(s)
+
+    // Verify that querying for the federation service still returns the federation domain name.
+    verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"),
+        "testservice.default.myfederation.svc.testcontinent-testreg-testzone.testcontinent-testreg.example.com.",
+        t, kd)
+
+    // Now add an endpoint.
+    endpoints := newEndpoints(s, newSubsetWithOnePort("", 80, "10.0.0.1"))
+    assert.NoError(t, kd.endpointsStore.Add(endpoints))
+    kd.updateService(s, s)
+
+    // Verify that querying for the federation service returns the local service domain name this time.
+    verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"), "testservice.default.svc.cluster.local.", t, kd)
+
+    // Delete the endpoint.
+    endpoints.Subsets = []kapi.EndpointSubset{}
+    kd.handleEndpointAdd(endpoints)
+    kd.updateService(s, s)
+
+    // Verify that querying for the federation service returns the federation domain name again.
+    verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"),
+        "testservice.default.myfederation.svc.testcontinent-testreg-testzone.testcontinent-testreg.example.com.",
+        t, kd)
+}
+
+// Verifies that querying KubeDNS for a federation service returns the DNS hostname if no
+// endpoint exists and returns the local cluster IP if endpoints exist.
+func TestFederationService(t *testing.T) {
+    kd := newKubeDNS()
+    kd.federations = map[string]string{
+        "myfederation": "example.com",
+    }
+    kd.kubeClient = fake.NewSimpleClientset(newNodes())
+
+    // Verify that querying for the federation service returns the federation domain name.
+    verifyRecord("testservice.default.myfederation.svc.cluster.local.",
+        "testservice.default.myfederation.svc.testcontinent-testreg-testzone.testcontinent-testreg.example.com.",
+        t, kd)
+
+    // Add a local service without any endpoint.
+    s := newService(testNamespace, testService, "1.2.3.4", "", 80)
+    assert.NoError(t, kd.servicesStore.Add(s))
+    kd.newService(s)
+
+    // Verify that querying for the federation service still returns the federation domain name.
+    verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"),
+        "testservice.default.myfederation.svc.testcontinent-testreg-testzone.testcontinent-testreg.example.com.",
+        t, kd)
+
+    // Now add an endpoint.
+    endpoints := newEndpoints(s, newSubsetWithOnePort("", 80, "10.0.0.1"))
+    assert.NoError(t, kd.endpointsStore.Add(endpoints))
+    kd.updateService(s, s)
+
+    // Verify that querying for the federation service returns the local service domain name this time.
+    verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"), "testservice.default.svc.cluster.local.", t, kd)
+
+    // Remove the endpoint.
+    endpoints.Subsets = []kapi.EndpointSubset{}
+    kd.handleEndpointAdd(endpoints)
+    kd.updateService(s, s)
+
+    // Verify that querying for the federation service returns the federation domain name again.
+    verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"),
+        "testservice.default.myfederation.svc.testcontinent-testreg-testzone.testcontinent-testreg.example.com.",
+        t, kd)
+}
+
 func TestFederationQueryWithoutCache(t *testing.T) {
     kd := newKubeDNS()
     kd.federations = map[string]string{
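The tests above pin down the shape of the federation redirect: the answer for a name under the local cluster domain is the same svcname.nsname.federation.svc prefix re-rooted under the node's zone and region in the federation domain. federationRecords itself is not part of this diff, so the following is only an illustrative, self-contained sketch derived from those expected strings (the zone and region values are the test fixtures'):

package main

import (
	"fmt"
	"strings"
)

// federationCNAMETarget rebuilds the CNAME target implied by the test
// expectations above. This is a reconstruction for illustration, not the
// federationRecords implementation.
func federationCNAMETarget(query, zone, region, fedDomain string) string {
	segments := strings.Split(strings.TrimRight(query, "."), ".")
	// Keep svcname.nsname.federation.svc and drop the local cluster domain.
	prefix := strings.Join(segments[:4], ".")
	return fmt.Sprintf("%s.%s.%s.%s.", prefix, zone, region, fedDomain)
}

func main() {
	fmt.Println(federationCNAMETarget(
		"testservice.default.myfederation.svc.cluster.local.",
		"testcontinent-testreg-testzone", "testcontinent-testreg", "example.com"))
	// testservice.default.myfederation.svc.testcontinent-testreg-testzone.testcontinent-testreg.example.com.
}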
@@ -397,10 +498,7 @@ func testValidFederationQueries(t *testing.T, kd *KubeDNS) {
     }
 
     for _, query := range queries {
-        records, err := kd.Records(query.q, false)
-        require.NoError(t, err)
-        assert.Equal(t, 1, len(records))
-        assert.Equal(t, query.a, records[0].Host)
+        verifyRecord(query.q, query.a, t, kd)
     }
 }
 
@@ -630,6 +728,10 @@ func getEquivalentQueries(serviceFQDN, namespace string) []string {
     }
 }
 
+func getFederationServiceFQDN(kd *KubeDNS, s *kapi.Service, federationName string) string {
+    return fmt.Sprintf("%s.%s.%s.svc.%s", s.Name, s.Namespace, federationName, kd.domain)
+}
+
 func getServiceFQDN(kd *KubeDNS, s *kapi.Service) string {
     return fmt.Sprintf("%s.%s.svc.%s", s.Name, s.Namespace, kd.domain)
 }