diff --git a/cluster/addons/dns/kube2sky/Makefile b/cluster/addons/dns/kube2sky/Makefile index 998f6451065..3c70ef3cb76 100644 --- a/cluster/addons/dns/kube2sky/Makefile +++ b/cluster/addons/dns/kube2sky/Makefile @@ -1,10 +1,10 @@ all: kube2sky kube2sky: kube2sky.go - CGO_ENABLED=0 go build -a -installsuffix cgo --ldflags '-w' ./kube2sky.go + CGO_ENABLED=0 godep go build -a -installsuffix cgo --ldflags '-w' ./kube2sky.go container: kube2sky - sudo docker build -t gcr.io/google_containers/kube2sky . + docker build -t gcr.io/google_containers/kube2sky . push: gcloud preview docker push gcr.io/google_containers/kube2sky diff --git a/cluster/addons/dns/kube2sky/kube2sky.go b/cluster/addons/dns/kube2sky/kube2sky.go index e19dca87f47..5e0c7856143 100644 --- a/cluster/addons/dns/kube2sky/kube2sky.go +++ b/cluster/addons/dns/kube2sky/kube2sky.go @@ -23,41 +23,62 @@ import ( "encoding/json" "flag" "fmt" - "log" + "net/url" "os" "time" kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" kclient "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" kclientcmd "github.com/GoogleCloudPlatform/kubernetes/pkg/client/clientcmd" kclientcmdapi "github.com/GoogleCloudPlatform/kubernetes/pkg/client/clientcmd/api" - kfields "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" - klabels "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework" + kcontrollerFramework "github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework" + kSelector "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" tools "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait" - kwatch "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" etcd "github.com/coreos/go-etcd/etcd" + "github.com/golang/glog" skymsg "github.com/skynetservices/skydns/msg" ) var ( - 
domain = flag.String("domain", "kubernetes.local", "domain under which to create names") - etcd_mutation_timeout = flag.Duration("etcd_mutation_timeout", 10*time.Second, "crash after retrying etcd mutation for a specified duration") - etcd_server = flag.String("etcd-server", "http://127.0.0.1:4001", "URL to etcd server") - verbose = flag.Bool("verbose", false, "log extra information") - kubecfg_file = flag.String("kubecfg_file", "", "Location of kubecfg file for access to kubernetes service") + argDomain = flag.String("domain", "kubernetes.local", "domain under which to create names") + argEtcdMutationTimeout = flag.Duration("etcd_mutation_timeout", 10*time.Second, "crash after retrying etcd mutation for a specified duration") + argEtcdServer = flag.String("etcd-server", "http://127.0.0.1:4001", "URL to etcd server") + argKubecfgFile = flag.String("kubecfg_file", "", "Location of kubecfg file for access to kubernetes service") + argKubeMasterUrl = flag.String("kube_master_url", "http://${KUBERNETES_SERVICE_HOST}:${KUBERNETES_SERVICE_PORT}", "Url to reach kubernetes master. Env variables in this flag will be expanded.") ) -func removeDNS(record string, etcdClient *etcd.Client) error { - log.Printf("Removing %s from DNS", record) - _, err := etcdClient.Delete(skymsg.Path(record), true) +const ( + // Maximum number of retries to connect to etcd server. + maxConnectRetries = 12 + // Resync period for the kube controller loop. + resyncPeriod = 5 * time.Second +) + +type kube2sky struct { + // Etcd client. + etcdClient *etcd.Client + // Kubernetes client. + kubeClient *kclient.Client + // DNS domain name. + domain string + // Etcd mutation timeout. 
+ etcdMutationTimeout time.Duration +} + +func (ks *kube2sky) removeDNS(record string) error { + glog.V(2).Infof("Removing %s from DNS", record) + _, err := ks.etcdClient.Delete(skymsg.Path(record), true) return err } -func addDNS(record string, service *kapi.Service, etcdClient *etcd.Client) error { +func (ks *kube2sky) addDNS(record string, service *kapi.Service) error { // if PortalIP is not set, a DNS entry should not be created if !kapi.IsServiceIPSet(service) { - log.Printf("Skipping dns record for headless service: %s\n", service.Name) + glog.V(1).Infof("Skipping dns record for headless service: %s\n", service.Name) return nil } @@ -75,8 +96,8 @@ func addDNS(record string, service *kapi.Service, etcdClient *etcd.Client) error } // Set with no TTL, and hope that kubernetes events are accurate. - log.Printf("Setting DNS record: %v -> %s:%d\n", record, service.Spec.PortalIP, service.Spec.Ports[i].Port) - _, err = etcdClient.Set(skymsg.Path(record), string(b), uint64(0)) + glog.V(2).Infof("Setting DNS record: %v -> %s:%d\n", record, service.Spec.PortalIP, service.Spec.Ports[i].Port) + _, err = ks.etcdClient.Set(skymsg.Path(record), string(b), uint64(0)) if err != nil { return err } @@ -86,16 +107,16 @@ func addDNS(record string, service *kapi.Service, etcdClient *etcd.Client) error // Implements retry logic for arbitrary mutator. Crashes after retrying for // etcd_mutation_timeout. -func mutateEtcdOrDie(mutator func() error) { - timeout := time.After(*etcd_mutation_timeout) +func (ks *kube2sky) mutateEtcdOrDie(mutator func() error) { + timeout := time.After(ks.etcdMutationTimeout) for { select { case <-timeout: - log.Fatalf("Failed to mutate etcd for %v using mutator: %v", *etcd_mutation_timeout, mutator) + glog.Fatalf("Failed to mutate etcd for %v using mutator: %v", ks.etcdMutationTimeout, mutator) default: if err := mutator(); err != nil { delay := 50 * time.Millisecond - log.Printf("Failed to mutate etcd using mutator: %v due to: %v. 
Will retry in: %v", mutator, err, delay) + glog.V(1).Infof("Failed to mutate etcd using mutator: %v due to: %v. Will retry in: %v", mutator, err, delay) + time.Sleep(delay) + } else { + return @@ -104,28 +125,33 @@ func mutateEtcdOrDie(mutator func() error) { } } -func newEtcdClient() (client *etcd.Client) { - maxConnectRetries := 12 - for maxConnectRetries > 0 { - if _, err := tools.GetEtcdVersion(*etcd_server); err != nil { - log.Fatalf("Failed to connect to etcd server: %v, error: %v", *etcd_server, err) - if maxConnectRetries > 0 { - log.Println("Retrying request after 5 second sleep.") - time.Sleep(5 * time.Second) - maxConnectRetries-- - } else { - return nil - } - } else { - log.Printf("Etcd server found: %v", *etcd_server) +func newEtcdClient(etcdServer string) (*etcd.Client, error) { + var ( + client *etcd.Client + err error + ) + retries := maxConnectRetries + for retries > 0 { + if _, err = tools.GetEtcdVersion(etcdServer); err == nil { break } + if retries == 1 { + break + } + glog.Infof("[Attempt: %d] Retrying request after 5 second sleep", retries) + time.Sleep(5 * time.Second) + retries-- } + if err != nil { + return nil, fmt.Errorf("failed to connect to etcd server: %v, error: %v", etcdServer, err) + } + glog.Infof("Etcd server found: %v", etcdServer) + // loop until we have > 0 machines && machines[0] != "" poll, timeout := 1*time.Second, 10*time.Second if err := wait.Poll(poll, timeout, func() (bool, error) { - if client = etcd.NewClient([]string{*etcd_server}); client == nil { - log.Fatal("etcd.NewClient returned nil") + if client = etcd.NewClient([]string{etcdServer}); client == nil { + return false, fmt.Errorf("etcd.NewClient returned nil") } client.SyncCluster() machines := client.GetCluster() @@ -134,195 +160,105 @@ func newEtcdClient() (client *etcd.Client) { } return true, nil }); err != nil { - log.Fatalf("Timed out 
after %s waiting for at least 1 synchronized etcd server in the cluster. Error: %v", timeout, err) } - return client + return client, nil +} + +func getKubeMasterUrl() (string, error) { + if *argKubeMasterUrl == "" { + return "", fmt.Errorf("no --kube_master_url specified") + } + parsedUrl, err := url.Parse(os.ExpandEnv(*argKubeMasterUrl)) + if err != nil { + return "", fmt.Errorf("failed to parse --kube_master_url %s - %v", *argKubeMasterUrl, err) + } + if parsedUrl.Scheme == "" || parsedUrl.Host == "" || parsedUrl.Host == ":" { + return "", fmt.Errorf("invalid --kube_master_url specified %s", *argKubeMasterUrl) + } + return parsedUrl.String(), nil } // TODO: evaluate using pkg/client/clientcmd func newKubeClient() (*kclient.Client, error) { var config *kclient.Config - if *kubecfg_file == "" { - // No kubecfg file provided. Use kubernetes_ro service. - masterHost := os.Getenv("KUBERNETES_RO_SERVICE_HOST") - if masterHost == "" { - log.Fatalf("KUBERNETES_RO_SERVICE_HOST is not defined") - } - masterPort := os.Getenv("KUBERNETES_RO_SERVICE_PORT") - if masterPort == "" { - log.Fatalf("KUBERNETES_RO_SERVICE_PORT is not defined") - } + masterUrl, err := getKubeMasterUrl() + if err != nil { + return nil, err + } + if *argKubecfgFile == "" { config = &kclient.Config{ - Host: fmt.Sprintf("http://%s:%s", masterHost, masterPort), - Version: "v1beta1", + Host: masterUrl, + Version: "v1beta3", } } else { - masterHost := os.Getenv("KUBERNETES_SERVICE_HOST") - if masterHost == "" { - log.Fatalf("KUBERNETES_SERVICE_HOST is not defined") - } - masterPort := os.Getenv("KUBERNETES_SERVICE_PORT") - if masterPort == "" { - log.Fatalf("KUBERNETES_SERVICE_PORT is not defined") - } - master := fmt.Sprintf("https://%s:%s", masterHost, masterPort) var err error if config, err = kclientcmd.NewNonInteractiveDeferredLoadingClientConfig( - &kclientcmd.ClientConfigLoadingRules{ExplicitPath: *kubecfg_file}, - &kclientcmd.ConfigOverrides{ClusterInfo: kclientcmdapi.Cluster{Server: 
master}}).ClientConfig(); err != nil { + &kclientcmd.ClientConfigLoadingRules{ExplicitPath: *argKubecfgFile}, + &kclientcmd.ConfigOverrides{ClusterInfo: kclientcmdapi.Cluster{Server: masterUrl}}).ClientConfig(); err != nil { return nil, err } } - log.Printf("Using %s for kubernetes master", config.Host) - log.Printf("Using kubernetes API %s", config.Version) + glog.Infof("Using %s for kubernetes master", config.Host) + glog.Infof("Using kubernetes API %s", config.Version) return kclient.New(config) } -func buildNameString(service, namespace, domain string) string { +func (ks *kube2sky) buildNameString(service, namespace, domain string) string { return fmt.Sprintf("%s.%s.%s.", service, namespace, domain) } -func watchOnce(etcdClient *etcd.Client, kubeClient *kclient.Client) { - // Start the goroutine to produce update events. - updates := make(chan serviceUpdate) - startWatching(kubeClient.Services(kapi.NamespaceAll), updates) +// Returns a cache.ListWatch that gets all changes to services. +func (ks *kube2sky) createServiceLW() *cache.ListWatch { + return cache.NewListWatchFromClient(ks.kubeClient, "services", kapi.NamespaceAll, kSelector.Everything()) +} - // This loop will break if the channel closes, which is how the - // goroutine signals an error. 
- for ev := range updates { - if *verbose { - log.Printf("Received update event: %#v", ev) - } - switch ev.Op { - case SetServices, AddService: - for i := range ev.Services { - s := &ev.Services[i] - name := buildNameString(s.Name, s.Namespace, *domain) - mutateEtcdOrDie(func() error { return addDNS(name, s, etcdClient) }) - } - case RemoveService: - for i := range ev.Services { - s := &ev.Services[i] - name := buildNameString(s.Name, s.Namespace, *domain) - mutateEtcdOrDie(func() error { return removeDNS(name, etcdClient) }) - } - } +func (ks *kube2sky) newService(obj interface{}) { + if s, ok := obj.(*kapi.Service); ok { + name := ks.buildNameString(s.Name, s.Namespace, ks.domain) + ks.mutateEtcdOrDie(func() error { return ks.addDNS(name, s) }) } - //TODO: fully resync periodically. +} + +func (ks *kube2sky) removeService(obj interface{}) { + if s, ok := obj.(*kapi.Service); ok { + name := ks.buildNameString(s.Name, s.Namespace, ks.domain) + ks.mutateEtcdOrDie(func() error { return ks.removeDNS(name) }) + } +} + +func (ks *kube2sky) watchForServices() { + var serviceController *kcontrollerFramework.Controller + _, serviceController = framework.NewInformer( + ks.createServiceLW(), + &kapi.Service{}, + resyncPeriod, + framework.ResourceEventHandlerFuncs{ + AddFunc: ks.newService, + DeleteFunc: ks.removeService, + UpdateFunc: func(oldObj, newObj interface{}) { + ks.newService(newObj) + }, + }, + ) + serviceController.Run(util.NeverStop) } func main() { flag.Parse() - - etcdClient := newEtcdClient() - if etcdClient == nil { - log.Fatal("Failed to create etcd client") + var err error + // TODO: Validate input flags. 
+ ks := kube2sky{ + domain: *argDomain, + etcdMutationTimeout: *argEtcdMutationTimeout, + } + if ks.etcdClient, err = newEtcdClient(*argEtcdServer); err != nil { + glog.Fatalf("Failed to create etcd client - %v", err) } - kubeClient, err := newKubeClient() - if err != nil { - log.Fatalf("Failed to create a kubernetes client: %v", err) + if ks.kubeClient, err = newKubeClient(); err != nil { + glog.Fatalf("Failed to create a kubernetes client: %v", err) } - // In case of error, the watch will be aborted. At that point we just - // retry. - for { - watchOnce(etcdClient, kubeClient) - } -} - -//FIXME: make the below part of the k8s client lib? - -// servicesWatcher is capable of listing and watching for changes to services -// across ALL namespaces -type servicesWatcher interface { - List(label klabels.Selector) (*kapi.ServiceList, error) - Watch(label klabels.Selector, field kfields.Selector, resourceVersion string) (kwatch.Interface, error) -} - -type operation int - -// These are the available operation types. -const ( - SetServices operation = iota - AddService - RemoveService -) - -// serviceUpdate describes an operation of services, sent on the channel. -// -// You can add or remove a single service by sending an array of size one with -// Op == AddService|RemoveService. For setting the state of the system to a given state, just -// set Services as desired and Op to SetServices, which will reset the system -// state to that specified in this operation for this source channel. To remove -// all services, set Services to empty array and Op to SetServices -type serviceUpdate struct { - Services []kapi.Service - Op operation -} - -// startWatching launches a goroutine that watches for changes to services. -func startWatching(watcher servicesWatcher, updates chan<- serviceUpdate) { - serviceVersion := "" - go watchLoop(watcher, updates, &serviceVersion) -} - -// watchLoop loops forever looking for changes to services. 
If an error occurs -// it will close the channel and return. -func watchLoop(svcWatcher servicesWatcher, updates chan<- serviceUpdate, resourceVersion *string) { - defer close(updates) - - if len(*resourceVersion) == 0 { - services, err := svcWatcher.List(klabels.Everything()) - if err != nil { - log.Printf("Failed to load services: %v", err) - return - } - *resourceVersion = services.ResourceVersion - updates <- serviceUpdate{Op: SetServices, Services: services.Items} - } - - watcher, err := svcWatcher.Watch(klabels.Everything(), kfields.Everything(), *resourceVersion) - if err != nil { - log.Printf("Failed to watch for service changes: %v", err) - return - } - defer watcher.Stop() - - ch := watcher.ResultChan() - for { - select { - case event, ok := <-ch: - if !ok { - log.Printf("watchLoop channel closed") - return - } - - if event.Type == kwatch.Error { - if status, ok := event.Object.(*kapi.Status); ok { - log.Printf("Error during watch: %#v", status) - return - } - log.Fatalf("Received unexpected error: %#v", event.Object) - } - - if service, ok := event.Object.(*kapi.Service); ok { - sendUpdate(updates, event, service, resourceVersion) - continue - } - } - } -} - -func sendUpdate(updates chan<- serviceUpdate, event kwatch.Event, service *kapi.Service, resourceVersion *string) { - *resourceVersion = service.ResourceVersion - - switch event.Type { - case kwatch.Added, kwatch.Modified: - updates <- serviceUpdate{Op: AddService, Services: []kapi.Service{*service}} - case kwatch.Deleted: - updates <- serviceUpdate{Op: RemoveService, Services: []kapi.Service{*service}} - default: - log.Fatalf("Unknown event.Type: %v", event.Type) - } + ks.watchForServices() }