mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-08-17 16:08:12 +00:00
renamed upload entries to sync entries (#330)
This commit is contained in:
parent
14b616a856
commit
8b8c4609ce
@ -64,15 +64,15 @@ func GetEntries(c *gin.Context) {
|
|||||||
c.JSON(http.StatusOK, baseEntries)
|
c.JSON(http.StatusOK, baseEntries)
|
||||||
}
|
}
|
||||||
|
|
||||||
func UploadEntries(c *gin.Context) {
|
func SyncEntries(c *gin.Context) {
|
||||||
rlog.Infof("Upload entries - started\n")
|
rlog.Infof("Sync entries - started\n")
|
||||||
|
|
||||||
uploadParams := &models.UploadEntriesRequestQuery{}
|
syncParams := &models.SyncEntriesRequestQuery{}
|
||||||
if err := c.BindQuery(uploadParams); err != nil {
|
if err := c.BindQuery(syncParams); err != nil {
|
||||||
c.JSON(http.StatusBadRequest, err)
|
c.JSON(http.StatusBadRequest, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err := validation.Validate(uploadParams); err != nil {
|
if err := validation.Validate(syncParams); err != nil {
|
||||||
c.JSON(http.StatusBadRequest, err)
|
c.JSON(http.StatusBadRequest, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -81,14 +81,14 @@ func UploadEntries(c *gin.Context) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
rlog.Infof("Upload entries - creating token. dest %s\n", uploadParams.Dest)
|
rlog.Infof("Sync entries - creating token. env %s\n", syncParams.Env)
|
||||||
token, err := up9.CreateAnonymousToken(uploadParams.Dest)
|
token, err := up9.CreateAnonymousToken(syncParams.Env)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.String(http.StatusServiceUnavailable, "Cannot analyze, mizu is already analyzing")
|
c.String(http.StatusServiceUnavailable, "Cannot analyze, mizu is already analyzing")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
rlog.Infof("Upload entries - uploading. token: %s model: %s\n", token.Token, token.Model)
|
rlog.Infof("Sync entries - syncing. token: %s model: %s\n", token.Token, token.Model)
|
||||||
go up9.UploadEntriesImpl(token.Token, token.Model, uploadParams.Dest, uploadParams.SleepIntervalSec)
|
go up9.SyncEntriesImpl(token.Token, token.Model, syncParams.Env, syncParams.SleepIntervalSec)
|
||||||
c.String(http.StatusOK, "OK")
|
c.String(http.StatusOK, "OK")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -22,8 +22,8 @@ type EntriesFilter struct {
|
|||||||
Timestamp int64 `form:"timestamp" validate:"required,min=1"`
|
Timestamp int64 `form:"timestamp" validate:"required,min=1"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type UploadEntriesRequestQuery struct {
|
type SyncEntriesRequestQuery struct {
|
||||||
Dest string `form:"dest"`
|
Env string `form:"env"`
|
||||||
SleepIntervalSec int `form:"interval"`
|
SleepIntervalSec int `form:"interval"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -12,7 +12,7 @@ func EntriesRoutes(ginApp *gin.Engine) {
|
|||||||
routeGroup.GET("/entries", controllers.GetEntries) // get entries (base/thin entries)
|
routeGroup.GET("/entries", controllers.GetEntries) // get entries (base/thin entries)
|
||||||
routeGroup.GET("/entries/:entryId", controllers.GetEntry) // get single (full) entry
|
routeGroup.GET("/entries/:entryId", controllers.GetEntry) // get single (full) entry
|
||||||
routeGroup.GET("/exportEntries", controllers.GetFullEntries)
|
routeGroup.GET("/exportEntries", controllers.GetFullEntries)
|
||||||
routeGroup.GET("/uploadEntries", controllers.UploadEntries)
|
routeGroup.GET("/syncEntries", controllers.SyncEntries)
|
||||||
routeGroup.GET("/resolving", controllers.GetCurrentResolvingInformation)
|
routeGroup.GET("/resolving", controllers.GetCurrentResolvingInformation)
|
||||||
|
|
||||||
routeGroup.GET("/resetDB", controllers.DeleteAllEntries) // get single (full) entry
|
routeGroup.GET("/resetDB", controllers.DeleteAllEntries) // get single (full) entry
|
||||||
|
@ -117,7 +117,7 @@ func GetAnalyzeInfo() *shared.AnalyzeStatus {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func UploadEntriesImpl(token string, model string, envPrefix string, sleepIntervalSec int) {
|
func SyncEntriesImpl(token string, model string, envPrefix string, sleepIntervalSec int) {
|
||||||
analyzeInformation.IsAnalyzing = true
|
analyzeInformation.IsAnalyzing = true
|
||||||
analyzeInformation.AnalyzedModel = model
|
analyzeInformation.AnalyzedModel = model
|
||||||
analyzeInformation.AnalyzeToken = token
|
analyzeInformation.AnalyzeToken = token
|
||||||
@ -160,7 +160,7 @@ func UploadEntriesImpl(token string, model string, envPrefix string, sleepInterv
|
|||||||
body, jMarshalErr := json.Marshal(result)
|
body, jMarshalErr := json.Marshal(result)
|
||||||
if jMarshalErr != nil {
|
if jMarshalErr != nil {
|
||||||
analyzeInformation.Reset()
|
analyzeInformation.Reset()
|
||||||
rlog.Infof("Stopping analyzing")
|
rlog.Infof("Stopping sync entries")
|
||||||
log.Fatal(jMarshalErr)
|
log.Fatal(jMarshalErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -183,7 +183,7 @@ func UploadEntriesImpl(token string, model string, envPrefix string, sleepInterv
|
|||||||
|
|
||||||
if _, postErr := http.DefaultClient.Do(req); postErr != nil {
|
if _, postErr := http.DefaultClient.Do(req); postErr != nil {
|
||||||
analyzeInformation.Reset()
|
analyzeInformation.Reset()
|
||||||
rlog.Info("Stopping analyzing")
|
rlog.Info("Stopping sync entries")
|
||||||
log.Fatal(postErr)
|
log.Fatal(postErr)
|
||||||
}
|
}
|
||||||
analyzeInformation.SentCount += len(entriesArray)
|
analyzeInformation.SentCount += len(entriesArray)
|
||||||
|
@ -82,23 +82,23 @@ func (provider *apiServerProvider) ReportTappedPods(pods []core.Pod) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (provider *apiServerProvider) RequestAnalysis(analysisDestination string, sleepIntervalSec int) error {
|
func (provider *apiServerProvider) RequestSyncEntries(analysisDestination string, sleepIntervalSec int) error {
|
||||||
if !provider.isReady {
|
if !provider.isReady {
|
||||||
return fmt.Errorf("trying to reach api server when not initialized yet")
|
return fmt.Errorf("trying to reach api server when not initialized yet")
|
||||||
}
|
}
|
||||||
urlPath := fmt.Sprintf("%s/api/uploadEntries?dest=%s&interval=%v", provider.url, url.QueryEscape(analysisDestination), sleepIntervalSec)
|
urlPath := fmt.Sprintf("%s/api/syncEntries?env=%s&interval=%v", provider.url, url.QueryEscape(analysisDestination), sleepIntervalSec)
|
||||||
u, parseErr := url.ParseRequestURI(urlPath)
|
syncEntriesUrl, parseErr := url.ParseRequestURI(urlPath)
|
||||||
if parseErr != nil {
|
if parseErr != nil {
|
||||||
logger.Log.Fatal("Failed parsing the URL (consider changing the analysis dest URL), err: %v", parseErr)
|
logger.Log.Fatal("Failed parsing the URL (consider changing the env name), err: %v", parseErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Log.Debugf("Analysis url %v", u.String())
|
logger.Log.Debugf("Sync entries url %v", syncEntriesUrl.String())
|
||||||
if response, requestErr := http.Get(u.String()); requestErr != nil {
|
if response, requestErr := http.Get(syncEntriesUrl.String()); requestErr != nil {
|
||||||
return fmt.Errorf("failed to notify agent for analysis, err: %w", requestErr)
|
return fmt.Errorf("failed to notify api server for sync entries, err: %w", requestErr)
|
||||||
} else if response.StatusCode != 200 {
|
} else if response.StatusCode != 200 {
|
||||||
return fmt.Errorf("failed to notify agent for analysis, status code: %v", response.StatusCode)
|
return fmt.Errorf("failed to notify api server for sync entries, status code: %v", response.StatusCode)
|
||||||
} else {
|
} else {
|
||||||
logger.Log.Infof(uiUtils.Purple, "Traffic is uploading to UP9 for further analysis")
|
logger.Log.Infof(uiUtils.Purple, "Entries are syncing to UP9 for further analysis")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -579,7 +579,7 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
|
|||||||
|
|
||||||
logger.Log.Infof("Mizu is available at %s\n", url)
|
logger.Log.Infof("Mizu is available at %s\n", url)
|
||||||
uiUtils.OpenBrowser(url)
|
uiUtils.OpenBrowser(url)
|
||||||
requestForAnalysisIfNeeded()
|
requestForSyncEntriesIfNeeded()
|
||||||
if err := apiserver.Provider.ReportTappedPods(state.currentlyTappedPods); err != nil {
|
if err := apiserver.Provider.ReportTappedPods(state.currentlyTappedPods); err != nil {
|
||||||
logger.Log.Debugf("[Error] failed update tapped pods %v", err)
|
logger.Log.Debugf("[Error] failed update tapped pods %v", err)
|
||||||
}
|
}
|
||||||
@ -671,12 +671,12 @@ func watchTapperPod(ctx context.Context, kubernetesProvider *kubernetes.Provider
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func requestForAnalysisIfNeeded() {
|
func requestForSyncEntriesIfNeeded() {
|
||||||
if !config.Config.Tap.Analysis {
|
if !config.Config.Tap.Analysis {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if err := apiserver.Provider.RequestAnalysis(config.Config.Tap.AnalysisDestination, config.Config.Tap.SleepIntervalSec); err != nil {
|
if err := apiserver.Provider.RequestSyncEntries(config.Config.Tap.AnalysisDestination, config.Config.Tap.UploadIntervalSec); err != nil {
|
||||||
logger.Log.Debugf("[Error] failed requesting for analysis %v", err)
|
logger.Log.Debugf("[Error] failed requesting for sync entries, err: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -20,21 +20,21 @@ const (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type TapConfig struct {
|
type TapConfig struct {
|
||||||
AnalysisDestination string `yaml:"dest" default:"up9.app"`
|
AnalysisDestination string `yaml:"dest" default:"up9.app"`
|
||||||
SleepIntervalSec int `yaml:"upload-interval" default:"10"`
|
UploadIntervalSec int `yaml:"upload-interval" default:"10"`
|
||||||
PodRegexStr string `yaml:"regex" default:".*"`
|
PodRegexStr string `yaml:"regex" default:".*"`
|
||||||
GuiPort uint16 `yaml:"gui-port" default:"8899"`
|
GuiPort uint16 `yaml:"gui-port" default:"8899"`
|
||||||
Namespaces []string `yaml:"namespaces"`
|
Namespaces []string `yaml:"namespaces"`
|
||||||
Analysis bool `yaml:"analysis" default:"false"`
|
Analysis bool `yaml:"analysis" default:"false"`
|
||||||
AllNamespaces bool `yaml:"all-namespaces" default:"false"`
|
AllNamespaces bool `yaml:"all-namespaces" default:"false"`
|
||||||
PlainTextFilterRegexes []string `yaml:"regex-masking"`
|
PlainTextFilterRegexes []string `yaml:"regex-masking"`
|
||||||
IgnoredUserAgents []string `yaml:"ignored-user-agents"`
|
IgnoredUserAgents []string `yaml:"ignored-user-agents"`
|
||||||
DisableRedaction bool `yaml:"no-redact" default:"false"`
|
DisableRedaction bool `yaml:"no-redact" default:"false"`
|
||||||
HumanMaxEntriesDBSize string `yaml:"max-entries-db-size" default:"200MB"`
|
HumanMaxEntriesDBSize string `yaml:"max-entries-db-size" default:"200MB"`
|
||||||
DryRun bool `yaml:"dry-run" default:"false"`
|
DryRun bool `yaml:"dry-run" default:"false"`
|
||||||
EnforcePolicyFile string `yaml:"traffic-validation-file"`
|
EnforcePolicyFile string `yaml:"traffic-validation-file"`
|
||||||
ApiServerResources Resources `yaml:"api-server-resources"`
|
ApiServerResources Resources `yaml:"api-server-resources"`
|
||||||
TapperResources Resources `yaml:"tapper-resources"`
|
TapperResources Resources `yaml:"tapper-resources"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type Resources struct {
|
type Resources struct {
|
||||||
|
Loading…
Reference in New Issue
Block a user