From 4f74be47d3b57028aa3df69a2b5c6954b03be636 Mon Sep 17 00:00:00 2001 From: "M. Mert Yildiran" Date: Thu, 16 Sep 2021 18:32:51 +0300 Subject: [PATCH] Write the client and integrate to the new real-time database --- agent/pkg/api/main.go | 9 ++- agent/pkg/api/realtime_client.go | 110 +++++++++++++++++++++++++++++++ agent/pkg/models/models.go | 7 +- tap/api/api.go | 1 + 4 files changed, 121 insertions(+), 6 deletions(-) create mode 100644 agent/pkg/api/realtime_client.go diff --git a/agent/pkg/api/main.go b/agent/pkg/api/main.go index 3bcf5081a..ec92e8a00 100644 --- a/agent/pkg/api/main.go +++ b/agent/pkg/api/main.go @@ -21,7 +21,6 @@ import ( "github.com/up9inc/mizu/tap" tapApi "github.com/up9inc/mizu/tap/api" - "mizuserver/pkg/models" "mizuserver/pkg/resolver" "mizuserver/pkg/utils" ) @@ -105,10 +104,14 @@ func startReadingFiles(workingDir string) { } func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extensionsMap map[string]*tapApi.Extension) { + conn := Connect("localhost", "8000") + SetModeInsert(conn) if outputItems == nil { panic("Channel of captured messages is nil") } + go Query("", "localhost", "8000") + for item := range outputItems { extension := extensionsMap[item.Protocol.Name] resolvedSource, resolvedDestionation := resolveIP(item.ConnectionInfo) @@ -116,8 +119,8 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension baseEntry := extension.Dissector.Summarize(mizuEntry) mizuEntry.EstimatedSizeBytes = getEstimatedEntrySizeBytes(mizuEntry) database.CreateEntry(mizuEntry) - baseEntryBytes, _ := models.CreateBaseEntryWebSocketMessage(baseEntry) - BroadcastToBrowserClients(baseEntryBytes) + item.Summary = baseEntry + Insert(item, conn) } } diff --git a/agent/pkg/api/realtime_client.go b/agent/pkg/api/realtime_client.go new file mode 100644 index 000000000..0d50b643f --- /dev/null +++ b/agent/pkg/api/realtime_client.go @@ -0,0 +1,110 @@ +package api + +import ( + "bufio" + "encoding/json" + "fmt" + "mizuserver/pkg/models" + "net" + "os" + "regexp" + "sync" + "time" +) + +func Connect(host string, port string) (conn net.Conn) { + dest := host + ":" + port + + fmt.Printf("Connecting to %s...\n", dest) + + conn, err := net.Dial("tcp", dest) + + if err != nil { + if _, t := err.(*net.OpError); t { + fmt.Println("Some problem connecting.") + } else { + fmt.Println("Unknown error: " + err.Error()) + } + os.Exit(1) + } + + return +} + +func SetModeInsert(conn net.Conn) { + conn.SetWriteDeadline(time.Now().Add(1 * time.Second)) + conn.Write([]byte("/insert\n")) +} + +func Insert(entry interface{}, conn net.Conn) { + var data []byte + conn.SetWriteDeadline(time.Now().Add(1 * time.Second)) + data, _ = json.Marshal(entry) + conn.Write(data) + + conn.Write([]byte("\n")) +} + +func Query(query string, host string, port string) { + conn := Connect(host, port) + + var wg sync.WaitGroup + go readConnection(&wg, conn) + wg.Add(1) + + conn.SetWriteDeadline(time.Now().Add(1 * time.Second)) + conn.Write([]byte("/query\n")) + + conn.SetWriteDeadline(time.Now().Add(1 * time.Second)) + conn.Write([]byte(fmt.Sprintf("%s\n", query))) + + wg.Wait() +} + +func readConnection(wg *sync.WaitGroup, conn net.Conn) { + defer wg.Done() + for { + scanner := bufio.NewScanner(conn) + + for { + ok := scanner.Scan() + text := scanner.Text() + + command := handleCommands(text) + if !command { + fmt.Printf("\b\b** %s\n> ", text) + + var data map[string]interface{} + if err := json.Unmarshal([]byte(text), &data); err != nil { + panic(err) + } + + baseEntryBytes, _ := models.CreateBaseEntryWebSocketMessage(data["Summary"].(map[string]interface{})) + BroadcastToBrowserClients(baseEntryBytes) + } + + if !ok { + fmt.Println("Reached EOF on server connection.") + break + } + } + } +} + +func handleCommands(text string) bool { + r, err := regexp.Compile("^%.*%$") + if err == nil { + if r.MatchString(text) { + + switch { + case text == "%quit%": + fmt.Println("\b\bServer is leaving. Hanging up.") + os.Exit(0) + } + + return true + } + } + + return false +} diff --git a/agent/pkg/models/models.go b/agent/pkg/models/models.go index e97069168..3f80ead7a 100644 --- a/agent/pkg/models/models.go +++ b/agent/pkg/models/models.go @@ -2,10 +2,11 @@ package models import ( "encoding/json" - tapApi "github.com/up9inc/mizu/tap/api" "mizuserver/pkg/rules" "mizuserver/pkg/utils" + tapApi "github.com/up9inc/mizu/tap/api" + "github.com/google/martian/har" "github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/tap" @@ -42,7 +43,7 @@ type HarFetchRequestQuery struct { type WebSocketEntryMessage struct { *shared.WebSocketMessageMetadata - Data *tapApi.BaseEntryDetails `json:"data,omitempty"` + Data map[string]interface{} `json:"data,omitempty"` } type WebSocketTappedEntryMessage struct { @@ -55,7 +56,7 @@ type WebsocketOutboundLinkMessage struct { Data *tap.OutboundLink } -func CreateBaseEntryWebSocketMessage(base *tapApi.BaseEntryDetails) ([]byte, error) { +func CreateBaseEntryWebSocketMessage(base map[string]interface{}) ([]byte, error) { message := &WebSocketEntryMessage{ WebSocketMessageMetadata: &shared.WebSocketMessageMetadata{ MessageType: shared.WebSocketMessageTypeEntry, diff --git a/tap/api/api.go b/tap/api/api.go index e9f9c0208..12aff0277 100644 --- a/tap/api/api.go +++ b/tap/api/api.go @@ -66,6 +66,7 @@ type OutputChannelItem struct { Timestamp int64 ConnectionInfo *ConnectionInfo Pair *RequestResponsePair + Summary *BaseEntryDetails } type SuperTimer struct {