diff --git a/pkg/controlplane/reconcilers/lease.go b/pkg/controlplane/reconcilers/lease.go index 0d38155c963..9911d6a4272 100644 --- a/pkg/controlplane/reconcilers/lease.go +++ b/pkg/controlplane/reconcilers/lease.go @@ -120,7 +120,8 @@ func (s *storageLeases) UpdateLease(ip string) error { // RemoveLease removes the lease on a master IP in storage func (s *storageLeases) RemoveLease(ip string) error { - return s.storage.Delete(apirequest.NewDefaultContext(), s.baseKey+"/"+ip, &corev1.Endpoints{}, nil, rest.ValidateAllObjectFunc, nil) + key := path.Join(s.baseKey, ip) + return s.storage.Delete(apirequest.NewDefaultContext(), key, &corev1.Endpoints{}, nil, rest.ValidateAllObjectFunc, nil) } func (s *storageLeases) Destroy() { diff --git a/pkg/controlplane/reconcilers/lease_test.go b/pkg/controlplane/reconcilers/lease_test.go index c80078426bc..79195b3f54d 100644 --- a/pkg/controlplane/reconcilers/lease_test.go +++ b/pkg/controlplane/reconcilers/lease_test.go @@ -22,63 +22,85 @@ https://github.com/openshift/origin/blob/bb340c5dd5ff72718be86fb194dedc0faed7f4c */ import ( + "reflect" + "sort" "testing" + "time" + "github.com/google/uuid" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/apitesting" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime/serializer" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apiserver/pkg/storage" + etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing" + "k8s.io/apiserver/pkg/storage/storagebackend/factory" "k8s.io/client-go/kubernetes/fake" + "k8s.io/kubernetes/pkg/apis/core" netutils "k8s.io/utils/net" ) +func init() { + var scheme = runtime.NewScheme() + + metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion) + utilruntime.Must(core.AddToScheme(scheme)) + utilruntime.Must(corev1.AddToScheme(scheme)) + utilruntime.Must(scheme.SetVersionPriority(corev1.SchemeGroupVersion)) + + codecs = serializer.NewCodecFactory(scheme) +} + +var codecs serializer.CodecFactory + type fakeLeases struct { - keys map[string]bool + storageLeases } var _ Leases = &fakeLeases{} -func newFakeLeases() *fakeLeases { - return &fakeLeases{make(map[string]bool)} -} - -func (f *fakeLeases) ListLeases() ([]string, error) { - res := make([]string, 0, len(f.keys)) - for ip := range f.keys { - res = append(res, ip) +func newFakeLeases(t *testing.T, s storage.Interface) *fakeLeases { + // use the same base key used by the controlplane, but add a random + // prefix so we can reuse the etcd instance for subtests independently. + // pkg/controlplane/instance.go:268: + // masterLeases, err := reconcilers.NewLeases(config, "/masterleases/", ttl) + // ref: https://issues.k8s.io/114049 + base := "/" + uuid.New().String() + "/masterleases/" + return &fakeLeases{ + storageLeases{ + storage: s, + destroyFn: func() {}, + baseKey: base, + leaseTime: 1 * time.Minute, // avoid the lease to timeout on tests + }, } - return res, nil } -func (f *fakeLeases) UpdateLease(ip string) error { - f.keys[ip] = true - return nil -} - -func (f *fakeLeases) RemoveLease(ip string) error { - delete(f.keys, ip) - return nil -} - -func (f *fakeLeases) SetKeys(keys []string) { +func (f *fakeLeases) SetKeys(keys []string) error { for _, ip := range keys { - f.keys[ip] = false - } -} - -func (f *fakeLeases) GetUpdatedKeys() []string { - res := []string{} - for ip, updated := range f.keys { - if updated { - res = append(res, ip) + if err := f.UpdateLease(ip); err != nil { + return err } } - return res -} - -func (f *fakeLeases) Destroy() { + return nil } func TestLeaseEndpointReconciler(t *testing.T) { + server, sc := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) + t.Cleanup(func() { server.Terminate(t) }) + + newFunc := func() runtime.Object { return &corev1.Endpoints{} } + sc.Codec = apitesting.TestStorageCodec(codecs, corev1.SchemeGroupVersion) + + s, dFunc, err := factory.Create(*sc.ForResource(schema.GroupResource{Resource: "endpoints"}), newFunc) + if err != nil { + t.Fatalf("Error creating storage: %v", err) + } + t.Cleanup(dFunc) + reconcileTests := []struct { testName string serviceName string @@ -88,6 +110,7 @@ func TestLeaseEndpointReconciler(t *testing.T) { initialState []runtime.Object expectUpdate []runtime.Object expectCreate []runtime.Object + expectLeases []string }{ { testName: "no existing endpoints", @@ -96,6 +119,7 @@ func TestLeaseEndpointReconciler(t *testing.T) { endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, initialState: nil, expectCreate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), + expectLeases: []string{"1.2.3.4"}, }, { testName: "existing endpoints satisfy", @@ -103,6 +127,7 @@ func TestLeaseEndpointReconciler(t *testing.T) { ip: "1.2.3.4", endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), + expectLeases: []string{"1.2.3.4"}, }, { testName: "existing endpoints satisfy, no endpointslice", @@ -115,6 +140,7 @@ func TestLeaseEndpointReconciler(t *testing.T) { expectCreate: []runtime.Object{ makeEndpointSlice("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), }, + expectLeases: []string{"1.2.3.4"}, }, { testName: "existing endpointslice satisfies, no endpoints", @@ -127,6 +153,7 @@ func TestLeaseEndpointReconciler(t *testing.T) { expectCreate: []runtime.Object{ makeEndpoints("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), }, + expectLeases: []string{"1.2.3.4"}, }, { testName: "existing endpoints satisfy, endpointslice is wrong", @@ -140,6 +167,7 @@ func TestLeaseEndpointReconciler(t *testing.T) { expectUpdate: []runtime.Object{ makeEndpointSlice("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), }, + expectLeases: []string{"1.2.3.4"}, }, { testName: "existing endpointslice satisfies, endpoints is wrong", @@ -153,6 +181,7 @@ func TestLeaseEndpointReconciler(t *testing.T) { expectUpdate: []runtime.Object{ makeEndpoints("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), }, + expectLeases: []string{"1.2.3.4"}, }, { testName: "existing endpoints satisfy + refresh existing key", @@ -161,6 +190,7 @@ func TestLeaseEndpointReconciler(t *testing.T) { endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointKeys: []string{"1.2.3.4"}, initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), + expectLeases: []string{"1.2.3.4"}, }, { testName: "existing endpoints satisfy but too many", @@ -169,6 +199,7 @@ func TestLeaseEndpointReconciler(t *testing.T) { endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, initialState: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), + expectLeases: []string{"1.2.3.4"}, }, { testName: "existing endpoints satisfy but too many + extra masters", @@ -178,6 +209,7 @@ func TestLeaseEndpointReconciler(t *testing.T) { endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, initialState: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.1", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), + expectLeases: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, }, { testName: "existing endpoints satisfy but too many + extra masters + delete first", @@ -187,6 +219,7 @@ func TestLeaseEndpointReconciler(t *testing.T) { endpointKeys: []string{"4.3.2.1", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, initialState: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.1", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), expectUpdate: makeEndpointsArray("foo", []string{"4.3.2.1", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), + expectLeases: []string{"4.3.2.1", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, }, { testName: "existing endpoints current IP missing", @@ -196,6 +229,7 @@ func TestLeaseEndpointReconciler(t *testing.T) { endpointKeys: []string{"4.3.2.1"}, initialState: makeEndpointsArray("foo", []string{"4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), expectUpdate: makeEndpointsArray("foo", []string{"4.3.2.1", "4.3.2.2"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), + expectLeases: []string{"4.3.2.1", "4.3.2.2"}, }, { testName: "existing endpoints wrong name", @@ -204,6 +238,7 @@ func TestLeaseEndpointReconciler(t *testing.T) { endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, initialState: makeEndpointsArray("bar", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), expectCreate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), + expectLeases: []string{"1.2.3.4"}, }, { testName: "existing endpoints wrong IP", @@ -212,6 +247,7 @@ func TestLeaseEndpointReconciler(t *testing.T) { endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, initialState: makeEndpointsArray("foo", []string{"4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), + expectLeases: []string{"1.2.3.4"}, }, { testName: "existing endpoints wrong port", @@ -220,6 +256,7 @@ func TestLeaseEndpointReconciler(t *testing.T) { endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 9090, Protocol: "TCP"}}), expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), + expectLeases: []string{"1.2.3.4"}, }, { testName: "existing endpoints wrong protocol", @@ -228,6 +265,7 @@ func TestLeaseEndpointReconciler(t *testing.T) { endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "UDP"}}), expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), + expectLeases: []string{"1.2.3.4"}, }, { testName: "existing endpoints wrong port name", @@ -236,6 +274,7 @@ func TestLeaseEndpointReconciler(t *testing.T) { endpointPorts: []corev1.EndpointPort{{Name: "baz", Port: 8080, Protocol: "TCP"}}, initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "baz", Port: 8080, Protocol: "TCP"}}), + expectLeases: []string{"1.2.3.4"}, }, { testName: "existing endpoints without skip mirror label", @@ -261,6 +300,7 @@ func TestLeaseEndpointReconciler(t *testing.T) { makeEndpoints("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), // EndpointSlice does not get updated because it was already correct }, + expectLeases: []string{"1.2.3.4"}, }, { testName: "existing endpoints extra service ports satisfy", @@ -278,6 +318,7 @@ func TestLeaseEndpointReconciler(t *testing.T) { {Name: "baz", Port: 1010, Protocol: "TCP"}, }, ), + expectLeases: []string{"1.2.3.4"}, }, { testName: "existing endpoints extra service ports missing port", @@ -294,17 +335,21 @@ func TestLeaseEndpointReconciler(t *testing.T) { {Name: "bar", Port: 1000, Protocol: "TCP"}, }, ), + expectLeases: []string{"1.2.3.4"}, }, } for _, test := range reconcileTests { t.Run(test.testName, func(t *testing.T) { - fakeLeases := newFakeLeases() - fakeLeases.SetKeys(test.endpointKeys) + fakeLeases := newFakeLeases(t, s) + err := fakeLeases.SetKeys(test.endpointKeys) + if err != nil { + t.Errorf("unexpected error creating keys: %v", err) + } clientset := fake.NewSimpleClientset(test.initialState...) epAdapter := NewEndpointsAdapter(clientset.CoreV1(), clientset.DiscoveryV1()) r := NewLeaseEndpointReconciler(epAdapter, fakeLeases) - err := r.ReconcileEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts, true) + err = r.ReconcileEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts, true) if err != nil { t.Errorf("unexpected error reconciling: %v", err) } @@ -314,8 +359,15 @@ func TestLeaseEndpointReconciler(t *testing.T) { t.Errorf("unexpected error in side effects: %v", err) } - if updatedKeys := fakeLeases.GetUpdatedKeys(); len(updatedKeys) != 1 || updatedKeys[0] != test.ip { - t.Errorf("expected the master's IP to be refreshed, but the following IPs were refreshed instead: %v", updatedKeys) + leases, err := fakeLeases.ListLeases() + if err != nil { + t.Errorf("unexpected error: %v", err) + } + // sort for comparison + sort.Strings(leases) + sort.Strings(test.expectLeases) + if !reflect.DeepEqual(leases, test.expectLeases) { + t.Errorf("expected %v got: %v", test.expectLeases, leases) } }) } @@ -329,6 +381,7 @@ func TestLeaseEndpointReconciler(t *testing.T) { initialState []runtime.Object expectUpdate []runtime.Object expectCreate []runtime.Object + expectLeases []string }{ { testName: "existing endpoints extra service ports missing port no update", @@ -340,6 +393,7 @@ func TestLeaseEndpointReconciler(t *testing.T) { }, initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), expectUpdate: nil, + expectLeases: []string{"1.2.3.4"}, }, { testName: "existing endpoints extra service ports, wrong ports, wrong IP", @@ -351,6 +405,7 @@ func TestLeaseEndpointReconciler(t *testing.T) { }, initialState: makeEndpointsArray("foo", []string{"4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), + expectLeases: []string{"1.2.3.4"}, }, { testName: "no existing endpoints", @@ -359,16 +414,20 @@ func TestLeaseEndpointReconciler(t *testing.T) { endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, initialState: nil, expectCreate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), + expectLeases: []string{"1.2.3.4"}, }, } for _, test := range nonReconcileTests { t.Run(test.testName, func(t *testing.T) { - fakeLeases := newFakeLeases() - fakeLeases.SetKeys(test.endpointKeys) + fakeLeases := newFakeLeases(t, s) + err := fakeLeases.SetKeys(test.endpointKeys) + if err != nil { + t.Errorf("unexpected error creating keys: %v", err) + } clientset := fake.NewSimpleClientset(test.initialState...) epAdapter := NewEndpointsAdapter(clientset.CoreV1(), clientset.DiscoveryV1()) r := NewLeaseEndpointReconciler(epAdapter, fakeLeases) - err := r.ReconcileEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts, false) + err = r.ReconcileEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts, false) if err != nil { t.Errorf("unexpected error reconciling: %v", err) } @@ -378,14 +437,33 @@ func TestLeaseEndpointReconciler(t *testing.T) { t.Errorf("unexpected error in side effects: %v", err) } - if updatedKeys := fakeLeases.GetUpdatedKeys(); len(updatedKeys) != 1 || updatedKeys[0] != test.ip { - t.Errorf("expected the master's IP to be refreshed, but the following IPs were refreshed instead: %v", updatedKeys) + leases, err := fakeLeases.ListLeases() + if err != nil { + t.Errorf("unexpected error: %v", err) + } + // sort for comparison + sort.Strings(leases) + sort.Strings(test.expectLeases) + if !reflect.DeepEqual(leases, test.expectLeases) { + t.Errorf("expected %v got: %v", test.expectLeases, leases) } }) } } func TestLeaseRemoveEndpoints(t *testing.T) { + server, sc := etcd3testing.NewUnsecuredEtcd3TestClientServer(t) + t.Cleanup(func() { server.Terminate(t) }) + + newFunc := func() runtime.Object { return &corev1.Endpoints{} } + sc.Codec = apitesting.TestStorageCodec(codecs, corev1.SchemeGroupVersion) + + s, dFunc, err := factory.Create(*sc.ForResource(schema.GroupResource{Resource: "pods"}), newFunc) + if err != nil { + t.Fatalf("Error creating storage: %v", err) + } + t.Cleanup(dFunc) + stopTests := []struct { testName string serviceName string @@ -394,6 +472,7 @@ func TestLeaseRemoveEndpoints(t *testing.T) { endpointKeys []string initialState []runtime.Object expectUpdate []runtime.Object + expectLeases []string }{ { testName: "successful stop reconciling", @@ -403,6 +482,7 @@ func TestLeaseRemoveEndpoints(t *testing.T) { endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, initialState: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), expectUpdate: makeEndpointsArray("foo", []string{"4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), + expectLeases: []string{"4.3.2.2", "4.3.2.3", "4.3.2.4"}, }, { testName: "stop reconciling with ip not in endpoint ip list", @@ -411,26 +491,36 @@ func TestLeaseRemoveEndpoints(t *testing.T) { endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, initialState: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), + expectLeases: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, }, { testName: "endpoint with no subset", serviceName: "foo", - ip: "5.6.7.8", + ip: "1.2.3.4", endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, initialState: makeEndpointsArray("foo", nil, nil), - expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), + expectUpdate: makeEndpointsArray("foo", []string{"4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), + expectLeases: []string{"4.3.2.2", "4.3.2.3", "4.3.2.4"}, }, } for _, test := range stopTests { t.Run(test.testName, func(t *testing.T) { - fakeLeases := newFakeLeases() - fakeLeases.SetKeys(test.endpointKeys) + fakeLeases := newFakeLeases(t, s) + err := fakeLeases.SetKeys(test.endpointKeys) + if err != nil { + t.Errorf("unexpected error creating keys: %v", err) + } clientset := fake.NewSimpleClientset(test.initialState...) epAdapter := NewEndpointsAdapter(clientset.CoreV1(), clientset.DiscoveryV1()) r := NewLeaseEndpointReconciler(epAdapter, fakeLeases) - err := r.RemoveEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts) - if err != nil { + err = r.RemoveEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts) + // if the ip is not on the endpoints, it must return an storage error and stop reconciling + if !contains(test.endpointKeys, test.ip) { + if !storage.IsNotFound(err) { + t.Errorf("expected error StorageError: key not found, Code: 1, Key: /registry/base/key/%s got: %v", test.ip, err) + } + } else if err != nil { t.Errorf("unexpected error reconciling: %v", err) } @@ -439,11 +529,25 @@ func TestLeaseRemoveEndpoints(t *testing.T) { t.Errorf("unexpected error in side effects: %v", err) } - for _, key := range fakeLeases.GetUpdatedKeys() { - if key == test.ip { - t.Errorf("Found ip %s in leases but shouldn't be there", key) - } + leases, err := fakeLeases.ListLeases() + if err != nil { + t.Errorf("unexpected error: %v", err) + } + // sort for comparison + sort.Strings(leases) + sort.Strings(test.expectLeases) + if !reflect.DeepEqual(leases, test.expectLeases) { + t.Errorf("expected %v got: %v", test.expectLeases, leases) } }) } } + +func contains(s []string, str string) bool { + for _, v := range s { + if v == str { + return true + } + } + return false +}