diff --git a/pkg/controlplane/controller/kubernetesservice/controller.go b/pkg/controlplane/controller/kubernetesservice/controller.go index 8c72f06b3de..dff7bd8cb8b 100644 --- a/pkg/controlplane/controller/kubernetesservice/controller.go +++ b/pkg/controlplane/controller/kubernetesservice/controller.go @@ -21,28 +21,19 @@ import ( "fmt" "net" "net/http" - "sync" "time" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" - utilnet "k8s.io/apimachinery/pkg/util/net" "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/storage" - utilfeature "k8s.io/apiserver/pkg/util/feature" - "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/controlplane/reconcilers" - "k8s.io/kubernetes/pkg/features" - "k8s.io/kubernetes/pkg/registry/core/rangeallocation" - servicecontroller "k8s.io/kubernetes/pkg/registry/core/service/ipallocator/controller" - portallocatorcontroller "k8s.io/kubernetes/pkg/registry/core/service/portallocator/controller" - "k8s.io/kubernetes/pkg/util/async" ) const ( @@ -54,27 +45,11 @@ const ( // provide the IP repair check on service IPs type Controller struct { Config - RangeRegistries - runner *async.Runner -} - -type RangeRegistries struct { - ServiceClusterIPRegistry rangeallocation.RangeRegistry - SecondaryServiceClusterIPRegistry rangeallocation.RangeRegistry - ServiceNodePortRegistry rangeallocation.RangeRegistry + client kubernetes.Interface } type Config struct { - Client kubernetes.Interface - Informers informers.SharedInformerFactory - - KubernetesService - ClusterIP - NodePort -} - -type KubernetesService struct { PublicIP net.IP EndpointReconciler reconcilers.EndpointReconciler @@ -87,32 +62,17 @@ type KubernetesService struct { KubernetesServiceNodePort int } -type ClusterIP struct { - ServiceClusterIPRange net.IPNet - SecondaryServiceClusterIPRange net.IPNet - ServiceClusterIPInterval time.Duration -} - -type NodePort struct { - ServiceNodePortInterval time.Duration - ServiceNodePortRange utilnet.PortRange -} - // New returns a controller for watching the kubernetes service endpoints. -func New(config Config, rangeRegistries RangeRegistries) (*Controller, error) { +func New(config Config, client kubernetes.Interface) *Controller { return &Controller{ - Config: config, - RangeRegistries: rangeRegistries, - }, nil + Config: config, + client: client, + } } // Start begins the core controller loops that must exist for bootstrapping // a cluster. -func (c *Controller) Start() { - if c.runner != nil { - return - } - +func (c *Controller) Start(stopCh <-chan struct{}) { // Reconcile during first run removing itself until server is ready. endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https") if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err == nil { @@ -121,72 +81,11 @@ func (c *Controller) Start() { klog.Errorf("Error removing old endpoints from kubernetes service: %v", err) } - repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.Client.CoreV1(), c.Client.EventsV1(), c.ServiceNodePortRange, c.ServiceNodePortRegistry) - - // We start both repairClusterIPs and repairNodePorts to ensure repair - // loops of ClusterIPs and NodePorts. - // We run both repair loops using RunUntil public interface. - // However, we want to fail liveness/readiness until the first - // successful repair loop, so we basically pass appropriate - // callbacks to RunUtil methods. - // Additionally, we ensure that we don't wait for it for longer - // than 1 minute for backward compatibility of failing the whole - // apiserver if we can't repair them. - wg := sync.WaitGroup{} - wg.Add(1) - - runRepairNodePorts := func(stopCh chan struct{}) { - repairNodePorts.RunUntil(wg.Done, stopCh) - } - - wg.Add(1) - var runRepairClusterIPs func(stopCh chan struct{}) - if !utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) { - repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval, - c.Client.CoreV1(), - c.Client.EventsV1(), - &c.ServiceClusterIPRange, - c.ServiceClusterIPRegistry, - &c.SecondaryServiceClusterIPRange, - c.SecondaryServiceClusterIPRegistry) - runRepairClusterIPs = func(stopCh chan struct{}) { - repairClusterIPs.RunUntil(wg.Done, stopCh) - } - } else { - repairClusterIPs := servicecontroller.NewRepairIPAddress(c.ServiceClusterIPInterval, - c.Client, - &c.ServiceClusterIPRange, - &c.SecondaryServiceClusterIPRange, - c.Informers.Core().V1().Services(), - c.Informers.Networking().V1alpha1().IPAddresses(), - ) - runRepairClusterIPs = func(stopCh chan struct{}) { - repairClusterIPs.RunUntil(wg.Done, stopCh) - } - } - c.runner = async.NewRunner(c.RunKubernetesService, runRepairClusterIPs, runRepairNodePorts) - c.runner.Start() - - // For backward compatibility, we ensure that if we never are able - // to repair clusterIPs and/or nodeports, we not only fail the liveness - // and/or readiness, but also explicitly fail. - done := make(chan struct{}) - go func() { - defer close(done) - wg.Wait() - }() - select { - case <-done: - case <-time.After(time.Minute): - klog.Fatalf("Unable to perform initial IP and Port allocation check") - } + go c.Run(stopCh) } // Stop cleans up this API Servers endpoint reconciliation leases so another master can take over more quickly. func (c *Controller) Stop() { - if c.runner != nil { - c.runner.Stop() - } endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https") finishedReconciling := make(chan struct{}) go func() { @@ -208,12 +107,12 @@ func (c *Controller) Stop() { } } -// RunKubernetesService periodically updates the kubernetes service -func (c *Controller) RunKubernetesService(ch chan struct{}) { +// Run periodically updates the kubernetes service +func (c *Controller) Run(ch <-chan struct{}) { // wait until process is ready wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) { var code int - c.Client.CoreV1().RESTClient().Get().AbsPath("/readyz").Do(context.TODO()).StatusCode(&code) + c.client.CoreV1().RESTClient().Get().AbsPath("/readyz").Do(context.TODO()).StatusCode(&code) return code == http.StatusOK, nil }, ch) @@ -233,8 +132,8 @@ func (c *Controller) UpdateKubernetesService(reconcile bool) error { // TODO: when it becomes possible to change this stuff, // stop polling and start watching. // TODO: add endpoints of all replicas, not just the elected master. - if _, err := c.Client.CoreV1().Namespaces().Get(context.TODO(), metav1.NamespaceDefault, metav1.GetOptions{}); err != nil { - if _, err := c.Client.CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{ + if _, err := c.client.CoreV1().Namespaces().Get(context.TODO(), metav1.NamespaceDefault, metav1.GetOptions{}); err != nil { + if _, err := c.client.CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{ ObjectMeta: metav1.ObjectMeta{ Name: metav1.NamespaceDefault, Namespace: "", @@ -286,12 +185,12 @@ func createEndpointPortSpec(endpointPort int, endpointPortName string) []corev1. // CreateOrUpdateMasterServiceIfNeeded will create the specified service if it // doesn't already exist. func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePorts []corev1.ServicePort, serviceType corev1.ServiceType, reconcile bool) error { - if s, err := c.Client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), serviceName, metav1.GetOptions{}); err == nil { + if s, err := c.client.CoreV1().Services(metav1.NamespaceDefault).Get(context.TODO(), serviceName, metav1.GetOptions{}); err == nil { // The service already exists. if reconcile { if svc, updated := getMasterServiceUpdateIfNeeded(s, servicePorts, serviceType); updated { klog.Warningf("Resetting master service %q to %#v", serviceName, svc) - _, err := c.Client.CoreV1().Services(metav1.NamespaceDefault).Update(context.TODO(), svc, metav1.UpdateOptions{}) + _, err := c.client.CoreV1().Services(metav1.NamespaceDefault).Update(context.TODO(), svc, metav1.UpdateOptions{}) return err } } @@ -315,7 +214,7 @@ func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, ser }, } - _, err := c.Client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{}) + _, err := c.client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{}) if errors.IsAlreadyExists(err) { return c.CreateOrUpdateMasterServiceIfNeeded(serviceName, serviceIP, servicePorts, serviceType, reconcile) } diff --git a/pkg/controlplane/controller/kubernetesservice/controller_test.go b/pkg/controlplane/controller/kubernetesservice/controller_test.go index 5e370e82dcb..2cf20f3a9ef 100644 --- a/pkg/controlplane/controller/kubernetesservice/controller_test.go +++ b/pkg/controlplane/controller/kubernetesservice/controller_test.go @@ -67,7 +67,7 @@ func TestCreateOrUpdateMasterService(t *testing.T) { for _, test := range createTests { master := Controller{} fakeClient := fake.NewSimpleClientset() - master.Client = fakeClient + master.client = fakeClient master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, false) creates := []core.CreateAction{} for _, action := range fakeClient.Actions() { @@ -349,7 +349,7 @@ func TestCreateOrUpdateMasterService(t *testing.T) { for _, test := range reconcileTests { master := Controller{} fakeClient := fake.NewSimpleClientset(test.service) - master.Client = fakeClient + master.client = fakeClient err := master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, true) if err != nil { t.Errorf("case %q: unexpected error: %v", test.testName, err) @@ -408,7 +408,7 @@ func TestCreateOrUpdateMasterService(t *testing.T) { for _, test := range nonReconcileTests { master := Controller{} fakeClient := fake.NewSimpleClientset(test.service) - master.Client = fakeClient + master.client = fakeClient err := master.CreateOrUpdateMasterServiceIfNeeded(test.serviceName, netutils.ParseIPSloppy("1.2.3.4"), test.servicePorts, test.serviceType, false) if err != nil { t.Errorf("case %q: unexpected error: %v", test.testName, err)