mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-08-10 12:53:37 +00:00
Adding logs and fixing several issues (#162)
* Config grooming and several general fixes
This commit is contained in:
parent
1e726e381b
commit
9e34662511
@ -85,7 +85,7 @@ func hostApi(socketHarOutputChannel chan<- *tap.OutputChannelItem) {
|
|||||||
app := gin.Default()
|
app := gin.Default()
|
||||||
|
|
||||||
app.GET("/echo", func(c *gin.Context) {
|
app.GET("/echo", func(c *gin.Context) {
|
||||||
c.String(http.StatusOK, "Hello, World 👋!")
|
c.String(http.StatusOK, "Here is Mizu agent")
|
||||||
})
|
})
|
||||||
|
|
||||||
eventHandlers := api.RoutesEventHandlers{
|
eventHandlers := api.RoutesEventHandlers{
|
||||||
|
@ -8,17 +8,24 @@ import (
|
|||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
var outputFileName string
|
var regenerateFile bool
|
||||||
|
|
||||||
var configCmd = &cobra.Command{
|
var configCmd = &cobra.Command{
|
||||||
Use: "config",
|
Use: "config",
|
||||||
Short: "Generate example config file to stdout",
|
Short: "Generate config with default values",
|
||||||
RunE: func(cmd *cobra.Command, args []string) error {
|
RunE: func(cmd *cobra.Command, args []string) error {
|
||||||
template := mizu.GetTemplateConfig()
|
template, err := mizu.GetConfigWithDefaults()
|
||||||
if outputFileName != "" {
|
if err != nil {
|
||||||
|
mizu.Log.Errorf("Failed generating config with defaults %v", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if regenerateFile {
|
||||||
data := []byte(template)
|
data := []byte(template)
|
||||||
_ = ioutil.WriteFile(outputFileName, data, 0644)
|
if err := ioutil.WriteFile(mizu.GetConfigFilePath(), data, 0644); err != nil {
|
||||||
mizu.Log.Infof(fmt.Sprintf("Template File written to %s", fmt.Sprintf(uiUtils.Purple, outputFileName)))
|
mizu.Log.Errorf("Failed writing config %v", err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
mizu.Log.Infof(fmt.Sprintf("Template File written to %s", fmt.Sprintf(uiUtils.Purple, mizu.GetConfigFilePath())))
|
||||||
} else {
|
} else {
|
||||||
mizu.Log.Debugf("Writing template config.\n%v", template)
|
mizu.Log.Debugf("Writing template config.\n%v", template)
|
||||||
fmt.Printf("%v", template)
|
fmt.Printf("%v", template)
|
||||||
@ -29,6 +36,5 @@ var configCmd = &cobra.Command{
|
|||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
rootCmd.AddCommand(configCmd)
|
rootCmd.AddCommand(configCmd)
|
||||||
|
configCmd.Flags().BoolVarP(®enerateFile, "regenerate", "r", false, fmt.Sprintf("Regenerate the config file with default values %s", mizu.GetConfigFilePath()))
|
||||||
configCmd.Flags().StringVarP(&outputFileName, "file", "f", "", "Save content to local file")
|
|
||||||
}
|
}
|
||||||
|
@ -53,28 +53,20 @@ func RunMizuTap() {
|
|||||||
defer cancel() // cancel will be called when this function exits
|
defer cancel() // cancel will be called when this function exits
|
||||||
|
|
||||||
targetNamespace := getNamespace(kubernetesProvider)
|
targetNamespace := getNamespace(kubernetesProvider)
|
||||||
if err := updateCurrentlyTappedPods(kubernetesProvider, ctx, targetNamespace); err != nil {
|
|
||||||
mizu.Log.Infof("Error listing pods: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if mizu.Config.Tap.DryRun {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
urlReadyChan := make(chan string)
|
|
||||||
go func() {
|
|
||||||
mizu.Log.Infof("Mizu is available at http://%s", <-urlReadyChan)
|
|
||||||
}()
|
|
||||||
|
|
||||||
var namespacesStr string
|
var namespacesStr string
|
||||||
if targetNamespace != mizu.K8sAllNamespaces {
|
if targetNamespace != mizu.K8sAllNamespaces {
|
||||||
namespacesStr = fmt.Sprintf("namespace \"%s\"", targetNamespace)
|
namespacesStr = fmt.Sprintf("namespace \"%s\"", targetNamespace)
|
||||||
} else {
|
} else {
|
||||||
namespacesStr = "all namespaces"
|
namespacesStr = "all namespaces"
|
||||||
}
|
}
|
||||||
|
mizu.CheckNewerVersion()
|
||||||
mizu.Log.Infof("Tapping pods in %s", namespacesStr)
|
mizu.Log.Infof("Tapping pods in %s", namespacesStr)
|
||||||
|
|
||||||
|
if err, _ := updateCurrentlyTappedPods(kubernetesProvider, ctx, targetNamespace); err != nil {
|
||||||
|
mizu.Log.Infof("Error listing pods: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
if len(currentlyTappedPods) == 0 {
|
if len(currentlyTappedPods) == 0 {
|
||||||
var suggestionStr string
|
var suggestionStr string
|
||||||
if targetNamespace != mizu.K8sAllNamespaces {
|
if targetNamespace != mizu.K8sAllNamespaces {
|
||||||
@ -83,6 +75,10 @@ func RunMizuTap() {
|
|||||||
mizu.Log.Infof("Did not find any pods matching the regex argument%s", suggestionStr)
|
mizu.Log.Infof("Did not find any pods matching the regex argument%s", suggestionStr)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if mizu.Config.Tap.DryRun {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(currentlyTappedPods)
|
nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(currentlyTappedPods)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
@ -92,10 +88,8 @@ func RunMizuTap() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
mizu.CheckNewerVersion()
|
go createProxyToApiServerPod(ctx, kubernetesProvider, cancel)
|
||||||
go portForwardApiPod(ctx, kubernetesProvider, cancel, urlReadyChan) // TODO convert this to job for built in pod ttl or have the running app handle this
|
|
||||||
go watchPodsForTapping(ctx, kubernetesProvider, cancel)
|
go watchPodsForTapping(ctx, kubernetesProvider, cancel)
|
||||||
go syncApiStatus(ctx, cancel)
|
|
||||||
|
|
||||||
//block until exit signal or error
|
//block until exit signal or error
|
||||||
waitForFinish(ctx, cancel)
|
waitForFinish(ctx, cancel)
|
||||||
@ -121,9 +115,10 @@ func createMizuNamespace(ctx context.Context, kubernetesProvider *kubernetes.Pro
|
|||||||
_, err := kubernetesProvider.CreateNamespace(ctx, mizu.ResourcesNamespace)
|
_, err := kubernetesProvider.CreateNamespace(ctx, mizu.ResourcesNamespace)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
mizu.Log.Infof("Error creating Namespace %s: %v", mizu.ResourcesNamespace, err)
|
mizu.Log.Infof("Error creating Namespace %s: %v", mizu.ResourcesNamespace, err)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
mizu.Log.Debugf("Successfully creating Namespace %s", mizu.ResourcesNamespace)
|
||||||
return err
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func createMizuApiServer(ctx context.Context, kubernetesProvider *kubernetes.Provider, mizuApiFilteringOptions *shared.TrafficFilteringOptions) error {
|
func createMizuApiServer(ctx context.Context, kubernetesProvider *kubernetes.Provider, mizuApiFilteringOptions *shared.TrafficFilteringOptions) error {
|
||||||
@ -141,17 +136,20 @@ func createMizuApiServer(ctx context.Context, kubernetesProvider *kubernetes.Pro
|
|||||||
mizu.Log.Infof("Error creating mizu %s pod: %v", mizu.ApiServerPodName, err)
|
mizu.Log.Infof("Error creating mizu %s pod: %v", mizu.ApiServerPodName, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
mizu.Log.Debugf("Successfully created API server pod: %s", mizu.ApiServerPodName)
|
||||||
|
|
||||||
apiServerService, err = kubernetesProvider.CreateService(ctx, mizu.ResourcesNamespace, mizu.ApiServerPodName, mizu.ApiServerPodName)
|
apiServerService, err = kubernetesProvider.CreateService(ctx, mizu.ResourcesNamespace, mizu.ApiServerPodName, mizu.ApiServerPodName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
mizu.Log.Infof("Error creating mizu %s service: %v", mizu.ApiServerPodName, err)
|
mizu.Log.Infof("Error creating mizu %s service: %v", mizu.ApiServerPodName, err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
mizu.Log.Debugf("Successfully created service: %s", mizu.ApiServerPodName)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func getMizuApiFilteringOptions() (*shared.TrafficFilteringOptions, error) {
|
func getMizuApiFilteringOptions() (*shared.TrafficFilteringOptions, error) {
|
||||||
|
|
||||||
var compiledRegexSlice []*shared.SerializableRegexp
|
var compiledRegexSlice []*shared.SerializableRegexp
|
||||||
|
|
||||||
if mizu.Config.Tap.PlainTextFilterRegexes != nil && len(mizu.Config.Tap.PlainTextFilterRegexes) > 0 {
|
if mizu.Config.Tap.PlainTextFilterRegexes != nil && len(mizu.Config.Tap.PlainTextFilterRegexes) > 0 {
|
||||||
@ -192,9 +190,10 @@ func updateMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provi
|
|||||||
mizu.Log.Infof("Error creating mizu tapper daemonset: %v", err)
|
mizu.Log.Infof("Error creating mizu tapper daemonset: %v", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
mizu.Log.Debugf("Successfully created %v tappers", len(nodeToTappedPodIPMap))
|
||||||
} else {
|
} else {
|
||||||
if err := kubernetesProvider.RemoveDaemonSet(ctx, mizu.ResourcesNamespace, mizu.TapperDaemonSetName); err != nil {
|
if err := kubernetesProvider.RemoveDaemonSet(ctx, mizu.ResourcesNamespace, mizu.TapperDaemonSetName); err != nil {
|
||||||
mizu.Log.Infof("Error deleting mizu tapper daemonset: %v", err)
|
mizu.Log.Errorf("Error deleting mizu tapper daemonset: %v", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -241,20 +240,42 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
|
|||||||
targetNamespace := getNamespace(kubernetesProvider)
|
targetNamespace := getNamespace(kubernetesProvider)
|
||||||
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, targetNamespace), mizu.Config.Tap.PodRegex())
|
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, targetNamespace), mizu.Config.Tap.PodRegex())
|
||||||
|
|
||||||
|
controlSocketStr := fmt.Sprintf("ws://%s/ws", kubernetes.GetMizuApiServerProxiedHostAndPath(mizu.Config.Tap.GuiPort))
|
||||||
|
controlSocket, err := mizu.CreateControlSocket(controlSocketStr)
|
||||||
|
if err != nil {
|
||||||
|
mizu.Log.Infof("error establishing control socket connection %s", err)
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
|
mizu.Log.Debugf("Control socket created %s", controlSocketStr)
|
||||||
|
err = controlSocket.SendNewTappedPodsListMessage(currentlyTappedPods)
|
||||||
|
if err != nil {
|
||||||
|
mizu.Log.Debugf("error Sending message via control socket %v, error: %s", controlSocketStr, err)
|
||||||
|
}
|
||||||
restartTappers := func() {
|
restartTappers := func() {
|
||||||
if err := updateCurrentlyTappedPods(kubernetesProvider, ctx, targetNamespace); err != nil {
|
err, changeFound := updateCurrentlyTappedPods(kubernetesProvider, ctx, targetNamespace)
|
||||||
mizu.Log.Infof("Error getting pods by regex: %s (%v,%+v)", err, err, err)
|
if err != nil {
|
||||||
|
mizu.Log.Errorf("Error getting pods by regex: %s (%v,%+v)", err, err, err)
|
||||||
cancel()
|
cancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !changeFound {
|
||||||
|
mizu.Log.Debugf("Nothing changed update tappers not needed")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
err = controlSocket.SendNewTappedPodsListMessage(currentlyTappedPods)
|
||||||
|
if err != nil {
|
||||||
|
mizu.Log.Debugf("error Sending message via control socket %v, error: %s", controlSocketStr, err)
|
||||||
|
}
|
||||||
|
|
||||||
nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(currentlyTappedPods)
|
nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(currentlyTappedPods)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
mizu.Log.Infof("Error building node to ips map: %s (%v,%+v)", err, err, err)
|
mizu.Log.Errorf("Error building node to ips map: %s (%v,%+v)", err, err, err)
|
||||||
cancel()
|
cancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := updateMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap); err != nil {
|
if err := updateMizuTappers(ctx, kubernetesProvider, nodeToTappedPodIPMap); err != nil {
|
||||||
mizu.Log.Infof("Error updating daemonset: %s (%v,%+v)", err, err, err)
|
mizu.Log.Errorf("Error updating daemonset: %s (%v,%+v)", err, err, err)
|
||||||
cancel()
|
cancel()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -262,17 +283,21 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
|
|||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-added:
|
case pod := <-added:
|
||||||
case <-removed:
|
mizu.Log.Debugf("Added matching pod %s, ns: %s", pod.Name, pod.Namespace)
|
||||||
restartTappersDebouncer.SetOn()
|
restartTappersDebouncer.SetOn()
|
||||||
case modifiedTarget := <-modified:
|
case pod := <-removed:
|
||||||
|
mizu.Log.Debugf("Removed matching pod %s, ns: %s", pod.Name, pod.Namespace)
|
||||||
|
restartTappersDebouncer.SetOn()
|
||||||
|
case pod := <-modified:
|
||||||
|
mizu.Log.Debugf("Modified matching pod %s, ns: %s, phase: %s, ip: %s", pod.Name, pod.Namespace, pod.Status.Phase, pod.Status.PodIP)
|
||||||
// Act only if the modified pod has already obtained an IP address.
|
// Act only if the modified pod has already obtained an IP address.
|
||||||
// After filtering for IPs, on a normal pod restart this includes the following events:
|
// After filtering for IPs, on a normal pod restart this includes the following events:
|
||||||
// - Pod deletion
|
// - Pod deletion
|
||||||
// - Pod reaches start state
|
// - Pod reaches start state
|
||||||
// - Pod reaches ready state
|
// - Pod reaches ready state
|
||||||
// Ready/unready transitions might also trigger this event.
|
// Ready/unready transitions might also trigger this event.
|
||||||
if modifiedTarget.Status.PodIP != "" {
|
if pod.Status.PodIP != "" {
|
||||||
restartTappersDebouncer.SetOn()
|
restartTappersDebouncer.SetOn()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -286,22 +311,25 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func updateCurrentlyTappedPods(kubernetesProvider *kubernetes.Provider, ctx context.Context, targetNamespace string) error {
|
func updateCurrentlyTappedPods(kubernetesProvider *kubernetes.Provider, ctx context.Context, targetNamespace string) (error, bool) {
|
||||||
|
changeFound := false
|
||||||
if matchingPods, err := kubernetesProvider.GetAllRunningPodsMatchingRegex(ctx, mizu.Config.Tap.PodRegex(), targetNamespace); err != nil {
|
if matchingPods, err := kubernetesProvider.GetAllRunningPodsMatchingRegex(ctx, mizu.Config.Tap.PodRegex(), targetNamespace); err != nil {
|
||||||
mizu.Log.Infof("Error getting pods by regex: %s (%v,%+v)", err, err, err)
|
mizu.Log.Infof("Error getting pods by regex: %s (%v,%+v)", err, err, err)
|
||||||
return err
|
return err, false
|
||||||
} else {
|
} else {
|
||||||
addedPods, removedPods := getPodArrayDiff(currentlyTappedPods, matchingPods)
|
addedPods, removedPods := getPodArrayDiff(currentlyTappedPods, matchingPods)
|
||||||
for _, addedPod := range addedPods {
|
for _, addedPod := range addedPods {
|
||||||
|
changeFound = true
|
||||||
mizu.Log.Infof(uiUtils.Green, fmt.Sprintf("+%s", addedPod.Name))
|
mizu.Log.Infof(uiUtils.Green, fmt.Sprintf("+%s", addedPod.Name))
|
||||||
}
|
}
|
||||||
for _, removedPod := range removedPods {
|
for _, removedPod := range removedPods {
|
||||||
|
changeFound = true
|
||||||
mizu.Log.Infof(uiUtils.Red, fmt.Sprintf("-%s", removedPod.Name))
|
mizu.Log.Infof(uiUtils.Red, fmt.Sprintf("-%s", removedPod.Name))
|
||||||
}
|
}
|
||||||
currentlyTappedPods = matchingPods
|
currentlyTappedPods = matchingPods
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil, changeFound
|
||||||
}
|
}
|
||||||
|
|
||||||
func getPodArrayDiff(oldPods []core.Pod, newPods []core.Pod) (added []core.Pod, removed []core.Pod) {
|
func getPodArrayDiff(oldPods []core.Pod, newPods []core.Pod) (added []core.Pod, removed []core.Pod) {
|
||||||
@ -329,43 +357,44 @@ func getMissingPods(pods1 []core.Pod, pods2 []core.Pod) []core.Pod {
|
|||||||
return missingPods
|
return missingPods
|
||||||
}
|
}
|
||||||
|
|
||||||
func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, urlReadyChan chan string) {
|
func createProxyToApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc) {
|
||||||
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", mizu.ApiServerPodName))
|
podExactRegex := regexp.MustCompile(fmt.Sprintf("^%s$", mizu.ApiServerPodName))
|
||||||
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, mizu.ResourcesNamespace), podExactRegex)
|
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, mizu.ResourcesNamespace), podExactRegex)
|
||||||
isPodReady := false
|
isPodReady := false
|
||||||
timeAfter := time.After(25 * time.Second)
|
timeAfter := time.After(25 * time.Second)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
case <-added:
|
case <-added:
|
||||||
|
mizu.Log.Debugf("Got agent pod added event")
|
||||||
continue
|
continue
|
||||||
case <-removed:
|
case <-removed:
|
||||||
mizu.Log.Infof("%s removed", mizu.ApiServerPodName)
|
mizu.Log.Infof("%s removed", mizu.ApiServerPodName)
|
||||||
cancel()
|
cancel()
|
||||||
return
|
return
|
||||||
case modifiedPod := <-modified:
|
case modifiedPod := <-modified:
|
||||||
if modifiedPod.Status.Phase == "Running" && !isPodReady {
|
mizu.Log.Debugf("Got agent pod modified event, status phase: %v", modifiedPod.Status.Phase)
|
||||||
|
if modifiedPod.Status.Phase == core.PodRunning && !isPodReady {
|
||||||
isPodReady = true
|
isPodReady = true
|
||||||
go func() {
|
go func() {
|
||||||
err := kubernetes.StartProxy(kubernetesProvider, mizu.Config.Tap.GuiPort, mizu.ResourcesNamespace, mizu.ApiServerPodName)
|
err := kubernetes.StartProxy(kubernetesProvider, mizu.Config.Tap.GuiPort, mizu.ResourcesNamespace, mizu.ApiServerPodName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
mizu.Log.Infof("Error occurred while running k8s proxy %v", err)
|
mizu.Log.Errorf("Error occurred while running k8s proxy %v", err)
|
||||||
cancel()
|
cancel()
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
mizu.Log.Infof("Mizu is available at http://%s\n", kubernetes.GetMizuApiServerProxiedHostAndPath(mizu.Config.Tap.GuiPort))
|
||||||
|
time.Sleep(time.Second * 5) // Waiting to be sure the proxy is ready
|
||||||
|
requestForAnalysis()
|
||||||
}
|
}
|
||||||
|
|
||||||
urlReadyChan <- kubernetes.GetMizuApiServerProxiedHostAndPath(mizu.Config.Tap.GuiPort)
|
|
||||||
time.Sleep(time.Second * 5) // Waiting to be sure the proxy is ready
|
|
||||||
requestForAnalysis()
|
|
||||||
case <-timeAfter:
|
case <-timeAfter:
|
||||||
if !isPodReady {
|
if !isPodReady {
|
||||||
mizu.Log.Errorf("error: %s pod was not ready in time", mizu.ApiServerPodName)
|
mizu.Log.Errorf("Error: %s pod was not ready in time", mizu.ApiServerPodName)
|
||||||
cancel()
|
cancel()
|
||||||
}
|
}
|
||||||
case <-errorChan:
|
case <-errorChan:
|
||||||
|
mizu.Log.Debugf("[ERROR] Agent creation, watching %v namespace", mizu.ResourcesNamespace)
|
||||||
cancel()
|
cancel()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -435,28 +464,6 @@ func waitForFinish(ctx context.Context, cancel context.CancelFunc) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func syncApiStatus(ctx context.Context, cancel context.CancelFunc) {
|
|
||||||
controlSocketStr := fmt.Sprintf("ws://%s/ws", kubernetes.GetMizuApiServerProxiedHostAndPath(mizu.Config.Tap.GuiPort))
|
|
||||||
controlSocket, err := mizu.CreateControlSocket(controlSocketStr)
|
|
||||||
if err != nil {
|
|
||||||
mizu.Log.Infof("error establishing control socket connection %s", err)
|
|
||||||
cancel()
|
|
||||||
}
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
err = controlSocket.SendNewTappedPodsListMessage(currentlyTappedPods)
|
|
||||||
if err != nil {
|
|
||||||
mizu.Log.Debugf("error Sending message via control socket %v, error: %s", controlSocketStr, err)
|
|
||||||
}
|
|
||||||
time.Sleep(10 * time.Second)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func getNamespace(kubernetesProvider *kubernetes.Provider) string {
|
func getNamespace(kubernetesProvider *kubernetes.Provider) string {
|
||||||
if mizu.Config.Tap.AllNamespaces {
|
if mizu.Config.Tap.AllNamespaces {
|
||||||
return mizu.K8sAllNamespaces
|
return mizu.K8sAllNamespaces
|
||||||
|
@ -473,6 +473,8 @@ func (provider *Provider) CheckDaemonSetExists(ctx context.Context, namespace st
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, apiServerPodIp string, nodeToTappedPodIPMap map[string][]string, serviceAccountName string, tapOutgoing bool) error {
|
func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespace string, daemonSetName string, podImage string, tapperPodName string, apiServerPodIp string, nodeToTappedPodIPMap map[string][]string, serviceAccountName string, tapOutgoing bool) error {
|
||||||
|
mizu.Log.Debugf("Applying %d tapper deamonsets, ns: %s, daemonSetName: %s, podImage: %s, tapperPodName: %s", len(nodeToTappedPodIPMap), namespace, daemonSetName, podImage, tapperPodName)
|
||||||
|
|
||||||
if len(nodeToTappedPodIPMap) == 0 {
|
if len(nodeToTappedPodIPMap) == 0 {
|
||||||
return fmt.Errorf("Daemon set %s must tap at least 1 pod", daemonSetName)
|
return fmt.Errorf("Daemon set %s must tap at least 1 pod", daemonSetName)
|
||||||
}
|
}
|
||||||
@ -493,12 +495,11 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
|
|||||||
mizuCmd = append(mizuCmd, "--anydirection")
|
mizuCmd = append(mizuCmd, "--anydirection")
|
||||||
}
|
}
|
||||||
|
|
||||||
privileged := true
|
|
||||||
agentContainer := applyconfcore.Container()
|
agentContainer := applyconfcore.Container()
|
||||||
agentContainer.WithName(tapperPodName)
|
agentContainer.WithName(tapperPodName)
|
||||||
agentContainer.WithImage(podImage)
|
agentContainer.WithImage(podImage)
|
||||||
agentContainer.WithImagePullPolicy(core.PullAlways)
|
agentContainer.WithImagePullPolicy(core.PullAlways)
|
||||||
agentContainer.WithSecurityContext(applyconfcore.SecurityContext().WithPrivileged(privileged))
|
agentContainer.WithSecurityContext(applyconfcore.SecurityContext().WithPrivileged(true))
|
||||||
agentContainer.WithCommand(mizuCmd...)
|
agentContainer.WithCommand(mizuCmd...)
|
||||||
agentContainer.WithEnv(
|
agentContainer.WithEnv(
|
||||||
applyconfcore.EnvVar().WithName(shared.HostModeEnvVar).WithValue("1"),
|
applyconfcore.EnvVar().WithName(shared.HostModeEnvVar).WithValue("1"),
|
||||||
|
@ -17,7 +17,7 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
Separator = "="
|
Separator = "="
|
||||||
SetCommandName = "set"
|
SetCommandName = "set"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -29,33 +29,34 @@ func InitConfig(cmd *cobra.Command) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err := mergeConfigFile(); err != nil {
|
if err := mergeConfigFile(); err != nil {
|
||||||
Log.Infof(uiUtils.Red, "Invalid config file")
|
Log.Errorf("Could not load config file, error %v", err)
|
||||||
return err
|
Log.Fatalf("You can regenerate the file using `mizu config -r` or just remove it %v", GetConfigFilePath())
|
||||||
}
|
}
|
||||||
|
|
||||||
cmd.Flags().Visit(initFlag)
|
cmd.Flags().Visit(initFlag)
|
||||||
|
|
||||||
finalConfigPrettified, _ := uiUtils.PrettyJson(Config)
|
finalConfigPrettified, _ := uiUtils.PrettyJson(Config)
|
||||||
Log.Debugf("Merged all config successfully\n Final config: %v", finalConfigPrettified)
|
Log.Debugf("Init config finished\n Final config: %v", finalConfigPrettified)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetTemplateConfig() string {
|
func GetConfigWithDefaults() (string, error) {
|
||||||
prettifiedConfig, _ := uiUtils.PrettyYaml(Config)
|
defaultConf := ConfigStruct{}
|
||||||
return prettifiedConfig
|
if err := defaults.Set(&defaultConf); err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
return uiUtils.PrettyYaml(defaultConf)
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetConfigFilePath() string {
|
||||||
|
return path.Join(getMizuFolderPath(), "config.yaml")
|
||||||
}
|
}
|
||||||
|
|
||||||
func mergeConfigFile() error {
|
func mergeConfigFile() error {
|
||||||
Log.Debugf("Merging config file values")
|
reader, openErr := os.Open(GetConfigFilePath())
|
||||||
home, homeDirErr := os.UserHomeDir()
|
|
||||||
if homeDirErr != nil {
|
|
||||||
return homeDirErr
|
|
||||||
}
|
|
||||||
|
|
||||||
reader, openErr := os.Open(path.Join(home, ".mizu", "config.yaml"))
|
|
||||||
if openErr != nil {
|
if openErr != nil {
|
||||||
return openErr
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
buf, readErr := ioutil.ReadAll(reader)
|
buf, readErr := ioutil.ReadAll(reader)
|
||||||
@ -66,6 +67,7 @@ func mergeConfigFile() error {
|
|||||||
if err := yaml.Unmarshal(buf, &Config); err != nil {
|
if err := yaml.Unmarshal(buf, &Config); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
Log.Debugf("Found config file, merged to default options")
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -1,5 +1,10 @@
|
|||||||
package mizu
|
package mizu
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"path"
|
||||||
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
SemVer = "0.0.1"
|
SemVer = "0.0.1"
|
||||||
Branch = "develop"
|
Branch = "develop"
|
||||||
@ -18,3 +23,11 @@ const (
|
|||||||
TapperDaemonSetName = "mizu-tapper-daemon-set"
|
TapperDaemonSetName = "mizu-tapper-daemon-set"
|
||||||
TapperPodName = "mizu-tapper"
|
TapperPodName = "mizu-tapper"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func getMizuFolderPath() string {
|
||||||
|
home, homeDirErr := os.UserHomeDir()
|
||||||
|
if homeDirErr != nil {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
return path.Join(home, ".mizu")
|
||||||
|
}
|
||||||
|
@ -14,10 +14,9 @@ var format = logging.MustStringFormatter(
|
|||||||
)
|
)
|
||||||
|
|
||||||
func InitLogger() {
|
func InitLogger() {
|
||||||
homeDirPath, _ := os.UserHomeDir()
|
mizuDirPath := getMizuFolderPath()
|
||||||
mizuDirPath := path.Join(homeDirPath, ".mizu")
|
|
||||||
if err := os.MkdirAll(mizuDirPath, os.ModePerm); err != nil {
|
if err := os.MkdirAll(mizuDirPath, os.ModePerm); err != nil {
|
||||||
panic(fmt.Sprintf("Failed creating .mizu dir: %v, err %v", mizuDirPath, err))
|
panic(fmt.Sprintf("Failed creating mizu dir: %v, err %v", mizuDirPath, err))
|
||||||
}
|
}
|
||||||
logPath := path.Join(mizuDirPath, "log.log")
|
logPath := path.Join(mizuDirPath, "log.log")
|
||||||
f, err := os.OpenFile(logPath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
|
f, err := os.OpenFile(logPath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
|
||||||
@ -35,5 +34,6 @@ func InitLogger() {
|
|||||||
|
|
||||||
logging.SetBackend(backend1Leveled, backend2Formatter)
|
logging.SetBackend(backend1Leveled, backend2Formatter)
|
||||||
|
|
||||||
|
Log.Debugf("\n\n\n")
|
||||||
Log.Debugf("Running mizu version %v", SemVer)
|
Log.Debugf("Running mizu version %v", SemVer)
|
||||||
}
|
}
|
||||||
|
@ -15,10 +15,6 @@ func ReportRun(cmd string, args interface{}) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if Branch != "main" {
|
|
||||||
Log.Debugf("reporting only on main branch")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
argsBytes, _ := json.Marshal(args)
|
argsBytes, _ := json.Marshal(args)
|
||||||
argsMap := map[string]string{
|
argsMap := map[string]string{
|
||||||
"telemetry_type": "execution",
|
"telemetry_type": "execution",
|
||||||
@ -26,6 +22,7 @@ func ReportRun(cmd string, args interface{}) {
|
|||||||
"args": string(argsBytes),
|
"args": string(argsBytes),
|
||||||
"component": "mizu_cli",
|
"component": "mizu_cli",
|
||||||
"BuildTimestamp": BuildTimestamp,
|
"BuildTimestamp": BuildTimestamp,
|
||||||
|
"Branch": Branch,
|
||||||
"version": SemVer}
|
"version": SemVer}
|
||||||
argsMap["message"] = fmt.Sprintf("mizu %v - %v", argsMap["cmd"], string(argsBytes))
|
argsMap["message"] = fmt.Sprintf("mizu %v - %v", argsMap["cmd"], string(argsBytes))
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user