diff --git a/pkg/master/master.go b/pkg/master/master.go index 85bb7cdf8ef..f591cf903fe 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -448,10 +448,7 @@ func (m *Master) init(c *Config) { nodeStorage, nodeStatusStorage := nodeetcd.NewStorage(c.DatabaseStorage, c.KubeletClient) m.nodeRegistry = minion.NewRegistry(nodeStorage) - - // TODO: split me up into distinct storage registries - registry := etcd.NewRegistry(c.DatabaseStorage, m.endpointRegistry) - m.serviceRegistry = registry + m.serviceRegistry = etcd.NewRegistry(c.DatabaseStorage) var serviceClusterIPRegistry service.RangeRegistry serviceClusterIPAllocator := ipallocator.NewAllocatorCIDRRange(m.serviceClusterIPRange, func(max int, rangeSpec string) allocator.Interface { diff --git a/pkg/registry/etcd/etcd.go b/pkg/registry/etcd/etcd.go index 775557d12e7..b71308289fb 100644 --- a/pkg/registry/etcd/etcd.go +++ b/pkg/registry/etcd/etcd.go @@ -20,11 +20,9 @@ import ( "fmt" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/errors" etcderr "k8s.io/kubernetes/pkg/api/errors/etcd" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" - "k8s.io/kubernetes/pkg/registry/endpoint" "k8s.io/kubernetes/pkg/storage" "k8s.io/kubernetes/pkg/watch" ) @@ -44,14 +42,12 @@ const ( // MinionRegistry, PodRegistry and ServiceRegistry, backed by etcd. type Registry struct { storage.Interface - endpoints endpoint.Registry } // NewRegistry creates an etcd registry. -func NewRegistry(storage storage.Interface, endpoints endpoint.Registry) *Registry { +func NewRegistry(storage storage.Interface) *Registry { registry := &Registry{ Interface: storage, - endpoints: endpoints, } return registry } @@ -132,13 +128,6 @@ func (r *Registry) DeleteService(ctx api.Context, name string) error { if err != nil { return etcderr.InterpretDeleteError(err, "service", name) } - - // TODO: can leave dangling endpoints, and potentially return incorrect - // endpoints if a new service is created with the same name - err = r.endpoints.DeleteEndpoints(ctx, name) - if err != nil && !errors.IsNotFound(err) { - return err - } return nil } diff --git a/pkg/registry/etcd/etcd_test.go b/pkg/registry/etcd/etcd_test.go index 522d79955eb..972a3cb9ca9 100644 --- a/pkg/registry/etcd/etcd_test.go +++ b/pkg/registry/etcd/etcd_test.go @@ -25,8 +25,6 @@ import ( "k8s.io/kubernetes/pkg/api/latest" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" - "k8s.io/kubernetes/pkg/registry/endpoint" - endpointetcd "k8s.io/kubernetes/pkg/registry/endpoint/etcd" etcdgeneric "k8s.io/kubernetes/pkg/registry/generic/etcd" "k8s.io/kubernetes/pkg/runtime" etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" @@ -38,14 +36,13 @@ import ( func NewTestEtcdRegistry(client tools.EtcdClient) *Registry { storage := etcdstorage.NewEtcdStorage(client, latest.Codec, etcdtest.PathPrefix()) - registry := NewRegistry(storage, nil) + registry := NewRegistry(storage) return registry } func NewTestEtcdRegistryWithPods(client tools.EtcdClient) *Registry { etcdStorage := etcdstorage.NewEtcdStorage(client, latest.Codec, etcdtest.PathPrefix()) - endpointStorage := endpointetcd.NewStorage(etcdStorage) - registry := NewRegistry(etcdStorage, endpoint.NewRegistry(endpointStorage)) + registry := NewRegistry(etcdStorage) return registry } @@ -236,15 +233,12 @@ func TestEtcdDeleteService(t *testing.T) { t.Errorf("unexpected error: %v", err) } - if len(fakeClient.DeletedKeys) != 2 { + if len(fakeClient.DeletedKeys) != 1 { t.Errorf("Expected 2 delete, found %#v", fakeClient.DeletedKeys) } if fakeClient.DeletedKeys[0] != key { t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key) } - if fakeClient.DeletedKeys[1] != endpointsKey { - t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[1], endpointsKey) - } } func TestEtcdUpdateService(t *testing.T) { @@ -341,64 +335,6 @@ func TestEtcdWatchServicesBadSelector(t *testing.T) { } } -func TestEtcdWatchEndpoints(t *testing.T) { - ctx := api.NewDefaultContext() - fakeClient := tools.NewFakeEtcdClient(t) - registry := NewTestEtcdRegistryWithPods(fakeClient) - watching, err := registry.endpoints.WatchEndpoints( - ctx, - labels.Everything(), - fields.SelectorFromSet(fields.Set{"name": "foo"}), - "1", - ) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - fakeClient.WaitForWatchCompletion() - - select { - case _, ok := <-watching.ResultChan(): - if !ok { - t.Errorf("watching channel should be open") - } - default: - } - fakeClient.WatchInjectError <- nil - if _, ok := <-watching.ResultChan(); ok { - t.Errorf("watching channel should be closed") - } - watching.Stop() -} - -func TestEtcdWatchEndpointsAcrossNamespaces(t *testing.T) { - ctx := api.NewContext() - fakeClient := tools.NewFakeEtcdClient(t) - registry := NewTestEtcdRegistryWithPods(fakeClient) - watching, err := registry.endpoints.WatchEndpoints( - ctx, - labels.Everything(), - fields.Everything(), - "1", - ) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - fakeClient.WaitForWatchCompletion() - - select { - case _, ok := <-watching.ResultChan(): - if !ok { - t.Errorf("watching channel should be open") - } - default: - } - fakeClient.WatchInjectError <- nil - if _, ok := <-watching.ResultChan(); ok { - t.Errorf("watching channel should be closed") - } - watching.Stop() -} - // TODO We need a test for the compare and swap behavior. This basically requires two things: // 1) Add a per-operation synchronization channel to the fake etcd client, such that any operation waits on that // channel, this will enable us to orchestrate the flow of etcd requests in the test. diff --git a/pkg/registry/registrytest/endpoint.go b/pkg/registry/registrytest/endpoint.go index 22855f8a2b3..a389caf27fa 100644 --- a/pkg/registry/registrytest/endpoint.go +++ b/pkg/registry/registrytest/endpoint.go @@ -93,5 +93,20 @@ func (e *EndpointRegistry) UpdateEndpoints(ctx api.Context, endpoints *api.Endpo } func (e *EndpointRegistry) DeleteEndpoints(ctx api.Context, name string) error { - return fmt.Errorf("unimplemented!") + // TODO: support namespaces in this mock + e.lock.Lock() + defer e.lock.Unlock() + if e.Err != nil { + return e.Err + } + if e.Endpoints != nil { + var newList []api.Endpoints + for _, endpoint := range e.Endpoints.Items { + if endpoint.Name != name { + newList = append(newList, endpoint) + } + } + e.Endpoints.Items = newList + } + return nil } diff --git a/pkg/registry/service/rest.go b/pkg/registry/service/rest.go index 48ac7196111..6b010fb3e8f 100644 --- a/pkg/registry/service/rest.go +++ b/pkg/registry/service/rest.go @@ -144,6 +144,13 @@ func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) { return nil, err } + // TODO: can leave dangling endpoints, and potentially return incorrect + // endpoints if a new service is created with the same name + err = rs.endpoints.DeleteEndpoints(ctx, id) + if err != nil && !errors.IsNotFound(err) { + return nil, err + } + if api.IsServiceIPSet(service) { rs.serviceIPs.Release(net.ParseIP(service.Spec.ClusterIP)) }