mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-09 03:57:41 +00:00
Adding support for generating A records for headless services.
This commit is contained in:
parent
fe84643ccd
commit
f0c20e17c5
@ -10,7 +10,7 @@ PREFIX = gcr.io/google_containers
|
|||||||
all: container
|
all: container
|
||||||
|
|
||||||
kube2sky: kube2sky.go
|
kube2sky: kube2sky.go
|
||||||
CGO_ENABLED=0 godep go build -a -installsuffix cgo --ldflags '-w' ./kube2sky.go
|
GOOS=linux GOARCH=amd64 CGO_ENABLED=0 godep go build -a -installsuffix cgo --ldflags '-w' ./kube2sky.go
|
||||||
|
|
||||||
container: kube2sky
|
container: kube2sky
|
||||||
docker build -t $(PREFIX)/kube2sky:$(TAG) .
|
docker build -t $(PREFIX)/kube2sky:$(TAG) .
|
||||||
|
@ -23,17 +23,19 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
kclient "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
kclient "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
|
kcache "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
|
||||||
kclientcmd "github.com/GoogleCloudPlatform/kubernetes/pkg/client/clientcmd"
|
kclientcmd "github.com/GoogleCloudPlatform/kubernetes/pkg/client/clientcmd"
|
||||||
kclientcmdapi "github.com/GoogleCloudPlatform/kubernetes/pkg/client/clientcmd/api"
|
kclientcmdapi "github.com/GoogleCloudPlatform/kubernetes/pkg/client/clientcmd/api"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework"
|
kframework "github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework"
|
||||||
kcontrollerFramework "github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework"
|
|
||||||
kSelector "github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
|
kSelector "github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
|
||||||
tools "github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
|
tools "github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||||
@ -56,13 +58,21 @@ const (
|
|||||||
maxConnectAttempts = 12
|
maxConnectAttempts = 12
|
||||||
// Resync period for the kube controller loop.
|
// Resync period for the kube controller loop.
|
||||||
resyncPeriod = 5 * time.Second
|
resyncPeriod = 5 * time.Second
|
||||||
|
// A subdomain added to the user specified domain for all services.
|
||||||
|
serviceSubdomain = "svc"
|
||||||
)
|
)
|
||||||
|
|
||||||
type etcdClient interface {
|
type etcdClient interface {
|
||||||
Set(path, value string, ttl uint64) (*etcd.Response, error)
|
Set(path, value string, ttl uint64) (*etcd.Response, error)
|
||||||
|
RawGet(key string, sort, recursive bool) (*etcd.RawResponse, error)
|
||||||
Delete(path string, recursive bool) (*etcd.Response, error)
|
Delete(path string, recursive bool) (*etcd.Response, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type nameNamespace struct {
|
||||||
|
name string
|
||||||
|
namespace string
|
||||||
|
}
|
||||||
|
|
||||||
type kube2sky struct {
|
type kube2sky struct {
|
||||||
// Etcd client.
|
// Etcd client.
|
||||||
etcdClient etcdClient
|
etcdClient etcdClient
|
||||||
@ -70,43 +80,162 @@ type kube2sky struct {
|
|||||||
domain string
|
domain string
|
||||||
// Etcd mutation timeout.
|
// Etcd mutation timeout.
|
||||||
etcdMutationTimeout time.Duration
|
etcdMutationTimeout time.Duration
|
||||||
|
// A cache that contains all the endpoints in the system.
|
||||||
|
endpointsStore kcache.Store
|
||||||
|
// A cache that contains all the servicess in the system.
|
||||||
|
servicesStore kcache.Store
|
||||||
|
// Lock for controlling access to headless services.
|
||||||
|
mlock sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ks *kube2sky) removeDNS(record string) error {
|
// Removes 'subdomain' from etcd.
|
||||||
glog.V(2).Infof("Removing %s from DNS", record)
|
func (ks *kube2sky) removeDNS(subdomain string) error {
|
||||||
_, err := ks.etcdClient.Delete(skymsg.Path(record), true)
|
glog.V(2).Infof("Removing %s from DNS", subdomain)
|
||||||
|
resp, err := ks.etcdClient.RawGet(skymsg.Path(subdomain), false, false)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if resp.StatusCode == http.StatusNotFound {
|
||||||
|
glog.V(2).Infof("Subdomain %q does not exist in etcd", subdomain)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
_, err = ks.etcdClient.Delete(skymsg.Path(subdomain), true)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ks *kube2sky) addDNS(record string, service *kapi.Service) error {
|
func (ks *kube2sky) writeSkyRecord(subdomain string, data string) error {
|
||||||
// if PortalIP is not set, a DNS entry should not be created
|
// Set with no TTL, and hope that kubernetes events are accurate.
|
||||||
if !kapi.IsServiceIPSet(service) {
|
_, err := ks.etcdClient.Set(skymsg.Path(subdomain), data, uint64(0))
|
||||||
glog.V(1).Infof("Skipping dns record for headless service: %s\n", service.Name)
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Generates skydns records for a headless service.
|
||||||
|
func (ks *kube2sky) newHeadlessService(subdomain string, service *kapi.Service) error {
|
||||||
|
// Create an A record for every pod in the service.
|
||||||
|
// This record must be periodically updated.
|
||||||
|
// Format is as follows:
|
||||||
|
// For a service x, with pods a and b create DNS records,
|
||||||
|
// a.x.ns.domain. and, b.x.ns.domain.
|
||||||
|
// TODO: Handle multi-port services.
|
||||||
|
ks.mlock.Lock()
|
||||||
|
defer ks.mlock.Unlock()
|
||||||
|
key, err := kcache.MetaNamespaceKeyFunc(service)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
e, exists, err := ks.endpointsStore.GetByKey(key)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to get endpoints object from endpoints store - %v", err)
|
||||||
|
}
|
||||||
|
if !exists {
|
||||||
|
glog.V(1).Infof("could not find endpoints for service %q in namespace %q. DNS records will be created once endpoints show up.", service.Name, service.Namespace)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
if e, ok := e.(*kapi.Endpoints); ok {
|
||||||
|
return ks.generateRecordsForHeadlessService(subdomain, e, service)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
for i := range service.Spec.Ports {
|
func getSkyMsg(ip string, port int) *skymsg.Service {
|
||||||
svc := skymsg.Service{
|
return &skymsg.Service{
|
||||||
Host: service.Spec.PortalIP,
|
Host: ip,
|
||||||
Port: service.Spec.Ports[i].Port,
|
Port: port,
|
||||||
Priority: 10,
|
Priority: 10,
|
||||||
Weight: 10,
|
Weight: 10,
|
||||||
Ttl: 30,
|
Ttl: 30,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ks *kube2sky) generateRecordsForHeadlessService(subdomain string, e *kapi.Endpoints, svc *kapi.Service) error {
|
||||||
|
for idx := range e.Subsets {
|
||||||
|
for subIdx := range e.Subsets[idx].Addresses {
|
||||||
|
subdomain := buildDNSNameString(subdomain, fmt.Sprintf("%d%d", idx, subIdx))
|
||||||
|
b, err := json.Marshal(getSkyMsg(e.Subsets[idx].Addresses[subIdx].IP, svc.Spec.Ports[0].Port))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
glog.V(2).Infof("Setting DNS record: %v -> %q\n", subdomain, string(b))
|
||||||
|
if err := ks.writeSkyRecord(subdomain, string(b)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
b, err := json.Marshal(svc)
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ks *kube2sky) getServiceFromEndpoints(e *kapi.Endpoints) (*kapi.Service, error) {
|
||||||
|
key, err := kcache.MetaNamespaceKeyFunc(e)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
obj, exists, err := ks.servicesStore.GetByKey(key)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to get service object from services store - %v", err)
|
||||||
|
}
|
||||||
|
if !exists {
|
||||||
|
glog.V(1).Infof("could not find service for endpoint %q in namespace %q", e.Name, e.Namespace)
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
if svc, ok := obj.(*kapi.Service); ok {
|
||||||
|
return svc, nil
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("got a non service object in services store %v", obj)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ks *kube2sky) addDNSUsingEndpoints(subdomain string, e *kapi.Endpoints) error {
|
||||||
|
ks.mlock.Lock()
|
||||||
|
defer ks.mlock.Unlock()
|
||||||
|
svc, err := ks.getServiceFromEndpoints(e)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if svc == nil || kapi.IsServiceIPSet(svc) {
|
||||||
|
// No headless service found corresponding to endpoints object.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// Remove existing DNS entry.
|
||||||
|
if err := ks.removeDNS(subdomain); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return ks.generateRecordsForHeadlessService(subdomain, e, svc)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ks *kube2sky) handleEndpointAdd(obj interface{}) {
|
||||||
|
if e, ok := obj.(*kapi.Endpoints); ok {
|
||||||
|
name := buildDNSNameString(ks.domain, e.Namespace, e.Name)
|
||||||
|
ks.mutateEtcdOrDie(func() error { return ks.addDNSUsingEndpoints(name, e) })
|
||||||
|
name = buildDNSNameString(ks.domain, serviceSubdomain, e.Namespace, e.Name)
|
||||||
|
ks.mutateEtcdOrDie(func() error { return ks.addDNSUsingEndpoints(name, e) })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ks *kube2sky) generateRecordsForPortalService(subdomain string, service *kapi.Service) error {
|
||||||
|
for i := range service.Spec.Ports {
|
||||||
|
b, err := json.Marshal(getSkyMsg(service.Spec.PortalIP, service.Spec.Ports[i].Port))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// Set with no TTL, and hope that kubernetes events are accurate.
|
glog.V(2).Infof("Setting DNS record: %v -> %q\n", subdomain, string(b))
|
||||||
glog.V(2).Infof("Setting DNS record: %v -> %s:%d\n", record, service.Spec.PortalIP, service.Spec.Ports[i].Port)
|
if err := ks.writeSkyRecord(subdomain, string(b)); err != nil {
|
||||||
_, err = ks.etcdClient.Set(skymsg.Path(record), string(b), uint64(0))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ks *kube2sky) addDNS(subdomain string, service *kapi.Service) error {
|
||||||
|
if len(service.Spec.Ports) == 0 {
|
||||||
|
glog.Fatalf("unexpected service with no ports: %v", service)
|
||||||
|
}
|
||||||
|
// if PortalIP is not set, a DNS entry should not be created
|
||||||
|
if !kapi.IsServiceIPSet(service) {
|
||||||
|
return ks.newHeadlessService(subdomain, service)
|
||||||
|
}
|
||||||
|
return ks.generateRecordsForPortalService(subdomain, service)
|
||||||
|
}
|
||||||
|
|
||||||
// Implements retry logic for arbitrary mutator. Crashes after retrying for
|
// Implements retry logic for arbitrary mutator. Crashes after retrying for
|
||||||
// etcd_mutation_timeout.
|
// etcd_mutation_timeout.
|
||||||
func (ks *kube2sky) mutateEtcdOrDie(mutator func() error) {
|
func (ks *kube2sky) mutateEtcdOrDie(mutator func() error) {
|
||||||
@ -127,6 +256,47 @@ func (ks *kube2sky) mutateEtcdOrDie(mutator func() error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func buildDNSNameString(labels ...string) string {
|
||||||
|
var res string
|
||||||
|
for _, label := range labels {
|
||||||
|
if res == "" {
|
||||||
|
res = label
|
||||||
|
} else {
|
||||||
|
res = fmt.Sprintf("%s.%s", label, res)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
|
// Returns a cache.ListWatch that gets all changes to services.
|
||||||
|
func createServiceLW(kubeClient *kclient.Client) *kcache.ListWatch {
|
||||||
|
return kcache.NewListWatchFromClient(kubeClient, "services", kapi.NamespaceAll, kSelector.Everything())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Returns a cache.ListWatch that gets all changes to endpoints.
|
||||||
|
func createEndpointsLW(kubeClient *kclient.Client) *kcache.ListWatch {
|
||||||
|
return kcache.NewListWatchFromClient(kubeClient, "endpoints", kapi.NamespaceAll, kSelector.Everything())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ks *kube2sky) newService(obj interface{}) {
|
||||||
|
if s, ok := obj.(*kapi.Service); ok {
|
||||||
|
//TODO(artfulcoder) stop adding and deleting old-format string for service
|
||||||
|
name := buildDNSNameString(ks.domain, s.Namespace, s.Name)
|
||||||
|
ks.mutateEtcdOrDie(func() error { return ks.addDNS(name, s) })
|
||||||
|
name = buildDNSNameString(ks.domain, serviceSubdomain, s.Namespace, s.Name)
|
||||||
|
ks.mutateEtcdOrDie(func() error { return ks.addDNS(name, s) })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ks *kube2sky) removeService(obj interface{}) {
|
||||||
|
if s, ok := obj.(*kapi.Service); ok {
|
||||||
|
name := buildDNSNameString(ks.domain, s.Namespace, s.Name)
|
||||||
|
ks.mutateEtcdOrDie(func() error { return ks.removeDNS(name) })
|
||||||
|
name = buildDNSNameString(ks.domain, serviceSubdomain, s.Namespace, s.Name)
|
||||||
|
ks.mutateEtcdOrDie(func() error { return ks.removeDNS(name) })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func newEtcdClient(etcdServer string) (*etcd.Client, error) {
|
func newEtcdClient(etcdServer string) (*etcd.Client, error) {
|
||||||
var (
|
var (
|
||||||
client *etcd.Client
|
client *etcd.Client
|
||||||
@ -204,61 +374,52 @@ func newKubeClient() (*kclient.Client, error) {
|
|||||||
return kclient.New(config)
|
return kclient.New(config)
|
||||||
}
|
}
|
||||||
|
|
||||||
func buildOldNameString(service, namespace, domain string) string {
|
func watchForServices(kubeClient *kclient.Client, ks *kube2sky) kcache.Store {
|
||||||
return fmt.Sprintf("%s.%s.%s.", service, namespace, domain)
|
serviceStore, serviceController := kframework.NewInformer(
|
||||||
}
|
|
||||||
|
|
||||||
func buildNewServiceNameString(service, namespace, domain string) string {
|
|
||||||
return fmt.Sprintf("%s.%s.svc.%s.", service, namespace, domain)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Returns a cache.ListWatch that gets all changes to services.
|
|
||||||
func createServiceLW(kubeClient *kclient.Client) *cache.ListWatch {
|
|
||||||
return cache.NewListWatchFromClient(kubeClient, "services", kapi.NamespaceAll, kSelector.Everything())
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ks *kube2sky) newService(obj interface{}) {
|
|
||||||
if s, ok := obj.(*kapi.Service); ok {
|
|
||||||
//TODO(artfulcoder) stop adding and deleting old-format string for service
|
|
||||||
name := buildOldNameString(s.Name, s.Namespace, ks.domain)
|
|
||||||
ks.mutateEtcdOrDie(func() error { return ks.addDNS(name, s) })
|
|
||||||
name1 := buildNewServiceNameString(s.Name, s.Namespace, ks.domain)
|
|
||||||
ks.mutateEtcdOrDie(func() error { return ks.addDNS(name1, s) })
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (ks *kube2sky) removeService(obj interface{}) {
|
|
||||||
if s, ok := obj.(*kapi.Service); ok {
|
|
||||||
name := buildOldNameString(s.Name, s.Namespace, ks.domain)
|
|
||||||
ks.mutateEtcdOrDie(func() error { return ks.removeDNS(name) })
|
|
||||||
name1 := buildNewServiceNameString(s.Name, s.Namespace, ks.domain)
|
|
||||||
ks.mutateEtcdOrDie(func() error { return ks.removeDNS(name1) })
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func watchForServices(kubeClient *kclient.Client, ks *kube2sky) {
|
|
||||||
var serviceController *kcontrollerFramework.Controller
|
|
||||||
_, serviceController = framework.NewInformer(
|
|
||||||
createServiceLW(kubeClient),
|
createServiceLW(kubeClient),
|
||||||
&kapi.Service{},
|
&kapi.Service{},
|
||||||
resyncPeriod,
|
resyncPeriod,
|
||||||
framework.ResourceEventHandlerFuncs{
|
kframework.ResourceEventHandlerFuncs{
|
||||||
AddFunc: ks.newService,
|
AddFunc: ks.newService,
|
||||||
DeleteFunc: ks.removeService,
|
DeleteFunc: ks.removeService,
|
||||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||||
|
// TODO: Avoid unwanted updates.
|
||||||
ks.newService(newObj)
|
ks.newService(newObj)
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
serviceController.Run(util.NeverStop)
|
go serviceController.Run(util.NeverStop)
|
||||||
|
return serviceStore
|
||||||
|
}
|
||||||
|
|
||||||
|
func watchEndpoints(kubeClient *kclient.Client, ks *kube2sky) kcache.Store {
|
||||||
|
eStore, eController := kframework.NewInformer(
|
||||||
|
createEndpointsLW(kubeClient),
|
||||||
|
&kapi.Endpoints{},
|
||||||
|
resyncPeriod,
|
||||||
|
kframework.ResourceEventHandlerFuncs{
|
||||||
|
AddFunc: ks.handleEndpointAdd,
|
||||||
|
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||||
|
// TODO: Avoid unwanted updates.
|
||||||
|
ks.handleEndpointAdd(newObj)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
go eController.Run(util.NeverStop)
|
||||||
|
return eStore
|
||||||
}
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
var err error
|
var err error
|
||||||
// TODO: Validate input flags.
|
// TODO: Validate input flags.
|
||||||
|
domain := *argDomain
|
||||||
|
if !strings.HasSuffix(domain, ".") {
|
||||||
|
domain = fmt.Sprintf("%s.", domain)
|
||||||
|
}
|
||||||
ks := kube2sky{
|
ks := kube2sky{
|
||||||
domain: *argDomain,
|
domain: domain,
|
||||||
etcdMutationTimeout: *argEtcdMutationTimeout,
|
etcdMutationTimeout: *argEtcdMutationTimeout,
|
||||||
}
|
}
|
||||||
if ks.etcdClient, err = newEtcdClient(*argEtcdServer); err != nil {
|
if ks.etcdClient, err = newEtcdClient(*argEtcdServer); err != nil {
|
||||||
@ -270,5 +431,8 @@ func main() {
|
|||||||
glog.Fatalf("Failed to create a kubernetes client: %v", err)
|
glog.Fatalf("Failed to create a kubernetes client: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
watchForServices(kubeClient, &ks)
|
ks.endpointsStore = watchEndpoints(kubeClient, &ks)
|
||||||
|
ks.servicesStore = watchForServices(kubeClient, &ks)
|
||||||
|
|
||||||
|
select {}
|
||||||
}
|
}
|
||||||
|
@ -18,12 +18,14 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"net/http"
|
||||||
"path"
|
"path"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
|
||||||
"github.com/coreos/go-etcd/etcd"
|
"github.com/coreos/go-etcd/etcd"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
@ -34,22 +36,35 @@ type fakeEtcdClient struct {
|
|||||||
writes map[string]string
|
writes map[string]string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ec *fakeEtcdClient) Set(path, value string, ttl uint64) (*etcd.Response, error) {
|
func (ec *fakeEtcdClient) Set(key, value string, ttl uint64) (*etcd.Response, error) {
|
||||||
ec.writes[path] = value
|
ec.writes[key] = value
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ec *fakeEtcdClient) Delete(path string, recursive bool) (*etcd.Response, error) {
|
func (ec *fakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, error) {
|
||||||
for p := range ec.writes {
|
for p := range ec.writes {
|
||||||
if (recursive && strings.HasPrefix(p, path)) || (!recursive && p == path) {
|
if (recursive && strings.HasPrefix(p, key)) || (!recursive && p == key) {
|
||||||
delete(ec.writes, p)
|
delete(ec.writes, p)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ec *fakeEtcdClient) RawGet(key string, sort, recursive bool) (*etcd.RawResponse, error) {
|
||||||
|
count := 0
|
||||||
|
for path := range ec.writes {
|
||||||
|
if strings.HasPrefix(path, key) {
|
||||||
|
count++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if count == 0 {
|
||||||
|
return &etcd.RawResponse{StatusCode: http.StatusNotFound}, nil
|
||||||
|
}
|
||||||
|
return &etcd.RawResponse{StatusCode: http.StatusOK}, nil
|
||||||
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
testDomain = "cluster.local"
|
testDomain = "cluster.local."
|
||||||
basePath = "/skydns/local/cluster"
|
basePath = "/skydns/local/cluster"
|
||||||
serviceSubDomain = "svc"
|
serviceSubDomain = "svc"
|
||||||
)
|
)
|
||||||
@ -59,6 +74,8 @@ func newKube2Sky(ec etcdClient) *kube2sky {
|
|||||||
etcdClient: ec,
|
etcdClient: ec,
|
||||||
domain: testDomain,
|
domain: testDomain,
|
||||||
etcdMutationTimeout: time.Second,
|
etcdMutationTimeout: time.Second,
|
||||||
|
endpointsStore: cache.NewStore(cache.MetaNamespaceKeyFunc),
|
||||||
|
servicesStore: cache.NewStore(cache.MetaNamespaceKeyFunc),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -113,14 +130,166 @@ func TestHeadlessService(t *testing.T) {
|
|||||||
k2s := newKube2Sky(ec)
|
k2s := newKube2Sky(ec)
|
||||||
service := kapi.Service{
|
service := kapi.Service{
|
||||||
ObjectMeta: kapi.ObjectMeta{
|
ObjectMeta: kapi.ObjectMeta{
|
||||||
Name: testNamespace,
|
Name: testService,
|
||||||
Namespace: testNamespace,
|
Namespace: testNamespace,
|
||||||
},
|
},
|
||||||
|
Spec: kapi.ServiceSpec{
|
||||||
|
PortalIP: "None",
|
||||||
|
Ports: []kapi.ServicePort{
|
||||||
|
{Port: 80},
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
assert.NoError(t, k2s.servicesStore.Add(&service))
|
||||||
|
endpoints := kapi.Endpoints{
|
||||||
|
ObjectMeta: service.ObjectMeta,
|
||||||
|
Subsets: []kapi.EndpointSubset{
|
||||||
|
{
|
||||||
|
Addresses: []kapi.EndpointAddress{
|
||||||
|
{IP: "10.0.0.1"},
|
||||||
|
{IP: "10.0.0.2"},
|
||||||
|
},
|
||||||
|
Ports: []kapi.EndpointPort{
|
||||||
|
{Port: 80},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Addresses: []kapi.EndpointAddress{
|
||||||
|
{IP: "10.0.0.3"},
|
||||||
|
{IP: "10.0.0.4"},
|
||||||
|
},
|
||||||
|
Ports: []kapi.EndpointPort{
|
||||||
|
{Port: 8080},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
// We expect 4 records with "svc" subdomain and 4 records without
|
||||||
|
// "svc" subdomain.
|
||||||
|
expectedDNSRecords := 8
|
||||||
|
assert.NoError(t, k2s.endpointsStore.Add(&endpoints))
|
||||||
k2s.newService(&service)
|
k2s.newService(&service)
|
||||||
|
assert.Equal(t, expectedDNSRecords, len(ec.writes))
|
||||||
|
k2s.removeService(&service)
|
||||||
assert.Empty(t, ec.writes)
|
assert.Empty(t, ec.writes)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestHeadlessServiceEndpointsUpdate(t *testing.T) {
|
||||||
|
const (
|
||||||
|
testService = "testService"
|
||||||
|
testNamespace = "default"
|
||||||
|
)
|
||||||
|
ec := &fakeEtcdClient{make(map[string]string)}
|
||||||
|
k2s := newKube2Sky(ec)
|
||||||
|
service := kapi.Service{
|
||||||
|
ObjectMeta: kapi.ObjectMeta{
|
||||||
|
Name: testService,
|
||||||
|
Namespace: testNamespace,
|
||||||
|
},
|
||||||
|
Spec: kapi.ServiceSpec{
|
||||||
|
PortalIP: "None",
|
||||||
|
Ports: []kapi.ServicePort{
|
||||||
|
{Port: 80},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
assert.NoError(t, k2s.servicesStore.Add(&service))
|
||||||
|
endpoints := kapi.Endpoints{
|
||||||
|
ObjectMeta: service.ObjectMeta,
|
||||||
|
Subsets: []kapi.EndpointSubset{
|
||||||
|
{
|
||||||
|
Addresses: []kapi.EndpointAddress{
|
||||||
|
{IP: "10.0.0.1"},
|
||||||
|
{IP: "10.0.0.2"},
|
||||||
|
},
|
||||||
|
Ports: []kapi.EndpointPort{
|
||||||
|
{Port: 80},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
expectedDNSRecords := 4
|
||||||
|
assert.NoError(t, k2s.endpointsStore.Add(&endpoints))
|
||||||
|
k2s.newService(&service)
|
||||||
|
assert.Equal(t, expectedDNSRecords, len(ec.writes))
|
||||||
|
endpoints.Subsets = append(endpoints.Subsets,
|
||||||
|
kapi.EndpointSubset{
|
||||||
|
Addresses: []kapi.EndpointAddress{
|
||||||
|
{IP: "10.0.0.3"},
|
||||||
|
{IP: "10.0.0.4"},
|
||||||
|
},
|
||||||
|
Ports: []kapi.EndpointPort{
|
||||||
|
{Port: 8080},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
expectedDNSRecords = 8
|
||||||
|
k2s.handleEndpointAdd(&endpoints)
|
||||||
|
|
||||||
|
assert.Equal(t, expectedDNSRecords, len(ec.writes))
|
||||||
|
k2s.removeService(&service)
|
||||||
|
assert.Empty(t, ec.writes)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHeadlessServiceWithDelayedEndpointsAddition(t *testing.T) {
|
||||||
|
const (
|
||||||
|
testService = "testService"
|
||||||
|
testNamespace = "default"
|
||||||
|
)
|
||||||
|
ec := &fakeEtcdClient{make(map[string]string)}
|
||||||
|
k2s := newKube2Sky(ec)
|
||||||
|
service := kapi.Service{
|
||||||
|
ObjectMeta: kapi.ObjectMeta{
|
||||||
|
Name: testService,
|
||||||
|
Namespace: testNamespace,
|
||||||
|
},
|
||||||
|
Spec: kapi.ServiceSpec{
|
||||||
|
PortalIP: "None",
|
||||||
|
Ports: []kapi.ServicePort{
|
||||||
|
{Port: 80},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
assert.NoError(t, k2s.servicesStore.Add(&service))
|
||||||
|
// Headless service DNS records should not be created since
|
||||||
|
// corresponding endpoints object doesn't exist.
|
||||||
|
k2s.newService(&service)
|
||||||
|
assert.Empty(t, ec.writes)
|
||||||
|
|
||||||
|
// Add an endpoints object for the service.
|
||||||
|
endpoints := kapi.Endpoints{
|
||||||
|
ObjectMeta: service.ObjectMeta,
|
||||||
|
Subsets: []kapi.EndpointSubset{
|
||||||
|
{
|
||||||
|
Addresses: []kapi.EndpointAddress{
|
||||||
|
{IP: "10.0.0.1"},
|
||||||
|
{IP: "10.0.0.2"},
|
||||||
|
},
|
||||||
|
Ports: []kapi.EndpointPort{
|
||||||
|
{Port: 80},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Addresses: []kapi.EndpointAddress{
|
||||||
|
{IP: "10.0.0.3"},
|
||||||
|
{IP: "10.0.0.4"},
|
||||||
|
},
|
||||||
|
Ports: []kapi.EndpointPort{
|
||||||
|
{Port: 8080},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
// We expect 4 records with "svc" subdomain and 4 records without
|
||||||
|
// "svc" subdomain.
|
||||||
|
expectedDNSRecords := 8
|
||||||
|
k2s.handleEndpointAdd(&endpoints)
|
||||||
|
assert.Equal(t, expectedDNSRecords, len(ec.writes))
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Test service updates for headless services.
|
||||||
|
// TODO: Test headless service addition with delayed endpoints addition
|
||||||
|
|
||||||
func TestAddSinglePortService(t *testing.T) {
|
func TestAddSinglePortService(t *testing.T) {
|
||||||
const (
|
const (
|
||||||
testService = "testService"
|
testService = "testService"
|
||||||
@ -206,3 +375,10 @@ func TestDeleteSinglePortService(t *testing.T) {
|
|||||||
k2s.removeService(&service)
|
k2s.removeService(&service)
|
||||||
assert.Empty(t, ec.writes)
|
assert.Empty(t, ec.writes)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestBuildDNSName(t *testing.T) {
|
||||||
|
expectedDNSName := "name.ns.svc.cluster.local."
|
||||||
|
assert.Equal(t, expectedDNSName, buildDNSNameString("local.", "cluster", "svc", "ns", "name"))
|
||||||
|
newExpectedDNSName := "00.name.ns.svc.cluster.local."
|
||||||
|
assert.Equal(t, newExpectedDNSName, buildDNSNameString(expectedDNSName, "00"))
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user