Merge pull request #98116 from aojea/mirror_dual

slice mirroring controller should mirror annotations (but endpoints.kubernetes.io/last-change-trigger-time annotation) and labels
This commit is contained in:
Kubernetes Prow Robot 2021-03-08 20:47:12 -08:00 committed by GitHub
commit 7eb99191a1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 462 additions and 25 deletions

View File

@ -22,6 +22,7 @@ import (
corev1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@ -196,9 +197,14 @@ func (r *reconciler) reconcileByPortMapping(
// if >0 existing slices, mark all but 1 for deletion.
slices.toDelete = existingSlices[1:]
// Return early if first slice matches desired endpoints.
// generated slices must mirror all endpoints annotations but EndpointsLastChangeTriggerTime
compareAnnotations := cloneAndRemoveKeys(endpoints.Annotations, corev1.EndpointsLastChangeTriggerTime)
compareLabels := cloneAndRemoveKeys(existingSlices[0].Labels, discovery.LabelManagedBy, discovery.LabelServiceName)
// Return early if first slice matches desired endpoints, labels and annotations
totals = totalChanges(existingSlices[0], desiredSet)
if totals.added == 0 && totals.updated == 0 && totals.removed == 0 {
if totals.added == 0 && totals.updated == 0 && totals.removed == 0 &&
apiequality.Semantic.DeepEqual(endpoints.Labels, compareLabels) &&
apiequality.Semantic.DeepEqual(compareAnnotations, existingSlices[0].Annotations) {
return slices, totals
}
}

View File

@ -43,6 +43,7 @@ func TestReconcile(t *testing.T) {
testCases := []struct {
testName string
subsets []corev1.EndpointSubset
epLabels map[string]string
endpointsDeletionPending bool
maxEndpointsPerSubset int32
existingEndpointSlices []*discovery.EndpointSlice
@ -105,6 +106,102 @@ func TestReconcile(t *testing.T) {
existingEndpointSlices: []*discovery.EndpointSlice{},
expectedNumSlices: 0,
expectedClientActions: 0,
}, {
testName: "Endpoints with 1 subset, port, and address and existing slice with same fields",
subsets: []corev1.EndpointSubset{{
Ports: []corev1.EndpointPort{{
Name: "http",
Port: 80,
Protocol: corev1.ProtocolTCP,
}},
Addresses: []corev1.EndpointAddress{{
IP: "10.0.0.1",
Hostname: "pod-1",
}},
}},
existingEndpointSlices: []*discovery.EndpointSlice{{
ObjectMeta: metav1.ObjectMeta{
Name: "test-ep-1",
},
AddressType: discovery.AddressTypeIPv4,
Ports: []discovery.EndpointPort{{
Name: utilpointer.StringPtr("http"),
Port: utilpointer.Int32Ptr(80),
Protocol: &protoTCP,
}},
Endpoints: []discovery.Endpoint{{
Addresses: []string{"10.0.0.1"},
Hostname: utilpointer.StringPtr("pod-1"),
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
}},
}},
expectedNumSlices: 1,
expectedClientActions: 0,
}, {
testName: "Endpoints with 1 subset, port, and address and existing slice with an additional annotation",
subsets: []corev1.EndpointSubset{{
Ports: []corev1.EndpointPort{{
Name: "http",
Port: 80,
Protocol: corev1.ProtocolTCP,
}},
Addresses: []corev1.EndpointAddress{{
IP: "10.0.0.1",
Hostname: "pod-1",
}},
}},
existingEndpointSlices: []*discovery.EndpointSlice{{
ObjectMeta: metav1.ObjectMeta{
Name: "test-ep-1",
Annotations: map[string]string{"foo": "bar"},
},
AddressType: discovery.AddressTypeIPv4,
Ports: []discovery.EndpointPort{{
Name: utilpointer.StringPtr("http"),
Port: utilpointer.Int32Ptr(80),
Protocol: &protoTCP,
}},
Endpoints: []discovery.Endpoint{{
Addresses: []string{"10.0.0.1"},
Hostname: utilpointer.StringPtr("pod-1"),
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
}},
}},
expectedNumSlices: 1,
expectedClientActions: 1,
}, {
testName: "Endpoints with 1 subset, port, label and address and existing slice with same fields but the label",
subsets: []corev1.EndpointSubset{{
Ports: []corev1.EndpointPort{{
Name: "http",
Port: 80,
Protocol: corev1.ProtocolTCP,
}},
Addresses: []corev1.EndpointAddress{{
IP: "10.0.0.1",
Hostname: "pod-1",
}},
}},
epLabels: map[string]string{"foo": "bar"},
existingEndpointSlices: []*discovery.EndpointSlice{{
ObjectMeta: metav1.ObjectMeta{
Name: "test-ep-1",
Annotations: map[string]string{"foo": "bar"},
},
AddressType: discovery.AddressTypeIPv4,
Ports: []discovery.EndpointPort{{
Name: utilpointer.StringPtr("http"),
Port: utilpointer.Int32Ptr(80),
Protocol: &protoTCP,
}},
Endpoints: []discovery.Endpoint{{
Addresses: []string{"10.0.0.1"},
Hostname: utilpointer.StringPtr("pod-1"),
Conditions: discovery.EndpointConditions{Ready: utilpointer.BoolPtr(true)},
}},
}},
expectedNumSlices: 1,
expectedClientActions: 1,
}, {
testName: "Endpoints with 1 subset, 2 ports, and 2 addresses",
subsets: []corev1.EndpointSubset{{
@ -641,7 +738,7 @@ func TestReconcile(t *testing.T) {
setupMetrics()
namespace := "test"
endpoints := corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{Name: "test-ep", Namespace: namespace},
ObjectMeta: metav1.ObjectMeta{Name: "test-ep", Namespace: namespace, Labels: tc.epLabels},
Subsets: tc.subsets,
}

View File

@ -69,6 +69,7 @@ func newEndpointSlice(endpoints *corev1.Endpoints, ports []discovery.EndpointPor
epSlice := &discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{},
Annotations: map[string]string{},
OwnerReferences: []metav1.OwnerReference{*ownerRef},
Namespace: endpoints.Namespace,
},
@ -77,13 +78,23 @@ func newEndpointSlice(endpoints *corev1.Endpoints, ports []discovery.EndpointPor
Endpoints: []discovery.Endpoint{},
}
// clone all labels
for label, val := range endpoints.Labels {
epSlice.Labels[label] = val
}
// overwrite specific labels
epSlice.Labels[discovery.LabelServiceName] = endpoints.Name
epSlice.Labels[discovery.LabelManagedBy] = controllerName
// clone all annotations but EndpointsLastChangeTriggerTime
for annotation, val := range endpoints.Annotations {
if annotation == corev1.EndpointsLastChangeTriggerTime {
continue
}
epSlice.Annotations[annotation] = val
}
if sliceName == "" {
epSlice.GenerateName = getEndpointSlicePrefix(endpoints.Name)
} else {
@ -228,3 +239,22 @@ func hasLeaderElection(annotations map[string]string) bool {
_, ok := annotations[resourcelock.LeaderElectionRecordAnnotationKey]
return ok
}
// cloneAndRemoveKeys is a copy of CloneAndRemoveLabels
// it is used here for annotations and labels
func cloneAndRemoveKeys(a map[string]string, keys ...string) map[string]string {
if len(keys) == 0 {
// Don't need to remove a key.
return a
}
// Clone.
newMap := map[string]string{}
for k, v := range a {
newMap[k] = v
}
// remove keys
for _, key := range keys {
delete(newMap, key)
}
return newMap
}

View File

@ -21,6 +21,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -38,42 +39,153 @@ func TestNewEndpointSlice(t *testing.T) {
ports := []discovery.EndpointPort{{Name: &portName, Protocol: &protocol}}
addrType := discovery.AddressTypeIPv4
gvk := schema.GroupVersionKind{Version: "v1", Kind: "Endpoints"}
endpoints := v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "foo",
Namespace: "test",
Labels: map[string]string{"foo": "bar"},
},
Subsets: []v1.EndpointSubset{{
Ports: []v1.EndpointPort{{Port: 80}},
}},
}
gvk := schema.GroupVersionKind{Version: "v1", Kind: "Endpoints"}
ownerRef := metav1.NewControllerRef(&endpoints, gvk)
expectedSlice := discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"foo": "bar",
discovery.LabelServiceName: endpoints.Name,
discovery.LabelManagedBy: controllerName,
testCases := []struct {
name string
tweakEndpoint func(ep *corev1.Endpoints)
expectedSlice discovery.EndpointSlice
}{
{
name: "create slice from endpoints",
tweakEndpoint: func(ep *corev1.Endpoints) {
},
expectedSlice: discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
discovery.LabelServiceName: endpoints.Name,
discovery.LabelManagedBy: controllerName,
},
Annotations: map[string]string{},
GenerateName: fmt.Sprintf("%s-", endpoints.Name),
Namespace: endpoints.Namespace,
OwnerReferences: []metav1.OwnerReference{*ownerRef},
},
Ports: ports,
AddressType: addrType,
Endpoints: []discovery.Endpoint{},
},
},
{
name: "create slice from endpoints with annotations",
tweakEndpoint: func(ep *corev1.Endpoints) {
annotations := map[string]string{"foo": "bar"}
ep.Annotations = annotations
},
expectedSlice: discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
discovery.LabelServiceName: endpoints.Name,
discovery.LabelManagedBy: controllerName,
},
Annotations: map[string]string{"foo": "bar"},
GenerateName: fmt.Sprintf("%s-", endpoints.Name),
Namespace: endpoints.Namespace,
OwnerReferences: []metav1.OwnerReference{*ownerRef},
},
Ports: ports,
AddressType: addrType,
Endpoints: []discovery.Endpoint{},
},
},
{
name: "create slice from endpoints with labels",
tweakEndpoint: func(ep *corev1.Endpoints) {
labels := map[string]string{"foo": "bar"}
ep.Labels = labels
},
expectedSlice: discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"foo": "bar",
discovery.LabelServiceName: endpoints.Name,
discovery.LabelManagedBy: controllerName,
},
Annotations: map[string]string{},
GenerateName: fmt.Sprintf("%s-", endpoints.Name),
Namespace: endpoints.Namespace,
OwnerReferences: []metav1.OwnerReference{*ownerRef},
},
Ports: ports,
AddressType: addrType,
Endpoints: []discovery.Endpoint{},
},
},
{
name: "create slice from endpoints with labels and annotations",
tweakEndpoint: func(ep *corev1.Endpoints) {
labels := map[string]string{"foo": "bar"}
ep.Labels = labels
annotations := map[string]string{"foo2": "bar2"}
ep.Annotations = annotations
},
expectedSlice: discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"foo": "bar",
discovery.LabelServiceName: endpoints.Name,
discovery.LabelManagedBy: controllerName,
},
Annotations: map[string]string{"foo2": "bar2"},
GenerateName: fmt.Sprintf("%s-", endpoints.Name),
Namespace: endpoints.Namespace,
OwnerReferences: []metav1.OwnerReference{*ownerRef},
},
Ports: ports,
AddressType: addrType,
Endpoints: []discovery.Endpoint{},
},
},
{
name: "create slice from endpoints with labels and annotations triggertime",
tweakEndpoint: func(ep *corev1.Endpoints) {
labels := map[string]string{"foo": "bar"}
ep.Labels = labels
annotations := map[string]string{
"foo2": "bar2",
corev1.EndpointsLastChangeTriggerTime: "date",
}
ep.Annotations = annotations
},
expectedSlice: discovery.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"foo": "bar",
discovery.LabelServiceName: endpoints.Name,
discovery.LabelManagedBy: controllerName,
},
Annotations: map[string]string{"foo2": "bar2"},
GenerateName: fmt.Sprintf("%s-", endpoints.Name),
Namespace: endpoints.Namespace,
OwnerReferences: []metav1.OwnerReference{*ownerRef},
},
Ports: ports,
AddressType: addrType,
Endpoints: []discovery.Endpoint{},
},
GenerateName: fmt.Sprintf("%s-", endpoints.Name),
OwnerReferences: []metav1.OwnerReference{*ownerRef},
Namespace: endpoints.Namespace,
},
Ports: ports,
AddressType: addrType,
Endpoints: []discovery.Endpoint{},
}
generatedSlice := newEndpointSlice(&endpoints, ports, addrType, "")
assert.EqualValues(t, expectedSlice, *generatedSlice)
if len(endpoints.Labels) > 1 {
t.Errorf("Expected Endpoints labels to not be modified, got %+v", endpoints.Labels)
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ep := endpoints.DeepCopy()
tc.tweakEndpoint(ep)
generatedSlice := newEndpointSlice(ep, ports, addrType, "")
assert.EqualValues(t, tc.expectedSlice, *generatedSlice)
if len(endpoints.Labels) > 1 {
t.Errorf("Expected Endpoints labels to not be modified, got %+v", endpoints.Labels)
}
})
}
}

View File

@ -19,11 +19,14 @@ package endpointslice
import (
"context"
"fmt"
"sort"
"testing"
"time"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
@ -194,7 +197,7 @@ func TestEndpointSliceMirroring(t *testing.T) {
err = wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
lSelector := discovery.LabelServiceName + "=" + resourceName
esList, err := client.DiscoveryV1beta1().EndpointSlices(ns.Name).List(context.TODO(), metav1.ListOptions{LabelSelector: lSelector})
esList, err := client.DiscoveryV1().EndpointSlices(ns.Name).List(context.TODO(), metav1.ListOptions{LabelSelector: lSelector})
if err != nil {
t.Logf("Error listing EndpointSlices: %v", err)
return false, err
@ -228,3 +231,192 @@ func TestEndpointSliceMirroring(t *testing.T) {
}
}
func TestEndpointSliceMirroringUpdates(t *testing.T) {
masterConfig := framework.NewIntegrationTestMasterConfig()
_, server, closeFn := framework.RunAMaster(masterConfig)
defer closeFn()
config := restclient.Config{Host: server.URL}
client, err := clientset.NewForConfig(&config)
if err != nil {
t.Fatalf("Error creating clientset: %v", err)
}
resyncPeriod := 12 * time.Hour
informers := informers.NewSharedInformerFactory(client, resyncPeriod)
epsmController := endpointslicemirroring.NewController(
informers.Core().V1().Endpoints(),
informers.Discovery().V1().EndpointSlices(),
informers.Core().V1().Services(),
int32(100),
client,
1*time.Second)
// Start informer and controllers
stopCh := make(chan struct{})
defer close(stopCh)
informers.Start(stopCh)
go epsmController.Run(1, stopCh)
testCases := []struct {
testName string
tweakEndpoint func(ep *corev1.Endpoints)
}{
{
testName: "Update labels",
tweakEndpoint: func(ep *corev1.Endpoints) {
ep.Labels["foo"] = "bar"
},
},
{
testName: "Update annotations",
tweakEndpoint: func(ep *corev1.Endpoints) {
ep.Annotations["foo2"] = "bar2"
},
},
{
testName: "Update annotations but triggertime",
tweakEndpoint: func(ep *corev1.Endpoints) {
ep.Annotations["foo2"] = "bar2"
ep.Annotations[corev1.EndpointsLastChangeTriggerTime] = "date"
},
},
{
testName: "Update addresses",
tweakEndpoint: func(ep *corev1.Endpoints) {
ep.Subsets[0].Addresses = []v1.EndpointAddress{{IP: "1.2.3.4"}, {IP: "1.2.3.6"}}
},
},
}
for i, tc := range testCases {
t.Run(tc.testName, func(t *testing.T) {
ns := framework.CreateTestingNamespace(fmt.Sprintf("test-endpointslice-mirroring-%d", i), server, t)
defer framework.DeleteTestingNamespace(ns, server, t)
service := &corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: "test-123",
Namespace: ns.Name,
},
Spec: corev1.ServiceSpec{
Ports: []corev1.ServicePort{{
Port: int32(80),
}},
},
}
customEndpoints := &corev1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: "test-123",
Namespace: ns.Name,
Labels: map[string]string{},
Annotations: map[string]string{},
},
Subsets: []corev1.EndpointSubset{{
Ports: []corev1.EndpointPort{{
Port: 80,
}},
Addresses: []corev1.EndpointAddress{{
IP: "10.0.0.1",
}},
}},
}
_, err = client.CoreV1().Services(ns.Name).Create(context.TODO(), service, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Error creating service: %v", err)
}
_, err = client.CoreV1().Endpoints(ns.Name).Create(context.TODO(), customEndpoints, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Error creating endpoints: %v", err)
}
// update endpoint
tc.tweakEndpoint(customEndpoints)
_, err = client.CoreV1().Endpoints(ns.Name).Update(context.TODO(), customEndpoints, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("Error updating endpoints: %v", err)
}
// verify the endpoint updates were mirrored
err = wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
lSelector := discovery.LabelServiceName + "=" + service.Name
esList, err := client.DiscoveryV1().EndpointSlices(ns.Name).List(context.TODO(), metav1.ListOptions{LabelSelector: lSelector})
if err != nil {
t.Logf("Error listing EndpointSlices: %v", err)
return false, err
}
if len(esList.Items) == 0 {
t.Logf("Waiting for EndpointSlice to be created")
return false, nil
}
for _, endpointSlice := range esList.Items {
if endpointSlice.Labels[discovery.LabelManagedBy] != "endpointslicemirroring-controller.k8s.io" {
return false, fmt.Errorf("Expected EndpointSlice to be managed by endpointslicemirroring-controller.k8s.io, got %s", endpointSlice.Labels[discovery.LabelManagedBy])
}
// compare addresses
epAddresses := []string{}
for _, address := range customEndpoints.Subsets[0].Addresses {
epAddresses = append(epAddresses, address.IP)
}
sliceAddresses := []string{}
for _, sliceEndpoint := range endpointSlice.Endpoints {
sliceAddresses = append(sliceAddresses, sliceEndpoint.Addresses...)
}
sort.Strings(epAddresses)
sort.Strings(sliceAddresses)
if !apiequality.Semantic.DeepEqual(epAddresses, sliceAddresses) {
t.Logf("Expected EndpointSlice to have the same IP addresses, expected %v got %v", epAddresses, sliceAddresses)
return false, nil
}
// check labels were mirrored
if !isSubset(customEndpoints.Labels, endpointSlice.Labels) {
t.Logf("Expected EndpointSlice to mirror labels, expected %v to be in received %v", customEndpoints.Labels, endpointSlice.Labels)
return false, nil
}
// check annotations but endpoints.kubernetes.io/last-change-trigger-time were mirrored
annotations := map[string]string{}
for k, v := range customEndpoints.Annotations {
if k == corev1.EndpointsLastChangeTriggerTime {
continue
}
annotations[k] = v
}
if !apiequality.Semantic.DeepEqual(annotations, endpointSlice.Annotations) {
t.Logf("Expected EndpointSlice to mirror annotations, expected %v received %v", customEndpoints.Annotations, endpointSlice.Annotations)
return false, nil
}
}
return true, nil
})
if err != nil {
t.Fatalf("Timed out waiting for conditions: %v", err)
}
})
}
}
// isSubset check if all the elements in a exist in b
func isSubset(a, b map[string]string) bool {
if len(a) > len(b) {
return false
}
for k, v1 := range a {
if v2, ok := b[k]; !ok || v1 != v2 {
return false
}
}
return true
}