From 19cf8310866291edb2f0a77fe541e7ad8e936837 Mon Sep 17 00:00:00 2001 From: Zihong Zheng Date: Mon, 24 Oct 2016 10:45:02 -0700 Subject: [PATCH] kubedns: use initial resource listing as ready signal --- cmd/kube-dns/app/options/options.go | 19 ++++++---- cmd/kube-dns/app/server.go | 16 ++++----- hack/verify-flags/known-flags.txt | 1 + pkg/dns/dns.go | 56 +++++++++++++---------------- 4 files changed, 45 insertions(+), 47 deletions(-) diff --git a/cmd/kube-dns/app/options/options.go b/cmd/kube-dns/app/options/options.go index c2a1d527ad4..60908dbed21 100644 --- a/cmd/kube-dns/app/options/options.go +++ b/cmd/kube-dns/app/options/options.go @@ -23,6 +23,7 @@ import ( "net/url" "os" "strings" + "time" "github.com/spf13/pflag" "k8s.io/kubernetes/pkg/api" @@ -31,9 +32,10 @@ import ( ) type KubeDNSConfig struct { - ClusterDomain string - KubeConfigFile string - KubeMasterURL string + ClusterDomain string + KubeConfigFile string + KubeMasterURL string + InitialSyncTimeout time.Duration HealthzPort int DNSBindAddress string @@ -47,10 +49,11 @@ type KubeDNSConfig struct { func NewKubeDNSConfig() *KubeDNSConfig { return &KubeDNSConfig{ - ClusterDomain: "cluster.local.", - HealthzPort: 8081, - DNSBindAddress: "0.0.0.0", - DNSPort: 53, + ClusterDomain: "cluster.local.", + HealthzPort: 8081, + DNSBindAddress: "0.0.0.0", + DNSPort: 53, + InitialSyncTimeout: 60 * time.Second, Federations: make(map[string]string), @@ -160,4 +163,6 @@ func (s *KubeDNSConfig) AddFlags(fs *pflag.FlagSet) { "config-map name. If empty, then the config-map will not used. Cannot be "+ " used in conjunction with federations flag. config-map contains "+ "dynamically adjustable configuration.") + fs.DurationVar(&s.InitialSyncTimeout, "initial-sync-timeout", s.InitialSyncTimeout, + "Timeout for initial resource sync.") } diff --git a/cmd/kube-dns/app/server.go b/cmd/kube-dns/app/server.go index f3d5fc370f7..2b98f55ca18 100644 --- a/cmd/kube-dns/app/server.go +++ b/cmd/kube-dns/app/server.go @@ -47,17 +47,11 @@ type KubeDNSServer struct { } func NewKubeDNSServerDefault(config *options.KubeDNSConfig) *KubeDNSServer { - ks := KubeDNSServer{domain: config.ClusterDomain} - kubeClient, err := newKubeClient(config) if err != nil { glog.Fatalf("Failed to create a kubernetes client: %v", err) } - ks.healthzPort = config.HealthzPort - ks.dnsBindAddress = config.DNSBindAddress - ks.dnsPort = config.DNSPort - var configSync dnsconfig.Sync if config.ConfigMap == "" { glog.V(0).Infof("ConfigMap not configured, using values from command line flags") @@ -70,9 +64,13 @@ func NewKubeDNSServerDefault(config *options.KubeDNSConfig) *KubeDNSServer { kubeClient, config.ConfigMapNs, config.ConfigMap) } - ks.kd = kdns.NewKubeDNS(kubeClient, config.ClusterDomain, configSync) - - return &ks + return &KubeDNSServer{ + domain: config.ClusterDomain, + healthzPort: config.HealthzPort, + dnsBindAddress: config.DNSBindAddress, + dnsPort: config.DNSPort, + kd: kdns.NewKubeDNS(kubeClient, config.ClusterDomain, config.InitialSyncTimeout, configSync), + } } // TODO: evaluate using pkg/client/clientcmd diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt index 54ac50c9b47..3bed392aa5c 100644 --- a/hack/verify-flags/known-flags.txt +++ b/hack/verify-flags/known-flags.txt @@ -275,6 +275,7 @@ image-service-endpoint include-extended-apis include-extended-apis included-types-overrides +initial-sync-timeout input-base input-dirs insecure-allow-any-token diff --git a/pkg/dns/dns.go b/pkg/dns/dns.go index 88d74fc96bf..316e6e1acb4 100644 --- a/pkg/dns/dns.go +++ b/pkg/dns/dns.go @@ -24,9 +24,6 @@ import ( "sync" "time" - etcd "github.com/coreos/etcd/client" - "github.com/miekg/dns" - skymsg "github.com/skynetservices/skydns/msg" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/api/v1/endpoints" metav1 "k8s.io/kubernetes/pkg/apis/meta/v1" @@ -40,7 +37,10 @@ import ( "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/watch" + etcd "github.com/coreos/etcd/client" "github.com/golang/glog" + "github.com/miekg/dns" + skymsg "github.com/skynetservices/skydns/msg" ) const ( @@ -117,9 +117,12 @@ type KubeDNS struct { configLock sync.RWMutex // configSync manages synchronization of the config map configSync config.Sync + + // Initial timeout for endpoints and services to be synced from APIServer + initialSyncTimeout time.Duration } -func NewKubeDNS(client clientset.Interface, clusterDomain string, configSync config.Sync) *KubeDNS { +func NewKubeDNS(client clientset.Interface, clusterDomain string, timeout time.Duration, configSync config.Sync) *KubeDNS { kd := &KubeDNS{ kubeClient: client, domain: clusterDomain, @@ -129,6 +132,7 @@ func NewKubeDNS(client clientset.Interface, clusterDomain string, configSync con reverseRecordMap: make(map[string]*skymsg.Service), clusterIPServiceMap: make(map[string]*v1.Service), domainPath: util.ReverseArray(strings.Split(strings.TrimRight(clusterDomain, "."), ".")), + initialSyncTimeout: timeout, configLock: sync.RWMutex{}, configSync: configSync, @@ -149,38 +153,28 @@ func (kd *KubeDNS) Start() { kd.startConfigMapSync() - // Wait synchronously for the Kubernetes service. This ensures that - // the Start function returns only after having received Service - // objects from APIServer. - // - // TODO: we might not have to wait for kubernetes service - // specifically. We should just wait for a list operation to be - // complete from APIServer. - kd.waitForKubernetesService() + // Wait synchronously for the initial list operations to be + // complete of endpoints and services from APIServer. + kd.waitForResourceSyncedOrDie() } -func (kd *KubeDNS) waitForKubernetesService() { - glog.V(2).Infof("Waiting for Kubernetes service") - - const kubernetesSvcName = "kubernetes" - const servicePollInterval = 1 * time.Second - - name := fmt.Sprintf("%v/%v", v1.NamespaceDefault, kubernetesSvcName) - glog.V(2).Infof("Waiting for service: %v", name) - +func (kd *KubeDNS) waitForResourceSyncedOrDie() { + // Wait for both controllers have completed an initial resource listing + timeout := time.After(kd.initialSyncTimeout) + ticker := time.NewTicker(500 * time.Millisecond) + defer ticker.Stop() for { - svc, err := kd.kubeClient.Core().Services(v1.NamespaceDefault).Get(kubernetesSvcName) - if err != nil || svc == nil { - glog.V(3).Infof( - "Ignoring error while waiting for service %v: %v. Sleeping %v before retrying.", - name, err, servicePollInterval) - time.Sleep(servicePollInterval) - continue + select { + case <-timeout: + glog.Fatalf("Timeout waiting for initialization") + case <-ticker.C: + if kd.endpointsController.HasSynced() && kd.serviceController.HasSynced() { + glog.V(0).Infof("Initialized services and endpoints from apiserver") + return + } + glog.V(0).Infof("DNS server not ready, retry in 500 milliseconds") } - break } - - return } func (kd *KubeDNS) startConfigMapSync() {