From 346e904e770d4337bfb835cebfcd3832919dd68d Mon Sep 17 00:00:00 2001 From: RamiBerm <54766858+RamiBerm@users.noreply.github.com> Date: Wed, 2 Mar 2022 09:49:17 +0200 Subject: [PATCH] Support stopping oas/servicemesh/telemetry in flight (#869) * Update oas_generator.go and servicemap.go * Update oas_generator.go * Update esClient.go * Update servicemap.go --- agent/pkg/elastic/esClient.go | 14 +++++++++----- agent/pkg/oas/oas_generator.go | 17 +++++++++++++++-- agent/pkg/servicemap/servicemap.go | 6 ++++++ 3 files changed, 30 insertions(+), 7 deletions(-) diff --git a/agent/pkg/elastic/esClient.go b/agent/pkg/elastic/esClient.go index acedb8579..9ccc30ebb 100644 --- a/agent/pkg/elastic/esClient.go +++ b/agent/pkg/elastic/esClient.go @@ -4,13 +4,14 @@ import ( "bytes" "crypto/tls" "encoding/json" + "net/http" + "sync" + "time" + "github.com/elastic/go-elasticsearch/v7" "github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/tap/api" - "net/http" - "sync" - "time" ) type client struct { @@ -31,6 +32,9 @@ func GetInstance() *client { func (client *client) Configure(config shared.ElasticConfig) { if config.Url == "" || config.User == "" || config.Password == "" { + if client.es != nil { + client.es = nil + } logger.Log.Infof("No elastic configuration was supplied, elastic exporter disabled") return } @@ -46,13 +50,13 @@ func (client *client) Configure(config shared.ElasticConfig) { es, err := elasticsearch.NewClient(cfg) if err != nil { - logger.Log.Fatalf("Failed to initialize elastic client %v", err) + logger.Log.Errorf("Failed to initialize elastic client %v", err) } // Have the client instance return a response res, err := es.Info() if err != nil { - logger.Log.Fatalf("Elastic client.Info() ERROR: %v", err) + logger.Log.Errorf("Elastic client.Info() ERROR: %v", err) } else { client.es = es client.index = "mizu_traffic_http_" + time.Now().Format("2006_01_02_15_04") diff --git a/agent/pkg/oas/oas_generator.go b/agent/pkg/oas/oas_generator.go index df7278325..76d9a550b 100644 --- a/agent/pkg/oas/oas_generator.go +++ b/agent/pkg/oas/oas_generator.go @@ -33,10 +33,23 @@ func (g *oasGenerator) Start() { g.entriesChan = make(chan EntryWithSource, 100) // buffer up to 100 entries for OAS processing g.ServiceSpecs = &sync.Map{} g.started = true - go instance.runGeneretor() + go instance.runGenerator() } -func (g *oasGenerator) runGeneretor() { +func (g *oasGenerator) Stop() { + if !g.started { + return + } + g.cancel() + g.Reset() + g.started = false +} + +func (g *oasGenerator) IsStarted() bool { + return g.started +} + +func (g *oasGenerator) runGenerator() { for { select { case <-g.ctx.Done(): diff --git a/agent/pkg/servicemap/servicemap.go b/agent/pkg/servicemap/servicemap.go index 3f7359feb..e27585d3a 100644 --- a/agent/pkg/servicemap/servicemap.go +++ b/agent/pkg/servicemap/servicemap.go @@ -32,6 +32,7 @@ type serviceMap struct { type ServiceMap interface { Enable() + Disable() IsEnabled() bool NewTCPEntry(source *tapApi.TCP, destination *tapApi.TCP, protocol *tapApi.Protocol) GetStatus() ServiceMapStatus @@ -159,6 +160,11 @@ func (s *serviceMap) Enable() { s.enabled = true } +func (s *serviceMap) Disable() { + s.Reset() + s.enabled = false +} + func (s *serviceMap) IsEnabled() bool { return s.enabled }