mirror of
https://github.com/kubeshark/kubeshark.git
synced 2026-02-26 00:24:17 +00:00
Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
06c8056443 | ||
|
|
d18f1f8316 | ||
|
|
f9202900ee | ||
|
|
9e34662511 |
@@ -85,7 +85,7 @@ func hostApi(socketHarOutputChannel chan<- *tap.OutputChannelItem) {
|
||||
app := gin.Default()
|
||||
|
||||
app.GET("/echo", func(c *gin.Context) {
|
||||
c.String(http.StatusOK, "Hello, World 👋!")
|
||||
c.String(http.StatusOK, "Here is Mizu agent")
|
||||
})
|
||||
|
||||
eventHandlers := api.RoutesEventHandlers{
|
||||
@@ -95,9 +95,10 @@ func hostApi(socketHarOutputChannel chan<- *tap.OutputChannelItem) {
|
||||
app.Use(static.ServeRoot("/", "./site"))
|
||||
app.Use(CORSMiddleware()) // This has to be called after the static middleware, does not work if its called before
|
||||
|
||||
routes.WebSocketRoutes(app, &eventHandlers)
|
||||
api.WebSocketRoutes(app, &eventHandlers)
|
||||
routes.EntriesRoutes(app)
|
||||
routes.MetadataRoutes(app)
|
||||
routes.StatusRoutes(app)
|
||||
routes.NotFoundRoute(app)
|
||||
|
||||
utils.StartServer(app)
|
||||
|
||||
@@ -88,9 +88,9 @@ func startReadingFiles(workingDir string) {
|
||||
for _, entry := range inputHar.Log.Entries {
|
||||
time.Sleep(time.Millisecond * 250)
|
||||
connectionInfo := &tap.ConnectionInfo{
|
||||
ClientIP: fileInfo.Name(),
|
||||
ClientIP: fileInfo.Name(),
|
||||
ClientPort: "",
|
||||
ServerIP: "",
|
||||
ServerIP: "",
|
||||
ServerPort: "",
|
||||
IsOutgoing: false,
|
||||
}
|
||||
@@ -118,7 +118,6 @@ func StartReadingOutbound(outboundLinkChannel <-chan *tap.OutboundLink) {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
func saveHarToDb(entry *har.Entry, connectionInfo *tap.ConnectionInfo) {
|
||||
entryBytes, _ := json.Marshal(entry)
|
||||
serviceName, urlPath := getServiceNameFromUrl(entry.Request.URL)
|
||||
@@ -168,7 +167,7 @@ func saveHarToDb(entry *har.Entry, connectionInfo *tap.ConnectionInfo) {
|
||||
return
|
||||
}
|
||||
baseEntryBytes, _ := models.CreateBaseEntryWebSocketMessage(&baseEntry)
|
||||
broadcastToBrowserClients(baseEntryBytes)
|
||||
BroadcastToBrowserClients(baseEntryBytes)
|
||||
}
|
||||
|
||||
func getServiceNameFromUrl(inputUrl string) (string, string) {
|
||||
@@ -196,6 +195,5 @@ func getEstimatedEntrySizeBytes(mizuEntry models.MizuEntry) int {
|
||||
sizeBytes += 8 // SizeBytes bytes
|
||||
sizeBytes += 1 // IsOutgoing bytes
|
||||
|
||||
|
||||
return sizeBytes
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
package routes
|
||||
package api
|
||||
|
||||
import (
|
||||
"errors"
|
||||
@@ -18,10 +18,10 @@ type EventHandlers interface {
|
||||
}
|
||||
|
||||
type SocketConnection struct {
|
||||
connection *websocket.Conn
|
||||
lock *sync.Mutex
|
||||
connection *websocket.Conn
|
||||
lock *sync.Mutex
|
||||
eventHandlers EventHandlers
|
||||
isTapper bool
|
||||
isTapper bool
|
||||
}
|
||||
|
||||
var websocketUpgrader = websocket.Upgrader{
|
||||
@@ -91,7 +91,7 @@ func socketCleanup(socketId int, socketConnection *SocketConnection) {
|
||||
socketConnection.eventHandlers.WebSocketDisconnect(socketId, socketConnection.isTapper)
|
||||
}
|
||||
|
||||
var db = debounce.NewDebouncer(time.Second * 5, func() {
|
||||
var db = debounce.NewDebouncer(time.Second*5, func() {
|
||||
fmt.Println("Successfully sent to socket")
|
||||
})
|
||||
|
||||
@@ -102,7 +102,7 @@ func SendToSocket(socketId int, message []byte) error {
|
||||
}
|
||||
|
||||
var sent = false
|
||||
time.AfterFunc(time.Second * 5, func() {
|
||||
time.AfterFunc(time.Second*5, func() {
|
||||
if !sent {
|
||||
fmt.Println("Socket timed out")
|
||||
socketCleanup(socketId, socketObj)
|
||||
@@ -8,7 +8,6 @@ import (
|
||||
"github.com/up9inc/mizu/tap"
|
||||
"mizuserver/pkg/models"
|
||||
"mizuserver/pkg/providers"
|
||||
"mizuserver/pkg/routes"
|
||||
"mizuserver/pkg/up9"
|
||||
"sync"
|
||||
)
|
||||
@@ -17,12 +16,12 @@ var browserClientSocketUUIDs = make([]int, 0)
|
||||
var socketListLock = sync.Mutex{}
|
||||
|
||||
type RoutesEventHandlers struct {
|
||||
routes.EventHandlers
|
||||
EventHandlers
|
||||
SocketHarOutChannel chan<- *tap.OutputChannelItem
|
||||
}
|
||||
|
||||
func init() {
|
||||
go up9.UpdateAnalyzeStatus(broadcastToBrowserClients)
|
||||
go up9.UpdateAnalyzeStatus(BroadcastToBrowserClients)
|
||||
}
|
||||
|
||||
func (h *RoutesEventHandlers) WebSocketConnect(socketId int, isTapper bool) {
|
||||
@@ -47,15 +46,14 @@ func (h *RoutesEventHandlers) WebSocketDisconnect(socketId int, isTapper bool) {
|
||||
}
|
||||
}
|
||||
|
||||
func broadcastToBrowserClients(message []byte) {
|
||||
func BroadcastToBrowserClients(message []byte) {
|
||||
for _, socketId := range browserClientSocketUUIDs {
|
||||
go func(socketId int) {
|
||||
err := routes.SendToSocket(socketId, message)
|
||||
err := SendToSocket(socketId, message)
|
||||
if err != nil {
|
||||
fmt.Printf("error sending message to socket ID %d: %v", socketId, err)
|
||||
}
|
||||
}(socketId)
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@@ -81,7 +79,7 @@ func (h *RoutesEventHandlers) WebSocketMessage(_ int, message []byte) {
|
||||
rlog.Infof("Could not unmarshal message of message type %s %v\n", socketMessageBase.MessageType, err)
|
||||
} else {
|
||||
providers.TapStatus.Pods = statusMessage.TappingStatus.Pods
|
||||
broadcastToBrowserClients(message)
|
||||
BroadcastToBrowserClients(message)
|
||||
}
|
||||
case shared.WebsocketMessageTypeOutboundLink:
|
||||
var outboundLinkMessage models.WebsocketOutboundLinkMessage
|
||||
@@ -116,7 +114,7 @@ func handleTLSLink(outboundLinkMessage models.WebsocketOutboundLinkMessage) {
|
||||
rlog.Errorf("Error marshaling outbound link message for broadcasting: %v", err)
|
||||
} else {
|
||||
fmt.Printf("Broadcasting outboundlink message %s\n", string(marshaledMessage))
|
||||
broadcastToBrowserClients(marshaledMessage)
|
||||
BroadcastToBrowserClients(marshaledMessage)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -8,6 +8,7 @@ import (
|
||||
"github.com/romana/rlog"
|
||||
"mizuserver/pkg/database"
|
||||
"mizuserver/pkg/models"
|
||||
"mizuserver/pkg/providers"
|
||||
"mizuserver/pkg/up9"
|
||||
"mizuserver/pkg/utils"
|
||||
"mizuserver/pkg/validation"
|
||||
@@ -241,3 +242,15 @@ func GetGeneralStats(c *gin.Context) {
|
||||
database.GetEntriesTable().Raw(sqlQuery).Scan(&result)
|
||||
c.JSON(http.StatusOK, result)
|
||||
}
|
||||
|
||||
func GetTappingStatus(c *gin.Context) {
|
||||
c.JSON(http.StatusOK, providers.TapStatus)
|
||||
}
|
||||
|
||||
func AnalyzeInformation(c *gin.Context) {
|
||||
c.JSON(http.StatusOK, up9.GetAnalyzeInfo())
|
||||
}
|
||||
|
||||
func GetRecentTLSLinks(c *gin.Context) {
|
||||
c.JSON(http.StatusOK, providers.GetAllRecentTLSAddresses())
|
||||
}
|
||||
|
||||
@@ -1,20 +1,32 @@
|
||||
package controllers
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/romana/rlog"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
"mizuserver/pkg/api"
|
||||
"mizuserver/pkg/providers"
|
||||
"mizuserver/pkg/up9"
|
||||
"mizuserver/pkg/validation"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
func GetTappingStatus(c *gin.Context) {
|
||||
c.JSON(http.StatusOK, providers.TapStatus)
|
||||
}
|
||||
|
||||
func AnalyzeInformation(c *gin.Context) {
|
||||
c.JSON(http.StatusOK, up9.GetAnalyzeInfo())
|
||||
}
|
||||
|
||||
func GetRecentTLSLinks(c *gin.Context) {
|
||||
c.JSON(http.StatusOK, providers.GetAllRecentTLSAddresses())
|
||||
func PostTappedPods(c *gin.Context) {
|
||||
tapStatus := &shared.TapStatus{}
|
||||
if err := c.Bind(tapStatus); err != nil {
|
||||
c.JSON(http.StatusBadRequest, err)
|
||||
return
|
||||
}
|
||||
if err := validation.Validate(tapStatus); err != nil {
|
||||
c.JSON(http.StatusBadRequest, err)
|
||||
return
|
||||
}
|
||||
rlog.Infof("[Status] POST request: %d tapped pods", len(tapStatus.Pods))
|
||||
providers.TapStatus.Pods = tapStatus.Pods
|
||||
message := shared.CreateWebSocketStatusMessage(*tapStatus)
|
||||
if jsonBytes, err := json.Marshal(message); err != nil {
|
||||
rlog.Errorf("Could not Marshal message %v\n", err)
|
||||
} else {
|
||||
api.BroadcastToBrowserClients(jsonBytes)
|
||||
}
|
||||
}
|
||||
|
||||
12
agent/pkg/routes/status_routes.go
Normal file
12
agent/pkg/routes/status_routes.go
Normal file
@@ -0,0 +1,12 @@
|
||||
package routes
|
||||
|
||||
import (
|
||||
"github.com/gin-gonic/gin"
|
||||
"mizuserver/pkg/controllers"
|
||||
)
|
||||
|
||||
func StatusRoutes(ginApp *gin.Engine) {
|
||||
routeGroup := ginApp.Group("/status")
|
||||
|
||||
routeGroup.POST("/tappedPods", controllers.PostTappedPods)
|
||||
}
|
||||
@@ -8,17 +8,24 @@ import (
|
||||
"io/ioutil"
|
||||
)
|
||||
|
||||
var outputFileName string
|
||||
var regenerateFile bool
|
||||
|
||||
var configCmd = &cobra.Command{
|
||||
Use: "config",
|
||||
Short: "Generate example config file to stdout",
|
||||
Short: "Generate config with default values",
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
template := mizu.GetTemplateConfig()
|
||||
if outputFileName != "" {
|
||||
template, err := mizu.GetConfigWithDefaults()
|
||||
if err != nil {
|
||||
mizu.Log.Errorf("Failed generating config with defaults %v", err)
|
||||
return nil
|
||||
}
|
||||
if regenerateFile {
|
||||
data := []byte(template)
|
||||
_ = ioutil.WriteFile(outputFileName, data, 0644)
|
||||
mizu.Log.Infof(fmt.Sprintf("Template File written to %s", fmt.Sprintf(uiUtils.Purple, outputFileName)))
|
||||
if err := ioutil.WriteFile(mizu.GetConfigFilePath(), data, 0644); err != nil {
|
||||
mizu.Log.Errorf("Failed writing config %v", err)
|
||||
return nil
|
||||
}
|
||||
mizu.Log.Infof(fmt.Sprintf("Template File written to %s", fmt.Sprintf(uiUtils.Purple, mizu.GetConfigFilePath())))
|
||||
} else {
|
||||
mizu.Log.Debugf("Writing template config.\n%v", template)
|
||||
fmt.Printf("%v", template)
|
||||
@@ -29,6 +36,5 @@ var configCmd = &cobra.Command{
|
||||
|
||||
func init() {
|
||||
rootCmd.AddCommand(configCmd)
|
||||
|
||||
configCmd.Flags().StringVarP(&outputFileName, "file", "f", "", "Save content to local file")
|
||||
configCmd.Flags().BoolVarP(®enerateFile, "regenerate", "r", false, fmt.Sprintf("Regenerate the config file with default values %s", mizu.GetConfigFilePath()))
|
||||
}
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/up9inc/mizu/cli/kubernetes"
|
||||
"github.com/up9inc/mizu/cli/mizu"
|
||||
@@ -9,6 +11,7 @@ import (
|
||||
"github.com/up9inc/mizu/shared"
|
||||
"github.com/up9inc/mizu/shared/debounce"
|
||||
core "k8s.io/api/core/v1"
|
||||
errors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
"net/http"
|
||||
@@ -53,28 +56,20 @@ func RunMizuTap() {
|
||||
defer cancel() // cancel will be called when this function exits
|
||||
|
||||
targetNamespace := getNamespace(kubernetesProvider)
|
||||
if err := updateCurrentlyTappedPods(kubernetesProvider, ctx, targetNamespace); err != nil {
|
||||
mizu.Log.Infof("Error listing pods: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if mizu.Config.Tap.DryRun {
|
||||
return
|
||||
}
|
||||
|
||||
urlReadyChan := make(chan string)
|
||||
go func() {
|
||||
mizu.Log.Infof("Mizu is available at http://%s", <-urlReadyChan)
|
||||
}()
|
||||
|
||||
var namespacesStr string
|
||||
if targetNamespace != mizu.K8sAllNamespaces {
|
||||
namespacesStr = fmt.Sprintf("namespace \"%s\"", targetNamespace)
|
||||
} else {
|
||||
namespacesStr = "all namespaces"
|
||||
}
|
||||
mizu.CheckNewerVersion()
|
||||
mizu.Log.Infof("Tapping pods in %s", namespacesStr)
|
||||
|
||||
if err, _ := updateCurrentlyTappedPods(kubernetesProvider, ctx, targetNamespace); err != nil {
|
||||
mizu.Log.Infof("Error listing pods: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if len(currentlyTappedPods) == 0 {
|
||||
var suggestionStr string
|
||||
if targetNamespace != mizu.K8sAllNamespaces {
|
||||
@@ -83,6 +78,10 @@ func RunMizuTap() {
|
||||
mizu.Log.Infof("Did not find any pods matching the regex argument%s", suggestionStr)
|
||||
}
|
||||
|
||||
if mizu.Config.Tap.DryRun {
|
||||
return
|
||||
}
|
||||
|
||||
nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(currentlyTappedPods)
|
||||
if err != nil {
|
||||
return
|
||||
@@ -92,10 +91,8 @@ func RunMizuTap() {
|
||||
return
|
||||
}
|
||||
|
||||
mizu.CheckNewerVersion()
|
||||
go portForwardApiPod(ctx, kubernetesProvider, cancel, urlReadyChan) // TODO convert this to job for built in pod ttl or have the running app handle this
|
||||
go createProxyToApiServerPod(ctx, kubernetesProvider, cancel)
|
||||
go watchPodsForTapping(ctx, kubernetesProvider, cancel)
|
||||
go syncApiStatus(ctx, cancel)
|
||||
|
||||
//block until exit signal or error
|
||||
waitForFinish(ctx, cancel)
|
||||
@@ -121,9 +118,10 @@ func createMizuNamespace(ctx context.Context, kubernetesProvider *kubernetes.Pro
|
||||
_, err := kubernetesProvider.CreateNamespace(ctx, mizu.ResourcesNamespace)
|
||||
if err != nil {
|
||||
mizu.Log.Infof("Error creating Namespace %s: %v", mizu.ResourcesNamespace, err)
|
||||
return err
|
||||
}
|
||||
|
||||
return err
|
||||
mizu.Log.Debugf("Successfully creating Namespace %s", mizu.ResourcesNamespace)
|
||||
return nil
|
||||
}
|
||||
|
||||
func createMizuApiServer(ctx context.Context, kubernetesProvider *kubernetes.Provider, mizuApiFilteringOptions *shared.TrafficFilteringOptions) error {
|
||||
@@ -141,17 +139,20 @@ func createMizuApiServer(ctx context.Context, kubernetesProvider *kubernetes.Pro
|
||||
mizu.Log.Infof("Error creating mizu %s pod: %v", mizu.ApiServerPodName, err)
|
||||
return err
|
||||
}
|
||||
mizu.Log.Debugf("Successfully created API server pod: %s", mizu.ApiServerPodName)
|
||||
|
||||
apiServerService, err = kubernetesProvider.CreateService(ctx, mizu.ResourcesNamespace, mizu.ApiServerPodName, mizu.ApiServerPodName)
|
||||
if err != nil {
|
||||
mizu.Log.Infof("Error creating mizu %s service: %v", mizu.ApiServerPodName, err)
|
||||
return err
|
||||
}
|
||||
mizu.Log.Debugf("Successfully created service: %s", mizu.ApiServerPodName)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func getMizuApiFilteringOptions() (*shared.TrafficFilteringOptions, error) {
|
||||
|
||||
var compiledRegexSlice []*shared.SerializableRegexp
|
||||
|
||||
if mizu.Config.Tap.PlainTextFilterRegexes != nil && len(mizu.Config.Tap.PlainTextFilterRegexes) > 0 {
|
||||
@@ -192,9 +193,10 @@ func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provi
|
||||
mizu.Log.Infof("Error creating mizu tapper daemonset: %v", err)
|
||||
return err
|
||||
}
|
||||
mizu.Log.Debugf("Successfully created %v tappers", len(nodeToTappedPodIPMap))
|
||||
} else {
|
||||
if err := kubernetesProvider.RemoveDaemonSet(ctx, mizu.ResourcesNamespace, mizu.TapperDaemonSetName); err != nil {
|
||||
mizu.Log.Infof("Error deleting mizu tapper daemonset: %v", err)
|
||||
mizu.Log.Errorf("Error deleting mizu tapper daemonset: %v", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -237,24 +239,55 @@ func cleanUpMizuResources(kubernetesProvider *kubernetes.Provider) {
|
||||
}
|
||||
}
|
||||
|
||||
func reportTappedPods() {
|
||||
mizuProxiedUrl := kubernetes.GetMizuApiServerProxiedHostAndPath(mizu.Config.Fetch.MizuPort)
|
||||
tappedPodsUrl := fmt.Sprintf("http://%s/status/tappedPods", mizuProxiedUrl)
|
||||
|
||||
podInfos := make([]shared.PodInfo, 0)
|
||||
for _, pod := range currentlyTappedPods {
|
||||
podInfos = append(podInfos, shared.PodInfo{Name: pod.Name, Namespace: pod.Namespace})
|
||||
}
|
||||
tapStatus := shared.TapStatus{Pods: podInfos}
|
||||
|
||||
if jsonValue, err := json.Marshal(tapStatus); err != nil {
|
||||
mizu.Log.Debugf("[ERROR] failed Marshal the tapped pods %v", err)
|
||||
} else {
|
||||
if response, err := http.Post(tappedPodsUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil {
|
||||
mizu.Log.Debugf("[ERROR] failed sending to API server the tapped pods %v", err)
|
||||
} else if response.StatusCode != 200 {
|
||||
mizu.Log.Debugf("[ERROR] failed sending to API server the tapped pods, response status code %v", response.StatusCode)
|
||||
} else {
|
||||
mizu.Log.Debugf("Reported to server API about %d taped pods successfully", len(podInfos))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
|
||||
targetNamespace := getNamespace(kubernetesProvider)
|
||||
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, targetNamespace), mizu.Config.Tap.PodRegex())
|
||||
|
||||
restartTappers := func() {
|
||||
if err := updateCurrentlyTappedPods(kubernetesProvider, ctx, targetNamespace); err != nil {
|
||||
mizu.Log.Infof("Error getting pods by regex: %s (%v,%+v)", err, err, err)
|
||||
err, changeFound := updateCurrentlyTappedPods(kubernetesProvider, ctx, targetNamespace)
|
||||
if err != nil {
|
||||
mizu.Log.Errorf("Error getting pods by regex: %s (%v,%+v)", err, err, err)
|
||||
cancel()
|
||||
}
|
||||
|
||||
if !changeFound {
|
||||
mizu.Log.Debugf("Nothing changed update tappers not needed")
|
||||
return
|
||||
}
|
||||
|
||||
reportTappedPods()
|
||||
|
||||
nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(currentlyTappedPods)
|
||||
if err != nil {
|
||||
mizu.Log.Infof("Error building node to ips map: %s (%v,%+v)", err, err, err)
|
||||
mizu.Log.Errorf("Error building node to ips map: %s (%v,%+v)", err, err, err)
|
||||
cancel()
|
||||
}
|
||||
|
||||
if err := updateMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap); err != nil {
|
||||
mizu.Log.Infof("Error updating daemonset: %s (%v,%+v)", err, err, err)
|
||||
mizu.Log.Errorf("Error updating daemonset: %s (%v,%+v)", err, err, err)
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
@@ -262,17 +295,21 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-added:
|
||||
case <-removed:
|
||||
case pod := <-added:
|
||||
mizu.Log.Debugf("Added matching pod %s, ns: %s", pod.Name, pod.Namespace)
|
||||
restartTappersDebouncer.SetOn()
|
||||
case modifiedTarget := <-modified:
|
||||
case pod := <-removed:
|
||||
mizu.Log.Debugf("Removed matching pod %s, ns: %s", pod.Name, pod.Namespace)
|
||||
restartTappersDebouncer.SetOn()
|
||||
case pod := <-modified:
|
||||
mizu.Log.Debugf("Modified matching pod %s, ns: %s, phase: %s, ip: %s", pod.Name, pod.Namespace, pod.Status.Phase, pod.Status.PodIP)
|
||||
// Act only if the modified pod has already obtained an IP address.
|
||||
// After filtering for IPs, on a normal pod restart this includes the following events:
|
||||
// - Pod deletion
|
||||
// - Pod reaches start state
|
||||
// - Pod reaches ready state
|
||||
// Ready/unready transitions might also trigger this event.
|
||||
if modifiedTarget.Status.PodIP != "" {
|
||||
if pod.Status.PodIP != "" {
|
||||
restartTappersDebouncer.SetOn()
|
||||
}
|
||||
|
||||
@@ -286,22 +323,25 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
|
||||
}
|
||||
}
|
||||
|
||||
func updateCurrentlyTappedPods(kubernetesProvider *kubernetes.Provider, ctx context.Context, targetNamespace string) error {
|
||||
func updateCurrentlyTappedPods(kubernetesProvider *kubernetes.Provider, ctx context.Context, targetNamespace string) (error, bool) {
|
||||
changeFound := false
|
||||
if matchingPods, err := kubernetesProvider.GetAllRunningPodsMatchingRegex(ctx, mizu.Config.Tap.PodRegex(), targetNamespace); err != nil {
|
||||
mizu.Log.Infof("Error getting pods by regex: %s (%v,%+v)", err, err, err)
|
||||
return err
|
||||
return err, false
|
||||
} else {
|
||||
addedPods, removedPods := getPodArrayDiff(currentlyTappedPods, matchingPods)
|
||||
for _, addedPod := range addedPods {
|
||||
changeFound = true
|
||||
mizu.Log.Infof(uiUtils.Green, fmt.Sprintf("+%s", addedPod.Name))
|
||||
}
|
||||
for _, removedPod := range removedPods {
|
||||
changeFound = true
|
||||
mizu.Log.Infof(uiUtils.Red, fmt.Sprintf("-%s", removedPod.Name))
|
||||
}
|
||||
currentlyTappedPods = matchingPods
|
||||
}
|
||||
|
||||
return nil
|
||||
return nil, changeFound
|
||||
}
|
||||
|
||||
func getPodArrayDiff(oldPods []core.Pod, newPods []core.Pod) (added []core.Pod, removed []core.Pod) {
|
||||
@@ -329,43 +369,45 @@ func getMissingPods(pods1 []core.Pod, pods2 []core.Pod) []core.Pod {
|
||||
return missingPods
|
||||
}
|
||||
|
||||
func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, urlReadyChan chan string) {
|
||||
func createProxyToApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
|
||||
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", mizu.ApiServerPodName))
|
||||
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, mizu.ResourcesNamespace), podExactRegex)
|
||||
isPodReady := false
|
||||
timeAfter := time.After(25 * time.Second)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-added:
|
||||
mizu.Log.Debugf("Got agent pod added event")
|
||||
continue
|
||||
case <-removed:
|
||||
mizu.Log.Infof("%s removed", mizu.ApiServerPodName)
|
||||
cancel()
|
||||
return
|
||||
case modifiedPod := <-modified:
|
||||
if modifiedPod.Status.Phase == "Running" && !isPodReady {
|
||||
mizu.Log.Debugf("Got agent pod modified event, status phase: %v", modifiedPod.Status.Phase)
|
||||
if modifiedPod.Status.Phase == core.PodRunning && !isPodReady {
|
||||
isPodReady = true
|
||||
go func() {
|
||||
err := kubernetes.StartProxy(kubernetesProvider, mizu.Config.Tap.GuiPort, mizu.ResourcesNamespace, mizu.ApiServerPodName)
|
||||
if err != nil {
|
||||
mizu.Log.Infof("Error occurred while running k8s proxy %v", err)
|
||||
mizu.Log.Errorf("Error occurred while running k8s proxy %v", err)
|
||||
cancel()
|
||||
}
|
||||
}()
|
||||
mizu.Log.Infof("Mizu is available at http://%s\n", kubernetes.GetMizuApiServerProxiedHostAndPath(mizu.Config.Tap.GuiPort))
|
||||
time.Sleep(time.Second * 5) // Waiting to be sure the proxy is ready
|
||||
requestForAnalysis()
|
||||
reportTappedPods()
|
||||
}
|
||||
|
||||
urlReadyChan <- kubernetes.GetMizuApiServerProxiedHostAndPath(mizu.Config.Tap.GuiPort)
|
||||
time.Sleep(time.Second * 5) // Waiting to be sure the proxy is ready
|
||||
requestForAnalysis()
|
||||
case <-timeAfter:
|
||||
if !isPodReady {
|
||||
mizu.Log.Errorf("error: %s pod was not ready in time", mizu.ApiServerPodName)
|
||||
mizu.Log.Errorf("Error: %s pod was not ready in time", mizu.ApiServerPodName)
|
||||
cancel()
|
||||
}
|
||||
case <-errorChan:
|
||||
mizu.Log.Debugf("[ERROR] Agent creation, watching %v namespace", mizu.ResourcesNamespace)
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
@@ -401,7 +443,7 @@ func createRBACIfNecessary(ctx context.Context, kubernetesProvider *kubernetes.P
|
||||
}
|
||||
if !mizuRBACExists {
|
||||
err := kubernetesProvider.CreateMizuRBAC(ctx, mizu.ResourcesNamespace, mizu.ServiceAccountName, mizu.ClusterRoleName, mizu.ClusterRoleBindingName, mizu.RBACVersion)
|
||||
if err != nil {
|
||||
if err != nil && !errors.IsAlreadyExists(err) {
|
||||
mizu.Log.Infof("warning: could not create mizu rbac resources %v", err)
|
||||
return false
|
||||
}
|
||||
@@ -435,28 +477,6 @@ func waitForFinish(ctx context.Context, cancel context.CancelFunc) {
|
||||
}
|
||||
}
|
||||
|
||||
func syncApiStatus(ctx context.Context, cancel context.CancelFunc) {
|
||||
controlSocketStr := fmt.Sprintf("ws://%s/ws", kubernetes.GetMizuApiServerProxiedHostAndPath(mizu.Config.Tap.GuiPort))
|
||||
controlSocket, err := mizu.CreateControlSocket(controlSocketStr)
|
||||
if err != nil {
|
||||
mizu.Log.Infof("error establishing control socket connection %s", err)
|
||||
cancel()
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
err = controlSocket.SendNewTappedPodsListMessage(currentlyTappedPods)
|
||||
if err != nil {
|
||||
mizu.Log.Debugf("error Sending message via control socket %v, error: %s", controlSocketStr, err)
|
||||
}
|
||||
time.Sleep(10 * time.Second)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func getNamespace(kubernetesProvider *kubernetes.Provider) string {
|
||||
if mizu.Config.Tap.AllNamespaces {
|
||||
return mizu.K8sAllNamespaces
|
||||
|
||||
@@ -473,6 +473,8 @@ func (provider *Provider) CheckDaemonSetExists(ctx context.Context, namespace st
|
||||
}
|
||||
|
||||
func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, apiServerPodIp string, nodeToTappedPodIPMap map[string][]string, serviceAccountName string, tapOutgoing bool) error {
|
||||
mizu.Log.Debugf("Applying %d tapper deamonsets, ns: %s, daemonSetName: %s, podImage: %s, tapperPodName: %s", len(nodeToTappedPodIPMap), namespace, daemonSetName, podImage, tapperPodName)
|
||||
|
||||
if len(nodeToTappedPodIPMap) == 0 {
|
||||
return fmt.Errorf("Daemon set %s must tap at least 1 pod", daemonSetName)
|
||||
}
|
||||
@@ -493,12 +495,11 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
|
||||
mizuCmd = append(mizuCmd, "--anydirection")
|
||||
}
|
||||
|
||||
privileged := true
|
||||
agentContainer := applyconfcore.Container()
|
||||
agentContainer.WithName(tapperPodName)
|
||||
agentContainer.WithImage(podImage)
|
||||
agentContainer.WithImagePullPolicy(core.PullAlways)
|
||||
agentContainer.WithSecurityContext(applyconfcore.SecurityContext().WithPrivileged(privileged))
|
||||
agentContainer.WithSecurityContext(applyconfcore.SecurityContext().WithPrivileged(true))
|
||||
agentContainer.WithCommand(mizuCmd...)
|
||||
agentContainer.WithEnv(
|
||||
applyconfcore.EnvVar().WithName(shared.HostModeEnvVar).WithValue("1"),
|
||||
|
||||
@@ -17,7 +17,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
Separator = "="
|
||||
Separator = "="
|
||||
SetCommandName = "set"
|
||||
)
|
||||
|
||||
@@ -29,33 +29,34 @@ func InitConfig(cmd *cobra.Command) error {
|
||||
}
|
||||
|
||||
if err := mergeConfigFile(); err != nil {
|
||||
Log.Infof(uiUtils.Red, "Invalid config file")
|
||||
return err
|
||||
Log.Errorf("Could not load config file, error %v", err)
|
||||
Log.Fatalf("You can regenerate the file using `mizu config -r` or just remove it %v", GetConfigFilePath())
|
||||
}
|
||||
|
||||
cmd.Flags().Visit(initFlag)
|
||||
|
||||
finalConfigPrettified, _ := uiUtils.PrettyJson(Config)
|
||||
Log.Debugf("Merged all config successfully\n Final config: %v", finalConfigPrettified)
|
||||
Log.Debugf("Init config finished\n Final config: %v", finalConfigPrettified)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func GetTemplateConfig() string {
|
||||
prettifiedConfig, _ := uiUtils.PrettyYaml(Config)
|
||||
return prettifiedConfig
|
||||
func GetConfigWithDefaults() (string, error) {
|
||||
defaultConf := ConfigStruct{}
|
||||
if err := defaults.Set(&defaultConf); err != nil {
|
||||
return "", err
|
||||
}
|
||||
return uiUtils.PrettyYaml(defaultConf)
|
||||
}
|
||||
|
||||
func GetConfigFilePath() string {
|
||||
return path.Join(getMizuFolderPath(), "config.yaml")
|
||||
}
|
||||
|
||||
func mergeConfigFile() error {
|
||||
Log.Debugf("Merging config file values")
|
||||
home, homeDirErr := os.UserHomeDir()
|
||||
if homeDirErr != nil {
|
||||
return homeDirErr
|
||||
}
|
||||
|
||||
reader, openErr := os.Open(path.Join(home, ".mizu", "config.yaml"))
|
||||
reader, openErr := os.Open(GetConfigFilePath())
|
||||
if openErr != nil {
|
||||
return openErr
|
||||
return nil
|
||||
}
|
||||
|
||||
buf, readErr := ioutil.ReadAll(reader)
|
||||
@@ -66,6 +67,7 @@ func mergeConfigFile() error {
|
||||
if err := yaml.Unmarshal(buf, &Config); err != nil {
|
||||
return err
|
||||
}
|
||||
Log.Debugf("Found config file, merged to default options")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1,5 +1,10 @@
|
||||
package mizu
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path"
|
||||
)
|
||||
|
||||
var (
|
||||
SemVer = "0.0.1"
|
||||
Branch = "develop"
|
||||
@@ -18,3 +23,11 @@ const (
|
||||
TapperDaemonSetName = "mizu-tapper-daemon-set"
|
||||
TapperPodName = "mizu-tapper"
|
||||
)
|
||||
|
||||
func getMizuFolderPath() string {
|
||||
home, homeDirErr := os.UserHomeDir()
|
||||
if homeDirErr != nil {
|
||||
return ""
|
||||
}
|
||||
return path.Join(home, ".mizu")
|
||||
}
|
||||
|
||||
@@ -14,10 +14,9 @@ var format = logging.MustStringFormatter(
|
||||
)
|
||||
|
||||
func InitLogger() {
|
||||
homeDirPath, _ := os.UserHomeDir()
|
||||
mizuDirPath := path.Join(homeDirPath, ".mizu")
|
||||
mizuDirPath := getMizuFolderPath()
|
||||
if err := os.MkdirAll(mizuDirPath, os.ModePerm); err != nil {
|
||||
panic(fmt.Sprintf("Failed creating .mizu dir: %v, err %v", mizuDirPath, err))
|
||||
panic(fmt.Sprintf("Failed creating mizu dir: %v, err %v", mizuDirPath, err))
|
||||
}
|
||||
logPath := path.Join(mizuDirPath, "log.log")
|
||||
f, err := os.OpenFile(logPath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
|
||||
@@ -35,5 +34,6 @@ func InitLogger() {
|
||||
|
||||
logging.SetBackend(backend1Leveled, backend2Formatter)
|
||||
|
||||
Log.Debugf("\n\n\n")
|
||||
Log.Debugf("Running mizu version %v", SemVer)
|
||||
}
|
||||
|
||||
@@ -15,10 +15,6 @@ func ReportRun(cmd string, args interface{}) {
|
||||
return
|
||||
}
|
||||
|
||||
if Branch != "main" {
|
||||
Log.Debugf("reporting only on main branch")
|
||||
return
|
||||
}
|
||||
argsBytes, _ := json.Marshal(args)
|
||||
argsMap := map[string]string{
|
||||
"telemetry_type": "execution",
|
||||
@@ -26,6 +22,7 @@ func ReportRun(cmd string, args interface{}) {
|
||||
"args": string(argsBytes),
|
||||
"component": "mizu_cli",
|
||||
"BuildTimestamp": BuildTimestamp,
|
||||
"Branch": Branch,
|
||||
"version": SemVer}
|
||||
argsMap["message"] = fmt.Sprintf("mizu %v - %v", argsMap["cmd"], string(argsBytes))
|
||||
|
||||
|
||||
@@ -51,7 +51,7 @@ func parseAppPorts(appPortsList string) []int {
|
||||
return ports
|
||||
}
|
||||
|
||||
var maxcount = flag.Int("c", -1, "Only grab this many packets, then exit")
|
||||
var maxcount = flag.Int64("c", -1, "Only grab this many packets, then exit")
|
||||
var decoder = flag.String("decoder", "", "Name of the decoder to use (default: guess from capture)")
|
||||
var statsevery = flag.Int("stats", 60, "Output statistics every N seconds")
|
||||
var lazy = flag.Bool("lazy", false, "If true, do lazy decoding")
|
||||
@@ -175,6 +175,10 @@ type Context struct {
|
||||
CaptureInfo gopacket.CaptureInfo
|
||||
}
|
||||
|
||||
func GetStats() AppStats {
|
||||
return statsTracker.appStats
|
||||
}
|
||||
|
||||
func (c *Context) GetCaptureInfo() gopacket.CaptureInfo {
|
||||
return c.CaptureInfo
|
||||
}
|
||||
@@ -336,9 +340,7 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
|
||||
source.Lazy = *lazy
|
||||
source.NoCopy = true
|
||||
rlog.Info("Starting to read packets")
|
||||
count := 0
|
||||
bytes := int64(0)
|
||||
start := time.Now()
|
||||
statsTracker.setStartTime(time.Now())
|
||||
defragger := ip4defrag.NewIPv4Defragmenter()
|
||||
|
||||
streamFactory := &tcpStreamFactory{
|
||||
@@ -383,9 +385,9 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
|
||||
errorsSummery := fmt.Sprintf("%v", errorsMap)
|
||||
errorsMapMutex.Unlock()
|
||||
log.Printf("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v) - Errors Summary: %s",
|
||||
count,
|
||||
bytes,
|
||||
time.Since(start),
|
||||
statsTracker.appStats.TotalPacketsCount,
|
||||
statsTracker.appStats.TotalProcessedBytes,
|
||||
time.Since(statsTracker.appStats.StartTime),
|
||||
nErrors,
|
||||
errorMapLen,
|
||||
errorsSummery,
|
||||
@@ -403,13 +405,13 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
|
||||
|
||||
// Since the last print
|
||||
cleanStats := cleaner.dumpStats()
|
||||
appStats := statsTracker.dumpStats()
|
||||
matchedMessages := statsTracker.dumpStats()
|
||||
log.Printf(
|
||||
"flushed connections %d, closed connections: %d, deleted messages: %d, matched messages: %d",
|
||||
cleanStats.flushed,
|
||||
cleanStats.closed,
|
||||
cleanStats.deleted,
|
||||
appStats.matchedMessages,
|
||||
matchedMessages,
|
||||
)
|
||||
}
|
||||
}()
|
||||
@@ -419,10 +421,10 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
|
||||
}
|
||||
|
||||
for packet := range source.Packets() {
|
||||
count++
|
||||
rlog.Debugf("PACKET #%d", count)
|
||||
packetsCount := statsTracker.incPacketsCount()
|
||||
rlog.Debugf("PACKET #%d", packetsCount)
|
||||
data := packet.Data()
|
||||
bytes += int64(len(data))
|
||||
statsTracker.updateProcessedSize(int64(len(data)))
|
||||
if *hexdumppkt {
|
||||
rlog.Debugf("Packet content (%d/0x%x) - %s", len(data), len(data), hex.Dump(data))
|
||||
}
|
||||
@@ -473,12 +475,17 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
|
||||
assemblerMutex.Unlock()
|
||||
}
|
||||
|
||||
done := *maxcount > 0 && count >= *maxcount
|
||||
done := *maxcount > 0 && statsTracker.appStats.TotalPacketsCount >= *maxcount
|
||||
if done {
|
||||
errorsMapMutex.Lock()
|
||||
errorMapLen := len(errorsMap)
|
||||
errorsMapMutex.Unlock()
|
||||
log.Printf("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)", count, bytes, time.Since(start), nErrors, errorMapLen)
|
||||
log.Printf("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)",
|
||||
statsTracker.appStats.TotalPacketsCount,
|
||||
statsTracker.appStats.TotalProcessedBytes,
|
||||
time.Since(statsTracker.appStats.StartTime),
|
||||
nErrors,
|
||||
errorMapLen)
|
||||
}
|
||||
select {
|
||||
case <-signalChan:
|
||||
@@ -535,4 +542,5 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
|
||||
for e := range errorsMap {
|
||||
log.Printf(" %s:\t\t%d", e, errorsMap[e])
|
||||
}
|
||||
log.Printf("AppStats: %v", GetStats())
|
||||
}
|
||||
|
||||
@@ -2,34 +2,54 @@ package tap
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type AppStats struct {
|
||||
matchedMessages int
|
||||
StartTime time.Time `json:"startTime"`
|
||||
MatchedMessages int `json:"matchedMessages"`
|
||||
TotalPacketsCount int64 `json:"totalPacketsCount"`
|
||||
TotalProcessedBytes int64 `json:"totalProcessedBytes"`
|
||||
TotalMatchedMessages int64 `json:"totalMatchedMessages"`
|
||||
}
|
||||
|
||||
type StatsTracker struct {
|
||||
stats AppStats
|
||||
statsMutex sync.Mutex
|
||||
appStats AppStats
|
||||
matchedMessagesMutex sync.Mutex
|
||||
totalPacketsCountMutex sync.Mutex
|
||||
totalProcessedSizeMutex sync.Mutex
|
||||
}
|
||||
|
||||
func (st *StatsTracker) incMatchedMessages() {
|
||||
st.statsMutex.Lock()
|
||||
st.stats.matchedMessages++
|
||||
st.statsMutex.Unlock()
|
||||
st.matchedMessagesMutex.Lock()
|
||||
st.appStats.MatchedMessages++
|
||||
st.appStats.TotalMatchedMessages++
|
||||
st.matchedMessagesMutex.Unlock()
|
||||
}
|
||||
|
||||
func (st *StatsTracker) dumpStats() AppStats {
|
||||
st.statsMutex.Lock()
|
||||
|
||||
stats := AppStats{
|
||||
matchedMessages: st.stats.matchedMessages,
|
||||
}
|
||||
|
||||
st.stats.matchedMessages = 0
|
||||
|
||||
st.statsMutex.Unlock()
|
||||
|
||||
return stats
|
||||
func (st *StatsTracker) incPacketsCount() int64 {
|
||||
st.totalPacketsCountMutex.Lock()
|
||||
st.appStats.TotalPacketsCount++
|
||||
currentPacketsCount := st.appStats.TotalPacketsCount
|
||||
st.totalPacketsCountMutex.Unlock()
|
||||
return currentPacketsCount
|
||||
}
|
||||
|
||||
func (st *StatsTracker) updateProcessedSize(size int64) {
|
||||
st.totalProcessedSizeMutex.Lock()
|
||||
st.appStats.TotalProcessedBytes += size
|
||||
st.totalProcessedSizeMutex.Unlock()
|
||||
}
|
||||
|
||||
func (st *StatsTracker) setStartTime(startTime time.Time) {
|
||||
st.appStats.StartTime = startTime
|
||||
}
|
||||
|
||||
func (st *StatsTracker) dumpStats() int {
|
||||
st.matchedMessagesMutex.Lock()
|
||||
matchedMessages := st.appStats.MatchedMessages
|
||||
st.appStats.MatchedMessages = 0
|
||||
st.matchedMessagesMutex.Unlock()
|
||||
|
||||
return matchedMessages
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user