Update main.go, controlSocket.go, and 2 more files...

This commit is contained in:
RamiBerm
2021-05-24 10:45:22 +03:00
parent 24dba9e851
commit c1409251e2
4 changed files with 41 additions and 20 deletions

View File

@@ -47,7 +47,7 @@ func main() {
} }
harOutputChannel := tap.StartPassiveTapper() harOutputChannel := tap.StartPassiveTapper()
socketConnection, err := shared.ConnectToSocketServer(*aggregatorAddress) socketConnection, err := shared.ConnectToSocketServer(*aggregatorAddress, shared.DEFAULT_SOCKET_RETRIES, shared.DEFAULT_SOCKET_RETRY_SLEEP_TIME, false)
if err != nil { if err != nil {
panic(fmt.Sprintf("Error connecting to socket server at %s %v", *aggregatorAddress, err)) panic(fmt.Sprintf("Error connecting to socket server at %s %v", *aggregatorAddress, err))
} }

View File

@@ -5,6 +5,7 @@ import (
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/shared"
core "k8s.io/api/core/v1" core "k8s.io/api/core/v1"
"time"
) )
type ControlSocket struct { type ControlSocket struct {
@@ -12,7 +13,7 @@ type ControlSocket struct {
} }
func CreateControlSocket(socketServerAddress string) (*ControlSocket, error) { func CreateControlSocket(socketServerAddress string) (*ControlSocket, error) {
connection, err := shared.ConnectToSocketServer(socketServerAddress) connection, err := shared.ConnectToSocketServer(socketServerAddress, 30, 2 * time.Second, true)
if err != nil { if err != nil {
return nil, err return nil, err
} else { } else {

View File

@@ -13,6 +13,8 @@ import (
"time" "time"
) )
var currentlyTappedPods []core.Pod
func Run(podRegexQuery *regexp.Regexp) { func Run(podRegexQuery *regexp.Regexp) {
kubernetesProvider := kubernetes.NewProvider(config.Configuration.KubeConfigPath, config.Configuration.Namespace) kubernetesProvider := kubernetes.NewProvider(config.Configuration.KubeConfigPath, config.Configuration.Namespace)
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
@@ -23,6 +25,7 @@ func Run(podRegexQuery *regexp.Regexp) {
fmt.Printf("Error getting pods to tap %v\n", err) fmt.Printf("Error getting pods to tap %v\n", err)
return return
} }
currentlyTappedPods = matchingPods
nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(ctx, kubernetesProvider, matchingPods) nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(ctx, kubernetesProvider, matchingPods)
if err != nil { if err != nil {
@@ -35,18 +38,7 @@ func Run(podRegexQuery *regexp.Regexp) {
return return
} }
go portForwardApiPod(ctx, kubernetesProvider, cancel) //TODO convert this to job for built in pod ttl or have the running app handle this go portForwardApiPod(ctx, kubernetesProvider, cancel) //TODO convert this to job for built in pod ttl or have the running app handle this
go syncApiStatus(ctx, cancel, kubernetesProvider.Namespace)
controlSocket, err := CreateControlSocket(fmt.Sprintf("ws://localhost:%d/ws", config.Configuration.GuiPort))
if err != nil {
fmt.Printf("error establishing control socket connection %s\n", err)
cancel()
}
time.Sleep(2 * time.Second)
err = controlSocket.SendNewTappedPodsListMessage(kubernetesProvider.Namespace, matchingPods)
if err != nil {
fmt.Printf("error Sending message via control socket %s\n", err)
}
waitForFinish(ctx, cancel) //block until exit signal or error waitForFinish(ctx, cancel) //block until exit signal or error
// TODO handle incoming traffic from tapper using a channel // TODO handle incoming traffic from tapper using a channel
@@ -200,3 +192,25 @@ func waitForFinish(ctx context.Context, cancel context.CancelFunc) {
cancel() cancel()
} }
} }
func syncApiStatus(ctx context.Context, cancel context.CancelFunc, namespace string) {
controlSocket, err := CreateControlSocket(fmt.Sprintf("ws://localhost:%d/ws", config.Configuration.GuiPort))
if err != nil {
fmt.Printf("error establishing control socket connection %s\n", err)
cancel()
}
for {
select {
case <- ctx.Done():
return
default:
err = controlSocket.SendNewTappedPodsListMessage(namespace, currentlyTappedPods)
if err != nil {
fmt.Printf("error Sending message via control socket %s\n", err)
}
time.Sleep(5 * time.Second)
}
}
}

View File

@@ -1,28 +1,34 @@
package shared package shared
import ( import (
"fmt"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"time" "time"
) )
func ConnectToSocketServer(address string) (*websocket.Conn, error) { const (
const maxTry = 5 DEFAULT_SOCKET_RETRIES = 3
const sleepTime = time.Second * 2 DEFAULT_SOCKET_RETRY_SLEEP_TIME = time.Second * 10
)
func ConnectToSocketServer(address string, retries int, retrySleepTime time.Duration, hideTimeoutErrors bool) (*websocket.Conn, error) {
var err error var err error
var connection *websocket.Conn var connection *websocket.Conn
try := 0 try := 0
// Connection to server fails if client pod is up before server. // Connection to server fails if client pod is up before server.
// Retries solve this issue. // Retries solve this issue.
for try < maxTry { for try < retries {
connection, _, err = websocket.DefaultDialer.Dial(address, nil) connection, _, err = websocket.DefaultDialer.Dial(address, nil)
if err != nil { if err != nil {
try++ try++
// fmt.Printf("Failed connecting to websocket server: %s, (%v,%+v)\n", err, err, err) if !hideTimeoutErrors {
fmt.Printf("Failed connecting to websocket server: %s, (%v,%+v)\n", err, err, err)
}
} else { } else {
break break
} }
time.Sleep(sleepTime) time.Sleep(retrySleepTime)
} }
if err != nil { if err != nil {