mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-20 09:05:26 +00:00
Merge pull request #84421 from tnqn/missing-endpointslice
Ensure EndpointSlice exist if Endpoint is found
This commit is contained in:
commit
4002e4c7e7
@ -16,6 +16,7 @@ go_library(
|
|||||||
"//pkg/api/v1/endpoints:go_default_library",
|
"//pkg/api/v1/endpoints:go_default_library",
|
||||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library",
|
"//staging/src/k8s.io/api/discovery/v1alpha1:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||||
|
@ -19,6 +19,7 @@ package reconcilers
|
|||||||
import (
|
import (
|
||||||
corev1 "k8s.io/api/core/v1"
|
corev1 "k8s.io/api/core/v1"
|
||||||
discovery "k8s.io/api/discovery/v1alpha1"
|
discovery "k8s.io/api/discovery/v1alpha1"
|
||||||
|
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
||||||
"k8s.io/apimachinery/pkg/api/errors"
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
|
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||||
@ -58,8 +59,8 @@ func (adapter *EndpointsAdapter) Get(namespace, name string, getOpts metav1.GetO
|
|||||||
// returned.
|
// returned.
|
||||||
func (adapter *EndpointsAdapter) Create(namespace string, endpoints *corev1.Endpoints) (*corev1.Endpoints, error) {
|
func (adapter *EndpointsAdapter) Create(namespace string, endpoints *corev1.Endpoints) (*corev1.Endpoints, error) {
|
||||||
endpoints, err := adapter.endpointClient.Endpoints(namespace).Create(endpoints)
|
endpoints, err := adapter.endpointClient.Endpoints(namespace).Create(endpoints)
|
||||||
if err == nil && adapter.endpointSliceClient != nil {
|
if err == nil {
|
||||||
_, err = adapter.ensureEndpointSliceFromEndpoints(namespace, endpoints)
|
err = adapter.EnsureEndpointSliceFromEndpoints(namespace, endpoints)
|
||||||
}
|
}
|
||||||
return endpoints, err
|
return endpoints, err
|
||||||
}
|
}
|
||||||
@ -69,27 +70,39 @@ func (adapter *EndpointsAdapter) Create(namespace string, endpoints *corev1.Endp
|
|||||||
// updated. The updated Endpoints object or an error will be returned.
|
// updated. The updated Endpoints object or an error will be returned.
|
||||||
func (adapter *EndpointsAdapter) Update(namespace string, endpoints *corev1.Endpoints) (*corev1.Endpoints, error) {
|
func (adapter *EndpointsAdapter) Update(namespace string, endpoints *corev1.Endpoints) (*corev1.Endpoints, error) {
|
||||||
endpoints, err := adapter.endpointClient.Endpoints(namespace).Update(endpoints)
|
endpoints, err := adapter.endpointClient.Endpoints(namespace).Update(endpoints)
|
||||||
if err == nil && adapter.endpointSliceClient != nil {
|
if err == nil {
|
||||||
_, err = adapter.ensureEndpointSliceFromEndpoints(namespace, endpoints)
|
err = adapter.EnsureEndpointSliceFromEndpoints(namespace, endpoints)
|
||||||
}
|
}
|
||||||
return endpoints, err
|
return endpoints, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// ensureEndpointSliceFromEndpoints accepts a namespace and Endpoints resource
|
// EnsureEndpointSliceFromEndpoints accepts a namespace and Endpoints resource
|
||||||
// and creates or updates a corresponding EndpointSlice. The EndpointSlice
|
// and creates or updates a corresponding EndpointSlice if an endpointSliceClient
|
||||||
// and/or an error will be returned.
|
// exists. An error will be returned if it fails to sync the EndpointSlice.
|
||||||
func (adapter *EndpointsAdapter) ensureEndpointSliceFromEndpoints(namespace string, endpoints *corev1.Endpoints) (*discovery.EndpointSlice, error) {
|
func (adapter *EndpointsAdapter) EnsureEndpointSliceFromEndpoints(namespace string, endpoints *corev1.Endpoints) error {
|
||||||
|
if adapter.endpointSliceClient == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
endpointSlice := endpointSliceFromEndpoints(endpoints)
|
endpointSlice := endpointSliceFromEndpoints(endpoints)
|
||||||
_, err := adapter.endpointSliceClient.EndpointSlices(namespace).Get(endpointSlice.Name, metav1.GetOptions{})
|
currentEndpointSlice, err := adapter.endpointSliceClient.EndpointSlices(namespace).Get(endpointSlice.Name, metav1.GetOptions{})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.IsNotFound(err) {
|
if errors.IsNotFound(err) {
|
||||||
return adapter.endpointSliceClient.EndpointSlices(namespace).Create(endpointSlice)
|
if _, err = adapter.endpointSliceClient.EndpointSlices(namespace).Create(endpointSlice); errors.IsAlreadyExists(err) {
|
||||||
|
err = nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return adapter.endpointSliceClient.EndpointSlices(namespace).Update(endpointSlice)
|
if apiequality.Semantic.DeepEqual(currentEndpointSlice.Endpoints, endpointSlice.Endpoints) &&
|
||||||
|
apiequality.Semantic.DeepEqual(currentEndpointSlice.Ports, endpointSlice.Ports) &&
|
||||||
|
apiequality.Semantic.DeepEqual(currentEndpointSlice.Labels, endpointSlice.Labels) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = adapter.endpointSliceClient.EndpointSlices(namespace).Update(endpointSlice)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// endpointSliceFromEndpoints generates an EndpointSlice from an Endpoints
|
// endpointSliceFromEndpoints generates an EndpointSlice from an Endpoints
|
||||||
|
@ -334,3 +334,81 @@ func generateEndpointsAndSlice(name, namespace string, ports []int, addresses []
|
|||||||
Subsets: []corev1.EndpointSubset{subset},
|
Subsets: []corev1.EndpointSubset{subset},
|
||||||
}, epSlice
|
}, epSlice
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestEndpointsAdapterEnsureEndpointSliceFromEndpoints(t *testing.T) {
|
||||||
|
endpoints1, epSlice1 := generateEndpointsAndSlice("foo", "testing", []int{80, 443}, []string{"10.1.2.3", "10.1.2.4"})
|
||||||
|
endpoints2, epSlice2 := generateEndpointsAndSlice("foo", "testing", []int{80, 443}, []string{"10.1.2.3", "10.1.2.4", "10.1.2.5"})
|
||||||
|
|
||||||
|
testCases := map[string]struct {
|
||||||
|
endpointSlicesEnabled bool
|
||||||
|
expectedError error
|
||||||
|
expectedEndpointSlice *discovery.EndpointSlice
|
||||||
|
endpointSlices []*discovery.EndpointSlice
|
||||||
|
namespaceParam string
|
||||||
|
endpointsParam *corev1.Endpoints
|
||||||
|
}{
|
||||||
|
"existing-endpointslice-no-change": {
|
||||||
|
endpointSlicesEnabled: true,
|
||||||
|
expectedError: nil,
|
||||||
|
expectedEndpointSlice: epSlice1,
|
||||||
|
endpointSlices: []*discovery.EndpointSlice{epSlice1},
|
||||||
|
namespaceParam: "testing",
|
||||||
|
endpointsParam: endpoints1,
|
||||||
|
},
|
||||||
|
"existing-endpointslice-change": {
|
||||||
|
endpointSlicesEnabled: true,
|
||||||
|
expectedError: nil,
|
||||||
|
expectedEndpointSlice: epSlice2,
|
||||||
|
endpointSlices: []*discovery.EndpointSlice{epSlice1},
|
||||||
|
namespaceParam: "testing",
|
||||||
|
endpointsParam: endpoints2,
|
||||||
|
},
|
||||||
|
"missing-endpointslice": {
|
||||||
|
endpointSlicesEnabled: true,
|
||||||
|
expectedError: nil,
|
||||||
|
expectedEndpointSlice: epSlice1,
|
||||||
|
endpointSlices: []*discovery.EndpointSlice{},
|
||||||
|
namespaceParam: "testing",
|
||||||
|
endpointsParam: endpoints1,
|
||||||
|
},
|
||||||
|
"endpointslices-disabled": {
|
||||||
|
endpointSlicesEnabled: false,
|
||||||
|
expectedError: nil,
|
||||||
|
expectedEndpointSlice: nil,
|
||||||
|
endpointSlices: []*discovery.EndpointSlice{},
|
||||||
|
namespaceParam: "testing",
|
||||||
|
endpointsParam: endpoints1,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for name, testCase := range testCases {
|
||||||
|
t.Run(name, func(t *testing.T) {
|
||||||
|
client := fake.NewSimpleClientset()
|
||||||
|
epAdapter := EndpointsAdapter{endpointClient: client.CoreV1()}
|
||||||
|
if testCase.endpointSlicesEnabled {
|
||||||
|
epAdapter.endpointSliceClient = client.DiscoveryV1alpha1()
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, endpointSlice := range testCase.endpointSlices {
|
||||||
|
_, err := client.DiscoveryV1alpha1().EndpointSlices(endpointSlice.Namespace).Create(endpointSlice)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Error creating EndpointSlice: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
err := epAdapter.EnsureEndpointSliceFromEndpoints(testCase.namespaceParam, testCase.endpointsParam)
|
||||||
|
if !apiequality.Semantic.DeepEqual(testCase.expectedError, err) {
|
||||||
|
t.Errorf("Expected error: %v, got: %v", testCase.expectedError, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
endpointSlice, err := client.DiscoveryV1alpha1().EndpointSlices(testCase.namespaceParam).Get(testCase.endpointsParam.Name, metav1.GetOptions{})
|
||||||
|
if err != nil && !errors.IsNotFound(err) {
|
||||||
|
t.Fatalf("Error getting Endpoint Slice: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !apiequality.Semantic.DeepEqual(endpointSlice, testCase.expectedEndpointSlice) {
|
||||||
|
t.Errorf("Expected Endpoint Slice: %v, got: %v", testCase.expectedEndpointSlice, endpointSlice)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -192,7 +192,7 @@ func (r *leaseEndpointReconciler) doReconcile(serviceName string, endpointPorts
|
|||||||
// Next, we compare the current list of endpoints with the list of master IP keys
|
// Next, we compare the current list of endpoints with the list of master IP keys
|
||||||
formatCorrect, ipCorrect, portsCorrect := checkEndpointSubsetFormatWithLease(e, masterIPs, endpointPorts, reconcilePorts)
|
formatCorrect, ipCorrect, portsCorrect := checkEndpointSubsetFormatWithLease(e, masterIPs, endpointPorts, reconcilePorts)
|
||||||
if formatCorrect && ipCorrect && portsCorrect {
|
if formatCorrect && ipCorrect && portsCorrect {
|
||||||
return nil
|
return r.epAdapter.EnsureEndpointSliceFromEndpoints(corev1.NamespaceDefault, e)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !formatCorrect {
|
if !formatCorrect {
|
||||||
|
@ -100,7 +100,7 @@ func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, i
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if ipCorrect && portsCorrect {
|
if ipCorrect && portsCorrect {
|
||||||
return nil
|
return r.epAdapter.EnsureEndpointSliceFromEndpoints(metav1.NamespaceDefault, e)
|
||||||
}
|
}
|
||||||
if !ipCorrect {
|
if !ipCorrect {
|
||||||
// We *always* add our own IP address.
|
// We *always* add our own IP address.
|
||||||
|
Loading…
Reference in New Issue
Block a user