diff --git a/agent/go.sum b/agent/go.sum index 0bb7d54a6..3f255f5a9 100644 --- a/agent/go.sum +++ b/agent/go.sum @@ -79,6 +79,8 @@ github.com/gin-contrib/static v0.0.1/go.mod h1:CSxeF+wep05e0kCOsqWdAWbSszmc31zTI github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M= github.com/gin-gonic/gin v1.7.2 h1:Tg03T9yM2xa8j6I3Z3oqLaQRSmKvxPd6g/2HJ6zICFA= github.com/gin-gonic/gin v1.7.2/go.mod h1:jD2toBW3GZUr5UMcdrwQA10I7RuaFOl/SGeDjXkfUtY= +github.com/go-errors/errors v1.4.1 h1:IvVlgbzSsaUNudsw5dcXSzF3EWyXTi5XrAdngnuhRyg= +github.com/go-errors/errors v1.4.1/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= diff --git a/tap/cleaner.go b/tap/cleaner.go index 813735e66..61be717f3 100644 --- a/tap/cleaner.go +++ b/tap/cleaner.go @@ -48,7 +48,7 @@ func (cl *Cleaner) start() { go func() { ticker := time.NewTicker(cl.cleanPeriod) - for true { + for { <-ticker.C cl.clean() } diff --git a/tap/errors_map.go b/tap/errors_map.go new file mode 100644 index 000000000..6c66d74b7 --- /dev/null +++ b/tap/errors_map.go @@ -0,0 +1,60 @@ +package tap + +import ( + "fmt" + "sync" + + "github.com/up9inc/mizu/shared/logger" +) + +type errorsMap struct { + errorsMap map[string]uint + outputLevel int + nErrors uint + errorsMapMutex sync.Mutex +} + +func NewErrorsMap(outputLevel int) *errorsMap { + return &errorsMap{ + errorsMap: make(map[string]uint), + outputLevel: outputLevel, + } +} + +/* minOutputLevel: Error will be printed only if outputLevel is above this value + * t: key for errorsMap (counting errors) + * s, a: arguments logger.Log.Infof + * Note: Too bad for perf that a... is evaluated + */ +func (e *errorsMap) logError(minOutputLevel int, t string, s string, a ...interface{}) { + e.errorsMapMutex.Lock() + e.nErrors++ + nb := e.errorsMap[t] + e.errorsMap[t] = nb + 1 + e.errorsMapMutex.Unlock() + + if e.outputLevel >= minOutputLevel { + formatStr := fmt.Sprintf("%s: %s", t, s) + logger.Log.Errorf(formatStr, a...) + } +} + +func (e *errorsMap) Error(t string, s string, a ...interface{}) { + e.logError(0, t, s, a...) +} + +func (e *errorsMap) SilentError(t string, s string, a ...interface{}) { + e.logError(2, t, s, a...) +} + +func (e *errorsMap) Debug(s string, a ...interface{}) { + logger.Log.Debugf(s, a...) +} + +func (e *errorsMap) getErrorsSummary() (int, string) { + e.errorsMapMutex.Lock() + errorMapLen := len(e.errorsMap) + errorsSummery := fmt.Sprintf("%v", e.errorsMap) + e.errorsMapMutex.Unlock() + return errorMapLen, errorsSummery +} diff --git a/tap/go.mod b/tap/go.mod index 8c8fce755..12b93ef10 100644 --- a/tap/go.mod +++ b/tap/go.mod @@ -4,7 +4,9 @@ go 1.16 require ( github.com/bradleyfalzon/tlsx v0.0.0-20170624122154-28fd0e59bac4 + github.com/go-errors/errors v1.4.1 github.com/google/gopacket v1.1.19 + github.com/op/go-logging v0.0.0-20160315200505-970db520ece7 github.com/up9inc/mizu/shared v0.0.0 github.com/up9inc/mizu/tap/api v0.0.0 golang.org/x/net v0.0.0-20210224082022-3d97a244fca7 // indirect diff --git a/tap/go.sum b/tap/go.sum index 57a018ead..965486e84 100644 --- a/tap/go.sum +++ b/tap/go.sum @@ -1,6 +1,8 @@ github.com/bradleyfalzon/tlsx v0.0.0-20170624122154-28fd0e59bac4 h1:NJOOlc6ZJjix0A1rAU+nxruZtR8KboG1848yqpIUo4M= github.com/bradleyfalzon/tlsx v0.0.0-20170624122154-28fd0e59bac4/go.mod h1:DQPxZS994Ld1Y8uwnJT+dRL04XPD0cElP/pHH/zEBHM= github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= +github.com/go-errors/errors v1.4.1 h1:IvVlgbzSsaUNudsw5dcXSzF3EWyXTi5XrAdngnuhRyg= +github.com/go-errors/errors v1.4.1/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og= github.com/golang-jwt/jwt/v4 v4.1.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg= github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8= github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo= diff --git a/tap/main/.gitignore b/tap/main/.gitignore new file mode 100644 index 000000000..88d050b19 --- /dev/null +++ b/tap/main/.gitignore @@ -0,0 +1 @@ +main \ No newline at end of file diff --git a/tap/main/main.go b/tap/main/main.go new file mode 100644 index 000000000..7e975217a --- /dev/null +++ b/tap/main/main.go @@ -0,0 +1,110 @@ +package main + +import ( + "bufio" + "fmt" + "io/ioutil" + "os" + "path" + "plugin" + "sort" + "strings" + + "github.com/go-errors/errors" + "github.com/up9inc/mizu/tap" + tapApi "github.com/up9inc/mizu/tap/api" +) + +func loadExtensions() ([]*tapApi.Extension, error) { + extensionsDir := "./extensions" + files, err := ioutil.ReadDir(extensionsDir) + + if err != nil { + return nil, errors.Wrap(err, 0) + } + + extensions := make([]*tapApi.Extension, 0) + for _, file := range files { + filename := file.Name() + + if !strings.HasSuffix(filename, ".so") { + continue + } + + fmt.Printf("Loading extension: %s\n", filename) + + extension := &tapApi.Extension{ + Path: path.Join(extensionsDir, filename), + } + + plug, err := plugin.Open(extension.Path) + + if err != nil { + return nil, errors.Wrap(err, 0) + } + + extension.Plug = plug + symDissector, err := plug.Lookup("Dissector") + + if err != nil { + return nil, errors.Wrap(err, 0) + } + + dissector, ok := symDissector.(tapApi.Dissector) + + if !ok { + return nil, errors.Errorf("Symbol Dissector type error: %v %T\n", file, symDissector) + } + + dissector.Register(extension) + extension.Dissector = dissector + extensions = append(extensions, extension) + } + + sort.Slice(extensions, func(i, j int) bool { + return extensions[i].Protocol.Priority < extensions[j].Protocol.Priority + }) + + for _, extension := range extensions { + fmt.Printf("Extension Properties: %+v\n", extension) + } + + return extensions, nil +} + +func internalRun() error { + opts := tap.TapOpts{ + HostMode: false, + } + + outputItems := make(chan *tapApi.OutputChannelItem, 1000) + extenssions, err := loadExtensions() + + if err != nil { + return err + } + + tapOpts := tapApi.TrafficFilteringOptions{} + + tap.StartPassiveTapper(&opts, outputItems, extenssions, &tapOpts) + + fmt.Printf("Tapping, press enter to exit...\n") + reader := bufio.NewReader(os.Stdin) + reader.ReadLine() + return nil +} + +func main() { + err := internalRun() + + if err != nil { + switch err := err.(type) { + case *errors.Error: + fmt.Printf("Error: %v\n", err.ErrorStack()) + default: + fmt.Printf("Error: %v\n", err) + } + + os.Exit(1) + } +} diff --git a/tap/main/test.sh b/tap/main/test.sh new file mode 100755 index 000000000..491f401d9 --- /dev/null +++ b/tap/main/test.sh @@ -0,0 +1,7 @@ +#!/bin/bash + +set -e + +go build -o main main/main.go + +sudo ./main/main "$@" diff --git a/tap/net_utils.go b/tap/net_utils.go index deafdc910..42f020424 100644 --- a/tap/net_utils.go +++ b/tap/net_utils.go @@ -27,6 +27,7 @@ func getLocalhostIPs() ([]string, error) { return myIPs, nil } +//lint:ignore U1000 will be used in the future func isPrivateIP(ipStr string) bool { ip := net.ParseIP(ipStr) if ip.IsLoopback() || ip.IsLinkLocalUnicast() || ip.IsLinkLocalMulticast() { @@ -54,7 +55,7 @@ func initPrivateIPBlocks() { } { _, block, err := net.ParseCIDR(cidr) if err != nil { - Error("Private-IP-Block-Parse", "parse error on %q: %v", cidr, err) + tapErrors.Error("Private-IP-Block-Parse", "parse error on %q: %v", cidr, err) } else { privateIPBlocks = append(privateIPBlocks, block) } diff --git a/tap/passive_tapper.go b/tap/passive_tapper.go index 830f707c5..3ff3e79f2 100644 --- a/tap/passive_tapper.go +++ b/tap/passive_tapper.go @@ -17,7 +17,6 @@ import ( "os" "os/signal" "runtime" - _debug "runtime/debug" "runtime/pprof" "strconv" "strings" @@ -36,6 +35,7 @@ import ( const cleanPeriod = time.Second * 10 +//lint:ignore U1000 will be used in the future var remoteOnlyOutboundPorts = []int{80, 443} var maxcount = flag.Int64("c", -1, "Only grab this many packets, then exit") @@ -63,6 +63,7 @@ var staleTimeoutSeconds = flag.Int("staletimout", 120, "Max time in seconds to k var memprofile = flag.String("memprofile", "", "Write memory profile") var appStats = api.AppStats{} +var tapErrors *errorsMap // global var stats struct { @@ -87,44 +88,10 @@ type TapOpts struct { HostMode bool } -var outputLevel int -var errorsMap map[string]uint -var errorsMapMutex sync.Mutex -var nErrors uint -var ownIps []string // global var hostMode bool // global var extensions []*api.Extension // global var filteringOptions *api.TrafficFilteringOptions // global -const baseStreamChannelTimeoutMs int = 5000 * 100 - -/* minOutputLevel: Error will be printed only if outputLevel is above this value - * t: key for errorsMap (counting errors) - * s, a: arguments logger.Log.Infof - * Note: Too bad for perf that a... is evaluated - */ -func logError(minOutputLevel int, t string, s string, a ...interface{}) { - errorsMapMutex.Lock() - nErrors++ - nb, _ := errorsMap[t] - errorsMap[t] = nb + 1 - errorsMapMutex.Unlock() - - if outputLevel >= minOutputLevel { - formatStr := fmt.Sprintf("%s: %s", t, s) - logger.Log.Errorf(formatStr, a...) - } -} -func Error(t string, s string, a ...interface{}) { - logError(0, t, s, a...) -} -func SilentError(t string, s string, a ...interface{}) { - logError(2, t, s, a...) -} -func Debug(s string, a ...interface{}) { - logger.Log.Debugf(s, a...) -} - func inArrayInt(arr []int, valueToCheck int) bool { for _, value := range arr { if value == valueToCheck { @@ -191,7 +158,7 @@ func startMemoryProfiler() { } } - for true { + for { t := time.Now() filename := fmt.Sprintf("%s/%s__mem.prof", dumpPath, t.Format("15_04_05")) @@ -212,44 +179,11 @@ func startMemoryProfiler() { }() } -func closeTimedoutTcpStreamChannels() { - TcpStreamChannelTimeoutMs := GetTcpChannelTimeoutMs() - for { - time.Sleep(10 * time.Millisecond) - _debug.FreeOSMemory() - streams.Range(func(key interface{}, value interface{}) bool { - streamWrapper := value.(*tcpStreamWrapper) - stream := streamWrapper.stream - if stream.superIdentifier.Protocol == nil { - if !stream.isClosed && time.Now().After(streamWrapper.createdAt.Add(TcpStreamChannelTimeoutMs)) { - stream.Close() - appStats.IncDroppedTcpStreams() - logger.Log.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d\n", appStats.DroppedTcpStreams, runtime.NumGoroutine(), TcpStreamChannelTimeoutMs/1000000) - } - } else { - if !stream.superIdentifier.IsClosedOthers { - for i := range stream.clients { - reader := &stream.clients[i] - if reader.extension.Protocol != stream.superIdentifier.Protocol { - reader.Close() - } - } - for i := range stream.servers { - reader := &stream.servers[i] - if reader.extension.Protocol != stream.superIdentifier.Protocol { - reader.Close() - } - } - stream.superIdentifier.IsClosedOthers = true - } - } - return true - }) - } -} - func startPassiveTapper(outputItems chan *api.OutputChannelItem) { - go closeTimedoutTcpStreamChannels() + streamsMap := NewTcpStreamMap() + go streamsMap.closeTimedoutTcpStreamChannels() + + var outputLevel int defer util.Run()() if *debug { @@ -259,19 +193,12 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) { } else if *quiet { outputLevel = -1 } - errorsMap = make(map[string]uint) - if localhostIPs, err := getLocalhostIPs(); err != nil { - // TODO: think this over - logger.Log.Info("Failed to get self IP addresses") - logger.Log.Errorf("Getting-Self-Address", "Error getting self ip address: %s (%v,%+v)", err, err, err) - ownIps = make([]string, 0) - } else { - ownIps = localhostIPs - } + tapErrors = NewErrorsMap(outputLevel) var handle *pcap.Handle var err error + if *fname != "" { if handle, err = pcap.OpenOffline(*fname); err != nil { logger.Log.Fatalf("PCAP OpenOffline error: %v", err) @@ -316,7 +243,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) { var ok bool decoderName := *decoder if decoderName == "" { - decoderName = fmt.Sprintf("%s", handle.LinkType()) + decoderName = handle.LinkType().String() } if dec, ok = gopacket.DecodersByLayerName[decoderName]; !ok { logger.Log.Fatal("No decoder named", decoderName) @@ -333,9 +260,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) { OutputChannel: outputItems, } - streamFactory := &tcpStreamFactory{ - Emitter: emitter, - } + streamFactory := NewTcpStreamFactory(emitter, streamsMap) streamPool := reassembly.NewStreamPool(streamFactory) assembler := reassembly.NewAssembler(streamPool) @@ -363,17 +288,15 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) { statsPeriod := time.Second * time.Duration(*statsevery) ticker := time.NewTicker(statsPeriod) - for true { + for { <-ticker.C // Since the start - errorsMapMutex.Lock() - errorMapLen := len(errorsMap) - errorsSummery := fmt.Sprintf("%v", errorsMap) - errorsMapMutex.Unlock() + errorMapLen, errorsSummery := tapErrors.getErrorsSummary() + logger.Log.Infof("%v (errors: %v, errTypes:%v) - Errors Summary: %s", time.Since(appStats.StartTime), - nErrors, + tapErrors.nErrors, errorMapLen, errorsSummery, ) @@ -410,7 +333,9 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) { if err == io.EOF { break } else if err != nil { - logger.Log.Debugf("Error: %v", err) + if err.Error() != "Timeout Expired" { + logger.Log.Debugf("Error: %T", err) + } continue } packetsCount := appStats.IncPacketsCount() @@ -470,14 +395,12 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) { done := *maxcount > 0 && int64(appStats.PacketsCount) >= *maxcount if done { - errorsMapMutex.Lock() - errorMapLen := len(errorsMap) - errorsMapMutex.Unlock() + errorMapLen, _ := tapErrors.getErrorsSummary() logger.Log.Infof("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)", appStats.PacketsCount, appStats.ProcessedBytes, time.Since(appStats.StartTime), - nErrors, + tapErrors.nErrors, errorMapLen) } select { @@ -531,9 +454,9 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) { logger.Log.Infof(" biggest-chunk bytes:\t%d", stats.biggestChunkBytes) logger.Log.Infof(" overlap packets:\t%d", stats.overlapPackets) logger.Log.Infof(" overlap bytes:\t\t%d", stats.overlapBytes) - logger.Log.Infof("Errors: %d", nErrors) - for e := range errorsMap { - logger.Log.Infof(" %s:\t\t%d", e, errorsMap[e]) + logger.Log.Infof("Errors: %d", tapErrors.nErrors) + for e := range tapErrors.errorsMap { + logger.Log.Infof(" %s:\t\t%d", e, tapErrors.errorsMap[e]) } logger.Log.Infof("AppStats: %v", GetStats()) } diff --git a/tap/tcp_reader.go b/tap/tcp_reader.go index d80872ba7..fff0a5a4e 100644 --- a/tap/tcp_reader.go +++ b/tap/tcp_reader.go @@ -2,7 +2,6 @@ package tap import ( "bufio" - "fmt" "io" "io/ioutil" "sync" @@ -20,13 +19,6 @@ type tcpReaderDataMsg struct { timestamp time.Time } -type tcpID struct { - srcIP string - dstIP string - srcPort string - dstPort string -} - type ConnectionInfo struct { ClientIP string ClientPort string @@ -35,10 +27,6 @@ type ConnectionInfo struct { IsOutgoing bool } -func (tid *tcpID) String() string { - return fmt.Sprintf("%s->%s %s->%s", tid.srcIP, tid.dstIP, tid.srcPort, tid.dstPort) -} - /* tcpReader gets reads from a channel of bytes of tcp payload, and parses it into requests and responses. * The payload is written to the channel by a tcpStream object that is dedicated to one tcp connection. * An tcpReader object is unidirectional: it parses either a client stream or a server stream. @@ -54,7 +42,6 @@ type tcpReader struct { data []byte superTimer *api.SuperTimer parent *tcpStream - messageCount uint packetsSeen uint outboundLinkWriter *OutboundLinkWriter extension *api.Extension diff --git a/tap/tcp_stream.go b/tap/tcp_stream.go index 2ab626a96..06db746f5 100644 --- a/tap/tcp_stream.go +++ b/tap/tcp_stream.go @@ -28,15 +28,15 @@ type tcpStream struct { isTapTarget bool clients []tcpReader servers []tcpReader - urls []string ident string sync.Mutex + streamsMap *tcpStreamMap } func (t *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassembly.TCPFlowDirection, nextSeq reassembly.Sequence, start *bool, ac reassembly.AssemblerContext) bool { // FSM if !t.tcpstate.CheckState(tcp, dir) { - SilentError("FSM-rejection", "%s: Packet rejected by FSM (state:%s)", t.ident, t.tcpstate.String()) + tapErrors.SilentError("FSM-rejection", "%s: Packet rejected by FSM (state:%s)", t.ident, t.tcpstate.String()) stats.rejectFsm++ if !t.fsmerr { t.fsmerr = true @@ -49,7 +49,7 @@ func (t *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassem // Options err := t.optchecker.Accept(tcp, ci, dir, nextSeq, start) if err != nil { - SilentError("OptionChecker-rejection", "%s: Packet rejected by OptionChecker: %s", t.ident, err) + tapErrors.SilentError("OptionChecker-rejection", "%s: Packet rejected by OptionChecker: %s", t.ident, err) stats.rejectOpt++ if !*nooptcheck { return false @@ -60,10 +60,10 @@ func (t *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassem if *checksum { c, err := tcp.ComputeChecksum() if err != nil { - SilentError("ChecksumCompute", "%s: Got error computing checksum: %s", t.ident, err) + tapErrors.SilentError("ChecksumCompute", "%s: Got error computing checksum: %s", t.ident, err) accept = false } else if c != 0x0 { - SilentError("Checksum", "%s: Invalid checksum: 0x%x", t.ident, c) + tapErrors.SilentError("Checksum", "%s: Invalid checksum: 0x%x", t.ident, c) accept = false } } @@ -97,7 +97,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass if sgStats.OverlapBytes != 0 && sgStats.OverlapPackets == 0 { // In the original example this was handled with panic(). // I don't know what this error means or how to handle it properly. - SilentError("Invalid-Overlap", "bytes:%d, pkts:%d", sgStats.OverlapBytes, sgStats.OverlapPackets) + tapErrors.SilentError("Invalid-Overlap", "bytes:%d, pkts:%d", sgStats.OverlapBytes, sgStats.OverlapPackets) } stats.overlapBytes += sgStats.OverlapBytes stats.overlapPackets += sgStats.OverlapPackets @@ -108,7 +108,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass } else { ident = fmt.Sprintf("%v %v(%s): ", t.net.Reverse(), t.transport.Reverse(), dir) } - Debug("%s: SG reassembled packet with %d bytes (start:%v,end:%v,skip:%d,saved:%d,nb:%d,%d,overlap:%d,%d)", ident, length, start, end, skip, saved, sgStats.Packets, sgStats.Chunks, sgStats.OverlapBytes, sgStats.OverlapPackets) + tapErrors.Debug("%s: SG reassembled packet with %d bytes (start:%v,end:%v,skip:%d,saved:%d,nb:%d,%d,overlap:%d,%d)", ident, length, start, end, skip, saved, sgStats.Packets, sgStats.Chunks, sgStats.OverlapBytes, sgStats.OverlapPackets) if skip == -1 && *allowmissinginit { // this is allowed } else if skip != 0 { @@ -127,18 +127,18 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass } dnsSize := binary.BigEndian.Uint16(data[:2]) missing := int(dnsSize) - len(data[2:]) - Debug("dnsSize: %d, missing: %d", dnsSize, missing) + tapErrors.Debug("dnsSize: %d, missing: %d", dnsSize, missing) if missing > 0 { - Debug("Missing some bytes: %d", missing) + tapErrors.Debug("Missing some bytes: %d", missing) sg.KeepFrom(0) return } p := gopacket.NewDecodingLayerParser(layers.LayerTypeDNS, dns) err := p.DecodeLayers(data[2:], &decoded) if err != nil { - SilentError("DNS-parser", "Failed to decode DNS: %v", err) + tapErrors.SilentError("DNS-parser", "Failed to decode DNS: %v", err) } else { - Debug("DNS: %s", gopacket.LayerDump(dns)) + tapErrors.Debug("DNS: %s", gopacket.LayerDump(dns)) } if len(data) > 2+int(dnsSize) { sg.KeepFrom(2 + int(dnsSize)) @@ -173,7 +173,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass } func (t *tcpStream) ReassemblyComplete(ac reassembly.AssemblerContext) bool { - Debug("%s: Connection closed", t.ident) + tapErrors.Debug("%s: Connection closed", t.ident) if t.isTapTarget && !t.isClosed { t.Close() } @@ -193,7 +193,7 @@ func (t *tcpStream) Close() { if shouldReturn { return } - streams.Delete(t.id) + t.streamsMap.Delete(t.id) for i := range t.clients { reader := &t.clients[i] diff --git a/tap/tcp_stream_factory.go b/tap/tcp_stream_factory.go index e42705106..5176f20aa 100644 --- a/tap/tcp_stream_factory.go +++ b/tap/tcp_stream_factory.go @@ -22,6 +22,8 @@ type tcpStreamFactory struct { wg sync.WaitGroup outboundLinkWriter *OutboundLinkWriter Emitter api.Emitter + streamsMap *tcpStreamMap + ownIps []string } type tcpStreamWrapper struct { @@ -29,8 +31,24 @@ type tcpStreamWrapper struct { createdAt time.Time } -var streams *sync.Map = &sync.Map{} // global -var streamId int64 = 0 +func NewTcpStreamFactory(emitter api.Emitter, streamsMap *tcpStreamMap) *tcpStreamFactory { + var ownIps []string + + if localhostIPs, err := getLocalhostIPs(); err != nil { + // TODO: think this over + logger.Log.Info("Failed to get self IP addresses") + logger.Log.Errorf("Getting-Self-Address", "Error getting self ip address: %s (%v,%+v)", err, err, err) + ownIps = make([]string, 0) + } else { + ownIps = localhostIPs + } + + return &tcpStreamFactory{ + Emitter: emitter, + streamsMap: streamsMap, + ownIps: ownIps, + } +} func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.TCP, ac reassembly.AssemblerContext) reassembly.Stream { logger.Log.Debugf("* NEW: %s %s", net, transport) @@ -56,10 +74,10 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T ident: fmt.Sprintf("%s:%s", net, transport), optchecker: reassembly.NewTCPOptionCheck(), superIdentifier: &api.SuperIdentifier{}, + streamsMap: factory.streamsMap, } if stream.isTapTarget { - streamId++ - stream.id = streamId + stream.id = factory.streamsMap.nextId() for i, extension := range extensions { counterPair := &api.CounterPair{ Request: 0, @@ -102,7 +120,7 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T counterPair: counterPair, }) - streams.Store(stream.id, &tcpStreamWrapper{ + factory.streamsMap.Store(stream.id, &tcpStreamWrapper{ stream: stream, createdAt: time.Now(), }) @@ -142,9 +160,10 @@ func (factory *tcpStreamFactory) getStreamProps(srcIP string, srcPort string, ds } } +//lint:ignore U1000 will be used in the future func (factory *tcpStreamFactory) shouldNotifyOnOutboundLink(dstIP string, dstPort int) bool { if inArrayInt(remoteOnlyOutboundPorts, dstPort) { - isDirectedHere := inArrayString(ownIps, dstIP) + isDirectedHere := inArrayString(factory.ownIps, dstIP) return !isDirectedHere && !isPrivateIP(dstIP) } return true diff --git a/tap/tcp_streams_map.go b/tap/tcp_streams_map.go new file mode 100644 index 000000000..3fddf6369 --- /dev/null +++ b/tap/tcp_streams_map.go @@ -0,0 +1,71 @@ +package tap + +import ( + "runtime" + _debug "runtime/debug" + "sync" + "time" + + "github.com/up9inc/mizu/shared/logger" +) + +type tcpStreamMap struct { + streams *sync.Map + streamId int64 +} + +func NewTcpStreamMap() *tcpStreamMap { + return &tcpStreamMap{ + streams: &sync.Map{}, + } +} + +func (streamMap *tcpStreamMap) Store(key, value interface{}) { + streamMap.streams.Store(key, value) +} + +func (streamMap *tcpStreamMap) Delete(key interface{}) { + streamMap.streams.Delete(key) +} + +func (streamMap *tcpStreamMap) nextId() int64 { + streamMap.streamId++ + return streamMap.streamId +} + +func (streamMap *tcpStreamMap) closeTimedoutTcpStreamChannels() { + tcpStreamChannelTimeout := GetTcpChannelTimeoutMs() + for { + time.Sleep(10 * time.Millisecond) + _debug.FreeOSMemory() + streamMap.streams.Range(func(key interface{}, value interface{}) bool { + streamWrapper := value.(*tcpStreamWrapper) + stream := streamWrapper.stream + if stream.superIdentifier.Protocol == nil { + if !stream.isClosed && time.Now().After(streamWrapper.createdAt.Add(tcpStreamChannelTimeout)) { + stream.Close() + appStats.IncDroppedTcpStreams() + logger.Log.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d\n", + appStats.DroppedTcpStreams, runtime.NumGoroutine(), tcpStreamChannelTimeout/1000000) + } + } else { + if !stream.superIdentifier.IsClosedOthers { + for i := range stream.clients { + reader := &stream.clients[i] + if reader.extension.Protocol != stream.superIdentifier.Protocol { + reader.Close() + } + } + for i := range stream.servers { + reader := &stream.servers[i] + if reader.extension.Protocol != stream.superIdentifier.Protocol { + reader.Close() + } + } + stream.superIdentifier.IsClosedOthers = true + } + } + return true + }) + } +} diff --git a/tap/tester/.gitignore b/tap/tester/.gitignore new file mode 100644 index 000000000..6028450ae --- /dev/null +++ b/tap/tester/.gitignore @@ -0,0 +1 @@ +tester \ No newline at end of file diff --git a/tap/tester/README.md b/tap/tester/README.md new file mode 100644 index 000000000..3fc7e4c65 --- /dev/null +++ b/tap/tester/README.md @@ -0,0 +1,12 @@ + +This tester used to launch passive-tapper locally without Docker or Kuberenetes environment. + +Its good for testing purposes. + +# How to run + +From the `tap` folder run: +`./tester/launch.sh` + +The tester gets the same arguments the passive_tapper gets, run with `--help` to get a complete list of options. +`./tester/launch.sh --help` diff --git a/tap/tester/launch.sh b/tap/tester/launch.sh new file mode 100755 index 000000000..5abab4a93 --- /dev/null +++ b/tap/tester/launch.sh @@ -0,0 +1,10 @@ +#!/bin/bash + +set -e + +echo "Building extensions..." +pushd .. && ./devops/build_extensions.sh && popd + +go build -o tester tester/tester.go + +sudo ./tester/tester "$@" diff --git a/tap/tester/tester.go b/tap/tester/tester.go new file mode 100644 index 000000000..91a8fcc36 --- /dev/null +++ b/tap/tester/tester.go @@ -0,0 +1,115 @@ +package main + +import ( + "bufio" + "fmt" + "io/ioutil" + "os" + "path" + "plugin" + "sort" + "strings" + + "github.com/op/go-logging" + + "github.com/go-errors/errors" + "github.com/up9inc/mizu/shared/logger" + "github.com/up9inc/mizu/tap" + tapApi "github.com/up9inc/mizu/tap/api" +) + +func loadExtensions() ([]*tapApi.Extension, error) { + extensionsDir := "./extensions" + files, err := ioutil.ReadDir(extensionsDir) + + if err != nil { + return nil, errors.Wrap(err, 0) + } + + extensions := make([]*tapApi.Extension, 0) + for _, file := range files { + filename := file.Name() + + if !strings.HasSuffix(filename, ".so") { + continue + } + + fmt.Printf("Loading extension: %s\n", filename) + + extension := &tapApi.Extension{ + Path: path.Join(extensionsDir, filename), + } + + plug, err := plugin.Open(extension.Path) + + if err != nil { + return nil, errors.Wrap(err, 0) + } + + extension.Plug = plug + symDissector, err := plug.Lookup("Dissector") + + if err != nil { + return nil, errors.Wrap(err, 0) + } + + dissector, ok := symDissector.(tapApi.Dissector) + + if !ok { + return nil, errors.Errorf("Symbol Dissector type error: %v %T\n", file, symDissector) + } + + dissector.Register(extension) + extension.Dissector = dissector + extensions = append(extensions, extension) + } + + sort.Slice(extensions, func(i, j int) bool { + return extensions[i].Protocol.Priority < extensions[j].Protocol.Priority + }) + + for _, extension := range extensions { + fmt.Printf("Extension Properties: %+v\n", extension) + } + + return extensions, nil +} + +func internalRun() error { + logger.InitLoggerStderrOnly(logging.DEBUG) + + opts := tap.TapOpts{ + HostMode: false, + } + + outputItems := make(chan *tapApi.OutputChannelItem, 1000) + extenssions, err := loadExtensions() + + if err != nil { + return err + } + + tapOpts := tapApi.TrafficFilteringOptions{} + + tap.StartPassiveTapper(&opts, outputItems, extenssions, &tapOpts) + + fmt.Printf("Tapping, press enter to exit...\n") + reader := bufio.NewReader(os.Stdin) + reader.ReadLine() + return nil +} + +func main() { + err := internalRun() + + if err != nil { + switch err := err.(type) { + case *errors.Error: + fmt.Printf("Error: %v\n", err.ErrorStack()) + default: + fmt.Printf("Error: %v\n", err) + } + + os.Exit(1) + } +}