diff --git a/agent/main.go b/agent/main.go index 07b7bd16a..2dc82400c 100644 --- a/agent/main.go +++ b/agent/main.go @@ -373,6 +373,7 @@ func handleIncomingMessageAsTapper(socketConnection *websocket.Conn) { func initializeDependencies() { dependency.RegisterGenerator(dependency.ServiceMapGeneratorDependency, func() interface{} { return servicemap.GetDefaultServiceMapInstance() }) dependency.RegisterGenerator(dependency.OasGeneratorDependency, func() interface{} { return oas.GetDefaultOasGeneratorInstance() }) + dependency.RegisterGenerator(dependency.EntriesInserter, func() interface{} { return api.GetBasenineEntryInserterInstance() }) dependency.RegisterGenerator(dependency.EntriesProvider, func() interface{} { return &entries.BasenineEntriesProvider{} }) dependency.RegisterGenerator(dependency.EntriesSocketStreamer, func() interface{} { return &api.BasenineEntryStreamer{} }) dependency.RegisterGenerator(dependency.EntryStreamerSocketConnector, func() interface{} { return &api.DefaultEntryStreamerSocketConnector{} }) diff --git a/agent/pkg/api/main.go b/agent/pkg/api/main.go index ad15f54e2..e8ff72861 100644 --- a/agent/pkg/api/main.go +++ b/agent/pkg/api/main.go @@ -25,10 +25,7 @@ import ( "github.com/up9inc/mizu/agent/pkg/utils" "github.com/up9inc/mizu/logger" - "github.com/up9inc/mizu/shared" tapApi "github.com/up9inc/mizu/tap/api" - - basenine "github.com/up9inc/basenine/client/go" ) var k8sResolver *resolver.Resolver @@ -103,20 +100,6 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension panic("Channel of captured messages is nil") } -BasenineReconnect: - connection, err := basenine.NewConnection(shared.BasenineHost, shared.BaseninePort) - if err != nil { - logger.Log.Errorf("Can't establish a new connection to Basenine server: %v", err) - time.Sleep(shared.BasenineReconnectInterval * time.Second) - goto BasenineReconnect - } - if err = connection.InsertMode(); err != nil { - logger.Log.Errorf("Insert mode call failed: %v", err) - connection.Close() - time.Sleep(shared.BasenineReconnectInterval * time.Second) - goto BasenineReconnect - } - disableOASValidation := false ctx := context.Background() doc, contractContent, router, err := loadOAS(ctx) @@ -163,11 +146,9 @@ BasenineReconnect: providers.EntryAdded(len(data)) - if err = connection.SendText(string(data)); err != nil { - logger.Log.Errorf("An error occured while inserting a new record to database: %v", err) - connection.Close() - time.Sleep(shared.BasenineReconnectInterval * time.Second) - goto BasenineReconnect + entryInserter := dependency.GetInstance(dependency.EntriesInserter).(EntryInserter) + if err := entryInserter.Insert(mizuEntry); err != nil { + logger.Log.Errorf("Error inserting entry, err: %v", err) } serviceMapGenerator := dependency.GetInstance(dependency.ServiceMapGeneratorDependency).(servicemap.ServiceMapSink) diff --git a/agent/pkg/api/socket_data_inserter.go b/agent/pkg/api/socket_data_inserter.go new file mode 100644 index 000000000..f49719ace --- /dev/null +++ b/agent/pkg/api/socket_data_inserter.go @@ -0,0 +1,71 @@ +package api + +import ( + "encoding/json" + "fmt" + basenine "github.com/up9inc/basenine/client/go" + "github.com/up9inc/mizu/logger" + "github.com/up9inc/mizu/shared" + "github.com/up9inc/mizu/tap/api" + "sync" + "time" +) + +type EntryInserter interface { + Insert(entry *api.Entry) error +} + +type BasenineEntryInserter struct { + connection *basenine.Connection +} + +var instance *BasenineEntryInserter +var once sync.Once + +func GetBasenineEntryInserterInstance() *BasenineEntryInserter { + once.Do(func() { + instance = &BasenineEntryInserter{} + }) + + return instance +} + +func (e *BasenineEntryInserter) Insert(entry *api.Entry) error { + if e.connection == nil { + e.connection = initializeConnection() + } + + data, err := json.Marshal(entry) + if err != nil { + return fmt.Errorf("error marshling entry, err: %v", err) + } + + if err := e.connection.SendText(string(data)); err != nil { + e.connection.Close() + e.connection = nil + + return fmt.Errorf("error sending text to database, err: %v", err) + } + + return nil +} + +func initializeConnection() *basenine.Connection{ + for { + connection, err := basenine.NewConnection(shared.BasenineHost, shared.BaseninePort) + if err != nil { + logger.Log.Errorf("Can't establish a new connection to Basenine server: %v", err) + time.Sleep(shared.BasenineReconnectInterval * time.Second) + continue + } + + if err = connection.InsertMode(); err != nil { + logger.Log.Errorf("Insert mode call failed: %v", err) + connection.Close() + time.Sleep(shared.BasenineReconnectInterval * time.Second) + continue + } + + return connection + } +} diff --git a/agent/pkg/dependency/type_names.go b/agent/pkg/dependency/type_names.go index c7e91decb..811900cb3 100644 --- a/agent/pkg/dependency/type_names.go +++ b/agent/pkg/dependency/type_names.go @@ -5,6 +5,7 @@ type DependencyContainerType string const ( ServiceMapGeneratorDependency = "ServiceMapGeneratorDependency" OasGeneratorDependency = "OasGeneratorDependency" + EntriesInserter = "EntriesInserter" EntriesProvider = "EntriesProvider" EntriesSocketStreamer = "EntriesSocketStreamer" EntryStreamerSocketConnector = "EntryStreamerSocketConnector"