This commit is contained in:
RamiBerm
2021-07-10 18:56:07 +03:00
parent 7abf8b83e3
commit 09702697ad
10 changed files with 253 additions and 23 deletions

View File

@@ -22,6 +22,7 @@ type MizuTapOptions struct {
MizuPodPort uint16
PlainTextFilterRegexes []string
TapOutgoing bool
ServiceType string
}
var mizuTapOptions = &MizuTapOptions{}
@@ -71,4 +72,5 @@ func init() {
tapCmd.Flags().Uint16VarP(&mizuTapOptions.MizuPodPort, "mizu-port", "", 8899, "Port which mizu cli will attempt to forward from the mizu collector pod")
tapCmd.Flags().StringArrayVarP(&mizuTapOptions.PlainTextFilterRegexes, "regex-masking", "r", nil, "List of regex expressions that are used to filter matching values from text/plain http bodies")
tapCmd.Flags().StringVarP(&direction, "direction", "", "in", "Record traffic that goes in this direction (relative to the tapped pod): in/any")
tapCmd.Flags().StringVarP(&mizuTapOptions.ServiceType, "service-type", "", "ClusterIP", "Set a service type for mizu collector's kubernetes service")
}

View File

@@ -75,6 +75,7 @@ func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) {
go portForwardApiPod(ctx, kubernetesProvider, cancel, tappingOptions) // TODO convert this to job for built in pod ttl or have the running app handle this
go watchPodsForTapping(ctx, kubernetesProvider, cancel, podRegexQuery, tappingOptions)
go syncApiStatus(ctx, cancel, tappingOptions)
//go spamHealthcheck(ctx, tappingOptions)
//block until exit signal or error
waitForFinish(ctx, cancel)
@@ -103,7 +104,7 @@ func createMizuAggregator(ctx context.Context, kubernetesProvider *kubernetes.Pr
return err
}
aggregatorService, err = kubernetesProvider.CreateService(ctx, mizu.ResourcesNamespace, mizu.AggregatorPodName, mizu.AggregatorPodName)
aggregatorService, err = kubernetesProvider.CreateService(ctx, mizu.ResourcesNamespace, mizu.AggregatorPodName, mizu.AggregatorPodName, tappingOptions.ServiceType)
if err != nil {
fmt.Printf("Error creating mizu collector service: %v\n", err)
return err
@@ -242,23 +243,28 @@ func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
return
case modifiedPod := <-modified:
if modifiedPod.Status.Phase == "Running" && !isPodReady {
isPodReady = true
var portForwardCreateError error
if portForward, portForwardCreateError = kubernetes.NewPortForward(kubernetesProvider, mizu.ResourcesNamespace, mizu.AggregatorPodName, tappingOptions.GuiPort, tappingOptions.MizuPodPort, cancel); portForwardCreateError != nil {
fmt.Printf("error forwarding port to pod %s\n", portForwardCreateError)
cancel()
} else {
fmt.Printf("Web interface is now available at http://localhost:%d\n", tappingOptions.GuiPort)
time.Sleep(time.Second * 5) // Waiting to be sure port forwarding finished
if tappingOptions.Analyze {
if _, err := http.Get(fmt.Sprintf("http://localhost:%d/api/uploadEntries?dest=%s", tappingOptions.GuiPort, tappingOptions.AnalyzeDestination)); err != nil {
fmt.Println(err)
} else {
fmt.Printf(mizu.Purple, "Traffic is uploading to UP9 cloud for further analsys")
fmt.Println()
}
}
fmt.Println("Pod is running!")
err := kubernetes.StartProxy(ctx, kubernetesProvider, tappingOptions.GuiPort, mizu.ResourcesNamespace, mizu.AggregatorPodName)
if err != nil {
fmt.Printf("Error starting k8s proxy %v\n", err)
}
isPodReady = true
//var portForwardCreateError error
//if portForward, portForwardCreateError = kubernetes.NewPortForward(kubernetesProvider, mizu.ResourcesNamespace, fmt.Sprintf(mizu.AggregatorPodName), tappingOptions.GuiPort, tappingOptions.MizuPodPort, cancel); portForwardCreateError != nil {
// fmt.Printf("error forwarding port to pod %s\n", portForwardCreateError)
// cancel()
//} else {
// fmt.Printf("Web interface is now available at http://localhost:%d\n", tappingOptions.GuiPort)
// time.Sleep(time.Second * 5) // Waiting to be sure port forwarding finished
// if tappingOptions.Analyze {
// if _, err := http.Get(fmt.Sprintf("http://localhost:%d/api/uploadEntries?dest=%s", tappingOptions.GuiPort, tappingOptions.AnalyzeDestination)); err != nil {
// fmt.Println(err)
// } else {
// fmt.Printf(mizu.Purple, "Traffic is uploading to UP9 cloud for further analsys")
// fmt.Println()
// }
// }
//}
}
case <-time.After(25 * time.Second):
@@ -279,6 +285,10 @@ func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
}
}
func printMizuReadyMessages(tappingOptions *MizuTapOptions) {
}
func createRBACIfNecessary(ctx context.Context, kubernetesProvider *kubernetes.Provider) bool {
mizuRBACExists, err := kubernetesProvider.DoesMizuRBACExist(ctx, mizu.ResourcesNamespace)
if err != nil {
@@ -343,6 +353,21 @@ func syncApiStatus(ctx context.Context, cancel context.CancelFunc, tappingOption
}
func spamHealthcheck(ctx context.Context, tappingOptions *MizuTapOptions) {
for {
select {
case <- ctx.Done():
return
default:
_, err := http.Get(fmt.Sprintf("http://localhost:%d/health", tappingOptions.GuiPort))
if err != nil {
fmt.Printf("Error sending healthcheck %v\n", err)
}
}
time.Sleep(1 * time.Second)
}
}
func getNamespace(tappingOptions *MizuTapOptions, kubernetesProvider *kubernetes.Provider) string {
if tappingOptions.AllNamespaces {
return mizu.K8sAllNamespaces