Support stopping oas/servicemesh/telemetry in flight (#869)

* Update oas_generator.go and servicemap.go

* Update oas_generator.go

* Update esClient.go

* Update servicemap.go
This commit is contained in:
RamiBerm 2022-03-02 09:49:17 +02:00 committed by GitHub
parent c5471c501b
commit 346e904e77
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 30 additions and 7 deletions

View File

@ -4,13 +4,14 @@ import (
"bytes"
"crypto/tls"
"encoding/json"
"net/http"
"sync"
"time"
"github.com/elastic/go-elasticsearch/v7"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap/api"
"net/http"
"sync"
"time"
)
type client struct {
@ -31,6 +32,9 @@ func GetInstance() *client {
func (client *client) Configure(config shared.ElasticConfig) {
if config.Url == "" || config.User == "" || config.Password == "" {
if client.es != nil {
client.es = nil
}
logger.Log.Infof("No elastic configuration was supplied, elastic exporter disabled")
return
}
@ -46,13 +50,13 @@ func (client *client) Configure(config shared.ElasticConfig) {
es, err := elasticsearch.NewClient(cfg)
if err != nil {
logger.Log.Fatalf("Failed to initialize elastic client %v", err)
logger.Log.Errorf("Failed to initialize elastic client %v", err)
}
// Have the client instance return a response
res, err := es.Info()
if err != nil {
logger.Log.Fatalf("Elastic client.Info() ERROR: %v", err)
logger.Log.Errorf("Elastic client.Info() ERROR: %v", err)
} else {
client.es = es
client.index = "mizu_traffic_http_" + time.Now().Format("2006_01_02_15_04")

View File

@ -33,10 +33,23 @@ func (g *oasGenerator) Start() {
g.entriesChan = make(chan EntryWithSource, 100) // buffer up to 100 entries for OAS processing
g.ServiceSpecs = &sync.Map{}
g.started = true
go instance.runGeneretor()
go instance.runGenerator()
}
func (g *oasGenerator) runGeneretor() {
func (g *oasGenerator) Stop() {
if !g.started {
return
}
g.cancel()
g.Reset()
g.started = false
}
func (g *oasGenerator) IsStarted() bool {
return g.started
}
func (g *oasGenerator) runGenerator() {
for {
select {
case <-g.ctx.Done():

View File

@ -32,6 +32,7 @@ type serviceMap struct {
type ServiceMap interface {
Enable()
Disable()
IsEnabled() bool
NewTCPEntry(source *tapApi.TCP, destination *tapApi.TCP, protocol *tapApi.Protocol)
GetStatus() ServiceMapStatus
@ -159,6 +160,11 @@ func (s *serviceMap) Enable() {
s.enabled = true
}
func (s *serviceMap) Disable() {
s.Reset()
s.enabled = false
}
func (s *serviceMap) IsEnabled() bool {
return s.enabled
}