mirror of
https://github.com/rancher/steve.git
synced 2025-09-19 01:29:48 +00:00
Filtering and sorting needs to operate on unstructured data. It also needs to operate after the parallel partitioner, higher in the store stack. This means that the proxy Store needs to return raw, unstructured data up to the partitioner. This change moves all conversions from unstructured Kubernetes types to apiserver objects up from the proxy store into the partitioner.
512 lines
14 KiB
Go
512 lines
14 KiB
Go
// Package proxy implements the proxy store, which is responsible for interfacing directly with Kubernetes.
|
|
package proxy
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"net/http"
|
|
"os"
|
|
"regexp"
|
|
"strconv"
|
|
|
|
"github.com/pkg/errors"
|
|
"github.com/rancher/apiserver/pkg/types"
|
|
"github.com/rancher/steve/pkg/accesscontrol"
|
|
"github.com/rancher/steve/pkg/attributes"
|
|
metricsStore "github.com/rancher/steve/pkg/stores/metrics"
|
|
"github.com/rancher/steve/pkg/stores/partition"
|
|
"github.com/rancher/wrangler/pkg/data"
|
|
"github.com/rancher/wrangler/pkg/schemas/validation"
|
|
"github.com/rancher/wrangler/pkg/summary"
|
|
"github.com/sirupsen/logrus"
|
|
"golang.org/x/sync/errgroup"
|
|
"k8s.io/apimachinery/pkg/api/meta"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
apitypes "k8s.io/apimachinery/pkg/types"
|
|
"k8s.io/apimachinery/pkg/util/sets"
|
|
"k8s.io/apimachinery/pkg/watch"
|
|
"k8s.io/client-go/dynamic"
|
|
"k8s.io/client-go/kubernetes"
|
|
)
|
|
|
|
const watchTimeoutEnv = "CATTLE_WATCH_TIMEOUT_SECONDS"
|
|
|
|
var (
|
|
lowerChars = regexp.MustCompile("[a-z]+")
|
|
paramScheme = runtime.NewScheme()
|
|
paramCodec = runtime.NewParameterCodec(paramScheme)
|
|
)
|
|
|
|
func init() {
|
|
metav1.AddToGroupVersion(paramScheme, metav1.SchemeGroupVersion)
|
|
}
|
|
|
|
// ClientGetter is a dynamic kubernetes client factory.
|
|
type ClientGetter interface {
|
|
IsImpersonating() bool
|
|
K8sInterface(ctx *types.APIRequest) (kubernetes.Interface, error)
|
|
AdminK8sInterface() (kubernetes.Interface, error)
|
|
Client(ctx *types.APIRequest, schema *types.APISchema, namespace string) (dynamic.ResourceInterface, error)
|
|
DynamicClient(ctx *types.APIRequest) (dynamic.Interface, error)
|
|
AdminClient(ctx *types.APIRequest, schema *types.APISchema, namespace string) (dynamic.ResourceInterface, error)
|
|
TableClient(ctx *types.APIRequest, schema *types.APISchema, namespace string) (dynamic.ResourceInterface, error)
|
|
TableAdminClient(ctx *types.APIRequest, schema *types.APISchema, namespace string) (dynamic.ResourceInterface, error)
|
|
TableClientForWatch(ctx *types.APIRequest, schema *types.APISchema, namespace string) (dynamic.ResourceInterface, error)
|
|
TableAdminClientForWatch(ctx *types.APIRequest, schema *types.APISchema, namespace string) (dynamic.ResourceInterface, error)
|
|
}
|
|
|
|
// RelationshipNotifier is an interface for handling wrangler summary.Relationship events.
|
|
type RelationshipNotifier interface {
|
|
OnInboundRelationshipChange(ctx context.Context, schema *types.APISchema, namespace string) <-chan *summary.Relationship
|
|
}
|
|
|
|
// Store implements partition.UnstructuredStore directly on top of kubernetes.
|
|
type Store struct {
|
|
clientGetter ClientGetter
|
|
notifier RelationshipNotifier
|
|
}
|
|
|
|
// NewProxyStore returns a wrapped types.Store.
|
|
func NewProxyStore(clientGetter ClientGetter, notifier RelationshipNotifier, lookup accesscontrol.AccessSetLookup) types.Store {
|
|
return &errorStore{
|
|
Store: &WatchRefresh{
|
|
Store: &partition.Store{
|
|
Partitioner: &rbacPartitioner{
|
|
proxyStore: &Store{
|
|
clientGetter: clientGetter,
|
|
notifier: notifier,
|
|
},
|
|
},
|
|
},
|
|
asl: lookup,
|
|
},
|
|
}
|
|
}
|
|
|
|
// ByID looks up a single object by its ID.
|
|
func (s *Store) ByID(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, error) {
|
|
return s.byID(apiOp, schema, apiOp.Namespace, id)
|
|
}
|
|
|
|
func decodeParams(apiOp *types.APIRequest, target runtime.Object) error {
|
|
return paramCodec.DecodeParameters(apiOp.Request.URL.Query(), metav1.SchemeGroupVersion, target)
|
|
}
|
|
|
|
func (s *Store) byID(apiOp *types.APIRequest, schema *types.APISchema, namespace, id string) (*unstructured.Unstructured, error) {
|
|
k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, namespace))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
opts := metav1.GetOptions{}
|
|
if err := decodeParams(apiOp, &opts); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
obj, err := k8sClient.Get(apiOp, id, opts)
|
|
rowToObject(obj)
|
|
return obj, err
|
|
}
|
|
|
|
func moveFromUnderscore(obj map[string]interface{}) map[string]interface{} {
|
|
if obj == nil {
|
|
return nil
|
|
}
|
|
for k := range types.ReservedFields {
|
|
v, ok := obj["_"+k]
|
|
delete(obj, "_"+k)
|
|
delete(obj, k)
|
|
if ok {
|
|
obj[k] = v
|
|
}
|
|
}
|
|
return obj
|
|
}
|
|
|
|
func rowToObject(obj *unstructured.Unstructured) {
|
|
if obj == nil {
|
|
return
|
|
}
|
|
if obj.Object["kind"] != "Table" ||
|
|
(obj.Object["apiVersion"] != "meta.k8s.io/v1" &&
|
|
obj.Object["apiVersion"] != "meta.k8s.io/v1beta1") {
|
|
return
|
|
}
|
|
|
|
items := tableToObjects(obj.Object)
|
|
if len(items) == 1 {
|
|
obj.Object = items[0].Object
|
|
}
|
|
}
|
|
|
|
func tableToList(obj *unstructured.UnstructuredList) {
|
|
if obj.Object["kind"] != "Table" ||
|
|
(obj.Object["apiVersion"] != "meta.k8s.io/v1" &&
|
|
obj.Object["apiVersion"] != "meta.k8s.io/v1beta1") {
|
|
return
|
|
}
|
|
|
|
obj.Items = tableToObjects(obj.Object)
|
|
}
|
|
|
|
func tableToObjects(obj map[string]interface{}) []unstructured.Unstructured {
|
|
var result []unstructured.Unstructured
|
|
|
|
rows, _ := obj["rows"].([]interface{})
|
|
for _, row := range rows {
|
|
m, ok := row.(map[string]interface{})
|
|
if !ok {
|
|
continue
|
|
}
|
|
cells := m["cells"]
|
|
object, ok := m["object"].(map[string]interface{})
|
|
if !ok {
|
|
continue
|
|
}
|
|
|
|
data.PutValue(object, cells, "metadata", "fields")
|
|
result = append(result, unstructured.Unstructured{
|
|
Object: object,
|
|
})
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
// ByNames filters a list of objects by an allowed set of names.
|
|
// In plain kubernetes, if a user has permission to 'list' or 'watch' a defined set of resource names,
|
|
// performing the list or watch will result in a Forbidden error, because the user does not have permission
|
|
// to list *all* resources.
|
|
// With this filter, the request can be performed successfully, and only the allowed resources will
|
|
// be returned in the list.
|
|
func (s *Store) ByNames(apiOp *types.APIRequest, schema *types.APISchema, names sets.String) (*unstructured.UnstructuredList, error) {
|
|
if apiOp.Namespace == "*" {
|
|
// This happens when you grant namespaced objects with "get" by name in a clusterrolebinding. We will treat
|
|
// this as an invalid situation instead of listing all objects in the cluster and filtering by name.
|
|
return nil, nil
|
|
}
|
|
|
|
adminClient, err := s.clientGetter.TableAdminClient(apiOp, schema, apiOp.Namespace)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
objs, err := s.list(apiOp, schema, adminClient)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var filtered []unstructured.Unstructured
|
|
for _, obj := range objs.Items {
|
|
if names.Has(obj.GetName()) {
|
|
filtered = append(filtered, obj)
|
|
}
|
|
}
|
|
|
|
objs.Items = filtered
|
|
return objs, nil
|
|
}
|
|
|
|
// List returns an unstructured list of resources.
|
|
func (s *Store) List(apiOp *types.APIRequest, schema *types.APISchema) (*unstructured.UnstructuredList, error) {
|
|
client, err := s.clientGetter.TableClient(apiOp, schema, apiOp.Namespace)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return s.list(apiOp, schema, client)
|
|
}
|
|
|
|
func (s *Store) list(apiOp *types.APIRequest, schema *types.APISchema, client dynamic.ResourceInterface) (*unstructured.UnstructuredList, error) {
|
|
opts := metav1.ListOptions{}
|
|
if err := decodeParams(apiOp, &opts); err != nil {
|
|
return nil, nil
|
|
}
|
|
|
|
k8sClient, _ := metricsStore.Wrap(client, nil)
|
|
resultList, err := k8sClient.List(apiOp, opts)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
tableToList(resultList)
|
|
|
|
return resultList, nil
|
|
}
|
|
|
|
func returnErr(err error, c chan watch.Event) {
|
|
c <- watch.Event{
|
|
Type: "resource.error",
|
|
Object: &metav1.Status{
|
|
Message: err.Error(),
|
|
},
|
|
}
|
|
}
|
|
|
|
func (s *Store) listAndWatch(apiOp *types.APIRequest, client dynamic.ResourceInterface, schema *types.APISchema, w types.WatchRequest, result chan watch.Event) {
|
|
rev := w.Revision
|
|
if rev == "-1" || rev == "0" {
|
|
rev = ""
|
|
}
|
|
|
|
timeout := int64(60 * 30)
|
|
timeoutSetting := os.Getenv(watchTimeoutEnv)
|
|
if timeoutSetting != "" {
|
|
userSetTimeout, err := strconv.Atoi(timeoutSetting)
|
|
if err != nil {
|
|
logrus.Debugf("could not parse %s environment variable, error: %v", watchTimeoutEnv, err)
|
|
} else {
|
|
timeout = int64(userSetTimeout)
|
|
}
|
|
}
|
|
k8sClient, _ := metricsStore.Wrap(client, nil)
|
|
watcher, err := k8sClient.Watch(apiOp, metav1.ListOptions{
|
|
Watch: true,
|
|
TimeoutSeconds: &timeout,
|
|
ResourceVersion: rev,
|
|
LabelSelector: w.Selector,
|
|
})
|
|
if err != nil {
|
|
returnErr(errors.Wrapf(err, "stopping watch for %s: %v", schema.ID, err), result)
|
|
return
|
|
}
|
|
defer watcher.Stop()
|
|
logrus.Debugf("opening watcher for %s", schema.ID)
|
|
|
|
eg, ctx := errgroup.WithContext(apiOp.Context())
|
|
|
|
go func() {
|
|
<-ctx.Done()
|
|
watcher.Stop()
|
|
}()
|
|
|
|
if s.notifier != nil {
|
|
eg.Go(func() error {
|
|
for rel := range s.notifier.OnInboundRelationshipChange(ctx, schema, apiOp.Namespace) {
|
|
obj, err := s.byID(apiOp, schema, rel.Namespace, rel.Name)
|
|
if err == nil {
|
|
rowToObject(obj)
|
|
result <- watch.Event{Type: watch.Modified, Object: obj}
|
|
} else {
|
|
logrus.Debugf("notifier watch error: %v", err)
|
|
returnErr(errors.Wrapf(err, "notifier watch error: %v", err), result)
|
|
}
|
|
}
|
|
return fmt.Errorf("closed")
|
|
})
|
|
}
|
|
|
|
eg.Go(func() error {
|
|
for event := range watcher.ResultChan() {
|
|
if event.Type == watch.Error {
|
|
if status, ok := event.Object.(*metav1.Status); ok {
|
|
logrus.Debugf("event watch error: %s", status.Message)
|
|
returnErr(fmt.Errorf("event watch error: %s", status.Message), result)
|
|
} else {
|
|
logrus.Debugf("event watch error: could not decode event object %T", event.Object)
|
|
}
|
|
continue
|
|
}
|
|
if unstr, ok := event.Object.(*unstructured.Unstructured); ok {
|
|
rowToObject(unstr)
|
|
}
|
|
result <- event
|
|
}
|
|
return fmt.Errorf("closed")
|
|
})
|
|
|
|
_ = eg.Wait()
|
|
return
|
|
}
|
|
|
|
// WatchNames returns a channel of events filtered by an allowed set of names.
|
|
// In plain kubernetes, if a user has permission to 'list' or 'watch' a defined set of resource names,
|
|
// performing the list or watch will result in a Forbidden error, because the user does not have permission
|
|
// to list *all* resources.
|
|
// With this filter, the request can be performed successfully, and only the allowed resources will
|
|
// be returned in watch.
|
|
func (s *Store) WatchNames(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest, names sets.String) (chan watch.Event, error) {
|
|
adminClient, err := s.clientGetter.TableAdminClientForWatch(apiOp, schema, apiOp.Namespace)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
c, err := s.watch(apiOp, schema, w, adminClient)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
result := make(chan watch.Event)
|
|
go func() {
|
|
defer close(result)
|
|
for item := range c {
|
|
|
|
m, err := meta.Accessor(item.Object)
|
|
if err != nil {
|
|
return
|
|
}
|
|
if item.Type != watch.Error && names.Has(m.GetName()) {
|
|
result <- item
|
|
}
|
|
}
|
|
}()
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// Watch returns a channel of events for a list or resource.
|
|
func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest) (chan watch.Event, error) {
|
|
client, err := s.clientGetter.TableClientForWatch(apiOp, schema, apiOp.Namespace)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return s.watch(apiOp, schema, w, client)
|
|
}
|
|
|
|
func (s *Store) watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest, client dynamic.ResourceInterface) (chan watch.Event, error) {
|
|
result := make(chan watch.Event)
|
|
go func() {
|
|
s.listAndWatch(apiOp, client, schema, w, result)
|
|
logrus.Debugf("closing watcher for %s", schema.ID)
|
|
close(result)
|
|
}()
|
|
return result, nil
|
|
}
|
|
|
|
// Create creates a single object in the store.
|
|
func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, params types.APIObject) (*unstructured.Unstructured, error) {
|
|
var (
|
|
resp *unstructured.Unstructured
|
|
)
|
|
|
|
input := params.Data()
|
|
|
|
if input == nil {
|
|
input = data.Object{}
|
|
}
|
|
|
|
name := types.Name(input)
|
|
ns := types.Namespace(input)
|
|
if name == "" && input.String("metadata", "generateName") == "" {
|
|
input.SetNested(schema.ID[0:1]+"-", "metadata", "generatedName")
|
|
}
|
|
if ns == "" && apiOp.Namespace != "" {
|
|
ns = apiOp.Namespace
|
|
input.SetNested(ns, "metadata", "namespace")
|
|
}
|
|
|
|
gvk := attributes.GVK(schema)
|
|
input["apiVersion"], input["kind"] = gvk.ToAPIVersionAndKind()
|
|
|
|
k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, ns))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
opts := metav1.CreateOptions{}
|
|
if err := decodeParams(apiOp, &opts); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
resp, err = k8sClient.Create(apiOp, &unstructured.Unstructured{Object: input}, opts)
|
|
rowToObject(resp)
|
|
return resp, err
|
|
}
|
|
|
|
// Update updates a single object in the store.
|
|
func (s *Store) Update(apiOp *types.APIRequest, schema *types.APISchema, params types.APIObject, id string) (*unstructured.Unstructured, error) {
|
|
var (
|
|
err error
|
|
input = params.Data()
|
|
)
|
|
|
|
ns := types.Namespace(input)
|
|
k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, ns))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if apiOp.Method == http.MethodPatch {
|
|
bytes, err := ioutil.ReadAll(io.LimitReader(apiOp.Request.Body, 2<<20))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
pType := apitypes.StrategicMergePatchType
|
|
if apiOp.Request.Header.Get("content-type") == string(apitypes.JSONPatchType) {
|
|
pType = apitypes.JSONPatchType
|
|
}
|
|
|
|
opts := metav1.PatchOptions{}
|
|
if err := decodeParams(apiOp, &opts); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if pType == apitypes.StrategicMergePatchType {
|
|
data := map[string]interface{}{}
|
|
if err := json.Unmarshal(bytes, &data); err != nil {
|
|
return nil, err
|
|
}
|
|
data = moveFromUnderscore(data)
|
|
bytes, err = json.Marshal(data)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
resp, err := k8sClient.Patch(apiOp, id, pType, bytes, opts)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
resourceVersion := input.String("metadata", "resourceVersion")
|
|
if resourceVersion == "" {
|
|
return nil, fmt.Errorf("metadata.resourceVersion is required for update")
|
|
}
|
|
|
|
opts := metav1.UpdateOptions{}
|
|
if err := decodeParams(apiOp, &opts); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
resp, err := k8sClient.Update(apiOp, &unstructured.Unstructured{Object: moveFromUnderscore(input)}, metav1.UpdateOptions{})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
rowToObject(resp)
|
|
return resp, nil
|
|
}
|
|
|
|
// Delete deletes an object from a store.
|
|
func (s *Store) Delete(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, error) {
|
|
opts := metav1.DeleteOptions{}
|
|
if err := decodeParams(apiOp, &opts); err != nil {
|
|
return nil, nil
|
|
}
|
|
|
|
k8sClient, err := metricsStore.Wrap(s.clientGetter.TableClient(apiOp, schema, apiOp.Namespace))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := k8sClient.Delete(apiOp, id, opts); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
obj, err := s.byID(apiOp, schema, apiOp.Namespace, id)
|
|
if err != nil {
|
|
// ignore lookup error
|
|
return nil, validation.ErrorCode{
|
|
Status: http.StatusNoContent,
|
|
}
|
|
}
|
|
return obj, nil
|
|
}
|