diff --git a/pkg/controlplane/controller/kubernetesservice/controller.go b/pkg/controlplane/controller/kubernetesservice/controller.go index 1c5ec0590b0..bfee9fa8555 100644 --- a/pkg/controlplane/controller/kubernetesservice/controller.go +++ b/pkg/controlplane/controller/kubernetesservice/controller.go @@ -31,7 +31,10 @@ import ( "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/storage" + v1informers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" + v1listers "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/controlplane/reconcilers" @@ -47,7 +50,9 @@ const ( type Controller struct { Config - client kubernetes.Interface + client kubernetes.Interface + serviceLister v1listers.ServiceLister + serviceSynced cache.InformerSynced lock sync.Mutex stopCh chan struct{} // closed by Stop() @@ -67,17 +72,23 @@ type Config struct { } // New returns a controller for watching the kubernetes service endpoints. -func New(config Config, client kubernetes.Interface) *Controller { +func New(config Config, client kubernetes.Interface, serviceInformer v1informers.ServiceInformer) *Controller { return &Controller{ - Config: config, - client: client, - stopCh: make(chan struct{}), + Config: config, + client: client, + serviceLister: serviceInformer.Lister(), + serviceSynced: serviceInformer.Informer().HasSynced, + stopCh: make(chan struct{}), } } // Start begins the core controller loops that must exist for bootstrapping // a cluster. func (c *Controller) Start(stopCh <-chan struct{}) { + if !cache.WaitForCacheSync(stopCh, c.serviceSynced) { + runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync")) + return + } // Reconcile during first run removing itself until server is ready. endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https") if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err == nil { @@ -153,20 +164,6 @@ func (c *Controller) Run(ch <-chan struct{}) { // UpdateKubernetesService attempts to update the default Kube service. func (c *Controller) UpdateKubernetesService(reconcile bool) error { // Update service & endpoint records. - // TODO: when it becomes possible to change this stuff, - // stop polling and start watching. - // TODO: add endpoints of all replicas, not just the elected master. - if _, err := c.client.CoreV1().Namespaces().Get(context.TODO(), metav1.NamespaceDefault, metav1.GetOptions{}); err != nil { - if _, err := c.client.CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - Name: metav1.NamespaceDefault, - Namespace: "", - }, - }, metav1.CreateOptions{}); err != nil && !errors.IsAlreadyExists(err) { - return err - } - } - servicePorts, serviceType := createPortAndServiceSpec(c.ServicePort, c.PublicServicePort, c.KubernetesServiceNodePort, "https") if err := c.CreateOrUpdateMasterServiceIfNeeded(kubernetesServiceName, c.ServiceIP, servicePorts, serviceType, reconcile); err != nil { return err @@ -209,8 +206,9 @@ func createEndpointPortSpec(endpointPort int, endpointPortName string) []corev1. // CreateOrUpdateMasterServiceIfNeeded will create the specified service if it // doesn't already exist. func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePorts []corev1.ServicePort, serviceType corev1.ServiceType, reconcile bool) error { - if s, err := c.client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), serviceName, metav1.GetOptions{}); err == nil { + if s, err := c.serviceLister.Services(metav1.NamespaceDefault).Get(serviceName); err == nil { // The service already exists. + // This path is no executed since 1.17 2a9a9fa, keeping it in case it needs to be revisited if reconcile { if svc, updated := getMasterServiceUpdateIfNeeded(s, servicePorts, serviceType); updated { klog.Warningf("Resetting master service %q to %#v", serviceName, svc) diff --git a/pkg/controlplane/controller/kubernetesservice/controller_test.go b/pkg/controlplane/controller/kubernetesservice/controller_test.go index 2cf20f3a9ef..6e16eed46a9 100644 --- a/pkg/controlplane/controller/kubernetesservice/controller_test.go +++ b/pkg/controlplane/controller/kubernetesservice/controller_test.go @@ -19,12 +19,16 @@ package kubernetesservice import ( "reflect" "testing" + "time" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" + v1informers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes/fake" + v1listers "k8s.io/client-go/listers/core/v1" core "k8s.io/client-go/testing" + "k8s.io/client-go/tools/cache" netutils "k8s.io/utils/net" ) @@ -65,29 +69,33 @@ func TestCreateOrUpdateMasterService(t *testing.T) { }, } for _, test := range createTests { - master := Controller{} - fakeClient := fake.NewSimpleClientset() - master.client = fakeClient - master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, false) - creates := []core.CreateAction{} - for _, action := range fakeClient.Actions() { - if action.GetVerb() == "create" { - creates = append(creates, action.(core.CreateAction)) - } - } - if test.expectCreate != nil { - if len(creates) != 1 { - t.Errorf("case %q: unexpected creations: %v", test.testName, creates) - } else { - obj := creates[0].GetObject() - if e, a := test.expectCreate.Spec, obj.(*corev1.Service).Spec; !reflect.DeepEqual(e, a) { - t.Errorf("case %q: expected create:\n%#v\ngot:\n%#v\n", test.testName, e, a) + t.Run(test.testName, func(t *testing.T) { + master := Controller{} + fakeClient := fake.NewSimpleClientset() + serviceStore := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) + master.serviceLister = v1listers.NewServiceLister(serviceStore) + master.client = fakeClient + master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, false) + creates := []core.CreateAction{} + for _, action := range fakeClient.Actions() { + if action.GetVerb() == "create" { + creates = append(creates, action.(core.CreateAction)) } } - } - if test.expectCreate == nil && len(creates) > 1 { - t.Errorf("case %q: no create expected, yet saw: %v", test.testName, creates) - } + if test.expectCreate != nil { + if len(creates) != 1 { + t.Errorf("case %q: unexpected creations: %v", test.testName, creates) + } else { + obj := creates[0].GetObject() + if e, a := test.expectCreate.Spec, obj.(*corev1.Service).Spec; !reflect.DeepEqual(e, a) { + t.Errorf("case %q: expected create:\n%#v\ngot:\n%#v\n", test.testName, e, a) + } + } + } + if test.expectCreate == nil && len(creates) > 1 { + t.Errorf("case %q: no create expected, yet saw: %v", test.testName, creates) + } + }) } reconcileTests := []struct { @@ -347,32 +355,41 @@ func TestCreateOrUpdateMasterService(t *testing.T) { }, } for _, test := range reconcileTests { - master := Controller{} - fakeClient := fake.NewSimpleClientset(test.service) - master.client = fakeClient - err := master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, true) - if err != nil { - t.Errorf("case %q: unexpected error: %v", test.testName, err) - } - updates := []core.UpdateAction{} - for _, action := range fakeClient.Actions() { - if action.GetVerb() == "update" { - updates = append(updates, action.(core.UpdateAction)) + t.Run(test.testName, func(t *testing.T) { + master := Controller{} + fakeClient := fake.NewSimpleClientset(test.service) + serviceInformer := v1informers.NewServiceInformer(fakeClient, metav1.NamespaceDefault, 12*time.Hour, cache.Indexers{}) + serviceStore := serviceInformer.GetIndexer() + err := serviceStore.Add(test.service) + if err != nil { + t.Fatalf("unexpected error adding service %v to the store: %v", test.service, err) } - } - if test.expectUpdate != nil { - if len(updates) != 1 { - t.Errorf("case %q: unexpected updates: %v", test.testName, updates) - } else { - obj := updates[0].GetObject() - if e, a := test.expectUpdate.Spec, obj.(*corev1.Service).Spec; !reflect.DeepEqual(e, a) { - t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a) + master.serviceLister = v1listers.NewServiceLister(serviceStore) + master.client = fakeClient + err = master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, true) + if err != nil { + t.Errorf("case %q: unexpected error: %v", test.testName, err) + } + updates := []core.UpdateAction{} + for _, action := range fakeClient.Actions() { + if action.GetVerb() == "update" { + updates = append(updates, action.(core.UpdateAction)) } } - } - if test.expectUpdate == nil && len(updates) > 0 { - t.Errorf("case %q: no update expected, yet saw: %v", test.testName, updates) - } + if test.expectUpdate != nil { + if len(updates) != 1 { + t.Errorf("case %q: unexpected updates: %v", test.testName, updates) + } else { + obj := updates[0].GetObject() + if e, a := test.expectUpdate.Spec, obj.(*corev1.Service).Spec; !reflect.DeepEqual(e, a) { + t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a) + } + } + } + if test.expectUpdate == nil && len(updates) > 0 { + t.Errorf("case %q: no update expected, yet saw: %v", test.testName, updates) + } + }) } nonReconcileTests := []struct { @@ -406,31 +423,41 @@ func TestCreateOrUpdateMasterService(t *testing.T) { }, } for _, test := range nonReconcileTests { - master := Controller{} - fakeClient := fake.NewSimpleClientset(test.service) - master.client = fakeClient - err := master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, false) - if err != nil { - t.Errorf("case %q: unexpected error: %v", test.testName, err) - } - updates := []core.UpdateAction{} - for _, action := range fakeClient.Actions() { - if action.GetVerb() == "update" { - updates = append(updates, action.(core.UpdateAction)) + t.Run(test.testName, func(t *testing.T) { + master := Controller{} + fakeClient := fake.NewSimpleClientset(test.service) + master.client = fakeClient + serviceInformer := v1informers.NewServiceInformer(fakeClient, metav1.NamespaceDefault, 12*time.Hour, cache.Indexers{}) + serviceStore := serviceInformer.GetIndexer() + err := serviceStore.Add(test.service) + if err != nil { + t.Fatalf("unexpected error adding service %v to the store: %v", test.service, err) } - } - if test.expectUpdate != nil { - if len(updates) != 1 { - t.Errorf("case %q: unexpected updates: %v", test.testName, updates) - } else { - obj := updates[0].GetObject() - if e, a := test.expectUpdate.Spec, obj.(*corev1.Service).Spec; !reflect.DeepEqual(e, a) { - t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a) + master.serviceLister = v1listers.NewServiceLister(serviceStore) + + err = master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, false) + if err != nil { + t.Errorf("case %q: unexpected error: %v", test.testName, err) + } + updates := []core.UpdateAction{} + for _, action := range fakeClient.Actions() { + if action.GetVerb() == "update" { + updates = append(updates, action.(core.UpdateAction)) } } - } - if test.expectUpdate == nil && len(updates) > 0 { - t.Errorf("case %q: no update expected, yet saw: %v", test.testName, updates) - } + if test.expectUpdate != nil { + if len(updates) != 1 { + t.Errorf("case %q: unexpected updates: %v", test.testName, updates) + } else { + obj := updates[0].GetObject() + if e, a := test.expectUpdate.Spec, obj.(*corev1.Service).Spec; !reflect.DeepEqual(e, a) { + t.Errorf("case %q: expected update:\n%#v\ngot:\n%#v\n", test.testName, e, a) + } + } + } + if test.expectUpdate == nil && len(updates) > 0 { + t.Errorf("case %q: no update expected, yet saw: %v", test.testName, updates) + } + }) } } diff --git a/pkg/controlplane/instance.go b/pkg/controlplane/instance.go index 807ff6d69ce..396633f1da2 100644 --- a/pkg/controlplane/instance.go +++ b/pkg/controlplane/instance.go @@ -483,7 +483,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) ServicePort: c.ExtraConfig.APIServerServicePort, PublicServicePort: publicServicePort, KubernetesServiceNodePort: c.ExtraConfig.KubernetesServiceNodePort, - }, clientset) + }, clientset, c.ExtraConfig.VersionedInformers.Core().V1().Services()) m.GenericAPIServer.AddPostStartHookOrDie("bootstrap-controller", func(hookContext genericapiserver.PostStartHookContext) error { kubernetesServiceCtrl.Start(hookContext.StopCh) return nil