diff --git a/pkg/controlplane/controller.go b/pkg/controlplane/controller.go index 8887ded5416..408257ed686 100644 --- a/pkg/controlplane/controller.go +++ b/pkg/controlplane/controller.go @@ -33,6 +33,7 @@ import ( genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/storage" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" + eventsv1client "k8s.io/client-go/kubernetes/typed/events/v1" "k8s.io/client-go/rest" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/controlplane/reconcilers" @@ -55,7 +56,7 @@ const ( type Controller struct { ServiceClient corev1client.ServicesGetter NamespaceClient corev1client.NamespacesGetter - EventClient corev1client.EventsGetter + EventClient eventsv1client.EventsV1Interface readyzClient rest.Interface ServiceClusterIPRegistry rangeallocation.RangeRegistry @@ -85,11 +86,12 @@ type Controller struct { PublicServicePort int KubernetesServiceNodePort int + stopCh chan struct{} runner *async.Runner } // NewBootstrapController returns a controller for watching the core capabilities of the master -func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.LegacyRESTStorage, serviceClient corev1client.ServicesGetter, nsClient corev1client.NamespacesGetter, eventClient corev1client.EventsGetter, readyzClient rest.Interface) (*Controller, error) { +func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.LegacyRESTStorage, serviceClient corev1client.ServicesGetter, nsClient corev1client.NamespacesGetter, eventClient eventsv1client.EventsV1Interface, readyzClient rest.Interface) (*Controller, error) { _, publicServicePort, err := c.GenericConfig.SecureServing.HostPort() if err != nil { return nil, fmt.Errorf("failed to get listener address: %w", err) @@ -172,6 +174,28 @@ func (c *Controller) Start() { repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval, c.ServiceClient, c.EventClient, &c.ServiceClusterIPRange, c.ServiceClusterIPRegistry, &c.SecondaryServiceClusterIPRange, c.SecondaryServiceClusterIPRegistry) repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.ServiceClient, c.EventClient, c.ServiceNodePortRange, c.ServiceNodePortRegistry) + // We start both repairClusterIPs and repairNodePorts to catch events + // coming from the RunOnce() calls to them. + // TODO: Refactor both controllers to only expose a public RunUntil + // method, but to accommodate the usecase below (of forcing the first + // successful call before proceeding) let them signal on that, like: + // func (c *Repair) RunUntil(stopCh chan struct{}, onFirstSuccess func()) + // and use it here like: + // wg := sync.WaitGroup{} + // wg.Add(2) + // runRepairClusterIPs := func(stopCh chan struct{}) { + // repairClusterIPs(stopCh, wg.Done) + // } + // runRepairNodePorts := func(stopCh chan struct{}) { + // repairNodePorts(stopCh, wg.Done) + // } + // c.runner = ... + // c.runner.Start() + // wg.Wait() + c.stopCh = make(chan struct{}) + repairClusterIPs.Start(c.stopCh) + repairNodePorts.Start(c.stopCh) + // run all of the controllers once prior to returning from Start. if err := repairClusterIPs.RunOnce(); err != nil { // If we fail to repair cluster IPs apiserver is useless. We should restart and retry. @@ -190,6 +214,7 @@ func (c *Controller) Start() { func (c *Controller) Stop() { if c.runner != nil { c.runner.Stop() + close(c.stopCh) } endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts) finishedReconciling := make(chan struct{}) diff --git a/pkg/controlplane/controller_test.go b/pkg/controlplane/controller_test.go index 19d1c6c790e..f20cdee24b0 100644 --- a/pkg/controlplane/controller_test.go +++ b/pkg/controlplane/controller_test.go @@ -27,6 +27,7 @@ import ( genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/client-go/kubernetes/fake" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" + eventsv1client "k8s.io/client-go/kubernetes/typed/events/v1" "k8s.io/client-go/rest" core "k8s.io/client-go/testing" "k8s.io/kubernetes/pkg/controlplane/reconcilers" @@ -1021,7 +1022,7 @@ func Test_completedConfig_NewBootstrapController(t *testing.T) { legacyRESTStorage corerest.LegacyRESTStorage serviceClient corev1client.ServicesGetter nsClient corev1client.NamespacesGetter - eventClient corev1client.EventsGetter + eventClient eventsv1client.EventsV1Interface readyzClient rest.Interface } tests := []struct { diff --git a/pkg/controlplane/instance.go b/pkg/controlplane/instance.go index 8a6c246fad6..24fb0234be0 100644 --- a/pkg/controlplane/instance.go +++ b/pkg/controlplane/instance.go @@ -70,6 +70,7 @@ import ( "k8s.io/client-go/kubernetes" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" discoveryclient "k8s.io/client-go/kubernetes/typed/discovery/v1" + eventsv1client "k8s.io/client-go/kubernetes/typed/events/v1" "k8s.io/component-helpers/apimachinery/lease" "k8s.io/klog/v2" api "k8s.io/kubernetes/pkg/apis/core" @@ -543,7 +544,8 @@ func (m *Instance) InstallLegacyAPI(c *completedConfig, restOptionsGetter generi controllerName := "bootstrap-controller" coreClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig) - bootstrapController, err := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient, coreClient.RESTClient()) + eventsClient := eventsv1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig) + bootstrapController, err := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, eventsClient, coreClient.RESTClient()) if err != nil { return fmt.Errorf("error creating bootstrap controller: %v", err) } diff --git a/pkg/registry/core/service/ipallocator/controller/repair.go b/pkg/registry/core/service/ipallocator/controller/repair.go index 37b0a78f777..d3bd7f1833c 100644 --- a/pkg/registry/core/service/ipallocator/controller/repair.go +++ b/pkg/registry/core/service/ipallocator/controller/repair.go @@ -28,7 +28,8 @@ import ( "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/tools/record" + eventsv1client "k8s.io/client-go/kubernetes/typed/events/v1" + "k8s.io/client-go/tools/events" "k8s.io/client-go/util/retry" "k8s.io/kubernetes/pkg/api/legacyscheme" api "k8s.io/kubernetes/pkg/apis/core" @@ -61,7 +62,8 @@ type Repair struct { allocatorByFamily map[v1.IPFamily]rangeallocation.RangeRegistry // allocators we use, by their family leaksByFamily map[v1.IPFamily]map[string]int // counter per leaked IP per family - recorder record.EventRecorder + broadcaster events.EventBroadcaster + recorder events.EventRecorder } // How many times we need to detect a leak before we clean up. This is to @@ -70,10 +72,9 @@ const numRepairsBeforeLeakCleanup = 3 // NewRepair creates a controller that periodically ensures that all clusterIPs are uniquely allocated across the cluster // and generates informational warnings for a cluster that is not in sync. -func NewRepair(interval time.Duration, serviceClient corev1client.ServicesGetter, eventClient corev1client.EventsGetter, network *net.IPNet, alloc rangeallocation.RangeRegistry, secondaryNetwork *net.IPNet, secondaryAlloc rangeallocation.RangeRegistry) *Repair { - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: eventClient.Events("")}) - recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "ipallocator-repair-controller"}) +func NewRepair(interval time.Duration, serviceClient corev1client.ServicesGetter, eventClient eventsv1client.EventsV1Interface, network *net.IPNet, alloc rangeallocation.RangeRegistry, secondaryNetwork *net.IPNet, secondaryAlloc rangeallocation.RangeRegistry) *Repair { + eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: eventClient}) + recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, "ipallocator-repair-controller") // build *ByFamily struct members networkByFamily := make(map[v1.IPFamily]*net.IPNet) @@ -107,17 +108,23 @@ func NewRepair(interval time.Duration, serviceClient corev1client.ServicesGetter allocatorByFamily: allocatorByFamily, leaksByFamily: leaksByFamily, + broadcaster: eventBroadcaster, recorder: recorder, } } +// Start starts events recording. +func (c *Repair) Start(stopCh chan struct{}) { + c.broadcaster.StartRecordingToSink(stopCh) +} + // RunUntil starts the controller until the provided ch is closed. -func (c *Repair) RunUntil(ch chan struct{}) { +func (c *Repair) RunUntil(stopCh chan struct{}) { wait.Until(func() { if err := c.RunOnce(); err != nil { runtime.HandleError(err) } - }, c.interval, ch) + }, c.interval, stopCh) } // RunOnce verifies the state of the cluster IP allocations and returns an error if an unrecoverable problem occurs. @@ -213,7 +220,7 @@ func (c *Repair) runOnce() error { ip := netutils.ParseIPSloppy(ip) if ip == nil { // cluster IP is corrupt - c.recorder.Eventf(&svc, v1.EventTypeWarning, "ClusterIPNotValid", "Cluster IP %s is not a valid IP; please recreate service", ip) + c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "ClusterIPNotValid", "ClusterIPValidation", "Cluster IP %s is not a valid IP; please recreate service", ip) runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not a valid IP; please recreate", ip, svc.Name, svc.Namespace)) continue } @@ -221,7 +228,7 @@ func (c *Repair) runOnce() error { family := getFamilyByIP(ip) if _, ok := rebuiltByFamily[family]; !ok { // this service is using an IPFamily no longer configured on cluster - c.recorder.Eventf(&svc, v1.EventTypeWarning, "ClusterIPNotValid", "Cluster IP %s(%s) is of ip family that is no longer configured on cluster; please recreate service", ip, family) + c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "ClusterIPNotValid", "ClusterIPValidation", "Cluster IP %s(%s) is of ip family that is no longer configured on cluster; please recreate service", ip, family) runtime.HandleError(fmt.Errorf("the cluster IP %s(%s) for service %s/%s is of ip family that is no longer configured on cluster; please recreate", ip, family, svc.Name, svc.Namespace)) continue } @@ -236,25 +243,25 @@ func (c *Repair) runOnce() error { actualStored.Release(ip) } else { // cluster IP doesn't seem to be allocated - c.recorder.Eventf(&svc, v1.EventTypeWarning, "ClusterIPNotAllocated", "Cluster IP [%v]:%s is not allocated; repairing", family, ip) + c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "ClusterIPNotAllocated", "ClusterIPAllocation", "Cluster IP [%v]:%s is not allocated; repairing", family, ip) runtime.HandleError(fmt.Errorf("the cluster IP [%v]:%s for service %s/%s is not allocated; repairing", family, ip, svc.Name, svc.Namespace)) } delete(c.leaksByFamily[family], ip.String()) // it is used, so it can't be leaked case ipallocator.ErrAllocated: // cluster IP is duplicate - c.recorder.Eventf(&svc, v1.EventTypeWarning, "ClusterIPAlreadyAllocated", "Cluster IP [%v]:%s was assigned to multiple services; please recreate service", family, ip) + c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "ClusterIPAlreadyAllocated", "ClusterIPAllocation", "Cluster IP [%v]:%s was assigned to multiple services; please recreate service", family, ip) runtime.HandleError(fmt.Errorf("the cluster IP [%v]:%s for service %s/%s was assigned to multiple services; please recreate", family, ip, svc.Name, svc.Namespace)) case err.(*ipallocator.ErrNotInRange): // cluster IP is out of range - c.recorder.Eventf(&svc, v1.EventTypeWarning, "ClusterIPOutOfRange", "Cluster IP [%v]:%s is not within the service CIDR %s; please recreate service", family, ip, c.networkByFamily[family]) + c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "ClusterIPOutOfRange", "ClusterIPAllocation", "Cluster IP [%v]:%s is not within the service CIDR %s; please recreate service", family, ip, c.networkByFamily[family]) runtime.HandleError(fmt.Errorf("the cluster IP [%v]:%s for service %s/%s is not within the service CIDR %s; please recreate", family, ip, svc.Name, svc.Namespace, c.networkByFamily[family])) case ipallocator.ErrFull: // somehow we are out of IPs cidr := actualAlloc.CIDR() - c.recorder.Eventf(&svc, v1.EventTypeWarning, "ServiceCIDRFull", "Service CIDR %v is full; you must widen the CIDR in order to create new services for Cluster IP [%v]:%s", cidr, family, ip) + c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "ServiceCIDRFull", "ClusterIPAllocation", "Service CIDR %v is full; you must widen the CIDR in order to create new services for Cluster IP [%v]:%s", cidr, family, ip) return fmt.Errorf("the service CIDR %v is full; you must widen the CIDR in order to create new services for Cluster IP [%v]:%s", cidr, family, ip) default: - c.recorder.Eventf(&svc, v1.EventTypeWarning, "UnknownError", "Unable to allocate cluster IP [%v]:%s due to an unknown error", family, ip) + c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "UnknownError", "ClusterIPAllocation", "Unable to allocate cluster IP [%v]:%s due to an unknown error", family, ip) return fmt.Errorf("unable to allocate cluster IP [%v]:%s for service %s/%s due to an unknown error, exiting: %v", family, ip, svc.Name, svc.Namespace, err) } } diff --git a/pkg/registry/core/service/ipallocator/controller/repair_test.go b/pkg/registry/core/service/ipallocator/controller/repair_test.go index cd96298f94c..ceb8cd20f3e 100644 --- a/pkg/registry/core/service/ipallocator/controller/repair_test.go +++ b/pkg/registry/core/service/ipallocator/controller/repair_test.go @@ -58,7 +58,7 @@ func TestRepair(t *testing.T) { item: &api.RangeAllocation{Range: "192.168.1.0/24"}, } _, cidr, _ := netutils.ParseCIDRSloppy(ipregistry.item.Range) - r := NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), cidr, ipregistry, nil, nil) + r := NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), cidr, ipregistry, nil, nil) if err := r.RunOnce(); err != nil { t.Fatal(err) @@ -71,7 +71,7 @@ func TestRepair(t *testing.T) { item: &api.RangeAllocation{Range: "192.168.1.0/24"}, updateErr: fmt.Errorf("test error"), } - r = NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), cidr, ipregistry, nil, nil) + r = NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), cidr, ipregistry, nil, nil) if err := r.RunOnce(); !strings.Contains(err.Error(), ": test error") { t.Fatal(err) } @@ -102,7 +102,7 @@ func TestRepairLeak(t *testing.T) { }, } - r := NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), cidr, ipregistry, nil, nil) + r := NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), cidr, ipregistry, nil, nil) // Run through the "leak detection holdoff" loops. for i := 0; i < (numRepairsBeforeLeakCleanup - 1); i++ { if err := r.RunOnce(); err != nil { @@ -200,7 +200,7 @@ func TestRepairWithExisting(t *testing.T) { Data: dst.Data, }, } - r := NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), cidr, ipregistry, nil, nil) + r := NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), cidr, ipregistry, nil, nil) if err := r.RunOnce(); err != nil { t.Fatal(err) } @@ -290,7 +290,7 @@ func TestShouldWorkOnSecondary(t *testing.T) { secondaryRegistry = makeRangeRegistry(t, tc.secondaryNet.String()) } - repair := NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), tc.primaryNet, primaryRegistry, tc.secondaryNet, secondaryRegistry) + repair := NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), tc.primaryNet, primaryRegistry, tc.secondaryNet, secondaryRegistry) if len(repair.allocatorByFamily) != len(tc.expectedFamilies) { t.Fatalf("expected to have allocator by family count:%v got %v", len(tc.expectedFamilies), len(repair.allocatorByFamily)) } @@ -334,7 +334,7 @@ func TestRepairDualStack(t *testing.T) { _, cidr, _ := netutils.ParseCIDRSloppy(ipregistry.item.Range) _, secondaryCIDR, _ := netutils.ParseCIDRSloppy(secondaryIPRegistry.item.Range) - r := NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), cidr, ipregistry, secondaryCIDR, secondaryIPRegistry) + r := NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), cidr, ipregistry, secondaryCIDR, secondaryIPRegistry) if err := r.RunOnce(); err != nil { t.Fatal(err) @@ -355,7 +355,7 @@ func TestRepairDualStack(t *testing.T) { updateErr: fmt.Errorf("test error"), } - r = NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), cidr, ipregistry, secondaryCIDR, secondaryIPRegistry) + r = NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), cidr, ipregistry, secondaryCIDR, secondaryIPRegistry) if err := r.RunOnce(); !strings.Contains(err.Error(), ": test error") { t.Fatal(err) } @@ -410,7 +410,7 @@ func TestRepairLeakDualStack(t *testing.T) { }, } - r := NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), cidr, ipregistry, secondaryCIDR, secondaryIPRegistry) + r := NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), cidr, ipregistry, secondaryCIDR, secondaryIPRegistry) // Run through the "leak detection holdoff" loops. for i := 0; i < (numRepairsBeforeLeakCleanup - 1); i++ { if err := r.RunOnce(); err != nil { @@ -596,7 +596,7 @@ func TestRepairWithExistingDualStack(t *testing.T) { }, } - r := NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), cidr, ipregistry, secondaryCIDR, secondaryIPRegistry) + r := NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), cidr, ipregistry, secondaryCIDR, secondaryIPRegistry) if err := r.RunOnce(); err != nil { t.Fatal(err) } diff --git a/pkg/registry/core/service/portallocator/controller/repair.go b/pkg/registry/core/service/portallocator/controller/repair.go index d37e12ee5d0..ab6abcfdcd4 100644 --- a/pkg/registry/core/service/portallocator/controller/repair.go +++ b/pkg/registry/core/service/portallocator/controller/repair.go @@ -29,7 +29,8 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/tools/record" + eventsv1client "k8s.io/client-go/kubernetes/typed/events/v1" + "k8s.io/client-go/tools/events" "k8s.io/client-go/util/retry" "k8s.io/kubernetes/pkg/api/legacyscheme" api "k8s.io/kubernetes/pkg/apis/core" @@ -44,7 +45,9 @@ type Repair struct { portRange net.PortRange alloc rangeallocation.RangeRegistry leaks map[int]int // counter per leaked port - recorder record.EventRecorder + + broadcaster events.EventBroadcaster + recorder events.EventRecorder } // How many times we need to detect a leak before we clean up. This is to @@ -53,10 +56,9 @@ const numRepairsBeforeLeakCleanup = 3 // NewRepair creates a controller that periodically ensures that all ports are uniquely allocated across the cluster // and generates informational warnings for a cluster that is not in sync. -func NewRepair(interval time.Duration, serviceClient corev1client.ServicesGetter, eventClient corev1client.EventsGetter, portRange net.PortRange, alloc rangeallocation.RangeRegistry) *Repair { - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: eventClient.Events("")}) - recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, corev1.EventSource{Component: "portallocator-repair-controller"}) +func NewRepair(interval time.Duration, serviceClient corev1client.ServicesGetter, eventClient eventsv1client.EventsV1Interface, portRange net.PortRange, alloc rangeallocation.RangeRegistry) *Repair { + eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: eventClient}) + recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, "portallocator-repair-controller") return &Repair{ interval: interval, @@ -64,17 +66,23 @@ func NewRepair(interval time.Duration, serviceClient corev1client.ServicesGetter portRange: portRange, alloc: alloc, leaks: map[int]int{}, + broadcaster: eventBroadcaster, recorder: recorder, } } +// Start starts events recording. +func (c *Repair) Start(stopCh chan struct{}) { + c.broadcaster.StartRecordingToSink(stopCh) +} + // RunUntil starts the controller until the provided ch is closed. -func (c *Repair) RunUntil(ch chan struct{}) { +func (c *Repair) RunUntil(stopCh chan struct{}) { wait.Until(func() { if err := c.RunOnce(); err != nil { runtime.HandleError(err) } - }, c.interval, ch) + }, c.interval, stopCh) } // RunOnce verifies the state of the port allocations and returns an error if an unrecoverable problem occurs. @@ -143,24 +151,24 @@ func (c *Repair) runOnce() error { stored.Release(port) } else { // doesn't seem to be allocated - c.recorder.Eventf(svc, corev1.EventTypeWarning, "PortNotAllocated", "Port %d is not allocated; repairing", port) + c.recorder.Eventf(svc, nil, corev1.EventTypeWarning, "PortNotAllocated", "PortAllocation", "Port %d is not allocated; repairing", port) runtime.HandleError(fmt.Errorf("the node port %d for service %s/%s is not allocated; repairing", port, svc.Name, svc.Namespace)) } delete(c.leaks, port) // it is used, so it can't be leaked case portallocator.ErrAllocated: // port is duplicate, reallocate - c.recorder.Eventf(svc, corev1.EventTypeWarning, "PortAlreadyAllocated", "Port %d was assigned to multiple services; please recreate service", port) + c.recorder.Eventf(svc, nil, corev1.EventTypeWarning, "PortAlreadyAllocated", "PortAllocation", "Port %d was assigned to multiple services; please recreate service", port) runtime.HandleError(fmt.Errorf("the node port %d for service %s/%s was assigned to multiple services; please recreate", port, svc.Name, svc.Namespace)) case err.(*portallocator.ErrNotInRange): // port is out of range, reallocate - c.recorder.Eventf(svc, corev1.EventTypeWarning, "PortOutOfRange", "Port %d is not within the port range %s; please recreate service", port, c.portRange) + c.recorder.Eventf(svc, nil, corev1.EventTypeWarning, "PortOutOfRange", "PortAllocation", "Port %d is not within the port range %s; please recreate service", port, c.portRange) runtime.HandleError(fmt.Errorf("the port %d for service %s/%s is not within the port range %s; please recreate", port, svc.Name, svc.Namespace, c.portRange)) case portallocator.ErrFull: // somehow we are out of ports - c.recorder.Eventf(svc, corev1.EventTypeWarning, "PortRangeFull", "Port range %s is full; you must widen the port range in order to create new services", c.portRange) + c.recorder.Eventf(svc, nil, corev1.EventTypeWarning, "PortRangeFull", "PortAllocation", "Port range %s is full; you must widen the port range in order to create new services", c.portRange) return fmt.Errorf("the port range %s is full; you must widen the port range in order to create new services", c.portRange) default: - c.recorder.Eventf(svc, corev1.EventTypeWarning, "UnknownError", "Unable to allocate port %d due to an unknown error", port) + c.recorder.Eventf(svc, nil, corev1.EventTypeWarning, "UnknownError", "PortAllocation", "Unable to allocate port %d due to an unknown error", port) return fmt.Errorf("unable to allocate port %d for service %s/%s due to an unknown error, exiting: %v", port, svc.Name, svc.Namespace, err) } } diff --git a/pkg/registry/core/service/portallocator/controller/repair_test.go b/pkg/registry/core/service/portallocator/controller/repair_test.go index f9d4bfc65aa..689c9066bdc 100644 --- a/pkg/registry/core/service/portallocator/controller/repair_test.go +++ b/pkg/registry/core/service/portallocator/controller/repair_test.go @@ -58,7 +58,7 @@ func TestRepair(t *testing.T) { item: &api.RangeAllocation{Range: "100-200"}, } pr, _ := net.ParsePortRange(registry.item.Range) - r := NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), *pr, registry) + r := NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), *pr, registry) if err := r.RunOnce(); err != nil { t.Fatal(err) @@ -71,7 +71,7 @@ func TestRepair(t *testing.T) { item: &api.RangeAllocation{Range: "100-200"}, updateErr: fmt.Errorf("test error"), } - r = NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), *pr, registry) + r = NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), *pr, registry) if err := r.RunOnce(); !strings.Contains(err.Error(), ": test error") { t.Fatal(err) } @@ -102,7 +102,7 @@ func TestRepairLeak(t *testing.T) { }, } - r := NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), *pr, registry) + r := NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), *pr, registry) // Run through the "leak detection holdoff" loops. for i := 0; i < (numRepairsBeforeLeakCleanup - 1); i++ { if err := r.RunOnce(); err != nil { @@ -190,7 +190,7 @@ func TestRepairWithExisting(t *testing.T) { Data: dst.Data, }, } - r := NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), *pr, registry) + r := NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), *pr, registry) if err := r.RunOnce(); err != nil { t.Fatal(err) }