mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 13:37:30 +00:00
Merge pull request #7821 from lavalamp/deleteEndpoints
Add old endpoint cleanup function
This commit is contained in:
commit
589154a557
@ -132,6 +132,11 @@ func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) {
|
|||||||
for i := 0; i < workers; i++ {
|
for i := 0; i < workers; i++ {
|
||||||
go util.Until(e.worker, time.Second, stopCh)
|
go util.Until(e.worker, time.Second, stopCh)
|
||||||
}
|
}
|
||||||
|
go func() {
|
||||||
|
defer util.HandleCrash()
|
||||||
|
time.Sleep(5 * time.Minute) // give time for our cache to fill
|
||||||
|
e.checkLeftoverEndpoints()
|
||||||
|
}()
|
||||||
<-stopCh
|
<-stopCh
|
||||||
e.queue.ShutDown()
|
e.queue.ShutDown()
|
||||||
}
|
}
|
||||||
@ -370,6 +375,29 @@ func (e *EndpointController) syncService(key string) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// checkLeftoverEndpoints lists all currently existing endpoints and adds their
|
||||||
|
// service to the queue. This will detect endpoints that exist with no
|
||||||
|
// corresponding service; these endpoints need to be deleted. We only need to
|
||||||
|
// do this once on startup, because in steady-state these are detected (but
|
||||||
|
// some stragglers could have been left behind if the endpoint controller
|
||||||
|
// reboots).
|
||||||
|
func (e *EndpointController) checkLeftoverEndpoints() {
|
||||||
|
list, err := e.client.Endpoints(api.NamespaceAll).List(labels.Everything())
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Unable to list endpoints (%v); orphaned endpoints will not be cleaned up. (They're pretty harmless, but you can restart this component if you want another attempt made.)", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for i := range list.Items {
|
||||||
|
ep := &list.Items[i]
|
||||||
|
key, err := keyFunc(ep)
|
||||||
|
if err != nil {
|
||||||
|
glog.Errorf("Unable to get key for endpoint %#v", ep)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
e.queue.Add(key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func findDefaultPort(pod *api.Pod, servicePort int, proto api.Protocol) int {
|
func findDefaultPort(pod *api.Pod, servicePort int, proto api.Protocol) int {
|
||||||
for _, container := range pod.Spec.Containers {
|
for _, container := range pod.Spec.Containers {
|
||||||
for _, port := range container.Ports {
|
for _, port := range container.Ports {
|
||||||
|
@ -262,6 +262,41 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
|
|||||||
endpointsHandler.ValidateRequestCount(t, 0)
|
endpointsHandler.ValidateRequestCount(t, 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCheckLeftoverEndpoints(t *testing.T) {
|
||||||
|
ns := api.NamespaceDefault
|
||||||
|
// Note that this requests *all* endpoints, therefore the NamespaceAll
|
||||||
|
// below.
|
||||||
|
testServer, _ := makeTestServer(t, api.NamespaceAll,
|
||||||
|
serverResponse{http.StatusOK, &api.EndpointsList{
|
||||||
|
ListMeta: api.ListMeta{
|
||||||
|
ResourceVersion: "1",
|
||||||
|
},
|
||||||
|
Items: []api.Endpoints{{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: "foo",
|
||||||
|
Namespace: ns,
|
||||||
|
ResourceVersion: "1",
|
||||||
|
},
|
||||||
|
Subsets: []api.EndpointSubset{{
|
||||||
|
Addresses: []api.EndpointAddress{{IP: "6.7.8.9"}},
|
||||||
|
Ports: []api.EndpointPort{{Port: 1000}},
|
||||||
|
}},
|
||||||
|
}},
|
||||||
|
}})
|
||||||
|
defer testServer.Close()
|
||||||
|
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
|
||||||
|
endpoints := NewEndpointController(client)
|
||||||
|
endpoints.checkLeftoverEndpoints()
|
||||||
|
|
||||||
|
if e, a := 1, endpoints.queue.Len(); e != a {
|
||||||
|
t.Fatalf("Expected %v, got %v", e, a)
|
||||||
|
}
|
||||||
|
got, _ := endpoints.queue.Get()
|
||||||
|
if e, a := ns+"/foo", got; e != a {
|
||||||
|
t.Errorf("Expected %v, got %v", e, a)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestSyncEndpointsProtocolTCP(t *testing.T) {
|
func TestSyncEndpointsProtocolTCP(t *testing.T) {
|
||||||
ns := "other"
|
ns := "other"
|
||||||
testServer, endpointsHandler := makeTestServer(t, ns,
|
testServer, endpointsHandler := makeTestServer(t, ns,
|
||||||
|
@ -85,6 +85,15 @@ func (q *Type) Add(item interface{}) {
|
|||||||
q.cond.Signal()
|
q.cond.Signal()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Len returns the current queue length, for informational purposes only. You
|
||||||
|
// shouldn't e.g. gate a call to Add() or Get() on Len() being a particular
|
||||||
|
// value, that can't be synchronized properly.
|
||||||
|
func (q *Type) Len() int {
|
||||||
|
q.cond.L.Lock()
|
||||||
|
defer q.cond.L.Unlock()
|
||||||
|
return len(q.queue)
|
||||||
|
}
|
||||||
|
|
||||||
// Get blocks until it can return an item to be processed. If shutdown = true,
|
// Get blocks until it can return an item to be processed. If shutdown = true,
|
||||||
// the caller should end their goroutine. You must call Done with item when you
|
// the caller should end their goroutine. You must call Done with item when you
|
||||||
// have finished processing it.
|
// have finished processing it.
|
||||||
|
@ -113,3 +113,19 @@ func TestAddWhileProcessing(t *testing.T) {
|
|||||||
q.ShutDown()
|
q.ShutDown()
|
||||||
consumerWG.Wait()
|
consumerWG.Wait()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestLen(t *testing.T) {
|
||||||
|
q := workqueue.New()
|
||||||
|
q.Add("foo")
|
||||||
|
if e, a := 1, q.Len(); e != a {
|
||||||
|
t.Errorf("Expected %v, got %v", e, a)
|
||||||
|
}
|
||||||
|
q.Add("bar")
|
||||||
|
if e, a := 2, q.Len(); e != a {
|
||||||
|
t.Errorf("Expected %v, got %v", e, a)
|
||||||
|
}
|
||||||
|
q.Add("foo") // should not increase the queue length.
|
||||||
|
if e, a := 2, q.Len(); e != a {
|
||||||
|
t.Errorf("Expected %v, got %v", e, a)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user