diff --git a/.github/workflows/publish-cli.yml b/.github/workflows/publish-cli.yml index 3790568f2..b9e814931 100644 --- a/.github/workflows/publish-cli.yml +++ b/.github/workflows/publish-cli.yml @@ -2,8 +2,8 @@ name: public-cli on: push: branches: - - 'develop' - - 'main' + - develop + - main jobs: docker: runs-on: ubuntu-latest @@ -15,5 +15,32 @@ jobs: with: service_account_key: ${{ secrets.GCR_JSON_KEY }} export_default_credentials: true + - uses: haya14busa/action-cond@v1 + id: condval + with: + cond: ${{ github.ref == 'refs/heads/main' }} + if_true: "minor" + if_false: "patch" + - name: Auto Increment Semver Action + uses: MCKanpolat/auto-semver-action@1.0.5 + id: versioning + with: + releaseType: ${{ steps.condval.outputs.value }} + github_token: ${{ secrets.GITHUB_TOKEN }} + - name: Get base image name + shell: bash + run: | + echo "##[set-output name=build_timestamp;]$(echo $(date +%s))" + echo "##[set-output name=branch;]$(echo ${GITHUB_REF#refs/heads/})" + id: version_parameters - name: Build and Push CLI - run: make push-cli + run: make push-cli SEM_VER='${{ steps.versioning.outputs.version }}' BUILD_TIMESTAMP='${{ steps.version_parameters.outputs.build_timestamp }}' + - name: publish + uses: ncipollo/release-action@v1 + with: + token: ${{ secrets.GITHUB_TOKEN }} + artifacts: "cli/bin/*" + commit: ${{ steps.version_parameters.outputs.branch }} + tag: ${{ steps.versioning.outputs.version }} + prerelease: ${{ github.ref != 'refs/heads/main' }} + bodyFile: 'cli/bin/README.md' diff --git a/Dockerfile b/Dockerfile index e85745689..157588ec9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -18,12 +18,14 @@ WORKDIR /app/api-build COPY api/go.mod api/go.sum ./ COPY shared/go.mod shared/go.mod ../shared/ +COPY tap/go.mod tap/go.mod ../tap/ RUN go mod download # cheap trick to make the build faster (As long as go.mod wasn't changes) RUN go list -f '{{.Path}}@{{.Version}}' -m all | sed 1d | grep -e 'go-cache' -e 'sqlite' | xargs go get # Copy and build api code COPY shared ../shared +COPY tap ../tap COPY api . RUN go build -ldflags="-s -w" -o mizuagent . diff --git a/README.md b/README.md index efbcc082c..0205f5726 100644 --- a/README.md +++ b/README.md @@ -3,16 +3,76 @@ standalone web app traffic viewer for Kubernetes ## Download -Download `mizu` for your platform as +Download `mizu` for your platform and operating system -* for MacOS - `curl -o mizu https://static.up9.com/mizu/mizu-darwin-amd64 && chmod 755 mizu` -* for Linux - `curl -o mizu https://static.up9.com/mizu/mizu-linux-amd64 && chmod 755 mizu` +### Latest stable release -## Run +* for MacOS - Intel +``` +curl -Lo mizu \ +https://github.com/up9inc/mizu/releases/latest/download/mizu_darwin_amd64 \ +&& chmod 755 mizu +``` + +* for MacOS - Apple Silicon +``` +curl -Lo mizu \ +https://github.com/up9inc/mizu/releases/latest/download/mizu_darwin_arm64 \ +&& chmod 755 mizu +``` + +* for Linux - Intel 64bit +``` +curl -Lo mizu \ +https://github.com/up9inc/mizu/releases/latest/download/mizu_linux_amd64 \ +&& chmod 755 mizu +``` +SHA256 checksums are available on the [Releases](https://github.com/up9inc/mizu/releases) page. + +### Development (unstable) build +Pick one from the [Releases](https://github.com/up9inc/mizu/releases) page. + +## How to run 1. Find pod you'd like to tap to in your Kubernetes cluster -2. Run `mizu --pod podname` +2. Run `mizu PODNAME` or `mizu REGEX` 3. Open browser on `http://localhost:8899` as instructed .. 4. Watch the WebAPI traffic flowing .. +5. Type ^C to stop + +## Examples + +Run `mizu help` for usage options + + +To tap specific pod - +``` + $ kubectl get pods + NAME READY STATUS RESTARTS AGE + front-end-649fc5fd6-kqbtn 2/2 Running 0 7m + .. + + $ mizu tap front-end-649fc5fd6-kqbtn + +front-end-649fc5fd6-kqbtn + Web interface is now available at http://localhost:8899 + ^C +``` + +To tap multiple pods using regex - +``` + $ kubectl get pods + NAME READY STATUS RESTARTS AGE + carts-66c77f5fbb-fq65r 2/2 Running 0 20m + catalogue-5f4cb7cf5-7zrmn 2/2 Running 0 20m + front-end-649fc5fd6-kqbtn 2/2 Running 0 20m + .. + + $ mizu tap "^ca.*" + +carts-66c77f5fbb-fq65r + +catalogue-5f4cb7cf5-7zrmn + Web interface is now available at http://localhost:8899 + ^C +``` + diff --git a/api/go.mod b/api/go.mod index 4f4c055c0..f75ea5920 100644 --- a/api/go.mod +++ b/api/go.mod @@ -4,6 +4,7 @@ go 1.16 require ( github.com/antoniodipinto/ikisocket v0.0.0-20210417133349-f1502512d69a + github.com/beevik/etree v1.1.0 github.com/djherbis/atime v1.0.0 github.com/fasthttp/websocket v1.4.3-beta.1 // indirect github.com/go-playground/locales v0.13.0 @@ -17,6 +18,7 @@ require ( github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231 github.com/patrickmn/go-cache v2.1.0+incompatible github.com/up9inc/mizu/shared v0.0.0 + github.com/up9inc/mizu/tap v0.0.0 go.mongodb.org/mongo-driver v1.5.1 golang.org/x/net v0.0.0-20210421230115-4e50805a0758 gorm.io/driver/sqlite v1.1.4 @@ -27,3 +29,4 @@ require ( ) replace github.com/up9inc/mizu/shared v0.0.0 => ../shared +replace github.com/up9inc/mizu/tap v0.0.0 => ../tap diff --git a/api/go.sum b/api/go.sum index bc655181d..efa2d4285 100644 --- a/api/go.sum +++ b/api/go.sum @@ -48,6 +48,8 @@ github.com/antoniodipinto/ikisocket v0.0.0-20210417133349-f1502512d69a h1:76llBl github.com/antoniodipinto/ikisocket v0.0.0-20210417133349-f1502512d69a/go.mod h1:QvDfsDQDmGxUsvEeWabVZ5pp2FMXpOkwQV0L6SE6cp0= github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= github.com/aws/aws-sdk-go v1.34.28/go.mod h1:H7NKnBqNVzoTJpGfLrQkkD+ytBA93eiDYi/+8rV9s48= +github.com/beevik/etree v1.1.0 h1:T0xke/WvNtMoCqgzPhkX2r4rjY3GDZFi+FjpRZY2Jbs= +github.com/beevik/etree v1.1.0/go.mod h1:r8Aw8JqVegEf0w2fDnATrX9VpkMcyFeM0FhwO62wh+A= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= diff --git a/api/main.go b/api/main.go index 220b53a69..b7bbaa14d 100644 --- a/api/main.go +++ b/api/main.go @@ -7,11 +7,12 @@ import ( "github.com/gofiber/fiber/v2" "github.com/gorilla/websocket" "github.com/up9inc/mizu/shared" + "github.com/up9inc/mizu/tap" "mizuserver/pkg/api" "mizuserver/pkg/middleware" "mizuserver/pkg/models" "mizuserver/pkg/routes" - "mizuserver/pkg/tap" + "mizuserver/pkg/sensitiveDataFiltering" "mizuserver/pkg/utils" "os" "os/signal" @@ -22,19 +23,24 @@ var aggregator = flag.Bool("aggregator", false, "Run in aggregator mode with API var standalone = flag.Bool("standalone", false, "Run in standalone tapper and API mode") var aggregatorAddress = flag.String("aggregator-address", "", "Address of mizu collector for tapping") -const nodeNameEnvVar = "NODE_NAME" -const tappedAddressesPerNodeDictEnvVar = "TAPPED_ADDRESSES_PER_HOST" func main() { flag.Parse() + hostMode := os.Getenv(shared.HostModeEnvVar) == "1" + tapOpts := &tap.TapOpts{HostMode: hostMode} if !*shouldTap && !*aggregator && !*standalone{ panic("One of the flags --tap, --api or --standalone must be provided") } if *standalone { - harOutputChannel := tap.StartPassiveTapper() - go api.StartReadingEntries(harOutputChannel, tap.HarOutputDir) + harOutputChannel, outboundLinkOutputChannel := tap.StartPassiveTapper(tapOpts) + filteredHarChannel := make(chan *tap.OutputChannelItem) + + go filterHarHeaders(harOutputChannel, filteredHarChannel, getTrafficFilteringOptions()) + go api.StartReadingEntries(filteredHarChannel, nil) + go api.StartReadingOutbound(outboundLinkOutputChannel) + hostApi(nil) } else if *shouldTap { if *aggregatorAddress == "" { @@ -43,19 +49,26 @@ func main() { tapTargets := getTapTargets() if tapTargets != nil { - tap.HostAppAddresses = tapTargets - fmt.Println("Filtering for the following addresses:", tap.HostAppAddresses) + tap.SetFilterAuthorities(tapTargets) + fmt.Println("Filtering for the following authorities:", tap.GetFilterIPs()) } - harOutputChannel := tap.StartPassiveTapper() + harOutputChannel, outboundLinkOutputChannel := tap.StartPassiveTapper(tapOpts) + socketConnection, err := shared.ConnectToSocketServer(*aggregatorAddress, shared.DEFAULT_SOCKET_RETRIES, shared.DEFAULT_SOCKET_RETRY_SLEEP_TIME, false) if err != nil { panic(fmt.Sprintf("Error connecting to socket server at %s %v", *aggregatorAddress, err)) } + go pipeChannelToSocket(socketConnection, harOutputChannel) + go api.StartReadingOutbound(outboundLinkOutputChannel) } else if *aggregator { socketHarOutChannel := make(chan *tap.OutputChannelItem, 1000) - go api.StartReadingEntries(socketHarOutChannel, nil) + filteredHarChannel := make(chan *tap.OutputChannelItem) + + go api.StartReadingEntries(filteredHarChannel, nil) + go filterHarHeaders(socketHarOutChannel, filteredHarChannel, getTrafficFilteringOptions()) + hostApi(socketHarOutChannel) } @@ -89,15 +102,36 @@ func hostApi(socketHarOutputChannel chan<- *tap.OutputChannelItem) { func getTapTargets() []string { - nodeName := os.Getenv(nodeNameEnvVar) + nodeName := os.Getenv(shared.NodeNameEnvVar) var tappedAddressesPerNodeDict map[string][]string - err := json.Unmarshal([]byte(os.Getenv(tappedAddressesPerNodeDictEnvVar)), &tappedAddressesPerNodeDict) + err := json.Unmarshal([]byte(os.Getenv(shared.TappedAddressesPerNodeDictEnvVar)), &tappedAddressesPerNodeDict) if err != nil { - panic(fmt.Sprintf("env var value of %s is invalid! must be map[string][]string %v", tappedAddressesPerNodeDict, err)) + panic(fmt.Sprintf("env var %s's value of %s is invalid! must be map[string][]string %v", shared.TappedAddressesPerNodeDictEnvVar, tappedAddressesPerNodeDict, err)) } return tappedAddressesPerNodeDict[nodeName] } +func getTrafficFilteringOptions() *shared.TrafficFilteringOptions { + filteringOptionsJson := os.Getenv(shared.MizuFilteringOptionsEnvVar) + if filteringOptionsJson == "" { + return nil + } + var filteringOptions shared.TrafficFilteringOptions + err := json.Unmarshal([]byte(filteringOptionsJson), &filteringOptions) + if err != nil { + panic(fmt.Sprintf("env var %s's value of %s is invalid! json must match the shared.TrafficFilteringOptions struct %v", shared.MizuFilteringOptionsEnvVar, filteringOptionsJson, err)) + } + + return &filteringOptions +} + +func filterHarHeaders(inChannel <- chan *tap.OutputChannelItem, outChannel chan *tap.OutputChannelItem, filterOptions *shared.TrafficFilteringOptions) { + for message := range inChannel { + sensitiveDataFiltering.FilterSensitiveInfoFromHarRequest(message, filterOptions) + outChannel <- message + } +} + func pipeChannelToSocket(connection *websocket.Conn, messageDataChannel <-chan *tap.OutputChannelItem) { if connection == nil { panic("Websocket connection is nil") diff --git a/api/pkg/api/main.go b/api/pkg/api/main.go index 1aea81795..0907b0146 100644 --- a/api/pkg/api/main.go +++ b/api/pkg/api/main.go @@ -5,18 +5,20 @@ import ( "context" "encoding/json" "fmt" - "github.com/google/martian/har" - "go.mongodb.org/mongo-driver/bson/primitive" "mizuserver/pkg/database" "mizuserver/pkg/models" "mizuserver/pkg/resolver" - "mizuserver/pkg/tap" "mizuserver/pkg/utils" "net/url" "os" "path" "sort" + "strings" "time" + + "github.com/google/martian/har" + "github.com/up9inc/mizu/tap" + "go.mongodb.org/mongo-driver/bson/primitive" ) var k8sResolver *resolver.Resolver @@ -57,14 +59,21 @@ func startReadingFiles(workingDir string) { for true { dir, _ := os.Open(workingDir) dirFiles, _ := dir.Readdir(-1) - sort.Sort(utils.ByModTime(dirFiles)) - if len(dirFiles) == 0 { + var harFiles []os.FileInfo + for _, fileInfo := range dirFiles { + if strings.HasSuffix(fileInfo.Name(), ".har") { + harFiles = append(harFiles, fileInfo) + } + } + sort.Sort(utils.ByModTime(harFiles)) + + if len(harFiles) == 0 { fmt.Printf("Waiting for new files\n") time.Sleep(3 * time.Second) continue } - fileInfo := dirFiles[0] + fileInfo := harFiles[0] inputFilePath := path.Join(workingDir, fileInfo.Name()) file, err := os.Open(inputFilePath) utils.CheckErr(err) @@ -88,17 +97,25 @@ func startReadingChannel(outputItems <-chan *tap.OutputChannelItem) { } for item := range outputItems { - saveHarToDb(item.HarEntry, item.RequestSenderIp) + saveHarToDb(item.HarEntry, item.ConnectionInfo.ClientIP) } } +func StartReadingOutbound(outboundLinkChannel <-chan *tap.OutboundLink) { + // tcpStreamFactory will block on write to channel. Empty channel to unblock. + // TODO: Make write to channel optional. + for range outboundLinkChannel { + } +} + + func saveHarToDb(entry *har.Entry, sender string) { entryBytes, _ := json.Marshal(entry) serviceName, urlPath, serviceHostName := getServiceNameFromUrl(entry.Request.URL) entryId := primitive.NewObjectID().Hex() var ( - resolvedSource *string - resolvedDestination *string + resolvedSource string + resolvedDestination string ) if k8sResolver != nil { resolvedSource = k8sResolver.Resolve(sender) diff --git a/api/pkg/api/socket_server_handlers.go b/api/pkg/api/socket_server_handlers.go index 12bf42adf..8164df004 100644 --- a/api/pkg/api/socket_server_handlers.go +++ b/api/pkg/api/socket_server_handlers.go @@ -5,10 +5,10 @@ import ( "fmt" "github.com/antoniodipinto/ikisocket" "github.com/up9inc/mizu/shared" + "github.com/up9inc/mizu/tap" "mizuserver/pkg/controllers" "mizuserver/pkg/models" "mizuserver/pkg/routes" - "mizuserver/pkg/tap" ) var browserClientSocketUUIDs = make([]string, 0) diff --git a/api/pkg/controllers/entries_controller.go b/api/pkg/controllers/entries_controller.go index e9702fe3b..282271b65 100644 --- a/api/pkg/controllers/entries_controller.go +++ b/api/pkg/controllers/entries_controller.go @@ -9,6 +9,7 @@ import ( "mizuserver/pkg/models" "mizuserver/pkg/utils" "mizuserver/pkg/validation" + "time" ) const ( @@ -64,7 +65,7 @@ func GetEntries(c *fiber.Ctx) error { return c.Status(fiber.StatusOK).JSON(baseEntries) } -func GetHAR(c *fiber.Ctx) error { +func GetHARs(c *fiber.Ctx) error { entriesFilter := &models.HarFetchRequestBody{} order := OrderDesc if err := c.QueryParser(entriesFilter); err != nil { @@ -75,11 +76,23 @@ func GetHAR(c *fiber.Ctx) error { return c.Status(fiber.StatusBadRequest).JSON(err) } + var timestampFrom, timestampTo int64 + + if entriesFilter.From < 0 { + timestampFrom = 0 + } else { + timestampFrom = entriesFilter.From + } + if entriesFilter.To <= 0 { + timestampTo = time.Now().UnixNano() / int64(time.Millisecond) + } else { + timestampTo = entriesFilter.To + } + var entries []models.MizuEntry database.GetEntriesTable(). + Where(fmt.Sprintf("timestamp BETWEEN %v AND %v", timestampFrom, timestampTo)). Order(fmt.Sprintf("timestamp %s", order)). - // Where(fmt.Sprintf("timestamp %s %v", operatorSymbol, entriesFilter.Timestamp)). - Limit(1000). Find(&entries) if len(entries) > 0 { @@ -87,26 +100,28 @@ func GetHAR(c *fiber.Ctx) error { utils.ReverseSlice(entries) } - harsObject := map[string]*har.HAR{} + harsObject := map[string]*models.ExtendedHAR{} for _, entryData := range entries { - harEntryObject := []byte(entryData.Entry) - var harEntry har.Entry - _ = json.Unmarshal(harEntryObject, &harEntry) + _ = json.Unmarshal([]byte(entryData.Entry), &harEntry) - sourceOfEntry := *entryData.ResolvedSource - if harOfSource, ok := harsObject[sourceOfEntry]; ok { + sourceOfEntry := entryData.ResolvedSource + fileName := fmt.Sprintf("%s.har", sourceOfEntry) + if harOfSource, ok := harsObject[fileName]; ok { harOfSource.Log.Entries = append(harOfSource.Log.Entries, &harEntry) } else { var entriesHar []*har.Entry entriesHar = append(entriesHar, &harEntry) - harsObject[sourceOfEntry] = &har.HAR{ - Log: &har.Log{ + harsObject[fileName] = &models.ExtendedHAR{ + Log: &models.ExtendedLog{ Version: "1.2", - Creator: &har.Creator{ - Name: "mizu", - Version: "0.0.1", + Creator: &models.ExtendedCreator{ + Creator: &har.Creator{ + Name: "mizu", + Version: "0.0.2", + }, + Source: sourceOfEntry, }, Entries: entriesHar, }, @@ -123,6 +138,50 @@ func GetHAR(c *fiber.Ctx) error { return c.Status(fiber.StatusOK).SendStream(buffer) } +func GetFullEntries(c *fiber.Ctx) error { + entriesFilter := &models.HarFetchRequestBody{} + order := OrderDesc + if err := c.QueryParser(entriesFilter); err != nil { + return c.Status(fiber.StatusBadRequest).JSON(err) + } + err := validation.Validate(entriesFilter) + if err != nil { + return c.Status(fiber.StatusBadRequest).JSON(err) + } + + var timestampFrom, timestampTo int64 + + if entriesFilter.From < 0 { + timestampFrom = 0 + } else { + timestampFrom = entriesFilter.From + } + if entriesFilter.To <= 0 { + timestampTo = time.Now().UnixNano() / int64(time.Millisecond) + } else { + timestampTo = entriesFilter.To + } + + var entries []models.MizuEntry + database.GetEntriesTable(). + Where(fmt.Sprintf("timestamp BETWEEN %v AND %v", timestampFrom, timestampTo)). + Order(fmt.Sprintf("timestamp %s", order)). + Find(&entries) + + if len(entries) > 0 { + // the entries always order from oldest to newest so we should revers + utils.ReverseSlice(entries) + } + + entriesArray := make([]har.Entry, 0) + for _, entryData := range entries { + var harEntry har.Entry + _ = json.Unmarshal([]byte(entryData.Entry), &harEntry) + entriesArray = append(entriesArray, harEntry) + } + return c.Status(fiber.StatusOK).JSON(entriesArray) +} + func GetEntry(c *fiber.Ctx) error { var entryData models.EntryData database.GetEntriesTable(). @@ -134,8 +193,8 @@ func GetEntry(c *fiber.Ctx) error { unmarshallErr := json.Unmarshal([]byte(entryData.Entry), &fullEntry) utils.CheckErr(unmarshallErr) - if entryData.ResolvedDestination != nil { - fullEntry.Request.URL = utils.SetHostname(fullEntry.Request.URL, *entryData.ResolvedDestination) + if entryData.ResolvedDestination != "" { + fullEntry.Request.URL = utils.SetHostname(fullEntry.Request.URL, entryData.ResolvedDestination) } return c.Status(fiber.StatusOK).JSON(fullEntry) diff --git a/api/pkg/models/models.go b/api/pkg/models/models.go index feb1a4b90..5cde7622c 100644 --- a/api/pkg/models/models.go +++ b/api/pkg/models/models.go @@ -2,8 +2,9 @@ package models import ( "encoding/json" + "github.com/google/martian/har" "github.com/up9inc/mizu/shared" - "mizuserver/pkg/tap" + "github.com/up9inc/mizu/tap" "time" ) @@ -11,17 +12,17 @@ type MizuEntry struct { ID uint `gorm:"primarykey"` CreatedAt time.Time UpdatedAt time.Time - Entry string `json:"entry,omitempty" gorm:"column:entry"` - EntryId string `json:"entryId" gorm:"column:entryId"` - Url string `json:"url" gorm:"column:url"` - Method string `json:"method" gorm:"column:method"` - Status int `json:"status" gorm:"column:status"` - RequestSenderIp string `json:"requestSenderIp" gorm:"column:requestSenderIp"` - Service string `json:"service" gorm:"column:service"` - Timestamp int64 `json:"timestamp" gorm:"column:timestamp"` - Path string `json:"path" gorm:"column:path"` - ResolvedSource *string `json:"resolvedSource,omitempty" gorm:"column:resolvedSource"` - ResolvedDestination *string `json:"resolvedDestination,omitempty" gorm:"column:resolvedDestination"` + Entry string `json:"entry,omitempty" gorm:"column:entry"` + EntryId string `json:"entryId" gorm:"column:entryId"` + Url string `json:"url" gorm:"column:url"` + Method string `json:"method" gorm:"column:method"` + Status int `json:"status" gorm:"column:status"` + RequestSenderIp string `json:"requestSenderIp" gorm:"column:requestSenderIp"` + Service string `json:"service" gorm:"column:service"` + Timestamp int64 `json:"timestamp" gorm:"column:timestamp"` + Path string `json:"path" gorm:"column:path"` + ResolvedSource string `json:"resolvedSource,omitempty" gorm:"column:resolvedSource"` + ResolvedDestination string `json:"resolvedDestination,omitempty" gorm:"column:resolvedDestination"` } type BaseEntryDetails struct { @@ -36,8 +37,8 @@ type BaseEntryDetails struct { } type EntryData struct { - Entry string `json:"entry,omitempty"` - ResolvedDestination *string `json:"resolvedDestination,omitempty" gorm:"column:resolvedDestination"` + Entry string `json:"entry,omitempty"` + ResolvedDestination string `json:"resolvedDestination,omitempty" gorm:"column:resolvedDestination"` } type EntriesFilter struct { @@ -47,7 +48,8 @@ type EntriesFilter struct { } type HarFetchRequestBody struct { - Limit int `query:"limit" validate:"max=5000"` + From int64 `query:"from"` + To int64 `query:"to"` } type WebSocketEntryMessage struct { @@ -55,7 +57,6 @@ type WebSocketEntryMessage struct { Data *BaseEntryDetails `json:"data,omitempty"` } - type WebSocketTappedEntryMessage struct { *shared.WebSocketMessageMetadata Data *tap.OutputChannelItem @@ -80,3 +81,23 @@ func CreateWebsocketTappedEntryMessage(base *tap.OutputChannelItem) ([]byte, err } return json.Marshal(message) } + +// ExtendedHAR is the top level object of a HAR log. +type ExtendedHAR struct { + Log *ExtendedLog `json:"log"` +} + +// ExtendedLog is the HAR HTTP request and response log. +type ExtendedLog struct { + // Version number of the HAR format. + Version string `json:"version"` + // Creator holds information about the log creator application. + Creator *ExtendedCreator `json:"creator"` + // Entries is a list containing requests and responses. + Entries []*har.Entry `json:"entries"` +} + +type ExtendedCreator struct { + *har.Creator + Source string `json:"_source"` +} diff --git a/api/pkg/resolver/resolver.go b/api/pkg/resolver/resolver.go index e45ce490a..14076a108 100644 --- a/api/pkg/resolver/resolver.go +++ b/api/pkg/resolver/resolver.go @@ -33,12 +33,12 @@ func (resolver *Resolver) Start(ctx context.Context) { } } -func (resolver *Resolver) Resolve(name string) *string { +func (resolver *Resolver) Resolve(name string) string { resolvedName, isFound := resolver.nameMap[name] if !isFound { - return nil + return "" } - return &resolvedName + return resolvedName } func (resolver *Resolver) watchPods(ctx context.Context) error { diff --git a/api/pkg/routes/public_routes.go b/api/pkg/routes/public_routes.go index c15203189..df589a62c 100644 --- a/api/pkg/routes/public_routes.go +++ b/api/pkg/routes/public_routes.go @@ -11,8 +11,11 @@ func EntriesRoutes(fiberApp *fiber.App) { routeGroup.Get("/entries", controllers.GetEntries) // get entries (base/thin entries) routeGroup.Get("/entries/:entryId", controllers.GetEntry) // get single (full) entry + routeGroup.Get("/exportEntries", controllers.GetFullEntries) + + + routeGroup.Get("/har", controllers.GetHARs) - routeGroup.Get("/har", controllers.GetHAR) routeGroup.Get("/resetDB", controllers.DeleteAllEntries) // get single (full) entry routeGroup.Get("/generalStats", controllers.GetGeneralStats) // get general stats about entries in DB diff --git a/api/pkg/sensitiveDataFiltering/consts.go b/api/pkg/sensitiveDataFiltering/consts.go new file mode 100644 index 000000000..e5624de73 --- /dev/null +++ b/api/pkg/sensitiveDataFiltering/consts.go @@ -0,0 +1,10 @@ +package sensitiveDataFiltering + +const maskedFieldPlaceholderValue = "[REDACTED]" + +//these values MUST be all lower case and contain no `-` or `_` characters +var personallyIdentifiableDataFields = []string{"token", "authorization", "authentication", "cookie", "userid", "password", + "username", "user", "key", "passcode", "pass", "auth", "authtoken", "jwt", + "bearer", "clientid", "clientsecret", "redirecturi", "phonenumber", + "zip", "zipcode", "address", "country", "firstname", "lastname", + "middlename", "fname", "lname", "birthdate"} diff --git a/api/pkg/sensitiveDataFiltering/messageSensitiveDataCleaner.go b/api/pkg/sensitiveDataFiltering/messageSensitiveDataCleaner.go new file mode 100644 index 000000000..cc0e4d289 --- /dev/null +++ b/api/pkg/sensitiveDataFiltering/messageSensitiveDataCleaner.go @@ -0,0 +1,198 @@ +package sensitiveDataFiltering + +import ( + "encoding/json" + "encoding/xml" + "errors" + "fmt" + "github.com/up9inc/mizu/tap" + "net/url" + "strings" + + "github.com/beevik/etree" + "github.com/google/martian/har" + "github.com/up9inc/mizu/shared" +) + +func FilterSensitiveInfoFromHarRequest(harOutputItem *tap.OutputChannelItem, options *shared.TrafficFilteringOptions) { + harOutputItem.HarEntry.Request.Headers = filterHarHeaders(harOutputItem.HarEntry.Request.Headers) + harOutputItem.HarEntry.Response.Headers = filterHarHeaders(harOutputItem.HarEntry.Response.Headers) + + harOutputItem.HarEntry.Request.Cookies = make([]har.Cookie, 0, 0) + harOutputItem.HarEntry.Response.Cookies = make([]har.Cookie, 0, 0) + + harOutputItem.HarEntry.Request.URL = filterUrl(harOutputItem.HarEntry.Request.URL) + for i, queryString := range harOutputItem.HarEntry.Request.QueryString { + if isFieldNameSensitive(queryString.Name) { + harOutputItem.HarEntry.Request.QueryString[i].Value = maskedFieldPlaceholderValue + } + } + + if harOutputItem.HarEntry.Request.PostData != nil { + requestContentType := getContentTypeHeaderValue(harOutputItem.HarEntry.Request.Headers) + filteredRequestBody, err := filterHttpBody([]byte(harOutputItem.HarEntry.Request.PostData.Text), requestContentType, options) + if err == nil { + harOutputItem.HarEntry.Request.PostData.Text = string(filteredRequestBody) + } + } + if harOutputItem.HarEntry.Response.Content != nil { + responseContentType := getContentTypeHeaderValue(harOutputItem.HarEntry.Response.Headers) + filteredResponseBody, err := filterHttpBody(harOutputItem.HarEntry.Response.Content.Text, responseContentType, options) + if err == nil { + harOutputItem.HarEntry.Response.Content.Text = filteredResponseBody + } + } +} + +func filterHarHeaders(headers []har.Header) []har.Header { + newHeaders := make([]har.Header, 0) + for i, header := range headers { + if strings.ToLower(header.Name) == "cookie" { + continue + } else if isFieldNameSensitive(header.Name) { + newHeaders = append(newHeaders, har.Header{Name: header.Name, Value: maskedFieldPlaceholderValue}) + headers[i].Value = maskedFieldPlaceholderValue + } else { + newHeaders = append(newHeaders, header) + } + } + return newHeaders +} + +func getContentTypeHeaderValue(headers []har.Header) string { + for _, header := range headers { + if strings.ToLower(header.Name) == "content-type" { + return header.Value + } + } + return "" +} + +func isFieldNameSensitive(fieldName string) bool { + name := strings.ToLower(fieldName) + name = strings.ReplaceAll(name, "_", "") + name = strings.ReplaceAll(name, "-", "") + name = strings.ReplaceAll(name, " ", "") + + for _, sensitiveField := range personallyIdentifiableDataFields { + if strings.Contains(name, sensitiveField) { + return true + } + } + + return false +} + +func filterHttpBody(bytes []byte, contentType string, options *shared.TrafficFilteringOptions) ([]byte, error) { + mimeType := strings.Split(contentType, ";")[0] + switch strings.ToLower(mimeType) { + case "application/json": + return filterJsonBody(bytes) + case "text/html": + fallthrough + case "application/xhtml+xml": + fallthrough + case "text/xml": + fallthrough + case "application/xml": + return filterXmlEtree(bytes) + case "text/plain": + if options != nil && options.PlainTextMaskingRegexes != nil { + return filterPlainText(bytes, options), nil + } + } + return bytes, nil +} + +func filterPlainText(bytes []byte, options *shared.TrafficFilteringOptions) []byte { + for _, regex := range options.PlainTextMaskingRegexes { + bytes = regex.ReplaceAll(bytes, []byte(maskedFieldPlaceholderValue)) + } + return bytes +} + +func filterXmlEtree(bytes []byte) ([]byte, error) { + if !IsValidXML(bytes) { + return nil, errors.New("Invalid XML") + } + xmlDoc := etree.NewDocument() + err := xmlDoc.ReadFromBytes(bytes) + if err != nil { + return nil, err + } else { + filterXmlElement(xmlDoc.Root()) + } + return xmlDoc.WriteToBytes() +} + +func IsValidXML(data []byte) bool { + return xml.Unmarshal(data, new(interface{})) == nil +} + +func filterXmlElement(element *etree.Element) { + for i, attribute := range element.Attr { + if isFieldNameSensitive(attribute.Key) { + element.Attr[i].Value = maskedFieldPlaceholderValue + } + } + if element.ChildElements() == nil || len(element.ChildElements()) == 0 { + if isFieldNameSensitive(element.Tag) { + element.SetText(maskedFieldPlaceholderValue) + } + } else { + for _, element := range element.ChildElements() { + filterXmlElement(element) + } + } +} + +func filterJsonBody(bytes []byte) ([]byte, error) { + var bodyJsonMap map[string] interface{} + err := json.Unmarshal(bytes ,&bodyJsonMap) + if err != nil { + return nil, err + } + filterJsonMap(bodyJsonMap) + return json.Marshal(bodyJsonMap) +} + +func filterJsonMap(jsonMap map[string] interface{}) { + for key, value := range jsonMap { + if value == nil { + return + } + nestedMap, isNested := value.(map[string] interface{}) + if isNested { + filterJsonMap(nestedMap) + } else { + if isFieldNameSensitive(key) { + jsonMap[key] = maskedFieldPlaceholderValue + } + } + } +} + +// receives string representing url, returns string url without sensitive query param values (http://service/api?userId=bob&password=123&type=login -> http://service/api?userId=[REDACTED]&password=[REDACTED]&type=login) +func filterUrl(originalUrl string) string { + parsedUrl, err := url.Parse(originalUrl) + if err != nil { + return fmt.Sprintf("http://%s", maskedFieldPlaceholderValue) + } else { + if len(parsedUrl.RawQuery) > 0 { + newQueryArgs := make([]string, 0) + for urlQueryParamName, urlQueryParamValues := range parsedUrl.Query() { + newValues := urlQueryParamValues + if isFieldNameSensitive(urlQueryParamName) { + newValues = []string {maskedFieldPlaceholderValue} + } + for _, paramValue := range newValues { + newQueryArgs = append(newQueryArgs, fmt.Sprintf("%s=%s", urlQueryParamName, paramValue)) + } + } + + parsedUrl.RawQuery = strings.Join(newQueryArgs, "&") + } + + return parsedUrl.String() + } +} diff --git a/api/pkg/tap/http_matcher.go b/api/pkg/tap/http_matcher.go deleted file mode 100644 index c0a98fb1f..000000000 --- a/api/pkg/tap/http_matcher.go +++ /dev/null @@ -1,209 +0,0 @@ -package tap - -import ( - "fmt" - "net/http" - "strconv" - "strings" - "time" - - "github.com/orcaman/concurrent-map" -) - -type requestResponsePair struct { - Request httpMessage `json:"request"` - Response httpMessage `json:"response"` -} - -type envoyMessageWrapper struct { - HttpBufferedTrace requestResponsePair `json:"http_buffered_trace"` -} - -type headerKeyVal struct { - Key string `json:"key"` - Value string `json:"value"` -} - -type messageBody struct { - Truncated bool `json:"truncated"` - AsBytes string `json:"as_bytes"` -} - -type httpMessage struct { - IsRequest bool - Headers []headerKeyVal `json:"headers"` - HTTPVersion string `json:"httpVersion"` - Body messageBody `json:"body"` - captureTime time.Time - orig interface {} - requestSenderIp string -} - - -// Key is {client_addr}:{client_port}->{dest_addr}:{dest_port} -type requestResponseMatcher struct { - openMessagesMap cmap.ConcurrentMap - -} - -func createResponseRequestMatcher() requestResponseMatcher { - newMatcher := &requestResponseMatcher{openMessagesMap: cmap.New()} - return *newMatcher -} - -func (matcher *requestResponseMatcher) registerRequest(ident string, request *http.Request, captureTime time.Time, body string, isHTTP2 bool) *envoyMessageWrapper { - split := splitIdent(ident) - key := genKey(split) - - messageExtraHeaders := []headerKeyVal{ - {Key: "x-up9-source", Value: split[0]}, - {Key: "x-up9-destination", Value: split[1] + ":" + split[3]}, - } - - requestHTTPMessage := requestToMessage(request, captureTime, body, &messageExtraHeaders, isHTTP2, split[0]) - - if response, found := matcher.openMessagesMap.Pop(key); found { - // Type assertion always succeeds because all of the map's values are of httpMessage type - responseHTTPMessage := response.(*httpMessage) - if responseHTTPMessage.IsRequest { - SilentError("Request-Duplicate", "Got duplicate request with same identifier\n") - return nil - } - Debug("Matched open Response for %s\n", key) - return matcher.preparePair(&requestHTTPMessage, responseHTTPMessage) - } - - matcher.openMessagesMap.Set(key, &requestHTTPMessage) - Debug("Registered open Request for %s\n", key) - return nil -} - -func (matcher *requestResponseMatcher) registerResponse(ident string, response *http.Response, captureTime time.Time, body string, isHTTP2 bool) *envoyMessageWrapper { - split := splitIdent(ident) - key := genKey(split) - - responseHTTPMessage := responseToMessage(response, captureTime, body, isHTTP2) - - if request, found := matcher.openMessagesMap.Pop(key); found { - // Type assertion always succeeds because all of the map's values are of httpMessage type - requestHTTPMessage := request.(*httpMessage) - if !requestHTTPMessage.IsRequest { - SilentError("Response-Duplicate", "Got duplicate response with same identifier\n") - return nil - } - Debug("Matched open Request for %s\n", key) - return matcher.preparePair(requestHTTPMessage, &responseHTTPMessage) - } - - matcher.openMessagesMap.Set(key, &responseHTTPMessage) - Debug("Registered open Response for %s\n", key) - return nil -} - -func (matcher *requestResponseMatcher) preparePair(requestHTTPMessage *httpMessage, responseHTTPMessage *httpMessage) *envoyMessageWrapper { - matcher.addDuration(requestHTTPMessage, responseHTTPMessage) - - return &envoyMessageWrapper{ - HttpBufferedTrace: requestResponsePair{ - Request: *requestHTTPMessage, - Response: *responseHTTPMessage, - }, - } -} - -func requestToMessage(request *http.Request, captureTime time.Time, body string, messageExtraHeaders *[]headerKeyVal, isHTTP2 bool, requestSenderIp string) httpMessage { - messageHeaders := make([]headerKeyVal, 0) - - for key, value := range request.Header { - messageHeaders = append(messageHeaders, headerKeyVal{Key: key, Value: value[0]}) - } - - if !isHTTP2 { - messageHeaders = append(messageHeaders, headerKeyVal{Key: ":method", Value: request.Method}) - messageHeaders = append(messageHeaders, headerKeyVal{Key: ":path", Value: request.RequestURI}) - messageHeaders = append(messageHeaders, headerKeyVal{Key: ":authority", Value: request.Host}) - messageHeaders = append(messageHeaders, headerKeyVal{Key: ":scheme", Value: "http"}) - } - - messageHeaders = append(messageHeaders, headerKeyVal{Key: "x-request-start", Value: fmt.Sprintf("%.3f", float64(captureTime.UnixNano()) / float64(1000000000))}) - - messageHeaders = append(messageHeaders, *messageExtraHeaders...) - - httpVersion := request.Proto - - requestBody := messageBody{Truncated: false, AsBytes: body} - - return httpMessage{ - IsRequest: true, - Headers: messageHeaders, - HTTPVersion: httpVersion, - Body: requestBody, - captureTime: captureTime, - orig: request, - requestSenderIp: requestSenderIp, - } -} - -func responseToMessage(response *http.Response, captureTime time.Time, body string, isHTTP2 bool) httpMessage { - messageHeaders := make([]headerKeyVal, 0) - - for key, value := range response.Header { - messageHeaders = append(messageHeaders, headerKeyVal{Key: key, Value: value[0]}) - } - - if !isHTTP2 { - messageHeaders = append(messageHeaders, headerKeyVal{Key: ":status", Value: strconv.Itoa(response.StatusCode)}) - } - - httpVersion := response.Proto - - requestBody := messageBody{Truncated: false, AsBytes: body} - - return httpMessage{ - IsRequest: false, - Headers: messageHeaders, - HTTPVersion: httpVersion, - Body: requestBody, - captureTime: captureTime, - orig: response, - } -} - -func (matcher *requestResponseMatcher) addDuration(requestHTTPMessage *httpMessage, responseHTTPMessage *httpMessage) { - durationMs := float64(responseHTTPMessage.captureTime.UnixNano() / 1000000) - float64(requestHTTPMessage.captureTime.UnixNano() / 1000000) - if durationMs < 1 { - durationMs = 1 - } - - responseHTTPMessage.Headers = append(responseHTTPMessage.Headers, headerKeyVal{Key: "x-up9-duration-ms", Value: fmt.Sprintf("%.0f", durationMs)}) -} - -func splitIdent(ident string) []string { - ident = strings.Replace(ident, "->", " ", -1) - return strings.Split(ident, " ") -} - -func genKey(split []string) string { - key := fmt.Sprintf("%s:%s->%s:%s,%s", split[0], split[2], split[1], split[3], split[4]) - return key -} - -func (matcher *requestResponseMatcher) deleteOlderThan(t time.Time) int { - keysToPop := make([]string, 0) - for item := range matcher.openMessagesMap.IterBuffered() { - // Map only contains values of type httpMessage - message, _ := item.Val.(*httpMessage) - - if message.captureTime.Before(t) { - keysToPop = append(keysToPop, item.Key) - } - } - - numDeleted := len(keysToPop) - - for _, key := range keysToPop { - _, _ = matcher.openMessagesMap.Pop(key) - } - - return numDeleted -} diff --git a/api/pkg/tap/tap_output.go b/api/pkg/tap/tap_output.go deleted file mode 100644 index 561c445c0..000000000 --- a/api/pkg/tap/tap_output.go +++ /dev/null @@ -1,239 +0,0 @@ -package tap - -import ( - "bytes" - "encoding/json" - "fmt" - "log" - "net/http" - "time" - - "github.com/gorilla/websocket" - "github.com/patrickmn/go-cache" -) - - -const ( - // Time allowed to write a message to the peer. - writeWait = 10 * time.Second - - // Time allowed to read the next pong message from the peer. - pongWait = 60 * time.Second - - // Send pings to peer with this period. Must be less than pongWait. - pingPeriod = (pongWait * 9) / 10 - - // Maximum message size allowed from peer. - maxMessageSize = 512 -) - -var ( - newline = []byte{'\n'} - space = []byte{' '} - hub *Hub - outboundSocketNotifyExpiringCache = cache.New(outboundThrottleCacheExpiryPeriod, outboundThrottleCacheExpiryPeriod) -) - -var upgrader = websocket.Upgrader{ - ReadBufferSize: 1024, - WriteBufferSize: 1024, - CheckOrigin: func (_ *http.Request) bool { return true }, -} - -// Client is a middleman between the websocket connection and the hub. -type Client struct { - hub *Hub - - // The websocket connection. - conn *websocket.Conn - - // Buffered channel of outbound messages. - send chan []byte -} - -type OutBoundLinkMessage struct { - SourceIP string `json:"sourceIP"` - IP string `json:"ip"` - Port int `json:"port"` - Type string `json:"type"` -} - - -// readPump pumps messages from the websocket connection to the hub. -// -// The application runs readPump in a per-connection goroutine. The application -// ensures that there is at most one reader on a connection by executing all -// reads from this goroutine. -func (c *Client) readPump() { - defer func() { - c.hub.unregister <- c - c.conn.Close() - }() - c.conn.SetReadLimit(maxMessageSize) - c.conn.SetReadDeadline(time.Now().Add(pongWait)) - c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil }) - for { - _, message, err := c.conn.ReadMessage() - if err != nil { - if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { - log.Printf("error: %v", err) - } - break - } - message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1)) - c.hub.onMessageCallback(message) - } -} - -// writePump pumps messages from the hub to the websocket connection. -// -// A goroutine running writePump is started for each connection. The -// application ensures that there is at most one writer to a connection by -// executing all writes from this goroutine. -func (c *Client) writePump() { - ticker := time.NewTicker(pingPeriod) - defer func() { - ticker.Stop() - c.conn.Close() - }() - for { - select { - case message, ok := <-c.send: - c.conn.SetWriteDeadline(time.Now().Add(writeWait)) - if !ok { - // The hub closed the channel. - c.conn.WriteMessage(websocket.CloseMessage, []byte{}) - return - } - - w, err := c.conn.NextWriter(websocket.TextMessage) - if err != nil { - return - } - w.Write(message) - - - if err := w.Close(); err != nil { - return - } - case <-ticker.C: - c.conn.SetWriteDeadline(time.Now().Add(writeWait)) - if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil { - return - } - } - } -} - -type Hub struct { - // Registered clients. - clients map[*Client]bool - - // Inbound messages from the clients. - broadcast chan []byte - - // Register requests from the clients. - register chan *Client - - // Unregister requests from clients. - unregister chan *Client - - // Handle messages from client - onMessageCallback func([]byte) - - -} - -func newHub(onMessageCallback func([]byte)) *Hub { - return &Hub{ - broadcast: make(chan []byte), - register: make(chan *Client), - unregister: make(chan *Client), - clients: make(map[*Client]bool), - onMessageCallback: onMessageCallback, - } -} - -func (h *Hub) run() { - for { - select { - case client := <-h.register: - h.clients[client] = true - case client := <-h.unregister: - if _, ok := h.clients[client]; ok { - delete(h.clients, client) - close(client.send) - } - case message := <-h.broadcast: - // matched messages counter is incremented in this thread instead of in multiple http reader - // threads in order to reduce contention. - statsTracker.incMatchedMessages() - - for client := range h.clients { - select { - case client.send <- message: - default: - close(client.send) - delete(h.clients, client) - } - } - } - } -} - - -// serveWs handles websocket requests from the peer. -func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) { - conn, err := upgrader.Upgrade(w, r, nil) - if err != nil { - log.Println(err) - return - } - client := &Client{hub: hub, conn: conn, send: make(chan []byte, 256)} - client.hub.register <- client - - // Allow collection of memory referenced by the caller by doing all work in - // new goroutines. - go client.writePump() - go client.readPump() -} - -func startOutputServer(port string, messageCallback func([]byte)) { - hub = newHub(messageCallback) - go hub.run() - http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - serveWs(hub, w, r) - }) - err := http.ListenAndServe("0.0.0.0:" + port, nil) - if err != nil { - log.Fatal("Output server error: ", err) - } -} - -func broadcastReqResPair(reqResJson []byte) { - hub.broadcast <- reqResJson -} - -func broadcastOutboundLink(srcIP string, dstIP string, dstPort int) { - cacheKey := fmt.Sprintf("%s -> %s:%d", srcIP, dstIP, dstPort) - _, isInCache := outboundSocketNotifyExpiringCache.Get(cacheKey) - if isInCache { - return - } else { - outboundSocketNotifyExpiringCache.SetDefault(cacheKey, true) - } - - socketMessage := OutBoundLinkMessage{ - SourceIP: srcIP, - IP: dstIP, - Port: dstPort, - Type: "outboundSocketDetected", - } - - jsonStr, err := json.Marshal(socketMessage) - if err != nil { - log.Printf("error marshalling outbound socket detection object: %v", err) - } else { - hub.broadcast <- jsonStr - } -} diff --git a/api/pkg/utils/utils.go b/api/pkg/utils/utils.go index 9ecb89522..9492c9b55 100644 --- a/api/pkg/utils/utils.go +++ b/api/pkg/utils/utils.go @@ -65,9 +65,9 @@ func SetHostname(address, newHostname string) string { func GetResolvedBaseEntry(entry models.MizuEntry) models.BaseEntryDetails { entryUrl := entry.Url service := entry.Service - if entry.ResolvedDestination != nil { - entryUrl = SetHostname(entryUrl, *entry.ResolvedDestination) - service = SetHostname(service, *entry.ResolvedDestination) + if entry.ResolvedDestination != "" { + entryUrl = SetHostname(entryUrl, entry.ResolvedDestination) + service = SetHostname(service, entry.ResolvedDestination) } return models.BaseEntryDetails{ Id: entry.EntryId, @@ -84,4 +84,4 @@ func GetResolvedBaseEntry(entry models.MizuEntry) models.BaseEntryDetails { func GetBytesFromStruct(v interface{}) []byte{ a, _ := json.Marshal(v) return a -} \ No newline at end of file +} diff --git a/cli/Makefile b/cli/Makefile index a7af9999c..ef97249c7 100644 --- a/cli/Makefile +++ b/cli/Makefile @@ -1,6 +1,8 @@ -FOLDER=$(GOOS).$(GOARCH) +SUFFIX=$(GOOS)_$(GOARCH) COMMIT_HASH=$(shell git rev-parse HEAD) -GIT_BRANCH=$(shell git branch --show-current) +GIT_BRANCH=$(shell git branch --show-current | tr '[:upper:]' '[:lower:]') +GIT_VERSION=$(shell git branch --show-current | tr '[:upper:]' '[:lower:]') +BUILD_TIMESTAMP=$(shell date +%s) .PHONY: help .DEFAULT_GOAL := help @@ -12,16 +14,22 @@ install: go install mizu.go build: ## build mizu CLI binary (select platform via GOOS / GOARCH env variables) - go build -ldflags="-X 'github.com/up9inc/mizu/cli/mizu.GitCommitHash=$(COMMIT_HASH)' -X 'github.com/up9inc/mizu/cli/mizu.Branch=$(GIT_BRANCH)'" -o bin/$(FOLDER)/mizu mizu.go + go build -ldflags="-X 'github.com/up9inc/mizu/cli/mizu.GitCommitHash=$(COMMIT_HASH)' \ + -X 'github.com/up9inc/mizu/cli/mizu.Branch=$(GIT_BRANCH)' \ + -X 'github.com/up9inc/mizu/cli/mizu.BuildTimestamp=$(BUILD_TIMESTAMP)' \ + -X 'github.com/up9inc/mizu/cli/mizu.SemVer=$(SEM_VER)'" \ + -o bin/mizu_$(SUFFIX) mizu.go + (cd bin && shasum -a 256 mizu_${SUFFIX} > mizu_${SUFFIX}.sha256) build-all: ## build for all supported platforms @echo "Compiling for every OS and Platform" + @mkdir -p bin && echo "SHA256 checksums available for compiled binaries \n\nRun \`shasum -a 256 -c mizu_OS_ARCH.sha256\` to verify\n\n" > bin/README.md @$(MAKE) build GOOS=darwin GOARCH=amd64 @$(MAKE) build GOOS=linux GOARCH=amd64 @# $(MAKE) GOOS=windows GOARCH=amd64 @# $(MAKE) GOOS=linux GOARCH=386 @# $(MAKE) GOOS=windows GOARCH=386 - @# $(MAKE) GOOS=darwin GOARCH=arm64 + @$(MAKE) GOOS=darwin GOARCH=arm64 @# $(MAKE) GOOS=linux GOARCH=arm64 @# $(MAKE) GOOS=windows GOARCH=arm64 @echo "---------" diff --git a/cli/cmd/fetch.go b/cli/cmd/fetch.go index c4e8fb913..9e55f08fa 100644 --- a/cli/cmd/fetch.go +++ b/cli/cmd/fetch.go @@ -5,8 +5,10 @@ import ( ) type MizuFetchOptions struct { - Limit uint16 - Directory string + FromTimestamp int64 + ToTimestamp int64 + Directory string + MizuPort uint } var mizuFetchOptions = MizuFetchOptions{} @@ -23,6 +25,8 @@ var fetchCmd = &cobra.Command{ func init() { rootCmd.AddCommand(fetchCmd) - fetchCmd.Flags().Uint16VarP(&mizuFetchOptions.Limit, "limit", "l", 1000, "Provide a custom limit for entries to fetch") fetchCmd.Flags().StringVarP(&mizuFetchOptions.Directory, "directory", "d", ".", "Provide a custom directory for fetched entries") + fetchCmd.Flags().Int64Var(&mizuFetchOptions.FromTimestamp, "from", 0, "Custom start timestamp for fetched entries") + fetchCmd.Flags().Int64Var(&mizuFetchOptions.ToTimestamp, "to", 0, "Custom end timestamp fetched entries") + fetchCmd.Flags().UintVarP(&mizuFetchOptions.MizuPort, "port", "p", 8899, "Custom port for mizu") } diff --git a/cli/cmd/fetchRunner.go b/cli/cmd/fetchRunner.go index 71db8364d..58db0352b 100644 --- a/cli/cmd/fetchRunner.go +++ b/cli/cmd/fetchRunner.go @@ -14,7 +14,7 @@ import ( ) func RunMizuFetch(fetch *MizuFetchOptions) { - resp, err := http.Get(fmt.Sprintf("http://localhost:8899/api/har?limit=%v", fetch.Limit)) + resp, err := http.Get(fmt.Sprintf("http://localhost:%v/api/har?from=%v&to=%v", fetch.MizuPort, fetch.FromTimestamp, fetch.ToTimestamp)) if err != nil { log.Fatal(err) } @@ -53,7 +53,7 @@ func Unzip(reader *zip.Reader, dest string) error { path := filepath.Join(dest, f.Name) // Check for ZipSlip (Directory traversal) - if !strings.HasPrefix(path, filepath.Clean(dest) + string(os.PathSeparator)) { + if !strings.HasPrefix(path, filepath.Clean(dest)+string(os.PathSeparator)) { return fmt.Errorf("illegal file path: %s", path) } @@ -61,7 +61,7 @@ func Unzip(reader *zip.Reader, dest string) error { _ = os.MkdirAll(path, f.Mode()) } else { _ = os.MkdirAll(filepath.Dir(path), f.Mode()) - fmt.Print("writing HAR file [ ", path, " ] .. ") + fmt.Print("writing HAR file [ ", path, " ] .. ") f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, f.Mode()) if err != nil { return err @@ -70,7 +70,7 @@ func Unzip(reader *zip.Reader, dest string) error { if err := f.Close(); err != nil { panic(err) } - fmt.Println(" done") + fmt.Println(" done") }() _, err = io.Copy(f, rc) @@ -90,5 +90,3 @@ func Unzip(reader *zip.Reader, dest string) error { return nil } - - diff --git a/cli/cmd/tap.go b/cli/cmd/tap.go index c392f99ba..69c4b988a 100644 --- a/cli/cmd/tap.go +++ b/cli/cmd/tap.go @@ -10,11 +10,13 @@ import ( ) type MizuTapOptions struct { - GuiPort uint16 - Namespace string - KubeConfigPath string - MizuImage string - MizuPodPort uint16 + GuiPort uint16 + Namespace string + AllNamespaces bool + KubeConfigPath string + MizuImage string + MizuPodPort uint16 + PlainTextFilterRegexes []string } @@ -47,7 +49,9 @@ func init() { tapCmd.Flags().Uint16VarP(&mizuTapOptions.GuiPort, "gui-port", "p", 8899, "Provide a custom port for the web interface webserver") tapCmd.Flags().StringVarP(&mizuTapOptions.Namespace, "namespace", "n", "", "Namespace selector") + tapCmd.Flags().BoolVarP(&mizuTapOptions.AllNamespaces, "all-namespaces", "A", false, "Tap all namespaces") tapCmd.Flags().StringVarP(&mizuTapOptions.KubeConfigPath, "kube-config", "k", "", "Path to kube-config file") tapCmd.Flags().StringVarP(&mizuTapOptions.MizuImage, "mizu-image", "", fmt.Sprintf("gcr.io/up9-docker-hub/mizu/%s:latest", mizu.Branch), "Custom image for mizu collector") tapCmd.Flags().Uint16VarP(&mizuTapOptions.MizuPodPort, "mizu-port", "", 8899, "Port which mizu cli will attempt to forward from the mizu collector pod") + tapCmd.Flags().StringArrayVarP(&mizuTapOptions.PlainTextFilterRegexes, "regex-masking", "r", nil, "List of regex expressions that are used to filter matching values from text/plain http bodies") } diff --git a/cli/cmd/tapRunner.go b/cli/cmd/tapRunner.go index 040957ecb..6c0cc7d9b 100644 --- a/cli/cmd/tapRunner.go +++ b/cli/cmd/tapRunner.go @@ -3,6 +3,7 @@ package cmd import ( "context" "fmt" + "github.com/up9inc/mizu/shared" "os" "os/signal" "regexp" @@ -26,24 +27,30 @@ const ( var currentlyTappedPods []core.Pod func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) { - kubernetesProvider := kubernetes.NewProvider(tappingOptions.KubeConfigPath, tappingOptions.Namespace) + mizuApiFilteringOptions, err := getMizuApiFilteringOptions(tappingOptions) + if err != nil { + return + } + + kubernetesProvider := kubernetes.NewProvider(tappingOptions.KubeConfigPath) defer cleanUpMizuResources(kubernetesProvider) ctx, cancel := context.WithCancel(context.Background()) defer cancel() // cancel will be called when this function exits - if matchingPods, err := kubernetesProvider.GetAllPodsMatchingRegex(ctx, podRegexQuery); err != nil { + targetNamespace := getNamespace(tappingOptions, kubernetesProvider) + if matchingPods, err := kubernetesProvider.GetAllPodsMatchingRegex(ctx, podRegexQuery, targetNamespace); err != nil { return } else { currentlyTappedPods = matchingPods } - nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(ctx, kubernetesProvider, currentlyTappedPods) + nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(currentlyTappedPods) if err != nil { return } - if err := createMizuResources(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions); err != nil { + if err := createMizuResources(ctx, kubernetesProvider, nodeToTappedPodIPMap, tappingOptions, mizuApiFilteringOptions); err != nil { return } @@ -57,8 +64,8 @@ func RunMizuTap(podRegexQuery *regexp.Regexp, tappingOptions *MizuTapOptions) { // TODO handle incoming traffic from tapper using a channel } -func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions) error { - if err := createMizuAggregator(ctx, kubernetesProvider, tappingOptions); err != nil { +func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions, mizuApiFilteringOptions *shared.TrafficFilteringOptions) error { + if err := createMizuAggregator(ctx, kubernetesProvider, tappingOptions, mizuApiFilteringOptions); err != nil { return err } @@ -69,11 +76,11 @@ func createMizuResources(ctx context.Context, kubernetesProvider *kubernetes.Pro return nil } -func createMizuAggregator(ctx context.Context, kubernetesProvider *kubernetes.Provider, tappingOptions *MizuTapOptions) error { +func createMizuAggregator(ctx context.Context, kubernetesProvider *kubernetes.Provider, tappingOptions *MizuTapOptions, mizuApiFilteringOptions *shared.TrafficFilteringOptions) error { var err error mizuServiceAccountExists = createRBACIfNecessary(ctx, kubernetesProvider) - _, err = kubernetesProvider.CreateMizuAggregatorPod(ctx, mizu.ResourcesNamespace, mizu.AggregatorPodName, tappingOptions.MizuImage, mizuServiceAccountExists) + _, err = kubernetesProvider.CreateMizuAggregatorPod(ctx, mizu.ResourcesNamespace, mizu.AggregatorPodName, tappingOptions.MizuImage, mizuServiceAccountExists, mizuApiFilteringOptions) if err != nil { fmt.Printf("Error creating mizu collector pod: %v\n", err) return err @@ -88,6 +95,24 @@ func createMizuAggregator(ctx context.Context, kubernetesProvider *kubernetes.Pr return nil } +func getMizuApiFilteringOptions(tappingOptions *MizuTapOptions) (*shared.TrafficFilteringOptions, error) { + if tappingOptions.PlainTextFilterRegexes == nil || len(tappingOptions.PlainTextFilterRegexes) == 0 { + return nil, nil + } + + compiledRegexSlice := make([]*shared.SerializableRegexp, 0) + for _, regexStr := range tappingOptions.PlainTextFilterRegexes { + compiledRegex, err := shared.CompileRegexToSerializableRegexp(regexStr) + if err != nil { + fmt.Printf("Regex %s is invalid: %v", regexStr, err) + return nil, err + } + compiledRegexSlice = append(compiledRegexSlice, compiledRegex) + } + + return &shared.TrafficFilteringOptions{PlainTextMaskingRegexes: compiledRegexSlice}, nil +} + func createMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provider, nodeToTappedPodIPMap map[string][]string, tappingOptions *MizuTapOptions) error { if err := kubernetesProvider.ApplyMizuTapperDaemonSet( ctx, @@ -109,30 +134,32 @@ func createMizuTappers(ctx context.Context, kubernetesProvider *kubernetes.Provi func cleanUpMizuResources(kubernetesProvider *kubernetes.Provider) { fmt.Printf("\nRemoving mizu resources\n") - removalCtx, _ := context.WithTimeout(context.Background(), 5 * time.Second) + removalCtx, _ := context.WithTimeout(context.Background(), 5*time.Second) if err := kubernetesProvider.RemovePod(removalCtx, mizu.ResourcesNamespace, mizu.AggregatorPodName); err != nil { - fmt.Printf("Error removing Pod %s in namespace %s: %s (%v,%+v)\n", mizu.AggregatorPodName, mizu.ResourcesNamespace, err, err, err); + fmt.Printf("Error removing Pod %s in namespace %s: %s (%v,%+v)\n", mizu.AggregatorPodName, mizu.ResourcesNamespace, err, err, err) } if err := kubernetesProvider.RemoveService(removalCtx, mizu.ResourcesNamespace, mizu.AggregatorPodName); err != nil { - fmt.Printf("Error removing Service %s in namespace %s: %s (%v,%+v)\n", mizu.AggregatorPodName, mizu.ResourcesNamespace, err, err, err); + fmt.Printf("Error removing Service %s in namespace %s: %s (%v,%+v)\n", mizu.AggregatorPodName, mizu.ResourcesNamespace, err, err, err) } if err := kubernetesProvider.RemoveDaemonSet(removalCtx, mizu.ResourcesNamespace, mizu.TapperDaemonSetName); err != nil { - fmt.Printf("Error removing DaemonSet %s in namespace %s: %s (%v,%+v)\n", mizu.TapperDaemonSetName, mizu.ResourcesNamespace, err, err, err); + fmt.Printf("Error removing DaemonSet %s in namespace %s: %s (%v,%+v)\n", mizu.TapperDaemonSetName, mizu.ResourcesNamespace, err, err, err) } } func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Provider, cancel context.CancelFunc, podRegex *regexp.Regexp, tappingOptions *MizuTapOptions) { - added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, kubernetesProvider.Namespace), podRegex) + targetNamespace := getNamespace(tappingOptions, kubernetesProvider) + + added, modified, removed, errorChan := kubernetes.FilteredWatch(ctx, kubernetesProvider.GetPodWatcher(ctx, targetNamespace), podRegex) restartTappers := func() { - if matchingPods, err := kubernetesProvider.GetAllPodsMatchingRegex(ctx, podRegex); err != nil { + if matchingPods, err := kubernetesProvider.GetAllPodsMatchingRegex(ctx, podRegex, targetNamespace); err != nil { fmt.Printf("Error getting pods by regex: %s (%v,%+v)\n", err, err, err) cancel() } else { currentlyTappedPods = matchingPods } - nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(ctx, kubernetesProvider, currentlyTappedPods) + nodeToTappedPodIPMap, err := getNodeHostToTappedPodIpsMap(currentlyTappedPods) if err != nil { fmt.Printf("Error building node to ips map: %s (%v,%+v)\n", err, err, err) cancel() @@ -147,14 +174,14 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro for { select { - case newTarget := <- added: + case newTarget := <-added: fmt.Printf("+%s\n", newTarget.Name) - case removedTarget := <- removed: + case removedTarget := <-removed: fmt.Printf("-%s\n", removedTarget.Name) restartTappersDebouncer.SetOn() - case modifiedTarget := <- modified: + case modifiedTarget := <-modified: // Act only if the modified pod has already obtained an IP address. // After filtering for IPs, on a normal pod restart this includes the following events: // - Pod deletion @@ -165,11 +192,11 @@ func watchPodsForTapping(ctx context.Context, kubernetesProvider *kubernetes.Pro restartTappersDebouncer.SetOn() } - case <- errorChan: + case <-errorChan: // TODO: Does this also perform cleanup? cancel() - case <- ctx.Done(): + case <-ctx.Done(): return } } @@ -182,13 +209,13 @@ func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provi var portForward *kubernetes.PortForward for { select { - case <- added: + case <-added: continue - case <- removed: + case <-removed: fmt.Printf("%s removed\n", mizu.AggregatorPodName) cancel() return - case modifiedPod := <- modified: + case modifiedPod := <-modified: if modifiedPod.Status.Phase == "Running" && !isPodReady { isPodReady = true var err error @@ -200,16 +227,16 @@ func portForwardApiPod(ctx context.Context, kubernetesProvider *kubernetes.Provi } } - case <- time.After(25 * time.Second): + case <-time.After(25 * time.Second): if !isPodReady { fmt.Printf("error: %s pod was not ready in time", mizu.AggregatorPodName) cancel() } - case <- errorChan: + case <-errorChan: cancel() - case <- ctx.Done(): + case <-ctx.Done(): if portForward != nil { portForward.Stop() } @@ -225,11 +252,7 @@ func createRBACIfNecessary(ctx context.Context, kubernetesProvider *kubernetes.P return false } if !mizuRBACExists { - var versionString = mizu.Version - if mizu.GitCommitHash != "" { - versionString += "-" + mizu.GitCommitHash - } - err := kubernetesProvider.CreateMizuRBAC(ctx, mizu.ResourcesNamespace, versionString) + err := kubernetesProvider.CreateMizuRBAC(ctx, mizu.ResourcesNamespace, mizu.RBACVersion) if err != nil { fmt.Printf("warning: could not create mizu rbac resources %v\n", err) return false @@ -238,12 +261,12 @@ func createRBACIfNecessary(ctx context.Context, kubernetesProvider *kubernetes.P return true } -func getNodeHostToTappedPodIpsMap(ctx context.Context, kubernetesProvider *kubernetes.Provider, tappedPods []core.Pod) (map[string][]string, error) { +func getNodeHostToTappedPodIpsMap(tappedPods []core.Pod) (map[string][]string, error) { nodeToTappedPodIPMap := make(map[string][]string, 0) for _, pod := range tappedPods { existingList := nodeToTappedPodIPMap[pod.Spec.NodeName] if existingList == nil { - nodeToTappedPodIPMap[pod.Spec.NodeName] = []string {pod.Status.PodIP} + nodeToTappedPodIPMap[pod.Spec.NodeName] = []string{pod.Status.PodIP} } else { nodeToTappedPodIPMap[pod.Spec.NodeName] = append(nodeToTappedPodIPMap[pod.Spec.NodeName], pod.Status.PodIP) } @@ -257,9 +280,9 @@ func waitForFinish(ctx context.Context, cancel context.CancelFunc) { // block until ctx cancel is called or termination signal is received select { - case <- ctx.Done(): + case <-ctx.Done(): break - case <- sigChan: + case <-sigChan: cancel() } } @@ -273,7 +296,7 @@ func syncApiStatus(ctx context.Context, cancel context.CancelFunc, tappingOption for { select { - case <- ctx.Done(): + case <-ctx.Done(): return default: err = controlSocket.SendNewTappedPodsListMessage(currentlyTappedPods) @@ -285,3 +308,13 @@ func syncApiStatus(ctx context.Context, cancel context.CancelFunc, tappingOption } } + +func getNamespace(tappingOptions *MizuTapOptions, kubernetesProvider *kubernetes.Provider) string { + if tappingOptions.AllNamespaces { + return mizu.K8sAllNamespaces + } else if len(tappingOptions.Namespace) > 0 { + return tappingOptions.Namespace + } else { + return kubernetesProvider.CurrentNamespace() + } +} diff --git a/cli/cmd/version.go b/cli/cmd/version.go index 11a348a27..366b05da2 100644 --- a/cli/cmd/version.go +++ b/cli/cmd/version.go @@ -3,19 +3,38 @@ package cmd import ( "fmt" "github.com/up9inc/mizu/cli/mizu" + "strconv" + "time" "github.com/spf13/cobra" ) +type MizuVersionOptions struct { + DebugInfo bool +} + + +var mizuVersionOptions = &MizuVersionOptions{} + var versionCmd = &cobra.Command{ Use: "version", Short: "Print version info", RunE: func(cmd *cobra.Command, args []string) error { - fmt.Printf("%s (%s) %s\n", mizu.Version, mizu.Branch, mizu.GitCommitHash) + if mizuVersionOptions.DebugInfo { + timeStampInt, _ := strconv.ParseInt(mizu.BuildTimestamp, 10, 0) + fmt.Printf("Version: %s \nBranch: %s (%s) \n", mizu.SemVer, mizu.Branch, mizu.GitCommitHash) + fmt.Printf("Build Time: %s (%s)\n", mizu.BuildTimestamp, time.Unix(timeStampInt, 0)) + + } else { + fmt.Printf("Version: %s (%s)\n", mizu.SemVer, mizu.Branch) + } return nil }, } func init() { rootCmd.AddCommand(versionCmd) + + versionCmd.Flags().BoolVarP(&mizuVersionOptions.DebugInfo, "debug", "d", false, "Provide all information about version") + } diff --git a/cli/cmd/view.go b/cli/cmd/view.go index aa4aa928d..529c756b1 100644 --- a/cli/cmd/view.go +++ b/cli/cmd/view.go @@ -1,7 +1,6 @@ package cmd import ( - "fmt" "github.com/spf13/cobra" ) @@ -9,7 +8,7 @@ var viewCmd = &cobra.Command{ Use: "view", Short: "Open GUI in browser", RunE: func(cmd *cobra.Command, args []string) error { - fmt.Println("Not implemented") + runMizuView() return nil }, } diff --git a/cli/cmd/viewRunner.go b/cli/cmd/viewRunner.go new file mode 100644 index 000000000..059a9b2e2 --- /dev/null +++ b/cli/cmd/viewRunner.go @@ -0,0 +1,33 @@ +package cmd + +import ( + "context" + "fmt" + "github.com/up9inc/mizu/cli/kubernetes" + "github.com/up9inc/mizu/cli/mizu" + "net/http" +) + +func runMizuView() { + kubernetesProvider := kubernetes.NewProvider("") + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + exists, err := kubernetesProvider.DoesServicesExist(ctx, mizu.ResourcesNamespace, mizu.AggregatorPodName) + if err != nil { + panic(err) + } + if !exists { + fmt.Printf("The %s service not found\n", mizu.AggregatorPodName) + return + } + + _, err = http.Get("http://localhost:8899/") + if err == nil { + fmt.Printf("Found a running service %s and open port 8899\n", mizu.AggregatorPodName) + return + } + fmt.Printf("Found service %s, creating port forwarding to 8899\n", mizu.AggregatorPodName) + portForwardApiPod(ctx, kubernetesProvider, cancel, &MizuTapOptions{GuiPort: 8899, MizuPodPort: 8899}) +} diff --git a/cli/kubernetes/provider.go b/cli/kubernetes/provider.go index 348068b4c..09b000bcb 100644 --- a/cli/kubernetes/provider.go +++ b/cli/kubernetes/provider.go @@ -6,19 +6,19 @@ import ( "encoding/json" "errors" "fmt" - "path/filepath" "regexp" - applyconfapp "k8s.io/client-go/applyconfigurations/apps/v1" - applyconfmeta "k8s.io/client-go/applyconfigurations/meta/v1" - applyconfcore "k8s.io/client-go/applyconfigurations/core/v1" + "github.com/up9inc/mizu/shared" core "k8s.io/api/core/v1" rbac "k8s.io/api/rbac/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/watch" + applyconfapp "k8s.io/client-go/applyconfigurations/apps/v1" + applyconfcore "k8s.io/client-go/applyconfigurations/core/v1" + applyconfmeta "k8s.io/client-go/applyconfigurations/meta/v1" "k8s.io/client-go/kubernetes" _ "k8s.io/client-go/plugin/pkg/client/auth/azure" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" @@ -42,7 +42,7 @@ const ( fieldManagerName = "mizu-manager" ) -func NewProvider(kubeConfigPath string, overrideNamespace string) *Provider { +func NewProvider(kubeConfigPath string) *Provider { kubernetesConfig := loadKubernetesConfiguration(kubeConfigPath) restClientConfig, err := kubernetesConfig.ClientConfig() if err != nil { @@ -50,25 +50,18 @@ func NewProvider(kubeConfigPath string, overrideNamespace string) *Provider { } clientSet := getClientSet(restClientConfig) - var namespace string - if len(overrideNamespace) > 0 { - namespace = overrideNamespace - } else { - configuredNamespace, _, err := kubernetesConfig.Namespace() - if err != nil { - panic(err) - } - namespace = configuredNamespace - } - return &Provider{ clientSet: clientSet, kubernetesConfig: kubernetesConfig, clientConfig: *restClientConfig, - Namespace: namespace, } } +func (provider *Provider) CurrentNamespace() string { + ns, _, _ := provider.kubernetesConfig.Namespace() + return ns +} + func (provider *Provider) GetPodWatcher(ctx context.Context, namespace string) watch.Interface { watcher, err := provider.clientSet.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{Watch: true}) if err != nil { @@ -77,20 +70,16 @@ func (provider *Provider) GetPodWatcher(ctx context.Context, namespace string) w return watcher } -func (provider *Provider) GetPods(ctx context.Context, namespace string) { - pods, err := provider.clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{}) +func (provider *Provider) CreateMizuAggregatorPod(ctx context.Context, namespace string, podName string, podImage string, linkServiceAccount bool, mizuApiFilteringOptions *shared.TrafficFilteringOptions) (*core.Pod, error) { + marshaledFilteringOptions, err := json.Marshal(mizuApiFilteringOptions) if err != nil { - panic(err.Error()) + return nil, err } - fmt.Printf("There are %d pods in Namespace %s\n", len(pods.Items), namespace) -} - -func (provider *Provider) CreateMizuAggregatorPod(ctx context.Context, namespace string, podName string, podImage string, linkServiceAccount bool) (*core.Pod, error) { pod := &core.Pod{ ObjectMeta: metav1.ObjectMeta{ Name: podName, Namespace: namespace, - Labels: map[string]string{"app": podName}, + Labels: map[string]string{"app": podName}, }, Spec: core.PodSpec{ Containers: []core.Container{ @@ -98,16 +87,20 @@ func (provider *Provider) CreateMizuAggregatorPod(ctx context.Context, namespace Name: podName, Image: podImage, ImagePullPolicy: core.PullAlways, - Command: []string {"./mizuagent", "--aggregator"}, + Command: []string{"./mizuagent", "--aggregator"}, Env: []core.EnvVar{ { - Name: "HOST_MODE", + Name: shared.HostModeEnvVar, Value: "1", }, + { + Name: shared.MizuFilteringOptionsEnvVar, + Value: string(marshaledFilteringOptions), + }, }, }, }, - DNSPolicy: "ClusterFirstWithHostNet", + DNSPolicy: core.DNSClusterFirstWithHostNet, TerminationGracePeriodSeconds: new(int64), // Affinity: TODO: define node selector for all relevant nodes for this mizu instance }, @@ -122,19 +115,19 @@ func (provider *Provider) CreateMizuAggregatorPod(ctx context.Context, namespace func (provider *Provider) CreateService(ctx context.Context, namespace string, serviceName string, appLabelValue string) (*core.Service, error) { service := core.Service{ ObjectMeta: metav1.ObjectMeta{ - Name: serviceName, + Name: serviceName, Namespace: namespace, }, Spec: core.ServiceSpec{ - Ports: []core.ServicePort {{TargetPort: intstr.FromInt(8899), Port: 80}}, - Type: core.ServiceTypeClusterIP, + Ports: []core.ServicePort{{TargetPort: intstr.FromInt(8899), Port: 80}}, + Type: core.ServiceTypeClusterIP, Selector: map[string]string{"app": appLabelValue}, }, } return provider.clientSet.CoreV1().Services(namespace).Create(ctx, &service, metav1.CreateOptions{}) } -func (provider *Provider) DoesMizuRBACExist(ctx context.Context, namespace string) (bool, error){ +func (provider *Provider) DoesMizuRBACExist(ctx context.Context, namespace string) (bool, error) { serviceAccount, err := provider.clientSet.CoreV1().ServiceAccounts(namespace).Get(ctx, serviceAccountName, metav1.GetOptions{}) var statusError *k8serrors.StatusError @@ -150,7 +143,22 @@ func (provider *Provider) DoesMizuRBACExist(ctx context.Context, namespace strin return serviceAccount != nil, nil } -func (provider *Provider) CreateMizuRBAC(ctx context.Context, namespace string ,version string) error { +func (provider *Provider) DoesServicesExist(ctx context.Context, namespace string, serviceName string) (bool, error) { + service, err := provider.clientSet.CoreV1().Services(namespace).Get(ctx, serviceName, metav1.GetOptions{}) + + var statusError *k8serrors.StatusError + if errors.As(err, &statusError) { + if statusError.ErrStatus.Reason == metav1.StatusReasonNotFound { + return false, nil + } + } + if err != nil { + return false, err + } + return service != nil, nil +} + +func (provider *Provider) CreateMizuRBAC(ctx context.Context, namespace string, version string) error { clusterRoleName := "mizu-cluster-role" serviceAccount := &core.ServiceAccount{ @@ -162,25 +170,25 @@ func (provider *Provider) CreateMizuRBAC(ctx context.Context, namespace string , } clusterRole := &rbac.ClusterRole{ ObjectMeta: metav1.ObjectMeta{ - Name: clusterRoleName, + Name: clusterRoleName, Labels: map[string]string{"mizu-cli-version": version}, }, Rules: []rbac.PolicyRule{ { - APIGroups: []string {"", "extensions", "apps"}, - Resources: []string {"pods", "services", "endpoints"}, - Verbs: []string {"list", "get", "watch"}, + APIGroups: []string{"", "extensions", "apps"}, + Resources: []string{"pods", "services", "endpoints"}, + Verbs: []string{"list", "get", "watch"}, }, }, } clusterRoleBinding := &rbac.ClusterRoleBinding{ ObjectMeta: metav1.ObjectMeta{ - Name: "mizu-cluster-role-binding", + Name: "mizu-cluster-role-binding", Labels: map[string]string{"mizu-cli-version": version}, }, RoleRef: rbac.RoleRef{ - Name: clusterRoleName, - Kind: "ClusterRole", + Name: clusterRoleName, + Kind: "ClusterRole", APIGroup: "rbac.authorization.k8s.io", }, Subjects: []rbac.Subject{ @@ -232,24 +240,51 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac agentContainer.WithSecurityContext(applyconfcore.SecurityContext().WithPrivileged(privileged)) agentContainer.WithCommand("./mizuagent", "-i", "any", "--tap", "--hardump", "--aggregator-address", fmt.Sprintf("ws://%s/wsTapper", aggregatorPodIp)) agentContainer.WithEnv( - applyconfcore.EnvVar().WithName("HOST_MODE").WithValue("1"), - applyconfcore.EnvVar().WithName("AGGREGATOR_ADDRESS").WithValue(aggregatorPodIp), - applyconfcore.EnvVar().WithName("TAPPED_ADDRESSES_PER_HOST").WithValue(string(nodeToTappedPodIPMapJsonStr)), + applyconfcore.EnvVar().WithName(shared.HostModeEnvVar).WithValue("1"), + applyconfcore.EnvVar().WithName(shared.TappedAddressesPerNodeDictEnvVar).WithValue(string(nodeToTappedPodIPMapJsonStr)), ) agentContainer.WithEnv( - applyconfcore.EnvVar().WithName("NODE_NAME").WithValueFrom( + applyconfcore.EnvVar().WithName(shared.NodeNameEnvVar).WithValueFrom( applyconfcore.EnvVarSource().WithFieldRef( applyconfcore.ObjectFieldSelector().WithAPIVersion("v1").WithFieldPath("spec.nodeName"), ), ), ) - podSpec := applyconfcore.PodSpec().WithHostNetwork(true).WithDNSPolicy("ClusterFirstWithHostNet").WithTerminationGracePeriodSeconds(0) + nodeNames := make([]string, 0, len(nodeToTappedPodIPMap)) + for nodeName := range nodeToTappedPodIPMap { + nodeNames = append(nodeNames, nodeName) + } + nodeSelectorRequirement := applyconfcore.NodeSelectorRequirement() + nodeSelectorRequirement.WithKey("kubernetes.io/hostname") + nodeSelectorRequirement.WithOperator(core.NodeSelectorOpIn) + nodeSelectorRequirement.WithValues(nodeNames...) + nodeSelectorTerm := applyconfcore.NodeSelectorTerm() + nodeSelectorTerm.WithMatchExpressions(nodeSelectorRequirement) + nodeSelector := applyconfcore.NodeSelector() + nodeSelector.WithNodeSelectorTerms(nodeSelectorTerm) + nodeAffinity := applyconfcore.NodeAffinity() + nodeAffinity.WithRequiredDuringSchedulingIgnoredDuringExecution(nodeSelector) + affinity := applyconfcore.Affinity() + affinity.WithNodeAffinity(nodeAffinity) + + noExecuteToleration := applyconfcore.Toleration() + noExecuteToleration.WithOperator(core.TolerationOpExists) + noExecuteToleration.WithEffect(core.TaintEffectNoExecute) + noScheduleToleration := applyconfcore.Toleration() + noScheduleToleration.WithOperator(core.TolerationOpExists) + noScheduleToleration.WithEffect(core.TaintEffectNoSchedule) + + podSpec := applyconfcore.PodSpec() + podSpec.WithHostNetwork(true) + podSpec.WithDNSPolicy(core.DNSClusterFirstWithHostNet) + podSpec.WithTerminationGracePeriodSeconds(0) if linkServiceAccount { podSpec.WithServiceAccountName(serviceAccountName) } podSpec.WithContainers(agentContainer) - + podSpec.WithAffinity(affinity) + podSpec.WithTolerations(noExecuteToleration, noScheduleToleration) podTemplate := applyconfcore.PodTemplateSpec() podTemplate.WithLabels(map[string]string{"app": tapperPodName}) @@ -265,8 +300,8 @@ func (provider *Provider) ApplyMizuTapperDaemonSet(ctx context.Context, namespac return err } -func (provider *Provider) GetAllPodsMatchingRegex(ctx context.Context, regex *regexp.Regexp) ([]core.Pod, error) { - pods, err := provider.clientSet.CoreV1().Pods("").List(ctx, metav1.ListOptions{}) +func (provider *Provider) GetAllPodsMatchingRegex(ctx context.Context, regex *regexp.Regexp, namespace string) ([]core.Pod, error) { + pods, err := provider.clientSet.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{}) if err != nil { return nil, err } diff --git a/cli/mizu/consts.go b/cli/mizu/consts.go index 22532d0e6..9f72620ad 100644 --- a/cli/mizu/consts.go +++ b/cli/mizu/consts.go @@ -1,9 +1,11 @@ package mizu var ( - Version = "v0.0.1" - Branch = "develop" - GitCommitHash = "" // this var is overridden using ldflags in makefile when building + SemVer = "0.0.1" + Branch = "develop" + GitCommitHash = "" // this var is overridden using ldflags in makefile when building + BuildTimestamp = "" // this var is overridden using ldflags in makefile when building + RBACVersion = "v1" ) const ( @@ -11,4 +13,5 @@ const ( TapperDaemonSetName = "mizu-tapper-daemon-set" AggregatorPodName = "mizu-collector" TapperPodName = "mizu-tapper" + K8sAllNamespaces = "" ) diff --git a/shared/consts.go b/shared/consts.go new file mode 100644 index 000000000..ccda67615 --- /dev/null +++ b/shared/consts.go @@ -0,0 +1,8 @@ +package shared + +const ( + MizuFilteringOptionsEnvVar = "SENSITIVE_DATA_FILTERING_OPTIONS" + HostModeEnvVar = "HOST_MODE" + NodeNameEnvVar = "NODE_NAME" + TappedAddressesPerNodeDictEnvVar = "TAPPED_ADDRESSES_PER_HOST" +) diff --git a/shared/models.go b/shared/models.go index dba9e1731..cbf73b594 100644 --- a/shared/models.go +++ b/shared/models.go @@ -33,3 +33,7 @@ func CreateWebSocketStatusMessage(tappingStatus TapStatus) WebSocketStatusMessag TappingStatus: tappingStatus, } } + +type TrafficFilteringOptions struct { + PlainTextMaskingRegexes []*SerializableRegexp +} diff --git a/shared/serializableRegexp.go b/shared/serializableRegexp.go new file mode 100644 index 000000000..e311fdeb5 --- /dev/null +++ b/shared/serializableRegexp.go @@ -0,0 +1,30 @@ +package shared + +import "regexp" + +type SerializableRegexp struct { + regexp.Regexp +} + +func CompileRegexToSerializableRegexp(expr string) (*SerializableRegexp, error) { + re, err := regexp.Compile(expr) + if err != nil { + return nil, err + } + return &SerializableRegexp{*re}, nil +} + +// UnmarshalText is by json.Unmarshal. +func (r *SerializableRegexp) UnmarshalText(text []byte) error { + rr, err := CompileRegexToSerializableRegexp(string(text)) + if err != nil { + return err + } + *r = *rr + return nil +} + +// MarshalText is used by json.Marshal. +func (r *SerializableRegexp) MarshalText() ([]byte, error) { + return []byte(r.String()), nil +} diff --git a/api/pkg/tap/cleaner.go b/tap/cleaner.go similarity index 100% rename from api/pkg/tap/cleaner.go rename to tap/cleaner.go diff --git a/tap/go.mod b/tap/go.mod new file mode 100644 index 000000000..d8c38a22c --- /dev/null +++ b/tap/go.mod @@ -0,0 +1,12 @@ +module github.com/up9inc/mizu/tap + +go 1.16 + +require ( + github.com/google/gopacket v1.1.19 + github.com/google/martian v2.1.0+incompatible + github.com/gorilla/websocket v1.4.2 + github.com/orcaman/concurrent-map v0.0.0-20210106121528-16402b402231 + github.com/patrickmn/go-cache v2.1.0+incompatible + golang.org/x/net v0.0.0-20210421230115-4e50805a0758 +) diff --git a/tap/go.sum b/tap/go.sum new file mode 100644 index 000000000..a110e49b4 --- /dev/null +++ b/tap/go.sum @@ -0,0 +1,31 @@ +github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8= +github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo= +github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no= +github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= +github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= +github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/orcaman/concurrent-map v0.0.0-20210501183033-44dafcb38ecc h1:Ak86L+yDSOzKFa7WM5bf5itSOo1e3Xh8bm5YCMUXIjQ= +github.com/orcaman/concurrent-map v0.0.0-20210501183033-44dafcb38ecc/go.mod h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI= +github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc= +github.com/patrickmn/go-cache v2.1.0+incompatible/go.mod h1:3Qf8kWWT7OJRJbdiICTKqZju1ZixQ/KpMGzzAfe6+WQ= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= +golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210525063256-abc453219eb5 h1:wjuX4b5yYQnEQHzd+CBcrcC6OVR2J1CN6mUy0oSxIPo= +golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da h1:b3NXsE2LusjYGGjL5bxEVZZORm/YEFFrWFjR8eFrw/c= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= +golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/api/pkg/tap/grpc_assembler.go b/tap/grpc_assembler.go similarity index 96% rename from api/pkg/tap/grpc_assembler.go rename to tap/grpc_assembler.go index 076faf7cd..72b5665f1 100644 --- a/api/pkg/tap/grpc_assembler.go +++ b/tap/grpc_assembler.go @@ -84,14 +84,14 @@ type GrpcAssembler struct { framer *http2.Framer } -func (ga *GrpcAssembler) readMessage() (uint32, interface{}, string, error) { +func (ga *GrpcAssembler) readMessage() (uint32, interface{}, error) { // Exactly one Framer is used for each half connection. // (Instead of creating a new Framer for each ReadFrame operation) // This is needed in order to decompress the headers, // because the compression context is updated with each requests/response. frame, err := ga.framer.ReadFrame() if err != nil { - return 0, nil, "", err + return 0, nil, err } streamID := frame.Header().StreamID @@ -99,7 +99,7 @@ func (ga *GrpcAssembler) readMessage() (uint32, interface{}, string, error) { ga.fragmentsByStream.appendFrame(streamID, frame) if !(ga.isStreamEnd(frame)) { - return 0, nil, "", nil + return 0, nil, nil } headers, data := ga.fragmentsByStream.pop(streamID) @@ -137,10 +137,10 @@ func (ga *GrpcAssembler) readMessage() (uint32, interface{}, string, error) { ContentLength: int64(len(dataString)), } } else { - return 0, nil, "", errors.New("Failed to assemble stream: neither a request nor a message") + return 0, nil, errors.New("Failed to assemble stream: neither a request nor a message") } - return streamID, messageHTTP1, dataString, nil + return streamID, messageHTTP1, nil } func (ga *GrpcAssembler) isStreamEnd(frame http2.Frame) bool { diff --git a/api/pkg/tap/har_writer.go b/tap/har_writer.go similarity index 76% rename from api/pkg/tap/har_writer.go rename to tap/har_writer.go index 1b1eb1096..3126ee4c0 100644 --- a/api/pkg/tap/har_writer.go +++ b/tap/har_writer.go @@ -4,6 +4,7 @@ import ( "encoding/json" "errors" "fmt" + "log" "net/http" "os" "path/filepath" @@ -15,7 +16,8 @@ import ( ) const readPermission = 0644 -const tempFilenamePrefix = "har_writer" +const harFilenameSuffix = ".har" +const tempFilenameSuffix = ".har.tmp" type PairChanItem struct { Request *http.Request @@ -23,12 +25,13 @@ type PairChanItem struct { Response *http.Response ResponseTime time.Time RequestSenderIp string + ConnectionInfo *ConnectionInfo } func openNewHarFile(filename string) *HarFile { file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, readPermission) if err != nil { - panic(fmt.Sprintf("Failed to open output file: %s (%v,%+v)", err, err, err)) + log.Panicf("Failed to open output file: %s (%v,%+v)", err, err, err) } harFile := HarFile{file: file, entryCount: 0} @@ -45,13 +48,13 @@ type HarFile struct { func NewEntry(request *http.Request, requestTime time.Time, response *http.Response, responseTime time.Time) (*har.Entry, error) { harRequest, err := har.NewRequest(request, true) if err != nil { - SilentError("convert-request-to-har", "Failed converting request to HAR %s (%v,%+v)\n", err, err, err) + SilentError("convert-request-to-har", "Failed converting request to HAR %s (%v,%+v)", err, err, err) return nil, errors.New("Failed converting request to HAR") } harResponse, err := har.NewResponse(response, true) if err != nil { - SilentError("convert-response-to-har", "Failed converting response to HAR %s (%v,%+v)\n", err, err, err) + SilentError("convert-response-to-har", "Failed converting response to HAR %s (%v,%+v)", err, err, err) return nil, errors.New("Failed converting response to HAR") } @@ -62,7 +65,7 @@ func NewEntry(request *http.Request, requestTime time.Time, response *http.Respo status, err := strconv.Atoi(response.Header.Get(":status")) if err != nil { - SilentError("convert-response-status-for-har", "Failed converting status to int %s (%v,%+v)\n", err, err, err) + SilentError("convert-response-status-for-har", "Failed converting status to int %s (%v,%+v)", err, err, err) return nil, errors.New("Failed converting response status to int for HAR") } harResponse.Status = status @@ -102,7 +105,7 @@ func NewEntry(request *http.Request, requestTime time.Time, response *http.Respo func (f *HarFile) WriteEntry(harEntry *har.Entry) { harEntryJson, err := json.Marshal(harEntry) if err != nil { - SilentError("har-entry-marshal", "Failed converting har entry object to JSON%s (%v,%+v)\n", err, err, err) + SilentError("har-entry-marshal", "Failed converting har entry object to JSON%s (%v,%+v)", err, err, err) return } @@ -116,7 +119,7 @@ func (f *HarFile) WriteEntry(harEntry *har.Entry) { harEntryString := append([]byte(separator), harEntryJson...) if _, err := f.file.Write(harEntryString); err != nil { - panic(fmt.Sprintf("Failed to write to output file: %s (%v,%+v)", err, err, err)) + log.Panicf("Failed to write to output file: %s (%v,%+v)", err, err, err) } f.entryCount++ @@ -131,21 +134,21 @@ func (f *HarFile) Close() { err := f.file.Close() if err != nil { - panic(fmt.Sprintf("Failed to close output file: %s (%v,%+v)", err, err, err)) + log.Panicf("Failed to close output file: %s (%v,%+v)", err, err, err) } } func (f*HarFile) writeHeader() { header := []byte(`{"log": {"version": "1.2", "creator": {"name": "Mizu", "version": "0.0.1"}, "entries": [`) if _, err := f.file.Write(header); err != nil { - panic(fmt.Sprintf("Failed to write header to output file: %s (%v,%+v)", err, err, err)) + log.Panicf("Failed to write header to output file: %s (%v,%+v)", err, err, err) } } func (f*HarFile) writeTrailer() { trailer := []byte("]}}") if _, err := f.file.Write(trailer); err != nil { - panic(fmt.Sprintf("Failed to write trailer to output file: %s (%v,%+v)", err, err, err)) + log.Panicf("Failed to write trailer to output file: %s (%v,%+v)", err, err, err) } } @@ -161,8 +164,8 @@ func NewHarWriter(outputDir string, maxEntries int) *HarWriter { } type OutputChannelItem struct { - HarEntry *har.Entry - RequestSenderIp string + HarEntry *har.Entry + ConnectionInfo *ConnectionInfo } type HarWriter struct { @@ -174,20 +177,20 @@ type HarWriter struct { done chan bool } -func (hw *HarWriter) WritePair(request *http.Request, requestTime time.Time, response *http.Response, responseTime time.Time, requestSenderIp string) { +func (hw *HarWriter) WritePair(request *http.Request, requestTime time.Time, response *http.Response, responseTime time.Time, connectionInfo *ConnectionInfo) { hw.PairChan <- &PairChanItem{ - Request: request, - RequestTime: requestTime, - Response: response, - ResponseTime: responseTime, - RequestSenderIp: requestSenderIp, + Request: request, + RequestTime: requestTime, + Response: response, + ResponseTime: responseTime, + ConnectionInfo: connectionInfo, } } func (hw *HarWriter) Start() { if hw.OutputDirPath != "" { if err := os.MkdirAll(hw.OutputDirPath, os.ModePerm); err != nil { - panic(fmt.Sprintf("Failed to create output directory: %s (%v,%+v)", err, err, err)) + log.Panicf("Failed to create output directory: %s (%v,%+v)", err, err, err) } } @@ -210,8 +213,8 @@ func (hw *HarWriter) Start() { } } else { hw.OutChan <- &OutputChannelItem{ - HarEntry: harEntry, - RequestSenderIp: pair.RequestSenderIp, + HarEntry: harEntry, + ConnectionInfo: pair.ConnectionInfo, } } } @@ -226,10 +229,11 @@ func (hw *HarWriter) Start() { func (hw *HarWriter) Stop() { close(hw.PairChan) <-hw.done + close(hw.OutChan) } func (hw *HarWriter) openNewFile() { - filename := filepath.Join(os.TempDir(), fmt.Sprintf("%s_%d", tempFilenamePrefix, time.Now().UnixNano())) + filename := buildFilename(hw.OutputDirPath, time.Now(), tempFilenameSuffix) hw.currentFile = openNewHarFile(filename) } @@ -238,15 +242,15 @@ func (hw *HarWriter) closeFile() { tmpFilename := hw.currentFile.file.Name() hw.currentFile = nil - filename := buildFilename(hw.OutputDirPath, time.Now()) + filename := buildFilename(hw.OutputDirPath, time.Now(), harFilenameSuffix) err := os.Rename(tmpFilename, filename) if err != nil { - SilentError("Rename-file", "cannot rename file: %s (%v,%+v)\n", err, err, err) + SilentError("Rename-file", "cannot rename file: %s (%v,%+v)", err, err, err) } } -func buildFilename(dir string, t time.Time) string { +func buildFilename(dir string, t time.Time, suffix string) string { // (epoch time in nanoseconds)__(YYYY_Month_DD__hh-mm-ss).har - filename := fmt.Sprintf("%d__%s.har", t.UnixNano(), t.Format("2006_Jan_02__15-04-05")) + filename := fmt.Sprintf("%d__%s%s", t.UnixNano(), t.Format("2006_Jan_02__15-04-05"), suffix) return filepath.Join(dir, filename) } diff --git a/tap/http_matcher.go b/tap/http_matcher.go new file mode 100644 index 000000000..a0377cd46 --- /dev/null +++ b/tap/http_matcher.go @@ -0,0 +1,138 @@ +package tap + +import ( + "fmt" + "net/http" + "strings" + "time" + + "github.com/orcaman/concurrent-map" +) + +type requestResponsePair struct { + Request httpMessage `json:"request"` + Response httpMessage `json:"response"` +} + +type ConnectionInfo struct { + ClientIP string + ClientPort string + ServerIP string + ServerPort string +} + +type httpMessage struct { + isRequest bool + captureTime time.Time + orig interface{} + connectionInfo ConnectionInfo +} + + +// Key is {client_addr}:{client_port}->{dest_addr}:{dest_port} +type requestResponseMatcher struct { + openMessagesMap cmap.ConcurrentMap + +} + +func createResponseRequestMatcher() requestResponseMatcher { + newMatcher := &requestResponseMatcher{openMessagesMap: cmap.New()} + return *newMatcher +} + +func (matcher *requestResponseMatcher) registerRequest(ident string, request *http.Request, captureTime time.Time) *requestResponsePair { + split := splitIdent(ident) + key := genKey(split) + + connectionInfo := &ConnectionInfo{ + ClientIP: split[0], + ClientPort: split[2], + ServerIP: split[1], + ServerPort: split[3], + } + + requestHTTPMessage := httpMessage{ + isRequest: true, + captureTime: captureTime, + orig: request, + connectionInfo: *connectionInfo, + } + + if response, found := matcher.openMessagesMap.Pop(key); found { + // Type assertion always succeeds because all of the map's values are of httpMessage type + responseHTTPMessage := response.(*httpMessage) + if responseHTTPMessage.isRequest { + SilentError("Request-Duplicate", "Got duplicate request with same identifier") + return nil + } + Debug("Matched open Response for %s", key) + return matcher.preparePair(&requestHTTPMessage, responseHTTPMessage) + } + + matcher.openMessagesMap.Set(key, &requestHTTPMessage) + Debug("Registered open Request for %s", key) + return nil +} + +func (matcher *requestResponseMatcher) registerResponse(ident string, response *http.Response, captureTime time.Time) *requestResponsePair { + split := splitIdent(ident) + key := genKey(split) + + responseHTTPMessage := httpMessage{ + isRequest: false, + captureTime: captureTime, + orig: response, + } + + if request, found := matcher.openMessagesMap.Pop(key); found { + // Type assertion always succeeds because all of the map's values are of httpMessage type + requestHTTPMessage := request.(*httpMessage) + if !requestHTTPMessage.isRequest { + SilentError("Response-Duplicate", "Got duplicate response with same identifier") + return nil + } + Debug("Matched open Request for %s", key) + return matcher.preparePair(requestHTTPMessage, &responseHTTPMessage) + } + + matcher.openMessagesMap.Set(key, &responseHTTPMessage) + Debug("Registered open Response for %s", key) + return nil +} + +func (matcher *requestResponseMatcher) preparePair(requestHTTPMessage *httpMessage, responseHTTPMessage *httpMessage) *requestResponsePair { + return &requestResponsePair{ + Request: *requestHTTPMessage, + Response: *responseHTTPMessage, + } +} + +func splitIdent(ident string) []string { + ident = strings.Replace(ident, "->", " ", -1) + return strings.Split(ident, " ") +} + +func genKey(split []string) string { + key := fmt.Sprintf("%s:%s->%s:%s,%s", split[0], split[2], split[1], split[3], split[4]) + return key +} + +func (matcher *requestResponseMatcher) deleteOlderThan(t time.Time) int { + keysToPop := make([]string, 0) + for item := range matcher.openMessagesMap.IterBuffered() { + // Map only contains values of type httpMessage + message, _ := item.Val.(*httpMessage) + + if message.captureTime.Before(t) { + keysToPop = append(keysToPop, item.Key) + } + } + + numDeleted := len(keysToPop) + + for _, key := range keysToPop { + _, _ = matcher.openMessagesMap.Pop(key) + } + + return numDeleted +} diff --git a/api/pkg/tap/http_reader.go b/tap/http_reader.go similarity index 60% rename from api/pkg/tap/http_reader.go rename to tap/http_reader.go index 21ce9c788..00d20f425 100644 --- a/api/pkg/tap/http_reader.go +++ b/tap/http_reader.go @@ -3,10 +3,7 @@ package tap import ( "bufio" "bytes" - "compress/gzip" - b64 "encoding/base64" "encoding/hex" - "encoding/json" "fmt" "io" "io/ioutil" @@ -73,7 +70,7 @@ func (h *httpReader) run(wg *sync.WaitGroup) { b := bufio.NewReader(h) if isHTTP2, err := checkIsHTTP2Connection(b, h.isClient); err != nil { - SilentError("HTTP/2-Prepare-Connection", "stream %s Failed to check if client is HTTP/2: %s (%v,%+v)\n", h.ident, err, err, err) + SilentError("HTTP/2-Prepare-Connection", "stream %s Failed to check if client is HTTP/2: %s (%v,%+v)", h.ident, err, err, err) // Do something? } else { h.isHTTP2 = isHTTP2 @@ -82,7 +79,7 @@ func (h *httpReader) run(wg *sync.WaitGroup) { if h.isHTTP2 { err := prepareHTTP2Connection(b, h.isClient) if err != nil { - SilentError("HTTP/2-Prepare-Connection-After-Check", "stream %s error: %s (%v,%+v)\n", h.ident, err, err, err) + SilentError("HTTP/2-Prepare-Connection-After-Check", "stream %s error: %s (%v,%+v)", h.ident, err, err, err) } h.grpcAssembler = createGrpcAssembler(b) } @@ -93,7 +90,7 @@ func (h *httpReader) run(wg *sync.WaitGroup) { if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { - SilentError("HTTP/2", "stream %s error: %s (%v,%+v)\n", h.ident, err, err, err) + SilentError("HTTP/2", "stream %s error: %s (%v,%+v)", h.ident, err, err, err) continue } } else if h.isClient { @@ -101,7 +98,7 @@ func (h *httpReader) run(wg *sync.WaitGroup) { if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { - SilentError("HTTP-request", "stream %s Request error: %s (%v,%+v)\n", h.ident, err, err, err) + SilentError("HTTP-request", "stream %s Request error: %s (%v,%+v)", h.ident, err, err, err) continue } } else { @@ -109,7 +106,7 @@ func (h *httpReader) run(wg *sync.WaitGroup) { if err == io.EOF || err == io.ErrUnexpectedEOF { break } else if err != nil { - SilentError("HTTP-response", "stream %s Response error: %s (%v,%+v)\n", h.ident, err, err, err) + SilentError("HTTP-response", "stream %s Response error: %s (%v,%+v)", h.ident, err, err, err) continue } } @@ -117,38 +114,34 @@ func (h *httpReader) run(wg *sync.WaitGroup) { } func (h *httpReader) handleHTTP2Stream() error { - streamID, messageHTTP1, body, err := h.grpcAssembler.readMessage() + streamID, messageHTTP1, err := h.grpcAssembler.readMessage() h.messageCount++ if err != nil { return err } - var reqResPair *envoyMessageWrapper + var reqResPair *requestResponsePair switch messageHTTP1 := messageHTTP1.(type) { case http.Request: ident := fmt.Sprintf("%s->%s %s->%s %d", h.tcpID.srcIP, h.tcpID.dstIP, h.tcpID.srcPort, h.tcpID.dstPort, streamID) - reqResPair = reqResMatcher.registerRequest(ident, &messageHTTP1, h.captureTime, body, true) + reqResPair = reqResMatcher.registerRequest(ident, &messageHTTP1, h.captureTime) case http.Response: ident := fmt.Sprintf("%s->%s %s->%s %d", h.tcpID.dstIP, h.tcpID.srcIP, h.tcpID.dstPort, h.tcpID.srcPort, streamID) - reqResPair = reqResMatcher.registerResponse(ident, &messageHTTP1, h.captureTime, body, true) + reqResPair = reqResMatcher.registerResponse(ident, &messageHTTP1, h.captureTime) } if reqResPair != nil { + statsTracker.incMatchedMessages() + if h.harWriter != nil { h.harWriter.WritePair( - reqResPair.HttpBufferedTrace.Request.orig.(*http.Request), - reqResPair.HttpBufferedTrace.Request.captureTime, - reqResPair.HttpBufferedTrace.Response.orig.(*http.Response), - reqResPair.HttpBufferedTrace.Response.captureTime, - reqResPair.HttpBufferedTrace.Request.requestSenderIp, + reqResPair.Request.orig.(*http.Request), + reqResPair.Request.captureTime, + reqResPair.Response.orig.(*http.Response), + reqResPair.Response.captureTime, + &reqResPair.Request.connectionInfo, ) - } else { - jsonStr, err := json.Marshal(reqResPair) - if err != nil { - return err - } - broadcastReqResPair(jsonStr) } } @@ -165,37 +158,29 @@ func (h *httpReader) handleHTTP1ClientStream(b *bufio.Reader) error { req.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind s := len(body) if err != nil { - SilentError("HTTP-request-body", "stream %s Got body err: %s\n", h.ident, err) + SilentError("HTTP-request-body", "stream %s Got body err: %s", h.ident, err) } else if h.hexdump { - Info("Body(%d/0x%x)\n%s\n", len(body), len(body), hex.Dump(body)) + Info("Body(%d/0x%x) - %s", len(body), len(body), hex.Dump(body)) } if err := req.Body.Close(); err != nil { - SilentError("HTTP-request-body-close", "stream %s Failed to close request body: %s\n", h.ident, err) + SilentError("HTTP-request-body-close", "stream %s Failed to close request body: %s", h.ident, err) } encoding := req.Header["Content-Encoding"] - bodyStr, err := readBody(body, encoding) - if err != nil { - SilentError("HTTP-request-body-decode", "stream %s Failed to decode body: %s\n", h.ident, err) - } - Info("HTTP/%s Request: %s %s (Body:%d)\n", h.ident, req.Method, req.URL, s) + Info("HTTP/1 Request: %s %s %s (Body:%d) -> %s", h.ident, req.Method, req.URL, s, encoding) ident := fmt.Sprintf("%s->%s %s->%s %d", h.tcpID.srcIP, h.tcpID.dstIP, h.tcpID.srcPort, h.tcpID.dstPort, h.messageCount) - reqResPair := reqResMatcher.registerRequest(ident, req, h.captureTime, bodyStr, false) + reqResPair := reqResMatcher.registerRequest(ident, req, h.captureTime) if reqResPair != nil { + statsTracker.incMatchedMessages() + if h.harWriter != nil { h.harWriter.WritePair( - reqResPair.HttpBufferedTrace.Request.orig.(*http.Request), - reqResPair.HttpBufferedTrace.Request.captureTime, - reqResPair.HttpBufferedTrace.Response.orig.(*http.Response), - reqResPair.HttpBufferedTrace.Response.captureTime, - reqResPair.HttpBufferedTrace.Request.requestSenderIp, + reqResPair.Request.orig.(*http.Request), + reqResPair.Request.captureTime, + reqResPair.Response.orig.(*http.Response), + reqResPair.Response.captureTime, + &reqResPair.Request.connectionInfo, ) - } else { - jsonStr, err := json.Marshal(reqResPair) - if err != nil { - SilentError("HTTP-marshal", "stream %s Error convert request response to json: %s\n", h.ident, err) - } - broadcastReqResPair(jsonStr) } } @@ -224,13 +209,13 @@ func (h *httpReader) handleHTTP1ServerStream(b *bufio.Reader) error { res.Body = io.NopCloser(bytes.NewBuffer(body)) // rewind s := len(body) if err != nil { - SilentError("HTTP-response-body", "HTTP/%s: failed to get body(parsed len:%d): %s\n", h.ident, s, err) + SilentError("HTTP-response-body", "HTTP/%s: failed to get body(parsed len:%d): %s", h.ident, s, err) } if h.hexdump { - Info("Body(%d/0x%x)\n%s\n", len(body), len(body), hex.Dump(body)) + Info("Body(%d/0x%x) - %s", len(body), len(body), hex.Dump(body)) } if err := res.Body.Close(); err != nil { - SilentError("HTTP-response-body-close", "HTTP/%s: failed to close body(parsed len:%d): %s\n", h.ident, s, err) + SilentError("HTTP-response-body-close", "HTTP/%s: failed to close body(parsed len:%d): %s", h.ident, s, err) } sym := "," if res.ContentLength > 0 && res.ContentLength != int64(s) { @@ -241,54 +226,23 @@ func (h *httpReader) handleHTTP1ServerStream(b *bufio.Reader) error { contentType = []string{http.DetectContentType(body)} } encoding := res.Header["Content-Encoding"] - Info("HTTP/%s Response: %s URL:%s (%d%s%d%s) -> %s\n", h.ident, res.Status, req, res.ContentLength, sym, s, contentType, encoding) - bodyStr, err := readBody(body, encoding) - if err != nil { - SilentError("HTTP-response-body-decode", "stream %s Failed to decode body: %s\n", h.ident, err) - } + Info("HTTP/1 Response: %s %s URL:%s (%d%s%d%s) -> %s", h.ident, res.Status, req, res.ContentLength, sym, s, contentType, encoding) ident := fmt.Sprintf("%s->%s %s->%s %d", h.tcpID.dstIP, h.tcpID.srcIP, h.tcpID.dstPort, h.tcpID.srcPort, h.messageCount) - reqResPair := reqResMatcher.registerResponse(ident, res, h.captureTime, bodyStr, false) + reqResPair := reqResMatcher.registerResponse(ident, res, h.captureTime) if reqResPair != nil { + statsTracker.incMatchedMessages() + if h.harWriter != nil { h.harWriter.WritePair( - reqResPair.HttpBufferedTrace.Request.orig.(*http.Request), - reqResPair.HttpBufferedTrace.Request.captureTime, - reqResPair.HttpBufferedTrace.Response.orig.(*http.Response), - reqResPair.HttpBufferedTrace.Response.captureTime, - reqResPair.HttpBufferedTrace.Request.requestSenderIp, + reqResPair.Request.orig.(*http.Request), + reqResPair.Request.captureTime, + reqResPair.Response.orig.(*http.Response), + reqResPair.Response.captureTime, + &reqResPair.Request.connectionInfo, ) - } else { - jsonStr, err := json.Marshal(reqResPair) - if err != nil { - SilentError("HTTP-marshal", "stream %s Error convert request response to json: %s\n", h.ident, err) - } - broadcastReqResPair(jsonStr) } } return nil } - -func readBody(bodyBytes []byte, encoding []string) (string, error) { - var bodyBuffer io.Reader - bodyBuffer = bytes.NewBuffer(bodyBytes) - var err error - if len(encoding) > 0 && (encoding[0] == "gzip" || encoding[0] == "deflate") { - bodyBuffer, err = gzip.NewReader(bodyBuffer) - if err != nil { - SilentError("HTTP-gunzip", "Failed to gzip decode: %s\n", err) - return "", err - } - } - if _, ok := bodyBuffer.(*gzip.Reader); ok { - err = bodyBuffer.(*gzip.Reader).Close() - if err != nil { - return "", err - } - } - - buf := new(bytes.Buffer) - _, err = buf.ReadFrom(bodyBuffer) - return b64.StdEncoding.EncodeToString(buf.Bytes()), err -} diff --git a/api/pkg/tap/net_utils.go b/tap/net_utils.go similarity index 100% rename from api/pkg/tap/net_utils.go rename to tap/net_utils.go diff --git a/tap/outboundlinks.go b/tap/outboundlinks.go new file mode 100644 index 000000000..0cb60bbd9 --- /dev/null +++ b/tap/outboundlinks.go @@ -0,0 +1,29 @@ +package tap + +type OutboundLink struct { + Src string + DstIP string + DstPort int +} + +func NewOutboundLinkWriter() *OutboundLinkWriter { + return &OutboundLinkWriter{ + OutChan: make(chan *OutboundLink), + } +} + +type OutboundLinkWriter struct { + OutChan chan *OutboundLink +} + +func (olw *OutboundLinkWriter) WriteOutboundLink(src string, DstIP string, DstPort int) { + olw.OutChan <- &OutboundLink{ + Src: src, + DstIP: DstIP, + DstPort: DstPort, + } +} + +func (olw *OutboundLinkWriter) Stop() { + close(olw.OutChan) +} diff --git a/api/pkg/tap/passive_tapper.go b/tap/passive_tapper.go similarity index 70% rename from api/pkg/tap/passive_tapper.go rename to tap/passive_tapper.go index b5cacc9e8..3b53e524a 100644 --- a/api/pkg/tap/passive_tapper.go +++ b/tap/passive_tapper.go @@ -10,7 +10,6 @@ package tap import ( "encoding/hex" - "encoding/json" "flag" "fmt" "log" @@ -32,13 +31,10 @@ import ( ) const AppPortsEnvVar = "APP_PORTS" -const OutPortEnvVar = "WEB_SOCKET_PORT" const maxHTTP2DataLenEnvVar = "HTTP2_DATA_SIZE_LIMIT" -const hostModeEnvVar = "HOST_MODE" // default is 1MB, more than the max size accepted by collector and traffic-dumper const maxHTTP2DataLenDefault = 1 * 1024 * 1024 const cleanPeriod = time.Second * 10 -const outboundThrottleCacheExpiryPeriod = time.Minute * 15 var remoteOnlyOutboundPorts = []int { 80, 443 } func parseAppPorts(appPortsList string) []int { @@ -46,7 +42,7 @@ func parseAppPorts(appPortsList string) []int { for _, portStr := range strings.Split(appPortsList, ",") { parsedInt, parseError := strconv.Atoi(portStr) if parseError != nil { - fmt.Println("Provided app port ", portStr, " is not a valid number!") + log.Printf("Provided app port %v is not a valid number!", portStr) } else { ports = append(ports, parsedInt) } @@ -54,13 +50,6 @@ func parseAppPorts(appPortsList string) []int { return ports } -func parseHostAppAddresses(hostAppAddressesString string) []string { - if len(hostAppAddressesString) == 0 { - return []string{} - } - return strings.Split(hostAppAddressesString, ",") -} - var maxcount = flag.Int("c", -1, "Only grab this many packets, then exit") var decoder = flag.String("decoder", "", "Name of the decoder to use (default: guess from capture)") var statsevery = flag.Int("stats", 60, "Output statistics every N seconds") @@ -90,7 +79,6 @@ var tstype = flag.String("timestamp_type", "", "Type of timestamps to use") var promisc = flag.Bool("promisc", true, "Set promiscuous mode") var anydirection = flag.Bool("anydirection", false, "Capture http requests to other hosts") var staleTimeoutSeconds = flag.Int("staletimout", 120, "Max time in seconds to keep connections which don't transmit data") -var hostAppAddressesString = flag.String("targets", "", "Comma separated list of ip:ports to tap") var memprofile = flag.String("memprofile", "", "Write memory profile") @@ -121,24 +109,20 @@ var stats struct { overlapPackets int } -type CollectorMessage struct { - MessageType string - Ports *[]int `json:"ports,omitempty"` - Addresses *[]string `json:"addresses,omitempty"` +type TapOpts struct { + HostMode bool } var outputLevel int var errorsMap map[string]uint var errorsMapMutex sync.Mutex var nErrors uint -var appPorts []int // global -var ownIps []string //global -var hostMode bool //global -var HostAppAddresses []string //global +var ownIps []string // global +var hostMode bool // global /* minOutputLevel: Error will be printed only if outputLevel is above this value * t: key for errorsMap (counting errors) - * s, a: arguments fmt.Printf + * s, a: arguments log.Printf * Note: Too bad for perf that a... is evaluated */ func logError(minOutputLevel int, t string, s string, a ...interface{}) { @@ -149,7 +133,7 @@ func logError(minOutputLevel int, t string, s string, a ...interface{}) { errorsMapMutex.Unlock() if outputLevel >= minOutputLevel { formatStr := fmt.Sprintf("%s: %s", t, s) - fmt.Printf(formatStr, a...) + log.Printf(formatStr, a...) } } func Error(t string, s string, a ...interface{}) { @@ -160,12 +144,12 @@ func SilentError(t string, s string, a ...interface{}) { } func Info(s string, a ...interface{}) { if outputLevel >= 1 { - fmt.Printf(s, a...) + log.Printf(s, a...) } } func Debug(s string, a ...interface{}) { if outputLevel >= 2 { - fmt.Printf(s, a...) + log.Printf(s, a...) } } @@ -187,9 +171,8 @@ func inArrayString(arr []string, valueToCheck string) bool { return false } -/* - * The assembler context - */ +// Context +// The assembler context type Context struct { CaptureInfo gopacket.CaptureInfo } @@ -198,22 +181,27 @@ func (c *Context) GetCaptureInfo() gopacket.CaptureInfo { return c.CaptureInfo } -func StartPassiveTapper() <-chan *OutputChannelItem { +func StartPassiveTapper(opts *TapOpts) (<-chan *OutputChannelItem, <-chan *OutboundLink) { + hostMode = opts.HostMode + var harWriter *HarWriter if *dumpToHar { harWriter = NewHarWriter(*HarOutputDir, *harEntriesPerFile) } + outboundLinkWriter := NewOutboundLinkWriter() - go startPassiveTapper(harWriter) + go startPassiveTapper(harWriter, outboundLinkWriter) if harWriter != nil { - return harWriter.OutChan + return harWriter.OutChan, outboundLinkWriter.OutChan } - return nil + return nil, outboundLinkWriter.OutChan } -func startPassiveTapper(harWriter *HarWriter) { +func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWriter) { + log.SetFlags(log.LstdFlags | log.LUTC | log.Lshortfile) + defer util.Run()() if *debug { outputLevel = 2 @@ -226,68 +214,43 @@ func startPassiveTapper(harWriter *HarWriter) { if localhostIPs, err := getLocalhostIPs(); err != nil { // TODO: think this over - fmt.Println("Failed to get self IP addresses") - Error("Getting-Self-Address", "Error getting self ip address: %s (%v,%+v)\n", err, err, err) + log.Println("Failed to get self IP addresses") + Error("Getting-Self-Address", "Error getting self ip address: %s (%v,%+v)", err, err, err) ownIps = make([]string, 0) } else { ownIps = localhostIPs } appPortsStr := os.Getenv(AppPortsEnvVar) + var appPorts []int if appPortsStr == "" { - fmt.Println("Received empty/no APP_PORTS env var! only listening to http on port 80!") + log.Println("Received empty/no APP_PORTS env var! only listening to http on port 80!") appPorts = make([]int, 0) } else { appPorts = parseAppPorts(appPortsStr) } - tapOutputPort := os.Getenv(OutPortEnvVar) - if tapOutputPort == "" { - fmt.Println("Received empty/no WEB_SOCKET_PORT env var! falling back to port 8080") - tapOutputPort = "8080" - } + SetFilterPorts(appPorts) envVal := os.Getenv(maxHTTP2DataLenEnvVar) if envVal == "" { - fmt.Println("Received empty/no HTTP2_DATA_SIZE_LIMIT env var! falling back to", maxHTTP2DataLenDefault) + log.Println("Received empty/no HTTP2_DATA_SIZE_LIMIT env var! falling back to", maxHTTP2DataLenDefault) maxHTTP2DataLen = maxHTTP2DataLenDefault } else { if convertedInt, err := strconv.Atoi(envVal); err != nil { - fmt.Println("Received invalid HTTP2_DATA_SIZE_LIMIT env var! falling back to", maxHTTP2DataLenDefault) + log.Println("Received invalid HTTP2_DATA_SIZE_LIMIT env var! falling back to", maxHTTP2DataLenDefault) maxHTTP2DataLen = maxHTTP2DataLenDefault } else { - fmt.Println("Received HTTP2_DATA_SIZE_LIMIT env var:", maxHTTP2DataLenDefault) + log.Println("Received HTTP2_DATA_SIZE_LIMIT env var:", maxHTTP2DataLenDefault) maxHTTP2DataLen = convertedInt } } - hostMode = os.Getenv(hostModeEnvVar) == "1" - fmt.Printf("App Ports: %v\n", appPorts) - fmt.Printf("Tap output websocket port: %s\n", tapOutputPort) - - var onCollectorMessage = func(message []byte) { - var parsedMessage CollectorMessage - err := json.Unmarshal(message, &parsedMessage) - if err == nil { - - if parsedMessage.MessageType == "setPorts" { - Debug("Got message from collector. Type: %s, Ports: %v\n", parsedMessage.MessageType, parsedMessage.Ports) - appPorts = *parsedMessage.Ports - } else if parsedMessage.MessageType == "setAddresses" { - Debug("Got message from collector. Type: %s, IPs: %v\n", parsedMessage.MessageType, parsedMessage.Addresses) - HostAppAddresses = *parsedMessage.Addresses - Info("Filtering for the following addresses: %s\n", HostAppAddresses) - } - } else { - Error("Collector-Message-Parsing", "Error parsing message from collector: %s (%v,%+v)\n", err, err, err) - } - } - - go startOutputServer(tapOutputPort, onCollectorMessage) + log.Printf("App Ports: %v", gSettings.filterPorts) var handle *pcap.Handle var err error if *fname != "" { if handle, err = pcap.OpenOffline(*fname); err != nil { - log.Fatal("PCAP OpenOffline error:", err) + log.Fatalf("PCAP OpenOffline error: %v", err) } } else { // This is a little complicated because we want to allow all possible options @@ -313,15 +276,15 @@ func startPassiveTapper(harWriter *HarWriter) { } } if handle, err = inactive.Activate(); err != nil { - log.Fatal("PCAP Activate error:", err) + log.Fatalf("PCAP Activate error: %v", err) } defer handle.Close() } if len(flag.Args()) > 0 { bpffilter := strings.Join(flag.Args(), " ") - Info("Using BPF filter %q\n", bpffilter) + Info("Using BPF filter %q", bpffilter) if err = handle.SetBPFFilter(bpffilter); err != nil { - log.Fatal("BPF filter error:", err) + log.Fatalf("BPF filter error: %v", err) } } @@ -329,6 +292,7 @@ func startPassiveTapper(harWriter *HarWriter) { harWriter.Start() defer harWriter.Stop() } + defer outboundLinkWriter.Stop() var dec gopacket.Decoder var ok bool @@ -342,13 +306,18 @@ func startPassiveTapper(harWriter *HarWriter) { source := gopacket.NewPacketSource(handle, dec) source.Lazy = *lazy source.NoCopy = true - Info("Starting to read packets\n") + Info("Starting to read packets") count := 0 bytes := int64(0) start := time.Now() defragger := ip4defrag.NewIPv4Defragmenter() - streamFactory := &tcpStreamFactory{doHTTP: !*nohttp, harWriter: harWriter} + streamFactory := &tcpStreamFactory{ + doHTTP: !*nohttp, + harWriter: harWriter, + outbountLinkWriter: outboundLinkWriter, + + } streamPool := reassembly.NewStreamPool(streamFactory) assembler := reassembly.NewAssembler(streamPool) var assemblerMutex sync.Mutex @@ -378,7 +347,7 @@ func startPassiveTapper(harWriter *HarWriter) { errorMapLen := len(errorsMap) errorsSummery := fmt.Sprintf("%v", errorsMap) errorsMapMutex.Unlock() - fmt.Printf("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)\nErrors Summary: %s\n", + log.Printf("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v) - Errors Summary: %s", count, bytes, time.Since(start), @@ -390,8 +359,8 @@ func startPassiveTapper(harWriter *HarWriter) { // At this moment memStats := runtime.MemStats{} runtime.ReadMemStats(&memStats) - fmt.Printf( - "mem: %d, goroutines: %d, unmatched messages: %d\n", + log.Printf( + "mem: %d, goroutines: %d, unmatched messages: %d", memStats.HeapAlloc, runtime.NumGoroutine(), reqResMatcher.openMessagesMap.Count(), @@ -400,8 +369,8 @@ func startPassiveTapper(harWriter *HarWriter) { // Since the last print cleanStats := cleaner.dumpStats() appStats := statsTracker.dumpStats() - fmt.Printf( - "flushed connections %d, closed connections: %d, deleted messages: %d, matched messages: %d\n", + log.Printf( + "flushed connections %d, closed connections: %d, deleted messages: %d, matched messages: %d", cleanStats.flushed, cleanStats.closed, cleanStats.deleted, @@ -412,11 +381,11 @@ func startPassiveTapper(harWriter *HarWriter) { for packet := range source.Packets() { count++ - Debug("PACKET #%d\n", count) + Debug("PACKET #%d", count) data := packet.Data() bytes += int64(len(data)) if *hexdumppkt { - Debug("Packet content (%d/0x%x)\n%s\n", len(data), len(data), hex.Dump(data)) + Debug("Packet content (%d/0x%x) - %s", len(data), len(data), hex.Dump(data)) } // defrag the IPv4 packet if required @@ -431,18 +400,18 @@ func startPassiveTapper(harWriter *HarWriter) { if err != nil { log.Fatalln("Error while de-fragmenting", err) } else if newip4 == nil { - Debug("Fragment...\n") + Debug("Fragment...") continue // packet fragment, we don't have whole packet yet. } if newip4.Length != l { stats.ipdefrag++ - Debug("Decoding re-assembled packet: %s\n", newip4.NextLayerType()) + Debug("Decoding re-assembled packet: %s", newip4.NextLayerType()) pb, ok := packet.(gopacket.PacketBuilder) if !ok { - panic("Not a PacketBuilder") + log.Panic("Not a PacketBuilder") } nextDecoder := newip4.NextLayerType() - nextDecoder.Decode(newip4.Payload, pb) + _ = nextDecoder.Decode(newip4.Payload, pb) } } @@ -459,7 +428,7 @@ func startPassiveTapper(harWriter *HarWriter) { CaptureInfo: packet.Metadata().CaptureInfo, } stats.totalsz += len(tcp.Payload) - //fmt.Println(packet.NetworkLayer().NetworkFlow().Src(), ":", tcp.SrcPort, " -> ", packet.NetworkLayer().NetworkFlow().Dst(), ":", tcp.DstPort) + // log.Println(packet.NetworkLayer().NetworkFlow().Src(), ":", tcp.SrcPort, " -> ", packet.NetworkLayer().NetworkFlow().Dst(), ":", tcp.DstPort) assemblerMutex.Lock() assembler.AssembleWithContext(packet.NetworkLayer().NetworkFlow(), tcp, &c) assemblerMutex.Unlock() @@ -470,11 +439,11 @@ func startPassiveTapper(harWriter *HarWriter) { errorsMapMutex.Lock() errorMapLen := len(errorsMap) errorsMapMutex.Unlock() - fmt.Fprintf(os.Stderr, "Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)\n", count, bytes, time.Since(start), nErrors, errorMapLen) + log.Printf("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)", count, bytes, time.Since(start), nErrors, errorMapLen) } select { case <-signalChan: - fmt.Fprintf(os.Stderr, "\nCaught SIGINT: aborting\n") + log.Printf("Caught SIGINT: aborting") done = true default: // NOP: continue @@ -497,34 +466,34 @@ func startPassiveTapper(harWriter *HarWriter) { if err != nil { log.Fatal(err) } - pprof.WriteHeapProfile(f) - f.Close() + _ = pprof.WriteHeapProfile(f) + _ = f.Close() } streamFactory.WaitGoRoutines() assemblerMutex.Lock() - Debug("%s\n", assembler.Dump()) + Debug("%s", assembler.Dump()) assemblerMutex.Unlock() if !*nodefrag { - fmt.Printf("IPdefrag:\t\t%d\n", stats.ipdefrag) + log.Printf("IPdefrag:\t\t%d", stats.ipdefrag) } - fmt.Printf("TCP stats:\n") - fmt.Printf(" missed bytes:\t\t%d\n", stats.missedBytes) - fmt.Printf(" total packets:\t\t%d\n", stats.pkt) - fmt.Printf(" rejected FSM:\t\t%d\n", stats.rejectFsm) - fmt.Printf(" rejected Options:\t%d\n", stats.rejectOpt) - fmt.Printf(" reassembled bytes:\t%d\n", stats.sz) - fmt.Printf(" total TCP bytes:\t%d\n", stats.totalsz) - fmt.Printf(" conn rejected FSM:\t%d\n", stats.rejectConnFsm) - fmt.Printf(" reassembled chunks:\t%d\n", stats.reassembled) - fmt.Printf(" out-of-order packets:\t%d\n", stats.outOfOrderPackets) - fmt.Printf(" out-of-order bytes:\t%d\n", stats.outOfOrderBytes) - fmt.Printf(" biggest-chunk packets:\t%d\n", stats.biggestChunkPackets) - fmt.Printf(" biggest-chunk bytes:\t%d\n", stats.biggestChunkBytes) - fmt.Printf(" overlap packets:\t%d\n", stats.overlapPackets) - fmt.Printf(" overlap bytes:\t\t%d\n", stats.overlapBytes) - fmt.Printf("Errors: %d\n", nErrors) + log.Printf("TCP stats:") + log.Printf(" missed bytes:\t\t%d", stats.missedBytes) + log.Printf(" total packets:\t\t%d", stats.pkt) + log.Printf(" rejected FSM:\t\t%d", stats.rejectFsm) + log.Printf(" rejected Options:\t%d", stats.rejectOpt) + log.Printf(" reassembled bytes:\t%d", stats.sz) + log.Printf(" total TCP bytes:\t%d", stats.totalsz) + log.Printf(" conn rejected FSM:\t%d", stats.rejectConnFsm) + log.Printf(" reassembled chunks:\t%d", stats.reassembled) + log.Printf(" out-of-order packets:\t%d", stats.outOfOrderPackets) + log.Printf(" out-of-order bytes:\t%d", stats.outOfOrderBytes) + log.Printf(" biggest-chunk packets:\t%d", stats.biggestChunkPackets) + log.Printf(" biggest-chunk bytes:\t%d", stats.biggestChunkBytes) + log.Printf(" overlap packets:\t%d", stats.overlapPackets) + log.Printf(" overlap bytes:\t\t%d", stats.overlapBytes) + log.Printf("Errors: %d", nErrors) for e := range errorsMap { - fmt.Printf(" %s:\t\t%d\n", e, errorsMap[e]) + log.Printf(" %s:\t\t%d", e, errorsMap[e]) } } diff --git a/tap/settings.go b/tap/settings.go new file mode 100644 index 000000000..cf89dd345 --- /dev/null +++ b/tap/settings.go @@ -0,0 +1,31 @@ +package tap + +type globalSettings struct { + filterPorts []int + filterAuthorities []string +} + +var gSettings = &globalSettings{ + filterPorts: []int{}, + filterAuthorities: []string{}, +} + +func SetFilterPorts(ports []int) { + gSettings.filterPorts = ports +} + +func GetFilterPorts() []int { + ports := make([]int, len(gSettings.filterPorts)) + copy(ports, gSettings.filterPorts) + return ports +} + +func SetFilterAuthorities(ipAddresses []string) { + gSettings.filterAuthorities = ipAddresses +} + +func GetFilterIPs() []string { + addresses := make([]string, len(gSettings.filterAuthorities)) + copy(addresses, gSettings.filterAuthorities) + return addresses +} diff --git a/api/pkg/tap/stats_tracker.go b/tap/stats_tracker.go similarity index 100% rename from api/pkg/tap/stats_tracker.go rename to tap/stats_tracker.go diff --git a/api/pkg/tap/tcp_stream.go b/tap/tcp_stream.go similarity index 83% rename from api/pkg/tap/tcp_stream.go rename to tap/tcp_stream.go index 7e96e9ed5..db5fb59ee 100644 --- a/api/pkg/tap/tcp_stream.go +++ b/tap/tcp_stream.go @@ -34,7 +34,7 @@ type tcpStream struct { func (t *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassembly.TCPFlowDirection, nextSeq reassembly.Sequence, start *bool, ac reassembly.AssemblerContext) bool { // FSM if !t.tcpstate.CheckState(tcp, dir) { - //SilentError("FSM", "%s: Packet rejected by FSM (state:%s)\n", t.ident, t.tcpstate.String()) + SilentError("FSM-rejection", "%s: Packet rejected by FSM (state:%s)", t.ident, t.tcpstate.String()) stats.rejectFsm++ if !t.fsmerr { t.fsmerr = true @@ -47,7 +47,7 @@ func (t *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassem // Options err := t.optchecker.Accept(tcp, ci, dir, nextSeq, start) if err != nil { - //SilentError("OptionChecker", "%s: Packet rejected by OptionChecker: %s\n", t.ident, err) + SilentError("OptionChecker-rejection", "%s: Packet rejected by OptionChecker: %s", t.ident, err) stats.rejectOpt++ if !*nooptcheck { return false @@ -58,10 +58,10 @@ func (t *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassem if *checksum { c, err := tcp.ComputeChecksum() if err != nil { - SilentError("ChecksumCompute", "%s: Got error computing checksum: %s\n", t.ident, err) + SilentError("ChecksumCompute", "%s: Got error computing checksum: %s", t.ident, err) accept = false } else if c != 0x0 { - SilentError("Checksum", "%s: Invalid checksum: 0x%x\n", t.ident, c) + SilentError("Checksum", "%s: Invalid checksum: 0x%x", t.ident, c) accept = false } } @@ -95,7 +95,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass if sgStats.OverlapBytes != 0 && sgStats.OverlapPackets == 0 { // In the original example this was handled with panic(). // I don't know what this error means or how to handle it properly. - SilentError("Invalid-Overlap", "bytes:%d, pkts:%d\n", sgStats.OverlapBytes, sgStats.OverlapPackets) + SilentError("Invalid-Overlap", "bytes:%d, pkts:%d", sgStats.OverlapBytes, sgStats.OverlapPackets) } stats.overlapBytes += sgStats.OverlapBytes stats.overlapPackets += sgStats.OverlapPackets @@ -106,7 +106,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass } else { ident = fmt.Sprintf("%v %v(%s): ", t.net.Reverse(), t.transport.Reverse(), dir) } - Debug("%s: SG reassembled packet with %d bytes (start:%v,end:%v,skip:%d,saved:%d,nb:%d,%d,overlap:%d,%d)\n", ident, length, start, end, skip, saved, sgStats.Packets, sgStats.Chunks, sgStats.OverlapBytes, sgStats.OverlapPackets) + Debug("%s: SG reassembled packet with %d bytes (start:%v,end:%v,skip:%d,saved:%d,nb:%d,%d,overlap:%d,%d)", ident, length, start, end, skip, saved, sgStats.Packets, sgStats.Chunks, sgStats.OverlapBytes, sgStats.OverlapPackets) if skip == -1 && *allowmissinginit { // this is allowed } else if skip != 0 { @@ -125,18 +125,18 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass } dnsSize := binary.BigEndian.Uint16(data[:2]) missing := int(dnsSize) - len(data[2:]) - Debug("dnsSize: %d, missing: %d\n", dnsSize, missing) + Debug("dnsSize: %d, missing: %d", dnsSize, missing) if missing > 0 { - Info("Missing some bytes: %d\n", missing) + Info("Missing some bytes: %d", missing) sg.KeepFrom(0) return } p := gopacket.NewDecodingLayerParser(layers.LayerTypeDNS, dns) err := p.DecodeLayers(data[2:], &decoded) if err != nil { - SilentError("DNS-parser", "Failed to decode DNS: %v\n", err) + SilentError("DNS-parser", "Failed to decode DNS: %v", err) } else { - Debug("DNS: %s\n", gopacket.LayerDump(dns)) + Debug("DNS: %s", gopacket.LayerDump(dns)) } if len(data) > 2+int(dnsSize) { sg.KeepFrom(2 + int(dnsSize)) @@ -144,7 +144,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass } else if t.isHTTP { if length > 0 { if *hexdump { - Debug("Feeding http with:\n%s", hex.Dump(data)) + Debug("Feeding http with:%s", hex.Dump(data)) } // This is where we pass the reassembled information onwards // This channel is read by an httpReader object @@ -158,7 +158,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass } func (t *tcpStream) ReassemblyComplete(ac reassembly.AssemblerContext) bool { - Debug("%s: Connection closed\n", t.ident) + Debug("%s: Connection closed", t.ident) if t.isHTTP { close(t.client.msgQueue) close(t.server.msgQueue) diff --git a/api/pkg/tap/tcp_stream_factory.go b/tap/tcp_stream_factory.go similarity index 82% rename from api/pkg/tap/tcp_stream_factory.go rename to tap/tcp_stream_factory.go index 23bda51bd..6808cc13e 100644 --- a/api/pkg/tap/tcp_stream_factory.go +++ b/tap/tcp_stream_factory.go @@ -15,22 +15,23 @@ import ( * Generates a new tcp stream for each new tcp connection. Closes the stream when the connection closes. */ type tcpStreamFactory struct { - wg sync.WaitGroup - doHTTP bool - harWriter *HarWriter + wg sync.WaitGroup + doHTTP bool + harWriter *HarWriter + outbountLinkWriter *OutboundLinkWriter } func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.TCP, ac reassembly.AssemblerContext) reassembly.Stream { - Debug("* NEW: %s %s\n", net, transport) + Debug("* NEW: %s %s", net, transport) fsmOptions := reassembly.TCPSimpleFSMOptions{ SupportMissingEstablishment: *allowmissinginit, } - Debug("Current App Ports: %v\n", appPorts) + Debug("Current App Ports: %v", gSettings.filterPorts) dstIp := net.Dst().String() dstPort := int(tcp.DstPort) if factory.shouldNotifyOnOutboundLink(dstIp, dstPort) { - broadcastOutboundLink(net.Src().String(), dstIp, dstPort) + factory.outbountLinkWriter.WriteOutboundLink(net.Src().String(), dstIp, dstPort) } isHTTP := factory.shouldTap(dstIp, dstPort) stream := &tcpStream{ @@ -85,14 +86,14 @@ func (factory *tcpStreamFactory) WaitGoRoutines() { func (factory *tcpStreamFactory) shouldTap(dstIP string, dstPort int) bool { if hostMode { - if inArrayString(HostAppAddresses, fmt.Sprintf("%s:%d", dstIP, dstPort)) == true { + if inArrayString(gSettings.filterAuthorities, fmt.Sprintf("%s:%d", dstIP, dstPort)) == true { return true - } else if inArrayString(HostAppAddresses, dstIP) == true { + } else if inArrayString(gSettings.filterAuthorities, dstIP) == true { return true } return false } else { - isTappedPort := dstPort == 80 || (appPorts != nil && (inArrayInt(appPorts, dstPort))) + isTappedPort := dstPort == 80 || (gSettings.filterPorts != nil && (inArrayInt(gSettings.filterPorts, dstPort))) if !isTappedPort { return false } diff --git a/ui/src/components/StatusBar.tsx b/ui/src/components/StatusBar.tsx index ee09369c5..0fb25b1e4 100644 --- a/ui/src/components/StatusBar.tsx +++ b/ui/src/components/StatusBar.tsx @@ -1,5 +1,5 @@ import './style/StatusBar.sass'; -import React from "react"; +import React, {useState} from "react"; export interface TappingStatusPod { name: string; @@ -15,14 +15,31 @@ export interface Props { } const pluralize = (noun: string, amount: number) => { - return `${noun}${amount != 1 ? 's' : ''}` + return `${noun}${amount !== 1 ? 's' : ''}` } export const StatusBar: React.FC = ({tappingStatus}) => { + + const [expandedBar, setExpandedBar] = useState(false); + const uniqueNamespaces = Array.from(new Set(tappingStatus.pods.map(pod => pod.namespace))); const amountOfPods = tappingStatus.pods.length; - return
- {`Tapping ${amountOfPods} ${pluralize('pod', amountOfPods)} in ${pluralize('namespace', uniqueNamespaces.length)} ${uniqueNamespaces.join(", ")}`} + return
setExpandedBar(true)} onMouseLeave={() => setExpandedBar(false)}> +
{`Tapping ${amountOfPods} ${pluralize('pod', amountOfPods)} in ${pluralize('namespace', uniqueNamespaces.length)} ${uniqueNamespaces.join(", ")}`}
+ {expandedBar &&
+ + + + + + + {tappingStatus.pods.map(pod => + + + )} + +
Pod nameNamespace
{pod.name}{pod.namespace}
+
}
; } diff --git a/ui/src/components/style/StatusBar.sass b/ui/src/components/style/StatusBar.sass index 470166b25..671975e8f 100644 --- a/ui/src/components/style/StatusBar.sass +++ b/ui/src/components/style/StatusBar.sass @@ -1,20 +1,35 @@ @import 'variables.module.scss' -.StatusBar +.statusBar position: absolute transform: translate(-50%, -3px) left: 50% z-index: 9999 min-width: 200px - height: 32px background: $blue-color - color: $light-blue-color + color: rgba(255,255,255,0.75) border-bottom-left-radius: 8px border-bottom-right-radius: 8px top: 0 - display: flex - align-items: center padding: 2px 10px - user-select: none font-size: 14px - opacity: 0.8 + transition: max-height 2s ease-out + width: auto + max-height: 32px + overflow: hidden + + .podsCount + display: flex + justify-content: center + padding: 8px + font-weight: 600 + + th + text-align: left + td + padding-right: 15px + padding-top: 5px + +.expandedStatusBar + max-height: 100vh + padding-bottom: 15px \ No newline at end of file