kubedns: use initial resource listing as ready signal

This commit is contained in:
Zihong Zheng 2016-10-24 10:45:02 -07:00
parent 8d761a0735
commit 19cf831086
4 changed files with 45 additions and 47 deletions

View File

@ -23,6 +23,7 @@ import (
"net/url"
"os"
"strings"
"time"
"github.com/spf13/pflag"
"k8s.io/kubernetes/pkg/api"
@ -31,9 +32,10 @@ import (
)
type KubeDNSConfig struct {
ClusterDomain string
KubeConfigFile string
KubeMasterURL string
ClusterDomain string
KubeConfigFile string
KubeMasterURL string
InitialSyncTimeout time.Duration
HealthzPort int
DNSBindAddress string
@ -47,10 +49,11 @@ type KubeDNSConfig struct {
func NewKubeDNSConfig() *KubeDNSConfig {
return &KubeDNSConfig{
ClusterDomain: "cluster.local.",
HealthzPort: 8081,
DNSBindAddress: "0.0.0.0",
DNSPort: 53,
ClusterDomain: "cluster.local.",
HealthzPort: 8081,
DNSBindAddress: "0.0.0.0",
DNSPort: 53,
InitialSyncTimeout: 60 * time.Second,
Federations: make(map[string]string),
@ -160,4 +163,6 @@ func (s *KubeDNSConfig) AddFlags(fs *pflag.FlagSet) {
"config-map name. If empty, then the config-map will not used. Cannot be "+
" used in conjunction with federations flag. config-map contains "+
"dynamically adjustable configuration.")
fs.DurationVar(&s.InitialSyncTimeout, "initial-sync-timeout", s.InitialSyncTimeout,
"Timeout for initial resource sync.")
}

View File

@ -47,17 +47,11 @@ type KubeDNSServer struct {
}
func NewKubeDNSServerDefault(config *options.KubeDNSConfig) *KubeDNSServer {
ks := KubeDNSServer{domain: config.ClusterDomain}
kubeClient, err := newKubeClient(config)
if err != nil {
glog.Fatalf("Failed to create a kubernetes client: %v", err)
}
ks.healthzPort = config.HealthzPort
ks.dnsBindAddress = config.DNSBindAddress
ks.dnsPort = config.DNSPort
var configSync dnsconfig.Sync
if config.ConfigMap == "" {
glog.V(0).Infof("ConfigMap not configured, using values from command line flags")
@ -70,9 +64,13 @@ func NewKubeDNSServerDefault(config *options.KubeDNSConfig) *KubeDNSServer {
kubeClient, config.ConfigMapNs, config.ConfigMap)
}
ks.kd = kdns.NewKubeDNS(kubeClient, config.ClusterDomain, configSync)
return &ks
return &KubeDNSServer{
domain: config.ClusterDomain,
healthzPort: config.HealthzPort,
dnsBindAddress: config.DNSBindAddress,
dnsPort: config.DNSPort,
kd: kdns.NewKubeDNS(kubeClient, config.ClusterDomain, config.InitialSyncTimeout, configSync),
}
}
// TODO: evaluate using pkg/client/clientcmd

View File

@ -275,6 +275,7 @@ image-service-endpoint
include-extended-apis
include-extended-apis
included-types-overrides
initial-sync-timeout
input-base
input-dirs
insecure-allow-any-token

View File

@ -24,9 +24,6 @@ import (
"sync"
"time"
etcd "github.com/coreos/etcd/client"
"github.com/miekg/dns"
skymsg "github.com/skynetservices/skydns/msg"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/api/v1/endpoints"
metav1 "k8s.io/kubernetes/pkg/apis/meta/v1"
@ -40,7 +37,10 @@ import (
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
etcd "github.com/coreos/etcd/client"
"github.com/golang/glog"
"github.com/miekg/dns"
skymsg "github.com/skynetservices/skydns/msg"
)
const (
@ -117,9 +117,12 @@ type KubeDNS struct {
configLock sync.RWMutex
// configSync manages synchronization of the config map
configSync config.Sync
// Initial timeout for endpoints and services to be synced from APIServer
initialSyncTimeout time.Duration
}
func NewKubeDNS(client clientset.Interface, clusterDomain string, configSync config.Sync) *KubeDNS {
func NewKubeDNS(client clientset.Interface, clusterDomain string, timeout time.Duration, configSync config.Sync) *KubeDNS {
kd := &KubeDNS{
kubeClient: client,
domain: clusterDomain,
@ -129,6 +132,7 @@ func NewKubeDNS(client clientset.Interface, clusterDomain string, configSync con
reverseRecordMap: make(map[string]*skymsg.Service),
clusterIPServiceMap: make(map[string]*v1.Service),
domainPath: util.ReverseArray(strings.Split(strings.TrimRight(clusterDomain, "."), ".")),
initialSyncTimeout: timeout,
configLock: sync.RWMutex{},
configSync: configSync,
@ -149,38 +153,28 @@ func (kd *KubeDNS) Start() {
kd.startConfigMapSync()
// Wait synchronously for the Kubernetes service. This ensures that
// the Start function returns only after having received Service
// objects from APIServer.
//
// TODO: we might not have to wait for kubernetes service
// specifically. We should just wait for a list operation to be
// complete from APIServer.
kd.waitForKubernetesService()
// Wait synchronously for the initial list operations to be
// complete of endpoints and services from APIServer.
kd.waitForResourceSyncedOrDie()
}
func (kd *KubeDNS) waitForKubernetesService() {
glog.V(2).Infof("Waiting for Kubernetes service")
const kubernetesSvcName = "kubernetes"
const servicePollInterval = 1 * time.Second
name := fmt.Sprintf("%v/%v", v1.NamespaceDefault, kubernetesSvcName)
glog.V(2).Infof("Waiting for service: %v", name)
func (kd *KubeDNS) waitForResourceSyncedOrDie() {
// Wait for both controllers have completed an initial resource listing
timeout := time.After(kd.initialSyncTimeout)
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
for {
svc, err := kd.kubeClient.Core().Services(v1.NamespaceDefault).Get(kubernetesSvcName)
if err != nil || svc == nil {
glog.V(3).Infof(
"Ignoring error while waiting for service %v: %v. Sleeping %v before retrying.",
name, err, servicePollInterval)
time.Sleep(servicePollInterval)
continue
select {
case <-timeout:
glog.Fatalf("Timeout waiting for initialization")
case <-ticker.C:
if kd.endpointsController.HasSynced() && kd.serviceController.HasSynced() {
glog.V(0).Infof("Initialized services and endpoints from apiserver")
return
}
glog.V(0).Infof("DNS server not ready, retry in 500 milliseconds")
}
break
}
return
}
func (kd *KubeDNS) startConfigMapSync() {