Fetch N number of records in M milliseconds timeout before streaming the records (#1056)

* Fetch N number of records in M milliseconds timeout before streaming the records

* Implement the functionality inside socket data streamer

* Reverse the `fetchData` slice

* #run_acceptance_tests

* Trying to fix the tests.
#run_acceptance_tests

* javascript compilation error.

* #run_acceptance_tests

* Name the method better

* Upgrade Basenine version to `v0.8.0`

* Fix some issues related to `Fetch`

* Upgrade the Basenine version in `Dockerfile` as well

* Remove underscore from the parameter name

* Parameterize fetch timeout ms

Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
Co-authored-by: Roee Gadot <roee.gadot@up9.com>
This commit is contained in:
M. Mert Yıldıran
2022-05-09 09:11:27 -07:00
committed by GitHub
parent 4c0aeb8146
commit eef0ee8023
15 changed files with 105 additions and 55 deletions

View File

@@ -3,6 +3,7 @@ package api
import (
"context"
"encoding/json"
"time"
basenine "github.com/up9inc/basenine/client/go"
"github.com/up9inc/mizu/agent/pkg/dependency"
@@ -38,6 +39,11 @@ func (e *BasenineEntryStreamer) Get(ctx context.Context, socketId int, params *W
entryStreamerSocketConnector.SendToastError(socketId, err)
}
leftOff, err := e.fetch(socketId, params, entryStreamerSocketConnector)
if err != nil {
logger.Log.Errorf("Fetch error: %v", err.Error())
}
handleDataChannel := func(c *basenine.Connection, data chan []byte) {
for {
bytes := <-data
@@ -79,7 +85,7 @@ func (e *BasenineEntryStreamer) Get(ctx context.Context, socketId int, params *W
go handleDataChannel(connection, data)
go handleMetaChannel(connection, meta)
if err = connection.Query(query, data, meta); err != nil {
if err = connection.Query(leftOff, query, data, meta); err != nil {
logger.Log.Errorf("Query mode call failed: %v", err)
entryStreamerSocketConnector.CleanupSocket(socketId)
return err
@@ -94,3 +100,61 @@ func (e *BasenineEntryStreamer) Get(ctx context.Context, socketId int, params *W
return nil
}
// Reverses a []byte slice.
func (e *BasenineEntryStreamer) fetch(socketId int, params *WebSocketParams, connector EntryStreamerSocketConnector) (leftOff string, err error) {
if params.Fetch <= 0 {
leftOff = params.LeftOff
return
}
var data [][]byte
var firstMeta []byte
var lastMeta []byte
data, firstMeta, lastMeta, err = basenine.Fetch(
shared.BasenineHost,
shared.BaseninePort,
params.LeftOff,
-1,
params.Query,
params.Fetch,
time.Duration(params.TimeoutMs)*time.Millisecond,
)
if err != nil {
return
}
var firstMetadata *basenine.Metadata
err = json.Unmarshal(firstMeta, &firstMetadata)
if err != nil {
return
}
leftOff = firstMetadata.LeftOff
var lastMetadata *basenine.Metadata
err = json.Unmarshal(lastMeta, &lastMetadata)
if err != nil {
return
}
connector.SendMetadata(socketId, lastMetadata)
data = e.reverseBytesSlice(data)
for _, row := range data {
var entry *tapApi.Entry
err = json.Unmarshal(row, &entry)
if err != nil {
break
}
connector.SendEntry(socketId, entry, params)
}
return
}
// Reverses a []byte slice.
func (e *BasenineEntryStreamer) reverseBytesSlice(arr [][]byte) (newArr [][]byte) {
for i := len(arr) - 1; i >= 0; i-- {
newArr = append(newArr, arr[i])
}
return newArr
}

View File

@@ -34,8 +34,11 @@ type SocketConnection struct {
}
type WebSocketParams struct {
LeftOff string `json:"leftOff"`
Query string `json:"query"`
EnableFullEntries bool `json:"enableFullEntries"`
Fetch int `json:"fetch"`
TimeoutMs int `json:"timeoutMs"`
}
var (