mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-08-30 16:23:27 +00:00
Fixes (#171)
This commit is contained in:
parent
60533a9591
commit
0244f12167
@ -13,6 +13,7 @@ require (
|
||||
github.com/go-playground/validator/v10 v10.5.0
|
||||
github.com/google/martian v2.1.0+incompatible
|
||||
github.com/gorilla/websocket v1.4.2
|
||||
github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible
|
||||
github.com/romana/rlog v0.0.0-20171115192701-f018bc92e7d7
|
||||
github.com/up9inc/mizu/shared v0.0.0
|
||||
|
@ -28,13 +28,13 @@ var k8sResolver *resolver.Resolver
|
||||
|
||||
func StartResolving(namespace string) {
|
||||
errOut := make(chan error, 100)
|
||||
res, err := resolver.NewFromInCluster(errOut)
|
||||
res, err := resolver.NewFromInCluster(errOut, namespace)
|
||||
if err != nil {
|
||||
rlog.Infof("error creating k8s resolver %s", err)
|
||||
return
|
||||
}
|
||||
ctx := context.Background()
|
||||
res.Start(ctx, namespace)
|
||||
res.Start(ctx)
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
|
@ -1,6 +1,7 @@
|
||||
package resolver
|
||||
|
||||
import (
|
||||
cmap "github.com/orcaman/concurrent-map"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
_ "k8s.io/client-go/plugin/pkg/client/auth/azure"
|
||||
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
|
||||
@ -9,7 +10,7 @@ import (
|
||||
restclient "k8s.io/client-go/rest"
|
||||
)
|
||||
|
||||
func NewFromInCluster(errOut chan error) (*Resolver, error) {
|
||||
func NewFromInCluster(errOut chan error, namesapce string) (*Resolver, error) {
|
||||
config, err := restclient.InClusterConfig()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -18,5 +19,5 @@ func NewFromInCluster(errOut chan error) (*Resolver, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Resolver{clientConfig: config, clientSet: clientset, nameMap: make(map[string]string), serviceMap: make(map[string]string), errOut: errOut}, nil
|
||||
return &Resolver{clientConfig: config, clientSet: clientset, nameMap: cmap.New(), serviceMap: cmap.New(), errOut: errOut, namespace: namesapce}, nil
|
||||
}
|
||||
|
@ -7,12 +7,14 @@ import (
|
||||
"github.com/romana/rlog"
|
||||
k8serrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
|
||||
"github.com/orcaman/concurrent-map"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
restclient "k8s.io/client-go/rest"
|
||||
)
|
||||
|
||||
const (
|
||||
kubClientNullString = "None"
|
||||
)
|
||||
@ -20,17 +22,16 @@ const (
|
||||
type Resolver struct {
|
||||
clientConfig *restclient.Config
|
||||
clientSet *kubernetes.Clientset
|
||||
nameMap map[string]string
|
||||
serviceMap map[string]string
|
||||
nameMap cmap.ConcurrentMap
|
||||
serviceMap cmap.ConcurrentMap
|
||||
isStarted bool
|
||||
errOut chan error
|
||||
namespace string
|
||||
}
|
||||
|
||||
func (resolver *Resolver) Start(ctx context.Context, namespace string) {
|
||||
func (resolver *Resolver) Start(ctx context.Context) {
|
||||
if !resolver.isStarted {
|
||||
resolver.isStarted = true
|
||||
resolver.namespace = namespace
|
||||
|
||||
go resolver.infiniteErrorHandleRetryFunc(ctx, resolver.watchServices)
|
||||
go resolver.infiniteErrorHandleRetryFunc(ctx, resolver.watchEndpoints)
|
||||
@ -39,19 +40,19 @@ func (resolver *Resolver) Start(ctx context.Context, namespace string) {
|
||||
}
|
||||
|
||||
func (resolver *Resolver) Resolve(name string) string {
|
||||
resolvedName, isFound := resolver.nameMap[name]
|
||||
resolvedName, isFound := resolver.nameMap.Get(name)
|
||||
if !isFound {
|
||||
return ""
|
||||
}
|
||||
return resolvedName
|
||||
return resolvedName.(string)
|
||||
}
|
||||
|
||||
func (resolver *Resolver) GetMap() map[string]string {
|
||||
func (resolver *Resolver) GetMap() cmap.ConcurrentMap {
|
||||
return resolver.nameMap
|
||||
}
|
||||
|
||||
func (resolver *Resolver) CheckIsServiceIP(address string) bool {
|
||||
_, isFound := resolver.serviceMap[address]
|
||||
_, isFound := resolver.serviceMap.Get(address)
|
||||
return isFound
|
||||
}
|
||||
|
||||
@ -63,17 +64,17 @@ func (resolver *Resolver) watchPods(ctx context.Context) error {
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case event := <- watcher.ResultChan():
|
||||
if event.Object == nil {
|
||||
return errors.New("error in kubectl pod watch")
|
||||
}
|
||||
if event.Type == watch.Deleted {
|
||||
pod := event.Object.(*corev1.Pod)
|
||||
resolver.saveResolvedName(pod.Status.PodIP, "", event.Type)
|
||||
}
|
||||
case <- ctx.Done():
|
||||
watcher.Stop()
|
||||
return nil
|
||||
case event := <-watcher.ResultChan():
|
||||
if event.Object == nil {
|
||||
return errors.New("error in kubectl pod watch")
|
||||
}
|
||||
if event.Type == watch.Deleted {
|
||||
pod := event.Object.(*corev1.Pod)
|
||||
resolver.saveResolvedName(pod.Status.PodIP, "", event.Type)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
watcher.Stop()
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -86,37 +87,37 @@ func (resolver *Resolver) watchEndpoints(ctx context.Context) error {
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case event := <- watcher.ResultChan():
|
||||
if event.Object == nil {
|
||||
return errors.New("error in kubectl endpoint watch")
|
||||
}
|
||||
endpoint := event.Object.(*corev1.Endpoints)
|
||||
serviceHostname := fmt.Sprintf("%s.%s", endpoint.Name, endpoint.Namespace)
|
||||
if endpoint.Subsets != nil {
|
||||
for _, subset := range endpoint.Subsets {
|
||||
var ports []int32
|
||||
if subset.Ports != nil {
|
||||
for _, portMapping := range subset.Ports {
|
||||
if portMapping.Port > 0 {
|
||||
ports = append(ports, portMapping.Port)
|
||||
}
|
||||
case event := <-watcher.ResultChan():
|
||||
if event.Object == nil {
|
||||
return errors.New("error in kubectl endpoint watch")
|
||||
}
|
||||
endpoint := event.Object.(*corev1.Endpoints)
|
||||
serviceHostname := fmt.Sprintf("%s.%s", endpoint.Name, endpoint.Namespace)
|
||||
if endpoint.Subsets != nil {
|
||||
for _, subset := range endpoint.Subsets {
|
||||
var ports []int32
|
||||
if subset.Ports != nil {
|
||||
for _, portMapping := range subset.Ports {
|
||||
if portMapping.Port > 0 {
|
||||
ports = append(ports, portMapping.Port)
|
||||
}
|
||||
}
|
||||
if subset.Addresses != nil {
|
||||
for _, address := range subset.Addresses {
|
||||
resolver.saveResolvedName(address.IP, serviceHostname, event.Type)
|
||||
for _, port := range ports {
|
||||
ipWithPort := fmt.Sprintf("%s:%d", address.IP, port)
|
||||
resolver.saveResolvedName(ipWithPort, serviceHostname, event.Type)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
if subset.Addresses != nil {
|
||||
for _, address := range subset.Addresses {
|
||||
resolver.saveResolvedName(address.IP, serviceHostname, event.Type)
|
||||
for _, port := range ports {
|
||||
ipWithPort := fmt.Sprintf("%s:%d", address.IP, port)
|
||||
resolver.saveResolvedName(ipWithPort, serviceHostname, event.Type)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
case <- ctx.Done():
|
||||
watcher.Stop()
|
||||
return nil
|
||||
}
|
||||
case <-ctx.Done():
|
||||
watcher.Stop()
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -129,7 +130,7 @@ func (resolver *Resolver) watchServices(ctx context.Context) error {
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case event := <- watcher.ResultChan():
|
||||
case event := <-watcher.ResultChan():
|
||||
if event.Object == nil {
|
||||
return errors.New("error in kubectl service watch")
|
||||
}
|
||||
@ -145,7 +146,7 @@ func (resolver *Resolver) watchServices(ctx context.Context) error {
|
||||
resolver.saveResolvedName(ingress.IP, serviceHostname, event.Type)
|
||||
}
|
||||
}
|
||||
case <- ctx.Done():
|
||||
case <-ctx.Done():
|
||||
watcher.Stop()
|
||||
return nil
|
||||
}
|
||||
@ -154,19 +155,19 @@ func (resolver *Resolver) watchServices(ctx context.Context) error {
|
||||
|
||||
func (resolver *Resolver) saveResolvedName(key string, resolved string, eventType watch.EventType) {
|
||||
if eventType == watch.Deleted {
|
||||
delete(resolver.nameMap, key)
|
||||
resolver.nameMap.Remove(key)
|
||||
rlog.Infof("setting %s=nil\n", key)
|
||||
} else {
|
||||
resolver.nameMap[key] = resolved
|
||||
resolver.nameMap.Set(key, resolved)
|
||||
rlog.Infof("setting %s=%s\n", key, resolved)
|
||||
}
|
||||
}
|
||||
|
||||
func (resolver *Resolver) saveServiceIP(key string, resolved string, eventType watch.EventType) {
|
||||
if eventType == watch.Deleted {
|
||||
delete(resolver.serviceMap, key)
|
||||
resolver.serviceMap.Remove(key)
|
||||
} else {
|
||||
resolver.serviceMap[key] = resolved
|
||||
resolver.serviceMap.Set(key, resolved)
|
||||
}
|
||||
}
|
||||
|
||||
@ -189,4 +190,3 @@ func (resolver *Resolver) infiniteErrorHandleRetryFunc(ctx context.Context, fun
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3,8 +3,8 @@ package cmd
|
||||
import (
|
||||
"context"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/up9inc/mizu/cli/fsUtils"
|
||||
"github.com/up9inc/mizu/cli/kubernetes"
|
||||
"github.com/up9inc/mizu/cli/logsUtils"
|
||||
"github.com/up9inc/mizu/cli/mizu"
|
||||
"os"
|
||||
"path"
|
||||
@ -32,7 +32,7 @@ var logsCmd = &cobra.Command{
|
||||
}
|
||||
mizu.Log.Debugf("Using file path %s", filePath)
|
||||
|
||||
if err := logsUtils.DumpLogs(kubernetesProvider, ctx, filePath); err != nil {
|
||||
if err := fsUtils.DumpLogs(kubernetesProvider, ctx, filePath); err != nil {
|
||||
mizu.Log.Errorf("Failed dump logs %v", err)
|
||||
}
|
||||
|
||||
|
@ -4,6 +4,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/up9inc/mizu/cli/fsUtils"
|
||||
"github.com/up9inc/mizu/cli/mizu"
|
||||
)
|
||||
|
||||
@ -13,6 +14,10 @@ var rootCmd = &cobra.Command{
|
||||
Long: `A web traffic viewer for kubernetes
|
||||
Further info is available at https://github.com/up9inc/mizu`,
|
||||
PersistentPreRunE: func(cmd *cobra.Command, args []string) error {
|
||||
if err := fsUtils.EnsureDir(mizu.GetMizuFolderPath()); err != nil {
|
||||
mizu.Log.Errorf("Failed to use mizu folder, %v", err)
|
||||
}
|
||||
mizu.InitLogger()
|
||||
if err := mizu.InitConfig(cmd); err != nil {
|
||||
mizu.Log.Errorf("Invalid config, Exit %s", err)
|
||||
return errors.New(fmt.Sprintf("%v", err))
|
||||
|
@ -5,6 +5,7 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/up9inc/mizu/cli/fsUtils"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
@ -17,7 +18,6 @@ import (
|
||||
|
||||
"github.com/up9inc/mizu/cli/errormessage"
|
||||
"github.com/up9inc/mizu/cli/kubernetes"
|
||||
"github.com/up9inc/mizu/cli/logsUtils"
|
||||
"github.com/up9inc/mizu/cli/mizu"
|
||||
"github.com/up9inc/mizu/cli/uiUtils"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
@ -62,7 +62,6 @@ func RunMizuTap() {
|
||||
return
|
||||
}
|
||||
|
||||
defer cleanUpMizuResources(kubernetesProvider)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel() // cancel will be called when this function exits
|
||||
|
||||
@ -96,6 +95,7 @@ func RunMizuTap() {
|
||||
|
||||
nodeToTappedPodIPMap := getNodeHostToTappedPodIpsMap(state.currentlyTappedPods)
|
||||
|
||||
defer cleanUpMizuResources(kubernetesProvider)
|
||||
if err := createMizuResources(ctx, kubernetesProvider, nodeToTappedPodIPMap, mizuApiFilteringOptions, mizuValidationRules); err != nil {
|
||||
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error creating resources: %v", errormessage.FormatError(err)))
|
||||
return
|
||||
@ -248,7 +248,7 @@ func cleanUpMizuResources(kubernetesProvider *kubernetes.Provider) {
|
||||
if mizu.Config.DumpLogs {
|
||||
mizuDir := mizu.GetMizuFolderPath()
|
||||
filePath = path.Join(mizuDir, fmt.Sprintf("mizu_logs_%s.zip", time.Now().Format("2006_01_02__15_04_05")))
|
||||
if err := logsUtils.DumpLogs(kubernetesProvider, removalCtx, filePath); err != nil {
|
||||
if err := fsUtils.DumpLogs(kubernetesProvider, removalCtx, filePath); err != nil {
|
||||
mizu.Log.Errorf("Failed dump logs %v", err)
|
||||
}
|
||||
}
|
||||
@ -355,7 +355,7 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
|
||||
restartTappers := func() {
|
||||
err, changeFound := updateCurrentlyTappedPods(kubernetesProvider, ctx, targetNamespaces)
|
||||
if err != nil {
|
||||
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error getting pods by regex: %v", errormessage.FormatError(err)))
|
||||
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("Failed to update currently tapped pods: %v", err))
|
||||
cancel()
|
||||
}
|
||||
|
||||
@ -398,11 +398,15 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
|
||||
restartTappersDebouncer.SetOn()
|
||||
}
|
||||
|
||||
case <-errorChan:
|
||||
case err := <-errorChan:
|
||||
mizu.Log.Debugf("Watching pods loop, got error %v, stopping restart tappers debouncer", err)
|
||||
restartTappersDebouncer.Cancel()
|
||||
// TODO: Does this also perform cleanup?
|
||||
cancel()
|
||||
|
||||
case <-ctx.Done():
|
||||
mizu.Log.Debugf("Watching pods loop, context done, stopping restart tappers debouncer")
|
||||
restartTappersDebouncer.Cancel()
|
||||
return
|
||||
}
|
||||
}
|
||||
@ -470,6 +474,10 @@ func createProxyToApiServerPod(ctx context.Context, kubernetesProvider *kubernet
|
||||
cancel()
|
||||
return
|
||||
case modifiedPod := <-modified:
|
||||
if modifiedPod == nil {
|
||||
mizu.Log.Debugf("Got agent pod modified event, status phase: %v", modifiedPod.Status.Phase)
|
||||
continue
|
||||
}
|
||||
mizu.Log.Debugf("Got agent pod modified event, status phase: %v", modifiedPod.Status.Phase)
|
||||
if modifiedPod.Status.Phase == core.PodRunning && !isPodReady {
|
||||
isPodReady = true
|
||||
@ -489,7 +497,7 @@ func createProxyToApiServerPod(ctx context.Context, kubernetesProvider *kubernet
|
||||
}
|
||||
case <-timeAfter:
|
||||
if !isPodReady {
|
||||
mizu.Log.Errorf(uiUtils.Error, fmt.Sprintf("%s pod was not ready in time", mizu.ApiServerPodName))
|
||||
mizu.Log.Errorf(uiUtils.Error, "Mizu API server was not ready in time")
|
||||
cancel()
|
||||
}
|
||||
case <-errorChan:
|
||||
|
26
cli/fsUtils/dirUtils.go
Normal file
26
cli/fsUtils/dirUtils.go
Normal file
@ -0,0 +1,26 @@
|
||||
package fsUtils
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
)
|
||||
|
||||
func EnsureDir(dirName string) error {
|
||||
err := os.Mkdir(dirName, 0700)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
if os.IsExist(err) {
|
||||
// check that the existing path is a directory
|
||||
info, err := os.Stat(dirName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !info.IsDir() {
|
||||
return errors.New(fmt.Sprintf("path exists but is not a directory: %s", dirName))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
58
cli/fsUtils/mizuLogsUtils.go
Normal file
58
cli/fsUtils/mizuLogsUtils.go
Normal file
@ -0,0 +1,58 @@
|
||||
package fsUtils
|
||||
|
||||
import (
|
||||
"archive/zip"
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/up9inc/mizu/cli/kubernetes"
|
||||
"github.com/up9inc/mizu/cli/mizu"
|
||||
"os"
|
||||
"regexp"
|
||||
)
|
||||
|
||||
func DumpLogs(provider *kubernetes.Provider, ctx context.Context, filePath string) error {
|
||||
podExactRegex := regexp.MustCompile(fmt.Sprintf("^mizu-"))
|
||||
pods, err := provider.ListAllPodsMatchingRegex(ctx, podExactRegex, []string{mizu.Config.MizuResourcesNamespace})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(pods) == 0 {
|
||||
return fmt.Errorf("no pods found in namespace %s", mizu.Config.MizuResourcesNamespace)
|
||||
}
|
||||
|
||||
newZipFile, err := os.Create(filePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer newZipFile.Close()
|
||||
zipWriter := zip.NewWriter(newZipFile)
|
||||
defer zipWriter.Close()
|
||||
|
||||
for _, pod := range pods {
|
||||
logs, err := provider.GetPodLogs(pod.Namespace, pod.Name, ctx)
|
||||
if err != nil {
|
||||
mizu.Log.Errorf("Failed to get logs, %v", err)
|
||||
continue
|
||||
} else {
|
||||
mizu.Log.Debugf("Successfully read log length %d for pod: %s.%s", len(logs), pod.Namespace, pod.Name)
|
||||
}
|
||||
if err := AddStrToZip(zipWriter, logs, fmt.Sprintf("%s.%s.log", pod.Namespace, pod.Name)); err != nil {
|
||||
mizu.Log.Errorf("Failed write logs, %v", err)
|
||||
} else {
|
||||
mizu.Log.Infof("Successfully added log length %d from pod: %s.%s", len(logs), pod.Namespace, pod.Name)
|
||||
}
|
||||
}
|
||||
if err := AddFileToZip(zipWriter, mizu.GetConfigFilePath()); err != nil {
|
||||
mizu.Log.Debugf("Failed write file, %v", err)
|
||||
} else {
|
||||
mizu.Log.Infof("Successfully added file %s", mizu.GetConfigFilePath())
|
||||
}
|
||||
if err := AddFileToZip(zipWriter, mizu.GetLogFilePath()); err != nil {
|
||||
mizu.Log.Errorf("Failed write file, %v", err)
|
||||
} else {
|
||||
mizu.Log.Infof("Successfully added file %s", mizu.GetLogFilePath())
|
||||
}
|
||||
mizu.Log.Infof("You can find the zip with all logs in %s\n", filePath)
|
||||
return nil
|
||||
}
|
55
cli/fsUtils/zipUtils.go
Normal file
55
cli/fsUtils/zipUtils.go
Normal file
@ -0,0 +1,55 @@
|
||||
package fsUtils
|
||||
|
||||
import (
|
||||
"archive/zip"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
)
|
||||
|
||||
func AddFileToZip(zipWriter *zip.Writer, filename string) error {
|
||||
|
||||
fileToZip, err := os.Open(filename)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open file %s, %w", filename, err)
|
||||
}
|
||||
defer fileToZip.Close()
|
||||
|
||||
// Get the file information
|
||||
info, err := fileToZip.Stat()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get file information %s, %w", filename, err)
|
||||
}
|
||||
|
||||
header, err := zip.FileInfoHeader(info)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Using FileInfoHeader() above only uses the basename of the file. If we want
|
||||
// to preserve the folder structure we can overwrite this with the full path.
|
||||
header.Name = filepath.Base(filename)
|
||||
|
||||
// Change to deflate to gain better compression
|
||||
// see http://golang.org/pkg/archive/zip/#pkg-constants
|
||||
header.Method = zip.Deflate
|
||||
|
||||
writer, err := zipWriter.CreateHeader(header)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create header in zip for %s, %w", filename, err)
|
||||
}
|
||||
_, err = io.Copy(writer, fileToZip)
|
||||
return err
|
||||
}
|
||||
|
||||
func AddStrToZip(writer *zip.Writer, logs string, fileName string) error {
|
||||
if zipFile, err := writer.Create(fileName); err != nil {
|
||||
return fmt.Errorf("couldn't create a log file inside zip for %s, %w", fileName, err)
|
||||
} else {
|
||||
if _, err = zipFile.Write([]byte(logs)); err != nil {
|
||||
return fmt.Errorf("couldn't write logs to zip file: %s, %w", fileName, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
@ -683,7 +683,7 @@ func (provider *Provider) ListAllPodsMatchingRegex(ctx context.Context, regex *r
|
||||
for _, namespace := range namespaces {
|
||||
namespacePods, err := provider.clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get pods in ns: %s, %w", namespace, err)
|
||||
return nil, fmt.Errorf("failed to get pods in ns: [%s], %w", namespace, err)
|
||||
}
|
||||
|
||||
pods = append(pods, namespacePods.Items...)
|
||||
|
@ -1,106 +0,0 @@
|
||||
package logsUtils
|
||||
|
||||
import (
|
||||
"archive/zip"
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/up9inc/mizu/cli/kubernetes"
|
||||
"github.com/up9inc/mizu/cli/mizu"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"regexp"
|
||||
)
|
||||
|
||||
func DumpLogs(provider *kubernetes.Provider, ctx context.Context, filePath string) error {
|
||||
podExactRegex := regexp.MustCompile(fmt.Sprintf("^mizu-"))
|
||||
pods, err := provider.ListAllPodsMatchingRegex(ctx, podExactRegex, []string{mizu.Config.MizuResourcesNamespace})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(pods) == 0 {
|
||||
return fmt.Errorf("no pods found in namespace %s", mizu.Config.MizuResourcesNamespace)
|
||||
}
|
||||
|
||||
newZipFile, err := os.Create(filePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer newZipFile.Close()
|
||||
zipWriter := zip.NewWriter(newZipFile)
|
||||
defer zipWriter.Close()
|
||||
|
||||
for _, pod := range pods {
|
||||
logs, err := provider.GetPodLogs(pod.Namespace, pod.Name, ctx)
|
||||
if err != nil {
|
||||
mizu.Log.Errorf("Failed to get logs, %v", err)
|
||||
continue
|
||||
} else {
|
||||
mizu.Log.Debugf("Successfully read log length %d for pod: %s.%s", len(logs), pod.Namespace, pod.Name)
|
||||
}
|
||||
if err := addLogsToZip(zipWriter, logs, fmt.Sprintf("%s.%s.log", pod.Namespace, pod.Name)); err != nil {
|
||||
mizu.Log.Errorf("Failed write logs, %v", err)
|
||||
} else {
|
||||
mizu.Log.Infof("Successfully added log length %d from pod: %s.%s", len(logs), pod.Namespace, pod.Name)
|
||||
}
|
||||
}
|
||||
if err := addFileToZip(zipWriter, mizu.GetConfigFilePath()); err != nil {
|
||||
mizu.Log.Errorf("Failed write file, %v", err)
|
||||
} else {
|
||||
mizu.Log.Infof("Successfully added file %s", mizu.GetConfigFilePath())
|
||||
}
|
||||
if err := addFileToZip(zipWriter, mizu.GetLogFilePath()); err != nil {
|
||||
mizu.Log.Errorf("Failed write file, %v", err)
|
||||
} else {
|
||||
mizu.Log.Infof("Successfully added file %s", mizu.GetLogFilePath())
|
||||
}
|
||||
mizu.Log.Infof("You can find the zip with all logs in %s\n", filePath)
|
||||
return nil
|
||||
}
|
||||
|
||||
func addFileToZip(zipWriter *zip.Writer, filename string) error {
|
||||
|
||||
fileToZip, err := os.Open(filename)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open file %s, %w", filename, err)
|
||||
}
|
||||
defer fileToZip.Close()
|
||||
|
||||
// Get the file information
|
||||
info, err := fileToZip.Stat()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get file information %s, %w", filename, err)
|
||||
}
|
||||
|
||||
header, err := zip.FileInfoHeader(info)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Using FileInfoHeader() above only uses the basename of the file. If we want
|
||||
// to preserve the folder structure we can overwrite this with the full path.
|
||||
header.Name = filepath.Base(filename)
|
||||
|
||||
// Change to deflate to gain better compression
|
||||
// see http://golang.org/pkg/archive/zip/#pkg-constants
|
||||
header.Method = zip.Deflate
|
||||
|
||||
writer, err := zipWriter.CreateHeader(header)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create header in zip for %s, %w", filename, err)
|
||||
}
|
||||
_, err = io.Copy(writer, fileToZip)
|
||||
return err
|
||||
}
|
||||
|
||||
func addLogsToZip(writer *zip.Writer, logs string, fileName string) error {
|
||||
if zipFile, err := writer.Create(fileName); err != nil {
|
||||
return fmt.Errorf("couldn't create a log file inside zip for %s, %w", fileName, err)
|
||||
} else {
|
||||
if _, err = zipFile.Write([]byte(logs)); err != nil {
|
||||
return fmt.Errorf("couldn't write logs to zip file: %s, %w", fileName, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
@ -2,10 +2,8 @@ package main
|
||||
|
||||
import (
|
||||
"github.com/up9inc/mizu/cli/cmd"
|
||||
"github.com/up9inc/mizu/cli/mizu"
|
||||
)
|
||||
|
||||
func main() {
|
||||
mizu.InitLogger()
|
||||
cmd.Execute()
|
||||
}
|
||||
|
@ -37,7 +37,7 @@ var Config = ConfigStruct{}
|
||||
func (config *ConfigStruct) Validate() error {
|
||||
if config.IsNsRestrictedMode() {
|
||||
if config.Tap.AllNamespaces || len(config.Tap.Namespaces) != 1 || config.Tap.Namespaces[0] != config.MizuResourcesNamespace {
|
||||
return fmt.Errorf("Not supported mode. Mizu can't resolve IPs in other namespaces when running in namespace restricted mode.\n" +
|
||||
return fmt.Errorf("Not supported mode. Mizu can't resolve IPs in other namespaces when running in namespace restricted mode.\n"+
|
||||
"You can use the same namespace for --%s and --%s", configStructs.NamespacesTapName, MizuResourcesNamespaceConfigName)
|
||||
}
|
||||
}
|
||||
@ -104,38 +104,34 @@ func initFlag(f *pflag.Flag) {
|
||||
}
|
||||
|
||||
if f.Name == SetCommandName {
|
||||
if setError := mergeSetFlag(sliceValue.GetSlice()); setError != nil {
|
||||
Log.Warningf(uiUtils.Red, fmt.Sprintf("%v", setError))
|
||||
}
|
||||
mergeSetFlag(sliceValue.GetSlice())
|
||||
return
|
||||
}
|
||||
|
||||
mergeFlagValues(configElem, f.Name, sliceValue.GetSlice())
|
||||
}
|
||||
|
||||
func mergeSetFlag(setValues []string) error {
|
||||
func mergeSetFlag(setValues []string) {
|
||||
configElem := reflect.ValueOf(&Config).Elem()
|
||||
|
||||
for _, setValue := range setValues {
|
||||
if !strings.Contains(setValue, Separator) {
|
||||
return errors.New(fmt.Sprintf("invalid set argument %s", setValue))
|
||||
Log.Warningf(uiUtils.Warning, fmt.Sprintf("Ignoring set argument %s (set argument format: <flag name>=<flag value>)", setValue))
|
||||
}
|
||||
|
||||
split := strings.SplitN(setValue, Separator, 2)
|
||||
if len(split) != 2 {
|
||||
return errors.New(fmt.Sprintf("invalid set argument %s", setValue))
|
||||
Log.Warningf(uiUtils.Warning, fmt.Sprintf("Ignoring set argument %s (set argument format: <flag name>=<flag value>)", setValue))
|
||||
}
|
||||
|
||||
argumentKey, argumentValue := split[0], split[1]
|
||||
|
||||
if !Contains(allowedSetFlags, argumentKey) {
|
||||
return errors.New(fmt.Sprintf("invalid set flag name %s, allowed set flag names: \"%s\"", argumentKey, strings.Join(allowedSetFlags, "\", \"")))
|
||||
Log.Warningf(uiUtils.Warning, fmt.Sprintf("Ignoring set argument %s, flag name must be one of the following: \"%s\"", setValue, strings.Join(allowedSetFlags, "\", \"")))
|
||||
}
|
||||
|
||||
mergeFlagValue(configElem, argumentKey, argumentValue)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func mergeFlagValue(currentElem reflect.Value, flagKey string, flagValue string) {
|
||||
|
@ -1,7 +1,6 @@
|
||||
package mizu
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/op/go-logging"
|
||||
"os"
|
||||
"path"
|
||||
@ -21,7 +20,7 @@ func InitLogger() {
|
||||
logPath := GetLogFilePath()
|
||||
f, err := os.OpenFile(logPath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("Failed mizu log file: %v, err %v", logPath, err))
|
||||
Log.Infof("Failed to open mizu log file: %v, err %v", logPath, err)
|
||||
}
|
||||
|
||||
fileLog := logging.NewLogBackend(f, "", 0)
|
||||
|
@ -1,6 +1,7 @@
|
||||
package debounce
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
@ -13,9 +14,10 @@ func NewDebouncer(timeout time.Duration, callback func()) *Debouncer {
|
||||
|
||||
type Debouncer struct {
|
||||
callback func()
|
||||
running bool
|
||||
timeout time.Duration
|
||||
timer *time.Timer
|
||||
running bool
|
||||
canceled bool
|
||||
timeout time.Duration
|
||||
timer *time.Timer
|
||||
}
|
||||
|
||||
func (d *Debouncer) setTimeout(timeout time.Duration) {
|
||||
@ -25,18 +27,28 @@ func (d *Debouncer) setTimeout(timeout time.Duration) {
|
||||
|
||||
func (d *Debouncer) setCallback(callback func()) {
|
||||
callbackWrapped := func() {
|
||||
callback()
|
||||
if !d.canceled {
|
||||
callback()
|
||||
}
|
||||
d.running = false
|
||||
}
|
||||
|
||||
d.callback = callbackWrapped
|
||||
}
|
||||
|
||||
func (d *Debouncer) SetOn() {
|
||||
func (d *Debouncer) Cancel() {
|
||||
d.canceled = true
|
||||
}
|
||||
|
||||
func (d *Debouncer) SetOn() error {
|
||||
if d.canceled {
|
||||
return fmt.Errorf("debouncer cancelled")
|
||||
}
|
||||
if d.running == true {
|
||||
return
|
||||
return nil
|
||||
}
|
||||
|
||||
d.running = true
|
||||
d.timer = time.AfterFunc(d.timeout, d.callback)
|
||||
return nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user