mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-07-21 19:43:58 +00:00
Dependency injection for oas servicemap (#895)
* Update main.go, main.go, and 3 more files... * WIP * Update main.go, oas_controller.go, and 3 more files... * Update main.go, oas_generator.go, and servicemap.go * Update loader.go and resolver.go * Update oas_generator.go * Update oas_generator.go, specgen_test.go, and 3 more files... * Update service_map_controller_test.go * Update oas_controller_test.go
This commit is contained in:
parent
9430e291b4
commit
237002ef29
@ -16,6 +16,7 @@ import (
|
||||
|
||||
"github.com/gin-contrib/static"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/up9inc/mizu/agent/pkg/dependency"
|
||||
"github.com/up9inc/mizu/agent/pkg/elastic"
|
||||
"github.com/up9inc/mizu/agent/pkg/middlewares"
|
||||
"github.com/up9inc/mizu/agent/pkg/models"
|
||||
@ -55,6 +56,7 @@ const (
|
||||
)
|
||||
|
||||
func main() {
|
||||
initializeDependencies()
|
||||
logLevel := determineLogLevel()
|
||||
logger.InitLoggerStd(logLevel)
|
||||
flag.Parse()
|
||||
@ -203,10 +205,12 @@ func runInHarReaderMode() {
|
||||
|
||||
func enableExpFeatureIfNeeded() {
|
||||
if config.Config.OAS {
|
||||
oas.GetOasGeneratorInstance().Start()
|
||||
oasGenerator := dependency.GetInstance(dependency.OasGeneratorDependency).(oas.OasGenerator)
|
||||
oasGenerator.Start()
|
||||
}
|
||||
if config.Config.ServiceMap {
|
||||
servicemap.GetInstance().Enable()
|
||||
serviceMapGenerator := dependency.GetInstance(dependency.ServiceMapGeneratorDependency).(servicemap.ServiceMap)
|
||||
serviceMapGenerator.Enable()
|
||||
}
|
||||
elastic.GetInstance().Configure(config.Config.Elastic)
|
||||
}
|
||||
@ -385,3 +389,8 @@ func handleIncomingMessageAsTapper(socketConnection *websocket.Conn) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func initializeDependencies() {
|
||||
dependency.RegisterGenerator(dependency.ServiceMapGeneratorDependency, func() interface{} { return servicemap.GetDefaultServiceMapInstance() })
|
||||
dependency.RegisterGenerator(dependency.OasGeneratorDependency, func() interface{} { return oas.GetDefaultOasGeneratorInstance() })
|
||||
}
|
||||
|
@ -11,6 +11,7 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/up9inc/mizu/agent/pkg/dependency"
|
||||
"github.com/up9inc/mizu/agent/pkg/elastic"
|
||||
"github.com/up9inc/mizu/agent/pkg/har"
|
||||
"github.com/up9inc/mizu/agent/pkg/holder"
|
||||
@ -151,7 +152,8 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
|
||||
entryWSource.Destination = mizuEntry.Destination.IP + ":" + mizuEntry.Destination.Port
|
||||
}
|
||||
|
||||
oas.GetOasGeneratorInstance().PushEntry(&entryWSource)
|
||||
oasGenerator := dependency.GetInstance(dependency.OasGeneratorDependency).(oas.OasGeneratorSink)
|
||||
oasGenerator.PushEntry(&entryWSource)
|
||||
}
|
||||
|
||||
data, err := json.Marshal(mizuEntry)
|
||||
@ -163,7 +165,9 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
|
||||
|
||||
connection.SendText(string(data))
|
||||
|
||||
servicemap.GetInstance().NewTCPEntry(mizuEntry.Source, mizuEntry.Destination, &item.Protocol)
|
||||
serviceMapGenerator := dependency.GetInstance(dependency.ServiceMapGeneratorDependency).(servicemap.ServiceMapSink)
|
||||
serviceMapGenerator.NewTCPEntry(mizuEntry.Source, mizuEntry.Destination, &item.Protocol)
|
||||
|
||||
elastic.GetInstance().PushEntry(mizuEntry)
|
||||
}
|
||||
}
|
||||
|
@ -5,13 +5,15 @@ import (
|
||||
|
||||
"github.com/chanced/openapi"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/up9inc/mizu/agent/pkg/dependency"
|
||||
"github.com/up9inc/mizu/agent/pkg/oas"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
)
|
||||
|
||||
func GetOASServers(c *gin.Context) {
|
||||
m := make([]string, 0)
|
||||
oas.GetOasGeneratorInstance().ServiceSpecs.Range(func(key, value interface{}) bool {
|
||||
oasGenerator := dependency.GetInstance(dependency.OasGeneratorDependency).(oas.OasGenerator)
|
||||
oasGenerator.GetServiceSpecs().Range(func(key, value interface{}) bool {
|
||||
m = append(m, key.(string))
|
||||
return true
|
||||
})
|
||||
@ -20,7 +22,8 @@ func GetOASServers(c *gin.Context) {
|
||||
}
|
||||
|
||||
func GetOASSpec(c *gin.Context) {
|
||||
res, ok := oas.GetOasGeneratorInstance().ServiceSpecs.Load(c.Param("id"))
|
||||
oasGenerator := dependency.GetInstance(dependency.OasGeneratorDependency).(oas.OasGenerator)
|
||||
res, ok := oasGenerator.GetServiceSpecs().Load(c.Param("id"))
|
||||
if !ok {
|
||||
c.JSON(http.StatusNotFound, gin.H{
|
||||
"error": true,
|
||||
@ -48,7 +51,9 @@ func GetOASSpec(c *gin.Context) {
|
||||
|
||||
func GetOASAllSpecs(c *gin.Context) {
|
||||
res := map[string]*openapi.OpenAPI{}
|
||||
oas.GetOasGeneratorInstance().ServiceSpecs.Range(func(key, value interface{}) bool {
|
||||
|
||||
oasGenerator := dependency.GetInstance(dependency.OasGeneratorDependency).(oas.OasGenerator)
|
||||
oasGenerator.GetServiceSpecs().Range(func(key, value interface{}) bool {
|
||||
svc := key.(string)
|
||||
gen := value.(*oas.SpecGen)
|
||||
spec, err := gen.GetSpec()
|
||||
|
@ -4,36 +4,43 @@ import (
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/up9inc/mizu/agent/pkg/dependency"
|
||||
"github.com/up9inc/mizu/agent/pkg/oas"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
)
|
||||
|
||||
func TestGetOASServers(t *testing.T) {
|
||||
dependency.RegisterGenerator(dependency.OasGeneratorDependency, func() interface{} { return oas.GetDefaultOasGeneratorInstance() })
|
||||
|
||||
recorder := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(recorder)
|
||||
oas.GetOasGeneratorInstance().Start()
|
||||
oas.GetOasGeneratorInstance().ServiceSpecs.Store("some", oas.NewGen("some"))
|
||||
oas.GetDefaultOasGeneratorInstance().Start()
|
||||
oas.GetDefaultOasGeneratorInstance().GetServiceSpecs().Store("some", oas.NewGen("some"))
|
||||
|
||||
GetOASServers(c)
|
||||
t.Logf("Written body: %s", recorder.Body.String())
|
||||
}
|
||||
|
||||
func TestGetOASAllSpecs(t *testing.T) {
|
||||
dependency.RegisterGenerator(dependency.OasGeneratorDependency, func() interface{} { return oas.GetDefaultOasGeneratorInstance() })
|
||||
|
||||
recorder := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(recorder)
|
||||
oas.GetOasGeneratorInstance().Start()
|
||||
oas.GetOasGeneratorInstance().ServiceSpecs.Store("some", oas.NewGen("some"))
|
||||
oas.GetDefaultOasGeneratorInstance().Start()
|
||||
oas.GetDefaultOasGeneratorInstance().GetServiceSpecs().Store("some", oas.NewGen("some"))
|
||||
|
||||
GetOASAllSpecs(c)
|
||||
t.Logf("Written body: %s", recorder.Body.String())
|
||||
}
|
||||
|
||||
func TestGetOASSpec(t *testing.T) {
|
||||
dependency.RegisterGenerator(dependency.OasGeneratorDependency, func() interface{} { return oas.GetDefaultOasGeneratorInstance() })
|
||||
|
||||
recorder := httptest.NewRecorder()
|
||||
c, _ := gin.CreateTestContext(recorder)
|
||||
oas.GetOasGeneratorInstance().Start()
|
||||
oas.GetOasGeneratorInstance().ServiceSpecs.Store("some", oas.NewGen("some"))
|
||||
oas.GetDefaultOasGeneratorInstance().Start()
|
||||
oas.GetDefaultOasGeneratorInstance().GetServiceSpecs().Store("some", oas.NewGen("some"))
|
||||
|
||||
c.Params = []gin.Param{{Key: "id", Value: "some"}}
|
||||
|
||||
|
@ -3,6 +3,7 @@ package controllers
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/up9inc/mizu/agent/pkg/dependency"
|
||||
"github.com/up9inc/mizu/agent/pkg/servicemap"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
@ -13,8 +14,9 @@ type ServiceMapController struct {
|
||||
}
|
||||
|
||||
func NewServiceMapController() *ServiceMapController {
|
||||
serviceMapGenerator := dependency.GetInstance(dependency.ServiceMapGeneratorDependency).(servicemap.ServiceMap)
|
||||
return &ServiceMapController{
|
||||
service: servicemap.GetInstance(),
|
||||
service: serviceMapGenerator,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -7,6 +7,7 @@ import (
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/up9inc/mizu/agent/pkg/dependency"
|
||||
"github.com/up9inc/mizu/agent/pkg/servicemap"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
@ -57,9 +58,11 @@ type ServiceMapControllerSuite struct {
|
||||
}
|
||||
|
||||
func (s *ServiceMapControllerSuite) SetupTest() {
|
||||
dependency.RegisterGenerator(dependency.ServiceMapGeneratorDependency, func() interface{} { return servicemap.GetDefaultServiceMapInstance() })
|
||||
|
||||
s.c = NewServiceMapController()
|
||||
s.c.service.Enable()
|
||||
s.c.service.NewTCPEntry(TCPEntryA, TCPEntryB, ProtocolHttp)
|
||||
s.c.service.(servicemap.ServiceMapSink).NewTCPEntry(TCPEntryA, TCPEntryB, ProtocolHttp)
|
||||
|
||||
s.w = httptest.NewRecorder()
|
||||
s.g, _ = gin.CreateTestContext(s.w)
|
||||
|
11
agent/pkg/dependency/container.go
Normal file
11
agent/pkg/dependency/container.go
Normal file
@ -0,0 +1,11 @@
|
||||
package dependency
|
||||
|
||||
var typeIntializerMap = make(map[DependencyContainerType]func() interface{}, 0)
|
||||
|
||||
func RegisterGenerator(name DependencyContainerType, fn func() interface{}) {
|
||||
typeIntializerMap[name] = fn
|
||||
}
|
||||
|
||||
func GetInstance(name DependencyContainerType) interface{} {
|
||||
return typeIntializerMap[name]()
|
||||
}
|
8
agent/pkg/dependency/type_names.go
Normal file
8
agent/pkg/dependency/type_names.go
Normal file
@ -0,0 +1,8 @@
|
||||
package dependency
|
||||
|
||||
type DependencyContainerType string
|
||||
|
||||
const (
|
||||
ServiceMapGeneratorDependency = "ServiceMapGeneratorDependency"
|
||||
OasGeneratorDependency = "OasGeneratorDependency"
|
||||
)
|
@ -147,9 +147,9 @@ func feedEntry(entry *har.Entry, source string, isSync bool, file string) {
|
||||
|
||||
ews := EntryWithSource{Entry: *entry, Source: source, Destination: u.Host, Id: uint(0)}
|
||||
if isSync {
|
||||
GetOasGeneratorInstance().entriesChan <- ews // blocking variant, right?
|
||||
GetDefaultOasGeneratorInstance().entriesChan <- ews // blocking variant, right?
|
||||
} else {
|
||||
GetOasGeneratorInstance().PushEntry(&ews)
|
||||
GetDefaultOasGeneratorInstance().PushEntry(&ews)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -12,18 +12,38 @@ import (
|
||||
|
||||
var (
|
||||
syncOnce sync.Once
|
||||
instance *oasGenerator
|
||||
instance *defaultOasGenerator
|
||||
)
|
||||
|
||||
func GetOasGeneratorInstance() *oasGenerator {
|
||||
type OasGeneratorSink interface {
|
||||
PushEntry(entryWithSource *EntryWithSource)
|
||||
}
|
||||
|
||||
type OasGenerator interface {
|
||||
Start()
|
||||
Stop()
|
||||
IsStarted() bool
|
||||
Reset()
|
||||
GetServiceSpecs() *sync.Map
|
||||
}
|
||||
|
||||
type defaultOasGenerator struct {
|
||||
started bool
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
serviceSpecs *sync.Map
|
||||
entriesChan chan EntryWithSource
|
||||
}
|
||||
|
||||
func GetDefaultOasGeneratorInstance() *defaultOasGenerator {
|
||||
syncOnce.Do(func() {
|
||||
instance = newOasGenerator()
|
||||
instance = NewDefaultOasGenerator()
|
||||
logger.Log.Debug("OAS Generator Initialized")
|
||||
})
|
||||
return instance
|
||||
}
|
||||
|
||||
func (g *oasGenerator) Start() {
|
||||
func (g *defaultOasGenerator) Start() {
|
||||
if g.started {
|
||||
return
|
||||
}
|
||||
@ -31,12 +51,12 @@ func (g *oasGenerator) Start() {
|
||||
g.cancel = cancel
|
||||
g.ctx = ctx
|
||||
g.entriesChan = make(chan EntryWithSource, 100) // buffer up to 100 entries for OAS processing
|
||||
g.ServiceSpecs = &sync.Map{}
|
||||
g.serviceSpecs = &sync.Map{}
|
||||
g.started = true
|
||||
go instance.runGenerator()
|
||||
go g.runGenerator()
|
||||
}
|
||||
|
||||
func (g *oasGenerator) Stop() {
|
||||
func (g *defaultOasGenerator) Stop() {
|
||||
if !g.started {
|
||||
return
|
||||
}
|
||||
@ -45,11 +65,11 @@ func (g *oasGenerator) Stop() {
|
||||
g.started = false
|
||||
}
|
||||
|
||||
func (g *oasGenerator) IsStarted() bool {
|
||||
func (g *defaultOasGenerator) IsStarted() bool {
|
||||
return g.started
|
||||
}
|
||||
|
||||
func (g *oasGenerator) runGenerator() {
|
||||
func (g *defaultOasGenerator) runGenerator() {
|
||||
for {
|
||||
select {
|
||||
case <-g.ctx.Done():
|
||||
@ -67,11 +87,11 @@ func (g *oasGenerator) runGenerator() {
|
||||
logger.Log.Errorf("Failed to parse entry URL: %v, err: %v", entry.Request.URL, err)
|
||||
}
|
||||
|
||||
val, found := g.ServiceSpecs.Load(entryWithSource.Destination)
|
||||
val, found := g.serviceSpecs.Load(entryWithSource.Destination)
|
||||
var gen *SpecGen
|
||||
if !found {
|
||||
gen = NewGen(u.Scheme + "://" + entryWithSource.Destination)
|
||||
g.ServiceSpecs.Store(entryWithSource.Destination, gen)
|
||||
g.serviceSpecs.Store(entryWithSource.Destination, gen)
|
||||
} else {
|
||||
gen = val.(*SpecGen)
|
||||
}
|
||||
@ -92,11 +112,11 @@ func (g *oasGenerator) runGenerator() {
|
||||
}
|
||||
}
|
||||
|
||||
func (g *oasGenerator) Reset() {
|
||||
g.ServiceSpecs = &sync.Map{}
|
||||
func (g *defaultOasGenerator) Reset() {
|
||||
g.serviceSpecs = &sync.Map{}
|
||||
}
|
||||
|
||||
func (g *oasGenerator) PushEntry(entryWithSource *EntryWithSource) {
|
||||
func (g *defaultOasGenerator) PushEntry(entryWithSource *EntryWithSource) {
|
||||
if !g.started {
|
||||
return
|
||||
}
|
||||
@ -107,12 +127,16 @@ func (g *oasGenerator) PushEntry(entryWithSource *EntryWithSource) {
|
||||
}
|
||||
}
|
||||
|
||||
func newOasGenerator() *oasGenerator {
|
||||
return &oasGenerator{
|
||||
func (g *defaultOasGenerator) GetServiceSpecs() *sync.Map {
|
||||
return g.serviceSpecs
|
||||
}
|
||||
|
||||
func NewDefaultOasGenerator() *defaultOasGenerator {
|
||||
return &defaultOasGenerator{
|
||||
started: false,
|
||||
ctx: nil,
|
||||
cancel: nil,
|
||||
ServiceSpecs: nil,
|
||||
serviceSpecs: nil,
|
||||
entriesChan: nil,
|
||||
}
|
||||
}
|
||||
@ -123,11 +147,3 @@ type EntryWithSource struct {
|
||||
Entry har.Entry
|
||||
Id uint
|
||||
}
|
||||
|
||||
type oasGenerator struct {
|
||||
started bool
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
ServiceSpecs *sync.Map
|
||||
entriesChan chan EntryWithSource
|
||||
}
|
||||
|
@ -2,10 +2,6 @@ package oas
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/chanced/openapi"
|
||||
"github.com/op/go-logging"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
"github.com/wI2L/jsondiff"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"regexp"
|
||||
@ -13,6 +9,11 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/chanced/openapi"
|
||||
"github.com/op/go-logging"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
"github.com/wI2L/jsondiff"
|
||||
|
||||
"github.com/up9inc/mizu/agent/pkg/har"
|
||||
)
|
||||
|
||||
@ -47,14 +48,14 @@ func TestEntries(t *testing.T) {
|
||||
t.Log(err)
|
||||
t.FailNow()
|
||||
}
|
||||
GetOasGeneratorInstance().Start()
|
||||
GetDefaultOasGeneratorInstance().Start()
|
||||
loadStartingOAS("test_artifacts/catalogue.json", "catalogue")
|
||||
loadStartingOAS("test_artifacts/trcc.json", "trcc-api-service")
|
||||
|
||||
go func() {
|
||||
for {
|
||||
time.Sleep(1 * time.Second)
|
||||
GetOasGeneratorInstance().ServiceSpecs.Range(func(key, val interface{}) bool {
|
||||
GetDefaultOasGeneratorInstance().GetServiceSpecs().Range(func(key, val interface{}) bool {
|
||||
svc := key.(string)
|
||||
t.Logf("Getting spec for %s", svc)
|
||||
gen := val.(*SpecGen)
|
||||
@ -76,7 +77,7 @@ func TestEntries(t *testing.T) {
|
||||
waitQueueProcessed()
|
||||
|
||||
svcs := strings.Builder{}
|
||||
GetOasGeneratorInstance().ServiceSpecs.Range(func(key, val interface{}) bool {
|
||||
GetDefaultOasGeneratorInstance().GetServiceSpecs().Range(func(key, val interface{}) bool {
|
||||
gen := val.(*SpecGen)
|
||||
svc := key.(string)
|
||||
svcs.WriteString(svc + ",")
|
||||
@ -98,7 +99,7 @@ func TestEntries(t *testing.T) {
|
||||
return true
|
||||
})
|
||||
|
||||
GetOasGeneratorInstance().ServiceSpecs.Range(func(key, val interface{}) bool {
|
||||
GetDefaultOasGeneratorInstance().GetServiceSpecs().Range(func(key, val interface{}) bool {
|
||||
svc := key.(string)
|
||||
gen := val.(*SpecGen)
|
||||
spec, err := gen.GetSpec()
|
||||
@ -122,8 +123,8 @@ func TestEntries(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestFileSingle(t *testing.T) {
|
||||
GetOasGeneratorInstance().Start()
|
||||
GetOasGeneratorInstance().Reset()
|
||||
GetDefaultOasGeneratorInstance().Start()
|
||||
GetDefaultOasGeneratorInstance().Reset()
|
||||
// loadStartingOAS()
|
||||
file := "test_artifacts/params.har"
|
||||
files := []string{file}
|
||||
@ -135,7 +136,7 @@ func TestFileSingle(t *testing.T) {
|
||||
|
||||
waitQueueProcessed()
|
||||
|
||||
GetOasGeneratorInstance().ServiceSpecs.Range(func(key, val interface{}) bool {
|
||||
GetDefaultOasGeneratorInstance().GetServiceSpecs().Range(func(key, val interface{}) bool {
|
||||
svc := key.(string)
|
||||
gen := val.(*SpecGen)
|
||||
spec, err := gen.GetSpec()
|
||||
@ -191,7 +192,7 @@ func TestFileSingle(t *testing.T) {
|
||||
func waitQueueProcessed() {
|
||||
for {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
queue := len(GetOasGeneratorInstance().entriesChan)
|
||||
queue := len(GetDefaultOasGeneratorInstance().entriesChan)
|
||||
logger.Log.Infof("Queue: %d", queue)
|
||||
if queue < 1 {
|
||||
break
|
||||
@ -221,7 +222,7 @@ func loadStartingOAS(file string, label string) {
|
||||
gen := NewGen(label)
|
||||
gen.StartFromSpec(doc)
|
||||
|
||||
GetOasGeneratorInstance().ServiceSpecs.Store(label, gen)
|
||||
GetDefaultOasGeneratorInstance().GetServiceSpecs().Store(label, gen)
|
||||
}
|
||||
|
||||
func TestEntriesNegative(t *testing.T) {
|
||||
|
@ -1,9 +1,10 @@
|
||||
package oas
|
||||
|
||||
import (
|
||||
"github.com/chanced/openapi"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/chanced/openapi"
|
||||
)
|
||||
|
||||
func TestTree(t *testing.T) {
|
||||
|
@ -10,7 +10,7 @@ import (
|
||||
restclient "k8s.io/client-go/rest"
|
||||
)
|
||||
|
||||
func NewFromInCluster(errOut chan error, namesapce string) (*Resolver, error) {
|
||||
func NewFromInCluster(errOut chan error, namespace string) (*Resolver, error) {
|
||||
config, err := restclient.InClusterConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -19,5 +19,5 @@ func NewFromInCluster(errOut chan error, namesapce string) (*Resolver, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Resolver{clientConfig: config, clientSet: clientset, nameMap: cmap.New(), serviceMap: cmap.New(), errOut: errOut, namespace: namesapce}, nil
|
||||
return &Resolver{clientConfig: config, clientSet: clientset, nameMap: cmap.New(), serviceMap: cmap.New(), errOut: errOut, namespace: namespace}, nil
|
||||
}
|
||||
|
@ -168,11 +168,13 @@ func (resolver *Resolver) watchServices(ctx context.Context) error {
|
||||
|
||||
func (resolver *Resolver) saveResolvedName(key string, resolved string, namespace string, eventType watch.EventType) {
|
||||
if eventType == watch.Deleted {
|
||||
resolver.nameMap.Remove(resolved)
|
||||
resolver.nameMap.Remove(key)
|
||||
logger.Log.Infof("setting %s=nil", key)
|
||||
} else {
|
||||
|
||||
resolver.nameMap.Set(key, &ResolvedObjectInfo{FullAddress: resolved, Namespace: namespace})
|
||||
resolver.nameMap.Set(resolved, &ResolvedObjectInfo{FullAddress: resolved, Namespace: namespace})
|
||||
logger.Log.Infof("setting %s=%s", key, resolved)
|
||||
}
|
||||
}
|
||||
|
@ -13,28 +13,31 @@ const (
|
||||
UnresolvedNodeName = "unresolved"
|
||||
)
|
||||
|
||||
var instance *serviceMap
|
||||
var instance *defaultServiceMap
|
||||
var once sync.Once
|
||||
|
||||
func GetInstance() ServiceMap {
|
||||
func GetDefaultServiceMapInstance() *defaultServiceMap {
|
||||
once.Do(func() {
|
||||
instance = newServiceMap()
|
||||
instance = NewDefaultServiceMapGenerator()
|
||||
logger.Log.Debug("Service Map Initialized")
|
||||
})
|
||||
return instance
|
||||
}
|
||||
|
||||
type serviceMap struct {
|
||||
type defaultServiceMap struct {
|
||||
enabled bool
|
||||
graph *graph
|
||||
entriesProcessed int
|
||||
}
|
||||
|
||||
type ServiceMapSink interface {
|
||||
NewTCPEntry(source *tapApi.TCP, destination *tapApi.TCP, protocol *tapApi.Protocol)
|
||||
}
|
||||
|
||||
type ServiceMap interface {
|
||||
Enable()
|
||||
Disable()
|
||||
IsEnabled() bool
|
||||
NewTCPEntry(source *tapApi.TCP, destination *tapApi.TCP, protocol *tapApi.Protocol)
|
||||
GetStatus() ServiceMapStatus
|
||||
GetNodes() []ServiceMapNode
|
||||
GetEdges() []ServiceMapEdge
|
||||
@ -44,8 +47,8 @@ type ServiceMap interface {
|
||||
Reset()
|
||||
}
|
||||
|
||||
func newServiceMap() *serviceMap {
|
||||
return &serviceMap{
|
||||
func NewDefaultServiceMapGenerator() *defaultServiceMap {
|
||||
return &defaultServiceMap{
|
||||
enabled: false,
|
||||
entriesProcessed: 0,
|
||||
graph: newDirectedGraph(),
|
||||
@ -105,12 +108,12 @@ func newEdgeData(p *tapApi.Protocol) *edgeData {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *serviceMap) nodeExists(k key) (*nodeData, bool) {
|
||||
func (s *defaultServiceMap) nodeExists(k key) (*nodeData, bool) {
|
||||
n, ok := s.graph.Nodes[k]
|
||||
return n, ok
|
||||
}
|
||||
|
||||
func (s *serviceMap) addNode(k key, e *tapApi.TCP) (*nodeData, bool) {
|
||||
func (s *defaultServiceMap) addNode(k key, e *tapApi.TCP) (*nodeData, bool) {
|
||||
nd, exists := s.nodeExists(k)
|
||||
if !exists {
|
||||
s.graph.Nodes[k] = newNodeData(len(s.graph.Nodes)+1, e)
|
||||
@ -119,7 +122,7 @@ func (s *serviceMap) addNode(k key, e *tapApi.TCP) (*nodeData, bool) {
|
||||
return nd, false
|
||||
}
|
||||
|
||||
func (s *serviceMap) addEdge(u, v *entryData, p *tapApi.Protocol) {
|
||||
func (s *defaultServiceMap) addEdge(u, v *entryData, p *tapApi.Protocol) {
|
||||
if n, ok := s.addNode(u.key, u.entry); !ok {
|
||||
n.count++
|
||||
}
|
||||
@ -156,20 +159,20 @@ func (s *serviceMap) addEdge(u, v *entryData, p *tapApi.Protocol) {
|
||||
s.entriesProcessed++
|
||||
}
|
||||
|
||||
func (s *serviceMap) Enable() {
|
||||
func (s *defaultServiceMap) Enable() {
|
||||
s.enabled = true
|
||||
}
|
||||
|
||||
func (s *serviceMap) Disable() {
|
||||
func (s *defaultServiceMap) Disable() {
|
||||
s.Reset()
|
||||
s.enabled = false
|
||||
}
|
||||
|
||||
func (s *serviceMap) IsEnabled() bool {
|
||||
func (s *defaultServiceMap) IsEnabled() bool {
|
||||
return s.enabled
|
||||
}
|
||||
|
||||
func (s *serviceMap) NewTCPEntry(src *tapApi.TCP, dst *tapApi.TCP, p *tapApi.Protocol) {
|
||||
func (s *defaultServiceMap) NewTCPEntry(src *tapApi.TCP, dst *tapApi.TCP, p *tapApi.Protocol) {
|
||||
if !s.IsEnabled() {
|
||||
return
|
||||
}
|
||||
@ -206,7 +209,7 @@ func (s *serviceMap) NewTCPEntry(src *tapApi.TCP, dst *tapApi.TCP, p *tapApi.Pro
|
||||
s.addEdge(srcEntry, dstEntry, p)
|
||||
}
|
||||
|
||||
func (s *serviceMap) GetStatus() ServiceMapStatus {
|
||||
func (s *defaultServiceMap) GetStatus() ServiceMapStatus {
|
||||
status := ServiceMapDisabled
|
||||
if s.IsEnabled() {
|
||||
status = ServiceMapEnabled
|
||||
@ -220,7 +223,7 @@ func (s *serviceMap) GetStatus() ServiceMapStatus {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *serviceMap) GetNodes() []ServiceMapNode {
|
||||
func (s *defaultServiceMap) GetNodes() []ServiceMapNode {
|
||||
var nodes []ServiceMapNode
|
||||
for i, n := range s.graph.Nodes {
|
||||
nodes = append(nodes, ServiceMapNode{
|
||||
@ -233,7 +236,7 @@ func (s *serviceMap) GetNodes() []ServiceMapNode {
|
||||
return nodes
|
||||
}
|
||||
|
||||
func (s *serviceMap) GetEdges() []ServiceMapEdge {
|
||||
func (s *defaultServiceMap) GetEdges() []ServiceMapEdge {
|
||||
var edges []ServiceMapEdge
|
||||
for u, m := range s.graph.Edges {
|
||||
for v := range m {
|
||||
@ -260,15 +263,15 @@ func (s *serviceMap) GetEdges() []ServiceMapEdge {
|
||||
return edges
|
||||
}
|
||||
|
||||
func (s *serviceMap) GetEntriesProcessedCount() int {
|
||||
func (s *defaultServiceMap) GetEntriesProcessedCount() int {
|
||||
return s.entriesProcessed
|
||||
}
|
||||
|
||||
func (s *serviceMap) GetNodesCount() int {
|
||||
func (s *defaultServiceMap) GetNodesCount() int {
|
||||
return len(s.graph.Nodes)
|
||||
}
|
||||
|
||||
func (s *serviceMap) GetEdgesCount() int {
|
||||
func (s *defaultServiceMap) GetEdgesCount() int {
|
||||
var count int
|
||||
for u, m := range s.graph.Edges {
|
||||
for v := range m {
|
||||
@ -280,7 +283,7 @@ func (s *serviceMap) GetEdgesCount() int {
|
||||
return count
|
||||
}
|
||||
|
||||
func (s *serviceMap) Reset() {
|
||||
func (s *defaultServiceMap) Reset() {
|
||||
s.entriesProcessed = 0
|
||||
s.graph = newDirectedGraph()
|
||||
}
|
||||
|
@ -80,21 +80,21 @@ var (
|
||||
type ServiceMapDisabledSuite struct {
|
||||
suite.Suite
|
||||
|
||||
instance ServiceMap
|
||||
instance *defaultServiceMap
|
||||
}
|
||||
|
||||
type ServiceMapEnabledSuite struct {
|
||||
suite.Suite
|
||||
|
||||
instance ServiceMap
|
||||
instance *defaultServiceMap
|
||||
}
|
||||
|
||||
func (s *ServiceMapDisabledSuite) SetupTest() {
|
||||
s.instance = GetInstance()
|
||||
s.instance = GetDefaultServiceMapInstance()
|
||||
}
|
||||
|
||||
func (s *ServiceMapEnabledSuite) SetupTest() {
|
||||
s.instance = GetInstance()
|
||||
s.instance = GetDefaultServiceMapInstance()
|
||||
s.instance.Enable()
|
||||
}
|
||||
|
||||
@ -107,7 +107,7 @@ func (s *ServiceMapDisabledSuite) TestServiceMapInstance() {
|
||||
func (s *ServiceMapDisabledSuite) TestServiceMapSingletonInstance() {
|
||||
assert := s.Assert()
|
||||
|
||||
instance2 := GetInstance()
|
||||
instance2 := GetDefaultServiceMapInstance()
|
||||
|
||||
assert.NotNil(s.instance)
|
||||
assert.NotNil(instance2)
|
||||
|
Loading…
Reference in New Issue
Block a user