API Server Changes

This commit includes all the changes needed for the APIServer. Instead of modifying the existing signatures of the methods that generate or return a stop channel, we derive a context from the channel and pass that context to the controllers started in the APIServer. This ensures we don't have to touch the APIServer's dependencies.
Ravi Gudimetla 2022-03-07 09:20:45 -05:00
parent e777f72163
commit 8b84a793b3
14 changed files with 122 additions and 66 deletions
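
The commit applies one pattern throughout: bridge the legacy stop channel into a context.Context and hand that context to the controllers. A minimal, self-contained sketch of the bridge (the helper name contextFromStopCh is hypothetical; the commit inlines this logic at each call site):

package main

import (
	"context"
	"fmt"
	"time"
)

// contextFromStopCh derives a context that is cancelled when stopCh closes.
func contextFromStopCh(stopCh <-chan struct{}) context.Context {
	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		select {
		case <-stopCh:
			cancel() // stopCh closed, so cancel the derived context
		case <-ctx.Done():
		}
	}()
	return ctx
}

func main() {
	stopCh := make(chan struct{})
	ctx := contextFromStopCh(stopCh)
	go func() {
		time.Sleep(50 * time.Millisecond)
		close(stopCh) // simulate the apiserver signalling shutdown
	}()
	<-ctx.Done() // controllers started with ctx observe the same signal
	fmt.Println("context cancelled after stopCh closed")
}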

View File

@@ -17,6 +17,7 @@ limitations under the License.
package app
import (
"context"
"errors"
"fmt"
"reflect"
@@ -95,8 +96,18 @@ func BuildAuthn(client authenticationclient.AuthenticationV1Interface, authn kub
}
return authenticator, func(stopCh <-chan struct{}) {
// generate a context from stopCh. This is to avoid modifying files that rely on this method
// TODO: See if we can pass ctx to the current method
ctx, cancel := context.WithCancel(context.Background())
go func() {
select {
case <-stopCh:
cancel() // stopCh closed, so cancel our context
case <-ctx.Done():
}
}()
if dynamicCAContentFromFile != nil {
go dynamicCAContentFromFile.Run(1, stopCh)
go dynamicCAContentFromFile.Run(ctx, 1)
}
}, err
}

View File

@@ -432,7 +432,7 @@ func (c *Controller) Enqueue() {
}
// Run the controller until stopped.
func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
func (c *Controller) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()
// make sure the work queue is shutdown which will trigger workers to end
defer c.queue.ShutDown()
@@ -441,25 +441,25 @@ func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
defer klog.Infof("Shutting down cluster_authentication_trust_controller controller")
// we have a personal informer that is narrowly scoped, start it.
go c.kubeSystemConfigMapInformer.Run(stopCh)
go c.kubeSystemConfigMapInformer.Run(ctx.Done())
// wait for your secondary caches to fill before starting your work
if !cache.WaitForNamedCacheSync("cluster_authentication_trust_controller", stopCh, c.preRunCaches...) {
if !cache.WaitForNamedCacheSync("cluster_authentication_trust_controller", ctx.Done(), c.preRunCaches...) {
return
}
// only run one worker
go wait.Until(c.runWorker, time.Second, stopCh)
go wait.Until(c.runWorker, time.Second, ctx.Done())
// checks are cheap. run once a minute just to be sure we stay in sync in case fsnotify fails again
// start timer that rechecks every minute, just in case. this also serves to prime the controller quickly.
_ = wait.PollImmediateUntil(1*time.Minute, func() (bool, error) {
c.queue.Add(keyFn())
return false, nil
}, stopCh)
}, ctx.Done())
// wait until we're told to stop
<-stopCh
<-ctx.Done()
}
func (c *Controller) runWorker() {

View File

@@ -452,29 +452,40 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
}
controller := clusterauthenticationtrust.NewClusterAuthenticationTrustController(m.ClusterAuthenticationInfo, kubeClient)
// generate a context from hookContext.StopCh. This is to avoid modifying files that rely on the apiserver
// TODO: See if we can pass ctx to the current method
ctx, cancel := context.WithCancel(context.Background())
go func() {
select {
case <-hookContext.StopCh:
cancel() // stopCh closed, so cancel our context
case <-ctx.Done():
}
}()
// prime values and start listeners
if m.ClusterAuthenticationInfo.ClientCA != nil {
m.ClusterAuthenticationInfo.ClientCA.AddListener(controller)
if controller, ok := m.ClusterAuthenticationInfo.ClientCA.(dynamiccertificates.ControllerRunner); ok {
// RunOnce to be sure that we have a value.
if err := controller.RunOnce(); err != nil {
if err := controller.RunOnce(ctx); err != nil {
runtime.HandleError(err)
}
go controller.Run(1, hookContext.StopCh)
go controller.Run(ctx, 1)
}
}
if m.ClusterAuthenticationInfo.RequestHeaderCA != nil {
m.ClusterAuthenticationInfo.RequestHeaderCA.AddListener(controller)
if controller, ok := m.ClusterAuthenticationInfo.RequestHeaderCA.(dynamiccertificates.ControllerRunner); ok {
// RunOnce to be sure that we have a value.
if err := controller.RunOnce(); err != nil {
if err := controller.RunOnce(ctx); err != nil {
runtime.HandleError(err)
}
go controller.Run(1, hookContext.StopCh)
go controller.Run(ctx, 1)
}
}
go controller.Run(1, hookContext.StopCh)
go controller.Run(ctx, 1)
return nil
})

View File

@@ -162,29 +162,29 @@ func (c *RequestHeaderAuthRequestController) AllowedClientNames() []string {
}
// Run starts the RequestHeaderAuthRequestController and blocks until the context is cancelled.
func (c *RequestHeaderAuthRequestController) Run(workers int, stopCh <-chan struct{}) {
func (c *RequestHeaderAuthRequestController) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()
klog.Infof("Starting %s", c.name)
defer klog.Infof("Shutting down %s", c.name)
go c.configmapInformer.Run(stopCh)
go c.configmapInformer.Run(ctx.Done())
// wait for caches to fill before starting your work
if !cache.WaitForNamedCacheSync(c.name, stopCh, c.configmapInformerSynced) {
if !cache.WaitForNamedCacheSync(c.name, ctx.Done(), c.configmapInformerSynced) {
return
}
// doesn't matter what workers say, only start one.
go wait.Until(c.runWorker, time.Second, stopCh)
go wait.Until(c.runWorker, time.Second, ctx.Done())
<-stopCh
<-ctx.Done()
}
// RunOnce runs a single sync loop
func (c *RequestHeaderAuthRequestController) RunOnce() error {
configMap, err := c.client.CoreV1().ConfigMaps(c.configmapNamespace).Get(context.TODO(), c.configmapName, metav1.GetOptions{})
func (c *RequestHeaderAuthRequestController) RunOnce(ctx context.Context) error {
configMap, err := c.client.CoreV1().ConfigMaps(c.configmapNamespace).Get(ctx, c.configmapName, metav1.GetOptions{})
switch {
case errors.IsNotFound(err):
// ignore, authConfigMap is nil now
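
A practical consequence of the new signature: the context now reaches the client-go Get call above, so callers can bound or cancel the priming read, which the old context.TODO() version could not. A hedged sketch of such a caller, written as if in the same package (the function name and the 10-second timeout are illustrative):

// primeRequestHeaderConfig bounds the priming read; the ConfigMap Get
// inside RunOnce is abandoned if the deadline expires.
func primeRequestHeaderConfig(c *RequestHeaderAuthRequestController) error {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	return c.RunOnce(ctx)
}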

View File

@ -17,6 +17,7 @@ limitations under the License.
package headerrequest
import (
"context"
"encoding/json"
"k8s.io/apimachinery/pkg/api/equality"
"testing"
@@ -221,7 +222,8 @@ func TestRequestHeaderAuthRequestControllerSyncOnce(t *testing.T) {
target.client = fakeKubeClient
// act
err := target.RunOnce()
ctx := context.TODO()
err := target.RunOnce(ctx)
if err != nil && !scenario.expectErr {
t.Errorf("got unexpected error %v", err)

View File

@@ -18,6 +18,7 @@ package dynamiccertificates
import (
"bytes"
"context"
"crypto/x509"
"fmt"
"sync/atomic"
@@ -186,7 +187,7 @@ func (c *ConfigMapCAController) hasCAChanged(caBundle []byte) bool {
}
// RunOnce runs a single sync loop
func (c *ConfigMapCAController) RunOnce() error {
func (c *ConfigMapCAController) RunOnce(ctx context.Context) error {
// Ignore the error when running once: when using a dynamically loaded CA file, we think it's better to have
// nothing for a brief time than to crash completely. If crashing is necessary, higher-order logic like a healthcheck can cause the failure.
_ = c.loadCABundle()
@@ -194,7 +195,7 @@ func (c *ConfigMapCAController) RunOnce() error {
}
// Run starts the controller and blocks until the context is cancelled.
func (c *ConfigMapCAController) Run(workers int, stopCh <-chan struct{}) {
func (c *ConfigMapCAController) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()
@@ -202,23 +203,23 @@ func (c *ConfigMapCAController) Run(workers int, stopCh <-chan struct{}) {
defer klog.InfoS("Shutting down controller", "name", c.name)
// we have a personal informer that is narrowly scoped, start it.
go c.configMapInformer.Run(stopCh)
go c.configMapInformer.Run(ctx.Done())
// wait for your secondary caches to fill before starting your work
if !cache.WaitForNamedCacheSync(c.name, stopCh, c.preRunCaches...) {
if !cache.WaitForNamedCacheSync(c.name, ctx.Done(), c.preRunCaches...) {
return
}
// doesn't matter what workers say, only start one.
go wait.Until(c.runWorker, time.Second, stopCh)
go wait.Until(c.runWorker, time.Second, ctx.Done())
// start timer that rechecks every minute, just in case. this also serves to prime the controller quickly.
go wait.PollImmediateUntil(FileRefreshDuration, func() (bool, error) {
c.queue.Add(workItemKey)
return false, nil
}, stopCh)
}, ctx.Done())
<-stopCh
<-ctx.Done()
}
func (c *ConfigMapCAController) runWorker() {

View File

@@ -18,6 +18,7 @@ package dynamiccertificates
import (
"bytes"
"context"
"crypto/x509"
"fmt"
"io/ioutil"
@@ -39,10 +40,10 @@ var FileRefreshDuration = 1 * time.Minute
// ControllerRunner is a generic interface for starting a controller
type ControllerRunner interface {
// RunOnce runs the sync loop a single time. This is useful for synchronous priming
RunOnce() error
RunOnce(ctx context.Context) error
// Run should be called in a goroutine: go controller.Run(ctx, workers)
Run(workers int, stopCh <-chan struct{})
Run(ctx context.Context, workers int)
}
// DynamicFileCAContent provides a CAContentProvider that can dynamically react to new file content
@@ -144,12 +145,12 @@ func (c *DynamicFileCAContent) hasCAChanged(caBundle []byte) bool {
}
// RunOnce runs a single sync loop
func (c *DynamicFileCAContent) RunOnce() error {
func (c *DynamicFileCAContent) RunOnce(ctx context.Context) error {
return c.loadCABundle()
}
// Run starts the controller and blocks until the context is cancelled.
func (c *DynamicFileCAContent) Run(workers int, stopCh <-chan struct{}) {
func (c *DynamicFileCAContent) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()
@@ -157,16 +158,16 @@ func (c *DynamicFileCAContent) Run(workers int, stopCh <-chan struct{}) {
defer klog.InfoS("Shutting down controller", "name", c.name)
// doesn't matter what workers say, only start one.
go wait.Until(c.runWorker, time.Second, stopCh)
go wait.Until(c.runWorker, time.Second, ctx.Done())
// start the loop that watches the CA file until the context is cancelled.
go wait.Until(func() {
if err := c.watchCAFile(stopCh); err != nil {
if err := c.watchCAFile(ctx.Done()); err != nil {
klog.ErrorS(err, "Failed to watch CA file, will retry later")
}
}, time.Minute, stopCh)
}, time.Minute, ctx.Done())
<-stopCh
<-ctx.Done()
}
func (c *DynamicFileCAContent) watchCAFile(stopCh <-chan struct{}) error {
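
The call sites in this commit all drive the updated ControllerRunner the same way: prime synchronously, then run in a goroutine. A hedged sketch, written as if in this package (startRunner is an illustrative name, and the context and utilruntime imports are assumed):

// startRunner primes a ControllerRunner synchronously, then runs it until
// ctx is cancelled, mirroring the apiserver call sites in this commit.
func startRunner(ctx context.Context, r ControllerRunner) {
	// prime once so a value is available before serving traffic
	if err := r.RunOnce(ctx); err != nil {
		utilruntime.HandleError(err)
	}
	go r.Run(ctx, 1) // these controllers only ever start one worker
}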

View File

@@ -17,6 +17,7 @@ limitations under the License.
package dynamiccertificates
import (
"context"
"crypto/tls"
"fmt"
"io/ioutil"
@@ -119,12 +120,12 @@ func (c *DynamicCertKeyPairContent) loadCertKeyPair() error {
}
// RunOnce runs a single sync loop
func (c *DynamicCertKeyPairContent) RunOnce() error {
func (c *DynamicCertKeyPairContent) RunOnce(ctx context.Context) error {
return c.loadCertKeyPair()
}
// Run starts the controller and blocks until stopCh is closed.
func (c *DynamicCertKeyPairContent) Run(workers int, stopCh <-chan struct{}) {
// Run starts the controller and blocks until the context is cancelled.
func (c *DynamicCertKeyPairContent) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()
@@ -132,16 +133,16 @@ func (c *DynamicCertKeyPairContent) Run(workers int, stopCh <-chan struct{}) {
defer klog.InfoS("Shutting down controller", "name", c.name)
// doesn't matter what workers say, only start one.
go wait.Until(c.runWorker, time.Second, stopCh)
go wait.Until(c.runWorker, time.Second, ctx.Done())
// start the loop that watches the cert and key files until the context is cancelled.
go wait.Until(func() {
if err := c.watchCertKeyFile(stopCh); err != nil {
if err := c.watchCertKeyFile(ctx.Done()); err != nil {
klog.ErrorS(err, "Failed to watch cert and key file, will retry later")
}
}, time.Minute, stopCh)
}, time.Minute, ctx.Done())
<-stopCh
<-ctx.Done()
}
func (c *DynamicCertKeyPairContent) watchCertKeyFile(stopCh <-chan struct{}) error {

View File

@@ -18,6 +18,7 @@ package dynamiccertificates
import (
"bytes"
"context"
"crypto/x509"
"strings"
@@ -81,11 +82,11 @@ func (c unionCAContent) AddListener(listener Listener) {
}
// RunOnce runs a single sync loop
func (c unionCAContent) RunOnce() error {
func (c unionCAContent) RunOnce(ctx context.Context) error {
errors := []error{}
for _, curr := range c {
if controller, ok := curr.(ControllerRunner); ok {
if err := controller.RunOnce(); err != nil {
if err := controller.RunOnce(ctx); err != nil {
errors = append(errors, err)
}
}
@@ -95,10 +96,10 @@ func (c unionCAContent) RunOnce() error {
}
// Run runs the controller
func (c unionCAContent) Run(workers int, stopCh <-chan struct{}) {
func (c unionCAContent) Run(ctx context.Context, workers int) {
for _, curr := range c {
if controller, ok := curr.(ControllerRunner); ok {
go controller.Run(workers, stopCh)
go controller.Run(ctx, workers)
}
}
}

View File

@@ -17,6 +17,7 @@ limitations under the License.
package options
import (
"context"
"fmt"
"strings"
"time"
@@ -387,7 +388,10 @@ func (s *DelegatingAuthenticationOptions) createRequestHeaderConfig(client kuber
}
// look up authentication configuration in the cluster and, in case of an error, defer to the authentication-tolerate-lookup-failure flag
if err := dynamicRequestHeaderProvider.RunOnce(); err != nil {
// RunOnce now takes a context to satisfy the updated ControllerRunner interface; nothing here relies on
// cancellation, so passing an empty context shouldn't be a problem
ctx := context.TODO()
if err := dynamicRequestHeaderProvider.RunOnce(ctx); err != nil {
return nil, err
}

View File

@@ -17,6 +17,7 @@ limitations under the License.
package options
import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/util/errors"
@@ -64,15 +65,15 @@ func newDynamicRequestHeaderController(client kubernetes.Interface) (*DynamicReq
}, nil
}
func (c *DynamicRequestHeaderController) RunOnce() error {
func (c *DynamicRequestHeaderController) RunOnce(ctx context.Context) error {
errs := []error{}
errs = append(errs, c.ConfigMapCAController.RunOnce())
errs = append(errs, c.RequestHeaderAuthRequestController.RunOnce())
errs = append(errs, c.ConfigMapCAController.RunOnce(ctx))
errs = append(errs, c.RequestHeaderAuthRequestController.RunOnce(ctx))
return errors.NewAggregate(errs)
}
func (c *DynamicRequestHeaderController) Run(workers int, stopCh <-chan struct{}) {
go c.ConfigMapCAController.Run(workers, stopCh)
go c.RequestHeaderAuthRequestController.Run(workers, stopCh)
<-stopCh
func (c *DynamicRequestHeaderController) Run(ctx context.Context, workers int) {
go c.ConfigMapCAController.Run(ctx, workers)
go c.RequestHeaderAuthRequestController.Run(ctx, workers)
<-ctx.Done()
}
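
The composite RunOnce above folds the sub-controllers' results with errors.NewAggregate. A small standalone illustration of that behavior (the error text is made up):

package main

import (
	"fmt"

	utilerrors "k8s.io/apimachinery/pkg/util/errors"
)

func main() {
	// nil entries are filtered out: the aggregate is non-nil only if at
	// least one sub-controller's RunOnce actually failed
	errs := []error{nil, fmt.Errorf("configmap CA sync failed")}
	if agg := utilerrors.NewAggregate(errs); agg != nil {
		fmt.Println(agg) // prints: configmap CA sync failed
	}
}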

View File

@@ -93,36 +93,45 @@ func (s *SecureServingInfo) tlsConfig(stopCh <-chan struct{}) (*tls.Config, erro
if s.Cert != nil {
s.Cert.AddListener(dynamicCertificateController)
}
// generate a context from stopCh. This is to avoid modifying files that rely on the apiserver
// TODO: See if we can pass ctx to the current method
ctx, cancel := context.WithCancel(context.Background())
go func() {
select {
case <-stopCh:
cancel() // stopCh closed, so cancel our context
case <-ctx.Done():
}
}()
// start controllers if possible
if controller, ok := s.ClientCA.(dynamiccertificates.ControllerRunner); ok {
// RunOnce to try to prime data. If this fails, it's ok because we fail closed.
// Files are required to be populated already, so this is for convenience.
if err := controller.RunOnce(); err != nil {
if err := controller.RunOnce(ctx); err != nil {
klog.Warningf("Initial population of client CA failed: %v", err)
}
go controller.Run(1, stopCh)
go controller.Run(ctx, 1)
}
if controller, ok := s.Cert.(dynamiccertificates.ControllerRunner); ok {
// RunOnce to try to prime data. If this fails, it's ok because we fail closed.
// Files are required to be populated already, so this is for convenience.
if err := controller.RunOnce(); err != nil {
if err := controller.RunOnce(ctx); err != nil {
klog.Warningf("Initial population of default serving certificate failed: %v", err)
}
go controller.Run(1, stopCh)
go controller.Run(ctx, 1)
}
for _, sniCert := range s.SNICerts {
sniCert.AddListener(dynamicCertificateController)
if controller, ok := sniCert.(dynamiccertificates.ControllerRunner); ok {
// RunOnce to try to prime data. If this fails, it's ok because we fail closed.
// Files are required to be populated already, so this is for convenience.
if err := controller.RunOnce(); err != nil {
if err := controller.RunOnce(ctx); err != nil {
klog.Warningf("Initial population of SNI serving certificate failed: %v", err)
}
go controller.Run(1, stopCh)
go controller.Run(ctx, 1)
}
}

View File

@@ -244,14 +244,27 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
if err != nil {
return nil, err
}
if err := aggregatorProxyCerts.RunOnce(); err != nil {
// RunOnce now takes a context to satisfy the updated ControllerRunner interface; nothing here relies on
// cancellation, so passing an empty context shouldn't be a problem
ctx := context.TODO()
if err := aggregatorProxyCerts.RunOnce(ctx); err != nil {
return nil, err
}
aggregatorProxyCerts.AddListener(apiserviceRegistrationController)
s.proxyCurrentCertKeyContent = aggregatorProxyCerts.CurrentCertKeyContent
s.GenericAPIServer.AddPostStartHookOrDie("aggregator-reload-proxy-client-cert", func(context genericapiserver.PostStartHookContext) error {
go aggregatorProxyCerts.Run(1, context.StopCh)
s.GenericAPIServer.AddPostStartHookOrDie("aggregator-reload-proxy-client-cert", func(postStartHookContext genericapiserver.PostStartHookContext) error {
// generate a context from postStartHookContext.StopCh. This is to avoid modifying files that rely on the apiserver
// TODO: See if we can pass ctx to the current method
ctx, cancel := context.WithCancel(context.Background())
go func() {
select {
case <-postStartHookContext.StopCh:
cancel() // stopCh closed, so cancel our context
case <-ctx.Done():
}
}()
go aggregatorProxyCerts.Run(ctx, 1)
return nil
})
}

View File

@@ -847,7 +847,8 @@ func TestProxyCertReload(t *testing.T) {
if err != nil {
t.Fatalf("Unable to create dynamic certificates: %v", err)
}
err = certProvider.RunOnce()
ctx := context.TODO()
err = certProvider.RunOnce(ctx)
if err != nil {
t.Fatalf("Unable to load dynamic certificates: %v", err)
}
@@ -886,7 +887,7 @@ func TestProxyCertReload(t *testing.T) {
// STEP 3: swap the certificate used by the aggregator to auth against the backend server and verify the request passes
// note that this step uses the certificate that can be validated by the backend server with clientCaCrt()
writeCerts(certFile, keyFile, clientCert(), clientKey(), t)
err = certProvider.RunOnce()
err = certProvider.RunOnce(ctx)
if err != nil {
t.Fatalf("Expected no error when refreshing dynamic certs, got %v", err)
}
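
These tests only exercise RunOnce, so context.TODO() is sufficient. A test that also started the controller would swap the old stop channel for a cancellable context, roughly like this (a sketch, not part of this commit):

ctx, cancel := context.WithCancel(context.Background())
defer cancel() // cancelling at test exit replaces the old close(stopCh)
go certProvider.Run(ctx, 1)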