Compare commits

...

4 Commits

Author SHA1 Message Date
M. Mert Yıldıran
32dfe40e18 Make EntryItem more responsive (#552) 2021-12-23 10:25:39 +03:00
M. Mert Yıldıran
12aaa762f6 Fix React Hook useEffect has a missing dependency: 'handleQueryChange' warning (#551) 2021-12-22 20:23:21 +03:00
David Levanon
a75bac181d support linkerd (#547)
* support linkerd - initial commit

* renaming readEnvironmentVariable
2021-12-20 13:57:58 +02:00
gadotroee
2d78785558 Fix acceptance tests (after pods status request change) (#545) 2021-12-19 13:46:14 +02:00
10 changed files with 177 additions and 72 deletions

View File

@@ -304,11 +304,10 @@ func cleanupCommand(cmd *exec.Cmd) error {
}
func getPods(tapStatusInterface interface{}) ([]map[string]interface{}, error) {
tapStatus := tapStatusInterface.(map[string]interface{})
podsInterface := tapStatus["pods"].([]interface{})
tapPodsInterface := tapStatusInterface.([]interface{})
var pods []map[string]interface{}
for _, podInterface := range podsInterface {
for _, podInterface := range tapPodsInterface {
pods = append(pods, podInterface.(map[string]interface{}))
}

View File

@@ -23,7 +23,6 @@ import (
"path/filepath"
"plugin"
"sort"
"strings"
"syscall"
"time"
@@ -468,11 +467,7 @@ func startMizuTapperSyncer(ctx context.Context, provider *kubernetes.Provider) (
}
providers.TapStatus = shared.TapStatus{Pods: kubernetes.GetPodInfosForPods(tapperSyncer.CurrentlyTappedPods)}
tappedPodsStatus := make([]shared.TappedPodStatus, 0)
for _, pod := range providers.TapStatus.Pods {
isTapped := strings.ToLower(providers.TappersStatus[pod.NodeName].Status) == "started"
tappedPodsStatus = append(tappedPodsStatus, shared.TappedPodStatus{Name: pod.Name, Namespace: pod.Namespace, IsTapped: isTapped})
}
tappedPodsStatus := utils.GetTappedPodsStatus()
serializedTapStatus, err := json.Marshal(shared.CreateWebSocketStatusMessage(tappedPodsStatus))
if err != nil {

View File

@@ -3,18 +3,17 @@ package controllers
import (
"encoding/json"
"fmt"
"github.com/gin-gonic/gin"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
"mizuserver/pkg/api"
"mizuserver/pkg/config"
"mizuserver/pkg/holder"
"mizuserver/pkg/providers"
"mizuserver/pkg/up9"
"mizuserver/pkg/utils"
"mizuserver/pkg/validation"
"net/http"
"strings"
"github.com/gin-gonic/gin"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
)
func HealthCheck(c *gin.Context) {
@@ -54,11 +53,7 @@ func PostTappedPods(c *gin.Context) {
}
func broadcastTappedPodsStatus() {
tappedPodsStatus := make([]shared.TappedPodStatus, 0)
for _, pod := range providers.TapStatus.Pods {
isTapped := strings.ToLower(providers.TappersStatus[pod.NodeName].Status) == "started"
tappedPodsStatus = append(tappedPodsStatus, shared.TappedPodStatus{Name: pod.Name, Namespace: pod.Namespace, IsTapped: isTapped})
}
tappedPodsStatus := utils.GetTappedPodsStatus()
message := shared.CreateWebSocketStatusMessage(tappedPodsStatus)
if jsonBytes, err := json.Marshal(message); err != nil {
@@ -101,11 +96,7 @@ func GetAuthStatus(c *gin.Context) {
}
func GetTappingStatus(c *gin.Context) {
tappedPodsStatus := make([]shared.TappedPodStatus, 0)
for _, pod := range providers.TapStatus.Pods {
isTapped := strings.ToLower(providers.TappersStatus[pod.NodeName].Status) == "started"
tappedPodsStatus = append(tappedPodsStatus, shared.TappedPodStatus{Name: pod.Name, Namespace: pod.Namespace, IsTapped: isTapped})
}
tappedPodsStatus := utils.GetTappedPodsStatus()
c.JSON(http.StatusOK, tappedPodsStatus)
}

View File

@@ -3,11 +3,12 @@ package utils
import (
"context"
"fmt"
"mizuserver/pkg/providers"
"net/http"
"net/url"
"os"
"os/signal"
"reflect"
"strings"
"syscall"
"time"
@@ -44,15 +45,13 @@ func StartServer(app *gin.Engine) {
}
}
func ReverseSlice(data interface{}) {
value := reflect.ValueOf(data)
valueLen := value.Len()
for i := 0; i <= int((valueLen-1)/2); i++ {
reverseIndex := valueLen - 1 - i
tmp := value.Index(reverseIndex).Interface()
value.Index(reverseIndex).Set(value.Index(i))
value.Index(i).Set(reflect.ValueOf(tmp))
func GetTappedPodsStatus() []shared.TappedPodStatus {
tappedPodsStatus := make([]shared.TappedPodStatus, 0)
for _, pod := range providers.TapStatus.Pods {
isTapped := strings.ToLower(providers.TappersStatus[pod.NodeName].Status) == "started"
tappedPodsStatus = append(tappedPodsStatus, shared.TappedPodStatus{Name: pod.Name, Namespace: pod.Namespace, IsTapped: isTapped})
}
return tappedPodsStatus
}
func CheckErr(e error) {

View File

@@ -0,0 +1,38 @@
package source
import (
"io/ioutil"
"regexp"
"strings"
"github.com/up9inc/mizu/shared/logger"
)
var numberRegex = regexp.MustCompile("[0-9]+")
func getSingleValueFromEnvironmentVariableFile(filePath string, variableName string) (string, error) {
bytes, err := ioutil.ReadFile(filePath)
if err != nil {
logger.Log.Warningf("Error reading environment file %v - %v", filePath, err)
return "", err
}
envs := strings.Split(string(bytes), string([]byte{0}))
for _, env := range envs {
if !strings.Contains(env, "=") {
continue
}
parts := strings.Split(env, "=")
varName := parts[0]
value := parts[1]
if variableName == varName {
return value, nil
}
}
return "", nil
}

View File

@@ -4,7 +4,6 @@ import (
"fmt"
"io/ioutil"
"os"
"regexp"
"strings"
"github.com/up9inc/mizu/shared/logger"
@@ -13,8 +12,6 @@ import (
const envoyBinary = "/envoy"
var numberRegex = regexp.MustCompile("[0-9]+")
func discoverRelevantEnvoyPids(procfs string, pods []v1.Pod) ([]string, error) {
result := make([]string, 0)
@@ -36,7 +33,7 @@ func discoverRelevantEnvoyPids(procfs string, pods []v1.Pod) ([]string, error) {
continue
}
if checkPid(procfs, pid.Name(), pods) {
if checkEnvoyPid(procfs, pid.Name(), pods) {
result = append(result, pid.Name())
}
}
@@ -46,7 +43,7 @@ func discoverRelevantEnvoyPids(procfs string, pods []v1.Pod) ([]string, error) {
return result, nil
}
func checkPid(procfs string, pid string, pods []v1.Pod) bool {
func checkEnvoyPid(procfs string, pid string, pods []v1.Pod) bool {
execLink := fmt.Sprintf("%v/%v/exe", procfs, pid)
exec, err := os.Readlink(execLink)
@@ -63,7 +60,7 @@ func checkPid(procfs string, pid string, pods []v1.Pod) bool {
}
environmentFile := fmt.Sprintf("%v/%v/environ", procfs, pid)
podIp, err := readEnvironmentVariable(environmentFile, "INSTANCE_IP")
podIp, err := getSingleValueFromEnvironmentVariableFile(environmentFile, "INSTANCE_IP")
if err != nil {
return false
@@ -84,30 +81,3 @@ func checkPid(procfs string, pid string, pods []v1.Pod) bool {
return false
}
func readEnvironmentVariable(file string, name string) (string, error) {
bytes, err := ioutil.ReadFile(file)
if err != nil {
logger.Log.Warningf("Error reading environment file %v - %v", file, err)
return "", err
}
envs := strings.Split(string(bytes), string([]byte{0}))
for _, env := range envs {
if !strings.Contains(env, "=") {
continue
}
parts := strings.Split(env, "=")
varName := parts[0]
value := parts[1]
if name == varName {
return value, nil
}
}
return "", nil
}

View File

@@ -0,0 +1,83 @@
package source
import (
"fmt"
"io/ioutil"
"os"
"strings"
"github.com/up9inc/mizu/shared/logger"
v1 "k8s.io/api/core/v1"
)
const linkerdBinary = "/linkerd2-proxy"
func discoverRelevantLinkerdPids(procfs string, pods []v1.Pod) ([]string, error) {
result := make([]string, 0)
pids, err := ioutil.ReadDir(procfs)
if err != nil {
return result, err
}
logger.Log.Infof("Starting linkerd auto discoverer %v %v - scanning %v potential pids",
procfs, pods, len(pids))
for _, pid := range pids {
if !pid.IsDir() {
continue
}
if !numberRegex.MatchString(pid.Name()) {
continue
}
if checkLinkerdPid(procfs, pid.Name(), pods) {
result = append(result, pid.Name())
}
}
logger.Log.Infof("Found %v relevant linkerd processes - %v", len(result), result)
return result, nil
}
func checkLinkerdPid(procfs string, pid string, pods []v1.Pod) bool {
execLink := fmt.Sprintf("%v/%v/exe", procfs, pid)
exec, err := os.Readlink(execLink)
if err != nil {
// Debug on purpose - it may happen due to many reasons and we only care
// for it during troubleshooting
//
logger.Log.Debugf("Unable to read link %v - %v\n", execLink, err)
return false
}
if !strings.HasSuffix(exec, linkerdBinary) {
return false
}
environmentFile := fmt.Sprintf("%v/%v/environ", procfs, pid)
podName, err := getSingleValueFromEnvironmentVariableFile(environmentFile, "_pod_name")
if err != nil {
return false
}
if podName == "" {
logger.Log.Debugf("Found a linkerd process without _pod_name variable %v\n", pid)
return false
}
logger.Log.Infof("Found linkerd pid %v with pod name %v", pid, podName)
for _, pod := range pods {
if pod.Name == podName {
return true
}
}
return false
}

View File

@@ -16,7 +16,7 @@ type PacketSourceManager struct {
}
func NewPacketSourceManager(procfs string, pids string, filename string, interfaceName string,
istio bool, pods []v1.Pod, behaviour TcpPacketSourceBehaviour) (*PacketSourceManager, error) {
mtls bool, pods []v1.Pod, behaviour TcpPacketSourceBehaviour) (*PacketSourceManager, error) {
sources := make([]*tcpPacketSource, 0)
sources, err := createHostSource(sources, filename, interfaceName, behaviour)
@@ -25,7 +25,8 @@ func NewPacketSourceManager(procfs string, pids string, filename string, interfa
}
sources = createSourcesFromPids(sources, procfs, pids, interfaceName, behaviour)
sources = createSourcesFromEnvoy(sources, istio, procfs, pods, interfaceName, behaviour)
sources = createSourcesFromEnvoy(sources, mtls, procfs, pods, interfaceName, behaviour)
sources = createSourcesFromLinkerd(sources, mtls, procfs, pods, interfaceName, behaviour)
return &PacketSourceManager{
sources: sources,
@@ -54,13 +55,13 @@ func createSourcesFromPids(sources []*tcpPacketSource, procfs string, pids strin
return sources
}
func createSourcesFromEnvoy(sources []*tcpPacketSource, istio bool, procfs string, clusterIps []v1.Pod,
func createSourcesFromEnvoy(sources []*tcpPacketSource, mtls bool, procfs string, pods []v1.Pod,
interfaceName string, behaviour TcpPacketSourceBehaviour) []*tcpPacketSource {
if !istio {
if !mtls {
return sources
}
envoyPids, err := discoverRelevantEnvoyPids(procfs, clusterIps)
envoyPids, err := discoverRelevantEnvoyPids(procfs, pods)
if err != nil {
logger.Log.Warningf("Unable to discover envoy pids - %v", err)
@@ -73,6 +74,25 @@ func createSourcesFromEnvoy(sources []*tcpPacketSource, istio bool, procfs strin
return sources
}
func createSourcesFromLinkerd(sources []*tcpPacketSource, mtls bool, procfs string, pods []v1.Pod,
interfaceName string, behaviour TcpPacketSourceBehaviour) []*tcpPacketSource {
if !mtls {
return sources
}
linkerdPids, err := discoverRelevantLinkerdPids(procfs, pods)
if err != nil {
logger.Log.Warningf("Unable to discover linkerd pids - %v", err)
return sources
}
netnsSources := newNetnsPacketSources(procfs, linkerdPids, interfaceName, behaviour)
sources = append(sources, netnsSources...)
return sources
}
func newHostPacketSource(filename string, interfaceName string,
behaviour TcpPacketSourceBehaviour) (*tcpPacketSource, error) {
var name string

View File

@@ -92,3 +92,13 @@
.ip
margin-left: 5px
@media (max-width: 1760px)
.timestamp
display: none
.separatorRight
border-right: 0px
@media (max-width: 1340px)
.separatorRight
display: none

View File

@@ -91,7 +91,7 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus, onTLS
useEffect(() => {
handleQueryChange(query);
}, [query]);
}, [query, handleQueryChange]);
useEffect(() => {
if (query) {