mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 10:51:29 +00:00
Merge pull request #94730 from robscott/endpointslice-service-fix
Ensuring EndpointSlices are recreated after Service recreation
This commit is contained in:
commit
97e4059092
@ -29,6 +29,8 @@ import (
|
||||
discovery "k8s.io/api/discovery/v1beta1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/informers"
|
||||
@ -126,7 +128,7 @@ func TestSyncServiceWithSelector(t *testing.T) {
|
||||
ns := metav1.NamespaceDefault
|
||||
serviceName := "testing-1"
|
||||
client, esController := newController([]string{"node-1"}, time.Duration(0))
|
||||
standardSyncService(t, esController, ns, serviceName, "true")
|
||||
standardSyncService(t, esController, ns, serviceName)
|
||||
expectActions(t, client.Actions(), 1, "create", "endpointslices")
|
||||
|
||||
sliceList, err := client.DiscoveryV1beta1().EndpointSlices(ns).List(context.TODO(), metav1.ListOptions{})
|
||||
@ -192,7 +194,7 @@ func TestSyncServicePodSelection(t *testing.T) {
|
||||
pod2.Labels["foo"] = "boo"
|
||||
esController.podStore.Add(pod2)
|
||||
|
||||
standardSyncService(t, esController, ns, "testing-1", "true")
|
||||
standardSyncService(t, esController, ns, "testing-1")
|
||||
expectActions(t, client.Actions(), 1, "create", "endpointslices")
|
||||
|
||||
// an endpoint slice should be created, it should only reference pod1 (not pod2)
|
||||
@ -211,12 +213,17 @@ func TestSyncServiceEndpointSliceLabelSelection(t *testing.T) {
|
||||
client, esController := newController([]string{"node-1"}, time.Duration(0))
|
||||
ns := metav1.NamespaceDefault
|
||||
serviceName := "testing-1"
|
||||
service := createService(t, esController, ns, serviceName)
|
||||
|
||||
gvk := schema.GroupVersionKind{Version: "v1", Kind: "Service"}
|
||||
ownerRef := metav1.NewControllerRef(service, gvk)
|
||||
|
||||
// 5 slices, 3 with matching labels for our service
|
||||
endpointSlices := []*discovery.EndpointSlice{{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "matching-1",
|
||||
Namespace: ns,
|
||||
Name: "matching-1",
|
||||
Namespace: ns,
|
||||
OwnerReferences: []metav1.OwnerReference{*ownerRef},
|
||||
Labels: map[string]string{
|
||||
discovery.LabelServiceName: serviceName,
|
||||
discovery.LabelManagedBy: controllerName,
|
||||
@ -225,8 +232,9 @@ func TestSyncServiceEndpointSliceLabelSelection(t *testing.T) {
|
||||
AddressType: discovery.AddressTypeIPv4,
|
||||
}, {
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "matching-2",
|
||||
Namespace: ns,
|
||||
Name: "matching-2",
|
||||
Namespace: ns,
|
||||
OwnerReferences: []metav1.OwnerReference{*ownerRef},
|
||||
Labels: map[string]string{
|
||||
discovery.LabelServiceName: serviceName,
|
||||
discovery.LabelManagedBy: controllerName,
|
||||
@ -278,9 +286,9 @@ func TestSyncServiceEndpointSliceLabelSelection(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// +1 for extra action involved in Service creation before syncService call.
|
||||
numActionsBefore := len(client.Actions()) + 1
|
||||
standardSyncService(t, esController, ns, serviceName, "false")
|
||||
numActionsBefore := len(client.Actions())
|
||||
err := esController.syncService(fmt.Sprintf("%s/%s", ns, serviceName))
|
||||
assert.Nil(t, err, "Expected no error syncing service")
|
||||
|
||||
if len(client.Actions()) != numActionsBefore+2 {
|
||||
t.Errorf("Expected 2 more actions, got %d", len(client.Actions())-numActionsBefore)
|
||||
@ -784,21 +792,22 @@ func addPods(t *testing.T, esController *endpointSliceController, namespace stri
|
||||
}
|
||||
}
|
||||
|
||||
func standardSyncService(t *testing.T, esController *endpointSliceController, namespace, serviceName, managedBySetup string) {
|
||||
func standardSyncService(t *testing.T, esController *endpointSliceController, namespace, serviceName string) {
|
||||
t.Helper()
|
||||
createService(t, esController, namespace, serviceName, managedBySetup)
|
||||
createService(t, esController, namespace, serviceName)
|
||||
|
||||
err := esController.syncService(fmt.Sprintf("%s/%s", namespace, serviceName))
|
||||
assert.Nil(t, err, "Expected no error syncing service")
|
||||
}
|
||||
|
||||
func createService(t *testing.T, esController *endpointSliceController, namespace, serviceName, managedBySetup string) *v1.Service {
|
||||
func createService(t *testing.T, esController *endpointSliceController, namespace, serviceName string) *v1.Service {
|
||||
t.Helper()
|
||||
service := &v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: serviceName,
|
||||
Namespace: namespace,
|
||||
CreationTimestamp: metav1.NewTime(time.Now()),
|
||||
UID: types.UID(namespace + "-" + serviceName),
|
||||
},
|
||||
Spec: v1.ServiceSpec{
|
||||
Ports: []v1.ServicePort{{TargetPort: intstr.FromInt(80)}},
|
||||
|
@ -70,7 +70,7 @@ func (r *reconciler) reconcile(service *corev1.Service, pods []*corev1.Pod, exis
|
||||
existingSlicesByPortMap := map[endpointutil.PortMapKey][]*discovery.EndpointSlice{}
|
||||
numExistingEndpoints := 0
|
||||
for _, existingSlice := range existingSlices {
|
||||
if existingSlice.AddressType == addressType {
|
||||
if existingSlice.AddressType == addressType && ownedBy(existingSlice, service) {
|
||||
epHash := endpointutil.NewPortMapKey(existingSlice.Ports)
|
||||
existingSlicesByPortMap[epHash] = append(existingSlicesByPortMap[epHash], existingSlice)
|
||||
numExistingEndpoints += len(existingSlice.Endpoints)
|
||||
@ -187,13 +187,15 @@ func (r *reconciler) finalize(
|
||||
}
|
||||
sliceToDelete := slicesToDelete[i]
|
||||
slice := slicesToCreate[len(slicesToCreate)-1]
|
||||
// Only update EndpointSlices that have the same AddressType as this
|
||||
// field is considered immutable. Since Services also consider IPFamily
|
||||
// immutable, the only case where this should matter will be the
|
||||
// migration from IP to IPv4 and IPv6 AddressTypes, where there's a
|
||||
// Only update EndpointSlices that are owned by this Service and have
|
||||
// the same AddressType. We need to avoid updating EndpointSlices that
|
||||
// are being garbage collected for an old Service with the same name.
|
||||
// The AddressType field is immutable. Since Services also consider
|
||||
// IPFamily immutable, the only case where this should matter will be
|
||||
// the migration from IP to IPv4 and IPv6 AddressTypes, where there's a
|
||||
// chance EndpointSlices with an IP AddressType would otherwise be
|
||||
// updated to IPv4 or IPv6 without this check.
|
||||
if sliceToDelete.AddressType == slice.AddressType {
|
||||
if sliceToDelete.AddressType == slice.AddressType && ownedBy(sliceToDelete, service) {
|
||||
slice.Name = sliceToDelete.Name
|
||||
slicesToCreate = slicesToCreate[:len(slicesToCreate)-1]
|
||||
slicesToUpdate = append(slicesToUpdate, slice)
|
||||
|
@ -30,6 +30,7 @@ import (
|
||||
discovery "k8s.io/api/discovery/v1beta1"
|
||||
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
"k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
@ -595,6 +596,73 @@ func TestReconcileEndpointSlicesReplaceDeprecated(t *testing.T) {
|
||||
cmc.Check(t)
|
||||
}
|
||||
|
||||
// In this test, we want to verify that a Service recreation will result in new
|
||||
// EndpointSlices being created.
|
||||
func TestReconcileEndpointSlicesRecreation(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
ownedByService bool
|
||||
expectChanges bool
|
||||
}{
|
||||
{
|
||||
name: "slice owned by Service",
|
||||
ownedByService: true,
|
||||
expectChanges: false,
|
||||
}, {
|
||||
name: "slice owned by other Service UID",
|
||||
ownedByService: false,
|
||||
expectChanges: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
client := newClientset()
|
||||
setupMetrics()
|
||||
namespace := "test"
|
||||
|
||||
svc, endpointMeta := newServiceAndEndpointMeta("foo", namespace)
|
||||
slice := newEmptyEndpointSlice(1, namespace, endpointMeta, svc)
|
||||
|
||||
pod := newPod(1, namespace, true, 1)
|
||||
slice.Endpoints = append(slice.Endpoints, podToEndpoint(pod, &corev1.Node{}, &corev1.Service{Spec: corev1.ServiceSpec{}}))
|
||||
|
||||
if !tc.ownedByService {
|
||||
slice.OwnerReferences[0].UID = "different"
|
||||
}
|
||||
existingSlices := []*discovery.EndpointSlice{slice}
|
||||
createEndpointSlices(t, client, namespace, existingSlices)
|
||||
|
||||
cmc := newCacheMutationCheck(existingSlices)
|
||||
|
||||
numActionsBefore := len(client.Actions())
|
||||
r := newReconciler(client, []*corev1.Node{{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}}, defaultMaxEndpointsPerSlice)
|
||||
reconcileHelper(t, r, &svc, []*corev1.Pod{pod}, existingSlices, time.Now())
|
||||
|
||||
if tc.expectChanges {
|
||||
if len(client.Actions()) != numActionsBefore+2 {
|
||||
t.Fatalf("Expected 2 additional actions, got %d", len(client.Actions())-numActionsBefore)
|
||||
}
|
||||
|
||||
expectAction(t, client.Actions(), numActionsBefore, "create", "endpointslices")
|
||||
expectAction(t, client.Actions(), numActionsBefore+1, "delete", "endpointslices")
|
||||
|
||||
fetchedSlices := fetchEndpointSlices(t, client, namespace)
|
||||
|
||||
if len(fetchedSlices) != 1 {
|
||||
t.Fatalf("Expected 1 EndpointSlice to exist, got %d", len(fetchedSlices))
|
||||
}
|
||||
} else {
|
||||
if len(client.Actions()) != numActionsBefore {
|
||||
t.Errorf("Expected no additional actions, got %d", len(client.Actions())-numActionsBefore)
|
||||
}
|
||||
}
|
||||
// ensure cache mutation has not occurred
|
||||
cmc.Check(t)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// Named ports can map to different port numbers on different pods.
|
||||
// This test ensures that EndpointSlices are grouped correctly in that case.
|
||||
func TestReconcileEndpointSlicesNamedPorts(t *testing.T) {
|
||||
@ -818,15 +886,24 @@ func TestReconcilerFinalizeSvcDeletionTimestamp(t *testing.T) {
|
||||
namespace := "test"
|
||||
svc, endpointMeta := newServiceAndEndpointMeta("foo", namespace)
|
||||
svc.DeletionTimestamp = tc.deletionTimestamp
|
||||
gvk := schema.GroupVersionKind{Version: "v1", Kind: "Service"}
|
||||
ownerRef := metav1.NewControllerRef(&svc, gvk)
|
||||
|
||||
esToCreate := &discovery.EndpointSlice{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "to-create"},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "to-create",
|
||||
OwnerReferences: []metav1.OwnerReference{*ownerRef},
|
||||
},
|
||||
AddressType: endpointMeta.AddressType,
|
||||
Ports: endpointMeta.Ports,
|
||||
}
|
||||
|
||||
// Add EndpointSlice that can be updated.
|
||||
esToUpdate, err := client.DiscoveryV1beta1().EndpointSlices(namespace).Create(context.TODO(), &discovery.EndpointSlice{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "to-update"},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "to-update",
|
||||
OwnerReferences: []metav1.OwnerReference{*ownerRef},
|
||||
},
|
||||
AddressType: endpointMeta.AddressType,
|
||||
Ports: endpointMeta.Ports,
|
||||
}, metav1.CreateOptions{})
|
||||
@ -839,7 +916,10 @@ func TestReconcilerFinalizeSvcDeletionTimestamp(t *testing.T) {
|
||||
|
||||
// Add EndpointSlice that can be deleted.
|
||||
esToDelete, err := client.DiscoveryV1beta1().EndpointSlices(namespace).Create(context.TODO(), &discovery.EndpointSlice{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "to-delete"},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "to-delete",
|
||||
OwnerReferences: []metav1.OwnerReference{*ownerRef},
|
||||
},
|
||||
AddressType: endpointMeta.AddressType,
|
||||
Ports: endpointMeta.Ports,
|
||||
}, metav1.CreateOptions{})
|
||||
|
@ -201,6 +201,17 @@ func objectRefPtrChanged(ref1, ref2 *corev1.ObjectReference) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// ownedBy returns true if the provided EndpointSlice is owned by the provided
|
||||
// Service.
|
||||
func ownedBy(endpointSlice *discovery.EndpointSlice, svc *corev1.Service) bool {
|
||||
for _, o := range endpointSlice.OwnerReferences {
|
||||
if o.UID == svc.UID && o.Kind == "Service" && o.APIVersion == "v1" {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// getSliceToFill will return the EndpointSlice that will be closest to full
|
||||
// when numEndpoints are added. If no EndpointSlice can be found, a nil pointer
|
||||
// will be returned.
|
||||
|
@ -27,6 +27,7 @@ import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/intstr"
|
||||
"k8s.io/apimachinery/pkg/util/rand"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
@ -455,7 +456,11 @@ func newServiceAndEndpointMeta(name, namespace string) (v1.Service, endpointMeta
|
||||
}
|
||||
|
||||
svc := v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace},
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: name,
|
||||
Namespace: namespace,
|
||||
UID: types.UID(namespace + "-" + name),
|
||||
},
|
||||
Spec: v1.ServiceSpec{
|
||||
Ports: []v1.ServicePort{{
|
||||
TargetPort: portNameIntStr,
|
||||
@ -477,10 +482,14 @@ func newServiceAndEndpointMeta(name, namespace string) (v1.Service, endpointMeta
|
||||
}
|
||||
|
||||
func newEmptyEndpointSlice(n int, namespace string, endpointMeta endpointMeta, svc v1.Service) *discovery.EndpointSlice {
|
||||
gvk := schema.GroupVersionKind{Version: "v1", Kind: "Service"}
|
||||
ownerRef := metav1.NewControllerRef(&svc, gvk)
|
||||
|
||||
return &discovery.EndpointSlice{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fmt.Sprintf("%s.%d", svc.Name, n),
|
||||
Namespace: namespace,
|
||||
Name: fmt.Sprintf("%s-%d", svc.Name, n),
|
||||
Namespace: namespace,
|
||||
OwnerReferences: []metav1.OwnerReference{*ownerRef},
|
||||
},
|
||||
Ports: endpointMeta.Ports,
|
||||
AddressType: endpointMeta.AddressType,
|
||||
|
Loading…
Reference in New Issue
Block a user