commit 756f1bfe99
parent 811c2f50a1

    add repair loop

    Change-Id: I63464bdd5db706ddf7dc5d828b8d03ad532d7981
@@ -33,9 +33,12 @@ import (
 	"k8s.io/apimachinery/pkg/util/wait"
 	genericapiserver "k8s.io/apiserver/pkg/server"
 	"k8s.io/apiserver/pkg/storage"
+	utilfeature "k8s.io/apiserver/pkg/util/feature"
+	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/klog/v2"
 	"k8s.io/kubernetes/pkg/controlplane/reconcilers"
+	"k8s.io/kubernetes/pkg/features"
 	"k8s.io/kubernetes/pkg/registry/core/rangeallocation"
 	corerest "k8s.io/kubernetes/pkg/registry/core/rest"
 	servicecontroller "k8s.io/kubernetes/pkg/registry/core/service/ipallocator/controller"
@@ -52,7 +55,8 @@ const (
 // controller loops, which manage creating the "kubernetes" service and
 // provide the IP repair check on service IPs
 type Controller struct {
-	client kubernetes.Interface
+	client    kubernetes.Interface
+	informers informers.SharedInformerFactory

 	ServiceClusterIPRegistry rangeallocation.RangeRegistry
 	ServiceClusterIPRange    net.IPNet
@@ -98,7 +102,8 @@ func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.Lega
 	}

 	return &Controller{
-		client: client,
+		client:    client,
+		informers: c.ExtraConfig.VersionedInformers,

 		EndpointReconciler: c.ExtraConfig.EndpointReconcilerConfig.Reconciler,
 		EndpointInterval:   c.ExtraConfig.EndpointReconcilerConfig.Interval,
@@ -150,7 +155,6 @@ func (c *Controller) Start() {
 		klog.Errorf("Error removing old endpoints from kubernetes service: %v", err)
 	}

-	repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval, c.client.CoreV1(), c.client.EventsV1(), &c.ServiceClusterIPRange, c.ServiceClusterIPRegistry, &c.SecondaryServiceClusterIPRange, c.SecondaryServiceClusterIPRegistry)
 	repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.client.CoreV1(), c.client.EventsV1(), c.ServiceNodePortRange, c.ServiceNodePortRegistry)

 	// We start both repairClusterIPs and repairNodePorts to ensure repair
@@ -163,15 +167,37 @@ func (c *Controller) Start() {
 	// than 1 minute for backward compatibility of failing the whole
 	// apiserver if we can't repair them.
 	wg := sync.WaitGroup{}
-	wg.Add(2)
+	wg.Add(1)

-	runRepairClusterIPs := func(stopCh chan struct{}) {
-		repairClusterIPs.RunUntil(wg.Done, stopCh)
-	}
 	runRepairNodePorts := func(stopCh chan struct{}) {
 		repairNodePorts.RunUntil(wg.Done, stopCh)
 	}
+
+	wg.Add(1)
+	var runRepairClusterIPs func(stopCh chan struct{})
+	if !utilfeature.DefaultFeatureGate.Enabled(features.MultiCIDRServiceAllocator) {
+		repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval,
+			c.client.CoreV1(),
+			c.client.EventsV1(),
+			&c.ServiceClusterIPRange,
+			c.ServiceClusterIPRegistry,
+			&c.SecondaryServiceClusterIPRange,
+			c.SecondaryServiceClusterIPRegistry)
+		runRepairClusterIPs = func(stopCh chan struct{}) {
+			repairClusterIPs.RunUntil(wg.Done, stopCh)
+		}
+	} else {
+		repairClusterIPs := servicecontroller.NewRepairIPAddress(c.ServiceClusterIPInterval,
+			c.client,
+			&c.ServiceClusterIPRange,
+			&c.SecondaryServiceClusterIPRange,
+			c.informers.Core().V1().Services(),
+			c.informers.Networking().V1alpha1().IPAddresses(),
+		)
+		runRepairClusterIPs = func(stopCh chan struct{}) {
+			repairClusterIPs.RunUntil(wg.Done, stopCh)
+		}
+	}
 	c.runner = async.NewRunner(c.RunKubernetesService, runRepairClusterIPs, runRepairNodePorts)
 	c.runner.Start()
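One detail worth calling out in the hunk above: each repair loop's RunUntil takes an onFirstSuccess callback (wg.Done here), invoked once after the loop's first successful sync, and Start adds one WaitGroup count per loop so the bootstrap can wait until every repair has succeeded at least once before treating the apiserver as healthy (the wait itself happens outside this hunk). Below is a minimal, self-contained sketch of that handshake; all names in it are illustrative, not from the commit.

package main

import (
	"fmt"
	"sync"
	"time"
)

// runUntil mimics the RunUntil(onFirstSuccess, stopCh) contract used above:
// it invokes onFirstSuccess exactly once, after the first repair pass, and
// then keeps repairing until stopCh is closed.
func runUntil(onFirstSuccess func(), stopCh chan struct{}) {
	var once sync.Once
	for {
		select {
		case <-stopCh:
			return
		default:
			time.Sleep(10 * time.Millisecond) // stand-in for one repair pass
			once.Do(onFirstSuccess)
		}
	}
}

func main() {
	wg := sync.WaitGroup{}
	wg.Add(2) // one count per repair loop, as Start does above
	stopCh := make(chan struct{})

	go runUntil(wg.Done, stopCh) // e.g. the ClusterIP repair loop
	go runUntil(wg.Done, stopCh) // e.g. the NodePort repair loop

	wg.Wait() // the bootstrap blocks here until both loops have synced once
	fmt.Println("first repair pass of every loop succeeded")
	close(stopCh)
}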

590 pkg/registry/core/service/ipallocator/controller/repairip.go  Normal file
@@ -0,0 +1,590 @@
/*
Copyright 2023 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
	"context"
	"fmt"
	"net"
	"time"

	v1 "k8s.io/api/core/v1"
	networkingv1alpha1 "k8s.io/api/networking/v1alpha1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	coreinformers "k8s.io/client-go/informers/core/v1"
	networkinginformers "k8s.io/client-go/informers/networking/v1alpha1"
	"k8s.io/client-go/kubernetes"
	corelisters "k8s.io/client-go/listers/core/v1"
	networkinglisters "k8s.io/client-go/listers/networking/v1alpha1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/events"
	"k8s.io/client-go/util/retry"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/api/legacyscheme"
	"k8s.io/kubernetes/pkg/apis/core/v1/helper"
	"k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
	"k8s.io/utils/clock"
	netutils "k8s.io/utils/net"
)

const (
	// maxRetries is the number of times a service will be retried before it is dropped out of the queue.
	// With the current rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers represent the
	// sequence of delays between successive queuings of a service.
	//
	// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
	maxRetries = 15
	workers    = 5
)

// Repair is a controller loop that examines all service ClusterIP allocations, logs any errors,
// and then creates the accurate list of IPAddress objects for all allocated ClusterIPs.
//
// Handles:
// * Duplicate ClusterIP assignments caused by operator action or undetected race conditions
// * Allocations to services that were not actually created due to a crash or powerloss
// * Migration of services from older versions of Kubernetes into the new ipallocator, automatically
//   creating the corresponding IPAddress objects
// * IPAddress objects with wrong references or labels
//
// Logs about:
// * ClusterIPs that do not match the currently configured range
//
// There is a one-to-one relation between Service ClusterIPs and IPAddresses.
// The bidirectional relation is achieved using the following fields:
// Service.Spec.ClusterIPs[*] == IPAddress.Name AND IPAddress.ParentRef == Service
//
// The controller uses two reconcile loops, one for Services and another for IPAddresses.
// The Service reconcile loop verifies the bidirectional relation exists and is correct.
//
// 1. Service_X [ClusterIP_X]  <------>  IPAddress_X [Ref:Service_X]   ok
// 2. Service_Y [ClusterIP_Y]  <------>  IPAddress_Y [Ref:GatewayA]    !ok, wrong reference
// 3. Service_Z [ClusterIP_Z]  <------>                                !ok, missing IPAddress
// 4. Service_A [ClusterIP_A]  <------>  IPAddress_A [Ref:Service_B]   !ok, duplicate IPAddress
//    Service_B [ClusterIP_A]  <------>  only one service can verify the relation
//
// The IPAddress reconcile loop checks that there are no orphan IPAddresses; the rest of the
// cases are covered by the Services loop.
//
// 1.                          <------>  IPAddress_Z [Ref:Service_C]   !ok, orphan IPAddress

type RepairIPAddress struct {
	client   kubernetes.Interface
	interval time.Duration

	networkByFamily map[netutils.IPFamily]*net.IPNet // networks we operate on, by their family

	serviceLister  corelisters.ServiceLister
	servicesSynced cache.InformerSynced

	ipAddressLister networkinglisters.IPAddressLister
	ipAddressSynced cache.InformerSynced

	svcQueue         workqueue.RateLimitingInterface
	ipQueue          workqueue.RateLimitingInterface
	workerLoopPeriod time.Duration

	broadcaster events.EventBroadcaster
	recorder    events.EventRecorder
	clock       clock.Clock
}

// NewRepairIPAddress creates a controller that periodically ensures that all clusterIPs are uniquely allocated across the cluster
// and generates informational warnings for a cluster that is not in sync.
func NewRepairIPAddress(interval time.Duration,
	client kubernetes.Interface,
	network *net.IPNet,
	secondaryNetwork *net.IPNet,
	serviceInformer coreinformers.ServiceInformer,
	ipAddressInformer networkinginformers.IPAddressInformer) *RepairIPAddress {
	eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
	recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, "ipallocator-repair-controller")

	networkByFamily := make(map[netutils.IPFamily]*net.IPNet)
	primary := netutils.IPFamilyOfCIDR(network)
	networkByFamily[primary] = network
	if secondaryNetwork != nil {
		secondary := netutils.IPFamilyOfCIDR(secondaryNetwork)
		networkByFamily[secondary] = secondaryNetwork
	}

	r := &RepairIPAddress{
		interval:         interval,
		client:           client,
		networkByFamily:  networkByFamily,
		serviceLister:    serviceInformer.Lister(),
		servicesSynced:   serviceInformer.Informer().HasSynced,
		ipAddressLister:  ipAddressInformer.Lister(),
		ipAddressSynced:  ipAddressInformer.Informer().HasSynced,
		svcQueue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "services"),
		ipQueue:          workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ipaddresses"),
		workerLoopPeriod: time.Second,
		broadcaster:      eventBroadcaster,
		recorder:         recorder,
		clock:            clock.RealClock{},
	}

	serviceInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(obj)
			if err == nil {
				r.svcQueue.Add(key)
			}
		},
		UpdateFunc: func(old interface{}, new interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(new)
			if err == nil {
				r.svcQueue.Add(key)
			}
		},
		DeleteFunc: func(obj interface{}) {
			// IndexerInformer uses a delta queue, therefore for deletes we have to use this
			// key function.
			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
			if err == nil {
				r.svcQueue.Add(key)
			}
		},
	}, interval)

	ipAddressInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(obj)
			if err == nil {
				r.ipQueue.Add(key)
			}
		},
		UpdateFunc: func(old interface{}, new interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(new)
			if err == nil {
				r.ipQueue.Add(key)
			}
		},
		DeleteFunc: func(obj interface{}) {
			// IndexerInformer uses a delta queue, therefore for deletes we have to use this
			// key function.
			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
			if err == nil {
				r.ipQueue.Add(key)
			}
		},
	}, interval)

	return r
}

// RunUntil starts the controller until the provided ch is closed.
func (r *RepairIPAddress) RunUntil(onFirstSuccess func(), stopCh chan struct{}) {
	defer r.ipQueue.ShutDown()
	defer r.svcQueue.ShutDown()
	r.broadcaster.StartRecordingToSink(stopCh)
	defer r.broadcaster.Shutdown()

	klog.Info("Starting ipallocator-repair-controller")
	defer klog.Info("Shutting down ipallocator-repair-controller")

	if !cache.WaitForNamedCacheSync("ipallocator-repair-controller", stopCh, r.ipAddressSynced, r.servicesSynced) {
		return
	}

	// The first sync goes through all the Services and IPAddresses in the cache;
	// once synced, it signals the main loop and continues working off the event
	// handlers, which is less expensive.
	if err := r.runOnce(); err != nil {
		runtime.HandleError(err)
		return
	}
	onFirstSuccess()

	for i := 0; i < workers; i++ {
		go wait.Until(r.ipWorker, r.workerLoopPeriod, stopCh)
		go wait.Until(r.svcWorker, r.workerLoopPeriod, stopCh)
	}

	<-stopCh
}

// runOnce verifies the state of the ClusterIP allocations and returns an error if an unrecoverable problem occurs.
func (r *RepairIPAddress) runOnce() error {
	return retry.RetryOnConflict(retry.DefaultBackoff, r.doRunOnce)
}

// doRunOnce verifies the state of the ClusterIP allocations and returns an error if an unrecoverable problem occurs.
func (r *RepairIPAddress) doRunOnce() error {
	services, err := r.serviceLister.List(labels.Everything())
	if err != nil {
		return fmt.Errorf("unable to refresh the service IP block: %v", err)
	}

	// Check every Service's ClusterIP, and rebuild the state as we think it should be.
	for _, svc := range services {
		key, err := cache.MetaNamespaceKeyFunc(svc)
		if err != nil {
			return err
		}
		err = r.syncService(key)
		if err != nil {
			return err
		}
	}

	// We have checked that every Service has its corresponding IP.
	// Check that there is no IP created by the allocator without
	// a Service associated.
	ipLabelSelector := labels.Set(map[string]string{
		networkingv1alpha1.LabelManagedBy: ipallocator.ControllerName,
	}).AsSelectorPreValidated()
	ipAddresses, err := r.ipAddressLister.List(ipLabelSelector)
	if err != nil {
		return fmt.Errorf("unable to refresh the IPAddress block: %v", err)
	}
	// Check every IPAddress matches the corresponding Service, and rebuild the state as we think it should be.
	for _, ipAddress := range ipAddresses {
		key, err := cache.MetaNamespaceKeyFunc(ipAddress)
		if err != nil {
			return err
		}
		err = r.syncIPAddress(key)
		if err != nil {
			return err
		}
	}

	return nil
}

func (r *RepairIPAddress) svcWorker() {
	for r.processNextWorkSvc() {
	}
}

func (r *RepairIPAddress) processNextWorkSvc() bool {
	eKey, quit := r.svcQueue.Get()
	if quit {
		return false
	}
	defer r.svcQueue.Done(eKey)

	err := r.syncService(eKey.(string))
	r.handleSvcErr(err, eKey)

	return true
}

func (r *RepairIPAddress) handleSvcErr(err error, key interface{}) {
	if err == nil {
		r.svcQueue.Forget(key)
		return
	}

	if r.svcQueue.NumRequeues(key) < maxRetries {
		klog.V(2).InfoS("Error syncing Service, retrying", "service", key, "err", err)
		r.svcQueue.AddRateLimited(key)
		return
	}

	klog.Warningf("Dropping Service %q out of the queue: %v", key, err)
	r.svcQueue.Forget(key)
	runtime.HandleError(err)
}

// syncService reconciles the Service ClusterIPs to verify that each one has the corresponding IPAddress object associated
func (r *RepairIPAddress) syncService(key string) error {
	var syncError error
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	svc, err := r.serviceLister.Services(namespace).Get(name)
	if err != nil {
		// nothing to do
		return nil
	}
	if !helper.IsServiceIPSet(svc) {
		// didn't need a ClusterIP
		return nil
	}

	for _, clusterIP := range svc.Spec.ClusterIPs {
		ip := netutils.ParseIPSloppy(clusterIP)
		if ip == nil {
			// ClusterIP is corrupt; ClusterIPs are already validated, but we double-check here
			// in case there are inconsistencies between the parsers
			r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPNotValid", "ClusterIPValidation", "Cluster IP %s is not a valid IP; please recreate Service", ip)
			runtime.HandleError(fmt.Errorf("the ClusterIP %s for Service %s/%s is not a valid IP; please recreate Service", ip, svc.Namespace, svc.Name))
			continue
		}

		family := netutils.IPFamilyOf(ip)
		v1Family := getFamilyByIP(ip)
		network, ok := r.networkByFamily[family]
		if !ok {
			// this Service is using an IP family that is no longer configured on the cluster
			r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPNotValid", "ClusterIPValidation", "Cluster IP %s(%s) is of ip family that is no longer configured on cluster; please recreate Service", ip, v1Family)
			runtime.HandleError(fmt.Errorf("the ClusterIP [%v]: %s for Service %s/%s is of ip family that is no longer configured on cluster; please recreate Service", v1Family, ip, svc.Namespace, svc.Name))
			continue
		}
		if !network.Contains(ip) {
			// ClusterIP is out of range
			r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPOutOfRange", "ClusterIPAllocation", "Cluster IP [%v]: %s is not within the configured Service CIDR %s; please recreate service", v1Family, ip, network.String())
			runtime.HandleError(fmt.Errorf("the ClusterIP [%v]: %s for Service %s/%s is not within the service CIDR %s; please recreate", v1Family, ip, svc.Namespace, svc.Name, network.String()))
			continue
		}

		// Get the IPAddress object associated to the ClusterIP
		ipAddress, err := r.ipAddressLister.Get(ip.String())
		if apierrors.IsNotFound(err) {
			// ClusterIP doesn't seem to be allocated, create it.
			r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPNotAllocated", "ClusterIPAllocation", "Cluster IP [%v]: %s is not allocated; repairing", v1Family, ip)
			runtime.HandleError(fmt.Errorf("the ClusterIP [%v]: %s for Service %s/%s is not allocated; repairing", v1Family, ip, svc.Namespace, svc.Name))
			_, err := r.client.NetworkingV1alpha1().IPAddresses().Create(context.Background(), newIPAddress(ip.String(), svc), metav1.CreateOptions{})
			if err != nil {
				return err
			}
			continue
		}
		if err != nil {
			r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "UnknownError", "ClusterIPAllocation", "Unable to allocate ClusterIP [%v]: %s due to an unknown error", v1Family, ip)
			return fmt.Errorf("unable to allocate ClusterIP [%v]: %s for Service %s/%s due to an unknown error, will retry later: %v", v1Family, ip, svc.Namespace, svc.Name, err)
		}

		// An IPAddress that belongs to a Service must reference a Service
		if ipAddress.Spec.ParentRef.Group != "" ||
			ipAddress.Spec.ParentRef.Resource != "services" {
			r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPNotAllocated", "ClusterIPAllocation", "the ClusterIP [%v]: %s for Service %s/%s has a wrong reference; repairing", v1Family, ip, svc.Namespace, svc.Name)
			if err := r.recreateIPAddress(ipAddress.Name, svc); err != nil {
				return err
			}
			continue
		}

		// An IPAddress that belongs to a Service must reference the current Service
		if ipAddress.Spec.ParentRef.Namespace != svc.Namespace ||
			ipAddress.Spec.ParentRef.Name != svc.Name {
			// verify that there are no two Services with the same IP, otherwise
			// we will keep deleting and recreating the same IPAddress, only changing the reference
			refService, err := r.serviceLister.Services(ipAddress.Spec.ParentRef.Namespace).Get(ipAddress.Spec.ParentRef.Name)
			if err != nil {
				r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPNotAllocated", "ClusterIPAllocation", "the ClusterIP [%v]: %s for Service %s/%s has a wrong reference; repairing", v1Family, ip, svc.Namespace, svc.Name)
				if err := r.recreateIPAddress(ipAddress.Name, svc); err != nil {
					return err
				}
				continue
			}
			// the IPAddress is duplicated, but the current Service is not the one referenced, so the Service has to be recreated
			for _, clusterIP := range refService.Spec.ClusterIPs {
				if ipAddress.Name == clusterIP {
					r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPAlreadyAllocated", "ClusterIPAllocation", "Cluster IP [%v]:%s was assigned to multiple services; please recreate service", family, ip)
					runtime.HandleError(fmt.Errorf("the cluster IP [%v]:%s for service %s/%s was assigned to other services %s/%s; please recreate", family, ip, svc.Namespace, svc.Name, refService.Namespace, refService.Name))
					break
				}
			}
		}

		// The IPAddress must have the corresponding labels assigned by the allocator
		if !verifyIPAddressLabels(ipAddress) {
			if err := r.recreateIPAddress(ipAddress.Name, svc); err != nil {
				return err
			}
			continue
		}
	}
	return syncError
}

func (r *RepairIPAddress) recreateIPAddress(name string, svc *v1.Service) error {
	err := r.client.NetworkingV1alpha1().IPAddresses().Delete(context.Background(), name, metav1.DeleteOptions{})
	if err != nil && !apierrors.IsNotFound(err) {
		return err
	}
	_, err = r.client.NetworkingV1alpha1().IPAddresses().Create(context.Background(), newIPAddress(name, svc), metav1.CreateOptions{})
	if err != nil {
		return err
	}
	return nil
}

func (r *RepairIPAddress) ipWorker() {
	for r.processNextWorkIp() {
	}
}

func (r *RepairIPAddress) processNextWorkIp() bool {
	eKey, quit := r.ipQueue.Get()
	if quit {
		return false
	}
	defer r.ipQueue.Done(eKey)

	err := r.syncIPAddress(eKey.(string))
	r.handleIpErr(err, eKey)

	return true
}

func (r *RepairIPAddress) handleIpErr(err error, key interface{}) {
	if err == nil {
		r.ipQueue.Forget(key)
		return
	}

	if r.ipQueue.NumRequeues(key) < maxRetries {
		klog.V(2).InfoS("Error syncing IPAddress, retrying", "ipaddress", key, "err", err)
		r.ipQueue.AddRateLimited(key)
		return
	}

	klog.Warningf("Dropping IPAddress %q out of the queue: %v", key, err)
	r.ipQueue.Forget(key)
	runtime.HandleError(err)
}

// syncIPAddress verifies that the IPAddresses owned by the ipallocator controller reference an existing Service,
// to avoid leaking IPAddresses. IPAddresses that are owned by other controllers are not processed, to avoid hotloops.
// IPAddresses that reference Services and are part of the ClusterIPs are validated in the syncService loop.
func (r *RepairIPAddress) syncIPAddress(key string) error {
	ipAddress, err := r.ipAddressLister.Get(key)
	if err != nil {
		// nothing to do
		return nil
	}

	// not managed by this controller
	if !managedByController(ipAddress) {
		return nil
	}

	// it does not reference a Service but was created by the service allocator;
	// something else has changed it, so delete it
	if ipAddress.Spec.ParentRef.Group != "" ||
		ipAddress.Spec.ParentRef.Resource != "services" {
		runtime.HandleError(fmt.Errorf("IPAddress %s appears to have been modified, not referencing a Service %v: cleaning up", ipAddress.Name, ipAddress.Spec.ParentRef))
		r.recorder.Eventf(ipAddress, nil, v1.EventTypeWarning, "IPAddressNotAllocated", "IPAddressAllocation", "IPAddress %s appears to have been modified, not referencing a Service %v: cleaning up", ipAddress.Name, ipAddress.Spec.ParentRef)
		err := r.client.NetworkingV1alpha1().IPAddresses().Delete(context.Background(), ipAddress.Name, metav1.DeleteOptions{})
		if err != nil && !apierrors.IsNotFound(err) {
			return err
		}
		return nil
	}

	svc, err := r.serviceLister.Services(ipAddress.Spec.ParentRef.Namespace).Get(ipAddress.Spec.ParentRef.Name)
	if apierrors.IsNotFound(err) {
		// Clean up any IPAddress without a parent Service IF the time since it was created is greater
		// than 60 seconds (the default request timeout on the kube-apiserver).
		// This is required because during Service creation there is a window in which the IPAddress
		// object already exists but the Service is still being created.
		// Assume that CreationTimestamp exists.
		ipLifetime := r.clock.Now().Sub(ipAddress.CreationTimestamp.Time)
		gracePeriod := 60 * time.Second
		if ipLifetime > gracePeriod {
			runtime.HandleError(fmt.Errorf("IPAddress %s appears to have leaked: cleaning up", ipAddress.Name))
			r.recorder.Eventf(ipAddress, nil, v1.EventTypeWarning, "IPAddressNotAllocated", "IPAddressAllocation", "IPAddress: %s for Service %s/%s appears to have leaked: cleaning up", ipAddress.Name, ipAddress.Spec.ParentRef.Namespace, ipAddress.Spec.ParentRef.Name)
			err := r.client.NetworkingV1alpha1().IPAddresses().Delete(context.Background(), ipAddress.Name, metav1.DeleteOptions{})
			if err != nil && !apierrors.IsNotFound(err) {
				return err
			}
		}
		// requeue after the grace period
		r.ipQueue.AddAfter(key, gracePeriod-ipLifetime)
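		// e.g. an IPAddress first seen 20 seconds after its creation is
		// re-queued 40 seconds later, so a genuinely leaked address is
		// cleaned up roughly one grace period after it was created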
		return nil
	}
	if err != nil {
		runtime.HandleError(fmt.Errorf("unable to get parent Service for IPAddress %s due to an unknown error: %v", ipAddress, err))
		r.recorder.Eventf(ipAddress, nil, v1.EventTypeWarning, "UnknownError", "IPAddressAllocation", "Unable to get parent Service for IPAddress %s due to an unknown error", ipAddress)
		return err
	}
	// The Service exists; the previous loop already checked that every Service-to-IPAddress relation
	// is correct, but we also have to check the reverse, that the IPAddress-to-Service relation is correct
	for _, clusterIP := range svc.Spec.ClusterIPs {
		if ipAddress.Name == clusterIP {
			return nil
		}
	}
	runtime.HandleError(fmt.Errorf("the IPAddress: %s for Service %s/%s has a wrong reference %#v; cleaning up", ipAddress.Name, svc.Namespace, svc.Name, ipAddress.Spec.ParentRef))
	r.recorder.Eventf(ipAddress, nil, v1.EventTypeWarning, "IPAddressWrongReference", "IPAddressAllocation", "IPAddress: %s for Service %s/%s has a wrong reference; cleaning up", ipAddress.Name, svc.Namespace, svc.Name)
	err = r.client.NetworkingV1alpha1().IPAddresses().Delete(context.Background(), ipAddress.Name, metav1.DeleteOptions{})
	if err != nil && !apierrors.IsNotFound(err) {
		return err
	}
	return nil
}

func newIPAddress(name string, svc *v1.Service) *networkingv1alpha1.IPAddress {
	family := string(v1.IPv4Protocol)
	if netutils.IsIPv6String(name) {
		family = string(v1.IPv6Protocol)
	}
	return &networkingv1alpha1.IPAddress{
		ObjectMeta: metav1.ObjectMeta{
			Name: name,
			Labels: map[string]string{
				networkingv1alpha1.LabelIPAddressFamily: family,
				networkingv1alpha1.LabelManagedBy:       ipallocator.ControllerName,
			},
		},
		Spec: networkingv1alpha1.IPAddressSpec{
			ParentRef: serviceToRef(svc),
		},
	}
}

func serviceToRef(svc *v1.Service) *networkingv1alpha1.ParentReference {
	if svc == nil {
		return nil
	}

	return &networkingv1alpha1.ParentReference{
		Group:     "",
		Resource:  "services",
		Namespace: svc.Namespace,
		Name:      svc.Name,
		UID:       svc.UID,
	}
}

func getFamilyByIP(ip net.IP) v1.IPFamily {
	if netutils.IsIPv6(ip) {
		return v1.IPv6Protocol
	}
	return v1.IPv4Protocol
}

// managedByController returns true if the provided IPAddress is managed by the
// ipallocator controller.
func managedByController(ip *networkingv1alpha1.IPAddress) bool {
	managedBy, ok := ip.Labels[networkingv1alpha1.LabelManagedBy]
	if !ok {
		return false
	}
	return managedBy == ipallocator.ControllerName
}

func verifyIPAddressLabels(ip *networkingv1alpha1.IPAddress) bool {
	labelFamily, ok := ip.Labels[networkingv1alpha1.LabelIPAddressFamily]
	if !ok {
		return false
	}

	family := string(v1.IPv4Protocol)
	if netutils.IsIPv6String(ip.Name) {
		family = string(v1.IPv6Protocol)
	}
	if family != labelFamily {
		return false
	}
	return managedByController(ip)
}
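
For orientation, here is a minimal sketch (not part of the commit) of how the new repair loop could be wired up outside of tests, mirroring what Controller.Start does in the hunk above. The kubeconfig path, the 3-minute interval, and the CIDR values are placeholder assumptions; NewRepairIPAddress and RunUntil are the functions defined in this file.

package main

import (
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	servicecontroller "k8s.io/kubernetes/pkg/registry/core/service/ipallocator/controller"
	netutils "k8s.io/utils/net"
)

func main() {
	// Hypothetical client plumbing; any rest.Config source would do.
	config, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	// The informers feed the repair loop's listers and event handlers.
	factory := informers.NewSharedInformerFactory(client, 0)

	// Placeholder Service CIDRs; the real values come from apiserver flags.
	_, primary, err := netutils.ParseCIDRSloppy("10.0.0.0/16")
	if err != nil {
		panic(err)
	}
	_, secondary, err := netutils.ParseCIDRSloppy("2001:db8::/64")
	if err != nil {
		panic(err)
	}

	repair := servicecontroller.NewRepairIPAddress(3*time.Minute, // assumed interval
		client,
		primary,
		secondary,
		factory.Core().V1().Services(),
		factory.Networking().V1alpha1().IPAddresses(),
	)

	stopCh := make(chan struct{})
	defer close(stopCh)
	factory.Start(stopCh)

	// Blocks until stopCh closes; the callback fires after the first
	// successful full sync, as in Controller.Start above.
	repair.RunUntil(func() {}, stopCh)
}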

446 pkg/registry/core/service/ipallocator/controller/repairip_test.go  Normal file
@@ -0,0 +1,446 @@
/*
Copyright 2023 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controller

import (
	"fmt"
	"testing"
	"time"

	"github.com/google/go-cmp/cmp"
	v1 "k8s.io/api/core/v1"
	networkingv1alpha1 "k8s.io/api/networking/v1alpha1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	k8stesting "k8s.io/client-go/testing"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/events"
	"k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
	netutils "k8s.io/utils/net"
)

var (
	serviceCIDRv4 = "10.0.0.0/16"
	serviceCIDRv6 = "2001:db8::/64"
)

type fakeRepair struct {
	*RepairIPAddress
	serviceStore   cache.Store
	ipAddressStore cache.Store
}

func newFakeRepair() (*fake.Clientset, *fakeRepair) {
	fakeClient := fake.NewSimpleClientset()

	informerFactory := informers.NewSharedInformerFactory(fakeClient, 0*time.Second)
	serviceInformer := informerFactory.Core().V1().Services()
	serviceIndexer := serviceInformer.Informer().GetIndexer()

	ipInformer := informerFactory.Networking().V1alpha1().IPAddresses()
	ipIndexer := ipInformer.Informer().GetIndexer()

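	// Note: the reactors below mirror the creates and deletes issued through
	// the fake client into the informer indexer, so the listers used by the
	// repair loop observe each change without running the informers; updates
	// are answered with an error to reflect that IPAddress objects are meant
	// to be immutable after creation.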
fakeClient.PrependReactor("create", "ipaddresses", k8stesting.ReactionFunc(func(action k8stesting.Action) (bool, runtime.Object, error) {
|
||||
ip := action.(k8stesting.CreateAction).GetObject().(*networkingv1alpha1.IPAddress)
|
||||
err := ipIndexer.Add(ip)
|
||||
return false, ip, err
|
||||
}))
|
||||
fakeClient.PrependReactor("update", "ipaddresses", k8stesting.ReactionFunc(func(action k8stesting.Action) (bool, runtime.Object, error) {
|
||||
ip := action.(k8stesting.UpdateAction).GetObject().(*networkingv1alpha1.IPAddress)
|
||||
return false, ip, fmt.Errorf("IPAddress is inmutable after creation")
|
||||
}))
|
||||
fakeClient.PrependReactor("delete", "ipaddresses", k8stesting.ReactionFunc(func(action k8stesting.Action) (bool, runtime.Object, error) {
|
||||
ip := action.(k8stesting.DeleteAction).GetName()
|
||||
err := ipIndexer.Delete(ip)
|
||||
return false, &networkingv1alpha1.IPAddress{}, err
|
||||
}))
|
||||
|
||||
_, primary, err := netutils.ParseCIDRSloppy(serviceCIDRv4)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
_, secondary, err := netutils.ParseCIDRSloppy(serviceCIDRv6)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
r := NewRepairIPAddress(0*time.Second,
|
||||
fakeClient,
|
||||
primary,
|
||||
secondary,
|
||||
serviceInformer,
|
||||
ipInformer,
|
||||
)
|
||||
return fakeClient, &fakeRepair{r, serviceIndexer, ipIndexer}
|
||||
}
|
||||
|
||||
func TestRepairServiceIP(t *testing.T) {
	tests := []struct {
		name        string
		svcs        []*v1.Service
		ipAddresses []*networkingv1alpha1.IPAddress
		expectedIPs []string
		actions     [][]string // verb and resource
		events      []string
	}{
		{
			name: "no changes needed single stack",
			svcs: []*v1.Service{newService("test-svc", []string{"10.0.1.1"})},
			ipAddresses: []*networkingv1alpha1.IPAddress{
				newIPAddress("10.0.1.1", newService("test-svc", []string{"10.0.1.1"})),
			},
			expectedIPs: []string{"10.0.1.1"},
			actions:     [][]string{},
			events:      []string{},
		},
		{
			name: "no changes needed dual stack",
			svcs: []*v1.Service{newService("test-svc", []string{"10.0.1.1", "2001:db8::10"})},
			ipAddresses: []*networkingv1alpha1.IPAddress{
				newIPAddress("10.0.1.1", newService("test-svc", []string{"10.0.1.1"})),
				newIPAddress("2001:db8::10", newService("test-svc", []string{"2001:db8::10"})),
			},
			expectedIPs: []string{"10.0.1.1", "2001:db8::10"},
			actions:     [][]string{},
			events:      []string{},
		},
		// these two cases simulate migrating from bitmaps to IPAddress objects
		{
			name:        "create IPAddress single stack",
			svcs:        []*v1.Service{newService("test-svc", []string{"10.0.1.1"})},
			expectedIPs: []string{"10.0.1.1"},
			actions:     [][]string{{"create", "ipaddresses"}},
			events:      []string{"Warning ClusterIPNotAllocated Cluster IP [IPv4]: 10.0.1.1 is not allocated; repairing"},
		},
		{
			name:        "create IPAddresses dual stack",
			svcs:        []*v1.Service{newService("test-svc", []string{"10.0.1.1", "2001:db8::10"})},
			expectedIPs: []string{"10.0.1.1", "2001:db8::10"},
			actions:     [][]string{{"create", "ipaddresses"}, {"create", "ipaddresses"}},
			events: []string{
				"Warning ClusterIPNotAllocated Cluster IP [IPv4]: 10.0.1.1 is not allocated; repairing",
				"Warning ClusterIPNotAllocated Cluster IP [IPv6]: 2001:db8::10 is not allocated; repairing",
			},
		},
		{
			name: "reconcile IPAddress single stack wrong reference",
			svcs: []*v1.Service{newService("test-svc", []string{"10.0.1.1"})},
			ipAddresses: []*networkingv1alpha1.IPAddress{
				newIPAddress("10.0.1.1", newService("test-svc2", []string{"10.0.1.1"})),
			},
			expectedIPs: []string{"10.0.1.1"},
			actions:     [][]string{{"delete", "ipaddresses"}, {"create", "ipaddresses"}},
			events:      []string{"Warning ClusterIPNotAllocated the ClusterIP [IPv4]: 10.0.1.1 for Service bar/test-svc has a wrong reference; repairing"},
		},
		{
			name: "reconcile IPAddresses dual stack",
			svcs: []*v1.Service{newService("test-svc", []string{"10.0.1.1", "2001:db8::10"})},
			ipAddresses: []*networkingv1alpha1.IPAddress{
				newIPAddress("10.0.1.1", newService("test-svc2", []string{"10.0.1.1"})),
				newIPAddress("2001:db8::10", newService("test-svc2", []string{"2001:db8::10"})),
			},
			expectedIPs: []string{"10.0.1.1", "2001:db8::10"},
			actions:     [][]string{{"delete", "ipaddresses"}, {"create", "ipaddresses"}, {"delete", "ipaddresses"}, {"create", "ipaddresses"}},
			events: []string{
				"Warning ClusterIPNotAllocated the ClusterIP [IPv4]: 10.0.1.1 for Service bar/test-svc has a wrong reference; repairing",
				"Warning ClusterIPNotAllocated the ClusterIP [IPv6]: 2001:db8::10 for Service bar/test-svc has a wrong reference; repairing",
			},
		},
		{
			name: "one IP out of range",
			svcs: []*v1.Service{newService("test-svc", []string{"192.168.1.1", "2001:db8::10"})},
			ipAddresses: []*networkingv1alpha1.IPAddress{
				newIPAddress("192.168.1.1", newService("test-svc", []string{"192.168.1.1"})),
				newIPAddress("2001:db8::10", newService("test-svc", []string{"2001:db8::10"})),
			},
			expectedIPs: []string{"2001:db8::10"},
			actions:     [][]string{},
			events:      []string{"Warning ClusterIPOutOfRange Cluster IP [IPv4]: 192.168.1.1 is not within the configured Service CIDR 10.0.0.0/16; please recreate service"},
		},
		{
			name: "one IP orphan",
			ipAddresses: []*networkingv1alpha1.IPAddress{
				newIPAddress("10.0.1.1", newService("test-svc", []string{"10.0.1.1"})),
			},
			actions: [][]string{{"delete", "ipaddresses"}},
			events:  []string{"Warning IPAddressNotAllocated IPAddress: 10.0.1.1 for Service bar/test-svc appears to have leaked: cleaning up"},
		},
		{
			name: "Two IPAddresses referencing the same service",
			svcs: []*v1.Service{newService("test-svc", []string{"10.0.1.1"})},
			ipAddresses: []*networkingv1alpha1.IPAddress{
				newIPAddress("10.0.1.1", newService("test-svc", []string{"10.0.1.1"})),
				newIPAddress("10.0.1.2", newService("test-svc", []string{"10.0.1.1"})),
			},
			actions: [][]string{{"delete", "ipaddresses"}},
			events:  []string{"Warning IPAddressWrongReference IPAddress: 10.0.1.2 for Service bar/test-svc has a wrong reference; cleaning up"},
		},
		{
			name: "Two Services with same ClusterIP",
			svcs: []*v1.Service{
				newService("test-svc", []string{"10.0.1.1"}),
				newService("test-svc2", []string{"10.0.1.1"}),
			},
			ipAddresses: []*networkingv1alpha1.IPAddress{
				newIPAddress("10.0.1.1", newService("test-svc2", []string{"10.0.1.1"})),
			},
			events: []string{"Warning ClusterIPAlreadyAllocated Cluster IP [4]:10.0.1.1 was assigned to multiple services; please recreate service"},
		},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {

			c, r := newFakeRepair()
			// override for testing
			r.servicesSynced = func() bool { return true }
			r.ipAddressSynced = func() bool { return true }
			recorder := events.NewFakeRecorder(100)
			r.recorder = recorder
			for _, svc := range test.svcs {
				err := r.serviceStore.Add(svc)
				if err != nil {
					t.Errorf("Unexpected error trying to add Service %v object: %v", svc, err)
				}
			}

			for _, ip := range test.ipAddresses {
				ip.CreationTimestamp = metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC)
				err := r.ipAddressStore.Add(ip)
				if err != nil {
					t.Errorf("Unexpected error trying to add IPAddress %s object: %v", ip, err)
				}
			}

			err := r.runOnce()
			if err != nil {
				t.Fatal(err)
			}

			for _, ip := range test.expectedIPs {
				_, err := r.ipAddressLister.Get(ip)
				if err != nil {
					t.Errorf("Unexpected error trying to get IPAddress %s object: %v", ip, err)
				}
			}

			expectAction(t, c.Actions(), test.actions)
			expectEvents(t, recorder.Events, test.events)
		})
	}

}

func TestRepairIPAddress_syncIPAddress(t *testing.T) {
	tests := []struct {
		name    string
		ip      *networkingv1alpha1.IPAddress
		actions [][]string // verb and resource
		wantErr bool
	}{
		{
			name: "correct ipv4 address",
			ip: &networkingv1alpha1.IPAddress{
				ObjectMeta: metav1.ObjectMeta{
					Name: "10.0.1.1",
					Labels: map[string]string{
						networkingv1alpha1.LabelIPAddressFamily: string(v1.IPv4Protocol),
						networkingv1alpha1.LabelManagedBy:       ipallocator.ControllerName,
					},
					CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
				},
				Spec: networkingv1alpha1.IPAddressSpec{
					ParentRef: &networkingv1alpha1.ParentReference{
						Group:     "",
						Resource:  "services",
						Name:      "foo",
						Namespace: "bar",
					},
				},
			},
		},
		{
			name: "correct ipv6 address",
			ip: &networkingv1alpha1.IPAddress{
				ObjectMeta: metav1.ObjectMeta{
					Name: "2001:db8::11",
					Labels: map[string]string{
						networkingv1alpha1.LabelIPAddressFamily: string(v1.IPv6Protocol),
						networkingv1alpha1.LabelManagedBy:       ipallocator.ControllerName,
					},
					CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
				},
				Spec: networkingv1alpha1.IPAddressSpec{
					ParentRef: &networkingv1alpha1.ParentReference{
						Group:     "",
						Resource:  "services",
						Name:      "foo",
						Namespace: "bar",
					},
				},
			},
		},
		{
			name: "not managed by this controller",
			ip: &networkingv1alpha1.IPAddress{
				ObjectMeta: metav1.ObjectMeta{
					Name: "2001:db8::11",
					Labels: map[string]string{
						networkingv1alpha1.LabelIPAddressFamily: string(v1.IPv6Protocol),
						networkingv1alpha1.LabelManagedBy:       "controller-foo",
					},
					CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
				},
				Spec: networkingv1alpha1.IPAddressSpec{
					ParentRef: &networkingv1alpha1.ParentReference{
						Group:     "networking.gateway.k8s.io",
						Resource:  "gateway",
						Name:      "foo",
						Namespace: "bar",
					},
				},
			},
		},
		{
			name: "out of range",
			ip: &networkingv1alpha1.IPAddress{
				ObjectMeta: metav1.ObjectMeta{
					Name: "fd00:db8::11",
					Labels: map[string]string{
						networkingv1alpha1.LabelIPAddressFamily: string(v1.IPv6Protocol),
						networkingv1alpha1.LabelManagedBy:       ipallocator.ControllerName,
					},
					CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
				},
				Spec: networkingv1alpha1.IPAddressSpec{
					ParentRef: &networkingv1alpha1.ParentReference{
						Group:     "",
						Resource:  "services",
						Name:      "foo",
						Namespace: "bar",
					},
				},
			},
		},
		{
			name: "leaked ip",
			ip: &networkingv1alpha1.IPAddress{
				ObjectMeta: metav1.ObjectMeta{
					Name: "10.0.1.1",
					Labels: map[string]string{
						networkingv1alpha1.LabelIPAddressFamily: string(v1.IPv6Protocol),
						networkingv1alpha1.LabelManagedBy:       ipallocator.ControllerName,
					},
					CreationTimestamp: metav1.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
				},
				Spec: networkingv1alpha1.IPAddressSpec{
					ParentRef: &networkingv1alpha1.ParentReference{
						Group:     "",
						Resource:  "services",
						Name:      "noexist",
						Namespace: "bar",
					},
				},
			},
			actions: [][]string{{"delete", "ipaddresses"}},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			c, r := newFakeRepair()
			err := r.ipAddressStore.Add(tt.ip)
			if err != nil {
				t.Fatal(err)
			}
			err = r.serviceStore.Add(newService("foo", []string{tt.ip.Name}))
			if err != nil {
				t.Fatal(err)
			}

			// override for testing
			r.servicesSynced = func() bool { return true }
			r.ipAddressSynced = func() bool { return true }
			recorder := events.NewFakeRecorder(100)
			r.recorder = recorder
			if err := r.syncIPAddress(tt.ip.Name); (err != nil) != tt.wantErr {
				t.Errorf("RepairIPAddress.syncIPAddress() error = %v, wantErr %v", err, tt.wantErr)
			}
			expectAction(t, c.Actions(), tt.actions)

		})
	}
}

func newService(name string, ips []string) *v1.Service {
	if len(ips) == 0 {
		return nil
	}
	svc := &v1.Service{
		ObjectMeta: metav1.ObjectMeta{Namespace: "bar", Name: name},
		Spec: v1.ServiceSpec{
			ClusterIP:  ips[0],
			ClusterIPs: ips,
			Type:       v1.ServiceTypeClusterIP,
		},
	}
	return svc
}

func expectAction(t *testing.T, actions []k8stesting.Action, expected [][]string) {
	t.Helper()
	if len(actions) != len(expected) {
		t.Fatalf("Expected %d actions, got %d\ndiff: %v", len(expected), len(actions), cmp.Diff(expected, actions))
	}

	for i, action := range actions {
		verb := expected[i][0]
		if action.GetVerb() != verb {
			t.Errorf("Expected action %d verb to be %s, got %s", i, verb, action.GetVerb())
		}
		resource := expected[i][1]
		if action.GetResource().Resource != resource {
			t.Errorf("Expected action %d resource to be %s, got %s", i, resource, action.GetResource().Resource)
		}
	}
}

func expectEvents(t *testing.T, actual <-chan string, expected []string) {
	t.Helper()
	c := time.After(wait.ForeverTestTimeout)
	for _, e := range expected {
		select {
		case a := <-actual:
			if e != a {
				t.Errorf("Expected event %q, got %q", e, a)
				return
			}
		case <-c:
			t.Errorf("Expected event %q, got nothing", e)
			// continue iterating to print all expected events
		}
	}
	for {
		select {
		case a := <-actual:
			t.Errorf("Unexpected event: %q", a)
		default:
			return // No more events, as expected.
		}
	}
}