Moves some common functions into package dns/util

This commit is contained in:
Bowei Du 2016-10-20 16:32:03 -07:00
parent ffed9c6f93
commit 28cd4c58e3
3 changed files with 115 additions and 51 deletions

View File

@ -50,9 +50,6 @@ const (
// A subdomain added to the user specified dmoain for all pods.
podSubdomain = "pod"
// arpaSuffix is the standard suffix for PTR IP reverse lookups.
arpaSuffix = ".in-addr.arpa."
// Resync period for the kube controller loop.
resyncPeriod = 5 * time.Minute
@ -133,7 +130,7 @@ func NewKubeDNS(client clientset.Interface, domain string, federations map[strin
nodesStore: kcache.NewStore(kcache.MetaNamespaceKeyFunc),
reverseRecordMap: make(map[string]*skymsg.Service),
clusterIPServiceMap: make(map[string]*kapi.Service),
domainPath: reverseArray(strings.Split(strings.TrimRight(domain, "."), ".")),
domainPath: util.ReverseArray(strings.Split(strings.TrimRight(domain, "."), ".")),
federations: federations,
}
kd.setEndpointsStore()
@ -320,7 +317,7 @@ func (kd *KubeDNS) getServiceFromEndpoints(e *kapi.Endpoints) (*kapi.Service, er
// elements rooted at the given service, ending at a service record.
func (kd *KubeDNS) fqdn(service *kapi.Service, subpaths ...string) string {
domainLabels := append(append(kd.domainPath, serviceSubdomain, service.Namespace, service.Name), subpaths...)
return dns.Fqdn(strings.Join(reverseArray(domainLabels), "."))
return dns.Fqdn(strings.Join(util.ReverseArray(domainLabels), "."))
}
func (kd *KubeDNS) newPortalService(service *kapi.Service) {
@ -339,7 +336,7 @@ func (kd *KubeDNS) newPortalService(service *kapi.Service) {
}
}
subCachePath := append(kd.domainPath, serviceSubdomain, service.Namespace)
host := kd.getServiceFQDN(service)
host := getServiceFQDN(kd.domain, service)
reverseRecord, _ := util.GetSkyMsg(host, 0)
kd.cacheLock.Lock()
@ -480,7 +477,7 @@ func (kd *KubeDNS) Records(name string, exact bool) (retval []skymsg.Service, er
segments = append(segments[:2], segments[3:]...)
}
path := reverseArray(segments)
path := util.ReverseArray(segments)
records, err := kd.getRecordsForPath(path, exact)
if err != nil {
@ -520,7 +517,7 @@ func (kd *KubeDNS) recordsForFederation(records []skymsg.Service, path []string,
if validRecord {
// There is a local service with valid endpoints, return its CNAME.
name := strings.Join(reverseArray(path), ".")
name := strings.Join(util.ReverseArray(path), ".")
// Ensure that this name that we are returning as a CNAME response is a fully qualified
// domain name so that the client's resolver library doesn't have to go through its
// search list all over again.
@ -536,14 +533,13 @@ func (kd *KubeDNS) recordsForFederation(records []skymsg.Service, path []string,
if !exact {
glog.V(2).Infof(
"federation service query: Did not find a local service. Trying federation redirect (CNAME) response")
return kd.federationRecords(reverseArray(federationSegments))
return kd.federationRecords(util.ReverseArray(federationSegments))
}
return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound}
}
func (kd *KubeDNS) getRecordsForPath(path []string, exact bool) ([]skymsg.Service, error) {
retval := []skymsg.Service{}
if kd.isPodRecord(path) {
ip, err := kd.getPodIP(path)
if err == nil {
@ -572,10 +568,14 @@ func (kd *KubeDNS) getRecordsForPath(path []string, exact bool) ([]skymsg.Servic
defer kd.cacheLock.RUnlock()
records := kd.cache.getValuesForPathWithWildcards(path...)
glog.V(2).Infof("Received %d records for %v from cache", len(records), path)
retval := []skymsg.Service{}
for _, val := range records {
retval = append(retval, *val)
}
glog.V(2).Infof("records:%v, retval:%v, path:%v", records, retval, path)
return retval, nil
}
@ -622,7 +622,7 @@ func (kd *KubeDNS) ReverseRecord(name string) (*skymsg.Service, error) {
glog.V(2).Infof("Received ReverseRecord Request:%s", name)
// if portalIP is not a valid IP, the reverseRecordMap lookup will fail
portalIP, ok := extractIP(name)
portalIP, ok := util.ExtractIP(name)
if !ok {
return nil, fmt.Errorf("does not support reverse lookup for %s", name)
}
@ -636,19 +636,6 @@ func (kd *KubeDNS) ReverseRecord(name string) (*skymsg.Service, error) {
return nil, fmt.Errorf("must be exactly one service record")
}
// extractIP turns a standard PTR reverse record lookup name
// into an IP address
func extractIP(reverseName string) (string, bool) {
if !strings.HasSuffix(reverseName, arpaSuffix) {
return "", false
}
search := strings.TrimSuffix(reverseName, arpaSuffix)
// reverse the segments and then combine them
segments := reverseArray(strings.Split(search, "."))
return strings.Join(segments, "."), true
}
// e.g {"local", "cluster", "pod", "default", "10-0-0-1"}
func (kd *KubeDNS) isPodRecord(path []string) bool {
if len(path) != len(kd.domainPath)+3 {
@ -729,7 +716,7 @@ func (kd *KubeDNS) federationRecords(queryPath []string) ([]skymsg.Service, erro
// `queryPath` is a reversed-array of the queried name, reverse it back to make it easy
// to follow through this code and reduce confusion. There is no reason for it to be
// reversed here.
path := reverseArray(queryPath)
path := util.ReverseArray(queryPath)
// Check if the name query matches the federation query pattern.
if !kd.isFederationQuery(path) {
@ -827,14 +814,7 @@ func (kd *KubeDNS) getClusterZoneAndRegion() (string, string, error) {
return zone, region, nil
}
func (kd *KubeDNS) getServiceFQDN(service *kapi.Service) string {
return strings.Join([]string{service.Name, service.Namespace, serviceSubdomain, kd.domain}, ".")
}
func reverseArray(arr []string) []string {
for i := 0; i < len(arr)/2; i++ {
j := len(arr) - i - 1
arr[i], arr[j] = arr[j], arr[i]
}
return arr
func getServiceFQDN(domain string, service *kapi.Service) string {
return strings.Join(
[]string{service.Name, service.Namespace, serviceSubdomain, domain}, ".")
}

View File

@ -55,7 +55,7 @@ func newKubeDNS() *KubeDNS {
reverseRecordMap: make(map[string]*skymsg.Service),
clusterIPServiceMap: make(map[string]*kapi.Service),
cacheLock: sync.RWMutex{},
domainPath: reverseArray(strings.Split(strings.TrimRight(testDomain, "."), ".")),
domainPath: util.ReverseArray(strings.Split(strings.TrimRight(testDomain, "."), ".")),
nodesStore: cache.NewStore(cache.MetaNamespaceKeyFunc),
}
return kd
@ -671,7 +671,7 @@ func assertDNSForHeadlessService(t *testing.T, kd *KubeDNS, e *kapi.Endpoints) {
}
func assertDNSForExternalService(t *testing.T, kd *KubeDNS, s *kapi.Service) {
records, err := kd.Records(getServiceFQDN(kd, s), false)
records, err := kd.Records(getServiceFQDN(kd.domain, s), false)
require.NoError(t, err)
assert.Equal(t, 1, len(records))
assert.Equal(t, testExternalName, records[0].Host)
@ -704,13 +704,13 @@ func getIPForCName(t *testing.T, kd *KubeDNS, cname string) string {
}
func assertNoDNSForHeadlessService(t *testing.T, kd *KubeDNS, s *kapi.Service) {
records, err := kd.Records(getServiceFQDN(kd, s), false)
records, err := kd.Records(getServiceFQDN(kd.domain, s), false)
require.Error(t, err)
assert.Equal(t, 0, len(records))
}
func assertNoDNSForExternalService(t *testing.T, kd *KubeDNS, s *kapi.Service) {
records, err := kd.Records(getServiceFQDN(kd, s), false)
records, err := kd.Records(getServiceFQDN(kd.domain, s), false)
require.Error(t, err)
assert.Equal(t, 0, len(records))
}
@ -719,7 +719,7 @@ func assertSRVForNamedPort(t *testing.T, kd *KubeDNS, s *kapi.Service, portName
records, err := kd.Records(getSRVFQDN(kd, s, portName), false)
require.NoError(t, err)
assert.Equal(t, 1, len(records))
assert.Equal(t, getServiceFQDN(kd, s), records[0].Host)
assert.Equal(t, getServiceFQDN(kd.domain, s), records[0].Host)
}
func assertNoSRVForNamedPort(t *testing.T, kd *KubeDNS, s *kapi.Service, portName string) {
@ -729,7 +729,7 @@ func assertNoSRVForNamedPort(t *testing.T, kd *KubeDNS, s *kapi.Service, portNam
}
func assertNoDNSForClusterIP(t *testing.T, kd *KubeDNS, s *kapi.Service) {
serviceFQDN := getServiceFQDN(kd, s)
serviceFQDN := getServiceFQDN(kd.domain, s)
queries := getEquivalentQueries(serviceFQDN, s.Namespace)
for _, query := range queries {
records, err := kd.Records(query, false)
@ -739,7 +739,7 @@ func assertNoDNSForClusterIP(t *testing.T, kd *KubeDNS, s *kapi.Service) {
}
func assertDNSForClusterIP(t *testing.T, kd *KubeDNS, s *kapi.Service) {
serviceFQDN := getServiceFQDN(kd, s)
serviceFQDN := getServiceFQDN(kd.domain, s)
queries := getEquivalentQueries(serviceFQDN, s.Namespace)
for _, query := range queries {
records, err := kd.Records(query, false)
@ -750,16 +750,16 @@ func assertDNSForClusterIP(t *testing.T, kd *KubeDNS, s *kapi.Service) {
}
func assertReverseRecord(t *testing.T, kd *KubeDNS, s *kapi.Service) {
segments := reverseArray(strings.Split(s.Spec.ClusterIP, "."))
reverseLookup := fmt.Sprintf("%s%s", strings.Join(segments, "."), arpaSuffix)
segments := util.ReverseArray(strings.Split(s.Spec.ClusterIP, "."))
reverseLookup := fmt.Sprintf("%s%s", strings.Join(segments, "."), util.ArpaSuffix)
reverseRecord, err := kd.ReverseRecord(reverseLookup)
require.NoError(t, err)
assert.Equal(t, kd.getServiceFQDN(s), reverseRecord.Host)
assert.Equal(t, getServiceFQDN(kd.domain, s), reverseRecord.Host)
}
func assertNoReverseRecord(t *testing.T, kd *KubeDNS, s *kapi.Service) {
segments := reverseArray(strings.Split(s.Spec.ClusterIP, "."))
reverseLookup := fmt.Sprintf("%s%s", strings.Join(segments, "."), arpaSuffix)
segments := util.ReverseArray(strings.Split(s.Spec.ClusterIP, "."))
reverseLookup := fmt.Sprintf("%s%s", strings.Join(segments, "."), util.ArpaSuffix)
reverseRecord, err := kd.ReverseRecord(reverseLookup)
require.Error(t, err)
require.Nil(t, reverseRecord)
@ -779,10 +779,6 @@ func getFederationServiceFQDN(kd *KubeDNS, s *kapi.Service, federationName strin
return fmt.Sprintf("%s.%s.%s.svc.%s", s.Name, s.Namespace, federationName, kd.domain)
}
func getServiceFQDN(kd *KubeDNS, s *kapi.Service) string {
return fmt.Sprintf("%s.%s.svc.%s", s.Name, s.Namespace, kd.domain)
}
func getEndpointsFQDN(kd *KubeDNS, e *kapi.Endpoints) string {
return fmt.Sprintf("%s.%s.svc.%s", e.Name, e.Namespace, kd.domain)
}

88
pkg/dns/util/util.go Normal file
View File

@ -0,0 +1,88 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"hash/fnv"
"strings"
"github.com/golang/glog"
"github.com/skynetservices/skydns/msg"
)
const (
// ArpaSuffix is the standard suffix for PTR IP reverse lookups.
ArpaSuffix = ".in-addr.arpa."
// defaultPriority used for service records
defaultPriority = 10
// defaultWeight used for service records
defaultWeight = 10
// defaultTTL used for service records
defaultTTL = 30
)
// extractIP turns a standard PTR reverse record lookup name
// into an IP address
func ExtractIP(reverseName string) (string, bool) {
if !strings.HasSuffix(reverseName, ArpaSuffix) {
return "", false
}
search := strings.TrimSuffix(reverseName, ArpaSuffix)
// reverse the segments and then combine them
segments := ReverseArray(strings.Split(search, "."))
return strings.Join(segments, "."), true
}
// ReverseArray reverses an array.
func ReverseArray(arr []string) []string {
for i := 0; i < len(arr)/2; i++ {
j := len(arr) - i - 1
arr[i], arr[j] = arr[j], arr[i]
}
return arr
}
// Returns record in a format that SkyDNS understands.
// Also return the hash of the record.
func GetSkyMsg(ip string, port int) (*msg.Service, string) {
msg := NewServiceRecord(ip, port)
hash := HashServiceRecord(msg)
glog.V(2).Infof("DNS Record:%s, hash:%s", fmt.Sprintf("%v", msg), hash)
return msg, fmt.Sprintf("%x", hash)
}
// NewServiceRecord creates a new service DNS message.
func NewServiceRecord(ip string, port int) *msg.Service {
return &msg.Service{
Host: ip,
Port: port,
Priority: defaultPriority,
Weight: defaultWeight,
Ttl: defaultTTL,
}
}
// HashServiceRecord hashes the string representation of a DNS
// message.
func HashServiceRecord(msg *msg.Service) string {
s := fmt.Sprintf("%v", msg)
h := fnv.New32a()
h.Write([]byte(s))
return fmt.Sprintf("%x", h.Sum32())
}