Merge pull request #93030 from swetharepakula/endpoint-tracker

Requeue Service after Endpoint Deletion
This commit is contained in:
Kubernetes Prow Robot 2020-08-10 18:28:16 -07:00 committed by GitHub
commit 15a3d46db1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 151 additions and 0 deletions

View File

@ -110,6 +110,9 @@ func NewEndpointController(podInformer coreinformers.PodInformer, serviceInforme
e.podLister = podInformer.Lister()
e.podsSynced = podInformer.Informer().HasSynced
endpointsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
DeleteFunc: e.onEndpointsDelete,
})
e.endpointsLister = endpointsInformer.Lister()
e.endpointsSynced = endpointsInformer.Informer().HasSynced
@ -287,6 +290,15 @@ func (e *EndpointController) onServiceDelete(obj interface{}) {
e.queue.Add(key)
}
func (e *EndpointController) onEndpointsDelete(obj interface{}) {
key, err := controller.KeyFunc(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %+v: %v", obj, err))
return
}
e.queue.Add(key)
}
// worker runs a worker thread that just dequeues items, processes them, and
// marks them done. You may run as many of these in parallel as you wish; the
// workqueue guarantees that they will not end up processing the same service

View File

@ -159,6 +159,49 @@ func makeTestServer(t *testing.T, namespace string) (*httptest.Server, *utiltest
return httptest.NewServer(mux), &fakeEndpointsHandler
}
// makeBlockingEndpointDeleteTestServer will signal the blockNextAction channel on endpoint "POST" & "DELETE" requests. All
// block endpoint "DELETE" requestsi will wait on a blockDelete signal to delete endpoint. If controller is nil, a error will
// be sent in the response.
func makeBlockingEndpointDeleteTestServer(t *testing.T, controller *endpointController, endpoint *v1.Endpoints, blockDelete, blockNextAction chan struct{}, namespace string) *httptest.Server {
handlerFunc := func(res http.ResponseWriter, req *http.Request) {
if controller == nil {
res.WriteHeader(http.StatusInternalServerError)
res.Write([]byte("controller has not been set yet"))
return
}
if req.Method == "POST" {
controller.endpointsStore.Add(endpoint)
blockNextAction <- struct{}{}
}
if req.Method == "DELETE" {
go func() {
// Delay the deletion of endoints to make endpoint cache out of sync
<-blockDelete
controller.endpointsStore.Delete(endpoint)
controller.onEndpointsDelete(endpoint)
}()
blockNextAction <- struct{}{}
}
res.WriteHeader(http.StatusOK)
res.Write([]byte(runtime.EncodeOrDie(clientscheme.Codecs.LegacyCodec(v1.SchemeGroupVersion), &v1.Endpoints{})))
}
mux := http.NewServeMux()
mux.HandleFunc("/api/v1/namespaces/"+namespace+"/endpoints", handlerFunc)
mux.HandleFunc("/api/v1/namespaces/"+namespace+"/endpoints/", handlerFunc)
mux.HandleFunc("/api/v1/namespaces/"+namespace+"/events", func(res http.ResponseWriter, req *http.Request) {})
mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) {
t.Errorf("unexpected request: %v", req.RequestURI)
http.Error(res, "", http.StatusNotFound)
})
return httptest.NewServer(mux)
}
type endpointController struct {
*EndpointController
podStore cache.Store
@ -1954,9 +1997,105 @@ func TestEndpointPortFromServicePort(t *testing.T) {
}
}
// TestMultipleServiceChanges tests that endpoints that are not created because of an out of sync endpoints cache are eventually recreated
// A service will be created. After the endpoints exist, the service will be deleted and the endpoints will not be deleted from the cache immediately.
// After the service is recreated, the endpoints will be deleted replicating an out of sync cache. Expect that eventually the endpoints will be recreated.
func TestMultipleServiceChanges(t *testing.T) {
ns := metav1.NamespaceDefault
expectedSubsets := []v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{
{IP: "1.2.3.4", NodeName: &emptyNodeName, TargetRef: &v1.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
},
}}
endpoint := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns, ResourceVersion: "1"},
Subsets: expectedSubsets,
}
controller := &endpointController{}
blockDelete := make(chan struct{})
blockNextAction := make(chan struct{})
stopChan := make(chan struct{})
testServer := makeBlockingEndpointDeleteTestServer(t, controller, endpoint, blockDelete, blockNextAction, ns)
defer testServer.Close()
*controller = *newController(testServer.URL, 0*time.Second)
addPods(controller.podStore, ns, 1, 1, 0, ipv4only)
go func() { controller.Run(1, stopChan) }()
svc := &v1.Service{
ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
Spec: v1.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
ClusterIP: "None",
Ports: nil,
},
}
controller.serviceStore.Add(svc)
controller.onServiceUpdate(svc)
// blockNextAction should eventually unblock once server gets endpoint request.
waitForChanReceive(t, 1*time.Second, blockNextAction, "Service Add should have caused a request to be sent to the test server")
controller.serviceStore.Delete(svc)
controller.onServiceDelete(svc)
waitForChanReceive(t, 1*time.Second, blockNextAction, "Service Delete should have caused a request to be sent to the test server")
// If endpoints cache has not updated before service update is registered
// Services add will not trigger a Create endpoint request.
controller.serviceStore.Add(svc)
controller.onServiceUpdate(svc)
// Ensure the work queue has been processed by looping for up to a second to prevent flakes.
wait.PollImmediate(50*time.Millisecond, 1*time.Second, func() (bool, error) {
return controller.queue.Len() == 0, nil
})
// Cause test server to delete endpoints
close(blockDelete)
waitForChanReceive(t, 1*time.Second, blockNextAction, "Endpoint should have been recreated")
close(blockNextAction)
close(stopChan)
}
func TestEndpointsDeletionEvents(t *testing.T) {
ns := metav1.NamespaceDefault
testServer, _ := makeTestServer(t, ns)
defer testServer.Close()
controller := newController(testServer.URL, 0)
store := controller.endpointsStore
ep1 := &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: ns,
ResourceVersion: "rv1",
},
}
// Test Unexpected and Expected Deletes
store.Delete(ep1)
controller.onEndpointsDelete(ep1)
if controller.queue.Len() != 1 {
t.Errorf("Expected one service to be in the queue, found %d", controller.queue.Len())
}
}
func stringVal(str *string) string {
if str == nil {
return "nil"
}
return *str
}
// waitForChanReceive blocks up to the timeout waiting for the receivingChan to receive
func waitForChanReceive(t *testing.T, timeout time.Duration, receivingChan chan struct{}, errorMsg string) {
timer := time.NewTimer(timeout)
select {
case <-timer.C:
t.Errorf(errorMsg)
case <-receivingChan:
}
}