mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Merge pull request #82139 from robscott/endpointslice-master
Adding EndpointsAdapter for apiserver EndpointSlice support
This commit is contained in:
commit
f1617a1b87
@ -124,6 +124,7 @@ go_library(
|
||||
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/informers:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes/typed/discovery/v1alpha1:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/rest:go_default_library",
|
||||
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
|
||||
"//vendor/k8s.io/klog:go_default_library",
|
||||
|
@ -392,7 +392,8 @@ func TestReconcileEndpoints(t *testing.T) {
|
||||
if test.endpoints != nil {
|
||||
fakeClient = fake.NewSimpleClientset(test.endpoints)
|
||||
}
|
||||
reconciler := reconcilers.NewMasterCountEndpointReconciler(test.additionalMasters+1, fakeClient.CoreV1())
|
||||
epAdapter := reconcilers.NewEndpointsAdapter(fakeClient.CoreV1(), nil)
|
||||
reconciler := reconcilers.NewMasterCountEndpointReconciler(test.additionalMasters+1, epAdapter)
|
||||
err := reconciler.ReconcileEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts, true)
|
||||
if err != nil {
|
||||
t.Errorf("case %q: unexpected error: %v", test.testName, err)
|
||||
@ -510,7 +511,8 @@ func TestReconcileEndpoints(t *testing.T) {
|
||||
if test.endpoints != nil {
|
||||
fakeClient = fake.NewSimpleClientset(test.endpoints)
|
||||
}
|
||||
reconciler := reconcilers.NewMasterCountEndpointReconciler(test.additionalMasters+1, fakeClient.CoreV1())
|
||||
epAdapter := reconcilers.NewEndpointsAdapter(fakeClient.CoreV1(), nil)
|
||||
reconciler := reconcilers.NewMasterCountEndpointReconciler(test.additionalMasters+1, epAdapter)
|
||||
err := reconciler.ReconcileEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts, false)
|
||||
if err != nil {
|
||||
t.Errorf("case %q: unexpected error: %v", test.testName, err)
|
||||
|
@ -70,9 +70,12 @@ import (
|
||||
"k8s.io/apiserver/pkg/server/healthz"
|
||||
serverstorage "k8s.io/apiserver/pkg/server/storage"
|
||||
storagefactory "k8s.io/apiserver/pkg/storage/storagebackend/factory"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/client-go/informers"
|
||||
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
discoveryclient "k8s.io/client-go/kubernetes/typed/discovery/v1alpha1"
|
||||
api "k8s.io/kubernetes/pkg/apis/core"
|
||||
"k8s.io/kubernetes/pkg/features"
|
||||
kubeoptions "k8s.io/kubernetes/pkg/kubeapiserver/options"
|
||||
kubeletclient "k8s.io/kubernetes/pkg/kubelet/client"
|
||||
"k8s.io/kubernetes/pkg/master/reconcilers"
|
||||
@ -217,7 +220,13 @@ type Master struct {
|
||||
|
||||
func (c *Config) createMasterCountReconciler() reconcilers.EndpointReconciler {
|
||||
endpointClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
|
||||
return reconcilers.NewMasterCountEndpointReconciler(c.ExtraConfig.MasterCount, endpointClient)
|
||||
var endpointSliceClient discoveryclient.EndpointSlicesGetter
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice) {
|
||||
endpointSliceClient = discoveryclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
|
||||
}
|
||||
endpointsAdapter := reconcilers.NewEndpointsAdapter(endpointClient, endpointSliceClient)
|
||||
|
||||
return reconcilers.NewMasterCountEndpointReconciler(c.ExtraConfig.MasterCount, endpointsAdapter)
|
||||
}
|
||||
|
||||
func (c *Config) createNoneReconciler() reconcilers.EndpointReconciler {
|
||||
@ -226,6 +235,12 @@ func (c *Config) createNoneReconciler() reconcilers.EndpointReconciler {
|
||||
|
||||
func (c *Config) createLeaseReconciler() reconcilers.EndpointReconciler {
|
||||
endpointClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
|
||||
var endpointSliceClient discoveryclient.EndpointSlicesGetter
|
||||
if utilfeature.DefaultFeatureGate.Enabled(features.EndpointSlice) {
|
||||
endpointSliceClient = discoveryclient.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
|
||||
}
|
||||
endpointsAdapter := reconcilers.NewEndpointsAdapter(endpointClient, endpointSliceClient)
|
||||
|
||||
ttl := c.ExtraConfig.MasterEndpointReconcileTTL
|
||||
config, err := c.ExtraConfig.StorageFactory.NewConfig(api.Resource("apiServerIPInfo"))
|
||||
if err != nil {
|
||||
@ -236,7 +251,8 @@ func (c *Config) createLeaseReconciler() reconcilers.EndpointReconciler {
|
||||
klog.Fatalf("Error creating storage factory: %v", err)
|
||||
}
|
||||
masterLeases := reconcilers.NewLeases(leaseStorage, "/masterleases/", ttl)
|
||||
return reconcilers.NewLeaseEndpointReconciler(endpointClient, masterLeases)
|
||||
|
||||
return reconcilers.NewLeaseEndpointReconciler(endpointsAdapter, masterLeases)
|
||||
}
|
||||
|
||||
func (c *Config) createEndpointReconciler() reconcilers.EndpointReconciler {
|
||||
|
@ -4,6 +4,7 @@ go_library(
|
||||
name = "go_default_library",
|
||||
srcs = [
|
||||
"doc.go",
|
||||
"endpointsadapter.go",
|
||||
"lease.go",
|
||||
"mastercount.go",
|
||||
"none.go",
|
||||
@ -14,6 +15,7 @@ go_library(
|
||||
deps = [
|
||||
"//pkg/api/v1/endpoints:go_default_library",
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||
@ -21,6 +23,7 @@ go_library(
|
||||
"//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library",
|
||||
"//staging/src/k8s.io/apiserver/pkg/storage:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes/typed/discovery/v1alpha1:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/util/retry:go_default_library",
|
||||
"//vendor/k8s.io/klog:go_default_library",
|
||||
],
|
||||
@ -28,11 +31,18 @@ go_library(
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = ["lease_test.go"],
|
||||
srcs = [
|
||||
"endpointsadapter_test.go",
|
||||
"lease_test.go",
|
||||
],
|
||||
embed = [":go_default_library"],
|
||||
deps = [
|
||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||
"//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
||||
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
|
||||
],
|
||||
)
|
||||
|
145
pkg/master/reconcilers/endpointsadapter.go
Normal file
145
pkg/master/reconcilers/endpointsadapter.go
Normal file
@ -0,0 +1,145 @@
|
||||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package reconcilers
|
||||
|
||||
import (
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
discovery "k8s.io/api/discovery/v1alpha1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
discoveryclient "k8s.io/client-go/kubernetes/typed/discovery/v1alpha1"
|
||||
)
|
||||
|
||||
const (
|
||||
// serviceNameLabel is used to indicate the name of a Kubernetes service
|
||||
// associated with an EndpointSlice.
|
||||
serviceNameLabel = "kubernetes.io/service-name"
|
||||
)
|
||||
|
||||
// EndpointsAdapter provides a simple interface for reading and writing both
|
||||
// Endpoints and Endpoint Slices.
|
||||
// NOTE: This is an incomplete adapter implementation that is only suitable for
|
||||
// use in this package. This takes advantage of the Endpoints used in this
|
||||
// package always having a consistent set of ports, a single subset, and a small
|
||||
// set of addresses. Any more complex Endpoints resource would likely translate
|
||||
// into multiple Endpoint Slices creating significantly more complexity instead
|
||||
// of the 1:1 mapping this allows.
|
||||
type EndpointsAdapter struct {
|
||||
endpointClient corev1client.EndpointsGetter
|
||||
endpointSliceClient discoveryclient.EndpointSlicesGetter
|
||||
}
|
||||
|
||||
// NewEndpointsAdapter returns a new EndpointsAdapter.
|
||||
func NewEndpointsAdapter(endpointClient corev1client.EndpointsGetter, endpointSliceClient discoveryclient.EndpointSlicesGetter) EndpointsAdapter {
|
||||
return EndpointsAdapter{
|
||||
endpointClient: endpointClient,
|
||||
endpointSliceClient: endpointSliceClient,
|
||||
}
|
||||
}
|
||||
|
||||
// Get takes the name and namespace of the Endpoints resource, and returns a
|
||||
// corresponding Endpoints object if it exists, and an error if there is any.
|
||||
func (adapter *EndpointsAdapter) Get(namespace, name string, getOpts metav1.GetOptions) (*corev1.Endpoints, error) {
|
||||
return adapter.endpointClient.Endpoints(namespace).Get(name, getOpts)
|
||||
}
|
||||
|
||||
// Create accepts a namespace and Endpoints object and creates the Endpoints
|
||||
// object. If an endpointSliceClient exists, a matching EndpointSlice will also
|
||||
// be created or updated. The created Endpoints object or an error will be
|
||||
// returned.
|
||||
func (adapter *EndpointsAdapter) Create(namespace string, endpoints *corev1.Endpoints) (*corev1.Endpoints, error) {
|
||||
endpoints, err := adapter.endpointClient.Endpoints(namespace).Create(endpoints)
|
||||
if err == nil && adapter.endpointSliceClient != nil {
|
||||
_, err = adapter.ensureEndpointSliceFromEndpoints(namespace, endpoints)
|
||||
}
|
||||
return endpoints, err
|
||||
}
|
||||
|
||||
// Update accepts a namespace and Endpoints object and updates it. If an
|
||||
// endpointSliceClient exists, a matching EndpointSlice will also be created or
|
||||
// updated. The updated Endpoints object or an error will be returned.
|
||||
func (adapter *EndpointsAdapter) Update(namespace string, endpoints *corev1.Endpoints) (*corev1.Endpoints, error) {
|
||||
endpoints, err := adapter.endpointClient.Endpoints(namespace).Update(endpoints)
|
||||
if err == nil && adapter.endpointSliceClient != nil {
|
||||
_, err = adapter.ensureEndpointSliceFromEndpoints(namespace, endpoints)
|
||||
}
|
||||
return endpoints, err
|
||||
}
|
||||
|
||||
// ensureEndpointSliceFromEndpoints accepts a namespace and Endpoints resource
|
||||
// and creates or updates a corresponding EndpointSlice. The EndpointSlice
|
||||
// and/or an error will be returned.
|
||||
func (adapter *EndpointsAdapter) ensureEndpointSliceFromEndpoints(namespace string, endpoints *corev1.Endpoints) (*discovery.EndpointSlice, error) {
|
||||
endpointSlice := endpointSliceFromEndpoints(endpoints)
|
||||
_, err := adapter.endpointSliceClient.EndpointSlices(namespace).Get(endpointSlice.Name, metav1.GetOptions{})
|
||||
|
||||
if err != nil {
|
||||
if errors.IsNotFound(err) {
|
||||
return adapter.endpointSliceClient.EndpointSlices(namespace).Create(endpointSlice)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return adapter.endpointSliceClient.EndpointSlices(namespace).Update(endpointSlice)
|
||||
}
|
||||
|
||||
// endpointSliceFromEndpoints generates an EndpointSlice from an Endpoints
|
||||
// resource.
|
||||
func endpointSliceFromEndpoints(endpoints *corev1.Endpoints) *discovery.EndpointSlice {
|
||||
endpointSlice := &discovery.EndpointSlice{}
|
||||
endpointSlice.Name = endpoints.Name
|
||||
endpointSlice.Labels = map[string]string{serviceNameLabel: endpoints.Name}
|
||||
endpointSlice.OwnerReferences = []metav1.OwnerReference{{Kind: "Service", Name: endpoints.Name}}
|
||||
|
||||
ipAddressType := discovery.AddressTypeIP
|
||||
endpointSlice.AddressType = &ipAddressType
|
||||
|
||||
if len(endpoints.Subsets) > 0 {
|
||||
subset := endpoints.Subsets[0]
|
||||
for i := range subset.Ports {
|
||||
endpointSlice.Ports = append(endpointSlice.Ports, discovery.EndpointPort{
|
||||
Port: &subset.Ports[i].Port,
|
||||
Name: &subset.Ports[i].Name,
|
||||
Protocol: &subset.Ports[i].Protocol,
|
||||
})
|
||||
}
|
||||
for _, address := range subset.Addresses {
|
||||
endpointSlice.Endpoints = append(endpointSlice.Endpoints, endpointFromAddress(address, true))
|
||||
}
|
||||
for _, address := range subset.NotReadyAddresses {
|
||||
endpointSlice.Endpoints = append(endpointSlice.Endpoints, endpointFromAddress(address, false))
|
||||
}
|
||||
}
|
||||
|
||||
return endpointSlice
|
||||
}
|
||||
|
||||
// endpointFromAddress generates an Endpoint from an EndpointAddress resource.
|
||||
func endpointFromAddress(address corev1.EndpointAddress, ready bool) discovery.Endpoint {
|
||||
topology := map[string]string{}
|
||||
if address.NodeName != nil {
|
||||
topology["kubernetes.io/hostname"] = *address.NodeName
|
||||
}
|
||||
|
||||
return discovery.Endpoint{
|
||||
Addresses: []string{address.IP},
|
||||
Conditions: discovery.EndpointConditions{Ready: &ready},
|
||||
TargetRef: address.TargetRef,
|
||||
Topology: topology,
|
||||
}
|
||||
}
|
337
pkg/master/reconcilers/endpointsadapter_test.go
Normal file
337
pkg/master/reconcilers/endpointsadapter_test.go
Normal file
@ -0,0 +1,337 @@
|
||||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package reconcilers
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
discovery "k8s.io/api/discovery/v1alpha1"
|
||||
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
)
|
||||
|
||||
func TestEndpointsAdapterGet(t *testing.T) {
|
||||
endpoints1, _ := generateEndpointsAndSlice("foo", "testing", []int{80, 443}, []string{"10.1.2.3", "10.1.2.4"})
|
||||
|
||||
testCases := map[string]struct {
|
||||
endpointSlicesEnabled bool
|
||||
expectedError error
|
||||
expectedEndpoints *corev1.Endpoints
|
||||
endpoints []*corev1.Endpoints
|
||||
namespaceParam string
|
||||
nameParam string
|
||||
}{
|
||||
"single-existing-endpoints": {
|
||||
endpointSlicesEnabled: false,
|
||||
expectedError: nil,
|
||||
expectedEndpoints: endpoints1,
|
||||
endpoints: []*corev1.Endpoints{endpoints1},
|
||||
namespaceParam: "testing",
|
||||
nameParam: "foo",
|
||||
},
|
||||
"single-existing-endpoints-slices-enabled": {
|
||||
endpointSlicesEnabled: true,
|
||||
expectedError: nil,
|
||||
expectedEndpoints: endpoints1,
|
||||
endpoints: []*corev1.Endpoints{endpoints1},
|
||||
namespaceParam: "testing",
|
||||
nameParam: "foo",
|
||||
},
|
||||
"wrong-namespace": {
|
||||
endpointSlicesEnabled: false,
|
||||
expectedError: errors.NewNotFound(schema.GroupResource{Group: "", Resource: "endpoints"}, "foo"),
|
||||
expectedEndpoints: nil,
|
||||
endpoints: []*corev1.Endpoints{endpoints1},
|
||||
namespaceParam: "foo",
|
||||
nameParam: "foo",
|
||||
},
|
||||
"wrong-name": {
|
||||
endpointSlicesEnabled: false,
|
||||
expectedError: errors.NewNotFound(schema.GroupResource{Group: "", Resource: "endpoints"}, "bar"),
|
||||
expectedEndpoints: nil,
|
||||
endpoints: []*corev1.Endpoints{endpoints1},
|
||||
namespaceParam: "testing",
|
||||
nameParam: "bar",
|
||||
},
|
||||
}
|
||||
|
||||
for name, testCase := range testCases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
client := fake.NewSimpleClientset()
|
||||
epAdapter := EndpointsAdapter{endpointClient: client.CoreV1()}
|
||||
if testCase.endpointSlicesEnabled {
|
||||
epAdapter.endpointSliceClient = client.DiscoveryV1alpha1()
|
||||
}
|
||||
|
||||
for _, endpoints := range testCase.endpoints {
|
||||
_, err := client.CoreV1().Endpoints(endpoints.Namespace).Create(endpoints)
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating Endpoints: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
endpoints, err := epAdapter.Get(testCase.namespaceParam, testCase.nameParam, metav1.GetOptions{})
|
||||
|
||||
if !apiequality.Semantic.DeepEqual(testCase.expectedError, err) {
|
||||
t.Errorf("Expected error: %v, got: %v", testCase.expectedError, err)
|
||||
}
|
||||
|
||||
if !apiequality.Semantic.DeepEqual(endpoints, testCase.expectedEndpoints) {
|
||||
t.Errorf("Expected endpoints: %v, got: %v", testCase.expectedEndpoints, endpoints)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestEndpointsAdapterCreate(t *testing.T) {
|
||||
endpoints1, epSlice1 := generateEndpointsAndSlice("foo", "testing", []int{80}, []string{"10.1.2.3", "10.1.2.4"})
|
||||
|
||||
testCases := map[string]struct {
|
||||
endpointSlicesEnabled bool
|
||||
expectedError error
|
||||
expectedEndpoints *corev1.Endpoints
|
||||
expectedEndpointSlice *discovery.EndpointSlice
|
||||
endpoints []*corev1.Endpoints
|
||||
endpointSlices []*discovery.EndpointSlice
|
||||
namespaceParam string
|
||||
endpointsParam *corev1.Endpoints
|
||||
}{
|
||||
"single-endpoint": {
|
||||
endpointSlicesEnabled: true,
|
||||
expectedError: nil,
|
||||
expectedEndpoints: endpoints1,
|
||||
expectedEndpointSlice: epSlice1,
|
||||
endpoints: []*corev1.Endpoints{},
|
||||
namespaceParam: endpoints1.Namespace,
|
||||
endpointsParam: endpoints1,
|
||||
},
|
||||
"single-endpoint-no-slices": {
|
||||
endpointSlicesEnabled: false,
|
||||
expectedError: nil,
|
||||
expectedEndpoints: endpoints1,
|
||||
expectedEndpointSlice: nil,
|
||||
endpoints: []*corev1.Endpoints{},
|
||||
namespaceParam: endpoints1.Namespace,
|
||||
endpointsParam: endpoints1,
|
||||
},
|
||||
"existing-endpoint": {
|
||||
endpointSlicesEnabled: true,
|
||||
expectedError: errors.NewAlreadyExists(schema.GroupResource{Group: "", Resource: "endpoints"}, "foo"),
|
||||
expectedEndpoints: nil,
|
||||
expectedEndpointSlice: nil,
|
||||
endpoints: []*corev1.Endpoints{endpoints1},
|
||||
namespaceParam: endpoints1.Namespace,
|
||||
endpointsParam: endpoints1,
|
||||
},
|
||||
}
|
||||
|
||||
for name, testCase := range testCases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
client := fake.NewSimpleClientset()
|
||||
epAdapter := EndpointsAdapter{endpointClient: client.CoreV1()}
|
||||
if testCase.endpointSlicesEnabled {
|
||||
epAdapter.endpointSliceClient = client.DiscoveryV1alpha1()
|
||||
}
|
||||
|
||||
for _, endpoints := range testCase.endpoints {
|
||||
_, err := client.CoreV1().Endpoints(endpoints.Namespace).Create(endpoints)
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating Endpoints: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
endpoints, err := epAdapter.Create(testCase.namespaceParam, testCase.endpointsParam)
|
||||
|
||||
if !apiequality.Semantic.DeepEqual(testCase.expectedError, err) {
|
||||
t.Errorf("Expected error: %v, got: %v", testCase.expectedError, err)
|
||||
}
|
||||
|
||||
if !apiequality.Semantic.DeepEqual(endpoints, testCase.expectedEndpoints) {
|
||||
t.Errorf("Expected endpoints: %v, got: %v", testCase.expectedEndpoints, endpoints)
|
||||
}
|
||||
|
||||
epSliceList, err := client.DiscoveryV1alpha1().EndpointSlices(testCase.namespaceParam).List(metav1.ListOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Error listing Endpoint Slices: %v", err)
|
||||
}
|
||||
|
||||
if testCase.expectedEndpointSlice == nil {
|
||||
if len(epSliceList.Items) != 0 {
|
||||
t.Fatalf("Expected no Endpoint Slices, got: %v", epSliceList.Items)
|
||||
}
|
||||
} else {
|
||||
if len(epSliceList.Items) == 0 {
|
||||
t.Fatalf("No Endpoint Slices found, expected: %v", testCase.expectedEndpointSlice)
|
||||
}
|
||||
if len(epSliceList.Items) > 1 {
|
||||
t.Errorf("Only 1 Endpoint Slice expected, got: %v", testCase.expectedEndpointSlice)
|
||||
}
|
||||
if !apiequality.Semantic.DeepEqual(*testCase.expectedEndpointSlice, epSliceList.Items[0]) {
|
||||
t.Errorf("Expected Endpoint Slice: %v, got: %v", testCase.expectedEndpointSlice, epSliceList.Items[0])
|
||||
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestEndpointsAdapterUpdate(t *testing.T) {
|
||||
endpoints1, _ := generateEndpointsAndSlice("foo", "testing", []int{80}, []string{"10.1.2.3", "10.1.2.4"})
|
||||
endpoints2, epSlice2 := generateEndpointsAndSlice("foo", "testing", []int{80, 443}, []string{"10.1.2.3", "10.1.2.4", "10.1.2.5"})
|
||||
endpoints3, _ := generateEndpointsAndSlice("bar", "testing", []int{80, 443}, []string{"10.1.2.3", "10.1.2.4", "10.1.2.5"})
|
||||
|
||||
testCases := map[string]struct {
|
||||
endpointSlicesEnabled bool
|
||||
expectedError error
|
||||
expectedEndpoints *corev1.Endpoints
|
||||
expectedEndpointSlice *discovery.EndpointSlice
|
||||
endpoints []*corev1.Endpoints
|
||||
endpointSlices []*discovery.EndpointSlice
|
||||
namespaceParam string
|
||||
endpointsParam *corev1.Endpoints
|
||||
}{
|
||||
"single-existing-endpoints-no-change": {
|
||||
endpointSlicesEnabled: false,
|
||||
expectedError: nil,
|
||||
expectedEndpoints: endpoints1,
|
||||
expectedEndpointSlice: nil,
|
||||
endpoints: []*corev1.Endpoints{endpoints1},
|
||||
namespaceParam: "testing",
|
||||
endpointsParam: endpoints1,
|
||||
},
|
||||
"add-ports-and-ips": {
|
||||
endpointSlicesEnabled: true,
|
||||
expectedError: nil,
|
||||
expectedEndpoints: endpoints2,
|
||||
expectedEndpointSlice: epSlice2,
|
||||
endpoints: []*corev1.Endpoints{endpoints1},
|
||||
namespaceParam: "testing",
|
||||
endpointsParam: endpoints2,
|
||||
},
|
||||
"missing-endpoints": {
|
||||
endpointSlicesEnabled: true,
|
||||
expectedError: errors.NewNotFound(schema.GroupResource{Group: "", Resource: "endpoints"}, "bar"),
|
||||
expectedEndpoints: nil,
|
||||
expectedEndpointSlice: nil,
|
||||
endpoints: []*corev1.Endpoints{endpoints1},
|
||||
namespaceParam: "testing",
|
||||
endpointsParam: endpoints3,
|
||||
},
|
||||
}
|
||||
|
||||
for name, testCase := range testCases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
client := fake.NewSimpleClientset()
|
||||
epAdapter := EndpointsAdapter{endpointClient: client.CoreV1()}
|
||||
if testCase.endpointSlicesEnabled {
|
||||
epAdapter.endpointSliceClient = client.DiscoveryV1alpha1()
|
||||
}
|
||||
|
||||
for _, endpoints := range testCase.endpoints {
|
||||
_, err := client.CoreV1().Endpoints(endpoints.Namespace).Create(endpoints)
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating Endpoints: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
endpoints, err := epAdapter.Update(testCase.namespaceParam, testCase.endpointsParam)
|
||||
|
||||
if !apiequality.Semantic.DeepEqual(testCase.expectedError, err) {
|
||||
t.Errorf("Expected error: %v, got: %v", testCase.expectedError, err)
|
||||
}
|
||||
|
||||
if !apiequality.Semantic.DeepEqual(endpoints, testCase.expectedEndpoints) {
|
||||
t.Errorf("Expected endpoints: %v, got: %v", testCase.expectedEndpoints, endpoints)
|
||||
}
|
||||
|
||||
epSliceList, err := client.DiscoveryV1alpha1().EndpointSlices(testCase.namespaceParam).List(metav1.ListOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Error listing Endpoint Slices: %v", err)
|
||||
}
|
||||
|
||||
if testCase.expectedEndpointSlice == nil {
|
||||
if len(epSliceList.Items) != 0 {
|
||||
t.Fatalf("Expected no Endpoint Slices, got: %v", epSliceList.Items)
|
||||
}
|
||||
} else {
|
||||
if len(epSliceList.Items) == 0 {
|
||||
t.Fatalf("No Endpoint Slices found, expected: %v", testCase.expectedEndpointSlice)
|
||||
}
|
||||
if len(epSliceList.Items) > 1 {
|
||||
t.Errorf("Only 1 Endpoint Slice expected, got: %v", testCase.expectedEndpointSlice)
|
||||
}
|
||||
if !apiequality.Semantic.DeepEqual(*testCase.expectedEndpointSlice, epSliceList.Items[0]) {
|
||||
t.Errorf("Expected Endpoint Slice: %v, got: %v", testCase.expectedEndpointSlice, epSliceList.Items[0])
|
||||
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func generateEndpointsAndSlice(name, namespace string, ports []int, addresses []string) (*corev1.Endpoints, *discovery.EndpointSlice) {
|
||||
objectMeta := metav1.ObjectMeta{Name: name, Namespace: namespace}
|
||||
trueBool := true
|
||||
addressTypeIP := discovery.AddressTypeIP
|
||||
|
||||
epSlice := &discovery.EndpointSlice{ObjectMeta: objectMeta, AddressType: &addressTypeIP}
|
||||
epSlice.Labels = map[string]string{serviceNameLabel: name}
|
||||
epSlice.OwnerReferences = []metav1.OwnerReference{{Kind: "Service", Name: name}}
|
||||
subset := corev1.EndpointSubset{}
|
||||
|
||||
for i, port := range ports {
|
||||
endpointPort := corev1.EndpointPort{
|
||||
Name: fmt.Sprintf("port-%d", i),
|
||||
Port: int32(port),
|
||||
Protocol: corev1.ProtocolTCP,
|
||||
}
|
||||
subset.Ports = append(subset.Ports, endpointPort)
|
||||
epSlice.Ports = append(epSlice.Ports, discovery.EndpointPort{
|
||||
Name: &endpointPort.Name,
|
||||
Port: &endpointPort.Port,
|
||||
Protocol: &endpointPort.Protocol,
|
||||
})
|
||||
}
|
||||
|
||||
for i, address := range addresses {
|
||||
endpointAddress := corev1.EndpointAddress{
|
||||
IP: address,
|
||||
TargetRef: &corev1.ObjectReference{
|
||||
Kind: "Pod",
|
||||
Name: fmt.Sprintf("pod-%d", i),
|
||||
},
|
||||
}
|
||||
|
||||
subset.Addresses = append(subset.Addresses, endpointAddress)
|
||||
|
||||
epSlice.Endpoints = append(epSlice.Endpoints, discovery.Endpoint{
|
||||
Addresses: []string{endpointAddress.IP},
|
||||
TargetRef: endpointAddress.TargetRef,
|
||||
Conditions: discovery.EndpointConditions{Ready: &trueBool},
|
||||
})
|
||||
}
|
||||
|
||||
return &corev1.Endpoints{
|
||||
ObjectMeta: objectMeta,
|
||||
Subsets: []corev1.EndpointSubset{subset},
|
||||
}, epSlice
|
||||
}
|
@ -37,7 +37,6 @@ import (
|
||||
apirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
endpointsv1 "k8s.io/kubernetes/pkg/api/v1/endpoints"
|
||||
)
|
||||
|
||||
@ -120,16 +119,16 @@ func NewLeases(storage storage.Interface, baseKey string, leaseTime time.Duratio
|
||||
}
|
||||
|
||||
type leaseEndpointReconciler struct {
|
||||
endpointClient corev1client.EndpointsGetter
|
||||
epAdapter EndpointsAdapter
|
||||
masterLeases Leases
|
||||
stopReconcilingCalled bool
|
||||
reconcilingLock sync.Mutex
|
||||
}
|
||||
|
||||
// NewLeaseEndpointReconciler creates a new LeaseEndpoint reconciler
|
||||
func NewLeaseEndpointReconciler(endpointClient corev1client.EndpointsGetter, masterLeases Leases) EndpointReconciler {
|
||||
func NewLeaseEndpointReconciler(epAdapter EndpointsAdapter, masterLeases Leases) EndpointReconciler {
|
||||
return &leaseEndpointReconciler{
|
||||
endpointClient: endpointClient,
|
||||
epAdapter: epAdapter,
|
||||
masterLeases: masterLeases,
|
||||
stopReconcilingCalled: false,
|
||||
}
|
||||
@ -161,7 +160,7 @@ func (r *leaseEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.
|
||||
}
|
||||
|
||||
func (r *leaseEndpointReconciler) doReconcile(serviceName string, endpointPorts []corev1.EndpointPort, reconcilePorts bool) error {
|
||||
e, err := r.endpointClient.Endpoints(corev1.NamespaceDefault).Get(serviceName, metav1.GetOptions{})
|
||||
e, err := r.epAdapter.Get(corev1.NamespaceDefault, serviceName, metav1.GetOptions{})
|
||||
shouldCreate := false
|
||||
if err != nil {
|
||||
if !errors.IsNotFound(err) {
|
||||
@ -222,11 +221,11 @@ func (r *leaseEndpointReconciler) doReconcile(serviceName string, endpointPorts
|
||||
|
||||
klog.Warningf("Resetting endpoints for master service %q to %v", serviceName, masterIPs)
|
||||
if shouldCreate {
|
||||
if _, err = r.endpointClient.Endpoints(corev1.NamespaceDefault).Create(e); errors.IsAlreadyExists(err) {
|
||||
if _, err = r.epAdapter.Create(corev1.NamespaceDefault, e); errors.IsAlreadyExists(err) {
|
||||
err = nil
|
||||
}
|
||||
} else {
|
||||
_, err = r.endpointClient.Endpoints(corev1.NamespaceDefault).Update(e)
|
||||
_, err = r.epAdapter.Update(corev1.NamespaceDefault, e)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
@ -426,7 +426,9 @@ func TestLeaseEndpointReconciler(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
r := NewLeaseEndpointReconciler(clientset.CoreV1(), fakeLeases)
|
||||
|
||||
epAdapter := EndpointsAdapter{endpointClient: clientset.CoreV1()}
|
||||
r := NewLeaseEndpointReconciler(epAdapter, fakeLeases)
|
||||
err := r.ReconcileEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts, true)
|
||||
if err != nil {
|
||||
t.Errorf("case %q: unexpected error: %v", test.testName, err)
|
||||
@ -526,7 +528,8 @@ func TestLeaseEndpointReconciler(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
r := NewLeaseEndpointReconciler(clientset.CoreV1(), fakeLeases)
|
||||
epAdapter := EndpointsAdapter{endpointClient: clientset.CoreV1()}
|
||||
r := NewLeaseEndpointReconciler(epAdapter, fakeLeases)
|
||||
err := r.ReconcileEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts, false)
|
||||
if err != nil {
|
||||
t.Errorf("case %q: unexpected error: %v", test.testName, err)
|
||||
@ -626,7 +629,8 @@ func TestLeaseRemoveEndpoints(t *testing.T) {
|
||||
continue
|
||||
}
|
||||
}
|
||||
r := NewLeaseEndpointReconciler(clientset.CoreV1(), fakeLeases)
|
||||
epAdapter := EndpointsAdapter{endpointClient: clientset.CoreV1()}
|
||||
r := NewLeaseEndpointReconciler(epAdapter, fakeLeases)
|
||||
err := r.RemoveEndpoints(test.serviceName, net.ParseIP(test.ip), test.endpointPorts)
|
||||
if err != nil {
|
||||
t.Errorf("case %q: unexpected error: %v", test.testName, err)
|
||||
|
@ -24,7 +24,6 @@ import (
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
"k8s.io/client-go/util/retry"
|
||||
"k8s.io/klog"
|
||||
endpointsv1 "k8s.io/kubernetes/pkg/api/v1/endpoints"
|
||||
@ -34,17 +33,17 @@ import (
|
||||
// masters. masterCountEndpointReconciler implements EndpointReconciler.
|
||||
type masterCountEndpointReconciler struct {
|
||||
masterCount int
|
||||
endpointClient corev1client.EndpointsGetter
|
||||
epAdapter EndpointsAdapter
|
||||
stopReconcilingCalled bool
|
||||
reconcilingLock sync.Mutex
|
||||
}
|
||||
|
||||
// NewMasterCountEndpointReconciler creates a new EndpointReconciler that reconciles based on a
|
||||
// specified expected number of masters.
|
||||
func NewMasterCountEndpointReconciler(masterCount int, endpointClient corev1client.EndpointsGetter) EndpointReconciler {
|
||||
func NewMasterCountEndpointReconciler(masterCount int, epAdapter EndpointsAdapter) EndpointReconciler {
|
||||
return &masterCountEndpointReconciler{
|
||||
masterCount: masterCount,
|
||||
endpointClient: endpointClient,
|
||||
masterCount: masterCount,
|
||||
epAdapter: epAdapter,
|
||||
}
|
||||
}
|
||||
|
||||
@ -68,7 +67,7 @@ func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, i
|
||||
return nil
|
||||
}
|
||||
|
||||
e, err := r.endpointClient.Endpoints(metav1.NamespaceDefault).Get(serviceName, metav1.GetOptions{})
|
||||
e, err := r.epAdapter.Get(metav1.NamespaceDefault, serviceName, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
e = &corev1.Endpoints{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
@ -83,7 +82,7 @@ func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, i
|
||||
Addresses: []corev1.EndpointAddress{{IP: ip.String()}},
|
||||
Ports: endpointPorts,
|
||||
}}
|
||||
_, err = r.endpointClient.Endpoints(metav1.NamespaceDefault).Create(e)
|
||||
_, err = r.epAdapter.Create(metav1.NamespaceDefault, e)
|
||||
return err
|
||||
}
|
||||
|
||||
@ -97,7 +96,7 @@ func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, i
|
||||
Ports: endpointPorts,
|
||||
}}
|
||||
klog.Warningf("Resetting endpoints for master service %q to %#v", serviceName, e)
|
||||
_, err = r.endpointClient.Endpoints(metav1.NamespaceDefault).Update(e)
|
||||
_, err = r.epAdapter.Update(metav1.NamespaceDefault, e)
|
||||
return err
|
||||
}
|
||||
if ipCorrect && portsCorrect {
|
||||
@ -133,7 +132,7 @@ func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, i
|
||||
e.Subsets[0].Ports = endpointPorts
|
||||
}
|
||||
klog.Warningf("Resetting endpoints for master service %q to %v", serviceName, e)
|
||||
_, err = r.endpointClient.Endpoints(metav1.NamespaceDefault).Update(e)
|
||||
_, err = r.epAdapter.Update(metav1.NamespaceDefault, e)
|
||||
return err
|
||||
}
|
||||
|
||||
@ -141,7 +140,7 @@ func (r *masterCountEndpointReconciler) RemoveEndpoints(serviceName string, ip n
|
||||
r.reconcilingLock.Lock()
|
||||
defer r.reconcilingLock.Unlock()
|
||||
|
||||
e, err := r.endpointClient.Endpoints(metav1.NamespaceDefault).Get(serviceName, metav1.GetOptions{})
|
||||
e, err := r.epAdapter.Get(metav1.NamespaceDefault, serviceName, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
if errors.IsNotFound(err) {
|
||||
// Endpoint doesn't exist
|
||||
@ -160,7 +159,7 @@ func (r *masterCountEndpointReconciler) RemoveEndpoints(serviceName string, ip n
|
||||
e.Subsets[0].Addresses = new
|
||||
e.Subsets = endpointsv1.RepackSubsets(e.Subsets)
|
||||
err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
|
||||
_, err := r.endpointClient.Endpoints(metav1.NamespaceDefault).Update(e)
|
||||
_, err := r.epAdapter.Update(metav1.NamespaceDefault, e)
|
||||
return err
|
||||
})
|
||||
return err
|
||||
|
Loading…
Reference in New Issue
Block a user