mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-21 09:34:40 +00:00
Merge pull request #127388 from p0lyn0mial/upstream-watchlist-meta-client
metadata client: add support for API streaming
This commit is contained in:
commit
1874039f82
@ -33,6 +33,8 @@ import (
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/util/consistencydetector"
|
||||
"k8s.io/client-go/util/watchlist"
|
||||
)
|
||||
|
||||
var deleteScheme = runtime.NewScheme()
|
||||
@ -218,6 +220,24 @@ func (c *client) Get(ctx context.Context, name string, opts metav1.GetOptions, s
|
||||
|
||||
// List returns all resources within the specified scope (namespace or cluster).
|
||||
func (c *client) List(ctx context.Context, opts metav1.ListOptions) (*metav1.PartialObjectMetadataList, error) {
|
||||
if watchListOptions, hasWatchListOptionsPrepared, watchListOptionsErr := watchlist.PrepareWatchListOptionsFromListOptions(opts); watchListOptionsErr != nil {
|
||||
klog.FromContext(ctx).Error(watchListOptionsErr, "Failed preparing watchlist options, falling back to the standard LIST semantics", "resource", c.resource)
|
||||
} else if hasWatchListOptionsPrepared {
|
||||
result, err := c.watchList(ctx, watchListOptions)
|
||||
if err == nil {
|
||||
consistencydetector.CheckWatchListFromCacheDataConsistencyIfRequested(ctx, fmt.Sprintf("watchlist request for %v", c.resource), c.list, opts, result)
|
||||
return result, nil
|
||||
}
|
||||
klog.FromContext(ctx).Error(err, "The watchlist request ended with an error, falling back to the standard LIST semantics", "resource", c.resource)
|
||||
}
|
||||
result, err := c.list(ctx, opts)
|
||||
if err == nil {
|
||||
consistencydetector.CheckListFromCacheDataConsistencyIfRequested(ctx, fmt.Sprintf("list request for %v", c.resource), c.list, opts, result)
|
||||
}
|
||||
return result, err
|
||||
}
|
||||
|
||||
func (c *client) list(ctx context.Context, opts metav1.ListOptions) (*metav1.PartialObjectMetadataList, error) {
|
||||
result := c.client.client.Get().AbsPath(c.makeURLSegments("")...).
|
||||
SetHeader("Accept", "application/vnd.kubernetes.protobuf;as=PartialObjectMetadataList;g=meta.k8s.io;v=v1,application/json;as=PartialObjectMetadataList;g=meta.k8s.io;v=v1,application/json").
|
||||
SpecificallyVersionedParams(&opts, dynamicParameterCodec, versionV1).
|
||||
@ -249,6 +269,25 @@ func (c *client) List(ctx context.Context, opts metav1.ListOptions) (*metav1.Par
|
||||
return partial, nil
|
||||
}
|
||||
|
||||
// watchList establishes a watch stream with the server and returns PartialObjectMetadataList.
|
||||
func (c *client) watchList(ctx context.Context, opts metav1.ListOptions) (*metav1.PartialObjectMetadataList, error) {
|
||||
var timeout time.Duration
|
||||
if opts.TimeoutSeconds != nil {
|
||||
timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
|
||||
}
|
||||
|
||||
result := &metav1.PartialObjectMetadataList{}
|
||||
err := c.client.client.Get().
|
||||
AbsPath(c.makeURLSegments("")...).
|
||||
SetHeader("Accept", "application/vnd.kubernetes.protobuf;as=PartialObjectMetadata;g=meta.k8s.io;v=v1,application/json;as=PartialObjectMetadata;g=meta.k8s.io;v=v1,application/json").
|
||||
SpecificallyVersionedParams(&opts, dynamicParameterCodec, versionV1).
|
||||
Timeout(timeout).
|
||||
WatchList(ctx).
|
||||
Into(result)
|
||||
|
||||
return result, err
|
||||
}
|
||||
|
||||
// Watch finds all changes to the resources in the specified scope (namespace or cluster).
|
||||
func (c *client) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
|
||||
var timeout time.Duration
|
||||
|
@ -31,12 +31,14 @@ import (
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/client-go/dynamic"
|
||||
clientfeatures "k8s.io/client-go/features"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/metadata"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/util/consistencydetector"
|
||||
@ -66,13 +68,7 @@ var _ = SIGDescribe("API Streaming (aka. WatchList)", framework.WithSerial(), fe
|
||||
nil,
|
||||
)
|
||||
|
||||
ginkgo.By(fmt.Sprintf("Adding 5 secrets to %s namespace", f.Namespace.Name))
|
||||
var expectedSecrets []v1.Secret
|
||||
for i := 1; i <= 5; i++ {
|
||||
secret, err := f.ClientSet.CoreV1().Secrets(f.Namespace.Name).Create(ctx, newSecret(fmt.Sprintf("secret-%d", i)), metav1.CreateOptions{})
|
||||
framework.ExpectNoError(err)
|
||||
expectedSecrets = append(expectedSecrets, *secret)
|
||||
}
|
||||
expectedSecrets := addWellKnownSecrets(ctx, f)
|
||||
|
||||
ginkgo.By("Starting the secret informer")
|
||||
go secretInformer.Run(stopCh)
|
||||
@ -99,13 +95,7 @@ var _ = SIGDescribe("API Streaming (aka. WatchList)", framework.WithSerial(), fe
|
||||
ginkgo.It("should be requested by client-go's List method when WatchListClient is enabled", func(ctx context.Context) {
|
||||
featuregatetesting.SetFeatureGateDuringTest(ginkgo.GinkgoTB(), utilfeature.DefaultFeatureGate, featuregate.Feature(clientfeatures.WatchListClient), true)
|
||||
|
||||
ginkgo.By(fmt.Sprintf("Adding 5 secrets to %s namespace", f.Namespace.Name))
|
||||
var expectedSecrets []v1.Secret
|
||||
for i := 1; i <= 5; i++ {
|
||||
secret, err := f.ClientSet.CoreV1().Secrets(f.Namespace.Name).Create(ctx, newSecret(fmt.Sprintf("secret-%d", i)), metav1.CreateOptions{})
|
||||
framework.ExpectNoError(err)
|
||||
expectedSecrets = append(expectedSecrets, *secret)
|
||||
}
|
||||
expectedSecrets := addWellKnownSecrets(ctx, f)
|
||||
|
||||
rt, clientConfig := clientConfigWithRoundTripper(f)
|
||||
wrappedKubeClient, err := kubernetes.NewForConfig(clientConfig)
|
||||
@ -152,6 +142,35 @@ var _ = SIGDescribe("API Streaming (aka. WatchList)", framework.WithSerial(), fe
|
||||
expectedRequestMadeByDynamicClient := getExpectedRequestMadeByClientFor(secretList.GetResourceVersion())
|
||||
gomega.Expect(rt.actualRequests).To(gomega.Equal(expectedRequestMadeByDynamicClient))
|
||||
})
|
||||
ginkgo.It("should be requested by metadata client's List method when WatchListClient is enabled", func(ctx context.Context) {
|
||||
featuregatetesting.SetFeatureGateDuringTest(ginkgo.GinkgoTB(), utilfeature.DefaultFeatureGate, featuregate.Feature(clientfeatures.WatchListClient), true)
|
||||
|
||||
metaClient, err := metadata.NewForConfig(f.ClientConfig())
|
||||
framework.ExpectNoError(err)
|
||||
expectedMetaSecrets := []metav1.PartialObjectMetadata{}
|
||||
for _, addedSecret := range addWellKnownSecrets(ctx, f) {
|
||||
addedSecretMeta, err := metaClient.Resource(v1.SchemeGroupVersion.WithResource("secrets")).Namespace(f.Namespace.Name).Get(ctx, addedSecret.Name, metav1.GetOptions{})
|
||||
framework.ExpectNoError(err)
|
||||
expectedMetaSecrets = append(expectedMetaSecrets, *addedSecretMeta)
|
||||
}
|
||||
|
||||
rt, clientConfig := clientConfigWithRoundTripper(f)
|
||||
wrappedMetaClient, err := metadata.NewForConfig(clientConfig)
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
ginkgo.By("Streaming secrets metadata from the server")
|
||||
secretMetaList, err := wrappedMetaClient.Resource(v1.SchemeGroupVersion.WithResource("secrets")).Namespace(f.Namespace.Name).List(ctx, metav1.ListOptions{})
|
||||
framework.ExpectNoError(err)
|
||||
|
||||
ginkgo.By("Verifying if the secret meta list was properly streamed")
|
||||
streamedMetaSecrets := secretMetaList.Items
|
||||
gomega.Expect(cmp.Equal(expectedMetaSecrets, streamedMetaSecrets)).To(gomega.BeTrueBecause("data received via watchlist must match the added data"))
|
||||
gomega.Expect(secretMetaList.GetObjectKind().GroupVersionKind()).To(gomega.Equal(schema.GroupVersion{}.WithKind("PartialObjectMetadataList")))
|
||||
|
||||
ginkgo.By("Verifying if expected requests were sent to the server")
|
||||
expectedRequestMadeByMetaClient := getExpectedRequestMadeByClientFor(secretMetaList.GetResourceVersion())
|
||||
gomega.Expect(rt.actualRequests).To(gomega.Equal(expectedRequestMadeByMetaClient))
|
||||
})
|
||||
})
|
||||
|
||||
type roundTripper struct {
|
||||
@ -204,6 +223,17 @@ func getExpectedRequestMadeByClientFor(rv string) []string {
|
||||
return expectedRequestMadeByClient
|
||||
}
|
||||
|
||||
func addWellKnownSecrets(ctx context.Context, f *framework.Framework) []v1.Secret {
|
||||
ginkgo.By(fmt.Sprintf("Adding 5 secrets to %s namespace", f.Namespace.Name))
|
||||
var secrets []v1.Secret
|
||||
for i := 1; i <= 5; i++ {
|
||||
secret, err := f.ClientSet.CoreV1().Secrets(f.Namespace.Name).Create(ctx, newSecret(fmt.Sprintf("secret-%d", i)), metav1.CreateOptions{})
|
||||
framework.ExpectNoError(err)
|
||||
secrets = append(secrets, *secret)
|
||||
}
|
||||
return secrets
|
||||
}
|
||||
|
||||
type byName []v1.Secret
|
||||
|
||||
func (a byName) Len() int { return len(a) }
|
||||
|
Loading…
Reference in New Issue
Block a user