Compare commits

..

5 Commits

Author SHA1 Message Date
Igor Gov
7f837fe947 Improving logs and cleaning un-referenced code (#295) 2021-09-22 11:14:20 +03:00
RoyUP9
02bd7883cb fixed general stats (#293) 2021-09-22 10:52:53 +03:00
RoyUP9
1841798646 Fix: analysis feature (#292) 2021-09-22 09:44:02 +03:00
Igor Gov
749bee6d55 Using rlog in agent and tapper & removing unreferenced code (#289)
* .
2021-09-22 06:24:19 +03:00
M. Mert Yıldıran
043b845c06 Fix some errors related to conversion to HAR (#286)
* Fix some errors related to conversion to HAR

* Fix the build error

* Fix the variable name

* Change to `Errorf`
2021-09-20 13:53:20 +03:00
8 changed files with 43 additions and 126 deletions

View File

@@ -61,10 +61,10 @@ func main() {
go filterItems(outputItemsChannel, filteredOutputItemsChannel, filteringOptions)
go api.StartReadingEntries(filteredOutputItemsChannel, nil, extensionsMap)
// go api.StartReadingOutbound(outboundLinkOutputChannel)
hostApi(nil)
} else if *tapperMode {
rlog.Infof("Starting tapper, websocket address: %s", *apiServerAddress)
if *apiServerAddress == "" {
panic("API server address must be provided with --api-server-address when using --tap")
}
@@ -77,13 +77,13 @@ func main() {
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
tap.StartPassiveTapper(tapOpts, filteredOutputItemsChannel, extensions, filteringOptions)
socketConnection, err := shared.ConnectToSocketServer(*apiServerAddress, shared.DEFAULT_SOCKET_RETRIES, shared.DEFAULT_SOCKET_RETRY_SLEEP_TIME, false)
socketConnection, err := utils.ConnectToSocketServer(*apiServerAddress)
if err != nil {
panic(fmt.Sprintf("Error connecting to socket server at %s %v", *apiServerAddress, err))
}
rlog.Infof("Connected successfully to websocket %s", *apiServerAddress)
go pipeTapChannelToSocket(socketConnection, filteredOutputItemsChannel)
// go pipeOutboundLinksChannelToSocket(socketConnection, outboundLinkOutputChannel)
} else if *apiServerMode {
api.StartResolving(*namespace)
@@ -122,7 +122,7 @@ func loadExtensions() {
extensionsMap = make(map[string]*tapApi.Extension)
for i, file := range files {
filename := file.Name()
log.Printf("Loading extension: %s\n", filename)
rlog.Infof("Loading extension: %s\n", filename)
extension := &tapApi.Extension{
Path: path.Join(extensionsDir, filename),
}
@@ -290,7 +290,7 @@ func pipeTapChannelToSocket(connection *websocket.Conn, messageDataChannel <-cha
for messageData := range messageDataChannel {
marshaledData, err := models.CreateWebsocketTappedEntryMessage(messageData)
if err != nil {
rlog.Infof("error converting message to json %s, (%v,%+v)\n", err, err, err)
rlog.Errorf("error converting message to json %v, err: %s, (%v,%+v)", messageData, err, err, err)
continue
}
@@ -298,26 +298,8 @@ func pipeTapChannelToSocket(connection *websocket.Conn, messageDataChannel <-cha
// and goes into the intermediate WebSocket.
err = connection.WriteMessage(websocket.TextMessage, marshaledData)
if err != nil {
rlog.Infof("error sending message through socket server %s, (%v,%+v)\n", err, err, err)
rlog.Errorf("error sending message through socket server %v, err: %s, (%v,%+v)", messageData, err, err, err)
continue
}
}
}
func pipeOutboundLinksChannelToSocket(connection *websocket.Conn, outboundLinkChannel <-chan *tap.OutboundLink) {
for outboundLink := range outboundLinkChannel {
if outboundLink.SuggestedProtocol == tap.TLSProtocol {
marshaledData, err := models.CreateWebsocketOutboundLinkMessage(outboundLink)
if err != nil {
rlog.Infof("Error converting outbound link to json %s, (%v,%+v)", err, err, err)
continue
}
err = connection.WriteMessage(websocket.TextMessage, marshaledData)
if err != nil {
rlog.Infof("error sending outbound link message through socket server %s, (%v,%+v)", err, err, err)
continue
}
}
}
}

View File

@@ -7,7 +7,7 @@ import (
"fmt"
"mizuserver/pkg/database"
"mizuserver/pkg/holder"
"net/url"
"mizuserver/pkg/providers"
"os"
"path"
"sort"
@@ -18,7 +18,6 @@ import (
"github.com/google/martian/har"
"github.com/romana/rlog"
"github.com/up9inc/mizu/tap"
tapApi "github.com/up9inc/mizu/tap/api"
"mizuserver/pkg/models"
@@ -59,8 +58,10 @@ func StartReadingEntries(harChannel <-chan *tapApi.OutputChannelItem, workingDir
}
func startReadingFiles(workingDir string) {
err := os.MkdirAll(workingDir, os.ModePerm)
utils.CheckErr(err)
if err := os.MkdirAll(workingDir, os.ModePerm); err != nil {
rlog.Errorf("Failed to make dir: %s, err: %v", workingDir, err)
return
}
for true {
dir, _ := os.Open(workingDir)
@@ -88,17 +89,6 @@ func startReadingFiles(workingDir string) {
decErr := json.NewDecoder(bufio.NewReader(file)).Decode(&inputHar)
utils.CheckErr(decErr)
// for _, entry := range inputHar.Log.Entries {
// time.Sleep(time.Millisecond * 250)
// // connectionInfo := &tap.ConnectionInfo{
// // ClientIP: fileInfo.Name(),
// // ClientPort: "",
// // ServerIP: "",
// // ServerPort: "",
// // IsOutgoing: false,
// // }
// // saveHarToDb(entry, connectionInfo)
// }
rmErr := os.Remove(inputFilePath)
utils.CheckErr(rmErr)
}
@@ -110,6 +100,8 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
}
for item := range outputItems {
providers.EntryAdded()
extension := extensionsMap[item.Protocol.Name]
resolvedSource, resolvedDestionation := resolveIP(item.ConnectionInfo)
mizuEntry := extension.Dissector.Analyze(item, primitive.NewObjectID().Hex(), resolvedSource, resolvedDestionation)
@@ -119,10 +111,12 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
if extension.Protocol.Name == "http" {
var pair tapApi.RequestResponsePair
json.Unmarshal([]byte(mizuEntry.Entry), &pair)
harEntry, _ := utils.NewEntry(&pair)
rules, _ := models.RunValidationRulesState(*harEntry, mizuEntry.Service)
baseEntry.Rules = rules
baseEntry.Latency = mizuEntry.ElapsedTime
harEntry, err := utils.NewEntry(&pair)
if err == nil {
rules, _ := models.RunValidationRulesState(*harEntry, mizuEntry.Service)
baseEntry.Rules = rules
baseEntry.Latency = mizuEntry.ElapsedTime
}
}
baseEntryBytes, _ := models.CreateBaseEntryWebSocketMessage(baseEntry)
@@ -130,13 +124,6 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
}
}
func StartReadingOutbound(outboundLinkChannel <-chan *tap.OutboundLink) {
// tcpStreamFactory will block on write to channel. Empty channel to unblock.
// TODO: Make write to channel optional.
for range outboundLinkChannel {
}
}
func resolveIP(connectionInfo *tapApi.ConnectionInfo) (resolvedSource string, resolvedDestination string) {
if k8sResolver != nil {
unresolvedSource := connectionInfo.ClientIP
@@ -159,12 +146,6 @@ func resolveIP(connectionInfo *tapApi.ConnectionInfo) (resolvedSource string, re
return resolvedSource, resolvedDestination
}
func getServiceNameFromUrl(inputUrl string) (string, string) {
parsed, err := url.Parse(inputUrl)
utils.CheckErr(err)
return fmt.Sprintf("%s://%s", parsed.Scheme, parsed.Host), parsed.Path
}
func CheckIsServiceIP(address string) bool {
if k8sResolver == nil {
return false

View File

@@ -61,7 +61,7 @@ func GetEntriesFromDb(timestampFrom int64, timestampTo int64, protocolName *stri
order := OrderDesc
protocolNameCondition := "1 = 1"
if protocolName != nil {
protocolNameCondition = fmt.Sprintf("protocolKey = '%s'", *protocolName)
protocolNameCondition = fmt.Sprintf("protocolName = '%s'", *protocolName)
}
var entries []tapApi.MizuEntry

View File

@@ -4,6 +4,7 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"github.com/romana/rlog"
"reflect"
"regexp"
"strings"
@@ -65,7 +66,7 @@ func MatchRequestPolicy(harEntry har.Entry, service string) []RulesMatched {
if err != nil {
continue
}
fmt.Println(matchValue, rule.Value)
rlog.Info(matchValue, rule.Value)
} else {
val := fmt.Sprint(out)
matchValue, err = regexp.MatchString(rule.Value, val)

View File

@@ -4,16 +4,14 @@ import (
"bytes"
"errors"
"fmt"
"github.com/google/martian/har"
"github.com/romana/rlog"
"github.com/up9inc/mizu/tap/api"
"strconv"
"strings"
"time"
"github.com/google/martian/har"
"github.com/up9inc/mizu/tap"
"github.com/up9inc/mizu/tap/api"
)
// Keep it because we might want cookies in the future
//func BuildCookies(rawCookies []interface{}) []har.Cookie {
// cookies := make([]har.Cookie, 0, len(rawCookies))
@@ -82,7 +80,7 @@ func BuildHeaders(rawHeaders []interface{}) ([]har.Header, string, string, strin
path = h["value"].(string)
}
if h["name"] == ":status" {
path = h["value"].(string)
status = h["value"].(string)
}
}
@@ -205,7 +203,7 @@ func NewResponse(response *api.GenericMessage) (harResponse *har.Response, err e
if strings.HasPrefix(mimeType.(string), "application/grpc") {
status, err = strconv.Atoi(_status)
if err != nil {
tap.SilentError("convert-response-status-for-har", "Failed converting status to int %s (%v,%+v)", err, err, err)
rlog.Errorf("Failed converting status to int %s (%v,%+v)", err, err, err)
return nil, errors.New("failed converting response status to int for HAR")
}
}
@@ -226,14 +224,13 @@ func NewResponse(response *api.GenericMessage) (harResponse *har.Response, err e
func NewEntry(pair *api.RequestResponsePair) (*har.Entry, error) {
harRequest, err := NewRequest(&pair.Request)
if err != nil {
tap.SilentError("convert-request-to-har", "Failed converting request to HAR %s (%v,%+v)", err, err, err)
rlog.Errorf("Failed converting request to HAR %s (%v,%+v)", err, err, err)
return nil, errors.New("failed converting request to HAR")
}
harResponse, err := NewResponse(&pair.Response)
if err != nil {
fmt.Printf("err: %+v\n", err)
tap.SilentError("convert-response-to-har", "Failed converting response to HAR %s (%v,%+v)", err, err, err)
rlog.Errorf("Failed converting response to HAR %s (%v,%+v)", err, err, err)
return nil, errors.New("failed converting response to HAR")
}

View File

@@ -1,8 +1,8 @@
package shared
package utils
import (
"fmt"
"github.com/gorilla/websocket"
"github.com/romana/rlog"
"time"
)
@@ -11,24 +11,23 @@ const (
DEFAULT_SOCKET_RETRY_SLEEP_TIME = time.Second * 10
)
func ConnectToSocketServer(address string, retries int, retrySleepTime time.Duration, hideTimeoutErrors bool) (*websocket.Conn, error) {
func ConnectToSocketServer(address string) (*websocket.Conn, error) {
var err error
var connection *websocket.Conn
try := 0
// Connection to server fails if client pod is up before server.
// Retries solve this issue.
for try < retries {
for try < DEFAULT_SOCKET_RETRIES {
rlog.Infof("Trying to connect to websocket: %s, attempt: %v/%v", address, try, DEFAULT_SOCKET_RETRIES)
connection, _, err = websocket.DefaultDialer.Dial(address, nil)
if err != nil {
rlog.Warnf("Failed connecting to websocket: %s, attempt: %v/%v, err: %s, (%v,%+v)", address, try, DEFAULT_SOCKET_RETRIES, err, err, err)
try++
if !hideTimeoutErrors {
fmt.Printf("Failed connecting to websocket server: %s, (%v,%+v)\n", err, err, err)
}
} else {
break
}
time.Sleep(retrySleepTime)
time.Sleep(DEFAULT_SOCKET_RETRY_SLEEP_TIME)
}
if err != nil {

View File

@@ -4,7 +4,6 @@ import (
"context"
"github.com/gin-gonic/gin"
"github.com/romana/rlog"
"log"
"net/http"
"net/url"
"os"
@@ -18,8 +17,8 @@ import (
func StartServer(app *gin.Engine) {
signals := make(chan os.Signal, 2)
signal.Notify(signals,
os.Interrupt, // this catch ctrl + c
syscall.SIGTSTP, // this catch ctrl + z
os.Interrupt, // this catch ctrl + c
syscall.SIGTSTP, // this catch ctrl + z
)
srv := &http.Server{
@@ -36,8 +35,9 @@ func StartServer(app *gin.Engine) {
}()
// Run server.
rlog.Infof("Starting the server...")
if err := app.Run(":8899"); err != nil {
log.Printf("Oops... Server is not running! Reason: %v", err)
rlog.Errorf("Server is not running! Reason: %v", err)
}
}
@@ -54,15 +54,14 @@ func ReverseSlice(data interface{}) {
func CheckErr(e error) {
if e != nil {
log.Printf("%v", e)
//panic(e)
rlog.Errorf("%v", e)
}
}
func SetHostname(address, newHostname string) string {
replacedUrl, err := url.Parse(address)
if err != nil{
log.Printf("error replacing hostname to %s in address %s, returning original %v",newHostname, address, err)
if err != nil {
rlog.Errorf("error replacing hostname to %s in address %s, returning original %v", newHostname, address, err)
return address
}
replacedUrl.Host = newHostname

View File

@@ -1,42 +0,0 @@
package mizu
import (
"encoding/json"
"github.com/gorilla/websocket"
"github.com/up9inc/mizu/shared"
core "k8s.io/api/core/v1"
"time"
)
type ControlSocket struct {
connection *websocket.Conn
}
func CreateControlSocket(socketServerAddress string) (*ControlSocket, error) {
connection, err := shared.ConnectToSocketServer(socketServerAddress, 30, 2 * time.Second, true)
if err != nil {
return nil, err
} else {
return &ControlSocket{connection: connection}, nil
}
}
func (controlSocket *ControlSocket) SendNewTappedPodsListMessage(pods []core.Pod) error {
podInfos := make([]shared.PodInfo, 0)
for _, pod := range pods {
podInfos = append(podInfos, shared.PodInfo{Name: pod.Name, Namespace: pod.Namespace})
}
tapStatus := shared.TapStatus{Pods: podInfos}
socketMessage := shared.CreateWebSocketStatusMessage(tapStatus)
jsonMessage, err := json.Marshal(socketMessage)
if err != nil {
return err
}
err = controlSocket.connection.WriteMessage(websocket.TextMessage, jsonMessage)
if err != nil {
return err
}
return nil
}