kubeshark/agent/pkg/api/main.go
Gustavo Massaneiro d5fd2ff1da
Service Map (#623)
* debug builds and gcflags

* update dockerfile for debug

* service map routes and controller

* service map graph structure

* service map interface and new methods

* adding service map edges from mizu entries

* new service map count methods

* implementing the status endpoint

* ServiceMapResponse and ServiceMapEdge models

* service map get endpoint logic

* reset logic and endpoint

* fixed service map get status

* improvements to graph node structure

* front-end implementation and service map buttons

* new render endpoint to render the graph in real time

* spinner sass

* new ServiceMapModal component

* testing react-force-graph-2d lib

* Improvements to service map graph structure, added node id and updated edge source/destination type

* Revert "testing react-force-graph-2d lib"

This reverts commit 1153938386.

* testing react-graph-vis lib

* updated to work with react-graph-vis lib

* removed render endpoint

* go mod tidy

* serviceMap config flag

* using the serviceMap config flag

* passing mizu config to service map as a dependency

* service map tests

* Removed print functions

* finished service map tests

* new service property

* service map controller tests

* moved service map reset button to service map modal
reset closes the modal

* service map modal refresh button and logic

* reset button resets data and refresh

* service map modal close button

* node size/edge size based on the count value
edge color based on protocol

* nodes and edges shadow

* enabled physics to avoid node overlap, changed kafka protocol color to dark green

* showing edges count values and fixed bidirectional edges overlap

* go mod tidy

* removed console.log

* Using the destination node protocol instead of the source node protocol

* Revert "debug builds and gcflags"
Addressed by #624 and #626

This reverts commit 17ecaece3e.

* Revert "update dockerfile for debug"
Addressed by #635

This reverts commit 5dfc15b140.

* using the entire tap Protocol struct instead of only the protocol name

* using the backend protocol background color for node colors

* fixed test, the node list order can change

* re-factoring to get 100% coverage

* using protocol colors just for edges

* re-factored service map to use TCP Entry data. Node key is the entry ip-address instead of the name

* fallback to ip-address when entry name is unresolved

* re-factored front-end

* adjustment to main div style

* added support for multiple protocols for the same edge

* using the item protocol instead of the extension variable

* fixed controller tests

* displaying service name and ip-address on graph nodes

* fixed service map test, we cannot guarantee the slice order

* auth middleware

* created a new pkg for the service map

* re-factoring

* re-factored front-end

* reverting the import order as previous

* Aligning with other UI feature flags handling

* we don't need to get the status anymore, we have window["isServiceMapEnabled"]

* small adjustments

* renamed from .tsx to .ts

* button styles and minor improvements

* moved service map modal from trafficPage to app component

Co-authored-by: Igor Gov <igor.govorov1@gmail.com>
2022-01-19 15:27:12 -03:00

184 lines
4.9 KiB
Go

package api
import (
"bufio"
"context"
"encoding/json"
"fmt"
"mizuserver/pkg/holder"
"mizuserver/pkg/providers"
"os"
"path"
"sort"
"strings"
"time"
"mizuserver/pkg/servicemap"
"github.com/google/martian/har"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
tapApi "github.com/up9inc/mizu/tap/api"
"mizuserver/pkg/models"
"mizuserver/pkg/oas"
"mizuserver/pkg/resolver"
"mizuserver/pkg/utils"
basenine "github.com/up9inc/basenine/client/go"
)
var k8sResolver *resolver.Resolver
func StartResolving(namespace string) {
errOut := make(chan error, 100)
res, err := resolver.NewFromInCluster(errOut, namespace)
if err != nil {
logger.Log.Infof("error creating k8s resolver %s", err)
return
}
ctx := context.Background()
res.Start(ctx)
go func() {
for {
select {
case err := <-errOut:
logger.Log.Infof("name resolving error %s", err)
}
}
}()
k8sResolver = res
holder.SetResolver(res)
}
func StartReadingEntries(harChannel <-chan *tapApi.OutputChannelItem, workingDir *string, extensionsMap map[string]*tapApi.Extension) {
if workingDir != nil && *workingDir != "" {
startReadingFiles(*workingDir)
} else {
startReadingChannel(harChannel, extensionsMap)
}
}
func startReadingFiles(workingDir string) {
if err := os.MkdirAll(workingDir, os.ModePerm); err != nil {
logger.Log.Errorf("Failed to make dir: %s, err: %v", workingDir, err)
return
}
for true {
dir, _ := os.Open(workingDir)
dirFiles, _ := dir.Readdir(-1)
var harFiles []os.FileInfo
for _, fileInfo := range dirFiles {
if strings.HasSuffix(fileInfo.Name(), ".har") {
harFiles = append(harFiles, fileInfo)
}
}
sort.Sort(utils.ByModTime(harFiles))
if len(harFiles) == 0 {
logger.Log.Infof("Waiting for new files")
time.Sleep(3 * time.Second)
continue
}
fileInfo := harFiles[0]
inputFilePath := path.Join(workingDir, fileInfo.Name())
file, err := os.Open(inputFilePath)
utils.CheckErr(err)
var inputHar har.HAR
decErr := json.NewDecoder(bufio.NewReader(file)).Decode(&inputHar)
utils.CheckErr(decErr)
rmErr := os.Remove(inputFilePath)
utils.CheckErr(rmErr)
}
}
func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extensionsMap map[string]*tapApi.Extension) {
if outputItems == nil {
panic("Channel of captured messages is nil")
}
connection, err := basenine.NewConnection(shared.BasenineHost, shared.BaseninePort)
if err != nil {
panic(err)
}
connection.InsertMode()
disableOASValidation := false
ctx := context.Background()
doc, contractContent, router, err := loadOAS(ctx)
if err != nil {
logger.Log.Infof("Disabled OAS validation: %s", err.Error())
disableOASValidation = true
}
for item := range outputItems {
providers.EntryAdded()
extension := extensionsMap[item.Protocol.Name]
resolvedSource, resolvedDestionation := resolveIP(item.ConnectionInfo)
mizuEntry := extension.Dissector.Analyze(item, resolvedSource, resolvedDestionation)
if extension.Protocol.Name == "http" {
if !disableOASValidation {
var httpPair tapApi.HTTPRequestResponsePair
json.Unmarshal([]byte(mizuEntry.HTTPPair), &httpPair)
contract := handleOAS(ctx, doc, router, httpPair.Request.Payload.RawRequest, httpPair.Response.Payload.RawResponse, contractContent)
mizuEntry.ContractStatus = contract.Status
mizuEntry.ContractRequestReason = contract.RequestReason
mizuEntry.ContractResponseReason = contract.ResponseReason
mizuEntry.ContractContent = contract.Content
}
harEntry, err := utils.NewEntry(mizuEntry.Request, mizuEntry.Response, mizuEntry.StartTime, mizuEntry.ElapsedTime)
if err == nil {
rules, _, _ := models.RunValidationRulesState(*harEntry, mizuEntry.Destination.Name)
mizuEntry.Rules = rules
}
oas.GetOasGeneratorInstance().PushEntry(harEntry)
}
data, err := json.Marshal(mizuEntry)
if err != nil {
panic(err)
}
connection.SendText(string(data))
servicemap.GetInstance().NewTCPEntry(mizuEntry.Source, mizuEntry.Destination, &item.Protocol)
}
}
func resolveIP(connectionInfo *tapApi.ConnectionInfo) (resolvedSource string, resolvedDestination string) {
if k8sResolver != nil {
unresolvedSource := connectionInfo.ClientIP
resolvedSource = k8sResolver.Resolve(unresolvedSource)
if resolvedSource == "" {
logger.Log.Debugf("Cannot find resolved name to source: %s", unresolvedSource)
if os.Getenv("SKIP_NOT_RESOLVED_SOURCE") == "1" {
return
}
}
unresolvedDestination := fmt.Sprintf("%s:%s", connectionInfo.ServerIP, connectionInfo.ServerPort)
resolvedDestination = k8sResolver.Resolve(unresolvedDestination)
if resolvedDestination == "" {
logger.Log.Debugf("Cannot find resolved name to dest: %s", unresolvedDestination)
if os.Getenv("SKIP_NOT_RESOLVED_DEST") == "1" {
return
}
}
}
return resolvedSource, resolvedDestination
}
func CheckIsServiceIP(address string) bool {
if k8sResolver == nil {
return false
}
return k8sResolver.CheckIsServiceIP(address)
}