mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-08-21 18:04:51 +00:00
Write the client and integrate to the new real-time database
This commit is contained in:
parent
3dec786553
commit
4f74be47d3
@ -21,7 +21,6 @@ import (
|
||||
"github.com/up9inc/mizu/tap"
|
||||
tapApi "github.com/up9inc/mizu/tap/api"
|
||||
|
||||
"mizuserver/pkg/models"
|
||||
"mizuserver/pkg/resolver"
|
||||
"mizuserver/pkg/utils"
|
||||
)
|
||||
@ -105,10 +104,14 @@ func startReadingFiles(workingDir string) {
|
||||
}
|
||||
|
||||
func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extensionsMap map[string]*tapApi.Extension) {
|
||||
conn := Connect("localhost", "8000")
|
||||
SetModeInsert(conn)
|
||||
if outputItems == nil {
|
||||
panic("Channel of captured messages is nil")
|
||||
}
|
||||
|
||||
go Query("", "localhost", "8000")
|
||||
|
||||
for item := range outputItems {
|
||||
extension := extensionsMap[item.Protocol.Name]
|
||||
resolvedSource, resolvedDestionation := resolveIP(item.ConnectionInfo)
|
||||
@ -116,8 +119,8 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
|
||||
baseEntry := extension.Dissector.Summarize(mizuEntry)
|
||||
mizuEntry.EstimatedSizeBytes = getEstimatedEntrySizeBytes(mizuEntry)
|
||||
database.CreateEntry(mizuEntry)
|
||||
baseEntryBytes, _ := models.CreateBaseEntryWebSocketMessage(baseEntry)
|
||||
BroadcastToBrowserClients(baseEntryBytes)
|
||||
item.Summary = baseEntry
|
||||
Insert(item, conn)
|
||||
}
|
||||
}
|
||||
|
||||
|
110
agent/pkg/api/realtime_client.go
Normal file
110
agent/pkg/api/realtime_client.go
Normal file
@ -0,0 +1,110 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"mizuserver/pkg/models"
|
||||
"net"
|
||||
"os"
|
||||
"regexp"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
func Connect(host string, port string) (conn net.Conn) {
|
||||
dest := host + ":" + port
|
||||
|
||||
fmt.Printf("Connecting to %s...\n", dest)
|
||||
|
||||
conn, err := net.Dial("tcp", dest)
|
||||
|
||||
if err != nil {
|
||||
if _, t := err.(*net.OpError); t {
|
||||
fmt.Println("Some problem connecting.")
|
||||
} else {
|
||||
fmt.Println("Unknown error: " + err.Error())
|
||||
}
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func SetModeInsert(conn net.Conn) {
|
||||
conn.SetWriteDeadline(time.Now().Add(1 * time.Second))
|
||||
conn.Write([]byte("/insert\n"))
|
||||
}
|
||||
|
||||
func Insert(entry interface{}, conn net.Conn) {
|
||||
var data []byte
|
||||
conn.SetWriteDeadline(time.Now().Add(1 * time.Second))
|
||||
data, _ = json.Marshal(entry)
|
||||
conn.Write(data)
|
||||
|
||||
conn.Write([]byte("\n"))
|
||||
}
|
||||
|
||||
func Query(query string, host string, port string) {
|
||||
conn := Connect(host, port)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
go readConnection(&wg, conn)
|
||||
wg.Add(1)
|
||||
|
||||
conn.SetWriteDeadline(time.Now().Add(1 * time.Second))
|
||||
conn.Write([]byte("/query\n"))
|
||||
|
||||
conn.SetWriteDeadline(time.Now().Add(1 * time.Second))
|
||||
conn.Write([]byte(fmt.Sprintf("%s\n", query)))
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func readConnection(wg *sync.WaitGroup, conn net.Conn) {
|
||||
defer wg.Done()
|
||||
for {
|
||||
scanner := bufio.NewScanner(conn)
|
||||
|
||||
for {
|
||||
ok := scanner.Scan()
|
||||
text := scanner.Text()
|
||||
|
||||
command := handleCommands(text)
|
||||
if !command {
|
||||
fmt.Printf("\b\b** %s\n> ", text)
|
||||
|
||||
var data map[string]interface{}
|
||||
if err := json.Unmarshal([]byte(text), &data); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
baseEntryBytes, _ := models.CreateBaseEntryWebSocketMessage(data["Summary"].(map[string]interface{}))
|
||||
BroadcastToBrowserClients(baseEntryBytes)
|
||||
}
|
||||
|
||||
if !ok {
|
||||
fmt.Println("Reached EOF on server connection.")
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func handleCommands(text string) bool {
|
||||
r, err := regexp.Compile("^%.*%$")
|
||||
if err == nil {
|
||||
if r.MatchString(text) {
|
||||
|
||||
switch {
|
||||
case text == "%quit%":
|
||||
fmt.Println("\b\bServer is leaving. Hanging up.")
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
@ -2,10 +2,11 @@ package models
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
tapApi "github.com/up9inc/mizu/tap/api"
|
||||
"mizuserver/pkg/rules"
|
||||
"mizuserver/pkg/utils"
|
||||
|
||||
tapApi "github.com/up9inc/mizu/tap/api"
|
||||
|
||||
"github.com/google/martian/har"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
"github.com/up9inc/mizu/tap"
|
||||
@ -42,7 +43,7 @@ type HarFetchRequestQuery struct {
|
||||
|
||||
type WebSocketEntryMessage struct {
|
||||
*shared.WebSocketMessageMetadata
|
||||
Data *tapApi.BaseEntryDetails `json:"data,omitempty"`
|
||||
Data map[string]interface{} `json:"data,omitempty"`
|
||||
}
|
||||
|
||||
type WebSocketTappedEntryMessage struct {
|
||||
@ -55,7 +56,7 @@ type WebsocketOutboundLinkMessage struct {
|
||||
Data *tap.OutboundLink
|
||||
}
|
||||
|
||||
func CreateBaseEntryWebSocketMessage(base *tapApi.BaseEntryDetails) ([]byte, error) {
|
||||
func CreateBaseEntryWebSocketMessage(base map[string]interface{}) ([]byte, error) {
|
||||
message := &WebSocketEntryMessage{
|
||||
WebSocketMessageMetadata: &shared.WebSocketMessageMetadata{
|
||||
MessageType: shared.WebSocketMessageTypeEntry,
|
||||
|
@ -66,6 +66,7 @@ type OutputChannelItem struct {
|
||||
Timestamp int64
|
||||
ConnectionInfo *ConnectionInfo
|
||||
Pair *RequestResponsePair
|
||||
Summary *BaseEntryDetails
|
||||
}
|
||||
|
||||
type SuperTimer struct {
|
||||
|
Loading…
Reference in New Issue
Block a user