mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-22 18:16:52 +00:00
Implement a streaming serializer for watch
Changeover watch to use streaming serialization. Properly version the watch objects. Implement simple framing for JSON and Protobuf (but not YAML).
This commit is contained in:
parent
87146c4255
commit
3474911736
@ -20,6 +20,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
versionedwatch "k8s.io/kubernetes/pkg/watch/versioned"
|
||||
)
|
||||
|
||||
var SchemeGroupVersion = unversioned.GroupVersion{Group: "testgroup", Version: "v1"}
|
||||
@ -41,6 +42,7 @@ func addKnownTypes(scheme *runtime.Scheme) {
|
||||
&v1.DeleteOptions{},
|
||||
&unversioned.Status{},
|
||||
&v1.ExportOptions{})
|
||||
versionedwatch.AddToGroupVersion(scheme, SchemeGroupVersion)
|
||||
}
|
||||
|
||||
func (obj *TestType) GetObjectKind() unversioned.ObjectKind { return &obj.TypeMeta }
|
||||
|
@ -63,6 +63,7 @@ func New() *Generator {
|
||||
`+k8s.io/kubernetes/pkg/util/intstr`,
|
||||
`+k8s.io/kubernetes/pkg/api/resource`,
|
||||
`+k8s.io/kubernetes/pkg/runtime`,
|
||||
`+k8s.io/kubernetes/pkg/watch/versioned`,
|
||||
`k8s.io/kubernetes/pkg/api/unversioned`,
|
||||
`k8s.io/kubernetes/pkg/api/v1`,
|
||||
`k8s.io/kubernetes/pkg/apis/extensions/v1beta1`,
|
||||
|
@ -86,8 +86,9 @@ func Run() error {
|
||||
VersionedResourcesStorageMap: map[string]map[string]rest.Storage{
|
||||
groupVersion.Version: restStorageMap,
|
||||
},
|
||||
Scheme: api.Scheme,
|
||||
NegotiatedSerializer: api.Codecs,
|
||||
Scheme: api.Scheme,
|
||||
NegotiatedSerializer: api.Codecs,
|
||||
NegotiatedStreamSerializer: api.StreamCodecs,
|
||||
}
|
||||
if err := s.InstallAPIGroups([]genericapiserver.APIGroupInfo{apiGroupInfo}); err != nil {
|
||||
return fmt.Errorf("Error in installing API: %v", err)
|
||||
|
@ -29,6 +29,9 @@ var Scheme = runtime.NewScheme()
|
||||
// Codecs provides access to encoding and decoding for the scheme
|
||||
var Codecs = serializer.NewCodecFactory(Scheme)
|
||||
|
||||
// StreamCodecs provides access to streaming encoding and decoding for the scheme
|
||||
var StreamCodecs = serializer.NewStreamingCodecFactory(Scheme)
|
||||
|
||||
// GroupName is the group name use in this package
|
||||
const GroupName = ""
|
||||
|
||||
|
@ -38,7 +38,7 @@ import (
|
||||
|
||||
func init() {
|
||||
codecsToTest = append(codecsToTest, func(version unversioned.GroupVersion, item runtime.Object) (runtime.Codec, error) {
|
||||
s := protobuf.NewSerializer(api.Scheme, runtime.ObjectTyperToTyper(api.Scheme))
|
||||
s := protobuf.NewSerializer(api.Scheme, runtime.ObjectTyperToTyper(api.Scheme), "application/arbitrary.content.type")
|
||||
return api.Codecs.CodecForVersions(s, testapi.ExternalGroupVersions(), nil), nil
|
||||
})
|
||||
}
|
||||
@ -65,7 +65,7 @@ func TestProtobufRoundTrip(t *testing.T) {
|
||||
func BenchmarkEncodeCodecProtobuf(b *testing.B) {
|
||||
items := benchmarkItems()
|
||||
width := len(items)
|
||||
s := protobuf.NewSerializer(nil, nil)
|
||||
s := protobuf.NewSerializer(nil, nil, "application/arbitrary.content.type")
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
if _, err := runtime.Encode(s, &items[i%width]); err != nil {
|
||||
@ -86,7 +86,7 @@ func BenchmarkEncodeCodecFromInternalProtobuf(b *testing.B) {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
s := protobuf.NewSerializer(nil, nil)
|
||||
s := protobuf.NewSerializer(nil, nil, "application/arbitrary.content.type")
|
||||
codec := api.Codecs.EncoderForVersion(s, v1.SchemeGroupVersion)
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
|
@ -17,12 +17,15 @@ limitations under the License.
|
||||
package api_test
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"math/rand"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
proto "github.com/golang/protobuf/proto"
|
||||
flag "github.com/spf13/pflag"
|
||||
"github.com/ugorji/go/codec"
|
||||
|
||||
@ -58,6 +61,15 @@ func fuzzInternalObject(t *testing.T, forVersion unversioned.GroupVersion, item
|
||||
return item
|
||||
}
|
||||
|
||||
func dataAsString(data []byte) string {
|
||||
dataString := string(data)
|
||||
if !strings.HasPrefix(dataString, "{") {
|
||||
dataString = "\n" + hex.Dump(data)
|
||||
proto.NewBuffer(make([]byte, 0, 1024)).DebugPrint("decoded object", data)
|
||||
}
|
||||
return dataString
|
||||
}
|
||||
|
||||
func roundTrip(t *testing.T, codec runtime.Codec, item runtime.Object) {
|
||||
printer := spew.ConfigState{DisableMethods: true}
|
||||
|
||||
@ -70,11 +82,12 @@ func roundTrip(t *testing.T, codec runtime.Codec, item runtime.Object) {
|
||||
|
||||
obj2, err := runtime.Decode(codec, data)
|
||||
if err != nil {
|
||||
t.Errorf("0: %v: %v\nCodec: %v\nData: %s\nSource: %#v", name, err, codec, string(data), printer.Sprintf("%#v", item))
|
||||
t.Errorf("0: %v: %v\nCodec: %v\nData: %s\nSource: %#v", name, err, codec, dataAsString(data), printer.Sprintf("%#v", item))
|
||||
panic("failed")
|
||||
return
|
||||
}
|
||||
if !api.Semantic.DeepEqual(item, obj2) {
|
||||
t.Errorf("\n1: %v: diff: %v\nCodec: %v\nSource:\n\n%#v\n\nEncoded:\n\n%s\n\nFinal:\n\n%#v", name, diff.ObjectGoPrintDiff(item, obj2), codec, printer.Sprintf("%#v", item), string(data), printer.Sprintf("%#v", obj2))
|
||||
t.Errorf("\n1: %v: diff: %v\nCodec: %v\nSource:\n\n%#v\n\nEncoded:\n\n%s\n\nFinal:\n\n%#v", name, diff.ObjectGoPrintDiff(item, obj2), codec, printer.Sprintf("%#v", item), dataAsString(data), printer.Sprintf("%#v", obj2))
|
||||
return
|
||||
}
|
||||
|
||||
@ -135,7 +148,14 @@ func TestList(t *testing.T) {
|
||||
roundTripSame(t, testapi.Default, item)
|
||||
}
|
||||
|
||||
var nonRoundTrippableTypes = sets.NewString("ExportOptions")
|
||||
var nonRoundTrippableTypes = sets.NewString(
|
||||
"ExportOptions",
|
||||
// WatchEvent does not include kind and version and can only be deserialized
|
||||
// implicitly (if the caller expects the specific object). The watch call defines
|
||||
// the schema by content type, rather than via kind/version included in each
|
||||
// object.
|
||||
"WatchEvent",
|
||||
)
|
||||
|
||||
var nonInternalRoundTrippableTypes = sets.NewString("List", "ListOptions", "ExportOptions")
|
||||
var nonRoundTrippableTypesByVersion = map[string][]string{}
|
||||
|
@ -19,6 +19,7 @@ package v1
|
||||
import (
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
versionedwatch "k8s.io/kubernetes/pkg/watch/versioned"
|
||||
)
|
||||
|
||||
// GroupName is the group name use in this package
|
||||
@ -87,6 +88,9 @@ func addKnownTypes(scheme *runtime.Scheme) {
|
||||
|
||||
// Add common types
|
||||
scheme.AddKnownTypes(SchemeGroupVersion, &unversioned.Status{})
|
||||
|
||||
// Add the watch version that applies
|
||||
versionedwatch.AddToGroupVersion(scheme, SchemeGroupVersion)
|
||||
}
|
||||
|
||||
func (obj *Pod) GetObjectKind() unversioned.ObjectKind { return &obj.TypeMeta }
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
versionedwatch "k8s.io/kubernetes/pkg/watch/versioned"
|
||||
)
|
||||
|
||||
// GroupName is the group name use in this package
|
||||
@ -42,6 +43,7 @@ func addKnownTypes(scheme *runtime.Scheme) {
|
||||
&Scale{},
|
||||
&v1.ListOptions{},
|
||||
)
|
||||
versionedwatch.AddToGroupVersion(scheme, SchemeGroupVersion)
|
||||
}
|
||||
|
||||
func (obj *HorizontalPodAutoscaler) GetObjectKind() unversioned.ObjectKind { return &obj.TypeMeta }
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
versionedwatch "k8s.io/kubernetes/pkg/watch/versioned"
|
||||
)
|
||||
|
||||
// GroupName is the group name use in this package
|
||||
@ -41,6 +42,7 @@ func addKnownTypes(scheme *runtime.Scheme) {
|
||||
&JobList{},
|
||||
&v1.ListOptions{},
|
||||
)
|
||||
versionedwatch.AddToGroupVersion(scheme, SchemeGroupVersion)
|
||||
}
|
||||
|
||||
func (obj *Job) GetObjectKind() unversioned.ObjectKind { return &obj.TypeMeta }
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
versionedwatch "k8s.io/kubernetes/pkg/watch/versioned"
|
||||
)
|
||||
|
||||
// GroupName is the group name use in this package
|
||||
@ -61,6 +62,8 @@ func addKnownTypes(scheme *runtime.Scheme) {
|
||||
&PodSecurityPolicy{},
|
||||
&PodSecurityPolicyList{},
|
||||
)
|
||||
// Add the watch version that applies
|
||||
versionedwatch.AddToGroupVersion(scheme, SchemeGroupVersion)
|
||||
}
|
||||
|
||||
func (obj *Deployment) GetObjectKind() unversioned.ObjectKind { return &obj.TypeMeta }
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
versionedwatch "k8s.io/kubernetes/pkg/watch/versioned"
|
||||
)
|
||||
|
||||
// GroupName is the group name use in this package
|
||||
@ -40,6 +41,8 @@ func addKnownTypes(scheme *runtime.Scheme) {
|
||||
&RawPod{},
|
||||
&v1.DeleteOptions{},
|
||||
)
|
||||
// Add the watch version that applies
|
||||
versionedwatch.AddToGroupVersion(scheme, SchemeGroupVersion)
|
||||
}
|
||||
|
||||
func (obj *RawNode) GetObjectKind() unversioned.ObjectKind { return &obj.TypeMeta }
|
||||
|
@ -34,7 +34,6 @@ import (
|
||||
"k8s.io/kubernetes/pkg/apiserver/metrics"
|
||||
"k8s.io/kubernetes/pkg/conversion"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
watchjson "k8s.io/kubernetes/pkg/watch/json"
|
||||
|
||||
"github.com/emicklei/go-restful"
|
||||
)
|
||||
@ -284,6 +283,14 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
|
||||
isGetter = true
|
||||
}
|
||||
|
||||
var versionedWatchEvent runtime.Object
|
||||
if isWatcher {
|
||||
versionedWatchEvent, err = a.group.Creater.New(a.group.GroupVersion.WithKind("WatchEvent"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
var (
|
||||
connectOptions runtime.Object
|
||||
versionedConnectOptions runtime.Object
|
||||
@ -450,11 +457,12 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
|
||||
// test/integration/auth_test.go is currently the most comprehensive status code test
|
||||
|
||||
reqScope := RequestScope{
|
||||
ContextFunc: ctxFn,
|
||||
Serializer: a.group.Serializer,
|
||||
ParameterCodec: a.group.ParameterCodec,
|
||||
Creater: a.group.Creater,
|
||||
Convertor: a.group.Convertor,
|
||||
ContextFunc: ctxFn,
|
||||
Serializer: a.group.Serializer,
|
||||
StreamSerializer: a.group.StreamSerializer,
|
||||
ParameterCodec: a.group.ParameterCodec,
|
||||
Creater: a.group.Creater,
|
||||
Convertor: a.group.Convertor,
|
||||
|
||||
// TODO: This seems wrong for cross-group subresources. It makes an assumption that a subresource and its parent are in the same group version. Revisit this.
|
||||
Resource: a.group.GroupVersion.WithResource(resource),
|
||||
@ -633,9 +641,9 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
|
||||
Doc(doc).
|
||||
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
|
||||
Operation("watch"+namespaced+kind+strings.Title(subresource)).
|
||||
Produces("application/json").
|
||||
Returns(http.StatusOK, "OK", watchjson.WatchEvent{}).
|
||||
Writes(watchjson.WatchEvent{})
|
||||
Produces(a.group.StreamSerializer.SupportedMediaTypes()...).
|
||||
Returns(http.StatusOK, "OK", versionedWatchEvent).
|
||||
Writes(versionedWatchEvent)
|
||||
if err := addObjectParams(ws, route, versionedListOptions); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -652,9 +660,9 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
|
||||
Doc(doc).
|
||||
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
|
||||
Operation("watch"+namespaced+kind+strings.Title(subresource)+"List").
|
||||
Produces("application/json").
|
||||
Returns(http.StatusOK, "OK", watchjson.WatchEvent{}).
|
||||
Writes(watchjson.WatchEvent{})
|
||||
Produces(a.group.StreamSerializer.SupportedMediaTypes()...).
|
||||
Returns(http.StatusOK, "OK", versionedWatchEvent).
|
||||
Writes(versionedWatchEvent)
|
||||
if err := addObjectParams(ws, route, versionedListOptions); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -84,8 +84,13 @@ type APIGroupVersion struct {
|
||||
|
||||
Mapper meta.RESTMapper
|
||||
|
||||
Serializer runtime.NegotiatedSerializer
|
||||
ParameterCodec runtime.ParameterCodec
|
||||
// Serializer is used to determine how to convert responses from API methods into bytes to send over
|
||||
// the wire.
|
||||
Serializer runtime.NegotiatedSerializer
|
||||
// StreamSerializer is used for sending a series of objects to the client over a single channel, where
|
||||
// the underlying channel has no innate framing (such as an io.Writer)
|
||||
StreamSerializer runtime.NegotiatedSerializer
|
||||
ParameterCodec runtime.ParameterCodec
|
||||
|
||||
Typer runtime.ObjectTyper
|
||||
Creater runtime.ObjectCreater
|
||||
|
@ -48,6 +48,7 @@ import (
|
||||
"k8s.io/kubernetes/pkg/util/sets"
|
||||
"k8s.io/kubernetes/pkg/version"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
"k8s.io/kubernetes/pkg/watch/versioned"
|
||||
"k8s.io/kubernetes/plugin/pkg/admission/admit"
|
||||
"k8s.io/kubernetes/plugin/pkg/admission/deny"
|
||||
|
||||
@ -160,6 +161,7 @@ func addTestTypes() {
|
||||
// served in the tests.
|
||||
api.Scheme.AddKnownTypes(testGroup2Version, &SimpleXGSubresource{})
|
||||
api.Scheme.AddKnownTypes(testInternalGroup2Version, &SimpleXGSubresource{})
|
||||
versioned.AddToGroupVersion(api.Scheme, testGroupVersion)
|
||||
}
|
||||
|
||||
func addNewTestTypes() {
|
||||
@ -174,8 +176,10 @@ func addNewTestTypes() {
|
||||
}
|
||||
api.Scheme.AddKnownTypes(newGroupVersion,
|
||||
&apiservertesting.Simple{}, &apiservertesting.SimpleList{}, &ListOptions{},
|
||||
&api.DeleteOptions{}, &apiservertesting.SimpleGetOptions{}, &apiservertesting.SimpleRoot{})
|
||||
api.Scheme.AddKnownTypes(newGroupVersion, &v1.Pod{})
|
||||
&api.DeleteOptions{}, &apiservertesting.SimpleGetOptions{}, &apiservertesting.SimpleRoot{},
|
||||
&v1.Pod{},
|
||||
)
|
||||
versioned.AddToGroupVersion(api.Scheme, newGroupVersion)
|
||||
}
|
||||
|
||||
func init() {
|
||||
@ -283,6 +287,7 @@ func handleInternal(storage map[string]rest.Storage, admissionControl admission.
|
||||
group.GroupVersion = grouplessGroupVersion
|
||||
group.OptionsExternalVersion = &grouplessGroupVersion
|
||||
group.Serializer = api.Codecs
|
||||
group.StreamSerializer = api.StreamCodecs
|
||||
if err := (&group).InstallREST(container); err != nil {
|
||||
panic(fmt.Sprintf("unable to install container %s: %v", group.GroupVersion, err))
|
||||
}
|
||||
@ -295,6 +300,7 @@ func handleInternal(storage map[string]rest.Storage, admissionControl admission.
|
||||
group.GroupVersion = testGroupVersion
|
||||
group.OptionsExternalVersion = &testGroupVersion
|
||||
group.Serializer = api.Codecs
|
||||
group.StreamSerializer = api.StreamCodecs
|
||||
if err := (&group).InstallREST(container); err != nil {
|
||||
panic(fmt.Sprintf("unable to install container %s: %v", group.GroupVersion, err))
|
||||
}
|
||||
@ -307,6 +313,7 @@ func handleInternal(storage map[string]rest.Storage, admissionControl admission.
|
||||
group.GroupVersion = newGroupVersion
|
||||
group.OptionsExternalVersion = &newGroupVersion
|
||||
group.Serializer = api.Codecs
|
||||
group.StreamSerializer = api.StreamCodecs
|
||||
if err := (&group).InstallREST(container); err != nil {
|
||||
panic(fmt.Sprintf("unable to install container %s: %v", group.GroupVersion, err))
|
||||
}
|
||||
@ -2408,8 +2415,9 @@ func TestUpdateREST(t *testing.T) {
|
||||
GroupVersion: newGroupVersion,
|
||||
OptionsExternalVersion: &newGroupVersion,
|
||||
|
||||
Serializer: api.Codecs,
|
||||
ParameterCodec: api.ParameterCodec,
|
||||
Serializer: api.Codecs,
|
||||
StreamSerializer: api.StreamCodecs,
|
||||
ParameterCodec: api.ParameterCodec,
|
||||
}
|
||||
}
|
||||
|
||||
@ -2492,8 +2500,9 @@ func TestParentResourceIsRequired(t *testing.T) {
|
||||
GroupVersion: newGroupVersion,
|
||||
OptionsExternalVersion: &newGroupVersion,
|
||||
|
||||
Serializer: api.Codecs,
|
||||
ParameterCodec: api.ParameterCodec,
|
||||
Serializer: api.Codecs,
|
||||
StreamSerializer: api.StreamCodecs,
|
||||
ParameterCodec: api.ParameterCodec,
|
||||
}
|
||||
container := restful.NewContainer()
|
||||
if err := group.InstallREST(container); err == nil {
|
||||
@ -2523,8 +2532,9 @@ func TestParentResourceIsRequired(t *testing.T) {
|
||||
GroupVersion: newGroupVersion,
|
||||
OptionsExternalVersion: &newGroupVersion,
|
||||
|
||||
Serializer: api.Codecs,
|
||||
ParameterCodec: api.ParameterCodec,
|
||||
Serializer: api.Codecs,
|
||||
StreamSerializer: api.StreamCodecs,
|
||||
ParameterCodec: api.ParameterCodec,
|
||||
}
|
||||
container = restful.NewContainer()
|
||||
if err := group.InstallREST(container); err != nil {
|
||||
@ -3236,6 +3246,7 @@ func TestXGSubresource(t *testing.T) {
|
||||
GroupVersion: testGroupVersion,
|
||||
OptionsExternalVersion: &testGroupVersion,
|
||||
Serializer: api.Codecs,
|
||||
StreamSerializer: api.StreamCodecs,
|
||||
|
||||
SubresourceGroupVersionKind: map[string]unversioned.GroupVersionKind{
|
||||
"simple/subsimple": testGroup2Version.WithKind("SimpleXGSubresource"),
|
||||
|
@ -70,8 +70,11 @@ type ScopeNamer interface {
|
||||
type RequestScope struct {
|
||||
Namer ScopeNamer
|
||||
ContextFunc
|
||||
Serializer runtime.NegotiatedSerializer
|
||||
|
||||
Serializer runtime.NegotiatedSerializer
|
||||
StreamSerializer runtime.NegotiatedSerializer
|
||||
runtime.ParameterCodec
|
||||
|
||||
Creater runtime.ObjectCreater
|
||||
Convertor runtime.ObjectConvertor
|
||||
|
||||
|
@ -17,32 +17,27 @@ limitations under the License.
|
||||
package apiserver
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"reflect"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api/errors"
|
||||
"k8s.io/kubernetes/pkg/httplog"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/runtime/serializer/streaming"
|
||||
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
|
||||
"k8s.io/kubernetes/pkg/util/wsstream"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
watchjson "k8s.io/kubernetes/pkg/watch/json"
|
||||
"k8s.io/kubernetes/pkg/watch/versioned"
|
||||
|
||||
"github.com/emicklei/go-restful"
|
||||
"github.com/golang/glog"
|
||||
"golang.org/x/net/websocket"
|
||||
)
|
||||
|
||||
var (
|
||||
connectionUpgradeRegex = regexp.MustCompile("(^|.*,\\s*)upgrade($|\\s*,)")
|
||||
|
||||
// nothing will ever be sent down this channel
|
||||
neverExitWatch <-chan time.Time = make(chan time.Time)
|
||||
)
|
||||
|
||||
func isWebsocketRequest(req *http.Request) bool {
|
||||
return connectionUpgradeRegex.MatchString(strings.ToLower(req.Header.Get("Connection"))) && strings.ToLower(req.Header.Get("Upgrade")) == "websocket"
|
||||
}
|
||||
// nothing will ever be sent down this channel
|
||||
var neverExitWatch <-chan time.Time = make(chan time.Time)
|
||||
|
||||
// timeoutFactory abstracts watch timeout logic for testing
|
||||
type timeoutFactory interface {
|
||||
@ -64,119 +59,218 @@ func (w *realTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) {
|
||||
return t.C, t.Stop
|
||||
}
|
||||
|
||||
type textEncodable interface {
|
||||
// EncodesAsText should return true if objects should be transmitted as a WebSocket Text
|
||||
// frame (otherwise, they will be sent as a Binary frame).
|
||||
EncodesAsText() bool
|
||||
}
|
||||
|
||||
// serveWatch handles serving requests to the server
|
||||
// TODO: the functionality in this method and in WatchServer.Serve is not cleanly decoupled.
|
||||
func serveWatch(watcher watch.Interface, scope RequestScope, req *restful.Request, res *restful.Response, timeout time.Duration) {
|
||||
s, mediaType, err := negotiateOutputSerializer(req.Request, scope.Serializer)
|
||||
// negotiate for the stream serializer
|
||||
serializer, mediaType, err := negotiateOutputSerializer(req.Request, scope.StreamSerializer)
|
||||
if err != nil {
|
||||
scope.err(err, res.ResponseWriter, req.Request)
|
||||
return
|
||||
}
|
||||
// TODO: replace with typed serialization
|
||||
if mediaType != "application/json" {
|
||||
writeRawJSON(http.StatusNotAcceptable, (errNotAcceptable{[]string{"application/json"}}).Status(), res.ResponseWriter)
|
||||
encoder := scope.StreamSerializer.EncoderForVersion(serializer, scope.Kind.GroupVersion())
|
||||
|
||||
useTextFraming := false
|
||||
if encodable, ok := encoder.(textEncodable); ok && encodable.EncodesAsText() {
|
||||
useTextFraming = true
|
||||
}
|
||||
|
||||
// find the embedded serializer matching the media type
|
||||
embeddedSerializer, ok := scope.Serializer.SerializerForMediaType(mediaType, nil)
|
||||
if !ok {
|
||||
scope.err(fmt.Errorf("no serializer defined for %q available for embedded encoding", mediaType), res.ResponseWriter, req.Request)
|
||||
return
|
||||
}
|
||||
encoder := scope.Serializer.EncoderForVersion(s, scope.Kind.GroupVersion())
|
||||
watchServer := &WatchServer{watcher, encoder, func(obj runtime.Object) {
|
||||
if err := setSelfLink(obj, req, scope.Namer); err != nil {
|
||||
glog.V(5).Infof("Failed to set self link for object %v: %v", reflect.TypeOf(obj), err)
|
||||
}
|
||||
}, &realTimeoutFactory{timeout}}
|
||||
if isWebsocketRequest(req.Request) {
|
||||
websocket.Handler(watchServer.HandleWS).ServeHTTP(httplog.Unlogged(res.ResponseWriter), req.Request)
|
||||
} else {
|
||||
watchServer.ServeHTTP(res.ResponseWriter, req.Request)
|
||||
embeddedEncoder := scope.Serializer.EncoderForVersion(embeddedSerializer, scope.Kind.GroupVersion())
|
||||
|
||||
server := &WatchServer{
|
||||
watching: watcher,
|
||||
scope: scope,
|
||||
|
||||
useTextFraming: useTextFraming,
|
||||
mediaType: mediaType,
|
||||
encoder: encoder,
|
||||
embeddedEncoder: embeddedEncoder,
|
||||
fixup: func(obj runtime.Object) {
|
||||
if err := setSelfLink(obj, req, scope.Namer); err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("failed to set link for object %v: %v", reflect.TypeOf(obj), err))
|
||||
}
|
||||
},
|
||||
|
||||
t: &realTimeoutFactory{timeout},
|
||||
}
|
||||
|
||||
server.ServeHTTP(res.ResponseWriter, req.Request)
|
||||
}
|
||||
|
||||
// WatchServer serves a watch.Interface over a websocket or vanilla HTTP.
|
||||
type WatchServer struct {
|
||||
watching watch.Interface
|
||||
encoder runtime.Encoder
|
||||
fixup func(runtime.Object)
|
||||
t timeoutFactory
|
||||
scope RequestScope
|
||||
|
||||
// true if websocket messages should use text framing (as opposed to binary framing)
|
||||
useTextFraming bool
|
||||
// the media type this watch is being served with
|
||||
mediaType string
|
||||
// used to encode the watch stream event itself
|
||||
encoder runtime.Encoder
|
||||
// used to encode the nested object in the watch stream
|
||||
embeddedEncoder runtime.Encoder
|
||||
fixup func(runtime.Object)
|
||||
|
||||
t timeoutFactory
|
||||
}
|
||||
|
||||
// HandleWS implements a websocket handler.
|
||||
func (w *WatchServer) HandleWS(ws *websocket.Conn) {
|
||||
defer ws.Close()
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
var unused interface{}
|
||||
// Expect this to block until the connection is closed. Client should not
|
||||
// send anything.
|
||||
websocket.JSON.Receive(ws, &unused)
|
||||
close(done)
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
case <-done:
|
||||
w.watching.Stop()
|
||||
return
|
||||
case event, ok := <-w.watching.ResultChan():
|
||||
if !ok {
|
||||
// End of results.
|
||||
return
|
||||
}
|
||||
w.fixup(event.Object)
|
||||
obj, err := watchjson.Object(w.encoder, &event)
|
||||
if err != nil {
|
||||
// Client disconnect.
|
||||
w.watching.Stop()
|
||||
return
|
||||
}
|
||||
if err := websocket.JSON.Send(ws, obj); err != nil {
|
||||
// Client disconnect.
|
||||
w.watching.Stop()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ServeHTTP serves a series of JSON encoded events via straight HTTP with
|
||||
// Transfer-Encoding: chunked.
|
||||
func (self *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
loggedW := httplog.LogOf(req, w)
|
||||
// Serve serves a series of encoded events via HTTP with Transfer-Encoding: chunked
|
||||
// or over a websocket connection.
|
||||
func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
w = httplog.Unlogged(w)
|
||||
timeoutCh, cleanup := self.t.TimeoutCh()
|
||||
defer cleanup()
|
||||
defer self.watching.Stop()
|
||||
|
||||
if wsstream.IsWebSocketRequest(req) {
|
||||
w.Header().Set("Content-Type", s.mediaType)
|
||||
websocket.Handler(s.HandleWS).ServeHTTP(w, req)
|
||||
return
|
||||
}
|
||||
|
||||
cn, ok := w.(http.CloseNotifier)
|
||||
if !ok {
|
||||
loggedW.Addf("unable to get CloseNotifier: %#v", w)
|
||||
http.NotFound(w, req)
|
||||
err := fmt.Errorf("unable to start watch - can't get http.CloseNotifier: %#v", w)
|
||||
utilruntime.HandleError(err)
|
||||
s.scope.err(errors.NewInternalError(err), w, req)
|
||||
return
|
||||
}
|
||||
flusher, ok := w.(http.Flusher)
|
||||
if !ok {
|
||||
loggedW.Addf("unable to get Flusher: %#v", w)
|
||||
http.NotFound(w, req)
|
||||
err := fmt.Errorf("unable to start watch - can't get http.Flusher: %#v", w)
|
||||
utilruntime.HandleError(err)
|
||||
s.scope.err(errors.NewInternalError(err), w, req)
|
||||
return
|
||||
}
|
||||
|
||||
// get a framed encoder
|
||||
f, ok := s.encoder.(streaming.Framer)
|
||||
if !ok {
|
||||
// programmer error
|
||||
err := fmt.Errorf("no streaming support is available for media type %q", s.mediaType)
|
||||
utilruntime.HandleError(err)
|
||||
s.scope.err(errors.NewBadRequest(err.Error()), w, req)
|
||||
return
|
||||
}
|
||||
framer := f.NewFrameWriter(w)
|
||||
if framer == nil {
|
||||
// programmer error
|
||||
err := fmt.Errorf("no stream framing support is available for media type %q", s.mediaType)
|
||||
utilruntime.HandleError(err)
|
||||
s.scope.err(errors.NewBadRequest(err.Error()), w, req)
|
||||
return
|
||||
}
|
||||
e := streaming.NewEncoder(framer, s.encoder)
|
||||
|
||||
// ensure the connection times out
|
||||
timeoutCh, cleanup := s.t.TimeoutCh()
|
||||
defer cleanup()
|
||||
defer s.watching.Stop()
|
||||
|
||||
// begin the stream
|
||||
w.Header().Set("Content-Type", s.mediaType)
|
||||
w.Header().Set("Transfer-Encoding", "chunked")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
flusher.Flush()
|
||||
// TODO: use arbitrary serialization on watch
|
||||
encoder := watchjson.NewEncoder(w, self.encoder)
|
||||
|
||||
buf := &bytes.Buffer{}
|
||||
for {
|
||||
select {
|
||||
case <-cn.CloseNotify():
|
||||
return
|
||||
case <-timeoutCh:
|
||||
return
|
||||
case event, ok := <-self.watching.ResultChan():
|
||||
case event, ok := <-s.watching.ResultChan():
|
||||
if !ok {
|
||||
// End of results.
|
||||
return
|
||||
}
|
||||
self.fixup(event.Object)
|
||||
if err := encoder.Encode(&event); err != nil {
|
||||
// Client disconnect.
|
||||
obj := event.Object
|
||||
s.fixup(obj)
|
||||
if err := s.embeddedEncoder.EncodeToStream(obj, buf); err != nil {
|
||||
// unexpected error
|
||||
utilruntime.HandleError(fmt.Errorf("unable to encode watch object: %v", err))
|
||||
return
|
||||
}
|
||||
event.Object = &runtime.Unknown{
|
||||
Raw: buf.Bytes(),
|
||||
// ContentType is not required here because we are defaulting to the serializer
|
||||
// type
|
||||
}
|
||||
if err := e.Encode((*versioned.InternalEvent)(&event)); err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("unable to encode watch object: %v", err))
|
||||
// client disconnect.
|
||||
return
|
||||
}
|
||||
flusher.Flush()
|
||||
|
||||
buf.Reset()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// HandleWS implements a websocket handler.
|
||||
func (s *WatchServer) HandleWS(ws *websocket.Conn) {
|
||||
defer ws.Close()
|
||||
done := make(chan struct{})
|
||||
go wsstream.IgnoreReceives(ws, 0)
|
||||
buf := &bytes.Buffer{}
|
||||
streamBuf := &bytes.Buffer{}
|
||||
for {
|
||||
select {
|
||||
case <-done:
|
||||
s.watching.Stop()
|
||||
return
|
||||
case event, ok := <-s.watching.ResultChan():
|
||||
if !ok {
|
||||
// End of results.
|
||||
return
|
||||
}
|
||||
obj := event.Object
|
||||
s.fixup(obj)
|
||||
if err := s.embeddedEncoder.EncodeToStream(obj, buf); err != nil {
|
||||
// unexpected error
|
||||
utilruntime.HandleError(fmt.Errorf("unable to encode watch object: %v", err))
|
||||
return
|
||||
}
|
||||
event.Object = &runtime.Unknown{
|
||||
Raw: buf.Bytes(),
|
||||
// ContentType is not required here because we are defaulting to the serializer
|
||||
// type
|
||||
}
|
||||
// the internal event will be versioned by the encoder
|
||||
internalEvent := versioned.InternalEvent(event)
|
||||
if err := s.encoder.EncodeToStream(&internalEvent, streamBuf); err != nil {
|
||||
// encoding error
|
||||
utilruntime.HandleError(fmt.Errorf("unable to encode event: %v", err))
|
||||
s.watching.Stop()
|
||||
return
|
||||
}
|
||||
if s.useTextFraming {
|
||||
if err := websocket.Message.Send(ws, streamBuf.String()); err != nil {
|
||||
// Client disconnect.
|
||||
s.watching.Stop()
|
||||
return
|
||||
}
|
||||
} else {
|
||||
if err := websocket.Message.Send(ws, streamBuf.Bytes()); err != nil {
|
||||
// Client disconnect.
|
||||
s.watching.Stop()
|
||||
return
|
||||
}
|
||||
}
|
||||
buf.Reset()
|
||||
streamBuf.Reset()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -72,7 +72,7 @@ func TestWatchWebsocket(t *testing.T) {
|
||||
|
||||
ws, err := websocket.Dial(dest.String(), "", "http://localhost")
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
try := func(action watch.EventType, object runtime.Object) {
|
||||
@ -89,7 +89,7 @@ func TestWatchWebsocket(t *testing.T) {
|
||||
}
|
||||
gotObj, err := runtime.Decode(codec, got.Object)
|
||||
if err != nil {
|
||||
t.Fatalf("Decode error: %v", err)
|
||||
t.Fatalf("Decode error: %v\n%v", err, got)
|
||||
}
|
||||
if _, err := api.GetReference(gotObj); err != nil {
|
||||
t.Errorf("Unable to construct reference: %v", err)
|
||||
@ -381,10 +381,14 @@ func TestWatchHTTPTimeout(t *testing.T) {
|
||||
|
||||
// Setup a new watchserver
|
||||
watchServer := &WatchServer{
|
||||
watcher,
|
||||
newCodec,
|
||||
func(obj runtime.Object) {},
|
||||
&fakeTimeoutFactory{timeoutCh, done},
|
||||
watching: watcher,
|
||||
|
||||
mediaType: "testcase/json",
|
||||
encoder: newCodec,
|
||||
embeddedEncoder: newCodec,
|
||||
|
||||
fixup: func(obj runtime.Object) {},
|
||||
t: &fakeTimeoutFactory{timeoutCh, done},
|
||||
}
|
||||
|
||||
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||
@ -526,3 +530,52 @@ func BenchmarkWatchWebsocket(b *testing.B) {
|
||||
wg.Wait()
|
||||
b.StopTimer()
|
||||
}
|
||||
|
||||
// BenchmarkWatchProtobuf measures the cost of serving a watch.
|
||||
func BenchmarkWatchProtobuf(b *testing.B) {
|
||||
items := benchmarkItems()
|
||||
|
||||
simpleStorage := &SimpleRESTStorage{}
|
||||
handler := handle(map[string]rest.Storage{"simples": simpleStorage})
|
||||
server := httptest.NewServer(handler)
|
||||
defer server.Close()
|
||||
client := http.Client{}
|
||||
|
||||
dest, _ := url.Parse(server.URL)
|
||||
dest.Path = "/" + prefix + "/" + newGroupVersion.Group + "/" + newGroupVersion.Version + "/watch/simples"
|
||||
dest.RawQuery = ""
|
||||
|
||||
request, err := http.NewRequest("GET", dest.String(), nil)
|
||||
if err != nil {
|
||||
b.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
request.Header.Set("Accept", "application/vnd.kubernetes.protobuf")
|
||||
response, err := client.Do(request)
|
||||
if err != nil {
|
||||
b.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if response.StatusCode != http.StatusOK {
|
||||
body, _ := ioutil.ReadAll(response.Body)
|
||||
b.Fatalf("Unexpected response %#v\n%s", response, body)
|
||||
}
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer response.Body.Close()
|
||||
if _, err := io.Copy(ioutil.Discard, response.Body); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
|
||||
actions := []watch.EventType{watch.Added, watch.Modified, watch.Deleted}
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
simpleStorage.fakeWatch.Action(actions[i%len(actions)], &items[i%len(items)])
|
||||
}
|
||||
simpleStorage.fakeWatch.Stop()
|
||||
wg.Wait()
|
||||
b.StopTimer()
|
||||
}
|
||||
|
@ -184,6 +184,8 @@ type APIGroupInfo struct {
|
||||
Scheme *runtime.Scheme
|
||||
// NegotiatedSerializer controls how this group encodes and decodes data
|
||||
NegotiatedSerializer runtime.NegotiatedSerializer
|
||||
// NegotiatedStreamSerializer controls how streaming responses are encoded and decoded.
|
||||
NegotiatedStreamSerializer runtime.NegotiatedSerializer
|
||||
// ParameterCodec performs conversions for query parameters passed to API calls
|
||||
ParameterCodec runtime.ParameterCodec
|
||||
|
||||
@ -864,6 +866,7 @@ func (s *GenericAPIServer) getAPIGroupVersion(apiGroupInfo *APIGroupInfo, groupV
|
||||
version.Storage = storage
|
||||
version.ParameterCodec = apiGroupInfo.ParameterCodec
|
||||
version.Serializer = apiGroupInfo.NegotiatedSerializer
|
||||
version.StreamSerializer = apiGroupInfo.NegotiatedStreamSerializer
|
||||
version.Creater = apiGroupInfo.Scheme
|
||||
version.Convertor = apiGroupInfo.Scheme
|
||||
version.Typer = apiGroupInfo.Scheme
|
||||
|
@ -130,6 +130,7 @@ func TestInstallAPIGroups(t *testing.T) {
|
||||
IsLegacyGroup: true,
|
||||
ParameterCodec: api.ParameterCodec,
|
||||
NegotiatedSerializer: api.Codecs,
|
||||
NegotiatedStreamSerializer: api.StreamCodecs,
|
||||
},
|
||||
{
|
||||
// extensions group version
|
||||
@ -138,6 +139,7 @@ func TestInstallAPIGroups(t *testing.T) {
|
||||
OptionsExternalVersion: &apiGroupMeta.GroupVersion,
|
||||
ParameterCodec: api.ParameterCodec,
|
||||
NegotiatedSerializer: api.Codecs,
|
||||
NegotiatedStreamSerializer: api.StreamCodecs,
|
||||
},
|
||||
}
|
||||
s.InstallAPIGroups(apiGroupsInfo)
|
||||
|
@ -193,10 +193,11 @@ func (m *Master) InstallAPIs(c *Config) {
|
||||
VersionedResourcesStorageMap: map[string]map[string]rest.Storage{
|
||||
"v1": m.v1ResourcesStorage,
|
||||
},
|
||||
IsLegacyGroup: true,
|
||||
Scheme: api.Scheme,
|
||||
ParameterCodec: api.ParameterCodec,
|
||||
NegotiatedSerializer: api.Codecs,
|
||||
IsLegacyGroup: true,
|
||||
Scheme: api.Scheme,
|
||||
ParameterCodec: api.ParameterCodec,
|
||||
NegotiatedSerializer: api.Codecs,
|
||||
NegotiatedStreamSerializer: api.StreamCodecs,
|
||||
}
|
||||
if autoscalingGroupVersion := (unversioned.GroupVersion{Group: "autoscaling", Version: "v1"}); registered.IsEnabledVersion(autoscalingGroupVersion) {
|
||||
apiGroupInfo.SubresourceGroupVersionKind = map[string]unversioned.GroupVersionKind{
|
||||
@ -252,10 +253,11 @@ func (m *Master) InstallAPIs(c *Config) {
|
||||
VersionedResourcesStorageMap: map[string]map[string]rest.Storage{
|
||||
"v1beta1": extensionResources,
|
||||
},
|
||||
OptionsExternalVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion,
|
||||
Scheme: api.Scheme,
|
||||
ParameterCodec: api.ParameterCodec,
|
||||
NegotiatedSerializer: api.Codecs,
|
||||
OptionsExternalVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion,
|
||||
Scheme: api.Scheme,
|
||||
ParameterCodec: api.ParameterCodec,
|
||||
NegotiatedSerializer: api.Codecs,
|
||||
NegotiatedStreamSerializer: api.StreamCodecs,
|
||||
}
|
||||
apiGroupsInfo = append(apiGroupsInfo, apiGroupInfo)
|
||||
|
||||
@ -284,10 +286,11 @@ func (m *Master) InstallAPIs(c *Config) {
|
||||
VersionedResourcesStorageMap: map[string]map[string]rest.Storage{
|
||||
"v1": autoscalingResources,
|
||||
},
|
||||
OptionsExternalVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion,
|
||||
Scheme: api.Scheme,
|
||||
ParameterCodec: api.ParameterCodec,
|
||||
NegotiatedSerializer: api.Codecs,
|
||||
OptionsExternalVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion,
|
||||
Scheme: api.Scheme,
|
||||
ParameterCodec: api.ParameterCodec,
|
||||
NegotiatedSerializer: api.Codecs,
|
||||
NegotiatedStreamSerializer: api.StreamCodecs,
|
||||
}
|
||||
apiGroupsInfo = append(apiGroupsInfo, apiGroupInfo)
|
||||
|
||||
@ -316,10 +319,11 @@ func (m *Master) InstallAPIs(c *Config) {
|
||||
VersionedResourcesStorageMap: map[string]map[string]rest.Storage{
|
||||
"v1": batchResources,
|
||||
},
|
||||
OptionsExternalVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion,
|
||||
Scheme: api.Scheme,
|
||||
ParameterCodec: api.ParameterCodec,
|
||||
NegotiatedSerializer: api.Codecs,
|
||||
OptionsExternalVersion: ®istered.GroupOrDie(api.GroupName).GroupVersion,
|
||||
Scheme: api.Scheme,
|
||||
ParameterCodec: api.ParameterCodec,
|
||||
NegotiatedSerializer: api.Codecs,
|
||||
NegotiatedStreamSerializer: api.StreamCodecs,
|
||||
}
|
||||
apiGroupsInfo = append(apiGroupsInfo, apiGroupInfo)
|
||||
|
||||
@ -660,8 +664,9 @@ func (m *Master) thirdpartyapi(group, kind, version string) *apiserver.APIGroupV
|
||||
Storage: storage,
|
||||
OptionsExternalVersion: &optionsExternalVersion,
|
||||
|
||||
Serializer: thirdpartyresourcedata.NewNegotiatedSerializer(api.Codecs, kind, externalVersion, internalVersion),
|
||||
ParameterCodec: thirdpartyresourcedata.NewThirdPartyParameterCodec(api.ParameterCodec),
|
||||
Serializer: thirdpartyresourcedata.NewNegotiatedSerializer(api.Codecs, kind, externalVersion, internalVersion),
|
||||
StreamSerializer: thirdpartyresourcedata.NewNegotiatedSerializer(api.StreamCodecs, kind, externalVersion, internalVersion),
|
||||
ParameterCodec: thirdpartyresourcedata.NewThirdPartyParameterCodec(api.ParameterCodec),
|
||||
|
||||
Context: m.RequestContextMapper,
|
||||
|
||||
|
@ -432,7 +432,8 @@ func (t *thirdPartyResourceDataCreator) New(kind unversioned.GroupVersionKind) (
|
||||
return nil, fmt.Errorf("unknown kind %v", kind)
|
||||
}
|
||||
return &extensions.ThirdPartyResourceDataList{}, nil
|
||||
case "ListOptions":
|
||||
// TODO: this list needs to be formalized higher in the chain
|
||||
case "ListOptions", "WatchEvent":
|
||||
if apiutil.GetGroupVersion(t.group, t.version) == kind.GroupVersion().String() {
|
||||
// Translate third party group to external group.
|
||||
gvk := registered.EnabledVersionsForGroup(api.GroupName)[0].WithKind(kind.Kind)
|
||||
|
@ -25,6 +25,7 @@ import (
|
||||
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/util/framer"
|
||||
utilyaml "k8s.io/kubernetes/pkg/util/yaml"
|
||||
)
|
||||
|
||||
@ -192,3 +193,31 @@ func (s *Serializer) RecognizesData(peek io.Reader) (bool, error) {
|
||||
}
|
||||
return ok, nil
|
||||
}
|
||||
|
||||
// NewFrameWriter implements stream framing for this serializer
|
||||
func (s *Serializer) NewFrameWriter(w io.Writer) io.Writer {
|
||||
if s.yaml {
|
||||
// TODO: needs document framing
|
||||
return nil
|
||||
}
|
||||
// we can write JSON objects directly to the writer, because they are self-framing
|
||||
return w
|
||||
}
|
||||
|
||||
// NewFrameReader implements stream framing for this serializer
|
||||
func (s *Serializer) NewFrameReader(r io.Reader) io.Reader {
|
||||
if s.yaml {
|
||||
// TODO: needs document framing
|
||||
return nil
|
||||
}
|
||||
// we need to extract the JSON chunks of data to pass to Decode()
|
||||
return framer.NewJSONFramedReader(r)
|
||||
}
|
||||
|
||||
// EncodesAsText returns true because both JSON and YAML are considered textual representations
|
||||
// of data. This is used to determine whether the serialized object should be transmitted over
|
||||
// a WebSocket Text or Binary frame. This must remain true for legacy compatibility with v1.1
|
||||
// watch over websocket implementations.
|
||||
func (s *Serializer) EncodesAsText() bool {
|
||||
return true
|
||||
}
|
||||
|
@ -28,6 +28,7 @@ import (
|
||||
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/util/framer"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -241,6 +242,16 @@ func (s *Serializer) RecognizesData(peek io.Reader) (bool, error) {
|
||||
return bytes.Equal(s.prefix, prefix), nil
|
||||
}
|
||||
|
||||
// NewFrameWriter implements stream framing for this serializer
|
||||
func (s *Serializer) NewFrameWriter(w io.Writer) io.Writer {
|
||||
return framer.NewLengthDelimitedFrameWriter(w)
|
||||
}
|
||||
|
||||
// NewFrameReader implements stream framing for this serializer
|
||||
func (s *Serializer) NewFrameReader(r io.Reader) io.Reader {
|
||||
return framer.NewLengthDelimitedFrameReader(r)
|
||||
}
|
||||
|
||||
// copyKindDefaults defaults dst to the value in src if dst does not have a value set.
|
||||
func copyKindDefaults(dst, src *unversioned.GroupVersionKind) {
|
||||
if src == nil {
|
||||
@ -425,3 +436,13 @@ func (s *RawSerializer) EncodeToStream(obj runtime.Object, w io.Writer, override
|
||||
func (s *RawSerializer) RecognizesData(peek io.Reader) (bool, error) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// NewFrameWriter implements stream framing for this serializer
|
||||
func (s *RawSerializer) NewFrameWriter(w io.Writer) io.Writer {
|
||||
return framer.NewLengthDelimitedFrameWriter(w)
|
||||
}
|
||||
|
||||
// NewFrameReader implements stream framing for this serializer
|
||||
func (s *RawSerializer) NewFrameReader(r io.Reader) io.Reader {
|
||||
return framer.NewLengthDelimitedFrameReader(r)
|
||||
}
|
||||
|
@ -22,6 +22,7 @@ import (
|
||||
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/runtime/serializer/streaming"
|
||||
)
|
||||
|
||||
// EnableCrossGroupDecoding modifies the given decoder in place, if it is a codec
|
||||
@ -277,6 +278,24 @@ func (c *codec) EncodeToStream(obj runtime.Object, w io.Writer, overrides ...unv
|
||||
return c.encoder.EncodeToStream(obj, w, overrides...)
|
||||
}
|
||||
|
||||
// NewFrameWriter calls into the nested encoder to expose its framing
|
||||
func (c *codec) NewFrameWriter(w io.Writer) io.Writer {
|
||||
f, ok := c.encoder.(streaming.Framer)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
return f.NewFrameWriter(w)
|
||||
}
|
||||
|
||||
// NewFrameReader calls into the nested decoder to expose its framing
|
||||
func (c *codec) NewFrameReader(r io.Reader) io.Reader {
|
||||
f, ok := c.decoder.(streaming.Framer)
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
return f.NewFrameReader(r)
|
||||
}
|
||||
|
||||
// promoteOrPrependGroupVersion finds the group version in the provided group versions that has the same group as target.
|
||||
// If the group is found the returned array will have that group version in the first position - if the group is not found
|
||||
// the returned array will have target in the first position.
|
||||
|
@ -89,9 +89,9 @@ func IsWebSocketRequest(req *http.Request) bool {
|
||||
return connectionUpgradeRegex.MatchString(strings.ToLower(req.Header.Get("Connection"))) && strings.ToLower(req.Header.Get("Upgrade")) == "websocket"
|
||||
}
|
||||
|
||||
// ignoreReceives reads from a WebSocket until it is closed, then returns. If timeout is set, the
|
||||
// IgnoreReceives reads from a WebSocket until it is closed, then returns. If timeout is set, the
|
||||
// read and write deadlines are pushed every time a new message is received.
|
||||
func ignoreReceives(ws *websocket.Conn, timeout time.Duration) {
|
||||
func IgnoreReceives(ws *websocket.Conn, timeout time.Duration) {
|
||||
defer runtime.HandleCrash()
|
||||
var data []byte
|
||||
for {
|
||||
|
@ -82,7 +82,7 @@ func (r *Reader) handle(ws *websocket.Conn) {
|
||||
encode := len(ws.Config().Protocol) > 0 && ws.Config().Protocol[0] == base64BinaryWebSocketProtocol
|
||||
defer close(r.err)
|
||||
defer ws.Close()
|
||||
go ignoreReceives(ws, r.timeout)
|
||||
go IgnoreReceives(ws, r.timeout)
|
||||
r.err <- messageCopy(ws, r.r, encode, r.ping, r.timeout)
|
||||
}
|
||||
|
||||
|
@ -166,7 +166,7 @@ func readWebSocket(r *Reader, t *testing.T, fn func(*websocket.Conn), protocols
|
||||
s, addr := newServer(func(ws *websocket.Conn) {
|
||||
cfg := ws.Config()
|
||||
cfg.Protocol = protocols
|
||||
go ignoreReceives(ws, 0)
|
||||
go IgnoreReceives(ws, 0)
|
||||
go func() {
|
||||
err := <-r.err
|
||||
errCh <- err
|
||||
@ -198,7 +198,7 @@ func expectWebSocketFrames(r *Reader, t *testing.T, fn func(*websocket.Conn), fr
|
||||
s, addr := newServer(func(ws *websocket.Conn) {
|
||||
cfg := ws.Config()
|
||||
cfg.Protocol = protocols
|
||||
go ignoreReceives(ws, 0)
|
||||
go IgnoreReceives(ws, 0)
|
||||
go func() {
|
||||
err := <-r.err
|
||||
errCh <- err
|
||||
|
84
pkg/watch/versioned/register.go
Normal file
84
pkg/watch/versioned/register.go
Normal file
@ -0,0 +1,84 @@
|
||||
/*
|
||||
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package versioned
|
||||
|
||||
import (
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/conversion"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
)
|
||||
|
||||
// WatchEventKind is name reserved for serializing watch events.
|
||||
const WatchEventKind = "WatchEvent"
|
||||
|
||||
// AddToGroupVersion registers the watch external and internal kinds with the scheme, and ensures the proper
|
||||
// conversions are in place.
|
||||
func AddToGroupVersion(scheme *runtime.Scheme, groupVersion unversioned.GroupVersion) {
|
||||
scheme.AddKnownTypeWithName(groupVersion.WithKind(WatchEventKind), &Event{})
|
||||
scheme.AddKnownTypeWithName(
|
||||
unversioned.GroupVersion{Group: groupVersion.Group, Version: runtime.APIVersionInternal}.WithKind(WatchEventKind),
|
||||
&InternalEvent{},
|
||||
)
|
||||
scheme.AddConversionFuncs(
|
||||
Convert_versioned_Event_to_watch_Event,
|
||||
Convert_versioned_InternalEvent_to_versioned_Event,
|
||||
Convert_watch_Event_to_versioned_Event,
|
||||
Convert_versioned_Event_to_versioned_InternalEvent,
|
||||
)
|
||||
}
|
||||
|
||||
func Convert_watch_Event_to_versioned_Event(in *watch.Event, out *Event, s conversion.Scope) error {
|
||||
out.Type = string(in.Type)
|
||||
switch t := in.Object.(type) {
|
||||
case *runtime.Unknown:
|
||||
// TODO: handle other fields on Unknown and detect type
|
||||
out.Object.Raw = t.Raw
|
||||
case nil:
|
||||
default:
|
||||
out.Object.Object = in.Object
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func Convert_versioned_InternalEvent_to_versioned_Event(in *InternalEvent, out *Event, s conversion.Scope) error {
|
||||
return Convert_watch_Event_to_versioned_Event((*watch.Event)(in), out, s)
|
||||
}
|
||||
|
||||
func Convert_versioned_Event_to_watch_Event(in *Event, out *watch.Event, s conversion.Scope) error {
|
||||
out.Type = watch.EventType(in.Type)
|
||||
if in.Object.Object != nil {
|
||||
out.Object = in.Object.Object
|
||||
} else if in.Object.Raw != nil {
|
||||
// TODO: handle other fields on Unknown and detect type
|
||||
out.Object = &runtime.Unknown{
|
||||
Raw: in.Object.Raw,
|
||||
ContentType: runtime.ContentTypeJSON,
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func Convert_versioned_Event_to_versioned_InternalEvent(in *Event, out *InternalEvent, s conversion.Scope) error {
|
||||
return Convert_versioned_Event_to_watch_Event(in, (*watch.Event)(out), s)
|
||||
}
|
||||
|
||||
// InternalEvent makes watch.Event versioned
|
||||
type InternalEvent watch.Event
|
||||
|
||||
func (e *InternalEvent) GetObjectKind() unversioned.ObjectKind { return unversioned.EmptyObjectKind }
|
||||
func (e *Event) GetObjectKind() unversioned.ObjectKind { return unversioned.EmptyObjectKind }
|
37
pkg/watch/versioned/types.go
Normal file
37
pkg/watch/versioned/types.go
Normal file
@ -0,0 +1,37 @@
|
||||
/*
|
||||
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// Package versioned contains the versioned types for watch. This is the first
|
||||
// serialization version unless otherwise noted.
|
||||
package versioned
|
||||
|
||||
import (
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
)
|
||||
|
||||
// Event represents a single event to a watched resource.
|
||||
//
|
||||
// +protobuf=true
|
||||
type Event struct {
|
||||
Type string `json:"type" protobuf:"bytes,1,opt,name=type"`
|
||||
|
||||
// Object is:
|
||||
// * If Type is Added or Modified: the new state of the object.
|
||||
// * If Type is Deleted: the state of the object immediately before deletion.
|
||||
// * If Type is Error: *api.Status is recommended; other types may make sense
|
||||
// depending on context.
|
||||
Object runtime.RawExtension `json:"object" protobuf:"bytes,2,opt,name=object"`
|
||||
}
|
Loading…
Reference in New Issue
Block a user