1
0
mirror of https://github.com/rancher/norman.git synced 2025-07-13 15:15:46 +00:00
norman/store/proxy/proxy_store.go

562 lines
15 KiB
Go
Raw Normal View History

2017-11-11 04:44:02 +00:00
package proxy
import (
2018-04-26 22:01:33 +00:00
"context"
2017-11-28 21:28:25 +00:00
ejson "encoding/json"
2017-12-18 20:56:50 +00:00
"net/http"
2017-12-23 06:16:40 +00:00
"strings"
2018-04-26 22:01:33 +00:00
"sync"
2018-03-23 15:41:33 +00:00
"time"
2017-12-18 20:56:50 +00:00
2018-04-11 22:24:41 +00:00
"github.com/rancher/norman/httperror"
2018-10-10 02:27:02 +00:00
"github.com/rancher/norman/objectclient/dynamic"
2018-04-26 22:01:33 +00:00
"github.com/rancher/norman/pkg/broadcast"
2018-04-02 22:45:10 +00:00
"github.com/rancher/norman/restwatch"
2017-11-11 04:44:02 +00:00
"github.com/rancher/norman/types"
"github.com/rancher/norman/types/convert"
2018-04-23 05:20:38 +00:00
"github.com/rancher/norman/types/convert/merge"
2017-11-29 21:27:02 +00:00
"github.com/rancher/norman/types/values"
2018-02-09 20:31:12 +00:00
"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
2018-02-09 20:31:12 +00:00
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
2018-06-04 23:48:22 +00:00
"k8s.io/apimachinery/pkg/api/errors"
2017-11-11 04:44:02 +00:00
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2017-11-28 21:28:25 +00:00
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer/json"
"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
"k8s.io/apimachinery/pkg/watch"
2017-11-11 04:44:02 +00:00
"k8s.io/client-go/rest"
2017-11-28 21:28:25 +00:00
restclientwatch "k8s.io/client-go/rest/watch"
2017-11-11 04:44:02 +00:00
)
2017-12-16 20:49:21 +00:00
// userAuthHeader is the request header getUser reads to identify the caller.
var userAuthHeader = "Impersonate-User"

// authHeaders lists the headers doAuthed copies from the incoming API request
// onto the outgoing Kubernetes request.
var authHeaders = []string{
	userAuthHeader,
	"Impersonate-Group",
}
2018-02-09 20:31:12 +00:00
// ClientGetter hands out the Kubernetes clients the proxy store talks through,
// resolved per API request and storage context.
type ClientGetter interface {
	// UnversionedClient returns the REST client used for resource requests.
	UnversionedClient(apiContext *types.APIContext, context types.StorageContext) (rest.Interface, error)

	// APIExtClient returns the apiextensions clientset.
	APIExtClient(apiContext *types.APIContext, context types.StorageContext) (clientset.Interface, error)
}
// simpleClientGetter is a ClientGetter that always returns the same
// pre-built clients, regardless of the request it is asked about.
type simpleClientGetter struct {
	// restConfig is the configuration the getter was created from.
	restConfig rest.Config
	// client is the shared unversioned REST client.
	client rest.Interface
	// apiExtClient is the shared apiextensions clientset.
	apiExtClient clientset.Interface
}
func NewClientGetterFromConfig(config rest.Config) (ClientGetter, error) {
dynamicConfig := config
if dynamicConfig.NegotiatedSerializer == nil {
2018-10-10 02:27:02 +00:00
dynamicConfig.NegotiatedSerializer = dynamic.NegotiatedSerializer
2018-02-09 20:31:12 +00:00
}
unversionedClient, err := rest.UnversionedRESTClientFor(&dynamicConfig)
if err != nil {
return nil, err
}
apiExtClient, err := clientset.NewForConfig(&dynamicConfig)
if err != nil {
return nil, err
}
return &simpleClientGetter{
restConfig: config,
client: unversionedClient,
apiExtClient: apiExtClient,
}, nil
}
// Config returns the rest.Config the getter was constructed with; both
// arguments are ignored and the error is always nil.
func (s *simpleClientGetter) Config(apiContext *types.APIContext, context types.StorageContext) (rest.Config, error) {
	cfg := s.restConfig
	return cfg, nil
}
// UnversionedClient returns the shared REST client; both arguments are
// ignored and the error is always nil.
func (s *simpleClientGetter) UnversionedClient(apiContext *types.APIContext, context types.StorageContext) (rest.Interface, error) {
	restClient := s.client
	return restClient, nil
}
// APIExtClient returns the shared apiextensions clientset; both arguments
// are ignored and the error is always nil.
func (s *simpleClientGetter) APIExtClient(apiContext *types.APIContext, context types.StorageContext) (clientset.Interface, error) {
	extClient := s.apiExtClient
	return extClient, nil
}
2017-11-11 04:44:02 +00:00
type Store struct {
2018-04-26 22:01:33 +00:00
sync.Mutex
2018-02-09 20:31:12 +00:00
clientGetter ClientGetter
storageContext types.StorageContext
2017-11-11 04:44:02 +00:00
prefix []string
group string
version string
kind string
resourcePlural string
2017-12-18 20:56:50 +00:00
authContext map[string]string
2018-04-26 22:01:33 +00:00
close context.Context
broadcasters map[rest.Interface]*broadcast.Broadcaster
2017-11-11 04:44:02 +00:00
}
2018-04-26 22:01:33 +00:00
func NewProxyStore(ctx context.Context, clientGetter ClientGetter, storageContext types.StorageContext,
prefix []string, group, version, kind, resourcePlural string) types.Store {
return &errorStore{
Store: &Store{
2018-02-09 20:31:12 +00:00
clientGetter: clientGetter,
storageContext: storageContext,
prefix: prefix,
group: group,
version: version,
kind: kind,
resourcePlural: resourcePlural,
authContext: map[string]string{
"apiGroup": group,
"resource": resourcePlural,
},
2018-04-26 22:01:33 +00:00
close: ctx,
broadcasters: map[rest.Interface]*broadcast.Broadcaster{},
},
}
}
2018-06-19 18:25:49 +00:00
// getUser returns the value of the Impersonate-User header on the incoming
// API request, or "" when the header is absent.
func (s *Store) getUser(apiContext *types.APIContext) string {
	headers := apiContext.Request.Header
	return headers.Get(userAuthHeader)
}
2018-06-19 18:25:49 +00:00
func (s *Store) doAuthed(apiContext *types.APIContext, request *rest.Request) rest.Result {
2018-03-23 15:41:33 +00:00
start := time.Now()
defer func() {
2018-06-19 18:25:49 +00:00
logrus.Debug("GET: ", time.Now().Sub(start), s.resourcePlural)
2018-03-23 15:41:33 +00:00
}()
2017-12-16 20:49:21 +00:00
for _, header := range authHeaders {
2017-12-18 20:56:50 +00:00
request.SetHeader(header, apiContext.Request.Header[http.CanonicalHeaderKey(header)]...)
2017-12-16 20:49:21 +00:00
}
return request.Do()
}
2018-06-19 18:25:49 +00:00
func (s *Store) k8sClient(apiContext *types.APIContext) (rest.Interface, error) {
return s.clientGetter.UnversionedClient(apiContext, s.storageContext)
2018-02-09 20:31:12 +00:00
}
2018-06-19 18:25:49 +00:00
// ByID fetches a single resource by its API id ("namespace:name" or "name"),
// retrying on client timeouts. The resource version is discarded.
func (s *Store) ByID(apiContext *types.APIContext, schema *types.Schema, id string) (map[string]interface{}, error) {
	_, obj, err := s.byID(apiContext, schema, id, true)
	return obj, err
}
func (s *Store) byID(apiContext *types.APIContext, schema *types.Schema, id string, retry bool) (string, map[string]interface{}, error) {
2018-04-11 22:24:41 +00:00
splitted := strings.Split(strings.TrimSpace(id), ":")
validID := false
namespaced := schema.Scope == types.NamespaceScope
if namespaced {
validID = len(splitted) == 2 && len(strings.TrimSpace(splitted[0])) > 0 && len(strings.TrimSpace(splitted[1])) > 0
} else {
validID = len(splitted) == 1 && len(strings.TrimSpace(splitted[0])) > 0
}
if !validID {
return "", nil, httperror.NewAPIError(httperror.NotFound, "failed to find resource by id")
2018-04-11 22:24:41 +00:00
}
2017-11-11 04:44:02 +00:00
namespace, id := splitID(id)
2018-06-19 18:25:49 +00:00
k8sClient, err := s.k8sClient(apiContext)
2018-02-09 20:31:12 +00:00
if err != nil {
return "", nil, err
}
req := s.common(namespace, k8sClient.Get()).Name(id)
if !retry {
return s.singleResult(apiContext, schema, req)
}
2017-11-11 04:44:02 +00:00
var version string
var data map[string]interface{}
for i := 0; i < 3; i++ {
req = s.common(namespace, k8sClient.Get()).Name(id)
version, data, err = s.singleResult(apiContext, schema, req)
if err != nil {
if i < 2 && strings.Contains(err.Error(), "Client.Timeout exceeded") {
logrus.Warnf("Retrying GET. Error: %v", err)
continue
}
return version, data, err
}
return version, data, err
}
return version, data, err
2017-11-11 04:44:02 +00:00
}
2018-06-19 18:25:49 +00:00
func (s *Store) Context() types.StorageContext {
return s.storageContext
2018-02-09 20:31:12 +00:00
}
2018-06-19 18:25:49 +00:00
func (s *Store) List(apiContext *types.APIContext, schema *types.Schema, opt *types.QueryOptions) ([]map[string]interface{}, error) {
var resultList unstructured.UnstructuredList
2017-11-11 04:44:02 +00:00
// if there are no namespaces field in options, a single request is made
if opt == nil || opt.Namespaces == nil {
ns := getNamespace(apiContext, opt)
list, err := s.retryList(ns, apiContext)
if err != nil {
return nil, err
}
resultList = *list
} else {
var (
errGroup errgroup.Group
mux sync.Mutex
)
allNS := opt.Namespaces
for _, ns := range allNS {
nsCopy := ns
errGroup.Go(func() error {
list, err := s.retryList(nsCopy, apiContext)
if err != nil {
return err
}
mux.Lock()
resultList.Items = append(resultList.Items, list.Items...)
mux.Unlock()
return nil
})
}
if err := errGroup.Wait(); err != nil {
return nil, err
}
2017-11-11 04:44:02 +00:00
}
2017-11-21 20:46:30 +00:00
var result []map[string]interface{}
2017-11-11 04:44:02 +00:00
for _, obj := range resultList.Items {
result = append(result, s.fromInternal(apiContext, schema, obj.Object))
2017-11-11 04:44:02 +00:00
}
2018-06-19 18:25:49 +00:00
return apiContext.AccessControl.FilterList(apiContext, schema, result, s.authContext), nil
2017-11-11 04:44:02 +00:00
}
func (s *Store) retryList(namespace string, apiContext *types.APIContext) (*unstructured.UnstructuredList, error) {
2018-11-20 20:16:33 +00:00
var resultList *unstructured.UnstructuredList
k8sClient, err := s.k8sClient(apiContext)
if err != nil {
return nil, err
}
for i := 0; i < 3; i++ {
req := s.common(namespace, k8sClient.Get())
start := time.Now()
resultList = &unstructured.UnstructuredList{}
err = req.Do().Into(resultList)
logrus.Debugf("LIST: %v, %v", time.Now().Sub(start), s.resourcePlural)
if err != nil {
if i < 2 && strings.Contains(err.Error(), "Client.Timeout exceeded") {
logrus.Infof("Error on LIST %v: %v. Attempt: %v. Retrying", s.resourcePlural, err, i+1)
continue
}
return resultList, err
}
return resultList, err
}
return resultList, err
}
2018-06-19 18:25:49 +00:00
func (s *Store) Watch(apiContext *types.APIContext, schema *types.Schema, opt *types.QueryOptions) (chan map[string]interface{}, error) {
c, err := s.shareWatch(apiContext, schema, opt)
2018-04-26 22:01:33 +00:00
if err != nil {
return nil, err
}
return convert.Chan(c, func(data map[string]interface{}) map[string]interface{} {
2018-06-19 18:25:49 +00:00
return apiContext.AccessControl.Filter(apiContext, schema, data, s.authContext)
2018-04-26 22:01:33 +00:00
}), nil
}
2018-06-19 18:25:49 +00:00
func (s *Store) realWatch(apiContext *types.APIContext, schema *types.Schema, opt *types.QueryOptions) (chan map[string]interface{}, error) {
2017-11-28 21:28:25 +00:00
namespace := getNamespace(apiContext, opt)
2018-06-19 18:25:49 +00:00
k8sClient, err := s.k8sClient(apiContext)
2018-02-09 20:31:12 +00:00
if err != nil {
return nil, err
}
2018-04-02 22:45:10 +00:00
if watchClient, ok := k8sClient.(restwatch.WatchClient); ok {
k8sClient = watchClient.WatchClient()
}
timeout := int64(60 * 30)
2018-06-19 18:25:49 +00:00
req := s.common(namespace, k8sClient.Get())
2017-11-28 21:28:25 +00:00
req.VersionedParams(&metav1.ListOptions{
Watch: true,
TimeoutSeconds: &timeout,
ResourceVersion: "0",
2018-10-10 02:27:02 +00:00
}, metav1.ParameterCodec)
2017-11-28 21:28:25 +00:00
body, err := req.Stream()
if err != nil {
return nil, err
}
framer := json.Framer.NewFrameReader(body)
2018-07-31 23:30:59 +00:00
decoder := streaming.NewDecoder(framer, &unstructuredDecoder{})
2019-10-02 03:24:34 +00:00
watcher := watch.NewStreamWatcher(restclientwatch.NewDecoder(decoder, &unstructuredDecoder{}), &errorReporter{})
2017-11-28 21:28:25 +00:00
watchingContext, cancelWatchingContext := context.WithCancel(apiContext.Request.Context())
2017-11-28 21:28:25 +00:00
go func() {
<-watchingContext.Done()
2018-02-09 20:31:12 +00:00
logrus.Debugf("stopping watcher for %s", schema.ID)
2017-11-28 21:28:25 +00:00
watcher.Stop()
}()
result := make(chan map[string]interface{})
go func() {
for event := range watcher.ResultChan() {
2019-10-02 03:24:34 +00:00
if data, ok := event.Object.(*metav1.Status); ok {
// just logging it, keeping the same behavior as before
logrus.Debugf("watcher status for %s: %s", schema.ID, data.Message)
2019-10-02 03:24:34 +00:00
} else {
data := event.Object.(*unstructured.Unstructured)
s.fromInternal(apiContext, schema, data.Object)
if event.Type == watch.Deleted && data.Object != nil {
data.Object[".removed"] = true
}
result <- data.Object
2017-12-28 15:47:10 +00:00
}
2017-11-28 21:28:25 +00:00
}
2018-02-09 20:31:12 +00:00
logrus.Debugf("closing watcher for %s", schema.ID)
2017-11-28 21:28:25 +00:00
close(result)
cancelWatchingContext()
2017-11-28 21:28:25 +00:00
}()
return result, nil
}
// unstructuredDecoder is a stateless decoder that unmarshals JSON into the
// supplied object, defaulting to unstructured.Unstructured.
type unstructuredDecoder struct{}
2019-10-02 03:24:34 +00:00
// errorReporter is a stateless helper that turns errors into metav1.Status
// objects; it is passed to the stream watcher created in realWatch.
type errorReporter struct{}
// AsObject wraps err in a metav1.Status carrying the error text, HTTP status
// 500 and the reason "ClientWatchDecoding".
func (e *errorReporter) AsObject(err error) runtime.Object {
	status := &metav1.Status{
		Message: err.Error(),
		Code:    http.StatusInternalServerError,
		Reason:  "ClientWatchDecoding",
	}
	return status
}
2017-11-28 21:28:25 +00:00
// Decode unmarshals the JSON in data into into, allocating an
// unstructured.Unstructured when into is nil. defaults is returned unchanged
// and the target object is returned even when unmarshalling fails.
func (d *unstructuredDecoder) Decode(data []byte, defaults *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
	if into == nil {
		into = &unstructured.Unstructured{}
	}

	err := ejson.Unmarshal(data, &into)
	return into, defaults, err
}
2017-12-28 15:47:10 +00:00
func getNamespace(apiContext *types.APIContext, opt *types.QueryOptions) string {
2017-11-28 21:28:25 +00:00
if val, ok := apiContext.SubContext["namespaces"]; ok {
2017-11-11 04:44:02 +00:00
return convert.ToString(val)
}
for _, condition := range opt.Conditions {
mod := condition.ToCondition().Modifier
2018-09-01 06:54:47 +00:00
if condition.Field == "namespaceId" && condition.Value != "" && mod == types.ModifierEQ {
2017-11-21 20:46:30 +00:00
return condition.Value
2017-11-11 04:44:02 +00:00
}
2018-09-01 06:54:47 +00:00
if condition.Field == "namespace" && condition.Value != "" && mod == types.ModifierEQ {
2018-06-19 18:25:49 +00:00
return condition.Value
}
2017-11-11 04:44:02 +00:00
}
return ""
}
2018-06-19 18:25:49 +00:00
func (s *Store) Create(apiContext *types.APIContext, schema *types.Schema, data map[string]interface{}) (map[string]interface{}, error) {
2018-06-04 23:48:22 +00:00
if err := s.toInternal(schema.Mapper, data); err != nil {
return nil, err
}
2018-06-19 18:25:49 +00:00
namespace, _ := values.GetValueN(data, "metadata", "namespace").(string)
2017-11-11 04:44:02 +00:00
2018-06-19 18:25:49 +00:00
values.PutValue(data, s.getUser(apiContext), "metadata", "annotations", "field.cattle.io/creatorId")
values.PutValue(data, "norman", "metadata", "labels", "cattle.io/creator")
2017-11-29 21:27:02 +00:00
name, _ := values.GetValueN(data, "metadata", "name").(string)
2017-11-21 20:46:30 +00:00
if name == "" {
2017-11-29 21:27:02 +00:00
generated, _ := values.GetValueN(data, "metadata", "generateName").(string)
2017-11-21 20:46:30 +00:00
if generated == "" {
2020-01-31 20:47:34 +00:00
values.PutValue(data, types.GenerateTypePrefix(schema.ID), "metadata", "generateName")
2017-11-21 20:46:30 +00:00
}
}
2018-06-19 18:25:49 +00:00
k8sClient, err := s.k8sClient(apiContext)
2018-02-09 20:31:12 +00:00
if err != nil {
return nil, err
}
2018-06-19 18:25:49 +00:00
req := s.common(namespace, k8sClient.Post()).
2017-11-11 04:44:02 +00:00
Body(&unstructured.Unstructured{
Object: data,
})
2018-06-19 18:25:49 +00:00
_, result, err := s.singleResult(apiContext, schema, req)
2017-11-29 21:27:02 +00:00
return result, err
2017-11-11 04:44:02 +00:00
}
2018-06-04 23:48:22 +00:00
func (s *Store) toInternal(mapper types.Mapper, data map[string]interface{}) error {
2017-11-11 04:44:02 +00:00
if mapper != nil {
2018-06-04 23:48:22 +00:00
if err := mapper.ToInternal(data); err != nil {
return err
}
2017-11-11 04:44:02 +00:00
}
2018-06-19 18:25:49 +00:00
if s.group == "" {
data["apiVersion"] = s.version
2017-11-11 04:44:02 +00:00
} else {
2018-06-19 18:25:49 +00:00
data["apiVersion"] = s.group + "/" + s.version
2017-11-11 04:44:02 +00:00
}
2018-06-19 18:25:49 +00:00
data["kind"] = s.kind
2018-06-04 23:48:22 +00:00
return nil
2017-11-11 04:44:02 +00:00
}
2018-06-19 18:25:49 +00:00
func (s *Store) Update(apiContext *types.APIContext, schema *types.Schema, data map[string]interface{}, id string) (map[string]interface{}, error) {
2018-06-04 23:48:22 +00:00
var (
result map[string]interface{}
err error
)
2018-06-19 18:25:49 +00:00
k8sClient, err := s.k8sClient(apiContext)
2018-02-09 20:31:12 +00:00
if err != nil {
return nil, err
}
2018-01-18 23:40:34 +00:00
namespace, id := splitID(id)
2018-06-04 23:48:22 +00:00
if err := s.toInternal(schema.Mapper, data); err != nil {
return nil, err
2017-12-30 02:14:59 +00:00
}
2018-06-04 23:48:22 +00:00
for i := 0; i < 5; i++ {
req := s.common(namespace, k8sClient.Get()).
Name(id)
2017-11-11 04:44:02 +00:00
2018-06-04 23:48:22 +00:00
resourceVersion, existing, rawErr := s.singleResultRaw(apiContext, schema, req)
if rawErr != nil {
return nil, rawErr
}
2017-12-30 02:14:59 +00:00
2018-06-04 23:48:22 +00:00
existing = merge.APIUpdateMerge(schema.InternalSchema, apiContext.Schemas, existing, data, apiContext.Option("replace") == "true")
values.PutValue(existing, resourceVersion, "metadata", "resourceVersion")
values.PutValue(existing, namespace, "metadata", "namespace")
values.PutValue(existing, id, "metadata", "name")
req = s.common(namespace, k8sClient.Put()).
Body(&unstructured.Unstructured{
Object: existing,
}).
Name(id)
_, result, err = s.singleResult(apiContext, schema, req)
if errors.IsConflict(err) {
continue
}
return result, err
2018-06-04 23:48:22 +00:00
}
2017-11-11 04:44:02 +00:00
2017-11-29 21:27:02 +00:00
return result, err
2017-11-11 04:44:02 +00:00
}
2018-06-19 18:25:49 +00:00
func (s *Store) Delete(apiContext *types.APIContext, schema *types.Schema, id string) (map[string]interface{}, error) {
k8sClient, err := s.k8sClient(apiContext)
2018-02-09 20:31:12 +00:00
if err != nil {
return nil, err
}
2018-04-09 23:33:08 +00:00
namespace, name := splitID(id)
options, err := getDeleteOption(apiContext.Request)
if err != nil {
return nil, err
}
2018-06-19 18:25:49 +00:00
req := s.common(namespace, k8sClient.Delete()).
Body(options).
2018-04-09 23:33:08 +00:00
Name(name)
2017-11-11 04:44:02 +00:00
2018-06-19 18:25:49 +00:00
err = s.doAuthed(apiContext, req).Error()
if err != nil {
return nil, err
}
_, obj, err := s.byID(apiContext, schema, id, false)
if err != nil {
return nil, nil
}
return obj, nil
2017-11-11 04:44:02 +00:00
}
2018-06-19 18:25:49 +00:00
func (s *Store) singleResult(apiContext *types.APIContext, schema *types.Schema, req *rest.Request) (string, map[string]interface{}, error) {
version, data, err := s.singleResultRaw(apiContext, schema, req)
2018-01-18 23:40:34 +00:00
if err != nil {
return "", nil, err
}
s.fromInternal(apiContext, schema, data)
2018-01-18 23:40:34 +00:00
return version, data, nil
}
2018-06-19 18:25:49 +00:00
func (s *Store) singleResultRaw(apiContext *types.APIContext, schema *types.Schema, req *rest.Request) (string, map[string]interface{}, error) {
2017-11-11 04:44:02 +00:00
result := &unstructured.Unstructured{}
2018-06-19 18:25:49 +00:00
err := s.doAuthed(apiContext, req).Into(result)
2017-11-11 04:44:02 +00:00
if err != nil {
2017-11-29 21:27:02 +00:00
return "", nil, err
2017-11-11 04:44:02 +00:00
}
2018-01-18 23:40:34 +00:00
return result.GetResourceVersion(), result.Object, nil
2017-11-11 04:44:02 +00:00
}
// splitID breaks an API id at its first ":" into namespace and name.
// An id without ":" is returned as the name with an empty namespace.
func splitID(id string) (string, string) {
	sep := strings.Index(id, ":")
	if sep < 0 {
		return "", id
	}
	return id[:sep], id[sep+1:]
}
// getDeleteOption decodes delete options from the request's query string.
// When no propagation policy was supplied, background propagation is used.
func getDeleteOption(req *http.Request) (*metav1.DeleteOptions, error) {
	var options metav1.DeleteOptions
	if err := metav1.ParameterCodec.DecodeParameters(req.URL.Query(), metav1.SchemeGroupVersion, &options); err != nil {
		return nil, err
	}

	if options.PropagationPolicy == nil {
		background := metav1.DeletePropagationBackground
		options.PropagationPolicy = &background
	}

	return &options, nil
}
2018-06-19 18:25:49 +00:00
func (s *Store) common(namespace string, req *rest.Request) *rest.Request {
prefix := append([]string{}, s.prefix...)
if s.group != "" {
prefix = append(prefix, s.group)
2017-11-11 04:44:02 +00:00
}
2018-06-19 18:25:49 +00:00
prefix = append(prefix, s.version)
2017-11-11 04:44:02 +00:00
req.Prefix(prefix...).
2018-06-19 18:25:49 +00:00
Resource(s.resourcePlural)
2017-11-11 04:44:02 +00:00
if namespace != "" {
req.Namespace(namespace)
}
return req
}
func (s *Store) fromInternal(apiContext *types.APIContext, schema *types.Schema, data map[string]interface{}) map[string]interface{} {
if apiContext.Option("export") == "true" {
delete(data, "status")
}
2017-11-11 04:44:02 +00:00
if schema.Mapper != nil {
schema.Mapper.FromInternal(data)
}
return data
}