mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-19 00:31:00 +00:00
Split httpWatcher into watch.StreamWatcher and tools.APIEventDecoder.
This commit is contained in:
parent
dbd0d419df
commit
91375f4c04
@ -18,19 +18,17 @@ package client
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"path"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
@ -231,7 +229,7 @@ func (r *Request) Watch() (watch.Interface, error) {
|
||||
if response.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("Got status: %v", response.StatusCode)
|
||||
}
|
||||
return newHTTPWatcher(response.Body), nil
|
||||
return watch.NewStreamWatcher(tools.NewAPIEventDecoder(response.Body)), nil
|
||||
}
|
||||
|
||||
// Do formats and executes the request. Returns the API object received, or an error.
|
||||
@ -293,74 +291,3 @@ func (r Result) Into(obj interface{}) error {
|
||||
func (r Result) Error() error {
|
||||
return r.err
|
||||
}
|
||||
|
||||
type httpWatcher struct {
|
||||
source io.ReadCloser
|
||||
result chan watch.Event
|
||||
done chan struct{}
|
||||
sync.Mutex
|
||||
stopped bool
|
||||
}
|
||||
|
||||
func newHTTPWatcher(source io.ReadCloser) *httpWatcher {
|
||||
hw := &httpWatcher{
|
||||
source: source,
|
||||
result: make(chan watch.Event),
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
go hw.receive()
|
||||
return hw
|
||||
}
|
||||
|
||||
// Implements watch.Interface
|
||||
func (hw *httpWatcher) ResultChan() <-chan watch.Event {
|
||||
return hw.result
|
||||
}
|
||||
|
||||
// Implements watch.Interface
|
||||
func (hw *httpWatcher) Stop() {
|
||||
hw.Lock()
|
||||
defer hw.Unlock()
|
||||
if !hw.stopped {
|
||||
close(hw.done)
|
||||
hw.stopped = true
|
||||
}
|
||||
}
|
||||
|
||||
// In a loop, read results from http, decode, and send down the result channel.
|
||||
func (hw *httpWatcher) receive() {
|
||||
defer close(hw.result)
|
||||
defer hw.source.Close()
|
||||
defer util.HandleCrash()
|
||||
|
||||
decoder := json.NewDecoder(hw.source)
|
||||
|
||||
decoded := make(chan *api.WatchEvent)
|
||||
|
||||
// Read one at a time. Have to do this separately because Decode blocks and
|
||||
// we want to wait on the done channel, too.
|
||||
go func() {
|
||||
defer util.HandleCrash()
|
||||
for {
|
||||
var got api.WatchEvent
|
||||
err := decoder.Decode(&got)
|
||||
if err != nil {
|
||||
hw.Stop()
|
||||
return
|
||||
}
|
||||
decoded <- &got
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-hw.done:
|
||||
return
|
||||
case got := <-decoded:
|
||||
hw.result <- watch.Event{
|
||||
Type: got.Type,
|
||||
Object: got.Object.Object,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -419,29 +419,7 @@ func TestSyncronize(t *testing.T) {
|
||||
validateSyncReplication(t, &fakePodControl, 7, 0)
|
||||
}
|
||||
|
||||
type asyncTimeout struct {
|
||||
doneChan chan bool
|
||||
}
|
||||
|
||||
func beginTimeout(d time.Duration) *asyncTimeout {
|
||||
a := &asyncTimeout{doneChan: make(chan bool)}
|
||||
go func() {
|
||||
select {
|
||||
case <-a.doneChan:
|
||||
return
|
||||
case <-time.After(d):
|
||||
panic("Timeout expired!")
|
||||
}
|
||||
}()
|
||||
return a
|
||||
}
|
||||
|
||||
func (a *asyncTimeout) done() {
|
||||
close(a.doneChan)
|
||||
}
|
||||
|
||||
func TestWatchControllers(t *testing.T) {
|
||||
defer beginTimeout(20 * time.Second).done()
|
||||
fakeEtcd := tools.MakeFakeEtcdClient(t)
|
||||
manager := MakeReplicationManager(fakeEtcd, nil)
|
||||
var testControllerSpec api.ReplicationController
|
||||
|
53
pkg/tools/decoder.go
Normal file
53
pkg/tools/decoder.go
Normal file
@ -0,0 +1,53 @@
|
||||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package tools
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||
)
|
||||
|
||||
// APIEventDecoder implements the watch.Decoder interface for io.ReadClosers that
|
||||
// have contents which consist of a series of api.WatchEvent objects encoded via JSON.
|
||||
type APIEventDecoder struct {
|
||||
stream io.ReadCloser
|
||||
decoder *json.Decoder
|
||||
}
|
||||
|
||||
// NewAPIEventDecoder makes an APIEventDecoder for the given stream.
|
||||
func NewAPIEventDecoder(stream io.ReadCloser) *APIEventDecoder {
|
||||
return &APIEventDecoder{
|
||||
stream: stream,
|
||||
decoder: json.NewDecoder(stream),
|
||||
}
|
||||
}
|
||||
|
||||
// Decode blocks until it can return the next object in the stream. Returns an error
|
||||
// if the stream is closed or an object can't be decoded.
|
||||
func (d *APIEventDecoder) Decode() (action watch.EventType, object interface{}, err error) {
|
||||
var got api.WatchEvent
|
||||
err = d.decoder.Decode(&got)
|
||||
return got.Type, got.Object.Object, err
|
||||
}
|
||||
|
||||
// Close closes the underlying stream.
|
||||
func (d *APIEventDecoder) Close() {
|
||||
d.stream.Close()
|
||||
}
|
85
pkg/tools/decoder_test.go
Normal file
85
pkg/tools/decoder_test.go
Normal file
@ -0,0 +1,85 @@
|
||||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package tools
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"io"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
|
||||
)
|
||||
|
||||
func TestDecoder(t *testing.T) {
|
||||
out, in := io.Pipe()
|
||||
encoder := json.NewEncoder(in)
|
||||
decoder := NewAPIEventDecoder(out)
|
||||
|
||||
expect := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
|
||||
go func() {
|
||||
err := encoder.Encode(api.WatchEvent{watch.Added, api.APIObject{expect}})
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
action, got, err := decoder.Decode()
|
||||
if err != nil {
|
||||
t.Errorf("Unexpected error %v", err)
|
||||
}
|
||||
if e, a := watch.Added, action; e != a {
|
||||
t.Errorf("Expected %v, got %v", e, a)
|
||||
}
|
||||
if e, a := expect, got; !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("Expected %v, got %v", e, a)
|
||||
}
|
||||
|
||||
done := make(chan struct{})
|
||||
|
||||
go func() {
|
||||
_, _, err := decoder.Decode()
|
||||
if err == nil {
|
||||
t.Errorf("Unexpected nil error")
|
||||
}
|
||||
close(done)
|
||||
}()
|
||||
|
||||
decoder.Close()
|
||||
|
||||
<-done
|
||||
}
|
||||
|
||||
func TestDecoder_SourceClose(t *testing.T) {
|
||||
out, in := io.Pipe()
|
||||
decoder := NewAPIEventDecoder(out)
|
||||
|
||||
done := make(chan struct{})
|
||||
|
||||
go func() {
|
||||
_, _, err := decoder.Decode()
|
||||
if err == nil {
|
||||
t.Errorf("Unexpected nil error")
|
||||
}
|
||||
close(done)
|
||||
}()
|
||||
|
||||
in.Close()
|
||||
|
||||
<-done
|
||||
}
|
91
pkg/watch/iowatcher.go
Normal file
91
pkg/watch/iowatcher.go
Normal file
@ -0,0 +1,91 @@
|
||||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package watch
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
)
|
||||
|
||||
// Decoder allows StreamWatcher to watch any stream for which a Decoder can be written.
|
||||
type Decoder interface {
|
||||
// Decode should return the type of event, the decoded object, or an error.
|
||||
// An error will cause StreamWatcher to call Close(). Decode should block until
|
||||
// it has data or an error occurs.
|
||||
Decode() (action EventType, object interface{}, err error)
|
||||
|
||||
// Close should close the underlying io.Reader, signalling to the source of
|
||||
// the stream that it is no longer being watched. Close() must cause any
|
||||
// outstanding call to Decode() to return with an error of some sort.
|
||||
Close()
|
||||
}
|
||||
|
||||
// StreamWatcher turns any stream for which you can write a Decoder interface
|
||||
// into a watch.Interface.
|
||||
type StreamWatcher struct {
|
||||
source Decoder
|
||||
result chan Event
|
||||
sync.Mutex
|
||||
stopped bool
|
||||
}
|
||||
|
||||
// NewStreamWatcher creates a StreamWatcher from the given decoder.
|
||||
func NewStreamWatcher(d Decoder) *StreamWatcher {
|
||||
sw := &StreamWatcher{
|
||||
source: d,
|
||||
// It's easy for a consumer to add buffering via an extra
|
||||
// goroutine/channel, but impossible for them to remove it,
|
||||
// so nonbuffered is better.
|
||||
result: make(chan Event),
|
||||
}
|
||||
go sw.receive()
|
||||
return sw
|
||||
}
|
||||
|
||||
// ResultChan implements Interface.
|
||||
func (sw *StreamWatcher) ResultChan() <-chan Event {
|
||||
return sw.result
|
||||
}
|
||||
|
||||
// Stop implements Interface.
|
||||
func (sw *StreamWatcher) Stop() {
|
||||
// Call Close() exactly once by locking and setting a flag.
|
||||
sw.Lock()
|
||||
defer sw.Unlock()
|
||||
if !sw.stopped {
|
||||
sw.stopped = true
|
||||
sw.source.Close()
|
||||
}
|
||||
}
|
||||
|
||||
// In a loop, read a result from the decoder and send down the result channel.
|
||||
func (sw *StreamWatcher) receive() {
|
||||
defer close(sw.result)
|
||||
defer sw.Stop()
|
||||
defer util.HandleCrash()
|
||||
for {
|
||||
action, obj, err := sw.source.Decode()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
sw.result <- Event{
|
||||
Type: action,
|
||||
Object: obj,
|
||||
}
|
||||
}
|
||||
}
|
65
pkg/watch/iowatcher_test.go
Normal file
65
pkg/watch/iowatcher_test.go
Normal file
@ -0,0 +1,65 @@
|
||||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package watch
|
||||
|
||||
import (
|
||||
"io"
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
type fakeDecoder struct {
|
||||
items chan Event
|
||||
}
|
||||
|
||||
func (f fakeDecoder) Decode() (action EventType, object interface{}, err error) {
|
||||
item, open := <-f.items
|
||||
if !open {
|
||||
return action, nil, io.EOF
|
||||
}
|
||||
return item.Type, item.Object, nil
|
||||
}
|
||||
|
||||
func (f fakeDecoder) Close() {
|
||||
close(f.items)
|
||||
}
|
||||
|
||||
func TestStreamWatcher(t *testing.T) {
|
||||
table := []Event{
|
||||
{Added, "foo"},
|
||||
}
|
||||
|
||||
fd := fakeDecoder{make(chan Event, 5)}
|
||||
sw := NewStreamWatcher(fd)
|
||||
|
||||
for _, item := range table {
|
||||
fd.items <- item
|
||||
got, open := <-sw.ResultChan()
|
||||
if !open {
|
||||
t.Errorf("unexpected early close")
|
||||
}
|
||||
if e, a := item, got; !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("expected %v, got %v", e, a)
|
||||
}
|
||||
}
|
||||
|
||||
sw.Stop()
|
||||
_, open := <-sw.ResultChan()
|
||||
if open {
|
||||
t.Errorf("Unexpected failure to close")
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user