Ensure EndpointSlice exist if Endpoint is unchanged

The EndpointSlice for masters was not created after enabling
EndpointSlice feature on a pre-existing cluster. This was because the
Endpoint object had been created and ReconcileEndpoints would skip
creating or updating it after EndpointSlice feature is enabled.

This patch ensures EndpointSlice is consistent with Endpoints after the
reconciler reconciles Endpoints even if Endpoints is unchanged. It also
avoids an update if the desired EndpointSlice matches the existing one.
This commit is contained in:
Quan Tian 2019-10-27 08:11:08 -07:00
parent 3d7318f29d
commit 92ceb166e0
5 changed files with 106 additions and 14 deletions

View File

@ -16,6 +16,7 @@ go_library(
"//pkg/api/v1/endpoints:go_default_library", "//pkg/api/v1/endpoints:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library", "//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",

View File

@ -19,6 +19,7 @@ package reconcilers
import ( import (
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1alpha1" discovery "k8s.io/api/discovery/v1alpha1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1" corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
@ -58,8 +59,8 @@ func (adapter *EndpointsAdapter) Get(namespace, name string, getOpts metav1.GetO
// returned. // returned.
func (adapter *EndpointsAdapter) Create(namespace string, endpoints *corev1.Endpoints) (*corev1.Endpoints, error) { func (adapter *EndpointsAdapter) Create(namespace string, endpoints *corev1.Endpoints) (*corev1.Endpoints, error) {
endpoints, err := adapter.endpointClient.Endpoints(namespace).Create(endpoints) endpoints, err := adapter.endpointClient.Endpoints(namespace).Create(endpoints)
if err == nil && adapter.endpointSliceClient != nil { if err == nil {
_, err = adapter.ensureEndpointSliceFromEndpoints(namespace, endpoints) err = adapter.EnsureEndpointSliceFromEndpoints(namespace, endpoints)
} }
return endpoints, err return endpoints, err
} }
@ -69,27 +70,39 @@ func (adapter *EndpointsAdapter) Create(namespace string, endpoints *corev1.Endp
// updated. The updated Endpoints object or an error will be returned. // updated. The updated Endpoints object or an error will be returned.
func (adapter *EndpointsAdapter) Update(namespace string, endpoints *corev1.Endpoints) (*corev1.Endpoints, error) { func (adapter *EndpointsAdapter) Update(namespace string, endpoints *corev1.Endpoints) (*corev1.Endpoints, error) {
endpoints, err := adapter.endpointClient.Endpoints(namespace).Update(endpoints) endpoints, err := adapter.endpointClient.Endpoints(namespace).Update(endpoints)
if err == nil && adapter.endpointSliceClient != nil { if err == nil {
_, err = adapter.ensureEndpointSliceFromEndpoints(namespace, endpoints) err = adapter.EnsureEndpointSliceFromEndpoints(namespace, endpoints)
} }
return endpoints, err return endpoints, err
} }
// ensureEndpointSliceFromEndpoints accepts a namespace and Endpoints resource // EnsureEndpointSliceFromEndpoints accepts a namespace and Endpoints resource
// and creates or updates a corresponding EndpointSlice. The EndpointSlice // and creates or updates a corresponding EndpointSlice if an endpointSliceClient
// and/or an error will be returned. // exists. An error will be returned if it fails to sync the EndpointSlice.
func (adapter *EndpointsAdapter) ensureEndpointSliceFromEndpoints(namespace string, endpoints *corev1.Endpoints) (*discovery.EndpointSlice, error) { func (adapter *EndpointsAdapter) EnsureEndpointSliceFromEndpoints(namespace string, endpoints *corev1.Endpoints) error {
if adapter.endpointSliceClient == nil {
return nil
}
endpointSlice := endpointSliceFromEndpoints(endpoints) endpointSlice := endpointSliceFromEndpoints(endpoints)
_, err := adapter.endpointSliceClient.EndpointSlices(namespace).Get(endpointSlice.Name, metav1.GetOptions{}) currentEndpointSlice, err := adapter.endpointSliceClient.EndpointSlices(namespace).Get(endpointSlice.Name, metav1.GetOptions{})
if err != nil { if err != nil {
if errors.IsNotFound(err) { if errors.IsNotFound(err) {
return adapter.endpointSliceClient.EndpointSlices(namespace).Create(endpointSlice) if _, err = adapter.endpointSliceClient.EndpointSlices(namespace).Create(endpointSlice); errors.IsAlreadyExists(err) {
err = nil
}
} }
return nil, err return err
} }
return adapter.endpointSliceClient.EndpointSlices(namespace).Update(endpointSlice) if apiequality.Semantic.DeepEqual(currentEndpointSlice.Endpoints, endpointSlice.Endpoints) &&
apiequality.Semantic.DeepEqual(currentEndpointSlice.Ports, endpointSlice.Ports) &&
apiequality.Semantic.DeepEqual(currentEndpointSlice.Labels, endpointSlice.Labels) {
return nil
}
_, err = adapter.endpointSliceClient.EndpointSlices(namespace).Update(endpointSlice)
return err
} }
// endpointSliceFromEndpoints generates an EndpointSlice from an Endpoints // endpointSliceFromEndpoints generates an EndpointSlice from an Endpoints

View File

@ -334,3 +334,81 @@ func generateEndpointsAndSlice(name, namespace string, ports []int, addresses []
Subsets: []corev1.EndpointSubset{subset}, Subsets: []corev1.EndpointSubset{subset},
}, epSlice }, epSlice
} }
func TestEndpointsAdapterEnsureEndpointSliceFromEndpoints(t *testing.T) {
endpoints1, epSlice1 := generateEndpointsAndSlice("foo", "testing", []int{80, 443}, []string{"10.1.2.3", "10.1.2.4"})
endpoints2, epSlice2 := generateEndpointsAndSlice("foo", "testing", []int{80, 443}, []string{"10.1.2.3", "10.1.2.4", "10.1.2.5"})
testCases := map[string]struct {
endpointSlicesEnabled bool
expectedError error
expectedEndpointSlice *discovery.EndpointSlice
endpointSlices []*discovery.EndpointSlice
namespaceParam string
endpointsParam *corev1.Endpoints
}{
"existing-endpointslice-no-change": {
endpointSlicesEnabled: true,
expectedError: nil,
expectedEndpointSlice: epSlice1,
endpointSlices: []*discovery.EndpointSlice{epSlice1},
namespaceParam: "testing",
endpointsParam: endpoints1,
},
"existing-endpointslice-change": {
endpointSlicesEnabled: true,
expectedError: nil,
expectedEndpointSlice: epSlice2,
endpointSlices: []*discovery.EndpointSlice{epSlice1},
namespaceParam: "testing",
endpointsParam: endpoints2,
},
"missing-endpointslice": {
endpointSlicesEnabled: true,
expectedError: nil,
expectedEndpointSlice: epSlice1,
endpointSlices: []*discovery.EndpointSlice{},
namespaceParam: "testing",
endpointsParam: endpoints1,
},
"endpointslices-disabled": {
endpointSlicesEnabled: false,
expectedError: nil,
expectedEndpointSlice: nil,
endpointSlices: []*discovery.EndpointSlice{},
namespaceParam: "testing",
endpointsParam: endpoints1,
},
}
for name, testCase := range testCases {
t.Run(name, func(t *testing.T) {
client := fake.NewSimpleClientset()
epAdapter := EndpointsAdapter{endpointClient: client.CoreV1()}
if testCase.endpointSlicesEnabled {
epAdapter.endpointSliceClient = client.DiscoveryV1alpha1()
}
for _, endpointSlice := range testCase.endpointSlices {
_, err := client.DiscoveryV1alpha1().EndpointSlices(endpointSlice.Namespace).Create(endpointSlice)
if err != nil {
t.Fatalf("Error creating EndpointSlice: %v", err)
}
}
err := epAdapter.EnsureEndpointSliceFromEndpoints(testCase.namespaceParam, testCase.endpointsParam)
if !apiequality.Semantic.DeepEqual(testCase.expectedError, err) {
t.Errorf("Expected error: %v, got: %v", testCase.expectedError, err)
}
endpointSlice, err := client.DiscoveryV1alpha1().EndpointSlices(testCase.namespaceParam).Get(testCase.endpointsParam.Name, metav1.GetOptions{})
if err != nil && !errors.IsNotFound(err) {
t.Fatalf("Error getting Endpoint Slice: %v", err)
}
if !apiequality.Semantic.DeepEqual(endpointSlice, testCase.expectedEndpointSlice) {
t.Errorf("Expected Endpoint Slice: %v, got: %v", testCase.expectedEndpointSlice, endpointSlice)
}
})
}
}

View File

@ -192,7 +192,7 @@ func (r *leaseEndpointReconciler) doReconcile(serviceName string, endpointPorts
// Next, we compare the current list of endpoints with the list of master IP keys // Next, we compare the current list of endpoints with the list of master IP keys
formatCorrect, ipCorrect, portsCorrect := checkEndpointSubsetFormatWithLease(e, masterIPs, endpointPorts, reconcilePorts) formatCorrect, ipCorrect, portsCorrect := checkEndpointSubsetFormatWithLease(e, masterIPs, endpointPorts, reconcilePorts)
if formatCorrect && ipCorrect && portsCorrect { if formatCorrect && ipCorrect && portsCorrect {
return nil return r.epAdapter.EnsureEndpointSliceFromEndpoints(corev1.NamespaceDefault, e)
} }
if !formatCorrect { if !formatCorrect {

View File

@ -100,7 +100,7 @@ func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, i
return err return err
} }
if ipCorrect && portsCorrect { if ipCorrect && portsCorrect {
return nil return r.epAdapter.EnsureEndpointSliceFromEndpoints(metav1.NamespaceDefault, e)
} }
if !ipCorrect { if !ipCorrect {
// We *always* add our own IP address. // We *always* add our own IP address.