passive-tapper refactor - first phase

* add passive-tapper main tester (#353)

* add passive-tapper main tester

* add errors to go.sum of mizu agent

* disable host mode for tester - to avoid filterAuthorities

* rename main to tester

* build extenssions as part of the tester launch

* add a README to the tester

* solving go.mod and .sum conflicts with addition of go-errors

* trivial warning fixes (#354)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* disable host mode for tester - to avoid filterAuthorities

* tcp streams map (#355)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* change rlog to mizu logger

* errors map (#356)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* errors map

* change int to uint - errorsmap

* change from int to uint

* Change errorsMap.nErrors to uint.

* change errors map to mizu logger instead of rlog

* init mizu logger in tester + fix errormap declaration

Co-authored-by: Nimrod Gilboa Markevich <nimrod@up9.com>

* move own ips to tcp stream factory (#358)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* errors map

* move own ips to tcp stream factory

* Feature/tapper refactor i/move own ips to tcp stream factory (#379)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* errors map

* move own ips to tcp stream factory

* fix ownips compilation issue

Co-authored-by: Nimrod Gilboa Markevich <nimrod@up9.com>
This commit is contained in:
David Levanon
2021-10-20 11:15:22 +03:00
committed by GitHub
parent 3a9c113f77
commit 2944493e2d
18 changed files with 457 additions and 134 deletions

View File

@@ -79,6 +79,8 @@ github.com/gin-contrib/static v0.0.1/go.mod h1:CSxeF+wep05e0kCOsqWdAWbSszmc31zTI
github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M= github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M=
github.com/gin-gonic/gin v1.7.2 h1:Tg03T9yM2xa8j6I3Z3oqLaQRSmKvxPd6g/2HJ6zICFA= github.com/gin-gonic/gin v1.7.2 h1:Tg03T9yM2xa8j6I3Z3oqLaQRSmKvxPd6g/2HJ6zICFA=
github.com/gin-gonic/gin v1.7.2/go.mod h1:jD2toBW3GZUr5UMcdrwQA10I7RuaFOl/SGeDjXkfUtY= github.com/gin-gonic/gin v1.7.2/go.mod h1:jD2toBW3GZUr5UMcdrwQA10I7RuaFOl/SGeDjXkfUtY=
github.com/go-errors/errors v1.4.1 h1:IvVlgbzSsaUNudsw5dcXSzF3EWyXTi5XrAdngnuhRyg=
github.com/go-errors/errors v1.4.1/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=

View File

@@ -48,7 +48,7 @@ func (cl *Cleaner) start() {
go func() { go func() {
ticker := time.NewTicker(cl.cleanPeriod) ticker := time.NewTicker(cl.cleanPeriod)
for true { for {
<-ticker.C <-ticker.C
cl.clean() cl.clean()
} }

60
tap/errors_map.go Normal file
View File

@@ -0,0 +1,60 @@
package tap
import (
"fmt"
"sync"
"github.com/up9inc/mizu/shared/logger"
)
type errorsMap struct {
errorsMap map[string]uint
outputLevel int
nErrors uint
errorsMapMutex sync.Mutex
}
func NewErrorsMap(outputLevel int) *errorsMap {
return &errorsMap{
errorsMap: make(map[string]uint),
outputLevel: outputLevel,
}
}
/* minOutputLevel: Error will be printed only if outputLevel is above this value
* t: key for errorsMap (counting errors)
* s, a: arguments logger.Log.Infof
* Note: Too bad for perf that a... is evaluated
*/
func (e *errorsMap) logError(minOutputLevel int, t string, s string, a ...interface{}) {
e.errorsMapMutex.Lock()
e.nErrors++
nb := e.errorsMap[t]
e.errorsMap[t] = nb + 1
e.errorsMapMutex.Unlock()
if e.outputLevel >= minOutputLevel {
formatStr := fmt.Sprintf("%s: %s", t, s)
logger.Log.Errorf(formatStr, a...)
}
}
func (e *errorsMap) Error(t string, s string, a ...interface{}) {
e.logError(0, t, s, a...)
}
func (e *errorsMap) SilentError(t string, s string, a ...interface{}) {
e.logError(2, t, s, a...)
}
func (e *errorsMap) Debug(s string, a ...interface{}) {
logger.Log.Debugf(s, a...)
}
func (e *errorsMap) getErrorsSummary() (int, string) {
e.errorsMapMutex.Lock()
errorMapLen := len(e.errorsMap)
errorsSummery := fmt.Sprintf("%v", e.errorsMap)
e.errorsMapMutex.Unlock()
return errorMapLen, errorsSummery
}

View File

@@ -4,7 +4,9 @@ go 1.16
require ( require (
github.com/bradleyfalzon/tlsx v0.0.0-20170624122154-28fd0e59bac4 github.com/bradleyfalzon/tlsx v0.0.0-20170624122154-28fd0e59bac4
github.com/go-errors/errors v1.4.1
github.com/google/gopacket v1.1.19 github.com/google/gopacket v1.1.19
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7
github.com/up9inc/mizu/shared v0.0.0 github.com/up9inc/mizu/shared v0.0.0
github.com/up9inc/mizu/tap/api v0.0.0 github.com/up9inc/mizu/tap/api v0.0.0
golang.org/x/net v0.0.0-20210224082022-3d97a244fca7 // indirect golang.org/x/net v0.0.0-20210224082022-3d97a244fca7 // indirect

View File

@@ -1,6 +1,8 @@
github.com/bradleyfalzon/tlsx v0.0.0-20170624122154-28fd0e59bac4 h1:NJOOlc6ZJjix0A1rAU+nxruZtR8KboG1848yqpIUo4M= github.com/bradleyfalzon/tlsx v0.0.0-20170624122154-28fd0e59bac4 h1:NJOOlc6ZJjix0A1rAU+nxruZtR8KboG1848yqpIUo4M=
github.com/bradleyfalzon/tlsx v0.0.0-20170624122154-28fd0e59bac4/go.mod h1:DQPxZS994Ld1Y8uwnJT+dRL04XPD0cElP/pHH/zEBHM= github.com/bradleyfalzon/tlsx v0.0.0-20170624122154-28fd0e59bac4/go.mod h1:DQPxZS994Ld1Y8uwnJT+dRL04XPD0cElP/pHH/zEBHM=
github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/go-errors/errors v1.4.1 h1:IvVlgbzSsaUNudsw5dcXSzF3EWyXTi5XrAdngnuhRyg=
github.com/go-errors/errors v1.4.1/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og=
github.com/golang-jwt/jwt/v4 v4.1.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg= github.com/golang-jwt/jwt/v4 v4.1.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg=
github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8= github.com/google/gopacket v1.1.19 h1:ves8RnFZPGiFnTS0uPQStjwru6uO6h+nlr9j6fL7kF8=
github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo= github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo=

1
tap/main/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
main

110
tap/main/main.go Normal file
View File

@@ -0,0 +1,110 @@
package main
import (
"bufio"
"fmt"
"io/ioutil"
"os"
"path"
"plugin"
"sort"
"strings"
"github.com/go-errors/errors"
"github.com/up9inc/mizu/tap"
tapApi "github.com/up9inc/mizu/tap/api"
)
func loadExtensions() ([]*tapApi.Extension, error) {
extensionsDir := "./extensions"
files, err := ioutil.ReadDir(extensionsDir)
if err != nil {
return nil, errors.Wrap(err, 0)
}
extensions := make([]*tapApi.Extension, 0)
for _, file := range files {
filename := file.Name()
if !strings.HasSuffix(filename, ".so") {
continue
}
fmt.Printf("Loading extension: %s\n", filename)
extension := &tapApi.Extension{
Path: path.Join(extensionsDir, filename),
}
plug, err := plugin.Open(extension.Path)
if err != nil {
return nil, errors.Wrap(err, 0)
}
extension.Plug = plug
symDissector, err := plug.Lookup("Dissector")
if err != nil {
return nil, errors.Wrap(err, 0)
}
dissector, ok := symDissector.(tapApi.Dissector)
if !ok {
return nil, errors.Errorf("Symbol Dissector type error: %v %T\n", file, symDissector)
}
dissector.Register(extension)
extension.Dissector = dissector
extensions = append(extensions, extension)
}
sort.Slice(extensions, func(i, j int) bool {
return extensions[i].Protocol.Priority < extensions[j].Protocol.Priority
})
for _, extension := range extensions {
fmt.Printf("Extension Properties: %+v\n", extension)
}
return extensions, nil
}
func internalRun() error {
opts := tap.TapOpts{
HostMode: false,
}
outputItems := make(chan *tapApi.OutputChannelItem, 1000)
extenssions, err := loadExtensions()
if err != nil {
return err
}
tapOpts := tapApi.TrafficFilteringOptions{}
tap.StartPassiveTapper(&opts, outputItems, extenssions, &tapOpts)
fmt.Printf("Tapping, press enter to exit...\n")
reader := bufio.NewReader(os.Stdin)
reader.ReadLine()
return nil
}
func main() {
err := internalRun()
if err != nil {
switch err := err.(type) {
case *errors.Error:
fmt.Printf("Error: %v\n", err.ErrorStack())
default:
fmt.Printf("Error: %v\n", err)
}
os.Exit(1)
}
}

7
tap/main/test.sh Executable file
View File

@@ -0,0 +1,7 @@
#!/bin/bash
set -e
go build -o main main/main.go
sudo ./main/main "$@"

View File

@@ -27,6 +27,7 @@ func getLocalhostIPs() ([]string, error) {
return myIPs, nil return myIPs, nil
} }
//lint:ignore U1000 will be used in the future
func isPrivateIP(ipStr string) bool { func isPrivateIP(ipStr string) bool {
ip := net.ParseIP(ipStr) ip := net.ParseIP(ipStr)
if ip.IsLoopback() || ip.IsLinkLocalUnicast() || ip.IsLinkLocalMulticast() { if ip.IsLoopback() || ip.IsLinkLocalUnicast() || ip.IsLinkLocalMulticast() {
@@ -54,7 +55,7 @@ func initPrivateIPBlocks() {
} { } {
_, block, err := net.ParseCIDR(cidr) _, block, err := net.ParseCIDR(cidr)
if err != nil { if err != nil {
Error("Private-IP-Block-Parse", "parse error on %q: %v", cidr, err) tapErrors.Error("Private-IP-Block-Parse", "parse error on %q: %v", cidr, err)
} else { } else {
privateIPBlocks = append(privateIPBlocks, block) privateIPBlocks = append(privateIPBlocks, block)
} }

View File

@@ -17,7 +17,6 @@ import (
"os" "os"
"os/signal" "os/signal"
"runtime" "runtime"
_debug "runtime/debug"
"runtime/pprof" "runtime/pprof"
"strconv" "strconv"
"strings" "strings"
@@ -36,6 +35,7 @@ import (
const cleanPeriod = time.Second * 10 const cleanPeriod = time.Second * 10
//lint:ignore U1000 will be used in the future
var remoteOnlyOutboundPorts = []int{80, 443} var remoteOnlyOutboundPorts = []int{80, 443}
var maxcount = flag.Int64("c", -1, "Only grab this many packets, then exit") var maxcount = flag.Int64("c", -1, "Only grab this many packets, then exit")
@@ -63,6 +63,7 @@ var staleTimeoutSeconds = flag.Int("staletimout", 120, "Max time in seconds to k
var memprofile = flag.String("memprofile", "", "Write memory profile") var memprofile = flag.String("memprofile", "", "Write memory profile")
var appStats = api.AppStats{} var appStats = api.AppStats{}
var tapErrors *errorsMap
// global // global
var stats struct { var stats struct {
@@ -87,44 +88,10 @@ type TapOpts struct {
HostMode bool HostMode bool
} }
var outputLevel int
var errorsMap map[string]uint
var errorsMapMutex sync.Mutex
var nErrors uint
var ownIps []string // global
var hostMode bool // global var hostMode bool // global
var extensions []*api.Extension // global var extensions []*api.Extension // global
var filteringOptions *api.TrafficFilteringOptions // global var filteringOptions *api.TrafficFilteringOptions // global
const baseStreamChannelTimeoutMs int = 5000 * 100
/* minOutputLevel: Error will be printed only if outputLevel is above this value
* t: key for errorsMap (counting errors)
* s, a: arguments logger.Log.Infof
* Note: Too bad for perf that a... is evaluated
*/
func logError(minOutputLevel int, t string, s string, a ...interface{}) {
errorsMapMutex.Lock()
nErrors++
nb, _ := errorsMap[t]
errorsMap[t] = nb + 1
errorsMapMutex.Unlock()
if outputLevel >= minOutputLevel {
formatStr := fmt.Sprintf("%s: %s", t, s)
logger.Log.Errorf(formatStr, a...)
}
}
func Error(t string, s string, a ...interface{}) {
logError(0, t, s, a...)
}
func SilentError(t string, s string, a ...interface{}) {
logError(2, t, s, a...)
}
func Debug(s string, a ...interface{}) {
logger.Log.Debugf(s, a...)
}
func inArrayInt(arr []int, valueToCheck int) bool { func inArrayInt(arr []int, valueToCheck int) bool {
for _, value := range arr { for _, value := range arr {
if value == valueToCheck { if value == valueToCheck {
@@ -191,7 +158,7 @@ func startMemoryProfiler() {
} }
} }
for true { for {
t := time.Now() t := time.Now()
filename := fmt.Sprintf("%s/%s__mem.prof", dumpPath, t.Format("15_04_05")) filename := fmt.Sprintf("%s/%s__mem.prof", dumpPath, t.Format("15_04_05"))
@@ -212,44 +179,11 @@ func startMemoryProfiler() {
}() }()
} }
func closeTimedoutTcpStreamChannels() {
TcpStreamChannelTimeoutMs := GetTcpChannelTimeoutMs()
for {
time.Sleep(10 * time.Millisecond)
_debug.FreeOSMemory()
streams.Range(func(key interface{}, value interface{}) bool {
streamWrapper := value.(*tcpStreamWrapper)
stream := streamWrapper.stream
if stream.superIdentifier.Protocol == nil {
if !stream.isClosed && time.Now().After(streamWrapper.createdAt.Add(TcpStreamChannelTimeoutMs)) {
stream.Close()
appStats.IncDroppedTcpStreams()
logger.Log.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d\n", appStats.DroppedTcpStreams, runtime.NumGoroutine(), TcpStreamChannelTimeoutMs/1000000)
}
} else {
if !stream.superIdentifier.IsClosedOthers {
for i := range stream.clients {
reader := &stream.clients[i]
if reader.extension.Protocol != stream.superIdentifier.Protocol {
reader.Close()
}
}
for i := range stream.servers {
reader := &stream.servers[i]
if reader.extension.Protocol != stream.superIdentifier.Protocol {
reader.Close()
}
}
stream.superIdentifier.IsClosedOthers = true
}
}
return true
})
}
}
func startPassiveTapper(outputItems chan *api.OutputChannelItem) { func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
go closeTimedoutTcpStreamChannels() streamsMap := NewTcpStreamMap()
go streamsMap.closeTimedoutTcpStreamChannels()
var outputLevel int
defer util.Run()() defer util.Run()()
if *debug { if *debug {
@@ -259,19 +193,12 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
} else if *quiet { } else if *quiet {
outputLevel = -1 outputLevel = -1
} }
errorsMap = make(map[string]uint)
if localhostIPs, err := getLocalhostIPs(); err != nil { tapErrors = NewErrorsMap(outputLevel)
// TODO: think this over
logger.Log.Info("Failed to get self IP addresses")
logger.Log.Errorf("Getting-Self-Address", "Error getting self ip address: %s (%v,%+v)", err, err, err)
ownIps = make([]string, 0)
} else {
ownIps = localhostIPs
}
var handle *pcap.Handle var handle *pcap.Handle
var err error var err error
if *fname != "" { if *fname != "" {
if handle, err = pcap.OpenOffline(*fname); err != nil { if handle, err = pcap.OpenOffline(*fname); err != nil {
logger.Log.Fatalf("PCAP OpenOffline error: %v", err) logger.Log.Fatalf("PCAP OpenOffline error: %v", err)
@@ -316,7 +243,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
var ok bool var ok bool
decoderName := *decoder decoderName := *decoder
if decoderName == "" { if decoderName == "" {
decoderName = fmt.Sprintf("%s", handle.LinkType()) decoderName = handle.LinkType().String()
} }
if dec, ok = gopacket.DecodersByLayerName[decoderName]; !ok { if dec, ok = gopacket.DecodersByLayerName[decoderName]; !ok {
logger.Log.Fatal("No decoder named", decoderName) logger.Log.Fatal("No decoder named", decoderName)
@@ -333,9 +260,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
OutputChannel: outputItems, OutputChannel: outputItems,
} }
streamFactory := &tcpStreamFactory{ streamFactory := NewTcpStreamFactory(emitter, streamsMap)
Emitter: emitter,
}
streamPool := reassembly.NewStreamPool(streamFactory) streamPool := reassembly.NewStreamPool(streamFactory)
assembler := reassembly.NewAssembler(streamPool) assembler := reassembly.NewAssembler(streamPool)
@@ -363,17 +288,15 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
statsPeriod := time.Second * time.Duration(*statsevery) statsPeriod := time.Second * time.Duration(*statsevery)
ticker := time.NewTicker(statsPeriod) ticker := time.NewTicker(statsPeriod)
for true { for {
<-ticker.C <-ticker.C
// Since the start // Since the start
errorsMapMutex.Lock() errorMapLen, errorsSummery := tapErrors.getErrorsSummary()
errorMapLen := len(errorsMap)
errorsSummery := fmt.Sprintf("%v", errorsMap)
errorsMapMutex.Unlock()
logger.Log.Infof("%v (errors: %v, errTypes:%v) - Errors Summary: %s", logger.Log.Infof("%v (errors: %v, errTypes:%v) - Errors Summary: %s",
time.Since(appStats.StartTime), time.Since(appStats.StartTime),
nErrors, tapErrors.nErrors,
errorMapLen, errorMapLen,
errorsSummery, errorsSummery,
) )
@@ -410,7 +333,9 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
if err == io.EOF { if err == io.EOF {
break break
} else if err != nil { } else if err != nil {
logger.Log.Debugf("Error: %v", err) if err.Error() != "Timeout Expired" {
logger.Log.Debugf("Error: %T", err)
}
continue continue
} }
packetsCount := appStats.IncPacketsCount() packetsCount := appStats.IncPacketsCount()
@@ -470,14 +395,12 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
done := *maxcount > 0 && int64(appStats.PacketsCount) >= *maxcount done := *maxcount > 0 && int64(appStats.PacketsCount) >= *maxcount
if done { if done {
errorsMapMutex.Lock() errorMapLen, _ := tapErrors.getErrorsSummary()
errorMapLen := len(errorsMap)
errorsMapMutex.Unlock()
logger.Log.Infof("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)", logger.Log.Infof("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)",
appStats.PacketsCount, appStats.PacketsCount,
appStats.ProcessedBytes, appStats.ProcessedBytes,
time.Since(appStats.StartTime), time.Since(appStats.StartTime),
nErrors, tapErrors.nErrors,
errorMapLen) errorMapLen)
} }
select { select {
@@ -531,9 +454,9 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) {
logger.Log.Infof(" biggest-chunk bytes:\t%d", stats.biggestChunkBytes) logger.Log.Infof(" biggest-chunk bytes:\t%d", stats.biggestChunkBytes)
logger.Log.Infof(" overlap packets:\t%d", stats.overlapPackets) logger.Log.Infof(" overlap packets:\t%d", stats.overlapPackets)
logger.Log.Infof(" overlap bytes:\t\t%d", stats.overlapBytes) logger.Log.Infof(" overlap bytes:\t\t%d", stats.overlapBytes)
logger.Log.Infof("Errors: %d", nErrors) logger.Log.Infof("Errors: %d", tapErrors.nErrors)
for e := range errorsMap { for e := range tapErrors.errorsMap {
logger.Log.Infof(" %s:\t\t%d", e, errorsMap[e]) logger.Log.Infof(" %s:\t\t%d", e, tapErrors.errorsMap[e])
} }
logger.Log.Infof("AppStats: %v", GetStats()) logger.Log.Infof("AppStats: %v", GetStats())
} }

View File

@@ -2,7 +2,6 @@ package tap
import ( import (
"bufio" "bufio"
"fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"sync" "sync"
@@ -20,13 +19,6 @@ type tcpReaderDataMsg struct {
timestamp time.Time timestamp time.Time
} }
type tcpID struct {
srcIP string
dstIP string
srcPort string
dstPort string
}
type ConnectionInfo struct { type ConnectionInfo struct {
ClientIP string ClientIP string
ClientPort string ClientPort string
@@ -35,10 +27,6 @@ type ConnectionInfo struct {
IsOutgoing bool IsOutgoing bool
} }
func (tid *tcpID) String() string {
return fmt.Sprintf("%s->%s %s->%s", tid.srcIP, tid.dstIP, tid.srcPort, tid.dstPort)
}
/* tcpReader gets reads from a channel of bytes of tcp payload, and parses it into requests and responses. /* tcpReader gets reads from a channel of bytes of tcp payload, and parses it into requests and responses.
* The payload is written to the channel by a tcpStream object that is dedicated to one tcp connection. * The payload is written to the channel by a tcpStream object that is dedicated to one tcp connection.
* An tcpReader object is unidirectional: it parses either a client stream or a server stream. * An tcpReader object is unidirectional: it parses either a client stream or a server stream.
@@ -54,7 +42,6 @@ type tcpReader struct {
data []byte data []byte
superTimer *api.SuperTimer superTimer *api.SuperTimer
parent *tcpStream parent *tcpStream
messageCount uint
packetsSeen uint packetsSeen uint
outboundLinkWriter *OutboundLinkWriter outboundLinkWriter *OutboundLinkWriter
extension *api.Extension extension *api.Extension

View File

@@ -28,15 +28,15 @@ type tcpStream struct {
isTapTarget bool isTapTarget bool
clients []tcpReader clients []tcpReader
servers []tcpReader servers []tcpReader
urls []string
ident string ident string
sync.Mutex sync.Mutex
streamsMap *tcpStreamMap
} }
func (t *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassembly.TCPFlowDirection, nextSeq reassembly.Sequence, start *bool, ac reassembly.AssemblerContext) bool { func (t *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassembly.TCPFlowDirection, nextSeq reassembly.Sequence, start *bool, ac reassembly.AssemblerContext) bool {
// FSM // FSM
if !t.tcpstate.CheckState(tcp, dir) { if !t.tcpstate.CheckState(tcp, dir) {
SilentError("FSM-rejection", "%s: Packet rejected by FSM (state:%s)", t.ident, t.tcpstate.String()) tapErrors.SilentError("FSM-rejection", "%s: Packet rejected by FSM (state:%s)", t.ident, t.tcpstate.String())
stats.rejectFsm++ stats.rejectFsm++
if !t.fsmerr { if !t.fsmerr {
t.fsmerr = true t.fsmerr = true
@@ -49,7 +49,7 @@ func (t *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassem
// Options // Options
err := t.optchecker.Accept(tcp, ci, dir, nextSeq, start) err := t.optchecker.Accept(tcp, ci, dir, nextSeq, start)
if err != nil { if err != nil {
SilentError("OptionChecker-rejection", "%s: Packet rejected by OptionChecker: %s", t.ident, err) tapErrors.SilentError("OptionChecker-rejection", "%s: Packet rejected by OptionChecker: %s", t.ident, err)
stats.rejectOpt++ stats.rejectOpt++
if !*nooptcheck { if !*nooptcheck {
return false return false
@@ -60,10 +60,10 @@ func (t *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassem
if *checksum { if *checksum {
c, err := tcp.ComputeChecksum() c, err := tcp.ComputeChecksum()
if err != nil { if err != nil {
SilentError("ChecksumCompute", "%s: Got error computing checksum: %s", t.ident, err) tapErrors.SilentError("ChecksumCompute", "%s: Got error computing checksum: %s", t.ident, err)
accept = false accept = false
} else if c != 0x0 { } else if c != 0x0 {
SilentError("Checksum", "%s: Invalid checksum: 0x%x", t.ident, c) tapErrors.SilentError("Checksum", "%s: Invalid checksum: 0x%x", t.ident, c)
accept = false accept = false
} }
} }
@@ -97,7 +97,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
if sgStats.OverlapBytes != 0 && sgStats.OverlapPackets == 0 { if sgStats.OverlapBytes != 0 && sgStats.OverlapPackets == 0 {
// In the original example this was handled with panic(). // In the original example this was handled with panic().
// I don't know what this error means or how to handle it properly. // I don't know what this error means or how to handle it properly.
SilentError("Invalid-Overlap", "bytes:%d, pkts:%d", sgStats.OverlapBytes, sgStats.OverlapPackets) tapErrors.SilentError("Invalid-Overlap", "bytes:%d, pkts:%d", sgStats.OverlapBytes, sgStats.OverlapPackets)
} }
stats.overlapBytes += sgStats.OverlapBytes stats.overlapBytes += sgStats.OverlapBytes
stats.overlapPackets += sgStats.OverlapPackets stats.overlapPackets += sgStats.OverlapPackets
@@ -108,7 +108,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
} else { } else {
ident = fmt.Sprintf("%v %v(%s): ", t.net.Reverse(), t.transport.Reverse(), dir) ident = fmt.Sprintf("%v %v(%s): ", t.net.Reverse(), t.transport.Reverse(), dir)
} }
Debug("%s: SG reassembled packet with %d bytes (start:%v,end:%v,skip:%d,saved:%d,nb:%d,%d,overlap:%d,%d)", ident, length, start, end, skip, saved, sgStats.Packets, sgStats.Chunks, sgStats.OverlapBytes, sgStats.OverlapPackets) tapErrors.Debug("%s: SG reassembled packet with %d bytes (start:%v,end:%v,skip:%d,saved:%d,nb:%d,%d,overlap:%d,%d)", ident, length, start, end, skip, saved, sgStats.Packets, sgStats.Chunks, sgStats.OverlapBytes, sgStats.OverlapPackets)
if skip == -1 && *allowmissinginit { if skip == -1 && *allowmissinginit {
// this is allowed // this is allowed
} else if skip != 0 { } else if skip != 0 {
@@ -127,18 +127,18 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
} }
dnsSize := binary.BigEndian.Uint16(data[:2]) dnsSize := binary.BigEndian.Uint16(data[:2])
missing := int(dnsSize) - len(data[2:]) missing := int(dnsSize) - len(data[2:])
Debug("dnsSize: %d, missing: %d", dnsSize, missing) tapErrors.Debug("dnsSize: %d, missing: %d", dnsSize, missing)
if missing > 0 { if missing > 0 {
Debug("Missing some bytes: %d", missing) tapErrors.Debug("Missing some bytes: %d", missing)
sg.KeepFrom(0) sg.KeepFrom(0)
return return
} }
p := gopacket.NewDecodingLayerParser(layers.LayerTypeDNS, dns) p := gopacket.NewDecodingLayerParser(layers.LayerTypeDNS, dns)
err := p.DecodeLayers(data[2:], &decoded) err := p.DecodeLayers(data[2:], &decoded)
if err != nil { if err != nil {
SilentError("DNS-parser", "Failed to decode DNS: %v", err) tapErrors.SilentError("DNS-parser", "Failed to decode DNS: %v", err)
} else { } else {
Debug("DNS: %s", gopacket.LayerDump(dns)) tapErrors.Debug("DNS: %s", gopacket.LayerDump(dns))
} }
if len(data) > 2+int(dnsSize) { if len(data) > 2+int(dnsSize) {
sg.KeepFrom(2 + int(dnsSize)) sg.KeepFrom(2 + int(dnsSize))
@@ -173,7 +173,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass
} }
func (t *tcpStream) ReassemblyComplete(ac reassembly.AssemblerContext) bool { func (t *tcpStream) ReassemblyComplete(ac reassembly.AssemblerContext) bool {
Debug("%s: Connection closed", t.ident) tapErrors.Debug("%s: Connection closed", t.ident)
if t.isTapTarget && !t.isClosed { if t.isTapTarget && !t.isClosed {
t.Close() t.Close()
} }
@@ -193,7 +193,7 @@ func (t *tcpStream) Close() {
if shouldReturn { if shouldReturn {
return return
} }
streams.Delete(t.id) t.streamsMap.Delete(t.id)
for i := range t.clients { for i := range t.clients {
reader := &t.clients[i] reader := &t.clients[i]

View File

@@ -22,6 +22,8 @@ type tcpStreamFactory struct {
wg sync.WaitGroup wg sync.WaitGroup
outboundLinkWriter *OutboundLinkWriter outboundLinkWriter *OutboundLinkWriter
Emitter api.Emitter Emitter api.Emitter
streamsMap *tcpStreamMap
ownIps []string
} }
type tcpStreamWrapper struct { type tcpStreamWrapper struct {
@@ -29,8 +31,24 @@ type tcpStreamWrapper struct {
createdAt time.Time createdAt time.Time
} }
var streams *sync.Map = &sync.Map{} // global func NewTcpStreamFactory(emitter api.Emitter, streamsMap *tcpStreamMap) *tcpStreamFactory {
var streamId int64 = 0 var ownIps []string
if localhostIPs, err := getLocalhostIPs(); err != nil {
// TODO: think this over
logger.Log.Info("Failed to get self IP addresses")
logger.Log.Errorf("Getting-Self-Address", "Error getting self ip address: %s (%v,%+v)", err, err, err)
ownIps = make([]string, 0)
} else {
ownIps = localhostIPs
}
return &tcpStreamFactory{
Emitter: emitter,
streamsMap: streamsMap,
ownIps: ownIps,
}
}
func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.TCP, ac reassembly.AssemblerContext) reassembly.Stream { func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.TCP, ac reassembly.AssemblerContext) reassembly.Stream {
logger.Log.Debugf("* NEW: %s %s", net, transport) logger.Log.Debugf("* NEW: %s %s", net, transport)
@@ -56,10 +74,10 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
ident: fmt.Sprintf("%s:%s", net, transport), ident: fmt.Sprintf("%s:%s", net, transport),
optchecker: reassembly.NewTCPOptionCheck(), optchecker: reassembly.NewTCPOptionCheck(),
superIdentifier: &api.SuperIdentifier{}, superIdentifier: &api.SuperIdentifier{},
streamsMap: factory.streamsMap,
} }
if stream.isTapTarget { if stream.isTapTarget {
streamId++ stream.id = factory.streamsMap.nextId()
stream.id = streamId
for i, extension := range extensions { for i, extension := range extensions {
counterPair := &api.CounterPair{ counterPair := &api.CounterPair{
Request: 0, Request: 0,
@@ -102,7 +120,7 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
counterPair: counterPair, counterPair: counterPair,
}) })
streams.Store(stream.id, &tcpStreamWrapper{ factory.streamsMap.Store(stream.id, &tcpStreamWrapper{
stream: stream, stream: stream,
createdAt: time.Now(), createdAt: time.Now(),
}) })
@@ -142,9 +160,10 @@ func (factory *tcpStreamFactory) getStreamProps(srcIP string, srcPort string, ds
} }
} }
//lint:ignore U1000 will be used in the future
func (factory *tcpStreamFactory) shouldNotifyOnOutboundLink(dstIP string, dstPort int) bool { func (factory *tcpStreamFactory) shouldNotifyOnOutboundLink(dstIP string, dstPort int) bool {
if inArrayInt(remoteOnlyOutboundPorts, dstPort) { if inArrayInt(remoteOnlyOutboundPorts, dstPort) {
isDirectedHere := inArrayString(ownIps, dstIP) isDirectedHere := inArrayString(factory.ownIps, dstIP)
return !isDirectedHere && !isPrivateIP(dstIP) return !isDirectedHere && !isPrivateIP(dstIP)
} }
return true return true

71
tap/tcp_streams_map.go Normal file
View File

@@ -0,0 +1,71 @@
package tap
import (
"runtime"
_debug "runtime/debug"
"sync"
"time"
"github.com/up9inc/mizu/shared/logger"
)
type tcpStreamMap struct {
streams *sync.Map
streamId int64
}
func NewTcpStreamMap() *tcpStreamMap {
return &tcpStreamMap{
streams: &sync.Map{},
}
}
func (streamMap *tcpStreamMap) Store(key, value interface{}) {
streamMap.streams.Store(key, value)
}
func (streamMap *tcpStreamMap) Delete(key interface{}) {
streamMap.streams.Delete(key)
}
func (streamMap *tcpStreamMap) nextId() int64 {
streamMap.streamId++
return streamMap.streamId
}
func (streamMap *tcpStreamMap) closeTimedoutTcpStreamChannels() {
tcpStreamChannelTimeout := GetTcpChannelTimeoutMs()
for {
time.Sleep(10 * time.Millisecond)
_debug.FreeOSMemory()
streamMap.streams.Range(func(key interface{}, value interface{}) bool {
streamWrapper := value.(*tcpStreamWrapper)
stream := streamWrapper.stream
if stream.superIdentifier.Protocol == nil {
if !stream.isClosed && time.Now().After(streamWrapper.createdAt.Add(tcpStreamChannelTimeout)) {
stream.Close()
appStats.IncDroppedTcpStreams()
logger.Log.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d\n",
appStats.DroppedTcpStreams, runtime.NumGoroutine(), tcpStreamChannelTimeout/1000000)
}
} else {
if !stream.superIdentifier.IsClosedOthers {
for i := range stream.clients {
reader := &stream.clients[i]
if reader.extension.Protocol != stream.superIdentifier.Protocol {
reader.Close()
}
}
for i := range stream.servers {
reader := &stream.servers[i]
if reader.extension.Protocol != stream.superIdentifier.Protocol {
reader.Close()
}
}
stream.superIdentifier.IsClosedOthers = true
}
}
return true
})
}
}

1
tap/tester/.gitignore vendored Normal file
View File

@@ -0,0 +1 @@
tester

12
tap/tester/README.md Normal file
View File

@@ -0,0 +1,12 @@
This tester used to launch passive-tapper locally without Docker or Kuberenetes environment.
Its good for testing purposes.
# How to run
From the `tap` folder run:
`./tester/launch.sh`
The tester gets the same arguments the passive_tapper gets, run with `--help` to get a complete list of options.
`./tester/launch.sh --help`

10
tap/tester/launch.sh Executable file
View File

@@ -0,0 +1,10 @@
#!/bin/bash
set -e
echo "Building extensions..."
pushd .. && ./devops/build_extensions.sh && popd
go build -o tester tester/tester.go
sudo ./tester/tester "$@"

115
tap/tester/tester.go Normal file
View File

@@ -0,0 +1,115 @@
package main
import (
"bufio"
"fmt"
"io/ioutil"
"os"
"path"
"plugin"
"sort"
"strings"
"github.com/op/go-logging"
"github.com/go-errors/errors"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap"
tapApi "github.com/up9inc/mizu/tap/api"
)
func loadExtensions() ([]*tapApi.Extension, error) {
extensionsDir := "./extensions"
files, err := ioutil.ReadDir(extensionsDir)
if err != nil {
return nil, errors.Wrap(err, 0)
}
extensions := make([]*tapApi.Extension, 0)
for _, file := range files {
filename := file.Name()
if !strings.HasSuffix(filename, ".so") {
continue
}
fmt.Printf("Loading extension: %s\n", filename)
extension := &tapApi.Extension{
Path: path.Join(extensionsDir, filename),
}
plug, err := plugin.Open(extension.Path)
if err != nil {
return nil, errors.Wrap(err, 0)
}
extension.Plug = plug
symDissector, err := plug.Lookup("Dissector")
if err != nil {
return nil, errors.Wrap(err, 0)
}
dissector, ok := symDissector.(tapApi.Dissector)
if !ok {
return nil, errors.Errorf("Symbol Dissector type error: %v %T\n", file, symDissector)
}
dissector.Register(extension)
extension.Dissector = dissector
extensions = append(extensions, extension)
}
sort.Slice(extensions, func(i, j int) bool {
return extensions[i].Protocol.Priority < extensions[j].Protocol.Priority
})
for _, extension := range extensions {
fmt.Printf("Extension Properties: %+v\n", extension)
}
return extensions, nil
}
func internalRun() error {
logger.InitLoggerStderrOnly(logging.DEBUG)
opts := tap.TapOpts{
HostMode: false,
}
outputItems := make(chan *tapApi.OutputChannelItem, 1000)
extenssions, err := loadExtensions()
if err != nil {
return err
}
tapOpts := tapApi.TrafficFilteringOptions{}
tap.StartPassiveTapper(&opts, outputItems, extenssions, &tapOpts)
fmt.Printf("Tapping, press enter to exit...\n")
reader := bufio.NewReader(os.Stdin)
reader.ReadLine()
return nil
}
func main() {
err := internalRun()
if err != nil {
switch err := err.(type) {
case *errors.Error:
fmt.Printf("Error: %v\n", err.ErrorStack())
default:
fmt.Printf("Error: %v\n", err)
}
os.Exit(1)
}
}