make repair loop multi service cidr aware

The repair loop controller watches the ServiceCIDRs configured
and use them to handle the repair of the IPAddresses assigned
by the kube-apiserver.

Change-Id: I8cfe8fd6285ea91192fc4ec72eaeea1eb004a235

Change-Id: If4be12e2c67b340d86c4efa2f9fb3672f0661636
This commit is contained in:
Antonio Ojea 2023-03-14 00:30:02 +00:00
parent 63fe539b4e
commit 65e6938946
3 changed files with 305 additions and 70 deletions

View File

@ -137,9 +137,8 @@ func New(c Config) (*legacyProvider, error) {
p.startServiceClusterIPRepair = serviceipallocatorcontroller.NewRepairIPAddress( p.startServiceClusterIPRepair = serviceipallocatorcontroller.NewRepairIPAddress(
c.Services.IPRepairInterval, c.Services.IPRepairInterval,
client, client,
&c.Services.ClusterIPRange,
&c.Services.SecondaryClusterIPRange,
c.Informers.Core().V1().Services(), c.Informers.Core().V1().Services(),
c.Informers.Networking().V1alpha1().ServiceCIDRs(),
c.Informers.Networking().V1alpha1().IPAddresses(), c.Informers.Networking().V1alpha1().IPAddresses(),
).RunUntil ).RunUntil
} }

View File

@ -20,6 +20,8 @@ import (
"context" "context"
"fmt" "fmt"
"net" "net"
"net/netip"
"sync"
"time" "time"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
@ -42,6 +44,7 @@ import (
"k8s.io/kubernetes/pkg/api/legacyscheme" "k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/apis/core/v1/helper" "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/registry/core/service/ipallocator" "k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
"k8s.io/kubernetes/pkg/util/iptree"
"k8s.io/utils/clock" "k8s.io/utils/clock"
netutils "k8s.io/utils/net" netutils "k8s.io/utils/net"
) )
@ -88,18 +91,23 @@ type RepairIPAddress struct {
client kubernetes.Interface client kubernetes.Interface
interval time.Duration interval time.Duration
networkByFamily map[netutils.IPFamily]*net.IPNet // networks we operate on, by their family
serviceLister corelisters.ServiceLister serviceLister corelisters.ServiceLister
servicesSynced cache.InformerSynced servicesSynced cache.InformerSynced
serviceCIDRLister networkinglisters.ServiceCIDRLister
serviceCIDRSynced cache.InformerSynced
ipAddressLister networkinglisters.IPAddressLister ipAddressLister networkinglisters.IPAddressLister
ipAddressSynced cache.InformerSynced ipAddressSynced cache.InformerSynced
cidrQueue workqueue.RateLimitingInterface
svcQueue workqueue.RateLimitingInterface svcQueue workqueue.RateLimitingInterface
ipQueue workqueue.RateLimitingInterface ipQueue workqueue.RateLimitingInterface
workerLoopPeriod time.Duration workerLoopPeriod time.Duration
muTree sync.Mutex
tree *iptree.Tree[string]
broadcaster events.EventBroadcaster broadcaster events.EventBroadcaster
recorder events.EventRecorder recorder events.EventRecorder
clock clock.Clock clock clock.Clock
@ -109,38 +117,32 @@ type RepairIPAddress struct {
// and generates informational warnings for a cluster that is not in sync. // and generates informational warnings for a cluster that is not in sync.
func NewRepairIPAddress(interval time.Duration, func NewRepairIPAddress(interval time.Duration,
client kubernetes.Interface, client kubernetes.Interface,
network *net.IPNet,
secondaryNetwork *net.IPNet,
serviceInformer coreinformers.ServiceInformer, serviceInformer coreinformers.ServiceInformer,
serviceCIDRInformer networkinginformers.ServiceCIDRInformer,
ipAddressInformer networkinginformers.IPAddressInformer) *RepairIPAddress { ipAddressInformer networkinginformers.IPAddressInformer) *RepairIPAddress {
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()}) eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: client.EventsV1()})
recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, "ipallocator-repair-controller") recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, "ipallocator-repair-controller")
networkByFamily := make(map[netutils.IPFamily]*net.IPNet)
primary := netutils.IPFamilyOfCIDR(network)
networkByFamily[primary] = network
if secondaryNetwork != nil {
secondary := netutils.IPFamilyOfCIDR(secondaryNetwork)
networkByFamily[secondary] = secondaryNetwork
}
r := &RepairIPAddress{ r := &RepairIPAddress{
interval: interval, interval: interval,
client: client, client: client,
networkByFamily: networkByFamily, serviceLister: serviceInformer.Lister(),
serviceLister: serviceInformer.Lister(), servicesSynced: serviceInformer.Informer().HasSynced,
servicesSynced: serviceInformer.Informer().HasSynced, serviceCIDRLister: serviceCIDRInformer.Lister(),
ipAddressLister: ipAddressInformer.Lister(), serviceCIDRSynced: serviceCIDRInformer.Informer().HasSynced,
ipAddressSynced: ipAddressInformer.Informer().HasSynced, ipAddressLister: ipAddressInformer.Lister(),
svcQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "services"), ipAddressSynced: ipAddressInformer.Informer().HasSynced,
ipQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ipaddresses"), cidrQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "servicecidrs"),
workerLoopPeriod: time.Second, svcQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "services"),
broadcaster: eventBroadcaster, ipQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ipaddresses"),
recorder: recorder, tree: iptree.New[string](),
clock: clock.RealClock{}, workerLoopPeriod: time.Second,
broadcaster: eventBroadcaster,
recorder: recorder,
clock: clock.RealClock{},
} }
serviceInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{ _, _ = serviceInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj) key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil { if err == nil {
@ -163,6 +165,29 @@ func NewRepairIPAddress(interval time.Duration,
}, },
}, interval) }, interval)
_, _ = serviceCIDRInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
r.cidrQueue.Add(key)
}
},
UpdateFunc: func(old interface{}, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
if err == nil {
r.cidrQueue.Add(key)
}
},
DeleteFunc: func(obj interface{}) {
// IndexerInformer uses a delta queue, therefore for deletes we have to use this
// key function.
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
r.cidrQueue.Add(key)
}
},
})
ipAddressInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{ ipAddressInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj) key, err := cache.MetaNamespaceKeyFunc(obj)
@ -191,6 +216,7 @@ func NewRepairIPAddress(interval time.Duration,
// RunUntil starts the controller until the provided ch is closed. // RunUntil starts the controller until the provided ch is closed.
func (r *RepairIPAddress) RunUntil(onFirstSuccess func(), stopCh chan struct{}) { func (r *RepairIPAddress) RunUntil(onFirstSuccess func(), stopCh chan struct{}) {
defer r.cidrQueue.ShutDown()
defer r.ipQueue.ShutDown() defer r.ipQueue.ShutDown()
defer r.svcQueue.ShutDown() defer r.svcQueue.ShutDown()
r.broadcaster.StartRecordingToSink(stopCh) r.broadcaster.StartRecordingToSink(stopCh)
@ -199,7 +225,7 @@ func (r *RepairIPAddress) RunUntil(onFirstSuccess func(), stopCh chan struct{})
klog.Info("Starting ipallocator-repair-controller") klog.Info("Starting ipallocator-repair-controller")
defer klog.Info("Shutting down ipallocator-repair-controller") defer klog.Info("Shutting down ipallocator-repair-controller")
if !cache.WaitForNamedCacheSync("ipallocator-repair-controller", stopCh, r.ipAddressSynced, r.servicesSynced) { if !cache.WaitForNamedCacheSync("ipallocator-repair-controller", stopCh, r.ipAddressSynced, r.servicesSynced, r.serviceCIDRSynced) {
return return
} }
@ -212,6 +238,9 @@ func (r *RepairIPAddress) RunUntil(onFirstSuccess func(), stopCh chan struct{})
} }
onFirstSuccess() onFirstSuccess()
// serialize the operations on ServiceCIDRs
go wait.Until(r.cidrWorker, r.workerLoopPeriod, stopCh)
for i := 0; i < workers; i++ { for i := 0; i < workers; i++ {
go wait.Until(r.ipWorker, r.workerLoopPeriod, stopCh) go wait.Until(r.ipWorker, r.workerLoopPeriod, stopCh)
go wait.Until(r.svcWorker, r.workerLoopPeriod, stopCh) go wait.Until(r.svcWorker, r.workerLoopPeriod, stopCh)
@ -330,20 +359,16 @@ func (r *RepairIPAddress) syncService(key string) error {
runtime.HandleError(fmt.Errorf("the ClusterIP %s for Service %s/%s is not a valid IP; please recreate Service", ip, svc.Namespace, svc.Name)) runtime.HandleError(fmt.Errorf("the ClusterIP %s for Service %s/%s is not a valid IP; please recreate Service", ip, svc.Namespace, svc.Name))
continue continue
} }
// TODO(aojea) Refactor to abstract the IPs checks
family := getFamilyByIP(ip)
family := netutils.IPFamilyOf(ip) r.muTree.Lock()
v1Family := getFamilyByIP(ip) prefixes := r.tree.GetHostIPPrefixMatches(ipToAddr(ip))
network, ok := r.networkByFamily[family] r.muTree.Unlock()
if !ok { if len(prefixes) == 0 {
// this service is using an IPFamily no longer configured on cluster
r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPNotValid", "ClusterIPValidation", "Cluster IP %s(%s) is of ip family that is no longer configured on cluster; please recreate Service", ip, v1Family)
runtime.HandleError(fmt.Errorf("the ClusterIP [%v]: %s for Service %s/%s is of ip family that is no longer configured on cluster; please recreate Service", v1Family, ip, svc.Namespace, svc.Name))
continue
}
if !network.Contains(ip) {
// ClusterIP is out of range // ClusterIP is out of range
r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPOutOfRange", "ClusterIPAllocation", "Cluster IP [%v]: %s is not within the configured Service CIDR %s; please recreate service", v1Family, ip, network.String()) r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPOutOfRange", "ClusterIPAllocation", "Cluster IP [%v]: %s is not within the configured Service CIDR; please recreate service", family, ip)
runtime.HandleError(fmt.Errorf("the ClusterIP [%v]: %s for Service %s/%s is not within the service CIDR %s; please recreate", v1Family, ip, svc.Namespace, svc.Name, network.String())) runtime.HandleError(fmt.Errorf("the ClusterIP [%v]: %s for Service %s/%s is not within any service CIDR; please recreate", family, ip, svc.Namespace, svc.Name))
continue continue
} }
@ -351,8 +376,8 @@ func (r *RepairIPAddress) syncService(key string) error {
ipAddress, err := r.ipAddressLister.Get(ip.String()) ipAddress, err := r.ipAddressLister.Get(ip.String())
if apierrors.IsNotFound(err) { if apierrors.IsNotFound(err) {
// ClusterIP doesn't seem to be allocated, create it. // ClusterIP doesn't seem to be allocated, create it.
r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPNotAllocated", "ClusterIPAllocation", "Cluster IP [%v]: %s is not allocated; repairing", v1Family, ip) r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPNotAllocated", "ClusterIPAllocation", "Cluster IP [%v]: %s is not allocated; repairing", family, ip)
runtime.HandleError(fmt.Errorf("the ClusterIP [%v]: %s for Service %s/%s is not allocated; repairing", v1Family, ip, svc.Namespace, svc.Name)) runtime.HandleError(fmt.Errorf("the ClusterIP [%v]: %s for Service %s/%s is not allocated; repairing", family, ip, svc.Namespace, svc.Name))
_, err := r.client.NetworkingV1alpha1().IPAddresses().Create(context.Background(), newIPAddress(ip.String(), svc), metav1.CreateOptions{}) _, err := r.client.NetworkingV1alpha1().IPAddresses().Create(context.Background(), newIPAddress(ip.String(), svc), metav1.CreateOptions{})
if err != nil { if err != nil {
return err return err
@ -360,14 +385,14 @@ func (r *RepairIPAddress) syncService(key string) error {
continue continue
} }
if err != nil { if err != nil {
r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "UnknownError", "ClusterIPAllocation", "Unable to allocate ClusterIP [%v]: %s due to an unknown error", v1Family, ip) r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "UnknownError", "ClusterIPAllocation", "Unable to allocate ClusterIP [%v]: %s due to an unknown error", family, ip)
return fmt.Errorf("unable to allocate ClusterIP [%v]: %s for Service %s/%s due to an unknown error, will retry later: %v", v1Family, ip, svc.Namespace, svc.Name, err) return fmt.Errorf("unable to allocate ClusterIP [%v]: %s for Service %s/%s due to an unknown error, will retry later: %v", family, ip, svc.Namespace, svc.Name, err)
} }
// IPAddress that belongs to a Service must reference a Service // IPAddress that belongs to a Service must reference a Service
if ipAddress.Spec.ParentRef.Group != "" || if ipAddress.Spec.ParentRef.Group != "" ||
ipAddress.Spec.ParentRef.Resource != "services" { ipAddress.Spec.ParentRef.Resource != "services" {
r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPNotAllocated", "ClusterIPAllocation", "the ClusterIP [%v]: %s for Service %s/%s has a wrong reference; repairing", v1Family, ip, svc.Namespace, svc.Name) r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPNotAllocated", "ClusterIPAllocation", "the ClusterIP [%v]: %s for Service %s/%s has a wrong reference; repairing", family, ip, svc.Namespace, svc.Name)
if err := r.recreateIPAddress(ipAddress.Name, svc); err != nil { if err := r.recreateIPAddress(ipAddress.Name, svc); err != nil {
return err return err
} }
@ -381,7 +406,7 @@ func (r *RepairIPAddress) syncService(key string) error {
// it will keep deleting and recreating the same IPAddress changing the reference // it will keep deleting and recreating the same IPAddress changing the reference
refService, err := r.serviceLister.Services(ipAddress.Spec.ParentRef.Namespace).Get(ipAddress.Spec.ParentRef.Name) refService, err := r.serviceLister.Services(ipAddress.Spec.ParentRef.Namespace).Get(ipAddress.Spec.ParentRef.Name)
if err != nil { if err != nil {
r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPNotAllocated", "ClusterIPAllocation", "the ClusterIP [%v]: %s for Service %s/%s has a wrong reference; repairing", v1Family, ip, svc.Namespace, svc.Name) r.recorder.Eventf(svc, nil, v1.EventTypeWarning, "ClusterIPNotAllocated", "ClusterIPAllocation", "the ClusterIP [%v]: %s for Service %s/%s has a wrong reference; repairing", family, ip, svc.Namespace, svc.Name)
if err := r.recreateIPAddress(ipAddress.Name, svc); err != nil { if err := r.recreateIPAddress(ipAddress.Name, svc); err != nil {
return err return err
} }
@ -472,8 +497,7 @@ func (r *RepairIPAddress) syncIPAddress(key string) error {
} }
// does not reference a Service but created by the service allocator, something else have changed it, delete it // does not reference a Service but created by the service allocator, something else have changed it, delete it
if ipAddress.Spec.ParentRef.Group != "" || if ipAddress.Spec.ParentRef.Group != "" || ipAddress.Spec.ParentRef.Resource != "services" {
ipAddress.Spec.ParentRef.Resource != "services" {
runtime.HandleError(fmt.Errorf("IPAddress %s appears to have been modified, not referencing a Service %v: cleaning up", ipAddress.Name, ipAddress.Spec.ParentRef)) runtime.HandleError(fmt.Errorf("IPAddress %s appears to have been modified, not referencing a Service %v: cleaning up", ipAddress.Name, ipAddress.Spec.ParentRef))
r.recorder.Eventf(ipAddress, nil, v1.EventTypeWarning, "IPAddressNotAllocated", "IPAddressAllocation", "IPAddress %s appears to have been modified, not referencing a Service %v: cleaning up", ipAddress.Name, ipAddress.Spec.ParentRef) r.recorder.Eventf(ipAddress, nil, v1.EventTypeWarning, "IPAddressNotAllocated", "IPAddressAllocation", "IPAddress %s appears to have been modified, not referencing a Service %v: cleaning up", ipAddress.Name, ipAddress.Spec.ParentRef)
err := r.client.NetworkingV1alpha1().IPAddresses().Delete(context.Background(), ipAddress.Name, metav1.DeleteOptions{}) err := r.client.NetworkingV1alpha1().IPAddresses().Delete(context.Background(), ipAddress.Name, metav1.DeleteOptions{})
@ -523,6 +547,64 @@ func (r *RepairIPAddress) syncIPAddress(key string) error {
return nil return nil
} }
func (r *RepairIPAddress) cidrWorker() {
for r.processNextWorkCIDR() {
}
}
func (r *RepairIPAddress) processNextWorkCIDR() bool {
eKey, quit := r.cidrQueue.Get()
if quit {
return false
}
defer r.cidrQueue.Done(eKey)
err := r.syncCIDRs()
r.handleCIDRErr(err, eKey)
return true
}
func (r *RepairIPAddress) handleCIDRErr(err error, key interface{}) {
if err == nil {
r.cidrQueue.Forget(key)
return
}
if r.cidrQueue.NumRequeues(key) < maxRetries {
klog.V(2).InfoS("Error syncing ServiceCIDR, retrying", "serviceCIDR", key, "err", err)
r.cidrQueue.AddRateLimited(key)
return
}
klog.Warningf("Dropping ServiceCIDR %q out of the queue: %v", key, err)
r.cidrQueue.Forget(key)
runtime.HandleError(err)
}
// syncCIDRs rebuilds the radix tree based from the informers cache
func (r *RepairIPAddress) syncCIDRs() error {
cidrList, err := r.serviceCIDRLister.List(labels.Everything())
if err != nil {
return err
}
tree := iptree.New[string]()
for _, cidr := range cidrList {
if prefix, err := netip.ParsePrefix(cidr.Spec.IPv4); err == nil { // if is empty err will not be nil
tree.InsertPrefix(prefix, cidr.Name)
}
if prefix, err := netip.ParsePrefix(cidr.Spec.IPv6); err == nil { // if is empty err will not be nil
tree.InsertPrefix(prefix, cidr.Name)
}
}
r.muTree.Lock()
defer r.muTree.Unlock()
r.tree = tree
return nil
}
func newIPAddress(name string, svc *v1.Service) *networkingv1alpha1.IPAddress { func newIPAddress(name string, svc *v1.Service) *networkingv1alpha1.IPAddress {
family := string(v1.IPv4Protocol) family := string(v1.IPv4Protocol)
if netutils.IsIPv6String(name) { if netutils.IsIPv6String(name) {
@ -587,3 +669,20 @@ func verifyIPAddressLabels(ip *networkingv1alpha1.IPAddress) bool {
} }
return managedByController(ip) return managedByController(ip)
} }
// TODO(aojea) move to utils, already in pkg/registry/core/service/ipallocator/cidrallocator.go
// ipToAddr converts a net.IP to a netip.Addr
// if the net.IP is not valid it returns an empty netip.Addr{}
func ipToAddr(ip net.IP) netip.Addr {
// https://pkg.go.dev/net/netip#AddrFromSlice can return an IPv4 in IPv6 format
// so we have to check the IP family to return exactly the format that we want
// address, _ := netip.AddrFromSlice(net.ParseIPSloppy(192.168.0.1)) returns
// an address like ::ffff:192.168.0.1/32
bytes := ip.To4()
if bytes == nil {
bytes = ip.To16()
}
// AddrFromSlice returns Addr{}, false if the input is invalid.
address, _ := netip.AddrFromSlice(bytes)
return address
}

View File

@ -33,7 +33,6 @@ import (
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/events" "k8s.io/client-go/tools/events"
"k8s.io/kubernetes/pkg/registry/core/service/ipallocator" "k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
netutils "k8s.io/utils/net"
) )
var ( var (
@ -43,8 +42,9 @@ var (
type fakeRepair struct { type fakeRepair struct {
*RepairIPAddress *RepairIPAddress
serviceStore cache.Store serviceStore cache.Store
ipAddressStore cache.Store ipAddressStore cache.Store
serviceCIDRStore cache.Store
} }
func newFakeRepair() (*fake.Clientset, *fakeRepair) { func newFakeRepair() (*fake.Clientset, *fakeRepair) {
@ -54,6 +54,9 @@ func newFakeRepair() (*fake.Clientset, *fakeRepair) {
serviceInformer := informerFactory.Core().V1().Services() serviceInformer := informerFactory.Core().V1().Services()
serviceIndexer := serviceInformer.Informer().GetIndexer() serviceIndexer := serviceInformer.Informer().GetIndexer()
serviceCIDRInformer := informerFactory.Networking().V1alpha1().ServiceCIDRs()
serviceCIDRIndexer := serviceCIDRInformer.Informer().GetIndexer()
ipInformer := informerFactory.Networking().V1alpha1().IPAddresses() ipInformer := informerFactory.Networking().V1alpha1().IPAddresses()
ipIndexer := ipInformer.Informer().GetIndexer() ipIndexer := ipInformer.Informer().GetIndexer()
@ -72,22 +75,13 @@ func newFakeRepair() (*fake.Clientset, *fakeRepair) {
return false, &networkingv1alpha1.IPAddress{}, err return false, &networkingv1alpha1.IPAddress{}, err
})) }))
_, primary, err := netutils.ParseCIDRSloppy(serviceCIDRv4)
if err != nil {
panic(err)
}
_, secondary, err := netutils.ParseCIDRSloppy(serviceCIDRv6)
if err != nil {
panic(err)
}
r := NewRepairIPAddress(0*time.Second, r := NewRepairIPAddress(0*time.Second,
fakeClient, fakeClient,
primary,
secondary,
serviceInformer, serviceInformer,
serviceCIDRInformer,
ipInformer, ipInformer,
) )
return fakeClient, &fakeRepair{r, serviceIndexer, ipIndexer} return fakeClient, &fakeRepair{r, serviceIndexer, ipIndexer, serviceCIDRIndexer}
} }
func TestRepairServiceIP(t *testing.T) { func TestRepairServiceIP(t *testing.T) {
@ -95,6 +89,7 @@ func TestRepairServiceIP(t *testing.T) {
name string name string
svcs []*v1.Service svcs []*v1.Service
ipAddresses []*networkingv1alpha1.IPAddress ipAddresses []*networkingv1alpha1.IPAddress
cidrs []*networkingv1alpha1.ServiceCIDR
expectedIPs []string expectedIPs []string
actions [][]string // verb and resource actions [][]string // verb and resource
events []string events []string
@ -105,6 +100,9 @@ func TestRepairServiceIP(t *testing.T) {
ipAddresses: []*networkingv1alpha1.IPAddress{ ipAddresses: []*networkingv1alpha1.IPAddress{
newIPAddress("10.0.1.1", newService("test-svc", []string{"10.0.1.1"})), newIPAddress("10.0.1.1", newService("test-svc", []string{"10.0.1.1"})),
}, },
cidrs: []*networkingv1alpha1.ServiceCIDR{
newServiceCIDR("kubernetes", serviceCIDRv4, serviceCIDRv6),
},
expectedIPs: []string{"10.0.1.1"}, expectedIPs: []string{"10.0.1.1"},
actions: [][]string{}, actions: [][]string{},
events: []string{}, events: []string{},
@ -116,21 +114,45 @@ func TestRepairServiceIP(t *testing.T) {
newIPAddress("10.0.1.1", newService("test-svc", []string{"10.0.1.1"})), newIPAddress("10.0.1.1", newService("test-svc", []string{"10.0.1.1"})),
newIPAddress("2001:db8::10", newService("test-svc", []string{"2001:db8::10"})), newIPAddress("2001:db8::10", newService("test-svc", []string{"2001:db8::10"})),
}, },
cidrs: []*networkingv1alpha1.ServiceCIDR{
newServiceCIDR("kubernetes", serviceCIDRv4, serviceCIDRv6),
},
expectedIPs: []string{"10.0.1.1", "2001:db8::10"}, expectedIPs: []string{"10.0.1.1", "2001:db8::10"},
actions: [][]string{}, actions: [][]string{},
events: []string{}, events: []string{},
}, },
{
name: "no changes needed dual stack multiple cidrs",
svcs: []*v1.Service{newService("test-svc", []string{"192.168.0.1", "2001:db8:a:b::10"})},
ipAddresses: []*networkingv1alpha1.IPAddress{
newIPAddress("192.168.0.1", newService("test-svc", []string{"192.168.0.1"})),
newIPAddress("2001:db8:a:b::10", newService("test-svc", []string{"2001:db8:a:b::10"})),
},
cidrs: []*networkingv1alpha1.ServiceCIDR{
newServiceCIDR("kubernetes", serviceCIDRv4, serviceCIDRv6),
newServiceCIDR("custom", "192.168.0.0/24", "2001:db8:a:b::/64"),
},
expectedIPs: []string{"192.168.0.1", "2001:db8:a:b::10"},
actions: [][]string{},
events: []string{},
},
// these two cases simulate migrating from bitmaps to IPAddress objects // these two cases simulate migrating from bitmaps to IPAddress objects
{ {
name: "create IPAddress single stack", name: "create IPAddress single stack",
svcs: []*v1.Service{newService("test-svc", []string{"10.0.1.1"})}, svcs: []*v1.Service{newService("test-svc", []string{"10.0.1.1"})},
cidrs: []*networkingv1alpha1.ServiceCIDR{
newServiceCIDR("kubernetes", serviceCIDRv4, serviceCIDRv6),
},
expectedIPs: []string{"10.0.1.1"}, expectedIPs: []string{"10.0.1.1"},
actions: [][]string{{"create", "ipaddresses"}}, actions: [][]string{{"create", "ipaddresses"}},
events: []string{"Warning ClusterIPNotAllocated Cluster IP [IPv4]: 10.0.1.1 is not allocated; repairing"}, events: []string{"Warning ClusterIPNotAllocated Cluster IP [IPv4]: 10.0.1.1 is not allocated; repairing"},
}, },
{ {
name: "create IPAddresses dual stack", name: "create IPAddresses dual stack",
svcs: []*v1.Service{newService("test-svc", []string{"10.0.1.1", "2001:db8::10"})}, svcs: []*v1.Service{newService("test-svc", []string{"10.0.1.1", "2001:db8::10"})},
cidrs: []*networkingv1alpha1.ServiceCIDR{
newServiceCIDR("kubernetes", serviceCIDRv4, serviceCIDRv6),
},
expectedIPs: []string{"10.0.1.1", "2001:db8::10"}, expectedIPs: []string{"10.0.1.1", "2001:db8::10"},
actions: [][]string{{"create", "ipaddresses"}, {"create", "ipaddresses"}}, actions: [][]string{{"create", "ipaddresses"}, {"create", "ipaddresses"}},
events: []string{ events: []string{
@ -138,12 +160,26 @@ func TestRepairServiceIP(t *testing.T) {
"Warning ClusterIPNotAllocated Cluster IP [IPv6]: 2001:db8::10 is not allocated; repairing", "Warning ClusterIPNotAllocated Cluster IP [IPv6]: 2001:db8::10 is not allocated; repairing",
}, },
}, },
{
name: "create IPAddress single stack from secondary",
svcs: []*v1.Service{newService("test-svc", []string{"192.168.1.1"})},
cidrs: []*networkingv1alpha1.ServiceCIDR{
newServiceCIDR("kubernetes", serviceCIDRv4, serviceCIDRv6),
newServiceCIDR("custom", "192.168.1.0/24", ""),
},
expectedIPs: []string{"192.168.1.1"},
actions: [][]string{{"create", "ipaddresses"}},
events: []string{"Warning ClusterIPNotAllocated Cluster IP [IPv4]: 192.168.1.1 is not allocated; repairing"},
},
{ {
name: "reconcile IPAddress single stack wrong reference", name: "reconcile IPAddress single stack wrong reference",
svcs: []*v1.Service{newService("test-svc", []string{"10.0.1.1"})}, svcs: []*v1.Service{newService("test-svc", []string{"10.0.1.1"})},
ipAddresses: []*networkingv1alpha1.IPAddress{ ipAddresses: []*networkingv1alpha1.IPAddress{
newIPAddress("10.0.1.1", newService("test-svc2", []string{"10.0.1.1"})), newIPAddress("10.0.1.1", newService("test-svc2", []string{"10.0.1.1"})),
}, },
cidrs: []*networkingv1alpha1.ServiceCIDR{
newServiceCIDR("kubernetes", serviceCIDRv4, serviceCIDRv6),
},
expectedIPs: []string{"10.0.1.1"}, expectedIPs: []string{"10.0.1.1"},
actions: [][]string{{"delete", "ipaddresses"}, {"create", "ipaddresses"}}, actions: [][]string{{"delete", "ipaddresses"}, {"create", "ipaddresses"}},
events: []string{"Warning ClusterIPNotAllocated the ClusterIP [IPv4]: 10.0.1.1 for Service bar/test-svc has a wrong reference; repairing"}, events: []string{"Warning ClusterIPNotAllocated the ClusterIP [IPv4]: 10.0.1.1 for Service bar/test-svc has a wrong reference; repairing"},
@ -155,6 +191,9 @@ func TestRepairServiceIP(t *testing.T) {
newIPAddress("10.0.1.1", newService("test-svc2", []string{"10.0.1.1"})), newIPAddress("10.0.1.1", newService("test-svc2", []string{"10.0.1.1"})),
newIPAddress("2001:db8::10", newService("test-svc2", []string{"2001:db8::10"})), newIPAddress("2001:db8::10", newService("test-svc2", []string{"2001:db8::10"})),
}, },
cidrs: []*networkingv1alpha1.ServiceCIDR{
newServiceCIDR("kubernetes", serviceCIDRv4, serviceCIDRv6),
},
expectedIPs: []string{"10.0.1.1", "2001:db8::10"}, expectedIPs: []string{"10.0.1.1", "2001:db8::10"},
actions: [][]string{{"delete", "ipaddresses"}, {"create", "ipaddresses"}, {"delete", "ipaddresses"}, {"create", "ipaddresses"}}, actions: [][]string{{"delete", "ipaddresses"}, {"create", "ipaddresses"}, {"delete", "ipaddresses"}, {"create", "ipaddresses"}},
events: []string{ events: []string{
@ -169,18 +208,85 @@ func TestRepairServiceIP(t *testing.T) {
newIPAddress("192.168.1.1", newService("test-svc", []string{"192.168.1.1"})), newIPAddress("192.168.1.1", newService("test-svc", []string{"192.168.1.1"})),
newIPAddress("2001:db8::10", newService("test-svc", []string{"2001:db8::10"})), newIPAddress("2001:db8::10", newService("test-svc", []string{"2001:db8::10"})),
}, },
cidrs: []*networkingv1alpha1.ServiceCIDR{
newServiceCIDR("kubernetes", serviceCIDRv4, serviceCIDRv6),
},
expectedIPs: []string{"2001:db8::10"}, expectedIPs: []string{"2001:db8::10"},
actions: [][]string{}, actions: [][]string{},
events: []string{"Warning ClusterIPOutOfRange Cluster IP [IPv4]: 192.168.1.1 is not within the configured Service CIDR 10.0.0.0/16; please recreate service"}, events: []string{"Warning ClusterIPOutOfRange Cluster IP [IPv4]: 192.168.1.1 is not within the configured Service CIDR; please recreate service"},
}, },
{ {
name: "one IP orphan", name: "one IP orphan",
ipAddresses: []*networkingv1alpha1.IPAddress{ ipAddresses: []*networkingv1alpha1.IPAddress{
newIPAddress("10.0.1.1", newService("test-svc", []string{"10.0.1.1"})), newIPAddress("10.0.1.1", newService("test-svc", []string{"10.0.1.1"})),
}, },
cidrs: []*networkingv1alpha1.ServiceCIDR{
newServiceCIDR("kubernetes", serviceCIDRv4, serviceCIDRv6),
},
actions: [][]string{{"delete", "ipaddresses"}}, actions: [][]string{{"delete", "ipaddresses"}},
events: []string{"Warning IPAddressNotAllocated IPAddress: 10.0.1.1 for Service bar/test-svc appears to have leaked: cleaning up"}, events: []string{"Warning IPAddressNotAllocated IPAddress: 10.0.1.1 for Service bar/test-svc appears to have leaked: cleaning up"},
}, },
{
name: "one IP out of range matching the network address",
svcs: []*v1.Service{newService("test-svc", []string{"10.0.0.0"})},
ipAddresses: []*networkingv1alpha1.IPAddress{
newIPAddress("10.0.0.0", newService("test-svc", []string{"10.0.0.0"})),
},
cidrs: []*networkingv1alpha1.ServiceCIDR{
newServiceCIDR("kubernetes", serviceCIDRv4, serviceCIDRv6),
},
expectedIPs: []string{"10.0.0.0"},
actions: [][]string{},
events: []string{"Warning ClusterIPOutOfRange Cluster IP [IPv4]: 10.0.0.0 is not within the configured Service CIDR; please recreate service"},
},
{
name: "one IP out of range matching the broadcast address",
svcs: []*v1.Service{newService("test-svc", []string{"10.0.255.255"})},
ipAddresses: []*networkingv1alpha1.IPAddress{
newIPAddress("10.0.255.255", newService("test-svc", []string{"10.0.255.255"})),
},
cidrs: []*networkingv1alpha1.ServiceCIDR{
newServiceCIDR("kubernetes", serviceCIDRv4, serviceCIDRv6),
},
expectedIPs: []string{"10.0.255.255"},
actions: [][]string{},
events: []string{"Warning ClusterIPOutOfRange Cluster IP [IPv4]: 10.0.255.255 is not within the configured Service CIDR; please recreate service"},
},
{
name: "one IPv6 out of range matching the subnet address",
svcs: []*v1.Service{newService("test-svc", []string{"2001:db8::"})},
ipAddresses: []*networkingv1alpha1.IPAddress{
newIPAddress("2001:db8::", newService("test-svc", []string{"2001:db8::"})),
},
cidrs: []*networkingv1alpha1.ServiceCIDR{
newServiceCIDR("kubernetes", serviceCIDRv4, serviceCIDRv6),
},
expectedIPs: []string{"2001:db8::"},
actions: [][]string{},
events: []string{"Warning ClusterIPOutOfRange Cluster IP [IPv6]: 2001:db8:: is not within the configured Service CIDR; please recreate service"},
},
{
name: "one IPv6 matching the broadcast address",
svcs: []*v1.Service{newService("test-svc", []string{"2001:db8::ffff:ffff:ffff:ffff"})},
ipAddresses: []*networkingv1alpha1.IPAddress{
newIPAddress("2001:db8::ffff:ffff:ffff:ffff", newService("test-svc", []string{"2001:db8::ffff:ffff:ffff:ffff"})),
},
cidrs: []*networkingv1alpha1.ServiceCIDR{
newServiceCIDR("kubernetes", serviceCIDRv4, serviceCIDRv6),
},
expectedIPs: []string{"2001:db8::ffff:ffff:ffff:ffff"},
},
{
name: "one IP orphan matching the broadcast address",
ipAddresses: []*networkingv1alpha1.IPAddress{
newIPAddress("10.0.255.255", newService("test-svc", []string{"10.0.255.255"})),
},
cidrs: []*networkingv1alpha1.ServiceCIDR{
newServiceCIDR("kubernetes", serviceCIDRv4, serviceCIDRv6),
},
actions: [][]string{{"delete", "ipaddresses"}},
events: []string{"Warning IPAddressNotAllocated IPAddress: 10.0.255.255 for Service bar/test-svc appears to have leaked: cleaning up"},
},
{ {
name: "Two IPAddresses referencing the same service", name: "Two IPAddresses referencing the same service",
svcs: []*v1.Service{newService("test-svc", []string{"10.0.1.1"})}, svcs: []*v1.Service{newService("test-svc", []string{"10.0.1.1"})},
@ -188,6 +294,9 @@ func TestRepairServiceIP(t *testing.T) {
newIPAddress("10.0.1.1", newService("test-svc", []string{"10.0.1.1"})), newIPAddress("10.0.1.1", newService("test-svc", []string{"10.0.1.1"})),
newIPAddress("10.0.1.2", newService("test-svc", []string{"10.0.1.1"})), newIPAddress("10.0.1.2", newService("test-svc", []string{"10.0.1.1"})),
}, },
cidrs: []*networkingv1alpha1.ServiceCIDR{
newServiceCIDR("kubernetes", serviceCIDRv4, serviceCIDRv6),
},
actions: [][]string{{"delete", "ipaddresses"}}, actions: [][]string{{"delete", "ipaddresses"}},
events: []string{"Warning IPAddressWrongReference IPAddress: 10.0.1.2 for Service bar/test-svc has a wrong reference; cleaning up"}, events: []string{"Warning IPAddressWrongReference IPAddress: 10.0.1.2 for Service bar/test-svc has a wrong reference; cleaning up"},
}, },
@ -200,7 +309,10 @@ func TestRepairServiceIP(t *testing.T) {
ipAddresses: []*networkingv1alpha1.IPAddress{ ipAddresses: []*networkingv1alpha1.IPAddress{
newIPAddress("10.0.1.1", newService("test-svc2", []string{"10.0.1.1"})), newIPAddress("10.0.1.1", newService("test-svc2", []string{"10.0.1.1"})),
}, },
events: []string{"Warning ClusterIPAlreadyAllocated Cluster IP [4]:10.0.1.1 was assigned to multiple services; please recreate service"}, cidrs: []*networkingv1alpha1.ServiceCIDR{
newServiceCIDR("kubernetes", serviceCIDRv4, serviceCIDRv6),
},
events: []string{"Warning ClusterIPAlreadyAllocated Cluster IP [IPv4]:10.0.1.1 was assigned to multiple services; please recreate service"},
}, },
} }
@ -208,9 +320,21 @@ func TestRepairServiceIP(t *testing.T) {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
c, r := newFakeRepair() c, r := newFakeRepair()
// add cidrs
for _, cidr := range test.cidrs {
err := r.serviceCIDRStore.Add(cidr)
if err != nil {
t.Errorf("Unexpected error trying to add Service %v object: %v", cidr, err)
}
}
err := r.syncCIDRs()
if err != nil {
t.Fatal(err)
}
// override for testing // override for testing
r.servicesSynced = func() bool { return true } r.servicesSynced = func() bool { return true }
r.ipAddressSynced = func() bool { return true } r.ipAddressSynced = func() bool { return true }
r.serviceCIDRSynced = func() bool { return true }
recorder := events.NewFakeRecorder(100) recorder := events.NewFakeRecorder(100)
r.recorder = recorder r.recorder = recorder
for _, svc := range test.svcs { for _, svc := range test.svcs {
@ -228,7 +352,7 @@ func TestRepairServiceIP(t *testing.T) {
} }
} }
err := r.runOnce() err = r.runOnce()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -402,6 +526,19 @@ func newService(name string, ips []string) *v1.Service {
return svc return svc
} }
func newServiceCIDR(name, ipv4, ipv6 string) *networkingv1alpha1.ServiceCIDR {
serviceCIDR := &networkingv1alpha1.ServiceCIDR{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: networkingv1alpha1.ServiceCIDRSpec{
IPv4: ipv4,
IPv6: ipv6,
},
}
return serviceCIDR
}
func expectAction(t *testing.T, actions []k8stesting.Action, expected [][]string) { func expectAction(t *testing.T, actions []k8stesting.Action, expected [][]string) {
t.Helper() t.Helper()
if len(actions) != len(expected) { if len(actions) != len(expected) {