mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-09-06 21:11:11 +00:00
@@ -31,7 +31,8 @@ func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
kubernetesProvider := kubernetes.NewProvider(tappingOptions.KubeConfigPath, tappingOptions.Namespace)
|
||||
|
||||
kubernetesProvider := kubernetes.NewProvider(tappingOptions.KubeConfigPath)
|
||||
|
||||
defer cleanUpMizuResources(kubernetesProvider)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
@@ -43,7 +44,7 @@ func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) {
|
||||
currentlyTappedPods = matchingPods
|
||||
}
|
||||
|
||||
nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(ctx, kubernetesProvider, currentlyTappedPods)
|
||||
nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(currentlyTappedPods)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@@ -132,20 +133,20 @@ func createMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provi
|
||||
func cleanUpMizuResources(kubernetesProvider *kubernetes.Provider) {
|
||||
fmt.Printf("\nRemoving mizu resources\n")
|
||||
|
||||
removalCtx, _ := context.WithTimeout(context.Background(), 5 * time.Second)
|
||||
removalCtx, _ := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
if err := kubernetesProvider.RemovePod(removalCtx, mizu.ResourcesNamespace, mizu.AggregatorPodName); err != nil {
|
||||
fmt.Printf("Error removing Pod %s in namespace %s: %s (%v,%+v)\n", mizu.AggregatorPodName, mizu.ResourcesNamespace, err, err, err);
|
||||
fmt.Printf("Error removing Pod %s in namespace %s: %s (%v,%+v)\n", mizu.AggregatorPodName, mizu.ResourcesNamespace, err, err, err)
|
||||
}
|
||||
if err := kubernetesProvider.RemoveService(removalCtx, mizu.ResourcesNamespace, mizu.AggregatorPodName); err != nil {
|
||||
fmt.Printf("Error removing Service %s in namespace %s: %s (%v,%+v)\n", mizu.AggregatorPodName, mizu.ResourcesNamespace, err, err, err);
|
||||
fmt.Printf("Error removing Service %s in namespace %s: %s (%v,%+v)\n", mizu.AggregatorPodName, mizu.ResourcesNamespace, err, err, err)
|
||||
}
|
||||
if err := kubernetesProvider.RemoveDaemonSet(removalCtx, mizu.ResourcesNamespace, mizu.TapperDaemonSetName); err != nil {
|
||||
fmt.Printf("Error removing DaemonSet %s in namespace %s: %s (%v,%+v)\n", mizu.TapperDaemonSetName, mizu.ResourcesNamespace, err, err, err);
|
||||
fmt.Printf("Error removing DaemonSet %s in namespace %s: %s (%v,%+v)\n", mizu.TapperDaemonSetName, mizu.ResourcesNamespace, err, err, err)
|
||||
}
|
||||
}
|
||||
|
||||
func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, podRegex *regexp.Regexp, tappingOptions *MizuTapOptions) {
|
||||
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, kubernetesProvider.Namespace), podRegex)
|
||||
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, getNamespace(tappingOptions, kubernetesProvider)), podRegex)
|
||||
|
||||
restartTappers := func() {
|
||||
if matchingPods, err := kubernetesProvider.GetAllPodsMatchingRegex(ctx, podRegex); err != nil {
|
||||
@@ -155,7 +156,7 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
|
||||
currentlyTappedPods = matchingPods
|
||||
}
|
||||
|
||||
nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(ctx, kubernetesProvider, currentlyTappedPods)
|
||||
nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(currentlyTappedPods)
|
||||
if err != nil {
|
||||
fmt.Printf("Error building node to ips map: %s (%v,%+v)\n", err, err, err)
|
||||
cancel()
|
||||
@@ -170,14 +171,14 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
|
||||
|
||||
for {
|
||||
select {
|
||||
case newTarget := <- added:
|
||||
case newTarget := <-added:
|
||||
fmt.Printf("+%s\n", newTarget.Name)
|
||||
|
||||
case removedTarget := <- removed:
|
||||
case removedTarget := <-removed:
|
||||
fmt.Printf("-%s\n", removedTarget.Name)
|
||||
restartTappersDebouncer.SetOn()
|
||||
|
||||
case modifiedTarget := <- modified:
|
||||
case modifiedTarget := <-modified:
|
||||
// Act only if the modified pod has already obtained an IP address.
|
||||
// After filtering for IPs, on a normal pod restart this includes the following events:
|
||||
// - Pod deletion
|
||||
@@ -188,11 +189,11 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
|
||||
restartTappersDebouncer.SetOn()
|
||||
}
|
||||
|
||||
case <- errorChan:
|
||||
case <-errorChan:
|
||||
// TODO: Does this also perform cleanup?
|
||||
cancel()
|
||||
|
||||
case <- ctx.Done():
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -205,13 +206,13 @@ func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
|
||||
var portForward *kubernetes.PortForward
|
||||
for {
|
||||
select {
|
||||
case <- added:
|
||||
case <-added:
|
||||
continue
|
||||
case <- removed:
|
||||
case <-removed:
|
||||
fmt.Printf("%s removed\n", mizu.AggregatorPodName)
|
||||
cancel()
|
||||
return
|
||||
case modifiedPod := <- modified:
|
||||
case modifiedPod := <-modified:
|
||||
if modifiedPod.Status.Phase == "Running" && !isPodReady {
|
||||
isPodReady = true
|
||||
var err error
|
||||
@@ -223,16 +224,16 @@ func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
|
||||
}
|
||||
}
|
||||
|
||||
case <- time.After(25 * time.Second):
|
||||
case <-time.After(25 * time.Second):
|
||||
if !isPodReady {
|
||||
fmt.Printf("error: %s pod was not ready in time", mizu.AggregatorPodName)
|
||||
cancel()
|
||||
}
|
||||
|
||||
case <- errorChan:
|
||||
case <-errorChan:
|
||||
cancel()
|
||||
|
||||
case <- ctx.Done():
|
||||
case <-ctx.Done():
|
||||
if portForward != nil {
|
||||
portForward.Stop()
|
||||
}
|
||||
@@ -261,12 +262,12 @@ func createRBACIfNecessary(ctx context.Context, kubernetesProvider *kubernetes.P
|
||||
return true
|
||||
}
|
||||
|
||||
func getNodeHostToTappedPodIpsMap(ctx context.Context, kubernetesProvider *kubernetes.Provider, tappedPods []core.Pod) (map[string][]string, error) {
|
||||
func getNodeHostToTappedPodIpsMap(tappedPods []core.Pod) (map[string][]string, error) {
|
||||
nodeToTappedPodIPMap := make(map[string][]string, 0)
|
||||
for _, pod := range tappedPods {
|
||||
existingList := nodeToTappedPodIPMap[pod.Spec.NodeName]
|
||||
if existingList == nil {
|
||||
nodeToTappedPodIPMap[pod.Spec.NodeName] = []string {pod.Status.PodIP}
|
||||
nodeToTappedPodIPMap[pod.Spec.NodeName] = []string{pod.Status.PodIP}
|
||||
} else {
|
||||
nodeToTappedPodIPMap[pod.Spec.NodeName] = append(nodeToTappedPodIPMap[pod.Spec.NodeName], pod.Status.PodIP)
|
||||
}
|
||||
@@ -280,9 +281,9 @@ func waitForFinish(ctx context.Context, cancel context.CancelFunc) {
|
||||
|
||||
// block until ctx cancel is called or termination signal is received
|
||||
select {
|
||||
case <- ctx.Done():
|
||||
case <-ctx.Done():
|
||||
break
|
||||
case <- sigChan:
|
||||
case <-sigChan:
|
||||
cancel()
|
||||
}
|
||||
}
|
||||
@@ -296,7 +297,7 @@ func syncApiStatus(ctx context.Context, cancel context.CancelFunc, tappingOption
|
||||
|
||||
for {
|
||||
select {
|
||||
case <- ctx.Done():
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
err = controlSocket.SendNewTappedPodsListMessage(currentlyTappedPods)
|
||||
@@ -308,3 +309,13 @@ func syncApiStatus(ctx context.Context, cancel context.CancelFunc, tappingOption
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func getNamespace(tappingOptions *MizuTapOptions, kubernetesProvider *kubernetes.Provider) string {
|
||||
if tappingOptions.AllNamespaces {
|
||||
return mizu.K8sAllNamespaces
|
||||
} else if len(tappingOptions.Namespace) > 0 {
|
||||
return tappingOptions.Namespace
|
||||
} else {
|
||||
return kubernetesProvider.CurrentNamespace()
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user