1
0
mirror of https://github.com/rancher/norman.git synced 2025-08-04 00:49:53 +00:00
norman/pkg/subscribe/handler.go

172 lines
3.9 KiB
Go
Raw Normal View History

2017-12-23 06:13:35 +00:00
package subscribe
import (
"bytes"
"context"
"errors"
2018-01-12 10:13:35 +00:00
"time"
2017-12-23 06:13:35 +00:00
"github.com/gorilla/websocket"
"github.com/rancher/norman/api/writer"
"github.com/rancher/norman/httperror"
"github.com/rancher/norman/parse"
"github.com/rancher/norman/types"
"github.com/rancher/norman/types/convert"
"github.com/rancher/norman/types/slice"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
)
var upgrader = websocket.Upgrader{}
type Subscribe struct {
ResourceTypes []string
APIVersions []string
2018-02-13 23:42:40 +00:00
ProjectID string `norman:"type=reference[/v3/schemas/project]"`
2017-12-23 06:13:35 +00:00
}
2018-01-30 23:49:37 +00:00
func Handler(apiContext *types.APIContext, _ types.RequestHandler) error {
2017-12-23 06:13:35 +00:00
err := handler(apiContext)
if err != nil {
logrus.Errorf("Error during subscribe %v", err)
}
return err
}
2018-01-17 04:46:20 +00:00
func getMatchingSchemas(apiContext *types.APIContext) []*types.Schema {
2017-12-23 06:13:35 +00:00
resourceTypes := apiContext.Request.URL.Query()["resourceTypes"]
var schemas []*types.Schema
2018-02-09 20:31:12 +00:00
for _, schema := range apiContext.Schemas.SchemasForVersion(*apiContext.Version) {
2017-12-23 06:13:35 +00:00
if !matches(resourceTypes, schema.ID) {
continue
}
if schema.Store != nil {
schemas = append(schemas, schema)
}
}
2018-01-17 04:46:20 +00:00
return schemas
}
func handler(apiContext *types.APIContext) error {
schemas := getMatchingSchemas(apiContext)
2017-12-23 06:13:35 +00:00
if len(schemas) == 0 {
return httperror.NewAPIError(httperror.NotFound, "no resources types matched")
}
2018-01-17 04:46:20 +00:00
c, err := upgrader.Upgrade(apiContext.Response, apiContext.Request, nil)
if err != nil {
return err
}
defer c.Close()
cancelCtx, cancel := context.WithCancel(apiContext.Request.Context())
readerGroup, ctx := errgroup.WithContext(cancelCtx)
apiContext.Request = apiContext.Request.WithContext(ctx)
go func() {
for {
if _, _, err := c.NextReader(); err != nil {
cancel()
c.Close()
break
}
}
}()
2017-12-23 06:13:35 +00:00
events := make(chan map[string]interface{})
for _, schema := range schemas {
streamStore(ctx, readerGroup, apiContext, schema, events)
}
go func() {
readerGroup.Wait()
close(events)
}()
jsonWriter := writer.JSONResponseWriter{}
2018-01-12 10:13:35 +00:00
t := time.NewTicker(5 * time.Second)
defer t.Stop()
done := false
for !done {
select {
case item, ok := <-events:
if !ok {
done = true
break
}
2018-01-12 10:13:35 +00:00
header := `{"name":"resource.change","data":`
if item[".removed"] == true {
header = `{"name":"resource.remove","data":`
2017-12-23 06:13:35 +00:00
}
2018-01-12 10:13:35 +00:00
schema := apiContext.Schemas.Schema(apiContext.Version, convert.ToString(item["type"]))
if schema != nil {
buffer := &bytes.Buffer{}
if err := jsonWriter.VersionBody(apiContext, &schema.Version, buffer, item); err != nil {
return err
}
if err := writeData(c, header, buffer.Bytes()); err != nil {
return err
}
2017-12-23 06:13:35 +00:00
}
2018-01-12 10:13:35 +00:00
case <-t.C:
if err := writeData(c, `{"name":"ping","data":`, []byte("{}")); err != nil {
2017-12-23 06:13:35 +00:00
return err
}
}
}
// no point in ever returning null because the connection is hijacked and we can't write it
return nil
2017-12-23 06:13:35 +00:00
}
2018-01-12 10:13:35 +00:00
func writeData(c *websocket.Conn, header string, buf []byte) error {
messageWriter, err := c.NextWriter(websocket.TextMessage)
if err != nil {
return err
}
if _, err := messageWriter.Write([]byte(header)); err != nil {
return err
}
if _, err := messageWriter.Write(buf); err != nil {
return err
}
if _, err := messageWriter.Write([]byte(`}`)); err != nil {
return err
}
2018-01-17 22:42:31 +00:00
return messageWriter.Close()
2018-01-12 10:13:35 +00:00
}
2017-12-23 06:13:35 +00:00
func streamStore(ctx context.Context, eg *errgroup.Group, apiContext *types.APIContext, schema *types.Schema, result chan map[string]interface{}) {
eg.Go(func() error {
opts := parse.QueryOptions(apiContext, schema)
2017-12-28 15:47:10 +00:00
events, err := schema.Store.Watch(apiContext, schema, &opts)
if err != nil || events == nil {
2018-02-09 20:31:12 +00:00
if err != nil {
logrus.Errorf("failed on subscribe %s: %v", schema.ID, err)
}
2017-12-23 06:13:35 +00:00
return err
}
2018-02-09 20:31:12 +00:00
logrus.Debugf("watching %s", schema.ID)
for e := range events {
result <- e
2017-12-23 06:13:35 +00:00
}
return errors.New("disconnect")
2017-12-23 06:13:35 +00:00
})
}
func matches(items []string, item string) bool {
if len(items) == 0 {
return true
}
return slice.ContainsString(items, item)
}