Fix for OAS reset not working (#978)

This commit is contained in:
RoyUP9 2022-04-07 18:14:03 +03:00 committed by GitHub
parent f344bd2633
commit d8fb8ff710
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 46 additions and 33 deletions

View File

@ -199,7 +199,7 @@ func runInHarReaderMode() {
func enableExpFeatureIfNeeded() { func enableExpFeatureIfNeeded() {
if config.Config.OAS { if config.Config.OAS {
oasGenerator := dependency.GetInstance(dependency.OasGeneratorDependency).(oas.OasGenerator) oasGenerator := dependency.GetInstance(dependency.OasGeneratorDependency).(oas.OasGenerator)
oasGenerator.Start() oasGenerator.Start(nil)
} }
if config.Config.ServiceMap { if config.Config.ServiceMap {
serviceMapGenerator := dependency.GetInstance(dependency.ServiceMapGeneratorDependency).(servicemap.ServiceMap) serviceMapGenerator := dependency.GetInstance(dependency.ServiceMapGeneratorDependency).(servicemap.ServiceMap)
@ -371,7 +371,7 @@ func handleIncomingMessageAsTapper(socketConnection *websocket.Conn) {
func initializeDependencies() { func initializeDependencies() {
dependency.RegisterGenerator(dependency.ServiceMapGeneratorDependency, func() interface{} { return servicemap.GetDefaultServiceMapInstance() }) dependency.RegisterGenerator(dependency.ServiceMapGeneratorDependency, func() interface{} { return servicemap.GetDefaultServiceMapInstance() })
dependency.RegisterGenerator(dependency.OasGeneratorDependency, func() interface{} { return oas.GetDefaultOasGeneratorInstance(nil) }) dependency.RegisterGenerator(dependency.OasGeneratorDependency, func() interface{} { return oas.GetDefaultOasGeneratorInstance() })
dependency.RegisterGenerator(dependency.EntriesProvider, func() interface{} { return &entries.BasenineEntriesProvider{} }) dependency.RegisterGenerator(dependency.EntriesProvider, func() interface{} { return &entries.BasenineEntriesProvider{} })
dependency.RegisterGenerator(dependency.EntriesSocketStreamer, func() interface{} { return &api.BasenineEntryStreamer{} }) dependency.RegisterGenerator(dependency.EntriesSocketStreamer, func() interface{} { return &api.BasenineEntryStreamer{} })
dependency.RegisterGenerator(dependency.EntryStreamerSocketConnector, func() interface{} { return &api.DefaultEntryStreamerSocketConnector{} }) dependency.RegisterGenerator(dependency.EntryStreamerSocketConnector, func() interface{} { return &api.DefaultEntryStreamerSocketConnector{} })

View File

@ -58,12 +58,12 @@ func getRecorderAndContext() (*httptest.ResponseRecorder, *gin.Context) {
receiveBuffer: bytes.NewBufferString("\n"), receiveBuffer: bytes.NewBufferString("\n"),
} }
dependency.RegisterGenerator(dependency.OasGeneratorDependency, func() interface{} { dependency.RegisterGenerator(dependency.OasGeneratorDependency, func() interface{} {
return oas.GetDefaultOasGeneratorInstance(dummyConn) return oas.GetDefaultOasGeneratorInstance()
}) })
recorder := httptest.NewRecorder() recorder := httptest.NewRecorder()
c, _ := gin.CreateTestContext(recorder) c, _ := gin.CreateTestContext(recorder)
oas.GetDefaultOasGeneratorInstance(dummyConn).Start() oas.GetDefaultOasGeneratorInstance().Start(dummyConn)
oas.GetDefaultOasGeneratorInstance(dummyConn).GetServiceSpecs().Store("some", oas.NewGen("some")) oas.GetDefaultOasGeneratorInstance().GetServiceSpecs().Store("some", oas.NewGen("some"))
return recorder, c return recorder, c
} }

View File

@ -19,7 +19,7 @@ var (
) )
type OasGenerator interface { type OasGenerator interface {
Start() Start(conn *basenine.Connection)
Stop() Stop()
IsStarted() bool IsStarted() bool
GetServiceSpecs() *sync.Map GetServiceSpecs() *sync.Map
@ -35,23 +35,41 @@ type defaultOasGenerator struct {
entriesQuery string entriesQuery string
} }
func GetDefaultOasGeneratorInstance(conn *basenine.Connection) *defaultOasGenerator { func GetDefaultOasGeneratorInstance() *defaultOasGenerator {
syncOnce.Do(func() { syncOnce.Do(func() {
instance = NewDefaultOasGenerator(conn) instance = NewDefaultOasGenerator()
logger.Log.Debug("OAS Generator Initialized") logger.Log.Debug("OAS Generator Initialized")
}) })
return instance return instance
} }
func (g *defaultOasGenerator) Start() { func (g *defaultOasGenerator) Start(conn *basenine.Connection) {
if g.started { if g.started {
return return
} }
if g.dbConn == nil {
if conn == nil {
logger.Log.Infof("Creating new DB connection for OAS generator to address %s:%s", shared.BasenineHost, shared.BaseninePort)
newConn, err := basenine.NewConnection(shared.BasenineHost, shared.BaseninePort)
if err != nil {
logger.Log.Error("Error connecting to DB for OAS generator, err: %v", err)
return
}
conn = newConn
}
g.dbConn = conn
}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
g.cancel = cancel g.cancel = cancel
g.ctx = ctx g.ctx = ctx
g.serviceSpecs = &sync.Map{} g.serviceSpecs = &sync.Map{}
g.started = true g.started = true
go g.runGenerator() go g.runGenerator()
} }
@ -59,8 +77,15 @@ func (g *defaultOasGenerator) Stop() {
if !g.started { if !g.started {
return return
} }
if g.dbConn != nil {
g.dbConn.Close()
g.dbConn = nil
}
g.cancel() g.cancel()
g.reset() g.reset()
g.started = false g.started = false
} }
@ -69,7 +94,7 @@ func (g *defaultOasGenerator) IsStarted() bool {
} }
func (g *defaultOasGenerator) runGenerator() { func (g *defaultOasGenerator) runGenerator() {
// Make []byte channels to recieve the data and the meta // Make []byte channels to receive the data and the meta
dataChan := make(chan []byte) dataChan := make(chan []byte)
metaChan := make(chan []byte) metaChan := make(chan []byte)
@ -80,6 +105,8 @@ func (g *defaultOasGenerator) runGenerator() {
select { select {
case <-g.ctx.Done(): case <-g.ctx.Done():
logger.Log.Infof("OAS Generator was canceled") logger.Log.Infof("OAS Generator was canceled")
close(dataChan)
close(metaChan)
return return
case metaBytes, ok := <-metaChan: case metaBytes, ok := <-metaChan:
@ -181,21 +208,12 @@ func (g *defaultOasGenerator) SetEntriesQuery(query string) bool {
return changed return changed
} }
func NewDefaultOasGenerator(conn *basenine.Connection) *defaultOasGenerator { func NewDefaultOasGenerator() *defaultOasGenerator {
if conn == nil {
logger.Log.Infof("Creating new DB connection for OAS generator to address %s:%s", shared.BasenineHost, shared.BaseninePort)
newConn, err := basenine.NewConnection(shared.BasenineHost, shared.BaseninePort)
if err != nil {
panic(err)
}
conn = newConn
}
return &defaultOasGenerator{ return &defaultOasGenerator{
started: false, started: false,
ctx: nil, ctx: nil,
cancel: nil, cancel: nil,
serviceSpecs: nil, serviceSpecs: nil,
dbConn: conn, dbConn: nil,
} }
} }

View File

@ -3,14 +3,11 @@ package oas
import ( import (
"encoding/json" "encoding/json"
"github.com/up9inc/mizu/agent/pkg/har" "github.com/up9inc/mizu/agent/pkg/har"
"sync"
"testing" "testing"
) )
func TestOASGen(t *testing.T) { func TestOASGen(t *testing.T) {
gen := new(defaultOasGenerator) gen := new(defaultOasGenerator)
gen.dbConn = GetFakeDBConn(`{"startedDateTime": "20000101","request": {"url": "https://host/path", "method": "GET"}, "response": {"status": 200}}`)
gen.serviceSpecs = &sync.Map{}
e := new(har.Entry) e := new(har.Entry)
err := json.Unmarshal([]byte(`{"startedDateTime": "20000101","request": {"url": "https://host/path", "method": "GET"}, "response": {"status": 200}}`), e) err := json.Unmarshal([]byte(`{"startedDateTime": "20000101","request": {"url": "https://host/path", "method": "GET"}, "response": {"status": 200}}`), e)
@ -22,7 +19,9 @@ func TestOASGen(t *testing.T) {
Destination: "some", Destination: "some",
Entry: *e, Entry: *e,
} }
gen.Start()
dummyConn := GetFakeDBConn(`{"startedDateTime": "20000101","request": {"url": "https://host/path", "method": "GET"}, "response": {"status": 200}}`)
gen.Start(dummyConn)
gen.handleHARWithSource(ews) gen.handleHARWithSource(ews)
g, ok := gen.serviceSpecs.Load("some") g, ok := gen.serviceSpecs.Load("some")
if !ok { if !ok {

View File

@ -61,8 +61,7 @@ func TestEntries(t *testing.T) {
t.FailNow() t.FailNow()
} }
dummyConn := GetFakeDBConn("\n") gen := NewDefaultOasGenerator()
gen := NewDefaultOasGenerator(dummyConn)
gen.serviceSpecs = new(sync.Map) gen.serviceSpecs = new(sync.Map)
loadStartingOAS("test_artifacts/catalogue.json", "catalogue", gen.serviceSpecs) loadStartingOAS("test_artifacts/catalogue.json", "catalogue", gen.serviceSpecs)
loadStartingOAS("test_artifacts/trcc.json", "trcc-api-service", gen.serviceSpecs) loadStartingOAS("test_artifacts/trcc.json", "trcc-api-service", gen.serviceSpecs)
@ -136,8 +135,7 @@ func TestEntries(t *testing.T) {
} }
func TestFileSingle(t *testing.T) { func TestFileSingle(t *testing.T) {
dummyConn := GetFakeDBConn("\n") gen := NewDefaultOasGenerator()
gen := NewDefaultOasGenerator(dummyConn)
gen.serviceSpecs = new(sync.Map) gen.serviceSpecs = new(sync.Map)
// loadStartingOAS() // loadStartingOAS()
file := "test_artifacts/params.har" file := "test_artifacts/params.har"
@ -227,8 +225,7 @@ func loadStartingOAS(file string, label string, specs *sync.Map) {
} }
func TestEntriesNegative(t *testing.T) { func TestEntriesNegative(t *testing.T) {
dummyConn := GetFakeDBConn("\n") gen := NewDefaultOasGenerator()
gen := NewDefaultOasGenerator(dummyConn)
gen.serviceSpecs = new(sync.Map) gen.serviceSpecs = new(sync.Map)
files := []string{"invalid"} files := []string{"invalid"}
_, err := feedEntries(files, false, gen) _, err := feedEntries(files, false, gen)
@ -239,8 +236,7 @@ func TestEntriesNegative(t *testing.T) {
} }
func TestEntriesPositive(t *testing.T) { func TestEntriesPositive(t *testing.T) {
dummyConn := GetFakeDBConn("\n") gen := NewDefaultOasGenerator()
gen := NewDefaultOasGenerator(dummyConn)
gen.serviceSpecs = new(sync.Map) gen.serviceSpecs = new(sync.Map)
files := []string{"test_artifacts/params.har"} files := []string{"test_artifacts/params.har"}
_, err := feedEntries(files, false, gen) _, err := feedEntries(files, false, gen)