Add watch capability to our client.

Next steps: Make an etcd watcher... decide on a state field for pods...
move the scheduler to its own binary.
This commit is contained in:
Daniel Smith
2014-07-17 16:09:29 -07:00
parent f672edd1cf
commit dbd0d419df
3 changed files with 219 additions and 23 deletions

View File

@@ -18,15 +18,20 @@ package client
import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"path"
"sync"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/golang/glog"
)
@@ -191,25 +196,51 @@ func (r *Request) PollPeriod(d time.Duration) *Request {
return r
}
func (r *Request) finalURL() string {
finalURL := r.c.host + r.path
query := url.Values{}
if r.selector != nil {
query.Add("labels", r.selector.String())
}
if r.sync {
query.Add("sync", "true")
if r.timeout != 0 {
query.Add("timeout", r.timeout.String())
}
}
finalURL += "?" + query.Encode()
return finalURL
}
// Attempts to begin watching the requested location. Returns a watch.Interface, or an error.
func (r *Request) Watch() (watch.Interface, error) {
if r.err != nil {
return nil, r.err
}
req, err := http.NewRequest(r.verb, r.finalURL(), r.body)
if err != nil {
return nil, err
}
if r.c.auth != nil {
req.SetBasicAuth(r.c.auth.User, r.c.auth.Password)
}
response, err := r.c.httpClient.Do(req)
if err != nil {
return nil, err
}
if response.StatusCode != http.StatusOK {
return nil, fmt.Errorf("Got status: %v", response.StatusCode)
}
return newHTTPWatcher(response.Body), nil
}
// Do formats and executes the request. Returns the API object received, or an error.
func (r *Request) Do() Result {
for {
if r.err != nil {
return Result{err: r.err}
}
finalURL := r.c.host + r.path
query := url.Values{}
if r.selector != nil {
query.Add("labels", r.selector.String())
}
if r.sync {
query.Add("sync", "true")
if r.timeout != 0 {
query.Add("timeout", r.timeout.String())
}
}
finalURL += "?" + query.Encode()
req, err := http.NewRequest(r.verb, finalURL, r.body)
req, err := http.NewRequest(r.verb, r.finalURL(), r.body)
if err != nil {
return Result{err: err}
}
@@ -262,3 +293,74 @@ func (r Result) Into(obj interface{}) error {
func (r Result) Error() error {
return r.err
}
type httpWatcher struct {
source io.ReadCloser
result chan watch.Event
done chan struct{}
sync.Mutex
stopped bool
}
func newHTTPWatcher(source io.ReadCloser) *httpWatcher {
hw := &httpWatcher{
source: source,
result: make(chan watch.Event),
done: make(chan struct{}),
}
go hw.receive()
return hw
}
// Implements watch.Interface
func (hw *httpWatcher) ResultChan() <-chan watch.Event {
return hw.result
}
// Implements watch.Interface
func (hw *httpWatcher) Stop() {
hw.Lock()
defer hw.Unlock()
if !hw.stopped {
close(hw.done)
hw.stopped = true
}
}
// In a loop, read results from http, decode, and send down the result channel.
func (hw *httpWatcher) receive() {
defer close(hw.result)
defer hw.source.Close()
defer util.HandleCrash()
decoder := json.NewDecoder(hw.source)
decoded := make(chan *api.WatchEvent)
// Read one at a time. Have to do this separately because Decode blocks and
// we want to wait on the done channel, too.
go func() {
defer util.HandleCrash()
for {
var got api.WatchEvent
err := decoder.Decode(&got)
if err != nil {
hw.Stop()
return
}
decoded <- &got
}
}()
for {
select {
case <-hw.done:
return
case got := <-decoded:
hw.result <- watch.Event{
Type: got.Type,
Object: got.Object.Object,
}
}
}
}