Use NegotiatedSerializer in client

This commit is contained in:
Wojciech Tyczynski 2016-04-26 09:05:40 +02:00
parent b4c83022e3
commit 3aadafd411
23 changed files with 280 additions and 100 deletions

View File

@ -89,6 +89,7 @@ func NewTestServer(t *testing.T, namespace string, mockPodListWatch *MockPodsLis
mux := http.NewServeMux()
podListHandler := func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
pods := mockPodListWatch.Pods()
w.Write([]byte(runtime.EncodeOrDie(testapi.Default.Codec(), &pods)))
@ -106,6 +107,7 @@ func NewTestServer(t *testing.T, namespace string, mockPodListWatch *MockPodsLis
ts.stats[name] = ts.stats[name] + 1
p := mockPodListWatch.Pod(name)
w.Header().Set("Content-Type", "application/json")
if p != nil {
w.WriteHeader(http.StatusOK)
w.Write([]byte(runtime.EncodeOrDie(testapi.Default.Codec(), p)))
@ -117,6 +119,7 @@ func NewTestServer(t *testing.T, namespace string, mockPodListWatch *MockPodsLis
mux.HandleFunc(
testapi.Default.ResourcePath("events", namespace, ""),
func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
},
)
@ -125,6 +128,7 @@ func NewTestServer(t *testing.T, namespace string, mockPodListWatch *MockPodsLis
testapi.Default.ResourcePath("nodes", "", ""),
func(w http.ResponseWriter, r *http.Request) {
var node api.Node
w.Header().Set("Content-Type", "application/json")
if err := json.NewDecoder(r.Body).Decode(&node); err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
@ -144,6 +148,7 @@ func NewTestServer(t *testing.T, namespace string, mockPodListWatch *MockPodsLis
mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) {
t.Errorf("unexpected request: %v", req.RequestURI)
res.Header().Set("Content-Type", "application/json")
res.WriteHeader(http.StatusNotFound)
})

View File

@ -17,6 +17,7 @@ limitations under the License.
package restclient
import (
"fmt"
"net/http"
"net/url"
"os"
@ -53,6 +54,9 @@ type RESTClient struct {
// contentConfig is the information used to communicate with the server.
contentConfig ContentConfig
// serializers contain all serializers for undelying content type.
serializers Serializers
// TODO extract this into a wrapper interface via the RESTClient interface in kubectl.
Throttle flowcontrol.RateLimiter
@ -60,6 +64,13 @@ type RESTClient struct {
Client *http.Client
}
type Serializers struct {
Encoder runtime.Encoder
Decoder runtime.Decoder
StreamingSerializer runtime.Serializer
Framer runtime.Framer
}
// NewRESTClient creates a new RESTClient. This client performs generic REST functions
// such as Get, Put, Post, and Delete on specified paths. Codec controls encoding and
// decoding of responses from the server.
@ -77,6 +88,10 @@ func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConf
if len(config.ContentType) == 0 {
config.ContentType = "application/json"
}
serializers, err := createSerializers(config)
if err != nil {
return nil, err
}
var throttle flowcontrol.RateLimiter
if maxQPS > 0 && rateLimiter == nil {
@ -88,6 +103,7 @@ func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ContentConf
base: &base,
versionedAPIPath: versionedAPIPath,
contentConfig: config,
serializers: *serializers,
Throttle: throttle,
Client: client,
}, nil
@ -119,6 +135,33 @@ func readExpBackoffConfig() BackoffManager {
time.Duration(backoffDurationInt)*time.Second)}
}
// createSerializers creates all necessary serializers for given contentType.
func createSerializers(config ContentConfig) (*Serializers, error) {
negotiated := config.NegotiatedSerializer
contentType := config.ContentType
serializer, ok := negotiated.SerializerForMediaType(contentType, nil)
if !ok {
return nil, fmt.Errorf("serializer for %s not registered", contentType)
}
streamingSerializer, framer, _, ok := negotiated.StreamingSerializerForMediaType(contentType, nil)
if !ok {
return nil, fmt.Errorf("streaming serializer for %s not registered", contentType)
}
if framer == nil {
return nil, fmt.Errorf("no framer for %s", contentType)
}
internalGV := unversioned.GroupVersion{
Group: config.GroupVersion.Group,
Version: runtime.APIVersionInternal,
}
return &Serializers{
Encoder: negotiated.EncoderForVersion(serializer, *config.GroupVersion),
Decoder: negotiated.DecoderToVersion(serializer, internalGV),
StreamingSerializer: streamingSerializer,
Framer: framer,
}, nil
}
// Verb begins a request with a verb (GET, POST, PUT, DELETE).
//
// Example usage of RESTClient's request building interface:
@ -136,9 +179,9 @@ func (c *RESTClient) Verb(verb string) *Request {
backoff := readExpBackoffConfig()
if c.Client == nil {
return NewRequest(nil, verb, c.base, c.versionedAPIPath, c.contentConfig, backoff, c.Throttle)
return NewRequest(nil, verb, c.base, c.versionedAPIPath, c.contentConfig, c.serializers, backoff, c.Throttle)
}
return NewRequest(c.Client, verb, c.base, c.versionedAPIPath, c.contentConfig, backoff, c.Throttle)
return NewRequest(c.Client, verb, c.base, c.versionedAPIPath, c.contentConfig, c.serializers, backoff, c.Throttle)
}
// Post begins a POST request. Short for c.Verb("POST").

View File

@ -46,8 +46,8 @@ func TestDoRequestSuccess(t *testing.T) {
c, err := RESTClientFor(&Config{
Host: testServer.URL,
ContentConfig: ContentConfig{
GroupVersion: testapi.Default.GroupVersion(),
Codec: testapi.Default.Codec(),
GroupVersion: testapi.Default.GroupVersion(),
NegotiatedSerializer: testapi.NegotiatedSerializer,
},
Username: "user",
Password: "pass",
@ -91,8 +91,8 @@ func TestDoRequestFailed(t *testing.T) {
c, err := RESTClientFor(&Config{
Host: testServer.URL,
ContentConfig: ContentConfig{
GroupVersion: testapi.Default.GroupVersion(),
Codec: testapi.Default.Codec(),
GroupVersion: testapi.Default.GroupVersion(),
NegotiatedSerializer: testapi.NegotiatedSerializer,
},
})
if err != nil {
@ -129,8 +129,8 @@ func TestDoRequestCreated(t *testing.T) {
c, err := RESTClientFor(&Config{
Host: testServer.URL,
ContentConfig: ContentConfig{
GroupVersion: testapi.Default.GroupVersion(),
Codec: testapi.Default.Codec(),
GroupVersion: testapi.Default.GroupVersion(),
NegotiatedSerializer: testapi.NegotiatedSerializer,
},
Username: "user",
Password: "pass",

View File

@ -139,6 +139,8 @@ type ContentConfig struct {
// when initializing a Client.
//
// DEPRECATED: Please use NegotiatedSerializer instead.
// Codec is currently used only in some tests and will be removed soon.
// All production setups should use NegotiatedSerializer.
Codec runtime.Codec
}

View File

@ -39,11 +39,12 @@ import (
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer/streaming"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/watch"
watchjson "k8s.io/kubernetes/pkg/watch/json"
"k8s.io/kubernetes/pkg/watch/versioned"
)
var (
@ -90,8 +91,9 @@ type Request struct {
client HTTPClient
verb string
baseURL *url.URL
content ContentConfig
baseURL *url.URL
content ContentConfig
serializers Serializers
// generic components accessible via method setters
pathPrefix string
@ -121,7 +123,7 @@ type Request struct {
}
// NewRequest creates a new request helper object for accessing runtime.Objects on a server.
func NewRequest(client HTTPClient, verb string, baseURL *url.URL, versionedAPIPath string, content ContentConfig, backoff BackoffManager, throttle flowcontrol.RateLimiter) *Request {
func NewRequest(client HTTPClient, verb string, baseURL *url.URL, versionedAPIPath string, content ContentConfig, serializers Serializers, backoff BackoffManager, throttle flowcontrol.RateLimiter) *Request {
if backoff == nil {
glog.V(2).Infof("Not implementing request backoff strategy.")
backoff = &NoBackoff{}
@ -132,13 +134,14 @@ func NewRequest(client HTTPClient, verb string, baseURL *url.URL, versionedAPIPa
pathPrefix = path.Join(pathPrefix, baseURL.Path)
}
r := &Request{
client: client,
verb: verb,
baseURL: baseURL,
pathPrefix: path.Join(pathPrefix, versionedAPIPath),
content: content,
backoffMgr: backoff,
throttle: throttle,
client: client,
verb: verb,
baseURL: baseURL,
pathPrefix: path.Join(pathPrefix, versionedAPIPath),
content: content,
serializers: serializers,
backoffMgr: backoff,
throttle: throttle,
}
if len(content.ContentType) > 0 {
r.SetHeader("Accept", content.ContentType+", */*")
@ -547,7 +550,7 @@ func (r *Request) Body(obj interface{}) *Request {
if reflect.ValueOf(t).IsNil() {
return r
}
data, err := runtime.Encode(r.content.Codec, t)
data, err := runtime.Encode(r.serializers.Encoder, t)
if err != nil {
r.err = err
return r
@ -670,7 +673,9 @@ func (r *Request) Watch() (watch.Interface, error) {
}
return nil, fmt.Errorf("for request '%+v', got status: %v", url, resp.StatusCode)
}
return watch.NewStreamWatcher(watchjson.NewDecoder(resp.Body, r.content.Codec)), nil
framer := r.serializers.Framer.NewFrameReader(resp.Body)
decoder := streaming.NewDecoder(framer, r.serializers.StreamingSerializer)
return watch.NewStreamWatcher(versioned.NewDecoder(decoder, r.serializers.Decoder)), nil
}
// updateURLMetrics is a convenience function for pushing metrics.
@ -738,7 +743,8 @@ func (r *Request) Stream() (io.ReadCloser, error) {
return nil, fmt.Errorf("%v while accessing %v", resp.Status, url)
}
if runtimeObject, err := runtime.Decode(r.content.Codec, bodyBytes); err == nil {
// TODO: Check ContentType.
if runtimeObject, err := runtime.Decode(r.serializers.Decoder, bodyBytes); err == nil {
statusError := errors.FromObject(runtimeObject)
if _, ok := statusError.(errors.APIStatus); ok {
@ -876,7 +882,7 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request) Resu
// default groupVersion, otherwise a status response won't be correctly
// decoded.
status := &unversioned.Status{}
err := runtime.DecodeInto(r.content.Codec, body, status)
err := runtime.DecodeInto(r.serializers.Decoder, body, status)
if err == nil && len(status.Status) > 0 {
isStatusResponse = true
}
@ -898,11 +904,12 @@ func (r *Request) transformResponse(resp *http.Response, req *http.Request) Resu
return Result{err: errors.FromObject(status)}
}
// TODO: Check ContentType.
return Result{
body: body,
contentType: resp.Header.Get("Content-Type"),
statusCode: resp.StatusCode,
decoder: r.content.Codec,
decoder: r.serializers.Decoder,
}
}

View File

@ -37,21 +37,22 @@ import (
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer/streaming"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/flowcontrol"
"k8s.io/kubernetes/pkg/util/httpstream"
"k8s.io/kubernetes/pkg/util/intstr"
utiltesting "k8s.io/kubernetes/pkg/util/testing"
"k8s.io/kubernetes/pkg/watch"
watchjson "k8s.io/kubernetes/pkg/watch/json"
"k8s.io/kubernetes/pkg/watch/versioned"
)
func TestNewRequestSetsAccept(t *testing.T) {
r := NewRequest(nil, "get", &url.URL{Path: "/path/"}, "", ContentConfig{}, nil, nil)
r := NewRequest(nil, "get", &url.URL{Path: "/path/"}, "", ContentConfig{}, Serializers{}, nil, nil)
if r.headers.Get("Accept") != "" {
t.Errorf("unexpected headers: %#v", r.headers)
}
r = NewRequest(nil, "get", &url.URL{Path: "/path/"}, "", ContentConfig{ContentType: "application/other"}, nil, nil)
r = NewRequest(nil, "get", &url.URL{Path: "/path/"}, "", ContentConfig{ContentType: "application/other"}, Serializers{}, nil, nil)
if r.headers.Get("Accept") != "application/other, */*" {
t.Errorf("unexpected headers: %#v", r.headers)
}
@ -242,6 +243,23 @@ type NotAnAPIObject struct{}
func (obj NotAnAPIObject) GroupVersionKind() *unversioned.GroupVersionKind { return nil }
func (obj NotAnAPIObject) SetGroupVersionKind(gvk *unversioned.GroupVersionKind) {}
func defaultContentConfig() ContentConfig {
return ContentConfig{
GroupVersion: testapi.Default.GroupVersion(),
Codec: testapi.Default.Codec(),
NegotiatedSerializer: testapi.NegotiatedSerializer,
}
}
func defaultSerializers() Serializers {
return Serializers{
Encoder: testapi.Default.Codec(),
Decoder: testapi.Default.Codec(),
StreamingSerializer: testapi.Default.Codec(),
Framer: runtime.DefaultFramer,
}
}
func TestRequestBody(t *testing.T) {
// test unknown type
r := (&Request{}).Body([]string{"test"})
@ -262,7 +280,7 @@ func TestRequestBody(t *testing.T) {
}
// test unencodable api object
r = (&Request{content: ContentConfig{Codec: testapi.Default.Codec()}}).Body(&NotAnAPIObject{})
r = (&Request{content: defaultContentConfig()}).Body(&NotAnAPIObject{})
if r.err == nil || r.body != nil {
t.Errorf("should have set err and left body nil: %#v", r)
}
@ -277,7 +295,7 @@ func TestResultIntoWithErrReturnsErr(t *testing.T) {
func TestURLTemplate(t *testing.T) {
uri, _ := url.Parse("http://localhost")
r := NewRequest(nil, "POST", uri, "", ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "test"}}, nil, nil)
r := NewRequest(nil, "POST", uri, "", ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "test"}}, Serializers{}, nil, nil)
r.Prefix("pre1").Resource("r1").Namespace("ns").Name("nm").Param("p0", "v0")
full := r.URL()
if full.String() != "http://localhost/pre1/namespaces/ns/r1/nm?p0=v0" {
@ -338,7 +356,7 @@ func TestTransformResponse(t *testing.T) {
{Response: &http.Response{StatusCode: 200, Body: ioutil.NopCloser(bytes.NewReader(invalid))}, Data: invalid},
}
for i, test := range testCases {
r := NewRequest(nil, "", uri, "", ContentConfig{GroupVersion: testapi.Default.GroupVersion(), Codec: testapi.Default.Codec()}, nil, nil)
r := NewRequest(nil, "", uri, "", defaultContentConfig(), defaultSerializers(), nil, nil)
if test.Response.Body == nil {
test.Response.Body = ioutil.NopCloser(bytes.NewReader([]byte{}))
}
@ -425,7 +443,8 @@ func TestTransformUnstructuredError(t *testing.T) {
for _, testCase := range testCases {
r := &Request{
content: ContentConfig{GroupVersion: testapi.Default.GroupVersion(), Codec: testapi.Default.Codec()},
content: defaultContentConfig(),
serializers: defaultSerializers(),
resourceName: testCase.Name,
resource: testCase.Resource,
}
@ -476,7 +495,8 @@ func TestRequestWatch(t *testing.T) {
},
{
Request: &Request{
content: ContentConfig{GroupVersion: testapi.Default.GroupVersion(), Codec: testapi.Default.Codec()},
content: defaultContentConfig(),
serializers: defaultSerializers(),
client: clientFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusForbidden,
@ -492,7 +512,8 @@ func TestRequestWatch(t *testing.T) {
},
{
Request: &Request{
content: ContentConfig{GroupVersion: testapi.Default.GroupVersion(), Codec: testapi.Default.Codec()},
content: defaultContentConfig(),
serializers: defaultSerializers(),
client: clientFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnauthorized,
@ -508,7 +529,8 @@ func TestRequestWatch(t *testing.T) {
},
{
Request: &Request{
content: ContentConfig{GroupVersion: testapi.Default.GroupVersion(), Codec: testapi.Default.Codec()},
content: defaultContentConfig(),
serializers: defaultSerializers(),
client: clientFunc(func(req *http.Request) (*http.Response, error) {
return &http.Response{
StatusCode: http.StatusUnauthorized,
@ -620,8 +642,9 @@ func TestRequestStream(t *testing.T) {
})))),
}, nil
}),
content: ContentConfig{Codec: testapi.Default.Codec()},
baseURL: &url.URL{},
content: defaultContentConfig(),
serializers: defaultSerializers(),
baseURL: &url.URL{},
},
Err: true,
},
@ -1107,7 +1130,7 @@ func TestAbsPath(t *testing.T) {
{"/p1/api/p2", "/api/r1", "/api/", "/p1/api/p2/api/"},
} {
u, _ := url.Parse("http://localhost:123" + tc.configPrefix)
r := NewRequest(nil, "POST", u, "", ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "test"}}, nil, nil).Prefix(tc.resourcePrefix).AbsPath(tc.absPath)
r := NewRequest(nil, "POST", u, "", ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "test"}}, Serializers{}, nil, nil).Prefix(tc.resourcePrefix).AbsPath(tc.absPath)
if r.pathPrefix != tc.wantsAbsPath {
t.Errorf("test case %d failed, unexpected path: %q, expected %q", i, r.pathPrefix, tc.wantsAbsPath)
}
@ -1127,7 +1150,7 @@ func TestUintParam(t *testing.T) {
for _, item := range table {
u, _ := url.Parse("http://localhost")
r := NewRequest(nil, "GET", u, "", ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "test"}}, nil, nil).AbsPath("").UintParam(item.name, item.testVal)
r := NewRequest(nil, "GET", u, "", ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "test"}}, Serializers{}, nil, nil).AbsPath("").UintParam(item.name, item.testVal)
if e, a := item.expectStr, r.URL().String(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
@ -1233,7 +1256,7 @@ func TestWatch(t *testing.T) {
w.WriteHeader(http.StatusOK)
flusher.Flush()
encoder := watchjson.NewEncoder(w, testapi.Default.Codec())
encoder := versioned.NewEncoder(streaming.NewEncoder(w, testapi.Default.Codec()), testapi.Default.Codec())
for _, item := range table {
if err := encoder.Encode(&watch.Event{Type: item.t, Object: item.obj}); err != nil {
panic(err)
@ -1308,7 +1331,7 @@ func testRESTClient(t testing.TB, srv *httptest.Server) *RESTClient {
}
}
versionedAPIPath := testapi.Default.ResourcePath("", "", "")
client, err := NewRESTClient(baseURL, versionedAPIPath, ContentConfig{GroupVersion: testapi.Default.GroupVersion(), Codec: testapi.Default.Codec()}, 0, 0, nil, nil)
client, err := NewRESTClient(baseURL, versionedAPIPath, defaultContentConfig(), 0, 0, nil, nil)
if err != nil {
t.Fatalf("failed to create a client: %v", err)
}

View File

@ -29,6 +29,7 @@ import (
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer"
"k8s.io/kubernetes/pkg/version"
)
@ -213,7 +214,8 @@ func (d *DiscoveryClient) SwaggerSchema(version unversioned.GroupVersion) (*swag
func setDiscoveryDefaults(config *restclient.Config) error {
config.APIPath = ""
config.GroupVersion = nil
config.Codec = runtime.NoopEncoder{Decoder: api.Codecs.UniversalDecoder()}
codec := runtime.NoopEncoder{Decoder: api.Codecs.UniversalDecoder()}
config.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(codec, codec, runtime.DefaultFramer)
if len(config.UserAgent) == 0 {
config.UserAgent = restclient.DefaultKubernetesUserAgent()
}

View File

@ -26,11 +26,14 @@ import (
"net/url"
"strings"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/conversion/queryparams"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer"
serializerjson "k8s.io/kubernetes/pkg/runtime/serializer/json"
"k8s.io/kubernetes/pkg/watch"
)
@ -47,7 +50,9 @@ func NewClient(conf *restclient.Config) (*Client, error) {
confCopy := *conf
conf = &confCopy
conf.Codec = dynamicCodec{}
codec := dynamicCodec{}
legacyCodec := api.Codecs.LegacyCodec(v1.SchemeGroupVersion)
conf.NegotiatedSerializer = serializer.NegotiatedSerializerWrapper(codec, legacyCodec, serializerjson.Framer)
if conf.APIPath == "" {
conf.APIPath = "/api"

View File

@ -29,8 +29,9 @@ import (
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer/streaming"
"k8s.io/kubernetes/pkg/watch"
watchjson "k8s.io/kubernetes/pkg/watch/json"
"k8s.io/kubernetes/pkg/watch/versioned"
)
func getJSON(version, kind, name string) []byte {
@ -454,7 +455,7 @@ func TestWatch(t *testing.T) {
t.Errorf("Watch(%q) got path %s. wanted %s", tc.name, r.URL.Path, tc.path)
}
enc := watchjson.NewEncoder(w, dynamicCodec{})
enc := versioned.NewEncoder(streaming.NewEncoder(w, dynamicCodec{}), dynamicCodec{})
for _, e := range tc.events {
enc.Encode(&e)
}

View File

@ -70,7 +70,17 @@ func (c *RESTClient) Delete() *restclient.Request {
}
func (c *RESTClient) request(verb string) *restclient.Request {
return restclient.NewRequest(c, verb, &url.URL{Host: "localhost"}, "", restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion(), Codec: c.Codec}, nil, nil)
config := restclient.ContentConfig{
GroupVersion: testapi.Default.GroupVersion(),
Codec: c.Codec,
}
serializers := restclient.Serializers{
Encoder: c.Codec,
Decoder: c.Codec,
StreamingSerializer: c.Codec,
Framer: runtime.DefaultFramer,
}
return restclient.NewRequest(c, verb, &url.URL{Host: "localhost"}, "", config, serializers, nil, nil)
}
func (c *RESTClient) Do(req *http.Request) (*http.Response, error) {

View File

@ -30,6 +30,7 @@ import (
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
@ -212,7 +213,11 @@ func TestStream(t *testing.T) {
server := httptest.NewServer(fakeServer(t, name, exec, testCase.Stdin, testCase.Stdout, testCase.Stderr, testCase.Error, testCase.Tty, testCase.MessageCount, testCase.ServerProtocols))
url, _ := url.ParseRequestURI(server.URL)
c, err := restclient.NewRESTClient(url, "", restclient.ContentConfig{GroupVersion: &unversioned.GroupVersion{Group: "x"}}, -1, -1, nil, nil)
config := restclient.ContentConfig{
GroupVersion: &unversioned.GroupVersion{Group: "x"},
NegotiatedSerializer: testapi.NegotiatedSerializer,
}
c, err := restclient.NewRESTClient(url, "", config, -1, -1, nil, nil)
if err != nil {
t.Fatalf("failed to create a client: %v", err)
}

View File

@ -224,7 +224,7 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) {
// Setup a fake server to listen for requests, and run the ReplicaSet controller in steady state
fakeHandler := utiltesting.FakeHandler{
StatusCode: 200,
ResponseBody: "",
ResponseBody: "{}",
}
testServer := httptest.NewServer(&fakeHandler)
defer testServer.Close()
@ -266,7 +266,7 @@ func TestControllerUpdateReplicas(t *testing.T) {
// This is a happy server just to record the PUT request we expect for status.Replicas
fakeHandler := utiltesting.FakeHandler{
StatusCode: 200,
ResponseBody: "",
ResponseBody: "{}",
}
testServer := httptest.NewServer(&fakeHandler)
defer testServer.Close()
@ -311,7 +311,7 @@ func TestSyncReplicaSetDormancy(t *testing.T) {
// Setup a test server so we can lie about the current state of pods
fakeHandler := utiltesting.FakeHandler{
StatusCode: 200,
ResponseBody: "",
ResponseBody: "{}",
}
testServer := httptest.NewServer(&fakeHandler)
defer testServer.Close()
@ -574,7 +574,7 @@ func TestControllerUpdateRequeue(t *testing.T) {
// This server should force a requeue of the controller because it fails to update status.Replicas.
fakeHandler := utiltesting.FakeHandler{
StatusCode: 500,
ResponseBody: "",
ResponseBody: "{}",
}
testServer := httptest.NewServer(&fakeHandler)
defer testServer.Close()

View File

@ -35,9 +35,11 @@ import (
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/unversioned/fake"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer/json"
"k8s.io/kubernetes/pkg/runtime/serializer/streaming"
"k8s.io/kubernetes/pkg/util/diff"
"k8s.io/kubernetes/pkg/watch"
"k8s.io/kubernetes/pkg/watch/json"
"k8s.io/kubernetes/pkg/watch/versioned"
)
func testData() (*api.PodList, *api.ServiceList, *api.ReplicationControllerList) {
@ -859,9 +861,9 @@ func TestWatchOnlyResource(t *testing.T) {
func watchBody(codec runtime.Codec, events []watch.Event) io.ReadCloser {
buf := bytes.NewBuffer([]byte{})
enc := json.NewEncoder(buf, codec)
enc := versioned.NewEncoder(streaming.NewEncoder(buf, codec), codec)
for i := range events {
enc.Encode(&events[i])
}
return ioutil.NopCloser(buf)
return json.Framer.NewFrameReader(ioutil.NopCloser(buf))
}

View File

@ -39,10 +39,11 @@ import (
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/unversioned/fake"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer/streaming"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
utiltesting "k8s.io/kubernetes/pkg/util/testing"
"k8s.io/kubernetes/pkg/watch"
watchjson "k8s.io/kubernetes/pkg/watch/json"
"k8s.io/kubernetes/pkg/watch/versioned"
)
func stringBody(body string) io.ReadCloser {
@ -51,7 +52,8 @@ func stringBody(body string) io.ReadCloser {
func watchBody(events ...watch.Event) string {
buf := &bytes.Buffer{}
enc := watchjson.NewEncoder(buf, testapi.Default.Codec())
codec := testapi.Default.Codec()
enc := versioned.NewEncoder(streaming.NewEncoder(buf, codec), codec)
for _, e := range events {
enc.Encode(&e)
}

View File

@ -0,0 +1,58 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package serializer
import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/runtime"
)
// TODO: We should figure out what happens when someone asks
// encoder for version and it conflicts with the raw serializer.
type negotiatedSerializerWrapper struct {
serializer runtime.Serializer
streamingSerializer runtime.Serializer
framer runtime.Framer
}
func NegotiatedSerializerWrapper(serializer, streamingSerializer runtime.Serializer, framer runtime.Framer) runtime.NegotiatedSerializer {
return &negotiatedSerializerWrapper{serializer, streamingSerializer, framer}
}
func (n *negotiatedSerializerWrapper) SupportedMediaTypes() []string {
return []string{}
}
func (n *negotiatedSerializerWrapper) SerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, bool) {
return n.serializer, true
}
func (n *negotiatedSerializerWrapper) SupportedStreamingMediaTypes() []string {
return []string{}
}
func (n *negotiatedSerializerWrapper) StreamingSerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, runtime.Framer, string, bool) {
return n.streamingSerializer, n.framer, "", true
}
func (n *negotiatedSerializerWrapper) EncoderForVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Encoder {
return n.serializer
}
func (n *negotiatedSerializerWrapper) DecoderToVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Decoder {
return n.serializer
}

View File

@ -63,6 +63,7 @@ func (f *FakeHandler) ServeHTTP(response http.ResponseWriter, request *http.Requ
}
f.RequestReceived = request
response.Header().Set("Content-Type", "application/json")
response.WriteHeader(f.StatusCode)
response.Write([]byte(f.ResponseBody))

View File

@ -42,9 +42,9 @@ type Decoder interface {
// StreamWatcher turns any stream for which you can write a Decoder interface
// into a watch.Interface.
type StreamWatcher struct {
source Decoder
result chan Event
sync.Mutex
source Decoder
result chan Event
stopped bool
}

View File

@ -14,56 +14,58 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package json
package versioned
import (
"encoding/json"
"fmt"
"io"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer/streaming"
"k8s.io/kubernetes/pkg/watch"
)
// Decoder implements the watch.Decoder interface for io.ReadClosers that
// have contents which consist of a series of watchEvent objects encoded via JSON.
// It will decode any object registered in the supplied codec.
// have contents which consist of a series of watchEvent objects encoded
// with the given streaming decoder. The internal objects will be then
// decoded by the embedded decoder.
type Decoder struct {
r io.ReadCloser
decoder *json.Decoder
codec runtime.Codec
decoder streaming.Decoder
embeddedDecoder runtime.Decoder
}
// NewDecoder creates an Decoder for the given writer and codec.
func NewDecoder(r io.ReadCloser, codec runtime.Codec) *Decoder {
func NewDecoder(decoder streaming.Decoder, embeddedDecoder runtime.Decoder) *Decoder {
return &Decoder{
r: r,
decoder: json.NewDecoder(r),
codec: codec,
decoder: decoder,
embeddedDecoder: embeddedDecoder,
}
}
// Decode blocks until it can return the next object in the writer. Returns an error
// if the writer is closed or an object can't be decoded.
// Decode blocks until it can return the next object in the reader. Returns an error
// if the reader is closed or an object can't be decoded.
func (d *Decoder) Decode() (watch.EventType, runtime.Object, error) {
var got WatchEvent
if err := d.decoder.Decode(&got); err != nil {
var got Event
res, _, err := d.decoder.Decode(nil, &got)
if err != nil {
return "", nil, err
}
if res != &got {
return "", nil, fmt.Errorf("unable to decode to versioned.Event")
}
switch got.Type {
case watch.Added, watch.Modified, watch.Deleted, watch.Error:
case string(watch.Added), string(watch.Modified), string(watch.Deleted), string(watch.Error):
default:
return "", nil, fmt.Errorf("got invalid watch event type: %v", got.Type)
}
obj, err := runtime.Decode(d.codec, got.Object.Raw)
obj, err := runtime.Decode(d.embeddedDecoder, got.Object.Raw)
if err != nil {
return "", nil, fmt.Errorf("unable to decode watch event: %v", err)
}
return got.Type, obj, nil
return watch.EventType(got.Type), obj, nil
}
// Close closes the underlying r.
func (d *Decoder) Close() {
d.r.Close()
d.decoder.Close()
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package json
package versioned_test
import (
"encoding/json"
@ -25,8 +25,10 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer/streaming"
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/pkg/watch"
"k8s.io/kubernetes/pkg/watch/versioned"
)
func TestDecoder(t *testing.T) {
@ -34,7 +36,8 @@ func TestDecoder(t *testing.T) {
for _, eventType := range table {
out, in := io.Pipe()
decoder := NewDecoder(out, testapi.Default.Codec())
codec := testapi.Default.Codec()
decoder := versioned.NewDecoder(streaming.NewDecoder(out, codec), codec)
expect := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}
encoder := json.NewEncoder(in)
@ -43,7 +46,11 @@ func TestDecoder(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error %v", err)
}
if err := encoder.Encode(&WatchEvent{eventType, runtime.RawExtension{Raw: json.RawMessage(data)}}); err != nil {
event := versioned.Event{
Type: string(eventType),
Object: runtime.RawExtension{Raw: json.RawMessage(data)},
}
if err := encoder.Encode(&event); err != nil {
t.Errorf("Unexpected error %v", err)
}
in.Close()
@ -82,7 +89,8 @@ func TestDecoder(t *testing.T) {
func TestDecoder_SourceClose(t *testing.T) {
out, in := io.Pipe()
decoder := NewDecoder(out, testapi.Default.Codec())
codec := testapi.Default.Codec()
decoder := versioned.NewDecoder(streaming.NewDecoder(out, codec), codec)
done := make(chan struct{})

View File

@ -14,40 +14,38 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package json
package versioned
import (
"encoding/json"
"io"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer/streaming"
"k8s.io/kubernetes/pkg/watch"
)
// Encoder implements the json.Encoder interface for io.Writers that
// should serialize WatchEvent objects into JSON. It will encode any object
// registered in the supplied codec and return an error otherwies.
// Encoder serializes watch.Events into io.Writer. The internal objects
// are encoded using embedded encoder, and the outer Event is serialized
// using encoder.
type Encoder struct {
w io.Writer
encoder *json.Encoder
codec runtime.Encoder
encoder streaming.Encoder
embeddedEncoder runtime.Encoder
}
// NewEncoder creates an Encoder for the given writer and codec
func NewEncoder(w io.Writer, codec runtime.Encoder) *Encoder {
func NewEncoder(encoder streaming.Encoder, embeddedEncoder runtime.Encoder) *Encoder {
return &Encoder{
w: w,
encoder: json.NewEncoder(w),
codec: codec,
encoder: encoder,
embeddedEncoder: embeddedEncoder,
}
}
// Encode writes an event to the writer. Returns an error
// if the writer is closed or an object can't be encoded.
func (e *Encoder) Encode(event *watch.Event) error {
obj, err := Object(e.codec, event)
data, err := runtime.Encode(e.embeddedEncoder, event.Object)
if err != nil {
return err
}
return e.encoder.Encode(obj)
// FIXME: get rid of json.RawMessage.
return e.encoder.Encode(&Event{string(event.Type), runtime.RawExtension{Raw: json.RawMessage(data)}})
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package json
package versioned_test
import (
"bytes"
@ -24,7 +24,9 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer/streaming"
"k8s.io/kubernetes/pkg/watch"
"k8s.io/kubernetes/pkg/watch/versioned"
)
func TestEncodeDecodeRoundTrip(t *testing.T) {
@ -52,13 +54,15 @@ func TestEncodeDecodeRoundTrip(t *testing.T) {
for i, testCase := range testCases {
buf := &bytes.Buffer{}
encoder := NewEncoder(buf, testCase.Codec)
codec := testCase.Codec
encoder := versioned.NewEncoder(streaming.NewEncoder(buf, codec), codec)
if err := encoder.Encode(&watch.Event{Type: testCase.Type, Object: testCase.Object}); err != nil {
t.Errorf("%d: unexpected error: %v", i, err)
continue
}
decoder := NewDecoder(ioutil.NopCloser(buf), testCase.Codec)
rc := ioutil.NopCloser(buf)
decoder := versioned.NewDecoder(streaming.NewDecoder(rc, codec), codec)
event, obj, err := decoder.Decode()
if err != nil {
t.Errorf("%d: unexpected error: %v", i, err)

View File

@ -29,6 +29,7 @@ import (
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
"k8s.io/kubernetes/pkg/runtime"
runtimeserializer "k8s.io/kubernetes/pkg/runtime/serializer"
"k8s.io/kubernetes/pkg/runtime/serializer/json"
"k8s.io/kubernetes/pkg/runtime/serializer/versioning"
@ -86,7 +87,8 @@ func New(kubeConfigFile string) (*WebhookAuthorizer, error) {
return nil, err
}
serializer := json.NewSerializer(json.DefaultMetaFactory, api.Scheme, runtime.ObjectTyperToTyper(api.Scheme), false)
clientConfig.ContentConfig.Codec = versioning.NewCodecForScheme(api.Scheme, serializer, serializer, encodeVersions, decodeVersions)
codec := versioning.NewCodecForScheme(api.Scheme, serializer, serializer, encodeVersions, decodeVersions)
clientConfig.ContentConfig.NegotiatedSerializer = runtimeserializer.NegotiatedSerializerWrapper(codec, codec, json.Framer)
restClient, err := restclient.UnversionedRESTClientFor(clientConfig)
if err != nil {