diff --git a/pkg/registry/core/rest/storage_core.go b/pkg/registry/core/rest/storage_core.go index 3fabc84371a..95d0b29ede5 100644 --- a/pkg/registry/core/rest/storage_core.go +++ b/pkg/registry/core/rest/storage_core.go @@ -137,9 +137,8 @@ func New(c Config) (*legacyProvider, error) { p.startServiceClusterIPRepair = serviceipallocatorcontroller.NewRepairIPAddress( c.Services.IPRepairInterval, client, - &c.Services.ClusterIPRange, - &c.Services.SecondaryClusterIPRange, c.Informers.Core().V1().Services(), + c.Informers.Networking().V1alpha1().ServiceCIDRs(), c.Informers.Networking().V1alpha1().IPAddresses(), ).RunUntil } diff --git a/pkg/registry/core/service/ipallocator/controller/repairip.go b/pkg/registry/core/service/ipallocator/controller/repairip.go index aa4b19cc56a..540fb784180 100644 --- a/pkg/registry/core/service/ipallocator/controller/repairip.go +++ b/pkg/registry/core/service/ipallocator/controller/repairip.go @@ -20,6 +20,8 @@ import ( "context" "fmt" "net" + "net/netip" + "sync" "time" v1 "k8s.io/api/core/v1" @@ -42,6 +44,7 @@ import ( "k8s.io/kubernetes/pkg/api/legacyscheme" "k8s.io/kubernetes/pkg/apis/core/v1/helper" "k8s.io/kubernetes/pkg/registry/core/service/ipallocator" + "k8s.io/kubernetes/pkg/util/iptree" "k8s.io/utils/clock" netutils "k8s.io/utils/net" ) @@ -88,18 +91,23 @@ type RepairIPAddress struct { client kubernetes.Interface interval time.Duration - networkByFamily map[netutils.IPFamily]*net.IPNet // networks we operate on, by their family - serviceLister corelisters.ServiceLister servicesSynced cache.InformerSynced + serviceCIDRLister networkinglisters.ServiceCIDRLister + serviceCIDRSynced cache.InformerSynced + ipAddressLister networkinglisters.IPAddressLister ipAddressSynced cache.InformerSynced + cidrQueue workqueue.RateLimitingInterface svcQueue workqueue.RateLimitingInterface ipQueue workqueue.RateLimitingInterface workerLoopPeriod time.Duration + muTree sync.Mutex + tree *iptree.Tree[string] + broadcaster events.EventBroadcaster recorder events.EventRecorder clock clock.Clock @@ -109,38 +117,32 @@ type RepairIPAddress struct { // and generates informational warnings for a cluster that is not in sync. func NewRepairIPAddress(interval time.Duration, client kubernetes.Interface, - network *net.IPNet, - secondaryNetwork *net.IPNet, serviceInformer coreinformers.ServiceInformer, + serviceCIDRInformer networkinginformers.ServiceCIDRInformer, ipAddressInformer networkinginformers.IPAddressInformer) *RepairIPAddress { eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()}) recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, "ipallocator-repair-controller") - networkByFamily := make(map[netutils.IPFamily]*net.IPNet) - primary := netutils.IPFamilyOfCIDR(network) - networkByFamily[primary] = network - if secondaryNetwork != nil { - secondary := netutils.IPFamilyOfCIDR(secondaryNetwork) - networkByFamily[secondary] = secondaryNetwork - } - r := &RepairIPAddress{ - interval: interval, - client: client, - networkByFamily: networkByFamily, - serviceLister: serviceInformer.Lister(), - servicesSynced: serviceInformer.Informer().HasSynced, - ipAddressLister: ipAddressInformer.Lister(), - ipAddressSynced: ipAddressInformer.Informer().HasSynced, - svcQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "services"), - ipQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ipaddresses"), - workerLoopPeriod: time.Second, - broadcaster: eventBroadcaster, - recorder: recorder, - clock: clock.RealClock{}, + interval: interval, + client: client, + serviceLister: serviceInformer.Lister(), + servicesSynced: serviceInformer.Informer().HasSynced, + serviceCIDRLister: serviceCIDRInformer.Lister(), + serviceCIDRSynced: serviceCIDRInformer.Informer().HasSynced, + ipAddressLister: ipAddressInformer.Lister(), + ipAddressSynced: ipAddressInformer.Informer().HasSynced, + cidrQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "servicecidrs"), + svcQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "services"), + ipQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ipaddresses"), + tree: iptree.New[string](), + workerLoopPeriod: time.Second, + broadcaster: eventBroadcaster, + recorder: recorder, + clock: clock.RealClock{}, } - serviceInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{ + _, _ = serviceInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { key, err := cache.MetaNamespaceKeyFunc(obj) if err == nil { @@ -163,6 +165,29 @@ func NewRepairIPAddress(interval time.Duration, }, }, interval) + _, _ = serviceCIDRInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + key, err := cache.MetaNamespaceKeyFunc(obj) + if err == nil { + r.cidrQueue.Add(key) + } + }, + UpdateFunc: func(old interface{}, new interface{}) { + key, err := cache.MetaNamespaceKeyFunc(new) + if err == nil { + r.cidrQueue.Add(key) + } + }, + DeleteFunc: func(obj interface{}) { + // IndexerInformer uses a delta queue, therefore for deletes we have to use this + // key function. + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err == nil { + r.cidrQueue.Add(key) + } + }, + }) + ipAddressInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { key, err := cache.MetaNamespaceKeyFunc(obj) @@ -191,6 +216,7 @@ func NewRepairIPAddress(interval time.Duration, // RunUntil starts the controller until the provided ch is closed. func (r *RepairIPAddress) RunUntil(onFirstSuccess func(), stopCh chan struct{}) { + defer r.cidrQueue.ShutDown() defer r.ipQueue.ShutDown() defer r.svcQueue.ShutDown() r.broadcaster.StartRecordingToSink(stopCh) @@ -199,7 +225,7 @@ func (r *RepairIPAddress) RunUntil(onFirstSuccess func(), stopCh chan struct{}) klog.Info("Starting ipallocator-repair-controller") defer klog.Info("Shutting down ipallocator-repair-controller") - if !cache.WaitForNamedCacheSync("ipallocator-repair-controller", stopCh, r.ipAddressSynced, r.servicesSynced) { + if !cache.WaitForNamedCacheSync("ipallocator-repair-controller", stopCh, r.ipAddressSynced, r.servicesSynced, r.serviceCIDRSynced) { return } @@ -212,6 +238,9 @@ func (r *RepairIPAddress) RunUntil(onFirstSuccess func(), stopCh chan struct{}) } onFirstSuccess() + // serialize the operations on ServiceCIDRs + go wait.Until(r.cidrWorker, r.workerLoopPeriod, stopCh) + for i := 0; i < workers; i++ { go wait.Until(r.ipWorker, r.workerLoopPeriod, stopCh) go wait.Until(r.svcWorker, r.workerLoopPeriod, stopCh) @@ -330,20 +359,16 @@ func (r *RepairIPAddress) syncService(key string) error { runtime.HandleError(fmt.Errorf("the ClusterIP %s for Service %s/%s is not a valid IP; please recreate Service", ip, svc.Namespace, svc.Name)) continue } + // TODO(aojea) Refactor to abstract the IPs checks + family := getFamilyByIP(ip) - family := netutils.IPFamilyOf(ip) - v1Family := getFamilyByIP(ip) - network, ok := r.networkByFamily[family] - if !ok { - // this service is using an IPFamily no longer configured on cluster - r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPNotValid", "ClusterIPValidation", "Cluster IP %s(%s) is of ip family that is no longer configured on cluster; please recreate Service", ip, v1Family) - runtime.HandleError(fmt.Errorf("the ClusterIP [%v]: %s for Service %s/%s is of ip family that is no longer configured on cluster; please recreate Service", v1Family, ip, svc.Namespace, svc.Name)) - continue - } - if !network.Contains(ip) { + r.muTree.Lock() + prefixes := r.tree.GetHostIPPrefixMatches(ipToAddr(ip)) + r.muTree.Unlock() + if len(prefixes) == 0 { // ClusterIP is out of range - r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPOutOfRange", "ClusterIPAllocation", "Cluster IP [%v]: %s is not within the configured Service CIDR %s; please recreate service", v1Family, ip, network.String()) - runtime.HandleError(fmt.Errorf("the ClusterIP [%v]: %s for Service %s/%s is not within the service CIDR %s; please recreate", v1Family, ip, svc.Namespace, svc.Name, network.String())) + r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPOutOfRange", "ClusterIPAllocation", "Cluster IP [%v]: %s is not within the configured Service CIDR; please recreate service", family, ip) + runtime.HandleError(fmt.Errorf("the ClusterIP [%v]: %s for Service %s/%s is not within any service CIDR; please recreate", family, ip, svc.Namespace, svc.Name)) continue } @@ -351,8 +376,8 @@ func (r *RepairIPAddress) syncService(key string) error { ipAddress, err := r.ipAddressLister.Get(ip.String()) if apierrors.IsNotFound(err) { // ClusterIP doesn't seem to be allocated, create it. - r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPNotAllocated", "ClusterIPAllocation", "Cluster IP [%v]: %s is not allocated; repairing", v1Family, ip) - runtime.HandleError(fmt.Errorf("the ClusterIP [%v]: %s for Service %s/%s is not allocated; repairing", v1Family, ip, svc.Namespace, svc.Name)) + r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPNotAllocated", "ClusterIPAllocation", "Cluster IP [%v]: %s is not allocated; repairing", family, ip) + runtime.HandleError(fmt.Errorf("the ClusterIP [%v]: %s for Service %s/%s is not allocated; repairing", family, ip, svc.Namespace, svc.Name)) _, err := r.client.NetworkingV1alpha1().IPAddresses().Create(context.Background(), newIPAddress(ip.String(), svc), metav1.CreateOptions{}) if err != nil { return err @@ -360,14 +385,14 @@ func (r *RepairIPAddress) syncService(key string) error { continue } if err != nil { - r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "UnknownError", "ClusterIPAllocation", "Unable to allocate ClusterIP [%v]: %s due to an unknown error", v1Family, ip) - return fmt.Errorf("unable to allocate ClusterIP [%v]: %s for Service %s/%s due to an unknown error, will retry later: %v", v1Family, ip, svc.Namespace, svc.Name, err) + r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "UnknownError", "ClusterIPAllocation", "Unable to allocate ClusterIP [%v]: %s due to an unknown error", family, ip) + return fmt.Errorf("unable to allocate ClusterIP [%v]: %s for Service %s/%s due to an unknown error, will retry later: %v", family, ip, svc.Namespace, svc.Name, err) } // IPAddress that belongs to a Service must reference a Service if ipAddress.Spec.ParentRef.Group != "" || ipAddress.Spec.ParentRef.Resource != "services" { - r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPNotAllocated", "ClusterIPAllocation", "the ClusterIP [%v]: %s for Service %s/%s has a wrong reference; repairing", v1Family, ip, svc.Namespace, svc.Name) + r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPNotAllocated", "ClusterIPAllocation", "the ClusterIP [%v]: %s for Service %s/%s has a wrong reference; repairing", family, ip, svc.Namespace, svc.Name) if err := r.recreateIPAddress(ipAddress.Name, svc); err != nil { return err } @@ -381,7 +406,7 @@ func (r *RepairIPAddress) syncService(key string) error { // it will keep deleting and recreating the same IPAddress changing the reference refService, err := r.serviceLister.Services(ipAddress.Spec.ParentRef.Namespace).Get(ipAddress.Spec.ParentRef.Name) if err != nil { - r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPNotAllocated", "ClusterIPAllocation", "the ClusterIP [%v]: %s for Service %s/%s has a wrong reference; repairing", v1Family, ip, svc.Namespace, svc.Name) + r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPNotAllocated", "ClusterIPAllocation", "the ClusterIP [%v]: %s for Service %s/%s has a wrong reference; repairing", family, ip, svc.Namespace, svc.Name) if err := r.recreateIPAddress(ipAddress.Name, svc); err != nil { return err } @@ -472,8 +497,7 @@ func (r *RepairIPAddress) syncIPAddress(key string) error { } // does not reference a Service but created by the service allocator, something else have changed it, delete it - if ipAddress.Spec.ParentRef.Group != "" || - ipAddress.Spec.ParentRef.Resource != "services" { + if ipAddress.Spec.ParentRef.Group != "" || ipAddress.Spec.ParentRef.Resource != "services" { runtime.HandleError(fmt.Errorf("IPAddress %s appears to have been modified, not referencing a Service %v: cleaning up", ipAddress.Name, ipAddress.Spec.ParentRef)) r.recorder.Eventf(ipAddress, nil, v1.EventTypeWarning, "IPAddressNotAllocated", "IPAddressAllocation", "IPAddress %s appears to have been modified, not referencing a Service %v: cleaning up", ipAddress.Name, ipAddress.Spec.ParentRef) err := r.client.NetworkingV1alpha1().IPAddresses().Delete(context.Background(), ipAddress.Name, metav1.DeleteOptions{}) @@ -523,6 +547,64 @@ func (r *RepairIPAddress) syncIPAddress(key string) error { return nil } + +func (r *RepairIPAddress) cidrWorker() { + for r.processNextWorkCIDR() { + } +} + +func (r *RepairIPAddress) processNextWorkCIDR() bool { + eKey, quit := r.cidrQueue.Get() + if quit { + return false + } + defer r.cidrQueue.Done(eKey) + + err := r.syncCIDRs() + r.handleCIDRErr(err, eKey) + + return true +} + +func (r *RepairIPAddress) handleCIDRErr(err error, key interface{}) { + if err == nil { + r.cidrQueue.Forget(key) + return + } + + if r.cidrQueue.NumRequeues(key) < maxRetries { + klog.V(2).InfoS("Error syncing ServiceCIDR, retrying", "serviceCIDR", key, "err", err) + r.cidrQueue.AddRateLimited(key) + return + } + + klog.Warningf("Dropping ServiceCIDR %q out of the queue: %v", key, err) + r.cidrQueue.Forget(key) + runtime.HandleError(err) +} + +// syncCIDRs rebuilds the radix tree based from the informers cache +func (r *RepairIPAddress) syncCIDRs() error { + cidrList, err := r.serviceCIDRLister.List(labels.Everything()) + if err != nil { + return err + } + + tree := iptree.New[string]() + for _, cidr := range cidrList { + if prefix, err := netip.ParsePrefix(cidr.Spec.IPv4); err == nil { // if is empty err will not be nil + tree.InsertPrefix(prefix, cidr.Name) + } + if prefix, err := netip.ParsePrefix(cidr.Spec.IPv6); err == nil { // if is empty err will not be nil + tree.InsertPrefix(prefix, cidr.Name) + } + } + r.muTree.Lock() + defer r.muTree.Unlock() + r.tree = tree + return nil +} + func newIPAddress(name string, svc *v1.Service) *networkingv1alpha1.IPAddress { family := string(v1.IPv4Protocol) if netutils.IsIPv6String(name) { @@ -587,3 +669,20 @@ func verifyIPAddressLabels(ip *networkingv1alpha1.IPAddress) bool { } return managedByController(ip) } + +// TODO(aojea) move to utils, already in pkg/registry/core/service/ipallocator/cidrallocator.go +// ipToAddr converts a net.IP to a netip.Addr +// if the net.IP is not valid it returns an empty netip.Addr{} +func ipToAddr(ip net.IP) netip.Addr { + // https://pkg.go.dev/net/netip#AddrFromSlice can return an IPv4 in IPv6 format + // so we have to check the IP family to return exactly the format that we want + // address, _ := netip.AddrFromSlice(net.ParseIPSloppy(192.168.0.1)) returns + // an address like ::ffff:192.168.0.1/32 + bytes := ip.To4() + if bytes == nil { + bytes = ip.To16() + } + // AddrFromSlice returns Addr{}, false if the input is invalid. + address, _ := netip.AddrFromSlice(bytes) + return address +} diff --git a/pkg/registry/core/service/ipallocator/controller/repairip_test.go b/pkg/registry/core/service/ipallocator/controller/repairip_test.go index f82b638bcef..7ad3ffb368a 100644 --- a/pkg/registry/core/service/ipallocator/controller/repairip_test.go +++ b/pkg/registry/core/service/ipallocator/controller/repairip_test.go @@ -33,7 +33,6 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/events" "k8s.io/kubernetes/pkg/registry/core/service/ipallocator" - netutils "k8s.io/utils/net" ) var ( @@ -43,8 +42,9 @@ var ( type fakeRepair struct { *RepairIPAddress - serviceStore cache.Store - ipAddressStore cache.Store + serviceStore cache.Store + ipAddressStore cache.Store + serviceCIDRStore cache.Store } func newFakeRepair() (*fake.Clientset, *fakeRepair) { @@ -54,6 +54,9 @@ func newFakeRepair() (*fake.Clientset, *fakeRepair) { serviceInformer := informerFactory.Core().V1().Services() serviceIndexer := serviceInformer.Informer().GetIndexer() + serviceCIDRInformer := informerFactory.Networking().V1alpha1().ServiceCIDRs() + serviceCIDRIndexer := serviceCIDRInformer.Informer().GetIndexer() + ipInformer := informerFactory.Networking().V1alpha1().IPAddresses() ipIndexer := ipInformer.Informer().GetIndexer() @@ -72,22 +75,13 @@ func newFakeRepair() (*fake.Clientset, *fakeRepair) { return false, &networkingv1alpha1.IPAddress{}, err })) - _, primary, err := netutils.ParseCIDRSloppy(serviceCIDRv4) - if err != nil { - panic(err) - } - _, secondary, err := netutils.ParseCIDRSloppy(serviceCIDRv6) - if err != nil { - panic(err) - } r := NewRepairIPAddress(0*time.Second, fakeClient, - primary, - secondary, serviceInformer, + serviceCIDRInformer, ipInformer, ) - return fakeClient, &fakeRepair{r, serviceIndexer, ipIndexer} + return fakeClient, &fakeRepair{r, serviceIndexer, ipIndexer, serviceCIDRIndexer} } func TestRepairServiceIP(t *testing.T) { @@ -95,6 +89,7 @@ func TestRepairServiceIP(t *testing.T) { name string svcs []*v1.Service ipAddresses []*networkingv1alpha1.IPAddress + cidrs []*networkingv1alpha1.ServiceCIDR expectedIPs []string actions [][]string // verb and resource events []string @@ -105,6 +100,9 @@ func TestRepairServiceIP(t *testing.T) { ipAddresses: []*networkingv1alpha1.IPAddress{ newIPAddress("10.0.1.1", newService("test-svc", []string{"10.0.1.1"})), }, + cidrs: []*networkingv1alpha1.ServiceCIDR{ + newServiceCIDR("kubernetes", serviceCIDRv4, serviceCIDRv6), + }, expectedIPs: []string{"10.0.1.1"}, actions: [][]string{}, events: []string{}, @@ -116,21 +114,45 @@ func TestRepairServiceIP(t *testing.T) { newIPAddress("10.0.1.1", newService("test-svc", []string{"10.0.1.1"})), newIPAddress("2001:db8::10", newService("test-svc", []string{"2001:db8::10"})), }, + cidrs: []*networkingv1alpha1.ServiceCIDR{ + newServiceCIDR("kubernetes", serviceCIDRv4, serviceCIDRv6), + }, expectedIPs: []string{"10.0.1.1", "2001:db8::10"}, actions: [][]string{}, events: []string{}, }, + { + name: "no changes needed dual stack multiple cidrs", + svcs: []*v1.Service{newService("test-svc", []string{"192.168.0.1", "2001:db8:a:b::10"})}, + ipAddresses: []*networkingv1alpha1.IPAddress{ + newIPAddress("192.168.0.1", newService("test-svc", []string{"192.168.0.1"})), + newIPAddress("2001:db8:a:b::10", newService("test-svc", []string{"2001:db8:a:b::10"})), + }, + cidrs: []*networkingv1alpha1.ServiceCIDR{ + newServiceCIDR("kubernetes", serviceCIDRv4, serviceCIDRv6), + newServiceCIDR("custom", "192.168.0.0/24", "2001:db8:a:b::/64"), + }, + expectedIPs: []string{"192.168.0.1", "2001:db8:a:b::10"}, + actions: [][]string{}, + events: []string{}, + }, // these two cases simulate migrating from bitmaps to IPAddress objects { - name: "create IPAddress single stack", - svcs: []*v1.Service{newService("test-svc", []string{"10.0.1.1"})}, + name: "create IPAddress single stack", + svcs: []*v1.Service{newService("test-svc", []string{"10.0.1.1"})}, + cidrs: []*networkingv1alpha1.ServiceCIDR{ + newServiceCIDR("kubernetes", serviceCIDRv4, serviceCIDRv6), + }, expectedIPs: []string{"10.0.1.1"}, actions: [][]string{{"create", "ipaddresses"}}, events: []string{"Warning ClusterIPNotAllocated Cluster IP [IPv4]: 10.0.1.1 is not allocated; repairing"}, }, { - name: "create IPAddresses dual stack", - svcs: []*v1.Service{newService("test-svc", []string{"10.0.1.1", "2001:db8::10"})}, + name: "create IPAddresses dual stack", + svcs: []*v1.Service{newService("test-svc", []string{"10.0.1.1", "2001:db8::10"})}, + cidrs: []*networkingv1alpha1.ServiceCIDR{ + newServiceCIDR("kubernetes", serviceCIDRv4, serviceCIDRv6), + }, expectedIPs: []string{"10.0.1.1", "2001:db8::10"}, actions: [][]string{{"create", "ipaddresses"}, {"create", "ipaddresses"}}, events: []string{ @@ -138,12 +160,26 @@ func TestRepairServiceIP(t *testing.T) { "Warning ClusterIPNotAllocated Cluster IP [IPv6]: 2001:db8::10 is not allocated; repairing", }, }, + { + name: "create IPAddress single stack from secondary", + svcs: []*v1.Service{newService("test-svc", []string{"192.168.1.1"})}, + cidrs: []*networkingv1alpha1.ServiceCIDR{ + newServiceCIDR("kubernetes", serviceCIDRv4, serviceCIDRv6), + newServiceCIDR("custom", "192.168.1.0/24", ""), + }, + expectedIPs: []string{"192.168.1.1"}, + actions: [][]string{{"create", "ipaddresses"}}, + events: []string{"Warning ClusterIPNotAllocated Cluster IP [IPv4]: 192.168.1.1 is not allocated; repairing"}, + }, { name: "reconcile IPAddress single stack wrong reference", svcs: []*v1.Service{newService("test-svc", []string{"10.0.1.1"})}, ipAddresses: []*networkingv1alpha1.IPAddress{ newIPAddress("10.0.1.1", newService("test-svc2", []string{"10.0.1.1"})), }, + cidrs: []*networkingv1alpha1.ServiceCIDR{ + newServiceCIDR("kubernetes", serviceCIDRv4, serviceCIDRv6), + }, expectedIPs: []string{"10.0.1.1"}, actions: [][]string{{"delete", "ipaddresses"}, {"create", "ipaddresses"}}, events: []string{"Warning ClusterIPNotAllocated the ClusterIP [IPv4]: 10.0.1.1 for Service bar/test-svc has a wrong reference; repairing"}, @@ -155,6 +191,9 @@ func TestRepairServiceIP(t *testing.T) { newIPAddress("10.0.1.1", newService("test-svc2", []string{"10.0.1.1"})), newIPAddress("2001:db8::10", newService("test-svc2", []string{"2001:db8::10"})), }, + cidrs: []*networkingv1alpha1.ServiceCIDR{ + newServiceCIDR("kubernetes", serviceCIDRv4, serviceCIDRv6), + }, expectedIPs: []string{"10.0.1.1", "2001:db8::10"}, actions: [][]string{{"delete", "ipaddresses"}, {"create", "ipaddresses"}, {"delete", "ipaddresses"}, {"create", "ipaddresses"}}, events: []string{ @@ -169,18 +208,85 @@ func TestRepairServiceIP(t *testing.T) { newIPAddress("192.168.1.1", newService("test-svc", []string{"192.168.1.1"})), newIPAddress("2001:db8::10", newService("test-svc", []string{"2001:db8::10"})), }, + cidrs: []*networkingv1alpha1.ServiceCIDR{ + newServiceCIDR("kubernetes", serviceCIDRv4, serviceCIDRv6), + }, expectedIPs: []string{"2001:db8::10"}, actions: [][]string{}, - events: []string{"Warning ClusterIPOutOfRange Cluster IP [IPv4]: 192.168.1.1 is not within the configured Service CIDR 10.0.0.0/16; please recreate service"}, + events: []string{"Warning ClusterIPOutOfRange Cluster IP [IPv4]: 192.168.1.1 is not within the configured Service CIDR; please recreate service"}, }, { name: "one IP orphan", ipAddresses: []*networkingv1alpha1.IPAddress{ newIPAddress("10.0.1.1", newService("test-svc", []string{"10.0.1.1"})), }, + cidrs: []*networkingv1alpha1.ServiceCIDR{ + newServiceCIDR("kubernetes", serviceCIDRv4, serviceCIDRv6), + }, actions: [][]string{{"delete", "ipaddresses"}}, events: []string{"Warning IPAddressNotAllocated IPAddress: 10.0.1.1 for Service bar/test-svc appears to have leaked: cleaning up"}, }, + { + name: "one IP out of range matching the network address", + svcs: []*v1.Service{newService("test-svc", []string{"10.0.0.0"})}, + ipAddresses: []*networkingv1alpha1.IPAddress{ + newIPAddress("10.0.0.0", newService("test-svc", []string{"10.0.0.0"})), + }, + cidrs: []*networkingv1alpha1.ServiceCIDR{ + newServiceCIDR("kubernetes", serviceCIDRv4, serviceCIDRv6), + }, + expectedIPs: []string{"10.0.0.0"}, + actions: [][]string{}, + events: []string{"Warning ClusterIPOutOfRange Cluster IP [IPv4]: 10.0.0.0 is not within the configured Service CIDR; please recreate service"}, + }, + { + name: "one IP out of range matching the broadcast address", + svcs: []*v1.Service{newService("test-svc", []string{"10.0.255.255"})}, + ipAddresses: []*networkingv1alpha1.IPAddress{ + newIPAddress("10.0.255.255", newService("test-svc", []string{"10.0.255.255"})), + }, + cidrs: []*networkingv1alpha1.ServiceCIDR{ + newServiceCIDR("kubernetes", serviceCIDRv4, serviceCIDRv6), + }, + expectedIPs: []string{"10.0.255.255"}, + actions: [][]string{}, + events: []string{"Warning ClusterIPOutOfRange Cluster IP [IPv4]: 10.0.255.255 is not within the configured Service CIDR; please recreate service"}, + }, + { + name: "one IPv6 out of range matching the subnet address", + svcs: []*v1.Service{newService("test-svc", []string{"2001:db8::"})}, + ipAddresses: []*networkingv1alpha1.IPAddress{ + newIPAddress("2001:db8::", newService("test-svc", []string{"2001:db8::"})), + }, + cidrs: []*networkingv1alpha1.ServiceCIDR{ + newServiceCIDR("kubernetes", serviceCIDRv4, serviceCIDRv6), + }, + expectedIPs: []string{"2001:db8::"}, + actions: [][]string{}, + events: []string{"Warning ClusterIPOutOfRange Cluster IP [IPv6]: 2001:db8:: is not within the configured Service CIDR; please recreate service"}, + }, + { + name: "one IPv6 matching the broadcast address", + svcs: []*v1.Service{newService("test-svc", []string{"2001:db8::ffff:ffff:ffff:ffff"})}, + ipAddresses: []*networkingv1alpha1.IPAddress{ + newIPAddress("2001:db8::ffff:ffff:ffff:ffff", newService("test-svc", []string{"2001:db8::ffff:ffff:ffff:ffff"})), + }, + cidrs: []*networkingv1alpha1.ServiceCIDR{ + newServiceCIDR("kubernetes", serviceCIDRv4, serviceCIDRv6), + }, + expectedIPs: []string{"2001:db8::ffff:ffff:ffff:ffff"}, + }, + { + name: "one IP orphan matching the broadcast address", + ipAddresses: []*networkingv1alpha1.IPAddress{ + newIPAddress("10.0.255.255", newService("test-svc", []string{"10.0.255.255"})), + }, + cidrs: []*networkingv1alpha1.ServiceCIDR{ + newServiceCIDR("kubernetes", serviceCIDRv4, serviceCIDRv6), + }, + actions: [][]string{{"delete", "ipaddresses"}}, + events: []string{"Warning IPAddressNotAllocated IPAddress: 10.0.255.255 for Service bar/test-svc appears to have leaked: cleaning up"}, + }, { name: "Two IPAddresses referencing the same service", svcs: []*v1.Service{newService("test-svc", []string{"10.0.1.1"})}, @@ -188,6 +294,9 @@ func TestRepairServiceIP(t *testing.T) { newIPAddress("10.0.1.1", newService("test-svc", []string{"10.0.1.1"})), newIPAddress("10.0.1.2", newService("test-svc", []string{"10.0.1.1"})), }, + cidrs: []*networkingv1alpha1.ServiceCIDR{ + newServiceCIDR("kubernetes", serviceCIDRv4, serviceCIDRv6), + }, actions: [][]string{{"delete", "ipaddresses"}}, events: []string{"Warning IPAddressWrongReference IPAddress: 10.0.1.2 for Service bar/test-svc has a wrong reference; cleaning up"}, }, @@ -200,7 +309,10 @@ func TestRepairServiceIP(t *testing.T) { ipAddresses: []*networkingv1alpha1.IPAddress{ newIPAddress("10.0.1.1", newService("test-svc2", []string{"10.0.1.1"})), }, - events: []string{"Warning ClusterIPAlreadyAllocated Cluster IP [4]:10.0.1.1 was assigned to multiple services; please recreate service"}, + cidrs: []*networkingv1alpha1.ServiceCIDR{ + newServiceCIDR("kubernetes", serviceCIDRv4, serviceCIDRv6), + }, + events: []string{"Warning ClusterIPAlreadyAllocated Cluster IP [IPv4]:10.0.1.1 was assigned to multiple services; please recreate service"}, }, } @@ -208,9 +320,21 @@ func TestRepairServiceIP(t *testing.T) { t.Run(test.name, func(t *testing.T) { c, r := newFakeRepair() + // add cidrs + for _, cidr := range test.cidrs { + err := r.serviceCIDRStore.Add(cidr) + if err != nil { + t.Errorf("Unexpected error trying to add Service %v object: %v", cidr, err) + } + } + err := r.syncCIDRs() + if err != nil { + t.Fatal(err) + } // override for testing r.servicesSynced = func() bool { return true } r.ipAddressSynced = func() bool { return true } + r.serviceCIDRSynced = func() bool { return true } recorder := events.NewFakeRecorder(100) r.recorder = recorder for _, svc := range test.svcs { @@ -228,7 +352,7 @@ func TestRepairServiceIP(t *testing.T) { } } - err := r.runOnce() + err = r.runOnce() if err != nil { t.Fatal(err) } @@ -402,6 +526,19 @@ func newService(name string, ips []string) *v1.Service { return svc } +func newServiceCIDR(name, ipv4, ipv6 string) *networkingv1alpha1.ServiceCIDR { + serviceCIDR := &networkingv1alpha1.ServiceCIDR{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: networkingv1alpha1.ServiceCIDRSpec{ + IPv4: ipv4, + IPv6: ipv6, + }, + } + return serviceCIDR +} + func expectAction(t *testing.T, actions []k8stesting.Action, expected [][]string) { t.Helper() if len(actions) != len(expected) {