mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-20 10:20:51 +00:00
Migrate ipallocator and portallocator to new Events API
This commit is contained in:
parent
7e77252b0d
commit
f1d901861b
@ -33,6 +33,7 @@ import (
|
||||
genericapiserver "k8s.io/apiserver/pkg/server"
|
||||
"k8s.io/apiserver/pkg/storage"
|
||||
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
eventsv1client "k8s.io/client-go/kubernetes/typed/events/v1"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/klog/v2"
|
||||
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
|
||||
@ -55,7 +56,7 @@ const (
|
||||
type Controller struct {
|
||||
ServiceClient corev1client.ServicesGetter
|
||||
NamespaceClient corev1client.NamespacesGetter
|
||||
EventClient corev1client.EventsGetter
|
||||
EventClient eventsv1client.EventsV1Interface
|
||||
readyzClient rest.Interface
|
||||
|
||||
ServiceClusterIPRegistry rangeallocation.RangeRegistry
|
||||
@ -85,11 +86,12 @@ type Controller struct {
|
||||
PublicServicePort int
|
||||
KubernetesServiceNodePort int
|
||||
|
||||
stopCh chan struct{}
|
||||
runner *async.Runner
|
||||
}
|
||||
|
||||
// NewBootstrapController returns a controller for watching the core capabilities of the master
|
||||
func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.LegacyRESTStorage, serviceClient corev1client.ServicesGetter, nsClient corev1client.NamespacesGetter, eventClient corev1client.EventsGetter, readyzClient rest.Interface) (*Controller, error) {
|
||||
func (c *completedConfig) NewBootstrapController(legacyRESTStorage corerest.LegacyRESTStorage, serviceClient corev1client.ServicesGetter, nsClient corev1client.NamespacesGetter, eventClient eventsv1client.EventsV1Interface, readyzClient rest.Interface) (*Controller, error) {
|
||||
_, publicServicePort, err := c.GenericConfig.SecureServing.HostPort()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get listener address: %w", err)
|
||||
@ -172,6 +174,28 @@ func (c *Controller) Start() {
|
||||
repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval, c.ServiceClient, c.EventClient, &c.ServiceClusterIPRange, c.ServiceClusterIPRegistry, &c.SecondaryServiceClusterIPRange, c.SecondaryServiceClusterIPRegistry)
|
||||
repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.ServiceClient, c.EventClient, c.ServiceNodePortRange, c.ServiceNodePortRegistry)
|
||||
|
||||
// We start both repairClusterIPs and repairNodePorts to catch events
|
||||
// coming from the RunOnce() calls to them.
|
||||
// TODO: Refactor both controllers to only expose a public RunUntil
|
||||
// method, but to accommodate the usecase below (of forcing the first
|
||||
// successful call before proceeding) let them signal on that, like:
|
||||
// func (c *Repair) RunUntil(stopCh chan struct{}, onFirstSuccess func())
|
||||
// and use it here like:
|
||||
// wg := sync.WaitGroup{}
|
||||
// wg.Add(2)
|
||||
// runRepairClusterIPs := func(stopCh chan struct{}) {
|
||||
// repairClusterIPs(stopCh, wg.Done)
|
||||
// }
|
||||
// runRepairNodePorts := func(stopCh chan struct{}) {
|
||||
// repairNodePorts(stopCh, wg.Done)
|
||||
// }
|
||||
// c.runner = ...
|
||||
// c.runner.Start()
|
||||
// wg.Wait()
|
||||
c.stopCh = make(chan struct{})
|
||||
repairClusterIPs.Start(c.stopCh)
|
||||
repairNodePorts.Start(c.stopCh)
|
||||
|
||||
// run all of the controllers once prior to returning from Start.
|
||||
if err := repairClusterIPs.RunOnce(); err != nil {
|
||||
// If we fail to repair cluster IPs apiserver is useless. We should restart and retry.
|
||||
@ -190,6 +214,7 @@ func (c *Controller) Start() {
|
||||
func (c *Controller) Stop() {
|
||||
if c.runner != nil {
|
||||
c.runner.Stop()
|
||||
close(c.stopCh)
|
||||
}
|
||||
endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts)
|
||||
finishedReconciling := make(chan struct{})
|
||||
|
@ -27,6 +27,7 @@ import (
|
||||
genericapiserver "k8s.io/apiserver/pkg/server"
|
||||
"k8s.io/client-go/kubernetes/fake"
|
||||
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
eventsv1client "k8s.io/client-go/kubernetes/typed/events/v1"
|
||||
"k8s.io/client-go/rest"
|
||||
core "k8s.io/client-go/testing"
|
||||
"k8s.io/kubernetes/pkg/controlplane/reconcilers"
|
||||
@ -1021,7 +1022,7 @@ func Test_completedConfig_NewBootstrapController(t *testing.T) {
|
||||
legacyRESTStorage corerest.LegacyRESTStorage
|
||||
serviceClient corev1client.ServicesGetter
|
||||
nsClient corev1client.NamespacesGetter
|
||||
eventClient corev1client.EventsGetter
|
||||
eventClient eventsv1client.EventsV1Interface
|
||||
readyzClient rest.Interface
|
||||
}
|
||||
tests := []struct {
|
||||
|
@ -70,6 +70,7 @@ import (
|
||||
"k8s.io/client-go/kubernetes"
|
||||
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
discoveryclient "k8s.io/client-go/kubernetes/typed/discovery/v1"
|
||||
eventsv1client "k8s.io/client-go/kubernetes/typed/events/v1"
|
||||
"k8s.io/component-helpers/apimachinery/lease"
|
||||
"k8s.io/klog/v2"
|
||||
api "k8s.io/kubernetes/pkg/apis/core"
|
||||
@ -543,7 +544,8 @@ func (m *Instance) InstallLegacyAPI(c *completedConfig, restOptionsGetter generi
|
||||
|
||||
controllerName := "bootstrap-controller"
|
||||
coreClient := corev1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
|
||||
bootstrapController, err := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, coreClient, coreClient.RESTClient())
|
||||
eventsClient := eventsv1client.NewForConfigOrDie(c.GenericConfig.LoopbackClientConfig)
|
||||
bootstrapController, err := c.NewBootstrapController(legacyRESTStorage, coreClient, coreClient, eventsClient, coreClient.RESTClient())
|
||||
if err != nil {
|
||||
return fmt.Errorf("error creating bootstrap controller: %v", err)
|
||||
}
|
||||
|
@ -28,7 +28,8 @@ import (
|
||||
"k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
"k8s.io/client-go/tools/record"
|
||||
eventsv1client "k8s.io/client-go/kubernetes/typed/events/v1"
|
||||
"k8s.io/client-go/tools/events"
|
||||
"k8s.io/client-go/util/retry"
|
||||
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
||||
api "k8s.io/kubernetes/pkg/apis/core"
|
||||
@ -61,7 +62,8 @@ type Repair struct {
|
||||
allocatorByFamily map[v1.IPFamily]rangeallocation.RangeRegistry // allocators we use, by their family
|
||||
|
||||
leaksByFamily map[v1.IPFamily]map[string]int // counter per leaked IP per family
|
||||
recorder record.EventRecorder
|
||||
broadcaster events.EventBroadcaster
|
||||
recorder events.EventRecorder
|
||||
}
|
||||
|
||||
// How many times we need to detect a leak before we clean up. This is to
|
||||
@ -70,10 +72,9 @@ const numRepairsBeforeLeakCleanup = 3
|
||||
|
||||
// NewRepair creates a controller that periodically ensures that all clusterIPs are uniquely allocated across the cluster
|
||||
// and generates informational warnings for a cluster that is not in sync.
|
||||
func NewRepair(interval time.Duration, serviceClient corev1client.ServicesGetter, eventClient corev1client.EventsGetter, network *net.IPNet, alloc rangeallocation.RangeRegistry, secondaryNetwork *net.IPNet, secondaryAlloc rangeallocation.RangeRegistry) *Repair {
|
||||
eventBroadcaster := record.NewBroadcaster()
|
||||
eventBroadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: eventClient.Events("")})
|
||||
recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: "ipallocator-repair-controller"})
|
||||
func NewRepair(interval time.Duration, serviceClient corev1client.ServicesGetter, eventClient eventsv1client.EventsV1Interface, network *net.IPNet, alloc rangeallocation.RangeRegistry, secondaryNetwork *net.IPNet, secondaryAlloc rangeallocation.RangeRegistry) *Repair {
|
||||
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: eventClient})
|
||||
recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, "ipallocator-repair-controller")
|
||||
|
||||
// build *ByFamily struct members
|
||||
networkByFamily := make(map[v1.IPFamily]*net.IPNet)
|
||||
@ -107,17 +108,23 @@ func NewRepair(interval time.Duration, serviceClient corev1client.ServicesGetter
|
||||
allocatorByFamily: allocatorByFamily,
|
||||
|
||||
leaksByFamily: leaksByFamily,
|
||||
broadcaster: eventBroadcaster,
|
||||
recorder: recorder,
|
||||
}
|
||||
}
|
||||
|
||||
// Start starts events recording.
|
||||
func (c *Repair) Start(stopCh chan struct{}) {
|
||||
c.broadcaster.StartRecordingToSink(stopCh)
|
||||
}
|
||||
|
||||
// RunUntil starts the controller until the provided ch is closed.
|
||||
func (c *Repair) RunUntil(ch chan struct{}) {
|
||||
func (c *Repair) RunUntil(stopCh chan struct{}) {
|
||||
wait.Until(func() {
|
||||
if err := c.RunOnce(); err != nil {
|
||||
runtime.HandleError(err)
|
||||
}
|
||||
}, c.interval, ch)
|
||||
}, c.interval, stopCh)
|
||||
}
|
||||
|
||||
// RunOnce verifies the state of the cluster IP allocations and returns an error if an unrecoverable problem occurs.
|
||||
@ -213,7 +220,7 @@ func (c *Repair) runOnce() error {
|
||||
ip := netutils.ParseIPSloppy(ip)
|
||||
if ip == nil {
|
||||
// cluster IP is corrupt
|
||||
c.recorder.Eventf(&svc, v1.EventTypeWarning, "ClusterIPNotValid", "Cluster IP %s is not a valid IP; please recreate service", ip)
|
||||
c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "ClusterIPNotValid", "ClusterIPValidation", "Cluster IP %s is not a valid IP; please recreate service", ip)
|
||||
runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not a valid IP; please recreate", ip, svc.Name, svc.Namespace))
|
||||
continue
|
||||
}
|
||||
@ -221,7 +228,7 @@ func (c *Repair) runOnce() error {
|
||||
family := getFamilyByIP(ip)
|
||||
if _, ok := rebuiltByFamily[family]; !ok {
|
||||
// this service is using an IPFamily no longer configured on cluster
|
||||
c.recorder.Eventf(&svc, v1.EventTypeWarning, "ClusterIPNotValid", "Cluster IP %s(%s) is of ip family that is no longer configured on cluster; please recreate service", ip, family)
|
||||
c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "ClusterIPNotValid", "ClusterIPValidation", "Cluster IP %s(%s) is of ip family that is no longer configured on cluster; please recreate service", ip, family)
|
||||
runtime.HandleError(fmt.Errorf("the cluster IP %s(%s) for service %s/%s is of ip family that is no longer configured on cluster; please recreate", ip, family, svc.Name, svc.Namespace))
|
||||
continue
|
||||
}
|
||||
@ -236,25 +243,25 @@ func (c *Repair) runOnce() error {
|
||||
actualStored.Release(ip)
|
||||
} else {
|
||||
// cluster IP doesn't seem to be allocated
|
||||
c.recorder.Eventf(&svc, v1.EventTypeWarning, "ClusterIPNotAllocated", "Cluster IP [%v]:%s is not allocated; repairing", family, ip)
|
||||
c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "ClusterIPNotAllocated", "ClusterIPAllocation", "Cluster IP [%v]:%s is not allocated; repairing", family, ip)
|
||||
runtime.HandleError(fmt.Errorf("the cluster IP [%v]:%s for service %s/%s is not allocated; repairing", family, ip, svc.Name, svc.Namespace))
|
||||
}
|
||||
delete(c.leaksByFamily[family], ip.String()) // it is used, so it can't be leaked
|
||||
case ipallocator.ErrAllocated:
|
||||
// cluster IP is duplicate
|
||||
c.recorder.Eventf(&svc, v1.EventTypeWarning, "ClusterIPAlreadyAllocated", "Cluster IP [%v]:%s was assigned to multiple services; please recreate service", family, ip)
|
||||
c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "ClusterIPAlreadyAllocated", "ClusterIPAllocation", "Cluster IP [%v]:%s was assigned to multiple services; please recreate service", family, ip)
|
||||
runtime.HandleError(fmt.Errorf("the cluster IP [%v]:%s for service %s/%s was assigned to multiple services; please recreate", family, ip, svc.Name, svc.Namespace))
|
||||
case err.(*ipallocator.ErrNotInRange):
|
||||
// cluster IP is out of range
|
||||
c.recorder.Eventf(&svc, v1.EventTypeWarning, "ClusterIPOutOfRange", "Cluster IP [%v]:%s is not within the service CIDR %s; please recreate service", family, ip, c.networkByFamily[family])
|
||||
c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "ClusterIPOutOfRange", "ClusterIPAllocation", "Cluster IP [%v]:%s is not within the service CIDR %s; please recreate service", family, ip, c.networkByFamily[family])
|
||||
runtime.HandleError(fmt.Errorf("the cluster IP [%v]:%s for service %s/%s is not within the service CIDR %s; please recreate", family, ip, svc.Name, svc.Namespace, c.networkByFamily[family]))
|
||||
case ipallocator.ErrFull:
|
||||
// somehow we are out of IPs
|
||||
cidr := actualAlloc.CIDR()
|
||||
c.recorder.Eventf(&svc, v1.EventTypeWarning, "ServiceCIDRFull", "Service CIDR %v is full; you must widen the CIDR in order to create new services for Cluster IP [%v]:%s", cidr, family, ip)
|
||||
c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "ServiceCIDRFull", "ClusterIPAllocation", "Service CIDR %v is full; you must widen the CIDR in order to create new services for Cluster IP [%v]:%s", cidr, family, ip)
|
||||
return fmt.Errorf("the service CIDR %v is full; you must widen the CIDR in order to create new services for Cluster IP [%v]:%s", cidr, family, ip)
|
||||
default:
|
||||
c.recorder.Eventf(&svc, v1.EventTypeWarning, "UnknownError", "Unable to allocate cluster IP [%v]:%s due to an unknown error", family, ip)
|
||||
c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "UnknownError", "ClusterIPAllocation", "Unable to allocate cluster IP [%v]:%s due to an unknown error", family, ip)
|
||||
return fmt.Errorf("unable to allocate cluster IP [%v]:%s for service %s/%s due to an unknown error, exiting: %v", family, ip, svc.Name, svc.Namespace, err)
|
||||
}
|
||||
}
|
||||
|
@ -58,7 +58,7 @@ func TestRepair(t *testing.T) {
|
||||
item: &api.RangeAllocation{Range: "192.168.1.0/24"},
|
||||
}
|
||||
_, cidr, _ := netutils.ParseCIDRSloppy(ipregistry.item.Range)
|
||||
r := NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), cidr, ipregistry, nil, nil)
|
||||
r := NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), cidr, ipregistry, nil, nil)
|
||||
|
||||
if err := r.RunOnce(); err != nil {
|
||||
t.Fatal(err)
|
||||
@ -71,7 +71,7 @@ func TestRepair(t *testing.T) {
|
||||
item: &api.RangeAllocation{Range: "192.168.1.0/24"},
|
||||
updateErr: fmt.Errorf("test error"),
|
||||
}
|
||||
r = NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), cidr, ipregistry, nil, nil)
|
||||
r = NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), cidr, ipregistry, nil, nil)
|
||||
if err := r.RunOnce(); !strings.Contains(err.Error(), ": test error") {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -102,7 +102,7 @@ func TestRepairLeak(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
r := NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), cidr, ipregistry, nil, nil)
|
||||
r := NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), cidr, ipregistry, nil, nil)
|
||||
// Run through the "leak detection holdoff" loops.
|
||||
for i := 0; i < (numRepairsBeforeLeakCleanup - 1); i++ {
|
||||
if err := r.RunOnce(); err != nil {
|
||||
@ -200,7 +200,7 @@ func TestRepairWithExisting(t *testing.T) {
|
||||
Data: dst.Data,
|
||||
},
|
||||
}
|
||||
r := NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), cidr, ipregistry, nil, nil)
|
||||
r := NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), cidr, ipregistry, nil, nil)
|
||||
if err := r.RunOnce(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -290,7 +290,7 @@ func TestShouldWorkOnSecondary(t *testing.T) {
|
||||
secondaryRegistry = makeRangeRegistry(t, tc.secondaryNet.String())
|
||||
}
|
||||
|
||||
repair := NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), tc.primaryNet, primaryRegistry, tc.secondaryNet, secondaryRegistry)
|
||||
repair := NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), tc.primaryNet, primaryRegistry, tc.secondaryNet, secondaryRegistry)
|
||||
if len(repair.allocatorByFamily) != len(tc.expectedFamilies) {
|
||||
t.Fatalf("expected to have allocator by family count:%v got %v", len(tc.expectedFamilies), len(repair.allocatorByFamily))
|
||||
}
|
||||
@ -334,7 +334,7 @@ func TestRepairDualStack(t *testing.T) {
|
||||
|
||||
_, cidr, _ := netutils.ParseCIDRSloppy(ipregistry.item.Range)
|
||||
_, secondaryCIDR, _ := netutils.ParseCIDRSloppy(secondaryIPRegistry.item.Range)
|
||||
r := NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), cidr, ipregistry, secondaryCIDR, secondaryIPRegistry)
|
||||
r := NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), cidr, ipregistry, secondaryCIDR, secondaryIPRegistry)
|
||||
|
||||
if err := r.RunOnce(); err != nil {
|
||||
t.Fatal(err)
|
||||
@ -355,7 +355,7 @@ func TestRepairDualStack(t *testing.T) {
|
||||
updateErr: fmt.Errorf("test error"),
|
||||
}
|
||||
|
||||
r = NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), cidr, ipregistry, secondaryCIDR, secondaryIPRegistry)
|
||||
r = NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), cidr, ipregistry, secondaryCIDR, secondaryIPRegistry)
|
||||
if err := r.RunOnce(); !strings.Contains(err.Error(), ": test error") {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -410,7 +410,7 @@ func TestRepairLeakDualStack(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
r := NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), cidr, ipregistry, secondaryCIDR, secondaryIPRegistry)
|
||||
r := NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), cidr, ipregistry, secondaryCIDR, secondaryIPRegistry)
|
||||
// Run through the "leak detection holdoff" loops.
|
||||
for i := 0; i < (numRepairsBeforeLeakCleanup - 1); i++ {
|
||||
if err := r.RunOnce(); err != nil {
|
||||
@ -596,7 +596,7 @@ func TestRepairWithExistingDualStack(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
r := NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), cidr, ipregistry, secondaryCIDR, secondaryIPRegistry)
|
||||
r := NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), cidr, ipregistry, secondaryCIDR, secondaryIPRegistry)
|
||||
if err := r.RunOnce(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -29,7 +29,8 @@ import (
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
"k8s.io/client-go/tools/record"
|
||||
eventsv1client "k8s.io/client-go/kubernetes/typed/events/v1"
|
||||
"k8s.io/client-go/tools/events"
|
||||
"k8s.io/client-go/util/retry"
|
||||
"k8s.io/kubernetes/pkg/api/legacyscheme"
|
||||
api "k8s.io/kubernetes/pkg/apis/core"
|
||||
@ -44,7 +45,9 @@ type Repair struct {
|
||||
portRange net.PortRange
|
||||
alloc rangeallocation.RangeRegistry
|
||||
leaks map[int]int // counter per leaked port
|
||||
recorder record.EventRecorder
|
||||
|
||||
broadcaster events.EventBroadcaster
|
||||
recorder events.EventRecorder
|
||||
}
|
||||
|
||||
// How many times we need to detect a leak before we clean up. This is to
|
||||
@ -53,10 +56,9 @@ const numRepairsBeforeLeakCleanup = 3
|
||||
|
||||
// NewRepair creates a controller that periodically ensures that all ports are uniquely allocated across the cluster
|
||||
// and generates informational warnings for a cluster that is not in sync.
|
||||
func NewRepair(interval time.Duration, serviceClient corev1client.ServicesGetter, eventClient corev1client.EventsGetter, portRange net.PortRange, alloc rangeallocation.RangeRegistry) *Repair {
|
||||
eventBroadcaster := record.NewBroadcaster()
|
||||
eventBroadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: eventClient.Events("")})
|
||||
recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, corev1.EventSource{Component: "portallocator-repair-controller"})
|
||||
func NewRepair(interval time.Duration, serviceClient corev1client.ServicesGetter, eventClient eventsv1client.EventsV1Interface, portRange net.PortRange, alloc rangeallocation.RangeRegistry) *Repair {
|
||||
eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: eventClient})
|
||||
recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, "portallocator-repair-controller")
|
||||
|
||||
return &Repair{
|
||||
interval: interval,
|
||||
@ -64,17 +66,23 @@ func NewRepair(interval time.Duration, serviceClient corev1client.ServicesGetter
|
||||
portRange: portRange,
|
||||
alloc: alloc,
|
||||
leaks: map[int]int{},
|
||||
broadcaster: eventBroadcaster,
|
||||
recorder: recorder,
|
||||
}
|
||||
}
|
||||
|
||||
// Start starts events recording.
|
||||
func (c *Repair) Start(stopCh chan struct{}) {
|
||||
c.broadcaster.StartRecordingToSink(stopCh)
|
||||
}
|
||||
|
||||
// RunUntil starts the controller until the provided ch is closed.
|
||||
func (c *Repair) RunUntil(ch chan struct{}) {
|
||||
func (c *Repair) RunUntil(stopCh chan struct{}) {
|
||||
wait.Until(func() {
|
||||
if err := c.RunOnce(); err != nil {
|
||||
runtime.HandleError(err)
|
||||
}
|
||||
}, c.interval, ch)
|
||||
}, c.interval, stopCh)
|
||||
}
|
||||
|
||||
// RunOnce verifies the state of the port allocations and returns an error if an unrecoverable problem occurs.
|
||||
@ -143,24 +151,24 @@ func (c *Repair) runOnce() error {
|
||||
stored.Release(port)
|
||||
} else {
|
||||
// doesn't seem to be allocated
|
||||
c.recorder.Eventf(svc, corev1.EventTypeWarning, "PortNotAllocated", "Port %d is not allocated; repairing", port)
|
||||
c.recorder.Eventf(svc, nil, corev1.EventTypeWarning, "PortNotAllocated", "PortAllocation", "Port %d is not allocated; repairing", port)
|
||||
runtime.HandleError(fmt.Errorf("the node port %d for service %s/%s is not allocated; repairing", port, svc.Name, svc.Namespace))
|
||||
}
|
||||
delete(c.leaks, port) // it is used, so it can't be leaked
|
||||
case portallocator.ErrAllocated:
|
||||
// port is duplicate, reallocate
|
||||
c.recorder.Eventf(svc, corev1.EventTypeWarning, "PortAlreadyAllocated", "Port %d was assigned to multiple services; please recreate service", port)
|
||||
c.recorder.Eventf(svc, nil, corev1.EventTypeWarning, "PortAlreadyAllocated", "PortAllocation", "Port %d was assigned to multiple services; please recreate service", port)
|
||||
runtime.HandleError(fmt.Errorf("the node port %d for service %s/%s was assigned to multiple services; please recreate", port, svc.Name, svc.Namespace))
|
||||
case err.(*portallocator.ErrNotInRange):
|
||||
// port is out of range, reallocate
|
||||
c.recorder.Eventf(svc, corev1.EventTypeWarning, "PortOutOfRange", "Port %d is not within the port range %s; please recreate service", port, c.portRange)
|
||||
c.recorder.Eventf(svc, nil, corev1.EventTypeWarning, "PortOutOfRange", "PortAllocation", "Port %d is not within the port range %s; please recreate service", port, c.portRange)
|
||||
runtime.HandleError(fmt.Errorf("the port %d for service %s/%s is not within the port range %s; please recreate", port, svc.Name, svc.Namespace, c.portRange))
|
||||
case portallocator.ErrFull:
|
||||
// somehow we are out of ports
|
||||
c.recorder.Eventf(svc, corev1.EventTypeWarning, "PortRangeFull", "Port range %s is full; you must widen the port range in order to create new services", c.portRange)
|
||||
c.recorder.Eventf(svc, nil, corev1.EventTypeWarning, "PortRangeFull", "PortAllocation", "Port range %s is full; you must widen the port range in order to create new services", c.portRange)
|
||||
return fmt.Errorf("the port range %s is full; you must widen the port range in order to create new services", c.portRange)
|
||||
default:
|
||||
c.recorder.Eventf(svc, corev1.EventTypeWarning, "UnknownError", "Unable to allocate port %d due to an unknown error", port)
|
||||
c.recorder.Eventf(svc, nil, corev1.EventTypeWarning, "UnknownError", "PortAllocation", "Unable to allocate port %d due to an unknown error", port)
|
||||
return fmt.Errorf("unable to allocate port %d for service %s/%s due to an unknown error, exiting: %v", port, svc.Name, svc.Namespace, err)
|
||||
}
|
||||
}
|
||||
|
@ -58,7 +58,7 @@ func TestRepair(t *testing.T) {
|
||||
item: &api.RangeAllocation{Range: "100-200"},
|
||||
}
|
||||
pr, _ := net.ParsePortRange(registry.item.Range)
|
||||
r := NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), *pr, registry)
|
||||
r := NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), *pr, registry)
|
||||
|
||||
if err := r.RunOnce(); err != nil {
|
||||
t.Fatal(err)
|
||||
@ -71,7 +71,7 @@ func TestRepair(t *testing.T) {
|
||||
item: &api.RangeAllocation{Range: "100-200"},
|
||||
updateErr: fmt.Errorf("test error"),
|
||||
}
|
||||
r = NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), *pr, registry)
|
||||
r = NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), *pr, registry)
|
||||
if err := r.RunOnce(); !strings.Contains(err.Error(), ": test error") {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -102,7 +102,7 @@ func TestRepairLeak(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
r := NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), *pr, registry)
|
||||
r := NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), *pr, registry)
|
||||
// Run through the "leak detection holdoff" loops.
|
||||
for i := 0; i < (numRepairsBeforeLeakCleanup - 1); i++ {
|
||||
if err := r.RunOnce(); err != nil {
|
||||
@ -190,7 +190,7 @@ func TestRepairWithExisting(t *testing.T) {
|
||||
Data: dst.Data,
|
||||
},
|
||||
}
|
||||
r := NewRepair(0, fakeClient.CoreV1(), fakeClient.CoreV1(), *pr, registry)
|
||||
r := NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), *pr, registry)
|
||||
if err := r.RunOnce(); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user