From b49dd0ad1e95e73e2930cb85f57d7fec99c9597e Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Fri, 24 Apr 2015 14:16:27 -0700 Subject: [PATCH] Add old endpoint cleanup function --- pkg/service/endpoints_controller.go | 28 +++++++++++++++++++ pkg/service/endpoints_controller_test.go | 35 ++++++++++++++++++++++++ pkg/util/workqueue/queue.go | 9 ++++++ pkg/util/workqueue/queue_test.go | 16 +++++++++++ 4 files changed, 88 insertions(+) diff --git a/pkg/service/endpoints_controller.go b/pkg/service/endpoints_controller.go index cc11f81ad56..857fe3edfb2 100644 --- a/pkg/service/endpoints_controller.go +++ b/pkg/service/endpoints_controller.go @@ -132,6 +132,11 @@ func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) { for i := 0; i < workers; i++ { go util.Until(e.worker, time.Second, stopCh) } + go func() { + defer util.HandleCrash() + time.Sleep(5 * time.Minute) // give time for our cache to fill + e.checkLeftoverEndpoints() + }() <-stopCh e.queue.ShutDown() } @@ -370,6 +375,29 @@ func (e *EndpointController) syncService(key string) { } } +// checkLeftoverEndpoints lists all currently existing endpoints and adds their +// service to the queue. This will detect endpoints that exist with no +// corresponding service; these endpoints need to be deleted. We only need to +// do this once on startup, because in steady-state these are detected (but +// some stragglers could have been left behind if the endpoint controller +// reboots). +func (e *EndpointController) checkLeftoverEndpoints() { + list, err := e.client.Endpoints(api.NamespaceAll).List(labels.Everything()) + if err != nil { + glog.Errorf("Unable to list endpoints (%v); orphaned endpoints will not be cleaned up. (They're pretty harmless, but you can restart this component if you want another attempt made.)", err) + return + } + for i := range list.Items { + ep := &list.Items[i] + key, err := keyFunc(ep) + if err != nil { + glog.Errorf("Unable to get key for endpoint %#v", ep) + continue + } + e.queue.Add(key) + } +} + func findDefaultPort(pod *api.Pod, servicePort int, proto api.Protocol) int { for _, container := range pod.Spec.Containers { for _, port := range container.Ports { diff --git a/pkg/service/endpoints_controller_test.go b/pkg/service/endpoints_controller_test.go index 7c63c95b837..d0d626fa788 100644 --- a/pkg/service/endpoints_controller_test.go +++ b/pkg/service/endpoints_controller_test.go @@ -262,6 +262,41 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) { endpointsHandler.ValidateRequestCount(t, 0) } +func TestCheckLeftoverEndpoints(t *testing.T) { + ns := api.NamespaceDefault + // Note that this requests *all* endpoints, therefore the NamespaceAll + // below. + testServer, _ := makeTestServer(t, api.NamespaceAll, + serverResponse{http.StatusOK, &api.EndpointsList{ + ListMeta: api.ListMeta{ + ResourceVersion: "1", + }, + Items: []api.Endpoints{{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: ns, + ResourceVersion: "1", + }, + Subsets: []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "6.7.8.9"}}, + Ports: []api.EndpointPort{{Port: 1000}}, + }}, + }}, + }}) + defer testServer.Close() + client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) + endpoints := NewEndpointController(client) + endpoints.checkLeftoverEndpoints() + + if e, a := 1, endpoints.queue.Len(); e != a { + t.Fatalf("Expected %v, got %v", e, a) + } + got, _ := endpoints.queue.Get() + if e, a := ns+"/foo", got; e != a { + t.Errorf("Expected %v, got %v", e, a) + } +} + func TestSyncEndpointsProtocolTCP(t *testing.T) { ns := "other" testServer, endpointsHandler := makeTestServer(t, ns, diff --git a/pkg/util/workqueue/queue.go b/pkg/util/workqueue/queue.go index 52d8ba96236..4d5239040dc 100644 --- a/pkg/util/workqueue/queue.go +++ b/pkg/util/workqueue/queue.go @@ -85,6 +85,15 @@ func (q *Type) Add(item interface{}) { q.cond.Signal() } +// Len returns the current queue length, for informational purposes only. You +// shouldn't e.g. gate a call to Add() or Get() on Len() being a particular +// value, that can't be synchronized properly. +func (q *Type) Len() int { + q.cond.L.Lock() + defer q.cond.L.Unlock() + return len(q.queue) +} + // Get blocks until it can return an item to be processed. If shutdown = true, // the caller should end their goroutine. You must call Done with item when you // have finished processing it. diff --git a/pkg/util/workqueue/queue_test.go b/pkg/util/workqueue/queue_test.go index 74a607a1802..fe59d3ea46b 100644 --- a/pkg/util/workqueue/queue_test.go +++ b/pkg/util/workqueue/queue_test.go @@ -113,3 +113,19 @@ func TestAddWhileProcessing(t *testing.T) { q.ShutDown() consumerWG.Wait() } + +func TestLen(t *testing.T) { + q := workqueue.New() + q.Add("foo") + if e, a := 1, q.Len(); e != a { + t.Errorf("Expected %v, got %v", e, a) + } + q.Add("bar") + if e, a := 2, q.Len(); e != a { + t.Errorf("Expected %v, got %v", e, a) + } + q.Add("foo") // should not increase the queue length. + if e, a := 2, q.Len(); e != a { + t.Errorf("Expected %v, got %v", e, a) + } +}