diff --git a/cmd/kube-dns/app/options/options.go b/cmd/kube-dns/app/options/options.go index 8991ae2e6f0..7a56138e796 100644 --- a/cmd/kube-dns/app/options/options.go +++ b/cmd/kube-dns/app/options/options.go @@ -33,6 +33,8 @@ type KubeDNSConfig struct { KubeConfigFile string KubeMasterURL string HealthzPort int + // Federations maps federation names to their registered domain names. + Federations map[string]string } func NewKubeDNSConfig() *KubeDNSConfig { @@ -41,6 +43,7 @@ func NewKubeDNSConfig() *KubeDNSConfig { KubeConfigFile: "", KubeMasterURL: "", HealthzPort: 8081, + Federations: make(map[string]string), } } @@ -95,9 +98,51 @@ func (m kubeMasterURLVar) Type() string { return "string" } +type federationsVar struct { + nameDomainMap map[string]string +} + +// Set deserializes the input string in the format +// "myfederation1=example.com,myfederation2=second.example.com,myfederation3=example.com" +// into a map of federation names to domain names; it returns an error on a malformed or invalid entry. +func (fv federationsVar) Set(keyVal string) error { + for _, val := range strings.Split(keyVal, ",") { + splits := strings.SplitN(strings.TrimSpace(val), "=", 2) + // Reject entries without "=" instead of indexing past the end of splits. + if len(splits) != 2 { + return fmt.Errorf("%q is not a valid federation entry, expected name=domain", val) + } + name := strings.TrimSpace(splits[0]) + domain := strings.TrimSpace(splits[1]) + if len(validation.IsDNS1123Label(name)) != 0 { + return fmt.Errorf("%s not a valid federation name", name) + } + // The federation domain name need not strictly be domain names, we + // accept valid dns names with subdomain components. 
+ if len(validation.IsDNS1123Subdomain(domain)) != 0 { + return fmt.Errorf("%s not a valid domain name for federation %s", domain, name) + } + fv.nameDomainMap[name] = domain + } + return nil +} + +func (fv federationsVar) String() string { + var splits []string + for name, domain := range fv.nameDomainMap { + splits = append(splits, fmt.Sprintf("%s=%s", name, domain)) + } + return strings.Join(splits, ",") +} + +func (fv federationsVar) Type() string { + return "[]string" +} + func (s *KubeDNSConfig) AddFlags(fs *pflag.FlagSet) { fs.Var(clusterDomainVar{&s.ClusterDomain}, "domain", "domain under which to create names") fs.StringVar(&s.KubeConfigFile, "kubecfg-file", s.KubeConfigFile, "Location of kubecfg file for access to kubernetes master service; --kube-master-url overrides the URL part of this; if neither this nor --kube-master-url are provided, defaults to service account tokens") fs.Var(kubeMasterURLVar{&s.KubeMasterURL}, "kube-master-url", "URL to reach kubernetes master. Env variables in this flag will be expanded.") fs.IntVar(&s.HealthzPort, "healthz-port", s.HealthzPort, "port on which to serve a kube-dns HTTP readiness probe.") + fs.Var(federationsVar{s.Federations}, "federations", "a comma separated list of the federation names and their corresponding domain names to which this cluster belongs. 
Example: \"myfederation1=example.com,myfederation2=example2.com,myfederation3=example.com\"") } diff --git a/cmd/kube-dns/app/server.go b/cmd/kube-dns/app/server.go index 07637469d2f..60dddba5889 100644 --- a/cmd/kube-dns/app/server.go +++ b/cmd/kube-dns/app/server.go @@ -51,7 +51,7 @@ func NewKubeDNSServerDefault(config *options.KubeDNSConfig) *KubeDNSServer { glog.Fatalf("Failed to create a kubernetes client: %v", err) } ks.healthzPort = config.HealthzPort - ks.kd = kdns.NewKubeDNS(kubeClient, config.ClusterDomain) + ks.kd = kdns.NewKubeDNS(kubeClient, config.ClusterDomain, config.Federations) return &ks } diff --git a/pkg/dns/dns.go b/pkg/dns/dns.go index d991c510f55..048ed4273b0 100644 --- a/pkg/dns/dns.go +++ b/pkg/dns/dns.go @@ -30,6 +30,7 @@ import ( skymsg "github.com/skynetservices/skydns/msg" kapi "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/endpoints" + kunversioned "k8s.io/kubernetes/pkg/api/unversioned" kcache "k8s.io/kubernetes/pkg/client/cache" kclient "k8s.io/kubernetes/pkg/client/unversioned" kframework "k8s.io/kubernetes/pkg/controller/framework" @@ -49,6 +50,13 @@ const ( // Resync period for the kube controller loop. resyncPeriod = 5 * time.Minute + + // Duration for which the TTL cache should hold the node resource to retrieve the zone + // annotation from it so that it could be added to federation CNAMEs. There is ideally + // no need to expire this cache, but we don't want to assume that node annotations + // never change. So we expire the cache and retrieve a node once every 180 seconds. + // The value is chosen to be neither too long nor too short. + nodeCacheTTL = 180 * time.Second ) type KubeDNS struct { @@ -83,15 +91,25 @@ type KubeDNS struct { // serviceController invokes registered callbacks when services change. serviceController *kframework.Controller + + // Map of federation names that the cluster in which this kube-dns is running belongs to, to + // the corresponding domain names. 
+ federations map[string]string + + // A TTL cache that contains some subset of nodes in the system so that we can retrieve the + // cluster zone annotation from the cached node instead of getting it from the API server + // every time. + nodesStore kcache.Store } -func NewKubeDNS(client *kclient.Client, domain string) *KubeDNS { +func NewKubeDNS(client *kclient.Client, domain string, federations map[string]string) *KubeDNS { kd := &KubeDNS{ - kubeClient: client, - domain: domain, - cache: NewTreeCache(), - cacheLock: sync.RWMutex{}, - domainPath: reverseArray(strings.Split(strings.TrimRight(domain, "."), ".")), + kubeClient: client, + domain: domain, + cache: NewTreeCache(), + cacheLock: sync.RWMutex{}, + domainPath: reverseArray(strings.Split(strings.TrimRight(domain, "."), ".")), + federations: federations, } kd.setEndpointsStore() kd.setServicesStore() @@ -383,10 +401,17 @@ func (kd *KubeDNS) Records(name string, exact bool) ([]skymsg.Service, error) { retval = append(retval, *(val.(*skymsg.Service))) } glog.Infof("records:%v, retval:%v, path:%v", records, retval, path) - if len(retval) == 0 { - return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound} + if len(retval) > 0 { + return retval, nil } - return retval, nil + + // If the name query is not an exact query and does not match any records in the local store, + // attempt to send a federation redirect (CNAME) response. + if !exact { + return kd.federationRecords(path) + } + + return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound} } func (kd *KubeDNS) ReverseRecord(name string) (*skymsg.Service, error) { @@ -446,6 +471,112 @@ func getSkyMsg(ip string, port int) (*skymsg.Service, string) { return msg, fmt.Sprintf("%x", hash) } +// isFederationQuery checks if the given query `path` matches the federated service query pattern. +// The conjunction of the following conditions forms the test for the federated service query +// pattern: +// 1. 
`path` has exactly 4+len(domainPath) segments: mysvc.myns.myfederation.svc.domain.path. +// 2. Service name component must be a valid RFC 952 name. +// 3. Namespace component must be a valid RFC 1123 name. +// 4. Federation component must also be a valid RFC 1123 name. +// 5. Fourth segment is exactly "svc" +// 6. The remaining segments match kd.domainPath. +// 7. And federation must be one of the listed federations in the config. +func (kd *KubeDNS) isFederationQuery(path []string) bool { + if len(path) == 4+len(kd.domainPath) && + len(validation.IsDNS952Label(path[0])) == 0 && + len(validation.IsDNS1123Label(path[1])) == 0 && + len(validation.IsDNS1123Label(path[2])) == 0 && + path[3] == serviceSubdomain { + for i, domComp := range kd.domainPath { + // kd.domainPath is reversed, so we need to look in the `path` in the reverse order. + if domComp != path[len(path)-i-1] { + return false + } + } + if _, ok := kd.federations[path[2]]; ok { + return true + } + } + return false +} + +// federationRecords checks if the given `queryPath` is for a federated service and if it is, +// it returns a CNAME response containing the cluster zone name and federation domain name +// suffix. +func (kd *KubeDNS) federationRecords(queryPath []string) ([]skymsg.Service, error) { + // `queryPath` is a reversed-array of the queried name, reverse it back to make it easy + // to follow through this code and reduce confusion. There is no reason for it to be + // reversed here. + path := reverseArray(queryPath) + + // Check if the name query matches the federation query pattern. + if !kd.isFederationQuery(path) { + return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound} + } + + // Now that we have already established that the query is a federation query, remove the local + // domain path components, i.e. kd.domainPath, from the query. 
+ path = path[:len(path)-len(kd.domainPath)] + + // TODO: Append the zone name (zone in the cloud provider terminology, not a DNS zone) here; it is not appended yet. + + // We have already established that the map entry exists for the given federation, + // we just need to retrieve the domain name, validate it and append it to the path. + domain := kd.federations[path[2]] + // We accept valid subdomains as well, so just let all the valid subdomains through. + if len(validation.IsDNS1123Subdomain(domain)) != 0 { + return nil, fmt.Errorf("%s is not a valid domain name for federation %s", domain, path[2]) + } + name := strings.Join(append(path, domain), ".") + + // Ensure that this name that we are returning as a CNAME response is a fully qualified + // domain name so that the client's resolver library doesn't have to go through its + // search list all over again. + if !strings.HasSuffix(name, ".") { + name = name + "." + } + return []skymsg.Service{{Host: name}}, nil +} + +// getClusterZone returns the name of the zone the cluster is running in. It arbitrarily selects +// a node and reads the failure domain annotation on the node. An alternative is to obtain this +// pod's (i.e. kube-dns pod's) name using the downward API, get the pod, get the node the pod is +// bound to and retrieve that node's annotations. But even just by reading those steps, it looks +// complex and it is not entirely clear what that complexity is going to buy us. So taking a +// simpler approach here. +// Also note that zone here means the zone in cloud provider terminology, not the DNS zone. +func (kd *KubeDNS) getClusterZone() (string, error) { + var node *kapi.Node + + objs := kd.nodesStore.List() + if len(objs) > 0 { + var ok bool + if node, ok = objs[0].(*kapi.Node); !ok { + return "", fmt.Errorf("expected node object, got: %T", objs[0]) + } + } else { + // An alternative to listing nodes each time is to set a watch, but that is totally + // wasteful in case of non-federated independent Kubernetes clusters. 
So carefully + // proceeding here. + // TODO(madhusudancs): Move this to external/v1 API. + nodeList, err := kd.kubeClient.Nodes().List(kapi.ListOptions{}) + if err != nil || len(nodeList.Items) == 0 { + return "", fmt.Errorf("failed to retrieve the cluster nodes: %v", err) + } + + node = &nodeList.Items[0] + if err := kd.nodesStore.Add(node); err != nil { + return "", fmt.Errorf("couldn't add the retrieved node to the cache: %v", err) + } + } + + zone, ok := node.Annotations[kunversioned.LabelZoneFailureDomain] + if !ok || zone == "" { + return "", fmt.Errorf("unknown cluster zone") + } + return zone, nil +} + func reverseArray(arr []string) []string { for i := 0; i < len(arr)/2; i++ { j := len(arr) - i - 1