diff --git a/api/main.go b/api/main.go index a02ac2220..7c818de5e 100644 --- a/api/main.go +++ b/api/main.go @@ -47,7 +47,7 @@ func main() { } harOutputChannel := tap.StartPassiveTapper() - socketConnection, err := shared.ConnectToSocketServer(*aggregatorAddress) + socketConnection, err := shared.ConnectToSocketServer(*aggregatorAddress, shared.DEFAULT_SOCKET_RETRIES, shared.DEFAULT_SOCKET_RETRY_SLEEP_TIME, false) if err != nil { panic(fmt.Sprintf("Error connecting to socket server at %s %v", *aggregatorAddress, err)) } diff --git a/cli/mizu/controlSocket.go b/cli/mizu/controlSocket.go index 1fd81933b..163fe9502 100644 --- a/cli/mizu/controlSocket.go +++ b/cli/mizu/controlSocket.go @@ -5,6 +5,7 @@ import ( "github.com/gorilla/websocket" "github.com/up9inc/mizu/shared" core "k8s.io/api/core/v1" + "time" ) type ControlSocket struct { @@ -12,7 +13,7 @@ type ControlSocket struct { } func CreateControlSocket(socketServerAddress string) (*ControlSocket, error) { - connection, err := shared.ConnectToSocketServer(socketServerAddress) + connection, err := shared.ConnectToSocketServer(socketServerAddress, 30, 2 * time.Second, true) if err != nil { return nil, err } else { diff --git a/cli/mizu/mizuRunner.go b/cli/mizu/mizuRunner.go index fb2d75a96..4daa8b40c 100644 --- a/cli/mizu/mizuRunner.go +++ b/cli/mizu/mizuRunner.go @@ -13,6 +13,8 @@ import ( "time" ) +var currentlyTappedPods []core.Pod + func Run(podRegexQuery *regexp.Regexp) { kubernetesProvider := kubernetes.NewProvider(config.Configuration.KubeConfigPath, config.Configuration.Namespace) ctx, cancel := context.WithCancel(context.Background()) @@ -23,6 +25,7 @@ func Run(podRegexQuery *regexp.Regexp) { fmt.Printf("Error getting pods to tap %v\n", err) return } + currentlyTappedPods = matchingPods nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(ctx, kubernetesProvider, matchingPods) if err != nil { @@ -35,18 +38,7 @@ func Run(podRegexQuery *regexp.Regexp) { return } go portForwardApiPod(ctx, kubernetesProvider, cancel) //TODO convert this to job for built in pod ttl or have the running app handle this - - controlSocket, err := CreateControlSocket(fmt.Sprintf("ws://localhost:%d/ws", config.Configuration.GuiPort)) - if err != nil { - fmt.Printf("error establishing control socket connection %s\n", err) - cancel() - } - time.Sleep(2 * time.Second) - err = controlSocket.SendNewTappedPodsListMessage(kubernetesProvider.Namespace, matchingPods) - if err != nil { - fmt.Printf("error Sending message via control socket %s\n", err) - } - + go syncApiStatus(ctx, cancel, kubernetesProvider.Namespace) waitForFinish(ctx, cancel) //block until exit signal or error // TODO handle incoming traffic from tapper using a channel @@ -200,3 +192,25 @@ func waitForFinish(ctx context.Context, cancel context.CancelFunc) { cancel() } } + +func syncApiStatus(ctx context.Context, cancel context.CancelFunc, namespace string) { + controlSocket, err := CreateControlSocket(fmt.Sprintf("ws://localhost:%d/ws", config.Configuration.GuiPort)) + if err != nil { + fmt.Printf("error establishing control socket connection %s\n", err) + cancel() + } + + for { + select { + case <- ctx.Done(): + return + default: + err = controlSocket.SendNewTappedPodsListMessage(namespace, currentlyTappedPods) + if err != nil { + fmt.Printf("error Sending message via control socket %s\n", err) + } + time.Sleep(5 * time.Second) + } + } + +} diff --git a/shared/socket_client.go b/shared/socket_client.go index 862ec7625..2faeabf5c 100644 --- a/shared/socket_client.go +++ b/shared/socket_client.go @@ -1,28 +1,34 @@ package shared import ( + "fmt" "github.com/gorilla/websocket" "time" ) -func ConnectToSocketServer(address string) (*websocket.Conn, error) { - const maxTry = 5 - const sleepTime = time.Second * 2 +const ( + DEFAULT_SOCKET_RETRIES = 3 + DEFAULT_SOCKET_RETRY_SLEEP_TIME = time.Second * 10 +) + +func ConnectToSocketServer(address string, retries int, retrySleepTime time.Duration, hideTimeoutErrors bool) (*websocket.Conn, error) { var err error var connection *websocket.Conn try := 0 // Connection to server fails if client pod is up before server. // Retries solve this issue. - for try < maxTry { + for try < retries { connection, _, err = websocket.DefaultDialer.Dial(address, nil) if err != nil { try++ - // fmt.Printf("Failed connecting to websocket server: %s, (%v,%+v)\n", err, err, err) + if !hideTimeoutErrors { + fmt.Printf("Failed connecting to websocket server: %s, (%v,%+v)\n", err, err, err) + } } else { break } - time.Sleep(sleepTime) + time.Sleep(retrySleepTime) } if err != nil {