diff --git a/pkg/controlplane/controller.go b/pkg/controlplane/controller.go
index 408257ed686..b0aa57c78d7 100644
--- a/pkg/controlplane/controller.go
+++ b/pkg/controlplane/controller.go
@@ -21,6 +21,7 @@ import (
 	"fmt"
 	"net"
 	"net/http"
+	"sync"
 	"time"
 
 	corev1 "k8s.io/api/core/v1"
@@ -86,7 +87,6 @@ type Controller struct {
 	PublicServicePort         int
 	KubernetesServiceNodePort int
 
-	stopCh chan struct{}
 	runner *async.Runner
 }
 
@@ -174,47 +174,47 @@ func (c *Controller) Start() {
 	repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval, c.ServiceClient, c.EventClient, &c.ServiceClusterIPRange, c.ServiceClusterIPRegistry, &c.SecondaryServiceClusterIPRange, c.SecondaryServiceClusterIPRegistry)
 	repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.ServiceClient, c.EventClient, c.ServiceNodePortRange, c.ServiceNodePortRegistry)
 
-	// We start both repairClusterIPs and repairNodePorts to catch events
-	// coming from the RunOnce() calls to them.
-	// TODO: Refactor both controllers to only expose a public RunUntil
-	// method, but to accommodate the usecase below (of forcing the first
-	// successful call before proceeding) let them signal on that, like:
-	//   func (c *Repair) RunUntil(stopCh chan struct{}, onFirstSuccess func())
-	// and use it here like:
-	//   wg := sync.WaitGroup{}
-	//   wg.Add(2)
-	//   runRepairClusterIPs := func(stopCh chan struct{}) {
-	//     repairClusterIPs(stopCh, wg.Done)
-	//   }
-	//   runRepairNodePorts := func(stopCh chan struct{}) {
-	//     repairNodePorts(stopCh, wg.Done)
-	//   }
-	//   c.runner = ...
-	//   c.runner.Start()
-	//   wg.Wait()
-	c.stopCh = make(chan struct{})
-	repairClusterIPs.Start(c.stopCh)
-	repairNodePorts.Start(c.stopCh)
+	// We start both repairClusterIPs and repairNodePorts to ensure that the
+	// repair loops for ClusterIPs and NodePorts are running.
+	// Both loops are started through the public RunUntil interface.
+	// However, we want to fail liveness/readiness until the first
+	// successful iteration of each repair loop, so we pass appropriate
+	// callbacks to the RunUntil methods.
+	// Additionally, we don't wait for the first iterations for longer
+	// than 1 minute: for backward compatibility, the whole apiserver
+	// fails if the allocations cannot be repaired in time.
+	wg := sync.WaitGroup{}
+	wg.Add(2)
 
-	// run all of the controllers once prior to returning from Start.
-	if err := repairClusterIPs.RunOnce(); err != nil {
-		// If we fail to repair cluster IPs apiserver is useless. We should restart and retry.
-		klog.Fatalf("Unable to perform initial IP allocation check: %v", err)
+	runRepairClusterIPs := func(stopCh chan struct{}) {
+		repairClusterIPs.RunUntil(wg.Done, stopCh)
 	}
-	if err := repairNodePorts.RunOnce(); err != nil {
-		// If we fail to repair node ports apiserver is useless. We should restart and retry.
-		klog.Fatalf("Unable to perform initial service nodePort check: %v", err)
+	runRepairNodePorts := func(stopCh chan struct{}) {
+		repairNodePorts.RunUntil(wg.Done, stopCh)
 	}
 
-	c.runner = async.NewRunner(c.RunKubernetesNamespaces, c.RunKubernetesService, repairClusterIPs.RunUntil, repairNodePorts.RunUntil)
+	c.runner = async.NewRunner(c.RunKubernetesNamespaces, c.RunKubernetesService, runRepairClusterIPs, runRepairNodePorts)
 	c.runner.Start()
+
+	// For backward compatibility, if we are never able to repair
+	// clusterIPs and/or nodeports, we not only fail liveness and/or
+	// readiness, but also fail the apiserver explicitly.
+	done := make(chan struct{})
+	go func() {
+		defer close(done)
+		wg.Wait()
+	}()
+	select {
+	case <-done:
+	case <-time.After(time.Minute):
+		klog.Fatalf("Unable to perform initial IP and Port allocation check")
+	}
 }
 
 // Stop cleans up this API Servers endpoint reconciliation leases so another master can take over more quickly.
 func (c *Controller) Stop() {
 	if c.runner != nil {
 		c.runner.Stop()
-		close(c.stopCh)
 	}
 
 	endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts)
 	finishedReconciling := make(chan struct{})
diff --git a/pkg/registry/core/service/ipallocator/controller/repair.go b/pkg/registry/core/service/ipallocator/controller/repair.go
index d3bd7f1833c..f568e4fdb59 100644
--- a/pkg/registry/core/service/ipallocator/controller/repair.go
+++ b/pkg/registry/core/service/ipallocator/controller/repair.go
@@ -20,6 +20,7 @@ import (
 	"context"
 	"fmt"
 	"net"
+	"sync"
 	"time"
 
 	v1 "k8s.io/api/core/v1"
@@ -113,27 +114,28 @@ func NewRepair(interval time.Duration, serviceClient corev1client.ServicesGetter
 	}
 }
 
-// Start starts events recording.
-func (c *Repair) Start(stopCh chan struct{}) {
-	c.broadcaster.StartRecordingToSink(stopCh)
-}
-
-// RunUntil starts the controller until the provided ch is closed.
-func (c *Repair) RunUntil(stopCh chan struct{}) {
-	wait.Until(func() {
-		if err := c.RunOnce(); err != nil {
-			runtime.HandleError(err)
-		}
-	}, c.interval, stopCh)
-}
+// RunUntil starts the controller until the provided ch is closed, invoking onFirstSuccess after the first successful repair iteration.
+func (c *Repair) RunUntil(onFirstSuccess func(), stopCh chan struct{}) {
+	c.broadcaster.StartRecordingToSink(stopCh)
+	defer c.broadcaster.Shutdown()
 
-// RunOnce verifies the state of the cluster IP allocations and returns an error if an unrecoverable problem occurs.
-func (c *Repair) RunOnce() error {
-	return retry.RetryOnConflict(retry.DefaultBackoff, c.runOnce)
+	var once sync.Once
+	wait.Until(func() {
+		if err := c.runOnce(); err != nil {
+			runtime.HandleError(err)
+			return
+		}
+		once.Do(onFirstSuccess)
+	}, c.interval, stopCh)
 }
 
 // runOnce verifies the state of the cluster IP allocations and returns an error if an unrecoverable problem occurs.
 func (c *Repair) runOnce() error {
+	return retry.RetryOnConflict(retry.DefaultBackoff, c.doRunOnce)
+}
+
+// doRunOnce verifies the state of the cluster IP allocations and returns an error if an unrecoverable problem occurs.
+func (c *Repair) doRunOnce() error {
 	// TODO: (per smarterclayton) if Get() or ListServices() is a weak consistency read,
 	// or if they are executed against different leaders,
 	// the ordering guarantee required to ensure no IP is allocated twice is violated.
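
Note: the Start() hunk above bounds the wait for the first successful repair pass of each loop by racing wg.Wait() against a one-minute timer; wg.Wait() has no timeout of its own, so it is funneled through a channel that a select can race against time.After. The following standalone sketch shows the same pattern; it is not part of the diff, and runRepairLoop and all names in it are illustrative, not Kubernetes APIs:

package main

import (
	"fmt"
	"sync"
	"time"
)

// runRepairLoop stands in for a repair controller's RunUntil: it reports
// its first successful pass via onFirstSuccess, then keeps running until
// stopCh is closed.
func runRepairLoop(onFirstSuccess func(), stopCh <-chan struct{}) {
	time.Sleep(50 * time.Millisecond) // pretend the first pass takes a moment
	onFirstSuccess()
	<-stopCh
}

func main() {
	stopCh := make(chan struct{})
	defer close(stopCh)

	var wg sync.WaitGroup
	wg.Add(2)
	go runRepairLoop(wg.Done, stopCh) // "clusterIPs"
	go runRepairLoop(wg.Done, stopCh) // "nodePorts"

	// Funnel wg.Wait() through a channel so it can be raced in a select.
	done := make(chan struct{})
	go func() {
		defer close(done)
		wg.Wait()
	}()
	select {
	case <-done:
		fmt.Println("both repair loops reported their first success")
	case <-time.After(time.Minute):
		// The real code calls klog.Fatalf here, killing the apiserver.
		fmt.Println("initial repair never succeeded; giving up")
	}
}

Closing stopCh on exit also shuts the loops down, mirroring what Controller.Stop() does through the async runner.
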
diff --git a/pkg/registry/core/service/ipallocator/controller/repair_test.go b/pkg/registry/core/service/ipallocator/controller/repair_test.go
index ceb8cd20f3e..f2cf9bb098f 100644
--- a/pkg/registry/core/service/ipallocator/controller/repair_test.go
+++ b/pkg/registry/core/service/ipallocator/controller/repair_test.go
@@ -60,7 +60,7 @@ func TestRepair(t *testing.T) {
 	_, cidr, _ := netutils.ParseCIDRSloppy(ipregistry.item.Range)
 	r := NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), cidr, ipregistry, nil, nil)
 
-	if err := r.RunOnce(); err != nil {
+	if err := r.runOnce(); err != nil {
 		t.Fatal(err)
 	}
 	if !ipregistry.updateCalled || ipregistry.updated == nil || ipregistry.updated.Range != cidr.String() || ipregistry.updated != ipregistry.item {
@@ -72,7 +72,7 @@ func TestRepair(t *testing.T) {
 		updateErr: fmt.Errorf("test error"),
 	}
 	r = NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), cidr, ipregistry, nil, nil)
-	if err := r.RunOnce(); !strings.Contains(err.Error(), ": test error") {
+	if err := r.runOnce(); !strings.Contains(err.Error(), ": test error") {
 		t.Fatal(err)
 	}
 }
@@ -105,7 +105,7 @@ func TestRepairLeak(t *testing.T) {
 	r := NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), cidr, ipregistry, nil, nil)
 	// Run through the "leak detection holdoff" loops.
 	for i := 0; i < (numRepairsBeforeLeakCleanup - 1); i++ {
-		if err := r.RunOnce(); err != nil {
+		if err := r.runOnce(); err != nil {
 			t.Fatal(err)
 		}
 		after, err := ipallocator.NewFromSnapshot(ipregistry.updated)
@@ -117,7 +117,7 @@ func TestRepairLeak(t *testing.T) {
 		}
 	}
 	// Run one more time to actually remove the leak.
-	if err := r.RunOnce(); err != nil {
+	if err := r.runOnce(); err != nil {
 		t.Fatal(err)
 	}
 	after, err := ipallocator.NewFromSnapshot(ipregistry.updated)
@@ -201,7 +201,7 @@ func TestRepairWithExisting(t *testing.T) {
 		},
 	}
 	r := NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), cidr, ipregistry, nil, nil)
-	if err := r.RunOnce(); err != nil {
+	if err := r.runOnce(); err != nil {
 		t.Fatal(err)
 	}
 	after, err := ipallocator.NewFromSnapshot(ipregistry.updated)
@@ -336,7 +336,7 @@ func TestRepairDualStack(t *testing.T) {
 	_, secondaryCIDR, _ := netutils.ParseCIDRSloppy(secondaryIPRegistry.item.Range)
 	r := NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), cidr, ipregistry, secondaryCIDR, secondaryIPRegistry)
 
-	if err := r.RunOnce(); err != nil {
+	if err := r.runOnce(); err != nil {
 		t.Fatal(err)
 	}
 	if !ipregistry.updateCalled || ipregistry.updated == nil || ipregistry.updated.Range != cidr.String() || ipregistry.updated != ipregistry.item {
@@ -356,7 +356,7 @@ func TestRepairDualStack(t *testing.T) {
 	}
 
 	r = NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), cidr, ipregistry, secondaryCIDR, secondaryIPRegistry)
-	if err := r.RunOnce(); !strings.Contains(err.Error(), ": test error") {
+	if err := r.runOnce(); !strings.Contains(err.Error(), ": test error") {
 		t.Fatal(err)
 	}
 }
@@ -413,7 +413,7 @@ func TestRepairLeakDualStack(t *testing.T) {
 	r := NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), cidr, ipregistry, secondaryCIDR, secondaryIPRegistry)
 	// Run through the "leak detection holdoff" loops.
 	for i := 0; i < (numRepairsBeforeLeakCleanup - 1); i++ {
-		if err := r.RunOnce(); err != nil {
+		if err := r.runOnce(); err != nil {
 			t.Fatal(err)
 		}
 		after, err := ipallocator.NewFromSnapshot(ipregistry.updated)
@@ -432,7 +432,7 @@ func TestRepairLeakDualStack(t *testing.T) {
 		}
 	}
 	// Run one more time to actually remove the leak.
-	if err := r.RunOnce(); err != nil {
+	if err := r.runOnce(); err != nil {
 		t.Fatal(err)
 	}
 
@@ -597,7 +597,7 @@ func TestRepairWithExistingDualStack(t *testing.T) {
 	}
 
 	r := NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), cidr, ipregistry, secondaryCIDR, secondaryIPRegistry)
-	if err := r.RunOnce(); err != nil {
+	if err := r.runOnce(); err != nil {
 		t.Fatal(err)
 	}
 	after, err := ipallocator.NewFromSnapshot(ipregistry.updated)
diff --git a/pkg/registry/core/service/portallocator/controller/repair.go b/pkg/registry/core/service/portallocator/controller/repair.go
index ab6abcfdcd4..e75d9f9a67a 100644
--- a/pkg/registry/core/service/portallocator/controller/repair.go
+++ b/pkg/registry/core/service/portallocator/controller/repair.go
@@ -19,6 +19,7 @@ package controller
 import (
 	"context"
 	"fmt"
+	"sync"
 	"time"
 
 	corev1 "k8s.io/api/core/v1"
@@ -71,27 +72,28 @@ func NewRepair(interval time.Duration, serviceClient corev1client.ServicesGetter
 	}
 }
 
-// Start starts events recording.
-func (c *Repair) Start(stopCh chan struct{}) {
-	c.broadcaster.StartRecordingToSink(stopCh)
-}
-
-// RunUntil starts the controller until the provided ch is closed.
-func (c *Repair) RunUntil(stopCh chan struct{}) {
-	wait.Until(func() {
-		if err := c.RunOnce(); err != nil {
-			runtime.HandleError(err)
-		}
-	}, c.interval, stopCh)
-}
+// RunUntil starts the controller until the provided ch is closed, invoking onFirstSuccess after the first successful repair iteration.
+func (c *Repair) RunUntil(onFirstSuccess func(), stopCh chan struct{}) {
+	c.broadcaster.StartRecordingToSink(stopCh)
+	defer c.broadcaster.Shutdown()
 
-// RunOnce verifies the state of the port allocations and returns an error if an unrecoverable problem occurs.
-func (c *Repair) RunOnce() error {
-	return retry.RetryOnConflict(retry.DefaultBackoff, c.runOnce)
+	var once sync.Once
+	wait.Until(func() {
+		if err := c.runOnce(); err != nil {
+			runtime.HandleError(err)
+			return
+		}
+		once.Do(onFirstSuccess)
+	}, c.interval, stopCh)
 }
 
 // runOnce verifies the state of the port allocations and returns an error if an unrecoverable problem occurs.
 func (c *Repair) runOnce() error {
+	return retry.RetryOnConflict(retry.DefaultBackoff, c.doRunOnce)
+}
+
+// doRunOnce verifies the state of the port allocations and returns an error if an unrecoverable problem occurs.
+func (c *Repair) doRunOnce() error {
 	// TODO: (per smarterclayton) if Get() or ListServices() is a weak consistency read,
 	// or if they are executed against different leaders,
 	// the ordering guarantee required to ensure no port is allocated twice is violated.
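
Note: both RunUntil implementations above gate onFirstSuccess behind a sync.Once inside wait.Until, so the callback fires exactly once, after the first error-free pass, while the loop keeps repairing on its interval. A dependency-free sketch of that pattern follows; runUntil and work are hypothetical stand-ins, not the actual Repair methods, and unlike wait.Until this sketch does not check stopCh before the first pass:

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// runUntil mirrors the RunUntil pattern above without the k8s.io/apimachinery
// dependency: run work every interval and fire onFirstSuccess exactly once,
// after the first pass that returns no error.
func runUntil(work func() error, onFirstSuccess func(), interval time.Duration, stopCh <-chan struct{}) {
	var once sync.Once
	for {
		if err := work(); err != nil {
			fmt.Println("repair pass failed:", err) // the real code calls runtime.HandleError
		} else {
			once.Do(onFirstSuccess)
		}
		select {
		case <-stopCh:
			return
		case <-time.After(interval):
		}
	}
}

func main() {
	stopCh := make(chan struct{})
	attempts := 0
	work := func() error { // hypothetical: fail twice, then succeed forever
		attempts++
		if attempts <= 2 {
			return errors.New("transient failure")
		}
		return nil
	}
	go runUntil(work, func() { fmt.Printf("first success on attempt %d\n", attempts) }, 10*time.Millisecond, stopCh)
	time.Sleep(100 * time.Millisecond)
	close(stopCh)
}
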
diff --git a/pkg/registry/core/service/portallocator/controller/repair_test.go b/pkg/registry/core/service/portallocator/controller/repair_test.go
index 689c9066bdc..9869d32fadf 100644
--- a/pkg/registry/core/service/portallocator/controller/repair_test.go
+++ b/pkg/registry/core/service/portallocator/controller/repair_test.go
@@ -60,7 +60,7 @@ func TestRepair(t *testing.T) {
 	pr, _ := net.ParsePortRange(registry.item.Range)
 	r := NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), *pr, registry)
 
-	if err := r.RunOnce(); err != nil {
+	if err := r.runOnce(); err != nil {
 		t.Fatal(err)
 	}
 	if !registry.updateCalled || registry.updated == nil || registry.updated.Range != pr.String() || registry.updated != registry.item {
@@ -72,7 +72,7 @@ func TestRepair(t *testing.T) {
 		updateErr: fmt.Errorf("test error"),
 	}
 	r = NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), *pr, registry)
-	if err := r.RunOnce(); !strings.Contains(err.Error(), ": test error") {
+	if err := r.runOnce(); !strings.Contains(err.Error(), ": test error") {
 		t.Fatal(err)
 	}
 }
@@ -105,7 +105,7 @@ func TestRepairLeak(t *testing.T) {
 	r := NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), *pr, registry)
 	// Run through the "leak detection holdoff" loops.
 	for i := 0; i < (numRepairsBeforeLeakCleanup - 1); i++ {
-		if err := r.RunOnce(); err != nil {
+		if err := r.runOnce(); err != nil {
 			t.Fatal(err)
 		}
 		after, err := portallocator.NewFromSnapshot(registry.updated)
@@ -117,7 +117,7 @@ func TestRepairLeak(t *testing.T) {
 		}
 	}
 	// Run one more time to actually remove the leak.
-	if err := r.RunOnce(); err != nil {
+	if err := r.runOnce(); err != nil {
 		t.Fatal(err)
 	}
 	after, err := portallocator.NewFromSnapshot(registry.updated)
@@ -191,7 +191,7 @@ func TestRepairWithExisting(t *testing.T) {
 		},
 	}
 	r := NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), *pr, registry)
-	if err := r.RunOnce(); err != nil {
+	if err := r.runOnce(); err != nil {
 		t.Fatal(err)
 	}
 	after, err := portallocator.NewFromSnapshot(registry.updated)
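
Note: with RunOnce unexported, the tests above call runOnce directly. A hypothetical test, not part of this diff, could exercise RunUntil's new contract end to end. The sketch below assumes the existing mockRangeRegistry fixture, the fake clientset, and the imports already present in the portallocator repair_test.go:

// TestRunUntilSignalsFirstSuccess is a sketch: it starts RunUntil in a
// goroutine and waits for the onFirstSuccess callback to fire.
func TestRunUntilSignalsFirstSuccess(t *testing.T) {
	fakeClient := fake.NewSimpleClientset()
	registry := &mockRangeRegistry{
		item: &api.RangeAllocation{Range: "100-200"},
	}
	pr, _ := net.ParsePortRange(registry.item.Range)
	r := NewRepair(10*time.Millisecond, fakeClient.CoreV1(), fakeClient.EventsV1(), *pr, registry)

	stopCh := make(chan struct{})
	defer close(stopCh)
	firstSuccess := make(chan struct{})
	go r.RunUntil(func() { close(firstSuccess) }, stopCh)

	// RunUntil should invoke the callback after its first error-free pass.
	select {
	case <-firstSuccess:
	case <-time.After(5 * time.Second):
		t.Fatal("timed out waiting for the first successful repair pass")
	}
}
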