Fix endpoint reconciler failing to delete masterlease

add regression test for endpoint reconciler leases
This commit is contained in:
Wojciech Tyczyński 2022-11-24 11:26:11 +01:00 committed by Antonio Ojea
parent 8f2371bcce
commit 4ffca653ff
2 changed files with 160 additions and 55 deletions

View File

@ -120,7 +120,8 @@ func (s *storageLeases) UpdateLease(ip string) error {
// RemoveLease removes the lease on a master IP in storage // RemoveLease removes the lease on a master IP in storage
func (s *storageLeases) RemoveLease(ip string) error { func (s *storageLeases) RemoveLease(ip string) error {
return s.storage.Delete(apirequest.NewDefaultContext(), s.baseKey+"/"+ip, &corev1.Endpoints{}, nil, rest.ValidateAllObjectFunc, nil) key := path.Join(s.baseKey, ip)
return s.storage.Delete(apirequest.NewDefaultContext(), key, &corev1.Endpoints{}, nil, rest.ValidateAllObjectFunc, nil)
} }
func (s *storageLeases) Destroy() { func (s *storageLeases) Destroy() {

View File

@ -22,63 +22,85 @@ https://github.com/openshift/origin/blob/bb340c5dd5ff72718be86fb194dedc0faed7f4c
*/ */
import ( import (
"reflect"
"sort"
"testing" "testing"
"time"
"github.com/google/uuid"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/apitesting"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/storage"
etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
"k8s.io/apiserver/pkg/storage/storagebackend/factory"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
"k8s.io/kubernetes/pkg/apis/core"
netutils "k8s.io/utils/net" netutils "k8s.io/utils/net"
) )
func init() {
var scheme = runtime.NewScheme()
metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion)
utilruntime.Must(core.AddToScheme(scheme))
utilruntime.Must(corev1.AddToScheme(scheme))
utilruntime.Must(scheme.SetVersionPriority(corev1.SchemeGroupVersion))
codecs = serializer.NewCodecFactory(scheme)
}
var codecs serializer.CodecFactory
type fakeLeases struct { type fakeLeases struct {
keys map[string]bool storageLeases
} }
var _ Leases = &fakeLeases{} var _ Leases = &fakeLeases{}
func newFakeLeases() *fakeLeases { func newFakeLeases(t *testing.T, s storage.Interface) *fakeLeases {
return &fakeLeases{make(map[string]bool)} // use the same base key used by the controlplane, but add a random
} // prefix so we can reuse the etcd instance for subtests independently.
// pkg/controlplane/instance.go:268:
func (f *fakeLeases) ListLeases() ([]string, error) { // masterLeases, err := reconcilers.NewLeases(config, "/masterleases/", ttl)
res := make([]string, 0, len(f.keys)) // ref: https://issues.k8s.io/114049
for ip := range f.keys { base := "/" + uuid.New().String() + "/masterleases/"
res = append(res, ip) return &fakeLeases{
storageLeases{
storage: s,
destroyFn: func() {},
baseKey: base,
leaseTime: 1 * time.Minute, // avoid the lease to timeout on tests
},
} }
return res, nil
} }
func (f *fakeLeases) UpdateLease(ip string) error { func (f *fakeLeases) SetKeys(keys []string) error {
f.keys[ip] = true
return nil
}
func (f *fakeLeases) RemoveLease(ip string) error {
delete(f.keys, ip)
return nil
}
func (f *fakeLeases) SetKeys(keys []string) {
for _, ip := range keys { for _, ip := range keys {
f.keys[ip] = false if err := f.UpdateLease(ip); err != nil {
} return err
}
func (f *fakeLeases) GetUpdatedKeys() []string {
res := []string{}
for ip, updated := range f.keys {
if updated {
res = append(res, ip)
} }
} }
return res return nil
}
func (f *fakeLeases) Destroy() {
} }
func TestLeaseEndpointReconciler(t *testing.T) { func TestLeaseEndpointReconciler(t *testing.T) {
server, sc := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
t.Cleanup(func() { server.Terminate(t) })
newFunc := func() runtime.Object { return &corev1.Endpoints{} }
sc.Codec = apitesting.TestStorageCodec(codecs, corev1.SchemeGroupVersion)
s, dFunc, err := factory.Create(*sc.ForResource(schema.GroupResource{Resource: "endpoints"}), newFunc)
if err != nil {
t.Fatalf("Error creating storage: %v", err)
}
t.Cleanup(dFunc)
reconcileTests := []struct { reconcileTests := []struct {
testName string testName string
serviceName string serviceName string
@ -88,6 +110,7 @@ func TestLeaseEndpointReconciler(t *testing.T) {
initialState []runtime.Object initialState []runtime.Object
expectUpdate []runtime.Object expectUpdate []runtime.Object
expectCreate []runtime.Object expectCreate []runtime.Object
expectLeases []string
}{ }{
{ {
testName: "no existing endpoints", testName: "no existing endpoints",
@ -96,6 +119,7 @@ func TestLeaseEndpointReconciler(t *testing.T) {
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
initialState: nil, initialState: nil,
expectCreate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), expectCreate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
expectLeases: []string{"1.2.3.4"},
}, },
{ {
testName: "existing endpoints satisfy", testName: "existing endpoints satisfy",
@ -103,6 +127,7 @@ func TestLeaseEndpointReconciler(t *testing.T) {
ip: "1.2.3.4", ip: "1.2.3.4",
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
expectLeases: []string{"1.2.3.4"},
}, },
{ {
testName: "existing endpoints satisfy, no endpointslice", testName: "existing endpoints satisfy, no endpointslice",
@ -115,6 +140,7 @@ func TestLeaseEndpointReconciler(t *testing.T) {
expectCreate: []runtime.Object{ expectCreate: []runtime.Object{
makeEndpointSlice("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), makeEndpointSlice("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
}, },
expectLeases: []string{"1.2.3.4"},
}, },
{ {
testName: "existing endpointslice satisfies, no endpoints", testName: "existing endpointslice satisfies, no endpoints",
@ -127,6 +153,7 @@ func TestLeaseEndpointReconciler(t *testing.T) {
expectCreate: []runtime.Object{ expectCreate: []runtime.Object{
makeEndpoints("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), makeEndpoints("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
}, },
expectLeases: []string{"1.2.3.4"},
}, },
{ {
testName: "existing endpoints satisfy, endpointslice is wrong", testName: "existing endpoints satisfy, endpointslice is wrong",
@ -140,6 +167,7 @@ func TestLeaseEndpointReconciler(t *testing.T) {
expectUpdate: []runtime.Object{ expectUpdate: []runtime.Object{
makeEndpointSlice("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), makeEndpointSlice("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
}, },
expectLeases: []string{"1.2.3.4"},
}, },
{ {
testName: "existing endpointslice satisfies, endpoints is wrong", testName: "existing endpointslice satisfies, endpoints is wrong",
@ -153,6 +181,7 @@ func TestLeaseEndpointReconciler(t *testing.T) {
expectUpdate: []runtime.Object{ expectUpdate: []runtime.Object{
makeEndpoints("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), makeEndpoints("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
}, },
expectLeases: []string{"1.2.3.4"},
}, },
{ {
testName: "existing endpoints satisfy + refresh existing key", testName: "existing endpoints satisfy + refresh existing key",
@ -161,6 +190,7 @@ func TestLeaseEndpointReconciler(t *testing.T) {
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpointKeys: []string{"1.2.3.4"}, endpointKeys: []string{"1.2.3.4"},
initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
expectLeases: []string{"1.2.3.4"},
}, },
{ {
testName: "existing endpoints satisfy but too many", testName: "existing endpoints satisfy but too many",
@ -169,6 +199,7 @@ func TestLeaseEndpointReconciler(t *testing.T) {
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
initialState: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), initialState: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
expectLeases: []string{"1.2.3.4"},
}, },
{ {
testName: "existing endpoints satisfy but too many + extra masters", testName: "existing endpoints satisfy but too many + extra masters",
@ -178,6 +209,7 @@ func TestLeaseEndpointReconciler(t *testing.T) {
endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
initialState: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.1", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), initialState: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.1", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
expectLeases: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
}, },
{ {
testName: "existing endpoints satisfy but too many + extra masters + delete first", testName: "existing endpoints satisfy but too many + extra masters + delete first",
@ -187,6 +219,7 @@ func TestLeaseEndpointReconciler(t *testing.T) {
endpointKeys: []string{"4.3.2.1", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, endpointKeys: []string{"4.3.2.1", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
initialState: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.1", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), initialState: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.1", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
expectUpdate: makeEndpointsArray("foo", []string{"4.3.2.1", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), expectUpdate: makeEndpointsArray("foo", []string{"4.3.2.1", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
expectLeases: []string{"4.3.2.1", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
}, },
{ {
testName: "existing endpoints current IP missing", testName: "existing endpoints current IP missing",
@ -196,6 +229,7 @@ func TestLeaseEndpointReconciler(t *testing.T) {
endpointKeys: []string{"4.3.2.1"}, endpointKeys: []string{"4.3.2.1"},
initialState: makeEndpointsArray("foo", []string{"4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), initialState: makeEndpointsArray("foo", []string{"4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
expectUpdate: makeEndpointsArray("foo", []string{"4.3.2.1", "4.3.2.2"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), expectUpdate: makeEndpointsArray("foo", []string{"4.3.2.1", "4.3.2.2"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
expectLeases: []string{"4.3.2.1", "4.3.2.2"},
}, },
{ {
testName: "existing endpoints wrong name", testName: "existing endpoints wrong name",
@ -204,6 +238,7 @@ func TestLeaseEndpointReconciler(t *testing.T) {
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
initialState: makeEndpointsArray("bar", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), initialState: makeEndpointsArray("bar", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
expectCreate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), expectCreate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
expectLeases: []string{"1.2.3.4"},
}, },
{ {
testName: "existing endpoints wrong IP", testName: "existing endpoints wrong IP",
@ -212,6 +247,7 @@ func TestLeaseEndpointReconciler(t *testing.T) {
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
initialState: makeEndpointsArray("foo", []string{"4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), initialState: makeEndpointsArray("foo", []string{"4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
expectLeases: []string{"1.2.3.4"},
}, },
{ {
testName: "existing endpoints wrong port", testName: "existing endpoints wrong port",
@ -220,6 +256,7 @@ func TestLeaseEndpointReconciler(t *testing.T) {
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 9090, Protocol: "TCP"}}), initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 9090, Protocol: "TCP"}}),
expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
expectLeases: []string{"1.2.3.4"},
}, },
{ {
testName: "existing endpoints wrong protocol", testName: "existing endpoints wrong protocol",
@ -228,6 +265,7 @@ func TestLeaseEndpointReconciler(t *testing.T) {
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "UDP"}}), initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "UDP"}}),
expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
expectLeases: []string{"1.2.3.4"},
}, },
{ {
testName: "existing endpoints wrong port name", testName: "existing endpoints wrong port name",
@ -236,6 +274,7 @@ func TestLeaseEndpointReconciler(t *testing.T) {
endpointPorts: []corev1.EndpointPort{{Name: "baz", Port: 8080, Protocol: "TCP"}}, endpointPorts: []corev1.EndpointPort{{Name: "baz", Port: 8080, Protocol: "TCP"}},
initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "baz", Port: 8080, Protocol: "TCP"}}), expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "baz", Port: 8080, Protocol: "TCP"}}),
expectLeases: []string{"1.2.3.4"},
}, },
{ {
testName: "existing endpoints without skip mirror label", testName: "existing endpoints without skip mirror label",
@ -261,6 +300,7 @@ func TestLeaseEndpointReconciler(t *testing.T) {
makeEndpoints("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), makeEndpoints("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
// EndpointSlice does not get updated because it was already correct // EndpointSlice does not get updated because it was already correct
}, },
expectLeases: []string{"1.2.3.4"},
}, },
{ {
testName: "existing endpoints extra service ports satisfy", testName: "existing endpoints extra service ports satisfy",
@ -278,6 +318,7 @@ func TestLeaseEndpointReconciler(t *testing.T) {
{Name: "baz", Port: 1010, Protocol: "TCP"}, {Name: "baz", Port: 1010, Protocol: "TCP"},
}, },
), ),
expectLeases: []string{"1.2.3.4"},
}, },
{ {
testName: "existing endpoints extra service ports missing port", testName: "existing endpoints extra service ports missing port",
@ -294,17 +335,21 @@ func TestLeaseEndpointReconciler(t *testing.T) {
{Name: "bar", Port: 1000, Protocol: "TCP"}, {Name: "bar", Port: 1000, Protocol: "TCP"},
}, },
), ),
expectLeases: []string{"1.2.3.4"},
}, },
} }
for _, test := range reconcileTests { for _, test := range reconcileTests {
t.Run(test.testName, func(t *testing.T) { t.Run(test.testName, func(t *testing.T) {
fakeLeases := newFakeLeases() fakeLeases := newFakeLeases(t, s)
fakeLeases.SetKeys(test.endpointKeys) err := fakeLeases.SetKeys(test.endpointKeys)
if err != nil {
t.Errorf("unexpected error creating keys: %v", err)
}
clientset := fake.NewSimpleClientset(test.initialState...) clientset := fake.NewSimpleClientset(test.initialState...)
epAdapter := NewEndpointsAdapter(clientset.CoreV1(), clientset.DiscoveryV1()) epAdapter := NewEndpointsAdapter(clientset.CoreV1(), clientset.DiscoveryV1())
r := NewLeaseEndpointReconciler(epAdapter, fakeLeases) r := NewLeaseEndpointReconciler(epAdapter, fakeLeases)
err := r.ReconcileEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts, true) err = r.ReconcileEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts, true)
if err != nil { if err != nil {
t.Errorf("unexpected error reconciling: %v", err) t.Errorf("unexpected error reconciling: %v", err)
} }
@ -314,8 +359,15 @@ func TestLeaseEndpointReconciler(t *testing.T) {
t.Errorf("unexpected error in side effects: %v", err) t.Errorf("unexpected error in side effects: %v", err)
} }
if updatedKeys := fakeLeases.GetUpdatedKeys(); len(updatedKeys) != 1 || updatedKeys[0] != test.ip { leases, err := fakeLeases.ListLeases()
t.Errorf("expected the master's IP to be refreshed, but the following IPs were refreshed instead: %v", updatedKeys) if err != nil {
t.Errorf("unexpected error: %v", err)
}
// sort for comparison
sort.Strings(leases)
sort.Strings(test.expectLeases)
if !reflect.DeepEqual(leases, test.expectLeases) {
t.Errorf("expected %v got: %v", test.expectLeases, leases)
} }
}) })
} }
@ -329,6 +381,7 @@ func TestLeaseEndpointReconciler(t *testing.T) {
initialState []runtime.Object initialState []runtime.Object
expectUpdate []runtime.Object expectUpdate []runtime.Object
expectCreate []runtime.Object expectCreate []runtime.Object
expectLeases []string
}{ }{
{ {
testName: "existing endpoints extra service ports missing port no update", testName: "existing endpoints extra service ports missing port no update",
@ -340,6 +393,7 @@ func TestLeaseEndpointReconciler(t *testing.T) {
}, },
initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
expectUpdate: nil, expectUpdate: nil,
expectLeases: []string{"1.2.3.4"},
}, },
{ {
testName: "existing endpoints extra service ports, wrong ports, wrong IP", testName: "existing endpoints extra service ports, wrong ports, wrong IP",
@ -351,6 +405,7 @@ func TestLeaseEndpointReconciler(t *testing.T) {
}, },
initialState: makeEndpointsArray("foo", []string{"4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), initialState: makeEndpointsArray("foo", []string{"4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
expectLeases: []string{"1.2.3.4"},
}, },
{ {
testName: "no existing endpoints", testName: "no existing endpoints",
@ -359,16 +414,20 @@ func TestLeaseEndpointReconciler(t *testing.T) {
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
initialState: nil, initialState: nil,
expectCreate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), expectCreate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
expectLeases: []string{"1.2.3.4"},
}, },
} }
for _, test := range nonReconcileTests { for _, test := range nonReconcileTests {
t.Run(test.testName, func(t *testing.T) { t.Run(test.testName, func(t *testing.T) {
fakeLeases := newFakeLeases() fakeLeases := newFakeLeases(t, s)
fakeLeases.SetKeys(test.endpointKeys) err := fakeLeases.SetKeys(test.endpointKeys)
if err != nil {
t.Errorf("unexpected error creating keys: %v", err)
}
clientset := fake.NewSimpleClientset(test.initialState...) clientset := fake.NewSimpleClientset(test.initialState...)
epAdapter := NewEndpointsAdapter(clientset.CoreV1(), clientset.DiscoveryV1()) epAdapter := NewEndpointsAdapter(clientset.CoreV1(), clientset.DiscoveryV1())
r := NewLeaseEndpointReconciler(epAdapter, fakeLeases) r := NewLeaseEndpointReconciler(epAdapter, fakeLeases)
err := r.ReconcileEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts, false) err = r.ReconcileEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts, false)
if err != nil { if err != nil {
t.Errorf("unexpected error reconciling: %v", err) t.Errorf("unexpected error reconciling: %v", err)
} }
@ -378,14 +437,33 @@ func TestLeaseEndpointReconciler(t *testing.T) {
t.Errorf("unexpected error in side effects: %v", err) t.Errorf("unexpected error in side effects: %v", err)
} }
if updatedKeys := fakeLeases.GetUpdatedKeys(); len(updatedKeys) != 1 || updatedKeys[0] != test.ip { leases, err := fakeLeases.ListLeases()
t.Errorf("expected the master's IP to be refreshed, but the following IPs were refreshed instead: %v", updatedKeys) if err != nil {
t.Errorf("unexpected error: %v", err)
}
// sort for comparison
sort.Strings(leases)
sort.Strings(test.expectLeases)
if !reflect.DeepEqual(leases, test.expectLeases) {
t.Errorf("expected %v got: %v", test.expectLeases, leases)
} }
}) })
} }
} }
func TestLeaseRemoveEndpoints(t *testing.T) { func TestLeaseRemoveEndpoints(t *testing.T) {
server, sc := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
t.Cleanup(func() { server.Terminate(t) })
newFunc := func() runtime.Object { return &corev1.Endpoints{} }
sc.Codec = apitesting.TestStorageCodec(codecs, corev1.SchemeGroupVersion)
s, dFunc, err := factory.Create(*sc.ForResource(schema.GroupResource{Resource: "pods"}), newFunc)
if err != nil {
t.Fatalf("Error creating storage: %v", err)
}
t.Cleanup(dFunc)
stopTests := []struct { stopTests := []struct {
testName string testName string
serviceName string serviceName string
@ -394,6 +472,7 @@ func TestLeaseRemoveEndpoints(t *testing.T) {
endpointKeys []string endpointKeys []string
initialState []runtime.Object initialState []runtime.Object
expectUpdate []runtime.Object expectUpdate []runtime.Object
expectLeases []string
}{ }{
{ {
testName: "successful stop reconciling", testName: "successful stop reconciling",
@ -403,6 +482,7 @@ func TestLeaseRemoveEndpoints(t *testing.T) {
endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
initialState: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), initialState: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
expectUpdate: makeEndpointsArray("foo", []string{"4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), expectUpdate: makeEndpointsArray("foo", []string{"4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
expectLeases: []string{"4.3.2.2", "4.3.2.3", "4.3.2.4"},
}, },
{ {
testName: "stop reconciling with ip not in endpoint ip list", testName: "stop reconciling with ip not in endpoint ip list",
@ -411,26 +491,36 @@ func TestLeaseRemoveEndpoints(t *testing.T) {
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
initialState: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), initialState: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
expectLeases: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
}, },
{ {
testName: "endpoint with no subset", testName: "endpoint with no subset",
serviceName: "foo", serviceName: "foo",
ip: "5.6.7.8", ip: "1.2.3.4",
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
initialState: makeEndpointsArray("foo", nil, nil), initialState: makeEndpointsArray("foo", nil, nil),
expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}), expectUpdate: makeEndpointsArray("foo", []string{"4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
expectLeases: []string{"4.3.2.2", "4.3.2.3", "4.3.2.4"},
}, },
} }
for _, test := range stopTests { for _, test := range stopTests {
t.Run(test.testName, func(t *testing.T) { t.Run(test.testName, func(t *testing.T) {
fakeLeases := newFakeLeases() fakeLeases := newFakeLeases(t, s)
fakeLeases.SetKeys(test.endpointKeys) err := fakeLeases.SetKeys(test.endpointKeys)
if err != nil {
t.Errorf("unexpected error creating keys: %v", err)
}
clientset := fake.NewSimpleClientset(test.initialState...) clientset := fake.NewSimpleClientset(test.initialState...)
epAdapter := NewEndpointsAdapter(clientset.CoreV1(), clientset.DiscoveryV1()) epAdapter := NewEndpointsAdapter(clientset.CoreV1(), clientset.DiscoveryV1())
r := NewLeaseEndpointReconciler(epAdapter, fakeLeases) r := NewLeaseEndpointReconciler(epAdapter, fakeLeases)
err := r.RemoveEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts) err = r.RemoveEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts)
if err != nil { // if the ip is not on the endpoints, it must return an storage error and stop reconciling
if !contains(test.endpointKeys, test.ip) {
if !storage.IsNotFound(err) {
t.Errorf("expected error StorageError: key not found, Code: 1, Key: /registry/base/key/%s got: %v", test.ip, err)
}
} else if err != nil {
t.Errorf("unexpected error reconciling: %v", err) t.Errorf("unexpected error reconciling: %v", err)
} }
@ -439,11 +529,25 @@ func TestLeaseRemoveEndpoints(t *testing.T) {
t.Errorf("unexpected error in side effects: %v", err) t.Errorf("unexpected error in side effects: %v", err)
} }
for _, key := range fakeLeases.GetUpdatedKeys() { leases, err := fakeLeases.ListLeases()
if key == test.ip { if err != nil {
t.Errorf("Found ip %s in leases but shouldn't be there", key) t.Errorf("unexpected error: %v", err)
} }
// sort for comparison
sort.Strings(leases)
sort.Strings(test.expectLeases)
if !reflect.DeepEqual(leases, test.expectLeases) {
t.Errorf("expected %v got: %v", test.expectLeases, leases)
} }
}) })
} }
} }
func contains(s []string, str string) bool {
for _, v := range s {
if v == str {
return true
}
}
return false
}