mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-20 10:20:51 +00:00
Merge pull request #115315 from enj/enj/i/kas_kubelet_conn_close
kubelet/client: collapse transport wiring onto standard approach
This commit is contained in:
commit
22b88dea36
@ -210,13 +210,13 @@ func (s *ServerRunOptions) Flags() (fss cliflag.NamedFlagSets) {
|
||||
fs.DurationVar(&s.KubeletConfig.HTTPTimeout, "kubelet-timeout", s.KubeletConfig.HTTPTimeout,
|
||||
"Timeout for kubelet operations.")
|
||||
|
||||
fs.StringVar(&s.KubeletConfig.CertFile, "kubelet-client-certificate", s.KubeletConfig.CertFile,
|
||||
fs.StringVar(&s.KubeletConfig.TLSClientConfig.CertFile, "kubelet-client-certificate", s.KubeletConfig.TLSClientConfig.CertFile,
|
||||
"Path to a client cert file for TLS.")
|
||||
|
||||
fs.StringVar(&s.KubeletConfig.KeyFile, "kubelet-client-key", s.KubeletConfig.KeyFile,
|
||||
fs.StringVar(&s.KubeletConfig.TLSClientConfig.KeyFile, "kubelet-client-key", s.KubeletConfig.TLSClientConfig.KeyFile,
|
||||
"Path to a client key file for TLS.")
|
||||
|
||||
fs.StringVar(&s.KubeletConfig.CAFile, "kubelet-certificate-authority", s.KubeletConfig.CAFile,
|
||||
fs.StringVar(&s.KubeletConfig.TLSClientConfig.CAFile, "kubelet-certificate-authority", s.KubeletConfig.TLSClientConfig.CAFile,
|
||||
"Path to a cert file for the certificate authority.")
|
||||
|
||||
fs.StringVar(&s.ProxyClientCertFile, "proxy-client-cert-file", s.ProxyClientCertFile, ""+
|
||||
|
@ -26,13 +26,13 @@ import (
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
"github.com/spf13/pflag"
|
||||
oteltrace "go.opentelemetry.io/otel/trace"
|
||||
|
||||
"k8s.io/apiserver/pkg/admission"
|
||||
apiserveroptions "k8s.io/apiserver/pkg/server/options"
|
||||
"k8s.io/apiserver/pkg/storage/etcd3"
|
||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
||||
auditbuffered "k8s.io/apiserver/plugin/pkg/audit/buffered"
|
||||
audittruncate "k8s.io/apiserver/plugin/pkg/audit/truncate"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
cliflag "k8s.io/component-base/cli/flag"
|
||||
"k8s.io/component-base/logs"
|
||||
"k8s.io/component-base/metrics"
|
||||
@ -200,7 +200,7 @@ func TestAddFlags(t *testing.T) {
|
||||
string(kapi.NodeExternalIP),
|
||||
},
|
||||
HTTPTimeout: time.Duration(5) * time.Second,
|
||||
TLSClientConfig: restclient.TLSClientConfig{
|
||||
TLSClientConfig: kubeletclient.KubeletTLSConfig{
|
||||
CertFile: "/var/run/kubernetes/ceserver.crt",
|
||||
KeyFile: "/var/run/kubernetes/server.key",
|
||||
CAFile: "/var/run/kubernetes/caserver.crt",
|
||||
|
@ -26,9 +26,7 @@ import (
|
||||
v1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
utilnet "k8s.io/apimachinery/pkg/util/net"
|
||||
"k8s.io/apiserver/pkg/server/egressselector"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/transport"
|
||||
nodeutil "k8s.io/kubernetes/pkg/util/node"
|
||||
)
|
||||
@ -45,21 +43,24 @@ type KubeletClientConfig struct {
|
||||
PreferredAddressTypes []string
|
||||
|
||||
// TLSClientConfig contains settings to enable transport layer security
|
||||
restclient.TLSClientConfig
|
||||
|
||||
// Server requires Bearer authentication
|
||||
BearerToken string `datapolicy:"token"`
|
||||
TLSClientConfig KubeletTLSConfig
|
||||
|
||||
// HTTPTimeout is used by the client to timeout http requests to Kubelet.
|
||||
HTTPTimeout time.Duration
|
||||
|
||||
// Dial is a custom dialer used for the client
|
||||
Dial utilnet.DialFunc
|
||||
|
||||
// Lookup will give us a dialer if the egress selector is configured for it
|
||||
Lookup egressselector.Lookup
|
||||
}
|
||||
|
||||
type KubeletTLSConfig struct {
|
||||
// Server requires TLS client certificate authentication
|
||||
CertFile string
|
||||
// Server requires TLS client certificate authentication
|
||||
KeyFile string
|
||||
// Trusted root certificates for server
|
||||
CAFile string
|
||||
}
|
||||
|
||||
// ConnectionInfo provides the information needed to connect to a kubelet
|
||||
type ConnectionInfo struct {
|
||||
Scheme string
|
||||
@ -88,53 +89,38 @@ func MakeInsecureTransport(config *KubeletClientConfig) (http.RoundTripper, erro
|
||||
func makeTransport(config *KubeletClientConfig, insecureSkipTLSVerify bool) (http.RoundTripper, error) {
|
||||
// do the insecureSkipTLSVerify on the pre-transport *before* we go get a potentially cached connection.
|
||||
// transportConfig always produces a new struct pointer.
|
||||
preTLSConfig := config.transportConfig()
|
||||
if insecureSkipTLSVerify && preTLSConfig != nil {
|
||||
preTLSConfig.TLS.Insecure = true
|
||||
preTLSConfig.TLS.CAData = nil
|
||||
preTLSConfig.TLS.CAFile = ""
|
||||
transportConfig := config.transportConfig()
|
||||
if insecureSkipTLSVerify {
|
||||
transportConfig.TLS.Insecure = true
|
||||
transportConfig.TLS.CAFile = "" // we are only using files so we can ignore CAData
|
||||
}
|
||||
|
||||
tlsConfig, err := transport.TLSConfigFor(preTLSConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rt := http.DefaultTransport
|
||||
dialer := config.Dial
|
||||
if dialer == nil && config.Lookup != nil {
|
||||
if config.Lookup != nil {
|
||||
// Assuming EgressSelector if SSHTunnel is not turned on.
|
||||
// We will not get a dialer if egress selector is disabled.
|
||||
networkContext := egressselector.Cluster.AsNetworkContext()
|
||||
dialer, err = config.Lookup(networkContext)
|
||||
dialer, err := config.Lookup(networkContext)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get context dialer for 'cluster': got %v", err)
|
||||
}
|
||||
if dialer != nil {
|
||||
transportConfig.DialHolder = &transport.DialHolder{Dial: dialer}
|
||||
}
|
||||
}
|
||||
if dialer != nil || tlsConfig != nil {
|
||||
// If SSH Tunnel is turned on
|
||||
rt = utilnet.SetOldTransportDefaults(&http.Transport{
|
||||
DialContext: dialer,
|
||||
TLSClientConfig: tlsConfig,
|
||||
})
|
||||
}
|
||||
|
||||
return transport.HTTPWrappersForConfig(config.transportConfig(), rt)
|
||||
return transport.New(transportConfig)
|
||||
}
|
||||
|
||||
// transportConfig converts a client config to an appropriate transport config.
|
||||
func (c *KubeletClientConfig) transportConfig() *transport.Config {
|
||||
cfg := &transport.Config{
|
||||
TLS: transport.TLSConfig{
|
||||
CAFile: c.CAFile,
|
||||
CAData: c.CAData,
|
||||
CertFile: c.CertFile,
|
||||
CertData: c.CertData,
|
||||
KeyFile: c.KeyFile,
|
||||
KeyData: c.KeyData,
|
||||
NextProtos: c.NextProtos,
|
||||
CAFile: c.TLSClientConfig.CAFile,
|
||||
CertFile: c.TLSClientConfig.CertFile,
|
||||
KeyFile: c.TLSClientConfig.KeyFile,
|
||||
// transport.loadTLSFiles would set this to true because we are only using files
|
||||
// it is clearer to set it explicitly here so we remember that this is happening
|
||||
ReloadTLSFiles: true,
|
||||
},
|
||||
BearerToken: c.BearerToken,
|
||||
}
|
||||
if !cfg.HasCA() {
|
||||
cfg.TLS.Insecure = true
|
||||
|
@ -24,14 +24,12 @@ import (
|
||||
"net/url"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
restclient "k8s.io/client-go/rest"
|
||||
)
|
||||
|
||||
func TestMakeTransportInvalid(t *testing.T) {
|
||||
config := &KubeletClientConfig{
|
||||
//Invalid certificate and key path
|
||||
TLSClientConfig: restclient.TLSClientConfig{
|
||||
// Invalid certificate and key path
|
||||
TLSClientConfig: KubeletTLSConfig{
|
||||
CertFile: "../../client/testdata/mycertinvalid.cer",
|
||||
KeyFile: "../../client/testdata/mycertinvalid.key",
|
||||
CAFile: "../../client/testdata/myCA.cer",
|
||||
@ -50,7 +48,7 @@ func TestMakeTransportInvalid(t *testing.T) {
|
||||
func TestMakeTransportValid(t *testing.T) {
|
||||
config := &KubeletClientConfig{
|
||||
Port: 1234,
|
||||
TLSClientConfig: restclient.TLSClientConfig{
|
||||
TLSClientConfig: KubeletTLSConfig{
|
||||
CertFile: "../../client/testdata/mycertvalid.cer",
|
||||
// TLS Configuration
|
||||
KeyFile: "../../client/testdata/mycertvalid.key",
|
||||
@ -61,7 +59,7 @@ func TestMakeTransportValid(t *testing.T) {
|
||||
|
||||
rt, err := MakeTransport(config)
|
||||
if err != nil {
|
||||
t.Errorf("Not expecting an error #%v", err)
|
||||
t.Errorf("Not expecting an error %#v", err)
|
||||
}
|
||||
if rt == nil {
|
||||
t.Error("rt should not be nil")
|
||||
@ -89,7 +87,7 @@ func TestMakeInsecureTransport(t *testing.T) {
|
||||
|
||||
config := &KubeletClientConfig{
|
||||
Port: uint(port),
|
||||
TLSClientConfig: restclient.TLSClientConfig{
|
||||
TLSClientConfig: KubeletTLSConfig{
|
||||
CertFile: "../../client/testdata/mycertvalid.cer",
|
||||
// TLS Configuration
|
||||
KeyFile: "../../client/testdata/mycertvalid.key",
|
||||
|
@ -109,7 +109,7 @@ func (c *tlsTransportCache) get(config *Config) (http.RoundTripper, error) {
|
||||
|
||||
// If we use are reloading files, we need to handle certificate rotation properly
|
||||
// TODO(jackkleeman): We can also add rotation here when config.HasCertCallback() is true
|
||||
if config.TLS.ReloadTLSFiles {
|
||||
if config.TLS.ReloadTLSFiles && tlsConfig != nil && tlsConfig.GetClientCertificate != nil {
|
||||
dynamicCertDialer := certRotatingDialer(tlsConfig.GetClientCertificate, dial)
|
||||
tlsConfig.GetClientCertificate = dynamicCertDialer.GetClientCertificate
|
||||
dial = dynamicCertDialer.connDialer.DialContext
|
||||
|
@ -18,26 +18,43 @@ package podlogs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"crypto/elliptic"
|
||||
"crypto/rand"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"crypto/x509/pkix"
|
||||
"encoding/pem"
|
||||
"fmt"
|
||||
"math"
|
||||
"math/big"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apiserver/pkg/authentication/authenticatorfactory"
|
||||
"k8s.io/apiserver/pkg/endpoints/filters"
|
||||
"k8s.io/apiserver/pkg/server/dynamiccertificates"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/transport"
|
||||
certutil "k8s.io/client-go/util/cert"
|
||||
"k8s.io/client-go/util/keyutil"
|
||||
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
|
||||
"k8s.io/kubernetes/test/integration/framework"
|
||||
)
|
||||
|
||||
func TestInsecurePodLogs(t *testing.T) {
|
||||
clientSet, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{
|
||||
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
|
||||
opts.GenericServerRunOptions.MaxRequestBodyBytes = 1024 * 1024
|
||||
// I have no idea what this cert is, but it doesn't matter, we just want something that always fails validation
|
||||
opts.KubeletConfig.CAData = []byte(`
|
||||
badCA := writeDataToTempFile(t, []byte(`
|
||||
-----BEGIN CERTIFICATE-----
|
||||
MIIDMDCCAhigAwIBAgIIHNPD7sig7YIwDQYJKoZIhvcNAQELBQAwNjESMBAGA1UE
|
||||
CxMJb3BlbnNoaWZ0MSAwHgYDVQQDExdhZG1pbi1rdWJlY29uZmlnLXNpZ25lcjAe
|
||||
@ -58,7 +75,13 @@ cTWpa4zcBwru0CRG7iHc66VX16X8jHB1iFeZ5W/FgY4MsE+G1Vze4mCXSPVI4BZ2
|
||||
Bgqc+dJN9xS9Ah5gLiGQJ6C4niUA11piCpvMsy+j/LQ1Erx47KMar5fuMXYk7iPq
|
||||
1vqIwg==
|
||||
-----END CERTIFICATE-----
|
||||
`)
|
||||
`))
|
||||
|
||||
clientSet, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{
|
||||
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
|
||||
opts.GenericServerRunOptions.MaxRequestBodyBytes = 1024 * 1024
|
||||
// I have no idea what this cert is, but it doesn't matter, we just want something that always fails validation
|
||||
opts.KubeletConfig.TLSClientConfig.CAFile = badCA
|
||||
},
|
||||
})
|
||||
defer tearDownFn()
|
||||
@ -69,74 +92,7 @@ Bgqc+dJN9xS9Ah5gLiGQJ6C4niUA11piCpvMsy+j/LQ1Erx47KMar5fuMXYk7iPq
|
||||
}))
|
||||
defer fakeKubeletServer.Close()
|
||||
|
||||
fakeKubeletURL, err := url.Parse(fakeKubeletServer.URL)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
fakeKubeletHost, fakeKubeletPortStr, err := net.SplitHostPort(fakeKubeletURL.Host)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
fakeKubeletPort, err := strconv.ParseUint(fakeKubeletPortStr, 10, 32)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
node, err := clientSet.CoreV1().Nodes().Create(context.TODO(), &corev1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "fake"},
|
||||
}, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
node.Status = corev1.NodeStatus{
|
||||
Addresses: []corev1.NodeAddress{
|
||||
{
|
||||
Type: corev1.NodeExternalIP,
|
||||
Address: fakeKubeletHost,
|
||||
},
|
||||
},
|
||||
DaemonEndpoints: corev1.NodeDaemonEndpoints{
|
||||
KubeletEndpoint: corev1.DaemonEndpoint{
|
||||
Port: int32(fakeKubeletPort),
|
||||
},
|
||||
},
|
||||
}
|
||||
node, err = clientSet.CoreV1().Nodes().UpdateStatus(context.TODO(), node, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, err = clientSet.CoreV1().Namespaces().Create(context.TODO(), &corev1.Namespace{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "ns"},
|
||||
}, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, err = clientSet.CoreV1().ServiceAccounts("ns").Create(context.TODO(), &corev1.ServiceAccount{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "default", Namespace: "ns"},
|
||||
}, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
falseRef := false
|
||||
pod, err := clientSet.CoreV1().Pods("ns").Create(context.TODO(), &corev1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "test-pod", Namespace: "ns"},
|
||||
Spec: corev1.PodSpec{
|
||||
Containers: []corev1.Container{
|
||||
{
|
||||
Name: "foo",
|
||||
Image: "some/image:latest",
|
||||
},
|
||||
},
|
||||
NodeName: node.Name,
|
||||
AutomountServiceAccountToken: &falseRef,
|
||||
},
|
||||
}, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
pod := prepareFakeNodeAndPod(context.TODO(), t, clientSet, fakeKubeletServer)
|
||||
|
||||
insecureResult := clientSet.CoreV1().Pods("ns").GetLogs(pod.Name, &corev1.PodLogOptions{InsecureSkipTLSVerifyBackend: true}).Do(context.TODO())
|
||||
if err := insecureResult.Error(); err != nil {
|
||||
@ -162,5 +118,278 @@ Bgqc+dJN9xS9Ah5gLiGQJ6C4niUA11piCpvMsy+j/LQ1Erx47KMar5fuMXYk7iPq
|
||||
t.Log(string(raw))
|
||||
t.Fatal(secureStatusCode)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func prepareFakeNodeAndPod(ctx context.Context, t *testing.T, clientSet kubernetes.Interface, fakeKubeletServer *httptest.Server) *corev1.Pod {
|
||||
t.Helper()
|
||||
|
||||
fakeKubeletURL, err := url.Parse(fakeKubeletServer.URL)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
fakeKubeletHost, fakeKubeletPortStr, err := net.SplitHostPort(fakeKubeletURL.Host)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
fakeKubeletPort, err := strconv.ParseUint(fakeKubeletPortStr, 10, 32)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
node, err := clientSet.CoreV1().Nodes().Create(ctx, &corev1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "fake"},
|
||||
}, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
node.Status = corev1.NodeStatus{
|
||||
Addresses: []corev1.NodeAddress{
|
||||
{
|
||||
Type: corev1.NodeExternalIP,
|
||||
Address: fakeKubeletHost,
|
||||
},
|
||||
},
|
||||
DaemonEndpoints: corev1.NodeDaemonEndpoints{
|
||||
KubeletEndpoint: corev1.DaemonEndpoint{
|
||||
Port: int32(fakeKubeletPort),
|
||||
},
|
||||
},
|
||||
}
|
||||
node, err = clientSet.CoreV1().Nodes().UpdateStatus(ctx, node, metav1.UpdateOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, err = clientSet.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "ns"},
|
||||
}, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
_, err = clientSet.CoreV1().ServiceAccounts("ns").Create(ctx, &corev1.ServiceAccount{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "default", Namespace: "ns"},
|
||||
}, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
falseRef := false
|
||||
pod, err := clientSet.CoreV1().Pods("ns").Create(ctx, &corev1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "test-pod", Namespace: "ns"},
|
||||
Spec: corev1.PodSpec{
|
||||
Containers: []corev1.Container{
|
||||
{
|
||||
Name: "foo",
|
||||
Image: "some/image:latest",
|
||||
},
|
||||
},
|
||||
NodeName: node.Name,
|
||||
AutomountServiceAccountToken: &falseRef,
|
||||
},
|
||||
}, metav1.CreateOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
return pod
|
||||
}
|
||||
|
||||
func TestPodLogsKubeletClientCertReload(t *testing.T) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
|
||||
t.Cleanup(cancel)
|
||||
|
||||
origCertCallbackRefreshDuration := transport.CertCallbackRefreshDuration
|
||||
origDialerStopCh := transport.DialerStopCh
|
||||
transport.CertCallbackRefreshDuration = time.Second // make client cert reloading fast
|
||||
transport.DialerStopCh = ctx.Done()
|
||||
t.Cleanup(func() {
|
||||
transport.CertCallbackRefreshDuration = origCertCallbackRefreshDuration
|
||||
transport.DialerStopCh = origDialerStopCh
|
||||
})
|
||||
|
||||
// create a CA to sign the API server's kubelet client cert
|
||||
startingCerts := generateClientCert(t)
|
||||
|
||||
dynamicCAContentFromFile, err := dynamiccertificates.NewDynamicCAContentFromFile("client-ca-bundle", startingCerts.caFile)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := dynamicCAContentFromFile.RunOnce(ctx); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
go dynamicCAContentFromFile.Run(ctx, 1)
|
||||
authenticatorConfig := authenticatorfactory.DelegatingAuthenticatorConfig{
|
||||
ClientCertificateCAContentProvider: dynamicCAContentFromFile,
|
||||
}
|
||||
authenticator, _, err := authenticatorConfig.New()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// this fake kubelet will perform per request authentication using the configured CA (which is dynamically reloaded)
|
||||
fakeKubeletServer := httptest.NewUnstartedServer(
|
||||
filters.WithAuthentication(
|
||||
http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||
_, _ = w.Write([]byte("pod-logs-here"))
|
||||
}),
|
||||
authenticator,
|
||||
http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||
w.WriteHeader(http.StatusUnauthorized)
|
||||
}),
|
||||
nil,
|
||||
),
|
||||
)
|
||||
fakeKubeletServer.TLS = &tls.Config{ClientAuth: tls.RequestClientCert}
|
||||
fakeKubeletServer.StartTLS()
|
||||
t.Cleanup(fakeKubeletServer.Close)
|
||||
|
||||
kubeletCA := writeDataToTempFile(t, pem.EncodeToMemory(&pem.Block{
|
||||
Type: "CERTIFICATE",
|
||||
Bytes: fakeKubeletServer.Certificate().Raw,
|
||||
}))
|
||||
|
||||
clientSet, _, tearDownFn := framework.StartTestServer(t, framework.TestServerSetup{
|
||||
ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
|
||||
opts.GenericServerRunOptions.MaxRequestBodyBytes = 1024 * 1024
|
||||
opts.KubeletConfig.TLSClientConfig.CAFile = kubeletCA
|
||||
opts.KubeletConfig.TLSClientConfig.CertFile = startingCerts.clientCertFile
|
||||
opts.KubeletConfig.TLSClientConfig.KeyFile = startingCerts.clientCertKeyFile
|
||||
},
|
||||
})
|
||||
t.Cleanup(tearDownFn)
|
||||
|
||||
pod := prepareFakeNodeAndPod(ctx, t, clientSet, fakeKubeletServer)
|
||||
|
||||
// verify that the starting state works as expected
|
||||
podLogs, err := clientSet.CoreV1().Pods("ns").GetLogs(pod.Name, &corev1.PodLogOptions{}).DoRaw(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if l := string(podLogs); l != "pod-logs-here" {
|
||||
t.Fatalf("unexpected pod logs: %s", l)
|
||||
}
|
||||
|
||||
// generate a new CA and overwrite the existing CA that the kubelet is using for request authentication
|
||||
newCerts := generateClientCert(t)
|
||||
if err := os.Rename(newCerts.caFile, startingCerts.caFile); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// wait until the kubelet observes the new CA
|
||||
if err := wait.PollImmediateUntilWithContext(ctx, time.Second, func(ctx context.Context) (bool, error) {
|
||||
_, errLog := clientSet.CoreV1().Pods("ns").GetLogs(pod.Name, &corev1.PodLogOptions{}).DoRaw(ctx)
|
||||
if errors.IsUnauthorized(errLog) {
|
||||
return true, nil
|
||||
}
|
||||
return false, errLog
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// now update the API server's kubelet client cert to use the new cert
|
||||
if err := os.Rename(newCerts.clientCertFile, startingCerts.clientCertFile); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := os.Rename(newCerts.clientCertKeyFile, startingCerts.clientCertKeyFile); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// confirm that the API server observes the new client cert and closes existing connections to use it
|
||||
if err := wait.PollImmediateUntilWithContext(ctx, time.Second, func(ctx context.Context) (bool, error) {
|
||||
fixedPodLogs, errLog := clientSet.CoreV1().Pods("ns").GetLogs(pod.Name, &corev1.PodLogOptions{}).DoRaw(ctx)
|
||||
if errors.IsUnauthorized(errLog) {
|
||||
t.Log("api server has not observed new client cert")
|
||||
return false, nil
|
||||
}
|
||||
if errLog != nil {
|
||||
return false, errLog
|
||||
}
|
||||
if l := string(fixedPodLogs); l != "pod-logs-here" {
|
||||
return false, fmt.Errorf("unexpected pod logs: %s", l)
|
||||
}
|
||||
return true, nil
|
||||
}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
type testCerts struct {
|
||||
caFile, clientCertFile, clientCertKeyFile string
|
||||
}
|
||||
|
||||
func generateClientCert(t *testing.T) testCerts {
|
||||
t.Helper()
|
||||
|
||||
caPrivateKey, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
caCert, err := certutil.NewSelfSignedCACert(certutil.Config{CommonName: "test-ca"}, caPrivateKey)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
clientCertKey, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
clientCertKeyBytes, err := keyutil.MarshalPrivateKeyToPEM(clientCertKey)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
serial, err := rand.Int(rand.Reader, new(big.Int).SetInt64(math.MaxInt64))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
certTmpl := x509.Certificate{
|
||||
Subject: pkix.Name{
|
||||
CommonName: "the-api-server-user",
|
||||
},
|
||||
NotBefore: caCert.NotBefore,
|
||||
SerialNumber: serial,
|
||||
NotAfter: time.Now().Add(time.Hour).UTC(),
|
||||
KeyUsage: x509.KeyUsageKeyEncipherment | x509.KeyUsageDigitalSignature,
|
||||
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth},
|
||||
}
|
||||
clientCertDERBytes, err := x509.CreateCertificate(rand.Reader, &certTmpl, caCert, clientCertKey.Public(), caPrivateKey)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
clientCert, err := x509.ParseCertificate(clientCertDERBytes)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
return testCerts{
|
||||
caFile: writeDataToTempFile(t, pem.EncodeToMemory(&pem.Block{
|
||||
Type: "CERTIFICATE",
|
||||
Bytes: caCert.Raw,
|
||||
})),
|
||||
clientCertFile: writeDataToTempFile(t, pem.EncodeToMemory(&pem.Block{
|
||||
Type: "CERTIFICATE",
|
||||
Bytes: clientCert.Raw,
|
||||
})),
|
||||
clientCertKeyFile: writeDataToTempFile(t, clientCertKeyBytes),
|
||||
}
|
||||
}
|
||||
|
||||
func writeDataToTempFile(t *testing.T, data []byte) string {
|
||||
t.Helper()
|
||||
|
||||
file, err := os.CreateTemp("", "pod-logs-test-")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if _, err := file.Write(data); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
t.Cleanup(func() {
|
||||
_ = os.Remove(file.Name())
|
||||
})
|
||||
return file.Name()
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user