Compare commits

..

13 Commits

Author SHA1 Message Date
RoyUP9
3aafbd7e1c added upsert workspace before dumping traffic (#368) 2021-10-19 11:06:51 +03:00
M. Mert Yıldıran
58e9363fda Replace all rlog occurrences with the shared logger in tap (#369) 2021-10-18 16:35:42 +03:00
M. Mert Yıldıran
6a85ab53eb Fix the go.mod of acceptanceTests (#371)
* Fix the `go.mod` of `acceptanceTests`

* Enable acceptance tests

* Revert "Enable acceptance tests"

This reverts commit e21c527e69.
2021-10-18 16:35:10 +03:00
M. Mert Yıldıran
212e4687d8 Fix the dependency error in acceptance tests (#370) 2021-10-17 18:36:30 +03:00
M. Mert Yıldıran
167b17dfd2 Move the 8899 integer and string literals into a const named DefaultApiServerPort in shared (#367) 2021-10-17 15:28:33 +03:00
M. Mert Yıldıran
9d179c7227 Ignore an eslint error (#351)
* Ignore an eslint error

* Change the fix
2021-10-17 15:28:03 +03:00
M. Mert Yıldıran
147e812edb Replace all rlog occurrences with the shared logger (#350)
* Replace all `rlog` occurrences with the shared logger

* Use the same log format in `InitLoggerStderrOnly` as well

* Convert one more `log.Fatal` to `logger.Log.Errorf` as well in the `cli`

* Replace `log.` occurrences with `logger.Log.` in `agent`

* Fix `cannot use err (type error)`

* Change the logging level to `DEBUG`

* Replace an `Errorf` with `Fatal`

* Add informative message
2021-10-17 12:15:30 +03:00
M. Mert Yıldıran
91196bb306 Add readiness and liveness probes to API server (#365)
* Add readiness and liveness probes to API server

* Use `intstr.FromInt(8899)` instead
2021-10-17 11:40:18 +03:00
Igor Gov
26834a6e04 Fix documentation from "mizu-image" to "agent-image" (#363) 2021-10-15 14:28:00 +03:00
M. Mert Yıldıran
754f385865 Improve formatting in bug_report.md issue template (#352) 2021-10-15 14:14:51 +03:00
M. Mert Yıldıran
b30b62ef77 Move cli/logger to shared, and refactor all its usages in cli (#349)
* Move `cli/logger` to `shared`, and refactor all its usages in `cli`

* Remove indirect for `op/go-logging` in `shared`
2021-10-14 10:18:01 +03:00
RoyUP9
26788bb3a6 organize routes (#348) 2021-10-13 17:31:15 +03:00
RoyUP9
2706cd4d50 api server remove unused env vars (#347) 2021-10-13 14:14:14 +03:00
55 changed files with 449 additions and 437 deletions

View File

@@ -12,9 +12,9 @@ A clear and concise description of what the bug is.
**To Reproduce**
Steps to reproduce the behavior:
1. Run mizu <command> '...'
2. Click on '....'
3. Scroll down to '....'
1. Run `mizu <command> ...`
2. Click on '...'
3. Scroll down to '...'
4. See error
**Expected behavior**
@@ -22,17 +22,17 @@ A clear and concise description of what you expected to happen.
**Logs**
Upload logs:
1. Run the mizu command with `--set dump-logs=true` (e.g `mizu tap --set dump-logs=true`)
1. Run the mizu command with `--set dump-logs=true` (e.g `mizu tap --set dump-logs=true`)
2. Try to reproduce the issue
3. CNTRL+C on terminal tab which runs mizu
4. Upload the logs zip file from ~/.mizu/mizu_logs_**.zip
3. <kbd>CTRL</kbd>+<kbd>C</kbd> on terminal tab which runs `mizu`
4. Upload the logs zip file from `~/.mizu/mizu_logs_**.zip`
**Screenshots**
If applicable, add screenshots to help explain your problem.
**Desktop (please complete the following information):**
- OS: [e.g. iOS]
- Browser [e.g. chrome]
- OS: [e.g. macOS]
- Web Browser: [e.g. Google Chrome]
**Additional context**
Add any other context about the problem here.

View File

@@ -2,4 +2,9 @@ module github.com/up9inc/mizu/tests
go 1.16
require gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
require (
github.com/up9inc/mizu/shared v0.0.0
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
)
replace github.com/up9inc/mizu/shared v0.0.0 => ../shared

View File

@@ -1,3 +1,7 @@
github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/golang-jwt/jwt/v4 v4.1.0 h1:XUgk2Ex5veyVFVeLm0xhusUTQybEbexJXrvPNOKkSY0=
github.com/golang-jwt/jwt/v4 v4.1.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg=
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=

View File

@@ -66,7 +66,7 @@ func TestTap(t *testing.T) {
entriesCheckFunc := func() error {
timestamp := time.Now().UnixNano() / int64(time.Millisecond)
entriesUrl := fmt.Sprintf("%v/api/entries?limit=%v&operator=lt&timestamp=%v", apiServerUrl, entriesCount, timestamp)
entriesUrl := fmt.Sprintf("%v/entries?limit=%v&operator=lt&timestamp=%v", apiServerUrl, entriesCount, timestamp)
requestResult, requestErr := executeHttpGetRequest(entriesUrl)
if requestErr != nil {
return fmt.Errorf("failed to get entries, err: %v", requestErr)
@@ -79,7 +79,7 @@ func TestTap(t *testing.T) {
entry := entries[0].(map[string]interface{})
entryUrl := fmt.Sprintf("%v/api/entries/%v", apiServerUrl, entry["id"])
entryUrl := fmt.Sprintf("%v/entries/%v", apiServerUrl, entry["id"])
requestResult, requestErr = executeHttpGetRequest(entryUrl)
if requestErr != nil {
return fmt.Errorf("failed to get entry, err: %v", requestErr)
@@ -188,7 +188,7 @@ func TestTapAllNamespaces(t *testing.T) {
return
}
podsUrl := fmt.Sprintf("%v/api/tapStatus", apiServerUrl)
podsUrl := fmt.Sprintf("%v/status/tap", apiServerUrl)
requestResult, requestErr := executeHttpGetRequest(podsUrl)
if requestErr != nil {
t.Errorf("failed to get tap status, err: %v", requestErr)
@@ -269,7 +269,7 @@ func TestTapMultipleNamespaces(t *testing.T) {
return
}
podsUrl := fmt.Sprintf("%v/api/tapStatus", apiServerUrl)
podsUrl := fmt.Sprintf("%v/status/tap", apiServerUrl)
requestResult, requestErr := executeHttpGetRequest(podsUrl)
if requestErr != nil {
t.Errorf("failed to get tap status, err: %v", requestErr)
@@ -352,7 +352,7 @@ func TestTapRegex(t *testing.T) {
return
}
podsUrl := fmt.Sprintf("%v/api/tapStatus", apiServerUrl)
podsUrl := fmt.Sprintf("%v/status/tap", apiServerUrl)
requestResult, requestErr := executeHttpGetRequest(podsUrl)
if requestErr != nil {
t.Errorf("failed to get tap status, err: %v", requestErr)
@@ -486,7 +486,7 @@ func TestTapRedact(t *testing.T) {
redactCheckFunc := func() error {
timestamp := time.Now().UnixNano() / int64(time.Millisecond)
entriesUrl := fmt.Sprintf("%v/api/entries?limit=%v&operator=lt&timestamp=%v", apiServerUrl, defaultEntriesCount, timestamp)
entriesUrl := fmt.Sprintf("%v/entries?limit=%v&operator=lt&timestamp=%v", apiServerUrl, defaultEntriesCount, timestamp)
requestResult, requestErr := executeHttpGetRequest(entriesUrl)
if requestErr != nil {
return fmt.Errorf("failed to get entries, err: %v", requestErr)
@@ -499,7 +499,7 @@ func TestTapRedact(t *testing.T) {
firstEntry := entries[0].(map[string]interface{})
entryUrl := fmt.Sprintf("%v/api/entries/%v", apiServerUrl, firstEntry["id"])
entryUrl := fmt.Sprintf("%v/entries/%v", apiServerUrl, firstEntry["id"])
requestResult, requestErr = executeHttpGetRequest(entryUrl)
if requestErr != nil {
return fmt.Errorf("failed to get entry, err: %v", requestErr)
@@ -601,7 +601,7 @@ func TestTapNoRedact(t *testing.T) {
redactCheckFunc := func() error {
timestamp := time.Now().UnixNano() / int64(time.Millisecond)
entriesUrl := fmt.Sprintf("%v/api/entries?limit=%v&operator=lt&timestamp=%v", apiServerUrl, defaultEntriesCount, timestamp)
entriesUrl := fmt.Sprintf("%v/entries?limit=%v&operator=lt&timestamp=%v", apiServerUrl, defaultEntriesCount, timestamp)
requestResult, requestErr := executeHttpGetRequest(entriesUrl)
if requestErr != nil {
return fmt.Errorf("failed to get entries, err: %v", requestErr)
@@ -614,7 +614,7 @@ func TestTapNoRedact(t *testing.T) {
firstEntry := entries[0].(map[string]interface{})
entryUrl := fmt.Sprintf("%v/api/entries/%v", apiServerUrl, firstEntry["id"])
entryUrl := fmt.Sprintf("%v/entries/%v", apiServerUrl, firstEntry["id"])
requestResult, requestErr = executeHttpGetRequest(entryUrl)
if requestErr != nil {
return fmt.Errorf("failed to get entry, err: %v", requestErr)
@@ -716,7 +716,7 @@ func TestTapRegexMasking(t *testing.T) {
redactCheckFunc := func() error {
timestamp := time.Now().UnixNano() / int64(time.Millisecond)
entriesUrl := fmt.Sprintf("%v/api/entries?limit=%v&operator=lt&timestamp=%v", apiServerUrl, defaultEntriesCount, timestamp)
entriesUrl := fmt.Sprintf("%v/entries?limit=%v&operator=lt&timestamp=%v", apiServerUrl, defaultEntriesCount, timestamp)
requestResult, requestErr := executeHttpGetRequest(entriesUrl)
if requestErr != nil {
return fmt.Errorf("failed to get entries, err: %v", requestErr)
@@ -729,7 +729,7 @@ func TestTapRegexMasking(t *testing.T) {
firstEntry := entries[0].(map[string]interface{})
entryUrl := fmt.Sprintf("%v/api/entries/%v", apiServerUrl, firstEntry["id"])
entryUrl := fmt.Sprintf("%v/entries/%v", apiServerUrl, firstEntry["id"])
requestResult, requestErr = executeHttpGetRequest(entryUrl)
if requestErr != nil {
return fmt.Errorf("failed to get entry, err: %v", requestErr)
@@ -823,7 +823,7 @@ func TestTapIgnoredUserAgents(t *testing.T) {
ignoredUserAgentsCheckFunc := func() error {
timestamp := time.Now().UnixNano() / int64(time.Millisecond)
entriesUrl := fmt.Sprintf("%v/api/entries?limit=%v&operator=lt&timestamp=%v", apiServerUrl, defaultEntriesCount * 2, timestamp)
entriesUrl := fmt.Sprintf("%v/entries?limit=%v&operator=lt&timestamp=%v", apiServerUrl, defaultEntriesCount * 2, timestamp)
requestResult, requestErr := executeHttpGetRequest(entriesUrl)
if requestErr != nil {
return fmt.Errorf("failed to get entries, err: %v", requestErr)
@@ -835,7 +835,7 @@ func TestTapIgnoredUserAgents(t *testing.T) {
}
for _, entryInterface := range entries {
entryUrl := fmt.Sprintf("%v/api/entries/%v", apiServerUrl, entryInterface.(map[string]interface{})["id"])
entryUrl := fmt.Sprintf("%v/entries/%v", apiServerUrl, entryInterface.(map[string]interface{})["id"])
requestResult, requestErr = executeHttpGetRequest(entryUrl)
if requestErr != nil {
return fmt.Errorf("failed to get entry, err: %v", requestErr)

View File

@@ -12,12 +12,14 @@ import (
"strings"
"syscall"
"time"
"github.com/up9inc/mizu/shared"
)
const (
longRetriesCount = 100
shortRetriesCount = 10
defaultApiServerPort = 8899
defaultApiServerPort = shared.DefaultApiServerPort
defaultNamespaceName = "mizu-tests"
defaultServiceName = "httpbin"
defaultEntriesCount = 50

View File

@@ -12,7 +12,8 @@ Basic APIs:
`docker build . -t gcr.io/up9-docker-hub/mizu/debug:latest -f debug.Dockerfile && docker push gcr.io/up9-docker-hub/mizu/debug:latest`
### Connecting
1. Start mizu using the cli with the debug image `mizu tap --mizu-image gcr.io/up9-docker-hub/mizu/debug:latest {tapped_pod_name}`
1. Start mizu using the cli with the debug
image `mizu tap --set agent-image=gcr.io/up9-docker-hub/mizu/debug:latest {tapped_pod_name}`
2. Forward the debug port using `kubectl port-forward -n default mizu-api-server 2345:2345`
3. Run the run/debug configuration you've created earlier in Intellij.

View File

@@ -241,6 +241,8 @@ github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W
github.com/onsi/ginkgo v1.11.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7 h1:lDH9UUVJtmYCjyT0CI4q8xvlXPxeZ0gYCVvWbmPlp88=
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk=
github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231 h1:fa50YL1pzKW+1SsBnJDOHppJN9stOEwS+CRWyUtyYGU=
github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231/go.mod h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=

View File

@@ -4,15 +4,7 @@ import (
"encoding/json"
"flag"
"fmt"
"github.com/gin-contrib/static"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"github.com/romana/rlog"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/tap"
tapApi "github.com/up9inc/mizu/tap/api"
"io/ioutil"
"log"
"mizuserver/pkg/api"
"mizuserver/pkg/controllers"
"mizuserver/pkg/models"
@@ -26,6 +18,14 @@ import (
"path/filepath"
"plugin"
"sort"
"github.com/gin-contrib/static"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap"
tapApi "github.com/up9inc/mizu/tap/api"
)
var tapperMode = flag.Bool("tap", false, "Run in tapper mode without API")
@@ -40,22 +40,23 @@ var extensions []*tapApi.Extension // global
var extensionsMap map[string]*tapApi.Extension // global
func main() {
logger.InitLoggerStderrOnly()
flag.Parse()
loadExtensions()
hostMode := os.Getenv(shared.HostModeEnvVar) == "1"
tapOpts := &tap.TapOpts{HostMode: hostMode}
if !*tapperMode && !*apiServerMode && !*standaloneMode && !*harsReaderMode {
panic("One of the flags --tap, --api or --standalone or --hars-read must be provided")
}
filteringOptions := getTrafficFilteringOptions()
if *standaloneMode {
api.StartResolving(*namespace)
outputItemsChannel := make(chan *tapApi.OutputChannelItem)
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
filteringOptions := getTrafficFilteringOptions()
hostMode := os.Getenv(shared.HostModeEnvVar) == "1"
tapOpts := &tap.TapOpts{HostMode: hostMode}
tap.StartPassiveTapper(tapOpts, outputItemsChannel, extensions, filteringOptions)
go filterItems(outputItemsChannel, filteredOutputItemsChannel)
@@ -63,7 +64,7 @@ func main() {
hostApi(nil)
} else if *tapperMode {
rlog.Infof("Starting tapper, websocket address: %s", *apiServerAddress)
logger.Log.Infof("Starting tapper, websocket address: %s", *apiServerAddress)
if *apiServerAddress == "" {
panic("API server address must be provided with --api-server-address when using --tap")
}
@@ -71,16 +72,20 @@ func main() {
tapTargets := getTapTargets()
if tapTargets != nil {
tap.SetFilterAuthorities(tapTargets)
rlog.Infof("Filtering for the following authorities: %v", tap.GetFilterIPs())
logger.Log.Infof("Filtering for the following authorities: %v", tap.GetFilterIPs())
}
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
filteringOptions := getTrafficFilteringOptions()
hostMode := os.Getenv(shared.HostModeEnvVar) == "1"
tapOpts := &tap.TapOpts{HostMode: hostMode}
tap.StartPassiveTapper(tapOpts, filteredOutputItemsChannel, extensions, filteringOptions)
socketConnection, _, err := websocket.DefaultDialer.Dial(*apiServerAddress, nil)
if err != nil {
panic(fmt.Sprintf("Error connecting to socket server at %s %v", *apiServerAddress, err))
}
rlog.Infof("Connected successfully to websocket %s", *apiServerAddress)
logger.Log.Infof("Connected successfully to websocket %s", *apiServerAddress)
go pipeTapChannelToSocket(socketConnection, filteredOutputItemsChannel)
} else if *apiServerMode {
@@ -113,7 +118,7 @@ func main() {
signal.Notify(signalChan, os.Interrupt)
<-signalChan
rlog.Info("Exiting")
logger.Log.Info("Exiting")
}
func loadExtensions() {
@@ -122,13 +127,13 @@ func loadExtensions() {
files, err := ioutil.ReadDir(extensionsDir)
if err != nil {
log.Fatal(err)
logger.Log.Fatal(err)
}
extensions = make([]*tapApi.Extension, len(files))
extensionsMap = make(map[string]*tapApi.Extension)
for i, file := range files {
filename := file.Name()
rlog.Infof("Loading extension: %s\n", filename)
logger.Log.Infof("Loading extension: %s\n", filename)
extension := &tapApi.Extension{
Path: path.Join(extensionsDir, filename),
}
@@ -153,7 +158,7 @@ func loadExtensions() {
})
for _, extension := range extensions {
log.Printf("Extension Properties: %+v\n", extension)
logger.Log.Infof("Extension Properties: %+v\n", extension)
}
controllers.InitExtensionsMap(extensionsMap)
@@ -270,7 +275,7 @@ func pipeTapChannelToSocket(connection *websocket.Conn, messageDataChannel <-cha
for messageData := range messageDataChannel {
marshaledData, err := models.CreateWebsocketTappedEntryMessage(messageData)
if err != nil {
rlog.Errorf("error converting message to json %v, err: %s, (%v,%+v)", messageData, err, err, err)
logger.Log.Errorf("error converting message to json %v, err: %s, (%v,%+v)", messageData, err, err, err)
continue
}
@@ -278,7 +283,7 @@ func pipeTapChannelToSocket(connection *websocket.Conn, messageDataChannel <-cha
// and goes into the intermediate WebSocket.
err = connection.WriteMessage(websocket.TextMessage, marshaledData)
if err != nil {
rlog.Errorf("error sending message through socket server %v, err: %s, (%v,%+v)", messageData, err, err, err)
logger.Log.Errorf("error sending message through socket server %v, err: %s, (%v,%+v)", messageData, err, err, err)
continue
}
}

View File

@@ -17,7 +17,7 @@ import (
"go.mongodb.org/mongo-driver/bson/primitive"
"github.com/google/martian/har"
"github.com/romana/rlog"
"github.com/up9inc/mizu/shared/logger"
tapApi "github.com/up9inc/mizu/tap/api"
"mizuserver/pkg/models"
@@ -31,7 +31,7 @@ func StartResolving(namespace string) {
errOut := make(chan error, 100)
res, err := resolver.NewFromInCluster(errOut, namespace)
if err != nil {
rlog.Infof("error creating k8s resolver %s", err)
logger.Log.Infof("error creating k8s resolver %s", err)
return
}
ctx := context.Background()
@@ -40,7 +40,7 @@ func StartResolving(namespace string) {
for {
select {
case err := <-errOut:
rlog.Infof("name resolving error %s", err)
logger.Log.Infof("name resolving error %s", err)
}
}
}()
@@ -59,7 +59,7 @@ func StartReadingEntries(harChannel <-chan *tapApi.OutputChannelItem, workingDir
func startReadingFiles(workingDir string) {
if err := os.MkdirAll(workingDir, os.ModePerm); err != nil {
rlog.Errorf("Failed to make dir: %s, err: %v", workingDir, err)
logger.Log.Errorf("Failed to make dir: %s, err: %v", workingDir, err)
return
}
@@ -76,7 +76,7 @@ func startReadingFiles(workingDir string) {
sort.Sort(utils.ByModTime(harFiles))
if len(harFiles) == 0 {
rlog.Infof("Waiting for new files\n")
logger.Log.Infof("Waiting for new files\n")
time.Sleep(3 * time.Second)
continue
}
@@ -128,7 +128,7 @@ func resolveIP(connectionInfo *tapApi.ConnectionInfo) (resolvedSource string, re
unresolvedSource := connectionInfo.ClientIP
resolvedSource = k8sResolver.Resolve(unresolvedSource)
if resolvedSource == "" {
rlog.Debugf("Cannot find resolved name to source: %s\n", unresolvedSource)
logger.Log.Debugf("Cannot find resolved name to source: %s\n", unresolvedSource)
if os.Getenv("SKIP_NOT_RESOLVED_SOURCE") == "1" {
return
}
@@ -136,7 +136,7 @@ func resolveIP(connectionInfo *tapApi.ConnectionInfo) (resolvedSource string, re
unresolvedDestination := fmt.Sprintf("%s:%s", connectionInfo.ServerIP, connectionInfo.ServerPort)
resolvedDestination = k8sResolver.Resolve(unresolvedDestination)
if resolvedDestination == "" {
rlog.Debugf("Cannot find resolved name to dest: %s\n", unresolvedDestination)
logger.Log.Debugf("Cannot find resolved name to dest: %s\n", unresolvedDestination)
if os.Getenv("SKIP_NOT_RESOLVED_DEST") == "1" {
return
}

View File

@@ -2,13 +2,14 @@ package api
import (
"errors"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"github.com/romana/rlog"
"github.com/up9inc/mizu/shared/debounce"
"net/http"
"sync"
"time"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"github.com/up9inc/mizu/shared/debounce"
"github.com/up9inc/mizu/shared/logger"
)
type EventHandlers interface {
@@ -50,7 +51,7 @@ func WebSocketRoutes(app *gin.Engine, eventHandlers EventHandlers) {
func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers EventHandlers, isTapper bool) {
conn, err := websocketUpgrader.Upgrade(w, r, nil)
if err != nil {
rlog.Errorf("Failed to set websocket upgrade: %v", err)
logger.Log.Errorf("Failed to set websocket upgrade: %v", err)
return
}
@@ -71,7 +72,7 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even
for {
_, msg, err := conn.ReadMessage()
if err != nil {
rlog.Errorf("Error reading message, socket id: %d, error: %v", socketId, err)
logger.Log.Errorf("Error reading message, socket id: %d, error: %v", socketId, err)
break
}
eventHandlers.WebSocketMessage(socketId, msg)
@@ -81,7 +82,7 @@ func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers Even
func socketCleanup(socketId int, socketConnection *SocketConnection) {
err := socketConnection.connection.Close()
if err != nil {
rlog.Errorf("Error closing socket connection for socket id %d: %v\n", socketId, err)
logger.Log.Errorf("Error closing socket connection for socket id %d: %v\n", socketId, err)
}
websocketIdsLock.Lock()
@@ -92,7 +93,7 @@ func socketCleanup(socketId int, socketConnection *SocketConnection) {
}
var db = debounce.NewDebouncer(time.Second*5, func() {
rlog.Error("Successfully sent to socket")
logger.Log.Error("Successfully sent to socket")
})
func SendToSocket(socketId int, message []byte) error {
@@ -104,7 +105,7 @@ func SendToSocket(socketId int, message []byte) error {
var sent = false
time.AfterFunc(time.Second*5, func() {
if !sent {
rlog.Error("Socket timed out")
logger.Log.Error("Socket timed out")
socketCleanup(socketId, socketObj)
}
})

View File

@@ -10,8 +10,8 @@ import (
tapApi "github.com/up9inc/mizu/tap/api"
"github.com/romana/rlog"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
)
var browserClientSocketUUIDs = make([]int, 0)
@@ -28,10 +28,10 @@ func init() {
func (h *RoutesEventHandlers) WebSocketConnect(socketId int, isTapper bool) {
if isTapper {
rlog.Infof("Websocket event - Tapper connected, socket ID: %d", socketId)
logger.Log.Infof("Websocket event - Tapper connected, socket ID: %d", socketId)
providers.TapperAdded()
} else {
rlog.Infof("Websocket event - Browser socket connected, socket ID: %d", socketId)
logger.Log.Infof("Websocket event - Browser socket connected, socket ID: %d", socketId)
socketListLock.Lock()
browserClientSocketUUIDs = append(browserClientSocketUUIDs, socketId)
socketListLock.Unlock()
@@ -40,10 +40,10 @@ func (h *RoutesEventHandlers) WebSocketConnect(socketId int, isTapper bool) {
func (h *RoutesEventHandlers) WebSocketDisconnect(socketId int, isTapper bool) {
if isTapper {
rlog.Infof("Websocket event - Tapper disconnected, socket ID: %d", socketId)
logger.Log.Infof("Websocket event - Tapper disconnected, socket ID: %d", socketId)
providers.TapperRemoved()
} else {
rlog.Infof("Websocket event - Browser socket disconnected, socket ID: %d", socketId)
logger.Log.Infof("Websocket event - Browser socket disconnected, socket ID: %d", socketId)
socketListLock.Lock()
removeSocketUUIDFromBrowserSlice(socketId)
socketListLock.Unlock()
@@ -55,7 +55,7 @@ func BroadcastToBrowserClients(message []byte) {
go func(socketId int) {
err := SendToSocket(socketId, message)
if err != nil {
rlog.Errorf("error sending message to socket ID %d: %v", socketId, err)
logger.Log.Errorf("error sending message to socket ID %d: %v", socketId, err)
}
}(socketId)
}
@@ -65,14 +65,14 @@ func (h *RoutesEventHandlers) WebSocketMessage(_ int, message []byte) {
var socketMessageBase shared.WebSocketMessageMetadata
err := json.Unmarshal(message, &socketMessageBase)
if err != nil {
rlog.Infof("Could not unmarshal websocket message %v\n", err)
logger.Log.Infof("Could not unmarshal websocket message %v\n", err)
} else {
switch socketMessageBase.MessageType {
case shared.WebSocketMessageTypeTappedEntry:
var tappedEntryMessage models.WebSocketTappedEntryMessage
err := json.Unmarshal(message, &tappedEntryMessage)
if err != nil {
rlog.Infof("Could not unmarshal message of message type %s %v\n", socketMessageBase.MessageType, err)
logger.Log.Infof("Could not unmarshal message of message type %s %v\n", socketMessageBase.MessageType, err)
} else {
// NOTE: This is where the message comes back from the intermediate WebSocket to code.
h.SocketOutChannel <- tappedEntryMessage.Data
@@ -81,7 +81,7 @@ func (h *RoutesEventHandlers) WebSocketMessage(_ int, message []byte) {
var statusMessage shared.WebSocketStatusMessage
err := json.Unmarshal(message, &statusMessage)
if err != nil {
rlog.Infof("Could not unmarshal message of message type %s %v\n", socketMessageBase.MessageType, err)
logger.Log.Infof("Could not unmarshal message of message type %s %v\n", socketMessageBase.MessageType, err)
} else {
providers.TapStatus.Pods = statusMessage.TappingStatus.Pods
BroadcastToBrowserClients(message)
@@ -90,12 +90,12 @@ func (h *RoutesEventHandlers) WebSocketMessage(_ int, message []byte) {
var outboundLinkMessage models.WebsocketOutboundLinkMessage
err := json.Unmarshal(message, &outboundLinkMessage)
if err != nil {
rlog.Infof("Could not unmarshal message of message type %s %v\n", socketMessageBase.MessageType, err)
logger.Log.Infof("Could not unmarshal message of message type %s %v\n", socketMessageBase.MessageType, err)
} else {
handleTLSLink(outboundLinkMessage)
}
default:
rlog.Infof("Received socket message of type %s for which no handlers are defined", socketMessageBase.MessageType)
logger.Log.Infof("Received socket message of type %s for which no handlers are defined", socketMessageBase.MessageType)
}
}
}
@@ -116,9 +116,9 @@ func handleTLSLink(outboundLinkMessage models.WebsocketOutboundLinkMessage) {
}
marshaledMessage, err := json.Marshal(outboundLinkMessage)
if err != nil {
rlog.Errorf("Error marshaling outbound link message for broadcasting: %v", err)
logger.Log.Errorf("Error marshaling outbound link message for broadcasting: %v", err)
} else {
rlog.Errorf("Broadcasting outboundlink message %s", string(marshaledMessage))
logger.Log.Errorf("Broadcasting outboundlink message %s", string(marshaledMessage))
BroadcastToBrowserClients(marshaledMessage)
}
}

View File

@@ -3,19 +3,13 @@ package controllers
import (
"encoding/json"
"fmt"
"github.com/gin-gonic/gin"
tapApi "github.com/up9inc/mizu/tap/api"
"mizuserver/pkg/database"
"mizuserver/pkg/models"
"mizuserver/pkg/providers"
"mizuserver/pkg/up9"
"mizuserver/pkg/utils"
"mizuserver/pkg/validation"
"net/http"
"time"
"github.com/google/martian/har"
"github.com/gin-gonic/gin"
tapApi "github.com/up9inc/mizu/tap/api"
)
var extensionsMap map[string]*tapApi.Extension // global
@@ -62,47 +56,6 @@ func GetEntries(c *gin.Context) {
c.JSON(http.StatusOK, baseEntries)
}
func GetFullEntries(c *gin.Context) {
entriesFilter := &models.HarFetchRequestQuery{}
if err := c.BindQuery(entriesFilter); err != nil {
c.JSON(http.StatusBadRequest, err)
}
err := validation.Validate(entriesFilter)
if err != nil {
c.JSON(http.StatusBadRequest, err)
}
var timestampFrom, timestampTo int64
if entriesFilter.From < 0 {
timestampFrom = 0
} else {
timestampFrom = entriesFilter.From
}
if entriesFilter.To <= 0 {
timestampTo = time.Now().UnixNano() / int64(time.Millisecond)
} else {
timestampTo = entriesFilter.To
}
entriesArray := database.GetEntriesFromDb(timestampFrom, timestampTo, nil)
result := make([]har.Entry, 0)
for _, data := range entriesArray {
var pair tapApi.RequestResponsePair
if err := json.Unmarshal([]byte(data.Entry), &pair); err != nil {
continue
}
harEntry, err := utils.NewEntry(&pair)
if err != nil {
continue
}
result = append(result, *harEntry)
}
c.JSON(http.StatusOK, result)
}
func GetEntry(c *gin.Context) {
var entryData tapApi.MizuEntry
database.GetEntriesTable().
@@ -133,30 +86,3 @@ func GetEntry(c *gin.Context) {
IsRulesEnabled: isRulesEnabled,
})
}
func DeleteAllEntries(c *gin.Context) {
database.GetEntriesTable().
Where("1 = 1").
Delete(&tapApi.MizuEntry{})
c.JSON(http.StatusOK, map[string]string{
"msg": "Success",
})
}
func GetGeneralStats(c *gin.Context) {
c.JSON(http.StatusOK, providers.GetGeneralStats())
}
func GetTappingStatus(c *gin.Context) {
c.JSON(http.StatusOK, providers.TapStatus)
}
func AnalyzeInformation(c *gin.Context) {
c.JSON(http.StatusOK, up9.GetAnalyzeInfo())
}
func GetRecentTLSLinks(c *gin.Context) {
c.JSON(http.StatusOK, providers.GetAllRecentTLSAddresses())
}

View File

@@ -1,12 +0,0 @@
package controllers
import (
"github.com/gin-gonic/gin"
"mizuserver/pkg/holder"
"net/http"
)
func GetCurrentResolvingInformation(c *gin.Context) {
c.JSON(http.StatusOK, holder.GetResolver().GetMap())
}

View File

@@ -2,13 +2,16 @@ package controllers
import (
"encoding/json"
"github.com/gin-gonic/gin"
"github.com/romana/rlog"
"github.com/up9inc/mizu/shared"
"mizuserver/pkg/api"
"mizuserver/pkg/holder"
"mizuserver/pkg/providers"
"mizuserver/pkg/up9"
"mizuserver/pkg/validation"
"net/http"
"github.com/gin-gonic/gin"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
)
func PostTappedPods(c *gin.Context) {
@@ -21,11 +24,11 @@ func PostTappedPods(c *gin.Context) {
c.JSON(http.StatusBadRequest, err)
return
}
rlog.Infof("[Status] POST request: %d tapped pods", len(tapStatus.Pods))
logger.Log.Infof("[Status] POST request: %d tapped pods", len(tapStatus.Pods))
providers.TapStatus.Pods = tapStatus.Pods
message := shared.CreateWebSocketStatusMessage(*tapStatus)
if jsonBytes, err := json.Marshal(message); err != nil {
rlog.Errorf("Could not Marshal message %v\n", err)
logger.Log.Errorf("Could not Marshal message %v\n", err)
} else {
api.BroadcastToBrowserClients(jsonBytes)
}
@@ -44,3 +47,23 @@ func GetAuthStatus(c *gin.Context) {
c.JSON(http.StatusOK, authStatus)
}
func GetTappingStatus(c *gin.Context) {
c.JSON(http.StatusOK, providers.TapStatus)
}
func AnalyzeInformation(c *gin.Context) {
c.JSON(http.StatusOK, up9.GetAnalyzeInfo())
}
func GetGeneralStats(c *gin.Context) {
c.JSON(http.StatusOK, providers.GetGeneralStats())
}
func GetRecentTLSLinks(c *gin.Context) {
c.JSON(http.StatusOK, providers.GetAllRecentTLSAddresses())
}
func GetCurrentResolvingInformation(c *gin.Context) {
c.JSON(http.StatusOK, holder.GetResolver().GetMap())
}

View File

@@ -1,15 +1,14 @@
package database
import (
"log"
"os"
"strconv"
"time"
"github.com/fsnotify/fsnotify"
"github.com/romana/rlog"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/debounce"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/shared/units"
tapApi "github.com/up9inc/mizu/tap/api"
)
@@ -20,13 +19,13 @@ const defaultMaxDatabaseSizeBytes int64 = 200 * 1000 * 1000
func StartEnforcingDatabaseSize() {
watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Fatalf("Error creating filesystem watcher for db size enforcement: %v\n", err)
logger.Log.Fatalf("Error creating filesystem watcher for db size enforcement: %v\n", err)
return
}
maxEntriesDBByteSize, err := getMaxEntriesDBByteSize()
if err != nil {
log.Fatalf("Error parsing max db size: %v\n", err)
logger.Log.Fatalf("Error parsing max db size: %v\n", err)
return
}
@@ -48,14 +47,14 @@ func StartEnforcingDatabaseSize() {
if !ok {
return // closed channel
}
rlog.Errorf("filesystem watcher encountered error:%v", err)
logger.Log.Errorf("filesystem watcher encountered error:%v", err)
}
}
}()
err = watcher.Add(DBPath)
if err != nil {
log.Fatalf("Error adding %s to filesystem watcher for db size enforcement: %v\n", DBPath, err)
logger.Log.Fatalf("Error adding %s to filesystem watcher for db size enforcement: %v\n", DBPath, err)
}
}
@@ -73,7 +72,7 @@ func getMaxEntriesDBByteSize() (int64, error) {
func checkFileSize(maxSizeBytes int64) {
fileStat, err := os.Stat(DBPath)
if err != nil {
rlog.Errorf("Error checking %s file size: %v", DBPath, err)
logger.Log.Errorf("Error checking %s file size: %v", DBPath, err)
} else {
if fileStat.Size() > maxSizeBytes {
pruneOldEntries(fileStat.Size())
@@ -90,7 +89,7 @@ func pruneOldEntries(currentFileSize int64) {
rows, err := GetEntriesTable().Limit(10000).Order("id").Rows()
if err != nil {
rlog.Errorf("Error getting 10000 first db rows: %v", err)
logger.Log.Errorf("Error getting 10000 first db rows: %v", err)
return
}
@@ -103,7 +102,7 @@ func pruneOldEntries(currentFileSize int64) {
var entry tapApi.MizuEntry
err = DB.ScanRows(rows, &entry)
if err != nil {
rlog.Errorf("Error scanning db row: %v", err)
logger.Log.Errorf("Error scanning db row: %v", err)
continue
}
@@ -115,8 +114,8 @@ func pruneOldEntries(currentFileSize int64) {
GetEntriesTable().Where(entryIdsToRemove).Delete(tapApi.MizuEntry{})
// VACUUM causes sqlite to shrink the db file after rows have been deleted, the db file will not shrink without this
DB.Exec("VACUUM")
rlog.Errorf("Removed %d rows and cleared %s", len(entryIdsToRemove), units.BytesToHumanReadable(bytesToBeRemoved))
logger.Log.Errorf("Removed %d rows and cleared %s", len(entryIdsToRemove), units.BytesToHumanReadable(bytesToBeRemoved))
} else {
rlog.Error("Found no rows to remove when pruning")
logger.Log.Error("Found no rows to remove when pruning")
}
}

View File

@@ -22,11 +22,6 @@ type EntriesFilter struct {
Timestamp int64 `form:"timestamp" validate:"required,min=1"`
}
type HarFetchRequestQuery struct {
From int64 `form:"from"`
To int64 `form:"to"`
}
type WebSocketEntryMessage struct {
*shared.WebSocketMessageMetadata
Data *tapApi.BaseEntryDetails `json:"data,omitempty"`

View File

@@ -32,7 +32,7 @@ Now you will be able to import `github.com/up9inc/mizu/resolver` in any `.go` fi
errOut := make(chan error, 100)
k8sResolver, err := resolver.NewFromOutOfCluster("", errOut)
if err != nil {
rlog.Errorf("error creating k8s resolver %s", err)
logger.Log.Errorf("error creating k8s resolver %s", err)
}
ctx, cancel := context.WithCancel(context.Background())
@@ -40,15 +40,15 @@ k8sResolver.Start(ctx)
resolvedName := k8sResolver.Resolve("10.107.251.91") // will always return `nil` in real scenarios as the internal map takes a moment to populate after `Start` is called
if resolvedName != nil {
rlog.Errorf("resolved 10.107.251.91=%s", *resolvedName)
logger.Log.Errorf("resolved 10.107.251.91=%s", *resolvedName)
} else {
rlog.Error("Could not find a resolved name for 10.107.251.91")
logger.Log.Error("Could not find a resolved name for 10.107.251.91")
}
for {
select {
case err := <- errOut:
rlog.Errorf("name resolving error %s", err)
logger.Log.Errorf("name resolving error %s", err)
}
}
```

View File

@@ -5,7 +5,7 @@ import (
"errors"
"fmt"
"github.com/romana/rlog"
"github.com/up9inc/mizu/shared/logger"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
cmap "github.com/orcaman/concurrent-map"
@@ -157,10 +157,10 @@ func (resolver *Resolver) watchServices(ctx context.Context) error {
func (resolver *Resolver) saveResolvedName(key string, resolved string, eventType watch.EventType) {
if eventType == watch.Deleted {
resolver.nameMap.Remove(key)
rlog.Infof("setting %s=nil\n", key)
logger.Log.Infof("setting %s=nil\n", key)
} else {
resolver.nameMap.Set(key, resolved)
rlog.Infof("setting %s=%s\n", key, resolved)
logger.Log.Infof("setting %s=%s\n", key, resolved)
}
}
@@ -181,7 +181,7 @@ func (resolver *Resolver) infiniteErrorHandleRetryFunc(ctx context.Context, fun
var statusError *k8serrors.StatusError
if errors.As(err, &statusError) {
if statusError.ErrStatus.Reason == metav1.StatusReasonForbidden {
rlog.Infof("Resolver loop encountered permission error, aborting event listening - %v\n", err)
logger.Log.Infof("Resolver loop encountered permission error, aborting event listening - %v\n", err)
return
}
}

View File

@@ -7,17 +7,8 @@ import (
// EntriesRoutes defines the group of har entries routes.
func EntriesRoutes(ginApp *gin.Engine) {
routeGroup := ginApp.Group("/api")
routeGroup := ginApp.Group("/entries")
routeGroup.GET("/entries", controllers.GetEntries) // get entries (base/thin entries)
routeGroup.GET("/entries/:entryId", controllers.GetEntry) // get single (full) entry
routeGroup.GET("/exportEntries", controllers.GetFullEntries)
routeGroup.GET("/resolving", controllers.GetCurrentResolvingInformation)
routeGroup.GET("/resetDB", controllers.DeleteAllEntries) // get single (full) entry
routeGroup.GET("/generalStats", controllers.GetGeneralStats) // get general stats about entries in DB
routeGroup.GET("/tapStatus", controllers.GetTappingStatus) // get tapping status
routeGroup.GET("/analyzeStatus", controllers.AnalyzeInformation)
routeGroup.GET("/recentTLSLinks", controllers.GetRecentTLSLinks)
routeGroup.GET("/", controllers.GetEntries) // get entries (base/thin entries)
routeGroup.GET("/:entryId", controllers.GetEntry) // get single (full) entry
}

View File

@@ -9,8 +9,16 @@ func StatusRoutes(ginApp *gin.Engine) {
routeGroup := ginApp.Group("/status")
routeGroup.POST("/tappedPods", controllers.PostTappedPods)
routeGroup.GET("/tappersCount", controllers.GetTappersCount)
routeGroup.GET("/tap", controllers.GetTappingStatus)
routeGroup.GET("/auth", controllers.GetAuthStatus)
routeGroup.GET("/analyze", controllers.AnalyzeInformation)
routeGroup.GET("/general", controllers.GetGeneralStats) // get general stats about entries in DB
routeGroup.GET("/recentTLSLinks", controllers.GetRecentTLSLinks)
routeGroup.GET("/resolving", controllers.GetCurrentResolvingInformation)
}

View File

@@ -8,7 +8,7 @@ import (
"regexp"
"strings"
"github.com/romana/rlog"
"github.com/up9inc/mizu/shared/logger"
"github.com/google/martian/har"
"github.com/up9inc/mizu/shared"
@@ -69,7 +69,7 @@ func MatchRequestPolicy(harEntry har.Entry, service string) (resultPolicyToSend
if err != nil {
continue
}
rlog.Info(matchValue, rule.Value)
logger.Log.Info(matchValue, rule.Value)
} else {
val := fmt.Sprint(out)
matchValue, err = regexp.MatchString(rule.Value, val)

View File

@@ -5,12 +5,7 @@ import (
"compress/zlib"
"encoding/json"
"fmt"
"github.com/google/martian/har"
"github.com/romana/rlog"
"github.com/up9inc/mizu/shared"
tapApi "github.com/up9inc/mizu/tap/api"
"io/ioutil"
"log"
"mizuserver/pkg/database"
"mizuserver/pkg/utils"
"net/http"
@@ -18,6 +13,11 @@ import (
"regexp"
"strings"
"time"
"github.com/google/martian/har"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
tapApi "github.com/up9inc/mizu/tap/api"
)
const (
@@ -75,9 +75,6 @@ func getAuthHeader(guestMode bool) string {
func GetTrafficDumpUrl(analyzeDestination string, analyzeModel string) *url.URL {
strUrl := fmt.Sprintf("https://traffic.%s/dumpTrafficBulk/%s", analyzeDestination, analyzeModel)
if strings.HasPrefix(analyzeDestination, "http") {
strUrl = fmt.Sprintf("%s/api/workspace/dumpTrafficBulk", analyzeDestination)
}
postUrl, _ := url.Parse(strUrl)
return postUrl
}
@@ -112,14 +109,14 @@ func GetAnalyzeInfo() *shared.AnalyzeStatus {
}
func SyncEntries(syncEntriesConfig *shared.SyncEntriesConfig) error {
rlog.Infof("Sync entries - started\n")
logger.Log.Infof("Sync entries - started\n")
var (
token, model string
guestMode bool
)
if syncEntriesConfig.Token == "" {
rlog.Infof("Sync entries - creating anonymous token. env %s\n", syncEntriesConfig.Env)
logger.Log.Infof("Sync entries - creating anonymous token. env %s\n", syncEntriesConfig.Env)
guestToken, err := createAnonymousToken(syncEntriesConfig.Env)
if err != nil {
return fmt.Errorf("failed creating anonymous token, err: %v", err)
@@ -132,6 +129,11 @@ func SyncEntries(syncEntriesConfig *shared.SyncEntriesConfig) error {
token = fmt.Sprintf("bearer %s", syncEntriesConfig.Token)
model = syncEntriesConfig.Workspace
guestMode = false
logger.Log.Infof("Sync entries - upserting model. env %s, model %s\n", syncEntriesConfig.Env, model)
if err := upsertModel(token, model, syncEntriesConfig.Env); err != nil {
return fmt.Errorf("failed upserting model, err: %v", err)
}
}
modelRegex, _ := regexp.Compile("[A-Za-z0-9][-A-Za-z0-9_.]*[A-Za-z0-9]+$")
@@ -139,12 +141,37 @@ func SyncEntries(syncEntriesConfig *shared.SyncEntriesConfig) error {
return fmt.Errorf("invalid model name, model name: %s", model)
}
rlog.Infof("Sync entries - syncing. token: %s, model: %s, guest mode: %v\n", token, model, guestMode)
logger.Log.Infof("Sync entries - syncing. token: %s, model: %s, guest mode: %v\n", token, model, guestMode)
go syncEntriesImpl(token, model, syncEntriesConfig.Env, syncEntriesConfig.UploadIntervalSec, guestMode)
return nil
}
func upsertModel(token string, model string, envPrefix string) error {
upsertModelUrl, _ := url.Parse(fmt.Sprintf("https://trcc.%s/models/%s", envPrefix, model))
authHeader := getAuthHeader(false)
req := &http.Request{
Method: http.MethodPost,
URL: upsertModelUrl,
Header: map[string][]string{
authHeader: {token},
},
}
response, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("failed request to upsert model, err: %v", err)
}
// In case the model is not created (not 201) and doesn't exists (not 409)
if response.StatusCode != 201 && response.StatusCode != 409 {
return fmt.Errorf("failed request to upsert model, status code: %v", response.StatusCode)
}
return nil
}
func createAnonymousToken(envPrefix string) (*GuestToken, error) {
tokenUrl := fmt.Sprintf("https://trcc.%s/anonymous/token", envPrefix)
if strings.HasPrefix(envPrefix, "http") {
@@ -152,7 +179,7 @@ func createAnonymousToken(envPrefix string) (*GuestToken, error) {
}
token := &GuestToken{}
if err := getGuestToken(tokenUrl, token); err != nil {
rlog.Infof("Failed to get token, %s", err)
logger.Log.Infof("Failed to get token, %s", err)
return nil, err
}
return token, nil
@@ -164,7 +191,7 @@ func getGuestToken(url string, target *GuestToken) error {
return err
}
defer resp.Body.Close()
rlog.Infof("Got token from the server, starting to json decode... status code: %v", resp.StatusCode)
logger.Log.Infof("Got token from the server, starting to json decode... status code: %v", resp.StatusCode)
return json.NewDecoder(resp.Body).Decode(target)
}
@@ -182,7 +209,7 @@ func syncEntriesImpl(token string, model string, envPrefix string, uploadInterva
for {
timestampTo := time.Now().UnixNano() / int64(time.Millisecond)
rlog.Infof("Getting entries from %v, to %v\n", timestampFrom, timestampTo)
logger.Log.Infof("Getting entries from %v, to %v\n", timestampFrom, timestampTo)
protocolFilter := "http"
entriesArray := database.GetEntriesFromDb(timestampFrom, timestampTo, &protocolFilter)
@@ -207,13 +234,13 @@ func syncEntriesImpl(token string, model string, envPrefix string, uploadInterva
result = append(result, *harEntry)
}
rlog.Infof("About to upload %v entries\n", len(result))
logger.Log.Infof("About to upload %v entries\n", len(result))
body, jMarshalErr := json.Marshal(result)
if jMarshalErr != nil {
analyzeInformation.Reset()
rlog.Infof("Stopping sync entries")
log.Fatal(jMarshalErr)
logger.Log.Infof("Stopping sync entries")
logger.Log.Fatal(jMarshalErr)
}
var in bytes.Buffer
@@ -236,17 +263,17 @@ func syncEntriesImpl(token string, model string, envPrefix string, uploadInterva
if _, postErr := http.DefaultClient.Do(req); postErr != nil {
analyzeInformation.Reset()
rlog.Info("Stopping sync entries")
log.Fatal(postErr)
logger.Log.Info("Stopping sync entries")
logger.Log.Fatal(postErr)
}
analyzeInformation.SentCount += len(entriesArray)
rlog.Infof("Finish uploading %v entries to %s\n", len(entriesArray), GetTrafficDumpUrl(envPrefix, model))
logger.Log.Infof("Finish uploading %v entries to %s\n", len(entriesArray), GetTrafficDumpUrl(envPrefix, model))
} else {
rlog.Infof("Nothing to upload")
logger.Log.Infof("Nothing to upload")
}
rlog.Infof("Sleeping for %v...\n", sleepTime)
logger.Log.Infof("Sleeping for %v...\n", sleepTime)
time.Sleep(sleepTime)
timestampFrom = timestampTo
}

View File

@@ -4,12 +4,13 @@ import (
"bytes"
"errors"
"fmt"
"github.com/google/martian/har"
"github.com/romana/rlog"
"github.com/up9inc/mizu/tap/api"
"strconv"
"strings"
"time"
"github.com/google/martian/har"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap/api"
)
// Keep it because we might want cookies in the future
@@ -203,7 +204,7 @@ func NewResponse(response *api.GenericMessage) (harResponse *har.Response, err e
if strings.HasPrefix(mimeType.(string), "application/grpc") {
status, err = strconv.Atoi(_status)
if err != nil {
rlog.Errorf("Failed converting status to int %s (%v,%+v)", err, err, err)
logger.Log.Errorf("Failed converting status to int %s (%v,%+v)", err, err, err)
return nil, errors.New("failed converting response status to int for HAR")
}
}
@@ -224,13 +225,13 @@ func NewResponse(response *api.GenericMessage) (harResponse *har.Response, err e
func NewEntry(pair *api.RequestResponsePair) (*har.Entry, error) {
harRequest, err := NewRequest(&pair.Request)
if err != nil {
rlog.Errorf("Failed converting request to HAR %s (%v,%+v)", err, err, err)
logger.Log.Errorf("Failed converting request to HAR %s (%v,%+v)", err, err, err)
return nil, errors.New("failed converting request to HAR")
}
harResponse, err := NewResponse(&pair.Response)
if err != nil {
rlog.Errorf("Failed converting response to HAR %s (%v,%+v)", err, err, err)
logger.Log.Errorf("Failed converting response to HAR %s (%v,%+v)", err, err, err)
return nil, errors.New("failed converting response to HAR")
}

View File

@@ -3,10 +3,11 @@ package utils
import (
"context"
"fmt"
"github.com/romana/rlog"
"time"
loggerShared "github.com/up9inc/mizu/shared/logger"
"gorm.io/gorm/logger"
"gorm.io/gorm/utils"
"time"
)
// TruncatingLogger implements the gorm logger.Interface interface. Its purpose is to act as gorm's logger while truncating logs to a max of 50 characters to minimise the performance impact
@@ -24,21 +25,21 @@ func (truncatingLogger *TruncatingLogger) Info(_ context.Context, message string
if truncatingLogger.LogLevel < logger.Info {
return
}
rlog.Errorf("gorm info: %.150s", message)
loggerShared.Log.Errorf("gorm info: %.150s", message)
}
func (truncatingLogger *TruncatingLogger) Warn(_ context.Context, message string, __ ...interface{}) {
if truncatingLogger.LogLevel < logger.Warn {
return
}
rlog.Errorf("gorm warning: %.150s", message)
loggerShared.Log.Errorf("gorm warning: %.150s", message)
}
func (truncatingLogger *TruncatingLogger) Error(_ context.Context, message string, __ ...interface{}) {
if truncatingLogger.LogLevel < logger.Error {
return
}
rlog.Errorf("gorm error: %.150s", message)
loggerShared.Log.Errorf("gorm error: %.150s", message)
}
func (truncatingLogger *TruncatingLogger) Trace(ctx context.Context, begin time.Time, fc func() (string, int64), err error) {

View File

@@ -2,8 +2,7 @@ package utils
import (
"context"
"github.com/gin-gonic/gin"
"github.com/romana/rlog"
"fmt"
"net/http"
"net/url"
"os"
@@ -11,6 +10,10 @@ import (
"reflect"
"syscall"
"time"
"github.com/gin-gonic/gin"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
)
// StartServer starts the server with a graceful shutdown
@@ -28,16 +31,16 @@ func StartServer(app *gin.Engine) {
go func() {
_ = <-signals
rlog.Infof("Shutting down...")
logger.Log.Infof("Shutting down...")
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
_ = srv.Shutdown(ctx)
os.Exit(0)
}()
// Run server.
rlog.Infof("Starting the server...")
if err := app.Run(":8899"); err != nil {
rlog.Errorf("Server is not running! Reason: %v", err)
logger.Log.Infof("Starting the server...")
if err := app.Run(fmt.Sprintf(":%d", shared.DefaultApiServerPort)); err != nil {
logger.Log.Errorf("Server is not running! Reason: %v", err)
}
}
@@ -54,14 +57,14 @@ func ReverseSlice(data interface{}) {
func CheckErr(e error) {
if e != nil {
rlog.Errorf("%v", e)
logger.Log.Errorf("%v", e)
}
}
func SetHostname(address, newHostname string) string {
replacedUrl, err := url.Parse(address)
if err != nil {
rlog.Errorf("error replacing hostname to %s in address %s, returning original %v", newHostname, address, err)
logger.Log.Errorf("error replacing hostname to %s in address %s, returning original %v", newHostname, address, err)
return address
}
replacedUrl.Host = newHostname

View File

@@ -4,14 +4,15 @@ import (
"bytes"
"encoding/json"
"fmt"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/logger"
"github.com/up9inc/mizu/shared"
"io/ioutil"
core "k8s.io/api/core/v1"
"net/http"
"net/url"
"time"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
core "k8s.io/api/core/v1"
)
type apiServerProvider struct {
@@ -85,7 +86,7 @@ func (provider *apiServerProvider) GetGeneralStats() (map[string]interface{}, er
if !provider.isReady {
return nil, fmt.Errorf("trying to reach api server when not initialized yet")
}
generalStatsUrl := fmt.Sprintf("%s/api/generalStats", provider.url)
generalStatsUrl := fmt.Sprintf("%s/status/general", provider.url)
response, requestErr := http.Get(generalStatsUrl)
if requestErr != nil {
@@ -108,7 +109,6 @@ func (provider *apiServerProvider) GetGeneralStats() (map[string]interface{}, er
return generalStats, nil
}
func (provider *apiServerProvider) GetVersion() (string, error) {
if !provider.isReady {
return "", fmt.Errorf("trying to reach api server when not initialized yet")

View File

@@ -4,16 +4,17 @@ import (
"context"
"errors"
"fmt"
"github.com/google/uuid"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/config/configStructs"
"github.com/up9inc/mizu/cli/logger"
"github.com/up9inc/mizu/cli/uiUtils"
"golang.org/x/oauth2"
"net"
"net/http"
"os"
"time"
"github.com/google/uuid"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/config/configStructs"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared/logger"
"golang.org/x/oauth2"
)
const loginTimeoutInMin = 2

View File

@@ -11,9 +11,9 @@ import (
"github.com/up9inc/mizu/cli/config/configStructs"
"github.com/up9inc/mizu/cli/errormessage"
"github.com/up9inc/mizu/cli/kubernetes"
"github.com/up9inc/mizu/cli/logger"
"github.com/up9inc/mizu/cli/mizu"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared/logger"
)
func GetApiServerUrl() string {
@@ -46,4 +46,3 @@ func waitForFinish(ctx context.Context, cancel context.CancelFunc) {
cancel()
}
}

View File

@@ -2,13 +2,14 @@ package cmd
import (
"fmt"
"github.com/creasty/defaults"
"github.com/spf13/cobra"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/config/configStructs"
"github.com/up9inc/mizu/cli/logger"
"github.com/up9inc/mizu/cli/telemetry"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared/logger"
)
var configCmd = &cobra.Command{

View File

@@ -2,15 +2,16 @@ package cmd
import (
"context"
"github.com/creasty/defaults"
"github.com/spf13/cobra"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/config/configStructs"
"github.com/up9inc/mizu/cli/errormessage"
"github.com/up9inc/mizu/cli/kubernetes"
"github.com/up9inc/mizu/cli/logger"
"github.com/up9inc/mizu/cli/mizu/fsUtils"
"github.com/up9inc/mizu/cli/telemetry"
"github.com/up9inc/mizu/shared/logger"
)
var logsCmd = &cobra.Command{

View File

@@ -2,15 +2,16 @@ package cmd
import (
"fmt"
"time"
"github.com/creasty/defaults"
"github.com/spf13/cobra"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/logger"
"github.com/up9inc/mizu/cli/mizu"
"github.com/up9inc/mizu/cli/mizu/fsUtils"
"github.com/up9inc/mizu/cli/mizu/version"
"github.com/up9inc/mizu/cli/uiUtils"
"time"
"github.com/up9inc/mizu/shared/logger"
)
var rootCmd = &cobra.Command{
@@ -50,7 +51,7 @@ func Execute() {
if err := fsUtils.EnsureDir(mizu.GetMizuFolderPath()); err != nil {
logger.Log.Errorf("Failed to use mizu folder, %v", err)
}
logger.InitLogger()
logger.InitLogger(fsUtils.GetLogFilePath())
versionChan := make(chan string)
defer printNewVersionIfNeeded(versionChan)

View File

@@ -3,17 +3,18 @@ package cmd
import (
"errors"
"fmt"
"os"
"github.com/creasty/defaults"
"github.com/spf13/cobra"
"github.com/up9inc/mizu/cli/auth"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/config/configStructs"
"github.com/up9inc/mizu/cli/errormessage"
"github.com/up9inc/mizu/cli/logger"
"github.com/up9inc/mizu/cli/telemetry"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared"
"os"
"github.com/up9inc/mizu/shared/logger"
)
const uploadTrafficMessageToConfirm = `NOTE: running mizu with --%s flag will upload recorded traffic for further analysis and enriched presentation options.`

View File

@@ -16,8 +16,8 @@ import (
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/config/configStructs"
"github.com/up9inc/mizu/cli/errormessage"
"github.com/up9inc/mizu/cli/kubernetes"
"github.com/up9inc/mizu/cli/logger"
"github.com/up9inc/mizu/cli/mizu"
"github.com/up9inc/mizu/cli/mizu/fsUtils"
"github.com/up9inc/mizu/cli/mizu/goUtils"
@@ -25,6 +25,7 @@ import (
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/debounce"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap/api"
)
@@ -103,7 +104,7 @@ func RunMizuTap() {
}
defer finishMizuExecution(kubernetesProvider)
if err := createMizuResources(ctx, kubernetesProvider, mizuApiFilteringOptions, mizuValidationRules); err != nil {
if err := createMizuResources(ctx, kubernetesProvider, mizuValidationRules); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Error creating resources: %v", errormessage.FormatError(err)))
return
}
@@ -125,14 +126,14 @@ func readValidationRules(file string) (string, error) {
return string(newContent), nil
}
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, mizuApiFilteringOptions *api.TrafficFilteringOptions, mizuValidationRules string) error {
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, mizuValidationRules string) error {
if !config.Config.IsNsRestrictedMode() {
if err := createMizuNamespace(ctx, kubernetesProvider); err != nil {
return err
}
}
if err := createMizuApiServer(ctx, kubernetesProvider, mizuApiFilteringOptions); err != nil {
if err := createMizuApiServer(ctx, kubernetesProvider); err != nil {
return err
}
@@ -153,7 +154,7 @@ func createMizuNamespace(ctx context.Context, kubernetesProvider *kubernetes.Pro
return err
}
func createMizuApiServer(ctx context.Context, kubernetesProvider *kubernetes.Provider, mizuApiFilteringOptions *api.TrafficFilteringOptions) error {
func createMizuApiServer(ctx context.Context, kubernetesProvider *kubernetes.Provider) error {
var err error
state.mizuServiceAccountExists, err = createRBACIfNecessary(ctx, kubernetesProvider)
@@ -169,16 +170,15 @@ func createMizuApiServer(ctx context.Context, kubernetesProvider *kubernetes.Pro
}
opts := &kubernetes.ApiServerOptions{
Namespace: config.Config.MizuResourcesNamespace,
PodName: mizu.ApiServerPodName,
PodImage: config.Config.AgentImage,
ServiceAccountName: serviceAccountName,
IsNamespaceRestricted: config.Config.IsNsRestrictedMode(),
MizuApiFilteringOptions: mizuApiFilteringOptions,
SyncEntriesConfig: getSyncEntriesConfig(),
MaxEntriesDBSizeBytes: config.Config.Tap.MaxEntriesDBSizeBytes(),
Resources: config.Config.Tap.ApiServerResources,
ImagePullPolicy: config.Config.ImagePullPolicy(),
Namespace: config.Config.MizuResourcesNamespace,
PodName: mizu.ApiServerPodName,
PodImage: config.Config.AgentImage,
ServiceAccountName: serviceAccountName,
IsNamespaceRestricted: config.Config.IsNsRestrictedMode(),
SyncEntriesConfig: getSyncEntriesConfig(),
MaxEntriesDBSizeBytes: config.Config.Tap.MaxEntriesDBSizeBytes(),
Resources: config.Config.Tap.ApiServerResources,
ImagePullPolicy: config.Config.ImagePullPolicy(),
}
_, err = kubernetesProvider.CreateMizuApiServerPod(ctx, opts)
if err != nil {
@@ -296,7 +296,7 @@ func cleanUpMizuResources(ctx context.Context, cancel context.CancelFunc, kubern
}
if len(leftoverResources) > 0 {
errMsg := fmt.Sprintf("Failed to remove the following resources, for more info check logs at %s:", logger.GetLogFilePath())
errMsg := fmt.Sprintf("Failed to remove the following resources, for more info check logs at %s:", fsUtils.GetLogFilePath())
for _, resource := range leftoverResources {
errMsg += "\n- " + resource
}
@@ -563,14 +563,14 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
if modifiedPod.Status.Phase == core.PodPending {
if modifiedPod.Status.Conditions[0].Type == core.PodScheduled && modifiedPod.Status.Conditions[0].Status != core.ConditionTrue {
logger.Log.Debugf("Wasn't able to deploy the API server. Reason: \"%s\"", modifiedPod.Status.Conditions[0].Message)
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server, for more info check logs at %s", logger.GetLogFilePath()))
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server, for more info check logs at %s", fsUtils.GetLogFilePath()))
cancel()
break
}
if len(modifiedPod.Status.ContainerStatuses) > 0 && modifiedPod.Status.ContainerStatuses[0].State.Waiting != nil && modifiedPod.Status.ContainerStatuses[0].State.Waiting.Reason == "ErrImagePull" {
logger.Log.Debugf("Wasn't able to deploy the API server. (ErrImagePull) Reason: \"%s\"", modifiedPod.Status.ContainerStatuses[0].State.Waiting.Message)
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server: failed to pull the image, for more info check logs at %v", logger.GetLogFilePath()))
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Wasn't able to deploy the API server: failed to pull the image, for more info check logs at %v", fsUtils.GetLogFilePath()))
cancel()
break
}
@@ -582,7 +582,7 @@ func watchApiServerPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
url := GetApiServerUrl()
if err := apiserver.Provider.InitAndTestConnection(url); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Couldn't connect to API server, for more info check logs at %s", logger.GetLogFilePath()))
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Couldn't connect to API server, for more info check logs at %s", fsUtils.GetLogFilePath()))
cancel()
break
}

View File

@@ -1,13 +1,14 @@
package cmd
import (
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/config/configStructs"
"github.com/up9inc/mizu/cli/logger"
"github.com/up9inc/mizu/cli/telemetry"
"strconv"
"time"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/config/configStructs"
"github.com/up9inc/mizu/cli/telemetry"
"github.com/up9inc/mizu/shared/logger"
"github.com/creasty/defaults"
"github.com/spf13/cobra"
"github.com/up9inc/mizu/cli/mizu"

View File

@@ -8,10 +8,11 @@ import (
"github.com/up9inc/mizu/cli/apiserver"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/kubernetes"
"github.com/up9inc/mizu/cli/logger"
"github.com/up9inc/mizu/cli/mizu"
"github.com/up9inc/mizu/cli/mizu/fsUtils"
"github.com/up9inc/mizu/cli/mizu/version"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared/logger"
)
func runMizuView() {
@@ -50,7 +51,7 @@ func runMizuView() {
go startProxyReportErrorIfAny(kubernetesProvider, cancel)
if err := apiserver.Provider.InitAndTestConnection(GetApiServerUrl()); err != nil {
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Couldn't connect to API server, for more info check logs at %s", logger.GetLogFilePath()))
logger.Log.Errorf(uiUtils.Error, fmt.Sprintf("Couldn't connect to API server, for more info check logs at %s", fsUtils.GetLogFilePath()))
return
}
}

View File

@@ -3,14 +3,15 @@ package config
import (
"errors"
"fmt"
"github.com/up9inc/mizu/cli/logger"
"github.com/up9inc/mizu/shared"
"io/ioutil"
"os"
"reflect"
"strconv"
"strings"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/shared/logger"
"github.com/creasty/defaults"
"github.com/spf13/cobra"
"github.com/spf13/pflag"

View File

@@ -12,7 +12,7 @@ import (
"strconv"
"github.com/up9inc/mizu/cli/config/configStructs"
"github.com/up9inc/mizu/cli/logger"
"github.com/up9inc/mizu/shared/logger"
"io"
@@ -146,28 +146,22 @@ func (provider *Provider) CreateNamespace(ctx context.Context, name string) (*co
}
type ApiServerOptions struct {
Namespace string
PodName string
PodImage string
ServiceAccountName string
IsNamespaceRestricted bool
MizuApiFilteringOptions *api.TrafficFilteringOptions
SyncEntriesConfig *shared.SyncEntriesConfig
MaxEntriesDBSizeBytes int64
Resources configStructs.Resources
ImagePullPolicy core.PullPolicy
Namespace string
PodName string
PodImage string
ServiceAccountName string
IsNamespaceRestricted bool
SyncEntriesConfig *shared.SyncEntriesConfig
MaxEntriesDBSizeBytes int64
Resources configStructs.Resources
ImagePullPolicy core.PullPolicy
}
func (provider *Provider) CreateMizuApiServerPod(ctx context.Context, opts *ApiServerOptions) (*core.Pod, error) {
marshaledFilteringOptions, err := json.Marshal(opts.MizuApiFilteringOptions)
if err != nil {
return nil, err
}
var marshaledSyncEntriesConfig []byte
if opts.SyncEntriesConfig != nil {
marshaledSyncEntriesConfig, err = json.Marshal(opts.SyncEntriesConfig)
if err != nil {
var err error
if marshaledSyncEntriesConfig, err = json.Marshal(opts.SyncEntriesConfig); err != nil {
return nil, err
}
}
@@ -199,6 +193,8 @@ func (provider *Provider) CreateMizuApiServerPod(ctx context.Context, opts *ApiS
command = append(command, "--namespace", opts.Namespace)
}
port := intstr.FromInt(shared.DefaultApiServerPort)
pod := &core.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: opts.PodName,
@@ -219,14 +215,6 @@ func (provider *Provider) CreateMizuApiServerPod(ctx context.Context, opts *ApiS
},
Command: command,
Env: []core.EnvVar{
{
Name: shared.HostModeEnvVar,
Value: "1",
},
{
Name: shared.MizuFilteringOptionsEnvVar,
Value: string(marshaledFilteringOptions),
},
{
Name: shared.SyncEntriesConfigEnvVar,
Value: string(marshaledSyncEntriesConfig),
@@ -246,6 +234,25 @@ func (provider *Provider) CreateMizuApiServerPod(ctx context.Context, opts *ApiS
"memory": memRequests,
},
},
ReadinessProbe: &core.Probe{
Handler: core.Handler{
TCPSocket: &core.TCPSocketAction{
Port: port,
},
},
InitialDelaySeconds: 5,
PeriodSeconds: 10,
},
LivenessProbe: &core.Probe{
Handler: core.Handler{
HTTPGet: &core.HTTPGetAction{
Path: "/echo",
Port: port,
},
},
InitialDelaySeconds: 5,
PeriodSeconds: 10,
},
},
},
Volumes: []core.Volume{
@@ -274,7 +281,7 @@ func (provider *Provider) CreateService(ctx context.Context, namespace string, s
Namespace: namespace,
},
Spec: core.ServiceSpec{
Ports: []core.ServicePort{{TargetPort: intstr.FromInt(8899), Port: 80}},
Ports: []core.ServicePort{{TargetPort: intstr.FromInt(shared.DefaultApiServerPort), Port: 80}},
Type: core.ServiceTypeClusterIP,
Selector: map[string]string{"app": appLabelValue},
},

View File

@@ -2,12 +2,13 @@ package kubernetes
import (
"fmt"
"github.com/up9inc/mizu/cli/logger"
"k8s.io/kubectl/pkg/proxy"
"net"
"net/http"
"strings"
"time"
"github.com/up9inc/mizu/shared/logger"
"k8s.io/kubectl/pkg/proxy"
)
const k8sProxyApiPrefix = "/"

View File

@@ -4,14 +4,20 @@ import (
"archive/zip"
"context"
"fmt"
"os"
"path"
"regexp"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/kubernetes"
"github.com/up9inc/mizu/cli/logger"
"github.com/up9inc/mizu/cli/mizu"
"os"
"regexp"
"github.com/up9inc/mizu/shared/logger"
)
func GetLogFilePath() string {
return path.Join(mizu.GetMizuFolderPath(), "mizu_cli.log")
}
func DumpLogs(ctx context.Context, provider *kubernetes.Provider, filePath string) error {
podExactRegex := regexp.MustCompile("^" + mizu.MizuResourcesPrefix)
pods, err := provider.ListAllPodsMatchingRegex(ctx, podExactRegex, []string{config.Config.MizuResourcesNamespace})
@@ -66,10 +72,10 @@ func DumpLogs(ctx context.Context, provider *kubernetes.Provider, filePath strin
logger.Log.Debugf("Successfully added file %s", config.Config.ConfigFilePath)
}
if err := AddFileToZip(zipWriter, logger.GetLogFilePath()); err != nil {
if err := AddFileToZip(zipWriter, GetLogFilePath()); err != nil {
logger.Log.Debugf("Failed write file, %v", err)
} else {
logger.Log.Debugf("Successfully added file %s", logger.GetLogFilePath())
logger.Log.Debugf("Successfully added file %s", GetLogFilePath())
}
logger.Log.Infof("You can find the zip file with all logs in %s\n", filePath)

View File

@@ -3,11 +3,12 @@ package fsUtils
import (
"archive/zip"
"fmt"
"github.com/up9inc/mizu/cli/logger"
"io"
"os"
"path/filepath"
"strings"
"github.com/up9inc/mizu/shared/logger"
)
func AddFileToZip(zipWriter *zip.Writer, filename string) error {

View File

@@ -1,9 +1,10 @@
package goUtils
import (
"github.com/up9inc/mizu/cli/logger"
"reflect"
"runtime/debug"
"github.com/up9inc/mizu/shared/logger"
)
func HandleExcWrapper(fn interface{}, params ...interface{}) (result []reflect.Value) {

View File

@@ -3,15 +3,16 @@ package version
import (
"context"
"fmt"
"github.com/up9inc/mizu/cli/apiserver"
"github.com/up9inc/mizu/cli/logger"
"github.com/up9inc/mizu/cli/mizu"
"io/ioutil"
"net/http"
"runtime"
"strings"
"time"
"github.com/up9inc/mizu/cli/apiserver"
"github.com/up9inc/mizu/cli/mizu"
"github.com/up9inc/mizu/shared/logger"
"github.com/google/go-github/v37/github"
"github.com/up9inc/mizu/cli/uiUtils"
"github.com/up9inc/mizu/shared/semver"

View File

@@ -4,12 +4,13 @@ import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"github.com/denisbrodbeck/machineid"
"github.com/up9inc/mizu/cli/apiserver"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/logger"
"github.com/up9inc/mizu/cli/mizu"
"net/http"
"github.com/up9inc/mizu/shared/logger"
)
const telemetryUrl = "https://us-east4-up9-prod.cloudfunctions.net/mizu-telemetry"

View File

@@ -3,9 +3,10 @@ package uiUtils
import (
"bufio"
"fmt"
"log"
"os"
"strings"
"github.com/up9inc/mizu/shared/logger"
)
func AskForConfirmation(s string) bool {
@@ -15,7 +16,7 @@ func AskForConfirmation(s string) bool {
response, err := reader.ReadString('\n')
if err != nil {
log.Fatal(err)
logger.Log.Fatalf("Error while reading confirmation string, err: %v", err)
}
response = strings.ToLower(strings.TrimSpace(response))
if response == "" || response == "y" || response == "yes" {

View File

@@ -2,9 +2,10 @@ package uiUtils
import (
"fmt"
"github.com/up9inc/mizu/cli/logger"
"os/exec"
"runtime"
"github.com/up9inc/mizu/shared/logger"
)
func OpenBrowser(url string) {

View File

@@ -10,4 +10,5 @@ const (
RulePolicyPath = "/app/enforce-policy/"
RulePolicyFileName = "enforce-policy.yaml"
GoGCEnvVar = "GOGC"
DefaultApiServerPort = 8899
)

View File

@@ -5,5 +5,6 @@ go 1.16
require (
github.com/docker/go-units v0.4.0
github.com/golang-jwt/jwt/v4 v4.1.0
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b
)

View File

@@ -2,6 +2,8 @@ github.com/docker/go-units v0.4.0 h1:3uh0PgVws3nIA0Q+MwDC8yjEPf9zjRfZZWXZYDct3Tw
github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/golang-jwt/jwt/v4 v4.1.0 h1:XUgk2Ex5veyVFVeLm0xhusUTQybEbexJXrvPNOKkSY0=
github.com/golang-jwt/jwt/v4 v4.1.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg=
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7 h1:lDH9UUVJtmYCjyT0CI4q8xvlXPxeZ0gYCVvWbmPlp88=
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=

View File

@@ -1,24 +1,18 @@
package logger
import (
"github.com/op/go-logging"
"github.com/up9inc/mizu/cli/mizu"
"os"
"path"
"github.com/op/go-logging"
)
var Log = logging.MustGetLogger("mizu_cli")
var Log = logging.MustGetLogger("mizu")
var format = logging.MustStringFormatter(
`%{time} %{level:.5s} ▶ %{pid} %{shortfile} %{shortfunc} ▶ %{message}`,
)
func GetLogFilePath() string {
return path.Join(mizu.GetMizuFolderPath(), "mizu_cli.log")
}
func InitLogger() {
logPath := GetLogFilePath()
func InitLogger(logPath string) {
f, err := os.OpenFile(logPath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
Log.Infof("Failed to open mizu log file: %v, err %v", logPath, err)
@@ -33,7 +27,15 @@ func InitLogger() {
backend1Leveled.SetLevel(logging.INFO, "")
logging.SetBackend(backend1Leveled, backend2Formatter)
Log.Debugf("\n\n\n")
Log.Debugf("Running mizu version %v", mizu.SemVer)
}
func InitLoggerStderrOnly() {
consoleLog := logging.NewLogBackend(os.Stderr, "", 0)
backend1Formatter := logging.NewBackendFormatter(consoleLog, format)
backend1Leveled := logging.AddModuleLevel(consoleLog)
backend1Leveled.SetLevel(logging.DEBUG, "")
logging.SetBackend(backend1Leveled, backend1Formatter)
}

View File

@@ -5,7 +5,7 @@ import (
"time"
"github.com/google/gopacket/reassembly"
"github.com/romana/rlog"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap/api"
)
@@ -28,7 +28,7 @@ func (cl *Cleaner) clean() {
startCleanTime := time.Now()
cl.assemblerMutex.Lock()
rlog.Debugf("Assembler Stats before cleaning %s", cl.assembler.Dump())
logger.Log.Debugf("Assembler Stats before cleaning %s", cl.assembler.Dump())
flushed, closed := cl.assembler.FlushCloseOlderThan(startCleanTime.Add(-cl.connectionTimeout))
cl.assemblerMutex.Unlock()
@@ -38,7 +38,7 @@ func (cl *Cleaner) clean() {
}
cl.statsMutex.Lock()
rlog.Debugf("Assembler Stats after cleaning %s", cl.assembler.Dump())
logger.Log.Debugf("Assembler Stats after cleaning %s", cl.assembler.Dump())
cl.stats.flushed += flushed
cl.stats.closed += closed
cl.statsMutex.Unlock()

View File

@@ -14,7 +14,6 @@ import (
"flag"
"fmt"
"io"
"log"
"os"
"os/signal"
"runtime"
@@ -25,14 +24,13 @@ import (
"sync"
"time"
"github.com/romana/rlog"
"github.com/google/gopacket"
"github.com/google/gopacket/examples/util"
"github.com/google/gopacket/ip4defrag"
"github.com/google/gopacket/layers" // pulls in all layers decoders
"github.com/google/gopacket/pcap"
"github.com/google/gopacket/reassembly"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap/api"
)
@@ -102,7 +100,7 @@ const baseStreamChannelTimeoutMs int = 5000 * 100
/* minOutputLevel: Error will be printed only if outputLevel is above this value
* t: key for errorsMap (counting errors)
* s, a: arguments log.Printf
* s, a: arguments logger.Log.Infof
* Note: Too bad for perf that a... is evaluated
*/
func logError(minOutputLevel int, t string, s string, a ...interface{}) {
@@ -114,7 +112,7 @@ func logError(minOutputLevel int, t string, s string, a ...interface{}) {
if outputLevel >= minOutputLevel {
formatStr := fmt.Sprintf("%s: %s", t, s)
rlog.Errorf(formatStr, a...)
logger.Log.Errorf(formatStr, a...)
}
}
func Error(t string, s string, a ...interface{}) {
@@ -124,10 +122,10 @@ func SilentError(t string, s string, a ...interface{}) {
logError(2, t, s, a...)
}
func Debug(s string, a ...interface{}) {
rlog.Debugf(s, a...)
logger.Log.Debugf(s, a...)
}
func Trace(s string, a ...interface{}) {
rlog.Tracef(1, s, a...)
logger.Log.Infof(s, a...)
}
func inArrayInt(arr []int, valueToCheck int) bool {
@@ -188,11 +186,11 @@ func startMemoryProfiler() {
}
}
rlog.Info("Profiling is on, results will be written to %s", dumpPath)
logger.Log.Info("Profiling is on, results will be written to %s", dumpPath)
go func() {
if _, err := os.Stat(dumpPath); os.IsNotExist(err) {
if err := os.Mkdir(dumpPath, 0777); err != nil {
log.Fatal("could not create directory for profile: ", err)
logger.Log.Fatal("could not create directory for profile: ", err)
}
}
@@ -201,15 +199,15 @@ func startMemoryProfiler() {
filename := fmt.Sprintf("%s/%s__mem.prof", dumpPath, t.Format("15_04_05"))
rlog.Infof("Writing memory profile to %s\n", filename)
logger.Log.Infof("Writing memory profile to %s\n", filename)
f, err := os.Create(filename)
if err != nil {
log.Fatal("could not create memory profile: ", err)
logger.Log.Fatal("could not create memory profile: ", err)
}
runtime.GC() // get up-to-date statistics
if err := pprof.WriteHeapProfile(f); err != nil {
log.Fatal("could not write memory profile: ", err)
logger.Log.Fatal("could not write memory profile: ", err)
}
_ = f.Close()
time.Sleep(time.Second * time.Duration(timeInterval))
@@ -229,7 +227,7 @@ func closeTimedoutTcpStreamChannels() {
if !stream.isClosed && time.Now().After(streamWrapper.createdAt.Add(TcpStreamChannelTimeoutMs)) {
stream.Close()
appStats.IncDroppedTcpStreams()
rlog.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d\n", appStats.DroppedTcpStreams, runtime.NumGoroutine(), TcpStreamChannelTimeoutMs/1000000)
logger.Log.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d\n", appStats.DroppedTcpStreams, runtime.NumGoroutine(), TcpStreamChannelTimeoutMs/1000000)
}
} else {
if !stream.superIdentifier.IsClosedOthers {
@@ -254,7 +252,6 @@ func closeTimedoutTcpStreamChannels() {
}
func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
log.SetFlags(log.LstdFlags | log.LUTC | log.Lshortfile)
go closeTimedoutTcpStreamChannels()
defer util.Run()()
@@ -269,8 +266,8 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
if localhostIPs, err := getLocalhostIPs(); err != nil {
// TODO: think this over
rlog.Info("Failed to get self IP addresses")
rlog.Errorf("Getting-Self-Address", "Error getting self ip address: %s (%v,%+v)", err, err, err)
logger.Log.Info("Failed to get self IP addresses")
logger.Log.Errorf("Getting-Self-Address", "Error getting self ip address: %s (%v,%+v)", err, err, err)
ownIps = make([]string, 0)
} else {
ownIps = localhostIPs
@@ -280,7 +277,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
var err error
if *fname != "" {
if handle, err = pcap.OpenOffline(*fname); err != nil {
log.Fatalf("PCAP OpenOffline error: %v", err)
logger.Log.Fatalf("PCAP OpenOffline error: %v", err)
}
} else {
// This is a little complicated because we want to allow all possible options
@@ -288,33 +285,33 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
// just call pcap.OpenLive if you want a simple handle.
inactive, err := pcap.NewInactiveHandle(*iface)
if err != nil {
log.Fatalf("could not create: %v", err)
logger.Log.Fatalf("could not create: %v", err)
}
defer inactive.CleanUp()
if err = inactive.SetSnapLen(*snaplen); err != nil {
log.Fatalf("could not set snap length: %v", err)
logger.Log.Fatalf("could not set snap length: %v", err)
} else if err = inactive.SetPromisc(*promisc); err != nil {
log.Fatalf("could not set promisc mode: %v", err)
logger.Log.Fatalf("could not set promisc mode: %v", err)
} else if err = inactive.SetTimeout(time.Second); err != nil {
log.Fatalf("could not set timeout: %v", err)
logger.Log.Fatalf("could not set timeout: %v", err)
}
if *tstype != "" {
if t, err := pcap.TimestampSourceFromString(*tstype); err != nil {
log.Fatalf("Supported timestamp types: %v", inactive.SupportedTimestamps())
logger.Log.Fatalf("Supported timestamp types: %v", inactive.SupportedTimestamps())
} else if err := inactive.SetTimestampSource(t); err != nil {
log.Fatalf("Supported timestamp types: %v", inactive.SupportedTimestamps())
logger.Log.Fatalf("Supported timestamp types: %v", inactive.SupportedTimestamps())
}
}
if handle, err = inactive.Activate(); err != nil {
log.Fatalf("PCAP Activate error: %v", err)
logger.Log.Fatalf("PCAP Activate error: %v", err)
}
defer handle.Close()
}
if len(flag.Args()) > 0 {
bpffilter := strings.Join(flag.Args(), " ")
rlog.Infof("Using BPF filter %q", bpffilter)
logger.Log.Infof("Using BPF filter %q", bpffilter)
if err = handle.SetBPFFilter(bpffilter); err != nil {
log.Fatalf("BPF filter error: %v", err)
logger.Log.Fatalf("BPF filter error: %v", err)
}
}
@@ -325,12 +322,12 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
decoderName = fmt.Sprintf("%s", handle.LinkType())
}
if dec, ok = gopacket.DecodersByLayerName[decoderName]; !ok {
log.Fatalln("No decoder named", decoderName)
logger.Log.Fatal("No decoder named", decoderName)
}
source := gopacket.NewPacketSource(handle, dec)
source.Lazy = *lazy
source.NoCopy = true
rlog.Info("Starting to read packets")
logger.Log.Info("Starting to read packets")
appStats.SetStartTime(time.Now())
defragger := ip4defrag.NewIPv4Defragmenter()
@@ -347,7 +344,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
maxBufferedPagesTotal := GetMaxBufferedPagesPerConnection()
maxBufferedPagesPerConnection := GetMaxBufferedPagesTotal()
rlog.Infof("Assembler options: maxBufferedPagesTotal=%d, maxBufferedPagesPerConnection=%d", maxBufferedPagesTotal, maxBufferedPagesPerConnection)
logger.Log.Infof("Assembler options: maxBufferedPagesTotal=%d, maxBufferedPagesPerConnection=%d", maxBufferedPagesTotal, maxBufferedPagesPerConnection)
assembler.AssemblerOptions.MaxBufferedPagesTotal = maxBufferedPagesTotal
assembler.AssemblerOptions.MaxBufferedPagesPerConnection = maxBufferedPagesPerConnection
@@ -377,7 +374,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
errorMapLen := len(errorsMap)
errorsSummery := fmt.Sprintf("%v", errorsMap)
errorsMapMutex.Unlock()
log.Printf("%v (errors: %v, errTypes:%v) - Errors Summary: %s",
logger.Log.Infof("%v (errors: %v, errTypes:%v) - Errors Summary: %s",
time.Since(appStats.StartTime),
nErrors,
errorMapLen,
@@ -387,7 +384,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
// At this moment
memStats := runtime.MemStats{}
runtime.ReadMemStats(&memStats)
log.Printf(
logger.Log.Infof(
"mem: %d, goroutines: %d",
memStats.HeapAlloc,
runtime.NumGoroutine(),
@@ -395,7 +392,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
// Since the last print
cleanStats := cleaner.dumpStats()
log.Printf(
logger.Log.Infof(
"cleaner - flushed connections: %d, closed connections: %d, deleted messages: %d",
cleanStats.flushed,
cleanStats.closed,
@@ -403,7 +400,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
)
currentAppStats := appStats.DumpStats()
appStatsJSON, _ := json.Marshal(currentAppStats)
log.Printf("app stats - %v", string(appStatsJSON))
logger.Log.Infof("app stats - %v", string(appStatsJSON))
}
}()
@@ -416,15 +413,15 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
if err == io.EOF {
break
} else if err != nil {
rlog.Debugf("Error:", err)
logger.Log.Debugf("Error: %v", err)
continue
}
packetsCount := appStats.IncPacketsCount()
rlog.Debugf("PACKET #%d", packetsCount)
logger.Log.Debugf("PACKET #%d", packetsCount)
data := packet.Data()
appStats.UpdateProcessedBytes(uint64(len(data)))
if *hexdumppkt {
rlog.Debugf("Packet content (%d/0x%x) - %s", len(data), len(data), hex.Dump(data))
logger.Log.Debugf("Packet content (%d/0x%x) - %s", len(data), len(data), hex.Dump(data))
}
// defrag the IPv4 packet if required
@@ -437,17 +434,17 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
l := ip4.Length
newip4, err := defragger.DefragIPv4(ip4)
if err != nil {
log.Fatalln("Error while de-fragmenting", err)
logger.Log.Fatal("Error while de-fragmenting", err)
} else if newip4 == nil {
rlog.Debugf("Fragment...")
logger.Log.Debugf("Fragment...")
continue // packet fragment, we don't have whole packet yet.
}
if newip4.Length != l {
stats.ipdefrag++
rlog.Debugf("Decoding re-assembled packet: %s", newip4.NextLayerType())
logger.Log.Debugf("Decoding re-assembled packet: %s", newip4.NextLayerType())
pb, ok := packet.(gopacket.PacketBuilder)
if !ok {
log.Panic("Not a PacketBuilder")
logger.Log.Panic("Not a PacketBuilder")
}
nextDecoder := newip4.NextLayerType()
_ = nextDecoder.Decode(newip4.Payload, pb)
@@ -461,14 +458,14 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
if *checksum {
err := tcp.SetNetworkLayerForChecksum(packet.NetworkLayer())
if err != nil {
log.Fatalf("Failed to set network layer for checksum: %s\n", err)
logger.Log.Fatalf("Failed to set network layer for checksum: %s\n", err)
}
}
c := Context{
CaptureInfo: packet.Metadata().CaptureInfo,
}
stats.totalsz += len(tcp.Payload)
rlog.Debugf("%s : %v -> %s : %v", packet.NetworkLayer().NetworkFlow().Src(), tcp.SrcPort, packet.NetworkLayer().NetworkFlow().Dst(), tcp.DstPort)
logger.Log.Debugf("%s : %v -> %s : %v", packet.NetworkLayer().NetworkFlow().Src(), tcp.SrcPort, packet.NetworkLayer().NetworkFlow().Dst(), tcp.DstPort)
assemblerMutex.Lock()
assembler.AssembleWithContext(packet.NetworkLayer().NetworkFlow(), tcp, &c)
assemblerMutex.Unlock()
@@ -479,7 +476,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
errorsMapMutex.Lock()
errorMapLen := len(errorsMap)
errorsMapMutex.Unlock()
log.Printf("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)",
logger.Log.Infof("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)",
appStats.PacketsCount,
appStats.ProcessedBytes,
time.Since(appStats.StartTime),
@@ -488,7 +485,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
}
select {
case <-signalChan:
log.Printf("Caught SIGINT: aborting")
logger.Log.Infof("Caught SIGINT: aborting")
done = true
default:
// NOP: continue
@@ -501,7 +498,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
assemblerMutex.Lock()
closed := assembler.FlushAll()
assemblerMutex.Unlock()
rlog.Debugf("Final flush: %d closed", closed)
logger.Log.Debugf("Final flush: %d closed", closed)
if outputLevel >= 2 {
streamPool.Dump()
}
@@ -509,7 +506,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
if *memprofile != "" {
f, err := os.Create(*memprofile)
if err != nil {
log.Fatal(err)
logger.Log.Fatal(err)
}
_ = pprof.WriteHeapProfile(f)
_ = f.Close()
@@ -517,29 +514,29 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
streamFactory.WaitGoRoutines()
assemblerMutex.Lock()
rlog.Debugf("%s", assembler.Dump())
logger.Log.Debugf("%s", assembler.Dump())
assemblerMutex.Unlock()
if !*nodefrag {
log.Printf("IPdefrag:\t\t%d", stats.ipdefrag)
logger.Log.Infof("IPdefrag:\t\t%d", stats.ipdefrag)
}
log.Printf("TCP stats:")
log.Printf(" missed bytes:\t\t%d", stats.missedBytes)
log.Printf(" total packets:\t\t%d", stats.pkt)
log.Printf(" rejected FSM:\t\t%d", stats.rejectFsm)
log.Printf(" rejected Options:\t%d", stats.rejectOpt)
log.Printf(" reassembled bytes:\t%d", stats.sz)
log.Printf(" total TCP bytes:\t%d", stats.totalsz)
log.Printf(" conn rejected FSM:\t%d", stats.rejectConnFsm)
log.Printf(" reassembled chunks:\t%d", stats.reassembled)
log.Printf(" out-of-order packets:\t%d", stats.outOfOrderPackets)
log.Printf(" out-of-order bytes:\t%d", stats.outOfOrderBytes)
log.Printf(" biggest-chunk packets:\t%d", stats.biggestChunkPackets)
log.Printf(" biggest-chunk bytes:\t%d", stats.biggestChunkBytes)
log.Printf(" overlap packets:\t%d", stats.overlapPackets)
log.Printf(" overlap bytes:\t\t%d", stats.overlapBytes)
log.Printf("Errors: %d", nErrors)
logger.Log.Infof("TCP stats:")
logger.Log.Infof(" missed bytes:\t\t%d", stats.missedBytes)
logger.Log.Infof(" total packets:\t\t%d", stats.pkt)
logger.Log.Infof(" rejected FSM:\t\t%d", stats.rejectFsm)
logger.Log.Infof(" rejected Options:\t%d", stats.rejectOpt)
logger.Log.Infof(" reassembled bytes:\t%d", stats.sz)
logger.Log.Infof(" total TCP bytes:\t%d", stats.totalsz)
logger.Log.Infof(" conn rejected FSM:\t%d", stats.rejectConnFsm)
logger.Log.Infof(" reassembled chunks:\t%d", stats.reassembled)
logger.Log.Infof(" out-of-order packets:\t%d", stats.outOfOrderPackets)
logger.Log.Infof(" out-of-order bytes:\t%d", stats.outOfOrderBytes)
logger.Log.Infof(" biggest-chunk packets:\t%d", stats.biggestChunkPackets)
logger.Log.Infof(" biggest-chunk bytes:\t%d", stats.biggestChunkBytes)
logger.Log.Infof(" overlap packets:\t%d", stats.overlapPackets)
logger.Log.Infof(" overlap bytes:\t\t%d", stats.overlapBytes)
logger.Log.Infof("Errors: %d", nErrors)
for e := range errorsMap {
log.Printf(" %s:\t\t%d", e, errorsMap[e])
logger.Log.Infof(" %s:\t\t%d", e, errorsMap[e])
}
log.Printf("AppStats: %v", GetStats())
logger.Log.Infof("AppStats: %v", GetStats())
}

View File

@@ -9,7 +9,7 @@ import (
"time"
"github.com/bradleyfalzon/tlsx"
"github.com/romana/rlog"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap/api"
)
@@ -79,7 +79,7 @@ func (h *tcpReader) Read(p []byte) (int, error) {
clientHello := tlsx.ClientHello{}
err := clientHello.Unmarshall(msg.bytes)
if err == nil {
rlog.Debugf("Detected TLS client hello with SNI %s\n", clientHello.SNI)
logger.Log.Debugf("Detected TLS client hello with SNI %s\n", clientHello.SNI)
// TODO: Throws `panic: runtime error: invalid memory address or nil pointer dereference` error.
// numericPort, _ := strconv.Atoi(h.tcpID.DstPort)
// h.outboundLinkWriter.WriteOutboundLink(h.tcpID.SrcIP, h.tcpID.DstIP, numericPort, clientHello.SNI, TLSProtocol)

View File

@@ -5,7 +5,7 @@ import (
"sync"
"time"
"github.com/romana/rlog"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap/api"
"github.com/google/gopacket"
@@ -33,7 +33,7 @@ var streams *sync.Map = &sync.Map{} // global
var streamId int64 = 0
func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.TCP, ac reassembly.AssemblerContext) reassembly.Stream {
rlog.Debugf("* NEW: %s %s", net, transport)
logger.Log.Debugf("* NEW: %s %s", net, transport)
fsmOptions := reassembly.TCPSimpleFSMOptions{
SupportMissingEstablishment: *allowmissinginit,
}
@@ -123,21 +123,21 @@ func (factory *tcpStreamFactory) WaitGoRoutines() {
func (factory *tcpStreamFactory) getStreamProps(srcIP string, srcPort string, dstIP string, dstPort string) *streamProps {
if hostMode {
if inArrayString(gSettings.filterAuthorities, fmt.Sprintf("%s:%s", dstIP, dstPort)) {
rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ host1 %s:%s", dstIP, dstPort))
logger.Log.Debugf("getStreamProps %s", fmt.Sprintf("+ host1 %s:%s", dstIP, dstPort))
return &streamProps{isTapTarget: true, isOutgoing: false}
} else if inArrayString(gSettings.filterAuthorities, dstIP) {
rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ host2 %s", dstIP))
logger.Log.Debugf("getStreamProps %s", fmt.Sprintf("+ host2 %s", dstIP))
return &streamProps{isTapTarget: true, isOutgoing: false}
} else if inArrayString(gSettings.filterAuthorities, fmt.Sprintf("%s:%s", srcIP, srcPort)) {
rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ host3 %s:%s", srcIP, srcPort))
logger.Log.Debugf("getStreamProps %s", fmt.Sprintf("+ host3 %s:%s", srcIP, srcPort))
return &streamProps{isTapTarget: true, isOutgoing: true}
} else if inArrayString(gSettings.filterAuthorities, srcIP) {
rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ host4 %s", srcIP))
logger.Log.Debugf("getStreamProps %s", fmt.Sprintf("+ host4 %s", srcIP))
return &streamProps{isTapTarget: true, isOutgoing: true}
}
return &streamProps{isTapTarget: false, isOutgoing: false}
} else {
rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ notHost3 %s:%s -> %s:%s", srcIP, srcPort, dstIP, dstPort))
logger.Log.Debugf("getStreamProps %s", fmt.Sprintf("+ notHost3 %s:%s -> %s:%s", srcIP, srcPort, dstIP, dstPort))
return &streamProps{isTapTarget: true}
}
}

View File

@@ -35,7 +35,7 @@ const App = () => {
try {
const recentTLSLinks = await api.getRecentTLSLinks();
if (recentTLSLinks?.length > 0) {
setAddressesWithTLS(new Set([...addressesWithTLS, ...recentTLSLinks]));
setAddressesWithTLS(new Set(recentTLSLinks));
setShowTLSWarning(true);
}
const auth = await api.getAuthStatus();
@@ -45,7 +45,7 @@ const App = () => {
}
})();
},[]);
}, []);
const onTLSDetected = (destAddress: string) => {
addressesWithTLS.add(destAddress);

View File

@@ -22,27 +22,27 @@ export default class Api {
}
tapStatus = async () => {
const response = await this.client.get("/api/tapStatus");
const response = await this.client.get("/status/tap");
return response.data;
}
analyzeStatus = async () => {
const response = await this.client.get("/api/analyzeStatus");
const response = await this.client.get("/status/analyze");
return response.data;
}
getEntry = async (entryId) => {
const response = await this.client.get(`/api/entries/${entryId}`);
const response = await this.client.get(`/entries/${entryId}`);
return response.data;
}
fetchEntries = async (operator, timestamp) => {
const response = await this.client.get(`/api/entries?limit=50&operator=${operator}&timestamp=${timestamp}`);
const response = await this.client.get(`/entries?limit=50&operator=${operator}&timestamp=${timestamp}`);
return response.data;
}
getRecentTLSLinks = async () => {
const response = await this.client.get("/api/recentTLSLinks");
const response = await this.client.get("/status/recentTLSLinks");
return response.data;
}