diff --git a/agent/pkg/api/main.go b/agent/pkg/api/main.go index 478c4b55e..83e1c98dc 100644 --- a/agent/pkg/api/main.go +++ b/agent/pkg/api/main.go @@ -103,12 +103,18 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension panic("Channel of captured messages is nil") } +BasenineReconnect: connection, err := basenine.NewConnection(shared.BasenineHost, shared.BaseninePort) if err != nil { - logger.Log.Panicf("Can't establish a new connection to Basenine server: %v", err) + logger.Log.Errorf("Can't establish a new connection to Basenine server: %v", err) + time.Sleep(shared.BasenineReconnectInterval * time.Second) + goto BasenineReconnect } if err = connection.InsertMode(); err != nil { - logger.Log.Panicf("Insert mode call failed: %v", err) + logger.Log.Errorf("Insert mode call failed: %v", err) + connection.Close() + time.Sleep(shared.BasenineReconnectInterval * time.Second) + goto BasenineReconnect } disableOASValidation := false @@ -128,13 +134,13 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension var httpPair tapApi.HTTPRequestResponsePair if err := json.Unmarshal([]byte(mizuEntry.HTTPPair), &httpPair); err != nil { logger.Log.Error(err) + } else { + contract := handleOAS(ctx, doc, router, httpPair.Request.Payload.RawRequest, httpPair.Response.Payload.RawResponse, contractContent) + mizuEntry.ContractStatus = contract.Status + mizuEntry.ContractRequestReason = contract.RequestReason + mizuEntry.ContractResponseReason = contract.ResponseReason + mizuEntry.ContractContent = contract.Content } - - contract := handleOAS(ctx, doc, router, httpPair.Request.Payload.RawRequest, httpPair.Response.Payload.RawResponse, contractContent) - mizuEntry.ContractStatus = contract.Status - mizuEntry.ContractRequestReason = contract.RequestReason - mizuEntry.ContractResponseReason = contract.ResponseReason - mizuEntry.ContractContent = contract.Content } harEntry, err := har.NewEntry(mizuEntry.Request, mizuEntry.Response, mizuEntry.StartTime, mizuEntry.ElapsedTime) @@ -146,13 +152,17 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension data, err := json.Marshal(mizuEntry) if err != nil { - panic(err) + logger.Log.Errorf("Error while marshaling entry: %v", err) + continue } providers.EntryAdded(len(data)) if err = connection.SendText(string(data)); err != nil { - logger.Log.Panicf("An error occured while inserting a new record to database: %v", err) + logger.Log.Errorf("An error occured while inserting a new record to database: %v", err) + connection.Close() + time.Sleep(shared.BasenineReconnectInterval * time.Second) + goto BasenineReconnect } serviceMapGenerator := dependency.GetInstance(dependency.ServiceMapGeneratorDependency).(servicemap.ServiceMapSink) diff --git a/agent/pkg/api/socket_data_streamer.go b/agent/pkg/api/socket_data_streamer.go index a981b5449..e958dcaf0 100644 --- a/agent/pkg/api/socket_data_streamer.go +++ b/agent/pkg/api/socket_data_streamer.go @@ -24,7 +24,7 @@ func (e *BasenineEntryStreamer) Get(ctx context.Context, socketId int, params *W connection, err := basenine.NewConnection(shared.BasenineHost, shared.BaseninePort) if err != nil { - logger.Log.Errorf("failed to establish a connection to Basenine: %v", err) + logger.Log.Errorf("Failed to establish a connection to Basenine: %v", err) entryStreamerSocketConnector.CleanupSocket(socketId) return err } @@ -80,7 +80,9 @@ func (e *BasenineEntryStreamer) Get(ctx context.Context, socketId int, params *W go handleMetaChannel(connection, meta) if err = connection.Query(query, data, meta); err != nil { - logger.Log.Panicf("Query mode call failed: %v", err) + logger.Log.Errorf("Query mode call failed: %v", err) + entryStreamerSocketConnector.CleanupSocket(socketId) + return err } go func() { diff --git a/agent/pkg/up9/main.go b/agent/pkg/up9/main.go index badd67183..7fe8bb7e5 100644 --- a/agent/pkg/up9/main.go +++ b/agent/pkg/up9/main.go @@ -211,11 +211,15 @@ func syncEntriesImpl(token string, model string, envPrefix string, uploadInterva logger.Log.Infof("Getting entries from the database") +BasenineReconnect: var connection *basenine.Connection var err error connection, err = basenine.NewConnection(shared.BasenineHost, shared.BaseninePort) if err != nil { - panic(err) + logger.Log.Errorf("Can't establish a new connection to Basenine server: %v", err) + connection.Close() + time.Sleep(shared.BasenineReconnectInterval * time.Second) + goto BasenineReconnect } data := make(chan []byte) @@ -324,7 +328,10 @@ func syncEntriesImpl(token string, model string, envPrefix string, uploadInterva wg.Add(2) if err = connection.Query(query, data, meta); err != nil { - logger.Log.Panicf("Query mode call failed: %v", err) + logger.Log.Errorf("Query mode call failed: %v", err) + connection.Close() + time.Sleep(shared.BasenineReconnectInterval * time.Second) + goto BasenineReconnect } wg.Wait() diff --git a/shared/consts.go b/shared/consts.go index b2c94ffa5..0652f56d9 100644 --- a/shared/consts.go +++ b/shared/consts.go @@ -1,19 +1,20 @@ package shared const ( - MizuFilteringOptionsEnvVar = "SENSITIVE_DATA_FILTERING_OPTIONS" - SyncEntriesConfigEnvVar = "SYNC_ENTRIES_CONFIG" - HostModeEnvVar = "HOST_MODE" - NodeNameEnvVar = "NODE_NAME" - ConfigDirPath = "/app/config/" - DataDirPath = "/app/data/" - ValidationRulesFileName = "validation-rules.yaml" - ContractFileName = "contract-oas.yaml" - ConfigFileName = "mizu-config.json" - GoGCEnvVar = "GOGC" - DefaultApiServerPort = 8899 - LogLevelEnvVar = "LOG_LEVEL" - MizuAgentImageRepo = "docker.io/up9inc/mizu" - BasenineHost = "127.0.0.1" - BaseninePort = "9099" + MizuFilteringOptionsEnvVar = "SENSITIVE_DATA_FILTERING_OPTIONS" + SyncEntriesConfigEnvVar = "SYNC_ENTRIES_CONFIG" + HostModeEnvVar = "HOST_MODE" + NodeNameEnvVar = "NODE_NAME" + ConfigDirPath = "/app/config/" + DataDirPath = "/app/data/" + ValidationRulesFileName = "validation-rules.yaml" + ContractFileName = "contract-oas.yaml" + ConfigFileName = "mizu-config.json" + GoGCEnvVar = "GOGC" + DefaultApiServerPort = 8899 + LogLevelEnvVar = "LOG_LEVEL" + MizuAgentImageRepo = "docker.io/up9inc/mizu" + BasenineHost = "127.0.0.1" + BaseninePort = "9099" + BasenineReconnectInterval = 3 )