diff --git a/pkg/apis/core/v1/conversion.go b/pkg/apis/core/v1/conversion.go index 4ba49354b4a..9980be6eac5 100644 --- a/pkg/apis/core/v1/conversion.go +++ b/pkg/apis/core/v1/conversion.go @@ -97,6 +97,9 @@ func addConversionFuncs(scheme *runtime.Scheme) error { if err := AddFieldLabelConversionsForSecret(scheme); err != nil { return err } + if err := AddFieldLabelConversionsForService(scheme); err != nil { + return err + } return nil } @@ -488,6 +491,21 @@ func AddFieldLabelConversionsForSecret(scheme *runtime.Scheme) error { }) } +func AddFieldLabelConversionsForService(scheme *runtime.Scheme) error { + return scheme.AddFieldLabelConversionFunc(SchemeGroupVersion.WithKind("Service"), + func(label, value string) (string, string, error) { + switch label { + case "metadata.namespace", + "metadata.name", + "spec.clusterIP", + "spec.type": + return label, value, nil + default: + return "", "", fmt.Errorf("field label not supported: %s", label) + } + }) +} + var initContainerAnnotations = map[string]bool{ "pod.beta.kubernetes.io/init-containers": true, "pod.alpha.kubernetes.io/init-containers": true, diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 7c6d614e959..af74a095628 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -455,7 +455,11 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, var serviceLister corelisters.ServiceLister var serviceHasSynced cache.InformerSynced if kubeDeps.KubeClient != nil { - kubeInformers := informers.NewSharedInformerFactoryWithOptions(kubeDeps.KubeClient, 0) + // don't watch headless services, they are not needed since this informer is only used to create the environment variables for pods. + // See https://issues.k8s.io/122394 + kubeInformers := informers.NewSharedInformerFactoryWithOptions(kubeDeps.KubeClient, 0, informers.WithTweakListOptions(func(options *metav1.ListOptions) { + options.FieldSelector = fields.OneTermNotEqualSelector("spec.clusterIP", v1.ClusterIPNone).String() + })) serviceLister = kubeInformers.Core().V1().Services().Lister() serviceHasSynced = kubeInformers.Core().V1().Services().Informer().HasSynced kubeInformers.Start(wait.NeverStop) diff --git a/pkg/registry/core/service/storage/storage.go b/pkg/registry/core/service/storage/storage.go index 2433abf15e4..87b720534ca 100644 --- a/pkg/registry/core/service/storage/storage.go +++ b/pkg/registry/core/service/storage/storage.go @@ -88,6 +88,7 @@ func NewREST( store := &genericregistry.Store{ NewFunc: func() runtime.Object { return &api.Service{} }, NewListFunc: func() runtime.Object { return &api.ServiceList{} }, + PredicateFunc: svcreg.Matcher, DefaultQualifiedResource: api.Resource("services"), SingularQualifiedResource: api.Resource("service"), ReturnDeletedObject: true, @@ -99,7 +100,10 @@ func NewREST( TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)}, } - options := &generic.StoreOptions{RESTOptions: optsGetter} + options := &generic.StoreOptions{ + RESTOptions: optsGetter, + AttrFunc: svcreg.GetAttrs, + } if err := store.CompleteWithOptions(options); err != nil { return nil, nil, nil, err } diff --git a/pkg/registry/core/service/strategy.go b/pkg/registry/core/service/strategy.go index 6d35e6625ea..a5916b796bb 100644 --- a/pkg/registry/core/service/strategy.go +++ b/pkg/registry/core/service/strategy.go @@ -18,11 +18,16 @@ package service import ( "context" + "fmt" "reflect" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/validation/field" + "k8s.io/apiserver/pkg/registry/generic" + pkgstorage "k8s.io/apiserver/pkg/storage" "k8s.io/apiserver/pkg/storage/names" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/kubernetes/pkg/api/legacyscheme" @@ -166,6 +171,34 @@ func (serviceStatusStrategy) WarningsOnUpdate(ctx context.Context, obj, old runt return nil } +// GetAttrs returns labels and fields of a given object for filtering purposes. +func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) { + service, ok := obj.(*api.Service) + if !ok { + return nil, nil, fmt.Errorf("not a service") + } + return service.Labels, SelectableFields(service), nil +} + +// Matcher returns a selection predicate for a given label and field selector. +func Matcher(label labels.Selector, field fields.Selector) pkgstorage.SelectionPredicate { + return pkgstorage.SelectionPredicate{ + Label: label, + Field: field, + GetAttrs: GetAttrs, + } +} + +// SelectableFields returns a field set that can be used for filter selection +func SelectableFields(service *api.Service) fields.Set { + objectMetaFieldsSet := generic.ObjectMetaFieldsSet(&service.ObjectMeta, true) + serviceSpecificFieldsSet := fields.Set{ + "spec.clusterIP": service.Spec.ClusterIP, + "spec.type": string(service.Spec.Type), + } + return generic.MergeFieldsSets(objectMetaFieldsSet, serviceSpecificFieldsSet) +} + // dropServiceStatusDisabledFields drops fields that are not used if their associated feature gates // are not enabled. The typical pattern is: // diff --git a/pkg/registry/core/service/strategy_test.go b/pkg/registry/core/service/strategy_test.go index f82a47a9c83..af74424e4ef 100644 --- a/pkg/registry/core/service/strategy_test.go +++ b/pkg/registry/core/service/strategy_test.go @@ -23,6 +23,8 @@ import ( "github.com/google/go-cmp/cmp" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/intstr" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/rest" @@ -785,3 +787,159 @@ func TestDropTypeDependentFields(t *testing.T) { }) } } + +func TestMatchService(t *testing.T) { + testCases := []struct { + name string + in *api.Service + fieldSelector fields.Selector + expectMatch bool + }{ + { + name: "match on name", + in: &api.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "testns", + }, + Spec: api.ServiceSpec{ClusterIP: api.ClusterIPNone}, + }, + fieldSelector: fields.ParseSelectorOrDie("metadata.name=test"), + expectMatch: true, + }, + { + name: "match on namespace", + in: &api.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "testns", + }, + Spec: api.ServiceSpec{ClusterIP: api.ClusterIPNone}, + }, + fieldSelector: fields.ParseSelectorOrDie("metadata.namespace=testns"), + expectMatch: true, + }, + { + name: "no match on name", + in: &api.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "testns", + }, + Spec: api.ServiceSpec{ClusterIP: api.ClusterIPNone}, + }, + fieldSelector: fields.ParseSelectorOrDie("metadata.name=nomatch"), + expectMatch: false, + }, + { + name: "no match on namespace", + in: &api.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Namespace: "testns", + }, + Spec: api.ServiceSpec{ClusterIP: api.ClusterIPNone}, + }, + fieldSelector: fields.ParseSelectorOrDie("metadata.namespace=nomatch"), + expectMatch: false, + }, + { + name: "match on loadbalancer type service", + in: &api.Service{ + Spec: api.ServiceSpec{Type: api.ServiceTypeLoadBalancer}, + }, + fieldSelector: fields.ParseSelectorOrDie("spec.type=LoadBalancer"), + expectMatch: true, + }, + { + name: "no match on nodeport type service", + in: &api.Service{ + Spec: api.ServiceSpec{Type: api.ServiceTypeNodePort}, + }, + fieldSelector: fields.ParseSelectorOrDie("spec.type=LoadBalancer"), + expectMatch: false, + }, + { + name: "match on headless service", + in: &api.Service{ + Spec: api.ServiceSpec{ClusterIP: api.ClusterIPNone}, + }, + fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=None"), + expectMatch: true, + }, + { + name: "no match on clusterIP service", + in: &api.Service{ + Spec: api.ServiceSpec{ClusterIP: "192.168.1.1"}, + }, + fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=None"), + expectMatch: false, + }, + { + name: "match on clusterIP service", + in: &api.Service{ + Spec: api.ServiceSpec{ClusterIP: "192.168.1.1"}, + }, + fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=192.168.1.1"), + expectMatch: true, + }, + { + name: "match on non-headless service", + in: &api.Service{ + Spec: api.ServiceSpec{ClusterIP: "192.168.1.1"}, + }, + fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP!=None"), + expectMatch: true, + }, + { + name: "match on any ClusterIP set service", + in: &api.Service{ + Spec: api.ServiceSpec{ClusterIP: "192.168.1.1"}, + }, + fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP!=\"\""), + expectMatch: true, + }, + { + name: "match on clusterIP IPv6 service", + in: &api.Service{ + Spec: api.ServiceSpec{ClusterIP: "2001:db2::1"}, + }, + fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=2001:db2::1"), + expectMatch: true, + }, + { + name: "no match on headless service", + in: &api.Service{ + Spec: api.ServiceSpec{ClusterIP: api.ClusterIPNone}, + }, + fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=192.168.1.1"), + expectMatch: false, + }, + { + name: "no match on headless service", + in: &api.Service{ + Spec: api.ServiceSpec{ClusterIP: api.ClusterIPNone}, + }, + fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=2001:db2::1"), + expectMatch: false, + }, + { + name: "no match on empty service", + in: &api.Service{}, + fieldSelector: fields.ParseSelectorOrDie("spec.clusterIP=None"), + expectMatch: false, + }, + } + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + m := Matcher(labels.Everything(), testCase.fieldSelector) + result, err := m.Matches(testCase.in) + if err != nil { + t.Errorf("Unexpected error %v", err) + } + if result != testCase.expectMatch { + t.Errorf("Result %v, Expected %v, Selector: %v, Service: %v", result, testCase.expectMatch, testCase.fieldSelector.String(), testCase.in) + } + }) + } +} diff --git a/test/integration/service/service_test.go b/test/integration/service/service_test.go index 7f67233449b..1aaf92beb9a 100644 --- a/test/integration/service/service_test.go +++ b/test/integration/service/service_test.go @@ -19,6 +19,7 @@ package service import ( "bytes" "context" + "encoding/json" "fmt" "testing" "time" @@ -26,11 +27,19 @@ import ( corev1 "k8s.io/api/core/v1" discoveryv1 "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" + utilrand "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" + watchtools "k8s.io/client-go/tools/watch" + "k8s.io/client-go/util/retry" featuregatetesting "k8s.io/component-base/featuregate/testing" kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" "k8s.io/kubernetes/pkg/controller/endpointslice" @@ -38,6 +47,7 @@ import ( "k8s.io/kubernetes/test/integration/framework" "k8s.io/kubernetes/test/utils/format" "k8s.io/kubernetes/test/utils/ktesting" + "k8s.io/utils/ptr" ) // Test_ExternalNameServiceStopsDefaultingInternalTrafficPolicy tests that Services no longer default @@ -557,3 +567,382 @@ func Test_TransitionsForTrafficDistribution(t *testing.T) { } logsBuffer.Reset() } + +func Test_ServiceClusterIPSelector(t *testing.T) { + server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd()) + defer server.TearDownFn() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + client, err := clientset.NewForConfig(server.ClientConfig) + if err != nil { + t.Fatalf("Error creating clientset: %v", err) + } + + ns := framework.CreateNamespaceOrDie(client, "test-external-name-drops-internal-traffic-policy", t) + defer framework.DeleteNamespaceOrDie(client, ns, t) + + // create headless service + service := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-headless", + Namespace: ns.Name, + }, + Spec: corev1.ServiceSpec{ + ClusterIP: corev1.ClusterIPNone, + Type: corev1.ServiceTypeClusterIP, + Ports: []corev1.ServicePort{{ + Port: int32(80), + }}, + Selector: map[string]string{ + "foo": "bar", + }, + }, + } + + _, err = client.CoreV1().Services(ns.Name).Create(ctx, service, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Error creating test service: %v", err) + } + + // informer to watch only non-headless services + kubeInformers := informers.NewSharedInformerFactoryWithOptions(client, 0, informers.WithTweakListOptions(func(options *metav1.ListOptions) { + options.FieldSelector = fields.OneTermNotEqualSelector("spec.clusterIP", corev1.ClusterIPNone).String() + })) + + serviceInformer := kubeInformers.Core().V1().Services().Informer() + serviceLister := kubeInformers.Core().V1().Services().Lister() + serviceHasSynced := serviceInformer.HasSynced + if _, err = serviceInformer.AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + svc := obj.(*corev1.Service) + t.Logf("Added Service %#v", svc) + }, + UpdateFunc: func(oldObj, newObj interface{}) { + oldSvc := oldObj.(*corev1.Service) + newSvc := newObj.(*corev1.Service) + t.Logf("Updated Service %#v to %#v", oldSvc, newSvc) + }, + DeleteFunc: func(obj interface{}) { + svc := obj.(*corev1.Service) + t.Logf("Deleted Service %#v", svc) + }, + }, + ); err != nil { + t.Fatalf("Error adding service informer handler: %v", err) + } + kubeInformers.Start(ctx.Done()) + cache.WaitForCacheSync(ctx.Done(), serviceHasSynced) + svcs, err := serviceLister.List(labels.Everything()) + if err != nil { + t.Fatalf("Error listing services: %v", err) + } + // only the kubernetes.default service expected + if len(svcs) != 1 || svcs[0].Name != "kubernetes" { + t.Fatalf("expected 1 services, got %d", len(svcs)) + } + + // create a new service with ClusterIP + service2 := service.DeepCopy() + service2.Spec.ClusterIP = "" + service2.Name = "test-clusterip" + _, err = client.CoreV1().Services(ns.Name).Create(ctx, service2, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Error creating test service: %v", err) + } + + err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, func(ctx context.Context) (done bool, err error) { + svc, err := serviceLister.Services(service2.Namespace).Get(service2.Name) + if svc == nil || err != nil { + return false, nil + } + return true, nil + }) + if err != nil { + t.Fatalf("Error waiting for test service test-clusterip: %v", err) + } + + // mutate the Service to drop the ClusterIP, theoretically ClusterIP is inmutable but ... + service.Spec.ExternalName = "test" + service.Spec.Type = corev1.ServiceTypeExternalName + _, err = client.CoreV1().Services(ns.Name).Update(ctx, service, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Error creating test service: %v", err) + } + + err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, func(ctx context.Context) (done bool, err error) { + svc, err := serviceLister.Services(service.Namespace).Get(service.Name) + if svc == nil || err != nil { + return false, nil + } + return true, nil + }) + if err != nil { + t.Fatalf("Error waiting for test service without ClusterIP: %v", err) + } + + // mutate the Service to get the ClusterIP again + service.Spec.ExternalName = "" + service.Spec.ClusterIP = "" + service.Spec.Type = corev1.ServiceTypeClusterIP + _, err = client.CoreV1().Services(ns.Name).Update(ctx, service, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Error creating test service: %v", err) + } + + err = wait.PollUntilContextTimeout(ctx, 1*time.Second, 10*time.Second, true, func(ctx context.Context) (done bool, err error) { + svc, err := serviceLister.Services(service.Namespace).Get(service.Name) + if svc == nil || err != nil { + return false, nil + } + return true, nil + }) + if err != nil { + t.Fatalf("Error waiting for test service with ClusterIP: %v", err) + } +} + +// Repro https://github.com/kubernetes/kubernetes/issues/123853 +func Test_ServiceWatchUntil(t *testing.T) { + svcReadyTimeout := 30 * time.Second + + server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd()) + defer server.TearDownFn() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + client, err := clientset.NewForConfig(server.ClientConfig) + if err != nil { + t.Fatalf("Error creating clientset: %v", err) + } + + ns := framework.CreateNamespaceOrDie(client, "test-service-watchuntil", t) + defer framework.DeleteNamespaceOrDie(client, ns, t) + + testSvcName := "test-service-" + utilrand.String(5) + testSvcLabels := map[string]string{"test-service-static": "true"} + testSvcLabelsFlat := "test-service-static=true" + + w := &cache.ListWatch{ + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + options.LabelSelector = testSvcLabelsFlat + return client.CoreV1().Services(ns.Name).Watch(ctx, options) + }, + } + + svcList, err := client.CoreV1().Services("").List(ctx, metav1.ListOptions{LabelSelector: testSvcLabelsFlat}) + if err != nil { + t.Fatalf("failed to list Services: %v", err) + } + // create service + service := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: testSvcName, + Labels: testSvcLabels, + }, + Spec: corev1.ServiceSpec{ + Type: "LoadBalancer", + Ports: []corev1.ServicePort{{ + Name: "http", + Protocol: corev1.ProtocolTCP, + Port: int32(80), + TargetPort: intstr.FromInt32(80), + }}, + LoadBalancerClass: ptr.To[string]("example.com/internal-vip"), + }, + } + _, err = client.CoreV1().Services(ns.Name).Create(ctx, service, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Error creating test service: %v", err) + } + + ctxUntil, cancel := context.WithTimeout(ctx, svcReadyTimeout) + defer cancel() + _, err = watchtools.Until(ctxUntil, svcList.ResourceVersion, w, func(event watch.Event) (bool, error) { + if svc, ok := event.Object.(*corev1.Service); ok { + found := svc.ObjectMeta.Name == service.ObjectMeta.Name && + svc.ObjectMeta.Namespace == ns.Name && + svc.Labels["test-service-static"] == "true" + if !found { + t.Logf("observed Service %v in namespace %v with labels: %v & ports %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Labels, svc.Spec.Ports) + return false, nil + } + t.Logf("Found Service %v in namespace %v with labels: %v & ports %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Labels, svc.Spec.Ports) + return found, nil + } + t.Logf("Observed event: %+v", event.Object) + return false, nil + }) + if err != nil { + t.Fatalf("Error service not found: %v", err) + } + + t.Log("patching the ServiceStatus") + lbStatus := corev1.LoadBalancerStatus{ + Ingress: []corev1.LoadBalancerIngress{{IP: "203.0.113.1"}}, + } + lbStatusJSON, err := json.Marshal(lbStatus) + if err != nil { + t.Fatalf("Error marshalling status: %v", err) + } + _, err = client.CoreV1().Services(ns.Name).Patch(ctx, testSvcName, types.MergePatchType, + []byte(`{"metadata":{"annotations":{"patchedstatus":"true"}},"status":{"loadBalancer":`+string(lbStatusJSON)+`}}`), + metav1.PatchOptions{}, "status") + if err != nil { + t.Fatalf("Could not patch service status: %v", err) + } + + t.Log("watching for the Service to be patched") + ctxUntil, cancel = context.WithTimeout(ctx, svcReadyTimeout) + defer cancel() + + _, err = watchtools.Until(ctxUntil, svcList.ResourceVersion, w, func(event watch.Event) (bool, error) { + if svc, ok := event.Object.(*corev1.Service); ok { + found := svc.ObjectMeta.Name == service.ObjectMeta.Name && + svc.ObjectMeta.Namespace == ns.Name && + svc.Annotations["patchedstatus"] == "true" + if !found { + t.Logf("observed Service %v in namespace %v with annotations: %v & LoadBalancer: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Annotations, svc.Status.LoadBalancer) + return false, nil + } + t.Logf("Found Service %v in namespace %v with annotations: %v & LoadBalancer: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Annotations, svc.Status.LoadBalancer) + return found, nil + } + t.Logf("Observed event: %+v", event.Object) + return false, nil + }) + if err != nil { + t.Fatalf("failed to locate Service %v in namespace %v", service.ObjectMeta.Name, ns) + } + t.Logf("Service %s has service status patched", testSvcName) + + t.Log("updating the ServiceStatus") + + var statusToUpdate, updatedStatus *corev1.Service + err = retry.RetryOnConflict(retry.DefaultRetry, func() error { + statusToUpdate, err = client.CoreV1().Services(ns.Name).Get(ctx, testSvcName, metav1.GetOptions{}) + if err != nil { + return err + } + + statusToUpdate.Status.Conditions = append(statusToUpdate.Status.Conditions, metav1.Condition{ + Type: "StatusUpdate", + Status: metav1.ConditionTrue, + Reason: "E2E", + Message: "Set from e2e test", + }) + + updatedStatus, err = client.CoreV1().Services(ns.Name).UpdateStatus(ctx, statusToUpdate, metav1.UpdateOptions{}) + return err + }) + if err != nil { + t.Fatalf("\n\n Failed to UpdateStatus. %v\n\n", err) + } + t.Logf("updatedStatus.Conditions: %#v", updatedStatus.Status.Conditions) + + t.Log("watching for the Service to be updated") + ctxUntil, cancel = context.WithTimeout(ctx, svcReadyTimeout) + defer cancel() + _, err = watchtools.Until(ctxUntil, svcList.ResourceVersion, w, func(event watch.Event) (bool, error) { + if svc, ok := event.Object.(*corev1.Service); ok { + found := svc.ObjectMeta.Name == service.ObjectMeta.Name && + svc.ObjectMeta.Namespace == ns.Name && + svc.Annotations["patchedstatus"] == "true" + if !found { + t.Logf("Observed Service %v in namespace %v with annotations: %v & Conditions: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Annotations, svc.Status.LoadBalancer) + return false, nil + } + for _, cond := range svc.Status.Conditions { + if cond.Type == "StatusUpdate" && + cond.Reason == "E2E" && + cond.Message == "Set from e2e test" { + t.Logf("Found Service %v in namespace %v with annotations: %v & Conditions: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Annotations, svc.Status.Conditions) + return found, nil + } else { + t.Logf("Observed Service %v in namespace %v with annotations: %v & Conditions: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Annotations, svc.Status.LoadBalancer) + return false, nil + } + } + } + t.Logf("Observed event: %+v", event.Object) + return false, nil + }) + if err != nil { + t.Fatalf("failed to locate Service %v in namespace %v", service.ObjectMeta.Name, ns) + } + t.Logf("Service %s has service status updated", testSvcName) + + t.Log("patching the service") + servicePatchPayload, err := json.Marshal(corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "test-service": "patched", + }, + }, + }) + + _, err = client.CoreV1().Services(ns.Name).Patch(ctx, testSvcName, types.StrategicMergePatchType, []byte(servicePatchPayload), metav1.PatchOptions{}) + if err != nil { + t.Fatalf("failed to patch service. %v", err) + } + + t.Log("watching for the Service to be patched") + ctxUntil, cancel = context.WithTimeout(ctx, svcReadyTimeout) + defer cancel() + _, err = watchtools.Until(ctxUntil, svcList.ResourceVersion, w, func(event watch.Event) (bool, error) { + if svc, ok := event.Object.(*corev1.Service); ok { + found := svc.ObjectMeta.Name == service.ObjectMeta.Name && + svc.ObjectMeta.Namespace == ns.Name && + svc.Labels["test-service"] == "patched" + if !found { + t.Logf("observed Service %v in namespace %v with labels: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Labels) + return false, nil + } + t.Logf("Found Service %v in namespace %v with labels: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Labels) + return found, nil + } + t.Logf("Observed event: %+v", event.Object) + return false, nil + }) + if err != nil { + t.Fatalf("failed to locate Service %v in namespace %v", service.ObjectMeta.Name, ns) + } + + t.Logf("Service %s patched", testSvcName) + + t.Log("deleting the service") + err = client.CoreV1().Services(ns.Name).Delete(ctx, testSvcName, metav1.DeleteOptions{}) + if err != nil { + t.Fatalf("failed to delete the Service. %v", err) + } + + t.Log("watching for the Service to be deleted") + ctxUntil, cancel = context.WithTimeout(ctx, svcReadyTimeout) + defer cancel() + _, err = watchtools.Until(ctxUntil, svcList.ResourceVersion, w, func(event watch.Event) (bool, error) { + switch event.Type { + case watch.Deleted: + if svc, ok := event.Object.(*corev1.Service); ok { + found := svc.ObjectMeta.Name == service.ObjectMeta.Name && + svc.ObjectMeta.Namespace == ns.Name && + svc.Labels["test-service-static"] == "true" + if !found { + t.Logf("observed Service %v in namespace %v with labels: %v & annotations: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Labels, svc.Annotations) + return false, nil + } + t.Logf("Found Service %v in namespace %v with labels: %v & annotations: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Labels, svc.Annotations) + return found, nil + } + default: + t.Logf("Observed event: %+v", event.Type) + } + return false, nil + }) + if err != nil { + t.Fatalf("failed to delete Service %v in namespace %v", service.ObjectMeta.Name, ns) + } + t.Logf("Service %s deleted", testSvcName) +}