Separate HTTP related code into extensions/http as a Go plugin

This commit is contained in:
M. Mert Yildiran 2021-08-17 02:33:56 +03:00
parent 7f2021c312
commit 32a2b8b823
No known key found for this signature in database
GPG Key ID: D42ADB236521BF7A
16 changed files with 350 additions and 426 deletions

1
.gitignore vendored
View File

@ -19,3 +19,4 @@ build
# Mac OS
.DS_Store
.vscode/

View File

@ -65,3 +65,6 @@ clean-cli: ## Clean CLI.
clean-docker:
@(echo "DOCKER cleanup - NOT IMPLEMENTED YET " )
http:
cd extensions/http && \
go build -buildmode=plugin -o ../http.so .

View File

@ -4,21 +4,20 @@ import (
"encoding/json"
"flag"
"fmt"
"mizuserver/pkg/api"
"mizuserver/pkg/models"
"mizuserver/pkg/routes"
"mizuserver/pkg/utils"
"net/http"
"os"
"os/signal"
"github.com/gin-contrib/static"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"github.com/romana/rlog"
"github.com/up9inc/mizu/shared"
"github.com/up9inc/mizu/tap"
"mizuserver/pkg/api"
"mizuserver/pkg/models"
"mizuserver/pkg/routes"
"mizuserver/pkg/sensitiveDataFiltering"
"mizuserver/pkg/utils"
"net/http"
"os"
"os/signal"
"strings"
)
var tapperMode = flag.Bool("tap", false, "Run in tapper mode without API")
@ -153,33 +152,33 @@ var userAgentsToFilter = []string{"kube-probe", "prometheus"}
func filterHarItems(inChannel <-chan *tap.OutputChannelItem, outChannel chan *tap.OutputChannelItem, filterOptions *shared.TrafficFilteringOptions) {
for message := range inChannel {
if message.ConnectionInfo.IsOutgoing && api.CheckIsServiceIP(message.ConnectionInfo.ServerIP) {
continue
}
// if message.ConnectionInfo.IsOutgoing && api.CheckIsServiceIP(message.ConnectionInfo.ServerIP) {
// continue
// }
// TODO: move this to tappers https://up9.atlassian.net/browse/TRA-3441
if filterOptions.HideHealthChecks && isHealthCheckByUserAgent(message) {
continue
}
if !filterOptions.DisableRedaction {
sensitiveDataFiltering.FilterSensitiveInfoFromHarRequest(message, filterOptions)
}
// if !filterOptions.DisableRedaction {
// sensitiveDataFiltering.FilterSensitiveInfoFromHarRequest(message, filterOptions)
// }
outChannel <- message
}
}
func isHealthCheckByUserAgent(message *tap.OutputChannelItem) bool {
for _, header := range message.HarEntry.Request.Headers {
if strings.ToLower(header.Name) == "user-agent" {
for _, userAgent := range userAgentsToFilter {
if strings.Contains(strings.ToLower(header.Value), userAgent) {
return true
}
}
return false
}
}
// for _, header := range message.HarEntry.Request.Headers {
// if strings.ToLower(header.Name) == "user-agent" {
// for _, userAgent := range userAgentsToFilter {
// if strings.Contains(strings.ToLower(header.Value), userAgent) {
// return true
// }
// }
// return false
// }
// }
return false
}

View File

@ -16,9 +16,7 @@ import (
"github.com/google/martian/har"
"github.com/romana/rlog"
"github.com/up9inc/mizu/tap"
"go.mongodb.org/mongo-driver/bson/primitive"
"mizuserver/pkg/database"
"mizuserver/pkg/models"
"mizuserver/pkg/resolver"
"mizuserver/pkg/utils"
@ -86,17 +84,17 @@ func startReadingFiles(workingDir string) {
decErr := json.NewDecoder(bufio.NewReader(file)).Decode(&inputHar)
utils.CheckErr(decErr)
for _, entry := range inputHar.Log.Entries {
time.Sleep(time.Millisecond * 250)
connectionInfo := &tap.ConnectionInfo{
ClientIP: fileInfo.Name(),
ClientPort: "",
ServerIP: "",
ServerPort: "",
IsOutgoing: false,
}
saveHarToDb(entry, connectionInfo)
}
// for _, entry := range inputHar.Log.Entries {
// time.Sleep(time.Millisecond * 250)
// // connectionInfo := &tap.ConnectionInfo{
// // ClientIP: fileInfo.Name(),
// // ClientPort: "",
// // ServerIP: "",
// // ServerPort: "",
// // IsOutgoing: false,
// // }
// // saveHarToDb(entry, connectionInfo)
// }
rmErr := os.Remove(inputFilePath)
utils.CheckErr(rmErr)
}
@ -107,9 +105,9 @@ func startReadingChannel(outputItems <-chan *tap.OutputChannelItem) {
panic("Channel of captured messages is nil")
}
for item := range outputItems {
saveHarToDb(item.HarEntry, item.ConnectionInfo)
}
// for item := range outputItems {
// saveHarToDb(item.HarEntry, item.ConnectionInfo)
// }
}
func StartReadingOutbound(outboundLinkChannel <-chan *tap.OutboundLink) {
@ -119,59 +117,59 @@ func StartReadingOutbound(outboundLinkChannel <-chan *tap.OutboundLink) {
}
}
func saveHarToDb(entry *har.Entry, connectionInfo *tap.ConnectionInfo) {
entryBytes, _ := json.Marshal(entry)
serviceName, urlPath := getServiceNameFromUrl(entry.Request.URL)
entryId := primitive.NewObjectID().Hex()
var (
resolvedSource string
resolvedDestination string
)
if k8sResolver != nil {
unresolvedSource := connectionInfo.ClientIP
resolvedSource = k8sResolver.Resolve(unresolvedSource)
if resolvedSource == "" {
rlog.Debugf("Cannot find resolved name to source: %s\n", unresolvedSource)
if os.Getenv("SKIP_NOT_RESOLVED_SOURCE") == "1" {
return
}
}
unresolvedDestination := fmt.Sprintf("%s:%s", connectionInfo.ServerIP, connectionInfo.ServerPort)
resolvedDestination = k8sResolver.Resolve(unresolvedDestination)
if resolvedDestination == "" {
rlog.Debugf("Cannot find resolved name to dest: %s\n", unresolvedDestination)
if os.Getenv("SKIP_NOT_RESOLVED_DEST") == "1" {
return
}
}
}
// func saveHarToDb(entry *har.Entry, connectionInfo *tap.ConnectionInfo) {
// entryBytes, _ := json.Marshal(entry)
// serviceName, urlPath := getServiceNameFromUrl(entry.Request.URL)
// entryId := primitive.NewObjectID().Hex()
// var (
// resolvedSource string
// resolvedDestination string
// )
// if k8sResolver != nil {
// unresolvedSource := connectionInfo.ClientIP
// resolvedSource = k8sResolver.Resolve(unresolvedSource)
// if resolvedSource == "" {
// rlog.Debugf("Cannot find resolved name to source: %s\n", unresolvedSource)
// if os.Getenv("SKIP_NOT_RESOLVED_SOURCE") == "1" {
// return
// }
// }
// unresolvedDestination := fmt.Sprintf("%s:%s", connectionInfo.ServerIP, connectionInfo.ServerPort)
// resolvedDestination = k8sResolver.Resolve(unresolvedDestination)
// if resolvedDestination == "" {
// rlog.Debugf("Cannot find resolved name to dest: %s\n", unresolvedDestination)
// if os.Getenv("SKIP_NOT_RESOLVED_DEST") == "1" {
// return
// }
// }
// }
mizuEntry := models.MizuEntry{
EntryId: entryId,
Entry: string(entryBytes), // simple way to store it and not convert to bytes
Service: serviceName,
Url: entry.Request.URL,
Path: urlPath,
Method: entry.Request.Method,
Status: entry.Response.Status,
RequestSenderIp: connectionInfo.ClientIP,
Timestamp: entry.StartedDateTime.UnixNano() / int64(time.Millisecond),
ResolvedSource: resolvedSource,
ResolvedDestination: resolvedDestination,
IsOutgoing: connectionInfo.IsOutgoing,
}
mizuEntry.EstimatedSizeBytes = getEstimatedEntrySizeBytes(mizuEntry)
database.CreateEntry(&mizuEntry)
// mizuEntry := models.MizuEntry{
// EntryId: entryId,
// Entry: string(entryBytes), // simple way to store it and not convert to bytes
// Service: serviceName,
// Url: entry.Request.URL,
// Path: urlPath,
// Method: entry.Request.Method,
// Status: entry.Response.Status,
// RequestSenderIp: connectionInfo.ClientIP,
// Timestamp: entry.StartedDateTime.UnixNano() / int64(time.Millisecond),
// ResolvedSource: resolvedSource,
// ResolvedDestination: resolvedDestination,
// IsOutgoing: connectionInfo.IsOutgoing,
// }
// mizuEntry.EstimatedSizeBytes = getEstimatedEntrySizeBytes(mizuEntry)
// database.CreateEntry(&mizuEntry)
baseEntry := models.BaseEntryDetails{}
if err := models.GetEntry(&mizuEntry, &baseEntry); err != nil {
return
}
baseEntry.Rules = models.RunValidationRulesState(*entry, serviceName)
baseEntry.Latency = entry.Timings.Receive
baseEntryBytes, _ := models.CreateBaseEntryWebSocketMessage(&baseEntry)
BroadcastToBrowserClients(baseEntryBytes)
}
// baseEntry := models.BaseEntryDetails{}
// if err := models.GetEntry(&mizuEntry, &baseEntry); err != nil {
// return
// }
// baseEntry.Rules = models.RunValidationRulesState(*entry, serviceName)
// baseEntry.Latency = entry.Timings.Receive
// baseEntryBytes, _ := models.CreateBaseEntryWebSocketMessage(&baseEntry)
// BroadcastToBrowserClients(baseEntryBytes)
// }
func getServiceNameFromUrl(inputUrl string) (string, string) {
parsed, err := url.Parse(inputUrl)

View File

@ -1,198 +0,0 @@
package sensitiveDataFiltering
import (
"encoding/json"
"encoding/xml"
"errors"
"fmt"
"github.com/up9inc/mizu/tap"
"net/url"
"strings"
"github.com/beevik/etree"
"github.com/google/martian/har"
"github.com/up9inc/mizu/shared"
)
func FilterSensitiveInfoFromHarRequest(harOutputItem *tap.OutputChannelItem, options *shared.TrafficFilteringOptions) {
harOutputItem.HarEntry.Request.Headers = filterHarHeaders(harOutputItem.HarEntry.Request.Headers)
harOutputItem.HarEntry.Response.Headers = filterHarHeaders(harOutputItem.HarEntry.Response.Headers)
harOutputItem.HarEntry.Request.Cookies = make([]har.Cookie, 0, 0)
harOutputItem.HarEntry.Response.Cookies = make([]har.Cookie, 0, 0)
harOutputItem.HarEntry.Request.URL = filterUrl(harOutputItem.HarEntry.Request.URL)
for i, queryString := range harOutputItem.HarEntry.Request.QueryString {
if isFieldNameSensitive(queryString.Name) {
harOutputItem.HarEntry.Request.QueryString[i].Value = maskedFieldPlaceholderValue
}
}
if harOutputItem.HarEntry.Request.PostData != nil {
requestContentType := getContentTypeHeaderValue(harOutputItem.HarEntry.Request.Headers)
filteredRequestBody, err := filterHttpBody([]byte(harOutputItem.HarEntry.Request.PostData.Text), requestContentType, options)
if err == nil {
harOutputItem.HarEntry.Request.PostData.Text = string(filteredRequestBody)
}
}
if harOutputItem.HarEntry.Response.Content != nil {
responseContentType := getContentTypeHeaderValue(harOutputItem.HarEntry.Response.Headers)
filteredResponseBody, err := filterHttpBody(harOutputItem.HarEntry.Response.Content.Text, responseContentType, options)
if err == nil {
harOutputItem.HarEntry.Response.Content.Text = filteredResponseBody
}
}
}
func filterHarHeaders(headers []har.Header) []har.Header {
newHeaders := make([]har.Header, 0)
for i, header := range headers {
if strings.ToLower(header.Name) == "cookie" {
continue
} else if isFieldNameSensitive(header.Name) {
newHeaders = append(newHeaders, har.Header{Name: header.Name, Value: maskedFieldPlaceholderValue})
headers[i].Value = maskedFieldPlaceholderValue
} else {
newHeaders = append(newHeaders, header)
}
}
return newHeaders
}
func getContentTypeHeaderValue(headers []har.Header) string {
for _, header := range headers {
if strings.ToLower(header.Name) == "content-type" {
return header.Value
}
}
return ""
}
func isFieldNameSensitive(fieldName string) bool {
name := strings.ToLower(fieldName)
name = strings.ReplaceAll(name, "_", "")
name = strings.ReplaceAll(name, "-", "")
name = strings.ReplaceAll(name, " ", "")
for _, sensitiveField := range personallyIdentifiableDataFields {
if strings.Contains(name, sensitiveField) {
return true
}
}
return false
}
func filterHttpBody(bytes []byte, contentType string, options *shared.TrafficFilteringOptions) ([]byte, error) {
mimeType := strings.Split(contentType, ";")[0]
switch strings.ToLower(mimeType) {
case "application/json":
return filterJsonBody(bytes)
case "text/html":
fallthrough
case "application/xhtml+xml":
fallthrough
case "text/xml":
fallthrough
case "application/xml":
return filterXmlEtree(bytes)
case "text/plain":
if options != nil && options.PlainTextMaskingRegexes != nil {
return filterPlainText(bytes, options), nil
}
}
return bytes, nil
}
func filterPlainText(bytes []byte, options *shared.TrafficFilteringOptions) []byte {
for _, regex := range options.PlainTextMaskingRegexes {
bytes = regex.ReplaceAll(bytes, []byte(maskedFieldPlaceholderValue))
}
return bytes
}
func filterXmlEtree(bytes []byte) ([]byte, error) {
if !IsValidXML(bytes) {
return nil, errors.New("Invalid XML")
}
xmlDoc := etree.NewDocument()
err := xmlDoc.ReadFromBytes(bytes)
if err != nil {
return nil, err
} else {
filterXmlElement(xmlDoc.Root())
}
return xmlDoc.WriteToBytes()
}
func IsValidXML(data []byte) bool {
return xml.Unmarshal(data, new(interface{})) == nil
}
func filterXmlElement(element *etree.Element) {
for i, attribute := range element.Attr {
if isFieldNameSensitive(attribute.Key) {
element.Attr[i].Value = maskedFieldPlaceholderValue
}
}
if element.ChildElements() == nil || len(element.ChildElements()) == 0 {
if isFieldNameSensitive(element.Tag) {
element.SetText(maskedFieldPlaceholderValue)
}
} else {
for _, element := range element.ChildElements() {
filterXmlElement(element)
}
}
}
func filterJsonBody(bytes []byte) ([]byte, error) {
var bodyJsonMap map[string] interface{}
err := json.Unmarshal(bytes ,&bodyJsonMap)
if err != nil {
return nil, err
}
filterJsonMap(bodyJsonMap)
return json.Marshal(bodyJsonMap)
}
func filterJsonMap(jsonMap map[string] interface{}) {
for key, value := range jsonMap {
if value == nil {
return
}
nestedMap, isNested := value.(map[string] interface{})
if isNested {
filterJsonMap(nestedMap)
} else {
if isFieldNameSensitive(key) {
jsonMap[key] = maskedFieldPlaceholderValue
}
}
}
}
// receives string representing url, returns string url without sensitive query param values (http://service/api?userId=bob&password=123&type=login -> http://service/api?userId=[REDACTED]&password=[REDACTED]&type=login)
func filterUrl(originalUrl string) string {
parsedUrl, err := url.Parse(originalUrl)
if err != nil {
return fmt.Sprintf("http://%s", maskedFieldPlaceholderValue)
} else {
if len(parsedUrl.RawQuery) > 0 {
newQueryArgs := make([]string, 0)
for urlQueryParamName, urlQueryParamValues := range parsedUrl.Query() {
newValues := urlQueryParamValues
if isFieldNameSensitive(urlQueryParamName) {
newValues = []string {maskedFieldPlaceholderValue}
}
for _, paramValue := range newValues {
newQueryArgs = append(newQueryArgs, fmt.Sprintf("%s=%s", urlQueryParamName, paramValue))
}
}
parsedUrl.RawQuery = strings.Join(newQueryArgs, "&")
}
return parsedUrl.String()
}
}

12
extensions/http/go.mod Normal file
View File

@ -0,0 +1,12 @@
module github.com/up9inc/mizu/tap
go 1.16
require (
github.com/bradleyfalzon/tlsx v0.0.0-20170624122154-28fd0e59bac4
github.com/google/martian v2.1.0+incompatible
github.com/orcaman/concurrent-map v0.0.0-20210501183033-44dafcb38ecc
github.com/romana/rlog v0.0.0-20171115192701-f018bc92e7d7
golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d
golang.org/x/text v0.3.7 // indirect
)

17
extensions/http/go.sum Normal file
View File

@ -0,0 +1,17 @@
github.com/bradleyfalzon/tlsx v0.0.0-20170624122154-28fd0e59bac4 h1:NJOOlc6ZJjix0A1rAU+nxruZtR8KboG1848yqpIUo4M=
github.com/bradleyfalzon/tlsx v0.0.0-20170624122154-28fd0e59bac4/go.mod h1:DQPxZS994Ld1Y8uwnJT+dRL04XPD0cElP/pHH/zEBHM=
github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/orcaman/concurrent-map v0.0.0-20210501183033-44dafcb38ecc h1:Ak86L+yDSOzKFa7WM5bf5itSOo1e3Xh8bm5YCMUXIjQ=
github.com/orcaman/concurrent-map v0.0.0-20210501183033-44dafcb38ecc/go.mod h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI=
github.com/romana/rlog v0.0.0-20171115192701-f018bc92e7d7 h1:jkvpcEatpwuMF5O5LVxTnehj6YZ/aEZN4NWD/Xml4pI=
github.com/romana/rlog v0.0.0-20171115192701-f018bc92e7d7/go.mod h1:KTrHyWpO1sevuXPZwyeZc72ddWRFqNSKDFl7uVWKpg0=
golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d h1:LO7XpTYMwTqxjLcGWPijK3vRXg1aWdlNOVOHRq45d7c=
golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

View File

@ -1,4 +1,4 @@
package tap
package main
import (
"bufio"
@ -17,17 +17,20 @@ import (
)
const frameHeaderLen = 9
var clientPreface = []byte(http2.ClientPreface)
const initialHeaderTableSize = 4096
const protoHTTP2 = "HTTP/2.0"
const protoMajorHTTP2 = 2
const protoMinorHTTP2 = 0
const maxHTTP2DataLenDefault = 1 * 1024 * 1024 // 1MB
var maxHTTP2DataLen int = maxHTTP2DataLenDefault // value initialized during init
type messageFragment struct {
headers []hpack.HeaderField
data []byte
data []byte
}
type fragmentsByStream map[uint32]*messageFragment
@ -46,7 +49,7 @@ func (fbs *fragmentsByStream) appendFrame(streamID uint32, frame http2.Frame) {
if existingFragment, ok := (*fbs)[streamID]; ok {
existingDataLen := len(existingFragment.data)
// Never save more than maxHTTP2DataLen bytes
numBytesToAppend := int(math.Min(float64(maxHTTP2DataLen - existingDataLen), float64(newDataLen)))
numBytesToAppend := int(math.Min(float64(maxHTTP2DataLen-existingDataLen), float64(newDataLen)))
existingFragment.data = append(existingFragment.data, frame.Data()[:numBytesToAppend]...)
} else {
@ -75,13 +78,13 @@ func createGrpcAssembler(b *bufio.Reader) GrpcAssembler {
framer.ReadMetaHeaders = hpack.NewDecoder(initialHeaderTableSize, nil)
return GrpcAssembler{
fragmentsByStream: make(fragmentsByStream),
framer: framer,
framer: framer,
}
}
type GrpcAssembler struct {
fragmentsByStream fragmentsByStream
framer *http2.Framer
framer *http2.Framer
}
func (ga *GrpcAssembler) readMessage() (uint32, interface{}, error) {
@ -118,22 +121,22 @@ func (ga *GrpcAssembler) readMessage() (uint32, interface{}, error) {
var messageHTTP1 interface{}
if _, ok := headersHTTP1[":method"]; ok {
messageHTTP1 = http.Request{
URL: &url.URL{},
Method: "POST",
Header: headersHTTP1,
Proto: protoHTTP2,
ProtoMajor: protoMajorHTTP2,
ProtoMinor: protoMinorHTTP2,
Body: io.NopCloser(strings.NewReader(dataString)),
URL: &url.URL{},
Method: "POST",
Header: headersHTTP1,
Proto: protoHTTP2,
ProtoMajor: protoMajorHTTP2,
ProtoMinor: protoMinorHTTP2,
Body: io.NopCloser(strings.NewReader(dataString)),
ContentLength: int64(len(dataString)),
}
} else if _, ok := headersHTTP1[":status"]; ok {
messageHTTP1 = http.Response{
Header: headersHTTP1,
Proto: protoHTTP2,
ProtoMajor: protoMajorHTTP2,
ProtoMinor: protoMinorHTTP2,
Body: io.NopCloser(strings.NewReader(dataString)),
Header: headersHTTP1,
Proto: protoHTTP2,
ProtoMajor: protoMajorHTTP2,
ProtoMinor: protoMinorHTTP2,
Body: io.NopCloser(strings.NewReader(dataString)),
ContentLength: int64(len(dataString)),
}
} else {
@ -225,7 +228,7 @@ func checkClientPreface(b *bufio.Reader) (bool, error) {
func discardClientPreface(b *bufio.Reader) error {
if isClientPrefacePresent, err := checkClientPreface(b); err != nil {
return err
} else if !isClientPrefacePresent{
} else if !isClientPrefacePresent {
return errors.New("discardClientPreface: does not begin with client preface")
}

View File

@ -1,4 +1,4 @@
package tap
package main
import (
"bytes"
@ -12,11 +12,45 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"
"time"
"github.com/google/martian/har"
"github.com/romana/rlog"
)
var outputLevel int
var errorsMap map[string]uint
var errorsMapMutex sync.Mutex
var nErrors uint
var ownIps []string // global
var hostMode bool // global
func logError(minOutputLevel int, t string, s string, a ...interface{}) {
errorsMapMutex.Lock()
nErrors++
nb, _ := errorsMap[t]
errorsMap[t] = nb + 1
errorsMapMutex.Unlock()
if outputLevel >= minOutputLevel {
formatStr := fmt.Sprintf("%s: %s", t, s)
rlog.Errorf(formatStr, a...)
}
}
func Error(t string, s string, a ...interface{}) {
logError(0, t, s, a...)
}
func SilentError(t string, s string, a ...interface{}) {
logError(2, t, s, a...)
}
func Debug(s string, a ...interface{}) {
rlog.Debugf(s, a...)
}
func Trace(s string, a ...interface{}) {
rlog.Tracef(1, s, a...)
}
const readPermission = 0644
const harFilenameSuffix = ".har"
const tempFilenameSuffix = ".har.tmp"
@ -59,7 +93,7 @@ func NewEntry(request *http.Request, requestTime time.Time, response *http.Respo
// instead of harRequest.PostData.Text (as the HAR spec requires it).
// Mizu currently only looks at PostData.Text. Therefore, instead of letting martian/har set the content of
// PostData, always copy the request body to PostData.Text.
if (request.ContentLength > 0) {
if request.ContentLength > 0 {
reqBody, err := ioutil.ReadAll(request.Body)
if err != nil {
SilentError("read-request-body", "Failed converting request to HAR %s (%v,%+v)", err, err, err)

View File

@ -1,4 +1,4 @@
package tap
package main
import (
"fmt"
@ -6,25 +6,25 @@ import (
"strings"
"time"
"github.com/orcaman/concurrent-map"
cmap "github.com/orcaman/concurrent-map"
)
var reqResMatcher = createResponseRequestMatcher() // global
type requestResponsePair struct {
Request httpMessage `json:"request"`
Response httpMessage `json:"response"`
}
type httpMessage struct {
isRequest bool
captureTime time.Time
orig interface{}
isRequest bool
captureTime time.Time
orig interface{}
}
// Key is {client_addr}:{client_port}->{dest_addr}:{dest_port}
type requestResponseMatcher struct {
openMessagesMap cmap.ConcurrentMap
}
func createResponseRequestMatcher() requestResponseMatcher {
@ -37,9 +37,9 @@ func (matcher *requestResponseMatcher) registerRequest(ident string, request *ht
key := genKey(split)
requestHTTPMessage := httpMessage{
isRequest: true,
captureTime: captureTime,
orig: request,
isRequest: true,
captureTime: captureTime,
orig: request,
}
if response, found := matcher.openMessagesMap.Pop(key); found {
@ -113,7 +113,7 @@ func (matcher *requestResponseMatcher) deleteOlderThan(t time.Time) int {
}
numDeleted := len(keysToPop)
for _, key := range keysToPop {
_, _ = matcher.openMessagesMap.Pop(key)
}

View File

@ -0,0 +1,39 @@
package main
type OutboundLinkProtocol string
const (
TLSProtocol OutboundLinkProtocol = "tls"
)
type OutboundLink struct {
Src string
DstIP string
DstPort int
SuggestedResolvedName string
SuggestedProtocol OutboundLinkProtocol
}
func NewOutboundLinkWriter() *OutboundLinkWriter {
return &OutboundLinkWriter{
OutChan: make(chan *OutboundLink),
}
}
type OutboundLinkWriter struct {
OutChan chan *OutboundLink
}
func (olw *OutboundLinkWriter) WriteOutboundLink(src string, DstIP string, DstPort int, SuggestedResolvedName string, SuggestedProtocol OutboundLinkProtocol) {
olw.OutChan <- &OutboundLink{
Src: src,
DstIP: DstIP,
DstPort: DstPort,
SuggestedResolvedName: SuggestedResolvedName,
SuggestedProtocol: SuggestedProtocol,
}
}
func (olw *OutboundLinkWriter) Stop() {
close(olw.OutChan)
}

View File

@ -1,17 +1,18 @@
package tap
package main
import (
"bufio"
"bytes"
"encoding/hex"
"fmt"
"github.com/bradleyfalzon/tlsx"
"io"
"io/ioutil"
"net/http"
"strconv"
"sync"
"time"
"github.com/bradleyfalzon/tlsx"
)
const checkTLSPacketAmount = 100
@ -41,7 +42,7 @@ func (tid *tcpID) String() string {
}
/* httpReader gets reads from a channel of bytes of tcp payload, and parses it into HTTP/1 requests and responses.
* The payload is written to the channel by a tcpStream object that is dedicated to one tcp connection.
* The payload is written to the channel by a tap.TcpStream object that is dedicated to one tcp connection.
* An httpReader object is unidirectional: it parses either a client stream or a server stream.
* Implements io.Reader interface (Read)
*/
@ -55,7 +56,6 @@ type httpReader struct {
data []byte
captureTime time.Time
hexdump bool
parent *tcpStream
grpcAssembler GrpcAssembler
messageCount uint
harWriter *HarWriter
@ -176,7 +176,7 @@ func (h *httpReader) handleHTTP2Stream() error {
}
if reqResPair != nil {
statsTracker.incMatchedMessages()
// statsTracker.incMatchedMessages()
if h.harWriter != nil {
h.harWriter.WritePair(
@ -215,7 +215,7 @@ func (h *httpReader) handleHTTP1ClientStream(b *bufio.Reader) error {
ident := fmt.Sprintf("%s->%s %s->%s %d", h.tcpID.srcIP, h.tcpID.dstIP, h.tcpID.srcPort, h.tcpID.dstPort, h.messageCount)
reqResPair := reqResMatcher.registerRequest(ident, req, h.captureTime)
if reqResPair != nil {
statsTracker.incMatchedMessages()
// statsTracker.incMatchedMessages()
if h.harWriter != nil {
h.harWriter.WritePair(
@ -234,9 +234,9 @@ func (h *httpReader) handleHTTP1ClientStream(b *bufio.Reader) error {
}
}
h.parent.Lock()
h.parent.urls = append(h.parent.urls, req.URL.String())
h.parent.Unlock()
// h.parent.Lock()
// h.parent.urls = append(h.parent.urls, req.URL.String())
// h.parent.Unlock()
return nil
}
@ -245,13 +245,13 @@ func (h *httpReader) handleHTTP1ServerStream(b *bufio.Reader) error {
res, err := http.ReadResponse(b, nil)
h.messageCount++
var req string
h.parent.Lock()
if len(h.parent.urls) == 0 {
req = fmt.Sprintf("<no-request-seen>")
} else {
req, h.parent.urls = h.parent.urls[0], h.parent.urls[1:]
}
h.parent.Unlock()
// h.parent.Lock()
// if len(h.parent.urls) == 0 {
// req = fmt.Sprintf("<no-request-seen>")
// } else {
// req, h.parent.urls = h.parent.urls[0], h.parent.urls[1:]
// }
// h.parent.Unlock()
if err != nil {
return err
}
@ -281,7 +281,7 @@ func (h *httpReader) handleHTTP1ServerStream(b *bufio.Reader) error {
ident := fmt.Sprintf("%s->%s %s->%s %d", h.tcpID.dstIP, h.tcpID.srcIP, h.tcpID.dstPort, h.tcpID.srcPort, h.messageCount)
reqResPair := reqResMatcher.registerResponse(ident, res, h.captureTime)
if reqResPair != nil {
statsTracker.incMatchedMessages()
// statsTracker.incMatchedMessages()
if h.harWriter != nil {
h.harWriter.WritePair(

View File

@ -1,10 +1,11 @@
package tap
import (
"github.com/romana/rlog"
"sync"
"time"
"github.com/romana/rlog"
"github.com/google/gopacket/reassembly"
)
@ -17,7 +18,6 @@ type CleanerStats struct {
type Cleaner struct {
assembler *reassembly.Assembler
assemblerMutex *sync.Mutex
matcher *requestResponseMatcher
cleanPeriod time.Duration
connectionTimeout time.Duration
stats CleanerStats
@ -32,13 +32,10 @@ func (cl *Cleaner) clean() {
flushed, closed := cl.assembler.FlushCloseOlderThan(startCleanTime.Add(-cl.connectionTimeout))
cl.assemblerMutex.Unlock()
deleted := cl.matcher.deleteOlderThan(startCleanTime.Add(-cl.connectionTimeout))
cl.statsMutex.Lock()
rlog.Debugf("Assembler Stats after cleaning %s", cl.assembler.Dump())
cl.stats.flushed += flushed
cl.stats.closed += closed
cl.stats.deleted += deleted
cl.statsMutex.Unlock()
}

View File

@ -12,10 +12,10 @@ import (
"encoding/hex"
"flag"
"fmt"
"github.com/romana/rlog"
"log"
"os"
"os/signal"
"plugin"
"runtime"
"runtime/pprof"
"strconv"
@ -23,6 +23,8 @@ import (
"sync"
"time"
"github.com/romana/rlog"
"github.com/google/gopacket"
"github.com/google/gopacket/examples/util"
"github.com/google/gopacket/ip4defrag"
@ -88,7 +90,6 @@ var dumpToHar = flag.Bool("hardump", false, "Dump traffic to har files")
var HarOutputDir = flag.String("hardir", "", "Directory in which to store output har files")
var harEntriesPerFile = flag.Int("harentriesperfile", 200, "Number of max number of har entries to store in each file")
var reqResMatcher = createResponseRequestMatcher() // global
var statsTracker = StatsTracker{}
// global
@ -121,6 +122,17 @@ var nErrors uint
var ownIps []string // global
var hostMode bool // global
type Extension struct {
Name string
Path string
Plug *plugin.Plugin
}
var extensions []Extension // global
type OutputChannelItem struct {
}
/* minOutputLevel: Error will be printed only if outputLevel is above this value
* t: key for errorsMap (counting errors)
* s, a: arguments log.Printf
@ -186,19 +198,19 @@ func (c *Context) GetCaptureInfo() gopacket.CaptureInfo {
func StartPassiveTapper(opts *TapOpts) (<-chan *OutputChannelItem, <-chan *OutboundLink) {
hostMode = opts.HostMode
var harWriter *HarWriter
if *dumpToHar {
harWriter = NewHarWriter(*HarOutputDir, *harEntriesPerFile)
}
outboundLinkWriter := NewOutboundLinkWriter()
// var harWriter *HarWriter
// if *dumpToHar {
// harWriter = NewHarWriter(*HarOutputDir, *harEntriesPerFile)
// }
// outboundLinkWriter := NewOutboundLinkWriter()
go startPassiveTapper(harWriter, outboundLinkWriter)
// go startPassiveTapper(harWriter, outboundLinkWriter)
if harWriter != nil {
return harWriter.OutChan, outboundLinkWriter.OutChan
}
// if harWriter != nil {
// return harWriter.OutChan, outboundLinkWriter.OutChan
// }
return nil, outboundLinkWriter.OutChan
return nil, nil
}
func startMemoryProfiler() {
@ -232,7 +244,18 @@ func startMemoryProfiler() {
}()
}
func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWriter) {
func loadExtensions() {
extension := &Extension{
Name: "http",
Path: "./extensions/http.so",
}
plug, _ := plugin.Open(extension.Path)
extension.Plug = plug
}
func startPassiveTapper(outboundLinkWriter *OutboundLinkWriter) {
loadExtensions()
log.SetFlags(log.LstdFlags | log.LUTC | log.Lshortfile)
defer util.Run()()
@ -263,19 +286,19 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
appPorts = parseAppPorts(appPortsStr)
}
SetFilterPorts(appPorts)
envVal := os.Getenv(maxHTTP2DataLenEnvVar)
if envVal == "" {
rlog.Infof("Received empty/no HTTP2_DATA_SIZE_LIMIT env var! falling back to %v", maxHTTP2DataLenDefault)
maxHTTP2DataLen = maxHTTP2DataLenDefault
} else {
if convertedInt, err := strconv.Atoi(envVal); err != nil {
rlog.Infof("Received invalid HTTP2_DATA_SIZE_LIMIT env var! falling back to %v", maxHTTP2DataLenDefault)
maxHTTP2DataLen = maxHTTP2DataLenDefault
} else {
rlog.Infof("Received HTTP2_DATA_SIZE_LIMIT env var: %v", maxHTTP2DataLenDefault)
maxHTTP2DataLen = convertedInt
}
}
// envVal := os.Getenv(maxHTTP2DataLenEnvVar)
// if envVal == "" {
// rlog.Infof("Received empty/no HTTP2_DATA_SIZE_LIMIT env var! falling back to %v", maxHTTP2DataLenDefault)
// maxHTTP2DataLen = maxHTTP2DataLenDefault
// } else {
// if convertedInt, err := strconv.Atoi(envVal); err != nil {
// rlog.Infof("Received invalid HTTP2_DATA_SIZE_LIMIT env var! falling back to %v", maxHTTP2DataLenDefault)
// maxHTTP2DataLen = maxHTTP2DataLenDefault
// } else {
// rlog.Infof("Received HTTP2_DATA_SIZE_LIMIT env var: %v", maxHTTP2DataLenDefault)
// maxHTTP2DataLen = convertedInt
// }
// }
log.Printf("App Ports: %v", gSettings.filterPorts)
@ -321,10 +344,10 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
}
}
if *dumpToHar {
harWriter.Start()
defer harWriter.Stop()
}
// if *dumpToHar {
// harWriter.Start()
// defer harWriter.Stop()
// }
defer outboundLinkWriter.Stop()
var dec gopacket.Decoder
@ -344,8 +367,8 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
defragger := ip4defrag.NewIPv4Defragmenter()
streamFactory := &tcpStreamFactory{
doHTTP: !*nohttp,
harWriter: harWriter,
doHTTP: !*nohttp,
// harWriter: harWriter,
outbountLinkWriter: outboundLinkWriter,
}
streamPool := reassembly.NewStreamPool(streamFactory)
@ -366,7 +389,6 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
cleaner := Cleaner{
assembler: assembler,
assemblerMutex: &assemblerMutex,
matcher: &reqResMatcher,
cleanPeriod: cleanPeriod,
connectionTimeout: staleConnectionTimeout,
}
@ -397,10 +419,9 @@ func startPassiveTapper(harWriter *HarWriter, outboundLinkWriter *OutboundLinkWr
memStats := runtime.MemStats{}
runtime.ReadMemStats(&memStats)
log.Printf(
"mem: %d, goroutines: %d, unmatched messages: %d",
"mem: %d, goroutines: %d",
memStats.HeapAlloc,
runtime.NumGoroutine(),
reqResMatcher.openMessagesMap.Count(),
)
// Since the last print

View File

@ -24,8 +24,6 @@ type tcpStream struct {
isDNS bool
isHTTP bool
reversed bool
client httpReader
server httpReader
urls []string
ident string
sync.Mutex
@ -148,21 +146,21 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
}
// This is where we pass the reassembled information onwards
// This channel is read by an httpReader object
if dir == reassembly.TCPDirClientToServer && !t.reversed {
t.client.msgQueue <- httpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp}
} else {
t.server.msgQueue <- httpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp}
}
// if dir == reassembly.TCPDirClientToServer && !t.reversed {
// t.client.msgQueue <- httpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp}
// } else {
// t.server.msgQueue <- httpReaderDataMsg{data, ac.GetCaptureInfo().Timestamp}
// }
}
}
}
func (t *tcpStream) ReassemblyComplete(ac reassembly.AssemblerContext) bool {
Trace("%s: Connection closed", t.ident)
if t.isHTTP {
close(t.client.msgQueue)
close(t.server.msgQueue)
}
// if t.isHTTP {
// close(t.client.msgQueue)
// close(t.server.msgQueue)
// }
// do not remove the connection to allow last ACK
return false
}

View File

@ -2,9 +2,10 @@ package tap
import (
"fmt"
"github.com/romana/rlog"
"sync"
"github.com/romana/rlog"
"github.com/google/gopacket"
"github.com/google/gopacket/layers" // pulls in all layers decoders
"github.com/google/gopacket/reassembly"
@ -18,7 +19,6 @@ import (
type tcpStreamFactory struct {
wg sync.WaitGroup
doHTTP bool
harWriter *HarWriter
outbountLinkWriter *OutboundLinkWriter
}
@ -48,41 +48,41 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
optchecker: reassembly.NewTCPOptionCheck(),
}
if stream.isHTTP {
stream.client = httpReader{
msgQueue: make(chan httpReaderDataMsg),
ident: fmt.Sprintf("%s %s", net, transport),
tcpID: tcpID{
srcIP: net.Src().String(),
dstIP: net.Dst().String(),
srcPort: transport.Src().String(),
dstPort: transport.Dst().String(),
},
hexdump: *hexdump,
parent: stream,
isClient: true,
isOutgoing: props.isOutgoing,
harWriter: factory.harWriter,
outboundLinkWriter: factory.outbountLinkWriter,
}
stream.server = httpReader{
msgQueue: make(chan httpReaderDataMsg),
ident: fmt.Sprintf("%s %s", net.Reverse(), transport.Reverse()),
tcpID: tcpID{
srcIP: net.Dst().String(),
dstIP: net.Src().String(),
srcPort: transport.Dst().String(),
dstPort: transport.Src().String(),
},
hexdump: *hexdump,
parent: stream,
isOutgoing: props.isOutgoing,
harWriter: factory.harWriter,
outboundLinkWriter: factory.outbountLinkWriter,
}
factory.wg.Add(2)
// Start reading from channels stream.client.bytes and stream.server.bytes
go stream.client.run(&factory.wg)
go stream.server.run(&factory.wg)
// stream.client = httpReader{
// msgQueue: make(chan httpReaderDataMsg),
// ident: fmt.Sprintf("%s %s", net, transport),
// tcpID: tcpID{
// srcIP: net.Src().String(),
// dstIP: net.Dst().String(),
// srcPort: transport.Src().String(),
// dstPort: transport.Dst().String(),
// },
// hexdump: *hexdump,
// parent: stream,
// isClient: true,
// isOutgoing: props.isOutgoing,
// harWriter: factory.harWriter,
// outboundLinkWriter: factory.outbountLinkWriter,
// }
// stream.server = httpReader{
// msgQueue: make(chan httpReaderDataMsg),
// ident: fmt.Sprintf("%s %s", net.Reverse(), transport.Reverse()),
// tcpID: tcpID{
// srcIP: net.Dst().String(),
// dstIP: net.Src().String(),
// srcPort: transport.Dst().String(),
// dstPort: transport.Src().String(),
// },
// hexdump: *hexdump,
// parent: stream,
// isOutgoing: props.isOutgoing,
// harWriter: factory.harWriter,
// outboundLinkWriter: factory.outbountLinkWriter,
// }
// factory.wg.Add(2)
// // Start reading from channels stream.client.bytes and stream.server.bytes
// go stream.client.run(&factory.wg)
// go stream.server.run(&factory.wg)
}
return stream
}