Derive kubelet serving certificate CSR template from node status addresses

This commit is contained in:
Jordan Liggitt 2018-06-28 14:44:25 -04:00
parent 3b269e182d
commit db9d3c2d10
No known key found for this signature in database
GPG Key ID: 39928704103C7229
8 changed files with 293 additions and 59 deletions

View File

@ -17,6 +17,7 @@ go_library(
"//pkg/kubelet/apis/kubeletconfig:go_default_library",
"//pkg/kubelet/metrics:go_default_library",
"//staging/src/k8s.io/api/certificates/v1beta1:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/net:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
@ -32,9 +33,13 @@ go_library(
go_test(
name = "go_default_test",
srcs = ["transport_test.go"],
srcs = [
"kubelet_test.go",
"transport_test.go",
],
embed = [":go_default_library"],
deps = [
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/certificates/v1beta1:go_default_library",

View File

@ -21,10 +21,12 @@ import (
"crypto/x509/pkix"
"fmt"
"net"
"sort"
"github.com/prometheus/client_golang/prometheus"
certificates "k8s.io/api/certificates/v1beta1"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes"
clientcertificates "k8s.io/client-go/kubernetes/typed/certificates/v1beta1"
@ -35,7 +37,7 @@ import (
// NewKubeletServerCertificateManager creates a certificate manager for the kubelet when retrieving a server certificate
// or returns an error.
func NewKubeletServerCertificateManager(kubeClient clientset.Interface, kubeCfg *kubeletconfig.KubeletConfiguration, nodeName types.NodeName, ips []net.IP, hostnames []string, certDirectory string) (certificate.Manager, error) {
func NewKubeletServerCertificateManager(kubeClient clientset.Interface, kubeCfg *kubeletconfig.KubeletConfiguration, nodeName types.NodeName, getAddresses func() []v1.NodeAddress, certDirectory string) (certificate.Manager, error) {
var certSigningRequestClient clientcertificates.CertificateSigningRequestInterface
if kubeClient != nil && kubeClient.CertificatesV1beta1() != nil {
certSigningRequestClient = kubeClient.CertificatesV1beta1().CertificateSigningRequests()
@ -59,16 +61,25 @@ func NewKubeletServerCertificateManager(kubeClient clientset.Interface, kubeCfg
)
prometheus.MustRegister(certificateExpiration)
m, err := certificate.NewManager(&certificate.Config{
CertificateSigningRequestClient: certSigningRequestClient,
Template: &x509.CertificateRequest{
getTemplate := func() *x509.CertificateRequest {
hostnames, ips := addressesToHostnamesAndIPs(getAddresses())
// don't return a template if we have no addresses to request for
if len(hostnames) == 0 && len(ips) == 0 {
return nil
}
return &x509.CertificateRequest{
Subject: pkix.Name{
CommonName: fmt.Sprintf("system:node:%s", nodeName),
Organization: []string{"system:nodes"},
},
DNSNames: hostnames,
IPAddresses: ips,
},
}
}
m, err := certificate.NewManager(&certificate.Config{
CertificateSigningRequestClient: certSigningRequestClient,
GetTemplate: getTemplate,
Usages: []certificates.KeyUsage{
// https://tools.ietf.org/html/rfc5280#section-4.2.1.3
//
@ -92,6 +103,44 @@ func NewKubeletServerCertificateManager(kubeClient clientset.Interface, kubeCfg
return m, nil
}
func addressesToHostnamesAndIPs(addresses []v1.NodeAddress) (dnsNames []string, ips []net.IP) {
seenDNSNames := map[string]bool{}
seenIPs := map[string]bool{}
for _, address := range addresses {
if len(address.Address) == 0 {
continue
}
switch address.Type {
case v1.NodeHostName:
if ip := net.ParseIP(address.Address); ip != nil {
seenIPs[address.Address] = true
} else {
seenDNSNames[address.Address] = true
}
case v1.NodeExternalIP, v1.NodeInternalIP:
if ip := net.ParseIP(address.Address); ip != nil {
seenIPs[address.Address] = true
}
case v1.NodeExternalDNS, v1.NodeInternalDNS:
seenDNSNames[address.Address] = true
}
}
for dnsName := range seenDNSNames {
dnsNames = append(dnsNames, dnsName)
}
for ip := range seenIPs {
ips = append(ips, net.ParseIP(ip))
}
// return in stable order
sort.Strings(dnsNames)
sort.Slice(ips, func(i, j int) bool { return ips[i].String() < ips[j].String() })
return dnsNames, ips
}
// NewKubeletClientCertificateManager sets up a certificate manager without a
// client that can be used to sign new certificates (or rotate). It answers with
// whatever certificate it is initialized with. If a CSR client is set later, it

View File

@ -0,0 +1,101 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package certificate
import (
"net"
"reflect"
"testing"
"k8s.io/api/core/v1"
)
func TestAddressesToHostnamesAndIPs(t *testing.T) {
tests := []struct {
name string
addresses []v1.NodeAddress
wantDNSNames []string
wantIPs []net.IP
}{
{
name: "empty",
addresses: nil,
wantDNSNames: nil,
wantIPs: nil,
},
{
name: "ignore empty values",
addresses: []v1.NodeAddress{{Type: v1.NodeHostName, Address: ""}},
wantDNSNames: nil,
wantIPs: nil,
},
{
name: "ignore invalid IPs",
addresses: []v1.NodeAddress{
{Type: v1.NodeInternalIP, Address: "1.2"},
{Type: v1.NodeExternalIP, Address: "3.4"},
},
wantDNSNames: nil,
wantIPs: nil,
},
{
name: "dedupe values",
addresses: []v1.NodeAddress{
{Type: v1.NodeHostName, Address: "hostname"},
{Type: v1.NodeExternalDNS, Address: "hostname"},
{Type: v1.NodeInternalDNS, Address: "hostname"},
{Type: v1.NodeInternalIP, Address: "1.1.1.1"},
{Type: v1.NodeExternalIP, Address: "1.1.1.1"},
},
wantDNSNames: []string{"hostname"},
wantIPs: []net.IP{net.ParseIP("1.1.1.1")},
},
{
name: "order values",
addresses: []v1.NodeAddress{
{Type: v1.NodeHostName, Address: "hostname-2"},
{Type: v1.NodeExternalDNS, Address: "hostname-1"},
{Type: v1.NodeInternalDNS, Address: "hostname-3"},
{Type: v1.NodeInternalIP, Address: "2.2.2.2"},
{Type: v1.NodeExternalIP, Address: "1.1.1.1"},
{Type: v1.NodeInternalIP, Address: "3.3.3.3"},
},
wantDNSNames: []string{"hostname-1", "hostname-2", "hostname-3"},
wantIPs: []net.IP{net.ParseIP("1.1.1.1"), net.ParseIP("2.2.2.2"), net.ParseIP("3.3.3.3")},
},
{
name: "handle IP and DNS hostnames",
addresses: []v1.NodeAddress{
{Type: v1.NodeHostName, Address: "hostname"},
{Type: v1.NodeHostName, Address: "1.1.1.1"},
},
wantDNSNames: []string{"hostname"},
wantIPs: []net.IP{net.ParseIP("1.1.1.1")},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotDNSNames, gotIPs := addressesToHostnamesAndIPs(tt.addresses)
if !reflect.DeepEqual(gotDNSNames, tt.wantDNSNames) {
t.Errorf("addressesToHostnamesAndIPs() gotDNSNames = %v, want %v", gotDNSNames, tt.wantDNSNames)
}
if !reflect.DeepEqual(gotIPs, tt.wantIPs) {
t.Errorf("addressesToHostnamesAndIPs() gotIPs = %v, want %v", gotIPs, tt.wantIPs)
}
})
}
}

View File

@ -372,8 +372,6 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
hostname := nodeutil.GetHostname(hostnameOverride)
// Query the cloud provider for our node name, default to hostname
nodeName := types.NodeName(hostname)
cloudIPs := []net.IP{}
cloudNames := []string{}
if kubeDeps.Cloud != nil {
var err error
instances, ok := kubeDeps.Cloud.Instances()
@ -387,25 +385,6 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
}
glog.V(2).Infof("cloud provider determined current node name to be %s", nodeName)
if utilfeature.DefaultFeatureGate.Enabled(features.RotateKubeletServerCertificate) {
nodeAddresses, err := instances.NodeAddresses(context.TODO(), nodeName)
if err != nil {
return nil, fmt.Errorf("failed to get the addresses of the current instance from the cloud provider: %v", err)
}
for _, nodeAddress := range nodeAddresses {
switch nodeAddress.Type {
case v1.NodeExternalIP, v1.NodeInternalIP:
ip := net.ParseIP(nodeAddress.Address)
if ip != nil && !ip.IsLoopback() {
cloudIPs = append(cloudIPs, ip)
}
case v1.NodeExternalDNS, v1.NodeInternalDNS, v1.NodeHostName:
cloudNames = append(cloudNames, nodeAddress.Address)
}
}
}
}
if kubeDeps.PodConfig == nil {
@ -747,21 +726,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
klet.statusManager = status.NewManager(klet.kubeClient, klet.podManager, klet)
if kubeCfg.ServerTLSBootstrap && kubeDeps.TLSOptions != nil && utilfeature.DefaultFeatureGate.Enabled(features.RotateKubeletServerCertificate) {
var ips []net.IP
cfgAddress := net.ParseIP(kubeCfg.Address)
if cfgAddress == nil || cfgAddress.IsUnspecified() {
localIPs, err := allGlobalUnicastIPs()
if err != nil {
return nil, err
}
ips = localIPs
} else {
ips = []net.IP{cfgAddress}
}
ips = append(ips, cloudIPs...)
names := append([]string{klet.GetHostname(), hostnameOverride}, cloudNames...)
klet.serverCertificateManager, err = kubeletcertificate.NewKubeletServerCertificateManager(klet.kubeClient, kubeCfg, klet.nodeName, ips, names, certDirectory)
klet.serverCertificateManager, err = kubeletcertificate.NewKubeletServerCertificateManager(klet.kubeClient, kubeCfg, klet.nodeName, klet.getLastObservedNodeAddresses, certDirectory)
if err != nil {
return nil, fmt.Errorf("failed to initialize certificate manager: %v", err)
}
@ -893,6 +858,9 @@ type Kubelet struct {
iptClient utilipt.Interface
rootDirectory string
lastObservedNodeAddressesMux sync.Mutex
lastObservedNodeAddresses []v1.NodeAddress
// onRepeatedHeartbeatFailure is called when a heartbeat operation fails more than once. optional.
onRepeatedHeartbeatFailure func()

View File

@ -428,6 +428,7 @@ func (kl *Kubelet) tryUpdateNodeStatus(tryNumber int) error {
if err != nil {
return err
}
kl.setLastObservedNodeAddresses(updatedNode.Status.Addresses)
// If update finishes successfully, mark the volumeInUse as reportedInUse to indicate
// those volumes are already updated in the node's status
kl.volumeManager.MarkVolumesAsReportedInUse(updatedNode.Status.VolumesInUse)
@ -1072,6 +1073,17 @@ func (kl *Kubelet) setNodeStatus(node *v1.Node) {
}
}
func (kl *Kubelet) setLastObservedNodeAddresses(addresses []v1.NodeAddress) {
kl.lastObservedNodeAddressesMux.Lock()
defer kl.lastObservedNodeAddressesMux.Unlock()
kl.lastObservedNodeAddresses = addresses
}
func (kl *Kubelet) getLastObservedNodeAddresses() []v1.NodeAddress {
kl.lastObservedNodeAddressesMux.Lock()
defer kl.lastObservedNodeAddressesMux.Unlock()
return kl.lastObservedNodeAddresses
}
// defaultNodeStatusFuncs is a factory that generates the default set of
// setNodeStatus funcs
func (kl *Kubelet) defaultNodeStatusFuncs() []func(*v1.Node) error {

View File

@ -41,6 +41,7 @@ go_library(
"//staging/src/k8s.io/api/certificates/v1beta1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/certificates/v1beta1:go_default_library",
"//staging/src/k8s.io/client-go/util/cert:go_default_library",

View File

@ -24,6 +24,7 @@ import (
"crypto/x509"
"encoding/pem"
"fmt"
"reflect"
"sync"
"time"
@ -32,6 +33,7 @@ import (
certificates "k8s.io/api/certificates/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
certificatesclient "k8s.io/client-go/kubernetes/typed/certificates/v1beta1"
"k8s.io/client-go/util/cert"
@ -75,6 +77,13 @@ type Config struct {
// part of rotation. It follows the same rules as the template parameter of
// crypto.x509.CreateCertificateRequest in the Go standard libraries.
Template *x509.CertificateRequest
// GetTemplate returns the CertificateRequest that will be used as a template for
// generating certificate signing requests for all new keys generated as
// part of rotation. It follows the same rules as the template parameter of
// crypto.x509.CreateCertificateRequest in the Go standard libraries.
// If no template is available, nil may be returned, and no certificate will be requested.
// If specified, takes precedence over Template.
GetTemplate func() *x509.CertificateRequest
// Usages is the types of usages that certificates generated by the manager
// can be used for.
Usages []certificates.KeyUsage
@ -136,7 +145,10 @@ func (e *NoCertKeyError) Error() string { return string(*e) }
type manager struct {
certSigningRequestClient certificatesclient.CertificateSigningRequestInterface
template *x509.CertificateRequest
getTemplate func() *x509.CertificateRequest
lastRequestLock sync.Mutex
lastRequest *x509.CertificateRequest
dynamicTemplate bool
usages []certificates.KeyUsage
certStore Store
certAccessLock sync.RWMutex
@ -158,9 +170,15 @@ func NewManager(config *Config) (Manager, error) {
return nil, err
}
getTemplate := config.GetTemplate
if getTemplate == nil {
getTemplate = func() *x509.CertificateRequest { return config.Template }
}
m := manager{
certSigningRequestClient: config.CertificateSigningRequestClient,
template: config.Template,
getTemplate: getTemplate,
dynamicTemplate: config.GetTemplate != nil,
usages: config.Usages,
certStore: config.CertificateStore,
cert: cert,
@ -215,12 +233,32 @@ func (m *manager) Start() {
glog.V(2).Infof("Certificate rotation is enabled.")
templateChanged := make(chan struct{})
go wait.Forever(func() {
deadline := m.nextRotationDeadline()
if sleepInterval := deadline.Sub(time.Now()); sleepInterval > 0 {
glog.V(2).Infof("Waiting %v for next certificate rotation", sleepInterval)
time.Sleep(sleepInterval)
timer := time.NewTimer(sleepInterval)
defer timer.Stop()
select {
case <-timer.C:
// unblock when deadline expires
case <-templateChanged:
if reflect.DeepEqual(m.getLastRequest(), m.getTemplate()) {
// if the template now matches what we last requested, restart the rotation deadline loop
return
}
glog.V(2).Infof("Certificate template changed, rotating")
}
}
// Don't enter rotateCerts and trigger backoff if we don't even have a template to request yet
if m.getTemplate() == nil {
return
}
backoff := wait.Backoff{
Duration: 2 * time.Second,
Factor: 2,
@ -231,7 +269,18 @@ func (m *manager) Start() {
utilruntime.HandleError(fmt.Errorf("Reached backoff limit, still unable to rotate certs: %v", err))
wait.PollInfinite(32*time.Second, m.rotateCerts)
}
}, 0)
}, time.Second)
if m.dynamicTemplate {
go wait.Forever(func() {
// check if the current template matches what we last requested
if !reflect.DeepEqual(m.getLastRequest(), m.getTemplate()) {
// if the template is different, queue up an interrupt of the rotation deadline loop.
// if we've requested a CSR that matches the new template by the time the interrupt is handled, the interrupt is disregarded.
templateChanged <- struct{}{}
}
}, time.Second)
}
}
func getCurrentCertificateOrBootstrap(
@ -286,7 +335,7 @@ func getCurrentCertificateOrBootstrap(
func (m *manager) rotateCerts() (bool, error) {
glog.V(2).Infof("Rotating certificates")
csrPEM, keyPEM, privateKey, err := m.generateCSR()
template, csrPEM, keyPEM, privateKey, err := m.generateCSR()
if err != nil {
utilruntime.HandleError(fmt.Errorf("Unable to generate a certificate signing request: %v", err))
return false, nil
@ -300,6 +349,9 @@ func (m *manager) rotateCerts() (bool, error) {
return false, m.updateServerError(err)
}
// Once we've successfully submitted a CSR for this template, record that we did so
m.setLastRequest(template)
// Wait for the certificate to be signed. Instead of one long watch, we retry with slightly longer
// intervals each time in order to tolerate failures from the server AND to preserve the liveliness
// of the cert manager loop. This creates slightly more traffic against the API server in return
@ -353,6 +405,36 @@ func (m *manager) nextRotationDeadline() time.Time {
return time.Now()
}
// Ensure the currently held certificate satisfies the requested subject CN and SANs
if template := m.getTemplate(); template != nil {
if template.Subject.CommonName != m.cert.Leaf.Subject.CommonName {
glog.V(2).Infof("Current certificate CN (%s) does not match requested CN (%s), rotating now", m.cert.Leaf.Subject.CommonName, template.Subject.CommonName)
return time.Now()
}
currentDNSNames := sets.NewString(m.cert.Leaf.DNSNames...)
desiredDNSNames := sets.NewString(template.DNSNames...)
missingDNSNames := desiredDNSNames.Difference(currentDNSNames)
if len(missingDNSNames) > 0 {
glog.V(2).Infof("Current certificate is missing requested DNS names %v, rotating now", missingDNSNames.List())
return time.Now()
}
currentIPs := sets.NewString()
for _, ip := range m.cert.Leaf.IPAddresses {
currentIPs.Insert(ip.String())
}
desiredIPs := sets.NewString()
for _, ip := range template.IPAddresses {
desiredIPs.Insert(ip.String())
}
missingIPs := desiredIPs.Difference(currentIPs)
if len(missingIPs) > 0 {
glog.V(2).Infof("Current certificate is missing requested IP addresses %v, rotating now", missingIPs.List())
return time.Now()
}
}
notAfter := m.cert.Leaf.NotAfter
totalDuration := float64(notAfter.Sub(m.cert.Leaf.NotBefore))
deadline := m.cert.Leaf.NotBefore.Add(jitteryDuration(totalDuration))
@ -408,22 +490,38 @@ func (m *manager) updateServerError(err error) error {
return nil
}
func (m *manager) generateCSR() (csrPEM []byte, keyPEM []byte, key interface{}, err error) {
func (m *manager) generateCSR() (template *x509.CertificateRequest, csrPEM []byte, keyPEM []byte, key interface{}, err error) {
// Generate a new private key.
privateKey, err := ecdsa.GenerateKey(elliptic.P256(), cryptorand.Reader)
if err != nil {
return nil, nil, nil, fmt.Errorf("unable to generate a new private key: %v", err)
return nil, nil, nil, nil, fmt.Errorf("unable to generate a new private key: %v", err)
}
der, err := x509.MarshalECPrivateKey(privateKey)
if err != nil {
return nil, nil, nil, fmt.Errorf("unable to marshal the new key to DER: %v", err)
return nil, nil, nil, nil, fmt.Errorf("unable to marshal the new key to DER: %v", err)
}
keyPEM = pem.EncodeToMemory(&pem.Block{Type: cert.ECPrivateKeyBlockType, Bytes: der})
csrPEM, err = cert.MakeCSRFromTemplate(privateKey, m.template)
if err != nil {
return nil, nil, nil, fmt.Errorf("unable to create a csr from the private key: %v", err)
template = m.getTemplate()
if template == nil {
return nil, nil, nil, nil, fmt.Errorf("unable to create a csr, no template available")
}
return csrPEM, keyPEM, privateKey, nil
csrPEM, err = cert.MakeCSRFromTemplate(privateKey, template)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("unable to create a csr from the private key: %v", err)
}
return template, csrPEM, keyPEM, privateKey, nil
}
func (m *manager) getLastRequest() *x509.CertificateRequest {
m.lastRequestLock.Lock()
defer m.lastRequestLock.Unlock()
return m.lastRequest
}
func (m *manager) setLastRequest(r *x509.CertificateRequest) {
m.lastRequestLock.Lock()
defer m.lastRequestLock.Unlock()
m.lastRequest = r
}

View File

@ -186,7 +186,7 @@ func TestSetRotationDeadline(t *testing.T) {
NotAfter: tc.notAfter,
},
},
template: &x509.CertificateRequest{},
getTemplate: func() *x509.CertificateRequest { return &x509.CertificateRequest{} },
usages: []certificates.KeyUsage{},
certificateExpiration: &g,
}
@ -221,8 +221,8 @@ func TestRotateCertCreateCSRError(t *testing.T) {
NotAfter: now.Add(-1 * time.Hour),
},
},
template: &x509.CertificateRequest{},
usages: []certificates.KeyUsage{},
getTemplate: func() *x509.CertificateRequest { return &x509.CertificateRequest{} },
usages: []certificates.KeyUsage{},
certSigningRequestClient: fakeClient{
failureType: createError,
},
@ -244,8 +244,8 @@ func TestRotateCertWaitingForResultError(t *testing.T) {
NotAfter: now.Add(-1 * time.Hour),
},
},
template: &x509.CertificateRequest{},
usages: []certificates.KeyUsage{},
getTemplate: func() *x509.CertificateRequest { return &x509.CertificateRequest{} },
usages: []certificates.KeyUsage{},
certSigningRequestClient: fakeClient{
failureType: watchError,
},