From 2d38ef95a7868a1e94c51d463d3f6f86df0e6939 Mon Sep 17 00:00:00 2001 From: Rami Berman Date: Mon, 4 Apr 2022 16:35:35 +0300 Subject: [PATCH] Merge branch 'develop' into refactor_ws # Please enter a commit message to explain why this merge is necessary, # especially if it merges an updated upstream into a topic branch. # # Lines starting with '#' will be ignored, and an empty message aborts # the commit. --- .../api/entry_streamer_socket_connector.go | 57 +++++++++++++++++++ agent/pkg/api/socket_data_streamer.go | 49 ---------------- 2 files changed, 57 insertions(+), 49 deletions(-) create mode 100644 agent/pkg/api/entry_streamer_socket_connector.go diff --git a/agent/pkg/api/entry_streamer_socket_connector.go b/agent/pkg/api/entry_streamer_socket_connector.go new file mode 100644 index 000000000..c5e2e7d0d --- /dev/null +++ b/agent/pkg/api/entry_streamer_socket_connector.go @@ -0,0 +1,57 @@ +package api + +import ( + "fmt" + + basenine "github.com/up9inc/basenine/client/go" + "github.com/up9inc/mizu/agent/pkg/models" + "github.com/up9inc/mizu/shared/logger" + tapApi "github.com/up9inc/mizu/tap/api" +) + +type EntryStreamerSocketConnector interface { + SendEntry(socketId int, entry *tapApi.Entry, params *WebSocketParams) + SendMetadata(socketId int, metadata *basenine.Metadata) + SendToastError(socketId int, err error) + CleanupSocket(socketId int) +} + +type DefaultEntryStreamerSocketConnector struct{} + +func (e *DefaultEntryStreamerSocketConnector) SendEntry(socketId int, entry *tapApi.Entry, params *WebSocketParams) { + var message []byte + if params.EnableFullEntries { + message, _ = models.CreateFullEntryWebSocketMessage(entry) + } else { + extension := extensionsMap[entry.Protocol.Name] + base := extension.Dissector.Summarize(entry) + message, _ = models.CreateBaseEntryWebSocketMessage(base) + } + + if err := SendToSocket(socketId, message); err != nil { + logger.Log.Error(err) + } +} + +func (e *DefaultEntryStreamerSocketConnector) SendMetadata(socketId int, metadata *basenine.Metadata) { + metadataBytes, _ := models.CreateWebsocketQueryMetadataMessage(metadata) + if err := SendToSocket(socketId, metadataBytes); err != nil { + logger.Log.Error(err) + } +} + +func (e *DefaultEntryStreamerSocketConnector) SendToastError(socketId int, err error) { + toastBytes, _ := models.CreateWebsocketToastMessage(&models.ToastMessage{ + Type: "error", + AutoClose: 5000, + Text: fmt.Sprintf("Syntax error: %s", err.Error()), + }) + if err := SendToSocket(socketId, toastBytes); err != nil { + logger.Log.Error(err) + } +} + +func (e *DefaultEntryStreamerSocketConnector) CleanupSocket(socketId int) { + socketObj := connectedWebsockets[socketId] + socketCleanup(socketId, socketObj) +} diff --git a/agent/pkg/api/socket_data_streamer.go b/agent/pkg/api/socket_data_streamer.go index c93254464..0fde336d5 100644 --- a/agent/pkg/api/socket_data_streamer.go +++ b/agent/pkg/api/socket_data_streamer.go @@ -3,11 +3,9 @@ package api import ( "context" "encoding/json" - "fmt" basenine "github.com/up9inc/basenine/client/go" "github.com/up9inc/mizu/agent/pkg/dependency" - "github.com/up9inc/mizu/agent/pkg/models" "github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/shared/logger" tapApi "github.com/up9inc/mizu/tap/api" @@ -17,53 +15,6 @@ type EntryStreamer interface { Get(ctx context.Context, socketId int, params *WebSocketParams) error } -type EntryStreamerSocketConnector interface { - SendEntry(socketId int, entry *tapApi.Entry, params *WebSocketParams) - SendMetadata(socketId int, metadata *basenine.Metadata) - SendToastError(socketId int, err error) - CleanupSocket(socketId int) -} - -type DefaultEntryStreamerSocketConnector struct{} - -func (e *DefaultEntryStreamerSocketConnector) SendEntry(socketId int, entry *tapApi.Entry, params *WebSocketParams) { - var message []byte - if params.EnableFullEntries { - message, _ = models.CreateFullEntryWebSocketMessage(entry) - } else { - extension := extensionsMap[entry.Protocol.Name] - base := extension.Dissector.Summarize(entry) - message, _ = models.CreateBaseEntryWebSocketMessage(base) - } - - if err := SendToSocket(socketId, message); err != nil { - logger.Log.Error(err) - } -} - -func (e *DefaultEntryStreamerSocketConnector) SendMetadata(socketId int, metadata *basenine.Metadata) { - metadataBytes, _ := models.CreateWebsocketQueryMetadataMessage(metadata) - if err := SendToSocket(socketId, metadataBytes); err != nil { - logger.Log.Error(err) - } -} - -func (e *DefaultEntryStreamerSocketConnector) SendToastError(socketId int, err error) { - toastBytes, _ := models.CreateWebsocketToastMessage(&models.ToastMessage{ - Type: "error", - AutoClose: 5000, - Text: fmt.Sprintf("Syntax error: %s", err.Error()), - }) - if err := SendToSocket(socketId, toastBytes); err != nil { - logger.Log.Error(err) - } -} - -func (e *DefaultEntryStreamerSocketConnector) CleanupSocket(socketId int) { - socketObj := connectedWebsockets[socketId] - socketCleanup(socketId, socketObj) -} - type BasenineEntryStreamer struct{} func (e *BasenineEntryStreamer) Get(ctx context.Context, socketId int, params *WebSocketParams) error {