diff --git a/pkg/stores/partition/parallel.go b/pkg/stores/partition/parallel.go index 4e9f3e5f..49f380c4 100644 --- a/pkg/stores/partition/parallel.go +++ b/pkg/stores/partition/parallel.go @@ -5,9 +5,9 @@ import ( "encoding/base64" "encoding/json" - "github.com/rancher/apiserver/pkg/types" "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" ) // Partition represents a named grouping of kubernetes resources, @@ -33,7 +33,7 @@ type ParallelPartitionLister struct { } // PartitionLister lists objects for one partition. -type PartitionLister func(ctx context.Context, partition Partition, cont string, revision string, limit int) (types.APIObjectList, error) +type PartitionLister func(ctx context.Context, partition Partition, cont string, revision string, limit int) (*unstructured.UnstructuredList, error) // Err returns the latest error encountered. func (p *ParallelPartitionLister) Err() error { @@ -72,7 +72,7 @@ func indexOrZero(partitions []Partition, name string) int { // List returns a stream of objects up to the requested limit. // If the continue token is not empty, it decodes it and returns the stream // starting at the indicated marker. -func (p *ParallelPartitionLister) List(ctx context.Context, limit int, resume string) (<-chan []types.APIObject, error) { +func (p *ParallelPartitionLister) List(ctx context.Context, limit int, resume string) (<-chan []unstructured.Unstructured, error) { var state listState if resume != "" { bytes, err := base64.StdEncoding.DecodeString(resume) @@ -88,7 +88,7 @@ func (p *ParallelPartitionLister) List(ctx context.Context, limit int, resume st } } - result := make(chan []types.APIObject) + result := make(chan []unstructured.Unstructured) go p.feeder(ctx, state, limit, result) return result, nil } @@ -120,7 +120,7 @@ type listState struct { // 100000, the result is truncated and a continue token is generated that // indicates the partition and offset for the client to start on in the next // request. -func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, limit int, result chan []types.APIObject) { +func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, limit int, result chan []unstructured.Unstructured) { var ( sem = semaphore.NewWeighted(p.Concurrency) capacity = limit @@ -183,25 +183,25 @@ func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, l } if state.Revision == "" { - state.Revision = list.Revision + state.Revision = list.GetResourceVersion() } if p.revision == "" { - p.revision = list.Revision + p.revision = list.GetResourceVersion() } // We have already seen the first objects in the list, truncate up to the offset. - if state.PartitionName == partition.Name() && state.Offset > 0 && state.Offset < len(list.Objects) { - list.Objects = list.Objects[state.Offset:] + if state.PartitionName == partition.Name() && state.Offset > 0 && state.Offset < len(list.Items) { + list.Items = list.Items[state.Offset:] } // Case 1: the capacity has been reached across all goroutines but the list is still only partial, // so save the state so that the next page can be requested later. - if len(list.Objects) > capacity { - result <- list.Objects[:capacity] + if len(list.Items) > capacity { + result <- list.Items[:capacity] // save state to redo this list at this offset p.state = &listState{ - Revision: list.Revision, + Revision: list.GetResourceVersion(), PartitionName: partition.Name(), Continue: cont, Offset: capacity, @@ -210,16 +210,16 @@ func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, l capacity = 0 return nil } - result <- list.Objects - capacity -= len(list.Objects) + result <- list.Items + capacity -= len(list.Items) // Case 2: all objects have been returned, we are done. - if list.Continue == "" { + if list.GetContinue() == "" { return nil } // Case 3: we started at an offset and truncated the list to skip the objects up to the offset. // We're not yet up to capacity and have not retrieved every object, // so loop again and get more data. - state.Continue = list.Continue + state.Continue = list.GetContinue() state.PartitionName = partition.Name() state.Offset = 0 } diff --git a/pkg/stores/partition/store.go b/pkg/stores/partition/store.go index 4bb53409..ccacd080 100644 --- a/pkg/stores/partition/store.go +++ b/pkg/stores/partition/store.go @@ -1,12 +1,21 @@ +// Package partition implements a store with parallel partitioning of data +// so that segmented data can be concurrently collected and returned as a single data set. package partition import ( "context" + "fmt" "net/http" + "reflect" "strconv" "github.com/rancher/apiserver/pkg/types" "golang.org/x/sync/errgroup" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" ) const defaultLimit = 100000 @@ -15,7 +24,7 @@ const defaultLimit = 100000 type Partitioner interface { Lookup(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) (Partition, error) All(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) ([]Partition, error) - Store(apiOp *types.APIRequest, partition Partition) (types.Store, error) + Store(apiOp *types.APIRequest, partition Partition) (UnstructuredStore, error) } // Store implements types.Store for partitions. @@ -23,7 +32,17 @@ type Store struct { Partitioner Partitioner } -func (s *Store) getStore(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) (types.Store, error) { +// UnstructuredStore is like types.Store but deals in k8s unstructured objects instead of apiserver types. +type UnstructuredStore interface { + ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, error) + List(apiOp *types.APIRequest, schema *types.APISchema) (*unstructured.UnstructuredList, error) + Create(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject) (*unstructured.Unstructured, error) + Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (*unstructured.Unstructured, error) + Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, error) + Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest) (chan watch.Event, error) +} + +func (s *Store) getStore(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) (UnstructuredStore, error) { p, err := s.Partitioner.Lookup(apiOp, schema, verb, id) if err != nil { return nil, err @@ -39,7 +58,11 @@ func (s *Store) Delete(apiOp *types.APIRequest, schema *types.APISchema, id stri return types.APIObject{}, err } - return target.Delete(apiOp, schema, id) + obj, err := target.Delete(apiOp, schema, id) + if err != nil { + return types.APIObject{}, err + } + return toAPI(schema, obj), nil } // ByID looks up a single object by its ID. @@ -49,14 +72,18 @@ func (s *Store) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string return types.APIObject{}, err } - return target.ByID(apiOp, schema, id) + obj, err := target.ByID(apiOp, schema, id) + if err != nil { + return types.APIObject{}, err + } + return toAPI(schema, obj), nil } func (s *Store) listPartition(ctx context.Context, apiOp *types.APIRequest, schema *types.APISchema, partition Partition, - cont string, revision string, limit int) (types.APIObjectList, error) { + cont string, revision string, limit int) (*unstructured.UnstructuredList, error) { store, err := s.Partitioner.Store(apiOp, partition) if err != nil { - return types.APIObjectList{}, err + return nil, err } req := apiOp.Clone() @@ -88,7 +115,7 @@ func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.AP } lister := ParallelPartitionLister{ - Lister: func(ctx context.Context, partition Partition, cont string, revision string, limit int) (types.APIObjectList, error) { + Lister: func(ctx context.Context, partition Partition, cont string, revision string, limit int) (*unstructured.UnstructuredList, error) { return s.listPartition(ctx, apiOp, schema, partition, cont, revision, limit) }, Concurrency: 3, @@ -104,7 +131,10 @@ func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.AP } for items := range list { - result.Objects = append(result.Objects, items...) + for _, item := range items { + item := item + result.Objects = append(result.Objects, toAPI(schema, &item)) + } } result.Revision = lister.Revision() @@ -119,7 +149,11 @@ func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, data ty return types.APIObject{}, err } - return target.Create(apiOp, schema, data) + obj, err := target.Create(apiOp, schema, data) + if err != nil { + return types.APIObject{}, err + } + return toAPI(schema, obj), nil } // Update updates a single object in the store. @@ -129,7 +163,11 @@ func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, data ty return types.APIObject{}, err } - return target.Update(apiOp, schema, data, id) + obj, err := target.Update(apiOp, schema, data, id) + if err != nil { + return types.APIObject{}, err + } + return toAPI(schema, obj), nil } // Watch returns a channel of events for a list or resource. @@ -159,7 +197,7 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types return err } for i := range c { - response <- i + response <- toAPIEvent(apiOp, schema, i) } return nil }) @@ -189,3 +227,80 @@ func getLimit(req *http.Request) int { } return limit } + +func toAPI(schema *types.APISchema, obj runtime.Object) types.APIObject { + if obj == nil || reflect.ValueOf(obj).IsNil() { + return types.APIObject{} + } + + if unstr, ok := obj.(*unstructured.Unstructured); ok { + obj = moveToUnderscore(unstr) + } + + apiObject := types.APIObject{ + Type: schema.ID, + Object: obj, + } + + m, err := meta.Accessor(obj) + if err != nil { + return apiObject + } + + id := m.GetName() + ns := m.GetNamespace() + if ns != "" { + id = fmt.Sprintf("%s/%s", ns, id) + } + + apiObject.ID = id + return apiObject +} + +func moveToUnderscore(obj *unstructured.Unstructured) *unstructured.Unstructured { + if obj == nil { + return nil + } + + for k := range types.ReservedFields { + v, ok := obj.Object[k] + if ok { + delete(obj.Object, k) + obj.Object["_"+k] = v + } + } + + return obj +} + +func toAPIEvent(apiOp *types.APIRequest, schema *types.APISchema, event watch.Event) types.APIEvent { + name := types.ChangeAPIEvent + switch event.Type { + case watch.Deleted: + name = types.RemoveAPIEvent + case watch.Added: + name = types.CreateAPIEvent + case watch.Error: + name = "resource.error" + } + + apiEvent := types.APIEvent{ + Name: name, + } + + if event.Type == watch.Error { + status, _ := event.Object.(*metav1.Status) + apiEvent.Error = fmt.Errorf(status.Message) + return apiEvent + } + + apiEvent.Object = toAPI(schema, event.Object) + + m, err := meta.Accessor(event.Object) + if err != nil { + return apiEvent + } + + apiEvent.Revision = m.GetResourceVersion() + return apiEvent +} diff --git a/pkg/stores/partition/store_test.go b/pkg/stores/partition/store_test.go index 41fab01e..c08775f6 100644 --- a/pkg/stores/partition/store_test.go +++ b/pkg/stores/partition/store_test.go @@ -12,6 +12,7 @@ import ( "github.com/rancher/wrangler/pkg/schemas" "github.com/stretchr/testify/assert" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/watch" ) func TestList(t *testing.T) { @@ -19,7 +20,7 @@ func TestList(t *testing.T) { name string apiOps []*types.APIRequest partitions []Partition - objects map[string]types.APIObjectList + objects map[string]*unstructured.UnstructuredList want []types.APIObjectList }{ { @@ -32,10 +33,10 @@ func TestList(t *testing.T) { name: "all", }, }, - objects: map[string]types.APIObjectList{ + objects: map[string]*unstructured.UnstructuredList{ "all": { - Objects: []types.APIObject{ - newApple("fuji").toObj(), + Items: []unstructured.Unstructured{ + newApple("fuji").Unstructured, }, }, }, @@ -59,12 +60,12 @@ func TestList(t *testing.T) { name: "all", }, }, - objects: map[string]types.APIObjectList{ + objects: map[string]*unstructured.UnstructuredList{ "all": { - Objects: []types.APIObject{ - newApple("fuji").toObj(), - newApple("granny-smith").toObj(), - newApple("crispin").toObj(), + Items: []unstructured.Unstructured{ + newApple("fuji").Unstructured, + newApple("granny-smith").Unstructured, + newApple("crispin").Unstructured, }, }, }, @@ -101,20 +102,20 @@ func TestList(t *testing.T) { name: "yellow", }, }, - objects: map[string]types.APIObjectList{ + objects: map[string]*unstructured.UnstructuredList{ "pink": { - Objects: []types.APIObject{ - newApple("fuji").toObj(), + Items: []unstructured.Unstructured{ + newApple("fuji").Unstructured, }, }, "green": { - Objects: []types.APIObject{ - newApple("granny-smith").toObj(), + Items: []unstructured.Unstructured{ + newApple("granny-smith").Unstructured, }, }, "yellow": { - Objects: []types.APIObject{ - newApple("crispin").toObj(), + Items: []unstructured.Unstructured{ + newApple("crispin").Unstructured, }, }, }, @@ -148,28 +149,28 @@ func TestList(t *testing.T) { name: "red", }, }, - objects: map[string]types.APIObjectList{ + objects: map[string]*unstructured.UnstructuredList{ "pink": { - Objects: []types.APIObject{ - newApple("fuji").toObj(), - newApple("honeycrisp").toObj(), + Items: []unstructured.Unstructured{ + newApple("fuji").Unstructured, + newApple("honeycrisp").Unstructured, }, }, "green": { - Objects: []types.APIObject{ - newApple("granny-smith").toObj(), - newApple("bramley").toObj(), + Items: []unstructured.Unstructured{ + newApple("granny-smith").Unstructured, + newApple("bramley").Unstructured, }, }, "yellow": { - Objects: []types.APIObject{ - newApple("crispin").toObj(), - newApple("golden-delicious").toObj(), + Items: []unstructured.Unstructured{ + newApple("crispin").Unstructured, + newApple("golden-delicious").Unstructured, }, }, "red": { - Objects: []types.APIObject{ - newApple("red-delicious").toObj(), + Items: []unstructured.Unstructured{ + newApple("red-delicious").Unstructured, }, }, }, @@ -235,7 +236,7 @@ func (m mockPartitioner) All(apiOp *types.APIRequest, schema *types.APISchema, v return m.partitions, nil } -func (m mockPartitioner) Store(apiOp *types.APIRequest, partition Partition) (types.Store, error) { +func (m mockPartitioner) Store(apiOp *types.APIRequest, partition Partition) (UnstructuredStore, error) { return m.stores[partition.Name()], nil } @@ -248,11 +249,11 @@ func (m mockPartition) Name() string { } type mockStore struct { - contents types.APIObjectList + contents *unstructured.UnstructuredList partition mockPartition } -func (m *mockStore) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) { +func (m *mockStore) List(apiOp *types.APIRequest, schema *types.APISchema) (*unstructured.UnstructuredList, error) { query, _ := url.ParseQuery(apiOp.Request.URL.RawQuery) l := query.Get("limit") if l == "" { @@ -261,46 +262,46 @@ func (m *mockStore) List(apiOp *types.APIRequest, schema *types.APISchema) (type i := 0 if c := query.Get("continue"); c != "" { start, _ := base64.StdEncoding.DecodeString(c) - for j, obj := range m.contents.Objects { - if string(start) == obj.Name() { + for j, obj := range m.contents.Items { + if string(start) == obj.GetName() { i = j break } } } lInt, _ := strconv.Atoi(l) - contents := m.contents - if len(contents.Objects) > i+lInt { - contents.Continue = base64.StdEncoding.EncodeToString([]byte(contents.Objects[i+lInt].Name())) + contents := m.contents.DeepCopy() + if len(contents.Items) > i+lInt { + contents.SetContinue(base64.StdEncoding.EncodeToString([]byte(contents.Items[i+lInt].GetName()))) } - if i > len(contents.Objects) { + if i > len(contents.Items) { return contents, nil } - if i+lInt > len(contents.Objects) { - contents.Objects = contents.Objects[i:] + if i+lInt > len(contents.Items) { + contents.Items = contents.Items[i:] return contents, nil } - contents.Objects = contents.Objects[i : i+lInt] + contents.Items = contents.Items[i : i+lInt] return contents, nil } -func (m *mockStore) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) { +func (m *mockStore) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, error) { panic("not implemented") } -func (m *mockStore) Create(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject) (types.APIObject, error) { +func (m *mockStore) Create(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject) (*unstructured.Unstructured, error) { panic("not implemented") } -func (m *mockStore) Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (types.APIObject, error) { +func (m *mockStore) Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (*unstructured.Unstructured, error) { panic("not implemented") } -func (m *mockStore) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) { +func (m *mockStore) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, error) { panic("not implemented") } -func (m *mockStore) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest) (chan types.APIEvent, error) { +func (m *mockStore) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest) (chan watch.Event, error) { panic("not implemented") } diff --git a/pkg/stores/proxy/proxy_store.go b/pkg/stores/proxy/proxy_store.go index 82997400..c3c40c82 100644 --- a/pkg/stores/proxy/proxy_store.go +++ b/pkg/stores/proxy/proxy_store.go @@ -1,3 +1,4 @@ +// Package proxy implements the proxy store, which is responsible for interfacing directly with Kubernetes. package proxy import ( @@ -8,7 +9,6 @@ import ( "io/ioutil" "net/http" "os" - "reflect" "regexp" "strconv" @@ -65,7 +65,7 @@ type RelationshipNotifier interface { OnInboundRelationshipChange(ctx context.Context, schema *types.APISchema, namespace string) <-chan *summary.Relationship } -// Store implements types.Store directly on top of kubernetes. +// Store implements partition.UnstructuredStore directly on top of kubernetes. type Store struct { clientGetter ClientGetter notifier RelationshipNotifier @@ -89,44 +89,14 @@ func NewProxyStore(clientGetter ClientGetter, notifier RelationshipNotifier, loo } // ByID looks up a single object by its ID. -func (s *Store) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) { - result, err := s.byID(apiOp, schema, apiOp.Namespace, id) - return toAPI(schema, result), err +func (s *Store) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, error) { + return s.byID(apiOp, schema, apiOp.Namespace, id) } func decodeParams(apiOp *types.APIRequest, target runtime.Object) error { return paramCodec.DecodeParameters(apiOp.Request.URL.Query(), metav1.SchemeGroupVersion, target) } -func toAPI(schema *types.APISchema, obj runtime.Object) types.APIObject { - if obj == nil || reflect.ValueOf(obj).IsNil() { - return types.APIObject{} - } - - if unstr, ok := obj.(*unstructured.Unstructured); ok { - obj = moveToUnderscore(unstr) - } - - apiObject := types.APIObject{ - Type: schema.ID, - Object: obj, - } - - m, err := meta.Accessor(obj) - if err != nil { - return apiObject - } - - id := m.GetName() - ns := m.GetNamespace() - if ns != "" { - id = fmt.Sprintf("%s/%s", ns, id) - } - - apiObject.ID = id - return apiObject -} - func (s *Store) byID(apiOp *types.APIRequest, schema *types.APISchema, namespace, id string) (*unstructured.Unstructured, error) { k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, namespace)) if err != nil { @@ -158,22 +128,6 @@ func moveFromUnderscore(obj map[string]interface{}) map[string]interface{} { return obj } -func moveToUnderscore(obj *unstructured.Unstructured) *unstructured.Unstructured { - if obj == nil { - return nil - } - - for k := range types.ReservedFields { - v, ok := obj.Object[k] - if ok { - delete(obj.Object, k) - obj.Object["_"+k] = v - } - } - - return obj -} - func rowToObject(obj *unstructured.Unstructured) { if obj == nil { return @@ -230,77 +184,70 @@ func tableToObjects(obj map[string]interface{}) []unstructured.Unstructured { // to list *all* resources. // With this filter, the request can be performed successfully, and only the allowed resources will // be returned in the list. -func (s *Store) ByNames(apiOp *types.APIRequest, schema *types.APISchema, names sets.String) (types.APIObjectList, error) { +func (s *Store) ByNames(apiOp *types.APIRequest, schema *types.APISchema, names sets.String) (*unstructured.UnstructuredList, error) { if apiOp.Namespace == "*" { // This happens when you grant namespaced objects with "get" by name in a clusterrolebinding. We will treat // this as an invalid situation instead of listing all objects in the cluster and filtering by name. - return types.APIObjectList{}, nil + return nil, nil } adminClient, err := s.clientGetter.TableAdminClient(apiOp, schema, apiOp.Namespace) if err != nil { - return types.APIObjectList{}, err + return nil, err } objs, err := s.list(apiOp, schema, adminClient) if err != nil { - return types.APIObjectList{}, err + return nil, err } - var filtered []types.APIObject - for _, obj := range objs.Objects { - if names.Has(obj.Name()) { + var filtered []unstructured.Unstructured + for _, obj := range objs.Items { + if names.Has(obj.GetName()) { filtered = append(filtered, obj) } } - objs.Objects = filtered + objs.Items = filtered return objs, nil } -// List returns a list of resources. -func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) { +// List returns an unstructured list of resources. +func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (*unstructured.UnstructuredList, error) { client, err := s.clientGetter.TableClient(apiOp, schema, apiOp.Namespace) if err != nil { - return types.APIObjectList{}, err + return nil, err } return s.list(apiOp, schema, client) } -func (s *Store) list(apiOp *types.APIRequest, schema *types.APISchema, client dynamic.ResourceInterface) (types.APIObjectList, error) { +func (s *Store) list(apiOp *types.APIRequest, schema *types.APISchema, client dynamic.ResourceInterface) (*unstructured.UnstructuredList, error) { opts := metav1.ListOptions{} if err := decodeParams(apiOp, &opts); err != nil { - return types.APIObjectList{}, nil + return nil, nil } k8sClient, _ := metricsStore.Wrap(client, nil) resultList, err := k8sClient.List(apiOp, opts) if err != nil { - return types.APIObjectList{}, err + return nil, err } tableToList(resultList) - result := types.APIObjectList{ - Revision: resultList.GetResourceVersion(), - Continue: resultList.GetContinue(), - } - - for i := range resultList.Items { - result.Objects = append(result.Objects, toAPI(schema, &resultList.Items[i])) - } - - return result, nil + return resultList, nil } -func returnErr(err error, c chan types.APIEvent) { - c <- types.APIEvent{ - Name: "resource.error", - Error: err, +func returnErr(err error, c chan watch.Event) { + c <- watch.Event{ + Type: "resource.error", + Object: &metav1.Status{ + Message: err.Error(), + }, } } -func (s *Store) listAndWatch(apiOp *types.APIRequest, client dynamic.ResourceInterface, schema *types.APISchema, w types.WatchRequest, result chan types.APIEvent) { +func (s *Store) listAndWatch(apiOp *types.APIRequest, client dynamic.ResourceInterface, schema *types.APISchema, w types.WatchRequest, result chan watch.Event) { rev := w.Revision if rev == "-1" || rev == "0" { rev = "" @@ -342,7 +289,8 @@ func (s *Store) listAndWatch(apiOp *types.APIRequest, client dynamic.ResourceInt for rel := range s.notifier.OnInboundRelationshipChange(ctx, schema, apiOp.Namespace) { obj, err := s.byID(apiOp, schema, rel.Namespace, rel.Name) if err == nil { - result <- s.toAPIEvent(apiOp, schema, watch.Modified, obj) + rowToObject(obj) + result <- watch.Event{Type: watch.Modified, Object: obj} } else { logrus.Debugf("notifier watch error: %v", err) returnErr(errors.Wrapf(err, "notifier watch error: %v", err), result) @@ -363,7 +311,10 @@ func (s *Store) listAndWatch(apiOp *types.APIRequest, client dynamic.ResourceInt } continue } - result <- s.toAPIEvent(apiOp, schema, event.Type, event.Object) + if unstr, ok := event.Object.(*unstructured.Unstructured); ok { + rowToObject(unstr) + } + result <- event } return fmt.Errorf("closed") }) @@ -378,7 +329,7 @@ func (s *Store) listAndWatch(apiOp *types.APIRequest, client dynamic.ResourceInt // to list *all* resources. // With this filter, the request can be performed successfully, and only the allowed resources will // be returned in watch. -func (s *Store) WatchNames(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest, names sets.String) (chan types.APIEvent, error) { +func (s *Store) WatchNames(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest, names sets.String) (chan watch.Event, error) { adminClient, err := s.clientGetter.TableAdminClientForWatch(apiOp, schema, apiOp.Namespace) if err != nil { return nil, err @@ -388,11 +339,16 @@ func (s *Store) WatchNames(apiOp *types.APIRequest, schema *types.APISchema, w t return nil, err } - result := make(chan types.APIEvent) + result := make(chan watch.Event) go func() { defer close(result) for item := range c { - if item.Error == nil && names.Has(item.Object.Name()) { + + m, err := meta.Accessor(item.Object) + if err != nil { + return + } + if item.Type != watch.Error && names.Has(m.GetName()) { result <- item } } @@ -402,7 +358,7 @@ func (s *Store) WatchNames(apiOp *types.APIRequest, schema *types.APISchema, w t } // Watch returns a channel of events for a list or resource. -func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest) (chan types.APIEvent, error) { +func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest) (chan watch.Event, error) { client, err := s.clientGetter.TableClientForWatch(apiOp, schema, apiOp.Namespace) if err != nil { return nil, err @@ -410,8 +366,8 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types. return s.watch(apiOp, schema, w, client) } -func (s *Store) watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest, client dynamic.ResourceInterface) (chan types.APIEvent, error) { - result := make(chan types.APIEvent) +func (s *Store) watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest, client dynamic.ResourceInterface) (chan watch.Event, error) { + result := make(chan watch.Event) go func() { s.listAndWatch(apiOp, client, schema, w, result) logrus.Debugf("closing watcher for %s", schema.ID) @@ -420,35 +376,8 @@ func (s *Store) watch(apiOp *types.APIRequest, schema *types.APISchema, w types. return result, nil } -func (s *Store) toAPIEvent(apiOp *types.APIRequest, schema *types.APISchema, et watch.EventType, obj runtime.Object) types.APIEvent { - name := types.ChangeAPIEvent - switch et { - case watch.Deleted: - name = types.RemoveAPIEvent - case watch.Added: - name = types.CreateAPIEvent - } - - if unstr, ok := obj.(*unstructured.Unstructured); ok { - rowToObject(unstr) - } - - event := types.APIEvent{ - Name: name, - Object: toAPI(schema, obj), - } - - m, err := meta.Accessor(obj) - if err != nil { - return event - } - - event.Revision = m.GetResourceVersion() - return event -} - // Create creates a single object in the store. -func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, params types.APIObject) (types.APIObject, error) { +func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, params types.APIObject) (*unstructured.Unstructured, error) { var ( resp *unstructured.Unstructured ) @@ -474,22 +403,21 @@ func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, params k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, ns)) if err != nil { - return types.APIObject{}, err + return nil, err } opts := metav1.CreateOptions{} if err := decodeParams(apiOp, &opts); err != nil { - return types.APIObject{}, err + return nil, err } resp, err = k8sClient.Create(apiOp, &unstructured.Unstructured{Object: input}, opts) rowToObject(resp) - apiObject := toAPI(schema, resp) - return apiObject, err + return resp, err } // Update updates a single object in the store. -func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, params types.APIObject, id string) (types.APIObject, error) { +func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, params types.APIObject, id string) (*unstructured.Unstructured, error) { var ( err error input = params.Data() @@ -498,13 +426,13 @@ func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, params ns := types.Namespace(input) k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, ns)) if err != nil { - return types.APIObject{}, err + return nil, err } if apiOp.Method == http.MethodPatch { bytes, err := ioutil.ReadAll(io.LimitReader(apiOp.Request.Body, 2<<20)) if err != nil { - return types.APIObject{}, err + return nil, err } pType := apitypes.StrategicMergePatchType @@ -514,70 +442,70 @@ func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, params opts := metav1.PatchOptions{} if err := decodeParams(apiOp, &opts); err != nil { - return types.APIObject{}, err + return nil, err } if pType == apitypes.StrategicMergePatchType { data := map[string]interface{}{} if err := json.Unmarshal(bytes, &data); err != nil { - return types.APIObject{}, err + return nil, err } data = moveFromUnderscore(data) bytes, err = json.Marshal(data) if err != nil { - return types.APIObject{}, err + return nil, err } } resp, err := k8sClient.Patch(apiOp, id, pType, bytes, opts) if err != nil { - return types.APIObject{}, err + return nil, err } - return toAPI(schema, resp), nil + return resp, nil } resourceVersion := input.String("metadata", "resourceVersion") if resourceVersion == "" { - return types.APIObject{}, fmt.Errorf("metadata.resourceVersion is required for update") + return nil, fmt.Errorf("metadata.resourceVersion is required for update") } opts := metav1.UpdateOptions{} if err := decodeParams(apiOp, &opts); err != nil { - return types.APIObject{}, err + return nil, err } resp, err := k8sClient.Update(apiOp, &unstructured.Unstructured{Object: moveFromUnderscore(input)}, metav1.UpdateOptions{}) if err != nil { - return types.APIObject{}, err + return nil, err } rowToObject(resp) - return toAPI(schema, resp), nil + return resp, nil } // Delete deletes an object from a store. -func (s *Store) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) { +func (s *Store) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, error) { opts := metav1.DeleteOptions{} if err := decodeParams(apiOp, &opts); err != nil { - return types.APIObject{}, nil + return nil, nil } k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, apiOp.Namespace)) if err != nil { - return types.APIObject{}, err + return nil, err } if err := k8sClient.Delete(apiOp, id, opts); err != nil { - return types.APIObject{}, err + return nil, err } obj, err := s.byID(apiOp, schema, apiOp.Namespace, id) if err != nil { // ignore lookup error - return types.APIObject{}, validation.ErrorCode{ + return nil, validation.ErrorCode{ Status: http.StatusNoContent, } } - return toAPI(schema, obj), nil + return obj, nil } diff --git a/pkg/stores/proxy/rbac_store.go b/pkg/stores/proxy/rbac_store.go index 15625603..f67089d7 100644 --- a/pkg/stores/proxy/rbac_store.go +++ b/pkg/stores/proxy/rbac_store.go @@ -9,7 +9,9 @@ import ( "github.com/rancher/steve/pkg/attributes" "github.com/rancher/steve/pkg/stores/partition" "github.com/rancher/wrangler/pkg/kv" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/watch" ) var ( @@ -85,8 +87,8 @@ func (p *rbacPartitioner) All(apiOp *types.APIRequest, schema *types.APISchema, } } -// Store returns a proxy Store suited to listing and watching resources by partition. -func (p *rbacPartitioner) Store(apiOp *types.APIRequest, partition partition.Partition) (types.Store, error) { +// Store returns an UnstructuredStore suited to listing and watching resources by partition. +func (p *rbacPartitioner) Store(apiOp *types.APIRequest, partition partition.Partition) (partition.UnstructuredStore, error) { return &byNameOrNamespaceStore{ Store: p.proxyStore, partition: partition.(Partition), @@ -99,7 +101,7 @@ type byNameOrNamespaceStore struct { } // List returns a list of resources by partition. -func (b *byNameOrNamespaceStore) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) { +func (b *byNameOrNamespaceStore) List(apiOp *types.APIRequest, schema *types.APISchema) (*unstructured.UnstructuredList, error) { if b.partition.Passthrough { return b.Store.List(apiOp, schema) } @@ -112,7 +114,7 @@ func (b *byNameOrNamespaceStore) List(apiOp *types.APIRequest, schema *types.API } // Watch returns a channel of resources by partition. -func (b *byNameOrNamespaceStore) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types.WatchRequest) (chan types.APIEvent, error) { +func (b *byNameOrNamespaceStore) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types.WatchRequest) (chan watch.Event, error) { if b.partition.Passthrough { return b.Store.Watch(apiOp, schema, wr) }