mizu release 2021-06-21 (#79)

* Show pod name and namespace (#61)

* WIP

* Update main.go, consts.go, and 2 more files...

* Update messageSensitiveDataCleaner.go

* Update consts.go and messageSensitiveDataCleaner.go

* Update messageSensitiveDataCleaner.go

* Update main.go, consts.go, and 3 more files...

* WIP

* Update main.go, messageSensitiveDataCleaner.go, and 6 more files...

* Update main.go, messageSensitiveDataCleaner.go, and 3 more files...

* Update consts.go, messageSensitiveDataCleaner.go, and tap.go

* Update provider.go

* Update serializableRegexp.go

* Update tap.go

* TRA-3234 fetch with _source + no hard limit (#64)

* remove the HARD limit of 5000

* TRA-3299 Reduce footprint and Add Tolerances(#65)

* Use lib const for DNSClusterFirstWithHostNet.

* Whitespace.

* Break lines.

* Added affinity to pod names.

* Added tolerations to NoExecute and NoSchedule taints.

* Implementation of Mizu view command

* .

* .

* Update main.go and messageSensitiveDataCleaner.go

* Update main.go

* String and not pointers (#68)

* TRA-3318 - Cookies not null and fix har file names  (#69)

* no message

* TRA-3212 Passive-Tapper and Mizu share code (#70)

* Use log in tap package instead of fmt.

* Moved api/pkg/tap to root.

* Added go.mod and go.sum for tap.

* Added replace for shared.

* api uses tap module instead of tap package.

* Removed dependency of tap in shared by moving env var out of tap.

* Fixed compilation bugs.

* Fixed: Forgot to export struct field HostMode.

* Removed unused flag.

* Close har output channel when done.

* Moved websocket out of mizu and into passive-tapper.

* Send connection details over har output channel.

* Fixed compilation errors.

* Removed unused info from request response cache.

* Renamed connection -> connectionID.

* Fixed rename bug.

* Export setters and getters for filter ips and ports.

* Added tap dependency to Dockerfile.

* Uncomment error messages.

* Renamed `filterIpAddresses` -> `filterAuthorities`.

* Renamed ConnectionID -> ConnectionInfo.

* Fixed: Missed one replace.

* TRA-3342 Mizu/tap dump to har directory fails on Linux (#71)

* Instead of saving incomplete temp har files in a temp dir, save them in the output dir with a *.har.tmp suffix.

* API only loads har from *.har files (by extension).

* Add export entries endpoint for better up9 connect funcionality  (#72)

* no message
* no message
* no message

* Filter 'cookie' header

* Release action  (#73)

* Create main.yml

* Update main.yml

* Update main.yml

* Update main.yml

* Update main.yml

* trying new approach

* no message

* yaml error

* no message

* no message

* no message

* missing )

* no message

* no message

* remove main.yml and fix branches

* Create tag-temp.yaml

* Update tag-temp.yaml

* Update tag-temp.yaml

* no message

* no message

* no message

* no message

* no message

* no message

* no message

* #minor

* no message

* no message

* added checksum calc to CLI makefile

* fixed build error - created bin directory upfront

* using markdown for release text

* use separate checksum files

* fixed release readme

* #minor

* readme updated

Co-authored-by: Alex Haiut <alex@up9.com>

* TRA-3360 Fix: Mizu ignores -n namespace flag and records traffic from all pods (#75)

Do not tap pods in namespaces which were not requested.

* added apple/m1 binary, updated readme (#77)

Co-authored-by: Alex Haiut <alex@up9.com>

* Update README.md (#78)

Co-authored-by: lirazyehezkel <61656597+lirazyehezkel@users.noreply.github.com>
Co-authored-by: RamiBerm <rami.berman@up9.com>
Co-authored-by: RamiBerm <54766858+RamiBerm@users.noreply.github.com>
Co-authored-by: gadotroee <55343099+gadotroee@users.noreply.github.com>
Co-authored-by: nimrod-up9 <59927337+nimrod-up9@users.noreply.github.com>
Co-authored-by: Igor Gov <igor.govorov1@gmail.com>
Co-authored-by: Alex Haiut <alex@up9.com>
This commit is contained in:
Alex Haiut
2021-06-21 15:17:31 +03:00
committed by GitHub
parent 5f2a4deb19
commit d3c023b3ba
46 changed files with 1250 additions and 883 deletions

View File

@@ -2,8 +2,8 @@ name: public-cli
on:
push:
branches:
- 'develop'
- 'main'
- develop
- main
jobs:
docker:
runs-on: ubuntu-latest
@@ -15,5 +15,32 @@ jobs:
with:
service_account_key: ${{ secrets.GCR_JSON_KEY }}
export_default_credentials: true
- uses: haya14busa/action-cond@v1
id: condval
with:
cond: ${{ github.ref == 'refs/heads/main' }}
if_true: "minor"
if_false: "patch"
- name: Auto Increment Semver Action
uses: MCKanpolat/auto-semver-action@1.0.5
id: versioning
with:
releaseType: ${{ steps.condval.outputs.value }}
github_token: ${{ secrets.GITHUB_TOKEN }}
- name: Get base image name
shell: bash
run: |
echo "##[set-output name=build_timestamp;]$(echo $(date +%s))"
echo "##[set-output name=branch;]$(echo ${GITHUB_REF#refs/heads/})"
id: version_parameters
- name: Build and Push CLI
run: make push-cli
run: make push-cli SEM_VER='${{ steps.versioning.outputs.version }}' BUILD_TIMESTAMP='${{ steps.version_parameters.outputs.build_timestamp }}'
- name: publish
uses: ncipollo/release-action@v1
with:
token: ${{ secrets.GITHUB_TOKEN }}
artifacts: "cli/bin/*"
commit: ${{ steps.version_parameters.outputs.branch }}
tag: ${{ steps.versioning.outputs.version }}
prerelease: ${{ github.ref != 'refs/heads/main' }}
bodyFile: 'cli/bin/README.md'

View File

@@ -18,12 +18,14 @@ WORKDIR /app/api-build
COPY api/go.mod api/go.sum ./
COPY shared/go.mod shared/go.mod ../shared/
COPY tap/go.mod tap/go.mod ../tap/
RUN go mod download
# cheap trick to make the build faster (As long as go.mod wasn't changes)
RUN go list -f '{{.Path}}@{{.Version}}' -m all | sed 1d | grep -e 'go-cache' -e 'sqlite' | xargs go get
# Copy and build api code
COPY shared ../shared
COPY tap ../tap
COPY api .
RUN go build -ldflags="-s -w" -o mizuagent .

View File

@@ -3,16 +3,76 @@ standalone web app traffic viewer for Kubernetes
## Download
Download `mizu` for your platform as
Download `mizu` for your platform and operating system
* for MacOS - `curl -o mizu https://static.up9.com/mizu/mizu-darwin-amd64 && chmod 755 mizu`
* for Linux - `curl -o mizu https://static.up9.com/mizu/mizu-linux-amd64 && chmod 755 mizu`
### Latest stable release
## Run
* for MacOS - Intel
```
curl -Lo mizu \
https://github.com/up9inc/mizu/releases/latest/download/mizu_darwin_amd64 \
&& chmod 755 mizu
```
* for MacOS - Apple Silicon
```
curl -Lo mizu \
https://github.com/up9inc/mizu/releases/latest/download/mizu_darwin_arm64 \
&& chmod 755 mizu
```
* for Linux - Intel 64bit
```
curl -Lo mizu \
https://github.com/up9inc/mizu/releases/latest/download/mizu_linux_amd64 \
&& chmod 755 mizu
```
SHA256 checksums are available on the [Releases](https://github.com/up9inc/mizu/releases) page.
### Development (unstable) build
Pick one from the [Releases](https://github.com/up9inc/mizu/releases) page.
## How to run
1. Find pod you'd like to tap to in your Kubernetes cluster
2. Run `mizu --pod podname`
2. Run `mizu PODNAME` or `mizu REGEX`
3. Open browser on `http://localhost:8899` as instructed ..
4. Watch the WebAPI traffic flowing ..
5. Type ^C to stop
## Examples
Run `mizu help` for usage options
To tap specific pod -
```
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
front-end-649fc5fd6-kqbtn 2/2 Running 0 7m
..
$ mizu tap front-end-649fc5fd6-kqbtn
+front-end-649fc5fd6-kqbtn
Web interface is now available at http://localhost:8899
^C
```
To tap multiple pods using regex -
```
$ kubectl get pods
NAME READY STATUS RESTARTS AGE
carts-66c77f5fbb-fq65r 2/2 Running 0 20m
catalogue-5f4cb7cf5-7zrmn 2/2 Running 0 20m
front-end-649fc5fd6-kqbtn 2/2 Running 0 20m
..
$ mizu tap "^ca.*"
+carts-66c77f5fbb-fq65r
+catalogue-5f4cb7cf5-7zrmn
Web interface is now available at http://localhost:8899
^C
```

View File

@@ -4,6 +4,7 @@ go 1.16
require (
github.com/antoniodipinto/ikisocket v0.0.0-20210417133349-f1502512d69a
github.com/beevik/etree v1.1.0
github.com/djherbis/atime v1.0.0
github.com/fasthttp/websocket v1.4.3-beta.1 // indirect
github.com/go-playground/locales v0.13.0
@@ -17,6 +18,7 @@ require (
github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/up9inc/mizu/shared v0.0.0
github.com/up9inc/mizu/tap v0.0.0
go.mongodb.org/mongo-driver v1.5.1
golang.org/x/net v0.0.0-20210421230115-4e50805a0758
gorm.io/driver/sqlite v1.1.4
@@ -27,3 +29,4 @@ require (
)
replace github.com/up9inc/mizu/shared v0.0.0 => ../shared
replace github.com/up9inc/mizu/tap v0.0.0 => ../tap

View File

@@ -48,6 +48,8 @@ github.com/antoniodipinto/ikisocket v0.0.0-20210417133349-f1502512d69a h1:76llBl
github.com/antoniodipinto/ikisocket v0.0.0-20210417133349-f1502512d69a/go.mod h1:QvDfsDQDmGxUsvEeWabVZ5pp2FMXpOkwQV0L6SE6cp0=
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
github.com/aws/aws-sdk-go v1.34.28/go.mod h1:H7NKnBqNVzoTJpGfLrQkkD+ytBA93eiDYi/+8rV9s48=
github.com/beevik/etree v1.1.0 h1:T0xke/WvNtMoCqgzPhkX2r4rjY3GDZFi+FjpRZY2Jbs=
github.com/beevik/etree v1.1.0/go.mod h1:r8Aw8JqVegEf0w2fDnATrX9VpkMcyFeM0FhwO62wh+A=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=

View File

@@ -7,11 +7,12 @@ import (
"github.com/gofiber/fiber/v2"
"github.com/gorilla/websocket"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/tap"
"mizuserver/pkg/api"
"mizuserver/pkg/middleware"
"mizuserver/pkg/models"
"mizuserver/pkg/routes"
"mizuserver/pkg/tap"
"mizuserver/pkg/sensitiveDataFiltering"
"mizuserver/pkg/utils"
"os"
"os/signal"
@@ -22,19 +23,24 @@ var aggregator = flag.Bool("aggregator", false, "Run in aggregator mode with API
var standalone = flag.Bool("standalone", false, "Run in standalone tapper and API mode")
var aggregatorAddress = flag.String("aggregator-address", "", "Address of mizu collector for tapping")
const nodeNameEnvVar = "NODE_NAME"
const tappedAddressesPerNodeDictEnvVar = "TAPPED_ADDRESSES_PER_HOST"
func main() {
flag.Parse()
hostMode := os.Getenv(shared.HostModeEnvVar) == "1"
tapOpts := &tap.TapOpts{HostMode: hostMode}
if !*shouldTap && !*aggregator && !*standalone{
panic("One of the flags --tap, --api or --standalone must be provided")
}
if *standalone {
harOutputChannel := tap.StartPassiveTapper()
go api.StartReadingEntries(harOutputChannel, tap.HarOutputDir)
harOutputChannel, outboundLinkOutputChannel := tap.StartPassiveTapper(tapOpts)
filteredHarChannel := make(chan *tap.OutputChannelItem)
go filterHarHeaders(harOutputChannel, filteredHarChannel, getTrafficFilteringOptions())
go api.StartReadingEntries(filteredHarChannel, nil)
go api.StartReadingOutbound(outboundLinkOutputChannel)
hostApi(nil)
} else if *shouldTap {
if *aggregatorAddress == "" {
@@ -43,19 +49,26 @@ func main() {
tapTargets := getTapTargets()
if tapTargets != nil {
tap.HostAppAddresses = tapTargets
fmt.Println("Filtering for the following addresses:", tap.HostAppAddresses)
tap.SetFilterAuthorities(tapTargets)
fmt.Println("Filtering for the following authorities:", tap.GetFilterIPs())
}
harOutputChannel := tap.StartPassiveTapper()
harOutputChannel, outboundLinkOutputChannel := tap.StartPassiveTapper(tapOpts)
socketConnection, err := shared.ConnectToSocketServer(*aggregatorAddress, shared.DEFAULT_SOCKET_RETRIES, shared.DEFAULT_SOCKET_RETRY_SLEEP_TIME, false)
if err != nil {
panic(fmt.Sprintf("Error connecting to socket server at %s %v", *aggregatorAddress, err))
}
go pipeChannelToSocket(socketConnection, harOutputChannel)
go api.StartReadingOutbound(outboundLinkOutputChannel)
} else if *aggregator {
socketHarOutChannel := make(chan *tap.OutputChannelItem, 1000)
go api.StartReadingEntries(socketHarOutChannel, nil)
filteredHarChannel := make(chan *tap.OutputChannelItem)
go api.StartReadingEntries(filteredHarChannel, nil)
go filterHarHeaders(socketHarOutChannel, filteredHarChannel, getTrafficFilteringOptions())
hostApi(socketHarOutChannel)
}
@@ -89,15 +102,36 @@ func hostApi(socketHarOutputChannel chan<- *tap.OutputChannelItem) {
func getTapTargets() []string {
nodeName := os.Getenv(nodeNameEnvVar)
nodeName := os.Getenv(shared.NodeNameEnvVar)
var tappedAddressesPerNodeDict map[string][]string
err := json.Unmarshal([]byte(os.Getenv(tappedAddressesPerNodeDictEnvVar)), &tappedAddressesPerNodeDict)
err := json.Unmarshal([]byte(os.Getenv(shared.TappedAddressesPerNodeDictEnvVar)), &tappedAddressesPerNodeDict)
if err != nil {
panic(fmt.Sprintf("env var value of %s is invalid! must be map[string][]string %v", tappedAddressesPerNodeDict, err))
panic(fmt.Sprintf("env var %s's value of %s is invalid! must be map[string][]string %v", shared.TappedAddressesPerNodeDictEnvVar, tappedAddressesPerNodeDict, err))
}
return tappedAddressesPerNodeDict[nodeName]
}
func getTrafficFilteringOptions() *shared.TrafficFilteringOptions {
filteringOptionsJson := os.Getenv(shared.MizuFilteringOptionsEnvVar)
if filteringOptionsJson == "" {
return nil
}
var filteringOptions shared.TrafficFilteringOptions
err := json.Unmarshal([]byte(filteringOptionsJson), &filteringOptions)
if err != nil {
panic(fmt.Sprintf("env var %s's value of %s is invalid! json must match the shared.TrafficFilteringOptions struct %v", shared.MizuFilteringOptionsEnvVar, filteringOptionsJson, err))
}
return &filteringOptions
}
func filterHarHeaders(inChannel <- chan *tap.OutputChannelItem, outChannel chan *tap.OutputChannelItem, filterOptions *shared.TrafficFilteringOptions) {
for message := range inChannel {
sensitiveDataFiltering.FilterSensitiveInfoFromHarRequest(message, filterOptions)
outChannel <- message
}
}
func pipeChannelToSocket(connection *websocket.Conn, messageDataChannel <-chan *tap.OutputChannelItem) {
if connection == nil {
panic("Websocket connection is nil")

View File

@@ -5,18 +5,20 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/google/martian/har"
"go.mongodb.org/mongo-driver/bson/primitive"
"mizuserver/pkg/database"
"mizuserver/pkg/models"
"mizuserver/pkg/resolver"
"mizuserver/pkg/tap"
"mizuserver/pkg/utils"
"net/url"
"os"
"path"
"sort"
"strings"
"time"
"github.com/google/martian/har"
"github.com/up9inc/mizu/tap"
"go.mongodb.org/mongo-driver/bson/primitive"
)
var k8sResolver *resolver.Resolver
@@ -57,14 +59,21 @@ func startReadingFiles(workingDir string) {
for true {
dir, _ := os.Open(workingDir)
dirFiles, _ := dir.Readdir(-1)
sort.Sort(utils.ByModTime(dirFiles))
if len(dirFiles) == 0 {
var harFiles []os.FileInfo
for _, fileInfo := range dirFiles {
if strings.HasSuffix(fileInfo.Name(), ".har") {
harFiles = append(harFiles, fileInfo)
}
}
sort.Sort(utils.ByModTime(harFiles))
if len(harFiles) == 0 {
fmt.Printf("Waiting for new files\n")
time.Sleep(3 * time.Second)
continue
}
fileInfo := dirFiles[0]
fileInfo := harFiles[0]
inputFilePath := path.Join(workingDir, fileInfo.Name())
file, err := os.Open(inputFilePath)
utils.CheckErr(err)
@@ -88,17 +97,25 @@ func startReadingChannel(outputItems <-chan *tap.OutputChannelItem) {
}
for item := range outputItems {
saveHarToDb(item.HarEntry, item.RequestSenderIp)
saveHarToDb(item.HarEntry, item.ConnectionInfo.ClientIP)
}
}
func StartReadingOutbound(outboundLinkChannel <-chan *tap.OutboundLink) {
// tcpStreamFactory will block on write to channel. Empty channel to unblock.
// TODO: Make write to channel optional.
for range outboundLinkChannel {
}
}
func saveHarToDb(entry *har.Entry, sender string) {
entryBytes, _ := json.Marshal(entry)
serviceName, urlPath, serviceHostName := getServiceNameFromUrl(entry.Request.URL)
entryId := primitive.NewObjectID().Hex()
var (
resolvedSource *string
resolvedDestination *string
resolvedSource string
resolvedDestination string
)
if k8sResolver != nil {
resolvedSource = k8sResolver.Resolve(sender)

View File

@@ -5,10 +5,10 @@ import (
"fmt"
"github.com/antoniodipinto/ikisocket"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/tap"
"mizuserver/pkg/controllers"
"mizuserver/pkg/models"
"mizuserver/pkg/routes"
"mizuserver/pkg/tap"
)
var browserClientSocketUUIDs = make([]string, 0)

View File

@@ -9,6 +9,7 @@ import (
"mizuserver/pkg/models"
"mizuserver/pkg/utils"
"mizuserver/pkg/validation"
"time"
)
const (
@@ -64,7 +65,7 @@ func GetEntries(c *fiber.Ctx) error {
return c.Status(fiber.StatusOK).JSON(baseEntries)
}
func GetHAR(c *fiber.Ctx) error {
func GetHARs(c *fiber.Ctx) error {
entriesFilter := &models.HarFetchRequestBody{}
order := OrderDesc
if err := c.QueryParser(entriesFilter); err != nil {
@@ -75,11 +76,23 @@ func GetHAR(c *fiber.Ctx) error {
return c.Status(fiber.StatusBadRequest).JSON(err)
}
var timestampFrom, timestampTo int64
if entriesFilter.From < 0 {
timestampFrom = 0
} else {
timestampFrom = entriesFilter.From
}
if entriesFilter.To <= 0 {
timestampTo = time.Now().UnixNano() / int64(time.Millisecond)
} else {
timestampTo = entriesFilter.To
}
var entries []models.MizuEntry
database.GetEntriesTable().
Where(fmt.Sprintf("timestamp BETWEEN %v AND %v", timestampFrom, timestampTo)).
Order(fmt.Sprintf("timestamp %s", order)).
// Where(fmt.Sprintf("timestamp %s %v", operatorSymbol, entriesFilter.Timestamp)).
Limit(1000).
Find(&entries)
if len(entries) > 0 {
@@ -87,26 +100,28 @@ func GetHAR(c *fiber.Ctx) error {
utils.ReverseSlice(entries)
}
harsObject := map[string]*har.HAR{}
harsObject := map[string]*models.ExtendedHAR{}
for _, entryData := range entries {
harEntryObject := []byte(entryData.Entry)
var harEntry har.Entry
_ = json.Unmarshal(harEntryObject, &harEntry)
_ = json.Unmarshal([]byte(entryData.Entry), &harEntry)
sourceOfEntry := *entryData.ResolvedSource
if harOfSource, ok := harsObject[sourceOfEntry]; ok {
sourceOfEntry := entryData.ResolvedSource
fileName := fmt.Sprintf("%s.har", sourceOfEntry)
if harOfSource, ok := harsObject[fileName]; ok {
harOfSource.Log.Entries = append(harOfSource.Log.Entries, &harEntry)
} else {
var entriesHar []*har.Entry
entriesHar = append(entriesHar, &harEntry)
harsObject[sourceOfEntry] = &har.HAR{
Log: &har.Log{
harsObject[fileName] = &models.ExtendedHAR{
Log: &models.ExtendedLog{
Version: "1.2",
Creator: &har.Creator{
Name: "mizu",
Version: "0.0.1",
Creator: &models.ExtendedCreator{
Creator: &har.Creator{
Name: "mizu",
Version: "0.0.2",
},
Source: sourceOfEntry,
},
Entries: entriesHar,
},
@@ -123,6 +138,50 @@ func GetHAR(c *fiber.Ctx) error {
return c.Status(fiber.StatusOK).SendStream(buffer)
}
func GetFullEntries(c *fiber.Ctx) error {
entriesFilter := &models.HarFetchRequestBody{}
order := OrderDesc
if err := c.QueryParser(entriesFilter); err != nil {
return c.Status(fiber.StatusBadRequest).JSON(err)
}
err := validation.Validate(entriesFilter)
if err != nil {
return c.Status(fiber.StatusBadRequest).JSON(err)
}
var timestampFrom, timestampTo int64
if entriesFilter.From < 0 {
timestampFrom = 0
} else {
timestampFrom = entriesFilter.From
}
if entriesFilter.To <= 0 {
timestampTo = time.Now().UnixNano() / int64(time.Millisecond)
} else {
timestampTo = entriesFilter.To
}
var entries []models.MizuEntry
database.GetEntriesTable().
Where(fmt.Sprintf("timestamp BETWEEN %v AND %v", timestampFrom, timestampTo)).
Order(fmt.Sprintf("timestamp %s", order)).
Find(&entries)
if len(entries) > 0 {
// the entries always order from oldest to newest so we should revers
utils.ReverseSlice(entries)
}
entriesArray := make([]har.Entry, 0)
for _, entryData := range entries {
var harEntry har.Entry
_ = json.Unmarshal([]byte(entryData.Entry), &harEntry)
entriesArray = append(entriesArray, harEntry)
}
return c.Status(fiber.StatusOK).JSON(entriesArray)
}
func GetEntry(c *fiber.Ctx) error {
var entryData models.EntryData
database.GetEntriesTable().
@@ -134,8 +193,8 @@ func GetEntry(c *fiber.Ctx) error {
unmarshallErr := json.Unmarshal([]byte(entryData.Entry), &fullEntry)
utils.CheckErr(unmarshallErr)
if entryData.ResolvedDestination != nil {
fullEntry.Request.URL = utils.SetHostname(fullEntry.Request.URL, *entryData.ResolvedDestination)
if entryData.ResolvedDestination != "" {
fullEntry.Request.URL = utils.SetHostname(fullEntry.Request.URL, entryData.ResolvedDestination)
}
return c.Status(fiber.StatusOK).JSON(fullEntry)

View File

@@ -2,8 +2,9 @@ package models
import (
"encoding/json"
"github.com/google/martian/har"
"github.com/up9inc/mizu/shared"
"mizuserver/pkg/tap"
"github.com/up9inc/mizu/tap"
"time"
)
@@ -11,17 +12,17 @@ type MizuEntry struct {
ID uint `gorm:"primarykey"`
CreatedAt time.Time
UpdatedAt time.Time
Entry string `json:"entry,omitempty" gorm:"column:entry"`
EntryId string `json:"entryId" gorm:"column:entryId"`
Url string `json:"url" gorm:"column:url"`
Method string `json:"method" gorm:"column:method"`
Status int `json:"status" gorm:"column:status"`
RequestSenderIp string `json:"requestSenderIp" gorm:"column:requestSenderIp"`
Service string `json:"service" gorm:"column:service"`
Timestamp int64 `json:"timestamp" gorm:"column:timestamp"`
Path string `json:"path" gorm:"column:path"`
ResolvedSource *string `json:"resolvedSource,omitempty" gorm:"column:resolvedSource"`
ResolvedDestination *string `json:"resolvedDestination,omitempty" gorm:"column:resolvedDestination"`
Entry string `json:"entry,omitempty" gorm:"column:entry"`
EntryId string `json:"entryId" gorm:"column:entryId"`
Url string `json:"url" gorm:"column:url"`
Method string `json:"method" gorm:"column:method"`
Status int `json:"status" gorm:"column:status"`
RequestSenderIp string `json:"requestSenderIp" gorm:"column:requestSenderIp"`
Service string `json:"service" gorm:"column:service"`
Timestamp int64 `json:"timestamp" gorm:"column:timestamp"`
Path string `json:"path" gorm:"column:path"`
ResolvedSource string `json:"resolvedSource,omitempty" gorm:"column:resolvedSource"`
ResolvedDestination string `json:"resolvedDestination,omitempty" gorm:"column:resolvedDestination"`
}
type BaseEntryDetails struct {
@@ -36,8 +37,8 @@ type BaseEntryDetails struct {
}
type EntryData struct {
Entry string `json:"entry,omitempty"`
ResolvedDestination *string `json:"resolvedDestination,omitempty" gorm:"column:resolvedDestination"`
Entry string `json:"entry,omitempty"`
ResolvedDestination string `json:"resolvedDestination,omitempty" gorm:"column:resolvedDestination"`
}
type EntriesFilter struct {
@@ -47,7 +48,8 @@ type EntriesFilter struct {
}
type HarFetchRequestBody struct {
Limit int `query:"limit" validate:"max=5000"`
From int64 `query:"from"`
To int64 `query:"to"`
}
type WebSocketEntryMessage struct {
@@ -55,7 +57,6 @@ type WebSocketEntryMessage struct {
Data *BaseEntryDetails `json:"data,omitempty"`
}
type WebSocketTappedEntryMessage struct {
*shared.WebSocketMessageMetadata
Data *tap.OutputChannelItem
@@ -80,3 +81,23 @@ func CreateWebsocketTappedEntryMessage(base *tap.OutputChannelItem) ([]byte, err
}
return json.Marshal(message)
}
// ExtendedHAR is the top level object of a HAR log.
type ExtendedHAR struct {
Log *ExtendedLog `json:"log"`
}
// ExtendedLog is the HAR HTTP request and response log.
type ExtendedLog struct {
// Version number of the HAR format.
Version string `json:"version"`
// Creator holds information about the log creator application.
Creator *ExtendedCreator `json:"creator"`
// Entries is a list containing requests and responses.
Entries []*har.Entry `json:"entries"`
}
type ExtendedCreator struct {
*har.Creator
Source string `json:"_source"`
}

View File

@@ -33,12 +33,12 @@ func (resolver *Resolver) Start(ctx context.Context) {
}
}
func (resolver *Resolver) Resolve(name string) *string {
func (resolver *Resolver) Resolve(name string) string {
resolvedName, isFound := resolver.nameMap[name]
if !isFound {
return nil
return ""
}
return &resolvedName
return resolvedName
}
func (resolver *Resolver) watchPods(ctx context.Context) error {

View File

@@ -11,8 +11,11 @@ func EntriesRoutes(fiberApp *fiber.App) {
routeGroup.Get("/entries", controllers.GetEntries) // get entries (base/thin entries)
routeGroup.Get("/entries/:entryId", controllers.GetEntry) // get single (full) entry
routeGroup.Get("/exportEntries", controllers.GetFullEntries)
routeGroup.Get("/har", controllers.GetHARs)
routeGroup.Get("/har", controllers.GetHAR)
routeGroup.Get("/resetDB", controllers.DeleteAllEntries) // get single (full) entry
routeGroup.Get("/generalStats", controllers.GetGeneralStats) // get general stats about entries in DB

View File

@@ -0,0 +1,10 @@
package sensitiveDataFiltering
const maskedFieldPlaceholderValue = "[REDACTED]"
//these values MUST be all lower case and contain no `-` or `_` characters
var personallyIdentifiableDataFields = []string{"token", "authorization", "authentication", "cookie", "userid", "password",
"username", "user", "key", "passcode", "pass", "auth", "authtoken", "jwt",
"bearer", "clientid", "clientsecret", "redirecturi", "phonenumber",
"zip", "zipcode", "address", "country", "firstname", "lastname",
"middlename", "fname", "lname", "birthdate"}

View File

@@ -0,0 +1,198 @@
package sensitiveDataFiltering
import (
"encoding/json"
"encoding/xml"
"errors"
"fmt"
"github.com/up9inc/mizu/tap"
"net/url"
"strings"
"github.com/beevik/etree"
"github.com/google/martian/har"
"github.com/up9inc/mizu/shared"
)
func FilterSensitiveInfoFromHarRequest(harOutputItem *tap.OutputChannelItem, options *shared.TrafficFilteringOptions) {
harOutputItem.HarEntry.Request.Headers = filterHarHeaders(harOutputItem.HarEntry.Request.Headers)
harOutputItem.HarEntry.Response.Headers = filterHarHeaders(harOutputItem.HarEntry.Response.Headers)
harOutputItem.HarEntry.Request.Cookies = make([]har.Cookie, 0, 0)
harOutputItem.HarEntry.Response.Cookies = make([]har.Cookie, 0, 0)
harOutputItem.HarEntry.Request.URL = filterUrl(harOutputItem.HarEntry.Request.URL)
for i, queryString := range harOutputItem.HarEntry.Request.QueryString {
if isFieldNameSensitive(queryString.Name) {
harOutputItem.HarEntry.Request.QueryString[i].Value = maskedFieldPlaceholderValue
}
}
if harOutputItem.HarEntry.Request.PostData != nil {
requestContentType := getContentTypeHeaderValue(harOutputItem.HarEntry.Request.Headers)
filteredRequestBody, err := filterHttpBody([]byte(harOutputItem.HarEntry.Request.PostData.Text), requestContentType, options)
if err == nil {
harOutputItem.HarEntry.Request.PostData.Text = string(filteredRequestBody)
}
}
if harOutputItem.HarEntry.Response.Content != nil {
responseContentType := getContentTypeHeaderValue(harOutputItem.HarEntry.Response.Headers)
filteredResponseBody, err := filterHttpBody(harOutputItem.HarEntry.Response.Content.Text, responseContentType, options)
if err == nil {
harOutputItem.HarEntry.Response.Content.Text = filteredResponseBody
}
}
}
func filterHarHeaders(headers []har.Header) []har.Header {
newHeaders := make([]har.Header, 0)
for i, header := range headers {
if strings.ToLower(header.Name) == "cookie" {
continue
} else if isFieldNameSensitive(header.Name) {
newHeaders = append(newHeaders, har.Header{Name: header.Name, Value: maskedFieldPlaceholderValue})
headers[i].Value = maskedFieldPlaceholderValue
} else {
newHeaders = append(newHeaders, header)
}
}
return newHeaders
}
func getContentTypeHeaderValue(headers []har.Header) string {
for _, header := range headers {
if strings.ToLower(header.Name) == "content-type" {
return header.Value
}
}
return ""
}
func isFieldNameSensitive(fieldName string) bool {
name := strings.ToLower(fieldName)
name = strings.ReplaceAll(name, "_", "")
name = strings.ReplaceAll(name, "-", "")
name = strings.ReplaceAll(name, " ", "")
for _, sensitiveField := range personallyIdentifiableDataFields {
if strings.Contains(name, sensitiveField) {
return true
}
}
return false
}
func filterHttpBody(bytes []byte, contentType string, options *shared.TrafficFilteringOptions) ([]byte, error) {
mimeType := strings.Split(contentType, ";")[0]
switch strings.ToLower(mimeType) {
case "application/json":
return filterJsonBody(bytes)
case "text/html":
fallthrough
case "application/xhtml+xml":
fallthrough
case "text/xml":
fallthrough
case "application/xml":
return filterXmlEtree(bytes)
case "text/plain":
if options != nil && options.PlainTextMaskingRegexes != nil {
return filterPlainText(bytes, options), nil
}
}
return bytes, nil
}
func filterPlainText(bytes []byte, options *shared.TrafficFilteringOptions) []byte {
for _, regex := range options.PlainTextMaskingRegexes {
bytes = regex.ReplaceAll(bytes, []byte(maskedFieldPlaceholderValue))
}
return bytes
}
func filterXmlEtree(bytes []byte) ([]byte, error) {
if !IsValidXML(bytes) {
return nil, errors.New("Invalid XML")
}
xmlDoc := etree.NewDocument()
err := xmlDoc.ReadFromBytes(bytes)
if err != nil {
return nil, err
} else {
filterXmlElement(xmlDoc.Root())
}
return xmlDoc.WriteToBytes()
}
func IsValidXML(data []byte) bool {
return xml.Unmarshal(data, new(interface{})) == nil
}
func filterXmlElement(element *etree.Element) {
for i, attribute := range element.Attr {
if isFieldNameSensitive(attribute.Key) {
element.Attr[i].Value = maskedFieldPlaceholderValue
}
}
if element.ChildElements() == nil || len(element.ChildElements()) == 0 {
if isFieldNameSensitive(element.Tag) {
element.SetText(maskedFieldPlaceholderValue)
}
} else {
for _, element := range element.ChildElements() {
filterXmlElement(element)
}
}
}
func filterJsonBody(bytes []byte) ([]byte, error) {
var bodyJsonMap map[string] interface{}
err := json.Unmarshal(bytes ,&bodyJsonMap)
if err != nil {
return nil, err
}
filterJsonMap(bodyJsonMap)
return json.Marshal(bodyJsonMap)
}
func filterJsonMap(jsonMap map[string] interface{}) {
for key, value := range jsonMap {
if value == nil {
return
}
nestedMap, isNested := value.(map[string] interface{})
if isNested {
filterJsonMap(nestedMap)
} else {
if isFieldNameSensitive(key) {
jsonMap[key] = maskedFieldPlaceholderValue
}
}
}
}
// receives string representing url, returns string url without sensitive query param values (http://service/api?userId=bob&password=123&type=login -> http://service/api?userId=[REDACTED]&password=[REDACTED]&type=login)
func filterUrl(originalUrl string) string {
parsedUrl, err := url.Parse(originalUrl)
if err != nil {
return fmt.Sprintf("http://%s", maskedFieldPlaceholderValue)
} else {
if len(parsedUrl.RawQuery) > 0 {
newQueryArgs := make([]string, 0)
for urlQueryParamName, urlQueryParamValues := range parsedUrl.Query() {
newValues := urlQueryParamValues
if isFieldNameSensitive(urlQueryParamName) {
newValues = []string {maskedFieldPlaceholderValue}
}
for _, paramValue := range newValues {
newQueryArgs = append(newQueryArgs, fmt.Sprintf("%s=%s", urlQueryParamName, paramValue))
}
}
parsedUrl.RawQuery = strings.Join(newQueryArgs, "&")
}
return parsedUrl.String()
}
}

View File

@@ -1,209 +0,0 @@
package tap
import (
"fmt"
"net/http"
"strconv"
"strings"
"time"
"github.com/orcaman/concurrent-map"
)
type requestResponsePair struct {
Request httpMessage `json:"request"`
Response httpMessage `json:"response"`
}
type envoyMessageWrapper struct {
HttpBufferedTrace requestResponsePair `json:"http_buffered_trace"`
}
type headerKeyVal struct {
Key string `json:"key"`
Value string `json:"value"`
}
type messageBody struct {
Truncated bool `json:"truncated"`
AsBytes string `json:"as_bytes"`
}
type httpMessage struct {
IsRequest bool
Headers []headerKeyVal `json:"headers"`
HTTPVersion string `json:"httpVersion"`
Body messageBody `json:"body"`
captureTime time.Time
orig interface {}
requestSenderIp string
}
// Key is {client_addr}:{client_port}->{dest_addr}:{dest_port}
type requestResponseMatcher struct {
openMessagesMap cmap.ConcurrentMap
}
func createResponseRequestMatcher() requestResponseMatcher {
newMatcher := &requestResponseMatcher{openMessagesMap: cmap.New()}
return *newMatcher
}
func (matcher *requestResponseMatcher) registerRequest(ident string, request *http.Request, captureTime time.Time, body string, isHTTP2 bool) *envoyMessageWrapper {
split := splitIdent(ident)
key := genKey(split)
messageExtraHeaders := []headerKeyVal{
{Key: "x-up9-source", Value: split[0]},
{Key: "x-up9-destination", Value: split[1] + ":" + split[3]},
}
requestHTTPMessage := requestToMessage(request, captureTime, body, &messageExtraHeaders, isHTTP2, split[0])
if response, found := matcher.openMessagesMap.Pop(key); found {
// Type assertion always succeeds because all of the map's values are of httpMessage type
responseHTTPMessage := response.(*httpMessage)
if responseHTTPMessage.IsRequest {
SilentError("Request-Duplicate", "Got duplicate request with same identifier\n")
return nil
}
Debug("Matched open Response for %s\n", key)
return matcher.preparePair(&requestHTTPMessage, responseHTTPMessage)
}
matcher.openMessagesMap.Set(key, &requestHTTPMessage)
Debug("Registered open Request for %s\n", key)
return nil
}
func (matcher *requestResponseMatcher) registerResponse(ident string, response *http.Response, captureTime time.Time, body string, isHTTP2 bool) *envoyMessageWrapper {
split := splitIdent(ident)
key := genKey(split)
responseHTTPMessage := responseToMessage(response, captureTime, body, isHTTP2)
if request, found := matcher.openMessagesMap.Pop(key); found {
// Type assertion always succeeds because all of the map's values are of httpMessage type
requestHTTPMessage := request.(*httpMessage)
if !requestHTTPMessage.IsRequest {
SilentError("Response-Duplicate", "Got duplicate response with same identifier\n")
return nil
}
Debug("Matched open Request for %s\n", key)
return matcher.preparePair(requestHTTPMessage, &responseHTTPMessage)
}
matcher.openMessagesMap.Set(key, &responseHTTPMessage)
Debug("Registered open Response for %s\n", key)
return nil
}
func (matcher *requestResponseMatcher) preparePair(requestHTTPMessage *httpMessage, responseHTTPMessage *httpMessage) *envoyMessageWrapper {
matcher.addDuration(requestHTTPMessage, responseHTTPMessage)
return &envoyMessageWrapper{
HttpBufferedTrace: requestResponsePair{
Request: *requestHTTPMessage,
Response: *responseHTTPMessage,
},
}
}
func requestToMessage(request *http.Request, captureTime time.Time, body string, messageExtraHeaders *[]headerKeyVal, isHTTP2 bool, requestSenderIp string) httpMessage {
messageHeaders := make([]headerKeyVal, 0)
for key, value := range request.Header {
messageHeaders = append(messageHeaders, headerKeyVal{Key: key, Value: value[0]})
}
if !isHTTP2 {
messageHeaders = append(messageHeaders, headerKeyVal{Key: ":method", Value: request.Method})
messageHeaders = append(messageHeaders, headerKeyVal{Key: ":path", Value: request.RequestURI})
messageHeaders = append(messageHeaders, headerKeyVal{Key: ":authority", Value: request.Host})
messageHeaders = append(messageHeaders, headerKeyVal{Key: ":scheme", Value: "http"})
}
messageHeaders = append(messageHeaders, headerKeyVal{Key: "x-request-start", Value: fmt.Sprintf("%.3f", float64(captureTime.UnixNano()) / float64(1000000000))})
messageHeaders = append(messageHeaders, *messageExtraHeaders...)
httpVersion := request.Proto
requestBody := messageBody{Truncated: false, AsBytes: body}
return httpMessage{
IsRequest: true,
Headers: messageHeaders,
HTTPVersion: httpVersion,
Body: requestBody,
captureTime: captureTime,
orig: request,
requestSenderIp: requestSenderIp,
}
}
func responseToMessage(response *http.Response, captureTime time.Time, body string, isHTTP2 bool) httpMessage {
messageHeaders := make([]headerKeyVal, 0)
for key, value := range response.Header {
messageHeaders = append(messageHeaders, headerKeyVal{Key: key, Value: value[0]})
}
if !isHTTP2 {
messageHeaders = append(messageHeaders, headerKeyVal{Key: ":status", Value: strconv.Itoa(response.StatusCode)})
}
httpVersion := response.Proto
requestBody := messageBody{Truncated: false, AsBytes: body}
return httpMessage{
IsRequest: false,
Headers: messageHeaders,
HTTPVersion: httpVersion,
Body: requestBody,
captureTime: captureTime,
orig: response,
}
}
func (matcher *requestResponseMatcher) addDuration(requestHTTPMessage *httpMessage, responseHTTPMessage *httpMessage) {
durationMs := float64(responseHTTPMessage.captureTime.UnixNano() / 1000000) - float64(requestHTTPMessage.captureTime.UnixNano() / 1000000)
if durationMs < 1 {
durationMs = 1
}
responseHTTPMessage.Headers = append(responseHTTPMessage.Headers, headerKeyVal{Key: "x-up9-duration-ms", Value: fmt.Sprintf("%.0f", durationMs)})
}
func splitIdent(ident string) []string {
ident = strings.Replace(ident, "->", " ", -1)
return strings.Split(ident, " ")
}
func genKey(split []string) string {
key := fmt.Sprintf("%s:%s->%s:%s,%s", split[0], split[2], split[1], split[3], split[4])
return key
}
func (matcher *requestResponseMatcher) deleteOlderThan(t time.Time) int {
keysToPop := make([]string, 0)
for item := range matcher.openMessagesMap.IterBuffered() {
// Map only contains values of type httpMessage
message, _ := item.Val.(*httpMessage)
if message.captureTime.Before(t) {
keysToPop = append(keysToPop, item.Key)
}
}
numDeleted := len(keysToPop)
for _, key := range keysToPop {
_, _ = matcher.openMessagesMap.Pop(key)
}
return numDeleted
}

View File

@@ -1,239 +0,0 @@
package tap
import (
"bytes"
"encoding/json"
"fmt"
"log"
"net/http"
"time"
"github.com/gorilla/websocket"
"github.com/patrickmn/go-cache"
)
const (
// Time allowed to write a message to the peer.
writeWait = 10 * time.Second
// Time allowed to read the next pong message from the peer.
pongWait = 60 * time.Second
// Send pings to peer with this period. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10
// Maximum message size allowed from peer.
maxMessageSize = 512
)
var (
newline = []byte{'\n'}
space = []byte{' '}
hub *Hub
outboundSocketNotifyExpiringCache = cache.New(outboundThrottleCacheExpiryPeriod, outboundThrottleCacheExpiryPeriod)
)
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func (_ *http.Request) bool { return true },
}
// Client is a middleman between the websocket connection and the hub.
type Client struct {
hub *Hub
// The websocket connection.
conn *websocket.Conn
// Buffered channel of outbound messages.
send chan []byte
}
type OutBoundLinkMessage struct {
SourceIP string `json:"sourceIP"`
IP string `json:"ip"`
Port int `json:"port"`
Type string `json:"type"`
}
// readPump pumps messages from the websocket connection to the hub.
//
// The application runs readPump in a per-connection goroutine. The application
// ensures that there is at most one reader on a connection by executing all
// reads from this goroutine.
func (c *Client) readPump() {
defer func() {
c.hub.unregister <- c
c.conn.Close()
}()
c.conn.SetReadLimit(maxMessageSize)
c.conn.SetReadDeadline(time.Now().Add(pongWait))
c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
for {
_, message, err := c.conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Printf("error: %v", err)
}
break
}
message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1))
c.hub.onMessageCallback(message)
}
}
// writePump pumps messages from the hub to the websocket connection.
//
// A goroutine running writePump is started for each connection. The
// application ensures that there is at most one writer to a connection by
// executing all writes from this goroutine.
func (c *Client) writePump() {
ticker := time.NewTicker(pingPeriod)
defer func() {
ticker.Stop()
c.conn.Close()
}()
for {
select {
case message, ok := <-c.send:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
// The hub closed the channel.
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
w, err := c.conn.NextWriter(websocket.TextMessage)
if err != nil {
return
}
w.Write(message)
if err := w.Close(); err != nil {
return
}
case <-ticker.C:
c.conn.SetWriteDeadline(time.Now().Add(writeWait))
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}
type Hub struct {
// Registered clients.
clients map[*Client]bool
// Inbound messages from the clients.
broadcast chan []byte
// Register requests from the clients.
register chan *Client
// Unregister requests from clients.
unregister chan *Client
// Handle messages from client
onMessageCallback func([]byte)
}
func newHub(onMessageCallback func([]byte)) *Hub {
return &Hub{
broadcast: make(chan []byte),
register: make(chan *Client),
unregister: make(chan *Client),
clients: make(map[*Client]bool),
onMessageCallback: onMessageCallback,
}
}
func (h *Hub) run() {
for {
select {
case client := <-h.register:
h.clients[client] = true
case client := <-h.unregister:
if _, ok := h.clients[client]; ok {
delete(h.clients, client)
close(client.send)
}
case message := <-h.broadcast:
// matched messages counter is incremented in this thread instead of in multiple http reader
// threads in order to reduce contention.
statsTracker.incMatchedMessages()
for client := range h.clients {
select {
case client.send <- message:
default:
close(client.send)
delete(h.clients, client)
}
}
}
}
}
// serveWs handles websocket requests from the peer.
func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
client := &Client{hub: hub, conn: conn, send: make(chan []byte, 256)}
client.hub.register <- client
// Allow collection of memory referenced by the caller by doing all work in
// new goroutines.
go client.writePump()
go client.readPump()
}
func startOutputServer(port string, messageCallback func([]byte)) {
hub = newHub(messageCallback)
go hub.run()
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
serveWs(hub, w, r)
})
err := http.ListenAndServe("0.0.0.0:" + port, nil)
if err != nil {
log.Fatal("Output server error: ", err)
}
}
func broadcastReqResPair(reqResJson []byte) {
hub.broadcast <- reqResJson
}
func broadcastOutboundLink(srcIP string, dstIP string, dstPort int) {
cacheKey := fmt.Sprintf("%s -> %s:%d", srcIP, dstIP, dstPort)
_, isInCache := outboundSocketNotifyExpiringCache.Get(cacheKey)
if isInCache {
return
} else {
outboundSocketNotifyExpiringCache.SetDefault(cacheKey, true)
}
socketMessage := OutBoundLinkMessage{
SourceIP: srcIP,
IP: dstIP,
Port: dstPort,
Type: "outboundSocketDetected",
}
jsonStr, err := json.Marshal(socketMessage)
if err != nil {
log.Printf("error marshalling outbound socket detection object: %v", err)
} else {
hub.broadcast <- jsonStr
}
}

View File

@@ -65,9 +65,9 @@ func SetHostname(address, newHostname string) string {
func GetResolvedBaseEntry(entry models.MizuEntry) models.BaseEntryDetails {
entryUrl := entry.Url
service := entry.Service
if entry.ResolvedDestination != nil {
entryUrl = SetHostname(entryUrl, *entry.ResolvedDestination)
service = SetHostname(service, *entry.ResolvedDestination)
if entry.ResolvedDestination != "" {
entryUrl = SetHostname(entryUrl, entry.ResolvedDestination)
service = SetHostname(service, entry.ResolvedDestination)
}
return models.BaseEntryDetails{
Id: entry.EntryId,
@@ -84,4 +84,4 @@ func GetResolvedBaseEntry(entry models.MizuEntry) models.BaseEntryDetails {
func GetBytesFromStruct(v interface{}) []byte{
a, _ := json.Marshal(v)
return a
}
}

View File

@@ -1,6 +1,8 @@
FOLDER=$(GOOS).$(GOARCH)
SUFFIX=$(GOOS)_$(GOARCH)
COMMIT_HASH=$(shell git rev-parse HEAD)
GIT_BRANCH=$(shell git branch --show-current)
GIT_BRANCH=$(shell git branch --show-current | tr '[:upper:]' '[:lower:]')
GIT_VERSION=$(shell git branch --show-current | tr '[:upper:]' '[:lower:]')
BUILD_TIMESTAMP=$(shell date +%s)
.PHONY: help
.DEFAULT_GOAL := help
@@ -12,16 +14,22 @@ install:
go install mizu.go
build: ## build mizu CLI binary (select platform via GOOS / GOARCH env variables)
go build -ldflags="-X 'github.com/up9inc/mizu/cli/mizu.GitCommitHash=$(COMMIT_HASH)' -X 'github.com/up9inc/mizu/cli/mizu.Branch=$(GIT_BRANCH)'" -o bin/$(FOLDER)/mizu mizu.go
go build -ldflags="-X 'github.com/up9inc/mizu/cli/mizu.GitCommitHash=$(COMMIT_HASH)' \
-X 'github.com/up9inc/mizu/cli/mizu.Branch=$(GIT_BRANCH)' \
-X 'github.com/up9inc/mizu/cli/mizu.BuildTimestamp=$(BUILD_TIMESTAMP)' \
-X 'github.com/up9inc/mizu/cli/mizu.SemVer=$(SEM_VER)'" \
-o bin/mizu_$(SUFFIX) mizu.go
(cd bin && shasum -a 256 mizu_${SUFFIX} > mizu_${SUFFIX}.sha256)
build-all: ## build for all supported platforms
@echo "Compiling for every OS and Platform"
@mkdir -p bin && echo "SHA256 checksums available for compiled binaries \n\nRun \`shasum -a 256 -c mizu_OS_ARCH.sha256\` to verify\n\n" > bin/README.md
@$(MAKE) build GOOS=darwin GOARCH=amd64
@$(MAKE) build GOOS=linux GOARCH=amd64
@# $(MAKE) GOOS=windows GOARCH=amd64
@# $(MAKE) GOOS=linux GOARCH=386
@# $(MAKE) GOOS=windows GOARCH=386
@# $(MAKE) GOOS=darwin GOARCH=arm64
@$(MAKE) GOOS=darwin GOARCH=arm64
@# $(MAKE) GOOS=linux GOARCH=arm64
@# $(MAKE) GOOS=windows GOARCH=arm64
@echo "---------"

View File

@@ -5,8 +5,10 @@ import (
)
type MizuFetchOptions struct {
Limit uint16
Directory string
FromTimestamp int64
ToTimestamp int64
Directory string
MizuPort uint
}
var mizuFetchOptions = MizuFetchOptions{}
@@ -23,6 +25,8 @@ var fetchCmd = &cobra.Command{
func init() {
rootCmd.AddCommand(fetchCmd)
fetchCmd.Flags().Uint16VarP(&mizuFetchOptions.Limit, "limit", "l", 1000, "Provide a custom limit for entries to fetch")
fetchCmd.Flags().StringVarP(&mizuFetchOptions.Directory, "directory", "d", ".", "Provide a custom directory for fetched entries")
fetchCmd.Flags().Int64Var(&mizuFetchOptions.FromTimestamp, "from", 0, "Custom start timestamp for fetched entries")
fetchCmd.Flags().Int64Var(&mizuFetchOptions.ToTimestamp, "to", 0, "Custom end timestamp fetched entries")
fetchCmd.Flags().UintVarP(&mizuFetchOptions.MizuPort, "port", "p", 8899, "Custom port for mizu")
}

View File

@@ -14,7 +14,7 @@ import (
)
func RunMizuFetch(fetch *MizuFetchOptions) {
resp, err := http.Get(fmt.Sprintf("http://localhost:8899/api/har?limit=%v", fetch.Limit))
resp, err := http.Get(fmt.Sprintf("http://localhost:%v/api/har?from=%v&to=%v", fetch.MizuPort, fetch.FromTimestamp, fetch.ToTimestamp))
if err != nil {
log.Fatal(err)
}
@@ -53,7 +53,7 @@ func Unzip(reader *zip.Reader, dest string) error {
path := filepath.Join(dest, f.Name)
// Check for ZipSlip (Directory traversal)
if !strings.HasPrefix(path, filepath.Clean(dest) + string(os.PathSeparator)) {
if !strings.HasPrefix(path, filepath.Clean(dest)+string(os.PathSeparator)) {
return fmt.Errorf("illegal file path: %s", path)
}
@@ -61,7 +61,7 @@ func Unzip(reader *zip.Reader, dest string) error {
_ = os.MkdirAll(path, f.Mode())
} else {
_ = os.MkdirAll(filepath.Dir(path), f.Mode())
fmt.Print("writing HAR file [ ", path, " ] .. ")
fmt.Print("writing HAR file [ ", path, " ] .. ")
f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, f.Mode())
if err != nil {
return err
@@ -70,7 +70,7 @@ func Unzip(reader *zip.Reader, dest string) error {
if err := f.Close(); err != nil {
panic(err)
}
fmt.Println(" done")
fmt.Println(" done")
}()
_, err = io.Copy(f, rc)
@@ -90,5 +90,3 @@ func Unzip(reader *zip.Reader, dest string) error {
return nil
}

View File

@@ -10,11 +10,13 @@ import (
)
type MizuTapOptions struct {
GuiPort uint16
Namespace string
KubeConfigPath string
MizuImage string
MizuPodPort uint16
GuiPort uint16
Namespace string
AllNamespaces bool
KubeConfigPath string
MizuImage string
MizuPodPort uint16
PlainTextFilterRegexes []string
}
@@ -47,7 +49,9 @@ func init() {
tapCmd.Flags().Uint16VarP(&mizuTapOptions.GuiPort, "gui-port", "p", 8899, "Provide a custom port for the web interface webserver")
tapCmd.Flags().StringVarP(&mizuTapOptions.Namespace, "namespace", "n", "", "Namespace selector")
tapCmd.Flags().BoolVarP(&mizuTapOptions.AllNamespaces, "all-namespaces", "A", false, "Tap all namespaces")
tapCmd.Flags().StringVarP(&mizuTapOptions.KubeConfigPath, "kube-config", "k", "", "Path to kube-config file")
tapCmd.Flags().StringVarP(&mizuTapOptions.MizuImage, "mizu-image", "", fmt.Sprintf("gcr.io/up9-docker-hub/mizu/%s:latest", mizu.Branch), "Custom image for mizu collector")
tapCmd.Flags().Uint16VarP(&mizuTapOptions.MizuPodPort, "mizu-port", "", 8899, "Port which mizu cli will attempt to forward from the mizu collector pod")
tapCmd.Flags().StringArrayVarP(&mizuTapOptions.PlainTextFilterRegexes, "regex-masking", "r", nil, "List of regex expressions that are used to filter matching values from text/plain http bodies")
}

View File

@@ -3,6 +3,7 @@ package cmd
import (
"context"
"fmt"
"github.com/up9inc/mizu/shared"
"os"
"os/signal"
"regexp"
@@ -26,24 +27,30 @@ const (
var currentlyTappedPods []core.Pod
func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) {
kubernetesProvider := kubernetes.NewProvider(tappingOptions.KubeConfigPath, tappingOptions.Namespace)
mizuApiFilteringOptions, err := getMizuApiFilteringOptions(tappingOptions)
if err != nil {
return
}
kubernetesProvider := kubernetes.NewProvider(tappingOptions.KubeConfigPath)
defer cleanUpMizuResources(kubernetesProvider)
ctx, cancel := context.WithCancel(context.Background())
defer cancel() // cancel will be called when this function exits
if matchingPods, err := kubernetesProvider.GetAllPodsMatchingRegex(ctx, podRegexQuery); err != nil {
targetNamespace := getNamespace(tappingOptions, kubernetesProvider)
if matchingPods, err := kubernetesProvider.GetAllPodsMatchingRegex(ctx, podRegexQuery, targetNamespace); err != nil {
return
} else {
currentlyTappedPods = matchingPods
}
nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(ctx, kubernetesProvider, currentlyTappedPods)
nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(currentlyTappedPods)
if err != nil {
return
}
if err := createMizuResources(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions); err != nil {
if err := createMizuResources(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions, mizuApiFilteringOptions); err != nil {
return
}
@@ -57,8 +64,8 @@ func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) {
// TODO handle incoming traffic from tapper using a channel
}
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions) error {
if err := createMizuAggregator(ctx, kubernetesProvider, tappingOptions); err != nil {
func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions, mizuApiFilteringOptions *shared.TrafficFilteringOptions) error {
if err := createMizuAggregator(ctx, kubernetesProvider, tappingOptions, mizuApiFilteringOptions); err != nil {
return err
}
@@ -69,11 +76,11 @@ func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Pro
return nil
}
func createMizuAggregator(ctx context.Context, kubernetesProvider *kubernetes.Provider, tappingOptions *MizuTapOptions) error {
func createMizuAggregator(ctx context.Context, kubernetesProvider *kubernetes.Provider, tappingOptions *MizuTapOptions, mizuApiFilteringOptions *shared.TrafficFilteringOptions) error {
var err error
mizuServiceAccountExists = createRBACIfNecessary(ctx, kubernetesProvider)
_, err = kubernetesProvider.CreateMizuAggregatorPod(ctx, mizu.ResourcesNamespace, mizu.AggregatorPodName, tappingOptions.MizuImage, mizuServiceAccountExists)
_, err = kubernetesProvider.CreateMizuAggregatorPod(ctx, mizu.ResourcesNamespace, mizu.AggregatorPodName, tappingOptions.MizuImage, mizuServiceAccountExists, mizuApiFilteringOptions)
if err != nil {
fmt.Printf("Error creating mizu collector pod: %v\n", err)
return err
@@ -88,6 +95,24 @@ func createMizuAggregator(ctx context.Context, kubernetesProvider *kubernetes.Pr
return nil
}
func getMizuApiFilteringOptions(tappingOptions *MizuTapOptions) (*shared.TrafficFilteringOptions, error) {
if tappingOptions.PlainTextFilterRegexes == nil || len(tappingOptions.PlainTextFilterRegexes) == 0 {
return nil, nil
}
compiledRegexSlice := make([]*shared.SerializableRegexp, 0)
for _, regexStr := range tappingOptions.PlainTextFilterRegexes {
compiledRegex, err := shared.CompileRegexToSerializableRegexp(regexStr)
if err != nil {
fmt.Printf("Regex %s is invalid: %v", regexStr, err)
return nil, err
}
compiledRegexSlice = append(compiledRegexSlice, compiledRegex)
}
return &shared.TrafficFilteringOptions{PlainTextMaskingRegexes: compiledRegexSlice}, nil
}
func createMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions) error {
if err := kubernetesProvider.ApplyMizuTapperDaemonSet(
ctx,
@@ -109,30 +134,32 @@ func createMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provi
func cleanUpMizuResources(kubernetesProvider *kubernetes.Provider) {
fmt.Printf("\nRemoving mizu resources\n")
removalCtx, _ := context.WithTimeout(context.Background(), 5 * time.Second)
removalCtx, _ := context.WithTimeout(context.Background(), 5*time.Second)
if err := kubernetesProvider.RemovePod(removalCtx, mizu.ResourcesNamespace, mizu.AggregatorPodName); err != nil {
fmt.Printf("Error removing Pod %s in namespace %s: %s (%v,%+v)\n", mizu.AggregatorPodName, mizu.ResourcesNamespace, err, err, err);
fmt.Printf("Error removing Pod %s in namespace %s: %s (%v,%+v)\n", mizu.AggregatorPodName, mizu.ResourcesNamespace, err, err, err)
}
if err := kubernetesProvider.RemoveService(removalCtx, mizu.ResourcesNamespace, mizu.AggregatorPodName); err != nil {
fmt.Printf("Error removing Service %s in namespace %s: %s (%v,%+v)\n", mizu.AggregatorPodName, mizu.ResourcesNamespace, err, err, err);
fmt.Printf("Error removing Service %s in namespace %s: %s (%v,%+v)\n", mizu.AggregatorPodName, mizu.ResourcesNamespace, err, err, err)
}
if err := kubernetesProvider.RemoveDaemonSet(removalCtx, mizu.ResourcesNamespace, mizu.TapperDaemonSetName); err != nil {
fmt.Printf("Error removing DaemonSet %s in namespace %s: %s (%v,%+v)\n", mizu.TapperDaemonSetName, mizu.ResourcesNamespace, err, err, err);
fmt.Printf("Error removing DaemonSet %s in namespace %s: %s (%v,%+v)\n", mizu.TapperDaemonSetName, mizu.ResourcesNamespace, err, err, err)
}
}
func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, podRegex *regexp.Regexp, tappingOptions *MizuTapOptions) {
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, kubernetesProvider.Namespace), podRegex)
targetNamespace := getNamespace(tappingOptions, kubernetesProvider)
added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, targetNamespace), podRegex)
restartTappers := func() {
if matchingPods, err := kubernetesProvider.GetAllPodsMatchingRegex(ctx, podRegex); err != nil {
if matchingPods, err := kubernetesProvider.GetAllPodsMatchingRegex(ctx, podRegex, targetNamespace); err != nil {
fmt.Printf("Error getting pods by regex: %s (%v,%+v)\n", err, err, err)
cancel()
} else {
currentlyTappedPods = matchingPods
}
nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(ctx, kubernetesProvider, currentlyTappedPods)
nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(currentlyTappedPods)
if err != nil {
fmt.Printf("Error building node to ips map: %s (%v,%+v)\n", err, err, err)
cancel()
@@ -147,14 +174,14 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
for {
select {
case newTarget := <- added:
case newTarget := <-added:
fmt.Printf("+%s\n", newTarget.Name)
case removedTarget := <- removed:
case removedTarget := <-removed:
fmt.Printf("-%s\n", removedTarget.Name)
restartTappersDebouncer.SetOn()
case modifiedTarget := <- modified:
case modifiedTarget := <-modified:
// Act only if the modified pod has already obtained an IP address.
// After filtering for IPs, on a normal pod restart this includes the following events:
// - Pod deletion
@@ -165,11 +192,11 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro
restartTappersDebouncer.SetOn()
}
case <- errorChan:
case <-errorChan:
// TODO: Does this also perform cleanup?
cancel()
case <- ctx.Done():
case <-ctx.Done():
return
}
}
@@ -182,13 +209,13 @@ func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
var portForward *kubernetes.PortForward
for {
select {
case <- added:
case <-added:
continue
case <- removed:
case <-removed:
fmt.Printf("%s removed\n", mizu.AggregatorPodName)
cancel()
return
case modifiedPod := <- modified:
case modifiedPod := <-modified:
if modifiedPod.Status.Phase == "Running" && !isPodReady {
isPodReady = true
var err error
@@ -200,16 +227,16 @@ func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provi
}
}
case <- time.After(25 * time.Second):
case <-time.After(25 * time.Second):
if !isPodReady {
fmt.Printf("error: %s pod was not ready in time", mizu.AggregatorPodName)
cancel()
}
case <- errorChan:
case <-errorChan:
cancel()
case <- ctx.Done():
case <-ctx.Done():
if portForward != nil {
portForward.Stop()
}
@@ -225,11 +252,7 @@ func createRBACIfNecessary(ctx context.Context, kubernetesProvider *kubernetes.P
return false
}
if !mizuRBACExists {
var versionString = mizu.Version
if mizu.GitCommitHash != "" {
versionString += "-" + mizu.GitCommitHash
}
err := kubernetesProvider.CreateMizuRBAC(ctx, mizu.ResourcesNamespace, versionString)
err := kubernetesProvider.CreateMizuRBAC(ctx, mizu.ResourcesNamespace, mizu.RBACVersion)
if err != nil {
fmt.Printf("warning: could not create mizu rbac resources %v\n", err)
return false
@@ -238,12 +261,12 @@ func createRBACIfNecessary(ctx context.Context, kubernetesProvider *kubernetes.P
return true
}
func getNodeHostToTappedPodIpsMap(ctx context.Context, kubernetesProvider *kubernetes.Provider, tappedPods []core.Pod) (map[string][]string, error) {
func getNodeHostToTappedPodIpsMap(tappedPods []core.Pod) (map[string][]string, error) {
nodeToTappedPodIPMap := make(map[string][]string, 0)
for _, pod := range tappedPods {
existingList := nodeToTappedPodIPMap[pod.Spec.NodeName]
if existingList == nil {
nodeToTappedPodIPMap[pod.Spec.NodeName] = []string {pod.Status.PodIP}
nodeToTappedPodIPMap[pod.Spec.NodeName] = []string{pod.Status.PodIP}
} else {
nodeToTappedPodIPMap[pod.Spec.NodeName] = append(nodeToTappedPodIPMap[pod.Spec.NodeName], pod.Status.PodIP)
}
@@ -257,9 +280,9 @@ func waitForFinish(ctx context.Context, cancel context.CancelFunc) {
// block until ctx cancel is called or termination signal is received
select {
case <- ctx.Done():
case <-ctx.Done():
break
case <- sigChan:
case <-sigChan:
cancel()
}
}
@@ -273,7 +296,7 @@ func syncApiStatus(ctx context.Context, cancel context.CancelFunc, tappingOption
for {
select {
case <- ctx.Done():
case <-ctx.Done():
return
default:
err = controlSocket.SendNewTappedPodsListMessage(currentlyTappedPods)
@@ -285,3 +308,13 @@ func syncApiStatus(ctx context.Context, cancel context.CancelFunc, tappingOption
}
}
func getNamespace(tappingOptions *MizuTapOptions, kubernetesProvider *kubernetes.Provider) string {
if tappingOptions.AllNamespaces {
return mizu.K8sAllNamespaces
} else if len(tappingOptions.Namespace) > 0 {
return tappingOptions.Namespace
} else {
return kubernetesProvider.CurrentNamespace()
}
}

View File

@@ -3,19 +3,38 @@ package cmd
import (
"fmt"
"github.com/up9inc/mizu/cli/mizu"
"strconv"
"time"
"github.com/spf13/cobra"
)
type MizuVersionOptions struct {
DebugInfo bool
}
var mizuVersionOptions = &MizuVersionOptions{}
var versionCmd = &cobra.Command{
Use: "version",
Short: "Print version info",
RunE: func(cmd *cobra.Command, args []string) error {
fmt.Printf("%s (%s) %s\n", mizu.Version, mizu.Branch, mizu.GitCommitHash)
if mizuVersionOptions.DebugInfo {
timeStampInt, _ := strconv.ParseInt(mizu.BuildTimestamp, 10, 0)
fmt.Printf("Version: %s \nBranch: %s (%s) \n", mizu.SemVer, mizu.Branch, mizu.GitCommitHash)
fmt.Printf("Build Time: %s (%s)\n", mizu.BuildTimestamp, time.Unix(timeStampInt, 0))
} else {
fmt.Printf("Version: %s (%s)\n", mizu.SemVer, mizu.Branch)
}
return nil
},
}
func init() {
rootCmd.AddCommand(versionCmd)
versionCmd.Flags().BoolVarP(&mizuVersionOptions.DebugInfo, "debug", "d", false, "Provide all information about version")
}

View File

@@ -1,7 +1,6 @@
package cmd
import (
"fmt"
"github.com/spf13/cobra"
)
@@ -9,7 +8,7 @@ var viewCmd = &cobra.Command{
Use: "view",
Short: "Open GUI in browser",
RunE: func(cmd *cobra.Command, args []string) error {
fmt.Println("Not implemented")
runMizuView()
return nil
},
}

33
cli/cmd/viewRunner.go Normal file
View File

@@ -0,0 +1,33 @@
package cmd
import (
"context"
"fmt"
"github.com/up9inc/mizu/cli/kubernetes"
"github.com/up9inc/mizu/cli/mizu"
"net/http"
)
func runMizuView() {
kubernetesProvider := kubernetes.NewProvider("")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
exists, err := kubernetesProvider.DoesServicesExist(ctx, mizu.ResourcesNamespace, mizu.AggregatorPodName)
if err != nil {
panic(err)
}
if !exists {
fmt.Printf("The %s service not found\n", mizu.AggregatorPodName)
return
}
_, err = http.Get("http://localhost:8899/")
if err == nil {
fmt.Printf("Found a running service %s and open port 8899\n", mizu.AggregatorPodName)
return
}
fmt.Printf("Found service %s, creating port forwarding to 8899\n", mizu.AggregatorPodName)
portForwardApiPod(ctx, kubernetesProvider, cancel, &MizuTapOptions{GuiPort: 8899, MizuPodPort: 8899})
}

View File

@@ -6,19 +6,19 @@ import (
"encoding/json"
"errors"
"fmt"
"path/filepath"
"regexp"
applyconfapp "k8s.io/client-go/applyconfigurations/apps/v1"
applyconfmeta "k8s.io/client-go/applyconfigurations/meta/v1"
applyconfcore "k8s.io/client-go/applyconfigurations/core/v1"
"github.com/up9inc/mizu/shared"
core "k8s.io/api/core/v1"
rbac "k8s.io/api/rbac/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/watch"
applyconfapp "k8s.io/client-go/applyconfigurations/apps/v1"
applyconfcore "k8s.io/client-go/applyconfigurations/core/v1"
applyconfmeta "k8s.io/client-go/applyconfigurations/meta/v1"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth/azure"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
@@ -42,7 +42,7 @@ const (
fieldManagerName = "mizu-manager"
)
func NewProvider(kubeConfigPath string, overrideNamespace string) *Provider {
func NewProvider(kubeConfigPath string) *Provider {
kubernetesConfig := loadKubernetesConfiguration(kubeConfigPath)
restClientConfig, err := kubernetesConfig.ClientConfig()
if err != nil {
@@ -50,25 +50,18 @@ func NewProvider(kubeConfigPath string, overrideNamespace string) *Provider {
}
clientSet := getClientSet(restClientConfig)
var namespace string
if len(overrideNamespace) > 0 {
namespace = overrideNamespace
} else {
configuredNamespace, _, err := kubernetesConfig.Namespace()
if err != nil {
panic(err)
}
namespace = configuredNamespace
}
return &Provider{
clientSet: clientSet,
kubernetesConfig: kubernetesConfig,
clientConfig: *restClientConfig,
Namespace: namespace,
}
}
func (provider *Provider) CurrentNamespace() string {
ns, _, _ := provider.kubernetesConfig.Namespace()
return ns
}
func (provider *Provider) GetPodWatcher(ctx context.Context, namespace string) watch.Interface {
watcher, err := provider.clientSet.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{Watch: true})
if err != nil {
@@ -77,20 +70,16 @@ func (provider *Provider) GetPodWatcher(ctx context.Context, namespace string) w
return watcher
}
func (provider *Provider) GetPods(ctx context.Context, namespace string) {
pods, err := provider.clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
func (provider *Provider) CreateMizuAggregatorPod(ctx context.Context, namespace string, podName string, podImage string, linkServiceAccount bool, mizuApiFilteringOptions *shared.TrafficFilteringOptions) (*core.Pod, error) {
marshaledFilteringOptions, err := json.Marshal(mizuApiFilteringOptions)
if err != nil {
panic(err.Error())
return nil, err
}
fmt.Printf("There are %d pods in Namespace %s\n", len(pods.Items), namespace)
}
func (provider *Provider) CreateMizuAggregatorPod(ctx context.Context, namespace string, podName string, podImage string, linkServiceAccount bool) (*core.Pod, error) {
pod := &core.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
Namespace: namespace,
Labels: map[string]string{"app": podName},
Labels: map[string]string{"app": podName},
},
Spec: core.PodSpec{
Containers: []core.Container{
@@ -98,16 +87,20 @@ func (provider *Provider) CreateMizuAggregatorPod(ctx context.Context, namespace
Name: podName,
Image: podImage,
ImagePullPolicy: core.PullAlways,
Command: []string {"./mizuagent", "--aggregator"},
Command: []string{"./mizuagent", "--aggregator"},
Env: []core.EnvVar{
{
Name: "HOST_MODE",
Name: shared.HostModeEnvVar,
Value: "1",
},
{
Name: shared.MizuFilteringOptionsEnvVar,
Value: string(marshaledFilteringOptions),
},
},
},
},
DNSPolicy: "ClusterFirstWithHostNet",
DNSPolicy: core.DNSClusterFirstWithHostNet,
TerminationGracePeriodSeconds: new(int64),
// Affinity: TODO: define node selector for all relevant nodes for this mizu instance
},
@@ -122,19 +115,19 @@ func (provider *Provider) CreateMizuAggregatorPod(ctx context.Context, namespace
func (provider *Provider) CreateService(ctx context.Context, namespace string, serviceName string, appLabelValue string) (*core.Service, error) {
service := core.Service{
ObjectMeta: metav1.ObjectMeta{
Name: serviceName,
Name: serviceName,
Namespace: namespace,
},
Spec: core.ServiceSpec{
Ports: []core.ServicePort {{TargetPort: intstr.FromInt(8899), Port: 80}},
Type: core.ServiceTypeClusterIP,
Ports: []core.ServicePort{{TargetPort: intstr.FromInt(8899), Port: 80}},
Type: core.ServiceTypeClusterIP,
Selector: map[string]string{"app": appLabelValue},
},
}
return provider.clientSet.CoreV1().Services(namespace).Create(ctx, &service, metav1.CreateOptions{})
}
func (provider *Provider) DoesMizuRBACExist(ctx context.Context, namespace string) (bool, error){
func (provider *Provider) DoesMizuRBACExist(ctx context.Context, namespace string) (bool, error) {
serviceAccount, err := provider.clientSet.CoreV1().ServiceAccounts(namespace).Get(ctx, serviceAccountName, metav1.GetOptions{})
var statusError *k8serrors.StatusError
@@ -150,7 +143,22 @@ func (provider *Provider) DoesMizuRBACExist(ctx context.Context, namespace strin
return serviceAccount != nil, nil
}
func (provider *Provider) CreateMizuRBAC(ctx context.Context, namespace string ,version string) error {
func (provider *Provider) DoesServicesExist(ctx context.Context, namespace string, serviceName string) (bool, error) {
service, err := provider.clientSet.CoreV1().Services(namespace).Get(ctx, serviceName, metav1.GetOptions{})
var statusError *k8serrors.StatusError
if errors.As(err, &statusError) {
if statusError.ErrStatus.Reason == metav1.StatusReasonNotFound {
return false, nil
}
}
if err != nil {
return false, err
}
return service != nil, nil
}
func (provider *Provider) CreateMizuRBAC(ctx context.Context, namespace string, version string) error {
clusterRoleName := "mizu-cluster-role"
serviceAccount := &core.ServiceAccount{
@@ -162,25 +170,25 @@ func (provider *Provider) CreateMizuRBAC(ctx context.Context, namespace string ,
}
clusterRole := &rbac.ClusterRole{
ObjectMeta: metav1.ObjectMeta{
Name: clusterRoleName,
Name: clusterRoleName,
Labels: map[string]string{"mizu-cli-version": version},
},
Rules: []rbac.PolicyRule{
{
APIGroups: []string {"", "extensions", "apps"},
Resources: []string {"pods", "services", "endpoints"},
Verbs: []string {"list", "get", "watch"},
APIGroups: []string{"", "extensions", "apps"},
Resources: []string{"pods", "services", "endpoints"},
Verbs: []string{"list", "get", "watch"},
},
},
}
clusterRoleBinding := &rbac.ClusterRoleBinding{
ObjectMeta: metav1.ObjectMeta{
Name: "mizu-cluster-role-binding",
Name: "mizu-cluster-role-binding",
Labels: map[string]string{"mizu-cli-version": version},
},
RoleRef: rbac.RoleRef{
Name: clusterRoleName,
Kind: "ClusterRole",
Name: clusterRoleName,
Kind: "ClusterRole",
APIGroup: "rbac.authorization.k8s.io",
},
Subjects: []rbac.Subject{
@@ -232,24 +240,51 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
agentContainer.WithSecurityContext(applyconfcore.SecurityContext().WithPrivileged(privileged))
agentContainer.WithCommand("./mizuagent", "-i", "any", "--tap", "--hardump", "--aggregator-address", fmt.Sprintf("ws://%s/wsTapper", aggregatorPodIp))
agentContainer.WithEnv(
applyconfcore.EnvVar().WithName("HOST_MODE").WithValue("1"),
applyconfcore.EnvVar().WithName("AGGREGATOR_ADDRESS").WithValue(aggregatorPodIp),
applyconfcore.EnvVar().WithName("TAPPED_ADDRESSES_PER_HOST").WithValue(string(nodeToTappedPodIPMapJsonStr)),
applyconfcore.EnvVar().WithName(shared.HostModeEnvVar).WithValue("1"),
applyconfcore.EnvVar().WithName(shared.TappedAddressesPerNodeDictEnvVar).WithValue(string(nodeToTappedPodIPMapJsonStr)),
)
agentContainer.WithEnv(
applyconfcore.EnvVar().WithName("NODE_NAME").WithValueFrom(
applyconfcore.EnvVar().WithName(shared.NodeNameEnvVar).WithValueFrom(
applyconfcore.EnvVarSource().WithFieldRef(
applyconfcore.ObjectFieldSelector().WithAPIVersion("v1").WithFieldPath("spec.nodeName"),
),
),
)
podSpec := applyconfcore.PodSpec().WithHostNetwork(true).WithDNSPolicy("ClusterFirstWithHostNet").WithTerminationGracePeriodSeconds(0)
nodeNames := make([]string, 0, len(nodeToTappedPodIPMap))
for nodeName := range nodeToTappedPodIPMap {
nodeNames = append(nodeNames, nodeName)
}
nodeSelectorRequirement := applyconfcore.NodeSelectorRequirement()
nodeSelectorRequirement.WithKey("kubernetes.io/hostname")
nodeSelectorRequirement.WithOperator(core.NodeSelectorOpIn)
nodeSelectorRequirement.WithValues(nodeNames...)
nodeSelectorTerm := applyconfcore.NodeSelectorTerm()
nodeSelectorTerm.WithMatchExpressions(nodeSelectorRequirement)
nodeSelector := applyconfcore.NodeSelector()
nodeSelector.WithNodeSelectorTerms(nodeSelectorTerm)
nodeAffinity := applyconfcore.NodeAffinity()
nodeAffinity.WithRequiredDuringSchedulingIgnoredDuringExecution(nodeSelector)
affinity := applyconfcore.Affinity()
affinity.WithNodeAffinity(nodeAffinity)
noExecuteToleration := applyconfcore.Toleration()
noExecuteToleration.WithOperator(core.TolerationOpExists)
noExecuteToleration.WithEffect(core.TaintEffectNoExecute)
noScheduleToleration := applyconfcore.Toleration()
noScheduleToleration.WithOperator(core.TolerationOpExists)
noScheduleToleration.WithEffect(core.TaintEffectNoSchedule)
podSpec := applyconfcore.PodSpec()
podSpec.WithHostNetwork(true)
podSpec.WithDNSPolicy(core.DNSClusterFirstWithHostNet)
podSpec.WithTerminationGracePeriodSeconds(0)
if linkServiceAccount {
podSpec.WithServiceAccountName(serviceAccountName)
}
podSpec.WithContainers(agentContainer)
podSpec.WithAffinity(affinity)
podSpec.WithTolerations(noExecuteToleration, noScheduleToleration)
podTemplate := applyconfcore.PodTemplateSpec()
podTemplate.WithLabels(map[string]string{"app": tapperPodName})
@@ -265,8 +300,8 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac
return err
}
func (provider *Provider) GetAllPodsMatchingRegex(ctx context.Context, regex *regexp.Regexp) ([]core.Pod, error) {
pods, err := provider.clientSet.CoreV1().Pods("").List(ctx, metav1.ListOptions{})
func (provider *Provider) GetAllPodsMatchingRegex(ctx context.Context, regex *regexp.Regexp, namespace string) ([]core.Pod, error) {
pods, err := provider.clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
if err != nil {
return nil, err
}

View File

@@ -1,9 +1,11 @@
package mizu
var (
Version = "v0.0.1"
Branch = "develop"
GitCommitHash = "" // this var is overridden using ldflags in makefile when building
SemVer = "0.0.1"
Branch = "develop"
GitCommitHash = "" // this var is overridden using ldflags in makefile when building
BuildTimestamp = "" // this var is overridden using ldflags in makefile when building
RBACVersion = "v1"
)
const (
@@ -11,4 +13,5 @@ const (
TapperDaemonSetName = "mizu-tapper-daemon-set"
AggregatorPodName = "mizu-collector"
TapperPodName = "mizu-tapper"
K8sAllNamespaces = ""
)

8
shared/consts.go Normal file
View File

@@ -0,0 +1,8 @@
package shared
const (
MizuFilteringOptionsEnvVar = "SENSITIVE_DATA_FILTERING_OPTIONS"
HostModeEnvVar = "HOST_MODE"
NodeNameEnvVar = "NODE_NAME"
TappedAddressesPerNodeDictEnvVar = "TAPPED_ADDRESSES_PER_HOST"
)

View File

@@ -33,3 +33,7 @@ func CreateWebSocketStatusMessage(tappingStatus TapStatus) WebSocketStatusMessag
TappingStatus: tappingStatus,
}
}
type TrafficFilteringOptions struct {
PlainTextMaskingRegexes []*SerializableRegexp
}

View File

@@ -0,0 +1,30 @@
package shared
import "regexp"
type SerializableRegexp struct {
regexp.Regexp
}
func CompileRegexToSerializableRegexp(expr string) (*SerializableRegexp, error) {
re, err := regexp.Compile(expr)
if err != nil {
return nil, err
}
return &SerializableRegexp{*re}, nil
}
// UnmarshalText is by json.Unmarshal.
func (r *SerializableRegexp) UnmarshalText(text []byte) error {
rr, err := CompileRegexToSerializableRegexp(string(text))
if err != nil {
return err
}
*r = *rr
return nil
}
// MarshalText is used by json.Marshal.
func (r *SerializableRegexp) MarshalText() ([]byte, error) {
return []byte(r.String()), nil
}

12
tap/go.mod Normal file
View File

@@ -0,0 +1,12 @@
module github.com/up9inc/mizu/tap
go 1.16
require (
github.com/google/gopacket v1.1.19
github.com/google/martian v2.1.0+incompatible
github.com/gorilla/websocket v1.4.2
github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231
github.com/patrickmn/go-cache v2.1.0+incompatible
golang.org/x/net v0.0.0-20210421230115-4e50805a0758
)

31
tap/go.sum Normal file
View File

@@ -0,0 +1,31 @@
github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8=
github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo=
github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/orcaman/concurrent-map v0.0.0-20210501183033-44dafcb38ecc h1:Ak86L+yDSOzKFa7WM5bf5itSOo1e3Xh8bm5YCMUXIjQ=
github.com/orcaman/concurrent-map v0.0.0-20210501183033-44dafcb38ecc/go.mod h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210525063256-abc453219eb5 h1:wjuX4b5yYQnEQHzd+CBcrcC6OVR2J1CN6mUy0oSxIPo=
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da h1:b3NXsE2LusjYGGjL5bxEVZZORm/YEFFrWFjR8eFrw/c=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

View File

@@ -84,14 +84,14 @@ type GrpcAssembler struct {
framer *http2.Framer
}
func (ga *GrpcAssembler) readMessage() (uint32, interface{}, string, error) {
func (ga *GrpcAssembler) readMessage() (uint32, interface{}, error) {
// Exactly one Framer is used for each half connection.
// (Instead of creating a new Framer for each ReadFrame operation)
// This is needed in order to decompress the headers,
// because the compression context is updated with each requests/response.
frame, err := ga.framer.ReadFrame()
if err != nil {
return 0, nil, "", err
return 0, nil, err
}
streamID := frame.Header().StreamID
@@ -99,7 +99,7 @@ func (ga *GrpcAssembler) readMessage() (uint32, interface{}, string, error) {
ga.fragmentsByStream.appendFrame(streamID, frame)
if !(ga.isStreamEnd(frame)) {
return 0, nil, "", nil
return 0, nil, nil
}
headers, data := ga.fragmentsByStream.pop(streamID)
@@ -137,10 +137,10 @@ func (ga *GrpcAssembler) readMessage() (uint32, interface{}, string, error) {
ContentLength: int64(len(dataString)),
}
} else {
return 0, nil, "", errors.New("Failed to assemble stream: neither a request nor a message")
return 0, nil, errors.New("Failed to assemble stream: neither a request nor a message")
}
return streamID, messageHTTP1, dataString, nil
return streamID, messageHTTP1, nil
}
func (ga *GrpcAssembler) isStreamEnd(frame http2.Frame) bool {

View File

@@ -4,6 +4,7 @@ import (
"encoding/json"
"errors"
"fmt"
"log"
"net/http"
"os"
"path/filepath"
@@ -15,7 +16,8 @@ import (
)
const readPermission = 0644
const tempFilenamePrefix = "har_writer"
const harFilenameSuffix = ".har"
const tempFilenameSuffix = ".har.tmp"
type PairChanItem struct {
Request *http.Request
@@ -23,12 +25,13 @@ type PairChanItem struct {
Response *http.Response
ResponseTime time.Time
RequestSenderIp string
ConnectionInfo *ConnectionInfo
}
func openNewHarFile(filename string) *HarFile {
file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, readPermission)
if err != nil {
panic(fmt.Sprintf("Failed to open output file: %s (%v,%+v)", err, err, err))
log.Panicf("Failed to open output file: %s (%v,%+v)", err, err, err)
}
harFile := HarFile{file: file, entryCount: 0}
@@ -45,13 +48,13 @@ type HarFile struct {
func NewEntry(request *http.Request, requestTime time.Time, response *http.Response, responseTime time.Time) (*har.Entry, error) {
harRequest, err := har.NewRequest(request, true)
if err != nil {
SilentError("convert-request-to-har", "Failed converting request to HAR %s (%v,%+v)\n", err, err, err)
SilentError("convert-request-to-har", "Failed converting request to HAR %s (%v,%+v)", err, err, err)
return nil, errors.New("Failed converting request to HAR")
}
harResponse, err := har.NewResponse(response, true)
if err != nil {
SilentError("convert-response-to-har", "Failed converting response to HAR %s (%v,%+v)\n", err, err, err)
SilentError("convert-response-to-har", "Failed converting response to HAR %s (%v,%+v)", err, err, err)
return nil, errors.New("Failed converting response to HAR")
}
@@ -62,7 +65,7 @@ func NewEntry(request *http.Request, requestTime time.Time, response *http.Respo
status, err := strconv.Atoi(response.Header.Get(":status"))
if err != nil {
SilentError("convert-response-status-for-har", "Failed converting status to int %s (%v,%+v)\n", err, err, err)
SilentError("convert-response-status-for-har", "Failed converting status to int %s (%v,%+v)", err, err, err)
return nil, errors.New("Failed converting response status to int for HAR")
}
harResponse.Status = status
@@ -102,7 +105,7 @@ func NewEntry(request *http.Request, requestTime time.Time, response *http.Respo
func (f *HarFile) WriteEntry(harEntry *har.Entry) {
harEntryJson, err := json.Marshal(harEntry)
if err != nil {
SilentError("har-entry-marshal", "Failed converting har entry object to JSON%s (%v,%+v)\n", err, err, err)
SilentError("har-entry-marshal", "Failed converting har entry object to JSON%s (%v,%+v)", err, err, err)
return
}
@@ -116,7 +119,7 @@ func (f *HarFile) WriteEntry(harEntry *har.Entry) {
harEntryString := append([]byte(separator), harEntryJson...)
if _, err := f.file.Write(harEntryString); err != nil {
panic(fmt.Sprintf("Failed to write to output file: %s (%v,%+v)", err, err, err))
log.Panicf("Failed to write to output file: %s (%v,%+v)", err, err, err)
}
f.entryCount++
@@ -131,21 +134,21 @@ func (f *HarFile) Close() {
err := f.file.Close()
if err != nil {
panic(fmt.Sprintf("Failed to close output file: %s (%v,%+v)", err, err, err))
log.Panicf("Failed to close output file: %s (%v,%+v)", err, err, err)
}
}
func (f*HarFile) writeHeader() {
header := []byte(`{"log": {"version": "1.2", "creator": {"name": "Mizu", "version": "0.0.1"}, "entries": [`)
if _, err := f.file.Write(header); err != nil {
panic(fmt.Sprintf("Failed to write header to output file: %s (%v,%+v)", err, err, err))
log.Panicf("Failed to write header to output file: %s (%v,%+v)", err, err, err)
}
}
func (f*HarFile) writeTrailer() {
trailer := []byte("]}}")
if _, err := f.file.Write(trailer); err != nil {
panic(fmt.Sprintf("Failed to write trailer to output file: %s (%v,%+v)", err, err, err))
log.Panicf("Failed to write trailer to output file: %s (%v,%+v)", err, err, err)
}
}
@@ -161,8 +164,8 @@ func NewHarWriter(outputDir string, maxEntries int) *HarWriter {
}
type OutputChannelItem struct {
HarEntry *har.Entry
RequestSenderIp string
HarEntry *har.Entry
ConnectionInfo *ConnectionInfo
}
type HarWriter struct {
@@ -174,20 +177,20 @@ type HarWriter struct {
done chan bool
}
func (hw *HarWriter) WritePair(request *http.Request, requestTime time.Time, response *http.Response, responseTime time.Time, requestSenderIp string) {
func (hw *HarWriter) WritePair(request *http.Request, requestTime time.Time, response *http.Response, responseTime time.Time, connectionInfo *ConnectionInfo) {
hw.PairChan <- &PairChanItem{
Request: request,
RequestTime: requestTime,
Response: response,
ResponseTime: responseTime,
RequestSenderIp: requestSenderIp,
Request: request,
RequestTime: requestTime,
Response: response,
ResponseTime: responseTime,
ConnectionInfo: connectionInfo,
}
}
func (hw *HarWriter) Start() {
if hw.OutputDirPath != "" {
if err := os.MkdirAll(hw.OutputDirPath, os.ModePerm); err != nil {
panic(fmt.Sprintf("Failed to create output directory: %s (%v,%+v)", err, err, err))
log.Panicf("Failed to create output directory: %s (%v,%+v)", err, err, err)
}
}
@@ -210,8 +213,8 @@ func (hw *HarWriter) Start() {
}
} else {
hw.OutChan <- &OutputChannelItem{
HarEntry: harEntry,
RequestSenderIp: pair.RequestSenderIp,
HarEntry: harEntry,
ConnectionInfo: pair.ConnectionInfo,
}
}
}
@@ -226,10 +229,11 @@ func (hw *HarWriter) Start() {
func (hw *HarWriter) Stop() {
close(hw.PairChan)
<-hw.done
close(hw.OutChan)
}
func (hw *HarWriter) openNewFile() {
filename := filepath.Join(os.TempDir(), fmt.Sprintf("%s_%d", tempFilenamePrefix, time.Now().UnixNano()))
filename := buildFilename(hw.OutputDirPath, time.Now(), tempFilenameSuffix)
hw.currentFile = openNewHarFile(filename)
}
@@ -238,15 +242,15 @@ func (hw *HarWriter) closeFile() {
tmpFilename := hw.currentFile.file.Name()
hw.currentFile = nil
filename := buildFilename(hw.OutputDirPath, time.Now())
filename := buildFilename(hw.OutputDirPath, time.Now(), harFilenameSuffix)
err := os.Rename(tmpFilename, filename)
if err != nil {
SilentError("Rename-file", "cannot rename file: %s (%v,%+v)\n", err, err, err)
SilentError("Rename-file", "cannot rename file: %s (%v,%+v)", err, err, err)
}
}
func buildFilename(dir string, t time.Time) string {
func buildFilename(dir string, t time.Time, suffix string) string {
// (epoch time in nanoseconds)__(YYYY_Month_DD__hh-mm-ss).har
filename := fmt.Sprintf("%d__%s.har", t.UnixNano(), t.Format("2006_Jan_02__15-04-05"))
filename := fmt.Sprintf("%d__%s%s", t.UnixNano(), t.Format("2006_Jan_02__15-04-05"), suffix)
return filepath.Join(dir, filename)
}

138
tap/http_matcher.go Normal file
View File

@@ -0,0 +1,138 @@
package tap
import (
"fmt"
"net/http"
"strings"
"time"
"github.com/orcaman/concurrent-map"
)
type requestResponsePair struct {
Request httpMessage `json:"request"`
Response httpMessage `json:"response"`
}
type ConnectionInfo struct {
ClientIP string
ClientPort string
ServerIP string
ServerPort string
}
type httpMessage struct {
isRequest bool
captureTime time.Time
orig interface{}
connectionInfo ConnectionInfo
}
// Key is {client_addr}:{client_port}->{dest_addr}:{dest_port}
type requestResponseMatcher struct {
openMessagesMap cmap.ConcurrentMap
}
func createResponseRequestMatcher() requestResponseMatcher {
newMatcher := &requestResponseMatcher{openMessagesMap: cmap.New()}
return *newMatcher
}
func (matcher *requestResponseMatcher) registerRequest(ident string, request *http.Request, captureTime time.Time) *requestResponsePair {
split := splitIdent(ident)
key := genKey(split)
connectionInfo := &ConnectionInfo{
ClientIP: split[0],
ClientPort: split[2],
ServerIP: split[1],
ServerPort: split[3],
}
requestHTTPMessage := httpMessage{
isRequest: true,
captureTime: captureTime,
orig: request,
connectionInfo: *connectionInfo,
}
if response, found := matcher.openMessagesMap.Pop(key); found {
// Type assertion always succeeds because all of the map's values are of httpMessage type
responseHTTPMessage := response.(*httpMessage)
if responseHTTPMessage.isRequest {
SilentError("Request-Duplicate", "Got duplicate request with same identifier")
return nil
}
Debug("Matched open Response for %s", key)
return matcher.preparePair(&requestHTTPMessage, responseHTTPMessage)
}
matcher.openMessagesMap.Set(key, &requestHTTPMessage)
Debug("Registered open Request for %s", key)
return nil
}
func (matcher *requestResponseMatcher) registerResponse(ident string, response *http.Response, captureTime time.Time) *requestResponsePair {
split := splitIdent(ident)
key := genKey(split)
responseHTTPMessage := httpMessage{
isRequest: false,
captureTime: captureTime,
orig: response,
}
if request, found := matcher.openMessagesMap.Pop(key); found {
// Type assertion always succeeds because all of the map's values are of httpMessage type
requestHTTPMessage := request.(*httpMessage)
if !requestHTTPMessage.isRequest {
SilentError("Response-Duplicate", "Got duplicate response with same identifier")
return nil
}
Debug("Matched open Request for %s", key)
return matcher.preparePair(requestHTTPMessage, &responseHTTPMessage)
}
matcher.openMessagesMap.Set(key, &responseHTTPMessage)
Debug("Registered open Response for %s", key)
return nil
}
func (matcher *requestResponseMatcher) preparePair(requestHTTPMessage *httpMessage, responseHTTPMessage *httpMessage) *requestResponsePair {
return &requestResponsePair{
Request: *requestHTTPMessage,
Response: *responseHTTPMessage,
}
}
func splitIdent(ident string) []string {
ident = strings.Replace(ident, "->", " ", -1)
return strings.Split(ident, " ")
}
func genKey(split []string) string {
key := fmt.Sprintf("%s:%s->%s:%s,%s", split[0], split[2], split[1], split[3], split[4])
return key
}
func (matcher *requestResponseMatcher) deleteOlderThan(t time.Time) int {
keysToPop := make([]string, 0)
for item := range matcher.openMessagesMap.IterBuffered() {
// Map only contains values of type httpMessage
message, _ := item.Val.(*httpMessage)
if message.captureTime.Before(t) {
keysToPop = append(keysToPop, item.Key)
}
}
numDeleted := len(keysToPop)
for _, key := range keysToPop {
_, _ = matcher.openMessagesMap.Pop(key)
}
return numDeleted
}

View File

@@ -3,10 +3,7 @@ package tap
import (
"bufio"
"bytes"
"compress/gzip"
b64 "encoding/base64"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"io/ioutil"
@@ -73,7 +70,7 @@ func (h *httpReader) run(wg *sync.WaitGroup) {
b := bufio.NewReader(h)
if isHTTP2, err := checkIsHTTP2Connection(b, h.isClient); err != nil {
SilentError("HTTP/2-Prepare-Connection", "stream %s Failed to check if client is HTTP/2: %s (%v,%+v)\n", h.ident, err, err, err)
SilentError("HTTP/2-Prepare-Connection", "stream %s Failed to check if client is HTTP/2: %s (%v,%+v)", h.ident, err, err, err)
// Do something?
} else {
h.isHTTP2 = isHTTP2
@@ -82,7 +79,7 @@ func (h *httpReader) run(wg *sync.WaitGroup) {
if h.isHTTP2 {
err := prepareHTTP2Connection(b, h.isClient)
if err != nil {
SilentError("HTTP/2-Prepare-Connection-After-Check", "stream %s error: %s (%v,%+v)\n", h.ident, err, err, err)
SilentError("HTTP/2-Prepare-Connection-After-Check", "stream %s error: %s (%v,%+v)", h.ident, err, err, err)
}
h.grpcAssembler = createGrpcAssembler(b)
}
@@ -93,7 +90,7 @@ func (h *httpReader) run(wg *sync.WaitGroup) {
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
SilentError("HTTP/2", "stream %s error: %s (%v,%+v)\n", h.ident, err, err, err)
SilentError("HTTP/2", "stream %s error: %s (%v,%+v)", h.ident, err, err, err)
continue
}
} else if h.isClient {
@@ -101,7 +98,7 @@ func (h *httpReader) run(wg *sync.WaitGroup) {
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
SilentError("HTTP-request", "stream %s Request error: %s (%v,%+v)\n", h.ident, err, err, err)
SilentError("HTTP-request", "stream %s Request error: %s (%v,%+v)", h.ident, err, err, err)
continue
}
} else {
@@ -109,7 +106,7 @@ func (h *httpReader) run(wg *sync.WaitGroup) {
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} else if err != nil {
SilentError("HTTP-response", "stream %s Response error: %s (%v,%+v)\n", h.ident, err, err, err)
SilentError("HTTP-response", "stream %s Response error: %s (%v,%+v)", h.ident, err, err, err)
continue
}
}
@@ -117,38 +114,34 @@ func (h *httpReader) run(wg *sync.WaitGroup) {
}
func (h *httpReader) handleHTTP2Stream() error {
streamID, messageHTTP1, body, err := h.grpcAssembler.readMessage()
streamID, messageHTTP1, err := h.grpcAssembler.readMessage()
h.messageCount++
if err != nil {
return err
}
var reqResPair *envoyMessageWrapper
var reqResPair *requestResponsePair
switch messageHTTP1 := messageHTTP1.(type) {
case http.Request:
ident := fmt.Sprintf("%s->%s %s->%s %d", h.tcpID.srcIP, h.tcpID.dstIP, h.tcpID.srcPort, h.tcpID.dstPort, streamID)
reqResPair = reqResMatcher.registerRequest(ident, &messageHTTP1, h.captureTime, body, true)
reqResPair = reqResMatcher.registerRequest(ident, &messageHTTP1, h.captureTime)
case http.Response:
ident := fmt.Sprintf("%s->%s %s->%s %d", h.tcpID.dstIP, h.tcpID.srcIP, h.tcpID.dstPort, h.tcpID.srcPort, streamID)
reqResPair = reqResMatcher.registerResponse(ident, &messageHTTP1, h.captureTime, body, true)
reqResPair = reqResMatcher.registerResponse(ident, &messageHTTP1, h.captureTime)
}
if reqResPair != nil {
statsTracker.incMatchedMessages()
if h.harWriter != nil {
h.harWriter.WritePair(
reqResPair.HttpBufferedTrace.Request.orig.(*http.Request),
reqResPair.HttpBufferedTrace.Request.captureTime,
reqResPair.HttpBufferedTrace.Response.orig.(*http.Response),
reqResPair.HttpBufferedTrace.Response.captureTime,
reqResPair.HttpBufferedTrace.Request.requestSenderIp,
reqResPair.Request.orig.(*http.Request),
reqResPair.Request.captureTime,
reqResPair.Response.orig.(*http.Response),
reqResPair.Response.captureTime,
&reqResPair.Request.connectionInfo,
)
} else {
jsonStr, err := json.Marshal(reqResPair)
if err != nil {
return err
}
broadcastReqResPair(jsonStr)
}
}
@@ -165,37 +158,29 @@ func (h *httpReader) handleHTTP1ClientStream(b *bufio.Reader) error {
req.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
s := len(body)
if err != nil {
SilentError("HTTP-request-body", "stream %s Got body err: %s\n", h.ident, err)
SilentError("HTTP-request-body", "stream %s Got body err: %s", h.ident, err)
} else if h.hexdump {
Info("Body(%d/0x%x)\n%s\n", len(body), len(body), hex.Dump(body))
Info("Body(%d/0x%x) - %s", len(body), len(body), hex.Dump(body))
}
if err := req.Body.Close(); err != nil {
SilentError("HTTP-request-body-close", "stream %s Failed to close request body: %s\n", h.ident, err)
SilentError("HTTP-request-body-close", "stream %s Failed to close request body: %s", h.ident, err)
}
encoding := req.Header["Content-Encoding"]
bodyStr, err := readBody(body, encoding)
if err != nil {
SilentError("HTTP-request-body-decode", "stream %s Failed to decode body: %s\n", h.ident, err)
}
Info("HTTP/%s Request: %s %s (Body:%d)\n", h.ident, req.Method, req.URL, s)
Info("HTTP/1 Request: %s %s %s (Body:%d) -> %s", h.ident, req.Method, req.URL, s, encoding)
ident := fmt.Sprintf("%s->%s %s->%s %d", h.tcpID.srcIP, h.tcpID.dstIP, h.tcpID.srcPort, h.tcpID.dstPort, h.messageCount)
reqResPair := reqResMatcher.registerRequest(ident, req, h.captureTime, bodyStr, false)
reqResPair := reqResMatcher.registerRequest(ident, req, h.captureTime)
if reqResPair != nil {
statsTracker.incMatchedMessages()
if h.harWriter != nil {
h.harWriter.WritePair(
reqResPair.HttpBufferedTrace.Request.orig.(*http.Request),
reqResPair.HttpBufferedTrace.Request.captureTime,
reqResPair.HttpBufferedTrace.Response.orig.(*http.Response),
reqResPair.HttpBufferedTrace.Response.captureTime,
reqResPair.HttpBufferedTrace.Request.requestSenderIp,
reqResPair.Request.orig.(*http.Request),
reqResPair.Request.captureTime,
reqResPair.Response.orig.(*http.Response),
reqResPair.Response.captureTime,
&reqResPair.Request.connectionInfo,
)
} else {
jsonStr, err := json.Marshal(reqResPair)
if err != nil {
SilentError("HTTP-marshal", "stream %s Error convert request response to json: %s\n", h.ident, err)
}
broadcastReqResPair(jsonStr)
}
}
@@ -224,13 +209,13 @@ func (h *httpReader) handleHTTP1ServerStream(b *bufio.Reader) error {
res.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind
s := len(body)
if err != nil {
SilentError("HTTP-response-body", "HTTP/%s: failed to get body(parsed len:%d): %s\n", h.ident, s, err)
SilentError("HTTP-response-body", "HTTP/%s: failed to get body(parsed len:%d): %s", h.ident, s, err)
}
if h.hexdump {
Info("Body(%d/0x%x)\n%s\n", len(body), len(body), hex.Dump(body))
Info("Body(%d/0x%x) - %s", len(body), len(body), hex.Dump(body))
}
if err := res.Body.Close(); err != nil {
SilentError("HTTP-response-body-close", "HTTP/%s: failed to close body(parsed len:%d): %s\n", h.ident, s, err)
SilentError("HTTP-response-body-close", "HTTP/%s: failed to close body(parsed len:%d): %s", h.ident, s, err)
}
sym := ","
if res.ContentLength > 0 && res.ContentLength != int64(s) {
@@ -241,54 +226,23 @@ func (h *httpReader) handleHTTP1ServerStream(b *bufio.Reader) error {
contentType = []string{http.DetectContentType(body)}
}
encoding := res.Header["Content-Encoding"]
Info("HTTP/%s Response: %s URL:%s (%d%s%d%s) -> %s\n", h.ident, res.Status, req, res.ContentLength, sym, s, contentType, encoding)
bodyStr, err := readBody(body, encoding)
if err != nil {
SilentError("HTTP-response-body-decode", "stream %s Failed to decode body: %s\n", h.ident, err)
}
Info("HTTP/1 Response: %s %s URL:%s (%d%s%d%s) -> %s", h.ident, res.Status, req, res.ContentLength, sym, s, contentType, encoding)
ident := fmt.Sprintf("%s->%s %s->%s %d", h.tcpID.dstIP, h.tcpID.srcIP, h.tcpID.dstPort, h.tcpID.srcPort, h.messageCount)
reqResPair := reqResMatcher.registerResponse(ident, res, h.captureTime, bodyStr, false)
reqResPair := reqResMatcher.registerResponse(ident, res, h.captureTime)
if reqResPair != nil {
statsTracker.incMatchedMessages()
if h.harWriter != nil {
h.harWriter.WritePair(
reqResPair.HttpBufferedTrace.Request.orig.(*http.Request),
reqResPair.HttpBufferedTrace.Request.captureTime,
reqResPair.HttpBufferedTrace.Response.orig.(*http.Response),
reqResPair.HttpBufferedTrace.Response.captureTime,
reqResPair.HttpBufferedTrace.Request.requestSenderIp,
reqResPair.Request.orig.(*http.Request),
reqResPair.Request.captureTime,
reqResPair.Response.orig.(*http.Response),
reqResPair.Response.captureTime,
&reqResPair.Request.connectionInfo,
)
} else {
jsonStr, err := json.Marshal(reqResPair)
if err != nil {
SilentError("HTTP-marshal", "stream %s Error convert request response to json: %s\n", h.ident, err)
}
broadcastReqResPair(jsonStr)
}
}
return nil
}
func readBody(bodyBytes []byte, encoding []string) (string, error) {
var bodyBuffer io.Reader
bodyBuffer = bytes.NewBuffer(bodyBytes)
var err error
if len(encoding) > 0 && (encoding[0] == "gzip" || encoding[0] == "deflate") {
bodyBuffer, err = gzip.NewReader(bodyBuffer)
if err != nil {
SilentError("HTTP-gunzip", "Failed to gzip decode: %s\n", err)
return "", err
}
}
if _, ok := bodyBuffer.(*gzip.Reader); ok {
err = bodyBuffer.(*gzip.Reader).Close()
if err != nil {
return "", err
}
}
buf := new(bytes.Buffer)
_, err = buf.ReadFrom(bodyBuffer)
return b64.StdEncoding.EncodeToString(buf.Bytes()), err
}

29
tap/outboundlinks.go Normal file
View File

@@ -0,0 +1,29 @@
package tap
type OutboundLink struct {
Src string
DstIP string
DstPort int
}
func NewOutboundLinkWriter() *OutboundLinkWriter {
return &OutboundLinkWriter{
OutChan: make(chan *OutboundLink),
}
}
type OutboundLinkWriter struct {
OutChan chan *OutboundLink
}
func (olw *OutboundLinkWriter) WriteOutboundLink(src string, DstIP string, DstPort int) {
olw.OutChan <- &OutboundLink{
Src: src,
DstIP: DstIP,
DstPort: DstPort,
}
}
func (olw *OutboundLinkWriter) Stop() {
close(olw.OutChan)
}

View File

@@ -10,7 +10,6 @@ package tap
import (
"encoding/hex"
"encoding/json"
"flag"
"fmt"
"log"
@@ -32,13 +31,10 @@ import (
)
const AppPortsEnvVar = "APP_PORTS"
const OutPortEnvVar = "WEB_SOCKET_PORT"
const maxHTTP2DataLenEnvVar = "HTTP2_DATA_SIZE_LIMIT"
const hostModeEnvVar = "HOST_MODE"
// default is 1MB, more than the max size accepted by collector and traffic-dumper
const maxHTTP2DataLenDefault = 1 * 1024 * 1024
const cleanPeriod = time.Second * 10
const outboundThrottleCacheExpiryPeriod = time.Minute * 15
var remoteOnlyOutboundPorts = []int { 80, 443 }
func parseAppPorts(appPortsList string) []int {
@@ -46,7 +42,7 @@ func parseAppPorts(appPortsList string) []int {
for _, portStr := range strings.Split(appPortsList, ",") {
parsedInt, parseError := strconv.Atoi(portStr)
if parseError != nil {
fmt.Println("Provided app port ", portStr, " is not a valid number!")
log.Printf("Provided app port %v is not a valid number!", portStr)
} else {
ports = append(ports, parsedInt)
}
@@ -54,13 +50,6 @@ func parseAppPorts(appPortsList string) []int {
return ports
}
func parseHostAppAddresses(hostAppAddressesString string) []string {
if len(hostAppAddressesString) == 0 {
return []string{}
}
return strings.Split(hostAppAddressesString, ",")
}
var maxcount = flag.Int("c", -1, "Only grab this many packets, then exit")
var decoder = flag.String("decoder", "", "Name of the decoder to use (default: guess from capture)")
var statsevery = flag.Int("stats", 60, "Output statistics every N seconds")
@@ -90,7 +79,6 @@ var tstype = flag.String("timestamp_type", "", "Type of timestamps to use")
var promisc = flag.Bool("promisc", true, "Set promiscuous mode")
var anydirection = flag.Bool("anydirection", false, "Capture http requests to other hosts")
var staleTimeoutSeconds = flag.Int("staletimout", 120, "Max time in seconds to keep connections which don't transmit data")
var hostAppAddressesString = flag.String("targets", "", "Comma separated list of ip:ports to tap")
var memprofile = flag.String("memprofile", "", "Write memory profile")
@@ -121,24 +109,20 @@ var stats struct {
overlapPackets int
}
type CollectorMessage struct {
MessageType string
Ports *[]int `json:"ports,omitempty"`
Addresses *[]string `json:"addresses,omitempty"`
type TapOpts struct {
HostMode bool
}
var outputLevel int
var errorsMap map[string]uint
var errorsMapMutex sync.Mutex
var nErrors uint
var appPorts []int // global
var ownIps []string //global
var hostMode bool //global
var HostAppAddresses []string //global
var ownIps []string // global
var hostMode bool // global
/* minOutputLevel: Error will be printed only if outputLevel is above this value
* t: key for errorsMap (counting errors)
* s, a: arguments fmt.Printf
* s, a: arguments log.Printf
* Note: Too bad for perf that a... is evaluated
*/
func logError(minOutputLevel int, t string, s string, a ...interface{}) {
@@ -149,7 +133,7 @@ func logError(minOutputLevel int, t string, s string, a ...interface{}) {
errorsMapMutex.Unlock()
if outputLevel >= minOutputLevel {
formatStr := fmt.Sprintf("%s: %s", t, s)
fmt.Printf(formatStr, a...)
log.Printf(formatStr, a...)
}
}
func Error(t string, s string, a ...interface{}) {
@@ -160,12 +144,12 @@ func SilentError(t string, s string, a ...interface{}) {
}
func Info(s string, a ...interface{}) {
if outputLevel >= 1 {
fmt.Printf(s, a...)
log.Printf(s, a...)
}
}
func Debug(s string, a ...interface{}) {
if outputLevel >= 2 {
fmt.Printf(s, a...)
log.Printf(s, a...)
}
}
@@ -187,9 +171,8 @@ func inArrayString(arr []string, valueToCheck string) bool {
return false
}
/*
* The assembler context
*/
// Context
// The assembler context
type Context struct {
CaptureInfo gopacket.CaptureInfo
}
@@ -198,22 +181,27 @@ func (c *Context) GetCaptureInfo() gopacket.CaptureInfo {
return c.CaptureInfo
}
func StartPassiveTapper() <-chan *OutputChannelItem {
func StartPassiveTapper(opts *TapOpts) (<-chan *OutputChannelItem, <-chan *OutboundLink) {
hostMode = opts.HostMode
var harWriter *HarWriter
if *dumpToHar {
harWriter = NewHarWriter(*HarOutputDir, *harEntriesPerFile)
}
outboundLinkWriter := NewOutboundLinkWriter()
go startPassiveTapper(harWriter)
go startPassiveTapper(harWriter, outboundLinkWriter)
if harWriter != nil {
return harWriter.OutChan
return harWriter.OutChan, outboundLinkWriter.OutChan
}
return nil
return nil, outboundLinkWriter.OutChan
}
func startPassiveTapper(harWriter *HarWriter) {
func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWriter) {
log.SetFlags(log.LstdFlags | log.LUTC | log.Lshortfile)
defer util.Run()()
if *debug {
outputLevel = 2
@@ -226,68 +214,43 @@ func startPassiveTapper(harWriter *HarWriter) {
if localhostIPs, err := getLocalhostIPs(); err != nil {
// TODO: think this over
fmt.Println("Failed to get self IP addresses")
Error("Getting-Self-Address", "Error getting self ip address: %s (%v,%+v)\n", err, err, err)
log.Println("Failed to get self IP addresses")
Error("Getting-Self-Address", "Error getting self ip address: %s (%v,%+v)", err, err, err)
ownIps = make([]string, 0)
} else {
ownIps = localhostIPs
}
appPortsStr := os.Getenv(AppPortsEnvVar)
var appPorts []int
if appPortsStr == "" {
fmt.Println("Received empty/no APP_PORTS env var! only listening to http on port 80!")
log.Println("Received empty/no APP_PORTS env var! only listening to http on port 80!")
appPorts = make([]int, 0)
} else {
appPorts = parseAppPorts(appPortsStr)
}
tapOutputPort := os.Getenv(OutPortEnvVar)
if tapOutputPort == "" {
fmt.Println("Received empty/no WEB_SOCKET_PORT env var! falling back to port 8080")
tapOutputPort = "8080"
}
SetFilterPorts(appPorts)
envVal := os.Getenv(maxHTTP2DataLenEnvVar)
if envVal == "" {
fmt.Println("Received empty/no HTTP2_DATA_SIZE_LIMIT env var! falling back to", maxHTTP2DataLenDefault)
log.Println("Received empty/no HTTP2_DATA_SIZE_LIMIT env var! falling back to", maxHTTP2DataLenDefault)
maxHTTP2DataLen = maxHTTP2DataLenDefault
} else {
if convertedInt, err := strconv.Atoi(envVal); err != nil {
fmt.Println("Received invalid HTTP2_DATA_SIZE_LIMIT env var! falling back to", maxHTTP2DataLenDefault)
log.Println("Received invalid HTTP2_DATA_SIZE_LIMIT env var! falling back to", maxHTTP2DataLenDefault)
maxHTTP2DataLen = maxHTTP2DataLenDefault
} else {
fmt.Println("Received HTTP2_DATA_SIZE_LIMIT env var:", maxHTTP2DataLenDefault)
log.Println("Received HTTP2_DATA_SIZE_LIMIT env var:", maxHTTP2DataLenDefault)
maxHTTP2DataLen = convertedInt
}
}
hostMode = os.Getenv(hostModeEnvVar) == "1"
fmt.Printf("App Ports: %v\n", appPorts)
fmt.Printf("Tap output websocket port: %s\n", tapOutputPort)
var onCollectorMessage = func(message []byte) {
var parsedMessage CollectorMessage
err := json.Unmarshal(message, &parsedMessage)
if err == nil {
if parsedMessage.MessageType == "setPorts" {
Debug("Got message from collector. Type: %s, Ports: %v\n", parsedMessage.MessageType, parsedMessage.Ports)
appPorts = *parsedMessage.Ports
} else if parsedMessage.MessageType == "setAddresses" {
Debug("Got message from collector. Type: %s, IPs: %v\n", parsedMessage.MessageType, parsedMessage.Addresses)
HostAppAddresses = *parsedMessage.Addresses
Info("Filtering for the following addresses: %s\n", HostAppAddresses)
}
} else {
Error("Collector-Message-Parsing", "Error parsing message from collector: %s (%v,%+v)\n", err, err, err)
}
}
go startOutputServer(tapOutputPort, onCollectorMessage)
log.Printf("App Ports: %v", gSettings.filterPorts)
var handle *pcap.Handle
var err error
if *fname != "" {
if handle, err = pcap.OpenOffline(*fname); err != nil {
log.Fatal("PCAP OpenOffline error:", err)
log.Fatalf("PCAP OpenOffline error: %v", err)
}
} else {
// This is a little complicated because we want to allow all possible options
@@ -313,15 +276,15 @@ func startPassiveTapper(harWriter *HarWriter) {
}
}
if handle, err = inactive.Activate(); err != nil {
log.Fatal("PCAP Activate error:", err)
log.Fatalf("PCAP Activate error: %v", err)
}
defer handle.Close()
}
if len(flag.Args()) > 0 {
bpffilter := strings.Join(flag.Args(), " ")
Info("Using BPF filter %q\n", bpffilter)
Info("Using BPF filter %q", bpffilter)
if err = handle.SetBPFFilter(bpffilter); err != nil {
log.Fatal("BPF filter error:", err)
log.Fatalf("BPF filter error: %v", err)
}
}
@@ -329,6 +292,7 @@ func startPassiveTapper(harWriter *HarWriter) {
harWriter.Start()
defer harWriter.Stop()
}
defer outboundLinkWriter.Stop()
var dec gopacket.Decoder
var ok bool
@@ -342,13 +306,18 @@ func startPassiveTapper(harWriter *HarWriter) {
source := gopacket.NewPacketSource(handle, dec)
source.Lazy = *lazy
source.NoCopy = true
Info("Starting to read packets\n")
Info("Starting to read packets")
count := 0
bytes := int64(0)
start := time.Now()
defragger := ip4defrag.NewIPv4Defragmenter()
streamFactory := &tcpStreamFactory{doHTTP: !*nohttp, harWriter: harWriter}
streamFactory := &tcpStreamFactory{
doHTTP: !*nohttp,
harWriter: harWriter,
outbountLinkWriter: outboundLinkWriter,
}
streamPool := reassembly.NewStreamPool(streamFactory)
assembler := reassembly.NewAssembler(streamPool)
var assemblerMutex sync.Mutex
@@ -378,7 +347,7 @@ func startPassiveTapper(harWriter *HarWriter) {
errorMapLen := len(errorsMap)
errorsSummery := fmt.Sprintf("%v", errorsMap)
errorsMapMutex.Unlock()
fmt.Printf("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)\nErrors Summary: %s\n",
log.Printf("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v) - Errors Summary: %s",
count,
bytes,
time.Since(start),
@@ -390,8 +359,8 @@ func startPassiveTapper(harWriter *HarWriter) {
// At this moment
memStats := runtime.MemStats{}
runtime.ReadMemStats(&memStats)
fmt.Printf(
"mem: %d, goroutines: %d, unmatched messages: %d\n",
log.Printf(
"mem: %d, goroutines: %d, unmatched messages: %d",
memStats.HeapAlloc,
runtime.NumGoroutine(),
reqResMatcher.openMessagesMap.Count(),
@@ -400,8 +369,8 @@ func startPassiveTapper(harWriter *HarWriter) {
// Since the last print
cleanStats := cleaner.dumpStats()
appStats := statsTracker.dumpStats()
fmt.Printf(
"flushed connections %d, closed connections: %d, deleted messages: %d, matched messages: %d\n",
log.Printf(
"flushed connections %d, closed connections: %d, deleted messages: %d, matched messages: %d",
cleanStats.flushed,
cleanStats.closed,
cleanStats.deleted,
@@ -412,11 +381,11 @@ func startPassiveTapper(harWriter *HarWriter) {
for packet := range source.Packets() {
count++
Debug("PACKET #%d\n", count)
Debug("PACKET #%d", count)
data := packet.Data()
bytes += int64(len(data))
if *hexdumppkt {
Debug("Packet content (%d/0x%x)\n%s\n", len(data), len(data), hex.Dump(data))
Debug("Packet content (%d/0x%x) - %s", len(data), len(data), hex.Dump(data))
}
// defrag the IPv4 packet if required
@@ -431,18 +400,18 @@ func startPassiveTapper(harWriter *HarWriter) {
if err != nil {
log.Fatalln("Error while de-fragmenting", err)
} else if newip4 == nil {
Debug("Fragment...\n")
Debug("Fragment...")
continue // packet fragment, we don't have whole packet yet.
}
if newip4.Length != l {
stats.ipdefrag++
Debug("Decoding re-assembled packet: %s\n", newip4.NextLayerType())
Debug("Decoding re-assembled packet: %s", newip4.NextLayerType())
pb, ok := packet.(gopacket.PacketBuilder)
if !ok {
panic("Not a PacketBuilder")
log.Panic("Not a PacketBuilder")
}
nextDecoder := newip4.NextLayerType()
nextDecoder.Decode(newip4.Payload, pb)
_ = nextDecoder.Decode(newip4.Payload, pb)
}
}
@@ -459,7 +428,7 @@ func startPassiveTapper(harWriter *HarWriter) {
CaptureInfo: packet.Metadata().CaptureInfo,
}
stats.totalsz += len(tcp.Payload)
//fmt.Println(packet.NetworkLayer().NetworkFlow().Src(), ":", tcp.SrcPort, " -> ", packet.NetworkLayer().NetworkFlow().Dst(), ":", tcp.DstPort)
// log.Println(packet.NetworkLayer().NetworkFlow().Src(), ":", tcp.SrcPort, " -> ", packet.NetworkLayer().NetworkFlow().Dst(), ":", tcp.DstPort)
assemblerMutex.Lock()
assembler.AssembleWithContext(packet.NetworkLayer().NetworkFlow(), tcp, &c)
assemblerMutex.Unlock()
@@ -470,11 +439,11 @@ func startPassiveTapper(harWriter *HarWriter) {
errorsMapMutex.Lock()
errorMapLen := len(errorsMap)
errorsMapMutex.Unlock()
fmt.Fprintf(os.Stderr, "Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)\n", count, bytes, time.Since(start), nErrors, errorMapLen)
log.Printf("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)", count, bytes, time.Since(start), nErrors, errorMapLen)
}
select {
case <-signalChan:
fmt.Fprintf(os.Stderr, "\nCaught SIGINT: aborting\n")
log.Printf("Caught SIGINT: aborting")
done = true
default:
// NOP: continue
@@ -497,34 +466,34 @@ func startPassiveTapper(harWriter *HarWriter) {
if err != nil {
log.Fatal(err)
}
pprof.WriteHeapProfile(f)
f.Close()
_ = pprof.WriteHeapProfile(f)
_ = f.Close()
}
streamFactory.WaitGoRoutines()
assemblerMutex.Lock()
Debug("%s\n", assembler.Dump())
Debug("%s", assembler.Dump())
assemblerMutex.Unlock()
if !*nodefrag {
fmt.Printf("IPdefrag:\t\t%d\n", stats.ipdefrag)
log.Printf("IPdefrag:\t\t%d", stats.ipdefrag)
}
fmt.Printf("TCP stats:\n")
fmt.Printf(" missed bytes:\t\t%d\n", stats.missedBytes)
fmt.Printf(" total packets:\t\t%d\n", stats.pkt)
fmt.Printf(" rejected FSM:\t\t%d\n", stats.rejectFsm)
fmt.Printf(" rejected Options:\t%d\n", stats.rejectOpt)
fmt.Printf(" reassembled bytes:\t%d\n", stats.sz)
fmt.Printf(" total TCP bytes:\t%d\n", stats.totalsz)
fmt.Printf(" conn rejected FSM:\t%d\n", stats.rejectConnFsm)
fmt.Printf(" reassembled chunks:\t%d\n", stats.reassembled)
fmt.Printf(" out-of-order packets:\t%d\n", stats.outOfOrderPackets)
fmt.Printf(" out-of-order bytes:\t%d\n", stats.outOfOrderBytes)
fmt.Printf(" biggest-chunk packets:\t%d\n", stats.biggestChunkPackets)
fmt.Printf(" biggest-chunk bytes:\t%d\n", stats.biggestChunkBytes)
fmt.Printf(" overlap packets:\t%d\n", stats.overlapPackets)
fmt.Printf(" overlap bytes:\t\t%d\n", stats.overlapBytes)
fmt.Printf("Errors: %d\n", nErrors)
log.Printf("TCP stats:")
log.Printf(" missed bytes:\t\t%d", stats.missedBytes)
log.Printf(" total packets:\t\t%d", stats.pkt)
log.Printf(" rejected FSM:\t\t%d", stats.rejectFsm)
log.Printf(" rejected Options:\t%d", stats.rejectOpt)
log.Printf(" reassembled bytes:\t%d", stats.sz)
log.Printf(" total TCP bytes:\t%d", stats.totalsz)
log.Printf(" conn rejected FSM:\t%d", stats.rejectConnFsm)
log.Printf(" reassembled chunks:\t%d", stats.reassembled)
log.Printf(" out-of-order packets:\t%d", stats.outOfOrderPackets)
log.Printf(" out-of-order bytes:\t%d", stats.outOfOrderBytes)
log.Printf(" biggest-chunk packets:\t%d", stats.biggestChunkPackets)
log.Printf(" biggest-chunk bytes:\t%d", stats.biggestChunkBytes)
log.Printf(" overlap packets:\t%d", stats.overlapPackets)
log.Printf(" overlap bytes:\t\t%d", stats.overlapBytes)
log.Printf("Errors: %d", nErrors)
for e := range errorsMap {
fmt.Printf(" %s:\t\t%d\n", e, errorsMap[e])
log.Printf(" %s:\t\t%d", e, errorsMap[e])
}
}

31
tap/settings.go Normal file
View File

@@ -0,0 +1,31 @@
package tap
type globalSettings struct {
filterPorts []int
filterAuthorities []string
}
var gSettings = &globalSettings{
filterPorts: []int{},
filterAuthorities: []string{},
}
func SetFilterPorts(ports []int) {
gSettings.filterPorts = ports
}
func GetFilterPorts() []int {
ports := make([]int, len(gSettings.filterPorts))
copy(ports, gSettings.filterPorts)
return ports
}
func SetFilterAuthorities(ipAddresses []string) {
gSettings.filterAuthorities = ipAddresses
}
func GetFilterIPs() []string {
addresses := make([]string, len(gSettings.filterAuthorities))
copy(addresses, gSettings.filterAuthorities)
return addresses
}

View File

@@ -34,7 +34,7 @@ type tcpStream struct {
func (t *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassembly.TCPFlowDirection, nextSeq reassembly.Sequence, start *bool, ac reassembly.AssemblerContext) bool {
// FSM
if !t.tcpstate.CheckState(tcp, dir) {
//SilentError("FSM", "%s: Packet rejected by FSM (state:%s)\n", t.ident, t.tcpstate.String())
SilentError("FSM-rejection", "%s: Packet rejected by FSM (state:%s)", t.ident, t.tcpstate.String())
stats.rejectFsm++
if !t.fsmerr {
t.fsmerr = true
@@ -47,7 +47,7 @@ func (t *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassem
// Options
err := t.optchecker.Accept(tcp, ci, dir, nextSeq, start)
if err != nil {
//SilentError("OptionChecker", "%s: Packet rejected by OptionChecker: %s\n", t.ident, err)
SilentError("OptionChecker-rejection", "%s: Packet rejected by OptionChecker: %s", t.ident, err)
stats.rejectOpt++
if !*nooptcheck {
return false
@@ -58,10 +58,10 @@ func (t *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassem
if *checksum {
c, err := tcp.ComputeChecksum()
if err != nil {
SilentError("ChecksumCompute", "%s: Got error computing checksum: %s\n", t.ident, err)
SilentError("ChecksumCompute", "%s: Got error computing checksum: %s", t.ident, err)
accept = false
} else if c != 0x0 {
SilentError("Checksum", "%s: Invalid checksum: 0x%x\n", t.ident, c)
SilentError("Checksum", "%s: Invalid checksum: 0x%x", t.ident, c)
accept = false
}
}
@@ -95,7 +95,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
if sgStats.OverlapBytes != 0 && sgStats.OverlapPackets == 0 {
// In the original example this was handled with panic().
// I don't know what this error means or how to handle it properly.
SilentError("Invalid-Overlap", "bytes:%d, pkts:%d\n", sgStats.OverlapBytes, sgStats.OverlapPackets)
SilentError("Invalid-Overlap", "bytes:%d, pkts:%d", sgStats.OverlapBytes, sgStats.OverlapPackets)
}
stats.overlapBytes += sgStats.OverlapBytes
stats.overlapPackets += sgStats.OverlapPackets
@@ -106,7 +106,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
} else {
ident = fmt.Sprintf("%v %v(%s): ", t.net.Reverse(), t.transport.Reverse(), dir)
}
Debug("%s: SG reassembled packet with %d bytes (start:%v,end:%v,skip:%d,saved:%d,nb:%d,%d,overlap:%d,%d)\n", ident, length, start, end, skip, saved, sgStats.Packets, sgStats.Chunks, sgStats.OverlapBytes, sgStats.OverlapPackets)
Debug("%s: SG reassembled packet with %d bytes (start:%v,end:%v,skip:%d,saved:%d,nb:%d,%d,overlap:%d,%d)", ident, length, start, end, skip, saved, sgStats.Packets, sgStats.Chunks, sgStats.OverlapBytes, sgStats.OverlapPackets)
if skip == -1 && *allowmissinginit {
// this is allowed
} else if skip != 0 {
@@ -125,18 +125,18 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
}
dnsSize := binary.BigEndian.Uint16(data[:2])
missing := int(dnsSize) - len(data[2:])
Debug("dnsSize: %d, missing: %d\n", dnsSize, missing)
Debug("dnsSize: %d, missing: %d", dnsSize, missing)
if missing > 0 {
Info("Missing some bytes: %d\n", missing)
Info("Missing some bytes: %d", missing)
sg.KeepFrom(0)
return
}
p := gopacket.NewDecodingLayerParser(layers.LayerTypeDNS, dns)
err := p.DecodeLayers(data[2:], &decoded)
if err != nil {
SilentError("DNS-parser", "Failed to decode DNS: %v\n", err)
SilentError("DNS-parser", "Failed to decode DNS: %v", err)
} else {
Debug("DNS: %s\n", gopacket.LayerDump(dns))
Debug("DNS: %s", gopacket.LayerDump(dns))
}
if len(data) > 2+int(dnsSize) {
sg.KeepFrom(2 + int(dnsSize))
@@ -144,7 +144,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
} else if t.isHTTP {
if length > 0 {
if *hexdump {
Debug("Feeding http with:\n%s", hex.Dump(data))
Debug("Feeding http with:%s", hex.Dump(data))
}
// This is where we pass the reassembled information onwards
// This channel is read by an httpReader object
@@ -158,7 +158,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
}
func (t *tcpStream) ReassemblyComplete(ac reassembly.AssemblerContext) bool {
Debug("%s: Connection closed\n", t.ident)
Debug("%s: Connection closed", t.ident)
if t.isHTTP {
close(t.client.msgQueue)
close(t.server.msgQueue)

View File

@@ -15,22 +15,23 @@ import (
* Generates a new tcp stream for each new tcp connection. Closes the stream when the connection closes.
*/
type tcpStreamFactory struct {
wg sync.WaitGroup
doHTTP bool
harWriter *HarWriter
wg sync.WaitGroup
doHTTP bool
harWriter *HarWriter
outbountLinkWriter *OutboundLinkWriter
}
func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.TCP, ac reassembly.AssemblerContext) reassembly.Stream {
Debug("* NEW: %s %s\n", net, transport)
Debug("* NEW: %s %s", net, transport)
fsmOptions := reassembly.TCPSimpleFSMOptions{
SupportMissingEstablishment: *allowmissinginit,
}
Debug("Current App Ports: %v\n", appPorts)
Debug("Current App Ports: %v", gSettings.filterPorts)
dstIp := net.Dst().String()
dstPort := int(tcp.DstPort)
if factory.shouldNotifyOnOutboundLink(dstIp, dstPort) {
broadcastOutboundLink(net.Src().String(), dstIp, dstPort)
factory.outbountLinkWriter.WriteOutboundLink(net.Src().String(), dstIp, dstPort)
}
isHTTP := factory.shouldTap(dstIp, dstPort)
stream := &tcpStream{
@@ -85,14 +86,14 @@ func (factory *tcpStreamFactory) WaitGoRoutines() {
func (factory *tcpStreamFactory) shouldTap(dstIP string, dstPort int) bool {
if hostMode {
if inArrayString(HostAppAddresses, fmt.Sprintf("%s:%d", dstIP, dstPort)) == true {
if inArrayString(gSettings.filterAuthorities, fmt.Sprintf("%s:%d", dstIP, dstPort)) == true {
return true
} else if inArrayString(HostAppAddresses, dstIP) == true {
} else if inArrayString(gSettings.filterAuthorities, dstIP) == true {
return true
}
return false
} else {
isTappedPort := dstPort == 80 || (appPorts != nil && (inArrayInt(appPorts, dstPort)))
isTappedPort := dstPort == 80 || (gSettings.filterPorts != nil && (inArrayInt(gSettings.filterPorts, dstPort)))
if !isTappedPort {
return false
}

View File

@@ -1,5 +1,5 @@
import './style/StatusBar.sass';
import React from "react";
import React, {useState} from "react";
export interface TappingStatusPod {
name: string;
@@ -15,14 +15,31 @@ export interface Props {
}
const pluralize = (noun: string, amount: number) => {
return `${noun}${amount != 1 ? 's' : ''}`
return `${noun}${amount !== 1 ? 's' : ''}`
}
export const StatusBar: React.FC<Props> = ({tappingStatus}) => {
const [expandedBar, setExpandedBar] = useState(false);
const uniqueNamespaces = Array.from(new Set(tappingStatus.pods.map(pod => pod.namespace)));
const amountOfPods = tappingStatus.pods.length;
return <div className='StatusBar'>
<span>{`Tapping ${amountOfPods} ${pluralize('pod', amountOfPods)} in ${pluralize('namespace', uniqueNamespaces.length)} ${uniqueNamespaces.join(", ")}`}</span>
return <div className={'statusBar' + (expandedBar ? ' expandedStatusBar' : "")} onMouseOver={() => setExpandedBar(true)} onMouseLeave={() => setExpandedBar(false)}>
<div className="podsCount">{`Tapping ${amountOfPods} ${pluralize('pod', amountOfPods)} in ${pluralize('namespace', uniqueNamespaces.length)} ${uniqueNamespaces.join(", ")}`}</div>
{expandedBar && <div style={{marginTop: 20}}>
<table>
<tr>
<th>Pod name</th>
<th>Namespace</th>
</tr>
<tbody>
{tappingStatus.pods.map(pod => <tr>
<td>{pod.name}</td>
<td>{pod.namespace}</td>
</tr>)}
</tbody>
</table>
</div>}
</div>;
}

View File

@@ -1,20 +1,35 @@
@import 'variables.module.scss'
.StatusBar
.statusBar
position: absolute
transform: translate(-50%, -3px)
left: 50%
z-index: 9999
min-width: 200px
height: 32px
background: $blue-color
color: $light-blue-color
color: rgba(255,255,255,0.75)
border-bottom-left-radius: 8px
border-bottom-right-radius: 8px
top: 0
display: flex
align-items: center
padding: 2px 10px
user-select: none
font-size: 14px
opacity: 0.8
transition: max-height 2s ease-out
width: auto
max-height: 32px
overflow: hidden
.podsCount
display: flex
justify-content: center
padding: 8px
font-weight: 600
th
text-align: left
td
padding-right: 15px
padding-top: 5px
.expandedStatusBar
max-height: 100vh
padding-bottom: 15px