Merge pull request #8643 from vishh/headless

Adding support for generating A records for headless services.
This commit is contained in:
Saad Ali 2015-05-26 14:44:00 -07:00
commit abaa278b0e
7 changed files with 582 additions and 71 deletions

View File

@ -1,3 +1,7 @@
* Fri May 15 2015 Tim Hockin <thockin@google.com>
- First Changelog entry
- Current version is 1.4
## Version 1.7 (May 25 2015 Vishnu Kannan <vishnuk@google.com>)
- Adding support for headless services. All pods backing a headless service is addressible via DNS RR.

View File

@ -4,13 +4,13 @@
.PHONY: all kube2sky container push clean test
TAG = 1.6
TAG = 1.7
PREFIX = gcr.io/google_containers
all: container
kube2sky: kube2sky.go
CGO_ENABLED=0 godep go build -a -installsuffix cgo --ldflags '-w' ./kube2sky.go
GOOS=linux GOARCH=amd64 CGO_ENABLED=0 godep go build -a -installsuffix cgo --ldflags '-w' ./kube2sky.go
container: kube2sky
docker build -t $(PREFIX)/kube2sky:$(TAG) .

View File

@ -21,8 +21,13 @@ example, if this is set to `kubernetes.io`, then a service named "nifty" in the
`-verbose`: Log additional information.
'-etcd_mutation_timeout': For how long the application will keep retrying etcd
`-etcd_mutation_timeout`: For how long the application will keep retrying etcd
mutation (insertion or removal of a dns entry) before giving up and crashing.
`--etcd-server`: The etcd server that is being used by skydns.
`--kube_master_url`: URL of kubernetes master. Reuired if `--kubecfg_file` is not set.
`--kubecfg_file`: Path to kubecfg file that contains the master URL and tokens to authenticate with the master.
[![Analytics](https://kubernetes-site.appspot.com/UA-36037335-10/GitHub/cluster/addons/dns/kube2sky/README.md?pixel)]()

View File

@ -23,17 +23,19 @@ import (
"encoding/json"
"flag"
"fmt"
"net/http"
"net/url"
"os"
"strings"
"sync"
"time"
kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
kclient "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
kcache "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
kclientcmd "github.com/GoogleCloudPlatform/kubernetes/pkg/client/clientcmd"
kclientcmdapi "github.com/GoogleCloudPlatform/kubernetes/pkg/client/clientcmd/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework"
kcontrollerFramework "github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework"
kframework "github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework"
kSelector "github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
tools "github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
@ -56,13 +58,21 @@ const (
maxConnectAttempts = 12
// Resync period for the kube controller loop.
resyncPeriod = 5 * time.Second
// A subdomain added to the user specified domain for all services.
serviceSubdomain = "svc"
)
type etcdClient interface {
Set(path, value string, ttl uint64) (*etcd.Response, error)
RawGet(key string, sort, recursive bool) (*etcd.RawResponse, error)
Delete(path string, recursive bool) (*etcd.Response, error)
}
type nameNamespace struct {
name string
namespace string
}
type kube2sky struct {
// Etcd client.
etcdClient etcdClient
@ -70,43 +80,162 @@ type kube2sky struct {
domain string
// Etcd mutation timeout.
etcdMutationTimeout time.Duration
// A cache that contains all the endpoints in the system.
endpointsStore kcache.Store
// A cache that contains all the servicess in the system.
servicesStore kcache.Store
// Lock for controlling access to headless services.
mlock sync.Mutex
}
func (ks *kube2sky) removeDNS(record string) error {
glog.V(2).Infof("Removing %s from DNS", record)
_, err := ks.etcdClient.Delete(skymsg.Path(record), true)
// Removes 'subdomain' from etcd.
func (ks *kube2sky) removeDNS(subdomain string) error {
glog.V(2).Infof("Removing %s from DNS", subdomain)
resp, err := ks.etcdClient.RawGet(skymsg.Path(subdomain), false, false)
if err != nil {
return err
}
if resp.StatusCode == http.StatusNotFound {
glog.V(2).Infof("Subdomain %q does not exist in etcd", subdomain)
return nil
}
_, err = ks.etcdClient.Delete(skymsg.Path(subdomain), true)
return err
}
func (ks *kube2sky) addDNS(record string, service *kapi.Service) error {
// if PortalIP is not set, a DNS entry should not be created
if !kapi.IsServiceIPSet(service) {
glog.V(1).Infof("Skipping dns record for headless service: %s\n", service.Name)
func (ks *kube2sky) writeSkyRecord(subdomain string, data string) error {
// Set with no TTL, and hope that kubernetes events are accurate.
_, err := ks.etcdClient.Set(skymsg.Path(subdomain), data, uint64(0))
return err
}
// Generates skydns records for a headless service.
func (ks *kube2sky) newHeadlessService(subdomain string, service *kapi.Service) error {
// Create an A record for every pod in the service.
// This record must be periodically updated.
// Format is as follows:
// For a service x, with pods a and b create DNS records,
// a.x.ns.domain. and, b.x.ns.domain.
// TODO: Handle multi-port services.
ks.mlock.Lock()
defer ks.mlock.Unlock()
key, err := kcache.MetaNamespaceKeyFunc(service)
if err != nil {
return err
}
e, exists, err := ks.endpointsStore.GetByKey(key)
if err != nil {
return fmt.Errorf("failed to get endpoints object from endpoints store - %v", err)
}
if !exists {
glog.V(1).Infof("could not find endpoints for service %q in namespace %q. DNS records will be created once endpoints show up.", service.Name, service.Namespace)
return nil
}
if e, ok := e.(*kapi.Endpoints); ok {
return ks.generateRecordsForHeadlessService(subdomain, e, service)
}
return nil
}
for i := range service.Spec.Ports {
svc := skymsg.Service{
Host: service.Spec.PortalIP,
Port: service.Spec.Ports[i].Port,
Priority: 10,
Weight: 10,
Ttl: 30,
func getSkyMsg(ip string, port int) *skymsg.Service {
return &skymsg.Service{
Host: ip,
Port: port,
Priority: 10,
Weight: 10,
Ttl: 30,
}
}
func (ks *kube2sky) generateRecordsForHeadlessService(subdomain string, e *kapi.Endpoints, svc *kapi.Service) error {
for idx := range e.Subsets {
for subIdx := range e.Subsets[idx].Addresses {
subdomain := buildDNSNameString(subdomain, fmt.Sprintf("%d%d", idx, subIdx))
b, err := json.Marshal(getSkyMsg(e.Subsets[idx].Addresses[subIdx].IP, svc.Spec.Ports[0].Port))
if err != nil {
return err
}
glog.V(2).Infof("Setting DNS record: %v -> %q\n", subdomain, string(b))
if err := ks.writeSkyRecord(subdomain, string(b)); err != nil {
return err
}
}
b, err := json.Marshal(svc)
}
return nil
}
func (ks *kube2sky) getServiceFromEndpoints(e *kapi.Endpoints) (*kapi.Service, error) {
key, err := kcache.MetaNamespaceKeyFunc(e)
if err != nil {
return nil, err
}
obj, exists, err := ks.servicesStore.GetByKey(key)
if err != nil {
return nil, fmt.Errorf("failed to get service object from services store - %v", err)
}
if !exists {
glog.V(1).Infof("could not find service for endpoint %q in namespace %q", e.Name, e.Namespace)
return nil, nil
}
if svc, ok := obj.(*kapi.Service); ok {
return svc, nil
}
return nil, fmt.Errorf("got a non service object in services store %v", obj)
}
func (ks *kube2sky) addDNSUsingEndpoints(subdomain string, e *kapi.Endpoints) error {
ks.mlock.Lock()
defer ks.mlock.Unlock()
svc, err := ks.getServiceFromEndpoints(e)
if err != nil {
return err
}
if svc == nil || kapi.IsServiceIPSet(svc) {
// No headless service found corresponding to endpoints object.
return nil
}
// Remove existing DNS entry.
if err := ks.removeDNS(subdomain); err != nil {
return err
}
return ks.generateRecordsForHeadlessService(subdomain, e, svc)
}
func (ks *kube2sky) handleEndpointAdd(obj interface{}) {
if e, ok := obj.(*kapi.Endpoints); ok {
name := buildDNSNameString(ks.domain, e.Namespace, e.Name)
ks.mutateEtcdOrDie(func() error { return ks.addDNSUsingEndpoints(name, e) })
name = buildDNSNameString(ks.domain, serviceSubdomain, e.Namespace, e.Name)
ks.mutateEtcdOrDie(func() error { return ks.addDNSUsingEndpoints(name, e) })
}
}
func (ks *kube2sky) generateRecordsForPortalService(subdomain string, service *kapi.Service) error {
for i := range service.Spec.Ports {
b, err := json.Marshal(getSkyMsg(service.Spec.PortalIP, service.Spec.Ports[i].Port))
if err != nil {
return err
}
// Set with no TTL, and hope that kubernetes events are accurate.
glog.V(2).Infof("Setting DNS record: %v -> %s:%d\n", record, service.Spec.PortalIP, service.Spec.Ports[i].Port)
_, err = ks.etcdClient.Set(skymsg.Path(record), string(b), uint64(0))
if err != nil {
glog.V(2).Infof("Setting DNS record: %v -> %q\n", subdomain, string(b))
if err := ks.writeSkyRecord(subdomain, string(b)); err != nil {
return err
}
}
return nil
}
func (ks *kube2sky) addDNS(subdomain string, service *kapi.Service) error {
if len(service.Spec.Ports) == 0 {
glog.Fatalf("unexpected service with no ports: %v", service)
}
// if PortalIP is not set, a DNS entry should not be created
if !kapi.IsServiceIPSet(service) {
return ks.newHeadlessService(subdomain, service)
}
return ks.generateRecordsForPortalService(subdomain, service)
}
// Implements retry logic for arbitrary mutator. Crashes after retrying for
// etcd_mutation_timeout.
func (ks *kube2sky) mutateEtcdOrDie(mutator func() error) {
@ -127,6 +256,47 @@ func (ks *kube2sky) mutateEtcdOrDie(mutator func() error) {
}
}
func buildDNSNameString(labels ...string) string {
var res string
for _, label := range labels {
if res == "" {
res = label
} else {
res = fmt.Sprintf("%s.%s", label, res)
}
}
return res
}
// Returns a cache.ListWatch that gets all changes to services.
func createServiceLW(kubeClient *kclient.Client) *kcache.ListWatch {
return kcache.NewListWatchFromClient(kubeClient, "services", kapi.NamespaceAll, kSelector.Everything())
}
// Returns a cache.ListWatch that gets all changes to endpoints.
func createEndpointsLW(kubeClient *kclient.Client) *kcache.ListWatch {
return kcache.NewListWatchFromClient(kubeClient, "endpoints", kapi.NamespaceAll, kSelector.Everything())
}
func (ks *kube2sky) newService(obj interface{}) {
if s, ok := obj.(*kapi.Service); ok {
//TODO(artfulcoder) stop adding and deleting old-format string for service
name := buildDNSNameString(ks.domain, s.Namespace, s.Name)
ks.mutateEtcdOrDie(func() error { return ks.addDNS(name, s) })
name = buildDNSNameString(ks.domain, serviceSubdomain, s.Namespace, s.Name)
ks.mutateEtcdOrDie(func() error { return ks.addDNS(name, s) })
}
}
func (ks *kube2sky) removeService(obj interface{}) {
if s, ok := obj.(*kapi.Service); ok {
name := buildDNSNameString(ks.domain, s.Namespace, s.Name)
ks.mutateEtcdOrDie(func() error { return ks.removeDNS(name) })
name = buildDNSNameString(ks.domain, serviceSubdomain, s.Namespace, s.Name)
ks.mutateEtcdOrDie(func() error { return ks.removeDNS(name) })
}
}
func newEtcdClient(etcdServer string) (*etcd.Client, error) {
var (
client *etcd.Client
@ -204,61 +374,52 @@ func newKubeClient() (*kclient.Client, error) {
return kclient.New(config)
}
func buildOldNameString(service, namespace, domain string) string {
return fmt.Sprintf("%s.%s.%s.", service, namespace, domain)
}
func buildNewServiceNameString(service, namespace, domain string) string {
return fmt.Sprintf("%s.%s.svc.%s.", service, namespace, domain)
}
// Returns a cache.ListWatch that gets all changes to services.
func createServiceLW(kubeClient *kclient.Client) *cache.ListWatch {
return cache.NewListWatchFromClient(kubeClient, "services", kapi.NamespaceAll, kSelector.Everything())
}
func (ks *kube2sky) newService(obj interface{}) {
if s, ok := obj.(*kapi.Service); ok {
//TODO(artfulcoder) stop adding and deleting old-format string for service
name := buildOldNameString(s.Name, s.Namespace, ks.domain)
ks.mutateEtcdOrDie(func() error { return ks.addDNS(name, s) })
name1 := buildNewServiceNameString(s.Name, s.Namespace, ks.domain)
ks.mutateEtcdOrDie(func() error { return ks.addDNS(name1, s) })
}
}
func (ks *kube2sky) removeService(obj interface{}) {
if s, ok := obj.(*kapi.Service); ok {
name := buildOldNameString(s.Name, s.Namespace, ks.domain)
ks.mutateEtcdOrDie(func() error { return ks.removeDNS(name) })
name1 := buildNewServiceNameString(s.Name, s.Namespace, ks.domain)
ks.mutateEtcdOrDie(func() error { return ks.removeDNS(name1) })
}
}
func watchForServices(kubeClient *kclient.Client, ks *kube2sky) {
var serviceController *kcontrollerFramework.Controller
_, serviceController = framework.NewInformer(
func watchForServices(kubeClient *kclient.Client, ks *kube2sky) kcache.Store {
serviceStore, serviceController := kframework.NewInformer(
createServiceLW(kubeClient),
&kapi.Service{},
resyncPeriod,
framework.ResourceEventHandlerFuncs{
kframework.ResourceEventHandlerFuncs{
AddFunc: ks.newService,
DeleteFunc: ks.removeService,
UpdateFunc: func(oldObj, newObj interface{}) {
// TODO: Avoid unwanted updates.
ks.newService(newObj)
},
},
)
serviceController.Run(util.NeverStop)
go serviceController.Run(util.NeverStop)
return serviceStore
}
func watchEndpoints(kubeClient *kclient.Client, ks *kube2sky) kcache.Store {
eStore, eController := kframework.NewInformer(
createEndpointsLW(kubeClient),
&kapi.Endpoints{},
resyncPeriod,
kframework.ResourceEventHandlerFuncs{
AddFunc: ks.handleEndpointAdd,
UpdateFunc: func(oldObj, newObj interface{}) {
// TODO: Avoid unwanted updates.
ks.handleEndpointAdd(newObj)
},
},
)
go eController.Run(util.NeverStop)
return eStore
}
func main() {
flag.Parse()
var err error
// TODO: Validate input flags.
domain := *argDomain
if !strings.HasSuffix(domain, ".") {
domain = fmt.Sprintf("%s.", domain)
}
ks := kube2sky{
domain: *argDomain,
domain: domain,
etcdMutationTimeout: *argEtcdMutationTimeout,
}
if ks.etcdClient, err = newEtcdClient(*argEtcdServer); err != nil {
@ -270,5 +431,8 @@ func main() {
glog.Fatalf("Failed to create a kubernetes client: %v", err)
}
watchForServices(kubeClient, &ks)
ks.endpointsStore = watchEndpoints(kubeClient, &ks)
ks.servicesStore = watchForServices(kubeClient, &ks)
select {}
}

View File

@ -18,12 +18,14 @@ package main
import (
"encoding/json"
"net/http"
"path"
"strings"
"testing"
"time"
kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/coreos/go-etcd/etcd"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -34,22 +36,35 @@ type fakeEtcdClient struct {
writes map[string]string
}
func (ec *fakeEtcdClient) Set(path, value string, ttl uint64) (*etcd.Response, error) {
ec.writes[path] = value
func (ec *fakeEtcdClient) Set(key, value string, ttl uint64) (*etcd.Response, error) {
ec.writes[key] = value
return nil, nil
}
func (ec *fakeEtcdClient) Delete(path string, recursive bool) (*etcd.Response, error) {
func (ec *fakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, error) {
for p := range ec.writes {
if (recursive && strings.HasPrefix(p, path)) || (!recursive && p == path) {
if (recursive && strings.HasPrefix(p, key)) || (!recursive && p == key) {
delete(ec.writes, p)
}
}
return nil, nil
}
func (ec *fakeEtcdClient) RawGet(key string, sort, recursive bool) (*etcd.RawResponse, error) {
count := 0
for path := range ec.writes {
if strings.HasPrefix(path, key) {
count++
}
}
if count == 0 {
return &etcd.RawResponse{StatusCode: http.StatusNotFound}, nil
}
return &etcd.RawResponse{StatusCode: http.StatusOK}, nil
}
const (
testDomain = "cluster.local"
testDomain = "cluster.local."
basePath = "/skydns/local/cluster"
serviceSubDomain = "svc"
)
@ -59,6 +74,8 @@ func newKube2Sky(ec etcdClient) *kube2sky {
etcdClient: ec,
domain: testDomain,
etcdMutationTimeout: time.Second,
endpointsStore: cache.NewStore(cache.MetaNamespaceKeyFunc),
servicesStore: cache.NewStore(cache.MetaNamespaceKeyFunc),
}
}
@ -113,14 +130,166 @@ func TestHeadlessService(t *testing.T) {
k2s := newKube2Sky(ec)
service := kapi.Service{
ObjectMeta: kapi.ObjectMeta{
Name: testNamespace,
Name: testService,
Namespace: testNamespace,
},
Spec: kapi.ServiceSpec{
PortalIP: "None",
Ports: []kapi.ServicePort{
{Port: 80},
},
},
}
assert.NoError(t, k2s.servicesStore.Add(&service))
endpoints := kapi.Endpoints{
ObjectMeta: service.ObjectMeta,
Subsets: []kapi.EndpointSubset{
{
Addresses: []kapi.EndpointAddress{
{IP: "10.0.0.1"},
{IP: "10.0.0.2"},
},
Ports: []kapi.EndpointPort{
{Port: 80},
},
},
{
Addresses: []kapi.EndpointAddress{
{IP: "10.0.0.3"},
{IP: "10.0.0.4"},
},
Ports: []kapi.EndpointPort{
{Port: 8080},
},
},
},
}
// We expect 4 records with "svc" subdomain and 4 records without
// "svc" subdomain.
expectedDNSRecords := 8
assert.NoError(t, k2s.endpointsStore.Add(&endpoints))
k2s.newService(&service)
assert.Equal(t, expectedDNSRecords, len(ec.writes))
k2s.removeService(&service)
assert.Empty(t, ec.writes)
}
func TestHeadlessServiceEndpointsUpdate(t *testing.T) {
const (
testService = "testService"
testNamespace = "default"
)
ec := &fakeEtcdClient{make(map[string]string)}
k2s := newKube2Sky(ec)
service := kapi.Service{
ObjectMeta: kapi.ObjectMeta{
Name: testService,
Namespace: testNamespace,
},
Spec: kapi.ServiceSpec{
PortalIP: "None",
Ports: []kapi.ServicePort{
{Port: 80},
},
},
}
assert.NoError(t, k2s.servicesStore.Add(&service))
endpoints := kapi.Endpoints{
ObjectMeta: service.ObjectMeta,
Subsets: []kapi.EndpointSubset{
{
Addresses: []kapi.EndpointAddress{
{IP: "10.0.0.1"},
{IP: "10.0.0.2"},
},
Ports: []kapi.EndpointPort{
{Port: 80},
},
},
},
}
expectedDNSRecords := 4
assert.NoError(t, k2s.endpointsStore.Add(&endpoints))
k2s.newService(&service)
assert.Equal(t, expectedDNSRecords, len(ec.writes))
endpoints.Subsets = append(endpoints.Subsets,
kapi.EndpointSubset{
Addresses: []kapi.EndpointAddress{
{IP: "10.0.0.3"},
{IP: "10.0.0.4"},
},
Ports: []kapi.EndpointPort{
{Port: 8080},
},
},
)
expectedDNSRecords = 8
k2s.handleEndpointAdd(&endpoints)
assert.Equal(t, expectedDNSRecords, len(ec.writes))
k2s.removeService(&service)
assert.Empty(t, ec.writes)
}
func TestHeadlessServiceWithDelayedEndpointsAddition(t *testing.T) {
const (
testService = "testService"
testNamespace = "default"
)
ec := &fakeEtcdClient{make(map[string]string)}
k2s := newKube2Sky(ec)
service := kapi.Service{
ObjectMeta: kapi.ObjectMeta{
Name: testService,
Namespace: testNamespace,
},
Spec: kapi.ServiceSpec{
PortalIP: "None",
Ports: []kapi.ServicePort{
{Port: 80},
},
},
}
assert.NoError(t, k2s.servicesStore.Add(&service))
// Headless service DNS records should not be created since
// corresponding endpoints object doesn't exist.
k2s.newService(&service)
assert.Empty(t, ec.writes)
// Add an endpoints object for the service.
endpoints := kapi.Endpoints{
ObjectMeta: service.ObjectMeta,
Subsets: []kapi.EndpointSubset{
{
Addresses: []kapi.EndpointAddress{
{IP: "10.0.0.1"},
{IP: "10.0.0.2"},
},
Ports: []kapi.EndpointPort{
{Port: 80},
},
},
{
Addresses: []kapi.EndpointAddress{
{IP: "10.0.0.3"},
{IP: "10.0.0.4"},
},
Ports: []kapi.EndpointPort{
{Port: 8080},
},
},
},
}
// We expect 4 records with "svc" subdomain and 4 records without
// "svc" subdomain.
expectedDNSRecords := 8
k2s.handleEndpointAdd(&endpoints)
assert.Equal(t, expectedDNSRecords, len(ec.writes))
}
// TODO: Test service updates for headless services.
// TODO: Test headless service addition with delayed endpoints addition
func TestAddSinglePortService(t *testing.T) {
const (
testService = "testService"
@ -206,3 +375,10 @@ func TestDeleteSinglePortService(t *testing.T) {
k2s.removeService(&service)
assert.Empty(t, ec.writes)
}
func TestBuildDNSName(t *testing.T) {
expectedDNSName := "name.ns.svc.cluster.local."
assert.Equal(t, expectedDNSName, buildDNSNameString("local.", "cluster", "svc", "ns", "name"))
newExpectedDNSName := "00.name.ns.svc.cluster.local."
assert.Equal(t, newExpectedDNSName, buildDNSNameString(expectedDNSName, "00"))
}

View File

@ -30,7 +30,7 @@ spec:
- -initial-cluster-token
- skydns-etcd
- name: kube2sky
image: gcr.io/google_containers/kube2sky:1.6
image: gcr.io/google_containers/kube2sky:1.7
args:
# command = "/kube2sky"
- -domain={{ pillar['dns_domain'] }}

View File

@ -178,4 +178,166 @@ var _ = Describe("DNS", func() {
Logf("DNS probes using %s succeeded\n", pod.Name)
})
It("should provide DNS for headless services", func() {
if providerIs("vagrant") {
By("Skipping test which is broken for vagrant (See https://github.com/GoogleCloudPlatform/kubernetes/issues/3580)")
return
}
podClient := f.Client.Pods(api.NamespaceDefault)
By("Waiting for DNS Service to be Running")
dnsPods, err := podClient.List(dnsServiceLableSelector, fields.Everything())
if err != nil {
Failf("Failed to list all dns service pods")
}
if len(dnsPods.Items) != 1 {
Failf("Unexpected number of pods (%d) matches the label selector %v", len(dnsPods.Items), dnsServiceLableSelector.String())
}
expectNoError(waitForPodRunning(f.Client, dnsPods.Items[0].Name))
// Create a test headless service.
By("Creating a test headless service")
testServiceName := "test-service"
testServiceSelector := map[string]string{
"dns-test": "true",
}
svc := &api.Service{
ObjectMeta: api.ObjectMeta{
Name: testServiceName,
},
Spec: api.ServiceSpec{
PortalIP: "None",
Ports: []api.ServicePort{
{Port: 80},
},
Selector: testServiceSelector,
},
}
_, err = f.Client.Services(f.Namespace.Name).Create(svc)
Expect(err).NotTo(HaveOccurred())
defer func() {
By("deleting the test headless service")
defer GinkgoRecover()
f.Client.Services(f.Namespace.Name).Delete(svc.Name)
}()
// All the names we need to be able to resolve.
// TODO: Create more endpoints and ensure that multiple A records are returned
// for headless service.
namesToResolve := []string{
fmt.Sprintf("%s", testServiceName),
fmt.Sprintf("%s.%s", testServiceName, f.Namespace.Name),
fmt.Sprintf("%s.%s.svc", testServiceName, f.Namespace.Name),
}
probeCmd := "for i in `seq 1 600`; do "
for _, name := range namesToResolve {
// Resolve by TCP and UDP DNS.
probeCmd += fmt.Sprintf(`test -n "$(dig +notcp +noall +answer +search %s)" && echo OK > /results/udp@%s;`, name, name)
probeCmd += fmt.Sprintf(`test -n "$(dig +tcp +noall +answer +search %s)" && echo OK > /results/tcp@%s;`, name, name)
}
probeCmd += "sleep 1; done"
Logf("vishh: 1")
// Run a pod which probes DNS and exposes the results by HTTP.
By("creating a pod to probe DNS")
pod := &api.Pod{
TypeMeta: api.TypeMeta{
Kind: "Pod",
APIVersion: latest.Version,
},
ObjectMeta: api.ObjectMeta{
Name: "dns-test",
Labels: testServiceSelector,
},
Spec: api.PodSpec{
Volumes: []api.Volume{
{
Name: "results",
VolumeSource: api.VolumeSource{
EmptyDir: &api.EmptyDirVolumeSource{},
},
},
},
Containers: []api.Container{
// TODO: Consider scraping logs instead of running a webserver.
{
Name: "webserver",
Image: "gcr.io/google_containers/test-webserver",
VolumeMounts: []api.VolumeMount{
{
Name: "results",
MountPath: "/results",
},
},
},
{
Name: "querier",
Image: "gcr.io/google_containers/dnsutils",
Command: []string{"sh", "-c", probeCmd},
VolumeMounts: []api.VolumeMount{
{
Name: "results",
MountPath: "/results",
},
},
},
},
},
}
By("submitting the pod to kubernetes")
podClient = f.Client.Pods(f.Namespace.Name)
defer func() {
By("deleting the pod")
defer GinkgoRecover()
podClient.Delete(pod.Name, nil)
}()
if _, err := podClient.Create(pod); err != nil {
Failf("Failed to create %s pod: %v", pod.Name, err)
}
expectNoError(f.WaitForPodRunning(pod.Name))
By("retrieving the pod")
pod, err = podClient.Get(pod.Name)
if err != nil {
Failf("Failed to get pod %s: %v", pod.Name, err)
}
// Try to find results for each expected name.
By("looking for the results for each expected name")
var failed []string
expectNoError(wait.Poll(time.Second*2, time.Second*60, func() (bool, error) {
failed = []string{}
for _, name := range namesToResolve {
for _, proto := range []string{"udp", "tcp"} {
testCase := fmt.Sprintf("%s@%s", proto, name)
_, err := f.Client.Get().
Prefix("proxy").
Resource("pods").
Namespace(f.Namespace.Name).
Name(pod.Name).
Suffix("results", testCase).
Do().Raw()
if err != nil {
failed = append(failed, testCase)
}
}
}
if len(failed) == 0 {
return true, nil
}
Logf("Lookups using %s failed for: %v\n", pod.Name, failed)
return false, nil
}))
Expect(len(failed)).To(Equal(0))
// TODO: probe from the host, too.
Logf("DNS probes using %s succeeded\n", pod.Name)
})
})