diff --git a/cluster/addons/dns/kube2sky/Makefile b/cluster/addons/dns/kube2sky/Makefile index 3943920487f..23474bc71c8 100644 --- a/cluster/addons/dns/kube2sky/Makefile +++ b/cluster/addons/dns/kube2sky/Makefile @@ -2,7 +2,7 @@ # MAINTAINER: Tim Hockin # If you update this image please bump the tag value before pushing. -.PHONY: all kube2sky container push clean +.PHONY: all kube2sky container push clean test TAG = 1.5 PREFIX = gcr.io/google_containers @@ -20,3 +20,6 @@ push: clean: rm -f kube2sky + +test: clean + godep go test -v --vmodule=*=4 diff --git a/cluster/addons/dns/kube2sky/kube2sky.go b/cluster/addons/dns/kube2sky/kube2sky.go index 531055c9e8e..784986d517f 100644 --- a/cluster/addons/dns/kube2sky/kube2sky.go +++ b/cluster/addons/dns/kube2sky/kube2sky.go @@ -52,17 +52,20 @@ var ( ) const ( - // Maximum number of retries to connect to etcd server. - maxConnectRetries = 12 + // Maximum number of attempts to connect to etcd server. + maxConnectAttempts = 12 // Resync period for the kube controller loop. resyncPeriod = 5 * time.Second ) +type etcdClient interface { + Set(path, value string, ttl uint64) (*etcd.Response, error) + Delete(path string, recursive bool) (*etcd.Response, error) +} + type kube2sky struct { // Etcd client. - etcdClient *etcd.Client - // Kubernetes client. - kubeClient *kclient.Client + etcdClient etcdClient // DNS domain name. domain string // Etcd mutation timeout. @@ -95,7 +98,6 @@ func (ks *kube2sky) addDNS(record string, service *kapi.Service) error { return err } // Set with no TTL, and hope that kubernetes events are accurate. - glog.V(2).Infof("Setting DNS record: %v -> %s:%d\n", record, service.Spec.PortalIP, service.Spec.Ports[i].Port) _, err = ks.etcdClient.Set(skymsg.Path(record), string(b), uint64(0)) if err != nil { @@ -130,17 +132,15 @@ func newEtcdClient(etcdServer string) (*etcd.Client, error) { client *etcd.Client err error ) - retries := maxConnectRetries - for retries > 0 { + for attempt := 1; attempt <= maxConnectAttempts; attempt++ { if _, err = tools.GetEtcdVersion(etcdServer); err == nil { break } - if maxConnectRetries == 1 { + if attempt == maxConnectAttempts { break } - glog.Info("[Attempt: %d] Retrying request after 5 second sleep", retries) + glog.Infof("[Attempt: %d] Attempting access to etcd after 5 second sleep", attempt) time.Sleep(5 * time.Second) - retries-- } if err != nil { return nil, fmt.Errorf("failed to connect to etcd server: %v, error: %v", etcdServer, err) @@ -204,33 +204,33 @@ func newKubeClient() (*kclient.Client, error) { return kclient.New(config) } -func (ks *kube2sky) buildNameString(service, namespace, domain string) string { +func buildNameString(service, namespace, domain string) string { return fmt.Sprintf("%s.%s.%s.", service, namespace, domain) } // Returns a cache.ListWatch that gets all changes to services. -func (ks *kube2sky) createServiceLW() *cache.ListWatch { - return cache.NewListWatchFromClient(ks.kubeClient, "services", kapi.NamespaceAll, kSelector.Everything()) +func createServiceLW(kubeClient *kclient.Client) *cache.ListWatch { + return cache.NewListWatchFromClient(kubeClient, "services", kapi.NamespaceAll, kSelector.Everything()) } func (ks *kube2sky) newService(obj interface{}) { if s, ok := obj.(*kapi.Service); ok { - name := ks.buildNameString(s.Name, s.Namespace, ks.domain) + name := buildNameString(s.Name, s.Namespace, ks.domain) ks.mutateEtcdOrDie(func() error { return ks.addDNS(name, s) }) } } func (ks *kube2sky) removeService(obj interface{}) { if s, ok := obj.(*kapi.Service); ok { - name := ks.buildNameString(s.Name, s.Namespace, ks.domain) + name := buildNameString(s.Name, s.Namespace, ks.domain) ks.mutateEtcdOrDie(func() error { return ks.removeDNS(name) }) } } -func (ks *kube2sky) watchForServices() { +func watchForServices(kubeClient *kclient.Client, ks *kube2sky) { var serviceController *kcontrollerFramework.Controller _, serviceController = framework.NewInformer( - ks.createServiceLW(), + createServiceLW(kubeClient), &kapi.Service{}, resyncPeriod, framework.ResourceEventHandlerFuncs{ @@ -256,9 +256,10 @@ func main() { glog.Fatalf("Failed to create etcd client - %v", err) } - if ks.kubeClient, err = newKubeClient(); err != nil { + kubeClient, err := newKubeClient() + if err != nil { glog.Fatalf("Failed to create a kubernetes client: %v", err) } - ks.watchForServices() + watchForServices(kubeClient, &ks) } diff --git a/cluster/addons/dns/kube2sky/kube2sky_test.go b/cluster/addons/dns/kube2sky/kube2sky_test.go new file mode 100644 index 00000000000..bfcd5668cd0 --- /dev/null +++ b/cluster/addons/dns/kube2sky/kube2sky_test.go @@ -0,0 +1,195 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "encoding/json" + "path" + "strings" + "testing" + "time" + + kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/coreos/go-etcd/etcd" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type fakeEtcdClient struct { + // TODO: Convert this to real fs to better simulate etcd behavior. + writes map[string]string +} + +func (ec *fakeEtcdClient) Set(path, value string, ttl uint64) (*etcd.Response, error) { + ec.writes[path] = value + return nil, nil +} + +func (ec *fakeEtcdClient) Delete(path string, recursive bool) (*etcd.Response, error) { + for p := range ec.writes { + if (recursive && strings.HasPrefix(p, path)) || (!recursive && p == path) { + delete(ec.writes, p) + } + } + return nil, nil +} + +const ( + testDomain = "cluster.local" + basePath = "/skydns/local/cluster" +) + +func newKube2Sky(ec etcdClient) *kube2sky { + return &kube2sky{ + etcdClient: ec, + domain: testDomain, + etcdMutationTimeout: time.Second, + } +} + +func TestAddNoServiceIP(t *testing.T) { + const ( + testService = "testService" + testNamespace = "default" + ) + ec := &fakeEtcdClient{make(map[string]string)} + k2s := newKube2Sky(ec) + service := kapi.Service{ + ObjectMeta: kapi.ObjectMeta{ + Name: testNamespace, + Namespace: testNamespace, + }, + } + k2s.newService(&service) + assert.Empty(t, ec.writes) +} + +func getEtcdPath(name, namespace string) string { + return path.Join(basePath, namespace, name) +} + +type hostPort struct { + Host string `json:"host"` + Port int `json:"port"` +} + +func getHostPort(service *kapi.Service) *hostPort { + return &hostPort{ + Host: service.Spec.PortalIP, + Port: service.Spec.Ports[0].Port, + } +} + +func getHostPortFromString(data string) (*hostPort, error) { + var res hostPort + err := json.Unmarshal([]byte(data), &res) + return &res, err +} + +func TestAddSinglePortService(t *testing.T) { + const ( + testService = "testService" + testNamespace = "default" + ) + ec := &fakeEtcdClient{make(map[string]string)} + k2s := newKube2Sky(ec) + service := kapi.Service{ + ObjectMeta: kapi.ObjectMeta{ + Name: testService, + Namespace: testNamespace, + }, + Spec: kapi.ServiceSpec{ + Ports: []kapi.ServicePort{ + { + Port: 80, + }, + }, + PortalIP: "1.2.3.4", + }, + } + k2s.newService(&service) + expectedKey := getEtcdPath(testService, testNamespace) + expectedValue := getHostPort(&service) + val, exists := ec.writes[expectedKey] + require.True(t, exists) + actualValue, err := getHostPortFromString(val) + require.NoError(t, err) + assert.Equal(t, actualValue, expectedValue) +} + +func TestUpdateSinglePortService(t *testing.T) { + const ( + testService = "testService" + testNamespace = "default" + ) + ec := &fakeEtcdClient{make(map[string]string)} + k2s := newKube2Sky(ec) + service := kapi.Service{ + ObjectMeta: kapi.ObjectMeta{ + Name: testService, + Namespace: testNamespace, + }, + Spec: kapi.ServiceSpec{ + Ports: []kapi.ServicePort{ + { + Port: 80, + }, + }, + PortalIP: "1.2.3.4", + }, + } + k2s.newService(&service) + assert.Len(t, ec.writes, 1) + service.Spec.PortalIP = "0.0.0.0" + k2s.newService(&service) + expectedKey := getEtcdPath(testService, testNamespace) + expectedValue := getHostPort(&service) + val, exists := ec.writes[expectedKey] + require.True(t, exists) + actualValue, err := getHostPortFromString(val) + require.NoError(t, err) + assert.Equal(t, actualValue, expectedValue) +} + +func TestDeleteSinglePortService(t *testing.T) { + const ( + testService = "testService" + testNamespace = "default" + ) + ec := &fakeEtcdClient{make(map[string]string)} + k2s := newKube2Sky(ec) + service := kapi.Service{ + ObjectMeta: kapi.ObjectMeta{ + Name: testService, + Namespace: testNamespace, + }, + Spec: kapi.ServiceSpec{ + Ports: []kapi.ServicePort{ + { + Port: 80, + }, + }, + PortalIP: "1.2.3.4", + }, + } + // Add the service + k2s.newService(&service) + assert.Len(t, ec.writes, 1) + // Delete the service + k2s.removeService(&service) + assert.Empty(t, ec.writes) +}