From 774f07fccd5e7cbe342e5e22d40a47168f01fa0d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=2E=20Mert=20Y=C4=B1ld=C4=B1ran?= Date: Mon, 21 Mar 2022 06:54:36 -0700 Subject: [PATCH] Add `/db/flush` and `/db/reset` API endpoints (#896) * Add `/db/flush` and `/db/reset` API endpoints * Handle the unmarshalling errors better in the WebSocket * Handle Basenine connection error better in the WebSocket * Upgrade to Basenine `v0.6.5` * Fix the duplicated `StartTime` state Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com> --- Dockerfile | 4 +-- agent/go.mod | 2 +- agent/go.sum | 4 +-- agent/main.go | 5 ++-- agent/pkg/api/socket_routes.go | 25 ++++++++++++------ agent/pkg/app/main.go | 5 ++-- agent/pkg/controllers/db_controller.go | 28 +++++++++++++++++++++ agent/pkg/controllers/entries_controller.go | 11 +++----- agent/pkg/routes/db_routes.go | 15 +++++++++++ agent/pkg/utils/utils.go | 4 +++ cli/go.mod | 2 +- cli/go.sum | 4 +-- 12 files changed, 80 insertions(+), 29 deletions(-) create mode 100644 agent/pkg/controllers/db_controller.go create mode 100644 agent/pkg/routes/db_routes.go diff --git a/Dockerfile b/Dockerfile index 582eaa5f0..94c1bfe88 100644 --- a/Dockerfile +++ b/Dockerfile @@ -77,8 +77,8 @@ RUN go build -ldflags="-extldflags=-static -s -w \ -X 'github.com/up9inc/mizu/agent/pkg/version.Ver=${VER}'" -o mizuagent . # Download Basenine executable, verify the sha1sum -ADD https://github.com/up9inc/basenine/releases/download/v0.6.3/basenine_linux_${GOARCH} ./basenine_linux_${GOARCH} -ADD https://github.com/up9inc/basenine/releases/download/v0.6.3/basenine_linux_${GOARCH}.sha256 ./basenine_linux_${GOARCH}.sha256 +ADD https://github.com/up9inc/basenine/releases/download/v0.6.5/basenine_linux_${GOARCH} ./basenine_linux_${GOARCH} +ADD https://github.com/up9inc/basenine/releases/download/v0.6.5/basenine_linux_${GOARCH}.sha256 ./basenine_linux_${GOARCH}.sha256 RUN shasum -a 256 -c basenine_linux_${GOARCH}.sha256 RUN chmod +x ./basenine_linux_${GOARCH} RUN mv ./basenine_linux_${GOARCH} ./basenine diff --git a/agent/go.mod b/agent/go.mod index 940c3cfc5..d3912e648 100644 --- a/agent/go.mod +++ b/agent/go.mod @@ -20,7 +20,7 @@ require ( github.com/orcaman/concurrent-map v1.0.0 github.com/patrickmn/go-cache v2.1.0+incompatible github.com/stretchr/testify v1.7.0 - github.com/up9inc/basenine/client/go v0.0.0-20220315070758-3a76cfc4378e + github.com/up9inc/basenine/client/go v0.0.0-20220317230530-8472d80307f6 github.com/up9inc/mizu/shared v0.0.0 github.com/up9inc/mizu/tap v0.0.0 github.com/up9inc/mizu/tap/api v0.0.0 diff --git a/agent/go.sum b/agent/go.sum index 5e9538466..f98a773dc 100644 --- a/agent/go.sum +++ b/agent/go.sum @@ -655,8 +655,8 @@ github.com/ugorji/go v1.2.6/go.mod h1:anCg0y61KIhDlPZmnH+so+RQbysYVyDko0IMgJv0Nn github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= github.com/ugorji/go/codec v1.2.6 h1:7kbGefxLoDBuYXOms4yD7223OpNMMPNPZxXk5TvFcyQ= github.com/ugorji/go/codec v1.2.6/go.mod h1:V6TCNZ4PHqoHGFZuSG1W8nrCzzdgA2DozYxWFFpvxTw= -github.com/up9inc/basenine/client/go v0.0.0-20220315070758-3a76cfc4378e h1:/9dFXqvRDHcwPQdIGHP6iz6M0iAWBPOxYf6C+Ntq5w0= -github.com/up9inc/basenine/client/go v0.0.0-20220315070758-3a76cfc4378e/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI= +github.com/up9inc/basenine/client/go v0.0.0-20220317230530-8472d80307f6 h1:c0aVbLKYeFDAg246+NDgie2y484bsc20NaKLo8ODV3E= +github.com/up9inc/basenine/client/go v0.0.0-20220317230530-8472d80307f6/go.mod h1:SvJGPoa/6erhUQV7kvHBwM/0x5LyO6XaG2lUaCaKiUI= github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74 h1:gga7acRE695APm9hlsSMoOoE65U4/TcqNj90mc69Rlg= github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74/go.mod h1:DD4vA1DwXk04H54A1oHXtwZmA0grkVMdPxx/VGLCah0= github.com/wI2L/jsondiff v0.1.1 h1:r2TkoEet7E4JMO5+s1RCY2R0LrNPNHY6hbDeow2hRHw= diff --git a/agent/main.go b/agent/main.go index c8f6f922e..eaad0c57a 100644 --- a/agent/main.go +++ b/agent/main.go @@ -47,7 +47,6 @@ var apiServerAddress = flag.String("api-server-address", "", "Address of mizu AP var namespace = flag.String("namespace", "", "Resolve IPs if they belong to resources in this namespace (default is all)") var harsReaderMode = flag.Bool("hars-read", false, "Run in hars-read mode") var harsDir = flag.String("hars-dir", "", "Directory to read hars from") -var startTime int64 const ( socketConnectionRetries = 30 @@ -110,7 +109,7 @@ func hostApi(socketHarOutputChannel chan<- *tapApi.OutputChannelItem) *gin.Engin app.Use(middlewares.CORSMiddleware()) // This has to be called after the static middleware, does not work if its called before - api.WebSocketRoutes(app, &eventHandlers, startTime) + api.WebSocketRoutes(app, &eventHandlers) if config.Config.OAS { routes.OASRoutes(app) @@ -124,6 +123,7 @@ func hostApi(socketHarOutputChannel chan<- *tapApi.OutputChannelItem) *gin.Engin routes.EntriesRoutes(app) routes.MetadataRoutes(app) routes.StatusRoutes(app) + routes.DbRoutes(app) return app } @@ -133,7 +133,6 @@ func runInApiServerMode(namespace string) *gin.Engine { logger.Log.Fatalf("Error loading config file %v", err) } app.ConfigureBasenineServer(shared.BasenineHost, shared.BaseninePort, config.Config.MaxDBSizeBytes, config.Config.LogLevel, config.Config.InsertionFilter) - startTime = time.Now().UnixNano() / int64(time.Millisecond) api.StartResolving(namespace) enableExpFeatureIfNeeded() diff --git a/agent/pkg/api/socket_routes.go b/agent/pkg/api/socket_routes.go index 7ba69966c..964ac231e 100644 --- a/agent/pkg/api/socket_routes.go +++ b/agent/pkg/api/socket_routes.go @@ -8,6 +8,7 @@ import ( "time" "github.com/up9inc/mizu/agent/pkg/models" + "github.com/up9inc/mizu/agent/pkg/utils" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" @@ -59,13 +60,13 @@ func init() { connectedWebsockets = make(map[int]*SocketConnection) } -func WebSocketRoutes(app *gin.Engine, eventHandlers EventHandlers, startTime int64) { +func WebSocketRoutes(app *gin.Engine, eventHandlers EventHandlers) { SocketGetBrowserHandler = func(c *gin.Context) { - websocketHandler(c.Writer, c.Request, eventHandlers, false, startTime) + websocketHandler(c.Writer, c.Request, eventHandlers, false) } SocketGetTapperHandler = func(c *gin.Context) { - websocketHandler(c.Writer, c.Request, eventHandlers, true, startTime) + websocketHandler(c.Writer, c.Request, eventHandlers, true) } app.GET("/ws", func(c *gin.Context) { @@ -77,7 +78,7 @@ func WebSocketRoutes(app *gin.Engine, eventHandlers EventHandlers, startTime int }) } -func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers EventHandlers, isTapper bool, startTime int64) { +func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers EventHandlers, isTapper bool) { ws, err := websocketUpgrader.Upgrade(w, r, nil) if err != nil { logger.Log.Errorf("Failed to set websocket upgrade: %v", err) @@ -99,7 +100,9 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even if !isTapper { connection, err = basenine.NewConnection(shared.BasenineHost, shared.BaseninePort) if err != nil { - panic(err) + logger.Log.Errorf("Failed to establish a connection to Basenine: %v", err) + socketCleanup(socketId, connectedWebsockets[socketId]) + return } } @@ -115,7 +118,7 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even eventHandlers.WebSocketConnect(socketId, isTapper) - startTimeBytes, _ := models.CreateWebsocketStartTimeMessage(startTime) + startTimeBytes, _ := models.CreateWebsocketStartTimeMessage(utils.StartTime) if err = SendToSocket(socketId, startTimeBytes); err != nil { logger.Log.Error(err) @@ -137,7 +140,8 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even if !isTapper && !isQuerySet { if err := json.Unmarshal(msg, ¶ms); err != nil { - logger.Log.Errorf("Error: %v", socketId, err) + logger.Log.Errorf("Error unmarshalling parameters: %v", socketId, err) + continue } query := params.Query @@ -166,6 +170,10 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even var entry *tapApi.Entry err = json.Unmarshal(bytes, &entry) + if err != nil { + logger.Log.Debugf("Error unmarshalling entry: %v", err.Error()) + continue + } var message []byte if params.EnableFullEntries { @@ -193,7 +201,8 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even var metadata *basenine.Metadata err = json.Unmarshal(bytes, &metadata) if err != nil { - logger.Log.Debugf("Error recieving metadata: %v", err.Error()) + logger.Log.Debugf("Error unmarshalling metadata: %v", err.Error()) + continue } metadataBytes, _ := models.CreateWebsocketQueryMetadataMessage(metadata) diff --git a/agent/pkg/app/main.go b/agent/pkg/app/main.go index 971244491..b712fd7de 100644 --- a/agent/pkg/app/main.go +++ b/agent/pkg/app/main.go @@ -9,7 +9,7 @@ import ( "github.com/op/go-logging" basenine "github.com/up9inc/basenine/client/go" "github.com/up9inc/mizu/agent/pkg/api" - "github.com/up9inc/mizu/agent/pkg/controllers" + "github.com/up9inc/mizu/agent/pkg/utils" "github.com/up9inc/mizu/shared/logger" tapApi "github.com/up9inc/mizu/tap/api" amqpExt "github.com/up9inc/mizu/tap/extensions/amqp" @@ -59,7 +59,6 @@ func LoadExtensions() { return Extensions[i].Protocol.Priority < Extensions[j].Protocol.Priority }) - controllers.InitExtensionsMap(ExtensionsMap) api.InitExtensionsMap(ExtensionsMap) } @@ -92,6 +91,8 @@ func ConfigureBasenineServer(host string, port string, dbSize int64, logLevel lo if err := basenine.InsertionFilter(host, port, insertionFilter); err != nil { logger.Log.Errorf("Error while setting the insertion filter: %v", err) } + + utils.StartTime = time.Now().UnixNano() / int64(time.Millisecond) } func GetEntryInputChannel() chan *tapApi.OutputChannelItem { diff --git a/agent/pkg/controllers/db_controller.go b/agent/pkg/controllers/db_controller.go new file mode 100644 index 000000000..265c16d6c --- /dev/null +++ b/agent/pkg/controllers/db_controller.go @@ -0,0 +1,28 @@ +package controllers + +import ( + "net/http" + + "github.com/gin-gonic/gin" + basenine "github.com/up9inc/basenine/client/go" + "github.com/up9inc/mizu/agent/pkg/app" + "github.com/up9inc/mizu/agent/pkg/config" + "github.com/up9inc/mizu/shared" +) + +func Flush(c *gin.Context) { + if err := basenine.Flush(shared.BasenineHost, shared.BaseninePort); err != nil { + c.JSON(http.StatusBadRequest, err) + } else { + c.JSON(http.StatusOK, "Flushed.") + } +} + +func Reset(c *gin.Context) { + if err := basenine.Reset(shared.BasenineHost, shared.BaseninePort); err != nil { + c.JSON(http.StatusBadRequest, err) + } else { + app.ConfigureBasenineServer(shared.BasenineHost, shared.BaseninePort, config.Config.MaxDBSizeBytes, config.Config.LogLevel, config.Config.InsertionFilter) + c.JSON(http.StatusOK, "Resetted.") + } +} diff --git a/agent/pkg/controllers/entries_controller.go b/agent/pkg/controllers/entries_controller.go index 7523552d0..f12dd8b58 100644 --- a/agent/pkg/controllers/entries_controller.go +++ b/agent/pkg/controllers/entries_controller.go @@ -6,6 +6,7 @@ import ( "strconv" "time" + "github.com/up9inc/mizu/agent/pkg/app" "github.com/up9inc/mizu/agent/pkg/har" "github.com/up9inc/mizu/agent/pkg/models" "github.com/up9inc/mizu/agent/pkg/validation" @@ -18,12 +19,6 @@ import ( tapApi "github.com/up9inc/mizu/tap/api" ) -var extensionsMap map[string]*tapApi.Extension // global - -func InitExtensionsMap(ref map[string]*tapApi.Extension) { - extensionsMap = ref -} - func Error(c *gin.Context, err error) bool { if err != nil { logger.Log.Errorf("Error getting entry: %v", err) @@ -77,7 +72,7 @@ func GetEntries(c *gin.Context) { return // exit } - extension := extensionsMap[entry.Protocol.Name] + extension := app.ExtensionsMap[entry.Protocol.Name] base := extension.Dissector.Summarize(entry) dataSlice = append(dataSlice, base) @@ -123,7 +118,7 @@ func GetEntry(c *gin.Context) { return // exit } - extension := extensionsMap[entry.Protocol.Name] + extension := app.ExtensionsMap[entry.Protocol.Name] base := extension.Dissector.Summarize(entry) representation, bodySize, _ := extension.Dissector.Represent(entry.Request, entry.Response) diff --git a/agent/pkg/routes/db_routes.go b/agent/pkg/routes/db_routes.go new file mode 100644 index 000000000..aa55e9c6b --- /dev/null +++ b/agent/pkg/routes/db_routes.go @@ -0,0 +1,15 @@ +package routes + +import ( + "github.com/up9inc/mizu/agent/pkg/controllers" + + "github.com/gin-gonic/gin" +) + +// DdRoutes defines the group of database routes. +func DbRoutes(app *gin.Engine) { + routeGroup := app.Group("/db") + + routeGroup.GET("/flush", controllers.Flush) + routeGroup.GET("/reset", controllers.Reset) +} diff --git a/agent/pkg/utils/utils.go b/agent/pkg/utils/utils.go index 022266921..034ee08f3 100644 --- a/agent/pkg/utils/utils.go +++ b/agent/pkg/utils/utils.go @@ -17,6 +17,10 @@ import ( "github.com/up9inc/mizu/shared/logger" ) +var ( + StartTime int64 // global +) + // StartServer starts the server with a graceful shutdown func StartServer(app *gin.Engine) { signals := make(chan os.Signal, 2) diff --git a/cli/go.mod b/cli/go.mod index a22e184d6..3860b7e93 100644 --- a/cli/go.mod +++ b/cli/go.mod @@ -11,7 +11,7 @@ require ( github.com/op/go-logging v0.0.0-20160315200505-970db520ece7 github.com/spf13/cobra v1.3.0 github.com/spf13/pflag v1.0.5 - github.com/up9inc/basenine/server/lib v0.0.0-20220315070758-3a76cfc4378e + github.com/up9inc/basenine/server/lib v0.0.0-20220317230530-8472d80307f6 github.com/up9inc/mizu/shared v0.0.0 github.com/up9inc/mizu/tap/api v0.0.0 golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 diff --git a/cli/go.sum b/cli/go.sum index 1b5f54655..e4d641a9a 100644 --- a/cli/go.sum +++ b/cli/go.sum @@ -600,8 +600,8 @@ github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69 github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= -github.com/up9inc/basenine/server/lib v0.0.0-20220315070758-3a76cfc4378e h1:reG/QwyxdfvGObfdrae7DZc3rTMiGwQ6S/4PRkwtBoE= -github.com/up9inc/basenine/server/lib v0.0.0-20220315070758-3a76cfc4378e/go.mod h1:ZIkxWiJm65jYQIso9k+OZKhR7gQ1we2jNyE2kQX9IQI= +github.com/up9inc/basenine/server/lib v0.0.0-20220317230530-8472d80307f6 h1:+RZTD+HdfIW2SMbc65yWkruTY+g5/1Av074m62A74ls= +github.com/up9inc/basenine/server/lib v0.0.0-20220317230530-8472d80307f6/go.mod h1:ZIkxWiJm65jYQIso9k+OZKhR7gQ1we2jNyE2kQX9IQI= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xlab/treeprint v0.0.0-20181112141820-a009c3971eca/go.mod h1:ce1O1j6UtZfjr22oyGxGLbauSBp2YVXpARAosm7dHBg= github.com/xlab/treeprint v1.1.0 h1:G/1DjNkPpfZCFt9CSh6b5/nY4VimlbHF3Rh4obvtzDk=