mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 22:46:12 +00:00
Merge pull request #1340 from smarterclayton/cleanup_watch_embedding
Cleanup watch encoding w/ RawExtension
This commit is contained in:
commit
248f49fb7c
@ -23,33 +23,3 @@ import (
|
||||
// Codec is the identity codec for this package - it can only convert itself
|
||||
// to itself.
|
||||
var Codec = runtime.CodecFor(Scheme, "")
|
||||
|
||||
// EmbeddedObject implements a Codec specific version of an
|
||||
// embedded object.
|
||||
type EmbeddedObject struct {
|
||||
runtime.Object
|
||||
}
|
||||
|
||||
// UnmarshalJSON implements the json.Unmarshaler interface.
|
||||
func (a *EmbeddedObject) UnmarshalJSON(b []byte) error {
|
||||
obj, err := runtime.CodecUnmarshalJSON(Codec, b)
|
||||
a.Object = obj
|
||||
return err
|
||||
}
|
||||
|
||||
// MarshalJSON implements the json.Marshaler interface.
|
||||
func (a EmbeddedObject) MarshalJSON() ([]byte, error) {
|
||||
return runtime.CodecMarshalJSON(Codec, a.Object)
|
||||
}
|
||||
|
||||
// SetYAML implements the yaml.Setter interface.
|
||||
func (a *EmbeddedObject) SetYAML(tag string, value interface{}) bool {
|
||||
obj, ok := runtime.CodecSetYAML(Codec, tag, value)
|
||||
a.Object = obj
|
||||
return ok
|
||||
}
|
||||
|
||||
// GetYAML implements the yaml.Getter interface.
|
||||
func (a EmbeddedObject) GetYAML() (tag string, value interface{}) {
|
||||
return runtime.CodecGetYAML(Codec, a.Object)
|
||||
}
|
||||
|
@ -19,7 +19,6 @@ package v1beta1
|
||||
import (
|
||||
newer "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/conversion"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
)
|
||||
|
||||
func init() {
|
||||
@ -80,33 +79,3 @@ func init() {
|
||||
)
|
||||
|
||||
}
|
||||
|
||||
// EmbeddedObject implements a Codec specific version of an
|
||||
// embedded object.
|
||||
type EmbeddedObject struct {
|
||||
runtime.Object
|
||||
}
|
||||
|
||||
// UnmarshalJSON implements the json.Unmarshaler interface.
|
||||
func (a *EmbeddedObject) UnmarshalJSON(b []byte) error {
|
||||
obj, err := runtime.CodecUnmarshalJSON(Codec, b)
|
||||
a.Object = obj
|
||||
return err
|
||||
}
|
||||
|
||||
// MarshalJSON implements the json.Marshaler interface.
|
||||
func (a EmbeddedObject) MarshalJSON() ([]byte, error) {
|
||||
return runtime.CodecMarshalJSON(Codec, a.Object)
|
||||
}
|
||||
|
||||
// SetYAML implements the yaml.Setter interface.
|
||||
func (a *EmbeddedObject) SetYAML(tag string, value interface{}) bool {
|
||||
obj, ok := runtime.CodecSetYAML(Codec, tag, value)
|
||||
a.Object = obj
|
||||
return ok
|
||||
}
|
||||
|
||||
// GetYAML implements the yaml.Getter interface.
|
||||
func (a EmbeddedObject) GetYAML() (tag string, value interface{}) {
|
||||
return runtime.CodecGetYAML(Codec, a.Object)
|
||||
}
|
||||
|
@ -19,7 +19,6 @@ package v1beta2
|
||||
import (
|
||||
newer "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/conversion"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
)
|
||||
|
||||
func init() {
|
||||
@ -79,33 +78,3 @@ func init() {
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
// EmbeddedObject implements a Codec specific version of an
|
||||
// embedded object.
|
||||
type EmbeddedObject struct {
|
||||
runtime.Object
|
||||
}
|
||||
|
||||
// UnmarshalJSON implements the json.Unmarshaler interface.
|
||||
func (a *EmbeddedObject) UnmarshalJSON(b []byte) error {
|
||||
obj, err := runtime.CodecUnmarshalJSON(Codec, b)
|
||||
a.Object = obj
|
||||
return err
|
||||
}
|
||||
|
||||
// MarshalJSON implements the json.Marshaler interface.
|
||||
func (a EmbeddedObject) MarshalJSON() ([]byte, error) {
|
||||
return runtime.CodecMarshalJSON(Codec, a.Object)
|
||||
}
|
||||
|
||||
// SetYAML implements the yaml.Setter interface.
|
||||
func (a *EmbeddedObject) SetYAML(tag string, value interface{}) bool {
|
||||
obj, ok := runtime.CodecSetYAML(Codec, tag, value)
|
||||
a.Object = obj
|
||||
return ok
|
||||
}
|
||||
|
||||
// GetYAML implements the yaml.Getter interface.
|
||||
func (a EmbeddedObject) GetYAML() (tag string, value interface{}) {
|
||||
return runtime.CodecGetYAML(Codec, a.Object)
|
||||
}
|
||||
|
@ -17,7 +17,6 @@ limitations under the License.
|
||||
package apiserver
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"regexp"
|
||||
@ -25,11 +24,11 @@ import (
|
||||
"strings"
|
||||
|
||||
"code.google.com/p/go.net/websocket"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||
watchjson "github.com/GoogleCloudPlatform/kubernetes/pkg/watch/json"
|
||||
)
|
||||
|
||||
type WatchHandler struct {
|
||||
@ -120,7 +119,7 @@ func (w *WatchServer) HandleWS(ws *websocket.Conn) {
|
||||
// End of results.
|
||||
return
|
||||
}
|
||||
obj, err := api.NewJSONWatchEvent(w.codec, event)
|
||||
obj, err := watchjson.Object(w.codec, &event)
|
||||
if err != nil {
|
||||
// Client disconnect.
|
||||
w.watching.Stop()
|
||||
@ -158,7 +157,7 @@ func (self *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
flusher.Flush()
|
||||
|
||||
encoder := json.NewEncoder(w)
|
||||
encoder := watchjson.NewEncoder(w, self.codec)
|
||||
for {
|
||||
select {
|
||||
case <-cn.CloseNotify():
|
||||
@ -169,13 +168,7 @@ func (self *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
// End of results.
|
||||
return
|
||||
}
|
||||
obj, err := api.NewJSONWatchEvent(self.codec, event)
|
||||
if err != nil {
|
||||
// Client disconnect.
|
||||
self.watching.Stop()
|
||||
return
|
||||
}
|
||||
if err := encoder.Encode(obj); err != nil {
|
||||
if err := encoder.Encode(&event); err != nil {
|
||||
// Client disconnect.
|
||||
self.watching.Stop()
|
||||
return
|
||||
|
@ -25,11 +25,16 @@ import (
|
||||
"testing"
|
||||
|
||||
"code.google.com/p/go.net/websocket"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||
)
|
||||
|
||||
// watchJSON defines the expected JSON wire equivalent of watch.Event
|
||||
type watchJSON struct {
|
||||
Type watch.EventType `json:"type,omitempty" yaml:"type,omitempty"`
|
||||
Object json.RawMessage `json:"object,omitempty" yaml:"object,omitempty"`
|
||||
}
|
||||
|
||||
var watchTestTable = []struct {
|
||||
t watch.EventType
|
||||
obj runtime.Object
|
||||
@ -61,7 +66,7 @@ func TestWatchWebsocket(t *testing.T) {
|
||||
// Send
|
||||
simpleStorage.fakeWatch.Action(action, object)
|
||||
// Test receive
|
||||
var got api.WatchEvent
|
||||
var got watchJSON
|
||||
err := websocket.JSON.Receive(ws, &got)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
@ -69,8 +74,8 @@ func TestWatchWebsocket(t *testing.T) {
|
||||
if got.Type != action {
|
||||
t.Errorf("Unexpected type: %v", got.Type)
|
||||
}
|
||||
if e, a := object, got.Object.Object; !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("Expected %v, got %v", e, a)
|
||||
if e, a := runtime.EncodeOrDie(codec, object), string(got.Object); !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("Expected %#v, got %#v", e, a)
|
||||
}
|
||||
}
|
||||
|
||||
@ -79,7 +84,7 @@ func TestWatchWebsocket(t *testing.T) {
|
||||
}
|
||||
simpleStorage.fakeWatch.Stop()
|
||||
|
||||
var got api.WatchEvent
|
||||
var got watchJSON
|
||||
err = websocket.JSON.Receive(ws, &got)
|
||||
if err == nil {
|
||||
t.Errorf("Unexpected non-error")
|
||||
@ -118,7 +123,7 @@ func TestWatchHTTP(t *testing.T) {
|
||||
// Send
|
||||
simpleStorage.fakeWatch.Action(item.t, item.obj)
|
||||
// Test receive
|
||||
var got api.WatchEvent
|
||||
var got watchJSON
|
||||
err := decoder.Decode(&got)
|
||||
if err != nil {
|
||||
t.Fatalf("%d: Unexpected error: %v", i, err)
|
||||
@ -126,13 +131,13 @@ func TestWatchHTTP(t *testing.T) {
|
||||
if got.Type != item.t {
|
||||
t.Errorf("%d: Unexpected type: %v", i, got.Type)
|
||||
}
|
||||
if e, a := item.obj, got.Object.Object; !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("%d: Expected %v, got %v", i, e, a)
|
||||
if e, a := runtime.EncodeOrDie(codec, item.obj), string(got.Object); !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("Expected %#v, got %#v", e, a)
|
||||
}
|
||||
}
|
||||
simpleStorage.fakeWatch.Stop()
|
||||
|
||||
var got api.WatchEvent
|
||||
var got watchJSON
|
||||
err = decoder.Decode(&got)
|
||||
if err == nil {
|
||||
t.Errorf("Unexpected non-error")
|
||||
|
@ -28,11 +28,11 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
cwatch "github.com/GoogleCloudPlatform/kubernetes/pkg/client/watch"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||
watchjson "github.com/GoogleCloudPlatform/kubernetes/pkg/watch/json"
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
@ -269,7 +269,7 @@ func (r *Request) Watch() (watch.Interface, error) {
|
||||
if response.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("Got status: %v", response.StatusCode)
|
||||
}
|
||||
return watch.NewStreamWatcher(cwatch.NewAPIEventDecoder(response.Body)), nil
|
||||
return watch.NewStreamWatcher(watchjson.NewDecoder(response.Body, r.c.Codec)), nil
|
||||
}
|
||||
|
||||
// Do formats and executes the request. Returns the API object received, or an error.
|
||||
|
@ -19,7 +19,6 @@ package client
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
@ -29,12 +28,14 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta2"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||
watchjson "github.com/GoogleCloudPlatform/kubernetes/pkg/watch/json"
|
||||
)
|
||||
|
||||
func TestDoRequestNewWay(t *testing.T) {
|
||||
@ -386,7 +387,7 @@ func TestWatch(t *testing.T) {
|
||||
}{
|
||||
{watch.Added, &api.Pod{JSONBase: api.JSONBase{ID: "first"}}},
|
||||
{watch.Modified, &api.Pod{JSONBase: api.JSONBase{ID: "second"}}},
|
||||
{watch.Deleted, &api.Pod{JSONBase: api.JSONBase{ID: "third"}}},
|
||||
{watch.Deleted, &api.Pod{JSONBase: api.JSONBase{ID: "last"}}},
|
||||
}
|
||||
|
||||
auth := AuthInfo{User: "user", Password: "pass"}
|
||||
@ -401,13 +402,9 @@ func TestWatch(t *testing.T) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
flusher.Flush()
|
||||
|
||||
encoder := json.NewEncoder(w)
|
||||
encoder := watchjson.NewEncoder(w, latest.Codec)
|
||||
for _, item := range table {
|
||||
data, err := api.NewJSONWatchEvent(v1beta1.Codec, watch.Event{item.t, item.obj})
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
if err := encoder.Encode(data); err != nil {
|
||||
if err := encoder.Encode(&watch.Event{item.t, item.obj}); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
flusher.Flush()
|
||||
|
@ -1,64 +0,0 @@
|
||||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package watch
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||
)
|
||||
|
||||
// APIEventDecoder implements the watch.Decoder interface for io.ReadClosers that
|
||||
// have contents which consist of a series of api.WatchEvent objects encoded via JSON.
|
||||
// It will decode any object which is registered to convert to api.WatchEvent via
|
||||
// api.Scheme
|
||||
type APIEventDecoder struct {
|
||||
stream io.ReadCloser
|
||||
decoder *json.Decoder
|
||||
}
|
||||
|
||||
// NewAPIEventDecoder creates an APIEventDecoder for the given stream.
|
||||
func NewAPIEventDecoder(stream io.ReadCloser) *APIEventDecoder {
|
||||
return &APIEventDecoder{
|
||||
stream: stream,
|
||||
decoder: json.NewDecoder(stream),
|
||||
}
|
||||
}
|
||||
|
||||
// Decode blocks until it can return the next object in the stream. Returns an error
|
||||
// if the stream is closed or an object can't be decoded.
|
||||
func (d *APIEventDecoder) Decode() (action watch.EventType, object runtime.Object, err error) {
|
||||
var got api.WatchEvent
|
||||
err = d.decoder.Decode(&got)
|
||||
if err != nil {
|
||||
return action, nil, err
|
||||
}
|
||||
switch got.Type {
|
||||
case watch.Added, watch.Modified, watch.Deleted:
|
||||
return got.Type, got.Object.Object, err
|
||||
}
|
||||
return action, nil, fmt.Errorf("got invalid watch event type: %v", got.Type)
|
||||
}
|
||||
|
||||
// Close closes the underlying stream.
|
||||
func (d *APIEventDecoder) Close() {
|
||||
d.stream.Close()
|
||||
}
|
@ -20,47 +20,6 @@ import (
|
||||
"gopkg.in/v1/yaml"
|
||||
)
|
||||
|
||||
// EmbeddedObject must have an appropriate encoder and decoder functions, such that on the
|
||||
// wire, it's stored as a []byte, but in memory, the contained object is accessable as an
|
||||
// Object via the Get() function. Only valid API objects may be stored via EmbeddedObject.
|
||||
// The purpose of this is to allow an API object of type known only at runtime to be
|
||||
// embedded within other API objects.
|
||||
//
|
||||
// Define a Codec variable in your package and import the runtime package and
|
||||
// then use the commented section below
|
||||
|
||||
/*
|
||||
// EmbeddedObject implements a Codec specific version of an
|
||||
// embedded object.
|
||||
type EmbeddedObject struct {
|
||||
runtime.Object
|
||||
}
|
||||
|
||||
// UnmarshalJSON implements the json.Unmarshaler interface.
|
||||
func (a *EmbeddedObject) UnmarshalJSON(b []byte) error {
|
||||
obj, err := runtime.CodecUnmarshalJSON(Codec, b)
|
||||
a.Object = obj
|
||||
return err
|
||||
}
|
||||
|
||||
// MarshalJSON implements the json.Marshaler interface.
|
||||
func (a EmbeddedObject) MarshalJSON() ([]byte, error) {
|
||||
return runtime.CodecMarshalJSON(Codec, a.Object)
|
||||
}
|
||||
|
||||
// SetYAML implements the yaml.Setter interface.
|
||||
func (a *EmbeddedObject) SetYAML(tag string, value interface{}) bool {
|
||||
obj, ok := runtime.CodecSetYAML(Codec, tag, value)
|
||||
a.Object = obj
|
||||
return ok
|
||||
}
|
||||
|
||||
// GetYAML implements the yaml.Getter interface.
|
||||
func (a EmbeddedObject) GetYAML() (tag string, value interface{}) {
|
||||
return runtime.CodecGetYAML(Codec, a.Object)
|
||||
}
|
||||
*/
|
||||
|
||||
// Encode()/Decode() are the canonical way of converting an API object to/from
|
||||
// wire format. This file provides utility functions which permit doing so
|
||||
// recursively, such that API objects of types known only at run time can be
|
||||
|
@ -27,52 +27,29 @@ import (
|
||||
var scheme = runtime.NewScheme()
|
||||
var Codec = runtime.CodecFor(scheme, "v1test")
|
||||
|
||||
// EmbeddedObject implements a Codec specific version of an
|
||||
// embedded object.
|
||||
type EmbeddedObject struct {
|
||||
runtime.Object
|
||||
}
|
||||
|
||||
// UnmarshalJSON implements the json.Unmarshaler interface.
|
||||
func (a *EmbeddedObject) UnmarshalJSON(b []byte) error {
|
||||
obj, err := runtime.CodecUnmarshalJSON(Codec, b)
|
||||
a.Object = obj
|
||||
return err
|
||||
}
|
||||
|
||||
// MarshalJSON implements the json.Marshaler interface.
|
||||
func (a EmbeddedObject) MarshalJSON() ([]byte, error) {
|
||||
return runtime.CodecMarshalJSON(Codec, a.Object)
|
||||
}
|
||||
|
||||
// SetYAML implements the yaml.Setter interface.
|
||||
func (a *EmbeddedObject) SetYAML(tag string, value interface{}) bool {
|
||||
obj, ok := runtime.CodecSetYAML(Codec, tag, value)
|
||||
a.Object = obj
|
||||
return ok
|
||||
}
|
||||
|
||||
// GetYAML implements the yaml.Getter interface.
|
||||
func (a EmbeddedObject) GetYAML() (tag string, value interface{}) {
|
||||
return runtime.CodecGetYAML(Codec, a.Object)
|
||||
}
|
||||
|
||||
type EmbeddedTest struct {
|
||||
runtime.JSONBase `yaml:",inline" json:",inline"`
|
||||
Object EmbeddedObject `yaml:"object,omitempty" json:"object,omitempty"`
|
||||
EmptyObject EmbeddedObject `yaml:"emptyObject,omitempty" json:"emptyObject,omitempty"`
|
||||
Object runtime.EmbeddedObject `yaml:"object,omitempty" json:"object,omitempty"`
|
||||
EmptyObject runtime.EmbeddedObject `yaml:"emptyObject,omitempty" json:"emptyObject,omitempty"`
|
||||
}
|
||||
|
||||
func (*EmbeddedTest) IsAnAPIObject() {}
|
||||
type EmbeddedTestExternal struct {
|
||||
runtime.JSONBase `yaml:",inline" json:",inline"`
|
||||
Object runtime.RawExtension `yaml:"object,omitempty" json:"object,omitempty"`
|
||||
EmptyObject runtime.RawExtension `yaml:"emptyObject,omitempty" json:"emptyObject,omitempty"`
|
||||
}
|
||||
|
||||
func (*EmbeddedTest) IsAnAPIObject() {}
|
||||
func (*EmbeddedTestExternal) IsAnAPIObject() {}
|
||||
|
||||
func TestEmbeddedObject(t *testing.T) {
|
||||
s := scheme
|
||||
s.AddKnownTypes("", &EmbeddedTest{})
|
||||
s.AddKnownTypes("v1test", &EmbeddedTest{})
|
||||
s.AddKnownTypeWithName("v1test", "EmbeddedTest", &EmbeddedTestExternal{})
|
||||
|
||||
outer := &EmbeddedTest{
|
||||
JSONBase: runtime.JSONBase{ID: "outer"},
|
||||
Object: EmbeddedObject{
|
||||
Object: runtime.EmbeddedObject{
|
||||
&EmbeddedTest{
|
||||
JSONBase: runtime.JSONBase{ID: "inner"},
|
||||
},
|
||||
@ -95,18 +72,30 @@ func TestEmbeddedObject(t *testing.T) {
|
||||
t.Errorf("Expected: %#v but got %#v", e, a)
|
||||
}
|
||||
|
||||
// test JSON decoding of the external object, which should preserve
|
||||
// raw bytes
|
||||
var externalViaJSON EmbeddedTestExternal
|
||||
err = json.Unmarshal(wire, &externalViaJSON)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected decode error %v", err)
|
||||
}
|
||||
if externalViaJSON.Kind == "" || externalViaJSON.APIVersion == "" || externalViaJSON.ID != "outer" {
|
||||
t.Errorf("Expected objects to have type info set, got %#v", externalViaJSON)
|
||||
}
|
||||
if !reflect.DeepEqual(externalViaJSON.EmptyObject.RawJSON, []byte("null")) || len(externalViaJSON.Object.RawJSON) == 0 {
|
||||
t.Errorf("Expected deserialization of nested objects into bytes, got %#v", externalViaJSON)
|
||||
}
|
||||
|
||||
// test JSON decoding, too, since Decode uses yaml unmarshalling.
|
||||
// Generic Unmarshalling of JSON cannot load the nested objects because there is
|
||||
// no default schema set. Consumers wishing to get direct JSON decoding must use
|
||||
// the external representation
|
||||
var decodedViaJSON EmbeddedTest
|
||||
err = json.Unmarshal(wire, &decodedViaJSON)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected decode error %v", err)
|
||||
}
|
||||
|
||||
// Things that Decode would have done for us:
|
||||
decodedViaJSON.Kind = ""
|
||||
decodedViaJSON.APIVersion = ""
|
||||
|
||||
if e, a := outer, &decodedViaJSON; !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("Expected: %#v but got %#v", e, a)
|
||||
if a := decodedViaJSON; a.Object.Object != nil || a.EmptyObject.Object != nil {
|
||||
t.Errorf("Expected embedded objects to be nil: %#v", a)
|
||||
}
|
||||
}
|
||||
|
@ -17,11 +17,16 @@ limitations under the License.
|
||||
package runtime
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"gopkg.in/v1/yaml"
|
||||
)
|
||||
|
||||
func (re *RawExtension) UnmarshalJSON(in []byte) error {
|
||||
re.RawJSON = in
|
||||
if re == nil {
|
||||
return errors.New("runtime.RawExtension: UnmarshalJSON on nil pointer")
|
||||
}
|
||||
re.RawJSON = append(re.RawJSON[0:0], in...)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -121,7 +121,7 @@ func (self *Scheme) embeddedObjectToRawExtension(in *EmbeddedObject, out *RawExt
|
||||
}
|
||||
|
||||
// rawExtensionToEmbeddedObject does the conversion you would expect from the name, using the information
|
||||
// given in conversion.Scope. It's placed in the DefaultScheme as a ConversionFunc to enable plugins;
|
||||
// given in conversion.Scope. It's placed in all schemes as a ConversionFunc to enable plugins;
|
||||
// see the comment for RawExtension.
|
||||
func (self *Scheme) rawExtensionToEmbeddedObject(in *RawExtension, out *EmbeddedObject, s conversion.Scope) error {
|
||||
if len(in.RawJSON) == 4 && string(in.RawJSON) == "null" {
|
||||
|
@ -17,10 +17,12 @@ limitations under the License.
|
||||
package watch
|
||||
|
||||
import (
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
// Decoder allows StreamWatcher to watch any stream for which a Decoder can be written.
|
||||
@ -82,6 +84,9 @@ func (sw *StreamWatcher) receive() {
|
||||
for {
|
||||
action, obj, err := sw.source.Decode()
|
||||
if err != nil {
|
||||
if err != io.EOF {
|
||||
glog.Errorf("Unable to decode an event from the watch stream: %v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
sw.result <- Event{
|
||||
|
69
pkg/watch/json/decoder.go
Normal file
69
pkg/watch/json/decoder.go
Normal file
@ -0,0 +1,69 @@
|
||||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package json
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||
)
|
||||
|
||||
// Decoder implements the watch.Decoder interface for io.ReadClosers that
|
||||
// have contents which consist of a series of watchEvent objects encoded via JSON.
|
||||
// It will decode any object registered in the supplied codec.
|
||||
type Decoder struct {
|
||||
r io.ReadCloser
|
||||
decoder *json.Decoder
|
||||
codec runtime.Codec
|
||||
}
|
||||
|
||||
// NewDecoder creates an Decoder for the given writer and codec.
|
||||
func NewDecoder(r io.ReadCloser, codec runtime.Codec) *Decoder {
|
||||
return &Decoder{
|
||||
r: r,
|
||||
decoder: json.NewDecoder(r),
|
||||
codec: codec,
|
||||
}
|
||||
}
|
||||
|
||||
// Decode blocks until it can return the next object in the writer. Returns an error
|
||||
// if the writer is closed or an object can't be decoded.
|
||||
func (d *Decoder) Decode() (watch.EventType, runtime.Object, error) {
|
||||
var got watchEvent
|
||||
if err := d.decoder.Decode(&got); err != nil {
|
||||
return "", nil, err
|
||||
}
|
||||
switch got.Type {
|
||||
case watch.Added, watch.Modified, watch.Deleted:
|
||||
default:
|
||||
return "", nil, fmt.Errorf("got invalid watch event type: %v", got.Type)
|
||||
}
|
||||
|
||||
obj, err := d.codec.Decode(got.Object.RawJSON)
|
||||
if err != nil {
|
||||
return "", nil, fmt.Errorf("unable to decode watch event: %v", err)
|
||||
}
|
||||
return got.Type, obj, nil
|
||||
}
|
||||
|
||||
// Close closes the underlying r.
|
||||
func (d *Decoder) Close() {
|
||||
d.r.Close()
|
||||
}
|
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package watch
|
||||
package json
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
@ -25,17 +25,13 @@ import (
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||
)
|
||||
|
||||
type watchSerialization struct {
|
||||
Type watch.EventType
|
||||
Object json.RawMessage
|
||||
}
|
||||
|
||||
func TestDecoder(t *testing.T) {
|
||||
out, in := io.Pipe()
|
||||
decoder := NewAPIEventDecoder(out)
|
||||
decoder := NewDecoder(out, v1beta1.Codec)
|
||||
|
||||
expect := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
|
||||
encoder := json.NewEncoder(in)
|
||||
@ -44,7 +40,7 @@ func TestDecoder(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error %v", err)
|
||||
}
|
||||
if err := encoder.Encode(&watchSerialization{watch.Added, json.RawMessage(data)}); err != nil {
|
||||
if err := encoder.Encode(&watchEvent{watch.Added, runtime.RawExtension{json.RawMessage(data)}}); err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
in.Close()
|
||||
@ -82,7 +78,7 @@ func TestDecoder(t *testing.T) {
|
||||
|
||||
func TestDecoder_SourceClose(t *testing.T) {
|
||||
out, in := io.Pipe()
|
||||
decoder := NewAPIEventDecoder(out)
|
||||
decoder := NewDecoder(out, v1beta1.Codec)
|
||||
|
||||
done := make(chan struct{})
|
||||
|
@ -14,30 +14,6 @@ See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package api
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestEmbeddedDefaultSerialization(t *testing.T) {
|
||||
expected := WatchEvent{
|
||||
Type: "foo",
|
||||
Object: EmbeddedObject{&Pod{}},
|
||||
}
|
||||
data, err := json.Marshal(expected)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
actual := WatchEvent{}
|
||||
if err := json.Unmarshal(data, &actual); err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(actual, expected) {
|
||||
t.Errorf("Expected %#v, Got %#v", expected, actual)
|
||||
}
|
||||
}
|
||||
// Package json implements a simple encoder and decoder for streams
|
||||
// of watch events over io.Writer/Readers
|
||||
package json
|
53
pkg/watch/json/encoder.go
Normal file
53
pkg/watch/json/encoder.go
Normal file
@ -0,0 +1,53 @@
|
||||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package json
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||
)
|
||||
|
||||
// Encoder implements the json.Encoder interface for io.Writers that
|
||||
// should serialize watchEvent objects into JSON. It will encode any object
|
||||
// registered in the supplied codec and return an error otherwies.
|
||||
type Encoder struct {
|
||||
w io.Writer
|
||||
encoder *json.Encoder
|
||||
codec runtime.Codec
|
||||
}
|
||||
|
||||
// NewEncoder creates an Encoder for the given writer and codec
|
||||
func NewEncoder(w io.Writer, codec runtime.Codec) *Encoder {
|
||||
return &Encoder{
|
||||
w: w,
|
||||
encoder: json.NewEncoder(w),
|
||||
codec: codec,
|
||||
}
|
||||
}
|
||||
|
||||
// Encode writes an event to the writer. Returns an error
|
||||
// if the writer is closed or an object can't be encoded.
|
||||
func (e *Encoder) Encode(event *watch.Event) error {
|
||||
obj, err := Object(e.codec, event)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return e.encoder.Encode(obj)
|
||||
}
|
76
pkg/watch/json/encoder_test.go
Normal file
76
pkg/watch/json/encoder_test.go
Normal file
@ -0,0 +1,76 @@
|
||||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package json
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io/ioutil"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta2"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||
)
|
||||
|
||||
func TestEncodeDecodeRoundTrip(t *testing.T) {
|
||||
testCases := []struct {
|
||||
Type watch.EventType
|
||||
Object runtime.Object
|
||||
Codec runtime.Codec
|
||||
}{
|
||||
{
|
||||
watch.Added,
|
||||
&api.Pod{JSONBase: api.JSONBase{ID: "foo"}},
|
||||
v1beta1.Codec,
|
||||
},
|
||||
{
|
||||
watch.Modified,
|
||||
&api.Pod{JSONBase: api.JSONBase{ID: "foo"}},
|
||||
v1beta2.Codec,
|
||||
},
|
||||
{
|
||||
watch.Deleted,
|
||||
&api.Pod{JSONBase: api.JSONBase{ID: "foo"}},
|
||||
api.Codec,
|
||||
},
|
||||
}
|
||||
for i, testCase := range testCases {
|
||||
buf := &bytes.Buffer{}
|
||||
|
||||
encoder := NewEncoder(buf, testCase.Codec)
|
||||
if err := encoder.Encode(&watch.Event{Type: testCase.Type, Object: testCase.Object}); err != nil {
|
||||
t.Errorf("%d: unexpected error: %v", i, err)
|
||||
continue
|
||||
}
|
||||
|
||||
decoder := NewDecoder(ioutil.NopCloser(buf), testCase.Codec)
|
||||
event, obj, err := decoder.Decode()
|
||||
if err != nil {
|
||||
t.Errorf("%d: unexpected error: %v", i, err)
|
||||
continue
|
||||
}
|
||||
if !reflect.DeepEqual(testCase.Object, obj) {
|
||||
t.Errorf("%d: expected %#v, got %#v", i, testCase.Object, obj)
|
||||
}
|
||||
if event != testCase.Type {
|
||||
t.Errorf("%d: unexpected type: %#v", i, event)
|
||||
}
|
||||
}
|
||||
}
|
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package api
|
||||
package json
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
@ -25,26 +25,19 @@ import (
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||
)
|
||||
|
||||
// WatchEvent objects are streamed from the api server in response to a watch request.
|
||||
// watchEvent objects are streamed from the api server in response to a watch request.
|
||||
// These are not API objects and are unversioned today.
|
||||
type WatchEvent struct {
|
||||
type watchEvent struct {
|
||||
// The type of the watch event; added, modified, or deleted.
|
||||
Type watch.EventType
|
||||
Type watch.EventType `json:"type,omitempty" yaml:"type,omitempty"`
|
||||
|
||||
// For added or modified objects, this is the new object; for deleted objects,
|
||||
// it's the state of the object immediately prior to its deletion.
|
||||
Object EmbeddedObject
|
||||
Object runtime.RawExtension `json:"object,omitempty" yaml:"object,omitempty"`
|
||||
}
|
||||
|
||||
// watchSerialization defines the JSON wire equivalent of watch.Event
|
||||
type watchSerialization struct {
|
||||
Type watch.EventType
|
||||
Object json.RawMessage
|
||||
}
|
||||
|
||||
// NewJSONWatcHEvent returns an object that will serialize to JSON and back
|
||||
// to a WatchEvent.
|
||||
func NewJSONWatchEvent(codec runtime.Codec, event watch.Event) (interface{}, error) {
|
||||
// Object converts a watch.Event into an appropriately serializable JSON object
|
||||
func Object(codec runtime.Codec, event *watch.Event) (interface{}, error) {
|
||||
obj, ok := event.Object.(runtime.Object)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("The event object cannot be safely converted to JSON: %v", reflect.TypeOf(event.Object).Name())
|
||||
@ -53,5 +46,5 @@ func NewJSONWatchEvent(codec runtime.Codec, event watch.Event) (interface{}, err
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &watchSerialization{event.Type, json.RawMessage(data)}, nil
|
||||
return &watchEvent{event.Type, runtime.RawExtension{json.RawMessage(data)}}, nil
|
||||
}
|
Loading…
Reference in New Issue
Block a user