mirror of
https://github.com/kubeshark/kubeshark.git
synced 2026-03-02 18:59:27 +00:00
Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3aafbd7e1c | ||
|
|
58e9363fda | ||
|
|
6a85ab53eb | ||
|
|
212e4687d8 | ||
|
|
167b17dfd2 | ||
|
|
9d179c7227 | ||
|
|
147e812edb | ||
|
|
91196bb306 | ||
|
|
26834a6e04 |
@@ -2,4 +2,9 @@ module github.com/up9inc/mizu/tests
|
||||
|
||||
go 1.16
|
||||
|
||||
require gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
|
||||
require (
|
||||
github.com/up9inc/mizu/shared v0.0.0
|
||||
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
|
||||
)
|
||||
|
||||
replace github.com/up9inc/mizu/shared v0.0.0 => ../shared
|
||||
|
||||
@@ -1,3 +1,7 @@
|
||||
github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
|
||||
github.com/golang-jwt/jwt/v4 v4.1.0 h1:XUgk2Ex5veyVFVeLm0xhusUTQybEbexJXrvPNOKkSY0=
|
||||
github.com/golang-jwt/jwt/v4 v4.1.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg=
|
||||
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
|
||||
|
||||
@@ -12,12 +12,14 @@ import (
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/up9inc/mizu/shared"
|
||||
)
|
||||
|
||||
const (
|
||||
longRetriesCount = 100
|
||||
shortRetriesCount = 10
|
||||
defaultApiServerPort = 8899
|
||||
defaultApiServerPort = shared.DefaultApiServerPort
|
||||
defaultNamespaceName = "mizu-tests"
|
||||
defaultServiceName = "httpbin"
|
||||
defaultEntriesCount = 50
|
||||
|
||||
@@ -12,7 +12,8 @@ Basic APIs:
|
||||
`docker build . -t gcr.io/up9-docker-hub/mizu/debug:latest -f debug.Dockerfile && docker push gcr.io/up9-docker-hub/mizu/debug:latest`
|
||||
|
||||
### Connecting
|
||||
1. Start mizu using the cli with the debug image `mizu tap --mizu-image gcr.io/up9-docker-hub/mizu/debug:latest {tapped_pod_name}`
|
||||
1. Start mizu using the cli with the debug
|
||||
image `mizu tap --set agent-image=gcr.io/up9-docker-hub/mizu/debug:latest {tapped_pod_name}`
|
||||
2. Forward the debug port using `kubectl port-forward -n default mizu-api-server 2345:2345`
|
||||
3. Run the run/debug configuration you've created earlier in Intellij.
|
||||
|
||||
|
||||
@@ -4,15 +4,7 @@ import (
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"github.com/gin-contrib/static"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/romana/rlog"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
"github.com/up9inc/mizu/tap"
|
||||
tapApi "github.com/up9inc/mizu/tap/api"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"mizuserver/pkg/api"
|
||||
"mizuserver/pkg/controllers"
|
||||
"mizuserver/pkg/models"
|
||||
@@ -26,6 +18,14 @@ import (
|
||||
"path/filepath"
|
||||
"plugin"
|
||||
"sort"
|
||||
|
||||
"github.com/gin-contrib/static"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
"github.com/up9inc/mizu/tap"
|
||||
tapApi "github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
var tapperMode = flag.Bool("tap", false, "Run in tapper mode without API")
|
||||
@@ -40,6 +40,7 @@ var extensions []*tapApi.Extension // global
|
||||
var extensionsMap map[string]*tapApi.Extension // global
|
||||
|
||||
func main() {
|
||||
logger.InitLoggerStderrOnly()
|
||||
flag.Parse()
|
||||
loadExtensions()
|
||||
|
||||
@@ -63,7 +64,7 @@ func main() {
|
||||
|
||||
hostApi(nil)
|
||||
} else if *tapperMode {
|
||||
rlog.Infof("Starting tapper, websocket address: %s", *apiServerAddress)
|
||||
logger.Log.Infof("Starting tapper, websocket address: %s", *apiServerAddress)
|
||||
if *apiServerAddress == "" {
|
||||
panic("API server address must be provided with --api-server-address when using --tap")
|
||||
}
|
||||
@@ -71,7 +72,7 @@ func main() {
|
||||
tapTargets := getTapTargets()
|
||||
if tapTargets != nil {
|
||||
tap.SetFilterAuthorities(tapTargets)
|
||||
rlog.Infof("Filtering for the following authorities: %v", tap.GetFilterIPs())
|
||||
logger.Log.Infof("Filtering for the following authorities: %v", tap.GetFilterIPs())
|
||||
}
|
||||
|
||||
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
|
||||
@@ -84,7 +85,7 @@ func main() {
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("Error connecting to socket server at %s %v", *apiServerAddress, err))
|
||||
}
|
||||
rlog.Infof("Connected successfully to websocket %s", *apiServerAddress)
|
||||
logger.Log.Infof("Connected successfully to websocket %s", *apiServerAddress)
|
||||
|
||||
go pipeTapChannelToSocket(socketConnection, filteredOutputItemsChannel)
|
||||
} else if *apiServerMode {
|
||||
@@ -117,7 +118,7 @@ func main() {
|
||||
signal.Notify(signalChan, os.Interrupt)
|
||||
<-signalChan
|
||||
|
||||
rlog.Info("Exiting")
|
||||
logger.Log.Info("Exiting")
|
||||
}
|
||||
|
||||
func loadExtensions() {
|
||||
@@ -126,13 +127,13 @@ func loadExtensions() {
|
||||
|
||||
files, err := ioutil.ReadDir(extensionsDir)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
logger.Log.Fatal(err)
|
||||
}
|
||||
extensions = make([]*tapApi.Extension, len(files))
|
||||
extensionsMap = make(map[string]*tapApi.Extension)
|
||||
for i, file := range files {
|
||||
filename := file.Name()
|
||||
rlog.Infof("Loading extension: %s\n", filename)
|
||||
logger.Log.Infof("Loading extension: %s\n", filename)
|
||||
extension := &tapApi.Extension{
|
||||
Path: path.Join(extensionsDir, filename),
|
||||
}
|
||||
@@ -157,7 +158,7 @@ func loadExtensions() {
|
||||
})
|
||||
|
||||
for _, extension := range extensions {
|
||||
log.Printf("Extension Properties: %+v\n", extension)
|
||||
logger.Log.Infof("Extension Properties: %+v\n", extension)
|
||||
}
|
||||
|
||||
controllers.InitExtensionsMap(extensionsMap)
|
||||
@@ -274,7 +275,7 @@ func pipeTapChannelToSocket(connection *websocket.Conn, messageDataChannel <-cha
|
||||
for messageData := range messageDataChannel {
|
||||
marshaledData, err := models.CreateWebsocketTappedEntryMessage(messageData)
|
||||
if err != nil {
|
||||
rlog.Errorf("error converting message to json %v, err: %s, (%v,%+v)", messageData, err, err, err)
|
||||
logger.Log.Errorf("error converting message to json %v, err: %s, (%v,%+v)", messageData, err, err, err)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -282,7 +283,7 @@ func pipeTapChannelToSocket(connection *websocket.Conn, messageDataChannel <-cha
|
||||
// and goes into the intermediate WebSocket.
|
||||
err = connection.WriteMessage(websocket.TextMessage, marshaledData)
|
||||
if err != nil {
|
||||
rlog.Errorf("error sending message through socket server %v, err: %s, (%v,%+v)", messageData, err, err, err)
|
||||
logger.Log.Errorf("error sending message through socket server %v, err: %s, (%v,%+v)", messageData, err, err, err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,7 +17,7 @@ import (
|
||||
"go.mongodb.org/mongo-driver/bson/primitive"
|
||||
|
||||
"github.com/google/martian/har"
|
||||
"github.com/romana/rlog"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
tapApi "github.com/up9inc/mizu/tap/api"
|
||||
|
||||
"mizuserver/pkg/models"
|
||||
@@ -31,7 +31,7 @@ func StartResolving(namespace string) {
|
||||
errOut := make(chan error, 100)
|
||||
res, err := resolver.NewFromInCluster(errOut, namespace)
|
||||
if err != nil {
|
||||
rlog.Infof("error creating k8s resolver %s", err)
|
||||
logger.Log.Infof("error creating k8s resolver %s", err)
|
||||
return
|
||||
}
|
||||
ctx := context.Background()
|
||||
@@ -40,7 +40,7 @@ func StartResolving(namespace string) {
|
||||
for {
|
||||
select {
|
||||
case err := <-errOut:
|
||||
rlog.Infof("name resolving error %s", err)
|
||||
logger.Log.Infof("name resolving error %s", err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
@@ -59,7 +59,7 @@ func StartReadingEntries(harChannel <-chan *tapApi.OutputChannelItem, workingDir
|
||||
|
||||
func startReadingFiles(workingDir string) {
|
||||
if err := os.MkdirAll(workingDir, os.ModePerm); err != nil {
|
||||
rlog.Errorf("Failed to make dir: %s, err: %v", workingDir, err)
|
||||
logger.Log.Errorf("Failed to make dir: %s, err: %v", workingDir, err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -76,7 +76,7 @@ func startReadingFiles(workingDir string) {
|
||||
sort.Sort(utils.ByModTime(harFiles))
|
||||
|
||||
if len(harFiles) == 0 {
|
||||
rlog.Infof("Waiting for new files\n")
|
||||
logger.Log.Infof("Waiting for new files\n")
|
||||
time.Sleep(3 * time.Second)
|
||||
continue
|
||||
}
|
||||
@@ -128,7 +128,7 @@ func resolveIP(connectionInfo *tapApi.ConnectionInfo) (resolvedSource string, re
|
||||
unresolvedSource := connectionInfo.ClientIP
|
||||
resolvedSource = k8sResolver.Resolve(unresolvedSource)
|
||||
if resolvedSource == "" {
|
||||
rlog.Debugf("Cannot find resolved name to source: %s\n", unresolvedSource)
|
||||
logger.Log.Debugf("Cannot find resolved name to source: %s\n", unresolvedSource)
|
||||
if os.Getenv("SKIP_NOT_RESOLVED_SOURCE") == "1" {
|
||||
return
|
||||
}
|
||||
@@ -136,7 +136,7 @@ func resolveIP(connectionInfo *tapApi.ConnectionInfo) (resolvedSource string, re
|
||||
unresolvedDestination := fmt.Sprintf("%s:%s", connectionInfo.ServerIP, connectionInfo.ServerPort)
|
||||
resolvedDestination = k8sResolver.Resolve(unresolvedDestination)
|
||||
if resolvedDestination == "" {
|
||||
rlog.Debugf("Cannot find resolved name to dest: %s\n", unresolvedDestination)
|
||||
logger.Log.Debugf("Cannot find resolved name to dest: %s\n", unresolvedDestination)
|
||||
if os.Getenv("SKIP_NOT_RESOLVED_DEST") == "1" {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -2,13 +2,14 @@ package api
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/romana/rlog"
|
||||
"github.com/up9inc/mizu/shared/debounce"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/up9inc/mizu/shared/debounce"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
)
|
||||
|
||||
type EventHandlers interface {
|
||||
@@ -50,7 +51,7 @@ func WebSocketRoutes(app *gin.Engine, eventHandlers EventHandlers) {
|
||||
func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers EventHandlers, isTapper bool) {
|
||||
conn, err := websocketUpgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
rlog.Errorf("Failed to set websocket upgrade: %v", err)
|
||||
logger.Log.Errorf("Failed to set websocket upgrade: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -71,7 +72,7 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even
|
||||
for {
|
||||
_, msg, err := conn.ReadMessage()
|
||||
if err != nil {
|
||||
rlog.Errorf("Error reading message, socket id: %d, error: %v", socketId, err)
|
||||
logger.Log.Errorf("Error reading message, socket id: %d, error: %v", socketId, err)
|
||||
break
|
||||
}
|
||||
eventHandlers.WebSocketMessage(socketId, msg)
|
||||
@@ -81,7 +82,7 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even
|
||||
func socketCleanup(socketId int, socketConnection *SocketConnection) {
|
||||
err := socketConnection.connection.Close()
|
||||
if err != nil {
|
||||
rlog.Errorf("Error closing socket connection for socket id %d: %v\n", socketId, err)
|
||||
logger.Log.Errorf("Error closing socket connection for socket id %d: %v\n", socketId, err)
|
||||
}
|
||||
|
||||
websocketIdsLock.Lock()
|
||||
@@ -92,7 +93,7 @@ func socketCleanup(socketId int, socketConnection *SocketConnection) {
|
||||
}
|
||||
|
||||
var db = debounce.NewDebouncer(time.Second*5, func() {
|
||||
rlog.Error("Successfully sent to socket")
|
||||
logger.Log.Error("Successfully sent to socket")
|
||||
})
|
||||
|
||||
func SendToSocket(socketId int, message []byte) error {
|
||||
@@ -104,7 +105,7 @@ func SendToSocket(socketId int, message []byte) error {
|
||||
var sent = false
|
||||
time.AfterFunc(time.Second*5, func() {
|
||||
if !sent {
|
||||
rlog.Error("Socket timed out")
|
||||
logger.Log.Error("Socket timed out")
|
||||
socketCleanup(socketId, socketObj)
|
||||
}
|
||||
})
|
||||
|
||||
@@ -10,8 +10,8 @@ import (
|
||||
|
||||
tapApi "github.com/up9inc/mizu/tap/api"
|
||||
|
||||
"github.com/romana/rlog"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
)
|
||||
|
||||
var browserClientSocketUUIDs = make([]int, 0)
|
||||
@@ -28,10 +28,10 @@ func init() {
|
||||
|
||||
func (h *RoutesEventHandlers) WebSocketConnect(socketId int, isTapper bool) {
|
||||
if isTapper {
|
||||
rlog.Infof("Websocket event - Tapper connected, socket ID: %d", socketId)
|
||||
logger.Log.Infof("Websocket event - Tapper connected, socket ID: %d", socketId)
|
||||
providers.TapperAdded()
|
||||
} else {
|
||||
rlog.Infof("Websocket event - Browser socket connected, socket ID: %d", socketId)
|
||||
logger.Log.Infof("Websocket event - Browser socket connected, socket ID: %d", socketId)
|
||||
socketListLock.Lock()
|
||||
browserClientSocketUUIDs = append(browserClientSocketUUIDs, socketId)
|
||||
socketListLock.Unlock()
|
||||
@@ -40,10 +40,10 @@ func (h *RoutesEventHandlers) WebSocketConnect(socketId int, isTapper bool) {
|
||||
|
||||
func (h *RoutesEventHandlers) WebSocketDisconnect(socketId int, isTapper bool) {
|
||||
if isTapper {
|
||||
rlog.Infof("Websocket event - Tapper disconnected, socket ID: %d", socketId)
|
||||
logger.Log.Infof("Websocket event - Tapper disconnected, socket ID: %d", socketId)
|
||||
providers.TapperRemoved()
|
||||
} else {
|
||||
rlog.Infof("Websocket event - Browser socket disconnected, socket ID: %d", socketId)
|
||||
logger.Log.Infof("Websocket event - Browser socket disconnected, socket ID: %d", socketId)
|
||||
socketListLock.Lock()
|
||||
removeSocketUUIDFromBrowserSlice(socketId)
|
||||
socketListLock.Unlock()
|
||||
@@ -55,7 +55,7 @@ func BroadcastToBrowserClients(message []byte) {
|
||||
go func(socketId int) {
|
||||
err := SendToSocket(socketId, message)
|
||||
if err != nil {
|
||||
rlog.Errorf("error sending message to socket ID %d: %v", socketId, err)
|
||||
logger.Log.Errorf("error sending message to socket ID %d: %v", socketId, err)
|
||||
}
|
||||
}(socketId)
|
||||
}
|
||||
@@ -65,14 +65,14 @@ func (h *RoutesEventHandlers) WebSocketMessage(_ int, message []byte) {
|
||||
var socketMessageBase shared.WebSocketMessageMetadata
|
||||
err := json.Unmarshal(message, &socketMessageBase)
|
||||
if err != nil {
|
||||
rlog.Infof("Could not unmarshal websocket message %v\n", err)
|
||||
logger.Log.Infof("Could not unmarshal websocket message %v\n", err)
|
||||
} else {
|
||||
switch socketMessageBase.MessageType {
|
||||
case shared.WebSocketMessageTypeTappedEntry:
|
||||
var tappedEntryMessage models.WebSocketTappedEntryMessage
|
||||
err := json.Unmarshal(message, &tappedEntryMessage)
|
||||
if err != nil {
|
||||
rlog.Infof("Could not unmarshal message of message type %s %v\n", socketMessageBase.MessageType, err)
|
||||
logger.Log.Infof("Could not unmarshal message of message type %s %v\n", socketMessageBase.MessageType, err)
|
||||
} else {
|
||||
// NOTE: This is where the message comes back from the intermediate WebSocket to code.
|
||||
h.SocketOutChannel <- tappedEntryMessage.Data
|
||||
@@ -81,7 +81,7 @@ func (h *RoutesEventHandlers) WebSocketMessage(_ int, message []byte) {
|
||||
var statusMessage shared.WebSocketStatusMessage
|
||||
err := json.Unmarshal(message, &statusMessage)
|
||||
if err != nil {
|
||||
rlog.Infof("Could not unmarshal message of message type %s %v\n", socketMessageBase.MessageType, err)
|
||||
logger.Log.Infof("Could not unmarshal message of message type %s %v\n", socketMessageBase.MessageType, err)
|
||||
} else {
|
||||
providers.TapStatus.Pods = statusMessage.TappingStatus.Pods
|
||||
BroadcastToBrowserClients(message)
|
||||
@@ -90,12 +90,12 @@ func (h *RoutesEventHandlers) WebSocketMessage(_ int, message []byte) {
|
||||
var outboundLinkMessage models.WebsocketOutboundLinkMessage
|
||||
err := json.Unmarshal(message, &outboundLinkMessage)
|
||||
if err != nil {
|
||||
rlog.Infof("Could not unmarshal message of message type %s %v\n", socketMessageBase.MessageType, err)
|
||||
logger.Log.Infof("Could not unmarshal message of message type %s %v\n", socketMessageBase.MessageType, err)
|
||||
} else {
|
||||
handleTLSLink(outboundLinkMessage)
|
||||
}
|
||||
default:
|
||||
rlog.Infof("Received socket message of type %s for which no handlers are defined", socketMessageBase.MessageType)
|
||||
logger.Log.Infof("Received socket message of type %s for which no handlers are defined", socketMessageBase.MessageType)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -116,9 +116,9 @@ func handleTLSLink(outboundLinkMessage models.WebsocketOutboundLinkMessage) {
|
||||
}
|
||||
marshaledMessage, err := json.Marshal(outboundLinkMessage)
|
||||
if err != nil {
|
||||
rlog.Errorf("Error marshaling outbound link message for broadcasting: %v", err)
|
||||
logger.Log.Errorf("Error marshaling outbound link message for broadcasting: %v", err)
|
||||
} else {
|
||||
rlog.Errorf("Broadcasting outboundlink message %s", string(marshaledMessage))
|
||||
logger.Log.Errorf("Broadcasting outboundlink message %s", string(marshaledMessage))
|
||||
BroadcastToBrowserClients(marshaledMessage)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,15 +2,16 @@ package controllers
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/romana/rlog"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
"mizuserver/pkg/api"
|
||||
"mizuserver/pkg/holder"
|
||||
"mizuserver/pkg/providers"
|
||||
"mizuserver/pkg/up9"
|
||||
"mizuserver/pkg/validation"
|
||||
"net/http"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
)
|
||||
|
||||
func PostTappedPods(c *gin.Context) {
|
||||
@@ -23,11 +24,11 @@ func PostTappedPods(c *gin.Context) {
|
||||
c.JSON(http.StatusBadRequest, err)
|
||||
return
|
||||
}
|
||||
rlog.Infof("[Status] POST request: %d tapped pods", len(tapStatus.Pods))
|
||||
logger.Log.Infof("[Status] POST request: %d tapped pods", len(tapStatus.Pods))
|
||||
providers.TapStatus.Pods = tapStatus.Pods
|
||||
message := shared.CreateWebSocketStatusMessage(*tapStatus)
|
||||
if jsonBytes, err := json.Marshal(message); err != nil {
|
||||
rlog.Errorf("Could not Marshal message %v\n", err)
|
||||
logger.Log.Errorf("Could not Marshal message %v\n", err)
|
||||
} else {
|
||||
api.BroadcastToBrowserClients(jsonBytes)
|
||||
}
|
||||
|
||||
@@ -1,15 +1,14 @@
|
||||
package database
|
||||
|
||||
import (
|
||||
"log"
|
||||
"os"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/fsnotify/fsnotify"
|
||||
"github.com/romana/rlog"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
"github.com/up9inc/mizu/shared/debounce"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
"github.com/up9inc/mizu/shared/units"
|
||||
tapApi "github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
@@ -20,13 +19,13 @@ const defaultMaxDatabaseSizeBytes int64 = 200 * 1000 * 1000
|
||||
func StartEnforcingDatabaseSize() {
|
||||
watcher, err := fsnotify.NewWatcher()
|
||||
if err != nil {
|
||||
log.Fatalf("Error creating filesystem watcher for db size enforcement: %v\n", err)
|
||||
logger.Log.Fatalf("Error creating filesystem watcher for db size enforcement: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
maxEntriesDBByteSize, err := getMaxEntriesDBByteSize()
|
||||
if err != nil {
|
||||
log.Fatalf("Error parsing max db size: %v\n", err)
|
||||
logger.Log.Fatalf("Error parsing max db size: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -48,14 +47,14 @@ func StartEnforcingDatabaseSize() {
|
||||
if !ok {
|
||||
return // closed channel
|
||||
}
|
||||
rlog.Errorf("filesystem watcher encountered error:%v", err)
|
||||
logger.Log.Errorf("filesystem watcher encountered error:%v", err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
err = watcher.Add(DBPath)
|
||||
if err != nil {
|
||||
log.Fatalf("Error adding %s to filesystem watcher for db size enforcement: %v\n", DBPath, err)
|
||||
logger.Log.Fatalf("Error adding %s to filesystem watcher for db size enforcement: %v\n", DBPath, err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -73,7 +72,7 @@ func getMaxEntriesDBByteSize() (int64, error) {
|
||||
func checkFileSize(maxSizeBytes int64) {
|
||||
fileStat, err := os.Stat(DBPath)
|
||||
if err != nil {
|
||||
rlog.Errorf("Error checking %s file size: %v", DBPath, err)
|
||||
logger.Log.Errorf("Error checking %s file size: %v", DBPath, err)
|
||||
} else {
|
||||
if fileStat.Size() > maxSizeBytes {
|
||||
pruneOldEntries(fileStat.Size())
|
||||
@@ -90,7 +89,7 @@ func pruneOldEntries(currentFileSize int64) {
|
||||
|
||||
rows, err := GetEntriesTable().Limit(10000).Order("id").Rows()
|
||||
if err != nil {
|
||||
rlog.Errorf("Error getting 10000 first db rows: %v", err)
|
||||
logger.Log.Errorf("Error getting 10000 first db rows: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -103,7 +102,7 @@ func pruneOldEntries(currentFileSize int64) {
|
||||
var entry tapApi.MizuEntry
|
||||
err = DB.ScanRows(rows, &entry)
|
||||
if err != nil {
|
||||
rlog.Errorf("Error scanning db row: %v", err)
|
||||
logger.Log.Errorf("Error scanning db row: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -115,8 +114,8 @@ func pruneOldEntries(currentFileSize int64) {
|
||||
GetEntriesTable().Where(entryIdsToRemove).Delete(tapApi.MizuEntry{})
|
||||
// VACUUM causes sqlite to shrink the db file after rows have been deleted, the db file will not shrink without this
|
||||
DB.Exec("VACUUM")
|
||||
rlog.Errorf("Removed %d rows and cleared %s", len(entryIdsToRemove), units.BytesToHumanReadable(bytesToBeRemoved))
|
||||
logger.Log.Errorf("Removed %d rows and cleared %s", len(entryIdsToRemove), units.BytesToHumanReadable(bytesToBeRemoved))
|
||||
} else {
|
||||
rlog.Error("Found no rows to remove when pruning")
|
||||
logger.Log.Error("Found no rows to remove when pruning")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -32,7 +32,7 @@ Now you will be able to import `github.com/up9inc/mizu/resolver` in any `.go` fi
|
||||
errOut := make(chan error, 100)
|
||||
k8sResolver, err := resolver.NewFromOutOfCluster("", errOut)
|
||||
if err != nil {
|
||||
rlog.Errorf("error creating k8s resolver %s", err)
|
||||
logger.Log.Errorf("error creating k8s resolver %s", err)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
@@ -40,15 +40,15 @@ k8sResolver.Start(ctx)
|
||||
|
||||
resolvedName := k8sResolver.Resolve("10.107.251.91") // will always return `nil` in real scenarios as the internal map takes a moment to populate after `Start` is called
|
||||
if resolvedName != nil {
|
||||
rlog.Errorf("resolved 10.107.251.91=%s", *resolvedName)
|
||||
logger.Log.Errorf("resolved 10.107.251.91=%s", *resolvedName)
|
||||
} else {
|
||||
rlog.Error("Could not find a resolved name for 10.107.251.91")
|
||||
logger.Log.Error("Could not find a resolved name for 10.107.251.91")
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case err := <- errOut:
|
||||
rlog.Errorf("name resolving error %s", err)
|
||||
logger.Log.Errorf("name resolving error %s", err)
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
@@ -5,7 +5,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/romana/rlog"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
k8serrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
|
||||
cmap "github.com/orcaman/concurrent-map"
|
||||
@@ -157,10 +157,10 @@ func (resolver *Resolver) watchServices(ctx context.Context) error {
|
||||
func (resolver *Resolver) saveResolvedName(key string, resolved string, eventType watch.EventType) {
|
||||
if eventType == watch.Deleted {
|
||||
resolver.nameMap.Remove(key)
|
||||
rlog.Infof("setting %s=nil\n", key)
|
||||
logger.Log.Infof("setting %s=nil\n", key)
|
||||
} else {
|
||||
resolver.nameMap.Set(key, resolved)
|
||||
rlog.Infof("setting %s=%s\n", key, resolved)
|
||||
logger.Log.Infof("setting %s=%s\n", key, resolved)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -181,7 +181,7 @@ func (resolver *Resolver) infiniteErrorHandleRetryFunc(ctx context.Context, fun
|
||||
var statusError *k8serrors.StatusError
|
||||
if errors.As(err, &statusError) {
|
||||
if statusError.ErrStatus.Reason == metav1.StatusReasonForbidden {
|
||||
rlog.Infof("Resolver loop encountered permission error, aborting event listening - %v\n", err)
|
||||
logger.Log.Infof("Resolver loop encountered permission error, aborting event listening - %v\n", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -8,7 +8,7 @@ import (
|
||||
"regexp"
|
||||
"strings"
|
||||
|
||||
"github.com/romana/rlog"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
|
||||
"github.com/google/martian/har"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
@@ -69,7 +69,7 @@ func MatchRequestPolicy(harEntry har.Entry, service string) (resultPolicyToSend
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
rlog.Info(matchValue, rule.Value)
|
||||
logger.Log.Info(matchValue, rule.Value)
|
||||
} else {
|
||||
val := fmt.Sprint(out)
|
||||
matchValue, err = regexp.MatchString(rule.Value, val)
|
||||
|
||||
@@ -5,12 +5,7 @@ import (
|
||||
"compress/zlib"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"github.com/google/martian/har"
|
||||
"github.com/romana/rlog"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
tapApi "github.com/up9inc/mizu/tap/api"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"mizuserver/pkg/database"
|
||||
"mizuserver/pkg/utils"
|
||||
"net/http"
|
||||
@@ -18,6 +13,11 @@ import (
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/google/martian/har"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
tapApi "github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -75,9 +75,6 @@ func getAuthHeader(guestMode bool) string {
|
||||
|
||||
func GetTrafficDumpUrl(analyzeDestination string, analyzeModel string) *url.URL {
|
||||
strUrl := fmt.Sprintf("https://traffic.%s/dumpTrafficBulk/%s", analyzeDestination, analyzeModel)
|
||||
if strings.HasPrefix(analyzeDestination, "http") {
|
||||
strUrl = fmt.Sprintf("%s/api/workspace/dumpTrafficBulk", analyzeDestination)
|
||||
}
|
||||
postUrl, _ := url.Parse(strUrl)
|
||||
return postUrl
|
||||
}
|
||||
@@ -112,14 +109,14 @@ func GetAnalyzeInfo() *shared.AnalyzeStatus {
|
||||
}
|
||||
|
||||
func SyncEntries(syncEntriesConfig *shared.SyncEntriesConfig) error {
|
||||
rlog.Infof("Sync entries - started\n")
|
||||
logger.Log.Infof("Sync entries - started\n")
|
||||
|
||||
var (
|
||||
token, model string
|
||||
guestMode bool
|
||||
)
|
||||
if syncEntriesConfig.Token == "" {
|
||||
rlog.Infof("Sync entries - creating anonymous token. env %s\n", syncEntriesConfig.Env)
|
||||
logger.Log.Infof("Sync entries - creating anonymous token. env %s\n", syncEntriesConfig.Env)
|
||||
guestToken, err := createAnonymousToken(syncEntriesConfig.Env)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed creating anonymous token, err: %v", err)
|
||||
@@ -132,6 +129,11 @@ func SyncEntries(syncEntriesConfig *shared.SyncEntriesConfig) error {
|
||||
token = fmt.Sprintf("bearer %s", syncEntriesConfig.Token)
|
||||
model = syncEntriesConfig.Workspace
|
||||
guestMode = false
|
||||
|
||||
logger.Log.Infof("Sync entries - upserting model. env %s, model %s\n", syncEntriesConfig.Env, model)
|
||||
if err := upsertModel(token, model, syncEntriesConfig.Env); err != nil {
|
||||
return fmt.Errorf("failed upserting model, err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
modelRegex, _ := regexp.Compile("[A-Za-z0-9][-A-Za-z0-9_.]*[A-Za-z0-9]+$")
|
||||
@@ -139,12 +141,37 @@ func SyncEntries(syncEntriesConfig *shared.SyncEntriesConfig) error {
|
||||
return fmt.Errorf("invalid model name, model name: %s", model)
|
||||
}
|
||||
|
||||
rlog.Infof("Sync entries - syncing. token: %s, model: %s, guest mode: %v\n", token, model, guestMode)
|
||||
logger.Log.Infof("Sync entries - syncing. token: %s, model: %s, guest mode: %v\n", token, model, guestMode)
|
||||
go syncEntriesImpl(token, model, syncEntriesConfig.Env, syncEntriesConfig.UploadIntervalSec, guestMode)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func upsertModel(token string, model string, envPrefix string) error {
|
||||
upsertModelUrl, _ := url.Parse(fmt.Sprintf("https://trcc.%s/models/%s", envPrefix, model))
|
||||
|
||||
authHeader := getAuthHeader(false)
|
||||
req := &http.Request{
|
||||
Method: http.MethodPost,
|
||||
URL: upsertModelUrl,
|
||||
Header: map[string][]string{
|
||||
authHeader: {token},
|
||||
},
|
||||
}
|
||||
|
||||
response, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed request to upsert model, err: %v", err)
|
||||
}
|
||||
|
||||
// In case the model is not created (not 201) and doesn't exists (not 409)
|
||||
if response.StatusCode != 201 && response.StatusCode != 409 {
|
||||
return fmt.Errorf("failed request to upsert model, status code: %v", response.StatusCode)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func createAnonymousToken(envPrefix string) (*GuestToken, error) {
|
||||
tokenUrl := fmt.Sprintf("https://trcc.%s/anonymous/token", envPrefix)
|
||||
if strings.HasPrefix(envPrefix, "http") {
|
||||
@@ -152,7 +179,7 @@ func createAnonymousToken(envPrefix string) (*GuestToken, error) {
|
||||
}
|
||||
token := &GuestToken{}
|
||||
if err := getGuestToken(tokenUrl, token); err != nil {
|
||||
rlog.Infof("Failed to get token, %s", err)
|
||||
logger.Log.Infof("Failed to get token, %s", err)
|
||||
return nil, err
|
||||
}
|
||||
return token, nil
|
||||
@@ -164,7 +191,7 @@ func getGuestToken(url string, target *GuestToken) error {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
rlog.Infof("Got token from the server, starting to json decode... status code: %v", resp.StatusCode)
|
||||
logger.Log.Infof("Got token from the server, starting to json decode... status code: %v", resp.StatusCode)
|
||||
return json.NewDecoder(resp.Body).Decode(target)
|
||||
}
|
||||
|
||||
@@ -182,7 +209,7 @@ func syncEntriesImpl(token string, model string, envPrefix string, uploadInterva
|
||||
|
||||
for {
|
||||
timestampTo := time.Now().UnixNano() / int64(time.Millisecond)
|
||||
rlog.Infof("Getting entries from %v, to %v\n", timestampFrom, timestampTo)
|
||||
logger.Log.Infof("Getting entries from %v, to %v\n", timestampFrom, timestampTo)
|
||||
protocolFilter := "http"
|
||||
entriesArray := database.GetEntriesFromDb(timestampFrom, timestampTo, &protocolFilter)
|
||||
|
||||
@@ -207,13 +234,13 @@ func syncEntriesImpl(token string, model string, envPrefix string, uploadInterva
|
||||
result = append(result, *harEntry)
|
||||
}
|
||||
|
||||
rlog.Infof("About to upload %v entries\n", len(result))
|
||||
logger.Log.Infof("About to upload %v entries\n", len(result))
|
||||
|
||||
body, jMarshalErr := json.Marshal(result)
|
||||
if jMarshalErr != nil {
|
||||
analyzeInformation.Reset()
|
||||
rlog.Infof("Stopping sync entries")
|
||||
log.Fatal(jMarshalErr)
|
||||
logger.Log.Infof("Stopping sync entries")
|
||||
logger.Log.Fatal(jMarshalErr)
|
||||
}
|
||||
|
||||
var in bytes.Buffer
|
||||
@@ -236,17 +263,17 @@ func syncEntriesImpl(token string, model string, envPrefix string, uploadInterva
|
||||
|
||||
if _, postErr := http.DefaultClient.Do(req); postErr != nil {
|
||||
analyzeInformation.Reset()
|
||||
rlog.Info("Stopping sync entries")
|
||||
log.Fatal(postErr)
|
||||
logger.Log.Info("Stopping sync entries")
|
||||
logger.Log.Fatal(postErr)
|
||||
}
|
||||
analyzeInformation.SentCount += len(entriesArray)
|
||||
rlog.Infof("Finish uploading %v entries to %s\n", len(entriesArray), GetTrafficDumpUrl(envPrefix, model))
|
||||
logger.Log.Infof("Finish uploading %v entries to %s\n", len(entriesArray), GetTrafficDumpUrl(envPrefix, model))
|
||||
|
||||
} else {
|
||||
rlog.Infof("Nothing to upload")
|
||||
logger.Log.Infof("Nothing to upload")
|
||||
}
|
||||
|
||||
rlog.Infof("Sleeping for %v...\n", sleepTime)
|
||||
logger.Log.Infof("Sleeping for %v...\n", sleepTime)
|
||||
time.Sleep(sleepTime)
|
||||
timestampFrom = timestampTo
|
||||
}
|
||||
|
||||
@@ -4,12 +4,13 @@ import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/google/martian/har"
|
||||
"github.com/romana/rlog"
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/google/martian/har"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
// Keep it because we might want cookies in the future
|
||||
@@ -203,7 +204,7 @@ func NewResponse(response *api.GenericMessage) (harResponse *har.Response, err e
|
||||
if strings.HasPrefix(mimeType.(string), "application/grpc") {
|
||||
status, err = strconv.Atoi(_status)
|
||||
if err != nil {
|
||||
rlog.Errorf("Failed converting status to int %s (%v,%+v)", err, err, err)
|
||||
logger.Log.Errorf("Failed converting status to int %s (%v,%+v)", err, err, err)
|
||||
return nil, errors.New("failed converting response status to int for HAR")
|
||||
}
|
||||
}
|
||||
@@ -224,13 +225,13 @@ func NewResponse(response *api.GenericMessage) (harResponse *har.Response, err e
|
||||
func NewEntry(pair *api.RequestResponsePair) (*har.Entry, error) {
|
||||
harRequest, err := NewRequest(&pair.Request)
|
||||
if err != nil {
|
||||
rlog.Errorf("Failed converting request to HAR %s (%v,%+v)", err, err, err)
|
||||
logger.Log.Errorf("Failed converting request to HAR %s (%v,%+v)", err, err, err)
|
||||
return nil, errors.New("failed converting request to HAR")
|
||||
}
|
||||
|
||||
harResponse, err := NewResponse(&pair.Response)
|
||||
if err != nil {
|
||||
rlog.Errorf("Failed converting response to HAR %s (%v,%+v)", err, err, err)
|
||||
logger.Log.Errorf("Failed converting response to HAR %s (%v,%+v)", err, err, err)
|
||||
return nil, errors.New("failed converting response to HAR")
|
||||
}
|
||||
|
||||
|
||||
@@ -3,10 +3,11 @@ package utils
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/romana/rlog"
|
||||
"time"
|
||||
|
||||
loggerShared "github.com/up9inc/mizu/shared/logger"
|
||||
"gorm.io/gorm/logger"
|
||||
"gorm.io/gorm/utils"
|
||||
"time"
|
||||
)
|
||||
|
||||
// TruncatingLogger implements the gorm logger.Interface interface. Its purpose is to act as gorm's logger while truncating logs to a max of 50 characters to minimise the performance impact
|
||||
@@ -24,21 +25,21 @@ func (truncatingLogger *TruncatingLogger) Info(_ context.Context, message string
|
||||
if truncatingLogger.LogLevel < logger.Info {
|
||||
return
|
||||
}
|
||||
rlog.Errorf("gorm info: %.150s", message)
|
||||
loggerShared.Log.Errorf("gorm info: %.150s", message)
|
||||
}
|
||||
|
||||
func (truncatingLogger *TruncatingLogger) Warn(_ context.Context, message string, __ ...interface{}) {
|
||||
if truncatingLogger.LogLevel < logger.Warn {
|
||||
return
|
||||
}
|
||||
rlog.Errorf("gorm warning: %.150s", message)
|
||||
loggerShared.Log.Errorf("gorm warning: %.150s", message)
|
||||
}
|
||||
|
||||
func (truncatingLogger *TruncatingLogger) Error(_ context.Context, message string, __ ...interface{}) {
|
||||
if truncatingLogger.LogLevel < logger.Error {
|
||||
return
|
||||
}
|
||||
rlog.Errorf("gorm error: %.150s", message)
|
||||
loggerShared.Log.Errorf("gorm error: %.150s", message)
|
||||
}
|
||||
|
||||
func (truncatingLogger *TruncatingLogger) Trace(ctx context.Context, begin time.Time, fc func() (string, int64), err error) {
|
||||
|
||||
@@ -2,8 +2,7 @@ package utils
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/romana/rlog"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
@@ -11,6 +10,10 @@ import (
|
||||
"reflect"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/gin-gonic/gin"
|
||||
"github.com/up9inc/mizu/shared"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
)
|
||||
|
||||
// StartServer starts the server with a graceful shutdown
|
||||
@@ -28,16 +31,16 @@ func StartServer(app *gin.Engine) {
|
||||
|
||||
go func() {
|
||||
_ = <-signals
|
||||
rlog.Infof("Shutting down...")
|
||||
logger.Log.Infof("Shutting down...")
|
||||
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
_ = srv.Shutdown(ctx)
|
||||
os.Exit(0)
|
||||
}()
|
||||
|
||||
// Run server.
|
||||
rlog.Infof("Starting the server...")
|
||||
if err := app.Run(":8899"); err != nil {
|
||||
rlog.Errorf("Server is not running! Reason: %v", err)
|
||||
logger.Log.Infof("Starting the server...")
|
||||
if err := app.Run(fmt.Sprintf(":%d", shared.DefaultApiServerPort)); err != nil {
|
||||
logger.Log.Errorf("Server is not running! Reason: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -54,14 +57,14 @@ func ReverseSlice(data interface{}) {
|
||||
|
||||
func CheckErr(e error) {
|
||||
if e != nil {
|
||||
rlog.Errorf("%v", e)
|
||||
logger.Log.Errorf("%v", e)
|
||||
}
|
||||
}
|
||||
|
||||
func SetHostname(address, newHostname string) string {
|
||||
replacedUrl, err := url.Parse(address)
|
||||
if err != nil {
|
||||
rlog.Errorf("error replacing hostname to %s in address %s, returning original %v", newHostname, address, err)
|
||||
logger.Log.Errorf("error replacing hostname to %s in address %s, returning original %v", newHostname, address, err)
|
||||
return address
|
||||
}
|
||||
replacedUrl.Host = newHostname
|
||||
|
||||
@@ -193,6 +193,8 @@ func (provider *Provider) CreateMizuApiServerPod(ctx context.Context, opts *ApiS
|
||||
command = append(command, "--namespace", opts.Namespace)
|
||||
}
|
||||
|
||||
port := intstr.FromInt(shared.DefaultApiServerPort)
|
||||
|
||||
pod := &core.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: opts.PodName,
|
||||
@@ -232,6 +234,25 @@ func (provider *Provider) CreateMizuApiServerPod(ctx context.Context, opts *ApiS
|
||||
"memory": memRequests,
|
||||
},
|
||||
},
|
||||
ReadinessProbe: &core.Probe{
|
||||
Handler: core.Handler{
|
||||
TCPSocket: &core.TCPSocketAction{
|
||||
Port: port,
|
||||
},
|
||||
},
|
||||
InitialDelaySeconds: 5,
|
||||
PeriodSeconds: 10,
|
||||
},
|
||||
LivenessProbe: &core.Probe{
|
||||
Handler: core.Handler{
|
||||
HTTPGet: &core.HTTPGetAction{
|
||||
Path: "/echo",
|
||||
Port: port,
|
||||
},
|
||||
},
|
||||
InitialDelaySeconds: 5,
|
||||
PeriodSeconds: 10,
|
||||
},
|
||||
},
|
||||
},
|
||||
Volumes: []core.Volume{
|
||||
@@ -260,7 +281,7 @@ func (provider *Provider) CreateService(ctx context.Context, namespace string, s
|
||||
Namespace: namespace,
|
||||
},
|
||||
Spec: core.ServiceSpec{
|
||||
Ports: []core.ServicePort{{TargetPort: intstr.FromInt(8899), Port: 80}},
|
||||
Ports: []core.ServicePort{{TargetPort: intstr.FromInt(shared.DefaultApiServerPort), Port: 80}},
|
||||
Type: core.ServiceTypeClusterIP,
|
||||
Selector: map[string]string{"app": appLabelValue},
|
||||
},
|
||||
|
||||
@@ -3,9 +3,10 @@ package uiUtils
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
)
|
||||
|
||||
func AskForConfirmation(s string) bool {
|
||||
@@ -15,7 +16,7 @@ func AskForConfirmation(s string) bool {
|
||||
|
||||
response, err := reader.ReadString('\n')
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
logger.Log.Fatalf("Error while reading confirmation string, err: %v", err)
|
||||
}
|
||||
response = strings.ToLower(strings.TrimSpace(response))
|
||||
if response == "" || response == "y" || response == "yes" {
|
||||
|
||||
@@ -10,4 +10,5 @@ const (
|
||||
RulePolicyPath = "/app/enforce-policy/"
|
||||
RulePolicyFileName = "enforce-policy.yaml"
|
||||
GoGCEnvVar = "GOGC"
|
||||
DefaultApiServerPort = 8899
|
||||
)
|
||||
|
||||
@@ -28,3 +28,14 @@ func InitLogger(logPath string) {
|
||||
|
||||
logging.SetBackend(backend1Leveled, backend2Formatter)
|
||||
}
|
||||
|
||||
func InitLoggerStderrOnly() {
|
||||
consoleLog := logging.NewLogBackend(os.Stderr, "", 0)
|
||||
|
||||
backend1Formatter := logging.NewBackendFormatter(consoleLog, format)
|
||||
|
||||
backend1Leveled := logging.AddModuleLevel(consoleLog)
|
||||
backend1Leveled.SetLevel(logging.DEBUG, "")
|
||||
|
||||
logging.SetBackend(backend1Leveled, backend1Formatter)
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/google/gopacket/reassembly"
|
||||
"github.com/romana/rlog"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
@@ -28,7 +28,7 @@ func (cl *Cleaner) clean() {
|
||||
startCleanTime := time.Now()
|
||||
|
||||
cl.assemblerMutex.Lock()
|
||||
rlog.Debugf("Assembler Stats before cleaning %s", cl.assembler.Dump())
|
||||
logger.Log.Debugf("Assembler Stats before cleaning %s", cl.assembler.Dump())
|
||||
flushed, closed := cl.assembler.FlushCloseOlderThan(startCleanTime.Add(-cl.connectionTimeout))
|
||||
cl.assemblerMutex.Unlock()
|
||||
|
||||
@@ -38,7 +38,7 @@ func (cl *Cleaner) clean() {
|
||||
}
|
||||
|
||||
cl.statsMutex.Lock()
|
||||
rlog.Debugf("Assembler Stats after cleaning %s", cl.assembler.Dump())
|
||||
logger.Log.Debugf("Assembler Stats after cleaning %s", cl.assembler.Dump())
|
||||
cl.stats.flushed += flushed
|
||||
cl.stats.closed += closed
|
||||
cl.statsMutex.Unlock()
|
||||
|
||||
@@ -14,7 +14,6 @@ import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"os/signal"
|
||||
"runtime"
|
||||
@@ -25,14 +24,13 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/romana/rlog"
|
||||
|
||||
"github.com/google/gopacket"
|
||||
"github.com/google/gopacket/examples/util"
|
||||
"github.com/google/gopacket/ip4defrag"
|
||||
"github.com/google/gopacket/layers" // pulls in all layers decoders
|
||||
"github.com/google/gopacket/pcap"
|
||||
"github.com/google/gopacket/reassembly"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
@@ -102,7 +100,7 @@ const baseStreamChannelTimeoutMs int = 5000 * 100
|
||||
|
||||
/* minOutputLevel: Error will be printed only if outputLevel is above this value
|
||||
* t: key for errorsMap (counting errors)
|
||||
* s, a: arguments log.Printf
|
||||
* s, a: arguments logger.Log.Infof
|
||||
* Note: Too bad for perf that a... is evaluated
|
||||
*/
|
||||
func logError(minOutputLevel int, t string, s string, a ...interface{}) {
|
||||
@@ -114,7 +112,7 @@ func logError(minOutputLevel int, t string, s string, a ...interface{}) {
|
||||
|
||||
if outputLevel >= minOutputLevel {
|
||||
formatStr := fmt.Sprintf("%s: %s", t, s)
|
||||
rlog.Errorf(formatStr, a...)
|
||||
logger.Log.Errorf(formatStr, a...)
|
||||
}
|
||||
}
|
||||
func Error(t string, s string, a ...interface{}) {
|
||||
@@ -124,10 +122,10 @@ func SilentError(t string, s string, a ...interface{}) {
|
||||
logError(2, t, s, a...)
|
||||
}
|
||||
func Debug(s string, a ...interface{}) {
|
||||
rlog.Debugf(s, a...)
|
||||
logger.Log.Debugf(s, a...)
|
||||
}
|
||||
func Trace(s string, a ...interface{}) {
|
||||
rlog.Tracef(1, s, a...)
|
||||
logger.Log.Infof(s, a...)
|
||||
}
|
||||
|
||||
func inArrayInt(arr []int, valueToCheck int) bool {
|
||||
@@ -188,11 +186,11 @@ func startMemoryProfiler() {
|
||||
}
|
||||
}
|
||||
|
||||
rlog.Info("Profiling is on, results will be written to %s", dumpPath)
|
||||
logger.Log.Info("Profiling is on, results will be written to %s", dumpPath)
|
||||
go func() {
|
||||
if _, err := os.Stat(dumpPath); os.IsNotExist(err) {
|
||||
if err := os.Mkdir(dumpPath, 0777); err != nil {
|
||||
log.Fatal("could not create directory for profile: ", err)
|
||||
logger.Log.Fatal("could not create directory for profile: ", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -201,15 +199,15 @@ func startMemoryProfiler() {
|
||||
|
||||
filename := fmt.Sprintf("%s/%s__mem.prof", dumpPath, t.Format("15_04_05"))
|
||||
|
||||
rlog.Infof("Writing memory profile to %s\n", filename)
|
||||
logger.Log.Infof("Writing memory profile to %s\n", filename)
|
||||
|
||||
f, err := os.Create(filename)
|
||||
if err != nil {
|
||||
log.Fatal("could not create memory profile: ", err)
|
||||
logger.Log.Fatal("could not create memory profile: ", err)
|
||||
}
|
||||
runtime.GC() // get up-to-date statistics
|
||||
if err := pprof.WriteHeapProfile(f); err != nil {
|
||||
log.Fatal("could not write memory profile: ", err)
|
||||
logger.Log.Fatal("could not write memory profile: ", err)
|
||||
}
|
||||
_ = f.Close()
|
||||
time.Sleep(time.Second * time.Duration(timeInterval))
|
||||
@@ -229,7 +227,7 @@ func closeTimedoutTcpStreamChannels() {
|
||||
if !stream.isClosed && time.Now().After(streamWrapper.createdAt.Add(TcpStreamChannelTimeoutMs)) {
|
||||
stream.Close()
|
||||
appStats.IncDroppedTcpStreams()
|
||||
rlog.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d\n", appStats.DroppedTcpStreams, runtime.NumGoroutine(), TcpStreamChannelTimeoutMs/1000000)
|
||||
logger.Log.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d\n", appStats.DroppedTcpStreams, runtime.NumGoroutine(), TcpStreamChannelTimeoutMs/1000000)
|
||||
}
|
||||
} else {
|
||||
if !stream.superIdentifier.IsClosedOthers {
|
||||
@@ -254,7 +252,6 @@ func closeTimedoutTcpStreamChannels() {
|
||||
}
|
||||
|
||||
func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
|
||||
log.SetFlags(log.LstdFlags | log.LUTC | log.Lshortfile)
|
||||
go closeTimedoutTcpStreamChannels()
|
||||
|
||||
defer util.Run()()
|
||||
@@ -269,8 +266,8 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
|
||||
|
||||
if localhostIPs, err := getLocalhostIPs(); err != nil {
|
||||
// TODO: think this over
|
||||
rlog.Info("Failed to get self IP addresses")
|
||||
rlog.Errorf("Getting-Self-Address", "Error getting self ip address: %s (%v,%+v)", err, err, err)
|
||||
logger.Log.Info("Failed to get self IP addresses")
|
||||
logger.Log.Errorf("Getting-Self-Address", "Error getting self ip address: %s (%v,%+v)", err, err, err)
|
||||
ownIps = make([]string, 0)
|
||||
} else {
|
||||
ownIps = localhostIPs
|
||||
@@ -280,7 +277,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
|
||||
var err error
|
||||
if *fname != "" {
|
||||
if handle, err = pcap.OpenOffline(*fname); err != nil {
|
||||
log.Fatalf("PCAP OpenOffline error: %v", err)
|
||||
logger.Log.Fatalf("PCAP OpenOffline error: %v", err)
|
||||
}
|
||||
} else {
|
||||
// This is a little complicated because we want to allow all possible options
|
||||
@@ -288,33 +285,33 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
|
||||
// just call pcap.OpenLive if you want a simple handle.
|
||||
inactive, err := pcap.NewInactiveHandle(*iface)
|
||||
if err != nil {
|
||||
log.Fatalf("could not create: %v", err)
|
||||
logger.Log.Fatalf("could not create: %v", err)
|
||||
}
|
||||
defer inactive.CleanUp()
|
||||
if err = inactive.SetSnapLen(*snaplen); err != nil {
|
||||
log.Fatalf("could not set snap length: %v", err)
|
||||
logger.Log.Fatalf("could not set snap length: %v", err)
|
||||
} else if err = inactive.SetPromisc(*promisc); err != nil {
|
||||
log.Fatalf("could not set promisc mode: %v", err)
|
||||
logger.Log.Fatalf("could not set promisc mode: %v", err)
|
||||
} else if err = inactive.SetTimeout(time.Second); err != nil {
|
||||
log.Fatalf("could not set timeout: %v", err)
|
||||
logger.Log.Fatalf("could not set timeout: %v", err)
|
||||
}
|
||||
if *tstype != "" {
|
||||
if t, err := pcap.TimestampSourceFromString(*tstype); err != nil {
|
||||
log.Fatalf("Supported timestamp types: %v", inactive.SupportedTimestamps())
|
||||
logger.Log.Fatalf("Supported timestamp types: %v", inactive.SupportedTimestamps())
|
||||
} else if err := inactive.SetTimestampSource(t); err != nil {
|
||||
log.Fatalf("Supported timestamp types: %v", inactive.SupportedTimestamps())
|
||||
logger.Log.Fatalf("Supported timestamp types: %v", inactive.SupportedTimestamps())
|
||||
}
|
||||
}
|
||||
if handle, err = inactive.Activate(); err != nil {
|
||||
log.Fatalf("PCAP Activate error: %v", err)
|
||||
logger.Log.Fatalf("PCAP Activate error: %v", err)
|
||||
}
|
||||
defer handle.Close()
|
||||
}
|
||||
if len(flag.Args()) > 0 {
|
||||
bpffilter := strings.Join(flag.Args(), " ")
|
||||
rlog.Infof("Using BPF filter %q", bpffilter)
|
||||
logger.Log.Infof("Using BPF filter %q", bpffilter)
|
||||
if err = handle.SetBPFFilter(bpffilter); err != nil {
|
||||
log.Fatalf("BPF filter error: %v", err)
|
||||
logger.Log.Fatalf("BPF filter error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -325,12 +322,12 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
|
||||
decoderName = fmt.Sprintf("%s", handle.LinkType())
|
||||
}
|
||||
if dec, ok = gopacket.DecodersByLayerName[decoderName]; !ok {
|
||||
log.Fatalln("No decoder named", decoderName)
|
||||
logger.Log.Fatal("No decoder named", decoderName)
|
||||
}
|
||||
source := gopacket.NewPacketSource(handle, dec)
|
||||
source.Lazy = *lazy
|
||||
source.NoCopy = true
|
||||
rlog.Info("Starting to read packets")
|
||||
logger.Log.Info("Starting to read packets")
|
||||
appStats.SetStartTime(time.Now())
|
||||
defragger := ip4defrag.NewIPv4Defragmenter()
|
||||
|
||||
@@ -347,7 +344,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
|
||||
|
||||
maxBufferedPagesTotal := GetMaxBufferedPagesPerConnection()
|
||||
maxBufferedPagesPerConnection := GetMaxBufferedPagesTotal()
|
||||
rlog.Infof("Assembler options: maxBufferedPagesTotal=%d, maxBufferedPagesPerConnection=%d", maxBufferedPagesTotal, maxBufferedPagesPerConnection)
|
||||
logger.Log.Infof("Assembler options: maxBufferedPagesTotal=%d, maxBufferedPagesPerConnection=%d", maxBufferedPagesTotal, maxBufferedPagesPerConnection)
|
||||
assembler.AssemblerOptions.MaxBufferedPagesTotal = maxBufferedPagesTotal
|
||||
assembler.AssemblerOptions.MaxBufferedPagesPerConnection = maxBufferedPagesPerConnection
|
||||
|
||||
@@ -377,7 +374,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
|
||||
errorMapLen := len(errorsMap)
|
||||
errorsSummery := fmt.Sprintf("%v", errorsMap)
|
||||
errorsMapMutex.Unlock()
|
||||
log.Printf("%v (errors: %v, errTypes:%v) - Errors Summary: %s",
|
||||
logger.Log.Infof("%v (errors: %v, errTypes:%v) - Errors Summary: %s",
|
||||
time.Since(appStats.StartTime),
|
||||
nErrors,
|
||||
errorMapLen,
|
||||
@@ -387,7 +384,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
|
||||
// At this moment
|
||||
memStats := runtime.MemStats{}
|
||||
runtime.ReadMemStats(&memStats)
|
||||
log.Printf(
|
||||
logger.Log.Infof(
|
||||
"mem: %d, goroutines: %d",
|
||||
memStats.HeapAlloc,
|
||||
runtime.NumGoroutine(),
|
||||
@@ -395,7 +392,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
|
||||
|
||||
// Since the last print
|
||||
cleanStats := cleaner.dumpStats()
|
||||
log.Printf(
|
||||
logger.Log.Infof(
|
||||
"cleaner - flushed connections: %d, closed connections: %d, deleted messages: %d",
|
||||
cleanStats.flushed,
|
||||
cleanStats.closed,
|
||||
@@ -403,7 +400,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
|
||||
)
|
||||
currentAppStats := appStats.DumpStats()
|
||||
appStatsJSON, _ := json.Marshal(currentAppStats)
|
||||
log.Printf("app stats - %v", string(appStatsJSON))
|
||||
logger.Log.Infof("app stats - %v", string(appStatsJSON))
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -416,15 +413,15 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
|
||||
if err == io.EOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
rlog.Debugf("Error:", err)
|
||||
logger.Log.Debugf("Error: %v", err)
|
||||
continue
|
||||
}
|
||||
packetsCount := appStats.IncPacketsCount()
|
||||
rlog.Debugf("PACKET #%d", packetsCount)
|
||||
logger.Log.Debugf("PACKET #%d", packetsCount)
|
||||
data := packet.Data()
|
||||
appStats.UpdateProcessedBytes(uint64(len(data)))
|
||||
if *hexdumppkt {
|
||||
rlog.Debugf("Packet content (%d/0x%x) - %s", len(data), len(data), hex.Dump(data))
|
||||
logger.Log.Debugf("Packet content (%d/0x%x) - %s", len(data), len(data), hex.Dump(data))
|
||||
}
|
||||
|
||||
// defrag the IPv4 packet if required
|
||||
@@ -437,17 +434,17 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
|
||||
l := ip4.Length
|
||||
newip4, err := defragger.DefragIPv4(ip4)
|
||||
if err != nil {
|
||||
log.Fatalln("Error while de-fragmenting", err)
|
||||
logger.Log.Fatal("Error while de-fragmenting", err)
|
||||
} else if newip4 == nil {
|
||||
rlog.Debugf("Fragment...")
|
||||
logger.Log.Debugf("Fragment...")
|
||||
continue // packet fragment, we don't have whole packet yet.
|
||||
}
|
||||
if newip4.Length != l {
|
||||
stats.ipdefrag++
|
||||
rlog.Debugf("Decoding re-assembled packet: %s", newip4.NextLayerType())
|
||||
logger.Log.Debugf("Decoding re-assembled packet: %s", newip4.NextLayerType())
|
||||
pb, ok := packet.(gopacket.PacketBuilder)
|
||||
if !ok {
|
||||
log.Panic("Not a PacketBuilder")
|
||||
logger.Log.Panic("Not a PacketBuilder")
|
||||
}
|
||||
nextDecoder := newip4.NextLayerType()
|
||||
_ = nextDecoder.Decode(newip4.Payload, pb)
|
||||
@@ -461,14 +458,14 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
|
||||
if *checksum {
|
||||
err := tcp.SetNetworkLayerForChecksum(packet.NetworkLayer())
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to set network layer for checksum: %s\n", err)
|
||||
logger.Log.Fatalf("Failed to set network layer for checksum: %s\n", err)
|
||||
}
|
||||
}
|
||||
c := Context{
|
||||
CaptureInfo: packet.Metadata().CaptureInfo,
|
||||
}
|
||||
stats.totalsz += len(tcp.Payload)
|
||||
rlog.Debugf("%s : %v -> %s : %v", packet.NetworkLayer().NetworkFlow().Src(), tcp.SrcPort, packet.NetworkLayer().NetworkFlow().Dst(), tcp.DstPort)
|
||||
logger.Log.Debugf("%s : %v -> %s : %v", packet.NetworkLayer().NetworkFlow().Src(), tcp.SrcPort, packet.NetworkLayer().NetworkFlow().Dst(), tcp.DstPort)
|
||||
assemblerMutex.Lock()
|
||||
assembler.AssembleWithContext(packet.NetworkLayer().NetworkFlow(), tcp, &c)
|
||||
assemblerMutex.Unlock()
|
||||
@@ -479,7 +476,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
|
||||
errorsMapMutex.Lock()
|
||||
errorMapLen := len(errorsMap)
|
||||
errorsMapMutex.Unlock()
|
||||
log.Printf("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)",
|
||||
logger.Log.Infof("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)",
|
||||
appStats.PacketsCount,
|
||||
appStats.ProcessedBytes,
|
||||
time.Since(appStats.StartTime),
|
||||
@@ -488,7 +485,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
|
||||
}
|
||||
select {
|
||||
case <-signalChan:
|
||||
log.Printf("Caught SIGINT: aborting")
|
||||
logger.Log.Infof("Caught SIGINT: aborting")
|
||||
done = true
|
||||
default:
|
||||
// NOP: continue
|
||||
@@ -501,7 +498,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
|
||||
assemblerMutex.Lock()
|
||||
closed := assembler.FlushAll()
|
||||
assemblerMutex.Unlock()
|
||||
rlog.Debugf("Final flush: %d closed", closed)
|
||||
logger.Log.Debugf("Final flush: %d closed", closed)
|
||||
if outputLevel >= 2 {
|
||||
streamPool.Dump()
|
||||
}
|
||||
@@ -509,7 +506,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
|
||||
if *memprofile != "" {
|
||||
f, err := os.Create(*memprofile)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
logger.Log.Fatal(err)
|
||||
}
|
||||
_ = pprof.WriteHeapProfile(f)
|
||||
_ = f.Close()
|
||||
@@ -517,29 +514,29 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
|
||||
|
||||
streamFactory.WaitGoRoutines()
|
||||
assemblerMutex.Lock()
|
||||
rlog.Debugf("%s", assembler.Dump())
|
||||
logger.Log.Debugf("%s", assembler.Dump())
|
||||
assemblerMutex.Unlock()
|
||||
if !*nodefrag {
|
||||
log.Printf("IPdefrag:\t\t%d", stats.ipdefrag)
|
||||
logger.Log.Infof("IPdefrag:\t\t%d", stats.ipdefrag)
|
||||
}
|
||||
log.Printf("TCP stats:")
|
||||
log.Printf(" missed bytes:\t\t%d", stats.missedBytes)
|
||||
log.Printf(" total packets:\t\t%d", stats.pkt)
|
||||
log.Printf(" rejected FSM:\t\t%d", stats.rejectFsm)
|
||||
log.Printf(" rejected Options:\t%d", stats.rejectOpt)
|
||||
log.Printf(" reassembled bytes:\t%d", stats.sz)
|
||||
log.Printf(" total TCP bytes:\t%d", stats.totalsz)
|
||||
log.Printf(" conn rejected FSM:\t%d", stats.rejectConnFsm)
|
||||
log.Printf(" reassembled chunks:\t%d", stats.reassembled)
|
||||
log.Printf(" out-of-order packets:\t%d", stats.outOfOrderPackets)
|
||||
log.Printf(" out-of-order bytes:\t%d", stats.outOfOrderBytes)
|
||||
log.Printf(" biggest-chunk packets:\t%d", stats.biggestChunkPackets)
|
||||
log.Printf(" biggest-chunk bytes:\t%d", stats.biggestChunkBytes)
|
||||
log.Printf(" overlap packets:\t%d", stats.overlapPackets)
|
||||
log.Printf(" overlap bytes:\t\t%d", stats.overlapBytes)
|
||||
log.Printf("Errors: %d", nErrors)
|
||||
logger.Log.Infof("TCP stats:")
|
||||
logger.Log.Infof(" missed bytes:\t\t%d", stats.missedBytes)
|
||||
logger.Log.Infof(" total packets:\t\t%d", stats.pkt)
|
||||
logger.Log.Infof(" rejected FSM:\t\t%d", stats.rejectFsm)
|
||||
logger.Log.Infof(" rejected Options:\t%d", stats.rejectOpt)
|
||||
logger.Log.Infof(" reassembled bytes:\t%d", stats.sz)
|
||||
logger.Log.Infof(" total TCP bytes:\t%d", stats.totalsz)
|
||||
logger.Log.Infof(" conn rejected FSM:\t%d", stats.rejectConnFsm)
|
||||
logger.Log.Infof(" reassembled chunks:\t%d", stats.reassembled)
|
||||
logger.Log.Infof(" out-of-order packets:\t%d", stats.outOfOrderPackets)
|
||||
logger.Log.Infof(" out-of-order bytes:\t%d", stats.outOfOrderBytes)
|
||||
logger.Log.Infof(" biggest-chunk packets:\t%d", stats.biggestChunkPackets)
|
||||
logger.Log.Infof(" biggest-chunk bytes:\t%d", stats.biggestChunkBytes)
|
||||
logger.Log.Infof(" overlap packets:\t%d", stats.overlapPackets)
|
||||
logger.Log.Infof(" overlap bytes:\t\t%d", stats.overlapBytes)
|
||||
logger.Log.Infof("Errors: %d", nErrors)
|
||||
for e := range errorsMap {
|
||||
log.Printf(" %s:\t\t%d", e, errorsMap[e])
|
||||
logger.Log.Infof(" %s:\t\t%d", e, errorsMap[e])
|
||||
}
|
||||
log.Printf("AppStats: %v", GetStats())
|
||||
logger.Log.Infof("AppStats: %v", GetStats())
|
||||
}
|
||||
|
||||
@@ -9,7 +9,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/bradleyfalzon/tlsx"
|
||||
"github.com/romana/rlog"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
)
|
||||
|
||||
@@ -79,7 +79,7 @@ func (h *tcpReader) Read(p []byte) (int, error) {
|
||||
clientHello := tlsx.ClientHello{}
|
||||
err := clientHello.Unmarshall(msg.bytes)
|
||||
if err == nil {
|
||||
rlog.Debugf("Detected TLS client hello with SNI %s\n", clientHello.SNI)
|
||||
logger.Log.Debugf("Detected TLS client hello with SNI %s\n", clientHello.SNI)
|
||||
// TODO: Throws `panic: runtime error: invalid memory address or nil pointer dereference` error.
|
||||
// numericPort, _ := strconv.Atoi(h.tcpID.DstPort)
|
||||
// h.outboundLinkWriter.WriteOutboundLink(h.tcpID.SrcIP, h.tcpID.DstIP, numericPort, clientHello.SNI, TLSProtocol)
|
||||
|
||||
@@ -5,7 +5,7 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/romana/rlog"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
|
||||
"github.com/google/gopacket"
|
||||
@@ -33,7 +33,7 @@ var streams *sync.Map = &sync.Map{} // global
|
||||
var streamId int64 = 0
|
||||
|
||||
func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.TCP, ac reassembly.AssemblerContext) reassembly.Stream {
|
||||
rlog.Debugf("* NEW: %s %s", net, transport)
|
||||
logger.Log.Debugf("* NEW: %s %s", net, transport)
|
||||
fsmOptions := reassembly.TCPSimpleFSMOptions{
|
||||
SupportMissingEstablishment: *allowmissinginit,
|
||||
}
|
||||
@@ -123,21 +123,21 @@ func (factory *tcpStreamFactory) WaitGoRoutines() {
|
||||
func (factory *tcpStreamFactory) getStreamProps(srcIP string, srcPort string, dstIP string, dstPort string) *streamProps {
|
||||
if hostMode {
|
||||
if inArrayString(gSettings.filterAuthorities, fmt.Sprintf("%s:%s", dstIP, dstPort)) {
|
||||
rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ host1 %s:%s", dstIP, dstPort))
|
||||
logger.Log.Debugf("getStreamProps %s", fmt.Sprintf("+ host1 %s:%s", dstIP, dstPort))
|
||||
return &streamProps{isTapTarget: true, isOutgoing: false}
|
||||
} else if inArrayString(gSettings.filterAuthorities, dstIP) {
|
||||
rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ host2 %s", dstIP))
|
||||
logger.Log.Debugf("getStreamProps %s", fmt.Sprintf("+ host2 %s", dstIP))
|
||||
return &streamProps{isTapTarget: true, isOutgoing: false}
|
||||
} else if inArrayString(gSettings.filterAuthorities, fmt.Sprintf("%s:%s", srcIP, srcPort)) {
|
||||
rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ host3 %s:%s", srcIP, srcPort))
|
||||
logger.Log.Debugf("getStreamProps %s", fmt.Sprintf("+ host3 %s:%s", srcIP, srcPort))
|
||||
return &streamProps{isTapTarget: true, isOutgoing: true}
|
||||
} else if inArrayString(gSettings.filterAuthorities, srcIP) {
|
||||
rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ host4 %s", srcIP))
|
||||
logger.Log.Debugf("getStreamProps %s", fmt.Sprintf("+ host4 %s", srcIP))
|
||||
return &streamProps{isTapTarget: true, isOutgoing: true}
|
||||
}
|
||||
return &streamProps{isTapTarget: false, isOutgoing: false}
|
||||
} else {
|
||||
rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ notHost3 %s:%s -> %s:%s", srcIP, srcPort, dstIP, dstPort))
|
||||
logger.Log.Debugf("getStreamProps %s", fmt.Sprintf("+ notHost3 %s:%s -> %s:%s", srcIP, srcPort, dstIP, dstPort))
|
||||
return &streamProps{isTapTarget: true}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -35,7 +35,7 @@ const App = () => {
|
||||
try {
|
||||
const recentTLSLinks = await api.getRecentTLSLinks();
|
||||
if (recentTLSLinks?.length > 0) {
|
||||
setAddressesWithTLS(new Set([...addressesWithTLS, ...recentTLSLinks]));
|
||||
setAddressesWithTLS(new Set(recentTLSLinks));
|
||||
setShowTLSWarning(true);
|
||||
}
|
||||
const auth = await api.getAuthStatus();
|
||||
@@ -45,7 +45,7 @@ const App = () => {
|
||||
}
|
||||
|
||||
})();
|
||||
},[]);
|
||||
}, []);
|
||||
|
||||
const onTLSDetected = (destAddress: string) => {
|
||||
addressesWithTLS.add(destAddress);
|
||||
|
||||
Reference in New Issue
Block a user