dynamic client: add support for API streaming

Kubernetes-commit: d778356bc6a057ae41bee4577e568293a25fce9b
This commit is contained in:
Lukasz Szaszkiewicz
2024-07-11 13:49:29 +02:00
committed by Kubernetes Publisher
parent 34751e103a
commit 732dd289a0
2 changed files with 207 additions and 5 deletions

View File

@@ -20,6 +20,7 @@ import (
"context"
"fmt"
"net/http"
"time"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -30,6 +31,8 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/consistencydetector"
"k8s.io/client-go/util/watchlist"
"k8s.io/klog/v2"
)
type DynamicClient struct {
@@ -293,13 +296,22 @@ func (c *dynamicResourceClient) Get(ctx context.Context, name string, opts metav
return uncastObj.(*unstructured.Unstructured), nil
}
func (c *dynamicResourceClient) List(ctx context.Context, opts metav1.ListOptions) (result *unstructured.UnstructuredList, err error) {
defer func() {
func (c *dynamicResourceClient) List(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) {
if watchListOptions, hasWatchListOptionsPrepared, watchListOptionsErr := watchlist.PrepareWatchListOptionsFromListOptions(opts); watchListOptionsErr != nil {
klog.Warningf("Failed preparing watchlist options for %v, falling back to the standard LIST semantics, err = %v", c.resource, watchListOptionsErr)
} else if hasWatchListOptionsPrepared {
result, err := c.watchList(ctx, watchListOptions)
if err == nil {
consistencydetector.CheckListFromCacheDataConsistencyIfRequested(ctx, fmt.Sprintf("list request for %v", c.resource), c.list, opts, result)
consistencydetector.CheckWatchListFromCacheDataConsistencyIfRequested(ctx, fmt.Sprintf("watchlist request for %v", c.resource), c.list, opts, result)
return result, nil
}
}()
return c.list(ctx, opts)
klog.Warningf("The watchlist request for %v ended with an error, falling back to the standard LIST semantics, err = %v", c.resource, err)
}
result, err := c.list(ctx, opts)
if err == nil {
consistencydetector.CheckListFromCacheDataConsistencyIfRequested(ctx, fmt.Sprintf("list request for %v", c.resource), c.list, opts, result)
}
return result, err
}
func (c *dynamicResourceClient) list(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) {
@@ -329,6 +341,27 @@ func (c *dynamicResourceClient) list(ctx context.Context, opts metav1.ListOption
return list, nil
}
// watchList establishes a watch stream with the server and returns an unstructured list.
func (c *dynamicResourceClient) watchList(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) {
if err := validateNamespaceWithOptionalName(c.namespace); err != nil {
return nil, err
}
var timeout time.Duration
if opts.TimeoutSeconds != nil {
timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
}
result := &unstructured.UnstructuredList{}
err := c.client.client.Get().AbsPath(c.makeURLSegments("")...).
SpecificallyVersionedParams(&opts, dynamicParameterCodec, versionV1).
Timeout(timeout).
WatchList(ctx).
Into(result)
return result, err
}
func (c *dynamicResourceClient) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
opts.Watch = true
if err := validateNamespaceWithOptionalName(c.namespace); err != nil {