2017-12-23 06:13:35 +00:00
|
|
|
package subscribe
|
|
|
|
|
|
|
|
import (
|
|
|
|
"bytes"
|
|
|
|
"context"
|
|
|
|
|
|
|
|
"github.com/gorilla/websocket"
|
|
|
|
"github.com/rancher/norman/api/writer"
|
|
|
|
"github.com/rancher/norman/httperror"
|
|
|
|
"github.com/rancher/norman/parse"
|
|
|
|
"github.com/rancher/norman/types"
|
|
|
|
"github.com/rancher/norman/types/convert"
|
|
|
|
"github.com/rancher/norman/types/slice"
|
|
|
|
"github.com/sirupsen/logrus"
|
|
|
|
"golang.org/x/sync/errgroup"
|
|
|
|
)
|
|
|
|
|
|
|
|
var upgrader = websocket.Upgrader{}
|
|
|
|
|
|
|
|
type Subscribe struct {
|
|
|
|
ResourceTypes []string
|
|
|
|
APIVersions []string
|
|
|
|
ProjectID string `norman:"type=reference[project]"`
|
|
|
|
}
|
|
|
|
|
|
|
|
func Handler(apiContext *types.APIContext) error {
|
|
|
|
err := handler(apiContext)
|
|
|
|
if err != nil {
|
|
|
|
logrus.Errorf("Error during subscribe %v", err)
|
|
|
|
}
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
func handler(apiContext *types.APIContext) error {
|
|
|
|
c, err := upgrader.Upgrade(apiContext.Response, apiContext.Request, nil)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
defer c.Close()
|
|
|
|
|
|
|
|
cancelCtx, cancel := context.WithCancel(apiContext.Request.Context())
|
|
|
|
apiContext.Request = apiContext.Request.WithContext(cancelCtx)
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
for {
|
|
|
|
if _, _, err := c.NextReader(); err != nil {
|
|
|
|
cancel()
|
|
|
|
c.Close()
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
apiVersions := apiContext.Request.URL.Query()["apiVersions"]
|
|
|
|
resourceTypes := apiContext.Request.URL.Query()["resourceTypes"]
|
|
|
|
|
|
|
|
var schemas []*types.Schema
|
|
|
|
for _, schema := range apiContext.Schemas.Schemas() {
|
|
|
|
if !matches(apiVersions, schema.Version.Path) {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
if !matches(resourceTypes, schema.ID) {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
if schema.Store != nil {
|
|
|
|
schemas = append(schemas, schema)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if len(schemas) == 0 {
|
|
|
|
return httperror.NewAPIError(httperror.NotFound, "no resources types matched")
|
|
|
|
}
|
|
|
|
|
|
|
|
readerGroup, ctx := errgroup.WithContext(apiContext.Request.Context())
|
|
|
|
events := make(chan map[string]interface{})
|
|
|
|
for _, schema := range schemas {
|
|
|
|
streamStore(ctx, readerGroup, apiContext, schema, events)
|
|
|
|
}
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
readerGroup.Wait()
|
|
|
|
close(events)
|
|
|
|
}()
|
|
|
|
|
|
|
|
jsonWriter := writer.JSONResponseWriter{}
|
|
|
|
for item := range events {
|
2017-12-28 15:48:01 +00:00
|
|
|
header := `{"name":"resource.change","data":`
|
|
|
|
if item[".removed"] == true {
|
2017-12-29 22:07:45 +00:00
|
|
|
header = `{"name":"resource.remove","data":`
|
2017-12-28 15:48:01 +00:00
|
|
|
}
|
2017-12-23 06:13:35 +00:00
|
|
|
schema := apiContext.Schemas.Schema(apiContext.Version, convert.ToString(item["type"]))
|
|
|
|
if schema != nil {
|
|
|
|
buffer := &bytes.Buffer{}
|
|
|
|
if err := jsonWriter.VersionBody(apiContext, &schema.Version, buffer, item); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2017-12-29 22:07:45 +00:00
|
|
|
|
|
|
|
messageWriter, err := c.NextWriter(websocket.TextMessage)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2017-12-28 15:48:01 +00:00
|
|
|
if _, err := messageWriter.Write([]byte(header)); err != nil {
|
2017-12-23 06:13:35 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
if _, err := messageWriter.Write(buffer.Bytes()); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if _, err := messageWriter.Write([]byte(`}`)); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Group is already done at this point because of goroutine above, this is just to send the error if needed
|
|
|
|
return readerGroup.Wait()
|
|
|
|
}
|
|
|
|
|
|
|
|
func streamStore(ctx context.Context, eg *errgroup.Group, apiContext *types.APIContext, schema *types.Schema, result chan map[string]interface{}) {
|
|
|
|
eg.Go(func() error {
|
|
|
|
opts := parse.QueryOptions(apiContext, schema)
|
2017-12-28 15:47:10 +00:00
|
|
|
events, err := schema.Store.Watch(apiContext, schema, &opts)
|
2017-12-23 06:13:35 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
if events == nil {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case e, ok := <-events:
|
|
|
|
if !ok {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
select {
|
|
|
|
case result <- e:
|
|
|
|
case <-ctx.Done():
|
|
|
|
return ctx.Err()
|
|
|
|
}
|
|
|
|
case <-ctx.Done():
|
|
|
|
return ctx.Err()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
func matches(items []string, item string) bool {
|
|
|
|
if len(items) == 0 {
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
return slice.ContainsString(items, item)
|
|
|
|
}
|