mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 11:21:47 +00:00
Merge pull request #94564 from knight42/fix/TestInitialSync
test(proxy::config): deflake TestInitialSync
This commit is contained in:
commit
496f16c5ba
@ -18,15 +18,15 @@ package config
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
informers "k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
ktesting "k8s.io/client-go/testing"
|
||||
)
|
||||
@ -148,34 +148,6 @@ func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) {
|
||||
handler.ValidateEndpoints(t, []*v1.Endpoints{})
|
||||
}
|
||||
|
||||
func newSvcHandler(t *testing.T, svcs []*v1.Service, done func()) ServiceHandler {
|
||||
shm := &ServiceHandlerMock{
|
||||
state: make(map[types.NamespacedName]*v1.Service),
|
||||
}
|
||||
var callDoneOnce sync.Once
|
||||
shm.process = func(services []*v1.Service) {
|
||||
defer callDoneOnce.Do(done)
|
||||
if !reflect.DeepEqual(services, svcs) {
|
||||
t.Errorf("Unexpected services: %#v, expected: %#v", services, svcs)
|
||||
}
|
||||
}
|
||||
return shm
|
||||
}
|
||||
|
||||
func newEpsHandler(t *testing.T, eps []*v1.Endpoints, done func()) EndpointsHandler {
|
||||
ehm := &EndpointsHandlerMock{
|
||||
state: make(map[types.NamespacedName]*v1.Endpoints),
|
||||
}
|
||||
var callDoneOnce sync.Once
|
||||
ehm.process = func(endpoints []*v1.Endpoints) {
|
||||
defer callDoneOnce.Do(done)
|
||||
if !reflect.DeepEqual(eps, endpoints) {
|
||||
t.Errorf("Unexpected endpoints: %#v, expected: %#v", endpoints, eps)
|
||||
}
|
||||
}
|
||||
return ehm
|
||||
}
|
||||
|
||||
func TestInitialSync(t *testing.T) {
|
||||
svc1 := &v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
|
||||
@ -192,25 +164,73 @@ func TestInitialSync(t *testing.T) {
|
||||
ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
// Wait for both services and endpoints handler.
|
||||
wg.Add(2)
|
||||
expectedSvcState := map[types.NamespacedName]*v1.Service{
|
||||
{Name: svc1.Name, Namespace: svc1.Namespace}: svc1,
|
||||
{Name: svc2.Name, Namespace: svc2.Namespace}: svc2,
|
||||
}
|
||||
expectedEpsState := map[types.NamespacedName]*v1.Endpoints{
|
||||
{Name: eps1.Name, Namespace: eps1.Namespace}: eps1,
|
||||
{Name: eps2.Name, Namespace: eps2.Namespace}: eps2,
|
||||
}
|
||||
|
||||
// Setup fake api client.
|
||||
client := fake.NewSimpleClientset(svc1, svc2, eps2, eps1)
|
||||
sharedInformers := informers.NewSharedInformerFactory(client, 0)
|
||||
|
||||
svcConfig := NewServiceConfig(sharedInformers.Core().V1().Services(), 0)
|
||||
epsConfig := NewEndpointsConfig(sharedInformers.Core().V1().Endpoints(), 0)
|
||||
svcHandler := newSvcHandler(t, []*v1.Service{svc2, svc1}, wg.Done)
|
||||
svcHandler := NewServiceHandlerMock()
|
||||
svcConfig.RegisterEventHandler(svcHandler)
|
||||
epsHandler := newEpsHandler(t, []*v1.Endpoints{eps2, eps1}, wg.Done)
|
||||
|
||||
epsConfig := NewEndpointsConfig(sharedInformers.Core().V1().Endpoints(), 0)
|
||||
epsHandler := NewEndpointsHandlerMock()
|
||||
epsConfig.RegisterEventHandler(epsHandler)
|
||||
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
go sharedInformers.Start(stopCh)
|
||||
go svcConfig.Run(stopCh)
|
||||
go epsConfig.Run(stopCh)
|
||||
wg.Wait()
|
||||
sharedInformers.Start(stopCh)
|
||||
|
||||
err := wait.PollImmediate(time.Millisecond*10, wait.ForeverTestTimeout, func() (bool, error) {
|
||||
svcHandler.lock.Lock()
|
||||
defer svcHandler.lock.Unlock()
|
||||
if reflect.DeepEqual(svcHandler.state, expectedSvcState) {
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal("Timed out waiting for the completion of handler `OnServiceAdd`")
|
||||
}
|
||||
|
||||
err = wait.PollImmediate(time.Millisecond*10, wait.ForeverTestTimeout, func() (bool, error) {
|
||||
epsHandler.lock.Lock()
|
||||
defer epsHandler.lock.Unlock()
|
||||
if reflect.DeepEqual(epsHandler.state, expectedEpsState) {
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal("Timed out waiting for the completion of handler `OnEndpointsAdd`")
|
||||
}
|
||||
|
||||
svcConfig.Run(stopCh)
|
||||
epsConfig.Run(stopCh)
|
||||
|
||||
gotSvc := <-svcHandler.updated
|
||||
gotSvcState := make(map[types.NamespacedName]*v1.Service, len(gotSvc))
|
||||
for _, svc := range gotSvc {
|
||||
gotSvcState[types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name}] = svc
|
||||
}
|
||||
if !reflect.DeepEqual(gotSvcState, expectedSvcState) {
|
||||
t.Fatalf("Expected service state: %v\nGot: %v\n", expectedSvcState, gotSvcState)
|
||||
}
|
||||
|
||||
gotEps := <-epsHandler.updated
|
||||
gotEpsState := make(map[types.NamespacedName]*v1.Endpoints, len(gotEps))
|
||||
for _, eps := range gotEps {
|
||||
gotEpsState[types.NamespacedName{Namespace: eps.Namespace, Name: eps.Name}] = eps
|
||||
}
|
||||
if !reflect.DeepEqual(gotEpsState, expectedEpsState) {
|
||||
t.Fatalf("Expected endpoints state: %v\nGot: %v\n", expectedEpsState, gotEpsState)
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user