diff --git a/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/BUILD b/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/BUILD index 2ed8afda6e3..009f8e9de54 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/BUILD @@ -5,6 +5,7 @@ go_library( srcs = [ "cert_key.go", "client_ca.go", + "dynamicfile_content.go", "named_certificates.go", "static_content.go", "tlsconfig.go", diff --git a/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/dynamicfile_content.go b/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/dynamicfile_content.go new file mode 100644 index 00000000000..a168eb68065 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/server/dynamiccertificates/dynamicfile_content.go @@ -0,0 +1,201 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dynamiccertificates + +import ( + "crypto/x509" + "fmt" + "io/ioutil" + "reflect" + "sync/atomic" + "time" + + "k8s.io/client-go/util/cert" + + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog" +) + +type CAListener interface { + Enqueue() +} + +// DynamicFileCAContent provies a CAContentProvider that can dynamically react to new file content +// It also fulfills the authenticator interface to provide verifyoptions +type DynamicFileCAContent struct { + name string + + // filename is the name the file to read. + filename string + + // caBundle is a caBundleAndVerifier that contains the last read, non-zero length content of the file + caBundle atomic.Value + + listeners []CAListener + + // queue only ever has one item, but it has nice error handling backoff/retry semantics + queue workqueue.RateLimitingInterface +} + +type caBundleAndVerifier struct { + caBundle []byte + verifyOptions x509.VerifyOptions +} + +// NewStaticCAContentFromFile returns a CAContentProvider based on a filename +func NewDynamicCAContentFromFile(purpose, filename string) (*DynamicFileCAContent, error) { + if len(filename) == 0 { + return nil, fmt.Errorf("missing filename for ca bundle") + } + name := fmt.Sprintf("%s::%s", purpose, filename) + + ret := &DynamicFileCAContent{ + name: name, + filename: filename, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), fmt.Sprintf("DynamicCABundle-%s", purpose)), + } + if err := ret.loadCABundle(); err != nil { + return nil, err + } + + return ret, nil +} + +// AddListener adds a listener to be notified when the CA content changes. +func (c *DynamicFileCAContent) AddListener(listener CAListener) { + c.listeners = append(c.listeners, listener) +} + +// loadCABundle determines the next set of content for the file. +func (c *DynamicFileCAContent) loadCABundle() error { + caBundle, err := ioutil.ReadFile(c.filename) + if err != nil { + return err + } + if len(caBundle) == 0 { + return fmt.Errorf("missing content for CA bundle %q", c.Name()) + } + + // check to see if we have a change. If the values are the same, do nothing. + existing, ok := c.caBundle.Load().(*caBundleAndVerifier) + if ok && existing != nil && reflect.DeepEqual(existing.caBundle, caBundle) { + return nil + } + + caBundleAndVerifier, err := newCABundleAndVerifier(c.Name(), caBundle) + if err != nil { + return err + } + c.caBundle.Store(caBundleAndVerifier) + + for _, listener := range c.listeners { + listener.Enqueue() + } + + return nil +} + +// Run starts the kube-apiserver and blocks until stopCh is closed. +func (c *DynamicFileCAContent) Run(workers int, stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + defer c.queue.ShutDown() + + klog.Infof("Starting %s", c.name) + defer klog.Infof("Shutting down %s", c.name) + + // doesn't matter what workers say, only start one. + go wait.Until(c.runWorker, time.Second, stopCh) + + // start timer that rechecks every minute, just in case. this also serves to prime the controller quickly. + _ = wait.PollImmediateUntil(1*time.Minute, func() (bool, error) { + c.queue.Add(workItemKey) + return false, nil + }, stopCh) + + // TODO this can be wired to an fsnotifier as well. + + <-stopCh +} + +func (c *DynamicFileCAContent) runWorker() { + for c.processNextWorkItem() { + } +} + +func (c *DynamicFileCAContent) processNextWorkItem() bool { + dsKey, quit := c.queue.Get() + if quit { + return false + } + defer c.queue.Done(dsKey) + + err := c.loadCABundle() + if err == nil { + c.queue.Forget(dsKey) + return true + } + + utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err)) + c.queue.AddRateLimited(dsKey) + + return true +} + +// Name is just an identifier +func (c *DynamicFileCAContent) Name() string { + return c.name +} + +// CurrentCABundleContent provides ca bundle byte content +func (c *DynamicFileCAContent) CurrentCABundleContent() (cabundle []byte) { + return c.caBundle.Load().(caBundleAndVerifier).caBundle +} + +func (c *DynamicFileCAContent) VerifyOptions() x509.VerifyOptions { + return c.caBundle.Load().(caBundleAndVerifier).verifyOptions +} + +// newVerifyOptions creates a new verification func from a file. It reads the content and then fails. +// It will return a nil function if you pass an empty CA file. +func newCABundleAndVerifier(name string, caBundle []byte) (*caBundleAndVerifier, error) { + if len(caBundle) == 0 { + return nil, fmt.Errorf("missing content for CA bundle %q", name) + } + + // Wrap with an x509 verifier + var err error + verifyOptions := defaultVerifyOptions() + verifyOptions.Roots, err = cert.NewPoolFromBytes(caBundle) + if err != nil { + return nil, fmt.Errorf("error loading CA bundle for %q: %v", name, err) + } + + return &caBundleAndVerifier{ + caBundle: caBundle, + verifyOptions: verifyOptions, + }, nil +} + +// defaultVerifyOptions returns VerifyOptions that use the system root certificates, current time, +// and requires certificates to be valid for client auth (x509.ExtKeyUsageClientAuth) +func defaultVerifyOptions() x509.VerifyOptions { + return x509.VerifyOptions{ + KeyUsages: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth}, + } +}