diff --git a/pkg/registry/core/service/ipallocator/cidrallocator.go b/pkg/registry/core/service/ipallocator/cidrallocator.go
new file mode 100644
index 00000000000..a524cde6a2c
--- /dev/null
+++ b/pkg/registry/core/service/ipallocator/cidrallocator.go
@@ -0,0 +1,454 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package ipallocator
+
+import (
+	"fmt"
+	"net"
+	"net/netip"
+	"sync"
+	"time"
+
+	v1 "k8s.io/api/core/v1"
+	networkingv1alpha1 "k8s.io/api/networking/v1alpha1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
+	utilerrors "k8s.io/apimachinery/pkg/util/errors"
+	"k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/apimachinery/pkg/util/wait"
+	networkingv1alpha1informers "k8s.io/client-go/informers/networking/v1alpha1"
+	networkingv1alpha1client "k8s.io/client-go/kubernetes/typed/networking/v1alpha1"
+	networkingv1alpha1listers "k8s.io/client-go/listers/networking/v1alpha1"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/util/workqueue"
+	"k8s.io/klog/v2"
+	api "k8s.io/kubernetes/pkg/apis/core"
+	"k8s.io/kubernetes/pkg/util/iptree"
+	netutils "k8s.io/utils/net"
+)
+
+// MetaAllocator maintains a Tree of the ServiceCIDRs, with an IP Allocator on
+// each node. Since the allocators don't store the IPAddresses (they read them
+// from the informer cache), IP Allocators are cheap to create and delete.
+// MetaAllocator forwards each request to any of the internal allocators that
+// has free addresses.
+
+// MetaAllocator implements the current allocator interface using
+// ServiceCIDR and IPAddress API objects.
+type MetaAllocator struct {
+	client            networkingv1alpha1client.NetworkingV1alpha1Interface
+	serviceCIDRLister networkingv1alpha1listers.ServiceCIDRLister
+	serviceCIDRSynced cache.InformerSynced
+	ipAddressLister   networkingv1alpha1listers.IPAddressLister
+	ipAddressSynced   cache.InformerSynced
+	ipAddressInformer networkingv1alpha1informers.IPAddressInformer
+	queue             workqueue.RateLimitingInterface
+
+	internalStopCh chan struct{}
+
+	muTree sync.Mutex
+	tree   *iptree.Tree[*Allocator]
+
+	ipFamily api.IPFamily
+}
+
+var _ Interface = &MetaAllocator{}
+
+// NewMetaAllocator returns an IP allocator associated with a network range
+// that uses the IPAddress and ServiceCIDR objects to track the assigned IP
+// addresses, using an informer cache as storage.
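+//
+// A minimal wiring sketch (illustrative only; `clientset` is assumed to be a
+// kubernetes.Interface and `factory` a SharedInformerFactory built from it):
+//
+//	factory := informers.NewSharedInformerFactory(clientset, 0)
+//	alloc, err := NewMetaAllocator(
+//		clientset.NetworkingV1alpha1(),
+//		factory.Networking().V1alpha1().ServiceCIDRs(),
+//		factory.Networking().V1alpha1().IPAddresses(),
+//		false, // IPv4
+//	)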
+func NewMetaAllocator(
+	client networkingv1alpha1client.NetworkingV1alpha1Interface,
+	serviceCIDRInformer networkingv1alpha1informers.ServiceCIDRInformer,
+	ipAddressInformer networkingv1alpha1informers.IPAddressInformer,
+	isIPv6 bool,
+) (*MetaAllocator, error) {
+
+	family := api.IPv4Protocol
+	if isIPv6 {
+		family = api.IPv6Protocol
+	}
+
+	c := &MetaAllocator{
+		client:            client,
+		serviceCIDRLister: serviceCIDRInformer.Lister(),
+		serviceCIDRSynced: serviceCIDRInformer.Informer().HasSynced,
+		ipAddressLister:   ipAddressInformer.Lister(),
+		ipAddressSynced:   ipAddressInformer.Informer().HasSynced,
+		ipAddressInformer: ipAddressInformer,
+		queue:             workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{Name: ControllerName}),
+		internalStopCh:    make(chan struct{}),
+		tree:              iptree.New[*Allocator](),
+		ipFamily:          family,
+	}
+
+	_, _ = serviceCIDRInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc:    c.addServiceCIDR,
+		UpdateFunc: c.updateServiceCIDR,
+		DeleteFunc: c.deleteServiceCIDR,
+	})
+
+	go c.run()
+
+	return c, nil
+}
+
+func (c *MetaAllocator) addServiceCIDR(obj interface{}) {
+	key, err := cache.MetaNamespaceKeyFunc(obj)
+	if err == nil {
+		c.queue.Add(key)
+	}
+}
+
+func (c *MetaAllocator) updateServiceCIDR(old, new interface{}) {
+	key, err := cache.MetaNamespaceKeyFunc(new)
+	if err == nil {
+		c.queue.Add(key)
+	}
+}
+
+func (c *MetaAllocator) deleteServiceCIDR(obj interface{}) {
+	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
+	if err == nil {
+		c.queue.Add(key)
+	}
+}
+
+func (c *MetaAllocator) run() {
+	defer runtime.HandleCrash()
+	defer c.queue.ShutDown()
+	klog.Info("Starting ServiceCIDR Allocator Controller")
+	defer klog.Info("Stopping ServiceCIDR Allocator Controller")
+
+	// Wait for all involved caches to be synced before starting to process
+	// items from the queue
+	if !cache.WaitForCacheSync(c.internalStopCh, c.serviceCIDRSynced, c.ipAddressSynced) {
+		runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
+		return
+	}
+
+	// this is single-threaded; only one ServiceCIDR is processed at a time
+	go wait.Until(c.runWorker, time.Second, c.internalStopCh)
+
+	<-c.internalStopCh
+}
+
+func (c *MetaAllocator) runWorker() {
+	for c.processNextItem() {
+	}
+}
+
+func (c *MetaAllocator) processNextItem() bool {
+	// Wait until there is a new item in the working queue
+	key, quit := c.queue.Get()
+	if quit {
+		return false
+	}
+	defer c.queue.Done(key)
+
+	err := c.syncTree()
+	// Handle the error if something went wrong during the execution of the business logic
+	if err != nil {
+		if c.queue.NumRequeues(key) < 5 {
+			klog.Infof("Error syncing cidr %v: %v", key, err)
+			c.queue.AddRateLimited(key)
+			return true
+		}
+	}
+	c.queue.Forget(key)
+	return true
+}
+
+// syncTree syncs the ipTrees from the informer cache.
+// It deletes or creates allocators and sets the corresponding state.
+func (c *MetaAllocator) syncTree() error {
+	now := time.Now()
+	defer func() {
+		klog.Infof("Finished syncing ServiceCIDRs, took %v", time.Since(now))
+	}()
+
+	serviceCIDRs, err := c.serviceCIDRLister.List(labels.Everything())
+	if err != nil {
+		return err
+	}
+
+	cidrsSet := sets.New[string]()
+	cidrState := map[string]bool{}
+	for _, cidr := range serviceCIDRs {
+		ready := true
+		if !isReady(cidr) || !cidr.DeletionTimestamp.IsZero() {
+			ready = false
+		}
+
+		if c.ipFamily == api.IPv4Protocol && cidr.Spec.IPv4 != "" {
+			cidrsSet.Insert(cidr.Spec.IPv4)
+			cidrState[cidr.Spec.IPv4] = ready
+		}
+		if c.ipFamily == api.IPv6Protocol && cidr.Spec.IPv6 != "" {
+			cidrsSet.Insert(cidr.Spec.IPv6)
+			cidrState[cidr.Spec.IPv6] = ready
+		}
+	}
+
+	// obtain the existing allocators and set the existing state
+	treeSet := sets.New[string]()
+	c.muTree.Lock()
+	c.tree.DepthFirstWalk(c.ipFamily == api.IPv6Protocol, func(k netip.Prefix, v *Allocator) bool {
+		v.ready.Store(cidrState[k.String()])
+		treeSet.Insert(k.String())
+		return false
+	})
+	c.muTree.Unlock()
+	cidrsToRemove := treeSet.Difference(cidrsSet)
+	cidrsToAdd := cidrsSet.Difference(treeSet)
+
+	errs := []error{}
+	// Add new allocators
+	for _, cidr := range cidrsToAdd.UnsortedList() {
+		_, ipnet, err := netutils.ParseCIDRSloppy(cidr)
+		if err != nil {
+			return err
+		}
+		// New ServiceCIDR, create a new allocator
+		allocator, err := NewIPAllocator(ipnet, c.client, c.ipAddressInformer)
+		if err != nil {
+			errs = append(errs, err)
+			continue
+		}
+		allocator.ready.Store(cidrState[cidr])
+		prefix, err := netip.ParsePrefix(cidr)
+		if err != nil {
+			return err
+		}
+		c.addAllocator(prefix, allocator)
+		klog.Infof("Created ClusterIP allocator for Service CIDR %s", cidr)
+	}
+	// Remove allocators that no longer exist
+	for _, cidr := range cidrsToRemove.UnsortedList() {
+		prefix, err := netip.ParsePrefix(cidr)
+		if err != nil {
+			return err
+		}
+		c.deleteAllocator(prefix)
+	}
+
+	return utilerrors.NewAggregate(errs)
+}
+
+func (c *MetaAllocator) getAllocator(ip net.IP) (*Allocator, error) {
+	c.muTree.Lock()
+	defer c.muTree.Unlock()
+
+	var prefix netip.Prefix
+	if c.ipFamily == api.IPv4Protocol {
+		prefix = netip.PrefixFrom(ipToAddr(ip), 32)
+	} else {
+		prefix = netip.PrefixFrom(ipToAddr(ip), 128)
+	}
+	// Use the largest subnet to allocate addresses because
+	// all the other subnets will be contained.
+	_, allocator, ok := c.tree.ShortestPrefixMatch(prefix)
+	if !ok {
+		klog.V(2).Infof("Could not get allocator for IP %s", ip.String())
+		return nil, ErrMismatchedNetwork
+	}
+	return allocator, nil
+}
+
+func (c *MetaAllocator) addAllocator(cidr netip.Prefix, allocator *Allocator) {
+	c.muTree.Lock()
+	defer c.muTree.Unlock()
+	c.tree.InsertPrefix(cidr, allocator)
+}
+
+func (c *MetaAllocator) deleteAllocator(cidr netip.Prefix) {
+	c.muTree.Lock()
+	defer c.muTree.Unlock()
+	ok := c.tree.DeletePrefix(cidr)
+	if ok {
+		klog.V(3).Infof("CIDR %s deleted", cidr)
+	}
+}
+
+func (c *MetaAllocator) AllocateService(service *api.Service, ip net.IP) error {
+	allocator, err := c.getAllocator(ip)
+	if err != nil {
+		return err
+	}
+	return allocator.AllocateService(service, ip)
+}
+
+func (c *MetaAllocator) Allocate(ip net.IP) error {
+	allocator, err := c.getAllocator(ip)
+	if err != nil {
+		return err
+	}
+	return allocator.Allocate(ip)
+}
+
+func (c *MetaAllocator) AllocateNextService(service *api.Service) (net.IP, error) {
+	c.muTree.Lock()
+	defer c.muTree.Unlock()
+
+	// TODO(aojea) add a strategy to return a random allocator, taking into
+	// consideration the number of addresses of each allocator. For example,
+	// if we have allocators A and B with 256 and 1024 possible addresses
+	// each, the chances to get B have to be 4 times the chances to get A,
+	// so we can spread the load of IPs randomly.
+	// However, we need to validate the best strategy before going to Beta.
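+	//
+	// A size-weighted pick could look roughly like this (sketch only, not
+	// implemented; `allocators` stands for the top-level allocators):
+	//
+	//	total := int64(0)
+	//	for _, a := range allocators {
+	//		total += int64(a.size)
+	//	}
+	//	r := rand.Int63n(total)
+	//	for _, a := range allocators {
+	//		if r -= int64(a.size); r < 0 {
+	//			return a.AllocateNextService(service)
+	//		}
+	//	}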
+	isIPv6 := c.ipFamily == api.IPFamily(v1.IPv6Protocol)
+	for _, allocator := range c.tree.TopLevelPrefixes(isIPv6) {
+		ip, err := allocator.AllocateNextService(service)
+		if err == nil {
+			return ip, nil
+		}
+	}
+	return nil, ErrFull
+}
+
+func (c *MetaAllocator) AllocateNext() (net.IP, error) {
+	c.muTree.Lock()
+	defer c.muTree.Unlock()
+
+	// TODO(aojea) add a strategy to return a random allocator, taking into
+	// consideration the number of addresses of each allocator. For example,
+	// if we have allocators A and B with 256 and 1024 possible addresses
+	// each, the chances to get B have to be 4 times the chances to get A,
+	// so we can spread the load of IPs randomly.
+	// However, we need to validate the best strategy before going to Beta.
+	isIPv6 := c.ipFamily == api.IPFamily(v1.IPv6Protocol)
+	for _, allocator := range c.tree.TopLevelPrefixes(isIPv6) {
+		ip, err := allocator.AllocateNext()
+		if err == nil {
+			return ip, nil
+		}
+	}
+	return nil, ErrFull
+}
+
+func (c *MetaAllocator) Release(ip net.IP) error {
+	allocator, err := c.getAllocator(ip)
+	if err != nil {
+		return err
+	}
+	return allocator.Release(ip)
+}
+
+func (c *MetaAllocator) ForEach(f func(ip net.IP)) {
+	ipLabelSelector := labels.Set(map[string]string{
+		networkingv1alpha1.LabelIPAddressFamily: string(c.IPFamily()),
+		networkingv1alpha1.LabelManagedBy:       ControllerName,
+	}).AsSelectorPreValidated()
+	ips, err := c.ipAddressLister.List(ipLabelSelector)
+	if err != nil {
+		return
+	}
+	for _, ip := range ips {
+		f(netutils.ParseIPSloppy(ip.Name))
+	}
+}
+
+func (c *MetaAllocator) CIDR() net.IPNet {
+	return net.IPNet{}
+}
+
+func (c *MetaAllocator) IPFamily() api.IPFamily {
+	return c.ipFamily
+}
+
+func (c *MetaAllocator) Has(ip net.IP) bool {
+	allocator, err := c.getAllocator(ip)
+	if err != nil {
+		return false
+	}
+	return allocator.Has(ip)
+}
+
+func (c *MetaAllocator) Destroy() {
+	select {
+	case <-c.internalStopCh:
+	default:
+		close(c.internalStopCh)
+	}
+}
+
+// for testing
+func (c *MetaAllocator) Used() int {
+	ipLabelSelector := labels.Set(map[string]string{
+		networkingv1alpha1.LabelIPAddressFamily: string(c.IPFamily()),
+		networkingv1alpha1.LabelManagedBy:       ControllerName,
+	}).AsSelectorPreValidated()
+	ips, err := c.ipAddressLister.List(ipLabelSelector)
+	if err != nil {
+		return 0
+	}
+	return len(ips)
+}
+
+// for testing
+func (c *MetaAllocator) Free() int {
+	c.muTree.Lock()
+	defer c.muTree.Unlock()
+
+	size := 0
+	isIPv6 := c.ipFamily == api.IPFamily(v1.IPv6Protocol)
+	for _, allocator := range c.tree.TopLevelPrefixes(isIPv6) {
+		size += int(allocator.size)
+	}
+	return size - c.Used()
+}
+
+func (c *MetaAllocator) EnableMetrics() {}
+
+// DryRun returns an arbitrary allocator to satisfy dry-run requests
+func (c *MetaAllocator) DryRun() Interface {
+	c.muTree.Lock()
+	defer c.muTree.Unlock()
+	isIPv6 := c.ipFamily == api.IPFamily(v1.IPv6Protocol)
+	for _, allocator := range c.tree.TopLevelPrefixes(isIPv6) {
+		return allocator.DryRun()
+	}
+	return &Allocator{}
+}
+
+func isReady(serviceCIDR *networkingv1alpha1.ServiceCIDR) bool {
+	if serviceCIDR == nil {
+		return false
+	}
+
+	for _, condition := range serviceCIDR.Status.Conditions {
+		if condition.Type == networkingv1alpha1.ServiceCIDRConditionReady {
+			return condition.Status == metav1.ConditionStatus(metav1.ConditionTrue)
+		}
+	}
+	// if the condition is not found, assume it is ready, to handle scenarios
+	// where the kube-controller-manager is not running
+	return true
+}
+
+// ipToAddr converts a net.IP to a netip.Addr;
+// if the net.IP is not valid, it returns an empty netip.Addr{}
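+//
+// For example (illustrative):
+//
+//	ipToAddr(netutils.ParseIPSloppy("192.168.0.1")) // 4-byte 192.168.0.1, not ::ffff:192.168.0.1
+//	ipToAddr(net.IP{0x1})                           // invalid input, zero netip.Addr{}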
+func ipToAddr(ip net.IP) netip.Addr {
+	// https://pkg.go.dev/net/netip#AddrFromSlice can return an IPv4 in IPv6 format
+	// so we have to check the IP family to return exactly the format that we want
+	// address, _ := netip.AddrFromSlice(net.ParseIPSloppy("192.168.0.1")) returns
+	// an address like ::ffff:192.168.0.1/32
+	bytes := ip.To4()
+	if bytes == nil {
+		bytes = ip.To16()
+	}
+	// AddrFromSlice returns Addr{}, false if the input is invalid.
+	address, _ := netip.AddrFromSlice(bytes)
+	return address
+}
diff --git a/pkg/registry/core/service/ipallocator/cidrallocator_test.go b/pkg/registry/core/service/ipallocator/cidrallocator_test.go
new file mode 100644
index 00000000000..dca46828473
--- /dev/null
+++ b/pkg/registry/core/service/ipallocator/cidrallocator_test.go
@@ -0,0 +1,488 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package ipallocator
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+
+	networkingv1alpha1 "k8s.io/api/networking/v1alpha1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/informers"
+	"k8s.io/client-go/kubernetes/fake"
+	k8stesting "k8s.io/client-go/testing"
+	netutils "k8s.io/utils/net"
+)
+
+func newTestMetaAllocator() (*MetaAllocator, error) {
+	client := fake.NewSimpleClientset()
+
+	informerFactory := informers.NewSharedInformerFactory(client, 0*time.Second)
+	serviceCIDRInformer := informerFactory.Networking().V1alpha1().ServiceCIDRs()
+	serviceCIDRStore := serviceCIDRInformer.Informer().GetIndexer()
+	serviceCIDRInformer.Informer().HasSynced()
+	ipInformer := informerFactory.Networking().V1alpha1().IPAddresses()
+	ipStore := ipInformer.Informer().GetIndexer()
+
+	client.PrependReactor("create", "servicecidrs", k8stesting.ReactionFunc(func(action k8stesting.Action) (bool, runtime.Object, error) {
+		cidr := action.(k8stesting.CreateAction).GetObject().(*networkingv1alpha1.ServiceCIDR)
+		_, exists, err := serviceCIDRStore.GetByKey(cidr.Name)
+		if exists && err == nil {
+			return false, nil, fmt.Errorf("cidr already exists")
+		}
+		cidr.Generation = 1
+		err = serviceCIDRStore.Add(cidr)
+		return false, cidr, err
+	}))
+	client.PrependReactor("delete", "servicecidrs", k8stesting.ReactionFunc(func(action k8stesting.Action) (bool, runtime.Object, error) {
+		name := action.(k8stesting.DeleteAction).GetName()
+		obj, exists, err := serviceCIDRStore.GetByKey(name)
+		cidr := &networkingv1alpha1.ServiceCIDR{}
+		if exists && err == nil {
+			cidr = obj.(*networkingv1alpha1.ServiceCIDR)
+			err = serviceCIDRStore.Delete(cidr)
+		}
+		return false, cidr, err
+	}))
+
+	client.PrependReactor("create", "ipaddresses", k8stesting.ReactionFunc(func(action k8stesting.Action) (bool, runtime.Object, error) {
+		ip := action.(k8stesting.CreateAction).GetObject().(*networkingv1alpha1.IPAddress)
+		_, exists, err := ipStore.GetByKey(ip.Name)
+		if exists && err == nil {
+			return false, nil, fmt.Errorf("ip already exists")
+		}
exist") + } + ip.Generation = 1 + err = ipStore.Add(ip) + return false, ip, err + })) + client.PrependReactor("delete", "ipaddresses", k8stesting.ReactionFunc(func(action k8stesting.Action) (bool, runtime.Object, error) { + name := action.(k8stesting.DeleteAction).GetName() + obj, exists, err := ipStore.GetByKey(name) + ip := &networkingv1alpha1.IPAddress{} + if exists && err == nil { + ip = obj.(*networkingv1alpha1.IPAddress) + err = ipStore.Delete(ip) + } + return false, ip, err + })) + + c, err := NewMetaAllocator(client.NetworkingV1alpha1(), serviceCIDRInformer, ipInformer, false) + if err != nil { + return nil, err + } + // we can not force the state of the informers to be synced without racing + // so we run our worker here + go wait.Until(c.runWorker, time.Second, c.internalStopCh) + return c, nil +} + +func TestCIDRAllocateMultiple(t *testing.T) { + r, err := newTestMetaAllocator() + if err != nil { + t.Fatal(err) + } + defer r.Destroy() + + if f := r.Free(); f != 0 { + t.Errorf("free: %d", f) + } + if _, err := r.AllocateNext(); err == nil { + t.Error(err) + } + + cidr := newServiceCIDR("test", "192.168.0.0/28") + _, err = r.client.ServiceCIDRs().Create(context.Background(), cidr, metav1.CreateOptions{}) + if err != nil { + t.Fatal(err) + } + r.addServiceCIDR(cidr) + // wait for the cidr to be processed and set the informer synced + err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { + allocator, err := r.getAllocator(netutils.ParseIPSloppy("192.168.0.1")) + if err != nil { + t.Logf("unexpected error %v", err) + return false, nil + } + allocator.ipAddressSynced = func() bool { return true } + return allocator.ready.Load(), nil + }) + if err != nil { + t.Fatal(err) + } + found := sets.NewString() + count := 0 + for r.Free() > 0 { + ip, err := r.AllocateNext() + if err != nil { + t.Fatalf("error @ free: %d count: %d: %v", r.Free(), count, err) + } + count++ + if found.Has(ip.String()) { + t.Fatalf("allocated %s twice: %d", ip, count) + } + found.Insert(ip.String()) + } + if count != 14 { + t.Fatalf("expected 14 IPs got %d", count) + } + if _, err := r.AllocateNext(); err == nil { + t.Fatal(err) + } + + cidr2 := newServiceCIDR("test2", "10.0.0.0/28") + _, err = r.client.ServiceCIDRs().Create(context.Background(), cidr2, metav1.CreateOptions{}) + if err != nil { + t.Fatal(err) + } + r.addServiceCIDR(cidr2) + // wait for the cidr to be processed + err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { + allocator, err := r.getAllocator(netutils.ParseIPSloppy("10.0.0.11")) + if err != nil { + return false, nil + } + allocator.ipAddressSynced = func() bool { return true } + return allocator.ready.Load(), nil + }) + if err != nil { + t.Fatal(err) + } + // allocate one IP from the new allocator + err = r.Allocate(netutils.ParseIPSloppy("10.0.0.11")) + if err != nil { + t.Fatalf("error allocating IP 10.0.0.11 from new allocator: %v", err) + } + count++ + for r.Free() > 0 { + ip, err := r.AllocateNext() + if err != nil { + t.Fatalf("error @ free: %d count: %d: %v", r.Free(), count, err) + } + count++ + if found.Has(ip.String()) { + t.Fatalf("allocated %s twice: %d", ip, count) + } + found.Insert(ip.String()) + } + if count != 28 { + t.Fatalf("expected 28 IPs got %d", count) + } + if _, err := r.AllocateNext(); err == nil { + t.Fatal(err) + } + +} + +func TestCIDRAllocateShadow(t *testing.T) { + r, err := 
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer r.Destroy()
+
+	if f := r.Free(); f != 0 {
+		t.Errorf("free: %d", f)
+	}
+	if _, err := r.AllocateNext(); err == nil {
+		t.Error("expected error, got none")
+	}
+
+	cidr := newServiceCIDR("test", "192.168.1.0/24")
+	_, err = r.client.ServiceCIDRs().Create(context.Background(), cidr, metav1.CreateOptions{})
+	if err != nil {
+		t.Fatal(err)
+	}
+	r.addServiceCIDR(cidr)
+	// wait for the cidr to be processed and set the informer synced
+	err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
+		allocator, err := r.getAllocator(netutils.ParseIPSloppy("192.168.1.0"))
+		if err != nil {
+			return false, nil
+		}
+		allocator.ipAddressSynced = func() bool { return true }
+		return allocator.ready.Load(), nil
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	// the base address of the CIDR must not be allocatable
+	err = r.Allocate(netutils.ParseIPSloppy("192.168.1.0"))
+	if err == nil {
+		t.Fatalf("unexpected allocation for IP 192.168.1.0")
+	}
+
+	if f := r.Used(); f != 0 {
+		t.Errorf("used: %d", f)
+	}
+
+	cidr2 := newServiceCIDR("test2", "192.168.0.0/16")
+	_, err = r.client.ServiceCIDRs().Create(context.Background(), cidr2, metav1.CreateOptions{})
+	if err != nil {
+		t.Fatal(err)
+	}
+	r.addServiceCIDR(cidr2)
+	// wait for the cidr to be processed
+	err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
+		allocator, err := r.getAllocator(netutils.ParseIPSloppy("192.168.0.0"))
+		if err != nil {
+			return false, nil
+		}
+		allocator.ipAddressSynced = func() bool { return true }
+		return allocator.ready.Load(), nil
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	// 192.168.1.0 is allocatable now, since it is an inner address of the /16
+	err = r.Allocate(netutils.ParseIPSloppy("192.168.1.0"))
+	if err != nil {
+		t.Fatalf("error allocating IP 192.168.1.0 from new allocator: %v", err)
+	}
+
+	if f := r.Used(); f != 1 {
+		t.Errorf("used: %d", f)
+	}
+}
+
+func TestCIDRAllocateGrow(t *testing.T) {
+	r, err := newTestMetaAllocator()
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer r.Destroy()
+
+	if f := r.Free(); f != 0 {
+		t.Errorf("free: %d", f)
+	}
+	if _, err := r.AllocateNext(); err == nil {
+		t.Error("expected error, got none")
+	}
+
+	cidr := newServiceCIDR("test", "192.168.0.0/28")
+	_, err = r.client.ServiceCIDRs().Create(context.Background(), cidr, metav1.CreateOptions{})
+	if err != nil {
+		t.Fatal(err)
+	}
+	r.addServiceCIDR(cidr)
+	// wait for the cidr to be processed
+	err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
+		allocator, err := r.getAllocator(netutils.ParseIPSloppy("192.168.0.1"))
+		if err != nil {
+			return false, nil
+		}
+		allocator.ipAddressSynced = func() bool { return true }
+		return allocator.ready.Load(), nil
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	found := sets.NewString()
+	count := 0
+	for r.Free() > 0 {
+		ip, err := r.AllocateNext()
+		if err != nil {
+			t.Fatalf("error @ free: %d count: %d: %v", r.Free(), count, err)
+		}
+		count++
+		if found.Has(ip.String()) {
+			t.Fatalf("allocated %s twice: %d", ip, count)
+		}
+		found.Insert(ip.String())
+	}
+	if count != 14 {
+		t.Fatalf("expected 14 IPs got %d", count)
+	}
+	if _, err := r.AllocateNext(); err == nil {
+		t.Fatal("expected error, got none")
+	}
+
+	cidr2 := newServiceCIDR("test2", "192.168.0.0/24")
+	_, err = r.client.ServiceCIDRs().Create(context.Background(), cidr2, metav1.CreateOptions{})
+	if err != nil {
+		t.Fatal(err)
+	}
+	r.addServiceCIDR(cidr2)
+	// wait for the cidr to be processed
+	err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
+		allocator, err := r.getAllocator(netutils.ParseIPSloppy("192.168.0.253"))
+		if err != nil {
+			return false, nil
+		}
+		allocator.ipAddressSynced = func() bool { return true }
+		return allocator.ready.Load(), nil
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for r.Free() > 0 {
+		ip, err := r.AllocateNext()
+		if err != nil {
+			t.Fatalf("error @ free: %d count: %d: %v", r.Free(), count, err)
+		}
+		count++
+		if found.Has(ip.String()) {
+			t.Fatalf("allocated %s twice: %d", ip, count)
+		}
+		found.Insert(ip.String())
+	}
+	if count != 254 {
+		t.Fatalf("expected 254 IPs got %d", count)
+	}
+	if _, err := r.AllocateNext(); err == nil {
+		t.Fatal("expected error, got none")
+	}
+}
+
+func TestCIDRAllocateShrink(t *testing.T) {
+	r, err := newTestMetaAllocator()
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer r.Destroy()
+
+	if f := r.Free(); f != 0 {
+		t.Errorf("free: %d", f)
+	}
+	if _, err := r.AllocateNext(); err == nil {
+		t.Error("expected error, got none")
+	}
+
+	cidr := newServiceCIDR("test", "192.168.0.0/24")
+	_, err = r.client.ServiceCIDRs().Create(context.Background(), cidr, metav1.CreateOptions{})
+	if err != nil {
+		t.Fatal(err)
+	}
+	r.addServiceCIDR(cidr)
+	// wait for the cidr to be processed
+	err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
+		allocator, err := r.getAllocator(netutils.ParseIPSloppy("192.168.0.1"))
+		if err != nil {
+			return false, nil
+		}
+		allocator.ipAddressSynced = func() bool { return true }
+		return allocator.ready.Load(), nil
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	found := sets.NewString()
+	count := 0
+	for r.Free() > 0 {
+		ip, err := r.AllocateNext()
+		if err != nil {
+			t.Fatalf("error @ free: %d count: %d: %v", r.Free(), count, err)
+		}
+		count++
+		if found.Has(ip.String()) {
+			t.Fatalf("allocated %s twice: %d", ip, count)
+		}
+		found.Insert(ip.String())
+	}
+	if count != 254 {
+		t.Fatalf("expected 254 IPs got %d", count)
+	}
+	if _, err := r.AllocateNext(); err == nil {
+		t.Fatal("expected error, got none")
+	}
+	for _, ip := range found.List() {
+		err = r.Release(netutils.ParseIPSloppy(ip))
+		if err != nil {
+			t.Fatalf("unexpected error releasing IP %s: %v", ip, err)
+		}
+	}
+	if r.Used() > 0 {
+		t.Fatalf("expected allocator to be empty, got %d used", r.Used())
+	}
+	cidr2 := newServiceCIDR("cidr2", "192.168.0.0/28")
+	_, err = r.client.ServiceCIDRs().Create(context.Background(), cidr2, metav1.CreateOptions{})
+	if err != nil {
+		t.Fatal(err)
+	}
+	r.addServiceCIDR(cidr2)
+	err = r.client.ServiceCIDRs().Delete(context.Background(), cidr.Name, metav1.DeleteOptions{})
+	if err != nil {
+		t.Fatal(err)
+	}
+	r.deleteServiceCIDR(cidr)
+
+	// wait for the cidr to be processed (delete ServiceCIDR)
+	err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
+		_, err := r.getAllocator(netutils.ParseIPSloppy("192.168.0.253"))
+		if err != nil {
+			return true, nil
+		}
+
+		return false, nil
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	// wait for the cidr to be processed (create ServiceCIDR)
+	err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
+		allocator, err := r.getAllocator(netutils.ParseIPSloppy("192.168.0.1"))
+		if err != nil {
+			return false, nil
+		}
+		allocator.ipAddressSynced = func() bool { return true }
+		return allocator.ready.Load(), nil
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	count = 0
+	for r.Free() > 0 {
+		_, err := r.AllocateNext()
+		if err != nil {
+			t.Fatalf("error @ free: %d count: %d: %v", r.Free(), count, err)
+		}
+		count++
+	}
+	if count != 14 {
+		t.Fatalf("expected 14 IPs got %d", count)
+	}
+	if _, err := r.AllocateNext(); err == nil {
+		t.Fatal("expected error, got none")
+	}
+}
+
+func newServiceCIDR(name, cidrV4 string) *networkingv1alpha1.ServiceCIDR {
+	return &networkingv1alpha1.ServiceCIDR{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: name,
+		},
+		Spec: networkingv1alpha1.ServiceCIDRSpec{
+			IPv4: cidrV4,
+		},
+		Status: networkingv1alpha1.ServiceCIDRStatus{
+			Conditions: []metav1.Condition{
+				{
+					Type:   string(networkingv1alpha1.ServiceCIDRConditionReady),
+					Status: metav1.ConditionTrue,
+				},
+			},
+		},
+	}
+}
diff --git a/pkg/registry/core/service/ipallocator/interfaces.go b/pkg/registry/core/service/ipallocator/interfaces.go
index 0319b315e45..2bf29802538 100644
--- a/pkg/registry/core/service/ipallocator/interfaces.go
+++ b/pkg/registry/core/service/ipallocator/interfaces.go
@@ -45,6 +45,7 @@ var (
 	ErrFull              = errors.New("range is full")
 	ErrAllocated         = errors.New("provided IP is already allocated")
 	ErrMismatchedNetwork = errors.New("the provided network does not match the current range")
+	ErrNotReady          = errors.New("allocator not ready")
 )
 
 type ErrNotInRange struct {
diff --git a/pkg/registry/core/service/ipallocator/ipallocator.go b/pkg/registry/core/service/ipallocator/ipallocator.go
index b5e49bd37a4..1f90c992b92 100644
--- a/pkg/registry/core/service/ipallocator/ipallocator.go
+++ b/pkg/registry/core/service/ipallocator/ipallocator.go
@@ -24,6 +24,7 @@ import (
 	"math/rand"
 	"net"
 	"net/netip"
+	"sync/atomic"
 	"time"
 
 	networkingv1alpha1 "k8s.io/api/networking/v1alpha1"
@@ -58,6 +59,9 @@ type Allocator struct {
 	client          networkingv1alpha1client.NetworkingV1alpha1Interface
 	ipAddressLister networkingv1alpha1listers.IPAddressLister
 	ipAddressSynced cache.InformerSynced
+	// ready indicates if the allocator is able to allocate new IP addresses.
+	// This is required because it depends on the ServiceCIDR to be ready.
+	ready atomic.Bool
 
 	// metrics is a metrics recorder that can be disabled
 	metrics metricsRecorderInterface
@@ -133,7 +137,7 @@ func NewIPAllocator(
 		metricLabel: cidr.String(),
 		rand:        rand.New(rand.NewSource(time.Now().UnixNano())),
 	}
-
+	a.ready.Store(true)
 	return a, nil
 }
 
@@ -185,8 +189,8 @@ func (a *Allocator) AllocateService(svc *api.Service, ip net.IP) error {
 }
 
 func (a *Allocator) allocateService(svc *api.Service, ip net.IP, dryRun bool) error {
-	if !a.ipAddressSynced() {
-		return fmt.Errorf("allocator not ready")
+	if !a.ready.Load() || !a.ipAddressSynced() {
+		return ErrNotReady
 	}
 	addr, err := netip.ParseAddr(ip.String())
 	if err != nil {
@@ -227,8 +231,8 @@ func (a *Allocator) AllocateNextService(svc *api.Service) (net.IP, error) {
 // falls back to the lower subnet.
 // It starts allocating from a random IP within each range.
 func (a *Allocator) allocateNextService(svc *api.Service, dryRun bool) (net.IP, error) {
-	if !a.ipAddressSynced() {
-		return nil, fmt.Errorf("allocator not ready")
+	if !a.ready.Load() || !a.ipAddressSynced() {
+		return nil, ErrNotReady
 	}
 	if dryRun {
 		// Don't bother finding a free value. It's racy and not worth the
@@ -348,9 +352,6 @@ func (a *Allocator) Release(ip net.IP) error {
 }
 
 func (a *Allocator) release(ip net.IP, dryRun bool) error {
-	if !a.ipAddressSynced() {
-		return fmt.Errorf("allocator not ready")
-	}
 	if dryRun {
 		return nil
 	}
@@ -403,7 +404,7 @@ func (a *Allocator) IPFamily() api.IPFamily {
 	return a.family
 }
 
-// for testing
+// for testing, it assumes this allocator is unique for the ipFamily
 func (a *Allocator) Used() int {
 	ipLabelSelector := labels.Set(map[string]string{
 		networkingv1alpha1.LabelIPAddressFamily: string(a.IPFamily()),
@@ -416,7 +417,7 @@ func (a *Allocator) Used() int {
 	return len(ips)
 }
 
-// for testing
+// for testing, it assumes this allocator is unique for the ipFamily
 func (a *Allocator) Free() int {
 	return int(a.size) - a.Used()
 }