mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-06-27 16:50:02 +00:00
TRA-3415 ignore unready pods (#160)
* Update tapRunner.go and provider.go * Update tapRunner.go * Update tapRunner.go * Update tapRunner.go * Update tapRunner.go and provider.go Co-authored-by: RamiBerm <rami.berman@up9.com>
This commit is contained in:
parent
2d5b170406
commit
f9396e01ca
@ -54,12 +54,15 @@ func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) {
|
||||
defer cancel() // cancel will be called when this function exits
|
||||
|
||||
targetNamespace := getNamespace(tappingOptions, kubernetesProvider)
|
||||
if matchingPods, err := kubernetesProvider.GetAllPodsMatchingRegex(ctx, podRegexQuery, targetNamespace); err != nil {
|
||||
if err := updateCurrentlyTappedPods(kubernetesProvider, ctx, podRegexQuery, targetNamespace); err != nil {
|
||||
mizu.Log.Infof("Error listing pods: %v", err)
|
||||
return
|
||||
} else {
|
||||
currentlyTappedPods = matchingPods
|
||||
}
|
||||
urlReadyChan := make(chan string)
|
||||
go func() {
|
||||
mizu.Log.Infof("Mizu is available at http://%s", <-urlReadyChan)
|
||||
}()
|
||||
|
||||
|
||||
var namespacesStr string
|
||||
if targetNamespace != mizu.K8sAllNamespaces {
|
||||
@ -86,10 +89,9 @@ func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) {
|
||||
return
|
||||
}
|
||||
|
||||
urlReadyChan := make(chan string)
|
||||
mizu.CheckNewerVersion()
|
||||
go portForwardApiPod(ctx, kubernetesProvider, cancel, tappingOptions, urlReadyChan) // TODO convert this to job for built in pod ttl or have the running app handle this
|
||||
go watchPodsForTapping(ctx, kubernetesProvider, cancel, podRegexQuery, tappingOptions, urlReadyChan)
|
||||
go watchPodsForTapping(ctx, kubernetesProvider, cancel, podRegexQuery, tappingOptions)
|
||||
go syncApiStatus(ctx, cancel, tappingOptions)
|
||||
|
||||
//block until exit signal or error
|
||||
@ -232,17 +234,15 @@ func cleanUpMizuResources(kubernetesProvider *kubernetes.Provider) {
|
||||
}
|
||||
}
|
||||
|
||||
func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, podRegex *regexp.Regexp, tappingOptions *MizuTapOptions, urlReadyChan chan string) {
|
||||
func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, podRegex *regexp.Regexp, tappingOptions *MizuTapOptions) {
|
||||
targetNamespace := getNamespace(tappingOptions, kubernetesProvider)
|
||||
|
||||
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, targetNamespace), podRegex)
|
||||
|
||||
restartTappers := func() {
|
||||
if matchingPods, err := kubernetesProvider.GetAllPodsMatchingRegex(ctx, podRegex, targetNamespace); err != nil {
|
||||
if err := updateCurrentlyTappedPods(kubernetesProvider, ctx, podRegex, targetNamespace); err != nil {
|
||||
mizu.Log.Infof("Error getting pods by regex: %s (%v,%+v)", err, err, err)
|
||||
cancel()
|
||||
} else {
|
||||
currentlyTappedPods = matchingPods
|
||||
}
|
||||
|
||||
nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(currentlyTappedPods)
|
||||
@ -257,19 +257,11 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
|
||||
}
|
||||
}
|
||||
restartTappersDebouncer := debounce.NewDebouncer(updateTappersDelay, restartTappers)
|
||||
timer := time.AfterFunc(time.Second*10, func() {
|
||||
mizu.Log.Debugf("Waiting for URL...")
|
||||
mizu.Log.Infof("Mizu is available at http://%s", <-urlReadyChan)
|
||||
})
|
||||
|
||||
for {
|
||||
select {
|
||||
case newTarget := <-added:
|
||||
mizu.Log.Infof(uiUtils.Green, fmt.Sprintf("+%s", newTarget.Name))
|
||||
timer.Reset(time.Second * 2)
|
||||
case removedTarget := <-removed:
|
||||
mizu.Log.Infof(uiUtils.Red, fmt.Sprintf("-%s", removedTarget.Name))
|
||||
timer.Reset(time.Second * 2)
|
||||
case <-added:
|
||||
case <-removed:
|
||||
restartTappersDebouncer.SetOn()
|
||||
case modifiedTarget := <-modified:
|
||||
// Act only if the modified pod has already obtained an IP address.
|
||||
@ -292,6 +284,48 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
|
||||
}
|
||||
}
|
||||
|
||||
func updateCurrentlyTappedPods(kubernetesProvider *kubernetes.Provider, ctx context.Context, podRegex *regexp.Regexp, targetNamespace string) error {
|
||||
if matchingPods, err := kubernetesProvider.GetAllRunningPodsMatchingRegex(ctx, podRegex, targetNamespace); err != nil {
|
||||
mizu.Log.Infof("Error getting pods by regex: %s (%v,%+v)", err, err, err)
|
||||
return err
|
||||
} else {
|
||||
addedPods, removedPods := getPodArrayDiff(currentlyTappedPods, matchingPods)
|
||||
for _, addedPod := range addedPods {
|
||||
mizu.Log.Infof(uiUtils.Green, fmt.Sprintf("+%s", addedPod.Name))
|
||||
}
|
||||
for _, removedPod := range removedPods {
|
||||
mizu.Log.Infof(uiUtils.Red, fmt.Sprintf("-%s", removedPod.Name))
|
||||
}
|
||||
currentlyTappedPods = matchingPods
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func getPodArrayDiff(oldPods []core.Pod, newPods []core.Pod) (added []core.Pod, removed []core.Pod) {
|
||||
added = getMissingPods(newPods, oldPods)
|
||||
removed = getMissingPods(oldPods, newPods)
|
||||
|
||||
return added, removed
|
||||
}
|
||||
|
||||
//returns pods present in pods1 array and missing in pods2 array
|
||||
func getMissingPods(pods1 []core.Pod, pods2 []core.Pod) []core.Pod {
|
||||
missingPods := make([]core.Pod, 0)
|
||||
for _, pod1 := range pods1 {
|
||||
var found = false
|
||||
for _, pod2 := range pods2 {
|
||||
if pod1.UID == pod2.UID {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
missingPods = append(missingPods, pod1)
|
||||
}
|
||||
}
|
||||
return missingPods
|
||||
}
|
||||
|
||||
func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, tappingOptions *MizuTapOptions, urlReadyChan chan string) {
|
||||
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", mizu.ApiServerPodName))
|
||||
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, mizu.ResourcesNamespace), podExactRegex)
|
||||
|
@ -587,14 +587,14 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
|
||||
return err
|
||||
}
|
||||
|
||||
func (provider *Provider) GetAllPodsMatchingRegex(ctx context.Context, regex *regexp.Regexp, namespace string) ([]core.Pod, error) {
|
||||
func (provider *Provider) GetAllRunningPodsMatchingRegex(ctx context.Context, regex *regexp.Regexp, namespace string) ([]core.Pod, error) {
|
||||
pods, err := provider.clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
matchingPods := make([]core.Pod, 0)
|
||||
for _, pod := range pods.Items {
|
||||
if regex.MatchString(pod.Name) {
|
||||
if regex.MatchString(pod.Name) && isPodRunning(&pod) {
|
||||
matchingPods = append(matchingPods, pod)
|
||||
}
|
||||
}
|
||||
@ -635,3 +635,7 @@ func loadKubernetesConfiguration(kubeConfigPath string) clientcmd.ClientConfig {
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
func isPodRunning(pod *core.Pod) bool {
|
||||
return pod.Status.Phase == core.PodRunning
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user