Refactor subscribe

This commit is contained in:
Darren Shepherd
2019-08-12 12:42:37 -07:00
parent e71d8fb0df
commit 01395a0ace
19 changed files with 882 additions and 184 deletions

View File

@@ -19,7 +19,9 @@ func QueryOptions(apiOp *types.APIRequest, schema *types.Schema) types.QueryOpti
return types.QueryOptions{}
}
result := &types.QueryOptions{}
result := &types.QueryOptions{
Options: map[string]string{},
}
result.Sort = parseSort(schema, apiOp)
result.Pagination = parsePagination(apiOp)

View File

@@ -27,6 +27,6 @@ func (e *Store) Update(apiOp *types.APIRequest, schema *types.Schema, data types
return types.APIObject{}, nil
}
func (e *Store) Watch(apiOp *types.APIRequest, schema *types.Schema, opt *types.QueryOptions) (chan types.APIEvent, error) {
func (e *Store) Watch(apiOp *types.APIRequest, schema *types.Schema, wr types.WatchRequest) (chan types.APIEvent, error) {
return nil, nil
}

View File

@@ -38,8 +38,8 @@ func (e *errorStore) Delete(apiOp *types.APIRequest, schema *types.Schema, id st
}
func (e *errorStore) Watch(apiOp *types.APIRequest, schema *types.Schema, opt *types.QueryOptions) (chan types.APIEvent, error) {
data, err := e.Store.Watch(apiOp, schema, opt)
func (e *errorStore) Watch(apiOp *types.APIRequest, schema *types.Schema, wr types.WatchRequest) (chan types.APIEvent, error) {
data, err := e.Store.Watch(apiOp, schema, wr)
return data, translateError(err)
}

View File

@@ -3,6 +3,8 @@ package proxy
import (
"sync"
errors2 "github.com/pkg/errors"
"github.com/rancher/norman/pkg/types"
"github.com/rancher/norman/pkg/types/convert/merge"
"github.com/rancher/norman/pkg/types/values"
@@ -107,61 +109,78 @@ func (s *Store) listNamespace(namespace string, apiOp types.APIRequest, schema *
return k8sClient.List(metav1.ListOptions{})
}
func (s *Store) Watch(apiOp *types.APIRequest, schema *types.Schema, opt *types.QueryOptions) (chan types.APIEvent, error) {
k8sClient, err := s.clientGetter.Client(apiOp, schema)
if err != nil {
return nil, err
func returnErr(err error, c chan types.APIEvent) {
c <- types.APIEvent{
Name: "resource.error",
Error: err,
}
}
func (s *Store) listAndWatch(apiOp *types.APIRequest, k8sClient dynamic.ResourceInterface, schema *types.Schema, w types.WatchRequest, result chan types.APIEvent) {
rev := w.Revision
if rev == "" {
list, err := k8sClient.List(metav1.ListOptions{
Limit: 1,
})
if err != nil {
returnErr(errors2.Wrapf(err, "failed to list %s", schema.ID), result)
return
}
rev = list.GetResourceVersion()
}
list, err := k8sClient.List(metav1.ListOptions{})
timeout := int64(60 * 30)
watcher, err := k8sClient.Watch(metav1.ListOptions{
Watch: true,
TimeoutSeconds: &timeout,
ResourceVersion: rev,
})
if err != nil {
returnErr(errors2.Wrapf(err, "stopping watch for %s: %v", schema.ID), result)
return
}
defer watcher.Stop()
logrus.Debugf("opening watcher for %s", schema.ID)
go func() {
<-apiOp.Request.Context().Done()
watcher.Stop()
}()
for event := range watcher.ResultChan() {
data := event.Object.(*unstructured.Unstructured)
result <- s.toAPIEvent(apiOp, schema, event.Type, data)
}
}
func (s *Store) Watch(apiOp *types.APIRequest, schema *types.Schema, w types.WatchRequest) (chan types.APIEvent, error) {
k8sClient, err := s.clientGetter.Client(apiOp, schema)
if err != nil {
return nil, err
}
result := make(chan types.APIEvent)
go func() {
defer func() {
logrus.Debugf("closing watcher for %s", schema.ID)
close(result)
}()
for i, obj := range list.Items {
result <- s.toAPIEvent(apiOp, schema, i, len(list.Items), false, &obj)
}
timeout := int64(60 * 30)
watcher, err := k8sClient.Watch(metav1.ListOptions{
Watch: true,
TimeoutSeconds: &timeout,
ResourceVersion: list.GetResourceVersion(),
})
if err != nil {
logrus.Debugf("stopping watch for %s: %v", schema.ID, err)
return
}
defer watcher.Stop()
for event := range watcher.ResultChan() {
data := event.Object.(*unstructured.Unstructured)
result <- s.toAPIEvent(apiOp, schema, 0, 0, event.Type == watch.Deleted, data)
}
s.listAndWatch(apiOp, k8sClient, schema, w, result)
logrus.Debugf("closing watcher for %s", schema.ID)
close(result)
}()
return result, nil
}
func (s *Store) toAPIEvent(apiOp *types.APIRequest, schema *types.Schema, index, count int, remove bool, obj *unstructured.Unstructured) types.APIEvent {
func (s *Store) toAPIEvent(apiOp *types.APIRequest, schema *types.Schema, et watch.EventType, obj *unstructured.Unstructured) types.APIEvent {
name := "resource.change"
if remove && obj.Object != nil {
switch et {
case watch.Deleted:
name = "resource.remove"
case watch.Added:
name = "resource.create"
}
s.fromInternal(apiOp, schema, obj.Object)
return types.APIEvent{
Name: name,
Count: count,
Index: index,
Object: types.ToAPI(obj.Object),
}
}

View File

@@ -34,7 +34,7 @@ func (s *Store) ByID(apiOp *types.APIRequest, schema *types.Schema, id string) (
return types.APIObject{}, httperror.NewAPIError(httperror.NotFound, "no such schema")
}
func (s *Store) Watch(apiOp *types.APIRequest, schema *types.Schema, opt *types.QueryOptions) (chan types.APIEvent, error) {
func (s *Store) Watch(apiOp *types.APIRequest, schema *types.Schema, wr types.WatchRequest) (chan types.APIEvent, error) {
return nil, nil
}

View File

@@ -0,0 +1,57 @@
package subscribe
import (
"io"
"github.com/rancher/norman/pkg/api/writer"
"github.com/rancher/norman/pkg/types"
)
type Converter struct {
writer.EncodingResponseWriter
apiOp *types.APIRequest
obj interface{}
}
func MarshallObject(apiOp *types.APIRequest, event types.APIEvent) types.APIEvent {
if event.Error != nil {
return event
}
if event.Object.IsNil() {
return event
}
data, err := newConverter(apiOp).ToAPIObject(event.Object)
if err != nil {
event.Error = err
return event
}
event.Data = data.Raw()
return event
}
func newConverter(apiOp *types.APIRequest) *Converter {
c := &Converter{
apiOp: apiOp,
}
c.EncodingResponseWriter = writer.EncodingResponseWriter{
ContentType: "application/json",
Encoder: c.Encoder,
}
return c
}
func (c *Converter) ToAPIObject(data interface{}) (types.APIObject, error) {
c.obj = nil
if err := c.VersionBody(c.apiOp, nil, data); err != nil {
return types.APIObject{}, err
}
return types.ToAPI(c.obj), nil
}
func (c *Converter) Encoder(w io.Writer, obj interface{}) error {
c.obj = obj
return nil
}

View File

@@ -1,29 +1,23 @@
package subscribe
import (
"context"
"encoding/json"
"errors"
"io"
"time"
"github.com/gorilla/websocket"
"github.com/rancher/norman/pkg/api/writer"
"github.com/rancher/norman/pkg/httperror"
"github.com/rancher/norman/pkg/parse"
"github.com/rancher/norman/pkg/types"
"github.com/rancher/norman/pkg/types/convert"
"github.com/rancher/norman/pkg/types/slice"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
)
var upgrader = websocket.Upgrader{}
var upgrader = websocket.Upgrader{
HandshakeTimeout: 60 * time.Second,
EnableCompression: true,
}
type Subscribe struct {
ResourceTypes []string
APIVersions []string
ProjectID string `norman:"type=reference[/v3/schemas/project]"`
Stop bool `json:"stop,omitempty"`
ResourceType string `json:"resourceType,omitempty"`
ResourceVersion string `json:"resourceVersion,omitempty"`
}
func Handler(apiOp *types.APIRequest) (types.APIObject, error) {
@@ -34,102 +28,46 @@ func Handler(apiOp *types.APIRequest) (types.APIObject, error) {
return types.APIObject{}, err
}
func getMatchingSchemas(apiOp *types.APIRequest) []*types.Schema {
resourceTypes := apiOp.Request.URL.Query()["resourceTypes"]
var schemas []*types.Schema
for _, schema := range apiOp.Schemas.Schemas() {
if !matches(resourceTypes, schema.ID) {
continue
}
if schema.Store != nil {
schemas = append(schemas, schema)
}
}
return schemas
}
func handler(apiOp *types.APIRequest) error {
schemas := getMatchingSchemas(apiOp)
if len(schemas) == 0 {
return httperror.NewAPIError(httperror.NotFound, "no resources types matched")
}
c, err := upgrader.Upgrade(apiOp.Response, apiOp.Request, nil)
if err != nil {
return err
}
defer c.Close()
cancelCtx, cancel := context.WithCancel(apiOp.Request.Context())
readerGroup, ctx := errgroup.WithContext(cancelCtx)
apiOp.Request = apiOp.Request.WithContext(ctx)
watches := NewWatchSession(apiOp)
defer watches.Close()
go func() {
for {
if _, _, err := c.NextReader(); err != nil {
cancel()
c.Close()
break
}
}
}()
events := make(chan types.APIEvent)
for _, schema := range schemas {
if apiOp.AccessControl.CanWatch(apiOp, schema) == nil {
streamStore(ctx, readerGroup, apiOp, schema, events)
}
}
go func() {
readerGroup.Wait()
close(events)
}()
capture := &Capture{}
captureWriter := writer.EncodingResponseWriter{
ContentType: "application/json",
Encoder: capture.Encoder,
}
t := time.NewTicker(60 * time.Second)
events := watches.Watch(c)
t := time.NewTicker(30 * time.Second)
defer t.Stop()
done := false
for !done {
for {
select {
case item, ok := <-events:
case event, ok := <-events:
if !ok {
done = true
break
return nil
}
schema := apiOp.Schemas.Schema(convert.ToString(item.Object.Map()["type"]))
if schema != nil {
if err := captureWriter.VersionBody(apiOp, nil, item.Object); err != nil {
cancel()
continue
}
item.Object = types.ToAPI(capture.Object)
if err := writeData(c, item); err != nil {
cancel()
}
if err := writeData(apiOp, c, event); err != nil {
return err
}
case <-t.C:
if err := writeData(c, types.APIEvent{Name: "ping"}); err != nil {
cancel()
if err := writeData(apiOp, c, types.APIEvent{Name: "ping"}); err != nil {
return err
}
}
}
// no point in ever returning null because the connection is hijacked and we can't write it
return nil
}
func writeData(c *websocket.Conn, event types.APIEvent) error {
event.Data = event.Object.Raw()
func writeData(apiOp *types.APIRequest, c *websocket.Conn, event types.APIEvent) error {
event = MarshallObject(apiOp, event)
if event.Error != nil {
event.Name = "resource.error"
event.Data = map[string]interface{}{
"error": event.Error.Error(),
}
}
messageWriter, err := c.NextWriter(websocket.TextMessage)
if err != nil {
return err
@@ -138,51 +76,3 @@ func writeData(c *websocket.Conn, event types.APIEvent) error {
return json.NewEncoder(messageWriter).Encode(event)
}
func watch(apiOp *types.APIRequest, schema *types.Schema, opts *types.QueryOptions) (chan types.APIEvent, error) {
c, err := schema.Store.Watch(apiOp, schema, opts)
if err != nil {
return nil, err
}
return types.APIChan(c, func(data types.APIEvent) types.APIEvent {
data.Object = apiOp.FilterObject(nil, schema, data.Object)
return data
}), nil
}
func streamStore(ctx context.Context, eg *errgroup.Group, apiOp *types.APIRequest, schema *types.Schema, result chan types.APIEvent) {
eg.Go(func() error {
opts := parse.QueryOptions(apiOp, schema)
events, err := watch(apiOp, schema, &opts)
if err != nil || events == nil {
if err != nil {
logrus.Errorf("failed on subscribe %s: %v", schema.ID, err)
}
return err
}
logrus.Debugf("watching %s", schema.ID)
for e := range events {
result <- e
}
return errors.New("disconnect")
})
}
func matches(items []string, item string) bool {
if len(items) == 0 {
return true
}
return slice.ContainsString(items, item)
}
type Capture struct {
Object interface{}
}
func (c *Capture) Encoder(w io.Writer, obj interface{}) error {
c.Object = obj
return nil
}

View File

@@ -0,0 +1,140 @@
package subscribe
import (
"context"
"encoding/json"
"fmt"
"sync"
"github.com/gorilla/websocket"
"github.com/rancher/norman/pkg/types"
)
type WatchSession struct {
sync.Mutex
apiOp *types.APIRequest
watchers map[string]func()
wg sync.WaitGroup
ctx context.Context
cancel func()
}
func (s *WatchSession) stop(id string, resp chan<- types.APIEvent) {
s.Lock()
defer s.Unlock()
if cancel, ok := s.watchers[id]; ok {
cancel()
resp <- types.APIEvent{
Name: "resource.stop",
ResourceType: id,
}
}
delete(s.watchers, id)
}
func (s *WatchSession) add(resourceType, revision string, resp chan<- types.APIEvent) {
s.Lock()
defer s.Unlock()
ctx, cancel := context.WithCancel(s.ctx)
s.watchers[resourceType] = cancel
s.wg.Add(1)
go func() {
defer s.wg.Done()
defer s.stop(resourceType, resp)
if err := s.stream(ctx, resourceType, revision, resp); err != nil {
sendErr(resp, err, resourceType)
}
}()
}
func (s *WatchSession) stream(ctx context.Context, resourceType, revision string, result chan<- types.APIEvent) error {
schema := s.apiOp.Schemas.Schema(resourceType)
if schema == nil {
return fmt.Errorf("failed to find schema %s", resourceType)
} else if schema.Store == nil {
return fmt.Errorf("schema %s does not support watching", resourceType)
}
if err := s.apiOp.AccessControl.CanWatch(s.apiOp, schema); err != nil {
return err
}
c, err := schema.Store.Watch(s.apiOp.WithContext(ctx), schema, types.WatchRequest{Revision: revision})
if err != nil {
return err
}
result <- types.APIEvent{
Name: "resource.start",
ResourceType: resourceType,
}
for event := range c {
result <- event
}
return nil
}
func NewWatchSession(apiOp *types.APIRequest) *WatchSession {
ws := &WatchSession{
apiOp: apiOp,
watchers: map[string]func(){},
}
ws.ctx, ws.cancel = context.WithCancel(apiOp.Request.Context())
return ws
}
func (s *WatchSession) Watch(conn *websocket.Conn) <-chan types.APIEvent {
result := make(chan types.APIEvent, 100)
go func() {
defer close(result)
if err := s.watch(conn, result); err != nil {
sendErr(result, err, "")
}
}()
return result
}
func (s *WatchSession) Close() {
s.cancel()
s.wg.Wait()
}
func (s *WatchSession) watch(conn *websocket.Conn, resp chan types.APIEvent) error {
defer s.wg.Wait()
defer s.cancel()
for {
_, r, err := conn.NextReader()
if err != nil {
return err
}
var sub Subscribe
if err := json.NewDecoder(r).Decode(&sub); err != nil {
sendErr(resp, err, "")
continue
}
if sub.Stop {
s.stop(sub.ResourceType, resp)
} else if _, ok := s.watchers[sub.ResourceType]; !ok {
s.add(sub.ResourceType, sub.ResourceVersion, resp)
}
}
}
func sendErr(resp chan<- types.APIEvent, err error, resourceType string) {
resp <- types.APIEvent{
ResourceType: resourceType,
Error: err,
}
}

View File

@@ -3,7 +3,7 @@ package definition
import (
"strings"
convert2 "github.com/rancher/norman/pkg/types/convert"
"github.com/rancher/norman/pkg/types/convert"
)
func IsMapType(fieldType string) bool {
@@ -41,5 +41,5 @@ func GetShortTypeFromFull(fullType string) string {
}
func GetFullType(data map[string]interface{}) string {
return convert2.ToString(data["type"])
return convert.ToString(data["type"])
}

View File

@@ -1,6 +1,7 @@
package types
import (
"context"
"encoding/json"
"net/http"
"net/url"
@@ -112,6 +113,12 @@ type APIRequest struct {
Response http.ResponseWriter
}
func (r *APIRequest) WithContext(ctx context.Context) *APIRequest {
result := *r
result.Request = result.Request.WithContext(ctx)
return &result
}
func (r *APIRequest) GetUser() string {
user, ok := request.UserFrom(r.Request.Context())
if ok {
@@ -160,6 +167,7 @@ type QueryOptions struct {
Sort Sort
Pagination *Pagination
Conditions []*QueryCondition
Options map[string]string
}
type ReferenceValidator interface {
@@ -189,14 +197,18 @@ type Store interface {
Create(apiOp *APIRequest, schema *Schema, data APIObject) (APIObject, error)
Update(apiOp *APIRequest, schema *Schema, data APIObject, id string) (APIObject, error)
Delete(apiOp *APIRequest, schema *Schema, id string) (APIObject, error)
Watch(apiOp *APIRequest, schema *Schema, opt *QueryOptions) (chan APIEvent, error)
Watch(apiOp *APIRequest, schema *Schema, w WatchRequest) (chan APIEvent, error)
}
type WatchRequest struct {
Revision string
}
type APIEvent struct {
Name string `json:"name,omitempty"`
Count int `json:"count,omitempty"`
Index int `json:"index,omitempty"`
Object APIObject `json:"-"`
Name string `json:"name,omitempty"`
ResourceType string `json:"resourceType,omitempty"`
Object APIObject `json:"-"`
Error error `json:"-"`
// Data should be used
Data interface{} `json:"data,omitempty"`
}