mirror of
https://github.com/k3s-io/kubernetes.git
synced 2026-01-05 23:47:50 +00:00
Replace negotiation with a new method that can extract info
Alter how runtime.SerializeInfo is represented to simplify negotiation and reduce the need to allocate during negotiation. Simplify the dynamic client's logic around negotiating type. Add more tests for media type handling where necessary.
This commit is contained in:
@@ -18,6 +18,7 @@ package restclient
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"mime"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
@@ -153,34 +154,48 @@ func readExpBackoffConfig() BackoffManager {
|
||||
}
|
||||
|
||||
// createSerializers creates all necessary serializers for given contentType.
|
||||
// TODO: the negotiated serializer passed to this method should probably return
|
||||
// serializers that control decoding and versioning without this package
|
||||
// being aware of the types. Depends on whether RESTClient must deal with
|
||||
// generic infrastructure.
|
||||
func createSerializers(config ContentConfig) (*Serializers, error) {
|
||||
negotiated := config.NegotiatedSerializer
|
||||
mediaTypes := config.NegotiatedSerializer.SupportedMediaTypes()
|
||||
contentType := config.ContentType
|
||||
info, ok := negotiated.SerializerForMediaType(contentType, nil)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("serializer for %s not registered", contentType)
|
||||
mediaType, _, err := mime.ParseMediaType(contentType)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("the content type specified in the client configuration is not recognized: %v", err)
|
||||
}
|
||||
streamInfo, ok := negotiated.StreamingSerializerForMediaType(contentType, nil)
|
||||
info, ok := runtime.SerializerInfoForMediaType(mediaTypes, mediaType)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("streaming serializer for %s not registered", contentType)
|
||||
if len(contentType) != 0 || len(mediaTypes) == 0 {
|
||||
return nil, fmt.Errorf("no serializers registered for %s", contentType)
|
||||
}
|
||||
info = mediaTypes[0]
|
||||
}
|
||||
|
||||
internalGV := unversioned.GroupVersion{
|
||||
Group: config.GroupVersion.Group,
|
||||
Version: runtime.APIVersionInternal,
|
||||
}
|
||||
return &Serializers{
|
||||
Encoder: negotiated.EncoderForVersion(info.Serializer, *config.GroupVersion),
|
||||
Decoder: negotiated.DecoderToVersion(info.Serializer, internalGV),
|
||||
StreamingSerializer: streamInfo.Serializer,
|
||||
Framer: streamInfo.Framer,
|
||||
|
||||
s := &Serializers{
|
||||
Encoder: config.NegotiatedSerializer.EncoderForVersion(info.Serializer, *config.GroupVersion),
|
||||
Decoder: config.NegotiatedSerializer.DecoderToVersion(info.Serializer, internalGV),
|
||||
|
||||
RenegotiatedDecoder: func(contentType string, params map[string]string) (runtime.Decoder, error) {
|
||||
renegotiated, ok := negotiated.SerializerForMediaType(contentType, params)
|
||||
info, ok := runtime.SerializerInfoForMediaType(mediaTypes, contentType)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("serializer for %s not registered", contentType)
|
||||
}
|
||||
return negotiated.DecoderToVersion(renegotiated.Serializer, internalGV), nil
|
||||
return config.NegotiatedSerializer.DecoderToVersion(info.Serializer, internalGV), nil
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
if info.StreamSerializer != nil {
|
||||
s.StreamingSerializer = info.StreamSerializer.Serializer
|
||||
s.Framer = info.StreamSerializer.Framer
|
||||
}
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// Verb begins a request with a verb (GET, POST, PUT, DELETE).
|
||||
|
||||
@@ -153,20 +153,8 @@ var fakeWrapperFunc = func(http.RoundTripper) http.RoundTripper {
|
||||
|
||||
type fakeNegotiatedSerializer struct{}
|
||||
|
||||
func (n *fakeNegotiatedSerializer) SupportedMediaTypes() []string {
|
||||
return []string{}
|
||||
}
|
||||
|
||||
func (n *fakeNegotiatedSerializer) SerializerForMediaType(mediaType string, params map[string]string) (s runtime.SerializerInfo, ok bool) {
|
||||
return runtime.SerializerInfo{}, true
|
||||
}
|
||||
|
||||
func (n *fakeNegotiatedSerializer) SupportedStreamingMediaTypes() []string {
|
||||
return []string{}
|
||||
}
|
||||
|
||||
func (n *fakeNegotiatedSerializer) StreamingSerializerForMediaType(mediaType string, params map[string]string) (s runtime.StreamSerializerInfo, ok bool) {
|
||||
return runtime.StreamSerializerInfo{}, true
|
||||
func (n *fakeNegotiatedSerializer) SupportedMediaTypes() []runtime.SerializerInfo {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *fakeNegotiatedSerializer) EncoderForVersion(serializer runtime.Encoder, gv runtime.GroupVersioner) runtime.Encoder {
|
||||
|
||||
@@ -306,10 +306,7 @@ func setDiscoveryDefaults(config *restclient.Config) error {
|
||||
config.APIPath = ""
|
||||
config.GroupVersion = nil
|
||||
codec := runtime.NoopEncoder{Decoder: api.Codecs.UniversalDecoder()}
|
||||
config.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(
|
||||
runtime.SerializerInfo{Serializer: codec},
|
||||
runtime.StreamSerializerInfo{},
|
||||
)
|
||||
config.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(runtime.SerializerInfo{Serializer: codec})
|
||||
if len(config.UserAgent) == 0 {
|
||||
config.UserAgent = restclient.DefaultKubernetesUserAgent()
|
||||
}
|
||||
|
||||
@@ -241,13 +241,22 @@ func (dynamicCodec) Encode(obj runtime.Object, w io.Writer) error {
|
||||
|
||||
// ContentConfig returns a restclient.ContentConfig for dynamic types.
|
||||
func ContentConfig() restclient.ContentConfig {
|
||||
// TODO: it's questionable that this should be using anything other than unstructured schema and JSON
|
||||
codec := dynamicCodec{}
|
||||
streamingInfo, _ := api.Codecs.StreamingSerializerForMediaType("application/json;stream=watch", nil)
|
||||
var jsonInfo runtime.SerializerInfo
|
||||
// TODO: api.Codecs here should become "pkg/apis/server/scheme" which is the minimal core you need
|
||||
// to talk to a kubernetes server
|
||||
for _, info := range api.Codecs.SupportedMediaTypes() {
|
||||
if info.MediaType == runtime.ContentTypeJSON {
|
||||
jsonInfo = info
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
jsonInfo.Serializer = dynamicCodec{}
|
||||
jsonInfo.PrettySerializer = nil
|
||||
return restclient.ContentConfig{
|
||||
AcceptContentTypes: runtime.ContentTypeJSON,
|
||||
ContentType: runtime.ContentTypeJSON,
|
||||
NegotiatedSerializer: serializer.NegotiatedSerializerWrapper(runtime.SerializerInfo{Serializer: codec}, streamingInfo),
|
||||
NegotiatedSerializer: serializer.NegotiatedSerializerWrapper(jsonInfo),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -19,12 +19,9 @@ package dynamic
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/meta"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/client/restclient"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/runtime/serializer"
|
||||
)
|
||||
|
||||
// ClientPool manages a pool of dynamic clients.
|
||||
@@ -64,6 +61,7 @@ type clientPoolImpl struct {
|
||||
// resources or groups.
|
||||
func NewClientPool(config *restclient.Config, mapper meta.RESTMapper, apiPathResolverFunc APIPathResolverFunc) ClientPool {
|
||||
confCopy := *config
|
||||
|
||||
return &clientPoolImpl{
|
||||
config: &confCopy,
|
||||
clients: map[unversioned.GroupVersion]*Client{},
|
||||
@@ -108,11 +106,6 @@ func (c *clientPoolImpl) ClientForGroupVersionKind(kind unversioned.GroupVersion
|
||||
// we need to make a client
|
||||
conf.GroupVersion = &gv
|
||||
|
||||
if conf.NegotiatedSerializer == nil {
|
||||
streamingInfo, _ := api.Codecs.StreamingSerializerForMediaType("application/json;stream=watch", nil)
|
||||
conf.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(runtime.SerializerInfo{Serializer: dynamicCodec{}}, streamingInfo)
|
||||
}
|
||||
|
||||
dynamicClient, err := NewClient(conf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
@@ -529,6 +529,7 @@ func TestPatch(t *testing.T) {
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Write(data)
|
||||
})
|
||||
if err != nil {
|
||||
|
||||
@@ -92,25 +92,25 @@ func (c *RESTClient) request(verb string) *restclient.Request {
|
||||
GroupVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion,
|
||||
NegotiatedSerializer: c.NegotiatedSerializer,
|
||||
}
|
||||
ns := c.NegotiatedSerializer
|
||||
serializer, _ := ns.SerializerForMediaType(runtime.ContentTypeJSON, nil)
|
||||
streamingSerializer, _ := ns.StreamingSerializerForMediaType(runtime.ContentTypeJSON, nil)
|
||||
|
||||
groupName := api.GroupName
|
||||
if c.GroupName != "" {
|
||||
groupName = c.GroupName
|
||||
}
|
||||
|
||||
ns := c.NegotiatedSerializer
|
||||
info, _ := runtime.SerializerInfoForMediaType(ns.SupportedMediaTypes(), runtime.ContentTypeJSON)
|
||||
internalVersion := unversioned.GroupVersion{
|
||||
Group: registered.GroupOrDie(groupName).GroupVersion.Group,
|
||||
Version: runtime.APIVersionInternal,
|
||||
}
|
||||
internalVersion.Version = runtime.APIVersionInternal
|
||||
serializers := restclient.Serializers{
|
||||
Encoder: ns.EncoderForVersion(serializer, registered.GroupOrDie(api.GroupName).GroupVersion),
|
||||
Decoder: ns.DecoderToVersion(serializer, internalVersion),
|
||||
StreamingSerializer: streamingSerializer,
|
||||
Framer: streamingSerializer.Framer,
|
||||
Encoder: ns.EncoderForVersion(info.Serializer, registered.GroupOrDie(api.GroupName).GroupVersion),
|
||||
Decoder: ns.DecoderToVersion(info.Serializer, internalVersion),
|
||||
}
|
||||
if info.StreamSerializer != nil {
|
||||
serializers.StreamingSerializer = info.StreamSerializer.Serializer
|
||||
serializers.Framer = info.StreamSerializer.Framer
|
||||
}
|
||||
return restclient.NewRequest(c, verb, &url.URL{Host: "localhost"}, "", config, serializers, nil, nil)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user