Extract non-storage operations from service etcd

This commit is contained in:
Wojciech Tyczynski 2015-08-06 10:46:45 +02:00
parent e034712456
commit d11ab96446
5 changed files with 28 additions and 84 deletions

View File

@ -448,10 +448,7 @@ func (m *Master) init(c *Config) {
nodeStorage, nodeStatusStorage := nodeetcd.NewStorage(c.DatabaseStorage, c.KubeletClient) nodeStorage, nodeStatusStorage := nodeetcd.NewStorage(c.DatabaseStorage, c.KubeletClient)
m.nodeRegistry = minion.NewRegistry(nodeStorage) m.nodeRegistry = minion.NewRegistry(nodeStorage)
m.serviceRegistry = etcd.NewRegistry(c.DatabaseStorage)
// TODO: split me up into distinct storage registries
registry := etcd.NewRegistry(c.DatabaseStorage, m.endpointRegistry)
m.serviceRegistry = registry
var serviceClusterIPRegistry service.RangeRegistry var serviceClusterIPRegistry service.RangeRegistry
serviceClusterIPAllocator := ipallocator.NewAllocatorCIDRRange(m.serviceClusterIPRange, func(max int, rangeSpec string) allocator.Interface { serviceClusterIPAllocator := ipallocator.NewAllocatorCIDRRange(m.serviceClusterIPRange, func(max int, rangeSpec string) allocator.Interface {

View File

@ -20,11 +20,9 @@ import (
"fmt" "fmt"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
etcderr "k8s.io/kubernetes/pkg/api/errors/etcd" etcderr "k8s.io/kubernetes/pkg/api/errors/etcd"
"k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/registry/endpoint"
"k8s.io/kubernetes/pkg/storage" "k8s.io/kubernetes/pkg/storage"
"k8s.io/kubernetes/pkg/watch" "k8s.io/kubernetes/pkg/watch"
) )
@ -44,14 +42,12 @@ const (
// MinionRegistry, PodRegistry and ServiceRegistry, backed by etcd. // MinionRegistry, PodRegistry and ServiceRegistry, backed by etcd.
type Registry struct { type Registry struct {
storage.Interface storage.Interface
endpoints endpoint.Registry
} }
// NewRegistry creates an etcd registry. // NewRegistry creates an etcd registry.
func NewRegistry(storage storage.Interface, endpoints endpoint.Registry) *Registry { func NewRegistry(storage storage.Interface) *Registry {
registry := &Registry{ registry := &Registry{
Interface: storage, Interface: storage,
endpoints: endpoints,
} }
return registry return registry
} }
@ -132,13 +128,6 @@ func (r *Registry) DeleteService(ctx api.Context, name string) error {
if err != nil { if err != nil {
return etcderr.InterpretDeleteError(err, "service", name) return etcderr.InterpretDeleteError(err, "service", name)
} }
// TODO: can leave dangling endpoints, and potentially return incorrect
// endpoints if a new service is created with the same name
err = r.endpoints.DeleteEndpoints(ctx, name)
if err != nil && !errors.IsNotFound(err) {
return err
}
return nil return nil
} }

View File

@ -25,8 +25,6 @@ import (
"k8s.io/kubernetes/pkg/api/latest" "k8s.io/kubernetes/pkg/api/latest"
"k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/registry/endpoint"
endpointetcd "k8s.io/kubernetes/pkg/registry/endpoint/etcd"
etcdgeneric "k8s.io/kubernetes/pkg/registry/generic/etcd" etcdgeneric "k8s.io/kubernetes/pkg/registry/generic/etcd"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" etcdstorage "k8s.io/kubernetes/pkg/storage/etcd"
@ -38,14 +36,13 @@ import (
func NewTestEtcdRegistry(client tools.EtcdClient) *Registry { func NewTestEtcdRegistry(client tools.EtcdClient) *Registry {
storage := etcdstorage.NewEtcdStorage(client, latest.Codec, etcdtest.PathPrefix()) storage := etcdstorage.NewEtcdStorage(client, latest.Codec, etcdtest.PathPrefix())
registry := NewRegistry(storage, nil) registry := NewRegistry(storage)
return registry return registry
} }
func NewTestEtcdRegistryWithPods(client tools.EtcdClient) *Registry { func NewTestEtcdRegistryWithPods(client tools.EtcdClient) *Registry {
etcdStorage := etcdstorage.NewEtcdStorage(client, latest.Codec, etcdtest.PathPrefix()) etcdStorage := etcdstorage.NewEtcdStorage(client, latest.Codec, etcdtest.PathPrefix())
endpointStorage := endpointetcd.NewStorage(etcdStorage) registry := NewRegistry(etcdStorage)
registry := NewRegistry(etcdStorage, endpoint.NewRegistry(endpointStorage))
return registry return registry
} }
@ -236,15 +233,12 @@ func TestEtcdDeleteService(t *testing.T) {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
if len(fakeClient.DeletedKeys) != 2 { if len(fakeClient.DeletedKeys) != 1 {
t.Errorf("Expected 2 delete, found %#v", fakeClient.DeletedKeys) t.Errorf("Expected 2 delete, found %#v", fakeClient.DeletedKeys)
} }
if fakeClient.DeletedKeys[0] != key { if fakeClient.DeletedKeys[0] != key {
t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key) t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key)
} }
if fakeClient.DeletedKeys[1] != endpointsKey {
t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[1], endpointsKey)
}
} }
func TestEtcdUpdateService(t *testing.T) { func TestEtcdUpdateService(t *testing.T) {
@ -341,64 +335,6 @@ func TestEtcdWatchServicesBadSelector(t *testing.T) {
} }
} }
func TestEtcdWatchEndpoints(t *testing.T) {
ctx := api.NewDefaultContext()
fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistryWithPods(fakeClient)
watching, err := registry.endpoints.WatchEndpoints(
ctx,
labels.Everything(),
fields.SelectorFromSet(fields.Set{"name": "foo"}),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
select {
case _, ok := <-watching.ResultChan():
if !ok {
t.Errorf("watching channel should be open")
}
default:
}
fakeClient.WatchInjectError <- nil
if _, ok := <-watching.ResultChan(); ok {
t.Errorf("watching channel should be closed")
}
watching.Stop()
}
func TestEtcdWatchEndpointsAcrossNamespaces(t *testing.T) {
ctx := api.NewContext()
fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistryWithPods(fakeClient)
watching, err := registry.endpoints.WatchEndpoints(
ctx,
labels.Everything(),
fields.Everything(),
"1",
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
select {
case _, ok := <-watching.ResultChan():
if !ok {
t.Errorf("watching channel should be open")
}
default:
}
fakeClient.WatchInjectError <- nil
if _, ok := <-watching.ResultChan(); ok {
t.Errorf("watching channel should be closed")
}
watching.Stop()
}
// TODO We need a test for the compare and swap behavior. This basically requires two things: // TODO We need a test for the compare and swap behavior. This basically requires two things:
// 1) Add a per-operation synchronization channel to the fake etcd client, such that any operation waits on that // 1) Add a per-operation synchronization channel to the fake etcd client, such that any operation waits on that
// channel, this will enable us to orchestrate the flow of etcd requests in the test. // channel, this will enable us to orchestrate the flow of etcd requests in the test.

View File

@ -93,5 +93,20 @@ func (e *EndpointRegistry) UpdateEndpoints(ctx api.Context, endpoints *api.Endpo
} }
func (e *EndpointRegistry) DeleteEndpoints(ctx api.Context, name string) error { func (e *EndpointRegistry) DeleteEndpoints(ctx api.Context, name string) error {
return fmt.Errorf("unimplemented!") // TODO: support namespaces in this mock
e.lock.Lock()
defer e.lock.Unlock()
if e.Err != nil {
return e.Err
}
if e.Endpoints != nil {
var newList []api.Endpoints
for _, endpoint := range e.Endpoints.Items {
if endpoint.Name != name {
newList = append(newList, endpoint)
}
}
e.Endpoints.Items = newList
}
return nil
} }

View File

@ -144,6 +144,13 @@ func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) {
return nil, err return nil, err
} }
// TODO: can leave dangling endpoints, and potentially return incorrect
// endpoints if a new service is created with the same name
err = rs.endpoints.DeleteEndpoints(ctx, id)
if err != nil && !errors.IsNotFound(err) {
return nil, err
}
if api.IsServiceIPSet(service) { if api.IsServiceIPSet(service) {
rs.serviceIPs.Release(net.ParseIP(service.Spec.ClusterIP)) rs.serviceIPs.Release(net.ParseIP(service.Spec.ClusterIP))
} }