diff --git a/cmd/console.go b/cmd/console.go index 16f4bb427..13d56e5d7 100644 --- a/cmd/console.go +++ b/cmd/console.go @@ -42,68 +42,71 @@ func init() { } func runConsole() { - hubUrl := kubernetes.GetHubUrl() - response, err := http.Get(fmt.Sprintf("%s/echo", hubUrl)) - if err != nil || response.StatusCode != 200 { - log.Info().Msg(fmt.Sprintf(utils.Yellow, "Couldn't connect to Hub. Establishing proxy...")) - runProxy(false, true) - } - - interrupt := make(chan os.Signal, 1) - signal.Notify(interrupt, os.Interrupt) - - log.Info().Str("host", config.Config.Tap.Proxy.Host).Str("url", hubUrl).Msg("Connecting to:") - u := url.URL{ - Scheme: "ws", - Host: fmt.Sprintf("%s:%d", config.Config.Tap.Proxy.Host, config.Config.Tap.Proxy.Front.Port), - Path: "/api/scripts/logs", - } - headers := http.Header{} - headers.Set(utils.X_KUBESHARK_CAPTURE_HEADER_KEY, utils.X_KUBESHARK_CAPTURE_HEADER_IGNORE_VALUE) - headers.Set("License-Key", config.Config.License) - - c, _, err := websocket.DefaultDialer.Dial(u.String(), headers) - if err != nil { - log.Error().Err(err).Send() - return - } - defer c.Close() - - done := make(chan struct{}) - - go func() { - defer close(done) - for { - _, message, err := c.ReadMessage() - if err != nil { - log.Error().Err(err).Send() - return - } - - msg := string(message) - if strings.Contains(msg, ":ERROR]") { - msg = fmt.Sprintf(utils.Red, msg) - fmt.Fprintln(os.Stderr, msg) - } else { - fmt.Fprintln(os.Stdout, msg) - } - } - }() - - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - for { + hubUrl := kubernetes.GetHubUrl() + response, err := http.Get(fmt.Sprintf("%s/echo", hubUrl)) + if err != nil || response.StatusCode != 200 { + log.Info().Msg(fmt.Sprintf(utils.Yellow, "Couldn't connect to Hub. Establishing proxy...")) + runProxy(false, true) + } + + interrupt := make(chan os.Signal, 1) + signal.Notify(interrupt, os.Interrupt) + + log.Info().Str("host", config.Config.Tap.Proxy.Host).Str("url", hubUrl).Msg("Connecting to:") + u := url.URL{ + Scheme: "ws", + Host: fmt.Sprintf("%s:%d", config.Config.Tap.Proxy.Host, config.Config.Tap.Proxy.Front.Port), + Path: "/api/scripts/logs", + } + headers := http.Header{} + headers.Set(utils.X_KUBESHARK_CAPTURE_HEADER_KEY, utils.X_KUBESHARK_CAPTURE_HEADER_IGNORE_VALUE) + headers.Set("License-Key", config.Config.License) + + c, _, err := websocket.DefaultDialer.Dial(u.String(), headers) + if err != nil { + log.Error().Err(err).Msg("Websocket dial error, retrying in 5 seconds...") + time.Sleep(5 * time.Second) // Delay before retrying + continue + } + defer c.Close() + + done := make(chan struct{}) + + go func() { + defer close(done) + for { + _, message, err := c.ReadMessage() + if err != nil { + log.Error().Err(err).Msg("Error reading websocket message, reconnecting...") + break // Break to reconnect + } + + msg := string(message) + if strings.Contains(msg, ":ERROR]") { + msg = fmt.Sprintf(utils.Red, msg) + fmt.Fprintln(os.Stderr, msg) + } else { + fmt.Fprintln(os.Stdout, msg) + } + } + }() + + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + select { case <-done: - return + log.Warn().Msg(fmt.Sprintf(utils.Yellow, "Connection closed, reconnecting...")) + time.Sleep(5 * time.Second) // Delay before reconnecting + continue // Reconnect after error case <-interrupt: log.Warn().Msg(fmt.Sprintf(utils.Yellow, "Received interrupt, exiting...")) err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) if err != nil { log.Error().Err(err).Send() - return + continue } select {