mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-09-21 18:17:16 +00:00
228 lines
5.0 KiB
Go
228 lines
5.0 KiB
Go
package oas
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"net/url"
|
|
"sync"
|
|
|
|
basenine "github.com/up9inc/basenine/client/go"
|
|
"github.com/up9inc/mizu/agent/pkg/har"
|
|
"github.com/up9inc/mizu/shared"
|
|
"github.com/up9inc/mizu/tap/api"
|
|
|
|
"github.com/up9inc/mizu/logger"
|
|
)
|
|
|
|
var (
|
|
syncOnce sync.Once
|
|
instance *defaultOasGenerator
|
|
)
|
|
|
|
type OasGenerator interface {
|
|
Start(conn *basenine.Connection)
|
|
Stop()
|
|
IsStarted() bool
|
|
GetServiceSpecs() *sync.Map
|
|
SetEntriesQuery(query string) bool
|
|
}
|
|
|
|
type defaultOasGenerator struct {
|
|
started bool
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
serviceSpecs *sync.Map
|
|
dbConn *basenine.Connection
|
|
dbMutex sync.Mutex
|
|
entriesQuery string
|
|
}
|
|
|
|
func GetDefaultOasGeneratorInstance() *defaultOasGenerator {
|
|
syncOnce.Do(func() {
|
|
instance = NewDefaultOasGenerator()
|
|
logger.Log.Debug("OAS Generator Initialized")
|
|
})
|
|
return instance
|
|
}
|
|
|
|
func (g *defaultOasGenerator) Start(conn *basenine.Connection) {
|
|
if g.started {
|
|
return
|
|
}
|
|
|
|
if g.dbConn == nil {
|
|
if conn == nil {
|
|
logger.Log.Infof("Creating new DB connection for OAS generator to address %s:%s", shared.BasenineHost, shared.BaseninePort)
|
|
newConn, err := basenine.NewConnection(shared.BasenineHost, shared.BaseninePort)
|
|
if err != nil {
|
|
logger.Log.Error("Error connecting to DB for OAS generator, err: %v", err)
|
|
return
|
|
}
|
|
|
|
conn = newConn
|
|
}
|
|
|
|
g.dbConn = conn
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
g.cancel = cancel
|
|
g.ctx = ctx
|
|
g.serviceSpecs = &sync.Map{}
|
|
|
|
g.started = true
|
|
|
|
go g.runGenerator()
|
|
}
|
|
|
|
func (g *defaultOasGenerator) Stop() {
|
|
if !g.started {
|
|
return
|
|
}
|
|
g.started = false
|
|
|
|
g.cancel()
|
|
g.reset()
|
|
|
|
g.dbMutex.Lock()
|
|
defer g.dbMutex.Unlock()
|
|
if g.dbConn != nil {
|
|
g.dbConn.Close()
|
|
g.dbConn = nil
|
|
}
|
|
}
|
|
|
|
func (g *defaultOasGenerator) IsStarted() bool {
|
|
return g.started
|
|
}
|
|
|
|
func (g *defaultOasGenerator) runGenerator() {
|
|
// Make []byte channels to receive the data and the meta
|
|
dataChan := make(chan []byte)
|
|
metaChan := make(chan []byte)
|
|
|
|
g.dbMutex.Lock()
|
|
defer g.dbMutex.Unlock()
|
|
logger.Log.Infof("Querying DB for OAS generator with query '%s'", g.entriesQuery)
|
|
if err := g.dbConn.Query("latest", g.entriesQuery, dataChan, metaChan); err != nil {
|
|
logger.Log.Errorf("Query mode call failed: %v", err)
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-g.ctx.Done():
|
|
logger.Log.Infof("OAS Generator was canceled")
|
|
close(dataChan)
|
|
close(metaChan)
|
|
return
|
|
|
|
case metaBytes, ok := <-metaChan:
|
|
if !ok {
|
|
logger.Log.Infof("OAS Generator - meta channel closed")
|
|
break
|
|
}
|
|
logger.Log.Debugf("Meta: %s", metaBytes)
|
|
|
|
case dataBytes, ok := <-dataChan:
|
|
if !ok {
|
|
logger.Log.Infof("OAS Generator - entries channel closed")
|
|
break
|
|
}
|
|
|
|
logger.Log.Debugf("Data: %s", dataBytes)
|
|
e := new(api.Entry)
|
|
err := json.Unmarshal(dataBytes, e)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
g.handleEntry(e)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (g *defaultOasGenerator) handleEntry(mizuEntry *api.Entry) {
|
|
if mizuEntry.Protocol.Name == "http" {
|
|
dest := mizuEntry.Destination.Name
|
|
if dest == "" {
|
|
logger.Log.Debugf("OAS: Unresolved entry %d", mizuEntry.Id)
|
|
return
|
|
}
|
|
|
|
entry, err := har.NewEntry(mizuEntry.Request, mizuEntry.Response, mizuEntry.StartTime, mizuEntry.ElapsedTime)
|
|
if err != nil {
|
|
logger.Log.Warningf("Failed to turn MizuEntry %d into HAR Entry: %s", mizuEntry.Id, err)
|
|
return
|
|
}
|
|
|
|
entryWSource := &EntryWithSource{
|
|
Entry: *entry,
|
|
Source: mizuEntry.Source.Name,
|
|
Destination: dest,
|
|
Id: mizuEntry.Id,
|
|
}
|
|
|
|
g.handleHARWithSource(entryWSource)
|
|
} else {
|
|
logger.Log.Debugf("OAS: Unsupported protocol in entry %d: %s", mizuEntry.Id, mizuEntry.Protocol.Name)
|
|
}
|
|
}
|
|
|
|
func (g *defaultOasGenerator) handleHARWithSource(entryWSource *EntryWithSource) {
|
|
entry := entryWSource.Entry
|
|
gen := g.getGen(entryWSource.Destination, entry.Request.URL)
|
|
|
|
opId, err := gen.feedEntry(entryWSource)
|
|
if err != nil {
|
|
txt, suberr := json.Marshal(entry)
|
|
if suberr == nil {
|
|
logger.Log.Debugf("Problematic entry: %s", txt)
|
|
}
|
|
|
|
logger.Log.Warningf("Failed processing entry %d: %s", entryWSource.Id, err)
|
|
return
|
|
}
|
|
|
|
logger.Log.Debugf("Handled entry %s as opId: %s", entryWSource.Id, opId) // TODO: set opId back to entry?
|
|
}
|
|
|
|
func (g *defaultOasGenerator) getGen(dest string, urlStr string) *SpecGen {
|
|
u, err := url.Parse(urlStr)
|
|
if err != nil {
|
|
logger.Log.Errorf("Failed to parse entry URL: %v, err: %v", urlStr, err)
|
|
}
|
|
|
|
val, found := g.serviceSpecs.Load(dest)
|
|
var gen *SpecGen
|
|
if !found {
|
|
gen = NewGen(u.Scheme + "://" + dest)
|
|
g.serviceSpecs.Store(dest, gen)
|
|
} else {
|
|
gen = val.(*SpecGen)
|
|
}
|
|
return gen
|
|
}
|
|
|
|
func (g *defaultOasGenerator) reset() {
|
|
g.serviceSpecs = &sync.Map{}
|
|
}
|
|
|
|
func (g *defaultOasGenerator) GetServiceSpecs() *sync.Map {
|
|
return g.serviceSpecs
|
|
}
|
|
|
|
func (g *defaultOasGenerator) SetEntriesQuery(query string) bool {
|
|
changed := g.entriesQuery != query
|
|
g.entriesQuery = query
|
|
return changed
|
|
}
|
|
|
|
func NewDefaultOasGenerator() *defaultOasGenerator {
|
|
return &defaultOasGenerator{
|
|
started: false,
|
|
ctx: nil,
|
|
cancel: nil,
|
|
serviceSpecs: nil,
|
|
dbConn: nil,
|
|
}
|
|
}
|