diff --git a/agent/main.go b/agent/main.go index 205f98552..5a771bae4 100644 --- a/agent/main.go +++ b/agent/main.go @@ -5,48 +5,27 @@ import ( "errors" "flag" "fmt" - "io/ioutil" "net/http" "os" "os/signal" - "sort" - "strconv" - "strings" "syscall" "time" - "github.com/up9inc/mizu/agent/pkg/middlewares" "github.com/up9inc/mizu/agent/pkg/models" - "github.com/up9inc/mizu/agent/pkg/oas" - "github.com/up9inc/mizu/agent/pkg/routes" - "github.com/up9inc/mizu/agent/pkg/servicemap" - "github.com/up9inc/mizu/agent/pkg/up9" "github.com/up9inc/mizu/agent/pkg/utils" - "github.com/up9inc/mizu/agent/pkg/elastic" - - "github.com/up9inc/mizu/agent/pkg/controllers" - "github.com/up9inc/mizu/agent/pkg/api" + "github.com/up9inc/mizu/agent/pkg/app" "github.com/up9inc/mizu/agent/pkg/config" v1 "k8s.io/api/core/v1" - "github.com/antelman107/net-wait-go/wait" - "github.com/gin-contrib/static" - "github.com/gin-gonic/gin" "github.com/gorilla/websocket" "github.com/op/go-logging" - basenine "github.com/up9inc/basenine/client/go" "github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/tap" tapApi "github.com/up9inc/mizu/tap/api" - - amqpExt "github.com/up9inc/mizu/tap/extensions/amqp" - httpExt "github.com/up9inc/mizu/tap/extensions/http" - kafkaExt "github.com/up9inc/mizu/tap/extensions/kafka" - redisExt "github.com/up9inc/mizu/tap/extensions/redis" ) var tapperMode = flag.Bool("tap", false, "Run in tapper mode without API") @@ -57,11 +36,6 @@ var namespace = flag.String("namespace", "", "Resolve IPs if they belong to reso var harsReaderMode = flag.Bool("hars-read", false, "Run in hars-read mode") var harsDir = flag.String("hars-dir", "", "Directory to read hars from") -var extensions []*tapApi.Extension // global -var extensionsMap map[string]*tapApi.Extension // global - -var startTime int64 - const ( socketConnectionRetries = 30 socketConnectionRetryDelay = time.Second * 2 @@ -75,78 +49,20 @@ func main() { if err := config.LoadConfig(); err != nil { logger.Log.Fatalf("Error loading config file %v", err) } - loadExtensions() + app.LoadExtensions() if !*tapperMode && !*apiServerMode && !*standaloneMode && !*harsReaderMode { panic("One of the flags --tap, --api or --standalone or --hars-read must be provided") } if *standaloneMode { - api.StartResolving(*namespace) - - outputItemsChannel := make(chan *tapApi.OutputChannelItem) - filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem) - - filteringOptions := getTrafficFilteringOptions() - hostMode := os.Getenv(shared.HostModeEnvVar) == "1" - tapOpts := &tap.TapOpts{HostMode: hostMode} - tap.StartPassiveTapper(tapOpts, outputItemsChannel, extensions, filteringOptions) - - go filterItems(outputItemsChannel, filteredOutputItemsChannel) - go api.StartReadingEntries(filteredOutputItemsChannel, nil, extensionsMap) - - hostApi(nil) + runInStandaloneMode() } else if *tapperMode { - logger.Log.Infof("Starting tapper, websocket address: %s", *apiServerAddress) - if *apiServerAddress == "" { - panic("API server address must be provided with --api-server-address when using --tap") - } - - hostMode := os.Getenv(shared.HostModeEnvVar) == "1" - tapOpts := &tap.TapOpts{HostMode: hostMode} - tapTargets := getTapTargets() - if tapTargets != nil { - tapOpts.FilterAuthorities = tapTargets - logger.Log.Infof("Filtering for the following authorities: %v", tapOpts.FilterAuthorities) - } - - filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem) - - filteringOptions := getTrafficFilteringOptions() - tap.StartPassiveTapper(tapOpts, filteredOutputItemsChannel, extensions, filteringOptions) - socketConnection, err := dialSocketWithRetry(*apiServerAddress, socketConnectionRetries, socketConnectionRetryDelay) - if err != nil { - panic(fmt.Sprintf("Error connecting to socket server at %s %v", *apiServerAddress, err)) - } - logger.Log.Infof("Connected successfully to websocket %s", *apiServerAddress) - - go pipeTapChannelToSocket(socketConnection, filteredOutputItemsChannel) + runInTapperMode() } else if *apiServerMode { - configureBasenineServer(shared.BasenineHost, shared.BaseninePort) - startTime = time.Now().UnixNano() / int64(time.Millisecond) - api.StartResolving(*namespace) - - outputItemsChannel := make(chan *tapApi.OutputChannelItem) - filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem) - enableExpFeatureIfNeeded() - go filterItems(outputItemsChannel, filteredOutputItemsChannel) - go api.StartReadingEntries(filteredOutputItemsChannel, nil, extensionsMap) - - syncEntriesConfig := getSyncEntriesConfig() - if syncEntriesConfig != nil { - if err := up9.SyncEntries(syncEntriesConfig); err != nil { - logger.Log.Error("Error syncing entries, err: %v", err) - } - } - - hostApi(outputItemsChannel) + utils.StartServer(app.RunInApiServerMode(*namespace)) } else if *harsReaderMode { - outputItemsChannel := make(chan *tapApi.OutputChannelItem, 1000) - filteredHarChannel := make(chan *tapApi.OutputChannelItem) - - go filterItems(outputItemsChannel, filteredHarChannel) - go api.StartReadingEntries(filteredHarChannel, harsDir, extensionsMap) - hostApi(nil) + runInHarReaderMode() } signalChan := make(chan os.Signal, 1) @@ -156,167 +72,59 @@ func main() { logger.Log.Info("Exiting") } -func enableExpFeatureIfNeeded() { - if config.Config.OAS { - oas.GetOasGeneratorInstance().Start() - } - if config.Config.ServiceMap { - servicemap.GetInstance().SetConfig(config.Config) - } - elastic.GetInstance().Configure(config.Config.Elastic) -} - -func configureBasenineServer(host string, port string) { - if !wait.New( - wait.WithProto("tcp"), - wait.WithWait(200*time.Millisecond), - wait.WithBreak(50*time.Millisecond), - wait.WithDeadline(5*time.Second), - wait.WithDebug(config.Config.LogLevel == logging.DEBUG), - ).Do([]string{fmt.Sprintf("%s:%s", host, port)}) { - logger.Log.Panicf("Basenine is not available!") +func runInTapperMode() { + logger.Log.Infof("Starting tapper, websocket address: %s", *apiServerAddress) + if *apiServerAddress == "" { + panic("API server address must be provided with --api-server-address when using --tap") } - // Limit the database size to default 200MB - err := basenine.Limit(host, port, config.Config.MaxDBSizeBytes) + hostMode := os.Getenv(shared.HostModeEnvVar) == "1" + tapOpts := &tap.TapOpts{HostMode: hostMode} + tapTargets := getTapTargets() + if tapTargets != nil { + tapOpts.FilterAuthorities = tapTargets + logger.Log.Infof("Filtering for the following authorities: %v", tapOpts.FilterAuthorities) + } + + filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem) + + filteringOptions := getTrafficFilteringOptions() + tap.StartPassiveTapper(tapOpts, filteredOutputItemsChannel, app.Extensions, filteringOptions) + socketConnection, err := dialSocketWithRetry(*apiServerAddress, socketConnectionRetries, socketConnectionRetryDelay) if err != nil { - logger.Log.Panicf("Error while limiting database size: %v", err) + panic(fmt.Sprintf("Error connecting to socket server at %s %v", *apiServerAddress, err)) } + logger.Log.Infof("Connected successfully to websocket %s", *apiServerAddress) - // Define the macros - for _, extension := range extensions { - macros := extension.Dissector.Macros() - for macro, expanded := range macros { - err = basenine.Macro(host, port, macro, expanded) - if err != nil { - logger.Log.Panicf("Error while adding a macro: %v", err) - } - } - } + go pipeTapChannelToSocket(socketConnection, filteredOutputItemsChannel) } -func loadExtensions() { - extensions = make([]*tapApi.Extension, 4) - extensionsMap = make(map[string]*tapApi.Extension) +func runInStandaloneMode() { + api.StartResolving(*namespace) - extensionAmqp := &tapApi.Extension{} - dissectorAmqp := amqpExt.NewDissector() - dissectorAmqp.Register(extensionAmqp) - extensionAmqp.Dissector = dissectorAmqp - extensions[0] = extensionAmqp - extensionsMap[extensionAmqp.Protocol.Name] = extensionAmqp + outputItemsChannel := make(chan *tapApi.OutputChannelItem) + filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem) - extensionHttp := &tapApi.Extension{} - dissectorHttp := httpExt.NewDissector() - dissectorHttp.Register(extensionHttp) - extensionHttp.Dissector = dissectorHttp - extensions[1] = extensionHttp - extensionsMap[extensionHttp.Protocol.Name] = extensionHttp + filteringOptions := getTrafficFilteringOptions() + hostMode := os.Getenv(shared.HostModeEnvVar) == "1" + tapOpts := &tap.TapOpts{HostMode: hostMode} + tap.StartPassiveTapper(tapOpts, outputItemsChannel, app.Extensions, filteringOptions) - extensionKafka := &tapApi.Extension{} - dissectorKafka := kafkaExt.NewDissector() - dissectorKafka.Register(extensionKafka) - extensionKafka.Dissector = dissectorKafka - extensions[2] = extensionKafka - extensionsMap[extensionKafka.Protocol.Name] = extensionKafka + go app.FilterItems(outputItemsChannel, filteredOutputItemsChannel) + go api.StartReadingEntries(filteredOutputItemsChannel, nil, app.ExtensionsMap) - extensionRedis := &tapApi.Extension{} - dissectorRedis := redisExt.NewDissector() - dissectorRedis.Register(extensionRedis) - extensionRedis.Dissector = dissectorRedis - extensions[3] = extensionRedis - extensionsMap[extensionRedis.Protocol.Name] = extensionRedis - - sort.Slice(extensions, func(i, j int) bool { - return extensions[i].Protocol.Priority < extensions[j].Protocol.Priority - }) - - for _, extension := range extensions { - logger.Log.Infof("Extension Properties: %+v", extension) - } - - controllers.InitExtensionsMap(extensionsMap) + ginApp := app.HostApi(nil) + utils.StartServer(ginApp) } -func hostApi(socketHarOutputChannel chan<- *tapApi.OutputChannelItem) { - app := gin.Default() +func runInHarReaderMode() { + outputItemsChannel := make(chan *tapApi.OutputChannelItem, 1000) + filteredHarChannel := make(chan *tapApi.OutputChannelItem) - app.GET("/echo", func(c *gin.Context) { - c.String(http.StatusOK, "Here is Mizu agent") - }) - - eventHandlers := api.RoutesEventHandlers{ - SocketOutChannel: socketHarOutputChannel, - } - - app.Use(DisableRootStaticCache()) - - var staticFolder string - if config.Config.StandaloneMode { - staticFolder = "./site-standalone" - } else { - staticFolder = "./site" - } - - indexStaticFile := staticFolder + "/index.html" - if err := setUIFlags(indexStaticFile); err != nil { - logger.Log.Errorf("Error setting ui flags, err: %v", err) - } - - app.Use(static.ServeRoot("/", staticFolder)) - app.NoRoute(func(c *gin.Context) { - c.File(indexStaticFile) - }) - - app.Use(middlewares.CORSMiddleware()) // This has to be called after the static middleware, does not work if its called before - - api.WebSocketRoutes(app, &eventHandlers, startTime) - - if config.Config.StandaloneMode { - routes.ConfigRoutes(app) - routes.UserRoutes(app) - routes.InstallRoutes(app) - } - if config.Config.OAS { - routes.OASRoutes(app) - } - if config.Config.ServiceMap { - routes.ServiceMapRoutes(app) - } - - routes.QueryRoutes(app) - routes.EntriesRoutes(app) - routes.MetadataRoutes(app) - routes.StatusRoutes(app) - utils.StartServer(app) -} - -func DisableRootStaticCache() gin.HandlerFunc { - return func(c *gin.Context) { - if c.Request.RequestURI == "/" { - // Disable cache only for the main static route - c.Writer.Header().Set("Cache-Control", "no-store") - } - - c.Next() - } -} - -func setUIFlags(uiIndexPath string) error { - read, err := ioutil.ReadFile(uiIndexPath) - if err != nil { - return err - } - - replacedContent := strings.Replace(string(read), "__IS_OAS_ENABLED__", strconv.FormatBool(config.Config.OAS), 1) - replacedContent = strings.Replace(replacedContent, "__IS_SERVICE_MAP_ENABLED__", strconv.FormatBool(config.Config.ServiceMap), 1) - - err = ioutil.WriteFile(uiIndexPath, []byte(replacedContent), 0) - if err != nil { - return err - } - - return nil + go app.FilterItems(outputItemsChannel, filteredHarChannel) + go api.StartReadingEntries(filteredHarChannel, harsDir, app.ExtensionsMap) + ginApp := app.HostApi(nil) + utils.StartServer(ginApp) } func parseEnvVar(env string) map[string][]v1.Pod { @@ -357,16 +165,6 @@ func getTrafficFilteringOptions() *tapApi.TrafficFilteringOptions { return &filteringOptions } -func filterItems(inChannel <-chan *tapApi.OutputChannelItem, outChannel chan *tapApi.OutputChannelItem) { - for message := range inChannel { - if message.ConnectionInfo.IsOutgoing && api.CheckIsServiceIP(message.ConnectionInfo.ServerIP) { - continue - } - - outChannel <- message - } -} - func pipeTapChannelToSocket(connection *websocket.Conn, messageDataChannel <-chan *tapApi.OutputChannelItem) { if connection == nil { panic("Websocket connection is nil") @@ -402,21 +200,6 @@ func pipeTapChannelToSocket(connection *websocket.Conn, messageDataChannel <-cha } } -func getSyncEntriesConfig() *shared.SyncEntriesConfig { - syncEntriesConfigJson := os.Getenv(shared.SyncEntriesConfigEnvVar) - if syncEntriesConfigJson == "" { - return nil - } - - var syncEntriesConfig = &shared.SyncEntriesConfig{} - err := json.Unmarshal([]byte(syncEntriesConfigJson), syncEntriesConfig) - if err != nil { - panic(fmt.Sprintf("env var %s's value of %s is invalid! json must match the shared.SyncEntriesConfig struct, err: %v", shared.SyncEntriesConfigEnvVar, syncEntriesConfigJson, err)) - } - - return syncEntriesConfig -} - func determineLogLevel() (logLevel logging.Level) { logLevel, err := logging.LogLevel(os.Getenv(shared.LogLevelEnvVar)) if err != nil { diff --git a/agent/pkg/api/socket_routes.go b/agent/pkg/api/socket_routes.go index a58c7bf1f..871faf4ef 100644 --- a/agent/pkg/api/socket_routes.go +++ b/agent/pkg/api/socket_routes.go @@ -30,14 +30,18 @@ type SocketConnection struct { isTapper bool } -var websocketUpgrader = websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, -} +var ( + websocketUpgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + } -var websocketIdsLock = sync.Mutex{} -var connectedWebsockets map[int]*SocketConnection -var connectedWebsocketIdCounter = 0 + websocketIdsLock = sync.Mutex{} + connectedWebsockets map[int]*SocketConnection + connectedWebsocketIdCounter = 0 + SocketGetBrowserHandler gin.HandlerFunc + SocketGetTapperHandler gin.HandlerFunc +) func init() { websocketUpgrader.CheckOrigin = func(r *http.Request) bool { return true } // like cors for web socket @@ -45,12 +49,20 @@ func init() { } func WebSocketRoutes(app *gin.Engine, eventHandlers EventHandlers, startTime int64) { - app.GET("/ws", func(c *gin.Context) { + SocketGetBrowserHandler = func(c *gin.Context) { websocketHandler(c.Writer, c.Request, eventHandlers, false, startTime) + } + + SocketGetTapperHandler = func(c *gin.Context) { + websocketHandler(c.Writer, c.Request, eventHandlers, true, startTime) + } + + app.GET("/ws", func(c *gin.Context) { + SocketGetBrowserHandler(c) }) app.GET("/wsTapper", func(c *gin.Context) { // TODO: add m2m authentication to this route - websocketHandler(c.Writer, c.Request, eventHandlers, true, startTime) + SocketGetTapperHandler(c) }) } diff --git a/agent/pkg/app/extensions.go b/agent/pkg/app/extensions.go new file mode 100644 index 000000000..487493179 --- /dev/null +++ b/agent/pkg/app/extensions.go @@ -0,0 +1,62 @@ +package app + +import ( + "sort" + + "github.com/up9inc/mizu/agent/pkg/controllers" + "github.com/up9inc/mizu/shared/logger" + tapApi "github.com/up9inc/mizu/tap/api" + + amqpExt "github.com/up9inc/mizu/tap/extensions/amqp" + httpExt "github.com/up9inc/mizu/tap/extensions/http" + kafkaExt "github.com/up9inc/mizu/tap/extensions/kafka" + redisExt "github.com/up9inc/mizu/tap/extensions/redis" +) + +var ( + Extensions []*tapApi.Extension // global + ExtensionsMap map[string]*tapApi.Extension // global +) + +func LoadExtensions() { + Extensions = make([]*tapApi.Extension, 4) + ExtensionsMap = make(map[string]*tapApi.Extension) + + extensionAmqp := &tapApi.Extension{} + dissectorAmqp := amqpExt.NewDissector() + dissectorAmqp.Register(extensionAmqp) + extensionAmqp.Dissector = dissectorAmqp + Extensions[0] = extensionAmqp + ExtensionsMap[extensionAmqp.Protocol.Name] = extensionAmqp + + extensionHttp := &tapApi.Extension{} + dissectorHttp := httpExt.NewDissector() + dissectorHttp.Register(extensionHttp) + extensionHttp.Dissector = dissectorHttp + Extensions[1] = extensionHttp + ExtensionsMap[extensionHttp.Protocol.Name] = extensionHttp + + extensionKafka := &tapApi.Extension{} + dissectorKafka := kafkaExt.NewDissector() + dissectorKafka.Register(extensionKafka) + extensionKafka.Dissector = dissectorKafka + Extensions[2] = extensionKafka + ExtensionsMap[extensionKafka.Protocol.Name] = extensionKafka + + extensionRedis := &tapApi.Extension{} + dissectorRedis := redisExt.NewDissector() + dissectorRedis.Register(extensionRedis) + extensionRedis.Dissector = dissectorRedis + Extensions[3] = extensionRedis + ExtensionsMap[extensionRedis.Protocol.Name] = extensionRedis + + sort.Slice(Extensions, func(i, j int) bool { + return Extensions[i].Protocol.Priority < Extensions[j].Protocol.Priority + }) + + for _, extension := range Extensions { + logger.Log.Infof("Extension Properties: %+v", extension) + } + + controllers.InitExtensionsMap(ExtensionsMap) +} diff --git a/agent/pkg/app/server.go b/agent/pkg/app/server.go new file mode 100644 index 000000000..ef4360e45 --- /dev/null +++ b/agent/pkg/app/server.go @@ -0,0 +1,210 @@ +package app + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "os" + "strconv" + "strings" + "time" + + "github.com/antelman107/net-wait-go/wait" + "github.com/gin-contrib/static" + "github.com/gin-gonic/gin" + "github.com/op/go-logging" + basenine "github.com/up9inc/basenine/client/go" + "github.com/up9inc/mizu/agent/pkg/api" + "github.com/up9inc/mizu/agent/pkg/config" + "github.com/up9inc/mizu/agent/pkg/elastic" + "github.com/up9inc/mizu/agent/pkg/middlewares" + "github.com/up9inc/mizu/agent/pkg/oas" + "github.com/up9inc/mizu/agent/pkg/routes" + "github.com/up9inc/mizu/agent/pkg/servicemap" + "github.com/up9inc/mizu/agent/pkg/up9" + "github.com/up9inc/mizu/shared" + "github.com/up9inc/mizu/shared/logger" + tapApi "github.com/up9inc/mizu/tap/api" +) + +var ( + ConfigRoutes *gin.RouterGroup + UserRoutes *gin.RouterGroup + InstallRoutes *gin.RouterGroup + OASRoutes *gin.RouterGroup + ServiceMapRoutes *gin.RouterGroup + QueryRoutes *gin.RouterGroup + EntriesRoutes *gin.RouterGroup + MetadataRoutes *gin.RouterGroup + StatusRoutes *gin.RouterGroup + + startTime int64 +) + +func HostApi(socketHarOutputChannel chan<- *tapApi.OutputChannelItem) *gin.Engine { + app := gin.Default() + + app.GET("/echo", func(c *gin.Context) { + c.String(http.StatusOK, "Here is Mizu agent") + }) + + eventHandlers := api.RoutesEventHandlers{ + SocketOutChannel: socketHarOutputChannel, + } + + app.Use(disableRootStaticCache()) + + var staticFolder string + if config.Config.StandaloneMode { + staticFolder = "./site-standalone" + } else { + staticFolder = "./site" + } + + indexStaticFile := staticFolder + "/index.html" + if err := setUIFlags(indexStaticFile); err != nil { + logger.Log.Errorf("Error setting ui flags, err: %v", err) + } + + app.Use(static.ServeRoot("/", staticFolder)) + app.NoRoute(func(c *gin.Context) { + c.File(indexStaticFile) + }) + + app.Use(middlewares.CORSMiddleware()) // This has to be called after the static middleware, does not work if its called before + + api.WebSocketRoutes(app, &eventHandlers, startTime) + + if config.Config.StandaloneMode { + ConfigRoutes = routes.ConfigRoutes(app) + UserRoutes = routes.UserRoutes(app) + InstallRoutes = routes.InstallRoutes(app) + } + if config.Config.OAS { + OASRoutes = routes.OASRoutes(app) + } + if config.Config.ServiceMap { + ServiceMapRoutes = routes.ServiceMapRoutes(app) + } + + QueryRoutes = routes.QueryRoutes(app) + EntriesRoutes = routes.EntriesRoutes(app) + MetadataRoutes = routes.MetadataRoutes(app) + StatusRoutes = routes.StatusRoutes(app) + + return app +} + +func RunInApiServerMode(namespace string) *gin.Engine { + configureBasenineServer(shared.BasenineHost, shared.BaseninePort) + startTime = time.Now().UnixNano() / int64(time.Millisecond) + api.StartResolving(namespace) + + outputItemsChannel := make(chan *tapApi.OutputChannelItem) + filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem) + enableExpFeatureIfNeeded() + go FilterItems(outputItemsChannel, filteredOutputItemsChannel) + go api.StartReadingEntries(filteredOutputItemsChannel, nil, ExtensionsMap) + + syncEntriesConfig := getSyncEntriesConfig() + if syncEntriesConfig != nil { + if err := up9.SyncEntries(syncEntriesConfig); err != nil { + logger.Log.Error("Error syncing entries, err: %v", err) + } + } + + return HostApi(outputItemsChannel) +} + +func configureBasenineServer(host string, port string) { + if !wait.New( + wait.WithProto("tcp"), + wait.WithWait(200*time.Millisecond), + wait.WithBreak(50*time.Millisecond), + wait.WithDeadline(5*time.Second), + wait.WithDebug(config.Config.LogLevel == logging.DEBUG), + ).Do([]string{fmt.Sprintf("%s:%s", host, port)}) { + logger.Log.Panicf("Basenine is not available!") + } + + // Limit the database size to default 200MB + err := basenine.Limit(host, port, config.Config.MaxDBSizeBytes) + if err != nil { + logger.Log.Panicf("Error while limiting database size: %v", err) + } + + // Define the macros + for _, extension := range Extensions { + macros := extension.Dissector.Macros() + for macro, expanded := range macros { + err = basenine.Macro(host, port, macro, expanded) + if err != nil { + logger.Log.Panicf("Error while adding a macro: %v", err) + } + } + } +} + +func getSyncEntriesConfig() *shared.SyncEntriesConfig { + syncEntriesConfigJson := os.Getenv(shared.SyncEntriesConfigEnvVar) + if syncEntriesConfigJson == "" { + return nil + } + + var syncEntriesConfig = &shared.SyncEntriesConfig{} + err := json.Unmarshal([]byte(syncEntriesConfigJson), syncEntriesConfig) + if err != nil { + panic(fmt.Sprintf("env var %s's value of %s is invalid! json must match the shared.SyncEntriesConfig struct, err: %v", shared.SyncEntriesConfigEnvVar, syncEntriesConfigJson, err)) + } + + return syncEntriesConfig +} + +func FilterItems(inChannel <-chan *tapApi.OutputChannelItem, outChannel chan *tapApi.OutputChannelItem) { + for message := range inChannel { + if message.ConnectionInfo.IsOutgoing && api.CheckIsServiceIP(message.ConnectionInfo.ServerIP) { + continue + } + + outChannel <- message + } +} + +func enableExpFeatureIfNeeded() { + if config.Config.OAS { + oas.GetOasGeneratorInstance().Start() + } + if config.Config.ServiceMap { + servicemap.GetInstance().SetConfig(config.Config) + } + elastic.GetInstance().Configure(config.Config.Elastic) +} + +func disableRootStaticCache() gin.HandlerFunc { + return func(c *gin.Context) { + if c.Request.RequestURI == "/" { + // Disable cache only for the main static route + c.Writer.Header().Set("Cache-Control", "no-store") + } + + c.Next() + } +} + +func setUIFlags(uiIndexPath string) error { + read, err := ioutil.ReadFile(uiIndexPath) + if err != nil { + return err + } + + replacedContent := strings.Replace(string(read), "__IS_OAS_ENABLED__", strconv.FormatBool(config.Config.OAS), 1) + replacedContent = strings.Replace(replacedContent, "__IS_SERVICE_MAP_ENABLED__", strconv.FormatBool(config.Config.ServiceMap), 1) + + err = ioutil.WriteFile(uiIndexPath, []byte(replacedContent), 0) + if err != nil { + return err + } + + return nil +} diff --git a/agent/pkg/routes/config_routes.go b/agent/pkg/routes/config_routes.go index fe3cf6426..f44e02cde 100644 --- a/agent/pkg/routes/config_routes.go +++ b/agent/pkg/routes/config_routes.go @@ -7,10 +7,17 @@ import ( "github.com/gin-gonic/gin" ) -func ConfigRoutes(ginApp *gin.Engine) { +var ( + ConfigPostTapConfigHandler = controllers.PostTapConfig + ConfigGetTapConfigHandler = controllers.GetTapConfig +) + +func ConfigRoutes(ginApp *gin.Engine) *gin.RouterGroup { routeGroup := ginApp.Group("/config") routeGroup.Use(middlewares.RequiresAuth()) - routeGroup.POST("/tap", middlewares.RequiresAdmin(), controllers.PostTapConfig) - routeGroup.GET("/tap", controllers.GetTapConfig) + routeGroup.POST("/tap", middlewares.RequiresAdmin(), func(c *gin.Context) { ConfigPostTapConfigHandler(c) }) + routeGroup.GET("/tap", func(c *gin.Context) { ConfigGetTapConfigHandler(c) }) + + return routeGroup } diff --git a/agent/pkg/routes/entries_routes.go b/agent/pkg/routes/entries_routes.go index 5be3651ec..644c00d67 100644 --- a/agent/pkg/routes/entries_routes.go +++ b/agent/pkg/routes/entries_routes.go @@ -7,11 +7,18 @@ import ( "github.com/gin-gonic/gin" ) +var ( + EntriesGetHandler = controllers.GetEntries + EntriesGetSingleHandler = controllers.GetEntry +) + // EntriesRoutes defines the group of har entries routes. -func EntriesRoutes(ginApp *gin.Engine) { +func EntriesRoutes(ginApp *gin.Engine) *gin.RouterGroup { routeGroup := ginApp.Group("/entries") routeGroup.Use(middlewares.RequiresAuth()) - routeGroup.GET("/", controllers.GetEntries) // get entries (base/thin entries) and metadata - routeGroup.GET("/:id", controllers.GetEntry) // get single (full) entry + routeGroup.GET("/", func(c *gin.Context) { EntriesGetHandler(c) }) // get entries (base/thin entries) and metadata + routeGroup.GET("/:id", func(c *gin.Context) { EntriesGetSingleHandler(c) }) // get single (full) entry + + return routeGroup } diff --git a/agent/pkg/routes/install_routes.go b/agent/pkg/routes/install_routes.go index bdf8b602b..5ea0cab4a 100644 --- a/agent/pkg/routes/install_routes.go +++ b/agent/pkg/routes/install_routes.go @@ -6,9 +6,16 @@ import ( "github.com/gin-gonic/gin" ) -func InstallRoutes(ginApp *gin.Engine) { +var ( + InstallGetIsNeededHandler = controllers.IsSetupNecessary + InstallPostAdminHandler = controllers.SetupAdminUser +) + +func InstallRoutes(ginApp *gin.Engine) *gin.RouterGroup { routeGroup := ginApp.Group("/install") - routeGroup.GET("/isNeeded", controllers.IsSetupNecessary) - routeGroup.POST("/admin", controllers.SetupAdminUser) + routeGroup.GET("/isNeeded", func(c *gin.Context) { InstallGetIsNeededHandler(c) }) + routeGroup.POST("/admin", func(c *gin.Context) { InstallPostAdminHandler(c) }) + + return routeGroup } diff --git a/agent/pkg/routes/metadata_routes.go b/agent/pkg/routes/metadata_routes.go index 7428c5598..3f67e79ad 100644 --- a/agent/pkg/routes/metadata_routes.go +++ b/agent/pkg/routes/metadata_routes.go @@ -6,9 +6,15 @@ import ( "github.com/gin-gonic/gin" ) +var ( + MetadataGetVersionHandler = controllers.GetVersion +) + // MetadataRoutes defines the group of metadata routes. -func MetadataRoutes(app *gin.Engine) { +func MetadataRoutes(app *gin.Engine) *gin.RouterGroup { routeGroup := app.Group("/metadata") - routeGroup.GET("/version", controllers.GetVersion) + routeGroup.GET("/version", func(c *gin.Context) { MetadataGetVersionHandler(c) }) + + return routeGroup } diff --git a/agent/pkg/routes/oas_routes.go b/agent/pkg/routes/oas_routes.go index 34b988552..26b18b808 100644 --- a/agent/pkg/routes/oas_routes.go +++ b/agent/pkg/routes/oas_routes.go @@ -7,12 +7,20 @@ import ( "github.com/gin-gonic/gin" ) +var ( + OASGetServersHandler = controllers.GetOASServers + OASGetAllSpecsHandler = controllers.GetOASAllSpecs + OASGetSingleSpecHandler = controllers.GetOASSpec +) + // OASRoutes methods to access OAS spec -func OASRoutes(ginApp *gin.Engine) { +func OASRoutes(ginApp *gin.Engine) *gin.RouterGroup { routeGroup := ginApp.Group("/oas") routeGroup.Use(middlewares.RequiresAuth()) - routeGroup.GET("/", controllers.GetOASServers) // list of servers in OAS map - routeGroup.GET("/all", controllers.GetOASAllSpecs) // list of servers in OAS map - routeGroup.GET("/:id", controllers.GetOASSpec) // get OAS spec for given server + routeGroup.GET("/", func(c *gin.Context) { OASGetServersHandler(c) }) // list of servers in OAS map + routeGroup.GET("/all", func(c *gin.Context) { OASGetAllSpecsHandler(c) }) // list of servers in OAS map + routeGroup.GET("/:id", func(c *gin.Context) { OASGetSingleSpecHandler(c) }) // get OAS spec for given server + + return routeGroup } diff --git a/agent/pkg/routes/query_routes.go b/agent/pkg/routes/query_routes.go index c64066a2c..52cedda21 100644 --- a/agent/pkg/routes/query_routes.go +++ b/agent/pkg/routes/query_routes.go @@ -7,9 +7,15 @@ import ( "github.com/gin-gonic/gin" ) -func QueryRoutes(ginApp *gin.Engine) { +var ( + QueryPostValidateHandler = controllers.PostValidate +) + +func QueryRoutes(ginApp *gin.Engine) *gin.RouterGroup { routeGroup := ginApp.Group("/query") routeGroup.Use(middlewares.RequiresAuth()) - routeGroup.POST("/validate", controllers.PostValidate) + routeGroup.POST("/validate", func(c *gin.Context) { QueryPostValidateHandler(c) }) + + return routeGroup } diff --git a/agent/pkg/routes/service_map_routes.go b/agent/pkg/routes/service_map_routes.go index 1fb831f5e..058a4a8b0 100644 --- a/agent/pkg/routes/service_map_routes.go +++ b/agent/pkg/routes/service_map_routes.go @@ -7,13 +7,25 @@ import ( "github.com/gin-gonic/gin" ) -func ServiceMapRoutes(ginApp *gin.Engine) { +var ( + ServiceMapGetStatus gin.HandlerFunc + ServiceMapGet gin.HandlerFunc + ServiceMapReset gin.HandlerFunc +) + +func ServiceMapRoutes(ginApp *gin.Engine) *gin.RouterGroup { routeGroup := ginApp.Group("/servicemap") routeGroup.Use(middlewares.RequiresAuth()) controller := controllers.NewServiceMapController() - routeGroup.GET("/status", controller.Status) - routeGroup.GET("/get", controller.Get) - routeGroup.GET("/reset", controller.Reset) + ServiceMapGetStatus = controller.Status + ServiceMapGet = controller.Get + ServiceMapReset = controller.Reset + + routeGroup.GET("/status", func(c *gin.Context) { ServiceMapGetStatus(c) }) + routeGroup.GET("/get", func(c *gin.Context) { ServiceMapGet(c) }) + routeGroup.GET("/reset", func(c *gin.Context) { ServiceMapReset(c) }) + + return routeGroup } diff --git a/agent/pkg/routes/status_routes.go b/agent/pkg/routes/status_routes.go index f8a5c180b..9dcce2bf8 100644 --- a/agent/pkg/routes/status_routes.go +++ b/agent/pkg/routes/status_routes.go @@ -7,24 +7,39 @@ import ( "github.com/gin-gonic/gin" ) -func StatusRoutes(ginApp *gin.Engine) { +var ( + StatusGetHealthCheck = controllers.HealthCheck + StatusPostTappedPods = controllers.PostTappedPods + StatusPostTapperStatus = controllers.PostTapperStatus + StatusGetConnectedTappersCount = controllers.GetConnectedTappersCount + StatusGetTappingStatus = controllers.GetTappingStatus + StatusGetAuthStatus = controllers.GetAuthStatus + StatusGetAnalyzeInformation = controllers.AnalyzeInformation + StatusGetGeneralStats = controllers.GetGeneralStats + StatusGetRecentTLSLinks = controllers.GetRecentTLSLinks + StatusGetCurrentResolvingInformation = controllers.GetCurrentResolvingInformation +) + +func StatusRoutes(ginApp *gin.Engine) *gin.RouterGroup { routeGroup := ginApp.Group("/status") routeGroup.Use(middlewares.RequiresAuth()) - routeGroup.GET("/health", controllers.HealthCheck) + routeGroup.GET("/health", func(c *gin.Context) { StatusGetHealthCheck(c) }) - routeGroup.POST("/tappedPods", controllers.PostTappedPods) - routeGroup.POST("/tapperStatus", controllers.PostTapperStatus) - routeGroup.GET("/connectedTappersCount", controllers.GetConnectedTappersCount) - routeGroup.GET("/tap", controllers.GetTappingStatus) + routeGroup.POST("/tappedPods", func(c *gin.Context) { StatusPostTappedPods(c) }) + routeGroup.POST("/tapperStatus", func(c *gin.Context) { StatusPostTapperStatus(c) }) + routeGroup.GET("/connectedTappersCount", func(c *gin.Context) { StatusGetConnectedTappersCount(c) }) + routeGroup.GET("/tap", func(c *gin.Context) { StatusGetTappingStatus(c) }) - routeGroup.GET("/auth", controllers.GetAuthStatus) + routeGroup.GET("/auth", func(c *gin.Context) { StatusGetAuthStatus(c) }) - routeGroup.GET("/analyze", controllers.AnalyzeInformation) + routeGroup.GET("/analyze", func(c *gin.Context) { StatusGetAnalyzeInformation(c) }) - routeGroup.GET("/general", controllers.GetGeneralStats) // get general stats about entries in DB + routeGroup.GET("/general", func(c *gin.Context) { StatusGetGeneralStats(c) }) // get general stats about entries in DB - routeGroup.GET("/recentTLSLinks", controllers.GetRecentTLSLinks) + routeGroup.GET("/recentTLSLinks", func(c *gin.Context) { StatusGetRecentTLSLinks(c) }) - routeGroup.GET("/resolving", controllers.GetCurrentResolvingInformation) + routeGroup.GET("/resolving", func(c *gin.Context) { StatusGetCurrentResolvingInformation(c) }) + + return routeGroup } diff --git a/agent/pkg/routes/user_routes.go b/agent/pkg/routes/user_routes.go index bf6e562bd..7ee9f2476 100644 --- a/agent/pkg/routes/user_routes.go +++ b/agent/pkg/routes/user_routes.go @@ -6,10 +6,18 @@ import ( "github.com/gin-gonic/gin" ) -func UserRoutes(ginApp *gin.Engine) { +var ( + UserPostLogin = controllers.Login + UserPostLogout = controllers.Logout + UserPostRegister = controllers.Register +) + +func UserRoutes(ginApp *gin.Engine) *gin.RouterGroup { routeGroup := ginApp.Group("/user") - routeGroup.POST("/login", controllers.Login) - routeGroup.POST("/logout", controllers.Logout) - routeGroup.POST("/register", controllers.Register) + routeGroup.POST("/login", func(c *gin.Context) { UserPostLogin(c) }) + routeGroup.POST("/logout", func(c *gin.Context) { UserPostLogout(c) }) + routeGroup.POST("/register", func(c *gin.Context) { UserPostRegister(c) }) + + return routeGroup }