kubeshark/agent/pkg/entries/entries_provider.go
M. Mert Yıldıran eef0ee8023
Fetch N number of records in M milliseconds timeout before streaming the records (#1056)
* Fetch N number of records in M milliseconds timeout before streaming the records

* Implement the functionality inside socket data streamer

* Reverse the `fetchData` slice

* #run_acceptance_tests

* Trying to fix the tests.
#run_acceptance_tests

* javascript compilation error.

* #run_acceptance_tests

* Name the method better

* Upgrade Basenine version to `v0.8.0`

* Fix some issues related to `Fetch`

* Upgrade the Basenine version in `Dockerfile` as well

* Remove underscore from the parameter name

* Parameterize fetch timeout ms

Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
Co-authored-by: Roee Gadot <roee.gadot@up9.com>
2022-05-09 19:11:27 +03:00

100 lines
2.9 KiB
Go

package entries
import (
"encoding/json"
"errors"
"time"
basenine "github.com/up9inc/basenine/client/go"
"github.com/up9inc/mizu/agent/pkg/app"
"github.com/up9inc/mizu/agent/pkg/har"
"github.com/up9inc/mizu/agent/pkg/models"
"github.com/up9inc/mizu/logger"
"github.com/up9inc/mizu/shared"
tapApi "github.com/up9inc/mizu/tap/api"
)
// EntriesProvider abstracts the storage backend that captured traffic entries
// are read from.
type EntriesProvider interface {
// GetEntries returns the entries selected by entriesRequest, wrapped for
// the API, together with the metadata reported by the backend.
GetEntries(entriesRequest *models.EntriesRequest) ([]*tapApi.EntryWrapper, *basenine.Metadata, error)
// GetEntry returns the single entry identified by entryId, wrapped for the
// API. singleEntryRequest carries the query that accompanies the lookup.
GetEntry(singleEntryRequest *models.SingleEntryRequest, entryId string) (*tapApi.EntryWrapper, error)
}
// BasenineEntriesProvider reads entries through the Basenine Go client,
// connecting to shared.BasenineHost / shared.BaseninePort. It carries no state.
type BasenineEntriesProvider struct{}
// GetEntries fetches a batch of entries from Basenine and wraps each one with
// the summary produced by the dissector registered for its protocol.
//
// LeftOff, Direction, Query and Limit of entriesRequest are forwarded to
// basenine.Fetch unchanged; TimeoutMs is converted from milliseconds to a
// time.Duration.
//
// It returns the wrapped entries (a nil slice when Basenine returned no rows),
// the metadata decoded from the metadata payload returned by the fetch (nil
// when that payload cannot be decoded), and a non-nil error when the fetch
// fails, a row cannot be decoded, a row decodes to a null entry, or a row
// names a protocol that has no registered extension.
func (e *BasenineEntriesProvider) GetEntries(entriesRequest *models.EntriesRequest) ([]*tapApi.EntryWrapper, *basenine.Metadata, error) {
	timeout := time.Duration(entriesRequest.TimeoutMs) * time.Millisecond
	data, _, lastMeta, err := basenine.Fetch(shared.BasenineHost, shared.BaseninePort,
		entriesRequest.LeftOff, entriesRequest.Direction, entriesRequest.Query,
		entriesRequest.Limit, timeout)
	if err != nil {
		return nil, nil, err
	}

	// Deliberately left nil (not pre-sized) so an empty result keeps its
	// original JSON encoding.
	var dataSlice []*tapApi.EntryWrapper
	for _, row := range data {
		var entry *tapApi.Entry
		if err := json.Unmarshal(row, &entry); err != nil {
			return nil, nil, err
		}
		// A JSON `null` row unmarshals without error but leaves entry nil;
		// dereferencing it below would panic.
		if entry == nil {
			return nil, nil, errors.New("basenine returned an empty entry")
		}

		// A missing map key would otherwise yield a zero-value extension and
		// crash on the Dissector call.
		extension, ok := app.ExtensionsMap[entry.Protocol.Name]
		if !ok {
			return nil, nil, errors.New("no extension registered for protocol: " + entry.Protocol.Name)
		}

		dataSlice = append(dataSlice, &tapApi.EntryWrapper{
			Protocol: entry.Protocol,
			Data:     entry,
			Base:     extension.Dissector.Summarize(entry),
		})
	}

	// Metadata is best effort: a decode failure is only logged and the entries
	// are still returned.
	var metadata *basenine.Metadata
	if err := json.Unmarshal(lastMeta, &metadata); err != nil {
		logger.Log.Debugf("Error recieving metadata: %v", err.Error())
	}

	return dataSlice, metadata, nil
}
// GetEntry fetches the entry identified by entryId from Basenine and wraps it
// with its summary, its representation and, for HTTP entries, the outcome of
// the validation rules.
//
// singleEntryRequest.Query is forwarded to basenine.Single together with
// entryId.
//
// It returns a non-nil error when the lookup fails, when the payload is not a
// decodable entry (the raw payload is then used as the error text), when the
// payload decodes to a null entry, when the entry's protocol has no registered
// extension, or when the dissector cannot build the representation.
// Failures while evaluating validation rules are logged and do not fail the
// call; the entry is then returned without rules.
func (e *BasenineEntriesProvider) GetEntry(singleEntryRequest *models.SingleEntryRequest, entryId string) (*tapApi.EntryWrapper, error) {
	raw, err := basenine.Single(shared.BasenineHost, shared.BaseninePort, entryId, singleEntryRequest.Query)
	if err != nil {
		return nil, err
	}

	var entry *tapApi.Entry
	if err := json.Unmarshal(raw, &entry); err != nil {
		// The payload is not a JSON entry; surface its text as the error.
		return nil, errors.New(string(raw))
	}
	// A JSON `null` payload unmarshals without error but leaves entry nil.
	if entry == nil {
		return nil, errors.New("basenine returned an empty entry for id: " + entryId)
	}

	// Guard the lookup: a missing key would otherwise crash on the Dissector
	// calls below.
	extension, ok := app.ExtensionsMap[entry.Protocol.Name]
	if !ok {
		return nil, errors.New("no extension registered for protocol: " + entry.Protocol.Name)
	}

	base := extension.Dissector.Summarize(entry)
	representation, err := extension.Dissector.Represent(entry.Request, entry.Response)
	if err != nil {
		return nil, err
	}

	var rules []map[string]interface{}
	var isRulesEnabled bool
	if entry.Protocol.Name == "http" {
		harEntry, harErr := har.NewEntry(entry.Request, entry.Response, entry.StartTime, entry.ElapsedTime)
		switch {
		case harErr != nil:
			// Rules are best effort: without a HAR entry there is nothing to
			// validate, and dereferencing harEntry would panic.
			logger.Log.Error(harErr)
		case harEntry != nil:
			_, rulesMatched, rulesEnabled := models.RunValidationRulesState(*harEntry, entry.Destination.Name)
			isRulesEnabled = rulesEnabled

			// Round-trip through JSON to turn the matched rules into generic
			// maps for the wrapper.
			encoded, marshalErr := json.Marshal(rulesMatched)
			if marshalErr != nil {
				logger.Log.Error(marshalErr)
			} else if err := json.Unmarshal(encoded, &rules); err != nil {
				logger.Log.Error(err)
			}
		}
	}

	return &tapApi.EntryWrapper{
		Protocol:       entry.Protocol,
		Representation: string(representation),
		Data:           entry,
		Base:           base,
		Rules:          rules,
		IsRulesEnabled: isRulesEnabled,
	}, nil
}