diff --git a/cmd/kube-apiserver/app/testing/testserver.go b/cmd/kube-apiserver/app/testing/testserver.go index 5258d2a41f8..2babadc747c 100644 --- a/cmd/kube-apiserver/app/testing/testserver.go +++ b/cmd/kube-apiserver/app/testing/testserver.go @@ -34,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/registry/generic/registry" "k8s.io/apiserver/pkg/storage/storagebackend" + "k8s.io/apiserver/pkg/storageversion" "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" "k8s.io/client-go/util/cert" @@ -49,9 +50,10 @@ type TearDownFunc func() type TestServerInstanceOptions struct { // DisableStorageCleanup Disable the automatic storage cleanup DisableStorageCleanup bool - // Enable cert-auth for the kube-apiserver EnableCertAuth bool + // Wrap the storage version interface of the created server's generic server. + StorageVersionWrapFunc func(storageversion.Manager) storageversion.Manager } // TestServer return values supplied by kube-test-ApiServer @@ -182,6 +184,9 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo t.Logf("runtime-config=%v", completedOptions.APIEnablement.RuntimeConfig) t.Logf("Starting kube-apiserver on port %d...", s.SecureServing.BindPort) server, err := app.CreateServerChain(completedOptions, stopCh) + if instanceOptions.StorageVersionWrapFunc != nil { + server.GenericAPIServer.StorageVersionManager = instanceOptions.StorageVersionWrapFunc(server.GenericAPIServer.StorageVersionManager) + } if err != nil { return result, fmt.Errorf("failed to create server chain: %v", err) } @@ -196,51 +201,54 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo } }(stopCh) - t.Logf("Waiting for /healthz to be ok...") + // skip healthz check when we test the storage version manager poststart hook + if instanceOptions.StorageVersionWrapFunc == nil { + t.Logf("Waiting for /healthz to be ok...") - client, err := kubernetes.NewForConfig(server.GenericAPIServer.LoopbackClientConfig) - if err != nil { - return result, fmt.Errorf("failed to create a client: %v", err) - } - - // wait until healthz endpoint returns ok - err = wait.Poll(100*time.Millisecond, time.Minute, func() (bool, error) { - select { - case err := <-errCh: - return false, err - default: + client, err := kubernetes.NewForConfig(server.GenericAPIServer.LoopbackClientConfig) + if err != nil { + return result, fmt.Errorf("failed to create a client: %v", err) } - result := client.CoreV1().RESTClient().Get().AbsPath("/healthz").Do(context.TODO()) - status := 0 - result.StatusCode(&status) - if status == 200 { - return true, nil - } - return false, nil - }) - if err != nil { - return result, fmt.Errorf("failed to wait for /healthz to return ok: %v", err) - } + // wait until healthz endpoint returns ok + err = wait.Poll(100*time.Millisecond, time.Minute, func() (bool, error) { + select { + case err := <-errCh: + return false, err + default: + } - // wait until default namespace is created - err = wait.Poll(100*time.Millisecond, 30*time.Second, func() (bool, error) { - select { - case err := <-errCh: - return false, err - default: - } - - if _, err := client.CoreV1().Namespaces().Get(context.TODO(), "default", metav1.GetOptions{}); err != nil { - if !errors.IsNotFound(err) { - t.Logf("Unable to get default namespace: %v", err) + result := client.CoreV1().RESTClient().Get().AbsPath("/healthz").Do(context.TODO()) + status := 0 + result.StatusCode(&status) + if status == 200 { + return true, nil } return false, nil + }) + if err != nil { + return result, fmt.Errorf("failed to wait for /healthz to return ok: %v", err) + } + + // wait until default namespace is created + err = wait.Poll(100*time.Millisecond, 30*time.Second, func() (bool, error) { + select { + case err := <-errCh: + return false, err + default: + } + + if _, err := client.CoreV1().Namespaces().Get(context.TODO(), "default", metav1.GetOptions{}); err != nil { + if !errors.IsNotFound(err) { + t.Logf("Unable to get default namespace: %v", err) + } + return false, nil + } + return true, nil + }) + if err != nil { + return result, fmt.Errorf("failed to wait for default namespace to be created: %v", err) } - return true, nil - }) - if err != nil { - return result, fmt.Errorf("failed to wait for default namespace to be created: %v", err) } // from here the caller must call tearDown diff --git a/test/integration/storageversion/storage_version_filter_test.go b/test/integration/storageversion/storage_version_filter_test.go new file mode 100644 index 00000000000..1bc4c64ee29 --- /dev/null +++ b/test/integration/storageversion/storage_version_filter_test.go @@ -0,0 +1,255 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storageversion + +import ( + "context" + "testing" + "time" + + "k8s.io/api/apiserverinternal/v1alpha1" + authenticationv1 "k8s.io/api/authentication/v1" + v1 "k8s.io/api/core/v1" + apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/features" + "k8s.io/apiserver/pkg/storageversion" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/client-go/dynamic" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + featuregatetesting "k8s.io/component-base/featuregate/testing" + apiregistrationv1beta1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1" + aggregatorclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset" + kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" + "k8s.io/kubernetes/test/integration/etcd" + "k8s.io/kubernetes/test/integration/framework" +) + +type wrappedStorageVersionManager struct { + storageversion.Manager + startUpdateSV <-chan struct{} + updateFinished chan<- struct{} + finishUpdateSV <-chan struct{} + completed <-chan struct{} +} + +func (w *wrappedStorageVersionManager) UpdateStorageVersions(loopbackClientConfig *rest.Config, serverID string) { + <-w.startUpdateSV + w.Manager.UpdateStorageVersions(loopbackClientConfig, serverID) + close(w.updateFinished) + <-w.finishUpdateSV +} + +func (w *wrappedStorageVersionManager) Completed() bool { + select { + case <-w.completed: + return true + default: + return false + } +} + +func assertBlocking(name string, t *testing.T, err error, shouldBlock bool) { + if shouldBlock { + if err == nil || !errors.IsServiceUnavailable(err) { + t.Fatalf("%q should be rejected with service unavailable error, got %v", name, err) + } + } else { + if err != nil { + t.Fatalf("%q should be allowed, got %v", name, err) + } + } +} + +func testBuiltinResourceWrite(t *testing.T, cfg *rest.Config, shouldBlock bool) { + client := clientset.NewForConfigOrDie(cfg) + _, err := client.CoreV1().Namespaces().Create(context.TODO(), &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "test"}}, metav1.CreateOptions{}) + assertBlocking("writes to built in resources", t, err, shouldBlock) +} + +func testCRDWrite(t *testing.T, cfg *rest.Config, shouldBlock bool) { + crdClient := apiextensionsclientset.NewForConfigOrDie(cfg) + _, err := crdClient.ApiextensionsV1beta1().CustomResourceDefinitions().Create(context.TODO(), etcd.GetCustomResourceDefinitionData()[1], metav1.CreateOptions{}) + assertBlocking("writes to CRD", t, err, shouldBlock) +} + +func testAPIServiceWrite(t *testing.T, cfg *rest.Config, shouldBlock bool) { + aggregatorClient := aggregatorclient.NewForConfigOrDie(cfg) + _, err := aggregatorClient.ApiregistrationV1beta1().APIServices().Create(context.TODO(), &apiregistrationv1beta1.APIService{ + ObjectMeta: metav1.ObjectMeta{Name: "v1alpha1.wardle.example.com"}, + Spec: apiregistrationv1beta1.APIServiceSpec{ + Service: &apiregistrationv1beta1.ServiceReference{ + Namespace: "kube-wardle", + Name: "api", + }, + Group: "wardle.example.com", + Version: "v1alpha1", + GroupPriorityMinimum: 200, + VersionPriority: 200, + }, + }, metav1.CreateOptions{}) + assertBlocking("writes to APIServices", t, err, shouldBlock) +} + +func testCRWrite(t *testing.T, cfg *rest.Config, shouldBlock bool) { + dynamicClient := dynamic.NewForConfigOrDie(cfg) + crclient := dynamicClient.Resource(schema.GroupVersionResource{Group: "cr.bar.com", Version: "v1", Resource: "foos"}).Namespace("default") + _, err := crclient.Create(context.TODO(), &unstructured.Unstructured{Object: map[string]interface{}{"apiVersion": "cr.bar.com/v1", "kind": "Foo", "metadata": map[string]interface{}{"generateName": "test-"}}}, metav1.CreateOptions{}) + assertBlocking("writes to CR", t, err, shouldBlock) +} + +func testStorageVersionWrite(t *testing.T, cfg *rest.Config, shouldBlock bool) { + apiserverClient := clientset.NewForConfigOrDie(cfg) + _, err := apiserverClient.InternalV1alpha1().StorageVersions().Create(context.TODO(), &v1alpha1.StorageVersion{ObjectMeta: metav1.ObjectMeta{GenerateName: "test.resource"}}, metav1.CreateOptions{}) + assertBlocking("writes to Storage Version", t, err, shouldBlock) +} + +func testNonPersistedWrite(t *testing.T, cfg *rest.Config, shouldBlock bool) { + client := clientset.NewForConfigOrDie(cfg) + _, err := client.AuthenticationV1().TokenReviews().Create(context.TODO(), &authenticationv1.TokenReview{ + Spec: authenticationv1.TokenReviewSpec{ + Token: "some token", + }, + }, metav1.CreateOptions{}) + assertBlocking("non-persisted write", t, err, shouldBlock) +} + +func testBuiltinResourceRead(t *testing.T, cfg *rest.Config, shouldBlock bool) { + client := clientset.NewForConfigOrDie(cfg) + _, err := client.CoreV1().Namespaces().List(context.TODO(), metav1.ListOptions{}) + assertBlocking("reads of built-in resources", t, err, shouldBlock) +} + +// TestStorageVersionBootstrap ensures that before the StorageVersions are +// updated, only the following the request are accepted by the apiserver: +// 1. read requests +// 2. non-persisting write requests +// 3. write requests to the storageversion API +// 4. requests to CR or aggregated API +func TestStorageVersionBootstrap(t *testing.T) { + // Start server and create CRD + etcdConfig := framework.SharedEtcd() + server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, etcdConfig) + etcd.CreateTestCRDs(t, apiextensionsclientset.NewForConfigOrDie(server.ClientConfig), false, etcd.GetCustomResourceDefinitionData()[0]) + server.TearDownFn() + + startUpdateSV := make(chan struct{}) + finishUpdateSV := make(chan struct{}) + updateFinished := make(chan struct{}) + completed := make(chan struct{}) + wrapperFunc := func(delegate storageversion.Manager) storageversion.Manager { + return &wrappedStorageVersionManager{ + startUpdateSV: startUpdateSV, + finishUpdateSV: finishUpdateSV, + updateFinished: updateFinished, + completed: completed, + Manager: delegate, + } + } + // Restart api server, enable the storage version API and the feature gates. + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionAPI, true)() + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true)() + server = kubeapiservertesting.StartTestServerOrDie(t, + &kubeapiservertesting.TestServerInstanceOptions{ + StorageVersionWrapFunc: wrapperFunc, + }, + []string{ + // force enable all resources to ensure that the storage updates can handle cross group resources. + // TODO: drop these once we stop allowing them to be served. + "--runtime-config=api/all=true,extensions/v1beta1/deployments=true,extensions/v1beta1/daemonsets=true,extensions/v1beta1/replicasets=true,extensions/v1beta1/podsecuritypolicies=true,extensions/v1beta1/networkpolicies=true,internal.apiserver.k8s.io/v1alpha1=true", + }, + etcdConfig) + defer server.TearDownFn() + + cfg := rest.CopyConfig(server.ClientConfig) + + t.Run("before storage version update", func(t *testing.T) { + t.Run("write to k8s native API object should be blocked", func(t *testing.T) { + testBuiltinResourceWrite(t, cfg, true) + }) + t.Run("write to CRD should be blocked", func(t *testing.T) { + testCRDWrite(t, cfg, true) + }) + t.Run("write to APIService should be blocked", func(t *testing.T) { + testAPIServiceWrite(t, cfg, true) + }) + t.Run("write to CR should pass", func(t *testing.T) { + testCRWrite(t, cfg, false) + }) + t.Run("write to the storage version API should pass", func(t *testing.T) { + testStorageVersionWrite(t, cfg, false) + }) + t.Run("write to non-persisted API should pass", func(t *testing.T) { + testNonPersistedWrite(t, cfg, false) + }) + t.Run("read of k8s native API should pass", func(t *testing.T) { + testBuiltinResourceRead(t, cfg, false) + }) + // TODO: Write to aggregated API should pass. + }) + + // After the storage versions are updated, even though the + // StorageVersionManager.Complete() still returns false, the filter + // should not block any request. + close(startUpdateSV) + <-updateFinished + t.Run("after storage version update", func(t *testing.T) { + t.Run("write to k8s native API object should pass", func(t *testing.T) { + testBuiltinResourceWrite(t, cfg, false) + }) + t.Run("write to CRD should pass", func(t *testing.T) { + testCRDWrite(t, cfg, false) + }) + t.Run("write to APIService should pass", func(t *testing.T) { + testAPIServiceWrite(t, cfg, false) + }) + t.Run("write to the storage version API should pass", func(t *testing.T) { + testStorageVersionWrite(t, cfg, false) + }) + t.Run("write to non-persisted API should pass", func(t *testing.T) { + testNonPersistedWrite(t, cfg, false) + }) + t.Run("read of k8s native API should pass", func(t *testing.T) { + testBuiltinResourceRead(t, cfg, false) + }) + }) + + // After the StorageVersionManager.Complete() returns true, the server should become healthy. + close(completed) + close(finishUpdateSV) + t.Run("after storage version manager complete", func(t *testing.T) { + // wait until healthz endpoint returns ok + client := clientset.NewForConfigOrDie(cfg) + err := wait.Poll(100*time.Millisecond, 10*time.Second, func() (bool, error) { + result := client.CoreV1().RESTClient().Get().AbsPath("/healthz").Do(context.TODO()) + status := 0 + result.StatusCode(&status) + if status == 200 { + return true, nil + } + return false, nil + }) + if err != nil { + t.Errorf("failed to wait for /healthz to return ok: %v", err) + } + }) +} diff --git a/test/integration/storageversion/storage_version_main_test.go b/test/integration/storageversion/storage_version_main_test.go new file mode 100644 index 00000000000..4aca4618db7 --- /dev/null +++ b/test/integration/storageversion/storage_version_main_test.go @@ -0,0 +1,27 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storageversion + +import ( + "testing" + + "k8s.io/kubernetes/test/integration/framework" +) + +func TestMain(m *testing.M) { + framework.EtcdMain(m.Run) +}