diff --git a/pkg/apiserver/watch.go b/pkg/apiserver/watch.go index 8421df3c699..d110300b6d5 100644 --- a/pkg/apiserver/watch.go +++ b/pkg/apiserver/watch.go @@ -17,7 +17,6 @@ limitations under the License. package apiserver import ( - "encoding/json" "net/http" "net/url" "regexp" @@ -25,11 +24,11 @@ import ( "strings" "code.google.com/p/go.net/websocket" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/httplog" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + watchjson "github.com/GoogleCloudPlatform/kubernetes/pkg/watch/json" ) type WatchHandler struct { @@ -120,7 +119,7 @@ func (w *WatchServer) HandleWS(ws *websocket.Conn) { // End of results. return } - obj, err := api.NewJSONWatchEvent(w.codec, event) + obj, err := watchjson.Object(w.codec, &event) if err != nil { // Client disconnect. w.watching.Stop() @@ -158,7 +157,7 @@ func (self *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { w.WriteHeader(http.StatusOK) flusher.Flush() - encoder := json.NewEncoder(w) + encoder := watchjson.NewEncoder(w, self.codec) for { select { case <-cn.CloseNotify(): @@ -169,13 +168,7 @@ func (self *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { // End of results. return } - obj, err := api.NewJSONWatchEvent(self.codec, event) - if err != nil { - // Client disconnect. - self.watching.Stop() - return - } - if err := encoder.Encode(obj); err != nil { + if err := encoder.Encode(&event); err != nil { // Client disconnect. self.watching.Stop() return diff --git a/pkg/apiserver/watch_test.go b/pkg/apiserver/watch_test.go index 40039418c49..575e843c84e 100644 --- a/pkg/apiserver/watch_test.go +++ b/pkg/apiserver/watch_test.go @@ -25,11 +25,16 @@ import ( "testing" "code.google.com/p/go.net/websocket" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) +// watchJSON defines the expected JSON wire equivalent of watch.Event +type watchJSON struct { + Type watch.EventType `json:"type,omitempty" yaml:"type,omitempty"` + Object json.RawMessage `json:"object,omitempty" yaml:"object,omitempty"` +} + var watchTestTable = []struct { t watch.EventType obj runtime.Object @@ -61,7 +66,7 @@ func TestWatchWebsocket(t *testing.T) { // Send simpleStorage.fakeWatch.Action(action, object) // Test receive - var got api.WatchEvent + var got watchJSON err := websocket.JSON.Receive(ws, &got) if err != nil { t.Fatalf("Unexpected error: %v", err) @@ -69,8 +74,8 @@ func TestWatchWebsocket(t *testing.T) { if got.Type != action { t.Errorf("Unexpected type: %v", got.Type) } - if e, a := object, got.Object.Object; !reflect.DeepEqual(e, a) { - t.Errorf("Expected %v, got %v", e, a) + if e, a := runtime.EncodeOrDie(codec, object), string(got.Object); !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) } } @@ -79,7 +84,7 @@ func TestWatchWebsocket(t *testing.T) { } simpleStorage.fakeWatch.Stop() - var got api.WatchEvent + var got watchJSON err = websocket.JSON.Receive(ws, &got) if err == nil { t.Errorf("Unexpected non-error") @@ -118,7 +123,7 @@ func TestWatchHTTP(t *testing.T) { // Send simpleStorage.fakeWatch.Action(item.t, item.obj) // Test receive - var got api.WatchEvent + var got watchJSON err := decoder.Decode(&got) if err != nil { t.Fatalf("%d: Unexpected error: %v", i, err) @@ -126,13 +131,13 @@ func TestWatchHTTP(t *testing.T) { if got.Type != item.t { t.Errorf("%d: Unexpected type: %v", i, got.Type) } - if e, a := item.obj, got.Object.Object; !reflect.DeepEqual(e, a) { - t.Errorf("%d: Expected %v, got %v", i, e, a) + if e, a := runtime.EncodeOrDie(codec, item.obj), string(got.Object); !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) } } simpleStorage.fakeWatch.Stop() - var got api.WatchEvent + var got watchJSON err = decoder.Decode(&got) if err == nil { t.Errorf("Unexpected non-error") diff --git a/pkg/client/request.go b/pkg/client/request.go index 7dd1060cafc..65cdd4d7011 100644 --- a/pkg/client/request.go +++ b/pkg/client/request.go @@ -28,11 +28,11 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - cwatch "github.com/GoogleCloudPlatform/kubernetes/pkg/client/watch" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + watchjson "github.com/GoogleCloudPlatform/kubernetes/pkg/watch/json" "github.com/golang/glog" ) @@ -269,7 +269,7 @@ func (r *Request) Watch() (watch.Interface, error) { if response.StatusCode != http.StatusOK { return nil, fmt.Errorf("Got status: %v", response.StatusCode) } - return watch.NewStreamWatcher(cwatch.NewAPIEventDecoder(response.Body)), nil + return watch.NewStreamWatcher(watchjson.NewDecoder(response.Body, r.c.Codec)), nil } // Do formats and executes the request. Returns the API object received, or an error. diff --git a/pkg/client/request_test.go b/pkg/client/request_test.go index cf7229e73a9..53ff25b4ef1 100644 --- a/pkg/client/request_test.go +++ b/pkg/client/request_test.go @@ -19,7 +19,6 @@ package client import ( "bytes" "encoding/base64" - "encoding/json" "io/ioutil" "net/http" "net/http/httptest" @@ -29,12 +28,14 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta2" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + watchjson "github.com/GoogleCloudPlatform/kubernetes/pkg/watch/json" ) func TestDoRequestNewWay(t *testing.T) { @@ -386,7 +387,7 @@ func TestWatch(t *testing.T) { }{ {watch.Added, &api.Pod{JSONBase: api.JSONBase{ID: "first"}}}, {watch.Modified, &api.Pod{JSONBase: api.JSONBase{ID: "second"}}}, - {watch.Deleted, &api.Pod{JSONBase: api.JSONBase{ID: "third"}}}, + {watch.Deleted, &api.Pod{JSONBase: api.JSONBase{ID: "last"}}}, } auth := AuthInfo{User: "user", Password: "pass"} @@ -401,13 +402,9 @@ func TestWatch(t *testing.T) { w.WriteHeader(http.StatusOK) flusher.Flush() - encoder := json.NewEncoder(w) + encoder := watchjson.NewEncoder(w, latest.Codec) for _, item := range table { - data, err := api.NewJSONWatchEvent(v1beta1.Codec, watch.Event{item.t, item.obj}) - if err != nil { - panic(err) - } - if err := encoder.Encode(data); err != nil { + if err := encoder.Encode(&watch.Event{item.t, item.obj}); err != nil { panic(err) } flusher.Flush() diff --git a/pkg/client/watch/decoder.go b/pkg/client/watch/decoder.go deleted file mode 100644 index 2a6f91e0700..00000000000 --- a/pkg/client/watch/decoder.go +++ /dev/null @@ -1,64 +0,0 @@ -/* -Copyright 2014 Google Inc. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package watch - -import ( - "encoding/json" - "fmt" - "io" - - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" - "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" -) - -// APIEventDecoder implements the watch.Decoder interface for io.ReadClosers that -// have contents which consist of a series of api.WatchEvent objects encoded via JSON. -// It will decode any object which is registered to convert to api.WatchEvent via -// api.Scheme -type APIEventDecoder struct { - stream io.ReadCloser - decoder *json.Decoder -} - -// NewAPIEventDecoder creates an APIEventDecoder for the given stream. -func NewAPIEventDecoder(stream io.ReadCloser) *APIEventDecoder { - return &APIEventDecoder{ - stream: stream, - decoder: json.NewDecoder(stream), - } -} - -// Decode blocks until it can return the next object in the stream. Returns an error -// if the stream is closed or an object can't be decoded. -func (d *APIEventDecoder) Decode() (action watch.EventType, object runtime.Object, err error) { - var got api.WatchEvent - err = d.decoder.Decode(&got) - if err != nil { - return action, nil, err - } - switch got.Type { - case watch.Added, watch.Modified, watch.Deleted: - return got.Type, got.Object.Object, err - } - return action, nil, fmt.Errorf("got invalid watch event type: %v", got.Type) -} - -// Close closes the underlying stream. -func (d *APIEventDecoder) Close() { - d.stream.Close() -} diff --git a/pkg/watch/json/decoder.go b/pkg/watch/json/decoder.go new file mode 100644 index 00000000000..bb087b88003 --- /dev/null +++ b/pkg/watch/json/decoder.go @@ -0,0 +1,69 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package json + +import ( + "encoding/json" + "fmt" + "io" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +// Decoder implements the watch.Decoder interface for io.ReadClosers that +// have contents which consist of a series of watchEvent objects encoded via JSON. +// It will decode any object registered in the supplied codec. +type Decoder struct { + r io.ReadCloser + decoder *json.Decoder + codec runtime.Codec +} + +// NewDecoder creates an Decoder for the given writer and codec. +func NewDecoder(r io.ReadCloser, codec runtime.Codec) *Decoder { + return &Decoder{ + r: r, + decoder: json.NewDecoder(r), + codec: codec, + } +} + +// Decode blocks until it can return the next object in the writer. Returns an error +// if the writer is closed or an object can't be decoded. +func (d *Decoder) Decode() (watch.EventType, runtime.Object, error) { + var got watchEvent + if err := d.decoder.Decode(&got); err != nil { + return "", nil, err + } + switch got.Type { + case watch.Added, watch.Modified, watch.Deleted: + default: + return "", nil, fmt.Errorf("got invalid watch event type: %v", got.Type) + } + + obj, err := d.codec.Decode(got.Object.RawJSON) + if err != nil { + return "", nil, fmt.Errorf("unable to decode watch event: %v", err) + } + return got.Type, obj, nil +} + +// Close closes the underlying r. +func (d *Decoder) Close() { + d.r.Close() +} diff --git a/pkg/client/watch/decoder_test.go b/pkg/watch/json/decoder_test.go similarity index 88% rename from pkg/client/watch/decoder_test.go rename to pkg/watch/json/decoder_test.go index 52d57c0b198..c407cce4e4b 100644 --- a/pkg/client/watch/decoder_test.go +++ b/pkg/watch/json/decoder_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package watch +package json import ( "encoding/json" @@ -25,17 +25,13 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) -type watchSerialization struct { - Type watch.EventType - Object json.RawMessage -} - func TestDecoder(t *testing.T) { out, in := io.Pipe() - decoder := NewAPIEventDecoder(out) + decoder := NewDecoder(out, v1beta1.Codec) expect := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} encoder := json.NewEncoder(in) @@ -44,7 +40,7 @@ func TestDecoder(t *testing.T) { if err != nil { t.Fatalf("Unexpected error %v", err) } - if err := encoder.Encode(&watchSerialization{watch.Added, json.RawMessage(data)}); err != nil { + if err := encoder.Encode(&watchEvent{watch.Added, runtime.RawExtension{json.RawMessage(data)}}); err != nil { t.Errorf("Unexpected error %v", err) } in.Close() @@ -82,7 +78,7 @@ func TestDecoder(t *testing.T) { func TestDecoder_SourceClose(t *testing.T) { out, in := io.Pipe() - decoder := NewAPIEventDecoder(out) + decoder := NewDecoder(out, v1beta1.Codec) done := make(chan struct{}) diff --git a/pkg/api/watch_test.go b/pkg/watch/json/doc.go similarity index 52% rename from pkg/api/watch_test.go rename to pkg/watch/json/doc.go index 228568da116..8fbacd7673c 100644 --- a/pkg/api/watch_test.go +++ b/pkg/watch/json/doc.go @@ -14,30 +14,6 @@ See the License for the specific language governing permissions and limitations under the License. */ -package api - -import ( - "encoding/json" - "reflect" - "testing" -) - -func TestEmbeddedDefaultSerialization(t *testing.T) { - expected := WatchEvent{ - Type: "foo", - Object: EmbeddedObject{&Pod{}}, - } - data, err := json.Marshal(expected) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - actual := WatchEvent{} - if err := json.Unmarshal(data, &actual); err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - if !reflect.DeepEqual(actual, expected) { - t.Errorf("Expected %#v, Got %#v", expected, actual) - } -} +// Package json implements a simple encoder and decoder for streams +// of watch events over io.Writer/Readers +package json diff --git a/pkg/watch/json/encoder.go b/pkg/watch/json/encoder.go new file mode 100644 index 00000000000..f6895f6e485 --- /dev/null +++ b/pkg/watch/json/encoder.go @@ -0,0 +1,53 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package json + +import ( + "encoding/json" + "io" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +// Encoder implements the json.Encoder interface for io.Writers that +// should serialize watchEvent objects into JSON. It will encode any object +// registered in the supplied codec and return an error otherwies. +type Encoder struct { + w io.Writer + encoder *json.Encoder + codec runtime.Codec +} + +// NewEncoder creates an Encoder for the given writer and codec +func NewEncoder(w io.Writer, codec runtime.Codec) *Encoder { + return &Encoder{ + w: w, + encoder: json.NewEncoder(w), + codec: codec, + } +} + +// Encode writes an event to the writer. Returns an error +// if the writer is closed or an object can't be encoded. +func (e *Encoder) Encode(event *watch.Event) error { + obj, err := Object(e.codec, event) + if err != nil { + return err + } + return e.encoder.Encode(obj) +} diff --git a/pkg/watch/json/encoder_test.go b/pkg/watch/json/encoder_test.go new file mode 100644 index 00000000000..87c0c6fd3cb --- /dev/null +++ b/pkg/watch/json/encoder_test.go @@ -0,0 +1,76 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package json + +import ( + "bytes" + "io/ioutil" + "reflect" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta2" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +func TestEncodeDecodeRoundTrip(t *testing.T) { + testCases := []struct { + Type watch.EventType + Object runtime.Object + Codec runtime.Codec + }{ + { + watch.Added, + &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}, + v1beta1.Codec, + }, + { + watch.Modified, + &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}, + v1beta2.Codec, + }, + { + watch.Deleted, + &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}, + api.Codec, + }, + } + for i, testCase := range testCases { + buf := &bytes.Buffer{} + + encoder := NewEncoder(buf, testCase.Codec) + if err := encoder.Encode(&watch.Event{Type: testCase.Type, Object: testCase.Object}); err != nil { + t.Errorf("%d: unexpected error: %v", i, err) + continue + } + + decoder := NewDecoder(ioutil.NopCloser(buf), testCase.Codec) + event, obj, err := decoder.Decode() + if err != nil { + t.Errorf("%d: unexpected error: %v", i, err) + continue + } + if !reflect.DeepEqual(testCase.Object, obj) { + t.Errorf("%d: expected %#v, got %#v", i, testCase.Object, obj) + } + if event != testCase.Type { + t.Errorf("%d: unexpected type: %#v", i, event) + } + } +} diff --git a/pkg/api/watch.go b/pkg/watch/json/types.go similarity index 69% rename from pkg/api/watch.go rename to pkg/watch/json/types.go index 288c4cb2458..8851a09601f 100644 --- a/pkg/api/watch.go +++ b/pkg/watch/json/types.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package api +package json import ( "encoding/json" @@ -25,26 +25,19 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) -// WatchEvent objects are streamed from the api server in response to a watch request. +// watchEvent objects are streamed from the api server in response to a watch request. // These are not API objects and are unversioned today. -type WatchEvent struct { +type watchEvent struct { // The type of the watch event; added, modified, or deleted. - Type watch.EventType + Type watch.EventType `json:"type,omitempty" yaml:"type,omitempty"` // For added or modified objects, this is the new object; for deleted objects, // it's the state of the object immediately prior to its deletion. - Object EmbeddedObject + Object runtime.RawExtension `json:"object,omitempty" yaml:"object,omitempty"` } -// watchSerialization defines the JSON wire equivalent of watch.Event -type watchSerialization struct { - Type watch.EventType - Object json.RawMessage -} - -// NewJSONWatcHEvent returns an object that will serialize to JSON and back -// to a WatchEvent. -func NewJSONWatchEvent(codec runtime.Codec, event watch.Event) (interface{}, error) { +// Object converts a watch.Event into an appropriately serializable JSON object +func Object(codec runtime.Codec, event *watch.Event) (interface{}, error) { obj, ok := event.Object.(runtime.Object) if !ok { return nil, fmt.Errorf("The event object cannot be safely converted to JSON: %v", reflect.TypeOf(event.Object).Name()) @@ -53,5 +46,5 @@ func NewJSONWatchEvent(codec runtime.Codec, event watch.Event) (interface{}, err if err != nil { return nil, err } - return &watchSerialization{event.Type, json.RawMessage(data)}, nil + return &watchEvent{event.Type, runtime.RawExtension{json.RawMessage(data)}}, nil }