mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-24 12:15:52 +00:00
Merge pull request #126270 from stlaz/aggroapi-refactor
integration tests: split Wardle aggregation test API server running
This commit is contained in:
commit
77c3859aee
@ -237,87 +237,19 @@ func TestAggregatedAPIServer(t *testing.T) {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func testAggregatedAPIServer(t *testing.T, enableWardleFeatureGate bool, emulationVersion string) {
|
func testAggregatedAPIServer(t *testing.T, flunderBanningFeatureGate bool, emulationVersion string) {
|
||||||
|
const testNamespace = "kube-wardle"
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
|
||||||
t.Cleanup(cancel)
|
t.Cleanup(cancel)
|
||||||
|
|
||||||
// makes the kube-apiserver very responsive. it's normally a minute
|
testKAS, wardleOptions, wardlePort := prepareAggregatedWardleAPIServer(ctx, t, testNamespace)
|
||||||
dynamiccertificates.FileRefreshDuration = 1 * time.Second
|
kubeClientConfig := getKubeConfig(testKAS)
|
||||||
|
|
||||||
// we need the wardle port information first to set up the service resolver
|
|
||||||
listener, wardlePort, err := genericapiserveroptions.CreateListener("tcp", "127.0.0.1:0", net.ListenConfig{})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
// endpoints cannot have loopback IPs so we need to override the resolver itself
|
|
||||||
t.Cleanup(app.SetServiceResolverForTests(staticURLServiceResolver(fmt.Sprintf("https://127.0.0.1:%d", wardlePort))))
|
|
||||||
|
|
||||||
testServer := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{EnableCertAuth: true, BinaryVersion: "1.32"}, nil, framework.SharedEtcd())
|
|
||||||
defer testServer.TearDownFn()
|
|
||||||
kubeClientConfig := rest.CopyConfig(testServer.ClientConfig)
|
|
||||||
// force json because everything speaks it
|
|
||||||
kubeClientConfig.ContentType = ""
|
|
||||||
kubeClientConfig.AcceptContentTypes = ""
|
|
||||||
kubeClient := client.NewForConfigOrDie(kubeClientConfig)
|
|
||||||
aggregatorClient := aggregatorclient.NewForConfigOrDie(kubeClientConfig)
|
|
||||||
wardleClient := wardlev1alpha1client.NewForConfigOrDie(kubeClientConfig)
|
|
||||||
|
|
||||||
// create the bare minimum resources required to be able to get the API service into an available state
|
|
||||||
_, err = kubeClient.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
|
||||||
Name: "kube-wardle",
|
|
||||||
},
|
|
||||||
}, metav1.CreateOptions{})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
_, err = kubeClient.CoreV1().Services("kube-wardle").Create(ctx, &corev1.Service{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
|
||||||
Name: "api",
|
|
||||||
},
|
|
||||||
Spec: corev1.ServiceSpec{
|
|
||||||
ExternalName: "needs-to-be-non-empty",
|
|
||||||
Type: corev1.ServiceTypeExternalName,
|
|
||||||
},
|
|
||||||
}, metav1.CreateOptions{})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// start the wardle server to prove we can aggregate it
|
|
||||||
wardleToKASKubeConfigFile := writeKubeConfigForWardleServerToKASConnection(t, rest.CopyConfig(kubeClientConfig))
|
|
||||||
defer os.Remove(wardleToKASKubeConfigFile)
|
|
||||||
wardleCertDir, _ := os.MkdirTemp("", "test-integration-wardle-server")
|
wardleCertDir, _ := os.MkdirTemp("", "test-integration-wardle-server")
|
||||||
defer os.RemoveAll(wardleCertDir)
|
defer os.RemoveAll(wardleCertDir)
|
||||||
go func() {
|
|
||||||
o := sampleserver.NewWardleServerOptions(os.Stdout, os.Stderr)
|
directWardleClientConfig := runPreparedWardleServer(ctx, t, wardleOptions, wardleCertDir, wardlePort, flunderBanningFeatureGate, emulationVersion, kubeClientConfig)
|
||||||
// ensure this is a SAN on the generated cert for service FQDN
|
|
||||||
o.AlternateDNS = []string{
|
|
||||||
"api.kube-wardle.svc",
|
|
||||||
}
|
|
||||||
o.RecommendedOptions.SecureServing.Listener = listener
|
|
||||||
o.RecommendedOptions.SecureServing.BindAddress = netutils.ParseIPSloppy("127.0.0.1")
|
|
||||||
wardleCmd := sampleserver.NewCommandStartWardleServer(ctx, o)
|
|
||||||
args := []string{
|
|
||||||
"--authentication-kubeconfig", wardleToKASKubeConfigFile,
|
|
||||||
"--authorization-kubeconfig", wardleToKASKubeConfigFile,
|
|
||||||
"--etcd-servers", framework.GetEtcdURL(),
|
|
||||||
"--cert-dir", wardleCertDir,
|
|
||||||
"--kubeconfig", wardleToKASKubeConfigFile,
|
|
||||||
"--emulated-version", fmt.Sprintf("wardle=%s", emulationVersion),
|
|
||||||
}
|
|
||||||
if enableWardleFeatureGate {
|
|
||||||
args = append(args, "--feature-gates", "wardle:BanFlunder=true")
|
|
||||||
}
|
|
||||||
wardleCmd.SetArgs(args)
|
|
||||||
if err := wardleCmd.Execute(); err != nil {
|
|
||||||
t.Error(err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
directWardleClientConfig, err := waitForWardleRunning(ctx, t, kubeClientConfig, wardleCertDir, wardlePort)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// now we're finally ready to test. These are what's run by default now
|
// now we're finally ready to test. These are what's run by default now
|
||||||
wardleDirectClient := client.NewForConfigOrDie(directWardleClientConfig)
|
wardleDirectClient := client.NewForConfigOrDie(directWardleClientConfig)
|
||||||
@ -325,81 +257,12 @@ func testAggregatedAPIServer(t *testing.T, enableWardleFeatureGate bool, emulati
|
|||||||
testAPIGroup(ctx, t, wardleDirectClient.Discovery().RESTClient())
|
testAPIGroup(ctx, t, wardleDirectClient.Discovery().RESTClient())
|
||||||
testAPIResourceList(ctx, t, wardleDirectClient.Discovery().RESTClient())
|
testAPIResourceList(ctx, t, wardleDirectClient.Discovery().RESTClient())
|
||||||
|
|
||||||
wardleCA, err := os.ReadFile(directWardleClientConfig.CAFile)
|
wardleClient := wardlev1alpha1client.NewForConfigOrDie(kubeClientConfig)
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
_, err = aggregatorClient.ApiregistrationV1().APIServices().Create(ctx, &apiregistrationv1.APIService{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{Name: "v1alpha1.wardle.example.com"},
|
|
||||||
Spec: apiregistrationv1.APIServiceSpec{
|
|
||||||
Service: &apiregistrationv1.ServiceReference{
|
|
||||||
Namespace: "kube-wardle",
|
|
||||||
Name: "api",
|
|
||||||
},
|
|
||||||
Group: "wardle.example.com",
|
|
||||||
Version: "v1alpha1",
|
|
||||||
CABundle: wardleCA,
|
|
||||||
GroupPriorityMinimum: 200,
|
|
||||||
VersionPriority: 200,
|
|
||||||
},
|
|
||||||
}, metav1.CreateOptions{})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// wait for the API service to be available
|
waitForWardleAPIServiceReady(ctx, t, kubeClientConfig, wardleCertDir, testNamespace)
|
||||||
err = wait.Poll(time.Second, wait.ForeverTestTimeout, func() (done bool, err error) {
|
|
||||||
apiService, err := aggregatorClient.ApiregistrationV1().APIServices().Get(ctx, "v1alpha1.wardle.example.com", metav1.GetOptions{})
|
|
||||||
if err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
var available bool
|
|
||||||
for _, condition := range apiService.Status.Conditions {
|
|
||||||
if condition.Type == apiregistrationv1.Available && condition.Status == apiregistrationv1.ConditionTrue {
|
|
||||||
available = true
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !available {
|
|
||||||
t.Log("api service is not available", apiService.Status.Conditions)
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// make sure discovery is healthy overall
|
|
||||||
_, _, err = kubeClient.Discovery().ServerGroupsAndResources()
|
|
||||||
if err != nil {
|
|
||||||
t.Log("discovery failed", err)
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// make sure we have the wardle resources in discovery
|
|
||||||
apiResources, err := kubeClient.Discovery().ServerResourcesForGroupVersion("wardle.example.com/v1alpha1")
|
|
||||||
if err != nil {
|
|
||||||
t.Log("wardle discovery failed", err)
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
if len(apiResources.APIResources) != 2 {
|
|
||||||
t.Log("wardle discovery has wrong resources", apiResources.APIResources)
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
resources := make([]string, 0, 2)
|
|
||||||
for _, resource := range apiResources.APIResources {
|
|
||||||
resource := resource
|
|
||||||
resources = append(resources, resource.Name)
|
|
||||||
}
|
|
||||||
sort.Strings(resources)
|
|
||||||
if !reflect.DeepEqual([]string{"fischers", "flunders"}, resources) {
|
|
||||||
return false, fmt.Errorf("unexpected resources: %v", resources)
|
|
||||||
}
|
|
||||||
|
|
||||||
return true, nil
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// perform simple CRUD operations against the wardle resources
|
// perform simple CRUD operations against the wardle resources
|
||||||
_, err = wardleClient.Fischers().Create(ctx, &wardlev1alpha1.Fischer{
|
_, err := wardleClient.Fischers().Create(ctx, &wardlev1alpha1.Fischer{
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
Name: "panda",
|
Name: "panda",
|
||||||
},
|
},
|
||||||
@ -426,7 +289,7 @@ func testAggregatedAPIServer(t *testing.T, enableWardleFeatureGate bool, emulati
|
|||||||
Name: "badname",
|
Name: "badname",
|
||||||
},
|
},
|
||||||
}, metav1.CreateOptions{})
|
}, metav1.CreateOptions{})
|
||||||
banFlunder := enableWardleFeatureGate || emulationVersion == "1.2"
|
banFlunder := flunderBanningFeatureGate || emulationVersion == "1.2"
|
||||||
if banFlunder && err == nil {
|
if banFlunder && err == nil {
|
||||||
t.Fatal("expect flunder:badname not admitted when wardle feature gates are specified")
|
t.Fatal("expect flunder:badname not admitted when wardle feature gates are specified")
|
||||||
}
|
}
|
||||||
@ -498,10 +361,10 @@ func testAggregatedAPIServer(t *testing.T, enableWardleFeatureGate bool, emulati
|
|||||||
|
|
||||||
// now we update the client-ca nd request-header-client-ca-file and the kas will consume it, update the configmap
|
// now we update the client-ca nd request-header-client-ca-file and the kas will consume it, update the configmap
|
||||||
// and then the wardle server will detect and update too.
|
// and then the wardle server will detect and update too.
|
||||||
if err := os.WriteFile(path.Join(testServer.TmpDir, "client-ca.crt"), differentClientCA, 0644); err != nil {
|
if err := os.WriteFile(path.Join(testKAS.TmpDir, "client-ca.crt"), differentClientCA, 0644); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
if err := os.WriteFile(path.Join(testServer.TmpDir, "proxy-ca.crt"), differentFrontProxyCA, 0644); err != nil {
|
if err := os.WriteFile(path.Join(testKAS.TmpDir, "proxy-ca.crt"), differentFrontProxyCA, 0644); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
// wait for it to be picked up. there's a test in certreload_test.go that ensure this works
|
// wait for it to be picked up. there's a test in certreload_test.go that ensure this works
|
||||||
@ -547,44 +410,6 @@ func testAggregatedAPIServer(t *testing.T, enableWardleFeatureGate bool, emulati
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func waitForWardleRunning(ctx context.Context, t *testing.T, wardleToKASKubeConfig *rest.Config, wardleCertDir string, wardlePort int) (*rest.Config, error) {
|
|
||||||
directWardleClientConfig := rest.AnonymousClientConfig(rest.CopyConfig(wardleToKASKubeConfig))
|
|
||||||
directWardleClientConfig.CAFile = path.Join(wardleCertDir, "apiserver.crt")
|
|
||||||
directWardleClientConfig.CAData = nil
|
|
||||||
directWardleClientConfig.ServerName = ""
|
|
||||||
directWardleClientConfig.BearerToken = wardleToKASKubeConfig.BearerToken
|
|
||||||
var wardleClient client.Interface
|
|
||||||
lastHealthContent := []byte{}
|
|
||||||
var lastHealthErr error
|
|
||||||
err := wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (done bool, err error) {
|
|
||||||
if _, err := os.Stat(directWardleClientConfig.CAFile); os.IsNotExist(err) { // wait until the file trust is created
|
|
||||||
lastHealthErr = err
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
directWardleClientConfig.Host = fmt.Sprintf("https://127.0.0.1:%d", wardlePort)
|
|
||||||
wardleClient, err = client.NewForConfig(directWardleClientConfig)
|
|
||||||
if err != nil {
|
|
||||||
// this happens because we race the API server start
|
|
||||||
t.Log(err)
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
healthStatus := 0
|
|
||||||
result := wardleClient.Discovery().RESTClient().Get().AbsPath("/healthz").Do(ctx).StatusCode(&healthStatus)
|
|
||||||
lastHealthContent, lastHealthErr = result.Raw()
|
|
||||||
if healthStatus != http.StatusOK {
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
return true, nil
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
t.Log(string(lastHealthContent))
|
|
||||||
t.Log(lastHealthErr)
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return directWardleClientConfig, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestAggregatedAPIServerRejectRedirectResponse(t *testing.T) {
|
func TestAggregatedAPIServerRejectRedirectResponse(t *testing.T) {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
|
||||||
t.Cleanup(cancel)
|
t.Cleanup(cancel)
|
||||||
@ -699,6 +524,225 @@ func TestAggregatedAPIServerRejectRedirectResponse(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func prepareAggregatedWardleAPIServer(ctx context.Context, t *testing.T, namespace string) (*kastesting.TestServer, *sampleserver.WardleServerOptions, int) {
|
||||||
|
// makes the kube-apiserver very responsive. it's normally a minute
|
||||||
|
dynamiccertificates.FileRefreshDuration = 1 * time.Second
|
||||||
|
|
||||||
|
// we need the wardle port information first to set up the service resolver
|
||||||
|
listener, wardlePort, err := genericapiserveroptions.CreateListener("tcp", "127.0.0.1:0", net.ListenConfig{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
// endpoints cannot have loopback IPs so we need to override the resolver itself
|
||||||
|
t.Cleanup(app.SetServiceResolverForTests(staticURLServiceResolver(fmt.Sprintf("https://127.0.0.1:%d", wardlePort))))
|
||||||
|
|
||||||
|
testServer := kastesting.StartTestServerOrDie(t, &kastesting.TestServerInstanceOptions{EnableCertAuth: true, BinaryVersion: "1.32"}, nil, framework.SharedEtcd())
|
||||||
|
t.Cleanup(func() { testServer.TearDownFn() })
|
||||||
|
|
||||||
|
kubeClient := client.NewForConfigOrDie(getKubeConfig(testServer))
|
||||||
|
|
||||||
|
// create the bare minimum resources required to be able to get the API service into an available state
|
||||||
|
_, err = kubeClient.CoreV1().Namespaces().Create(ctx, &corev1.Namespace{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: namespace,
|
||||||
|
},
|
||||||
|
}, metav1.CreateOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
_, err = kubeClient.CoreV1().Services(namespace).Create(ctx, &corev1.Service{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "api",
|
||||||
|
},
|
||||||
|
Spec: corev1.ServiceSpec{
|
||||||
|
ExternalName: "needs-to-be-non-empty",
|
||||||
|
Type: corev1.ServiceTypeExternalName,
|
||||||
|
},
|
||||||
|
}, metav1.CreateOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
wardleOptions := sampleserver.NewWardleServerOptions(os.Stdout, os.Stderr)
|
||||||
|
// ensure this is a SAN on the generated cert for service FQDN
|
||||||
|
wardleOptions.AlternateDNS = []string{
|
||||||
|
fmt.Sprintf("api.%s.svc", namespace),
|
||||||
|
}
|
||||||
|
wardleOptions.RecommendedOptions.SecureServing.Listener = listener
|
||||||
|
wardleOptions.RecommendedOptions.SecureServing.BindAddress = netutils.ParseIPSloppy("127.0.0.1")
|
||||||
|
|
||||||
|
return testServer, wardleOptions, wardlePort
|
||||||
|
}
|
||||||
|
|
||||||
|
func runPreparedWardleServer(
|
||||||
|
ctx context.Context,
|
||||||
|
t *testing.T,
|
||||||
|
wardleOptions *sampleserver.WardleServerOptions,
|
||||||
|
certDir string,
|
||||||
|
wardlePort int,
|
||||||
|
flunderBanningFeatureGate bool,
|
||||||
|
emulationVersion string,
|
||||||
|
kubeConfig *rest.Config,
|
||||||
|
) *rest.Config {
|
||||||
|
|
||||||
|
// start the wardle server to prove we can aggregate it
|
||||||
|
wardleToKASKubeConfigFile := writeKubeConfigForWardleServerToKASConnection(t, rest.CopyConfig(kubeConfig))
|
||||||
|
t.Cleanup(func() { os.Remove(wardleToKASKubeConfigFile) })
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
args := []string{
|
||||||
|
"--authentication-kubeconfig", wardleToKASKubeConfigFile,
|
||||||
|
"--authorization-kubeconfig", wardleToKASKubeConfigFile,
|
||||||
|
"--etcd-servers", framework.GetEtcdURL(),
|
||||||
|
"--cert-dir", certDir,
|
||||||
|
"--kubeconfig", wardleToKASKubeConfigFile,
|
||||||
|
"--emulated-version", fmt.Sprintf("wardle=%s", emulationVersion),
|
||||||
|
}
|
||||||
|
if flunderBanningFeatureGate {
|
||||||
|
args = append(args, "--feature-gates", "wardle:BanFlunder=true")
|
||||||
|
}
|
||||||
|
wardleCmd := sampleserver.NewCommandStartWardleServer(ctx, wardleOptions)
|
||||||
|
wardleCmd.SetArgs(args)
|
||||||
|
if err := wardleCmd.Execute(); err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
directWardleClientConfig, err := waitForWardleRunning(ctx, t, kubeConfig, certDir, wardlePort)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return directWardleClientConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
func waitForWardleAPIServiceReady(ctx context.Context, t *testing.T, kubeConfig *rest.Config, wardleCertDir string, namespace string) {
|
||||||
|
kubeClient := client.NewForConfigOrDie(kubeConfig)
|
||||||
|
aggregatorClient := aggregatorclient.NewForConfigOrDie(kubeConfig)
|
||||||
|
|
||||||
|
wardleCA, err := os.ReadFile(wardleCAFilePath(wardleCertDir))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
_, err = aggregatorClient.ApiregistrationV1().APIServices().Create(ctx, &apiregistrationv1.APIService{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{Name: "v1alpha1.wardle.example.com"},
|
||||||
|
Spec: apiregistrationv1.APIServiceSpec{
|
||||||
|
Service: &apiregistrationv1.ServiceReference{
|
||||||
|
Namespace: namespace,
|
||||||
|
Name: "api",
|
||||||
|
},
|
||||||
|
Group: "wardle.example.com",
|
||||||
|
Version: "v1alpha1",
|
||||||
|
CABundle: wardleCA,
|
||||||
|
GroupPriorityMinimum: 200,
|
||||||
|
VersionPriority: 200,
|
||||||
|
},
|
||||||
|
}, metav1.CreateOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// wait for the API service to be available
|
||||||
|
err = wait.Poll(time.Second, wait.ForeverTestTimeout, func() (done bool, err error) {
|
||||||
|
apiService, err := aggregatorClient.ApiregistrationV1().APIServices().Get(ctx, "v1alpha1.wardle.example.com", metav1.GetOptions{})
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
var available bool
|
||||||
|
for _, condition := range apiService.Status.Conditions {
|
||||||
|
if condition.Type == apiregistrationv1.Available && condition.Status == apiregistrationv1.ConditionTrue {
|
||||||
|
available = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !available {
|
||||||
|
t.Log("api service is not available", apiService.Status.Conditions)
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// make sure discovery is healthy overall
|
||||||
|
_, _, err = kubeClient.Discovery().ServerGroupsAndResources()
|
||||||
|
if err != nil {
|
||||||
|
t.Log("discovery failed", err)
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// make sure we have the wardle resources in discovery
|
||||||
|
apiResources, err := kubeClient.Discovery().ServerResourcesForGroupVersion("wardle.example.com/v1alpha1")
|
||||||
|
if err != nil {
|
||||||
|
t.Log("wardle discovery failed", err)
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
if len(apiResources.APIResources) != 2 {
|
||||||
|
t.Log("wardle discovery has wrong resources", apiResources.APIResources)
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
resources := make([]string, 0, 2)
|
||||||
|
for _, resource := range apiResources.APIResources {
|
||||||
|
resource := resource
|
||||||
|
resources = append(resources, resource.Name)
|
||||||
|
}
|
||||||
|
sort.Strings(resources)
|
||||||
|
if !reflect.DeepEqual([]string{"fischers", "flunders"}, resources) {
|
||||||
|
return false, fmt.Errorf("unexpected resources: %v", resources)
|
||||||
|
}
|
||||||
|
|
||||||
|
return true, nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func getKubeConfig(testServer *kastesting.TestServer) *rest.Config {
|
||||||
|
kubeClientConfig := rest.CopyConfig(testServer.ClientConfig)
|
||||||
|
// force json because everything speaks it
|
||||||
|
kubeClientConfig.ContentType = ""
|
||||||
|
kubeClientConfig.AcceptContentTypes = ""
|
||||||
|
|
||||||
|
return kubeClientConfig
|
||||||
|
}
|
||||||
|
|
||||||
|
func waitForWardleRunning(ctx context.Context, t *testing.T, wardleToKASKubeConfig *rest.Config, wardleCertDir string, wardlePort int) (*rest.Config, error) {
|
||||||
|
directWardleClientConfig := rest.AnonymousClientConfig(rest.CopyConfig(wardleToKASKubeConfig))
|
||||||
|
directWardleClientConfig.CAFile = wardleCAFilePath(wardleCertDir)
|
||||||
|
directWardleClientConfig.CAData = nil
|
||||||
|
directWardleClientConfig.ServerName = ""
|
||||||
|
directWardleClientConfig.BearerToken = wardleToKASKubeConfig.BearerToken
|
||||||
|
var wardleClient client.Interface
|
||||||
|
lastHealthContent := []byte{}
|
||||||
|
var lastHealthErr error
|
||||||
|
err := wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (done bool, err error) {
|
||||||
|
if _, err := os.Stat(directWardleClientConfig.CAFile); os.IsNotExist(err) { // wait until the file trust is created
|
||||||
|
lastHealthErr = err
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
directWardleClientConfig.Host = fmt.Sprintf("https://127.0.0.1:%d", wardlePort)
|
||||||
|
wardleClient, err = client.NewForConfig(directWardleClientConfig)
|
||||||
|
if err != nil {
|
||||||
|
// this happens because we race the API server start
|
||||||
|
t.Log(err)
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
healthStatus := 0
|
||||||
|
result := wardleClient.Discovery().RESTClient().Get().AbsPath("/healthz").Do(ctx).StatusCode(&healthStatus)
|
||||||
|
lastHealthContent, lastHealthErr = result.Raw()
|
||||||
|
if healthStatus != http.StatusOK {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
t.Log(string(lastHealthContent))
|
||||||
|
t.Log(lastHealthErr)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return directWardleClientConfig, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func wardleCAFilePath(wardleCertDir string) string { return path.Join(wardleCertDir, "apiserver.crt") }
|
||||||
|
|
||||||
func writeKubeConfigForWardleServerToKASConnection(t *testing.T, kubeClientConfig *rest.Config) string {
|
func writeKubeConfigForWardleServerToKASConnection(t *testing.T, kubeClientConfig *rest.Config) string {
|
||||||
// write a kubeconfig out for starting other API servers with delegated auth. remember, no in-cluster config
|
// write a kubeconfig out for starting other API servers with delegated auth. remember, no in-cluster config
|
||||||
// the loopback client config uses a loopback cert with different SNI. We need to use the "real"
|
// the loopback client config uses a loopback cert with different SNI. We need to use the "real"
|
||||||
|
Loading…
Reference in New Issue
Block a user