Mirror of https://github.com/kubeshark/kubeshark.git (synced 2025-06-23 14:58:44 +00:00)

Commit d1921d61c9

Merge branch 'develop'

Conflicts:
    api/pkg/controllers/entries_controller.go
    api/pkg/inserter/main.go
    api/pkg/models/models.go
    cli/Makefile

.github/workflows/publish-cli.yml (new file, vendored, 19 lines added)

@@ -0,0 +1,19 @@
+name: public-cli
+on:
+  push:
+    branches:
+      - 'develop'
+      - 'main'
+jobs:
+  docker:
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout
+        uses: actions/checkout@v2
+      - name: Set up Cloud SDK
+        uses: google-github-actions/setup-gcloud@master
+        with:
+          service_account_key: ${{ secrets.GCR_JSON_KEY }}
+          export_default_credentials: true
+      - name: Build and Push CLI
+        run: make push-cli

@@ -1,4 +1,4 @@
-name: ci
+name: publish-docker
 on:
   push:
     branches:

@@ -23,8 +23,8 @@ jobs:
             type=sha
             type=raw,${{ github.sha }}
             type=raw,latest
       - name: Login to DockerHub
         uses: docker/login-action@v1
         with:
           registry: gcr.io
           username: _json_key

@@ -35,4 +35,5 @@ jobs:
           context: .
           push: true
           tags: ${{ steps.meta.outputs.tags }}
           labels: ${{ steps.meta.outputs.labels }}

Makefile (30 lines changed)

@@ -17,30 +17,43 @@ help: ## This help.

 # Variables and lists
 TS_SUFFIX="$(shell date '+%s')"
+GIT_BRANCH="$(shell git branch | grep \* | cut -d ' ' -f2 | tr '[:upper:]' '[:lower:]' | tr '/' '_')"
+BUCKET_PATH=static.up9.io/mizu/$(GIT_BRANCH)

 ui: ## build UI
 	@(cd ui; npm i ; npm run build; )
 	@ls -l ui/build

 cli: # build CLI
-	@(cd cli; echo "building cli" )
+	@echo "building cli"; cd cli && $(MAKE) build

 api: ## build API server
 	@(echo "building API server .." )
 	@(cd api; go build -o build/apiserver main.go)
 	@ls -l api/build

-tap: ## build tap binary
-	@(cd tap; go build -o build/tap ./src)
-	@ls -l tap/build
+#tap: ## build tap binary
+# @(cd tap; go build -o build/tap ./src)
+# @ls -l tap/build

 docker: ## build Docker image
 	@(echo "building docker image" )
+	docker build -t up9inc/mizu:latest .
+	#./build-push-featurebranch.sh
+
+push: push-docker push-cli ## build and publish Mizu docker image & CLI
+
+push-docker:
+	@echo "publishing Docker image .. "
 	./build-push-featurebranch.sh

-publish: ## build and publish Mizu docker image & CLI
-	@echo "publishing Docker image .. "
+push-cli:
 	@echo "publishing CLI .. "
+	@cd cli; $(MAKE) build-all
+	@echo "publishing file ${OUTPUT_FILE} .."
+	#gsutil mv gs://${BUCKET_PATH}/${OUTPUT_FILE} gs://${BUCKET_PATH}/${OUTPUT_FILE}.${SUFFIX}
+	gsutil cp -r ./cli/bin/* gs://${BUCKET_PATH}/
+	gsutil setmeta -r -h "Cache-Control:public, max-age=30" gs://${BUCKET_PATH}/\*


 clean: clean-ui clean-api clean-cli clean-docker ## Clean all build artifacts

@@ -52,10 +65,7 @@ clean-api:
 	@(rm -rf api/build ; echo "api cleanup done" )

 clean-cli:
-	@(echo "CLI cleanup - NOT IMPLEMENTED YET " )
+	@(cd cli; make clean ; echo "CLI cleanup done" )

-clean-tap:
-	@(cd tap; rm -rf build ; echo "tap cleanup done")
-
 clean-docker:
 	@(echo "DOCKER cleanup - NOT IMPLEMENTED YET " )

@@ -2,7 +2,6 @@ package main

 import (
 	"flag"
-
 	"github.com/gofiber/fiber/v2"
 	"mizuserver/pkg/inserter"
 	"mizuserver/pkg/middleware"

@@ -19,8 +18,7 @@ func main() {
 	app := fiber.New()

 	// process to read files / channel and insert to DB
-	go inserter.StartReadingFiles(harOutputChannel, tap.HarOutputDir)
-
+	go inserter.StartReadingEntries(harOutputChannel, tap.HarOutputDir)

 	middleware.FiberMiddleware(app) // Register Fiber's middleware for app.
 	app.Static("/", "./site")

@@ -29,7 +29,6 @@ var (
 	}
 )

-
 func GetEntries(c *fiber.Ctx) error {
 	entriesFilter := &models.EntriesFilter{}

@@ -11,6 +11,7 @@ import (
 	"mizuserver/pkg/database"
 	"mizuserver/pkg/models"
 	"mizuserver/pkg/resolver"
+	"mizuserver/pkg/tap"
 	"mizuserver/pkg/utils"
 	"net/url"
 	"os"

@@ -42,7 +43,7 @@ func init() {
 	k8sResolver = res
 }

-func StartReadingFiles(harChannel chan *har.Entry, workingDir *string) {
+func StartReadingEntries(harChannel chan *tap.OutputChannelItem, workingDir *string) {
 	if workingDir != nil && *workingDir != "" {
 		startReadingFiles(*workingDir)
 	} else {

@@ -59,7 +60,7 @@ func startReadingFiles(workingDir string) {
 		dirFiles, _ := dir.Readdir(-1)
 		sort.Sort(utils.ByModTime(dirFiles))

-		if len(dirFiles) == 0{
+		if len(dirFiles) == 0 {
 			fmt.Printf("Waiting for new files\n")
 			time.Sleep(3 * time.Second)
 			continue

@@ -75,20 +76,20 @@ func startReadingFiles(workingDir string) {

 		for _, entry := range inputHar.Log.Entries {
 			time.Sleep(time.Millisecond * 250)
-			saveHarToDb(*entry, "")
+			saveHarToDb(entry, fileInfo.Name())
 		}
 		rmErr := os.Remove(inputFilePath)
 		utils.CheckErr(rmErr)
 	}
 }

-func startReadingChannel(harChannel chan *har.Entry) {
-	for entry := range harChannel {
-		saveHarToDb(*entry, "")
+func startReadingChannel(outputItems chan *tap.OutputChannelItem) {
+	for item := range outputItems {
+		saveHarToDb(item.HarEntry, item.RequestSenderIp)
 	}
 }

-func saveHarToDb(entry har.Entry, source string) {
+func saveHarToDb(entry *har.Entry, sender string) {
 	entryBytes, _ := json.Marshal(entry)
 	serviceName, urlPath, serviceHostName := getServiceNameFromUrl(entry.Request.URL)
 	entryId := primitive.NewObjectID().Hex()

@@ -101,15 +102,15 @@ func saveHarToDb(entry har.Entry, source string) {
 		resolvedDestination = k8sResolver.Resolve(serviceHostName)
 	}
 	mizuEntry := models.MizuEntry{
 		EntryId: entryId,
 		Entry: string(entryBytes), // simple way to store it and not convert to bytes
 		Service: serviceName,
 		Url: entry.Request.URL,
 		Path: urlPath,
 		Method: entry.Request.Method,
 		Status: entry.Response.Status,
-		Source: source,
+		RequestSenderIp: sender,
 		Timestamp: entry.StartedDateTime.UnixNano() / int64(time.Millisecond),
 		ResolvedSource: resolvedSource,
 		ResolvedDestination: resolvedDestination,
 	}

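The inserter now receives tap.OutputChannelItem values instead of bare HAR entries, so the request sender IP travels together with each entry from the tapper to the database layer. Below is a minimal, self-contained sketch of that consumer contract; the consume helper and the sample values are illustrative, not code from the repository:

package main

import (
	"fmt"

	"github.com/google/martian/har"
)

// OutputChannelItem mirrors the struct the tap package now emits (see the HarWriter diff below).
type OutputChannelItem struct {
	HarEntry        *har.Entry
	RequestSenderIp string
}

// consume is a stand-in for startReadingChannel: it drains the channel and would hand
// each HAR entry plus its sender IP to something like saveHarToDb.
func consume(items chan *OutputChannelItem) {
	for item := range items {
		fmt.Println(item.HarEntry.Request.URL, item.RequestSenderIp)
	}
}

func main() {
	items := make(chan *OutputChannelItem, 1)
	items <- &OutputChannelItem{
		HarEntry:        &har.Entry{Request: &har.Request{Method: "GET", URL: "http://example.com/"}},
		RequestSenderIp: "10.0.0.7",
	}
	close(items)
	consume(items)
}
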
@@ -3,30 +3,31 @@ package models
 import "time"

 type MizuEntry struct {
 	ID uint `gorm:"primarykey"`
 	CreatedAt time.Time
 	UpdatedAt time.Time
 	Entry string `json:"entry,omitempty" gorm:"column:entry"`
 	EntryId string `json:"entryId" gorm:"column:entryId"`
 	Url string `json:"url" gorm:"column:url"`
 	Method string `json:"method" gorm:"column:method"`
 	Status int `json:"status" gorm:"column:status"`
-	Source string `json:"source" gorm:"column:source"`
+	RequestSenderIp string `json:"requestSenderIp" gorm:"column:requestSenderIp"`
 	Service string `json:"service" gorm:"column:service"`
 	Timestamp int64 `json:"timestamp" gorm:"column:timestamp"`
 	Path string `json:"path" gorm:"column:path"`
 	ResolvedSource *string `json:"resolvedSource,omitempty" gorm:"column:resolvedSource"`
 	ResolvedDestination *string `json:"resolvedDestination,omitempty" gorm:"column:resolvedDestination"`
 }

 type BaseEntryDetails struct {
 	Id string `json:"id,omitempty"`
 	Url string `json:"url,omitempty"`
+	RequestSenderIp string `json:"requestSenderIp,omitempty"`
 	Service string `json:"service,omitempty"`
 	Path string `json:"path,omitempty"`
 	StatusCode int `json:"statusCode,omitempty"`
 	Method string `json:"method,omitempty"`
 	Timestamp int64 `json:"timestamp,omitempty"`
 }

 type EntryData struct {

@@ -35,7 +36,7 @@ type EntryData struct {
 }

 type EntriesFilter struct {
 	Limit int `query:"limit" validate:"required,min=1,max=200"`
 	Operator string `query:"operator" validate:"required,oneof='lt' 'gt'"`
 	Timestamp int64 `query:"timestamp" validate:"required,min=1"`
 }

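MizuEntry above is a GORM-tagged model; the database package that actually persists it is not part of this diff. As a rough, hypothetical illustration of what an insert against that schema could look like with GORM v2, where the db handle, the trimmed field set, and the saveEntry helper are all assumptions:

package example

import "gorm.io/gorm"

// MizuEntry is a trimmed mirror of the model above, kept only to make the sketch self-contained.
type MizuEntry struct {
	ID              uint   `gorm:"primarykey"`
	EntryId         string `json:"entryId" gorm:"column:entryId"`
	Url             string `json:"url" gorm:"column:url"`
	RequestSenderIp string `json:"requestSenderIp" gorm:"column:requestSenderIp"`
}

// saveEntry is a hypothetical helper, not the repository's database code.
func saveEntry(db *gorm.DB, entry *MizuEntry) error {
	// AutoMigrate keeps the table in sync with the struct's gorm tags,
	// including the new requestSenderIp column.
	if err := db.AutoMigrate(&MizuEntry{}); err != nil {
		return err
	}
	// Create inserts one row; column names come from the gorm:"column:..." tags.
	return db.Create(entry).Error
}
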
@@ -64,7 +64,7 @@ func (fbs *fragmentsByStream) appendFrame(streamID uint32, frame http2.Frame) {
 func (fbs *fragmentsByStream) pop(streamID uint32) ([]hpack.HeaderField, []byte) {
 	headers := (*fbs)[streamID].headers
 	data := (*fbs)[streamID].data
-	delete((*fbs), streamID)
+	delete(*fbs, streamID)

 	return headers, data
 }

@@ -193,7 +193,7 @@ func checkIsHTTP2ServerStream(b *bufio.Reader) (bool, error) {

 	// Check server connection preface (a settings frame)
 	frameHeader := http2.FrameHeader{
-		Length: (uint32(buf[0])<<16 | uint32(buf[1])<<8 | uint32(buf[2])),
+		Length: uint32(buf[0])<<16 | uint32(buf[1])<<8 | uint32(buf[2]),
 		Type: http2.FrameType(buf[3]),
 		Flags: http2.Flags(buf[4]),
 		StreamID: binary.BigEndian.Uint32(buf[5:]) & (1<<31 - 1),

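The second hunk above hand-decodes the 9-byte HTTP/2 frame header that follows the connection preface: a 24-bit payload length, a type byte, a flags byte, and a 31-bit stream ID whose reserved high bit is masked off (RFC 7540, section 4.1). A self-contained sketch of the same decoding; the sample bytes are made up and describe an empty SETTINGS frame:

package main

import (
	"encoding/binary"
	"fmt"

	"golang.org/x/net/http2"
)

// decodeFrameHeader mirrors the manual parse in checkIsHTTP2ServerStream.
// buf must hold the 9 header bytes that precede an HTTP/2 frame payload.
func decodeFrameHeader(buf []byte) http2.FrameHeader {
	return http2.FrameHeader{
		Length:   uint32(buf[0])<<16 | uint32(buf[1])<<8 | uint32(buf[2]), // 24-bit payload length
		Type:     http2.FrameType(buf[3]),                                 // 0x4 is SETTINGS
		Flags:    http2.Flags(buf[4]),
		StreamID: binary.BigEndian.Uint32(buf[5:]) & (1<<31 - 1), // clear the reserved bit
	}
}

func main() {
	// Length 0, type SETTINGS (0x4), no flags, stream 0: the server connection preface.
	hdr := decodeFrameHeader([]byte{0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x00, 0x00})
	fmt.Printf("%+v\n", hdr)
}
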
@@ -18,10 +18,11 @@ const readPermission = 0644
 const tempFilenamePrefix = "har_writer"

 type PairChanItem struct {
 	Request *http.Request
 	RequestTime time.Time
 	Response *http.Response
 	ResponseTime time.Time
+	RequestSenderIp string
 }

 func openNewHarFile(filename string) *HarFile {

@@ -153,27 +154,33 @@ func NewHarWriter(outputDir string, maxEntries int) *HarWriter {
 		OutputDirPath: outputDir,
 		MaxEntries: maxEntries,
 		PairChan: make(chan *PairChanItem),
-		OutChan: make(chan *har.Entry, 1000),
+		OutChan: make(chan *OutputChannelItem, 1000),
 		currentFile: nil,
 		done: make(chan bool),
 	}
 }

+type OutputChannelItem struct {
+	HarEntry *har.Entry
+	RequestSenderIp string
+}
+
 type HarWriter struct {
 	OutputDirPath string
 	MaxEntries int
 	PairChan chan *PairChanItem
-	OutChan chan *har.Entry
+	OutChan chan *OutputChannelItem
 	currentFile *HarFile
 	done chan bool
 }

-func (hw *HarWriter) WritePair(request *http.Request, requestTime time.Time, response *http.Response, responseTime time.Time) {
+func (hw *HarWriter) WritePair(request *http.Request, requestTime time.Time, response *http.Response, responseTime time.Time, requestSenderIp string) {
 	hw.PairChan <- &PairChanItem{
 		Request: request,
 		RequestTime: requestTime,
 		Response: response,
 		ResponseTime: responseTime,
+		RequestSenderIp: requestSenderIp,
 	}
 }

@@ -202,7 +209,10 @@ func (hw *HarWriter) Start() {
 				hw.closeFile()
 			}
 		} else {
-			hw.OutChan <- harEntry
+			hw.OutChan <- &OutputChannelItem{
+				HarEntry: harEntry,
+				RequestSenderIp: pair.RequestSenderIp,
+			}
 		}
 	}

@@ -229,7 +239,10 @@ func (hw *HarWriter) closeFile() {
 	hw.currentFile = nil

 	filename := buildFilename(hw.OutputDirPath, time.Now())
-	os.Rename(tmpFilename, filename)
+	err := os.Rename(tmpFilename, filename)
+	if err != nil {
+		SilentError("Rename-file", "cannot rename file: %s (%v,%+v)\n", err, err, err)
+	}
 }

 func buildFilename(dir string, t time.Time) string {

@@ -30,14 +30,16 @@ type messageBody struct {
 }

 type httpMessage struct {
 	IsRequest bool
 	Headers []headerKeyVal `json:"headers"`
 	HTTPVersion string `json:"httpVersion"`
 	Body messageBody `json:"body"`
 	captureTime time.Time
 	orig interface {}
+	requestSenderIp string
 }

+
 // Key is {client_addr}:{client_port}->{dest_addr}:{dest_port}
 type requestResponseMatcher struct {
 	openMessagesMap cmap.ConcurrentMap

@@ -58,7 +60,7 @@ func (matcher *requestResponseMatcher) registerRequest(ident string, request *ht
 		{Key: "x-up9-destination", Value: split[1] + ":" + split[3]},
 	}

-	requestHTTPMessage := requestToMessage(request, captureTime, body, &messageExtraHeaders, isHTTP2)
+	requestHTTPMessage := requestToMessage(request, captureTime, body, &messageExtraHeaders, isHTTP2, split[0])

 	if response, found := matcher.openMessagesMap.Pop(key); found {
 		// Type assertion always succeeds because all of the map's values are of httpMessage type

@@ -109,7 +111,7 @@ func (matcher *requestResponseMatcher) preparePair(requestHTTPMessage *httpMessa
 	}
 }

-func requestToMessage(request *http.Request, captureTime time.Time, body string, messageExtraHeaders *[]headerKeyVal, isHTTP2 bool) httpMessage {
+func requestToMessage(request *http.Request, captureTime time.Time, body string, messageExtraHeaders *[]headerKeyVal, isHTTP2 bool, requestSenderIp string) httpMessage {
 	messageHeaders := make([]headerKeyVal, 0)

 	for key, value := range request.Header {

@@ -132,12 +134,13 @@ func requestToMessage(request *http.Request, captureTime time.Time, body string,
 	requestBody := messageBody{Truncated: false, AsBytes: body}

 	return httpMessage{
 		IsRequest: true,
 		Headers: messageHeaders,
 		HTTPVersion: httpVersion,
 		Body: requestBody,
 		captureTime: captureTime,
 		orig: request,
+		requestSenderIp: requestSenderIp,
 	}
 }

|
|||||||
/* httpReader gets reads from a channel of bytes of tcp payload, and parses it into HTTP/1 requests and responses.
|
/* httpReader gets reads from a channel of bytes of tcp payload, and parses it into HTTP/1 requests and responses.
|
||||||
* The payload is written to the channel by a tcpStream object that is dedicated to one tcp connection.
|
* The payload is written to the channel by a tcpStream object that is dedicated to one tcp connection.
|
||||||
* An httpReader object is unidirectional: it parses either a client stream or a server stream.
|
* An httpReader object is unidirectional: it parses either a client stream or a server stream.
|
||||||
* Implemets io.Reader interface (Read)
|
* Implements io.Reader interface (Read)
|
||||||
*/
|
*/
|
||||||
type httpReader struct {
|
type httpReader struct {
|
||||||
ident string
|
ident string
|
||||||
@ -80,13 +80,16 @@ func (h *httpReader) run(wg *sync.WaitGroup) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if h.isHTTP2 {
|
if h.isHTTP2 {
|
||||||
prepareHTTP2Connection(b, h.isClient)
|
err := prepareHTTP2Connection(b, h.isClient)
|
||||||
|
if err != nil {
|
||||||
|
SilentError("HTTP/2-Prepare-Connection-After-Check", "stream %s error: %s (%v,%+v)\n", h.ident, err, err, err)
|
||||||
|
}
|
||||||
h.grpcAssembler = createGrpcAssembler(b)
|
h.grpcAssembler = createGrpcAssembler(b)
|
||||||
}
|
}
|
||||||
|
|
||||||
for true {
|
for true {
|
||||||
if h.isHTTP2 {
|
if h.isHTTP2 {
|
||||||
err := h.handleHTTP2Stream(b)
|
err := h.handleHTTP2Stream()
|
||||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||||
break
|
break
|
||||||
} else if err != nil {
|
} else if err != nil {
|
||||||
@ -113,11 +116,11 @@ func (h *httpReader) run(wg *sync.WaitGroup) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *httpReader) handleHTTP2Stream(b *bufio.Reader) error {
|
func (h *httpReader) handleHTTP2Stream() error {
|
||||||
streamID, messageHTTP1, body, error := h.grpcAssembler.readMessage()
|
streamID, messageHTTP1, body, err := h.grpcAssembler.readMessage()
|
||||||
h.messageCount++
|
h.messageCount++
|
||||||
if error != nil {
|
if err != nil {
|
||||||
return error
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
var reqResPair *envoyMessageWrapper
|
var reqResPair *envoyMessageWrapper
|
||||||
@ -138,15 +141,14 @@ func (h *httpReader) handleHTTP2Stream(b *bufio.Reader) error {
|
|||||||
reqResPair.HttpBufferedTrace.Request.captureTime,
|
reqResPair.HttpBufferedTrace.Request.captureTime,
|
||||||
reqResPair.HttpBufferedTrace.Response.orig.(*http.Response),
|
reqResPair.HttpBufferedTrace.Response.orig.(*http.Response),
|
||||||
reqResPair.HttpBufferedTrace.Response.captureTime,
|
reqResPair.HttpBufferedTrace.Response.captureTime,
|
||||||
|
reqResPair.HttpBufferedTrace.Request.requestSenderIp,
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
jsonStr, err := json.Marshal(reqResPair)
|
jsonStr, err := json.Marshal(reqResPair)
|
||||||
|
|
||||||
broadcastReqResPair(jsonStr)
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
broadcastReqResPair(jsonStr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -167,7 +169,9 @@ func (h *httpReader) handleHTTP1ClientStream(b *bufio.Reader) error {
 	} else if h.hexdump {
 		Info("Body(%d/0x%x)\n%s\n", len(body), len(body), hex.Dump(body))
 	}
-	req.Body.Close()
+	if err := req.Body.Close(); err != nil {
+		SilentError("HTTP-request-body-close", "stream %s Failed to close request body: %s\n", h.ident, err)
+	}
 	encoding := req.Header["Content-Encoding"]
 	bodyStr, err := readBody(body, encoding)
 	if err != nil {

@@ -184,15 +188,14 @@ func (h *httpReader) handleHTTP1ClientStream(b *bufio.Reader) error {
 			reqResPair.HttpBufferedTrace.Request.captureTime,
 			reqResPair.HttpBufferedTrace.Response.orig.(*http.Response),
 			reqResPair.HttpBufferedTrace.Response.captureTime,
+			reqResPair.HttpBufferedTrace.Request.requestSenderIp,
 		)
 	} else {
 		jsonStr, err := json.Marshal(reqResPair)
-
-		broadcastReqResPair(jsonStr)
-
 		if err != nil {
 			SilentError("HTTP-marshal", "stream %s Error convert request response to json: %s\n", h.ident, err)
 		}
+		broadcastReqResPair(jsonStr)
 	}
 }

@@ -226,7 +229,9 @@ func (h *httpReader) handleHTTP1ServerStream(b *bufio.Reader) error {
 	if h.hexdump {
 		Info("Body(%d/0x%x)\n%s\n", len(body), len(body), hex.Dump(body))
 	}
-	res.Body.Close()
+	if err := res.Body.Close(); err != nil {
+		SilentError("HTTP-response-body-close", "HTTP/%s: failed to close body(parsed len:%d): %s\n", h.ident, s, err)
+	}
 	sym := ","
 	if res.ContentLength > 0 && res.ContentLength != int64(s) {
 		sym = "!="

@@ -251,15 +256,14 @@ func (h *httpReader) handleHTTP1ServerStream(b *bufio.Reader) error {
 			reqResPair.HttpBufferedTrace.Request.captureTime,
 			reqResPair.HttpBufferedTrace.Response.orig.(*http.Response),
 			reqResPair.HttpBufferedTrace.Response.captureTime,
+			reqResPair.HttpBufferedTrace.Request.requestSenderIp,
 		)
 	} else {
 		jsonStr, err := json.Marshal(reqResPair)
-
-		broadcastReqResPair(jsonStr)
-
 		if err != nil {
 			SilentError("HTTP-marshal", "stream %s Error convert request response to json: %s\n", h.ident, err)
 		}
+		broadcastReqResPair(jsonStr)
 	}
 }

@@ -29,11 +29,10 @@ import (
 	"github.com/google/gopacket/layers" // pulls in all layers decoders
 	"github.com/google/gopacket/pcap"
 	"github.com/google/gopacket/reassembly"
-	"github.com/google/martian/har"
 )

 const AppPortsEnvVar = "APP_PORTS"
-const TapOutPortEnvVar = "WEB_SOCKET_PORT"
+const OutPortEnvVar = "WEB_SOCKET_PORT"
 const maxHTTP2DataLenEnvVar = "HTTP2_DATA_SIZE_LIMIT"
 const hostModeEnvVar = "HOST_MODE"
 // default is 1MB, more than the max size accepted by collector and traffic-dumper

@@ -199,7 +198,7 @@ func (c *Context) GetCaptureInfo() gopacket.CaptureInfo {
 	return c.CaptureInfo
 }

-func StartPassiveTapper() chan *har.Entry {
+func StartPassiveTapper() chan *OutputChannelItem {
 	var harWriter *HarWriter
 	if *dumpToHar {
 		harWriter = NewHarWriter(*HarOutputDir, *harEntriesPerFile)

@@ -243,7 +242,7 @@ func startPassiveTapper(harWriter *HarWriter) {
 	}
 	hostAppAddresses = parseHostAppAddresses(*hostAppAddressesString)
 	fmt.Println("Filtering for the following addresses:", hostAppAddresses)
-	tapOutputPort := os.Getenv(TapOutPortEnvVar)
+	tapOutputPort := os.Getenv(OutPortEnvVar)
 	if tapOutputPort == "" {
 		fmt.Println("Received empty/no WEB_SOCKET_PORT env var! falling back to port 8080")
 		tapOutputPort = "8080"

@@ -297,15 +296,15 @@ func startPassiveTapper(harWriter *HarWriter) {
 	// just call pcap.OpenLive if you want a simple handle.
 	inactive, err := pcap.NewInactiveHandle(*iface)
 	if err != nil {
-		log.Fatal("could not create: %v", err)
+		log.Fatalf("could not create: %v", err)
 	}
 	defer inactive.CleanUp()
 	if err = inactive.SetSnapLen(*snaplen); err != nil {
-		log.Fatal("could not set snap length: %v", err)
+		log.Fatalf("could not set snap length: %v", err)
 	} else if err = inactive.SetPromisc(*promisc); err != nil {
-		log.Fatal("could not set promisc mode: %v", err)
+		log.Fatalf("could not set promisc mode: %v", err)
 	} else if err = inactive.SetTimeout(time.Second); err != nil {
-		log.Fatal("could not set timeout: %v", err)
+		log.Fatalf("could not set timeout: %v", err)
 	}
 	if *tstype != "" {
 		if t, err := pcap.TimestampSourceFromString(*tstype); err != nil {

@@ -334,12 +333,12 @@ func startPassiveTapper(harWriter *HarWriter) {

 	var dec gopacket.Decoder
 	var ok bool
-	decoder_name := *decoder
-	if decoder_name == "" {
-		decoder_name = fmt.Sprintf("%s", handle.LinkType())
+	decoderName := *decoder
+	if decoderName == "" {
+		decoderName = fmt.Sprintf("%s", handle.LinkType())
 	}
-	if dec, ok = gopacket.DecodersByLayerName[decoder_name]; !ok {
-		log.Fatalln("No decoder named", decoder_name)
+	if dec, ok = gopacket.DecodersByLayerName[decoderName]; !ok {
+		log.Fatalln("No decoder named", decoderName)
 	}
 	source := gopacket.NewPacketSource(handle, dec)
 	source.Lazy = *lazy

@@ -526,7 +525,7 @@ func startPassiveTapper(harWriter *HarWriter) {
 		fmt.Printf(" overlap packets:\t%d\n", stats.overlapPackets)
 		fmt.Printf(" overlap bytes:\t\t%d\n", stats.overlapBytes)
 		fmt.Printf("Errors: %d\n", nErrors)
-		for e, _ := range errorsMap {
+		for e := range errorsMap {
 			fmt.Printf(" %s:\t\t%d\n", e, errorsMap[e])
 		}
 	}

@@ -78,5 +78,6 @@ func GetResolvedBaseEntry(entry models.MizuEntry) models.BaseEntryDetails {
 		StatusCode: entry.Status,
 		Method: entry.Method,
 		Timestamp: entry.Timestamp,
+		RequestSenderIp: entry.RequestSenderIp,
 	}
 }

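With the change above, GetResolvedBaseEntry copies the stored sender IP into the per-entry summary that the API serves. The following self-contained sketch shows the JSON shape this produces; the struct mirrors models.BaseEntryDetails from this diff, and every value is made up for illustration:

package main

import (
	"encoding/json"
	"fmt"
)

// BaseEntryDetails mirrors the struct from api/pkg/models shown earlier in this diff.
type BaseEntryDetails struct {
	Id              string `json:"id,omitempty"`
	Url             string `json:"url,omitempty"`
	RequestSenderIp string `json:"requestSenderIp,omitempty"`
	Service         string `json:"service,omitempty"`
	Path            string `json:"path,omitempty"`
	StatusCode      int    `json:"statusCode,omitempty"`
	Method          string `json:"method,omitempty"`
	Timestamp       int64  `json:"timestamp,omitempty"`
}

func main() {
	entry := BaseEntryDetails{
		Id:              "60ad0001",
		Url:             "http://catalogue/items",
		RequestSenderIp: "10.0.0.7",
		Service:         "catalogue",
		Path:            "/items",
		StatusCode:      200,
		Method:          "GET",
		Timestamp:       1623070000000,
	}
	out, err := json.Marshal(entry)
	if err != nil {
		panic(err)
	}
	// Prints: {"id":"60ad0001","url":"http://catalogue/items","requestSenderIp":"10.0.0.7",...}
	fmt.Println(string(out))
}
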
cli/Makefile (39 lines changed)

@@ -1,21 +1,30 @@
+FOLDER=$(GOOS).$(GOARCH)
+
+.PHONY: help
+.DEFAULT_GOAL := help
+
+help: ## This help.
+	@awk 'BEGIN {FS = ":.*?## "} /^[a-zA-Z_-]+:.*?## / {printf "\033[36m%-30s\033[0m %s\n", $$1, $$2}' $(MAKEFILE_LIST)
+
 install:
 	go install mizu.go

-build:
-	go build -o bin/mizu mizu.go
+build: ## build mizu CLI binary (select platform via GOOS / GOARCH env variables)
+	go build -o bin/$(FOLDER)/mizu mizu.go

-build-cr:
+build-all: ## build for all supported platforms
 	@echo "Compiling for every OS and Platform"
-	@echo ""
-	GOOS=darwin GOARCH=amd64 go build -o bin/mizu-darwin-amd64 mizu.go
-	GOOS=linux GOARCH=amd64 go build -o bin/mizu-linux-amd64 mizu.go
-	@#GOOS=windows GOARCH=amd64 go build -o bin/mizu-windows-amd64.exe mizu.go
-	@#GOOS=linux GOARCH=386 go build -o bin/mizu-linux-386 mizu.go
-	@#GOOS=windows GOARCH=386 go build -o bin/mizu-windows-386.exe mizu.go
-	@#GOOS=darwin GOARCH=arm64 go build -o bin/mizu-darwin-arm64 mizu.go
-	@#GOOS=linux GOARCH=arm64 go build -o bin/mizu-linux-arm64 mizu.go
-	@#GOOS=windows GOARCH=arm64 go build -o bin/mizu-windows-arm64 mizu.go
+	@$(MAKE) build GOOS=darwin GOARCH=amd64
+	@$(MAKE) build GOOS=linux GOARCH=amd64
+	@# $(MAKE) GOOS=windows GOARCH=amd64
+	@# $(MAKE) GOOS=linux GOARCH=386
+	@# $(MAKE) GOOS=windows GOARCH=386
+	@# $(MAKE) GOOS=darwin GOARCH=arm64
+	@# $(MAKE) GOOS=linux GOARCH=arm64
+	@# $(MAKE) GOOS=windows GOARCH=arm64
+	@echo "---------"
+	@find ./bin -ls

-clean:
-	#go clean
-	rm -f ./bin/*
+clean: ## clean all build artifacts
+	go clean
+	rm -rf ./bin/*