diff --git a/metadata/metadata_test.go b/metadata/metadata_test.go index 8e2069265..905a70308 100644 --- a/metadata/metadata_test.go +++ b/metadata/metadata_test.go @@ -32,9 +32,20 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/rest" + "k8s.io/client-go/util/watchlist" "k8s.io/klog/v2/ktesting" ) +func TestDoesClientSupportWatchListSemantics(t *testing.T) { + target, err := NewForConfig(&rest.Config{}) + if err != nil { + t.Fatal(err) + } + if watchlist.DoesClientNotSupportWatchListSemantics(target) { + t.Fatalf("Metadata client should support WatchList semantics") + } +} + func TestClient(t *testing.T) { gvr := schema.GroupVersionResource{Group: "group", Version: "v1", Resource: "resource"} statusOK := &metav1.Status{ diff --git a/metadata/metadatainformer/informer.go b/metadata/metadatainformer/informer.go index 6eb0584fd..665198f12 100644 --- a/metadata/metadatainformer/informer.go +++ b/metadata/metadatainformer/informer.go @@ -178,7 +178,7 @@ func NewFilteredMetadataInformer(client metadata.Interface, gvr schema.GroupVers return &metadataInformer{ gvr: gvr, informer: cache.NewSharedIndexInformer( - &cache.ListWatch{ + cache.ToListWatcherWithWatchListSemantics(&cache.ListWatch{ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { if tweakListOptions != nil { tweakListOptions(&options) @@ -203,7 +203,7 @@ func NewFilteredMetadataInformer(client metadata.Interface, gvr schema.GroupVers } return client.Resource(gvr).Namespace(namespace).Watch(ctx, options) }, - }, + }, client), &metav1.PartialObjectMetadata{}, resyncPeriod, indexers, diff --git a/metadata/metadatainformer/informer_test.go b/metadata/metadatainformer/informer_test.go index f52dbf061..a1b9a99a6 100644 --- a/metadata/metadatainformer/informer_test.go +++ b/metadata/metadatainformer/informer_test.go @@ -19,18 +19,27 @@ package metadatainformer import ( "context" "flag" + "net/http" + "net/http/httptest" "testing" "time" "github.com/google/go-cmp/cmp" - "k8s.io/klog/v2" + appsv1 "k8s.io/api/apps/v1" "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/json" + "k8s.io/apimachinery/pkg/watch" + clientfeatures "k8s.io/client-go/features" + clientfeaturestesting "k8s.io/client-go/features/testing" + "k8s.io/client-go/metadata" "k8s.io/client-go/metadata/fake" + "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" ) func init() { @@ -39,6 +48,85 @@ func init() { flag.CommandLine.Lookup("alsologtostderr").Value.Set("true") } +func TestWatchListSemanticsSimple(t *testing.T) { + clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, true) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + obj := &metav1.PartialObjectMetadata{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "meta.k8s.io/v1", + Kind: "PartialObjectMetadata", + }, + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + metav1.InitialEventsAnnotationKey: "true", + }, + }, + } + rawObj, err := json.Marshal(obj) + if err != nil { + t.Errorf("failed to marshal rawObj: %v", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + watchEvent := &metav1.WatchEvent{ + Type: string(watch.Bookmark), + Object: runtime.RawExtension{Raw: rawObj}, + } + rawRsp, err := json.Marshal(watchEvent) + if err != nil { + t.Errorf("failed to marshal watchEvent: %v", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/json") + _, err = w.Write(rawRsp) + if err != nil { + t.Fatalf("failed to write response: %v", err) + } + })) + defer server.Close() + + cfg := &rest.Config{Host: server.URL} + client, err := metadata.NewForConfig(cfg) + if err != nil { + t.Fatal(err) + } + + factory := NewFilteredSharedInformerFactory(client, 0, "ns", nil) + target := factory.ForResource(schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + factory.Start(ctx.Done()) + + if !cache.WaitForCacheSync(ctx.Done(), target.Informer().HasSynced) { + t.Fatalf("failed to wait for caches to sync") + } +} + +func TestUnSupportWatchListSemantics(t *testing.T) { + clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, true) + scheme := runtime.NewScheme() + if err := appsv1.AddToScheme(scheme); err != nil { + t.Fatalf("failed to add appsv1 to scheme: %v", err) + } + // The fake client doesn’t support WatchList semantics, + // so we don’t need to prepare a response. + fakeClient := fake.NewSimpleMetadataClient(scheme) + factory := NewFilteredSharedInformerFactory(fakeClient, 0, "ns", nil) + target := factory.ForResource(schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"}) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + factory.Start(ctx.Done()) + + if !cache.WaitForCacheSync(ctx.Done(), target.Informer().HasSynced) { + t.Fatalf("failed to wait for caches to sync") + } +} + func TestMetadataSharedInformerFactory(t *testing.T) { scenarios := []struct { name string