diff --git a/pkg/aggregation/server.go b/pkg/aggregation/server.go index 9d39c3d..b0b3a74 100644 --- a/pkg/aggregation/server.go +++ b/pkg/aggregation/server.go @@ -60,12 +60,24 @@ func ListenAndServe(ctx context.Context, url string, caCert []byte, token string func serve(ctx context.Context, dialer websocket.Dialer, url string, headers http.Header, handler http.Handler) error { url = strings.Replace(url, "http://", "ws://", 1) url = strings.Replace(url, "https://", "wss://", 1) - conn, _, err := dialer.DialContext(ctx, url, headers) + + // ensure we clean up everything on exit + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + dialCtx, dialCancel := context.WithTimeout(ctx, 5*time.Second) + defer dialCancel() + conn, _, err := dialer.DialContext(dialCtx, url, headers) if err != nil { return err } defer conn.Close() + go func() { + <-ctx.Done() + conn.Close() + }() + listener := NewListener("steve") server := http.Server{ Handler: handler, diff --git a/pkg/aggregation/watch.go b/pkg/aggregation/watch.go index 5f42400..263b65d 100644 --- a/pkg/aggregation/watch.go +++ b/pkg/aggregation/watch.go @@ -85,7 +85,7 @@ func (h *handler) shouldRestart(secret *corev1.Secret) (string, []byte, string, if h.url != url || h.token != token || - bytes.Equal(h.caCert, caCert) { + !bytes.Equal(h.caCert, caCert) { return url, caCert, token, true, nil }