diff --git a/pkg/subscribe/handler.go b/pkg/subscribe/handler.go index 9d84aa08..0aa3e703 100644 --- a/pkg/subscribe/handler.go +++ b/pkg/subscribe/handler.go @@ -3,7 +3,7 @@ package subscribe import ( "bytes" "context" - + "errors" "time" "github.com/gorilla/websocket" @@ -124,8 +124,8 @@ func handler(apiContext *types.APIContext) error { } } - // Group is already done at this point because of goroutine above, this is just to send the error if needed - return readerGroup.Wait() + // no point in ever returning null because the connection is hijacked and we can't write it + return nil } func writeData(c *websocket.Conn, header string, buf []byte) error { @@ -158,7 +158,7 @@ func streamStore(ctx context.Context, eg *errgroup.Group, apiContext *types.APIC result <- e } - return nil + return errors.New("disconnect") }) } diff --git a/store/proxy/proxy_store.go b/store/proxy/proxy_store.go index 334d5da4..ec44cbae 100644 --- a/store/proxy/proxy_store.go +++ b/store/proxy/proxy_store.go @@ -142,9 +142,12 @@ func (p *Store) List(apiContext *types.APIContext, schema *types.Schema, opt *ty func (p *Store) Watch(apiContext *types.APIContext, schema *types.Schema, opt *types.QueryOptions) (chan map[string]interface{}, error) { namespace := getNamespace(apiContext, opt) + timeout := int64(60 * 60) req := p.common(namespace, p.k8sClient.Get()) req.VersionedParams(&metav1.ListOptions{ - Watch: true, + Watch: true, + TimeoutSeconds: &timeout, + ResourceVersion: "0", }, dynamic.VersionedParameterEncoderWithV1Fallback) body, err := req.Stream() diff --git a/types/convert/convert.go b/types/convert/convert.go index c214ad49..2fb14d3b 100644 --- a/types/convert/convert.go +++ b/types/convert/convert.go @@ -11,6 +11,9 @@ import ( ) func Chan(c <-chan map[string]interface{}, f func(map[string]interface{}) map[string]interface{}) chan map[string]interface{} { + if c == nil { + return nil + } result := make(chan map[string]interface{}) go func() { for data := range c {