diff --git a/agent/main.go b/agent/main.go index f0cff4971..99f4934af 100644 --- a/agent/main.go +++ b/agent/main.go @@ -65,6 +65,7 @@ func main() { hostApi(nil) } else if *tapperMode { + rlog.Infof("Starting tapper, websocket address: %s", *apiServerAddress) if *apiServerAddress == "" { panic("API server address must be provided with --api-server-address when using --tap") } @@ -77,13 +78,13 @@ func main() { filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem) tap.StartPassiveTapper(tapOpts, filteredOutputItemsChannel, extensions, filteringOptions) - socketConnection, err := shared.ConnectToSocketServer(*apiServerAddress, shared.DEFAULT_SOCKET_RETRIES, shared.DEFAULT_SOCKET_RETRY_SLEEP_TIME, false) + socketConnection, err := utils.ConnectToSocketServer(*apiServerAddress) if err != nil { panic(fmt.Sprintf("Error connecting to socket server at %s %v", *apiServerAddress, err)) } + rlog.Infof("Connected successfully to websocket %s", *apiServerAddress) go pipeTapChannelToSocket(socketConnection, filteredOutputItemsChannel) - // go pipeOutboundLinksChannelToSocket(socketConnection, outboundLinkOutputChannel) } else if *apiServerMode { api.StartResolving(*namespace) @@ -122,7 +123,7 @@ func loadExtensions() { extensionsMap = make(map[string]*tapApi.Extension) for i, file := range files { filename := file.Name() - log.Printf("Loading extension: %s\n", filename) + rlog.Infof("Loading extension: %s\n", filename) extension := &tapApi.Extension{ Path: path.Join(extensionsDir, filename), } @@ -290,7 +291,7 @@ func pipeTapChannelToSocket(connection *websocket.Conn, messageDataChannel <-cha for messageData := range messageDataChannel { marshaledData, err := models.CreateWebsocketTappedEntryMessage(messageData) if err != nil { - rlog.Infof("error converting message to json %s, (%v,%+v)\n", err, err, err) + rlog.Errorf("error converting message to json %v, err: %s, (%v,%+v)", messageData, err, err, err) continue } @@ -298,26 +299,8 @@ func pipeTapChannelToSocket(connection *websocket.Conn, messageDataChannel <-cha // and goes into the intermediate WebSocket. err = connection.WriteMessage(websocket.TextMessage, marshaledData) if err != nil { - rlog.Infof("error sending message through socket server %s, (%v,%+v)\n", err, err, err) + rlog.Errorf("error sending message through socket server %v, err: %s, (%v,%+v)", messageData, err, err, err) continue } } } - -func pipeOutboundLinksChannelToSocket(connection *websocket.Conn, outboundLinkChannel <-chan *tap.OutboundLink) { - for outboundLink := range outboundLinkChannel { - if outboundLink.SuggestedProtocol == tap.TLSProtocol { - marshaledData, err := models.CreateWebsocketOutboundLinkMessage(outboundLink) - if err != nil { - rlog.Infof("Error converting outbound link to json %s, (%v,%+v)", err, err, err) - continue - } - - err = connection.WriteMessage(websocket.TextMessage, marshaledData) - if err != nil { - rlog.Infof("error sending outbound link message through socket server %s, (%v,%+v)", err, err, err) - continue - } - } - } -} diff --git a/agent/pkg/rules/rulesHTTP.go b/agent/pkg/rules/rulesHTTP.go index 84215e7fc..e4dc927b4 100644 --- a/agent/pkg/rules/rulesHTTP.go +++ b/agent/pkg/rules/rulesHTTP.go @@ -4,6 +4,7 @@ import ( "encoding/base64" "encoding/json" "fmt" + "github.com/romana/rlog" "reflect" "regexp" "strings" @@ -65,7 +66,7 @@ func MatchRequestPolicy(harEntry har.Entry, service string) []RulesMatched { if err != nil { continue } - fmt.Println(matchValue, rule.Value) + rlog.Info(matchValue, rule.Value) } else { val := fmt.Sprint(out) matchValue, err = regexp.MatchString(rule.Value, val) diff --git a/agent/pkg/utils/har.go b/agent/pkg/utils/har.go index 419bf7568..6386356d0 100644 --- a/agent/pkg/utils/har.go +++ b/agent/pkg/utils/har.go @@ -4,13 +4,12 @@ import ( "bytes" "errors" "fmt" - "strconv" - "strings" - "time" - "github.com/google/martian/har" "github.com/romana/rlog" "github.com/up9inc/mizu/tap/api" + "strconv" + "strings" + "time" ) // Keep it because we might want cookies in the future diff --git a/shared/socket_client.go b/agent/pkg/utils/socket_client.go similarity index 53% rename from shared/socket_client.go rename to agent/pkg/utils/socket_client.go index 4cc618356..aa532852a 100644 --- a/shared/socket_client.go +++ b/agent/pkg/utils/socket_client.go @@ -1,8 +1,8 @@ -package shared +package utils import ( - "fmt" "github.com/gorilla/websocket" + "github.com/romana/rlog" "time" ) @@ -11,24 +11,23 @@ const ( DEFAULT_SOCKET_RETRY_SLEEP_TIME = time.Second * 10 ) -func ConnectToSocketServer(address string, retries int, retrySleepTime time.Duration, hideTimeoutErrors bool) (*websocket.Conn, error) { +func ConnectToSocketServer(address string) (*websocket.Conn, error) { var err error var connection *websocket.Conn try := 0 // Connection to server fails if client pod is up before server. // Retries solve this issue. - for try < retries { + for try < DEFAULT_SOCKET_RETRIES { + rlog.Infof("Trying to connect to websocket: %s, attempt: %v/%v", address, try, DEFAULT_SOCKET_RETRIES) connection, _, err = websocket.DefaultDialer.Dial(address, nil) if err != nil { + rlog.Warnf("Failed connecting to websocket: %s, attempt: %v/%v, err: %s, (%v,%+v)", address, try, DEFAULT_SOCKET_RETRIES, err, err, err) try++ - if !hideTimeoutErrors { - fmt.Printf("Failed connecting to websocket server: %s, (%v,%+v)\n", err, err, err) - } } else { break } - time.Sleep(retrySleepTime) + time.Sleep(DEFAULT_SOCKET_RETRY_SLEEP_TIME) } if err != nil { diff --git a/agent/pkg/utils/utils.go b/agent/pkg/utils/utils.go index cdb5a35cb..e4ac80232 100644 --- a/agent/pkg/utils/utils.go +++ b/agent/pkg/utils/utils.go @@ -4,7 +4,6 @@ import ( "context" "github.com/gin-gonic/gin" "github.com/romana/rlog" - "log" "net/http" "net/url" "os" @@ -18,8 +17,8 @@ import ( func StartServer(app *gin.Engine) { signals := make(chan os.Signal, 2) signal.Notify(signals, - os.Interrupt, // this catch ctrl + c - syscall.SIGTSTP, // this catch ctrl + z + os.Interrupt, // this catch ctrl + c + syscall.SIGTSTP, // this catch ctrl + z ) srv := &http.Server{ @@ -36,8 +35,9 @@ func StartServer(app *gin.Engine) { }() // Run server. + rlog.Infof("Starting the server...") if err := app.Run(":8899"); err != nil { - log.Printf("Oops... Server is not running! Reason: %v", err) + rlog.Errorf("Server is not running! Reason: %v", err) } } @@ -54,15 +54,15 @@ func ReverseSlice(data interface{}) { func CheckErr(e error) { if e != nil { - log.Printf("%v", e) + rlog.Infof("%v", e) //panic(e) } } func SetHostname(address, newHostname string) string { replacedUrl, err := url.Parse(address) - if err != nil{ - log.Printf("error replacing hostname to %s in address %s, returning original %v",newHostname, address, err) + if err != nil { + rlog.Infof("error replacing hostname to %s in address %s, returning original %v", newHostname, address, err) return address } replacedUrl.Host = newHostname diff --git a/cli/mizu/controlSocket.go b/cli/mizu/controlSocket.go deleted file mode 100644 index f35b9377c..000000000 --- a/cli/mizu/controlSocket.go +++ /dev/null @@ -1,42 +0,0 @@ -package mizu - -import ( - "encoding/json" - "github.com/gorilla/websocket" - "github.com/up9inc/mizu/shared" - core "k8s.io/api/core/v1" - "time" -) - -type ControlSocket struct { - connection *websocket.Conn -} - -func CreateControlSocket(socketServerAddress string) (*ControlSocket, error) { - connection, err := shared.ConnectToSocketServer(socketServerAddress, 30, 2 * time.Second, true) - if err != nil { - return nil, err - } else { - return &ControlSocket{connection: connection}, nil - } -} - -func (controlSocket *ControlSocket) SendNewTappedPodsListMessage(pods []core.Pod) error { - podInfos := make([]shared.PodInfo, 0) - for _, pod := range pods { - podInfos = append(podInfos, shared.PodInfo{Name: pod.Name, Namespace: pod.Namespace}) - } - tapStatus := shared.TapStatus{Pods: podInfos} - socketMessage := shared.CreateWebSocketStatusMessage(tapStatus) - - jsonMessage, err := json.Marshal(socketMessage) - if err != nil { - return err - } - err = controlSocket.connection.WriteMessage(websocket.TextMessage, jsonMessage) - if err != nil { - return err - } - - return nil -}