Merge pull request #35246 from bowei/kubedns-logging

Automatic merge from submit-queue

Refactor some functions in kube-dns to reduce surface area

- Moves federation query path out to its own method
- Creates dns/util and moves some trivial methods to that package

This is just moving of code.
This commit is contained in:
Kubernetes Submit Queue 2016-10-22 23:15:25 -07:00 committed by GitHub
commit 432bbb5d5a
3 changed files with 149 additions and 104 deletions

View File

@ -19,7 +19,6 @@ package dns
import (
"encoding/json"
"fmt"
"hash/fnv"
"net"
"strings"
"sync"
@ -33,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
kcache "k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/dns/util"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/validation"
"k8s.io/kubernetes/pkg/util/wait"
@ -50,9 +50,6 @@ const (
// A subdomain added to the user specified dmoain for all pods.
podSubdomain = "pod"
// arpaSuffix is the standard suffix for PTR IP reverse lookups.
arpaSuffix = ".in-addr.arpa."
// Resync period for the kube controller loop.
resyncPeriod = 5 * time.Minute
@ -62,15 +59,6 @@ const (
// never change. So we expire the cache and retrieve a node once every 180 seconds.
// The value is chosen to be neither too long nor too short.
nodeCacheTTL = 180 * time.Second
// default priority used for service records
defaultPriority = 10
// default weight used for service records
defaultWeight = 10
// default TTL used for service records
defaultTTL = 30
)
type KubeDNS struct {
@ -142,7 +130,7 @@ func NewKubeDNS(client clientset.Interface, domain string, federations map[strin
nodesStore: kcache.NewStore(kcache.MetaNamespaceKeyFunc),
reverseRecordMap: make(map[string]*skymsg.Service),
clusterIPServiceMap: make(map[string]*kapi.Service),
domainPath: reverseArray(strings.Split(strings.TrimRight(domain, "."), ".")),
domainPath: util.ReverseArray(strings.Split(strings.TrimRight(domain, "."), ".")),
federations: federations,
}
kd.setEndpointsStore()
@ -329,12 +317,12 @@ func (kd *KubeDNS) getServiceFromEndpoints(e *kapi.Endpoints) (*kapi.Service, er
// elements rooted at the given service, ending at a service record.
func (kd *KubeDNS) fqdn(service *kapi.Service, subpaths ...string) string {
domainLabels := append(append(kd.domainPath, serviceSubdomain, service.Namespace, service.Name), subpaths...)
return dns.Fqdn(strings.Join(reverseArray(domainLabels), "."))
return dns.Fqdn(strings.Join(util.ReverseArray(domainLabels), "."))
}
func (kd *KubeDNS) newPortalService(service *kapi.Service) {
subCache := NewTreeCache()
recordValue, recordLabel := getSkyMsg(service.Spec.ClusterIP, 0)
recordValue, recordLabel := util.GetSkyMsg(service.Spec.ClusterIP, 0)
subCache.setEntry(recordLabel, recordValue, kd.fqdn(service, recordLabel))
// Generate SRV Records
@ -348,8 +336,8 @@ func (kd *KubeDNS) newPortalService(service *kapi.Service) {
}
}
subCachePath := append(kd.domainPath, serviceSubdomain, service.Namespace)
host := kd.getServiceFQDN(service)
reverseRecord, _ := getSkyMsg(host, 0)
host := getServiceFQDN(kd.domain, service)
reverseRecord, _ := util.GetSkyMsg(host, 0)
kd.cacheLock.Lock()
defer kd.cacheLock.Unlock()
@ -370,7 +358,7 @@ func (kd *KubeDNS) generateRecordsForHeadlessService(e *kapi.Endpoints, svc *kap
for subIdx := range e.Subsets[idx].Addresses {
address := &e.Subsets[idx].Addresses[subIdx]
endpointIP := address.IP
recordValue, endpointName := getSkyMsg(endpointIP, 0)
recordValue, endpointName := util.GetSkyMsg(endpointIP, 0)
if hostLabel, exists := getHostname(address, podHostnames); exists {
endpointName = hostLabel
}
@ -422,7 +410,7 @@ func (kd *KubeDNS) generateSRVRecordValue(svc *kapi.Service, portNumber int, lab
for _, cNameLabel := range labels {
host = cNameLabel + "." + host
}
recordValue, _ := getSkyMsg(host, portNumber)
recordValue, _ := util.GetSkyMsg(host, portNumber)
return recordValue
}
@ -455,7 +443,7 @@ func (kd *KubeDNS) newHeadlessService(service *kapi.Service) error {
func (kd *KubeDNS) newExternalNameService(service *kapi.Service) {
// Create a CNAME record for the service's ExternalName.
// TODO: TTL?
recordValue, _ := getSkyMsg(service.Spec.ExternalName, 0)
recordValue, _ := util.GetSkyMsg(service.Spec.ExternalName, 0)
cachePath := append(kd.domainPath, serviceSubdomain, service.Namespace)
fqdn := kd.fqdn(service)
glog.V(2).Infof("newExternalNameService: storing key %s with value %v as %s under %v", service.Name, recordValue, fqdn, cachePath)
@ -471,12 +459,15 @@ func (kd *KubeDNS) newExternalNameService(service *kapi.Service) {
// the subtree matching the name are returned.
func (kd *KubeDNS) Records(name string, exact bool) (retval []skymsg.Service, err error) {
glog.V(2).Infof("Received DNS Request:%s, exact:%v", name, exact)
trimmed := strings.TrimRight(name, ".")
segments := strings.Split(trimmed, ".")
isFederationQuery := false
federationSegments := []string{}
if !exact && kd.isFederationQuery(segments) {
glog.V(2).Infof("federation service query: Received federation query. Going to try to find local service first")
glog.V(2).Infof(
"federation service query: Received federation query. Going to try to find local service first")
// Try quering the non-federation (local) service first.
// Will try the federation one later, if this fails.
isFederationQuery = true
@ -485,18 +476,24 @@ func (kd *KubeDNS) Records(name string, exact bool) (retval []skymsg.Service, er
// Federation name is 3rd in the segment (after service name and namespace).
segments = append(segments[:2], segments[3:]...)
}
path := reverseArray(segments)
path := util.ReverseArray(segments)
records, err := kd.getRecordsForPath(path, exact)
if err != nil {
return nil, err
}
if !isFederationQuery {
if len(records) > 0 {
return records, nil
}
return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound}
if isFederationQuery {
return kd.recordsForFederation(records, path, exact, federationSegments)
} else if len(records) > 0 {
return records, nil
}
return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound}
}
func (kd *KubeDNS) recordsForFederation(records []skymsg.Service, path []string, exact bool, federationSegments []string) (retval []skymsg.Service, err error) {
// For federation query, verify that the local service has endpoints.
validRecord := false
for _, val := range records {
@ -505,7 +502,8 @@ func (kd *KubeDNS) Records(name string, exact bool) (retval []skymsg.Service, er
if !kd.isHeadlessServiceRecord(&val) {
ok, err := kd.serviceWithClusterIPHasEndpoints(&val)
if err != nil {
glog.V(2).Infof("federation service query: unexpected error while trying to find if service has endpoint: %v", err)
glog.V(2).Infof(
"federation service query: unexpected error while trying to find if service has endpoint: %v", err)
continue
}
if !ok {
@ -516,35 +514,36 @@ func (kd *KubeDNS) Records(name string, exact bool) (retval []skymsg.Service, er
validRecord = true
break
}
if validRecord {
// There is a local service with valid endpoints, return its CNAME.
name := strings.Join(reverseArray(path), ".")
name := strings.Join(util.ReverseArray(path), ".")
// Ensure that this name that we are returning as a CNAME response is a fully qualified
// domain name so that the client's resolver library doesn't have to go through its
// search list all over again.
if !strings.HasSuffix(name, ".") {
name = name + "."
}
glog.Infof("federation service query: Returning CNAME for local service : %s", name)
glog.V(2).Infof("federation service query: Returning CNAME for local service : %s", name)
return []skymsg.Service{{Host: name}}, nil
}
// If the name query is not an exact query and does not match any records in the local store,
// attempt to send a federation redirect (CNAME) response.
if !exact {
glog.V(2).Infof("federation service query: Did not find a local service. Trying federation redirect (CNAME) response")
return kd.federationRecords(reverseArray(federationSegments))
glog.V(2).Infof(
"federation service query: Did not find a local service. Trying federation redirect (CNAME) response")
return kd.federationRecords(util.ReverseArray(federationSegments))
}
return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound}
}
func (kd *KubeDNS) getRecordsForPath(path []string, exact bool) ([]skymsg.Service, error) {
retval := []skymsg.Service{}
if kd.isPodRecord(path) {
ip, err := kd.getPodIP(path)
if err == nil {
skyMsg, _ := getSkyMsg(ip, 0)
skyMsg, _ := util.GetSkyMsg(ip, 0)
return []skymsg.Service{*skyMsg}, nil
}
return nil, err
@ -569,10 +568,14 @@ func (kd *KubeDNS) getRecordsForPath(path []string, exact bool) ([]skymsg.Servic
defer kd.cacheLock.RUnlock()
records := kd.cache.getValuesForPathWithWildcards(path...)
glog.V(2).Infof("Received %d records for %v from cache", len(records), path)
retval := []skymsg.Service{}
for _, val := range records {
retval = append(retval, *val)
}
glog.V(2).Infof("records:%v, retval:%v, path:%v", records, retval, path)
return retval, nil
}
@ -619,7 +622,7 @@ func (kd *KubeDNS) ReverseRecord(name string) (*skymsg.Service, error) {
glog.V(2).Infof("Received ReverseRecord Request:%s", name)
// if portalIP is not a valid IP, the reverseRecordMap lookup will fail
portalIP, ok := extractIP(name)
portalIP, ok := util.ExtractIP(name)
if !ok {
return nil, fmt.Errorf("does not support reverse lookup for %s", name)
}
@ -633,19 +636,6 @@ func (kd *KubeDNS) ReverseRecord(name string) (*skymsg.Service, error) {
return nil, fmt.Errorf("must be exactly one service record")
}
// extractIP turns a standard PTR reverse record lookup name
// into an IP address
func extractIP(reverseName string) (string, bool) {
if !strings.HasSuffix(reverseName, arpaSuffix) {
return "", false
}
search := strings.TrimSuffix(reverseName, arpaSuffix)
// reverse the segments and then combine them
segments := reverseArray(strings.Split(search, "."))
return strings.Join(segments, "."), true
}
// e.g {"local", "cluster", "pod", "default", "10-0-0-1"}
func (kd *KubeDNS) isPodRecord(path []string) bool {
if len(path) != len(kd.domainPath)+3 {
@ -671,32 +661,6 @@ func (kd *KubeDNS) getPodIP(path []string) (string, error) {
return "", fmt.Errorf("Invalid IP Address %v", ip)
}
func hashServiceRecord(msg *skymsg.Service) string {
s := fmt.Sprintf("%v", msg)
h := fnv.New32a()
h.Write([]byte(s))
return fmt.Sprintf("%x", h.Sum32())
}
func newServiceRecord(ip string, port int) *skymsg.Service {
return &skymsg.Service{
Host: ip,
Port: port,
Priority: defaultPriority,
Weight: defaultWeight,
Ttl: defaultTTL,
}
}
// Returns record in a format that SkyDNS understands.
// Also return the hash of the record.
func getSkyMsg(ip string, port int) (*skymsg.Service, string) {
msg := newServiceRecord(ip, port)
hash := hashServiceRecord(msg)
glog.V(2).Infof("DNS Record:%s, hash:%s", fmt.Sprintf("%v", msg), hash)
return msg, fmt.Sprintf("%x", hash)
}
// isFederationQuery checks if the given query `path` matches the federated service query pattern.
// The conjunction of the following conditions forms the test for the federated service query
// pattern:
@ -752,7 +716,7 @@ func (kd *KubeDNS) federationRecords(queryPath []string) ([]skymsg.Service, erro
// `queryPath` is a reversed-array of the queried name, reverse it back to make it easy
// to follow through this code and reduce confusion. There is no reason for it to be
// reversed here.
path := reverseArray(queryPath)
path := util.ReverseArray(queryPath)
// Check if the name query matches the federation query pattern.
if !kd.isFederationQuery(path) {
@ -850,14 +814,7 @@ func (kd *KubeDNS) getClusterZoneAndRegion() (string, string, error) {
return zone, region, nil
}
func (kd *KubeDNS) getServiceFQDN(service *kapi.Service) string {
return strings.Join([]string{service.Name, service.Namespace, serviceSubdomain, kd.domain}, ".")
}
func reverseArray(arr []string) []string {
for i := 0; i < len(arr)/2; i++ {
j := len(arr) - i - 1
arr[i], arr[j] = arr[j], arr[i]
}
return arr
func getServiceFQDN(domain string, service *kapi.Service) string {
return strings.Join(
[]string{service.Name, service.Namespace, serviceSubdomain, domain}, ".")
}

View File

@ -35,6 +35,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache"
fake "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/dns/util"
"k8s.io/kubernetes/pkg/util/sets"
)
@ -54,7 +55,7 @@ func newKubeDNS() *KubeDNS {
reverseRecordMap: make(map[string]*skymsg.Service),
clusterIPServiceMap: make(map[string]*kapi.Service),
cacheLock: sync.RWMutex{},
domainPath: reverseArray(strings.Split(strings.TrimRight(testDomain, "."), ".")),
domainPath: util.ReverseArray(strings.Split(strings.TrimRight(testDomain, "."), ".")),
nodesStore: cache.NewStore(cache.MetaNamespaceKeyFunc),
}
return kd
@ -190,7 +191,9 @@ func TestSkySimpleSRVLookup(t *testing.T) {
targets := []string{}
for _, eip := range endpointIPs {
// A portal service is always created with a port of '0'
targets = append(targets, fmt.Sprintf("%v.%v", fmt.Sprintf("%x", hashServiceRecord(newServiceRecord(eip, 0))), name))
targets = append(targets,
fmt.Sprintf("%x.%v",
util.HashServiceRecord(util.NewServiceRecord(eip, 0)), name))
}
assertSRVRecordsMatchTarget(t, rec, targets...)
}
@ -255,7 +258,8 @@ func TestSkyNamedPortSRVLookup(t *testing.T) {
svcDomain := strings.Join([]string{testService, testNamespace, "svc", testDomain}, ".")
assertARecordsMatchIPs(t, extra, eip)
assertSRVRecordsMatchTarget(t, rec, fmt.Sprintf("%v.%v", fmt.Sprintf("%x", hashServiceRecord(newServiceRecord(eip, 0))), svcDomain))
assertSRVRecordsMatchTarget(
t, rec, fmt.Sprintf("%x.%v", util.HashServiceRecord(util.NewServiceRecord(eip, 0)), svcDomain))
assertSRVRecordsMatchPort(t, rec, 8081)
}
@ -667,7 +671,7 @@ func assertDNSForHeadlessService(t *testing.T, kd *KubeDNS, e *kapi.Endpoints) {
}
func assertDNSForExternalService(t *testing.T, kd *KubeDNS, s *kapi.Service) {
records, err := kd.Records(getServiceFQDN(kd, s), false)
records, err := kd.Records(getServiceFQDN(kd.domain, s), false)
require.NoError(t, err)
assert.Equal(t, 1, len(records))
assert.Equal(t, testExternalName, records[0].Host)
@ -700,13 +704,13 @@ func getIPForCName(t *testing.T, kd *KubeDNS, cname string) string {
}
func assertNoDNSForHeadlessService(t *testing.T, kd *KubeDNS, s *kapi.Service) {
records, err := kd.Records(getServiceFQDN(kd, s), false)
records, err := kd.Records(getServiceFQDN(kd.domain, s), false)
require.Error(t, err)
assert.Equal(t, 0, len(records))
}
func assertNoDNSForExternalService(t *testing.T, kd *KubeDNS, s *kapi.Service) {
records, err := kd.Records(getServiceFQDN(kd, s), false)
records, err := kd.Records(getServiceFQDN(kd.domain, s), false)
require.Error(t, err)
assert.Equal(t, 0, len(records))
}
@ -715,7 +719,7 @@ func assertSRVForNamedPort(t *testing.T, kd *KubeDNS, s *kapi.Service, portName
records, err := kd.Records(getSRVFQDN(kd, s, portName), false)
require.NoError(t, err)
assert.Equal(t, 1, len(records))
assert.Equal(t, getServiceFQDN(kd, s), records[0].Host)
assert.Equal(t, getServiceFQDN(kd.domain, s), records[0].Host)
}
func assertNoSRVForNamedPort(t *testing.T, kd *KubeDNS, s *kapi.Service, portName string) {
@ -725,7 +729,7 @@ func assertNoSRVForNamedPort(t *testing.T, kd *KubeDNS, s *kapi.Service, portNam
}
func assertNoDNSForClusterIP(t *testing.T, kd *KubeDNS, s *kapi.Service) {
serviceFQDN := getServiceFQDN(kd, s)
serviceFQDN := getServiceFQDN(kd.domain, s)
queries := getEquivalentQueries(serviceFQDN, s.Namespace)
for _, query := range queries {
records, err := kd.Records(query, false)
@ -735,7 +739,7 @@ func assertNoDNSForClusterIP(t *testing.T, kd *KubeDNS, s *kapi.Service) {
}
func assertDNSForClusterIP(t *testing.T, kd *KubeDNS, s *kapi.Service) {
serviceFQDN := getServiceFQDN(kd, s)
serviceFQDN := getServiceFQDN(kd.domain, s)
queries := getEquivalentQueries(serviceFQDN, s.Namespace)
for _, query := range queries {
records, err := kd.Records(query, false)
@ -746,16 +750,16 @@ func assertDNSForClusterIP(t *testing.T, kd *KubeDNS, s *kapi.Service) {
}
func assertReverseRecord(t *testing.T, kd *KubeDNS, s *kapi.Service) {
segments := reverseArray(strings.Split(s.Spec.ClusterIP, "."))
reverseLookup := fmt.Sprintf("%s%s", strings.Join(segments, "."), arpaSuffix)
segments := util.ReverseArray(strings.Split(s.Spec.ClusterIP, "."))
reverseLookup := fmt.Sprintf("%s%s", strings.Join(segments, "."), util.ArpaSuffix)
reverseRecord, err := kd.ReverseRecord(reverseLookup)
require.NoError(t, err)
assert.Equal(t, kd.getServiceFQDN(s), reverseRecord.Host)
assert.Equal(t, getServiceFQDN(kd.domain, s), reverseRecord.Host)
}
func assertNoReverseRecord(t *testing.T, kd *KubeDNS, s *kapi.Service) {
segments := reverseArray(strings.Split(s.Spec.ClusterIP, "."))
reverseLookup := fmt.Sprintf("%s%s", strings.Join(segments, "."), arpaSuffix)
segments := util.ReverseArray(strings.Split(s.Spec.ClusterIP, "."))
reverseLookup := fmt.Sprintf("%s%s", strings.Join(segments, "."), util.ArpaSuffix)
reverseRecord, err := kd.ReverseRecord(reverseLookup)
require.Error(t, err)
require.Nil(t, reverseRecord)
@ -775,10 +779,6 @@ func getFederationServiceFQDN(kd *KubeDNS, s *kapi.Service, federationName strin
return fmt.Sprintf("%s.%s.%s.svc.%s", s.Name, s.Namespace, federationName, kd.domain)
}
func getServiceFQDN(kd *KubeDNS, s *kapi.Service) string {
return fmt.Sprintf("%s.%s.svc.%s", s.Name, s.Namespace, kd.domain)
}
func getEndpointsFQDN(kd *KubeDNS, e *kapi.Endpoints) string {
return fmt.Sprintf("%s.%s.svc.%s", e.Name, e.Namespace, kd.domain)
}

88
pkg/dns/util/util.go Normal file
View File

@ -0,0 +1,88 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"hash/fnv"
"strings"
"github.com/golang/glog"
"github.com/skynetservices/skydns/msg"
)
const (
// ArpaSuffix is the standard suffix for PTR IP reverse lookups.
ArpaSuffix = ".in-addr.arpa."
// defaultPriority used for service records
defaultPriority = 10
// defaultWeight used for service records
defaultWeight = 10
// defaultTTL used for service records
defaultTTL = 30
)
// extractIP turns a standard PTR reverse record lookup name
// into an IP address
func ExtractIP(reverseName string) (string, bool) {
if !strings.HasSuffix(reverseName, ArpaSuffix) {
return "", false
}
search := strings.TrimSuffix(reverseName, ArpaSuffix)
// reverse the segments and then combine them
segments := ReverseArray(strings.Split(search, "."))
return strings.Join(segments, "."), true
}
// ReverseArray reverses an array.
func ReverseArray(arr []string) []string {
for i := 0; i < len(arr)/2; i++ {
j := len(arr) - i - 1
arr[i], arr[j] = arr[j], arr[i]
}
return arr
}
// Returns record in a format that SkyDNS understands.
// Also return the hash of the record.
func GetSkyMsg(ip string, port int) (*msg.Service, string) {
msg := NewServiceRecord(ip, port)
hash := HashServiceRecord(msg)
glog.V(2).Infof("DNS Record:%s, hash:%s", fmt.Sprintf("%v", msg), hash)
return msg, fmt.Sprintf("%x", hash)
}
// NewServiceRecord creates a new service DNS message.
func NewServiceRecord(ip string, port int) *msg.Service {
return &msg.Service{
Host: ip,
Port: port,
Priority: defaultPriority,
Weight: defaultWeight,
Ttl: defaultTTL,
}
}
// HashServiceRecord hashes the string representation of a DNS
// message.
func HashServiceRecord(msg *msg.Service) string {
s := fmt.Sprintf("%v", msg)
h := fnv.New32a()
h.Write([]byte(s))
return fmt.Sprintf("%x", h.Sum32())
}