From c1aef65640b8508c0ac93709007890f156507ee4 Mon Sep 17 00:00:00 2001 From: Richa Banker Date: Tue, 18 Jul 2023 17:30:09 -0700 Subject: [PATCH] Add integration test --- cmd/kube-apiserver/app/testing/testserver.go | 57 +++- .../apiserver/peerproxy/main_test.go | 27 ++ .../apiserver/peerproxy/peer_proxy_test.go | 244 ++++++++++++++++++ 3 files changed, 320 insertions(+), 8 deletions(-) create mode 100644 test/integration/apiserver/peerproxy/main_test.go create mode 100644 test/integration/apiserver/peerproxy/peer_proxy_test.go diff --git a/cmd/kube-apiserver/app/testing/testserver.go b/cmd/kube-apiserver/app/testing/testserver.go index f6778d15f95..4c032718c95 100644 --- a/cmd/kube-apiserver/app/testing/testserver.go +++ b/cmd/kube-apiserver/app/testing/testserver.go @@ -18,6 +18,7 @@ package testing import ( "context" + "crypto/rsa" "crypto/x509" "fmt" "net" @@ -38,12 +39,15 @@ import ( serveroptions "k8s.io/apiserver/pkg/server/options" "k8s.io/apiserver/pkg/storage/storagebackend" "k8s.io/apiserver/pkg/storageversion" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" + clientgotransport "k8s.io/client-go/transport" "k8s.io/client-go/util/cert" logsapi "k8s.io/component-base/logs/api/v1" "k8s.io/klog/v2" "k8s.io/kube-aggregator/pkg/apiserver" + "k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/cmd/kube-apiserver/app" "k8s.io/kubernetes/cmd/kube-apiserver/app/options" @@ -77,6 +81,14 @@ type TestServerInstanceOptions struct { EnableCertAuth bool // Wrap the storage version interface of the created server's generic server. StorageVersionWrapFunc func(storageversion.Manager) storageversion.Manager + // CA file used for requestheader authn during communication between: + // 1. kube-apiserver and peer when the local apiserver is not able to serve the request due + // to version skew + // 2. kube-apiserver and aggregated apiserver + + // We specify this as on option to pass a common proxyCA to multiple apiservers to simulate + // an apiserver version skew scenario where all apiservers use the same proxyCA to verify client connections. + ProxyCA *ProxyCA } // TestServer return values supplied by kube-test-ApiServer @@ -95,6 +107,16 @@ type Logger interface { Errorf(format string, args ...interface{}) Fatalf(format string, args ...interface{}) Logf(format string, args ...interface{}) + Cleanup(func()) +} + +// ProxyCA contains the certificate authority certificate and key which is used to verify client connections +// to kube-apiservers. The clients can be : +// 1. aggregated apiservers +// 2. peer kube-apiservers +type ProxyCA struct { + ProxySigningCert *x509.Certificate + ProxySigningKey *rsa.PrivateKey } // NewDefaultTestServerOptions Default options for TestServer instances @@ -161,14 +183,24 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo reqHeaders := serveroptions.NewDelegatingAuthenticationOptions() s.Authentication.RequestHeader = &reqHeaders.RequestHeader - // create certificates for aggregation and client-cert auth - proxySigningKey, err := testutil.NewPrivateKey() - if err != nil { - return result, err - } - proxySigningCert, err := cert.NewSelfSignedCACert(cert.Config{CommonName: "front-proxy-ca"}, proxySigningKey) - if err != nil { - return result, err + var proxySigningKey *rsa.PrivateKey + var proxySigningCert *x509.Certificate + + if instanceOptions.ProxyCA != nil { + // use provided proxyCA + proxySigningKey = instanceOptions.ProxyCA.ProxySigningKey + proxySigningCert = instanceOptions.ProxyCA.ProxySigningCert + + } else { + // create certificates for aggregation and client-cert auth + proxySigningKey, err = testutil.NewPrivateKey() + if err != nil { + return result, err + } + proxySigningCert, err = cert.NewSelfSignedCACert(cert.Config{CommonName: "front-proxy-ca"}, proxySigningKey) + if err != nil { + return result, err + } } proxyCACertFile := filepath.Join(s.SecureServing.ServerCert.CertDirectory, "proxy-ca.crt") if err := os.WriteFile(proxyCACertFile, testutil.EncodeCertPEM(proxySigningCert), 0644); err != nil { @@ -213,6 +245,15 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo return result, err } s.Authentication.ClientCert.ClientCA = clientCACertFile + if utilfeature.DefaultFeatureGate.Enabled(features.UnknownVersionInteroperabilityProxy) { + // TODO: set up a general clean up for testserver + if clientgotransport.DialerStopCh == wait.NeverStop { + ctx, cancel := context.WithTimeout(context.Background(), time.Hour) + t.Cleanup(cancel) + clientgotransport.DialerStopCh = ctx.Done() + } + s.PeerCAFile = filepath.Join(s.SecureServing.ServerCert.CertDirectory, s.SecureServing.ServerCert.PairName+".crt") + } } s.SecureServing.ExternalAddress = s.SecureServing.Listener.Addr().(*net.TCPAddr).IP // use listener addr although it is a loopback device diff --git a/test/integration/apiserver/peerproxy/main_test.go b/test/integration/apiserver/peerproxy/main_test.go new file mode 100644 index 00000000000..72bf12eeccc --- /dev/null +++ b/test/integration/apiserver/peerproxy/main_test.go @@ -0,0 +1,27 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package peerproxy + +import ( + "testing" + + "k8s.io/kubernetes/test/integration/framework" +) + +func TestMain(m *testing.M) { + framework.EtcdMain(m.Run) +} diff --git a/test/integration/apiserver/peerproxy/peer_proxy_test.go b/test/integration/apiserver/peerproxy/peer_proxy_test.go new file mode 100644 index 00000000000..9dd66340bac --- /dev/null +++ b/test/integration/apiserver/peerproxy/peer_proxy_test.go @@ -0,0 +1,244 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package peerproxy + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + v1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/features" + "k8s.io/apiserver/pkg/server" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/transport" + "k8s.io/client-go/util/cert" + featuregatetesting "k8s.io/component-base/featuregate/testing" + "k8s.io/klog/v2" + kastesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" + "k8s.io/kubernetes/pkg/controller/storageversiongc" + "k8s.io/kubernetes/pkg/controlplane" + kubefeatures "k8s.io/kubernetes/pkg/features" + + "k8s.io/kubernetes/test/integration/framework" + testutil "k8s.io/kubernetes/test/utils" + "k8s.io/kubernetes/test/utils/ktesting" +) + +func TestPeerProxiedRequest(t *testing.T) { + + ktesting.SetDefaultVerbosity(1) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + t.Cleanup(cancel) + + // ensure to stop cert reloading after shutdown + transport.DialerStopCh = ctx.Done() + + // enable feature flags + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true)() + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionAPI, true)() + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, kubefeatures.UnknownVersionInteroperabilityProxy, true)() + + // create sharedetcd + etcd := framework.SharedEtcd() + + // create certificates for aggregation and client-cert auth + proxyCA, err := createProxyCertContent() + require.NoError(t, err) + + // start test server with all APIs enabled + // override hostname to ensure unique ips + server.SetHostnameFuncForTests("test-server-a") + serverA := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{ + EnableCertAuth: true, + ProxyCA: &proxyCA}, + []string{}, etcd) + defer serverA.TearDownFn() + + // start another test server with some api disabled + // override hostname to ensure unique ips + server.SetHostnameFuncForTests("test-server-b") + serverB := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{ + EnableCertAuth: true, + ProxyCA: &proxyCA}, + []string{fmt.Sprintf("--runtime-config=%s", "batch/v1=false")}, etcd) + defer serverB.TearDownFn() + + kubeClientSetA, err := kubernetes.NewForConfig(serverA.ClientConfig) + require.NoError(t, err) + + kubeClientSetB, err := kubernetes.NewForConfig(serverB.ClientConfig) + require.NoError(t, err) + + // create jobs resource using serverA + job := createJobResource() + _, err = kubeClientSetA.BatchV1().Jobs("default").Create(context.Background(), job, metav1.CreateOptions{}) + require.NoError(t, err) + + klog.Infof("\nServerA has created jobs\n") + + // List jobs using ServerB + // This request should be proxied to ServerA since ServerB does not have batch API enabled + jobsB, err := kubeClientSetB.BatchV1().Jobs("default").List(context.Background(), metav1.ListOptions{}) + klog.Infof("\nServerB has retrieved jobs list of length %v \n\n", len(jobsB.Items)) + require.NoError(t, err) + assert.NotEmpty(t, jobsB) + assert.Equal(t, job.Name, jobsB.Items[0].Name) +} + +func TestPeerProxiedRequestToThirdServerAfterFirstDies(t *testing.T) { + + ktesting.SetDefaultVerbosity(1) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + t.Cleanup(cancel) + + // ensure to stop cert reloading after shutdown + transport.DialerStopCh = ctx.Done() + + // enable feature flags + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true)() + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionAPI, true)() + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, kubefeatures.UnknownVersionInteroperabilityProxy, true)() + + // create sharedetcd + etcd := framework.SharedEtcd() + + // create certificates for aggregation and client-cert auth + proxyCA, err := createProxyCertContent() + require.NoError(t, err) + + // set lease duration to 1s for serverA to ensure that storageversions for serverA are updated + // once it is shutdown + controlplane.IdentityLeaseDurationSeconds = 10 + controlplane.IdentityLeaseGCPeriod = time.Second + controlplane.IdentityLeaseRenewIntervalPeriod = 10 * time.Second + + // start serverA with all APIs enabled + // override hostname to ensure unique ips + server.SetHostnameFuncForTests("test-server-a") + serverA := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{EnableCertAuth: true, ProxyCA: &proxyCA}, []string{}, etcd) + kubeClientSetA, err := kubernetes.NewForConfig(serverA.ClientConfig) + require.NoError(t, err) + // ensure storageversion garbage collector ctlr is set up + informersA := informers.NewSharedInformerFactory(kubeClientSetA, time.Second) + setupStorageVersionGC(ctx, kubeClientSetA, informersA) + // reset lease duration to default value for serverB and serverC since we will not be + // shutting these down + controlplane.IdentityLeaseDurationSeconds = 3600 + + // start serverB with some api disabled + // override hostname to ensure unique ips + server.SetHostnameFuncForTests("test-server-b") + serverB := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{EnableCertAuth: true, ProxyCA: &proxyCA}, []string{ + fmt.Sprintf("--runtime-config=%v", "batch/v1=false")}, etcd) + defer serverB.TearDownFn() + kubeClientSetB, err := kubernetes.NewForConfig(serverB.ClientConfig) + require.NoError(t, err) + // ensure storageversion garbage collector ctlr is set up + informersB := informers.NewSharedInformerFactory(kubeClientSetB, time.Second) + setupStorageVersionGC(ctx, kubeClientSetB, informersB) + + // start serverC with all APIs enabled + // override hostname to ensure unique ips + server.SetHostnameFuncForTests("test-server-c") + serverC := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{EnableCertAuth: true, ProxyCA: &proxyCA}, []string{}, etcd) + defer serverC.TearDownFn() + + // create jobs resource using serverA + job := createJobResource() + _, err = kubeClientSetA.BatchV1().Jobs("default").Create(context.Background(), job, metav1.CreateOptions{}) + require.NoError(t, err) + klog.Infof("\nServerA has created jobs\n") + + // shutdown serverA + serverA.TearDownFn() + + var jobsB *v1.JobList + // list jobs using ServerB which it should proxy to ServerC and get back valid response + err = wait.PollImmediate(1*time.Second, 1*time.Minute, func() (bool, error) { + jobsB, err = kubeClientSetB.BatchV1().Jobs("default").List(context.Background(), metav1.ListOptions{}) + if err != nil { + return false, nil + } + if jobsB != nil { + return true, nil + } + return false, nil + }) + klog.Infof("\nServerB has retrieved jobs list of length %v \n\n", len(jobsB.Items)) + require.NoError(t, err) + assert.NotEmpty(t, jobsB) + assert.Equal(t, job.Name, jobsB.Items[0].Name) +} + +func setupStorageVersionGC(ctx context.Context, kubeClientSet *kubernetes.Clientset, informers informers.SharedInformerFactory) { + leaseInformer := informers.Coordination().V1().Leases() + storageVersionInformer := informers.Internal().V1alpha1().StorageVersions() + go leaseInformer.Informer().Run(ctx.Done()) + go storageVersionInformer.Informer().Run(ctx.Done()) + + controller := storageversiongc.NewStorageVersionGC(ctx, kubeClientSet, leaseInformer, storageVersionInformer) + go controller.Run(ctx) +} + +func createProxyCertContent() (kastesting.ProxyCA, error) { + result := kastesting.ProxyCA{} + proxySigningKey, err := testutil.NewPrivateKey() + if err != nil { + return result, err + } + proxySigningCert, err := cert.NewSelfSignedCACert(cert.Config{CommonName: "front-proxy-ca"}, proxySigningKey) + if err != nil { + return result, err + } + + result = kastesting.ProxyCA{ + ProxySigningCert: proxySigningCert, + ProxySigningKey: proxySigningKey, + } + return result, nil +} + +func createJobResource() *v1.Job { + return &v1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-job", + Namespace: "default", + }, + Spec: v1.JobSpec{ + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test", + Image: "test", + }, + }, + RestartPolicy: corev1.RestartPolicyNever, + }, + }, + }, + } +}