mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-28 14:07:14 +00:00
Fixing how EndpointSlice Mirroring handles Service selector transitions
This commit is contained in:
parent
c592bd40f2
commit
794f0cb7f1
@ -304,12 +304,11 @@ func (c *Controller) syncEndpoints(key string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// This means that if a Service transitions away from a nil selector, any
|
||||
// mirrored EndpointSlices will not be cleaned up. #91072 tracks this issue
|
||||
// for this controller along with the Endpoints and EndpointSlice
|
||||
// controllers.
|
||||
// If a selector is specified, clean up any mirrored slices.
|
||||
if svc.Spec.Selector != nil {
|
||||
return nil
|
||||
klog.V(4).Infof("%s/%s Service now has selector, cleaning up any mirrored EndpointSlices", namespace, name)
|
||||
c.endpointSliceTracker.DeleteService(namespace, name)
|
||||
return c.deleteMirroredSlices(namespace, name)
|
||||
}
|
||||
|
||||
endpointSlices, err := endpointSlicesMirroredForService(c.endpointSliceLister, namespace, name)
|
||||
@ -372,7 +371,7 @@ func (c *Controller) onServiceUpdate(prevObj, obj interface{}) {
|
||||
service := obj.(*v1.Service)
|
||||
prevService := prevObj.(*v1.Service)
|
||||
if service == nil || prevService == nil {
|
||||
utilruntime.HandleError(fmt.Errorf("onServiceUpdate() expected type v1.Endpoints, got %T, %T", prevObj, obj))
|
||||
utilruntime.HandleError(fmt.Errorf("onServiceUpdate() expected type v1.Service, got %T, %T", prevObj, obj))
|
||||
return
|
||||
}
|
||||
if (service.Spec.Selector == nil) != (prevService.Spec.Selector == nil) {
|
||||
|
@ -30,6 +30,7 @@ import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
"k8s.io/kubernetes/pkg/controller/endpoint"
|
||||
@ -408,6 +409,150 @@ func TestEndpointSliceMirroringUpdates(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestEndpointSliceMirroringSelectorTransition(t *testing.T) {
|
||||
controlPlaneConfig := framework.NewIntegrationTestControlPlaneConfig()
|
||||
_, server, closeFn := framework.RunAnAPIServer(controlPlaneConfig)
|
||||
defer closeFn()
|
||||
|
||||
config := restclient.Config{Host: server.URL}
|
||||
client, err := clientset.NewForConfig(&config)
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating clientset: %v", err)
|
||||
}
|
||||
|
||||
resyncPeriod := 12 * time.Hour
|
||||
informers := informers.NewSharedInformerFactory(client, resyncPeriod)
|
||||
|
||||
epsmController := endpointslicemirroring.NewController(
|
||||
informers.Core().V1().Endpoints(),
|
||||
informers.Discovery().V1().EndpointSlices(),
|
||||
informers.Core().V1().Services(),
|
||||
int32(100),
|
||||
client,
|
||||
1*time.Second)
|
||||
|
||||
// Start informer and controllers
|
||||
stopCh := make(chan struct{})
|
||||
defer close(stopCh)
|
||||
informers.Start(stopCh)
|
||||
go epsmController.Run(1, stopCh)
|
||||
|
||||
testCases := []struct {
|
||||
testName string
|
||||
startingSelector map[string]string
|
||||
startingMirroredSlices int
|
||||
endingSelector map[string]string
|
||||
endingMirroredSlices int
|
||||
}{
|
||||
{
|
||||
testName: "nil -> {foo: bar} selector",
|
||||
startingSelector: nil,
|
||||
startingMirroredSlices: 1,
|
||||
endingSelector: map[string]string{"foo": "bar"},
|
||||
endingMirroredSlices: 0,
|
||||
},
|
||||
{
|
||||
testName: "{foo: bar} -> nil selector",
|
||||
startingSelector: map[string]string{"foo": "bar"},
|
||||
startingMirroredSlices: 0,
|
||||
endingSelector: nil,
|
||||
endingMirroredSlices: 1,
|
||||
},
|
||||
{
|
||||
testName: "{} -> {foo: bar} selector",
|
||||
startingSelector: map[string]string{},
|
||||
startingMirroredSlices: 1,
|
||||
endingSelector: map[string]string{"foo": "bar"},
|
||||
endingMirroredSlices: 0,
|
||||
},
|
||||
{
|
||||
testName: "{foo: bar} -> {} selector",
|
||||
startingSelector: map[string]string{"foo": "bar"},
|
||||
startingMirroredSlices: 0,
|
||||
endingSelector: map[string]string{},
|
||||
endingMirroredSlices: 1,
|
||||
},
|
||||
}
|
||||
|
||||
for i, tc := range testCases {
|
||||
t.Run(tc.testName, func(t *testing.T) {
|
||||
ns := framework.CreateTestingNamespace(fmt.Sprintf("test-endpointslice-mirroring-%d", i), server, t)
|
||||
defer framework.DeleteTestingNamespace(ns, server, t)
|
||||
meta := metav1.ObjectMeta{Name: "test-123", Namespace: ns.Name}
|
||||
|
||||
service := &corev1.Service{
|
||||
ObjectMeta: meta,
|
||||
Spec: corev1.ServiceSpec{
|
||||
Ports: []corev1.ServicePort{{
|
||||
Port: int32(80),
|
||||
}},
|
||||
Selector: tc.startingSelector,
|
||||
},
|
||||
}
|
||||
|
||||
customEndpoints := &corev1.Endpoints{
|
||||
ObjectMeta: meta,
|
||||
Subsets: []corev1.EndpointSubset{{
|
||||
Ports: []corev1.EndpointPort{{
|
||||
Port: 80,
|
||||
}},
|
||||
Addresses: []corev1.EndpointAddress{{
|
||||
IP: "10.0.0.1",
|
||||
}},
|
||||
}},
|
||||
}
|
||||
|
||||
_, err = client.CoreV1().Services(ns.Name).Create(context.TODO(), service, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating service: %v", err)
|
||||
}
|
||||
|
||||
_, err = client.CoreV1().Endpoints(ns.Name).Create(context.TODO(), customEndpoints, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating endpoints: %v", err)
|
||||
}
|
||||
|
||||
// verify the expected number of mirrored slices exist
|
||||
err = waitForMirroredSlices(t, client, ns.Name, service.Name, tc.startingMirroredSlices)
|
||||
if err != nil {
|
||||
t.Fatalf("Timed out waiting for initial mirrored slices to match expectations: %v", err)
|
||||
}
|
||||
|
||||
service.Spec.Selector = tc.endingSelector
|
||||
_, err = client.CoreV1().Services(ns.Name).Update(context.TODO(), service, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
t.Fatalf("Error updating service: %v", err)
|
||||
}
|
||||
|
||||
// verify the expected number of mirrored slices exist
|
||||
err = waitForMirroredSlices(t, client, ns.Name, service.Name, tc.endingMirroredSlices)
|
||||
if err != nil {
|
||||
t.Fatalf("Timed out waiting for final mirrored slices to match expectations: %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func waitForMirroredSlices(t *testing.T, client *kubernetes.Clientset, nsName, svcName string, num int) error {
|
||||
t.Helper()
|
||||
return wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
|
||||
lSelector := discovery.LabelServiceName + "=" + svcName
|
||||
lSelector += "," + discovery.LabelManagedBy + "=endpointslicemirroring-controller.k8s.io"
|
||||
esList, err := client.DiscoveryV1().EndpointSlices(nsName).List(context.TODO(), metav1.ListOptions{LabelSelector: lSelector})
|
||||
if err != nil {
|
||||
t.Logf("Error listing EndpointSlices: %v", err)
|
||||
return false, err
|
||||
}
|
||||
|
||||
if len(esList.Items) != num {
|
||||
t.Logf("Expected %d slices to be mirrored, got %d", num, len(esList.Items))
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return true, nil
|
||||
})
|
||||
}
|
||||
|
||||
// isSubset check if all the elements in a exist in b
|
||||
func isSubset(a, b map[string]string) bool {
|
||||
if len(a) > len(b) {
|
||||
|
Loading…
Reference in New Issue
Block a user