mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-06-28 17:19:44 +00:00
TRA-4501 Don't panic in case of a Basenine connection error and try to reconnect with 3 seconds intervals (#1012)
* Don't panic in case of a Basenine connection error and try to reconnect with 3 seconds intervals * More improvements
This commit is contained in:
parent
d7fcf273c0
commit
c5006e5f57
@ -103,12 +103,18 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
|
|||||||
panic("Channel of captured messages is nil")
|
panic("Channel of captured messages is nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
BasenineReconnect:
|
||||||
connection, err := basenine.NewConnection(shared.BasenineHost, shared.BaseninePort)
|
connection, err := basenine.NewConnection(shared.BasenineHost, shared.BaseninePort)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Log.Panicf("Can't establish a new connection to Basenine server: %v", err)
|
logger.Log.Errorf("Can't establish a new connection to Basenine server: %v", err)
|
||||||
|
time.Sleep(shared.BasenineReconnectInterval * time.Second)
|
||||||
|
goto BasenineReconnect
|
||||||
}
|
}
|
||||||
if err = connection.InsertMode(); err != nil {
|
if err = connection.InsertMode(); err != nil {
|
||||||
logger.Log.Panicf("Insert mode call failed: %v", err)
|
logger.Log.Errorf("Insert mode call failed: %v", err)
|
||||||
|
connection.Close()
|
||||||
|
time.Sleep(shared.BasenineReconnectInterval * time.Second)
|
||||||
|
goto BasenineReconnect
|
||||||
}
|
}
|
||||||
|
|
||||||
disableOASValidation := false
|
disableOASValidation := false
|
||||||
@ -128,14 +134,14 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
|
|||||||
var httpPair tapApi.HTTPRequestResponsePair
|
var httpPair tapApi.HTTPRequestResponsePair
|
||||||
if err := json.Unmarshal([]byte(mizuEntry.HTTPPair), &httpPair); err != nil {
|
if err := json.Unmarshal([]byte(mizuEntry.HTTPPair), &httpPair); err != nil {
|
||||||
logger.Log.Error(err)
|
logger.Log.Error(err)
|
||||||
}
|
} else {
|
||||||
|
|
||||||
contract := handleOAS(ctx, doc, router, httpPair.Request.Payload.RawRequest, httpPair.Response.Payload.RawResponse, contractContent)
|
contract := handleOAS(ctx, doc, router, httpPair.Request.Payload.RawRequest, httpPair.Response.Payload.RawResponse, contractContent)
|
||||||
mizuEntry.ContractStatus = contract.Status
|
mizuEntry.ContractStatus = contract.Status
|
||||||
mizuEntry.ContractRequestReason = contract.RequestReason
|
mizuEntry.ContractRequestReason = contract.RequestReason
|
||||||
mizuEntry.ContractResponseReason = contract.ResponseReason
|
mizuEntry.ContractResponseReason = contract.ResponseReason
|
||||||
mizuEntry.ContractContent = contract.Content
|
mizuEntry.ContractContent = contract.Content
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
harEntry, err := har.NewEntry(mizuEntry.Request, mizuEntry.Response, mizuEntry.StartTime, mizuEntry.ElapsedTime)
|
harEntry, err := har.NewEntry(mizuEntry.Request, mizuEntry.Response, mizuEntry.StartTime, mizuEntry.ElapsedTime)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
@ -146,13 +152,17 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
|
|||||||
|
|
||||||
data, err := json.Marshal(mizuEntry)
|
data, err := json.Marshal(mizuEntry)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
logger.Log.Errorf("Error while marshaling entry: %v", err)
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
providers.EntryAdded(len(data))
|
providers.EntryAdded(len(data))
|
||||||
|
|
||||||
if err = connection.SendText(string(data)); err != nil {
|
if err = connection.SendText(string(data)); err != nil {
|
||||||
logger.Log.Panicf("An error occured while inserting a new record to database: %v", err)
|
logger.Log.Errorf("An error occured while inserting a new record to database: %v", err)
|
||||||
|
connection.Close()
|
||||||
|
time.Sleep(shared.BasenineReconnectInterval * time.Second)
|
||||||
|
goto BasenineReconnect
|
||||||
}
|
}
|
||||||
|
|
||||||
serviceMapGenerator := dependency.GetInstance(dependency.ServiceMapGeneratorDependency).(servicemap.ServiceMapSink)
|
serviceMapGenerator := dependency.GetInstance(dependency.ServiceMapGeneratorDependency).(servicemap.ServiceMapSink)
|
||||||
|
@ -24,7 +24,7 @@ func (e *BasenineEntryStreamer) Get(ctx context.Context, socketId int, params *W
|
|||||||
|
|
||||||
connection, err := basenine.NewConnection(shared.BasenineHost, shared.BaseninePort)
|
connection, err := basenine.NewConnection(shared.BasenineHost, shared.BaseninePort)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Log.Errorf("failed to establish a connection to Basenine: %v", err)
|
logger.Log.Errorf("Failed to establish a connection to Basenine: %v", err)
|
||||||
entryStreamerSocketConnector.CleanupSocket(socketId)
|
entryStreamerSocketConnector.CleanupSocket(socketId)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -80,7 +80,9 @@ func (e *BasenineEntryStreamer) Get(ctx context.Context, socketId int, params *W
|
|||||||
go handleMetaChannel(connection, meta)
|
go handleMetaChannel(connection, meta)
|
||||||
|
|
||||||
if err = connection.Query(query, data, meta); err != nil {
|
if err = connection.Query(query, data, meta); err != nil {
|
||||||
logger.Log.Panicf("Query mode call failed: %v", err)
|
logger.Log.Errorf("Query mode call failed: %v", err)
|
||||||
|
entryStreamerSocketConnector.CleanupSocket(socketId)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
@ -211,11 +211,15 @@ func syncEntriesImpl(token string, model string, envPrefix string, uploadInterva
|
|||||||
|
|
||||||
logger.Log.Infof("Getting entries from the database")
|
logger.Log.Infof("Getting entries from the database")
|
||||||
|
|
||||||
|
BasenineReconnect:
|
||||||
var connection *basenine.Connection
|
var connection *basenine.Connection
|
||||||
var err error
|
var err error
|
||||||
connection, err = basenine.NewConnection(shared.BasenineHost, shared.BaseninePort)
|
connection, err = basenine.NewConnection(shared.BasenineHost, shared.BaseninePort)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(err)
|
logger.Log.Errorf("Can't establish a new connection to Basenine server: %v", err)
|
||||||
|
connection.Close()
|
||||||
|
time.Sleep(shared.BasenineReconnectInterval * time.Second)
|
||||||
|
goto BasenineReconnect
|
||||||
}
|
}
|
||||||
|
|
||||||
data := make(chan []byte)
|
data := make(chan []byte)
|
||||||
@ -324,7 +328,10 @@ func syncEntriesImpl(token string, model string, envPrefix string, uploadInterva
|
|||||||
wg.Add(2)
|
wg.Add(2)
|
||||||
|
|
||||||
if err = connection.Query(query, data, meta); err != nil {
|
if err = connection.Query(query, data, meta); err != nil {
|
||||||
logger.Log.Panicf("Query mode call failed: %v", err)
|
logger.Log.Errorf("Query mode call failed: %v", err)
|
||||||
|
connection.Close()
|
||||||
|
time.Sleep(shared.BasenineReconnectInterval * time.Second)
|
||||||
|
goto BasenineReconnect
|
||||||
}
|
}
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
@ -16,4 +16,5 @@ const (
|
|||||||
MizuAgentImageRepo = "docker.io/up9inc/mizu"
|
MizuAgentImageRepo = "docker.io/up9inc/mizu"
|
||||||
BasenineHost = "127.0.0.1"
|
BasenineHost = "127.0.0.1"
|
||||||
BaseninePort = "9099"
|
BaseninePort = "9099"
|
||||||
|
BasenineReconnectInterval = 3
|
||||||
)
|
)
|
||||||
|
Loading…
Reference in New Issue
Block a user