Compare commits

..

15 Commits

Author SHA1 Message Date
M. Mert Yıldıran
7dca1ad889 Move stats_tracker.go into the extension API and increment MatchedPairs from inside the Emit method (#272)
* Move `stats_tracker.go` into the extension API and increment `MatchedPairs` from inside the `Emit` method

* Replace multiple `sync.Mutex`(es) with low-level atomic memory primitives
2021-09-13 16:22:16 +03:00
M. Mert Yıldıran
616eccb2cf Make ScrollableFeed virtualized by replacing react-scrollable-feed with react-scrollable-feed-virtualized (#268)
* Make `ScrollableFeed` virtualized by replacing `react-scrollable-feed` with `react-scrollable-feed-virtualized`

* fix get new entries button

* Fix the not populated `Protocol` struct in case of `GetEntries` endpoint is called

Co-authored-by: Liraz Yehezkel <lirazy@up9.com>
2021-09-13 16:18:43 +03:00
M. Mert Yıldıran
30f07479cb Fix the JSON style to camelCase and rename CONTRIBUTE.md to CONTRIBUTING.md (#274)
* Fix the JSON style to camelCase and rename `CONTRIBUTE.md` to `CONTRIBUTING.md`

* Move `CONTRIBUTING.md` to `.github/` directory
2021-09-13 10:55:46 +03:00
M. Mert Yıldıran
7f880417e9 Add CODE_OF_CONDUCT.md (#275) 2021-09-13 10:28:28 +03:00
RoyUP9
6b52458642 removed exit if browser not supported (#270) 2021-09-12 16:09:04 +03:00
M. Mert Yıldıran
858a64687d Stop the hanging Goroutines by dropping the old, unidentified TCP streams (#260)
* Close the hanging TCP message channels after a dynamically aligned timeout (base `10000` milliseconds)

* Bring back `source.Lazy`

* Add a one more `sync.Map.Delete` call

* Improve the formula by taking base Goroutine count into account

* Reduce duplication

* Include the dropped TCP streams count into the stats tracker and print a debug log whenever it happens

* Add `superIdentifier` field to `tcpStream` to check if it has identified

Also stop the other protocol dissectors if a TCP stream identified by a protocol.

* Take one step forward in fixing the channel closing issue (WIP)

Add `sync.Mutex` to `tcpReader` and make the loops reference based.

* Fix the channel closing issue

* Improve the accuracy of the formula, log better and multiply `baseStreamChannelTimeoutMs` by 100

* Remove `fmt.Printf`

* Replace `runtime.Gosched()` with `time.Sleep(1 * time.Millisecond)`

* Close the channels of other protocols in case of an identification

* Simplify the logic

* Replace the formula with hard timeout 5000 milliseconds and 4000 maximum number of Goroutines
2021-09-12 08:26:48 +03:00
M. Mert Yıldıran
819ccf54cd Fix the build error in PR validation (#269)
* Fix the build error in PR validation (temp)

* Set Go version to 1.16

* Revert "Fix the build error in PR validation (temp)"

This reverts commit 4cb613251c.
2021-09-11 21:23:15 +03:00
M. Mert Yıldıran
7cc077c8a0 Fix the memory exhaustion by optimizing max. AMQP message size and GOGC (#257)
* Permanently resolve the memory exhaustion in AMQP

Introduce;
- `MEMORY_PROFILING_DUMP_PATH`
- `MEMORY_PROFILING_TIME_INTERVAL`
environment variables and make `startMemoryProfiler` method more parameterized.

* Fix a leak in HTTP

* Revert "Fix a leak in HTTP"

This reverts commit 9d46820ff3.

* Set maximum AMQP message size to 16MB

* Set `GOGC` to 12800

* Remove some commented out lines and an unnecessary `else if`
2021-09-09 17:45:37 +03:00
Igor Gov
fae5f22d25 Adding a download command if new mizu version is detected (#267) 2021-09-08 22:28:07 +03:00
M. Mert Yıldıran
eba7a3b476 Temporarily fix the filtering into its original state (#266) 2021-09-07 09:50:33 +03:00
RoyUP9
cf231538f4 added panic catch on tests (#265) 2021-09-06 11:42:47 +03:00
gadotroee
073b0b72d3 Remove fetch command (and direction) (#264) 2021-09-06 10:16:04 +03:00
gadotroee
c8705822b3 TRA-3658 - Fix analysis feature (#261) 2021-09-06 09:46:49 +03:00
M. Mert Yıldıran
d4436d9f15 Turn table and body strings to constants and move them to extension API (#262) 2021-09-05 06:44:16 +03:00
M. Mert Yıldıran
4e0ff74944 Fix body size, receive (elapsed time) and timestamps (#258)
* Fix the HTTP body size (it's not applicable to AMQP and Kafka)

* Fix the elapsed time

* Change JSON fields from snake_case to camelCase
2021-09-04 17:15:39 +03:00
53 changed files with 1167 additions and 23581 deletions

76
.github/CODE_OF_CONDUCT.md vendored Normal file
View File

@@ -0,0 +1,76 @@
# Contributor Covenant Code of Conduct
## Our Pledge
In the interest of fostering an open and welcoming environment, we as
contributors and maintainers pledge to making participation in our project and
our community a harassment-free experience for everyone, regardless of age, body
size, disability, ethnicity, sex characteristics, gender identity and expression,
level of experience, education, socio-economic status, nationality, personal
appearance, race, religion, or sexual identity and orientation.
## Our Standards
Examples of behavior that contributes to creating a positive environment
include:
* Using welcoming and inclusive language
* Being respectful of differing viewpoints and experiences
* Gracefully accepting constructive criticism
* Focusing on what is best for the community
* Showing empathy towards other community members
Examples of unacceptable behavior by participants include:
* The use of sexualized language or imagery and unwelcome sexual attention or
advances
* Trolling, insulting/derogatory comments, and personal or political attacks
* Public or private harassment
* Publishing others' private information, such as a physical or electronic
address, without explicit permission
* Other conduct which could reasonably be considered inappropriate in a
professional setting
## Our Responsibilities
Project maintainers are responsible for clarifying the standards of acceptable
behavior and are expected to take appropriate and fair corrective action in
response to any instances of unacceptable behavior.
Project maintainers have the right and responsibility to remove, edit, or
reject comments, commits, code, wiki edits, issues, and other contributions
that are not aligned to this Code of Conduct, or to ban temporarily or
permanently any contributor for other behaviors that they deem inappropriate,
threatening, offensive, or harmful.
## Scope
This Code of Conduct applies both within project spaces and in public spaces
when an individual is representing the project or its community. Examples of
representing a project or community include using an official project e-mail
address, posting via an official social media account, or acting as an appointed
representative at an online or offline event. Representation of a project may be
further defined and clarified by project maintainers.
## Enforcement
Instances of abusive, harassing, or otherwise unacceptable behavior may be
reported by contacting the project team at mizu@up9.com. All
complaints will be reviewed and investigated and will result in a response that
is deemed necessary and appropriate to the circumstances. The project team is
obligated to maintain confidentiality with regard to the reporter of an incident.
Further details of specific enforcement policies may be posted separately.
Project maintainers who do not follow or enforce the Code of Conduct in good
faith may face temporary or permanent repercussions as determined by other
members of the project's leadership.
## Attribution
This Code of Conduct is adapted from the [Contributor Covenant][homepage], version 1.4,
available at https://www.contributor-covenant.org/version/1/4/code-of-conduct.html
[homepage]: https://www.contributor-covenant.org
For answers to common questions about this code of conduct, see
https://www.contributor-covenant.org/faq

View File

@@ -1,18 +1,20 @@
![Mizu: The API Traffic Viewer for Kubernetes](assets/mizu-logo.svg)
# CONTRIBUTE
# Contributing to Mizu
We welcome code contributions from the community.
Please read and follow the guidelines below.
## Communication
* Before starting work on a major feature, please reach out to us via [GitHub](https://github.com/up9inc/mizu), [Slack](https://join.slack.com/share/zt-u6bbs3pg-X1zhQOXOH0yEoqILgH~csw), [email](mailto:mizu@up9.com), etc. We will make sure no one else is already working on it. A _major feature_ is defined as any change that is > 100 LOC altered (not including tests), or changes any user-facing behavior
* Small patches and bug fixes don't need prior communication.
## Contribution requirements
## Contribution Requirements
* Code style - most of the code is written in Go, please follow [these guidelines](https://golang.org/doc/effective_go)
* Go-tools compatible (`go get`, `go test`, etc)
* Unit-test coverage cant go down ..
* Go-tools compatible (`go get`, `go test`, etc.)
* Code coverage for unit tests must not decrease.
* Code must be usefully commented. Not only for developers on the project, but also for external users of these packages
* When reviewing PRs, you are encouraged to use Golang's [code review comments page](https://github.com/golang/go/wiki/CodeReviewComments)
* Project follows [Google JSON Style Guide](https://google.github.io/styleguide/jsoncstyleguide.xml) for the REST APIs that are provided.

View File

@@ -18,7 +18,7 @@ jobs:
- name: Set up Go 1.16
uses: actions/setup-go@v2
with:
go-version: '^1.16'
go-version: '1.16'
- name: Check out code into the Go module directory
uses: actions/checkout@v2
@@ -33,7 +33,7 @@ jobs:
- name: Set up Go 1.16
uses: actions/setup-go@v2
with:
go-version: '^1.16'
go-version: '1.16'
- name: Check out code into the Go module directory
uses: actions/checkout@v2

3
.gitignore vendored
View File

@@ -26,3 +26,6 @@ build
# Environment variables
.env
# pprof
pprof/*

View File

@@ -4,7 +4,6 @@ import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os/exec"
"strings"
@@ -12,7 +11,7 @@ import (
"time"
)
func TestTapAndFetch(t *testing.T) {
func TestTap(t *testing.T) {
if testing.Short() {
t.Skip("ignored acceptance test")
}
@@ -93,37 +92,6 @@ func TestTapAndFetch(t *testing.T) {
t.Errorf("%v", err)
return
}
fetchCmdArgs := getDefaultFetchCommandArgs()
fetchCmd := exec.Command(cliPath, fetchCmdArgs...)
t.Logf("running command: %v", fetchCmd.String())
if err := fetchCmd.Start(); err != nil {
t.Errorf("failed to start fetch command, err: %v", err)
return
}
harCheckFunc := func() error {
harBytes, readFileErr := ioutil.ReadFile("./unknown_source.har")
if readFileErr != nil {
return fmt.Errorf("failed to read har file, err: %v", readFileErr)
}
harEntries, err := getEntriesFromHarBytes(harBytes)
if err != nil {
return fmt.Errorf("failed to get entries from har, err: %v", err)
}
if len(harEntries) == 0 {
return fmt.Errorf("unexpected har entries result - Expected more than 0 entries")
}
return nil
}
if err := retriesExecute(shortRetriesCount, harCheckFunc); err != nil {
t.Errorf("%v", err)
return
}
})
}
}

View File

@@ -76,13 +76,6 @@ func getDefaultTapNamespace() []string {
return []string{"-n", "mizu-tests"}
}
func getDefaultFetchCommandArgs() []string {
fetchCommand := "fetch"
defaultCmdArgs := getDefaultCommandArgs()
return append([]string{fetchCommand}, defaultCmdArgs...)
}
func getDefaultConfigCommandArgs() []string {
configCommand := "config"
defaultCmdArgs := getDefaultCommandArgs()
@@ -91,10 +84,10 @@ func getDefaultConfigCommandArgs() []string {
}
func retriesExecute(retriesCount int, executeFunc func() error) error {
var lastError error
var lastError interface{}
for i := 0; i < retriesCount; i++ {
if err := executeFunc(); err != nil {
if err := tryExecuteFunc(executeFunc); err != nil {
lastError = err
time.Sleep(1 * time.Second)
@@ -107,6 +100,16 @@ func retriesExecute(retriesCount int, executeFunc func() error) error {
return fmt.Errorf("reached max retries count, retries count: %v, last err: %v", retriesCount, lastError)
}
func tryExecuteFunc(executeFunc func() error) (err interface{}) {
defer func() {
if panicErr := recover(); panicErr != nil {
err = panicErr
}
}()
return executeFunc()
}
func waitTapPodsReady(apiServerUrl string) error {
resolvingUrl := fmt.Sprintf("%v/status/tappersCount", apiServerUrl)
tapPodsReadyFunc := func() error {
@@ -179,19 +182,6 @@ func cleanupCommand(cmd *exec.Cmd) error {
return nil
}
func getEntriesFromHarBytes(harBytes []byte) ([]interface{}, error) {
harInterface, convertErr := jsonBytesToInterface(harBytes)
if convertErr != nil {
return nil, convertErr
}
har := harInterface.(map[string]interface{})
harLog := har["log"].(map[string]interface{})
harEntries := harLog["entries"].([]interface{})
return harEntries, nil
}
func getPods(tapStatusInterface interface{}) ([]map[string]interface{}, error) {
tapStatus := tapStatusInterface.(map[string]interface{})
podsInterface := tapStatus["pods"].([]interface{})

View File

@@ -1,7 +1,6 @@
# mizu agent
Agent for MIZU (API server and tapper)
Basic APIs:
* /fetch - retrieve traffic data
* /stats - retrieve statistics of collected data
* /viewer - web ui

View File

@@ -3,6 +3,7 @@ package controllers
import (
"encoding/json"
"fmt"
"github.com/google/martian/har"
"mizuserver/pkg/database"
"mizuserver/pkg/models"
"mizuserver/pkg/providers"
@@ -10,11 +11,9 @@ import (
"mizuserver/pkg/utils"
"mizuserver/pkg/validation"
"net/http"
"strings"
"time"
"github.com/gin-gonic/gin"
"github.com/google/martian/har"
"github.com/romana/rlog"
tapApi "github.com/up9inc/mizu/tap/api"
@@ -48,7 +47,7 @@ func GetEntries(c *gin.Context) {
Find(&entries)
if len(entries) > 0 && order == database.OrderDesc {
// the entries always order from oldest to newest so we should revers
// the entries always order from oldest to newest - we should reverse
utils.ReverseSlice(entries)
}
@@ -64,93 +63,6 @@ func GetEntries(c *gin.Context) {
c.JSON(http.StatusOK, baseEntries)
}
func GetHARs(c *gin.Context) {
entriesFilter := &models.HarFetchRequestQuery{}
order := database.OrderDesc
if err := c.BindQuery(entriesFilter); err != nil {
c.JSON(http.StatusBadRequest, err)
}
err := validation.Validate(entriesFilter)
if err != nil {
c.JSON(http.StatusBadRequest, err)
}
var timestampFrom, timestampTo int64
if entriesFilter.From < 0 {
timestampFrom = 0
} else {
timestampFrom = entriesFilter.From
}
if entriesFilter.To <= 0 {
timestampTo = time.Now().UnixNano() / int64(time.Millisecond)
} else {
timestampTo = entriesFilter.To
}
var entries []tapApi.MizuEntry
database.GetEntriesTable().
Where(fmt.Sprintf("timestamp BETWEEN %v AND %v", timestampFrom, timestampTo)).
Order(fmt.Sprintf("timestamp %s", order)).
Find(&entries)
if len(entries) > 0 {
// the entries always order from oldest to newest so we should revers
utils.ReverseSlice(entries)
}
harsObject := map[string]*models.ExtendedHAR{}
for _, entryData := range entries {
var harEntry har.Entry
_ = json.Unmarshal([]byte(entryData.Entry), &harEntry)
if entryData.ResolvedDestination != "" {
harEntry.Request.URL = utils.SetHostname(harEntry.Request.URL, entryData.ResolvedDestination)
}
var fileName string
sourceOfEntry := entryData.ResolvedSource
if sourceOfEntry != "" {
// naively assumes the proper service source is http
sourceOfEntry = fmt.Sprintf("http://%s", sourceOfEntry)
//replace / from the file name cause they end up creating a corrupted folder
fileName = fmt.Sprintf("%s.har", strings.ReplaceAll(sourceOfEntry, "/", "_"))
} else {
fileName = "unknown_source.har"
}
if harOfSource, ok := harsObject[fileName]; ok {
harOfSource.Log.Entries = append(harOfSource.Log.Entries, &harEntry)
} else {
var entriesHar []*har.Entry
entriesHar = append(entriesHar, &harEntry)
harsObject[fileName] = &models.ExtendedHAR{
Log: &models.ExtendedLog{
Version: "1.2",
Creator: &models.ExtendedCreator{
Creator: &har.Creator{
Name: "mizu",
Version: "0.0.2",
},
},
Entries: entriesHar,
},
}
// leave undefined when no source is present, otherwise modeler assumes source is empty string ""
if sourceOfEntry != "" {
harsObject[fileName].Log.Creator.Source = &sourceOfEntry
}
}
}
retObj := map[string][]byte{}
for k, v := range harsObject {
bytesData, _ := json.Marshal(v)
retObj[k] = bytesData
}
buffer := utils.ZipData(retObj)
c.Data(http.StatusOK, "application/octet-stream", buffer.Bytes())
}
func UploadEntries(c *gin.Context) {
rlog.Infof("Upload entries - started\n")
@@ -202,15 +114,21 @@ func GetFullEntries(c *gin.Context) {
timestampTo = entriesFilter.To
}
entriesArray := database.GetEntriesFromDb(timestampFrom, timestampTo)
result := make([]models.FullEntryDetails, 0)
entriesArray := database.GetEntriesFromDb(timestampFrom, timestampTo, nil)
result := make([]har.Entry, 0)
for _, data := range entriesArray {
harEntry := models.FullEntryDetails{}
if err := models.GetEntry(&data, &harEntry); err != nil {
var pair tapApi.RequestResponsePair
if err := json.Unmarshal([]byte(data.Entry), &pair); err != nil {
continue
}
result = append(result, harEntry)
harEntry, err := utils.NewEntry(&pair)
if err != nil {
continue
}
result = append(result, *harEntry)
}
c.JSON(http.StatusOK, result)
}
@@ -220,27 +138,12 @@ func GetEntry(c *gin.Context) {
Where(map[string]string{"entryId": c.Param("entryId")}).
First(&entryData)
fullEntry := models.FullEntryDetails{}
if err := models.GetEntry(&entryData, &fullEntry); err != nil {
c.JSON(http.StatusInternalServerError, map[string]interface{}{
"error": true,
"msg": "Can't get entry details",
})
}
// FIXME: Fix the part below
// fullEntryWithPolicy := models.FullEntryWithPolicy{}
// if err := models.GetEntry(&entryData, &fullEntryWithPolicy); err != nil {
// c.JSON(http.StatusInternalServerError, map[string]interface{}{
// "error": true,
// "msg": "Can't get entry details",
// })
// }
extension := extensionsMap[entryData.ProtocolName]
protocol, representation, _ := extension.Dissector.Represent(&entryData)
protocol, representation, bodySize, _ := extension.Dissector.Represent(&entryData)
c.JSON(http.StatusOK, tapApi.MizuEntryWrapper{
Protocol: protocol,
Representation: string(representation),
BodySize: bodySize,
Data: entryData,
})
}

View File

@@ -57,10 +57,16 @@ func initDataBase(databasePath string) *gorm.DB {
return temp
}
func GetEntriesFromDb(timestampFrom int64, timestampTo int64) []tapApi.MizuEntry {
func GetEntriesFromDb(timestampFrom int64, timestampTo int64, protocolName *string) []tapApi.MizuEntry {
order := OrderDesc
protocolNameCondition := "1 = 1"
if protocolName != nil {
protocolNameCondition = fmt.Sprintf("protocolKey = '%s'", *protocolName)
}
var entries []tapApi.MizuEntry
GetEntriesTable().
Where(protocolNameCondition).
Where(fmt.Sprintf("timestamp BETWEEN %v AND %v", timestampFrom, timestampTo)).
Order(fmt.Sprintf("timestamp %s", order)).
Find(&entries)

View File

@@ -2,9 +2,7 @@ package models
import (
"encoding/json"
tapApi "github.com/up9inc/mizu/tap/api"
"mizuserver/pkg/rules"
"mizuserver/pkg/utils"
@@ -17,47 +15,14 @@ func GetEntry(r *tapApi.MizuEntry, v tapApi.DataUnmarshaler) error {
return v.UnmarshalData(r)
}
func NewApplicableRules(status bool, latency int64, number int) tapApi.ApplicableRules {
ar := tapApi.ApplicableRules{}
ar.Status = status
ar.Latency = latency
ar.NumberOfRules = number
return ar
}
type FullEntryDetails struct {
har.Entry
}
type FullEntryDetailsExtra struct {
har.Entry
}
func (fed *FullEntryDetails) UnmarshalData(entry *tapApi.MizuEntry) error {
if err := json.Unmarshal([]byte(entry.Entry), &fed.Entry); err != nil {
return err
}
if entry.ResolvedDestination != "" {
fed.Entry.Request.URL = utils.SetHostname(fed.Entry.Request.URL, entry.ResolvedDestination)
}
return nil
}
func (fedex *FullEntryDetailsExtra) UnmarshalData(entry *tapApi.MizuEntry) error {
if err := json.Unmarshal([]byte(entry.Entry), &fedex.Entry); err != nil {
return err
}
if entry.ResolvedSource != "" {
fedex.Entry.Request.Headers = append(fedex.Request.Headers, har.Header{Name: "x-mizu-source", Value: entry.ResolvedSource})
}
if entry.ResolvedDestination != "" {
fedex.Entry.Request.Headers = append(fedex.Request.Headers, har.Header{Name: "x-mizu-destination", Value: entry.ResolvedDestination})
fedex.Entry.Request.URL = utils.SetHostname(fedex.Entry.Request.URL, entry.ResolvedDestination)
}
return nil
}
// TODO: until we fixed the Rules feature
//func NewApplicableRules(status bool, latency int64, number int) tapApi.ApplicableRules {
// ar := tapApi.ApplicableRules{}
// ar.Status = status
// ar.Latency = latency
// ar.NumberOfRules = number
// return ar
//}
type EntriesFilter struct {
Limit int `form:"limit" validate:"required,min=1,max=200"`
@@ -147,9 +112,15 @@ type FullEntryWithPolicy struct {
}
func (fewp *FullEntryWithPolicy) UnmarshalData(entry *tapApi.MizuEntry) error {
if err := json.Unmarshal([]byte(entry.Entry), &fewp.Entry); err != nil {
var pair tapApi.RequestResponsePair
if err := json.Unmarshal([]byte(entry.Entry), &pair); err != nil {
return err
}
harEntry, err := utils.NewEntry(&pair)
if err != nil {
return err
}
fewp.Entry = *harEntry
_, resultPolicyToSend := rules.MatchRequestPolicy(fewp.Entry, entry.Service)
fewp.RulesMatched = resultPolicyToSend
@@ -157,9 +128,10 @@ func (fewp *FullEntryWithPolicy) UnmarshalData(entry *tapApi.MizuEntry) error {
return nil
}
func RunValidationRulesState(harEntry har.Entry, service string) tapApi.ApplicableRules {
numberOfRules, resultPolicyToSend := rules.MatchRequestPolicy(harEntry, service)
statusPolicyToSend, latency, numberOfRules := rules.PassedValidationRules(resultPolicyToSend, numberOfRules)
ar := NewApplicableRules(statusPolicyToSend, latency, numberOfRules)
return ar
}
// TODO: until we fixed the Rules feature
//func RunValidationRulesState(harEntry har.Entry, service string) tapApi.ApplicableRules {
// numberOfRules, resultPolicyToSend := rules.MatchRequestPolicy(harEntry, service)
// statusPolicyToSend, latency, numberOfRules := rules.PassedValidationRules(resultPolicyToSend, numberOfRules)
// ar := NewApplicableRules(statusPolicyToSend, latency, numberOfRules)
// return ar
//}

View File

@@ -15,8 +15,6 @@ func EntriesRoutes(ginApp *gin.Engine) {
routeGroup.GET("/uploadEntries", controllers.UploadEntries)
routeGroup.GET("/resolving", controllers.GetCurrentResolvingInformation)
routeGroup.GET("/har", controllers.GetHARs)
routeGroup.GET("/resetDB", controllers.DeleteAllEntries) // get single (full) entry
routeGroup.GET("/generalStats", controllers.GetGeneralStats) // get general stats about entries in DB

View File

@@ -5,12 +5,14 @@ import (
"compress/zlib"
"encoding/json"
"fmt"
"github.com/google/martian/har"
"github.com/romana/rlog"
"github.com/up9inc/mizu/shared"
tapApi "github.com/up9inc/mizu/tap/api"
"io/ioutil"
"log"
"mizuserver/pkg/database"
"mizuserver/pkg/models"
"mizuserver/pkg/utils"
"net/http"
"net/url"
"strings"
@@ -129,21 +131,33 @@ func UploadEntriesImpl(token string, model string, envPrefix string, sleepInterv
for {
timestampTo := time.Now().UnixNano() / int64(time.Millisecond)
rlog.Infof("Getting entries from %v, to %v\n", timestampFrom, timestampTo)
entriesArray := database.GetEntriesFromDb(timestampFrom, timestampTo)
protocolFilter := "http"
entriesArray := database.GetEntriesFromDb(timestampFrom, timestampTo, &protocolFilter)
if len(entriesArray) > 0 {
fullEntriesExtra := make([]models.FullEntryDetailsExtra, 0)
result := make([]har.Entry, 0)
for _, data := range entriesArray {
harEntry := models.FullEntryDetailsExtra{}
if err := models.GetEntry(&data, &harEntry); err != nil {
var pair tapApi.RequestResponsePair
if err := json.Unmarshal([]byte(data.Entry), &pair); err != nil {
continue
}
fullEntriesExtra = append(fullEntriesExtra, harEntry)
harEntry, err := utils.NewEntry(&pair)
if err != nil {
continue
}
if data.ResolvedSource != "" {
harEntry.Request.Headers = append(harEntry.Request.Headers, har.Header{Name: "x-mizu-source", Value: data.ResolvedSource})
}
if data.ResolvedDestination != "" {
harEntry.Request.Headers = append(harEntry.Request.Headers, har.Header{Name: "x-mizu-destination", Value: data.ResolvedDestination})
harEntry.Request.URL = utils.SetHostname(harEntry.Request.URL, data.ResolvedDestination)
}
result = append(result, *harEntry)
}
rlog.Infof("About to upload %v entries\n", len(fullEntriesExtra))
body, jMarshalErr := json.Marshal(fullEntriesExtra)
rlog.Infof("About to upload %v entries\n", len(result))
body, jMarshalErr := json.Marshal(result)
if jMarshalErr != nil {
analyzeInformation.Reset()
rlog.Infof("Stopping analyzing")

259
agent/pkg/utils/har.go Normal file
View File

@@ -0,0 +1,259 @@
package utils
import (
"bytes"
"errors"
"fmt"
"strconv"
"strings"
"time"
"github.com/google/martian/har"
"github.com/up9inc/mizu/tap"
"github.com/up9inc/mizu/tap/api"
)
// Keep it because we might want cookies in the future
//func BuildCookies(rawCookies []interface{}) []har.Cookie {
// cookies := make([]har.Cookie, 0, len(rawCookies))
//
// for _, cookie := range rawCookies {
// c := cookie.(map[string]interface{})
// expiresStr := ""
// if c["expires"] != nil {
// expiresStr = c["expires"].(string)
// }
// expires, _ := time.Parse(time.RFC3339, expiresStr)
// httpOnly := false
// if c["httponly"] != nil {
// httpOnly, _ = strconv.ParseBool(c["httponly"].(string))
// }
// secure := false
// if c["secure"] != nil {
// secure, _ = strconv.ParseBool(c["secure"].(string))
// }
// path := ""
// if c["path"] != nil {
// path = c["path"].(string)
// }
// domain := ""
// if c["domain"] != nil {
// domain = c["domain"].(string)
// }
//
// cookies = append(cookies, har.Cookie{
// Name: c["name"].(string),
// Value: c["value"].(string),
// Path: path,
// Domain: domain,
// HTTPOnly: httpOnly,
// Secure: secure,
// Expires: expires,
// Expires8601: expiresStr,
// })
// }
//
// return cookies
//}
func BuildHeaders(rawHeaders []interface{}) ([]har.Header, string, string, string, string, string) {
var host, scheme, authority, path, status string
headers := make([]har.Header, 0, len(rawHeaders))
for _, header := range rawHeaders {
h := header.(map[string]interface{})
headers = append(headers, har.Header{
Name: h["name"].(string),
Value: h["value"].(string),
})
if h["name"] == "Host" {
host = h["value"].(string)
}
if h["name"] == ":authority" {
authority = h["value"].(string)
}
if h["name"] == ":scheme" {
scheme = h["value"].(string)
}
if h["name"] == ":path" {
path = h["value"].(string)
}
if h["name"] == ":status" {
path = h["value"].(string)
}
}
return headers, host, scheme, authority, path, status
}
func BuildPostParams(rawParams []interface{}) []har.Param {
params := make([]har.Param, 0, len(rawParams))
for _, param := range rawParams {
p := param.(map[string]interface{})
name := ""
if p["name"] != nil {
name = p["name"].(string)
}
value := ""
if p["value"] != nil {
value = p["value"].(string)
}
fileName := ""
if p["fileName"] != nil {
fileName = p["fileName"].(string)
}
contentType := ""
if p["contentType"] != nil {
contentType = p["contentType"].(string)
}
params = append(params, har.Param{
Name: name,
Value: value,
Filename: fileName,
ContentType: contentType,
})
}
return params
}
func NewRequest(request *api.GenericMessage) (harRequest *har.Request, err error) {
reqDetails := request.Payload.(map[string]interface{})["details"].(map[string]interface{})
headers, host, scheme, authority, path, _ := BuildHeaders(reqDetails["headers"].([]interface{}))
cookies := make([]har.Cookie, 0) // BuildCookies(reqDetails["cookies"].([]interface{}))
postData, _ := reqDetails["postData"].(map[string]interface{})
mimeType, _ := postData["mimeType"]
if mimeType == nil || len(mimeType.(string)) == 0 {
mimeType = "text/html"
}
text, _ := postData["text"]
postDataText := ""
if text != nil {
postDataText = text.(string)
}
queryString := make([]har.QueryString, 0)
for _, _qs := range reqDetails["queryString"].([]interface{}) {
qs := _qs.(map[string]interface{})
queryString = append(queryString, har.QueryString{
Name: qs["name"].(string),
Value: qs["value"].(string),
})
}
url := fmt.Sprintf("http://%s%s", host, reqDetails["url"].(string))
if strings.HasPrefix(mimeType.(string), "application/grpc") {
url = fmt.Sprintf("%s://%s%s", scheme, authority, path)
}
harParams := make([]har.Param, 0)
if postData["params"] != nil {
harParams = BuildPostParams(postData["params"].([]interface{}))
}
harRequest = &har.Request{
Method: reqDetails["method"].(string),
URL: url,
HTTPVersion: reqDetails["httpVersion"].(string),
HeadersSize: -1,
BodySize: int64(bytes.NewBufferString(postDataText).Len()),
QueryString: queryString,
Headers: headers,
Cookies: cookies,
PostData: &har.PostData{
MimeType: mimeType.(string),
Params: harParams,
Text: postDataText,
},
}
return
}
func NewResponse(response *api.GenericMessage) (harResponse *har.Response, err error) {
resDetails := response.Payload.(map[string]interface{})["details"].(map[string]interface{})
headers, _, _, _, _, _status := BuildHeaders(resDetails["headers"].([]interface{}))
cookies := make([]har.Cookie, 0) // BuildCookies(resDetails["cookies"].([]interface{}))
content, _ := resDetails["content"].(map[string]interface{})
mimeType, _ := content["mimeType"]
if mimeType == nil || len(mimeType.(string)) == 0 {
mimeType = "text/html"
}
encoding, _ := content["encoding"]
text, _ := content["text"]
bodyText := ""
if text != nil {
bodyText = text.(string)
}
harContent := &har.Content{
Encoding: encoding.(string),
MimeType: mimeType.(string),
Text: []byte(bodyText),
Size: int64(len(bodyText)),
}
status := int(resDetails["status"].(float64))
if strings.HasPrefix(mimeType.(string), "application/grpc") {
status, err = strconv.Atoi(_status)
if err != nil {
tap.SilentError("convert-response-status-for-har", "Failed converting status to int %s (%v,%+v)", err, err, err)
return nil, errors.New("failed converting response status to int for HAR")
}
}
harResponse = &har.Response{
HTTPVersion: resDetails["httpVersion"].(string),
Status: status,
StatusText: resDetails["statusText"].(string),
HeadersSize: -1,
BodySize: int64(bytes.NewBufferString(bodyText).Len()),
Headers: headers,
Cookies: cookies,
Content: harContent,
}
return
}
func NewEntry(pair *api.RequestResponsePair) (*har.Entry, error) {
harRequest, err := NewRequest(&pair.Request)
if err != nil {
tap.SilentError("convert-request-to-har", "Failed converting request to HAR %s (%v,%+v)", err, err, err)
return nil, errors.New("failed converting request to HAR")
}
harResponse, err := NewResponse(&pair.Response)
if err != nil {
fmt.Printf("err: %+v\n", err)
tap.SilentError("convert-response-to-har", "Failed converting response to HAR %s (%v,%+v)", err, err, err)
return nil, errors.New("failed converting response to HAR")
}
totalTime := pair.Response.CaptureTime.Sub(pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
if totalTime < 1 {
totalTime = 1
}
harEntry := har.Entry{
StartedDateTime: pair.Request.CaptureTime,
Time: totalTime,
Request: harRequest,
Response: harResponse,
Cache: &har.Cache{},
Timings: &har.Timings{
Send: -1,
Wait: -1,
Receive: totalTime,
},
}
return &harEntry, nil
}

View File

@@ -1,7 +1,6 @@
package apiserver
import (
"archive/zip"
"bytes"
"encoding/json"
"fmt"
@@ -131,28 +130,6 @@ func (provider *apiServerProvider) GetGeneralStats() (map[string]interface{}, er
return generalStats, nil
}
func (provider *apiServerProvider) GetHars(fromTimestamp int, toTimestamp int) (*zip.Reader, error) {
if !provider.isReady {
return nil, fmt.Errorf("trying to reach api server when not initialized yet")
}
resp, err := http.Get(fmt.Sprintf("%s/api/har?from=%v&to=%v", provider.url, fromTimestamp, toTimestamp))
if err != nil {
return nil, fmt.Errorf("failed getting har from api server %w", err)
}
defer func() { _ = resp.Body.Close() }()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed reading hars %w", err)
}
zipReader, err := zip.NewReader(bytes.NewReader(body), int64(len(body)))
if err != nil {
return nil, fmt.Errorf("failed craeting zip reader %w", err)
}
return zipReader, nil
}
func (provider *apiServerProvider) GetVersion() (string, error) {
if !provider.isReady {

View File

@@ -3,7 +3,6 @@ package cmd
import (
"context"
"fmt"
"log"
"os"
"os/exec"
"os/signal"
@@ -63,8 +62,8 @@ func openBrowser(url string) {
default:
err = fmt.Errorf("unsupported platform")
}
if err != nil {
log.Fatal(err)
}
if err != nil {
logger.Log.Errorf("error while opening browser, %v", err)
}
}

View File

@@ -1,46 +0,0 @@
package cmd
import (
"github.com/creasty/defaults"
"github.com/spf13/cobra"
"github.com/up9inc/mizu/cli/apiserver"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/config/configStructs"
"github.com/up9inc/mizu/cli/logger"
"github.com/up9inc/mizu/cli/mizu/version"
"github.com/up9inc/mizu/cli/telemetry"
"github.com/up9inc/mizu/cli/uiUtils"
)
var fetchCmd = &cobra.Command{
Use: "fetch",
Short: "Download recorded traffic to files",
RunE: func(cmd *cobra.Command, args []string) error {
go telemetry.ReportRun("fetch", config.Config.Fetch)
if err := apiserver.Provider.InitAndTestConnection(GetApiServerUrl()); err != nil {
logger.Log.Errorf(uiUtils.Error, "Couldn't connect to API server, make sure one running")
return nil
}
if isCompatible, err := version.CheckVersionCompatibility(); err != nil {
return err
} else if !isCompatible {
return nil
}
RunMizuFetch()
return nil
},
}
func init() {
rootCmd.AddCommand(fetchCmd)
defaultFetchConfig := configStructs.FetchConfig{}
defaults.Set(&defaultFetchConfig)
fetchCmd.Flags().StringP(configStructs.DirectoryFetchName, "d", defaultFetchConfig.Directory, "Provide a custom directory for fetched entries")
fetchCmd.Flags().Int(configStructs.FromTimestampFetchName, defaultFetchConfig.FromTimestamp, "Custom start timestamp for fetched entries")
fetchCmd.Flags().Int(configStructs.ToTimestampFetchName, defaultFetchConfig.ToTimestamp, "Custom end timestamp fetched entries")
fetchCmd.Flags().Uint16P(configStructs.GuiPortFetchName, "p", defaultFetchConfig.GuiPort, "Provide a custom port for the web interface webserver")
}

View File

@@ -1,25 +0,0 @@
package cmd
import (
"github.com/up9inc/mizu/cli/apiserver"
"github.com/up9inc/mizu/cli/config"
"github.com/up9inc/mizu/cli/logger"
"github.com/up9inc/mizu/cli/mizu/fsUtils"
"github.com/up9inc/mizu/cli/uiUtils"
)
func RunMizuFetch() {
if err := apiserver.Provider.InitAndTestConnection(GetApiServerUrl()); err != nil {
logger.Log.Errorf(uiUtils.Error, "Couldn't connect to API server, check logs")
}
zipReader, err := apiserver.Provider.GetHars(config.Config.Fetch.FromTimestamp, config.Config.Fetch.ToTimestamp)
if err != nil {
logger.Log.Errorf("Failed fetch data from API server %v", err)
return
}
if err := fsUtils.Unzip(zipReader, config.Config.Fetch.Directory); err != nil {
logger.Log.Debugf("[ERROR] failed unzip %v", err)
}
}

View File

@@ -64,7 +64,6 @@ func init() {
tapCmd.Flags().StringSliceP(configStructs.PlainTextFilterRegexesTapName, "r", defaultTapConfig.PlainTextFilterRegexes, "List of regex expressions that are used to filter matching values from text/plain http bodies")
tapCmd.Flags().Bool(configStructs.DisableRedactionTapName, defaultTapConfig.DisableRedaction, "Disables redaction of potentially sensitive request/response headers and body values")
tapCmd.Flags().String(configStructs.HumanMaxEntriesDBSizeTapName, defaultTapConfig.HumanMaxEntriesDBSize, "Override the default max entries db size")
tapCmd.Flags().String(configStructs.DirectionTapName, defaultTapConfig.Direction, "Record traffic that goes in this direction (relative to the tapped pod): in/any")
tapCmd.Flags().Bool(configStructs.DryRunTapName, defaultTapConfig.DryRun, "Preview of all pods matching the regex, without tapping them")
tapCmd.Flags().String(configStructs.EnforcePolicyFile, defaultTapConfig.EnforcePolicyFile, "Yaml file with policy rules")
}

View File

@@ -18,7 +18,6 @@ const (
type ConfigStruct struct {
Tap configStructs.TapConfig `yaml:"tap"`
Fetch configStructs.FetchConfig `yaml:"fetch"`
Version configStructs.VersionConfig `yaml:"version"`
View configStructs.ViewConfig `yaml:"view"`
Logs configStructs.LogsConfig `yaml:"logs"`

View File

@@ -1,15 +0,0 @@
package configStructs
const (
DirectoryFetchName = "directory"
FromTimestampFetchName = "from"
ToTimestampFetchName = "to"
GuiPortFetchName = "gui-port"
)
type FetchConfig struct {
Directory string `yaml:"directory" default:"."`
FromTimestamp int `yaml:"from" default:"0"`
ToTimestamp int `yaml:"to" default:"0"`
GuiPort uint16 `yaml:"gui-port" default:"8899"`
}

View File

@@ -3,10 +3,8 @@ package configStructs
import (
"errors"
"fmt"
"regexp"
"strings"
"github.com/up9inc/mizu/shared/units"
"regexp"
)
const (
@@ -17,7 +15,6 @@ const (
PlainTextFilterRegexesTapName = "regex-masking"
DisableRedactionTapName = "no-redact"
HumanMaxEntriesDBSizeTapName = "max-entries-db-size"
DirectionTapName = "direction"
DryRunTapName = "dry-run"
EnforcePolicyFile = "test-rules"
)
@@ -34,7 +31,6 @@ type TapConfig struct {
HealthChecksUserAgentHeaders []string `yaml:"ignored-user-agents"`
DisableRedaction bool `yaml:"no-redact" default:"false"`
HumanMaxEntriesDBSize string `yaml:"max-entries-db-size" default:"200MB"`
Direction string `yaml:"direction" default:"in"`
DryRun bool `yaml:"dry-run" default:"false"`
EnforcePolicyFile string `yaml:"test-rules"`
ApiServerResources Resources `yaml:"api-server-resources"`
@@ -69,10 +65,5 @@ func (config *TapConfig) Validate() error {
return errors.New(fmt.Sprintf("Could not parse --%s value %s", HumanMaxEntriesDBSizeTapName, config.HumanMaxEntriesDBSize))
}
directionLowerCase := strings.ToLower(config.Direction)
if directionLowerCase != "any" && directionLowerCase != "in" {
return errors.New(fmt.Sprintf("%s is not a valid value for flag --%s. Acceptable values are in/any.", config.Direction, DirectionTapName))
}
return nil
}

View File

@@ -604,6 +604,7 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
agentContainer.WithEnv(
applyconfcore.EnvVar().WithName(shared.HostModeEnvVar).WithValue("1"),
applyconfcore.EnvVar().WithName(shared.TappedAddressesPerNodeDictEnvVar).WithValue(string(nodeToTappedPodIPMapJsonStr)),
applyconfcore.EnvVar().WithName(shared.GoGCEnvVar).WithValue("12800"),
)
agentContainer.WithEnv(
applyconfcore.EnvVar().WithName(shared.NodeNameEnvVar).WithValueFrom(

View File

@@ -8,6 +8,7 @@ import (
"github.com/up9inc/mizu/cli/mizu"
"io/ioutil"
"net/http"
"runtime"
"time"
"github.com/google/go-github/v37/github"
@@ -76,7 +77,7 @@ func CheckNewerVersion(versionChan chan string) {
logger.Log.Debugf("Finished version validation, github version %v, current version %v, took %v", gitHubVersion, currentSemVer, time.Since(start))
if gitHubVersionSemVer.GreaterThan(currentSemVer) {
versionChan <- fmt.Sprintf("Update available! %v -> %v (%v)", mizu.SemVer, gitHubVersion, *latestRelease.HTMLURL)
versionChan <- fmt.Sprintf("Update available! %v -> %v (curl -Lo mizu %v/mizu_%s_amd64 && chmod 755 mizu)", mizu.SemVer, gitHubVersion, *latestRelease.HTMLURL, runtime.GOOS)
} else {
versionChan <- ""
}

View File

@@ -8,4 +8,5 @@ const (
MaxEntriesDBSizeBytesEnvVar = "MAX_ENTRIES_DB_BYTES"
RulePolicyPath = "/app/enforce-policy/"
RulePolicyFileName = "enforce-policy.yaml"
GoGCEnvVar = "GOGC"
)

View File

@@ -9,19 +9,19 @@ import (
type Protocol struct {
Name string `json:"name"`
LongName string `json:"long_name"`
LongName string `json:"longName"`
Abbreviation string `json:"abbreviation"`
Version string `json:"version"`
BackgroundColor string `json:"background_color"`
ForegroundColor string `json:"foreground_color"`
FontSize int8 `json:"font_size"`
ReferenceLink string `json:"reference_link"`
BackgroundColor string `json:"backgroundColor"`
ForegroundColor string `json:"foregroundColor"`
FontSize int8 `json:"fontSize"`
ReferenceLink string `json:"referenceLink"`
Ports []string `json:"ports"`
Priority uint8 `json:"priority"`
}
type Extension struct {
Protocol Protocol
Protocol *Protocol
Path string
Plug *plugin.Plugin
Dissector Dissector
@@ -50,8 +50,8 @@ type CounterPair struct {
}
type GenericMessage struct {
IsRequest bool `json:"is_request"`
CaptureTime time.Time `json:"capture_time"`
IsRequest bool `json:"isRequest"`
CaptureTime time.Time `json:"captureTime"`
Payload interface{} `json:"payload"`
}
@@ -68,16 +68,26 @@ type OutputChannelItem struct {
Pair *RequestResponsePair
}
type SuperTimer struct {
CaptureTime time.Time
}
type SuperIdentifier struct {
Protocol *Protocol
IsClosedOthers bool
}
type Dissector interface {
Register(*Extension)
Ping()
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, emitter Emitter) error
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter) error
Analyze(item *OutputChannelItem, entryId string, resolvedSource string, resolvedDestination string) *MizuEntry
Summarize(entry *MizuEntry) *BaseEntryDetails
Represent(entry *MizuEntry) (Protocol, []byte, error)
Represent(entry *MizuEntry) (protocol Protocol, object []byte, bodySize int64, err error)
}
type Emitting struct {
AppStats *AppStats
OutputChannel chan *OutputChannelItem
}
@@ -87,36 +97,45 @@ type Emitter interface {
func (e *Emitting) Emit(item *OutputChannelItem) {
e.OutputChannel <- item
e.AppStats.IncMatchedPairs()
}
type MizuEntry struct {
ID uint `gorm:"primarykey"`
CreatedAt time.Time
UpdatedAt time.Time
ProtocolName string `json:"protocol_key" gorm:"column:protocolKey"`
ProtocolVersion string `json:"protocol_version" gorm:"column:protocolVersion"`
Entry string `json:"entry,omitempty" gorm:"column:entry"`
EntryId string `json:"entryId" gorm:"column:entryId"`
Url string `json:"url" gorm:"column:url"`
Method string `json:"method" gorm:"column:method"`
Status int `json:"status" gorm:"column:status"`
RequestSenderIp string `json:"requestSenderIp" gorm:"column:requestSenderIp"`
Service string `json:"service" gorm:"column:service"`
Timestamp int64 `json:"timestamp" gorm:"column:timestamp"`
Path string `json:"path" gorm:"column:path"`
ResolvedSource string `json:"resolvedSource,omitempty" gorm:"column:resolvedSource"`
ResolvedDestination string `json:"resolvedDestination,omitempty" gorm:"column:resolvedDestination"`
SourceIp string `json:"sourceIp,omitempty" gorm:"column:sourceIp"`
DestinationIp string `json:"destinationIp,omitempty" gorm:"column:destinationIp"`
SourcePort string `json:"sourcePort,omitempty" gorm:"column:sourcePort"`
DestinationPort string `json:"destinationPort,omitempty" gorm:"column:destinationPort"`
IsOutgoing bool `json:"isOutgoing,omitempty" gorm:"column:isOutgoing"`
EstimatedSizeBytes int `json:"-" gorm:"column:estimatedSizeBytes"`
ID uint `gorm:"primarykey"`
CreatedAt time.Time
UpdatedAt time.Time
ProtocolName string `json:"protocolName" gorm:"column:protocolName"`
ProtocolLongName string `json:"protocolLongName" gorm:"column:protocolLongName"`
ProtocolAbbreviation string `json:"protocolAbbreviation" gorm:"column:protocolVersion"`
ProtocolVersion string `json:"protocolVersion" gorm:"column:protocolVersion"`
ProtocolBackgroundColor string `json:"protocolBackgroundColor" gorm:"column:protocolBackgroundColor"`
ProtocolForegroundColor string `json:"protocolForegroundColor" gorm:"column:protocolForegroundColor"`
ProtocolFontSize int8 `json:"protocolFontSize" gorm:"column:protocolFontSize"`
ProtocolReferenceLink string `json:"protocolReferenceLink" gorm:"column:protocolReferenceLink"`
Entry string `json:"entry,omitempty" gorm:"column:entry"`
EntryId string `json:"entryId" gorm:"column:entryId"`
Url string `json:"url" gorm:"column:url"`
Method string `json:"method" gorm:"column:method"`
Status int `json:"status" gorm:"column:status"`
RequestSenderIp string `json:"requestSenderIp" gorm:"column:requestSenderIp"`
Service string `json:"service" gorm:"column:service"`
Timestamp int64 `json:"timestamp" gorm:"column:timestamp"`
ElapsedTime int64 `json:"elapsedTime" gorm:"column:elapsedTime"`
Path string `json:"path" gorm:"column:path"`
ResolvedSource string `json:"resolvedSource,omitempty" gorm:"column:resolvedSource"`
ResolvedDestination string `json:"resolvedDestination,omitempty" gorm:"column:resolvedDestination"`
SourceIp string `json:"sourceIp,omitempty" gorm:"column:sourceIp"`
DestinationIp string `json:"destinationIp,omitempty" gorm:"column:destinationIp"`
SourcePort string `json:"sourcePort,omitempty" gorm:"column:sourcePort"`
DestinationPort string `json:"destinationPort,omitempty" gorm:"column:destinationPort"`
IsOutgoing bool `json:"isOutgoing,omitempty" gorm:"column:isOutgoing"`
EstimatedSizeBytes int `json:"-" gorm:"column:estimatedSizeBytes"`
}
type MizuEntryWrapper struct {
Protocol Protocol `json:"protocol"`
Representation string `json:"representation"`
BodySize int64 `json:"bodySize"`
Data MizuEntry `json:"data"`
}
@@ -124,16 +143,17 @@ type BaseEntryDetails struct {
Id string `json:"id,omitempty"`
Protocol Protocol `json:"protocol,omitempty"`
Url string `json:"url,omitempty"`
RequestSenderIp string `json:"request_sender_ip,omitempty"`
RequestSenderIp string `json:"requestSenderIp,omitempty"`
Service string `json:"service,omitempty"`
Path string `json:"path,omitempty"`
Summary string `json:"summary,omitempty"`
StatusCode int `json:"status_code"`
StatusCode int `json:"statusCode"`
Method string `json:"method,omitempty"`
Timestamp int64 `json:"timestamp,omitempty"`
SourceIp string `json:"source_ip,omitempty"`
DestinationIp string `json:"destination_ip,omitempty"`
SourcePort string `json:"source_port,omitempty"`
DestinationPort string `json:"destination_port,omitempty"`
SourceIp string `json:"sourceIp,omitempty"`
DestinationIp string `json:"destinationIp,omitempty"`
SourcePort string `json:"sourcePort,omitempty"`
DestinationPort string `json:"destinationPort,omitempty"`
IsOutgoing bool `json:"isOutgoing,omitempty"`
Latency int64 `json:"latency,omitempty"`
Rules ApplicableRules `json:"rules,omitempty"`
@@ -150,11 +170,19 @@ type DataUnmarshaler interface {
}
func (bed *BaseEntryDetails) UnmarshalData(entry *MizuEntry) error {
entryUrl := entry.Url
service := entry.Service
bed.Protocol = Protocol{
Name: entry.ProtocolName,
LongName: entry.ProtocolLongName,
Abbreviation: entry.ProtocolAbbreviation,
Version: entry.ProtocolVersion,
BackgroundColor: entry.ProtocolBackgroundColor,
ForegroundColor: entry.ProtocolForegroundColor,
FontSize: entry.ProtocolFontSize,
ReferenceLink: entry.ProtocolReferenceLink,
}
bed.Id = entry.EntryId
bed.Url = entryUrl
bed.Service = service
bed.Url = entry.Url
bed.Service = entry.Service
bed.Summary = entry.Path
bed.StatusCode = entry.Status
bed.Method = entry.Method
@@ -163,3 +191,8 @@ func (bed *BaseEntryDetails) UnmarshalData(entry *MizuEntry) error {
bed.IsOutgoing = entry.IsOutgoing
return nil
}
const (
TABLE string = "table"
BODY string = "body"
)

70
tap/api/stats_tracker.go Normal file
View File

@@ -0,0 +1,70 @@
package api
import (
"sync/atomic"
"time"
)
type AppStats struct {
StartTime time.Time `json:"-"`
ProcessedBytes uint64 `json:"processedBytes"`
PacketsCount uint64 `json:"packetsCount"`
TcpPacketsCount uint64 `json:"tcpPacketsCount"`
ReassembledTcpPayloadsCount uint64 `json:"reassembledTcpPayloadsCount"`
TlsConnectionsCount uint64 `json:"tlsConnectionsCount"`
MatchedPairs uint64 `json:"matchedPairs"`
DroppedTcpStreams uint64 `json:"droppedTcpStreams"`
}
func (as *AppStats) IncMatchedPairs() {
atomic.AddUint64(&as.MatchedPairs, 1)
}
func (as *AppStats) IncDroppedTcpStreams() {
atomic.AddUint64(&as.DroppedTcpStreams, 1)
}
func (as *AppStats) IncPacketsCount() uint64 {
atomic.AddUint64(&as.PacketsCount, 1)
return as.PacketsCount
}
func (as *AppStats) IncTcpPacketsCount() {
atomic.AddUint64(&as.TcpPacketsCount, 1)
}
func (as *AppStats) IncReassembledTcpPayloadsCount() {
atomic.AddUint64(&as.ReassembledTcpPayloadsCount, 1)
}
func (as *AppStats) IncTlsConnectionsCount() {
atomic.AddUint64(&as.TlsConnectionsCount, 1)
}
func (as *AppStats) UpdateProcessedBytes(size uint64) {
atomic.AddUint64(&as.ProcessedBytes, size)
}
func (as *AppStats) SetStartTime(startTime time.Time) {
as.StartTime = startTime
}
func (as *AppStats) DumpStats() *AppStats {
currentAppStats := &AppStats{StartTime: as.StartTime}
currentAppStats.ProcessedBytes = resetUint64(&as.ProcessedBytes)
currentAppStats.PacketsCount = resetUint64(&as.PacketsCount)
currentAppStats.TcpPacketsCount = resetUint64(&as.TcpPacketsCount)
currentAppStats.ReassembledTcpPayloadsCount = resetUint64(&as.ReassembledTcpPayloadsCount)
currentAppStats.TlsConnectionsCount = resetUint64(&as.TlsConnectionsCount)
currentAppStats.MatchedPairs = resetUint64(&as.MatchedPairs)
currentAppStats.DroppedTcpStreams = resetUint64(&as.DroppedTcpStreams)
return currentAppStats
}
func resetUint64(ref *uint64) (val uint64) {
val = atomic.LoadUint64(ref)
atomic.StoreUint64(ref, 0)
return
}

View File

@@ -93,10 +93,10 @@ type AMQPWrapper struct {
Details interface{} `json:"details"`
}
func emitAMQP(event interface{}, _type string, method string, connectionInfo *api.ConnectionInfo, emitter api.Emitter) {
func emitAMQP(event interface{}, _type string, method string, connectionInfo *api.ConnectionInfo, captureTime time.Time, emitter api.Emitter) {
request := &api.GenericMessage{
IsRequest: true,
CaptureTime: time.Now(),
CaptureTime: captureTime,
Payload: AMQPPayload{
Data: &AMQPWrapper{
Method: method,
@@ -107,7 +107,7 @@ func emitAMQP(event interface{}, _type string, method string, connectionInfo *ap
}
item := &api.OutputChannelItem{
Protocol: protocol,
Timestamp: time.Now().UnixNano() / int64(time.Millisecond),
Timestamp: captureTime.UnixNano() / int64(time.Millisecond),
ConnectionInfo: connectionInfo,
Pair: &api.RequestResponsePair{
Request: *request,
@@ -219,7 +219,7 @@ func representProperties(properties map[string]interface{}, rep []interface{}) (
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Properties",
"data": string(props),
})
@@ -249,7 +249,7 @@ func representBasicPublish(event map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Details",
"data": string(details),
})
@@ -267,7 +267,7 @@ func representBasicPublish(event map[string]interface{}) []interface{} {
}
headersMarshaled, _ := json.Marshal(headers)
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Headers",
"data": string(headersMarshaled),
})
@@ -275,7 +275,7 @@ func representBasicPublish(event map[string]interface{}) []interface{} {
if event["Body"] != nil {
rep = append(rep, map[string]string{
"type": "body",
"type": api.BODY,
"title": "Body",
"encoding": "base64",
"mime_type": contentType,
@@ -326,7 +326,7 @@ func representBasicDeliver(event map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Details",
"data": string(details),
})
@@ -344,7 +344,7 @@ func representBasicDeliver(event map[string]interface{}) []interface{} {
}
headersMarshaled, _ := json.Marshal(headers)
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Headers",
"data": string(headersMarshaled),
})
@@ -352,7 +352,7 @@ func representBasicDeliver(event map[string]interface{}) []interface{} {
if event["Body"] != nil {
rep = append(rep, map[string]string{
"type": "body",
"type": api.BODY,
"title": "Body",
"encoding": "base64",
"mime_type": contentType,
@@ -393,7 +393,7 @@ func representQueueDeclare(event map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Details",
"data": string(details),
})
@@ -408,7 +408,7 @@ func representQueueDeclare(event map[string]interface{}) []interface{} {
}
headersMarshaled, _ := json.Marshal(headers)
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Arguments",
"data": string(headersMarshaled),
})
@@ -451,7 +451,7 @@ func representExchangeDeclare(event map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Details",
"data": string(details),
})
@@ -466,7 +466,7 @@ func representExchangeDeclare(event map[string]interface{}) []interface{} {
}
headersMarshaled, _ := json.Marshal(headers)
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Arguments",
"data": string(headersMarshaled),
})
@@ -497,7 +497,7 @@ func representConnectionStart(event map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Details",
"data": string(details),
})
@@ -524,7 +524,7 @@ func representConnectionStart(event map[string]interface{}) []interface{} {
}
headersMarshaled, _ := json.Marshal(headers)
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Server Properties",
"data": string(headersMarshaled),
})
@@ -555,7 +555,7 @@ func representConnectionClose(event map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Details",
"data": string(details),
})
@@ -585,7 +585,7 @@ func representQueueBind(event map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Details",
"data": string(details),
})
@@ -600,7 +600,7 @@ func representQueueBind(event map[string]interface{}) []interface{} {
}
headersMarshaled, _ := json.Marshal(headers)
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Arguments",
"data": string(headersMarshaled),
})
@@ -639,7 +639,7 @@ func representBasicConsume(event map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Details",
"data": string(details),
})
@@ -654,7 +654,7 @@ func representBasicConsume(event map[string]interface{}) []interface{} {
}
headersMarshaled, _ := json.Marshal(headers)
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Arguments",
"data": string(headersMarshaled),
})

View File

@@ -32,7 +32,7 @@ func init() {
type dissecting string
func (d dissecting) Register(extension *api.Extension) {
extension.Protocol = protocol
extension.Protocol = &protocol
}
func (d dissecting) Ping() {
@@ -41,7 +41,7 @@ func (d dissecting) Ping() {
const amqpRequest string = "amqp_request"
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, emitter api.Emitter) error {
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter) error {
r := AmqpReader{b}
var remaining int
@@ -78,13 +78,14 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
var lastMethodFrameMessage Message
for {
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &protocol {
return errors.New("Identified by another protocol")
}
frame, err := r.ReadFrame()
if err == io.EOF {
// We must read until we see an EOF... very important!
return errors.New("AMQP EOF")
} else if err != nil {
// TODO: Causes ignoring some methods. Return only in case of a certain error. But what?
return err
}
switch f := frame.(type) {
@@ -101,6 +102,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
case *BasicDeliver:
eventBasicDeliver.Properties = header.Properties
default:
frame = nil
}
case *BodyFrame:
@@ -110,11 +112,15 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
switch lastMethodFrameMessage.(type) {
case *BasicPublish:
eventBasicPublish.Body = f.Body
emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, emitter)
superIdentifier.Protocol = &protocol
emitAMQP(*eventBasicPublish, amqpRequest, basicMethodMap[40], connectionInfo, superTimer.CaptureTime, emitter)
case *BasicDeliver:
eventBasicDeliver.Body = f.Body
emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, emitter)
superIdentifier.Protocol = &protocol
emitAMQP(*eventBasicDeliver, amqpRequest, basicMethodMap[60], connectionInfo, superTimer.CaptureTime, emitter)
default:
body = nil
frame = nil
}
case *MethodFrame:
@@ -134,7 +140,8 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
NoWait: m.NoWait,
Arguments: m.Arguments,
}
emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, emitter)
superIdentifier.Protocol = &protocol
emitAMQP(*eventQueueBind, amqpRequest, queueMethodMap[20], connectionInfo, superTimer.CaptureTime, emitter)
case *BasicConsume:
eventBasicConsume := &BasicConsume{
@@ -146,7 +153,8 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
NoWait: m.NoWait,
Arguments: m.Arguments,
}
emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, emitter)
superIdentifier.Protocol = &protocol
emitAMQP(*eventBasicConsume, amqpRequest, basicMethodMap[20], connectionInfo, superTimer.CaptureTime, emitter)
case *BasicDeliver:
eventBasicDeliver.ConsumerTag = m.ConsumerTag
@@ -165,7 +173,8 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
NoWait: m.NoWait,
Arguments: m.Arguments,
}
emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, emitter)
superIdentifier.Protocol = &protocol
emitAMQP(*eventQueueDeclare, amqpRequest, queueMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter)
case *ExchangeDeclare:
eventExchangeDeclare := &ExchangeDeclare{
@@ -178,7 +187,8 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
NoWait: m.NoWait,
Arguments: m.Arguments,
}
emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, emitter)
superIdentifier.Protocol = &protocol
emitAMQP(*eventExchangeDeclare, amqpRequest, exchangeMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter)
case *ConnectionStart:
eventConnectionStart := &ConnectionStart{
@@ -188,7 +198,8 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
Mechanisms: m.Mechanisms,
Locales: m.Locales,
}
emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, emitter)
superIdentifier.Protocol = &protocol
emitAMQP(*eventConnectionStart, amqpRequest, connectionMethodMap[10], connectionInfo, superTimer.CaptureTime, emitter)
case *ConnectionClose:
eventConnectionClose := &ConnectionClose{
@@ -197,9 +208,11 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
ClassId: m.ClassId,
MethodId: m.MethodId,
}
emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, emitter)
superIdentifier.Protocol = &protocol
emitAMQP(*eventConnectionClose, amqpRequest, connectionMethodMap[50], connectionInfo, superTimer.CaptureTime, emitter)
default:
frame = nil
}
@@ -254,24 +267,31 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
request["url"] = summary
entryBytes, _ := json.Marshal(item.Pair)
return &api.MizuEntry{
ProtocolName: protocol.Name,
ProtocolVersion: protocol.Version,
EntryId: entryId,
Entry: string(entryBytes),
Url: fmt.Sprintf("%s%s", service, summary),
Method: request["method"].(string),
Status: 0,
RequestSenderIp: item.ConnectionInfo.ClientIP,
Service: service,
Timestamp: item.Timestamp,
Path: summary,
ResolvedSource: resolvedSource,
ResolvedDestination: resolvedDestination,
SourceIp: item.ConnectionInfo.ClientIP,
DestinationIp: item.ConnectionInfo.ServerIP,
SourcePort: item.ConnectionInfo.ClientPort,
DestinationPort: item.ConnectionInfo.ServerPort,
IsOutgoing: item.ConnectionInfo.IsOutgoing,
ProtocolName: protocol.Name,
ProtocolLongName: protocol.LongName,
ProtocolAbbreviation: protocol.Abbreviation,
ProtocolVersion: protocol.Version,
ProtocolBackgroundColor: protocol.BackgroundColor,
ProtocolForegroundColor: protocol.ForegroundColor,
ProtocolFontSize: protocol.FontSize,
ProtocolReferenceLink: protocol.ReferenceLink,
EntryId: entryId,
Entry: string(entryBytes),
Url: fmt.Sprintf("%s%s", service, summary),
Method: request["method"].(string),
Status: 0,
RequestSenderIp: item.ConnectionInfo.ClientIP,
Service: service,
Timestamp: item.Timestamp,
ElapsedTime: 0,
Path: summary,
ResolvedSource: resolvedSource,
ResolvedDestination: resolvedDestination,
SourceIp: item.ConnectionInfo.ClientIP,
DestinationIp: item.ConnectionInfo.ServerIP,
SourcePort: item.ConnectionInfo.ClientPort,
DestinationPort: item.ConnectionInfo.ServerPort,
IsOutgoing: item.ConnectionInfo.IsOutgoing,
}
}
@@ -300,7 +320,9 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
}
}
func (d dissecting) Represent(entry *api.MizuEntry) (api.Protocol, []byte, error) {
func (d dissecting) Represent(entry *api.MizuEntry) (p api.Protocol, object []byte, bodySize int64, err error) {
p = protocol
bodySize = 0
var root map[string]interface{}
json.Unmarshal([]byte(entry.Entry), &root)
representation := make(map[string]interface{}, 0)
@@ -334,8 +356,8 @@ func (d dissecting) Represent(entry *api.MizuEntry) (api.Protocol, []byte, error
break
}
representation["request"] = repRequest
object, err := json.Marshal(representation)
return protocol, object, err
object, err = json.Marshal(representation)
return
}
var Dissector dissecting

View File

@@ -54,7 +54,7 @@ func (r *AmqpReader) ReadFrame() (frame frame, err error) {
channel := binary.BigEndian.Uint16(scratch[1:3])
size := binary.BigEndian.Uint32(scratch[3:7])
if size > 1000000 {
if size > 1000000*16 {
return nil, ErrMaxSize
}
@@ -352,6 +352,10 @@ func (r *AmqpReader) parseHeaderFrame(channel uint16, size uint32) (frame frame,
return
}
if hf.Size > 512 {
return nil, ErrMaxHeaderFrameSize
}
var flags uint16
if err = binary.Read(r.R, binary.BigEndian, &flags); err != nil {
@@ -439,6 +443,7 @@ func (r *AmqpReader) parseBodyFrame(channel uint16, size uint32) (frame frame, e
}
if _, err = io.ReadFull(r.R, bf.Body); err != nil {
bf.Body = nil
return nil, err
}

View File

@@ -10,7 +10,6 @@ package main
import (
"encoding/binary"
"fmt"
"io"
)
@@ -19,32 +18,35 @@ import (
// ErrCredentials. The text of the error is likely more interesting than
// these constants.
const (
frameMethod = 1
frameHeader = 2
frameBody = 3
frameHeartbeat = 8
frameMinSize = 4096
frameEnd = 206
replySuccess = 200
ContentTooLarge = 311
NoRoute = 312
NoConsumers = 313
ConnectionForced = 320
InvalidPath = 402
AccessRefused = 403
NotFound = 404
ResourceLocked = 405
PreconditionFailed = 406
FrameError = 501
SyntaxError = 502
CommandInvalid = 503
ChannelError = 504
UnexpectedFrame = 505
ResourceError = 506
NotAllowed = 530
NotImplemented = 540
InternalError = 541
MaxSizeError = 551
frameMethod = 1
frameHeader = 2
frameBody = 3
frameHeartbeat = 8
frameMinSize = 4096
frameEnd = 206
replySuccess = 200
ContentTooLarge = 311
NoRoute = 312
NoConsumers = 313
ConnectionForced = 320
InvalidPath = 402
AccessRefused = 403
NotFound = 404
ResourceLocked = 405
PreconditionFailed = 406
FrameError = 501
SyntaxError = 502
CommandInvalid = 503
ChannelError = 504
UnexpectedFrame = 505
ResourceError = 506
NotAllowed = 530
NotImplemented = 540
InternalError = 541
MaxSizeError = 551
MaxHeaderFrameSizeError = 552
BadMethodFrameUnknownMethod = 601
BadMethodFrameUnknownClass = 602
)
func isSoftExceptionCode(code int) bool {
@@ -2854,7 +2856,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
mf.Method = method
default:
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
return nil, ErrBadMethodFrameUnknownMethod
}
case 20: // channel
@@ -2909,7 +2911,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
mf.Method = method
default:
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
return nil, ErrBadMethodFrameUnknownMethod
}
case 40: // exchange
@@ -2980,7 +2982,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
mf.Method = method
default:
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
return nil, ErrBadMethodFrameUnknownMethod
}
case 50: // queue
@@ -3067,7 +3069,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
mf.Method = method
default:
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
return nil, ErrBadMethodFrameUnknownMethod
}
case 60: // basic
@@ -3218,7 +3220,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
mf.Method = method
default:
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
return nil, ErrBadMethodFrameUnknownMethod
}
case 90: // tx
@@ -3273,7 +3275,7 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
mf.Method = method
default:
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
return nil, ErrBadMethodFrameUnknownMethod
}
case 85: // confirm
@@ -3296,11 +3298,11 @@ func (r *AmqpReader) parseMethodFrame(channel uint16, size uint32) (f frame, err
mf.Method = method
default:
return nil, fmt.Errorf("Bad method frame, unknown method %d for class %d", mf.MethodId, mf.ClassId)
return nil, ErrBadMethodFrameUnknownMethod
}
default:
return nil, fmt.Errorf("Bad method frame, unknown class %d", mf.ClassId)
return nil, ErrBadMethodFrameUnknownClass
}
return mf, nil

View File

@@ -60,8 +60,13 @@ var (
// ErrFieldType is returned when writing a message containing a Go type unsupported by AMQP.
ErrFieldType = &Error{Code: SyntaxError, Reason: "unsupported table field type"}
// ErrClosed is returned when the channel or connection is not open
ErrMaxSize = &Error{Code: MaxSizeError, Reason: "an AMQP message cannot be bigger than 1MB"}
ErrMaxSize = &Error{Code: MaxSizeError, Reason: "an AMQP message cannot be bigger than 16MB"}
ErrMaxHeaderFrameSize = &Error{Code: MaxHeaderFrameSizeError, Reason: "an AMQP header cannot be bigger than 512 bytes"}
ErrBadMethodFrameUnknownMethod = &Error{Code: BadMethodFrameUnknownMethod, Reason: "Bad method frame, unknown method"}
ErrBadMethodFrameUnknownClass = &Error{Code: BadMethodFrameUnknownClass, Reason: "Bad method frame, unknown class"}
)
// Error captures the code and reason a channel or connection has been closed

View File

@@ -7,14 +7,13 @@ import (
"io"
"io/ioutil"
"net/http"
"time"
"github.com/romana/rlog"
"github.com/up9inc/mizu/tap/api"
)
func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, emitter api.Emitter) error {
func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter) error {
streamID, messageHTTP1, err := grpcAssembler.readMessage()
if err != nil {
return err
@@ -32,7 +31,7 @@ func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, emitter a
tcpID.DstPort,
streamID,
)
item = reqResMatcher.registerRequest(ident, &messageHTTP1, time.Now())
item = reqResMatcher.registerRequest(ident, &messageHTTP1, superTimer.CaptureTime)
if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.SrcIP,
@@ -51,7 +50,7 @@ func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, emitter a
tcpID.SrcPort,
streamID,
)
item = reqResMatcher.registerResponse(ident, &messageHTTP1, time.Now())
item = reqResMatcher.registerResponse(ident, &messageHTTP1, superTimer.CaptureTime)
if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.DstIP,
@@ -71,7 +70,7 @@ func handleHTTP2Stream(grpcAssembler *GrpcAssembler, tcpID *api.TcpID, emitter a
return nil
}
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, emitter api.Emitter) error {
func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) error {
req, err := http.ReadRequest(b)
if err != nil {
// log.Println("Error reading stream:", err)
@@ -99,7 +98,7 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
tcpID.DstPort,
counterPair.Request,
)
item := reqResMatcher.registerRequest(ident, req, time.Now())
item := reqResMatcher.registerRequest(ident, req, superTimer.CaptureTime)
if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.SrcIP,
@@ -113,7 +112,7 @@ func handleHTTP1ClientStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
return nil
}
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, emitter api.Emitter) error {
func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, emitter api.Emitter) error {
res, err := http.ReadResponse(b, nil)
if err != nil {
// log.Println("Error reading stream:", err)
@@ -149,7 +148,7 @@ func handleHTTP1ServerStream(b *bufio.Reader, tcpID *api.TcpID, counterPair *api
tcpID.SrcPort,
counterPair.Response,
)
item := reqResMatcher.registerResponse(ident, res, time.Now())
item := reqResMatcher.registerResponse(ident, res, superTimer.CaptureTime)
if item != nil {
item.ConnectionInfo = &api.ConnectionInfo{
ClientIP: tcpID.DstIP,

View File

@@ -3,10 +3,12 @@ package main
import (
"bufio"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/url"
"time"
"github.com/romana/rlog"
@@ -51,7 +53,7 @@ func init() {
type dissecting string
func (d dissecting) Register(extension *api.Extension) {
extension.Protocol = protocol
extension.Protocol = &protocol
extension.MatcherMap = reqResMatcher.openMessagesMap
}
@@ -59,7 +61,7 @@ func (d dissecting) Ping() {
log.Printf("pong %s\n", protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, emitter api.Emitter) error {
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter) error {
ident := fmt.Sprintf("%s->%s:%s->%s", tcpID.SrcIP, tcpID.DstIP, tcpID.SrcPort, tcpID.DstPort)
isHTTP2, err := checkIsHTTP2Connection(b, isClient)
if err != nil {
@@ -76,41 +78,46 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
grpcAssembler = createGrpcAssembler(b)
}
success := false
dissected := false
for {
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &protocol {
return errors.New("Identified by another protocol")
}
if isHTTP2 {
err = handleHTTP2Stream(grpcAssembler, tcpID, emitter)
err = handleHTTP2Stream(grpcAssembler, tcpID, superTimer, emitter)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
rlog.Debugf("[HTTP/2] stream %s error: %s (%v,%+v)", ident, err, err, err)
continue
}
success = true
dissected = true
} else if isClient {
err = handleHTTP1ClientStream(b, tcpID, counterPair, emitter)
err = handleHTTP1ClientStream(b, tcpID, counterPair, superTimer, emitter)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
rlog.Debugf("[HTTP-request] stream %s Request error: %s (%v,%+v)", ident, err, err, err)
continue
}
success = true
dissected = true
} else {
err = handleHTTP1ServerStream(b, tcpID, counterPair, emitter)
err = handleHTTP1ServerStream(b, tcpID, counterPair, superTimer, emitter)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
rlog.Debugf("[HTTP-response], stream %s Response error: %s (%v,%+v)", ident, err, err, err)
continue
}
success = true
dissected = true
}
}
if !success {
if !dissected {
return err
}
superIdentifier.Protocol = &protocol
return nil
}
@@ -161,26 +168,35 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
} else if resolvedSource != "" {
service = SetHostname(service, resolvedSource)
}
elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
entryBytes, _ := json.Marshal(item.Pair)
return &api.MizuEntry{
ProtocolName: protocol.Name,
ProtocolVersion: item.Protocol.Version,
EntryId: entryId,
Entry: string(entryBytes),
Url: fmt.Sprintf("%s%s", service, path),
Method: reqDetails["method"].(string),
Status: int(resDetails["status"].(float64)),
RequestSenderIp: item.ConnectionInfo.ClientIP,
Service: service,
Timestamp: item.Timestamp,
Path: path,
ResolvedSource: resolvedSource,
ResolvedDestination: resolvedDestination,
SourceIp: item.ConnectionInfo.ClientIP,
DestinationIp: item.ConnectionInfo.ServerIP,
SourcePort: item.ConnectionInfo.ClientPort,
DestinationPort: item.ConnectionInfo.ServerPort,
IsOutgoing: item.ConnectionInfo.IsOutgoing,
ProtocolName: protocol.Name,
ProtocolLongName: protocol.LongName,
ProtocolAbbreviation: protocol.Abbreviation,
ProtocolVersion: item.Protocol.Version,
ProtocolBackgroundColor: protocol.BackgroundColor,
ProtocolForegroundColor: protocol.ForegroundColor,
ProtocolFontSize: protocol.FontSize,
ProtocolReferenceLink: protocol.ReferenceLink,
EntryId: entryId,
Entry: string(entryBytes),
Url: fmt.Sprintf("%s%s", service, path),
Method: reqDetails["method"].(string),
Status: int(resDetails["status"].(float64)),
RequestSenderIp: item.ConnectionInfo.ClientIP,
Service: service,
Timestamp: item.Timestamp,
ElapsedTime: elapsedTime,
Path: path,
ResolvedSource: resolvedSource,
ResolvedDestination: resolvedDestination,
SourceIp: item.ConnectionInfo.ClientIP,
DestinationIp: item.ConnectionInfo.ServerIP,
SourcePort: item.ConnectionInfo.ClientPort,
DestinationPort: item.ConnectionInfo.ServerPort,
IsOutgoing: item.ConnectionInfo.IsOutgoing,
}
}
@@ -197,6 +213,7 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
Url: entry.Url,
RequestSenderIp: entry.RequestSenderIp,
Service: entry.Service,
Path: entry.Path,
Summary: entry.Path,
StatusCode: entry.Status,
Method: entry.Method,
@@ -214,9 +231,7 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
}
}
func representRequest(request map[string]interface{}) []interface{} {
repRequest := make([]interface{}, 0)
func representRequest(request map[string]interface{}) (repRequest []interface{}) {
details, _ := json.Marshal([]map[string]string{
{
"name": "Method",
@@ -232,28 +247,28 @@ func representRequest(request map[string]interface{}) []interface{} {
},
})
repRequest = append(repRequest, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Details",
"data": string(details),
})
headers, _ := json.Marshal(request["headers"].([]interface{}))
repRequest = append(repRequest, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Headers",
"data": string(headers),
})
cookies, _ := json.Marshal(request["cookies"].([]interface{}))
repRequest = append(repRequest, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Cookies",
"data": string(cookies),
})
queryString, _ := json.Marshal(request["queryString"].([]interface{}))
repRequest = append(repRequest, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Query String",
"data": string(queryString),
})
@@ -266,7 +281,7 @@ func representRequest(request map[string]interface{}) []interface{} {
text, _ := postData["text"]
if text != nil {
repRequest = append(repRequest, map[string]string{
"type": "body",
"type": api.BODY,
"title": "POST Data (text/plain)",
"encoding": "",
"mime_type": mimeType.(string),
@@ -285,13 +300,13 @@ func representRequest(request map[string]interface{}) []interface{} {
},
})
repRequest = append(repRequest, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "POST Data (multipart/form-data)",
"data": string(multipart),
})
} else {
repRequest = append(repRequest, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "POST Data (application/x-www-form-urlencoded)",
"data": string(params),
})
@@ -299,11 +314,13 @@ func representRequest(request map[string]interface{}) []interface{} {
}
}
return repRequest
return
}
func representResponse(response map[string]interface{}) []interface{} {
repResponse := make([]interface{}, 0)
func representResponse(response map[string]interface{}) (repResponse []interface{}, bodySize int64) {
repResponse = make([]interface{}, 0)
bodySize = int64(response["bodySize"].(float64))
details, _ := json.Marshal([]map[string]string{
{
@@ -316,25 +333,25 @@ func representResponse(response map[string]interface{}) []interface{} {
},
{
"name": "Body Size",
"value": fmt.Sprintf("%g bytes", response["bodySize"].(float64)),
"value": fmt.Sprintf("%d bytes", bodySize),
},
})
repResponse = append(repResponse, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Details",
"data": string(details),
})
headers, _ := json.Marshal(response["headers"].([]interface{}))
repResponse = append(repResponse, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Headers",
"data": string(headers),
})
cookies, _ := json.Marshal(response["cookies"].([]interface{}))
repResponse = append(repResponse, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Cookies",
"data": string(cookies),
})
@@ -348,7 +365,7 @@ func representResponse(response map[string]interface{}) []interface{} {
text, _ := content["text"]
if text != nil {
repResponse = append(repResponse, map[string]string{
"type": "body",
"type": api.BODY,
"title": "Body",
"encoding": encoding.(string),
"mime_type": mimeType.(string),
@@ -356,11 +373,10 @@ func representResponse(response map[string]interface{}) []interface{} {
})
}
return repResponse
return
}
func (d dissecting) Represent(entry *api.MizuEntry) (api.Protocol, []byte, error) {
var p api.Protocol
func (d dissecting) Represent(entry *api.MizuEntry) (p api.Protocol, object []byte, bodySize int64, err error) {
if entry.ProtocolVersion == "2.0" {
p = http2Protocol
} else {
@@ -374,11 +390,11 @@ func (d dissecting) Represent(entry *api.MizuEntry) (api.Protocol, []byte, error
reqDetails := request["details"].(map[string]interface{})
resDetails := response["details"].(map[string]interface{})
repRequest := representRequest(reqDetails)
repResponse := representResponse(resDetails)
repResponse, bodySize := representResponse(resDetails)
representation["request"] = repRequest
representation["response"] = repResponse
object, err := json.Marshal(representation)
return p, object, err
object, err = json.Marshal(representation)
return
}
var Dissector dissecting

View File

@@ -85,7 +85,7 @@ func (matcher *requestResponseMatcher) registerResponse(ident string, response *
func (matcher *requestResponseMatcher) preparePair(requestHTTPMessage *api.GenericMessage, responseHTTPMessage *api.GenericMessage) *api.OutputChannelItem {
return &api.OutputChannelItem{
Protocol: protocol,
Timestamp: time.Now().UnixNano() / int64(time.Millisecond),
Timestamp: requestHTTPMessage.CaptureTime.UnixNano() / int64(time.Millisecond),
ConnectionInfo: nil,
Pair: &api.RequestResponsePair{
Request: *requestHTTPMessage,

View File

@@ -4,6 +4,8 @@ import (
"encoding/json"
"fmt"
"strconv"
"github.com/up9inc/mizu/tap/api"
)
type KafkaPayload struct {
@@ -48,7 +50,7 @@ func representRequestHeader(data map[string]interface{}, rep []interface{}) []in
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Request Header",
"data": string(requestHeader),
})
@@ -64,7 +66,7 @@ func representResponseHeader(data map[string]interface{}, rep []interface{}) []i
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Response Header",
"data": string(requestHeader),
})
@@ -114,7 +116,7 @@ func representMetadataRequest(data map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
})
@@ -173,7 +175,7 @@ func representMetadataResponse(data map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
})
@@ -206,7 +208,7 @@ func representApiVersionsRequest(data map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
})
@@ -244,7 +246,7 @@ func representApiVersionsResponse(data map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
})
@@ -287,7 +289,7 @@ func representProduceRequest(data map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
})
@@ -317,7 +319,7 @@ func representProduceResponse(data map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
})
@@ -404,7 +406,7 @@ func representFetchRequest(data map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
})
@@ -450,7 +452,7 @@ func representFetchResponse(data map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
})
@@ -476,7 +478,7 @@ func representListOffsetsRequest(data map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
})
@@ -506,7 +508,7 @@ func representListOffsetsResponse(data map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
})
@@ -540,7 +542,7 @@ func representCreateTopicsRequest(data map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
})
@@ -570,7 +572,7 @@ func representCreateTopicsResponse(data map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
})
@@ -609,7 +611,7 @@ func representDeleteTopicsRequest(data map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
})
@@ -639,7 +641,7 @@ func representDeleteTopicsResponse(data map[string]interface{}) []interface{} {
},
})
rep = append(rep, map[string]string{
"type": "table",
"type": api.TABLE,
"title": "Payload",
"data": string(repPayload),
})

View File

@@ -3,8 +3,10 @@ package main
import (
"bufio"
"encoding/json"
"errors"
"fmt"
"log"
"time"
"github.com/up9inc/mizu/tap/api"
)
@@ -29,7 +31,7 @@ func init() {
type dissecting string
func (d dissecting) Register(extension *api.Extension) {
extension.Protocol = _protocol
extension.Protocol = &_protocol
extension.MatcherMap = reqResMatcher.openMessagesMap
}
@@ -37,18 +39,24 @@ func (d dissecting) Ping() {
log.Printf("pong %s\n", _protocol.Name)
}
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, emitter api.Emitter) error {
func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, counterPair *api.CounterPair, superTimer *api.SuperTimer, superIdentifier *api.SuperIdentifier, emitter api.Emitter) error {
for {
if superIdentifier.Protocol != nil && superIdentifier.Protocol != &_protocol {
return errors.New("Identified by another protocol")
}
if isClient {
_, _, err := ReadRequest(b, tcpID)
_, _, err := ReadRequest(b, tcpID, superTimer)
if err != nil {
return err
}
superIdentifier.Protocol = &_protocol
} else {
err := ReadResponse(b, tcpID, emitter)
err := ReadResponse(b, tcpID, superTimer, emitter)
if err != nil {
return err
}
superIdentifier.Protocol = &_protocol
}
}
}
@@ -131,26 +139,34 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, entryId string, resolve
}
request["url"] = summary
elapsedTime := item.Pair.Response.CaptureTime.Sub(item.Pair.Request.CaptureTime).Round(time.Millisecond).Milliseconds()
entryBytes, _ := json.Marshal(item.Pair)
return &api.MizuEntry{
ProtocolName: _protocol.Name,
ProtocolVersion: _protocol.Version,
EntryId: entryId,
Entry: string(entryBytes),
Url: fmt.Sprintf("%s%s", service, summary),
Method: apiNames[apiKey],
Status: 0,
RequestSenderIp: item.ConnectionInfo.ClientIP,
Service: service,
Timestamp: item.Timestamp,
Path: summary,
ResolvedSource: resolvedSource,
ResolvedDestination: resolvedDestination,
SourceIp: item.ConnectionInfo.ClientIP,
DestinationIp: item.ConnectionInfo.ServerIP,
SourcePort: item.ConnectionInfo.ClientPort,
DestinationPort: item.ConnectionInfo.ServerPort,
IsOutgoing: item.ConnectionInfo.IsOutgoing,
ProtocolName: _protocol.Name,
ProtocolLongName: _protocol.LongName,
ProtocolAbbreviation: _protocol.Abbreviation,
ProtocolVersion: _protocol.Version,
ProtocolBackgroundColor: _protocol.BackgroundColor,
ProtocolForegroundColor: _protocol.ForegroundColor,
ProtocolFontSize: _protocol.FontSize,
ProtocolReferenceLink: _protocol.ReferenceLink,
EntryId: entryId,
Entry: string(entryBytes),
Url: fmt.Sprintf("%s%s", service, summary),
Method: apiNames[apiKey],
Status: 0,
RequestSenderIp: item.ConnectionInfo.ClientIP,
Service: service,
Timestamp: item.Timestamp,
ElapsedTime: elapsedTime,
Path: summary,
ResolvedSource: resolvedSource,
ResolvedDestination: resolvedDestination,
SourceIp: item.ConnectionInfo.ClientIP,
DestinationIp: item.ConnectionInfo.ServerIP,
SourcePort: item.ConnectionInfo.ClientPort,
DestinationPort: item.ConnectionInfo.ServerPort,
IsOutgoing: item.ConnectionInfo.IsOutgoing,
}
}
@@ -178,7 +194,9 @@ func (d dissecting) Summarize(entry *api.MizuEntry) *api.BaseEntryDetails {
}
}
func (d dissecting) Represent(entry *api.MizuEntry) (api.Protocol, []byte, error) {
func (d dissecting) Represent(entry *api.MizuEntry) (p api.Protocol, object []byte, bodySize int64, err error) {
p = _protocol
bodySize = 0
var root map[string]interface{}
json.Unmarshal([]byte(entry.Entry), &root)
representation := make(map[string]interface{}, 0)
@@ -224,8 +242,8 @@ func (d dissecting) Represent(entry *api.MizuEntry) (api.Protocol, []byte, error
representation["request"] = repRequest
representation["response"] = repResponse
object, err := json.Marshal(representation)
return _protocol, object, err
object, err = json.Marshal(representation)
return
}
var Dissector dissecting

View File

@@ -4,6 +4,7 @@ import (
"fmt"
"io"
"reflect"
"time"
"github.com/up9inc/mizu/tap/api"
)
@@ -15,9 +16,10 @@ type Request struct {
CorrelationID int32
ClientID string
Payload interface{}
CaptureTime time.Time
}
func ReadRequest(r io.Reader, tcpID *api.TcpID) (apiKey ApiKey, apiVersion int16, err error) {
func ReadRequest(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer) (apiKey ApiKey, apiVersion int16, err error) {
d := &decoder{reader: r, remain: 4}
size := d.readInt32()
@@ -213,6 +215,7 @@ func ReadRequest(r io.Reader, tcpID *api.TcpID) (apiKey ApiKey, apiVersion int16
ApiVersion: apiVersion,
CorrelationID: correlationID,
ClientID: clientID,
CaptureTime: superTimer.CaptureTime,
Payload: payload,
}

View File

@@ -13,9 +13,10 @@ type Response struct {
Size int32
CorrelationID int32
Payload interface{}
CaptureTime time.Time
}
func ReadResponse(r io.Reader, tcpID *api.TcpID, emitter api.Emitter) (err error) {
func ReadResponse(r io.Reader, tcpID *api.TcpID, superTimer *api.SuperTimer, emitter api.Emitter) (err error) {
d := &decoder{reader: r, remain: 4}
size := d.readInt32()
@@ -39,6 +40,7 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, emitter api.Emitter) (err error
Size: size,
CorrelationID: correlationID,
Payload: payload,
CaptureTime: superTimer.CaptureTime,
}
key := fmt.Sprintf(
@@ -258,12 +260,12 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, emitter api.Emitter) (err error
item := &api.OutputChannelItem{
Protocol: _protocol,
Timestamp: time.Now().UnixNano() / int64(time.Millisecond),
Timestamp: reqResPair.Request.CaptureTime.UnixNano() / int64(time.Millisecond),
ConnectionInfo: connectionInfo,
Pair: &api.RequestResponsePair{
Request: api.GenericMessage{
IsRequest: true,
CaptureTime: time.Now(),
CaptureTime: reqResPair.Request.CaptureTime,
Payload: KafkaPayload{
Data: &KafkaWrapper{
Method: apiNames[apiKey],
@@ -274,7 +276,7 @@ func ReadResponse(r io.Reader, tcpID *api.TcpID, emitter api.Emitter) (err error
},
Response: api.GenericMessage{
IsRequest: false,
CaptureTime: time.Now(),
CaptureTime: reqResPair.Response.CaptureTime,
Payload: KafkaPayload{
Data: &KafkaWrapper{
Method: apiNames[apiKey],

View File

@@ -13,11 +13,13 @@ import (
"encoding/json"
"flag"
"fmt"
"io"
"log"
"os"
"os/signal"
"runtime"
"runtime/pprof"
"strconv"
"strings"
"sync"
"time"
@@ -61,7 +63,7 @@ var staleTimeoutSeconds = flag.Int("staletimout", 120, "Max time in seconds to k
var memprofile = flag.String("memprofile", "", "Write memory profile")
var statsTracker = StatsTracker{}
var appStats = api.AppStats{}
// global
var stats struct {
@@ -94,6 +96,8 @@ var ownIps []string // global
var hostMode bool // global
var extensions []*api.Extension // global
const baseStreamChannelTimeoutMs int = 5000 * 100
/* minOutputLevel: Error will be printed only if outputLevel is above this value
* t: key for errorsMap (counting errors)
* s, a: arguments log.Printf
@@ -148,8 +152,8 @@ type Context struct {
CaptureInfo gopacket.CaptureInfo
}
func GetStats() AppStats {
return statsTracker.appStats
func GetStats() api.AppStats {
return appStats
}
func (c *Context) GetCaptureInfo() gopacket.CaptureInfo {
@@ -168,11 +172,23 @@ func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem,
}
func startMemoryProfiler() {
dirname := "/app/pprof"
rlog.Info("Profiling is on, results will be written to %s", dirname)
dumpPath := "/app/pprof"
envDumpPath := os.Getenv(MemoryProfilingDumpPath)
if envDumpPath != "" {
dumpPath = envDumpPath
}
timeInterval := 60
envTimeInterval := os.Getenv(MemoryProfilingTimeIntervalSeconds)
if envTimeInterval != "" {
if i, err := strconv.Atoi(envTimeInterval); err == nil {
timeInterval = i
}
}
rlog.Info("Profiling is on, results will be written to %s", dumpPath)
go func() {
if _, err := os.Stat(dirname); os.IsNotExist(err) {
if err := os.Mkdir(dirname, 0777); err != nil {
if _, err := os.Stat(dumpPath); os.IsNotExist(err) {
if err := os.Mkdir(dumpPath, 0777); err != nil {
log.Fatal("could not create directory for profile: ", err)
}
}
@@ -180,9 +196,9 @@ func startMemoryProfiler() {
for true {
t := time.Now()
filename := fmt.Sprintf("%s/%s__mem.prof", dirname, t.Format("15_04_05"))
filename := fmt.Sprintf("%s/%s__mem.prof", dumpPath, t.Format("15_04_05"))
rlog.Info("Writing memory profile to %s\n", filename)
rlog.Infof("Writing memory profile to %s\n", filename)
f, err := os.Create(filename)
if err != nil {
@@ -193,13 +209,50 @@ func startMemoryProfiler() {
log.Fatal("could not write memory profile: ", err)
}
_ = f.Close()
time.Sleep(time.Minute)
time.Sleep(time.Second * time.Duration(timeInterval))
}
}()
}
func closeTimedoutTcpStreamChannels() {
maxNumberOfGoroutines = GetMaxNumberOfGoroutines()
TcpStreamChannelTimeoutMs := GetTcpChannelTimeoutMs()
for {
time.Sleep(10 * time.Millisecond)
streams.Range(func(key interface{}, value interface{}) bool {
streamWrapper := value.(*tcpStreamWrapper)
stream := streamWrapper.stream
if stream.superIdentifier.Protocol == nil {
if !stream.isClosed && time.Now().After(streamWrapper.createdAt.Add(TcpStreamChannelTimeoutMs)) {
stream.Close()
appStats.IncDroppedTcpStreams()
rlog.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d\n", appStats.DroppedTcpStreams, runtime.NumGoroutine(), TcpStreamChannelTimeoutMs/1000000)
}
} else {
if !stream.superIdentifier.IsClosedOthers {
for i := range stream.clients {
reader := &stream.clients[i]
if reader.extension.Protocol != stream.superIdentifier.Protocol {
reader.Close()
}
}
for i := range stream.servers {
reader := &stream.servers[i]
if reader.extension.Protocol != stream.superIdentifier.Protocol {
reader.Close()
}
}
stream.superIdentifier.IsClosedOthers = true
}
}
return true
})
}
}
func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
log.SetFlags(log.LstdFlags | log.LUTC | log.Lshortfile)
go closeTimedoutTcpStreamChannels()
defer util.Run()()
if *debug {
@@ -275,10 +328,11 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
source.Lazy = *lazy
source.NoCopy = true
rlog.Info("Starting to read packets")
statsTracker.setStartTime(time.Now())
appStats.SetStartTime(time.Now())
defragger := ip4defrag.NewIPv4Defragmenter()
var emitter api.Emitter = &api.Emitting{
AppStats: &appStats,
OutputChannel: outputItems,
}
@@ -321,7 +375,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
errorsSummery := fmt.Sprintf("%v", errorsMap)
errorsMapMutex.Unlock()
log.Printf("%v (errors: %v, errTypes:%v) - Errors Summary: %s",
time.Since(statsTracker.appStats.StartTime),
time.Since(appStats.StartTime),
nErrors,
errorMapLen,
errorsSummery,
@@ -344,7 +398,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
cleanStats.closed,
cleanStats.deleted,
)
currentAppStats := statsTracker.dumpStats()
currentAppStats := appStats.DumpStats()
appStatsJSON, _ := json.Marshal(currentAppStats)
log.Printf("app stats - %v", string(appStatsJSON))
}
@@ -354,11 +408,18 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
startMemoryProfiler()
}
for packet := range source.Packets() {
packetsCount := statsTracker.incPacketsCount()
for {
packet, err := source.NextPacket()
if err == io.EOF {
break
} else if err != nil {
rlog.Debugf("Error:", err)
continue
}
packetsCount := appStats.IncPacketsCount()
rlog.Debugf("PACKET #%d", packetsCount)
data := packet.Data()
statsTracker.updateProcessedBytes(int64(len(data)))
appStats.UpdateProcessedBytes(uint64(len(data)))
if *hexdumppkt {
rlog.Debugf("Packet content (%d/0x%x) - %s", len(data), len(data), hex.Dump(data))
}
@@ -392,7 +453,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
tcp := packet.Layer(layers.LayerTypeTCP)
if tcp != nil {
statsTracker.incTcpPacketsCount()
appStats.IncTcpPacketsCount()
tcp := tcp.(*layers.TCP)
if *checksum {
err := tcp.SetNetworkLayerForChecksum(packet.NetworkLayer())
@@ -410,15 +471,15 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
assemblerMutex.Unlock()
}
done := *maxcount > 0 && statsTracker.appStats.PacketsCount >= *maxcount
done := *maxcount > 0 && int64(appStats.PacketsCount) >= *maxcount
if done {
errorsMapMutex.Lock()
errorMapLen := len(errorsMap)
errorsMapMutex.Unlock()
log.Printf("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)",
statsTracker.appStats.PacketsCount,
statsTracker.appStats.ProcessedBytes,
time.Since(statsTracker.appStats.StartTime),
appStats.PacketsCount,
appStats.ProcessedBytes,
time.Since(appStats.StartTime),
nErrors,
errorMapLen)
}

View File

@@ -3,14 +3,21 @@ package tap
import (
"os"
"strconv"
"time"
)
const (
MemoryProfilingEnabledEnvVarName = "MEMORY_PROFILING_ENABLED"
MemoryProfilingDumpPath = "MEMORY_PROFILING_DUMP_PATH"
MemoryProfilingTimeIntervalSeconds = "MEMORY_PROFILING_TIME_INTERVAL"
MaxBufferedPagesTotalEnvVarName = "MAX_BUFFERED_PAGES_TOTAL"
MaxBufferedPagesPerConnectionEnvVarName = "MAX_BUFFERED_PAGES_PER_CONNECTION"
TcpStreamChannelTimeoutMsEnvVarName = "TCP_STREAM_CHANNEL_TIMEOUT_MS"
MaxNumberOfGoroutinesEnvVarName = "MAX_NUMBER_OF_GOROUTINES"
MaxBufferedPagesTotalDefaultValue = 5000
MaxBufferedPagesPerConnectionDefaultValue = 5000
TcpStreamChannelTimeoutMsDefaultValue = 5000
MaxNumberOfGoroutinesDefaultValue = 4000
)
type globalSettings struct {
@@ -47,6 +54,22 @@ func GetMaxBufferedPagesPerConnection() int {
return valueFromEnv
}
func GetTcpChannelTimeoutMs() time.Duration {
valueFromEnv, err := strconv.Atoi(os.Getenv(TcpStreamChannelTimeoutMsEnvVarName))
if err != nil {
return TcpStreamChannelTimeoutMsDefaultValue * time.Millisecond
}
return time.Duration(valueFromEnv) * time.Millisecond
}
func GetMaxNumberOfGoroutines() int {
valueFromEnv, err := strconv.Atoi(os.Getenv(MaxNumberOfGoroutinesEnvVarName))
if err != nil {
return MaxNumberOfGoroutinesDefaultValue
}
return valueFromEnv
}
func GetMemoryProfilingEnabled() bool {
return os.Getenv(MemoryProfilingEnabledEnvVarName) == "1"
}

View File

@@ -1,104 +0,0 @@
package tap
import (
"sync"
"time"
)
type AppStats struct {
StartTime time.Time `json:"-"`
ProcessedBytes int64 `json:"processedBytes"`
PacketsCount int64 `json:"packetsCount"`
TcpPacketsCount int64 `json:"tcpPacketsCount"`
ReassembledTcpPayloadsCount int64 `json:"reassembledTcpPayloadsCount"`
TlsConnectionsCount int64 `json:"tlsConnectionsCount"`
MatchedPairs int64 `json:"matchedPairs"`
}
type StatsTracker struct {
appStats AppStats
processedBytesMutex sync.Mutex
packetsCountMutex sync.Mutex
tcpPacketsCountMutex sync.Mutex
reassembledTcpPayloadsCountMutex sync.Mutex
tlsConnectionsCountMutex sync.Mutex
matchedPairsMutex sync.Mutex
}
func (st *StatsTracker) incMatchedPairs() {
st.matchedPairsMutex.Lock()
st.appStats.MatchedPairs++
st.matchedPairsMutex.Unlock()
}
func (st *StatsTracker) incPacketsCount() int64 {
st.packetsCountMutex.Lock()
st.appStats.PacketsCount++
currentPacketsCount := st.appStats.PacketsCount
st.packetsCountMutex.Unlock()
return currentPacketsCount
}
func (st *StatsTracker) incTcpPacketsCount() {
st.tcpPacketsCountMutex.Lock()
st.appStats.TcpPacketsCount++
st.tcpPacketsCountMutex.Unlock()
}
func (st *StatsTracker) incReassembledTcpPayloadsCount() {
st.reassembledTcpPayloadsCountMutex.Lock()
st.appStats.ReassembledTcpPayloadsCount++
st.reassembledTcpPayloadsCountMutex.Unlock()
}
func (st *StatsTracker) incTlsConnectionsCount() {
st.tlsConnectionsCountMutex.Lock()
st.appStats.TlsConnectionsCount++
st.tlsConnectionsCountMutex.Unlock()
}
func (st *StatsTracker) updateProcessedBytes(size int64) {
st.processedBytesMutex.Lock()
st.appStats.ProcessedBytes += size
st.processedBytesMutex.Unlock()
}
func (st *StatsTracker) setStartTime(startTime time.Time) {
st.appStats.StartTime = startTime
}
func (st *StatsTracker) dumpStats() *AppStats {
currentAppStats := &AppStats{StartTime: st.appStats.StartTime}
st.processedBytesMutex.Lock()
currentAppStats.ProcessedBytes = st.appStats.ProcessedBytes
st.appStats.ProcessedBytes = 0
st.processedBytesMutex.Unlock()
st.packetsCountMutex.Lock()
currentAppStats.PacketsCount = st.appStats.PacketsCount
st.appStats.PacketsCount = 0
st.packetsCountMutex.Unlock()
st.tcpPacketsCountMutex.Lock()
currentAppStats.TcpPacketsCount = st.appStats.TcpPacketsCount
st.appStats.TcpPacketsCount = 0
st.tcpPacketsCountMutex.Unlock()
st.reassembledTcpPayloadsCountMutex.Lock()
currentAppStats.ReassembledTcpPayloadsCount = st.appStats.ReassembledTcpPayloadsCount
st.appStats.ReassembledTcpPayloadsCount = 0
st.reassembledTcpPayloadsCountMutex.Unlock()
st.tlsConnectionsCountMutex.Lock()
currentAppStats.TlsConnectionsCount = st.appStats.TlsConnectionsCount
st.appStats.TlsConnectionsCount = 0
st.tlsConnectionsCountMutex.Unlock()
st.matchedPairsMutex.Lock()
currentAppStats.MatchedPairs = st.appStats.MatchedPairs
st.appStats.MatchedPairs = 0
st.matchedPairsMutex.Unlock()
return currentAppStats
}

View File

@@ -47,11 +47,12 @@ func (tid *tcpID) String() string {
type tcpReader struct {
ident string
tcpID *api.TcpID
isClosed bool
isClient bool
isOutgoing bool
msgQueue chan tcpReaderDataMsg // Channel of captured reassembled tcp payload
data []byte
captureTime time.Time
superTimer *api.SuperTimer
parent *tcpStream
messageCount uint
packetsSeen uint
@@ -59,6 +60,7 @@ type tcpReader struct {
extension *api.Extension
emitter api.Emitter
counterPair *api.CounterPair
sync.Mutex
}
func (h *tcpReader) Read(p []byte) (int, error) {
@@ -69,7 +71,7 @@ func (h *tcpReader) Read(p []byte) (int, error) {
msg, ok = <-h.msgQueue
h.data = msg.bytes
h.captureTime = msg.timestamp
h.superTimer.CaptureTime = msg.timestamp
if len(h.data) > 0 {
h.packetsSeen += 1
}
@@ -93,10 +95,19 @@ func (h *tcpReader) Read(p []byte) (int, error) {
return l, nil
}
func (h *tcpReader) Close() {
h.Lock()
if !h.isClosed {
h.isClosed = true
close(h.msgQueue)
}
h.Unlock()
}
func (h *tcpReader) run(wg *sync.WaitGroup) {
defer wg.Done()
b := bufio.NewReader(h)
err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.emitter)
err := h.extension.Dissector.Dissect(b, h.isClient, h.tcpID, h.counterPair, h.superTimer, h.parent.superIdentifier, h.emitter)
if err != nil {
io.Copy(ioutil.Discard, b)
}

View File

@@ -8,6 +8,7 @@ import (
"github.com/google/gopacket"
"github.com/google/gopacket/layers" // pulls in all layers decoders
"github.com/google/gopacket/reassembly"
"github.com/up9inc/mizu/tap/api"
)
/* It's a connection (bidirectional)
@@ -16,16 +17,19 @@ import (
* In our implementation, we pass information from ReassembledSG to the tcpReader through a shared channel.
*/
type tcpStream struct {
tcpstate *reassembly.TCPSimpleFSM
fsmerr bool
optchecker reassembly.TCPOptionCheck
net, transport gopacket.Flow
isDNS bool
isTapTarget bool
clients []tcpReader
servers []tcpReader
urls []string
ident string
id int64
isClosed bool
superIdentifier *api.SuperIdentifier
tcpstate *reassembly.TCPSimpleFSM
fsmerr bool
optchecker reassembly.TCPOptionCheck
net, transport gopacket.Flow
isDNS bool
isTapTarget bool
clients []tcpReader
servers []tcpReader
urls []string
ident string
sync.Mutex
}
@@ -143,14 +147,25 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
if length > 0 {
// This is where we pass the reassembled information onwards
// This channel is read by an tcpReader object
statsTracker.incReassembledTcpPayloadsCount()
appStats.IncReassembledTcpPayloadsCount()
timestamp := ac.GetCaptureInfo().Timestamp
if dir == reassembly.TCPDirClientToServer {
for _, reader := range t.clients {
reader.msgQueue <- tcpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp}
for i := range t.clients {
reader := &t.clients[i]
reader.Lock()
if !reader.isClosed {
reader.msgQueue <- tcpReaderDataMsg{data, timestamp}
}
reader.Unlock()
}
} else {
for _, reader := range t.servers {
reader.msgQueue <- tcpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp}
for i := range t.servers {
reader := &t.servers[i]
reader.Lock()
if !reader.isClosed {
reader.msgQueue <- tcpReaderDataMsg{data, timestamp}
}
reader.Unlock()
}
}
}
@@ -159,14 +174,33 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
func (t *tcpStream) ReassemblyComplete(ac reassembly.AssemblerContext) bool {
Trace("%s: Connection closed", t.ident)
if t.isTapTarget {
for _, reader := range t.clients {
close(reader.msgQueue)
}
for _, reader := range t.servers {
close(reader.msgQueue)
}
if t.isTapTarget && !t.isClosed {
t.Close()
}
// do not remove the connection to allow last ACK
return false
}
func (t *tcpStream) Close() {
shouldReturn := false
t.Lock()
if t.isClosed {
shouldReturn = true
} else {
t.isClosed = true
}
t.Unlock()
if shouldReturn {
return
}
streams.Delete(t.id)
for i := range t.clients {
reader := &t.clients[i]
reader.Close()
}
for i := range t.servers {
reader := &t.servers[i]
reader.Close()
}
}

View File

@@ -2,7 +2,9 @@ package tap
import (
"fmt"
"runtime"
"sync"
"time"
"github.com/romana/rlog"
"github.com/up9inc/mizu/tap/api"
@@ -23,6 +25,16 @@ type tcpStreamFactory struct {
Emitter api.Emitter
}
type tcpStreamWrapper struct {
stream *tcpStream
createdAt time.Time
}
var streams *sync.Map = &sync.Map{} // global
var streamId int64 = 0
var maxNumberOfGoroutines int
func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.TCP, ac reassembly.AssemblerContext) reassembly.Stream {
rlog.Debugf("* NEW: %s %s", net, transport)
fsmOptions := reassembly.TCPSimpleFSMOptions{
@@ -39,23 +51,32 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
props := factory.getStreamProps(srcIp, srcPort, dstIp, dstPort)
isTapTarget := props.isTapTarget
stream := &tcpStream{
net: net,
transport: transport,
isDNS: tcp.SrcPort == 53 || tcp.DstPort == 53,
isTapTarget: isTapTarget,
tcpstate: reassembly.NewTCPSimpleFSM(fsmOptions),
ident: fmt.Sprintf("%s:%s", net, transport),
optchecker: reassembly.NewTCPOptionCheck(),
net: net,
transport: transport,
isDNS: tcp.SrcPort == 53 || tcp.DstPort == 53,
isTapTarget: isTapTarget,
tcpstate: reassembly.NewTCPSimpleFSM(fsmOptions),
ident: fmt.Sprintf("%s:%s", net, transport),
optchecker: reassembly.NewTCPOptionCheck(),
superIdentifier: &api.SuperIdentifier{},
}
if stream.isTapTarget {
if runtime.NumGoroutine() > maxNumberOfGoroutines {
appStats.IncDroppedTcpStreams()
rlog.Debugf("Dropped a TCP stream because of load. Total dropped: %d Total Goroutines: %d\n", appStats.DroppedTcpStreams, runtime.NumGoroutine())
return stream
}
streamId++
stream.id = streamId
for i, extension := range extensions {
counterPair := &api.CounterPair{
Request: 0,
Response: 0,
}
stream.clients = append(stream.clients, tcpReader{
msgQueue: make(chan tcpReaderDataMsg),
ident: fmt.Sprintf("%s %s", net, transport),
msgQueue: make(chan tcpReaderDataMsg),
superTimer: &api.SuperTimer{},
ident: fmt.Sprintf("%s %s", net, transport),
tcpID: &api.TcpID{
SrcIP: srcIp,
DstIP: dstIp,
@@ -71,8 +92,9 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
counterPair: counterPair,
})
stream.servers = append(stream.servers, tcpReader{
msgQueue: make(chan tcpReaderDataMsg),
ident: fmt.Sprintf("%s %s", net, transport),
msgQueue: make(chan tcpReaderDataMsg),
superTimer: &api.SuperTimer{},
ident: fmt.Sprintf("%s %s", net, transport),
tcpID: &api.TcpID{
SrcIP: net.Dst().String(),
DstIP: net.Src().String(),
@@ -87,6 +109,12 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
emitter: factory.Emitter,
counterPair: counterPair,
})
streams.Store(stream.id, &tcpStreamWrapper{
stream: stream,
createdAt: time.Now(),
})
factory.wg.Add(2)
// Start reading from channel stream.reader.bytes
go stream.clients[i].run(&factory.wg)
@@ -117,7 +145,7 @@ func (factory *tcpStreamFactory) getStreamProps(srcIP string, srcPort string, ds
}
return &streamProps{isTapTarget: false, isOutgoing: false}
} else {
rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ notHost3 %s -> %s:%s", srcIP, dstIP, dstPort))
rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ notHost3 %s:%s -> %s:%s", srcIP, srcPort, dstIP, dstPort))
return &streamProps{isTapTarget: true}
}
}

22792
ui/package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -12,8 +12,8 @@
"@types/node": "^12.20.10",
"@types/react": "^17.0.3",
"@types/react-dom": "^17.0.3",
"jsonpath": "^1.1.1",
"axios": "^0.21.1",
"jsonpath": "^1.1.1",
"node-sass": "^5.0.0",
"numeral": "^2.0.6",
"protobuf-decoder": "^0.1.0",
@@ -21,7 +21,7 @@
"react-copy-to-clipboard": "^5.0.3",
"react-dom": "^17.0.2",
"react-scripts": "4.0.3",
"react-scrollable-feed": "^1.3.0",
"react-scrollable-feed-virtualized": "^1.4.2",
"react-syntax-highlighter": "^15.4.3",
"typescript": "^4.2.4",
"web-vitals": "^1.1.1"

View File

@@ -2,7 +2,7 @@ import {EntryItem} from "./EntryListItem/EntryListItem";
import React, {useCallback, useEffect, useMemo, useRef, useState} from "react";
import styles from './style/EntriesList.module.sass';
import spinner from './assets/spinner.svg';
import ScrollableFeed from "react-scrollable-feed";
import ScrollableFeedVirtualized from "react-scrollable-feed-virtualized";
import {StatusType} from "./Filters";
import Api from "../helpers/api";
import down from "./assets/downImg.svg";
@@ -77,9 +77,6 @@ export const EntriesList: React.FC<EntriesListProps> = ({entries, setEntries, fo
}
setIsLoadingTop(false);
const newEntries = [...data, ...entries];
if(newEntries.length >= 1000) {
newEntries.splice(1000);
}
setEntries(newEntries);
if(scrollTo) {
@@ -100,10 +97,6 @@ export const EntriesList: React.FC<EntriesListProps> = ({entries, setEntries, fo
}
scrollTo = document.getElementById(filteredEntries?.[filteredEntries.length -1]?.id);
let newEntries = [...entries, ...data];
if(newEntries.length >= 1000) {
setNoMoreDataTop(false);
newEntries = newEntries.slice(-1000);
}
setEntries(newEntries);
if(scrollTo) {
scrollTo.scrollIntoView({behavior: "smooth"});
@@ -116,19 +109,20 @@ export const EntriesList: React.FC<EntriesListProps> = ({entries, setEntries, fo
{isLoadingTop && <div className={styles.spinnerContainer}>
<img alt="spinner" src={spinner} style={{height: 25}}/>
</div>}
<ScrollableFeed ref={scrollableRef} onScroll={(isAtBottom) => onScrollEvent(isAtBottom)}>
<ScrollableFeedVirtualized ref={scrollableRef} itemHeight={48} marginTop={10} onScroll={(isAtBottom) => onScrollEvent(isAtBottom)}>
{noMoreDataTop && !connectionOpen && <div id="noMoreDataTop" className={styles.noMoreDataAvailable}>No more data available</div>}
{filteredEntries.map(entry => <EntryItem key={entry.id}
entry={entry}
setFocusedEntryId={setFocusedEntryId}
isSelected={focusedEntryId === entry.id}/>)}
{!connectionOpen && !noMoreDataBottom && <div className={styles.fetchButtonContainer}>
<div className={styles.styledButton} onClick={() => getNewEntries()}>Fetch more entries</div>
</div>}
</ScrollableFeed>
<button type="button"
className={`${styles.btnLive} ${scrollableList ? styles.showButton : styles.hideButton}`}
onClick={(_) => scrollableRef.current.scrollToBottom()}>
isSelected={focusedEntryId === entry.id}
style={{}}/>)}
</ScrollableFeedVirtualized>
{!connectionOpen && !noMoreDataBottom && <div className={styles.fetchButtonContainer}>
<div className={styles.styledButton} onClick={() => getNewEntries()}>Fetch more entries</div>
</div>}
<button type="button"
className={`${styles.btnLive} ${scrollableList ? styles.showButton : styles.hideButton}`}
onClick={(_) => scrollableRef.current.jumpToBottom()}>
<img alt="down" src={down} />
</button>
</div>

View File

@@ -32,7 +32,7 @@ interface EntryDetailedProps {
export const formatSize = (n: number) => n > 1000 ? `${Math.round(n / 1000)}KB` : `${n} B`;
const EntryTitle: React.FC<any> = ({protocol, data}) => {
const EntryTitle: React.FC<any> = ({protocol, data, bodySize, elapsedTime}) => {
const classes = useStyles();
const {response} = JSON.parse(data.entry);
@@ -40,7 +40,8 @@ const EntryTitle: React.FC<any> = ({protocol, data}) => {
return <div className={classes.entryTitle}>
<Protocol protocol={protocol} horizontal={true}/>
<div style={{right: "30px", position: "absolute", display: "flex"}}>
{response.payload && <div style={{margin: "0 18px", opacity: 0.5}}>{formatSize(response.payload.bodySize)}</div>}
{response.payload && <div style={{margin: "0 18px", opacity: 0.5}}>{formatSize(bodySize)}</div>}
<div style={{marginRight: 18, opacity: 0.5}}>{Math.round(elapsedTime)}ms</div>
<div style={{opacity: 0.5}}>{'rulesMatched' in data ? data.rulesMatched?.length : '0'} Rules Applied</div>
</div>
</div>;
@@ -63,10 +64,15 @@ const EntrySummary: React.FC<any> = ({data}) => {
export const EntryDetailed: React.FC<EntryDetailedProps> = ({entryData}) => {
return <>
<EntryTitle protocol={entryData.protocol} data={entryData.data}/>
<EntryTitle
protocol={entryData.protocol}
data={entryData.data}
bodySize={entryData.bodySize}
elapsedTime={entryData.data.elapsedTime}
/>
{entryData.data && <EntrySummary data={entryData.data}/>}
<>
{entryData.data && <EntryViewer representation={entryData.representation} color={entryData.protocol.background_color}/>}
{entryData.data && <EntryViewer representation={entryData.representation} color={entryData.protocol.backgroundColor}/>}
</>
</>
};

View File

@@ -2,7 +2,7 @@
.Entry
font-family: "Source Sans Pro", Lucida Grande, Tahoma, sans-serif
height: 100%
height: calc(100% - 70px)
width: 100%
h3,
@@ -44,6 +44,8 @@
border-top-right-radius: 0
.body
height: 100%
overflow-y: auto
background: $main-background-color
color: $blue-gray
border-radius: 4px

View File

@@ -16,14 +16,14 @@ interface Entry {
summary: string,
service: string,
id: string,
status_code?: number;
statusCode?: number;
url?: string;
timestamp: Date;
source_ip: string,
source_port: string,
destination_ip: string,
destination_port: string,
isOutgoing?: boolean;
sourceIp: string,
sourcePort: string,
destinationIp: string,
destinationPort: string,
isOutgoing?: boolean;
latency: number;
rules: Rules;
}
@@ -38,10 +38,11 @@ interface EntryProps {
entry: Entry;
setFocusedEntryId: (id: string) => void;
isSelected?: boolean;
style: object;
}
export const EntryItem: React.FC<EntryProps> = ({entry, setFocusedEntryId, isSelected}) => {
const classification = getClassification(entry.status_code)
export const EntryItem: React.FC<EntryProps> = ({entry, setFocusedEntryId, isSelected, style}) => {
const classification = getClassification(entry.statusCode)
let ingoingIcon;
let outgoingIcon;
switch(classification) {
@@ -103,11 +104,17 @@ export const EntryItem: React.FC<EntryProps> = ({entry, setFocusedEntryId, isSel
className={`${styles.row}
${isSelected ? styles.rowSelected : backgroundColor}`}
onClick={() => setFocusedEntryId(entry.id)}
style={{border: isSelected ? `1px ${entry.protocol.background_color} solid` : "1px transparent solid"}}
style={{
border: isSelected ? `1px ${entry.protocol.backgroundColor} solid` : "1px transparent solid",
position: "absolute",
top: style['top'],
marginTop: style['marginTop'],
width: "calc(100% - 25px)",
}}
>
<Protocol protocol={entry.protocol} horizontal={false}/>
{((entry.protocol.name === "http" && "status_code" in entry) || entry.status_code !== 0) && <div>
<StatusCode statusCode={entry.status_code}/>
{((entry.protocol.name === "http" && "statusCode" in entry) || entry.statusCode !== 0) && <div>
<StatusCode statusCode={entry.statusCode}/>
</div>}
<div className={styles.endpointServiceContainer}>
<EndpointPath method={entry.method} path={entry.summary}/>
@@ -116,13 +123,13 @@ export const EntryItem: React.FC<EntryProps> = ({entry, setFocusedEntryId, isSel
</div>
</div>
<div className={styles.directionContainer}>
<span className={styles.port} title="Source Port">{entry.source_port}</span>
<span className={styles.port} title="Source Port">{entry.sourcePort}</span>
{entry.isOutgoing ?
<img src={outgoingIcon} alt="Ingoing traffic" title="Ingoing"/>
:
<img src={ingoingIcon} alt="Outgoing traffic" title="Outgoing"/>
}
<span className={styles.port} title="Destination Port">{entry.destination_port}</span>
<span className={styles.port} title="Destination Port">{entry.destinationPort}</span>
</div>
<div className={styles.timestamp}>
<span title="Timestamp">

View File

@@ -86,10 +86,6 @@ export const TrafficPage: React.FC<TrafficPageProps> = ({setAnalyzeStatus, onTLS
}
if (!focusedEntryId) setFocusedEntryId(entry.id)
let newEntries = [...entries];
if (entries.length === 1000) {
newEntries = newEntries.splice(1);
setNoMoreDataTop(false);
}
setEntries([...newEntries, entry])
if(listEntry.current) {
if(isScrollable(listEntry.current.firstChild)) {

View File

@@ -3,12 +3,12 @@ import styles from './style/Protocol.module.sass';
export interface ProtocolInterface {
name: string
long_name: string
longName: string
abbreviation: string
background_color: string
foreground_color: string
font_size: number
reference_link: string
backgroundColor: string
foregroundColor: string
fontSize: number
referenceLink: string
ports: string[]
inbound_ports: string
}
@@ -20,29 +20,29 @@ interface ProtocolProps {
const Protocol: React.FC<ProtocolProps> = ({protocol, horizontal}) => {
if (horizontal) {
return <a target="_blank" rel="noopener noreferrer" href={protocol.reference_link}>
return <a target="_blank" rel="noopener noreferrer" href={protocol.referenceLink}>
<span
className={`${styles.base} ${styles.horizontal}`}
style={{
backgroundColor: protocol.background_color,
color: protocol.foreground_color,
backgroundColor: protocol.backgroundColor,
color: protocol.foregroundColor,
fontSize: 13,
}}
title={protocol.abbreviation}
>
{protocol.long_name}
{protocol.longName}
</span>
</a>
} else {
return <a target="_blank" rel="noopener noreferrer" href={protocol.reference_link}>
return <a target="_blank" rel="noopener noreferrer" href={protocol.referenceLink}>
<span
className={`${styles.base} ${styles.vertical}`}
style={{
backgroundColor: protocol.background_color,
color: protocol.foreground_color,
fontSize: protocol.font_size,
backgroundColor: protocol.backgroundColor,
color: protocol.foregroundColor,
fontSize: protocol.fontSize,
}}
title={protocol.long_name}
title={protocol.longName}
>
{protocol.abbreviation}
</span>

View File

@@ -14,7 +14,6 @@
flex-direction: column
overflow: hidden
flex-grow: 1
padding-top: 20px
.footer
display: flex