Merge pull request #36775 from bowei/kube-dns-config-map

Automatic merge from submit-queue

Add limited config-map support to kube-dns

This is an integration bugfix for https://github.com/kubernetes/kubernetes/issues/36194

```release-note
kube-dns

Added --config-map and --config-map-namespace command line options. 
If --config-map is set, kube-dns will load dynamic configuration from the config map 
referenced by --config-map-namespace, --config-map. The config-map supports
the following properties: "federations".

--federations flag is now deprecated. Prefer to set federations via the config-map.
Federations can be configured by settings the "federations" field to the value currently 
set in the command line.

Example:

  kind: ConfigMap
  apiVersion: v1
  metadata:
    name: kube-dns
    namespace: kube-system
  data:
    federations: abc=def
```
This commit is contained in:
Kubernetes Submit Queue 2016-11-18 23:03:54 -08:00 committed by GitHub
commit d725b3e3cd
26 changed files with 1269 additions and 158 deletions

View File

@ -22,7 +22,7 @@
# Default registry, arch and tag. This can be overwritten by arguments to make # Default registry, arch and tag. This can be overwritten by arguments to make
PLATFORM?=linux PLATFORM?=linux
ARCH?=amd64 ARCH?=amd64
TAG?=1.8 TAG?=1.9
REGISTRY?=gcr.io/google_containers REGISTRY?=gcr.io/google_containers
GOLANG_VERSION=1.6 GOLANG_VERSION=1.6

View File

@ -44,7 +44,7 @@ spec:
spec: spec:
containers: containers:
- name: kubedns - name: kubedns
image: gcr.io/google_containers/kubedns-amd64:1.8 image: gcr.io/google_containers/kubedns-amd64:1.9
resources: resources:
# TODO: Set memory limits when we've profiled the container for large # TODO: Set memory limits when we've profiled the container for large
# clusters, then set request = limit to keep this container in # clusters, then set request = limit to keep this container in
@ -76,6 +76,7 @@ spec:
args: args:
- --domain=__PILLAR__DNS__DOMAIN__. - --domain=__PILLAR__DNS__DOMAIN__.
- --dns-port=10053 - --dns-port=10053
- --config-map=kube-dns
# This should be set to v=2 only after the new image (cut from 1.5) has # This should be set to v=2 only after the new image (cut from 1.5) has
# been released, otherwise we will flood the logs. # been released, otherwise we will flood the logs.
- --v=0 - --v=0

View File

@ -44,7 +44,7 @@ spec:
spec: spec:
containers: containers:
- name: kubedns - name: kubedns
image: gcr.io/google_containers/kubedns-amd64:1.8 image: gcr.io/google_containers/kubedns-amd64:1.9
resources: resources:
# TODO: Set memory limits when we've profiled the container for large # TODO: Set memory limits when we've profiled the container for large
# clusters, then set request = limit to keep this container in # clusters, then set request = limit to keep this container in
@ -76,6 +76,7 @@ spec:
args: args:
- --domain={{ pillar['dns_domain'] }}. - --domain={{ pillar['dns_domain'] }}.
- --dns-port=10053 - --dns-port=10053
- --config-map=kube-dns
# This should be set to v=2 only after the new image (cut from 1.5) has # This should be set to v=2 only after the new image (cut from 1.5) has
# been released, otherwise we will flood the logs. # been released, otherwise we will flood the logs.
- --v=0 - --v=0

View File

@ -44,7 +44,7 @@ spec:
spec: spec:
containers: containers:
- name: kubedns - name: kubedns
image: gcr.io/google_containers/kubedns-amd64:1.8 image: gcr.io/google_containers/kubedns-amd64:1.9
resources: resources:
# TODO: Set memory limits when we've profiled the container for large # TODO: Set memory limits when we've profiled the container for large
# clusters, then set request = limit to keep this container in # clusters, then set request = limit to keep this container in
@ -76,6 +76,7 @@ spec:
args: args:
- --domain=$DNS_DOMAIN. - --domain=$DNS_DOMAIN.
- --dns-port=10053 - --dns-port=10053
- --config-map=kube-dns
# This should be set to v=2 only after the new image (cut from 1.5) has # This should be set to v=2 only after the new image (cut from 1.5) has
# been released, otherwise we will flood the logs. # been released, otherwise we will flood the logs.
- --v=0 - --v=0

View File

@ -24,7 +24,7 @@ spec:
spec: spec:
containers: containers:
- name: kubedns - name: kubedns
image: gcr.io/google_containers/kubedns-amd64:1.8 image: gcr.io/google_containers/kubedns-amd64:1.9
resources: resources:
# TODO: Set memory limits when we've profiled the container for large # TODO: Set memory limits when we've profiled the container for large
# clusters, then set request = limit to keep this container in # clusters, then set request = limit to keep this container in

View File

@ -21,6 +21,7 @@ go_library(
"//pkg/client/restclient:go_default_library", "//pkg/client/restclient:go_default_library",
"//pkg/client/unversioned/clientcmd:go_default_library", "//pkg/client/unversioned/clientcmd:go_default_library",
"//pkg/dns:go_default_library", "//pkg/dns:go_default_library",
"//pkg/dns/config:go_default_library",
"//vendor:github.com/golang/glog", "//vendor:github.com/golang/glog",
"//vendor:github.com/skynetservices/skydns/metrics", "//vendor:github.com/skynetservices/skydns/metrics",
"//vendor:github.com/skynetservices/skydns/server", "//vendor:github.com/skynetservices/skydns/server",

View File

@ -15,6 +15,8 @@ go_library(
srcs = ["options.go"], srcs = ["options.go"],
tags = ["automanaged"], tags = ["automanaged"],
deps = [ deps = [
"//pkg/api:go_default_library",
"//pkg/dns/federation:go_default_library",
"//pkg/util/validation:go_default_library", "//pkg/util/validation:go_default_library",
"//vendor:github.com/spf13/pflag", "//vendor:github.com/spf13/pflag",
], ],

View File

@ -18,14 +18,15 @@ limitations under the License.
package options package options
import ( import (
"net/url"
"os"
"fmt" "fmt"
_ "net/http/pprof" _ "net/http/pprof"
"net/url"
"os"
"strings" "strings"
"github.com/spf13/pflag" "github.com/spf13/pflag"
"k8s.io/kubernetes/pkg/api"
fed "k8s.io/kubernetes/pkg/dns/federation"
"k8s.io/kubernetes/pkg/util/validation" "k8s.io/kubernetes/pkg/util/validation"
) )
@ -33,22 +34,28 @@ type KubeDNSConfig struct {
ClusterDomain string ClusterDomain string
KubeConfigFile string KubeConfigFile string
KubeMasterURL string KubeMasterURL string
HealthzPort int HealthzPort int
DNSBindAddress string DNSBindAddress string
DNSPort int DNSPort int
// Federations maps federation names to their registered domain names.
Federations map[string]string Federations map[string]string
ConfigMapNs string
ConfigMap string
} }
func NewKubeDNSConfig() *KubeDNSConfig { func NewKubeDNSConfig() *KubeDNSConfig {
return &KubeDNSConfig{ return &KubeDNSConfig{
ClusterDomain: "cluster.local.", ClusterDomain: "cluster.local.",
KubeConfigFile: "",
KubeMasterURL: "",
HealthzPort: 8081, HealthzPort: 8081,
DNSBindAddress: "0.0.0.0", DNSBindAddress: "0.0.0.0",
DNSPort: 53, DNSPort: 53,
Federations: make(map[string]string),
Federations: make(map[string]string),
ConfigMapNs: api.NamespaceSystem,
ConfigMap: "", // default to using command line flags
} }
} }
@ -107,25 +114,8 @@ type federationsVar struct {
nameDomainMap map[string]string nameDomainMap map[string]string
} }
// Set deserializes the input string in the format
// "myfederation1=example.com,myfederation2=second.example.com,myfederation3=example.com"
// into a map of key-value pairs of federation names to domain names.
func (fv federationsVar) Set(keyVal string) error { func (fv federationsVar) Set(keyVal string) error {
for _, val := range strings.Split(keyVal, ",") { return fed.ParseFederationsFlag(keyVal, fv.nameDomainMap)
splits := strings.SplitN(strings.TrimSpace(val), "=", 2)
name := strings.TrimSpace(splits[0])
domain := strings.TrimSpace(splits[1])
if errs := validation.IsDNS1123Label(name); len(errs) != 0 {
return fmt.Errorf("%q not a valid federation name: %q", name, errs)
}
// The federation domain name need not strictly be domain names, we
// accept valid dns names with subdomain components.
if errs := validation.IsDNS1123Subdomain(domain); len(errs) != 0 {
return fmt.Errorf("%q not a valid domain name: %q", domain, errs)
}
fv.nameDomainMap[name] = domain
}
return nil
} }
func (fv federationsVar) String() string { func (fv federationsVar) String() string {
@ -141,11 +131,33 @@ func (fv federationsVar) Type() string {
} }
func (s *KubeDNSConfig) AddFlags(fs *pflag.FlagSet) { func (s *KubeDNSConfig) AddFlags(fs *pflag.FlagSet) {
fs.Var(clusterDomainVar{&s.ClusterDomain}, "domain", "domain under which to create names") fs.Var(clusterDomainVar{&s.ClusterDomain}, "domain",
fs.StringVar(&s.KubeConfigFile, "kubecfg-file", s.KubeConfigFile, "Location of kubecfg file for access to kubernetes master service; --kube-master-url overrides the URL part of this; if neither this nor --kube-master-url are provided, defaults to service account tokens") "domain under which to create names")
fs.Var(kubeMasterURLVar{&s.KubeMasterURL}, "kube-master-url", "URL to reach kubernetes master. Env variables in this flag will be expanded.")
fs.IntVar(&s.HealthzPort, "healthz-port", s.HealthzPort, "port on which to serve a kube-dns HTTP readiness probe.") fs.StringVar(&s.KubeConfigFile, "kubecfg-file", s.KubeConfigFile,
fs.StringVar(&s.DNSBindAddress, "dns-bind-address", s.DNSBindAddress, "address on which to serve DNS requests.") "Location of kubecfg file for access to kubernetes master service;"+
" --kube-master-url overrides the URL part of this; if neither this nor"+
" --kube-master-url are provided, defaults to service account tokens")
fs.Var(kubeMasterURLVar{&s.KubeMasterURL}, "kube-master-url",
"URL to reach kubernetes master. Env variables in this flag will be expanded.")
fs.IntVar(&s.HealthzPort, "healthz-port", s.HealthzPort,
"port on which to serve a kube-dns HTTP readiness probe.")
fs.StringVar(&s.DNSBindAddress, "dns-bind-address", s.DNSBindAddress,
"address on which to serve DNS requests.")
fs.IntVar(&s.DNSPort, "dns-port", s.DNSPort, "port on which to serve DNS requests.") fs.IntVar(&s.DNSPort, "dns-port", s.DNSPort, "port on which to serve DNS requests.")
fs.Var(federationsVar{s.Federations}, "federations", "a comma separated list of the federation names and their corresponding domain names to which this cluster belongs. Example: \"myfederation1=example.com,myfederation2=example2.com,myfederation3=example.com\"")
fs.Var(federationsVar{s.Federations}, "federations",
"a comma separated list of the federation names and their corresponding"+
" domain names to which this cluster belongs. Example:"+
" \"myfederation1=example.com,myfederation2=example2.com,myfederation3=example.com\"."+
" It is an error to set both the federations and config-map flags.")
fs.MarkDeprecated("federations", "use config-map instead. Will be removed in future version")
fs.StringVar(&s.ConfigMapNs, "config-map-namespace", s.ConfigMapNs,
"namespace for the config-map")
fs.StringVar(&s.ConfigMap, "config-map", s.ConfigMap,
"config-map name. If empty, then the config-map will not used. Cannot be "+
" used in conjunction with federations flag. config-map contains "+
"dynamically adjustable configuration.")
} }

View File

@ -34,6 +34,7 @@ import (
"k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/client/restclient"
kclientcmd "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" kclientcmd "k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
kdns "k8s.io/kubernetes/pkg/dns" kdns "k8s.io/kubernetes/pkg/dns"
dnsConfig "k8s.io/kubernetes/pkg/dns/config"
) )
type KubeDNSServer struct { type KubeDNSServer struct {
@ -52,13 +53,25 @@ func NewKubeDNSServerDefault(config *options.KubeDNSConfig) *KubeDNSServer {
if err != nil { if err != nil {
glog.Fatalf("Failed to create a kubernetes client: %v", err) glog.Fatalf("Failed to create a kubernetes client: %v", err)
} }
ks.healthzPort = config.HealthzPort ks.healthzPort = config.HealthzPort
ks.dnsBindAddress = config.DNSBindAddress ks.dnsBindAddress = config.DNSBindAddress
ks.dnsPort = config.DNSPort ks.dnsPort = config.DNSPort
ks.kd, err = kdns.NewKubeDNS(kubeClient, config.ClusterDomain, config.Federations)
if err != nil { var configSync dnsConfig.Sync
glog.Fatalf("Failed to start kubeDNS: %v", err) if config.ConfigMap == "" {
glog.V(0).Infof("ConfigMap not configured, using values from command line flags")
configSync = dnsConfig.NewNopSync(
&dnsConfig.Config{Federations: config.Federations})
} else {
glog.V(0).Infof("Using configuration read from ConfigMap: %v:%v",
config.ConfigMapNs, config.ConfigMap)
configSync = dnsConfig.NewSync(
kubeClient, config.ConfigMapNs, config.ConfigMap)
} }
ks.kd = kdns.NewKubeDNS(kubeClient, config.ClusterDomain, configSync)
return &ks return &ks
} }

View File

@ -91,6 +91,8 @@ concurrent-replicaset-syncs
concurrent-resource-quota-syncs concurrent-resource-quota-syncs
concurrent-serviceaccount-token-syncs concurrent-serviceaccount-token-syncs
concurrent-service-syncs concurrent-service-syncs
config-map
config-map-namespace
config-sync-period config-sync-period
configure-cloud-routes configure-cloud-routes
conntrack-max conntrack-max

View File

@ -23,6 +23,7 @@ go_library(
"//pkg/api/unversioned:go_default_library", "//pkg/api/unversioned:go_default_library",
"//pkg/client/cache:go_default_library", "//pkg/client/cache:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library", "//pkg/client/clientset_generated/internalclientset:go_default_library",
"//pkg/dns/config:go_default_library",
"//pkg/dns/treecache:go_default_library", "//pkg/dns/treecache:go_default_library",
"//pkg/dns/util:go_default_library", "//pkg/dns/util:go_default_library",
"//pkg/runtime:go_default_library", "//pkg/runtime:go_default_library",
@ -47,6 +48,7 @@ go_test(
"//pkg/api/unversioned:go_default_library", "//pkg/api/unversioned:go_default_library",
"//pkg/client/cache:go_default_library", "//pkg/client/cache:go_default_library",
"//pkg/client/clientset_generated/internalclientset/fake:go_default_library", "//pkg/client/clientset_generated/internalclientset/fake:go_default_library",
"//pkg/dns/config:go_default_library",
"//pkg/dns/treecache:go_default_library", "//pkg/dns/treecache:go_default_library",
"//pkg/dns/util:go_default_library", "//pkg/dns/util:go_default_library",
"//pkg/util/sets:go_default_library", "//pkg/util/sets:go_default_library",

42
pkg/dns/config/BUILD Normal file
View File

@ -0,0 +1,42 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_binary",
"go_library",
"go_test",
"cgo_library",
)
go_library(
name = "go_default_library",
srcs = [
"config.go",
"mocksync.go",
"nopsync.go",
"sync.go",
],
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/api/unversioned:go_default_library",
"//pkg/client/cache:go_default_library",
"//pkg/client/clientset_generated/internalclientset:go_default_library",
"//pkg/dns/federation:go_default_library",
"//pkg/fields:go_default_library",
"//pkg/runtime:go_default_library",
"//pkg/watch:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:k8s.io/client-go/pkg/util/wait",
],
)
go_test(
name = "go_default_test",
srcs = ["config_test.go"],
library = "go_default_library",
tags = ["automanaged"],
deps = ["//vendor:github.com/stretchr/testify/assert"],
)

65
pkg/dns/config/config.go Normal file
View File

@ -0,0 +1,65 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
types "k8s.io/kubernetes/pkg/api/unversioned"
fed "k8s.io/kubernetes/pkg/dns/federation"
)
// Config populated either from the configuration source (command
// line flags or via the config map mechanism).
type Config struct {
// The inclusion of TypeMeta is to ensure future compatibility if the
// Config object was populated directly via a Kubernetes API mechanism.
//
// For example, instead of the custom implementation here, the
// configuration could be obtained from an API that unifies
// command-line flags, config-map, etc mechanisms.
types.TypeMeta
// Map of federation names that the cluster in which this kube-dns
// is running belongs to, to the corresponding domain names.
Federations map[string]string `json:"federations"`
}
func NewDefaultConfig() *Config {
return &Config{
Federations: make(map[string]string),
}
}
// IsValid returns whether or not the configuration is valid.
func (config *Config) Validate() error {
if err := config.validateFederations(); err != nil {
return err
}
return nil
}
func (config *Config) validateFederations() error {
for name, domain := range config.Federations {
if err := fed.ValidateName(name); err != nil {
return err
}
if err := fed.ValidateDomain(domain); err != nil {
return err
}
}
return nil
}

View File

@ -0,0 +1,55 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
"github.com/stretchr/testify/assert"
"testing"
)
func TestValidate(t *testing.T) {
for _, testCase := range []struct {
config *Config
hasError bool
}{
{
config: &Config{Federations: map[string]string{}},
},
{
config: &Config{
Federations: map[string]string{
"abc": "d.e.f",
},
},
},
{
config: &Config{
Federations: map[string]string{
"a.b": "cdef",
},
},
hasError: true,
},
} {
err := testCase.config.Validate()
if !testCase.hasError {
assert.Nil(t, err, "should be valid", testCase)
} else {
assert.NotNil(t, err, "should not be valid", testCase)
}
}
}

View File

@ -0,0 +1,46 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
// MockSync is a testing mock.
type MockSync struct {
// Config that will be returned from Once().
Config *Config
// Error that will be returned from Once().
Error error
// Chan to send new configurations on.
Chan chan *Config
}
var _ Sync = (*MockSync)(nil)
func NewMockSync(config *Config, error error) *MockSync {
return &MockSync{
Config: config,
Error: error,
Chan: make(chan *Config),
}
}
func (sync *MockSync) Once() (*Config, error) {
return sync.Config, sync.Error
}
func (sync *MockSync) Periodic() <-chan *Config {
return sync.Chan
}

37
pkg/dns/config/nopsync.go Normal file
View File

@ -0,0 +1,37 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
// nopSync does no synchronization, used when the DNS server is
// started without a ConfigMap configured.
type nopSync struct {
config *Config
}
var _ Sync = (*nopSync)(nil)
func NewNopSync(config *Config) Sync {
return &nopSync{config: config}
}
func (sync *nopSync) Once() (*Config, error) {
return sync.config, nil
}
func (sync *nopSync) Periodic() <-chan *Config {
return make(chan *Config)
}

201
pkg/dns/config/sync.go Normal file
View File

@ -0,0 +1,201 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
"k8s.io/client-go/pkg/util/wait"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
fed "k8s.io/kubernetes/pkg/dns/federation"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch"
"time"
"github.com/golang/glog"
)
// Sync manages synchronization of the config map.
type Sync interface {
// Once does a blocking synchronization of the config map. If the
// ConfigMap fails to validate, this method will return nil, err.
Once() (*Config, error)
// Start a periodic synchronization of the configuration map. When a
// successful configuration map update is detected, the
// configuration will be sent to the channel.
//
// It is an error to call this more than once.
Periodic() <-chan *Config
}
// NewSync for ConfigMap from namespace `ns` and `name`.
func NewSync(client clientset.Interface, ns string, name string) Sync {
sync := &kubeSync{
ns: ns,
name: name,
client: client,
channel: make(chan *Config),
}
listWatch := &cache.ListWatch{
ListFunc: func(options api.ListOptions) (runtime.Object, error) {
options.FieldSelector = fields.Set{"metadata.name": name}.AsSelector()
return client.Core().ConfigMaps(ns).List(options)
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
options.FieldSelector = fields.Set{"metadata.name": name}.AsSelector()
return client.Core().ConfigMaps(ns).Watch(options)
},
}
store, controller := cache.NewInformer(
listWatch,
&api.ConfigMap{},
time.Duration(0),
cache.ResourceEventHandlerFuncs{
AddFunc: sync.onAdd,
DeleteFunc: sync.onDelete,
UpdateFunc: sync.onUpdate,
})
sync.store = store
sync.controller = controller
return sync
}
// kubeSync implements Sync for the Kubernetes API.
type kubeSync struct {
ns string
name string
client clientset.Interface
store cache.Store
controller *cache.Controller
channel chan *Config
latestVersion string
}
var _ Sync = (*kubeSync)(nil)
func (sync *kubeSync) Once() (*Config, error) {
cm, err := sync.client.Core().ConfigMaps(sync.ns).Get(sync.name)
if err != nil {
glog.Errorf("Error getting ConfigMap %v:%v err: %v",
sync.ns, sync.name, err)
return nil, err
}
config, _, err := sync.processUpdate(cm)
return config, err
}
func (sync *kubeSync) Periodic() <-chan *Config {
go sync.controller.Run(wait.NeverStop)
return sync.channel
}
func (sync *kubeSync) toConfigMap(obj interface{}) *api.ConfigMap {
cm, ok := obj.(*api.ConfigMap)
if !ok {
glog.Fatalf("Expected ConfigMap, got %T", obj)
}
return cm
}
func (sync *kubeSync) onAdd(obj interface{}) {
cm := sync.toConfigMap(obj)
glog.V(2).Infof("ConfigMap %s:%s was created", sync.ns, sync.name)
config, updated, err := sync.processUpdate(cm)
if updated && err == nil {
sync.channel <- config
}
}
func (sync *kubeSync) onDelete(_ interface{}) {
glog.V(2).Infof("ConfigMap %s:%s was deleted, reverting to default configuration",
sync.ns, sync.name)
sync.latestVersion = ""
sync.channel <- NewDefaultConfig()
}
func (sync *kubeSync) onUpdate(_, obj interface{}) {
cm := sync.toConfigMap(obj)
glog.V(2).Infof("ConfigMap %s:%s was updated", sync.ns, sync.name)
config, changed, err := sync.processUpdate(cm)
if changed && err == nil {
sync.channel <- config
}
}
func (sync *kubeSync) processUpdate(cm *api.ConfigMap) (config *Config, changed bool, err error) {
glog.V(4).Infof("processUpdate ConfigMap %+v", *cm)
if cm.ObjectMeta.ResourceVersion != sync.latestVersion {
glog.V(3).Infof("Updating config to version %v (was %v)",
cm.ObjectMeta.ResourceVersion, sync.latestVersion)
changed = true
sync.latestVersion = cm.ObjectMeta.ResourceVersion
} else {
glog.V(4).Infof("Config was unchanged (version %v)", sync.latestVersion)
return
}
config = &Config{}
if err = sync.updateFederations(cm, config); err != nil {
glog.Errorf("Invalid configuration, ignoring update")
return
}
if err = config.Validate(); err != nil {
glog.Errorf("Invalid onfiguration: %v (value was %+v), ignoring update",
err, config)
config = nil
return
}
return
}
func (sync *kubeSync) updateFederations(cm *api.ConfigMap, config *Config) (err error) {
if flagValue, ok := cm.Data["federations"]; ok {
config.Federations = make(map[string]string)
if err = fed.ParseFederationsFlag(flagValue, config.Federations); err != nil {
glog.Errorf("Invalid federations value: %v (value was %q)",
err, cm.Data["federations"])
return
}
glog.V(2).Infof("Updated federations to %v", config.Federations)
} else {
glog.V(2).Infof("No federations present")
}
return
}

View File

@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
kcache "k8s.io/kubernetes/pkg/client/cache" kcache "k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/dns/config"
"k8s.io/kubernetes/pkg/dns/treecache" "k8s.io/kubernetes/pkg/dns/treecache"
"k8s.io/kubernetes/pkg/dns/util" "k8s.io/kubernetes/pkg/dns/util"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
@ -43,8 +44,6 @@ import (
) )
const ( const (
kubernetesSvcName = "kubernetes"
// A subdomain added to the user specified domain for all services. // A subdomain added to the user specified domain for all services.
serviceSubdomain = "svc" serviceSubdomain = "svc"
@ -67,77 +66,78 @@ type KubeDNS struct {
// to get Endpoints and Service objects. // to get Endpoints and Service objects.
kubeClient clientset.Interface kubeClient clientset.Interface
// The domain for which this DNS Server is authoritative. // domain for which this DNS Server is authoritative.
domain string domain string
// configMap where kube-dns dynamic configuration is store. If this
// is empty then getting configuration from a configMap will be
// disabled.
configMap string
// A cache that contains all the endpoints in the system. // endpointsStore that contains all the endpoints in the system.
endpointsStore kcache.Store endpointsStore kcache.Store
// servicesStore that contains all the services in the system.
// A cache that contains all the services in the system.
servicesStore kcache.Store servicesStore kcache.Store
// nodesStore contains some subset of nodes in the system so that we
// can retrieve the cluster zone annotation from the cached node
// instead of getting it from the API server every time.
nodesStore kcache.Store
// stores DNS records for the domain. // cache stores DNS records for the domain. A Records and SRV Records for
// A Records and SRV Records for (regular) services and headless Services. // (regular) services and headless Services. CNAME Records for
// CNAME Records for ExternalName Services. // ExternalName Services.
cache treecache.TreeCache cache treecache.TreeCache
// TODO(nikhiljindal): Remove this. It can be recreated using
// TODO(nikhiljindal): Remove this. It can be recreated using clusterIPServiceMap. // clusterIPServiceMap.
reverseRecordMap map[string]*skymsg.Service reverseRecordMap map[string]*skymsg.Service
// clusterIPServiceMap to service object. Headless services are not
// Map of cluster IP to service object. Headless services are not part of this map. // part of this map. Used to get a service when given its cluster
// Used to get a service when given its cluster IP. // IP. Access to this is coordinated using cacheLock. We use the
// Access to this is coordinated using cacheLock. We use the same lock for cache and this map // same lock for cache and this map to ensure that they don't get
// to ensure that they don't get out of sync. // out of sync.
clusterIPServiceMap map[string]*kapi.Service clusterIPServiceMap map[string]*kapi.Service
// cacheLock protecting the cache. caller is responsible for using
// caller is responsible for using the cacheLock before invoking methods on cache // the cacheLock before invoking methods on cache the cache is not
// the cache is not thread-safe, and the caller can guarantee thread safety by using // thread-safe, and the caller can guarantee thread safety by using
// the cacheLock // the cacheLock
cacheLock sync.RWMutex cacheLock sync.RWMutex
// The domain for which this DNS Server is authoritative, in array format and reversed. // The domain for which this DNS Server is authoritative, in array
// e.g. if domain is "cluster.local", domainPath is []string{"local", "cluster"} // format and reversed. e.g. if domain is "cluster.local",
// domainPath is []string{"local", "cluster"}
domainPath []string domainPath []string
// endpointsController invokes registered callbacks when endpoints change. // endpointsController invokes registered callbacks when endpoints change.
endpointsController *kcache.Controller endpointsController *kcache.Controller
// serviceController invokes registered callbacks when services change. // serviceController invokes registered callbacks when services change.
serviceController *kcache.Controller serviceController *kcache.Controller
// Map of federation names that the cluster in which this kube-dns is running belongs to, to // config set from the dynamic configuration source.
// the corresponding domain names. config *config.Config
federations map[string]string // configLock protects the config below.
configLock sync.RWMutex
// A TTL cache that contains some subset of nodes in the system so that we can retrieve the // configSync manages synchronization of the config map
// cluster zone annotation from the cached node instead of getting it from the API server configSync config.Sync
// every time.
nodesStore kcache.Store
} }
func NewKubeDNS(client clientset.Interface, domain string, federations map[string]string) (*KubeDNS, error) { func NewKubeDNS(client clientset.Interface, clusterDomain string, configSync config.Sync) *KubeDNS {
// Verify that federation names should not contain dots ('.')
// We can not allow dots since we use that as separator for path segments (svcname.nsname.fedname.svc.domain)
for key := range federations {
if strings.ContainsAny(key, ".") {
return nil, fmt.Errorf("invalid federation name: %s, cannot have '.'", key)
}
}
kd := &KubeDNS{ kd := &KubeDNS{
kubeClient: client, kubeClient: client,
domain: domain, domain: clusterDomain,
cache: treecache.NewTreeCache(), cache: treecache.NewTreeCache(),
cacheLock: sync.RWMutex{}, cacheLock: sync.RWMutex{},
nodesStore: kcache.NewStore(kcache.MetaNamespaceKeyFunc), nodesStore: kcache.NewStore(kcache.MetaNamespaceKeyFunc),
reverseRecordMap: make(map[string]*skymsg.Service), reverseRecordMap: make(map[string]*skymsg.Service),
clusterIPServiceMap: make(map[string]*kapi.Service), clusterIPServiceMap: make(map[string]*kapi.Service),
domainPath: util.ReverseArray(strings.Split(strings.TrimRight(domain, "."), ".")), domainPath: util.ReverseArray(strings.Split(strings.TrimRight(clusterDomain, "."), ".")),
federations: federations,
configLock: sync.RWMutex{},
configSync: configSync,
} }
kd.setEndpointsStore() kd.setEndpointsStore()
kd.setServicesStore() kd.setServicesStore()
return kd, nil return kd
} }
func (kd *KubeDNS) Start() { func (kd *KubeDNS) Start() {
@ -147,25 +147,29 @@ func (kd *KubeDNS) Start() {
glog.V(2).Infof("Starting serviceController") glog.V(2).Infof("Starting serviceController")
go kd.serviceController.Run(wait.NeverStop) go kd.serviceController.Run(wait.NeverStop)
// Wait synchronously for the Kubernetes service and add a DNS kd.startConfigMapSync()
// record for it. This ensures that the Start function returns only
// after having received Service objects from APIServer. // Wait synchronously for the Kubernetes service. This ensures that
// the Start function returns only after having received Service
// objects from APIServer.
// //
// TODO: we might not have to wait for kubernetes service // TODO: we might not have to wait for kubernetes service
// specifically. We should just wait for a list operation to be // specifically. We should just wait for a list operation to be
// complete from APIServer. // complete from APIServer.
glog.V(2).Infof("Waiting for Kubernetes service")
kd.waitForKubernetesService() kd.waitForKubernetesService()
} }
func (kd *KubeDNS) waitForKubernetesService() (svc *kapi.Service) { func (kd *KubeDNS) waitForKubernetesService() {
glog.V(2).Infof("Waiting for Kubernetes service")
const kubernetesSvcName = "kubernetes"
const servicePollInterval = 1 * time.Second
name := fmt.Sprintf("%v/%v", kapi.NamespaceDefault, kubernetesSvcName) name := fmt.Sprintf("%v/%v", kapi.NamespaceDefault, kubernetesSvcName)
glog.V(2).Infof("Waiting for service: %v", name) glog.V(2).Infof("Waiting for service: %v", name)
var err error
servicePollInterval := 1 * time.Second
for { for {
svc, err = kd.kubeClient.Core().Services(kapi.NamespaceDefault).Get(kubernetesSvcName) svc, err := kd.kubeClient.Core().Services(kapi.NamespaceDefault).Get(kubernetesSvcName)
if err != nil || svc == nil { if err != nil || svc == nil {
glog.V(3).Infof( glog.V(3).Infof(
"Ignoring error while waiting for service %v: %v. Sleeping %v before retrying.", "Ignoring error while waiting for service %v: %v. Sleeping %v before retrying.",
@ -179,6 +183,30 @@ func (kd *KubeDNS) waitForKubernetesService() (svc *kapi.Service) {
return return
} }
func (kd *KubeDNS) startConfigMapSync() {
initialConfig, err := kd.configSync.Once()
if err != nil {
glog.Errorf(
"Error getting initial ConfigMap: %v, starting with default values", err)
kd.config = config.NewDefaultConfig()
} else {
kd.config = initialConfig
}
go kd.syncConfigMap(kd.configSync.Periodic())
}
func (kd *KubeDNS) syncConfigMap(syncChan <-chan *config.Config) {
for {
nextConfig := <-syncChan
kd.configLock.Lock()
kd.config = nextConfig
glog.V(2).Infof("Configuration updated: %+v", *kd.config)
kd.configLock.Unlock()
}
}
func (kd *KubeDNS) GetCacheAsJSON() (string, error) { func (kd *KubeDNS) GetCacheAsJSON() (string, error) {
kd.cacheLock.RLock() kd.cacheLock.RLock()
defer kd.cacheLock.RUnlock() defer kd.cacheLock.RUnlock()
@ -528,6 +556,9 @@ func (kd *KubeDNS) recordsForFederation(records []skymsg.Service, path []string,
// We know that a headless service has endpoints for sure if a // We know that a headless service has endpoints for sure if a
// record was returned for it. The record contains endpoint // record was returned for it. The record contains endpoint
// IPs. So nothing to check for headless services. // IPs. So nothing to check for headless services.
//
// TODO: this access to the cluster IP map does not seem to be
// threadsafe.
if !kd.isHeadlessServiceRecord(&val) { if !kd.isHeadlessServiceRecord(&val) {
ok, err := kd.serviceWithClusterIPHasEndpoints(&val) ok, err := kd.serviceWithClusterIPHasEndpoints(&val)
if err != nil { if err != nil {
@ -740,10 +771,15 @@ func (kd *KubeDNS) isFederationQuery(path []string) bool {
return false return false
} }
} }
if _, ok := kd.federations[path[2]]; !ok {
glog.V(4).Infof("Not a federation query: kd.federations[%q] not found", path[2]) kd.configLock.RLock()
defer kd.configLock.RUnlock()
if _, ok := kd.config.Federations[path[2]]; !ok {
glog.V(4).Infof("Not a federation query: label %q not found", path[2])
return false return false
} }
return true return true
} }
@ -775,7 +811,10 @@ func (kd *KubeDNS) federationRecords(queryPath []string) ([]skymsg.Service, erro
// We have already established that the map entry exists for the given federation, // We have already established that the map entry exists for the given federation,
// we just need to retrieve the domain name, validate it and append it to the path. // we just need to retrieve the domain name, validate it and append it to the path.
domain := kd.federations[path[2]] kd.configLock.RLock()
domain := kd.config.Federations[path[2]]
kd.configLock.RUnlock()
// We accept valid subdomains as well, so just let all the valid subdomains. // We accept valid subdomains as well, so just let all the valid subdomains.
if len(validation.IsDNS1123Subdomain(domain)) != 0 { if len(validation.IsDNS1123Subdomain(domain)) != 0 {
return nil, fmt.Errorf("%s is not a valid domain name for federation %s", domain, path[2]) return nil, fmt.Errorf("%s is not a valid domain name for federation %s", domain, path[2])

View File

@ -20,9 +20,11 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"net" "net"
"reflect"
"strings" "strings"
"sync" "sync"
"testing" "testing"
"time"
etcd "github.com/coreos/etcd/client" etcd "github.com/coreos/etcd/client"
"github.com/miekg/dns" "github.com/miekg/dns"
@ -35,6 +37,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/cache"
fake "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" fake "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/dns/config"
"k8s.io/kubernetes/pkg/dns/treecache" "k8s.io/kubernetes/pkg/dns/treecache"
"k8s.io/kubernetes/pkg/dns/util" "k8s.io/kubernetes/pkg/dns/util"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
@ -48,25 +51,22 @@ const (
) )
func newKubeDNS() *KubeDNS { func newKubeDNS() *KubeDNS {
kd := &KubeDNS{ return &KubeDNS{
domain: testDomain, domain: testDomain,
endpointsStore: cache.NewStore(cache.MetaNamespaceKeyFunc), domainPath: util.ReverseArray(strings.Split(strings.TrimRight(testDomain, "."), ".")),
servicesStore: cache.NewStore(cache.MetaNamespaceKeyFunc),
endpointsStore: cache.NewStore(cache.MetaNamespaceKeyFunc),
servicesStore: cache.NewStore(cache.MetaNamespaceKeyFunc),
nodesStore: cache.NewStore(cache.MetaNamespaceKeyFunc),
cache: treecache.NewTreeCache(), cache: treecache.NewTreeCache(),
reverseRecordMap: make(map[string]*skymsg.Service), reverseRecordMap: make(map[string]*skymsg.Service),
clusterIPServiceMap: make(map[string]*kapi.Service), clusterIPServiceMap: make(map[string]*kapi.Service),
cacheLock: sync.RWMutex{}, cacheLock: sync.RWMutex{},
domainPath: util.ReverseArray(strings.Split(strings.TrimRight(testDomain, "."), ".")),
nodesStore: cache.NewStore(cache.MetaNamespaceKeyFunc),
}
return kd
}
func TestNewKubeDNS(t *testing.T) { config: config.NewDefaultConfig(),
// Verify that it returns an error for invalid federation names. configLock: sync.RWMutex{},
_, err := NewKubeDNS(nil, "domainName", map[string]string{"invalid.name.with.dot": "example.come"}) configSync: config.NewNopSync(config.NewDefaultConfig()),
if err == nil {
t.Errorf("Expected an error due to invalid federation name")
} }
} }
@ -384,93 +384,95 @@ func verifyRecord(q, a string, t *testing.T, kd *KubeDNS) {
assert.Equal(t, a, records[0].Host) assert.Equal(t, a, records[0].Host)
} }
// Verifies that quering KubeDNS for a headless federation service returns the DNS hostname when a local service does not exist and returns the endpoint IP when a local service exists. const federatedServiceFQDN = "testservice.default.myfederation.svc.testcontinent-testreg-testzone.testcontinent-testreg.example.com."
// Verifies that querying KubeDNS for a headless federation service
// returns the DNS hostname when a local service does not exist and
// returns the endpoint IP when a local service exists.
func TestFederationHeadlessService(t *testing.T) { func TestFederationHeadlessService(t *testing.T) {
kd := newKubeDNS() kd := newKubeDNS()
kd.federations = map[string]string{ kd.config.Federations = map[string]string{
"myfederation": "example.com", "myfederation": "example.com",
} }
kd.kubeClient = fake.NewSimpleClientset(newNodes()) kd.kubeClient = fake.NewSimpleClientset(newNodes())
// Verify that quering for federation service returns a federation domain name. // Verify that querying for federation service returns a federation domain name.
verifyRecord("testservice.default.myfederation.svc.cluster.local.", verifyRecord("testservice.default.myfederation.svc.cluster.local.",
"testservice.default.myfederation.svc.testcontinent-testreg-testzone.testcontinent-testreg.example.com.", federatedServiceFQDN, t, kd)
t, kd)
// Add a local service without any endpoint. // Add a local service without any endpoint.
s := newHeadlessService() s := newHeadlessService()
assert.NoError(t, kd.servicesStore.Add(s)) assert.NoError(t, kd.servicesStore.Add(s))
kd.newService(s) kd.newService(s)
// Verify that quering for federation service still returns the federation domain name. // Verify that querying for federation service still returns the federation domain name.
verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"), verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"),
"testservice.default.myfederation.svc.testcontinent-testreg-testzone.testcontinent-testreg.example.com.", federatedServiceFQDN, t, kd)
t, kd)
// Now add an endpoint. // Now add an endpoint.
endpoints := newEndpoints(s, newSubsetWithOnePort("", 80, "10.0.0.1")) endpoints := newEndpoints(s, newSubsetWithOnePort("", 80, "10.0.0.1"))
assert.NoError(t, kd.endpointsStore.Add(endpoints)) assert.NoError(t, kd.endpointsStore.Add(endpoints))
kd.updateService(s, s) kd.updateService(s, s)
// Verify that quering for federation service returns the local service domain name this time. // Verify that querying for federation service returns the local service domain name this time.
verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"), "testservice.default.svc.cluster.local.", t, kd) verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"),
"testservice.default.svc.cluster.local.", t, kd)
// Delete the endpoint. // Delete the endpoint.
endpoints.Subsets = []kapi.EndpointSubset{} endpoints.Subsets = []kapi.EndpointSubset{}
kd.handleEndpointAdd(endpoints) kd.handleEndpointAdd(endpoints)
kd.updateService(s, s) kd.updateService(s, s)
// Verify that quering for federation service returns the federation domain name again. // Verify that querying for federation service returns the federation domain name again.
verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"), verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"),
"testservice.default.myfederation.svc.testcontinent-testreg-testzone.testcontinent-testreg.example.com.", federatedServiceFQDN, t, kd)
t, kd)
} }
// Verifies that quering KubeDNS for a federation service returns the DNS hostname if no endpoint exists and returns the local cluster IP if endpoints exist. // Verifies that querying KubeDNS for a federation service returns the
// DNS hostname if no endpoint exists and returns the local cluster IP
// if endpoints exist.
func TestFederationService(t *testing.T) { func TestFederationService(t *testing.T) {
kd := newKubeDNS() kd := newKubeDNS()
kd.federations = map[string]string{ kd.config.Federations = map[string]string{
"myfederation": "example.com", "myfederation": "example.com",
} }
kd.kubeClient = fake.NewSimpleClientset(newNodes()) kd.kubeClient = fake.NewSimpleClientset(newNodes())
// Verify that quering for federation service returns the federation domain name. // Verify that querying for federation service returns the federation domain name.
verifyRecord("testservice.default.myfederation.svc.cluster.local.", verifyRecord("testservice.default.myfederation.svc.cluster.local.",
"testservice.default.myfederation.svc.testcontinent-testreg-testzone.testcontinent-testreg.example.com.", federatedServiceFQDN, t, kd)
t, kd)
// Add a local service without any endpoint. // Add a local service without any endpoint.
s := newService(testNamespace, testService, "1.2.3.4", "", 80) s := newService(testNamespace, testService, "1.2.3.4", "", 80)
assert.NoError(t, kd.servicesStore.Add(s)) assert.NoError(t, kd.servicesStore.Add(s))
kd.newService(s) kd.newService(s)
// Verify that quering for federation service still returns the federation domain name. // Verify that querying for federation service still returns the federation domain name.
verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"), verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"),
"testservice.default.myfederation.svc.testcontinent-testreg-testzone.testcontinent-testreg.example.com.", federatedServiceFQDN, t, kd)
t, kd)
// Now add an endpoint. // Now add an endpoint.
endpoints := newEndpoints(s, newSubsetWithOnePort("", 80, "10.0.0.1")) endpoints := newEndpoints(s, newSubsetWithOnePort("", 80, "10.0.0.1"))
assert.NoError(t, kd.endpointsStore.Add(endpoints)) assert.NoError(t, kd.endpointsStore.Add(endpoints))
kd.updateService(s, s) kd.updateService(s, s)
// Verify that quering for federation service returns the local service domain name this time. // Verify that querying for federation service returns the local service domain name this time.
verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"), "testservice.default.svc.cluster.local.", t, kd) verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"),
"testservice.default.svc.cluster.local.", t, kd)
// Remove the endpoint. // Remove the endpoint.
endpoints.Subsets = []kapi.EndpointSubset{} endpoints.Subsets = []kapi.EndpointSubset{}
kd.handleEndpointAdd(endpoints) kd.handleEndpointAdd(endpoints)
kd.updateService(s, s) kd.updateService(s, s)
// Verify that quering for federation service returns the federation domain name again. // Verify that querying for federation service returns the federation domain name again.
verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"), verifyRecord(getFederationServiceFQDN(kd, s, "myfederation"),
"testservice.default.myfederation.svc.testcontinent-testreg-testzone.testcontinent-testreg.example.com.", federatedServiceFQDN, t, kd)
t, kd)
} }
func TestFederationQueryWithoutCache(t *testing.T) { func TestFederationQueryWithoutCache(t *testing.T) {
kd := newKubeDNS() kd := newKubeDNS()
kd.federations = map[string]string{ kd.config.Federations = map[string]string{
"myfederation": "example.com", "myfederation": "example.com",
"secondfederation": "second.example.com", "secondfederation": "second.example.com",
} }
@ -482,7 +484,7 @@ func TestFederationQueryWithoutCache(t *testing.T) {
func TestFederationQueryWithCache(t *testing.T) { func TestFederationQueryWithCache(t *testing.T) {
kd := newKubeDNS() kd := newKubeDNS()
kd.federations = map[string]string{ kd.config.Federations = map[string]string{
"myfederation": "example.com", "myfederation": "example.com",
"secondfederation": "second.example.com", "secondfederation": "second.example.com",
} }
@ -530,12 +532,64 @@ func testInvalidFederationQueries(t *testing.T, kd *KubeDNS) {
t.Errorf("expected not found error, got nil") t.Errorf("expected not found error, got nil")
} }
if etcdErr, ok := err.(etcd.Error); !ok || etcdErr.Code != etcd.ErrorCodeKeyNotFound { if etcdErr, ok := err.(etcd.Error); !ok || etcdErr.Code != etcd.ErrorCodeKeyNotFound {
t.Errorf("expected not found error, got %v", etcdErr) t.Errorf("expected not found error, got %v", err)
} }
assert.Equal(t, 0, len(records)) assert.Equal(t, 0, len(records))
} }
} }
func checkConfigEqual(t *testing.T, kd *KubeDNS, expected *config.Config) {
const timeout = time.Duration(5)
start := time.Now()
ok := false
for time.Since(start) < timeout*time.Second {
kd.configLock.RLock()
isEqual := reflect.DeepEqual(expected.Federations, kd.config.Federations)
kd.configLock.RUnlock()
if isEqual {
ok = true
break
}
}
if !ok {
t.Errorf("Federations should be %v, got %v",
expected.Federations, kd.config.Federations)
}
}
func TestConfigSync(t *testing.T) {
kd := newKubeDNS()
mockSync := config.NewMockSync(
&config.Config{Federations: make(map[string]string)}, nil)
kd.configSync = mockSync
kd.startConfigMapSync()
checkConfigEqual(t, kd, &config.Config{Federations: make(map[string]string)})
// update
mockSync.Chan <- &config.Config{Federations: map[string]string{"name1": "domain1"}}
checkConfigEqual(t, kd, &config.Config{Federations: map[string]string{"name1": "domain1"}})
// update
mockSync.Chan <- &config.Config{Federations: map[string]string{"name2": "domain2"}}
checkConfigEqual(t, kd, &config.Config{Federations: map[string]string{"name2": "domain2"}})
}
func TestConfigSyncInitialMap(t *testing.T) {
// start with different initial map
kd := newKubeDNS()
mockSync := config.NewMockSync(
&config.Config{Federations: map[string]string{"name3": "domain3"}}, nil)
kd.configSync = mockSync
kd.startConfigMapSync()
checkConfigEqual(t, kd, &config.Config{Federations: map[string]string{"name3": "domain3"}})
}
func newNodes() *kapi.NodeList { func newNodes() *kapi.NodeList {
return &kapi.NodeList{ return &kapi.NodeList{
Items: []kapi.Node{ Items: []kapi.Node{

26
pkg/dns/federation/BUILD Normal file
View File

@ -0,0 +1,26 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_binary",
"go_library",
"go_test",
"cgo_library",
)
go_library(
name = "go_default_library",
srcs = ["federation.go"],
tags = ["automanaged"],
deps = ["//pkg/util/validation:go_default_library"],
)
go_test(
name = "go_default_test",
srcs = ["federation_test.go"],
library = "go_default_library",
tags = ["automanaged"],
deps = ["//vendor:github.com/stretchr/testify/assert"],
)

View File

@ -0,0 +1,75 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Fed contains federation specific DNS code.
package fed
import (
"errors"
"fmt"
"strings"
"k8s.io/kubernetes/pkg/util/validation"
)
var ErrExpectedKeyEqualsValue = errors.New("invalid format, must be key=value")
// ParseFederationsFlag parses the federations command line flag. The
// flag is a comma-separated list of zero or more "name=label" pairs,
// e.g. "a=b,c=d".
func ParseFederationsFlag(str string, federations map[string]string) error {
if strings.TrimSpace(str) == "" {
return nil
}
for _, val := range strings.Split(str, ",") {
splits := strings.SplitN(strings.TrimSpace(val), "=", 2)
if len(splits) != 2 {
return ErrExpectedKeyEqualsValue
}
name := strings.TrimSpace(splits[0])
domain := strings.TrimSpace(splits[1])
if err := ValidateName(name); err != nil {
return err
}
if err := ValidateDomain(domain); err != nil {
return err
}
federations[name] = domain
}
return nil
}
// ValidateName checks the validity of a federation name.
func ValidateName(name string) error {
if errs := validation.IsDNS1123Label(name); len(errs) != 0 {
return fmt.Errorf("%q not a valid federation name: %q", name, errs)
}
return nil
}
// ValidateDomain checks the validity of a federation label.
func ValidateDomain(name string) error {
// The federation domain name need not strictly be domain names, we
// accept valid dns names with subdomain components.
if errs := validation.IsDNS1123Subdomain(name); len(errs) != 0 {
return fmt.Errorf("%q not a valid domain name: %q", name, errs)
}
return nil
}

View File

@ -0,0 +1,76 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fed
import (
"github.com/stretchr/testify/assert"
"reflect"
"testing"
)
func TestParseFederationsFlag(t *testing.T) {
type TestCase struct {
input string
hasError bool
expected map[string]string
}
for _, testCase := range []TestCase{
{input: "", expected: make(map[string]string)},
{input: "a=b", expected: map[string]string{"a": "b"}},
{input: "a=b,cc=dd", expected: map[string]string{"a": "b", "cc": "dd"}},
{input: "abc=d.e.f", expected: map[string]string{"abc": "d.e.f"}},
{input: "ccdd", hasError: true},
{input: "a=b,ccdd", hasError: true},
{input: "-", hasError: true},
{input: "a.b.c=d.e.f", hasError: true},
} {
output := make(map[string]string)
err := ParseFederationsFlag(testCase.input, output)
if !testCase.hasError {
assert.Nil(t, err, "unexpected err", testCase)
assert.True(t, reflect.DeepEqual(
testCase.expected, output), output, testCase)
} else {
assert.NotNil(t, err, testCase)
}
}
}
func TestValidateName(t *testing.T) {
// More complete testing is done in validation.IsDNS1123Label. These
// tests are to catch issues specific to the implementation of
// kube-dns.
assert.NotNil(t, ValidateName(""))
assert.NotNil(t, ValidateName("."))
assert.NotNil(t, ValidateName("ab.cd"))
assert.Nil(t, ValidateName("abcd"))
}
func TestValidateDomain(t *testing.T) {
// More complete testing is done in
// validation.IsDNS1123Subdomain. These tests are to catch issues
// specific to the implementation of kube-dns.
assert.NotNil(t, ValidateDomain(""))
assert.NotNil(t, ValidateDomain("."))
assert.Nil(t, ValidateDomain("ab.cd"))
assert.Nil(t, ValidateDomain("abcd"))
assert.Nil(t, ValidateDomain("a.b.c.d"))
}

View File

@ -110,7 +110,7 @@ func TestTreeCache(t *testing.T) {
{"key3", []string{"p1", "p3"}}, {"key3", []string{"p1", "p3"}},
} { } {
if _, ok := tc.GetEntry(testCase.k, testCase.p...); ok { if _, ok := tc.GetEntry(testCase.k, testCase.p...); ok {
t.Error() t.Error("path should not exist")
} }
} }
} }

View File

@ -31,6 +31,7 @@ go_library(
"disruption.go", "disruption.go",
"dns.go", "dns.go",
"dns_autoscaling.go", "dns_autoscaling.go",
"dns_configmap.go",
"e2e.go", "e2e.go",
"empty.go", "empty.go",
"empty_dir_wrapper.go", "empty_dir_wrapper.go",
@ -148,6 +149,7 @@ go_library(
"//pkg/controller/petset:go_default_library", "//pkg/controller/petset:go_default_library",
"//pkg/controller/replicaset:go_default_library", "//pkg/controller/replicaset:go_default_library",
"//pkg/controller/replication:go_default_library", "//pkg/controller/replication:go_default_library",
"//pkg/dns/federation:go_default_library",
"//pkg/fields:go_default_library", "//pkg/fields:go_default_library",
"//pkg/kubectl:go_default_library", "//pkg/kubectl:go_default_library",
"//pkg/kubectl/cmd/util:go_default_library", "//pkg/kubectl/cmd/util:go_default_library",

318
test/e2e/dns_configmap.go Normal file
View File

@ -0,0 +1,318 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e
import (
"fmt"
"strings"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
fed "k8s.io/kubernetes/pkg/dns/federation"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util/intstr"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/test/e2e/framework"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)
type dnsConfigMapTest struct {
f *framework.Framework
c clientset.Interface
ns string
name string
labels []string
cm *api.ConfigMap
isValid bool
dnsPod *api.Pod
utilPod *api.Pod
utilService *api.Service
}
var _ = framework.KubeDescribe("DNS config map", func() {
test := &dnsConfigMapTest{
f: framework.NewDefaultFramework("dns-config-map"),
ns: "kube-system",
name: "kube-dns",
}
BeforeEach(func() {
test.c = test.f.ClientSet
})
It("should be able to change configuration", func() {
test.run()
})
})
func (t *dnsConfigMapTest) init() {
By("Finding a DNS pod")
label := labels.SelectorFromSet(labels.Set(map[string]string{"k8s-app": "kube-dns"}))
options := api.ListOptions{LabelSelector: label}
pods, err := t.f.ClientSet.Core().Pods("kube-system").List(options)
Expect(err).NotTo(HaveOccurred())
Expect(len(pods.Items)).Should(BeNumerically(">=", 1))
t.dnsPod = &pods.Items[0]
framework.Logf("Using DNS pod: %v", t.dnsPod.Name)
}
func (t *dnsConfigMapTest) run() {
t.init()
defer t.c.Core().ConfigMaps(t.ns).Delete(t.name, nil)
t.createUtilPod()
defer t.deleteUtilPod()
t.validate()
t.labels = []string{"abc", "ghi"}
valid1 := map[string]string{"federations": t.labels[0] + "=def"}
valid2 := map[string]string{"federations": t.labels[1] + "=xyz"}
invalid := map[string]string{"federations": "invalid.map=xyz"}
By("empty -> valid1")
t.setConfigMap(&api.ConfigMap{Data: valid1}, true)
t.validate()
By("valid1 -> valid2")
t.setConfigMap(&api.ConfigMap{Data: valid2}, true)
t.validate()
By("valid2 -> invalid")
t.setConfigMap(&api.ConfigMap{Data: invalid}, false)
t.validate()
By("invalid -> valid1")
t.setConfigMap(&api.ConfigMap{Data: valid1}, true)
t.validate()
By("valid1 -> deleted")
t.deleteConfigMap()
t.validate()
By("deleted -> invalid")
t.setConfigMap(&api.ConfigMap{Data: invalid}, false)
t.validate()
}
func (t *dnsConfigMapTest) validate() {
t.validateFederation()
}
func (t *dnsConfigMapTest) validateFederation() {
federations := make(map[string]string)
if t.cm != nil {
err := fed.ParseFederationsFlag(t.cm.Data["federations"], federations)
Expect(err).NotTo(HaveOccurred())
}
if len(federations) == 0 {
By(fmt.Sprintf("Validating federation labels %v do not exist", t.labels))
for _, label := range t.labels {
var federationDNS = fmt.Sprintf("e2e-dns-configmap.%s.%s.svc.cluster.local.",
t.f.Namespace.Name, label)
predicate := func(actual []string) bool {
return len(actual) == 0
}
t.checkDNSRecord(federationDNS, predicate, wait.ForeverTestTimeout)
}
} else {
for label := range federations {
var federationDNS = fmt.Sprintf("%s.%s.%s.svc.cluster.local.",
t.utilService.ObjectMeta.Name, t.f.Namespace.Name, label)
var localDNS = fmt.Sprintf("%s.%s.svc.cluster.local.",
t.utilService.ObjectMeta.Name, t.f.Namespace.Name)
// Check local mapping. Checking a remote mapping requires
// creating an arbitrary DNS record which is not possible at the
// moment.
By(fmt.Sprintf("Validating federation record %v", label))
predicate := func(actual []string) bool {
for _, v := range actual {
if v == localDNS {
return true
}
}
return false
}
t.checkDNSRecord(federationDNS, predicate, wait.ForeverTestTimeout)
}
}
}
func (t *dnsConfigMapTest) checkDNSRecord(name string, predicate func([]string) bool, timeout time.Duration) {
var actual []string
err := wait.PollImmediate(
time.Duration(1)*time.Second,
timeout,
func() (bool, error) {
actual = t.runDig(name)
if predicate(actual) {
return true, nil
}
return false, nil
})
if err != nil {
framework.Logf("dig result did not match: %#v after %v",
actual, timeout)
}
}
// runDig querying for `dnsName`. Returns a list of responses.
func (t *dnsConfigMapTest) runDig(dnsName string) []string {
cmd := []string{
"/usr/bin/dig",
"+short",
"@" + t.dnsPod.Status.PodIP,
"-p", "10053", dnsName,
}
stdout, stderr, err := t.f.ExecWithOptions(framework.ExecOptions{
Command: cmd,
Namespace: t.f.Namespace.Name,
PodName: t.utilPod.Name,
ContainerName: "util",
CaptureStdout: true,
CaptureStderr: true,
})
Expect(err).NotTo(HaveOccurred())
framework.Logf("Running dig: %v, stdout: %q, stderr: %q",
cmd, stdout, stderr)
if stdout == "" {
return []string{}
} else {
return strings.Split(stdout, "\n")
}
}
func (t *dnsConfigMapTest) setConfigMap(cm *api.ConfigMap, isValid bool) {
if isValid {
t.cm = cm
}
t.isValid = isValid
cm.ObjectMeta.Namespace = t.ns
cm.ObjectMeta.Name = t.name
options := api.ListOptions{
FieldSelector: fields.Set{
"metadata.namespace": t.ns,
"metadata.name": t.name,
}.AsSelector(),
}
cmList, err := t.c.Core().ConfigMaps(t.ns).List(options)
Expect(err).NotTo(HaveOccurred())
if len(cmList.Items) == 0 {
By(fmt.Sprintf("Creating the ConfigMap (%s:%s) %+v", t.ns, t.name, *cm))
_, err := t.c.Core().ConfigMaps(t.ns).Create(cm)
Expect(err).NotTo(HaveOccurred())
} else {
By(fmt.Sprintf("Updating the ConfigMap (%s:%s) to %+v", t.ns, t.name, *cm))
_, err := t.c.Core().ConfigMaps(t.ns).Update(cm)
Expect(err).NotTo(HaveOccurred())
}
}
func (t *dnsConfigMapTest) deleteConfigMap() {
By(fmt.Sprintf("Deleting the ConfigMap (%s:%s)", t.ns, t.name))
t.cm = nil
t.isValid = false
err := t.c.Core().ConfigMaps(t.ns).Delete(t.name, nil)
Expect(err).NotTo(HaveOccurred())
}
func (t *dnsConfigMapTest) createUtilPod() {
// Actual port # doesn't matter, just need to exist.
const servicePort = 10101
t.utilPod = &api.Pod{
TypeMeta: unversioned.TypeMeta{
Kind: "Pod",
},
ObjectMeta: api.ObjectMeta{
Namespace: t.f.Namespace.Name,
Labels: map[string]string{"app": "e2e-dns-configmap"},
GenerateName: "e2e-dns-configmap-",
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "util",
Image: "gcr.io/google_containers/dnsutils:e2e",
Command: []string{"sleep", "10000"},
Ports: []api.ContainerPort{
{ContainerPort: servicePort, Protocol: "TCP"},
},
},
},
},
}
var err error
t.utilPod, err = t.c.Core().Pods(t.f.Namespace.Name).Create(t.utilPod)
Expect(err).NotTo(HaveOccurred())
framework.Logf("Created pod %v", t.utilPod)
Expect(t.f.WaitForPodRunning(t.utilPod.Name)).NotTo(HaveOccurred())
t.utilService = &api.Service{
TypeMeta: unversioned.TypeMeta{
Kind: "Service",
},
ObjectMeta: api.ObjectMeta{
Namespace: t.f.Namespace.Name,
Name: "e2e-dns-configmap",
},
Spec: api.ServiceSpec{
Selector: map[string]string{"app": "e2e-dns-configmap"},
Ports: []api.ServicePort{
{
Protocol: "TCP",
Port: servicePort,
TargetPort: intstr.FromInt(servicePort),
},
},
},
}
t.utilService, err = t.c.Core().Services(t.f.Namespace.Name).Create(t.utilService)
Expect(err).NotTo(HaveOccurred())
framework.Logf("Created service %v", t.utilService)
}
func (t *dnsConfigMapTest) deleteUtilPod() {
podClient := t.c.Core().Pods(t.f.Namespace.Name)
if err := podClient.Delete(t.utilPod.Name, api.NewDeleteOptions(0)); err != nil {
framework.Logf("Delete of pod %v:%v failed: %v",
t.utilPod.Namespace, t.utilPod.Name, err)
}
}

View File

@ -30,42 +30,82 @@ import (
. "github.com/onsi/gomega" . "github.com/onsi/gomega"
) )
// ExecCommandInContainer execute a command in the specified container. // ExecOptions passed to ExecWithOptions
// Pass in stdin, tty if needed in the future. type ExecOptions struct {
func (f *Framework) ExecCommandInContainer(podName, containerName string, cmd ...string) string { Command []string
stdout, stderr, err := f.ExecCommandInContainerWithFullOutput(podName, containerName, cmd...)
Logf("Exec stderr: %q", stderr) Namespace string
Expect(err).NotTo(HaveOccurred(), "fail to execute command") PodName string
return stdout ContainerName string
Stdin io.Reader
CaptureStdout bool
CaptureStderr bool
// If false, whitespace in std{err,out} will be removed.
PreserveWhitespace bool
} }
// ExecCommandInContainerWithFullOutput executes a command in the specified container and return stdout, stderr and error // ExecWithOptions executes a command in the specified container,
func (f *Framework) ExecCommandInContainerWithFullOutput(podName, containerName string, cmd ...string) (string, string, error) { // returning stdout, stderr and error. `options` allowed for
Logf("Exec running '%s'", strings.Join(cmd, " ")) // additional parameters to be passed.
func (f *Framework) ExecWithOptions(options ExecOptions) (string, string, error) {
Logf("ExecWithOptions %+v", options)
config, err := LoadConfig() config, err := LoadConfig()
Expect(err).NotTo(HaveOccurred(), "failed to load restclient config") Expect(err).NotTo(HaveOccurred(), "failed to load restclient config")
var stdout, stderr bytes.Buffer
var stdin io.Reader const tty = false
tty := false
req := f.ClientSet.Core().RESTClient().Post(). req := f.ClientSet.Core().RESTClient().Post().
Resource("pods"). Resource("pods").
Name(podName). Name(options.PodName).
Namespace(f.Namespace.Name). Namespace(options.Namespace).
SubResource("exec"). SubResource("exec").
Param("container", containerName) Param("container", options.ContainerName)
req.VersionedParams(&api.PodExecOptions{ req.VersionedParams(&api.PodExecOptions{
Container: containerName, Container: options.ContainerName,
Command: cmd, Command: options.Command,
Stdin: stdin != nil, Stdin: options.Stdin != nil,
Stdout: true, Stdout: options.CaptureStdout,
Stderr: true, Stderr: options.CaptureStderr,
TTY: tty, TTY: tty,
}, api.ParameterCodec) }, api.ParameterCodec)
err = execute("POST", req.URL(), config, stdin, &stdout, &stderr, tty) var stdout, stderr bytes.Buffer
err = execute("POST", req.URL(), config, options.Stdin, &stdout, &stderr, tty)
if options.PreserveWhitespace {
return stdout.String(), stderr.String(), err
}
return strings.TrimSpace(stdout.String()), strings.TrimSpace(stderr.String()), err return strings.TrimSpace(stdout.String()), strings.TrimSpace(stderr.String()), err
} }
// ExecCommandInContainerWithFullOutput executes a command in the
// specified container and return stdout, stderr and error
func (f *Framework) ExecCommandInContainerWithFullOutput(podName, containerName string, cmd ...string) (string, string, error) {
return f.ExecWithOptions(ExecOptions{
Command: cmd,
Namespace: f.Namespace.Name,
PodName: podName,
ContainerName: containerName,
Stdin: nil,
CaptureStdout: true,
CaptureStderr: true,
PreserveWhitespace: false,
})
}
// ExecCommandInContainer executes a command in the specified container.
func (f *Framework) ExecCommandInContainer(podName, containerName string, cmd ...string) string {
stdout, stderr, err := f.ExecCommandInContainerWithFullOutput(podName, containerName, cmd...)
Logf("Exec stderr: %q", stderr)
Expect(err).NotTo(HaveOccurred(),
"failed to execute command in pod %v, container %v: %v",
podName, containerName, err)
return stdout
}
func (f *Framework) ExecShellInContainer(podName, containerName string, cmd string) string { func (f *Framework) ExecShellInContainer(podName, containerName string, cmd string) string {
return f.ExecCommandInContainer(podName, containerName, "/bin/sh", "-c", cmd) return f.ExecCommandInContainer(podName, containerName, "/bin/sh", "-c", cmd)
} }