mirror of
https://github.com/rancher/norman.git
synced 2025-08-25 02:18:37 +00:00
Using the watch is ultimately share and should use the store's context. Using the requests context may cause the watch to end too early.
563 lines
16 KiB
Go
563 lines
16 KiB
Go
package proxy
|
|
|
|
import (
|
|
"context"
|
|
ejson "encoding/json"
|
|
"net/http"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/rancher/norman/httperror"
|
|
"github.com/rancher/norman/objectclient/dynamic"
|
|
"github.com/rancher/norman/pkg/broadcast"
|
|
"github.com/rancher/norman/restwatch"
|
|
"github.com/rancher/norman/types"
|
|
"github.com/rancher/norman/types/convert"
|
|
"github.com/rancher/norman/types/convert/merge"
|
|
"github.com/rancher/norman/types/values"
|
|
"github.com/sirupsen/logrus"
|
|
"golang.org/x/sync/errgroup"
|
|
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
|
|
"k8s.io/apimachinery/pkg/api/errors"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
|
"k8s.io/apimachinery/pkg/runtime/serializer/json"
|
|
"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
|
|
"k8s.io/apimachinery/pkg/watch"
|
|
"k8s.io/client-go/rest"
|
|
restclientwatch "k8s.io/client-go/rest/watch"
|
|
)
|
|
|
|
var (
|
|
userAuthHeader = "Impersonate-User"
|
|
authHeaders = []string{
|
|
userAuthHeader,
|
|
"Impersonate-Group",
|
|
}
|
|
)
|
|
|
|
type ClientGetter interface {
|
|
UnversionedClient(apiContext *types.APIContext, context types.StorageContext) (rest.Interface, error)
|
|
APIExtClient(apiContext *types.APIContext, context types.StorageContext) (clientset.Interface, error)
|
|
}
|
|
|
|
type simpleClientGetter struct {
|
|
restConfig rest.Config
|
|
client rest.Interface
|
|
apiExtClient clientset.Interface
|
|
}
|
|
|
|
func NewClientGetterFromConfig(config rest.Config) (ClientGetter, error) {
|
|
dynamicConfig := config
|
|
if dynamicConfig.NegotiatedSerializer == nil {
|
|
dynamicConfig.NegotiatedSerializer = dynamic.NegotiatedSerializer
|
|
}
|
|
|
|
unversionedClient, err := rest.UnversionedRESTClientFor(&dynamicConfig)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
apiExtClient, err := clientset.NewForConfig(&dynamicConfig)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &simpleClientGetter{
|
|
restConfig: config,
|
|
client: unversionedClient,
|
|
apiExtClient: apiExtClient,
|
|
}, nil
|
|
}
|
|
|
|
func (s *simpleClientGetter) Config(apiContext *types.APIContext, context types.StorageContext) (rest.Config, error) {
|
|
return s.restConfig, nil
|
|
}
|
|
|
|
func (s *simpleClientGetter) UnversionedClient(apiContext *types.APIContext, context types.StorageContext) (rest.Interface, error) {
|
|
return s.client, nil
|
|
}
|
|
|
|
func (s *simpleClientGetter) APIExtClient(apiContext *types.APIContext, context types.StorageContext) (clientset.Interface, error) {
|
|
return s.apiExtClient, nil
|
|
}
|
|
|
|
type Store struct {
|
|
sync.Mutex
|
|
|
|
clientGetter ClientGetter
|
|
storageContext types.StorageContext
|
|
prefix []string
|
|
group string
|
|
version string
|
|
kind string
|
|
resourcePlural string
|
|
authContext map[string]string
|
|
close context.Context
|
|
broadcasters map[rest.Interface]*broadcast.Broadcaster
|
|
}
|
|
|
|
func NewProxyStore(ctx context.Context, clientGetter ClientGetter, storageContext types.StorageContext,
|
|
prefix []string, group, version, kind, resourcePlural string) types.Store {
|
|
return &errorStore{
|
|
Store: &Store{
|
|
clientGetter: clientGetter,
|
|
storageContext: storageContext,
|
|
prefix: prefix,
|
|
group: group,
|
|
version: version,
|
|
kind: kind,
|
|
resourcePlural: resourcePlural,
|
|
authContext: map[string]string{
|
|
"apiGroup": group,
|
|
"resource": resourcePlural,
|
|
},
|
|
close: ctx,
|
|
broadcasters: map[rest.Interface]*broadcast.Broadcaster{},
|
|
},
|
|
}
|
|
}
|
|
|
|
func (s *Store) getUser(apiContext *types.APIContext) string {
|
|
return apiContext.Request.Header.Get(userAuthHeader)
|
|
}
|
|
|
|
func (s *Store) doAuthed(apiContext *types.APIContext, request *rest.Request) rest.Result {
|
|
start := time.Now()
|
|
defer func() {
|
|
logrus.Tracef("GET: %v, %v", time.Now().Sub(start), s.resourcePlural)
|
|
}()
|
|
|
|
for _, header := range authHeaders {
|
|
request.SetHeader(header, apiContext.Request.Header[http.CanonicalHeaderKey(header)]...)
|
|
}
|
|
return request.Do(apiContext.Request.Context())
|
|
}
|
|
|
|
func (s *Store) k8sClient(apiContext *types.APIContext) (rest.Interface, error) {
|
|
return s.clientGetter.UnversionedClient(apiContext, s.storageContext)
|
|
}
|
|
|
|
func (s *Store) ByID(apiContext *types.APIContext, schema *types.Schema, id string) (map[string]interface{}, error) {
|
|
_, result, err := s.byID(apiContext, schema, id, true)
|
|
return result, err
|
|
}
|
|
|
|
func (s *Store) byID(apiContext *types.APIContext, schema *types.Schema, id string, retry bool) (string, map[string]interface{}, error) {
|
|
splitted := strings.Split(strings.TrimSpace(id), ":")
|
|
validID := false
|
|
namespaced := schema.Scope == types.NamespaceScope
|
|
if namespaced {
|
|
validID = len(splitted) == 2 && len(strings.TrimSpace(splitted[0])) > 0 && len(strings.TrimSpace(splitted[1])) > 0
|
|
} else {
|
|
validID = len(splitted) == 1 && len(strings.TrimSpace(splitted[0])) > 0
|
|
}
|
|
if !validID {
|
|
return "", nil, httperror.NewAPIError(httperror.NotFound, "failed to find resource by id")
|
|
}
|
|
|
|
namespace, id := splitID(id)
|
|
|
|
k8sClient, err := s.k8sClient(apiContext)
|
|
if err != nil {
|
|
return "", nil, err
|
|
}
|
|
|
|
req := s.common(namespace, k8sClient.Get()).Name(id)
|
|
if !retry {
|
|
return s.singleResult(apiContext, schema, req)
|
|
}
|
|
|
|
var version string
|
|
var data map[string]interface{}
|
|
for i := 0; i < 3; i++ {
|
|
req = s.common(namespace, k8sClient.Get()).Name(id)
|
|
version, data, err = s.singleResult(apiContext, schema, req)
|
|
if err != nil {
|
|
if i < 2 && strings.Contains(err.Error(), "Client.Timeout exceeded") {
|
|
logrus.Warnf("Retrying GET. Error: %v", err)
|
|
continue
|
|
}
|
|
return version, data, err
|
|
}
|
|
return version, data, err
|
|
}
|
|
return version, data, err
|
|
}
|
|
|
|
func (s *Store) Context() types.StorageContext {
|
|
return s.storageContext
|
|
}
|
|
|
|
func (s *Store) List(apiContext *types.APIContext, schema *types.Schema, opt *types.QueryOptions) ([]map[string]interface{}, error) {
|
|
var resultList unstructured.UnstructuredList
|
|
|
|
// if there are no namespaces field in options, a single request is made
|
|
if opt == nil || opt.Namespaces == nil {
|
|
ns := getNamespace(apiContext, opt)
|
|
list, err := s.retryList(ns, apiContext)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
resultList = *list
|
|
} else {
|
|
var (
|
|
errGroup errgroup.Group
|
|
mux sync.Mutex
|
|
)
|
|
|
|
allNS := opt.Namespaces
|
|
for _, ns := range allNS {
|
|
nsCopy := ns
|
|
errGroup.Go(func() error {
|
|
list, err := s.retryList(nsCopy, apiContext)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
mux.Lock()
|
|
resultList.Items = append(resultList.Items, list.Items...)
|
|
mux.Unlock()
|
|
|
|
return nil
|
|
})
|
|
}
|
|
if err := errGroup.Wait(); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
var result []map[string]interface{}
|
|
|
|
for _, obj := range resultList.Items {
|
|
result = append(result, s.fromInternal(apiContext, schema, obj.Object))
|
|
}
|
|
|
|
return apiContext.AccessControl.FilterList(apiContext, schema, result, s.authContext), nil
|
|
}
|
|
|
|
func (s *Store) retryList(namespace string, apiContext *types.APIContext) (*unstructured.UnstructuredList, error) {
|
|
var resultList *unstructured.UnstructuredList
|
|
k8sClient, err := s.k8sClient(apiContext)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for i := 0; i < 3; i++ {
|
|
req := s.common(namespace, k8sClient.Get())
|
|
start := time.Now()
|
|
resultList = &unstructured.UnstructuredList{}
|
|
err = req.Do(apiContext.Request.Context()).Into(resultList)
|
|
logrus.Tracef("LIST: %v, %v", time.Now().Sub(start), s.resourcePlural)
|
|
if err != nil {
|
|
if i < 2 && strings.Contains(err.Error(), "Client.Timeout exceeded") {
|
|
logrus.Infof("Error on LIST %v: %v. Attempt: %v. Retrying", s.resourcePlural, err, i+1)
|
|
continue
|
|
}
|
|
return resultList, err
|
|
}
|
|
return resultList, err
|
|
}
|
|
return resultList, err
|
|
}
|
|
|
|
func (s *Store) Watch(apiContext *types.APIContext, schema *types.Schema, opt *types.QueryOptions) (chan map[string]interface{}, error) {
|
|
c, err := s.shareWatch(apiContext, schema, opt)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return convert.Chan(c, func(data map[string]interface{}) map[string]interface{} {
|
|
apiContext.ExpireAccessControl(schema)
|
|
return apiContext.AccessControl.Filter(apiContext, schema, data, s.authContext)
|
|
}), nil
|
|
}
|
|
|
|
func (s *Store) realWatch(apiContext *types.APIContext, schema *types.Schema, opt *types.QueryOptions) (chan map[string]interface{}, error) {
|
|
namespace := getNamespace(apiContext, opt)
|
|
|
|
k8sClient, err := s.k8sClient(apiContext)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if watchClient, ok := k8sClient.(restwatch.WatchClient); ok {
|
|
k8sClient = watchClient.WatchClient()
|
|
}
|
|
|
|
timeout := int64(60 * 30)
|
|
req := s.common(namespace, k8sClient.Get())
|
|
req.VersionedParams(&metav1.ListOptions{
|
|
Watch: true,
|
|
TimeoutSeconds: &timeout,
|
|
ResourceVersion: "0",
|
|
}, metav1.ParameterCodec)
|
|
|
|
body, err := req.Stream(s.close)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
framer := json.Framer.NewFrameReader(body)
|
|
decoder := streaming.NewDecoder(framer, &unstructuredDecoder{})
|
|
watcher := watch.NewStreamWatcher(restclientwatch.NewDecoder(decoder, &unstructuredDecoder{}), &errorReporter{})
|
|
|
|
watchingContext, cancelWatchingContext := context.WithCancel(s.close)
|
|
go func() {
|
|
<-watchingContext.Done()
|
|
logrus.Tracef("stopping watcher for %s", schema.ID)
|
|
watcher.Stop()
|
|
}()
|
|
|
|
result := make(chan map[string]interface{})
|
|
go func() {
|
|
for event := range watcher.ResultChan() {
|
|
if data, ok := event.Object.(*metav1.Status); ok {
|
|
// just logging it, keeping the same behavior as before
|
|
logrus.Tracef("watcher status for %s: %s", schema.ID, data.Message)
|
|
} else {
|
|
data := event.Object.(*unstructured.Unstructured)
|
|
s.fromInternal(apiContext, schema, data.Object)
|
|
if event.Type == watch.Deleted && data.Object != nil {
|
|
data.Object[".removed"] = true
|
|
}
|
|
result <- data.Object
|
|
}
|
|
}
|
|
logrus.Tracef("closing watcher for %s", schema.ID)
|
|
close(result)
|
|
cancelWatchingContext()
|
|
}()
|
|
|
|
return result, nil
|
|
}
|
|
|
|
type unstructuredDecoder struct {
|
|
}
|
|
|
|
type errorReporter struct {
|
|
}
|
|
|
|
func (e *errorReporter) AsObject(err error) runtime.Object {
|
|
return &metav1.Status{Message: err.Error(), Code: http.StatusInternalServerError, Reason: "ClientWatchDecoding"}
|
|
}
|
|
|
|
func (d *unstructuredDecoder) Decode(data []byte, defaults *schema.GroupVersionKind, into runtime.Object) (runtime.Object, *schema.GroupVersionKind, error) {
|
|
if into == nil {
|
|
into = &unstructured.Unstructured{}
|
|
}
|
|
return into, defaults, ejson.Unmarshal(data, &into)
|
|
}
|
|
|
|
func getNamespace(apiContext *types.APIContext, opt *types.QueryOptions) string {
|
|
if val, ok := apiContext.SubContext["namespaces"]; ok {
|
|
return convert.ToString(val)
|
|
}
|
|
|
|
for _, condition := range opt.Conditions {
|
|
mod := condition.ToCondition().Modifier
|
|
if condition.Field == "namespaceId" && condition.Value != "" && mod == types.ModifierEQ {
|
|
return condition.Value
|
|
}
|
|
if condition.Field == "namespace" && condition.Value != "" && mod == types.ModifierEQ {
|
|
return condition.Value
|
|
}
|
|
}
|
|
|
|
return ""
|
|
}
|
|
|
|
func (s *Store) Create(apiContext *types.APIContext, schema *types.Schema, data map[string]interface{}) (map[string]interface{}, error) {
|
|
if err := s.toInternal(schema.Mapper, data); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
namespace, _ := values.GetValueN(data, "metadata", "namespace").(string)
|
|
|
|
values.PutValue(data, s.getUser(apiContext), "metadata", "annotations", "field.cattle.io/creatorId")
|
|
values.PutValue(data, "norman", "metadata", "labels", "cattle.io/creator")
|
|
|
|
name, _ := values.GetValueN(data, "metadata", "name").(string)
|
|
if name == "" {
|
|
generated, _ := values.GetValueN(data, "metadata", "generateName").(string)
|
|
if generated == "" {
|
|
values.PutValue(data, types.GenerateTypePrefix(schema.ID), "metadata", "generateName")
|
|
}
|
|
}
|
|
|
|
k8sClient, err := s.k8sClient(apiContext)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
req := s.common(namespace, k8sClient.Post()).
|
|
Body(&unstructured.Unstructured{
|
|
Object: data,
|
|
})
|
|
|
|
_, result, err := s.singleResult(apiContext, schema, req)
|
|
return result, err
|
|
}
|
|
|
|
func (s *Store) toInternal(mapper types.Mapper, data map[string]interface{}) error {
|
|
if mapper != nil {
|
|
if err := mapper.ToInternal(data); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if s.group == "" {
|
|
data["apiVersion"] = s.version
|
|
} else {
|
|
data["apiVersion"] = s.group + "/" + s.version
|
|
}
|
|
data["kind"] = s.kind
|
|
return nil
|
|
}
|
|
|
|
func (s *Store) Update(apiContext *types.APIContext, schema *types.Schema, data map[string]interface{}, id string) (map[string]interface{}, error) {
|
|
var (
|
|
result map[string]interface{}
|
|
err error
|
|
)
|
|
|
|
k8sClient, err := s.k8sClient(apiContext)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
namespace, id := splitID(id)
|
|
if err := s.toInternal(schema.Mapper, data); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for i := 0; i < 5; i++ {
|
|
req := s.common(namespace, k8sClient.Get()).
|
|
Name(id)
|
|
|
|
resourceVersion, existing, rawErr := s.singleResultRaw(apiContext, schema, req)
|
|
if rawErr != nil {
|
|
return nil, rawErr
|
|
}
|
|
|
|
existing = merge.APIUpdateMerge(schema.InternalSchema, apiContext.Schemas, existing, data, apiContext.Option("replace") == "true")
|
|
|
|
values.PutValue(existing, resourceVersion, "metadata", "resourceVersion")
|
|
values.PutValue(existing, namespace, "metadata", "namespace")
|
|
values.PutValue(existing, id, "metadata", "name")
|
|
|
|
req = s.common(namespace, k8sClient.Put()).
|
|
Body(&unstructured.Unstructured{
|
|
Object: existing,
|
|
}).
|
|
Name(id)
|
|
|
|
_, result, err = s.singleResult(apiContext, schema, req)
|
|
if errors.IsConflict(err) {
|
|
continue
|
|
}
|
|
return result, err
|
|
}
|
|
|
|
return result, err
|
|
}
|
|
|
|
func (s *Store) Delete(apiContext *types.APIContext, schema *types.Schema, id string) (map[string]interface{}, error) {
|
|
k8sClient, err := s.k8sClient(apiContext)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
namespace, name := splitID(id)
|
|
options, err := getDeleteOption(apiContext.Request)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
req := s.common(namespace, k8sClient.Delete()).
|
|
Body(options).
|
|
Name(name)
|
|
|
|
err = s.doAuthed(apiContext, req).Error()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
_, obj, err := s.byID(apiContext, schema, id, false)
|
|
if err != nil {
|
|
return nil, nil
|
|
}
|
|
return obj, nil
|
|
}
|
|
|
|
func (s *Store) singleResult(apiContext *types.APIContext, schema *types.Schema, req *rest.Request) (string, map[string]interface{}, error) {
|
|
version, data, err := s.singleResultRaw(apiContext, schema, req)
|
|
if err != nil {
|
|
return "", nil, err
|
|
}
|
|
s.fromInternal(apiContext, schema, data)
|
|
return version, data, nil
|
|
}
|
|
|
|
func (s *Store) singleResultRaw(apiContext *types.APIContext, schema *types.Schema, req *rest.Request) (string, map[string]interface{}, error) {
|
|
result := &unstructured.Unstructured{}
|
|
err := s.doAuthed(apiContext, req).Into(result)
|
|
if err != nil {
|
|
return "", nil, err
|
|
}
|
|
|
|
return result.GetResourceVersion(), result.Object, nil
|
|
}
|
|
|
|
func splitID(id string) (string, string) {
|
|
namespace := ""
|
|
parts := strings.SplitN(id, ":", 2)
|
|
if len(parts) == 2 {
|
|
namespace = parts[0]
|
|
id = parts[1]
|
|
}
|
|
|
|
return namespace, id
|
|
}
|
|
|
|
func getDeleteOption(req *http.Request) (*metav1.DeleteOptions, error) {
|
|
options := &metav1.DeleteOptions{}
|
|
if err := metav1.ParameterCodec.DecodeParameters(req.URL.Query(), metav1.SchemeGroupVersion, options); err != nil {
|
|
return nil, err
|
|
}
|
|
prop := metav1.DeletePropagationBackground
|
|
if options.PropagationPolicy == nil {
|
|
options.PropagationPolicy = &prop
|
|
}
|
|
return options, nil
|
|
}
|
|
|
|
func (s *Store) common(namespace string, req *rest.Request) *rest.Request {
|
|
prefix := append([]string{}, s.prefix...)
|
|
if s.group != "" {
|
|
prefix = append(prefix, s.group)
|
|
}
|
|
prefix = append(prefix, s.version)
|
|
req.Prefix(prefix...).
|
|
Resource(s.resourcePlural)
|
|
|
|
if namespace != "" {
|
|
req.Namespace(namespace)
|
|
}
|
|
|
|
return req
|
|
}
|
|
|
|
func (s *Store) fromInternal(apiContext *types.APIContext, schema *types.Schema, data map[string]interface{}) map[string]interface{} {
|
|
if apiContext.Option("export") == "true" {
|
|
delete(data, "status")
|
|
}
|
|
if schema.Mapper != nil {
|
|
schema.Mapper.FromInternal(data)
|
|
}
|
|
|
|
return data
|
|
}
|