From 7d0adbcb1c2b7abcfe52755d92fd3b351f7a290d Mon Sep 17 00:00:00 2001 From: Abhishek Shah Date: Thu, 28 May 2015 15:28:17 -0700 Subject: [PATCH] SRV record support --- cluster/addons/dns/kube2sky/Makefile | 2 +- cluster/addons/dns/kube2sky/kube2sky.go | 130 +++++-- cluster/addons/dns/kube2sky/kube2sky_test.go | 361 ++++++++++--------- cluster/addons/dns/skydns-rc.yaml.in | 10 +- test/e2e/dns.go | 313 ++++++++-------- 5 files changed, 445 insertions(+), 371 deletions(-) diff --git a/cluster/addons/dns/kube2sky/Makefile b/cluster/addons/dns/kube2sky/Makefile index deadd8feabb..7f4b7f44591 100644 --- a/cluster/addons/dns/kube2sky/Makefile +++ b/cluster/addons/dns/kube2sky/Makefile @@ -4,7 +4,7 @@ .PHONY: all kube2sky container push clean test -TAG = 1.8 +TAG = 1.9 PREFIX = gcr.io/google_containers all: container diff --git a/cluster/addons/dns/kube2sky/kube2sky.go b/cluster/addons/dns/kube2sky/kube2sky.go index 9c9245f3854..fd85d8d00e4 100644 --- a/cluster/addons/dns/kube2sky/kube2sky.go +++ b/cluster/addons/dns/kube2sky/kube2sky.go @@ -23,6 +23,7 @@ import ( "encoding/json" "flag" "fmt" + "hash/fnv" "net/http" "net/url" "os" @@ -56,7 +57,7 @@ const ( // Maximum number of attempts to connect to etcd server. maxConnectAttempts = 12 // Resync period for the kube controller loop. - resyncPeriod = 5 * time.Second + resyncPeriod = 30 * time.Minute // A subdomain added to the user specified domain for all services. serviceSubdomain = "svc" ) @@ -90,7 +91,7 @@ type kube2sky struct { // Removes 'subdomain' from etcd. func (ks *kube2sky) removeDNS(subdomain string) error { glog.V(2).Infof("Removing %s from DNS", subdomain) - resp, err := ks.etcdClient.RawGet(skymsg.Path(subdomain), false, false) + resp, err := ks.etcdClient.RawGet(skymsg.Path(subdomain), false, true) if err != nil { return err } @@ -109,7 +110,7 @@ func (ks *kube2sky) writeSkyRecord(subdomain string, data string) error { } // Generates skydns records for a headless service. -func (ks *kube2sky) newHeadlessService(subdomain string, service *kapi.Service) error { +func (ks *kube2sky) newHeadlessService(subdomain string, service *kapi.Service, isNewStyleFormat bool) error { // Create an A record for every pod in the service. // This record must be periodically updated. // Format is as follows: @@ -131,7 +132,7 @@ func (ks *kube2sky) newHeadlessService(subdomain string, service *kapi.Service) return nil } if e, ok := e.(*kapi.Endpoints); ok { - return ks.generateRecordsForHeadlessService(subdomain, e, service) + return ks.generateRecordsForHeadlessService(subdomain, e, service, isNewStyleFormat) } return nil } @@ -146,18 +147,33 @@ func getSkyMsg(ip string, port int) *skymsg.Service { } } -func (ks *kube2sky) generateRecordsForHeadlessService(subdomain string, e *kapi.Endpoints, svc *kapi.Service) error { +func (ks *kube2sky) generateRecordsForHeadlessService(subdomain string, e *kapi.Endpoints, svc *kapi.Service, isNewStyleFormat bool) error { for idx := range e.Subsets { for subIdx := range e.Subsets[idx].Addresses { - subdomain := buildDNSNameString(subdomain, fmt.Sprintf("%d%d", idx, subIdx)) - b, err := json.Marshal(getSkyMsg(e.Subsets[idx].Addresses[subIdx].IP, svc.Spec.Ports[0].Port)) + b, err := json.Marshal(getSkyMsg(e.Subsets[idx].Addresses[subIdx].IP, 0)) if err != nil { return err } - glog.V(2).Infof("Setting DNS record: %v -> %q\n", subdomain, string(b)) - if err := ks.writeSkyRecord(subdomain, string(b)); err != nil { + recordValue := string(b) + recordLabel := getHash(recordValue) + recordKey := buildDNSNameString(subdomain, recordLabel) + + glog.V(2).Infof("Setting DNS record: %v -> %q\n", recordKey, recordValue) + if err := ks.writeSkyRecord(recordKey, recordValue); err != nil { return err } + if isNewStyleFormat { + for portIdx := range e.Subsets[idx].Ports { + endpointPort := &e.Subsets[idx].Ports[portIdx] + portSegment := buildPortSegmentString(endpointPort.Name, endpointPort.Protocol) + if portSegment != "" { + err := ks.generateSRVRecord(subdomain, portSegment, recordLabel, recordKey, endpointPort.Port) + if err != nil { + return err + } + } + } + } } } @@ -183,7 +199,7 @@ func (ks *kube2sky) getServiceFromEndpoints(e *kapi.Endpoints) (*kapi.Service, e return nil, fmt.Errorf("got a non service object in services store %v", obj) } -func (ks *kube2sky) addDNSUsingEndpoints(subdomain string, e *kapi.Endpoints) error { +func (ks *kube2sky) addDNSUsingEndpoints(subdomain string, e *kapi.Endpoints, isNewStyleFormat bool) error { ks.mlock.Lock() defer ks.mlock.Unlock() svc, err := ks.getServiceFromEndpoints(e) @@ -198,41 +214,90 @@ func (ks *kube2sky) addDNSUsingEndpoints(subdomain string, e *kapi.Endpoints) er if err := ks.removeDNS(subdomain); err != nil { return err } - return ks.generateRecordsForHeadlessService(subdomain, e, svc) + return ks.generateRecordsForHeadlessService(subdomain, e, svc, isNewStyleFormat) } func (ks *kube2sky) handleEndpointAdd(obj interface{}) { if e, ok := obj.(*kapi.Endpoints); ok { name := buildDNSNameString(ks.domain, e.Namespace, e.Name) - ks.mutateEtcdOrDie(func() error { return ks.addDNSUsingEndpoints(name, e) }) + ks.mutateEtcdOrDie(func() error { return ks.addDNSUsingEndpoints(name, e, false) }) name = buildDNSNameString(ks.domain, serviceSubdomain, e.Namespace, e.Name) - ks.mutateEtcdOrDie(func() error { return ks.addDNSUsingEndpoints(name, e) }) + ks.mutateEtcdOrDie(func() error { return ks.addDNSUsingEndpoints(name, e, true) }) } } -func (ks *kube2sky) generateRecordsForPortalService(subdomain string, service *kapi.Service) error { - for i := range service.Spec.Ports { - b, err := json.Marshal(getSkyMsg(service.Spec.ClusterIP, service.Spec.Ports[i].Port)) +func (ks *kube2sky) generateRecordsForPortalService(subdomain string, service *kapi.Service, isNewStyleFormat bool) error { + b, err := json.Marshal(getSkyMsg(service.Spec.ClusterIP, 0)) + if err != nil { + return err + } + recordValue := string(b) + recordKey := subdomain + recordLabel := "" + if isNewStyleFormat { + recordLabel = getHash(recordValue) if err != nil { return err } - glog.V(2).Infof("Setting DNS record: %v -> %q\n", subdomain, string(b)) - if err := ks.writeSkyRecord(subdomain, string(b)); err != nil { - return err + recordKey = buildDNSNameString(subdomain, recordLabel) + } + + glog.V(2).Infof("Setting DNS record: %v -> %q, with recordKey: %v\n", subdomain, recordValue, recordKey) + if err := ks.writeSkyRecord(recordKey, recordValue); err != nil { + return err + } + if !isNewStyleFormat { + return nil + } + // Generate SRV Records + for i := range service.Spec.Ports { + port := &service.Spec.Ports[i] + portSegment := buildPortSegmentString(port.Name, port.Protocol) + if portSegment != "" { + err = ks.generateSRVRecord(subdomain, portSegment, recordLabel, subdomain, port.Port) + if err != nil { + return err + } } } return nil } -func (ks *kube2sky) addDNS(subdomain string, service *kapi.Service) error { +func buildPortSegmentString(portName string, portProtocol kapi.Protocol) string { + if portName == "" { + // we don't create a random name + return "" + } + + if portProtocol == "" { + glog.Errorf("Port Protocol not set. port segment string cannot be created.") + return "" + } + + return fmt.Sprintf("_%s._%s", portName, strings.ToLower(string(portProtocol))) +} + +func (ks *kube2sky) generateSRVRecord(subdomain, portSegment, recordName, cName string, portNumber int) error { + recordKey := buildDNSNameString(subdomain, portSegment, recordName) + srv_rec, err := json.Marshal(getSkyMsg(cName, portNumber)) + if err != nil { + return err + } + if err := ks.writeSkyRecord(recordKey, string(srv_rec)); err != nil { + return err + } + return nil +} + +func (ks *kube2sky) addDNS(subdomain string, service *kapi.Service, isNewStyleFormat bool) error { if len(service.Spec.Ports) == 0 { glog.Fatalf("unexpected service with no ports: %v", service) } // if ClusterIP is not set, a DNS entry should not be created if !kapi.IsServiceIPSet(service) { - return ks.newHeadlessService(subdomain, service) + return ks.newHeadlessService(subdomain, service, isNewStyleFormat) } - return ks.generateRecordsForPortalService(subdomain, service) + return ks.generateRecordsForPortalService(subdomain, service, isNewStyleFormat) } // Implements retry logic for arbitrary mutator. Crashes after retrying for @@ -281,9 +346,9 @@ func (ks *kube2sky) newService(obj interface{}) { if s, ok := obj.(*kapi.Service); ok { //TODO(artfulcoder) stop adding and deleting old-format string for service name := buildDNSNameString(ks.domain, s.Namespace, s.Name) - ks.mutateEtcdOrDie(func() error { return ks.addDNS(name, s) }) + ks.mutateEtcdOrDie(func() error { return ks.addDNS(name, s, false) }) name = buildDNSNameString(ks.domain, serviceSubdomain, s.Namespace, s.Name) - ks.mutateEtcdOrDie(func() error { return ks.addDNS(name, s) }) + ks.mutateEtcdOrDie(func() error { return ks.addDNS(name, s, true) }) } } @@ -296,6 +361,12 @@ func (ks *kube2sky) removeService(obj interface{}) { } } +func (ks *kube2sky) updateService(oldObj, newObj interface{}) { + // TODO: Avoid unwanted updates. + ks.removeService(oldObj) + ks.newService(newObj) +} + func newEtcdClient(etcdServer string) (*etcd.Client, error) { var ( client *etcd.Client @@ -390,10 +461,7 @@ func watchForServices(kubeClient *kclient.Client, ks *kube2sky) kcache.Store { kframework.ResourceEventHandlerFuncs{ AddFunc: ks.newService, DeleteFunc: ks.removeService, - UpdateFunc: func(oldObj, newObj interface{}) { - // TODO: Avoid unwanted updates. - ks.newService(newObj) - }, + UpdateFunc: ks.updateService, }, ) go serviceController.Run(util.NeverStop) @@ -418,6 +486,12 @@ func watchEndpoints(kubeClient *kclient.Client, ks *kube2sky) kcache.Store { return eStore } +func getHash(text string) string { + h := fnv.New32a() + h.Write([]byte(text)) + return fmt.Sprintf("%x", h.Sum32()) +} + func main() { flag.Parse() var err error diff --git a/cluster/addons/dns/kube2sky/kube2sky_test.go b/cluster/addons/dns/kube2sky/kube2sky_test.go index 5c6d8c68302..272b51a7aa5 100644 --- a/cluster/addons/dns/kube2sky/kube2sky_test.go +++ b/cluster/addons/dns/kube2sky/kube2sky_test.go @@ -18,6 +18,7 @@ package main import ( "encoding/json" + "fmt" "net/http" "path" "strings" @@ -51,18 +52,32 @@ func (ec *fakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, er } func (ec *fakeEtcdClient) RawGet(key string, sort, recursive bool) (*etcd.RawResponse, error) { - count := 0 - for path := range ec.writes { - if strings.HasPrefix(path, key) { - count++ - } - } - if count == 0 { + values := ec.Get(key) + if len(values) == 0 { return &etcd.RawResponse{StatusCode: http.StatusNotFound}, nil } return &etcd.RawResponse{StatusCode: http.StatusOK}, nil } +func (ec *fakeEtcdClient) Get(key string) []string { + values := make([]string, 0, 10) + minSeparatorCount := 0 + key = strings.ToLower(key) + for path := range ec.writes { + if strings.HasPrefix(path, key) { + separatorCount := strings.Count(path, "/") + if minSeparatorCount == 0 || separatorCount < minSeparatorCount { + minSeparatorCount = separatorCount + values = values[:0] + values = append(values, ec.writes[path]) + } else if separatorCount == minSeparatorCount { + values = append(values, ec.writes[path]) + } + } + } + return values +} + const ( testDomain = "cluster.local." basePath = "/skydns/local/cluster" @@ -87,6 +102,10 @@ func getEtcdNewStylePath(name, namespace string) string { return path.Join(basePath, serviceSubDomain, namespace, name) } +func getEtcdPathForSRV(portName, protocol, name, namespace string) string { + return path.Join(basePath, serviceSubDomain, namespace, name, fmt.Sprintf("_%s", strings.ToLower(protocol)), fmt.Sprintf("_%s", strings.ToLower(portName))) +} + type hostPort struct { Host string `json:"host"` Port int `json:"port"` @@ -107,63 +126,110 @@ func getHostPortFromString(data string) (*hostPort, error) { func assertDnsServiceEntryInEtcd(t *testing.T, ec *fakeEtcdClient, serviceName, namespace string, expectedHostPort *hostPort) { oldStyleKey := getEtcdOldStylePath(serviceName, namespace) - val, exists := ec.writes[oldStyleKey] - require.True(t, exists) - actualHostPort, err := getHostPortFromString(val) + values := ec.Get(oldStyleKey) + require.True(t, len(values) > 0, fmt.Sprintf("oldStyleKey '%s' not found.", oldStyleKey)) + actualHostPort, err := getHostPortFromString(values[0]) require.NoError(t, err) - assert.Equal(t, actualHostPort, expectedHostPort) + assert.Equal(t, expectedHostPort.Host, actualHostPort.Host) newStyleKey := getEtcdNewStylePath(serviceName, namespace) - val, exists = ec.writes[newStyleKey] - require.True(t, exists) - actualHostPort, err = getHostPortFromString(val) + values = ec.Get(newStyleKey) + //require.True(t, exists) + require.True(t, len(values) > 0, "newStyleKey entry not found.") + actualHostPort, err = getHostPortFromString(values[0]) require.NoError(t, err) - assert.Equal(t, actualHostPort, expectedHostPort) + assert.Equal(t, expectedHostPort.Host, actualHostPort.Host) } -func TestHeadlessService(t *testing.T) { - const ( - testService = "testService" - testNamespace = "default" - ) - ec := &fakeEtcdClient{make(map[string]string)} - k2s := newKube2Sky(ec) +func assertSRVEntryInEtcd(t *testing.T, ec *fakeEtcdClient, portName, protocol, serviceName, namespace string, expectedPortNumber, expectedEntriesCount int) { + srvKey := getEtcdPathForSRV(portName, protocol, serviceName, namespace) + values := ec.Get(srvKey) + assert.Equal(t, expectedEntriesCount, len(values)) + for i := range values { + actualHostPort, err := getHostPortFromString(values[i]) + require.NoError(t, err) + assert.Equal(t, expectedPortNumber, actualHostPort.Port) + } +} + +func newHeadlessService(namespace, serviceName string) kapi.Service { service := kapi.Service{ ObjectMeta: kapi.ObjectMeta{ - Name: testService, - Namespace: testNamespace, + Name: serviceName, + Namespace: namespace, }, Spec: kapi.ServiceSpec{ ClusterIP: "None", Ports: []kapi.ServicePort{ - {Port: 80}, + {Port: 0}, }, }, } - assert.NoError(t, k2s.servicesStore.Add(&service)) + return service +} + +func newService(namespace, serviceName, clusterIP, portName string, portNumber int) kapi.Service { + service := kapi.Service{ + ObjectMeta: kapi.ObjectMeta{ + Name: serviceName, + Namespace: namespace, + }, + Spec: kapi.ServiceSpec{ + ClusterIP: clusterIP, + Ports: []kapi.ServicePort{ + {Port: portNumber, Name: portName, Protocol: "TCP"}, + }, + }, + } + return service +} + +func newSubset() kapi.EndpointSubset { + subset := kapi.EndpointSubset{ + Addresses: []kapi.EndpointAddress{}, + Ports: []kapi.EndpointPort{}, + } + return subset +} + +func newSubsetWithOnePort(portName string, port int, ips ...string) kapi.EndpointSubset { + subset := newSubset() + subset.Ports = append(subset.Ports, kapi.EndpointPort{Port: port, Name: portName, Protocol: "TCP"}) + for _, ip := range ips { + subset.Addresses = append(subset.Addresses, kapi.EndpointAddress{IP: ip}) + } + return subset +} + +func newSubsetWithTwoPorts(portName1 string, portNumber1 int, portName2 string, portNumber2 int, ips ...string) kapi.EndpointSubset { + subset := newSubsetWithOnePort(portName1, portNumber1, ips...) + subset.Ports = append(subset.Ports, kapi.EndpointPort{Port: portNumber2, Name: portName2, Protocol: "TCP"}) + return subset +} + +func newEndpoints(service kapi.Service, subsets ...kapi.EndpointSubset) kapi.Endpoints { endpoints := kapi.Endpoints{ ObjectMeta: service.ObjectMeta, - Subsets: []kapi.EndpointSubset{ - { - Addresses: []kapi.EndpointAddress{ - {IP: "10.0.0.1"}, - {IP: "10.0.0.2"}, - }, - Ports: []kapi.EndpointPort{ - {Port: 80}, - }, - }, - { - Addresses: []kapi.EndpointAddress{ - {IP: "10.0.0.3"}, - {IP: "10.0.0.4"}, - }, - Ports: []kapi.EndpointPort{ - {Port: 8080}, - }, - }, - }, + Subsets: []kapi.EndpointSubset{}, } + + for _, subset := range subsets { + endpoints.Subsets = append(endpoints.Subsets, subset) + } + return endpoints +} + +func TestHeadlessService(t *testing.T) { + const ( + testService = "testservice" + testNamespace = "default" + ) + ec := &fakeEtcdClient{make(map[string]string)} + k2s := newKube2Sky(ec) + service := newHeadlessService(testNamespace, testService) + assert.NoError(t, k2s.servicesStore.Add(&service)) + endpoints := newEndpoints(service, newSubsetWithOnePort("", 80, "10.0.0.1", "10.0.0.2"), newSubsetWithOnePort("", 8080, "10.0.0.3", "10.0.0.4")) + // We expect 4 records with "svc" subdomain and 4 records without // "svc" subdomain. expectedDNSRecords := 8 @@ -174,54 +240,56 @@ func TestHeadlessService(t *testing.T) { assert.Empty(t, ec.writes) } -func TestHeadlessServiceEndpointsUpdate(t *testing.T) { +func TestHeadlessServiceWithNamedPorts(t *testing.T) { const ( - testService = "testService" + testService = "testservice" testNamespace = "default" ) ec := &fakeEtcdClient{make(map[string]string)} k2s := newKube2Sky(ec) - service := kapi.Service{ - ObjectMeta: kapi.ObjectMeta{ - Name: testService, - Namespace: testNamespace, - }, - Spec: kapi.ServiceSpec{ - ClusterIP: "None", - Ports: []kapi.ServicePort{ - {Port: 80}, - }, - }, - } + service := newHeadlessService(testNamespace, testService) assert.NoError(t, k2s.servicesStore.Add(&service)) - endpoints := kapi.Endpoints{ - ObjectMeta: service.ObjectMeta, - Subsets: []kapi.EndpointSubset{ - { - Addresses: []kapi.EndpointAddress{ - {IP: "10.0.0.1"}, - {IP: "10.0.0.2"}, - }, - Ports: []kapi.EndpointPort{ - {Port: 80}, - }, - }, - }, - } + endpoints := newEndpoints(service, newSubsetWithTwoPorts("http1", 80, "http2", 81, "10.0.0.1", "10.0.0.2"), newSubsetWithOnePort("https", 443, "10.0.0.3", "10.0.0.4")) + + // We expect 14 records. 6 SRV records. 4 POD entries with old style, 4 POD entries with new style + // "svc" subdomain. + expectedDNSRecords := 14 + assert.NoError(t, k2s.endpointsStore.Add(&endpoints)) + k2s.newService(&service) + assert.Equal(t, expectedDNSRecords, len(ec.writes)) + assertSRVEntryInEtcd(t, ec, "http1", "tcp", testService, testNamespace, 80, 2) + assertSRVEntryInEtcd(t, ec, "http2", "tcp", testService, testNamespace, 81, 2) + assertSRVEntryInEtcd(t, ec, "https", "tcp", testService, testNamespace, 443, 2) + + endpoints.Subsets = endpoints.Subsets[:1] + k2s.handleEndpointAdd(&endpoints) + // We expect 8 records. 4 SRV records. 2 POD entries with old style, 2 POD entries with new style + expectedDNSRecords = 8 + assert.Equal(t, expectedDNSRecords, len(ec.writes)) + assertSRVEntryInEtcd(t, ec, "http1", "tcp", testService, testNamespace, 80, 2) + assertSRVEntryInEtcd(t, ec, "http2", "tcp", testService, testNamespace, 81, 2) + + k2s.removeService(&service) + assert.Empty(t, ec.writes) +} + +func TestHeadlessServiceEndpointsUpdate(t *testing.T) { + const ( + testService = "testservice" + testNamespace = "default" + ) + ec := &fakeEtcdClient{make(map[string]string)} + k2s := newKube2Sky(ec) + service := newHeadlessService(testNamespace, testService) + assert.NoError(t, k2s.servicesStore.Add(&service)) + endpoints := newEndpoints(service, newSubsetWithOnePort("", 80, "10.0.0.1", "10.0.0.2")) + expectedDNSRecords := 4 assert.NoError(t, k2s.endpointsStore.Add(&endpoints)) k2s.newService(&service) assert.Equal(t, expectedDNSRecords, len(ec.writes)) endpoints.Subsets = append(endpoints.Subsets, - kapi.EndpointSubset{ - Addresses: []kapi.EndpointAddress{ - {IP: "10.0.0.3"}, - {IP: "10.0.0.4"}, - }, - Ports: []kapi.EndpointPort{ - {Port: 8080}, - }, - }, + newSubsetWithOnePort("", 8080, "10.0.0.3", "10.0.0.4"), ) expectedDNSRecords = 8 k2s.handleEndpointAdd(&endpoints) @@ -233,23 +301,12 @@ func TestHeadlessServiceEndpointsUpdate(t *testing.T) { func TestHeadlessServiceWithDelayedEndpointsAddition(t *testing.T) { const ( - testService = "testService" + testService = "testservice" testNamespace = "default" ) ec := &fakeEtcdClient{make(map[string]string)} k2s := newKube2Sky(ec) - service := kapi.Service{ - ObjectMeta: kapi.ObjectMeta{ - Name: testService, - Namespace: testNamespace, - }, - Spec: kapi.ServiceSpec{ - ClusterIP: "None", - Ports: []kapi.ServicePort{ - {Port: 80}, - }, - }, - } + service := newHeadlessService(testNamespace, testService) assert.NoError(t, k2s.servicesStore.Add(&service)) // Headless service DNS records should not be created since // corresponding endpoints object doesn't exist. @@ -257,29 +314,7 @@ func TestHeadlessServiceWithDelayedEndpointsAddition(t *testing.T) { assert.Empty(t, ec.writes) // Add an endpoints object for the service. - endpoints := kapi.Endpoints{ - ObjectMeta: service.ObjectMeta, - Subsets: []kapi.EndpointSubset{ - { - Addresses: []kapi.EndpointAddress{ - {IP: "10.0.0.1"}, - {IP: "10.0.0.2"}, - }, - Ports: []kapi.EndpointPort{ - {Port: 80}, - }, - }, - { - Addresses: []kapi.EndpointAddress{ - {IP: "10.0.0.3"}, - {IP: "10.0.0.4"}, - }, - Ports: []kapi.EndpointPort{ - {Port: 8080}, - }, - }, - }, - } + endpoints := newEndpoints(service, newSubsetWithOnePort("", 80, "10.0.0.1", "10.0.0.2"), newSubsetWithOnePort("", 8080, "10.0.0.3", "10.0.0.4")) // We expect 4 records with "svc" subdomain and 4 records without // "svc" subdomain. expectedDNSRecords := 8 @@ -292,25 +327,12 @@ func TestHeadlessServiceWithDelayedEndpointsAddition(t *testing.T) { func TestAddSinglePortService(t *testing.T) { const ( - testService = "testService" + testService = "testservice" testNamespace = "default" ) ec := &fakeEtcdClient{make(map[string]string)} k2s := newKube2Sky(ec) - service := kapi.Service{ - ObjectMeta: kapi.ObjectMeta{ - Name: testService, - Namespace: testNamespace, - }, - Spec: kapi.ServiceSpec{ - Ports: []kapi.ServicePort{ - { - Port: 80, - }, - }, - ClusterIP: "1.2.3.4", - }, - } + service := newService(testNamespace, testService, "1.2.3.4", "", 0) k2s.newService(&service) expectedValue := getHostPort(&service) assertDnsServiceEntryInEtcd(t, ec, testService, testNamespace, expectedValue) @@ -318,54 +340,29 @@ func TestAddSinglePortService(t *testing.T) { func TestUpdateSinglePortService(t *testing.T) { const ( - testService = "testService" + testService = "testservice" testNamespace = "default" ) ec := &fakeEtcdClient{make(map[string]string)} k2s := newKube2Sky(ec) - service := kapi.Service{ - ObjectMeta: kapi.ObjectMeta{ - Name: testService, - Namespace: testNamespace, - }, - Spec: kapi.ServiceSpec{ - Ports: []kapi.ServicePort{ - { - Port: 80, - }, - }, - ClusterIP: "1.2.3.4", - }, - } + service := newService(testNamespace, testService, "1.2.3.4", "", 0) k2s.newService(&service) assert.Len(t, ec.writes, 2) - service.Spec.ClusterIP = "0.0.0.0" - k2s.newService(&service) - expectedValue := getHostPort(&service) + newService := service + newService.Spec.ClusterIP = "0.0.0.0" + k2s.updateService(&service, &newService) + expectedValue := getHostPort(&newService) assertDnsServiceEntryInEtcd(t, ec, testService, testNamespace, expectedValue) } func TestDeleteSinglePortService(t *testing.T) { const ( - testService = "testService" + testService = "testservice" testNamespace = "default" ) ec := &fakeEtcdClient{make(map[string]string)} k2s := newKube2Sky(ec) - service := kapi.Service{ - ObjectMeta: kapi.ObjectMeta{ - Name: testService, - Namespace: testNamespace, - }, - Spec: kapi.ServiceSpec{ - Ports: []kapi.ServicePort{ - { - Port: 80, - }, - }, - ClusterIP: "1.2.3.4", - }, - } + service := newService(testNamespace, testService, "1.2.3.4", "", 80) // Add the service k2s.newService(&service) // two entries should get created, one with the svc subdomain (new-style) @@ -376,6 +373,36 @@ func TestDeleteSinglePortService(t *testing.T) { assert.Empty(t, ec.writes) } +func TestServiceWithNamePort(t *testing.T) { + const ( + testService = "testservice" + testNamespace = "default" + ) + ec := &fakeEtcdClient{make(map[string]string)} + k2s := newKube2Sky(ec) + + // create service + service := newService(testNamespace, testService, "1.2.3.4", "http1", 80) + k2s.newService(&service) + expectedValue := getHostPort(&service) + assertDnsServiceEntryInEtcd(t, ec, testService, testNamespace, expectedValue) + assertSRVEntryInEtcd(t, ec, "http1", "tcp", testService, testNamespace, 80, 1) + assert.Len(t, ec.writes, 3) + + // update service + newService := service + newService.Spec.Ports[0].Name = "http2" + k2s.updateService(&service, &newService) + expectedValue = getHostPort(&newService) + assertDnsServiceEntryInEtcd(t, ec, testService, testNamespace, expectedValue) + assertSRVEntryInEtcd(t, ec, "http2", "tcp", testService, testNamespace, 80, 1) + assert.Len(t, ec.writes, 3) + + // Delete the service + k2s.removeService(&service) + assert.Empty(t, ec.writes) +} + func TestBuildDNSName(t *testing.T) { expectedDNSName := "name.ns.svc.cluster.local." assert.Equal(t, expectedDNSName, buildDNSNameString("local.", "cluster", "svc", "ns", "name")) diff --git a/cluster/addons/dns/skydns-rc.yaml.in b/cluster/addons/dns/skydns-rc.yaml.in index e821fd7b175..17f792d06b9 100644 --- a/cluster/addons/dns/skydns-rc.yaml.in +++ b/cluster/addons/dns/skydns-rc.yaml.in @@ -1,21 +1,21 @@ apiVersion: v1beta3 kind: ReplicationController metadata: - name: kube-dns-v2 + name: kube-dns-v3 namespace: default labels: - k8s-app: kube-dns-v2 + k8s-app: kube-dns-v3 kubernetes.io/cluster-service: "true" spec: replicas: {{ pillar['dns_replicas'] }} selector: k8s-app: kube-dns - version: v2 + version: v3 template: metadata: labels: k8s-app: kube-dns - version: v2 + version: v3 kubernetes.io/cluster-service: "true" spec: containers: @@ -30,7 +30,7 @@ spec: - -initial-cluster-token - skydns-etcd - name: kube2sky - image: gcr.io/google_containers/kube2sky:1.8 + image: gcr.io/google_containers/kube2sky:1.9 args: # command = "/kube2sky" - -domain={{ pillar['dns_domain'] }} diff --git a/test/e2e/dns.go b/test/e2e/dns.go index efd320dcd34..b76350dcd58 100644 --- a/test/e2e/dns.go +++ b/test/e2e/dns.go @@ -18,14 +18,15 @@ package e2e import ( "fmt" - "time" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait" + "strings" + "time" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -36,6 +37,108 @@ var dnsServiceLableSelector = labels.Set{ "kubernetes.io/cluster-service": "true", }.AsSelector() +func createDNSPod(namespace, probeCmd string) *api.Pod { + pod := &api.Pod{ + TypeMeta: api.TypeMeta{ + Kind: "Pod", + APIVersion: latest.Version, + }, + ObjectMeta: api.ObjectMeta{ + Name: "dns-test-" + string(util.NewUUID()), + Namespace: namespace, + }, + Spec: api.PodSpec{ + Volumes: []api.Volume{ + { + Name: "results", + VolumeSource: api.VolumeSource{ + EmptyDir: &api.EmptyDirVolumeSource{}, + }, + }, + }, + Containers: []api.Container{ + // TODO: Consider scraping logs instead of running a webserver. + { + Name: "webserver", + Image: "gcr.io/google_containers/test-webserver", + Ports: []api.ContainerPort{ + { + Name: "http", + ContainerPort: 80, + }, + }, + VolumeMounts: []api.VolumeMount{ + { + Name: "results", + MountPath: "/results", + }, + }, + }, + { + Name: "querier", + Image: "gcr.io/google_containers/dnsutils", + Command: []string{"sh", "-c", probeCmd}, + VolumeMounts: []api.VolumeMount{ + { + Name: "results", + MountPath: "/results", + }, + }, + }, + }, + }, + } + return pod +} + +func createProbeCommand(namesToResolve []string) (string, []string) { + fileNames := make([]string, 0, len(namesToResolve)*2) + probeCmd := "for i in `seq 1 600`; do " + for _, name := range namesToResolve { + // Resolve by TCP and UDP DNS. Use $$(...) because $(...) is + // expanded by kubernetes (though this won't expand so should + // remain a literal, safe > sorry). + lookup := "A" + if strings.HasPrefix(name, "_") { + lookup = "SRV" + } + fileName := fmt.Sprintf("udp@%s", name) + fileNames = append(fileNames, fileName) + probeCmd += fmt.Sprintf(`test -n "$$(dig +notcp +noall +answer +search %s %s)" && echo OK > /results/%s;`, name, lookup, fileName) + fileName = fmt.Sprintf("tcp@%s", name) + fileNames = append(fileNames, fileName) + probeCmd += fmt.Sprintf(`test -n "$$(dig +tcp +noall +answer +search %s %s)" && echo OK > /results/%s;`, name, lookup, fileName) + } + probeCmd += "sleep 1; done" + return probeCmd, fileNames +} + +func assertFilesExist(fileNames []string, fileDir string, pod *api.Pod, client *client.Client) { + var failed []string + + expectNoError(wait.Poll(time.Second*2, time.Second*60, func() (bool, error) { + failed = []string{} + for _, fileName := range fileNames { + _, err := client.Get(). + Prefix("proxy"). + Resource("pods"). + Namespace(pod.Namespace). + Name(pod.Name). + Suffix(fileDir, fileName). + Do().Raw() + if err != nil { + failed = append(failed, fileName) + } + } + if len(failed) == 0 { + return true, nil + } + Logf("Lookups using %s failed for: %v\n", pod.Name, failed) + return false, nil + })) + Expect(len(failed)).To(Equal(0)) +} + var _ = Describe("DNS", func() { f := NewFramework("dns") @@ -71,62 +174,11 @@ var _ = Describe("DNS", func() { namesToResolve = append(namesToResolve, "metadata") } - probeCmd := "for i in `seq 1 600`; do " - for _, name := range namesToResolve { - // Resolve by TCP and UDP DNS. Use $$(...) because $(...) is - // expanded by kubernetes (though this won't expand so should - // remain a literal, safe > sorry). - probeCmd += fmt.Sprintf(`test -n "$$(dig +notcp +noall +answer +search %s)" && echo OK > /results/udp@%s;`, name, name) - probeCmd += fmt.Sprintf(`test -n "$$(dig +tcp +noall +answer +search %s)" && echo OK > /results/tcp@%s;`, name, name) - } - probeCmd += "sleep 1; done" + probeCmd, fileNames := createProbeCommand(namesToResolve) // Run a pod which probes DNS and exposes the results by HTTP. By("creating a pod to probe DNS") - pod := &api.Pod{ - TypeMeta: api.TypeMeta{ - Kind: "Pod", - APIVersion: latest.Version, - }, - ObjectMeta: api.ObjectMeta{ - Name: "dns-test-" + string(util.NewUUID()), - Namespace: f.Namespace.Name, - }, - Spec: api.PodSpec{ - Volumes: []api.Volume{ - { - Name: "results", - VolumeSource: api.VolumeSource{ - EmptyDir: &api.EmptyDirVolumeSource{}, - }, - }, - }, - Containers: []api.Container{ - // TODO: Consider scraping logs instead of running a webserver. - { - Name: "webserver", - Image: "gcr.io/google_containers/test-webserver", - VolumeMounts: []api.VolumeMount{ - { - Name: "results", - MountPath: "/results", - }, - }, - }, - { - Name: "querier", - Image: "gcr.io/google_containers/dnsutils", - Command: []string{"sh", "-c", probeCmd}, - VolumeMounts: []api.VolumeMount{ - { - Name: "results", - MountPath: "/results", - }, - }, - }, - }, - }, - } + pod := createDNSPod(f.Namespace.Name, probeCmd) By("submitting the pod to kubernetes") podClient = f.Client.Pods(f.Namespace.Name) @@ -149,38 +201,13 @@ var _ = Describe("DNS", func() { // Try to find results for each expected name. By("looking for the results for each expected name") - var failed []string - - expectNoError(wait.Poll(time.Second*2, time.Second*60, func() (bool, error) { - failed = []string{} - for _, name := range namesToResolve { - for _, proto := range []string{"udp", "tcp"} { - testCase := fmt.Sprintf("%s@%s", proto, name) - _, err := f.Client.Get(). - Prefix("proxy"). - Resource("pods"). - Namespace(f.Namespace.Name). - Name(pod.Name). - Suffix("results", testCase). - Do().Raw() - if err != nil { - failed = append(failed, testCase) - } - } - } - if len(failed) == 0 { - return true, nil - } - Logf("Lookups using %s failed for: %v\n", pod.Name, failed) - return false, nil - })) - Expect(len(failed)).To(Equal(0)) + assertFilesExist(fileNames, "results", pod, f.Client) // TODO: probe from the host, too. Logf("DNS probes using %s succeeded\n", pod.Name) }) - It("should provide DNS for headless services", func() { + It("should provide DNS for services", func() { if providerIs("vagrant") { By("Skipping test which is broken for vagrant (See https://github.com/GoogleCloudPlatform/kubernetes/issues/3580)") return @@ -200,95 +227,66 @@ var _ = Describe("DNS", func() { // Create a test headless service. By("Creating a test headless service") - testServiceName := "test-service" testServiceSelector := map[string]string{ "dns-test": "true", } - svc := &api.Service{ + headlessService := &api.Service{ ObjectMeta: api.ObjectMeta{ - Name: testServiceName, + Name: "test-service", }, Spec: api.ServiceSpec{ ClusterIP: "None", Ports: []api.ServicePort{ - {Port: 80}, + {Port: 80, Name: "http", Protocol: "tcp"}, }, Selector: testServiceSelector, }, } - _, err = f.Client.Services(f.Namespace.Name).Create(svc) + _, err = f.Client.Services(f.Namespace.Name).Create(headlessService) Expect(err).NotTo(HaveOccurred()) defer func() { By("deleting the test headless service") defer GinkgoRecover() - f.Client.Services(f.Namespace.Name).Delete(svc.Name) + f.Client.Services(f.Namespace.Name).Delete(headlessService.Name) + }() + + regularService := &api.Service{ + ObjectMeta: api.ObjectMeta{ + Name: "test-service-2", + }, + Spec: api.ServiceSpec{ + Ports: []api.ServicePort{ + {Port: 80, Name: "http", Protocol: "tcp"}, + }, + Selector: testServiceSelector, + }, + } + + _, err = f.Client.Services(f.Namespace.Name).Create(regularService) + Expect(err).NotTo(HaveOccurred()) + defer func() { + By("deleting the test service") + defer GinkgoRecover() + f.Client.Services(f.Namespace.Name).Delete(regularService.Name) }() // All the names we need to be able to resolve. // TODO: Create more endpoints and ensure that multiple A records are returned // for headless service. namesToResolve := []string{ - fmt.Sprintf("%s", testServiceName), - fmt.Sprintf("%s.%s", testServiceName, f.Namespace.Name), - fmt.Sprintf("%s.%s.svc", testServiceName, f.Namespace.Name), + fmt.Sprintf("%s", headlessService.Name), + fmt.Sprintf("%s.%s", headlessService.Name, f.Namespace.Name), + fmt.Sprintf("%s.%s.svc", headlessService.Name, f.Namespace.Name), + fmt.Sprintf("_http._tcp.%s.%s.svc", headlessService.Name, f.Namespace.Name), + fmt.Sprintf("_http._tcp.%s.%s.svc", regularService.Name, f.Namespace.Name), } - probeCmd := "for i in `seq 1 600`; do " - for _, name := range namesToResolve { - // Resolve by TCP and UDP DNS. Use $$(...) because $(...) is - // expanded by kubernetes (though this won't expand so should - // remain a literal, safe > sorry). - probeCmd += fmt.Sprintf(`test -n "$$(dig +notcp +noall +answer +search %s)" && echo OK > /results/udp@%s;`, name, name) - probeCmd += fmt.Sprintf(`test -n "$$(dig +tcp +noall +answer +search %s)" && echo OK > /results/tcp@%s;`, name, name) - } - probeCmd += "sleep 1; done" + probeCmd, fileNames := createProbeCommand(namesToResolve) // Run a pod which probes DNS and exposes the results by HTTP. By("creating a pod to probe DNS") - pod := &api.Pod{ - TypeMeta: api.TypeMeta{ - Kind: "Pod", - APIVersion: latest.Version, - }, - ObjectMeta: api.ObjectMeta{ - Name: "dns-test", - Labels: testServiceSelector, - }, - Spec: api.PodSpec{ - Volumes: []api.Volume{ - { - Name: "results", - VolumeSource: api.VolumeSource{ - EmptyDir: &api.EmptyDirVolumeSource{}, - }, - }, - }, - Containers: []api.Container{ - // TODO: Consider scraping logs instead of running a webserver. - { - Name: "webserver", - Image: "gcr.io/google_containers/test-webserver", - VolumeMounts: []api.VolumeMount{ - { - Name: "results", - MountPath: "/results", - }, - }, - }, - { - Name: "querier", - Image: "gcr.io/google_containers/dnsutils", - Command: []string{"sh", "-c", probeCmd}, - VolumeMounts: []api.VolumeMount{ - { - Name: "results", - MountPath: "/results", - }, - }, - }, - }, - }, - } + pod := createDNSPod(f.Namespace.Name, probeCmd) + pod.ObjectMeta.Labels = testServiceSelector By("submitting the pod to kubernetes") podClient = f.Client.Pods(f.Namespace.Name) @@ -311,32 +309,7 @@ var _ = Describe("DNS", func() { // Try to find results for each expected name. By("looking for the results for each expected name") - var failed []string - - expectNoError(wait.Poll(time.Second*2, time.Second*60, func() (bool, error) { - failed = []string{} - for _, name := range namesToResolve { - for _, proto := range []string{"udp", "tcp"} { - testCase := fmt.Sprintf("%s@%s", proto, name) - _, err := f.Client.Get(). - Prefix("proxy"). - Resource("pods"). - Namespace(f.Namespace.Name). - Name(pod.Name). - Suffix("results", testCase). - Do().Raw() - if err != nil { - failed = append(failed, testCase) - } - } - } - if len(failed) == 0 { - return true, nil - } - Logf("Lookups using %s failed for: %v\n", pod.Name, failed) - return false, nil - })) - Expect(len(failed)).To(Equal(0)) + assertFilesExist(fileNames, "results", pod, f.Client) // TODO: probe from the host, too.