expand IPAllocator to work with multiple ServiceCIDRs

Create a new allocator that uses the ServiceCIDRs configured in the
system to create IPAllocators.

The MetaAllocator will create one IPAllocator per parent ServiceCIDR;
since we allow overlapping ServiceCIDRs, there is no need to have an
allocator per ServiceCIDR.
The benefit of the IPAllocator is that it uses the informer cache as
storage, so it does not need to keep its own cache and acts only as a
logical abstraction. This allows creating and deleting IPAllocators
without any penalty.

IPAllocators can allocate IP addresses only if they are ready (that is,
not being deleted).

Change-Id: I3fdda69991907c39cca3120fe2d850f14dcccec2
Antonio Ojea 2023-03-13 22:25:43 +00:00
parent d3386e171a
commit 63fe539b4e
4 changed files with 954 additions and 10 deletions
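For orientation, here is a minimal sketch of how a caller could wire up the
new MetaAllocator from a shared informer factory. The factory wiring is
illustrative; only the NewMetaAllocator signature and the Destroy and
AllocateNext methods come from this change:

	informerFactory := informers.NewSharedInformerFactory(client, 0)
	alloc, err := ipallocator.NewMetaAllocator(
		client.NetworkingV1alpha1(),
		informerFactory.Networking().V1alpha1().ServiceCIDRs(),
		informerFactory.Networking().V1alpha1().IPAddresses(),
		false, // isIPv6
	)
	if err != nil {
		return err
	}
	defer alloc.Destroy()
	ip, err := alloc.AllocateNext() // served by any ready ServiceCIDR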


@@ -0,0 +1,454 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ipallocator
import (
"fmt"
"net"
"net/netip"
"sync"
"time"
v1 "k8s.io/api/core/v1"
networkingv1alpha1 "k8s.io/api/networking/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
networkingv1alpha1informers "k8s.io/client-go/informers/networking/v1alpha1"
networkingv1alpha1client "k8s.io/client-go/kubernetes/typed/networking/v1alpha1"
networkingv1alpha1listers "k8s.io/client-go/listers/networking/v1alpha1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/util/iptree"
netutils "k8s.io/utils/net"
)
// MetaAllocator maintains a tree of ServiceCIDRs with an IP Allocator on each
// node. Since the allocators don't store the IPAddresses themselves, reading
// them from the informer cache instead, they are cheap to create and delete.
// MetaAllocator forwards each request to any of the internal allocators that
// has free addresses.
// MetaAllocator implements the current allocator interface using the
// ServiceCIDR and IPAddress API objects.
type MetaAllocator struct {
client networkingv1alpha1client.NetworkingV1alpha1Interface
serviceCIDRLister networkingv1alpha1listers.ServiceCIDRLister
serviceCIDRSynced cache.InformerSynced
ipAddressLister networkingv1alpha1listers.IPAddressLister
ipAddressSynced cache.InformerSynced
ipAddressInformer networkingv1alpha1informers.IPAddressInformer
queue workqueue.RateLimitingInterface
internalStopCh chan struct{}
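// muTree protects tree: the sync loop mutates the tree while the
// allocation methods read it concurrently.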
muTree sync.Mutex
tree *iptree.Tree[*Allocator]
ipFamily api.IPFamily
}
var _ Interface = &MetaAllocator{}
// NewMetaAllocator returns an IP allocator associated with a network range
// that uses the IPAddress and ServiceCIDR objects to track the assigned IP
// addresses, using the informer cache as storage.
func NewMetaAllocator(
client networkingv1alpha1client.NetworkingV1alpha1Interface,
serviceCIDRInformer networkingv1alpha1informers.ServiceCIDRInformer,
ipAddressInformer networkingv1alpha1informers.IPAddressInformer,
isIPv6 bool,
) (*MetaAllocator, error) {
family := api.IPv4Protocol
if isIPv6 {
family = api.IPv6Protocol
}
c := &MetaAllocator{
client: client,
serviceCIDRLister: serviceCIDRInformer.Lister(),
serviceCIDRSynced: serviceCIDRInformer.Informer().HasSynced,
ipAddressLister: ipAddressInformer.Lister(),
ipAddressSynced: ipAddressInformer.Informer().HasSynced,
ipAddressInformer: ipAddressInformer,
queue: workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{Name: ControllerName}),
internalStopCh: make(chan struct{}),
tree: iptree.New[*Allocator](),
ipFamily: family,
}
_, _ = serviceCIDRInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.addServiceCIDR,
UpdateFunc: c.updateServiceCIDR,
DeleteFunc: c.deleteServiceCIDR,
})
go c.run()
return c, nil
}
func (c *MetaAllocator) addServiceCIDR(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
c.queue.Add(key)
}
}
func (c *MetaAllocator) updateServiceCIDR(old, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
if err == nil {
c.queue.Add(key)
}
}
func (c *MetaAllocator) deleteServiceCIDR(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
c.queue.Add(key)
}
}
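// run starts an internal controller that keeps the tree of allocators in
// sync with the ServiceCIDR informer.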
func (c *MetaAllocator) run() {
defer runtime.HandleCrash()
defer c.queue.ShutDown()
klog.Info("Starting ServiceCIDR Allocator Controller")
defer klog.Info("Stopping ServiceCIDR Allocator Controller")
// Wait for all involved caches to be synced, before processing items from the queue is started
if !cache.WaitForCacheSync(c.internalStopCh, c.serviceCIDRSynced, c.ipAddressSynced) {
runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
return
}
// this is single-threaded; only one ServiceCIDR is processed at a time
go wait.Until(c.runWorker, time.Second, c.internalStopCh)
<-c.internalStopCh
}
func (c *MetaAllocator) runWorker() {
for c.processNextItem() {
}
}
func (c *MetaAllocator) processNextItem() bool {
// Wait until there is a new item in the working queue
key, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(key)
err := c.syncTree()
// Handle the error if something went wrong during the execution of the business logic
if err != nil {
if c.queue.NumRequeues(key) < 5 {
klog.Infof("Error syncing cidr %v: %v", key, err)
c.queue.AddRateLimited(key)
return true
}
}
c.queue.Forget(key)
return true
}
// syncTree syncs the ipTree from the informer cache.
// It creates or deletes allocators and sets their ready state.
func (c *MetaAllocator) syncTree() error {
now := time.Now()
defer func() {
klog.Infof("Finished sync for CIDRs, took %v", time.Since(now))
}()
serviceCIDRs, err := c.serviceCIDRLister.List(labels.Everything())
if err != nil {
return err
}
cidrsSet := sets.New[string]()
cidrState := map[string]bool{}
for _, cidr := range serviceCIDRs {
ready := true
if !isReady(cidr) || !cidr.DeletionTimestamp.IsZero() {
ready = false
}
if c.ipFamily == api.IPv4Protocol && cidr.Spec.IPv4 != "" {
cidrsSet.Insert(cidr.Spec.IPv4)
cidrState[cidr.Spec.IPv4] = ready
}
if c.ipFamily == api.IPv6Protocol && cidr.Spec.IPv6 != "" {
cidrsSet.Insert(cidr.Spec.IPv6)
cidrState[cidr.Spec.IPv6] = ready
}
}
// obtain the existing allocators and update their ready state
treeSet := sets.New[string]()
c.muTree.Lock()
c.tree.DepthFirstWalk(c.ipFamily == api.IPv6Protocol, func(k netip.Prefix, v *Allocator) bool {
v.ready.Store(cidrState[k.String()])
treeSet.Insert(k.String())
return false
})
c.muTree.Unlock()
cidrsToRemove := treeSet.Difference(cidrsSet)
cidrsToAdd := cidrsSet.Difference(treeSet)
errs := []error{}
// Add new allocators
for _, cidr := range cidrsToAdd.UnsortedList() {
_, ipnet, err := netutils.ParseCIDRSloppy(cidr)
if err != nil {
return err
}
// New ServiceCIDR, create new allocator
allocator, err := NewIPAllocator(ipnet, c.client, c.ipAddressInformer)
if err != nil {
errs = append(errs, err)
continue
}
allocator.ready.Store(cidrState[cidr])
prefix, err := netip.ParsePrefix(cidr)
if err != nil {
return err
}
c.addAllocator(prefix, allocator)
klog.Infof("Created ClusterIP allocator for Service CIDR %s", cidr)
}
// Remove allocators that no longer exist
for _, cidr := range cidrsToRemove.UnsortedList() {
prefix, err := netip.ParsePrefix(cidr)
if err != nil {
return err
}
c.deleteAllocator(prefix)
}
return utilerrors.NewAggregate(errs)
}
func (c *MetaAllocator) getAllocator(ip net.IP) (*Allocator, error) {
c.muTree.Lock()
defer c.muTree.Unlock()
var prefix netip.Prefix
if c.ipFamily == api.IPv4Protocol {
prefix = netip.PrefixFrom(ipToAddr(ip), 32)
} else {
prefix = netip.PrefixFrom(ipToAddr(ip), 128)
}
// Use the largest subnet (shortest prefix) containing the IP to allocate
// addresses, because all the more specific subnets are contained within it.
_, allocator, ok := c.tree.ShortestPrefixMatch(prefix)
if !ok {
klog.V(2).Infof("Could not get allocator for IP %s", ip.String())
return nil, ErrMismatchedNetwork
}
return allocator, nil
}
func (c *MetaAllocator) addAllocator(cidr netip.Prefix, allocator *Allocator) {
c.muTree.Lock()
defer c.muTree.Unlock()
c.tree.InsertPrefix(cidr, allocator)
}
func (c *MetaAllocator) deleteAllocator(cidr netip.Prefix) {
c.muTree.Lock()
defer c.muTree.Unlock()
ok := c.tree.DeletePrefix(cidr)
if ok {
klog.V(3).Infof("CIDR %s deleted", cidr)
}
}
func (c *MetaAllocator) AllocateService(service *api.Service, ip net.IP) error {
allocator, err := c.getAllocator(ip)
if err != nil {
return err
}
return allocator.AllocateService(service, ip)
}
func (c *MetaAllocator) Allocate(ip net.IP) error {
allocator, err := c.getAllocator(ip)
if err != nil {
return err
}
return allocator.Allocate(ip)
}
func (c *MetaAllocator) AllocateNextService(service *api.Service) (net.IP, error) {
c.muTree.Lock()
defer c.muTree.Unlock()
// TODO(aojea) add a strategy to return a random allocator, taking into
// consideration the number of addresses of each allocator. For example,
// if we have allocators A and B with 256 and 1024 possible addresses each,
// the chance of getting B has to be 4 times the chance of getting A, so we
// can spread the load of IPs randomly.
// However, we need to validate the best strategy before going to Beta.
isIPv6 := c.ipFamily == api.IPFamily(v1.IPv6Protocol)
for _, allocator := range c.tree.TopLevelPrefixes(isIPv6) {
ip, err := allocator.AllocateNextService(service)
if err == nil {
return ip, nil
}
}
return nil, ErrFull
}
func (c *MetaAllocator) AllocateNext() (net.IP, error) {
c.muTree.Lock()
defer c.muTree.Unlock()
// TODO(aojea) add a strategy to return a random allocator, taking into
// consideration the number of addresses of each allocator. For example,
// if we have allocators A and B with 256 and 1024 possible addresses each,
// the chance of getting B has to be 4 times the chance of getting A, so we
// can spread the load of IPs randomly.
// However, we need to validate the best strategy before going to Beta.
isIPv6 := c.ipFamily == api.IPFamily(v1.IPv6Protocol)
for _, allocator := range c.tree.TopLevelPrefixes(isIPv6) {
ip, err := allocator.AllocateNext()
if err == nil {
return ip, nil
}
}
return nil, ErrFull
}
func (c *MetaAllocator) Release(ip net.IP) error {
allocator, err := c.getAllocator(ip)
if err != nil {
return err
}
return allocator.Release(ip)
}
func (c *MetaAllocator) ForEach(f func(ip net.IP)) {
ipLabelSelector := labels.Set(map[string]string{
networkingv1alpha1.LabelIPAddressFamily: string(c.IPFamily()),
networkingv1alpha1.LabelManagedBy: ControllerName,
}).AsSelectorPreValidated()
ips, err := c.ipAddressLister.List(ipLabelSelector)
if err != nil {
return
}
for _, ip := range ips {
f(netutils.ParseIPSloppy(ip.Name))
}
}
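// CIDR returns an empty net.IPNet: the MetaAllocator can span multiple
// ServiceCIDRs, so there is no single range to report.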
func (c *MetaAllocator) CIDR() net.IPNet {
return net.IPNet{}
}
func (c *MetaAllocator) IPFamily() api.IPFamily {
return c.ipFamily
}
func (c *MetaAllocator) Has(ip net.IP) bool {
allocator, err := c.getAllocator(ip)
if err != nil {
return false
}
return allocator.Has(ip)
}
func (c *MetaAllocator) Destroy() {
select {
case <-c.internalStopCh:
default:
close(c.internalStopCh)
}
}
// for testing
func (c *MetaAllocator) Used() int {
ipLabelSelector := labels.Set(map[string]string{
networkingv1alpha1.LabelIPAddressFamily: string(c.IPFamily()),
networkingv1alpha1.LabelManagedBy: ControllerName,
}).AsSelectorPreValidated()
ips, err := c.ipAddressLister.List(ipLabelSelector)
if err != nil {
return 0
}
return len(ips)
}
// for testing
func (c *MetaAllocator) Free() int {
c.muTree.Lock()
defer c.muTree.Unlock()
size := 0
isIPv6 := c.ipFamily == api.IPFamily(v1.IPv6Protocol)
for _, allocator := range c.tree.TopLevelPrefixes(isIPv6) {
size += int(allocator.size)
}
return size - c.Used()
}
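// EnableMetrics is a no-op for the MetaAllocator.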
func (c *MetaAllocator) EnableMetrics() {}
// DryRun returns a dry-run version of one of the top-level allocators
func (c *MetaAllocator) DryRun() Interface {
c.muTree.Lock()
defer c.muTree.Unlock()
isIPv6 := c.ipFamily == api.IPFamily(v1.IPv6Protocol)
for _, allocator := range c.tree.TopLevelPrefixes(isIPv6) {
return allocator.DryRun()
}
return &Allocator{}
}
func isReady(serviceCIDR *networkingv1alpha1.ServiceCIDR) bool {
if serviceCIDR == nil {
return false
}
for _, condition := range serviceCIDR.Status.Conditions {
if condition.Type == networkingv1alpha1.ServiceCIDRConditionReady {
return condition.Status == metav1.ConditionStatus(metav1.ConditionTrue)
}
}
// if the condition is not found, assume the CIDR is ready, to handle
// scenarios where the kube-controller-manager is not running
return true
}
// ipToAddr converts a net.IP to a netip.Addr
// if the net.IP is not valid it returns an empty netip.Addr{}
func ipToAddr(ip net.IP) netip.Addr {
// https://pkg.go.dev/net/netip#AddrFromSlice can return an IPv4 address in
// IPv6 format, so we have to check the IP family to return exactly the
// format that we want:
// address, _ := netip.AddrFromSlice(net.ParseIPSloppy("192.168.0.1")) returns
// an address like ::ffff:192.168.0.1
bytes := ip.To4()
if bytes == nil {
bytes = ip.To16()
}
// AddrFromSlice returns Addr{}, false if the input is invalid.
address, _ := netip.AddrFromSlice(bytes)
return address
}
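To illustrate why overlapping ServiceCIDRs do not need their own allocators:
the lookup in getAllocator resolves an IP to the largest (shortest-prefix)
subnet that contains it. A minimal sketch, assuming the iptree API exactly as
it is used above:

	tree := iptree.New[string]()
	tree.InsertPrefix(netip.MustParsePrefix("192.168.0.0/16"), "parent")
	tree.InsertPrefix(netip.MustParsePrefix("192.168.1.0/24"), "child")
	// An address inside the /24 still matches the parent /16, so the
	// parent's allocator serves it and the /24 needs no allocator of its own.
	ip := netip.PrefixFrom(netip.MustParseAddr("192.168.1.5"), 32)
	if prefix, v, ok := tree.ShortestPrefixMatch(ip); ok {
		fmt.Println(prefix, v) // 192.168.0.0/16 parent
	}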


@@ -0,0 +1,488 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ipallocator
import (
"context"
"fmt"
"testing"
"time"
networkingv1alpha1 "k8s.io/api/networking/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
k8stesting "k8s.io/client-go/testing"
netutils "k8s.io/utils/net"
)
func newTestMetaAllocator() (*MetaAllocator, error) {
client := fake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, 0*time.Second)
serviceCIDRInformer := informerFactory.Networking().V1alpha1().ServiceCIDRs()
serviceCIDRStore := serviceCIDRInformer.Informer().GetIndexer()
serviceCIDRInformer.Informer().HasSynced()
ipInformer := informerFactory.Networking().V1alpha1().IPAddresses()
ipStore := ipInformer.Informer().GetIndexer()
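// Prepend reactors that write directly to the informer stores, so objects
// created or deleted through the fake client are immediately visible to
// the listers without running the informers.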
client.PrependReactor("create", "servicecidrs", k8stesting.ReactionFunc(func(action k8stesting.Action) (bool, runtime.Object, error) {
cidr := action.(k8stesting.CreateAction).GetObject().(*networkingv1alpha1.ServiceCIDR)
_, exists, err := serviceCIDRStore.GetByKey(cidr.Name)
if exists && err == nil {
return false, nil, fmt.Errorf("cidr already exists")
}
cidr.Generation = 1
err = serviceCIDRStore.Add(cidr)
return false, cidr, err
}))
client.PrependReactor("delete", "servicecidrs", k8stesting.ReactionFunc(func(action k8stesting.Action) (bool, runtime.Object, error) {
name := action.(k8stesting.DeleteAction).GetName()
obj, exists, err := serviceCIDRStore.GetByKey(name)
cidr := &networkingv1alpha1.ServiceCIDR{}
if exists && err == nil {
cidr = obj.(*networkingv1alpha1.ServiceCIDR)
err = serviceCIDRStore.Delete(cidr)
}
return false, cidr, err
}))
client.PrependReactor("create", "ipaddresses", k8stesting.ReactionFunc(func(action k8stesting.Action) (bool, runtime.Object, error) {
ip := action.(k8stesting.CreateAction).GetObject().(*networkingv1alpha1.IPAddress)
_, exists, err := ipStore.GetByKey(ip.Name)
if exists && err == nil {
return false, nil, fmt.Errorf("ip already exists")
}
ip.Generation = 1
err = ipStore.Add(ip)
return false, ip, err
}))
client.PrependReactor("delete", "ipaddresses", k8stesting.ReactionFunc(func(action k8stesting.Action) (bool, runtime.Object, error) {
name := action.(k8stesting.DeleteAction).GetName()
obj, exists, err := ipStore.GetByKey(name)
ip := &networkingv1alpha1.IPAddress{}
if exists && err == nil {
ip = obj.(*networkingv1alpha1.IPAddress)
err = ipStore.Delete(ip)
}
return false, ip, err
}))
c, err := NewMetaAllocator(client.NetworkingV1alpha1(), serviceCIDRInformer, ipInformer, false)
if err != nil {
return nil, err
}
// we cannot force the informers into a synced state without racing,
// so we run our worker here
go wait.Until(c.runWorker, time.Second, c.internalStopCh)
return c, nil
}
func TestCIDRAllocateMultiple(t *testing.T) {
r, err := newTestMetaAllocator()
if err != nil {
t.Fatal(err)
}
defer r.Destroy()
if f := r.Free(); f != 0 {
t.Errorf("free: %d", f)
}
if _, err := r.AllocateNext(); err == nil {
t.Error(err)
}
cidr := newServiceCIDR("test", "192.168.0.0/28")
_, err = r.client.ServiceCIDRs().Create(context.Background(), cidr, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}
r.addServiceCIDR(cidr)
// wait for the cidr to be processed and set the informer synced
err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
allocator, err := r.getAllocator(netutils.ParseIPSloppy("192.168.0.1"))
if err != nil {
t.Logf("unexpected error %v", err)
return false, nil
}
allocator.ipAddressSynced = func() bool { return true }
return allocator.ready.Load(), nil
})
if err != nil {
t.Fatal(err)
}
found := sets.NewString()
count := 0
for r.Free() > 0 {
ip, err := r.AllocateNext()
if err != nil {
t.Fatalf("error @ free: %d count: %d: %v", r.Free(), count, err)
}
count++
if found.Has(ip.String()) {
t.Fatalf("allocated %s twice: %d", ip, count)
}
found.Insert(ip.String())
}
if count != 14 {
t.Fatalf("expected 14 IPs got %d", count)
}
if _, err := r.AllocateNext(); err == nil {
t.Fatal(err)
}
cidr2 := newServiceCIDR("test2", "10.0.0.0/28")
_, err = r.client.ServiceCIDRs().Create(context.Background(), cidr2, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}
r.addServiceCIDR(cidr2)
// wait for the cidr to be processed
err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
allocator, err := r.getAllocator(netutils.ParseIPSloppy("10.0.0.11"))
if err != nil {
return false, nil
}
allocator.ipAddressSynced = func() bool { return true }
return allocator.ready.Load(), nil
})
if err != nil {
t.Fatal(err)
}
// allocate one IP from the new allocator
err = r.Allocate(netutils.ParseIPSloppy("10.0.0.11"))
if err != nil {
t.Fatalf("error allocating IP 10.0.0.11 from new allocator: %v", err)
}
count++
for r.Free() > 0 {
ip, err := r.AllocateNext()
if err != nil {
t.Fatalf("error @ free: %d count: %d: %v", r.Free(), count, err)
}
count++
if found.Has(ip.String()) {
t.Fatalf("allocated %s twice: %d", ip, count)
}
found.Insert(ip.String())
}
if count != 28 {
t.Fatalf("expected 28 IPs got %d", count)
}
if _, err := r.AllocateNext(); err == nil {
t.Fatal(err)
}
}
func TestCIDRAllocateShadow(t *testing.T) {
r, err := newTestMetaAllocator()
if err != nil {
t.Fatal(err)
}
defer r.Destroy()
if f := r.Free(); f != 0 {
t.Errorf("free: %d", f)
}
if _, err := r.AllocateNext(); err == nil {
t.Error(err)
}
cidr := newServiceCIDR("test", "192.168.1.0/24")
_, err = r.client.ServiceCIDRs().Create(context.Background(), cidr, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}
r.addServiceCIDR(cidr)
// wait for the cidr to be processed and set the informer synced
err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
allocator, err := r.getAllocator(netutils.ParseIPSloppy("192.168.1.0"))
if err != nil {
return false, nil
}
allocator.ipAddressSynced = func() bool { return true }
return allocator.ready.Load(), nil
})
if err != nil {
t.Fatal(err)
}
// allocate one IP from the new allocator
err = r.Allocate(netutils.ParseIPSloppy("192.168.1.0"))
if err == nil {
t.Fatalf("unexpected allocation for IP 192.168.1.0")
}
if f := r.Used(); f != 0 {
t.Errorf("used: %d", f)
}
cidr2 := newServiceCIDR("test2", "192.168.0.0/16")
_, err = r.client.ServiceCIDRs().Create(context.Background(), cidr2, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}
r.addServiceCIDR(cidr2)
// wait for the cidr to be processed
err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
allocator, err := r.getAllocator(netutils.ParseIPSloppy("192.168.0.0"))
if err != nil {
return false, nil
}
allocator.ipAddressSynced = func() bool { return true }
return allocator.ready.Load(), nil
})
if err != nil {
t.Fatal(err)
}
// allocate one IP from the new allocator
err = r.Allocate(netutils.ParseIPSloppy("192.168.1.0"))
if err != nil {
t.Fatalf("error allocating IP 192.168.1.0 from new allocator: %v", err)
}
if f := r.Used(); f != 1 {
t.Errorf("used: %d", f)
}
}
func TestCIDRAllocateGrow(t *testing.T) {
r, err := newTestMetaAllocator()
if err != nil {
t.Fatal(err)
}
defer r.Destroy()
if f := r.Free(); f != 0 {
t.Errorf("free: %d", f)
}
if _, err := r.AllocateNext(); err == nil {
t.Error(err)
}
cidr := newServiceCIDR("test", "192.168.0.0/28")
_, err = r.client.ServiceCIDRs().Create(context.Background(), cidr, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}
r.addServiceCIDR(cidr)
// wait for the cidr to be processed
err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
allocator, err := r.getAllocator(netutils.ParseIPSloppy("192.168.0.1"))
if err != nil {
return false, nil
}
allocator.ipAddressSynced = func() bool { return true }
return allocator.ready.Load(), nil
})
if err != nil {
t.Fatal(err)
}
found := sets.NewString()
count := 0
for r.Free() > 0 {
ip, err := r.AllocateNext()
if err != nil {
t.Fatalf("error @ free: %d count: %d: %v", r.Free(), count, err)
}
count++
if found.Has(ip.String()) {
t.Fatalf("allocated %s twice: %d", ip, count)
}
found.Insert(ip.String())
}
if count != 14 {
t.Fatalf("expected 14 IPs got %d", count)
}
if _, err := r.AllocateNext(); err == nil {
t.Fatal(err)
}
cidr2 := newServiceCIDR("test2", "192.168.0.0/24")
_, err = r.client.ServiceCIDRs().Create(context.Background(), cidr2, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}
r.addServiceCIDR(cidr2)
// wait for the cidr to be processed
err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
allocator, err := r.getAllocator(netutils.ParseIPSloppy("192.168.0.253"))
if err != nil {
return false, nil
}
allocator.ipAddressSynced = func() bool { return true }
return allocator.ready.Load(), nil
})
if err != nil {
t.Fatal(err)
}
for r.Free() > 0 {
ip, err := r.AllocateNext()
if err != nil {
t.Fatalf("error @ free: %d count: %d: %v", r.Free(), count, err)
}
count++
if found.Has(ip.String()) {
t.Fatalf("allocated %s twice: %d", ip, count)
}
found.Insert(ip.String())
}
if count != 254 {
t.Fatalf("expected 254 IPs got %d", count)
}
if _, err := r.AllocateNext(); err == nil {
t.Fatal(err)
}
}
func TestCIDRAllocateShrink(t *testing.T) {
r, err := newTestMetaAllocator()
if err != nil {
t.Fatal(err)
}
defer r.Destroy()
if f := r.Free(); f != 0 {
t.Errorf("free: %d", f)
}
if _, err := r.AllocateNext(); err == nil {
t.Error(err)
}
cidr := newServiceCIDR("test", "192.168.0.0/24")
_, err = r.client.ServiceCIDRs().Create(context.Background(), cidr, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}
r.addServiceCIDR(cidr)
// wait for the cidr to be processed
err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
allocator, err := r.getAllocator(netutils.ParseIPSloppy("192.168.0.1"))
if err != nil {
return false, nil
}
allocator.ipAddressSynced = func() bool { return true }
return allocator.ready.Load(), nil
})
if err != nil {
t.Fatal(err)
}
found := sets.NewString()
count := 0
for r.Free() > 0 {
ip, err := r.AllocateNext()
if err != nil {
t.Fatalf("error @ free: %d count: %d: %v", r.Free(), count, err)
}
count++
if found.Has(ip.String()) {
t.Fatalf("allocated %s twice: %d", ip, count)
}
found.Insert(ip.String())
}
if count != 254 {
t.Fatalf("expected 254 IPs got %d", count)
}
if _, err := r.AllocateNext(); err == nil {
t.Fatal(err)
}
for _, ip := range found.List() {
err = r.Release(netutils.ParseIPSloppy(ip))
if err != nil {
t.Fatalf("unexpected error releasing ip %s: %v", ip, err)
}
}
if r.Used() > 0 {
t.Fatalf("expected allocator to be empty, got %d used addresses", r.Used())
}
cidr2 := newServiceCIDR("cidr2", "192.168.0.0/28")
_, err = r.client.ServiceCIDRs().Create(context.Background(), cidr2, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)
}
r.addServiceCIDR(cidr2)
err = r.client.ServiceCIDRs().Delete(context.Background(), cidr.Name, metav1.DeleteOptions{})
if err != nil {
t.Fatal(err)
}
r.deleteServiceCIDR(cidr)
// wait for the cidr to be processed (delete ServiceCIDR)
err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
_, err := r.getAllocator(netutils.ParseIPSloppy("192.168.0.253"))
if err != nil {
return true, nil
}
return false, nil
})
if err != nil {
t.Fatal(err)
}
// wait for the cidr to be processed (create ServiceCIDR)
err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
allocator, err := r.getAllocator(netutils.ParseIPSloppy("192.168.0.1"))
if err != nil {
return false, nil
}
allocator.ipAddressSynced = func() bool { return true }
return allocator.ready.Load(), nil
})
if err != nil {
t.Fatal(err)
}
count = 0
for r.Free() > 0 {
_, err := r.AllocateNext()
if err != nil {
t.Fatalf("error @ free: %d count: %d: %v", r.Free(), count, err)
}
count++
}
if count != 14 {
t.Fatalf("expected 14 IPs got %d", count)
}
if _, err := r.AllocateNext(); err == nil {
t.Fatal(err)
}
}
func newServiceCIDR(name, cidrV4 string) *networkingv1alpha1.ServiceCIDR {
return &networkingv1alpha1.ServiceCIDR{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: networkingv1alpha1.ServiceCIDRSpec{
IPv4: cidrV4,
},
Status: networkingv1alpha1.ServiceCIDRStatus{
Conditions: []metav1.Condition{
{
Type: string(networkingv1alpha1.ServiceCIDRConditionReady),
Status: metav1.ConditionTrue,
},
},
},
}
}


@@ -45,6 +45,7 @@ var (
 ErrFull = errors.New("range is full")
 ErrAllocated = errors.New("provided IP is already allocated")
 ErrMismatchedNetwork = errors.New("the provided network does not match the current range")
+ErrNotReady = errors.New("allocator not ready")
 )
 type ErrNotInRange struct {


@@ -24,6 +24,7 @@ import (
 "math/rand"
 "net"
 "net/netip"
+"sync/atomic"
 "time"
 networkingv1alpha1 "k8s.io/api/networking/v1alpha1"
@@ -58,6 +59,9 @@ type Allocator struct {
 client networkingv1alpha1client.NetworkingV1alpha1Interface
 ipAddressLister networkingv1alpha1listers.IPAddressLister
 ipAddressSynced cache.InformerSynced
+// ready indicates if the allocator is able to allocate new IP addresses.
+// This is required because it depends on the ServiceCIDR to be ready.
+ready atomic.Bool
 // metrics is a metrics recorder that can be disabled
 metrics metricsRecorderInterface
@@ -133,7 +137,7 @@ func NewIPAllocator(
 metricLabel: cidr.String(),
 rand: rand.New(rand.NewSource(time.Now().UnixNano())),
 }
+a.ready.Store(true)
 return a, nil
 }
@@ -185,8 +189,8 @@ func (a *Allocator) AllocateService(svc *api.Service, ip net.IP) error {
 }
 func (a *Allocator) allocateService(svc *api.Service, ip net.IP, dryRun bool) error {
-if !a.ipAddressSynced() {
-return fmt.Errorf("allocator not ready")
+if !a.ready.Load() || !a.ipAddressSynced() {
+return ErrNotReady
 }
 addr, err := netip.ParseAddr(ip.String())
 if err != nil {
@@ -227,8 +231,8 @@ func (a *Allocator) AllocateNextService(svc *api.Service) (net.IP, error) {
 // falls back to the lower subnet.
 // It starts allocating from a random IP within each range.
 func (a *Allocator) allocateNextService(svc *api.Service, dryRun bool) (net.IP, error) {
-if !a.ipAddressSynced() {
-return nil, fmt.Errorf("allocator not ready")
+if !a.ready.Load() || !a.ipAddressSynced() {
+return nil, ErrNotReady
 }
 if dryRun {
 // Don't bother finding a free value. It's racy and not worth the
@@ -348,9 +352,6 @@ func (a *Allocator) Release(ip net.IP) error {
 }
 func (a *Allocator) release(ip net.IP, dryRun bool) error {
-if !a.ipAddressSynced() {
-return fmt.Errorf("allocator not ready")
-}
 if dryRun {
 return nil
 }
@@ -403,7 +404,7 @@ func (a *Allocator) IPFamily() api.IPFamily {
 return a.family
 }
-// for testing
+// for testing; assumes this allocator is unique for the ipFamily
 func (a *Allocator) Used() int {
 ipLabelSelector := labels.Set(map[string]string{
 networkingv1alpha1.LabelIPAddressFamily: string(a.IPFamily()),
@@ -416,7 +417,7 @@ func (a *Allocator) Used() int {
 return len(ips)
 }
-// for testing
+// for testing; assumes this allocator is unique for the ipFamily
 func (a *Allocator) Free() int {
 return int(a.size) - a.Used()
 }
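The new ErrNotReady sentinel lets callers distinguish a transient "allocator
not ready" condition from an exhausted range. A sketch of caller-side
handling (the handling policy is illustrative, not part of this commit):

	ip, err := alloc.AllocateNext()
	switch {
	case err == nil:
		fmt.Println("allocated", ip)
	case errors.Is(err, ipallocator.ErrNotReady):
		// the backing ServiceCIDR is not ready yet (or is being
		// deleted); safe to retry later
	case errors.Is(err, ipallocator.ErrFull):
		// every ready ServiceCIDR is exhausted; surface the error
	default:
		// unexpected error
	}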