TRA-3257 Dynamic tappers (#57)

* Defer cleanup.

* Split createMizuResources into two functions.

* Re-create daemonset when changes to tapped pods occur.

* Reordered imports.

* Use Printf instead of Println.

* Workaround for variable scope.

* WIP Apply daemonset instead of create.

* Whitespaces.

* Fixed: Using the right types for Apply.

* Fixed missing pod IP by adding a delay.

* Debounce pod restart.

* Proper field manager name.
This commit is contained in:
nimrod-up9
2021-05-26 17:25:12 +03:00
committed by GitHub
parent 74e9d44b96
commit 620f046a26
3 changed files with 204 additions and 112 deletions

View File

@@ -3,14 +3,24 @@ package cmd
import (
"context"
"fmt"
"github.com/up9inc/mizu/cli/kubernetes"
core "k8s.io/api/core/v1"
"github.com/up9inc/mizu/cli/mizu"
"os"
"os/signal"
"regexp"
"syscall"
"time"
core "k8s.io/api/core/v1"
"github.com/up9inc/mizu/cli/debounce"
"github.com/up9inc/mizu/cli/kubernetes"
"github.com/up9inc/mizu/cli/mizu"
)
var mizuServiceAccountExists bool
var aggregatorService *core.Service
const (
updateTappersDelay = 5 * time.Second
)
var currentlyTappedPods []core.Pod
@@ -18,59 +28,87 @@ var currentlyTappedPods []core.Pod
func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) {
kubernetesProvider := kubernetes.NewProvider(tappingOptions.KubeConfigPath, tappingOptions.Namespace)
defer cleanUpMizuResources(kubernetesProvider)
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // cancel will be called when this function exits
matchingPods, err := kubernetesProvider.GetAllPodsMatchingRegex(ctx, podRegexQuery)
if err != nil {
fmt.Printf("Error getting pods to tap %v\n", err)
if matchingPods, err := kubernetesProvider.GetAllPodsMatchingRegex(ctx, podRegexQuery); err != nil {
return
} else {
currentlyTappedPods = matchingPods
}
currentlyTappedPods = matchingPods
nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(ctx, kubernetesProvider, matchingPods)
nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(ctx, kubernetesProvider, currentlyTappedPods)
if err != nil {
cleanUpMizuResources(kubernetesProvider)
return
}
err = createMizuResources(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions)
if err != nil {
cleanUpMizuResources(kubernetesProvider)
return
}
go portForwardApiPod(ctx, kubernetesProvider, cancel, tappingOptions) //TODO convert this to job for built in pod ttl or have the running app handle this
if err := createMizuResources(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions); err != nil {
return
}
go portForwardApiPod(ctx, kubernetesProvider, cancel, tappingOptions) // TODO convert this to job for built in pod ttl or have the running app handle this
go watchPodsForTapping(ctx, kubernetesProvider, cancel, podRegexQuery, tappingOptions)
go syncApiStatus(ctx, cancel, tappingOptions)
waitForFinish(ctx, cancel) //block until exit signal or error
//block until exit signal or error
waitForFinish(ctx, cancel)
// TODO handle incoming traffic from tapper using a channel
//cleanup
fmt.Printf("\nRemoving mizu resources\n")
cleanUpMizuResources(kubernetesProvider)
}
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions) error {
mizuServiceAccountExists := createRBACIfNecessary(ctx, kubernetesProvider)
_, err := kubernetesProvider.CreateMizuAggregatorPod(ctx, mizu.ResourcesNamespace, mizu.AggregatorPodName, tappingOptions.MizuImage, mizuServiceAccountExists)
if err := createMizuAggregator(ctx, kubernetesProvider, tappingOptions); err != nil {
return err
}
if err := createMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions); err != nil {
return err
}
return nil
}
func createMizuAggregator(ctx context.Context, kubernetesProvider *kubernetes.Provider, tappingOptions *MizuTapOptions) error {
var err error
mizuServiceAccountExists = createRBACIfNecessary(ctx, kubernetesProvider)
_, err = kubernetesProvider.CreateMizuAggregatorPod(ctx, mizu.ResourcesNamespace, mizu.AggregatorPodName, tappingOptions.MizuImage, mizuServiceAccountExists)
if err != nil {
fmt.Printf("Error creating mizu collector pod: %v\n", err)
return err
}
aggregatorService, err := kubernetesProvider.CreateService(ctx, mizu.ResourcesNamespace, mizu.AggregatorPodName, mizu.AggregatorPodName)
aggregatorService, err = kubernetesProvider.CreateService(ctx, mizu.ResourcesNamespace, mizu.AggregatorPodName, mizu.AggregatorPodName)
if err != nil {
fmt.Printf("Error creating mizu collector service: %v\n", err)
return err
}
err = kubernetesProvider.CreateMizuTapperDaemonSet(ctx, mizu.ResourcesNamespace, mizu.TapperDaemonSetName, tappingOptions.MizuImage, mizu.TapperPodName, fmt.Sprintf("%s.%s.svc.cluster.local", aggregatorService.Name, aggregatorService.Namespace), nodeToTappedPodIPMap, mizuServiceAccountExists)
if err != nil {
return nil
}
func createMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions) error {
if err := kubernetesProvider.ApplyMizuTapperDaemonSet(
ctx,
mizu.ResourcesNamespace,
mizu.TapperDaemonSetName,
tappingOptions.MizuImage,
mizu.TapperPodName,
fmt.Sprintf("%s.%s.svc.cluster.local", aggregatorService.Name, aggregatorService.Namespace),
nodeToTappedPodIPMap,
mizuServiceAccountExists,
); err != nil {
fmt.Printf("Error creating mizu tapper daemonset: %v\n", err)
return err
}
return nil
}
func cleanUpMizuResources(kubernetesProvider *kubernetes.Provider) {
fmt.Printf("\nRemoving mizu resources\n")
removalCtx, _ := context.WithTimeout(context.Background(), 5 * time.Second)
if err := kubernetesProvider.RemovePod(removalCtx, mizu.ResourcesNamespace, mizu.AggregatorPodName); err != nil {
fmt.Printf("Error removing Pod %s in namespace %s: %s (%v,%+v)\n", mizu.AggregatorPodName, mizu.ResourcesNamespace, err, err, err);
@@ -83,28 +121,59 @@ func cleanUpMizuResources(kubernetesProvider *kubernetes.Provider) {
}
}
// will be relevant in the future
//func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, podRegex *regexp.Regexp) {
// added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, kubernetesProvider.Namespace), podRegex)
// for {
// select {
// case newTarget := <- added:
// fmt.Printf("+%s\n", newTarget.Name)
//
// case removedTarget := <- removed:
// fmt.Printf("-%s\n", removedTarget.Name)
//
// case <- modified:
// continue
//
// case <- errorChan:
// cancel()
//
// case <- ctx.Done():
// return
// }
// }
//}
func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, podRegex *regexp.Regexp, tappingOptions *MizuTapOptions) {
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, kubernetesProvider.Namespace), podRegex)
restartTappers := func() {
if matchingPods, err := kubernetesProvider.GetAllPodsMatchingRegex(ctx, podRegex); err != nil {
fmt.Printf("Error getting pods by regex: %s (%v,%+v)\n", err, err, err)
cancel()
} else {
currentlyTappedPods = matchingPods
}
nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(ctx, kubernetesProvider, currentlyTappedPods)
if err != nil {
fmt.Printf("Error building node to ips map: %s (%v,%+v)\n", err, err, err)
cancel()
}
if err := createMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions); err != nil {
fmt.Printf("Error updating daemonset: %s (%v,%+v)\n", err, err, err)
cancel()
}
}
restartTappersDebouncer := debounce.NewDebouncer(updateTappersDelay, restartTappers)
for {
select {
case newTarget := <- added:
fmt.Printf("+%s\n", newTarget.Name)
case removedTarget := <- removed:
fmt.Printf("-%s\n", removedTarget.Name)
restartTappersDebouncer.SetOn()
case modifiedTarget := <- modified:
// Act only if the modified pod has already obtained an IP address.
// After filtering for IPs, on a normal pod restart this includes the following events:
// - Pod deletion
// - Pod reaches start state
// - Pod reaches ready state
// Ready/unready transitions might also trigger this event.
if modifiedTarget.Status.PodIP != "" {
restartTappersDebouncer.SetOn()
}
case <- errorChan:
// TODO: Does this also perform cleanup?
cancel()
case <- ctx.Done():
return
}
}
}
func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, tappingOptions *MizuTapOptions) {
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", mizu.AggregatorPodName))