Update vendor

This commit is contained in:
Darren Shepherd
2019-08-07 09:06:11 -07:00
parent c922b54153
commit 1dc21e0576
10 changed files with 97 additions and 210 deletions

View File

@@ -27,6 +27,6 @@ func (e *Store) Update(apiOp *types.APIRequest, schema *types.Schema, data types
return types.APIObject{}, nil
}
func (e *Store) Watch(apiOp *types.APIRequest, schema *types.Schema, opt *types.QueryOptions) (chan types.APIObject, error) {
func (e *Store) Watch(apiOp *types.APIRequest, schema *types.Schema, opt *types.QueryOptions) (chan types.APIEvent, error) {
return nil, nil
}

View File

@@ -38,7 +38,7 @@ func (e *errorStore) Delete(apiOp *types.APIRequest, schema *types.Schema, id st
}
func (e *errorStore) Watch(apiOp *types.APIRequest, schema *types.Schema, opt *types.QueryOptions) (chan types.APIObject, error) {
func (e *errorStore) Watch(apiOp *types.APIRequest, schema *types.Schema, opt *types.QueryOptions) (chan types.APIEvent, error) {
data, err := e.Store.Watch(apiOp, schema, opt)
return data, translateError(err)
}

View File

@@ -108,17 +108,22 @@ func (s *Store) listNamespace(namespace string, apiOp types.APIRequest, schema *
return k8sClient.List(metav1.ListOptions{})
}
func (s *Store) Watch(apiOp *types.APIRequest, schema *types.Schema, opt *types.QueryOptions) (chan types.APIObject, error) {
func (s *Store) Watch(apiOp *types.APIRequest, schema *types.Schema, opt *types.QueryOptions) (chan types.APIEvent, error) {
k8sClient, err := s.clientGetter.Client(apiOp, schema)
if err != nil {
return nil, err
}
list, err := k8sClient.List(metav1.ListOptions{})
if err != nil {
return nil, err
}
timeout := int64(60 * 30)
watcher, err := k8sClient.Watch(metav1.ListOptions{
Watch: true,
TimeoutSeconds: &timeout,
ResourceVersion: "0",
ResourceVersion: list.GetResourceVersion(),
})
if err != nil {
return nil, err
@@ -131,15 +136,14 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.Schema, opt *types.
watcher.Stop()
}()
result := make(chan types.APIObject)
result := make(chan types.APIEvent)
go func() {
for i, obj := range list.Items {
result <- s.toAPIEvent(apiOp, schema, i, len(list.Items), false, &obj)
}
for event := range watcher.ResultChan() {
data := event.Object.(*unstructured.Unstructured)
s.fromInternal(apiOp, schema, data.Object)
if event.Type == watch.Deleted && data.Object != nil {
data.Object[".removed"] = true
}
result <- types.ToAPI(data.Object)
result <- s.toAPIEvent(apiOp, schema, 0, 0, event.Type == watch.Deleted, data)
}
logrus.Debugf("closing watcher for %s", schema.ID)
close(result)
@@ -149,6 +153,22 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.Schema, opt *types.
return result, nil
}
func (s *Store) toAPIEvent(apiOp *types.APIRequest, schema *types.Schema, index, count int, remove bool, obj *unstructured.Unstructured) types.APIEvent {
name := "resource.change"
if remove && obj.Object != nil {
name = "resource.remove"
}
s.fromInternal(apiOp, schema, obj.Object)
return types.APIEvent{
Name: name,
Count: count,
Index: index,
Object: types.ToAPI(obj.Object),
}
}
func (s *Store) Create(apiOp *types.APIRequest, schema *types.Schema, params types.APIObject) (types.APIObject, error) {
data := params.Map()
if err := s.toInternal(schema.Mapper, data); err != nil {

View File

@@ -34,7 +34,7 @@ func (s *Store) ByID(apiOp *types.APIRequest, schema *types.Schema, id string) (
return types.APIObject{}, httperror.NewAPIError(httperror.NotFound, "no such schema")
}
func (s *Store) Watch(apiOp *types.APIRequest, schema *types.Schema, opt *types.QueryOptions) (chan types.APIObject, error) {
func (s *Store) Watch(apiOp *types.APIRequest, schema *types.Schema, opt *types.QueryOptions) (chan types.APIEvent, error) {
return nil, nil
}

View File

@@ -1,9 +1,10 @@
package subscribe
import (
"bytes"
"context"
"encoding/json"
"errors"
"io"
"time"
"github.com/gorilla/websocket"
@@ -75,7 +76,7 @@ func handler(apiOp *types.APIRequest) error {
}
}()
events := make(chan types.APIObject)
events := make(chan types.APIEvent)
for _, schema := range schemas {
if apiOp.AccessControl.CanWatch(apiOp, schema) == nil {
streamStore(ctx, readerGroup, apiOp, schema, events)
@@ -87,9 +88,10 @@ func handler(apiOp *types.APIRequest) error {
close(events)
}()
jsonWriter := writer.EncodingResponseWriter{
capture := &Capture{}
captureWriter := writer.EncodingResponseWriter{
ContentType: "application/json",
Encoder: types.JSONEncoder,
Encoder: capture.Encoder,
}
t := time.NewTicker(60 * time.Second)
defer t.Stop()
@@ -103,24 +105,20 @@ func handler(apiOp *types.APIRequest) error {
break
}
header := `{"name":"resource.change","data":`
if item.Map()[".removed"] == true {
header = `{"name":"resource.remove","data":`
}
schema := apiOp.Schemas.Schema(convert.ToString(item.Map()["type"]))
schema := apiOp.Schemas.Schema(convert.ToString(item.Object.Map()["type"]))
if schema != nil {
buffer := &bytes.Buffer{}
if err := jsonWriter.VersionBody(apiOp, buffer, item); err != nil {
if err := captureWriter.VersionBody(apiOp, nil, item.Object); err != nil {
cancel()
continue
}
if err := writeData(c, header, buffer.Bytes()); err != nil {
item.Object = types.ToAPI(capture.Object)
if err := writeData(c, item); err != nil {
cancel()
}
}
case <-t.C:
if err := writeData(c, `{"name":"ping","data":`, []byte("{}")); err != nil {
if err := writeData(c, types.APIEvent{Name: "ping"}); err != nil {
cancel()
}
}
@@ -130,35 +128,29 @@ func handler(apiOp *types.APIRequest) error {
return nil
}
func writeData(c *websocket.Conn, header string, buf []byte) error {
func writeData(c *websocket.Conn, event types.APIEvent) error {
event.Data = event.Object.Raw()
messageWriter, err := c.NextWriter(websocket.TextMessage)
if err != nil {
return err
}
defer messageWriter.Close()
if _, err := messageWriter.Write([]byte(header)); err != nil {
return err
}
if _, err := messageWriter.Write(buf); err != nil {
return err
}
if _, err := messageWriter.Write([]byte(`}`)); err != nil {
return err
}
return messageWriter.Close()
return json.NewEncoder(messageWriter).Encode(event)
}
func watch(apiOp *types.APIRequest, schema *types.Schema, opts *types.QueryOptions) (chan types.APIObject, error) {
func watch(apiOp *types.APIRequest, schema *types.Schema, opts *types.QueryOptions) (chan types.APIEvent, error) {
c, err := schema.Store.Watch(apiOp, schema, opts)
if err != nil {
return nil, err
}
return types.APIChan(c, func(data types.APIObject) types.APIObject {
return apiOp.FilterObject(nil, schema, data)
return types.APIChan(c, func(data types.APIEvent) types.APIEvent {
data.Object = apiOp.FilterObject(nil, schema, data.Object)
return data
}), nil
}
func streamStore(ctx context.Context, eg *errgroup.Group, apiOp *types.APIRequest, schema *types.Schema, result chan types.APIObject) {
func streamStore(ctx context.Context, eg *errgroup.Group, apiOp *types.APIRequest, schema *types.Schema, result chan types.APIEvent) {
eg.Go(func() error {
opts := parse.QueryOptions(apiOp, schema)
events, err := watch(apiOp, schema, &opts)
@@ -185,3 +177,12 @@ func matches(items []string, item string) bool {
}
return slice.ContainsString(items, item)
}
type Capture struct {
Object interface{}
}
func (c *Capture) Encoder(w io.Writer, obj interface{}) error {
c.Object = obj
return nil
}

View File

@@ -189,11 +189,20 @@ type Store interface {
Create(apiOp *APIRequest, schema *Schema, data APIObject) (APIObject, error)
Update(apiOp *APIRequest, schema *Schema, data APIObject, id string) (APIObject, error)
Delete(apiOp *APIRequest, schema *Schema, id string) (APIObject, error)
Watch(apiOp *APIRequest, schema *Schema, opt *QueryOptions) (chan APIObject, error)
Watch(apiOp *APIRequest, schema *Schema, opt *QueryOptions) (chan APIEvent, error)
}
type APIEvent struct {
Name string `json:"name,omitempty"`
Count int `json:"count,omitempty"`
Index int `json:"index,omitempty"`
Object APIObject `json:"-"`
// Data should be used
Data interface{} `json:"data,omitempty"`
}
type APIObject struct {
Object interface{} `json:",embed"`
Object interface{} `json:",inline"`
}
func ToAPI(data interface{}) APIObject {
@@ -277,11 +286,11 @@ func Namespace(data map[string]interface{}) string {
return convert.ToString(values.GetValueN(data, "metadata", "namespace"))
}
func APIChan(c <-chan APIObject, f func(APIObject) APIObject) chan APIObject {
func APIChan(c <-chan APIEvent, f func(APIEvent) APIEvent) chan APIEvent {
if c == nil {
return nil
}
result := make(chan APIObject)
result := make(chan APIEvent)
go func() {
for data := range c {
modified := f(data)

View File

@@ -3,67 +3,53 @@ package urlbuilder
import (
"bytes"
"fmt"
"net"
"net/http"
"net/url"
"strings"
)
func ParseRequestURL(r *http.Request) string {
// Get url from standard headers
requestURL := getURLFromStandardHeaders(r)
if requestURL != "" {
return requestURL
}
// Use incoming url
scheme := "http"
if r.TLS != nil {
scheme = "https"
}
return fmt.Sprintf("%s://%s%s%s", scheme, r.Host, r.Header.Get(PrefixHeader), r.URL.Path)
scheme := getScheme(r)
host := getHost(r, scheme)
return fmt.Sprintf("%s://%s%s%s", scheme, host, r.Header.Get(PrefixHeader), r.URL.Path)
}
func getURLFromStandardHeaders(r *http.Request) string {
xForwardedProto := getOverrideHeader(r, ForwardedProtoHeader, "")
if xForwardedProto == "" {
return ""
}
host := getOverrideHeader(r, ForwardedHostHeader, "")
func getHost(r *http.Request, scheme string) string {
host := strings.Split(r.Header.Get(ForwardedHostHeader), ",")[0]
if host == "" {
host = r.Host
}
if host == "" {
return ""
port := r.Header.Get(ForwardedPortHeader)
if port == "" {
return host
}
port := getOverrideHeader(r, ForwardedPortHeader, "")
if port == "443" || port == "80" {
port = "" // Don't include default ports in url
if port == "80" && scheme == "http" {
return host
}
if port != "" && strings.Contains(host, ":") {
// Have to strip the port that is in the host. Handle IPv6, which has this format: [::1]:8080
if (strings.HasPrefix(host, "[") && strings.Contains(host, "]:")) || !strings.HasPrefix(host, "[") {
host = host[0:strings.LastIndex(host, ":")]
}
if port == "443" && scheme == "http" {
return host
}
if port != "" {
port = ":" + port
hostname, _, err := net.SplitHostPort(host)
if err != nil {
return host
}
return fmt.Sprintf("%s://%s%s%s%s", xForwardedProto, host, port, r.Header.Get(PrefixHeader), r.URL.Path)
return strings.Join([]string{hostname, port}, ":")
}
func getOverrideHeader(r *http.Request, header string, defaultValue string) string {
// Need to handle comma separated hosts in X-Forwarded-For
value := r.Header.Get(header)
if value != "" {
return strings.TrimSpace(strings.Split(value, ",")[0])
func getScheme(r *http.Request) string {
scheme := r.Header.Get(ForwardedProtoHeader)
if scheme != "" {
return scheme
} else if r.TLS != nil {
return "https"
}
return defaultValue
return "http"
}
func ParseResponseURLBase(currentURL string, r *http.Request) (string, error) {