From e77ef0e64919b6fbea78dacef94a584da2c04014 Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Mon, 16 Mar 2020 11:55:41 +0100 Subject: [PATCH] aggregator: wait for complete proxy handler --- .../pkg/apiserver/apiserver.go | 8 ++++- .../pkg/apiserver/apiservice_controller.go | 30 ++++++++++++++++--- 2 files changed, 33 insertions(+), 5 deletions(-) diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go index 02c887c2a51..9e38d0412d1 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go @@ -235,7 +235,13 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg return nil }) s.GenericAPIServer.AddPostStartHookOrDie("apiservice-registration-controller", func(context genericapiserver.PostStartHookContext) error { - go apiserviceRegistrationController.Run(context.StopCh) + handlerSyncedCh := make(chan struct{}) + go apiserviceRegistrationController.Run(context.StopCh, handlerSyncedCh) + select { + case <-context.StopCh: + case <-handlerSyncedCh: + } + return nil }) s.GenericAPIServer.AddPostStartHookOrDie("apiservice-status-available-controller", func(context genericapiserver.PostStartHookContext) error { diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiservice_controller.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiservice_controller.go index 5167134b8b1..8caa8ac69d8 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiservice_controller.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiservice_controller.go @@ -20,15 +20,15 @@ import ( "fmt" "time" - "k8s.io/klog" - apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" + "k8s.io/klog" - "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" + v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" informers "k8s.io/kube-aggregator/pkg/client/informers/externalversions/apiregistration/v1" listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/v1" "k8s.io/kube-aggregator/pkg/controllers" @@ -87,7 +87,7 @@ func (c *APIServiceRegistrationController) sync(key string) error { } // Run starts APIServiceRegistrationController which will process all registration requests until stopCh is closed. -func (c *APIServiceRegistrationController) Run(stopCh <-chan struct{}) { +func (c *APIServiceRegistrationController) Run(stopCh <-chan struct{}, handlerSyncedCh chan<- struct{}) { defer utilruntime.HandleCrash() defer c.queue.ShutDown() @@ -98,6 +98,28 @@ func (c *APIServiceRegistrationController) Run(stopCh <-chan struct{}) { return } + /// initially sync all APIServices to make sure the proxy handler is complete + if err := wait.PollImmediateUntil(time.Second, func() (bool, error) { + services, err := c.apiServiceLister.List(labels.Everything()) + if err != nil { + utilruntime.HandleError(fmt.Errorf("failed to initially list APIServices: %v", err)) + return false, nil + } + for _, s := range services { + if err := c.apiHandlerManager.AddAPIService(s); err != nil { + utilruntime.HandleError(fmt.Errorf("failed to initially sync APIService %s: %v", s.Name, err)) + return false, nil + } + } + return true, nil + }, stopCh); err == wait.ErrWaitTimeout { + utilruntime.HandleError(fmt.Errorf("timed out waiting for proxy handler to initialize")) + return + } else if err != nil { + panic(fmt.Errorf("unexpected error: %v", err)) + } + close(handlerSyncedCh) + // only start one worker thread since its a slow moving API and the aggregation server adding bits // aren't threadsafe go wait.Until(c.runWorker, time.Second, stopCh)