mirror of
https://github.com/rancher/steve.git
synced 2025-09-25 06:42:35 +00:00
Refactor proxy store
Filtering and sorting needs to operate on unstructured data. It also needs to operate after the parallel partitioner, higher in the store stack. This means that the proxy Store needs to return raw, unstructured data up to the partitioner. This change moves all conversions from unstructured Kubernetes types to apiserver objects up from the proxy store into the partitioner.
This commit is contained in:
@@ -5,9 +5,9 @@ import (
|
|||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
|
||||||
"github.com/rancher/apiserver/pkg/types"
|
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
"golang.org/x/sync/semaphore"
|
"golang.org/x/sync/semaphore"
|
||||||
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Partition represents a named grouping of kubernetes resources,
|
// Partition represents a named grouping of kubernetes resources,
|
||||||
@@ -33,7 +33,7 @@ type ParallelPartitionLister struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// PartitionLister lists objects for one partition.
|
// PartitionLister lists objects for one partition.
|
||||||
type PartitionLister func(ctx context.Context, partition Partition, cont string, revision string, limit int) (types.APIObjectList, error)
|
type PartitionLister func(ctx context.Context, partition Partition, cont string, revision string, limit int) (*unstructured.UnstructuredList, error)
|
||||||
|
|
||||||
// Err returns the latest error encountered.
|
// Err returns the latest error encountered.
|
||||||
func (p *ParallelPartitionLister) Err() error {
|
func (p *ParallelPartitionLister) Err() error {
|
||||||
@@ -72,7 +72,7 @@ func indexOrZero(partitions []Partition, name string) int {
|
|||||||
// List returns a stream of objects up to the requested limit.
|
// List returns a stream of objects up to the requested limit.
|
||||||
// If the continue token is not empty, it decodes it and returns the stream
|
// If the continue token is not empty, it decodes it and returns the stream
|
||||||
// starting at the indicated marker.
|
// starting at the indicated marker.
|
||||||
func (p *ParallelPartitionLister) List(ctx context.Context, limit int, resume string) (<-chan []types.APIObject, error) {
|
func (p *ParallelPartitionLister) List(ctx context.Context, limit int, resume string) (<-chan []unstructured.Unstructured, error) {
|
||||||
var state listState
|
var state listState
|
||||||
if resume != "" {
|
if resume != "" {
|
||||||
bytes, err := base64.StdEncoding.DecodeString(resume)
|
bytes, err := base64.StdEncoding.DecodeString(resume)
|
||||||
@@ -88,7 +88,7 @@ func (p *ParallelPartitionLister) List(ctx context.Context, limit int, resume st
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
result := make(chan []types.APIObject)
|
result := make(chan []unstructured.Unstructured)
|
||||||
go p.feeder(ctx, state, limit, result)
|
go p.feeder(ctx, state, limit, result)
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
@@ -120,7 +120,7 @@ type listState struct {
|
|||||||
// 100000, the result is truncated and a continue token is generated that
|
// 100000, the result is truncated and a continue token is generated that
|
||||||
// indicates the partition and offset for the client to start on in the next
|
// indicates the partition and offset for the client to start on in the next
|
||||||
// request.
|
// request.
|
||||||
func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, limit int, result chan []types.APIObject) {
|
func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, limit int, result chan []unstructured.Unstructured) {
|
||||||
var (
|
var (
|
||||||
sem = semaphore.NewWeighted(p.Concurrency)
|
sem = semaphore.NewWeighted(p.Concurrency)
|
||||||
capacity = limit
|
capacity = limit
|
||||||
@@ -183,25 +183,25 @@ func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, l
|
|||||||
}
|
}
|
||||||
|
|
||||||
if state.Revision == "" {
|
if state.Revision == "" {
|
||||||
state.Revision = list.Revision
|
state.Revision = list.GetResourceVersion()
|
||||||
}
|
}
|
||||||
|
|
||||||
if p.revision == "" {
|
if p.revision == "" {
|
||||||
p.revision = list.Revision
|
p.revision = list.GetResourceVersion()
|
||||||
}
|
}
|
||||||
|
|
||||||
// We have already seen the first objects in the list, truncate up to the offset.
|
// We have already seen the first objects in the list, truncate up to the offset.
|
||||||
if state.PartitionName == partition.Name() && state.Offset > 0 && state.Offset < len(list.Objects) {
|
if state.PartitionName == partition.Name() && state.Offset > 0 && state.Offset < len(list.Items) {
|
||||||
list.Objects = list.Objects[state.Offset:]
|
list.Items = list.Items[state.Offset:]
|
||||||
}
|
}
|
||||||
|
|
||||||
// Case 1: the capacity has been reached across all goroutines but the list is still only partial,
|
// Case 1: the capacity has been reached across all goroutines but the list is still only partial,
|
||||||
// so save the state so that the next page can be requested later.
|
// so save the state so that the next page can be requested later.
|
||||||
if len(list.Objects) > capacity {
|
if len(list.Items) > capacity {
|
||||||
result <- list.Objects[:capacity]
|
result <- list.Items[:capacity]
|
||||||
// save state to redo this list at this offset
|
// save state to redo this list at this offset
|
||||||
p.state = &listState{
|
p.state = &listState{
|
||||||
Revision: list.Revision,
|
Revision: list.GetResourceVersion(),
|
||||||
PartitionName: partition.Name(),
|
PartitionName: partition.Name(),
|
||||||
Continue: cont,
|
Continue: cont,
|
||||||
Offset: capacity,
|
Offset: capacity,
|
||||||
@@ -210,16 +210,16 @@ func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, l
|
|||||||
capacity = 0
|
capacity = 0
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
result <- list.Objects
|
result <- list.Items
|
||||||
capacity -= len(list.Objects)
|
capacity -= len(list.Items)
|
||||||
// Case 2: all objects have been returned, we are done.
|
// Case 2: all objects have been returned, we are done.
|
||||||
if list.Continue == "" {
|
if list.GetContinue() == "" {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
// Case 3: we started at an offset and truncated the list to skip the objects up to the offset.
|
// Case 3: we started at an offset and truncated the list to skip the objects up to the offset.
|
||||||
// We're not yet up to capacity and have not retrieved every object,
|
// We're not yet up to capacity and have not retrieved every object,
|
||||||
// so loop again and get more data.
|
// so loop again and get more data.
|
||||||
state.Continue = list.Continue
|
state.Continue = list.GetContinue()
|
||||||
state.PartitionName = partition.Name()
|
state.PartitionName = partition.Name()
|
||||||
state.Offset = 0
|
state.Offset = 0
|
||||||
}
|
}
|
||||||
|
@@ -1,12 +1,21 @@
|
|||||||
|
// Package partition implements a store with parallel partitioning of data
|
||||||
|
// so that segmented data can be concurrently collected and returned as a single data set.
|
||||||
package partition
|
package partition
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"reflect"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
"github.com/rancher/apiserver/pkg/types"
|
"github.com/rancher/apiserver/pkg/types"
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
|
"k8s.io/apimachinery/pkg/api/meta"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
)
|
)
|
||||||
|
|
||||||
const defaultLimit = 100000
|
const defaultLimit = 100000
|
||||||
@@ -15,7 +24,7 @@ const defaultLimit = 100000
|
|||||||
type Partitioner interface {
|
type Partitioner interface {
|
||||||
Lookup(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) (Partition, error)
|
Lookup(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) (Partition, error)
|
||||||
All(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) ([]Partition, error)
|
All(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) ([]Partition, error)
|
||||||
Store(apiOp *types.APIRequest, partition Partition) (types.Store, error)
|
Store(apiOp *types.APIRequest, partition Partition) (UnstructuredStore, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store implements types.Store for partitions.
|
// Store implements types.Store for partitions.
|
||||||
@@ -23,7 +32,17 @@ type Store struct {
|
|||||||
Partitioner Partitioner
|
Partitioner Partitioner
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Store) getStore(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) (types.Store, error) {
|
// UnstructuredStore is like types.Store but deals in k8s unstructured objects instead of apiserver types.
|
||||||
|
type UnstructuredStore interface {
|
||||||
|
ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, error)
|
||||||
|
List(apiOp *types.APIRequest, schema *types.APISchema) (*unstructured.UnstructuredList, error)
|
||||||
|
Create(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject) (*unstructured.Unstructured, error)
|
||||||
|
Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (*unstructured.Unstructured, error)
|
||||||
|
Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, error)
|
||||||
|
Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest) (chan watch.Event, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) getStore(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) (UnstructuredStore, error) {
|
||||||
p, err := s.Partitioner.Lookup(apiOp, schema, verb, id)
|
p, err := s.Partitioner.Lookup(apiOp, schema, verb, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -39,7 +58,11 @@ func (s *Store) Delete(apiOp *types.APIRequest, schema *types.APISchema, id stri
|
|||||||
return types.APIObject{}, err
|
return types.APIObject{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return target.Delete(apiOp, schema, id)
|
obj, err := target.Delete(apiOp, schema, id)
|
||||||
|
if err != nil {
|
||||||
|
return types.APIObject{}, err
|
||||||
|
}
|
||||||
|
return toAPI(schema, obj), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ByID looks up a single object by its ID.
|
// ByID looks up a single object by its ID.
|
||||||
@@ -49,14 +72,18 @@ func (s *Store) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string
|
|||||||
return types.APIObject{}, err
|
return types.APIObject{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return target.ByID(apiOp, schema, id)
|
obj, err := target.ByID(apiOp, schema, id)
|
||||||
|
if err != nil {
|
||||||
|
return types.APIObject{}, err
|
||||||
|
}
|
||||||
|
return toAPI(schema, obj), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Store) listPartition(ctx context.Context, apiOp *types.APIRequest, schema *types.APISchema, partition Partition,
|
func (s *Store) listPartition(ctx context.Context, apiOp *types.APIRequest, schema *types.APISchema, partition Partition,
|
||||||
cont string, revision string, limit int) (types.APIObjectList, error) {
|
cont string, revision string, limit int) (*unstructured.UnstructuredList, error) {
|
||||||
store, err := s.Partitioner.Store(apiOp, partition)
|
store, err := s.Partitioner.Store(apiOp, partition)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return types.APIObjectList{}, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
req := apiOp.Clone()
|
req := apiOp.Clone()
|
||||||
@@ -88,7 +115,7 @@ func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.AP
|
|||||||
}
|
}
|
||||||
|
|
||||||
lister := ParallelPartitionLister{
|
lister := ParallelPartitionLister{
|
||||||
Lister: func(ctx context.Context, partition Partition, cont string, revision string, limit int) (types.APIObjectList, error) {
|
Lister: func(ctx context.Context, partition Partition, cont string, revision string, limit int) (*unstructured.UnstructuredList, error) {
|
||||||
return s.listPartition(ctx, apiOp, schema, partition, cont, revision, limit)
|
return s.listPartition(ctx, apiOp, schema, partition, cont, revision, limit)
|
||||||
},
|
},
|
||||||
Concurrency: 3,
|
Concurrency: 3,
|
||||||
@@ -104,7 +131,10 @@ func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.AP
|
|||||||
}
|
}
|
||||||
|
|
||||||
for items := range list {
|
for items := range list {
|
||||||
result.Objects = append(result.Objects, items...)
|
for _, item := range items {
|
||||||
|
item := item
|
||||||
|
result.Objects = append(result.Objects, toAPI(schema, &item))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
result.Revision = lister.Revision()
|
result.Revision = lister.Revision()
|
||||||
@@ -119,7 +149,11 @@ func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, data ty
|
|||||||
return types.APIObject{}, err
|
return types.APIObject{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return target.Create(apiOp, schema, data)
|
obj, err := target.Create(apiOp, schema, data)
|
||||||
|
if err != nil {
|
||||||
|
return types.APIObject{}, err
|
||||||
|
}
|
||||||
|
return toAPI(schema, obj), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update updates a single object in the store.
|
// Update updates a single object in the store.
|
||||||
@@ -129,7 +163,11 @@ func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, data ty
|
|||||||
return types.APIObject{}, err
|
return types.APIObject{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return target.Update(apiOp, schema, data, id)
|
obj, err := target.Update(apiOp, schema, data, id)
|
||||||
|
if err != nil {
|
||||||
|
return types.APIObject{}, err
|
||||||
|
}
|
||||||
|
return toAPI(schema, obj), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Watch returns a channel of events for a list or resource.
|
// Watch returns a channel of events for a list or resource.
|
||||||
@@ -159,7 +197,7 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
for i := range c {
|
for i := range c {
|
||||||
response <- i
|
response <- toAPIEvent(apiOp, schema, i)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
@@ -189,3 +227,80 @@ func getLimit(req *http.Request) int {
|
|||||||
}
|
}
|
||||||
return limit
|
return limit
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func toAPI(schema *types.APISchema, obj runtime.Object) types.APIObject {
|
||||||
|
if obj == nil || reflect.ValueOf(obj).IsNil() {
|
||||||
|
return types.APIObject{}
|
||||||
|
}
|
||||||
|
|
||||||
|
if unstr, ok := obj.(*unstructured.Unstructured); ok {
|
||||||
|
obj = moveToUnderscore(unstr)
|
||||||
|
}
|
||||||
|
|
||||||
|
apiObject := types.APIObject{
|
||||||
|
Type: schema.ID,
|
||||||
|
Object: obj,
|
||||||
|
}
|
||||||
|
|
||||||
|
m, err := meta.Accessor(obj)
|
||||||
|
if err != nil {
|
||||||
|
return apiObject
|
||||||
|
}
|
||||||
|
|
||||||
|
id := m.GetName()
|
||||||
|
ns := m.GetNamespace()
|
||||||
|
if ns != "" {
|
||||||
|
id = fmt.Sprintf("%s/%s", ns, id)
|
||||||
|
}
|
||||||
|
|
||||||
|
apiObject.ID = id
|
||||||
|
return apiObject
|
||||||
|
}
|
||||||
|
|
||||||
|
func moveToUnderscore(obj *unstructured.Unstructured) *unstructured.Unstructured {
|
||||||
|
if obj == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
for k := range types.ReservedFields {
|
||||||
|
v, ok := obj.Object[k]
|
||||||
|
if ok {
|
||||||
|
delete(obj.Object, k)
|
||||||
|
obj.Object["_"+k] = v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return obj
|
||||||
|
}
|
||||||
|
|
||||||
|
func toAPIEvent(apiOp *types.APIRequest, schema *types.APISchema, event watch.Event) types.APIEvent {
|
||||||
|
name := types.ChangeAPIEvent
|
||||||
|
switch event.Type {
|
||||||
|
case watch.Deleted:
|
||||||
|
name = types.RemoveAPIEvent
|
||||||
|
case watch.Added:
|
||||||
|
name = types.CreateAPIEvent
|
||||||
|
case watch.Error:
|
||||||
|
name = "resource.error"
|
||||||
|
}
|
||||||
|
|
||||||
|
apiEvent := types.APIEvent{
|
||||||
|
Name: name,
|
||||||
|
}
|
||||||
|
|
||||||
|
if event.Type == watch.Error {
|
||||||
|
status, _ := event.Object.(*metav1.Status)
|
||||||
|
apiEvent.Error = fmt.Errorf(status.Message)
|
||||||
|
return apiEvent
|
||||||
|
}
|
||||||
|
|
||||||
|
apiEvent.Object = toAPI(schema, event.Object)
|
||||||
|
|
||||||
|
m, err := meta.Accessor(event.Object)
|
||||||
|
if err != nil {
|
||||||
|
return apiEvent
|
||||||
|
}
|
||||||
|
|
||||||
|
apiEvent.Revision = m.GetResourceVersion()
|
||||||
|
return apiEvent
|
||||||
|
}
|
||||||
|
@@ -12,6 +12,7 @@ import (
|
|||||||
"github.com/rancher/wrangler/pkg/schemas"
|
"github.com/rancher/wrangler/pkg/schemas"
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||||
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestList(t *testing.T) {
|
func TestList(t *testing.T) {
|
||||||
@@ -19,7 +20,7 @@ func TestList(t *testing.T) {
|
|||||||
name string
|
name string
|
||||||
apiOps []*types.APIRequest
|
apiOps []*types.APIRequest
|
||||||
partitions []Partition
|
partitions []Partition
|
||||||
objects map[string]types.APIObjectList
|
objects map[string]*unstructured.UnstructuredList
|
||||||
want []types.APIObjectList
|
want []types.APIObjectList
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
@@ -32,10 +33,10 @@ func TestList(t *testing.T) {
|
|||||||
name: "all",
|
name: "all",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
objects: map[string]types.APIObjectList{
|
objects: map[string]*unstructured.UnstructuredList{
|
||||||
"all": {
|
"all": {
|
||||||
Objects: []types.APIObject{
|
Items: []unstructured.Unstructured{
|
||||||
newApple("fuji").toObj(),
|
newApple("fuji").Unstructured,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@@ -59,12 +60,12 @@ func TestList(t *testing.T) {
|
|||||||
name: "all",
|
name: "all",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
objects: map[string]types.APIObjectList{
|
objects: map[string]*unstructured.UnstructuredList{
|
||||||
"all": {
|
"all": {
|
||||||
Objects: []types.APIObject{
|
Items: []unstructured.Unstructured{
|
||||||
newApple("fuji").toObj(),
|
newApple("fuji").Unstructured,
|
||||||
newApple("granny-smith").toObj(),
|
newApple("granny-smith").Unstructured,
|
||||||
newApple("crispin").toObj(),
|
newApple("crispin").Unstructured,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@@ -101,20 +102,20 @@ func TestList(t *testing.T) {
|
|||||||
name: "yellow",
|
name: "yellow",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
objects: map[string]types.APIObjectList{
|
objects: map[string]*unstructured.UnstructuredList{
|
||||||
"pink": {
|
"pink": {
|
||||||
Objects: []types.APIObject{
|
Items: []unstructured.Unstructured{
|
||||||
newApple("fuji").toObj(),
|
newApple("fuji").Unstructured,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
"green": {
|
"green": {
|
||||||
Objects: []types.APIObject{
|
Items: []unstructured.Unstructured{
|
||||||
newApple("granny-smith").toObj(),
|
newApple("granny-smith").Unstructured,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
"yellow": {
|
"yellow": {
|
||||||
Objects: []types.APIObject{
|
Items: []unstructured.Unstructured{
|
||||||
newApple("crispin").toObj(),
|
newApple("crispin").Unstructured,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@@ -148,28 +149,28 @@ func TestList(t *testing.T) {
|
|||||||
name: "red",
|
name: "red",
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
objects: map[string]types.APIObjectList{
|
objects: map[string]*unstructured.UnstructuredList{
|
||||||
"pink": {
|
"pink": {
|
||||||
Objects: []types.APIObject{
|
Items: []unstructured.Unstructured{
|
||||||
newApple("fuji").toObj(),
|
newApple("fuji").Unstructured,
|
||||||
newApple("honeycrisp").toObj(),
|
newApple("honeycrisp").Unstructured,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
"green": {
|
"green": {
|
||||||
Objects: []types.APIObject{
|
Items: []unstructured.Unstructured{
|
||||||
newApple("granny-smith").toObj(),
|
newApple("granny-smith").Unstructured,
|
||||||
newApple("bramley").toObj(),
|
newApple("bramley").Unstructured,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
"yellow": {
|
"yellow": {
|
||||||
Objects: []types.APIObject{
|
Items: []unstructured.Unstructured{
|
||||||
newApple("crispin").toObj(),
|
newApple("crispin").Unstructured,
|
||||||
newApple("golden-delicious").toObj(),
|
newApple("golden-delicious").Unstructured,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
"red": {
|
"red": {
|
||||||
Objects: []types.APIObject{
|
Items: []unstructured.Unstructured{
|
||||||
newApple("red-delicious").toObj(),
|
newApple("red-delicious").Unstructured,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@@ -235,7 +236,7 @@ func (m mockPartitioner) All(apiOp *types.APIRequest, schema *types.APISchema, v
|
|||||||
return m.partitions, nil
|
return m.partitions, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m mockPartitioner) Store(apiOp *types.APIRequest, partition Partition) (types.Store, error) {
|
func (m mockPartitioner) Store(apiOp *types.APIRequest, partition Partition) (UnstructuredStore, error) {
|
||||||
return m.stores[partition.Name()], nil
|
return m.stores[partition.Name()], nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -248,11 +249,11 @@ func (m mockPartition) Name() string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type mockStore struct {
|
type mockStore struct {
|
||||||
contents types.APIObjectList
|
contents *unstructured.UnstructuredList
|
||||||
partition mockPartition
|
partition mockPartition
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *mockStore) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) {
|
func (m *mockStore) List(apiOp *types.APIRequest, schema *types.APISchema) (*unstructured.UnstructuredList, error) {
|
||||||
query, _ := url.ParseQuery(apiOp.Request.URL.RawQuery)
|
query, _ := url.ParseQuery(apiOp.Request.URL.RawQuery)
|
||||||
l := query.Get("limit")
|
l := query.Get("limit")
|
||||||
if l == "" {
|
if l == "" {
|
||||||
@@ -261,46 +262,46 @@ func (m *mockStore) List(apiOp *types.APIRequest, schema *types.APISchema) (type
|
|||||||
i := 0
|
i := 0
|
||||||
if c := query.Get("continue"); c != "" {
|
if c := query.Get("continue"); c != "" {
|
||||||
start, _ := base64.StdEncoding.DecodeString(c)
|
start, _ := base64.StdEncoding.DecodeString(c)
|
||||||
for j, obj := range m.contents.Objects {
|
for j, obj := range m.contents.Items {
|
||||||
if string(start) == obj.Name() {
|
if string(start) == obj.GetName() {
|
||||||
i = j
|
i = j
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
lInt, _ := strconv.Atoi(l)
|
lInt, _ := strconv.Atoi(l)
|
||||||
contents := m.contents
|
contents := m.contents.DeepCopy()
|
||||||
if len(contents.Objects) > i+lInt {
|
if len(contents.Items) > i+lInt {
|
||||||
contents.Continue = base64.StdEncoding.EncodeToString([]byte(contents.Objects[i+lInt].Name()))
|
contents.SetContinue(base64.StdEncoding.EncodeToString([]byte(contents.Items[i+lInt].GetName())))
|
||||||
}
|
}
|
||||||
if i > len(contents.Objects) {
|
if i > len(contents.Items) {
|
||||||
return contents, nil
|
return contents, nil
|
||||||
}
|
}
|
||||||
if i+lInt > len(contents.Objects) {
|
if i+lInt > len(contents.Items) {
|
||||||
contents.Objects = contents.Objects[i:]
|
contents.Items = contents.Items[i:]
|
||||||
return contents, nil
|
return contents, nil
|
||||||
}
|
}
|
||||||
contents.Objects = contents.Objects[i : i+lInt]
|
contents.Items = contents.Items[i : i+lInt]
|
||||||
return contents, nil
|
return contents, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *mockStore) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) {
|
func (m *mockStore) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, error) {
|
||||||
panic("not implemented")
|
panic("not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *mockStore) Create(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject) (types.APIObject, error) {
|
func (m *mockStore) Create(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject) (*unstructured.Unstructured, error) {
|
||||||
panic("not implemented")
|
panic("not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *mockStore) Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (types.APIObject, error) {
|
func (m *mockStore) Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (*unstructured.Unstructured, error) {
|
||||||
panic("not implemented")
|
panic("not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *mockStore) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) {
|
func (m *mockStore) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, error) {
|
||||||
panic("not implemented")
|
panic("not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *mockStore) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest) (chan types.APIEvent, error) {
|
func (m *mockStore) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest) (chan watch.Event, error) {
|
||||||
panic("not implemented")
|
panic("not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -1,3 +1,4 @@
|
|||||||
|
// Package proxy implements the proxy store, which is responsible for interfacing directly with Kubernetes.
|
||||||
package proxy
|
package proxy
|
||||||
|
|
||||||
import (
|
import (
|
||||||
@@ -8,7 +9,6 @@ import (
|
|||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"reflect"
|
|
||||||
"regexp"
|
"regexp"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
@@ -65,7 +65,7 @@ type RelationshipNotifier interface {
|
|||||||
OnInboundRelationshipChange(ctx context.Context, schema *types.APISchema, namespace string) <-chan *summary.Relationship
|
OnInboundRelationshipChange(ctx context.Context, schema *types.APISchema, namespace string) <-chan *summary.Relationship
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store implements types.Store directly on top of kubernetes.
|
// Store implements partition.UnstructuredStore directly on top of kubernetes.
|
||||||
type Store struct {
|
type Store struct {
|
||||||
clientGetter ClientGetter
|
clientGetter ClientGetter
|
||||||
notifier RelationshipNotifier
|
notifier RelationshipNotifier
|
||||||
@@ -89,44 +89,14 @@ func NewProxyStore(clientGetter ClientGetter, notifier RelationshipNotifier, loo
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ByID looks up a single object by its ID.
|
// ByID looks up a single object by its ID.
|
||||||
func (s *Store) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) {
|
func (s *Store) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, error) {
|
||||||
result, err := s.byID(apiOp, schema, apiOp.Namespace, id)
|
return s.byID(apiOp, schema, apiOp.Namespace, id)
|
||||||
return toAPI(schema, result), err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func decodeParams(apiOp *types.APIRequest, target runtime.Object) error {
|
func decodeParams(apiOp *types.APIRequest, target runtime.Object) error {
|
||||||
return paramCodec.DecodeParameters(apiOp.Request.URL.Query(), metav1.SchemeGroupVersion, target)
|
return paramCodec.DecodeParameters(apiOp.Request.URL.Query(), metav1.SchemeGroupVersion, target)
|
||||||
}
|
}
|
||||||
|
|
||||||
func toAPI(schema *types.APISchema, obj runtime.Object) types.APIObject {
|
|
||||||
if obj == nil || reflect.ValueOf(obj).IsNil() {
|
|
||||||
return types.APIObject{}
|
|
||||||
}
|
|
||||||
|
|
||||||
if unstr, ok := obj.(*unstructured.Unstructured); ok {
|
|
||||||
obj = moveToUnderscore(unstr)
|
|
||||||
}
|
|
||||||
|
|
||||||
apiObject := types.APIObject{
|
|
||||||
Type: schema.ID,
|
|
||||||
Object: obj,
|
|
||||||
}
|
|
||||||
|
|
||||||
m, err := meta.Accessor(obj)
|
|
||||||
if err != nil {
|
|
||||||
return apiObject
|
|
||||||
}
|
|
||||||
|
|
||||||
id := m.GetName()
|
|
||||||
ns := m.GetNamespace()
|
|
||||||
if ns != "" {
|
|
||||||
id = fmt.Sprintf("%s/%s", ns, id)
|
|
||||||
}
|
|
||||||
|
|
||||||
apiObject.ID = id
|
|
||||||
return apiObject
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Store) byID(apiOp *types.APIRequest, schema *types.APISchema, namespace, id string) (*unstructured.Unstructured, error) {
|
func (s *Store) byID(apiOp *types.APIRequest, schema *types.APISchema, namespace, id string) (*unstructured.Unstructured, error) {
|
||||||
k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, namespace))
|
k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, namespace))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -158,22 +128,6 @@ func moveFromUnderscore(obj map[string]interface{}) map[string]interface{} {
|
|||||||
return obj
|
return obj
|
||||||
}
|
}
|
||||||
|
|
||||||
func moveToUnderscore(obj *unstructured.Unstructured) *unstructured.Unstructured {
|
|
||||||
if obj == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
for k := range types.ReservedFields {
|
|
||||||
v, ok := obj.Object[k]
|
|
||||||
if ok {
|
|
||||||
delete(obj.Object, k)
|
|
||||||
obj.Object["_"+k] = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return obj
|
|
||||||
}
|
|
||||||
|
|
||||||
func rowToObject(obj *unstructured.Unstructured) {
|
func rowToObject(obj *unstructured.Unstructured) {
|
||||||
if obj == nil {
|
if obj == nil {
|
||||||
return
|
return
|
||||||
@@ -230,77 +184,70 @@ func tableToObjects(obj map[string]interface{}) []unstructured.Unstructured {
|
|||||||
// to list *all* resources.
|
// to list *all* resources.
|
||||||
// With this filter, the request can be performed successfully, and only the allowed resources will
|
// With this filter, the request can be performed successfully, and only the allowed resources will
|
||||||
// be returned in the list.
|
// be returned in the list.
|
||||||
func (s *Store) ByNames(apiOp *types.APIRequest, schema *types.APISchema, names sets.String) (types.APIObjectList, error) {
|
func (s *Store) ByNames(apiOp *types.APIRequest, schema *types.APISchema, names sets.String) (*unstructured.UnstructuredList, error) {
|
||||||
if apiOp.Namespace == "*" {
|
if apiOp.Namespace == "*" {
|
||||||
// This happens when you grant namespaced objects with "get" by name in a clusterrolebinding. We will treat
|
// This happens when you grant namespaced objects with "get" by name in a clusterrolebinding. We will treat
|
||||||
// this as an invalid situation instead of listing all objects in the cluster and filtering by name.
|
// this as an invalid situation instead of listing all objects in the cluster and filtering by name.
|
||||||
return types.APIObjectList{}, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
adminClient, err := s.clientGetter.TableAdminClient(apiOp, schema, apiOp.Namespace)
|
adminClient, err := s.clientGetter.TableAdminClient(apiOp, schema, apiOp.Namespace)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return types.APIObjectList{}, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
objs, err := s.list(apiOp, schema, adminClient)
|
objs, err := s.list(apiOp, schema, adminClient)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return types.APIObjectList{}, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var filtered []types.APIObject
|
var filtered []unstructured.Unstructured
|
||||||
for _, obj := range objs.Objects {
|
for _, obj := range objs.Items {
|
||||||
if names.Has(obj.Name()) {
|
if names.Has(obj.GetName()) {
|
||||||
filtered = append(filtered, obj)
|
filtered = append(filtered, obj)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
objs.Objects = filtered
|
objs.Items = filtered
|
||||||
return objs, nil
|
return objs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// List returns a list of resources.
|
// List returns an unstructured list of resources.
|
||||||
func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) {
|
func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (*unstructured.UnstructuredList, error) {
|
||||||
client, err := s.clientGetter.TableClient(apiOp, schema, apiOp.Namespace)
|
client, err := s.clientGetter.TableClient(apiOp, schema, apiOp.Namespace)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return types.APIObjectList{}, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return s.list(apiOp, schema, client)
|
return s.list(apiOp, schema, client)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Store) list(apiOp *types.APIRequest, schema *types.APISchema, client dynamic.ResourceInterface) (types.APIObjectList, error) {
|
func (s *Store) list(apiOp *types.APIRequest, schema *types.APISchema, client dynamic.ResourceInterface) (*unstructured.UnstructuredList, error) {
|
||||||
opts := metav1.ListOptions{}
|
opts := metav1.ListOptions{}
|
||||||
if err := decodeParams(apiOp, &opts); err != nil {
|
if err := decodeParams(apiOp, &opts); err != nil {
|
||||||
return types.APIObjectList{}, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
k8sClient, _ := metricsStore.Wrap(client, nil)
|
k8sClient, _ := metricsStore.Wrap(client, nil)
|
||||||
resultList, err := k8sClient.List(apiOp, opts)
|
resultList, err := k8sClient.List(apiOp, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return types.APIObjectList{}, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
tableToList(resultList)
|
tableToList(resultList)
|
||||||
|
|
||||||
result := types.APIObjectList{
|
return resultList, nil
|
||||||
Revision: resultList.GetResourceVersion(),
|
|
||||||
Continue: resultList.GetContinue(),
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := range resultList.Items {
|
|
||||||
result.Objects = append(result.Objects, toAPI(schema, &resultList.Items[i]))
|
|
||||||
}
|
|
||||||
|
|
||||||
return result, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func returnErr(err error, c chan types.APIEvent) {
|
func returnErr(err error, c chan watch.Event) {
|
||||||
c <- types.APIEvent{
|
c <- watch.Event{
|
||||||
Name: "resource.error",
|
Type: "resource.error",
|
||||||
Error: err,
|
Object: &metav1.Status{
|
||||||
|
Message: err.Error(),
|
||||||
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Store) listAndWatch(apiOp *types.APIRequest, client dynamic.ResourceInterface, schema *types.APISchema, w types.WatchRequest, result chan types.APIEvent) {
|
func (s *Store) listAndWatch(apiOp *types.APIRequest, client dynamic.ResourceInterface, schema *types.APISchema, w types.WatchRequest, result chan watch.Event) {
|
||||||
rev := w.Revision
|
rev := w.Revision
|
||||||
if rev == "-1" || rev == "0" {
|
if rev == "-1" || rev == "0" {
|
||||||
rev = ""
|
rev = ""
|
||||||
@@ -342,7 +289,8 @@ func (s *Store) listAndWatch(apiOp *types.APIRequest, client dynamic.ResourceInt
|
|||||||
for rel := range s.notifier.OnInboundRelationshipChange(ctx, schema, apiOp.Namespace) {
|
for rel := range s.notifier.OnInboundRelationshipChange(ctx, schema, apiOp.Namespace) {
|
||||||
obj, err := s.byID(apiOp, schema, rel.Namespace, rel.Name)
|
obj, err := s.byID(apiOp, schema, rel.Namespace, rel.Name)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
result <- s.toAPIEvent(apiOp, schema, watch.Modified, obj)
|
rowToObject(obj)
|
||||||
|
result <- watch.Event{Type: watch.Modified, Object: obj}
|
||||||
} else {
|
} else {
|
||||||
logrus.Debugf("notifier watch error: %v", err)
|
logrus.Debugf("notifier watch error: %v", err)
|
||||||
returnErr(errors.Wrapf(err, "notifier watch error: %v", err), result)
|
returnErr(errors.Wrapf(err, "notifier watch error: %v", err), result)
|
||||||
@@ -363,7 +311,10 @@ func (s *Store) listAndWatch(apiOp *types.APIRequest, client dynamic.ResourceInt
|
|||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
result <- s.toAPIEvent(apiOp, schema, event.Type, event.Object)
|
if unstr, ok := event.Object.(*unstructured.Unstructured); ok {
|
||||||
|
rowToObject(unstr)
|
||||||
|
}
|
||||||
|
result <- event
|
||||||
}
|
}
|
||||||
return fmt.Errorf("closed")
|
return fmt.Errorf("closed")
|
||||||
})
|
})
|
||||||
@@ -378,7 +329,7 @@ func (s *Store) listAndWatch(apiOp *types.APIRequest, client dynamic.ResourceInt
|
|||||||
// to list *all* resources.
|
// to list *all* resources.
|
||||||
// With this filter, the request can be performed successfully, and only the allowed resources will
|
// With this filter, the request can be performed successfully, and only the allowed resources will
|
||||||
// be returned in watch.
|
// be returned in watch.
|
||||||
func (s *Store) WatchNames(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest, names sets.String) (chan types.APIEvent, error) {
|
func (s *Store) WatchNames(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest, names sets.String) (chan watch.Event, error) {
|
||||||
adminClient, err := s.clientGetter.TableAdminClientForWatch(apiOp, schema, apiOp.Namespace)
|
adminClient, err := s.clientGetter.TableAdminClientForWatch(apiOp, schema, apiOp.Namespace)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -388,11 +339,16 @@ func (s *Store) WatchNames(apiOp *types.APIRequest, schema *types.APISchema, w t
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
result := make(chan types.APIEvent)
|
result := make(chan watch.Event)
|
||||||
go func() {
|
go func() {
|
||||||
defer close(result)
|
defer close(result)
|
||||||
for item := range c {
|
for item := range c {
|
||||||
if item.Error == nil && names.Has(item.Object.Name()) {
|
|
||||||
|
m, err := meta.Accessor(item.Object)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if item.Type != watch.Error && names.Has(m.GetName()) {
|
||||||
result <- item
|
result <- item
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -402,7 +358,7 @@ func (s *Store) WatchNames(apiOp *types.APIRequest, schema *types.APISchema, w t
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Watch returns a channel of events for a list or resource.
|
// Watch returns a channel of events for a list or resource.
|
||||||
func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest) (chan types.APIEvent, error) {
|
func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest) (chan watch.Event, error) {
|
||||||
client, err := s.clientGetter.TableClientForWatch(apiOp, schema, apiOp.Namespace)
|
client, err := s.clientGetter.TableClientForWatch(apiOp, schema, apiOp.Namespace)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -410,8 +366,8 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.
|
|||||||
return s.watch(apiOp, schema, w, client)
|
return s.watch(apiOp, schema, w, client)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Store) watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest, client dynamic.ResourceInterface) (chan types.APIEvent, error) {
|
func (s *Store) watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest, client dynamic.ResourceInterface) (chan watch.Event, error) {
|
||||||
result := make(chan types.APIEvent)
|
result := make(chan watch.Event)
|
||||||
go func() {
|
go func() {
|
||||||
s.listAndWatch(apiOp, client, schema, w, result)
|
s.listAndWatch(apiOp, client, schema, w, result)
|
||||||
logrus.Debugf("closing watcher for %s", schema.ID)
|
logrus.Debugf("closing watcher for %s", schema.ID)
|
||||||
@@ -420,35 +376,8 @@ func (s *Store) watch(apiOp *types.APIRequest, schema *types.APISchema, w types.
|
|||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Store) toAPIEvent(apiOp *types.APIRequest, schema *types.APISchema, et watch.EventType, obj runtime.Object) types.APIEvent {
|
|
||||||
name := types.ChangeAPIEvent
|
|
||||||
switch et {
|
|
||||||
case watch.Deleted:
|
|
||||||
name = types.RemoveAPIEvent
|
|
||||||
case watch.Added:
|
|
||||||
name = types.CreateAPIEvent
|
|
||||||
}
|
|
||||||
|
|
||||||
if unstr, ok := obj.(*unstructured.Unstructured); ok {
|
|
||||||
rowToObject(unstr)
|
|
||||||
}
|
|
||||||
|
|
||||||
event := types.APIEvent{
|
|
||||||
Name: name,
|
|
||||||
Object: toAPI(schema, obj),
|
|
||||||
}
|
|
||||||
|
|
||||||
m, err := meta.Accessor(obj)
|
|
||||||
if err != nil {
|
|
||||||
return event
|
|
||||||
}
|
|
||||||
|
|
||||||
event.Revision = m.GetResourceVersion()
|
|
||||||
return event
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create creates a single object in the store.
|
// Create creates a single object in the store.
|
||||||
func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, params types.APIObject) (types.APIObject, error) {
|
func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, params types.APIObject) (*unstructured.Unstructured, error) {
|
||||||
var (
|
var (
|
||||||
resp *unstructured.Unstructured
|
resp *unstructured.Unstructured
|
||||||
)
|
)
|
||||||
@@ -474,22 +403,21 @@ func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, params
|
|||||||
|
|
||||||
k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, ns))
|
k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, ns))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return types.APIObject{}, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
opts := metav1.CreateOptions{}
|
opts := metav1.CreateOptions{}
|
||||||
if err := decodeParams(apiOp, &opts); err != nil {
|
if err := decodeParams(apiOp, &opts); err != nil {
|
||||||
return types.APIObject{}, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err = k8sClient.Create(apiOp, &unstructured.Unstructured{Object: input}, opts)
|
resp, err = k8sClient.Create(apiOp, &unstructured.Unstructured{Object: input}, opts)
|
||||||
rowToObject(resp)
|
rowToObject(resp)
|
||||||
apiObject := toAPI(schema, resp)
|
return resp, err
|
||||||
return apiObject, err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update updates a single object in the store.
|
// Update updates a single object in the store.
|
||||||
func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, params types.APIObject, id string) (types.APIObject, error) {
|
func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, params types.APIObject, id string) (*unstructured.Unstructured, error) {
|
||||||
var (
|
var (
|
||||||
err error
|
err error
|
||||||
input = params.Data()
|
input = params.Data()
|
||||||
@@ -498,13 +426,13 @@ func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, params
|
|||||||
ns := types.Namespace(input)
|
ns := types.Namespace(input)
|
||||||
k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, ns))
|
k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, ns))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return types.APIObject{}, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if apiOp.Method == http.MethodPatch {
|
if apiOp.Method == http.MethodPatch {
|
||||||
bytes, err := ioutil.ReadAll(io.LimitReader(apiOp.Request.Body, 2<<20))
|
bytes, err := ioutil.ReadAll(io.LimitReader(apiOp.Request.Body, 2<<20))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return types.APIObject{}, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
pType := apitypes.StrategicMergePatchType
|
pType := apitypes.StrategicMergePatchType
|
||||||
@@ -514,70 +442,70 @@ func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, params
|
|||||||
|
|
||||||
opts := metav1.PatchOptions{}
|
opts := metav1.PatchOptions{}
|
||||||
if err := decodeParams(apiOp, &opts); err != nil {
|
if err := decodeParams(apiOp, &opts); err != nil {
|
||||||
return types.APIObject{}, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if pType == apitypes.StrategicMergePatchType {
|
if pType == apitypes.StrategicMergePatchType {
|
||||||
data := map[string]interface{}{}
|
data := map[string]interface{}{}
|
||||||
if err := json.Unmarshal(bytes, &data); err != nil {
|
if err := json.Unmarshal(bytes, &data); err != nil {
|
||||||
return types.APIObject{}, err
|
return nil, err
|
||||||
}
|
}
|
||||||
data = moveFromUnderscore(data)
|
data = moveFromUnderscore(data)
|
||||||
bytes, err = json.Marshal(data)
|
bytes, err = json.Marshal(data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return types.APIObject{}, err
|
return nil, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := k8sClient.Patch(apiOp, id, pType, bytes, opts)
|
resp, err := k8sClient.Patch(apiOp, id, pType, bytes, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return types.APIObject{}, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return toAPI(schema, resp), nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
resourceVersion := input.String("metadata", "resourceVersion")
|
resourceVersion := input.String("metadata", "resourceVersion")
|
||||||
if resourceVersion == "" {
|
if resourceVersion == "" {
|
||||||
return types.APIObject{}, fmt.Errorf("metadata.resourceVersion is required for update")
|
return nil, fmt.Errorf("metadata.resourceVersion is required for update")
|
||||||
}
|
}
|
||||||
|
|
||||||
opts := metav1.UpdateOptions{}
|
opts := metav1.UpdateOptions{}
|
||||||
if err := decodeParams(apiOp, &opts); err != nil {
|
if err := decodeParams(apiOp, &opts); err != nil {
|
||||||
return types.APIObject{}, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := k8sClient.Update(apiOp, &unstructured.Unstructured{Object: moveFromUnderscore(input)}, metav1.UpdateOptions{})
|
resp, err := k8sClient.Update(apiOp, &unstructured.Unstructured{Object: moveFromUnderscore(input)}, metav1.UpdateOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return types.APIObject{}, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
rowToObject(resp)
|
rowToObject(resp)
|
||||||
return toAPI(schema, resp), nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Delete deletes an object from a store.
|
// Delete deletes an object from a store.
|
||||||
func (s *Store) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) {
|
func (s *Store) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, error) {
|
||||||
opts := metav1.DeleteOptions{}
|
opts := metav1.DeleteOptions{}
|
||||||
if err := decodeParams(apiOp, &opts); err != nil {
|
if err := decodeParams(apiOp, &opts); err != nil {
|
||||||
return types.APIObject{}, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, apiOp.Namespace))
|
k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, apiOp.Namespace))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return types.APIObject{}, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := k8sClient.Delete(apiOp, id, opts); err != nil {
|
if err := k8sClient.Delete(apiOp, id, opts); err != nil {
|
||||||
return types.APIObject{}, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
obj, err := s.byID(apiOp, schema, apiOp.Namespace, id)
|
obj, err := s.byID(apiOp, schema, apiOp.Namespace, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// ignore lookup error
|
// ignore lookup error
|
||||||
return types.APIObject{}, validation.ErrorCode{
|
return nil, validation.ErrorCode{
|
||||||
Status: http.StatusNoContent,
|
Status: http.StatusNoContent,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return toAPI(schema, obj), nil
|
return obj, nil
|
||||||
}
|
}
|
||||||
|
@@ -9,7 +9,9 @@ import (
|
|||||||
"github.com/rancher/steve/pkg/attributes"
|
"github.com/rancher/steve/pkg/attributes"
|
||||||
"github.com/rancher/steve/pkg/stores/partition"
|
"github.com/rancher/steve/pkg/stores/partition"
|
||||||
"github.com/rancher/wrangler/pkg/kv"
|
"github.com/rancher/wrangler/pkg/kv"
|
||||||
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
|
"k8s.io/apimachinery/pkg/watch"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@@ -85,8 +87,8 @@ func (p *rbacPartitioner) All(apiOp *types.APIRequest, schema *types.APISchema,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store returns a proxy Store suited to listing and watching resources by partition.
|
// Store returns an UnstructuredStore suited to listing and watching resources by partition.
|
||||||
func (p *rbacPartitioner) Store(apiOp *types.APIRequest, partition partition.Partition) (types.Store, error) {
|
func (p *rbacPartitioner) Store(apiOp *types.APIRequest, partition partition.Partition) (partition.UnstructuredStore, error) {
|
||||||
return &byNameOrNamespaceStore{
|
return &byNameOrNamespaceStore{
|
||||||
Store: p.proxyStore,
|
Store: p.proxyStore,
|
||||||
partition: partition.(Partition),
|
partition: partition.(Partition),
|
||||||
@@ -99,7 +101,7 @@ type byNameOrNamespaceStore struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// List returns a list of resources by partition.
|
// List returns a list of resources by partition.
|
||||||
func (b *byNameOrNamespaceStore) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) {
|
func (b *byNameOrNamespaceStore) List(apiOp *types.APIRequest, schema *types.APISchema) (*unstructured.UnstructuredList, error) {
|
||||||
if b.partition.Passthrough {
|
if b.partition.Passthrough {
|
||||||
return b.Store.List(apiOp, schema)
|
return b.Store.List(apiOp, schema)
|
||||||
}
|
}
|
||||||
@@ -112,7 +114,7 @@ func (b *byNameOrNamespaceStore) List(apiOp *types.APIRequest, schema *types.API
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Watch returns a channel of resources by partition.
|
// Watch returns a channel of resources by partition.
|
||||||
func (b *byNameOrNamespaceStore) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types.WatchRequest) (chan types.APIEvent, error) {
|
func (b *byNameOrNamespaceStore) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types.WatchRequest) (chan watch.Event, error) {
|
||||||
if b.partition.Passthrough {
|
if b.partition.Passthrough {
|
||||||
return b.Store.Watch(apiOp, schema, wr)
|
return b.Store.Watch(apiOp, schema, wr)
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user