diff --git a/.gitignore b/.gitignore index cc606960f..890791ba3 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,4 @@ build # Mac OS .DS_Store +.vscode/ diff --git a/Makefile b/Makefile index bb85ff8ea..8ed6ab620 100644 --- a/Makefile +++ b/Makefile @@ -65,3 +65,6 @@ clean-cli: ## Clean CLI. clean-docker: @(echo "DOCKER cleanup - NOT IMPLEMENTED YET " ) +http: + cd extensions/http && \ + go build -buildmode=plugin -o ../http.so . diff --git a/agent/main.go b/agent/main.go index 381433832..cce894095 100644 --- a/agent/main.go +++ b/agent/main.go @@ -4,21 +4,20 @@ import ( "encoding/json" "flag" "fmt" + "mizuserver/pkg/api" + "mizuserver/pkg/models" + "mizuserver/pkg/routes" + "mizuserver/pkg/utils" + "net/http" + "os" + "os/signal" + "github.com/gin-contrib/static" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" "github.com/romana/rlog" "github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/tap" - "mizuserver/pkg/api" - "mizuserver/pkg/models" - "mizuserver/pkg/routes" - "mizuserver/pkg/sensitiveDataFiltering" - "mizuserver/pkg/utils" - "net/http" - "os" - "os/signal" - "strings" ) var tapperMode = flag.Bool("tap", false, "Run in tapper mode without API") @@ -153,33 +152,33 @@ var userAgentsToFilter = []string{"kube-probe", "prometheus"} func filterHarItems(inChannel <-chan *tap.OutputChannelItem, outChannel chan *tap.OutputChannelItem, filterOptions *shared.TrafficFilteringOptions) { for message := range inChannel { - if message.ConnectionInfo.IsOutgoing && api.CheckIsServiceIP(message.ConnectionInfo.ServerIP) { - continue - } + // if message.ConnectionInfo.IsOutgoing && api.CheckIsServiceIP(message.ConnectionInfo.ServerIP) { + // continue + // } // TODO: move this to tappers https://up9.atlassian.net/browse/TRA-3441 if filterOptions.HideHealthChecks && isHealthCheckByUserAgent(message) { continue } - if !filterOptions.DisableRedaction { - sensitiveDataFiltering.FilterSensitiveInfoFromHarRequest(message, filterOptions) - } + // if !filterOptions.DisableRedaction { + // sensitiveDataFiltering.FilterSensitiveInfoFromHarRequest(message, filterOptions) + // } outChannel <- message } } func isHealthCheckByUserAgent(message *tap.OutputChannelItem) bool { - for _, header := range message.HarEntry.Request.Headers { - if strings.ToLower(header.Name) == "user-agent" { - for _, userAgent := range userAgentsToFilter { - if strings.Contains(strings.ToLower(header.Value), userAgent) { - return true - } - } - return false - } - } + // for _, header := range message.HarEntry.Request.Headers { + // if strings.ToLower(header.Name) == "user-agent" { + // for _, userAgent := range userAgentsToFilter { + // if strings.Contains(strings.ToLower(header.Value), userAgent) { + // return true + // } + // } + // return false + // } + // } return false } diff --git a/agent/pkg/api/main.go b/agent/pkg/api/main.go index 942f09569..3e55b529a 100644 --- a/agent/pkg/api/main.go +++ b/agent/pkg/api/main.go @@ -16,9 +16,7 @@ import ( "github.com/google/martian/har" "github.com/romana/rlog" "github.com/up9inc/mizu/tap" - "go.mongodb.org/mongo-driver/bson/primitive" - "mizuserver/pkg/database" "mizuserver/pkg/models" "mizuserver/pkg/resolver" "mizuserver/pkg/utils" @@ -86,17 +84,17 @@ func startReadingFiles(workingDir string) { decErr := json.NewDecoder(bufio.NewReader(file)).Decode(&inputHar) utils.CheckErr(decErr) - for _, entry := range inputHar.Log.Entries { - time.Sleep(time.Millisecond * 250) - connectionInfo := &tap.ConnectionInfo{ - ClientIP: fileInfo.Name(), - ClientPort: "", - ServerIP: "", - ServerPort: "", - IsOutgoing: false, - } - saveHarToDb(entry, connectionInfo) - } + // for _, entry := range inputHar.Log.Entries { + // time.Sleep(time.Millisecond * 250) + // // connectionInfo := &tap.ConnectionInfo{ + // // ClientIP: fileInfo.Name(), + // // ClientPort: "", + // // ServerIP: "", + // // ServerPort: "", + // // IsOutgoing: false, + // // } + // // saveHarToDb(entry, connectionInfo) + // } rmErr := os.Remove(inputFilePath) utils.CheckErr(rmErr) } @@ -107,9 +105,9 @@ func startReadingChannel(outputItems <-chan *tap.OutputChannelItem) { panic("Channel of captured messages is nil") } - for item := range outputItems { - saveHarToDb(item.HarEntry, item.ConnectionInfo) - } + // for item := range outputItems { + // saveHarToDb(item.HarEntry, item.ConnectionInfo) + // } } func StartReadingOutbound(outboundLinkChannel <-chan *tap.OutboundLink) { @@ -119,59 +117,59 @@ func StartReadingOutbound(outboundLinkChannel <-chan *tap.OutboundLink) { } } -func saveHarToDb(entry *har.Entry, connectionInfo *tap.ConnectionInfo) { - entryBytes, _ := json.Marshal(entry) - serviceName, urlPath := getServiceNameFromUrl(entry.Request.URL) - entryId := primitive.NewObjectID().Hex() - var ( - resolvedSource string - resolvedDestination string - ) - if k8sResolver != nil { - unresolvedSource := connectionInfo.ClientIP - resolvedSource = k8sResolver.Resolve(unresolvedSource) - if resolvedSource == "" { - rlog.Debugf("Cannot find resolved name to source: %s\n", unresolvedSource) - if os.Getenv("SKIP_NOT_RESOLVED_SOURCE") == "1" { - return - } - } - unresolvedDestination := fmt.Sprintf("%s:%s", connectionInfo.ServerIP, connectionInfo.ServerPort) - resolvedDestination = k8sResolver.Resolve(unresolvedDestination) - if resolvedDestination == "" { - rlog.Debugf("Cannot find resolved name to dest: %s\n", unresolvedDestination) - if os.Getenv("SKIP_NOT_RESOLVED_DEST") == "1" { - return - } - } - } +// func saveHarToDb(entry *har.Entry, connectionInfo *tap.ConnectionInfo) { +// entryBytes, _ := json.Marshal(entry) +// serviceName, urlPath := getServiceNameFromUrl(entry.Request.URL) +// entryId := primitive.NewObjectID().Hex() +// var ( +// resolvedSource string +// resolvedDestination string +// ) +// if k8sResolver != nil { +// unresolvedSource := connectionInfo.ClientIP +// resolvedSource = k8sResolver.Resolve(unresolvedSource) +// if resolvedSource == "" { +// rlog.Debugf("Cannot find resolved name to source: %s\n", unresolvedSource) +// if os.Getenv("SKIP_NOT_RESOLVED_SOURCE") == "1" { +// return +// } +// } +// unresolvedDestination := fmt.Sprintf("%s:%s", connectionInfo.ServerIP, connectionInfo.ServerPort) +// resolvedDestination = k8sResolver.Resolve(unresolvedDestination) +// if resolvedDestination == "" { +// rlog.Debugf("Cannot find resolved name to dest: %s\n", unresolvedDestination) +// if os.Getenv("SKIP_NOT_RESOLVED_DEST") == "1" { +// return +// } +// } +// } - mizuEntry := models.MizuEntry{ - EntryId: entryId, - Entry: string(entryBytes), // simple way to store it and not convert to bytes - Service: serviceName, - Url: entry.Request.URL, - Path: urlPath, - Method: entry.Request.Method, - Status: entry.Response.Status, - RequestSenderIp: connectionInfo.ClientIP, - Timestamp: entry.StartedDateTime.UnixNano() / int64(time.Millisecond), - ResolvedSource: resolvedSource, - ResolvedDestination: resolvedDestination, - IsOutgoing: connectionInfo.IsOutgoing, - } - mizuEntry.EstimatedSizeBytes = getEstimatedEntrySizeBytes(mizuEntry) - database.CreateEntry(&mizuEntry) +// mizuEntry := models.MizuEntry{ +// EntryId: entryId, +// Entry: string(entryBytes), // simple way to store it and not convert to bytes +// Service: serviceName, +// Url: entry.Request.URL, +// Path: urlPath, +// Method: entry.Request.Method, +// Status: entry.Response.Status, +// RequestSenderIp: connectionInfo.ClientIP, +// Timestamp: entry.StartedDateTime.UnixNano() / int64(time.Millisecond), +// ResolvedSource: resolvedSource, +// ResolvedDestination: resolvedDestination, +// IsOutgoing: connectionInfo.IsOutgoing, +// } +// mizuEntry.EstimatedSizeBytes = getEstimatedEntrySizeBytes(mizuEntry) +// database.CreateEntry(&mizuEntry) - baseEntry := models.BaseEntryDetails{} - if err := models.GetEntry(&mizuEntry, &baseEntry); err != nil { - return - } - baseEntry.Rules = models.RunValidationRulesState(*entry, serviceName) - baseEntry.Latency = entry.Timings.Receive - baseEntryBytes, _ := models.CreateBaseEntryWebSocketMessage(&baseEntry) - BroadcastToBrowserClients(baseEntryBytes) -} +// baseEntry := models.BaseEntryDetails{} +// if err := models.GetEntry(&mizuEntry, &baseEntry); err != nil { +// return +// } +// baseEntry.Rules = models.RunValidationRulesState(*entry, serviceName) +// baseEntry.Latency = entry.Timings.Receive +// baseEntryBytes, _ := models.CreateBaseEntryWebSocketMessage(&baseEntry) +// BroadcastToBrowserClients(baseEntryBytes) +// } func getServiceNameFromUrl(inputUrl string) (string, string) { parsed, err := url.Parse(inputUrl) diff --git a/agent/pkg/sensitiveDataFiltering/messageSensitiveDataCleaner.go b/agent/pkg/sensitiveDataFiltering/messageSensitiveDataCleaner.go deleted file mode 100644 index cc0e4d289..000000000 --- a/agent/pkg/sensitiveDataFiltering/messageSensitiveDataCleaner.go +++ /dev/null @@ -1,198 +0,0 @@ -package sensitiveDataFiltering - -import ( - "encoding/json" - "encoding/xml" - "errors" - "fmt" - "github.com/up9inc/mizu/tap" - "net/url" - "strings" - - "github.com/beevik/etree" - "github.com/google/martian/har" - "github.com/up9inc/mizu/shared" -) - -func FilterSensitiveInfoFromHarRequest(harOutputItem *tap.OutputChannelItem, options *shared.TrafficFilteringOptions) { - harOutputItem.HarEntry.Request.Headers = filterHarHeaders(harOutputItem.HarEntry.Request.Headers) - harOutputItem.HarEntry.Response.Headers = filterHarHeaders(harOutputItem.HarEntry.Response.Headers) - - harOutputItem.HarEntry.Request.Cookies = make([]har.Cookie, 0, 0) - harOutputItem.HarEntry.Response.Cookies = make([]har.Cookie, 0, 0) - - harOutputItem.HarEntry.Request.URL = filterUrl(harOutputItem.HarEntry.Request.URL) - for i, queryString := range harOutputItem.HarEntry.Request.QueryString { - if isFieldNameSensitive(queryString.Name) { - harOutputItem.HarEntry.Request.QueryString[i].Value = maskedFieldPlaceholderValue - } - } - - if harOutputItem.HarEntry.Request.PostData != nil { - requestContentType := getContentTypeHeaderValue(harOutputItem.HarEntry.Request.Headers) - filteredRequestBody, err := filterHttpBody([]byte(harOutputItem.HarEntry.Request.PostData.Text), requestContentType, options) - if err == nil { - harOutputItem.HarEntry.Request.PostData.Text = string(filteredRequestBody) - } - } - if harOutputItem.HarEntry.Response.Content != nil { - responseContentType := getContentTypeHeaderValue(harOutputItem.HarEntry.Response.Headers) - filteredResponseBody, err := filterHttpBody(harOutputItem.HarEntry.Response.Content.Text, responseContentType, options) - if err == nil { - harOutputItem.HarEntry.Response.Content.Text = filteredResponseBody - } - } -} - -func filterHarHeaders(headers []har.Header) []har.Header { - newHeaders := make([]har.Header, 0) - for i, header := range headers { - if strings.ToLower(header.Name) == "cookie" { - continue - } else if isFieldNameSensitive(header.Name) { - newHeaders = append(newHeaders, har.Header{Name: header.Name, Value: maskedFieldPlaceholderValue}) - headers[i].Value = maskedFieldPlaceholderValue - } else { - newHeaders = append(newHeaders, header) - } - } - return newHeaders -} - -func getContentTypeHeaderValue(headers []har.Header) string { - for _, header := range headers { - if strings.ToLower(header.Name) == "content-type" { - return header.Value - } - } - return "" -} - -func isFieldNameSensitive(fieldName string) bool { - name := strings.ToLower(fieldName) - name = strings.ReplaceAll(name, "_", "") - name = strings.ReplaceAll(name, "-", "") - name = strings.ReplaceAll(name, " ", "") - - for _, sensitiveField := range personallyIdentifiableDataFields { - if strings.Contains(name, sensitiveField) { - return true - } - } - - return false -} - -func filterHttpBody(bytes []byte, contentType string, options *shared.TrafficFilteringOptions) ([]byte, error) { - mimeType := strings.Split(contentType, ";")[0] - switch strings.ToLower(mimeType) { - case "application/json": - return filterJsonBody(bytes) - case "text/html": - fallthrough - case "application/xhtml+xml": - fallthrough - case "text/xml": - fallthrough - case "application/xml": - return filterXmlEtree(bytes) - case "text/plain": - if options != nil && options.PlainTextMaskingRegexes != nil { - return filterPlainText(bytes, options), nil - } - } - return bytes, nil -} - -func filterPlainText(bytes []byte, options *shared.TrafficFilteringOptions) []byte { - for _, regex := range options.PlainTextMaskingRegexes { - bytes = regex.ReplaceAll(bytes, []byte(maskedFieldPlaceholderValue)) - } - return bytes -} - -func filterXmlEtree(bytes []byte) ([]byte, error) { - if !IsValidXML(bytes) { - return nil, errors.New("Invalid XML") - } - xmlDoc := etree.NewDocument() - err := xmlDoc.ReadFromBytes(bytes) - if err != nil { - return nil, err - } else { - filterXmlElement(xmlDoc.Root()) - } - return xmlDoc.WriteToBytes() -} - -func IsValidXML(data []byte) bool { - return xml.Unmarshal(data, new(interface{})) == nil -} - -func filterXmlElement(element *etree.Element) { - for i, attribute := range element.Attr { - if isFieldNameSensitive(attribute.Key) { - element.Attr[i].Value = maskedFieldPlaceholderValue - } - } - if element.ChildElements() == nil || len(element.ChildElements()) == 0 { - if isFieldNameSensitive(element.Tag) { - element.SetText(maskedFieldPlaceholderValue) - } - } else { - for _, element := range element.ChildElements() { - filterXmlElement(element) - } - } -} - -func filterJsonBody(bytes []byte) ([]byte, error) { - var bodyJsonMap map[string] interface{} - err := json.Unmarshal(bytes ,&bodyJsonMap) - if err != nil { - return nil, err - } - filterJsonMap(bodyJsonMap) - return json.Marshal(bodyJsonMap) -} - -func filterJsonMap(jsonMap map[string] interface{}) { - for key, value := range jsonMap { - if value == nil { - return - } - nestedMap, isNested := value.(map[string] interface{}) - if isNested { - filterJsonMap(nestedMap) - } else { - if isFieldNameSensitive(key) { - jsonMap[key] = maskedFieldPlaceholderValue - } - } - } -} - -// receives string representing url, returns string url without sensitive query param values (http://service/api?userId=bob&password=123&type=login -> http://service/api?userId=[REDACTED]&password=[REDACTED]&type=login) -func filterUrl(originalUrl string) string { - parsedUrl, err := url.Parse(originalUrl) - if err != nil { - return fmt.Sprintf("http://%s", maskedFieldPlaceholderValue) - } else { - if len(parsedUrl.RawQuery) > 0 { - newQueryArgs := make([]string, 0) - for urlQueryParamName, urlQueryParamValues := range parsedUrl.Query() { - newValues := urlQueryParamValues - if isFieldNameSensitive(urlQueryParamName) { - newValues = []string {maskedFieldPlaceholderValue} - } - for _, paramValue := range newValues { - newQueryArgs = append(newQueryArgs, fmt.Sprintf("%s=%s", urlQueryParamName, paramValue)) - } - } - - parsedUrl.RawQuery = strings.Join(newQueryArgs, "&") - } - - return parsedUrl.String() - } -} diff --git a/extensions/http/go.mod b/extensions/http/go.mod new file mode 100644 index 000000000..55628d223 --- /dev/null +++ b/extensions/http/go.mod @@ -0,0 +1,12 @@ +module github.com/up9inc/mizu/tap + +go 1.16 + +require ( + github.com/bradleyfalzon/tlsx v0.0.0-20170624122154-28fd0e59bac4 + github.com/google/martian v2.1.0+incompatible + github.com/orcaman/concurrent-map v0.0.0-20210501183033-44dafcb38ecc + github.com/romana/rlog v0.0.0-20171115192701-f018bc92e7d7 + golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d + golang.org/x/text v0.3.7 // indirect +) diff --git a/extensions/http/go.sum b/extensions/http/go.sum new file mode 100644 index 000000000..459fe3fa8 --- /dev/null +++ b/extensions/http/go.sum @@ -0,0 +1,17 @@ +github.com/bradleyfalzon/tlsx v0.0.0-20170624122154-28fd0e59bac4 h1:NJOOlc6ZJjix0A1rAU+nxruZtR8KboG1848yqpIUo4M= +github.com/bradleyfalzon/tlsx v0.0.0-20170624122154-28fd0e59bac4/go.mod h1:DQPxZS994Ld1Y8uwnJT+dRL04XPD0cElP/pHH/zEBHM= +github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no= +github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= +github.com/orcaman/concurrent-map v0.0.0-20210501183033-44dafcb38ecc h1:Ak86L+yDSOzKFa7WM5bf5itSOo1e3Xh8bm5YCMUXIjQ= +github.com/orcaman/concurrent-map v0.0.0-20210501183033-44dafcb38ecc/go.mod h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI= +github.com/romana/rlog v0.0.0-20171115192701-f018bc92e7d7 h1:jkvpcEatpwuMF5O5LVxTnehj6YZ/aEZN4NWD/Xml4pI= +github.com/romana/rlog v0.0.0-20171115192701-f018bc92e7d7/go.mod h1:KTrHyWpO1sevuXPZwyeZc72ddWRFqNSKDFl7uVWKpg0= +golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d h1:LO7XpTYMwTqxjLcGWPijK3vRXg1aWdlNOVOHRq45d7c= +golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/tap/grpc_assembler.go b/extensions/http/grpc_assembler.go similarity index 89% rename from tap/grpc_assembler.go rename to extensions/http/grpc_assembler.go index 72b5665f1..eb270c2a6 100644 --- a/tap/grpc_assembler.go +++ b/extensions/http/grpc_assembler.go @@ -1,4 +1,4 @@ -package tap +package main import ( "bufio" @@ -17,17 +17,20 @@ import ( ) const frameHeaderLen = 9 + var clientPreface = []byte(http2.ClientPreface) + const initialHeaderTableSize = 4096 const protoHTTP2 = "HTTP/2.0" const protoMajorHTTP2 = 2 const protoMinorHTTP2 = 0 +const maxHTTP2DataLenDefault = 1 * 1024 * 1024 // 1MB var maxHTTP2DataLen int = maxHTTP2DataLenDefault // value initialized during init type messageFragment struct { headers []hpack.HeaderField - data []byte + data []byte } type fragmentsByStream map[uint32]*messageFragment @@ -46,7 +49,7 @@ func (fbs *fragmentsByStream) appendFrame(streamID uint32, frame http2.Frame) { if existingFragment, ok := (*fbs)[streamID]; ok { existingDataLen := len(existingFragment.data) // Never save more than maxHTTP2DataLen bytes - numBytesToAppend := int(math.Min(float64(maxHTTP2DataLen - existingDataLen), float64(newDataLen))) + numBytesToAppend := int(math.Min(float64(maxHTTP2DataLen-existingDataLen), float64(newDataLen))) existingFragment.data = append(existingFragment.data, frame.Data()[:numBytesToAppend]...) } else { @@ -75,13 +78,13 @@ func createGrpcAssembler(b *bufio.Reader) GrpcAssembler { framer.ReadMetaHeaders = hpack.NewDecoder(initialHeaderTableSize, nil) return GrpcAssembler{ fragmentsByStream: make(fragmentsByStream), - framer: framer, + framer: framer, } } type GrpcAssembler struct { fragmentsByStream fragmentsByStream - framer *http2.Framer + framer *http2.Framer } func (ga *GrpcAssembler) readMessage() (uint32, interface{}, error) { @@ -118,22 +121,22 @@ func (ga *GrpcAssembler) readMessage() (uint32, interface{}, error) { var messageHTTP1 interface{} if _, ok := headersHTTP1[":method"]; ok { messageHTTP1 = http.Request{ - URL: &url.URL{}, - Method: "POST", - Header: headersHTTP1, - Proto: protoHTTP2, - ProtoMajor: protoMajorHTTP2, - ProtoMinor: protoMinorHTTP2, - Body: io.NopCloser(strings.NewReader(dataString)), + URL: &url.URL{}, + Method: "POST", + Header: headersHTTP1, + Proto: protoHTTP2, + ProtoMajor: protoMajorHTTP2, + ProtoMinor: protoMinorHTTP2, + Body: io.NopCloser(strings.NewReader(dataString)), ContentLength: int64(len(dataString)), } } else if _, ok := headersHTTP1[":status"]; ok { messageHTTP1 = http.Response{ - Header: headersHTTP1, - Proto: protoHTTP2, - ProtoMajor: protoMajorHTTP2, - ProtoMinor: protoMinorHTTP2, - Body: io.NopCloser(strings.NewReader(dataString)), + Header: headersHTTP1, + Proto: protoHTTP2, + ProtoMajor: protoMajorHTTP2, + ProtoMinor: protoMinorHTTP2, + Body: io.NopCloser(strings.NewReader(dataString)), ContentLength: int64(len(dataString)), } } else { @@ -225,7 +228,7 @@ func checkClientPreface(b *bufio.Reader) (bool, error) { func discardClientPreface(b *bufio.Reader) error { if isClientPrefacePresent, err := checkClientPreface(b); err != nil { return err - } else if !isClientPrefacePresent{ + } else if !isClientPrefacePresent { return errors.New("discardClientPreface: does not begin with client preface") } diff --git a/tap/har_writer.go b/extensions/http/har_writer.go similarity index 90% rename from tap/har_writer.go rename to extensions/http/har_writer.go index a20dbfb99..bf292fc68 100644 --- a/tap/har_writer.go +++ b/extensions/http/har_writer.go @@ -1,4 +1,4 @@ -package tap +package main import ( "bytes" @@ -12,11 +12,45 @@ import ( "path/filepath" "strconv" "strings" + "sync" "time" "github.com/google/martian/har" + "github.com/romana/rlog" ) +var outputLevel int +var errorsMap map[string]uint +var errorsMapMutex sync.Mutex +var nErrors uint +var ownIps []string // global +var hostMode bool // global + +func logError(minOutputLevel int, t string, s string, a ...interface{}) { + errorsMapMutex.Lock() + nErrors++ + nb, _ := errorsMap[t] + errorsMap[t] = nb + 1 + errorsMapMutex.Unlock() + + if outputLevel >= minOutputLevel { + formatStr := fmt.Sprintf("%s: %s", t, s) + rlog.Errorf(formatStr, a...) + } +} +func Error(t string, s string, a ...interface{}) { + logError(0, t, s, a...) +} +func SilentError(t string, s string, a ...interface{}) { + logError(2, t, s, a...) +} +func Debug(s string, a ...interface{}) { + rlog.Debugf(s, a...) +} +func Trace(s string, a ...interface{}) { + rlog.Tracef(1, s, a...) +} + const readPermission = 0644 const harFilenameSuffix = ".har" const tempFilenameSuffix = ".har.tmp" @@ -59,7 +93,7 @@ func NewEntry(request *http.Request, requestTime time.Time, response *http.Respo // instead of harRequest.PostData.Text (as the HAR spec requires it). // Mizu currently only looks at PostData.Text. Therefore, instead of letting martian/har set the content of // PostData, always copy the request body to PostData.Text. - if (request.ContentLength > 0) { + if request.ContentLength > 0 { reqBody, err := ioutil.ReadAll(request.Body) if err != nil { SilentError("read-request-body", "Failed converting request to HAR %s (%v,%+v)", err, err, err) diff --git a/tap/http_matcher.go b/extensions/http/matcher.go similarity index 92% rename from tap/http_matcher.go rename to extensions/http/matcher.go index 5832cf3b1..40c527186 100644 --- a/tap/http_matcher.go +++ b/extensions/http/matcher.go @@ -1,4 +1,4 @@ -package tap +package main import ( "fmt" @@ -6,25 +6,25 @@ import ( "strings" "time" - "github.com/orcaman/concurrent-map" + cmap "github.com/orcaman/concurrent-map" ) +var reqResMatcher = createResponseRequestMatcher() // global + type requestResponsePair struct { Request httpMessage `json:"request"` Response httpMessage `json:"response"` } type httpMessage struct { - isRequest bool - captureTime time.Time - orig interface{} + isRequest bool + captureTime time.Time + orig interface{} } - // Key is {client_addr}:{client_port}->{dest_addr}:{dest_port} type requestResponseMatcher struct { openMessagesMap cmap.ConcurrentMap - } func createResponseRequestMatcher() requestResponseMatcher { @@ -37,9 +37,9 @@ func (matcher *requestResponseMatcher) registerRequest(ident string, request *ht key := genKey(split) requestHTTPMessage := httpMessage{ - isRequest: true, - captureTime: captureTime, - orig: request, + isRequest: true, + captureTime: captureTime, + orig: request, } if response, found := matcher.openMessagesMap.Pop(key); found { @@ -113,7 +113,7 @@ func (matcher *requestResponseMatcher) deleteOlderThan(t time.Time) int { } numDeleted := len(keysToPop) - + for _, key := range keysToPop { _, _ = matcher.openMessagesMap.Pop(key) } diff --git a/extensions/http/outboundlinks.go b/extensions/http/outboundlinks.go new file mode 100644 index 000000000..faa6ec367 --- /dev/null +++ b/extensions/http/outboundlinks.go @@ -0,0 +1,39 @@ +package main + +type OutboundLinkProtocol string + +const ( + TLSProtocol OutboundLinkProtocol = "tls" +) + +type OutboundLink struct { + Src string + DstIP string + DstPort int + SuggestedResolvedName string + SuggestedProtocol OutboundLinkProtocol +} + +func NewOutboundLinkWriter() *OutboundLinkWriter { + return &OutboundLinkWriter{ + OutChan: make(chan *OutboundLink), + } +} + +type OutboundLinkWriter struct { + OutChan chan *OutboundLink +} + +func (olw *OutboundLinkWriter) WriteOutboundLink(src string, DstIP string, DstPort int, SuggestedResolvedName string, SuggestedProtocol OutboundLinkProtocol) { + olw.OutChan <- &OutboundLink{ + Src: src, + DstIP: DstIP, + DstPort: DstPort, + SuggestedResolvedName: SuggestedResolvedName, + SuggestedProtocol: SuggestedProtocol, + } +} + +func (olw *OutboundLinkWriter) Stop() { + close(olw.OutChan) +} diff --git a/tap/http_reader.go b/extensions/http/reader.go similarity index 93% rename from tap/http_reader.go rename to extensions/http/reader.go index 6f059afaa..5eb43b297 100644 --- a/tap/http_reader.go +++ b/extensions/http/reader.go @@ -1,17 +1,18 @@ -package tap +package main import ( "bufio" "bytes" "encoding/hex" "fmt" - "github.com/bradleyfalzon/tlsx" "io" "io/ioutil" "net/http" "strconv" "sync" "time" + + "github.com/bradleyfalzon/tlsx" ) const checkTLSPacketAmount = 100 @@ -41,7 +42,7 @@ func (tid *tcpID) String() string { } /* httpReader gets reads from a channel of bytes of tcp payload, and parses it into HTTP/1 requests and responses. - * The payload is written to the channel by a tcpStream object that is dedicated to one tcp connection. + * The payload is written to the channel by a tap.TcpStream object that is dedicated to one tcp connection. * An httpReader object is unidirectional: it parses either a client stream or a server stream. * Implements io.Reader interface (Read) */ @@ -55,7 +56,6 @@ type httpReader struct { data []byte captureTime time.Time hexdump bool - parent *tcpStream grpcAssembler GrpcAssembler messageCount uint harWriter *HarWriter @@ -176,7 +176,7 @@ func (h *httpReader) handleHTTP2Stream() error { } if reqResPair != nil { - statsTracker.incMatchedMessages() + // statsTracker.incMatchedMessages() if h.harWriter != nil { h.harWriter.WritePair( @@ -215,7 +215,7 @@ func (h *httpReader) handleHTTP1ClientStream(b *bufio.Reader) error { ident := fmt.Sprintf("%s->%s %s->%s %d", h.tcpID.srcIP, h.tcpID.dstIP, h.tcpID.srcPort, h.tcpID.dstPort, h.messageCount) reqResPair := reqResMatcher.registerRequest(ident, req, h.captureTime) if reqResPair != nil { - statsTracker.incMatchedMessages() + // statsTracker.incMatchedMessages() if h.harWriter != nil { h.harWriter.WritePair( @@ -234,9 +234,9 @@ func (h *httpReader) handleHTTP1ClientStream(b *bufio.Reader) error { } } - h.parent.Lock() - h.parent.urls = append(h.parent.urls, req.URL.String()) - h.parent.Unlock() + // h.parent.Lock() + // h.parent.urls = append(h.parent.urls, req.URL.String()) + // h.parent.Unlock() return nil } @@ -245,13 +245,13 @@ func (h *httpReader) handleHTTP1ServerStream(b *bufio.Reader) error { res, err := http.ReadResponse(b, nil) h.messageCount++ var req string - h.parent.Lock() - if len(h.parent.urls) == 0 { - req = fmt.Sprintf("") - } else { - req, h.parent.urls = h.parent.urls[0], h.parent.urls[1:] - } - h.parent.Unlock() + // h.parent.Lock() + // if len(h.parent.urls) == 0 { + // req = fmt.Sprintf("") + // } else { + // req, h.parent.urls = h.parent.urls[0], h.parent.urls[1:] + // } + // h.parent.Unlock() if err != nil { return err } @@ -281,7 +281,7 @@ func (h *httpReader) handleHTTP1ServerStream(b *bufio.Reader) error { ident := fmt.Sprintf("%s->%s %s->%s %d", h.tcpID.dstIP, h.tcpID.srcIP, h.tcpID.dstPort, h.tcpID.srcPort, h.messageCount) reqResPair := reqResMatcher.registerResponse(ident, res, h.captureTime) if reqResPair != nil { - statsTracker.incMatchedMessages() + // statsTracker.incMatchedMessages() if h.harWriter != nil { h.harWriter.WritePair( diff --git a/tap/cleaner.go b/tap/cleaner.go index 96972fc9e..738b8b239 100644 --- a/tap/cleaner.go +++ b/tap/cleaner.go @@ -1,10 +1,11 @@ package tap import ( - "github.com/romana/rlog" "sync" "time" + "github.com/romana/rlog" + "github.com/google/gopacket/reassembly" ) @@ -17,7 +18,6 @@ type CleanerStats struct { type Cleaner struct { assembler *reassembly.Assembler assemblerMutex *sync.Mutex - matcher *requestResponseMatcher cleanPeriod time.Duration connectionTimeout time.Duration stats CleanerStats @@ -32,13 +32,10 @@ func (cl *Cleaner) clean() { flushed, closed := cl.assembler.FlushCloseOlderThan(startCleanTime.Add(-cl.connectionTimeout)) cl.assemblerMutex.Unlock() - deleted := cl.matcher.deleteOlderThan(startCleanTime.Add(-cl.connectionTimeout)) - cl.statsMutex.Lock() rlog.Debugf("Assembler Stats after cleaning %s", cl.assembler.Dump()) cl.stats.flushed += flushed cl.stats.closed += closed - cl.stats.deleted += deleted cl.statsMutex.Unlock() } diff --git a/tap/passive_tapper.go b/tap/passive_tapper.go index 79274029f..05849d22b 100644 --- a/tap/passive_tapper.go +++ b/tap/passive_tapper.go @@ -12,10 +12,10 @@ import ( "encoding/hex" "flag" "fmt" - "github.com/romana/rlog" "log" "os" "os/signal" + "plugin" "runtime" "runtime/pprof" "strconv" @@ -23,6 +23,8 @@ import ( "sync" "time" + "github.com/romana/rlog" + "github.com/google/gopacket" "github.com/google/gopacket/examples/util" "github.com/google/gopacket/ip4defrag" @@ -88,7 +90,6 @@ var dumpToHar = flag.Bool("hardump", false, "Dump traffic to har files") var HarOutputDir = flag.String("hardir", "", "Directory in which to store output har files") var harEntriesPerFile = flag.Int("harentriesperfile", 200, "Number of max number of har entries to store in each file") -var reqResMatcher = createResponseRequestMatcher() // global var statsTracker = StatsTracker{} // global @@ -121,6 +122,17 @@ var nErrors uint var ownIps []string // global var hostMode bool // global +type Extension struct { + Name string + Path string + Plug *plugin.Plugin +} + +var extensions []Extension // global + +type OutputChannelItem struct { +} + /* minOutputLevel: Error will be printed only if outputLevel is above this value * t: key for errorsMap (counting errors) * s, a: arguments log.Printf @@ -186,19 +198,19 @@ func (c *Context) GetCaptureInfo() gopacket.CaptureInfo { func StartPassiveTapper(opts *TapOpts) (<-chan *OutputChannelItem, <-chan *OutboundLink) { hostMode = opts.HostMode - var harWriter *HarWriter - if *dumpToHar { - harWriter = NewHarWriter(*HarOutputDir, *harEntriesPerFile) - } - outboundLinkWriter := NewOutboundLinkWriter() + // var harWriter *HarWriter + // if *dumpToHar { + // harWriter = NewHarWriter(*HarOutputDir, *harEntriesPerFile) + // } + // outboundLinkWriter := NewOutboundLinkWriter() - go startPassiveTapper(harWriter, outboundLinkWriter) + // go startPassiveTapper(harWriter, outboundLinkWriter) - if harWriter != nil { - return harWriter.OutChan, outboundLinkWriter.OutChan - } + // if harWriter != nil { + // return harWriter.OutChan, outboundLinkWriter.OutChan + // } - return nil, outboundLinkWriter.OutChan + return nil, nil } func startMemoryProfiler() { @@ -232,7 +244,18 @@ func startMemoryProfiler() { }() } -func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWriter) { +func loadExtensions() { + extension := &Extension{ + Name: "http", + Path: "./extensions/http.so", + } + plug, _ := plugin.Open(extension.Path) + extension.Plug = plug +} + +func startPassiveTapper(outboundLinkWriter *OutboundLinkWriter) { + loadExtensions() + log.SetFlags(log.LstdFlags | log.LUTC | log.Lshortfile) defer util.Run()() @@ -263,19 +286,19 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr appPorts = parseAppPorts(appPortsStr) } SetFilterPorts(appPorts) - envVal := os.Getenv(maxHTTP2DataLenEnvVar) - if envVal == "" { - rlog.Infof("Received empty/no HTTP2_DATA_SIZE_LIMIT env var! falling back to %v", maxHTTP2DataLenDefault) - maxHTTP2DataLen = maxHTTP2DataLenDefault - } else { - if convertedInt, err := strconv.Atoi(envVal); err != nil { - rlog.Infof("Received invalid HTTP2_DATA_SIZE_LIMIT env var! falling back to %v", maxHTTP2DataLenDefault) - maxHTTP2DataLen = maxHTTP2DataLenDefault - } else { - rlog.Infof("Received HTTP2_DATA_SIZE_LIMIT env var: %v", maxHTTP2DataLenDefault) - maxHTTP2DataLen = convertedInt - } - } + // envVal := os.Getenv(maxHTTP2DataLenEnvVar) + // if envVal == "" { + // rlog.Infof("Received empty/no HTTP2_DATA_SIZE_LIMIT env var! falling back to %v", maxHTTP2DataLenDefault) + // maxHTTP2DataLen = maxHTTP2DataLenDefault + // } else { + // if convertedInt, err := strconv.Atoi(envVal); err != nil { + // rlog.Infof("Received invalid HTTP2_DATA_SIZE_LIMIT env var! falling back to %v", maxHTTP2DataLenDefault) + // maxHTTP2DataLen = maxHTTP2DataLenDefault + // } else { + // rlog.Infof("Received HTTP2_DATA_SIZE_LIMIT env var: %v", maxHTTP2DataLenDefault) + // maxHTTP2DataLen = convertedInt + // } + // } log.Printf("App Ports: %v", gSettings.filterPorts) @@ -321,10 +344,10 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr } } - if *dumpToHar { - harWriter.Start() - defer harWriter.Stop() - } + // if *dumpToHar { + // harWriter.Start() + // defer harWriter.Stop() + // } defer outboundLinkWriter.Stop() var dec gopacket.Decoder @@ -344,8 +367,8 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr defragger := ip4defrag.NewIPv4Defragmenter() streamFactory := &tcpStreamFactory{ - doHTTP: !*nohttp, - harWriter: harWriter, + doHTTP: !*nohttp, + // harWriter: harWriter, outbountLinkWriter: outboundLinkWriter, } streamPool := reassembly.NewStreamPool(streamFactory) @@ -366,7 +389,6 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr cleaner := Cleaner{ assembler: assembler, assemblerMutex: &assemblerMutex, - matcher: &reqResMatcher, cleanPeriod: cleanPeriod, connectionTimeout: staleConnectionTimeout, } @@ -397,10 +419,9 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr memStats := runtime.MemStats{} runtime.ReadMemStats(&memStats) log.Printf( - "mem: %d, goroutines: %d, unmatched messages: %d", + "mem: %d, goroutines: %d", memStats.HeapAlloc, runtime.NumGoroutine(), - reqResMatcher.openMessagesMap.Count(), ) // Since the last print diff --git a/tap/tcp_stream.go b/tap/tcp_stream.go index e2ac51c55..f8bf3461c 100644 --- a/tap/tcp_stream.go +++ b/tap/tcp_stream.go @@ -24,8 +24,6 @@ type tcpStream struct { isDNS bool isHTTP bool reversed bool - client httpReader - server httpReader urls []string ident string sync.Mutex @@ -148,21 +146,21 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass } // This is where we pass the reassembled information onwards // This channel is read by an httpReader object - if dir == reassembly.TCPDirClientToServer && !t.reversed { - t.client.msgQueue <- httpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp} - } else { - t.server.msgQueue <- httpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp} - } + // if dir == reassembly.TCPDirClientToServer && !t.reversed { + // t.client.msgQueue <- httpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp} + // } else { + // t.server.msgQueue <- httpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp} + // } } } } func (t *tcpStream) ReassemblyComplete(ac reassembly.AssemblerContext) bool { Trace("%s: Connection closed", t.ident) - if t.isHTTP { - close(t.client.msgQueue) - close(t.server.msgQueue) - } + // if t.isHTTP { + // close(t.client.msgQueue) + // close(t.server.msgQueue) + // } // do not remove the connection to allow last ACK return false } diff --git a/tap/tcp_stream_factory.go b/tap/tcp_stream_factory.go index c03600584..d4567e9f7 100644 --- a/tap/tcp_stream_factory.go +++ b/tap/tcp_stream_factory.go @@ -2,9 +2,10 @@ package tap import ( "fmt" - "github.com/romana/rlog" "sync" + "github.com/romana/rlog" + "github.com/google/gopacket" "github.com/google/gopacket/layers" // pulls in all layers decoders "github.com/google/gopacket/reassembly" @@ -18,7 +19,6 @@ import ( type tcpStreamFactory struct { wg sync.WaitGroup doHTTP bool - harWriter *HarWriter outbountLinkWriter *OutboundLinkWriter } @@ -48,41 +48,41 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T optchecker: reassembly.NewTCPOptionCheck(), } if stream.isHTTP { - stream.client = httpReader{ - msgQueue: make(chan httpReaderDataMsg), - ident: fmt.Sprintf("%s %s", net, transport), - tcpID: tcpID{ - srcIP: net.Src().String(), - dstIP: net.Dst().String(), - srcPort: transport.Src().String(), - dstPort: transport.Dst().String(), - }, - hexdump: *hexdump, - parent: stream, - isClient: true, - isOutgoing: props.isOutgoing, - harWriter: factory.harWriter, - outboundLinkWriter: factory.outbountLinkWriter, - } - stream.server = httpReader{ - msgQueue: make(chan httpReaderDataMsg), - ident: fmt.Sprintf("%s %s", net.Reverse(), transport.Reverse()), - tcpID: tcpID{ - srcIP: net.Dst().String(), - dstIP: net.Src().String(), - srcPort: transport.Dst().String(), - dstPort: transport.Src().String(), - }, - hexdump: *hexdump, - parent: stream, - isOutgoing: props.isOutgoing, - harWriter: factory.harWriter, - outboundLinkWriter: factory.outbountLinkWriter, - } - factory.wg.Add(2) - // Start reading from channels stream.client.bytes and stream.server.bytes - go stream.client.run(&factory.wg) - go stream.server.run(&factory.wg) + // stream.client = httpReader{ + // msgQueue: make(chan httpReaderDataMsg), + // ident: fmt.Sprintf("%s %s", net, transport), + // tcpID: tcpID{ + // srcIP: net.Src().String(), + // dstIP: net.Dst().String(), + // srcPort: transport.Src().String(), + // dstPort: transport.Dst().String(), + // }, + // hexdump: *hexdump, + // parent: stream, + // isClient: true, + // isOutgoing: props.isOutgoing, + // harWriter: factory.harWriter, + // outboundLinkWriter: factory.outbountLinkWriter, + // } + // stream.server = httpReader{ + // msgQueue: make(chan httpReaderDataMsg), + // ident: fmt.Sprintf("%s %s", net.Reverse(), transport.Reverse()), + // tcpID: tcpID{ + // srcIP: net.Dst().String(), + // dstIP: net.Src().String(), + // srcPort: transport.Dst().String(), + // dstPort: transport.Src().String(), + // }, + // hexdump: *hexdump, + // parent: stream, + // isOutgoing: props.isOutgoing, + // harWriter: factory.harWriter, + // outboundLinkWriter: factory.outbountLinkWriter, + // } + // factory.wg.Add(2) + // // Start reading from channels stream.client.bytes and stream.server.bytes + // go stream.client.run(&factory.wg) + // go stream.server.run(&factory.wg) } return stream }