mirror of
				https://github.com/k3s-io/kubernetes.git
				synced 2025-11-04 07:49:35 +00:00 
			
		
		
		
	crd-openapi-publishing: in e2e query apiserver instances for HA
This commit is contained in:
		@@ -71,6 +71,7 @@ go_library(
 | 
			
		||||
        "//staging/src/k8s.io/client-go/discovery:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/dynamic:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/rest:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/util/cert:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/util/keyutil:go_default_library",
 | 
			
		||||
        "//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
 | 
			
		||||
 
 | 
			
		||||
@@ -19,6 +19,8 @@ package apimachinery
 | 
			
		||||
import (
 | 
			
		||||
	"encoding/json"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"io/ioutil"
 | 
			
		||||
	"net/http"
 | 
			
		||||
	"regexp"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"time"
 | 
			
		||||
@@ -35,6 +37,7 @@ import (
 | 
			
		||||
	"k8s.io/apimachinery/pkg/util/wait"
 | 
			
		||||
	utilyaml "k8s.io/apimachinery/pkg/util/yaml"
 | 
			
		||||
	k8sclientset "k8s.io/client-go/kubernetes"
 | 
			
		||||
	"k8s.io/client-go/rest"
 | 
			
		||||
	openapiutil "k8s.io/kube-openapi/pkg/util"
 | 
			
		||||
	"k8s.io/kubernetes/test/e2e/framework"
 | 
			
		||||
	"k8s.io/kubernetes/test/utils/crd"
 | 
			
		||||
@@ -389,6 +392,23 @@ func patchSchema(schema []byte, crd *crd.TestCrd) error {
 | 
			
		||||
	return err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
const waitSuccessThreshold = 10
 | 
			
		||||
 | 
			
		||||
// mustSucceedMultipleTimes calls f multiple times on success and only returns true if all calls are successful.
 | 
			
		||||
// This is necessary to avoid flaking tests where one call might hit a good apiserver while in HA other apiservers
 | 
			
		||||
// might be lagging behind. Calling f multiple times reduces the chance exponentially.
 | 
			
		||||
func mustSucceedMultipleTimes(n int, f func() (bool, error)) func() (bool, error) {
 | 
			
		||||
	return func() (bool, error) {
 | 
			
		||||
		for i := 0; i < n; i++ {
 | 
			
		||||
			ok, err := f()
 | 
			
		||||
			if err != nil || !ok {
 | 
			
		||||
				return ok, err
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		return true, nil
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// waitForDefinition waits for given definition showing up in swagger with given schema
 | 
			
		||||
func waitForDefinition(c k8sclientset.Interface, name string, schema []byte) error {
 | 
			
		||||
	expect := spec.Schema{}
 | 
			
		||||
@@ -396,54 +416,78 @@ func waitForDefinition(c k8sclientset.Interface, name string, schema []byte) err
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	lastMsg := ""
 | 
			
		||||
	if err := wait.Poll(500*time.Millisecond, 10*time.Second, func() (bool, error) {
 | 
			
		||||
		bs, err := c.CoreV1().RESTClient().Get().AbsPath("openapi", "v2").DoRaw()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return false, err
 | 
			
		||||
		}
 | 
			
		||||
		spec := spec.Swagger{}
 | 
			
		||||
		if err := json.Unmarshal(bs, &spec); err != nil {
 | 
			
		||||
			return false, err
 | 
			
		||||
		}
 | 
			
		||||
	err := waitForOpenAPISchema(c, func(spec *spec.Swagger) (bool, string) {
 | 
			
		||||
		d, ok := spec.SwaggerProps.Definitions[name]
 | 
			
		||||
		if !ok {
 | 
			
		||||
			lastMsg = fmt.Sprintf("spec.SwaggerProps.Definitions[\"%s\"] not found", name)
 | 
			
		||||
			return false, nil
 | 
			
		||||
			return false, fmt.Sprintf("spec.SwaggerProps.Definitions[\"%s\"] not found", name)
 | 
			
		||||
		}
 | 
			
		||||
		// drop properties and extension that we added
 | 
			
		||||
		dropDefaults(&d)
 | 
			
		||||
		if !apiequality.Semantic.DeepEqual(expect, d) {
 | 
			
		||||
			lastMsg = fmt.Sprintf("spec.SwaggerProps.Definitions[\"%s\"] not match; expect: %v, actual: %v", name, expect, d)
 | 
			
		||||
			return false, nil
 | 
			
		||||
			return false, fmt.Sprintf("spec.SwaggerProps.Definitions[\"%s\"] not match; expect: %v, actual: %v", name, expect, d)
 | 
			
		||||
		}
 | 
			
		||||
		return true, nil
 | 
			
		||||
	}); err != nil {
 | 
			
		||||
		return fmt.Errorf("failed to wait for definition %s to be served: %v; lastMsg: %s", name, err, lastMsg)
 | 
			
		||||
		return true, ""
 | 
			
		||||
	})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("failed to wait for definition %q to be served with the right OpenAPI schema: %v", name, err)
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// waitForDefinitionCleanup waits for given definition to be removed from swagger
 | 
			
		||||
func waitForDefinitionCleanup(c k8sclientset.Interface, name string) error {
 | 
			
		||||
	err := waitForOpenAPISchema(c, func(spec *spec.Swagger) (bool, string) {
 | 
			
		||||
		if _, ok := spec.SwaggerProps.Definitions[name]; ok {
 | 
			
		||||
			return false, fmt.Sprintf("spec.SwaggerProps.Definitions[\"%s\"] still exists", name)
 | 
			
		||||
		}
 | 
			
		||||
		return true, ""
 | 
			
		||||
	})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("failed to wait for definition %q not to be served anymore: %v", name, err)
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func waitForOpenAPISchema(c k8sclientset.Interface, pred func(*spec.Swagger) (bool, string)) error {
 | 
			
		||||
	client := c.CoreV1().RESTClient().(*rest.RESTClient).Client
 | 
			
		||||
	url := c.CoreV1().RESTClient().Get().AbsPath("openapi", "v2").URL()
 | 
			
		||||
	lastMsg := ""
 | 
			
		||||
	if err := wait.Poll(500*time.Millisecond, 10*time.Second, func() (bool, error) {
 | 
			
		||||
		bs, err := c.CoreV1().RESTClient().Get().AbsPath("openapi", "v2").DoRaw()
 | 
			
		||||
	etag := ""
 | 
			
		||||
	var etagSpec *spec.Swagger
 | 
			
		||||
	if err := wait.Poll(500*time.Millisecond, wait.ForeverTestTimeout, mustSucceedMultipleTimes(waitSuccessThreshold, func() (bool, error) {
 | 
			
		||||
		// download spec with etag support
 | 
			
		||||
		spec := &spec.Swagger{}
 | 
			
		||||
		req, err := http.NewRequest("GET", url.String(), nil)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return false, err
 | 
			
		||||
		}
 | 
			
		||||
		spec := spec.Swagger{}
 | 
			
		||||
		if err := json.Unmarshal(bs, &spec); err != nil {
 | 
			
		||||
		req.Close = true // enforce a new connection to hit different HA API servers
 | 
			
		||||
		if len(etag) > 0 {
 | 
			
		||||
			req.Header.Set("If-None-Match", fmt.Sprintf(`"%s"`, etag))
 | 
			
		||||
		}
 | 
			
		||||
		resp, err := client.Do(req)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return false, err
 | 
			
		||||
		}
 | 
			
		||||
		_, ok := spec.SwaggerProps.Definitions[name]
 | 
			
		||||
		if ok {
 | 
			
		||||
			lastMsg = fmt.Sprintf("spec.SwaggerProps.Definitions[\"%s\"] still exists", name)
 | 
			
		||||
			return false, nil
 | 
			
		||||
		defer resp.Body.Close()
 | 
			
		||||
		if resp.StatusCode == http.StatusNotModified {
 | 
			
		||||
			spec = etagSpec
 | 
			
		||||
		} else if resp.StatusCode != http.StatusOK {
 | 
			
		||||
			return false, fmt.Errorf("unexpected response: %d", resp.StatusCode)
 | 
			
		||||
		} else if bs, err := ioutil.ReadAll(resp.Body); err != nil {
 | 
			
		||||
			return false, err
 | 
			
		||||
		} else if err := json.Unmarshal(bs, spec); err != nil {
 | 
			
		||||
			return false, err
 | 
			
		||||
		} else {
 | 
			
		||||
			etag = strings.Trim(resp.Header.Get("ETag"), `"`)
 | 
			
		||||
			etagSpec = spec
 | 
			
		||||
		}
 | 
			
		||||
		return true, nil
 | 
			
		||||
	}); err != nil {
 | 
			
		||||
		return fmt.Errorf("failed to wait for definition %s to be removed: %v; lastMsg: %s", name, err, lastMsg)
 | 
			
		||||
 | 
			
		||||
		var ok bool
 | 
			
		||||
		ok, lastMsg = pred(spec)
 | 
			
		||||
		return ok, nil
 | 
			
		||||
	})); err != nil {
 | 
			
		||||
		return fmt.Errorf("failed to wait for OpenAPI spec validating condition: %v; lastMsg: %s", err, lastMsg)
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user