Test APIService safe handling at startup

This commit is contained in:
Jordan Liggitt 2023-05-18 11:09:03 -04:00
parent 3be3997193
commit e4102d5e30
No known key found for this signature in database
2 changed files with 205 additions and 27 deletions

View File

@ -62,6 +62,9 @@ type TearDownFunc func()
// TestServerInstanceOptions Instance options the TestServer // TestServerInstanceOptions Instance options the TestServer
type TestServerInstanceOptions struct { type TestServerInstanceOptions struct {
// SkipHealthzCheck returns without waiting for the server to become healthy.
// Useful for testing server configurations expected to prevent /healthz from completing.
SkipHealthzCheck bool
// Enable cert-auth for the kube-apiserver // Enable cert-auth for the kube-apiserver
EnableCertAuth bool EnableCertAuth bool
// Wrap the storage version interface of the created server's generic server. // Wrap the storage version interface of the created server's generic server.
@ -262,40 +265,42 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
} }
}(stopCh) }(stopCh)
t.Logf("Waiting for /healthz to be ok...")
client, err := kubernetes.NewForConfig(server.GenericAPIServer.LoopbackClientConfig) client, err := kubernetes.NewForConfig(server.GenericAPIServer.LoopbackClientConfig)
if err != nil { if err != nil {
return result, fmt.Errorf("failed to create a client: %v", err) return result, fmt.Errorf("failed to create a client: %v", err)
} }
// wait until healthz endpoint returns ok if !instanceOptions.SkipHealthzCheck {
err = wait.Poll(100*time.Millisecond, time.Minute, func() (bool, error) { t.Logf("Waiting for /healthz to be ok...")
select {
case err := <-errCh:
return false, err
default:
}
req := client.CoreV1().RESTClient().Get().AbsPath("/healthz") // wait until healthz endpoint returns ok
// The storage version bootstrap test wraps the storage version post-start err = wait.Poll(100*time.Millisecond, time.Minute, func() (bool, error) {
// hook, so the hook won't become health when the server bootstraps select {
if instanceOptions.StorageVersionWrapFunc != nil { case err := <-errCh:
// We hardcode the param instead of having a new instanceOptions field return false, err
// to avoid confusing users with more options. default:
storageVersionCheck := fmt.Sprintf("poststarthook/%s", apiserver.StorageVersionPostStartHookName) }
req.Param("exclude", storageVersionCheck)
req := client.CoreV1().RESTClient().Get().AbsPath("/healthz")
// The storage version bootstrap test wraps the storage version post-start
// hook, so the hook won't become health when the server bootstraps
if instanceOptions.StorageVersionWrapFunc != nil {
// We hardcode the param instead of having a new instanceOptions field
// to avoid confusing users with more options.
storageVersionCheck := fmt.Sprintf("poststarthook/%s", apiserver.StorageVersionPostStartHookName)
req.Param("exclude", storageVersionCheck)
}
result := req.Do(context.TODO())
status := 0
result.StatusCode(&status)
if status == 200 {
return true, nil
}
return false, nil
})
if err != nil {
return result, fmt.Errorf("failed to wait for /healthz to return ok: %v", err)
} }
result := req.Do(context.TODO())
status := 0
result.StatusCode(&status)
if status == 200 {
return true, nil
}
return false, nil
})
if err != nil {
return result, fmt.Errorf("failed to wait for /healthz to return ok: %v", err)
} }
// wait until default namespace is created // wait until default namespace is created

View File

@ -35,6 +35,7 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/server/dynamiccertificates" "k8s.io/apiserver/pkg/server/dynamiccertificates"
@ -48,6 +49,7 @@ import (
aggregatorclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset" aggregatorclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/cmd/kube-apiserver/app" "k8s.io/kubernetes/cmd/kube-apiserver/app"
kastesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" kastesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
"k8s.io/kubernetes/test/integration"
"k8s.io/kubernetes/test/integration/framework" "k8s.io/kubernetes/test/integration/framework"
wardlev1alpha1 "k8s.io/sample-apiserver/pkg/apis/wardle/v1alpha1" wardlev1alpha1 "k8s.io/sample-apiserver/pkg/apis/wardle/v1alpha1"
wardlev1beta1 "k8s.io/sample-apiserver/pkg/apis/wardle/v1beta1" wardlev1beta1 "k8s.io/sample-apiserver/pkg/apis/wardle/v1beta1"
@ -56,6 +58,177 @@ import (
netutils "k8s.io/utils/net" netutils "k8s.io/utils/net"
) )
func TestAPIServiceWaitOnStart(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
t.Cleanup(cancel)
stopCh := make(chan struct{})
defer close(stopCh)
etcdConfig := framework.SharedEtcd()
etcd3Client, _, err := integration.GetEtcdClients(etcdConfig.Transport)
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() { etcd3Client.Close() })
// Pollute CRD path in etcd so CRD lists cannot succeed and the informer cannot sync
bogusCRDEtcdPath := path.Join("/", etcdConfig.Prefix, "apiextensions.k8s.io/customresourcedefinitions/bogus")
if _, err := etcd3Client.KV.Put(ctx, bogusCRDEtcdPath, `bogus data`); err != nil {
t.Fatal(err)
}
// Populate a valid CRD and managed APIService in etcd
if _, err := etcd3Client.KV.Put(
ctx,
path.Join("/", etcdConfig.Prefix, "apiextensions.k8s.io/customresourcedefinitions/widgets.valid.example.com"),
`{
"apiVersion":"apiextensions.k8s.io/v1beta1",
"kind":"CustomResourceDefinition",
"metadata":{
"name":"widgets.valid.example.com",
"uid":"mycrd",
"creationTimestamp": "2022-06-08T23:46:32Z"
},
"spec":{
"scope": "Namespaced",
"group":"valid.example.com",
"version":"v1",
"names":{
"kind": "Widget",
"listKind": "WidgetList",
"plural": "widgets",
"singular": "widget"
}
},
"status": {
"acceptedNames": {
"kind": "Widget",
"listKind": "WidgetList",
"plural": "widgets",
"singular": "widget"
},
"conditions": [
{
"lastTransitionTime": "2023-05-18T15:03:57Z",
"message": "no conflicts found",
"reason": "NoConflicts",
"status": "True",
"type": "NamesAccepted"
},
{
"lastTransitionTime": "2023-05-18T15:03:57Z",
"message": "the initial names have been accepted",
"reason": "InitialNamesAccepted",
"status": "True",
"type": "Established"
}
],
"storedVersions": [
"v1"
]
}
}`); err != nil {
t.Fatal(err)
}
if _, err := etcd3Client.KV.Put(
ctx,
path.Join("/", etcdConfig.Prefix, "apiregistration.k8s.io/apiservices/v1.valid.example.com"),
`{
"apiVersion":"apiregistration.k8s.io/v1",
"kind":"APIService",
"metadata": {
"name": "v1.valid.example.com",
"uid":"foo",
"creationTimestamp": "2022-06-08T23:46:32Z",
"labels":{"kube-aggregator.kubernetes.io/automanaged":"true"}
},
"spec": {
"group": "valid.example.com",
"version": "v1",
"groupPriorityMinimum":100,
"versionPriority":10
}
}`,
); err != nil {
t.Fatal(err)
}
// Populate a stale managed APIService in etcd
if _, err := etcd3Client.KV.Put(
ctx,
path.Join("/", etcdConfig.Prefix, "apiregistration.k8s.io/apiservices/v1.stale.example.com"),
`{
"apiVersion":"apiregistration.k8s.io/v1",
"kind":"APIService",
"metadata": {
"name": "v1.stale.example.com",
"uid":"foo",
"creationTimestamp": "2022-06-08T23:46:32Z",
"labels":{"kube-aggregator.kubernetes.io/automanaged":"true"}
},
"spec": {
"group": "stale.example.com",
"version": "v1",
"groupPriorityMinimum":100,
"versionPriority":10
}
}`,
); err != nil {
t.Fatal(err)
}
// Start server
options := kastesting.NewDefaultTestServerOptions()
options.SkipHealthzCheck = true
testServer := kastesting.StartTestServerOrDie(t, options, nil, etcdConfig)
defer testServer.TearDownFn()
kubeClientConfig := rest.CopyConfig(testServer.ClientConfig)
aggregatorClient := aggregatorclient.NewForConfigOrDie(kubeClientConfig)
// ensure both APIService objects remain
for i := 0; i < 10; i++ {
if _, err := aggregatorClient.ApiregistrationV1().APIServices().Get(ctx, "v1.valid.example.com", metav1.GetOptions{}); err != nil {
t.Fatal(err)
}
if _, err := aggregatorClient.ApiregistrationV1().APIServices().Get(ctx, "v1.stale.example.com", metav1.GetOptions{}); err != nil {
t.Fatal(err)
}
time.Sleep(time.Second)
}
// Clear the bogus CRD data so the informer can sync
if _, err := etcd3Client.KV.Delete(ctx, bogusCRDEtcdPath); err != nil {
t.Fatal(err)
}
t.Log("cleaned up bogus CRD data")
// ensure the stale APIService object is cleaned up
if err := wait.Poll(time.Second, wait.ForeverTestTimeout, func() (bool, error) {
_, err := aggregatorClient.ApiregistrationV1().APIServices().Get(ctx, "v1.stale.example.com", metav1.GetOptions{})
if err == nil {
t.Log("stale APIService still exists, waiting...")
return false, nil
}
if !apierrors.IsNotFound(err) {
return false, err
}
return true, nil
}); err != nil {
t.Fatal(err)
}
// ensure the valid APIService object remains
for i := 0; i < 5; i++ {
time.Sleep(time.Second)
if _, err := aggregatorClient.ApiregistrationV1().APIServices().Get(ctx, "v1.valid.example.com", metav1.GetOptions{}); err != nil {
t.Fatal(err)
}
}
}
func TestAggregatedAPIServer(t *testing.T) { func TestAggregatedAPIServer(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
t.Cleanup(cancel) t.Cleanup(cancel)