mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-11 21:12:07 +00:00
Merge pull request #8287 from vishh/kube2sky
Adding unit tests for existing kube2sky logic
This commit is contained in:
commit
d064e843ab
@ -2,7 +2,7 @@
|
|||||||
# MAINTAINER: Tim Hockin <thockin@google.com>
|
# MAINTAINER: Tim Hockin <thockin@google.com>
|
||||||
# If you update this image please bump the tag value before pushing.
|
# If you update this image please bump the tag value before pushing.
|
||||||
|
|
||||||
.PHONY: all kube2sky container push clean
|
.PHONY: all kube2sky container push clean test
|
||||||
|
|
||||||
TAG = 1.5
|
TAG = 1.5
|
||||||
PREFIX = gcr.io/google_containers
|
PREFIX = gcr.io/google_containers
|
||||||
@ -20,3 +20,6 @@ push:
|
|||||||
|
|
||||||
clean:
|
clean:
|
||||||
rm -f kube2sky
|
rm -f kube2sky
|
||||||
|
|
||||||
|
test: clean
|
||||||
|
godep go test -v --vmodule=*=4
|
||||||
|
@ -52,17 +52,20 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// Maximum number of retries to connect to etcd server.
|
// Maximum number of attempts to connect to etcd server.
|
||||||
maxConnectRetries = 12
|
maxConnectAttempts = 12
|
||||||
// Resync period for the kube controller loop.
|
// Resync period for the kube controller loop.
|
||||||
resyncPeriod = 5 * time.Second
|
resyncPeriod = 5 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type etcdClient interface {
|
||||||
|
Set(path, value string, ttl uint64) (*etcd.Response, error)
|
||||||
|
Delete(path string, recursive bool) (*etcd.Response, error)
|
||||||
|
}
|
||||||
|
|
||||||
type kube2sky struct {
|
type kube2sky struct {
|
||||||
// Etcd client.
|
// Etcd client.
|
||||||
etcdClient *etcd.Client
|
etcdClient etcdClient
|
||||||
// Kubernetes client.
|
|
||||||
kubeClient *kclient.Client
|
|
||||||
// DNS domain name.
|
// DNS domain name.
|
||||||
domain string
|
domain string
|
||||||
// Etcd mutation timeout.
|
// Etcd mutation timeout.
|
||||||
@ -95,7 +98,6 @@ func (ks *kube2sky) addDNS(record string, service *kapi.Service) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// Set with no TTL, and hope that kubernetes events are accurate.
|
// Set with no TTL, and hope that kubernetes events are accurate.
|
||||||
|
|
||||||
glog.V(2).Infof("Setting DNS record: %v -> %s:%d\n", record, service.Spec.PortalIP, service.Spec.Ports[i].Port)
|
glog.V(2).Infof("Setting DNS record: %v -> %s:%d\n", record, service.Spec.PortalIP, service.Spec.Ports[i].Port)
|
||||||
_, err = ks.etcdClient.Set(skymsg.Path(record), string(b), uint64(0))
|
_, err = ks.etcdClient.Set(skymsg.Path(record), string(b), uint64(0))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -130,17 +132,15 @@ func newEtcdClient(etcdServer string) (*etcd.Client, error) {
|
|||||||
client *etcd.Client
|
client *etcd.Client
|
||||||
err error
|
err error
|
||||||
)
|
)
|
||||||
retries := maxConnectRetries
|
for attempt := 1; attempt <= maxConnectAttempts; attempt++ {
|
||||||
for retries > 0 {
|
|
||||||
if _, err = tools.GetEtcdVersion(etcdServer); err == nil {
|
if _, err = tools.GetEtcdVersion(etcdServer); err == nil {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
if maxConnectRetries == 1 {
|
if attempt == maxConnectAttempts {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
glog.Info("[Attempt: %d] Retrying request after 5 second sleep", retries)
|
glog.Infof("[Attempt: %d] Attempting access to etcd after 5 second sleep", attempt)
|
||||||
time.Sleep(5 * time.Second)
|
time.Sleep(5 * time.Second)
|
||||||
retries--
|
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to connect to etcd server: %v, error: %v", etcdServer, err)
|
return nil, fmt.Errorf("failed to connect to etcd server: %v, error: %v", etcdServer, err)
|
||||||
@ -204,33 +204,33 @@ func newKubeClient() (*kclient.Client, error) {
|
|||||||
return kclient.New(config)
|
return kclient.New(config)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ks *kube2sky) buildNameString(service, namespace, domain string) string {
|
func buildNameString(service, namespace, domain string) string {
|
||||||
return fmt.Sprintf("%s.%s.%s.", service, namespace, domain)
|
return fmt.Sprintf("%s.%s.%s.", service, namespace, domain)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Returns a cache.ListWatch that gets all changes to services.
|
// Returns a cache.ListWatch that gets all changes to services.
|
||||||
func (ks *kube2sky) createServiceLW() *cache.ListWatch {
|
func createServiceLW(kubeClient *kclient.Client) *cache.ListWatch {
|
||||||
return cache.NewListWatchFromClient(ks.kubeClient, "services", kapi.NamespaceAll, kSelector.Everything())
|
return cache.NewListWatchFromClient(kubeClient, "services", kapi.NamespaceAll, kSelector.Everything())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ks *kube2sky) newService(obj interface{}) {
|
func (ks *kube2sky) newService(obj interface{}) {
|
||||||
if s, ok := obj.(*kapi.Service); ok {
|
if s, ok := obj.(*kapi.Service); ok {
|
||||||
name := ks.buildNameString(s.Name, s.Namespace, ks.domain)
|
name := buildNameString(s.Name, s.Namespace, ks.domain)
|
||||||
ks.mutateEtcdOrDie(func() error { return ks.addDNS(name, s) })
|
ks.mutateEtcdOrDie(func() error { return ks.addDNS(name, s) })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ks *kube2sky) removeService(obj interface{}) {
|
func (ks *kube2sky) removeService(obj interface{}) {
|
||||||
if s, ok := obj.(*kapi.Service); ok {
|
if s, ok := obj.(*kapi.Service); ok {
|
||||||
name := ks.buildNameString(s.Name, s.Namespace, ks.domain)
|
name := buildNameString(s.Name, s.Namespace, ks.domain)
|
||||||
ks.mutateEtcdOrDie(func() error { return ks.removeDNS(name) })
|
ks.mutateEtcdOrDie(func() error { return ks.removeDNS(name) })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ks *kube2sky) watchForServices() {
|
func watchForServices(kubeClient *kclient.Client, ks *kube2sky) {
|
||||||
var serviceController *kcontrollerFramework.Controller
|
var serviceController *kcontrollerFramework.Controller
|
||||||
_, serviceController = framework.NewInformer(
|
_, serviceController = framework.NewInformer(
|
||||||
ks.createServiceLW(),
|
createServiceLW(kubeClient),
|
||||||
&kapi.Service{},
|
&kapi.Service{},
|
||||||
resyncPeriod,
|
resyncPeriod,
|
||||||
framework.ResourceEventHandlerFuncs{
|
framework.ResourceEventHandlerFuncs{
|
||||||
@ -256,9 +256,10 @@ func main() {
|
|||||||
glog.Fatalf("Failed to create etcd client - %v", err)
|
glog.Fatalf("Failed to create etcd client - %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if ks.kubeClient, err = newKubeClient(); err != nil {
|
kubeClient, err := newKubeClient()
|
||||||
|
if err != nil {
|
||||||
glog.Fatalf("Failed to create a kubernetes client: %v", err)
|
glog.Fatalf("Failed to create a kubernetes client: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
ks.watchForServices()
|
watchForServices(kubeClient, &ks)
|
||||||
}
|
}
|
||||||
|
195
cluster/addons/dns/kube2sky/kube2sky_test.go
Normal file
195
cluster/addons/dns/kube2sky/kube2sky_test.go
Normal file
@ -0,0 +1,195 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"path"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
kapi "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
|
"github.com/coreos/go-etcd/etcd"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
type fakeEtcdClient struct {
|
||||||
|
// TODO: Convert this to real fs to better simulate etcd behavior.
|
||||||
|
writes map[string]string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ec *fakeEtcdClient) Set(path, value string, ttl uint64) (*etcd.Response, error) {
|
||||||
|
ec.writes[path] = value
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ec *fakeEtcdClient) Delete(path string, recursive bool) (*etcd.Response, error) {
|
||||||
|
for p := range ec.writes {
|
||||||
|
if (recursive && strings.HasPrefix(p, path)) || (!recursive && p == path) {
|
||||||
|
delete(ec.writes, p)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
testDomain = "cluster.local"
|
||||||
|
basePath = "/skydns/local/cluster"
|
||||||
|
)
|
||||||
|
|
||||||
|
func newKube2Sky(ec etcdClient) *kube2sky {
|
||||||
|
return &kube2sky{
|
||||||
|
etcdClient: ec,
|
||||||
|
domain: testDomain,
|
||||||
|
etcdMutationTimeout: time.Second,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAddNoServiceIP(t *testing.T) {
|
||||||
|
const (
|
||||||
|
testService = "testService"
|
||||||
|
testNamespace = "default"
|
||||||
|
)
|
||||||
|
ec := &fakeEtcdClient{make(map[string]string)}
|
||||||
|
k2s := newKube2Sky(ec)
|
||||||
|
service := kapi.Service{
|
||||||
|
ObjectMeta: kapi.ObjectMeta{
|
||||||
|
Name: testNamespace,
|
||||||
|
Namespace: testNamespace,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
k2s.newService(&service)
|
||||||
|
assert.Empty(t, ec.writes)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getEtcdPath(name, namespace string) string {
|
||||||
|
return path.Join(basePath, namespace, name)
|
||||||
|
}
|
||||||
|
|
||||||
|
type hostPort struct {
|
||||||
|
Host string `json:"host"`
|
||||||
|
Port int `json:"port"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func getHostPort(service *kapi.Service) *hostPort {
|
||||||
|
return &hostPort{
|
||||||
|
Host: service.Spec.PortalIP,
|
||||||
|
Port: service.Spec.Ports[0].Port,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func getHostPortFromString(data string) (*hostPort, error) {
|
||||||
|
var res hostPort
|
||||||
|
err := json.Unmarshal([]byte(data), &res)
|
||||||
|
return &res, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestAddSinglePortService(t *testing.T) {
|
||||||
|
const (
|
||||||
|
testService = "testService"
|
||||||
|
testNamespace = "default"
|
||||||
|
)
|
||||||
|
ec := &fakeEtcdClient{make(map[string]string)}
|
||||||
|
k2s := newKube2Sky(ec)
|
||||||
|
service := kapi.Service{
|
||||||
|
ObjectMeta: kapi.ObjectMeta{
|
||||||
|
Name: testService,
|
||||||
|
Namespace: testNamespace,
|
||||||
|
},
|
||||||
|
Spec: kapi.ServiceSpec{
|
||||||
|
Ports: []kapi.ServicePort{
|
||||||
|
{
|
||||||
|
Port: 80,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
PortalIP: "1.2.3.4",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
k2s.newService(&service)
|
||||||
|
expectedKey := getEtcdPath(testService, testNamespace)
|
||||||
|
expectedValue := getHostPort(&service)
|
||||||
|
val, exists := ec.writes[expectedKey]
|
||||||
|
require.True(t, exists)
|
||||||
|
actualValue, err := getHostPortFromString(val)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, actualValue, expectedValue)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestUpdateSinglePortService(t *testing.T) {
|
||||||
|
const (
|
||||||
|
testService = "testService"
|
||||||
|
testNamespace = "default"
|
||||||
|
)
|
||||||
|
ec := &fakeEtcdClient{make(map[string]string)}
|
||||||
|
k2s := newKube2Sky(ec)
|
||||||
|
service := kapi.Service{
|
||||||
|
ObjectMeta: kapi.ObjectMeta{
|
||||||
|
Name: testService,
|
||||||
|
Namespace: testNamespace,
|
||||||
|
},
|
||||||
|
Spec: kapi.ServiceSpec{
|
||||||
|
Ports: []kapi.ServicePort{
|
||||||
|
{
|
||||||
|
Port: 80,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
PortalIP: "1.2.3.4",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
k2s.newService(&service)
|
||||||
|
assert.Len(t, ec.writes, 1)
|
||||||
|
service.Spec.PortalIP = "0.0.0.0"
|
||||||
|
k2s.newService(&service)
|
||||||
|
expectedKey := getEtcdPath(testService, testNamespace)
|
||||||
|
expectedValue := getHostPort(&service)
|
||||||
|
val, exists := ec.writes[expectedKey]
|
||||||
|
require.True(t, exists)
|
||||||
|
actualValue, err := getHostPortFromString(val)
|
||||||
|
require.NoError(t, err)
|
||||||
|
assert.Equal(t, actualValue, expectedValue)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDeleteSinglePortService(t *testing.T) {
|
||||||
|
const (
|
||||||
|
testService = "testService"
|
||||||
|
testNamespace = "default"
|
||||||
|
)
|
||||||
|
ec := &fakeEtcdClient{make(map[string]string)}
|
||||||
|
k2s := newKube2Sky(ec)
|
||||||
|
service := kapi.Service{
|
||||||
|
ObjectMeta: kapi.ObjectMeta{
|
||||||
|
Name: testService,
|
||||||
|
Namespace: testNamespace,
|
||||||
|
},
|
||||||
|
Spec: kapi.ServiceSpec{
|
||||||
|
Ports: []kapi.ServicePort{
|
||||||
|
{
|
||||||
|
Port: 80,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
PortalIP: "1.2.3.4",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
// Add the service
|
||||||
|
k2s.newService(&service)
|
||||||
|
assert.Len(t, ec.writes, 1)
|
||||||
|
// Delete the service
|
||||||
|
k2s.removeService(&service)
|
||||||
|
assert.Empty(t, ec.writes)
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user