adds watchListEndpointRestrictions for watchlist requests (#126996)

* endpoints/handlers/get: intro watchListEndpointRestrictions

* consistencydetector/list_data_consistency_detector: expose IsDataConsistencyDetectionForListEnabled

* e2e/watchlist: extract common function for adding unstructured secrets

* e2e/watchlist: new e2e scenarios for convering watchListEndpointRestrict
This commit is contained in:
Lukasz Szaszkiewicz 2024-09-25 11:12:01 +02:00 committed by GitHub
parent 99ff62e87a
commit ae35048cb0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 140 additions and 21 deletions

View File

@ -45,6 +45,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/component-base/tracing"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
)
// getterFunc performs a get request with the given context and object name. The request
@ -185,15 +186,8 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc
if err != nil {
hasName = false
}
ctx = request.WithNamespace(ctx, namespace)
outputMediaType, _, err := negotiation.NegotiateOutputMediaType(req, scope.Serializer, scope)
if err != nil {
scope.err(err, w, req)
return
}
opts := metainternalversion.ListOptions{}
if err := metainternalversionscheme.ParameterCodec.DecodeParameters(req.URL.Query(), scope.MetaGroupVersion, &opts); err != nil {
err = errors.NewBadRequest(err.Error())
@ -208,6 +202,17 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc
return
}
var restrictions negotiation.EndpointRestrictions
restrictions = scope
if isListWatchRequest(opts) {
restrictions = &watchListEndpointRestrictions{scope}
}
outputMediaType, _, err := negotiation.NegotiateOutputMediaType(req, scope.Serializer, restrictions)
if err != nil {
scope.err(err, w, req)
return
}
// transform fields
// TODO: DecodeParametersInto should do this.
if opts.FieldSelector != nil {
@ -307,3 +312,18 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc
transformResponseObject(ctx, scope, req, w, http.StatusOK, outputMediaType, result)
}
}
type watchListEndpointRestrictions struct {
negotiation.EndpointRestrictions
}
func (e *watchListEndpointRestrictions) AllowsMediaTypeTransform(mimeType, mimeSubType string, target *schema.GroupVersionKind) bool {
if target != nil && target.Kind == "Table" {
return false
}
return e.EndpointRestrictions.AllowsMediaTypeTransform(mimeType, mimeSubType, target)
}
func isListWatchRequest(opts metainternalversion.ListOptions) bool {
return utilfeature.DefaultFeatureGate.Enabled(features.WatchList) && ptr.Deref(opts.SendInitialEvents, false) && opts.AllowWatchBookmarks
}

View File

@ -32,6 +32,12 @@ func init() {
dataConsistencyDetectionForListFromCacheEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_LIST_FROM_CACHE_INCONSISTENCY_DETECTOR"))
}
// IsDataConsistencyDetectionForListEnabled returns true when
// the KUBE_LIST_FROM_CACHE_INCONSISTENCY_DETECTOR environment variable was set during a binary startup.
func IsDataConsistencyDetectionForListEnabled() bool {
return dataConsistencyDetectionForListFromCacheEnabled
}
// CheckListFromCacheDataConsistencyIfRequested performs a data consistency check only when
// the KUBE_LIST_FROM_CACHE_INCONSISTENCY_DETECTOR environment variable was set during a binary startup
// for requests that have a high chance of being served from the watch-cache.
@ -50,7 +56,7 @@ func init() {
// the cache (even though this might not be true for some requests)
// and issue the second call to get data from etcd for comparison.
func CheckListFromCacheDataConsistencyIfRequested[T runtime.Object](ctx context.Context, identity string, listItemsFn ListFunc[T], optionsUsedToReceiveList metav1.ListOptions, receivedList runtime.Object) {
if !dataConsistencyDetectionForListFromCacheEnabled {
if !IsDataConsistencyDetectionForListEnabled() {
return
}
checkListFromCacheDataConsistencyIfRequestedInternal(ctx, identity, listItemsFn, optionsUsedToReceiveList, receivedList)

View File

@ -21,6 +21,7 @@ import (
"fmt"
"net/http"
"sort"
"strings"
"time"
"github.com/google/go-cmp/cmp"
@ -117,14 +118,7 @@ var _ = SIGDescribe("API Streaming (aka. WatchList)", framework.WithSerial(), fe
featuregatetesting.SetFeatureGateDuringTest(ginkgo.GinkgoTB(), utilfeature.DefaultFeatureGate, featuregate.Feature(clientfeatures.WatchListClient), true)
ginkgo.By(fmt.Sprintf("Adding 5 secrets to %s namespace", f.Namespace.Name))
var expectedSecrets []unstructured.Unstructured
for i := 1; i <= 5; i++ {
unstructuredSecret, err := runtime.DefaultUnstructuredConverter.ToUnstructured(newSecret(fmt.Sprintf("secret-%d", i)))
framework.ExpectNoError(err)
secret, err := f.DynamicClient.Resource(v1.SchemeGroupVersion.WithResource("secrets")).Namespace(f.Namespace.Name).Create(ctx, &unstructured.Unstructured{Object: unstructuredSecret}, metav1.CreateOptions{})
framework.ExpectNoError(err)
expectedSecrets = append(expectedSecrets, *secret)
}
expectedSecrets := addWellKnownUnstructuredSecrets(ctx, f)
rt, clientConfig := clientConfigWithRoundTripper(f)
wrappedDynamicClient, err := dynamic.NewForConfig(clientConfig)
@ -171,16 +165,86 @@ var _ = SIGDescribe("API Streaming (aka. WatchList)", framework.WithSerial(), fe
expectedRequestMadeByMetaClient := getExpectedRequestMadeByClientFor(secretMetaList.GetResourceVersion())
gomega.Expect(rt.actualRequests).To(gomega.Equal(expectedRequestMadeByMetaClient))
})
// Validates unsupported Accept headers in WatchList.
// Sets AcceptContentType to "application/json;as=Table", which the API doesn't support, returning a 406 error.
// After the 406, the client falls back to a regular list request.
ginkgo.It("doesn't support receiving resources as Tables", func(ctx context.Context) {
featuregatetesting.SetFeatureGateDuringTest(ginkgo.GinkgoTB(), utilfeature.DefaultFeatureGate, featuregate.Feature(clientfeatures.WatchListClient), true)
ginkgo.By(fmt.Sprintf("Adding 5 secrets to %s namespace", f.Namespace.Name))
_ = addWellKnownUnstructuredSecrets(ctx, f)
rt, clientConfig := clientConfigWithRoundTripper(f)
modifiedClientConfig := dynamic.ConfigFor(clientConfig)
modifiedClientConfig.AcceptContentTypes = strings.Join([]string{
fmt.Sprintf("application/json;as=Table;v=%s;g=%s", metav1.SchemeGroupVersion.Version, metav1.GroupName),
}, ",")
modifiedClientConfig.GroupVersion = &v1.SchemeGroupVersion
restClient, err := rest.RESTClientFor(modifiedClientConfig)
framework.ExpectNoError(err)
wrappedDynamicClient := dynamic.New(restClient)
// note that the client in case of an error (406) will fall back
// to a standard list request thus the overall call passes
ginkgo.By("Streaming secrets as Table from the server")
secretTable, err := wrappedDynamicClient.Resource(v1.SchemeGroupVersion.WithResource("secrets")).Namespace(f.Namespace.Name).List(ctx, metav1.ListOptions{})
framework.ExpectNoError(err)
gomega.Expect(secretTable.GetObjectKind().GroupVersionKind()).To(gomega.Equal(metav1.SchemeGroupVersion.WithKind("Table")))
ginkgo.By("Verifying if expected response was sent by the server")
gomega.Expect(rt.actualResponseStatuses[0]).To(gomega.Equal("406 Not Acceptable"))
expectedRequestMadeByDynamicClient := getExpectedRequestMadeByClientWhenFallbackToListFor(secretTable.GetResourceVersion())
gomega.Expect(rt.actualRequests).To(gomega.Equal(expectedRequestMadeByDynamicClient))
})
// Sets AcceptContentType to both "application/json;as=Table" and "application/json".
// Unlike the previous test, no 406 error occurs, as the API falls back to "application/json" and returns a valid response.
ginkgo.It("falls backs to supported content type when when receiving resources as Tables was requested", func(ctx context.Context) {
featuregatetesting.SetFeatureGateDuringTest(ginkgo.GinkgoTB(), utilfeature.DefaultFeatureGate, featuregate.Feature(clientfeatures.WatchListClient), true)
ginkgo.By(fmt.Sprintf("Adding 5 secrets to %s namespace", f.Namespace.Name))
expectedSecrets := addWellKnownUnstructuredSecrets(ctx, f)
rt, clientConfig := clientConfigWithRoundTripper(f)
modifiedClientConfig := dynamic.ConfigFor(clientConfig)
modifiedClientConfig.AcceptContentTypes = strings.Join([]string{
fmt.Sprintf("application/json;as=Table;v=%s;g=%s", metav1.SchemeGroupVersion.Version, metav1.GroupName),
"application/json",
}, ",")
modifiedClientConfig.GroupVersion = &v1.SchemeGroupVersion
restClient, err := rest.RESTClientFor(modifiedClientConfig)
framework.ExpectNoError(err)
wrappedDynamicClient := dynamic.New(restClient)
ginkgo.By("Streaming secrets from the server")
secretList, err := wrappedDynamicClient.Resource(v1.SchemeGroupVersion.WithResource("secrets")).Namespace(f.Namespace.Name).List(ctx, metav1.ListOptions{})
framework.ExpectNoError(err)
ginkgo.By("Verifying if the secret list was properly streamed")
streamedSecrets := secretList.Items
gomega.Expect(cmp.Equal(expectedSecrets, streamedSecrets)).To(gomega.BeTrueBecause("data received via watchlist must match the added data"))
ginkgo.By("Verifying if expected requests were sent to the server")
expectedRequestMadeByDynamicClient := getExpectedRequestMadeByClientFor(secretList.GetResourceVersion())
gomega.Expect(rt.actualRequests).To(gomega.Equal(expectedRequestMadeByDynamicClient))
})
})
type roundTripper struct {
actualRequests []string
delegate http.RoundTripper
actualRequests []string
actualResponseStatuses []string
delegate http.RoundTripper
}
func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
r.actualRequests = append(r.actualRequests, req.URL.RawQuery)
return r.delegate.RoundTrip(req)
rsp, err := r.delegate.RoundTrip(req)
if rsp != nil {
r.actualResponseStatuses = append(r.actualResponseStatuses, rsp.Status)
}
return rsp, err
}
func (r *roundTripper) Wrap(delegate http.RoundTripper) http.RoundTripper {
@ -211,10 +275,12 @@ func verifyStore(ctx context.Context, expectedSecrets []v1.Secret, store cache.S
framework.ExpectNoError(err)
}
// corresponds to a streaming request made by the client to stream the secrets
const expectedStreamingRequestMadeByClient string = "allowWatchBookmarks=true&resourceVersionMatch=NotOlderThan&sendInitialEvents=true&watch=true"
func getExpectedRequestMadeByClientFor(rv string) []string {
expectedRequestMadeByClient := []string{
// corresponds to a streaming request made by the client to stream the secrets
"allowWatchBookmarks=true&resourceVersionMatch=NotOlderThan&sendInitialEvents=true&watch=true",
expectedStreamingRequestMadeByClient,
}
if consistencydetector.IsDataConsistencyDetectionForWatchListEnabled() {
// corresponds to a standard list request made by the consistency detector build in into the client
@ -223,6 +289,19 @@ func getExpectedRequestMadeByClientFor(rv string) []string {
return expectedRequestMadeByClient
}
func getExpectedRequestMadeByClientWhenFallbackToListFor(rv string) []string {
expectedRequestMadeByClient := []string{
expectedStreamingRequestMadeByClient,
// corresponds to a list request made by the client
"",
}
if consistencydetector.IsDataConsistencyDetectionForListEnabled() {
// corresponds to a standard list request made by the consistency detector build in into the client
expectedRequestMadeByClient = append(expectedRequestMadeByClient, fmt.Sprintf("resourceVersion=%s&resourceVersionMatch=Exact", rv))
}
return expectedRequestMadeByClient
}
func addWellKnownSecrets(ctx context.Context, f *framework.Framework) []v1.Secret {
ginkgo.By(fmt.Sprintf("Adding 5 secrets to %s namespace", f.Namespace.Name))
var secrets []v1.Secret
@ -234,6 +313,20 @@ func addWellKnownSecrets(ctx context.Context, f *framework.Framework) []v1.Secre
return secrets
}
// addWellKnownUnstructuredSecrets exists because secrets from addWellKnownSecrets
// don't have type info and cannot be converted.
func addWellKnownUnstructuredSecrets(ctx context.Context, f *framework.Framework) []unstructured.Unstructured {
var secrets []unstructured.Unstructured
for i := 1; i <= 5; i++ {
unstructuredSecret, err := runtime.DefaultUnstructuredConverter.ToUnstructured(newSecret(fmt.Sprintf("secret-%d", i)))
framework.ExpectNoError(err)
secret, err := f.DynamicClient.Resource(v1.SchemeGroupVersion.WithResource("secrets")).Namespace(f.Namespace.Name).Create(ctx, &unstructured.Unstructured{Object: unstructuredSecret}, metav1.CreateOptions{})
framework.ExpectNoError(err)
secrets = append(secrets, *secret)
}
return secrets
}
type byName []v1.Secret
func (a byName) Len() int { return len(a) }