diff --git a/pkg/stores/proxy/proxy_store.go b/pkg/stores/proxy/proxy_store.go index b707923..ce9fd99 100644 --- a/pkg/stores/proxy/proxy_store.go +++ b/pkg/stores/proxy/proxy_store.go @@ -7,8 +7,10 @@ import ( "io" "io/ioutil" "net/http" + "os" "reflect" "regexp" + "strconv" "github.com/pkg/errors" "github.com/rancher/apiserver/pkg/types" @@ -32,6 +34,8 @@ import ( "k8s.io/client-go/kubernetes" ) +const watchTimeoutEnv = "CATTLE_WATCH_TIMEOUT_SECONDS" + var ( lowerChars = regexp.MustCompile("[a-z]+") paramScheme = runtime.NewScheme() @@ -291,6 +295,15 @@ func (s *Store) listAndWatch(apiOp *types.APIRequest, client dynamic.ResourceInt } timeout := int64(60 * 30) + timeoutSetting := os.Getenv(watchTimeoutEnv) + if timeoutSetting != "" { + userSetTimeout, err := strconv.Atoi(timeoutSetting) + if err != nil { + logrus.Debugf("could not parse %s environment variable, error: %v", watchTimeoutEnv, err) + } else { + timeout = int64(userSetTimeout) + } + } k8sClient, _ := metricsStore.Wrap(client, nil) watcher, err := k8sClient.Watch(apiOp, metav1.ListOptions{ Watch: true, @@ -318,6 +331,9 @@ func (s *Store) listAndWatch(apiOp *types.APIRequest, client dynamic.ResourceInt obj, err := s.byID(apiOp, schema, rel.Namespace, rel.Name) if err == nil { result <- s.toAPIEvent(apiOp, schema, watch.Modified, obj) + } else { + logrus.Debugf("notifier watch error: %v", err) + returnErr(errors.Wrapf(err, "notifier watch error: %v", err), result) } } return fmt.Errorf("closed") @@ -327,6 +343,12 @@ func (s *Store) listAndWatch(apiOp *types.APIRequest, client dynamic.ResourceInt eg.Go(func() error { for event := range watcher.ResultChan() { if event.Type == watch.Error { + if status, ok := event.Object.(*metav1.Status); ok { + logrus.Debugf("event watch error: %s", status.Message) + returnErr(fmt.Errorf("event watch error: %s", status.Message), result) + } else { + logrus.Debugf("event watch error: could not decode event object %T", event.Object) + } continue } result <- s.toAPIEvent(apiOp, schema, event.Type, event.Object)