mirror of
https://github.com/niusmallnan/steve.git
synced 2025-08-11 10:11:47 +00:00
Merge pull request #49 from cmurphy/watch-logging-head
Make watches debuggable
This commit is contained in:
commit
22b03d364d
@ -7,8 +7,10 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"os"
|
||||||
"reflect"
|
"reflect"
|
||||||
"regexp"
|
"regexp"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"github.com/rancher/apiserver/pkg/types"
|
"github.com/rancher/apiserver/pkg/types"
|
||||||
@ -32,6 +34,8 @@ import (
|
|||||||
"k8s.io/client-go/kubernetes"
|
"k8s.io/client-go/kubernetes"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const watchTimeoutEnv = "CATTLE_WATCH_TIMEOUT_SECONDS"
|
||||||
|
|
||||||
var (
|
var (
|
||||||
lowerChars = regexp.MustCompile("[a-z]+")
|
lowerChars = regexp.MustCompile("[a-z]+")
|
||||||
paramScheme = runtime.NewScheme()
|
paramScheme = runtime.NewScheme()
|
||||||
@ -291,6 +295,15 @@ func (s *Store) listAndWatch(apiOp *types.APIRequest, client dynamic.ResourceInt
|
|||||||
}
|
}
|
||||||
|
|
||||||
timeout := int64(60 * 30)
|
timeout := int64(60 * 30)
|
||||||
|
timeoutSetting := os.Getenv(watchTimeoutEnv)
|
||||||
|
if timeoutSetting != "" {
|
||||||
|
userSetTimeout, err := strconv.Atoi(timeoutSetting)
|
||||||
|
if err != nil {
|
||||||
|
logrus.Debugf("could not parse %s environment variable, error: %v", watchTimeoutEnv, err)
|
||||||
|
} else {
|
||||||
|
timeout = int64(userSetTimeout)
|
||||||
|
}
|
||||||
|
}
|
||||||
k8sClient, _ := metricsStore.Wrap(client, nil)
|
k8sClient, _ := metricsStore.Wrap(client, nil)
|
||||||
watcher, err := k8sClient.Watch(apiOp, metav1.ListOptions{
|
watcher, err := k8sClient.Watch(apiOp, metav1.ListOptions{
|
||||||
Watch: true,
|
Watch: true,
|
||||||
@ -318,6 +331,9 @@ func (s *Store) listAndWatch(apiOp *types.APIRequest, client dynamic.ResourceInt
|
|||||||
obj, err := s.byID(apiOp, schema, rel.Namespace, rel.Name)
|
obj, err := s.byID(apiOp, schema, rel.Namespace, rel.Name)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
result <- s.toAPIEvent(apiOp, schema, watch.Modified, obj)
|
result <- s.toAPIEvent(apiOp, schema, watch.Modified, obj)
|
||||||
|
} else {
|
||||||
|
logrus.Debugf("notifier watch error: %v", err)
|
||||||
|
returnErr(errors.Wrapf(err, "notifier watch error: %v", err), result)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return fmt.Errorf("closed")
|
return fmt.Errorf("closed")
|
||||||
@ -327,6 +343,12 @@ func (s *Store) listAndWatch(apiOp *types.APIRequest, client dynamic.ResourceInt
|
|||||||
eg.Go(func() error {
|
eg.Go(func() error {
|
||||||
for event := range watcher.ResultChan() {
|
for event := range watcher.ResultChan() {
|
||||||
if event.Type == watch.Error {
|
if event.Type == watch.Error {
|
||||||
|
if status, ok := event.Object.(*metav1.Status); ok {
|
||||||
|
logrus.Debugf("event watch error: %s", status.Message)
|
||||||
|
returnErr(fmt.Errorf("event watch error: %s", status.Message), result)
|
||||||
|
} else {
|
||||||
|
logrus.Debugf("event watch error: could not decode event object %T", event.Object)
|
||||||
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
result <- s.toAPIEvent(apiOp, schema, event.Type, event.Object)
|
result <- s.toAPIEvent(apiOp, schema, event.Type, event.Object)
|
||||||
|
Loading…
Reference in New Issue
Block a user