From 0f3dd66d2df63bd384c11dce1f94207811f38b27 Mon Sep 17 00:00:00 2001 From: Igor Gov Date: Sun, 30 Jan 2022 09:22:13 +0200 Subject: [PATCH] Experimental feature: elastic exporter (#713) --- agent/go.mod | 1 + agent/go.sum | 2 + agent/main.go | 2 + agent/pkg/api/main.go | 9 ++- agent/pkg/elastic/esClient.go | 120 ++++++++++++++++++++++++++++++++++ cli/cmd/installRunner.go | 1 + cli/cmd/tapRunner.go | 1 + cli/config/configStruct.go | 1 + shared/models.go | 7 ++ 9 files changed, 141 insertions(+), 3 deletions(-) create mode 100644 agent/pkg/elastic/esClient.go diff --git a/agent/go.mod b/agent/go.mod index aca0c636e..dd8d8e4b9 100644 --- a/agent/go.mod +++ b/agent/go.mod @@ -6,6 +6,7 @@ require ( github.com/antelman107/net-wait-go v0.0.0-20210623112055-cf684aebda7b github.com/chanced/openapi v0.0.6 github.com/djherbis/atime v1.0.0 + github.com/elastic/go-elasticsearch/v7 v7.16.0 github.com/getkin/kin-openapi v0.76.0 github.com/gin-contrib/static v0.0.1 github.com/gin-gonic/gin v1.7.7 diff --git a/agent/go.sum b/agent/go.sum index 59848b5e8..4d036ad33 100644 --- a/agent/go.sum +++ b/agent/go.sum @@ -126,6 +126,8 @@ github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDD github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/elastic/go-elasticsearch/v7 v7.16.0 h1:GHsxDFXIAlhSleXun4kwA89P7kQFADRChqvgOPeYP5A= +github.com/elastic/go-elasticsearch/v7 v7.16.0/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4= github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153 h1:yUdfgN0XgIJw7foRItutHYUIhlcKzcSf5vDpdhQAKTc= github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= diff --git a/agent/main.go b/agent/main.go index 9c293b388..c7a4d32e9 100644 --- a/agent/main.go +++ b/agent/main.go @@ -9,6 +9,7 @@ import ( "mizuserver/pkg/api" "mizuserver/pkg/config" "mizuserver/pkg/controllers" + "mizuserver/pkg/elastic" "mizuserver/pkg/middlewares" "mizuserver/pkg/models" "mizuserver/pkg/oas" @@ -159,6 +160,7 @@ func enableExpFeatureIfNeeded() { if config.Config.ServiceMap { servicemap.GetInstance().SetConfig(config.Config) } + elastic.GetInstance().Configure(config.Config.Elastic) } func configureBasenineServer(host string, port string) { diff --git a/agent/pkg/api/main.go b/agent/pkg/api/main.go index f70b8d3f0..93c00bbb0 100644 --- a/agent/pkg/api/main.go +++ b/agent/pkg/api/main.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "fmt" + "mizuserver/pkg/elastic" "mizuserver/pkg/har" "mizuserver/pkg/holder" "mizuserver/pkg/providers" @@ -16,14 +17,15 @@ import ( "mizuserver/pkg/servicemap" - "github.com/up9inc/mizu/shared" - "github.com/up9inc/mizu/shared/logger" - tapApi "github.com/up9inc/mizu/tap/api" "mizuserver/pkg/models" "mizuserver/pkg/oas" "mizuserver/pkg/resolver" "mizuserver/pkg/utils" + "github.com/up9inc/mizu/shared" + "github.com/up9inc/mizu/shared/logger" + tapApi "github.com/up9inc/mizu/tap/api" + basenine "github.com/up9inc/basenine/client/go" ) @@ -150,6 +152,7 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension connection.SendText(string(data)) servicemap.GetInstance().NewTCPEntry(mizuEntry.Source, mizuEntry.Destination, &item.Protocol) + elastic.GetInstance().PushEntry(mizuEntry) } } diff --git a/agent/pkg/elastic/esClient.go b/agent/pkg/elastic/esClient.go new file mode 100644 index 000000000..acedb8579 --- /dev/null +++ b/agent/pkg/elastic/esClient.go @@ -0,0 +1,120 @@ +package elastic + +import ( + "bytes" + "crypto/tls" + "encoding/json" + "github.com/elastic/go-elasticsearch/v7" + "github.com/up9inc/mizu/shared" + "github.com/up9inc/mizu/shared/logger" + "github.com/up9inc/mizu/tap/api" + "net/http" + "sync" + "time" +) + +type client struct { + es *elasticsearch.Client + index string + insertedCount int +} + +var instance *client +var once sync.Once + +func GetInstance() *client { + once.Do(func() { + instance = newClient() + }) + return instance +} + +func (client *client) Configure(config shared.ElasticConfig) { + if config.Url == "" || config.User == "" || config.Password == "" { + logger.Log.Infof("No elastic configuration was supplied, elastic exporter disabled") + return + } + transport := http.DefaultTransport + tlsClientConfig := &tls.Config{InsecureSkipVerify: true} + transport.(*http.Transport).TLSClientConfig = tlsClientConfig + cfg := elasticsearch.Config{ + Addresses: []string{config.Url}, + Username: config.User, + Password: config.Password, + Transport: transport, + } + + es, err := elasticsearch.NewClient(cfg) + if err != nil { + logger.Log.Fatalf("Failed to initialize elastic client %v", err) + } + + // Have the client instance return a response + res, err := es.Info() + if err != nil { + logger.Log.Fatalf("Elastic client.Info() ERROR: %v", err) + } else { + client.es = es + client.index = "mizu_traffic_http_" + time.Now().Format("2006_01_02_15_04") + client.insertedCount = 0 + logger.Log.Infof("Elastic client configured, index: %s, cluster info: %v", client.index, res) + } + defer res.Body.Close() +} + +func newClient() *client { + return &client{ + es: nil, + index: "", + } +} + +type httpEntry struct { + Source *api.TCP `json:"src"` + Destination *api.TCP `json:"dst"` + Outgoing bool `json:"outgoing"` + CreatedAt time.Time `json:"createdAt"` + Request map[string]interface{} `json:"request"` + Response map[string]interface{} `json:"response"` + Summary string `json:"summary"` + Method string `json:"method"` + Status int `json:"status"` + ElapsedTime int64 `json:"elapsedTime"` + Path string `json:"path"` +} + +func (client *client) PushEntry(entry *api.Entry) { + if client.es == nil { + return + } + + if entry.Protocol.Name != "http" { + return + } + + entryToPush := httpEntry{ + Source: entry.Source, + Destination: entry.Destination, + Outgoing: entry.Outgoing, + CreatedAt: entry.StartTime, + Request: entry.Request, + Response: entry.Response, + Summary: entry.Summary, + Method: entry.Method, + Status: entry.Status, + ElapsedTime: entry.ElapsedTime, + Path: entry.Path, + } + + entryJson, err := json.Marshal(entryToPush) + if err != nil { + logger.Log.Errorf("json.Marshal ERROR: %v", err) + return + } + var buffer bytes.Buffer + buffer.WriteString(string(entryJson)) + res, _ := client.es.Index(client.index, &buffer) + if res.StatusCode == 201 { + client.insertedCount += 1 + } +} diff --git a/cli/cmd/installRunner.go b/cli/cmd/installRunner.go index d2cd51384..99aa57a14 100644 --- a/cli/cmd/installRunner.go +++ b/cli/cmd/installRunner.go @@ -74,6 +74,7 @@ func getInstallMizuAgentConfig(maxDBSizeBytes int64, tapperResources shared.Reso StandaloneMode: true, ServiceMap: config.Config.ServiceMap, OAS: config.Config.OAS, + Elastic: config.Config.Elastic, } return &mizuAgentConfig diff --git a/cli/cmd/tapRunner.go b/cli/cmd/tapRunner.go index 4b8a378ef..bace705d6 100644 --- a/cli/cmd/tapRunner.go +++ b/cli/cmd/tapRunner.go @@ -164,6 +164,7 @@ func getTapMizuAgentConfig() *shared.MizuAgentConfig { AgentDatabasePath: shared.DataDirPath, ServiceMap: config.Config.ServiceMap, OAS: config.Config.OAS, + Elastic: config.Config.Elastic, } return &mizuAgentConfig diff --git a/cli/config/configStruct.go b/cli/config/configStruct.go index ecfba8c89..01cd5c537 100644 --- a/cli/config/configStruct.go +++ b/cli/config/configStruct.go @@ -40,6 +40,7 @@ type ConfigStruct struct { LogLevelStr string `yaml:"log-level,omitempty" default:"INFO" readonly:""` ServiceMap bool `yaml:"service-map,omitempty" default:"false" readonly:""` OAS bool `yaml:"oas,omitempty" default:"false" readonly:""` + Elastic shared.ElasticConfig `yaml:"elastic"` } func (config *ConfigStruct) validate() error { diff --git a/shared/models.go b/shared/models.go index 912061a01..4d415aced 100644 --- a/shared/models.go +++ b/shared/models.go @@ -43,6 +43,13 @@ type MizuAgentConfig struct { StandaloneMode bool `json:"standaloneMode"` ServiceMap bool `json:"serviceMap"` OAS bool `json:"oas"` + Elastic ElasticConfig `json:"elastic"` +} + +type ElasticConfig struct { + User string `yaml:"user,omitempty" default:"" readonly:""` + Password string `yaml:"password,omitempty" default:"" readonly:""` + Url string `yaml:"url,omitempty" default:"" readonly:""` } type WebSocketMessageMetadata struct {