replace the iptree on the ipallocator

ServiceCIDRs are protected by finalizers and the CIDRs fields are
inmutable once set, only the readiness state impact the allocator
as it can only allocate IPs if any of the ServiceCIDR is ready.

The Add/Update events triggers a reconcilation of the current state
of the ServiceCIDR present in the informers with the existing IP
allocators.

The Delete events are handled directly to update or delete the
corresponing IP allocator.
This commit is contained in:
Antonio Ojea 2024-05-21 08:14:18 +00:00
parent b5cfccbca7
commit b04ca186d8
4 changed files with 299 additions and 191 deletions

View File

@ -83,7 +83,7 @@ func ContainsAddress(serviceCIDRLister networkinglisters.ServiceCIDRLister, addr
for _, serviceCIDR := range serviceCIDRList { for _, serviceCIDR := range serviceCIDRList {
for _, cidr := range serviceCIDR.Spec.CIDRs { for _, cidr := range serviceCIDR.Spec.CIDRs {
if prefix, err := netip.ParsePrefix(cidr); err == nil { // it can not fail since is already validated if prefix, err := netip.ParsePrefix(cidr); err == nil { // it can not fail since is already validated
if prefixContainsIP(prefix, address) { if PrefixContainsIP(prefix, address) {
result = append(result, serviceCIDR) result = append(result, serviceCIDR)
} }
} }
@ -92,12 +92,12 @@ func ContainsAddress(serviceCIDRLister networkinglisters.ServiceCIDRLister, addr
return result return result
} }
// prefixContainsIP returns true if the given IP is contained with the prefix, // PrefixContainsIP returns true if the given IP is contained with the prefix,
// is not the network address and also, if IPv4, is not the broadcast address. // is not the network address and also, if IPv4, is not the broadcast address.
// This is required (rather than just `prefix.Contains(ip)`) because a ServiceCIDR // This is required (rather than just `prefix.Contains(ip)`) because a ServiceCIDR
// covering prefix will not allocate those IPs, so a service with one of those IPs // covering prefix will not allocate those IPs, so a service with one of those IPs
// can't belong to that ServiceCIDR. // can't belong to that ServiceCIDR.
func prefixContainsIP(prefix netip.Prefix, ip netip.Addr) bool { func PrefixContainsIP(prefix netip.Prefix, ip netip.Addr) bool {
// if the IP is the network address is not contained // if the IP is the network address is not contained
if prefix.Masked().Addr() == ip { if prefix.Masked().Addr() == ip {
return false return false

View File

@ -640,7 +640,7 @@ func Test_PrefixContainIP(t *testing.T) {
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
if got := prefixContainsIP(tt.prefix, tt.ip); got != tt.want { if got := PrefixContainsIP(tt.prefix, tt.ip); got != tt.want {
t.Errorf("prefixContainIP() = %v, want %v", got, tt.want) t.Errorf("prefixContainIP() = %v, want %v", got, tt.want)
} }
}) })

View File

@ -27,7 +27,6 @@ import (
networkingv1alpha1 "k8s.io/api/networking/v1alpha1" networkingv1alpha1 "k8s.io/api/networking/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
@ -37,16 +36,16 @@ import (
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/api/servicecidr"
api "k8s.io/kubernetes/pkg/apis/core" api "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/util/iptree"
netutils "k8s.io/utils/net" netutils "k8s.io/utils/net"
) )
// MetaAllocator maintains a Tree with the ServiceCIDRs containing an IP Allocator // MetaAllocator maintains a structure with IP alloctors for the corresponding ServiceCIDRs.
// on the nodes. Since each allocator doesn't stored the IPAddresses because it reads // CIDR overlapping is allowed and the MetaAllocator should take this into consideration.
// them from the informer cache, it is cheap to create and delete IP Allocators. // Each allocator doesn't stored the IPAddresses, instead it reads them from the informer
// MetaAllocator forwards the request to any of the internal allocators that has free // cache, it is cheap to create and delete IP Allocators.
// addresses. // MetaAllocator use any READY allocator to Allocate IP addresses that has available IPs.
// MetaAllocator implements current allocator interface using // MetaAllocator implements current allocator interface using
// ServiceCIDR and IPAddress API objects. // ServiceCIDR and IPAddress API objects.
@ -61,10 +60,20 @@ type MetaAllocator struct {
internalStopCh chan struct{} internalStopCh chan struct{}
muTree sync.Mutex // allocators is a map indexed by the network prefix
tree *iptree.Tree[*Allocator] // Multiple ServiceCIDR can contain the same network prefix
// so we need to store the references from each allocators to
// the corresponding ServiceCIDRs
mu sync.Mutex
allocators map[string]*item
ipFamily api.IPFamily ipFamily api.IPFamily
metrics bool // enable the metrics collection
}
type item struct {
allocator *Allocator
serviceCIDRs sets.Set[string] // reference of the serviceCIDRs using this Allocator
} }
var _ Interface = &MetaAllocator{} var _ Interface = &MetaAllocator{}
@ -79,6 +88,17 @@ func NewMetaAllocator(
isIPv6 bool, isIPv6 bool,
) (*MetaAllocator, error) { ) (*MetaAllocator, error) {
c := newMetaAllocator(client, serviceCIDRInformer, ipAddressInformer, isIPv6)
go c.run()
return c, nil
}
// newMetaAllocator is used to build the allocator for testing
func newMetaAllocator(client networkingv1alpha1client.NetworkingV1alpha1Interface,
serviceCIDRInformer networkingv1alpha1informers.ServiceCIDRInformer,
ipAddressInformer networkingv1alpha1informers.IPAddressInformer,
isIPv6 bool,
) *MetaAllocator {
// TODO: make the NewMetaAllocator agnostic of the IP family // TODO: make the NewMetaAllocator agnostic of the IP family
family := api.IPv4Protocol family := api.IPv4Protocol
if isIPv6 { if isIPv6 {
@ -97,46 +117,77 @@ func NewMetaAllocator(
workqueue.TypedRateLimitingQueueConfig[string]{Name: ControllerName}, workqueue.TypedRateLimitingQueueConfig[string]{Name: ControllerName},
), ),
internalStopCh: make(chan struct{}), internalStopCh: make(chan struct{}),
tree: iptree.New[*Allocator](), allocators: make(map[string]*item),
ipFamily: family, ipFamily: family,
metrics: false,
} }
_, _ = serviceCIDRInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ _, _ = serviceCIDRInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.addServiceCIDR, AddFunc: c.enqueServiceCIDR,
UpdateFunc: c.updateServiceCIDR, UpdateFunc: func(old, new interface{}) {
c.enqueServiceCIDR(new)
},
// Process the deletion directly in the handler to be able to use the object fields
// without having to cache them. ServiceCIDRs are protected by finalizers
// so the "started deletion" logic will be handled in the reconcile loop.
DeleteFunc: c.deleteServiceCIDR, DeleteFunc: c.deleteServiceCIDR,
}) })
go c.run() return c
return c, nil
} }
func (c *MetaAllocator) addServiceCIDR(obj interface{}) { func (c *MetaAllocator) enqueServiceCIDR(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj) key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil { if err == nil {
c.queue.Add(key) c.queue.Add(key)
} }
} }
func (c *MetaAllocator) updateServiceCIDR(old, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
if err == nil {
c.queue.Add(key)
}
}
func (c *MetaAllocator) deleteServiceCIDR(obj interface{}) { func (c *MetaAllocator) deleteServiceCIDR(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) serviceCIDR, ok := obj.(*networkingv1alpha1.ServiceCIDR)
if err == nil { if !ok {
c.queue.Add(key) tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
return
}
serviceCIDR, ok = tombstone.Obj.(*networkingv1alpha1.ServiceCIDR)
if !ok {
return
}
}
klog.Infof("deleting ClusterIP allocator for Service CIDR %v", serviceCIDR)
c.mu.Lock()
defer c.mu.Unlock()
for _, cidr := range serviceCIDR.Spec.CIDRs {
// skip IP families not supported by this MetaAllocator
if c.ipFamily != api.IPFamily(convertToV1IPFamily(netutils.IPFamilyOfCIDRString(cidr))) {
continue
}
// get the Allocator used by this ServiceCIDR
v, ok := c.allocators[cidr]
if !ok {
continue
}
// remove the reference to this ServiceCIDR
v.serviceCIDRs.Delete(serviceCIDR.Name)
if v.serviceCIDRs.Len() > 0 {
klog.V(2).Infof("deleted Service CIDR from allocator %s, remaining %v", cidr, v.serviceCIDRs)
} else {
// if there are no references to this Allocator
// destroy and remove it from the map
v.allocator.Destroy()
delete(c.allocators, cidr)
klog.Infof("deleted ClusterIP allocator for Service CIDR %s", cidr)
}
} }
} }
func (c *MetaAllocator) run() { func (c *MetaAllocator) run() {
defer runtime.HandleCrash() defer runtime.HandleCrash()
defer c.queue.ShutDown() defer c.queue.ShutDown()
klog.Info("Starting ServiceCIDR Allocator Controller") klog.Info("starting ServiceCIDR Allocator Controller")
defer klog.Info("Stopping ServiceCIDR Allocator Controllerr") defer klog.Info("stopping ServiceCIDR Allocator Controller")
// Wait for all involved caches to be synced, before processing items from the queue is started // Wait for all involved caches to be synced, before processing items from the queue is started
if !cache.WaitForCacheSync(c.internalStopCh, c.serviceCIDRSynced, c.ipAddressSynced) { if !cache.WaitForCacheSync(c.internalStopCh, c.serviceCIDRSynced, c.ipAddressSynced) {
@ -162,12 +213,11 @@ func (c *MetaAllocator) processNextItem() bool {
return false return false
} }
defer c.queue.Done(key) defer c.queue.Done(key)
err := c.syncAllocators()
err := c.syncTree()
// Handle the error if something went wrong during the execution of the business logic // Handle the error if something went wrong during the execution of the business logic
if err != nil { if err != nil {
if c.queue.NumRequeues(key) < 5 { if c.queue.NumRequeues(key) < 5 {
klog.Infof("Error syncing cidr %v: %v", key, err) klog.Infof("error syncing cidr %v: %v", key, err)
c.queue.AddRateLimited(key) c.queue.AddRateLimited(key)
return true return true
} }
@ -176,113 +226,106 @@ func (c *MetaAllocator) processNextItem() bool {
return true return true
} }
// syncTree syncs the ipTrees from the informer cache // syncAllocators adds new allocators and syncs the ready state of the allocators
// It deletes or creates allocator and sets the corresponding state // deletion of allocators is handled directly on the event handler.
func (c *MetaAllocator) syncTree() error { func (c *MetaAllocator) syncAllocators() error {
now := time.Now() start := time.Now()
klog.V(2).Info("syncing ServiceCIDR allocators")
defer func() { defer func() {
klog.V(2).Infof("Finished sync for CIDRs took %v", time.Since(now)) klog.V(2).Infof("syncing ServiceCIDR allocators took: %v", time.Since(start))
}() }()
c.mu.Lock()
defer c.mu.Unlock()
serviceCIDRs, err := c.serviceCIDRLister.List(labels.Everything()) serviceCIDRs, err := c.serviceCIDRLister.List(labels.Everything())
if err != nil { if err != nil {
return err return err
} }
cidrsSet := sets.New[string]()
cidrReady := map[string]bool{}
for _, serviceCIDR := range serviceCIDRs { for _, serviceCIDR := range serviceCIDRs {
for _, cidr := range serviceCIDR.Spec.CIDRs {
// skip IP families not supported by this MetaAllocator
if c.ipFamily != api.IPFamily(convertToV1IPFamily(netutils.IPFamilyOfCIDRString(cidr))) {
continue
}
// the readiness state of an allocator is an OR of all readiness states
ready := true ready := true
if !isReady(serviceCIDR) || !serviceCIDR.DeletionTimestamp.IsZero() { if !isReady(serviceCIDR) || !serviceCIDR.DeletionTimestamp.IsZero() {
ready = false ready = false
} }
for _, cidr := range serviceCIDR.Spec.CIDRs { // check if an allocator already exist for this CIDR
if c.ipFamily == api.IPFamily(convertToV1IPFamily(netutils.IPFamilyOfCIDRString(cidr))) { v, ok := c.allocators[cidr]
cidrsSet.Insert(cidr) // Update allocator with ServiceCIDR
cidrReady[cidr] = ready if ok {
} v.serviceCIDRs.Insert(serviceCIDR.Name)
// an Allocator is ready if at least one of the ServiceCIDRs is ready
if ready {
v.allocator.ready.Store(true)
} else if v.serviceCIDRs.Has(serviceCIDR.Name) && len(v.serviceCIDRs) == 1 {
v.allocator.ready.Store(false)
} }
klog.Infof("updated ClusterIP allocator for Service CIDR %s", cidr)
continue
} }
// obtain the existing allocators and set the existing state // Create new allocator for ServiceCIDR
treeSet := sets.New[string]() _, ipnet, err := netutils.ParseCIDRSloppy(cidr) // this was already validated
c.muTree.Lock()
c.tree.DepthFirstWalk(c.ipFamily == api.IPv6Protocol, func(k netip.Prefix, v *Allocator) bool {
v.ready.Store(cidrReady[k.String()])
treeSet.Insert(k.String())
return false
})
c.muTree.Unlock()
cidrsToRemove := treeSet.Difference(cidrsSet)
cidrsToAdd := cidrsSet.Difference(treeSet)
errs := []error{}
// Add new allocators
for _, cidr := range cidrsToAdd.UnsortedList() {
_, ipnet, err := netutils.ParseCIDRSloppy(cidr)
if err != nil { if err != nil {
return err klog.Infof("error parsing cidr %s", cidr)
continue
} }
// New ServiceCIDR, create new allocator // New ServiceCIDR, create new allocator
allocator, err := NewIPAllocator(ipnet, c.client, c.ipAddressInformer) allocator, err := NewIPAllocator(ipnet, c.client, c.ipAddressInformer)
if err != nil { if err != nil {
errs = append(errs, err) klog.Infof("error creating new IPAllocator for Service CIDR %s", cidr)
continue continue
} }
allocator.ready.Store(cidrReady[cidr]) if c.metrics {
prefix, err := netip.ParsePrefix(cidr) allocator.EnableMetrics()
if err != nil {
return err
} }
c.addAllocator(prefix, allocator) allocator.ready.Store(ready)
klog.Infof("Created ClusterIP allocator for Service CIDR %s", cidr) c.allocators[cidr] = &item{
allocator: allocator,
serviceCIDRs: sets.New[string](serviceCIDR.Name),
} }
// Remove allocators that no longer exist klog.Infof("created ClusterIP allocator for Service CIDR %s", cidr)
for _, cidr := range cidrsToRemove.UnsortedList() {
prefix, err := netip.ParsePrefix(cidr)
if err != nil {
return err
} }
c.deleteAllocator(prefix)
} }
return nil
return utilerrors.NewAggregate(errs)
} }
func (c *MetaAllocator) getAllocator(ip net.IP) (*Allocator, error) { // getAllocator returns any allocator that contains the IP passed as argument.
c.muTree.Lock() // if ready is set only an allocator that is ready is returned.
defer c.muTree.Unlock() // Allocate operations can work with ANY allocator that is ready, the allocators
// contain references to the IP addresses hence does not matter what allocators have
address := ipToAddr(ip) // the IP. Release operations need to work with ANY allocator independent of its state.
prefix := netip.PrefixFrom(address, address.BitLen()) func (c *MetaAllocator) getAllocator(ip net.IP, ready bool) (*Allocator, error) {
// Use the largest subnet to allocate addresses because c.mu.Lock()
// all the other subnets will be contained. defer c.mu.Unlock()
_, allocator, ok := c.tree.ShortestPrefixMatch(prefix) address := servicecidr.IPToAddr(ip)
if !ok { // use the first allocator that contains the address
for cidr, item := range c.allocators {
prefix, err := netip.ParsePrefix(cidr)
if err != nil {
return nil, err
}
if servicecidr.PrefixContainsIP(prefix, address) {
if !ready {
return item.allocator, nil
}
if item.allocator.ready.Load() {
return item.allocator, nil
}
}
}
klog.V(2).Infof("Could not get allocator for IP %s", ip.String()) klog.V(2).Infof("Could not get allocator for IP %s", ip.String())
return nil, ErrMismatchedNetwork return nil, ErrMismatchedNetwork
}
return allocator, nil
}
func (c *MetaAllocator) addAllocator(cidr netip.Prefix, allocator *Allocator) {
c.muTree.Lock()
defer c.muTree.Unlock()
c.tree.InsertPrefix(cidr, allocator)
}
func (c *MetaAllocator) deleteAllocator(cidr netip.Prefix) {
c.muTree.Lock()
defer c.muTree.Unlock()
ok := c.tree.DeletePrefix(cidr)
if ok {
klog.V(3).Infof("CIDR %s deleted", cidr)
}
} }
func (c *MetaAllocator) AllocateService(service *api.Service, ip net.IP) error { func (c *MetaAllocator) AllocateService(service *api.Service, ip net.IP) error {
allocator, err := c.getAllocator(ip) allocator, err := c.getAllocator(ip, true)
if err != nil { if err != nil {
return err return err
} }
@ -290,7 +333,7 @@ func (c *MetaAllocator) AllocateService(service *api.Service, ip net.IP) error {
} }
func (c *MetaAllocator) Allocate(ip net.IP) error { func (c *MetaAllocator) Allocate(ip net.IP) error {
allocator, err := c.getAllocator(ip) allocator, err := c.getAllocator(ip, true)
if err != nil { if err != nil {
return err return err
} }
@ -298,9 +341,8 @@ func (c *MetaAllocator) Allocate(ip net.IP) error {
} }
func (c *MetaAllocator) AllocateNextService(service *api.Service) (net.IP, error) { func (c *MetaAllocator) AllocateNextService(service *api.Service) (net.IP, error) {
c.muTree.Lock() c.mu.Lock()
defer c.muTree.Unlock() defer c.mu.Unlock()
// TODO(aojea) add strategy to return a random allocator but // TODO(aojea) add strategy to return a random allocator but
// taking into consideration the number of addresses of each allocator. // taking into consideration the number of addresses of each allocator.
// Per example, if we have allocator A and B with 256 and 1024 possible // Per example, if we have allocator A and B with 256 and 1024 possible
@ -308,8 +350,11 @@ func (c *MetaAllocator) AllocateNextService(service *api.Service) (net.IP, error
// get A so we can spread the load of IPs randomly. // get A so we can spread the load of IPs randomly.
// However, we need to validate the best strategy before going to Beta. // However, we need to validate the best strategy before going to Beta.
isIPv6 := c.ipFamily == api.IPFamily(v1.IPv6Protocol) isIPv6 := c.ipFamily == api.IPFamily(v1.IPv6Protocol)
for _, allocator := range c.tree.TopLevelPrefixes(isIPv6) { for cidr, item := range c.allocators {
ip, err := allocator.AllocateNextService(service) if netutils.IsIPv6CIDRString(cidr) != isIPv6 {
continue
}
ip, err := item.allocator.AllocateNextService(service)
if err == nil { if err == nil {
return ip, nil return ip, nil
} }
@ -318,9 +363,8 @@ func (c *MetaAllocator) AllocateNextService(service *api.Service) (net.IP, error
} }
func (c *MetaAllocator) AllocateNext() (net.IP, error) { func (c *MetaAllocator) AllocateNext() (net.IP, error) {
c.muTree.Lock() c.mu.Lock()
defer c.muTree.Unlock() defer c.mu.Unlock()
// TODO(aojea) add strategy to return a random allocator but // TODO(aojea) add strategy to return a random allocator but
// taking into consideration the number of addresses of each allocator. // taking into consideration the number of addresses of each allocator.
// Per example, if we have allocator A and B with 256 and 1024 possible // Per example, if we have allocator A and B with 256 and 1024 possible
@ -328,8 +372,11 @@ func (c *MetaAllocator) AllocateNext() (net.IP, error) {
// get A so we can spread the load of IPs randomly. // get A so we can spread the load of IPs randomly.
// However, we need to validate the best strategy before going to Beta. // However, we need to validate the best strategy before going to Beta.
isIPv6 := c.ipFamily == api.IPFamily(v1.IPv6Protocol) isIPv6 := c.ipFamily == api.IPFamily(v1.IPv6Protocol)
for _, allocator := range c.tree.TopLevelPrefixes(isIPv6) { for cidr, item := range c.allocators {
ip, err := allocator.AllocateNext() if netutils.IsIPv6String(cidr) != isIPv6 {
continue
}
ip, err := item.allocator.AllocateNext()
if err == nil { if err == nil {
return ip, nil return ip, nil
} }
@ -338,7 +385,7 @@ func (c *MetaAllocator) AllocateNext() (net.IP, error) {
} }
func (c *MetaAllocator) Release(ip net.IP) error { func (c *MetaAllocator) Release(ip net.IP) error {
allocator, err := c.getAllocator(ip) allocator, err := c.getAllocator(ip, false)
if err != nil { if err != nil {
return err return err
} }
@ -367,7 +414,7 @@ func (c *MetaAllocator) IPFamily() api.IPFamily {
return c.ipFamily return c.ipFamily
} }
func (c *MetaAllocator) Has(ip net.IP) bool { func (c *MetaAllocator) Has(ip net.IP) bool {
allocator, err := c.getAllocator(ip) allocator, err := c.getAllocator(ip, true)
if err != nil { if err != nil {
return false return false
} }
@ -396,26 +443,48 @@ func (c *MetaAllocator) Used() int {
// for testing // for testing
func (c *MetaAllocator) Free() int { func (c *MetaAllocator) Free() int {
c.muTree.Lock() c.mu.Lock()
defer c.muTree.Unlock() defer c.mu.Unlock()
size := 0 size := 0
isIPv6 := c.ipFamily == api.IPFamily(v1.IPv6Protocol) prefixes := []netip.Prefix{}
for _, allocator := range c.tree.TopLevelPrefixes(isIPv6) { // Get all the existing prefixes
size += int(allocator.size) for cidr := range c.allocators {
prefix, err := netip.ParsePrefix(cidr)
if err != nil {
continue
}
prefixes = append(prefixes, prefix)
}
// only count the top level prefixes to not double count
for _, prefix := range prefixes {
if !isNotContained(prefix, prefixes) {
continue
}
v, ok := c.allocators[prefix.String()]
if !ok {
continue
}
size += int(v.allocator.size)
} }
return size - c.Used() return size - c.Used()
} }
func (c *MetaAllocator) EnableMetrics() {} func (c *MetaAllocator) EnableMetrics() {
c.mu.Lock()
defer c.mu.Unlock()
c.metrics = true
for _, item := range c.allocators {
item.allocator.EnableMetrics()
}
}
// DryRun returns a random allocator // DryRun returns a random allocator
func (c *MetaAllocator) DryRun() Interface { func (c *MetaAllocator) DryRun() Interface {
c.muTree.Lock() c.mu.Lock()
defer c.muTree.Unlock() defer c.mu.Unlock()
isIPv6 := c.ipFamily == api.IPFamily(v1.IPv6Protocol) for _, item := range c.allocators {
for _, allocator := range c.tree.TopLevelPrefixes(isIPv6) { return item.allocator.DryRun()
return allocator.DryRun()
} }
return &Allocator{} return &Allocator{}
} }
@ -434,22 +503,6 @@ func isReady(serviceCIDR *networkingv1alpha1.ServiceCIDR) bool {
return true return true
} }
// ipToAddr converts a net.IP to a netip.Addr
// if the net.IP is not valid it returns an empty netip.Addr{}
func ipToAddr(ip net.IP) netip.Addr {
// https://pkg.go.dev/net/netip#AddrFromSlice can return an IPv4 in IPv6 format
// so we have to check the IP family to return exactly the format that we want
// address, _ := netip.AddrFromSlice(net.ParseIPSloppy(192.168.0.1)) returns
// an address like ::ffff:192.168.0.1/32
bytes := ip.To4()
if bytes == nil {
bytes = ip.To16()
}
// AddrFromSlice returns Addr{}, false if the input is invalid.
address, _ := netip.AddrFromSlice(bytes)
return address
}
// Convert netutils.IPFamily to v1.IPFamily // Convert netutils.IPFamily to v1.IPFamily
// TODO: consolidate helpers // TODO: consolidate helpers
// copied from pkg/proxy/util/utils.go // copied from pkg/proxy/util/utils.go
@ -463,3 +516,19 @@ func convertToV1IPFamily(ipFamily netutils.IPFamily) v1.IPFamily {
return v1.IPFamilyUnknown return v1.IPFamilyUnknown
} }
// isNotContained returns true if the prefix is not contained in any
// of the passed prefixes.
func isNotContained(prefix netip.Prefix, prefixes []netip.Prefix) bool {
for _, p := range prefixes {
// skip same prefix
if prefix == p {
continue
}
// 192.168.0.0/24 is contained within 192.168.0.0/16
if prefix.Overlaps(p) && prefix.Bits() >= p.Bits() {
return false
}
}
return true
}

View File

@ -19,6 +19,7 @@ package ipallocator
import ( import (
"context" "context"
"fmt" "fmt"
"net/netip"
"testing" "testing"
"time" "time"
@ -39,7 +40,6 @@ func newTestMetaAllocator() (*MetaAllocator, error) {
informerFactory := informers.NewSharedInformerFactory(client, 0*time.Second) informerFactory := informers.NewSharedInformerFactory(client, 0*time.Second)
serviceCIDRInformer := informerFactory.Networking().V1alpha1().ServiceCIDRs() serviceCIDRInformer := informerFactory.Networking().V1alpha1().ServiceCIDRs()
serviceCIDRStore := serviceCIDRInformer.Informer().GetIndexer() serviceCIDRStore := serviceCIDRInformer.Informer().GetIndexer()
serviceCIDRInformer.Informer().HasSynced()
ipInformer := informerFactory.Networking().V1alpha1().IPAddresses() ipInformer := informerFactory.Networking().V1alpha1().IPAddresses()
ipStore := ipInformer.Informer().GetIndexer() ipStore := ipInformer.Informer().GetIndexer()
@ -85,13 +85,11 @@ func newTestMetaAllocator() (*MetaAllocator, error) {
return false, ip, err return false, ip, err
})) }))
c, err := NewMetaAllocator(client.NetworkingV1alpha1(), serviceCIDRInformer, ipInformer, false) c := newMetaAllocator(client.NetworkingV1alpha1(), serviceCIDRInformer, ipInformer, false)
if err != nil {
return nil, err c.serviceCIDRSynced = func() bool { return true }
} c.ipAddressSynced = func() bool { return true }
// we can not force the state of the informers to be synced without racing go c.run()
// so we run our worker here
go wait.Until(c.runWorker, time.Second, c.internalStopCh)
return c, nil return c, nil
} }
@ -114,10 +112,10 @@ func TestCIDRAllocateMultiple(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
r.addServiceCIDR(cidr) r.enqueServiceCIDR(cidr)
// wait for the cidr to be processed and set the informer synced // wait for the cidr to be processed and set the informer synced
err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
allocator, err := r.getAllocator(netutils.ParseIPSloppy("192.168.0.1")) allocator, err := r.getAllocator(netutils.ParseIPSloppy("192.168.0.1"), true)
if err != nil { if err != nil {
t.Logf("unexpected error %v", err) t.Logf("unexpected error %v", err)
return false, nil return false, nil
@ -153,10 +151,10 @@ func TestCIDRAllocateMultiple(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
r.addServiceCIDR(cidr2) r.enqueServiceCIDR(cidr2)
// wait for the cidr to be processed // wait for the cidr to be processed
err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
allocator, err := r.getAllocator(netutils.ParseIPSloppy("10.0.0.11")) allocator, err := r.getAllocator(netutils.ParseIPSloppy("10.0.0.11"), true)
if err != nil { if err != nil {
return false, nil return false, nil
} }
@ -211,10 +209,10 @@ func TestCIDRAllocateShadow(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
r.addServiceCIDR(cidr) r.enqueServiceCIDR(cidr)
// wait for the cidr to be processed and set the informer synced // wait for the cidr to be processed and set the informer synced
err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
allocator, err := r.getAllocator(netutils.ParseIPSloppy("192.168.1.0")) allocator, err := r.getAllocator(netutils.ParseIPSloppy("192.168.1.1"), true)
if err != nil { if err != nil {
return false, nil return false, nil
} }
@ -224,7 +222,7 @@ func TestCIDRAllocateShadow(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
// allocate one IP from the new allocator // can not allocate the subnet IP from the new allocator
err = r.Allocate(netutils.ParseIPSloppy("192.168.1.0")) err = r.Allocate(netutils.ParseIPSloppy("192.168.1.0"))
if err == nil { if err == nil {
t.Fatalf("unexpected allocation for IP 192.168.1.0") t.Fatalf("unexpected allocation for IP 192.168.1.0")
@ -239,10 +237,10 @@ func TestCIDRAllocateShadow(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
r.addServiceCIDR(cidr2) r.enqueServiceCIDR(cidr2)
// wait for the cidr to be processed // wait for the cidr to be processed
err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
allocator, err := r.getAllocator(netutils.ParseIPSloppy("192.168.0.0")) allocator, err := r.getAllocator(netutils.ParseIPSloppy("192.168.0.1"), true)
if err != nil { if err != nil {
return false, nil return false, nil
} }
@ -283,10 +281,10 @@ func TestCIDRAllocateGrow(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
r.addServiceCIDR(cidr) r.enqueServiceCIDR(cidr)
// wait for the cidr to be processed // wait for the cidr to be processed
err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
allocator, err := r.getAllocator(netutils.ParseIPSloppy("192.168.0.1")) allocator, err := r.getAllocator(netutils.ParseIPSloppy("192.168.0.1"), true)
if err != nil { if err != nil {
return false, nil return false, nil
} }
@ -321,10 +319,10 @@ func TestCIDRAllocateGrow(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
r.addServiceCIDR(cidr2) r.enqueServiceCIDR(cidr2)
// wait for the cidr to be processed // wait for the cidr to be processed
err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
allocator, err := r.getAllocator(netutils.ParseIPSloppy("192.168.0.253")) allocator, err := r.getAllocator(netutils.ParseIPSloppy("192.168.0.253"), true)
if err != nil { if err != nil {
return false, nil return false, nil
} }
@ -374,10 +372,10 @@ func TestCIDRAllocateShrink(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
r.addServiceCIDR(cidr) r.enqueServiceCIDR(cidr)
// wait for the cidr to be processed // wait for the cidr to be processed
err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
allocator, err := r.getAllocator(netutils.ParseIPSloppy("192.168.0.1")) allocator, err := r.getAllocator(netutils.ParseIPSloppy("192.168.0.1"), true)
if err != nil { if err != nil {
return false, nil return false, nil
} }
@ -420,7 +418,7 @@ func TestCIDRAllocateShrink(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
r.addServiceCIDR(cidr2) r.enqueServiceCIDR(cidr2)
err = r.client.ServiceCIDRs().Delete(context.Background(), cidr.Name, metav1.DeleteOptions{}) err = r.client.ServiceCIDRs().Delete(context.Background(), cidr.Name, metav1.DeleteOptions{})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
@ -429,7 +427,7 @@ func TestCIDRAllocateShrink(t *testing.T) {
// wait for the cidr to be processed (delete ServiceCIDR) // wait for the cidr to be processed (delete ServiceCIDR)
err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
_, err := r.getAllocator(netutils.ParseIPSloppy("192.168.0.253")) _, err := r.getAllocator(netutils.ParseIPSloppy("192.168.0.253"), true)
if err != nil { if err != nil {
return true, nil return true, nil
} }
@ -441,7 +439,7 @@ func TestCIDRAllocateShrink(t *testing.T) {
} }
// wait for the cidr to be processed (create ServiceCIDR) // wait for the cidr to be processed (create ServiceCIDR)
err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) { err = wait.PollUntilContextTimeout(context.Background(), 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
allocator, err := r.getAllocator(netutils.ParseIPSloppy("192.168.0.1")) allocator, err := r.getAllocator(netutils.ParseIPSloppy("192.168.0.1"), true)
if err != nil { if err != nil {
return false, nil return false, nil
} }
@ -487,3 +485,44 @@ func newServiceCIDR(name, cidr string) *networkingv1alpha1.ServiceCIDR {
}, },
} }
} }
func Test_isNotContained(t *testing.T) {
tests := []struct {
name string
prefix netip.Prefix
prefixes []netip.Prefix
want bool
}{
{
name: "ipv4 not contained nor overlapping",
prefix: netip.MustParsePrefix("192.168.0.0/24"),
prefixes: []netip.Prefix{netip.MustParsePrefix("10.0.0.0/24"), netip.MustParsePrefix("10.0.0.0/27")},
want: true,
},
{
name: "ipv4 not contained but contains",
prefix: netip.MustParsePrefix("10.0.0.0/8"),
prefixes: []netip.Prefix{netip.MustParsePrefix("10.0.0.0/24"), netip.MustParsePrefix("10.0.0.0/27")},
want: true,
},
{
name: "ipv4 not contained but matches existing one",
prefix: netip.MustParsePrefix("10.0.0.0/24"),
prefixes: []netip.Prefix{netip.MustParsePrefix("10.0.0.0/24"), netip.MustParsePrefix("10.0.0.0/27")},
want: true,
},
{
name: "ipv4 contained but matches existing one",
prefix: netip.MustParsePrefix("10.0.0.0/27"),
prefixes: []netip.Prefix{netip.MustParsePrefix("10.0.0.0/24"), netip.MustParsePrefix("10.0.0.0/27")},
want: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := isNotContained(tt.prefix, tt.prefixes); got != tt.want {
t.Errorf("isNotContained() = %v, want %v", got, tt.want)
}
})
}
}