Merge pull request #9073 from ArtfulCoder/srv_dns

SRV record support
This commit is contained in:
Quinton Hoole 2015-06-05 11:24:32 -07:00
commit 21db71db1a
5 changed files with 445 additions and 371 deletions

View File

@ -4,7 +4,7 @@
.PHONY: all kube2sky container push clean test
TAG = 1.8
TAG = 1.9
PREFIX = gcr.io/google_containers
all: container

View File

@ -23,6 +23,7 @@ import (
"encoding/json"
"flag"
"fmt"
"hash/fnv"
"net/http"
"net/url"
"os"
@ -56,7 +57,7 @@ const (
// Maximum number of attempts to connect to etcd server.
maxConnectAttempts = 12
// Resync period for the kube controller loop.
resyncPeriod = 5 * time.Second
resyncPeriod = 30 * time.Minute
// A subdomain added to the user specified domain for all services.
serviceSubdomain = "svc"
)
@ -90,7 +91,7 @@ type kube2sky struct {
// Removes 'subdomain' from etcd.
func (ks *kube2sky) removeDNS(subdomain string) error {
glog.V(2).Infof("Removing %s from DNS", subdomain)
resp, err := ks.etcdClient.RawGet(skymsg.Path(subdomain), false, false)
resp, err := ks.etcdClient.RawGet(skymsg.Path(subdomain), false, true)
if err != nil {
return err
}
@ -109,7 +110,7 @@ func (ks *kube2sky) writeSkyRecord(subdomain string, data string) error {
}
// Generates skydns records for a headless service.
func (ks *kube2sky) newHeadlessService(subdomain string, service *kapi.Service) error {
func (ks *kube2sky) newHeadlessService(subdomain string, service *kapi.Service, isNewStyleFormat bool) error {
// Create an A record for every pod in the service.
// This record must be periodically updated.
// Format is as follows:
@ -131,7 +132,7 @@ func (ks *kube2sky) newHeadlessService(subdomain string, service *kapi.Service)
return nil
}
if e, ok := e.(*kapi.Endpoints); ok {
return ks.generateRecordsForHeadlessService(subdomain, e, service)
return ks.generateRecordsForHeadlessService(subdomain, e, service, isNewStyleFormat)
}
return nil
}
@ -146,18 +147,33 @@ func getSkyMsg(ip string, port int) *skymsg.Service {
}
}
func (ks *kube2sky) generateRecordsForHeadlessService(subdomain string, e *kapi.Endpoints, svc *kapi.Service) error {
func (ks *kube2sky) generateRecordsForHeadlessService(subdomain string, e *kapi.Endpoints, svc *kapi.Service, isNewStyleFormat bool) error {
for idx := range e.Subsets {
for subIdx := range e.Subsets[idx].Addresses {
subdomain := buildDNSNameString(subdomain, fmt.Sprintf("%d%d", idx, subIdx))
b, err := json.Marshal(getSkyMsg(e.Subsets[idx].Addresses[subIdx].IP, svc.Spec.Ports[0].Port))
b, err := json.Marshal(getSkyMsg(e.Subsets[idx].Addresses[subIdx].IP, 0))
if err != nil {
return err
}
glog.V(2).Infof("Setting DNS record: %v -> %q\n", subdomain, string(b))
if err := ks.writeSkyRecord(subdomain, string(b)); err != nil {
recordValue := string(b)
recordLabel := getHash(recordValue)
recordKey := buildDNSNameString(subdomain, recordLabel)
glog.V(2).Infof("Setting DNS record: %v -> %q\n", recordKey, recordValue)
if err := ks.writeSkyRecord(recordKey, recordValue); err != nil {
return err
}
if isNewStyleFormat {
for portIdx := range e.Subsets[idx].Ports {
endpointPort := &e.Subsets[idx].Ports[portIdx]
portSegment := buildPortSegmentString(endpointPort.Name, endpointPort.Protocol)
if portSegment != "" {
err := ks.generateSRVRecord(subdomain, portSegment, recordLabel, recordKey, endpointPort.Port)
if err != nil {
return err
}
}
}
}
}
}
@ -183,7 +199,7 @@ func (ks *kube2sky) getServiceFromEndpoints(e *kapi.Endpoints) (*kapi.Service, e
return nil, fmt.Errorf("got a non service object in services store %v", obj)
}
func (ks *kube2sky) addDNSUsingEndpoints(subdomain string, e *kapi.Endpoints) error {
func (ks *kube2sky) addDNSUsingEndpoints(subdomain string, e *kapi.Endpoints, isNewStyleFormat bool) error {
ks.mlock.Lock()
defer ks.mlock.Unlock()
svc, err := ks.getServiceFromEndpoints(e)
@ -198,41 +214,90 @@ func (ks *kube2sky) addDNSUsingEndpoints(subdomain string, e *kapi.Endpoints) er
if err := ks.removeDNS(subdomain); err != nil {
return err
}
return ks.generateRecordsForHeadlessService(subdomain, e, svc)
return ks.generateRecordsForHeadlessService(subdomain, e, svc, isNewStyleFormat)
}
func (ks *kube2sky) handleEndpointAdd(obj interface{}) {
if e, ok := obj.(*kapi.Endpoints); ok {
name := buildDNSNameString(ks.domain, e.Namespace, e.Name)
ks.mutateEtcdOrDie(func() error { return ks.addDNSUsingEndpoints(name, e) })
ks.mutateEtcdOrDie(func() error { return ks.addDNSUsingEndpoints(name, e, false) })
name = buildDNSNameString(ks.domain, serviceSubdomain, e.Namespace, e.Name)
ks.mutateEtcdOrDie(func() error { return ks.addDNSUsingEndpoints(name, e) })
ks.mutateEtcdOrDie(func() error { return ks.addDNSUsingEndpoints(name, e, true) })
}
}
func (ks *kube2sky) generateRecordsForPortalService(subdomain string, service *kapi.Service) error {
for i := range service.Spec.Ports {
b, err := json.Marshal(getSkyMsg(service.Spec.ClusterIP, service.Spec.Ports[i].Port))
func (ks *kube2sky) generateRecordsForPortalService(subdomain string, service *kapi.Service, isNewStyleFormat bool) error {
b, err := json.Marshal(getSkyMsg(service.Spec.ClusterIP, 0))
if err != nil {
return err
}
recordValue := string(b)
recordKey := subdomain
recordLabel := ""
if isNewStyleFormat {
recordLabel = getHash(recordValue)
if err != nil {
return err
}
glog.V(2).Infof("Setting DNS record: %v -> %q\n", subdomain, string(b))
if err := ks.writeSkyRecord(subdomain, string(b)); err != nil {
return err
recordKey = buildDNSNameString(subdomain, recordLabel)
}
glog.V(2).Infof("Setting DNS record: %v -> %q, with recordKey: %v\n", subdomain, recordValue, recordKey)
if err := ks.writeSkyRecord(recordKey, recordValue); err != nil {
return err
}
if !isNewStyleFormat {
return nil
}
// Generate SRV Records
for i := range service.Spec.Ports {
port := &service.Spec.Ports[i]
portSegment := buildPortSegmentString(port.Name, port.Protocol)
if portSegment != "" {
err = ks.generateSRVRecord(subdomain, portSegment, recordLabel, subdomain, port.Port)
if err != nil {
return err
}
}
}
return nil
}
func (ks *kube2sky) addDNS(subdomain string, service *kapi.Service) error {
// buildPortSegmentString returns the SRV name segment "_<port>._<protocol>"
// (protocol lowercased) for a named service port. It returns "" when the
// port has no name — we never invent a random one — or when the protocol
// is unset, since the segment would be malformed.
func buildPortSegmentString(portName string, portProtocol kapi.Protocol) string {
	if portName == "" {
		// we don't create a random name
		return ""
	}
	if portProtocol == "" {
		glog.Errorf("Port Protocol not set. port segment string cannot be created.")
		return ""
	}
	proto := strings.ToLower(string(portProtocol))
	return "_" + portName + "._" + proto
}
// generateSRVRecord writes one SRV record into etcd. The record key is
// built as "<recordName>.<portSegment>.<subdomain>" (buildDNSNameString
// prepends later arguments), and its value is a skydns message pointing
// cName at portNumber.
func (ks *kube2sky) generateSRVRecord(subdomain, portSegment, recordName, cName string, portNumber int) error {
	recordKey := buildDNSNameString(subdomain, portSegment, recordName)
	// Renamed from srv_rec: Go identifiers use mixedCaps, not underscores.
	srvRec, err := json.Marshal(getSkyMsg(cName, portNumber))
	if err != nil {
		return err
	}
	return ks.writeSkyRecord(recordKey, string(srvRec))
}
func (ks *kube2sky) addDNS(subdomain string, service *kapi.Service, isNewStyleFormat bool) error {
if len(service.Spec.Ports) == 0 {
glog.Fatalf("unexpected service with no ports: %v", service)
}
// if ClusterIP is not set, a DNS entry should not be created
if !kapi.IsServiceIPSet(service) {
return ks.newHeadlessService(subdomain, service)
return ks.newHeadlessService(subdomain, service, isNewStyleFormat)
}
return ks.generateRecordsForPortalService(subdomain, service)
return ks.generateRecordsForPortalService(subdomain, service, isNewStyleFormat)
}
// Implements retry logic for arbitrary mutator. Crashes after retrying for
@ -281,9 +346,9 @@ func (ks *kube2sky) newService(obj interface{}) {
if s, ok := obj.(*kapi.Service); ok {
//TODO(artfulcoder) stop adding and deleting old-format string for service
name := buildDNSNameString(ks.domain, s.Namespace, s.Name)
ks.mutateEtcdOrDie(func() error { return ks.addDNS(name, s) })
ks.mutateEtcdOrDie(func() error { return ks.addDNS(name, s, false) })
name = buildDNSNameString(ks.domain, serviceSubdomain, s.Namespace, s.Name)
ks.mutateEtcdOrDie(func() error { return ks.addDNS(name, s) })
ks.mutateEtcdOrDie(func() error { return ks.addDNS(name, s, true) })
}
}
@ -296,6 +361,12 @@ func (ks *kube2sky) removeService(obj interface{}) {
}
}
// updateService reacts to a service-store update by removing the DNS
// records derived from the old object and re-creating records for the
// new one. Both steps delegate to the add/remove handlers.
func (ks *kube2sky) updateService(oldObj, newObj interface{}) {
	// TODO: Avoid unwanted updates.
	ks.removeService(oldObj)
	ks.newService(newObj)
}
func newEtcdClient(etcdServer string) (*etcd.Client, error) {
var (
client *etcd.Client
@ -390,10 +461,7 @@ func watchForServices(kubeClient *kclient.Client, ks *kube2sky) kcache.Store {
kframework.ResourceEventHandlerFuncs{
AddFunc: ks.newService,
DeleteFunc: ks.removeService,
UpdateFunc: func(oldObj, newObj interface{}) {
// TODO: Avoid unwanted updates.
ks.newService(newObj)
},
UpdateFunc: ks.updateService,
},
)
go serviceController.Run(util.NeverStop)
@ -418,6 +486,12 @@ func watchEndpoints(kubeClient *kclient.Client, ks *kube2sky) kcache.Store {
return eStore
}
// getHash returns the 32-bit FNV-1a digest of text rendered as
// lowercase hex (no leading-zero padding), used to build stable,
// short record labels from record values.
func getHash(text string) string {
	hasher := fnv.New32a()
	fmt.Fprint(hasher, text)
	return fmt.Sprintf("%x", hasher.Sum32())
}
func main() {
flag.Parse()
var err error

View File

@ -18,6 +18,7 @@ package main
import (
"encoding/json"
"fmt"
"net/http"
"path"
"strings"
@ -51,18 +52,32 @@ func (ec *fakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, er
}
func (ec *fakeEtcdClient) RawGet(key string, sort, recursive bool) (*etcd.RawResponse, error) {
count := 0
for path := range ec.writes {
if strings.HasPrefix(path, key) {
count++
}
}
if count == 0 {
values := ec.Get(key)
if len(values) == 0 {
return &etcd.RawResponse{StatusCode: http.StatusNotFound}, nil
}
return &etcd.RawResponse{StatusCode: http.StatusOK}, nil
}
// Get returns the values of all written entries whose key starts with
// the (lowercased) key and whose path depth — counted as "/" separators —
// is the minimum among the matches. Deeper entries (e.g. SRV sub-records)
// are excluded once a shallower match is seen.
func (ec *fakeEtcdClient) Get(key string) []string {
	key = strings.ToLower(key)
	values := make([]string, 0, 10)
	best := 0 // smallest separator count so far; 0 means "none yet"
	for p, v := range ec.writes {
		if !strings.HasPrefix(p, key) {
			continue
		}
		depth := strings.Count(p, "/")
		switch {
		case best == 0 || depth < best:
			// New shallowest depth: restart the result set.
			best = depth
			values = values[:0]
			values = append(values, v)
		case depth == best:
			values = append(values, v)
		}
	}
	return values
}
const (
testDomain = "cluster.local."
basePath = "/skydns/local/cluster"
@ -87,6 +102,10 @@ func getEtcdNewStylePath(name, namespace string) string {
return path.Join(basePath, serviceSubDomain, namespace, name)
}
// getEtcdPathForSRV returns the etcd key under which an SRV record for
// the given named port is stored: the new-style service path followed by
// "_<protocol>/_<portname>" segments (both lowercased).
func getEtcdPathForSRV(portName, protocol, name, namespace string) string {
	protoSegment := "_" + strings.ToLower(protocol)
	portSegment := "_" + strings.ToLower(portName)
	return path.Join(basePath, serviceSubDomain, namespace, name, protoSegment, portSegment)
}
type hostPort struct {
Host string `json:"host"`
Port int `json:"port"`
@ -107,63 +126,110 @@ func getHostPortFromString(data string) (*hostPort, error) {
func assertDnsServiceEntryInEtcd(t *testing.T, ec *fakeEtcdClient, serviceName, namespace string, expectedHostPort *hostPort) {
oldStyleKey := getEtcdOldStylePath(serviceName, namespace)
val, exists := ec.writes[oldStyleKey]
require.True(t, exists)
actualHostPort, err := getHostPortFromString(val)
values := ec.Get(oldStyleKey)
require.True(t, len(values) > 0, fmt.Sprintf("oldStyleKey '%s' not found.", oldStyleKey))
actualHostPort, err := getHostPortFromString(values[0])
require.NoError(t, err)
assert.Equal(t, actualHostPort, expectedHostPort)
assert.Equal(t, expectedHostPort.Host, actualHostPort.Host)
newStyleKey := getEtcdNewStylePath(serviceName, namespace)
val, exists = ec.writes[newStyleKey]
require.True(t, exists)
actualHostPort, err = getHostPortFromString(val)
values = ec.Get(newStyleKey)
//require.True(t, exists)
require.True(t, len(values) > 0, "newStyleKey entry not found.")
actualHostPort, err = getHostPortFromString(values[0])
require.NoError(t, err)
assert.Equal(t, actualHostPort, expectedHostPort)
assert.Equal(t, expectedHostPort.Host, actualHostPort.Host)
}
func TestHeadlessService(t *testing.T) {
const (
testService = "testService"
testNamespace = "default"
)
ec := &fakeEtcdClient{make(map[string]string)}
k2s := newKube2Sky(ec)
// assertSRVEntryInEtcd checks that exactly expectedEntriesCount SRV
// entries exist for the named port and that each one carries
// expectedPortNumber.
func assertSRVEntryInEtcd(t *testing.T, ec *fakeEtcdClient, portName, protocol, serviceName, namespace string, expectedPortNumber, expectedEntriesCount int) {
	srvKey := getEtcdPathForSRV(portName, protocol, serviceName, namespace)
	entries := ec.Get(srvKey)
	assert.Equal(t, expectedEntriesCount, len(entries))
	for _, entry := range entries {
		hostPort, err := getHostPortFromString(entry)
		require.NoError(t, err)
		assert.Equal(t, expectedPortNumber, hostPort.Port)
	}
}
func newHeadlessService(namespace, serviceName string) kapi.Service {
service := kapi.Service{
ObjectMeta: kapi.ObjectMeta{
Name: testService,
Namespace: testNamespace,
Name: serviceName,
Namespace: namespace,
},
Spec: kapi.ServiceSpec{
ClusterIP: "None",
Ports: []kapi.ServicePort{
{Port: 80},
{Port: 0},
},
},
}
assert.NoError(t, k2s.servicesStore.Add(&service))
return service
}
// newService builds a test Service with the given cluster IP and a
// single TCP port carrying the supplied name and number.
func newService(namespace, serviceName, clusterIP, portName string, portNumber int) kapi.Service {
	singlePort := kapi.ServicePort{Port: portNumber, Name: portName, Protocol: "TCP"}
	return kapi.Service{
		ObjectMeta: kapi.ObjectMeta{
			Namespace: namespace,
			Name:      serviceName,
		},
		Spec: kapi.ServiceSpec{
			ClusterIP: clusterIP,
			Ports:     []kapi.ServicePort{singlePort},
		},
	}
}
// newSubset returns an EndpointSubset with empty (non-nil) address and
// port slices, ready for callers to append to.
func newSubset() kapi.EndpointSubset {
	return kapi.EndpointSubset{
		Addresses: []kapi.EndpointAddress{},
		Ports:     []kapi.EndpointPort{},
	}
}
// newSubsetWithOnePort builds an EndpointSubset exposing one named TCP
// port on each of the given IP addresses.
func newSubsetWithOnePort(portName string, port int, ips ...string) kapi.EndpointSubset {
	subset := newSubset()
	subset.Ports = append(subset.Ports, kapi.EndpointPort{Name: portName, Port: port, Protocol: "TCP"})
	for _, address := range ips {
		subset.Addresses = append(subset.Addresses, kapi.EndpointAddress{IP: address})
	}
	return subset
}
// newSubsetWithTwoPorts builds an EndpointSubset exposing two named TCP
// ports on each of the given IP addresses.
func newSubsetWithTwoPorts(portName1 string, portNumber1 int, portName2 string, portNumber2 int, ips ...string) kapi.EndpointSubset {
	subset := newSubsetWithOnePort(portName1, portNumber1, ips...)
	second := kapi.EndpointPort{Name: portName2, Port: portNumber2, Protocol: "TCP"}
	subset.Ports = append(subset.Ports, second)
	return subset
}
func newEndpoints(service kapi.Service, subsets ...kapi.EndpointSubset) kapi.Endpoints {
endpoints := kapi.Endpoints{
ObjectMeta: service.ObjectMeta,
Subsets: []kapi.EndpointSubset{
{
Addresses: []kapi.EndpointAddress{
{IP: "10.0.0.1"},
{IP: "10.0.0.2"},
},
Ports: []kapi.EndpointPort{
{Port: 80},
},
},
{
Addresses: []kapi.EndpointAddress{
{IP: "10.0.0.3"},
{IP: "10.0.0.4"},
},
Ports: []kapi.EndpointPort{
{Port: 8080},
},
},
},
Subsets: []kapi.EndpointSubset{},
}
for _, subset := range subsets {
endpoints.Subsets = append(endpoints.Subsets, subset)
}
return endpoints
}
func TestHeadlessService(t *testing.T) {
const (
testService = "testservice"
testNamespace = "default"
)
ec := &fakeEtcdClient{make(map[string]string)}
k2s := newKube2Sky(ec)
service := newHeadlessService(testNamespace, testService)
assert.NoError(t, k2s.servicesStore.Add(&service))
endpoints := newEndpoints(service, newSubsetWithOnePort("", 80, "10.0.0.1", "10.0.0.2"), newSubsetWithOnePort("", 8080, "10.0.0.3", "10.0.0.4"))
// We expect 4 records with "svc" subdomain and 4 records without
// "svc" subdomain.
expectedDNSRecords := 8
@ -174,54 +240,56 @@ func TestHeadlessService(t *testing.T) {
assert.Empty(t, ec.writes)
}
func TestHeadlessServiceEndpointsUpdate(t *testing.T) {
func TestHeadlessServiceWithNamedPorts(t *testing.T) {
const (
testService = "testService"
testService = "testservice"
testNamespace = "default"
)
ec := &fakeEtcdClient{make(map[string]string)}
k2s := newKube2Sky(ec)
service := kapi.Service{
ObjectMeta: kapi.ObjectMeta{
Name: testService,
Namespace: testNamespace,
},
Spec: kapi.ServiceSpec{
ClusterIP: "None",
Ports: []kapi.ServicePort{
{Port: 80},
},
},
}
service := newHeadlessService(testNamespace, testService)
assert.NoError(t, k2s.servicesStore.Add(&service))
endpoints := kapi.Endpoints{
ObjectMeta: service.ObjectMeta,
Subsets: []kapi.EndpointSubset{
{
Addresses: []kapi.EndpointAddress{
{IP: "10.0.0.1"},
{IP: "10.0.0.2"},
},
Ports: []kapi.EndpointPort{
{Port: 80},
},
},
},
}
endpoints := newEndpoints(service, newSubsetWithTwoPorts("http1", 80, "http2", 81, "10.0.0.1", "10.0.0.2"), newSubsetWithOnePort("https", 443, "10.0.0.3", "10.0.0.4"))
// We expect 14 records. 6 SRV records. 4 POD entries with old style, 4 POD entries with new style
// "svc" subdomain.
expectedDNSRecords := 14
assert.NoError(t, k2s.endpointsStore.Add(&endpoints))
k2s.newService(&service)
assert.Equal(t, expectedDNSRecords, len(ec.writes))
assertSRVEntryInEtcd(t, ec, "http1", "tcp", testService, testNamespace, 80, 2)
assertSRVEntryInEtcd(t, ec, "http2", "tcp", testService, testNamespace, 81, 2)
assertSRVEntryInEtcd(t, ec, "https", "tcp", testService, testNamespace, 443, 2)
endpoints.Subsets = endpoints.Subsets[:1]
k2s.handleEndpointAdd(&endpoints)
// We expect 8 records. 4 SRV records. 2 POD entries with old style, 2 POD entries with new style
expectedDNSRecords = 8
assert.Equal(t, expectedDNSRecords, len(ec.writes))
assertSRVEntryInEtcd(t, ec, "http1", "tcp", testService, testNamespace, 80, 2)
assertSRVEntryInEtcd(t, ec, "http2", "tcp", testService, testNamespace, 81, 2)
k2s.removeService(&service)
assert.Empty(t, ec.writes)
}
func TestHeadlessServiceEndpointsUpdate(t *testing.T) {
const (
testService = "testservice"
testNamespace = "default"
)
ec := &fakeEtcdClient{make(map[string]string)}
k2s := newKube2Sky(ec)
service := newHeadlessService(testNamespace, testService)
assert.NoError(t, k2s.servicesStore.Add(&service))
endpoints := newEndpoints(service, newSubsetWithOnePort("", 80, "10.0.0.1", "10.0.0.2"))
expectedDNSRecords := 4
assert.NoError(t, k2s.endpointsStore.Add(&endpoints))
k2s.newService(&service)
assert.Equal(t, expectedDNSRecords, len(ec.writes))
endpoints.Subsets = append(endpoints.Subsets,
kapi.EndpointSubset{
Addresses: []kapi.EndpointAddress{
{IP: "10.0.0.3"},
{IP: "10.0.0.4"},
},
Ports: []kapi.EndpointPort{
{Port: 8080},
},
},
newSubsetWithOnePort("", 8080, "10.0.0.3", "10.0.0.4"),
)
expectedDNSRecords = 8
k2s.handleEndpointAdd(&endpoints)
@ -233,23 +301,12 @@ func TestHeadlessServiceEndpointsUpdate(t *testing.T) {
func TestHeadlessServiceWithDelayedEndpointsAddition(t *testing.T) {
const (
testService = "testService"
testService = "testservice"
testNamespace = "default"
)
ec := &fakeEtcdClient{make(map[string]string)}
k2s := newKube2Sky(ec)
service := kapi.Service{
ObjectMeta: kapi.ObjectMeta{
Name: testService,
Namespace: testNamespace,
},
Spec: kapi.ServiceSpec{
ClusterIP: "None",
Ports: []kapi.ServicePort{
{Port: 80},
},
},
}
service := newHeadlessService(testNamespace, testService)
assert.NoError(t, k2s.servicesStore.Add(&service))
// Headless service DNS records should not be created since
// corresponding endpoints object doesn't exist.
@ -257,29 +314,7 @@ func TestHeadlessServiceWithDelayedEndpointsAddition(t *testing.T) {
assert.Empty(t, ec.writes)
// Add an endpoints object for the service.
endpoints := kapi.Endpoints{
ObjectMeta: service.ObjectMeta,
Subsets: []kapi.EndpointSubset{
{
Addresses: []kapi.EndpointAddress{
{IP: "10.0.0.1"},
{IP: "10.0.0.2"},
},
Ports: []kapi.EndpointPort{
{Port: 80},
},
},
{
Addresses: []kapi.EndpointAddress{
{IP: "10.0.0.3"},
{IP: "10.0.0.4"},
},
Ports: []kapi.EndpointPort{
{Port: 8080},
},
},
},
}
endpoints := newEndpoints(service, newSubsetWithOnePort("", 80, "10.0.0.1", "10.0.0.2"), newSubsetWithOnePort("", 8080, "10.0.0.3", "10.0.0.4"))
// We expect 4 records with "svc" subdomain and 4 records without
// "svc" subdomain.
expectedDNSRecords := 8
@ -292,25 +327,12 @@ func TestHeadlessServiceWithDelayedEndpointsAddition(t *testing.T) {
func TestAddSinglePortService(t *testing.T) {
const (
testService = "testService"
testService = "testservice"
testNamespace = "default"
)
ec := &fakeEtcdClient{make(map[string]string)}
k2s := newKube2Sky(ec)
service := kapi.Service{
ObjectMeta: kapi.ObjectMeta{
Name: testService,
Namespace: testNamespace,
},
Spec: kapi.ServiceSpec{
Ports: []kapi.ServicePort{
{
Port: 80,
},
},
ClusterIP: "1.2.3.4",
},
}
service := newService(testNamespace, testService, "1.2.3.4", "", 0)
k2s.newService(&service)
expectedValue := getHostPort(&service)
assertDnsServiceEntryInEtcd(t, ec, testService, testNamespace, expectedValue)
@ -318,54 +340,29 @@ func TestAddSinglePortService(t *testing.T) {
func TestUpdateSinglePortService(t *testing.T) {
const (
testService = "testService"
testService = "testservice"
testNamespace = "default"
)
ec := &fakeEtcdClient{make(map[string]string)}
k2s := newKube2Sky(ec)
service := kapi.Service{
ObjectMeta: kapi.ObjectMeta{
Name: testService,
Namespace: testNamespace,
},
Spec: kapi.ServiceSpec{
Ports: []kapi.ServicePort{
{
Port: 80,
},
},
ClusterIP: "1.2.3.4",
},
}
service := newService(testNamespace, testService, "1.2.3.4", "", 0)
k2s.newService(&service)
assert.Len(t, ec.writes, 2)
service.Spec.ClusterIP = "0.0.0.0"
k2s.newService(&service)
expectedValue := getHostPort(&service)
newService := service
newService.Spec.ClusterIP = "0.0.0.0"
k2s.updateService(&service, &newService)
expectedValue := getHostPort(&newService)
assertDnsServiceEntryInEtcd(t, ec, testService, testNamespace, expectedValue)
}
func TestDeleteSinglePortService(t *testing.T) {
const (
testService = "testService"
testService = "testservice"
testNamespace = "default"
)
ec := &fakeEtcdClient{make(map[string]string)}
k2s := newKube2Sky(ec)
service := kapi.Service{
ObjectMeta: kapi.ObjectMeta{
Name: testService,
Namespace: testNamespace,
},
Spec: kapi.ServiceSpec{
Ports: []kapi.ServicePort{
{
Port: 80,
},
},
ClusterIP: "1.2.3.4",
},
}
service := newService(testNamespace, testService, "1.2.3.4", "", 80)
// Add the service
k2s.newService(&service)
// two entries should get created, one with the svc subdomain (new-style)
@ -376,6 +373,36 @@ func TestDeleteSinglePortService(t *testing.T) {
assert.Empty(t, ec.writes)
}
func TestServiceWithNamePort(t *testing.T) {
const (
testService = "testservice"
testNamespace = "default"
)
ec := &fakeEtcdClient{make(map[string]string)}
k2s := newKube2Sky(ec)
// create service
service := newService(testNamespace, testService, "1.2.3.4", "http1", 80)
k2s.newService(&service)
expectedValue := getHostPort(&service)
assertDnsServiceEntryInEtcd(t, ec, testService, testNamespace, expectedValue)
assertSRVEntryInEtcd(t, ec, "http1", "tcp", testService, testNamespace, 80, 1)
assert.Len(t, ec.writes, 3)
// update service
newService := service
newService.Spec.Ports[0].Name = "http2"
k2s.updateService(&service, &newService)
expectedValue = getHostPort(&newService)
assertDnsServiceEntryInEtcd(t, ec, testService, testNamespace, expectedValue)
assertSRVEntryInEtcd(t, ec, "http2", "tcp", testService, testNamespace, 80, 1)
assert.Len(t, ec.writes, 3)
// Delete the service
k2s.removeService(&service)
assert.Empty(t, ec.writes)
}
func TestBuildDNSName(t *testing.T) {
expectedDNSName := "name.ns.svc.cluster.local."
assert.Equal(t, expectedDNSName, buildDNSNameString("local.", "cluster", "svc", "ns", "name"))

View File

@ -1,21 +1,21 @@
apiVersion: v1beta3
kind: ReplicationController
metadata:
name: kube-dns-v2
name: kube-dns-v3
namespace: default
labels:
k8s-app: kube-dns-v2
k8s-app: kube-dns-v3
kubernetes.io/cluster-service: "true"
spec:
replicas: {{ pillar['dns_replicas'] }}
selector:
k8s-app: kube-dns
version: v2
version: v3
template:
metadata:
labels:
k8s-app: kube-dns
version: v2
version: v3
kubernetes.io/cluster-service: "true"
spec:
containers:
@ -30,7 +30,7 @@ spec:
- -initial-cluster-token
- skydns-etcd
- name: kube2sky
image: gcr.io/google_containers/kube2sky:1.8
image: gcr.io/google_containers/kube2sky:1.9
args:
# command = "/kube2sky"
- -domain={{ pillar['dns_domain'] }}

View File

@ -18,14 +18,15 @@ package e2e
import (
"fmt"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
"strings"
"time"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
@ -36,6 +37,108 @@ var dnsServiceLableSelector = labels.Set{
"kubernetes.io/cluster-service": "true",
}.AsSelector()
// createDNSPod returns a pod spec with two containers sharing a
// "results" emptyDir volume: "querier" runs probeCmd (writing outcomes
// under /results) and "webserver" serves those results over HTTP on
// port 80 so the test can fetch them through the apiserver proxy.
func createDNSPod(namespace, probeCmd string) *api.Pod {
	// TODO: Consider scraping logs instead of running a webserver.
	webserver := api.Container{
		Name:  "webserver",
		Image: "gcr.io/google_containers/test-webserver",
		Ports: []api.ContainerPort{
			{
				Name:          "http",
				ContainerPort: 80,
			},
		},
		VolumeMounts: []api.VolumeMount{
			{
				Name:      "results",
				MountPath: "/results",
			},
		},
	}
	querier := api.Container{
		Name:    "querier",
		Image:   "gcr.io/google_containers/dnsutils",
		Command: []string{"sh", "-c", probeCmd},
		VolumeMounts: []api.VolumeMount{
			{
				Name:      "results",
				MountPath: "/results",
			},
		},
	}
	return &api.Pod{
		TypeMeta: api.TypeMeta{
			Kind:       "Pod",
			APIVersion: latest.Version,
		},
		ObjectMeta: api.ObjectMeta{
			Name:      "dns-test-" + string(util.NewUUID()),
			Namespace: namespace,
		},
		Spec: api.PodSpec{
			Volumes: []api.Volume{
				{
					Name: "results",
					VolumeSource: api.VolumeSource{
						EmptyDir: &api.EmptyDirVolumeSource{},
					},
				},
			},
			Containers: []api.Container{webserver, querier},
		},
	}
}
// createProbeCommand builds a shell loop that repeatedly (600 times,
// once per second) resolves each name over both UDP and TCP DNS,
// writing "OK" to a per-lookup marker file under /results on success.
// Names beginning with "_" are queried as SRV records, all others as A
// records. It returns the command string and the marker file names.
func createProbeCommand(namesToResolve []string) (string, []string) {
	fileNames := make([]string, 0, len(namesToResolve)*2)
	probeCmd := "for i in `seq 1 600`; do "
	for _, name := range namesToResolve {
		lookup := "A"
		if strings.HasPrefix(name, "_") {
			lookup = "SRV"
		}
		// Resolve by UDP then TCP DNS. Use $$(...) because $(...) is
		// expanded by kubernetes (though this won't expand so should
		// remain a literal, safe > sorry).
		for _, proto := range []string{"udp", "tcp"} {
			digFlag := "+notcp"
			if proto == "tcp" {
				digFlag = "+tcp"
			}
			fileName := fmt.Sprintf("%s@%s", proto, name)
			fileNames = append(fileNames, fileName)
			probeCmd += fmt.Sprintf(`test -n "$$(dig %s +noall +answer +search %s %s)" && echo OK > /results/%s;`, digFlag, name, lookup, fileName)
		}
	}
	probeCmd += "sleep 1; done"
	return probeCmd, fileNames
}
// assertFilesExist polls the pod through the apiserver proxy (every 2s,
// up to 60s) until every file in fileNames can be fetched from fileDir,
// then asserts that none are still missing.
func assertFilesExist(fileNames []string, fileDir string, pod *api.Pod, client *client.Client) {
	var failed []string
	expectNoError(wait.Poll(time.Second*2, time.Second*60, func() (bool, error) {
		failed = []string{}
		for _, name := range fileNames {
			// A fetch error means the probe has not written this file yet.
			if _, err := client.Get().
				Prefix("proxy").
				Resource("pods").
				Namespace(pod.Namespace).
				Name(pod.Name).
				Suffix(fileDir, name).
				Do().Raw(); err != nil {
				failed = append(failed, name)
			}
		}
		if len(failed) != 0 {
			Logf("Lookups using %s failed for: %v\n", pod.Name, failed)
			return false, nil
		}
		return true, nil
	}))
	Expect(len(failed)).To(Equal(0))
}
var _ = Describe("DNS", func() {
f := NewFramework("dns")
@ -71,62 +174,11 @@ var _ = Describe("DNS", func() {
namesToResolve = append(namesToResolve, "metadata")
}
probeCmd := "for i in `seq 1 600`; do "
for _, name := range namesToResolve {
// Resolve by TCP and UDP DNS. Use $$(...) because $(...) is
// expanded by kubernetes (though this won't expand so should
// remain a literal, safe > sorry).
probeCmd += fmt.Sprintf(`test -n "$$(dig +notcp +noall +answer +search %s)" && echo OK > /results/udp@%s;`, name, name)
probeCmd += fmt.Sprintf(`test -n "$$(dig +tcp +noall +answer +search %s)" && echo OK > /results/tcp@%s;`, name, name)
}
probeCmd += "sleep 1; done"
probeCmd, fileNames := createProbeCommand(namesToResolve)
// Run a pod which probes DNS and exposes the results by HTTP.
By("creating a pod to probe DNS")
pod := &api.Pod{
TypeMeta: api.TypeMeta{
Kind: "Pod",
APIVersion: latest.Version,
},
ObjectMeta: api.ObjectMeta{
Name: "dns-test-" + string(util.NewUUID()),
Namespace: f.Namespace.Name,
},
Spec: api.PodSpec{
Volumes: []api.Volume{
{
Name: "results",
VolumeSource: api.VolumeSource{
EmptyDir: &api.EmptyDirVolumeSource{},
},
},
},
Containers: []api.Container{
// TODO: Consider scraping logs instead of running a webserver.
{
Name: "webserver",
Image: "gcr.io/google_containers/test-webserver",
VolumeMounts: []api.VolumeMount{
{
Name: "results",
MountPath: "/results",
},
},
},
{
Name: "querier",
Image: "gcr.io/google_containers/dnsutils",
Command: []string{"sh", "-c", probeCmd},
VolumeMounts: []api.VolumeMount{
{
Name: "results",
MountPath: "/results",
},
},
},
},
},
}
pod := createDNSPod(f.Namespace.Name, probeCmd)
By("submitting the pod to kubernetes")
podClient = f.Client.Pods(f.Namespace.Name)
@ -149,38 +201,13 @@ var _ = Describe("DNS", func() {
// Try to find results for each expected name.
By("looking for the results for each expected name")
var failed []string
expectNoError(wait.Poll(time.Second*2, time.Second*60, func() (bool, error) {
failed = []string{}
for _, name := range namesToResolve {
for _, proto := range []string{"udp", "tcp"} {
testCase := fmt.Sprintf("%s@%s", proto, name)
_, err := f.Client.Get().
Prefix("proxy").
Resource("pods").
Namespace(f.Namespace.Name).
Name(pod.Name).
Suffix("results", testCase).
Do().Raw()
if err != nil {
failed = append(failed, testCase)
}
}
}
if len(failed) == 0 {
return true, nil
}
Logf("Lookups using %s failed for: %v\n", pod.Name, failed)
return false, nil
}))
Expect(len(failed)).To(Equal(0))
assertFilesExist(fileNames, "results", pod, f.Client)
// TODO: probe from the host, too.
Logf("DNS probes using %s succeeded\n", pod.Name)
})
It("should provide DNS for headless services", func() {
It("should provide DNS for services", func() {
if providerIs("vagrant") {
By("Skipping test which is broken for vagrant (See https://github.com/GoogleCloudPlatform/kubernetes/issues/3580)")
return
@ -200,95 +227,66 @@ var _ = Describe("DNS", func() {
// Create a test headless service.
By("Creating a test headless service")
testServiceName := "test-service"
testServiceSelector := map[string]string{
"dns-test": "true",
}
svc := &api.Service{
headlessService := &api.Service{
ObjectMeta: api.ObjectMeta{
Name: testServiceName,
Name: "test-service",
},
Spec: api.ServiceSpec{
ClusterIP: "None",
Ports: []api.ServicePort{
{Port: 80},
{Port: 80, Name: "http", Protocol: "tcp"},
},
Selector: testServiceSelector,
},
}
_, err = f.Client.Services(f.Namespace.Name).Create(svc)
_, err = f.Client.Services(f.Namespace.Name).Create(headlessService)
Expect(err).NotTo(HaveOccurred())
defer func() {
By("deleting the test headless service")
defer GinkgoRecover()
f.Client.Services(f.Namespace.Name).Delete(svc.Name)
f.Client.Services(f.Namespace.Name).Delete(headlessService.Name)
}()
regularService := &api.Service{
ObjectMeta: api.ObjectMeta{
Name: "test-service-2",
},
Spec: api.ServiceSpec{
Ports: []api.ServicePort{
{Port: 80, Name: "http", Protocol: "tcp"},
},
Selector: testServiceSelector,
},
}
_, err = f.Client.Services(f.Namespace.Name).Create(regularService)
Expect(err).NotTo(HaveOccurred())
defer func() {
By("deleting the test service")
defer GinkgoRecover()
f.Client.Services(f.Namespace.Name).Delete(regularService.Name)
}()
// All the names we need to be able to resolve.
// TODO: Create more endpoints and ensure that multiple A records are returned
// for headless service.
namesToResolve := []string{
fmt.Sprintf("%s", testServiceName),
fmt.Sprintf("%s.%s", testServiceName, f.Namespace.Name),
fmt.Sprintf("%s.%s.svc", testServiceName, f.Namespace.Name),
fmt.Sprintf("%s", headlessService.Name),
fmt.Sprintf("%s.%s", headlessService.Name, f.Namespace.Name),
fmt.Sprintf("%s.%s.svc", headlessService.Name, f.Namespace.Name),
fmt.Sprintf("_http._tcp.%s.%s.svc", headlessService.Name, f.Namespace.Name),
fmt.Sprintf("_http._tcp.%s.%s.svc", regularService.Name, f.Namespace.Name),
}
probeCmd := "for i in `seq 1 600`; do "
for _, name := range namesToResolve {
// Resolve by TCP and UDP DNS. Use $$(...) because $(...) is
// expanded by kubernetes (though this won't expand so should
// remain a literal, safe > sorry).
probeCmd += fmt.Sprintf(`test -n "$$(dig +notcp +noall +answer +search %s)" && echo OK > /results/udp@%s;`, name, name)
probeCmd += fmt.Sprintf(`test -n "$$(dig +tcp +noall +answer +search %s)" && echo OK > /results/tcp@%s;`, name, name)
}
probeCmd += "sleep 1; done"
probeCmd, fileNames := createProbeCommand(namesToResolve)
// Run a pod which probes DNS and exposes the results by HTTP.
By("creating a pod to probe DNS")
pod := &api.Pod{
TypeMeta: api.TypeMeta{
Kind: "Pod",
APIVersion: latest.Version,
},
ObjectMeta: api.ObjectMeta{
Name: "dns-test",
Labels: testServiceSelector,
},
Spec: api.PodSpec{
Volumes: []api.Volume{
{
Name: "results",
VolumeSource: api.VolumeSource{
EmptyDir: &api.EmptyDirVolumeSource{},
},
},
},
Containers: []api.Container{
// TODO: Consider scraping logs instead of running a webserver.
{
Name: "webserver",
Image: "gcr.io/google_containers/test-webserver",
VolumeMounts: []api.VolumeMount{
{
Name: "results",
MountPath: "/results",
},
},
},
{
Name: "querier",
Image: "gcr.io/google_containers/dnsutils",
Command: []string{"sh", "-c", probeCmd},
VolumeMounts: []api.VolumeMount{
{
Name: "results",
MountPath: "/results",
},
},
},
},
},
}
pod := createDNSPod(f.Namespace.Name, probeCmd)
pod.ObjectMeta.Labels = testServiceSelector
By("submitting the pod to kubernetes")
podClient = f.Client.Pods(f.Namespace.Name)
@ -311,32 +309,7 @@ var _ = Describe("DNS", func() {
// Try to find results for each expected name.
By("looking for the results for each expected name")
var failed []string
expectNoError(wait.Poll(time.Second*2, time.Second*60, func() (bool, error) {
failed = []string{}
for _, name := range namesToResolve {
for _, proto := range []string{"udp", "tcp"} {
testCase := fmt.Sprintf("%s@%s", proto, name)
_, err := f.Client.Get().
Prefix("proxy").
Resource("pods").
Namespace(f.Namespace.Name).
Name(pod.Name).
Suffix("results", testCase).
Do().Raw()
if err != nil {
failed = append(failed, testCase)
}
}
}
if len(failed) == 0 {
return true, nil
}
Logf("Lookups using %s failed for: %v\n", pod.Name, failed)
return false, nil
}))
Expect(len(failed)).To(Equal(0))
assertFilesExist(fileNames, "results", pod, f.Client)
// TODO: probe from the host, too.