Merge pull request #24789 from wojtek-t/use_proper_codec_in_client

Automatic merge from submit-queue

Use proper codec in client
This commit is contained in:
k8s-merge-robot 2016-05-04 11:00:04 -07:00
commit 93e3df8e55
41 changed files with 343 additions and 149 deletions

View File

@ -204,7 +204,8 @@ func setConfigDefaults(config *$.Config|raw$) error {
config.GroupVersion = &copyGroupVersion
//}
config.Codec = $.codecs|raw$.LegacyCodec(*config.GroupVersion)
config.NegotiatedSerializer = $.codecs|raw$
if config.QPS == 0 {
config.QPS = 5
}
@ -232,11 +233,7 @@ func setConfigDefaults(config *$.Config|raw$) error {
config.GroupVersion = &copyGroupVersion
//}
codec, ok := $.codecs|raw$.SerializerForFileExtension("json")
if !ok {
return $.Errorf|raw$("unable to find serializer for JSON")
}
config.Codec = codec
config.NegotiatedSerializer = $.codecs|raw$
if config.QPS == 0 {
config.QPS = 5

View File

@ -80,7 +80,8 @@ func setConfigDefaults(config *restclient.Config) error {
config.GroupVersion = &copyGroupVersion
//}
config.Codec = api.Codecs.LegacyCodec(*config.GroupVersion)
config.NegotiatedSerializer = api.Codecs
if config.QPS == 0 {
config.QPS = 5
}

View File

@ -89,6 +89,7 @@ func NewTestServer(t *testing.T, namespace string, mockPodListWatch *MockPodsLis
mux := http.NewServeMux()
podListHandler := func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
pods := mockPodListWatch.Pods()
w.Write([]byte(runtime.EncodeOrDie(testapi.Default.Codec(), &pods)))
@ -106,6 +107,7 @@ func NewTestServer(t *testing.T, namespace string, mockPodListWatch *MockPodsLis
ts.stats[name] = ts.stats[name] + 1
p := mockPodListWatch.Pod(name)
w.Header().Set("Content-Type", "application/json")
if p != nil {
w.WriteHeader(http.StatusOK)
w.Write([]byte(runtime.EncodeOrDie(testapi.Default.Codec(), p)))
@ -117,6 +119,7 @@ func NewTestServer(t *testing.T, namespace string, mockPodListWatch *MockPodsLis
mux.HandleFunc(
testapi.Default.ResourcePath("events", namespace, ""),
func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
},
)
@ -125,6 +128,7 @@ func NewTestServer(t *testing.T, namespace string, mockPodListWatch *MockPodsLis
testapi.Default.ResourcePath("nodes", "", ""),
func(w http.ResponseWriter, r *http.Request) {
var node api.Node
w.Header().Set("Content-Type", "application/json")
if err := json.NewDecoder(r.Body).Decode(&node); err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
@ -144,6 +148,7 @@ func NewTestServer(t *testing.T, namespace string, mockPodListWatch *MockPodsLis
mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) {
t.Errorf("unexpected request: %v", req.RequestURI)
res.Header().Set("Content-Type", "application/json")
res.WriteHeader(http.StatusNotFound)
})

View File

@ -45,13 +45,14 @@ import (
)
var (
Groups = make(map[string]TestGroup)
Default TestGroup
Autoscaling TestGroup
Batch TestGroup
Extensions TestGroup
Apps TestGroup
Federation TestGroup
Groups = make(map[string]TestGroup)
Default TestGroup
Autoscaling TestGroup
Batch TestGroup
Extensions TestGroup
Apps TestGroup
Federation TestGroup
NegotiatedSerializer = api.Codecs
)
type TestGroup struct {

View File

@ -80,7 +80,8 @@ func setConfigDefaults(config *restclient.Config) error {
config.GroupVersion = &copyGroupVersion
//}
config.Codec = api.Codecs.LegacyCodec(*config.GroupVersion)
config.NegotiatedSerializer = api.Codecs
if config.QPS == 0 {
config.QPS = 5
}

View File

@ -155,7 +155,8 @@ func setConfigDefaults(config *restclient.Config) error {
config.GroupVersion = &copyGroupVersion
//}
config.Codec = api.Codecs.LegacyCodec(*config.GroupVersion)
config.NegotiatedSerializer = api.Codecs
if config.QPS == 0 {
config.QPS = 5
}

View File

@ -110,7 +110,8 @@ func setConfigDefaults(config *restclient.Config) error {
config.GroupVersion = &copyGroupVersion
//}
config.Codec = api.Codecs.LegacyCodec(*config.GroupVersion)
config.NegotiatedSerializer = api.Codecs
if config.QPS == 0 {
config.QPS = 5
}

View File

@ -149,7 +149,8 @@ func setConfigDefaults(config *restclient.Config) error {
config.GroupVersion = &copyGroupVersion
//}
config.Codec = api.Codecs.LegacyCodec(*config.GroupVersion)
config.NegotiatedSerializer = api.Codecs
if config.QPS == 0 {
config.QPS = 5
}

View File

@ -114,7 +114,8 @@ func setConfigDefaults(config *restclient.Config) error {
config.GroupVersion = &copyGroupVersion
//}
config.Codec = api.Codecs.LegacyCodec(*config.GroupVersion)
config.NegotiatedSerializer = api.Codecs
if config.QPS == 0 {
config.QPS = 5
}

View File

@ -17,7 +17,6 @@ limitations under the License.
package v1
import (
fmt "fmt"
api "k8s.io/kubernetes/pkg/api"
registered "k8s.io/kubernetes/pkg/apimachinery/registered"
restclient "k8s.io/kubernetes/pkg/client/restclient"
@ -150,11 +149,7 @@ func setConfigDefaults(config *restclient.Config) error {
config.GroupVersion = &copyGroupVersion
//}
codec, ok := api.Codecs.SerializerForFileExtension("json")
if !ok {
return fmt.Errorf("unable to find serializer for JSON")
}
config.Codec = codec
config.NegotiatedSerializer = api.Codecs
if config.QPS == 0 {
config.QPS = 5

View File

@ -17,7 +17,6 @@ limitations under the License.
package v1beta1
import (
fmt "fmt"
api "k8s.io/kubernetes/pkg/api"
registered "k8s.io/kubernetes/pkg/apimachinery/registered"
restclient "k8s.io/kubernetes/pkg/client/restclient"
@ -115,11 +114,7 @@ func setConfigDefaults(config *restclient.Config) error {
config.GroupVersion = &copyGroupVersion
//}
codec, ok := api.Codecs.SerializerForFileExtension("json")
if !ok {
return fmt.Errorf("unable to find serializer for JSON")
}
config.Codec = codec
config.NegotiatedSerializer = api.Codecs
if config.QPS == 0 {
config.QPS = 5

View File

@ -17,6 +17,7 @@ limitations under the License.
package restclient
import (
"fmt"
"net/http"
"net/url"
"os"
@ -53,6 +54,9 @@ type RESTClient struct {
// contentConfig is the information used to communicate with the server.
contentConfig ContentConfig
// serializers contain all serializers for undelying content type.
serializers Serializers
// TODO extract this into a wrapper interface via the RESTClient interface in kubectl.
Throttle flowcontrol.RateLimiter
@ -60,10 +64,17 @@ type RESTClient struct {
Client *http.Client
}
type Serializers struct {
Encoder runtime.Encoder
Decoder runtime.Decoder
StreamingSerializer runtime.Serializer
Framer runtime.Framer
}
// NewRESTClient creates a new RESTClient. This client performs generic REST functions
// such as Get, Put, Post, and Delete on specified paths. Codec controls encoding and
// decoding of responses from the server.
func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConfig, maxQPS float32, maxBurst int, rateLimiter flowcontrol.RateLimiter, client *http.Client) *RESTClient {
func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConfig, maxQPS float32, maxBurst int, rateLimiter flowcontrol.RateLimiter, client *http.Client) (*RESTClient, error) {
base := *baseURL
if !strings.HasSuffix(base.Path, "/") {
base.Path += "/"
@ -77,6 +88,10 @@ func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConf
if len(config.ContentType) == 0 {
config.ContentType = "application/json"
}
serializers, err := createSerializers(config)
if err != nil {
return nil, err
}
var throttle flowcontrol.RateLimiter
if maxQPS > 0 && rateLimiter == nil {
@ -88,9 +103,10 @@ func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConf
base: &base,
versionedAPIPath: versionedAPIPath,
contentConfig: config,
serializers: *serializers,
Throttle: throttle,
Client: client,
}
}, nil
}
// GetRateLimiter returns rate limier for a given client, or nil if it's called on a nil client
@ -119,10 +135,38 @@ func readExpBackoffConfig() BackoffManager {
time.Duration(backoffDurationInt)*time.Second)}
}
// createSerializers creates all necessary serializers for given contentType.
func createSerializers(config ContentConfig) (*Serializers, error) {
negotiated := config.NegotiatedSerializer
contentType := config.ContentType
serializer, ok := negotiated.SerializerForMediaType(contentType, nil)
if !ok {
return nil, fmt.Errorf("serializer for %s not registered", contentType)
}
streamingSerializer, framer, _, ok := negotiated.StreamingSerializerForMediaType(contentType, nil)
if !ok {
return nil, fmt.Errorf("streaming serializer for %s not registered", contentType)
}
if framer == nil {
return nil, fmt.Errorf("no framer for %s", contentType)
}
internalGV := unversioned.GroupVersion{
Group: config.GroupVersion.Group,
Version: runtime.APIVersionInternal,
}
return &Serializers{
Encoder: negotiated.EncoderForVersion(serializer, *config.GroupVersion),
Decoder: negotiated.DecoderToVersion(serializer, internalGV),
StreamingSerializer: streamingSerializer,
Framer: framer,
}, nil
}
// Verb begins a request with a verb (GET, POST, PUT, DELETE).
//
// Example usage of RESTClient's request building interface:
// c := NewRESTClient(url, codec)
// c, err := NewRESTClient(...)
// if err != nil { ... }
// resp, err := c.Verb("GET").
// Path("pods").
// SelectorParam("labels", "area=staging").
@ -135,9 +179,9 @@ func (c *RESTClient) Verb(verb string) *Request {
backoff := readExpBackoffConfig()
if c.Client == nil {
return NewRequest(nil, verb, c.base, c.versionedAPIPath, c.contentConfig, backoff, c.Throttle)
return NewRequest(nil, verb, c.base, c.versionedAPIPath, c.contentConfig, c.serializers, backoff, c.Throttle)
}
return NewRequest(c.Client, verb, c.base, c.versionedAPIPath, c.contentConfig, backoff, c.Throttle)
return NewRequest(c.Client, verb, c.base, c.versionedAPIPath, c.contentConfig, c.serializers, backoff, c.Throttle)
}
// Post begins a POST request. Short for c.Verb("POST").

View File

@ -46,8 +46,8 @@ func TestDoRequestSuccess(t *testing.T) {
c, err := RESTClientFor(&Config{
Host: testServer.URL,
ContentConfig: ContentConfig{
GroupVersion: testapi.Default.GroupVersion(),
Codec: testapi.Default.Codec(),
GroupVersion: testapi.Default.GroupVersion(),
NegotiatedSerializer: testapi.NegotiatedSerializer,
},
Username: "user",
Password: "pass",
@ -91,8 +91,8 @@ func TestDoRequestFailed(t *testing.T) {
c, err := RESTClientFor(&Config{
Host: testServer.URL,
ContentConfig: ContentConfig{
GroupVersion: testapi.Default.GroupVersion(),
Codec: testapi.Default.Codec(),
GroupVersion: testapi.Default.GroupVersion(),
NegotiatedSerializer: testapi.NegotiatedSerializer,
},
})
if err != nil {
@ -129,8 +129,8 @@ func TestDoRequestCreated(t *testing.T) {
c, err := RESTClientFor(&Config{
Host: testServer.URL,
ContentConfig: ContentConfig{
GroupVersion: testapi.Default.GroupVersion(),
Codec: testapi.Default.Codec(),
GroupVersion: testapi.Default.GroupVersion(),
NegotiatedSerializer: testapi.NegotiatedSerializer,
},
Username: "user",
Password: "pass",

View File

@ -130,9 +130,17 @@ type ContentConfig struct {
// a RESTClient directly. When initializing a Client, will be set with the default
// code version.
GroupVersion *unversioned.GroupVersion
// NegotiatedSerializer is used for obtaining encoders and decoders for multiple
// supported media types.
NegotiatedSerializer runtime.NegotiatedSerializer
// Codec specifies the encoding and decoding behavior for runtime.Objects passed
// to a RESTClient or Client. Required when initializing a RESTClient, optional
// when initializing a Client.
//
// DEPRECATED: Please use NegotiatedSerializer instead.
// Codec is currently used only in some tests and will be removed soon.
// All production setups should use NegotiatedSerializer.
Codec runtime.Codec
}
@ -144,8 +152,8 @@ func RESTClientFor(config *Config) (*RESTClient, error) {
if config.GroupVersion == nil {
return nil, fmt.Errorf("GroupVersion is required when initializing a RESTClient")
}
if config.Codec == nil {
return nil, fmt.Errorf("Codec is required when initializing a RESTClient")
if config.NegotiatedSerializer == nil {
return nil, fmt.Errorf("NegotiatedSerializer is required when initializing a RESTClient")
}
baseURL, versionedAPIPath, err := defaultServerUrlFor(config)
@ -163,16 +171,14 @@ func RESTClientFor(config *Config) (*RESTClient, error) {
httpClient = &http.Client{Transport: transport}
}
client := NewRESTClient(baseURL, versionedAPIPath, config.ContentConfig, config.QPS, config.Burst, config.RateLimiter, httpClient)
return client, nil
return NewRESTClient(baseURL, versionedAPIPath, config.ContentConfig, config.QPS, config.Burst, config.RateLimiter, httpClient)
}
// UnversionedRESTClientFor is the same as RESTClientFor, except that it allows
// the config.Version to be empty.
func UnversionedRESTClientFor(config *Config) (*RESTClient, error) {
if config.Codec == nil {
return nil, fmt.Errorf("Codec is required when initializing a RESTClient")
if config.NegotiatedSerializer == nil {
return nil, fmt.Errorf("NeogitatedSerializer is required when initializing a RESTClient")
}
baseURL, versionedAPIPath, err := defaultServerUrlFor(config)
@ -196,8 +202,7 @@ func UnversionedRESTClientFor(config *Config) (*RESTClient, error) {
versionConfig.GroupVersion = &v
}
client := NewRESTClient(baseURL, versionedAPIPath, versionConfig, config.QPS, config.Burst, config.RateLimiter, httpClient)
return client, nil
return NewRESTClient(baseURL, versionedAPIPath, versionConfig, config.QPS, config.Burst, config.RateLimiter, httpClient)
}
// SetKubernetesDefaults sets default values on the provided client config for accessing the

View File

@ -87,13 +87,13 @@ func TestSetKubernetesDefaultsUserAgent(t *testing.T) {
}
func TestRESTClientRequires(t *testing.T) {
if _, err := RESTClientFor(&Config{Host: "127.0.0.1", ContentConfig: ContentConfig{Codec: testapi.Default.Codec()}}); err == nil {
if _, err := RESTClientFor(&Config{Host: "127.0.0.1", ContentConfig: ContentConfig{NegotiatedSerializer: testapi.NegotiatedSerializer}}); err == nil {
t.Errorf("unexpected non-error")
}
if _, err := RESTClientFor(&Config{Host: "127.0.0.1", ContentConfig: ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}); err == nil {
t.Errorf("unexpected non-error")
}
if _, err := RESTClientFor(&Config{Host: "127.0.0.1", ContentConfig: ContentConfig{GroupVersion: testapi.Default.GroupVersion(), Codec: testapi.Default.Codec()}}); err != nil {
if _, err := RESTClientFor(&Config{Host: "127.0.0.1", ContentConfig: ContentConfig{GroupVersion: testapi.Default.GroupVersion(), NegotiatedSerializer: testapi.NegotiatedSerializer}}); err != nil {
t.Errorf("unexpected error: %v", err)
}
}

View File

@ -39,11 +39,12 @@ import (
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer/streaming"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/watch"
watchjson "k8s.io/kubernetes/pkg/watch/json"
"k8s.io/kubernetes/pkg/watch/versioned"
)
var (
@ -90,8 +91,9 @@ type Request struct {
client HTTPClient
verb string
baseURL *url.URL
content ContentConfig
baseURL *url.URL
content ContentConfig
serializers Serializers
// generic components accessible via method setters
pathPrefix string
@ -121,7 +123,7 @@ type Request struct {
}
// NewRequest creates a new request helper object for accessing runtime.Objects on a server.
func NewRequest(client HTTPClient, verb string, baseURL *url.URL, versionedAPIPath string, content ContentConfig, backoff BackoffManager, throttle flowcontrol.RateLimiter) *Request {
func NewRequest(client HTTPClient, verb string, baseURL *url.URL, versionedAPIPath string, content ContentConfig, serializers Serializers, backoff BackoffManager, throttle flowcontrol.RateLimiter) *Request {
if backoff == nil {
glog.V(2).Infof("Not implementing request backoff strategy.")
backoff = &NoBackoff{}
@ -132,13 +134,14 @@ func NewRequest(client HTTPClient, verb string, baseURL *url.URL, versionedAPIPa
pathPrefix = path.Join(pathPrefix, baseURL.Path)
}
r := &Request{
client: client,
verb: verb,
baseURL: baseURL,
pathPrefix: path.Join(pathPrefix, versionedAPIPath),
content: content,
backoffMgr: backoff,
throttle: throttle,
client: client,
verb: verb,
baseURL: baseURL,
pathPrefix: path.Join(pathPrefix, versionedAPIPath),
content: content,
serializers: serializers,
backoffMgr: backoff,
throttle: throttle,
}
if len(content.ContentType) > 0 {
r.SetHeader("Accept", content.ContentType+", */*")
@ -547,7 +550,7 @@ func (r *Request) Body(obj interface{}) *Request {
if reflect.ValueOf(t).IsNil() {
return r
}
data, err := runtime.Encode(r.content.Codec, t)
data, err := runtime.Encode(r.serializers.Encoder, t)
if err != nil {
r.err = err
return r
@ -670,7 +673,9 @@ func (r *Request) Watch() (watch.Interface, error) {
}
return nil, fmt.Errorf("for request '%+v', got status: %v", url, resp.StatusCode)
}
return watch.NewStreamWatcher(watchjson.NewDecoder(resp.Body, r.content.Codec)), nil
framer := r.serializers.Framer.NewFrameReader(resp.Body)
decoder := streaming.NewDecoder(framer, r.serializers.StreamingSerializer)
return watch.NewStreamWatcher(versioned.NewDecoder(decoder, r.serializers.Decoder)), nil
}
// updateURLMetrics is a convenience function for pushing metrics.
@ -738,7 +743,8 @@ func (r *Request) Stream() (io.ReadCloser, error) {
return nil, fmt.Errorf("%v while accessing %v", resp.Status, url)
}
if runtimeObject, err := runtime.Decode(r.content.Codec, bodyBytes); err == nil {
// TODO: Check ContentType.
if runtimeObject, err := runtime.Decode(r.serializers.Decoder, bodyBytes); err == nil {
statusError := errors.FromObject(runtimeObject)
if _, ok := statusError.(errors.APIStatus); ok {
@ -876,7 +882,7 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request) Resu
// default groupVersion, otherwise a status response won't be correctly
// decoded.
status := &unversioned.Status{}
err := runtime.DecodeInto(r.content.Codec, body, status)
err := runtime.DecodeInto(r.serializers.Decoder, body, status)
if err == nil && len(status.Status) > 0 {
isStatusResponse = true
}
@ -898,11 +904,12 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request) Resu
return Result{err: errors.FromObject(status)}
}
// TODO: Check ContentType.
return Result{
body: body,
contentType: resp.Header.Get("Content-Type"),
statusCode: resp.StatusCode,
decoder: r.content.Codec,
decoder: r.serializers.Decoder,
}
}

View File

@ -37,21 +37,22 @@ import (
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer/streaming"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/httpstream"
"k8s.io/kubernetes/pkg/util/intstr"
utiltesting "k8s.io/kubernetes/pkg/util/testing"
"k8s.io/kubernetes/pkg/watch"
watchjson "k8s.io/kubernetes/pkg/watch/json"
"k8s.io/kubernetes/pkg/watch/versioned"
)
func TestNewRequestSetsAccept(t *testing.T) {
r := NewRequest(nil, "get", &url.URL{Path: "/path/"}, "", ContentConfig{}, nil, nil)
r := NewRequest(nil, "get", &url.URL{Path: "/path/"}, "", ContentConfig{}, Serializers{}, nil, nil)
if r.headers.Get("Accept") != "" {
t.Errorf("unexpected headers: %#v", r.headers)
}
r = NewRequest(nil, "get", &url.URL{Path: "/path/"}, "", ContentConfig{ContentType: "application/other"}, nil, nil)
r = NewRequest(nil, "get", &url.URL{Path: "/path/"}, "", ContentConfig{ContentType: "application/other"}, Serializers{}, nil, nil)
if r.headers.Get("Accept") != "application/other, */*" {
t.Errorf("unexpected headers: %#v", r.headers)
}
@ -242,6 +243,23 @@ type NotAnAPIObject struct{}
func (obj NotAnAPIObject) GroupVersionKind() *unversioned.GroupVersionKind { return nil }
func (obj NotAnAPIObject) SetGroupVersionKind(gvk *unversioned.GroupVersionKind) {}
func defaultContentConfig() ContentConfig {
return ContentConfig{
GroupVersion: testapi.Default.GroupVersion(),
Codec: testapi.Default.Codec(),
NegotiatedSerializer: testapi.NegotiatedSerializer,
}
}
func defaultSerializers() Serializers {
return Serializers{
Encoder: testapi.Default.Codec(),
Decoder: testapi.Default.Codec(),
StreamingSerializer: testapi.Default.Codec(),
Framer: runtime.DefaultFramer,
}
}
func TestRequestBody(t *testing.T) {
// test unknown type
r := (&Request{}).Body([]string{"test"})
@ -262,7 +280,7 @@ func TestRequestBody(t *testing.T) {
}
// test unencodable api object
r = (&Request{content: ContentConfig{Codec: testapi.Default.Codec()}}).Body(&NotAnAPIObject{})
r = (&Request{content: defaultContentConfig()}).Body(&NotAnAPIObject{})
if r.err == nil || r.body != nil {
t.Errorf("should have set err and left body nil: %#v", r)
}
@ -277,7 +295,7 @@ func TestResultIntoWithErrReturnsErr(t *testing.T) {
func TestURLTemplate(t *testing.T) {
uri, _ := url.Parse("http://localhost")
r := NewRequest(nil, "POST", uri, "", ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "test"}}, nil, nil)
r := NewRequest(nil, "POST", uri, "", ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "test"}}, Serializers{}, nil, nil)
r.Prefix("pre1").Resource("r1").Namespace("ns").Name("nm").Param("p0", "v0")
full := r.URL()
if full.String() != "http://localhost/pre1/namespaces/ns/r1/nm?p0=v0" {
@ -338,7 +356,7 @@ func TestTransformResponse(t *testing.T) {
{Response: &http.Response{StatusCode: 200, Body: ioutil.NopCloser(bytes.NewReader(invalid))}, Data: invalid},
}
for i, test := range testCases {
r := NewRequest(nil, "", uri, "", ContentConfig{GroupVersion: testapi.Default.GroupVersion(), Codec: testapi.Default.Codec()}, nil, nil)
r := NewRequest(nil, "", uri, "", defaultContentConfig(), defaultSerializers(), nil, nil)
if test.Response.Body == nil {
test.Response.Body = ioutil.NopCloser(bytes.NewReader([]byte{}))
}
@ -425,7 +443,8 @@ func TestTransformUnstructuredError(t *testing.T) {
for _, testCase := range testCases {
r := &Request{
content: ContentConfig{GroupVersion: testapi.Default.GroupVersion(), Codec: testapi.Default.Codec()},
content: defaultContentConfig(),
serializers: defaultSerializers(),
resourceName: testCase.Name,
resource: testCase.Resource,
}
@ -476,7 +495,8 @@ func TestRequestWatch(t *testing.T) {
},
{
Request: &Request{
content: ContentConfig{GroupVersion: testapi.Default.GroupVersion(), Codec: testapi.Default.Codec()},
content: defaultContentConfig(),
serializers: defaultSerializers(),
client: clientFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusForbidden,
@ -492,7 +512,8 @@ func TestRequestWatch(t *testing.T) {
},
{
Request: &Request{
content: ContentConfig{GroupVersion: testapi.Default.GroupVersion(), Codec: testapi.Default.Codec()},
content: defaultContentConfig(),
serializers: defaultSerializers(),
client: clientFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnauthorized,
@ -508,7 +529,8 @@ func TestRequestWatch(t *testing.T) {
},
{
Request: &Request{
content: ContentConfig{GroupVersion: testapi.Default.GroupVersion(), Codec: testapi.Default.Codec()},
content: defaultContentConfig(),
serializers: defaultSerializers(),
client: clientFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnauthorized,
@ -620,8 +642,9 @@ func TestRequestStream(t *testing.T) {
})))),
}, nil
}),
content: ContentConfig{Codec: testapi.Default.Codec()},
baseURL: &url.URL{},
content: defaultContentConfig(),
serializers: defaultSerializers(),
baseURL: &url.URL{},
},
Err: true,
},
@ -1107,7 +1130,7 @@ func TestAbsPath(t *testing.T) {
{"/p1/api/p2", "/api/r1", "/api/", "/p1/api/p2/api/"},
} {
u, _ := url.Parse("http://localhost:123" + tc.configPrefix)
r := NewRequest(nil, "POST", u, "", ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "test"}}, nil, nil).Prefix(tc.resourcePrefix).AbsPath(tc.absPath)
r := NewRequest(nil, "POST", u, "", ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "test"}}, Serializers{}, nil, nil).Prefix(tc.resourcePrefix).AbsPath(tc.absPath)
if r.pathPrefix != tc.wantsAbsPath {
t.Errorf("test case %d failed, unexpected path: %q, expected %q", i, r.pathPrefix, tc.wantsAbsPath)
}
@ -1127,7 +1150,7 @@ func TestUintParam(t *testing.T) {
for _, item := range table {
u, _ := url.Parse("http://localhost")
r := NewRequest(nil, "GET", u, "", ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "test"}}, nil, nil).AbsPath("").UintParam(item.name, item.testVal)
r := NewRequest(nil, "GET", u, "", ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "test"}}, Serializers{}, nil, nil).AbsPath("").UintParam(item.name, item.testVal)
if e, a := item.expectStr, r.URL().String(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
@ -1233,7 +1256,7 @@ func TestWatch(t *testing.T) {
w.WriteHeader(http.StatusOK)
flusher.Flush()
encoder := watchjson.NewEncoder(w, testapi.Default.Codec())
encoder := versioned.NewEncoder(streaming.NewEncoder(w, testapi.Default.Codec()), testapi.Default.Codec())
for _, item := range table {
if err := encoder.Encode(&watch.Event{Type: item.t, Object: item.obj}); err != nil {
panic(err)
@ -1308,5 +1331,9 @@ func testRESTClient(t testing.TB, srv *httptest.Server) *RESTClient {
}
}
versionedAPIPath := testapi.Default.ResourcePath("", "", "")
return NewRESTClient(baseURL, versionedAPIPath, ContentConfig{GroupVersion: testapi.Default.GroupVersion(), Codec: testapi.Default.Codec()}, 0, 0, nil, nil)
client, err := NewRESTClient(baseURL, versionedAPIPath, defaultContentConfig(), 0, 0, nil, nil)
if err != nil {
t.Fatalf("failed to create a client: %v", err)
}
return client
}

View File

@ -29,6 +29,7 @@ import (
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer"
"k8s.io/kubernetes/pkg/version"
)
@ -213,7 +214,8 @@ func (d *DiscoveryClient) SwaggerSchema(version unversioned.GroupVersion) (*swag
func setDiscoveryDefaults(config *restclient.Config) error {
config.APIPath = ""
config.GroupVersion = nil
config.Codec = runtime.NoopEncoder{Decoder: api.Codecs.UniversalDecoder()}
codec := runtime.NoopEncoder{Decoder: api.Codecs.UniversalDecoder()}
config.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(codec, codec, runtime.DefaultFramer)
if len(config.UserAgent) == 0 {
config.UserAgent = restclient.DefaultKubernetesUserAgent()
}

View File

@ -26,11 +26,14 @@ import (
"net/url"
"strings"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/conversion/queryparams"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer"
serializerjson "k8s.io/kubernetes/pkg/runtime/serializer/json"
"k8s.io/kubernetes/pkg/watch"
)
@ -47,7 +50,9 @@ func NewClient(conf *restclient.Config) (*Client, error) {
confCopy := *conf
conf = &confCopy
conf.Codec = dynamicCodec{}
codec := dynamicCodec{}
legacyCodec := api.Codecs.LegacyCodec(v1.SchemeGroupVersion)
conf.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(codec, legacyCodec, serializerjson.Framer)
if conf.APIPath == "" {
conf.APIPath = "/api"

View File

@ -29,8 +29,9 @@ import (
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer/streaming"
"k8s.io/kubernetes/pkg/watch"
watchjson "k8s.io/kubernetes/pkg/watch/json"
"k8s.io/kubernetes/pkg/watch/versioned"
)
func getJSON(version, kind, name string) []byte {
@ -449,7 +450,7 @@ func TestWatch(t *testing.T) {
t.Errorf("Watch(%q) got path %s. wanted %s", tc.name, r.URL.Path, tc.path)
}
enc := watchjson.NewEncoder(w, dynamicCodec{})
enc := versioned.NewEncoder(streaming.NewEncoder(w, dynamicCodec{}), dynamicCodec{})
for _, e := range tc.events {
enc.Encode(&e)
}

View File

@ -72,6 +72,7 @@ func setAppsDefaults(config *restclient.Config) error {
//}
config.Codec = api.Codecs.LegacyCodec(*config.GroupVersion)
config.NegotiatedSerializer = api.Codecs
if config.QPS == 0 {
config.QPS = 5
}

View File

@ -73,6 +73,7 @@ func setAutoscalingDefaults(config *restclient.Config) error {
//}
config.Codec = api.Codecs.LegacyCodec(*config.GroupVersion)
config.NegotiatedSerializer = api.Codecs
if config.QPS == 0 {
config.QPS = 5
}

View File

@ -73,6 +73,7 @@ func setBatchDefaults(config *restclient.Config) error {
//}
config.Codec = api.Codecs.LegacyCodec(*config.GroupVersion)
config.NegotiatedSerializer = api.Codecs
if config.QPS == 0 {
config.QPS = 5
}

View File

@ -127,6 +127,7 @@ func setExtensionsDefaults(config *restclient.Config) error {
//}
config.Codec = api.Codecs.LegacyCodec(*config.GroupVersion)
config.NegotiatedSerializer = api.Codecs
if config.QPS == 0 {
config.QPS = 5
}

View File

@ -70,7 +70,17 @@ func (c *RESTClient) Delete() *restclient.Request {
}
func (c *RESTClient) request(verb string) *restclient.Request {
return restclient.NewRequest(c, verb, &url.URL{Host: "localhost"}, "", restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion(), Codec: c.Codec}, nil, nil)
config := restclient.ContentConfig{
GroupVersion: testapi.Default.GroupVersion(),
Codec: c.Codec,
}
serializers := restclient.Serializers{
Encoder: c.Codec,
Decoder: c.Codec,
StreamingSerializer: c.Codec,
Framer: runtime.DefaultFramer,
}
return restclient.NewRequest(c, verb, &url.URL{Host: "localhost"}, "", config, serializers, nil, nil)
}
func (c *RESTClient) Do(req *http.Request) (*http.Response, error) {

View File

@ -231,6 +231,9 @@ func SetKubernetesDefaults(config *restclient.Config) error {
// TODO: Unconditionally set the config.Version, until we fix the config.
copyGroupVersion := g.GroupVersion
config.GroupVersion = &copyGroupVersion
if config.NegotiatedSerializer == nil {
config.NegotiatedSerializer = api.Codecs
}
if config.Codec == nil {
config.Codec = api.Codecs.LegacyCodec(*config.GroupVersion)
}

View File

@ -40,8 +40,9 @@ func TestSetKubernetesDefaults(t *testing.T) {
restclient.Config{
APIPath: "/api",
ContentConfig: restclient.ContentConfig{
GroupVersion: testapi.Default.GroupVersion(),
Codec: testapi.Default.Codec(),
GroupVersion: testapi.Default.GroupVersion(),
Codec: testapi.Default.Codec(),
NegotiatedSerializer: testapi.NegotiatedSerializer,
},
QPS: 5,
Burst: 10,
@ -125,7 +126,7 @@ func TestHelperGetServerAPIVersions(t *testing.T) {
w.Write(output)
}))
defer server.Close()
got, err := restclient.ServerAPIVersions(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "invalid version", Version: "one"}, Codec: testapi.Default.Codec()}})
got, err := restclient.ServerAPIVersions(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "invalid version", Version: "one"}, NegotiatedSerializer: testapi.NegotiatedSerializer}})
if err != nil {
t.Fatalf("unexpected encoding error: %v", err)
}

View File

@ -30,6 +30,7 @@ import (
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
@ -212,7 +213,14 @@ func TestStream(t *testing.T) {
server := httptest.NewServer(fakeServer(t, name, exec, testCase.Stdin, testCase.Stdout, testCase.Stderr, testCase.Error, testCase.Tty, testCase.MessageCount, testCase.ServerProtocols))
url, _ := url.ParseRequestURI(server.URL)
c := restclient.NewRESTClient(url, "", restclient.ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "x"}}, -1, -1, nil, nil)
config := restclient.ContentConfig{
GroupVersion: &unversioned.GroupVersion{Group: "x"},
NegotiatedSerializer: testapi.NegotiatedSerializer,
}
c, err := restclient.NewRESTClient(url, "", config, -1, -1, nil, nil)
if err != nil {
t.Fatalf("failed to create a client: %v", err)
}
req := c.Post().Resource("testing")
if exec {

View File

@ -224,7 +224,7 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) {
// Setup a fake server to listen for requests, and run the ReplicaSet controller in steady state
fakeHandler := utiltesting.FakeHandler{
StatusCode: 200,
ResponseBody: "",
ResponseBody: "{}",
}
testServer := httptest.NewServer(&fakeHandler)
defer testServer.Close()
@ -266,7 +266,7 @@ func TestControllerUpdateReplicas(t *testing.T) {
// This is a happy server just to record the PUT request we expect for status.Replicas
fakeHandler := utiltesting.FakeHandler{
StatusCode: 200,
ResponseBody: "",
ResponseBody: "{}",
}
testServer := httptest.NewServer(&fakeHandler)
defer testServer.Close()
@ -311,7 +311,7 @@ func TestSyncReplicaSetDormancy(t *testing.T) {
// Setup a test server so we can lie about the current state of pods
fakeHandler := utiltesting.FakeHandler{
StatusCode: 200,
ResponseBody: "",
ResponseBody: "{}",
}
testServer := httptest.NewServer(&fakeHandler)
defer testServer.Close()
@ -574,7 +574,7 @@ func TestControllerUpdateRequeue(t *testing.T) {
// This server should force a requeue of the controller because it fails to update status.Replicas.
fakeHandler := utiltesting.FakeHandler{
StatusCode: 500,
ResponseBody: "",
ResponseBody: "{}",
}
testServer := httptest.NewServer(&fakeHandler)
defer testServer.Close()

View File

@ -35,9 +35,11 @@ import (
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/unversioned/fake"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer/json"
"k8s.io/kubernetes/pkg/runtime/serializer/streaming"
"k8s.io/kubernetes/pkg/util/diff"
"k8s.io/kubernetes/pkg/watch"
"k8s.io/kubernetes/pkg/watch/json"
"k8s.io/kubernetes/pkg/watch/versioned"
)
func testData() (*api.PodList, *api.ServiceList, *api.ReplicationControllerList) {
@ -859,9 +861,9 @@ func TestWatchOnlyResource(t *testing.T) {
func watchBody(codec runtime.Codec, events []watch.Event) io.ReadCloser {
buf := bytes.NewBuffer([]byte{})
enc := json.NewEncoder(buf, codec)
enc := versioned.NewEncoder(streaming.NewEncoder(buf, codec), codec)
for i := range events {
enc.Encode(&events[i])
}
return ioutil.NopCloser(buf)
return json.Framer.NewFrameReader(ioutil.NopCloser(buf))
}

View File

@ -39,10 +39,11 @@ import (
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/unversioned/fake"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer/streaming"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
utiltesting "k8s.io/kubernetes/pkg/util/testing"
"k8s.io/kubernetes/pkg/watch"
watchjson "k8s.io/kubernetes/pkg/watch/json"
"k8s.io/kubernetes/pkg/watch/versioned"
)
func stringBody(body string) io.ReadCloser {
@ -51,7 +52,8 @@ func stringBody(body string) io.ReadCloser {
func watchBody(events ...watch.Event) string {
buf := &bytes.Buffer{}
enc := watchjson.NewEncoder(buf, testapi.Default.Codec())
codec := testapi.Default.Codec()
enc := versioned.NewEncoder(streaming.NewEncoder(buf, codec), codec)
for _, e := range events {
enc.Encode(&e)
}

View File

@ -0,0 +1,58 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package serializer
import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/runtime"
)
// TODO: We should figure out what happens when someone asks
// encoder for version and it conflicts with the raw serializer.
type negotiatedSerializerWrapper struct {
serializer runtime.Serializer
streamingSerializer runtime.Serializer
framer runtime.Framer
}
func NegotiatedSerializerWrapper(serializer, streamingSerializer runtime.Serializer, framer runtime.Framer) runtime.NegotiatedSerializer {
return &negotiatedSerializerWrapper{serializer, streamingSerializer, framer}
}
func (n *negotiatedSerializerWrapper) SupportedMediaTypes() []string {
return []string{}
}
func (n *negotiatedSerializerWrapper) SerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, bool) {
return n.serializer, true
}
func (n *negotiatedSerializerWrapper) SupportedStreamingMediaTypes() []string {
return []string{}
}
func (n *negotiatedSerializerWrapper) StreamingSerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, runtime.Framer, string, bool) {
return n.streamingSerializer, n.framer, "", true
}
func (n *negotiatedSerializerWrapper) EncoderForVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Encoder {
return n.serializer
}
func (n *negotiatedSerializerWrapper) DecoderToVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Decoder {
return n.serializer
}

View File

@ -145,6 +145,7 @@ func (r *jsonFrameReader) Read(data []byte) (int, error) {
// RawMessage#Unmarshal appends to data - we reset the slice down to 0 and will either see
// data written to data, or be larger than data and a different array.
n := len(data)
m := json.RawMessage(data[:0])
if err := r.decoder.Decode(&m); err != nil {
return 0, err
@ -153,7 +154,7 @@ func (r *jsonFrameReader) Read(data []byte) (int, error) {
// If capacity of data is less than length of the message, decoder will allocate a new slice
// and set m to it, which means we need to copy the partial result back into data and preserve
// the remaining result for subsequent reads.
if n := cap(data); len(m) > n {
if len(m) > n {
data = append(data[0:0], m[:n]...)
r.remaining = m[n:]
return n, io.ErrShortBuffer

View File

@ -63,6 +63,7 @@ func (f *FakeHandler) ServeHTTP(response http.ResponseWriter, request *http.Requ
}
f.RequestReceived = request
response.Header().Set("Content-Type", "application/json")
response.WriteHeader(f.StatusCode)
response.Write([]byte(f.ResponseBody))

View File

@ -42,9 +42,9 @@ type Decoder interface {
// StreamWatcher turns any stream for which you can write a Decoder interface
// into a watch.Interface.
type StreamWatcher struct {
source Decoder
result chan Event
sync.Mutex
source Decoder
result chan Event
stopped bool
}

View File

@ -14,56 +14,58 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package json
package versioned
import (
"encoding/json"
"fmt"
"io"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer/streaming"
"k8s.io/kubernetes/pkg/watch"
)
// Decoder implements the watch.Decoder interface for io.ReadClosers that
// have contents which consist of a series of watchEvent objects encoded via JSON.
// It will decode any object registered in the supplied codec.
// have contents which consist of a series of watchEvent objects encoded
// with the given streaming decoder. The internal objects will be then
// decoded by the embedded decoder.
type Decoder struct {
r io.ReadCloser
decoder *json.Decoder
codec runtime.Codec
decoder streaming.Decoder
embeddedDecoder runtime.Decoder
}
// NewDecoder creates an Decoder for the given writer and codec.
func NewDecoder(r io.ReadCloser, codec runtime.Codec) *Decoder {
func NewDecoder(decoder streaming.Decoder, embeddedDecoder runtime.Decoder) *Decoder {
return &Decoder{
r: r,
decoder: json.NewDecoder(r),
codec: codec,
decoder: decoder,
embeddedDecoder: embeddedDecoder,
}
}
// Decode blocks until it can return the next object in the writer. Returns an error
// if the writer is closed or an object can't be decoded.
// Decode blocks until it can return the next object in the reader. Returns an error
// if the reader is closed or an object can't be decoded.
func (d *Decoder) Decode() (watch.EventType, runtime.Object, error) {
var got WatchEvent
if err := d.decoder.Decode(&got); err != nil {
var got Event
res, _, err := d.decoder.Decode(nil, &got)
if err != nil {
return "", nil, err
}
if res != &got {
return "", nil, fmt.Errorf("unable to decode to versioned.Event")
}
switch got.Type {
case watch.Added, watch.Modified, watch.Deleted, watch.Error:
case string(watch.Added), string(watch.Modified), string(watch.Deleted), string(watch.Error):
default:
return "", nil, fmt.Errorf("got invalid watch event type: %v", got.Type)
}
obj, err := runtime.Decode(d.codec, got.Object.Raw)
obj, err := runtime.Decode(d.embeddedDecoder, got.Object.Raw)
if err != nil {
return "", nil, fmt.Errorf("unable to decode watch event: %v", err)
}
return got.Type, obj, nil
return watch.EventType(got.Type), obj, nil
}
// Close closes the underlying r.
func (d *Decoder) Close() {
d.r.Close()
d.decoder.Close()
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package json
package versioned_test
import (
"encoding/json"
@ -25,8 +25,10 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer/streaming"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
"k8s.io/kubernetes/pkg/watch/versioned"
)
func TestDecoder(t *testing.T) {
@ -34,7 +36,8 @@ func TestDecoder(t *testing.T) {
for _, eventType := range table {
out, in := io.Pipe()
decoder := NewDecoder(out, testapi.Default.Codec())
codec := testapi.Default.Codec()
decoder := versioned.NewDecoder(streaming.NewDecoder(out, codec), codec)
expect := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
encoder := json.NewEncoder(in)
@ -43,7 +46,11 @@ func TestDecoder(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
if err := encoder.Encode(&WatchEvent{eventType, runtime.RawExtension{Raw: json.RawMessage(data)}}); err != nil {
event := versioned.Event{
Type: string(eventType),
Object: runtime.RawExtension{Raw: json.RawMessage(data)},
}
if err := encoder.Encode(&event); err != nil {
t.Errorf("Unexpected error %v", err)
}
in.Close()
@ -82,7 +89,8 @@ func TestDecoder(t *testing.T) {
func TestDecoder_SourceClose(t *testing.T) {
out, in := io.Pipe()
decoder := NewDecoder(out, testapi.Default.Codec())
codec := testapi.Default.Codec()
decoder := versioned.NewDecoder(streaming.NewDecoder(out, codec), codec)
done := make(chan struct{})

View File

@ -14,40 +14,38 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package json
package versioned
import (
"encoding/json"
"io"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer/streaming"
"k8s.io/kubernetes/pkg/watch"
)
// Encoder implements the json.Encoder interface for io.Writers that
// should serialize WatchEvent objects into JSON. It will encode any object
// registered in the supplied codec and return an error otherwies.
// Encoder serializes watch.Events into io.Writer. The internal objects
// are encoded using embedded encoder, and the outer Event is serialized
// using encoder.
type Encoder struct {
w io.Writer
encoder *json.Encoder
codec runtime.Encoder
encoder streaming.Encoder
embeddedEncoder runtime.Encoder
}
// NewEncoder creates an Encoder for the given writer and codec
func NewEncoder(w io.Writer, codec runtime.Encoder) *Encoder {
func NewEncoder(encoder streaming.Encoder, embeddedEncoder runtime.Encoder) *Encoder {
return &Encoder{
w: w,
encoder: json.NewEncoder(w),
codec: codec,
encoder: encoder,
embeddedEncoder: embeddedEncoder,
}
}
// Encode writes an event to the writer. Returns an error
// if the writer is closed or an object can't be encoded.
func (e *Encoder) Encode(event *watch.Event) error {
obj, err := Object(e.codec, event)
data, err := runtime.Encode(e.embeddedEncoder, event.Object)
if err != nil {
return err
}
return e.encoder.Encode(obj)
// FIXME: get rid of json.RawMessage.
return e.encoder.Encode(&Event{string(event.Type), runtime.RawExtension{Raw: json.RawMessage(data)}})
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package json
package versioned_test
import (
"bytes"
@ -24,7 +24,9 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer/streaming"
"k8s.io/kubernetes/pkg/watch"
"k8s.io/kubernetes/pkg/watch/versioned"
)
func TestEncodeDecodeRoundTrip(t *testing.T) {
@ -52,13 +54,15 @@ func TestEncodeDecodeRoundTrip(t *testing.T) {
for i, testCase := range testCases {
buf := &bytes.Buffer{}
encoder := NewEncoder(buf, testCase.Codec)
codec := testCase.Codec
encoder := versioned.NewEncoder(streaming.NewEncoder(buf, codec), codec)
if err := encoder.Encode(&watch.Event{Type: testCase.Type, Object: testCase.Object}); err != nil {
t.Errorf("%d: unexpected error: %v", i, err)
continue
}
decoder := NewDecoder(ioutil.NopCloser(buf), testCase.Codec)
rc := ioutil.NopCloser(buf)
decoder := versioned.NewDecoder(streaming.NewDecoder(rc, codec), codec)
event, obj, err := decoder.Decode()
if err != nil {
t.Errorf("%d: unexpected error: %v", i, err)

View File

@ -29,6 +29,7 @@ import (
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
"k8s.io/kubernetes/pkg/runtime"
runtimeserializer "k8s.io/kubernetes/pkg/runtime/serializer"
"k8s.io/kubernetes/pkg/runtime/serializer/json"
"k8s.io/kubernetes/pkg/runtime/serializer/versioning"
@ -86,7 +87,8 @@ func New(kubeConfigFile string) (*WebhookAuthorizer, error) {
return nil, err
}
serializer := json.NewSerializer(json.DefaultMetaFactory, api.Scheme, runtime.ObjectTyperToTyper(api.Scheme), false)
clientConfig.ContentConfig.Codec = versioning.NewCodecForScheme(api.Scheme, serializer, serializer, encodeVersions, decodeVersions)
codec := versioning.NewCodecForScheme(api.Scheme, serializer, serializer, encodeVersions, decodeVersions)
clientConfig.ContentConfig.NegotiatedSerializer = runtimeserializer.NegotiatedSerializerWrapper(codec, codec, json.Framer)
restClient, err := restclient.UnversionedRESTClientFor(clientConfig)
if err != nil {