Manager timeouts and ratelimit

This commit is contained in:
Darren Shepherd
2020-02-01 22:13:32 -07:00
parent a66ef65fd9
commit d80ba54e48
3 changed files with 74 additions and 27 deletions

View File

@@ -13,6 +13,7 @@ import (
"github.com/rancher/wrangler/pkg/data"
"github.com/rancher/wrangler/pkg/schemas/validation"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
@@ -27,6 +28,7 @@ var (
type ClientGetter interface {
Client(ctx *types.APIRequest, schema *types.APISchema, namespace string) (dynamic.ResourceInterface, error)
ClientForWatch(ctx *types.APIRequest, schema *types.APISchema, namespace string) (dynamic.ResourceInterface, error)
}
type Store struct {
@@ -50,24 +52,32 @@ func decodeParams(apiOp *types.APIRequest, target runtime.Object) error {
return metav1.ParameterCodec.DecodeParameters(apiOp.Request.URL.Query(), metav1.SchemeGroupVersion, target)
}
func toAPI(schema *types.APISchema, obj *unstructured.Unstructured) types.APIObject {
func toAPI(schema *types.APISchema, obj runtime.Object) types.APIObject {
if obj == nil {
return types.APIObject{}
}
gvr := attributes.GVR(schema)
id := obj.GetName()
ns := obj.GetNamespace()
t := fmt.Sprintf("%s/%s/%s", gvr.Group, gvr.Version, gvr.Resource)
apiObject := types.APIObject{
Type: t,
Object: obj,
}
m, err := meta.Accessor(obj)
if err != nil {
return apiObject
}
id := m.GetName()
ns := m.GetNamespace()
if ns != "" {
id = fmt.Sprintf("%s/%s", ns, id)
}
t := fmt.Sprintf("%s/%s/%s", gvr.Group, gvr.Version, gvr.Resource)
return types.APIObject{
Type: t,
ID: id,
Object: obj,
}
apiObject.ID = id
return apiObject
}
func (s *Store) byID(apiOp *types.APIRequest, schema *types.APISchema, id string) (*unstructured.Unstructured, error) {
@@ -153,13 +163,15 @@ func (s *Store) listAndWatch(apiOp *types.APIRequest, k8sClient dynamic.Resource
}()
for event := range watcher.ResultChan() {
data := event.Object.(*unstructured.Unstructured)
result <- s.toAPIEvent(apiOp, schema, event.Type, data)
if event.Type == watch.Error {
continue
}
result <- s.toAPIEvent(apiOp, schema, event.Type, event.Object)
}
}
func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.WatchRequest) (chan types.APIEvent, error) {
k8sClient, err := s.clientGetter.Client(apiOp, schema, apiOp.Namespace)
k8sClient, err := s.clientGetter.ClientForWatch(apiOp, schema, apiOp.Namespace)
if err != nil {
return nil, err
}
@@ -173,7 +185,7 @@ func (s *Store) Watch(apiOp *types.APIRequest, schema *types.APISchema, w types.
return result, nil
}
func (s *Store) toAPIEvent(apiOp *types.APIRequest, schema *types.APISchema, et watch.EventType, obj *unstructured.Unstructured) types.APIEvent {
func (s *Store) toAPIEvent(apiOp *types.APIRequest, schema *types.APISchema, et watch.EventType, obj runtime.Object) types.APIEvent {
name := types.ChangeAPIEvent
switch et {
case watch.Deleted:
@@ -182,11 +194,18 @@ func (s *Store) toAPIEvent(apiOp *types.APIRequest, schema *types.APISchema, et
name = types.CreateAPIEvent
}
return types.APIEvent{
Name: name,
Revision: obj.GetResourceVersion(),
Object: toAPI(schema, obj),
event := types.APIEvent{
Name: name,
Object: toAPI(schema, obj),
}
m, err := meta.Accessor(obj)
if err != nil {
return event
}
event.Revision = m.GetResourceVersion()
return event
}
func (s *Store) Create(apiOp *types.APIRequest, schema *types.APISchema, params types.APIObject) (types.APIObject, error) {