mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-08-18 08:29:36 +00:00
* Fetch N number of records in M milliseconds timeout before streaming the records * Implement the functionality inside socket data streamer * Reverse the `fetchData` slice * #run_acceptance_tests * Trying to fix the tests. #run_acceptance_tests * javascript compilation error. * #run_acceptance_tests * Name the method better * Upgrade Basenine version to `v0.8.0` * Fix some issues related to `Fetch` * Upgrade the Basenine version in `Dockerfile` as well * Remove underscore from the parameter name * Parameterize fetch timeout ms Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com> Co-authored-by: Roee Gadot <roee.gadot@up9.com>
161 lines
3.7 KiB
Go
161 lines
3.7 KiB
Go
package api
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"time"
|
|
|
|
basenine "github.com/up9inc/basenine/client/go"
|
|
"github.com/up9inc/mizu/agent/pkg/dependency"
|
|
"github.com/up9inc/mizu/logger"
|
|
"github.com/up9inc/mizu/shared"
|
|
tapApi "github.com/up9inc/mizu/tap/api"
|
|
)
|
|
|
|
type EntryStreamer interface {
|
|
Get(ctx context.Context, socketId int, params *WebSocketParams) error
|
|
}
|
|
|
|
type BasenineEntryStreamer struct{}
|
|
|
|
func (e *BasenineEntryStreamer) Get(ctx context.Context, socketId int, params *WebSocketParams) error {
|
|
var connection *basenine.Connection
|
|
|
|
entryStreamerSocketConnector := dependency.GetInstance(dependency.EntryStreamerSocketConnector).(EntryStreamerSocketConnector)
|
|
|
|
connection, err := basenine.NewConnection(shared.BasenineHost, shared.BaseninePort)
|
|
if err != nil {
|
|
logger.Log.Errorf("Failed to establish a connection to Basenine: %v", err)
|
|
entryStreamerSocketConnector.CleanupSocket(socketId)
|
|
return err
|
|
}
|
|
|
|
data := make(chan []byte)
|
|
meta := make(chan []byte)
|
|
|
|
query := params.Query
|
|
err = basenine.Validate(shared.BasenineHost, shared.BaseninePort, query)
|
|
if err != nil {
|
|
entryStreamerSocketConnector.SendToastError(socketId, err)
|
|
}
|
|
|
|
leftOff, err := e.fetch(socketId, params, entryStreamerSocketConnector)
|
|
if err != nil {
|
|
logger.Log.Errorf("Fetch error: %v", err.Error())
|
|
}
|
|
|
|
handleDataChannel := func(c *basenine.Connection, data chan []byte) {
|
|
for {
|
|
bytes := <-data
|
|
|
|
if string(bytes) == basenine.CloseChannel {
|
|
return
|
|
}
|
|
|
|
var entry *tapApi.Entry
|
|
err = json.Unmarshal(bytes, &entry)
|
|
if err != nil {
|
|
logger.Log.Debugf("Error unmarshalling entry: %v", err.Error())
|
|
continue
|
|
}
|
|
|
|
entryStreamerSocketConnector.SendEntry(socketId, entry, params)
|
|
}
|
|
}
|
|
|
|
handleMetaChannel := func(c *basenine.Connection, meta chan []byte) {
|
|
for {
|
|
bytes := <-meta
|
|
|
|
if string(bytes) == basenine.CloseChannel {
|
|
return
|
|
}
|
|
|
|
var metadata *basenine.Metadata
|
|
err = json.Unmarshal(bytes, &metadata)
|
|
if err != nil {
|
|
logger.Log.Debugf("Error unmarshalling metadata: %v", err.Error())
|
|
continue
|
|
}
|
|
|
|
entryStreamerSocketConnector.SendMetadata(socketId, metadata)
|
|
}
|
|
}
|
|
|
|
go handleDataChannel(connection, data)
|
|
go handleMetaChannel(connection, meta)
|
|
|
|
if err = connection.Query(leftOff, query, data, meta); err != nil {
|
|
logger.Log.Errorf("Query mode call failed: %v", err)
|
|
entryStreamerSocketConnector.CleanupSocket(socketId)
|
|
return err
|
|
}
|
|
|
|
go func() {
|
|
<-ctx.Done()
|
|
data <- []byte(basenine.CloseChannel)
|
|
meta <- []byte(basenine.CloseChannel)
|
|
connection.Close()
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
// Reverses a []byte slice.
|
|
func (e *BasenineEntryStreamer) fetch(socketId int, params *WebSocketParams, connector EntryStreamerSocketConnector) (leftOff string, err error) {
|
|
if params.Fetch <= 0 {
|
|
leftOff = params.LeftOff
|
|
return
|
|
}
|
|
|
|
var data [][]byte
|
|
var firstMeta []byte
|
|
var lastMeta []byte
|
|
data, firstMeta, lastMeta, err = basenine.Fetch(
|
|
shared.BasenineHost,
|
|
shared.BaseninePort,
|
|
params.LeftOff,
|
|
-1,
|
|
params.Query,
|
|
params.Fetch,
|
|
time.Duration(params.TimeoutMs)*time.Millisecond,
|
|
)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
var firstMetadata *basenine.Metadata
|
|
err = json.Unmarshal(firstMeta, &firstMetadata)
|
|
if err != nil {
|
|
return
|
|
}
|
|
leftOff = firstMetadata.LeftOff
|
|
|
|
var lastMetadata *basenine.Metadata
|
|
err = json.Unmarshal(lastMeta, &lastMetadata)
|
|
if err != nil {
|
|
return
|
|
}
|
|
connector.SendMetadata(socketId, lastMetadata)
|
|
|
|
data = e.reverseBytesSlice(data)
|
|
for _, row := range data {
|
|
var entry *tapApi.Entry
|
|
err = json.Unmarshal(row, &entry)
|
|
if err != nil {
|
|
break
|
|
}
|
|
|
|
connector.SendEntry(socketId, entry, params)
|
|
}
|
|
return
|
|
}
|
|
|
|
// Reverses a []byte slice.
|
|
func (e *BasenineEntryStreamer) reverseBytesSlice(arr [][]byte) (newArr [][]byte) {
|
|
for i := len(arr) - 1; i >= 0; i-- {
|
|
newArr = append(newArr, arr[i])
|
|
}
|
|
return newArr
|
|
}
|