Merge pull request #63 from cmurphy/pagination-sorting-filtering

Pagination, sorting, filtering
Colleen Murphy 2022-12-20 15:59:29 -08:00 committed by GitHub
commit 7565dba268
10 changed files with 4510 additions and 194 deletions

README.md Normal file

@ -0,0 +1,186 @@
steve
=====
Steve is a lightweight API proxy for Kubernetes whose aim is to create an
interface layer suitable for dashboards to efficiently interact with
Kubernetes.
API Usage
---------
### Kubernetes proxy
Requests made to `/api`, `/api/*`, `/apis/*`, `/openapi/*` and `/version` will
be proxied directly to Kubernetes.
### /v1 API
Steve registers all Kubernetes resources as schemas in the /v1 API. Any
endpoint can support methods GET, POST, PATCH, PUT, or DELETE, depending on
what the underlying Kubernetes endpoint supports and the user's permissions.
* `/v1/{type}` - all cluster-scoped resources OR all resources in all
namespaces of type `{type}` that the user has access to
* `/v1/{type}/{name}` - cluster-scoped resource of type `{type}` and unique name `{name}`
* `/v1/{type}/{namespace}` - all resources of type `{type}` under namespace `{namespace}`
* `/v1/{type}/{namespace}/{name}` - resource of type `{type}` under namespace
`{namespace}` with name `{name}` unique within the namespace
### Query parameters
Steve supports query parameters to perform actions or process data on top of
what Kubernetes supports.
#### `link`
Trigger a link handler, which is registered with the schema. Examples are
calling the shell for a cluster, or following logs during cluster or catalog
operations:
```
GET /v1/management.cattle.io.clusters/local?link=log
```
#### `action`
Trigger an action handler, which is registered with the schema. Examples are
generating a kubeconfig for a cluster, or installing an app from a catalog:
```
POST /v1/catalog.cattle.io.clusterrepos/rancher-partner-charts?action=install
```
#### `limit`
Only applicable to list requests (`/v1/{type}` and `/v1/{type}/{namespace}`).
Set the maximum number of results to retrieve from Kubernetes. The limit is
passed on as a parameter to the Kubernetes request. The purpose of setting this
limit is to prevent a huge response from overwhelming Steve and Rancher. For
more information about setting limits, review the Kubernetes documentation on
[retrieving results in
chunks](https://kubernetes.io/docs/reference/using-api/api-concepts/#retrieving-large-results-sets-in-chunks).
The limit controls the size of the set coming from Kubernetes, and then
filtering, sorting, and pagination are applied on that set. Because of this, if
the result set is partial, there is no guarantee that the result returned to
the client is fully sorted across the entire list, only across the returned
chunk.
If the result is partial, the returned response includes a `continue` token,
which must be passed in a subsequent request to retrieve the next chunk.
The default limit is 100000. To disable the limit entirely, set `limit=-1`.
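For example, to request results from Kubernetes in chunks of at most 1000:
```
/v1/{type}?limit=1000
```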
#### `continue`
Only applicable to list requests (`/v1/{type}` and `/v1/{type}/{namespace}`).
Continue retrieving the next chunk of a partial list. The continue token is
included in the response of a limited list and indicates that the result is
partial. This token can then be used as a query parameter to retrieve the next
chunk. All chunks have been retrieved when the continue field in the response
is empty.
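For example, a follow-up request passing the token from a previous partial
response (the token value here is a placeholder):
```
/v1/{type}?limit=1000&continue=<token from previous response>
```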
#### `filter`
Only applicable to list requests (`/v1/{type}` and `/v1/{type}/{namespace}`).
Filter results by a designated field. Filter keys use dot notation to denote
the subfield of an object to filter on. The filter value is matched as a
substring.
For example, filtering by object name:
```
/v1/{type}?filter=metadata.name=foo
```
Filters are ANDed together, so an object must match all filters to be
included in the list.
```
/v1/{type}?filter=metadata.name=foo&filter=metadata.namespace=bar
```
Arrays are searched for matching items. If any item in the array matches, the
object is included in the list.
```
/v1/{type}?filter=spec.containers.image=alpine
```
#### `sort`
Only applicable to list requests (`/v1/{type}` and `/v1/{type}/{namespace}`).
Results can be sorted lexicographically by primary and secondary columns.
Sorting by only a primary column, for example name:
```
/v1/{type}?sort=metadata.name
```
Reverse sorting by name:
```
/v1/{type}?sort=-metadata.name
```
A secondary sort criterion can be appended, separated by a comma.
For example, sorting by name and creation time in ascending order:
```
/v1/{type}?sort=metadata.name,metadata.creationTimestamp
```
Reverse sort by name, normal sort by creation time:
```
/v1/{type}?sort=-metadata.name,metadata.creationTimestamp
```
Normal sort by name, reverse sort by creation time:
```
/v1/{type}?sort=metadata.name,-metadata.creationTimestamp
```
#### `page`, `pagesize`, and `revision`
Only applicable to list requests (`/v1/{type}` and `/v1/{type}/{namespace}`).
Results can be batched by pages for easier display.
Example initial request returning a page with 10 results:
```
/v1/{type}?pagesize=10
```
Pages are one-indexed, so this is equivalent to
```
/v1/{type}?pagesize=10&page=1
```
To retrieve subsequent pages, the page number and the list revision number must
be included in the request. This ensures the page will be retrieved from the
cache, rather than making a new request to Kubernetes. If the revision number
is omitted, a new fetch is performed in order to get the latest revision. The
revision is included in the list response.
```
/v1/{type}?pagesize=10&page=2&revision=107440
```
The total number of pages and individual items are included in the list
response as `pages` and `count` respectively.
If a page number is out of bounds, an empty list is returned.
`page` and `pagesize` can be used alongside the `limit` and `continue`
parameters supported by Kubernetes. `limit` and `continue` are typically used
for server-side chunking and do not guarantee results in any order.
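Query parameters can be combined; filtering, sorting, and pagination are
applied in that order to the set retrieved from Kubernetes. For example, a
hypothetical request that filters by namespace, reverse-sorts by name, and
returns the first page of 10 results:
```
/v1/{type}?filter=metadata.namespace=kube-system&sort=-metadata.name&pagesize=10&page=1
```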

go.mod

@ -18,7 +18,7 @@ require (
github.com/pborman/uuid v1.2.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.12.1
-github.com/rancher/apiserver v0.0.0-20221205175736-7c507bd5c076
+github.com/rancher/apiserver v0.0.0-20221220225852-94cba4f28cfd
github.com/rancher/dynamiclistener v0.3.5
github.com/rancher/kubernetes-provider-detector v0.1.2
github.com/rancher/norman v0.0.0-20221205184727-32ef2e185b99

go.sum

@ -502,8 +502,8 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1
github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU=
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
-github.com/rancher/apiserver v0.0.0-20221205175736-7c507bd5c076 h1:wS95KbXFI1QOVQr3Tz+qyOJ9iia1ITCnjsapxJyI/9U=
-github.com/rancher/apiserver v0.0.0-20221205175736-7c507bd5c076/go.mod h1:xwQhXv3XFxWfA6tLa4ZeaERu8ldNbyKv2sF+mT+c5WA=
+github.com/rancher/apiserver v0.0.0-20221220225852-94cba4f28cfd h1:g0hNrbONfmY4lxvrD2q9KkueYYY4wKUYscm6Ih0QfQ0=
+github.com/rancher/apiserver v0.0.0-20221220225852-94cba4f28cfd/go.mod h1:xwQhXv3XFxWfA6tLa4ZeaERu8ldNbyKv2sF+mT+c5WA=
github.com/rancher/client-go v1.25.4-rancher1 h1:9MlBC8QbgngUkhNzMR8rZmmCIj6WNRHFOnYiwC2Kty4=
github.com/rancher/client-go v1.25.4-rancher1/go.mod h1:8trHCAC83XKY0wsBIpbirZU4NTUpbuhc2JnI7OruGZw=
github.com/rancher/dynamiclistener v0.3.5 h1:5TaIHvkDGmZKvc96Huur16zfTKOiLhDtK4S+WV0JA6A=


@ -0,0 +1,300 @@
// Package listprocessor contains methods for filtering, sorting, and paginating lists of objects.
package listprocessor
import (
"sort"
"strconv"
"strings"
"github.com/rancher/apiserver/pkg/types"
"github.com/rancher/wrangler/pkg/data"
"github.com/rancher/wrangler/pkg/data/convert"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)
const (
defaultLimit = 100000
continueParam = "continue"
limitParam = "limit"
filterParam = "filter"
sortParam = "sort"
pageSizeParam = "pagesize"
pageParam = "page"
revisionParam = "revision"
)
// ListOptions represents the query parameters that may be included in a list request.
type ListOptions struct {
ChunkSize int
Resume string
Filters []Filter
Sort Sort
Pagination Pagination
Revision string
}
// Filter represents a field to filter by.
// A subfield in an object is represented in a request query using . notation, e.g. 'metadata.name'.
// The subfield is internally represented as a slice, e.g. [metadata, name].
type Filter struct {
field []string
match string
}
// String returns the filter as a query string.
func (f Filter) String() string {
field := strings.Join(f.field, ".")
return field + "=" + f.match
}
// SortOrder represents whether the list should be ascending or descending.
type SortOrder int
const (
// ASC stands for ascending order.
ASC SortOrder = iota
// DESC stands for descending (reverse) order.
DESC
)
// Sort represents the criteria to sort on.
// The subfield to sort by is represented in a request query using . notation, e.g. 'metadata.name'.
// The subfield is internally represented as a slice, e.g. [metadata, name].
// The order is represented by prefixing the sort key by '-', e.g. sort=-metadata.name.
type Sort struct {
primaryField []string
secondaryField []string
primaryOrder SortOrder
secondaryOrder SortOrder
}
// String returns the sort parameters as a query string.
func (s Sort) String() string {
field := ""
if s.primaryOrder == DESC {
field = "-" + field
}
field += strings.Join(s.primaryField, ".")
if len(s.secondaryField) > 0 {
field += ","
if s.secondaryOrder == DESC {
field += "-"
}
field += strings.Join(s.secondaryField, ".")
}
return field
}
// Pagination represents how to return paginated results.
type Pagination struct {
pageSize int
page int
}
// PageSize returns the integer page size.
func (p Pagination) PageSize() int {
return p.pageSize
}
// ParseQuery parses the query params of a request and returns a ListOptions.
func ParseQuery(apiOp *types.APIRequest) *ListOptions {
chunkSize := getLimit(apiOp)
q := apiOp.Request.URL.Query()
cont := q.Get(continueParam)
filterParams := q[filterParam]
filterOpts := []Filter{}
for _, filters := range filterParams {
filter := strings.Split(filters, "=")
if len(filter) != 2 {
continue
}
filterOpts = append(filterOpts, Filter{field: strings.Split(filter[0], "."), match: filter[1]})
}
// sort the filter fields so they can be used as a cache key in the store
sort.Slice(filterOpts, func(i, j int) bool {
fieldI := strings.Join(filterOpts[i].field, ".")
fieldJ := strings.Join(filterOpts[j].field, ".")
return fieldI < fieldJ
})
sortOpts := Sort{}
sortKeys := q.Get(sortParam)
if sortKeys != "" {
sortParts := strings.SplitN(sortKeys, ",", 2)
primaryField := sortParts[0]
if primaryField != "" && primaryField[0] == '-' {
sortOpts.primaryOrder = DESC
primaryField = primaryField[1:]
}
if primaryField != "" {
sortOpts.primaryField = strings.Split(primaryField, ".")
}
if len(sortParts) > 1 {
secondaryField := sortParts[1]
if secondaryField != "" && secondaryField[0] == '-' {
sortOpts.secondaryOrder = DESC
secondaryField = secondaryField[1:]
}
if secondaryField != "" {
sortOpts.secondaryField = strings.Split(secondaryField, ".")
}
}
}
var err error
pagination := Pagination{}
pagination.pageSize, err = strconv.Atoi(q.Get(pageSizeParam))
if err != nil {
pagination.pageSize = 0
}
pagination.page, err = strconv.Atoi(q.Get(pageParam))
if err != nil {
pagination.page = 1
}
revision := q.Get(revisionParam)
return &ListOptions{
ChunkSize: chunkSize,
Resume: cont,
Filters: filterOpts,
Sort: sortOpts,
Pagination: pagination,
Revision: revision,
}
}
// getLimit extracts the limit parameter from the request or sets a default of 100000.
// The default limit can be explicitly disabled by setting it to zero or negative.
// If the default is accepted, clients must be aware that the list may be incomplete, and use the "continue" token to get the next chunk of results.
func getLimit(apiOp *types.APIRequest) int {
limitString := apiOp.Request.URL.Query().Get(limitParam)
limit, err := strconv.Atoi(limitString)
if err != nil {
limit = defaultLimit
}
return limit
}
// FilterList accepts a channel of unstructured objects and a slice of filters and returns the filtered list.
// Filters are ANDed together.
func FilterList(list <-chan []unstructured.Unstructured, filters []Filter) []unstructured.Unstructured {
result := []unstructured.Unstructured{}
for items := range list {
for _, item := range items {
if len(filters) == 0 {
result = append(result, item)
continue
}
if matchesAll(item.Object, filters) {
result = append(result, item)
}
}
}
return result
}
func matchesOne(obj map[string]interface{}, filter Filter) bool {
var objValue interface{}
var ok bool
subField := []string{}
for !ok && len(filter.field) > 0 {
objValue, ok = data.GetValue(obj, filter.field...)
if !ok {
subField = append(subField, filter.field[len(filter.field)-1])
filter.field = filter.field[:len(filter.field)-1]
}
}
if !ok {
return false
}
switch typedVal := objValue.(type) {
case string, int, bool:
if len(subField) > 0 {
return false
}
stringVal := convert.ToString(typedVal)
if strings.Contains(stringVal, filter.match) {
return true
}
case []interface{}:
filter = Filter{field: subField, match: filter.match}
if matchesAny(typedVal, filter) {
return true
}
}
return false
}
func matchesAny(obj []interface{}, filter Filter) bool {
for _, v := range obj {
switch typedItem := v.(type) {
case string, int, bool:
stringVal := convert.ToString(typedItem)
if strings.Contains(stringVal, filter.match) {
return true
}
case map[string]interface{}:
if matchesOne(typedItem, filter) {
return true
}
case []interface{}:
if matchesAny(typedItem, filter) {
return true
}
}
}
return false
}
func matchesAll(obj map[string]interface{}, filters []Filter) bool {
for _, f := range filters {
if !matchesOne(obj, f) {
return false
}
}
return true
}
// SortList sorts the slice by the provided sort criteria.
func SortList(list []unstructured.Unstructured, s Sort) []unstructured.Unstructured {
if len(s.primaryField) == 0 {
return list
}
sort.Slice(list, func(i, j int) bool {
leftPrime := convert.ToString(data.GetValueN(list[i].Object, s.primaryField...))
rightPrime := convert.ToString(data.GetValueN(list[j].Object, s.primaryField...))
if leftPrime == rightPrime && len(s.secondaryField) > 0 {
leftSecond := convert.ToString(data.GetValueN(list[i].Object, s.secondaryField...))
rightSecond := convert.ToString(data.GetValueN(list[j].Object, s.secondaryField...))
if s.secondaryOrder == ASC {
return leftSecond < rightSecond
}
return rightSecond < leftSecond
}
if s.primaryOrder == ASC {
return leftPrime < rightPrime
}
return rightPrime < leftPrime
})
return list
}
// PaginateList returns a subset of the result based on the pagination criteria as well as the total number of pages the caller can expect.
func PaginateList(list []unstructured.Unstructured, p Pagination) ([]unstructured.Unstructured, int) {
if p.pageSize <= 0 {
return list, 0
}
page := p.page - 1
if p.page < 1 {
page = 0
}
pages := len(list) / p.pageSize
if len(list)%p.pageSize != 0 {
pages++
}
offset := p.pageSize * page
if offset > len(list) {
return []unstructured.Unstructured{}, pages
}
if offset+p.pageSize > len(list) {
return list[offset:], pages
}
return list[offset : offset+p.pageSize], pages
}
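For reference, a minimal sketch of how these helpers chain together, assuming the package is imported from `github.com/rancher/steve/pkg/stores/partition/listprocessor` (the path used elsewhere in this change) and that `types.APIRequest` exposes the underlying `*http.Request`; the request URL and objects are illustrative:
```go
package main

import (
	"fmt"
	"net/http/httptest"

	"github.com/rancher/apiserver/pkg/types"
	"github.com/rancher/steve/pkg/stores/partition/listprocessor"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

func main() {
	// Parse filter, sort, and pagination options from a hypothetical list request.
	req := httptest.NewRequest("GET", "/v1/pods?filter=metadata.name=foo&sort=-metadata.name&pagesize=10", nil)
	opts := listprocessor.ParseQuery(&types.APIRequest{Request: req})

	// FilterList consumes a channel of chunks, as produced by the parallel partition lister.
	chunks := make(chan []unstructured.Unstructured, 1)
	chunks <- []unstructured.Unstructured{
		{Object: map[string]interface{}{"metadata": map[string]interface{}{"name": "foo-1"}}},
		{Object: map[string]interface{}{"metadata": map[string]interface{}{"name": "foo-2"}}},
		{Object: map[string]interface{}{"metadata": map[string]interface{}{"name": "bar"}}},
	}
	close(chunks)

	list := listprocessor.FilterList(chunks, opts.Filters) // keeps the two "foo-" objects
	list = listprocessor.SortList(list, opts.Sort)         // reverse-sorts by metadata.name
	page, pages := listprocessor.PaginateList(list, opts.Pagination)
	fmt.Println(len(page), pages) // 2 1
}
```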

File diff suppressed because it is too large


@ -5,9 +5,9 @@ import (
"encoding/base64" "encoding/base64"
"encoding/json" "encoding/json"
"github.com/rancher/apiserver/pkg/types"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore" "golang.org/x/sync/semaphore"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
) )
// Partition represents a named grouping of kubernetes resources, // Partition represents a named grouping of kubernetes resources,
@ -33,7 +33,7 @@ type ParallelPartitionLister struct {
} }
// PartitionLister lists objects for one partition. // PartitionLister lists objects for one partition.
type PartitionLister func(ctx context.Context, partition Partition, cont string, revision string, limit int) (types.APIObjectList, error) type PartitionLister func(ctx context.Context, partition Partition, cont string, revision string, limit int) (*unstructured.UnstructuredList, error)
// Err returns the latest error encountered. // Err returns the latest error encountered.
func (p *ParallelPartitionLister) Err() error { func (p *ParallelPartitionLister) Err() error {
@ -72,7 +72,7 @@ func indexOrZero(partitions []Partition, name string) int {
// List returns a stream of objects up to the requested limit. // List returns a stream of objects up to the requested limit.
// If the continue token is not empty, it decodes it and returns the stream // If the continue token is not empty, it decodes it and returns the stream
// starting at the indicated marker. // starting at the indicated marker.
func (p *ParallelPartitionLister) List(ctx context.Context, limit int, resume string) (<-chan []types.APIObject, error) { func (p *ParallelPartitionLister) List(ctx context.Context, limit int, resume, revision string) (<-chan []unstructured.Unstructured, error) {
var state listState var state listState
if resume != "" { if resume != "" {
bytes, err := base64.StdEncoding.DecodeString(resume) bytes, err := base64.StdEncoding.DecodeString(resume)
@ -86,9 +86,11 @@ func (p *ParallelPartitionLister) List(ctx context.Context, limit int, resume st
if state.Limit > 0 { if state.Limit > 0 {
limit = state.Limit limit = state.Limit
} }
} else {
state.Revision = revision
} }
result := make(chan []types.APIObject) result := make(chan []unstructured.Unstructured)
go p.feeder(ctx, state, limit, result) go p.feeder(ctx, state, limit, result)
return result, nil return result, nil
} }
@ -120,7 +122,7 @@ type listState struct {
// 100000, the result is truncated and a continue token is generated that // 100000, the result is truncated and a continue token is generated that
// indicates the partition and offset for the client to start on in the next // indicates the partition and offset for the client to start on in the next
// request. // request.
func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, limit int, result chan []types.APIObject) { func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, limit int, result chan []unstructured.Unstructured) {
var ( var (
sem = semaphore.NewWeighted(p.Concurrency) sem = semaphore.NewWeighted(p.Concurrency)
capacity = limit capacity = limit
@ -137,7 +139,7 @@ func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, l
}() }()
for i := indexOrZero(p.Partitions, state.PartitionName); i < len(p.Partitions); i++ { for i := indexOrZero(p.Partitions, state.PartitionName); i < len(p.Partitions); i++ {
if capacity <= 0 || isDone(ctx) { if (limit > 0 && capacity <= 0) || isDone(ctx) {
break break
} }
@ -183,25 +185,25 @@ func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, l
} }
if state.Revision == "" { if state.Revision == "" {
state.Revision = list.Revision state.Revision = list.GetResourceVersion()
} }
if p.revision == "" { if p.revision == "" {
p.revision = list.Revision p.revision = list.GetResourceVersion()
} }
// We have already seen the first objects in the list, truncate up to the offset. // We have already seen the first objects in the list, truncate up to the offset.
if state.PartitionName == partition.Name() && state.Offset > 0 && state.Offset < len(list.Objects) { if state.PartitionName == partition.Name() && state.Offset > 0 && state.Offset < len(list.Items) {
list.Objects = list.Objects[state.Offset:] list.Items = list.Items[state.Offset:]
} }
// Case 1: the capacity has been reached across all goroutines but the list is still only partial, // Case 1: the capacity has been reached across all goroutines but the list is still only partial,
// so save the state so that the next page can be requested later. // so save the state so that the next page can be requested later.
if len(list.Objects) > capacity { if limit > 0 && len(list.Items) > capacity {
result <- list.Objects[:capacity] result <- list.Items[:capacity]
// save state to redo this list at this offset // save state to redo this list at this offset
p.state = &listState{ p.state = &listState{
Revision: list.Revision, Revision: list.GetResourceVersion(),
PartitionName: partition.Name(), PartitionName: partition.Name(),
Continue: cont, Continue: cont,
Offset: capacity, Offset: capacity,
@ -210,16 +212,16 @@ func (p *ParallelPartitionLister) feeder(ctx context.Context, state listState, l
capacity = 0 capacity = 0
return nil return nil
} }
result <- list.Objects result <- list.Items
capacity -= len(list.Objects) capacity -= len(list.Items)
// Case 2: all objects have been returned, we are done. // Case 2: all objects have been returned, we are done.
if list.Continue == "" { if list.GetContinue() == "" {
return nil return nil
} }
// Case 3: we started at an offset and truncated the list to skip the objects up to the offset. // Case 3: we started at an offset and truncated the list to skip the objects up to the offset.
// We're not yet up to capacity and have not retrieved every object, // We're not yet up to capacity and have not retrieved every object,
// so loop again and get more data. // so loop again and get more data.
state.Continue = list.Continue state.Continue = list.GetContinue()
state.PartitionName = partition.Name() state.PartitionName = partition.Name()
state.Offset = 0 state.Offset = 0
} }


@ -1,29 +1,93 @@
// Package partition implements a store with parallel partitioning of data
// so that segmented data can be concurrently collected and returned as a single data set.
package partition package partition
import ( import (
"context" "context"
"net/http" "fmt"
"os"
"reflect"
"strconv" "strconv"
"time"
"github.com/rancher/apiserver/pkg/types" "github.com/rancher/apiserver/pkg/types"
"github.com/rancher/steve/pkg/accesscontrol"
"github.com/rancher/steve/pkg/stores/partition/listprocessor"
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/cache"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/endpoints/request"
) )
const defaultLimit = 100000 const (
// Number of list request entries to save before cache replacement.
// Not related to the total size in memory of the cache, as any item could take any amount of memory.
cacheSizeEnv = "CATTLE_REQUEST_CACHE_SIZE_INT"
defaultCacheSize = 1000
// Set to non-empty to disable list request caching entirely.
cacheDisableEnv = "CATTLE_REQUEST_CACHE_DISABLED"
)
// Partitioner is an interface for interacting with partitions. // Partitioner is an interface for interacting with partitions.
type Partitioner interface { type Partitioner interface {
Lookup(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) (Partition, error) Lookup(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) (Partition, error)
All(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) ([]Partition, error) All(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) ([]Partition, error)
Store(apiOp *types.APIRequest, partition Partition) (types.Store, error) Store(apiOp *types.APIRequest, partition Partition) (UnstructuredStore, error)
} }
// Store implements types.Store for partitions. // Store implements types.Store for partitions.
type Store struct { type Store struct {
Partitioner Partitioner Partitioner Partitioner
listCache *cache.LRUExpireCache
asl accesscontrol.AccessSetLookup
} }
func (s *Store) getStore(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) (types.Store, error) { // NewStore creates a types.Store implementation with a partitioner and an LRU expiring cache for list responses.
func NewStore(partitioner Partitioner, asl accesscontrol.AccessSetLookup) *Store {
cacheSize := defaultCacheSize
if v := os.Getenv(cacheSizeEnv); v != "" {
sizeInt, err := strconv.Atoi(v)
if err == nil {
cacheSize = sizeInt
}
}
s := &Store{
Partitioner: partitioner,
asl: asl,
}
if v := os.Getenv(cacheDisableEnv); v == "" {
s.listCache = cache.NewLRUExpireCache(cacheSize)
}
return s
}
type cacheKey struct {
chunkSize int
resume string
filters string
sort string
pageSize int
accessID string
resourcePath string
revision string
}
// UnstructuredStore is like types.Store but deals in k8s unstructured objects instead of apiserver types.
type UnstructuredStore interface {
ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, error)
List(apiOp *types.APIRequest, schema *types.APISchema) (*unstructured.UnstructuredList, error)
Create(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject) (*unstructured.Unstructured, error)
Update(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject, id string) (*unstructured.Unstructured, error)
Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, error)
Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest) (chan watch.Event, error)
}
func (s *Store) getStore(apiOp *types.APIRequest, schema *types.APISchema, verb, id string) (UnstructuredStore, error) {
p, err := s.Partitioner.Lookup(apiOp, schema, verb, id) p, err := s.Partitioner.Lookup(apiOp, schema, verb, id)
if err != nil { if err != nil {
return nil, err return nil, err
@ -39,7 +103,11 @@ func (s *Store) Delete(apiOp *types.APIRequest, schema *types.APISchema, id stri
return types.APIObject{}, err return types.APIObject{}, err
} }
return target.Delete(apiOp, schema, id) obj, err := target.Delete(apiOp, schema, id)
if err != nil {
return types.APIObject{}, err
}
return toAPI(schema, obj), nil
} }
// ByID looks up a single object by its ID. // ByID looks up a single object by its ID.
@ -49,14 +117,18 @@ func (s *Store) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string
return types.APIObject{}, err return types.APIObject{}, err
} }
return target.ByID(apiOp, schema, id) obj, err := target.ByID(apiOp, schema, id)
if err != nil {
return types.APIObject{}, err
}
return toAPI(schema, obj), nil
} }
func (s *Store) listPartition(ctx context.Context, apiOp *types.APIRequest, schema *types.APISchema, partition Partition, func (s *Store) listPartition(ctx context.Context, apiOp *types.APIRequest, schema *types.APISchema, partition Partition,
cont string, revision string, limit int) (types.APIObjectList, error) { cont string, revision string, limit int) (*unstructured.UnstructuredList, error) {
store, err := s.Partitioner.Store(apiOp, partition) store, err := s.Partitioner.Store(apiOp, partition)
if err != nil { if err != nil {
return types.APIObjectList{}, err return nil, err
} }
req := apiOp.Clone() req := apiOp.Clone()
@ -64,7 +136,10 @@ func (s *Store) listPartition(ctx context.Context, apiOp *types.APIRequest, sche
values := req.Request.URL.Query() values := req.Request.URL.Query()
values.Set("continue", cont) values.Set("continue", cont)
values.Set("revision", revision) if revision != "" && cont == "" {
values.Set("resourceVersion", revision)
values.Set("resourceVersionMatch", "Exact") // supported since k8s 1.19
}
if limit > 0 { if limit > 0 {
values.Set("limit", strconv.Itoa(limit)) values.Set("limit", strconv.Itoa(limit))
} else { } else {
@ -88,30 +163,90 @@ func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.AP
} }
lister := ParallelPartitionLister{ lister := ParallelPartitionLister{
Lister: func(ctx context.Context, partition Partition, cont string, revision string, limit int) (types.APIObjectList, error) { Lister: func(ctx context.Context, partition Partition, cont string, revision string, limit int) (*unstructured.UnstructuredList, error) {
return s.listPartition(ctx, apiOp, schema, partition, cont, revision, limit) return s.listPartition(ctx, apiOp, schema, partition, cont, revision, limit)
}, },
Concurrency: 3, Concurrency: 3,
Partitions: partitions, Partitions: partitions,
} }
resume := apiOp.Request.URL.Query().Get("continue") opts := listprocessor.ParseQuery(apiOp)
limit := getLimit(apiOp.Request)
list, err := lister.List(apiOp.Context(), limit, resume) key, err := s.getCacheKey(apiOp, opts)
if err != nil { if err != nil {
return result, err return result, err
} }
for items := range list { var list []unstructured.Unstructured
result.Objects = append(result.Objects, items...) if key.revision != "" && s.listCache != nil {
cachedList, ok := s.listCache.Get(key)
if ok {
logrus.Tracef("found cached list for query %s?%s", apiOp.Request.URL.Path, apiOp.Request.URL.RawQuery)
list = cachedList.(*unstructured.UnstructuredList).Items
result.Continue = cachedList.(*unstructured.UnstructuredList).GetContinue()
}
}
if list == nil { // did not look in cache or was not found in cache
stream, err := lister.List(apiOp.Context(), opts.ChunkSize, opts.Resume, opts.Revision)
if err != nil {
return result, err
}
list = listprocessor.FilterList(stream, opts.Filters)
// Check for any errors returned during the parallel listing requests.
// We don't want to cache the list or bother with further processing if the list is empty or corrupt.
// FilterList guarantees that the stream has been consumed and the error is populated if there is any.
if lister.Err() != nil {
return result, lister.Err()
}
list = listprocessor.SortList(list, opts.Sort)
key.revision = lister.Revision()
listToCache := &unstructured.UnstructuredList{
Items: list,
}
c := lister.Continue()
if c != "" {
listToCache.SetContinue(c)
}
if s.listCache != nil {
s.listCache.Add(key, listToCache, 30*time.Minute)
}
result.Continue = lister.Continue()
}
result.Count = len(list)
list, pages := listprocessor.PaginateList(list, opts.Pagination)
for _, item := range list {
item := item
result.Objects = append(result.Objects, toAPI(schema, &item))
} }
result.Revision = lister.Revision() result.Revision = key.revision
result.Continue = lister.Continue() result.Pages = pages
return result, lister.Err() return result, lister.Err()
} }
// getCacheKey returns a hashable struct identifying a unique user and request.
func (s *Store) getCacheKey(apiOp *types.APIRequest, opts *listprocessor.ListOptions) (cacheKey, error) {
user, ok := request.UserFrom(apiOp.Request.Context())
if !ok {
return cacheKey{}, fmt.Errorf("could not find user in request")
}
filters := ""
for _, f := range opts.Filters {
filters = filters + f.String()
}
return cacheKey{
chunkSize: opts.ChunkSize,
resume: opts.Resume,
filters: filters,
sort: opts.Sort.String(),
pageSize: opts.Pagination.PageSize(),
accessID: s.asl.AccessFor(user).ID,
resourcePath: apiOp.Request.URL.Path,
revision: opts.Revision,
}, nil
}
// Create creates a single object in the store. // Create creates a single object in the store.
func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject) (types.APIObject, error) { func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, data types.APIObject) (types.APIObject, error) {
target, err := s.getStore(apiOp, schema, "create", "") target, err := s.getStore(apiOp, schema, "create", "")
@ -119,7 +254,11 @@ func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, data ty
return types.APIObject{}, err return types.APIObject{}, err
} }
return target.Create(apiOp, schema, data) obj, err := target.Create(apiOp, schema, data)
if err != nil {
return types.APIObject{}, err
}
return toAPI(schema, obj), nil
} }
// Update updates a single object in the store. // Update updates a single object in the store.
@ -129,7 +268,11 @@ func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, data ty
return types.APIObject{}, err return types.APIObject{}, err
} }
return target.Update(apiOp, schema, data, id) obj, err := target.Update(apiOp, schema, data, id)
if err != nil {
return types.APIObject{}, err
}
return toAPI(schema, obj), nil
} }
// Watch returns a channel of events for a list or resource. // Watch returns a channel of events for a list or resource.
@ -159,7 +302,7 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types
return err return err
} }
for i := range c { for i := range c {
response <- i response <- toAPIEvent(apiOp, schema, i)
} }
return nil return nil
}) })
@ -175,17 +318,79 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types
return response, nil return response, nil
} }
// getLimit extracts the limit parameter from the request or sets a default of 100000. func toAPI(schema *types.APISchema, obj runtime.Object) types.APIObject {
// Since a default is always set, this implies that clients must always be if obj == nil || reflect.ValueOf(obj).IsNil() {
// aware that the list may be incomplete. return types.APIObject{}
func getLimit(req *http.Request) int { }
limitString := req.URL.Query().Get("limit")
limit, err := strconv.Atoi(limitString) if unstr, ok := obj.(*unstructured.Unstructured); ok {
obj = moveToUnderscore(unstr)
}
apiObject := types.APIObject{
Type: schema.ID,
Object: obj,
}
m, err := meta.Accessor(obj)
if err != nil { if err != nil {
limit = 0 return apiObject
} }
if limit <= 0 {
limit = defaultLimit id := m.GetName()
ns := m.GetNamespace()
if ns != "" {
id = fmt.Sprintf("%s/%s", ns, id)
} }
return limit
apiObject.ID = id
return apiObject
}
func moveToUnderscore(obj *unstructured.Unstructured) *unstructured.Unstructured {
if obj == nil {
return nil
}
for k := range types.ReservedFields {
v, ok := obj.Object[k]
if ok {
delete(obj.Object, k)
obj.Object["_"+k] = v
}
}
return obj
}
func toAPIEvent(apiOp *types.APIRequest, schema *types.APISchema, event watch.Event) types.APIEvent {
name := types.ChangeAPIEvent
switch event.Type {
case watch.Deleted:
name = types.RemoveAPIEvent
case watch.Added:
name = types.CreateAPIEvent
case watch.Error:
name = "resource.error"
}
apiEvent := types.APIEvent{
Name: name,
}
if event.Type == watch.Error {
status, _ := event.Object.(*metav1.Status)
apiEvent.Error = fmt.Errorf(status.Message)
return apiEvent
}
apiEvent.Object = toAPI(schema, event.Object)
m, err := meta.Accessor(event.Object)
if err != nil {
return apiEvent
}
apiEvent.Revision = m.GetResourceVersion()
return apiEvent
} }

File diff suppressed because it is too large


@ -1,3 +1,4 @@
// Package proxy implements the proxy store, which is responsible for interfacing directly with Kubernetes.
package proxy package proxy
import ( import (
@ -8,7 +9,6 @@ import (
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"os" "os"
"reflect"
"regexp" "regexp"
"strconv" "strconv"
@ -65,7 +65,7 @@ type RelationshipNotifier interface {
OnInboundRelationshipChange(ctx context.Context, schema *types.APISchema, namespace string) <-chan *summary.Relationship OnInboundRelationshipChange(ctx context.Context, schema *types.APISchema, namespace string) <-chan *summary.Relationship
} }
// Store implements types.Store directly on top of kubernetes. // Store implements partition.UnstructuredStore directly on top of kubernetes.
type Store struct { type Store struct {
clientGetter ClientGetter clientGetter ClientGetter
notifier RelationshipNotifier notifier RelationshipNotifier
@ -75,58 +75,29 @@ type Store struct {
func NewProxyStore(clientGetter ClientGetter, notifier RelationshipNotifier, lookup accesscontrol.AccessSetLookup) types.Store { func NewProxyStore(clientGetter ClientGetter, notifier RelationshipNotifier, lookup accesscontrol.AccessSetLookup) types.Store {
return &errorStore{ return &errorStore{
Store: &WatchRefresh{ Store: &WatchRefresh{
Store: &partition.Store{ Store: partition.NewStore(
Partitioner: &rbacPartitioner{ &rbacPartitioner{
proxyStore: &Store{ proxyStore: &Store{
clientGetter: clientGetter, clientGetter: clientGetter,
notifier: notifier, notifier: notifier,
}, },
}, },
}, lookup,
),
asl: lookup, asl: lookup,
}, },
} }
} }
// ByID looks up a single object by its ID. // ByID looks up a single object by its ID.
func (s *Store) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) { func (s *Store) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, error) {
result, err := s.byID(apiOp, schema, apiOp.Namespace, id) return s.byID(apiOp, schema, apiOp.Namespace, id)
return toAPI(schema, result), err
} }
func decodeParams(apiOp *types.APIRequest, target runtime.Object) error { func decodeParams(apiOp *types.APIRequest, target runtime.Object) error {
return paramCodec.DecodeParameters(apiOp.Request.URL.Query(), metav1.SchemeGroupVersion, target) return paramCodec.DecodeParameters(apiOp.Request.URL.Query(), metav1.SchemeGroupVersion, target)
} }
func toAPI(schema *types.APISchema, obj runtime.Object) types.APIObject {
if obj == nil || reflect.ValueOf(obj).IsNil() {
return types.APIObject{}
}
if unstr, ok := obj.(*unstructured.Unstructured); ok {
obj = moveToUnderscore(unstr)
}
apiObject := types.APIObject{
Type: schema.ID,
Object: obj,
}
m, err := meta.Accessor(obj)
if err != nil {
return apiObject
}
id := m.GetName()
ns := m.GetNamespace()
if ns != "" {
id = fmt.Sprintf("%s/%s", ns, id)
}
apiObject.ID = id
return apiObject
}
func (s *Store) byID(apiOp *types.APIRequest, schema *types.APISchema, namespace, id string) (*unstructured.Unstructured, error) { func (s *Store) byID(apiOp *types.APIRequest, schema *types.APISchema, namespace, id string) (*unstructured.Unstructured, error) {
k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, namespace)) k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, namespace))
if err != nil { if err != nil {
@ -158,22 +129,6 @@ func moveFromUnderscore(obj map[string]interface{}) map[string]interface{} {
return obj return obj
} }
func moveToUnderscore(obj *unstructured.Unstructured) *unstructured.Unstructured {
if obj == nil {
return nil
}
for k := range types.ReservedFields {
v, ok := obj.Object[k]
if ok {
delete(obj.Object, k)
obj.Object["_"+k] = v
}
}
return obj
}
func rowToObject(obj *unstructured.Unstructured) { func rowToObject(obj *unstructured.Unstructured) {
if obj == nil { if obj == nil {
return return
@ -230,77 +185,70 @@ func tableToObjects(obj map[string]interface{}) []unstructured.Unstructured {
// to list *all* resources. // to list *all* resources.
// With this filter, the request can be performed successfully, and only the allowed resources will // With this filter, the request can be performed successfully, and only the allowed resources will
// be returned in the list. // be returned in the list.
func (s *Store) ByNames(apiOp *types.APIRequest, schema *types.APISchema, names sets.String) (types.APIObjectList, error) { func (s *Store) ByNames(apiOp *types.APIRequest, schema *types.APISchema, names sets.String) (*unstructured.UnstructuredList, error) {
if apiOp.Namespace == "*" { if apiOp.Namespace == "*" {
// This happens when you grant namespaced objects with "get" by name in a clusterrolebinding. We will treat // This happens when you grant namespaced objects with "get" by name in a clusterrolebinding. We will treat
// this as an invalid situation instead of listing all objects in the cluster and filtering by name. // this as an invalid situation instead of listing all objects in the cluster and filtering by name.
return types.APIObjectList{}, nil return nil, nil
} }
adminClient, err := s.clientGetter.TableAdminClient(apiOp, schema, apiOp.Namespace) adminClient, err := s.clientGetter.TableAdminClient(apiOp, schema, apiOp.Namespace)
if err != nil { if err != nil {
return types.APIObjectList{}, err return nil, err
} }
objs, err := s.list(apiOp, schema, adminClient) objs, err := s.list(apiOp, schema, adminClient)
if err != nil { if err != nil {
return types.APIObjectList{}, err return nil, err
} }
var filtered []types.APIObject var filtered []unstructured.Unstructured
for _, obj := range objs.Objects { for _, obj := range objs.Items {
if names.Has(obj.Name()) { if names.Has(obj.GetName()) {
filtered = append(filtered, obj) filtered = append(filtered, obj)
} }
} }
objs.Objects = filtered objs.Items = filtered
return objs, nil return objs, nil
} }
// List returns a list of resources. // List returns an unstructured list of resources.
func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) { func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (*unstructured.UnstructuredList, error) {
client, err := s.clientGetter.TableClient(apiOp, schema, apiOp.Namespace) client, err := s.clientGetter.TableClient(apiOp, schema, apiOp.Namespace)
if err != nil { if err != nil {
return types.APIObjectList{}, err return nil, err
} }
return s.list(apiOp, schema, client) return s.list(apiOp, schema, client)
} }
func (s *Store) list(apiOp *types.APIRequest, schema *types.APISchema, client dynamic.ResourceInterface) (types.APIObjectList, error) { func (s *Store) list(apiOp *types.APIRequest, schema *types.APISchema, client dynamic.ResourceInterface) (*unstructured.UnstructuredList, error) {
opts := metav1.ListOptions{} opts := metav1.ListOptions{}
if err := decodeParams(apiOp, &opts); err != nil { if err := decodeParams(apiOp, &opts); err != nil {
return types.APIObjectList{}, nil return nil, nil
} }
k8sClient, _ := metricsStore.Wrap(client, nil) k8sClient, _ := metricsStore.Wrap(client, nil)
resultList, err := k8sClient.List(apiOp, opts) resultList, err := k8sClient.List(apiOp, opts)
if err != nil { if err != nil {
return types.APIObjectList{}, err return nil, err
} }
tableToList(resultList) tableToList(resultList)
result := types.APIObjectList{ return resultList, nil
Revision: resultList.GetResourceVersion(),
Continue: resultList.GetContinue(),
}
for i := range resultList.Items {
result.Objects = append(result.Objects, toAPI(schema, &resultList.Items[i]))
}
return result, nil
} }
func returnErr(err error, c chan types.APIEvent) { func returnErr(err error, c chan watch.Event) {
c <- types.APIEvent{ c <- watch.Event{
Name: "resource.error", Type: "resource.error",
Error: err, Object: &metav1.Status{
Message: err.Error(),
},
} }
} }
func (s *Store) listAndWatch(apiOp *types.APIRequest, client dynamic.ResourceInterface, schema *types.APISchema, w types.WatchRequest, result chan types.APIEvent) { func (s *Store) listAndWatch(apiOp *types.APIRequest, client dynamic.ResourceInterface, schema *types.APISchema, w types.WatchRequest, result chan watch.Event) {
rev := w.Revision rev := w.Revision
if rev == "-1" || rev == "0" { if rev == "-1" || rev == "0" {
rev = "" rev = ""
@ -342,7 +290,8 @@ func (s *Store) listAndWatch(apiOp *types.APIRequest, client dynamic.ResourceInt
for rel := range s.notifier.OnInboundRelationshipChange(ctx, schema, apiOp.Namespace) { for rel := range s.notifier.OnInboundRelationshipChange(ctx, schema, apiOp.Namespace) {
obj, err := s.byID(apiOp, schema, rel.Namespace, rel.Name) obj, err := s.byID(apiOp, schema, rel.Namespace, rel.Name)
if err == nil { if err == nil {
result <- s.toAPIEvent(apiOp, schema, watch.Modified, obj) rowToObject(obj)
result <- watch.Event{Type: watch.Modified, Object: obj}
} else { } else {
logrus.Debugf("notifier watch error: %v", err) logrus.Debugf("notifier watch error: %v", err)
returnErr(errors.Wrapf(err, "notifier watch error: %v", err), result) returnErr(errors.Wrapf(err, "notifier watch error: %v", err), result)
@ -363,7 +312,10 @@ func (s *Store) listAndWatch(apiOp *types.APIRequest, client dynamic.ResourceInt
} }
continue continue
} }
result <- s.toAPIEvent(apiOp, schema, event.Type, event.Object) if unstr, ok := event.Object.(*unstructured.Unstructured); ok {
rowToObject(unstr)
}
result <- event
} }
return fmt.Errorf("closed") return fmt.Errorf("closed")
}) })
@ -378,7 +330,7 @@ func (s *Store) listAndWatch(apiOp *types.APIRequest, client dynamic.ResourceInt
// to list *all* resources. // to list *all* resources.
// With this filter, the request can be performed successfully, and only the allowed resources will // With this filter, the request can be performed successfully, and only the allowed resources will
// be returned in watch. // be returned in watch.
func (s *Store) WatchNames(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest, names sets.String) (chan types.APIEvent, error) { func (s *Store) WatchNames(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest, names sets.String) (chan watch.Event, error) {
adminClient, err := s.clientGetter.TableAdminClientForWatch(apiOp, schema, apiOp.Namespace) adminClient, err := s.clientGetter.TableAdminClientForWatch(apiOp, schema, apiOp.Namespace)
if err != nil { if err != nil {
return nil, err return nil, err
@ -388,11 +340,16 @@ func (s *Store) WatchNames(apiOp *types.APIRequest, schema *types.APISchema, w t
return nil, err return nil, err
} }
result := make(chan types.APIEvent) result := make(chan watch.Event)
go func() { go func() {
defer close(result) defer close(result)
for item := range c { for item := range c {
if item.Error == nil && names.Has(item.Object.Name()) {
m, err := meta.Accessor(item.Object)
if err != nil {
return
}
if item.Type != watch.Error && names.Has(m.GetName()) {
result <- item result <- item
} }
} }
@ -402,7 +359,7 @@ func (s *Store) WatchNames(apiOp *types.APIRequest, schema *types.APISchema, w t
} }
// Watch returns a channel of events for a list or resource. // Watch returns a channel of events for a list or resource.
func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest) (chan types.APIEvent, error) { func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest) (chan watch.Event, error) {
client, err := s.clientGetter.TableClientForWatch(apiOp, schema, apiOp.Namespace) client, err := s.clientGetter.TableClientForWatch(apiOp, schema, apiOp.Namespace)
if err != nil { if err != nil {
return nil, err return nil, err
@ -410,8 +367,8 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.
return s.watch(apiOp, schema, w, client) return s.watch(apiOp, schema, w, client)
} }
func (s *Store) watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest, client dynamic.ResourceInterface) (chan types.APIEvent, error) { func (s *Store) watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest, client dynamic.ResourceInterface) (chan watch.Event, error) {
result := make(chan types.APIEvent) result := make(chan watch.Event)
go func() { go func() {
s.listAndWatch(apiOp, client, schema, w, result) s.listAndWatch(apiOp, client, schema, w, result)
logrus.Debugf("closing watcher for %s", schema.ID) logrus.Debugf("closing watcher for %s", schema.ID)
@ -420,35 +377,8 @@ func (s *Store) watch(apiOp *types.APIRequest, schema *types.APISchema, w types.
return result, nil return result, nil
} }
func (s *Store) toAPIEvent(apiOp *types.APIRequest, schema *types.APISchema, et watch.EventType, obj runtime.Object) types.APIEvent {
name := types.ChangeAPIEvent
switch et {
case watch.Deleted:
name = types.RemoveAPIEvent
case watch.Added:
name = types.CreateAPIEvent
}
if unstr, ok := obj.(*unstructured.Unstructured); ok {
rowToObject(unstr)
}
event := types.APIEvent{
Name: name,
Object: toAPI(schema, obj),
}
m, err := meta.Accessor(obj)
if err != nil {
return event
}
event.Revision = m.GetResourceVersion()
return event
}
// Create creates a single object in the store. // Create creates a single object in the store.
func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, params types.APIObject) (types.APIObject, error) { func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, params types.APIObject) (*unstructured.Unstructured, error) {
var ( var (
resp *unstructured.Unstructured resp *unstructured.Unstructured
) )
@ -474,22 +404,21 @@ func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, params
k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, ns)) k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, ns))
if err != nil { if err != nil {
return types.APIObject{}, err return nil, err
} }
opts := metav1.CreateOptions{} opts := metav1.CreateOptions{}
if err := decodeParams(apiOp, &opts); err != nil { if err := decodeParams(apiOp, &opts); err != nil {
return types.APIObject{}, err return nil, err
} }
resp, err = k8sClient.Create(apiOp, &unstructured.Unstructured{Object: input}, opts) resp, err = k8sClient.Create(apiOp, &unstructured.Unstructured{Object: input}, opts)
rowToObject(resp) rowToObject(resp)
apiObject := toAPI(schema, resp) return resp, err
return apiObject, err
} }
// Update updates a single object in the store. // Update updates a single object in the store.
func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, params types.APIObject, id string) (types.APIObject, error) { func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, params types.APIObject, id string) (*unstructured.Unstructured, error) {
var ( var (
err error err error
input = params.Data() input = params.Data()
@ -498,13 +427,13 @@ func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, params
ns := types.Namespace(input) ns := types.Namespace(input)
k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, ns)) k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, ns))
if err != nil { if err != nil {
return types.APIObject{}, err return nil, err
} }
if apiOp.Method == http.MethodPatch { if apiOp.Method == http.MethodPatch {
bytes, err := ioutil.ReadAll(io.LimitReader(apiOp.Request.Body, 2<<20)) bytes, err := ioutil.ReadAll(io.LimitReader(apiOp.Request.Body, 2<<20))
if err != nil { if err != nil {
return types.APIObject{}, err return nil, err
} }
pType := apitypes.StrategicMergePatchType pType := apitypes.StrategicMergePatchType
@ -514,70 +443,70 @@ func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, params
opts := metav1.PatchOptions{} opts := metav1.PatchOptions{}
if err := decodeParams(apiOp, &opts); err != nil { if err := decodeParams(apiOp, &opts); err != nil {
return types.APIObject{}, err return nil, err
} }
if pType == apitypes.StrategicMergePatchType { if pType == apitypes.StrategicMergePatchType {
data := map[string]interface{}{} data := map[string]interface{}{}
if err := json.Unmarshal(bytes, &data); err != nil { if err := json.Unmarshal(bytes, &data); err != nil {
return types.APIObject{}, err return nil, err
} }
data = moveFromUnderscore(data) data = moveFromUnderscore(data)
bytes, err = json.Marshal(data) bytes, err = json.Marshal(data)
if err != nil { if err != nil {
return types.APIObject{}, err return nil, err
} }
} }
resp, err := k8sClient.Patch(apiOp, id, pType, bytes, opts) resp, err := k8sClient.Patch(apiOp, id, pType, bytes, opts)
if err != nil { if err != nil {
return types.APIObject{}, err return nil, err
} }
return toAPI(schema, resp), nil return resp, nil
} }
resourceVersion := input.String("metadata", "resourceVersion") resourceVersion := input.String("metadata", "resourceVersion")
if resourceVersion == "" { if resourceVersion == "" {
return types.APIObject{}, fmt.Errorf("metadata.resourceVersion is required for update") return nil, fmt.Errorf("metadata.resourceVersion is required for update")
} }
opts := metav1.UpdateOptions{} opts := metav1.UpdateOptions{}
if err := decodeParams(apiOp, &opts); err != nil { if err := decodeParams(apiOp, &opts); err != nil {
return types.APIObject{}, err return nil, err
} }
resp, err := k8sClient.Update(apiOp, &unstructured.Unstructured{Object: moveFromUnderscore(input)}, metav1.UpdateOptions{}) resp, err := k8sClient.Update(apiOp, &unstructured.Unstructured{Object: moveFromUnderscore(input)}, metav1.UpdateOptions{})
if err != nil { if err != nil {
return types.APIObject{}, err return nil, err
} }
rowToObject(resp) rowToObject(resp)
return toAPI(schema, resp), nil return resp, nil
} }
// Delete deletes an object from a store. // Delete deletes an object from a store.
func (s *Store) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (types.APIObject, error) { func (s *Store) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, error) {
opts := metav1.DeleteOptions{} opts := metav1.DeleteOptions{}
if err := decodeParams(apiOp, &opts); err != nil { if err := decodeParams(apiOp, &opts); err != nil {
return types.APIObject{}, nil return nil, nil
} }
k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, apiOp.Namespace)) k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, apiOp.Namespace))
if err != nil { if err != nil {
return types.APIObject{}, err return nil, err
} }
if err := k8sClient.Delete(apiOp, id, opts); err != nil { if err := k8sClient.Delete(apiOp, id, opts); err != nil {
return types.APIObject{}, err return nil, err
} }
obj, err := s.byID(apiOp, schema, apiOp.Namespace, id) obj, err := s.byID(apiOp, schema, apiOp.Namespace, id)
if err != nil { if err != nil {
// ignore lookup error // ignore lookup error
return types.APIObject{}, validation.ErrorCode{ return nil, validation.ErrorCode{
Status: http.StatusNoContent, Status: http.StatusNoContent,
} }
} }
return toAPI(schema, obj), nil return obj, nil
} }


@ -9,7 +9,9 @@ import (
"github.com/rancher/steve/pkg/attributes" "github.com/rancher/steve/pkg/attributes"
"github.com/rancher/steve/pkg/stores/partition" "github.com/rancher/steve/pkg/stores/partition"
"github.com/rancher/wrangler/pkg/kv" "github.com/rancher/wrangler/pkg/kv"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch"
) )
var ( var (
@ -85,8 +87,8 @@ func (p *rbacPartitioner) All(apiOp *types.APIRequest, schema *types.APISchema,
} }
} }
// Store returns a proxy Store suited to listing and watching resources by partition. // Store returns an UnstructuredStore suited to listing and watching resources by partition.
func (p *rbacPartitioner) Store(apiOp *types.APIRequest, partition partition.Partition) (types.Store, error) { func (p *rbacPartitioner) Store(apiOp *types.APIRequest, partition partition.Partition) (partition.UnstructuredStore, error) {
return &byNameOrNamespaceStore{ return &byNameOrNamespaceStore{
Store: p.proxyStore, Store: p.proxyStore,
partition: partition.(Partition), partition: partition.(Partition),
@ -99,7 +101,7 @@ type byNameOrNamespaceStore struct {
} }
// List returns a list of resources by partition. // List returns a list of resources by partition.
func (b *byNameOrNamespaceStore) List(apiOp *types.APIRequest, schema *types.APISchema) (types.APIObjectList, error) { func (b *byNameOrNamespaceStore) List(apiOp *types.APIRequest, schema *types.APISchema) (*unstructured.UnstructuredList, error) {
if b.partition.Passthrough { if b.partition.Passthrough {
return b.Store.List(apiOp, schema) return b.Store.List(apiOp, schema)
} }
@ -112,7 +114,7 @@ func (b *byNameOrNamespaceStore) List(apiOp *types.APIRequest, schema *types.API
} }
// Watch returns a channel of resources by partition. // Watch returns a channel of resources by partition.
func (b *byNameOrNamespaceStore) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types.WatchRequest) (chan types.APIEvent, error) { func (b *byNameOrNamespaceStore) Watch(apiOp *types.APIRequest, schema *types.APISchema, wr types.WatchRequest) (chan watch.Event, error) {
if b.partition.Passthrough { if b.partition.Passthrough {
return b.Store.Watch(apiOp, schema, wr) return b.Store.Watch(apiOp, schema, wr)
} }