mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-07-19 18:54:07 +00:00
Extracted insert to database functionality (#1082)
This commit is contained in:
parent
0437586908
commit
09a0fca2c2
@ -373,6 +373,7 @@ func handleIncomingMessageAsTapper(socketConnection *websocket.Conn) {
|
|||||||
func initializeDependencies() {
|
func initializeDependencies() {
|
||||||
dependency.RegisterGenerator(dependency.ServiceMapGeneratorDependency, func() interface{} { return servicemap.GetDefaultServiceMapInstance() })
|
dependency.RegisterGenerator(dependency.ServiceMapGeneratorDependency, func() interface{} { return servicemap.GetDefaultServiceMapInstance() })
|
||||||
dependency.RegisterGenerator(dependency.OasGeneratorDependency, func() interface{} { return oas.GetDefaultOasGeneratorInstance() })
|
dependency.RegisterGenerator(dependency.OasGeneratorDependency, func() interface{} { return oas.GetDefaultOasGeneratorInstance() })
|
||||||
|
dependency.RegisterGenerator(dependency.EntriesInserter, func() interface{} { return api.GetBasenineEntryInserterInstance() })
|
||||||
dependency.RegisterGenerator(dependency.EntriesProvider, func() interface{} { return &entries.BasenineEntriesProvider{} })
|
dependency.RegisterGenerator(dependency.EntriesProvider, func() interface{} { return &entries.BasenineEntriesProvider{} })
|
||||||
dependency.RegisterGenerator(dependency.EntriesSocketStreamer, func() interface{} { return &api.BasenineEntryStreamer{} })
|
dependency.RegisterGenerator(dependency.EntriesSocketStreamer, func() interface{} { return &api.BasenineEntryStreamer{} })
|
||||||
dependency.RegisterGenerator(dependency.EntryStreamerSocketConnector, func() interface{} { return &api.DefaultEntryStreamerSocketConnector{} })
|
dependency.RegisterGenerator(dependency.EntryStreamerSocketConnector, func() interface{} { return &api.DefaultEntryStreamerSocketConnector{} })
|
||||||
|
@ -25,10 +25,7 @@ import (
|
|||||||
"github.com/up9inc/mizu/agent/pkg/utils"
|
"github.com/up9inc/mizu/agent/pkg/utils"
|
||||||
|
|
||||||
"github.com/up9inc/mizu/logger"
|
"github.com/up9inc/mizu/logger"
|
||||||
"github.com/up9inc/mizu/shared"
|
|
||||||
tapApi "github.com/up9inc/mizu/tap/api"
|
tapApi "github.com/up9inc/mizu/tap/api"
|
||||||
|
|
||||||
basenine "github.com/up9inc/basenine/client/go"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var k8sResolver *resolver.Resolver
|
var k8sResolver *resolver.Resolver
|
||||||
@ -103,20 +100,6 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
|
|||||||
panic("Channel of captured messages is nil")
|
panic("Channel of captured messages is nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
BasenineReconnect:
|
|
||||||
connection, err := basenine.NewConnection(shared.BasenineHost, shared.BaseninePort)
|
|
||||||
if err != nil {
|
|
||||||
logger.Log.Errorf("Can't establish a new connection to Basenine server: %v", err)
|
|
||||||
time.Sleep(shared.BasenineReconnectInterval * time.Second)
|
|
||||||
goto BasenineReconnect
|
|
||||||
}
|
|
||||||
if err = connection.InsertMode(); err != nil {
|
|
||||||
logger.Log.Errorf("Insert mode call failed: %v", err)
|
|
||||||
connection.Close()
|
|
||||||
time.Sleep(shared.BasenineReconnectInterval * time.Second)
|
|
||||||
goto BasenineReconnect
|
|
||||||
}
|
|
||||||
|
|
||||||
disableOASValidation := false
|
disableOASValidation := false
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
doc, contractContent, router, err := loadOAS(ctx)
|
doc, contractContent, router, err := loadOAS(ctx)
|
||||||
@ -163,11 +146,9 @@ BasenineReconnect:
|
|||||||
|
|
||||||
providers.EntryAdded(len(data))
|
providers.EntryAdded(len(data))
|
||||||
|
|
||||||
if err = connection.SendText(string(data)); err != nil {
|
entryInserter := dependency.GetInstance(dependency.EntriesInserter).(EntryInserter)
|
||||||
logger.Log.Errorf("An error occured while inserting a new record to database: %v", err)
|
if err := entryInserter.Insert(mizuEntry); err != nil {
|
||||||
connection.Close()
|
logger.Log.Errorf("Error inserting entry, err: %v", err)
|
||||||
time.Sleep(shared.BasenineReconnectInterval * time.Second)
|
|
||||||
goto BasenineReconnect
|
|
||||||
}
|
}
|
||||||
|
|
||||||
serviceMapGenerator := dependency.GetInstance(dependency.ServiceMapGeneratorDependency).(servicemap.ServiceMapSink)
|
serviceMapGenerator := dependency.GetInstance(dependency.ServiceMapGeneratorDependency).(servicemap.ServiceMapSink)
|
||||||
|
71
agent/pkg/api/socket_data_inserter.go
Normal file
71
agent/pkg/api/socket_data_inserter.go
Normal file
@ -0,0 +1,71 @@
|
|||||||
|
package api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
basenine "github.com/up9inc/basenine/client/go"
|
||||||
|
"github.com/up9inc/mizu/logger"
|
||||||
|
"github.com/up9inc/mizu/shared"
|
||||||
|
"github.com/up9inc/mizu/tap/api"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type EntryInserter interface {
|
||||||
|
Insert(entry *api.Entry) error
|
||||||
|
}
|
||||||
|
|
||||||
|
type BasenineEntryInserter struct {
|
||||||
|
connection *basenine.Connection
|
||||||
|
}
|
||||||
|
|
||||||
|
var instance *BasenineEntryInserter
|
||||||
|
var once sync.Once
|
||||||
|
|
||||||
|
func GetBasenineEntryInserterInstance() *BasenineEntryInserter {
|
||||||
|
once.Do(func() {
|
||||||
|
instance = &BasenineEntryInserter{}
|
||||||
|
})
|
||||||
|
|
||||||
|
return instance
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *BasenineEntryInserter) Insert(entry *api.Entry) error {
|
||||||
|
if e.connection == nil {
|
||||||
|
e.connection = initializeConnection()
|
||||||
|
}
|
||||||
|
|
||||||
|
data, err := json.Marshal(entry)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error marshling entry, err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := e.connection.SendText(string(data)); err != nil {
|
||||||
|
e.connection.Close()
|
||||||
|
e.connection = nil
|
||||||
|
|
||||||
|
return fmt.Errorf("error sending text to database, err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func initializeConnection() *basenine.Connection{
|
||||||
|
for {
|
||||||
|
connection, err := basenine.NewConnection(shared.BasenineHost, shared.BaseninePort)
|
||||||
|
if err != nil {
|
||||||
|
logger.Log.Errorf("Can't establish a new connection to Basenine server: %v", err)
|
||||||
|
time.Sleep(shared.BasenineReconnectInterval * time.Second)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = connection.InsertMode(); err != nil {
|
||||||
|
logger.Log.Errorf("Insert mode call failed: %v", err)
|
||||||
|
connection.Close()
|
||||||
|
time.Sleep(shared.BasenineReconnectInterval * time.Second)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
return connection
|
||||||
|
}
|
||||||
|
}
|
@ -5,6 +5,7 @@ type DependencyContainerType string
|
|||||||
const (
|
const (
|
||||||
ServiceMapGeneratorDependency = "ServiceMapGeneratorDependency"
|
ServiceMapGeneratorDependency = "ServiceMapGeneratorDependency"
|
||||||
OasGeneratorDependency = "OasGeneratorDependency"
|
OasGeneratorDependency = "OasGeneratorDependency"
|
||||||
|
EntriesInserter = "EntriesInserter"
|
||||||
EntriesProvider = "EntriesProvider"
|
EntriesProvider = "EntriesProvider"
|
||||||
EntriesSocketStreamer = "EntriesSocketStreamer"
|
EntriesSocketStreamer = "EntriesSocketStreamer"
|
||||||
EntryStreamerSocketConnector = "EntryStreamerSocketConnector"
|
EntryStreamerSocketConnector = "EntryStreamerSocketConnector"
|
||||||
|
Loading…
Reference in New Issue
Block a user