Merge pull request #26393 from bprashanth/dns

Automatic merge from submit-queue

Fix srv record lookup

Took longer than expected for unrelated reasons. This is intentionally a pretty dumb fix.
Fixes https://github.com/kubernetes/kubernetes/issues/26116
This commit is contained in:
k8s-merge-robot 2016-06-04 01:00:15 -07:00
commit 48d87a0c57
10 changed files with 272 additions and 44 deletions

View File

@ -1,2 +1,5 @@
## Version 1.2 (Fri May 27 2016 Tim Hockin <thockin@google.com>) ## Version 1.2 (Fri May 27 2016 Tim Hockin <thockin@google.com>)
- First Changelog entry - First Changelog entry
## Version 1.3 (Fri June 3 2016 Prashanth.B <beeps@google.com>)
- Fixed SRV record lookup (issue #26116)

View File

@ -22,7 +22,7 @@
# Default registry, arch and tag. This can be overwritten by arguments to make # Default registry, arch and tag. This can be overwritten by arguments to make
PLATFORM?=linux PLATFORM?=linux
ARCH?=amd64 ARCH?=amd64
TAG?=1.2 TAG?=1.3
REGISTRY?=gcr.io/google_containers REGISTRY?=gcr.io/google_containers
GOLANG_VERSION=1.6 GOLANG_VERSION=1.6

View File

@ -21,27 +21,27 @@
apiVersion: v1 apiVersion: v1
kind: ReplicationController kind: ReplicationController
metadata: metadata:
name: kube-dns-v13 name: kube-dns-v14
namespace: kube-system namespace: kube-system
labels: labels:
k8s-app: kube-dns k8s-app: kube-dns
version: v13 version: v14
kubernetes.io/cluster-service: "true" kubernetes.io/cluster-service: "true"
spec: spec:
replicas: __PILLAR__DNS__REPLICAS__ replicas: __PILLAR__DNS__REPLICAS__
selector: selector:
k8s-app: kube-dns k8s-app: kube-dns
version: v13 version: v14
template: template:
metadata: metadata:
labels: labels:
k8s-app: kube-dns k8s-app: kube-dns
version: v13 version: v14
kubernetes.io/cluster-service: "true" kubernetes.io/cluster-service: "true"
spec: spec:
containers: containers:
- name: kubedns - name: kubedns
image: gcr.io/google_containers/kubedns-amd64:1.2 image: gcr.io/google_containers/kubedns-amd64:1.3
resources: resources:
# TODO: Set memory limits when we've profiled the container for large # TODO: Set memory limits when we've profiled the container for large
# clusters, then set request = limit to keep this container in # clusters, then set request = limit to keep this container in

View File

@ -21,27 +21,27 @@
apiVersion: v1 apiVersion: v1
kind: ReplicationController kind: ReplicationController
metadata: metadata:
name: kube-dns-v13 name: kube-dns-v14
namespace: kube-system namespace: kube-system
labels: labels:
k8s-app: kube-dns k8s-app: kube-dns
version: v13 version: v14
kubernetes.io/cluster-service: "true" kubernetes.io/cluster-service: "true"
spec: spec:
replicas: {{ pillar['dns_replicas'] }} replicas: {{ pillar['dns_replicas'] }}
selector: selector:
k8s-app: kube-dns k8s-app: kube-dns
version: v13 version: v14
template: template:
metadata: metadata:
labels: labels:
k8s-app: kube-dns k8s-app: kube-dns
version: v13 version: v14
kubernetes.io/cluster-service: "true" kubernetes.io/cluster-service: "true"
spec: spec:
containers: containers:
- name: kubedns - name: kubedns
image: gcr.io/google_containers/kubedns-amd64:1.2 image: gcr.io/google_containers/kubedns-amd64:1.3
resources: resources:
# TODO: Set memory limits when we've profiled the container for large # TODO: Set memory limits when we've profiled the container for large
# clusters, then set request = limit to keep this container in # clusters, then set request = limit to keep this container in

View File

@ -21,27 +21,27 @@
apiVersion: v1 apiVersion: v1
kind: ReplicationController kind: ReplicationController
metadata: metadata:
name: kube-dns-v13 name: kube-dns-v14
namespace: kube-system namespace: kube-system
labels: labels:
k8s-app: kube-dns k8s-app: kube-dns
version: v13 version: v14
kubernetes.io/cluster-service: "true" kubernetes.io/cluster-service: "true"
spec: spec:
replicas: $DNS_REPLICAS replicas: $DNS_REPLICAS
selector: selector:
k8s-app: kube-dns k8s-app: kube-dns
version: v13 version: v14
template: template:
metadata: metadata:
labels: labels:
k8s-app: kube-dns k8s-app: kube-dns
version: v13 version: v14
kubernetes.io/cluster-service: "true" kubernetes.io/cluster-service: "true"
spec: spec:
containers: containers:
- name: kubedns - name: kubedns
image: gcr.io/google_containers/kubedns-amd64:1.2 image: gcr.io/google_containers/kubedns-amd64:1.3
resources: resources:
# TODO: Set memory limits when we've profiled the container for large # TODO: Set memory limits when we've profiled the container for large
# clusters, then set request = limit to keep this container in # clusters, then set request = limit to keep this container in

View File

@ -469,4 +469,3 @@ watch-only
whitelist-override-label whitelist-override-label
windows-line-endings windows-line-endings
www-prefix www-prefix

View File

@ -26,7 +26,7 @@ import (
"time" "time"
etcd "github.com/coreos/etcd/client" etcd "github.com/coreos/etcd/client"
"github.com/golang/glog" "github.com/miekg/dns"
skymsg "github.com/skynetservices/skydns/msg" skymsg "github.com/skynetservices/skydns/msg"
kapi "k8s.io/kubernetes/pkg/api" kapi "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/endpoints" "k8s.io/kubernetes/pkg/api/endpoints"
@ -38,6 +38,8 @@ import (
"k8s.io/kubernetes/pkg/util/validation" "k8s.io/kubernetes/pkg/util/validation"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/pkg/watch"
"github.com/golang/glog"
) )
const ( const (
@ -61,6 +63,15 @@ const (
// never change. So we expire the cache and retrieve a node once every 180 seconds. // never change. So we expire the cache and retrieve a node once every 180 seconds.
// The value is chosen to be neither too long nor too short. // The value is chosen to be neither too long nor too short.
nodeCacheTTL = 180 * time.Second nodeCacheTTL = 180 * time.Second
// default priority used for service records
defaultPriority = 10
// default weight used for service records
defaultWeight = 10
// default TTL used for service records
defaultTTL = 30
) )
type KubeDNS struct { type KubeDNS struct {
@ -213,6 +224,7 @@ func assertIsService(obj interface{}) (*kapi.Service, bool) {
func (kd *KubeDNS) newService(obj interface{}) { func (kd *KubeDNS) newService(obj interface{}) {
if service, ok := assertIsService(obj); ok { if service, ok := assertIsService(obj); ok {
glog.V(4).Infof("Add/Updated for service %v", service.Name)
// if ClusterIP is not set, a DNS entry should not be created // if ClusterIP is not set, a DNS entry should not be created
if !kapi.IsServiceIPSet(service) { if !kapi.IsServiceIPSet(service) {
kd.newHeadlessService(service) kd.newHeadlessService(service)
@ -276,17 +288,26 @@ func (kd *KubeDNS) getServiceFromEndpoints(e *kapi.Endpoints) (*kapi.Service, er
return nil, fmt.Errorf("got a non service object in services store %v", obj) return nil, fmt.Errorf("got a non service object in services store %v", obj)
} }
// fqdn constructs the fqdn for the given service. subpaths is a list of path
// elements rooted at the given service, ending at a service record.
func (kd *KubeDNS) fqdn(service *kapi.Service, subpaths ...string) string {
domainLabels := append(append(kd.domainPath, serviceSubdomain, service.Namespace, service.Name), subpaths...)
return dns.Fqdn(strings.Join(reverseArray(domainLabels), "."))
}
func (kd *KubeDNS) newPortalService(service *kapi.Service) { func (kd *KubeDNS) newPortalService(service *kapi.Service) {
subCache := NewTreeCache() subCache := NewTreeCache()
recordValue, recordLabel := getSkyMsg(service.Spec.ClusterIP, 0) recordValue, recordLabel := getSkyMsg(service.Spec.ClusterIP, 0)
subCache.setEntry(recordLabel, recordValue) subCache.setEntry(recordLabel, recordValue, kd.fqdn(service, recordLabel))
// Generate SRV Records // Generate SRV Records
for i := range service.Spec.Ports { for i := range service.Spec.Ports {
port := &service.Spec.Ports[i] port := &service.Spec.Ports[i]
if port.Name != "" && port.Protocol != "" { if port.Name != "" && port.Protocol != "" {
srvValue := kd.generateSRVRecordValue(service, int(port.Port)) srvValue := kd.generateSRVRecordValue(service, int(port.Port))
subCache.setEntry(recordLabel, srvValue, "_"+strings.ToLower(string(port.Protocol)), "_"+port.Name)
l := []string{"_" + strings.ToLower(string(port.Protocol)), "_" + port.Name}
subCache.setEntry(recordLabel, srvValue, kd.fqdn(service, append(l, recordLabel)...), l...)
} }
} }
subCachePath := append(kd.domainPath, serviceSubdomain, service.Namespace) subCachePath := append(kd.domainPath, serviceSubdomain, service.Namespace)
@ -315,12 +336,14 @@ func (kd *KubeDNS) generateRecordsForHeadlessService(e *kapi.Endpoints, svc *kap
if hostLabel, exists := getHostname(address, podHostnames); exists { if hostLabel, exists := getHostname(address, podHostnames); exists {
endpointName = hostLabel endpointName = hostLabel
} }
subCache.setEntry(endpointName, recordValue) subCache.setEntry(endpointName, recordValue, kd.fqdn(svc, endpointName))
for portIdx := range e.Subsets[idx].Ports { for portIdx := range e.Subsets[idx].Ports {
endpointPort := &e.Subsets[idx].Ports[portIdx] endpointPort := &e.Subsets[idx].Ports[portIdx]
if endpointPort.Name != "" && endpointPort.Protocol != "" { if endpointPort.Name != "" && endpointPort.Protocol != "" {
srvValue := kd.generateSRVRecordValue(svc, int(endpointPort.Port), endpointName) srvValue := kd.generateSRVRecordValue(svc, int(endpointPort.Port), endpointName)
subCache.setEntry(endpointName, srvValue, "_"+strings.ToLower(string(endpointPort.Protocol)), "_"+endpointPort.Name)
l := []string{"_" + strings.ToLower(string(endpointPort.Protocol)), "_" + endpointPort.Name}
subCache.setEntry(endpointName, srvValue, kd.fqdn(svc, append(l, endpointName)...), l...)
} }
} }
} }
@ -390,7 +413,11 @@ func (kd *KubeDNS) newHeadlessService(service *kapi.Service) error {
return nil return nil
} }
func (kd *KubeDNS) Records(name string, exact bool) ([]skymsg.Service, error) { // Records responds with DNS records that match the given name, in a format
// understood by the skydns server. If "exact" is true, a single record
// matching the given name is returned, otherwise all records stored under
// the subtree matching the name are returned.
func (kd *KubeDNS) Records(name string, exact bool) (retval []skymsg.Service, err error) {
glog.Infof("Received DNS Request:%s, exact:%v", name, exact) glog.Infof("Received DNS Request:%s, exact:%v", name, exact)
trimmed := strings.TrimRight(name, ".") trimmed := strings.TrimRight(name, ".")
segments := strings.Split(trimmed, ".") segments := strings.Split(trimmed, ".")
@ -420,9 +447,8 @@ func (kd *KubeDNS) Records(name string, exact bool) ([]skymsg.Service, error) {
kd.cacheLock.RLock() kd.cacheLock.RLock()
defer kd.cacheLock.RUnlock() defer kd.cacheLock.RUnlock()
records := kd.cache.getValuesForPathWithWildcards(path...) records := kd.cache.getValuesForPathWithWildcards(path...)
retval := []skymsg.Service{}
for _, val := range records { for _, val := range records {
retval = append(retval, *(val.(*skymsg.Service))) retval = append(retval, *val)
} }
glog.Infof("records:%v, retval:%v, path:%v", records, retval, path) glog.Infof("records:%v, retval:%v, path:%v", records, retval, path)
if len(retval) > 0 { if len(retval) > 0 {
@ -438,6 +464,7 @@ func (kd *KubeDNS) Records(name string, exact bool) ([]skymsg.Service, error) {
return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound} return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound}
} }
// ReverseRecords performs a reverse lookup for the given name.
func (kd *KubeDNS) ReverseRecord(name string) (*skymsg.Service, error) { func (kd *KubeDNS) ReverseRecord(name string) (*skymsg.Service, error) {
glog.Infof("Received ReverseRecord Request:%s", name) glog.Infof("Received ReverseRecord Request:%s", name)
@ -494,21 +521,29 @@ func (kd *KubeDNS) getPodIP(path []string) (string, error) {
return "", fmt.Errorf("Invalid IP Address %v", ip) return "", fmt.Errorf("Invalid IP Address %v", ip)
} }
// Returns record in a format that SkyDNS understands. func hashServiceRecord(msg *skymsg.Service) string {
// Also return the hash of the record.
func getSkyMsg(ip string, port int) (*skymsg.Service, string) {
msg := &skymsg.Service{
Host: ip,
Port: port,
Priority: 10,
Weight: 10,
Ttl: 30,
}
s := fmt.Sprintf("%v", msg) s := fmt.Sprintf("%v", msg)
h := fnv.New32a() h := fnv.New32a()
h.Write([]byte(s)) h.Write([]byte(s))
hash := fmt.Sprintf("%x", h.Sum32()) return fmt.Sprintf("%x", h.Sum32())
glog.Infof("DNS Record:%s, hash:%s", s, hash) }
func newServiceRecord(ip string, port int) *skymsg.Service {
return &skymsg.Service{
Host: ip,
Port: port,
Priority: defaultPriority,
Weight: defaultWeight,
Ttl: defaultTTL,
}
}
// Returns record in a format that SkyDNS understands.
// Also return the hash of the record.
func getSkyMsg(ip string, port int) (*skymsg.Service, string) {
msg := newServiceRecord(ip, port)
hash := hashServiceRecord(msg)
glog.Infof("DNS Record:%s, hash:%s", fmt.Sprintf("%v", msg), hash)
return msg, fmt.Sprintf("%x", hash) return msg, fmt.Sprintf("%x", hash)
} }

View File

@ -17,6 +17,7 @@ limitations under the License.
package dns package dns
import ( import (
"encoding/json"
"fmt" "fmt"
"net" "net"
"strings" "strings"
@ -24,13 +25,17 @@ import (
"testing" "testing"
etcd "github.com/coreos/etcd/client" etcd "github.com/coreos/etcd/client"
"github.com/miekg/dns"
skymsg "github.com/skynetservices/skydns/msg" skymsg "github.com/skynetservices/skydns/msg"
skyServer "github.com/skynetservices/skydns/server"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
kapi "k8s.io/kubernetes/pkg/api" kapi "k8s.io/kubernetes/pkg/api"
endpointsapi "k8s.io/kubernetes/pkg/api/endpoints"
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/cache"
fake "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" fake "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/util/sets"
) )
const ( const (
@ -106,7 +111,145 @@ func TestNamedSinglePortService(t *testing.T) {
assertNoSRVForNamedPort(t, kd, s, portName2) assertNoSRVForNamedPort(t, kd, s, portName2)
} }
func TestHeadlessService(t *testing.T) { func assertARecordsMatchIPs(t *testing.T, records []dns.RR, ips ...string) {
expectedEndpoints := sets.NewString(ips...)
gotEndpoints := sets.NewString()
for _, r := range records {
if a, ok := r.(*dns.A); !ok {
t.Errorf("Expected A record, got %+v", a)
} else {
gotEndpoints.Insert(a.A.String())
}
}
if !gotEndpoints.Equal(expectedEndpoints) {
t.Errorf("Expected %v got %v", expectedEndpoints, gotEndpoints)
}
}
func assertSRVRecordsMatchTarget(t *testing.T, records []dns.RR, targets ...string) {
expectedTargets := sets.NewString(targets...)
gotTargets := sets.NewString()
for _, r := range records {
if srv, ok := r.(*dns.SRV); !ok {
t.Errorf("Expected SRV record, got %+v", srv)
} else {
gotTargets.Insert(srv.Target)
}
}
if !gotTargets.Equal(expectedTargets) {
t.Errorf("Expected %v got %v", expectedTargets, gotTargets)
}
}
func assertSRVRecordsMatchPort(t *testing.T, records []dns.RR, port ...int) {
expectedPorts := sets.NewInt(port...)
gotPorts := sets.NewInt()
for _, r := range records {
if srv, ok := r.(*dns.SRV); !ok {
t.Errorf("Expected SRV record, got %+v", srv)
} else {
gotPorts.Insert(int(srv.Port))
t.Logf("got %+v", srv)
}
}
if !gotPorts.Equal(expectedPorts) {
t.Errorf("Expected %v got %v", expectedPorts, gotPorts)
}
}
func TestSkySimpleSRVLookup(t *testing.T) {
kd := newKubeDNS()
skydnsConfig := &skyServer.Config{Domain: testDomain, DnsAddr: "0.0.0.0:53"}
skyServer.SetDefaults(skydnsConfig)
s := skyServer.New(kd, skydnsConfig)
service := newHeadlessService()
endpointIPs := []string{"10.0.0.1", "10.0.0.2"}
endpoints := newEndpoints(service, newSubsetWithOnePort("", 80, endpointIPs...))
assert.NoError(t, kd.endpointsStore.Add(endpoints))
kd.newService(service)
name := strings.Join([]string{testService, testNamespace, "svc", testDomain}, ".")
question := dns.Question{Name: name, Qtype: dns.TypeSRV, Qclass: dns.ClassINET}
rec, extra, err := s.SRVRecords(question, name, 512, false)
if err != nil {
t.Fatalf("Failed srv record lookup on service with fqdn %v", name)
}
assertARecordsMatchIPs(t, extra, endpointIPs...)
targets := []string{}
for _, eip := range endpointIPs {
// A portal service is always created with a port of '0'
targets = append(targets, fmt.Sprintf("%v.%v", fmt.Sprintf("%x", hashServiceRecord(newServiceRecord(eip, 0))), name))
}
assertSRVRecordsMatchTarget(t, rec, targets...)
}
func TestSkyPodHostnameSRVLookup(t *testing.T) {
kd := newKubeDNS()
skydnsConfig := &skyServer.Config{Domain: testDomain, DnsAddr: "0.0.0.0:53"}
skyServer.SetDefaults(skydnsConfig)
s := skyServer.New(kd, skydnsConfig)
service := newHeadlessService()
endpointIPs := []string{"10.0.0.1", "10.0.0.2"}
endpoints := newEndpoints(service, newSubsetWithOnePort("", 80, endpointIPs...))
// The format of thes annotations is:
// endpoints.beta.kubernetes.io/hostnames-map: '{"ep-ip":{"HostName":"pod request hostname"}}'
epRecords := map[string]endpointsapi.HostRecord{}
for i, ep := range endpointIPs {
epRecords[ep] = endpointsapi.HostRecord{HostName: fmt.Sprintf("ep-%d", i)}
}
b, err := json.Marshal(epRecords)
if err != nil {
t.Fatalf("%v", err)
}
endpoints.Annotations = map[string]string{
endpointsapi.PodHostnamesAnnotation: string(b),
}
assert.NoError(t, kd.endpointsStore.Add(endpoints))
kd.newService(service)
name := strings.Join([]string{testService, testNamespace, "svc", testDomain}, ".")
question := dns.Question{Name: name, Qtype: dns.TypeSRV, Qclass: dns.ClassINET}
rec, _, err := s.SRVRecords(question, name, 512, false)
if err != nil {
t.Fatalf("Failed srv record lookup on service with fqdn %v", name)
}
targets := []string{}
for i := range endpointIPs {
targets = append(targets, fmt.Sprintf("%v.%v", fmt.Sprintf("ep-%d", i), name))
}
assertSRVRecordsMatchTarget(t, rec, targets...)
}
func TestSkyNamedPortSRVLookup(t *testing.T) {
kd := newKubeDNS()
skydnsConfig := &skyServer.Config{Domain: testDomain, DnsAddr: "0.0.0.0:53"}
skyServer.SetDefaults(skydnsConfig)
s := skyServer.New(kd, skydnsConfig)
service := newHeadlessService()
eip := "10.0.0.1"
endpoints := newEndpoints(service, newSubsetWithOnePort("http", 8081, eip))
assert.NoError(t, kd.endpointsStore.Add(endpoints))
kd.newService(service)
name := strings.Join([]string{"_http", "_tcp", testService, testNamespace, "svc", testDomain}, ".")
question := dns.Question{Name: name, Qtype: dns.TypeSRV, Qclass: dns.ClassINET}
rec, extra, err := s.SRVRecords(question, name, 512, false)
if err != nil {
t.Fatalf("Failed srv record lookup on service with fqdn %v", name)
}
svcDomain := strings.Join([]string{testService, testNamespace, "svc", testDomain}, ".")
assertARecordsMatchIPs(t, extra, eip)
assertSRVRecordsMatchTarget(t, rec, fmt.Sprintf("%v.%v", fmt.Sprintf("%x", hashServiceRecord(newServiceRecord(eip, 0))), svcDomain))
assertSRVRecordsMatchPort(t, rec, 8081)
}
func TestSimpleHeadlessService(t *testing.T) {
kd := newKubeDNS() kd := newKubeDNS()
s := newHeadlessService() s := newHeadlessService()
assert.NoError(t, kd.servicesStore.Add(s)) assert.NoError(t, kd.servicesStore.Add(s))

23
pkg/dns/doc.go Normal file
View File

@ -0,0 +1,23 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package DNS provides a backend for the skydns DNS server started by the
// kubedns cluster addon. It exposes the 2 interface method: Records and
// ReverseRecord, which skydns invokes according to the DNS queries it
// receives. It serves these records by consulting an in memory tree
// populated with Kubernetes Services and Endpoints received from the Kubernetes
// API server.
package dns

View File

@ -19,6 +19,7 @@ package dns
import ( import (
"bytes" "bytes"
"encoding/json" "encoding/json"
skymsg "github.com/skynetservices/skydns/msg"
"strings" "strings"
) )
@ -49,8 +50,30 @@ func (cache *TreeCache) Serialize() (string, error) {
return string(prettyJSON.Bytes()), nil return string(prettyJSON.Bytes()), nil
} }
func (cache *TreeCache) setEntry(key string, val interface{}, path ...string) { // setEntry creates the entire path if it doesn't already exist in the cache,
// then sets the given service record under the given key. The path this entry
// would have occupied in an etcd datastore is computed from the given fqdn and
// stored as the "Key" of the skydns service; this is only required because
// skydns expects the service record to contain a key in a specific format
// (presumably for legacy compatibility). Note that the fqnd string typically
// contains both the key and all elements in the path.
func (cache *TreeCache) setEntry(key string, val *skymsg.Service, fqdn string, path ...string) {
// TODO: Consolidate setEntry and setSubCache into a single method with a
// type switch.
// TODO: Insted of passing the fqdn as an argument, we can reconstruct
// it from the path, provided callers always pass the full path to the
// object. This is currently *not* the case, since callers first create
// a new, empty node, populate it, then parent it under the right path.
// So we don't know the full key till the final parenting operation.
node := cache.ensureChildNode(path...) node := cache.ensureChildNode(path...)
// This key is used to construct the "target" for SRV record lookups.
// For normal service/endpoint lookups, this will result in a key like:
// /skydns/local/cluster/svc/svcNS/svcName/record-hash
// but for headless services that govern pods requesting a specific
// hostname (as used by petset), this will end up being:
// /skydns/local/cluster/svc/svcNS/svcName/pod-hostname
val.Key = skymsg.Path(fqdn)
node.Entries[key] = val node.Entries[key] = val
} }
@ -65,6 +88,9 @@ func (cache *TreeCache) getSubCache(path ...string) *TreeCache {
return childCache return childCache
} }
// setSubCache inserts the given subtree under the given path:key. Usually the
// key is the name of a Kubernetes Service, and the path maps to the cluster
// subdomains matching the Service.
func (cache *TreeCache) setSubCache(key string, subCache *TreeCache, path ...string) { func (cache *TreeCache) setSubCache(key string, subCache *TreeCache, path ...string) {
node := cache.ensureChildNode(path...) node := cache.ensureChildNode(path...)
node.ChildNodes[key] = subCache node.ChildNodes[key] = subCache
@ -76,8 +102,8 @@ func (cache *TreeCache) getEntry(key string, path ...string) (interface{}, bool)
return val, ok return val, ok
} }
func (cache *TreeCache) getValuesForPathWithWildcards(path ...string) []interface{} { func (cache *TreeCache) getValuesForPathWithWildcards(path ...string) []*skymsg.Service {
retval := []interface{}{} retval := []*skymsg.Service{}
nodesToExplore := []*TreeCache{cache} nodesToExplore := []*TreeCache{cache}
for idx, subpath := range path { for idx, subpath := range path {
nextNodesToExplore := []*TreeCache{} nextNodesToExplore := []*TreeCache{}
@ -88,7 +114,7 @@ func (cache *TreeCache) getValuesForPathWithWildcards(path ...string) []interfac
nextNodesToExplore = append(nextNodesToExplore, node) nextNodesToExplore = append(nextNodesToExplore, node)
} else { } else {
if val, ok := node.Entries[subpath]; ok { if val, ok := node.Entries[subpath]; ok {
retval = append(retval, val) retval = append(retval, val.(*skymsg.Service))
} else { } else {
childNode := node.ChildNodes[subpath] childNode := node.ChildNodes[subpath]
if childNode != nil { if childNode != nil {
@ -122,10 +148,9 @@ func (cache *TreeCache) getValuesForPathWithWildcards(path ...string) []interfac
for _, node := range nodesToExplore { for _, node := range nodesToExplore {
for _, val := range node.Entries { for _, val := range node.Entries {
retval = append(retval, val) retval = append(retval, val.(*skymsg.Service))
} }
} }
return retval return retval
} }