diff --git a/pkg/controlplane/controller.go b/pkg/controlplane/controller.go
index a1d0bb40527..3b2f8d8e4e1 100644
--- a/pkg/controlplane/controller.go
+++ b/pkg/controlplane/controller.go
@@ -33,9 +33,12 @@ import (
     "k8s.io/apimachinery/pkg/util/wait"
     genericapiserver "k8s.io/apiserver/pkg/server"
     "k8s.io/apiserver/pkg/storage"
+    utilfeature "k8s.io/apiserver/pkg/util/feature"
+    "k8s.io/client-go/informers"
     "k8s.io/client-go/kubernetes"
     "k8s.io/klog/v2"
     "k8s.io/kubernetes/pkg/controlplane/reconcilers"
+    "k8s.io/kubernetes/pkg/features"
     "k8s.io/kubernetes/pkg/registry/core/rangeallocation"
     corerest "k8s.io/kubernetes/pkg/registry/core/rest"
     servicecontroller "k8s.io/kubernetes/pkg/registry/core/service/ipallocator/controller"
@@ -52,7 +55,8 @@ const (
 // controller loops, which manage creating the "kubernetes" service and
 // provide the IP repair check on service IPs
 type Controller struct {
-    client kubernetes.Interface
+    client    kubernetes.Interface
+    informers informers.SharedInformerFactory
 
     ServiceClusterIPRegistry rangeallocation.RangeRegistry
     ServiceClusterIPRange    net.IPNet
@@ -98,7 +102,8 @@ func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.Lega
     }
 
     return &Controller{
-        client: client,
+        client:    client,
+        informers: c.ExtraConfig.VersionedInformers,
 
         EndpointReconciler: c.ExtraConfig.EndpointReconcilerConfig.Reconciler,
         EndpointInterval:   c.ExtraConfig.EndpointReconcilerConfig.Interval,
@@ -150,7 +155,6 @@ func (c *Controller) Start() {
         klog.Errorf("Error removing old endpoints from kubernetes service: %v", err)
     }
 
-    repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval, c.client.CoreV1(), c.client.EventsV1(), &c.ServiceClusterIPRange, c.ServiceClusterIPRegistry, &c.SecondaryServiceClusterIPRange, c.SecondaryServiceClusterIPRegistry)
     repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.client.CoreV1(), c.client.EventsV1(), c.ServiceNodePortRange, c.ServiceNodePortRegistry)
 
     // We start both repairClusterIPs and repairNodePorts to ensure repair
@@ -163,15 +167,37 @@ func (c *Controller) Start() {
     // than 1 minute for backward compatibility of failing the whole
     // apiserver if we can't repair them.
     wg := sync.WaitGroup{}
-    wg.Add(2)
+    wg.Add(1)
 
-    runRepairClusterIPs := func(stopCh chan struct{}) {
-        repairClusterIPs.RunUntil(wg.Done, stopCh)
-    }
     runRepairNodePorts := func(stopCh chan struct{}) {
         repairNodePorts.RunUntil(wg.Done, stopCh)
     }
 
+    wg.Add(1)
+    var runRepairClusterIPs func(stopCh chan struct{})
+    if !utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
+        repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval,
+            c.client.CoreV1(),
+            c.client.EventsV1(),
+            &c.ServiceClusterIPRange,
+            c.ServiceClusterIPRegistry,
+            &c.SecondaryServiceClusterIPRange,
+            c.SecondaryServiceClusterIPRegistry)
+        runRepairClusterIPs = func(stopCh chan struct{}) {
+            repairClusterIPs.RunUntil(wg.Done, stopCh)
+        }
+    } else {
+        repairClusterIPs := servicecontroller.NewRepairIPAddress(c.ServiceClusterIPInterval,
+            c.client,
+            &c.ServiceClusterIPRange,
+            &c.SecondaryServiceClusterIPRange,
+            c.informers.Core().V1().Services(),
+            c.informers.Networking().V1alpha1().IPAddresses(),
+        )
+        runRepairClusterIPs = func(stopCh chan struct{}) {
+            repairClusterIPs.RunUntil(wg.Done, stopCh)
+        }
+    }
 
     c.runner = async.NewRunner(c.RunKubernetesService, runRepairClusterIPs, runRepairNodePorts)
     c.runner.Start()
diff --git a/pkg/registry/core/service/ipallocator/controller/repairip.go b/pkg/registry/core/service/ipallocator/controller/repairip.go
new file mode 100644
index 00000000000..fec0550ec6d
--- /dev/null
+++ b/pkg/registry/core/service/ipallocator/controller/repairip.go
@@ -0,0 +1,590 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controller
+
+import (
+    "context"
+    "fmt"
+    "net"
+    "time"
+
+    v1 "k8s.io/api/core/v1"
+    networkingv1alpha1 "k8s.io/api/networking/v1alpha1"
+    apierrors "k8s.io/apimachinery/pkg/api/errors"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/labels"
+    "k8s.io/apimachinery/pkg/util/runtime"
+    "k8s.io/apimachinery/pkg/util/wait"
+    coreinformers "k8s.io/client-go/informers/core/v1"
+    networkinginformers "k8s.io/client-go/informers/networking/v1alpha1"
+    "k8s.io/client-go/kubernetes"
+    corelisters "k8s.io/client-go/listers/core/v1"
+    networkinglisters "k8s.io/client-go/listers/networking/v1alpha1"
+    "k8s.io/client-go/tools/cache"
+    "k8s.io/client-go/tools/events"
+    "k8s.io/client-go/util/retry"
+    "k8s.io/client-go/util/workqueue"
+    "k8s.io/klog/v2"
+    "k8s.io/kubernetes/pkg/api/legacyscheme"
+    "k8s.io/kubernetes/pkg/apis/core/v1/helper"
+    "k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
+    "k8s.io/utils/clock"
+    netutils "k8s.io/utils/net"
+)
+
+const (
+    // maxRetries is the number of times a service will be retried before it is dropped out of the queue.
+    // With the current rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers represent the
+    // sequence of delays between successive queuings of a service.
+    //
+    // 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
+    maxRetries = 15
+    workers    = 5
+)
+
+// RepairIPAddress is a controller loop that examines all service ClusterIP allocations, logs any errors,
+// and then creates an accurate set of IPAddress objects for all allocated ClusterIPs.
+//
+// Handles:
+// * Duplicate ClusterIP assignments caused by operator action or undetected race conditions
+// * Allocations to services that were not actually created due to a crash or power loss
+// * Migration of services from older versions of Kubernetes into the new ipallocator, automatically
+//   creating the corresponding IPAddress objects
+// * IPAddress objects with wrong references or labels
+//
+// Logs about:
+// * ClusterIPs that do not match the currently configured range
+//
+// There is a one-to-one relation between Service ClusterIPs and IPAddresses.
+// The bidirectional relation is achieved using the following fields:
+// Service.Spec.ClusterIPs[x] == IPAddress.Name AND IPAddress.ParentRef == Service
+//
+// The controller uses two reconcile loops, one for Services and another for IPAddresses.
+// The Service reconcile loop verifies the bidirectional relation exists and is correct.
+// 1. Service_X [ClusterIP_X] <------> IPAddress_X [Ref:Service_X]   ok
+// 2. Service_Y [ClusterIP_Y] <------> IPAddress_Y [Ref:GatewayA]    !ok, wrong reference
+// 3. Service_Z [ClusterIP_Z] <------>                               !ok, missing IPAddress
+// 4. Service_A [ClusterIP_A] <------> IPAddress_A [Ref:Service_B]   !ok, duplicate IPAddress
+//    Service_B [ClusterIP_A] <------>                               only one Service can verify the relation
+// The IPAddress reconcile loop checks that there are no orphan IPAddresses; the rest of the
+// cases are covered by the Service loop.
+// 1.                         <------> IPAddress_Z [Ref:Service_C]   !ok, orphan IPAddress
+
+type RepairIPAddress struct {
+    client   kubernetes.Interface
+    interval time.Duration
+
+    networkByFamily map[netutils.IPFamily]*net.IPNet // networks we operate on, by their family
+
+    serviceLister  corelisters.ServiceLister
+    servicesSynced cache.InformerSynced
+
+    ipAddressLister networkinglisters.IPAddressLister
+    ipAddressSynced cache.InformerSynced
+
+    svcQueue         workqueue.RateLimitingInterface
+    ipQueue          workqueue.RateLimitingInterface
+    workerLoopPeriod time.Duration
+
+    broadcaster events.EventBroadcaster
+    recorder    events.EventRecorder
+    clock       clock.Clock
+}
+
+// NewRepairIPAddress creates a controller that periodically ensures that all ClusterIPs are uniquely allocated across the cluster
+// and generates informational warnings for a cluster that is not in sync.
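+//
+// For illustration only (a sketch of the repaired state; the Service
+// "default/demo" is hypothetical and the UID field is omitted): a Service
+// "default/demo" holding ClusterIP 10.0.1.1 is backed by an IPAddress object
+// equivalent to what newIPAddress below builds:
+//
+//    &networkingv1alpha1.IPAddress{
+//        ObjectMeta: metav1.ObjectMeta{
+//            Name: "10.0.1.1",
+//            Labels: map[string]string{
+//                networkingv1alpha1.LabelIPAddressFamily: string(v1.IPv4Protocol),
+//                networkingv1alpha1.LabelManagedBy:       ipallocator.ControllerName,
+//            },
+//        },
+//        Spec: networkingv1alpha1.IPAddressSpec{
+//            ParentRef: &networkingv1alpha1.ParentReference{Resource: "services", Namespace: "default", Name: "demo"},
+//        },
+//    }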
+func NewRepairIPAddress(interval time.Duration,
+    client kubernetes.Interface,
+    network *net.IPNet,
+    secondaryNetwork *net.IPNet,
+    serviceInformer coreinformers.ServiceInformer,
+    ipAddressInformer networkinginformers.IPAddressInformer) *RepairIPAddress {
+    eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
+    recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, "ipallocator-repair-controller")
+
+    networkByFamily := make(map[netutils.IPFamily]*net.IPNet)
+    primary := netutils.IPFamilyOfCIDR(network)
+    networkByFamily[primary] = network
+    if secondaryNetwork != nil {
+        secondary := netutils.IPFamilyOfCIDR(secondaryNetwork)
+        networkByFamily[secondary] = secondaryNetwork
+    }
+
+    r := &RepairIPAddress{
+        interval:         interval,
+        client:           client,
+        networkByFamily:  networkByFamily,
+        serviceLister:    serviceInformer.Lister(),
+        servicesSynced:   serviceInformer.Informer().HasSynced,
+        ipAddressLister:  ipAddressInformer.Lister(),
+        ipAddressSynced:  ipAddressInformer.Informer().HasSynced,
+        svcQueue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "services"),
+        ipQueue:          workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ipaddresses"),
+        workerLoopPeriod: time.Second,
+        broadcaster:      eventBroadcaster,
+        recorder:         recorder,
+        clock:            clock.RealClock{},
+    }
+
+    serviceInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
+        AddFunc: func(obj interface{}) {
+            key, err := cache.MetaNamespaceKeyFunc(obj)
+            if err == nil {
+                r.svcQueue.Add(key)
+            }
+        },
+        UpdateFunc: func(old interface{}, new interface{}) {
+            key, err := cache.MetaNamespaceKeyFunc(new)
+            if err == nil {
+                r.svcQueue.Add(key)
+            }
+        },
+        DeleteFunc: func(obj interface{}) {
+            // IndexerInformer uses a delta queue, therefore for deletes we have to use this
+            // key function.
+            key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
+            if err == nil {
+                r.svcQueue.Add(key)
+            }
+        },
+    }, interval)
+
+    ipAddressInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
+        AddFunc: func(obj interface{}) {
+            key, err := cache.MetaNamespaceKeyFunc(obj)
+            if err == nil {
+                r.ipQueue.Add(key)
+            }
+        },
+        UpdateFunc: func(old interface{}, new interface{}) {
+            key, err := cache.MetaNamespaceKeyFunc(new)
+            if err == nil {
+                r.ipQueue.Add(key)
+            }
+        },
+        DeleteFunc: func(obj interface{}) {
+            // IndexerInformer uses a delta queue, therefore for deletes we have to use this
+            // key function.
+            key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
+            if err == nil {
+                r.ipQueue.Add(key)
+            }
+        },
+    }, interval)
+
+    return r
+}
+
+// RunUntil starts the controller until the provided stopCh is closed.
+func (r *RepairIPAddress) RunUntil(onFirstSuccess func(), stopCh chan struct{}) {
+    defer r.ipQueue.ShutDown()
+    defer r.svcQueue.ShutDown()
+    r.broadcaster.StartRecordingToSink(stopCh)
+    defer r.broadcaster.Shutdown()
+
+    klog.Info("Starting ipallocator-repair-controller")
+    defer klog.Info("Shutting down ipallocator-repair-controller")
+
+    if !cache.WaitForNamedCacheSync("ipallocator-repair-controller", stopCh, r.ipAddressSynced, r.servicesSynced) {
+        return
+    }
+
+    // The first sync goes through all the Services and IPAddresses in the cache;
+    // once it succeeds, it signals the main loop and continues working from the
+    // informer handlers, which is less expensive.
+    if err := r.runOnce(); err != nil {
+        runtime.HandleError(err)
+        return
+    }
+    onFirstSuccess()
+
+    for i := 0; i < workers; i++ {
+        go wait.Until(r.ipWorker, r.workerLoopPeriod, stopCh)
+        go wait.Until(r.svcWorker, r.workerLoopPeriod, stopCh)
+    }
+
+    <-stopCh
+}
+
+// runOnce verifies the state of the ClusterIP allocations and returns an error if an unrecoverable problem occurs.
+func (r *RepairIPAddress) runOnce() error {
+    return retry.RetryOnConflict(retry.DefaultBackoff, r.doRunOnce)
+}
+
+// doRunOnce verifies the state of the ClusterIP allocations and returns an error if an unrecoverable problem occurs.
+func (r *RepairIPAddress) doRunOnce() error {
+    services, err := r.serviceLister.List(labels.Everything())
+    if err != nil {
+        return fmt.Errorf("unable to refresh the service IP block: %v", err)
+    }
+
+    // Check every Service's ClusterIP, and rebuild the state as we think it should be.
+    for _, svc := range services {
+        key, err := cache.MetaNamespaceKeyFunc(svc)
+        if err != nil {
+            return err
+        }
+        err = r.syncService(key)
+        if err != nil {
+            return err
+        }
+    }
+
+    // We have checked that every Service has its corresponding IP.
+    // Check that there is no IP created by the allocator without
+    // a Service associated.
+    ipLabelSelector := labels.Set(map[string]string{
+        networkingv1alpha1.LabelManagedBy: ipallocator.ControllerName,
+    }).AsSelectorPreValidated()
+    ipAddresses, err := r.ipAddressLister.List(ipLabelSelector)
+    if err != nil {
+        return fmt.Errorf("unable to refresh the IPAddress block: %v", err)
+    }
+    // Check that every IPAddress matches its corresponding Service, and rebuild the state as we think it should be.
+    for _, ipAddress := range ipAddresses {
+        key, err := cache.MetaNamespaceKeyFunc(ipAddress)
+        if err != nil {
+            return err
+        }
+        err = r.syncIPAddress(key)
+        if err != nil {
+            return err
+        }
+    }
+
+    return nil
+}
+
+func (r *RepairIPAddress) svcWorker() {
+    for r.processNextWorkSvc() {
+    }
+}
+
+func (r *RepairIPAddress) processNextWorkSvc() bool {
+    eKey, quit := r.svcQueue.Get()
+    if quit {
+        return false
+    }
+    defer r.svcQueue.Done(eKey)
+
+    err := r.syncService(eKey.(string))
+    r.handleSvcErr(err, eKey)
+
+    return true
+}
+
+func (r *RepairIPAddress) handleSvcErr(err error, key interface{}) {
+    if err == nil {
+        r.svcQueue.Forget(key)
+        return
+    }
+
+    if r.svcQueue.NumRequeues(key) < maxRetries {
+        klog.V(2).InfoS("Error syncing Service, retrying", "service", key, "err", err)
+        r.svcQueue.AddRateLimited(key)
+        return
+    }
+
+    klog.Warningf("Dropping Service %q out of the queue: %v", key, err)
+    r.svcQueue.Forget(key)
+    runtime.HandleError(err)
+}
+
+// syncService reconciles a Service's ClusterIPs, verifying that each one has a corresponding IPAddress object associated
+func (r *RepairIPAddress) syncService(key string) error {
+    namespace, name, err := cache.SplitMetaNamespaceKey(key)
+    if err != nil {
+        return err
+    }
+    svc, err := r.serviceLister.Services(namespace).Get(name)
+    if err != nil {
+        // nothing to do
+        return nil
+    }
+    if !helper.IsServiceIPSet(svc) {
+        // didn't need a ClusterIP
+        return nil
+    }
+
+    for _, clusterIP := range svc.Spec.ClusterIPs {
+        ip := netutils.ParseIPSloppy(clusterIP)
+        if ip == nil {
+            // The ClusterIP is corrupt; ClusterIPs are validated earlier, but double-check here
+            // in case there are inconsistencies between the parsers
+            r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPNotValid", "ClusterIPValidation", "Cluster IP %s is not a valid IP; please recreate Service", ip)
+            runtime.HandleError(fmt.Errorf("the ClusterIP %s for Service %s/%s is not a valid IP; please recreate Service", ip, svc.Namespace, svc.Name))
+            continue
+        }
+
+        family := netutils.IPFamilyOf(ip)
+        v1Family := getFamilyByIP(ip)
+        network, ok := r.networkByFamily[family]
+        if !ok {
+            // this Service is using an IP family that is no longer configured on the cluster
+            r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPNotValid", "ClusterIPValidation", "Cluster IP %s(%s) is of an IP family that is no longer configured on the cluster; please recreate Service", ip, v1Family)
+            runtime.HandleError(fmt.Errorf("the ClusterIP [%v]: %s for Service %s/%s is of an IP family that is no longer configured on the cluster; please recreate Service", v1Family, ip, svc.Namespace, svc.Name))
+            continue
+        }
+        if !network.Contains(ip) {
+            // ClusterIP is out of range
+            r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPOutOfRange", "ClusterIPAllocation", "Cluster IP [%v]: %s is not within the configured Service CIDR %s; please recreate service", v1Family, ip, network.String())
+            runtime.HandleError(fmt.Errorf("the ClusterIP [%v]: %s for Service %s/%s is not within the service CIDR %s; please recreate", v1Family, ip, svc.Namespace, svc.Name, network.String()))
+            continue
+        }
+
+        // Get the IPAddress object associated to the ClusterIP
+        ipAddress, err := r.ipAddressLister.Get(ip.String())
+        if apierrors.IsNotFound(err) {
+            // ClusterIP doesn't seem to be allocated, create it.
+            r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPNotAllocated", "ClusterIPAllocation", "Cluster IP [%v]: %s is not allocated; repairing", v1Family, ip)
+            runtime.HandleError(fmt.Errorf("the ClusterIP [%v]: %s for Service %s/%s is not allocated; repairing", v1Family, ip, svc.Namespace, svc.Name))
+            _, err := r.client.NetworkingV1alpha1().IPAddresses().Create(context.Background(), newIPAddress(ip.String(), svc), metav1.CreateOptions{})
+            if err != nil {
+                return err
+            }
+            continue
+        }
+        if err != nil {
+            r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "UnknownError", "ClusterIPAllocation", "Unable to allocate ClusterIP [%v]: %s due to an unknown error", v1Family, ip)
+            return fmt.Errorf("unable to allocate ClusterIP [%v]: %s for Service %s/%s due to an unknown error, will retry later: %v", v1Family, ip, svc.Namespace, svc.Name, err)
+        }
+
+        // An IPAddress that belongs to a Service must reference a Service
+        if ipAddress.Spec.ParentRef.Group != "" ||
+            ipAddress.Spec.ParentRef.Resource != "services" {
+            r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPNotAllocated", "ClusterIPAllocation", "the ClusterIP [%v]: %s for Service %s/%s has a wrong reference; repairing", v1Family, ip, svc.Namespace, svc.Name)
+            if err := r.recreateIPAddress(ipAddress.Name, svc); err != nil {
+                return err
+            }
+            continue
+        }
+
+        // An IPAddress that belongs to a Service must reference the current Service
+        if ipAddress.Spec.ParentRef.Namespace != svc.Namespace ||
+            ipAddress.Spec.ParentRef.Name != svc.Name {
+            // verify that no two Services share the same ClusterIP, otherwise the controller
+            // would keep deleting and recreating the same IPAddress, flipping the reference
+            refService, err := r.serviceLister.Services(ipAddress.Spec.ParentRef.Namespace).Get(ipAddress.Spec.ParentRef.Name)
+            if err != nil {
+                r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPNotAllocated", "ClusterIPAllocation", "the ClusterIP [%v]: %s for Service %s/%s has a wrong reference; repairing", v1Family, ip, svc.Namespace, svc.Name)
+                if err := r.recreateIPAddress(ipAddress.Name, svc); err != nil {
+                    return err
+                }
+                continue
+            }
+            // the IPAddress is a duplicate: the referenced Service legitimately holds this ClusterIP,
+            // so the current Service has to be recreated
+            for _, clusterIP := range refService.Spec.ClusterIPs {
+                if ipAddress.Name == clusterIP {
+                    r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPAlreadyAllocated", "ClusterIPAllocation", "Cluster IP [%v]:%s was assigned to multiple services; please recreate service", v1Family, ip)
+                    runtime.HandleError(fmt.Errorf("the ClusterIP [%v]:%s for Service %s/%s was also assigned to Service %s/%s; please recreate", v1Family, ip, svc.Namespace, svc.Name, refService.Namespace, refService.Name))
+                    break
+                }
+            }
+        }
+
+        // IPAddress must have the corresponding labels assigned by the allocator
+        if !verifyIPAddressLabels(ipAddress) {
+            if err := r.recreateIPAddress(ipAddress.Name, svc); err != nil {
+                return err
+            }
+            continue
+        }
+
+    }
+    return nil
+}
+
+func (r *RepairIPAddress) recreateIPAddress(name string, svc *v1.Service) error {
+    err := r.client.NetworkingV1alpha1().IPAddresses().Delete(context.Background(), name, metav1.DeleteOptions{})
+    if err != nil && !apierrors.IsNotFound(err) {
+        return err
+    }
+    _, err = r.client.NetworkingV1alpha1().IPAddresses().Create(context.Background(), newIPAddress(name, svc), metav1.CreateOptions{})
+    if err != nil {
+        return err
+    }
+    return nil
+}
+
+func (r *RepairIPAddress) ipWorker() {
+    for r.processNextWorkIP() {
+    }
+}
+
+func (r *RepairIPAddress) processNextWorkIP() bool {
+    eKey, quit := r.ipQueue.Get()
+    if quit {
+        return false
+    }
+    defer r.ipQueue.Done(eKey)
+
+    err := r.syncIPAddress(eKey.(string))
+    r.handleIPErr(err, eKey)
+
+    return true
+}
+
+func (r *RepairIPAddress) handleIPErr(err error, key interface{}) {
+    if err == nil {
+        r.ipQueue.Forget(key)
+        return
+    }
+
+    if r.ipQueue.NumRequeues(key) < maxRetries {
+        klog.V(2).InfoS("Error syncing IPAddress, retrying", "ipaddress", key, "err", err)
+        r.ipQueue.AddRateLimited(key)
+        return
+    }
+
+    klog.Warningf("Dropping IPAddress %q out of the queue: %v", key, err)
+    r.ipQueue.Forget(key)
+    runtime.HandleError(err)
+}
+
+// syncIPAddress verifies that IPAddresses owned by the ipallocator controller reference an existing Service,
+// to avoid leaking IPAddresses. IPAddresses owned by other controllers are not processed, to avoid hotloops.
+// IPAddresses that reference Services and are part of a Service's ClusterIPs are validated by the syncService loop.
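+//
+// A minimal sketch of the cleanup rule applied in the body below (delete and
+// requeue stand in for the actual client/workqueue calls; the 60s grace period
+// matches the default kube-apiserver request timeout):
+//
+//    age := clock.Now().Sub(ip.CreationTimestamp.Time)
+//    if parentServiceMissing && age > gracePeriod {
+//        delete(ip) // the IPAddress leaked
+//    } else if parentServiceMissing {
+//        requeue(ip, gracePeriod-age) // the Service may still be mid-creation
+//    }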
+func (r *RepairIPAddress) syncIPAddress(key string) error {
+    ipAddress, err := r.ipAddressLister.Get(key)
+    if err != nil {
+        // nothing to do
+        return nil
+    }
+
+    // not managed by this controller
+    if !managedByController(ipAddress) {
+        return nil
+    }
+
+    // does not reference a Service but was created by the service allocator; something else has changed it, delete it
+    if ipAddress.Spec.ParentRef.Group != "" ||
+        ipAddress.Spec.ParentRef.Resource != "services" {
+        runtime.HandleError(fmt.Errorf("IPAddress %s appears to have been modified, not referencing a Service %v: cleaning up", ipAddress.Name, ipAddress.Spec.ParentRef))
+        r.recorder.Eventf(ipAddress, nil, v1.EventTypeWarning, "IPAddressNotAllocated", "IPAddressAllocation", "IPAddress %s appears to have been modified, not referencing a Service %v: cleaning up", ipAddress.Name, ipAddress.Spec.ParentRef)
+        err := r.client.NetworkingV1alpha1().IPAddresses().Delete(context.Background(), ipAddress.Name, metav1.DeleteOptions{})
+        if err != nil && !apierrors.IsNotFound(err) {
+            return err
+        }
+        return nil
+    }
+
+    svc, err := r.serviceLister.Services(ipAddress.Spec.ParentRef.Namespace).Get(ipAddress.Spec.ParentRef.Name)
+    if apierrors.IsNotFound(err) {
+        // Clean up IPAddresses whose parent Service does not exist IF the IPAddress was created more than
+        // 60 seconds ago (the default request timeout on the kube-apiserver). This is required because during
+        // Service creation there is a window in which the IPAddress object already exists but the Service is
+        // still being created.
+        // Assume that CreationTimestamp exists.
+        ipLifetime := r.clock.Now().Sub(ipAddress.CreationTimestamp.Time)
+        gracePeriod := 60 * time.Second
+        if ipLifetime > gracePeriod {
+            runtime.HandleError(fmt.Errorf("IPAddress %s appears to have leaked: cleaning up", ipAddress.Name))
+            r.recorder.Eventf(ipAddress, nil, v1.EventTypeWarning, "IPAddressNotAllocated", "IPAddressAllocation", "IPAddress: %s for Service %s/%s appears to have leaked: cleaning up", ipAddress.Name, ipAddress.Spec.ParentRef.Namespace, ipAddress.Spec.ParentRef.Name)
+            err := r.client.NetworkingV1alpha1().IPAddresses().Delete(context.Background(), ipAddress.Name, metav1.DeleteOptions{})
+            if err != nil && !apierrors.IsNotFound(err) {
+                return err
+            }
+            return nil
+        }
+        // requeue after the remainder of the grace period
+        r.ipQueue.AddAfter(key, gracePeriod-ipLifetime)
+        return nil
+    }
+    if err != nil {
+        runtime.HandleError(fmt.Errorf("unable to get parent Service for IPAddress %s due to an unknown error: %v", ipAddress.Name, err))
+        r.recorder.Eventf(ipAddress, nil, v1.EventTypeWarning, "UnknownError", "IPAddressAllocation", "Unable to get parent Service for IPAddress %s due to an unknown error", ipAddress.Name)
+        return err
+    }
+    // The Service exists; the syncService loop has already checked the Service-to-IPAddress direction,
+    // but we also have to check the reverse, that the IPAddress-to-Service relation is correct
+    for _, clusterIP := range svc.Spec.ClusterIPs {
+        if ipAddress.Name == clusterIP {
+            return nil
+        }
+    }
+    runtime.HandleError(fmt.Errorf("the IPAddress: %s for Service %s/%s has a wrong reference %#v; cleaning up", ipAddress.Name, svc.Namespace, svc.Name, ipAddress.Spec.ParentRef))
+    r.recorder.Eventf(ipAddress, nil, v1.EventTypeWarning, "IPAddressWrongReference", "IPAddressAllocation", "IPAddress: %s for Service %s/%s has a wrong reference; cleaning up", ipAddress.Name, svc.Namespace, svc.Name)
+    err = r.client.NetworkingV1alpha1().IPAddresses().Delete(context.Background(), ipAddress.Name, metav1.DeleteOptions{})
+    if err != nil && !apierrors.IsNotFound(err) {
+        return err
+    }
+    return nil
+}
+
+func newIPAddress(name string, svc *v1.Service) *networkingv1alpha1.IPAddress {
+    family := string(v1.IPv4Protocol)
+    if netutils.IsIPv6String(name) {
+        family = string(v1.IPv6Protocol)
+    }
+    return &networkingv1alpha1.IPAddress{
+        ObjectMeta: metav1.ObjectMeta{
+            Name: name,
+            Labels: map[string]string{
+                networkingv1alpha1.LabelIPAddressFamily: family,
+                networkingv1alpha1.LabelManagedBy:       ipallocator.ControllerName,
+            },
+        },
+        Spec: networkingv1alpha1.IPAddressSpec{
+            ParentRef: serviceToRef(svc),
+        },
+    }
+}
+
+func serviceToRef(svc *v1.Service) *networkingv1alpha1.ParentReference {
+    if svc == nil {
+        return nil
+    }
+
+    return &networkingv1alpha1.ParentReference{
+        Group:     "",
+        Resource:  "services",
+        Namespace: svc.Namespace,
+        Name:      svc.Name,
+        UID:       svc.UID,
+    }
+}
+
+func getFamilyByIP(ip net.IP) v1.IPFamily {
+    if netutils.IsIPv6(ip) {
+        return v1.IPv6Protocol
+    }
+    return v1.IPv4Protocol
+}
+
+// managedByController returns true if the provided IPAddress is managed by
+// this controller (the ipallocator).
+func managedByController(ip *networkingv1alpha1.IPAddress) bool {
+    managedBy, ok := ip.Labels[networkingv1alpha1.LabelManagedBy]
+    if !ok {
+        return false
+    }
+    return managedBy == ipallocator.ControllerName
+}
+
+func verifyIPAddressLabels(ip *networkingv1alpha1.IPAddress) bool {
+    labelFamily, ok := ip.Labels[networkingv1alpha1.LabelIPAddressFamily]
+    if !ok {
+        return false
+    }
+
+    family := string(v1.IPv4Protocol)
+    if netutils.IsIPv6String(ip.Name) {
+        family = string(v1.IPv6Protocol)
+    }
+    if family != labelFamily {
+        return false
+    }
+    return managedByController(ip)
+}
diff --git a/pkg/registry/core/service/ipallocator/controller/repairip_test.go b/pkg/registry/core/service/ipallocator/controller/repairip_test.go
new file mode 100644
index 00000000000..f82b638bcef
--- /dev/null
+++ b/pkg/registry/core/service/ipallocator/controller/repairip_test.go
@@ -0,0 +1,446 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package controller
+
+import (
+    "fmt"
+    "testing"
+    "time"
+
+    "github.com/google/go-cmp/cmp"
+    v1 "k8s.io/api/core/v1"
+    networkingv1alpha1 "k8s.io/api/networking/v1alpha1"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    "k8s.io/apimachinery/pkg/runtime"
+    "k8s.io/apimachinery/pkg/util/wait"
+    "k8s.io/client-go/informers"
+    "k8s.io/client-go/kubernetes/fake"
+    k8stesting "k8s.io/client-go/testing"
+    "k8s.io/client-go/tools/cache"
+    "k8s.io/client-go/tools/events"
+    "k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
+    netutils "k8s.io/utils/net"
+)
+
+var (
+    serviceCIDRv4 = "10.0.0.0/16"
+    serviceCIDRv6 = "2001:db8::/64"
+)
+
+type fakeRepair struct {
+    *RepairIPAddress
+    serviceStore   cache.Store
+    ipAddressStore cache.Store
+}
+
+func newFakeRepair() (*fake.Clientset, *fakeRepair) {
+    fakeClient := fake.NewSimpleClientset()
+
+    informerFactory := informers.NewSharedInformerFactory(fakeClient, 0*time.Second)
+    serviceInformer := informerFactory.Core().V1().Services()
+    serviceIndexer := serviceInformer.Informer().GetIndexer()
+
+    ipInformer := informerFactory.Networking().V1alpha1().IPAddresses()
+    ipIndexer := ipInformer.Informer().GetIndexer()
+
+    fakeClient.PrependReactor("create", "ipaddresses", k8stesting.ReactionFunc(func(action k8stesting.Action) (bool, runtime.Object, error) {
+        ip := action.(k8stesting.CreateAction).GetObject().(*networkingv1alpha1.IPAddress)
+        err := ipIndexer.Add(ip)
+        return false, ip, err
+    }))
+    fakeClient.PrependReactor("update", "ipaddresses", k8stesting.ReactionFunc(func(action k8stesting.Action) (bool, runtime.Object, error) {
+        ip := action.(k8stesting.UpdateAction).GetObject().(*networkingv1alpha1.IPAddress)
+        return false, ip, fmt.Errorf("IPAddress is immutable after creation")
+    }))
+    fakeClient.PrependReactor("delete", "ipaddresses", k8stesting.ReactionFunc(func(action k8stesting.Action) (bool, runtime.Object, error) {
+        ip := action.(k8stesting.DeleteAction).GetName()
+        err := ipIndexer.Delete(ip)
+        return false, &networkingv1alpha1.IPAddress{}, err
+    }))
+
+    _, primary, err := netutils.ParseCIDRSloppy(serviceCIDRv4)
+    if err != nil {
+        panic(err)
+    }
+    _, secondary, err := netutils.ParseCIDRSloppy(serviceCIDRv6)
+    if err != nil {
+        panic(err)
+    }
+    r := NewRepairIPAddress(0*time.Second,
+        fakeClient,
+        primary,
+        secondary,
+        serviceInformer,
+        ipInformer,
+    )
+    return fakeClient, &fakeRepair{r, serviceIndexer, ipIndexer}
+}
+
+func TestRepairServiceIP(t *testing.T) {
+    tests := []struct {
+        name        string
+        svcs        []*v1.Service
+        ipAddresses []*networkingv1alpha1.IPAddress
+        expectedIPs []string
+        actions     [][]string // verb and resource
+        events      []string
+    }{
+        {
+            name: "no changes needed single stack",
+            svcs: []*v1.Service{newService("test-svc", []string{"10.0.1.1"})},
+            ipAddresses: []*networkingv1alpha1.IPAddress{
+                newIPAddress("10.0.1.1", newService("test-svc", []string{"10.0.1.1"})),
+            },
+            expectedIPs: []string{"10.0.1.1"},
+            actions:     [][]string{},
+            events:      []string{},
+        },
+        {
+            name: "no changes needed dual stack",
+            svcs: []*v1.Service{newService("test-svc", []string{"10.0.1.1", "2001:db8::10"})},
+            ipAddresses: []*networkingv1alpha1.IPAddress{
+                newIPAddress("10.0.1.1", newService("test-svc", []string{"10.0.1.1"})),
+                newIPAddress("2001:db8::10", newService("test-svc", []string{"2001:db8::10"})),
+            },
+            expectedIPs: []string{"10.0.1.1", "2001:db8::10"},
+            actions:     [][]string{},
+            events:      []string{},
+        },
+        // these two cases simulate migrating from bitmaps to IPAddress objects
+        {
+            name: "create IPAddress single stack",
+ svcs: []*v1.Service{newService("test-svc", []string{"10.0.1.1"})}, + expectedIPs: []string{"10.0.1.1"}, + actions: [][]string{{"create", "ipaddresses"}}, + events: []string{"Warning ClusterIPNotAllocated Cluster IP [IPv4]: 10.0.1.1 is not allocated; repairing"}, + }, + { + name: "create IPAddresses dual stack", + svcs: []*v1.Service{newService("test-svc", []string{"10.0.1.1", "2001:db8::10"})}, + expectedIPs: []string{"10.0.1.1", "2001:db8::10"}, + actions: [][]string{{"create", "ipaddresses"}, {"create", "ipaddresses"}}, + events: []string{ + "Warning ClusterIPNotAllocated Cluster IP [IPv4]: 10.0.1.1 is not allocated; repairing", + "Warning ClusterIPNotAllocated Cluster IP [IPv6]: 2001:db8::10 is not allocated; repairing", + }, + }, + { + name: "reconcile IPAddress single stack wrong reference", + svcs: []*v1.Service{newService("test-svc", []string{"10.0.1.1"})}, + ipAddresses: []*networkingv1alpha1.IPAddress{ + newIPAddress("10.0.1.1", newService("test-svc2", []string{"10.0.1.1"})), + }, + expectedIPs: []string{"10.0.1.1"}, + actions: [][]string{{"delete", "ipaddresses"}, {"create", "ipaddresses"}}, + events: []string{"Warning ClusterIPNotAllocated the ClusterIP [IPv4]: 10.0.1.1 for Service bar/test-svc has a wrong reference; repairing"}, + }, + { + name: "reconcile IPAddresses dual stack", + svcs: []*v1.Service{newService("test-svc", []string{"10.0.1.1", "2001:db8::10"})}, + ipAddresses: []*networkingv1alpha1.IPAddress{ + newIPAddress("10.0.1.1", newService("test-svc2", []string{"10.0.1.1"})), + newIPAddress("2001:db8::10", newService("test-svc2", []string{"2001:db8::10"})), + }, + expectedIPs: []string{"10.0.1.1", "2001:db8::10"}, + actions: [][]string{{"delete", "ipaddresses"}, {"create", "ipaddresses"}, {"delete", "ipaddresses"}, {"create", "ipaddresses"}}, + events: []string{ + "Warning ClusterIPNotAllocated the ClusterIP [IPv4]: 10.0.1.1 for Service bar/test-svc has a wrong reference; repairing", + "Warning ClusterIPNotAllocated the ClusterIP [IPv6]: 2001:db8::10 for Service bar/test-svc has a wrong reference; repairing", + }, + }, + { + name: "one IP out of range", + svcs: []*v1.Service{newService("test-svc", []string{"192.168.1.1", "2001:db8::10"})}, + ipAddresses: []*networkingv1alpha1.IPAddress{ + newIPAddress("192.168.1.1", newService("test-svc", []string{"192.168.1.1"})), + newIPAddress("2001:db8::10", newService("test-svc", []string{"2001:db8::10"})), + }, + expectedIPs: []string{"2001:db8::10"}, + actions: [][]string{}, + events: []string{"Warning ClusterIPOutOfRange Cluster IP [IPv4]: 192.168.1.1 is not within the configured Service CIDR 10.0.0.0/16; please recreate service"}, + }, + { + name: "one IP orphan", + ipAddresses: []*networkingv1alpha1.IPAddress{ + newIPAddress("10.0.1.1", newService("test-svc", []string{"10.0.1.1"})), + }, + actions: [][]string{{"delete", "ipaddresses"}}, + events: []string{"Warning IPAddressNotAllocated IPAddress: 10.0.1.1 for Service bar/test-svc appears to have leaked: cleaning up"}, + }, + { + name: "Two IPAddresses referencing the same service", + svcs: []*v1.Service{newService("test-svc", []string{"10.0.1.1"})}, + ipAddresses: []*networkingv1alpha1.IPAddress{ + newIPAddress("10.0.1.1", newService("test-svc", []string{"10.0.1.1"})), + newIPAddress("10.0.1.2", newService("test-svc", []string{"10.0.1.1"})), + }, + actions: [][]string{{"delete", "ipaddresses"}}, + events: []string{"Warning IPAddressWrongReference IPAddress: 10.0.1.2 for Service bar/test-svc has a wrong reference; cleaning up"}, + }, + { + name: "Two Services with same 
ClusterIP", + svcs: []*v1.Service{ + newService("test-svc", []string{"10.0.1.1"}), + newService("test-svc2", []string{"10.0.1.1"}), + }, + ipAddresses: []*networkingv1alpha1.IPAddress{ + newIPAddress("10.0.1.1", newService("test-svc2", []string{"10.0.1.1"})), + }, + events: []string{"Warning ClusterIPAlreadyAllocated Cluster IP [4]:10.0.1.1 was assigned to multiple services; please recreate service"}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + + c, r := newFakeRepair() + // override for testing + r.servicesSynced = func() bool { return true } + r.ipAddressSynced = func() bool { return true } + recorder := events.NewFakeRecorder(100) + r.recorder = recorder + for _, svc := range test.svcs { + err := r.serviceStore.Add(svc) + if err != nil { + t.Errorf("Unexpected error trying to add Service %v object: %v", svc, err) + } + } + + for _, ip := range test.ipAddresses { + ip.CreationTimestamp = metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC) + err := r.ipAddressStore.Add(ip) + if err != nil { + t.Errorf("Unexpected error trying to add IPAddress %s object: %v", ip, err) + } + } + + err := r.runOnce() + if err != nil { + t.Fatal(err) + } + + for _, ip := range test.expectedIPs { + _, err := r.ipAddressLister.Get(ip) + if err != nil { + t.Errorf("Unexpected error trying to get IPAddress %s object: %v", ip, err) + } + } + + expectAction(t, c.Actions(), test.actions) + expectEvents(t, recorder.Events, test.events) + }) + } + +} + +func TestRepairIPAddress_syncIPAddress(t *testing.T) { + tests := []struct { + name string + ip *networkingv1alpha1.IPAddress + actions [][]string // verb and resource + wantErr bool + }{ + { + name: "correct ipv4 address", + ip: &networkingv1alpha1.IPAddress{ + ObjectMeta: metav1.ObjectMeta{ + Name: "10.0.1.1", + Labels: map[string]string{ + networkingv1alpha1.LabelIPAddressFamily: string(v1.IPv4Protocol), + networkingv1alpha1.LabelManagedBy: ipallocator.ControllerName, + }, + CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + Spec: networkingv1alpha1.IPAddressSpec{ + ParentRef: &networkingv1alpha1.ParentReference{ + Group: "", + Resource: "services", + Name: "foo", + Namespace: "bar", + }, + }, + }, + }, + { + name: "correct ipv6 address", + ip: &networkingv1alpha1.IPAddress{ + ObjectMeta: metav1.ObjectMeta{ + Name: "2001:db8::11", + Labels: map[string]string{ + networkingv1alpha1.LabelIPAddressFamily: string(v1.IPv6Protocol), + networkingv1alpha1.LabelManagedBy: ipallocator.ControllerName, + }, + CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + Spec: networkingv1alpha1.IPAddressSpec{ + ParentRef: &networkingv1alpha1.ParentReference{ + Group: "", + Resource: "services", + Name: "foo", + Namespace: "bar", + }, + }, + }, + }, + { + name: "not managed by this controller", + ip: &networkingv1alpha1.IPAddress{ + ObjectMeta: metav1.ObjectMeta{ + Name: "2001:db8::11", + Labels: map[string]string{ + networkingv1alpha1.LabelIPAddressFamily: string(v1.IPv6Protocol), + networkingv1alpha1.LabelManagedBy: "controller-foo", + }, + CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + Spec: networkingv1alpha1.IPAddressSpec{ + ParentRef: &networkingv1alpha1.ParentReference{ + Group: "networking.gateway.k8s.io", + Resource: "gateway", + Name: "foo", + Namespace: "bar", + }, + }, + }, + }, + { + name: "out of range", + ip: &networkingv1alpha1.IPAddress{ + ObjectMeta: metav1.ObjectMeta{ + Name: "fd00:db8::11", + Labels: map[string]string{ + networkingv1alpha1.LabelIPAddressFamily: 
string(v1.IPv6Protocol), + networkingv1alpha1.LabelManagedBy: ipallocator.ControllerName, + }, + CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + Spec: networkingv1alpha1.IPAddressSpec{ + ParentRef: &networkingv1alpha1.ParentReference{ + Group: "", + Resource: "services", + Name: "foo", + Namespace: "bar", + }, + }, + }, + }, + { + name: "leaked ip", + ip: &networkingv1alpha1.IPAddress{ + ObjectMeta: metav1.ObjectMeta{ + Name: "10.0.1.1", + Labels: map[string]string{ + networkingv1alpha1.LabelIPAddressFamily: string(v1.IPv6Protocol), + networkingv1alpha1.LabelManagedBy: ipallocator.ControllerName, + }, + CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + Spec: networkingv1alpha1.IPAddressSpec{ + ParentRef: &networkingv1alpha1.ParentReference{ + Group: "", + Resource: "services", + Name: "noexist", + Namespace: "bar", + }, + }, + }, + actions: [][]string{{"delete", "ipaddresses"}}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c, r := newFakeRepair() + err := r.ipAddressStore.Add(tt.ip) + if err != nil { + t.Fatal(err) + } + err = r.serviceStore.Add(newService("foo", []string{tt.ip.Name})) + if err != nil { + t.Fatal(err) + } + + // override for testing + r.servicesSynced = func() bool { return true } + r.ipAddressSynced = func() bool { return true } + recorder := events.NewFakeRecorder(100) + r.recorder = recorder + if err := r.syncIPAddress(tt.ip.Name); (err != nil) != tt.wantErr { + t.Errorf("RepairIPAddress.syncIPAddress() error = %v, wantErr %v", err, tt.wantErr) + } + expectAction(t, c.Actions(), tt.actions) + + }) + } +} + +func newService(name string, ips []string) *v1.Service { + if len(ips) == 0 { + return nil + } + svc := &v1.Service{ + ObjectMeta: metav1.ObjectMeta{Namespace: "bar", Name: name}, + Spec: v1.ServiceSpec{ + ClusterIP: ips[0], + ClusterIPs: ips, + Type: v1.ServiceTypeClusterIP, + }, + } + return svc +} + +func expectAction(t *testing.T, actions []k8stesting.Action, expected [][]string) { + t.Helper() + if len(actions) != len(expected) { + t.Fatalf("Expected at least %d actions, got %d \ndiff: %v", len(expected), len(actions), cmp.Diff(expected, actions)) + } + + for i, action := range actions { + verb := expected[i][0] + if action.GetVerb() != verb { + t.Errorf("Expected action %d verb to be %s, got %s", i, verb, action.GetVerb()) + } + resource := expected[i][1] + if action.GetResource().Resource != resource { + t.Errorf("Expected action %d resource to be %s, got %s", i, resource, action.GetResource().Resource) + } + } +} + +func expectEvents(t *testing.T, actual <-chan string, expected []string) { + t.Helper() + c := time.After(wait.ForeverTestTimeout) + for _, e := range expected { + select { + case a := <-actual: + if e != a { + t.Errorf("Expected event %q, got %q", e, a) + return + } + case <-c: + t.Errorf("Expected event %q, got nothing", e) + // continue iterating to print all expected events + } + } + for { + select { + case a := <-actual: + t.Errorf("Unexpected event: %q", a) + default: + return // No more events, as expected. + } + } +}
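+
+// Illustrative usage of the fixture above (a sketch mirroring the
+// "create IPAddress single stack" case; the Service name "demo-svc" and the IP
+// are hypothetical): seeding a Service without its IPAddress and running one
+// repair pass should produce a single create action:
+//
+//    c, r := newFakeRepair()
+//    _ = r.serviceStore.Add(newService("demo-svc", []string{"10.0.2.2"}))
+//    _ = r.runOnce()
+//    expectAction(t, c.Actions(), [][]string{{"create", "ipaddresses"}})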