Merge pull request #109975 from wojtek-t/cleanup_repair_controllers

Cleanup portallocator/ipallocator interfaces
commit cfd69463de by Kubernetes Prow Robot, committed via GitHub on 2022-05-23 06:08:01 -07:00
5 changed files with 80 additions and 76 deletions


@@ -21,6 +21,7 @@ import (
 	"fmt"
 	"net"
 	"net/http"
+	"sync"
 	"time"
 
 	corev1 "k8s.io/api/core/v1"
@@ -86,7 +87,6 @@ type Controller struct {
 	PublicServicePort         int
 	KubernetesServiceNodePort int
 
-	stopCh chan struct{}
 	runner *async.Runner
 }
@@ -174,47 +174,47 @@ func (c *Controller) Start() {
 	repairClusterIPs := servicecontroller.NewRepair(c.ServiceClusterIPInterval, c.ServiceClient, c.EventClient, &c.ServiceClusterIPRange, c.ServiceClusterIPRegistry, &c.SecondaryServiceClusterIPRange, c.SecondaryServiceClusterIPRegistry)
 	repairNodePorts := portallocatorcontroller.NewRepair(c.ServiceNodePortInterval, c.ServiceClient, c.EventClient, c.ServiceNodePortRange, c.ServiceNodePortRegistry)
 
-	// We start both repairClusterIPs and repairNodePorts to catch events
-	// coming from the RunOnce() calls to them.
-	// TODO: Refactor both controllers to only expose a public RunUntil
-	// method, but to accommodate the use case below (of forcing the first
-	// successful call before proceeding) let them signal on that, like:
-	//   func (c *Repair) RunUntil(stopCh chan struct{}, onFirstSuccess func())
-	// and use it here like:
-	//   wg := sync.WaitGroup{}
-	//   wg.Add(2)
-	//   runRepairClusterIPs := func(stopCh chan struct{}) {
-	//     repairClusterIPs(stopCh, wg.Done)
-	//   }
-	//   runRepairNodePorts := func(stopCh chan struct{}) {
-	//     repairNodePorts(stopCh, wg.Done)
-	//   }
-	//   c.runner = ...
-	//   c.runner.Start()
-	//   wg.Wait()
-	c.stopCh = make(chan struct{})
-	repairClusterIPs.Start(c.stopCh)
-	repairNodePorts.Start(c.stopCh)
+	// We start both the repairClusterIPs and repairNodePorts repair loops,
+	// running each through the public RunUntil interface.
+	// However, we want to fail liveness/readiness until the first
+	// successful pass of each loop, so we pass appropriate
+	// onFirstSuccess callbacks to the RunUntil methods.
+	// Additionally, we wait for them for at most 1 minute, to keep the
+	// backward-compatible behavior of failing the whole apiserver if
+	// we can't repair them.
+	wg := sync.WaitGroup{}
+	wg.Add(2)
 
-	// run all of the controllers once prior to returning from Start.
-	if err := repairClusterIPs.RunOnce(); err != nil {
-		// If we fail to repair cluster IPs, the apiserver is useless. We should restart and retry.
-		klog.Fatalf("Unable to perform initial IP allocation check: %v", err)
+	runRepairClusterIPs := func(stopCh chan struct{}) {
+		repairClusterIPs.RunUntil(wg.Done, stopCh)
 	}
-	if err := repairNodePorts.RunOnce(); err != nil {
-		// If we fail to repair node ports, the apiserver is useless. We should restart and retry.
-		klog.Fatalf("Unable to perform initial service nodePort check: %v", err)
+	runRepairNodePorts := func(stopCh chan struct{}) {
+		repairNodePorts.RunUntil(wg.Done, stopCh)
 	}
 
-	c.runner = async.NewRunner(c.RunKubernetesNamespaces, c.RunKubernetesService, repairClusterIPs.RunUntil, repairNodePorts.RunUntil)
+	c.runner = async.NewRunner(c.RunKubernetesNamespaces, c.RunKubernetesService, runRepairClusterIPs, runRepairNodePorts)
 	c.runner.Start()
+
+	// For backward compatibility: if we are never able to repair
+	// clusterIPs and/or nodeports, we not only fail the liveness
+	// and/or readiness checks, but also fail the apiserver explicitly.
+	done := make(chan struct{})
+	go func() {
+		defer close(done)
+		wg.Wait()
+	}()
+	select {
+	case <-done:
+	case <-time.After(time.Minute):
+		klog.Fatalf("Unable to perform initial IP and Port allocation check")
+	}
 }
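The new Start sequence reduces to a small, reusable pattern: each repair loop signals its first successful pass exactly once, and startup aborts if both signals do not arrive within a deadline. Below is a minimal, self-contained sketch of that gate; the sleeping goroutines are stand-ins for the real repair loops, and the real code fails via klog.Fatalf rather than log.Fatal.

package main

import (
	"log"
	"sync"
	"time"
)

func main() {
	wg := sync.WaitGroup{}
	wg.Add(2)

	// Stand-ins for the two repair loops; each calls wg.Done (via RunUntil's
	// onFirstSuccess callback in the real code) once its first pass succeeds.
	go func() { time.Sleep(50 * time.Millisecond); wg.Done() }()
	go func() { time.Sleep(80 * time.Millisecond); wg.Done() }()

	// wg.Wait has no timeout of its own, so wrap it in a channel close and
	// race it against a deadline, exactly as the new Start() does.
	done := make(chan struct{})
	go func() {
		defer close(done)
		wg.Wait()
	}()
	select {
	case <-done:
		log.Println("initial IP and port allocation checks succeeded")
	case <-time.After(time.Minute):
		// klog.Fatalf in the real code, preserving the old fail-fast behavior.
		log.Fatal("unable to perform initial IP and port allocation check")
	}
}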
 // Stop cleans up this API Server's endpoint reconciliation leases so another master can take over more quickly.
 func (c *Controller) Stop() {
 	if c.runner != nil {
 		c.runner.Stop()
-		close(c.stopCh)
 	}
 	endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https", c.ExtraEndpointPorts)
 	finishedReconciling := make(chan struct{})
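The deleted stopCh field follows from the async.Runner contract: every function handed to NewRunner receives a stop channel that Runner.Stop closes, so the repair loops are already bound to the runner's lifetime and the Controller no longer needs a channel of its own. A stand-in for that contract (the real type lives in k8s.io/apiserver/pkg/util/async; only the shape is assumed here):

package main

import "fmt"

// Runner starts a group of loop functions and stops them together by
// closing a shared stop channel, mirroring the async.Runner contract.
type Runner struct {
	stop  chan struct{}
	loops []func(stop chan struct{})
}

func NewRunner(loops ...func(stop chan struct{})) *Runner {
	return &Runner{loops: loops}
}

func (r *Runner) Start() {
	r.stop = make(chan struct{})
	for _, loop := range r.loops {
		go loop(r.stop)
	}
}

func (r *Runner) Stop() {
	close(r.stop) // every registered loop observes this and returns
}

func main() {
	done := make(chan struct{})
	r := NewRunner(func(stop chan struct{}) {
		<-stop // a real repair loop would run until this closes
		close(done)
	})
	r.Start()
	r.Stop()
	<-done // wait for the loop to observe the close
	fmt.Println("repair loop stopped")
}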


@@ -20,6 +20,7 @@ import (
 	"context"
 	"fmt"
 	"net"
+	"sync"
 	"time"
 
 	v1 "k8s.io/api/core/v1"
@@ -113,27 +114,28 @@ func NewRepair(interval time.Duration, serviceClient corev1client.ServicesGetter
 	}
 }
 
-// Start starts events recording.
-func (c *Repair) Start(stopCh chan struct{}) {
-	c.broadcaster.StartRecordingToSink(stopCh)
-}
-
 // RunUntil starts the controller until the provided ch is closed.
-func (c *Repair) RunUntil(stopCh chan struct{}) {
+func (c *Repair) RunUntil(onFirstSuccess func(), stopCh chan struct{}) {
+	c.broadcaster.StartRecordingToSink(stopCh)
+	defer c.broadcaster.Shutdown()
+
+	var once sync.Once
 	wait.Until(func() {
-		if err := c.RunOnce(); err != nil {
+		if err := c.runOnce(); err != nil {
 			runtime.HandleError(err)
+			return
 		}
+		once.Do(onFirstSuccess)
 	}, c.interval, stopCh)
 }
 
-// RunOnce verifies the state of the cluster IP allocations and returns an error if an unrecoverable problem occurs.
-func (c *Repair) RunOnce() error {
-	return retry.RetryOnConflict(retry.DefaultBackoff, c.runOnce)
-}
-
 // runOnce verifies the state of the cluster IP allocations and returns an error if an unrecoverable problem occurs.
 func (c *Repair) runOnce() error {
+	return retry.RetryOnConflict(retry.DefaultBackoff, c.doRunOnce)
+}
+
+// doRunOnce verifies the state of the cluster IP allocations and returns an error if an unrecoverable problem occurs.
+func (c *Repair) doRunOnce() error {
 	// TODO: (per smarterclayton) if Get() or ListServices() is a weak consistency read,
 	// or if they are executed against different leaders,
 	// the ordering guarantee required to ensure no IP is allocated twice is violated.
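The subtle detail in the new RunUntil is the sync.Once guard: wait.Until re-invokes the closure every interval, so an unguarded onFirstSuccess callback such as wg.Done would fire on every successful pass and panic a WaitGroup whose counter already reached zero. The early return on error likewise keeps a failed pass from signaling success. A runnable sketch of just that mechanism, using the real wait.Until from apimachinery but a stand-in runOnce body:

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func runUntil(onFirstSuccess func(), stopCh <-chan struct{}, interval time.Duration, runOnce func() error) {
	var once sync.Once
	wait.Until(func() {
		if err := runOnce(); err != nil {
			fmt.Println("repair failed, will retry:", err) // runtime.HandleError in the real code
			return                                         // do not signal success on a failed pass
		}
		once.Do(onFirstSuccess) // fires at most once across all iterations
	}, interval, stopCh)
}

func main() {
	var wg sync.WaitGroup
	wg.Add(1)
	stopCh := make(chan struct{})

	failures := 2 // fail the first two passes, then succeed forever
	go runUntil(wg.Done, stopCh, time.Millisecond, func() error {
		if failures > 0 {
			failures--
			return errors.New("transient failure")
		}
		return nil
	})

	wg.Wait() // unblocks on the first successful pass; later successes are no-ops
	close(stopCh)
	fmt.Println("first repair pass succeeded")
}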


@@ -60,7 +60,7 @@ func TestRepair(t *testing.T) {
 	_, cidr, _ := netutils.ParseCIDRSloppy(ipregistry.item.Range)
 	r := NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), cidr, ipregistry, nil, nil)
 
-	if err := r.RunOnce(); err != nil {
+	if err := r.runOnce(); err != nil {
 		t.Fatal(err)
 	}
 	if !ipregistry.updateCalled || ipregistry.updated == nil || ipregistry.updated.Range != cidr.String() || ipregistry.updated != ipregistry.item {
@@ -72,7 +72,7 @@ func TestRepair(t *testing.T) {
 		updateErr: fmt.Errorf("test error"),
 	}
 	r = NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), cidr, ipregistry, nil, nil)
-	if err := r.RunOnce(); !strings.Contains(err.Error(), ": test error") {
+	if err := r.runOnce(); !strings.Contains(err.Error(), ": test error") {
 		t.Fatal(err)
 	}
 }
@@ -105,7 +105,7 @@ func TestRepairLeak(t *testing.T) {
 	r := NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), cidr, ipregistry, nil, nil)
 	// Run through the "leak detection holdoff" loops.
 	for i := 0; i < (numRepairsBeforeLeakCleanup - 1); i++ {
-		if err := r.RunOnce(); err != nil {
+		if err := r.runOnce(); err != nil {
 			t.Fatal(err)
 		}
 		after, err := ipallocator.NewFromSnapshot(ipregistry.updated)
@@ -117,7 +117,7 @@ func TestRepairLeak(t *testing.T) {
 		}
 	}
 	// Run one more time to actually remove the leak.
-	if err := r.RunOnce(); err != nil {
+	if err := r.runOnce(); err != nil {
 		t.Fatal(err)
 	}
 	after, err := ipallocator.NewFromSnapshot(ipregistry.updated)
@@ -201,7 +201,7 @@ func TestRepairWithExisting(t *testing.T) {
 		},
 	}
 	r := NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), cidr, ipregistry, nil, nil)
-	if err := r.RunOnce(); err != nil {
+	if err := r.runOnce(); err != nil {
 		t.Fatal(err)
 	}
 	after, err := ipallocator.NewFromSnapshot(ipregistry.updated)
@@ -336,7 +336,7 @@ func TestRepairDualStack(t *testing.T) {
 	_, secondaryCIDR, _ := netutils.ParseCIDRSloppy(secondaryIPRegistry.item.Range)
 	r := NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), cidr, ipregistry, secondaryCIDR, secondaryIPRegistry)
 
-	if err := r.RunOnce(); err != nil {
+	if err := r.runOnce(); err != nil {
 		t.Fatal(err)
 	}
 	if !ipregistry.updateCalled || ipregistry.updated == nil || ipregistry.updated.Range != cidr.String() || ipregistry.updated != ipregistry.item {
@@ -356,7 +356,7 @@ func TestRepairDualStack(t *testing.T) {
 	}
 	r = NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), cidr, ipregistry, secondaryCIDR, secondaryIPRegistry)
 
-	if err := r.RunOnce(); !strings.Contains(err.Error(), ": test error") {
+	if err := r.runOnce(); !strings.Contains(err.Error(), ": test error") {
 		t.Fatal(err)
 	}
 }
@@ -413,7 +413,7 @@ func TestRepairLeakDualStack(t *testing.T) {
 	r := NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), cidr, ipregistry, secondaryCIDR, secondaryIPRegistry)
 	// Run through the "leak detection holdoff" loops.
 	for i := 0; i < (numRepairsBeforeLeakCleanup - 1); i++ {
-		if err := r.RunOnce(); err != nil {
+		if err := r.runOnce(); err != nil {
 			t.Fatal(err)
 		}
 		after, err := ipallocator.NewFromSnapshot(ipregistry.updated)
@@ -432,7 +432,7 @@ func TestRepairLeakDualStack(t *testing.T) {
 		}
 	}
 	// Run one more time to actually remove the leak.
-	if err := r.RunOnce(); err != nil {
+	if err := r.runOnce(); err != nil {
 		t.Fatal(err)
 	}
@@ -597,7 +597,7 @@ func TestRepairWithExistingDualStack(t *testing.T) {
 	}
 	r := NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), cidr, ipregistry, secondaryCIDR, secondaryIPRegistry)
-	if err := r.RunOnce(); err != nil {
+	if err := r.runOnce(); err != nil {
 		t.Fatal(err)
 	}
 	after, err := ipallocator.NewFromSnapshot(ipregistry.updated)


@@ -19,6 +19,7 @@ package controller
 import (
 	"context"
 	"fmt"
+	"sync"
 	"time"
 
 	corev1 "k8s.io/api/core/v1"
@@ -71,27 +72,28 @@ func NewRepair(interval time.Duration, serviceClient corev1client.ServicesGetter
 	}
 }
 
-// Start starts events recording.
-func (c *Repair) Start(stopCh chan struct{}) {
-	c.broadcaster.StartRecordingToSink(stopCh)
-}
-
 // RunUntil starts the controller until the provided ch is closed.
-func (c *Repair) RunUntil(stopCh chan struct{}) {
+func (c *Repair) RunUntil(onFirstSuccess func(), stopCh chan struct{}) {
+	c.broadcaster.StartRecordingToSink(stopCh)
+	defer c.broadcaster.Shutdown()
+
+	var once sync.Once
 	wait.Until(func() {
-		if err := c.RunOnce(); err != nil {
+		if err := c.runOnce(); err != nil {
 			runtime.HandleError(err)
+			return
 		}
+		once.Do(onFirstSuccess)
 	}, c.interval, stopCh)
 }
 
-// RunOnce verifies the state of the port allocations and returns an error if an unrecoverable problem occurs.
-func (c *Repair) RunOnce() error {
-	return retry.RetryOnConflict(retry.DefaultBackoff, c.runOnce)
-}
-
 // runOnce verifies the state of the port allocations and returns an error if an unrecoverable problem occurs.
 func (c *Repair) runOnce() error {
+	return retry.RetryOnConflict(retry.DefaultBackoff, c.doRunOnce)
+}
+
+// doRunOnce verifies the state of the port allocations and returns an error if an unrecoverable problem occurs.
+func (c *Repair) doRunOnce() error {
 	// TODO: (per smarterclayton) if Get() or ListServices() is a weak consistency read,
 	// or if they are executed against different leaders,
 	// the ordering guarantee required to ensure no port is allocated twice is violated.
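The runOnce/doRunOnce split isolates the conflict-retry policy in a two-line wrapper: retry.RetryOnConflict (from k8s.io/client-go/util/retry) re-invokes the body only while it returns a conflict API error, that is, an optimistic-concurrency failure on update, backing off between attempts; any other error surfaces immediately. A self-contained sketch of that behavior with a fabricated conflict error standing in for a lost update race:

package main

import (
	"errors"
	"fmt"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/util/retry"
)

func main() {
	attempts := 0
	err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
		attempts++
		if attempts < 3 {
			// Simulate losing an optimistic-concurrency race on update;
			// only errors recognized by apierrors.IsConflict are retried.
			return apierrors.NewConflict(
				schema.GroupResource{Resource: "services"}, "kubernetes",
				errors.New("the object has been modified"))
		}
		return nil // the repair pass finally went through
	})
	fmt.Println(attempts, err) // 3 <nil>: conflicts were retried, then success
}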


@@ -60,7 +60,7 @@ func TestRepair(t *testing.T) {
 	pr, _ := net.ParsePortRange(registry.item.Range)
 	r := NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), *pr, registry)
 
-	if err := r.RunOnce(); err != nil {
+	if err := r.runOnce(); err != nil {
 		t.Fatal(err)
 	}
 	if !registry.updateCalled || registry.updated == nil || registry.updated.Range != pr.String() || registry.updated != registry.item {
@@ -72,7 +72,7 @@ func TestRepair(t *testing.T) {
 		updateErr: fmt.Errorf("test error"),
 	}
 	r = NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), *pr, registry)
-	if err := r.RunOnce(); !strings.Contains(err.Error(), ": test error") {
+	if err := r.runOnce(); !strings.Contains(err.Error(), ": test error") {
 		t.Fatal(err)
 	}
 }
@@ -105,7 +105,7 @@ func TestRepairLeak(t *testing.T) {
 	r := NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), *pr, registry)
 	// Run through the "leak detection holdoff" loops.
 	for i := 0; i < (numRepairsBeforeLeakCleanup - 1); i++ {
-		if err := r.RunOnce(); err != nil {
+		if err := r.runOnce(); err != nil {
 			t.Fatal(err)
 		}
 		after, err := portallocator.NewFromSnapshot(registry.updated)
@@ -117,7 +117,7 @@ func TestRepairLeak(t *testing.T) {
 		}
 	}
 	// Run one more time to actually remove the leak.
-	if err := r.RunOnce(); err != nil {
+	if err := r.runOnce(); err != nil {
 		t.Fatal(err)
 	}
 	after, err := portallocator.NewFromSnapshot(registry.updated)
@@ -191,7 +191,7 @@ func TestRepairWithExisting(t *testing.T) {
 		},
 	}
 	r := NewRepair(0, fakeClient.CoreV1(), fakeClient.EventsV1(), *pr, registry)
-	if err := r.RunOnce(); err != nil {
+	if err := r.runOnce(); err != nil {
 		t.Fatal(err)
 	}
 	after, err := portallocator.NewFromSnapshot(registry.updated)