mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-17 23:57:49 +00:00
Merge pull request #118104 from liggitt/crd-sync
Fix waiting for CRD sync at server start
This commit is contained in:
commit
7ad8303b96
@ -27,7 +27,6 @@ import (
|
|||||||
|
|
||||||
"k8s.io/klog/v2"
|
"k8s.io/klog/v2"
|
||||||
|
|
||||||
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
|
|
||||||
apiextensionsinformers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions"
|
apiextensionsinformers "k8s.io/apiextensions-apiserver/pkg/client/informers/externalversions"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
@ -117,7 +116,7 @@ func createAggregatorConfig(
|
|||||||
return aggregatorConfig, nil
|
return aggregatorConfig, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, apiExtensionInformers apiextensionsinformers.SharedInformerFactory) (*aggregatorapiserver.APIAggregator, error) {
|
func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delegateAPIServer genericapiserver.DelegationTarget, apiExtensionInformers apiextensionsinformers.SharedInformerFactory, crdAPIEnabled bool) (*aggregatorapiserver.APIAggregator, error) {
|
||||||
aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer)
|
aggregatorServer, err := aggregatorConfig.Complete().NewWithDelegate(delegateAPIServer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -147,8 +146,12 @@ func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delega
|
|||||||
// let the CRD controller process the initial set of CRDs before starting the autoregistration controller.
|
// let the CRD controller process the initial set of CRDs before starting the autoregistration controller.
|
||||||
// this prevents the autoregistration controller's initial sync from deleting APIServices for CRDs that still exist.
|
// this prevents the autoregistration controller's initial sync from deleting APIServices for CRDs that still exist.
|
||||||
// we only need to do this if CRDs are enabled on this server. We can't use discovery because we are the source for discovery.
|
// we only need to do this if CRDs are enabled on this server. We can't use discovery because we are the source for discovery.
|
||||||
if aggregatorConfig.GenericConfig.MergedResourceConfig.ResourceEnabled(apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions")) {
|
if crdAPIEnabled {
|
||||||
|
klog.Infof("waiting for initial CRD sync...")
|
||||||
crdRegistrationController.WaitForInitialSync()
|
crdRegistrationController.WaitForInitialSync()
|
||||||
|
klog.Infof("initial CRD sync complete...")
|
||||||
|
} else {
|
||||||
|
klog.Infof("CRD API not enabled, starting APIService registration without waiting for initial CRD sync")
|
||||||
}
|
}
|
||||||
autoRegistrationController.Run(5, context.StopCh)
|
autoRegistrationController.Run(5, context.StopCh)
|
||||||
}()
|
}()
|
||||||
|
@ -34,6 +34,7 @@ import (
|
|||||||
|
|
||||||
oteltrace "go.opentelemetry.io/otel/trace"
|
oteltrace "go.opentelemetry.io/otel/trace"
|
||||||
|
|
||||||
|
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
|
||||||
extensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver"
|
extensionsapiserver "k8s.io/apiextensions-apiserver/pkg/apiserver"
|
||||||
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||||
utilnet "k8s.io/apimachinery/pkg/util/net"
|
utilnet "k8s.io/apimachinery/pkg/util/net"
|
||||||
@ -193,6 +194,7 @@ func CreateServerChain(completedOptions completedServerRunOptions) (*aggregatora
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
crdAPIEnabled := apiExtensionsConfig.GenericConfig.MergedResourceConfig.ResourceEnabled(apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions"))
|
||||||
|
|
||||||
notFoundHandler := notfoundhandler.New(kubeAPIServerConfig.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey)
|
notFoundHandler := notfoundhandler.New(kubeAPIServerConfig.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey)
|
||||||
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
|
apiExtensionsServer, err := createAPIExtensionsServer(apiExtensionsConfig, genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
|
||||||
@ -210,7 +212,7 @@ func CreateServerChain(completedOptions completedServerRunOptions) (*aggregatora
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers)
|
aggregatorServer, err := createAggregatorServer(aggregatorConfig, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers, crdAPIEnabled)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
|
// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -62,6 +62,9 @@ type TearDownFunc func()
|
|||||||
|
|
||||||
// TestServerInstanceOptions Instance options the TestServer
|
// TestServerInstanceOptions Instance options the TestServer
|
||||||
type TestServerInstanceOptions struct {
|
type TestServerInstanceOptions struct {
|
||||||
|
// SkipHealthzCheck returns without waiting for the server to become healthy.
|
||||||
|
// Useful for testing server configurations expected to prevent /healthz from completing.
|
||||||
|
SkipHealthzCheck bool
|
||||||
// Enable cert-auth for the kube-apiserver
|
// Enable cert-auth for the kube-apiserver
|
||||||
EnableCertAuth bool
|
EnableCertAuth bool
|
||||||
// Wrap the storage version interface of the created server's generic server.
|
// Wrap the storage version interface of the created server's generic server.
|
||||||
@ -262,40 +265,42 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo
|
|||||||
}
|
}
|
||||||
}(stopCh)
|
}(stopCh)
|
||||||
|
|
||||||
t.Logf("Waiting for /healthz to be ok...")
|
|
||||||
|
|
||||||
client, err := kubernetes.NewForConfig(server.GenericAPIServer.LoopbackClientConfig)
|
client, err := kubernetes.NewForConfig(server.GenericAPIServer.LoopbackClientConfig)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return result, fmt.Errorf("failed to create a client: %v", err)
|
return result, fmt.Errorf("failed to create a client: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// wait until healthz endpoint returns ok
|
if !instanceOptions.SkipHealthzCheck {
|
||||||
err = wait.Poll(100*time.Millisecond, time.Minute, func() (bool, error) {
|
t.Logf("Waiting for /healthz to be ok...")
|
||||||
select {
|
|
||||||
case err := <-errCh:
|
|
||||||
return false, err
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
req := client.CoreV1().RESTClient().Get().AbsPath("/healthz")
|
// wait until healthz endpoint returns ok
|
||||||
// The storage version bootstrap test wraps the storage version post-start
|
err = wait.Poll(100*time.Millisecond, time.Minute, func() (bool, error) {
|
||||||
// hook, so the hook won't become health when the server bootstraps
|
select {
|
||||||
if instanceOptions.StorageVersionWrapFunc != nil {
|
case err := <-errCh:
|
||||||
// We hardcode the param instead of having a new instanceOptions field
|
return false, err
|
||||||
// to avoid confusing users with more options.
|
default:
|
||||||
storageVersionCheck := fmt.Sprintf("poststarthook/%s", apiserver.StorageVersionPostStartHookName)
|
}
|
||||||
req.Param("exclude", storageVersionCheck)
|
|
||||||
|
req := client.CoreV1().RESTClient().Get().AbsPath("/healthz")
|
||||||
|
// The storage version bootstrap test wraps the storage version post-start
|
||||||
|
// hook, so the hook won't become health when the server bootstraps
|
||||||
|
if instanceOptions.StorageVersionWrapFunc != nil {
|
||||||
|
// We hardcode the param instead of having a new instanceOptions field
|
||||||
|
// to avoid confusing users with more options.
|
||||||
|
storageVersionCheck := fmt.Sprintf("poststarthook/%s", apiserver.StorageVersionPostStartHookName)
|
||||||
|
req.Param("exclude", storageVersionCheck)
|
||||||
|
}
|
||||||
|
result := req.Do(context.TODO())
|
||||||
|
status := 0
|
||||||
|
result.StatusCode(&status)
|
||||||
|
if status == 200 {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return result, fmt.Errorf("failed to wait for /healthz to return ok: %v", err)
|
||||||
}
|
}
|
||||||
result := req.Do(context.TODO())
|
|
||||||
status := 0
|
|
||||||
result.StatusCode(&status)
|
|
||||||
if status == 200 {
|
|
||||||
return true, nil
|
|
||||||
}
|
|
||||||
return false, nil
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return result, fmt.Errorf("failed to wait for /healthz to return ok: %v", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// wait until default namespace is created
|
// wait until default namespace is created
|
||||||
|
@ -35,6 +35,7 @@ import (
|
|||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
|
||||||
corev1 "k8s.io/api/core/v1"
|
corev1 "k8s.io/api/core/v1"
|
||||||
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/apiserver/pkg/server/dynamiccertificates"
|
"k8s.io/apiserver/pkg/server/dynamiccertificates"
|
||||||
@ -48,6 +49,7 @@ import (
|
|||||||
aggregatorclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
|
aggregatorclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset"
|
||||||
"k8s.io/kubernetes/cmd/kube-apiserver/app"
|
"k8s.io/kubernetes/cmd/kube-apiserver/app"
|
||||||
kastesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
|
kastesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
|
||||||
|
"k8s.io/kubernetes/test/integration"
|
||||||
"k8s.io/kubernetes/test/integration/framework"
|
"k8s.io/kubernetes/test/integration/framework"
|
||||||
wardlev1alpha1 "k8s.io/sample-apiserver/pkg/apis/wardle/v1alpha1"
|
wardlev1alpha1 "k8s.io/sample-apiserver/pkg/apis/wardle/v1alpha1"
|
||||||
wardlev1beta1 "k8s.io/sample-apiserver/pkg/apis/wardle/v1beta1"
|
wardlev1beta1 "k8s.io/sample-apiserver/pkg/apis/wardle/v1beta1"
|
||||||
@ -56,6 +58,177 @@ import (
|
|||||||
netutils "k8s.io/utils/net"
|
netutils "k8s.io/utils/net"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func TestAPIServiceWaitOnStart(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
|
||||||
|
t.Cleanup(cancel)
|
||||||
|
|
||||||
|
stopCh := make(chan struct{})
|
||||||
|
defer close(stopCh)
|
||||||
|
|
||||||
|
etcdConfig := framework.SharedEtcd()
|
||||||
|
|
||||||
|
etcd3Client, _, err := integration.GetEtcdClients(etcdConfig.Transport)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
t.Cleanup(func() { etcd3Client.Close() })
|
||||||
|
|
||||||
|
// Pollute CRD path in etcd so CRD lists cannot succeed and the informer cannot sync
|
||||||
|
bogusCRDEtcdPath := path.Join("/", etcdConfig.Prefix, "apiextensions.k8s.io/customresourcedefinitions/bogus")
|
||||||
|
if _, err := etcd3Client.KV.Put(ctx, bogusCRDEtcdPath, `bogus data`); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Populate a valid CRD and managed APIService in etcd
|
||||||
|
if _, err := etcd3Client.KV.Put(
|
||||||
|
ctx,
|
||||||
|
path.Join("/", etcdConfig.Prefix, "apiextensions.k8s.io/customresourcedefinitions/widgets.valid.example.com"),
|
||||||
|
`{
|
||||||
|
"apiVersion":"apiextensions.k8s.io/v1beta1",
|
||||||
|
"kind":"CustomResourceDefinition",
|
||||||
|
"metadata":{
|
||||||
|
"name":"widgets.valid.example.com",
|
||||||
|
"uid":"mycrd",
|
||||||
|
"creationTimestamp": "2022-06-08T23:46:32Z"
|
||||||
|
},
|
||||||
|
"spec":{
|
||||||
|
"scope": "Namespaced",
|
||||||
|
"group":"valid.example.com",
|
||||||
|
"version":"v1",
|
||||||
|
"names":{
|
||||||
|
"kind": "Widget",
|
||||||
|
"listKind": "WidgetList",
|
||||||
|
"plural": "widgets",
|
||||||
|
"singular": "widget"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"status": {
|
||||||
|
"acceptedNames": {
|
||||||
|
"kind": "Widget",
|
||||||
|
"listKind": "WidgetList",
|
||||||
|
"plural": "widgets",
|
||||||
|
"singular": "widget"
|
||||||
|
},
|
||||||
|
"conditions": [
|
||||||
|
{
|
||||||
|
"lastTransitionTime": "2023-05-18T15:03:57Z",
|
||||||
|
"message": "no conflicts found",
|
||||||
|
"reason": "NoConflicts",
|
||||||
|
"status": "True",
|
||||||
|
"type": "NamesAccepted"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"lastTransitionTime": "2023-05-18T15:03:57Z",
|
||||||
|
"message": "the initial names have been accepted",
|
||||||
|
"reason": "InitialNamesAccepted",
|
||||||
|
"status": "True",
|
||||||
|
"type": "Established"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"storedVersions": [
|
||||||
|
"v1"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}`); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if _, err := etcd3Client.KV.Put(
|
||||||
|
ctx,
|
||||||
|
path.Join("/", etcdConfig.Prefix, "apiregistration.k8s.io/apiservices/v1.valid.example.com"),
|
||||||
|
`{
|
||||||
|
"apiVersion":"apiregistration.k8s.io/v1",
|
||||||
|
"kind":"APIService",
|
||||||
|
"metadata": {
|
||||||
|
"name": "v1.valid.example.com",
|
||||||
|
"uid":"foo",
|
||||||
|
"creationTimestamp": "2022-06-08T23:46:32Z",
|
||||||
|
"labels":{"kube-aggregator.kubernetes.io/automanaged":"true"}
|
||||||
|
},
|
||||||
|
"spec": {
|
||||||
|
"group": "valid.example.com",
|
||||||
|
"version": "v1",
|
||||||
|
"groupPriorityMinimum":100,
|
||||||
|
"versionPriority":10
|
||||||
|
}
|
||||||
|
}`,
|
||||||
|
); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Populate a stale managed APIService in etcd
|
||||||
|
if _, err := etcd3Client.KV.Put(
|
||||||
|
ctx,
|
||||||
|
path.Join("/", etcdConfig.Prefix, "apiregistration.k8s.io/apiservices/v1.stale.example.com"),
|
||||||
|
`{
|
||||||
|
"apiVersion":"apiregistration.k8s.io/v1",
|
||||||
|
"kind":"APIService",
|
||||||
|
"metadata": {
|
||||||
|
"name": "v1.stale.example.com",
|
||||||
|
"uid":"foo",
|
||||||
|
"creationTimestamp": "2022-06-08T23:46:32Z",
|
||||||
|
"labels":{"kube-aggregator.kubernetes.io/automanaged":"true"}
|
||||||
|
},
|
||||||
|
"spec": {
|
||||||
|
"group": "stale.example.com",
|
||||||
|
"version": "v1",
|
||||||
|
"groupPriorityMinimum":100,
|
||||||
|
"versionPriority":10
|
||||||
|
}
|
||||||
|
}`,
|
||||||
|
); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start server
|
||||||
|
options := kastesting.NewDefaultTestServerOptions()
|
||||||
|
options.SkipHealthzCheck = true
|
||||||
|
testServer := kastesting.StartTestServerOrDie(t, options, nil, etcdConfig)
|
||||||
|
defer testServer.TearDownFn()
|
||||||
|
|
||||||
|
kubeClientConfig := rest.CopyConfig(testServer.ClientConfig)
|
||||||
|
aggregatorClient := aggregatorclient.NewForConfigOrDie(kubeClientConfig)
|
||||||
|
|
||||||
|
// ensure both APIService objects remain
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
if _, err := aggregatorClient.ApiregistrationV1().APIServices().Get(ctx, "v1.valid.example.com", metav1.GetOptions{}); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if _, err := aggregatorClient.ApiregistrationV1().APIServices().Get(ctx, "v1.stale.example.com", metav1.GetOptions{}); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clear the bogus CRD data so the informer can sync
|
||||||
|
if _, err := etcd3Client.KV.Delete(ctx, bogusCRDEtcdPath); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
t.Log("cleaned up bogus CRD data")
|
||||||
|
|
||||||
|
// ensure the stale APIService object is cleaned up
|
||||||
|
if err := wait.Poll(time.Second, wait.ForeverTestTimeout, func() (bool, error) {
|
||||||
|
_, err := aggregatorClient.ApiregistrationV1().APIServices().Get(ctx, "v1.stale.example.com", metav1.GetOptions{})
|
||||||
|
if err == nil {
|
||||||
|
t.Log("stale APIService still exists, waiting...")
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
if !apierrors.IsNotFound(err) {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
}); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ensure the valid APIService object remains
|
||||||
|
for i := 0; i < 5; i++ {
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
if _, err := aggregatorClient.ApiregistrationV1().APIServices().Get(ctx, "v1.valid.example.com", metav1.GetOptions{}); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestAggregatedAPIServer(t *testing.T) {
|
func TestAggregatedAPIServer(t *testing.T) {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
|
||||||
t.Cleanup(cancel)
|
t.Cleanup(cancel)
|
||||||
|
Loading…
Reference in New Issue
Block a user