mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-06-24 23:34:45 +00:00
Experimental feature: elastic exporter (#713)
This commit is contained in:
parent
5536e5bb44
commit
0f3dd66d2d
@ -6,6 +6,7 @@ require (
|
|||||||
github.com/antelman107/net-wait-go v0.0.0-20210623112055-cf684aebda7b
|
github.com/antelman107/net-wait-go v0.0.0-20210623112055-cf684aebda7b
|
||||||
github.com/chanced/openapi v0.0.6
|
github.com/chanced/openapi v0.0.6
|
||||||
github.com/djherbis/atime v1.0.0
|
github.com/djherbis/atime v1.0.0
|
||||||
|
github.com/elastic/go-elasticsearch/v7 v7.16.0
|
||||||
github.com/getkin/kin-openapi v0.76.0
|
github.com/getkin/kin-openapi v0.76.0
|
||||||
github.com/gin-contrib/static v0.0.1
|
github.com/gin-contrib/static v0.0.1
|
||||||
github.com/gin-gonic/gin v1.7.7
|
github.com/gin-gonic/gin v1.7.7
|
||||||
|
@ -126,6 +126,8 @@ github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDD
|
|||||||
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
|
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
|
||||||
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
|
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
|
||||||
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
|
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
|
||||||
|
github.com/elastic/go-elasticsearch/v7 v7.16.0 h1:GHsxDFXIAlhSleXun4kwA89P7kQFADRChqvgOPeYP5A=
|
||||||
|
github.com/elastic/go-elasticsearch/v7 v7.16.0/go.mod h1:OJ4wdbtDNk5g503kvlHLyErCgQwwzmDtaFC4XyOxXA4=
|
||||||
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153 h1:yUdfgN0XgIJw7foRItutHYUIhlcKzcSf5vDpdhQAKTc=
|
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153 h1:yUdfgN0XgIJw7foRItutHYUIhlcKzcSf5vDpdhQAKTc=
|
||||||
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
|
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
|
||||||
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
|
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
|
||||||
|
@ -9,6 +9,7 @@ import (
|
|||||||
"mizuserver/pkg/api"
|
"mizuserver/pkg/api"
|
||||||
"mizuserver/pkg/config"
|
"mizuserver/pkg/config"
|
||||||
"mizuserver/pkg/controllers"
|
"mizuserver/pkg/controllers"
|
||||||
|
"mizuserver/pkg/elastic"
|
||||||
"mizuserver/pkg/middlewares"
|
"mizuserver/pkg/middlewares"
|
||||||
"mizuserver/pkg/models"
|
"mizuserver/pkg/models"
|
||||||
"mizuserver/pkg/oas"
|
"mizuserver/pkg/oas"
|
||||||
@ -159,6 +160,7 @@ func enableExpFeatureIfNeeded() {
|
|||||||
if config.Config.ServiceMap {
|
if config.Config.ServiceMap {
|
||||||
servicemap.GetInstance().SetConfig(config.Config)
|
servicemap.GetInstance().SetConfig(config.Config)
|
||||||
}
|
}
|
||||||
|
elastic.GetInstance().Configure(config.Config.Elastic)
|
||||||
}
|
}
|
||||||
|
|
||||||
func configureBasenineServer(host string, port string) {
|
func configureBasenineServer(host string, port string) {
|
||||||
|
@ -5,6 +5,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"mizuserver/pkg/elastic"
|
||||||
"mizuserver/pkg/har"
|
"mizuserver/pkg/har"
|
||||||
"mizuserver/pkg/holder"
|
"mizuserver/pkg/holder"
|
||||||
"mizuserver/pkg/providers"
|
"mizuserver/pkg/providers"
|
||||||
@ -16,14 +17,15 @@ import (
|
|||||||
|
|
||||||
"mizuserver/pkg/servicemap"
|
"mizuserver/pkg/servicemap"
|
||||||
|
|
||||||
"github.com/up9inc/mizu/shared"
|
|
||||||
"github.com/up9inc/mizu/shared/logger"
|
|
||||||
tapApi "github.com/up9inc/mizu/tap/api"
|
|
||||||
"mizuserver/pkg/models"
|
"mizuserver/pkg/models"
|
||||||
"mizuserver/pkg/oas"
|
"mizuserver/pkg/oas"
|
||||||
"mizuserver/pkg/resolver"
|
"mizuserver/pkg/resolver"
|
||||||
"mizuserver/pkg/utils"
|
"mizuserver/pkg/utils"
|
||||||
|
|
||||||
|
"github.com/up9inc/mizu/shared"
|
||||||
|
"github.com/up9inc/mizu/shared/logger"
|
||||||
|
tapApi "github.com/up9inc/mizu/tap/api"
|
||||||
|
|
||||||
basenine "github.com/up9inc/basenine/client/go"
|
basenine "github.com/up9inc/basenine/client/go"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -150,6 +152,7 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
|
|||||||
connection.SendText(string(data))
|
connection.SendText(string(data))
|
||||||
|
|
||||||
servicemap.GetInstance().NewTCPEntry(mizuEntry.Source, mizuEntry.Destination, &item.Protocol)
|
servicemap.GetInstance().NewTCPEntry(mizuEntry.Source, mizuEntry.Destination, &item.Protocol)
|
||||||
|
elastic.GetInstance().PushEntry(mizuEntry)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
120
agent/pkg/elastic/esClient.go
Normal file
120
agent/pkg/elastic/esClient.go
Normal file
@ -0,0 +1,120 @@
|
|||||||
|
package elastic
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"crypto/tls"
|
||||||
|
"encoding/json"
|
||||||
|
"github.com/elastic/go-elasticsearch/v7"
|
||||||
|
"github.com/up9inc/mizu/shared"
|
||||||
|
"github.com/up9inc/mizu/shared/logger"
|
||||||
|
"github.com/up9inc/mizu/tap/api"
|
||||||
|
"net/http"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type client struct {
|
||||||
|
es *elasticsearch.Client
|
||||||
|
index string
|
||||||
|
insertedCount int
|
||||||
|
}
|
||||||
|
|
||||||
|
var instance *client
|
||||||
|
var once sync.Once
|
||||||
|
|
||||||
|
func GetInstance() *client {
|
||||||
|
once.Do(func() {
|
||||||
|
instance = newClient()
|
||||||
|
})
|
||||||
|
return instance
|
||||||
|
}
|
||||||
|
|
||||||
|
func (client *client) Configure(config shared.ElasticConfig) {
|
||||||
|
if config.Url == "" || config.User == "" || config.Password == "" {
|
||||||
|
logger.Log.Infof("No elastic configuration was supplied, elastic exporter disabled")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
transport := http.DefaultTransport
|
||||||
|
tlsClientConfig := &tls.Config{InsecureSkipVerify: true}
|
||||||
|
transport.(*http.Transport).TLSClientConfig = tlsClientConfig
|
||||||
|
cfg := elasticsearch.Config{
|
||||||
|
Addresses: []string{config.Url},
|
||||||
|
Username: config.User,
|
||||||
|
Password: config.Password,
|
||||||
|
Transport: transport,
|
||||||
|
}
|
||||||
|
|
||||||
|
es, err := elasticsearch.NewClient(cfg)
|
||||||
|
if err != nil {
|
||||||
|
logger.Log.Fatalf("Failed to initialize elastic client %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Have the client instance return a response
|
||||||
|
res, err := es.Info()
|
||||||
|
if err != nil {
|
||||||
|
logger.Log.Fatalf("Elastic client.Info() ERROR: %v", err)
|
||||||
|
} else {
|
||||||
|
client.es = es
|
||||||
|
client.index = "mizu_traffic_http_" + time.Now().Format("2006_01_02_15_04")
|
||||||
|
client.insertedCount = 0
|
||||||
|
logger.Log.Infof("Elastic client configured, index: %s, cluster info: %v", client.index, res)
|
||||||
|
}
|
||||||
|
defer res.Body.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
func newClient() *client {
|
||||||
|
return &client{
|
||||||
|
es: nil,
|
||||||
|
index: "",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type httpEntry struct {
|
||||||
|
Source *api.TCP `json:"src"`
|
||||||
|
Destination *api.TCP `json:"dst"`
|
||||||
|
Outgoing bool `json:"outgoing"`
|
||||||
|
CreatedAt time.Time `json:"createdAt"`
|
||||||
|
Request map[string]interface{} `json:"request"`
|
||||||
|
Response map[string]interface{} `json:"response"`
|
||||||
|
Summary string `json:"summary"`
|
||||||
|
Method string `json:"method"`
|
||||||
|
Status int `json:"status"`
|
||||||
|
ElapsedTime int64 `json:"elapsedTime"`
|
||||||
|
Path string `json:"path"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (client *client) PushEntry(entry *api.Entry) {
|
||||||
|
if client.es == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if entry.Protocol.Name != "http" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
entryToPush := httpEntry{
|
||||||
|
Source: entry.Source,
|
||||||
|
Destination: entry.Destination,
|
||||||
|
Outgoing: entry.Outgoing,
|
||||||
|
CreatedAt: entry.StartTime,
|
||||||
|
Request: entry.Request,
|
||||||
|
Response: entry.Response,
|
||||||
|
Summary: entry.Summary,
|
||||||
|
Method: entry.Method,
|
||||||
|
Status: entry.Status,
|
||||||
|
ElapsedTime: entry.ElapsedTime,
|
||||||
|
Path: entry.Path,
|
||||||
|
}
|
||||||
|
|
||||||
|
entryJson, err := json.Marshal(entryToPush)
|
||||||
|
if err != nil {
|
||||||
|
logger.Log.Errorf("json.Marshal ERROR: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var buffer bytes.Buffer
|
||||||
|
buffer.WriteString(string(entryJson))
|
||||||
|
res, _ := client.es.Index(client.index, &buffer)
|
||||||
|
if res.StatusCode == 201 {
|
||||||
|
client.insertedCount += 1
|
||||||
|
}
|
||||||
|
}
|
@ -74,6 +74,7 @@ func getInstallMizuAgentConfig(maxDBSizeBytes int64, tapperResources shared.Reso
|
|||||||
StandaloneMode: true,
|
StandaloneMode: true,
|
||||||
ServiceMap: config.Config.ServiceMap,
|
ServiceMap: config.Config.ServiceMap,
|
||||||
OAS: config.Config.OAS,
|
OAS: config.Config.OAS,
|
||||||
|
Elastic: config.Config.Elastic,
|
||||||
}
|
}
|
||||||
|
|
||||||
return &mizuAgentConfig
|
return &mizuAgentConfig
|
||||||
|
@ -164,6 +164,7 @@ func getTapMizuAgentConfig() *shared.MizuAgentConfig {
|
|||||||
AgentDatabasePath: shared.DataDirPath,
|
AgentDatabasePath: shared.DataDirPath,
|
||||||
ServiceMap: config.Config.ServiceMap,
|
ServiceMap: config.Config.ServiceMap,
|
||||||
OAS: config.Config.OAS,
|
OAS: config.Config.OAS,
|
||||||
|
Elastic: config.Config.Elastic,
|
||||||
}
|
}
|
||||||
|
|
||||||
return &mizuAgentConfig
|
return &mizuAgentConfig
|
||||||
|
@ -40,6 +40,7 @@ type ConfigStruct struct {
|
|||||||
LogLevelStr string `yaml:"log-level,omitempty" default:"INFO" readonly:""`
|
LogLevelStr string `yaml:"log-level,omitempty" default:"INFO" readonly:""`
|
||||||
ServiceMap bool `yaml:"service-map,omitempty" default:"false" readonly:""`
|
ServiceMap bool `yaml:"service-map,omitempty" default:"false" readonly:""`
|
||||||
OAS bool `yaml:"oas,omitempty" default:"false" readonly:""`
|
OAS bool `yaml:"oas,omitempty" default:"false" readonly:""`
|
||||||
|
Elastic shared.ElasticConfig `yaml:"elastic"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func (config *ConfigStruct) validate() error {
|
func (config *ConfigStruct) validate() error {
|
||||||
|
@ -43,6 +43,13 @@ type MizuAgentConfig struct {
|
|||||||
StandaloneMode bool `json:"standaloneMode"`
|
StandaloneMode bool `json:"standaloneMode"`
|
||||||
ServiceMap bool `json:"serviceMap"`
|
ServiceMap bool `json:"serviceMap"`
|
||||||
OAS bool `json:"oas"`
|
OAS bool `json:"oas"`
|
||||||
|
Elastic ElasticConfig `json:"elastic"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ElasticConfig struct {
|
||||||
|
User string `yaml:"user,omitempty" default:"" readonly:""`
|
||||||
|
Password string `yaml:"password,omitempty" default:"" readonly:""`
|
||||||
|
Url string `yaml:"url,omitempty" default:"" readonly:""`
|
||||||
}
|
}
|
||||||
|
|
||||||
type WebSocketMessageMetadata struct {
|
type WebSocketMessageMetadata struct {
|
||||||
|
Loading…
Reference in New Issue
Block a user