Merge pull request #107878 from danwinship/apiserver-endpointslice-cleanup

Clean up some EndpointSlice-related code in apiserver endpoint reconciler
This commit is contained in:
Kubernetes Prow Robot 2022-06-01 17:55:45 -07:00 committed by GitHub
commit 901434942c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 616 additions and 1044 deletions

View File

@ -57,8 +57,7 @@ func (adapter *EndpointsAdapter) Get(namespace, name string, getOpts metav1.GetO
} }
// Create accepts a namespace and Endpoints object and creates the Endpoints // Create accepts a namespace and Endpoints object and creates the Endpoints
// object. If an endpointSliceClient exists, a matching EndpointSlice will also // object and matching EndpointSlice. The created Endpoints object or an error will be
// be created or updated. The created Endpoints object or an error will be
// returned. // returned.
func (adapter *EndpointsAdapter) Create(namespace string, endpoints *corev1.Endpoints) (*corev1.Endpoints, error) { func (adapter *EndpointsAdapter) Create(namespace string, endpoints *corev1.Endpoints) (*corev1.Endpoints, error) {
endpoints, err := adapter.endpointClient.Endpoints(namespace).Create(context.TODO(), endpoints, metav1.CreateOptions{}) endpoints, err := adapter.endpointClient.Endpoints(namespace).Create(context.TODO(), endpoints, metav1.CreateOptions{})
@ -68,9 +67,8 @@ func (adapter *EndpointsAdapter) Create(namespace string, endpoints *corev1.Endp
return endpoints, err return endpoints, err
} }
// Update accepts a namespace and Endpoints object and updates it. If an // Update accepts a namespace and Endpoints object and updates it and its
// endpointSliceClient exists, a matching EndpointSlice will also be created or // matching EndpointSlice. The updated Endpoints object or an error will be returned.
// updated. The updated Endpoints object or an error will be returned.
func (adapter *EndpointsAdapter) Update(namespace string, endpoints *corev1.Endpoints) (*corev1.Endpoints, error) { func (adapter *EndpointsAdapter) Update(namespace string, endpoints *corev1.Endpoints) (*corev1.Endpoints, error) {
endpoints, err := adapter.endpointClient.Endpoints(namespace).Update(context.TODO(), endpoints, metav1.UpdateOptions{}) endpoints, err := adapter.endpointClient.Endpoints(namespace).Update(context.TODO(), endpoints, metav1.UpdateOptions{})
if err == nil { if err == nil {
@ -80,12 +78,9 @@ func (adapter *EndpointsAdapter) Update(namespace string, endpoints *corev1.Endp
} }
// EnsureEndpointSliceFromEndpoints accepts a namespace and Endpoints resource // EnsureEndpointSliceFromEndpoints accepts a namespace and Endpoints resource
// and creates or updates a corresponding EndpointSlice if an endpointSliceClient // and creates or updates a corresponding EndpointSlice. An error will be returned
// exists. An error will be returned if it fails to sync the EndpointSlice. // if it fails to sync the EndpointSlice.
func (adapter *EndpointsAdapter) EnsureEndpointSliceFromEndpoints(namespace string, endpoints *corev1.Endpoints) error { func (adapter *EndpointsAdapter) EnsureEndpointSliceFromEndpoints(namespace string, endpoints *corev1.Endpoints) error {
if adapter.endpointSliceClient == nil {
return nil
}
endpointSlice := endpointSliceFromEndpoints(endpoints) endpointSlice := endpointSliceFromEndpoints(endpoints)
currentEndpointSlice, err := adapter.endpointSliceClient.EndpointSlices(namespace).Get(context.TODO(), endpointSlice.Name, metav1.GetOptions{}) currentEndpointSlice, err := adapter.endpointSliceClient.EndpointSlices(namespace).Get(context.TODO(), endpointSlice.Name, metav1.GetOptions{})
@ -123,6 +118,7 @@ func (adapter *EndpointsAdapter) EnsureEndpointSliceFromEndpoints(namespace stri
func endpointSliceFromEndpoints(endpoints *corev1.Endpoints) *discovery.EndpointSlice { func endpointSliceFromEndpoints(endpoints *corev1.Endpoints) *discovery.EndpointSlice {
endpointSlice := &discovery.EndpointSlice{} endpointSlice := &discovery.EndpointSlice{}
endpointSlice.Name = endpoints.Name endpointSlice.Name = endpoints.Name
endpointSlice.Namespace = endpoints.Namespace
endpointSlice.Labels = map[string]string{discovery.LabelServiceName: endpoints.Name} endpointSlice.Labels = map[string]string{discovery.LabelServiceName: endpoints.Name}
// TODO: Add support for dual stack here (and in the rest of // TODO: Add support for dual stack here (and in the rest of

View File

@ -26,50 +26,53 @@ import (
apiequality "k8s.io/apimachinery/pkg/api/equality" apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
) )
func TestEndpointsAdapterGet(t *testing.T) { func TestEndpointsAdapterGet(t *testing.T) {
endpoints1, _ := generateEndpointsAndSlice("foo", "testing", []int{80, 443}, []string{"10.1.2.3", "10.1.2.4"}) endpoints1, epSlice1 := generateEndpointsAndSlice("foo", "testing", []int{80, 443}, []string{"10.1.2.3", "10.1.2.4"})
testCases := map[string]struct { testCases := map[string]struct {
endpointSlicesEnabled bool
expectedError error expectedError error
expectedEndpoints *corev1.Endpoints expectedEndpoints *corev1.Endpoints
endpoints []*corev1.Endpoints initialState []runtime.Object
namespaceParam string namespaceParam string
nameParam string nameParam string
}{ }{
"single-existing-endpoints": { "single-existing-endpoints": {
endpointSlicesEnabled: false,
expectedError: nil, expectedError: nil,
expectedEndpoints: endpoints1, expectedEndpoints: endpoints1,
endpoints: []*corev1.Endpoints{endpoints1}, initialState: []runtime.Object{endpoints1, epSlice1},
namespaceParam: "testing", namespaceParam: "testing",
nameParam: "foo", nameParam: "foo",
}, },
"single-existing-endpoints-slices-enabled": { "endpoints exists, endpointslice does not": {
endpointSlicesEnabled: true,
expectedError: nil, expectedError: nil,
expectedEndpoints: endpoints1, expectedEndpoints: endpoints1,
endpoints: []*corev1.Endpoints{endpoints1}, initialState: []runtime.Object{endpoints1},
namespaceParam: "testing",
nameParam: "foo",
},
"endpointslice exists, endpoints does not": {
expectedError: errors.NewNotFound(schema.GroupResource{Group: "", Resource: "endpoints"}, "foo"),
expectedEndpoints: nil,
initialState: []runtime.Object{epSlice1},
namespaceParam: "testing", namespaceParam: "testing",
nameParam: "foo", nameParam: "foo",
}, },
"wrong-namespace": { "wrong-namespace": {
endpointSlicesEnabled: false,
expectedError: errors.NewNotFound(schema.GroupResource{Group: "", Resource: "endpoints"}, "foo"), expectedError: errors.NewNotFound(schema.GroupResource{Group: "", Resource: "endpoints"}, "foo"),
expectedEndpoints: nil, expectedEndpoints: nil,
endpoints: []*corev1.Endpoints{endpoints1}, initialState: []runtime.Object{endpoints1, epSlice1},
namespaceParam: "foo", namespaceParam: "foo",
nameParam: "foo", nameParam: "foo",
}, },
"wrong-name": { "wrong-name": {
endpointSlicesEnabled: false,
expectedError: errors.NewNotFound(schema.GroupResource{Group: "", Resource: "endpoints"}, "bar"), expectedError: errors.NewNotFound(schema.GroupResource{Group: "", Resource: "endpoints"}, "bar"),
expectedEndpoints: nil, expectedEndpoints: nil,
endpoints: []*corev1.Endpoints{endpoints1}, initialState: []runtime.Object{endpoints1, epSlice1},
namespaceParam: "testing", namespaceParam: "testing",
nameParam: "bar", nameParam: "bar",
}, },
@ -77,18 +80,8 @@ func TestEndpointsAdapterGet(t *testing.T) {
for name, testCase := range testCases { for name, testCase := range testCases {
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
client := fake.NewSimpleClientset() client := fake.NewSimpleClientset(testCase.initialState...)
epAdapter := EndpointsAdapter{endpointClient: client.CoreV1()} epAdapter := NewEndpointsAdapter(client.CoreV1(), client.DiscoveryV1())
if testCase.endpointSlicesEnabled {
epAdapter.endpointSliceClient = client.DiscoveryV1()
}
for _, endpoints := range testCase.endpoints {
_, err := client.CoreV1().Endpoints(endpoints.Namespace).Create(context.TODO(), endpoints, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Error creating Endpoints: %v", err)
}
}
endpoints, err := epAdapter.Get(testCase.namespaceParam, testCase.nameParam, metav1.GetOptions{}) endpoints, err := epAdapter.Get(testCase.namespaceParam, testCase.nameParam, metav1.GetOptions{})
@ -117,76 +110,75 @@ func TestEndpointsAdapterCreate(t *testing.T) {
epSlice3.AddressType = discovery.AddressTypeIPv6 epSlice3.AddressType = discovery.AddressTypeIPv6
testCases := map[string]struct { testCases := map[string]struct {
endpointSlicesEnabled bool
expectedError error expectedError error
expectedEndpoints *corev1.Endpoints expectedResult *corev1.Endpoints
expectedEndpointSlice *discovery.EndpointSlice expectCreate []runtime.Object
endpoints []*corev1.Endpoints expectUpdate []runtime.Object
endpointSlices []*discovery.EndpointSlice initialState []runtime.Object
namespaceParam string namespaceParam string
endpointsParam *corev1.Endpoints endpointsParam *corev1.Endpoints
}{ }{
"single-endpoint": { "single-endpoint": {
endpointSlicesEnabled: true,
expectedError: nil, expectedError: nil,
expectedEndpoints: endpoints1, expectedResult: endpoints1,
expectedEndpointSlice: epSlice1, expectCreate: []runtime.Object{endpoints1, epSlice1},
endpoints: []*corev1.Endpoints{}, initialState: []runtime.Object{},
namespaceParam: endpoints1.Namespace, namespaceParam: endpoints1.Namespace,
endpointsParam: endpoints1, endpointsParam: endpoints1,
}, },
"single-endpoint-partial-ipv6": { "single-endpoint-partial-ipv6": {
endpointSlicesEnabled: true,
expectedError: nil, expectedError: nil,
expectedEndpoints: endpoints2, expectedResult: endpoints2,
expectedEndpointSlice: epSlice2, expectCreate: []runtime.Object{endpoints2, epSlice2},
endpoints: []*corev1.Endpoints{}, initialState: []runtime.Object{},
namespaceParam: endpoints2.Namespace, namespaceParam: endpoints2.Namespace,
endpointsParam: endpoints2, endpointsParam: endpoints2,
}, },
"single-endpoint-full-ipv6": { "single-endpoint-full-ipv6": {
endpointSlicesEnabled: true,
expectedError: nil, expectedError: nil,
expectedEndpoints: endpoints3, expectedResult: endpoints3,
expectedEndpointSlice: epSlice3, expectCreate: []runtime.Object{endpoints3, epSlice3},
endpoints: []*corev1.Endpoints{}, initialState: []runtime.Object{},
namespaceParam: endpoints3.Namespace, namespaceParam: endpoints3.Namespace,
endpointsParam: endpoints3, endpointsParam: endpoints3,
}, },
"single-endpoint-no-slices": { "existing-endpoints": {
endpointSlicesEnabled: false, expectedError: errors.NewAlreadyExists(schema.GroupResource{Group: "", Resource: "endpoints"}, "foo"),
expectedResult: nil,
initialState: []runtime.Object{endpoints1, epSlice1},
namespaceParam: endpoints1.Namespace,
endpointsParam: endpoints1,
// We expect the create to be attempted, we just also expect it to fail
expectCreate: []runtime.Object{endpoints1},
},
"existing-endpointslice-incorrect": {
// No error when we need to create the Endpoints but the correct
// EndpointSlice already exists
expectedError: nil, expectedError: nil,
expectedEndpoints: endpoints1, expectedResult: endpoints1,
expectedEndpointSlice: nil, expectCreate: []runtime.Object{endpoints1},
endpoints: []*corev1.Endpoints{}, initialState: []runtime.Object{epSlice1},
namespaceParam: endpoints1.Namespace, namespaceParam: endpoints1.Namespace,
endpointsParam: endpoints1, endpointsParam: endpoints1,
}, },
"existing-endpoint": { "existing-endpointslice-correct": {
endpointSlicesEnabled: true, // No error when we need to create the Endpoints but an incorrect
expectedError: errors.NewAlreadyExists(schema.GroupResource{Group: "", Resource: "endpoints"}, "foo"), // EndpointSlice already exists
expectedEndpoints: nil, expectedError: nil,
expectedEndpointSlice: nil, expectedResult: endpoints2,
endpoints: []*corev1.Endpoints{endpoints1}, expectCreate: []runtime.Object{endpoints2},
namespaceParam: endpoints1.Namespace, expectUpdate: []runtime.Object{epSlice2},
endpointsParam: endpoints1, initialState: []runtime.Object{epSlice1},
namespaceParam: endpoints2.Namespace,
endpointsParam: endpoints2,
}, },
} }
for name, testCase := range testCases { for name, testCase := range testCases {
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
client := fake.NewSimpleClientset() client := fake.NewSimpleClientset(testCase.initialState...)
epAdapter := EndpointsAdapter{endpointClient: client.CoreV1()} epAdapter := NewEndpointsAdapter(client.CoreV1(), client.DiscoveryV1())
if testCase.endpointSlicesEnabled {
epAdapter.endpointSliceClient = client.DiscoveryV1()
}
for _, endpoints := range testCase.endpoints {
_, err := client.CoreV1().Endpoints(endpoints.Namespace).Create(context.TODO(), endpoints, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Error creating Endpoints: %v", err)
}
}
endpoints, err := epAdapter.Create(testCase.namespaceParam, testCase.endpointsParam) endpoints, err := epAdapter.Create(testCase.namespaceParam, testCase.endpointsParam)
@ -194,37 +186,20 @@ func TestEndpointsAdapterCreate(t *testing.T) {
t.Errorf("Expected error: %v, got: %v", testCase.expectedError, err) t.Errorf("Expected error: %v, got: %v", testCase.expectedError, err)
} }
if !apiequality.Semantic.DeepEqual(endpoints, testCase.expectedEndpoints) { if !apiequality.Semantic.DeepEqual(endpoints, testCase.expectedResult) {
t.Errorf("Expected endpoints: %v, got: %v", testCase.expectedEndpoints, endpoints) t.Errorf("Expected endpoints: %v, got: %v", testCase.expectedResult, endpoints)
} }
epSliceList, err := client.DiscoveryV1().EndpointSlices(testCase.namespaceParam).List(context.TODO(), metav1.ListOptions{}) err = verifyCreatesAndUpdates(client, testCase.expectCreate, testCase.expectUpdate)
if err != nil { if err != nil {
t.Fatalf("Error listing Endpoint Slices: %v", err) t.Errorf("unexpected error in side effects: %v", err)
}
if testCase.expectedEndpointSlice == nil {
if len(epSliceList.Items) != 0 {
t.Fatalf("Expected no Endpoint Slices, got: %v", epSliceList.Items)
}
} else {
if len(epSliceList.Items) == 0 {
t.Fatalf("No Endpoint Slices found, expected: %v", testCase.expectedEndpointSlice)
}
if len(epSliceList.Items) > 1 {
t.Errorf("Only 1 Endpoint Slice expected, got: %v", testCase.expectedEndpointSlice)
}
if !apiequality.Semantic.DeepEqual(*testCase.expectedEndpointSlice, epSliceList.Items[0]) {
t.Errorf("Expected Endpoint Slice: %v, got: %v", testCase.expectedEndpointSlice, epSliceList.Items[0])
}
} }
}) })
} }
} }
func TestEndpointsAdapterUpdate(t *testing.T) { func TestEndpointsAdapterUpdate(t *testing.T) {
endpoints1, _ := generateEndpointsAndSlice("foo", "testing", []int{80}, []string{"10.1.2.3", "10.1.2.4"}) endpoints1, epSlice1 := generateEndpointsAndSlice("foo", "testing", []int{80}, []string{"10.1.2.3", "10.1.2.4"})
endpoints2, epSlice2 := generateEndpointsAndSlice("foo", "testing", []int{80, 443}, []string{"10.1.2.3", "10.1.2.4", "10.1.2.5"}) endpoints2, epSlice2 := generateEndpointsAndSlice("foo", "testing", []int{80, 443}, []string{"10.1.2.3", "10.1.2.4", "10.1.2.5"})
endpoints3, _ := generateEndpointsAndSlice("bar", "testing", []int{80, 443}, []string{"10.1.2.3", "10.1.2.4", "10.1.2.5"}) endpoints3, _ := generateEndpointsAndSlice("bar", "testing", []int{80, 443}, []string{"10.1.2.3", "10.1.2.4", "10.1.2.5"})
@ -237,68 +212,96 @@ func TestEndpointsAdapterUpdate(t *testing.T) {
_, epSlice4IPv4 := generateEndpointsAndSlice("foo", "testing", []int{80}, []string{"10.1.2.7", "10.1.2.8"}) _, epSlice4IPv4 := generateEndpointsAndSlice("foo", "testing", []int{80}, []string{"10.1.2.7", "10.1.2.8"})
testCases := map[string]struct { testCases := map[string]struct {
endpointSlicesEnabled bool
expectedError error expectedError error
expectedEndpoints *corev1.Endpoints expectedResult *corev1.Endpoints
expectedEndpointSlice *discovery.EndpointSlice expectCreate []runtime.Object
endpoints []*corev1.Endpoints expectUpdate []runtime.Object
endpointSlices []*discovery.EndpointSlice initialState []runtime.Object
namespaceParam string namespaceParam string
endpointsParam *corev1.Endpoints endpointsParam *corev1.Endpoints
}{ }{
"single-existing-endpoints-no-change": { "single-existing-endpoints-no-change": {
endpointSlicesEnabled: false,
expectedError: nil, expectedError: nil,
expectedEndpoints: endpoints1, expectedResult: endpoints1,
expectedEndpointSlice: nil, initialState: []runtime.Object{endpoints1, epSlice1},
endpoints: []*corev1.Endpoints{endpoints1},
namespaceParam: "testing", namespaceParam: "testing",
endpointsParam: endpoints1, endpointsParam: endpoints1,
// Even though there's no change, we still expect Update() to be
// called, because this unit test ALWAYS calls Update().
expectUpdate: []runtime.Object{endpoints1},
}, },
"existing-endpointslice-replaced-with-updated-ipv4-address-type": { "existing-endpointslice-replaced-with-updated-ipv4-address-type": {
endpointSlicesEnabled: true,
expectedError: nil, expectedError: nil,
expectedEndpoints: endpoints4, expectedResult: endpoints4,
expectedEndpointSlice: epSlice4IPv4, initialState: []runtime.Object{endpoints4, epSlice4IP},
endpoints: []*corev1.Endpoints{endpoints4},
endpointSlices: []*discovery.EndpointSlice{epSlice4IP},
namespaceParam: "testing", namespaceParam: "testing",
endpointsParam: endpoints4, endpointsParam: endpoints4,
// When AddressType changes, we Delete+Create the EndpointSlice,
// so that shows up in expectCreate, not expectUpdate.
expectUpdate: []runtime.Object{endpoints4},
expectCreate: []runtime.Object{epSlice4IPv4},
}, },
"add-ports-and-ips": { "add-ports-and-ips": {
endpointSlicesEnabled: true,
expectedError: nil, expectedError: nil,
expectedEndpoints: endpoints2, expectedResult: endpoints2,
expectedEndpointSlice: epSlice2, expectUpdate: []runtime.Object{endpoints2, epSlice2},
endpoints: []*corev1.Endpoints{endpoints1}, initialState: []runtime.Object{endpoints1, epSlice1},
namespaceParam: "testing", namespaceParam: "testing",
endpointsParam: endpoints2, endpointsParam: endpoints2,
}, },
"missing-endpoints": { "endpoints-correct-endpointslice-wrong": {
endpointSlicesEnabled: true, expectedError: nil,
expectedResult: endpoints2,
expectUpdate: []runtime.Object{endpoints2, epSlice2},
initialState: []runtime.Object{endpoints2, epSlice1},
namespaceParam: "testing",
endpointsParam: endpoints2,
},
"endpointslice-correct-endpoints-wrong": {
expectedError: nil,
expectedResult: endpoints2,
expectUpdate: []runtime.Object{endpoints2},
initialState: []runtime.Object{endpoints1, epSlice2},
namespaceParam: "testing",
endpointsParam: endpoints2,
},
"wrong-endpoints": {
expectedError: errors.NewNotFound(schema.GroupResource{Group: "", Resource: "endpoints"}, "bar"), expectedError: errors.NewNotFound(schema.GroupResource{Group: "", Resource: "endpoints"}, "bar"),
expectedEndpoints: nil, expectedResult: nil,
expectedEndpointSlice: nil, expectUpdate: []runtime.Object{endpoints3},
endpoints: []*corev1.Endpoints{endpoints1}, initialState: []runtime.Object{endpoints1, epSlice1},
namespaceParam: "testing", namespaceParam: "testing",
endpointsParam: endpoints3, endpointsParam: endpoints3,
}, },
"missing-endpoints": {
expectedError: errors.NewNotFound(schema.GroupResource{Group: "", Resource: "endpoints"}, "bar"),
expectedResult: nil,
initialState: []runtime.Object{endpoints1, epSlice1},
namespaceParam: "testing",
endpointsParam: endpoints3,
// We expect the update to be attempted, we just also expect it to fail
expectUpdate: []runtime.Object{endpoints3},
},
"missing-endpointslice": {
// No error when we need to update the Endpoints but the
// EndpointSlice doesn't exist
expectedError: nil,
expectedResult: endpoints1,
expectUpdate: []runtime.Object{endpoints1},
expectCreate: []runtime.Object{epSlice1},
initialState: []runtime.Object{endpoints2},
namespaceParam: "testing",
endpointsParam: endpoints1,
},
} }
for name, testCase := range testCases { for name, testCase := range testCases {
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
client := fake.NewSimpleClientset() client := fake.NewSimpleClientset(testCase.initialState...)
epAdapter := EndpointsAdapter{endpointClient: client.CoreV1()} epAdapter := NewEndpointsAdapter(client.CoreV1(), client.DiscoveryV1())
if testCase.endpointSlicesEnabled {
epAdapter.endpointSliceClient = client.DiscoveryV1()
}
for _, endpoints := range testCase.endpoints {
_, err := client.CoreV1().Endpoints(endpoints.Namespace).Create(context.TODO(), endpoints, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Error creating Endpoints: %v", err)
}
}
endpoints, err := epAdapter.Update(testCase.namespaceParam, testCase.endpointsParam) endpoints, err := epAdapter.Update(testCase.namespaceParam, testCase.endpointsParam)
@ -306,40 +309,25 @@ func TestEndpointsAdapterUpdate(t *testing.T) {
t.Errorf("Expected error: %v, got: %v", testCase.expectedError, err) t.Errorf("Expected error: %v, got: %v", testCase.expectedError, err)
} }
if !apiequality.Semantic.DeepEqual(endpoints, testCase.expectedEndpoints) { if !apiequality.Semantic.DeepEqual(endpoints, testCase.expectedResult) {
t.Errorf("Expected endpoints: %v, got: %v", testCase.expectedEndpoints, endpoints) t.Errorf("Expected endpoints: %v, got: %v", testCase.expectedResult, endpoints)
} }
epSliceList, err := client.DiscoveryV1().EndpointSlices(testCase.namespaceParam).List(context.TODO(), metav1.ListOptions{}) err = verifyCreatesAndUpdates(client, testCase.expectCreate, testCase.expectUpdate)
if err != nil { if err != nil {
t.Fatalf("Error listing Endpoint Slices: %v", err) t.Errorf("unexpected error in side effects: %v", err)
}
if testCase.expectedEndpointSlice == nil {
if len(epSliceList.Items) != 0 {
t.Fatalf("Expected no Endpoint Slices, got: %v", epSliceList.Items)
}
} else {
if len(epSliceList.Items) == 0 {
t.Fatalf("No Endpoint Slices found, expected: %v", testCase.expectedEndpointSlice)
}
if len(epSliceList.Items) > 1 {
t.Errorf("Only 1 Endpoint Slice expected, got: %v", testCase.expectedEndpointSlice)
}
if !apiequality.Semantic.DeepEqual(*testCase.expectedEndpointSlice, epSliceList.Items[0]) {
t.Errorf("Expected Endpoint Slice: %v, got: %v", testCase.expectedEndpointSlice, epSliceList.Items[0])
}
} }
}) })
} }
} }
func generateEndpointsAndSlice(name, namespace string, ports []int, addresses []string) (*corev1.Endpoints, *discovery.EndpointSlice) { func generateEndpointsAndSlice(name, namespace string, ports []int, addresses []string) (*corev1.Endpoints, *discovery.EndpointSlice) {
objectMeta := metav1.ObjectMeta{Name: name, Namespace: namespace}
trueBool := true trueBool := true
epSlice := &discovery.EndpointSlice{ObjectMeta: objectMeta, AddressType: discovery.AddressTypeIPv4} epSlice := &discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace},
AddressType: discovery.AddressTypeIPv4,
}
epSlice.Labels = map[string]string{discovery.LabelServiceName: name} epSlice.Labels = map[string]string{discovery.LabelServiceName: name}
subset := corev1.EndpointSubset{} subset := corev1.EndpointSubset{}
@ -376,52 +364,46 @@ func generateEndpointsAndSlice(name, namespace string, ports []int, addresses []
} }
return &corev1.Endpoints{ return &corev1.Endpoints{
ObjectMeta: objectMeta, ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
Labels: map[string]string{
discovery.LabelSkipMirror: "true",
},
},
Subsets: []corev1.EndpointSubset{subset}, Subsets: []corev1.EndpointSubset{subset},
}, epSlice }, epSlice
} }
func TestEndpointsAdapterEnsureEndpointSliceFromEndpoints(t *testing.T) { func TestEndpointManagerEnsureEndpointSliceFromEndpoints(t *testing.T) {
endpoints1, epSlice1 := generateEndpointsAndSlice("foo", "testing", []int{80, 443}, []string{"10.1.2.3", "10.1.2.4"}) endpoints1, epSlice1 := generateEndpointsAndSlice("foo", "testing", []int{80, 443}, []string{"10.1.2.3", "10.1.2.4"})
endpoints2, epSlice2 := generateEndpointsAndSlice("foo", "testing", []int{80, 443}, []string{"10.1.2.3", "10.1.2.4", "10.1.2.5"}) endpoints2, epSlice2 := generateEndpointsAndSlice("foo", "testing", []int{80, 443}, []string{"10.1.2.3", "10.1.2.4", "10.1.2.5"})
testCases := map[string]struct { testCases := map[string]struct {
endpointSlicesEnabled bool
expectedError error expectedError error
expectedEndpointSlice *discovery.EndpointSlice expectedEndpointSlice *discovery.EndpointSlice
endpointSlices []*discovery.EndpointSlice initialState []runtime.Object
namespaceParam string namespaceParam string
endpointsParam *corev1.Endpoints endpointsParam *corev1.Endpoints
}{ }{
"existing-endpointslice-no-change": { "existing-endpointslice-no-change": {
endpointSlicesEnabled: true,
expectedError: nil, expectedError: nil,
expectedEndpointSlice: epSlice1, expectedEndpointSlice: epSlice1,
endpointSlices: []*discovery.EndpointSlice{epSlice1}, initialState: []runtime.Object{epSlice1},
namespaceParam: "testing", namespaceParam: "testing",
endpointsParam: endpoints1, endpointsParam: endpoints1,
}, },
"existing-endpointslice-change": { "existing-endpointslice-change": {
endpointSlicesEnabled: true,
expectedError: nil, expectedError: nil,
expectedEndpointSlice: epSlice2, expectedEndpointSlice: epSlice2,
endpointSlices: []*discovery.EndpointSlice{epSlice1}, initialState: []runtime.Object{epSlice1},
namespaceParam: "testing", namespaceParam: "testing",
endpointsParam: endpoints2, endpointsParam: endpoints2,
}, },
"missing-endpointslice": { "missing-endpointslice": {
endpointSlicesEnabled: true,
expectedError: nil, expectedError: nil,
expectedEndpointSlice: epSlice1, expectedEndpointSlice: epSlice1,
endpointSlices: []*discovery.EndpointSlice{}, initialState: []runtime.Object{},
namespaceParam: "testing",
endpointsParam: endpoints1,
},
"endpointslices-disabled": {
endpointSlicesEnabled: false,
expectedError: nil,
expectedEndpointSlice: nil,
endpointSlices: []*discovery.EndpointSlice{},
namespaceParam: "testing", namespaceParam: "testing",
endpointsParam: endpoints1, endpointsParam: endpoints1,
}, },
@ -429,18 +411,8 @@ func TestEndpointsAdapterEnsureEndpointSliceFromEndpoints(t *testing.T) {
for name, testCase := range testCases { for name, testCase := range testCases {
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
client := fake.NewSimpleClientset() client := fake.NewSimpleClientset(testCase.initialState...)
epAdapter := EndpointsAdapter{endpointClient: client.CoreV1()} epAdapter := NewEndpointsAdapter(client.CoreV1(), client.DiscoveryV1())
if testCase.endpointSlicesEnabled {
epAdapter.endpointSliceClient = client.DiscoveryV1()
}
for _, endpointSlice := range testCase.endpointSlices {
_, err := client.DiscoveryV1().EndpointSlices(endpointSlice.Namespace).Create(context.TODO(), endpointSlice, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Error creating EndpointSlice: %v", err)
}
}
err := epAdapter.EnsureEndpointSliceFromEndpoints(testCase.namespaceParam, testCase.endpointsParam) err := epAdapter.EnsureEndpointSliceFromEndpoints(testCase.namespaceParam, testCase.endpointsParam)
if !apiequality.Semantic.DeepEqual(testCase.expectedError, err) { if !apiequality.Semantic.DeepEqual(testCase.expectedError, err) {

View File

@ -0,0 +1,133 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package reconcilers
import (
"fmt"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/kubernetes/fake"
k8stesting "k8s.io/client-go/testing"
)
func makeEndpointsArray(name string, ips []string, ports []corev1.EndpointPort) []runtime.Object {
return []runtime.Object{
makeEndpoints(name, ips, ports),
makeEndpointSlice(name, ips, ports),
}
}
func makeEndpoints(name string, ips []string, ports []corev1.EndpointPort) *corev1.Endpoints {
endpoints := &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Namespace: metav1.NamespaceDefault,
Name: name,
Labels: map[string]string{
discoveryv1.LabelSkipMirror: "true",
},
},
}
if len(ips) > 0 || len(ports) > 0 {
endpoints.Subsets = []corev1.EndpointSubset{{
Addresses: make([]corev1.EndpointAddress, len(ips)),
Ports: ports,
}}
for i := range ips {
endpoints.Subsets[0].Addresses[i].IP = ips[i]
}
}
return endpoints
}
func makeEndpointSlice(name string, ips []string, ports []corev1.EndpointPort) *discoveryv1.EndpointSlice {
slice := &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Namespace: metav1.NamespaceDefault,
Name: name,
Labels: map[string]string{
discoveryv1.LabelServiceName: name,
},
},
AddressType: discoveryv1.AddressTypeIPv4,
Endpoints: make([]discoveryv1.Endpoint, len(ips)),
Ports: make([]discoveryv1.EndpointPort, len(ports)),
}
ready := true
for i := range ips {
slice.Endpoints[i].Addresses = []string{ips[i]}
slice.Endpoints[i].Conditions.Ready = &ready
}
for i := range ports {
slice.Ports[i].Name = &ports[i].Name
slice.Ports[i].Protocol = &ports[i].Protocol
slice.Ports[i].Port = &ports[i].Port
}
return slice
}
func verifyCreatesAndUpdates(fakeClient *fake.Clientset, expectedCreates, expectedUpdates []runtime.Object) error {
errors := []error{}
updates := []k8stesting.UpdateAction{}
creates := []k8stesting.CreateAction{}
for _, action := range fakeClient.Actions() {
if action.GetVerb() == "update" {
updates = append(updates, action.(k8stesting.UpdateAction))
} else if action.GetVerb() == "create" {
creates = append(creates, action.(k8stesting.CreateAction))
}
}
if len(creates) != len(expectedCreates) {
errors = append(errors, fmt.Errorf("expected %d creates got %d", len(expectedCreates), len(creates)))
}
for i := 0; i < len(creates) || i < len(expectedCreates); i++ {
var expected, actual runtime.Object
if i < len(creates) {
actual = creates[i].GetObject()
}
if i < len(expectedCreates) {
expected = expectedCreates[i]
}
if !apiequality.Semantic.DeepEqual(expected, actual) {
errors = append(errors, fmt.Errorf("expected create %d to be:\n%#v\ngot:\n%#v\n", i, expected, actual))
}
}
if len(updates) != len(expectedUpdates) {
errors = append(errors, fmt.Errorf("expected %d updates got %d", len(expectedUpdates), len(updates)))
}
for i := 0; i < len(updates) || i < len(expectedUpdates); i++ {
var expected, actual runtime.Object
if i < len(updates) {
actual = updates[i].GetObject()
}
if i < len(expectedUpdates) {
expected = expectedUpdates[i]
}
if !apiequality.Semantic.DeepEqual(expected, actual) {
errors = append(errors, fmt.Errorf("expected update %d to be:\n%#v\ngot:\n%#v\n", i, expected, actual))
}
}
return utilerrors.NewAggregate(errors)
}

View File

@ -17,65 +17,88 @@ limitations under the License.
package reconcilers package reconcilers
import ( import (
"reflect"
"testing" "testing"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/runtime"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
netutils "k8s.io/utils/net" netutils "k8s.io/utils/net"
) )
func TestMasterCountEndpointReconciler(t *testing.T) { func TestMasterCountEndpointReconciler(t *testing.T) {
ns := metav1.NamespaceDefault
om := func(name string, skipMirrorLabel bool) metav1.ObjectMeta {
o := metav1.ObjectMeta{Namespace: ns, Name: name}
if skipMirrorLabel {
o.Labels = map[string]string{
discoveryv1.LabelSkipMirror: "true",
}
}
return o
}
reconcileTests := []struct { reconcileTests := []struct {
testName string testName string
serviceName string serviceName string
ip string ip string
endpointPorts []corev1.EndpointPort endpointPorts []corev1.EndpointPort
additionalMasters int additionalMasters int
endpoints *corev1.EndpointsList initialState []runtime.Object
expectUpdate *corev1.Endpoints // nil means none expected expectUpdate []runtime.Object
expectCreate *corev1.Endpoints // nil means none expected expectCreate []runtime.Object
}{ }{
{ {
testName: "no existing endpoints", testName: "no existing endpoints",
serviceName: "foo", serviceName: "foo",
ip: "1.2.3.4", ip: "1.2.3.4",
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpoints: nil, initialState: nil,
expectCreate: &corev1.Endpoints{ expectCreate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
}, },
{ {
testName: "existing endpoints satisfy", testName: "existing endpoints satisfy",
serviceName: "foo", serviceName: "foo",
ip: "1.2.3.4", ip: "1.2.3.4",
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpoints: &corev1.EndpointsList{ initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
Items: []corev1.Endpoints{{ },
ObjectMeta: om("foo", true), {
Subsets: []corev1.EndpointSubset{{ testName: "existing endpoints satisfy, no endpointslice",
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}}, serviceName: "foo",
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, ip: "1.2.3.4",
}}, endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}}, initialState: []runtime.Object{
makeEndpoints("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
},
expectCreate: []runtime.Object{
makeEndpointSlice("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
},
},
{
testName: "existing endpointslice satisfies, no endpoints",
serviceName: "foo",
ip: "1.2.3.4",
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
initialState: []runtime.Object{
makeEndpointSlice("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
},
expectCreate: []runtime.Object{
makeEndpoints("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
},
},
{
testName: "existing endpoints satisfy, endpointslice is wrong",
serviceName: "foo",
ip: "1.2.3.4",
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
initialState: []runtime.Object{
makeEndpoints("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
makeEndpointSlice("foo", []string{"4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
},
expectUpdate: []runtime.Object{
makeEndpointSlice("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
},
},
{
testName: "existing endpointslice satisfies, endpoints is wrong",
serviceName: "foo",
ip: "1.2.3.4",
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
initialState: []runtime.Object{
makeEndpoints("foo", []string{"4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
makeEndpointSlice("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
},
expectUpdate: []runtime.Object{
makeEndpoints("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
}, },
}, },
{ {
@ -83,22 +106,8 @@ func TestMasterCountEndpointReconciler(t *testing.T) {
serviceName: "foo", serviceName: "foo",
ip: "1.2.3.4", ip: "1.2.3.4",
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpoints: &corev1.EndpointsList{ initialState: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
Items: []corev1.Endpoints{{ expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}, {IP: "4.3.2.1"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
expectUpdate: &corev1.Endpoints{
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
}, },
{ {
testName: "existing endpoints satisfy but too many + extra masters", testName: "existing endpoints satisfy but too many + extra masters",
@ -106,33 +115,8 @@ func TestMasterCountEndpointReconciler(t *testing.T) {
ip: "1.2.3.4", ip: "1.2.3.4",
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
additionalMasters: 3, additionalMasters: 3,
endpoints: &corev1.EndpointsList{ initialState: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.1", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
Items: []corev1.Endpoints{{ expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{
{IP: "1.2.3.4"},
{IP: "4.3.2.1"},
{IP: "4.3.2.2"},
{IP: "4.3.2.3"},
{IP: "4.3.2.4"},
},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
expectUpdate: &corev1.Endpoints{
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{
{IP: "1.2.3.4"},
{IP: "4.3.2.2"},
{IP: "4.3.2.3"},
{IP: "4.3.2.4"},
},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
}, },
{ {
testName: "existing endpoints satisfy but too many + extra masters + delete first", testName: "existing endpoints satisfy but too many + extra masters + delete first",
@ -140,33 +124,8 @@ func TestMasterCountEndpointReconciler(t *testing.T) {
ip: "4.3.2.4", ip: "4.3.2.4",
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
additionalMasters: 3, additionalMasters: 3,
endpoints: &corev1.EndpointsList{ initialState: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.1", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
Items: []corev1.Endpoints{{ expectUpdate: makeEndpointsArray("foo", []string{"4.3.2.1", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{
{IP: "1.2.3.4"},
{IP: "4.3.2.1"},
{IP: "4.3.2.2"},
{IP: "4.3.2.3"},
{IP: "4.3.2.4"},
},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
expectUpdate: &corev1.Endpoints{
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{
{IP: "4.3.2.1"},
{IP: "4.3.2.2"},
{IP: "4.3.2.3"},
{IP: "4.3.2.4"},
},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
}, },
{ {
testName: "existing endpoints satisfy and endpoint addresses length less than master count", testName: "existing endpoints satisfy and endpoint addresses length less than master count",
@ -174,18 +133,7 @@ func TestMasterCountEndpointReconciler(t *testing.T) {
ip: "4.3.2.2", ip: "4.3.2.2",
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
additionalMasters: 3, additionalMasters: 3,
endpoints: &corev1.EndpointsList{ initialState: makeEndpointsArray("foo", []string{"4.3.2.1", "4.3.2.2"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
Items: []corev1.Endpoints{{
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{
{IP: "4.3.2.1"},
{IP: "4.3.2.2"},
},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
expectUpdate: nil, expectUpdate: nil,
}, },
{ {
@ -194,137 +142,48 @@ func TestMasterCountEndpointReconciler(t *testing.T) {
ip: "4.3.2.2", ip: "4.3.2.2",
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
additionalMasters: 3, additionalMasters: 3,
endpoints: &corev1.EndpointsList{ initialState: makeEndpointsArray("foo", []string{"4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
Items: []corev1.Endpoints{{ expectUpdate: makeEndpointsArray("foo", []string{"4.3.2.1", "4.3.2.2"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{
{IP: "4.3.2.1"},
},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
expectUpdate: &corev1.Endpoints{
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{
{IP: "4.3.2.1"},
{IP: "4.3.2.2"},
},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
}, },
{ {
testName: "existing endpoints wrong name", testName: "existing endpoints wrong name",
serviceName: "foo", serviceName: "foo",
ip: "1.2.3.4", ip: "1.2.3.4",
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpoints: &corev1.EndpointsList{ initialState: makeEndpointsArray("bar", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
Items: []corev1.Endpoints{{ expectCreate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
ObjectMeta: om("bar", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
expectCreate: &corev1.Endpoints{
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
}, },
{ {
testName: "existing endpoints wrong IP", testName: "existing endpoints wrong IP",
serviceName: "foo", serviceName: "foo",
ip: "1.2.3.4", ip: "1.2.3.4",
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpoints: &corev1.EndpointsList{ initialState: makeEndpointsArray("foo", []string{"4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
Items: []corev1.Endpoints{{ expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "4.3.2.1"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
expectUpdate: &corev1.Endpoints{
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
}, },
{ {
testName: "existing endpoints wrong port", testName: "existing endpoints wrong port",
serviceName: "foo", serviceName: "foo",
ip: "1.2.3.4", ip: "1.2.3.4",
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpoints: &corev1.EndpointsList{ initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 9090, Protocol: "TCP"}}),
Items: []corev1.Endpoints{{ expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 9090, Protocol: "TCP"}},
}},
}},
},
expectUpdate: &corev1.Endpoints{
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
}, },
{ {
testName: "existing endpoints wrong protocol", testName: "existing endpoints wrong protocol",
serviceName: "foo", serviceName: "foo",
ip: "1.2.3.4", ip: "1.2.3.4",
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpoints: &corev1.EndpointsList{ initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "UDP"}}),
Items: []corev1.Endpoints{{ expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "UDP"}},
}},
}},
},
expectUpdate: &corev1.Endpoints{
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
}, },
{ {
testName: "existing endpoints wrong port name", testName: "existing endpoints wrong port name",
serviceName: "foo", serviceName: "foo",
ip: "1.2.3.4", ip: "1.2.3.4",
endpointPorts: []corev1.EndpointPort{{Name: "baz", Port: 8080, Protocol: "TCP"}}, endpointPorts: []corev1.EndpointPort{{Name: "baz", Port: 8080, Protocol: "TCP"}},
endpoints: &corev1.EndpointsList{ initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
Items: []corev1.Endpoints{{ expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "baz", Port: 8080, Protocol: "TCP"}}),
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
expectUpdate: &corev1.Endpoints{
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "baz", Port: 8080, Protocol: "TCP"}},
}},
},
}, },
{ {
testName: "existing endpoints extra service ports satisfy", testName: "existing endpoints extra service ports satisfy",
@ -335,19 +194,13 @@ func TestMasterCountEndpointReconciler(t *testing.T) {
{Name: "bar", Port: 1000, Protocol: "TCP"}, {Name: "bar", Port: 1000, Protocol: "TCP"},
{Name: "baz", Port: 1010, Protocol: "TCP"}, {Name: "baz", Port: 1010, Protocol: "TCP"},
}, },
endpoints: &corev1.EndpointsList{ initialState: makeEndpointsArray("foo", []string{"1.2.3.4"},
Items: []corev1.Endpoints{{ []corev1.EndpointPort{
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{
{Name: "foo", Port: 8080, Protocol: "TCP"}, {Name: "foo", Port: 8080, Protocol: "TCP"},
{Name: "bar", Port: 1000, Protocol: "TCP"}, {Name: "bar", Port: 1000, Protocol: "TCP"},
{Name: "baz", Port: 1010, Protocol: "TCP"}, {Name: "baz", Port: 1010, Protocol: "TCP"},
}, },
}}, ),
}},
},
}, },
{ {
testName: "existing endpoints extra service ports missing port", testName: "existing endpoints extra service ports missing port",
@ -357,89 +210,38 @@ func TestMasterCountEndpointReconciler(t *testing.T) {
{Name: "foo", Port: 8080, Protocol: "TCP"}, {Name: "foo", Port: 8080, Protocol: "TCP"},
{Name: "bar", Port: 1000, Protocol: "TCP"}, {Name: "bar", Port: 1000, Protocol: "TCP"},
}, },
endpoints: &corev1.EndpointsList{ initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
Items: []corev1.Endpoints{{ expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4"},
ObjectMeta: om("foo", true), []corev1.EndpointPort{
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
expectUpdate: &corev1.Endpoints{
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{
{Name: "foo", Port: 8080, Protocol: "TCP"}, {Name: "foo", Port: 8080, Protocol: "TCP"},
{Name: "bar", Port: 1000, Protocol: "TCP"}, {Name: "bar", Port: 1000, Protocol: "TCP"},
}, },
}}, ),
},
}, },
{ {
testName: "no existing sctp endpoints", testName: "no existing sctp endpoints",
serviceName: "boo", serviceName: "boo",
ip: "1.2.3.4", ip: "1.2.3.4",
endpointPorts: []corev1.EndpointPort{{Name: "boo", Port: 7777, Protocol: "SCTP"}}, endpointPorts: []corev1.EndpointPort{{Name: "boo", Port: 7777, Protocol: "SCTP"}},
endpoints: nil, initialState: nil,
expectCreate: &corev1.Endpoints{ expectCreate: makeEndpointsArray("boo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "boo", Port: 7777, Protocol: "SCTP"}}),
ObjectMeta: om("boo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "boo", Port: 7777, Protocol: "SCTP"}},
}},
},
}, },
} }
for _, test := range reconcileTests { for _, test := range reconcileTests {
fakeClient := fake.NewSimpleClientset() t.Run(test.testName, func(t *testing.T) {
if test.endpoints != nil { fakeClient := fake.NewSimpleClientset(test.initialState...)
fakeClient = fake.NewSimpleClientset(test.endpoints) epAdapter := NewEndpointsAdapter(fakeClient.CoreV1(), fakeClient.DiscoveryV1())
}
epAdapter := NewEndpointsAdapter(fakeClient.CoreV1(), nil)
reconciler := NewMasterCountEndpointReconciler(test.additionalMasters+1, epAdapter) reconciler := NewMasterCountEndpointReconciler(test.additionalMasters+1, epAdapter)
err := reconciler.ReconcileEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts, true) err := reconciler.ReconcileEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts, true)
if err != nil { if err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err) t.Errorf("unexpected error reconciling: %v", err)
} }
updates := []core.UpdateAction{} err = verifyCreatesAndUpdates(fakeClient, test.expectCreate, test.expectUpdate)
for _, action := range fakeClient.Actions() { if err != nil {
if action.GetVerb() != "update" { t.Errorf("unexpected error in side effects: %v", err)
continue
} }
updates = append(updates, action.(core.UpdateAction)) })
}
if test.expectUpdate != nil {
if len(updates) != 1 {
t.Errorf("case %q: unexpected updates: %v", test.testName, updates)
} else if e, a := test.expectUpdate, updates[0].GetObject(); !reflect.DeepEqual(e, a) {
t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a)
}
}
if test.expectUpdate == nil && len(updates) > 0 {
t.Errorf("case %q: no update expected, yet saw: %v", test.testName, updates)
}
creates := []core.CreateAction{}
for _, action := range fakeClient.Actions() {
if action.GetVerb() != "create" {
continue
}
creates = append(creates, action.(core.CreateAction))
}
if test.expectCreate != nil {
if len(creates) != 1 {
t.Errorf("case %q: unexpected creates: %v", test.testName, creates)
} else if e, a := test.expectCreate, creates[0].GetObject(); !reflect.DeepEqual(e, a) {
t.Errorf("case %q: expected create:\n%#v\ngot:\n%#v\n", test.testName, e, a)
}
}
if test.expectCreate == nil && len(creates) > 0 {
t.Errorf("case %q: no create expected, yet saw: %v", test.testName, creates)
}
} }
nonReconcileTests := []struct { nonReconcileTests := []struct {
@ -448,9 +250,9 @@ func TestMasterCountEndpointReconciler(t *testing.T) {
ip string ip string
endpointPorts []corev1.EndpointPort endpointPorts []corev1.EndpointPort
additionalMasters int additionalMasters int
endpoints *corev1.EndpointsList initialState []runtime.Object
expectUpdate *corev1.Endpoints // nil means none expected expectUpdate []runtime.Object
expectCreate *corev1.Endpoints // nil means none expected expectCreate []runtime.Object
}{ }{
{ {
testName: "existing endpoints extra service ports missing port no update", testName: "existing endpoints extra service ports missing port no update",
@ -460,15 +262,7 @@ func TestMasterCountEndpointReconciler(t *testing.T) {
{Name: "foo", Port: 8080, Protocol: "TCP"}, {Name: "foo", Port: 8080, Protocol: "TCP"},
{Name: "bar", Port: 1000, Protocol: "TCP"}, {Name: "bar", Port: 1000, Protocol: "TCP"},
}, },
endpoints: &corev1.EndpointsList{ initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
Items: []corev1.Endpoints{{
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
expectUpdate: nil, expectUpdate: nil,
}, },
{ {
@ -479,106 +273,40 @@ func TestMasterCountEndpointReconciler(t *testing.T) {
{Name: "foo", Port: 8080, Protocol: "TCP"}, {Name: "foo", Port: 8080, Protocol: "TCP"},
{Name: "bar", Port: 1000, Protocol: "TCP"}, {Name: "bar", Port: 1000, Protocol: "TCP"},
}, },
endpoints: &corev1.EndpointsList{ initialState: makeEndpointsArray("foo", []string{"4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
Items: []corev1.Endpoints{{ expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "4.3.2.1"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
expectUpdate: &corev1.Endpoints{
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
}, },
{ {
testName: "no existing endpoints", testName: "no existing endpoints",
serviceName: "foo", serviceName: "foo",
ip: "1.2.3.4", ip: "1.2.3.4",
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpoints: nil, initialState: nil,
expectCreate: &corev1.Endpoints{ expectCreate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
}, },
} }
for _, test := range nonReconcileTests { for _, test := range nonReconcileTests {
fakeClient := fake.NewSimpleClientset() t.Run(test.testName, func(t *testing.T) {
if test.endpoints != nil { fakeClient := fake.NewSimpleClientset(test.initialState...)
fakeClient = fake.NewSimpleClientset(test.endpoints) epAdapter := NewEndpointsAdapter(fakeClient.CoreV1(), fakeClient.DiscoveryV1())
}
epAdapter := NewEndpointsAdapter(fakeClient.CoreV1(), nil)
reconciler := NewMasterCountEndpointReconciler(test.additionalMasters+1, epAdapter) reconciler := NewMasterCountEndpointReconciler(test.additionalMasters+1, epAdapter)
err := reconciler.ReconcileEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts, false) err := reconciler.ReconcileEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts, false)
if err != nil { if err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err) t.Errorf("unexpected error reconciling: %v", err)
} }
updates := []core.UpdateAction{} err = verifyCreatesAndUpdates(fakeClient, test.expectCreate, test.expectUpdate)
for _, action := range fakeClient.Actions() { if err != nil {
if action.GetVerb() != "update" { t.Errorf("unexpected error in side effects: %v", err)
continue
} }
updates = append(updates, action.(core.UpdateAction)) })
} }
if test.expectUpdate != nil {
if len(updates) != 1 {
t.Errorf("case %q: unexpected updates: %v", test.testName, updates)
} else if e, a := test.expectUpdate, updates[0].GetObject(); !reflect.DeepEqual(e, a) {
t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a)
}
}
if test.expectUpdate == nil && len(updates) > 0 {
t.Errorf("case %q: no update expected, yet saw: %v", test.testName, updates)
}
creates := []core.CreateAction{}
for _, action := range fakeClient.Actions() {
if action.GetVerb() != "create" {
continue
}
creates = append(creates, action.(core.CreateAction))
}
if test.expectCreate != nil {
if len(creates) != 1 {
t.Errorf("case %q: unexpected creates: %v", test.testName, creates)
} else if e, a := test.expectCreate, creates[0].GetObject(); !reflect.DeepEqual(e, a) {
t.Errorf("case %q: expected create:\n%#v\ngot:\n%#v\n", test.testName, e, a)
}
}
if test.expectCreate == nil && len(creates) > 0 {
t.Errorf("case %q: no create expected, yet saw: %v", test.testName, creates)
}
}
} }
func TestEmptySubsets(t *testing.T) { func TestEmptySubsets(t *testing.T) {
ns := metav1.NamespaceDefault endpoints := makeEndpointsArray("foo", nil, nil)
om := func(name string) metav1.ObjectMeta { fakeClient := fake.NewSimpleClientset(endpoints...)
return metav1.ObjectMeta{Namespace: ns, Name: name} epAdapter := NewEndpointsAdapter(fakeClient.CoreV1(), fakeClient.DiscoveryV1())
}
endpoints := &corev1.EndpointsList{
Items: []corev1.Endpoints{{
ObjectMeta: om("foo"),
Subsets: nil,
}},
}
fakeClient := fake.NewSimpleClientset()
if endpoints != nil {
fakeClient = fake.NewSimpleClientset(endpoints)
}
epAdapter := NewEndpointsAdapter(fakeClient.CoreV1(), nil)
reconciler := NewMasterCountEndpointReconciler(1, epAdapter) reconciler := NewMasterCountEndpointReconciler(1, epAdapter)
endpointPorts := []corev1.EndpointPort{ endpointPorts := []corev1.EndpointPort{
{Name: "foo", Port: 8080, Protocol: "TCP"}, {Name: "foo", Port: 8080, Protocol: "TCP"},

View File

@ -22,13 +22,11 @@ https://github.com/openshift/origin/blob/bb340c5dd5ff72718be86fb194dedc0faed7f4c
*/ */
import ( import (
"context"
"reflect"
"testing" "testing"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
netutils "k8s.io/utils/net" netutils "k8s.io/utils/net"
) )
@ -81,52 +79,79 @@ func (f *fakeLeases) Destroy() {
} }
func TestLeaseEndpointReconciler(t *testing.T) { func TestLeaseEndpointReconciler(t *testing.T) {
ns := corev1.NamespaceDefault
om := func(name string, skipMirrorLabel bool) metav1.ObjectMeta {
o := metav1.ObjectMeta{Namespace: ns, Name: name}
if skipMirrorLabel {
o.Labels = map[string]string{
discoveryv1.LabelSkipMirror: "true",
}
}
return o
}
reconcileTests := []struct { reconcileTests := []struct {
testName string testName string
serviceName string serviceName string
ip string ip string
endpointPorts []corev1.EndpointPort endpointPorts []corev1.EndpointPort
endpointKeys []string endpointKeys []string
endpoints *corev1.EndpointsList initialState []runtime.Object
expectUpdate *corev1.Endpoints // nil means none expected expectUpdate []runtime.Object
expectCreate []runtime.Object
}{ }{
{ {
testName: "no existing endpoints", testName: "no existing endpoints",
serviceName: "foo", serviceName: "foo",
ip: "1.2.3.4", ip: "1.2.3.4",
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpoints: nil, initialState: nil,
expectUpdate: &corev1.Endpoints{ expectCreate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
}, },
{ {
testName: "existing endpoints satisfy", testName: "existing endpoints satisfy",
serviceName: "foo", serviceName: "foo",
ip: "1.2.3.4", ip: "1.2.3.4",
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpoints: &corev1.EndpointsList{ initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
Items: []corev1.Endpoints{{ },
ObjectMeta: om("foo", true), {
Subsets: []corev1.EndpointSubset{{ testName: "existing endpoints satisfy, no endpointslice",
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}}, serviceName: "foo",
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, ip: "1.2.3.4",
}}, endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}}, initialState: []runtime.Object{
makeEndpoints("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
},
expectCreate: []runtime.Object{
makeEndpointSlice("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
},
},
{
testName: "existing endpointslice satisfies, no endpoints",
serviceName: "foo",
ip: "1.2.3.4",
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
initialState: []runtime.Object{
makeEndpointSlice("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
},
expectCreate: []runtime.Object{
makeEndpoints("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
},
},
{
testName: "existing endpoints satisfy, endpointslice is wrong",
serviceName: "foo",
ip: "1.2.3.4",
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
initialState: []runtime.Object{
makeEndpoints("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
makeEndpointSlice("foo", []string{"4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
},
expectUpdate: []runtime.Object{
makeEndpointSlice("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
},
},
{
testName: "existing endpointslice satisfies, endpoints is wrong",
serviceName: "foo",
ip: "1.2.3.4",
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
initialState: []runtime.Object{
makeEndpoints("foo", []string{"4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
makeEndpointSlice("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
},
expectUpdate: []runtime.Object{
makeEndpoints("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
}, },
}, },
{ {
@ -135,37 +160,15 @@ func TestLeaseEndpointReconciler(t *testing.T) {
ip: "1.2.3.4", ip: "1.2.3.4",
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpointKeys: []string{"1.2.3.4"}, endpointKeys: []string{"1.2.3.4"},
endpoints: &corev1.EndpointsList{ initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
Items: []corev1.Endpoints{{
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
}, },
{ {
testName: "existing endpoints satisfy but too many", testName: "existing endpoints satisfy but too many",
serviceName: "foo", serviceName: "foo",
ip: "1.2.3.4", ip: "1.2.3.4",
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpoints: &corev1.EndpointsList{ initialState: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
Items: []corev1.Endpoints{{ expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}, {IP: "4.3.2.1"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
expectUpdate: &corev1.Endpoints{
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
}, },
{ {
testName: "existing endpoints satisfy but too many + extra masters", testName: "existing endpoints satisfy but too many + extra masters",
@ -173,33 +176,8 @@ func TestLeaseEndpointReconciler(t *testing.T) {
ip: "1.2.3.4", ip: "1.2.3.4",
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
endpoints: &corev1.EndpointsList{ initialState: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.1", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
Items: []corev1.Endpoints{{ expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{
{IP: "1.2.3.4"},
{IP: "4.3.2.1"},
{IP: "4.3.2.2"},
{IP: "4.3.2.3"},
{IP: "4.3.2.4"},
},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
expectUpdate: &corev1.Endpoints{
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{
{IP: "1.2.3.4"},
{IP: "4.3.2.2"},
{IP: "4.3.2.3"},
{IP: "4.3.2.4"},
},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
}, },
{ {
testName: "existing endpoints satisfy but too many + extra masters + delete first", testName: "existing endpoints satisfy but too many + extra masters + delete first",
@ -207,33 +185,8 @@ func TestLeaseEndpointReconciler(t *testing.T) {
ip: "4.3.2.4", ip: "4.3.2.4",
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpointKeys: []string{"4.3.2.1", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, endpointKeys: []string{"4.3.2.1", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
endpoints: &corev1.EndpointsList{ initialState: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.1", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
Items: []corev1.Endpoints{{ expectUpdate: makeEndpointsArray("foo", []string{"4.3.2.1", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{
{IP: "1.2.3.4"},
{IP: "4.3.2.1"},
{IP: "4.3.2.2"},
{IP: "4.3.2.3"},
{IP: "4.3.2.4"},
},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
expectUpdate: &corev1.Endpoints{
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{
{IP: "4.3.2.1"},
{IP: "4.3.2.2"},
{IP: "4.3.2.3"},
{IP: "4.3.2.4"},
},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
}, },
{ {
testName: "existing endpoints current IP missing", testName: "existing endpoints current IP missing",
@ -241,158 +194,72 @@ func TestLeaseEndpointReconciler(t *testing.T) {
ip: "4.3.2.2", ip: "4.3.2.2",
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpointKeys: []string{"4.3.2.1"}, endpointKeys: []string{"4.3.2.1"},
endpoints: &corev1.EndpointsList{ initialState: makeEndpointsArray("foo", []string{"4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
Items: []corev1.Endpoints{{ expectUpdate: makeEndpointsArray("foo", []string{"4.3.2.1", "4.3.2.2"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{
{IP: "4.3.2.1"},
},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
expectUpdate: &corev1.Endpoints{
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{
{IP: "4.3.2.1"},
{IP: "4.3.2.2"},
},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
}, },
{ {
testName: "existing endpoints wrong name", testName: "existing endpoints wrong name",
serviceName: "foo", serviceName: "foo",
ip: "1.2.3.4", ip: "1.2.3.4",
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpoints: &corev1.EndpointsList{ initialState: makeEndpointsArray("bar", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
Items: []corev1.Endpoints{{ expectCreate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
ObjectMeta: om("bar", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
expectUpdate: &corev1.Endpoints{
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
}, },
{ {
testName: "existing endpoints wrong IP", testName: "existing endpoints wrong IP",
serviceName: "foo", serviceName: "foo",
ip: "1.2.3.4", ip: "1.2.3.4",
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpoints: &corev1.EndpointsList{ initialState: makeEndpointsArray("foo", []string{"4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
Items: []corev1.Endpoints{{ expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "4.3.2.1"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
expectUpdate: &corev1.Endpoints{
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
}, },
{ {
testName: "existing endpoints wrong port", testName: "existing endpoints wrong port",
serviceName: "foo", serviceName: "foo",
ip: "1.2.3.4", ip: "1.2.3.4",
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpoints: &corev1.EndpointsList{ initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 9090, Protocol: "TCP"}}),
Items: []corev1.Endpoints{{ expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 9090, Protocol: "TCP"}},
}},
}},
},
expectUpdate: &corev1.Endpoints{
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
}, },
{ {
testName: "existing endpoints wrong protocol", testName: "existing endpoints wrong protocol",
serviceName: "foo", serviceName: "foo",
ip: "1.2.3.4", ip: "1.2.3.4",
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpoints: &corev1.EndpointsList{ initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "UDP"}}),
Items: []corev1.Endpoints{{ expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "UDP"}},
}},
}},
},
expectUpdate: &corev1.Endpoints{
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
}, },
{ {
testName: "existing endpoints wrong port name", testName: "existing endpoints wrong port name",
serviceName: "foo", serviceName: "foo",
ip: "1.2.3.4", ip: "1.2.3.4",
endpointPorts: []corev1.EndpointPort{{Name: "baz", Port: 8080, Protocol: "TCP"}}, endpointPorts: []corev1.EndpointPort{{Name: "baz", Port: 8080, Protocol: "TCP"}},
endpoints: &corev1.EndpointsList{ initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
Items: []corev1.Endpoints{{ expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "baz", Port: 8080, Protocol: "TCP"}}),
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
expectUpdate: &corev1.Endpoints{
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "baz", Port: 8080, Protocol: "TCP"}},
}},
},
}, },
{ {
testName: "existing endpoints without skip mirror label", testName: "existing endpoints without skip mirror label",
serviceName: "foo", serviceName: "foo",
ip: "1.2.3.4", ip: "1.2.3.4",
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpoints: &corev1.EndpointsList{ initialState: []runtime.Object{
Items: []corev1.Endpoints{{ // can't use makeEndpointsArray() here because we don't want the
ObjectMeta: om("foo", false), // skip-mirror label
&corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Namespace: metav1.NamespaceDefault,
Name: "foo",
},
Subsets: []corev1.EndpointSubset{{ Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}}, Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}}, }},
}},
}, },
expectUpdate: &corev1.Endpoints{ makeEndpointSlice("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
ObjectMeta: om("foo", true), },
Subsets: []corev1.EndpointSubset{{ expectUpdate: []runtime.Object{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}}, makeEndpoints("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, // EndpointSlice does not get updated because it was already correct
}},
}, },
}, },
{ {
@ -404,19 +271,13 @@ func TestLeaseEndpointReconciler(t *testing.T) {
{Name: "bar", Port: 1000, Protocol: "TCP"}, {Name: "bar", Port: 1000, Protocol: "TCP"},
{Name: "baz", Port: 1010, Protocol: "TCP"}, {Name: "baz", Port: 1010, Protocol: "TCP"},
}, },
endpoints: &corev1.EndpointsList{ initialState: makeEndpointsArray("foo", []string{"1.2.3.4"},
Items: []corev1.Endpoints{{ []corev1.EndpointPort{
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{
{Name: "foo", Port: 8080, Protocol: "TCP"}, {Name: "foo", Port: 8080, Protocol: "TCP"},
{Name: "bar", Port: 1000, Protocol: "TCP"}, {Name: "bar", Port: 1000, Protocol: "TCP"},
{Name: "baz", Port: 1010, Protocol: "TCP"}, {Name: "baz", Port: 1010, Protocol: "TCP"},
}, },
}}, ),
}},
},
}, },
{ {
testName: "existing endpoints extra service ports missing port", testName: "existing endpoints extra service ports missing port",
@ -426,58 +287,37 @@ func TestLeaseEndpointReconciler(t *testing.T) {
{Name: "foo", Port: 8080, Protocol: "TCP"}, {Name: "foo", Port: 8080, Protocol: "TCP"},
{Name: "bar", Port: 1000, Protocol: "TCP"}, {Name: "bar", Port: 1000, Protocol: "TCP"},
}, },
endpoints: &corev1.EndpointsList{ initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
Items: []corev1.Endpoints{{ expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4"},
ObjectMeta: om("foo", true), []corev1.EndpointPort{
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
expectUpdate: &corev1.Endpoints{
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{
{Name: "foo", Port: 8080, Protocol: "TCP"}, {Name: "foo", Port: 8080, Protocol: "TCP"},
{Name: "bar", Port: 1000, Protocol: "TCP"}, {Name: "bar", Port: 1000, Protocol: "TCP"},
}, },
}}, ),
},
}, },
} }
for _, test := range reconcileTests { for _, test := range reconcileTests {
t.Run(test.testName, func(t *testing.T) {
fakeLeases := newFakeLeases() fakeLeases := newFakeLeases()
fakeLeases.SetKeys(test.endpointKeys) fakeLeases.SetKeys(test.endpointKeys)
clientset := fake.NewSimpleClientset() clientset := fake.NewSimpleClientset(test.initialState...)
if test.endpoints != nil {
for _, ep := range test.endpoints.Items {
if _, err := clientset.CoreV1().Endpoints(ep.Namespace).Create(context.TODO(), &ep, metav1.CreateOptions{}); err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err)
continue
}
}
}
epAdapter := EndpointsAdapter{endpointClient: clientset.CoreV1()} epAdapter := NewEndpointsAdapter(clientset.CoreV1(), clientset.DiscoveryV1())
r := NewLeaseEndpointReconciler(epAdapter, fakeLeases) r := NewLeaseEndpointReconciler(epAdapter, fakeLeases)
err := r.ReconcileEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts, true) err := r.ReconcileEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts, true)
if err != nil { if err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err) t.Errorf("unexpected error reconciling: %v", err)
} }
actualEndpoints, err := clientset.CoreV1().Endpoints(corev1.NamespaceDefault).Get(context.TODO(), test.serviceName, metav1.GetOptions{})
err = verifyCreatesAndUpdates(clientset, test.expectCreate, test.expectUpdate)
if err != nil { if err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err) t.Errorf("unexpected error in side effects: %v", err)
}
if test.expectUpdate != nil {
if e, a := test.expectUpdate, actualEndpoints; !reflect.DeepEqual(e, a) {
t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a)
}
} }
if updatedKeys := fakeLeases.GetUpdatedKeys(); len(updatedKeys) != 1 || updatedKeys[0] != test.ip { if updatedKeys := fakeLeases.GetUpdatedKeys(); len(updatedKeys) != 1 || updatedKeys[0] != test.ip {
t.Errorf("case %q: expected the master's IP to be refreshed, but the following IPs were refreshed instead: %v", test.testName, updatedKeys) t.Errorf("expected the master's IP to be refreshed, but the following IPs were refreshed instead: %v", updatedKeys)
} }
})
} }
nonReconcileTests := []struct { nonReconcileTests := []struct {
@ -486,8 +326,9 @@ func TestLeaseEndpointReconciler(t *testing.T) {
ip string ip string
endpointPorts []corev1.EndpointPort endpointPorts []corev1.EndpointPort
endpointKeys []string endpointKeys []string
endpoints *corev1.EndpointsList initialState []runtime.Object
expectUpdate *corev1.Endpoints // nil means none expected expectUpdate []runtime.Object
expectCreate []runtime.Object
}{ }{
{ {
testName: "existing endpoints extra service ports missing port no update", testName: "existing endpoints extra service ports missing port no update",
@ -497,15 +338,7 @@ func TestLeaseEndpointReconciler(t *testing.T) {
{Name: "foo", Port: 8080, Protocol: "TCP"}, {Name: "foo", Port: 8080, Protocol: "TCP"},
{Name: "bar", Port: 1000, Protocol: "TCP"}, {Name: "bar", Port: 1000, Protocol: "TCP"},
}, },
endpoints: &corev1.EndpointsList{ initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
Items: []corev1.Endpoints{{
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
expectUpdate: nil, expectUpdate: nil,
}, },
{ {
@ -516,92 +349,51 @@ func TestLeaseEndpointReconciler(t *testing.T) {
{Name: "foo", Port: 8080, Protocol: "TCP"}, {Name: "foo", Port: 8080, Protocol: "TCP"},
{Name: "bar", Port: 1000, Protocol: "TCP"}, {Name: "bar", Port: 1000, Protocol: "TCP"},
}, },
endpoints: &corev1.EndpointsList{ initialState: makeEndpointsArray("foo", []string{"4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
Items: []corev1.Endpoints{{ expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "4.3.2.1"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
expectUpdate: &corev1.Endpoints{
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
}, },
{ {
testName: "no existing endpoints", testName: "no existing endpoints",
serviceName: "foo", serviceName: "foo",
ip: "1.2.3.4", ip: "1.2.3.4",
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpoints: nil, initialState: nil,
expectUpdate: &corev1.Endpoints{ expectCreate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
}, },
} }
for _, test := range nonReconcileTests { for _, test := range nonReconcileTests {
t.Run(test.testName, func(t *testing.T) { t.Run(test.testName, func(t *testing.T) {
fakeLeases := newFakeLeases() fakeLeases := newFakeLeases()
fakeLeases.SetKeys(test.endpointKeys) fakeLeases.SetKeys(test.endpointKeys)
clientset := fake.NewSimpleClientset() clientset := fake.NewSimpleClientset(test.initialState...)
if test.endpoints != nil { epAdapter := NewEndpointsAdapter(clientset.CoreV1(), clientset.DiscoveryV1())
for _, ep := range test.endpoints.Items {
if _, err := clientset.CoreV1().Endpoints(ep.Namespace).Create(context.TODO(), &ep, metav1.CreateOptions{}); err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err)
continue
}
}
}
epAdapter := EndpointsAdapter{endpointClient: clientset.CoreV1()}
r := NewLeaseEndpointReconciler(epAdapter, fakeLeases) r := NewLeaseEndpointReconciler(epAdapter, fakeLeases)
err := r.ReconcileEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts, false) err := r.ReconcileEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts, false)
if err != nil { if err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err) t.Errorf("unexpected error reconciling: %v", err)
} }
actualEndpoints, err := clientset.CoreV1().Endpoints(corev1.NamespaceDefault).Get(context.TODO(), test.serviceName, metav1.GetOptions{})
err = verifyCreatesAndUpdates(clientset, test.expectCreate, test.expectUpdate)
if err != nil { if err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err) t.Errorf("unexpected error in side effects: %v", err)
}
if test.expectUpdate != nil {
if e, a := test.expectUpdate, actualEndpoints; !reflect.DeepEqual(e, a) {
t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a)
}
} }
if updatedKeys := fakeLeases.GetUpdatedKeys(); len(updatedKeys) != 1 || updatedKeys[0] != test.ip { if updatedKeys := fakeLeases.GetUpdatedKeys(); len(updatedKeys) != 1 || updatedKeys[0] != test.ip {
t.Errorf("case %q: expected the master's IP to be refreshed, but the following IPs were refreshed instead: %v", test.testName, updatedKeys) t.Errorf("expected the master's IP to be refreshed, but the following IPs were refreshed instead: %v", updatedKeys)
} }
}) })
} }
} }
func TestLeaseRemoveEndpoints(t *testing.T) { func TestLeaseRemoveEndpoints(t *testing.T) {
ns := corev1.NamespaceDefault
om := func(name string, skipMirrorLabel bool) metav1.ObjectMeta {
o := metav1.ObjectMeta{Namespace: ns, Name: name}
if skipMirrorLabel {
o.Labels = map[string]string{
discoveryv1.LabelSkipMirror: "true",
}
}
return o
}
stopTests := []struct { stopTests := []struct {
testName string testName string
serviceName string serviceName string
ip string ip string
endpointPorts []corev1.EndpointPort endpointPorts []corev1.EndpointPort
endpointKeys []string endpointKeys []string
endpoints *corev1.EndpointsList initialState []runtime.Object
expectUpdate *corev1.Endpoints // nil means none expected expectUpdate []runtime.Object
}{ }{
{ {
testName: "successful stop reconciling", testName: "successful stop reconciling",
@ -609,31 +401,8 @@ func TestLeaseRemoveEndpoints(t *testing.T) {
ip: "1.2.3.4", ip: "1.2.3.4",
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
endpoints: &corev1.EndpointsList{ initialState: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
Items: []corev1.Endpoints{{ expectUpdate: makeEndpointsArray("foo", []string{"4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{
{IP: "1.2.3.4"},
{IP: "4.3.2.2"},
{IP: "4.3.2.3"},
{IP: "4.3.2.4"},
},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
expectUpdate: &corev1.Endpoints{
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{
{IP: "4.3.2.2"},
{IP: "4.3.2.3"},
{IP: "4.3.2.4"},
},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
},
}, },
{ {
testName: "stop reconciling with ip not in endpoint ip list", testName: "stop reconciling with ip not in endpoint ip list",
@ -641,20 +410,7 @@ func TestLeaseRemoveEndpoints(t *testing.T) {
ip: "5.6.7.8", ip: "5.6.7.8",
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
endpoints: &corev1.EndpointsList{ initialState: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
Items: []corev1.Endpoints{{
ObjectMeta: om("foo", true),
Subsets: []corev1.EndpointSubset{{
Addresses: []corev1.EndpointAddress{
{IP: "1.2.3.4"},
{IP: "4.3.2.2"},
{IP: "4.3.2.3"},
{IP: "4.3.2.4"},
},
Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
}},
}},
},
}, },
{ {
testName: "endpoint with no subset", testName: "endpoint with no subset",
@ -662,43 +418,30 @@ func TestLeaseRemoveEndpoints(t *testing.T) {
ip: "5.6.7.8", ip: "5.6.7.8",
endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}, endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
endpoints: &corev1.EndpointsList{ initialState: makeEndpointsArray("foo", nil, nil),
Items: []corev1.Endpoints{{ expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
ObjectMeta: om("foo", true),
Subsets: nil,
}},
},
}, },
} }
for _, test := range stopTests { for _, test := range stopTests {
t.Run(test.testName, func(t *testing.T) { t.Run(test.testName, func(t *testing.T) {
fakeLeases := newFakeLeases() fakeLeases := newFakeLeases()
fakeLeases.SetKeys(test.endpointKeys) fakeLeases.SetKeys(test.endpointKeys)
clientset := fake.NewSimpleClientset() clientset := fake.NewSimpleClientset(test.initialState...)
for _, ep := range test.endpoints.Items { epAdapter := NewEndpointsAdapter(clientset.CoreV1(), clientset.DiscoveryV1())
if _, err := clientset.CoreV1().Endpoints(ep.Namespace).Create(context.TODO(), &ep, metav1.CreateOptions{}); err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err)
continue
}
}
epAdapter := EndpointsAdapter{endpointClient: clientset.CoreV1()}
r := NewLeaseEndpointReconciler(epAdapter, fakeLeases) r := NewLeaseEndpointReconciler(epAdapter, fakeLeases)
err := r.RemoveEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts) err := r.RemoveEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts)
if err != nil { if err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err) t.Errorf("unexpected error reconciling: %v", err)
} }
actualEndpoints, err := clientset.CoreV1().Endpoints(corev1.NamespaceDefault).Get(context.TODO(), test.serviceName, metav1.GetOptions{})
err = verifyCreatesAndUpdates(clientset, nil, test.expectUpdate)
if err != nil { if err != nil {
t.Errorf("case %q: unexpected error: %v", test.testName, err) t.Errorf("unexpected error in side effects: %v", err)
}
if test.expectUpdate != nil {
if e, a := test.expectUpdate, actualEndpoints; !reflect.DeepEqual(e, a) {
t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a)
}
} }
for _, key := range fakeLeases.GetUpdatedKeys() { for _, key := range fakeLeases.GetUpdatedKeys() {
if key == test.ip { if key == test.ip {
t.Errorf("case %q: Found ip %s in leases but shouldn't be there", test.testName, key) t.Errorf("Found ip %s in leases but shouldn't be there", key)
} }
} }
}) })