|
|
@ -32,6 +32,7 @@ import (
|
|
|
|
"k8s.io/kubernetes/pkg/api/unversioned"
|
|
|
|
"k8s.io/kubernetes/pkg/api/unversioned"
|
|
|
|
kcache "k8s.io/kubernetes/pkg/client/cache"
|
|
|
|
kcache "k8s.io/kubernetes/pkg/client/cache"
|
|
|
|
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
|
|
|
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
|
|
|
|
|
|
|
"k8s.io/kubernetes/pkg/dns/treecache"
|
|
|
|
"k8s.io/kubernetes/pkg/dns/util"
|
|
|
|
"k8s.io/kubernetes/pkg/dns/util"
|
|
|
|
"k8s.io/kubernetes/pkg/runtime"
|
|
|
|
"k8s.io/kubernetes/pkg/runtime"
|
|
|
|
"k8s.io/kubernetes/pkg/util/validation"
|
|
|
|
"k8s.io/kubernetes/pkg/util/validation"
|
|
|
@ -78,7 +79,7 @@ type KubeDNS struct {
|
|
|
|
// stores DNS records for the domain.
|
|
|
|
// stores DNS records for the domain.
|
|
|
|
// A Records and SRV Records for (regular) services and headless Services.
|
|
|
|
// A Records and SRV Records for (regular) services and headless Services.
|
|
|
|
// CNAME Records for ExternalName Services.
|
|
|
|
// CNAME Records for ExternalName Services.
|
|
|
|
cache *TreeCache
|
|
|
|
cache treecache.TreeCache
|
|
|
|
|
|
|
|
|
|
|
|
// TODO(nikhiljindal): Remove this. It can be recreated using clusterIPServiceMap.
|
|
|
|
// TODO(nikhiljindal): Remove this. It can be recreated using clusterIPServiceMap.
|
|
|
|
reverseRecordMap map[string]*skymsg.Service
|
|
|
|
reverseRecordMap map[string]*skymsg.Service
|
|
|
@ -125,7 +126,7 @@ func NewKubeDNS(client clientset.Interface, domain string, federations map[strin
|
|
|
|
kd := &KubeDNS{
|
|
|
|
kd := &KubeDNS{
|
|
|
|
kubeClient: client,
|
|
|
|
kubeClient: client,
|
|
|
|
domain: domain,
|
|
|
|
domain: domain,
|
|
|
|
cache: NewTreeCache(),
|
|
|
|
cache: treecache.NewTreeCache(),
|
|
|
|
cacheLock: sync.RWMutex{},
|
|
|
|
cacheLock: sync.RWMutex{},
|
|
|
|
nodesStore: kcache.NewStore(kcache.MetaNamespaceKeyFunc),
|
|
|
|
nodesStore: kcache.NewStore(kcache.MetaNamespaceKeyFunc),
|
|
|
|
reverseRecordMap: make(map[string]*skymsg.Service),
|
|
|
|
reverseRecordMap: make(map[string]*skymsg.Service),
|
|
|
@ -135,34 +136,46 @@ func NewKubeDNS(client clientset.Interface, domain string, federations map[strin
|
|
|
|
}
|
|
|
|
}
|
|
|
|
kd.setEndpointsStore()
|
|
|
|
kd.setEndpointsStore()
|
|
|
|
kd.setServicesStore()
|
|
|
|
kd.setServicesStore()
|
|
|
|
|
|
|
|
|
|
|
|
return kd, nil
|
|
|
|
return kd, nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (kd *KubeDNS) Start() {
|
|
|
|
func (kd *KubeDNS) Start() {
|
|
|
|
|
|
|
|
glog.V(2).Infof("Starting endpointsController")
|
|
|
|
go kd.endpointsController.Run(wait.NeverStop)
|
|
|
|
go kd.endpointsController.Run(wait.NeverStop)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
glog.V(2).Infof("Starting serviceController")
|
|
|
|
go kd.serviceController.Run(wait.NeverStop)
|
|
|
|
go kd.serviceController.Run(wait.NeverStop)
|
|
|
|
// Wait synchronously for the Kubernetes service and add a DNS record for it.
|
|
|
|
|
|
|
|
// This ensures that the Start function returns only after having received Service objects
|
|
|
|
// Wait synchronously for the Kubernetes service and add a DNS
|
|
|
|
// from APIServer.
|
|
|
|
// record for it. This ensures that the Start function returns only
|
|
|
|
// TODO: we might not have to wait for kubernetes service specifically. We should just wait
|
|
|
|
// after having received Service objects from APIServer.
|
|
|
|
// for a list operation to be complete from APIServer.
|
|
|
|
//
|
|
|
|
|
|
|
|
// TODO: we might not have to wait for kubernetes service
|
|
|
|
|
|
|
|
// specifically. We should just wait for a list operation to be
|
|
|
|
|
|
|
|
// complete from APIServer.
|
|
|
|
|
|
|
|
glog.V(2).Infof("Waiting for Kubernetes service")
|
|
|
|
kd.waitForKubernetesService()
|
|
|
|
kd.waitForKubernetesService()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (kd *KubeDNS) waitForKubernetesService() (svc *kapi.Service) {
|
|
|
|
func (kd *KubeDNS) waitForKubernetesService() (svc *kapi.Service) {
|
|
|
|
name := fmt.Sprintf("%v/%v", kapi.NamespaceDefault, kubernetesSvcName)
|
|
|
|
name := fmt.Sprintf("%v/%v", kapi.NamespaceDefault, kubernetesSvcName)
|
|
|
|
glog.Infof("Waiting for service: %v", name)
|
|
|
|
glog.V(2).Infof("Waiting for service: %v", name)
|
|
|
|
var err error
|
|
|
|
var err error
|
|
|
|
servicePollInterval := 1 * time.Second
|
|
|
|
servicePollInterval := 1 * time.Second
|
|
|
|
|
|
|
|
|
|
|
|
for {
|
|
|
|
for {
|
|
|
|
svc, err = kd.kubeClient.Core().Services(kapi.NamespaceDefault).Get(kubernetesSvcName)
|
|
|
|
svc, err = kd.kubeClient.Core().Services(kapi.NamespaceDefault).Get(kubernetesSvcName)
|
|
|
|
if err != nil || svc == nil {
|
|
|
|
if err != nil || svc == nil {
|
|
|
|
glog.Infof("Ignoring error while waiting for service %v: %v. Sleeping %v before retrying.", name, err, servicePollInterval)
|
|
|
|
glog.V(3).Infof(
|
|
|
|
|
|
|
|
"Ignoring error while waiting for service %v: %v. Sleeping %v before retrying.",
|
|
|
|
|
|
|
|
name, err, servicePollInterval)
|
|
|
|
time.Sleep(servicePollInterval)
|
|
|
|
time.Sleep(servicePollInterval)
|
|
|
|
continue
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
}
|
|
|
|
break
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
return
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -230,7 +243,9 @@ func assertIsService(obj interface{}) (*kapi.Service, bool) {
|
|
|
|
|
|
|
|
|
|
|
|
func (kd *KubeDNS) newService(obj interface{}) {
|
|
|
|
func (kd *KubeDNS) newService(obj interface{}) {
|
|
|
|
if service, ok := assertIsService(obj); ok {
|
|
|
|
if service, ok := assertIsService(obj); ok {
|
|
|
|
glog.V(4).Infof("Add/Updated for service %v", service.Name)
|
|
|
|
glog.V(2).Infof("New service: %v", service.Name)
|
|
|
|
|
|
|
|
glog.V(4).Infof("Service details: %v", service)
|
|
|
|
|
|
|
|
|
|
|
|
// ExternalName services are a special kind that return CNAME records
|
|
|
|
// ExternalName services are a special kind that return CNAME records
|
|
|
|
if service.Spec.Type == kapi.ServiceTypeExternalName {
|
|
|
|
if service.Spec.Type == kapi.ServiceTypeExternalName {
|
|
|
|
kd.newExternalNameService(service)
|
|
|
|
kd.newExternalNameService(service)
|
|
|
@ -242,7 +257,8 @@ func (kd *KubeDNS) newService(obj interface{}) {
|
|
|
|
return
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if len(service.Spec.Ports) == 0 {
|
|
|
|
if len(service.Spec.Ports) == 0 {
|
|
|
|
glog.Warningf("Unexpected service with no ports, this should not have happened: %v", service)
|
|
|
|
glog.Warningf("Service with no ports, this should not have happened: %v",
|
|
|
|
|
|
|
|
service)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
kd.newPortalService(service)
|
|
|
|
kd.newPortalService(service)
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -253,8 +269,11 @@ func (kd *KubeDNS) removeService(obj interface{}) {
|
|
|
|
subCachePath := append(kd.domainPath, serviceSubdomain, s.Namespace, s.Name)
|
|
|
|
subCachePath := append(kd.domainPath, serviceSubdomain, s.Namespace, s.Name)
|
|
|
|
kd.cacheLock.Lock()
|
|
|
|
kd.cacheLock.Lock()
|
|
|
|
defer kd.cacheLock.Unlock()
|
|
|
|
defer kd.cacheLock.Unlock()
|
|
|
|
success := kd.cache.deletePath(subCachePath...)
|
|
|
|
|
|
|
|
glog.V(2).Infof("Removing service %v at path %v. Success: ", s.Name, subCachePath, success)
|
|
|
|
success := kd.cache.DeletePath(subCachePath...)
|
|
|
|
|
|
|
|
glog.V(2).Infof("removeService %v at path %v. Success: %v",
|
|
|
|
|
|
|
|
s.Name, subCachePath, success)
|
|
|
|
|
|
|
|
|
|
|
|
// ExternalName services have no IP
|
|
|
|
// ExternalName services have no IP
|
|
|
|
if kapi.IsServiceIPSet(s) {
|
|
|
|
if kapi.IsServiceIPSet(s) {
|
|
|
|
delete(kd.reverseRecordMap, s.Spec.ClusterIP)
|
|
|
|
delete(kd.reverseRecordMap, s.Spec.ClusterIP)
|
|
|
@ -268,7 +287,8 @@ func (kd *KubeDNS) updateService(oldObj, newObj interface{}) {
|
|
|
|
if old, ok := assertIsService(oldObj); ok {
|
|
|
|
if old, ok := assertIsService(oldObj); ok {
|
|
|
|
// Remove old cache path only if changing type to/from ExternalName.
|
|
|
|
// Remove old cache path only if changing type to/from ExternalName.
|
|
|
|
// In all other cases, we'll update records in place.
|
|
|
|
// In all other cases, we'll update records in place.
|
|
|
|
if (new.Spec.Type == kapi.ServiceTypeExternalName) != (old.Spec.Type == kapi.ServiceTypeExternalName) {
|
|
|
|
if (new.Spec.Type == kapi.ServiceTypeExternalName) !=
|
|
|
|
|
|
|
|
(old.Spec.Type == kapi.ServiceTypeExternalName) {
|
|
|
|
kd.removeService(oldObj)
|
|
|
|
kd.removeService(oldObj)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
kd.newService(newObj)
|
|
|
|
kd.newService(newObj)
|
|
|
@ -304,7 +324,8 @@ func (kd *KubeDNS) getServiceFromEndpoints(e *kapi.Endpoints) (*kapi.Service, er
|
|
|
|
return nil, fmt.Errorf("failed to get service object from services store - %v", err)
|
|
|
|
return nil, fmt.Errorf("failed to get service object from services store - %v", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if !exists {
|
|
|
|
if !exists {
|
|
|
|
glog.V(1).Infof("could not find service for endpoint %q in namespace %q", e.Name, e.Namespace)
|
|
|
|
glog.V(3).Infof("No service for endpoint %q in namespace %q",
|
|
|
|
|
|
|
|
e.Name, e.Namespace)
|
|
|
|
return nil, nil
|
|
|
|
return nil, nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if svc, ok := assertIsService(obj); ok {
|
|
|
|
if svc, ok := assertIsService(obj); ok {
|
|
|
@ -321,9 +342,9 @@ func (kd *KubeDNS) fqdn(service *kapi.Service, subpaths ...string) string {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (kd *KubeDNS) newPortalService(service *kapi.Service) {
|
|
|
|
func (kd *KubeDNS) newPortalService(service *kapi.Service) {
|
|
|
|
subCache := NewTreeCache()
|
|
|
|
subCache := treecache.NewTreeCache()
|
|
|
|
recordValue, recordLabel := util.GetSkyMsg(service.Spec.ClusterIP, 0)
|
|
|
|
recordValue, recordLabel := util.GetSkyMsg(service.Spec.ClusterIP, 0)
|
|
|
|
subCache.setEntry(recordLabel, recordValue, kd.fqdn(service, recordLabel))
|
|
|
|
subCache.SetEntry(recordLabel, recordValue, kd.fqdn(service, recordLabel))
|
|
|
|
|
|
|
|
|
|
|
|
// Generate SRV Records
|
|
|
|
// Generate SRV Records
|
|
|
|
for i := range service.Spec.Ports {
|
|
|
|
for i := range service.Spec.Ports {
|
|
|
@ -332,7 +353,9 @@ func (kd *KubeDNS) newPortalService(service *kapi.Service) {
|
|
|
|
srvValue := kd.generateSRVRecordValue(service, int(port.Port))
|
|
|
|
srvValue := kd.generateSRVRecordValue(service, int(port.Port))
|
|
|
|
|
|
|
|
|
|
|
|
l := []string{"_" + strings.ToLower(string(port.Protocol)), "_" + port.Name}
|
|
|
|
l := []string{"_" + strings.ToLower(string(port.Protocol)), "_" + port.Name}
|
|
|
|
subCache.setEntry(recordLabel, srvValue, kd.fqdn(service, append(l, recordLabel)...), l...)
|
|
|
|
glog.V(2).Infof("Added SRV record %+v", srvValue)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
subCache.SetEntry(recordLabel, srvValue, kd.fqdn(service, append(l, recordLabel)...), l...)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
subCachePath := append(kd.domainPath, serviceSubdomain, service.Namespace)
|
|
|
|
subCachePath := append(kd.domainPath, serviceSubdomain, service.Namespace)
|
|
|
@ -341,7 +364,7 @@ func (kd *KubeDNS) newPortalService(service *kapi.Service) {
|
|
|
|
|
|
|
|
|
|
|
|
kd.cacheLock.Lock()
|
|
|
|
kd.cacheLock.Lock()
|
|
|
|
defer kd.cacheLock.Unlock()
|
|
|
|
defer kd.cacheLock.Unlock()
|
|
|
|
kd.cache.setSubCache(service.Name, subCache, subCachePath...)
|
|
|
|
kd.cache.SetSubCache(service.Name, subCache, subCachePath...)
|
|
|
|
kd.reverseRecordMap[service.Spec.ClusterIP] = reverseRecord
|
|
|
|
kd.reverseRecordMap[service.Spec.ClusterIP] = reverseRecord
|
|
|
|
kd.clusterIPServiceMap[service.Spec.ClusterIP] = service
|
|
|
|
kd.clusterIPServiceMap[service.Spec.ClusterIP] = service
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -352,7 +375,7 @@ func (kd *KubeDNS) generateRecordsForHeadlessService(e *kapi.Endpoints, svc *kap
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
subCache := NewTreeCache()
|
|
|
|
subCache := treecache.NewTreeCache()
|
|
|
|
glog.V(4).Infof("Endpoints Annotations: %v", e.Annotations)
|
|
|
|
glog.V(4).Infof("Endpoints Annotations: %v", e.Annotations)
|
|
|
|
for idx := range e.Subsets {
|
|
|
|
for idx := range e.Subsets {
|
|
|
|
for subIdx := range e.Subsets[idx].Addresses {
|
|
|
|
for subIdx := range e.Subsets[idx].Addresses {
|
|
|
@ -362,14 +385,15 @@ func (kd *KubeDNS) generateRecordsForHeadlessService(e *kapi.Endpoints, svc *kap
|
|
|
|
if hostLabel, exists := getHostname(address, podHostnames); exists {
|
|
|
|
if hostLabel, exists := getHostname(address, podHostnames); exists {
|
|
|
|
endpointName = hostLabel
|
|
|
|
endpointName = hostLabel
|
|
|
|
}
|
|
|
|
}
|
|
|
|
subCache.setEntry(endpointName, recordValue, kd.fqdn(svc, endpointName))
|
|
|
|
subCache.SetEntry(endpointName, recordValue, kd.fqdn(svc, endpointName))
|
|
|
|
for portIdx := range e.Subsets[idx].Ports {
|
|
|
|
for portIdx := range e.Subsets[idx].Ports {
|
|
|
|
endpointPort := &e.Subsets[idx].Ports[portIdx]
|
|
|
|
endpointPort := &e.Subsets[idx].Ports[portIdx]
|
|
|
|
if endpointPort.Name != "" && endpointPort.Protocol != "" {
|
|
|
|
if endpointPort.Name != "" && endpointPort.Protocol != "" {
|
|
|
|
srvValue := kd.generateSRVRecordValue(svc, int(endpointPort.Port), endpointName)
|
|
|
|
srvValue := kd.generateSRVRecordValue(svc, int(endpointPort.Port), endpointName)
|
|
|
|
|
|
|
|
glog.V(2).Infof("Added SRV record %+v", srvValue)
|
|
|
|
|
|
|
|
|
|
|
|
l := []string{"_" + strings.ToLower(string(endpointPort.Protocol)), "_" + endpointPort.Name}
|
|
|
|
l := []string{"_" + strings.ToLower(string(endpointPort.Protocol)), "_" + endpointPort.Name}
|
|
|
|
subCache.setEntry(endpointName, srvValue, kd.fqdn(svc, append(l, endpointName)...), l...)
|
|
|
|
subCache.SetEntry(endpointName, srvValue, kd.fqdn(svc, append(l, endpointName)...), l...)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -377,7 +401,7 @@ func (kd *KubeDNS) generateRecordsForHeadlessService(e *kapi.Endpoints, svc *kap
|
|
|
|
subCachePath := append(kd.domainPath, serviceSubdomain, svc.Namespace)
|
|
|
|
subCachePath := append(kd.domainPath, serviceSubdomain, svc.Namespace)
|
|
|
|
kd.cacheLock.Lock()
|
|
|
|
kd.cacheLock.Lock()
|
|
|
|
defer kd.cacheLock.Unlock()
|
|
|
|
defer kd.cacheLock.Unlock()
|
|
|
|
kd.cache.setSubCache(svc.Name, subCache, subCachePath...)
|
|
|
|
kd.cache.SetSubCache(svc.Name, subCache, subCachePath...)
|
|
|
|
return nil
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -430,7 +454,8 @@ func (kd *KubeDNS) newHeadlessService(service *kapi.Service) error {
|
|
|
|
return fmt.Errorf("failed to get endpoints object from endpoints store - %v", err)
|
|
|
|
return fmt.Errorf("failed to get endpoints object from endpoints store - %v", err)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if !exists {
|
|
|
|
if !exists {
|
|
|
|
glog.V(1).Infof("Could not find endpoints for service %q in namespace %q. DNS records will be created once endpoints show up.", service.Name, service.Namespace)
|
|
|
|
glog.V(1).Infof("Could not find endpoints for service %q in namespace %q. DNS records will be created once endpoints show up.",
|
|
|
|
|
|
|
|
service.Name, service.Namespace)
|
|
|
|
return nil
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if e, ok := e.(*kapi.Endpoints); ok {
|
|
|
|
if e, ok := e.(*kapi.Endpoints); ok {
|
|
|
@ -446,11 +471,12 @@ func (kd *KubeDNS) newExternalNameService(service *kapi.Service) {
|
|
|
|
recordValue, _ := util.GetSkyMsg(service.Spec.ExternalName, 0)
|
|
|
|
recordValue, _ := util.GetSkyMsg(service.Spec.ExternalName, 0)
|
|
|
|
cachePath := append(kd.domainPath, serviceSubdomain, service.Namespace)
|
|
|
|
cachePath := append(kd.domainPath, serviceSubdomain, service.Namespace)
|
|
|
|
fqdn := kd.fqdn(service)
|
|
|
|
fqdn := kd.fqdn(service)
|
|
|
|
glog.V(2).Infof("newExternalNameService: storing key %s with value %v as %s under %v", service.Name, recordValue, fqdn, cachePath)
|
|
|
|
glog.V(2).Infof("newExternalNameService: storing key %s with value %v as %s under %v",
|
|
|
|
|
|
|
|
service.Name, recordValue, fqdn, cachePath)
|
|
|
|
kd.cacheLock.Lock()
|
|
|
|
kd.cacheLock.Lock()
|
|
|
|
defer kd.cacheLock.Unlock()
|
|
|
|
defer kd.cacheLock.Unlock()
|
|
|
|
// Store the service name directly as the leaf key
|
|
|
|
// Store the service name directly as the leaf key
|
|
|
|
kd.cache.setEntry(service.Name, recordValue, fqdn, cachePath...)
|
|
|
|
kd.cache.SetEntry(service.Name, recordValue, fqdn, cachePath...)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// Records responds with DNS records that match the given name, in a format
|
|
|
|
// Records responds with DNS records that match the given name, in a format
|
|
|
@ -458,7 +484,7 @@ func (kd *KubeDNS) newExternalNameService(service *kapi.Service) {
|
|
|
|
// matching the given name is returned, otherwise all records stored under
|
|
|
|
// matching the given name is returned, otherwise all records stored under
|
|
|
|
// the subtree matching the name are returned.
|
|
|
|
// the subtree matching the name are returned.
|
|
|
|
func (kd *KubeDNS) Records(name string, exact bool) (retval []skymsg.Service, err error) {
|
|
|
|
func (kd *KubeDNS) Records(name string, exact bool) (retval []skymsg.Service, err error) {
|
|
|
|
glog.V(2).Infof("Received DNS Request:%s, exact:%v", name, exact)
|
|
|
|
glog.V(3).Infof("Query for %q, exact: %v", name, exact)
|
|
|
|
|
|
|
|
|
|
|
|
trimmed := strings.TrimRight(name, ".")
|
|
|
|
trimmed := strings.TrimRight(name, ".")
|
|
|
|
segments := strings.Split(trimmed, ".")
|
|
|
|
segments := strings.Split(trimmed, ".")
|
|
|
@ -466,14 +492,14 @@ func (kd *KubeDNS) Records(name string, exact bool) (retval []skymsg.Service, er
|
|
|
|
federationSegments := []string{}
|
|
|
|
federationSegments := []string{}
|
|
|
|
|
|
|
|
|
|
|
|
if !exact && kd.isFederationQuery(segments) {
|
|
|
|
if !exact && kd.isFederationQuery(segments) {
|
|
|
|
glog.V(2).Infof(
|
|
|
|
glog.V(3).Infof("Received federation query, trying local service first")
|
|
|
|
"federation service query: Received federation query. Going to try to find local service first")
|
|
|
|
// Try querying the non-federation (local) service first. Will try
|
|
|
|
// Try quering the non-federation (local) service first.
|
|
|
|
// the federation one later, if this fails.
|
|
|
|
// Will try the federation one later, if this fails.
|
|
|
|
|
|
|
|
isFederationQuery = true
|
|
|
|
isFederationQuery = true
|
|
|
|
federationSegments = append(federationSegments, segments...)
|
|
|
|
federationSegments = append(federationSegments, segments...)
|
|
|
|
// To try local service, remove federation name from segments.
|
|
|
|
// To try local service, remove federation name from segments.
|
|
|
|
// Federation name is 3rd in the segment (after service name and namespace).
|
|
|
|
// Federation name is 3rd in the segment (after service name and
|
|
|
|
|
|
|
|
// namespace).
|
|
|
|
segments = append(segments[:2], segments[3:]...)
|
|
|
|
segments = append(segments[:2], segments[3:]...)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -487,9 +513,11 @@ func (kd *KubeDNS) Records(name string, exact bool) (retval []skymsg.Service, er
|
|
|
|
if isFederationQuery {
|
|
|
|
if isFederationQuery {
|
|
|
|
return kd.recordsForFederation(records, path, exact, federationSegments)
|
|
|
|
return kd.recordsForFederation(records, path, exact, federationSegments)
|
|
|
|
} else if len(records) > 0 {
|
|
|
|
} else if len(records) > 0 {
|
|
|
|
|
|
|
|
glog.V(4).Infof("Records for %v: %v", name, records)
|
|
|
|
return records, nil
|
|
|
|
return records, nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
glog.V(3).Infof("No record found for %v", name)
|
|
|
|
return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound}
|
|
|
|
return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -497,17 +525,18 @@ func (kd *KubeDNS) recordsForFederation(records []skymsg.Service, path []string,
|
|
|
|
// For federation query, verify that the local service has endpoints.
|
|
|
|
// For federation query, verify that the local service has endpoints.
|
|
|
|
validRecord := false
|
|
|
|
validRecord := false
|
|
|
|
for _, val := range records {
|
|
|
|
for _, val := range records {
|
|
|
|
// We know that a headless service has endpoints for sure if a record was returned for it.
|
|
|
|
// We know that a headless service has endpoints for sure if a
|
|
|
|
// The record contains endpoint IPs. So nothing to check for headless services.
|
|
|
|
// record was returned for it. The record contains endpoint
|
|
|
|
|
|
|
|
// IPs. So nothing to check for headless services.
|
|
|
|
if !kd.isHeadlessServiceRecord(&val) {
|
|
|
|
if !kd.isHeadlessServiceRecord(&val) {
|
|
|
|
ok, err := kd.serviceWithClusterIPHasEndpoints(&val)
|
|
|
|
ok, err := kd.serviceWithClusterIPHasEndpoints(&val)
|
|
|
|
if err != nil {
|
|
|
|
if err != nil {
|
|
|
|
glog.V(2).Infof(
|
|
|
|
glog.V(2).Infof(
|
|
|
|
"federation service query: unexpected error while trying to find if service has endpoint: %v", err)
|
|
|
|
"Federation: error finding if service has endpoint: %v", err)
|
|
|
|
continue
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if !ok {
|
|
|
|
if !ok {
|
|
|
|
glog.Infof("federation service query: skipping record since service has no endpoint: %v", val)
|
|
|
|
glog.V(2).Infof("Federation: skipping record since service has no endpoint: %v", val)
|
|
|
|
continue
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -518,21 +547,24 @@ func (kd *KubeDNS) recordsForFederation(records []skymsg.Service, path []string,
|
|
|
|
if validRecord {
|
|
|
|
if validRecord {
|
|
|
|
// There is a local service with valid endpoints, return its CNAME.
|
|
|
|
// There is a local service with valid endpoints, return its CNAME.
|
|
|
|
name := strings.Join(util.ReverseArray(path), ".")
|
|
|
|
name := strings.Join(util.ReverseArray(path), ".")
|
|
|
|
// Ensure that this name that we are returning as a CNAME response is a fully qualified
|
|
|
|
// Ensure that this name that we are returning as a CNAME response
|
|
|
|
// domain name so that the client's resolver library doesn't have to go through its
|
|
|
|
// is a fully qualified domain name so that the client's resolver
|
|
|
|
// search list all over again.
|
|
|
|
// library doesn't have to go through its search list all over
|
|
|
|
|
|
|
|
// again.
|
|
|
|
if !strings.HasSuffix(name, ".") {
|
|
|
|
if !strings.HasSuffix(name, ".") {
|
|
|
|
name = name + "."
|
|
|
|
name = name + "."
|
|
|
|
}
|
|
|
|
}
|
|
|
|
glog.V(2).Infof("federation service query: Returning CNAME for local service : %s", name)
|
|
|
|
glog.V(3).Infof(
|
|
|
|
|
|
|
|
"Federation: Returning CNAME for local service: %v", name)
|
|
|
|
return []skymsg.Service{{Host: name}}, nil
|
|
|
|
return []skymsg.Service{{Host: name}}, nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// If the name query is not an exact query and does not match any records in the local store,
|
|
|
|
// If the name query is not an exact query and does not match any
|
|
|
|
// attempt to send a federation redirect (CNAME) response.
|
|
|
|
// records in the local store, attempt to send a federation redirect
|
|
|
|
|
|
|
|
// (CNAME) response.
|
|
|
|
if !exact {
|
|
|
|
if !exact {
|
|
|
|
glog.V(2).Infof(
|
|
|
|
glog.V(3).Infof(
|
|
|
|
"federation service query: Did not find a local service. Trying federation redirect (CNAME) response")
|
|
|
|
"Federation: Did not find a local service. Trying federation redirect (CNAME)")
|
|
|
|
return kd.federationRecords(util.ReverseArray(federationSegments))
|
|
|
|
return kd.federationRecords(util.ReverseArray(federationSegments))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
@ -556,25 +588,26 @@ func (kd *KubeDNS) getRecordsForPath(path []string, exact bool) ([]skymsg.Servic
|
|
|
|
}
|
|
|
|
}
|
|
|
|
kd.cacheLock.RLock()
|
|
|
|
kd.cacheLock.RLock()
|
|
|
|
defer kd.cacheLock.RUnlock()
|
|
|
|
defer kd.cacheLock.RUnlock()
|
|
|
|
if record, ok := kd.cache.getEntry(key, path[:len(path)-1]...); ok {
|
|
|
|
if record, ok := kd.cache.GetEntry(key, path[:len(path)-1]...); ok {
|
|
|
|
glog.V(2).Infof("Exact match %v for %v received from cache", record, path[:len(path)-1])
|
|
|
|
glog.V(3).Infof("Exact match %v for %v received from cache", record, path[:len(path)-1])
|
|
|
|
return []skymsg.Service{*(record.(*skymsg.Service))}, nil
|
|
|
|
return []skymsg.Service{*(record.(*skymsg.Service))}, nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
glog.V(2).Infof("Exact match for %v not found in cache", path)
|
|
|
|
|
|
|
|
|
|
|
|
glog.V(3).Infof("Exact match for %v not found in cache", path)
|
|
|
|
return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound}
|
|
|
|
return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
kd.cacheLock.RLock()
|
|
|
|
kd.cacheLock.RLock()
|
|
|
|
defer kd.cacheLock.RUnlock()
|
|
|
|
defer kd.cacheLock.RUnlock()
|
|
|
|
records := kd.cache.getValuesForPathWithWildcards(path...)
|
|
|
|
records := kd.cache.GetValuesForPathWithWildcards(path...)
|
|
|
|
glog.V(2).Infof("Received %d records for %v from cache", len(records), path)
|
|
|
|
glog.V(3).Infof("Found %d records for %v in the cache", len(records), path)
|
|
|
|
|
|
|
|
|
|
|
|
retval := []skymsg.Service{}
|
|
|
|
retval := []skymsg.Service{}
|
|
|
|
for _, val := range records {
|
|
|
|
for _, val := range records {
|
|
|
|
retval = append(retval, *val)
|
|
|
|
retval = append(retval, *val)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
glog.V(2).Infof("records:%v, retval:%v, path:%v", records, retval, path)
|
|
|
|
glog.V(4).Infof("getRecordsForPath retval=%+v, path=%v", retval, path)
|
|
|
|
|
|
|
|
|
|
|
|
return retval, nil
|
|
|
|
return retval, nil
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -619,7 +652,7 @@ func (kd *KubeDNS) serviceWithClusterIPHasEndpoints(msg *skymsg.Service) (bool,
|
|
|
|
|
|
|
|
|
|
|
|
// ReverseRecords performs a reverse lookup for the given name.
|
|
|
|
// ReverseRecords performs a reverse lookup for the given name.
|
|
|
|
func (kd *KubeDNS) ReverseRecord(name string) (*skymsg.Service, error) {
|
|
|
|
func (kd *KubeDNS) ReverseRecord(name string) (*skymsg.Service, error) {
|
|
|
|
glog.V(2).Infof("Received ReverseRecord Request:%s", name)
|
|
|
|
glog.V(3).Infof("Query for ReverseRecord %q", name)
|
|
|
|
|
|
|
|
|
|
|
|
// if portalIP is not a valid IP, the reverseRecordMap lookup will fail
|
|
|
|
// if portalIP is not a valid IP, the reverseRecordMap lookup will fail
|
|
|
|
portalIP, ok := util.ExtractIP(name)
|
|
|
|
portalIP, ok := util.ExtractIP(name)
|
|
|
@ -676,34 +709,39 @@ func (kd *KubeDNS) getPodIP(path []string) (string, error) {
|
|
|
|
// We can add support for wildcard queries later, if needed.
|
|
|
|
// We can add support for wildcard queries later, if needed.
|
|
|
|
func (kd *KubeDNS) isFederationQuery(path []string) bool {
|
|
|
|
func (kd *KubeDNS) isFederationQuery(path []string) bool {
|
|
|
|
if len(path) != 4+len(kd.domainPath) {
|
|
|
|
if len(path) != 4+len(kd.domainPath) {
|
|
|
|
glog.V(2).Infof("not a federation query: len(%q) != 4+len(%q)", path, kd.domainPath)
|
|
|
|
glog.V(4).Infof("Not a federation query: len(%q) != 4+len(%q)", path, kd.domainPath)
|
|
|
|
return false
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if errs := validation.IsDNS1035Label(path[0]); len(errs) != 0 {
|
|
|
|
if errs := validation.IsDNS1035Label(path[0]); len(errs) != 0 {
|
|
|
|
glog.V(2).Infof("not a federation query: %q is not an RFC 1035 label: %q", path[0], errs)
|
|
|
|
glog.V(4).Infof("Not a federation query: %q is not an RFC 1035 label: %q",
|
|
|
|
|
|
|
|
path[0], errs)
|
|
|
|
return false
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if errs := validation.IsDNS1123Label(path[1]); len(errs) != 0 {
|
|
|
|
if errs := validation.IsDNS1123Label(path[1]); len(errs) != 0 {
|
|
|
|
glog.V(2).Infof("not a federation query: %q is not an RFC 1123 label: %q", path[1], errs)
|
|
|
|
glog.V(4).Infof("Not a federation query: %q is not an RFC 1123 label: %q",
|
|
|
|
|
|
|
|
path[1], errs)
|
|
|
|
return false
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if errs := validation.IsDNS1123Label(path[2]); len(errs) != 0 {
|
|
|
|
if errs := validation.IsDNS1123Label(path[2]); len(errs) != 0 {
|
|
|
|
glog.V(2).Infof("not a federation query: %q is not an RFC 1123 label: %q", path[2], errs)
|
|
|
|
glog.V(4).Infof("Not a federation query: %q is not an RFC 1123 label: %q",
|
|
|
|
|
|
|
|
path[2], errs)
|
|
|
|
return false
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if path[3] != serviceSubdomain {
|
|
|
|
if path[3] != serviceSubdomain {
|
|
|
|
glog.V(2).Infof("not a federation query: %q != %q (serviceSubdomain)", path[3], serviceSubdomain)
|
|
|
|
glog.V(4).Infof("Not a federation query: %q != %q (serviceSubdomain)",
|
|
|
|
|
|
|
|
path[3], serviceSubdomain)
|
|
|
|
return false
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
}
|
|
|
|
for i, domComp := range kd.domainPath {
|
|
|
|
for i, domComp := range kd.domainPath {
|
|
|
|
// kd.domainPath is reversed, so we need to look in the `path` in the reverse order.
|
|
|
|
// kd.domainPath is reversed, so we need to look in the `path` in the reverse order.
|
|
|
|
if domComp != path[len(path)-i-1] {
|
|
|
|
if domComp != path[len(path)-i-1] {
|
|
|
|
glog.V(2).Infof("not a federation query: kd.domainPath[%d] != path[%d] (%q != %q)", i, len(path)-i-1, domComp, path[len(path)-i-1])
|
|
|
|
glog.V(4).Infof("Not a federation query: kd.domainPath[%d] != path[%d] (%q != %q)",
|
|
|
|
|
|
|
|
i, len(path)-i-1, domComp, path[len(path)-i-1])
|
|
|
|
return false
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if _, ok := kd.federations[path[2]]; !ok {
|
|
|
|
if _, ok := kd.federations[path[2]]; !ok {
|
|
|
|
glog.V(2).Infof("not a federation query: kd.federations[%q] not found", path[2])
|
|
|
|
glog.V(4).Infof("Not a federation query: kd.federations[%q] not found", path[2])
|
|
|
|
return false
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return true
|
|
|
|
return true
|
|
|
|