service cidr controller manager: use new ServiceCIDR API

This commit is contained in:
Antonio Ojea 2023-10-29 18:48:52 +00:00
parent 016c3c9e36
commit 3edcce52e3
5 changed files with 82 additions and 111 deletions

View File

@ -557,15 +557,13 @@ func NewControllerDescriptors() map[string]*ControllerDescriptor {
register(newLegacyServiceAccountTokenCleanerControllerDescriptor()) register(newLegacyServiceAccountTokenCleanerControllerDescriptor())
register(newValidatingAdmissionPolicyStatusControllerDescriptor()) register(newValidatingAdmissionPolicyStatusControllerDescriptor())
register(newTaintEvictionControllerDescriptor()) register(newTaintEvictionControllerDescriptor())
register(newServiceCIDRsControllerDescriptor())
for _, alias := range aliases.UnsortedList() { for _, alias := range aliases.UnsortedList() {
if _, ok := controllers[alias]; ok { if _, ok := controllers[alias]; ok {
panic(fmt.Sprintf("alias %q conflicts with a controller name", alias)) panic(fmt.Sprintf("alias %q conflicts with a controller name", alias))
} }
} }
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.MultiCIDRServiceAllocator) {
register(names.ServiceCIDRController, startServiceCIDRsController)
}
return controllers return controllers
} }

View File

@ -93,6 +93,7 @@ func TestControllerNamesDeclaration(t *testing.T) {
names.ResourceClaimController, names.ResourceClaimController,
names.LegacyServiceAccountTokenCleanerController, names.LegacyServiceAccountTokenCleanerController,
names.ValidatingAdmissionPolicyStatusController, names.ValidatingAdmissionPolicyStatusController,
names.ServiceCIDRController,
) )
for _, name := range KnownControllers() { for _, name := range KnownControllers() {

View File

@ -22,11 +22,22 @@ package app
import ( import (
"context" "context"
"k8s.io/component-base/featuregate"
"k8s.io/controller-manager/controller" "k8s.io/controller-manager/controller"
"k8s.io/kubernetes/cmd/kube-controller-manager/names"
"k8s.io/kubernetes/pkg/controller/servicecidrs" "k8s.io/kubernetes/pkg/controller/servicecidrs"
"k8s.io/kubernetes/pkg/features"
) )
func startServiceCIDRsController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { func newServiceCIDRsControllerDescriptor() *ControllerDescriptor {
return &ControllerDescriptor{
name: names.ServiceCIDRController,
initFunc: startServiceCIDRsController,
requiredFeatureGates: []featuregate.Feature{
features.MultiCIDRServiceAllocator,
}}
}
func startServiceCIDRsController(ctx context.Context, controllerContext ControllerContext, controllerName string) (controller.Interface, bool, error) {
go servicecidrs.NewController( go servicecidrs.NewController(
controllerContext.InformerFactory.Networking().V1alpha1().ServiceCIDRs(), controllerContext.InformerFactory.Networking().V1alpha1().ServiceCIDRs(),
controllerContext.InformerFactory.Networking().V1alpha1().IPAddresses(), controllerContext.InformerFactory.Networking().V1alpha1().IPAddresses(),

View File

@ -45,6 +45,7 @@ import (
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/registry/core/service/ipallocator" "k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
"k8s.io/kubernetes/pkg/util/iptree" "k8s.io/kubernetes/pkg/util/iptree"
netutils "k8s.io/utils/net"
) )
const ( const (
@ -87,7 +88,6 @@ func NewController(
c.serviceCIDRLister = serviceCIDRInformer.Lister() c.serviceCIDRLister = serviceCIDRInformer.Lister()
c.serviceCIDRsSynced = serviceCIDRInformer.Informer().HasSynced c.serviceCIDRsSynced = serviceCIDRInformer.Informer().HasSynced
// IPAddresses can only block the deletion of the ServiceCIDR that contains it
_, _ = ipAddressInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ _, _ = ipAddressInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: c.addIPAddress, AddFunc: c.addIPAddress,
DeleteFunc: c.deleteIPAddress, DeleteFunc: c.deleteIPAddress,
@ -208,31 +208,24 @@ func (c *Controller) deleteIPAddress(obj interface{}) {
// overlappingServiceCIDRs, given a ServiceCIDR return the ServiceCIDRs that contain or are contained, // overlappingServiceCIDRs, given a ServiceCIDR return the ServiceCIDRs that contain or are contained,
// this is required because adding or removing a CIDR will require to recompute the // this is required because adding or removing a CIDR will require to recompute the
// state of each ServiceCIDR to check if can be unblocked on deletion. // state of each ServiceCIDR to check if can be unblocked on deletion.
func (c *Controller) overlappingServiceCIDRs(cidr *networkingapiv1alpha1.ServiceCIDR) []string { func (c *Controller) overlappingServiceCIDRs(serviceCIDR *networkingapiv1alpha1.ServiceCIDR) []string {
c.muTree.Lock() c.muTree.Lock()
defer c.muTree.Unlock() defer c.muTree.Unlock()
serviceCIDRs := sets.New[string]() serviceCIDRs := sets.New[string]()
if prefix, err := netip.ParsePrefix(cidr.Spec.IPv4); err == nil { // if is empty err will not be nil for _, cidr := range serviceCIDR.Spec.CIDRs {
c.tree.WalkPath(prefix, func(k netip.Prefix, v sets.Set[string]) bool { if prefix, err := netip.ParsePrefix(cidr); err == nil { // if is empty err will not be nil
serviceCIDRs.Insert(v.UnsortedList()...) c.tree.WalkPath(prefix, func(k netip.Prefix, v sets.Set[string]) bool {
return false serviceCIDRs.Insert(v.UnsortedList()...)
}) return false
c.tree.WalkPrefix(prefix, func(k netip.Prefix, v sets.Set[string]) bool { })
serviceCIDRs.Insert(v.UnsortedList()...) c.tree.WalkPrefix(prefix, func(k netip.Prefix, v sets.Set[string]) bool {
return false serviceCIDRs.Insert(v.UnsortedList()...)
}) return false
} })
if prefix, err := netip.ParsePrefix(cidr.Spec.IPv6); err == nil { // if is empty err will not be nil }
c.tree.WalkPath(prefix, func(k netip.Prefix, v sets.Set[string]) bool {
serviceCIDRs.Insert(v.UnsortedList()...)
return false
})
c.tree.WalkPrefix(prefix, func(k netip.Prefix, v sets.Set[string]) bool {
serviceCIDRs.Insert(v.UnsortedList()...)
return false
})
} }
return serviceCIDRs.UnsortedList() return serviceCIDRs.UnsortedList()
} }
@ -296,7 +289,7 @@ func (c *Controller) processNext(ctx context.Context) bool {
// syncCIDRs rebuilds the radix tree based from the informers cache // syncCIDRs rebuilds the radix tree based from the informers cache
func (c *Controller) syncCIDRs() error { func (c *Controller) syncCIDRs() error {
cidrList, err := c.serviceCIDRLister.List(labels.Everything()) serviceCIDRList, err := c.serviceCIDRLister.List(labels.Everything())
if err != nil { if err != nil {
return err return err
} }
@ -306,26 +299,20 @@ func (c *Controller) syncCIDRs() error {
// and this is important to determine if a ServiceCIDR can // and this is important to determine if a ServiceCIDR can
// be deleted. // be deleted.
tree := iptree.New[sets.Set[string]]() tree := iptree.New[sets.Set[string]]()
for _, cidr := range cidrList { for _, serviceCIDR := range serviceCIDRList {
if prefix, err := netip.ParsePrefix(cidr.Spec.IPv4); err == nil { // if is empty err will not be nil for _, cidr := range serviceCIDR.Spec.CIDRs {
// if the prefix already exist append the new ServiceCIDR name if prefix, err := netip.ParsePrefix(cidr); err == nil { // if is empty err will not be nil
v, ok := tree.GetPrefix(prefix) // if the prefix already exist append the new ServiceCIDR name
if !ok { v, ok := tree.GetPrefix(prefix)
v = sets.Set[string]{} if !ok {
v = sets.Set[string]{}
}
v.Insert(serviceCIDR.Name)
tree.InsertPrefix(prefix, v)
} }
v.Insert(cidr.Name)
tree.InsertPrefix(prefix, v)
}
if prefix, err := netip.ParsePrefix(cidr.Spec.IPv6); err == nil { // if is empty err will not be nil
// if the prefix already exist append the new ServiceCIDR name
v, ok := tree.GetPrefix(prefix)
if !ok {
v = sets.Set[string]{}
}
v.Insert(cidr.Name)
tree.InsertPrefix(prefix, v)
} }
} }
c.muTree.Lock() c.muTree.Lock()
defer c.muTree.Unlock() defer c.muTree.Unlock()
c.tree = tree c.tree = tree
@ -413,53 +400,43 @@ func (c *Controller) sync(ctx context.Context, key string) error {
} }
// canDeleteCIDR checks that the ServiceCIDR can be safely deleted and not leave orphan IPAddresses // canDeleteCIDR checks that the ServiceCIDR can be safely deleted and not leave orphan IPAddresses
func (c *Controller) canDeleteCIDR(ctx context.Context, cidr *networkingapiv1alpha1.ServiceCIDR) (bool, error) { func (c *Controller) canDeleteCIDR(ctx context.Context, serviceCIDR *networkingapiv1alpha1.ServiceCIDR) (bool, error) {
// TODO(aojea) Revisit the lock usage and if we need to keep it only for the tree operations // TODO(aojea) Revisit the lock usage and if we need to keep it only for the tree operations
// to avoid holding it during the whole operation. // to avoid holding it during the whole operation.
c.muTree.Lock() c.muTree.Lock()
defer c.muTree.Unlock() defer c.muTree.Unlock()
logger := klog.FromContext(ctx) logger := klog.FromContext(ctx)
// Check if there is a subnet that already contains the ServiceCIDR that is going to be deleted. // Check if there is a subnet that already contains the ServiceCIDR that is going to be deleted.
hasParentV4 := true hasParent := true
hasParentV6 := true for _, cidr := range serviceCIDR.Spec.CIDRs {
// Walk the tree to find if there is a larger subnet that contains the existing one, // Walk the tree to find if there is a larger subnet that contains the existing one,
// or there is another ServiceCIDR with the same subnet. // or there is another ServiceCIDR with the same subnet.
if prefix, err := netip.ParsePrefix(cidr.Spec.IPv4); err == nil { if prefix, err := netip.ParsePrefix(cidr); err == nil {
serviceCIDRs := sets.New[string]() serviceCIDRs := sets.New[string]()
c.tree.WalkPath(prefix, func(k netip.Prefix, v sets.Set[string]) bool { c.tree.WalkPath(prefix, func(k netip.Prefix, v sets.Set[string]) bool {
serviceCIDRs.Insert(v.UnsortedList()...) serviceCIDRs.Insert(v.UnsortedList()...)
return false return false
}) })
if serviceCIDRs.Len() == 1 && serviceCIDRs.Has(cidr.Name) { if serviceCIDRs.Len() == 1 && serviceCIDRs.Has(serviceCIDR.Name) {
hasParentV4 = false hasParent = false
} }
}
if prefix, err := netip.ParsePrefix(cidr.Spec.IPv6); err == nil {
serviceCIDRs := sets.New[string]()
c.tree.WalkPath(prefix, func(k netip.Prefix, v sets.Set[string]) bool {
serviceCIDRs.Insert(v.UnsortedList()...)
return false
})
if serviceCIDRs.Len() == 1 && serviceCIDRs.Has(cidr.Name) {
hasParentV6 = false
} }
} }
// All the existing IP addresses will be contained on the parent ServiceCIDRs, // All the existing IP addresses will be contained on the parent ServiceCIDRs,
// it is safe to delete, remove the finalizer. // it is safe to delete, remove the finalizer.
if hasParentV4 && hasParentV6 { if hasParent {
logger.V(2).Info("Removing finalizer for ServiceCIDR", "ServiceCIDR", cidr.String()) logger.V(2).Info("Removing finalizer for ServiceCIDR", "ServiceCIDR", serviceCIDR.String())
return true, nil return true, nil
} }
// TODO: optimize this // TODO: optimize this
// Since current ServiceCIDR does not have another ServiceCIDR containing it, // Since current ServiceCIDR does not have another ServiceCIDR containing it,
// verify there are no existing IPAddresses referencing it that will be orphan. // verify there are no existing IPAddresses referencing it that will be orphan.
if cidr.Spec.IPv4 != "" && !hasParentV4 { for _, cidr := range serviceCIDR.Spec.CIDRs {
// get all the IPv4 addresses // get all the IPv4 addresses
ipLabelSelector := labels.Set(map[string]string{ ipLabelSelector := labels.Set(map[string]string{
networkingapiv1alpha1.LabelIPAddressFamily: string(v1.IPv4Protocol), networkingapiv1alpha1.LabelIPAddressFamily: string(convertToV1IPFamily(netutils.IPFamilyOfCIDRString(cidr))),
networkingapiv1alpha1.LabelManagedBy: ipallocator.ControllerName, networkingapiv1alpha1.LabelManagedBy: ipallocator.ControllerName,
}).AsSelectorPreValidated() }).AsSelectorPreValidated()
ips, err := c.ipAddressLister.List(ipLabelSelector) ips, err := c.ipAddressLister.List(ipLabelSelector)
@ -482,47 +459,16 @@ func (c *Controller) canDeleteCIDR(ctx context.Context, cidr *networkingapiv1alp
continue continue
} }
for _, v := range prefixes { for _, v := range prefixes {
if v.Len() == 1 && v.Has(cidr.Name) { if v.Len() == 1 && v.Has(serviceCIDR.Name) {
return false, nil return false, nil
} }
} }
} }
if cidr.Spec.IPv6 != "" && !hasParentV6 {
ipLabelSelector := labels.Set(map[string]string{
networkingapiv1alpha1.LabelIPAddressFamily: string(v1.IPv6Protocol),
networkingapiv1alpha1.LabelManagedBy: ipallocator.ControllerName,
}).AsSelectorPreValidated()
ips, err := c.ipAddressLister.List(ipLabelSelector)
if err != nil {
return false, err
}
for _, ip := range ips {
// if the longest prefix match is the ServiceCIDR to be deleted
// and is the only existing one, at least one IPAddress will be
// orphan, block the ServiceCIDR deletion.
address, err := netip.ParseAddr(ip.Name)
if err != nil {
// the IPAddress object validates that the name is a valid IPAddress
logger.Info("[SHOULD NOT HAPPEN] unexpected error parsing IPAddress", "IPAddress", ip.Name, "error", err)
continue
}
// walk the tree to find all ServiceCIDRs containing this IP
prefixes := c.tree.GetHostIPPrefixMatches(address)
if len(prefixes) != 1 {
continue
}
for _, v := range prefixes {
if v.Len() == 1 && v.Has(cidr.Name) {
return false, nil
}
}
}
}
} }
// There are no IPAddresses that depend on the existing ServiceCIDR, so // There are no IPAddresses that depend on the existing ServiceCIDR, so
// it is safe to delete, remove finalizer. // it is safe to delete, remove finalizer.
logger.Info("ServiceCIDR no longer have orphan IPs", "ServiceCDIR", cidr.String()) logger.Info("ServiceCIDR no longer have orphan IPs", "ServiceCDIR", serviceCIDR.String())
return true, nil return true, nil
} }
@ -578,3 +524,17 @@ func (c *Controller) removeServiceCIDRFinalizerIfNeeded(ctx context.Context, cid
klog.FromContext(ctx).V(4).Info("Removed protection finalizer from ServiceCIDRs", "ServiceCIDR", cidr.Name) klog.FromContext(ctx).V(4).Info("Removed protection finalizer from ServiceCIDRs", "ServiceCIDR", cidr.Name)
return nil return nil
} }
// Convert netutils.IPFamily to v1.IPFamily
// TODO: consolidate helpers
// copied from pkg/proxy/util/utils.go
func convertToV1IPFamily(ipFamily netutils.IPFamily) v1.IPFamily {
switch ipFamily {
case netutils.IPv4:
return v1.IPv4Protocol
case netutils.IPv6:
return v1.IPv6Protocol
}
return v1.IPFamilyUnknown
}

View File

@ -128,7 +128,7 @@ func TestControllerSync(t *testing.T) {
actions: [][]string{{"patch", "servicecidrs", ""}}, actions: [][]string{{"patch", "servicecidrs", ""}},
}, },
{ {
name: "service CIDR being deleted but withing the grace period must be requeued not remove the finalizer", // TODO: assert is actually requeued name: "service CIDR being deleted but within the grace period must be requeued not remove the finalizer", // TODO: assert is actually requeued
cidrs: []*networkingapiv1alpha1.ServiceCIDR{ cidrs: []*networkingapiv1alpha1.ServiceCIDR{
deletingServiceCIDR, deletingServiceCIDR,
}, },
@ -244,15 +244,16 @@ func TestControllerSync(t *testing.T) {
} }
} }
func makeServiceCIDR(name, ipv4, ipv6 string) *networkingapiv1alpha1.ServiceCIDR { func makeServiceCIDR(name, primary, secondary string) *networkingapiv1alpha1.ServiceCIDR {
serviceCIDR := &networkingapiv1alpha1.ServiceCIDR{ serviceCIDR := &networkingapiv1alpha1.ServiceCIDR{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Name: name, Name: name,
}, },
Spec: networkingapiv1alpha1.ServiceCIDRSpec{ Spec: networkingapiv1alpha1.ServiceCIDRSpec{},
IPv4: ipv4, }
IPv6: ipv6, serviceCIDR.Spec.CIDRs = append(serviceCIDR.Spec.CIDRs, primary)
}, if secondary != "" {
serviceCIDR.Spec.CIDRs = append(serviceCIDR.Spec.CIDRs, secondary)
} }
return serviceCIDR return serviceCIDR
} }