mirror of https://github.com/kubeshark/kubeshark.git (synced 2025-07-05 04:18:50 +00:00)
354 lines
9.9 KiB
Go
package up9

import (
	"bytes"
	"compress/zlib"
	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"
	"net/url"
	"regexp"
	"strings"
	"sync"
	"time"

	"github.com/up9inc/mizu/agent/pkg/har"
	"github.com/up9inc/mizu/agent/pkg/utils"

	basenine "github.com/up9inc/basenine/client/go"
	"github.com/up9inc/mizu/logger"
	"github.com/up9inc/mizu/shared"
	tapApi "github.com/up9inc/mizu/tap/api"
)

const (
	AnalyzeCheckSleepTime = 5 * time.Second
	SentCountLogInterval  = 100
)

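// GuestToken is the response returned by the UP9 backend when an anonymous (guest) token is requested.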
type GuestToken struct {
	Token string `json:"token"`
	Model string `json:"model"`
}

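// ModelStatus is the relevant part of the remote model status response; a LastMajorGeneration greater than zero means the model has been generated at least once.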
type ModelStatus struct {
	LastMajorGeneration float64 `json:"lastMajorGeneration"`
}

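// GetRemoteUrl returns the URL of the remote UP9 view for the analyzed traffic: a share link in guest mode, the workspace page otherwise.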
func GetRemoteUrl(analyzeDestination string, analyzeModel string, analyzeToken string, guestMode bool) string {
	if guestMode {
		return fmt.Sprintf("https://%s/share/%s", analyzeDestination, analyzeToken)
	}

	return fmt.Sprintf("https://%s/app/workspaces/%s", analyzeDestination, analyzeModel)
}

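// CheckIfModelReady queries the model status endpoint and reports whether the remote model has completed at least one major generation. Request errors are treated as "not ready".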
func CheckIfModelReady(analyzeDestination string, analyzeModel string, analyzeToken string, guestMode bool) bool {
	statusUrl, _ := url.Parse(fmt.Sprintf("https://trcc.%s/models/%s/status", analyzeDestination, analyzeModel))

	authHeader := getAuthHeader(guestMode)
	req := &http.Request{
		Method: http.MethodGet,
		URL:    statusUrl,
		Header: map[string][]string{
			"Content-Type": {"application/json"},
			authHeader:     {analyzeToken},
		},
	}
	statusResp, err := http.DefaultClient.Do(req)
	if err != nil {
		return false
	}
	defer statusResp.Body.Close()

	target := &ModelStatus{}
	_ = json.NewDecoder(statusResp.Body).Decode(&target)

	return target.LastMajorGeneration > 0
}

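// getAuthHeader returns the name of the HTTP header used to pass the token: "Guest-Auth" in guest mode, the standard "Authorization" header otherwise.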
func getAuthHeader(guestMode bool) string {
	if guestMode {
		return "Guest-Auth"
	}

	return "Authorization"
}

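// GetTrafficDumpUrl builds the bulk traffic upload URL for the given destination and model.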
func GetTrafficDumpUrl(analyzeDestination string, analyzeModel string) *url.URL {
	strUrl := fmt.Sprintf("https://traffic.%s/dumpTrafficBulk/%s", analyzeDestination, analyzeModel)
	postUrl, _ := url.Parse(strUrl)
	return postUrl
}

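// AnalyzeInformation holds the state of the current sync ("analyze") session.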
type AnalyzeInformation struct {
	IsAnalyzing        bool
	GuestMode          bool
	SentCount          int
	AnalyzedModel      string
	AnalyzeToken       string
	AnalyzeDestination string
}

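// Reset clears the sync session state. Note that GuestMode is reset to true, the default for a new anonymous session.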
func (info *AnalyzeInformation) Reset() {
	info.IsAnalyzing = false
	info.GuestMode = true
	info.AnalyzedModel = ""
	info.AnalyzeToken = ""
	info.AnalyzeDestination = ""
	info.SentCount = 0
}

var analyzeInformation = &AnalyzeInformation{}

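// GetAnalyzeInfo builds a status snapshot of the current sync session, including the remote URL and whether the remote model is ready.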
func GetAnalyzeInfo() *shared.AnalyzeStatus {
	return &shared.AnalyzeStatus{
		IsAnalyzing:   analyzeInformation.IsAnalyzing,
		RemoteUrl:     GetRemoteUrl(analyzeInformation.AnalyzeDestination, analyzeInformation.AnalyzedModel, analyzeInformation.AnalyzeToken, analyzeInformation.GuestMode),
		IsRemoteReady: CheckIfModelReady(analyzeInformation.AnalyzeDestination, analyzeInformation.AnalyzedModel, analyzeInformation.AnalyzeToken, analyzeInformation.GuestMode),
		SentCount:     analyzeInformation.SentCount,
	}
}

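// SyncEntries resolves the token and model to use (creating an anonymous guest token when no token is configured, otherwise upserting the configured workspace), validates the model name and starts the sync loop in a background goroutine.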
func SyncEntries(syncEntriesConfig *shared.SyncEntriesConfig) error {
	logger.Log.Infof("Sync entries - started")

	var (
		token, model string
		guestMode    bool
	)
	if syncEntriesConfig.Token == "" {
		logger.Log.Infof("Sync entries - creating anonymous token. env %s", syncEntriesConfig.Env)
		guestToken, err := createAnonymousToken(syncEntriesConfig.Env)
		if err != nil {
			return fmt.Errorf("failed creating anonymous token, err: %v", err)
		}

		token = guestToken.Token
		model = guestToken.Model
		guestMode = true
	} else {
		token = fmt.Sprintf("bearer %s", syncEntriesConfig.Token)
		model = syncEntriesConfig.Workspace
		guestMode = false

		logger.Log.Infof("Sync entries - upserting model. env %s, model %s", syncEntriesConfig.Env, model)
		if err := upsertModel(token, model, syncEntriesConfig.Env); err != nil {
			return fmt.Errorf("failed upserting model, err: %v", err)
		}
	}

	modelRegex, _ := regexp.Compile("[A-Za-z0-9][-A-Za-z0-9_.]*[A-Za-z0-9]+$")
	if len(model) > 63 || !modelRegex.MatchString(model) {
		return fmt.Errorf("invalid model name, model name: %s", model)
	}

	logger.Log.Infof("Sync entries - syncing. token: %s, model: %s, guest mode: %v", token, model, guestMode)
	go syncEntriesImpl(token, model, syncEntriesConfig.Env, syncEntriesConfig.UploadIntervalSec, guestMode)

	return nil
}

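// upsertModel creates the model on the remote backend if it does not already exist. Both 201 (created) and 409 (already exists) are treated as success.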
func upsertModel(token string, model string, envPrefix string) error {
	upsertModelUrl, _ := url.Parse(fmt.Sprintf("https://trcc.%s/models/%s", envPrefix, model))

	authHeader := getAuthHeader(false)
	req := &http.Request{
		Method: http.MethodPost,
		URL:    upsertModelUrl,
		Header: map[string][]string{
			authHeader: {token},
		},
	}

	response, err := http.DefaultClient.Do(req)
	if err != nil {
		return fmt.Errorf("failed request to upsert model, err: %v", err)
	}
	defer response.Body.Close()

	// In case the model was not created (not 201) and does not already exist (not 409)
	if response.StatusCode != 201 && response.StatusCode != 409 {
		return fmt.Errorf("failed request to upsert model, status code: %v", response.StatusCode)
	}

	return nil
}

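// createAnonymousToken requests a guest token and model from the anonymous token endpoint; when envPrefix is a full URL the token is fetched from <envPrefix>/api/token instead.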
func createAnonymousToken(envPrefix string) (*GuestToken, error) {
	tokenUrl := fmt.Sprintf("https://trcc.%s/anonymous/token", envPrefix)
	if strings.HasPrefix(envPrefix, "http") {
		tokenUrl = fmt.Sprintf("%s/api/token", envPrefix)
	}
	token := &GuestToken{}
	if err := getGuestToken(tokenUrl, token); err != nil {
		logger.Log.Infof("Failed to get token, %s", err)
		return nil, err
	}
	return token, nil
}

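// getGuestToken fetches the given URL and decodes the JSON response into target.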
func getGuestToken(url string, target *GuestToken) error {
	resp, err := http.Get(url)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	logger.Log.Infof("Got token from the server, starting to json decode... status code: %v", resp.StatusCode)
	return json.NewDecoder(resp.Body).Decode(target)
}

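// syncEntriesImpl streams HTTP and gRPC entries from Basenine, converts them to HAR entries and uploads them to the traffic dump endpoint in zlib-compressed batches, at most once every uploadIntervalSec seconds. It keeps retrying the Basenine connection until it succeeds and runs until a fatal marshalling or upload error.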
func syncEntriesImpl(token string, model string, envPrefix string, uploadIntervalSec int, guestMode bool) {
	analyzeInformation.IsAnalyzing = true
	analyzeInformation.GuestMode = guestMode
	analyzeInformation.AnalyzedModel = model
	analyzeInformation.AnalyzeToken = token
	analyzeInformation.AnalyzeDestination = envPrefix
	analyzeInformation.SentCount = 0

	// "http or grpc" filter indicates that we're only interested in HTTP and gRPC entries
	query := "http or grpc"

	logger.Log.Infof("Getting entries from the database")

BasenineReconnect:
	var connection *basenine.Connection
	var err error
	connection, err = basenine.NewConnection(shared.BasenineHost, shared.BaseninePort)
	if err != nil {
		logger.Log.Errorf("Can't establish a new connection to Basenine server: %v", err)
		// connection is nil when NewConnection fails, so only close it if it was actually created
		if connection != nil {
			connection.Close()
		}
		time.Sleep(shared.BasenineReconnectInterval * time.Second)
		goto BasenineReconnect
	}

	data := make(chan []byte)
	meta := make(chan []byte)

	defer func() {
		data <- []byte(basenine.CloseChannel)
		meta <- []byte(basenine.CloseChannel)
		connection.Close()
	}()

	lastTimeSynced := time.Time{}

	batch := make([]har.Entry, 0)

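	// handleDataChannel consumes entries streamed by Basenine, converts each one to a HAR entry (tagging source and destination via x-mizu-* headers) and periodically uploads the accumulated batch.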
	handleDataChannel := func(wg *sync.WaitGroup, connection *basenine.Connection, data chan []byte) {
		defer wg.Done()
		for {
			dataBytes := <-data

			if string(dataBytes) == basenine.CloseChannel {
				return
			}

			var entry tapApi.Entry
			if err := json.Unmarshal(dataBytes, &entry); err != nil {
				continue
			}
			harEntry, err := har.NewEntry(entry.Request, entry.Response, entry.StartTime, entry.ElapsedTime)
			if err != nil {
				continue
			}
			if entry.Source.Name != "" {
				harEntry.Request.Headers = append(harEntry.Request.Headers, har.Header{Name: "x-mizu-source", Value: entry.Source.Name})
			}
			if entry.Destination.Name != "" {
				harEntry.Request.Headers = append(harEntry.Request.Headers, har.Header{Name: "x-mizu-destination", Value: entry.Destination.Name})
				harEntry.Request.URL = utils.SetHostname(harEntry.Request.URL, entry.Destination.Name)
			}

			batch = append(batch, *harEntry)

			now := time.Now()
			if lastTimeSynced.Add(time.Duration(uploadIntervalSec) * time.Second).After(now) {
				continue
			}
			lastTimeSynced = now

			body, jMarshalErr := json.Marshal(batch)
			batchSize := len(batch)
			if jMarshalErr != nil {
				analyzeInformation.Reset()
				logger.Log.Infof("Stopping sync entries")
				logger.Log.Fatal(jMarshalErr)
			}
			batch = make([]har.Entry, 0)

			var in bytes.Buffer
			w := zlib.NewWriter(&in)
			_, _ = w.Write(body)
			_ = w.Close()
			reqBody := ioutil.NopCloser(bytes.NewReader(in.Bytes()))

			authHeader := getAuthHeader(guestMode)
			req := &http.Request{
				Method: http.MethodPost,
				URL:    GetTrafficDumpUrl(envPrefix, model),
				Header: map[string][]string{
					"Content-Encoding": {"deflate"},
					"Content-Type":     {"application/octet-stream"},
					authHeader:         {token},
				},
				Body: reqBody,
			}

			if _, postErr := http.DefaultClient.Do(req); postErr != nil {
				analyzeInformation.Reset()
				logger.Log.Info("Stopping sync entries")
				logger.Log.Fatal(postErr)
			}
			analyzeInformation.SentCount += batchSize

			if analyzeInformation.SentCount%SentCountLogInterval == 0 {
				logger.Log.Infof("Uploaded %v entries until now", analyzeInformation.SentCount)
			}
		}
	}

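	// handleMetaChannel drains the metadata stream; its content is not used, the handler only watches for the close signal.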
	handleMetaChannel := func(wg *sync.WaitGroup, connection *basenine.Connection, meta chan []byte) {
		defer wg.Done()
		for {
			metaBytes := <-meta

			if string(metaBytes) == basenine.CloseChannel {
				return
			}
		}
	}

	var wg sync.WaitGroup
	wg.Add(2)
	go handleDataChannel(&wg, connection, data)
	go handleMetaChannel(&wg, connection, meta)

	if err = connection.Query("latest", query, data, meta); err != nil {
		logger.Log.Errorf("Query mode call failed: %v", err)
		connection.Close()
		time.Sleep(shared.BasenineReconnectInterval * time.Second)
		goto BasenineReconnect
	}

	wg.Wait()
}

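// UpdateAnalyzeStatus periodically (every AnalyzeCheckSleepTime) builds an analyze status WebSocket message and passes it to the provided callback while a sync session is active.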
func UpdateAnalyzeStatus(callback func(data []byte)) {
	for {
		if !analyzeInformation.IsAnalyzing {
			time.Sleep(AnalyzeCheckSleepTime)
			continue
		}
		analyzeStatus := GetAnalyzeInfo()
		socketMessage := shared.CreateWebSocketMessageTypeAnalyzeStatus(*analyzeStatus)

		jsonMessage, _ := json.Marshal(socketMessage)
		callback(jsonMessage)

		time.Sleep(AnalyzeCheckSleepTime)
	}
}