diff --git a/go.mod b/go.mod index 8ea4cf1..3ede1ea 100644 --- a/go.mod +++ b/go.mod @@ -16,7 +16,6 @@ require ( github.com/rancher/wrangler-api v0.1.5-0.20190619170228-c3525df45215 github.com/sirupsen/logrus v1.4.2 github.com/urfave/cli v1.20.0 - golang.org/x/sync v0.0.0-20190423024810-112230192c58 google.golang.org/appengine v1.5.0 // indirect k8s.io/api v0.0.0-20190409021203-6e4e0e4f393b k8s.io/apiextensions-apiserver v0.0.0-20190409022649-727a075fdec8 diff --git a/vendor/github.com/rancher/norman/pkg/store/empty/empty_store.go b/vendor/github.com/rancher/norman/pkg/store/empty/empty_store.go index 92da739..4d5b824 100644 --- a/vendor/github.com/rancher/norman/pkg/store/empty/empty_store.go +++ b/vendor/github.com/rancher/norman/pkg/store/empty/empty_store.go @@ -27,6 +27,6 @@ func (e *Store) Update(apiOp *types.APIRequest, schema *types.Schema, data types return types.APIObject{}, nil } -func (e *Store) Watch(apiOp *types.APIRequest, schema *types.Schema, opt *types.QueryOptions) (chan types.APIObject, error) { +func (e *Store) Watch(apiOp *types.APIRequest, schema *types.Schema, opt *types.QueryOptions) (chan types.APIEvent, error) { return nil, nil } diff --git a/vendor/github.com/rancher/norman/pkg/store/proxy/error_wrapper.go b/vendor/github.com/rancher/norman/pkg/store/proxy/error_wrapper.go index 86ebdb6..2d985dc 100644 --- a/vendor/github.com/rancher/norman/pkg/store/proxy/error_wrapper.go +++ b/vendor/github.com/rancher/norman/pkg/store/proxy/error_wrapper.go @@ -38,7 +38,7 @@ func (e *errorStore) Delete(apiOp *types.APIRequest, schema *types.Schema, id st } -func (e *errorStore) Watch(apiOp *types.APIRequest, schema *types.Schema, opt *types.QueryOptions) (chan types.APIObject, error) { +func (e *errorStore) Watch(apiOp *types.APIRequest, schema *types.Schema, opt *types.QueryOptions) (chan types.APIEvent, error) { data, err := e.Store.Watch(apiOp, schema, opt) return data, translateError(err) } diff --git a/vendor/github.com/rancher/norman/pkg/store/proxy/proxy_store.go b/vendor/github.com/rancher/norman/pkg/store/proxy/proxy_store.go index d4c0526..a3113fc 100644 --- a/vendor/github.com/rancher/norman/pkg/store/proxy/proxy_store.go +++ b/vendor/github.com/rancher/norman/pkg/store/proxy/proxy_store.go @@ -108,17 +108,22 @@ func (s *Store) listNamespace(namespace string, apiOp types.APIRequest, schema * return k8sClient.List(metav1.ListOptions{}) } -func (s *Store) Watch(apiOp *types.APIRequest, schema *types.Schema, opt *types.QueryOptions) (chan types.APIObject, error) { +func (s *Store) Watch(apiOp *types.APIRequest, schema *types.Schema, opt *types.QueryOptions) (chan types.APIEvent, error) { k8sClient, err := s.clientGetter.Client(apiOp, schema) if err != nil { return nil, err } + list, err := k8sClient.List(metav1.ListOptions{}) + if err != nil { + return nil, err + } + timeout := int64(60 * 30) watcher, err := k8sClient.Watch(metav1.ListOptions{ Watch: true, TimeoutSeconds: &timeout, - ResourceVersion: "0", + ResourceVersion: list.GetResourceVersion(), }) if err != nil { return nil, err @@ -131,15 +136,14 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.Schema, opt *types. watcher.Stop() }() - result := make(chan types.APIObject) + result := make(chan types.APIEvent) go func() { + for i, obj := range list.Items { + result <- s.toAPIEvent(apiOp, schema, i, len(list.Items), false, &obj) + } for event := range watcher.ResultChan() { data := event.Object.(*unstructured.Unstructured) - s.fromInternal(apiOp, schema, data.Object) - if event.Type == watch.Deleted && data.Object != nil { - data.Object[".removed"] = true - } - result <- types.ToAPI(data.Object) + result <- s.toAPIEvent(apiOp, schema, 0, 0, event.Type == watch.Deleted, data) } logrus.Debugf("closing watcher for %s", schema.ID) close(result) @@ -149,6 +153,22 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.Schema, opt *types. return result, nil } +func (s *Store) toAPIEvent(apiOp *types.APIRequest, schema *types.Schema, index, count int, remove bool, obj *unstructured.Unstructured) types.APIEvent { + name := "resource.change" + if remove && obj.Object != nil { + name = "resource.remove" + } + + s.fromInternal(apiOp, schema, obj.Object) + + return types.APIEvent{ + Name: name, + Count: count, + Index: index, + Object: types.ToAPI(obj.Object), + } +} + func (s *Store) Create(apiOp *types.APIRequest, schema *types.Schema, params types.APIObject) (types.APIObject, error) { data := params.Map() if err := s.toInternal(schema.Mapper, data); err != nil { diff --git a/vendor/github.com/rancher/norman/pkg/store/schema/schema_store.go b/vendor/github.com/rancher/norman/pkg/store/schema/schema_store.go index 88ebab1..63a6630 100644 --- a/vendor/github.com/rancher/norman/pkg/store/schema/schema_store.go +++ b/vendor/github.com/rancher/norman/pkg/store/schema/schema_store.go @@ -34,7 +34,7 @@ func (s *Store) ByID(apiOp *types.APIRequest, schema *types.Schema, id string) ( return types.APIObject{}, httperror.NewAPIError(httperror.NotFound, "no such schema") } -func (s *Store) Watch(apiOp *types.APIRequest, schema *types.Schema, opt *types.QueryOptions) (chan types.APIObject, error) { +func (s *Store) Watch(apiOp *types.APIRequest, schema *types.Schema, opt *types.QueryOptions) (chan types.APIEvent, error) { return nil, nil } diff --git a/vendor/github.com/rancher/norman/pkg/subscribe/handler.go b/vendor/github.com/rancher/norman/pkg/subscribe/handler.go index 4ac9e6b..35b6da0 100644 --- a/vendor/github.com/rancher/norman/pkg/subscribe/handler.go +++ b/vendor/github.com/rancher/norman/pkg/subscribe/handler.go @@ -1,9 +1,10 @@ package subscribe import ( - "bytes" "context" + "encoding/json" "errors" + "io" "time" "github.com/gorilla/websocket" @@ -75,7 +76,7 @@ func handler(apiOp *types.APIRequest) error { } }() - events := make(chan types.APIObject) + events := make(chan types.APIEvent) for _, schema := range schemas { if apiOp.AccessControl.CanWatch(apiOp, schema) == nil { streamStore(ctx, readerGroup, apiOp, schema, events) @@ -87,9 +88,10 @@ func handler(apiOp *types.APIRequest) error { close(events) }() - jsonWriter := writer.EncodingResponseWriter{ + capture := &Capture{} + captureWriter := writer.EncodingResponseWriter{ ContentType: "application/json", - Encoder: types.JSONEncoder, + Encoder: capture.Encoder, } t := time.NewTicker(60 * time.Second) defer t.Stop() @@ -103,24 +105,20 @@ func handler(apiOp *types.APIRequest) error { break } - header := `{"name":"resource.change","data":` - if item.Map()[".removed"] == true { - header = `{"name":"resource.remove","data":` - } - schema := apiOp.Schemas.Schema(convert.ToString(item.Map()["type"])) + schema := apiOp.Schemas.Schema(convert.ToString(item.Object.Map()["type"])) if schema != nil { - buffer := &bytes.Buffer{} - if err := jsonWriter.VersionBody(apiOp, buffer, item); err != nil { + if err := captureWriter.VersionBody(apiOp, nil, item.Object); err != nil { cancel() continue } - if err := writeData(c, header, buffer.Bytes()); err != nil { + item.Object = types.ToAPI(capture.Object) + if err := writeData(c, item); err != nil { cancel() } } case <-t.C: - if err := writeData(c, `{"name":"ping","data":`, []byte("{}")); err != nil { + if err := writeData(c, types.APIEvent{Name: "ping"}); err != nil { cancel() } } @@ -130,35 +128,29 @@ func handler(apiOp *types.APIRequest) error { return nil } -func writeData(c *websocket.Conn, header string, buf []byte) error { +func writeData(c *websocket.Conn, event types.APIEvent) error { + event.Data = event.Object.Raw() messageWriter, err := c.NextWriter(websocket.TextMessage) if err != nil { return err } + defer messageWriter.Close() - if _, err := messageWriter.Write([]byte(header)); err != nil { - return err - } - if _, err := messageWriter.Write(buf); err != nil { - return err - } - if _, err := messageWriter.Write([]byte(`}`)); err != nil { - return err - } - return messageWriter.Close() + return json.NewEncoder(messageWriter).Encode(event) } -func watch(apiOp *types.APIRequest, schema *types.Schema, opts *types.QueryOptions) (chan types.APIObject, error) { +func watch(apiOp *types.APIRequest, schema *types.Schema, opts *types.QueryOptions) (chan types.APIEvent, error) { c, err := schema.Store.Watch(apiOp, schema, opts) if err != nil { return nil, err } - return types.APIChan(c, func(data types.APIObject) types.APIObject { - return apiOp.FilterObject(nil, schema, data) + return types.APIChan(c, func(data types.APIEvent) types.APIEvent { + data.Object = apiOp.FilterObject(nil, schema, data.Object) + return data }), nil } -func streamStore(ctx context.Context, eg *errgroup.Group, apiOp *types.APIRequest, schema *types.Schema, result chan types.APIObject) { +func streamStore(ctx context.Context, eg *errgroup.Group, apiOp *types.APIRequest, schema *types.Schema, result chan types.APIEvent) { eg.Go(func() error { opts := parse.QueryOptions(apiOp, schema) events, err := watch(apiOp, schema, &opts) @@ -185,3 +177,12 @@ func matches(items []string, item string) bool { } return slice.ContainsString(items, item) } + +type Capture struct { + Object interface{} +} + +func (c *Capture) Encoder(w io.Writer, obj interface{}) error { + c.Object = obj + return nil +} diff --git a/vendor/github.com/rancher/norman/pkg/types/server_types.go b/vendor/github.com/rancher/norman/pkg/types/server_types.go index ae7cbde..5b980d3 100644 --- a/vendor/github.com/rancher/norman/pkg/types/server_types.go +++ b/vendor/github.com/rancher/norman/pkg/types/server_types.go @@ -189,11 +189,20 @@ type Store interface { Create(apiOp *APIRequest, schema *Schema, data APIObject) (APIObject, error) Update(apiOp *APIRequest, schema *Schema, data APIObject, id string) (APIObject, error) Delete(apiOp *APIRequest, schema *Schema, id string) (APIObject, error) - Watch(apiOp *APIRequest, schema *Schema, opt *QueryOptions) (chan APIObject, error) + Watch(apiOp *APIRequest, schema *Schema, opt *QueryOptions) (chan APIEvent, error) +} + +type APIEvent struct { + Name string `json:"name,omitempty"` + Count int `json:"count,omitempty"` + Index int `json:"index,omitempty"` + Object APIObject `json:"-"` + // Data should be used + Data interface{} `json:"data,omitempty"` } type APIObject struct { - Object interface{} `json:",embed"` + Object interface{} `json:",inline"` } func ToAPI(data interface{}) APIObject { @@ -277,11 +286,11 @@ func Namespace(data map[string]interface{}) string { return convert.ToString(values.GetValueN(data, "metadata", "namespace")) } -func APIChan(c <-chan APIObject, f func(APIObject) APIObject) chan APIObject { +func APIChan(c <-chan APIEvent, f func(APIEvent) APIEvent) chan APIEvent { if c == nil { return nil } - result := make(chan APIObject) + result := make(chan APIEvent) go func() { for data := range c { modified := f(data) diff --git a/vendor/github.com/rancher/norman/pkg/urlbuilder/base.go b/vendor/github.com/rancher/norman/pkg/urlbuilder/base.go index ac09c76..9df945a 100644 --- a/vendor/github.com/rancher/norman/pkg/urlbuilder/base.go +++ b/vendor/github.com/rancher/norman/pkg/urlbuilder/base.go @@ -3,67 +3,53 @@ package urlbuilder import ( "bytes" "fmt" + "net" "net/http" "net/url" "strings" ) func ParseRequestURL(r *http.Request) string { - // Get url from standard headers - requestURL := getURLFromStandardHeaders(r) - if requestURL != "" { - return requestURL - } - - // Use incoming url - scheme := "http" - if r.TLS != nil { - scheme = "https" - } - return fmt.Sprintf("%s://%s%s%s", scheme, r.Host, r.Header.Get(PrefixHeader), r.URL.Path) + scheme := getScheme(r) + host := getHost(r, scheme) + return fmt.Sprintf("%s://%s%s%s", scheme, host, r.Header.Get(PrefixHeader), r.URL.Path) } -func getURLFromStandardHeaders(r *http.Request) string { - xForwardedProto := getOverrideHeader(r, ForwardedProtoHeader, "") - if xForwardedProto == "" { - return "" - } - - host := getOverrideHeader(r, ForwardedHostHeader, "") +func getHost(r *http.Request, scheme string) string { + host := strings.Split(r.Header.Get(ForwardedHostHeader), ",")[0] if host == "" { host = r.Host } - if host == "" { - return "" + port := r.Header.Get(ForwardedPortHeader) + if port == "" { + return host } - port := getOverrideHeader(r, ForwardedPortHeader, "") - if port == "443" || port == "80" { - port = "" // Don't include default ports in url + if port == "80" && scheme == "http" { + return host } - if port != "" && strings.Contains(host, ":") { - // Have to strip the port that is in the host. Handle IPv6, which has this format: [::1]:8080 - if (strings.HasPrefix(host, "[") && strings.Contains(host, "]:")) || !strings.HasPrefix(host, "[") { - host = host[0:strings.LastIndex(host, ":")] - } + if port == "443" && scheme == "http" { + return host } - if port != "" { - port = ":" + port + hostname, _, err := net.SplitHostPort(host) + if err != nil { + return host } - return fmt.Sprintf("%s://%s%s%s%s", xForwardedProto, host, port, r.Header.Get(PrefixHeader), r.URL.Path) + return strings.Join([]string{hostname, port}, ":") } -func getOverrideHeader(r *http.Request, header string, defaultValue string) string { - // Need to handle comma separated hosts in X-Forwarded-For - value := r.Header.Get(header) - if value != "" { - return strings.TrimSpace(strings.Split(value, ",")[0]) +func getScheme(r *http.Request) string { + scheme := r.Header.Get(ForwardedProtoHeader) + if scheme != "" { + return scheme + } else if r.TLS != nil { + return "https" } - return defaultValue + return "http" } func ParseResponseURLBase(currentURL string, r *http.Request) (string, error) { diff --git a/vendor/golang.org/x/sync/semaphore/semaphore.go b/vendor/golang.org/x/sync/semaphore/semaphore.go deleted file mode 100644 index 7f096fe..0000000 --- a/vendor/golang.org/x/sync/semaphore/semaphore.go +++ /dev/null @@ -1,127 +0,0 @@ -// Copyright 2017 The Go Authors. All rights reserved. -// Use of this source code is governed by a BSD-style -// license that can be found in the LICENSE file. - -// Package semaphore provides a weighted semaphore implementation. -package semaphore // import "golang.org/x/sync/semaphore" - -import ( - "container/list" - "context" - "sync" -) - -type waiter struct { - n int64 - ready chan<- struct{} // Closed when semaphore acquired. -} - -// NewWeighted creates a new weighted semaphore with the given -// maximum combined weight for concurrent access. -func NewWeighted(n int64) *Weighted { - w := &Weighted{size: n} - return w -} - -// Weighted provides a way to bound concurrent access to a resource. -// The callers can request access with a given weight. -type Weighted struct { - size int64 - cur int64 - mu sync.Mutex - waiters list.List -} - -// Acquire acquires the semaphore with a weight of n, blocking until resources -// are available or ctx is done. On success, returns nil. On failure, returns -// ctx.Err() and leaves the semaphore unchanged. -// -// If ctx is already done, Acquire may still succeed without blocking. -func (s *Weighted) Acquire(ctx context.Context, n int64) error { - s.mu.Lock() - if s.size-s.cur >= n && s.waiters.Len() == 0 { - s.cur += n - s.mu.Unlock() - return nil - } - - if n > s.size { - // Don't make other Acquire calls block on one that's doomed to fail. - s.mu.Unlock() - <-ctx.Done() - return ctx.Err() - } - - ready := make(chan struct{}) - w := waiter{n: n, ready: ready} - elem := s.waiters.PushBack(w) - s.mu.Unlock() - - select { - case <-ctx.Done(): - err := ctx.Err() - s.mu.Lock() - select { - case <-ready: - // Acquired the semaphore after we were canceled. Rather than trying to - // fix up the queue, just pretend we didn't notice the cancelation. - err = nil - default: - s.waiters.Remove(elem) - } - s.mu.Unlock() - return err - - case <-ready: - return nil - } -} - -// TryAcquire acquires the semaphore with a weight of n without blocking. -// On success, returns true. On failure, returns false and leaves the semaphore unchanged. -func (s *Weighted) TryAcquire(n int64) bool { - s.mu.Lock() - success := s.size-s.cur >= n && s.waiters.Len() == 0 - if success { - s.cur += n - } - s.mu.Unlock() - return success -} - -// Release releases the semaphore with a weight of n. -func (s *Weighted) Release(n int64) { - s.mu.Lock() - s.cur -= n - if s.cur < 0 { - s.mu.Unlock() - panic("semaphore: released more than held") - } - for { - next := s.waiters.Front() - if next == nil { - break // No more waiters blocked. - } - - w := next.Value.(waiter) - if s.size-s.cur < w.n { - // Not enough tokens for the next waiter. We could keep going (to try to - // find a waiter with a smaller request), but under load that could cause - // starvation for large requests; instead, we leave all remaining waiters - // blocked. - // - // Consider a semaphore used as a read-write lock, with N tokens, N - // readers, and one writer. Each reader can Acquire(1) to obtain a read - // lock. The writer can Acquire(N) to obtain a write lock, excluding all - // of the readers. If we allow the readers to jump ahead in the queue, - // the writer will starve — there is always one token available for every - // reader. - break - } - - s.cur += w.n - s.waiters.Remove(next) - close(w.ready) - } - s.mu.Unlock() -} diff --git a/vendor/modules.txt b/vendor/modules.txt index 4da3b3c..60f11b5 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -91,7 +91,6 @@ golang.org/x/net/context golang.org/x/oauth2 golang.org/x/oauth2/internal # golang.org/x/sync v0.0.0-20190423024810-112230192c58 -golang.org/x/sync/semaphore golang.org/x/sync/errgroup # golang.org/x/sys v0.0.0-20190422165155-953cdadca894 golang.org/x/sys/unix