client-go/metadata/metadatainformer: wrap the LW with WatchList semantics

Kubernetes-commit: 78fcb4475f4f1ff2d31830c057e8dcb45a1de128
This commit is contained in:
Lukasz Szaszkiewicz
2025-10-17 14:06:47 +02:00
committed by Kubernetes Publisher
parent 716ba150d1
commit fed267b819
3 changed files with 102 additions and 3 deletions

View File

@@ -32,9 +32,20 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/watchlist"
"k8s.io/klog/v2/ktesting"
)
func TestDoesClientSupportWatchListSemantics(t *testing.T) {
target, err := NewForConfig(&rest.Config{})
if err != nil {
t.Fatal(err)
}
if watchlist.DoesClientNotSupportWatchListSemantics(target) {
t.Fatalf("Metadata client should support WatchList semantics")
}
}
func TestClient(t *testing.T) {
gvr := schema.GroupVersionResource{Group: "group", Version: "v1", Resource: "resource"}
statusOK := &metav1.Status{

View File

@@ -178,7 +178,7 @@ func NewFilteredMetadataInformer(client metadata.Interface, gvr schema.GroupVers
return &metadataInformer{
gvr: gvr,
informer: cache.NewSharedIndexInformer(
&cache.ListWatch{
cache.ToListWatcherWithWatchListSemantics(&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
@@ -203,7 +203,7 @@ func NewFilteredMetadataInformer(client metadata.Interface, gvr schema.GroupVers
}
return client.Resource(gvr).Namespace(namespace).Watch(ctx, options)
},
},
}, client),
&metav1.PartialObjectMetadata{},
resyncPeriod,
indexers,

View File

@@ -19,18 +19,27 @@ package metadatainformer
import (
"context"
"flag"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"k8s.io/klog/v2"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/watch"
clientfeatures "k8s.io/client-go/features"
clientfeaturestesting "k8s.io/client-go/features/testing"
"k8s.io/client-go/metadata"
"k8s.io/client-go/metadata/fake"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
)
func init() {
@@ -39,6 +48,85 @@ func init() {
flag.CommandLine.Lookup("alsologtostderr").Value.Set("true")
}
func TestWatchListSemanticsSimple(t *testing.T) {
clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, true)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
obj := &metav1.PartialObjectMetadata{
TypeMeta: metav1.TypeMeta{
APIVersion: "meta.k8s.io/v1",
Kind: "PartialObjectMetadata",
},
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
metav1.InitialEventsAnnotationKey: "true",
},
},
}
rawObj, err := json.Marshal(obj)
if err != nil {
t.Errorf("failed to marshal rawObj: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
watchEvent := &metav1.WatchEvent{
Type: string(watch.Bookmark),
Object: runtime.RawExtension{Raw: rawObj},
}
rawRsp, err := json.Marshal(watchEvent)
if err != nil {
t.Errorf("failed to marshal watchEvent: %v", err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
_, err = w.Write(rawRsp)
if err != nil {
t.Fatalf("failed to write response: %v", err)
}
}))
defer server.Close()
cfg := &rest.Config{Host: server.URL}
client, err := metadata.NewForConfig(cfg)
if err != nil {
t.Fatal(err)
}
factory := NewFilteredSharedInformerFactory(client, 0, "ns", nil)
target := factory.ForResource(schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"})
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
factory.Start(ctx.Done())
if !cache.WaitForCacheSync(ctx.Done(), target.Informer().HasSynced) {
t.Fatalf("failed to wait for caches to sync")
}
}
func TestUnSupportWatchListSemantics(t *testing.T) {
clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, true)
scheme := runtime.NewScheme()
if err := appsv1.AddToScheme(scheme); err != nil {
t.Fatalf("failed to add appsv1 to scheme: %v", err)
}
// The fake client doesnt support WatchList semantics,
// so we dont need to prepare a response.
fakeClient := fake.NewSimpleMetadataClient(scheme)
factory := NewFilteredSharedInformerFactory(fakeClient, 0, "ns", nil)
target := factory.ForResource(schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"})
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
factory.Start(ctx.Done())
if !cache.WaitForCacheSync(ctx.Done(), target.Informer().HasSynced) {
t.Fatalf("failed to wait for caches to sync")
}
}
func TestMetadataSharedInformerFactory(t *testing.T) {
scenarios := []struct {
name string