TRA-4263 extensibility refactor (#770)

* Refactor routers

* refactor extension loading

* refactor server creation code

* Update main.go
This commit is contained in:
RamiBerm
2022-02-08 14:59:56 +02:00
committed by GitHub
parent f013b0f03c
commit 0a4674ea7c
13 changed files with 450 additions and 307 deletions

View File

@@ -5,48 +5,27 @@ import (
"errors" "errors"
"flag" "flag"
"fmt" "fmt"
"io/ioutil"
"net/http" "net/http"
"os" "os"
"os/signal" "os/signal"
"sort"
"strconv"
"strings"
"syscall" "syscall"
"time" "time"
"github.com/up9inc/mizu/agent/pkg/middlewares"
"github.com/up9inc/mizu/agent/pkg/models" "github.com/up9inc/mizu/agent/pkg/models"
"github.com/up9inc/mizu/agent/pkg/oas"
"github.com/up9inc/mizu/agent/pkg/routes"
"github.com/up9inc/mizu/agent/pkg/servicemap"
"github.com/up9inc/mizu/agent/pkg/up9"
"github.com/up9inc/mizu/agent/pkg/utils" "github.com/up9inc/mizu/agent/pkg/utils"
"github.com/up9inc/mizu/agent/pkg/elastic"
"github.com/up9inc/mizu/agent/pkg/controllers"
"github.com/up9inc/mizu/agent/pkg/api" "github.com/up9inc/mizu/agent/pkg/api"
"github.com/up9inc/mizu/agent/pkg/app"
"github.com/up9inc/mizu/agent/pkg/config" "github.com/up9inc/mizu/agent/pkg/config"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
"github.com/antelman107/net-wait-go/wait"
"github.com/gin-contrib/static"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"github.com/op/go-logging" "github.com/op/go-logging"
basenine "github.com/up9inc/basenine/client/go"
"github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap" "github.com/up9inc/mizu/tap"
tapApi "github.com/up9inc/mizu/tap/api" tapApi "github.com/up9inc/mizu/tap/api"
amqpExt "github.com/up9inc/mizu/tap/extensions/amqp"
httpExt "github.com/up9inc/mizu/tap/extensions/http"
kafkaExt "github.com/up9inc/mizu/tap/extensions/kafka"
redisExt "github.com/up9inc/mizu/tap/extensions/redis"
) )
var tapperMode = flag.Bool("tap", false, "Run in tapper mode without API") var tapperMode = flag.Bool("tap", false, "Run in tapper mode without API")
@@ -57,11 +36,6 @@ var namespace = flag.String("namespace", "", "Resolve IPs if they belong to reso
var harsReaderMode = flag.Bool("hars-read", false, "Run in hars-read mode") var harsReaderMode = flag.Bool("hars-read", false, "Run in hars-read mode")
var harsDir = flag.String("hars-dir", "", "Directory to read hars from") var harsDir = flag.String("hars-dir", "", "Directory to read hars from")
var extensions []*tapApi.Extension // global
var extensionsMap map[string]*tapApi.Extension // global
var startTime int64
const ( const (
socketConnectionRetries = 30 socketConnectionRetries = 30
socketConnectionRetryDelay = time.Second * 2 socketConnectionRetryDelay = time.Second * 2
@@ -75,78 +49,20 @@ func main() {
if err := config.LoadConfig(); err != nil { if err := config.LoadConfig(); err != nil {
logger.Log.Fatalf("Error loading config file %v", err) logger.Log.Fatalf("Error loading config file %v", err)
} }
loadExtensions() app.LoadExtensions()
if !*tapperMode && !*apiServerMode && !*standaloneMode && !*harsReaderMode { if !*tapperMode && !*apiServerMode && !*standaloneMode && !*harsReaderMode {
panic("One of the flags --tap, --api or --standalone or --hars-read must be provided") panic("One of the flags --tap, --api or --standalone or --hars-read must be provided")
} }
if *standaloneMode { if *standaloneMode {
api.StartResolving(*namespace) runInStandaloneMode()
outputItemsChannel := make(chan *tapApi.OutputChannelItem)
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
filteringOptions := getTrafficFilteringOptions()
hostMode := os.Getenv(shared.HostModeEnvVar) == "1"
tapOpts := &tap.TapOpts{HostMode: hostMode}
tap.StartPassiveTapper(tapOpts, outputItemsChannel, extensions, filteringOptions)
go filterItems(outputItemsChannel, filteredOutputItemsChannel)
go api.StartReadingEntries(filteredOutputItemsChannel, nil, extensionsMap)
hostApi(nil)
} else if *tapperMode { } else if *tapperMode {
logger.Log.Infof("Starting tapper, websocket address: %s", *apiServerAddress) runInTapperMode()
if *apiServerAddress == "" {
panic("API server address must be provided with --api-server-address when using --tap")
}
hostMode := os.Getenv(shared.HostModeEnvVar) == "1"
tapOpts := &tap.TapOpts{HostMode: hostMode}
tapTargets := getTapTargets()
if tapTargets != nil {
tapOpts.FilterAuthorities = tapTargets
logger.Log.Infof("Filtering for the following authorities: %v", tapOpts.FilterAuthorities)
}
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
filteringOptions := getTrafficFilteringOptions()
tap.StartPassiveTapper(tapOpts, filteredOutputItemsChannel, extensions, filteringOptions)
socketConnection, err := dialSocketWithRetry(*apiServerAddress, socketConnectionRetries, socketConnectionRetryDelay)
if err != nil {
panic(fmt.Sprintf("Error connecting to socket server at %s %v", *apiServerAddress, err))
}
logger.Log.Infof("Connected successfully to websocket %s", *apiServerAddress)
go pipeTapChannelToSocket(socketConnection, filteredOutputItemsChannel)
} else if *apiServerMode { } else if *apiServerMode {
configureBasenineServer(shared.BasenineHost, shared.BaseninePort) utils.StartServer(app.RunInApiServerMode(*namespace))
startTime = time.Now().UnixNano() / int64(time.Millisecond)
api.StartResolving(*namespace)
outputItemsChannel := make(chan *tapApi.OutputChannelItem)
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
enableExpFeatureIfNeeded()
go filterItems(outputItemsChannel, filteredOutputItemsChannel)
go api.StartReadingEntries(filteredOutputItemsChannel, nil, extensionsMap)
syncEntriesConfig := getSyncEntriesConfig()
if syncEntriesConfig != nil {
if err := up9.SyncEntries(syncEntriesConfig); err != nil {
logger.Log.Error("Error syncing entries, err: %v", err)
}
}
hostApi(outputItemsChannel)
} else if *harsReaderMode { } else if *harsReaderMode {
outputItemsChannel := make(chan *tapApi.OutputChannelItem, 1000) runInHarReaderMode()
filteredHarChannel := make(chan *tapApi.OutputChannelItem)
go filterItems(outputItemsChannel, filteredHarChannel)
go api.StartReadingEntries(filteredHarChannel, harsDir, extensionsMap)
hostApi(nil)
} }
signalChan := make(chan os.Signal, 1) signalChan := make(chan os.Signal, 1)
@@ -156,167 +72,59 @@ func main() {
logger.Log.Info("Exiting") logger.Log.Info("Exiting")
} }
func enableExpFeatureIfNeeded() { func runInTapperMode() {
if config.Config.OAS { logger.Log.Infof("Starting tapper, websocket address: %s", *apiServerAddress)
oas.GetOasGeneratorInstance().Start() if *apiServerAddress == "" {
} panic("API server address must be provided with --api-server-address when using --tap")
if config.Config.ServiceMap {
servicemap.GetInstance().SetConfig(config.Config)
}
elastic.GetInstance().Configure(config.Config.Elastic)
}
func configureBasenineServer(host string, port string) {
if !wait.New(
wait.WithProto("tcp"),
wait.WithWait(200*time.Millisecond),
wait.WithBreak(50*time.Millisecond),
wait.WithDeadline(5*time.Second),
wait.WithDebug(config.Config.LogLevel == logging.DEBUG),
).Do([]string{fmt.Sprintf("%s:%s", host, port)}) {
logger.Log.Panicf("Basenine is not available!")
} }
// Limit the database size to default 200MB hostMode := os.Getenv(shared.HostModeEnvVar) == "1"
err := basenine.Limit(host, port, config.Config.MaxDBSizeBytes) tapOpts := &tap.TapOpts{HostMode: hostMode}
tapTargets := getTapTargets()
if tapTargets != nil {
tapOpts.FilterAuthorities = tapTargets
logger.Log.Infof("Filtering for the following authorities: %v", tapOpts.FilterAuthorities)
}
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
filteringOptions := getTrafficFilteringOptions()
tap.StartPassiveTapper(tapOpts, filteredOutputItemsChannel, app.Extensions, filteringOptions)
socketConnection, err := dialSocketWithRetry(*apiServerAddress, socketConnectionRetries, socketConnectionRetryDelay)
if err != nil { if err != nil {
logger.Log.Panicf("Error while limiting database size: %v", err) panic(fmt.Sprintf("Error connecting to socket server at %s %v", *apiServerAddress, err))
} }
logger.Log.Infof("Connected successfully to websocket %s", *apiServerAddress)
// Define the macros go pipeTapChannelToSocket(socketConnection, filteredOutputItemsChannel)
for _, extension := range extensions {
macros := extension.Dissector.Macros()
for macro, expanded := range macros {
err = basenine.Macro(host, port, macro, expanded)
if err != nil {
logger.Log.Panicf("Error while adding a macro: %v", err)
}
}
}
} }
func loadExtensions() { func runInStandaloneMode() {
extensions = make([]*tapApi.Extension, 4) api.StartResolving(*namespace)
extensionsMap = make(map[string]*tapApi.Extension)
extensionAmqp := &tapApi.Extension{} outputItemsChannel := make(chan *tapApi.OutputChannelItem)
dissectorAmqp := amqpExt.NewDissector() filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
dissectorAmqp.Register(extensionAmqp)
extensionAmqp.Dissector = dissectorAmqp
extensions[0] = extensionAmqp
extensionsMap[extensionAmqp.Protocol.Name] = extensionAmqp
extensionHttp := &tapApi.Extension{} filteringOptions := getTrafficFilteringOptions()
dissectorHttp := httpExt.NewDissector() hostMode := os.Getenv(shared.HostModeEnvVar) == "1"
dissectorHttp.Register(extensionHttp) tapOpts := &tap.TapOpts{HostMode: hostMode}
extensionHttp.Dissector = dissectorHttp tap.StartPassiveTapper(tapOpts, outputItemsChannel, app.Extensions, filteringOptions)
extensions[1] = extensionHttp
extensionsMap[extensionHttp.Protocol.Name] = extensionHttp
extensionKafka := &tapApi.Extension{} go app.FilterItems(outputItemsChannel, filteredOutputItemsChannel)
dissectorKafka := kafkaExt.NewDissector() go api.StartReadingEntries(filteredOutputItemsChannel, nil, app.ExtensionsMap)
dissectorKafka.Register(extensionKafka)
extensionKafka.Dissector = dissectorKafka
extensions[2] = extensionKafka
extensionsMap[extensionKafka.Protocol.Name] = extensionKafka
extensionRedis := &tapApi.Extension{} ginApp := app.HostApi(nil)
dissectorRedis := redisExt.NewDissector() utils.StartServer(ginApp)
dissectorRedis.Register(extensionRedis)
extensionRedis.Dissector = dissectorRedis
extensions[3] = extensionRedis
extensionsMap[extensionRedis.Protocol.Name] = extensionRedis
sort.Slice(extensions, func(i, j int) bool {
return extensions[i].Protocol.Priority < extensions[j].Protocol.Priority
})
for _, extension := range extensions {
logger.Log.Infof("Extension Properties: %+v", extension)
}
controllers.InitExtensionsMap(extensionsMap)
} }
func hostApi(socketHarOutputChannel chan<- *tapApi.OutputChannelItem) { func runInHarReaderMode() {
app := gin.Default() outputItemsChannel := make(chan *tapApi.OutputChannelItem, 1000)
filteredHarChannel := make(chan *tapApi.OutputChannelItem)
app.GET("/echo", func(c *gin.Context) { go app.FilterItems(outputItemsChannel, filteredHarChannel)
c.String(http.StatusOK, "Here is Mizu agent") go api.StartReadingEntries(filteredHarChannel, harsDir, app.ExtensionsMap)
}) ginApp := app.HostApi(nil)
utils.StartServer(ginApp)
eventHandlers := api.RoutesEventHandlers{
SocketOutChannel: socketHarOutputChannel,
}
app.Use(DisableRootStaticCache())
var staticFolder string
if config.Config.StandaloneMode {
staticFolder = "./site-standalone"
} else {
staticFolder = "./site"
}
indexStaticFile := staticFolder + "/index.html"
if err := setUIFlags(indexStaticFile); err != nil {
logger.Log.Errorf("Error setting ui flags, err: %v", err)
}
app.Use(static.ServeRoot("/", staticFolder))
app.NoRoute(func(c *gin.Context) {
c.File(indexStaticFile)
})
app.Use(middlewares.CORSMiddleware()) // This has to be called after the static middleware, does not work if its called before
api.WebSocketRoutes(app, &eventHandlers, startTime)
if config.Config.StandaloneMode {
routes.ConfigRoutes(app)
routes.UserRoutes(app)
routes.InstallRoutes(app)
}
if config.Config.OAS {
routes.OASRoutes(app)
}
if config.Config.ServiceMap {
routes.ServiceMapRoutes(app)
}
routes.QueryRoutes(app)
routes.EntriesRoutes(app)
routes.MetadataRoutes(app)
routes.StatusRoutes(app)
utils.StartServer(app)
}
func DisableRootStaticCache() gin.HandlerFunc {
return func(c *gin.Context) {
if c.Request.RequestURI == "/" {
// Disable cache only for the main static route
c.Writer.Header().Set("Cache-Control", "no-store")
}
c.Next()
}
}
func setUIFlags(uiIndexPath string) error {
read, err := ioutil.ReadFile(uiIndexPath)
if err != nil {
return err
}
replacedContent := strings.Replace(string(read), "__IS_OAS_ENABLED__", strconv.FormatBool(config.Config.OAS), 1)
replacedContent = strings.Replace(replacedContent, "__IS_SERVICE_MAP_ENABLED__", strconv.FormatBool(config.Config.ServiceMap), 1)
err = ioutil.WriteFile(uiIndexPath, []byte(replacedContent), 0)
if err != nil {
return err
}
return nil
} }
func parseEnvVar(env string) map[string][]v1.Pod { func parseEnvVar(env string) map[string][]v1.Pod {
@@ -357,16 +165,6 @@ func getTrafficFilteringOptions() *tapApi.TrafficFilteringOptions {
return &filteringOptions return &filteringOptions
} }
func filterItems(inChannel <-chan *tapApi.OutputChannelItem, outChannel chan *tapApi.OutputChannelItem) {
for message := range inChannel {
if message.ConnectionInfo.IsOutgoing && api.CheckIsServiceIP(message.ConnectionInfo.ServerIP) {
continue
}
outChannel <- message
}
}
func pipeTapChannelToSocket(connection *websocket.Conn, messageDataChannel <-chan *tapApi.OutputChannelItem) { func pipeTapChannelToSocket(connection *websocket.Conn, messageDataChannel <-chan *tapApi.OutputChannelItem) {
if connection == nil { if connection == nil {
panic("Websocket connection is nil") panic("Websocket connection is nil")
@@ -402,21 +200,6 @@ func pipeTapChannelToSocket(connection *websocket.Conn, messageDataChannel <-cha
} }
} }
func getSyncEntriesConfig() *shared.SyncEntriesConfig {
syncEntriesConfigJson := os.Getenv(shared.SyncEntriesConfigEnvVar)
if syncEntriesConfigJson == "" {
return nil
}
var syncEntriesConfig = &shared.SyncEntriesConfig{}
err := json.Unmarshal([]byte(syncEntriesConfigJson), syncEntriesConfig)
if err != nil {
panic(fmt.Sprintf("env var %s's value of %s is invalid! json must match the shared.SyncEntriesConfig struct, err: %v", shared.SyncEntriesConfigEnvVar, syncEntriesConfigJson, err))
}
return syncEntriesConfig
}
func determineLogLevel() (logLevel logging.Level) { func determineLogLevel() (logLevel logging.Level) {
logLevel, err := logging.LogLevel(os.Getenv(shared.LogLevelEnvVar)) logLevel, err := logging.LogLevel(os.Getenv(shared.LogLevelEnvVar))
if err != nil { if err != nil {

View File

@@ -30,14 +30,18 @@ type SocketConnection struct {
isTapper bool isTapper bool
} }
var websocketUpgrader = websocket.Upgrader{ var (
ReadBufferSize: 1024, websocketUpgrader = websocket.Upgrader{
WriteBufferSize: 1024, ReadBufferSize: 1024,
} WriteBufferSize: 1024,
}
var websocketIdsLock = sync.Mutex{} websocketIdsLock = sync.Mutex{}
var connectedWebsockets map[int]*SocketConnection connectedWebsockets map[int]*SocketConnection
var connectedWebsocketIdCounter = 0 connectedWebsocketIdCounter = 0
SocketGetBrowserHandler gin.HandlerFunc
SocketGetTapperHandler gin.HandlerFunc
)
func init() { func init() {
websocketUpgrader.CheckOrigin = func(r *http.Request) bool { return true } // like cors for web socket websocketUpgrader.CheckOrigin = func(r *http.Request) bool { return true } // like cors for web socket
@@ -45,12 +49,20 @@ func init() {
} }
func WebSocketRoutes(app *gin.Engine, eventHandlers EventHandlers, startTime int64) { func WebSocketRoutes(app *gin.Engine, eventHandlers EventHandlers, startTime int64) {
app.GET("/ws", func(c *gin.Context) { SocketGetBrowserHandler = func(c *gin.Context) {
websocketHandler(c.Writer, c.Request, eventHandlers, false, startTime) websocketHandler(c.Writer, c.Request, eventHandlers, false, startTime)
}
SocketGetTapperHandler = func(c *gin.Context) {
websocketHandler(c.Writer, c.Request, eventHandlers, true, startTime)
}
app.GET("/ws", func(c *gin.Context) {
SocketGetBrowserHandler(c)
}) })
app.GET("/wsTapper", func(c *gin.Context) { // TODO: add m2m authentication to this route app.GET("/wsTapper", func(c *gin.Context) { // TODO: add m2m authentication to this route
websocketHandler(c.Writer, c.Request, eventHandlers, true, startTime) SocketGetTapperHandler(c)
}) })
} }

View File

@@ -0,0 +1,62 @@
package app
import (
"sort"
"github.com/up9inc/mizu/agent/pkg/controllers"
"github.com/up9inc/mizu/shared/logger"
tapApi "github.com/up9inc/mizu/tap/api"
amqpExt "github.com/up9inc/mizu/tap/extensions/amqp"
httpExt "github.com/up9inc/mizu/tap/extensions/http"
kafkaExt "github.com/up9inc/mizu/tap/extensions/kafka"
redisExt "github.com/up9inc/mizu/tap/extensions/redis"
)
var (
Extensions []*tapApi.Extension // global
ExtensionsMap map[string]*tapApi.Extension // global
)
func LoadExtensions() {
Extensions = make([]*tapApi.Extension, 4)
ExtensionsMap = make(map[string]*tapApi.Extension)
extensionAmqp := &tapApi.Extension{}
dissectorAmqp := amqpExt.NewDissector()
dissectorAmqp.Register(extensionAmqp)
extensionAmqp.Dissector = dissectorAmqp
Extensions[0] = extensionAmqp
ExtensionsMap[extensionAmqp.Protocol.Name] = extensionAmqp
extensionHttp := &tapApi.Extension{}
dissectorHttp := httpExt.NewDissector()
dissectorHttp.Register(extensionHttp)
extensionHttp.Dissector = dissectorHttp
Extensions[1] = extensionHttp
ExtensionsMap[extensionHttp.Protocol.Name] = extensionHttp
extensionKafka := &tapApi.Extension{}
dissectorKafka := kafkaExt.NewDissector()
dissectorKafka.Register(extensionKafka)
extensionKafka.Dissector = dissectorKafka
Extensions[2] = extensionKafka
ExtensionsMap[extensionKafka.Protocol.Name] = extensionKafka
extensionRedis := &tapApi.Extension{}
dissectorRedis := redisExt.NewDissector()
dissectorRedis.Register(extensionRedis)
extensionRedis.Dissector = dissectorRedis
Extensions[3] = extensionRedis
ExtensionsMap[extensionRedis.Protocol.Name] = extensionRedis
sort.Slice(Extensions, func(i, j int) bool {
return Extensions[i].Protocol.Priority < Extensions[j].Protocol.Priority
})
for _, extension := range Extensions {
logger.Log.Infof("Extension Properties: %+v", extension)
}
controllers.InitExtensionsMap(ExtensionsMap)
}

210
agent/pkg/app/server.go Normal file
View File

@@ -0,0 +1,210 @@
package app
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
"strconv"
"strings"
"time"
"github.com/antelman107/net-wait-go/wait"
"github.com/gin-contrib/static"
"github.com/gin-gonic/gin"
"github.com/op/go-logging"
basenine "github.com/up9inc/basenine/client/go"
"github.com/up9inc/mizu/agent/pkg/api"
"github.com/up9inc/mizu/agent/pkg/config"
"github.com/up9inc/mizu/agent/pkg/elastic"
"github.com/up9inc/mizu/agent/pkg/middlewares"
"github.com/up9inc/mizu/agent/pkg/oas"
"github.com/up9inc/mizu/agent/pkg/routes"
"github.com/up9inc/mizu/agent/pkg/servicemap"
"github.com/up9inc/mizu/agent/pkg/up9"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
tapApi "github.com/up9inc/mizu/tap/api"
)
var (
ConfigRoutes *gin.RouterGroup
UserRoutes *gin.RouterGroup
InstallRoutes *gin.RouterGroup
OASRoutes *gin.RouterGroup
ServiceMapRoutes *gin.RouterGroup
QueryRoutes *gin.RouterGroup
EntriesRoutes *gin.RouterGroup
MetadataRoutes *gin.RouterGroup
StatusRoutes *gin.RouterGroup
startTime int64
)
func HostApi(socketHarOutputChannel chan<- *tapApi.OutputChannelItem) *gin.Engine {
app := gin.Default()
app.GET("/echo", func(c *gin.Context) {
c.String(http.StatusOK, "Here is Mizu agent")
})
eventHandlers := api.RoutesEventHandlers{
SocketOutChannel: socketHarOutputChannel,
}
app.Use(disableRootStaticCache())
var staticFolder string
if config.Config.StandaloneMode {
staticFolder = "./site-standalone"
} else {
staticFolder = "./site"
}
indexStaticFile := staticFolder + "/index.html"
if err := setUIFlags(indexStaticFile); err != nil {
logger.Log.Errorf("Error setting ui flags, err: %v", err)
}
app.Use(static.ServeRoot("/", staticFolder))
app.NoRoute(func(c *gin.Context) {
c.File(indexStaticFile)
})
app.Use(middlewares.CORSMiddleware()) // This has to be called after the static middleware, does not work if its called before
api.WebSocketRoutes(app, &eventHandlers, startTime)
if config.Config.StandaloneMode {
ConfigRoutes = routes.ConfigRoutes(app)
UserRoutes = routes.UserRoutes(app)
InstallRoutes = routes.InstallRoutes(app)
}
if config.Config.OAS {
OASRoutes = routes.OASRoutes(app)
}
if config.Config.ServiceMap {
ServiceMapRoutes = routes.ServiceMapRoutes(app)
}
QueryRoutes = routes.QueryRoutes(app)
EntriesRoutes = routes.EntriesRoutes(app)
MetadataRoutes = routes.MetadataRoutes(app)
StatusRoutes = routes.StatusRoutes(app)
return app
}
func RunInApiServerMode(namespace string) *gin.Engine {
configureBasenineServer(shared.BasenineHost, shared.BaseninePort)
startTime = time.Now().UnixNano() / int64(time.Millisecond)
api.StartResolving(namespace)
outputItemsChannel := make(chan *tapApi.OutputChannelItem)
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
enableExpFeatureIfNeeded()
go FilterItems(outputItemsChannel, filteredOutputItemsChannel)
go api.StartReadingEntries(filteredOutputItemsChannel, nil, ExtensionsMap)
syncEntriesConfig := getSyncEntriesConfig()
if syncEntriesConfig != nil {
if err := up9.SyncEntries(syncEntriesConfig); err != nil {
logger.Log.Error("Error syncing entries, err: %v", err)
}
}
return HostApi(outputItemsChannel)
}
func configureBasenineServer(host string, port string) {
if !wait.New(
wait.WithProto("tcp"),
wait.WithWait(200*time.Millisecond),
wait.WithBreak(50*time.Millisecond),
wait.WithDeadline(5*time.Second),
wait.WithDebug(config.Config.LogLevel == logging.DEBUG),
).Do([]string{fmt.Sprintf("%s:%s", host, port)}) {
logger.Log.Panicf("Basenine is not available!")
}
// Limit the database size to default 200MB
err := basenine.Limit(host, port, config.Config.MaxDBSizeBytes)
if err != nil {
logger.Log.Panicf("Error while limiting database size: %v", err)
}
// Define the macros
for _, extension := range Extensions {
macros := extension.Dissector.Macros()
for macro, expanded := range macros {
err = basenine.Macro(host, port, macro, expanded)
if err != nil {
logger.Log.Panicf("Error while adding a macro: %v", err)
}
}
}
}
func getSyncEntriesConfig() *shared.SyncEntriesConfig {
syncEntriesConfigJson := os.Getenv(shared.SyncEntriesConfigEnvVar)
if syncEntriesConfigJson == "" {
return nil
}
var syncEntriesConfig = &shared.SyncEntriesConfig{}
err := json.Unmarshal([]byte(syncEntriesConfigJson), syncEntriesConfig)
if err != nil {
panic(fmt.Sprintf("env var %s's value of %s is invalid! json must match the shared.SyncEntriesConfig struct, err: %v", shared.SyncEntriesConfigEnvVar, syncEntriesConfigJson, err))
}
return syncEntriesConfig
}
func FilterItems(inChannel <-chan *tapApi.OutputChannelItem, outChannel chan *tapApi.OutputChannelItem) {
for message := range inChannel {
if message.ConnectionInfo.IsOutgoing && api.CheckIsServiceIP(message.ConnectionInfo.ServerIP) {
continue
}
outChannel <- message
}
}
func enableExpFeatureIfNeeded() {
if config.Config.OAS {
oas.GetOasGeneratorInstance().Start()
}
if config.Config.ServiceMap {
servicemap.GetInstance().SetConfig(config.Config)
}
elastic.GetInstance().Configure(config.Config.Elastic)
}
func disableRootStaticCache() gin.HandlerFunc {
return func(c *gin.Context) {
if c.Request.RequestURI == "/" {
// Disable cache only for the main static route
c.Writer.Header().Set("Cache-Control", "no-store")
}
c.Next()
}
}
func setUIFlags(uiIndexPath string) error {
read, err := ioutil.ReadFile(uiIndexPath)
if err != nil {
return err
}
replacedContent := strings.Replace(string(read), "__IS_OAS_ENABLED__", strconv.FormatBool(config.Config.OAS), 1)
replacedContent = strings.Replace(replacedContent, "__IS_SERVICE_MAP_ENABLED__", strconv.FormatBool(config.Config.ServiceMap), 1)
err = ioutil.WriteFile(uiIndexPath, []byte(replacedContent), 0)
if err != nil {
return err
}
return nil
}

View File

@@ -7,10 +7,17 @@ import (
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
) )
func ConfigRoutes(ginApp *gin.Engine) { var (
ConfigPostTapConfigHandler = controllers.PostTapConfig
ConfigGetTapConfigHandler = controllers.GetTapConfig
)
func ConfigRoutes(ginApp *gin.Engine) *gin.RouterGroup {
routeGroup := ginApp.Group("/config") routeGroup := ginApp.Group("/config")
routeGroup.Use(middlewares.RequiresAuth()) routeGroup.Use(middlewares.RequiresAuth())
routeGroup.POST("/tap", middlewares.RequiresAdmin(), controllers.PostTapConfig) routeGroup.POST("/tap", middlewares.RequiresAdmin(), func(c *gin.Context) { ConfigPostTapConfigHandler(c) })
routeGroup.GET("/tap", controllers.GetTapConfig) routeGroup.GET("/tap", func(c *gin.Context) { ConfigGetTapConfigHandler(c) })
return routeGroup
} }

View File

@@ -7,11 +7,18 @@ import (
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
) )
var (
EntriesGetHandler = controllers.GetEntries
EntriesGetSingleHandler = controllers.GetEntry
)
// EntriesRoutes defines the group of har entries routes. // EntriesRoutes defines the group of har entries routes.
func EntriesRoutes(ginApp *gin.Engine) { func EntriesRoutes(ginApp *gin.Engine) *gin.RouterGroup {
routeGroup := ginApp.Group("/entries") routeGroup := ginApp.Group("/entries")
routeGroup.Use(middlewares.RequiresAuth()) routeGroup.Use(middlewares.RequiresAuth())
routeGroup.GET("/", controllers.GetEntries) // get entries (base/thin entries) and metadata routeGroup.GET("/", func(c *gin.Context) { EntriesGetHandler(c) }) // get entries (base/thin entries) and metadata
routeGroup.GET("/:id", controllers.GetEntry) // get single (full) entry routeGroup.GET("/:id", func(c *gin.Context) { EntriesGetSingleHandler(c) }) // get single (full) entry
return routeGroup
} }

View File

@@ -6,9 +6,16 @@ import (
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
) )
func InstallRoutes(ginApp *gin.Engine) { var (
InstallGetIsNeededHandler = controllers.IsSetupNecessary
InstallPostAdminHandler = controllers.SetupAdminUser
)
func InstallRoutes(ginApp *gin.Engine) *gin.RouterGroup {
routeGroup := ginApp.Group("/install") routeGroup := ginApp.Group("/install")
routeGroup.GET("/isNeeded", controllers.IsSetupNecessary) routeGroup.GET("/isNeeded", func(c *gin.Context) { InstallGetIsNeededHandler(c) })
routeGroup.POST("/admin", controllers.SetupAdminUser) routeGroup.POST("/admin", func(c *gin.Context) { InstallPostAdminHandler(c) })
return routeGroup
} }

View File

@@ -6,9 +6,15 @@ import (
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
) )
var (
MetadataGetVersionHandler = controllers.GetVersion
)
// MetadataRoutes defines the group of metadata routes. // MetadataRoutes defines the group of metadata routes.
func MetadataRoutes(app *gin.Engine) { func MetadataRoutes(app *gin.Engine) *gin.RouterGroup {
routeGroup := app.Group("/metadata") routeGroup := app.Group("/metadata")
routeGroup.GET("/version", controllers.GetVersion) routeGroup.GET("/version", func(c *gin.Context) { MetadataGetVersionHandler(c) })
return routeGroup
} }

View File

@@ -7,12 +7,20 @@ import (
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
) )
var (
OASGetServersHandler = controllers.GetOASServers
OASGetAllSpecsHandler = controllers.GetOASAllSpecs
OASGetSingleSpecHandler = controllers.GetOASSpec
)
// OASRoutes methods to access OAS spec // OASRoutes methods to access OAS spec
func OASRoutes(ginApp *gin.Engine) { func OASRoutes(ginApp *gin.Engine) *gin.RouterGroup {
routeGroup := ginApp.Group("/oas") routeGroup := ginApp.Group("/oas")
routeGroup.Use(middlewares.RequiresAuth()) routeGroup.Use(middlewares.RequiresAuth())
routeGroup.GET("/", controllers.GetOASServers) // list of servers in OAS map routeGroup.GET("/", func(c *gin.Context) { OASGetServersHandler(c) }) // list of servers in OAS map
routeGroup.GET("/all", controllers.GetOASAllSpecs) // list of servers in OAS map routeGroup.GET("/all", func(c *gin.Context) { OASGetAllSpecsHandler(c) }) // list of servers in OAS map
routeGroup.GET("/:id", controllers.GetOASSpec) // get OAS spec for given server routeGroup.GET("/:id", func(c *gin.Context) { OASGetSingleSpecHandler(c) }) // get OAS spec for given server
return routeGroup
} }

View File

@@ -7,9 +7,15 @@ import (
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
) )
func QueryRoutes(ginApp *gin.Engine) { var (
QueryPostValidateHandler = controllers.PostValidate
)
func QueryRoutes(ginApp *gin.Engine) *gin.RouterGroup {
routeGroup := ginApp.Group("/query") routeGroup := ginApp.Group("/query")
routeGroup.Use(middlewares.RequiresAuth()) routeGroup.Use(middlewares.RequiresAuth())
routeGroup.POST("/validate", controllers.PostValidate) routeGroup.POST("/validate", func(c *gin.Context) { QueryPostValidateHandler(c) })
return routeGroup
} }

View File

@@ -7,13 +7,25 @@ import (
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
) )
func ServiceMapRoutes(ginApp *gin.Engine) { var (
ServiceMapGetStatus gin.HandlerFunc
ServiceMapGet gin.HandlerFunc
ServiceMapReset gin.HandlerFunc
)
func ServiceMapRoutes(ginApp *gin.Engine) *gin.RouterGroup {
routeGroup := ginApp.Group("/servicemap") routeGroup := ginApp.Group("/servicemap")
routeGroup.Use(middlewares.RequiresAuth()) routeGroup.Use(middlewares.RequiresAuth())
controller := controllers.NewServiceMapController() controller := controllers.NewServiceMapController()
routeGroup.GET("/status", controller.Status) ServiceMapGetStatus = controller.Status
routeGroup.GET("/get", controller.Get) ServiceMapGet = controller.Get
routeGroup.GET("/reset", controller.Reset) ServiceMapReset = controller.Reset
routeGroup.GET("/status", func(c *gin.Context) { ServiceMapGetStatus(c) })
routeGroup.GET("/get", func(c *gin.Context) { ServiceMapGet(c) })
routeGroup.GET("/reset", func(c *gin.Context) { ServiceMapReset(c) })
return routeGroup
} }

View File

@@ -7,24 +7,39 @@ import (
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
) )
func StatusRoutes(ginApp *gin.Engine) { var (
StatusGetHealthCheck = controllers.HealthCheck
StatusPostTappedPods = controllers.PostTappedPods
StatusPostTapperStatus = controllers.PostTapperStatus
StatusGetConnectedTappersCount = controllers.GetConnectedTappersCount
StatusGetTappingStatus = controllers.GetTappingStatus
StatusGetAuthStatus = controllers.GetAuthStatus
StatusGetAnalyzeInformation = controllers.AnalyzeInformation
StatusGetGeneralStats = controllers.GetGeneralStats
StatusGetRecentTLSLinks = controllers.GetRecentTLSLinks
StatusGetCurrentResolvingInformation = controllers.GetCurrentResolvingInformation
)
func StatusRoutes(ginApp *gin.Engine) *gin.RouterGroup {
routeGroup := ginApp.Group("/status") routeGroup := ginApp.Group("/status")
routeGroup.Use(middlewares.RequiresAuth()) routeGroup.Use(middlewares.RequiresAuth())
routeGroup.GET("/health", controllers.HealthCheck) routeGroup.GET("/health", func(c *gin.Context) { StatusGetHealthCheck(c) })
routeGroup.POST("/tappedPods", controllers.PostTappedPods) routeGroup.POST("/tappedPods", func(c *gin.Context) { StatusPostTappedPods(c) })
routeGroup.POST("/tapperStatus", controllers.PostTapperStatus) routeGroup.POST("/tapperStatus", func(c *gin.Context) { StatusPostTapperStatus(c) })
routeGroup.GET("/connectedTappersCount", controllers.GetConnectedTappersCount) routeGroup.GET("/connectedTappersCount", func(c *gin.Context) { StatusGetConnectedTappersCount(c) })
routeGroup.GET("/tap", controllers.GetTappingStatus) routeGroup.GET("/tap", func(c *gin.Context) { StatusGetTappingStatus(c) })
routeGroup.GET("/auth", controllers.GetAuthStatus) routeGroup.GET("/auth", func(c *gin.Context) { StatusGetAuthStatus(c) })
routeGroup.GET("/analyze", controllers.AnalyzeInformation) routeGroup.GET("/analyze", func(c *gin.Context) { StatusGetAnalyzeInformation(c) })
routeGroup.GET("/general", controllers.GetGeneralStats) // get general stats about entries in DB routeGroup.GET("/general", func(c *gin.Context) { StatusGetGeneralStats(c) }) // get general stats about entries in DB
routeGroup.GET("/recentTLSLinks", controllers.GetRecentTLSLinks) routeGroup.GET("/recentTLSLinks", func(c *gin.Context) { StatusGetRecentTLSLinks(c) })
routeGroup.GET("/resolving", controllers.GetCurrentResolvingInformation) routeGroup.GET("/resolving", func(c *gin.Context) { StatusGetCurrentResolvingInformation(c) })
return routeGroup
} }

View File

@@ -6,10 +6,18 @@ import (
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
) )
func UserRoutes(ginApp *gin.Engine) { var (
UserPostLogin = controllers.Login
UserPostLogout = controllers.Logout
UserPostRegister = controllers.Register
)
func UserRoutes(ginApp *gin.Engine) *gin.RouterGroup {
routeGroup := ginApp.Group("/user") routeGroup := ginApp.Group("/user")
routeGroup.POST("/login", controllers.Login) routeGroup.POST("/login", func(c *gin.Context) { UserPostLogin(c) })
routeGroup.POST("/logout", controllers.Logout) routeGroup.POST("/logout", func(c *gin.Context) { UserPostLogout(c) })
routeGroup.POST("/register", controllers.Register) routeGroup.POST("/register", func(c *gin.Context) { UserPostRegister(c) })
return routeGroup
} }