diff --git a/pkg/dns/dns.go b/pkg/dns/dns.go index 89c785cc8d6..1fa5b4e70ac 100644 --- a/pkg/dns/dns.go +++ b/pkg/dns/dns.go @@ -19,7 +19,6 @@ package dns import ( "encoding/json" "fmt" - "hash/fnv" "net" "strings" "sync" @@ -33,6 +32,7 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" kcache "k8s.io/kubernetes/pkg/client/cache" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + "k8s.io/kubernetes/pkg/dns/util" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util/validation" "k8s.io/kubernetes/pkg/util/wait" @@ -50,9 +50,6 @@ const ( // A subdomain added to the user specified dmoain for all pods. podSubdomain = "pod" - // arpaSuffix is the standard suffix for PTR IP reverse lookups. - arpaSuffix = ".in-addr.arpa." - // Resync period for the kube controller loop. resyncPeriod = 5 * time.Minute @@ -62,15 +59,6 @@ const ( // never change. So we expire the cache and retrieve a node once every 180 seconds. // The value is chosen to be neither too long nor too short. nodeCacheTTL = 180 * time.Second - - // default priority used for service records - defaultPriority = 10 - - // default weight used for service records - defaultWeight = 10 - - // default TTL used for service records - defaultTTL = 30 ) type KubeDNS struct { @@ -142,7 +130,7 @@ func NewKubeDNS(client clientset.Interface, domain string, federations map[strin nodesStore: kcache.NewStore(kcache.MetaNamespaceKeyFunc), reverseRecordMap: make(map[string]*skymsg.Service), clusterIPServiceMap: make(map[string]*kapi.Service), - domainPath: reverseArray(strings.Split(strings.TrimRight(domain, "."), ".")), + domainPath: util.ReverseArray(strings.Split(strings.TrimRight(domain, "."), ".")), federations: federations, } kd.setEndpointsStore() @@ -329,12 +317,12 @@ func (kd *KubeDNS) getServiceFromEndpoints(e *kapi.Endpoints) (*kapi.Service, er // elements rooted at the given service, ending at a service record. func (kd *KubeDNS) fqdn(service *kapi.Service, subpaths ...string) string { domainLabels := append(append(kd.domainPath, serviceSubdomain, service.Namespace, service.Name), subpaths...) - return dns.Fqdn(strings.Join(reverseArray(domainLabels), ".")) + return dns.Fqdn(strings.Join(util.ReverseArray(domainLabels), ".")) } func (kd *KubeDNS) newPortalService(service *kapi.Service) { subCache := NewTreeCache() - recordValue, recordLabel := getSkyMsg(service.Spec.ClusterIP, 0) + recordValue, recordLabel := util.GetSkyMsg(service.Spec.ClusterIP, 0) subCache.setEntry(recordLabel, recordValue, kd.fqdn(service, recordLabel)) // Generate SRV Records @@ -348,8 +336,8 @@ func (kd *KubeDNS) newPortalService(service *kapi.Service) { } } subCachePath := append(kd.domainPath, serviceSubdomain, service.Namespace) - host := kd.getServiceFQDN(service) - reverseRecord, _ := getSkyMsg(host, 0) + host := getServiceFQDN(kd.domain, service) + reverseRecord, _ := util.GetSkyMsg(host, 0) kd.cacheLock.Lock() defer kd.cacheLock.Unlock() @@ -370,7 +358,7 @@ func (kd *KubeDNS) generateRecordsForHeadlessService(e *kapi.Endpoints, svc *kap for subIdx := range e.Subsets[idx].Addresses { address := &e.Subsets[idx].Addresses[subIdx] endpointIP := address.IP - recordValue, endpointName := getSkyMsg(endpointIP, 0) + recordValue, endpointName := util.GetSkyMsg(endpointIP, 0) if hostLabel, exists := getHostname(address, podHostnames); exists { endpointName = hostLabel } @@ -422,7 +410,7 @@ func (kd *KubeDNS) generateSRVRecordValue(svc *kapi.Service, portNumber int, lab for _, cNameLabel := range labels { host = cNameLabel + "." + host } - recordValue, _ := getSkyMsg(host, portNumber) + recordValue, _ := util.GetSkyMsg(host, portNumber) return recordValue } @@ -455,7 +443,7 @@ func (kd *KubeDNS) newHeadlessService(service *kapi.Service) error { func (kd *KubeDNS) newExternalNameService(service *kapi.Service) { // Create a CNAME record for the service's ExternalName. // TODO: TTL? - recordValue, _ := getSkyMsg(service.Spec.ExternalName, 0) + recordValue, _ := util.GetSkyMsg(service.Spec.ExternalName, 0) cachePath := append(kd.domainPath, serviceSubdomain, service.Namespace) fqdn := kd.fqdn(service) glog.V(2).Infof("newExternalNameService: storing key %s with value %v as %s under %v", service.Name, recordValue, fqdn, cachePath) @@ -471,12 +459,15 @@ func (kd *KubeDNS) newExternalNameService(service *kapi.Service) { // the subtree matching the name are returned. func (kd *KubeDNS) Records(name string, exact bool) (retval []skymsg.Service, err error) { glog.V(2).Infof("Received DNS Request:%s, exact:%v", name, exact) + trimmed := strings.TrimRight(name, ".") segments := strings.Split(trimmed, ".") isFederationQuery := false federationSegments := []string{} + if !exact && kd.isFederationQuery(segments) { - glog.V(2).Infof("federation service query: Received federation query. Going to try to find local service first") + glog.V(2).Infof( + "federation service query: Received federation query. Going to try to find local service first") // Try quering the non-federation (local) service first. // Will try the federation one later, if this fails. isFederationQuery = true @@ -485,18 +476,24 @@ func (kd *KubeDNS) Records(name string, exact bool) (retval []skymsg.Service, er // Federation name is 3rd in the segment (after service name and namespace). segments = append(segments[:2], segments[3:]...) } - path := reverseArray(segments) + + path := util.ReverseArray(segments) records, err := kd.getRecordsForPath(path, exact) + if err != nil { return nil, err } - if !isFederationQuery { - if len(records) > 0 { - return records, nil - } - return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound} + + if isFederationQuery { + return kd.recordsForFederation(records, path, exact, federationSegments) + } else if len(records) > 0 { + return records, nil } + return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound} +} + +func (kd *KubeDNS) recordsForFederation(records []skymsg.Service, path []string, exact bool, federationSegments []string) (retval []skymsg.Service, err error) { // For federation query, verify that the local service has endpoints. validRecord := false for _, val := range records { @@ -505,7 +502,8 @@ func (kd *KubeDNS) Records(name string, exact bool) (retval []skymsg.Service, er if !kd.isHeadlessServiceRecord(&val) { ok, err := kd.serviceWithClusterIPHasEndpoints(&val) if err != nil { - glog.V(2).Infof("federation service query: unexpected error while trying to find if service has endpoint: %v", err) + glog.V(2).Infof( + "federation service query: unexpected error while trying to find if service has endpoint: %v", err) continue } if !ok { @@ -516,35 +514,36 @@ func (kd *KubeDNS) Records(name string, exact bool) (retval []skymsg.Service, er validRecord = true break } + if validRecord { // There is a local service with valid endpoints, return its CNAME. - name := strings.Join(reverseArray(path), ".") + name := strings.Join(util.ReverseArray(path), ".") // Ensure that this name that we are returning as a CNAME response is a fully qualified // domain name so that the client's resolver library doesn't have to go through its // search list all over again. if !strings.HasSuffix(name, ".") { name = name + "." } - glog.Infof("federation service query: Returning CNAME for local service : %s", name) + glog.V(2).Infof("federation service query: Returning CNAME for local service : %s", name) return []skymsg.Service{{Host: name}}, nil } // If the name query is not an exact query and does not match any records in the local store, // attempt to send a federation redirect (CNAME) response. if !exact { - glog.V(2).Infof("federation service query: Did not find a local service. Trying federation redirect (CNAME) response") - return kd.federationRecords(reverseArray(federationSegments)) + glog.V(2).Infof( + "federation service query: Did not find a local service. Trying federation redirect (CNAME) response") + return kd.federationRecords(util.ReverseArray(federationSegments)) } return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound} } func (kd *KubeDNS) getRecordsForPath(path []string, exact bool) ([]skymsg.Service, error) { - retval := []skymsg.Service{} if kd.isPodRecord(path) { ip, err := kd.getPodIP(path) if err == nil { - skyMsg, _ := getSkyMsg(ip, 0) + skyMsg, _ := util.GetSkyMsg(ip, 0) return []skymsg.Service{*skyMsg}, nil } return nil, err @@ -569,10 +568,14 @@ func (kd *KubeDNS) getRecordsForPath(path []string, exact bool) ([]skymsg.Servic defer kd.cacheLock.RUnlock() records := kd.cache.getValuesForPathWithWildcards(path...) glog.V(2).Infof("Received %d records for %v from cache", len(records), path) + + retval := []skymsg.Service{} for _, val := range records { retval = append(retval, *val) } + glog.V(2).Infof("records:%v, retval:%v, path:%v", records, retval, path) + return retval, nil } @@ -619,7 +622,7 @@ func (kd *KubeDNS) ReverseRecord(name string) (*skymsg.Service, error) { glog.V(2).Infof("Received ReverseRecord Request:%s", name) // if portalIP is not a valid IP, the reverseRecordMap lookup will fail - portalIP, ok := extractIP(name) + portalIP, ok := util.ExtractIP(name) if !ok { return nil, fmt.Errorf("does not support reverse lookup for %s", name) } @@ -633,19 +636,6 @@ func (kd *KubeDNS) ReverseRecord(name string) (*skymsg.Service, error) { return nil, fmt.Errorf("must be exactly one service record") } -// extractIP turns a standard PTR reverse record lookup name -// into an IP address -func extractIP(reverseName string) (string, bool) { - if !strings.HasSuffix(reverseName, arpaSuffix) { - return "", false - } - search := strings.TrimSuffix(reverseName, arpaSuffix) - - // reverse the segments and then combine them - segments := reverseArray(strings.Split(search, ".")) - return strings.Join(segments, "."), true -} - // e.g {"local", "cluster", "pod", "default", "10-0-0-1"} func (kd *KubeDNS) isPodRecord(path []string) bool { if len(path) != len(kd.domainPath)+3 { @@ -671,32 +661,6 @@ func (kd *KubeDNS) getPodIP(path []string) (string, error) { return "", fmt.Errorf("Invalid IP Address %v", ip) } -func hashServiceRecord(msg *skymsg.Service) string { - s := fmt.Sprintf("%v", msg) - h := fnv.New32a() - h.Write([]byte(s)) - return fmt.Sprintf("%x", h.Sum32()) -} - -func newServiceRecord(ip string, port int) *skymsg.Service { - return &skymsg.Service{ - Host: ip, - Port: port, - Priority: defaultPriority, - Weight: defaultWeight, - Ttl: defaultTTL, - } -} - -// Returns record in a format that SkyDNS understands. -// Also return the hash of the record. -func getSkyMsg(ip string, port int) (*skymsg.Service, string) { - msg := newServiceRecord(ip, port) - hash := hashServiceRecord(msg) - glog.V(2).Infof("DNS Record:%s, hash:%s", fmt.Sprintf("%v", msg), hash) - return msg, fmt.Sprintf("%x", hash) -} - // isFederationQuery checks if the given query `path` matches the federated service query pattern. // The conjunction of the following conditions forms the test for the federated service query // pattern: @@ -752,7 +716,7 @@ func (kd *KubeDNS) federationRecords(queryPath []string) ([]skymsg.Service, erro // `queryPath` is a reversed-array of the queried name, reverse it back to make it easy // to follow through this code and reduce confusion. There is no reason for it to be // reversed here. - path := reverseArray(queryPath) + path := util.ReverseArray(queryPath) // Check if the name query matches the federation query pattern. if !kd.isFederationQuery(path) { @@ -850,14 +814,7 @@ func (kd *KubeDNS) getClusterZoneAndRegion() (string, string, error) { return zone, region, nil } -func (kd *KubeDNS) getServiceFQDN(service *kapi.Service) string { - return strings.Join([]string{service.Name, service.Namespace, serviceSubdomain, kd.domain}, ".") -} - -func reverseArray(arr []string) []string { - for i := 0; i < len(arr)/2; i++ { - j := len(arr) - i - 1 - arr[i], arr[j] = arr[j], arr[i] - } - return arr +func getServiceFQDN(domain string, service *kapi.Service) string { + return strings.Join( + []string{service.Name, service.Namespace, serviceSubdomain, domain}, ".") } diff --git a/pkg/dns/dns_test.go b/pkg/dns/dns_test.go index 256aa47cc04..b0a39a57a10 100644 --- a/pkg/dns/dns_test.go +++ b/pkg/dns/dns_test.go @@ -35,6 +35,7 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/client/cache" fake "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" + "k8s.io/kubernetes/pkg/dns/util" "k8s.io/kubernetes/pkg/util/sets" ) @@ -54,7 +55,7 @@ func newKubeDNS() *KubeDNS { reverseRecordMap: make(map[string]*skymsg.Service), clusterIPServiceMap: make(map[string]*kapi.Service), cacheLock: sync.RWMutex{}, - domainPath: reverseArray(strings.Split(strings.TrimRight(testDomain, "."), ".")), + domainPath: util.ReverseArray(strings.Split(strings.TrimRight(testDomain, "."), ".")), nodesStore: cache.NewStore(cache.MetaNamespaceKeyFunc), } return kd @@ -190,7 +191,9 @@ func TestSkySimpleSRVLookup(t *testing.T) { targets := []string{} for _, eip := range endpointIPs { // A portal service is always created with a port of '0' - targets = append(targets, fmt.Sprintf("%v.%v", fmt.Sprintf("%x", hashServiceRecord(newServiceRecord(eip, 0))), name)) + targets = append(targets, + fmt.Sprintf("%x.%v", + util.HashServiceRecord(util.NewServiceRecord(eip, 0)), name)) } assertSRVRecordsMatchTarget(t, rec, targets...) } @@ -255,7 +258,8 @@ func TestSkyNamedPortSRVLookup(t *testing.T) { svcDomain := strings.Join([]string{testService, testNamespace, "svc", testDomain}, ".") assertARecordsMatchIPs(t, extra, eip) - assertSRVRecordsMatchTarget(t, rec, fmt.Sprintf("%v.%v", fmt.Sprintf("%x", hashServiceRecord(newServiceRecord(eip, 0))), svcDomain)) + assertSRVRecordsMatchTarget( + t, rec, fmt.Sprintf("%x.%v", util.HashServiceRecord(util.NewServiceRecord(eip, 0)), svcDomain)) assertSRVRecordsMatchPort(t, rec, 8081) } @@ -667,7 +671,7 @@ func assertDNSForHeadlessService(t *testing.T, kd *KubeDNS, e *kapi.Endpoints) { } func assertDNSForExternalService(t *testing.T, kd *KubeDNS, s *kapi.Service) { - records, err := kd.Records(getServiceFQDN(kd, s), false) + records, err := kd.Records(getServiceFQDN(kd.domain, s), false) require.NoError(t, err) assert.Equal(t, 1, len(records)) assert.Equal(t, testExternalName, records[0].Host) @@ -700,13 +704,13 @@ func getIPForCName(t *testing.T, kd *KubeDNS, cname string) string { } func assertNoDNSForHeadlessService(t *testing.T, kd *KubeDNS, s *kapi.Service) { - records, err := kd.Records(getServiceFQDN(kd, s), false) + records, err := kd.Records(getServiceFQDN(kd.domain, s), false) require.Error(t, err) assert.Equal(t, 0, len(records)) } func assertNoDNSForExternalService(t *testing.T, kd *KubeDNS, s *kapi.Service) { - records, err := kd.Records(getServiceFQDN(kd, s), false) + records, err := kd.Records(getServiceFQDN(kd.domain, s), false) require.Error(t, err) assert.Equal(t, 0, len(records)) } @@ -715,7 +719,7 @@ func assertSRVForNamedPort(t *testing.T, kd *KubeDNS, s *kapi.Service, portName records, err := kd.Records(getSRVFQDN(kd, s, portName), false) require.NoError(t, err) assert.Equal(t, 1, len(records)) - assert.Equal(t, getServiceFQDN(kd, s), records[0].Host) + assert.Equal(t, getServiceFQDN(kd.domain, s), records[0].Host) } func assertNoSRVForNamedPort(t *testing.T, kd *KubeDNS, s *kapi.Service, portName string) { @@ -725,7 +729,7 @@ func assertNoSRVForNamedPort(t *testing.T, kd *KubeDNS, s *kapi.Service, portNam } func assertNoDNSForClusterIP(t *testing.T, kd *KubeDNS, s *kapi.Service) { - serviceFQDN := getServiceFQDN(kd, s) + serviceFQDN := getServiceFQDN(kd.domain, s) queries := getEquivalentQueries(serviceFQDN, s.Namespace) for _, query := range queries { records, err := kd.Records(query, false) @@ -735,7 +739,7 @@ func assertNoDNSForClusterIP(t *testing.T, kd *KubeDNS, s *kapi.Service) { } func assertDNSForClusterIP(t *testing.T, kd *KubeDNS, s *kapi.Service) { - serviceFQDN := getServiceFQDN(kd, s) + serviceFQDN := getServiceFQDN(kd.domain, s) queries := getEquivalentQueries(serviceFQDN, s.Namespace) for _, query := range queries { records, err := kd.Records(query, false) @@ -746,16 +750,16 @@ func assertDNSForClusterIP(t *testing.T, kd *KubeDNS, s *kapi.Service) { } func assertReverseRecord(t *testing.T, kd *KubeDNS, s *kapi.Service) { - segments := reverseArray(strings.Split(s.Spec.ClusterIP, ".")) - reverseLookup := fmt.Sprintf("%s%s", strings.Join(segments, "."), arpaSuffix) + segments := util.ReverseArray(strings.Split(s.Spec.ClusterIP, ".")) + reverseLookup := fmt.Sprintf("%s%s", strings.Join(segments, "."), util.ArpaSuffix) reverseRecord, err := kd.ReverseRecord(reverseLookup) require.NoError(t, err) - assert.Equal(t, kd.getServiceFQDN(s), reverseRecord.Host) + assert.Equal(t, getServiceFQDN(kd.domain, s), reverseRecord.Host) } func assertNoReverseRecord(t *testing.T, kd *KubeDNS, s *kapi.Service) { - segments := reverseArray(strings.Split(s.Spec.ClusterIP, ".")) - reverseLookup := fmt.Sprintf("%s%s", strings.Join(segments, "."), arpaSuffix) + segments := util.ReverseArray(strings.Split(s.Spec.ClusterIP, ".")) + reverseLookup := fmt.Sprintf("%s%s", strings.Join(segments, "."), util.ArpaSuffix) reverseRecord, err := kd.ReverseRecord(reverseLookup) require.Error(t, err) require.Nil(t, reverseRecord) @@ -775,10 +779,6 @@ func getFederationServiceFQDN(kd *KubeDNS, s *kapi.Service, federationName strin return fmt.Sprintf("%s.%s.%s.svc.%s", s.Name, s.Namespace, federationName, kd.domain) } -func getServiceFQDN(kd *KubeDNS, s *kapi.Service) string { - return fmt.Sprintf("%s.%s.svc.%s", s.Name, s.Namespace, kd.domain) -} - func getEndpointsFQDN(kd *KubeDNS, e *kapi.Endpoints) string { return fmt.Sprintf("%s.%s.svc.%s", e.Name, e.Namespace, kd.domain) } diff --git a/pkg/dns/util/util.go b/pkg/dns/util/util.go new file mode 100644 index 00000000000..a8f92b3c720 --- /dev/null +++ b/pkg/dns/util/util.go @@ -0,0 +1,88 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "fmt" + "hash/fnv" + "strings" + + "github.com/golang/glog" + "github.com/skynetservices/skydns/msg" +) + +const ( + // ArpaSuffix is the standard suffix for PTR IP reverse lookups. + ArpaSuffix = ".in-addr.arpa." + // defaultPriority used for service records + defaultPriority = 10 + // defaultWeight used for service records + defaultWeight = 10 + // defaultTTL used for service records + defaultTTL = 30 +) + +// extractIP turns a standard PTR reverse record lookup name +// into an IP address +func ExtractIP(reverseName string) (string, bool) { + if !strings.HasSuffix(reverseName, ArpaSuffix) { + return "", false + } + search := strings.TrimSuffix(reverseName, ArpaSuffix) + + // reverse the segments and then combine them + segments := ReverseArray(strings.Split(search, ".")) + return strings.Join(segments, "."), true +} + +// ReverseArray reverses an array. +func ReverseArray(arr []string) []string { + for i := 0; i < len(arr)/2; i++ { + j := len(arr) - i - 1 + arr[i], arr[j] = arr[j], arr[i] + } + return arr +} + +// Returns record in a format that SkyDNS understands. +// Also return the hash of the record. +func GetSkyMsg(ip string, port int) (*msg.Service, string) { + msg := NewServiceRecord(ip, port) + hash := HashServiceRecord(msg) + glog.V(2).Infof("DNS Record:%s, hash:%s", fmt.Sprintf("%v", msg), hash) + return msg, fmt.Sprintf("%x", hash) +} + +// NewServiceRecord creates a new service DNS message. +func NewServiceRecord(ip string, port int) *msg.Service { + return &msg.Service{ + Host: ip, + Port: port, + Priority: defaultPriority, + Weight: defaultWeight, + Ttl: defaultTTL, + } +} + +// HashServiceRecord hashes the string representation of a DNS +// message. +func HashServiceRecord(msg *msg.Service) string { + s := fmt.Sprintf("%v", msg) + h := fnv.New32a() + h.Write([]byte(s)) + return fmt.Sprintf("%x", h.Sum32()) +}