Merge pull request #132229 from p0lyn0mial/upstream-watchlist-dynamic-client-rm

dynamic client: remove support for API streaming

Kubernetes-commit: 5abd2ac2e3900165d87c6078f55afd98ae6b085a
This commit is contained in:
Kubernetes Publisher 2025-06-12 00:45:03 -07:00
commit d33a6ba860
4 changed files with 7 additions and 229 deletions

View File

@ -19,7 +19,6 @@ package dynamic
import ( import (
"bytes" "bytes"
"context" "context"
"encoding/base64"
"fmt" "fmt"
"io" "io"
"net/http" "net/http"
@ -28,8 +27,6 @@ import (
"strings" "strings"
"testing" "testing"
"github.com/google/go-cmp/cmp"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
@ -37,8 +34,6 @@ import (
"k8s.io/apimachinery/pkg/runtime/serializer/streaming" "k8s.io/apimachinery/pkg/runtime/serializer/streaming"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
clientfeatures "k8s.io/client-go/features"
clientfeaturestesting "k8s.io/client-go/features/testing"
restclient "k8s.io/client-go/rest" restclient "k8s.io/client-go/rest"
restclientwatch "k8s.io/client-go/rest/watch" restclientwatch "k8s.io/client-go/rest/watch"
) )
@ -159,179 +154,6 @@ func TestList(t *testing.T) {
} }
} }
func TestWatchList(t *testing.T) {
clientfeaturestesting.SetFeatureDuringTest(t, clientfeatures.WatchListClient, true)
type requestParam struct {
Path string
Query string
}
scenarios := []struct {
name string
namespace string
watchResponse []watch.Event
listResponse []byte
expectedRequestParams []requestParam
expectedList *unstructured.UnstructuredList
}{
{
name: "watch-list request for cluster wide resource",
watchResponse: []watch.Event{
{Type: watch.Added, Object: getObject("gtest/vTest", "rTest", "item1")},
{Type: watch.Added, Object: getObject("gtest/vTest", "rTest", "item2")},
{Type: watch.Bookmark, Object: func() runtime.Object {
obj := getObject("gtest/vTest", "rTest", "item2")
obj.SetResourceVersion("10")
obj.SetAnnotations(map[string]string{
metav1.InitialEventsAnnotationKey: "true",
metav1.InitialEventsListBlueprintAnnotationKey: base64.StdEncoding.EncodeToString(getJSON("vTest", "rTests", "")),
})
return obj
}()},
},
expectedRequestParams: []requestParam{
{
Path: "/apis/gtest/vtest/rtest",
Query: "allowWatchBookmarks=true&resourceVersionMatch=NotOlderThan&sendInitialEvents=true&watch=true",
},
},
expectedList: &unstructured.UnstructuredList{
Object: map[string]interface{}{
"apiVersion": "vTest",
"kind": "rTests",
"metadata": map[string]interface{}{
"name": "",
"resourceVersion": "10",
},
},
Items: []unstructured.Unstructured{
*getObject("gtest/vTest", "rTest", "item1"),
*getObject("gtest/vTest", "rTest", "item2"),
},
},
},
{
name: "watch-list request for namespaced watch resource",
namespace: "nstest",
watchResponse: []watch.Event{
{Type: watch.Added, Object: getObject("gtest/vTest", "rTest", "item1")},
{Type: watch.Bookmark, Object: func() runtime.Object {
obj := getObject("gtest/vTest", "rTest", "item2")
obj.SetResourceVersion("39")
obj.SetAnnotations(map[string]string{
metav1.InitialEventsAnnotationKey: "true",
metav1.InitialEventsListBlueprintAnnotationKey: base64.StdEncoding.EncodeToString(getJSON("vTest", "rTests", "")),
})
return obj
}()},
},
expectedRequestParams: []requestParam{
{
Path: "/apis/gtest/vtest/namespaces/nstest/rtest",
Query: "allowWatchBookmarks=true&resourceVersionMatch=NotOlderThan&sendInitialEvents=true&watch=true",
},
},
expectedList: &unstructured.UnstructuredList{
Object: map[string]interface{}{
"apiVersion": "vTest",
"kind": "rTests",
"metadata": map[string]interface{}{
"name": "",
"resourceVersion": "39",
},
},
Items: []unstructured.Unstructured{
*getObject("gtest/vTest", "rTest", "item1"),
},
},
},
{
name: "watch-list request falls back to standard list on any error",
namespace: "nstest",
// watchList method in client-go expect only watch.Add and watch.Bookmark events
// receiving watch.Error will cause this method to report an error which will
// trigger the fallback logic
watchResponse: []watch.Event{
{Type: watch.Error, Object: getObject("gtest/vTest", "rTest", "item1")},
},
listResponse: getListJSON("vTest", "UnstructuredList",
getJSON("gtest/vTest", "rTest", "item1"),
getJSON("gtest/vTest", "rTest", "item2")),
expectedRequestParams: []requestParam{
// a watch-list request first
{
Path: "/apis/gtest/vtest/namespaces/nstest/rtest",
Query: "allowWatchBookmarks=true&resourceVersionMatch=NotOlderThan&sendInitialEvents=true&watch=true",
},
// a standard list request second
{
Path: "/apis/gtest/vtest/namespaces/nstest/rtest",
},
},
expectedList: &unstructured.UnstructuredList{
Object: map[string]interface{}{
"apiVersion": "vTest",
"kind": "UnstructuredList",
},
Items: []unstructured.Unstructured{
*getObject("gtest/vTest", "rTest", "item1"),
*getObject("gtest/vTest", "rTest", "item2"),
},
},
},
}
for _, scenario := range scenarios {
t.Run(scenario.name, func(t *testing.T) {
var actualRequestParams []requestParam
resource := schema.GroupVersionResource{Group: "gtest", Version: "vtest", Resource: "rtest"}
cl, srv, err := getClientServer(func(w http.ResponseWriter, r *http.Request) {
if r.Method != "GET" {
t.Errorf("unexpected HTTP method %s. expected GET", r.Method)
}
actualRequestParams = append(actualRequestParams, requestParam{
Path: r.URL.Path,
Query: r.URL.RawQuery,
})
w.Header().Set("Content-Type", runtime.ContentTypeJSON)
// handle LIST response
if len(scenario.listResponse) > 0 {
if _, err := w.Write(scenario.listResponse); err != nil {
t.Fatal(err)
}
return
}
// handle WATCH response
enc := restclientwatch.NewEncoder(streaming.NewEncoder(w, unstructured.UnstructuredJSONScheme), unstructured.UnstructuredJSONScheme)
for _, e := range scenario.watchResponse {
if err := enc.Encode(&e); err != nil {
t.Fatal(err)
}
}
})
if err != nil {
t.Fatalf("unexpected error when creating test client and server: %v", err)
}
defer srv.Close()
actualList, err := cl.Resource(resource).Namespace(scenario.namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !cmp.Equal(scenario.expectedRequestParams, actualRequestParams) {
t.Fatalf("unexpected request params: %v", cmp.Diff(scenario.expectedRequestParams, actualRequestParams))
}
if !cmp.Equal(scenario.expectedList, actualList) {
t.Errorf("received expected list, diff: %s", cmp.Diff(scenario.expectedList, actualList))
}
})
}
}
func TestGet(t *testing.T) { func TestGet(t *testing.T) {
tcs := []struct { tcs := []struct {
resource string resource string

View File

@ -19,9 +19,6 @@ package dynamic
import ( import (
"context" "context"
"fmt" "fmt"
"net/http"
"time"
"k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@ -31,9 +28,7 @@ import (
"k8s.io/client-go/features" "k8s.io/client-go/features"
"k8s.io/client-go/rest" "k8s.io/client-go/rest"
"k8s.io/client-go/util/apply" "k8s.io/client-go/util/apply"
"k8s.io/client-go/util/consistencydetector" "net/http"
"k8s.io/client-go/util/watchlist"
"k8s.io/klog/v2"
) )
type DynamicClient struct { type DynamicClient struct {
@ -251,24 +246,6 @@ func (c *dynamicResourceClient) Get(ctx context.Context, name string, opts metav
} }
func (c *dynamicResourceClient) List(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) { func (c *dynamicResourceClient) List(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) {
if watchListOptions, hasWatchListOptionsPrepared, watchListOptionsErr := watchlist.PrepareWatchListOptionsFromListOptions(opts); watchListOptionsErr != nil {
klog.Warningf("Failed preparing watchlist options for %v, falling back to the standard LIST semantics, err = %v", c.resource, watchListOptionsErr)
} else if hasWatchListOptionsPrepared {
result, err := c.watchList(ctx, watchListOptions)
if err == nil {
consistencydetector.CheckWatchListFromCacheDataConsistencyIfRequested(ctx, fmt.Sprintf("watchlist request for %v", c.resource), c.list, opts, result)
return result, nil
}
klog.Warningf("The watchlist request for %v ended with an error, falling back to the standard LIST semantics, err = %v", c.resource, err)
}
result, err := c.list(ctx, opts)
if err == nil {
consistencydetector.CheckListFromCacheDataConsistencyIfRequested(ctx, fmt.Sprintf("list request for %v", c.resource), c.list, opts, result)
}
return result, err
}
func (c *dynamicResourceClient) list(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) {
if err := validateNamespaceWithOptionalName(c.namespace); err != nil { if err := validateNamespaceWithOptionalName(c.namespace); err != nil {
return nil, err return nil, err
} }
@ -283,27 +260,6 @@ func (c *dynamicResourceClient) list(ctx context.Context, opts metav1.ListOption
return &out, nil return &out, nil
} }
// watchList establishes a watch stream with the server and returns an unstructured list.
func (c *dynamicResourceClient) watchList(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) {
if err := validateNamespaceWithOptionalName(c.namespace); err != nil {
return nil, err
}
var timeout time.Duration
if opts.TimeoutSeconds != nil {
timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
}
result := &unstructured.UnstructuredList{}
err := c.client.client.Get().AbsPath(c.makeURLSegments("")...).
SpecificallyVersionedParams(&opts, dynamicParameterCodec, versionV1).
Timeout(timeout).
WatchList(ctx).
Into(result)
return result, err
}
func (c *dynamicResourceClient) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { func (c *dynamicResourceClient) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
opts.Watch = true opts.Watch = true
if err := validateNamespaceWithOptionalName(c.namespace); err != nil { if err := validateNamespaceWithOptionalName(c.namespace); err != nil {

4
go.mod
View File

@ -25,8 +25,8 @@ require (
golang.org/x/time v0.9.0 golang.org/x/time v0.9.0
google.golang.org/protobuf v1.36.5 google.golang.org/protobuf v1.36.5
gopkg.in/evanphx/json-patch.v4 v4.12.0 gopkg.in/evanphx/json-patch.v4 v4.12.0
k8s.io/api v0.0.0-20250610195658-be9d9f7dc6a8 k8s.io/api v0.0.0-20250612075647-2f1ca0f4e40c
k8s.io/apimachinery v0.0.0-20250606195423-d5745c2f38f8 k8s.io/apimachinery v0.0.0-20250612075405-9ad036216e83
k8s.io/klog/v2 v2.130.1 k8s.io/klog/v2 v2.130.1
k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff
k8s.io/utils v0.0.0-20250604170112-4c0f3b243397 k8s.io/utils v0.0.0-20250604170112-4c0f3b243397

8
go.sum
View File

@ -146,10 +146,10 @@ gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
k8s.io/api v0.0.0-20250610195658-be9d9f7dc6a8 h1:FqgJO31JFCAbGW7/3ec2lOgfBIw1v0cndlJBrN58umk= k8s.io/api v0.0.0-20250612075647-2f1ca0f4e40c h1:8pRhXap6JbPdOUojXUb8cyo/1JNqX/TJCpaHMZIwSWM=
k8s.io/api v0.0.0-20250610195658-be9d9f7dc6a8/go.mod h1:xYU4p3Ir8msMLqBi9mgeDAgHcf2fNo6NKYkkYFogdEI= k8s.io/api v0.0.0-20250612075647-2f1ca0f4e40c/go.mod h1:dB84Z2NTVsuY1l3HM8lCbv+ofC6Nq/zLmUJP86ufqKo=
k8s.io/apimachinery v0.0.0-20250606195423-d5745c2f38f8 h1:A1dmKRZ8nLaHG91IF7/hYkS+gxL+nbK2IbE0bpr+vZM= k8s.io/apimachinery v0.0.0-20250612075405-9ad036216e83 h1:c5S8r9Ce8xU4WgO9OOk19pSp85bk/gwtYhXTvApMGnI=
k8s.io/apimachinery v0.0.0-20250606195423-d5745c2f38f8/go.mod h1:PGtm8W98oUVV5mUJGWUaZ+YbAQNG2JrGPKDZ+9v87rk= k8s.io/apimachinery v0.0.0-20250612075405-9ad036216e83/go.mod h1:PGtm8W98oUVV5mUJGWUaZ+YbAQNG2JrGPKDZ+9v87rk=
k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk=
k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE=
k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff h1:/usPimJzUKKu+m+TE36gUyGcf03XZEP0ZIKgKj35LS4= k8s.io/kube-openapi v0.0.0-20250318190949-c8a335a9a2ff h1:/usPimJzUKKu+m+TE36gUyGcf03XZEP0ZIKgKj35LS4=