diff --git a/pkg/client/request.go b/pkg/client/request.go index 6708d492715..47904f230d9 100644 --- a/pkg/client/request.go +++ b/pkg/client/request.go @@ -18,19 +18,17 @@ package client import ( "bytes" - "encoding/json" "fmt" "io" "io/ioutil" "net/http" "net/url" "path" - "sync" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/golang/glog" ) @@ -231,7 +229,7 @@ func (r *Request) Watch() (watch.Interface, error) { if response.StatusCode != http.StatusOK { return nil, fmt.Errorf("Got status: %v", response.StatusCode) } - return newHTTPWatcher(response.Body), nil + return watch.NewStreamWatcher(tools.NewAPIEventDecoder(response.Body)), nil } // Do formats and executes the request. Returns the API object received, or an error. @@ -293,74 +291,3 @@ func (r Result) Into(obj interface{}) error { func (r Result) Error() error { return r.err } - -type httpWatcher struct { - source io.ReadCloser - result chan watch.Event - done chan struct{} - sync.Mutex - stopped bool -} - -func newHTTPWatcher(source io.ReadCloser) *httpWatcher { - hw := &httpWatcher{ - source: source, - result: make(chan watch.Event), - done: make(chan struct{}), - } - go hw.receive() - return hw -} - -// Implements watch.Interface -func (hw *httpWatcher) ResultChan() <-chan watch.Event { - return hw.result -} - -// Implements watch.Interface -func (hw *httpWatcher) Stop() { - hw.Lock() - defer hw.Unlock() - if !hw.stopped { - close(hw.done) - hw.stopped = true - } -} - -// In a loop, read results from http, decode, and send down the result channel. -func (hw *httpWatcher) receive() { - defer close(hw.result) - defer hw.source.Close() - defer util.HandleCrash() - - decoder := json.NewDecoder(hw.source) - - decoded := make(chan *api.WatchEvent) - - // Read one at a time. Have to do this separately because Decode blocks and - // we want to wait on the done channel, too. - go func() { - defer util.HandleCrash() - for { - var got api.WatchEvent - err := decoder.Decode(&got) - if err != nil { - hw.Stop() - return - } - decoded <- &got - } - }() - - for { - select { - case <-hw.done: - return - case got := <-decoded: - hw.result <- watch.Event{ - Type: got.Type, - Object: got.Object.Object, - } - } - } -} diff --git a/pkg/controller/replication_controller_test.go b/pkg/controller/replication_controller_test.go index 54abf5bf510..95530b980c2 100644 --- a/pkg/controller/replication_controller_test.go +++ b/pkg/controller/replication_controller_test.go @@ -419,29 +419,7 @@ func TestSyncronize(t *testing.T) { validateSyncReplication(t, &fakePodControl, 7, 0) } -type asyncTimeout struct { - doneChan chan bool -} - -func beginTimeout(d time.Duration) *asyncTimeout { - a := &asyncTimeout{doneChan: make(chan bool)} - go func() { - select { - case <-a.doneChan: - return - case <-time.After(d): - panic("Timeout expired!") - } - }() - return a -} - -func (a *asyncTimeout) done() { - close(a.doneChan) -} - func TestWatchControllers(t *testing.T) { - defer beginTimeout(20 * time.Second).done() fakeEtcd := tools.MakeFakeEtcdClient(t) manager := MakeReplicationManager(fakeEtcd, nil) var testControllerSpec api.ReplicationController diff --git a/pkg/tools/decoder.go b/pkg/tools/decoder.go new file mode 100644 index 00000000000..5f50a1ece27 --- /dev/null +++ b/pkg/tools/decoder.go @@ -0,0 +1,53 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tools + +import ( + "encoding/json" + "io" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +// APIEventDecoder implements the watch.Decoder interface for io.ReadClosers that +// have contents which consist of a series of api.WatchEvent objects encoded via JSON. +type APIEventDecoder struct { + stream io.ReadCloser + decoder *json.Decoder +} + +// NewAPIEventDecoder makes an APIEventDecoder for the given stream. +func NewAPIEventDecoder(stream io.ReadCloser) *APIEventDecoder { + return &APIEventDecoder{ + stream: stream, + decoder: json.NewDecoder(stream), + } +} + +// Decode blocks until it can return the next object in the stream. Returns an error +// if the stream is closed or an object can't be decoded. +func (d *APIEventDecoder) Decode() (action watch.EventType, object interface{}, err error) { + var got api.WatchEvent + err = d.decoder.Decode(&got) + return got.Type, got.Object.Object, err +} + +// Close closes the underlying stream. +func (d *APIEventDecoder) Close() { + d.stream.Close() +} diff --git a/pkg/tools/decoder_test.go b/pkg/tools/decoder_test.go new file mode 100644 index 00000000000..9d500bb3214 --- /dev/null +++ b/pkg/tools/decoder_test.go @@ -0,0 +1,85 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tools + +import ( + "encoding/json" + "io" + "reflect" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +func TestDecoder(t *testing.T) { + out, in := io.Pipe() + encoder := json.NewEncoder(in) + decoder := NewAPIEventDecoder(out) + + expect := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} + go func() { + err := encoder.Encode(api.WatchEvent{watch.Added, api.APIObject{expect}}) + if err != nil { + t.Errorf("Unexpected error %v", err) + } + }() + + action, got, err := decoder.Decode() + if err != nil { + t.Errorf("Unexpected error %v", err) + } + if e, a := watch.Added, action; e != a { + t.Errorf("Expected %v, got %v", e, a) + } + if e, a := expect, got; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %v, got %v", e, a) + } + + done := make(chan struct{}) + + go func() { + _, _, err := decoder.Decode() + if err == nil { + t.Errorf("Unexpected nil error") + } + close(done) + }() + + decoder.Close() + + <-done +} + +func TestDecoder_SourceClose(t *testing.T) { + out, in := io.Pipe() + decoder := NewAPIEventDecoder(out) + + done := make(chan struct{}) + + go func() { + _, _, err := decoder.Decode() + if err == nil { + t.Errorf("Unexpected nil error") + } + close(done) + }() + + in.Close() + + <-done +} diff --git a/pkg/watch/iowatcher.go b/pkg/watch/iowatcher.go new file mode 100644 index 00000000000..e2670413654 --- /dev/null +++ b/pkg/watch/iowatcher.go @@ -0,0 +1,91 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watch + +import ( + "sync" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" +) + +// Decoder allows StreamWatcher to watch any stream for which a Decoder can be written. +type Decoder interface { + // Decode should return the type of event, the decoded object, or an error. + // An error will cause StreamWatcher to call Close(). Decode should block until + // it has data or an error occurs. + Decode() (action EventType, object interface{}, err error) + + // Close should close the underlying io.Reader, signalling to the source of + // the stream that it is no longer being watched. Close() must cause any + // outstanding call to Decode() to return with an error of some sort. + Close() +} + +// StreamWatcher turns any stream for which you can write a Decoder interface +// into a watch.Interface. +type StreamWatcher struct { + source Decoder + result chan Event + sync.Mutex + stopped bool +} + +// NewStreamWatcher creates a StreamWatcher from the given decoder. +func NewStreamWatcher(d Decoder) *StreamWatcher { + sw := &StreamWatcher{ + source: d, + // It's easy for a consumer to add buffering via an extra + // goroutine/channel, but impossible for them to remove it, + // so nonbuffered is better. + result: make(chan Event), + } + go sw.receive() + return sw +} + +// ResultChan implements Interface. +func (sw *StreamWatcher) ResultChan() <-chan Event { + return sw.result +} + +// Stop implements Interface. +func (sw *StreamWatcher) Stop() { + // Call Close() exactly once by locking and setting a flag. + sw.Lock() + defer sw.Unlock() + if !sw.stopped { + sw.stopped = true + sw.source.Close() + } +} + +// In a loop, read a result from the decoder and send down the result channel. +func (sw *StreamWatcher) receive() { + defer close(sw.result) + defer sw.Stop() + defer util.HandleCrash() + for { + action, obj, err := sw.source.Decode() + if err != nil { + return + } + sw.result <- Event{ + Type: action, + Object: obj, + } + } +} diff --git a/pkg/watch/iowatcher_test.go b/pkg/watch/iowatcher_test.go new file mode 100644 index 00000000000..564b254b11e --- /dev/null +++ b/pkg/watch/iowatcher_test.go @@ -0,0 +1,65 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package watch + +import ( + "io" + "reflect" + "testing" +) + +type fakeDecoder struct { + items chan Event +} + +func (f fakeDecoder) Decode() (action EventType, object interface{}, err error) { + item, open := <-f.items + if !open { + return action, nil, io.EOF + } + return item.Type, item.Object, nil +} + +func (f fakeDecoder) Close() { + close(f.items) +} + +func TestStreamWatcher(t *testing.T) { + table := []Event{ + {Added, "foo"}, + } + + fd := fakeDecoder{make(chan Event, 5)} + sw := NewStreamWatcher(fd) + + for _, item := range table { + fd.items <- item + got, open := <-sw.ResultChan() + if !open { + t.Errorf("unexpected early close") + } + if e, a := item, got; !reflect.DeepEqual(e, a) { + t.Errorf("expected %v, got %v", e, a) + } + } + + sw.Stop() + _, open := <-sw.ResultChan() + if open { + t.Errorf("Unexpected failure to close") + } +}