mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-30 23:15:14 +00:00
dynamic: The dynamic client no longer needs a special cased watch
By correctly handling content type negotiation, we can avoid the need for a special version of watch and use the same code path as typed clients.
This commit is contained in:
parent
9aad6aa54d
commit
3f94f80b0a
@ -40,8 +40,6 @@ go_library(
|
|||||||
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer/json:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer/streaming:go_default_library",
|
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer/versioning:go_default_library",
|
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/rest:go_default_library",
|
"//staging/src/k8s.io/client-go/rest:go_default_library",
|
||||||
|
@ -537,6 +537,8 @@ func TestWatch(t *testing.T) {
|
|||||||
t.Errorf("Watch(%q) got query %s. wanted %s", tc.name, r.URL.RawQuery, tc.query)
|
t.Errorf("Watch(%q) got query %s. wanted %s", tc.name, r.URL.RawQuery, tc.query)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
|
||||||
enc := restclientwatch.NewEncoder(streaming.NewEncoder(w, unstructured.UnstructuredJSONScheme), unstructured.UnstructuredJSONScheme)
|
enc := restclientwatch.NewEncoder(streaming.NewEncoder(w, unstructured.UnstructuredJSONScheme), unstructured.UnstructuredJSONScheme)
|
||||||
for _, e := range tc.events {
|
for _, e := range tc.events {
|
||||||
enc.Encode(&e)
|
enc.Encode(&e)
|
||||||
|
@ -18,11 +18,11 @@ package dynamic
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||||
"k8s.io/apimachinery/pkg/runtime/serializer/json"
|
"k8s.io/apimachinery/pkg/runtime/serializer/json"
|
||||||
"k8s.io/apimachinery/pkg/runtime/serializer/versioning"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var watchScheme = runtime.NewScheme()
|
var watchScheme = runtime.NewScheme()
|
||||||
@ -41,37 +41,6 @@ func init() {
|
|||||||
metav1.AddToGroupVersion(deleteScheme, versionV1)
|
metav1.AddToGroupVersion(deleteScheme, versionV1)
|
||||||
}
|
}
|
||||||
|
|
||||||
var watchJsonSerializerInfo = runtime.SerializerInfo{
|
|
||||||
MediaType: "application/json",
|
|
||||||
MediaTypeType: "application",
|
|
||||||
MediaTypeSubType: "json",
|
|
||||||
EncodesAsText: true,
|
|
||||||
Serializer: json.NewSerializer(json.DefaultMetaFactory, watchScheme, watchScheme, false),
|
|
||||||
PrettySerializer: json.NewSerializer(json.DefaultMetaFactory, watchScheme, watchScheme, true),
|
|
||||||
StreamSerializer: &runtime.StreamSerializerInfo{
|
|
||||||
EncodesAsText: true,
|
|
||||||
Serializer: json.NewSerializer(json.DefaultMetaFactory, watchScheme, watchScheme, false),
|
|
||||||
Framer: json.Framer,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
// watchNegotiatedSerializer is used to read the wrapper of the watch stream
|
|
||||||
type watchNegotiatedSerializer struct{}
|
|
||||||
|
|
||||||
var watchNegotiatedSerializerInstance = watchNegotiatedSerializer{}
|
|
||||||
|
|
||||||
func (s watchNegotiatedSerializer) SupportedMediaTypes() []runtime.SerializerInfo {
|
|
||||||
return []runtime.SerializerInfo{watchJsonSerializerInfo}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s watchNegotiatedSerializer) EncoderForVersion(encoder runtime.Encoder, gv runtime.GroupVersioner) runtime.Encoder {
|
|
||||||
return versioning.NewDefaultingCodecForScheme(watchScheme, encoder, nil, gv, nil)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s watchNegotiatedSerializer) DecoderToVersion(decoder runtime.Decoder, gv runtime.GroupVersioner) runtime.Decoder {
|
|
||||||
return versioning.NewDefaultingCodecForScheme(watchScheme, nil, decoder, nil, gv)
|
|
||||||
}
|
|
||||||
|
|
||||||
// basicNegotiatedSerializer is used to handle discovery and error handling serialization
|
// basicNegotiatedSerializer is used to handle discovery and error handling serialization
|
||||||
type basicNegotiatedSerializer struct{}
|
type basicNegotiatedSerializer struct{}
|
||||||
|
|
||||||
@ -82,8 +51,8 @@ func (s basicNegotiatedSerializer) SupportedMediaTypes() []runtime.SerializerInf
|
|||||||
MediaTypeType: "application",
|
MediaTypeType: "application",
|
||||||
MediaTypeSubType: "json",
|
MediaTypeSubType: "json",
|
||||||
EncodesAsText: true,
|
EncodesAsText: true,
|
||||||
Serializer: json.NewSerializer(json.DefaultMetaFactory, basicScheme, basicScheme, false),
|
Serializer: json.NewSerializer(json.DefaultMetaFactory, unstructuredCreater{basicScheme}, unstructuredTyper{basicScheme}, false),
|
||||||
PrettySerializer: json.NewSerializer(json.DefaultMetaFactory, basicScheme, basicScheme, true),
|
PrettySerializer: json.NewSerializer(json.DefaultMetaFactory, unstructuredCreater{basicScheme}, unstructuredTyper{basicScheme}, true),
|
||||||
StreamSerializer: &runtime.StreamSerializerInfo{
|
StreamSerializer: &runtime.StreamSerializerInfo{
|
||||||
EncodesAsText: true,
|
EncodesAsText: true,
|
||||||
Serializer: json.NewSerializer(json.DefaultMetaFactory, basicScheme, basicScheme, false),
|
Serializer: json.NewSerializer(json.DefaultMetaFactory, basicScheme, basicScheme, false),
|
||||||
@ -94,9 +63,46 @@ func (s basicNegotiatedSerializer) SupportedMediaTypes() []runtime.SerializerInf
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s basicNegotiatedSerializer) EncoderForVersion(encoder runtime.Encoder, gv runtime.GroupVersioner) runtime.Encoder {
|
func (s basicNegotiatedSerializer) EncoderForVersion(encoder runtime.Encoder, gv runtime.GroupVersioner) runtime.Encoder {
|
||||||
return versioning.NewDefaultingCodecForScheme(watchScheme, encoder, nil, gv, nil)
|
return runtime.WithVersionEncoder{
|
||||||
|
Version: gv,
|
||||||
|
Encoder: encoder,
|
||||||
|
ObjectTyper: unstructuredTyper{basicScheme},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s basicNegotiatedSerializer) DecoderToVersion(decoder runtime.Decoder, gv runtime.GroupVersioner) runtime.Decoder {
|
func (s basicNegotiatedSerializer) DecoderToVersion(decoder runtime.Decoder, gv runtime.GroupVersioner) runtime.Decoder {
|
||||||
return versioning.NewDefaultingCodecForScheme(watchScheme, nil, decoder, nil, gv)
|
return decoder
|
||||||
|
}
|
||||||
|
|
||||||
|
type unstructuredCreater struct {
|
||||||
|
nested runtime.ObjectCreater
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c unstructuredCreater) New(kind schema.GroupVersionKind) (runtime.Object, error) {
|
||||||
|
out, err := c.nested.New(kind)
|
||||||
|
if err == nil {
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
out = &unstructured.Unstructured{}
|
||||||
|
out.GetObjectKind().SetGroupVersionKind(kind)
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type unstructuredTyper struct {
|
||||||
|
nested runtime.ObjectTyper
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t unstructuredTyper) ObjectKinds(obj runtime.Object) ([]schema.GroupVersionKind, bool, error) {
|
||||||
|
kinds, unversioned, err := t.nested.ObjectKinds(obj)
|
||||||
|
if err == nil {
|
||||||
|
return kinds, unversioned, nil
|
||||||
|
}
|
||||||
|
if _, ok := obj.(runtime.Unstructured); ok && !obj.GetObjectKind().GroupVersionKind().Empty() {
|
||||||
|
return []schema.GroupVersionKind{obj.GetObjectKind().GroupVersionKind()}, false, nil
|
||||||
|
}
|
||||||
|
return nil, false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t unstructuredTyper) Recognizes(gvk schema.GroupVersionKind) bool {
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
@ -18,14 +18,12 @@ package dynamic
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
|
|
||||||
"k8s.io/apimachinery/pkg/api/meta"
|
"k8s.io/apimachinery/pkg/api/meta"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
|
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
"k8s.io/apimachinery/pkg/watch"
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
"k8s.io/client-go/rest"
|
"k8s.io/client-go/rest"
|
||||||
@ -282,31 +280,10 @@ func (c *dynamicResourceClient) List(opts metav1.ListOptions) (*unstructured.Uns
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *dynamicResourceClient) Watch(opts metav1.ListOptions) (watch.Interface, error) {
|
func (c *dynamicResourceClient) Watch(opts metav1.ListOptions) (watch.Interface, error) {
|
||||||
internalGV := schema.GroupVersions{
|
|
||||||
{Group: c.resource.Group, Version: runtime.APIVersionInternal},
|
|
||||||
// always include the legacy group as a decoding target to handle non-error `Status` return types
|
|
||||||
{Group: "", Version: runtime.APIVersionInternal},
|
|
||||||
}
|
|
||||||
s := &rest.Serializers{
|
|
||||||
Encoder: watchNegotiatedSerializerInstance.EncoderForVersion(watchJsonSerializerInfo.Serializer, c.resource.GroupVersion()),
|
|
||||||
Decoder: watchNegotiatedSerializerInstance.DecoderToVersion(watchJsonSerializerInfo.Serializer, internalGV),
|
|
||||||
|
|
||||||
RenegotiatedDecoder: func(contentType string, params map[string]string) (runtime.Decoder, error) {
|
|
||||||
return watchNegotiatedSerializerInstance.DecoderToVersion(watchJsonSerializerInfo.Serializer, internalGV), nil
|
|
||||||
},
|
|
||||||
StreamingSerializer: watchJsonSerializerInfo.StreamSerializer.Serializer,
|
|
||||||
Framer: watchJsonSerializerInfo.StreamSerializer.Framer,
|
|
||||||
}
|
|
||||||
|
|
||||||
wrappedDecoderFn := func(body io.ReadCloser) streaming.Decoder {
|
|
||||||
framer := s.Framer.NewFrameReader(body)
|
|
||||||
return streaming.NewDecoder(framer, s.StreamingSerializer)
|
|
||||||
}
|
|
||||||
|
|
||||||
opts.Watch = true
|
opts.Watch = true
|
||||||
return c.client.client.Get().AbsPath(c.makeURLSegments("")...).
|
return c.client.client.Get().AbsPath(c.makeURLSegments("")...).
|
||||||
SpecificallyVersionedParams(&opts, dynamicParameterCodec, versionV1).
|
SpecificallyVersionedParams(&opts, dynamicParameterCodec, versionV1).
|
||||||
WatchWithSpecificDecoders(wrappedDecoderFn, unstructured.UnstructuredJSONScheme)
|
Watch()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *dynamicResourceClient) Patch(name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (*unstructured.Unstructured, error) {
|
func (c *dynamicResourceClient) Patch(name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (*unstructured.Unstructured, error) {
|
||||||
|
Loading…
Reference in New Issue
Block a user