diff --git a/pkg/proxy/config/api_test.go b/pkg/proxy/config/api_test.go index 64689c498aa..455c8f14836 100644 --- a/pkg/proxy/config/api_test.go +++ b/pkg/proxy/config/api_test.go @@ -18,15 +18,15 @@ package config import ( "reflect" - "sync" "testing" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" - informers "k8s.io/client-go/informers" + "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" ktesting "k8s.io/client-go/testing" ) @@ -148,34 +148,6 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) { handler.ValidateEndpoints(t, []*v1.Endpoints{}) } -func newSvcHandler(t *testing.T, svcs []*v1.Service, done func()) ServiceHandler { - shm := &ServiceHandlerMock{ - state: make(map[types.NamespacedName]*v1.Service), - } - var callDoneOnce sync.Once - shm.process = func(services []*v1.Service) { - defer callDoneOnce.Do(done) - if !reflect.DeepEqual(services, svcs) { - t.Errorf("Unexpected services: %#v, expected: %#v", services, svcs) - } - } - return shm -} - -func newEpsHandler(t *testing.T, eps []*v1.Endpoints, done func()) EndpointsHandler { - ehm := &EndpointsHandlerMock{ - state: make(map[types.NamespacedName]*v1.Endpoints), - } - var callDoneOnce sync.Once - ehm.process = func(endpoints []*v1.Endpoints) { - defer callDoneOnce.Do(done) - if !reflect.DeepEqual(eps, endpoints) { - t.Errorf("Unexpected endpoints: %#v, expected: %#v", endpoints, eps) - } - } - return ehm -} - func TestInitialSync(t *testing.T) { svc1 := &v1.Service{ ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"}, @@ -192,25 +164,73 @@ func TestInitialSync(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"}, } - var wg sync.WaitGroup - // Wait for both services and endpoints handler. - wg.Add(2) + expectedSvcState := map[types.NamespacedName]*v1.Service{ + {Name: svc1.Name, Namespace: svc1.Namespace}: svc1, + {Name: svc2.Name, Namespace: svc2.Namespace}: svc2, + } + expectedEpsState := map[types.NamespacedName]*v1.Endpoints{ + {Name: eps1.Name, Namespace: eps1.Namespace}: eps1, + {Name: eps2.Name, Namespace: eps2.Namespace}: eps2, + } // Setup fake api client. client := fake.NewSimpleClientset(svc1, svc2, eps2, eps1) sharedInformers := informers.NewSharedInformerFactory(client, 0) svcConfig := NewServiceConfig(sharedInformers.Core().V1().Services(), 0) - epsConfig := NewEndpointsConfig(sharedInformers.Core().V1().Endpoints(), 0) - svcHandler := newSvcHandler(t, []*v1.Service{svc2, svc1}, wg.Done) + svcHandler := NewServiceHandlerMock() svcConfig.RegisterEventHandler(svcHandler) - epsHandler := newEpsHandler(t, []*v1.Endpoints{eps2, eps1}, wg.Done) + + epsConfig := NewEndpointsConfig(sharedInformers.Core().V1().Endpoints(), 0) + epsHandler := NewEndpointsHandlerMock() epsConfig.RegisterEventHandler(epsHandler) stopCh := make(chan struct{}) defer close(stopCh) - go sharedInformers.Start(stopCh) - go svcConfig.Run(stopCh) - go epsConfig.Run(stopCh) - wg.Wait() + sharedInformers.Start(stopCh) + + err := wait.PollImmediate(time.Millisecond*10, wait.ForeverTestTimeout, func() (bool, error) { + svcHandler.lock.Lock() + defer svcHandler.lock.Unlock() + if reflect.DeepEqual(svcHandler.state, expectedSvcState) { + return true, nil + } + return false, nil + }) + if err != nil { + t.Fatal("Timed out waiting for the completion of handler `OnServiceAdd`") + } + + err = wait.PollImmediate(time.Millisecond*10, wait.ForeverTestTimeout, func() (bool, error) { + epsHandler.lock.Lock() + defer epsHandler.lock.Unlock() + if reflect.DeepEqual(epsHandler.state, expectedEpsState) { + return true, nil + } + return false, nil + }) + if err != nil { + t.Fatal("Timed out waiting for the completion of handler `OnEndpointsAdd`") + } + + svcConfig.Run(stopCh) + epsConfig.Run(stopCh) + + gotSvc := <-svcHandler.updated + gotSvcState := make(map[types.NamespacedName]*v1.Service, len(gotSvc)) + for _, svc := range gotSvc { + gotSvcState[types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name}] = svc + } + if !reflect.DeepEqual(gotSvcState, expectedSvcState) { + t.Fatalf("Expected service state: %v\nGot: %v\n", expectedSvcState, gotSvcState) + } + + gotEps := <-epsHandler.updated + gotEpsState := make(map[types.NamespacedName]*v1.Endpoints, len(gotEps)) + for _, eps := range gotEps { + gotEpsState[types.NamespacedName{Namespace: eps.Namespace, Name: eps.Name}] = eps + } + if !reflect.DeepEqual(gotEpsState, expectedEpsState) { + t.Fatalf("Expected endpoints state: %v\nGot: %v\n", expectedEpsState, gotEpsState) + } }