Files
kubeshark/api/main.go
nimrod-up9 da24608bec Tap multiple pods statically (#51)
* WIP

* Update tap.go, provider.go, and 2 more files...

* WIP

* WIP

* Solved routine hanging forever: Added missing flag when calling mizuagent.

* Iterate channel with range.

* Panic if har channel is nil or if websocket connection is nil.

* StartPassiveTapper returns read only channel.

* Solved program exiting immediately: Wait for interrupt signal instead of exiting.

* Solve connecting issue - Retry a few times.

* Use lib const instead of magic.

* Nicer error prints.

* Don't continue piping message if there is an error.

* Comment.

* Dependency injection.

* no message

* Fixed comment.

* Print tapped addresses when they are updated.

* Print errors in cleanup if there are any.

Co-authored-by: RamiBerm <rami.berman@up9.com>
Co-authored-by: Roee Gadot <roee.gadot@up9.com>
2021-05-20 12:22:23 +03:00

97 lines
2.8 KiB
Go

package main
import (
"encoding/json"
"flag"
"fmt"
"github.com/gofiber/fiber/v2"
"mizuserver/pkg/api"
"mizuserver/pkg/middleware"
"mizuserver/pkg/routes"
"mizuserver/pkg/tap"
"mizuserver/pkg/utils"
"os"
"os/signal"
)
// Command-line flags. The three boolean flags select the run mode that main
// dispatches on; aggregatorAddress is required by main when --tap is set.
var (
	shouldTap         = flag.Bool("tap", false, "Run in tapper mode without API")
	aggregator        = flag.Bool("aggregator", false, "Run in aggregator mode with API")
	standalone        = flag.Bool("standalone", false, "Run in standalone tapper and API mode")
	aggregatorAddress = flag.String("aggregator-address", "", "Address of mizu collector for tapping")
)

// Names of the environment variables read by getTapTargets.
const (
	// nodeNameEnvVar holds the key used to look up this node's entry in the
	// per-node address dictionary.
	nodeNameEnvVar = "NODE_NAME"
	// tappedAddressesPerNodeDictEnvVar holds a JSON object that is decoded
	// into a map[string][]string.
	tappedAddressesPerNodeDictEnvVar = "TAPPED_ADDRESSES_PER_HOST"
)
// main parses the command-line flags, starts the process in exactly one run
// mode, and then waits for an interrupt signal before exiting.
//
// When more than one mode flag is set, the first match in this order wins:
//
//   - --standalone: start the passive tapper, read its entries in a
//     goroutine, and host the API with no socket channel.
//   - --tap: requires --aggregator-address; optionally restrict tapping to the
//     addresses returned by getTapTargets, start the passive tapper, connect
//     to the aggregator's socket server and pipe tapped items to it.
//   - --aggregator: read entries from a buffered channel and host the API,
//     handing that channel to the websocket event handlers.
//
// main panics if no mode flag is given, if --tap is used without an
// aggregator address, or if the socket connection cannot be established.
func main() {
	flag.Parse()

	if !*shouldTap && !*aggregator && !*standalone {
		// The registered flag is --aggregator; there is no --api flag.
		panic("One of the flags --tap, --aggregator or --standalone must be provided")
	}

	switch {
	case *standalone:
		harOutputChannel := tap.StartPassiveTapper()
		go api.StartReadingEntries(harOutputChannel, tap.HarOutputDir)
		hostApi(nil)

	case *shouldTap:
		if *aggregatorAddress == "" {
			panic("Aggregator address must be provided with --aggregator-address when using --tap")
		}

		// A nil result means no addresses are listed for this node; in that
		// case tap.HostAppAddresses is left untouched.
		tapTargets := getTapTargets()
		if tapTargets != nil {
			tap.HostAppAddresses = tapTargets
			fmt.Println("Filtering for the following addresses:", tap.HostAppAddresses)
		}

		harOutputChannel := tap.StartPassiveTapper()
		socketConnection, err := api.ConnectToSocketServer(*aggregatorAddress)
		if err != nil {
			panic(fmt.Sprintf("Error connecting to socket server at %s %v", *aggregatorAddress, err))
		}
		go api.PipeChannelToSocket(socketConnection, harOutputChannel)

	case *aggregator:
		socketHarOutChannel := make(chan *tap.OutputChannelItem, 1000)
		go api.StartReadingEntries(socketHarOutChannel, nil)
		hostApi(socketHarOutChannel)
	}

	// Keep the process alive until it is interrupted; the work above runs in
	// background goroutines.
	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, os.Interrupt)
	<-signalChan
	fmt.Println("Exiting")
}
// hostApi builds the fiber application and starts the server.
//
// It registers the middleware, serves static files from ./site at "/", adds
// a simple /echo liveness route, and then registers the websocket, entries
// and not-found routes in that order. socketHarOutputChannel is handed to the
// websocket event handlers as their SocketHarOutChannel; main passes nil in
// standalone mode.
func hostApi(socketHarOutputChannel chan<- *tap.OutputChannelItem) {
	server := fiber.New()

	// Register Fiber's middleware for the app.
	middleware.FiberMiddleware(server)
	server.Static("/", "./site")

	// Simple route to know the server is running.
	echo := func(ctx *fiber.Ctx) error {
		return ctx.SendString("Hello, World 👋!")
	}
	server.Get("/echo", echo)

	routes.WebSocketRoutes(server, &api.RoutesEventHandlers{
		SocketHarOutChannel: socketHarOutputChannel,
	})
	routes.EntriesRoutes(server)
	routes.NotFoundRoute(server)

	utils.StartServer(server)
}
// getTapTargets returns the addresses to tap on the current node.
//
// It decodes the JSON object stored in the environment variable named by
// tappedAddressesPerNodeDictEnvVar into a map[string][]string and returns the
// entry whose key equals the value of the environment variable named by
// nodeNameEnvVar. The result is nil when the map has no entry for this node.
//
// It panics when the dictionary variable does not contain valid JSON of that
// shape; this includes the variable being unset or empty.
func getTapTargets() []string {
	nodeName := os.Getenv(nodeNameEnvVar)

	var tappedAddressesPerNodeDict map[string][]string
	err := json.Unmarshal([]byte(os.Getenv(tappedAddressesPerNodeDictEnvVar)), &tappedAddressesPerNodeDict)
	if err != nil {
		// Report the variable's name, not the (still nil) decoded map.
		panic(fmt.Sprintf("env var value of %s is invalid! must be map[string][]string %v", tappedAddressesPerNodeDictEnvVar, err))
	}

	return tappedAddressesPerNodeDict[nodeName]
}