diff --git a/pkg/client/factory.go b/pkg/client/factory.go index d2f6e09..702415d 100644 --- a/pkg/client/factory.go +++ b/pkg/client/factory.go @@ -1,6 +1,8 @@ package client import ( + "time" + "github.com/rancher/steve/pkg/attributes" "github.com/rancher/steve/pkg/schemaserver/types" "k8s.io/client-go/dynamic" @@ -8,8 +10,9 @@ import ( ) type Factory struct { - client dynamic.Interface - Config *rest.Config + client dynamic.Interface + watchClient dynamic.Interface + Config *rest.Config } func NewFactory(cfg *rest.Config) (*Factory, error) { @@ -20,9 +23,17 @@ func NewFactory(cfg *rest.Config) (*Factory, error) { if err != nil { return nil, err } + + newCfg = rest.CopyConfig(cfg) + newCfg.Timeout = 30 * time.Minute + wc, err := dynamic.NewForConfig(newCfg) + if err != nil { + return nil, err + } return &Factory{ - client: c, - Config: newCfg, + client: c, + watchClient: wc, + Config: newCfg, }, nil } @@ -34,3 +45,8 @@ func (p *Factory) Client(ctx *types.APIRequest, s *types.APISchema, namespace st gvr := attributes.GVR(s) return p.client.Resource(gvr).Namespace(namespace), nil } + +func (p *Factory) ClientForWatch(ctx *types.APIRequest, s *types.APISchema, namespace string) (dynamic.ResourceInterface, error) { + gvr := attributes.GVR(s) + return p.watchClient.Resource(gvr).Namespace(namespace), nil +} diff --git a/pkg/server/config.go b/pkg/server/config.go index 732b08e..96181e4 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -3,6 +3,7 @@ package server import ( "context" "net/http" + "time" "github.com/rancher/steve/pkg/auth" "github.com/rancher/steve/pkg/schema" @@ -16,6 +17,7 @@ import ( corev1 "github.com/rancher/wrangler-api/pkg/generated/controllers/core/v1" "github.com/rancher/wrangler-api/pkg/generated/controllers/rbac" rbacv1 "github.com/rancher/wrangler-api/pkg/generated/controllers/rbac/v1" + "github.com/rancher/wrangler/pkg/ratelimit" "github.com/rancher/wrangler/pkg/start" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -37,18 +39,27 @@ type Server struct { } type Controllers struct { - K8s kubernetes.Interface - Core corev1.Interface - RBAC rbacv1.Interface - API apiregistrationv1.Interface - CRD apiextensionsv1beta1.Interface - starters []start.Starter + RestConfig *rest.Config + K8s kubernetes.Interface + Core corev1.Interface + RBAC rbacv1.Interface + API apiregistrationv1.Interface + CRD apiextensionsv1beta1.Interface + starters []start.Starter } func (c *Controllers) Start(ctx context.Context) error { return start.All(ctx, 5, c.starters...) } +func RestConfigDefaults(cfg *rest.Config) *rest.Config { + cfg = rest.CopyConfig(cfg) + cfg.Timeout = 15 * time.Minute + cfg.RateLimiter = ratelimit.None + + return cfg +} + func NewController(cfg *rest.Config) (*Controllers, error) { c := &Controllers{} @@ -84,6 +95,7 @@ func NewController(cfg *rest.Config) (*Controllers, error) { c.RBAC = rbac.Rbac().V1() c.API = api.Apiregistration().V1() c.CRD = crd.Apiextensions().V1beta1() + c.RestConfig = cfg return c, nil } diff --git a/pkg/server/store/proxy/proxy_store.go b/pkg/server/store/proxy/proxy_store.go index 017d6ea..9fb1300 100644 --- a/pkg/server/store/proxy/proxy_store.go +++ b/pkg/server/store/proxy/proxy_store.go @@ -13,6 +13,7 @@ import ( "github.com/rancher/wrangler/pkg/data" "github.com/rancher/wrangler/pkg/schemas/validation" "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" @@ -27,6 +28,7 @@ var ( type ClientGetter interface { Client(ctx *types.APIRequest, schema *types.APISchema, namespace string) (dynamic.ResourceInterface, error) + ClientForWatch(ctx *types.APIRequest, schema *types.APISchema, namespace string) (dynamic.ResourceInterface, error) } type Store struct { @@ -50,24 +52,32 @@ func decodeParams(apiOp *types.APIRequest, target runtime.Object) error { return metav1.ParameterCodec.DecodeParameters(apiOp.Request.URL.Query(), metav1.SchemeGroupVersion, target) } -func toAPI(schema *types.APISchema, obj *unstructured.Unstructured) types.APIObject { +func toAPI(schema *types.APISchema, obj runtime.Object) types.APIObject { if obj == nil { return types.APIObject{} } gvr := attributes.GVR(schema) - id := obj.GetName() - ns := obj.GetNamespace() + t := fmt.Sprintf("%s/%s/%s", gvr.Group, gvr.Version, gvr.Resource) + apiObject := types.APIObject{ + Type: t, + Object: obj, + } + + m, err := meta.Accessor(obj) + if err != nil { + return apiObject + } + + id := m.GetName() + ns := m.GetNamespace() if ns != "" { id = fmt.Sprintf("%s/%s", ns, id) } - t := fmt.Sprintf("%s/%s/%s", gvr.Group, gvr.Version, gvr.Resource) - return types.APIObject{ - Type: t, - ID: id, - Object: obj, - } + + apiObject.ID = id + return apiObject } func (s *Store) byID(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, error) { @@ -153,13 +163,15 @@ func (s *Store) listAndWatch(apiOp *types.APIRequest, k8sClient dynamic.Resource }() for event := range watcher.ResultChan() { - data := event.Object.(*unstructured.Unstructured) - result <- s.toAPIEvent(apiOp, schema, event.Type, data) + if event.Type == watch.Error { + continue + } + result <- s.toAPIEvent(apiOp, schema, event.Type, event.Object) } } func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest) (chan types.APIEvent, error) { - k8sClient, err := s.clientGetter.Client(apiOp, schema, apiOp.Namespace) + k8sClient, err := s.clientGetter.ClientForWatch(apiOp, schema, apiOp.Namespace) if err != nil { return nil, err } @@ -173,7 +185,7 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types. return result, nil } -func (s *Store) toAPIEvent(apiOp *types.APIRequest, schema *types.APISchema, et watch.EventType, obj *unstructured.Unstructured) types.APIEvent { +func (s *Store) toAPIEvent(apiOp *types.APIRequest, schema *types.APISchema, et watch.EventType, obj runtime.Object) types.APIEvent { name := types.ChangeAPIEvent switch et { case watch.Deleted: @@ -182,11 +194,18 @@ func (s *Store) toAPIEvent(apiOp *types.APIRequest, schema *types.APISchema, et name = types.CreateAPIEvent } - return types.APIEvent{ - Name: name, - Revision: obj.GetResourceVersion(), - Object: toAPI(schema, obj), + event := types.APIEvent{ + Name: name, + Object: toAPI(schema, obj), } + + m, err := meta.Accessor(obj) + if err != nil { + return event + } + + event.Revision = m.GetResourceVersion() + return event } func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, params types.APIObject) (types.APIObject, error) {