From c5147c91b886c25c90ed6db079e0ac56e02bc249 Mon Sep 17 00:00:00 2001 From: Antonio Ojea Date: Wed, 12 Jul 2023 22:05:42 +0000 Subject: [PATCH] controlplane: kubernetes.default controller stop polling the kubernetesservice controller is in charge of reconciling the kubernetes.default service with the first IP in the service CIDR range and port 443, it also maintains the Endpoints associated to the Service using the configure EndpointReconciler. Until now, the controller was creating the default namespace if it doesn't exist , and creating the kubernetes.default service if it doesn't exist too. However, it was polling the Service in each loop, with this change we reuse the apiserver informers to watch the Service instead of polling. It also removes the logic to create the default network namespace, since this is part of the systemnamespaces controller now. Change-Id: I70954f8e6309e7af8e4b749bf0752168f0ec2c42 Signed-off-by: Antonio Ojea --- .../kubernetesservice/controller.go | 38 ++--- .../kubernetesservice/controller_test.go | 161 ++++++++++-------- pkg/controlplane/instance.go | 2 +- 3 files changed, 113 insertions(+), 88 deletions(-) diff --git a/pkg/controlplane/controller/kubernetesservice/controller.go b/pkg/controlplane/controller/kubernetesservice/controller.go index 1c5ec0590b0..bfee9fa8555 100644 --- a/pkg/controlplane/controller/kubernetesservice/controller.go +++ b/pkg/controlplane/controller/kubernetesservice/controller.go @@ -31,7 +31,10 @@ import ( "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/storage" + v1informers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" + v1listers "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/controlplane/reconcilers" @@ -47,7 +50,9 @@ const ( type Controller struct { Config - client kubernetes.Interface + client kubernetes.Interface + serviceLister v1listers.ServiceLister + serviceSynced cache.InformerSynced lock sync.Mutex stopCh chan struct{} // closed by Stop() @@ -67,17 +72,23 @@ type Config struct { } // New returns a controller for watching the kubernetes service endpoints. -func New(config Config, client kubernetes.Interface) *Controller { +func New(config Config, client kubernetes.Interface, serviceInformer v1informers.ServiceInformer) *Controller { return &Controller{ - Config: config, - client: client, - stopCh: make(chan struct{}), + Config: config, + client: client, + serviceLister: serviceInformer.Lister(), + serviceSynced: serviceInformer.Informer().HasSynced, + stopCh: make(chan struct{}), } } // Start begins the core controller loops that must exist for bootstrapping // a cluster. func (c *Controller) Start(stopCh <-chan struct{}) { + if !cache.WaitForCacheSync(stopCh, c.serviceSynced) { + runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync")) + return + } // Reconcile during first run removing itself until server is ready. endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https") if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err == nil { @@ -153,20 +164,6 @@ func (c *Controller) Run(ch <-chan struct{}) { // UpdateKubernetesService attempts to update the default Kube service. func (c *Controller) UpdateKubernetesService(reconcile bool) error { // Update service & endpoint records. - // TODO: when it becomes possible to change this stuff, - // stop polling and start watching. - // TODO: add endpoints of all replicas, not just the elected master. - if _, err := c.client.CoreV1().Namespaces().Get(context.TODO(), metav1.NamespaceDefault, metav1.GetOptions{}); err != nil { - if _, err := c.client.CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - Name: metav1.NamespaceDefault, - Namespace: "", - }, - }, metav1.CreateOptions{}); err != nil && !errors.IsAlreadyExists(err) { - return err - } - } - servicePorts, serviceType := createPortAndServiceSpec(c.ServicePort, c.PublicServicePort, c.KubernetesServiceNodePort, "https") if err := c.CreateOrUpdateMasterServiceIfNeeded(kubernetesServiceName, c.ServiceIP, servicePorts, serviceType, reconcile); err != nil { return err @@ -209,8 +206,9 @@ func createEndpointPortSpec(endpointPort int, endpointPortName string) []corev1. // CreateOrUpdateMasterServiceIfNeeded will create the specified service if it // doesn't already exist. func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePorts []corev1.ServicePort, serviceType corev1.ServiceType, reconcile bool) error { - if s, err := c.client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), serviceName, metav1.GetOptions{}); err == nil { + if s, err := c.serviceLister.Services(metav1.NamespaceDefault).Get(serviceName); err == nil { // The service already exists. + // This path is no executed since 1.17 2a9a9fa, keeping it in case it needs to be revisited if reconcile { if svc, updated := getMasterServiceUpdateIfNeeded(s, servicePorts, serviceType); updated { klog.Warningf("Resetting master service %q to %#v", serviceName, svc) diff --git a/pkg/controlplane/controller/kubernetesservice/controller_test.go b/pkg/controlplane/controller/kubernetesservice/controller_test.go index 2cf20f3a9ef..6e16eed46a9 100644 --- a/pkg/controlplane/controller/kubernetesservice/controller_test.go +++ b/pkg/controlplane/controller/kubernetesservice/controller_test.go @@ -19,12 +19,16 @@ package kubernetesservice import ( "reflect" "testing" + "time" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" + v1informers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes/fake" + v1listers "k8s.io/client-go/listers/core/v1" core "k8s.io/client-go/testing" + "k8s.io/client-go/tools/cache" netutils "k8s.io/utils/net" ) @@ -65,29 +69,33 @@ func TestCreateOrUpdateMasterService(t *testing.T) { }, } for _, test := range createTests { - master := Controller{} - fakeClient := fake.NewSimpleClientset() - master.client = fakeClient - master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, false) - creates := []core.CreateAction{} - for _, action := range fakeClient.Actions() { - if action.GetVerb() == "create" { - creates = append(creates, action.(core.CreateAction)) - } - } - if test.expectCreate != nil { - if len(creates) != 1 { - t.Errorf("case %q: unexpected creations: %v", test.testName, creates) - } else { - obj := creates[0].GetObject() - if e, a := test.expectCreate.Spec, obj.(*corev1.Service).Spec; !reflect.DeepEqual(e, a) { - t.Errorf("case %q: expected create:\n%#v\ngot:\n%#v\n", test.testName, e, a) + t.Run(test.testName, func(t *testing.T) { + master := Controller{} + fakeClient := fake.NewSimpleClientset() + serviceStore := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) + master.serviceLister = v1listers.NewServiceLister(serviceStore) + master.client = fakeClient + master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, false) + creates := []core.CreateAction{} + for _, action := range fakeClient.Actions() { + if action.GetVerb() == "create" { + creates = append(creates, action.(core.CreateAction)) } } - } - if test.expectCreate == nil && len(creates) > 1 { - t.Errorf("case %q: no create expected, yet saw: %v", test.testName, creates) - } + if test.expectCreate != nil { + if len(creates) != 1 { + t.Errorf("case %q: unexpected creations: %v", test.testName, creates) + } else { + obj := creates[0].GetObject() + if e, a := test.expectCreate.Spec, obj.(*corev1.Service).Spec; !reflect.DeepEqual(e, a) { + t.Errorf("case %q: expected create:\n%#v\ngot:\n%#v\n", test.testName, e, a) + } + } + } + if test.expectCreate == nil && len(creates) > 1 { + t.Errorf("case %q: no create expected, yet saw: %v", test.testName, creates) + } + }) } reconcileTests := []struct { @@ -347,32 +355,41 @@ func TestCreateOrUpdateMasterService(t *testing.T) { }, } for _, test := range reconcileTests { - master := Controller{} - fakeClient := fake.NewSimpleClientset(test.service) - master.client = fakeClient - err := master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, true) - if err != nil { - t.Errorf("case %q: unexpected error: %v", test.testName, err) - } - updates := []core.UpdateAction{} - for _, action := range fakeClient.Actions() { - if action.GetVerb() == "update" { - updates = append(updates, action.(core.UpdateAction)) + t.Run(test.testName, func(t *testing.T) { + master := Controller{} + fakeClient := fake.NewSimpleClientset(test.service) + serviceInformer := v1informers.NewServiceInformer(fakeClient, metav1.NamespaceDefault, 12*time.Hour, cache.Indexers{}) + serviceStore := serviceInformer.GetIndexer() + err := serviceStore.Add(test.service) + if err != nil { + t.Fatalf("unexpected error adding service %v to the store: %v", test.service, err) } - } - if test.expectUpdate != nil { - if len(updates) != 1 { - t.Errorf("case %q: unexpected updates: %v", test.testName, updates) - } else { - obj := updates[0].GetObject() - if e, a := test.expectUpdate.Spec, obj.(*corev1.Service).Spec; !reflect.DeepEqual(e, a) { - t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a) + master.serviceLister = v1listers.NewServiceLister(serviceStore) + master.client = fakeClient + err = master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, true) + if err != nil { + t.Errorf("case %q: unexpected error: %v", test.testName, err) + } + updates := []core.UpdateAction{} + for _, action := range fakeClient.Actions() { + if action.GetVerb() == "update" { + updates = append(updates, action.(core.UpdateAction)) } } - } - if test.expectUpdate == nil && len(updates) > 0 { - t.Errorf("case %q: no update expected, yet saw: %v", test.testName, updates) - } + if test.expectUpdate != nil { + if len(updates) != 1 { + t.Errorf("case %q: unexpected updates: %v", test.testName, updates) + } else { + obj := updates[0].GetObject() + if e, a := test.expectUpdate.Spec, obj.(*corev1.Service).Spec; !reflect.DeepEqual(e, a) { + t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a) + } + } + } + if test.expectUpdate == nil && len(updates) > 0 { + t.Errorf("case %q: no update expected, yet saw: %v", test.testName, updates) + } + }) } nonReconcileTests := []struct { @@ -406,31 +423,41 @@ func TestCreateOrUpdateMasterService(t *testing.T) { }, } for _, test := range nonReconcileTests { - master := Controller{} - fakeClient := fake.NewSimpleClientset(test.service) - master.client = fakeClient - err := master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, false) - if err != nil { - t.Errorf("case %q: unexpected error: %v", test.testName, err) - } - updates := []core.UpdateAction{} - for _, action := range fakeClient.Actions() { - if action.GetVerb() == "update" { - updates = append(updates, action.(core.UpdateAction)) + t.Run(test.testName, func(t *testing.T) { + master := Controller{} + fakeClient := fake.NewSimpleClientset(test.service) + master.client = fakeClient + serviceInformer := v1informers.NewServiceInformer(fakeClient, metav1.NamespaceDefault, 12*time.Hour, cache.Indexers{}) + serviceStore := serviceInformer.GetIndexer() + err := serviceStore.Add(test.service) + if err != nil { + t.Fatalf("unexpected error adding service %v to the store: %v", test.service, err) } - } - if test.expectUpdate != nil { - if len(updates) != 1 { - t.Errorf("case %q: unexpected updates: %v", test.testName, updates) - } else { - obj := updates[0].GetObject() - if e, a := test.expectUpdate.Spec, obj.(*corev1.Service).Spec; !reflect.DeepEqual(e, a) { - t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a) + master.serviceLister = v1listers.NewServiceLister(serviceStore) + + err = master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, false) + if err != nil { + t.Errorf("case %q: unexpected error: %v", test.testName, err) + } + updates := []core.UpdateAction{} + for _, action := range fakeClient.Actions() { + if action.GetVerb() == "update" { + updates = append(updates, action.(core.UpdateAction)) } } - } - if test.expectUpdate == nil && len(updates) > 0 { - t.Errorf("case %q: no update expected, yet saw: %v", test.testName, updates) - } + if test.expectUpdate != nil { + if len(updates) != 1 { + t.Errorf("case %q: unexpected updates: %v", test.testName, updates) + } else { + obj := updates[0].GetObject() + if e, a := test.expectUpdate.Spec, obj.(*corev1.Service).Spec; !reflect.DeepEqual(e, a) { + t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a) + } + } + } + if test.expectUpdate == nil && len(updates) > 0 { + t.Errorf("case %q: no update expected, yet saw: %v", test.testName, updates) + } + }) } } diff --git a/pkg/controlplane/instance.go b/pkg/controlplane/instance.go index 807ff6d69ce..396633f1da2 100644 --- a/pkg/controlplane/instance.go +++ b/pkg/controlplane/instance.go @@ -483,7 +483,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) ServicePort: c.ExtraConfig.APIServerServicePort, PublicServicePort: publicServicePort, KubernetesServiceNodePort: c.ExtraConfig.KubernetesServiceNodePort, - }, clientset) + }, clientset, c.ExtraConfig.VersionedInformers.Core().V1().Services()) m.GenericAPIServer.AddPostStartHookOrDie("bootstrap-controller", func(hookContext genericapiserver.PostStartHookContext) error { kubernetesServiceCtrl.Start(hookContext.StopCh) return nil