mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-07-06 04:48:40 +00:00
Merge branch 'develop' into refactor_ws
# Please enter a commit message to explain why this merge is necessary, # especially if it merges an updated upstream into a topic branch. # # Lines starting with '#' will be ignored, and an empty message aborts # the commit.
This commit is contained in:
parent
5ba7423b53
commit
2d38ef95a7
57
agent/pkg/api/entry_streamer_socket_connector.go
Normal file
57
agent/pkg/api/entry_streamer_socket_connector.go
Normal file
@ -0,0 +1,57 @@
|
|||||||
|
package api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
basenine "github.com/up9inc/basenine/client/go"
|
||||||
|
"github.com/up9inc/mizu/agent/pkg/models"
|
||||||
|
"github.com/up9inc/mizu/shared/logger"
|
||||||
|
tapApi "github.com/up9inc/mizu/tap/api"
|
||||||
|
)
|
||||||
|
|
||||||
|
type EntryStreamerSocketConnector interface {
|
||||||
|
SendEntry(socketId int, entry *tapApi.Entry, params *WebSocketParams)
|
||||||
|
SendMetadata(socketId int, metadata *basenine.Metadata)
|
||||||
|
SendToastError(socketId int, err error)
|
||||||
|
CleanupSocket(socketId int)
|
||||||
|
}
|
||||||
|
|
||||||
|
type DefaultEntryStreamerSocketConnector struct{}
|
||||||
|
|
||||||
|
func (e *DefaultEntryStreamerSocketConnector) SendEntry(socketId int, entry *tapApi.Entry, params *WebSocketParams) {
|
||||||
|
var message []byte
|
||||||
|
if params.EnableFullEntries {
|
||||||
|
message, _ = models.CreateFullEntryWebSocketMessage(entry)
|
||||||
|
} else {
|
||||||
|
extension := extensionsMap[entry.Protocol.Name]
|
||||||
|
base := extension.Dissector.Summarize(entry)
|
||||||
|
message, _ = models.CreateBaseEntryWebSocketMessage(base)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := SendToSocket(socketId, message); err != nil {
|
||||||
|
logger.Log.Error(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *DefaultEntryStreamerSocketConnector) SendMetadata(socketId int, metadata *basenine.Metadata) {
|
||||||
|
metadataBytes, _ := models.CreateWebsocketQueryMetadataMessage(metadata)
|
||||||
|
if err := SendToSocket(socketId, metadataBytes); err != nil {
|
||||||
|
logger.Log.Error(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *DefaultEntryStreamerSocketConnector) SendToastError(socketId int, err error) {
|
||||||
|
toastBytes, _ := models.CreateWebsocketToastMessage(&models.ToastMessage{
|
||||||
|
Type: "error",
|
||||||
|
AutoClose: 5000,
|
||||||
|
Text: fmt.Sprintf("Syntax error: %s", err.Error()),
|
||||||
|
})
|
||||||
|
if err := SendToSocket(socketId, toastBytes); err != nil {
|
||||||
|
logger.Log.Error(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *DefaultEntryStreamerSocketConnector) CleanupSocket(socketId int) {
|
||||||
|
socketObj := connectedWebsockets[socketId]
|
||||||
|
socketCleanup(socketId, socketObj)
|
||||||
|
}
|
@ -3,11 +3,9 @@ package api
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
|
||||||
|
|
||||||
basenine "github.com/up9inc/basenine/client/go"
|
basenine "github.com/up9inc/basenine/client/go"
|
||||||
"github.com/up9inc/mizu/agent/pkg/dependency"
|
"github.com/up9inc/mizu/agent/pkg/dependency"
|
||||||
"github.com/up9inc/mizu/agent/pkg/models"
|
|
||||||
"github.com/up9inc/mizu/shared"
|
"github.com/up9inc/mizu/shared"
|
||||||
"github.com/up9inc/mizu/shared/logger"
|
"github.com/up9inc/mizu/shared/logger"
|
||||||
tapApi "github.com/up9inc/mizu/tap/api"
|
tapApi "github.com/up9inc/mizu/tap/api"
|
||||||
@ -17,53 +15,6 @@ type EntryStreamer interface {
|
|||||||
Get(ctx context.Context, socketId int, params *WebSocketParams) error
|
Get(ctx context.Context, socketId int, params *WebSocketParams) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type EntryStreamerSocketConnector interface {
|
|
||||||
SendEntry(socketId int, entry *tapApi.Entry, params *WebSocketParams)
|
|
||||||
SendMetadata(socketId int, metadata *basenine.Metadata)
|
|
||||||
SendToastError(socketId int, err error)
|
|
||||||
CleanupSocket(socketId int)
|
|
||||||
}
|
|
||||||
|
|
||||||
type DefaultEntryStreamerSocketConnector struct{}
|
|
||||||
|
|
||||||
func (e *DefaultEntryStreamerSocketConnector) SendEntry(socketId int, entry *tapApi.Entry, params *WebSocketParams) {
|
|
||||||
var message []byte
|
|
||||||
if params.EnableFullEntries {
|
|
||||||
message, _ = models.CreateFullEntryWebSocketMessage(entry)
|
|
||||||
} else {
|
|
||||||
extension := extensionsMap[entry.Protocol.Name]
|
|
||||||
base := extension.Dissector.Summarize(entry)
|
|
||||||
message, _ = models.CreateBaseEntryWebSocketMessage(base)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := SendToSocket(socketId, message); err != nil {
|
|
||||||
logger.Log.Error(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *DefaultEntryStreamerSocketConnector) SendMetadata(socketId int, metadata *basenine.Metadata) {
|
|
||||||
metadataBytes, _ := models.CreateWebsocketQueryMetadataMessage(metadata)
|
|
||||||
if err := SendToSocket(socketId, metadataBytes); err != nil {
|
|
||||||
logger.Log.Error(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *DefaultEntryStreamerSocketConnector) SendToastError(socketId int, err error) {
|
|
||||||
toastBytes, _ := models.CreateWebsocketToastMessage(&models.ToastMessage{
|
|
||||||
Type: "error",
|
|
||||||
AutoClose: 5000,
|
|
||||||
Text: fmt.Sprintf("Syntax error: %s", err.Error()),
|
|
||||||
})
|
|
||||||
if err := SendToSocket(socketId, toastBytes); err != nil {
|
|
||||||
logger.Log.Error(err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *DefaultEntryStreamerSocketConnector) CleanupSocket(socketId int) {
|
|
||||||
socketObj := connectedWebsockets[socketId]
|
|
||||||
socketCleanup(socketId, socketObj)
|
|
||||||
}
|
|
||||||
|
|
||||||
type BasenineEntryStreamer struct{}
|
type BasenineEntryStreamer struct{}
|
||||||
|
|
||||||
func (e *BasenineEntryStreamer) Get(ctx context.Context, socketId int, params *WebSocketParams) error {
|
func (e *BasenineEntryStreamer) Get(ctx context.Context, socketId int, params *WebSocketParams) error {
|
||||||
|
Loading…
Reference in New Issue
Block a user