From d618452a979e49a94c517793c9cac0208d477768 Mon Sep 17 00:00:00 2001 From: Rob Scott Date: Wed, 28 Aug 2019 17:59:21 -0700 Subject: [PATCH] Adding EndpointsAdapter for apiserver EndpointSlice support --- pkg/master/BUILD | 1 + pkg/master/controller_test.go | 6 +- pkg/master/master.go | 20 +- pkg/master/reconcilers/BUILD | 12 +- pkg/master/reconcilers/endpointsadapter.go | 145 ++++++++ .../reconcilers/endpointsadapter_test.go | 337 ++++++++++++++++++ pkg/master/reconcilers/lease.go | 13 +- pkg/master/reconcilers/lease_test.go | 10 +- pkg/master/reconcilers/mastercount.go | 21 +- 9 files changed, 539 insertions(+), 26 deletions(-) create mode 100644 pkg/master/reconcilers/endpointsadapter.go create mode 100644 pkg/master/reconcilers/endpointsadapter_test.go diff --git a/pkg/master/BUILD b/pkg/master/BUILD index 888d8c8c38f..75e7f60eaab 100644 --- a/pkg/master/BUILD +++ b/pkg/master/BUILD @@ -124,6 +124,7 @@ go_library( "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes/typed/discovery/v1alpha1:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library", "//vendor/github.com/prometheus/client_golang/prometheus:go_default_library", "//vendor/k8s.io/klog:go_default_library", diff --git a/pkg/master/controller_test.go b/pkg/master/controller_test.go index 682baa00792..4578161b26e 100644 --- a/pkg/master/controller_test.go +++ b/pkg/master/controller_test.go @@ -392,7 +392,8 @@ func TestReconcileEndpoints(t *testing.T) { if test.endpoints != nil { fakeClient = fake.NewSimpleClientset(test.endpoints) } - reconciler := reconcilers.NewMasterCountEndpointReconciler(test.additionalMasters+1, fakeClient.CoreV1()) + epAdapter := reconcilers.NewEndpointsAdapter(fakeClient.CoreV1(), nil) + reconciler := reconcilers.NewMasterCountEndpointReconciler(test.additionalMasters+1, epAdapter) err := reconciler.ReconcileEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts, true) if err != nil { t.Errorf("case %q: unexpected error: %v", test.testName, err) @@ -510,7 +511,8 @@ func TestReconcileEndpoints(t *testing.T) { if test.endpoints != nil { fakeClient = fake.NewSimpleClientset(test.endpoints) } - reconciler := reconcilers.NewMasterCountEndpointReconciler(test.additionalMasters+1, fakeClient.CoreV1()) + epAdapter := reconcilers.NewEndpointsAdapter(fakeClient.CoreV1(), nil) + reconciler := reconcilers.NewMasterCountEndpointReconciler(test.additionalMasters+1, epAdapter) err := reconciler.ReconcileEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts, false) if err != nil { t.Errorf("case %q: unexpected error: %v", test.testName, err) diff --git a/pkg/master/master.go b/pkg/master/master.go index 2d8489e94e9..54abbfbe6ee 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -70,9 +70,12 @@ import ( "k8s.io/apiserver/pkg/server/healthz" serverstorage "k8s.io/apiserver/pkg/server/storage" storagefactory "k8s.io/apiserver/pkg/storage/storagebackend/factory" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" + discoveryclient "k8s.io/client-go/kubernetes/typed/discovery/v1alpha1" api "k8s.io/kubernetes/pkg/apis/core" + "k8s.io/kubernetes/pkg/features" kubeoptions "k8s.io/kubernetes/pkg/kubeapiserver/options" kubeletclient "k8s.io/kubernetes/pkg/kubelet/client" "k8s.io/kubernetes/pkg/master/reconcilers" @@ -217,7 +220,13 @@ type Master struct { func (c *Config) createMasterCountReconciler() reconcilers.EndpointReconciler { endpointClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig) - return reconcilers.NewMasterCountEndpointReconciler(c.ExtraConfig.MasterCount, endpointClient) + var endpointSliceClient discoveryclient.EndpointSlicesGetter + if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice) { + endpointSliceClient = discoveryclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig) + } + endpointsAdapter := reconcilers.NewEndpointsAdapter(endpointClient, endpointSliceClient) + + return reconcilers.NewMasterCountEndpointReconciler(c.ExtraConfig.MasterCount, endpointsAdapter) } func (c *Config) createNoneReconciler() reconcilers.EndpointReconciler { @@ -226,6 +235,12 @@ func (c *Config) createNoneReconciler() reconcilers.EndpointReconciler { func (c *Config) createLeaseReconciler() reconcilers.EndpointReconciler { endpointClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig) + var endpointSliceClient discoveryclient.EndpointSlicesGetter + if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice) { + endpointSliceClient = discoveryclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig) + } + endpointsAdapter := reconcilers.NewEndpointsAdapter(endpointClient, endpointSliceClient) + ttl := c.ExtraConfig.MasterEndpointReconcileTTL config, err := c.ExtraConfig.StorageFactory.NewConfig(api.Resource("apiServerIPInfo")) if err != nil { @@ -236,7 +251,8 @@ func (c *Config) createLeaseReconciler() reconcilers.EndpointReconciler { klog.Fatalf("Error creating storage factory: %v", err) } masterLeases := reconcilers.NewLeases(leaseStorage, "/masterleases/", ttl) - return reconcilers.NewLeaseEndpointReconciler(endpointClient, masterLeases) + + return reconcilers.NewLeaseEndpointReconciler(endpointsAdapter, masterLeases) } func (c *Config) createEndpointReconciler() reconcilers.EndpointReconciler { diff --git a/pkg/master/reconcilers/BUILD b/pkg/master/reconcilers/BUILD index cc498604880..acf5f2af17b 100644 --- a/pkg/master/reconcilers/BUILD +++ b/pkg/master/reconcilers/BUILD @@ -4,6 +4,7 @@ go_library( name = "go_default_library", srcs = [ "doc.go", + "endpointsadapter.go", "lease.go", "mastercount.go", "none.go", @@ -14,6 +15,7 @@ go_library( deps = [ "//pkg/api/v1/endpoints:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", @@ -21,6 +23,7 @@ go_library( "//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes/typed/discovery/v1alpha1:go_default_library", "//staging/src/k8s.io/client-go/util/retry:go_default_library", "//vendor/k8s.io/klog:go_default_library", ], @@ -28,11 +31,18 @@ go_library( go_test( name = "go_default_test", - srcs = ["lease_test.go"], + srcs = [ + "endpointsadapter_test.go", + "lease_test.go", + ], embed = [":go_default_library"], deps = [ "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", ], ) diff --git a/pkg/master/reconcilers/endpointsadapter.go b/pkg/master/reconcilers/endpointsadapter.go new file mode 100644 index 00000000000..5eee2fca15c --- /dev/null +++ b/pkg/master/reconcilers/endpointsadapter.go @@ -0,0 +1,145 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package reconcilers + +import ( + corev1 "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1alpha1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" + discoveryclient "k8s.io/client-go/kubernetes/typed/discovery/v1alpha1" +) + +const ( + // serviceNameLabel is used to indicate the name of a Kubernetes service + // associated with an EndpointSlice. + serviceNameLabel = "kubernetes.io/service-name" +) + +// EndpointsAdapter provides a simple interface for reading and writing both +// Endpoints and Endpoint Slices. +// NOTE: This is an incomplete adapter implementation that is only suitable for +// use in this package. This takes advantage of the Endpoints used in this +// package always having a consistent set of ports, a single subset, and a small +// set of addresses. Any more complex Endpoints resource would likely translate +// into multiple Endpoint Slices creating significantly more complexity instead +// of the 1:1 mapping this allows. +type EndpointsAdapter struct { + endpointClient corev1client.EndpointsGetter + endpointSliceClient discoveryclient.EndpointSlicesGetter +} + +// NewEndpointsAdapter returns a new EndpointsAdapter. +func NewEndpointsAdapter(endpointClient corev1client.EndpointsGetter, endpointSliceClient discoveryclient.EndpointSlicesGetter) EndpointsAdapter { + return EndpointsAdapter{ + endpointClient: endpointClient, + endpointSliceClient: endpointSliceClient, + } +} + +// Get takes the name and namespace of the Endpoints resource, and returns a +// corresponding Endpoints object if it exists, and an error if there is any. +func (adapter *EndpointsAdapter) Get(namespace, name string, getOpts metav1.GetOptions) (*corev1.Endpoints, error) { + return adapter.endpointClient.Endpoints(namespace).Get(name, getOpts) +} + +// Create accepts a namespace and Endpoints object and creates the Endpoints +// object. If an endpointSliceClient exists, a matching EndpointSlice will also +// be created or updated. The created Endpoints object or an error will be +// returned. +func (adapter *EndpointsAdapter) Create(namespace string, endpoints *corev1.Endpoints) (*corev1.Endpoints, error) { + endpoints, err := adapter.endpointClient.Endpoints(namespace).Create(endpoints) + if err == nil && adapter.endpointSliceClient != nil { + _, err = adapter.ensureEndpointSliceFromEndpoints(namespace, endpoints) + } + return endpoints, err +} + +// Update accepts a namespace and Endpoints object and updates it. If an +// endpointSliceClient exists, a matching EndpointSlice will also be created or +// updated. The updated Endpoints object or an error will be returned. +func (adapter *EndpointsAdapter) Update(namespace string, endpoints *corev1.Endpoints) (*corev1.Endpoints, error) { + endpoints, err := adapter.endpointClient.Endpoints(namespace).Update(endpoints) + if err == nil && adapter.endpointSliceClient != nil { + _, err = adapter.ensureEndpointSliceFromEndpoints(namespace, endpoints) + } + return endpoints, err +} + +// ensureEndpointSliceFromEndpoints accepts a namespace and Endpoints resource +// and creates or updates a corresponding EndpointSlice. The EndpointSlice +// and/or an error will be returned. +func (adapter *EndpointsAdapter) ensureEndpointSliceFromEndpoints(namespace string, endpoints *corev1.Endpoints) (*discovery.EndpointSlice, error) { + endpointSlice := endpointSliceFromEndpoints(endpoints) + _, err := adapter.endpointSliceClient.EndpointSlices(namespace).Get(endpointSlice.Name, metav1.GetOptions{}) + + if err != nil { + if errors.IsNotFound(err) { + return adapter.endpointSliceClient.EndpointSlices(namespace).Create(endpointSlice) + } + return nil, err + } + + return adapter.endpointSliceClient.EndpointSlices(namespace).Update(endpointSlice) +} + +// endpointSliceFromEndpoints generates an EndpointSlice from an Endpoints +// resource. +func endpointSliceFromEndpoints(endpoints *corev1.Endpoints) *discovery.EndpointSlice { + endpointSlice := &discovery.EndpointSlice{} + endpointSlice.Name = endpoints.Name + endpointSlice.Labels = map[string]string{serviceNameLabel: endpoints.Name} + endpointSlice.OwnerReferences = []metav1.OwnerReference{{Kind: "Service", Name: endpoints.Name}} + + ipAddressType := discovery.AddressTypeIP + endpointSlice.AddressType = &ipAddressType + + if len(endpoints.Subsets) > 0 { + subset := endpoints.Subsets[0] + for i := range subset.Ports { + endpointSlice.Ports = append(endpointSlice.Ports, discovery.EndpointPort{ + Port: &subset.Ports[i].Port, + Name: &subset.Ports[i].Name, + Protocol: &subset.Ports[i].Protocol, + }) + } + for _, address := range subset.Addresses { + endpointSlice.Endpoints = append(endpointSlice.Endpoints, endpointFromAddress(address, true)) + } + for _, address := range subset.NotReadyAddresses { + endpointSlice.Endpoints = append(endpointSlice.Endpoints, endpointFromAddress(address, false)) + } + } + + return endpointSlice +} + +// endpointFromAddress generates an Endpoint from an EndpointAddress resource. +func endpointFromAddress(address corev1.EndpointAddress, ready bool) discovery.Endpoint { + topology := map[string]string{} + if address.NodeName != nil { + topology["kubernetes.io/hostname"] = *address.NodeName + } + + return discovery.Endpoint{ + Addresses: []string{address.IP}, + Conditions: discovery.EndpointConditions{Ready: &ready}, + TargetRef: address.TargetRef, + Topology: topology, + } +} diff --git a/pkg/master/reconcilers/endpointsadapter_test.go b/pkg/master/reconcilers/endpointsadapter_test.go new file mode 100644 index 00000000000..f751866a962 --- /dev/null +++ b/pkg/master/reconcilers/endpointsadapter_test.go @@ -0,0 +1,337 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package reconcilers + +import ( + "fmt" + "testing" + + corev1 "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1alpha1" + apiequality "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/kubernetes/fake" +) + +func TestEndpointsAdapterGet(t *testing.T) { + endpoints1, _ := generateEndpointsAndSlice("foo", "testing", []int{80, 443}, []string{"10.1.2.3", "10.1.2.4"}) + + testCases := map[string]struct { + endpointSlicesEnabled bool + expectedError error + expectedEndpoints *corev1.Endpoints + endpoints []*corev1.Endpoints + namespaceParam string + nameParam string + }{ + "single-existing-endpoints": { + endpointSlicesEnabled: false, + expectedError: nil, + expectedEndpoints: endpoints1, + endpoints: []*corev1.Endpoints{endpoints1}, + namespaceParam: "testing", + nameParam: "foo", + }, + "single-existing-endpoints-slices-enabled": { + endpointSlicesEnabled: true, + expectedError: nil, + expectedEndpoints: endpoints1, + endpoints: []*corev1.Endpoints{endpoints1}, + namespaceParam: "testing", + nameParam: "foo", + }, + "wrong-namespace": { + endpointSlicesEnabled: false, + expectedError: errors.NewNotFound(schema.GroupResource{Group: "", Resource: "endpoints"}, "foo"), + expectedEndpoints: nil, + endpoints: []*corev1.Endpoints{endpoints1}, + namespaceParam: "foo", + nameParam: "foo", + }, + "wrong-name": { + endpointSlicesEnabled: false, + expectedError: errors.NewNotFound(schema.GroupResource{Group: "", Resource: "endpoints"}, "bar"), + expectedEndpoints: nil, + endpoints: []*corev1.Endpoints{endpoints1}, + namespaceParam: "testing", + nameParam: "bar", + }, + } + + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + client := fake.NewSimpleClientset() + epAdapter := EndpointsAdapter{endpointClient: client.CoreV1()} + if testCase.endpointSlicesEnabled { + epAdapter.endpointSliceClient = client.DiscoveryV1alpha1() + } + + for _, endpoints := range testCase.endpoints { + _, err := client.CoreV1().Endpoints(endpoints.Namespace).Create(endpoints) + if err != nil { + t.Fatalf("Error creating Endpoints: %v", err) + } + } + + endpoints, err := epAdapter.Get(testCase.namespaceParam, testCase.nameParam, metav1.GetOptions{}) + + if !apiequality.Semantic.DeepEqual(testCase.expectedError, err) { + t.Errorf("Expected error: %v, got: %v", testCase.expectedError, err) + } + + if !apiequality.Semantic.DeepEqual(endpoints, testCase.expectedEndpoints) { + t.Errorf("Expected endpoints: %v, got: %v", testCase.expectedEndpoints, endpoints) + } + }) + } +} + +func TestEndpointsAdapterCreate(t *testing.T) { + endpoints1, epSlice1 := generateEndpointsAndSlice("foo", "testing", []int{80}, []string{"10.1.2.3", "10.1.2.4"}) + + testCases := map[string]struct { + endpointSlicesEnabled bool + expectedError error + expectedEndpoints *corev1.Endpoints + expectedEndpointSlice *discovery.EndpointSlice + endpoints []*corev1.Endpoints + endpointSlices []*discovery.EndpointSlice + namespaceParam string + endpointsParam *corev1.Endpoints + }{ + "single-endpoint": { + endpointSlicesEnabled: true, + expectedError: nil, + expectedEndpoints: endpoints1, + expectedEndpointSlice: epSlice1, + endpoints: []*corev1.Endpoints{}, + namespaceParam: endpoints1.Namespace, + endpointsParam: endpoints1, + }, + "single-endpoint-no-slices": { + endpointSlicesEnabled: false, + expectedError: nil, + expectedEndpoints: endpoints1, + expectedEndpointSlice: nil, + endpoints: []*corev1.Endpoints{}, + namespaceParam: endpoints1.Namespace, + endpointsParam: endpoints1, + }, + "existing-endpoint": { + endpointSlicesEnabled: true, + expectedError: errors.NewAlreadyExists(schema.GroupResource{Group: "", Resource: "endpoints"}, "foo"), + expectedEndpoints: nil, + expectedEndpointSlice: nil, + endpoints: []*corev1.Endpoints{endpoints1}, + namespaceParam: endpoints1.Namespace, + endpointsParam: endpoints1, + }, + } + + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + client := fake.NewSimpleClientset() + epAdapter := EndpointsAdapter{endpointClient: client.CoreV1()} + if testCase.endpointSlicesEnabled { + epAdapter.endpointSliceClient = client.DiscoveryV1alpha1() + } + + for _, endpoints := range testCase.endpoints { + _, err := client.CoreV1().Endpoints(endpoints.Namespace).Create(endpoints) + if err != nil { + t.Fatalf("Error creating Endpoints: %v", err) + } + } + + endpoints, err := epAdapter.Create(testCase.namespaceParam, testCase.endpointsParam) + + if !apiequality.Semantic.DeepEqual(testCase.expectedError, err) { + t.Errorf("Expected error: %v, got: %v", testCase.expectedError, err) + } + + if !apiequality.Semantic.DeepEqual(endpoints, testCase.expectedEndpoints) { + t.Errorf("Expected endpoints: %v, got: %v", testCase.expectedEndpoints, endpoints) + } + + epSliceList, err := client.DiscoveryV1alpha1().EndpointSlices(testCase.namespaceParam).List(metav1.ListOptions{}) + if err != nil { + t.Fatalf("Error listing Endpoint Slices: %v", err) + } + + if testCase.expectedEndpointSlice == nil { + if len(epSliceList.Items) != 0 { + t.Fatalf("Expected no Endpoint Slices, got: %v", epSliceList.Items) + } + } else { + if len(epSliceList.Items) == 0 { + t.Fatalf("No Endpoint Slices found, expected: %v", testCase.expectedEndpointSlice) + } + if len(epSliceList.Items) > 1 { + t.Errorf("Only 1 Endpoint Slice expected, got: %v", testCase.expectedEndpointSlice) + } + if !apiequality.Semantic.DeepEqual(*testCase.expectedEndpointSlice, epSliceList.Items[0]) { + t.Errorf("Expected Endpoint Slice: %v, got: %v", testCase.expectedEndpointSlice, epSliceList.Items[0]) + + } + } + }) + } +} + +func TestEndpointsAdapterUpdate(t *testing.T) { + endpoints1, _ := generateEndpointsAndSlice("foo", "testing", []int{80}, []string{"10.1.2.3", "10.1.2.4"}) + endpoints2, epSlice2 := generateEndpointsAndSlice("foo", "testing", []int{80, 443}, []string{"10.1.2.3", "10.1.2.4", "10.1.2.5"}) + endpoints3, _ := generateEndpointsAndSlice("bar", "testing", []int{80, 443}, []string{"10.1.2.3", "10.1.2.4", "10.1.2.5"}) + + testCases := map[string]struct { + endpointSlicesEnabled bool + expectedError error + expectedEndpoints *corev1.Endpoints + expectedEndpointSlice *discovery.EndpointSlice + endpoints []*corev1.Endpoints + endpointSlices []*discovery.EndpointSlice + namespaceParam string + endpointsParam *corev1.Endpoints + }{ + "single-existing-endpoints-no-change": { + endpointSlicesEnabled: false, + expectedError: nil, + expectedEndpoints: endpoints1, + expectedEndpointSlice: nil, + endpoints: []*corev1.Endpoints{endpoints1}, + namespaceParam: "testing", + endpointsParam: endpoints1, + }, + "add-ports-and-ips": { + endpointSlicesEnabled: true, + expectedError: nil, + expectedEndpoints: endpoints2, + expectedEndpointSlice: epSlice2, + endpoints: []*corev1.Endpoints{endpoints1}, + namespaceParam: "testing", + endpointsParam: endpoints2, + }, + "missing-endpoints": { + endpointSlicesEnabled: true, + expectedError: errors.NewNotFound(schema.GroupResource{Group: "", Resource: "endpoints"}, "bar"), + expectedEndpoints: nil, + expectedEndpointSlice: nil, + endpoints: []*corev1.Endpoints{endpoints1}, + namespaceParam: "testing", + endpointsParam: endpoints3, + }, + } + + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + client := fake.NewSimpleClientset() + epAdapter := EndpointsAdapter{endpointClient: client.CoreV1()} + if testCase.endpointSlicesEnabled { + epAdapter.endpointSliceClient = client.DiscoveryV1alpha1() + } + + for _, endpoints := range testCase.endpoints { + _, err := client.CoreV1().Endpoints(endpoints.Namespace).Create(endpoints) + if err != nil { + t.Fatalf("Error creating Endpoints: %v", err) + } + } + + endpoints, err := epAdapter.Update(testCase.namespaceParam, testCase.endpointsParam) + + if !apiequality.Semantic.DeepEqual(testCase.expectedError, err) { + t.Errorf("Expected error: %v, got: %v", testCase.expectedError, err) + } + + if !apiequality.Semantic.DeepEqual(endpoints, testCase.expectedEndpoints) { + t.Errorf("Expected endpoints: %v, got: %v", testCase.expectedEndpoints, endpoints) + } + + epSliceList, err := client.DiscoveryV1alpha1().EndpointSlices(testCase.namespaceParam).List(metav1.ListOptions{}) + if err != nil { + t.Fatalf("Error listing Endpoint Slices: %v", err) + } + + if testCase.expectedEndpointSlice == nil { + if len(epSliceList.Items) != 0 { + t.Fatalf("Expected no Endpoint Slices, got: %v", epSliceList.Items) + } + } else { + if len(epSliceList.Items) == 0 { + t.Fatalf("No Endpoint Slices found, expected: %v", testCase.expectedEndpointSlice) + } + if len(epSliceList.Items) > 1 { + t.Errorf("Only 1 Endpoint Slice expected, got: %v", testCase.expectedEndpointSlice) + } + if !apiequality.Semantic.DeepEqual(*testCase.expectedEndpointSlice, epSliceList.Items[0]) { + t.Errorf("Expected Endpoint Slice: %v, got: %v", testCase.expectedEndpointSlice, epSliceList.Items[0]) + + } + } + }) + } +} + +func generateEndpointsAndSlice(name, namespace string, ports []int, addresses []string) (*corev1.Endpoints, *discovery.EndpointSlice) { + objectMeta := metav1.ObjectMeta{Name: name, Namespace: namespace} + trueBool := true + addressTypeIP := discovery.AddressTypeIP + + epSlice := &discovery.EndpointSlice{ObjectMeta: objectMeta, AddressType: &addressTypeIP} + epSlice.Labels = map[string]string{serviceNameLabel: name} + epSlice.OwnerReferences = []metav1.OwnerReference{{Kind: "Service", Name: name}} + subset := corev1.EndpointSubset{} + + for i, port := range ports { + endpointPort := corev1.EndpointPort{ + Name: fmt.Sprintf("port-%d", i), + Port: int32(port), + Protocol: corev1.ProtocolTCP, + } + subset.Ports = append(subset.Ports, endpointPort) + epSlice.Ports = append(epSlice.Ports, discovery.EndpointPort{ + Name: &endpointPort.Name, + Port: &endpointPort.Port, + Protocol: &endpointPort.Protocol, + }) + } + + for i, address := range addresses { + endpointAddress := corev1.EndpointAddress{ + IP: address, + TargetRef: &corev1.ObjectReference{ + Kind: "Pod", + Name: fmt.Sprintf("pod-%d", i), + }, + } + + subset.Addresses = append(subset.Addresses, endpointAddress) + + epSlice.Endpoints = append(epSlice.Endpoints, discovery.Endpoint{ + Addresses: []string{endpointAddress.IP}, + TargetRef: endpointAddress.TargetRef, + Conditions: discovery.EndpointConditions{Ready: &trueBool}, + }) + } + + return &corev1.Endpoints{ + ObjectMeta: objectMeta, + Subsets: []corev1.EndpointSubset{subset}, + }, epSlice +} diff --git a/pkg/master/reconcilers/lease.go b/pkg/master/reconcilers/lease.go index 25afeb72262..790e93462c9 100644 --- a/pkg/master/reconcilers/lease.go +++ b/pkg/master/reconcilers/lease.go @@ -37,7 +37,6 @@ import ( apirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/rest" "k8s.io/apiserver/pkg/storage" - corev1client "k8s.io/client-go/kubernetes/typed/core/v1" endpointsv1 "k8s.io/kubernetes/pkg/api/v1/endpoints" ) @@ -120,16 +119,16 @@ func NewLeases(storage storage.Interface, baseKey string, leaseTime time.Duratio } type leaseEndpointReconciler struct { - endpointClient corev1client.EndpointsGetter + epAdapter EndpointsAdapter masterLeases Leases stopReconcilingCalled bool reconcilingLock sync.Mutex } // NewLeaseEndpointReconciler creates a new LeaseEndpoint reconciler -func NewLeaseEndpointReconciler(endpointClient corev1client.EndpointsGetter, masterLeases Leases) EndpointReconciler { +func NewLeaseEndpointReconciler(epAdapter EndpointsAdapter, masterLeases Leases) EndpointReconciler { return &leaseEndpointReconciler{ - endpointClient: endpointClient, + epAdapter: epAdapter, masterLeases: masterLeases, stopReconcilingCalled: false, } @@ -161,7 +160,7 @@ func (r *leaseEndpointReconciler) ReconcileEndpoints(serviceName string, ip net. } func (r *leaseEndpointReconciler) doReconcile(serviceName string, endpointPorts []corev1.EndpointPort, reconcilePorts bool) error { - e, err := r.endpointClient.Endpoints(corev1.NamespaceDefault).Get(serviceName, metav1.GetOptions{}) + e, err := r.epAdapter.Get(corev1.NamespaceDefault, serviceName, metav1.GetOptions{}) shouldCreate := false if err != nil { if !errors.IsNotFound(err) { @@ -222,11 +221,11 @@ func (r *leaseEndpointReconciler) doReconcile(serviceName string, endpointPorts klog.Warningf("Resetting endpoints for master service %q to %v", serviceName, masterIPs) if shouldCreate { - if _, err = r.endpointClient.Endpoints(corev1.NamespaceDefault).Create(e); errors.IsAlreadyExists(err) { + if _, err = r.epAdapter.Create(corev1.NamespaceDefault, e); errors.IsAlreadyExists(err) { err = nil } } else { - _, err = r.endpointClient.Endpoints(corev1.NamespaceDefault).Update(e) + _, err = r.epAdapter.Update(corev1.NamespaceDefault, e) } return err } diff --git a/pkg/master/reconcilers/lease_test.go b/pkg/master/reconcilers/lease_test.go index a3521217712..b202b0f31b0 100644 --- a/pkg/master/reconcilers/lease_test.go +++ b/pkg/master/reconcilers/lease_test.go @@ -426,7 +426,9 @@ func TestLeaseEndpointReconciler(t *testing.T) { } } } - r := NewLeaseEndpointReconciler(clientset.CoreV1(), fakeLeases) + + epAdapter := EndpointsAdapter{endpointClient: clientset.CoreV1()} + r := NewLeaseEndpointReconciler(epAdapter, fakeLeases) err := r.ReconcileEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts, true) if err != nil { t.Errorf("case %q: unexpected error: %v", test.testName, err) @@ -526,7 +528,8 @@ func TestLeaseEndpointReconciler(t *testing.T) { } } } - r := NewLeaseEndpointReconciler(clientset.CoreV1(), fakeLeases) + epAdapter := EndpointsAdapter{endpointClient: clientset.CoreV1()} + r := NewLeaseEndpointReconciler(epAdapter, fakeLeases) err := r.ReconcileEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts, false) if err != nil { t.Errorf("case %q: unexpected error: %v", test.testName, err) @@ -626,7 +629,8 @@ func TestLeaseRemoveEndpoints(t *testing.T) { continue } } - r := NewLeaseEndpointReconciler(clientset.CoreV1(), fakeLeases) + epAdapter := EndpointsAdapter{endpointClient: clientset.CoreV1()} + r := NewLeaseEndpointReconciler(epAdapter, fakeLeases) err := r.RemoveEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts) if err != nil { t.Errorf("case %q: unexpected error: %v", test.testName, err) diff --git a/pkg/master/reconcilers/mastercount.go b/pkg/master/reconcilers/mastercount.go index 4699e90c172..78726da9724 100644 --- a/pkg/master/reconcilers/mastercount.go +++ b/pkg/master/reconcilers/mastercount.go @@ -24,7 +24,6 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - corev1client "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/util/retry" "k8s.io/klog" endpointsv1 "k8s.io/kubernetes/pkg/api/v1/endpoints" @@ -34,17 +33,17 @@ import ( // masters. masterCountEndpointReconciler implements EndpointReconciler. type masterCountEndpointReconciler struct { masterCount int - endpointClient corev1client.EndpointsGetter + epAdapter EndpointsAdapter stopReconcilingCalled bool reconcilingLock sync.Mutex } // NewMasterCountEndpointReconciler creates a new EndpointReconciler that reconciles based on a // specified expected number of masters. -func NewMasterCountEndpointReconciler(masterCount int, endpointClient corev1client.EndpointsGetter) EndpointReconciler { +func NewMasterCountEndpointReconciler(masterCount int, epAdapter EndpointsAdapter) EndpointReconciler { return &masterCountEndpointReconciler{ - masterCount: masterCount, - endpointClient: endpointClient, + masterCount: masterCount, + epAdapter: epAdapter, } } @@ -68,7 +67,7 @@ func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, i return nil } - e, err := r.endpointClient.Endpoints(metav1.NamespaceDefault).Get(serviceName, metav1.GetOptions{}) + e, err := r.epAdapter.Get(metav1.NamespaceDefault, serviceName, metav1.GetOptions{}) if err != nil { e = &corev1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ @@ -83,7 +82,7 @@ func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, i Addresses: []corev1.EndpointAddress{{IP: ip.String()}}, Ports: endpointPorts, }} - _, err = r.endpointClient.Endpoints(metav1.NamespaceDefault).Create(e) + _, err = r.epAdapter.Create(metav1.NamespaceDefault, e) return err } @@ -97,7 +96,7 @@ func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, i Ports: endpointPorts, }} klog.Warningf("Resetting endpoints for master service %q to %#v", serviceName, e) - _, err = r.endpointClient.Endpoints(metav1.NamespaceDefault).Update(e) + _, err = r.epAdapter.Update(metav1.NamespaceDefault, e) return err } if ipCorrect && portsCorrect { @@ -133,7 +132,7 @@ func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, i e.Subsets[0].Ports = endpointPorts } klog.Warningf("Resetting endpoints for master service %q to %v", serviceName, e) - _, err = r.endpointClient.Endpoints(metav1.NamespaceDefault).Update(e) + _, err = r.epAdapter.Update(metav1.NamespaceDefault, e) return err } @@ -141,7 +140,7 @@ func (r *masterCountEndpointReconciler) RemoveEndpoints(serviceName string, ip n r.reconcilingLock.Lock() defer r.reconcilingLock.Unlock() - e, err := r.endpointClient.Endpoints(metav1.NamespaceDefault).Get(serviceName, metav1.GetOptions{}) + e, err := r.epAdapter.Get(metav1.NamespaceDefault, serviceName, metav1.GetOptions{}) if err != nil { if errors.IsNotFound(err) { // Endpoint doesn't exist @@ -160,7 +159,7 @@ func (r *masterCountEndpointReconciler) RemoveEndpoints(serviceName string, ip n e.Subsets[0].Addresses = new e.Subsets = endpointsv1.RepackSubsets(e.Subsets) err = retry.RetryOnConflict(retry.DefaultBackoff, func() error { - _, err := r.endpointClient.Endpoints(metav1.NamespaceDefault).Update(e) + _, err := r.epAdapter.Update(metav1.NamespaceDefault, e) return err }) return err