From 34749117364be27ac7c6d5826043687210dfed34 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Mon, 4 Apr 2016 23:07:43 -0400 Subject: [PATCH] Implement a streaming serializer for watch Changeover watch to use streaming serialization. Properly version the watch objects. Implement simple framing for JSON and Protobuf (but not YAML). --- .../testdata/apis/testgroup/v1/register.go | 2 + .../go2idl/go-to-protobuf/protobuf/cmd.go | 1 + examples/apiserver/apiserver.go | 5 +- pkg/api/register.go | 3 + pkg/api/serialization_proto_test.go | 6 +- pkg/api/serialization_test.go | 26 +- pkg/api/v1/register.go | 4 + pkg/apis/autoscaling/v1/register.go | 2 + pkg/apis/batch/v1/register.go | 2 + pkg/apis/extensions/v1beta1/register.go | 3 + pkg/apis/metrics/v1alpha1/register.go | 3 + pkg/apiserver/api_installer.go | 32 ++- pkg/apiserver/apiserver.go | 9 +- pkg/apiserver/apiserver_test.go | 27 +- pkg/apiserver/resthandler.go | 5 +- pkg/apiserver/watch.go | 264 ++++++++++++------ pkg/apiserver/watch_test.go | 65 ++++- pkg/genericapiserver/genericapiserver.go | 3 + pkg/genericapiserver/genericapiserver_test.go | 2 + pkg/master/master.go | 41 +-- pkg/registry/thirdpartyresourcedata/codec.go | 3 +- pkg/runtime/serializer/json/json.go | 29 ++ pkg/runtime/serializer/protobuf/protobuf.go | 21 ++ .../serializer/versioning/versioning.go | 19 ++ pkg/util/wsstream/conn.go | 4 +- pkg/util/wsstream/stream.go | 2 +- pkg/util/wsstream/stream_test.go | 4 +- pkg/watch/versioned/register.go | 84 ++++++ pkg/watch/versioned/types.go | 37 +++ 29 files changed, 562 insertions(+), 146 deletions(-) create mode 100644 pkg/watch/versioned/register.go create mode 100644 pkg/watch/versioned/types.go diff --git a/cmd/libs/go2idl/client-gen/testdata/apis/testgroup/v1/register.go b/cmd/libs/go2idl/client-gen/testdata/apis/testgroup/v1/register.go index 9f98cf4c234..a540d357b19 100644 --- a/cmd/libs/go2idl/client-gen/testdata/apis/testgroup/v1/register.go +++ b/cmd/libs/go2idl/client-gen/testdata/apis/testgroup/v1/register.go @@ -20,6 +20,7 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/runtime" + versionedwatch "k8s.io/kubernetes/pkg/watch/versioned" ) var SchemeGroupVersion = unversioned.GroupVersion{Group: "testgroup", Version: "v1"} @@ -41,6 +42,7 @@ func addKnownTypes(scheme *runtime.Scheme) { &v1.DeleteOptions{}, &unversioned.Status{}, &v1.ExportOptions{}) + versionedwatch.AddToGroupVersion(scheme, SchemeGroupVersion) } func (obj *TestType) GetObjectKind() unversioned.ObjectKind { return &obj.TypeMeta } diff --git a/cmd/libs/go2idl/go-to-protobuf/protobuf/cmd.go b/cmd/libs/go2idl/go-to-protobuf/protobuf/cmd.go index 938224e3f89..47e26d7d683 100644 --- a/cmd/libs/go2idl/go-to-protobuf/protobuf/cmd.go +++ b/cmd/libs/go2idl/go-to-protobuf/protobuf/cmd.go @@ -63,6 +63,7 @@ func New() *Generator { `+k8s.io/kubernetes/pkg/util/intstr`, `+k8s.io/kubernetes/pkg/api/resource`, `+k8s.io/kubernetes/pkg/runtime`, + `+k8s.io/kubernetes/pkg/watch/versioned`, `k8s.io/kubernetes/pkg/api/unversioned`, `k8s.io/kubernetes/pkg/api/v1`, `k8s.io/kubernetes/pkg/apis/extensions/v1beta1`, diff --git a/examples/apiserver/apiserver.go b/examples/apiserver/apiserver.go index 7c1b22666f2..79394fff79d 100644 --- a/examples/apiserver/apiserver.go +++ b/examples/apiserver/apiserver.go @@ -86,8 +86,9 @@ func Run() error { VersionedResourcesStorageMap: map[string]map[string]rest.Storage{ groupVersion.Version: restStorageMap, }, - Scheme: api.Scheme, - NegotiatedSerializer: api.Codecs, + Scheme: api.Scheme, + NegotiatedSerializer: api.Codecs, + NegotiatedStreamSerializer: api.StreamCodecs, } if err := s.InstallAPIGroups([]genericapiserver.APIGroupInfo{apiGroupInfo}); err != nil { return fmt.Errorf("Error in installing API: %v", err) diff --git a/pkg/api/register.go b/pkg/api/register.go index 13b2ef7ba57..5edc2d98a26 100644 --- a/pkg/api/register.go +++ b/pkg/api/register.go @@ -29,6 +29,9 @@ var Scheme = runtime.NewScheme() // Codecs provides access to encoding and decoding for the scheme var Codecs = serializer.NewCodecFactory(Scheme) +// StreamCodecs provides access to streaming encoding and decoding for the scheme +var StreamCodecs = serializer.NewStreamingCodecFactory(Scheme) + // GroupName is the group name use in this package const GroupName = "" diff --git a/pkg/api/serialization_proto_test.go b/pkg/api/serialization_proto_test.go index ec21848dcc0..a22e9796c07 100644 --- a/pkg/api/serialization_proto_test.go +++ b/pkg/api/serialization_proto_test.go @@ -38,7 +38,7 @@ import ( func init() { codecsToTest = append(codecsToTest, func(version unversioned.GroupVersion, item runtime.Object) (runtime.Codec, error) { - s := protobuf.NewSerializer(api.Scheme, runtime.ObjectTyperToTyper(api.Scheme)) + s := protobuf.NewSerializer(api.Scheme, runtime.ObjectTyperToTyper(api.Scheme), "application/arbitrary.content.type") return api.Codecs.CodecForVersions(s, testapi.ExternalGroupVersions(), nil), nil }) } @@ -65,7 +65,7 @@ func TestProtobufRoundTrip(t *testing.T) { func BenchmarkEncodeCodecProtobuf(b *testing.B) { items := benchmarkItems() width := len(items) - s := protobuf.NewSerializer(nil, nil) + s := protobuf.NewSerializer(nil, nil, "application/arbitrary.content.type") b.ResetTimer() for i := 0; i < b.N; i++ { if _, err := runtime.Encode(s, &items[i%width]); err != nil { @@ -86,7 +86,7 @@ func BenchmarkEncodeCodecFromInternalProtobuf(b *testing.B) { b.Fatal(err) } } - s := protobuf.NewSerializer(nil, nil) + s := protobuf.NewSerializer(nil, nil, "application/arbitrary.content.type") codec := api.Codecs.EncoderForVersion(s, v1.SchemeGroupVersion) b.ResetTimer() for i := 0; i < b.N; i++ { diff --git a/pkg/api/serialization_test.go b/pkg/api/serialization_test.go index ff4cbd32489..974dd1e1b04 100644 --- a/pkg/api/serialization_test.go +++ b/pkg/api/serialization_test.go @@ -17,12 +17,15 @@ limitations under the License. package api_test import ( + "encoding/hex" "encoding/json" "math/rand" "reflect" + "strings" "testing" "github.com/davecgh/go-spew/spew" + proto "github.com/golang/protobuf/proto" flag "github.com/spf13/pflag" "github.com/ugorji/go/codec" @@ -58,6 +61,15 @@ func fuzzInternalObject(t *testing.T, forVersion unversioned.GroupVersion, item return item } +func dataAsString(data []byte) string { + dataString := string(data) + if !strings.HasPrefix(dataString, "{") { + dataString = "\n" + hex.Dump(data) + proto.NewBuffer(make([]byte, 0, 1024)).DebugPrint("decoded object", data) + } + return dataString +} + func roundTrip(t *testing.T, codec runtime.Codec, item runtime.Object) { printer := spew.ConfigState{DisableMethods: true} @@ -70,11 +82,12 @@ func roundTrip(t *testing.T, codec runtime.Codec, item runtime.Object) { obj2, err := runtime.Decode(codec, data) if err != nil { - t.Errorf("0: %v: %v\nCodec: %v\nData: %s\nSource: %#v", name, err, codec, string(data), printer.Sprintf("%#v", item)) + t.Errorf("0: %v: %v\nCodec: %v\nData: %s\nSource: %#v", name, err, codec, dataAsString(data), printer.Sprintf("%#v", item)) + panic("failed") return } if !api.Semantic.DeepEqual(item, obj2) { - t.Errorf("\n1: %v: diff: %v\nCodec: %v\nSource:\n\n%#v\n\nEncoded:\n\n%s\n\nFinal:\n\n%#v", name, diff.ObjectGoPrintDiff(item, obj2), codec, printer.Sprintf("%#v", item), string(data), printer.Sprintf("%#v", obj2)) + t.Errorf("\n1: %v: diff: %v\nCodec: %v\nSource:\n\n%#v\n\nEncoded:\n\n%s\n\nFinal:\n\n%#v", name, diff.ObjectGoPrintDiff(item, obj2), codec, printer.Sprintf("%#v", item), dataAsString(data), printer.Sprintf("%#v", obj2)) return } @@ -135,7 +148,14 @@ func TestList(t *testing.T) { roundTripSame(t, testapi.Default, item) } -var nonRoundTrippableTypes = sets.NewString("ExportOptions") +var nonRoundTrippableTypes = sets.NewString( + "ExportOptions", + // WatchEvent does not include kind and version and can only be deserialized + // implicitly (if the caller expects the specific object). The watch call defines + // the schema by content type, rather than via kind/version included in each + // object. + "WatchEvent", +) var nonInternalRoundTrippableTypes = sets.NewString("List", "ListOptions", "ExportOptions") var nonRoundTrippableTypesByVersion = map[string][]string{} diff --git a/pkg/api/v1/register.go b/pkg/api/v1/register.go index 760836e37a4..1a8342c63ac 100644 --- a/pkg/api/v1/register.go +++ b/pkg/api/v1/register.go @@ -19,6 +19,7 @@ package v1 import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/runtime" + versionedwatch "k8s.io/kubernetes/pkg/watch/versioned" ) // GroupName is the group name use in this package @@ -87,6 +88,9 @@ func addKnownTypes(scheme *runtime.Scheme) { // Add common types scheme.AddKnownTypes(SchemeGroupVersion, &unversioned.Status{}) + + // Add the watch version that applies + versionedwatch.AddToGroupVersion(scheme, SchemeGroupVersion) } func (obj *Pod) GetObjectKind() unversioned.ObjectKind { return &obj.TypeMeta } diff --git a/pkg/apis/autoscaling/v1/register.go b/pkg/apis/autoscaling/v1/register.go index 5af7611c57f..9a7e369c621 100644 --- a/pkg/apis/autoscaling/v1/register.go +++ b/pkg/apis/autoscaling/v1/register.go @@ -20,6 +20,7 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/runtime" + versionedwatch "k8s.io/kubernetes/pkg/watch/versioned" ) // GroupName is the group name use in this package @@ -42,6 +43,7 @@ func addKnownTypes(scheme *runtime.Scheme) { &Scale{}, &v1.ListOptions{}, ) + versionedwatch.AddToGroupVersion(scheme, SchemeGroupVersion) } func (obj *HorizontalPodAutoscaler) GetObjectKind() unversioned.ObjectKind { return &obj.TypeMeta } diff --git a/pkg/apis/batch/v1/register.go b/pkg/apis/batch/v1/register.go index 49f7376a339..a8c5e484c17 100644 --- a/pkg/apis/batch/v1/register.go +++ b/pkg/apis/batch/v1/register.go @@ -20,6 +20,7 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/runtime" + versionedwatch "k8s.io/kubernetes/pkg/watch/versioned" ) // GroupName is the group name use in this package @@ -41,6 +42,7 @@ func addKnownTypes(scheme *runtime.Scheme) { &JobList{}, &v1.ListOptions{}, ) + versionedwatch.AddToGroupVersion(scheme, SchemeGroupVersion) } func (obj *Job) GetObjectKind() unversioned.ObjectKind { return &obj.TypeMeta } diff --git a/pkg/apis/extensions/v1beta1/register.go b/pkg/apis/extensions/v1beta1/register.go index 026a9f6810c..ee662c46314 100644 --- a/pkg/apis/extensions/v1beta1/register.go +++ b/pkg/apis/extensions/v1beta1/register.go @@ -20,6 +20,7 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/runtime" + versionedwatch "k8s.io/kubernetes/pkg/watch/versioned" ) // GroupName is the group name use in this package @@ -61,6 +62,8 @@ func addKnownTypes(scheme *runtime.Scheme) { &PodSecurityPolicy{}, &PodSecurityPolicyList{}, ) + // Add the watch version that applies + versionedwatch.AddToGroupVersion(scheme, SchemeGroupVersion) } func (obj *Deployment) GetObjectKind() unversioned.ObjectKind { return &obj.TypeMeta } diff --git a/pkg/apis/metrics/v1alpha1/register.go b/pkg/apis/metrics/v1alpha1/register.go index c943d54687d..4af5dbfeaea 100644 --- a/pkg/apis/metrics/v1alpha1/register.go +++ b/pkg/apis/metrics/v1alpha1/register.go @@ -20,6 +20,7 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/runtime" + versionedwatch "k8s.io/kubernetes/pkg/watch/versioned" ) // GroupName is the group name use in this package @@ -40,6 +41,8 @@ func addKnownTypes(scheme *runtime.Scheme) { &RawPod{}, &v1.DeleteOptions{}, ) + // Add the watch version that applies + versionedwatch.AddToGroupVersion(scheme, SchemeGroupVersion) } func (obj *RawNode) GetObjectKind() unversioned.ObjectKind { return &obj.TypeMeta } diff --git a/pkg/apiserver/api_installer.go b/pkg/apiserver/api_installer.go index cb020ff48c1..248e1f8bbb6 100644 --- a/pkg/apiserver/api_installer.go +++ b/pkg/apiserver/api_installer.go @@ -34,7 +34,6 @@ import ( "k8s.io/kubernetes/pkg/apiserver/metrics" "k8s.io/kubernetes/pkg/conversion" "k8s.io/kubernetes/pkg/runtime" - watchjson "k8s.io/kubernetes/pkg/watch/json" "github.com/emicklei/go-restful" ) @@ -284,6 +283,14 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag isGetter = true } + var versionedWatchEvent runtime.Object + if isWatcher { + versionedWatchEvent, err = a.group.Creater.New(a.group.GroupVersion.WithKind("WatchEvent")) + if err != nil { + return nil, err + } + } + var ( connectOptions runtime.Object versionedConnectOptions runtime.Object @@ -450,11 +457,12 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag // test/integration/auth_test.go is currently the most comprehensive status code test reqScope := RequestScope{ - ContextFunc: ctxFn, - Serializer: a.group.Serializer, - ParameterCodec: a.group.ParameterCodec, - Creater: a.group.Creater, - Convertor: a.group.Convertor, + ContextFunc: ctxFn, + Serializer: a.group.Serializer, + StreamSerializer: a.group.StreamSerializer, + ParameterCodec: a.group.ParameterCodec, + Creater: a.group.Creater, + Convertor: a.group.Convertor, // TODO: This seems wrong for cross-group subresources. It makes an assumption that a subresource and its parent are in the same group version. Revisit this. Resource: a.group.GroupVersion.WithResource(resource), @@ -633,9 +641,9 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag Doc(doc). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). Operation("watch"+namespaced+kind+strings.Title(subresource)). - Produces("application/json"). - Returns(http.StatusOK, "OK", watchjson.WatchEvent{}). - Writes(watchjson.WatchEvent{}) + Produces(a.group.StreamSerializer.SupportedMediaTypes()...). + Returns(http.StatusOK, "OK", versionedWatchEvent). + Writes(versionedWatchEvent) if err := addObjectParams(ws, route, versionedListOptions); err != nil { return nil, err } @@ -652,9 +660,9 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag Doc(doc). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). Operation("watch"+namespaced+kind+strings.Title(subresource)+"List"). - Produces("application/json"). - Returns(http.StatusOK, "OK", watchjson.WatchEvent{}). - Writes(watchjson.WatchEvent{}) + Produces(a.group.StreamSerializer.SupportedMediaTypes()...). + Returns(http.StatusOK, "OK", versionedWatchEvent). + Writes(versionedWatchEvent) if err := addObjectParams(ws, route, versionedListOptions); err != nil { return nil, err } diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index a038acf3738..66b9dc3d11d 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -84,8 +84,13 @@ type APIGroupVersion struct { Mapper meta.RESTMapper - Serializer runtime.NegotiatedSerializer - ParameterCodec runtime.ParameterCodec + // Serializer is used to determine how to convert responses from API methods into bytes to send over + // the wire. + Serializer runtime.NegotiatedSerializer + // StreamSerializer is used for sending a series of objects to the client over a single channel, where + // the underlying channel has no innate framing (such as an io.Writer) + StreamSerializer runtime.NegotiatedSerializer + ParameterCodec runtime.ParameterCodec Typer runtime.ObjectTyper Creater runtime.ObjectCreater diff --git a/pkg/apiserver/apiserver_test.go b/pkg/apiserver/apiserver_test.go index 89d3a1765ad..48e94bed9ca 100644 --- a/pkg/apiserver/apiserver_test.go +++ b/pkg/apiserver/apiserver_test.go @@ -48,6 +48,7 @@ import ( "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/version" "k8s.io/kubernetes/pkg/watch" + "k8s.io/kubernetes/pkg/watch/versioned" "k8s.io/kubernetes/plugin/pkg/admission/admit" "k8s.io/kubernetes/plugin/pkg/admission/deny" @@ -160,6 +161,7 @@ func addTestTypes() { // served in the tests. api.Scheme.AddKnownTypes(testGroup2Version, &SimpleXGSubresource{}) api.Scheme.AddKnownTypes(testInternalGroup2Version, &SimpleXGSubresource{}) + versioned.AddToGroupVersion(api.Scheme, testGroupVersion) } func addNewTestTypes() { @@ -174,8 +176,10 @@ func addNewTestTypes() { } api.Scheme.AddKnownTypes(newGroupVersion, &apiservertesting.Simple{}, &apiservertesting.SimpleList{}, &ListOptions{}, - &api.DeleteOptions{}, &apiservertesting.SimpleGetOptions{}, &apiservertesting.SimpleRoot{}) - api.Scheme.AddKnownTypes(newGroupVersion, &v1.Pod{}) + &api.DeleteOptions{}, &apiservertesting.SimpleGetOptions{}, &apiservertesting.SimpleRoot{}, + &v1.Pod{}, + ) + versioned.AddToGroupVersion(api.Scheme, newGroupVersion) } func init() { @@ -283,6 +287,7 @@ func handleInternal(storage map[string]rest.Storage, admissionControl admission. group.GroupVersion = grouplessGroupVersion group.OptionsExternalVersion = &grouplessGroupVersion group.Serializer = api.Codecs + group.StreamSerializer = api.StreamCodecs if err := (&group).InstallREST(container); err != nil { panic(fmt.Sprintf("unable to install container %s: %v", group.GroupVersion, err)) } @@ -295,6 +300,7 @@ func handleInternal(storage map[string]rest.Storage, admissionControl admission. group.GroupVersion = testGroupVersion group.OptionsExternalVersion = &testGroupVersion group.Serializer = api.Codecs + group.StreamSerializer = api.StreamCodecs if err := (&group).InstallREST(container); err != nil { panic(fmt.Sprintf("unable to install container %s: %v", group.GroupVersion, err)) } @@ -307,6 +313,7 @@ func handleInternal(storage map[string]rest.Storage, admissionControl admission. group.GroupVersion = newGroupVersion group.OptionsExternalVersion = &newGroupVersion group.Serializer = api.Codecs + group.StreamSerializer = api.StreamCodecs if err := (&group).InstallREST(container); err != nil { panic(fmt.Sprintf("unable to install container %s: %v", group.GroupVersion, err)) } @@ -2408,8 +2415,9 @@ func TestUpdateREST(t *testing.T) { GroupVersion: newGroupVersion, OptionsExternalVersion: &newGroupVersion, - Serializer: api.Codecs, - ParameterCodec: api.ParameterCodec, + Serializer: api.Codecs, + StreamSerializer: api.StreamCodecs, + ParameterCodec: api.ParameterCodec, } } @@ -2492,8 +2500,9 @@ func TestParentResourceIsRequired(t *testing.T) { GroupVersion: newGroupVersion, OptionsExternalVersion: &newGroupVersion, - Serializer: api.Codecs, - ParameterCodec: api.ParameterCodec, + Serializer: api.Codecs, + StreamSerializer: api.StreamCodecs, + ParameterCodec: api.ParameterCodec, } container := restful.NewContainer() if err := group.InstallREST(container); err == nil { @@ -2523,8 +2532,9 @@ func TestParentResourceIsRequired(t *testing.T) { GroupVersion: newGroupVersion, OptionsExternalVersion: &newGroupVersion, - Serializer: api.Codecs, - ParameterCodec: api.ParameterCodec, + Serializer: api.Codecs, + StreamSerializer: api.StreamCodecs, + ParameterCodec: api.ParameterCodec, } container = restful.NewContainer() if err := group.InstallREST(container); err != nil { @@ -3236,6 +3246,7 @@ func TestXGSubresource(t *testing.T) { GroupVersion: testGroupVersion, OptionsExternalVersion: &testGroupVersion, Serializer: api.Codecs, + StreamSerializer: api.StreamCodecs, SubresourceGroupVersionKind: map[string]unversioned.GroupVersionKind{ "simple/subsimple": testGroup2Version.WithKind("SimpleXGSubresource"), diff --git a/pkg/apiserver/resthandler.go b/pkg/apiserver/resthandler.go index 6f615b3f1c2..a292f1520b7 100644 --- a/pkg/apiserver/resthandler.go +++ b/pkg/apiserver/resthandler.go @@ -70,8 +70,11 @@ type ScopeNamer interface { type RequestScope struct { Namer ScopeNamer ContextFunc - Serializer runtime.NegotiatedSerializer + + Serializer runtime.NegotiatedSerializer + StreamSerializer runtime.NegotiatedSerializer runtime.ParameterCodec + Creater runtime.ObjectCreater Convertor runtime.ObjectConvertor diff --git a/pkg/apiserver/watch.go b/pkg/apiserver/watch.go index 086ebfe3d83..e316d7a18fa 100644 --- a/pkg/apiserver/watch.go +++ b/pkg/apiserver/watch.go @@ -17,32 +17,27 @@ limitations under the License. package apiserver import ( + "bytes" + "fmt" "net/http" "reflect" - "regexp" - "strings" "time" + "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/httplog" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/runtime/serializer/streaming" + utilruntime "k8s.io/kubernetes/pkg/util/runtime" + "k8s.io/kubernetes/pkg/util/wsstream" "k8s.io/kubernetes/pkg/watch" - watchjson "k8s.io/kubernetes/pkg/watch/json" + "k8s.io/kubernetes/pkg/watch/versioned" "github.com/emicklei/go-restful" - "github.com/golang/glog" "golang.org/x/net/websocket" ) -var ( - connectionUpgradeRegex = regexp.MustCompile("(^|.*,\\s*)upgrade($|\\s*,)") - - // nothing will ever be sent down this channel - neverExitWatch <-chan time.Time = make(chan time.Time) -) - -func isWebsocketRequest(req *http.Request) bool { - return connectionUpgradeRegex.MatchString(strings.ToLower(req.Header.Get("Connection"))) && strings.ToLower(req.Header.Get("Upgrade")) == "websocket" -} +// nothing will ever be sent down this channel +var neverExitWatch <-chan time.Time = make(chan time.Time) // timeoutFactory abstracts watch timeout logic for testing type timeoutFactory interface { @@ -64,119 +59,218 @@ func (w *realTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) { return t.C, t.Stop } +type textEncodable interface { + // EncodesAsText should return true if objects should be transmitted as a WebSocket Text + // frame (otherwise, they will be sent as a Binary frame). + EncodesAsText() bool +} + // serveWatch handles serving requests to the server +// TODO: the functionality in this method and in WatchServer.Serve is not cleanly decoupled. func serveWatch(watcher watch.Interface, scope RequestScope, req *restful.Request, res *restful.Response, timeout time.Duration) { - s, mediaType, err := negotiateOutputSerializer(req.Request, scope.Serializer) + // negotiate for the stream serializer + serializer, mediaType, err := negotiateOutputSerializer(req.Request, scope.StreamSerializer) if err != nil { scope.err(err, res.ResponseWriter, req.Request) return } - // TODO: replace with typed serialization - if mediaType != "application/json" { - writeRawJSON(http.StatusNotAcceptable, (errNotAcceptable{[]string{"application/json"}}).Status(), res.ResponseWriter) + encoder := scope.StreamSerializer.EncoderForVersion(serializer, scope.Kind.GroupVersion()) + + useTextFraming := false + if encodable, ok := encoder.(textEncodable); ok && encodable.EncodesAsText() { + useTextFraming = true + } + + // find the embedded serializer matching the media type + embeddedSerializer, ok := scope.Serializer.SerializerForMediaType(mediaType, nil) + if !ok { + scope.err(fmt.Errorf("no serializer defined for %q available for embedded encoding", mediaType), res.ResponseWriter, req.Request) return } - encoder := scope.Serializer.EncoderForVersion(s, scope.Kind.GroupVersion()) - watchServer := &WatchServer{watcher, encoder, func(obj runtime.Object) { - if err := setSelfLink(obj, req, scope.Namer); err != nil { - glog.V(5).Infof("Failed to set self link for object %v: %v", reflect.TypeOf(obj), err) - } - }, &realTimeoutFactory{timeout}} - if isWebsocketRequest(req.Request) { - websocket.Handler(watchServer.HandleWS).ServeHTTP(httplog.Unlogged(res.ResponseWriter), req.Request) - } else { - watchServer.ServeHTTP(res.ResponseWriter, req.Request) + embeddedEncoder := scope.Serializer.EncoderForVersion(embeddedSerializer, scope.Kind.GroupVersion()) + + server := &WatchServer{ + watching: watcher, + scope: scope, + + useTextFraming: useTextFraming, + mediaType: mediaType, + encoder: encoder, + embeddedEncoder: embeddedEncoder, + fixup: func(obj runtime.Object) { + if err := setSelfLink(obj, req, scope.Namer); err != nil { + utilruntime.HandleError(fmt.Errorf("failed to set link for object %v: %v", reflect.TypeOf(obj), err)) + } + }, + + t: &realTimeoutFactory{timeout}, } + + server.ServeHTTP(res.ResponseWriter, req.Request) } // WatchServer serves a watch.Interface over a websocket or vanilla HTTP. type WatchServer struct { watching watch.Interface - encoder runtime.Encoder - fixup func(runtime.Object) - t timeoutFactory + scope RequestScope + + // true if websocket messages should use text framing (as opposed to binary framing) + useTextFraming bool + // the media type this watch is being served with + mediaType string + // used to encode the watch stream event itself + encoder runtime.Encoder + // used to encode the nested object in the watch stream + embeddedEncoder runtime.Encoder + fixup func(runtime.Object) + + t timeoutFactory } -// HandleWS implements a websocket handler. -func (w *WatchServer) HandleWS(ws *websocket.Conn) { - defer ws.Close() - done := make(chan struct{}) - go func() { - var unused interface{} - // Expect this to block until the connection is closed. Client should not - // send anything. - websocket.JSON.Receive(ws, &unused) - close(done) - }() - for { - select { - case <-done: - w.watching.Stop() - return - case event, ok := <-w.watching.ResultChan(): - if !ok { - // End of results. - return - } - w.fixup(event.Object) - obj, err := watchjson.Object(w.encoder, &event) - if err != nil { - // Client disconnect. - w.watching.Stop() - return - } - if err := websocket.JSON.Send(ws, obj); err != nil { - // Client disconnect. - w.watching.Stop() - return - } - } - } -} - -// ServeHTTP serves a series of JSON encoded events via straight HTTP with -// Transfer-Encoding: chunked. -func (self *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { - loggedW := httplog.LogOf(req, w) +// Serve serves a series of encoded events via HTTP with Transfer-Encoding: chunked +// or over a websocket connection. +func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { w = httplog.Unlogged(w) - timeoutCh, cleanup := self.t.TimeoutCh() - defer cleanup() - defer self.watching.Stop() + + if wsstream.IsWebSocketRequest(req) { + w.Header().Set("Content-Type", s.mediaType) + websocket.Handler(s.HandleWS).ServeHTTP(w, req) + return + } cn, ok := w.(http.CloseNotifier) if !ok { - loggedW.Addf("unable to get CloseNotifier: %#v", w) - http.NotFound(w, req) + err := fmt.Errorf("unable to start watch - can't get http.CloseNotifier: %#v", w) + utilruntime.HandleError(err) + s.scope.err(errors.NewInternalError(err), w, req) return } flusher, ok := w.(http.Flusher) if !ok { - loggedW.Addf("unable to get Flusher: %#v", w) - http.NotFound(w, req) + err := fmt.Errorf("unable to start watch - can't get http.Flusher: %#v", w) + utilruntime.HandleError(err) + s.scope.err(errors.NewInternalError(err), w, req) return } + + // get a framed encoder + f, ok := s.encoder.(streaming.Framer) + if !ok { + // programmer error + err := fmt.Errorf("no streaming support is available for media type %q", s.mediaType) + utilruntime.HandleError(err) + s.scope.err(errors.NewBadRequest(err.Error()), w, req) + return + } + framer := f.NewFrameWriter(w) + if framer == nil { + // programmer error + err := fmt.Errorf("no stream framing support is available for media type %q", s.mediaType) + utilruntime.HandleError(err) + s.scope.err(errors.NewBadRequest(err.Error()), w, req) + return + } + e := streaming.NewEncoder(framer, s.encoder) + + // ensure the connection times out + timeoutCh, cleanup := s.t.TimeoutCh() + defer cleanup() + defer s.watching.Stop() + + // begin the stream + w.Header().Set("Content-Type", s.mediaType) w.Header().Set("Transfer-Encoding", "chunked") w.WriteHeader(http.StatusOK) flusher.Flush() - // TODO: use arbitrary serialization on watch - encoder := watchjson.NewEncoder(w, self.encoder) + + buf := &bytes.Buffer{} for { select { case <-cn.CloseNotify(): return case <-timeoutCh: return - case event, ok := <-self.watching.ResultChan(): + case event, ok := <-s.watching.ResultChan(): if !ok { // End of results. return } - self.fixup(event.Object) - if err := encoder.Encode(&event); err != nil { - // Client disconnect. + obj := event.Object + s.fixup(obj) + if err := s.embeddedEncoder.EncodeToStream(obj, buf); err != nil { + // unexpected error + utilruntime.HandleError(fmt.Errorf("unable to encode watch object: %v", err)) + return + } + event.Object = &runtime.Unknown{ + Raw: buf.Bytes(), + // ContentType is not required here because we are defaulting to the serializer + // type + } + if err := e.Encode((*versioned.InternalEvent)(&event)); err != nil { + utilruntime.HandleError(fmt.Errorf("unable to encode watch object: %v", err)) + // client disconnect. return } flusher.Flush() + + buf.Reset() + } + } +} + +// HandleWS implements a websocket handler. +func (s *WatchServer) HandleWS(ws *websocket.Conn) { + defer ws.Close() + done := make(chan struct{}) + go wsstream.IgnoreReceives(ws, 0) + buf := &bytes.Buffer{} + streamBuf := &bytes.Buffer{} + for { + select { + case <-done: + s.watching.Stop() + return + case event, ok := <-s.watching.ResultChan(): + if !ok { + // End of results. + return + } + obj := event.Object + s.fixup(obj) + if err := s.embeddedEncoder.EncodeToStream(obj, buf); err != nil { + // unexpected error + utilruntime.HandleError(fmt.Errorf("unable to encode watch object: %v", err)) + return + } + event.Object = &runtime.Unknown{ + Raw: buf.Bytes(), + // ContentType is not required here because we are defaulting to the serializer + // type + } + // the internal event will be versioned by the encoder + internalEvent := versioned.InternalEvent(event) + if err := s.encoder.EncodeToStream(&internalEvent, streamBuf); err != nil { + // encoding error + utilruntime.HandleError(fmt.Errorf("unable to encode event: %v", err)) + s.watching.Stop() + return + } + if s.useTextFraming { + if err := websocket.Message.Send(ws, streamBuf.String()); err != nil { + // Client disconnect. + s.watching.Stop() + return + } + } else { + if err := websocket.Message.Send(ws, streamBuf.Bytes()); err != nil { + // Client disconnect. + s.watching.Stop() + return + } + } + buf.Reset() + streamBuf.Reset() } } } diff --git a/pkg/apiserver/watch_test.go b/pkg/apiserver/watch_test.go index ab4854b5f5c..0faf1f8d268 100644 --- a/pkg/apiserver/watch_test.go +++ b/pkg/apiserver/watch_test.go @@ -72,7 +72,7 @@ func TestWatchWebsocket(t *testing.T) { ws, err := websocket.Dial(dest.String(), "", "http://localhost") if err != nil { - t.Errorf("unexpected error: %v", err) + t.Fatalf("unexpected error: %v", err) } try := func(action watch.EventType, object runtime.Object) { @@ -89,7 +89,7 @@ func TestWatchWebsocket(t *testing.T) { } gotObj, err := runtime.Decode(codec, got.Object) if err != nil { - t.Fatalf("Decode error: %v", err) + t.Fatalf("Decode error: %v\n%v", err, got) } if _, err := api.GetReference(gotObj); err != nil { t.Errorf("Unable to construct reference: %v", err) @@ -381,10 +381,14 @@ func TestWatchHTTPTimeout(t *testing.T) { // Setup a new watchserver watchServer := &WatchServer{ - watcher, - newCodec, - func(obj runtime.Object) {}, - &fakeTimeoutFactory{timeoutCh, done}, + watching: watcher, + + mediaType: "testcase/json", + encoder: newCodec, + embeddedEncoder: newCodec, + + fixup: func(obj runtime.Object) {}, + t: &fakeTimeoutFactory{timeoutCh, done}, } s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { @@ -526,3 +530,52 @@ func BenchmarkWatchWebsocket(b *testing.B) { wg.Wait() b.StopTimer() } + +// BenchmarkWatchProtobuf measures the cost of serving a watch. +func BenchmarkWatchProtobuf(b *testing.B) { + items := benchmarkItems() + + simpleStorage := &SimpleRESTStorage{} + handler := handle(map[string]rest.Storage{"simples": simpleStorage}) + server := httptest.NewServer(handler) + defer server.Close() + client := http.Client{} + + dest, _ := url.Parse(server.URL) + dest.Path = "/" + prefix + "/" + newGroupVersion.Group + "/" + newGroupVersion.Version + "/watch/simples" + dest.RawQuery = "" + + request, err := http.NewRequest("GET", dest.String(), nil) + if err != nil { + b.Fatalf("unexpected error: %v", err) + } + request.Header.Set("Accept", "application/vnd.kubernetes.protobuf") + response, err := client.Do(request) + if err != nil { + b.Fatalf("unexpected error: %v", err) + } + if response.StatusCode != http.StatusOK { + body, _ := ioutil.ReadAll(response.Body) + b.Fatalf("Unexpected response %#v\n%s", response, body) + } + + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer response.Body.Close() + if _, err := io.Copy(ioutil.Discard, response.Body); err != nil { + b.Fatal(err) + } + wg.Done() + }() + + actions := []watch.EventType{watch.Added, watch.Modified, watch.Deleted} + + b.ResetTimer() + for i := 0; i < b.N; i++ { + simpleStorage.fakeWatch.Action(actions[i%len(actions)], &items[i%len(items)]) + } + simpleStorage.fakeWatch.Stop() + wg.Wait() + b.StopTimer() +} diff --git a/pkg/genericapiserver/genericapiserver.go b/pkg/genericapiserver/genericapiserver.go index 5fd6cf3af89..f6006a037a9 100644 --- a/pkg/genericapiserver/genericapiserver.go +++ b/pkg/genericapiserver/genericapiserver.go @@ -184,6 +184,8 @@ type APIGroupInfo struct { Scheme *runtime.Scheme // NegotiatedSerializer controls how this group encodes and decodes data NegotiatedSerializer runtime.NegotiatedSerializer + // NegotiatedStreamSerializer controls how streaming responses are encoded and decoded. + NegotiatedStreamSerializer runtime.NegotiatedSerializer // ParameterCodec performs conversions for query parameters passed to API calls ParameterCodec runtime.ParameterCodec @@ -864,6 +866,7 @@ func (s *GenericAPIServer) getAPIGroupVersion(apiGroupInfo *APIGroupInfo, groupV version.Storage = storage version.ParameterCodec = apiGroupInfo.ParameterCodec version.Serializer = apiGroupInfo.NegotiatedSerializer + version.StreamSerializer = apiGroupInfo.NegotiatedStreamSerializer version.Creater = apiGroupInfo.Scheme version.Convertor = apiGroupInfo.Scheme version.Typer = apiGroupInfo.Scheme diff --git a/pkg/genericapiserver/genericapiserver_test.go b/pkg/genericapiserver/genericapiserver_test.go index a452552a823..545a67ff853 100644 --- a/pkg/genericapiserver/genericapiserver_test.go +++ b/pkg/genericapiserver/genericapiserver_test.go @@ -130,6 +130,7 @@ func TestInstallAPIGroups(t *testing.T) { IsLegacyGroup: true, ParameterCodec: api.ParameterCodec, NegotiatedSerializer: api.Codecs, + NegotiatedStreamSerializer: api.StreamCodecs, }, { // extensions group version @@ -138,6 +139,7 @@ func TestInstallAPIGroups(t *testing.T) { OptionsExternalVersion: &apiGroupMeta.GroupVersion, ParameterCodec: api.ParameterCodec, NegotiatedSerializer: api.Codecs, + NegotiatedStreamSerializer: api.StreamCodecs, }, } s.InstallAPIGroups(apiGroupsInfo) diff --git a/pkg/master/master.go b/pkg/master/master.go index cfcf263d183..faf2b0eb624 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -193,10 +193,11 @@ func (m *Master) InstallAPIs(c *Config) { VersionedResourcesStorageMap: map[string]map[string]rest.Storage{ "v1": m.v1ResourcesStorage, }, - IsLegacyGroup: true, - Scheme: api.Scheme, - ParameterCodec: api.ParameterCodec, - NegotiatedSerializer: api.Codecs, + IsLegacyGroup: true, + Scheme: api.Scheme, + ParameterCodec: api.ParameterCodec, + NegotiatedSerializer: api.Codecs, + NegotiatedStreamSerializer: api.StreamCodecs, } if autoscalingGroupVersion := (unversioned.GroupVersion{Group: "autoscaling", Version: "v1"}); registered.IsEnabledVersion(autoscalingGroupVersion) { apiGroupInfo.SubresourceGroupVersionKind = map[string]unversioned.GroupVersionKind{ @@ -252,10 +253,11 @@ func (m *Master) InstallAPIs(c *Config) { VersionedResourcesStorageMap: map[string]map[string]rest.Storage{ "v1beta1": extensionResources, }, - OptionsExternalVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion, - Scheme: api.Scheme, - ParameterCodec: api.ParameterCodec, - NegotiatedSerializer: api.Codecs, + OptionsExternalVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion, + Scheme: api.Scheme, + ParameterCodec: api.ParameterCodec, + NegotiatedSerializer: api.Codecs, + NegotiatedStreamSerializer: api.StreamCodecs, } apiGroupsInfo = append(apiGroupsInfo, apiGroupInfo) @@ -284,10 +286,11 @@ func (m *Master) InstallAPIs(c *Config) { VersionedResourcesStorageMap: map[string]map[string]rest.Storage{ "v1": autoscalingResources, }, - OptionsExternalVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion, - Scheme: api.Scheme, - ParameterCodec: api.ParameterCodec, - NegotiatedSerializer: api.Codecs, + OptionsExternalVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion, + Scheme: api.Scheme, + ParameterCodec: api.ParameterCodec, + NegotiatedSerializer: api.Codecs, + NegotiatedStreamSerializer: api.StreamCodecs, } apiGroupsInfo = append(apiGroupsInfo, apiGroupInfo) @@ -316,10 +319,11 @@ func (m *Master) InstallAPIs(c *Config) { VersionedResourcesStorageMap: map[string]map[string]rest.Storage{ "v1": batchResources, }, - OptionsExternalVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion, - Scheme: api.Scheme, - ParameterCodec: api.ParameterCodec, - NegotiatedSerializer: api.Codecs, + OptionsExternalVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion, + Scheme: api.Scheme, + ParameterCodec: api.ParameterCodec, + NegotiatedSerializer: api.Codecs, + NegotiatedStreamSerializer: api.StreamCodecs, } apiGroupsInfo = append(apiGroupsInfo, apiGroupInfo) @@ -660,8 +664,9 @@ func (m *Master) thirdpartyapi(group, kind, version string) *apiserver.APIGroupV Storage: storage, OptionsExternalVersion: &optionsExternalVersion, - Serializer: thirdpartyresourcedata.NewNegotiatedSerializer(api.Codecs, kind, externalVersion, internalVersion), - ParameterCodec: thirdpartyresourcedata.NewThirdPartyParameterCodec(api.ParameterCodec), + Serializer: thirdpartyresourcedata.NewNegotiatedSerializer(api.Codecs, kind, externalVersion, internalVersion), + StreamSerializer: thirdpartyresourcedata.NewNegotiatedSerializer(api.StreamCodecs, kind, externalVersion, internalVersion), + ParameterCodec: thirdpartyresourcedata.NewThirdPartyParameterCodec(api.ParameterCodec), Context: m.RequestContextMapper, diff --git a/pkg/registry/thirdpartyresourcedata/codec.go b/pkg/registry/thirdpartyresourcedata/codec.go index a0e47c560a0..8a315ca14c1 100644 --- a/pkg/registry/thirdpartyresourcedata/codec.go +++ b/pkg/registry/thirdpartyresourcedata/codec.go @@ -432,7 +432,8 @@ func (t *thirdPartyResourceDataCreator) New(kind unversioned.GroupVersionKind) ( return nil, fmt.Errorf("unknown kind %v", kind) } return &extensions.ThirdPartyResourceDataList{}, nil - case "ListOptions": + // TODO: this list needs to be formalized higher in the chain + case "ListOptions", "WatchEvent": if apiutil.GetGroupVersion(t.group, t.version) == kind.GroupVersion().String() { // Translate third party group to external group. gvk := registered.EnabledVersionsForGroup(api.GroupName)[0].WithKind(kind.Kind) diff --git a/pkg/runtime/serializer/json/json.go b/pkg/runtime/serializer/json/json.go index 4b65ce6e2f7..bb19b1ee784 100644 --- a/pkg/runtime/serializer/json/json.go +++ b/pkg/runtime/serializer/json/json.go @@ -25,6 +25,7 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/framer" utilyaml "k8s.io/kubernetes/pkg/util/yaml" ) @@ -192,3 +193,31 @@ func (s *Serializer) RecognizesData(peek io.Reader) (bool, error) { } return ok, nil } + +// NewFrameWriter implements stream framing for this serializer +func (s *Serializer) NewFrameWriter(w io.Writer) io.Writer { + if s.yaml { + // TODO: needs document framing + return nil + } + // we can write JSON objects directly to the writer, because they are self-framing + return w +} + +// NewFrameReader implements stream framing for this serializer +func (s *Serializer) NewFrameReader(r io.Reader) io.Reader { + if s.yaml { + // TODO: needs document framing + return nil + } + // we need to extract the JSON chunks of data to pass to Decode() + return framer.NewJSONFramedReader(r) +} + +// EncodesAsText returns true because both JSON and YAML are considered textual representations +// of data. This is used to determine whether the serialized object should be transmitted over +// a WebSocket Text or Binary frame. This must remain true for legacy compatibility with v1.1 +// watch over websocket implementations. +func (s *Serializer) EncodesAsText() bool { + return true +} diff --git a/pkg/runtime/serializer/protobuf/protobuf.go b/pkg/runtime/serializer/protobuf/protobuf.go index dd89ad36ceb..aa39c338d21 100644 --- a/pkg/runtime/serializer/protobuf/protobuf.go +++ b/pkg/runtime/serializer/protobuf/protobuf.go @@ -28,6 +28,7 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/framer" ) var ( @@ -241,6 +242,16 @@ func (s *Serializer) RecognizesData(peek io.Reader) (bool, error) { return bytes.Equal(s.prefix, prefix), nil } +// NewFrameWriter implements stream framing for this serializer +func (s *Serializer) NewFrameWriter(w io.Writer) io.Writer { + return framer.NewLengthDelimitedFrameWriter(w) +} + +// NewFrameReader implements stream framing for this serializer +func (s *Serializer) NewFrameReader(r io.Reader) io.Reader { + return framer.NewLengthDelimitedFrameReader(r) +} + // copyKindDefaults defaults dst to the value in src if dst does not have a value set. func copyKindDefaults(dst, src *unversioned.GroupVersionKind) { if src == nil { @@ -425,3 +436,13 @@ func (s *RawSerializer) EncodeToStream(obj runtime.Object, w io.Writer, override func (s *RawSerializer) RecognizesData(peek io.Reader) (bool, error) { return false, nil } + +// NewFrameWriter implements stream framing for this serializer +func (s *RawSerializer) NewFrameWriter(w io.Writer) io.Writer { + return framer.NewLengthDelimitedFrameWriter(w) +} + +// NewFrameReader implements stream framing for this serializer +func (s *RawSerializer) NewFrameReader(r io.Reader) io.Reader { + return framer.NewLengthDelimitedFrameReader(r) +} diff --git a/pkg/runtime/serializer/versioning/versioning.go b/pkg/runtime/serializer/versioning/versioning.go index 36c66fa9f57..eeafa2a3245 100644 --- a/pkg/runtime/serializer/versioning/versioning.go +++ b/pkg/runtime/serializer/versioning/versioning.go @@ -22,6 +22,7 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/runtime/serializer/streaming" ) // EnableCrossGroupDecoding modifies the given decoder in place, if it is a codec @@ -277,6 +278,24 @@ func (c *codec) EncodeToStream(obj runtime.Object, w io.Writer, overrides ...unv return c.encoder.EncodeToStream(obj, w, overrides...) } +// NewFrameWriter calls into the nested encoder to expose its framing +func (c *codec) NewFrameWriter(w io.Writer) io.Writer { + f, ok := c.encoder.(streaming.Framer) + if !ok { + return nil + } + return f.NewFrameWriter(w) +} + +// NewFrameReader calls into the nested decoder to expose its framing +func (c *codec) NewFrameReader(r io.Reader) io.Reader { + f, ok := c.decoder.(streaming.Framer) + if !ok { + return nil + } + return f.NewFrameReader(r) +} + // promoteOrPrependGroupVersion finds the group version in the provided group versions that has the same group as target. // If the group is found the returned array will have that group version in the first position - if the group is not found // the returned array will have target in the first position. diff --git a/pkg/util/wsstream/conn.go b/pkg/util/wsstream/conn.go index 3934a5b5b79..84e61967168 100644 --- a/pkg/util/wsstream/conn.go +++ b/pkg/util/wsstream/conn.go @@ -89,9 +89,9 @@ func IsWebSocketRequest(req *http.Request) bool { return connectionUpgradeRegex.MatchString(strings.ToLower(req.Header.Get("Connection"))) && strings.ToLower(req.Header.Get("Upgrade")) == "websocket" } -// ignoreReceives reads from a WebSocket until it is closed, then returns. If timeout is set, the +// IgnoreReceives reads from a WebSocket until it is closed, then returns. If timeout is set, the // read and write deadlines are pushed every time a new message is received. -func ignoreReceives(ws *websocket.Conn, timeout time.Duration) { +func IgnoreReceives(ws *websocket.Conn, timeout time.Duration) { defer runtime.HandleCrash() var data []byte for { diff --git a/pkg/util/wsstream/stream.go b/pkg/util/wsstream/stream.go index 846d6c3a1be..034c1216e02 100644 --- a/pkg/util/wsstream/stream.go +++ b/pkg/util/wsstream/stream.go @@ -82,7 +82,7 @@ func (r *Reader) handle(ws *websocket.Conn) { encode := len(ws.Config().Protocol) > 0 && ws.Config().Protocol[0] == base64BinaryWebSocketProtocol defer close(r.err) defer ws.Close() - go ignoreReceives(ws, r.timeout) + go IgnoreReceives(ws, r.timeout) r.err <- messageCopy(ws, r.r, encode, r.ping, r.timeout) } diff --git a/pkg/util/wsstream/stream_test.go b/pkg/util/wsstream/stream_test.go index 6bf1f4b1394..365a39ae470 100644 --- a/pkg/util/wsstream/stream_test.go +++ b/pkg/util/wsstream/stream_test.go @@ -166,7 +166,7 @@ func readWebSocket(r *Reader, t *testing.T, fn func(*websocket.Conn), protocols s, addr := newServer(func(ws *websocket.Conn) { cfg := ws.Config() cfg.Protocol = protocols - go ignoreReceives(ws, 0) + go IgnoreReceives(ws, 0) go func() { err := <-r.err errCh <- err @@ -198,7 +198,7 @@ func expectWebSocketFrames(r *Reader, t *testing.T, fn func(*websocket.Conn), fr s, addr := newServer(func(ws *websocket.Conn) { cfg := ws.Config() cfg.Protocol = protocols - go ignoreReceives(ws, 0) + go IgnoreReceives(ws, 0) go func() { err := <-r.err errCh <- err diff --git a/pkg/watch/versioned/register.go b/pkg/watch/versioned/register.go new file mode 100644 index 00000000000..feaea3b6b79 --- /dev/null +++ b/pkg/watch/versioned/register.go @@ -0,0 +1,84 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package versioned + +import ( + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/conversion" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/watch" +) + +// WatchEventKind is name reserved for serializing watch events. +const WatchEventKind = "WatchEvent" + +// AddToGroupVersion registers the watch external and internal kinds with the scheme, and ensures the proper +// conversions are in place. +func AddToGroupVersion(scheme *runtime.Scheme, groupVersion unversioned.GroupVersion) { + scheme.AddKnownTypeWithName(groupVersion.WithKind(WatchEventKind), &Event{}) + scheme.AddKnownTypeWithName( + unversioned.GroupVersion{Group: groupVersion.Group, Version: runtime.APIVersionInternal}.WithKind(WatchEventKind), + &InternalEvent{}, + ) + scheme.AddConversionFuncs( + Convert_versioned_Event_to_watch_Event, + Convert_versioned_InternalEvent_to_versioned_Event, + Convert_watch_Event_to_versioned_Event, + Convert_versioned_Event_to_versioned_InternalEvent, + ) +} + +func Convert_watch_Event_to_versioned_Event(in *watch.Event, out *Event, s conversion.Scope) error { + out.Type = string(in.Type) + switch t := in.Object.(type) { + case *runtime.Unknown: + // TODO: handle other fields on Unknown and detect type + out.Object.Raw = t.Raw + case nil: + default: + out.Object.Object = in.Object + } + return nil +} + +func Convert_versioned_InternalEvent_to_versioned_Event(in *InternalEvent, out *Event, s conversion.Scope) error { + return Convert_watch_Event_to_versioned_Event((*watch.Event)(in), out, s) +} + +func Convert_versioned_Event_to_watch_Event(in *Event, out *watch.Event, s conversion.Scope) error { + out.Type = watch.EventType(in.Type) + if in.Object.Object != nil { + out.Object = in.Object.Object + } else if in.Object.Raw != nil { + // TODO: handle other fields on Unknown and detect type + out.Object = &runtime.Unknown{ + Raw: in.Object.Raw, + ContentType: runtime.ContentTypeJSON, + } + } + return nil +} + +func Convert_versioned_Event_to_versioned_InternalEvent(in *Event, out *InternalEvent, s conversion.Scope) error { + return Convert_versioned_Event_to_watch_Event(in, (*watch.Event)(out), s) +} + +// InternalEvent makes watch.Event versioned +type InternalEvent watch.Event + +func (e *InternalEvent) GetObjectKind() unversioned.ObjectKind { return unversioned.EmptyObjectKind } +func (e *Event) GetObjectKind() unversioned.ObjectKind { return unversioned.EmptyObjectKind } diff --git a/pkg/watch/versioned/types.go b/pkg/watch/versioned/types.go new file mode 100644 index 00000000000..ba608aeab02 --- /dev/null +++ b/pkg/watch/versioned/types.go @@ -0,0 +1,37 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package versioned contains the versioned types for watch. This is the first +// serialization version unless otherwise noted. +package versioned + +import ( + "k8s.io/kubernetes/pkg/runtime" +) + +// Event represents a single event to a watched resource. +// +// +protobuf=true +type Event struct { + Type string `json:"type" protobuf:"bytes,1,opt,name=type"` + + // Object is: + // * If Type is Added or Modified: the new state of the object. + // * If Type is Deleted: the state of the object immediately before deletion. + // * If Type is Error: *api.Status is recommended; other types may make sense + // depending on context. + Object runtime.RawExtension `json:"object" protobuf:"bytes,2,opt,name=object"` +}