diff --git a/agent/main.go b/agent/main.go index 30d455b78..7feea5dbb 100644 --- a/agent/main.go +++ b/agent/main.go @@ -210,7 +210,7 @@ func runInHarReaderMode() { func enableExpFeatureIfNeeded() { if config.Config.OAS { oasGenerator := dependency.GetInstance(dependency.OasGeneratorDependency).(oas.OasGenerator) - oasGenerator.Start(nil) + oasGenerator.Start() } if config.Config.ServiceMap { serviceMapGenerator := dependency.GetInstance(dependency.ServiceMapGeneratorDependency).(servicemap.ServiceMap) diff --git a/agent/pkg/api/main.go b/agent/pkg/api/main.go index afbaff051..37c49ba12 100644 --- a/agent/pkg/api/main.go +++ b/agent/pkg/api/main.go @@ -18,6 +18,7 @@ import ( "github.com/up9inc/mizu/agent/pkg/holder" "github.com/up9inc/mizu/agent/pkg/providers" + "github.com/up9inc/mizu/agent/pkg/oas" "github.com/up9inc/mizu/agent/pkg/servicemap" "github.com/up9inc/mizu/agent/pkg/resolver" @@ -152,6 +153,9 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension serviceMapGenerator := dependency.GetInstance(dependency.ServiceMapGeneratorDependency).(servicemap.ServiceMapSink) serviceMapGenerator.NewTCPEntry(mizuEntry.Source, mizuEntry.Destination, &item.Protocol) + + oasGenerator := dependency.GetInstance(dependency.OasGeneratorDependency).(oas.OasGeneratorSink) + oasGenerator.HandleEntry(mizuEntry) } } diff --git a/agent/pkg/controllers/oas_controller_test.go b/agent/pkg/controllers/oas_controller_test.go index 419956951..0a588c567 100644 --- a/agent/pkg/controllers/oas_controller_test.go +++ b/agent/pkg/controllers/oas_controller_test.go @@ -1,17 +1,12 @@ package controllers import ( - "bytes" - basenine "github.com/up9inc/basenine/client/go" - "net" "net/http/httptest" "testing" - "time" - - "github.com/up9inc/mizu/agent/pkg/dependency" - "github.com/up9inc/mizu/agent/pkg/oas" "github.com/gin-gonic/gin" + "github.com/up9inc/mizu/agent/pkg/dependency" + "github.com/up9inc/mizu/agent/pkg/oas" ) func TestGetOASServers(t *testing.T) { @@ -37,33 +32,14 @@ func TestGetOASSpec(t *testing.T) { t.Logf("Written body: %s", recorder.Body.String()) } -type fakeConn struct { - sendBuffer *bytes.Buffer - receiveBuffer *bytes.Buffer -} - -func (f fakeConn) Read(p []byte) (int, error) { return f.sendBuffer.Read(p) } -func (f fakeConn) Write(p []byte) (int, error) { return f.receiveBuffer.Write(p) } -func (fakeConn) Close() error { return nil } -func (fakeConn) LocalAddr() net.Addr { return nil } -func (fakeConn) RemoteAddr() net.Addr { return nil } -func (fakeConn) SetDeadline(t time.Time) error { return nil } -func (fakeConn) SetReadDeadline(t time.Time) error { return nil } -func (fakeConn) SetWriteDeadline(t time.Time) error { return nil } - func getRecorderAndContext() (*httptest.ResponseRecorder, *gin.Context) { - dummyConn := new(basenine.Connection) - dummyConn.Conn = fakeConn{ - sendBuffer: bytes.NewBufferString("\n"), - receiveBuffer: bytes.NewBufferString("\n"), - } dependency.RegisterGenerator(dependency.OasGeneratorDependency, func() interface{} { return oas.GetDefaultOasGeneratorInstance() }) recorder := httptest.NewRecorder() c, _ := gin.CreateTestContext(recorder) - oas.GetDefaultOasGeneratorInstance().Start(dummyConn) + oas.GetDefaultOasGeneratorInstance().Start() oas.GetDefaultOasGeneratorInstance().GetServiceSpecs().Store("some", oas.NewGen("some")) return recorder, c } diff --git a/agent/pkg/oas/oas_generator.go b/agent/pkg/oas/oas_generator.go index a78254802..39bf2f0bc 100644 --- a/agent/pkg/oas/oas_generator.go +++ b/agent/pkg/oas/oas_generator.go @@ -1,14 +1,11 @@ package oas import ( - "context" "encoding/json" "net/url" "sync" - basenine "github.com/up9inc/basenine/client/go" "github.com/up9inc/mizu/agent/pkg/har" - "github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/tap/api" "github.com/up9inc/mizu/logger" @@ -19,22 +16,20 @@ var ( instance *defaultOasGenerator ) +type OasGeneratorSink interface { + HandleEntry(mizuEntry *api.Entry) +} + type OasGenerator interface { - Start(conn *basenine.Connection) + Start() Stop() IsStarted() bool GetServiceSpecs() *sync.Map - SetEntriesQuery(query string) bool } type defaultOasGenerator struct { started bool - ctx context.Context - cancel context.CancelFunc serviceSpecs *sync.Map - dbConn *basenine.Connection - dbMutex sync.Mutex - entriesQuery string } func GetDefaultOasGeneratorInstance() *defaultOasGenerator { @@ -45,102 +40,29 @@ func GetDefaultOasGeneratorInstance() *defaultOasGenerator { return instance } -func (g *defaultOasGenerator) Start(conn *basenine.Connection) { - if g.started { - return - } - - if g.dbConn == nil { - if conn == nil { - logger.Log.Infof("Creating new DB connection for OAS generator to address %s:%s", shared.BasenineHost, shared.BaseninePort) - newConn, err := basenine.NewConnection(shared.BasenineHost, shared.BaseninePort) - if err != nil { - logger.Log.Error("Error connecting to DB for OAS generator, err: %v", err) - return - } - - conn = newConn - } - - g.dbConn = conn - } - - ctx, cancel := context.WithCancel(context.Background()) - g.cancel = cancel - g.ctx = ctx - g.serviceSpecs = &sync.Map{} - +func (g *defaultOasGenerator) Start() { g.started = true - - go g.runGenerator() } func (g *defaultOasGenerator) Stop() { if !g.started { return } + g.started = false - g.cancel() g.reset() - - g.dbMutex.Lock() - defer g.dbMutex.Unlock() - if g.dbConn != nil { - g.dbConn.Close() - g.dbConn = nil - } } func (g *defaultOasGenerator) IsStarted() bool { return g.started } -func (g *defaultOasGenerator) runGenerator() { - // Make []byte channels to receive the data and the meta - dataChan := make(chan []byte) - metaChan := make(chan []byte) - - g.dbMutex.Lock() - defer g.dbMutex.Unlock() - logger.Log.Infof("Querying DB for OAS generator with query '%s'", g.entriesQuery) - if err := g.dbConn.Query("latest", g.entriesQuery, dataChan, metaChan); err != nil { - logger.Log.Errorf("Query mode call failed: %v", err) +func (g *defaultOasGenerator) HandleEntry(mizuEntry *api.Entry) { + if !g.started { + return } - for { - select { - case <-g.ctx.Done(): - logger.Log.Infof("OAS Generator was canceled") - close(dataChan) - close(metaChan) - return - - case metaBytes, ok := <-metaChan: - if !ok { - logger.Log.Infof("OAS Generator - meta channel closed") - break - } - logger.Log.Debugf("Meta: %s", metaBytes) - - case dataBytes, ok := <-dataChan: - if !ok { - logger.Log.Infof("OAS Generator - entries channel closed") - break - } - - logger.Log.Debugf("Data: %s", dataBytes) - e := new(api.Entry) - err := json.Unmarshal(dataBytes, e) - if err != nil { - continue - } - g.handleEntry(e) - } - } -} - -func (g *defaultOasGenerator) handleEntry(mizuEntry *api.Entry) { if mizuEntry.Protocol.Name == "http" { dest := mizuEntry.Destination.Name if dest == "" { @@ -210,18 +132,9 @@ func (g *defaultOasGenerator) GetServiceSpecs() *sync.Map { return g.serviceSpecs } -func (g *defaultOasGenerator) SetEntriesQuery(query string) bool { - changed := g.entriesQuery != query - g.entriesQuery = query - return changed -} - func NewDefaultOasGenerator() *defaultOasGenerator { return &defaultOasGenerator{ started: false, - ctx: nil, - cancel: nil, - serviceSpecs: nil, - dbConn: nil, + serviceSpecs: &sync.Map{}, } } diff --git a/agent/pkg/oas/oas_generator_test.go b/agent/pkg/oas/oas_generator_test.go index 9728286de..b364b4d57 100644 --- a/agent/pkg/oas/oas_generator_test.go +++ b/agent/pkg/oas/oas_generator_test.go @@ -8,7 +8,7 @@ import ( ) func TestOASGen(t *testing.T) { - gen := new(defaultOasGenerator) + gen := GetDefaultOasGeneratorInstance() e := new(har.Entry) err := json.Unmarshal([]byte(`{"startedDateTime": "20000101","request": {"url": "https://host/path", "method": "GET"}, "response": {"status": 200}}`), e) @@ -21,8 +21,7 @@ func TestOASGen(t *testing.T) { Entry: *e, } - dummyConn := GetFakeDBConn(`{"startedDateTime": "20000101","request": {"url": "https://host/path", "method": "GET"}, "response": {"status": 200}}`) - gen.Start(dummyConn) + gen.Start() gen.handleHARWithSource(ews) g, ok := gen.serviceSpecs.Load("some") if !ok { diff --git a/agent/pkg/oas/specgen_test.go b/agent/pkg/oas/specgen_test.go index cc7b609ee..f9f501b98 100644 --- a/agent/pkg/oas/specgen_test.go +++ b/agent/pkg/oas/specgen_test.go @@ -1,10 +1,8 @@ package oas import ( - "bytes" "encoding/json" "io/ioutil" - "net" "os" "regexp" "strings" @@ -13,22 +11,11 @@ import ( "time" "github.com/chanced/openapi" + "github.com/up9inc/mizu/agent/pkg/har" "github.com/up9inc/mizu/logger" "github.com/wI2L/jsondiff" - - basenine "github.com/up9inc/basenine/client/go" - "github.com/up9inc/mizu/agent/pkg/har" ) -func GetFakeDBConn(send string) *basenine.Connection { - dummyConn := new(basenine.Connection) - dummyConn.Conn = FakeConn{ - sendBuffer: bytes.NewBufferString(send), - receiveBuffer: bytes.NewBufferString(""), - } - return dummyConn -} - // if started via env, write file into subdir func outputSpec(label string, spec *openapi.OpenAPI, t *testing.T) string { content, err := json.MarshalIndent(spec, "", " ") @@ -278,17 +265,3 @@ func TestLoadValid3_1(t *testing.T) { t.FailNow() } } - -type FakeConn struct { - sendBuffer *bytes.Buffer - receiveBuffer *bytes.Buffer -} - -func (f FakeConn) Read(p []byte) (int, error) { return f.sendBuffer.Read(p) } -func (f FakeConn) Write(p []byte) (int, error) { return f.receiveBuffer.Write(p) } -func (FakeConn) Close() error { return nil } -func (FakeConn) LocalAddr() net.Addr { return nil } -func (FakeConn) RemoteAddr() net.Addr { return nil } -func (FakeConn) SetDeadline(t time.Time) error { return nil } -func (FakeConn) SetReadDeadline(t time.Time) error { return nil } -func (FakeConn) SetWriteDeadline(t time.Time) error { return nil }