From 3ee2b76554e6544bc69f4ae05c917b97df89ce86 Mon Sep 17 00:00:00 2001 From: "Madhusudan.C.S" Date: Wed, 18 May 2016 00:04:10 -0700 Subject: [PATCH] Switch kube-dns to use external versioned API instead of the internal version. --- cmd/kube-dns/app/server.go | 6 ++--- pkg/dns/dns.go | 48 ++++++++++++++++++++++++-------------- 2 files changed, 34 insertions(+), 20 deletions(-) diff --git a/cmd/kube-dns/app/server.go b/cmd/kube-dns/app/server.go index 60dddba5889..9cfcb49b5c8 100644 --- a/cmd/kube-dns/app/server.go +++ b/cmd/kube-dns/app/server.go @@ -28,8 +28,8 @@ import ( "github.com/skynetservices/skydns/server" "k8s.io/kubernetes/cmd/kube-dns/app/options" "k8s.io/kubernetes/pkg/api/unversioned" + clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_3" "k8s.io/kubernetes/pkg/client/restclient" - kclient "k8s.io/kubernetes/pkg/client/unversioned" kclientcmd "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" kdns "k8s.io/kubernetes/pkg/dns" ) @@ -56,7 +56,7 @@ func NewKubeDNSServerDefault(config *options.KubeDNSConfig) *KubeDNSServer { } // TODO: evaluate using pkg/client/clientcmd -func newKubeClient(dnsConfig *options.KubeDNSConfig) (*kclient.Client, error) { +func newKubeClient(dnsConfig *options.KubeDNSConfig) (clientset.Interface, error) { var ( config *restclient.Config err error @@ -85,7 +85,7 @@ func newKubeClient(dnsConfig *options.KubeDNSConfig) (*kclient.Client, error) { glog.Infof("Using %s for kubernetes master", config.Host) glog.Infof("Using kubernetes API %v", config.GroupVersion) - return kclient.New(config) + return clientset.NewForConfig(config) } func (server *KubeDNSServer) Run() { diff --git a/pkg/dns/dns.go b/pkg/dns/dns.go index 048ed4273b0..9f4d1fef637 100644 --- a/pkg/dns/dns.go +++ b/pkg/dns/dns.go @@ -30,13 +30,15 @@ import ( skymsg "github.com/skynetservices/skydns/msg" kapi "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/endpoints" - kunversioned "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/api/unversioned" + v1 "k8s.io/kubernetes/pkg/api/v1" kcache "k8s.io/kubernetes/pkg/client/cache" - kclient "k8s.io/kubernetes/pkg/client/unversioned" + clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_3" kframework "k8s.io/kubernetes/pkg/controller/framework" - kselector "k8s.io/kubernetes/pkg/fields" + "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util/validation" "k8s.io/kubernetes/pkg/util/wait" + "k8s.io/kubernetes/pkg/watch" ) const ( @@ -62,7 +64,7 @@ const ( type KubeDNS struct { // kubeClient makes calls to API Server and registers calls with API Server // to get Endpoints and Service objects. - kubeClient *kclient.Client + kubeClient clientset.Interface // The domain for which this DNS Server is authoritative. domain string @@ -102,7 +104,7 @@ type KubeDNS struct { nodesStore kcache.Store } -func NewKubeDNS(client *kclient.Client, domain string, federations map[string]string) *KubeDNS { +func NewKubeDNS(client clientset.Interface, domain string, federations map[string]string) *KubeDNS { kd := &KubeDNS{ kubeClient: client, domain: domain, @@ -127,13 +129,13 @@ func (kd *KubeDNS) Start() { kd.waitForKubernetesService() } -func (kd *KubeDNS) waitForKubernetesService() (svc *kapi.Service) { +func (kd *KubeDNS) waitForKubernetesService() (svc *v1.Service) { name := fmt.Sprintf("%v/%v", kapi.NamespaceDefault, kubernetesSvcName) glog.Infof("Waiting for service: %v", name) var err error servicePollInterval := 1 * time.Second for { - svc, err = kd.kubeClient.Services(kapi.NamespaceDefault).Get(kubernetesSvcName) + svc, err = kd.kubeClient.Core().Services(kapi.NamespaceDefault).Get(kubernetesSvcName) if err != nil || svc == nil { glog.Infof("Ignoring error while waiting for service %v: %v. Sleeping %v before retrying.", name, err, servicePollInterval) time.Sleep(servicePollInterval) @@ -153,10 +155,16 @@ func (kd *KubeDNS) GetCacheAsJSON() (string, error) { func (kd *KubeDNS) setServicesStore() { // Returns a cache.ListWatch that gets all changes to services. - serviceWatch := kcache.NewListWatchFromClient(kd.kubeClient, "services", kapi.NamespaceAll, kselector.Everything()) kd.servicesStore, kd.serviceController = kframework.NewInformer( - serviceWatch, - &kapi.Service{}, + &kcache.ListWatch{ + ListFunc: func(options kapi.ListOptions) (runtime.Object, error) { + return kd.kubeClient.Core().Services(v1.NamespaceAll).List(options) + }, + WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) { + return kd.kubeClient.Core().Services(v1.NamespaceAll).Watch(options) + }, + }, + &v1.Service{}, resyncPeriod, kframework.ResourceEventHandlerFuncs{ AddFunc: kd.newService, @@ -168,10 +176,16 @@ func (kd *KubeDNS) setServicesStore() { func (kd *KubeDNS) setEndpointsStore() { // Returns a cache.ListWatch that gets all changes to endpoints. - endpointsWatch := kcache.NewListWatchFromClient(kd.kubeClient, "endpoints", kapi.NamespaceAll, kselector.Everything()) kd.endpointsStore, kd.endpointsController = kframework.NewInformer( - endpointsWatch, - &kapi.Endpoints{}, + &kcache.ListWatch{ + ListFunc: func(options kapi.ListOptions) (runtime.Object, error) { + return kd.kubeClient.Core().Endpoints(v1.NamespaceAll).List(options) + }, + WatchFunc: func(options kapi.ListOptions) (watch.Interface, error) { + return kd.kubeClient.Core().Endpoints(v1.NamespaceAll).Watch(options) + }, + }, + &v1.Endpoints{}, resyncPeriod, kframework.ResourceEventHandlerFuncs{ AddFunc: kd.handleEndpointAdd, @@ -546,12 +560,12 @@ func (kd *KubeDNS) federationRecords(queryPath []string) ([]skymsg.Service, erro // simpler approach here. // Also note that zone here means the zone in cloud provider terminology, not the DNS zone. func (kd *KubeDNS) getClusterZone() (string, error) { - var node *kapi.Node + var node *v1.Node objs := kd.nodesStore.List() if len(objs) > 0 { var ok bool - if node, ok = objs[0].(*kapi.Node); !ok { + if node, ok = objs[0].(*v1.Node); !ok { return "", fmt.Errorf("expected node object, got: %T", objs[0]) } } else { @@ -559,7 +573,7 @@ func (kd *KubeDNS) getClusterZone() (string, error) { // wasteful in case of non-federated independent Kubernetes clusters. So carefully // proceeding here. // TODO(madhusudancs): Move this to external/v1 API. - nodeList, err := kd.kubeClient.Nodes().List(kapi.ListOptions{}) + nodeList, err := kd.kubeClient.Core().Nodes().List(kapi.ListOptions{}) if err != nil || len(nodeList.Items) == 0 { return "", fmt.Errorf("failed to retrieve the cluster nodes: %v", err) } @@ -570,7 +584,7 @@ func (kd *KubeDNS) getClusterZone() (string, error) { } } - zone, ok := node.Annotations[kunversioned.LabelZoneFailureDomain] + zone, ok := node.Annotations[unversioned.LabelZoneFailureDomain] if !ok || zone == "" { return "", fmt.Errorf("unknown cluster zone") }