diff --git a/examples/apiserver/apiserver.go b/examples/apiserver/apiserver.go index 5e805714188..52e894716a4 100644 --- a/examples/apiserver/apiserver.go +++ b/examples/apiserver/apiserver.go @@ -94,9 +94,8 @@ func Run(serverOptions *genericapiserver.ServerRunOptions) error { VersionedResourcesStorageMap: map[string]map[string]rest.Storage{ groupVersion.Version: restStorageMap, }, - Scheme: api.Scheme, - NegotiatedSerializer: api.Codecs, - NegotiatedStreamSerializer: api.StreamCodecs, + Scheme: api.Scheme, + NegotiatedSerializer: api.Codecs, } if err := s.InstallAPIGroups([]genericapiserver.APIGroupInfo{apiGroupInfo}); err != nil { return fmt.Errorf("Error in installing API: %v", err) diff --git a/pkg/api/register.go b/pkg/api/register.go index 5edc2d98a26..13b2ef7ba57 100644 --- a/pkg/api/register.go +++ b/pkg/api/register.go @@ -29,9 +29,6 @@ var Scheme = runtime.NewScheme() // Codecs provides access to encoding and decoding for the scheme var Codecs = serializer.NewCodecFactory(Scheme) -// StreamCodecs provides access to streaming encoding and decoding for the scheme -var StreamCodecs = serializer.NewStreamingCodecFactory(Scheme) - // GroupName is the group name use in this package const GroupName = "" diff --git a/pkg/apiserver/api_installer.go b/pkg/apiserver/api_installer.go index 248e1f8bbb6..423f09bfc4e 100644 --- a/pkg/apiserver/api_installer.go +++ b/pkg/apiserver/api_installer.go @@ -457,12 +457,11 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag // test/integration/auth_test.go is currently the most comprehensive status code test reqScope := RequestScope{ - ContextFunc: ctxFn, - Serializer: a.group.Serializer, - StreamSerializer: a.group.StreamSerializer, - ParameterCodec: a.group.ParameterCodec, - Creater: a.group.Creater, - Convertor: a.group.Convertor, + ContextFunc: ctxFn, + Serializer: a.group.Serializer, + ParameterCodec: a.group.ParameterCodec, + Creater: a.group.Creater, + Convertor: a.group.Convertor, // TODO: This seems wrong for cross-group subresources. It makes an assumption that a subresource and its parent are in the same group version. Revisit this. Resource: a.group.GroupVersion.WithResource(resource), @@ -641,7 +640,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag Doc(doc). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). Operation("watch"+namespaced+kind+strings.Title(subresource)). - Produces(a.group.StreamSerializer.SupportedMediaTypes()...). + Produces(a.group.Serializer.SupportedStreamingMediaTypes()...). Returns(http.StatusOK, "OK", versionedWatchEvent). Writes(versionedWatchEvent) if err := addObjectParams(ws, route, versionedListOptions); err != nil { @@ -660,7 +659,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag Doc(doc). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). Operation("watch"+namespaced+kind+strings.Title(subresource)+"List"). - Produces(a.group.StreamSerializer.SupportedMediaTypes()...). + Produces(a.group.Serializer.SupportedStreamingMediaTypes()...). Returns(http.StatusOK, "OK", versionedWatchEvent). Writes(versionedWatchEvent) if err := addObjectParams(ws, route, versionedListOptions); err != nil { diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index a5fd87e3bcd..ac6a4547fc9 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -85,11 +85,8 @@ type APIGroupVersion struct { // Serializer is used to determine how to convert responses from API methods into bytes to send over // the wire. - Serializer runtime.NegotiatedSerializer - // StreamSerializer is used for sending a series of objects to the client over a single channel, where - // the underlying channel has no innate framing (such as an io.Writer) - StreamSerializer runtime.NegotiatedSerializer - ParameterCodec runtime.ParameterCodec + Serializer runtime.NegotiatedSerializer + ParameterCodec runtime.ParameterCodec Typer runtime.ObjectTyper Creater runtime.ObjectCreater diff --git a/pkg/apiserver/apiserver_test.go b/pkg/apiserver/apiserver_test.go index 36e09c5901b..4a0e81a763e 100644 --- a/pkg/apiserver/apiserver_test.go +++ b/pkg/apiserver/apiserver_test.go @@ -151,11 +151,12 @@ func addTestTypes() { &apiservertesting.Simple{}, &apiservertesting.SimpleList{}, &ListOptions{}, &api.DeleteOptions{}, &apiservertesting.SimpleGetOptions{}, &apiservertesting.SimpleRoot{}, &SimpleXGSubresource{}) - api.Scheme.AddKnownTypes(testGroupVersion, &api.Pod{}) + api.Scheme.AddKnownTypes(testGroupVersion, &v1.Pod{}) api.Scheme.AddKnownTypes(testInternalGroupVersion, &apiservertesting.Simple{}, &apiservertesting.SimpleList{}, &api.ListOptions{}, &apiservertesting.SimpleGetOptions{}, &apiservertesting.SimpleRoot{}, &SimpleXGSubresource{}) + api.Scheme.AddKnownTypes(testInternalGroupVersion, &api.Pod{}) // Register SimpleXGSubresource in both testGroupVersion and testGroup2Version, and also their // their corresponding internal versions, to verify that the desired group version object is // served in the tests. @@ -287,7 +288,6 @@ func handleInternal(storage map[string]rest.Storage, admissionControl admission. group.GroupVersion = grouplessGroupVersion group.OptionsExternalVersion = &grouplessGroupVersion group.Serializer = api.Codecs - group.StreamSerializer = api.StreamCodecs if err := (&group).InstallREST(container); err != nil { panic(fmt.Sprintf("unable to install container %s: %v", group.GroupVersion, err)) } @@ -300,7 +300,6 @@ func handleInternal(storage map[string]rest.Storage, admissionControl admission. group.GroupVersion = testGroupVersion group.OptionsExternalVersion = &testGroupVersion group.Serializer = api.Codecs - group.StreamSerializer = api.StreamCodecs if err := (&group).InstallREST(container); err != nil { panic(fmt.Sprintf("unable to install container %s: %v", group.GroupVersion, err)) } @@ -313,7 +312,6 @@ func handleInternal(storage map[string]rest.Storage, admissionControl admission. group.GroupVersion = newGroupVersion group.OptionsExternalVersion = &newGroupVersion group.Serializer = api.Codecs - group.StreamSerializer = api.StreamCodecs if err := (&group).InstallREST(container); err != nil { panic(fmt.Sprintf("unable to install container %s: %v", group.GroupVersion, err)) } @@ -356,6 +354,8 @@ func TestSimpleOptionsSetupRight(t *testing.T) { } type SimpleRESTStorage struct { + lock sync.Mutex + errors map[string]error list []apiservertesting.Simple item apiservertesting.Simple @@ -518,6 +518,8 @@ func (storage *SimpleRESTStorage) Update(ctx api.Context, obj runtime.Object) (r // Implement ResourceWatcher. func (storage *SimpleRESTStorage) Watch(ctx api.Context, options *api.ListOptions) (watch.Interface, error) { + storage.lock.Lock() + defer storage.lock.Unlock() storage.checkContext(ctx) storage.requestedLabelSelector = labels.Everything() if options != nil && options.LabelSelector != nil { @@ -539,6 +541,12 @@ func (storage *SimpleRESTStorage) Watch(ctx api.Context, options *api.ListOption return storage.fakeWatch, nil } +func (storage *SimpleRESTStorage) Watcher() *watch.FakeWatcher { + storage.lock.Lock() + defer storage.lock.Unlock() + return storage.fakeWatch +} + // Implement Redirector. var _ = rest.Redirector(&SimpleRESTStorage{}) @@ -1234,8 +1242,9 @@ func TestMetadata(t *testing.T) { } } if matches["text/plain,application/json,application/yaml,application/vnd.kubernetes.protobuf"] == 0 || + matches["application/json,application/json;stream=watch,application/vnd.kubernetes.protobuf,application/vnd.kubernetes.protobuf;stream=watch"] == 0 || matches["application/json,application/yaml,application/vnd.kubernetes.protobuf"] == 0 || - matches["application/json,application/vnd.kubernetes.protobuf"] == 0 || + matches["application/json"] == 0 || matches["*/*"] == 0 || len(matches) != 5 { t.Errorf("unexpected mime types: %v", matches) @@ -2415,9 +2424,8 @@ func TestUpdateREST(t *testing.T) { GroupVersion: newGroupVersion, OptionsExternalVersion: &newGroupVersion, - Serializer: api.Codecs, - StreamSerializer: api.StreamCodecs, - ParameterCodec: api.ParameterCodec, + Serializer: api.Codecs, + ParameterCodec: api.ParameterCodec, } } @@ -2500,9 +2508,8 @@ func TestParentResourceIsRequired(t *testing.T) { GroupVersion: newGroupVersion, OptionsExternalVersion: &newGroupVersion, - Serializer: api.Codecs, - StreamSerializer: api.StreamCodecs, - ParameterCodec: api.ParameterCodec, + Serializer: api.Codecs, + ParameterCodec: api.ParameterCodec, } container := restful.NewContainer() if err := group.InstallREST(container); err == nil { @@ -2532,9 +2539,8 @@ func TestParentResourceIsRequired(t *testing.T) { GroupVersion: newGroupVersion, OptionsExternalVersion: &newGroupVersion, - Serializer: api.Codecs, - StreamSerializer: api.StreamCodecs, - ParameterCodec: api.ParameterCodec, + Serializer: api.Codecs, + ParameterCodec: api.ParameterCodec, } container = restful.NewContainer() if err := group.InstallREST(container); err != nil { @@ -3246,7 +3252,6 @@ func TestXGSubresource(t *testing.T) { GroupVersion: testGroupVersion, OptionsExternalVersion: &testGroupVersion, Serializer: api.Codecs, - StreamSerializer: api.StreamCodecs, SubresourceGroupVersionKind: map[string]unversioned.GroupVersionKind{ "simple/subsimple": testGroup2Version.WithKind("SimpleXGSubresource"), diff --git a/pkg/apiserver/negotiate.go b/pkg/apiserver/negotiate.go index 1457addbfc2..3a5a1380663 100644 --- a/pkg/apiserver/negotiate.go +++ b/pkg/apiserver/negotiate.go @@ -27,32 +27,53 @@ import ( "k8s.io/kubernetes/pkg/runtime" ) -func negotiateOutputSerializer(req *http.Request, ns runtime.NegotiatedSerializer) (runtime.Serializer, string, error) { +func negotiateOutput(req *http.Request, supported []string) (string, map[string]string, error) { acceptHeader := req.Header.Get("Accept") - supported := ns.SupportedMediaTypes() if len(acceptHeader) == 0 && len(supported) > 0 { acceptHeader = supported[0] } accept, ok := negotiate(acceptHeader, supported) if !ok { - return nil, "", errNotAcceptable{supported} + return "", nil, errNotAcceptable{supported} } pretty := isPrettyPrint(req) if _, ok := accept.Params["pretty"]; !ok && pretty { accept.Params["pretty"] = "1" } + mediaType := accept.Type if len(accept.SubType) > 0 { mediaType += "/" + accept.SubType } - if s, ok := ns.SerializerForMediaType(mediaType, accept.Params); ok { + + return mediaType, accept.Params, nil +} + +func negotiateOutputSerializer(req *http.Request, ns runtime.NegotiatedSerializer) (runtime.Serializer, string, error) { + supported := ns.SupportedMediaTypes() + mediaType, params, err := negotiateOutput(req, supported) + if err != nil { + return nil, "", err + } + if s, ok := ns.SerializerForMediaType(mediaType, params); ok { return s, mediaType, nil } - return nil, "", errNotAcceptable{supported} } +func negotiateOutputStreamSerializer(req *http.Request, ns runtime.NegotiatedSerializer) (runtime.Serializer, runtime.Framer, string, string, error) { + supported := ns.SupportedMediaTypes() + mediaType, params, err := negotiateOutput(req, supported) + if err != nil { + return nil, nil, "", "", err + } + if s, f, exactMediaType, ok := ns.StreamingSerializerForMediaType(mediaType, params); ok { + return s, f, mediaType, exactMediaType, nil + } + return nil, nil, "", "", errNotAcceptable{supported} +} + func negotiateInputSerializer(req *http.Request, s runtime.NegotiatedSerializer) (runtime.Serializer, error) { supported := s.SupportedMediaTypes() mediaType := req.Header.Get("Content-Type") diff --git a/pkg/apiserver/negotiate_test.go b/pkg/apiserver/negotiate_test.go index 8e59d6d69cc..c37137aee07 100644 --- a/pkg/apiserver/negotiate_test.go +++ b/pkg/apiserver/negotiate_test.go @@ -27,15 +27,19 @@ import ( ) type fakeNegotiater struct { - serializer runtime.Serializer - types []string - mediaType string - options map[string]string + serializer, streamSerializer runtime.Serializer + framer runtime.Framer + types, streamTypes []string + mediaType, streamMediaType string + options, streamOptions map[string]string } func (n *fakeNegotiater) SupportedMediaTypes() []string { return n.types } +func (n *fakeNegotiater) SupportedStreamingMediaTypes() []string { + return n.streamTypes +} func (n *fakeNegotiater) SerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, bool) { n.mediaType = mediaType @@ -45,6 +49,14 @@ func (n *fakeNegotiater) SerializerForMediaType(mediaType string, options map[st return n.serializer, n.serializer != nil } +func (n *fakeNegotiater) StreamingSerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, runtime.Framer, string, bool) { + n.streamMediaType = mediaType + if len(options) > 0 { + n.streamOptions = options + } + return n.streamSerializer, n.framer, mediaType, n.streamSerializer != nil +} + func (n *fakeNegotiater) EncoderForVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Encoder { return n.serializer } diff --git a/pkg/apiserver/resthandler.go b/pkg/apiserver/resthandler.go index a292f1520b7..ea4d039b159 100644 --- a/pkg/apiserver/resthandler.go +++ b/pkg/apiserver/resthandler.go @@ -71,8 +71,7 @@ type RequestScope struct { Namer ScopeNamer ContextFunc - Serializer runtime.NegotiatedSerializer - StreamSerializer runtime.NegotiatedSerializer + Serializer runtime.NegotiatedSerializer runtime.ParameterCodec Creater runtime.ObjectCreater diff --git a/pkg/apiserver/watch.go b/pkg/apiserver/watch.go index e316d7a18fa..a383314c86c 100644 --- a/pkg/apiserver/watch.go +++ b/pkg/apiserver/watch.go @@ -69,15 +69,19 @@ type textEncodable interface { // TODO: the functionality in this method and in WatchServer.Serve is not cleanly decoupled. func serveWatch(watcher watch.Interface, scope RequestScope, req *restful.Request, res *restful.Response, timeout time.Duration) { // negotiate for the stream serializer - serializer, mediaType, err := negotiateOutputSerializer(req.Request, scope.StreamSerializer) + serializer, framer, mediaType, exactMediaType, err := negotiateOutputStreamSerializer(req.Request, scope.Serializer) if err != nil { scope.err(err, res.ResponseWriter, req.Request) return } - encoder := scope.StreamSerializer.EncoderForVersion(serializer, scope.Kind.GroupVersion()) + if framer == nil { + scope.err(fmt.Errorf("no framer defined for %q available for embedded encoding", mediaType), res.ResponseWriter, req.Request) + return + } + encoder := scope.Serializer.EncoderForVersion(serializer, scope.Kind.GroupVersion()) useTextFraming := false - if encodable, ok := encoder.(textEncodable); ok && encodable.EncodesAsText() { + if encodable, ok := serializer.(textEncodable); ok && encodable.EncodesAsText() { useTextFraming = true } @@ -94,7 +98,8 @@ func serveWatch(watcher watch.Interface, scope RequestScope, req *restful.Reques scope: scope, useTextFraming: useTextFraming, - mediaType: mediaType, + mediaType: exactMediaType, + framer: framer, encoder: encoder, embeddedEncoder: embeddedEncoder, fixup: func(obj runtime.Object) { @@ -118,6 +123,8 @@ type WatchServer struct { useTextFraming bool // the media type this watch is being served with mediaType string + // used to frame the watch stream + framer runtime.Framer // used to encode the watch stream event itself encoder runtime.Encoder // used to encode the nested object in the watch stream @@ -153,16 +160,7 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { return } - // get a framed encoder - f, ok := s.encoder.(streaming.Framer) - if !ok { - // programmer error - err := fmt.Errorf("no streaming support is available for media type %q", s.mediaType) - utilruntime.HandleError(err) - s.scope.err(errors.NewBadRequest(err.Error()), w, req) - return - } - framer := f.NewFrameWriter(w) + framer := s.framer.NewFrameWriter(w) if framer == nil { // programmer error err := fmt.Errorf("no stream framing support is available for media type %q", s.mediaType) @@ -208,7 +206,7 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { // type } if err := e.Encode((*versioned.InternalEvent)(&event)); err != nil { - utilruntime.HandleError(fmt.Errorf("unable to encode watch object: %v", err)) + utilruntime.HandleError(fmt.Errorf("unable to encode watch object: %v (%#v)", err, e)) // client disconnect. return } diff --git a/pkg/apiserver/watch_test.go b/pkg/apiserver/watch_test.go index 0faf1f8d268..8f2d0c42b76 100644 --- a/pkg/apiserver/watch_test.go +++ b/pkg/apiserver/watch_test.go @@ -18,6 +18,7 @@ package apiserver import ( "encoding/json" + "fmt" "io" "io/ioutil" "math/rand" @@ -38,8 +39,11 @@ import ( "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/runtime/serializer/streaming" + "k8s.io/kubernetes/pkg/util/diff" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/watch" + "k8s.io/kubernetes/pkg/watch/versioned" ) // watchJSON defines the expected JSON wire equivalent of watch.Event @@ -48,6 +52,19 @@ type watchJSON struct { Object json.RawMessage `json:"object,omitempty"` } +// roundTripOrDie round trips an object to get defaults set. +func roundTripOrDie(codec runtime.Codec, object runtime.Object) runtime.Object { + data, err := runtime.Encode(codec, object) + if err != nil { + panic(err) + } + obj, err := runtime.Decode(codec, data) + if err != nil { + panic(err) + } + return obj +} + var watchTestTable = []struct { t watch.EventType obj runtime.Object @@ -57,6 +74,15 @@ var watchTestTable = []struct { {watch.Deleted, &apiservertesting.Simple{ObjectMeta: api.ObjectMeta{Name: "bar"}}}, } +var podWatchTestTable = []struct { + t watch.EventType + obj runtime.Object +}{ + {watch.Added, roundTripOrDie(codec, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}})}, + {watch.Modified, roundTripOrDie(codec, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "bar"}})}, + {watch.Deleted, roundTripOrDie(codec, &api.Pod{ObjectMeta: api.ObjectMeta{Name: "bar"}})}, +} + func TestWatchWebsocket(t *testing.T) { simpleStorage := &SimpleRESTStorage{} _ = rest.Watcher(simpleStorage) // Give compile error if this doesn't work. @@ -111,65 +137,159 @@ func TestWatchWebsocket(t *testing.T) { } } -func TestWatchHTTP(t *testing.T) { +func TestWatchRead(t *testing.T) { simpleStorage := &SimpleRESTStorage{} + _ = rest.Watcher(simpleStorage) // Give compile error if this doesn't work. handler := handle(map[string]rest.Storage{"simples": simpleStorage}) server := httptest.NewServer(handler) // TODO: Uncomment when fix #19254 // defer server.Close() - client := http.Client{} dest, _ := url.Parse(server.URL) - dest.Path = "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/watch/simples" - dest.RawQuery = "" + dest.Path = "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/simples" + dest.RawQuery = "watch=1" - request, err := http.NewRequest("GET", dest.String(), nil) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - - response, err := client.Do(request) - if err != nil { - t.Errorf("unexpected error: %v", err) - } - - if response.StatusCode != http.StatusOK { - t.Errorf("Unexpected response %#v", response) - } - - decoder := json.NewDecoder(response.Body) - - for i, item := range watchTestTable { - // Send - simpleStorage.fakeWatch.Action(item.t, item.obj) - // Test receive - var got watchJSON - err := decoder.Decode(&got) + connectHTTP := func(accept string) (io.ReadCloser, string) { + client := http.Client{} + request, err := http.NewRequest("GET", dest.String(), nil) if err != nil { - t.Fatalf("%d: Unexpected error: %v", i, err) + t.Fatalf("unexpected error: %v", err) } - if got.Type != item.t { - t.Errorf("%d: Unexpected type: %v", i, got.Type) - } - t.Logf("obj: %v", string(got.Object)) - gotObj, err := runtime.Decode(codec, got.Object) - if err != nil { - t.Fatalf("Decode error: %v", err) - } - t.Logf("obj: %#v", gotObj) - if _, err := api.GetReference(gotObj); err != nil { - t.Errorf("Unable to construct reference: %v", err) - } - if e, a := item.obj, gotObj; !reflect.DeepEqual(e, a) { - t.Errorf("Expected %#v, got %#v", e, a) - } - } - simpleStorage.fakeWatch.Stop() + request.Header.Add("Accept", accept) - var got watchJSON - err = decoder.Decode(&got) - if err == nil { - t.Errorf("Unexpected non-error") + response, err := client.Do(request) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if response.StatusCode != http.StatusOK { + t.Fatalf("Unexpected response %#v", response) + } + return response.Body, response.Header.Get("Content-Type") + } + + connectWebSocket := func(accept string) (io.ReadCloser, string) { + dest := *dest + dest.Scheme = "ws" // Required by websocket, though the server never sees it. + config, err := websocket.NewConfig(dest.String(), "http://localhost") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + config.Header.Add("Accept", accept) + ws, err := websocket.DialConfig(config) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + return ws, "__default__" + } + + testCases := []struct { + Accept string + ExpectedContentType string + MediaType string + }{ + { + Accept: "application/json", + ExpectedContentType: "application/json", + MediaType: "application/json", + }, + // TODO: yaml stream serialization requires that RawExtension.MarshalJSON + // be able to understand nested encoding (since yaml calls json.Marshal + // rather than yaml.Marshal, which results in the raw bytes being in yaml). + // Same problem as thirdparty object. + /*{ + Accept: "application/yaml", + ExpectedContentType: "application/yaml;stream=watch", + MediaType: "application/yaml", + },*/ + { + Accept: "application/vnd.kubernetes.protobuf", + ExpectedContentType: "application/vnd.kubernetes.protobuf;stream=watch", + MediaType: "application/vnd.kubernetes.protobuf", + }, + { + Accept: "application/vnd.kubernetes.protobuf;stream=watch", + ExpectedContentType: "application/vnd.kubernetes.protobuf;stream=watch", + MediaType: "application/vnd.kubernetes.protobuf", + }, + } + protocols := []struct { + name string + selfFraming bool + fn func(string) (io.ReadCloser, string) + }{ + {name: "http", fn: connectHTTP}, + {name: "websocket", selfFraming: true, fn: connectWebSocket}, + } + + for _, protocol := range protocols { + for _, test := range testCases { + serializer, framer, _, ok := api.Codecs.StreamingSerializerForMediaType(test.MediaType, nil) + if !ok { + t.Fatal(framer) + } + + r, contentType := protocol.fn(test.Accept) + defer r.Close() + + if contentType != "__default__" && contentType != test.ExpectedContentType { + t.Errorf("Unexpected content type: %#v", contentType) + } + objectSerializer, ok := api.Codecs.SerializerForMediaType(test.MediaType, nil) + if !ok { + t.Fatal(framer) + } + objectCodec := api.Codecs.DecoderToVersion(objectSerializer, testInternalGroupVersion) + + var fr io.Reader = r + if !protocol.selfFraming { + fr = framer.NewFrameReader(r) + } + d := streaming.NewDecoder(fr, serializer) + + var w *watch.FakeWatcher + for w == nil { + w = simpleStorage.Watcher() + time.Sleep(time.Millisecond) + } + + for i, item := range podWatchTestTable { + action, object := item.t, item.obj + name := fmt.Sprintf("%s-%s-%d", protocol.name, test.MediaType, i) + + // Send + w.Action(action, object) + // Test receive + var got versioned.Event + _, _, err := d.Decode(nil, &got) + if err != nil { + t.Fatalf("%s: Unexpected error: %v", name, err) + } + if got.Type != string(action) { + t.Errorf("%s: Unexpected type: %v", name, got.Type) + } + + gotObj, err := runtime.Decode(objectCodec, got.Object.Raw) + if err != nil { + t.Fatalf("%s: Decode error: %v", name, err) + } + if _, err := api.GetReference(gotObj); err != nil { + t.Errorf("%s: Unable to construct reference: %v", name, err) + } + if e, a := object, gotObj; !api.Semantic.DeepEqual(e, a) { + t.Errorf("%s: different: %s", name, diff.ObjectDiff(e, a)) + } + } + w.Stop() + + var got versioned.Event + _, _, err := d.Decode(nil, &got) + if err == nil { + t.Errorf("Unexpected non-error") + } + + r.Close() + } } } @@ -189,7 +309,7 @@ func TestWatchHTTPAccept(t *testing.T) { t.Errorf("unexpected error: %v", err) } - request.Header.Set("Accept", "application/yaml") + request.Header.Set("Accept", "application/XYZ") response, err := client.Do(request) if err != nil { t.Errorf("unexpected error: %v", err) @@ -379,11 +499,17 @@ func TestWatchHTTPTimeout(t *testing.T) { timeoutCh := make(chan time.Time) done := make(chan struct{}) + _, framer, _, ok := api.Codecs.StreamingSerializerForMediaType("application/json", nil) + if !ok { + t.Fatal(framer) + } + // Setup a new watchserver watchServer := &WatchServer{ watching: watcher, mediaType: "testcase/json", + framer: framer, encoder: newCodec, embeddedEncoder: newCodec, diff --git a/pkg/genericapiserver/genericapiserver.go b/pkg/genericapiserver/genericapiserver.go index b8199818632..c04fb3ba083 100644 --- a/pkg/genericapiserver/genericapiserver.go +++ b/pkg/genericapiserver/genericapiserver.go @@ -82,8 +82,6 @@ type APIGroupInfo struct { Scheme *runtime.Scheme // NegotiatedSerializer controls how this group encodes and decodes data NegotiatedSerializer runtime.NegotiatedSerializer - // NegotiatedStreamSerializer controls how streaming responses are encoded and decoded. - NegotiatedStreamSerializer runtime.NegotiatedSerializer // ParameterCodec performs conversions for query parameters passed to API calls ParameterCodec runtime.ParameterCodec @@ -824,7 +822,6 @@ func (s *GenericAPIServer) getAPIGroupVersion(apiGroupInfo *APIGroupInfo, groupV version.Storage = storage version.ParameterCodec = apiGroupInfo.ParameterCodec version.Serializer = apiGroupInfo.NegotiatedSerializer - version.StreamSerializer = apiGroupInfo.NegotiatedStreamSerializer version.Creater = apiGroupInfo.Scheme version.Convertor = apiGroupInfo.Scheme version.Typer = apiGroupInfo.Scheme diff --git a/pkg/genericapiserver/genericapiserver_test.go b/pkg/genericapiserver/genericapiserver_test.go index 1da9a535fa4..173e08b4288 100644 --- a/pkg/genericapiserver/genericapiserver_test.go +++ b/pkg/genericapiserver/genericapiserver_test.go @@ -129,7 +129,6 @@ func TestInstallAPIGroups(t *testing.T) { IsLegacyGroup: true, ParameterCodec: api.ParameterCodec, NegotiatedSerializer: api.Codecs, - NegotiatedStreamSerializer: api.StreamCodecs, }, { // extensions group version @@ -138,7 +137,6 @@ func TestInstallAPIGroups(t *testing.T) { OptionsExternalVersion: &apiGroupMeta.GroupVersion, ParameterCodec: api.ParameterCodec, NegotiatedSerializer: api.Codecs, - NegotiatedStreamSerializer: api.StreamCodecs, }, } s.InstallAPIGroups(apiGroupsInfo) diff --git a/pkg/master/master.go b/pkg/master/master.go index fe32c15e754..e961fcf5d2a 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -203,11 +203,10 @@ func (m *Master) InstallAPIs(c *Config) { VersionedResourcesStorageMap: map[string]map[string]rest.Storage{ "v1": m.v1ResourcesStorage, }, - IsLegacyGroup: true, - Scheme: api.Scheme, - ParameterCodec: api.ParameterCodec, - NegotiatedSerializer: api.Codecs, - NegotiatedStreamSerializer: api.StreamCodecs, + IsLegacyGroup: true, + Scheme: api.Scheme, + ParameterCodec: api.ParameterCodec, + NegotiatedSerializer: api.Codecs, } if autoscalingGroupVersion := (unversioned.GroupVersion{Group: "autoscaling", Version: "v1"}); registered.IsEnabledVersion(autoscalingGroupVersion) { apiGroupInfo.SubresourceGroupVersionKind = map[string]unversioned.GroupVersionKind{ @@ -260,11 +259,10 @@ func (m *Master) InstallAPIs(c *Config) { VersionedResourcesStorageMap: map[string]map[string]rest.Storage{ "v1beta1": extensionResources, }, - OptionsExternalVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion, - Scheme: api.Scheme, - ParameterCodec: api.ParameterCodec, - NegotiatedSerializer: api.Codecs, - NegotiatedStreamSerializer: api.StreamCodecs, + OptionsExternalVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion, + Scheme: api.Scheme, + ParameterCodec: api.ParameterCodec, + NegotiatedSerializer: api.Codecs, } apiGroupsInfo = append(apiGroupsInfo, apiGroupInfo) @@ -293,11 +291,10 @@ func (m *Master) InstallAPIs(c *Config) { VersionedResourcesStorageMap: map[string]map[string]rest.Storage{ "v1": autoscalingResources, }, - OptionsExternalVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion, - Scheme: api.Scheme, - ParameterCodec: api.ParameterCodec, - NegotiatedSerializer: api.Codecs, - NegotiatedStreamSerializer: api.StreamCodecs, + OptionsExternalVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion, + Scheme: api.Scheme, + ParameterCodec: api.ParameterCodec, + NegotiatedSerializer: api.Codecs, } apiGroupsInfo = append(apiGroupsInfo, apiGroupInfo) @@ -326,11 +323,10 @@ func (m *Master) InstallAPIs(c *Config) { VersionedResourcesStorageMap: map[string]map[string]rest.Storage{ "v1": batchResources, }, - OptionsExternalVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion, - Scheme: api.Scheme, - ParameterCodec: api.ParameterCodec, - NegotiatedSerializer: api.Codecs, - NegotiatedStreamSerializer: api.StreamCodecs, + OptionsExternalVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion, + Scheme: api.Scheme, + ParameterCodec: api.ParameterCodec, + NegotiatedSerializer: api.Codecs, } apiGroupsInfo = append(apiGroupsInfo, apiGroupInfo) @@ -358,11 +354,10 @@ func (m *Master) InstallAPIs(c *Config) { VersionedResourcesStorageMap: map[string]map[string]rest.Storage{ "v1alpha1": appsResources, }, - OptionsExternalVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion, - Scheme: api.Scheme, - ParameterCodec: api.ParameterCodec, - NegotiatedSerializer: api.Codecs, - NegotiatedStreamSerializer: api.StreamCodecs, + OptionsExternalVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion, + Scheme: api.Scheme, + ParameterCodec: api.ParameterCodec, + NegotiatedSerializer: api.Codecs, } apiGroupsInfo = append(apiGroupsInfo, apiGroupInfo) @@ -713,9 +708,8 @@ func (m *Master) thirdpartyapi(group, kind, version string) *apiserver.APIGroupV Storage: storage, OptionsExternalVersion: &optionsExternalVersion, - Serializer: thirdpartyresourcedata.NewNegotiatedSerializer(api.Codecs, kind, externalVersion, internalVersion), - StreamSerializer: thirdpartyresourcedata.NewNegotiatedSerializer(api.StreamCodecs, kind, externalVersion, internalVersion), - ParameterCodec: thirdpartyresourcedata.NewThirdPartyParameterCodec(api.ParameterCodec), + Serializer: thirdpartyresourcedata.NewNegotiatedSerializer(api.Codecs, kind, externalVersion, internalVersion), + ParameterCodec: thirdpartyresourcedata.NewThirdPartyParameterCodec(api.ParameterCodec), Context: m.RequestContextMapper, diff --git a/pkg/registry/thirdpartyresourcedata/codec.go b/pkg/registry/thirdpartyresourcedata/codec.go index 3c09c6c6a07..a7f46a2ee0a 100644 --- a/pkg/registry/thirdpartyresourcedata/codec.go +++ b/pkg/registry/thirdpartyresourcedata/codec.go @@ -33,7 +33,6 @@ import ( "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/runtime/serializer/streaming" ) type thirdPartyObjectConverter struct { @@ -203,10 +202,7 @@ type thirdPartyResourceDataCodec struct { kind string } -var ( - _ runtime.Codec = &thirdPartyResourceDataCodec{} - _ streaming.Framer = &thirdPartyResourceDataCodec{} -) +var _ runtime.Codec = &thirdPartyResourceDataCodec{} func NewCodec(codec runtime.Codec, kind string) runtime.Codec { return &thirdPartyResourceDataCodec{codec, kind} @@ -416,24 +412,6 @@ func (t *thirdPartyResourceDataCodec) EncodeToStream(obj runtime.Object, stream } } -// NewFrameWriter calls into the nested encoder to expose its framing -func (c *thirdPartyResourceDataCodec) NewFrameWriter(w io.Writer) io.Writer { - f, ok := c.delegate.(streaming.Framer) - if !ok { - return nil - } - return f.NewFrameWriter(w) -} - -// NewFrameReader calls into the nested decoder to expose its framing -func (c *thirdPartyResourceDataCodec) NewFrameReader(r io.Reader) io.Reader { - f, ok := c.delegate.(streaming.Framer) - if !ok { - return nil - } - return f.NewFrameReader(r) -} - func NewObjectCreator(group, version string, delegate runtime.ObjectCreater) runtime.ObjectCreater { return &thirdPartyResourceDataCreator{group, version, delegate} } diff --git a/pkg/runtime/helper.go b/pkg/runtime/helper.go index ac23e3a2689..8bb33a91f4c 100644 --- a/pkg/runtime/helper.go +++ b/pkg/runtime/helper.go @@ -18,6 +18,7 @@ package runtime import ( "fmt" + "io" "reflect" "k8s.io/kubernetes/pkg/api/unversioned" @@ -179,3 +180,12 @@ func SetZeroValue(objPtr Object) error { v.Set(reflect.Zero(v.Type())) return nil } + +// DefaultFramer is valid for any stream that can read objects serially without +// any separation in the stream. +var DefaultFramer = defaultFramer{} + +type defaultFramer struct{} + +func (defaultFramer) NewFrameReader(r io.Reader) io.Reader { return r } +func (defaultFramer) NewFrameWriter(w io.Writer) io.Writer { return w } diff --git a/pkg/runtime/interfaces.go b/pkg/runtime/interfaces.go index 67b37b4401b..be2fa71321b 100644 --- a/pkg/runtime/interfaces.go +++ b/pkg/runtime/interfaces.go @@ -80,12 +80,37 @@ type ParameterCodec interface { EncodeParameters(obj Object, to unversioned.GroupVersion) (url.Values, error) } +// Framer is a factory for creating readers and writers that obey a particular framing pattern. +type Framer interface { + NewFrameReader(r io.Reader) io.Reader + NewFrameWriter(w io.Writer) io.Writer +} + // NegotiatedSerializer is an interface used for obtaining encoders, decoders, and serializers // for multiple supported media types. type NegotiatedSerializer interface { + // SupportedMediaTypes is the media types supported for reading and writing single objects. SupportedMediaTypes() []string + // SerializerForMediaType returns a serializer for the provided media type. Options is a set of + // parameters applied to the media type that may modify the resulting output. SerializerForMediaType(mediaType string, options map[string]string) (Serializer, bool) + + // SupportedStreamingMediaTypes returns the media types of the supported streaming serializers. + // Streaming serializers control how multiple objects are written to a stream output. + SupportedStreamingMediaTypes() []string + // StreamingSerializerForMediaType returns a serializer for the provided media type that supports + // reading and writing multiple objects to a stream. It returns a framer and serializer, or an + // error if no such serializer can be created. Options is a set of parameters applied to the + // media type that may modify the resulting output. + StreamingSerializerForMediaType(mediaType string, options map[string]string) (Serializer, Framer, string, bool) + + // EncoderForVersion returns an encoder that ensures objects being written to the provided + // serializer are in the provided group version. + // TODO: take multiple group versions EncoderForVersion(serializer Serializer, gv unversioned.GroupVersion) Encoder + // DecoderForVersion returns a decoder that ensures objects being read by the provided + // serializer are in the provided group version by default. + // TODO: take multiple group versions DecoderToVersion(serializer Serializer, gv unversioned.GroupVersion) Decoder } diff --git a/pkg/runtime/serializer/codec_factory.go b/pkg/runtime/serializer/codec_factory.go index 54b68368279..1ed0312b143 100644 --- a/pkg/runtime/serializer/codec_factory.go +++ b/pkg/runtime/serializer/codec_factory.go @@ -17,13 +17,10 @@ limitations under the License. package serializer import ( - "io/ioutil" - "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime/serializer/json" "k8s.io/kubernetes/pkg/runtime/serializer/recognizer" - "k8s.io/kubernetes/pkg/runtime/serializer/streaming" "k8s.io/kubernetes/pkg/runtime/serializer/versioning" ) @@ -44,17 +41,25 @@ type serializerType struct { // be expected to pass into or a gvk to Decode, since no type information will be available on // the object itself. RawSerializer runtime.Serializer - // Specialize gives the type the opportunity to return a different serializer implementation if // the content type contains alternate operations. Here it is used to implement "pretty" as an // option to application/json, but could also be used to allow serializers to perform type // defaulting or alter output. Specialize func(map[string]string) (runtime.Serializer, bool) + + AcceptStreamContentTypes []string + StreamContentType string + + Framer runtime.Framer + StreamSerializer runtime.Serializer + StreamSpecialize func(map[string]string) (runtime.Serializer, bool) } func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory) []serializerType { jsonSerializer := json.NewSerializer(mf, scheme, runtime.ObjectTyperToTyper(scheme), false) jsonPrettySerializer := json.NewSerializer(mf, scheme, runtime.ObjectTyperToTyper(scheme), true) + yamlSerializer := json.NewYAMLSerializer(mf, scheme, runtime.ObjectTyperToTyper(scheme)) + serializers := []serializerType{ { AcceptContentTypes: []string{"application/json"}, @@ -62,15 +67,26 @@ func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory) []seri FileExtensions: []string{"json"}, Serializer: jsonSerializer, PrettySerializer: jsonPrettySerializer, + + AcceptStreamContentTypes: []string{"application/json", "application/json;stream=watch"}, + StreamContentType: "application/json", + Framer: json.Framer, + StreamSerializer: jsonSerializer, + }, + { + AcceptContentTypes: []string{"application/yaml"}, + ContentType: "application/yaml", + FileExtensions: []string{"yaml"}, + Serializer: yamlSerializer, + + // TODO: requires runtime.RawExtension to properly distinguish when the nested content is + // yaml, because the yaml encoder invokes MarshalJSON first + //AcceptStreamContentTypes: []string{"application/yaml", "application/yaml;stream=watch"}, + //StreamContentType: "application/yaml;stream=watch", + //Framer: json.YAMLFramer, + //StreamSerializer: yamlSerializer, }, } - yamlSerializer := json.NewYAMLSerializer(mf, scheme, runtime.ObjectTyperToTyper(scheme)) - serializers = append(serializers, serializerType{ - AcceptContentTypes: []string{"application/yaml"}, - ContentType: "application/yaml", - FileExtensions: []string{"yaml"}, - Serializer: yamlSerializer, - }) for _, fn := range serializerExtensions { if serializer, ok := fn(scheme); ok { @@ -83,10 +99,11 @@ func newSerializersForScheme(scheme *runtime.Scheme, mf json.MetaFactory) []seri // CodecFactory provides methods for retrieving codecs and serializers for specific // versions and content types. type CodecFactory struct { - scheme *runtime.Scheme - serializers []serializerType - universal runtime.Decoder - accepts []string + scheme *runtime.Scheme + serializers []serializerType + universal runtime.Decoder + accepts []string + streamingAccepts []string legacySerializer runtime.Serializer } @@ -102,40 +119,12 @@ func NewCodecFactory(scheme *runtime.Scheme) CodecFactory { return newCodecFactory(scheme, serializers) } -// NewStreamingCodecFactory returns serializers that support the streaming.Serializer interface. -// TODO: determine whether this returns a streaming.Serializer AND runtime.Serializer, or whether -// streaming should be added to the CodecFactory interface. -func NewStreamingCodecFactory(scheme *runtime.Scheme) CodecFactory { - return newStreamingCodecFactory(scheme, json.DefaultMetaFactory) -} - -// newStreamingCodecFactory handles providing streaming codecs -func newStreamingCodecFactory(scheme *runtime.Scheme, mf json.MetaFactory) CodecFactory { - serializers := newSerializersForScheme(scheme, mf) - streamers := []serializerType{} - for i := range serializers { - if serializers[i].RawSerializer != nil { - serializers[i].Serializer = serializers[i].RawSerializer - } - if s, ok := serializers[i].Serializer.(streaming.Framer); ok { - // TODO: more elegant option? - // TODO: add tests and assertions for which serializers should - // have framers. We need to answer whether all Serializers - // are streaming serializers or not. - if s.NewFrameWriter(ioutil.Discard) == nil { - continue - } - streamers = append(streamers, serializers[i]) - } - } - return newCodecFactory(scheme, streamers) -} - // newCodecFactory is a helper for testing that allows a different metafactory to be specified. func newCodecFactory(scheme *runtime.Scheme, serializers []serializerType) CodecFactory { decoders := make([]runtime.Decoder, 0, len(serializers)) accepts := []string{} alreadyAccepted := make(map[string]struct{}) + var legacySerializer runtime.Serializer for _, d := range serializers { decoders = append(decoders, d.Serializer) @@ -153,11 +142,29 @@ func newCodecFactory(scheme *runtime.Scheme, serializers []serializerType) Codec if legacySerializer == nil { legacySerializer = serializers[0].Serializer } + + streamAccepts := []string{} + alreadyAccepted = make(map[string]struct{}) + for _, d := range serializers { + if len(d.StreamContentType) == 0 { + continue + } + for _, mediaType := range d.AcceptStreamContentTypes { + if _, ok := alreadyAccepted[mediaType]; ok { + continue + } + alreadyAccepted[mediaType] = struct{}{} + streamAccepts = append(streamAccepts, mediaType) + } + } + return CodecFactory{ scheme: scheme, serializers: serializers, universal: recognizer.NewDecoder(decoders...), - accepts: accepts, + + accepts: accepts, + streamingAccepts: streamAccepts, legacySerializer: legacySerializer, } @@ -170,6 +177,11 @@ func (f CodecFactory) SupportedMediaTypes() []string { return f.accepts } +// SupportedStreamingMediaTypes returns the RFC2046 media types that this factory has stream serializers for. +func (f CodecFactory) SupportedStreamingMediaTypes() []string { + return f.streamingAccepts +} + // LegacyCodec encodes output to a given API version, and decodes output into the internal form from // any recognized source. The returned codec will always encode output to JSON. // @@ -234,6 +246,24 @@ func (f CodecFactory) SerializerForMediaType(mediaType string, options map[strin return nil, false } +// StreamingSerializerForMediaType returns a serializer that matches the provided RFC2046 mediaType, or false if no such +// serializer exists +func (f CodecFactory) StreamingSerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, runtime.Framer, string, bool) { + for _, s := range f.serializers { + for _, accepted := range s.AcceptStreamContentTypes { + if accepted == mediaType { + if s.StreamSpecialize != nil && len(options) > 0 { + serializer, ok := s.StreamSpecialize(options) + // TODO: have StreamSpecialize return exact content type + return serializer, s.Framer, s.StreamContentType, ok + } + return s.StreamSerializer, s.Framer, s.StreamContentType, true + } + } + } + return nil, nil, "", false +} + // SerializerForFileExtension returns a serializer for the provided extension, or false if no serializer matches. func (f CodecFactory) SerializerForFileExtension(extension string) (runtime.Serializer, bool) { for _, s := range f.serializers { diff --git a/pkg/runtime/serializer/json/json.go b/pkg/runtime/serializer/json/json.go index bb19b1ee784..e91fe262042 100644 --- a/pkg/runtime/serializer/json/json.go +++ b/pkg/runtime/serializer/json/json.go @@ -31,7 +31,7 @@ import ( // NewSerializer creates a JSON serializer that handles encoding versioned objects into the proper JSON form. If typer // is not nil, the object has the group, version, and kind fields set. -func NewSerializer(meta MetaFactory, creater runtime.ObjectCreater, typer runtime.Typer, pretty bool) runtime.Serializer { +func NewSerializer(meta MetaFactory, creater runtime.ObjectCreater, typer runtime.Typer, pretty bool) *Serializer { return &Serializer{ meta: meta, creater: creater, @@ -44,7 +44,7 @@ func NewSerializer(meta MetaFactory, creater runtime.ObjectCreater, typer runtim // NewYAMLSerializer creates a YAML serializer that handles encoding versioned objects into the proper YAML form. If typer // is not nil, the object has the group, version, and kind fields set. This serializer supports only the subset of YAML that // matches JSON, and will error if constructs are used that do not serialize to JSON. -func NewYAMLSerializer(meta MetaFactory, creater runtime.ObjectCreater, typer runtime.Typer) runtime.Serializer { +func NewYAMLSerializer(meta MetaFactory, creater runtime.ObjectCreater, typer runtime.Typer) *Serializer { return &Serializer{ meta: meta, creater: creater, @@ -194,26 +194,6 @@ func (s *Serializer) RecognizesData(peek io.Reader) (bool, error) { return ok, nil } -// NewFrameWriter implements stream framing for this serializer -func (s *Serializer) NewFrameWriter(w io.Writer) io.Writer { - if s.yaml { - // TODO: needs document framing - return nil - } - // we can write JSON objects directly to the writer, because they are self-framing - return w -} - -// NewFrameReader implements stream framing for this serializer -func (s *Serializer) NewFrameReader(r io.Reader) io.Reader { - if s.yaml { - // TODO: needs document framing - return nil - } - // we need to extract the JSON chunks of data to pass to Decode() - return framer.NewJSONFramedReader(r) -} - // EncodesAsText returns true because both JSON and YAML are considered textual representations // of data. This is used to determine whether the serialized object should be transmitted over // a WebSocket Text or Binary frame. This must remain true for legacy compatibility with v1.1 @@ -221,3 +201,49 @@ func (s *Serializer) NewFrameReader(r io.Reader) io.Reader { func (s *Serializer) EncodesAsText() bool { return true } + +// Framer is the default JSON framing behavior, with newlines delimiting individual objects. +var Framer = jsonFramer{} + +type jsonFramer struct{} + +// NewFrameWriter implements stream framing for this serializer +func (jsonFramer) NewFrameWriter(w io.Writer) io.Writer { + // we can write JSON objects directly to the writer, because they are self-framing + return w +} + +// NewFrameReader implements stream framing for this serializer +func (jsonFramer) NewFrameReader(r io.Reader) io.Reader { + // we need to extract the JSON chunks of data to pass to Decode() + return framer.NewJSONFramedReader(r) +} + +// Framer is the default JSON framing behavior, with newlines delimiting individual objects. +var YAMLFramer = yamlFramer{} + +type yamlFramer struct{} + +// NewFrameWriter implements stream framing for this serializer +func (yamlFramer) NewFrameWriter(w io.Writer) io.Writer { + return yamlFrameWriter{w} +} + +// NewFrameReader implements stream framing for this serializer +func (yamlFramer) NewFrameReader(r io.Reader) io.Reader { + // extract the YAML document chunks directly + return utilyaml.NewDocumentDecoder(r) +} + +type yamlFrameWriter struct { + w io.Writer +} + +// Write separates each document with the YAML document separator (`---` followed by line +// break). Writers must write well formed YAML documents (include a final line break). +func (w yamlFrameWriter) Write(data []byte) (n int, err error) { + if _, err := w.w.Write([]byte("---\n")); err != nil { + return 0, err + } + return w.w.Write(data) +} diff --git a/pkg/runtime/serializer/protobuf/protobuf.go b/pkg/runtime/serializer/protobuf/protobuf.go index eb8c41ad680..8828831e007 100644 --- a/pkg/runtime/serializer/protobuf/protobuf.go +++ b/pkg/runtime/serializer/protobuf/protobuf.go @@ -240,16 +240,6 @@ func (s *Serializer) RecognizesData(peek io.Reader) (bool, error) { return bytes.Equal(s.prefix, prefix), nil } -// NewFrameWriter implements stream framing for this serializer -func (s *Serializer) NewFrameWriter(w io.Writer) io.Writer { - return framer.NewLengthDelimitedFrameWriter(w) -} - -// NewFrameReader implements stream framing for this serializer -func (s *Serializer) NewFrameReader(r io.Reader) io.Reader { - return framer.NewLengthDelimitedFrameReader(r) -} - // copyKindDefaults defaults dst to the value in src if dst does not have a value set. func copyKindDefaults(dst, src *unversioned.GroupVersionKind) { if src == nil { @@ -435,12 +425,16 @@ func (s *RawSerializer) RecognizesData(peek io.Reader) (bool, error) { return false, nil } +var LengthDelimitedFramer = lengthDelimitedFramer{} + +type lengthDelimitedFramer struct{} + // NewFrameWriter implements stream framing for this serializer -func (s *RawSerializer) NewFrameWriter(w io.Writer) io.Writer { +func (lengthDelimitedFramer) NewFrameWriter(w io.Writer) io.Writer { return framer.NewLengthDelimitedFrameWriter(w) } // NewFrameReader implements stream framing for this serializer -func (s *RawSerializer) NewFrameReader(r io.Reader) io.Reader { +func (lengthDelimitedFramer) NewFrameReader(r io.Reader) io.Reader { return framer.NewLengthDelimitedFrameReader(r) } diff --git a/pkg/runtime/serializer/protobuf_extension.go b/pkg/runtime/serializer/protobuf_extension.go index dc24fc1c9b7..2c95e01f9ce 100644 --- a/pkg/runtime/serializer/protobuf_extension.go +++ b/pkg/runtime/serializer/protobuf_extension.go @@ -21,11 +21,14 @@ import ( "k8s.io/kubernetes/pkg/runtime/serializer/protobuf" ) -// contentTypeProtobuf is the protobuf type exposed for Kubernetes. It is private to prevent others from -// depending on it unintentionally. -// TODO: potentially move to pkg/api (since it's part of the Kube public API) and pass it in to the -// CodecFactory on initialization. -const contentTypeProtobuf = "application/vnd.kubernetes.protobuf" +const ( + // contentTypeProtobuf is the protobuf type exposed for Kubernetes. It is private to prevent others from + // depending on it unintentionally. + // TODO: potentially move to pkg/api (since it's part of the Kube public API) and pass it in to the + // CodecFactory on initialization. + contentTypeProtobuf = "application/vnd.kubernetes.protobuf" + contentTypeProtobufWatch = contentTypeProtobuf + ";stream=watch" +) func protobufSerializer(scheme *runtime.Scheme) (serializerType, bool) { serializer := protobuf.NewSerializer(scheme, runtime.ObjectTyperToTyper(scheme), contentTypeProtobuf) @@ -36,6 +39,11 @@ func protobufSerializer(scheme *runtime.Scheme) (serializerType, bool) { FileExtensions: []string{"pb"}, Serializer: serializer, RawSerializer: raw, + + AcceptStreamContentTypes: []string{contentTypeProtobuf, contentTypeProtobufWatch}, + StreamContentType: contentTypeProtobufWatch, + Framer: protobuf.LengthDelimitedFramer, + StreamSerializer: raw, }, true } diff --git a/pkg/runtime/serializer/streaming/streaming.go b/pkg/runtime/serializer/streaming/streaming.go index b7daf774d26..2268ebeb3e3 100644 --- a/pkg/runtime/serializer/streaming/streaming.go +++ b/pkg/runtime/serializer/streaming/streaming.go @@ -27,12 +27,6 @@ import ( "k8s.io/kubernetes/pkg/runtime" ) -// Framer is a factory for creating readers and writers that obey a particular framing pattern. -type Framer interface { - NewFrameReader(r io.Reader) io.Reader - NewFrameWriter(w io.Writer) io.Writer -} - // Encoder is a runtime.Encoder on a stream. type Encoder interface { // Encode will write the provided object to the stream or return an error. It obeys the same diff --git a/pkg/runtime/serializer/versioning/versioning.go b/pkg/runtime/serializer/versioning/versioning.go index eeafa2a3245..36c66fa9f57 100644 --- a/pkg/runtime/serializer/versioning/versioning.go +++ b/pkg/runtime/serializer/versioning/versioning.go @@ -22,7 +22,6 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/runtime/serializer/streaming" ) // EnableCrossGroupDecoding modifies the given decoder in place, if it is a codec @@ -278,24 +277,6 @@ func (c *codec) EncodeToStream(obj runtime.Object, w io.Writer, overrides ...unv return c.encoder.EncodeToStream(obj, w, overrides...) } -// NewFrameWriter calls into the nested encoder to expose its framing -func (c *codec) NewFrameWriter(w io.Writer) io.Writer { - f, ok := c.encoder.(streaming.Framer) - if !ok { - return nil - } - return f.NewFrameWriter(w) -} - -// NewFrameReader calls into the nested decoder to expose its framing -func (c *codec) NewFrameReader(r io.Reader) io.Reader { - f, ok := c.decoder.(streaming.Framer) - if !ok { - return nil - } - return f.NewFrameReader(r) -} - // promoteOrPrependGroupVersion finds the group version in the provided group versions that has the same group as target. // If the group is found the returned array will have that group version in the first position - if the group is not found // the returned array will have target in the first position. diff --git a/pkg/util/yaml/decoder.go b/pkg/util/yaml/decoder.go index 5846fc20ff4..519242a383a 100644 --- a/pkg/util/yaml/decoder.go +++ b/pkg/util/yaml/decoder.go @@ -75,6 +75,58 @@ func (d *YAMLToJSONDecoder) Decode(into interface{}) error { return err } +// YAMLDecoder reads chunks of objects and returns ErrShortBuffer if +// the data is not sufficient. +type YAMLDecoder struct { + scanner *bufio.Scanner + remaining []byte +} + +// NewDocumentDecoder decodes YAML documents from the provided +// stream in chunks by converting each document (as defined by +// the YAML spec) into its own chunk. io.ErrShortBuffer will be +// returned if the entire buffer could not be read to assist +// the caller in framing the chunk. +func NewDocumentDecoder(r io.Reader) io.Reader { + scanner := bufio.NewScanner(r) + scanner.Split(splitYAMLDocument) + return &YAMLDecoder{ + scanner: scanner, + } +} + +// Read reads the previous slice into the buffer, or attempts to read +// the next chunk. +// TODO: switch to readline approach. +func (d *YAMLDecoder) Read(data []byte) (n int, err error) { + left := len(d.remaining) + if left == 0 { + // return the next chunk from the stream + if !d.scanner.Scan() { + err := d.scanner.Err() + if err == nil { + err = io.EOF + } + return 0, err + } + out := d.scanner.Bytes() + d.remaining = out + left = len(out) + } + + // fits within data + if left <= len(data) { + copy(data, d.remaining) + d.remaining = nil + return len(d.remaining), nil + } + + // caller will need to reread + copy(data, d.remaining[:left]) + d.remaining = d.remaining[left:] + return len(data), io.ErrShortBuffer +} + const yamlSeparator = "\n---" // splitYAMLDocument is a bufio.SplitFunc for splitting YAML streams into individual documents. diff --git a/pkg/watch/watch.go b/pkg/watch/watch.go index 93c34d936b5..e8fca0a6249 100644 --- a/pkg/watch/watch.go +++ b/pkg/watch/watch.go @@ -99,6 +99,14 @@ func (f *FakeWatcher) Stop() { } } +// Reset prepares the watcher to be reused. +func (f *FakeWatcher) Reset() { + f.Lock() + defer f.Unlock() + f.Stopped = false + f.result = make(chan Event) +} + func (f *FakeWatcher) ResultChan() <-chan Event { return f.result } diff --git a/test/integration/framework/serializer.go b/test/integration/framework/serializer.go index aacd913e95c..e6aa79574a4 100644 --- a/test/integration/framework/serializer.go +++ b/test/integration/framework/serializer.go @@ -49,6 +49,12 @@ func (s *wrappedSerializer) SerializerForMediaType(mediaType string, options map return s.serializer, true } +func (s *wrappedSerializer) SupportedStreamingMediaTypes() []string { + return nil +} +func (s *wrappedSerializer) StreamingSerializerForMediaType(mediaType string, options map[string]string) (runtime.Serializer, runtime.Framer, string, bool) { + return nil, nil, "", false +} func (s *wrappedSerializer) EncoderForVersion(serializer runtime.Serializer, gv unversioned.GroupVersion) runtime.Encoder { return versioning.NewCodec(s.serializer, s.serializer, s.scheme, s.scheme, s.scheme, runtime.ObjectTyperToTyper(s.scheme), []unversioned.GroupVersion{gv}, nil)