mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-06-20 13:33:48 +00:00
172 lines
4.1 KiB
Go
172 lines
4.1 KiB
Go
package api
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"time"
|
|
|
|
basenine "github.com/up9inc/basenine/client/go"
|
|
"github.com/up9inc/mizu/agent/pkg/dependency"
|
|
"github.com/up9inc/mizu/logger"
|
|
"github.com/up9inc/mizu/shared"
|
|
tapApi "github.com/up9inc/mizu/tap/api"
|
|
)
|
|
|
|
type EntryStreamer interface {
|
|
Get(ctx context.Context, socketId int, params *WebSocketParams) error
|
|
}
|
|
|
|
type BasenineEntryStreamer struct{}
|
|
|
|
func (e *BasenineEntryStreamer) Get(ctx context.Context, socketId int, params *WebSocketParams) error {
|
|
var connection *basenine.Connection
|
|
|
|
entryStreamerSocketConnector := dependency.GetInstance(dependency.EntryStreamerSocketConnector).(EntryStreamerSocketConnector)
|
|
|
|
connection, err := basenine.NewConnection(shared.BasenineHost, shared.BaseninePort)
|
|
if err != nil {
|
|
logger.Log.Errorf("Failed to establish a connection to Basenine: %v", err)
|
|
entryStreamerSocketConnector.CleanupSocket(socketId)
|
|
return err
|
|
}
|
|
|
|
data := make(chan []byte)
|
|
meta := make(chan []byte)
|
|
|
|
query := params.Query
|
|
if err = basenine.Validate(shared.BasenineHost, shared.BaseninePort, query); err != nil {
|
|
if err := entryStreamerSocketConnector.SendToastError(socketId, err); err != nil {
|
|
return err
|
|
}
|
|
|
|
entryStreamerSocketConnector.CleanupSocket(socketId)
|
|
return err
|
|
}
|
|
|
|
leftOff, err := e.fetch(socketId, params, entryStreamerSocketConnector)
|
|
if err != nil {
|
|
logger.Log.Errorf("Fetch error: %v", err)
|
|
}
|
|
|
|
handleDataChannel := func(c *basenine.Connection, data chan []byte) {
|
|
for {
|
|
bytes := <-data
|
|
|
|
if string(bytes) == basenine.CloseChannel {
|
|
return
|
|
}
|
|
|
|
var entry *tapApi.Entry
|
|
if err = json.Unmarshal(bytes, &entry); err != nil {
|
|
logger.Log.Debugf("Error unmarshalling entry: %v", err)
|
|
continue
|
|
}
|
|
|
|
if err := entryStreamerSocketConnector.SendEntry(socketId, entry, params); err != nil {
|
|
logger.Log.Errorf("Error sending entry to socket, err: %v", err)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
handleMetaChannel := func(c *basenine.Connection, meta chan []byte) {
|
|
for {
|
|
bytes := <-meta
|
|
|
|
if string(bytes) == basenine.CloseChannel {
|
|
return
|
|
}
|
|
|
|
var metadata *basenine.Metadata
|
|
if err = json.Unmarshal(bytes, &metadata); err != nil {
|
|
logger.Log.Debugf("Error unmarshalling metadata: %v", err)
|
|
continue
|
|
}
|
|
|
|
if err := entryStreamerSocketConnector.SendMetadata(socketId, metadata); err != nil {
|
|
logger.Log.Errorf("Error sending metadata to socket, err: %v", err)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
go handleDataChannel(connection, data)
|
|
go handleMetaChannel(connection, meta)
|
|
|
|
if err = connection.Query(leftOff, query, data, meta); err != nil {
|
|
logger.Log.Errorf("Query mode call failed: %v", err)
|
|
entryStreamerSocketConnector.CleanupSocket(socketId)
|
|
return err
|
|
}
|
|
|
|
go func() {
|
|
<-ctx.Done()
|
|
data <- []byte(basenine.CloseChannel)
|
|
meta <- []byte(basenine.CloseChannel)
|
|
connection.Close()
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
// Reverses a []byte slice.
|
|
func (e *BasenineEntryStreamer) fetch(socketId int, params *WebSocketParams, connector EntryStreamerSocketConnector) (leftOff string, err error) {
|
|
if params.Fetch <= 0 {
|
|
leftOff = params.LeftOff
|
|
return
|
|
}
|
|
|
|
var data [][]byte
|
|
var firstMeta []byte
|
|
var lastMeta []byte
|
|
data, firstMeta, lastMeta, err = basenine.Fetch(
|
|
shared.BasenineHost,
|
|
shared.BaseninePort,
|
|
params.LeftOff,
|
|
-1,
|
|
params.Query,
|
|
params.Fetch,
|
|
time.Duration(params.TimeoutMs)*time.Millisecond,
|
|
)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
var firstMetadata *basenine.Metadata
|
|
if err = json.Unmarshal(firstMeta, &firstMetadata); err != nil {
|
|
return
|
|
}
|
|
|
|
leftOff = firstMetadata.LeftOff
|
|
|
|
var lastMetadata *basenine.Metadata
|
|
if err = json.Unmarshal(lastMeta, &lastMetadata); err != nil {
|
|
return
|
|
}
|
|
|
|
if err = connector.SendMetadata(socketId, lastMetadata); err != nil {
|
|
return
|
|
}
|
|
|
|
data = e.reverseBytesSlice(data)
|
|
for _, row := range data {
|
|
var entry *tapApi.Entry
|
|
if err = json.Unmarshal(row, &entry); err != nil {
|
|
break
|
|
}
|
|
|
|
if err = connector.SendEntry(socketId, entry, params); err != nil {
|
|
return
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
// Reverses a []byte slice.
|
|
func (e *BasenineEntryStreamer) reverseBytesSlice(arr [][]byte) (newArr [][]byte) {
|
|
for i := len(arr) - 1; i >= 0; i-- {
|
|
newArr = append(newArr, arr[i])
|
|
}
|
|
return newArr
|
|
}
|