diff --git a/agent/pkg/api/entry_streamer_socket_connector.go b/agent/pkg/api/entry_streamer_socket_connector.go index a40438739..1f376c23d 100644 --- a/agent/pkg/api/entry_streamer_socket_connector.go +++ b/agent/pkg/api/entry_streamer_socket_connector.go @@ -5,20 +5,19 @@ import ( basenine "github.com/up9inc/basenine/client/go" "github.com/up9inc/mizu/agent/pkg/models" - "github.com/up9inc/mizu/logger" tapApi "github.com/up9inc/mizu/tap/api" ) type EntryStreamerSocketConnector interface { - SendEntry(socketId int, entry *tapApi.Entry, params *WebSocketParams) - SendMetadata(socketId int, metadata *basenine.Metadata) - SendToastError(socketId int, err error) + SendEntry(socketId int, entry *tapApi.Entry, params *WebSocketParams) error + SendMetadata(socketId int, metadata *basenine.Metadata) error + SendToastError(socketId int, err error) error CleanupSocket(socketId int) } type DefaultEntryStreamerSocketConnector struct{} -func (e *DefaultEntryStreamerSocketConnector) SendEntry(socketId int, entry *tapApi.Entry, params *WebSocketParams) { +func (e *DefaultEntryStreamerSocketConnector) SendEntry(socketId int, entry *tapApi.Entry, params *WebSocketParams) error { var message []byte if params.EnableFullEntries { message, _ = models.CreateFullEntryWebSocketMessage(entry) @@ -29,26 +28,32 @@ func (e *DefaultEntryStreamerSocketConnector) SendEntry(socketId int, entry *tap } if err := SendToSocket(socketId, message); err != nil { - logger.Log.Error(err) + return err } + + return nil } -func (e *DefaultEntryStreamerSocketConnector) SendMetadata(socketId int, metadata *basenine.Metadata) { +func (e *DefaultEntryStreamerSocketConnector) SendMetadata(socketId int, metadata *basenine.Metadata) error { metadataBytes, _ := models.CreateWebsocketQueryMetadataMessage(metadata) if err := SendToSocket(socketId, metadataBytes); err != nil { - logger.Log.Error(err) + return err } + + return nil } -func (e *DefaultEntryStreamerSocketConnector) SendToastError(socketId int, err error) { +func (e *DefaultEntryStreamerSocketConnector) SendToastError(socketId int, err error) error { toastBytes, _ := models.CreateWebsocketToastMessage(&models.ToastMessage{ Type: "error", AutoClose: 5000, Text: fmt.Sprintf("Syntax error: %s", err.Error()), }) if err := SendToSocket(socketId, toastBytes); err != nil { - logger.Log.Error(err) + return err } + + return nil } func (e *DefaultEntryStreamerSocketConnector) CleanupSocket(socketId int) { diff --git a/agent/pkg/api/socket_data_streamer.go b/agent/pkg/api/socket_data_streamer.go index 7ed1d01cb..0a9df3ba0 100644 --- a/agent/pkg/api/socket_data_streamer.go +++ b/agent/pkg/api/socket_data_streamer.go @@ -34,14 +34,18 @@ func (e *BasenineEntryStreamer) Get(ctx context.Context, socketId int, params *W meta := make(chan []byte) query := params.Query - err = basenine.Validate(shared.BasenineHost, shared.BaseninePort, query) - if err != nil { - entryStreamerSocketConnector.SendToastError(socketId, err) + if err = basenine.Validate(shared.BasenineHost, shared.BaseninePort, query); err != nil { + if err := entryStreamerSocketConnector.SendToastError(socketId, err); err != nil { + return err + } + + entryStreamerSocketConnector.CleanupSocket(socketId) + return err } leftOff, err := e.fetch(socketId, params, entryStreamerSocketConnector) if err != nil { - logger.Log.Errorf("Fetch error: %v", err.Error()) + logger.Log.Errorf("Fetch error: %v", err) } handleDataChannel := func(c *basenine.Connection, data chan []byte) { @@ -53,13 +57,15 @@ func (e *BasenineEntryStreamer) Get(ctx context.Context, socketId int, params *W } var entry *tapApi.Entry - err = json.Unmarshal(bytes, &entry) - if err != nil { - logger.Log.Debugf("Error unmarshalling entry: %v", err.Error()) + if err = json.Unmarshal(bytes, &entry); err != nil { + logger.Log.Debugf("Error unmarshalling entry: %v", err) continue } - entryStreamerSocketConnector.SendEntry(socketId, entry, params) + if err := entryStreamerSocketConnector.SendEntry(socketId, entry, params); err != nil { + logger.Log.Errorf("Error sending entry to socket, err: %v", err) + return + } } } @@ -72,13 +78,15 @@ func (e *BasenineEntryStreamer) Get(ctx context.Context, socketId int, params *W } var metadata *basenine.Metadata - err = json.Unmarshal(bytes, &metadata) - if err != nil { - logger.Log.Debugf("Error unmarshalling metadata: %v", err.Error()) + if err = json.Unmarshal(bytes, &metadata); err != nil { + logger.Log.Debugf("Error unmarshalling metadata: %v", err) continue } - entryStreamerSocketConnector.SendMetadata(socketId, metadata) + if err := entryStreamerSocketConnector.SendMetadata(socketId, metadata); err != nil { + logger.Log.Errorf("Error sending metadata to socket, err: %v", err) + return + } } } @@ -125,28 +133,31 @@ func (e *BasenineEntryStreamer) fetch(socketId int, params *WebSocketParams, con } var firstMetadata *basenine.Metadata - err = json.Unmarshal(firstMeta, &firstMetadata) - if err != nil { + if err = json.Unmarshal(firstMeta, &firstMetadata); err != nil { return } + leftOff = firstMetadata.LeftOff var lastMetadata *basenine.Metadata - err = json.Unmarshal(lastMeta, &lastMetadata) - if err != nil { + if err = json.Unmarshal(lastMeta, &lastMetadata); err != nil { + return + } + + if err = connector.SendMetadata(socketId, lastMetadata); err != nil { return } - connector.SendMetadata(socketId, lastMetadata) data = e.reverseBytesSlice(data) for _, row := range data { var entry *tapApi.Entry - err = json.Unmarshal(row, &entry) - if err != nil { + if err = json.Unmarshal(row, &entry); err != nil { break } - connector.SendEntry(socketId, entry, params) + if err = connector.SendEntry(socketId, entry, params); err != nil { + return + } } return }