diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index 610ec5b34fa..2c91d491911 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -110,6 +110,9 @@ func NewEndpointController(podInformer coreinformers.PodInformer, serviceInforme e.podLister = podInformer.Lister() e.podsSynced = podInformer.Informer().HasSynced + endpointsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + DeleteFunc: e.onEndpointsDelete, + }) e.endpointsLister = endpointsInformer.Lister() e.endpointsSynced = endpointsInformer.Informer().HasSynced @@ -287,6 +290,15 @@ func (e *EndpointController) onServiceDelete(obj interface{}) { e.queue.Add(key) } +func (e *EndpointController) onEndpointsDelete(obj interface{}) { + key, err := controller.KeyFunc(obj) + if err != nil { + utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err)) + return + } + e.queue.Add(key) +} + // worker runs a worker thread that just dequeues items, processes them, and // marks them done. You may run as many of these in parallel as you wish; the // workqueue guarantees that they will not end up processing the same service diff --git a/pkg/controller/endpoint/endpoints_controller_test.go b/pkg/controller/endpoint/endpoints_controller_test.go index e5f6d9d2323..705c10b6e0c 100644 --- a/pkg/controller/endpoint/endpoints_controller_test.go +++ b/pkg/controller/endpoint/endpoints_controller_test.go @@ -159,6 +159,49 @@ func makeTestServer(t *testing.T, namespace string) (*httptest.Server, *utiltest return httptest.NewServer(mux), &fakeEndpointsHandler } +// makeBlockingEndpointDeleteTestServer will signal the blockNextAction channel on endpoint "POST" & "DELETE" requests. All +// block endpoint "DELETE" requestsi will wait on a blockDelete signal to delete endpoint. If controller is nil, a error will +// be sent in the response. +func makeBlockingEndpointDeleteTestServer(t *testing.T, controller *endpointController, endpoint *v1.Endpoints, blockDelete, blockNextAction chan struct{}, namespace string) *httptest.Server { + + handlerFunc := func(res http.ResponseWriter, req *http.Request) { + if controller == nil { + res.WriteHeader(http.StatusInternalServerError) + res.Write([]byte("controller has not been set yet")) + return + } + + if req.Method == "POST" { + controller.endpointsStore.Add(endpoint) + blockNextAction <- struct{}{} + } + + if req.Method == "DELETE" { + go func() { + // Delay the deletion of endoints to make endpoint cache out of sync + <-blockDelete + controller.endpointsStore.Delete(endpoint) + controller.onEndpointsDelete(endpoint) + }() + blockNextAction <- struct{}{} + } + + res.WriteHeader(http.StatusOK) + res.Write([]byte(runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{}))) + } + + mux := http.NewServeMux() + mux.HandleFunc("/api/v1/namespaces/"+namespace+"/endpoints", handlerFunc) + mux.HandleFunc("/api/v1/namespaces/"+namespace+"/endpoints/", handlerFunc) + mux.HandleFunc("/api/v1/namespaces/"+namespace+"/events", func(res http.ResponseWriter, req *http.Request) {}) + mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) { + t.Errorf("unexpected request: %v", req.RequestURI) + http.Error(res, "", http.StatusNotFound) + }) + return httptest.NewServer(mux) + +} + type endpointController struct { *EndpointController podStore cache.Store @@ -1954,9 +1997,105 @@ func TestEndpointPortFromServicePort(t *testing.T) { } } +// TestMultipleServiceChanges tests that endpoints that are not created because of an out of sync endpoints cache are eventually recreated +// A service will be created. After the endpoints exist, the service will be deleted and the endpoints will not be deleted from the cache immediately. +// After the service is recreated, the endpoints will be deleted replicating an out of sync cache. Expect that eventually the endpoints will be recreated. +func TestMultipleServiceChanges(t *testing.T) { + ns := metav1.NamespaceDefault + expectedSubsets := []v1.EndpointSubset{{ + Addresses: []v1.EndpointAddress{ + {IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}, + }, + }} + endpoint := &v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, ResourceVersion: "1"}, + Subsets: expectedSubsets, + } + + controller := &endpointController{} + blockDelete := make(chan struct{}) + blockNextAction := make(chan struct{}) + stopChan := make(chan struct{}) + testServer := makeBlockingEndpointDeleteTestServer(t, controller, endpoint, blockDelete, blockNextAction, ns) + defer testServer.Close() + + *controller = *newController(testServer.URL, 0*time.Second) + addPods(controller.podStore, ns, 1, 1, 0, ipv4only) + + go func() { controller.Run(1, stopChan) }() + + svc := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns}, + Spec: v1.ServiceSpec{ + Selector: map[string]string{"foo": "bar"}, + ClusterIP: "None", + Ports: nil, + }, + } + + controller.serviceStore.Add(svc) + controller.onServiceUpdate(svc) + // blockNextAction should eventually unblock once server gets endpoint request. + waitForChanReceive(t, 1*time.Second, blockNextAction, "Service Add should have caused a request to be sent to the test server") + + controller.serviceStore.Delete(svc) + controller.onServiceDelete(svc) + waitForChanReceive(t, 1*time.Second, blockNextAction, "Service Delete should have caused a request to be sent to the test server") + + // If endpoints cache has not updated before service update is registered + // Services add will not trigger a Create endpoint request. + controller.serviceStore.Add(svc) + controller.onServiceUpdate(svc) + + // Ensure the work queue has been processed by looping for up to a second to prevent flakes. + wait.PollImmediate(50*time.Millisecond, 1*time.Second, func() (bool, error) { + return controller.queue.Len() == 0, nil + }) + + // Cause test server to delete endpoints + close(blockDelete) + waitForChanReceive(t, 1*time.Second, blockNextAction, "Endpoint should have been recreated") + + close(blockNextAction) + close(stopChan) +} + +func TestEndpointsDeletionEvents(t *testing.T) { + ns := metav1.NamespaceDefault + testServer, _ := makeTestServer(t, ns) + defer testServer.Close() + controller := newController(testServer.URL, 0) + store := controller.endpointsStore + ep1 := &v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Namespace: ns, + ResourceVersion: "rv1", + }, + } + + // Test Unexpected and Expected Deletes + store.Delete(ep1) + controller.onEndpointsDelete(ep1) + + if controller.queue.Len() != 1 { + t.Errorf("Expected one service to be in the queue, found %d", controller.queue.Len()) + } +} + func stringVal(str *string) string { if str == nil { return "nil" } return *str } + +// waitForChanReceive blocks up to the timeout waiting for the receivingChan to receive +func waitForChanReceive(t *testing.T, timeout time.Duration, receivingChan chan struct{}, errorMsg string) { + timer := time.NewTimer(timeout) + select { + case <-timer.C: + t.Errorf(errorMsg) + case <-receivingChan: + } +}