Compare commits

..

4 Commits

Author SHA1 Message Date
gadotroee
06c8056443 Tapper stats in stats tracker (#166) 2021-08-04 12:51:51 +03:00
Igor Gov
d18f1f8316 Tapped pods report via endpoint instead of web socket (#164) 2021-08-04 10:41:33 +03:00
Igor Gov
f9202900ee No warning when mizu rbac exists (#163) 2021-08-04 08:41:00 +03:00
Igor Gov
9e34662511 Adding logs and fixing several issues (#162)
* Config grooming and several general fixes
2021-08-04 08:18:07 +03:00
16 changed files with 260 additions and 159 deletions

View File

@@ -85,7 +85,7 @@ func hostApi(socketHarOutputChannel chan<- *tap.OutputChannelItem) {
app := gin.Default()
app.GET("/echo", func(c *gin.Context) {
c.String(http.StatusOK, "Hello, World 👋!")
c.String(http.StatusOK, "Here is Mizu agent")
})
eventHandlers := api.RoutesEventHandlers{
@@ -95,9 +95,10 @@ func hostApi(socketHarOutputChannel chan<- *tap.OutputChannelItem) {
app.Use(static.ServeRoot("/", "./site"))
app.Use(CORSMiddleware()) // This has to be called after the static middleware, does not work if its called before
routes.WebSocketRoutes(app, &eventHandlers)
api.WebSocketRoutes(app, &eventHandlers)
routes.EntriesRoutes(app)
routes.MetadataRoutes(app)
routes.StatusRoutes(app)
routes.NotFoundRoute(app)
utils.StartServer(app)

View File

@@ -88,9 +88,9 @@ func startReadingFiles(workingDir string) {
for _, entry := range inputHar.Log.Entries {
time.Sleep(time.Millisecond * 250)
connectionInfo := &tap.ConnectionInfo{
ClientIP: fileInfo.Name(),
ClientIP: fileInfo.Name(),
ClientPort: "",
ServerIP: "",
ServerIP: "",
ServerPort: "",
IsOutgoing: false,
}
@@ -118,7 +118,6 @@ func StartReadingOutbound(outboundLinkChannel <-chan *tap.OutboundLink) {
}
}
func saveHarToDb(entry *har.Entry, connectionInfo *tap.ConnectionInfo) {
entryBytes, _ := json.Marshal(entry)
serviceName, urlPath := getServiceNameFromUrl(entry.Request.URL)
@@ -168,7 +167,7 @@ func saveHarToDb(entry *har.Entry, connectionInfo *tap.ConnectionInfo) {
return
}
baseEntryBytes, _ := models.CreateBaseEntryWebSocketMessage(&baseEntry)
broadcastToBrowserClients(baseEntryBytes)
BroadcastToBrowserClients(baseEntryBytes)
}
func getServiceNameFromUrl(inputUrl string) (string, string) {
@@ -196,6 +195,5 @@ func getEstimatedEntrySizeBytes(mizuEntry models.MizuEntry) int {
sizeBytes += 8 // SizeBytes bytes
sizeBytes += 1 // IsOutgoing bytes
return sizeBytes
}

View File

@@ -1,4 +1,4 @@
package routes
package api
import (
"errors"
@@ -18,10 +18,10 @@ type EventHandlers interface {
}
type SocketConnection struct {
connection *websocket.Conn
lock *sync.Mutex
connection *websocket.Conn
lock *sync.Mutex
eventHandlers EventHandlers
isTapper bool
isTapper bool
}
var websocketUpgrader = websocket.Upgrader{
@@ -91,7 +91,7 @@ func socketCleanup(socketId int, socketConnection *SocketConnection) {
socketConnection.eventHandlers.WebSocketDisconnect(socketId, socketConnection.isTapper)
}
var db = debounce.NewDebouncer(time.Second * 5, func() {
var db = debounce.NewDebouncer(time.Second*5, func() {
fmt.Println("Successfully sent to socket")
})
@@ -102,7 +102,7 @@ func SendToSocket(socketId int, message []byte) error {
}
var sent = false
time.AfterFunc(time.Second * 5, func() {
time.AfterFunc(time.Second*5, func() {
if !sent {
fmt.Println("Socket timed out")
socketCleanup(socketId, socketObj)

View File

@@ -8,7 +8,6 @@ import (
"github.com/up9inc/mizu/tap"
"mizuserver/pkg/models"
"mizuserver/pkg/providers"
"mizuserver/pkg/routes"
"mizuserver/pkg/up9"
"sync"
)
@@ -17,12 +16,12 @@ var browserClientSocketUUIDs = make([]int, 0)
var socketListLock = sync.Mutex{}
type RoutesEventHandlers struct {
routes.EventHandlers
EventHandlers
SocketHarOutChannel chan<- *tap.OutputChannelItem
}
func init() {
go up9.UpdateAnalyzeStatus(broadcastToBrowserClients)
go up9.UpdateAnalyzeStatus(BroadcastToBrowserClients)
}
func (h *RoutesEventHandlers) WebSocketConnect(socketId int, isTapper bool) {
@@ -47,15 +46,14 @@ func (h *RoutesEventHandlers) WebSocketDisconnect(socketId int, isTapper bool) {
}
}
func broadcastToBrowserClients(message []byte) {
func BroadcastToBrowserClients(message []byte) {
for _, socketId := range browserClientSocketUUIDs {
go func(socketId int) {
err := routes.SendToSocket(socketId, message)
err := SendToSocket(socketId, message)
if err != nil {
fmt.Printf("error sending message to socket ID %d: %v", socketId, err)
}
}(socketId)
}
}
@@ -81,7 +79,7 @@ func (h *RoutesEventHandlers) WebSocketMessage(_ int, message []byte) {
rlog.Infof("Could not unmarshal message of message type %s %v\n", socketMessageBase.MessageType, err)
} else {
providers.TapStatus.Pods = statusMessage.TappingStatus.Pods
broadcastToBrowserClients(message)
BroadcastToBrowserClients(message)
}
case shared.WebsocketMessageTypeOutboundLink:
var outboundLinkMessage models.WebsocketOutboundLinkMessage
@@ -116,7 +114,7 @@ func handleTLSLink(outboundLinkMessage models.WebsocketOutboundLinkMessage) {
rlog.Errorf("Error marshaling outbound link message for broadcasting: %v", err)
} else {
fmt.Printf("Broadcasting outboundlink message %s\n", string(marshaledMessage))
broadcastToBrowserClients(marshaledMessage)
BroadcastToBrowserClients(marshaledMessage)
}
}

View File

@@ -8,6 +8,7 @@ import (
"github.com/romana/rlog"
"mizuserver/pkg/database"
"mizuserver/pkg/models"
"mizuserver/pkg/providers"
"mizuserver/pkg/up9"
"mizuserver/pkg/utils"
"mizuserver/pkg/validation"
@@ -241,3 +242,15 @@ func GetGeneralStats(c *gin.Context) {
database.GetEntriesTable().Raw(sqlQuery).Scan(&result)
c.JSON(http.StatusOK, result)
}
func GetTappingStatus(c *gin.Context) {
c.JSON(http.StatusOK, providers.TapStatus)
}
func AnalyzeInformation(c *gin.Context) {
c.JSON(http.StatusOK, up9.GetAnalyzeInfo())
}
func GetRecentTLSLinks(c *gin.Context) {
c.JSON(http.StatusOK, providers.GetAllRecentTLSAddresses())
}

View File

@@ -1,20 +1,32 @@
package controllers
import (
"encoding/json"
"github.com/gin-gonic/gin"
"github.com/romana/rlog"
"github.com/up9inc/mizu/shared"
"mizuserver/pkg/api"
"mizuserver/pkg/providers"
"mizuserver/pkg/up9"
"mizuserver/pkg/validation"
"net/http"
)
func GetTappingStatus(c *gin.Context) {
c.JSON(http.StatusOK, providers.TapStatus)
}
func AnalyzeInformation(c *gin.Context) {
c.JSON(http.StatusOK, up9.GetAnalyzeInfo())
}
func GetRecentTLSLinks(c *gin.Context) {
c.JSON(http.StatusOK, providers.GetAllRecentTLSAddresses())
func PostTappedPods(c *gin.Context) {
tapStatus := &shared.TapStatus{}
if err := c.Bind(tapStatus); err != nil {
c.JSON(http.StatusBadRequest, err)
return
}
if err := validation.Validate(tapStatus); err != nil {
c.JSON(http.StatusBadRequest, err)
return
}
rlog.Infof("[Status] POST request: %d tapped pods", len(tapStatus.Pods))
providers.TapStatus.Pods = tapStatus.Pods
message := shared.CreateWebSocketStatusMessage(*tapStatus)
if jsonBytes, err := json.Marshal(message); err != nil {
rlog.Errorf("Could not Marshal message %v\n", err)
} else {
api.BroadcastToBrowserClients(jsonBytes)
}
}

View File

@@ -0,0 +1,12 @@
package routes
import (
"github.com/gin-gonic/gin"
"mizuserver/pkg/controllers"
)
func StatusRoutes(ginApp *gin.Engine) {
routeGroup := ginApp.Group("/status")
routeGroup.POST("/tappedPods", controllers.PostTappedPods)
}

View File

@@ -8,17 +8,24 @@ import (
"io/ioutil"
)
var outputFileName string
var regenerateFile bool
var configCmd = &cobra.Command{
Use: "config",
Short: "Generate example config file to stdout",
Short: "Generate config with default values",
RunE: func(cmd *cobra.Command, args []string) error {
template := mizu.GetTemplateConfig()
if outputFileName != "" {
template, err := mizu.GetConfigWithDefaults()
if err != nil {
mizu.Log.Errorf("Failed generating config with defaults %v", err)
return nil
}
if regenerateFile {
data := []byte(template)
_ = ioutil.WriteFile(outputFileName, data, 0644)
mizu.Log.Infof(fmt.Sprintf("Template File written to %s", fmt.Sprintf(uiUtils.Purple, outputFileName)))
if err := ioutil.WriteFile(mizu.GetConfigFilePath(), data, 0644); err != nil {
mizu.Log.Errorf("Failed writing config %v", err)
return nil
}
mizu.Log.Infof(fmt.Sprintf("Template File written to %s", fmt.Sprintf(uiUtils.Purple, mizu.GetConfigFilePath())))
} else {
mizu.Log.Debugf("Writing template config.\n%v", template)
fmt.Printf("%v", template)
@@ -29,6 +36,5 @@ var configCmd = &cobra.Command{
func init() {
rootCmd.AddCommand(configCmd)
configCmd.Flags().StringVarP(&outputFileName, "file", "f", "", "Save content to local file")
configCmd.Flags().BoolVarP(&regenerateFile, "regenerate", "r", false, fmt.Sprintf("Regenerate the config file with default values %s", mizu.GetConfigFilePath()))
}

View File

@@ -1,7 +1,9 @@
package cmd
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/up9inc/mizu/cli/kubernetes"
"github.com/up9inc/mizu/cli/mizu"
@@ -9,6 +11,7 @@ import (
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/debounce"
core "k8s.io/api/core/v1"
errors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/clientcmd"
"net/http"
@@ -53,28 +56,20 @@ func RunMizuTap() {
defer cancel() // cancel will be called when this function exits
targetNamespace := getNamespace(kubernetesProvider)
if err := updateCurrentlyTappedPods(kubernetesProvider, ctx, targetNamespace); err != nil {
mizu.Log.Infof("Error listing pods: %v", err)
return
}
if mizu.Config.Tap.DryRun {
return
}
urlReadyChan := make(chan string)
go func() {
mizu.Log.Infof("Mizu is available at http://%s", <-urlReadyChan)
}()
var namespacesStr string
if targetNamespace != mizu.K8sAllNamespaces {
namespacesStr = fmt.Sprintf("namespace \"%s\"", targetNamespace)
} else {
namespacesStr = "all namespaces"
}
mizu.CheckNewerVersion()
mizu.Log.Infof("Tapping pods in %s", namespacesStr)
if err, _ := updateCurrentlyTappedPods(kubernetesProvider, ctx, targetNamespace); err != nil {
mizu.Log.Infof("Error listing pods: %v", err)
return
}
if len(currentlyTappedPods) == 0 {
var suggestionStr string
if targetNamespace != mizu.K8sAllNamespaces {
@@ -83,6 +78,10 @@ func RunMizuTap() {
mizu.Log.Infof("Did not find any pods matching the regex argument%s", suggestionStr)
}
if mizu.Config.Tap.DryRun {
return
}
nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(currentlyTappedPods)
if err != nil {
return
@@ -92,10 +91,8 @@ func RunMizuTap() {
return
}
mizu.CheckNewerVersion()
go portForwardApiPod(ctx, kubernetesProvider, cancel, urlReadyChan) // TODO convert this to job for built in pod ttl or have the running app handle this
go createProxyToApiServerPod(ctx, kubernetesProvider, cancel)
go watchPodsForTapping(ctx, kubernetesProvider, cancel)
go syncApiStatus(ctx, cancel)
//block until exit signal or error
waitForFinish(ctx, cancel)
@@ -121,9 +118,10 @@ func createMizuNamespace(ctx context.Context, kubernetesProvider *kubernetes.Pro
_, err := kubernetesProvider.CreateNamespace(ctx, mizu.ResourcesNamespace)
if err != nil {
mizu.Log.Infof("Error creating Namespace %s: %v", mizu.ResourcesNamespace, err)
return err
}
return err
mizu.Log.Debugf("Successfully creating Namespace %s", mizu.ResourcesNamespace)
return nil
}
func createMizuApiServer(ctx context.Context, kubernetesProvider *kubernetes.Provider, mizuApiFilteringOptions *shared.TrafficFilteringOptions) error {
@@ -141,17 +139,20 @@ func createMizuApiServer(ctx context.Context, kubernetesProvider *kubernetes.Pro
mizu.Log.Infof("Error creating mizu %s pod: %v", mizu.ApiServerPodName, err)
return err
}
mizu.Log.Debugf("Successfully created API server pod: %s", mizu.ApiServerPodName)
apiServerService, err = kubernetesProvider.CreateService(ctx, mizu.ResourcesNamespace, mizu.ApiServerPodName, mizu.ApiServerPodName)
if err != nil {
mizu.Log.Infof("Error creating mizu %s service: %v", mizu.ApiServerPodName, err)
return err
}
mizu.Log.Debugf("Successfully created service: %s", mizu.ApiServerPodName)
return nil
}
func getMizuApiFilteringOptions() (*shared.TrafficFilteringOptions, error) {
var compiledRegexSlice []*shared.SerializableRegexp
if mizu.Config.Tap.PlainTextFilterRegexes != nil && len(mizu.Config.Tap.PlainTextFilterRegexes) > 0 {
@@ -192,9 +193,10 @@ func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provi
mizu.Log.Infof("Error creating mizu tapper daemonset: %v", err)
return err
}
mizu.Log.Debugf("Successfully created %v tappers", len(nodeToTappedPodIPMap))
} else {
if err := kubernetesProvider.RemoveDaemonSet(ctx, mizu.ResourcesNamespace, mizu.TapperDaemonSetName); err != nil {
mizu.Log.Infof("Error deleting mizu tapper daemonset: %v", err)
mizu.Log.Errorf("Error deleting mizu tapper daemonset: %v", err)
return err
}
}
@@ -237,24 +239,55 @@ func cleanUpMizuResources(kubernetesProvider *kubernetes.Provider) {
}
}
func reportTappedPods() {
mizuProxiedUrl := kubernetes.GetMizuApiServerProxiedHostAndPath(mizu.Config.Fetch.MizuPort)
tappedPodsUrl := fmt.Sprintf("http://%s/status/tappedPods", mizuProxiedUrl)
podInfos := make([]shared.PodInfo, 0)
for _, pod := range currentlyTappedPods {
podInfos = append(podInfos, shared.PodInfo{Name: pod.Name, Namespace: pod.Namespace})
}
tapStatus := shared.TapStatus{Pods: podInfos}
if jsonValue, err := json.Marshal(tapStatus); err != nil {
mizu.Log.Debugf("[ERROR] failed Marshal the tapped pods %v", err)
} else {
if response, err := http.Post(tappedPodsUrl, "application/json", bytes.NewBuffer(jsonValue)); err != nil {
mizu.Log.Debugf("[ERROR] failed sending to API server the tapped pods %v", err)
} else if response.StatusCode != 200 {
mizu.Log.Debugf("[ERROR] failed sending to API server the tapped pods, response status code %v", response.StatusCode)
} else {
mizu.Log.Debugf("Reported to server API about %d taped pods successfully", len(podInfos))
}
}
}
func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
targetNamespace := getNamespace(kubernetesProvider)
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, targetNamespace), mizu.Config.Tap.PodRegex())
restartTappers := func() {
if err := updateCurrentlyTappedPods(kubernetesProvider, ctx, targetNamespace); err != nil {
mizu.Log.Infof("Error getting pods by regex: %s (%v,%+v)", err, err, err)
err, changeFound := updateCurrentlyTappedPods(kubernetesProvider, ctx, targetNamespace)
if err != nil {
mizu.Log.Errorf("Error getting pods by regex: %s (%v,%+v)", err, err, err)
cancel()
}
if !changeFound {
mizu.Log.Debugf("Nothing changed update tappers not needed")
return
}
reportTappedPods()
nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(currentlyTappedPods)
if err != nil {
mizu.Log.Infof("Error building node to ips map: %s (%v,%+v)", err, err, err)
mizu.Log.Errorf("Error building node to ips map: %s (%v,%+v)", err, err, err)
cancel()
}
if err := updateMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap); err != nil {
mizu.Log.Infof("Error updating daemonset: %s (%v,%+v)", err, err, err)
mizu.Log.Errorf("Error updating daemonset: %s (%v,%+v)", err, err, err)
cancel()
}
}
@@ -262,17 +295,21 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
for {
select {
case <-added:
case <-removed:
case pod := <-added:
mizu.Log.Debugf("Added matching pod %s, ns: %s", pod.Name, pod.Namespace)
restartTappersDebouncer.SetOn()
case modifiedTarget := <-modified:
case pod := <-removed:
mizu.Log.Debugf("Removed matching pod %s, ns: %s", pod.Name, pod.Namespace)
restartTappersDebouncer.SetOn()
case pod := <-modified:
mizu.Log.Debugf("Modified matching pod %s, ns: %s, phase: %s, ip: %s", pod.Name, pod.Namespace, pod.Status.Phase, pod.Status.PodIP)
// Act only if the modified pod has already obtained an IP address.
// After filtering for IPs, on a normal pod restart this includes the following events:
// - Pod deletion
// - Pod reaches start state
// - Pod reaches ready state
// Ready/unready transitions might also trigger this event.
if modifiedTarget.Status.PodIP != "" {
if pod.Status.PodIP != "" {
restartTappersDebouncer.SetOn()
}
@@ -286,22 +323,25 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
}
}
func updateCurrentlyTappedPods(kubernetesProvider *kubernetes.Provider, ctx context.Context, targetNamespace string) error {
func updateCurrentlyTappedPods(kubernetesProvider *kubernetes.Provider, ctx context.Context, targetNamespace string) (error, bool) {
changeFound := false
if matchingPods, err := kubernetesProvider.GetAllRunningPodsMatchingRegex(ctx, mizu.Config.Tap.PodRegex(), targetNamespace); err != nil {
mizu.Log.Infof("Error getting pods by regex: %s (%v,%+v)", err, err, err)
return err
return err, false
} else {
addedPods, removedPods := getPodArrayDiff(currentlyTappedPods, matchingPods)
for _, addedPod := range addedPods {
changeFound = true
mizu.Log.Infof(uiUtils.Green, fmt.Sprintf("+%s", addedPod.Name))
}
for _, removedPod := range removedPods {
changeFound = true
mizu.Log.Infof(uiUtils.Red, fmt.Sprintf("-%s", removedPod.Name))
}
currentlyTappedPods = matchingPods
}
return nil
return nil, changeFound
}
func getPodArrayDiff(oldPods []core.Pod, newPods []core.Pod) (added []core.Pod, removed []core.Pod) {
@@ -329,43 +369,45 @@ func getMissingPods(pods1 []core.Pod, pods2 []core.Pod) []core.Pod {
return missingPods
}
func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, urlReadyChan chan string) {
func createProxyToApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", mizu.ApiServerPodName))
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, mizu.ResourcesNamespace), podExactRegex)
isPodReady := false
timeAfter := time.After(25 * time.Second)
for {
select {
case <-ctx.Done():
return
case <-added:
mizu.Log.Debugf("Got agent pod added event")
continue
case <-removed:
mizu.Log.Infof("%s removed", mizu.ApiServerPodName)
cancel()
return
case modifiedPod := <-modified:
if modifiedPod.Status.Phase == "Running" && !isPodReady {
mizu.Log.Debugf("Got agent pod modified event, status phase: %v", modifiedPod.Status.Phase)
if modifiedPod.Status.Phase == core.PodRunning && !isPodReady {
isPodReady = true
go func() {
err := kubernetes.StartProxy(kubernetesProvider, mizu.Config.Tap.GuiPort, mizu.ResourcesNamespace, mizu.ApiServerPodName)
if err != nil {
mizu.Log.Infof("Error occurred while running k8s proxy %v", err)
mizu.Log.Errorf("Error occurred while running k8s proxy %v", err)
cancel()
}
}()
mizu.Log.Infof("Mizu is available at http://%s\n", kubernetes.GetMizuApiServerProxiedHostAndPath(mizu.Config.Tap.GuiPort))
time.Sleep(time.Second * 5) // Waiting to be sure the proxy is ready
requestForAnalysis()
reportTappedPods()
}
urlReadyChan <- kubernetes.GetMizuApiServerProxiedHostAndPath(mizu.Config.Tap.GuiPort)
time.Sleep(time.Second * 5) // Waiting to be sure the proxy is ready
requestForAnalysis()
case <-timeAfter:
if !isPodReady {
mizu.Log.Errorf("error: %s pod was not ready in time", mizu.ApiServerPodName)
mizu.Log.Errorf("Error: %s pod was not ready in time", mizu.ApiServerPodName)
cancel()
}
case <-errorChan:
mizu.Log.Debugf("[ERROR] Agent creation, watching %v namespace", mizu.ResourcesNamespace)
cancel()
}
}
@@ -401,7 +443,7 @@ func createRBACIfNecessary(ctx context.Context, kubernetesProvider *kubernetes.P
}
if !mizuRBACExists {
err := kubernetesProvider.CreateMizuRBAC(ctx, mizu.ResourcesNamespace, mizu.ServiceAccountName, mizu.ClusterRoleName, mizu.ClusterRoleBindingName, mizu.RBACVersion)
if err != nil {
if err != nil && !errors.IsAlreadyExists(err) {
mizu.Log.Infof("warning: could not create mizu rbac resources %v", err)
return false
}
@@ -435,28 +477,6 @@ func waitForFinish(ctx context.Context, cancel context.CancelFunc) {
}
}
func syncApiStatus(ctx context.Context, cancel context.CancelFunc) {
controlSocketStr := fmt.Sprintf("ws://%s/ws", kubernetes.GetMizuApiServerProxiedHostAndPath(mizu.Config.Tap.GuiPort))
controlSocket, err := mizu.CreateControlSocket(controlSocketStr)
if err != nil {
mizu.Log.Infof("error establishing control socket connection %s", err)
cancel()
}
for {
select {
case <-ctx.Done():
return
default:
err = controlSocket.SendNewTappedPodsListMessage(currentlyTappedPods)
if err != nil {
mizu.Log.Debugf("error Sending message via control socket %v, error: %s", controlSocketStr, err)
}
time.Sleep(10 * time.Second)
}
}
}
func getNamespace(kubernetesProvider *kubernetes.Provider) string {
if mizu.Config.Tap.AllNamespaces {
return mizu.K8sAllNamespaces

View File

@@ -473,6 +473,8 @@ func (provider *Provider) CheckDaemonSetExists(ctx context.Context, namespace st
}
func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, apiServerPodIp string, nodeToTappedPodIPMap map[string][]string, serviceAccountName string, tapOutgoing bool) error {
mizu.Log.Debugf("Applying %d tapper deamonsets, ns: %s, daemonSetName: %s, podImage: %s, tapperPodName: %s", len(nodeToTappedPodIPMap), namespace, daemonSetName, podImage, tapperPodName)
if len(nodeToTappedPodIPMap) == 0 {
return fmt.Errorf("Daemon set %s must tap at least 1 pod", daemonSetName)
}
@@ -493,12 +495,11 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
mizuCmd = append(mizuCmd, "--anydirection")
}
privileged := true
agentContainer := applyconfcore.Container()
agentContainer.WithName(tapperPodName)
agentContainer.WithImage(podImage)
agentContainer.WithImagePullPolicy(core.PullAlways)
agentContainer.WithSecurityContext(applyconfcore.SecurityContext().WithPrivileged(privileged))
agentContainer.WithSecurityContext(applyconfcore.SecurityContext().WithPrivileged(true))
agentContainer.WithCommand(mizuCmd...)
agentContainer.WithEnv(
applyconfcore.EnvVar().WithName(shared.HostModeEnvVar).WithValue("1"),

View File

@@ -17,7 +17,7 @@ import (
)
const (
Separator = "="
Separator = "="
SetCommandName = "set"
)
@@ -29,33 +29,34 @@ func InitConfig(cmd *cobra.Command) error {
}
if err := mergeConfigFile(); err != nil {
Log.Infof(uiUtils.Red, "Invalid config file")
return err
Log.Errorf("Could not load config file, error %v", err)
Log.Fatalf("You can regenerate the file using `mizu config -r` or just remove it %v", GetConfigFilePath())
}
cmd.Flags().Visit(initFlag)
finalConfigPrettified, _ := uiUtils.PrettyJson(Config)
Log.Debugf("Merged all config successfully\n Final config: %v", finalConfigPrettified)
Log.Debugf("Init config finished\n Final config: %v", finalConfigPrettified)
return nil
}
func GetTemplateConfig() string {
prettifiedConfig, _ := uiUtils.PrettyYaml(Config)
return prettifiedConfig
func GetConfigWithDefaults() (string, error) {
defaultConf := ConfigStruct{}
if err := defaults.Set(&defaultConf); err != nil {
return "", err
}
return uiUtils.PrettyYaml(defaultConf)
}
func GetConfigFilePath() string {
return path.Join(getMizuFolderPath(), "config.yaml")
}
func mergeConfigFile() error {
Log.Debugf("Merging config file values")
home, homeDirErr := os.UserHomeDir()
if homeDirErr != nil {
return homeDirErr
}
reader, openErr := os.Open(path.Join(home, ".mizu", "config.yaml"))
reader, openErr := os.Open(GetConfigFilePath())
if openErr != nil {
return openErr
return nil
}
buf, readErr := ioutil.ReadAll(reader)
@@ -66,6 +67,7 @@ func mergeConfigFile() error {
if err := yaml.Unmarshal(buf, &Config); err != nil {
return err
}
Log.Debugf("Found config file, merged to default options")
return nil
}

View File

@@ -1,5 +1,10 @@
package mizu
import (
"os"
"path"
)
var (
SemVer = "0.0.1"
Branch = "develop"
@@ -18,3 +23,11 @@ const (
TapperDaemonSetName = "mizu-tapper-daemon-set"
TapperPodName = "mizu-tapper"
)
func getMizuFolderPath() string {
home, homeDirErr := os.UserHomeDir()
if homeDirErr != nil {
return ""
}
return path.Join(home, ".mizu")
}

View File

@@ -14,10 +14,9 @@ var format = logging.MustStringFormatter(
)
func InitLogger() {
homeDirPath, _ := os.UserHomeDir()
mizuDirPath := path.Join(homeDirPath, ".mizu")
mizuDirPath := getMizuFolderPath()
if err := os.MkdirAll(mizuDirPath, os.ModePerm); err != nil {
panic(fmt.Sprintf("Failed creating .mizu dir: %v, err %v", mizuDirPath, err))
panic(fmt.Sprintf("Failed creating mizu dir: %v, err %v", mizuDirPath, err))
}
logPath := path.Join(mizuDirPath, "log.log")
f, err := os.OpenFile(logPath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
@@ -35,5 +34,6 @@ func InitLogger() {
logging.SetBackend(backend1Leveled, backend2Formatter)
Log.Debugf("\n\n\n")
Log.Debugf("Running mizu version %v", SemVer)
}

View File

@@ -15,10 +15,6 @@ func ReportRun(cmd string, args interface{}) {
return
}
if Branch != "main" {
Log.Debugf("reporting only on main branch")
return
}
argsBytes, _ := json.Marshal(args)
argsMap := map[string]string{
"telemetry_type": "execution",
@@ -26,6 +22,7 @@ func ReportRun(cmd string, args interface{}) {
"args": string(argsBytes),
"component": "mizu_cli",
"BuildTimestamp": BuildTimestamp,
"Branch": Branch,
"version": SemVer}
argsMap["message"] = fmt.Sprintf("mizu %v - %v", argsMap["cmd"], string(argsBytes))

View File

@@ -51,7 +51,7 @@ func parseAppPorts(appPortsList string) []int {
return ports
}
var maxcount = flag.Int("c", -1, "Only grab this many packets, then exit")
var maxcount = flag.Int64("c", -1, "Only grab this many packets, then exit")
var decoder = flag.String("decoder", "", "Name of the decoder to use (default: guess from capture)")
var statsevery = flag.Int("stats", 60, "Output statistics every N seconds")
var lazy = flag.Bool("lazy", false, "If true, do lazy decoding")
@@ -175,6 +175,10 @@ type Context struct {
CaptureInfo gopacket.CaptureInfo
}
func GetStats() AppStats {
return statsTracker.appStats
}
func (c *Context) GetCaptureInfo() gopacket.CaptureInfo {
return c.CaptureInfo
}
@@ -336,9 +340,7 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
source.Lazy = *lazy
source.NoCopy = true
rlog.Info("Starting to read packets")
count := 0
bytes := int64(0)
start := time.Now()
statsTracker.setStartTime(time.Now())
defragger := ip4defrag.NewIPv4Defragmenter()
streamFactory := &tcpStreamFactory{
@@ -383,9 +385,9 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
errorsSummery := fmt.Sprintf("%v", errorsMap)
errorsMapMutex.Unlock()
log.Printf("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v) - Errors Summary: %s",
count,
bytes,
time.Since(start),
statsTracker.appStats.TotalPacketsCount,
statsTracker.appStats.TotalProcessedBytes,
time.Since(statsTracker.appStats.StartTime),
nErrors,
errorMapLen,
errorsSummery,
@@ -403,13 +405,13 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
// Since the last print
cleanStats := cleaner.dumpStats()
appStats := statsTracker.dumpStats()
matchedMessages := statsTracker.dumpStats()
log.Printf(
"flushed connections %d, closed connections: %d, deleted messages: %d, matched messages: %d",
cleanStats.flushed,
cleanStats.closed,
cleanStats.deleted,
appStats.matchedMessages,
matchedMessages,
)
}
}()
@@ -419,10 +421,10 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
}
for packet := range source.Packets() {
count++
rlog.Debugf("PACKET #%d", count)
packetsCount := statsTracker.incPacketsCount()
rlog.Debugf("PACKET #%d", packetsCount)
data := packet.Data()
bytes += int64(len(data))
statsTracker.updateProcessedSize(int64(len(data)))
if *hexdumppkt {
rlog.Debugf("Packet content (%d/0x%x) - %s", len(data), len(data), hex.Dump(data))
}
@@ -473,12 +475,17 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
assemblerMutex.Unlock()
}
done := *maxcount > 0 && count >= *maxcount
done := *maxcount > 0 && statsTracker.appStats.TotalPacketsCount >= *maxcount
if done {
errorsMapMutex.Lock()
errorMapLen := len(errorsMap)
errorsMapMutex.Unlock()
log.Printf("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)", count, bytes, time.Since(start), nErrors, errorMapLen)
log.Printf("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)",
statsTracker.appStats.TotalPacketsCount,
statsTracker.appStats.TotalProcessedBytes,
time.Since(statsTracker.appStats.StartTime),
nErrors,
errorMapLen)
}
select {
case <-signalChan:
@@ -535,4 +542,5 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
for e := range errorsMap {
log.Printf(" %s:\t\t%d", e, errorsMap[e])
}
log.Printf("AppStats: %v", GetStats())
}

View File

@@ -2,34 +2,54 @@ package tap
import (
"sync"
"time"
)
type AppStats struct {
matchedMessages int
StartTime time.Time `json:"startTime"`
MatchedMessages int `json:"matchedMessages"`
TotalPacketsCount int64 `json:"totalPacketsCount"`
TotalProcessedBytes int64 `json:"totalProcessedBytes"`
TotalMatchedMessages int64 `json:"totalMatchedMessages"`
}
type StatsTracker struct {
stats AppStats
statsMutex sync.Mutex
appStats AppStats
matchedMessagesMutex sync.Mutex
totalPacketsCountMutex sync.Mutex
totalProcessedSizeMutex sync.Mutex
}
func (st *StatsTracker) incMatchedMessages() {
st.statsMutex.Lock()
st.stats.matchedMessages++
st.statsMutex.Unlock()
st.matchedMessagesMutex.Lock()
st.appStats.MatchedMessages++
st.appStats.TotalMatchedMessages++
st.matchedMessagesMutex.Unlock()
}
func (st *StatsTracker) dumpStats() AppStats {
st.statsMutex.Lock()
stats := AppStats{
matchedMessages: st.stats.matchedMessages,
}
st.stats.matchedMessages = 0
st.statsMutex.Unlock()
return stats
func (st *StatsTracker) incPacketsCount() int64 {
st.totalPacketsCountMutex.Lock()
st.appStats.TotalPacketsCount++
currentPacketsCount := st.appStats.TotalPacketsCount
st.totalPacketsCountMutex.Unlock()
return currentPacketsCount
}
func (st *StatsTracker) updateProcessedSize(size int64) {
st.totalProcessedSizeMutex.Lock()
st.appStats.TotalProcessedBytes += size
st.totalProcessedSizeMutex.Unlock()
}
func (st *StatsTracker) setStartTime(startTime time.Time) {
st.appStats.StartTime = startTime
}
func (st *StatsTracker) dumpStats() int {
st.matchedMessagesMutex.Lock()
matchedMessages := st.appStats.MatchedMessages
st.appStats.MatchedMessages = 0
st.matchedMessagesMutex.Unlock()
return matchedMessages
}