mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-30 15:05:27 +00:00
SRV record support
This commit is contained in:
parent
03ece333c8
commit
7d0adbcb1c
@ -4,7 +4,7 @@
|
||||
|
||||
.PHONY: all kube2sky container push clean test
|
||||
|
||||
TAG = 1.8
|
||||
TAG = 1.9
|
||||
PREFIX = gcr.io/google_containers
|
||||
|
||||
all: container
|
||||
|
@ -23,6 +23,7 @@ import (
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"hash/fnv"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
@ -56,7 +57,7 @@ const (
|
||||
// Maximum number of attempts to connect to etcd server.
|
||||
maxConnectAttempts = 12
|
||||
// Resync period for the kube controller loop.
|
||||
resyncPeriod = 5 * time.Second
|
||||
resyncPeriod = 30 * time.Minute
|
||||
// A subdomain added to the user specified domain for all services.
|
||||
serviceSubdomain = "svc"
|
||||
)
|
||||
@ -90,7 +91,7 @@ type kube2sky struct {
|
||||
// Removes 'subdomain' from etcd.
|
||||
func (ks *kube2sky) removeDNS(subdomain string) error {
|
||||
glog.V(2).Infof("Removing %s from DNS", subdomain)
|
||||
resp, err := ks.etcdClient.RawGet(skymsg.Path(subdomain), false, false)
|
||||
resp, err := ks.etcdClient.RawGet(skymsg.Path(subdomain), false, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -109,7 +110,7 @@ func (ks *kube2sky) writeSkyRecord(subdomain string, data string) error {
|
||||
}
|
||||
|
||||
// Generates skydns records for a headless service.
|
||||
func (ks *kube2sky) newHeadlessService(subdomain string, service *kapi.Service) error {
|
||||
func (ks *kube2sky) newHeadlessService(subdomain string, service *kapi.Service, isNewStyleFormat bool) error {
|
||||
// Create an A record for every pod in the service.
|
||||
// This record must be periodically updated.
|
||||
// Format is as follows:
|
||||
@ -131,7 +132,7 @@ func (ks *kube2sky) newHeadlessService(subdomain string, service *kapi.Service)
|
||||
return nil
|
||||
}
|
||||
if e, ok := e.(*kapi.Endpoints); ok {
|
||||
return ks.generateRecordsForHeadlessService(subdomain, e, service)
|
||||
return ks.generateRecordsForHeadlessService(subdomain, e, service, isNewStyleFormat)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@ -146,18 +147,33 @@ func getSkyMsg(ip string, port int) *skymsg.Service {
|
||||
}
|
||||
}
|
||||
|
||||
func (ks *kube2sky) generateRecordsForHeadlessService(subdomain string, e *kapi.Endpoints, svc *kapi.Service) error {
|
||||
func (ks *kube2sky) generateRecordsForHeadlessService(subdomain string, e *kapi.Endpoints, svc *kapi.Service, isNewStyleFormat bool) error {
|
||||
for idx := range e.Subsets {
|
||||
for subIdx := range e.Subsets[idx].Addresses {
|
||||
subdomain := buildDNSNameString(subdomain, fmt.Sprintf("%d%d", idx, subIdx))
|
||||
b, err := json.Marshal(getSkyMsg(e.Subsets[idx].Addresses[subIdx].IP, svc.Spec.Ports[0].Port))
|
||||
b, err := json.Marshal(getSkyMsg(e.Subsets[idx].Addresses[subIdx].IP, 0))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
glog.V(2).Infof("Setting DNS record: %v -> %q\n", subdomain, string(b))
|
||||
if err := ks.writeSkyRecord(subdomain, string(b)); err != nil {
|
||||
recordValue := string(b)
|
||||
recordLabel := getHash(recordValue)
|
||||
recordKey := buildDNSNameString(subdomain, recordLabel)
|
||||
|
||||
glog.V(2).Infof("Setting DNS record: %v -> %q\n", recordKey, recordValue)
|
||||
if err := ks.writeSkyRecord(recordKey, recordValue); err != nil {
|
||||
return err
|
||||
}
|
||||
if isNewStyleFormat {
|
||||
for portIdx := range e.Subsets[idx].Ports {
|
||||
endpointPort := &e.Subsets[idx].Ports[portIdx]
|
||||
portSegment := buildPortSegmentString(endpointPort.Name, endpointPort.Protocol)
|
||||
if portSegment != "" {
|
||||
err := ks.generateSRVRecord(subdomain, portSegment, recordLabel, recordKey, endpointPort.Port)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -183,7 +199,7 @@ func (ks *kube2sky) getServiceFromEndpoints(e *kapi.Endpoints) (*kapi.Service, e
|
||||
return nil, fmt.Errorf("got a non service object in services store %v", obj)
|
||||
}
|
||||
|
||||
func (ks *kube2sky) addDNSUsingEndpoints(subdomain string, e *kapi.Endpoints) error {
|
||||
func (ks *kube2sky) addDNSUsingEndpoints(subdomain string, e *kapi.Endpoints, isNewStyleFormat bool) error {
|
||||
ks.mlock.Lock()
|
||||
defer ks.mlock.Unlock()
|
||||
svc, err := ks.getServiceFromEndpoints(e)
|
||||
@ -198,41 +214,90 @@ func (ks *kube2sky) addDNSUsingEndpoints(subdomain string, e *kapi.Endpoints) er
|
||||
if err := ks.removeDNS(subdomain); err != nil {
|
||||
return err
|
||||
}
|
||||
return ks.generateRecordsForHeadlessService(subdomain, e, svc)
|
||||
return ks.generateRecordsForHeadlessService(subdomain, e, svc, isNewStyleFormat)
|
||||
}
|
||||
|
||||
func (ks *kube2sky) handleEndpointAdd(obj interface{}) {
|
||||
if e, ok := obj.(*kapi.Endpoints); ok {
|
||||
name := buildDNSNameString(ks.domain, e.Namespace, e.Name)
|
||||
ks.mutateEtcdOrDie(func() error { return ks.addDNSUsingEndpoints(name, e) })
|
||||
ks.mutateEtcdOrDie(func() error { return ks.addDNSUsingEndpoints(name, e, false) })
|
||||
name = buildDNSNameString(ks.domain, serviceSubdomain, e.Namespace, e.Name)
|
||||
ks.mutateEtcdOrDie(func() error { return ks.addDNSUsingEndpoints(name, e) })
|
||||
ks.mutateEtcdOrDie(func() error { return ks.addDNSUsingEndpoints(name, e, true) })
|
||||
}
|
||||
}
|
||||
|
||||
func (ks *kube2sky) generateRecordsForPortalService(subdomain string, service *kapi.Service) error {
|
||||
for i := range service.Spec.Ports {
|
||||
b, err := json.Marshal(getSkyMsg(service.Spec.ClusterIP, service.Spec.Ports[i].Port))
|
||||
func (ks *kube2sky) generateRecordsForPortalService(subdomain string, service *kapi.Service, isNewStyleFormat bool) error {
|
||||
b, err := json.Marshal(getSkyMsg(service.Spec.ClusterIP, 0))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
recordValue := string(b)
|
||||
recordKey := subdomain
|
||||
recordLabel := ""
|
||||
if isNewStyleFormat {
|
||||
recordLabel = getHash(recordValue)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
glog.V(2).Infof("Setting DNS record: %v -> %q\n", subdomain, string(b))
|
||||
if err := ks.writeSkyRecord(subdomain, string(b)); err != nil {
|
||||
return err
|
||||
recordKey = buildDNSNameString(subdomain, recordLabel)
|
||||
}
|
||||
|
||||
glog.V(2).Infof("Setting DNS record: %v -> %q, with recordKey: %v\n", subdomain, recordValue, recordKey)
|
||||
if err := ks.writeSkyRecord(recordKey, recordValue); err != nil {
|
||||
return err
|
||||
}
|
||||
if !isNewStyleFormat {
|
||||
return nil
|
||||
}
|
||||
// Generate SRV Records
|
||||
for i := range service.Spec.Ports {
|
||||
port := &service.Spec.Ports[i]
|
||||
portSegment := buildPortSegmentString(port.Name, port.Protocol)
|
||||
if portSegment != "" {
|
||||
err = ks.generateSRVRecord(subdomain, portSegment, recordLabel, subdomain, port.Port)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ks *kube2sky) addDNS(subdomain string, service *kapi.Service) error {
|
||||
func buildPortSegmentString(portName string, portProtocol kapi.Protocol) string {
|
||||
if portName == "" {
|
||||
// we don't create a random name
|
||||
return ""
|
||||
}
|
||||
|
||||
if portProtocol == "" {
|
||||
glog.Errorf("Port Protocol not set. port segment string cannot be created.")
|
||||
return ""
|
||||
}
|
||||
|
||||
return fmt.Sprintf("_%s._%s", portName, strings.ToLower(string(portProtocol)))
|
||||
}
|
||||
|
||||
func (ks *kube2sky) generateSRVRecord(subdomain, portSegment, recordName, cName string, portNumber int) error {
|
||||
recordKey := buildDNSNameString(subdomain, portSegment, recordName)
|
||||
srv_rec, err := json.Marshal(getSkyMsg(cName, portNumber))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := ks.writeSkyRecord(recordKey, string(srv_rec)); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ks *kube2sky) addDNS(subdomain string, service *kapi.Service, isNewStyleFormat bool) error {
|
||||
if len(service.Spec.Ports) == 0 {
|
||||
glog.Fatalf("unexpected service with no ports: %v", service)
|
||||
}
|
||||
// if ClusterIP is not set, a DNS entry should not be created
|
||||
if !kapi.IsServiceIPSet(service) {
|
||||
return ks.newHeadlessService(subdomain, service)
|
||||
return ks.newHeadlessService(subdomain, service, isNewStyleFormat)
|
||||
}
|
||||
return ks.generateRecordsForPortalService(subdomain, service)
|
||||
return ks.generateRecordsForPortalService(subdomain, service, isNewStyleFormat)
|
||||
}
|
||||
|
||||
// Implements retry logic for arbitrary mutator. Crashes after retrying for
|
||||
@ -281,9 +346,9 @@ func (ks *kube2sky) newService(obj interface{}) {
|
||||
if s, ok := obj.(*kapi.Service); ok {
|
||||
//TODO(artfulcoder) stop adding and deleting old-format string for service
|
||||
name := buildDNSNameString(ks.domain, s.Namespace, s.Name)
|
||||
ks.mutateEtcdOrDie(func() error { return ks.addDNS(name, s) })
|
||||
ks.mutateEtcdOrDie(func() error { return ks.addDNS(name, s, false) })
|
||||
name = buildDNSNameString(ks.domain, serviceSubdomain, s.Namespace, s.Name)
|
||||
ks.mutateEtcdOrDie(func() error { return ks.addDNS(name, s) })
|
||||
ks.mutateEtcdOrDie(func() error { return ks.addDNS(name, s, true) })
|
||||
}
|
||||
}
|
||||
|
||||
@ -296,6 +361,12 @@ func (ks *kube2sky) removeService(obj interface{}) {
|
||||
}
|
||||
}
|
||||
|
||||
func (ks *kube2sky) updateService(oldObj, newObj interface{}) {
|
||||
// TODO: Avoid unwanted updates.
|
||||
ks.removeService(oldObj)
|
||||
ks.newService(newObj)
|
||||
}
|
||||
|
||||
func newEtcdClient(etcdServer string) (*etcd.Client, error) {
|
||||
var (
|
||||
client *etcd.Client
|
||||
@ -390,10 +461,7 @@ func watchForServices(kubeClient *kclient.Client, ks *kube2sky) kcache.Store {
|
||||
kframework.ResourceEventHandlerFuncs{
|
||||
AddFunc: ks.newService,
|
||||
DeleteFunc: ks.removeService,
|
||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||
// TODO: Avoid unwanted updates.
|
||||
ks.newService(newObj)
|
||||
},
|
||||
UpdateFunc: ks.updateService,
|
||||
},
|
||||
)
|
||||
go serviceController.Run(util.NeverStop)
|
||||
@ -418,6 +486,12 @@ func watchEndpoints(kubeClient *kclient.Client, ks *kube2sky) kcache.Store {
|
||||
return eStore
|
||||
}
|
||||
|
||||
func getHash(text string) string {
|
||||
h := fnv.New32a()
|
||||
h.Write([]byte(text))
|
||||
return fmt.Sprintf("%x", h.Sum32())
|
||||
}
|
||||
|
||||
func main() {
|
||||
flag.Parse()
|
||||
var err error
|
||||
|
@ -18,6 +18,7 @@ package main
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"path"
|
||||
"strings"
|
||||
@ -51,18 +52,32 @@ func (ec *fakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, er
|
||||
}
|
||||
|
||||
func (ec *fakeEtcdClient) RawGet(key string, sort, recursive bool) (*etcd.RawResponse, error) {
|
||||
count := 0
|
||||
for path := range ec.writes {
|
||||
if strings.HasPrefix(path, key) {
|
||||
count++
|
||||
}
|
||||
}
|
||||
if count == 0 {
|
||||
values := ec.Get(key)
|
||||
if len(values) == 0 {
|
||||
return &etcd.RawResponse{StatusCode: http.StatusNotFound}, nil
|
||||
}
|
||||
return &etcd.RawResponse{StatusCode: http.StatusOK}, nil
|
||||
}
|
||||
|
||||
func (ec *fakeEtcdClient) Get(key string) []string {
|
||||
values := make([]string, 0, 10)
|
||||
minSeparatorCount := 0
|
||||
key = strings.ToLower(key)
|
||||
for path := range ec.writes {
|
||||
if strings.HasPrefix(path, key) {
|
||||
separatorCount := strings.Count(path, "/")
|
||||
if minSeparatorCount == 0 || separatorCount < minSeparatorCount {
|
||||
minSeparatorCount = separatorCount
|
||||
values = values[:0]
|
||||
values = append(values, ec.writes[path])
|
||||
} else if separatorCount == minSeparatorCount {
|
||||
values = append(values, ec.writes[path])
|
||||
}
|
||||
}
|
||||
}
|
||||
return values
|
||||
}
|
||||
|
||||
const (
|
||||
testDomain = "cluster.local."
|
||||
basePath = "/skydns/local/cluster"
|
||||
@ -87,6 +102,10 @@ func getEtcdNewStylePath(name, namespace string) string {
|
||||
return path.Join(basePath, serviceSubDomain, namespace, name)
|
||||
}
|
||||
|
||||
func getEtcdPathForSRV(portName, protocol, name, namespace string) string {
|
||||
return path.Join(basePath, serviceSubDomain, namespace, name, fmt.Sprintf("_%s", strings.ToLower(protocol)), fmt.Sprintf("_%s", strings.ToLower(portName)))
|
||||
}
|
||||
|
||||
type hostPort struct {
|
||||
Host string `json:"host"`
|
||||
Port int `json:"port"`
|
||||
@ -107,63 +126,110 @@ func getHostPortFromString(data string) (*hostPort, error) {
|
||||
|
||||
func assertDnsServiceEntryInEtcd(t *testing.T, ec *fakeEtcdClient, serviceName, namespace string, expectedHostPort *hostPort) {
|
||||
oldStyleKey := getEtcdOldStylePath(serviceName, namespace)
|
||||
val, exists := ec.writes[oldStyleKey]
|
||||
require.True(t, exists)
|
||||
actualHostPort, err := getHostPortFromString(val)
|
||||
values := ec.Get(oldStyleKey)
|
||||
require.True(t, len(values) > 0, fmt.Sprintf("oldStyleKey '%s' not found.", oldStyleKey))
|
||||
actualHostPort, err := getHostPortFromString(values[0])
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, actualHostPort, expectedHostPort)
|
||||
assert.Equal(t, expectedHostPort.Host, actualHostPort.Host)
|
||||
|
||||
newStyleKey := getEtcdNewStylePath(serviceName, namespace)
|
||||
val, exists = ec.writes[newStyleKey]
|
||||
require.True(t, exists)
|
||||
actualHostPort, err = getHostPortFromString(val)
|
||||
values = ec.Get(newStyleKey)
|
||||
//require.True(t, exists)
|
||||
require.True(t, len(values) > 0, "newStyleKey entry not found.")
|
||||
actualHostPort, err = getHostPortFromString(values[0])
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, actualHostPort, expectedHostPort)
|
||||
assert.Equal(t, expectedHostPort.Host, actualHostPort.Host)
|
||||
}
|
||||
|
||||
func TestHeadlessService(t *testing.T) {
|
||||
const (
|
||||
testService = "testService"
|
||||
testNamespace = "default"
|
||||
)
|
||||
ec := &fakeEtcdClient{make(map[string]string)}
|
||||
k2s := newKube2Sky(ec)
|
||||
func assertSRVEntryInEtcd(t *testing.T, ec *fakeEtcdClient, portName, protocol, serviceName, namespace string, expectedPortNumber, expectedEntriesCount int) {
|
||||
srvKey := getEtcdPathForSRV(portName, protocol, serviceName, namespace)
|
||||
values := ec.Get(srvKey)
|
||||
assert.Equal(t, expectedEntriesCount, len(values))
|
||||
for i := range values {
|
||||
actualHostPort, err := getHostPortFromString(values[i])
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, expectedPortNumber, actualHostPort.Port)
|
||||
}
|
||||
}
|
||||
|
||||
func newHeadlessService(namespace, serviceName string) kapi.Service {
|
||||
service := kapi.Service{
|
||||
ObjectMeta: kapi.ObjectMeta{
|
||||
Name: testService,
|
||||
Namespace: testNamespace,
|
||||
Name: serviceName,
|
||||
Namespace: namespace,
|
||||
},
|
||||
Spec: kapi.ServiceSpec{
|
||||
ClusterIP: "None",
|
||||
Ports: []kapi.ServicePort{
|
||||
{Port: 80},
|
||||
{Port: 0},
|
||||
},
|
||||
},
|
||||
}
|
||||
assert.NoError(t, k2s.servicesStore.Add(&service))
|
||||
return service
|
||||
}
|
||||
|
||||
func newService(namespace, serviceName, clusterIP, portName string, portNumber int) kapi.Service {
|
||||
service := kapi.Service{
|
||||
ObjectMeta: kapi.ObjectMeta{
|
||||
Name: serviceName,
|
||||
Namespace: namespace,
|
||||
},
|
||||
Spec: kapi.ServiceSpec{
|
||||
ClusterIP: clusterIP,
|
||||
Ports: []kapi.ServicePort{
|
||||
{Port: portNumber, Name: portName, Protocol: "TCP"},
|
||||
},
|
||||
},
|
||||
}
|
||||
return service
|
||||
}
|
||||
|
||||
func newSubset() kapi.EndpointSubset {
|
||||
subset := kapi.EndpointSubset{
|
||||
Addresses: []kapi.EndpointAddress{},
|
||||
Ports: []kapi.EndpointPort{},
|
||||
}
|
||||
return subset
|
||||
}
|
||||
|
||||
func newSubsetWithOnePort(portName string, port int, ips ...string) kapi.EndpointSubset {
|
||||
subset := newSubset()
|
||||
subset.Ports = append(subset.Ports, kapi.EndpointPort{Port: port, Name: portName, Protocol: "TCP"})
|
||||
for _, ip := range ips {
|
||||
subset.Addresses = append(subset.Addresses, kapi.EndpointAddress{IP: ip})
|
||||
}
|
||||
return subset
|
||||
}
|
||||
|
||||
func newSubsetWithTwoPorts(portName1 string, portNumber1 int, portName2 string, portNumber2 int, ips ...string) kapi.EndpointSubset {
|
||||
subset := newSubsetWithOnePort(portName1, portNumber1, ips...)
|
||||
subset.Ports = append(subset.Ports, kapi.EndpointPort{Port: portNumber2, Name: portName2, Protocol: "TCP"})
|
||||
return subset
|
||||
}
|
||||
|
||||
func newEndpoints(service kapi.Service, subsets ...kapi.EndpointSubset) kapi.Endpoints {
|
||||
endpoints := kapi.Endpoints{
|
||||
ObjectMeta: service.ObjectMeta,
|
||||
Subsets: []kapi.EndpointSubset{
|
||||
{
|
||||
Addresses: []kapi.EndpointAddress{
|
||||
{IP: "10.0.0.1"},
|
||||
{IP: "10.0.0.2"},
|
||||
},
|
||||
Ports: []kapi.EndpointPort{
|
||||
{Port: 80},
|
||||
},
|
||||
},
|
||||
{
|
||||
Addresses: []kapi.EndpointAddress{
|
||||
{IP: "10.0.0.3"},
|
||||
{IP: "10.0.0.4"},
|
||||
},
|
||||
Ports: []kapi.EndpointPort{
|
||||
{Port: 8080},
|
||||
},
|
||||
},
|
||||
},
|
||||
Subsets: []kapi.EndpointSubset{},
|
||||
}
|
||||
|
||||
for _, subset := range subsets {
|
||||
endpoints.Subsets = append(endpoints.Subsets, subset)
|
||||
}
|
||||
return endpoints
|
||||
}
|
||||
|
||||
func TestHeadlessService(t *testing.T) {
|
||||
const (
|
||||
testService = "testservice"
|
||||
testNamespace = "default"
|
||||
)
|
||||
ec := &fakeEtcdClient{make(map[string]string)}
|
||||
k2s := newKube2Sky(ec)
|
||||
service := newHeadlessService(testNamespace, testService)
|
||||
assert.NoError(t, k2s.servicesStore.Add(&service))
|
||||
endpoints := newEndpoints(service, newSubsetWithOnePort("", 80, "10.0.0.1", "10.0.0.2"), newSubsetWithOnePort("", 8080, "10.0.0.3", "10.0.0.4"))
|
||||
|
||||
// We expect 4 records with "svc" subdomain and 4 records without
|
||||
// "svc" subdomain.
|
||||
expectedDNSRecords := 8
|
||||
@ -174,54 +240,56 @@ func TestHeadlessService(t *testing.T) {
|
||||
assert.Empty(t, ec.writes)
|
||||
}
|
||||
|
||||
func TestHeadlessServiceEndpointsUpdate(t *testing.T) {
|
||||
func TestHeadlessServiceWithNamedPorts(t *testing.T) {
|
||||
const (
|
||||
testService = "testService"
|
||||
testService = "testservice"
|
||||
testNamespace = "default"
|
||||
)
|
||||
ec := &fakeEtcdClient{make(map[string]string)}
|
||||
k2s := newKube2Sky(ec)
|
||||
service := kapi.Service{
|
||||
ObjectMeta: kapi.ObjectMeta{
|
||||
Name: testService,
|
||||
Namespace: testNamespace,
|
||||
},
|
||||
Spec: kapi.ServiceSpec{
|
||||
ClusterIP: "None",
|
||||
Ports: []kapi.ServicePort{
|
||||
{Port: 80},
|
||||
},
|
||||
},
|
||||
}
|
||||
service := newHeadlessService(testNamespace, testService)
|
||||
assert.NoError(t, k2s.servicesStore.Add(&service))
|
||||
endpoints := kapi.Endpoints{
|
||||
ObjectMeta: service.ObjectMeta,
|
||||
Subsets: []kapi.EndpointSubset{
|
||||
{
|
||||
Addresses: []kapi.EndpointAddress{
|
||||
{IP: "10.0.0.1"},
|
||||
{IP: "10.0.0.2"},
|
||||
},
|
||||
Ports: []kapi.EndpointPort{
|
||||
{Port: 80},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
endpoints := newEndpoints(service, newSubsetWithTwoPorts("http1", 80, "http2", 81, "10.0.0.1", "10.0.0.2"), newSubsetWithOnePort("https", 443, "10.0.0.3", "10.0.0.4"))
|
||||
|
||||
// We expect 14 records. 6 SRV records. 4 POD entries with old style, 4 POD entries with new style
|
||||
// "svc" subdomain.
|
||||
expectedDNSRecords := 14
|
||||
assert.NoError(t, k2s.endpointsStore.Add(&endpoints))
|
||||
k2s.newService(&service)
|
||||
assert.Equal(t, expectedDNSRecords, len(ec.writes))
|
||||
assertSRVEntryInEtcd(t, ec, "http1", "tcp", testService, testNamespace, 80, 2)
|
||||
assertSRVEntryInEtcd(t, ec, "http2", "tcp", testService, testNamespace, 81, 2)
|
||||
assertSRVEntryInEtcd(t, ec, "https", "tcp", testService, testNamespace, 443, 2)
|
||||
|
||||
endpoints.Subsets = endpoints.Subsets[:1]
|
||||
k2s.handleEndpointAdd(&endpoints)
|
||||
// We expect 8 records. 4 SRV records. 2 POD entries with old style, 2 POD entries with new style
|
||||
expectedDNSRecords = 8
|
||||
assert.Equal(t, expectedDNSRecords, len(ec.writes))
|
||||
assertSRVEntryInEtcd(t, ec, "http1", "tcp", testService, testNamespace, 80, 2)
|
||||
assertSRVEntryInEtcd(t, ec, "http2", "tcp", testService, testNamespace, 81, 2)
|
||||
|
||||
k2s.removeService(&service)
|
||||
assert.Empty(t, ec.writes)
|
||||
}
|
||||
|
||||
func TestHeadlessServiceEndpointsUpdate(t *testing.T) {
|
||||
const (
|
||||
testService = "testservice"
|
||||
testNamespace = "default"
|
||||
)
|
||||
ec := &fakeEtcdClient{make(map[string]string)}
|
||||
k2s := newKube2Sky(ec)
|
||||
service := newHeadlessService(testNamespace, testService)
|
||||
assert.NoError(t, k2s.servicesStore.Add(&service))
|
||||
endpoints := newEndpoints(service, newSubsetWithOnePort("", 80, "10.0.0.1", "10.0.0.2"))
|
||||
|
||||
expectedDNSRecords := 4
|
||||
assert.NoError(t, k2s.endpointsStore.Add(&endpoints))
|
||||
k2s.newService(&service)
|
||||
assert.Equal(t, expectedDNSRecords, len(ec.writes))
|
||||
endpoints.Subsets = append(endpoints.Subsets,
|
||||
kapi.EndpointSubset{
|
||||
Addresses: []kapi.EndpointAddress{
|
||||
{IP: "10.0.0.3"},
|
||||
{IP: "10.0.0.4"},
|
||||
},
|
||||
Ports: []kapi.EndpointPort{
|
||||
{Port: 8080},
|
||||
},
|
||||
},
|
||||
newSubsetWithOnePort("", 8080, "10.0.0.3", "10.0.0.4"),
|
||||
)
|
||||
expectedDNSRecords = 8
|
||||
k2s.handleEndpointAdd(&endpoints)
|
||||
@ -233,23 +301,12 @@ func TestHeadlessServiceEndpointsUpdate(t *testing.T) {
|
||||
|
||||
func TestHeadlessServiceWithDelayedEndpointsAddition(t *testing.T) {
|
||||
const (
|
||||
testService = "testService"
|
||||
testService = "testservice"
|
||||
testNamespace = "default"
|
||||
)
|
||||
ec := &fakeEtcdClient{make(map[string]string)}
|
||||
k2s := newKube2Sky(ec)
|
||||
service := kapi.Service{
|
||||
ObjectMeta: kapi.ObjectMeta{
|
||||
Name: testService,
|
||||
Namespace: testNamespace,
|
||||
},
|
||||
Spec: kapi.ServiceSpec{
|
||||
ClusterIP: "None",
|
||||
Ports: []kapi.ServicePort{
|
||||
{Port: 80},
|
||||
},
|
||||
},
|
||||
}
|
||||
service := newHeadlessService(testNamespace, testService)
|
||||
assert.NoError(t, k2s.servicesStore.Add(&service))
|
||||
// Headless service DNS records should not be created since
|
||||
// corresponding endpoints object doesn't exist.
|
||||
@ -257,29 +314,7 @@ func TestHeadlessServiceWithDelayedEndpointsAddition(t *testing.T) {
|
||||
assert.Empty(t, ec.writes)
|
||||
|
||||
// Add an endpoints object for the service.
|
||||
endpoints := kapi.Endpoints{
|
||||
ObjectMeta: service.ObjectMeta,
|
||||
Subsets: []kapi.EndpointSubset{
|
||||
{
|
||||
Addresses: []kapi.EndpointAddress{
|
||||
{IP: "10.0.0.1"},
|
||||
{IP: "10.0.0.2"},
|
||||
},
|
||||
Ports: []kapi.EndpointPort{
|
||||
{Port: 80},
|
||||
},
|
||||
},
|
||||
{
|
||||
Addresses: []kapi.EndpointAddress{
|
||||
{IP: "10.0.0.3"},
|
||||
{IP: "10.0.0.4"},
|
||||
},
|
||||
Ports: []kapi.EndpointPort{
|
||||
{Port: 8080},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
endpoints := newEndpoints(service, newSubsetWithOnePort("", 80, "10.0.0.1", "10.0.0.2"), newSubsetWithOnePort("", 8080, "10.0.0.3", "10.0.0.4"))
|
||||
// We expect 4 records with "svc" subdomain and 4 records without
|
||||
// "svc" subdomain.
|
||||
expectedDNSRecords := 8
|
||||
@ -292,25 +327,12 @@ func TestHeadlessServiceWithDelayedEndpointsAddition(t *testing.T) {
|
||||
|
||||
func TestAddSinglePortService(t *testing.T) {
|
||||
const (
|
||||
testService = "testService"
|
||||
testService = "testservice"
|
||||
testNamespace = "default"
|
||||
)
|
||||
ec := &fakeEtcdClient{make(map[string]string)}
|
||||
k2s := newKube2Sky(ec)
|
||||
service := kapi.Service{
|
||||
ObjectMeta: kapi.ObjectMeta{
|
||||
Name: testService,
|
||||
Namespace: testNamespace,
|
||||
},
|
||||
Spec: kapi.ServiceSpec{
|
||||
Ports: []kapi.ServicePort{
|
||||
{
|
||||
Port: 80,
|
||||
},
|
||||
},
|
||||
ClusterIP: "1.2.3.4",
|
||||
},
|
||||
}
|
||||
service := newService(testNamespace, testService, "1.2.3.4", "", 0)
|
||||
k2s.newService(&service)
|
||||
expectedValue := getHostPort(&service)
|
||||
assertDnsServiceEntryInEtcd(t, ec, testService, testNamespace, expectedValue)
|
||||
@ -318,54 +340,29 @@ func TestAddSinglePortService(t *testing.T) {
|
||||
|
||||
func TestUpdateSinglePortService(t *testing.T) {
|
||||
const (
|
||||
testService = "testService"
|
||||
testService = "testservice"
|
||||
testNamespace = "default"
|
||||
)
|
||||
ec := &fakeEtcdClient{make(map[string]string)}
|
||||
k2s := newKube2Sky(ec)
|
||||
service := kapi.Service{
|
||||
ObjectMeta: kapi.ObjectMeta{
|
||||
Name: testService,
|
||||
Namespace: testNamespace,
|
||||
},
|
||||
Spec: kapi.ServiceSpec{
|
||||
Ports: []kapi.ServicePort{
|
||||
{
|
||||
Port: 80,
|
||||
},
|
||||
},
|
||||
ClusterIP: "1.2.3.4",
|
||||
},
|
||||
}
|
||||
service := newService(testNamespace, testService, "1.2.3.4", "", 0)
|
||||
k2s.newService(&service)
|
||||
assert.Len(t, ec.writes, 2)
|
||||
service.Spec.ClusterIP = "0.0.0.0"
|
||||
k2s.newService(&service)
|
||||
expectedValue := getHostPort(&service)
|
||||
newService := service
|
||||
newService.Spec.ClusterIP = "0.0.0.0"
|
||||
k2s.updateService(&service, &newService)
|
||||
expectedValue := getHostPort(&newService)
|
||||
assertDnsServiceEntryInEtcd(t, ec, testService, testNamespace, expectedValue)
|
||||
}
|
||||
|
||||
func TestDeleteSinglePortService(t *testing.T) {
|
||||
const (
|
||||
testService = "testService"
|
||||
testService = "testservice"
|
||||
testNamespace = "default"
|
||||
)
|
||||
ec := &fakeEtcdClient{make(map[string]string)}
|
||||
k2s := newKube2Sky(ec)
|
||||
service := kapi.Service{
|
||||
ObjectMeta: kapi.ObjectMeta{
|
||||
Name: testService,
|
||||
Namespace: testNamespace,
|
||||
},
|
||||
Spec: kapi.ServiceSpec{
|
||||
Ports: []kapi.ServicePort{
|
||||
{
|
||||
Port: 80,
|
||||
},
|
||||
},
|
||||
ClusterIP: "1.2.3.4",
|
||||
},
|
||||
}
|
||||
service := newService(testNamespace, testService, "1.2.3.4", "", 80)
|
||||
// Add the service
|
||||
k2s.newService(&service)
|
||||
// two entries should get created, one with the svc subdomain (new-style)
|
||||
@ -376,6 +373,36 @@ func TestDeleteSinglePortService(t *testing.T) {
|
||||
assert.Empty(t, ec.writes)
|
||||
}
|
||||
|
||||
func TestServiceWithNamePort(t *testing.T) {
|
||||
const (
|
||||
testService = "testservice"
|
||||
testNamespace = "default"
|
||||
)
|
||||
ec := &fakeEtcdClient{make(map[string]string)}
|
||||
k2s := newKube2Sky(ec)
|
||||
|
||||
// create service
|
||||
service := newService(testNamespace, testService, "1.2.3.4", "http1", 80)
|
||||
k2s.newService(&service)
|
||||
expectedValue := getHostPort(&service)
|
||||
assertDnsServiceEntryInEtcd(t, ec, testService, testNamespace, expectedValue)
|
||||
assertSRVEntryInEtcd(t, ec, "http1", "tcp", testService, testNamespace, 80, 1)
|
||||
assert.Len(t, ec.writes, 3)
|
||||
|
||||
// update service
|
||||
newService := service
|
||||
newService.Spec.Ports[0].Name = "http2"
|
||||
k2s.updateService(&service, &newService)
|
||||
expectedValue = getHostPort(&newService)
|
||||
assertDnsServiceEntryInEtcd(t, ec, testService, testNamespace, expectedValue)
|
||||
assertSRVEntryInEtcd(t, ec, "http2", "tcp", testService, testNamespace, 80, 1)
|
||||
assert.Len(t, ec.writes, 3)
|
||||
|
||||
// Delete the service
|
||||
k2s.removeService(&service)
|
||||
assert.Empty(t, ec.writes)
|
||||
}
|
||||
|
||||
func TestBuildDNSName(t *testing.T) {
|
||||
expectedDNSName := "name.ns.svc.cluster.local."
|
||||
assert.Equal(t, expectedDNSName, buildDNSNameString("local.", "cluster", "svc", "ns", "name"))
|
||||
|
@ -1,21 +1,21 @@
|
||||
apiVersion: v1beta3
|
||||
kind: ReplicationController
|
||||
metadata:
|
||||
name: kube-dns-v2
|
||||
name: kube-dns-v3
|
||||
namespace: default
|
||||
labels:
|
||||
k8s-app: kube-dns-v2
|
||||
k8s-app: kube-dns-v3
|
||||
kubernetes.io/cluster-service: "true"
|
||||
spec:
|
||||
replicas: {{ pillar['dns_replicas'] }}
|
||||
selector:
|
||||
k8s-app: kube-dns
|
||||
version: v2
|
||||
version: v3
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
k8s-app: kube-dns
|
||||
version: v2
|
||||
version: v3
|
||||
kubernetes.io/cluster-service: "true"
|
||||
spec:
|
||||
containers:
|
||||
@ -30,7 +30,7 @@ spec:
|
||||
- -initial-cluster-token
|
||||
- skydns-etcd
|
||||
- name: kube2sky
|
||||
image: gcr.io/google_containers/kube2sky:1.8
|
||||
image: gcr.io/google_containers/kube2sky:1.9
|
||||
args:
|
||||
# command = "/kube2sky"
|
||||
- -domain={{ pillar['dns_domain'] }}
|
||||
|
313
test/e2e/dns.go
313
test/e2e/dns.go
@ -18,14 +18,15 @@ package e2e
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
@ -36,6 +37,108 @@ var dnsServiceLableSelector = labels.Set{
|
||||
"kubernetes.io/cluster-service": "true",
|
||||
}.AsSelector()
|
||||
|
||||
func createDNSPod(namespace, probeCmd string) *api.Pod {
|
||||
pod := &api.Pod{
|
||||
TypeMeta: api.TypeMeta{
|
||||
Kind: "Pod",
|
||||
APIVersion: latest.Version,
|
||||
},
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "dns-test-" + string(util.NewUUID()),
|
||||
Namespace: namespace,
|
||||
},
|
||||
Spec: api.PodSpec{
|
||||
Volumes: []api.Volume{
|
||||
{
|
||||
Name: "results",
|
||||
VolumeSource: api.VolumeSource{
|
||||
EmptyDir: &api.EmptyDirVolumeSource{},
|
||||
},
|
||||
},
|
||||
},
|
||||
Containers: []api.Container{
|
||||
// TODO: Consider scraping logs instead of running a webserver.
|
||||
{
|
||||
Name: "webserver",
|
||||
Image: "gcr.io/google_containers/test-webserver",
|
||||
Ports: []api.ContainerPort{
|
||||
{
|
||||
Name: "http",
|
||||
ContainerPort: 80,
|
||||
},
|
||||
},
|
||||
VolumeMounts: []api.VolumeMount{
|
||||
{
|
||||
Name: "results",
|
||||
MountPath: "/results",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "querier",
|
||||
Image: "gcr.io/google_containers/dnsutils",
|
||||
Command: []string{"sh", "-c", probeCmd},
|
||||
VolumeMounts: []api.VolumeMount{
|
||||
{
|
||||
Name: "results",
|
||||
MountPath: "/results",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
return pod
|
||||
}
|
||||
|
||||
func createProbeCommand(namesToResolve []string) (string, []string) {
|
||||
fileNames := make([]string, 0, len(namesToResolve)*2)
|
||||
probeCmd := "for i in `seq 1 600`; do "
|
||||
for _, name := range namesToResolve {
|
||||
// Resolve by TCP and UDP DNS. Use $$(...) because $(...) is
|
||||
// expanded by kubernetes (though this won't expand so should
|
||||
// remain a literal, safe > sorry).
|
||||
lookup := "A"
|
||||
if strings.HasPrefix(name, "_") {
|
||||
lookup = "SRV"
|
||||
}
|
||||
fileName := fmt.Sprintf("udp@%s", name)
|
||||
fileNames = append(fileNames, fileName)
|
||||
probeCmd += fmt.Sprintf(`test -n "$$(dig +notcp +noall +answer +search %s %s)" && echo OK > /results/%s;`, name, lookup, fileName)
|
||||
fileName = fmt.Sprintf("tcp@%s", name)
|
||||
fileNames = append(fileNames, fileName)
|
||||
probeCmd += fmt.Sprintf(`test -n "$$(dig +tcp +noall +answer +search %s %s)" && echo OK > /results/%s;`, name, lookup, fileName)
|
||||
}
|
||||
probeCmd += "sleep 1; done"
|
||||
return probeCmd, fileNames
|
||||
}
|
||||
|
||||
func assertFilesExist(fileNames []string, fileDir string, pod *api.Pod, client *client.Client) {
|
||||
var failed []string
|
||||
|
||||
expectNoError(wait.Poll(time.Second*2, time.Second*60, func() (bool, error) {
|
||||
failed = []string{}
|
||||
for _, fileName := range fileNames {
|
||||
_, err := client.Get().
|
||||
Prefix("proxy").
|
||||
Resource("pods").
|
||||
Namespace(pod.Namespace).
|
||||
Name(pod.Name).
|
||||
Suffix(fileDir, fileName).
|
||||
Do().Raw()
|
||||
if err != nil {
|
||||
failed = append(failed, fileName)
|
||||
}
|
||||
}
|
||||
if len(failed) == 0 {
|
||||
return true, nil
|
||||
}
|
||||
Logf("Lookups using %s failed for: %v\n", pod.Name, failed)
|
||||
return false, nil
|
||||
}))
|
||||
Expect(len(failed)).To(Equal(0))
|
||||
}
|
||||
|
||||
var _ = Describe("DNS", func() {
|
||||
f := NewFramework("dns")
|
||||
|
||||
@ -71,62 +174,11 @@ var _ = Describe("DNS", func() {
|
||||
namesToResolve = append(namesToResolve, "metadata")
|
||||
}
|
||||
|
||||
probeCmd := "for i in `seq 1 600`; do "
|
||||
for _, name := range namesToResolve {
|
||||
// Resolve by TCP and UDP DNS. Use $$(...) because $(...) is
|
||||
// expanded by kubernetes (though this won't expand so should
|
||||
// remain a literal, safe > sorry).
|
||||
probeCmd += fmt.Sprintf(`test -n "$$(dig +notcp +noall +answer +search %s)" && echo OK > /results/udp@%s;`, name, name)
|
||||
probeCmd += fmt.Sprintf(`test -n "$$(dig +tcp +noall +answer +search %s)" && echo OK > /results/tcp@%s;`, name, name)
|
||||
}
|
||||
probeCmd += "sleep 1; done"
|
||||
probeCmd, fileNames := createProbeCommand(namesToResolve)
|
||||
|
||||
// Run a pod which probes DNS and exposes the results by HTTP.
|
||||
By("creating a pod to probe DNS")
|
||||
pod := &api.Pod{
|
||||
TypeMeta: api.TypeMeta{
|
||||
Kind: "Pod",
|
||||
APIVersion: latest.Version,
|
||||
},
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "dns-test-" + string(util.NewUUID()),
|
||||
Namespace: f.Namespace.Name,
|
||||
},
|
||||
Spec: api.PodSpec{
|
||||
Volumes: []api.Volume{
|
||||
{
|
||||
Name: "results",
|
||||
VolumeSource: api.VolumeSource{
|
||||
EmptyDir: &api.EmptyDirVolumeSource{},
|
||||
},
|
||||
},
|
||||
},
|
||||
Containers: []api.Container{
|
||||
// TODO: Consider scraping logs instead of running a webserver.
|
||||
{
|
||||
Name: "webserver",
|
||||
Image: "gcr.io/google_containers/test-webserver",
|
||||
VolumeMounts: []api.VolumeMount{
|
||||
{
|
||||
Name: "results",
|
||||
MountPath: "/results",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "querier",
|
||||
Image: "gcr.io/google_containers/dnsutils",
|
||||
Command: []string{"sh", "-c", probeCmd},
|
||||
VolumeMounts: []api.VolumeMount{
|
||||
{
|
||||
Name: "results",
|
||||
MountPath: "/results",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
pod := createDNSPod(f.Namespace.Name, probeCmd)
|
||||
|
||||
By("submitting the pod to kubernetes")
|
||||
podClient = f.Client.Pods(f.Namespace.Name)
|
||||
@ -149,38 +201,13 @@ var _ = Describe("DNS", func() {
|
||||
|
||||
// Try to find results for each expected name.
|
||||
By("looking for the results for each expected name")
|
||||
var failed []string
|
||||
|
||||
expectNoError(wait.Poll(time.Second*2, time.Second*60, func() (bool, error) {
|
||||
failed = []string{}
|
||||
for _, name := range namesToResolve {
|
||||
for _, proto := range []string{"udp", "tcp"} {
|
||||
testCase := fmt.Sprintf("%s@%s", proto, name)
|
||||
_, err := f.Client.Get().
|
||||
Prefix("proxy").
|
||||
Resource("pods").
|
||||
Namespace(f.Namespace.Name).
|
||||
Name(pod.Name).
|
||||
Suffix("results", testCase).
|
||||
Do().Raw()
|
||||
if err != nil {
|
||||
failed = append(failed, testCase)
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(failed) == 0 {
|
||||
return true, nil
|
||||
}
|
||||
Logf("Lookups using %s failed for: %v\n", pod.Name, failed)
|
||||
return false, nil
|
||||
}))
|
||||
Expect(len(failed)).To(Equal(0))
|
||||
assertFilesExist(fileNames, "results", pod, f.Client)
|
||||
|
||||
// TODO: probe from the host, too.
|
||||
|
||||
Logf("DNS probes using %s succeeded\n", pod.Name)
|
||||
})
|
||||
It("should provide DNS for headless services", func() {
|
||||
It("should provide DNS for services", func() {
|
||||
if providerIs("vagrant") {
|
||||
By("Skipping test which is broken for vagrant (See https://github.com/GoogleCloudPlatform/kubernetes/issues/3580)")
|
||||
return
|
||||
@ -200,95 +227,66 @@ var _ = Describe("DNS", func() {
|
||||
|
||||
// Create a test headless service.
|
||||
By("Creating a test headless service")
|
||||
testServiceName := "test-service"
|
||||
testServiceSelector := map[string]string{
|
||||
"dns-test": "true",
|
||||
}
|
||||
svc := &api.Service{
|
||||
headlessService := &api.Service{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: testServiceName,
|
||||
Name: "test-service",
|
||||
},
|
||||
Spec: api.ServiceSpec{
|
||||
ClusterIP: "None",
|
||||
Ports: []api.ServicePort{
|
||||
{Port: 80},
|
||||
{Port: 80, Name: "http", Protocol: "tcp"},
|
||||
},
|
||||
Selector: testServiceSelector,
|
||||
},
|
||||
}
|
||||
|
||||
_, err = f.Client.Services(f.Namespace.Name).Create(svc)
|
||||
_, err = f.Client.Services(f.Namespace.Name).Create(headlessService)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
defer func() {
|
||||
By("deleting the test headless service")
|
||||
defer GinkgoRecover()
|
||||
f.Client.Services(f.Namespace.Name).Delete(svc.Name)
|
||||
f.Client.Services(f.Namespace.Name).Delete(headlessService.Name)
|
||||
}()
|
||||
|
||||
regularService := &api.Service{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "test-service-2",
|
||||
},
|
||||
Spec: api.ServiceSpec{
|
||||
Ports: []api.ServicePort{
|
||||
{Port: 80, Name: "http", Protocol: "tcp"},
|
||||
},
|
||||
Selector: testServiceSelector,
|
||||
},
|
||||
}
|
||||
|
||||
_, err = f.Client.Services(f.Namespace.Name).Create(regularService)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
defer func() {
|
||||
By("deleting the test service")
|
||||
defer GinkgoRecover()
|
||||
f.Client.Services(f.Namespace.Name).Delete(regularService.Name)
|
||||
}()
|
||||
|
||||
// All the names we need to be able to resolve.
|
||||
// TODO: Create more endpoints and ensure that multiple A records are returned
|
||||
// for headless service.
|
||||
namesToResolve := []string{
|
||||
fmt.Sprintf("%s", testServiceName),
|
||||
fmt.Sprintf("%s.%s", testServiceName, f.Namespace.Name),
|
||||
fmt.Sprintf("%s.%s.svc", testServiceName, f.Namespace.Name),
|
||||
fmt.Sprintf("%s", headlessService.Name),
|
||||
fmt.Sprintf("%s.%s", headlessService.Name, f.Namespace.Name),
|
||||
fmt.Sprintf("%s.%s.svc", headlessService.Name, f.Namespace.Name),
|
||||
fmt.Sprintf("_http._tcp.%s.%s.svc", headlessService.Name, f.Namespace.Name),
|
||||
fmt.Sprintf("_http._tcp.%s.%s.svc", regularService.Name, f.Namespace.Name),
|
||||
}
|
||||
|
||||
probeCmd := "for i in `seq 1 600`; do "
|
||||
for _, name := range namesToResolve {
|
||||
// Resolve by TCP and UDP DNS. Use $$(...) because $(...) is
|
||||
// expanded by kubernetes (though this won't expand so should
|
||||
// remain a literal, safe > sorry).
|
||||
probeCmd += fmt.Sprintf(`test -n "$$(dig +notcp +noall +answer +search %s)" && echo OK > /results/udp@%s;`, name, name)
|
||||
probeCmd += fmt.Sprintf(`test -n "$$(dig +tcp +noall +answer +search %s)" && echo OK > /results/tcp@%s;`, name, name)
|
||||
}
|
||||
probeCmd += "sleep 1; done"
|
||||
probeCmd, fileNames := createProbeCommand(namesToResolve)
|
||||
// Run a pod which probes DNS and exposes the results by HTTP.
|
||||
By("creating a pod to probe DNS")
|
||||
pod := &api.Pod{
|
||||
TypeMeta: api.TypeMeta{
|
||||
Kind: "Pod",
|
||||
APIVersion: latest.Version,
|
||||
},
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "dns-test",
|
||||
Labels: testServiceSelector,
|
||||
},
|
||||
Spec: api.PodSpec{
|
||||
Volumes: []api.Volume{
|
||||
{
|
||||
Name: "results",
|
||||
VolumeSource: api.VolumeSource{
|
||||
EmptyDir: &api.EmptyDirVolumeSource{},
|
||||
},
|
||||
},
|
||||
},
|
||||
Containers: []api.Container{
|
||||
// TODO: Consider scraping logs instead of running a webserver.
|
||||
{
|
||||
Name: "webserver",
|
||||
Image: "gcr.io/google_containers/test-webserver",
|
||||
VolumeMounts: []api.VolumeMount{
|
||||
{
|
||||
Name: "results",
|
||||
MountPath: "/results",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "querier",
|
||||
Image: "gcr.io/google_containers/dnsutils",
|
||||
Command: []string{"sh", "-c", probeCmd},
|
||||
VolumeMounts: []api.VolumeMount{
|
||||
{
|
||||
Name: "results",
|
||||
MountPath: "/results",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
pod := createDNSPod(f.Namespace.Name, probeCmd)
|
||||
pod.ObjectMeta.Labels = testServiceSelector
|
||||
|
||||
By("submitting the pod to kubernetes")
|
||||
podClient = f.Client.Pods(f.Namespace.Name)
|
||||
@ -311,32 +309,7 @@ var _ = Describe("DNS", func() {
|
||||
|
||||
// Try to find results for each expected name.
|
||||
By("looking for the results for each expected name")
|
||||
var failed []string
|
||||
|
||||
expectNoError(wait.Poll(time.Second*2, time.Second*60, func() (bool, error) {
|
||||
failed = []string{}
|
||||
for _, name := range namesToResolve {
|
||||
for _, proto := range []string{"udp", "tcp"} {
|
||||
testCase := fmt.Sprintf("%s@%s", proto, name)
|
||||
_, err := f.Client.Get().
|
||||
Prefix("proxy").
|
||||
Resource("pods").
|
||||
Namespace(f.Namespace.Name).
|
||||
Name(pod.Name).
|
||||
Suffix("results", testCase).
|
||||
Do().Raw()
|
||||
if err != nil {
|
||||
failed = append(failed, testCase)
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(failed) == 0 {
|
||||
return true, nil
|
||||
}
|
||||
Logf("Lookups using %s failed for: %v\n", pod.Name, failed)
|
||||
return false, nil
|
||||
}))
|
||||
Expect(len(failed)).To(Equal(0))
|
||||
assertFilesExist(fileNames, "results", pod, f.Client)
|
||||
|
||||
// TODO: probe from the host, too.
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user