mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 20:24:09 +00:00
aggregator: wait for complete proxy handler
This commit is contained in:
parent
84dc704679
commit
e77ef0e649
@ -235,7 +235,13 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
|
|||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-registration-controller", func(context genericapiserver.PostStartHookContext) error {
|
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-registration-controller", func(context genericapiserver.PostStartHookContext) error {
|
||||||
go apiserviceRegistrationController.Run(context.StopCh)
|
handlerSyncedCh := make(chan struct{})
|
||||||
|
go apiserviceRegistrationController.Run(context.StopCh, handlerSyncedCh)
|
||||||
|
select {
|
||||||
|
case <-context.StopCh:
|
||||||
|
case <-handlerSyncedCh:
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-status-available-controller", func(context genericapiserver.PostStartHookContext) error {
|
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-status-available-controller", func(context genericapiserver.PostStartHookContext) error {
|
||||||
|
@ -20,15 +20,15 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/klog"
|
|
||||||
|
|
||||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/client-go/tools/cache"
|
"k8s.io/client-go/tools/cache"
|
||||||
"k8s.io/client-go/util/workqueue"
|
"k8s.io/client-go/util/workqueue"
|
||||||
|
"k8s.io/klog"
|
||||||
|
|
||||||
"k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
|
v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
|
||||||
informers "k8s.io/kube-aggregator/pkg/client/informers/externalversions/apiregistration/v1"
|
informers "k8s.io/kube-aggregator/pkg/client/informers/externalversions/apiregistration/v1"
|
||||||
listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/v1"
|
listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/v1"
|
||||||
"k8s.io/kube-aggregator/pkg/controllers"
|
"k8s.io/kube-aggregator/pkg/controllers"
|
||||||
@ -87,7 +87,7 @@ func (c *APIServiceRegistrationController) sync(key string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Run starts APIServiceRegistrationController which will process all registration requests until stopCh is closed.
|
// Run starts APIServiceRegistrationController which will process all registration requests until stopCh is closed.
|
||||||
func (c *APIServiceRegistrationController) Run(stopCh <-chan struct{}) {
|
func (c *APIServiceRegistrationController) Run(stopCh <-chan struct{}, handlerSyncedCh chan<- struct{}) {
|
||||||
defer utilruntime.HandleCrash()
|
defer utilruntime.HandleCrash()
|
||||||
defer c.queue.ShutDown()
|
defer c.queue.ShutDown()
|
||||||
|
|
||||||
@ -98,6 +98,28 @@ func (c *APIServiceRegistrationController) Run(stopCh <-chan struct{}) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// initially sync all APIServices to make sure the proxy handler is complete
|
||||||
|
if err := wait.PollImmediateUntil(time.Second, func() (bool, error) {
|
||||||
|
services, err := c.apiServiceLister.List(labels.Everything())
|
||||||
|
if err != nil {
|
||||||
|
utilruntime.HandleError(fmt.Errorf("failed to initially list APIServices: %v", err))
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
for _, s := range services {
|
||||||
|
if err := c.apiHandlerManager.AddAPIService(s); err != nil {
|
||||||
|
utilruntime.HandleError(fmt.Errorf("failed to initially sync APIService %s: %v", s.Name, err))
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
}, stopCh); err == wait.ErrWaitTimeout {
|
||||||
|
utilruntime.HandleError(fmt.Errorf("timed out waiting for proxy handler to initialize"))
|
||||||
|
return
|
||||||
|
} else if err != nil {
|
||||||
|
panic(fmt.Errorf("unexpected error: %v", err))
|
||||||
|
}
|
||||||
|
close(handlerSyncedCh)
|
||||||
|
|
||||||
// only start one worker thread since its a slow moving API and the aggregation server adding bits
|
// only start one worker thread since its a slow moving API and the aggregation server adding bits
|
||||||
// aren't threadsafe
|
// aren't threadsafe
|
||||||
go wait.Until(c.runWorker, time.Second, stopCh)
|
go wait.Until(c.runWorker, time.Second, stopCh)
|
||||||
|
Loading…
Reference in New Issue
Block a user