Remove DNS code from the main repo (code is now in http://github.com/kubernetes/dns)

This commit is contained in:
Bowei Du 2017-01-04 13:23:06 -08:00
parent b5c0fd5837
commit 0992e2bfc9
33 changed files with 0 additions and 3684 deletions

View File

@ -1,26 +0,0 @@
### Version 1.9 (Fri November 18 2016 Bowei Du <bowei@google.com>)
- Add limited ConfigMap support (pr #36775)
### Version 1.8 (Thu September 29 2016 Zihong Zheng <zihongz@google.com>)
- Add support for graceful termination (issue #31807)
### Version 1.7 (Wed August 24 2016 Zihong Zheng <zihongz@google.com>)
- Add support for ExternalName services (pr #31159)
### Version 1.6 (Wed June 29 2016 Girish Kalele <gkalele@google.com>)
- Godeps update for vendor code (skydns/mux)
### Version 1.5 (Thu June 23 2016 Nikhil Jindal <nikhiljindal@google.com>)
- Adding support to return local service (pr #27708)
### Version 1.4 (Tue June 21 2016 Nikhil Jindal <nikhiljindal@google.com>)
- Initialising nodesStore (issue #27820)
### Version 1.3 (Fri June 3 2016 Prashanth.B <beeps@google.com>)
- Fixed SRV record lookup (issue #26116)
### Version 1.2 (Fri May 27 2016 Tim Hockin <thockin@google.com>)
- First Changelog entry
[![Analytics](https://kubernetes-site.appspot.com/UA-36037335-10/GitHub/build/kube-dns/CHANGELOG.md?pixel)]()

View File

@ -1,17 +0,0 @@
# Copyright 2016 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
FROM BASEIMAGE
ADD kube-dns /
ENTRYPOINT ["/kube-dns"]

View File

@ -1,5 +0,0 @@
# Maintainers
Tim Hockin <thockin@google.com>
[![Analytics](https://kubernetes-site.appspot.com/UA-36037335-10/GitHub/build/kube-dns/MAINTAINERS.md?pixel)]()

View File

@ -1,68 +0,0 @@
# Copyright 2016 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Makefile for the Docker image gcr.io/google_containers/kubedns-<ARCH>
# MAINTAINER: Tim Hockin <thockin@google.com>
# If you update this image please bump the tag value before pushing.
#
# Usage:
# [ARCH=amd64] [TAG=1.6] [REGISTRY=gcr.io/google_containers] [BASEIMAGE=busybox] make (container|push)
# Default registry, arch and tag. This can be overwritten by arguments to make
PLATFORM?=linux
ARCH?=amd64
TAG?=1.9
REGISTRY?=gcr.io/google_containers
GOLANG_VERSION=1.6
KUBE_ROOT=$(shell pwd)/../..
TEMP_DIR:=$(shell mktemp -d)
ifeq ($(ARCH),amd64)
BASEIMAGE?=busybox
endif
ifeq ($(ARCH),arm)
BASEIMAGE?=armel/busybox
endif
ifeq ($(ARCH),arm64)
BASEIMAGE?=aarch64/busybox
endif
ifeq ($(ARCH),ppc64le)
BASEIMAGE?=ppc64le/busybox
endif
ifeq ($(ARCH),s390x)
BASEIMAGE?=s390x/busybox
endif
all: container
container:
# Copy the content in this dir to the temp dir
cp $(KUBE_ROOT)/_output/dockerized/bin/$(PLATFORM)/$(ARCH)/kube-dns $(TEMP_DIR)
cp $(KUBE_ROOT)/build/kube-dns/Dockerfile $(TEMP_DIR)
# Replace BASEIMAGE with the real base image
cd $(TEMP_DIR) && sed -i "s|BASEIMAGE|$(BASEIMAGE)|g" Dockerfile
# And build the image
docker build -t $(REGISTRY)/kubedns-$(ARCH):$(TAG) $(TEMP_DIR)
# delete temp dir
rm -rf $(TEMP_DIR)
push: container
gcloud docker -- push $(REGISTRY)/kubedns-$(ARCH):$(TAG)
.PHONY: all container push

View File

@ -1,11 +0,0 @@
approvers:
- thockin
- boweidu
- mrhohn
reviewers:
- mikedanese
- nikhiljindal
- bprashanth
- luxas
- jessfraz
- david-mcmahon

View File

@ -1,22 +0,0 @@
# DNS in Kubernetes
Kubernetes offers a DNS cluster addon, which most of the supported environments
enable by default. The source code is in [cmd/kube-dns][kube-dns].
The [Kubernetes DNS Admin Guide][dns-admin] provides further details on this plugin.
[kube-dns]: https://github.com/kubernetes/kubernetes/tree/master/cmd/kube-dns
[dns-admin]: http://kubernetes.io/docs/admin/dns/
## Making Changes
The container containing the kube-dns binary needs to be built for every
architecture and pushed to the registry manually whenever the kube-dns binary
has code changes. Every significant change to the functionality should result
in a bump of the TAG in the Makefile.
Any significant changes to the YAML template for `kube-dns` should result a bump
of the version number for the `kube-dns` replication controller and well as the
`version` label. This will permit a rolling update of `kube-dns`.
[![Analytics](https://kubernetes-site.appspot.com/UA-36037335-10/GitHub/build/kube-dns/README.md?pixel)]()

View File

@ -1,51 +0,0 @@
# Cutting a release
Until we have a proper setup for building this automatically with every binary
release, here are the steps for making a release. We make releases when they
are ready, not on every PR.
1. Build the container for testing:
```
make release
cd build/kube-dns
make container PREFIX=<your-docker-hub> TAG=rc
```
2. Manually deploy this to your own cluster by updating the replication
controller and deleting the running pod(s).
3. Verify it works.
4. Update the TAG version in `Makefile` and update the `Changelog`. Update the
`*.yaml.in` to point to the new tag. Send a PR but mark it as "DO NOT MERGE".
5. Once the PR is approved, build and push the container for real **for all architectures**:
```console
# Build for linux/amd64 (default)
$ make push ARCH=amd64
# ---> gcr.io/google_containers/kube-dns-amd64:TAG
$ make push ARCH=arm
# ---> gcr.io/google_containers/kube-dns-arm:TAG
$ make push ARCH=arm64
# ---> gcr.io/google_containers/kube-dns-arm64:TAG
$ make push ARCH=ppc64le
# ---> gcr.io/google_containers/kube-dns-ppc64le:TAG
$ make push ARCH=s390x
# ---> gcr.io/google_containers/kube-dns-s390x:TAG
```
6. Manually deploy this to your own cluster by updating the replication
controller and deleting the running pod(s).
7. Verify it works.
8. Allow the PR to be merged.
[![Analytics](https://kubernetes-site.appspot.com/UA-36037335-10/GitHub/build/kube-dns/RELEASES.md?pixel)]()

View File

@ -1,49 +0,0 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_binary",
"go_library",
)
go_binary(
name = "kube-dns",
library = ":go_default_library",
tags = ["automanaged"],
)
go_library(
name = "go_default_library",
srcs = ["dns.go"],
tags = ["automanaged"],
deps = [
"//cmd/kube-dns/app:go_default_library",
"//cmd/kube-dns/app/options:go_default_library",
"//pkg/client/metrics/prometheus:go_default_library",
"//pkg/util/flag:go_default_library",
"//pkg/util/logs:go_default_library",
"//pkg/version:go_default_library",
"//pkg/version/prometheus:go_default_library",
"//pkg/version/verflag:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:github.com/spf13/pflag",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//cmd/kube-dns/app:all-srcs",
],
tags = ["automanaged"],
)

View File

@ -1,42 +0,0 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)
go_library(
name = "go_default_library",
srcs = ["server.go"],
tags = ["automanaged"],
deps = [
"//cmd/kube-dns/app/options:go_default_library",
"//pkg/dns:go_default_library",
"//pkg/dns/config:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:github.com/skynetservices/skydns/metrics",
"//vendor:github.com/skynetservices/skydns/server",
"//vendor:github.com/spf13/pflag",
"//vendor:k8s.io/client-go/kubernetes",
"//vendor:k8s.io/client-go/rest",
"//vendor:k8s.io/client-go/tools/clientcmd",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//cmd/kube-dns/app/options:all-srcs",
],
tags = ["automanaged"],
)

View File

@ -1,33 +0,0 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)
go_library(
name = "go_default_library",
srcs = ["options.go"],
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/dns/federation:go_default_library",
"//pkg/util/validation:go_default_library",
"//vendor:github.com/spf13/pflag",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@ -1,168 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package options contains flags for initializing a proxy.
package options
import (
"fmt"
_ "net/http/pprof"
"net/url"
"os"
"strings"
"time"
"github.com/spf13/pflag"
"k8s.io/kubernetes/pkg/api"
fed "k8s.io/kubernetes/pkg/dns/federation"
"k8s.io/kubernetes/pkg/util/validation"
)
type KubeDNSConfig struct {
ClusterDomain string
KubeConfigFile string
KubeMasterURL string
InitialSyncTimeout time.Duration
HealthzPort int
DNSBindAddress string
DNSPort int
Federations map[string]string
ConfigMapNs string
ConfigMap string
}
func NewKubeDNSConfig() *KubeDNSConfig {
return &KubeDNSConfig{
ClusterDomain: "cluster.local.",
HealthzPort: 8081,
DNSBindAddress: "0.0.0.0",
DNSPort: 53,
InitialSyncTimeout: 60 * time.Second,
Federations: make(map[string]string),
ConfigMapNs: api.NamespaceSystem,
ConfigMap: "", // default to using command line flags
}
}
type clusterDomainVar struct {
val *string
}
func (m clusterDomainVar) Set(v string) error {
v = strings.TrimSuffix(v, ".")
segments := strings.Split(v, ".")
for _, segment := range segments {
if errs := validation.IsDNS1123Label(segment); len(errs) > 0 {
return fmt.Errorf("Not a valid DNS label. %v", errs)
}
}
if !strings.HasSuffix(v, ".") {
v = fmt.Sprintf("%s.", v)
}
*m.val = v
return nil
}
func (m clusterDomainVar) String() string {
return *m.val
}
func (m clusterDomainVar) Type() string {
return "string"
}
type kubeMasterURLVar struct {
val *string
}
func (m kubeMasterURLVar) Set(v string) error {
parsedURL, err := url.Parse(os.ExpandEnv(v))
if err != nil {
return fmt.Errorf("failed to parse kube-master-url")
}
if parsedURL.Scheme == "" || parsedURL.Host == "" || parsedURL.Host == ":" {
return fmt.Errorf("invalid kube-master-url specified")
}
*m.val = v
return nil
}
func (m kubeMasterURLVar) String() string {
return *m.val
}
func (m kubeMasterURLVar) Type() string {
return "string"
}
type federationsVar struct {
nameDomainMap map[string]string
}
func (fv federationsVar) Set(keyVal string) error {
return fed.ParseFederationsFlag(keyVal, fv.nameDomainMap)
}
func (fv federationsVar) String() string {
var splits []string
for name, domain := range fv.nameDomainMap {
splits = append(splits, fmt.Sprintf("%s=%s", name, domain))
}
return strings.Join(splits, ",")
}
func (fv federationsVar) Type() string {
return "[]string"
}
func (s *KubeDNSConfig) AddFlags(fs *pflag.FlagSet) {
fs.Var(clusterDomainVar{&s.ClusterDomain}, "domain",
"domain under which to create names")
fs.StringVar(&s.KubeConfigFile, "kubecfg-file", s.KubeConfigFile,
"Location of kubecfg file for access to kubernetes master service;"+
" --kube-master-url overrides the URL part of this; if neither this nor"+
" --kube-master-url are provided, defaults to service account tokens")
fs.Var(kubeMasterURLVar{&s.KubeMasterURL}, "kube-master-url",
"URL to reach kubernetes master. Env variables in this flag will be expanded.")
fs.IntVar(&s.HealthzPort, "healthz-port", s.HealthzPort,
"port on which to serve a kube-dns HTTP readiness probe.")
fs.StringVar(&s.DNSBindAddress, "dns-bind-address", s.DNSBindAddress,
"address on which to serve DNS requests.")
fs.IntVar(&s.DNSPort, "dns-port", s.DNSPort, "port on which to serve DNS requests.")
fs.Var(federationsVar{s.Federations}, "federations",
"a comma separated list of the federation names and their corresponding"+
" domain names to which this cluster belongs. Example:"+
" \"myfederation1=example.com,myfederation2=example2.com,myfederation3=example.com\"."+
" It is an error to set both the federations and config-map flags.")
fs.MarkDeprecated("federations", "use config-map instead. Will be removed in future version")
fs.StringVar(&s.ConfigMapNs, "config-map-namespace", s.ConfigMapNs,
"namespace for the config-map")
fs.StringVar(&s.ConfigMap, "config-map", s.ConfigMap,
"config-map name. If empty, then the config-map will not used. Cannot be "+
" used in conjunction with federations flag. config-map contains "+
"dynamically adjustable configuration.")
fs.DurationVar(&s.InitialSyncTimeout, "initial-sync-timeout", s.InitialSyncTimeout,
"Timeout for initial resource sync.")
}

View File

@ -1,156 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package app
import (
"fmt"
"net/http"
"os"
"os/signal"
"syscall"
"github.com/golang/glog"
"github.com/skynetservices/skydns/metrics"
"github.com/skynetservices/skydns/server"
"github.com/spf13/pflag"
"k8s.io/kubernetes/cmd/kube-dns/app/options"
"k8s.io/kubernetes/pkg/dns"
dnsconfig "k8s.io/kubernetes/pkg/dns/config"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
type KubeDNSServer struct {
// DNS domain name.
domain string
healthzPort int
dnsBindAddress string
dnsPort int
kd *dns.KubeDNS
}
func NewKubeDNSServerDefault(config *options.KubeDNSConfig) *KubeDNSServer {
kubeClient, err := newKubeClient(config)
if err != nil {
glog.Fatalf("Failed to create a kubernetes client: %v", err)
}
var configSync dnsconfig.Sync
if config.ConfigMap == "" {
glog.V(0).Infof("ConfigMap not configured, using values from command line flags")
configSync = dnsconfig.NewNopSync(
&dnsconfig.Config{Federations: config.Federations})
} else {
glog.V(0).Infof("Using configuration read from ConfigMap: %v:%v",
config.ConfigMapNs, config.ConfigMap)
configSync = dnsconfig.NewSync(
kubeClient, config.ConfigMapNs, config.ConfigMap)
}
return &KubeDNSServer{
domain: config.ClusterDomain,
healthzPort: config.HealthzPort,
dnsBindAddress: config.DNSBindAddress,
dnsPort: config.DNSPort,
kd: dns.NewKubeDNS(kubeClient, config.ClusterDomain, config.InitialSyncTimeout, configSync),
}
}
func newKubeClient(dnsConfig *options.KubeDNSConfig) (kubernetes.Interface, error) {
var config *rest.Config
var err error
if dnsConfig.KubeConfigFile == "" {
config, err = rest.InClusterConfig()
if err != nil {
return nil, err
}
} else {
config, err = clientcmd.BuildConfigFromFlags(
dnsConfig.KubeMasterURL, dnsConfig.KubeConfigFile)
if err != nil {
return nil, err
}
}
return kubernetes.NewForConfig(config)
}
func (server *KubeDNSServer) Run() {
pflag.VisitAll(func(flag *pflag.Flag) {
glog.V(0).Infof("FLAG: --%s=%q", flag.Name, flag.Value)
})
setupSignalHandlers()
server.startSkyDNSServer()
server.kd.Start()
server.setupHandlers()
glog.V(0).Infof("Status HTTP port %v", server.healthzPort)
glog.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", server.healthzPort), nil))
}
// setupHealthzHandlers sets up a readiness and liveness endpoint for kube2sky.
func (server *KubeDNSServer) setupHandlers() {
glog.V(0).Infof("Setting up Healthz Handler (/readiness)")
http.HandleFunc("/readiness", func(w http.ResponseWriter, req *http.Request) {
fmt.Fprintf(w, "ok\n")
})
glog.V(0).Infof("Setting up cache handler (/cache)")
http.HandleFunc("/cache", func(w http.ResponseWriter, req *http.Request) {
serializedJSON, err := server.kd.GetCacheAsJSON()
if err == nil {
fmt.Fprint(w, serializedJSON)
} else {
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprint(w, err)
}
})
}
// setupSignalHandlers installs signal handler to ignore SIGINT and
// SIGTERM. This daemon will be killed by SIGKILL after the grace
// period to allow for some manner of graceful shutdown.
func setupSignalHandlers() {
sigChan := make(chan os.Signal)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
glog.V(0).Infof("Ignoring signal %v (can only be terminated by SIGKILL)", <-sigChan)
}()
}
func (d *KubeDNSServer) startSkyDNSServer() {
glog.V(0).Infof("Starting SkyDNS server (%v:%v)", d.dnsBindAddress, d.dnsPort)
skydnsConfig := &server.Config{
Domain: d.domain,
DnsAddr: fmt.Sprintf("%s:%d", d.dnsBindAddress, d.dnsPort),
}
server.SetDefaults(skydnsConfig)
s := server.New(d.kd, skydnsConfig)
if err := metrics.Metrics(); err != nil {
glog.Fatalf("Skydns metrics error: %s", err)
} else if metrics.Port != "" {
glog.V(0).Infof("Skydns metrics enabled (%v:%v)", metrics.Path, metrics.Port)
} else {
glog.V(0).Infof("Skydns metrics not enabled")
}
go s.Run()
}

View File

@ -1,48 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"github.com/golang/glog"
"github.com/spf13/pflag"
"k8s.io/kubernetes/cmd/kube-dns/app"
"k8s.io/kubernetes/cmd/kube-dns/app/options"
"k8s.io/kubernetes/pkg/util/flag"
"k8s.io/kubernetes/pkg/util/logs"
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/pkg/version/verflag"
_ "k8s.io/kubernetes/pkg/client/metrics/prometheus" // for client metric registration
_ "k8s.io/kubernetes/pkg/version/prometheus" // for version metric registration
)
func main() {
config := options.NewKubeDNSConfig()
config.AddFlags(pflag.CommandLine)
flag.InitFlags()
logs.InitLogs()
defer logs.FlushLogs()
verflag.PrintAndExitIfRequested()
glog.V(0).Infof("version: %+v", version.Get())
server := app.NewKubeDNSServerDefault(config)
server.Run()
}

View File

@ -15,7 +15,6 @@ cmd/kube-apiserver/app/options
cmd/kube-controller-manager
cmd/kube-controller-manager/app/options
cmd/kube-discovery
cmd/kube-dns
cmd/kube-proxy
cmd/kubeadm
cmd/kubeadm

View File

@ -23,7 +23,6 @@ readonly KUBE_GOPATH="${KUBE_OUTPUT}/go"
# kube::build::source_targets in build/common.sh as well.
kube::golang::server_targets() {
local targets=(
cmd/kube-dns
cmd/kube-proxy
cmd/kube-apiserver
cmd/kube-controller-manager

View File

@ -1,77 +0,0 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_library(
name = "go_default_library",
srcs = [
"dns.go",
"doc.go",
],
tags = ["automanaged"],
deps = [
"//pkg/dns/config:go_default_library",
"//pkg/dns/treecache:go_default_library",
"//pkg/dns/util:go_default_library",
"//pkg/util/validation:go_default_library",
"//pkg/util/wait:go_default_library",
"//vendor:github.com/coreos/etcd/client",
"//vendor:github.com/golang/glog",
"//vendor:github.com/miekg/dns",
"//vendor:github.com/skynetservices/skydns/msg",
"//vendor:k8s.io/client-go/kubernetes",
"//vendor:k8s.io/client-go/pkg/api/v1",
"//vendor:k8s.io/client-go/pkg/apis/meta/v1",
"//vendor:k8s.io/client-go/pkg/runtime",
"//vendor:k8s.io/client-go/pkg/watch",
"//vendor:k8s.io/client-go/tools/cache",
],
)
go_test(
name = "go_default_test",
srcs = ["dns_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/dns/config:go_default_library",
"//pkg/dns/treecache:go_default_library",
"//pkg/dns/util:go_default_library",
"//pkg/util/sets:go_default_library",
"//vendor:github.com/coreos/etcd/client",
"//vendor:github.com/miekg/dns",
"//vendor:github.com/skynetservices/skydns/msg",
"//vendor:github.com/skynetservices/skydns/server",
"//vendor:github.com/stretchr/testify/assert",
"//vendor:github.com/stretchr/testify/require",
"//vendor:k8s.io/client-go/kubernetes/fake",
"//vendor:k8s.io/client-go/pkg/api/v1",
"//vendor:k8s.io/client-go/pkg/apis/meta/v1",
"//vendor:k8s.io/client-go/tools/cache",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//pkg/dns/config:all-srcs",
"//pkg/dns/federation:all-srcs",
"//pkg/dns/treecache:all-srcs",
"//pkg/dns/util:all-srcs",
],
tags = ["automanaged"],
)

View File

@ -1,53 +0,0 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_library(
name = "go_default_library",
srcs = [
"config.go",
"mocksync.go",
"nopsync.go",
"sync.go",
],
tags = ["automanaged"],
deps = [
"//pkg/dns/federation:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:k8s.io/client-go/kubernetes",
"//vendor:k8s.io/client-go/pkg/api/v1",
"//vendor:k8s.io/client-go/pkg/apis/meta/v1",
"//vendor:k8s.io/client-go/pkg/fields",
"//vendor:k8s.io/client-go/pkg/runtime",
"//vendor:k8s.io/client-go/pkg/util/wait",
"//vendor:k8s.io/client-go/pkg/watch",
"//vendor:k8s.io/client-go/tools/cache",
],
)
go_test(
name = "go_default_test",
srcs = ["config_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
deps = ["//vendor:github.com/stretchr/testify/assert"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@ -1,65 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
types "k8s.io/client-go/pkg/apis/meta/v1"
fed "k8s.io/kubernetes/pkg/dns/federation"
)
// Config populated either from the configuration source (command
// line flags or via the config map mechanism).
type Config struct {
// The inclusion of TypeMeta is to ensure future compatibility if the
// Config object was populated directly via a Kubernetes API mechanism.
//
// For example, instead of the custom implementation here, the
// configuration could be obtained from an API that unifies
// command-line flags, config-map, etc mechanisms.
types.TypeMeta
// Map of federation names that the cluster in which this kube-dns
// is running belongs to, to the corresponding domain names.
Federations map[string]string `json:"federations"`
}
func NewDefaultConfig() *Config {
return &Config{
Federations: make(map[string]string),
}
}
// IsValid returns whether or not the configuration is valid.
func (config *Config) Validate() error {
if err := config.validateFederations(); err != nil {
return err
}
return nil
}
func (config *Config) validateFederations() error {
for name, domain := range config.Federations {
if err := fed.ValidateName(name); err != nil {
return err
}
if err := fed.ValidateDomain(domain); err != nil {
return err
}
}
return nil
}

View File

@ -1,56 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestValidate(t *testing.T) {
for _, testCase := range []struct {
config *Config
hasError bool
}{
{
config: &Config{Federations: map[string]string{}},
},
{
config: &Config{
Federations: map[string]string{
"abc": "d.e.f",
},
},
},
{
config: &Config{
Federations: map[string]string{
"a.b": "cdef",
},
},
hasError: true,
},
} {
err := testCase.config.Validate()
if !testCase.hasError {
assert.Nil(t, err, "should be valid", testCase)
} else {
assert.NotNil(t, err, "should not be valid", testCase)
}
}
}

View File

@ -1,46 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
// MockSync is a testing mock.
type MockSync struct {
// Config that will be returned from Once().
Config *Config
// Error that will be returned from Once().
Error error
// Chan to send new configurations on.
Chan chan *Config
}
var _ Sync = (*MockSync)(nil)
func NewMockSync(config *Config, error error) *MockSync {
return &MockSync{
Config: config,
Error: error,
Chan: make(chan *Config),
}
}
func (sync *MockSync) Once() (*Config, error) {
return sync.Config, sync.Error
}
func (sync *MockSync) Periodic() <-chan *Config {
return sync.Chan
}

View File

@ -1,37 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
// nopSync does no synchronization, used when the DNS server is
// started without a ConfigMap configured.
type nopSync struct {
config *Config
}
var _ Sync = (*nopSync)(nil)
func NewNopSync(config *Config) Sync {
return &nopSync{config: config}
}
func (sync *nopSync) Once() (*Config, error) {
return sync.config, nil
}
func (sync *nopSync) Periodic() <-chan *Config {
return make(chan *Config)
}

View File

@ -1,203 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/pkg/api/v1"
metav1 "k8s.io/client-go/pkg/apis/meta/v1"
"k8s.io/client-go/pkg/fields"
"k8s.io/client-go/pkg/runtime"
"k8s.io/client-go/pkg/util/wait"
"k8s.io/client-go/pkg/watch"
"k8s.io/client-go/tools/cache"
fed "k8s.io/kubernetes/pkg/dns/federation"
"time"
"github.com/golang/glog"
)
// Sync manages synchronization of the config map.
type Sync interface {
// Once does a blocking synchronization of the config map. If the
// ConfigMap fails to validate, this method will return nil, err.
Once() (*Config, error)
// Start a periodic synchronization of the configuration map. When a
// successful configuration map update is detected, the
// configuration will be sent to the channel.
//
// It is an error to call this more than once.
Periodic() <-chan *Config
}
// NewSync for ConfigMap from namespace `ns` and `name`.
func NewSync(client kubernetes.Interface, ns string, name string) Sync {
sync := &kubeSync{
ns: ns,
name: name,
client: client,
channel: make(chan *Config),
}
listWatch := &cache.ListWatch{
ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
options.FieldSelector = fields.Set{"metadata.name": name}.AsSelector().String()
return client.Core().ConfigMaps(ns).List(options)
},
WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
options.FieldSelector = fields.Set{"metadata.name": name}.AsSelector().String()
return client.Core().ConfigMaps(ns).Watch(options)
},
}
store, controller := cache.NewInformer(
listWatch,
&v1.ConfigMap{},
time.Duration(0),
cache.ResourceEventHandlerFuncs{
AddFunc: sync.onAdd,
DeleteFunc: sync.onDelete,
UpdateFunc: sync.onUpdate,
})
sync.store = store
sync.controller = controller
return sync
}
// kubeSync implements Sync for the Kubernetes API.
type kubeSync struct {
ns string
name string
client kubernetes.Interface
store cache.Store
controller *cache.Controller
channel chan *Config
latestVersion string
}
var _ Sync = (*kubeSync)(nil)
func (sync *kubeSync) Once() (*Config, error) {
cm, err := sync.client.Core().ConfigMaps(sync.ns).Get(sync.name, metav1.GetOptions{})
if err != nil {
glog.Errorf("Error getting ConfigMap %v:%v err: %v",
sync.ns, sync.name, err)
return nil, err
}
config, _, err := sync.processUpdate(cm)
return config, err
}
func (sync *kubeSync) Periodic() <-chan *Config {
go sync.controller.Run(wait.NeverStop)
return sync.channel
}
func (sync *kubeSync) toConfigMap(obj interface{}) *v1.ConfigMap {
cm, ok := obj.(*v1.ConfigMap)
if !ok {
glog.Fatalf("Expected ConfigMap, got %T", obj)
}
return cm
}
func (sync *kubeSync) onAdd(obj interface{}) {
cm := sync.toConfigMap(obj)
glog.V(2).Infof("ConfigMap %s:%s was created", sync.ns, sync.name)
config, updated, err := sync.processUpdate(cm)
if updated && err == nil {
sync.channel <- config
}
}
func (sync *kubeSync) onDelete(_ interface{}) {
glog.V(2).Infof("ConfigMap %s:%s was deleted, reverting to default configuration",
sync.ns, sync.name)
sync.latestVersion = ""
sync.channel <- NewDefaultConfig()
}
func (sync *kubeSync) onUpdate(_, obj interface{}) {
cm := sync.toConfigMap(obj)
glog.V(2).Infof("ConfigMap %s:%s was updated", sync.ns, sync.name)
config, changed, err := sync.processUpdate(cm)
if changed && err == nil {
sync.channel <- config
}
}
func (sync *kubeSync) processUpdate(cm *v1.ConfigMap) (config *Config, changed bool, err error) {
glog.V(4).Infof("processUpdate ConfigMap %+v", *cm)
if cm.ObjectMeta.ResourceVersion != sync.latestVersion {
glog.V(3).Infof("Updating config to version %v (was %v)",
cm.ObjectMeta.ResourceVersion, sync.latestVersion)
changed = true
sync.latestVersion = cm.ObjectMeta.ResourceVersion
} else {
glog.V(4).Infof("Config was unchanged (version %v)", sync.latestVersion)
return
}
config = &Config{}
if err = sync.updateFederations(cm, config); err != nil {
glog.Errorf("Invalid configuration, ignoring update")
return
}
if err = config.Validate(); err != nil {
glog.Errorf("Invalid onfiguration: %v (value was %+v), ignoring update",
err, config)
config = nil
return
}
return
}
func (sync *kubeSync) updateFederations(cm *v1.ConfigMap, config *Config) (err error) {
if flagValue, ok := cm.Data["federations"]; ok {
config.Federations = make(map[string]string)
if err = fed.ParseFederationsFlag(flagValue, config.Federations); err != nil {
glog.Errorf("Invalid federations value: %v (value was %q)",
err, cm.Data["federations"])
return
}
glog.V(2).Infof("Updated federations to %v", config.Federations)
} else {
glog.V(2).Infof("No federations present")
}
return
}

View File

@ -1,868 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dns
import (
"fmt"
"net"
"strings"
"sync"
"time"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/pkg/api/v1"
metav1 "k8s.io/client-go/pkg/apis/meta/v1"
"k8s.io/client-go/pkg/runtime"
"k8s.io/client-go/pkg/watch"
kcache "k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/dns/config"
"k8s.io/kubernetes/pkg/dns/treecache"
"k8s.io/kubernetes/pkg/dns/util"
"k8s.io/kubernetes/pkg/util/validation"
"k8s.io/kubernetes/pkg/util/wait"
etcd "github.com/coreos/etcd/client"
"github.com/golang/glog"
"github.com/miekg/dns"
skymsg "github.com/skynetservices/skydns/msg"
)
const (
// A subdomain added to the user specified domain for all services.
serviceSubdomain = "svc"
// A subdomain added to the user specified dmoain for all pods.
podSubdomain = "pod"
// Resync period for the kube controller loop.
resyncPeriod = 5 * time.Minute
// Duration for which the TTL cache should hold the node resource to retrieve the zone
// annotation from it so that it could be added to federation CNAMEs. There is ideally
// no need to expire this cache, but we don't want to assume that node annotations
// never change. So we expire the cache and retrieve a node once every 180 seconds.
// The value is chosen to be neither too long nor too short.
nodeCacheTTL = 180 * time.Second
)
type KubeDNS struct {
// kubeClient makes calls to API Server and registers calls with API Server
// to get Endpoints and Service objects.
kubeClient clientset.Interface
// domain for which this DNS Server is authoritative.
domain string
// configMap where kube-dns dynamic configuration is store. If this
// is empty then getting configuration from a configMap will be
// disabled.
configMap string
// endpointsStore that contains all the endpoints in the system.
endpointsStore kcache.Store
// servicesStore that contains all the services in the system.
servicesStore kcache.Store
// nodesStore contains some subset of nodes in the system so that we
// can retrieve the cluster zone annotation from the cached node
// instead of getting it from the API server every time.
nodesStore kcache.Store
// cache stores DNS records for the domain. A Records and SRV Records for
// (regular) services and headless Services. CNAME Records for
// ExternalName Services.
cache treecache.TreeCache
// TODO(nikhiljindal): Remove this. It can be recreated using
// clusterIPServiceMap.
reverseRecordMap map[string]*skymsg.Service
// clusterIPServiceMap to service object. Headless services are not
// part of this map. Used to get a service when given its cluster
// IP. Access to this is coordinated using cacheLock. We use the
// same lock for cache and this map to ensure that they don't get
// out of sync.
clusterIPServiceMap map[string]*v1.Service
// cacheLock protecting the cache. caller is responsible for using
// the cacheLock before invoking methods on cache the cache is not
// thread-safe, and the caller can guarantee thread safety by using
// the cacheLock
cacheLock sync.RWMutex
// The domain for which this DNS Server is authoritative, in array
// format and reversed. e.g. if domain is "cluster.local",
// domainPath is []string{"local", "cluster"}
domainPath []string
// endpointsController invokes registered callbacks when endpoints change.
endpointsController *kcache.Controller
// serviceController invokes registered callbacks when services change.
serviceController *kcache.Controller
// config set from the dynamic configuration source.
config *config.Config
// configLock protects the config below.
configLock sync.RWMutex
// configSync manages synchronization of the config map
configSync config.Sync
// Initial timeout for endpoints and services to be synced from APIServer
initialSyncTimeout time.Duration
}
func NewKubeDNS(client clientset.Interface, clusterDomain string, timeout time.Duration, configSync config.Sync) *KubeDNS {
kd := &KubeDNS{
kubeClient: client,
domain: clusterDomain,
cache: treecache.NewTreeCache(),
cacheLock: sync.RWMutex{},
nodesStore: kcache.NewStore(kcache.MetaNamespaceKeyFunc),
reverseRecordMap: make(map[string]*skymsg.Service),
clusterIPServiceMap: make(map[string]*v1.Service),
domainPath: util.ReverseArray(strings.Split(strings.TrimRight(clusterDomain, "."), ".")),
initialSyncTimeout: timeout,
configLock: sync.RWMutex{},
configSync: configSync,
}
kd.setEndpointsStore()
kd.setServicesStore()
return kd
}
func (kd *KubeDNS) Start() {
glog.V(2).Infof("Starting endpointsController")
go kd.endpointsController.Run(wait.NeverStop)
glog.V(2).Infof("Starting serviceController")
go kd.serviceController.Run(wait.NeverStop)
kd.startConfigMapSync()
// Wait synchronously for the initial list operations to be
// complete of endpoints and services from APIServer.
kd.waitForResourceSyncedOrDie()
}
func (kd *KubeDNS) waitForResourceSyncedOrDie() {
// Wait for both controllers have completed an initial resource listing
timeout := time.After(kd.initialSyncTimeout)
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-timeout:
glog.Fatalf("Timeout waiting for initialization")
case <-ticker.C:
if kd.endpointsController.HasSynced() && kd.serviceController.HasSynced() {
glog.V(0).Infof("Initialized services and endpoints from apiserver")
return
}
glog.V(0).Infof("DNS server not ready, retry in 500 milliseconds")
}
}
}
func (kd *KubeDNS) startConfigMapSync() {
initialConfig, err := kd.configSync.Once()
if err != nil {
glog.Errorf(
"Error getting initial ConfigMap: %v, starting with default values", err)
kd.config = config.NewDefaultConfig()
} else {
kd.config = initialConfig
}
go kd.syncConfigMap(kd.configSync.Periodic())
}
func (kd *KubeDNS) syncConfigMap(syncChan <-chan *config.Config) {
for {
nextConfig := <-syncChan
kd.configLock.Lock()
kd.config = nextConfig
glog.V(2).Infof("Configuration updated: %+v", *kd.config)
kd.configLock.Unlock()
}
}
func (kd *KubeDNS) GetCacheAsJSON() (string, error) {
kd.cacheLock.RLock()
defer kd.cacheLock.RUnlock()
json, err := kd.cache.Serialize()
return json, err
}
func (kd *KubeDNS) setServicesStore() {
// Returns a cache.ListWatch that gets all changes to services.
kd.servicesStore, kd.serviceController = kcache.NewInformer(
&kcache.ListWatch{
ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
return kd.kubeClient.Core().Services(v1.NamespaceAll).List(options)
},
WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
return kd.kubeClient.Core().Services(v1.NamespaceAll).Watch(options)
},
},
&v1.Service{},
resyncPeriod,
kcache.ResourceEventHandlerFuncs{
AddFunc: kd.newService,
DeleteFunc: kd.removeService,
UpdateFunc: kd.updateService,
},
)
}
func (kd *KubeDNS) setEndpointsStore() {
// Returns a cache.ListWatch that gets all changes to endpoints.
kd.endpointsStore, kd.endpointsController = kcache.NewInformer(
&kcache.ListWatch{
ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
return kd.kubeClient.Core().Endpoints(v1.NamespaceAll).List(options)
},
WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
return kd.kubeClient.Core().Endpoints(v1.NamespaceAll).Watch(options)
},
},
&v1.Endpoints{},
resyncPeriod,
kcache.ResourceEventHandlerFuncs{
AddFunc: kd.handleEndpointAdd,
UpdateFunc: func(oldObj, newObj interface{}) {
// TODO: Avoid unwanted updates.
kd.handleEndpointAdd(newObj)
},
// No DeleteFunc for EndpointsStore because endpoint object will be deleted
// when corresponding service is deleted.
},
)
}
func assertIsService(obj interface{}) (*v1.Service, bool) {
if service, ok := obj.(*v1.Service); ok {
return service, ok
} else {
glog.Errorf("Type assertion failed! Expected 'Service', got %T", service)
return nil, ok
}
}
func (kd *KubeDNS) newService(obj interface{}) {
if service, ok := assertIsService(obj); ok {
glog.V(2).Infof("New service: %v", service.Name)
glog.V(4).Infof("Service details: %v", service)
// ExternalName services are a special kind that return CNAME records
if service.Spec.Type == v1.ServiceTypeExternalName {
kd.newExternalNameService(service)
return
}
// if ClusterIP is not set, a DNS entry should not be created
if !v1.IsServiceIPSet(service) {
kd.newHeadlessService(service)
return
}
if len(service.Spec.Ports) == 0 {
glog.Warningf("Service with no ports, this should not have happened: %v",
service)
}
kd.newPortalService(service)
}
}
func (kd *KubeDNS) removeService(obj interface{}) {
if s, ok := assertIsService(obj); ok {
subCachePath := append(kd.domainPath, serviceSubdomain, s.Namespace, s.Name)
kd.cacheLock.Lock()
defer kd.cacheLock.Unlock()
success := kd.cache.DeletePath(subCachePath...)
glog.V(2).Infof("removeService %v at path %v. Success: %v",
s.Name, subCachePath, success)
// ExternalName services have no IP
if v1.IsServiceIPSet(s) {
delete(kd.reverseRecordMap, s.Spec.ClusterIP)
delete(kd.clusterIPServiceMap, s.Spec.ClusterIP)
}
}
}
func (kd *KubeDNS) updateService(oldObj, newObj interface{}) {
if new, ok := assertIsService(newObj); ok {
if old, ok := assertIsService(oldObj); ok {
// Remove old cache path only if changing type to/from ExternalName.
// In all other cases, we'll update records in place.
if (new.Spec.Type == v1.ServiceTypeExternalName) !=
(old.Spec.Type == v1.ServiceTypeExternalName) {
kd.removeService(oldObj)
}
kd.newService(newObj)
}
}
}
func (kd *KubeDNS) handleEndpointAdd(obj interface{}) {
if e, ok := obj.(*v1.Endpoints); ok {
kd.addDNSUsingEndpoints(e)
}
}
func (kd *KubeDNS) addDNSUsingEndpoints(e *v1.Endpoints) error {
svc, err := kd.getServiceFromEndpoints(e)
if err != nil {
return err
}
if svc == nil || v1.IsServiceIPSet(svc) {
// No headless service found corresponding to endpoints object.
return nil
}
return kd.generateRecordsForHeadlessService(e, svc)
}
func (kd *KubeDNS) getServiceFromEndpoints(e *v1.Endpoints) (*v1.Service, error) {
key, err := kcache.MetaNamespaceKeyFunc(e)
if err != nil {
return nil, err
}
obj, exists, err := kd.servicesStore.GetByKey(key)
if err != nil {
return nil, fmt.Errorf("failed to get service object from services store - %v", err)
}
if !exists {
glog.V(3).Infof("No service for endpoint %q in namespace %q",
e.Name, e.Namespace)
return nil, nil
}
if svc, ok := assertIsService(obj); ok {
return svc, nil
}
return nil, fmt.Errorf("got a non service object in services store %v", obj)
}
// fqdn constructs the fqdn for the given service. subpaths is a list of path
// elements rooted at the given service, ending at a service record.
func (kd *KubeDNS) fqdn(service *v1.Service, subpaths ...string) string {
domainLabels := append(append(kd.domainPath, serviceSubdomain, service.Namespace, service.Name), subpaths...)
return dns.Fqdn(strings.Join(util.ReverseArray(domainLabels), "."))
}
func (kd *KubeDNS) newPortalService(service *v1.Service) {
subCache := treecache.NewTreeCache()
recordValue, recordLabel := util.GetSkyMsg(service.Spec.ClusterIP, 0)
subCache.SetEntry(recordLabel, recordValue, kd.fqdn(service, recordLabel))
// Generate SRV Records
for i := range service.Spec.Ports {
port := &service.Spec.Ports[i]
if port.Name != "" && port.Protocol != "" {
srvValue := kd.generateSRVRecordValue(service, int(port.Port))
l := []string{"_" + strings.ToLower(string(port.Protocol)), "_" + port.Name}
glog.V(2).Infof("Added SRV record %+v", srvValue)
subCache.SetEntry(recordLabel, srvValue, kd.fqdn(service, append(l, recordLabel)...), l...)
}
}
subCachePath := append(kd.domainPath, serviceSubdomain, service.Namespace)
host := getServiceFQDN(kd.domain, service)
reverseRecord, _ := util.GetSkyMsg(host, 0)
kd.cacheLock.Lock()
defer kd.cacheLock.Unlock()
kd.cache.SetSubCache(service.Name, subCache, subCachePath...)
kd.reverseRecordMap[service.Spec.ClusterIP] = reverseRecord
kd.clusterIPServiceMap[service.Spec.ClusterIP] = service
}
func (kd *KubeDNS) generateRecordsForHeadlessService(e *v1.Endpoints, svc *v1.Service) error {
subCache := treecache.NewTreeCache()
glog.V(4).Infof("Endpoints Annotations: %v", e.Annotations)
for idx := range e.Subsets {
for subIdx := range e.Subsets[idx].Addresses {
address := &e.Subsets[idx].Addresses[subIdx]
endpointIP := address.IP
recordValue, endpointName := util.GetSkyMsg(endpointIP, 0)
if hostLabel, exists := getHostname(address); exists {
endpointName = hostLabel
}
subCache.SetEntry(endpointName, recordValue, kd.fqdn(svc, endpointName))
for portIdx := range e.Subsets[idx].Ports {
endpointPort := &e.Subsets[idx].Ports[portIdx]
if endpointPort.Name != "" && endpointPort.Protocol != "" {
srvValue := kd.generateSRVRecordValue(svc, int(endpointPort.Port), endpointName)
glog.V(2).Infof("Added SRV record %+v", srvValue)
l := []string{"_" + strings.ToLower(string(endpointPort.Protocol)), "_" + endpointPort.Name}
subCache.SetEntry(endpointName, srvValue, kd.fqdn(svc, append(l, endpointName)...), l...)
}
}
}
}
subCachePath := append(kd.domainPath, serviceSubdomain, svc.Namespace)
kd.cacheLock.Lock()
defer kd.cacheLock.Unlock()
kd.cache.SetSubCache(svc.Name, subCache, subCachePath...)
return nil
}
func getHostname(address *v1.EndpointAddress) (string, bool) {
if len(address.Hostname) > 0 {
return address.Hostname, true
}
return "", false
}
func (kd *KubeDNS) generateSRVRecordValue(svc *v1.Service, portNumber int, labels ...string) *skymsg.Service {
host := strings.Join([]string{svc.Name, svc.Namespace, serviceSubdomain, kd.domain}, ".")
for _, cNameLabel := range labels {
host = cNameLabel + "." + host
}
recordValue, _ := util.GetSkyMsg(host, portNumber)
return recordValue
}
// Generates skydns records for a headless service.
func (kd *KubeDNS) newHeadlessService(service *v1.Service) error {
// Create an A record for every pod in the service.
// This record must be periodically updated.
// Format is as follows:
// For a service x, with pods a and b create DNS records,
// a.x.ns.domain. and, b.x.ns.domain.
key, err := kcache.MetaNamespaceKeyFunc(service)
if err != nil {
return err
}
e, exists, err := kd.endpointsStore.GetByKey(key)
if err != nil {
return fmt.Errorf("failed to get endpoints object from endpoints store - %v", err)
}
if !exists {
glog.V(1).Infof("Could not find endpoints for service %q in namespace %q. DNS records will be created once endpoints show up.",
service.Name, service.Namespace)
return nil
}
if e, ok := e.(*v1.Endpoints); ok {
return kd.generateRecordsForHeadlessService(e, service)
}
return nil
}
// Generates skydns records for an ExternalName service.
func (kd *KubeDNS) newExternalNameService(service *v1.Service) {
// Create a CNAME record for the service's ExternalName.
// TODO: TTL?
recordValue, _ := util.GetSkyMsg(service.Spec.ExternalName, 0)
cachePath := append(kd.domainPath, serviceSubdomain, service.Namespace)
fqdn := kd.fqdn(service)
glog.V(2).Infof("newExternalNameService: storing key %s with value %v as %s under %v",
service.Name, recordValue, fqdn, cachePath)
kd.cacheLock.Lock()
defer kd.cacheLock.Unlock()
// Store the service name directly as the leaf key
kd.cache.SetEntry(service.Name, recordValue, fqdn, cachePath...)
}
// Records responds with DNS records that match the given name, in a format
// understood by the skydns server. If "exact" is true, a single record
// matching the given name is returned, otherwise all records stored under
// the subtree matching the name are returned.
func (kd *KubeDNS) Records(name string, exact bool) (retval []skymsg.Service, err error) {
glog.V(3).Infof("Query for %q, exact: %v", name, exact)
trimmed := strings.TrimRight(name, ".")
segments := strings.Split(trimmed, ".")
isFederationQuery := false
federationSegments := []string{}
if !exact && kd.isFederationQuery(segments) {
glog.V(3).Infof("Received federation query, trying local service first")
// Try querying the non-federation (local) service first. Will try
// the federation one later, if this fails.
isFederationQuery = true
federationSegments = append(federationSegments, segments...)
// To try local service, remove federation name from segments.
// Federation name is 3rd in the segment (after service name and
// namespace).
segments = append(segments[:2], segments[3:]...)
}
path := util.ReverseArray(segments)
records, err := kd.getRecordsForPath(path, exact)
if err != nil {
return nil, err
}
if isFederationQuery {
return kd.recordsForFederation(records, path, exact, federationSegments)
} else if len(records) > 0 {
glog.V(4).Infof("Records for %v: %v", name, records)
return records, nil
}
glog.V(3).Infof("No record found for %v", name)
return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound}
}
func (kd *KubeDNS) recordsForFederation(records []skymsg.Service, path []string, exact bool, federationSegments []string) (retval []skymsg.Service, err error) {
// For federation query, verify that the local service has endpoints.
validRecord := false
for _, val := range records {
// We know that a headless service has endpoints for sure if a
// record was returned for it. The record contains endpoint
// IPs. So nothing to check for headless services.
//
// TODO: this access to the cluster IP map does not seem to be
// threadsafe.
if !kd.isHeadlessServiceRecord(&val) {
ok, err := kd.serviceWithClusterIPHasEndpoints(&val)
if err != nil {
glog.V(2).Infof(
"Federation: error finding if service has endpoint: %v", err)
continue
}
if !ok {
glog.V(2).Infof("Federation: skipping record since service has no endpoint: %v", val)
continue
}
}
validRecord = true
break
}
if validRecord {
// There is a local service with valid endpoints, return its CNAME.
name := strings.Join(util.ReverseArray(path), ".")
// Ensure that this name that we are returning as a CNAME response
// is a fully qualified domain name so that the client's resolver
// library doesn't have to go through its search list all over
// again.
if !strings.HasSuffix(name, ".") {
name = name + "."
}
glog.V(3).Infof(
"Federation: Returning CNAME for local service: %v", name)
return []skymsg.Service{{Host: name}}, nil
}
// If the name query is not an exact query and does not match any
// records in the local store, attempt to send a federation redirect
// (CNAME) response.
if !exact {
glog.V(3).Infof(
"Federation: Did not find a local service. Trying federation redirect (CNAME)")
return kd.federationRecords(util.ReverseArray(federationSegments))
}
return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound}
}
func (kd *KubeDNS) getRecordsForPath(path []string, exact bool) ([]skymsg.Service, error) {
if kd.isPodRecord(path) {
ip, err := kd.getPodIP(path)
if err == nil {
skyMsg, _ := util.GetSkyMsg(ip, 0)
return []skymsg.Service{*skyMsg}, nil
}
return nil, err
}
if exact {
key := path[len(path)-1]
if key == "" {
return []skymsg.Service{}, nil
}
kd.cacheLock.RLock()
defer kd.cacheLock.RUnlock()
if record, ok := kd.cache.GetEntry(key, path[:len(path)-1]...); ok {
glog.V(3).Infof("Exact match %v for %v received from cache", record, path[:len(path)-1])
return []skymsg.Service{*(record.(*skymsg.Service))}, nil
}
glog.V(3).Infof("Exact match for %v not found in cache", path)
return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound}
}
kd.cacheLock.RLock()
defer kd.cacheLock.RUnlock()
records := kd.cache.GetValuesForPathWithWildcards(path...)
glog.V(3).Infof("Found %d records for %v in the cache", len(records), path)
retval := []skymsg.Service{}
for _, val := range records {
retval = append(retval, *val)
}
glog.V(4).Infof("getRecordsForPath retval=%+v, path=%v", retval, path)
return retval, nil
}
// Returns true if the given record corresponds to a headless service.
// Important: Assumes that we already have the cacheLock. Callers responsibility to acquire it.
// This is because the code will panic, if we try to acquire it again if we already have it.
func (kd *KubeDNS) isHeadlessServiceRecord(msg *skymsg.Service) bool {
// If it is not a headless service, then msg.Host will be the cluster IP.
// So we can check if msg.host exists in our clusterIPServiceMap.
_, ok := kd.clusterIPServiceMap[msg.Host]
// It is headless service if no record was found.
return !ok
}
// Returns true if the service corresponding to the given message has endpoints.
// Note: Works only for services with ClusterIP. Will return an error for headless service (service without a clusterIP).
// Important: Assumes that we already have the cacheLock. Callers responsibility to acquire it.
// This is because the code will panic, if we try to acquire it again if we already have it.
func (kd *KubeDNS) serviceWithClusterIPHasEndpoints(msg *skymsg.Service) (bool, error) {
svc, ok := kd.clusterIPServiceMap[msg.Host]
if !ok {
// It is a headless service.
return false, fmt.Errorf("method not expected to be called for headless service")
}
key, err := kcache.MetaNamespaceKeyFunc(svc)
if err != nil {
return false, err
}
e, exists, err := kd.endpointsStore.GetByKey(key)
if err != nil {
return false, fmt.Errorf("failed to get endpoints object from endpoints store - %v", err)
}
if !exists {
return false, nil
}
if e, ok := e.(*v1.Endpoints); ok {
return len(e.Subsets) > 0, nil
}
return false, fmt.Errorf("unexpected: found non-endpoint object in endpoint store: %v", e)
}
// ReverseRecords performs a reverse lookup for the given name.
func (kd *KubeDNS) ReverseRecord(name string) (*skymsg.Service, error) {
glog.V(3).Infof("Query for ReverseRecord %q", name)
// if portalIP is not a valid IP, the reverseRecordMap lookup will fail
portalIP, ok := util.ExtractIP(name)
if !ok {
return nil, fmt.Errorf("does not support reverse lookup for %s", name)
}
kd.cacheLock.RLock()
defer kd.cacheLock.RUnlock()
if reverseRecord, ok := kd.reverseRecordMap[portalIP]; ok {
return reverseRecord, nil
}
return nil, fmt.Errorf("must be exactly one service record")
}
// e.g {"local", "cluster", "pod", "default", "10-0-0-1"}
func (kd *KubeDNS) isPodRecord(path []string) bool {
if len(path) != len(kd.domainPath)+3 {
return false
}
if path[len(kd.domainPath)] != "pod" {
return false
}
for _, segment := range path {
if segment == "*" {
return false
}
}
return true
}
func (kd *KubeDNS) getPodIP(path []string) (string, error) {
ipStr := path[len(path)-1]
ip := strings.Replace(ipStr, "-", ".", -1)
if parsed := net.ParseIP(ip); parsed != nil {
return ip, nil
}
return "", fmt.Errorf("Invalid IP Address %v", ip)
}
// isFederationQuery checks if the given query `path` matches the federated service query pattern.
// The conjunction of the following conditions forms the test for the federated service query
// pattern:
// 1. `path` has exactly 4+len(domainPath) segments: mysvc.myns.myfederation.svc.domain.path.
// 2. Service name component must be a valid RFC 1035 name.
// 3. Namespace component must be a valid RFC 1123 name.
// 4. Federation component must also be a valid RFC 1123 name.
// 5. Fourth segment is exactly "svc"
// 6. The remaining segments match kd.domainPath.
// 7. And federation must be one of the listed federations in the config.
// Note: Because of the above conditions, this method will treat wildcard queries such as
// *.mysvc.myns.myfederation.svc.domain.path as non-federation queries.
// We can add support for wildcard queries later, if needed.
func (kd *KubeDNS) isFederationQuery(path []string) bool {
if len(path) != 4+len(kd.domainPath) {
glog.V(4).Infof("Not a federation query: len(%q) != 4+len(%q)", path, kd.domainPath)
return false
}
if errs := validation.IsDNS1035Label(path[0]); len(errs) != 0 {
glog.V(4).Infof("Not a federation query: %q is not an RFC 1035 label: %q",
path[0], errs)
return false
}
if errs := validation.IsDNS1123Label(path[1]); len(errs) != 0 {
glog.V(4).Infof("Not a federation query: %q is not an RFC 1123 label: %q",
path[1], errs)
return false
}
if errs := validation.IsDNS1123Label(path[2]); len(errs) != 0 {
glog.V(4).Infof("Not a federation query: %q is not an RFC 1123 label: %q",
path[2], errs)
return false
}
if path[3] != serviceSubdomain {
glog.V(4).Infof("Not a federation query: %q != %q (serviceSubdomain)",
path[3], serviceSubdomain)
return false
}
for i, domComp := range kd.domainPath {
// kd.domainPath is reversed, so we need to look in the `path` in the reverse order.
if domComp != path[len(path)-i-1] {
glog.V(4).Infof("Not a federation query: kd.domainPath[%d] != path[%d] (%q != %q)",
i, len(path)-i-1, domComp, path[len(path)-i-1])
return false
}
}
kd.configLock.RLock()
defer kd.configLock.RUnlock()
if _, ok := kd.config.Federations[path[2]]; !ok {
glog.V(4).Infof("Not a federation query: label %q not found", path[2])
return false
}
return true
}
// federationRecords checks if the given `queryPath` is for a federated service and if it is,
// it returns a CNAME response containing the cluster zone name and federation domain name
// suffix.
func (kd *KubeDNS) federationRecords(queryPath []string) ([]skymsg.Service, error) {
// `queryPath` is a reversed-array of the queried name, reverse it back to make it easy
// to follow through this code and reduce confusion. There is no reason for it to be
// reversed here.
path := util.ReverseArray(queryPath)
// Check if the name query matches the federation query pattern.
if !kd.isFederationQuery(path) {
return nil, etcd.Error{Code: etcd.ErrorCodeKeyNotFound}
}
// Now that we have already established that the query is a federation query, remove the local
// domain path components, i.e. kd.domainPath, from the query.
path = path[:len(path)-len(kd.domainPath)]
// Append the zone name (zone in the cloud provider terminology, not a DNS
// zone) and the region name.
zone, region, err := kd.getClusterZoneAndRegion()
if err != nil {
return nil, fmt.Errorf("failed to obtain the cluster zone and region: %v", err)
}
path = append(path, zone, region)
// We have already established that the map entry exists for the given federation,
// we just need to retrieve the domain name, validate it and append it to the path.
kd.configLock.RLock()
domain := kd.config.Federations[path[2]]
kd.configLock.RUnlock()
// We accept valid subdomains as well, so just let all the valid subdomains.
if len(validation.IsDNS1123Subdomain(domain)) != 0 {
return nil, fmt.Errorf("%s is not a valid domain name for federation %s", domain, path[2])
}
name := strings.Join(append(path, domain), ".")
// Ensure that this name that we are returning as a CNAME response is a fully qualified
// domain name so that the client's resolver library doesn't have to go through its
// search list all over again.
if !strings.HasSuffix(name, ".") {
name = name + "."
}
return []skymsg.Service{{Host: name}}, nil
}
// getClusterZoneAndRegion returns the name of the zone and the region the
// cluster is running in. It arbitrarily selects a node and reads the failure
// domain label on the node. An alternative is to obtain this pod's
// (i.e. kube-dns pod's) name using the downward API, get the pod, get the
// node the pod is bound to and retrieve that node's labels. But even just by
// reading those steps, it looks complex and it is not entirely clear what
// that complexity is going to buy us. So taking a simpler approach here.
// Also note that zone here means the zone in cloud provider terminology, not
// the DNS zone.
func (kd *KubeDNS) getClusterZoneAndRegion() (string, string, error) {
var node *v1.Node
objs := kd.nodesStore.List()
if len(objs) > 0 {
var ok bool
if node, ok = objs[0].(*v1.Node); !ok {
return "", "", fmt.Errorf("expected node object, got: %T", objs[0])
}
} else {
// An alternative to listing nodes each time is to set a watch, but that is totally
// wasteful in case of non-federated independent Kubernetes clusters. So carefully
// proceeding here.
// TODO(madhusudancs): Move this to external/v1 API.
nodeList, err := kd.kubeClient.Core().Nodes().List(v1.ListOptions{})
if err != nil || len(nodeList.Items) == 0 {
return "", "", fmt.Errorf("failed to retrieve the cluster nodes: %v", err)
}
// Select a node (arbitrarily the first node) that has
// `LabelZoneFailureDomain` and `LabelZoneRegion` set.
for _, nodeItem := range nodeList.Items {
_, zfound := nodeItem.Labels[metav1.LabelZoneFailureDomain]
_, rfound := nodeItem.Labels[metav1.LabelZoneRegion]
if !zfound || !rfound {
continue
}
// Make a copy of the node, don't rely on the loop variable.
node = &(*(&nodeItem))
if err := kd.nodesStore.Add(node); err != nil {
return "", "", fmt.Errorf("couldn't add the retrieved node to the cache: %v", err)
}
// Node is found, break out of the loop.
break
}
}
if node == nil {
return "", "", fmt.Errorf("Could not find any nodes")
}
zone, ok := node.Labels[metav1.LabelZoneFailureDomain]
if !ok || zone == "" {
return "", "", fmt.Errorf("unknown cluster zone")
}
region, ok := node.Labels[metav1.LabelZoneRegion]
if !ok || region == "" {
return "", "", fmt.Errorf("unknown cluster region")
}
return zone, region, nil
}
func getServiceFQDN(domain string, service *v1.Service) string {
return strings.Join(
[]string{service.Name, service.Namespace, serviceSubdomain, domain}, ".")
}

View File

@ -1,839 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dns
import (
"fmt"
"net"
"reflect"
"strings"
"sync"
"testing"
"time"
etcd "github.com/coreos/etcd/client"
"github.com/miekg/dns"
skymsg "github.com/skynetservices/skydns/msg"
skyserver "github.com/skynetservices/skydns/server"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/pkg/api/v1"
metav1 "k8s.io/client-go/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/kubernetes/pkg/dns/config"
"k8s.io/kubernetes/pkg/dns/treecache"
"k8s.io/kubernetes/pkg/dns/util"
"k8s.io/kubernetes/pkg/util/sets"
)
const (
testDomain = "cluster.local."
testService = "testservice"
testNamespace = "default"
testExternalName = "foo.bar.example.com"
)
func newKubeDNS() *KubeDNS {
return &KubeDNS{
domain: testDomain,
domainPath: util.ReverseArray(strings.Split(strings.TrimRight(testDomain, "."), ".")),
endpointsStore: cache.NewStore(cache.MetaNamespaceKeyFunc),
servicesStore: cache.NewStore(cache.MetaNamespaceKeyFunc),
nodesStore: cache.NewStore(cache.MetaNamespaceKeyFunc),
cache: treecache.NewTreeCache(),
reverseRecordMap: make(map[string]*skymsg.Service),
clusterIPServiceMap: make(map[string]*v1.Service),
cacheLock: sync.RWMutex{},
config: config.NewDefaultConfig(),
configLock: sync.RWMutex{},
configSync: config.NewNopSync(config.NewDefaultConfig()),
}
}
func TestPodDns(t *testing.T) {
const (
testPodIP = "1.2.3.4"
sanitizedPodIP = "1-2-3-4"
)
kd := newKubeDNS()
records, err := kd.Records(sanitizedPodIP+".default.pod."+kd.domain, false)
require.NoError(t, err)
assert.Equal(t, 1, len(records))
assert.Equal(t, testPodIP, records[0].Host)
}
func TestUnnamedSinglePortService(t *testing.T) {
kd := newKubeDNS()
s := newService(testNamespace, testService, "1.2.3.4", "", 80)
// Add the service
kd.newService(s)
assertDNSForClusterIP(t, kd, s)
assertReverseRecord(t, kd, s)
// Delete the service
kd.removeService(s)
assertNoDNSForClusterIP(t, kd, s)
assertNoReverseRecord(t, kd, s)
}
func TestNamedSinglePortService(t *testing.T) {
const (
portName1 = "http1"
portName2 = "http2"
)
kd := newKubeDNS()
s := newService(testNamespace, testService, "1.2.3.4", portName1, 80)
// Add the service
kd.newService(s)
assertDNSForClusterIP(t, kd, s)
assertSRVForNamedPort(t, kd, s, portName1)
newService := *s
// update the portName of the service
newService.Spec.Ports[0].Name = portName2
kd.updateService(s, &newService)
assertDNSForClusterIP(t, kd, s)
assertSRVForNamedPort(t, kd, s, portName2)
assertNoSRVForNamedPort(t, kd, s, portName1)
// Delete the service
kd.removeService(s)
assertNoDNSForClusterIP(t, kd, s)
assertNoSRVForNamedPort(t, kd, s, portName1)
assertNoSRVForNamedPort(t, kd, s, portName2)
}
func assertARecordsMatchIPs(t *testing.T, records []dns.RR, ips ...string) {
expectedEndpoints := sets.NewString(ips...)
gotEndpoints := sets.NewString()
for _, r := range records {
if a, ok := r.(*dns.A); !ok {
t.Errorf("Expected A record, got %#v", a)
} else {
gotEndpoints.Insert(a.A.String())
}
}
if !gotEndpoints.Equal(expectedEndpoints) {
t.Errorf("Expected %v got %v", expectedEndpoints, gotEndpoints)
}
}
func assertSRVRecordsMatchTarget(t *testing.T, records []dns.RR, targets ...string) {
expectedTargets := sets.NewString(targets...)
gotTargets := sets.NewString()
for _, r := range records {
if srv, ok := r.(*dns.SRV); !ok {
t.Errorf("Expected SRV record, got %+v", srv)
} else {
gotTargets.Insert(srv.Target)
}
}
if !gotTargets.Equal(expectedTargets) {
t.Errorf("Expected %v got %v", expectedTargets, gotTargets)
}
}
func assertSRVRecordsMatchPort(t *testing.T, records []dns.RR, port ...int) {
expectedPorts := sets.NewInt(port...)
gotPorts := sets.NewInt()
for _, r := range records {
if srv, ok := r.(*dns.SRV); !ok {
t.Errorf("Expected SRV record, got %+v", srv)
} else {
gotPorts.Insert(int(srv.Port))
t.Logf("got %+v", srv)
}
}
if !gotPorts.Equal(expectedPorts) {
t.Errorf("Expected %v got %v", expectedPorts, gotPorts)
}
}
func TestSkySimpleSRVLookup(t *testing.T) {
kd := newKubeDNS()
skydnsConfig := &skyserver.Config{Domain: testDomain, DnsAddr: "0.0.0.0:53"}
skyserver.SetDefaults(skydnsConfig)
s := skyserver.New(kd, skydnsConfig)
service := newHeadlessService()
endpointIPs := []string{"10.0.0.1", "10.0.0.2"}
endpoints := newEndpoints(service, newSubsetWithOnePort("", 80, endpointIPs...))
assert.NoError(t, kd.endpointsStore.Add(endpoints))
kd.newService(service)
name := strings.Join([]string{testService, testNamespace, "svc", testDomain}, ".")
question := dns.Question{Name: name, Qtype: dns.TypeSRV, Qclass: dns.ClassINET}
rec, extra, err := s.SRVRecords(question, name, 512, false)
if err != nil {
t.Fatalf("Failed srv record lookup on service with fqdn %v", name)
}
assertARecordsMatchIPs(t, extra, endpointIPs...)
targets := []string{}
for _, eip := range endpointIPs {
// A portal service is always created with a port of '0'
targets = append(targets,
fmt.Sprintf("%x.%v",
util.HashServiceRecord(util.NewServiceRecord(eip, 0)), name))
}
assertSRVRecordsMatchTarget(t, rec, targets...)
}
func TestSkyPodHostnameSRVLookup(t *testing.T) {
kd := newKubeDNS()
skydnsConfig := &skyserver.Config{Domain: testDomain, DnsAddr: "0.0.0.0:53"}
skyserver.SetDefaults(skydnsConfig)
s := skyserver.New(kd, skydnsConfig)
service := newHeadlessService()
endpointIPs := []string{"10.0.0.1", "10.0.0.2"}
endpoints := newEndpoints(
service,
newSubsetWithOnePortWithHostname("", 80, true, endpointIPs...))
assert.NoError(t, kd.endpointsStore.Add(endpoints))
kd.newService(service)
name := strings.Join([]string{testService, testNamespace, "svc", testDomain}, ".")
question := dns.Question{Name: name, Qtype: dns.TypeSRV, Qclass: dns.ClassINET}
rec, _, err := s.SRVRecords(question, name, 512, false)
if err != nil {
t.Fatalf("Failed srv record lookup on service with fqdn %v", name)
}
targets := []string{}
for i := range endpointIPs {
targets = append(targets, fmt.Sprintf("%v.%v", fmt.Sprintf("ep-%d", i), name))
}
assertSRVRecordsMatchTarget(t, rec, targets...)
}
func TestSkyNamedPortSRVLookup(t *testing.T) {
kd := newKubeDNS()
skydnsConfig := &skyserver.Config{Domain: testDomain, DnsAddr: "0.0.0.0:53"}
skyserver.SetDefaults(skydnsConfig)
s := skyserver.New(kd, skydnsConfig)
service := newHeadlessService()
eip := "10.0.0.1"
endpoints := newEndpoints(service, newSubsetWithOnePort("http", 8081, eip))
assert.NoError(t, kd.endpointsStore.Add(endpoints))
kd.newService(service)
name := strings.Join([]string{"_http", "_tcp", testService, testNamespace, "svc", testDomain}, ".")
question := dns.Question{Name: name, Qtype: dns.TypeSRV, Qclass: dns.ClassINET}
rec, extra, err := s.SRVRecords(question, name, 512, false)
if err != nil {
t.Fatalf("Failed srv record lookup on service with fqdn %v", name)
}
svcDomain := strings.Join([]string{testService, testNamespace, "svc", testDomain}, ".")
assertARecordsMatchIPs(t, extra, eip)
assertSRVRecordsMatchTarget(
t, rec, fmt.Sprintf("%x.%v", util.HashServiceRecord(util.NewServiceRecord(eip, 0)), svcDomain))
assertSRVRecordsMatchPort(t, rec, 8081)
}
func TestSimpleExternalService(t *testing.T) {
kd := newKubeDNS()
s := newExternalNameService()
assert.NoError(t, kd.servicesStore.Add(s))
kd.newService(s)
assertDNSForExternalService(t, kd, s)
kd.removeService(s)
assertNoDNSForExternalService(t, kd, s)
}
func TestSimpleHeadlessService(t *testing.T) {
kd := newKubeDNS()
s := newHeadlessService()
assert.NoError(t, kd.servicesStore.Add(s))
endpoints := newEndpoints(s, newSubsetWithOnePort("", 80, "10.0.0.1", "10.0.0.2"), newSubsetWithOnePort("", 8080, "10.0.0.3", "10.0.0.4"))
assert.NoError(t, kd.endpointsStore.Add(endpoints))
kd.newService(s)
assertDNSForHeadlessService(t, kd, endpoints)
kd.removeService(s)
assertNoDNSForHeadlessService(t, kd, s)
}
func TestHeadlessServiceWithNamedPorts(t *testing.T) {
kd := newKubeDNS()
service := newHeadlessService()
// add service to store
assert.NoError(t, kd.servicesStore.Add(service))
endpoints := newEndpoints(service, newSubsetWithTwoPorts("http1", 80, "http2", 81, "10.0.0.1", "10.0.0.2"),
newSubsetWithOnePort("https", 443, "10.0.0.3", "10.0.0.4"))
// We expect 10 records. 6 SRV records. 4 POD records.
// add endpoints
assert.NoError(t, kd.endpointsStore.Add(endpoints))
// add service
kd.newService(service)
assertDNSForHeadlessService(t, kd, endpoints)
assertSRVForHeadlessService(t, kd, service, endpoints)
// reduce endpoints
endpoints.Subsets = endpoints.Subsets[:1]
kd.handleEndpointAdd(endpoints)
// We expect 6 records. 4 SRV records. 2 POD records.
assertDNSForHeadlessService(t, kd, endpoints)
assertSRVForHeadlessService(t, kd, service, endpoints)
kd.removeService(service)
assertNoDNSForHeadlessService(t, kd, service)
}
func TestHeadlessServiceEndpointsUpdate(t *testing.T) {
kd := newKubeDNS()
service := newHeadlessService()
// add service to store
assert.NoError(t, kd.servicesStore.Add(service))
endpoints := newEndpoints(service, newSubsetWithOnePort("", 80, "10.0.0.1", "10.0.0.2"))
// add endpoints to store
assert.NoError(t, kd.endpointsStore.Add(endpoints))
// add service
kd.newService(service)
assertDNSForHeadlessService(t, kd, endpoints)
// increase endpoints
endpoints.Subsets = append(endpoints.Subsets,
newSubsetWithOnePort("", 8080, "10.0.0.3", "10.0.0.4"),
)
// expected DNSRecords = 4
kd.handleEndpointAdd(endpoints)
assertDNSForHeadlessService(t, kd, endpoints)
// remove all endpoints
endpoints.Subsets = []v1.EndpointSubset{}
kd.handleEndpointAdd(endpoints)
assertNoDNSForHeadlessService(t, kd, service)
// remove service
kd.removeService(service)
assertNoDNSForHeadlessService(t, kd, service)
}
func TestHeadlessServiceWithDelayedEndpointsAddition(t *testing.T) {
kd := newKubeDNS()
// create service
service := newHeadlessService()
// add service to store
assert.NoError(t, kd.servicesStore.Add(service))
// add service
kd.newService(service)
assertNoDNSForHeadlessService(t, kd, service)
// create endpoints
endpoints := newEndpoints(service, newSubsetWithOnePort("", 80, "10.0.0.1", "10.0.0.2"))
// add endpoints to store
assert.NoError(t, kd.endpointsStore.Add(endpoints))
// add endpoints
kd.handleEndpointAdd(endpoints)
assertDNSForHeadlessService(t, kd, endpoints)
// remove service
kd.removeService(service)
assertNoDNSForHeadlessService(t, kd, service)
}
// Verifies that a single record with host "a" is returned for query "q".
func verifyRecord(q, a string, t *testing.T, kd *KubeDNS) {
records, err := kd.Records(q, false)
require.NoError(t, err)
assert.Equal(t, 1, len(records))
assert.Equal(t, a, records[0].Host)
}
const federatedServiceFQDN = "testservice.default.myfederation.svc.testcontinent-testreg-testzone.testcontinent-testreg.example.com."
// Verifies that querying KubeDNS for a headless federation service
// returns the DNS hostname when a local service does not exist and
// returns the endpoint IP when a local service exists.
func TestFederationHeadlessService(t *testing.T) {
kd := newKubeDNS()
kd.config.Federations = map[string]string{
"myfederation": "example.com",
}
kd.kubeClient = fake.NewSimpleClientset(newNodes())
// Verify that querying for federation service returns a federation domain name.
verifyRecord("testservice.default.myfederation.svc.cluster.local.",
federatedServiceFQDN, t, kd)
// Add a local service without any endpoint.
s := newHeadlessService()
assert.NoError(t, kd.servicesStore.Add(s))
kd.newService(s)
// Verify that querying for federation service still returns the federation domain name.
verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"),
federatedServiceFQDN, t, kd)
// Now add an endpoint.
endpoints := newEndpoints(s, newSubsetWithOnePort("", 80, "10.0.0.1"))
assert.NoError(t, kd.endpointsStore.Add(endpoints))
kd.updateService(s, s)
// Verify that querying for federation service returns the local service domain name this time.
verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"),
"testservice.default.svc.cluster.local.", t, kd)
// Delete the endpoint.
endpoints.Subsets = []v1.EndpointSubset{}
kd.handleEndpointAdd(endpoints)
kd.updateService(s, s)
// Verify that querying for federation service returns the federation domain name again.
verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"),
federatedServiceFQDN, t, kd)
}
// Verifies that querying KubeDNS for a federation service returns the
// DNS hostname if no endpoint exists and returns the local cluster IP
// if endpoints exist.
func TestFederationService(t *testing.T) {
kd := newKubeDNS()
kd.config.Federations = map[string]string{
"myfederation": "example.com",
}
kd.kubeClient = fake.NewSimpleClientset(newNodes())
// Verify that querying for federation service returns the federation domain name.
verifyRecord("testservice.default.myfederation.svc.cluster.local.",
federatedServiceFQDN, t, kd)
// Add a local service without any endpoint.
s := newService(testNamespace, testService, "1.2.3.4", "", 80)
assert.NoError(t, kd.servicesStore.Add(s))
kd.newService(s)
// Verify that querying for federation service still returns the federation domain name.
verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"),
federatedServiceFQDN, t, kd)
// Now add an endpoint.
endpoints := newEndpoints(s, newSubsetWithOnePort("", 80, "10.0.0.1"))
assert.NoError(t, kd.endpointsStore.Add(endpoints))
kd.updateService(s, s)
// Verify that querying for federation service returns the local service domain name this time.
verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"),
"testservice.default.svc.cluster.local.", t, kd)
// Remove the endpoint.
endpoints.Subsets = []v1.EndpointSubset{}
kd.handleEndpointAdd(endpoints)
kd.updateService(s, s)
// Verify that querying for federation service returns the federation domain name again.
verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"),
federatedServiceFQDN, t, kd)
}
func TestFederationQueryWithoutCache(t *testing.T) {
kd := newKubeDNS()
kd.config.Federations = map[string]string{
"myfederation": "example.com",
"secondfederation": "second.example.com",
}
kd.kubeClient = fake.NewSimpleClientset(newNodes())
testValidFederationQueries(t, kd)
testInvalidFederationQueries(t, kd)
}
func TestFederationQueryWithCache(t *testing.T) {
kd := newKubeDNS()
kd.config.Federations = map[string]string{
"myfederation": "example.com",
"secondfederation": "second.example.com",
}
// Add a node to the cache.
nodeList := newNodes()
if err := kd.nodesStore.Add(&nodeList.Items[1]); err != nil {
t.Errorf("failed to add the node to the cache: %v", err)
}
testValidFederationQueries(t, kd)
testInvalidFederationQueries(t, kd)
}
func testValidFederationQueries(t *testing.T, kd *KubeDNS) {
queries := []struct {
q string
a string
}{
// Federation suffix is just a domain.
{
q: "mysvc.myns.myfederation.svc.cluster.local.",
a: "mysvc.myns.myfederation.svc.testcontinent-testreg-testzone.testcontinent-testreg.example.com.",
},
// Federation suffix is a subdomain.
{
q: "secsvc.default.secondfederation.svc.cluster.local.",
a: "secsvc.default.secondfederation.svc.testcontinent-testreg-testzone.testcontinent-testreg.second.example.com.",
},
}
for _, query := range queries {
verifyRecord(query.q, query.a, t, kd)
}
}
func testInvalidFederationQueries(t *testing.T, kd *KubeDNS) {
noAnswerQueries := []string{
"mysvc.myns.svc.cluster.local.",
"mysvc.default.nofederation.svc.cluster.local.",
}
for _, q := range noAnswerQueries {
records, err := kd.Records(q, false)
if err == nil {
t.Errorf("expected not found error, got nil")
}
if etcdErr, ok := err.(etcd.Error); !ok || etcdErr.Code != etcd.ErrorCodeKeyNotFound {
t.Errorf("expected not found error, got %v", err)
}
assert.Equal(t, 0, len(records))
}
}
func checkConfigEqual(t *testing.T, kd *KubeDNS, expected *config.Config) {
const timeout = time.Duration(5)
start := time.Now()
ok := false
for time.Since(start) < timeout*time.Second {
kd.configLock.RLock()
isEqual := reflect.DeepEqual(expected.Federations, kd.config.Federations)
kd.configLock.RUnlock()
if isEqual {
ok = true
break
}
}
if !ok {
t.Errorf("Federations should be %v, got %v",
expected.Federations, kd.config.Federations)
}
}
func TestConfigSync(t *testing.T) {
kd := newKubeDNS()
mockSync := config.NewMockSync(
&config.Config{Federations: make(map[string]string)}, nil)
kd.configSync = mockSync
kd.startConfigMapSync()
checkConfigEqual(t, kd, &config.Config{Federations: make(map[string]string)})
// update
mockSync.Chan <- &config.Config{Federations: map[string]string{"name1": "domain1"}}
checkConfigEqual(t, kd, &config.Config{Federations: map[string]string{"name1": "domain1"}})
// update
mockSync.Chan <- &config.Config{Federations: map[string]string{"name2": "domain2"}}
checkConfigEqual(t, kd, &config.Config{Federations: map[string]string{"name2": "domain2"}})
}
func TestConfigSyncInitialMap(t *testing.T) {
// start with different initial map
kd := newKubeDNS()
mockSync := config.NewMockSync(
&config.Config{Federations: map[string]string{"name3": "domain3"}}, nil)
kd.configSync = mockSync
kd.startConfigMapSync()
checkConfigEqual(t, kd, &config.Config{Federations: map[string]string{"name3": "domain3"}})
}
func newNodes() *v1.NodeList {
return &v1.NodeList{
Items: []v1.Node{
// Node without annotation.
{
ObjectMeta: v1.ObjectMeta{
Name: "testnode-0",
},
},
{
ObjectMeta: v1.ObjectMeta{
Name: "testnode-1",
Labels: map[string]string{
// Note: The zone name here is an arbitrary string and doesn't exactly follow the
// format used by the cloud providers to name their zones. But that shouldn't matter
// for these tests here.
metav1.LabelZoneFailureDomain: "testcontinent-testreg-testzone",
metav1.LabelZoneRegion: "testcontinent-testreg",
},
},
},
},
}
}
func newService(namespace, serviceName, clusterIP, portName string, portNumber int32) *v1.Service {
service := v1.Service{
ObjectMeta: v1.ObjectMeta{
Name: serviceName,
Namespace: namespace,
},
Spec: v1.ServiceSpec{
ClusterIP: clusterIP,
Ports: []v1.ServicePort{
{Port: portNumber, Name: portName, Protocol: "TCP"},
},
},
}
return &service
}
func newExternalNameService() *v1.Service {
service := v1.Service{
ObjectMeta: v1.ObjectMeta{
Name: testService,
Namespace: testNamespace,
},
Spec: v1.ServiceSpec{
ClusterIP: "None",
Type: v1.ServiceTypeExternalName,
ExternalName: testExternalName,
Ports: []v1.ServicePort{
{Port: 0},
},
},
}
return &service
}
func newHeadlessService() *v1.Service {
service := v1.Service{
ObjectMeta: v1.ObjectMeta{
Name: testService,
Namespace: testNamespace,
},
Spec: v1.ServiceSpec{
ClusterIP: "None",
Ports: []v1.ServicePort{
{Port: 0},
},
},
}
return &service
}
func newEndpoints(service *v1.Service, subsets ...v1.EndpointSubset) *v1.Endpoints {
endpoints := v1.Endpoints{
ObjectMeta: service.ObjectMeta,
Subsets: []v1.EndpointSubset{},
}
endpoints.Subsets = append(endpoints.Subsets, subsets...)
return &endpoints
}
func newSubsetWithOnePort(portName string, port int32, ips ...string) v1.EndpointSubset {
return newSubsetWithOnePortWithHostname(portName, port, false, ips...)
}
func newSubsetWithOnePortWithHostname(portName string, port int32, addHostname bool, ips ...string) v1.EndpointSubset {
subset := newSubset()
subset.Ports = append(subset.Ports, v1.EndpointPort{Port: port, Name: portName, Protocol: "TCP"})
for i, ip := range ips {
var hostname string
if addHostname {
hostname = fmt.Sprintf("ep-%d", i)
}
subset.Addresses = append(subset.Addresses, v1.EndpointAddress{IP: ip, Hostname: hostname})
}
return subset
}
func newSubsetWithTwoPorts(portName1 string, portNumber1 int32, portName2 string, portNumber2 int32, ips ...string) v1.EndpointSubset {
subset := newSubsetWithOnePort(portName1, portNumber1, ips...)
subset.Ports = append(subset.Ports, v1.EndpointPort{Port: portNumber2, Name: portName2, Protocol: "TCP"})
return subset
}
func newSubset() v1.EndpointSubset {
subset := v1.EndpointSubset{
Addresses: []v1.EndpointAddress{},
Ports: []v1.EndpointPort{},
}
return subset
}
func assertSRVForHeadlessService(t *testing.T, kd *KubeDNS, s *v1.Service, e *v1.Endpoints) {
for _, subset := range e.Subsets {
for _, port := range subset.Ports {
records, err := kd.Records(getSRVFQDN(kd, s, port.Name), false)
require.NoError(t, err)
assertRecordPortsMatchPort(t, port.Port, records)
assertCNameRecordsMatchEndpointIPs(t, kd, subset.Addresses, records)
}
}
}
func assertDNSForHeadlessService(t *testing.T, kd *KubeDNS, e *v1.Endpoints) {
records, err := kd.Records(getEndpointsFQDN(kd, e), false)
require.NoError(t, err)
endpoints := map[string]bool{}
for _, subset := range e.Subsets {
for _, endpointAddress := range subset.Addresses {
endpoints[endpointAddress.IP] = true
}
}
assert.Equal(t, len(endpoints), len(records))
for _, record := range records {
_, found := endpoints[record.Host]
assert.True(t, found)
}
}
func assertDNSForExternalService(t *testing.T, kd *KubeDNS, s *v1.Service) {
records, err := kd.Records(getServiceFQDN(kd.domain, s), false)
require.NoError(t, err)
assert.Equal(t, 1, len(records))
assert.Equal(t, testExternalName, records[0].Host)
}
func assertRecordPortsMatchPort(t *testing.T, port int32, records []skymsg.Service) {
for _, record := range records {
assert.Equal(t, port, int32(record.Port))
}
}
func assertCNameRecordsMatchEndpointIPs(t *testing.T, kd *KubeDNS, e []v1.EndpointAddress, records []skymsg.Service) {
endpoints := map[string]bool{}
for _, endpointAddress := range e {
endpoints[endpointAddress.IP] = true
}
assert.Equal(t, len(e), len(records), "unexpected record count")
for _, record := range records {
_, found := endpoints[getIPForCName(t, kd, record.Host)]
assert.True(t, found, "Did not find endpoint with address:%s", record.Host)
}
}
func getIPForCName(t *testing.T, kd *KubeDNS, cname string) string {
records, err := kd.Records(cname, false)
require.NoError(t, err)
assert.Equal(t, 1, len(records), "Could not get IP for CNAME record for %s", cname)
assert.NotNil(t, net.ParseIP(records[0].Host), "Invalid IP address %q", records[0].Host)
return records[0].Host
}
func assertNoDNSForHeadlessService(t *testing.T, kd *KubeDNS, s *v1.Service) {
records, err := kd.Records(getServiceFQDN(kd.domain, s), false)
require.Error(t, err)
assert.Equal(t, 0, len(records))
}
func assertNoDNSForExternalService(t *testing.T, kd *KubeDNS, s *v1.Service) {
records, err := kd.Records(getServiceFQDN(kd.domain, s), false)
require.Error(t, err)
assert.Equal(t, 0, len(records))
}
func assertSRVForNamedPort(t *testing.T, kd *KubeDNS, s *v1.Service, portName string) {
records, err := kd.Records(getSRVFQDN(kd, s, portName), false)
require.NoError(t, err)
assert.Equal(t, 1, len(records))
assert.Equal(t, getServiceFQDN(kd.domain, s), records[0].Host)
}
func assertNoSRVForNamedPort(t *testing.T, kd *KubeDNS, s *v1.Service, portName string) {
records, err := kd.Records(getSRVFQDN(kd, s, portName), false)
require.Error(t, err)
assert.Equal(t, 0, len(records))
}
func assertNoDNSForClusterIP(t *testing.T, kd *KubeDNS, s *v1.Service) {
serviceFQDN := getServiceFQDN(kd.domain, s)
queries := getEquivalentQueries(serviceFQDN, s.Namespace)
for _, query := range queries {
records, err := kd.Records(query, false)
require.Error(t, err)
assert.Equal(t, 0, len(records))
}
}
func assertDNSForClusterIP(t *testing.T, kd *KubeDNS, s *v1.Service) {
serviceFQDN := getServiceFQDN(kd.domain, s)
queries := getEquivalentQueries(serviceFQDN, s.Namespace)
for _, query := range queries {
records, err := kd.Records(query, false)
require.NoError(t, err)
assert.Equal(t, 1, len(records))
assert.Equal(t, s.Spec.ClusterIP, records[0].Host)
}
}
func assertReverseRecord(t *testing.T, kd *KubeDNS, s *v1.Service) {
segments := util.ReverseArray(strings.Split(s.Spec.ClusterIP, "."))
reverseLookup := fmt.Sprintf("%s%s", strings.Join(segments, "."), util.ArpaSuffix)
reverseRecord, err := kd.ReverseRecord(reverseLookup)
require.NoError(t, err)
assert.Equal(t, getServiceFQDN(kd.domain, s), reverseRecord.Host)
}
func assertNoReverseRecord(t *testing.T, kd *KubeDNS, s *v1.Service) {
segments := util.ReverseArray(strings.Split(s.Spec.ClusterIP, "."))
reverseLookup := fmt.Sprintf("%s%s", strings.Join(segments, "."), util.ArpaSuffix)
reverseRecord, err := kd.ReverseRecord(reverseLookup)
require.Error(t, err)
require.Nil(t, reverseRecord)
}
func getEquivalentQueries(serviceFQDN, namespace string) []string {
return []string{
serviceFQDN,
strings.Replace(serviceFQDN, ".svc.", ".*.", 1),
strings.Replace(serviceFQDN, namespace, "*", 1),
strings.Replace(strings.Replace(serviceFQDN, namespace, "*", 1), ".svc.", ".*.", 1),
"*." + serviceFQDN,
}
}
func getFederationServiceFQDN(kd *KubeDNS, s *v1.Service, federationName string) string {
return fmt.Sprintf("%s.%s.%s.svc.%s", s.Name, s.Namespace, federationName, kd.domain)
}
func getEndpointsFQDN(kd *KubeDNS, e *v1.Endpoints) string {
return fmt.Sprintf("%s.%s.svc.%s", e.Name, e.Namespace, kd.domain)
}
func getSRVFQDN(kd *KubeDNS, s *v1.Service, portName string) string {
return fmt.Sprintf("_%s._tcp.%s.%s.svc.%s", portName, s.Name, s.Namespace, kd.domain)
}

View File

@ -1,23 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package DNS provides a backend for the skydns DNS server started by the
// kubedns cluster addon. It exposes the 2 interface method: Records and
// ReverseRecord, which skydns invokes according to the DNS queries it
// receives. It serves these records by consulting an in memory tree
// populated with Kubernetes Services and Endpoints received from the Kubernetes
// API server.
package dns // import "k8s.io/kubernetes/pkg/dns"

View File

@ -1,37 +0,0 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_library(
name = "go_default_library",
srcs = ["federation.go"],
tags = ["automanaged"],
deps = ["//pkg/util/validation:go_default_library"],
)
go_test(
name = "go_default_test",
srcs = ["federation_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
deps = ["//vendor:github.com/stretchr/testify/assert"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@ -1,75 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Fed contains federation specific DNS code.
package fed
import (
"errors"
"fmt"
"strings"
"k8s.io/kubernetes/pkg/util/validation"
)
var ErrExpectedKeyEqualsValue = errors.New("invalid format, must be key=value")
// ParseFederationsFlag parses the federations command line flag. The
// flag is a comma-separated list of zero or more "name=label" pairs,
// e.g. "a=b,c=d".
func ParseFederationsFlag(str string, federations map[string]string) error {
if strings.TrimSpace(str) == "" {
return nil
}
for _, val := range strings.Split(str, ",") {
splits := strings.SplitN(strings.TrimSpace(val), "=", 2)
if len(splits) != 2 {
return ErrExpectedKeyEqualsValue
}
name := strings.TrimSpace(splits[0])
domain := strings.TrimSpace(splits[1])
if err := ValidateName(name); err != nil {
return err
}
if err := ValidateDomain(domain); err != nil {
return err
}
federations[name] = domain
}
return nil
}
// ValidateName checks the validity of a federation name.
func ValidateName(name string) error {
if errs := validation.IsDNS1123Label(name); len(errs) != 0 {
return fmt.Errorf("%q not a valid federation name: %q", name, errs)
}
return nil
}
// ValidateDomain checks the validity of a federation label.
func ValidateDomain(name string) error {
// The federation domain name need not strictly be domain names, we
// accept valid dns names with subdomain components.
if errs := validation.IsDNS1123Subdomain(name); len(errs) != 0 {
return fmt.Errorf("%q not a valid domain name: %q", name, errs)
}
return nil
}

View File

@ -1,76 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fed
import (
"github.com/stretchr/testify/assert"
"reflect"
"testing"
)
func TestParseFederationsFlag(t *testing.T) {
type TestCase struct {
input string
hasError bool
expected map[string]string
}
for _, testCase := range []TestCase{
{input: "", expected: make(map[string]string)},
{input: "a=b", expected: map[string]string{"a": "b"}},
{input: "a=b,cc=dd", expected: map[string]string{"a": "b", "cc": "dd"}},
{input: "abc=d.e.f", expected: map[string]string{"abc": "d.e.f"}},
{input: "ccdd", hasError: true},
{input: "a=b,ccdd", hasError: true},
{input: "-", hasError: true},
{input: "a.b.c=d.e.f", hasError: true},
} {
output := make(map[string]string)
err := ParseFederationsFlag(testCase.input, output)
if !testCase.hasError {
assert.Nil(t, err, "unexpected err", testCase)
assert.True(t, reflect.DeepEqual(
testCase.expected, output), output, testCase)
} else {
assert.NotNil(t, err, testCase)
}
}
}
func TestValidateName(t *testing.T) {
// More complete testing is done in validation.IsDNS1123Label. These
// tests are to catch issues specific to the implementation of
// kube-dns.
assert.NotNil(t, ValidateName(""))
assert.NotNil(t, ValidateName("."))
assert.NotNil(t, ValidateName("ab.cd"))
assert.Nil(t, ValidateName("abcd"))
}
func TestValidateDomain(t *testing.T) {
// More complete testing is done in
// validation.IsDNS1123Subdomain. These tests are to catch issues
// specific to the implementation of kube-dns.
assert.NotNil(t, ValidateDomain(""))
assert.NotNil(t, ValidateDomain("."))
assert.Nil(t, ValidateDomain("ab.cd"))
assert.Nil(t, ValidateDomain("abcd"))
assert.Nil(t, ValidateDomain("a.b.c.d"))
}

View File

@ -1,37 +0,0 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_library(
name = "go_default_library",
srcs = ["treecache.go"],
tags = ["automanaged"],
deps = ["//vendor:github.com/skynetservices/skydns/msg"],
)
go_test(
name = "go_default_test",
srcs = ["treecache_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
deps = ["//vendor:github.com/skynetservices/skydns/msg"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@ -1,213 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package treecache
import (
"encoding/json"
"strings"
skymsg "github.com/skynetservices/skydns/msg"
)
type TreeCache interface {
// GetEntry with the given key for the given path.
GetEntry(key string, path ...string) (interface{}, bool)
// Get a list of values including wildcards labels (e.g. "*").
GetValuesForPathWithWildcards(path ...string) []*skymsg.Service
// SetEntry creates the entire path if it doesn't already exist in
// the cache, then sets the given service record under the given
// key. The path this entry would have occupied in an etcd datastore
// is computed from the given fqdn and stored as the "Key" of the
// skydns service; this is only required because skydns expects the
// service record to contain a key in a specific format (presumably
// for legacy compatibility). Note that the fqnd string typically
// contains both the key and all elements in the path.
SetEntry(key string, val *skymsg.Service, fqdn string, path ...string)
// SetSubCache inserts the given subtree under the given
// path:key. Usually the key is the name of a Kubernetes Service,
// and the path maps to the cluster subdomains matching the Service.
SetSubCache(key string, subCache TreeCache, path ...string)
// DeletePath removes all entries associated with a given path.
DeletePath(path ...string) bool
// Serialize dumps a JSON representation of the cache.
Serialize() (string, error)
}
type treeCache struct {
ChildNodes map[string]*treeCache
Entries map[string]interface{}
}
func NewTreeCache() TreeCache {
return &treeCache{
ChildNodes: make(map[string]*treeCache),
Entries: make(map[string]interface{}),
}
}
func (cache *treeCache) Serialize() (string, error) {
prettyJSON, err := json.MarshalIndent(cache, "", "\t")
if err != nil {
return "", err
}
return string(prettyJSON), nil
}
func (cache *treeCache) SetEntry(key string, val *skymsg.Service, fqdn string, path ...string) {
// TODO: Consolidate setEntry and setSubCache into a single method with a
// type switch.
// TODO: Instead of passing the fqdn as an argument, we can reconstruct
// it from the path, provided callers always pass the full path to the
// object. This is currently *not* the case, since callers first create
// a new, empty node, populate it, then parent it under the right path.
// So we don't know the full key till the final parenting operation.
node := cache.ensureChildNode(path...)
// This key is used to construct the "target" for SRV record lookups.
// For normal service/endpoint lookups, this will result in a key like:
// /skydns/local/cluster/svc/svcNS/svcName/record-hash
// but for headless services that govern pods requesting a specific
// hostname (as used by petset), this will end up being:
// /skydns/local/cluster/svc/svcNS/svcName/pod-hostname
val.Key = skymsg.Path(fqdn)
node.Entries[key] = val
}
func (cache *treeCache) getSubCache(path ...string) *treeCache {
childCache := cache
for _, subpath := range path {
childCache = childCache.ChildNodes[subpath]
if childCache == nil {
return nil
}
}
return childCache
}
func (cache *treeCache) SetSubCache(key string, subCache TreeCache, path ...string) {
node := cache.ensureChildNode(path...)
node.ChildNodes[key] = subCache.(*treeCache)
}
func (cache *treeCache) GetEntry(key string, path ...string) (interface{}, bool) {
childNode := cache.getSubCache(path...)
if childNode == nil {
return nil, false
}
val, ok := childNode.Entries[key]
return val, ok
}
func (cache *treeCache) GetValuesForPathWithWildcards(path ...string) []*skymsg.Service {
retval := []*skymsg.Service{}
nodesToExplore := []*treeCache{cache}
for idx, subpath := range path {
nextNodesToExplore := []*treeCache{}
if idx == len(path)-1 {
// if path ends on an entry, instead of a child node, add the entry
for _, node := range nodesToExplore {
if subpath == "*" {
nextNodesToExplore = append(nextNodesToExplore, node)
} else {
if val, ok := node.Entries[subpath]; ok {
retval = append(retval, val.(*skymsg.Service))
} else {
childNode := node.ChildNodes[subpath]
if childNode != nil {
nextNodesToExplore = append(nextNodesToExplore, childNode)
}
}
}
}
nodesToExplore = nextNodesToExplore
break
}
if subpath == "*" {
for _, node := range nodesToExplore {
for subkey, subnode := range node.ChildNodes {
if !strings.HasPrefix(subkey, "_") {
nextNodesToExplore = append(nextNodesToExplore, subnode)
}
}
}
} else {
for _, node := range nodesToExplore {
childNode := node.ChildNodes[subpath]
if childNode != nil {
nextNodesToExplore = append(nextNodesToExplore, childNode)
}
}
}
nodesToExplore = nextNodesToExplore
}
for _, node := range nodesToExplore {
for _, val := range node.Entries {
retval = append(retval, val.(*skymsg.Service))
}
}
return retval
}
func (cache *treeCache) DeletePath(path ...string) bool {
if len(path) == 0 {
return false
}
if parentNode := cache.getSubCache(path[:len(path)-1]...); parentNode != nil {
name := path[len(path)-1]
if _, ok := parentNode.ChildNodes[name]; ok {
delete(parentNode.ChildNodes, name)
return true
}
// ExternalName services are stored with their name as the leaf key
if _, ok := parentNode.Entries[name]; ok {
delete(parentNode.Entries, name)
return true
}
}
return false
}
func (cache *treeCache) appendValues(recursive bool, ref [][]interface{}) {
for _, value := range cache.Entries {
ref[0] = append(ref[0], value)
}
if recursive {
for _, node := range cache.ChildNodes {
node.appendValues(recursive, ref)
}
}
}
func (cache *treeCache) ensureChildNode(path ...string) *treeCache {
childNode := cache
for _, subpath := range path {
newNode, ok := childNode.ChildNodes[subpath]
if !ok {
newNode = NewTreeCache().(*treeCache)
childNode.ChildNodes[subpath] = newNode
}
childNode = newNode
}
return childNode
}

View File

@ -1,161 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package treecache
import (
"testing"
"github.com/skynetservices/skydns/msg"
)
func TestTreeCache(t *testing.T) {
tc := NewTreeCache()
{
_, ok := tc.GetEntry("key1", "p1", "p2")
if ok {
t.Errorf("key should not exist")
}
}
checkExists := func(key string, expectedSvc *msg.Service, path ...string) {
svc, ok := tc.GetEntry(key, path...)
if !ok {
t.Fatalf("key %v should exist", key)
}
if svc := svc.(*msg.Service); svc != nil {
if svc != expectedSvc {
t.Errorf("value is not correct (%v != %v)", svc, expectedSvc)
}
} else {
t.Errorf("entry is not of the right type: %T", svc)
}
}
setEntryTC := []struct {
key string
svc *msg.Service
fqdn string
path []string
}{
{"key1", &msg.Service{}, "key1.p2.p1.", []string{"p1", "p2"}},
{"key2", &msg.Service{}, "key2.p2.p1.", []string{"p1", "p2"}},
{"key3", &msg.Service{}, "key3.p2.p1.", []string{"p1", "p3"}},
}
for _, testCase := range setEntryTC {
tc.SetEntry(testCase.key, testCase.svc, testCase.fqdn, testCase.path...)
checkExists(testCase.key, testCase.svc, testCase.path...)
}
wildcardTC := []struct {
path []string
count int
}{
{[]string{"p1"}, 0},
{[]string{"p1", "p2"}, 2},
{[]string{"p1", "p3"}, 1},
{[]string{"p1", "p2", "key1"}, 1},
{[]string{"p1", "p2", "key2"}, 1},
{[]string{"p1", "p2", "key3"}, 0},
{[]string{"p1", "p3", "key3"}, 1},
{[]string{"p1", "p2", "*"}, 2},
{[]string{"p1", "*", "*"}, 3},
}
for _, testCase := range wildcardTC {
services := tc.GetValuesForPathWithWildcards(testCase.path...)
if len(services) != testCase.count {
t.Fatalf("Expected %v services for path %v, got %v",
testCase.count, testCase.path, len(services))
}
}
// Delete some paths
if !tc.DeletePath("p1", "p2") {
t.Fatal("should delete path p2.p1.")
}
if _, ok := tc.GetEntry("key3", "p1", "p3"); !ok {
t.Error("should not affect p3.p1.")
}
if tc.DeletePath("p1", "p2") {
t.Fatalf("should not be able to delete p2.p1")
}
if !tc.DeletePath("p1", "p3") {
t.Fatalf("should be able to delete p3.p1")
}
if tc.DeletePath("p1", "p3") {
t.Fatalf("should not be able to delete p3.t1")
}
for _, testCase := range []struct {
k string
p []string
}{
{"key1", []string{"p1", "p2"}},
{"key2", []string{"p1", "p2"}},
{"key3", []string{"p1", "p3"}},
} {
if _, ok := tc.GetEntry(testCase.k, testCase.p...); ok {
t.Error("path should not exist")
}
}
}
func TestTreeCacheSetSubCache(t *testing.T) {
tc := NewTreeCache()
m := &msg.Service{}
branch := NewTreeCache()
branch.SetEntry("key1", m, "key", "p2")
tc.SetSubCache("p1", branch, "p0")
if _, ok := tc.GetEntry("key1", "p0", "p1", "p2"); !ok {
t.Errorf("should be able to get entry p0.p1.p2.key1")
}
}
func TestTreeCacheSerialize(t *testing.T) {
tc := NewTreeCache()
tc.SetEntry("key1", &msg.Service{}, "key1.p2.p1.", "p1", "p2")
const expected = `{
"ChildNodes": {
"p1": {
"ChildNodes": {
"p2": {
"ChildNodes": {},
"Entries": {
"key1": {}
}
}
},
"Entries": {}
}
},
"Entries": {}
}`
actual, err := tc.Serialize()
if err != nil {
}
if actual != expected {
t.Errorf("expected %q, got %q", expected, actual)
}
}

View File

@ -1,31 +0,0 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)
go_library(
name = "go_default_library",
srcs = ["util.go"],
tags = ["automanaged"],
deps = [
"//vendor:github.com/golang/glog",
"//vendor:github.com/skynetservices/skydns/msg",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@ -1,89 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"fmt"
"hash/fnv"
"strings"
"github.com/golang/glog"
"github.com/skynetservices/skydns/msg"
)
const (
// ArpaSuffix is the standard suffix for PTR IP reverse lookups.
ArpaSuffix = ".in-addr.arpa."
// defaultPriority used for service records
defaultPriority = 10
// defaultWeight used for service records
defaultWeight = 10
// defaultTTL used for service records
defaultTTL = 30
)
// extractIP turns a standard PTR reverse record lookup name
// into an IP address
func ExtractIP(reverseName string) (string, bool) {
if !strings.HasSuffix(reverseName, ArpaSuffix) {
return "", false
}
search := strings.TrimSuffix(reverseName, ArpaSuffix)
// reverse the segments and then combine them
segments := ReverseArray(strings.Split(search, "."))
return strings.Join(segments, "."), true
}
// ReverseArray reverses an array.
func ReverseArray(arr []string) []string {
for i := 0; i < len(arr)/2; i++ {
j := len(arr) - i - 1
arr[i], arr[j] = arr[j], arr[i]
}
return arr
}
// Returns record in a format that SkyDNS understands.
// Also return the hash of the record.
func GetSkyMsg(ip string, port int) (*msg.Service, string) {
msg := NewServiceRecord(ip, port)
hash := HashServiceRecord(msg)
glog.V(5).Infof("Constructed new DNS record: %s, hash:%s",
fmt.Sprintf("%v", msg), hash)
return msg, fmt.Sprintf("%x", hash)
}
// NewServiceRecord creates a new service DNS message.
func NewServiceRecord(ip string, port int) *msg.Service {
return &msg.Service{
Host: ip,
Port: port,
Priority: defaultPriority,
Weight: defaultWeight,
Ttl: defaultTTL,
}
}
// HashServiceRecord hashes the string representation of a DNS
// message.
func HashServiceRecord(msg *msg.Service) string {
s := fmt.Sprintf("%v", msg)
h := fnv.New32a()
h.Write([]byte(s))
return fmt.Sprintf("%x", h.Sum32())
}