Added send to socket error validation (#1085)

This commit is contained in:
RoyUP9 2022-05-15 14:44:48 +03:00 committed by GitHub
parent 366d34b8d0
commit 8ea2dabb34
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 46 additions and 30 deletions

View File

@ -5,20 +5,19 @@ import (
basenine "github.com/up9inc/basenine/client/go" basenine "github.com/up9inc/basenine/client/go"
"github.com/up9inc/mizu/agent/pkg/models" "github.com/up9inc/mizu/agent/pkg/models"
"github.com/up9inc/mizu/logger"
tapApi "github.com/up9inc/mizu/tap/api" tapApi "github.com/up9inc/mizu/tap/api"
) )
type EntryStreamerSocketConnector interface { type EntryStreamerSocketConnector interface {
SendEntry(socketId int, entry *tapApi.Entry, params *WebSocketParams) SendEntry(socketId int, entry *tapApi.Entry, params *WebSocketParams) error
SendMetadata(socketId int, metadata *basenine.Metadata) SendMetadata(socketId int, metadata *basenine.Metadata) error
SendToastError(socketId int, err error) SendToastError(socketId int, err error) error
CleanupSocket(socketId int) CleanupSocket(socketId int)
} }
type DefaultEntryStreamerSocketConnector struct{} type DefaultEntryStreamerSocketConnector struct{}
func (e *DefaultEntryStreamerSocketConnector) SendEntry(socketId int, entry *tapApi.Entry, params *WebSocketParams) { func (e *DefaultEntryStreamerSocketConnector) SendEntry(socketId int, entry *tapApi.Entry, params *WebSocketParams) error {
var message []byte var message []byte
if params.EnableFullEntries { if params.EnableFullEntries {
message, _ = models.CreateFullEntryWebSocketMessage(entry) message, _ = models.CreateFullEntryWebSocketMessage(entry)
@ -29,26 +28,32 @@ func (e *DefaultEntryStreamerSocketConnector) SendEntry(socketId int, entry *tap
} }
if err := SendToSocket(socketId, message); err != nil { if err := SendToSocket(socketId, message); err != nil {
logger.Log.Error(err) return err
} }
return nil
} }
func (e *DefaultEntryStreamerSocketConnector) SendMetadata(socketId int, metadata *basenine.Metadata) { func (e *DefaultEntryStreamerSocketConnector) SendMetadata(socketId int, metadata *basenine.Metadata) error {
metadataBytes, _ := models.CreateWebsocketQueryMetadataMessage(metadata) metadataBytes, _ := models.CreateWebsocketQueryMetadataMessage(metadata)
if err := SendToSocket(socketId, metadataBytes); err != nil { if err := SendToSocket(socketId, metadataBytes); err != nil {
logger.Log.Error(err) return err
} }
return nil
} }
func (e *DefaultEntryStreamerSocketConnector) SendToastError(socketId int, err error) { func (e *DefaultEntryStreamerSocketConnector) SendToastError(socketId int, err error) error {
toastBytes, _ := models.CreateWebsocketToastMessage(&models.ToastMessage{ toastBytes, _ := models.CreateWebsocketToastMessage(&models.ToastMessage{
Type: "error", Type: "error",
AutoClose: 5000, AutoClose: 5000,
Text: fmt.Sprintf("Syntax error: %s", err.Error()), Text: fmt.Sprintf("Syntax error: %s", err.Error()),
}) })
if err := SendToSocket(socketId, toastBytes); err != nil { if err := SendToSocket(socketId, toastBytes); err != nil {
logger.Log.Error(err) return err
} }
return nil
} }
func (e *DefaultEntryStreamerSocketConnector) CleanupSocket(socketId int) { func (e *DefaultEntryStreamerSocketConnector) CleanupSocket(socketId int) {

View File

@ -34,14 +34,18 @@ func (e *BasenineEntryStreamer) Get(ctx context.Context, socketId int, params *W
meta := make(chan []byte) meta := make(chan []byte)
query := params.Query query := params.Query
err = basenine.Validate(shared.BasenineHost, shared.BaseninePort, query) if err = basenine.Validate(shared.BasenineHost, shared.BaseninePort, query); err != nil {
if err != nil { if err := entryStreamerSocketConnector.SendToastError(socketId, err); err != nil {
entryStreamerSocketConnector.SendToastError(socketId, err) return err
}
entryStreamerSocketConnector.CleanupSocket(socketId)
return err
} }
leftOff, err := e.fetch(socketId, params, entryStreamerSocketConnector) leftOff, err := e.fetch(socketId, params, entryStreamerSocketConnector)
if err != nil { if err != nil {
logger.Log.Errorf("Fetch error: %v", err.Error()) logger.Log.Errorf("Fetch error: %v", err)
} }
handleDataChannel := func(c *basenine.Connection, data chan []byte) { handleDataChannel := func(c *basenine.Connection, data chan []byte) {
@ -53,13 +57,15 @@ func (e *BasenineEntryStreamer) Get(ctx context.Context, socketId int, params *W
} }
var entry *tapApi.Entry var entry *tapApi.Entry
err = json.Unmarshal(bytes, &entry) if err = json.Unmarshal(bytes, &entry); err != nil {
if err != nil { logger.Log.Debugf("Error unmarshalling entry: %v", err)
logger.Log.Debugf("Error unmarshalling entry: %v", err.Error())
continue continue
} }
entryStreamerSocketConnector.SendEntry(socketId, entry, params) if err := entryStreamerSocketConnector.SendEntry(socketId, entry, params); err != nil {
logger.Log.Errorf("Error sending entry to socket, err: %v", err)
return
}
} }
} }
@ -72,13 +78,15 @@ func (e *BasenineEntryStreamer) Get(ctx context.Context, socketId int, params *W
} }
var metadata *basenine.Metadata var metadata *basenine.Metadata
err = json.Unmarshal(bytes, &metadata) if err = json.Unmarshal(bytes, &metadata); err != nil {
if err != nil { logger.Log.Debugf("Error unmarshalling metadata: %v", err)
logger.Log.Debugf("Error unmarshalling metadata: %v", err.Error())
continue continue
} }
entryStreamerSocketConnector.SendMetadata(socketId, metadata) if err := entryStreamerSocketConnector.SendMetadata(socketId, metadata); err != nil {
logger.Log.Errorf("Error sending metadata to socket, err: %v", err)
return
}
} }
} }
@ -125,28 +133,31 @@ func (e *BasenineEntryStreamer) fetch(socketId int, params *WebSocketParams, con
} }
var firstMetadata *basenine.Metadata var firstMetadata *basenine.Metadata
err = json.Unmarshal(firstMeta, &firstMetadata) if err = json.Unmarshal(firstMeta, &firstMetadata); err != nil {
if err != nil {
return return
} }
leftOff = firstMetadata.LeftOff leftOff = firstMetadata.LeftOff
var lastMetadata *basenine.Metadata var lastMetadata *basenine.Metadata
err = json.Unmarshal(lastMeta, &lastMetadata) if err = json.Unmarshal(lastMeta, &lastMetadata); err != nil {
if err != nil { return
}
if err = connector.SendMetadata(socketId, lastMetadata); err != nil {
return return
} }
connector.SendMetadata(socketId, lastMetadata)
data = e.reverseBytesSlice(data) data = e.reverseBytesSlice(data)
for _, row := range data { for _, row := range data {
var entry *tapApi.Entry var entry *tapApi.Entry
err = json.Unmarshal(row, &entry) if err = json.Unmarshal(row, &entry); err != nil {
if err != nil {
break break
} }
connector.SendEntry(socketId, entry, params) if err = connector.SendEntry(socketId, entry, params); err != nil {
return
}
} }
return return
} }