mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-08-25 03:49:12 +00:00
108 lines
2.2 KiB
Go
108 lines
2.2 KiB
Go
package oas
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"github.com/google/martian/har"
|
|
"github.com/up9inc/mizu/shared/logger"
|
|
"net/url"
|
|
"sync"
|
|
)
|
|
|
|
var (
|
|
syncOnce sync.Once
|
|
instance *oasGenerator
|
|
)
|
|
|
|
func GetOasGeneratorInstance() *oasGenerator {
|
|
syncOnce.Do(func() {
|
|
instance = newOasGenerator()
|
|
logger.Log.Debug("Oas Generator Initialized")
|
|
})
|
|
return instance
|
|
}
|
|
|
|
func (g *oasGenerator) Start() {
|
|
if g.started {
|
|
return
|
|
}
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
g.cancel = cancel
|
|
g.ctx = ctx
|
|
g.entriesChan = make(chan har.Entry, 100) // buffer up to 100 entries for OAS processing
|
|
g.ServiceSpecs = &sync.Map{}
|
|
g.started = true
|
|
go instance.runGeneretor()
|
|
}
|
|
|
|
func (g *oasGenerator) runGeneretor() {
|
|
for {
|
|
select {
|
|
case <-g.ctx.Done():
|
|
logger.Log.Infof("OAS Generator was canceled")
|
|
return
|
|
|
|
case entry, ok := <-g.entriesChan:
|
|
if !ok {
|
|
logger.Log.Infof("OAS Generator - entries channel closed")
|
|
break
|
|
}
|
|
u, err := url.Parse(entry.Request.URL)
|
|
if err != nil {
|
|
logger.Log.Errorf("Failed to parse entry URL: %v, err: %v", entry.Request.URL, err)
|
|
}
|
|
|
|
val, found := g.ServiceSpecs.Load(u.Host)
|
|
var gen *SpecGen
|
|
if !found {
|
|
gen = NewGen(u.Scheme + "://" + u.Host)
|
|
g.ServiceSpecs.Store(u.Host, gen)
|
|
} else {
|
|
gen = val.(*SpecGen)
|
|
}
|
|
|
|
opId, err := gen.feedEntry(entry)
|
|
if err != nil {
|
|
txt, suberr := json.Marshal(entry)
|
|
if suberr == nil {
|
|
logger.Log.Debugf("Problematic entry: %s", txt)
|
|
}
|
|
|
|
logger.Log.Warningf("Failed processing entry: %s", err)
|
|
continue
|
|
}
|
|
|
|
logger.Log.Debugf("Handled entry %s as opId: %s", entry.Request.URL, opId) // TODO: set opId back to entry?
|
|
}
|
|
}
|
|
}
|
|
|
|
func (g *oasGenerator) PushEntry(entry *har.Entry) {
|
|
if !g.started {
|
|
return
|
|
}
|
|
select {
|
|
case g.entriesChan <- *entry:
|
|
default:
|
|
logger.Log.Warningf("OAS Generator - entry wasn't sent to channel because the channel has no buffer or there is no receiver")
|
|
}
|
|
}
|
|
|
|
func newOasGenerator() *oasGenerator {
|
|
return &oasGenerator{
|
|
started: false,
|
|
ctx: nil,
|
|
cancel: nil,
|
|
ServiceSpecs: nil,
|
|
entriesChan: nil,
|
|
}
|
|
}
|
|
|
|
type oasGenerator struct {
|
|
started bool
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
ServiceSpecs *sync.Map
|
|
entriesChan chan har.Entry
|
|
}
|