From f0c20e17c58734846d62f5b03556e0cc82d54916 Mon Sep 17 00:00:00 2001 From: Vishnu Kannan Date: Mon, 18 May 2015 10:03:30 -0700 Subject: [PATCH 1/3] Adding support for generating A records for headless services. --- cluster/addons/dns/kube2sky/Makefile | 2 +- cluster/addons/dns/kube2sky/kube2sky.go | 286 +++++++++++++++---- cluster/addons/dns/kube2sky/kube2sky_test.go | 188 +++++++++++- 3 files changed, 408 insertions(+), 68 deletions(-) diff --git a/cluster/addons/dns/kube2sky/Makefile b/cluster/addons/dns/kube2sky/Makefile index 15853c9038d..fa1c8b71256 100644 --- a/cluster/addons/dns/kube2sky/Makefile +++ b/cluster/addons/dns/kube2sky/Makefile @@ -10,7 +10,7 @@ PREFIX = gcr.io/google_containers all: container kube2sky: kube2sky.go - CGO_ENABLED=0 godep go build -a -installsuffix cgo --ldflags '-w' ./kube2sky.go + GOOS=linux GOARCH=amd64 CGO_ENABLED=0 godep go build -a -installsuffix cgo --ldflags '-w' ./kube2sky.go container: kube2sky docker build -t $(PREFIX)/kube2sky:$(TAG) . diff --git a/cluster/addons/dns/kube2sky/kube2sky.go b/cluster/addons/dns/kube2sky/kube2sky.go index 913e1a34612..f574174dd00 100644 --- a/cluster/addons/dns/kube2sky/kube2sky.go +++ b/cluster/addons/dns/kube2sky/kube2sky.go @@ -23,17 +23,19 @@ import ( "encoding/json" "flag" "fmt" + "net/http" "net/url" "os" + "strings" + "sync" "time" kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" kclient "github.com/GoogleCloudPlatform/kubernetes/pkg/client" - "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + kcache "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" kclientcmd "github.com/GoogleCloudPlatform/kubernetes/pkg/client/clientcmd" kclientcmdapi "github.com/GoogleCloudPlatform/kubernetes/pkg/client/clientcmd/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework" - kcontrollerFramework "github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework" + kframework "github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework" 
kSelector "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" tools "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -56,13 +58,21 @@ const ( maxConnectAttempts = 12 // Resync period for the kube controller loop. resyncPeriod = 5 * time.Second + // A subdomain added to the user specified domain for all services. + serviceSubdomain = "svc" ) type etcdClient interface { Set(path, value string, ttl uint64) (*etcd.Response, error) + RawGet(key string, sort, recursive bool) (*etcd.RawResponse, error) Delete(path string, recursive bool) (*etcd.Response, error) } +type nameNamespace struct { + name string + namespace string +} + type kube2sky struct { // Etcd client. etcdClient etcdClient @@ -70,43 +80,162 @@ type kube2sky struct { domain string // Etcd mutation timeout. etcdMutationTimeout time.Duration + // A cache that contains all the endpoints in the system. + endpointsStore kcache.Store + // A cache that contains all the services in the system. + servicesStore kcache.Store + // Lock for controlling access to headless services. + mlock sync.Mutex } -func (ks *kube2sky) removeDNS(record string) error { - glog.V(2).Infof("Removing %s from DNS", record) - _, err := ks.etcdClient.Delete(skymsg.Path(record), true) +// Removes 'subdomain' from etcd. 
+func (ks *kube2sky) removeDNS(subdomain string) error { + glog.V(2).Infof("Removing %s from DNS", subdomain) + resp, err := ks.etcdClient.RawGet(skymsg.Path(subdomain), false, false) + if err != nil { + return err + } + if resp.StatusCode == http.StatusNotFound { + glog.V(2).Infof("Subdomain %q does not exist in etcd", subdomain) + return nil + } + _, err = ks.etcdClient.Delete(skymsg.Path(subdomain), true) return err } -func (ks *kube2sky) addDNS(record string, service *kapi.Service) error { - // if PortalIP is not set, a DNS entry should not be created - if !kapi.IsServiceIPSet(service) { - glog.V(1).Infof("Skipping dns record for headless service: %s\n", service.Name) +func (ks *kube2sky) writeSkyRecord(subdomain string, data string) error { + // Set with no TTL, and hope that kubernetes events are accurate. + _, err := ks.etcdClient.Set(skymsg.Path(subdomain), data, uint64(0)) + return err +} + +// Generates skydns records for a headless service. +func (ks *kube2sky) newHeadlessService(subdomain string, service *kapi.Service) error { + // Create an A record for every pod in the service. + // This record must be periodically updated. + // Format is as follows: + // For a service x, with pods a and b create DNS records, + // a.x.ns.domain. and, b.x.ns.domain. + // TODO: Handle multi-port services. + ks.mlock.Lock() + defer ks.mlock.Unlock() + key, err := kcache.MetaNamespaceKeyFunc(service) + if err != nil { + return err + } + e, exists, err := ks.endpointsStore.GetByKey(key) + if err != nil { + return fmt.Errorf("failed to get endpoints object from endpoints store - %v", err) + } + if !exists { + glog.V(1).Infof("could not find endpoints for service %q in namespace %q. 
DNS records will be created once endpoints show up.", service.Name, service.Namespace) return nil } + if e, ok := e.(*kapi.Endpoints); ok { + return ks.generateRecordsForHeadlessService(subdomain, e, service) + } + return nil +} - for i := range service.Spec.Ports { - svc := skymsg.Service{ - Host: service.Spec.PortalIP, - Port: service.Spec.Ports[i].Port, - Priority: 10, - Weight: 10, - Ttl: 30, +func getSkyMsg(ip string, port int) *skymsg.Service { + return &skymsg.Service{ + Host: ip, + Port: port, + Priority: 10, + Weight: 10, + Ttl: 30, + } +} + +func (ks *kube2sky) generateRecordsForHeadlessService(subdomain string, e *kapi.Endpoints, svc *kapi.Service) error { + for idx := range e.Subsets { + for subIdx := range e.Subsets[idx].Addresses { + subdomain := buildDNSNameString(subdomain, fmt.Sprintf("%d%d", idx, subIdx)) + b, err := json.Marshal(getSkyMsg(e.Subsets[idx].Addresses[subIdx].IP, svc.Spec.Ports[0].Port)) + if err != nil { + return err + } + glog.V(2).Infof("Setting DNS record: %v -> %q\n", subdomain, string(b)) + if err := ks.writeSkyRecord(subdomain, string(b)); err != nil { + return err + } } - b, err := json.Marshal(svc) + } + + return nil +} + +func (ks *kube2sky) getServiceFromEndpoints(e *kapi.Endpoints) (*kapi.Service, error) { + key, err := kcache.MetaNamespaceKeyFunc(e) + if err != nil { + return nil, err + } + obj, exists, err := ks.servicesStore.GetByKey(key) + if err != nil { + return nil, fmt.Errorf("failed to get service object from services store - %v", err) + } + if !exists { + glog.V(1).Infof("could not find service for endpoint %q in namespace %q", e.Name, e.Namespace) + return nil, nil + } + if svc, ok := obj.(*kapi.Service); ok { + return svc, nil + } + return nil, fmt.Errorf("got a non service object in services store %v", obj) +} + +func (ks *kube2sky) addDNSUsingEndpoints(subdomain string, e *kapi.Endpoints) error { + ks.mlock.Lock() + defer ks.mlock.Unlock() + svc, err := ks.getServiceFromEndpoints(e) + if err != nil { + return 
err + } + if svc == nil || kapi.IsServiceIPSet(svc) { + // No headless service found corresponding to endpoints object. + return nil + } + // Remove existing DNS entry. + if err := ks.removeDNS(subdomain); err != nil { + return err + } + return ks.generateRecordsForHeadlessService(subdomain, e, svc) +} + +func (ks *kube2sky) handleEndpointAdd(obj interface{}) { + if e, ok := obj.(*kapi.Endpoints); ok { + name := buildDNSNameString(ks.domain, e.Namespace, e.Name) + ks.mutateEtcdOrDie(func() error { return ks.addDNSUsingEndpoints(name, e) }) + name = buildDNSNameString(ks.domain, serviceSubdomain, e.Namespace, e.Name) + ks.mutateEtcdOrDie(func() error { return ks.addDNSUsingEndpoints(name, e) }) + } +} + +func (ks *kube2sky) generateRecordsForPortalService(subdomain string, service *kapi.Service) error { + for i := range service.Spec.Ports { + b, err := json.Marshal(getSkyMsg(service.Spec.PortalIP, service.Spec.Ports[i].Port)) if err != nil { return err } - // Set with no TTL, and hope that kubernetes events are accurate. - glog.V(2).Infof("Setting DNS record: %v -> %s:%d\n", record, service.Spec.PortalIP, service.Spec.Ports[i].Port) - _, err = ks.etcdClient.Set(skymsg.Path(record), string(b), uint64(0)) - if err != nil { + glog.V(2).Infof("Setting DNS record: %v -> %q\n", subdomain, string(b)) + if err := ks.writeSkyRecord(subdomain, string(b)); err != nil { return err } } return nil } +func (ks *kube2sky) addDNS(subdomain string, service *kapi.Service) error { + if len(service.Spec.Ports) == 0 { + glog.Fatalf("unexpected service with no ports: %v", service) + } + // if PortalIP is not set, a DNS entry should not be created + if !kapi.IsServiceIPSet(service) { + return ks.newHeadlessService(subdomain, service) + } + return ks.generateRecordsForPortalService(subdomain, service) +} + // Implements retry logic for arbitrary mutator. Crashes after retrying for // etcd_mutation_timeout. 
func (ks *kube2sky) mutateEtcdOrDie(mutator func() error) { @@ -127,6 +256,47 @@ func (ks *kube2sky) mutateEtcdOrDie(mutator func() error) { } } +func buildDNSNameString(labels ...string) string { + var res string + for _, label := range labels { + if res == "" { + res = label + } else { + res = fmt.Sprintf("%s.%s", label, res) + } + } + return res +} + +// Returns a cache.ListWatch that gets all changes to services. +func createServiceLW(kubeClient *kclient.Client) *kcache.ListWatch { + return kcache.NewListWatchFromClient(kubeClient, "services", kapi.NamespaceAll, kSelector.Everything()) +} + +// Returns a cache.ListWatch that gets all changes to endpoints. +func createEndpointsLW(kubeClient *kclient.Client) *kcache.ListWatch { + return kcache.NewListWatchFromClient(kubeClient, "endpoints", kapi.NamespaceAll, kSelector.Everything()) +} + +func (ks *kube2sky) newService(obj interface{}) { + if s, ok := obj.(*kapi.Service); ok { + //TODO(artfulcoder) stop adding and deleting old-format string for service + name := buildDNSNameString(ks.domain, s.Namespace, s.Name) + ks.mutateEtcdOrDie(func() error { return ks.addDNS(name, s) }) + name = buildDNSNameString(ks.domain, serviceSubdomain, s.Namespace, s.Name) + ks.mutateEtcdOrDie(func() error { return ks.addDNS(name, s) }) + } +} + +func (ks *kube2sky) removeService(obj interface{}) { + if s, ok := obj.(*kapi.Service); ok { + name := buildDNSNameString(ks.domain, s.Namespace, s.Name) + ks.mutateEtcdOrDie(func() error { return ks.removeDNS(name) }) + name = buildDNSNameString(ks.domain, serviceSubdomain, s.Namespace, s.Name) + ks.mutateEtcdOrDie(func() error { return ks.removeDNS(name) }) + } +} + func newEtcdClient(etcdServer string) (*etcd.Client, error) { var ( client *etcd.Client @@ -204,61 +374,52 @@ func newKubeClient() (*kclient.Client, error) { return kclient.New(config) } -func buildOldNameString(service, namespace, domain string) string { - return fmt.Sprintf("%s.%s.%s.", service, namespace, domain) -} - -func 
buildNewServiceNameString(service, namespace, domain string) string { - return fmt.Sprintf("%s.%s.svc.%s.", service, namespace, domain) -} - -// Returns a cache.ListWatch that gets all changes to services. -func createServiceLW(kubeClient *kclient.Client) *cache.ListWatch { - return cache.NewListWatchFromClient(kubeClient, "services", kapi.NamespaceAll, kSelector.Everything()) -} - -func (ks *kube2sky) newService(obj interface{}) { - if s, ok := obj.(*kapi.Service); ok { - //TODO(artfulcoder) stop adding and deleting old-format string for service - name := buildOldNameString(s.Name, s.Namespace, ks.domain) - ks.mutateEtcdOrDie(func() error { return ks.addDNS(name, s) }) - name1 := buildNewServiceNameString(s.Name, s.Namespace, ks.domain) - ks.mutateEtcdOrDie(func() error { return ks.addDNS(name1, s) }) - } -} - -func (ks *kube2sky) removeService(obj interface{}) { - if s, ok := obj.(*kapi.Service); ok { - name := buildOldNameString(s.Name, s.Namespace, ks.domain) - ks.mutateEtcdOrDie(func() error { return ks.removeDNS(name) }) - name1 := buildNewServiceNameString(s.Name, s.Namespace, ks.domain) - ks.mutateEtcdOrDie(func() error { return ks.removeDNS(name1) }) - } -} - -func watchForServices(kubeClient *kclient.Client, ks *kube2sky) { - var serviceController *kcontrollerFramework.Controller - _, serviceController = framework.NewInformer( +func watchForServices(kubeClient *kclient.Client, ks *kube2sky) kcache.Store { + serviceStore, serviceController := kframework.NewInformer( createServiceLW(kubeClient), &kapi.Service{}, resyncPeriod, - framework.ResourceEventHandlerFuncs{ + kframework.ResourceEventHandlerFuncs{ AddFunc: ks.newService, DeleteFunc: ks.removeService, UpdateFunc: func(oldObj, newObj interface{}) { + // TODO: Avoid unwanted updates. 
ks.newService(newObj) }, }, ) - serviceController.Run(util.NeverStop) + go serviceController.Run(util.NeverStop) + return serviceStore +} + +func watchEndpoints(kubeClient *kclient.Client, ks *kube2sky) kcache.Store { + eStore, eController := kframework.NewInformer( + createEndpointsLW(kubeClient), + &kapi.Endpoints{}, + resyncPeriod, + kframework.ResourceEventHandlerFuncs{ + AddFunc: ks.handleEndpointAdd, + UpdateFunc: func(oldObj, newObj interface{}) { + // TODO: Avoid unwanted updates. + ks.handleEndpointAdd(newObj) + }, + }, + ) + + go eController.Run(util.NeverStop) + return eStore } func main() { flag.Parse() var err error // TODO: Validate input flags. + domain := *argDomain + if !strings.HasSuffix(domain, ".") { + domain = fmt.Sprintf("%s.", domain) + } ks := kube2sky{ - domain: *argDomain, + domain: domain, etcdMutationTimeout: *argEtcdMutationTimeout, } if ks.etcdClient, err = newEtcdClient(*argEtcdServer); err != nil { @@ -270,5 +431,8 @@ func main() { glog.Fatalf("Failed to create a kubernetes client: %v", err) } - watchForServices(kubeClient, &ks) + ks.endpointsStore = watchEndpoints(kubeClient, &ks) + ks.servicesStore = watchForServices(kubeClient, &ks) + + select {} } diff --git a/cluster/addons/dns/kube2sky/kube2sky_test.go b/cluster/addons/dns/kube2sky/kube2sky_test.go index 5098ec88482..324a634f1a8 100644 --- a/cluster/addons/dns/kube2sky/kube2sky_test.go +++ b/cluster/addons/dns/kube2sky/kube2sky_test.go @@ -18,12 +18,14 @@ package main import ( "encoding/json" + "net/http" "path" "strings" "testing" "time" kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" "github.com/coreos/go-etcd/etcd" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -34,22 +36,35 @@ type fakeEtcdClient struct { writes map[string]string } -func (ec *fakeEtcdClient) Set(path, value string, ttl uint64) (*etcd.Response, error) { - ec.writes[path] = value +func (ec *fakeEtcdClient) 
Set(key, value string, ttl uint64) (*etcd.Response, error) { + ec.writes[key] = value return nil, nil } -func (ec *fakeEtcdClient) Delete(path string, recursive bool) (*etcd.Response, error) { +func (ec *fakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, error) { for p := range ec.writes { - if (recursive && strings.HasPrefix(p, path)) || (!recursive && p == path) { + if (recursive && strings.HasPrefix(p, key)) || (!recursive && p == key) { delete(ec.writes, p) } } return nil, nil } +func (ec *fakeEtcdClient) RawGet(key string, sort, recursive bool) (*etcd.RawResponse, error) { + count := 0 + for path := range ec.writes { + if strings.HasPrefix(path, key) { + count++ + } + } + if count == 0 { + return &etcd.RawResponse{StatusCode: http.StatusNotFound}, nil + } + return &etcd.RawResponse{StatusCode: http.StatusOK}, nil +} + const ( - testDomain = "cluster.local" + testDomain = "cluster.local." basePath = "/skydns/local/cluster" serviceSubDomain = "svc" ) @@ -59,6 +74,8 @@ func newKube2Sky(ec etcdClient) *kube2sky { etcdClient: ec, domain: testDomain, etcdMutationTimeout: time.Second, + endpointsStore: cache.NewStore(cache.MetaNamespaceKeyFunc), + servicesStore: cache.NewStore(cache.MetaNamespaceKeyFunc), } } @@ -113,14 +130,166 @@ func TestHeadlessService(t *testing.T) { k2s := newKube2Sky(ec) service := kapi.Service{ ObjectMeta: kapi.ObjectMeta{ - Name: testNamespace, + Name: testService, Namespace: testNamespace, }, + Spec: kapi.ServiceSpec{ + PortalIP: "None", + Ports: []kapi.ServicePort{ + {Port: 80}, + }, + }, } + assert.NoError(t, k2s.servicesStore.Add(&service)) + endpoints := kapi.Endpoints{ + ObjectMeta: service.ObjectMeta, + Subsets: []kapi.EndpointSubset{ + { + Addresses: []kapi.EndpointAddress{ + {IP: "10.0.0.1"}, + {IP: "10.0.0.2"}, + }, + Ports: []kapi.EndpointPort{ + {Port: 80}, + }, + }, + { + Addresses: []kapi.EndpointAddress{ + {IP: "10.0.0.3"}, + {IP: "10.0.0.4"}, + }, + Ports: []kapi.EndpointPort{ + {Port: 8080}, + }, + }, + }, + 
} + // We expect 4 records with "svc" subdomain and 4 records without + // "svc" subdomain. + expectedDNSRecords := 8 + assert.NoError(t, k2s.endpointsStore.Add(&endpoints)) k2s.newService(&service) + assert.Equal(t, expectedDNSRecords, len(ec.writes)) + k2s.removeService(&service) assert.Empty(t, ec.writes) } +func TestHeadlessServiceEndpointsUpdate(t *testing.T) { + const ( + testService = "testService" + testNamespace = "default" + ) + ec := &fakeEtcdClient{make(map[string]string)} + k2s := newKube2Sky(ec) + service := kapi.Service{ + ObjectMeta: kapi.ObjectMeta{ + Name: testService, + Namespace: testNamespace, + }, + Spec: kapi.ServiceSpec{ + PortalIP: "None", + Ports: []kapi.ServicePort{ + {Port: 80}, + }, + }, + } + assert.NoError(t, k2s.servicesStore.Add(&service)) + endpoints := kapi.Endpoints{ + ObjectMeta: service.ObjectMeta, + Subsets: []kapi.EndpointSubset{ + { + Addresses: []kapi.EndpointAddress{ + {IP: "10.0.0.1"}, + {IP: "10.0.0.2"}, + }, + Ports: []kapi.EndpointPort{ + {Port: 80}, + }, + }, + }, + } + expectedDNSRecords := 4 + assert.NoError(t, k2s.endpointsStore.Add(&endpoints)) + k2s.newService(&service) + assert.Equal(t, expectedDNSRecords, len(ec.writes)) + endpoints.Subsets = append(endpoints.Subsets, + kapi.EndpointSubset{ + Addresses: []kapi.EndpointAddress{ + {IP: "10.0.0.3"}, + {IP: "10.0.0.4"}, + }, + Ports: []kapi.EndpointPort{ + {Port: 8080}, + }, + }, + ) + expectedDNSRecords = 8 + k2s.handleEndpointAdd(&endpoints) + + assert.Equal(t, expectedDNSRecords, len(ec.writes)) + k2s.removeService(&service) + assert.Empty(t, ec.writes) +} + +func TestHeadlessServiceWithDelayedEndpointsAddition(t *testing.T) { + const ( + testService = "testService" + testNamespace = "default" + ) + ec := &fakeEtcdClient{make(map[string]string)} + k2s := newKube2Sky(ec) + service := kapi.Service{ + ObjectMeta: kapi.ObjectMeta{ + Name: testService, + Namespace: testNamespace, + }, + Spec: kapi.ServiceSpec{ + PortalIP: "None", + Ports: []kapi.ServicePort{ + {Port: 
80}, + }, + }, + } + assert.NoError(t, k2s.servicesStore.Add(&service)) + // Headless service DNS records should not be created since + // corresponding endpoints object doesn't exist. + k2s.newService(&service) + assert.Empty(t, ec.writes) + + // Add an endpoints object for the service. + endpoints := kapi.Endpoints{ + ObjectMeta: service.ObjectMeta, + Subsets: []kapi.EndpointSubset{ + { + Addresses: []kapi.EndpointAddress{ + {IP: "10.0.0.1"}, + {IP: "10.0.0.2"}, + }, + Ports: []kapi.EndpointPort{ + {Port: 80}, + }, + }, + { + Addresses: []kapi.EndpointAddress{ + {IP: "10.0.0.3"}, + {IP: "10.0.0.4"}, + }, + Ports: []kapi.EndpointPort{ + {Port: 8080}, + }, + }, + }, + } + // We expect 4 records with "svc" subdomain and 4 records without + // "svc" subdomain. + expectedDNSRecords := 8 + k2s.handleEndpointAdd(&endpoints) + assert.Equal(t, expectedDNSRecords, len(ec.writes)) +} + +// TODO: Test service updates for headless services. +// TODO: Test headless service addition with delayed endpoints addition + func TestAddSinglePortService(t *testing.T) { const ( testService = "testService" @@ -206,3 +375,10 @@ func TestDeleteSinglePortService(t *testing.T) { k2s.removeService(&service) assert.Empty(t, ec.writes) } + +func TestBuildDNSName(t *testing.T) { + expectedDNSName := "name.ns.svc.cluster.local." + assert.Equal(t, expectedDNSName, buildDNSNameString("local.", "cluster", "svc", "ns", "name")) + newExpectedDNSName := "00.name.ns.svc.cluster.local." + assert.Equal(t, newExpectedDNSName, buildDNSNameString(expectedDNSName, "00")) +} From c2fcdec503e449f03d91d39d0ff417321f889191 Mon Sep 17 00:00:00 2001 From: Vishnu Kannan Date: Fri, 22 May 2015 13:58:18 -0700 Subject: [PATCH 2/3] Adding an e2e test for headless services. 
--- test/e2e/dns.go | 162 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 162 insertions(+) diff --git a/test/e2e/dns.go b/test/e2e/dns.go index 0ece4af69f8..13946002919 100644 --- a/test/e2e/dns.go +++ b/test/e2e/dns.go @@ -178,4 +178,166 @@ var _ = Describe("DNS", func() { Logf("DNS probes using %s succeeded\n", pod.Name) }) + It("should provide DNS for headless services", func() { + if providerIs("vagrant") { + By("Skipping test which is broken for vagrant (See https://github.com/GoogleCloudPlatform/kubernetes/issues/3580)") + return + } + + podClient := f.Client.Pods(api.NamespaceDefault) + + By("Waiting for DNS Service to be Running") + dnsPods, err := podClient.List(dnsServiceLableSelector, fields.Everything()) + if err != nil { + Failf("Failed to list all dns service pods") + } + if len(dnsPods.Items) != 1 { + Failf("Unexpected number of pods (%d) matches the label selector %v", len(dnsPods.Items), dnsServiceLableSelector.String()) + } + expectNoError(waitForPodRunning(f.Client, dnsPods.Items[0].Name)) + + // Create a test headless service. + By("Creating a test headless service") + testServiceName := "test-service" + testServiceSelector := map[string]string{ + "dns-test": "true", + } + svc := &api.Service{ + ObjectMeta: api.ObjectMeta{ + Name: testServiceName, + }, + Spec: api.ServiceSpec{ + PortalIP: "None", + Ports: []api.ServicePort{ + {Port: 80}, + }, + Selector: testServiceSelector, + }, + } + + _, err = f.Client.Services(f.Namespace.Name).Create(svc) + Expect(err).NotTo(HaveOccurred()) + defer func() { + By("deleting the test headless service") + defer GinkgoRecover() + f.Client.Services(f.Namespace.Name).Delete(svc.Name) + }() + + // All the names we need to be able to resolve. + // TODO: Create more endpoints and ensure that multiple A records are returned + // for headless service. 
+ namesToResolve := []string{ + fmt.Sprintf("%s", testServiceName), + fmt.Sprintf("%s.%s", testServiceName, f.Namespace.Name), + fmt.Sprintf("%s.%s.svc", testServiceName, f.Namespace.Name), + } + + probeCmd := "for i in `seq 1 600`; do " + for _, name := range namesToResolve { + // Resolve by TCP and UDP DNS. + probeCmd += fmt.Sprintf(`test -n "$(dig +notcp +noall +answer +search %s)" && echo OK > /results/udp@%s;`, name, name) + probeCmd += fmt.Sprintf(`test -n "$(dig +tcp +noall +answer +search %s)" && echo OK > /results/tcp@%s;`, name, name) + } + probeCmd += "sleep 1; done" + Logf("vishh: 1") + // Run a pod which probes DNS and exposes the results by HTTP. + By("creating a pod to probe DNS") + pod := &api.Pod{ + TypeMeta: api.TypeMeta{ + Kind: "Pod", + APIVersion: latest.Version, + }, + ObjectMeta: api.ObjectMeta{ + Name: "dns-test", + Labels: testServiceSelector, + }, + Spec: api.PodSpec{ + Volumes: []api.Volume{ + { + Name: "results", + VolumeSource: api.VolumeSource{ + EmptyDir: &api.EmptyDirVolumeSource{}, + }, + }, + }, + Containers: []api.Container{ + // TODO: Consider scraping logs instead of running a webserver. 
+ { + Name: "webserver", + Image: "gcr.io/google_containers/test-webserver", + VolumeMounts: []api.VolumeMount{ + { + Name: "results", + MountPath: "/results", + }, + }, + }, + { + Name: "querier", + Image: "gcr.io/google_containers/dnsutils", + Command: []string{"sh", "-c", probeCmd}, + VolumeMounts: []api.VolumeMount{ + { + Name: "results", + MountPath: "/results", + }, + }, + }, + }, + }, + } + + By("submitting the pod to kubernetes") + podClient = f.Client.Pods(f.Namespace.Name) + defer func() { + By("deleting the pod") + defer GinkgoRecover() + podClient.Delete(pod.Name, nil) + }() + if _, err := podClient.Create(pod); err != nil { + Failf("Failed to create %s pod: %v", pod.Name, err) + } + + expectNoError(f.WaitForPodRunning(pod.Name)) + + By("retrieving the pod") + pod, err = podClient.Get(pod.Name) + if err != nil { + Failf("Failed to get pod %s: %v", pod.Name, err) + } + + // Try to find results for each expected name. + By("looking for the results for each expected name") + var failed []string + + expectNoError(wait.Poll(time.Second*2, time.Second*60, func() (bool, error) { + failed = []string{} + for _, name := range namesToResolve { + for _, proto := range []string{"udp", "tcp"} { + testCase := fmt.Sprintf("%s@%s", proto, name) + _, err := f.Client.Get(). + Prefix("proxy"). + Resource("pods"). + Namespace(f.Namespace.Name). + Name(pod.Name). + Suffix("results", testCase). + Do().Raw() + if err != nil { + failed = append(failed, testCase) + } + } + } + if len(failed) == 0 { + return true, nil + } + Logf("Lookups using %s failed for: %v\n", pod.Name, failed) + return false, nil + })) + Expect(len(failed)).To(Equal(0)) + + // TODO: probe from the host, too. 
+ + Logf("DNS probes using %s succeeded\n", pod.Name) + }) + }) From d3c7edb78f38bde43038f1128239f0f4ce34571c Mon Sep 17 00:00:00 2001 From: Vishnu Kannan Date: Mon, 25 May 2015 19:31:06 -0700 Subject: [PATCH 3/3] New release of kube2sky (v1.7) --- cluster/addons/dns/kube2sky/Changelog | 4 ++++ cluster/addons/dns/kube2sky/Makefile | 2 +- cluster/addons/dns/kube2sky/README.md | 7 ++++++- cluster/addons/dns/skydns-rc.yaml.in | 2 +- 4 files changed, 12 insertions(+), 3 deletions(-) diff --git a/cluster/addons/dns/kube2sky/Changelog b/cluster/addons/dns/kube2sky/Changelog index b93a874f84b..4075602aaa2 100644 --- a/cluster/addons/dns/kube2sky/Changelog +++ b/cluster/addons/dns/kube2sky/Changelog @@ -1,3 +1,7 @@ * Fri May 15 2015 Tim Hockin - First Changelog entry - Current version is 1.4 + + +## Version 1.7 (May 25 2015 Vishnu Kannan ) +- Adding support for headless services. All pods backing a headless service are addressable via DNS RR. diff --git a/cluster/addons/dns/kube2sky/Makefile b/cluster/addons/dns/kube2sky/Makefile index fa1c8b71256..2f565bc0bb4 100644 --- a/cluster/addons/dns/kube2sky/Makefile +++ b/cluster/addons/dns/kube2sky/Makefile @@ -4,7 +4,7 @@ .PHONY: all kube2sky container push clean test -TAG = 1.6 +TAG = 1.7 PREFIX = gcr.io/google_containers all: container diff --git a/cluster/addons/dns/kube2sky/README.md b/cluster/addons/dns/kube2sky/README.md index d8e3229392e..09867534d3e 100644 --- a/cluster/addons/dns/kube2sky/README.md +++ b/cluster/addons/dns/kube2sky/README.md @@ -21,8 +21,13 @@ example, if this is set to `kubernetes.io`, then a service named "nifty" in the `-verbose`: Log additional information. -'-etcd_mutation_timeout': For how long the application will keep retrying etcd +`-etcd_mutation_timeout`: For how long the application will keep retrying etcd mutation (insertion or removal of a dns entry) before giving up and crashing. +`--etcd-server`: The etcd server that is being used by skydns. 
+ +`--kube_master_url`: URL of kubernetes master. Required if `--kubecfg_file` is not set. + +`--kubecfg_file`: Path to kubecfg file that contains the master URL and tokens to authenticate with the master. [![Analytics](https://kubernetes-site.appspot.com/UA-36037335-10/GitHub/cluster/addons/dns/kube2sky/README.md?pixel)]() diff --git a/cluster/addons/dns/skydns-rc.yaml.in b/cluster/addons/dns/skydns-rc.yaml.in index d4fa2614053..32d08db49fd 100644 --- a/cluster/addons/dns/skydns-rc.yaml.in +++ b/cluster/addons/dns/skydns-rc.yaml.in @@ -30,7 +30,7 @@ spec: - -initial-cluster-token - skydns-etcd - name: kube2sky - image: gcr.io/google_containers/kube2sky:1.6 + image: gcr.io/google_containers/kube2sky:1.7 args: # command = "/kube2sky" - -domain={{ pillar['dns_domain'] }}