diff --git a/tap/diagnose/diagnose.go b/tap/diagnose/diagnose.go new file mode 100644 index 000000000..2c64a3b35 --- /dev/null +++ b/tap/diagnose/diagnose.go @@ -0,0 +1,76 @@ +package diagnose + +import ( + "fmt" + "os" + "runtime" + "runtime/pprof" + "strconv" + "time" + + "github.com/up9inc/mizu/shared/logger" + "github.com/up9inc/mizu/tap/api" +) + +var AppStats = api.AppStats{} + +func StartMemoryProfiler(envDumpPath string, envTimeInterval string) { + dumpPath := "/app/pprof" + if envDumpPath != "" { + dumpPath = envDumpPath + } + timeInterval := 60 + if envTimeInterval != "" { + if i, err := strconv.Atoi(envTimeInterval); err == nil { + timeInterval = i + } + } + + logger.Log.Info("Profiling is on, results will be written to %s", dumpPath) + go func() { + if _, err := os.Stat(dumpPath); os.IsNotExist(err) { + if err := os.Mkdir(dumpPath, 0777); err != nil { + logger.Log.Fatal("could not create directory for profile: ", err) + } + } + + for { + t := time.Now() + + filename := fmt.Sprintf("%s/%s__mem.prof", dumpPath, t.Format("15_04_05")) + + logger.Log.Infof("Writing memory profile to %s\n", filename) + + f, err := os.Create(filename) + if err != nil { + logger.Log.Fatal("could not create memory profile: ", err) + } + runtime.GC() // get up-to-date statistics + if err := pprof.WriteHeapProfile(f); err != nil { + logger.Log.Fatal("could not write memory profile: ", err) + } + _ = f.Close() + time.Sleep(time.Second * time.Duration(timeInterval)) + } + }() +} + +func DumpMemoryProfile(filename string) error { + if filename == "" { + return nil + } + + f, err := os.Create(filename) + + if err != nil { + return err + } + + defer f.Close() + + if err := pprof.WriteHeapProfile(f); err != nil { + return err + } + + return nil +} diff --git a/tap/errors_map.go b/tap/diagnose/errors_map.go similarity index 61% rename from tap/errors_map.go rename to tap/diagnose/errors_map.go index 6c66d74b7..9bc4b8a46 100644 --- a/tap/errors_map.go +++ b/tap/diagnose/errors_map.go @@ -1,23 +1,41 @@ -package tap +package diagnose import ( "fmt" "sync" + "github.com/google/gopacket/examples/util" "github.com/up9inc/mizu/shared/logger" ) +var TapErrors *errorsMap + type errorsMap struct { errorsMap map[string]uint - outputLevel int - nErrors uint + OutputLevel int + ErrorsCount uint errorsMapMutex sync.Mutex } -func NewErrorsMap(outputLevel int) *errorsMap { +func InitializeErrorsMap(debug bool, verbose bool, quiet bool) { + var outputLevel int + + defer util.Run()() + if debug { + outputLevel = 2 + } else if verbose { + outputLevel = 1 + } else if quiet { + outputLevel = -1 + } + + TapErrors = newErrorsMap(outputLevel) +} + +func newErrorsMap(outputLevel int) *errorsMap { return &errorsMap{ errorsMap: make(map[string]uint), - outputLevel: outputLevel, + OutputLevel: outputLevel, } } @@ -28,12 +46,12 @@ func NewErrorsMap(outputLevel int) *errorsMap { */ func (e *errorsMap) logError(minOutputLevel int, t string, s string, a ...interface{}) { e.errorsMapMutex.Lock() - e.nErrors++ + e.ErrorsCount++ nb := e.errorsMap[t] e.errorsMap[t] = nb + 1 e.errorsMapMutex.Unlock() - if e.outputLevel >= minOutputLevel { + if e.OutputLevel >= minOutputLevel { formatStr := fmt.Sprintf("%s: %s", t, s) logger.Log.Errorf(formatStr, a...) } @@ -51,10 +69,17 @@ func (e *errorsMap) Debug(s string, a ...interface{}) { logger.Log.Debugf(s, a...) } -func (e *errorsMap) getErrorsSummary() (int, string) { +func (e *errorsMap) GetErrorsSummary() (int, string) { e.errorsMapMutex.Lock() errorMapLen := len(e.errorsMap) errorsSummery := fmt.Sprintf("%v", e.errorsMap) e.errorsMapMutex.Unlock() return errorMapLen, errorsSummery } + +func (e *errorsMap) PrintSummary() { + logger.Log.Infof("Errors: %d", e.ErrorsCount) + for t := range e.errorsMap { + logger.Log.Infof(" %s:\t\t%d", e, e.errorsMap[t]) + } +} diff --git a/tap/diagnose/internal_stats.go b/tap/diagnose/internal_stats.go new file mode 100644 index 000000000..e37379cb3 --- /dev/null +++ b/tap/diagnose/internal_stats.go @@ -0,0 +1,46 @@ +package diagnose + +import "github.com/up9inc/mizu/shared/logger" + +type tapperInternalStats struct { + Ipdefrag int + MissedBytes int + Pkt int + Sz int + Totalsz int + RejectFsm int + RejectOpt int + RejectConnFsm int + Reassembled int + OutOfOrderBytes int + OutOfOrderPackets int + BiggestChunkBytes int + BiggestChunkPackets int + OverlapBytes int + OverlapPackets int +} + +var InternalStats *tapperInternalStats + +func InitializeTapperInternalStats() { + InternalStats = &tapperInternalStats{} +} + +func (stats *tapperInternalStats) PrintStatsSummary() { + logger.Log.Infof("IPdefrag:\t\t%d", stats.Ipdefrag) + logger.Log.Infof("TCP stats:") + logger.Log.Infof(" missed bytes:\t\t%d", stats.MissedBytes) + logger.Log.Infof(" total packets:\t\t%d", stats.Pkt) + logger.Log.Infof(" rejected FSM:\t\t%d", stats.RejectFsm) + logger.Log.Infof(" rejected Options:\t%d", stats.RejectOpt) + logger.Log.Infof(" reassembled bytes:\t%d", stats.Sz) + logger.Log.Infof(" total TCP bytes:\t%d", stats.Totalsz) + logger.Log.Infof(" conn rejected FSM:\t%d", stats.RejectConnFsm) + logger.Log.Infof(" reassembled chunks:\t%d", stats.Reassembled) + logger.Log.Infof(" out-of-order packets:\t%d", stats.OutOfOrderPackets) + logger.Log.Infof(" out-of-order bytes:\t%d", stats.OutOfOrderBytes) + logger.Log.Infof(" biggest-chunk packets:\t%d", stats.BiggestChunkPackets) + logger.Log.Infof(" biggest-chunk bytes:\t%d", stats.BiggestChunkBytes) + logger.Log.Infof(" overlap packets:\t%d", stats.OverlapPackets) + logger.Log.Infof(" overlap bytes:\t\t%d", stats.OverlapBytes) +} diff --git a/tap/net_utils.go b/tap/net_utils.go index 42f020424..c60fccbd9 100644 --- a/tap/net_utils.go +++ b/tap/net_utils.go @@ -3,6 +3,8 @@ package tap import ( "net" "strings" + + "github.com/up9inc/mizu/tap/diagnose" ) var privateIPBlocks []*net.IPNet @@ -55,7 +57,7 @@ func initPrivateIPBlocks() { } { _, block, err := net.ParseCIDR(cidr) if err != nil { - tapErrors.Error("Private-IP-Block-Parse", "parse error on %q: %v", cidr, err) + diagnose.TapErrors.Error("Private-IP-Block-Parse", "parse error on %q: %v", cidr, err) } else { privateIPBlocks = append(privateIPBlocks, block) } diff --git a/tap/outboundlinks.go b/tap/outboundlinks.go index 6e8a194b7..2150b1f21 100644 --- a/tap/outboundlinks.go +++ b/tap/outboundlinks.go @@ -7,11 +7,11 @@ const ( ) type OutboundLink struct { - Src string - DstIP string - DstPort int + Src string + DstIP string + DstPort int SuggestedResolvedName string - SuggestedProtocol OutboundLinkProtocol + SuggestedProtocol OutboundLinkProtocol } func NewOutboundLinkWriter() *OutboundLinkWriter { @@ -26,11 +26,11 @@ type OutboundLinkWriter struct { func (olw *OutboundLinkWriter) WriteOutboundLink(src string, DstIP string, DstPort int, SuggestedResolvedName string, SuggestedProtocol OutboundLinkProtocol) { olw.OutChan <- &OutboundLink{ - Src: src, - DstIP: DstIP, - DstPort: DstPort, + Src: src, + DstIP: DstIP, + DstPort: DstPort, SuggestedResolvedName: SuggestedResolvedName, - SuggestedProtocol: SuggestedProtocol, + SuggestedProtocol: SuggestedProtocol, } } diff --git a/tap/passive_tapper.go b/tap/passive_tapper.go index 3ff3e79f2..6dfe1390a 100644 --- a/tap/passive_tapper.go +++ b/tap/passive_tapper.go @@ -9,28 +9,17 @@ package tap import ( - "encoding/hex" "encoding/json" "flag" - "fmt" - "io" "os" - "os/signal" "runtime" - "runtime/pprof" - "strconv" "strings" - "sync" "time" - "github.com/google/gopacket" - "github.com/google/gopacket/examples/util" - "github.com/google/gopacket/ip4defrag" - "github.com/google/gopacket/layers" // pulls in all layers decoders - "github.com/google/gopacket/pcap" - "github.com/google/gopacket/reassembly" "github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/tap/api" + "github.com/up9inc/mizu/tap/diagnose" + "github.com/up9inc/mizu/tap/source" ) const cleanPeriod = time.Second * 10 @@ -62,28 +51,6 @@ var staleTimeoutSeconds = flag.Int("staletimout", 120, "Max time in seconds to k var memprofile = flag.String("memprofile", "", "Write memory profile") -var appStats = api.AppStats{} -var tapErrors *errorsMap - -// global -var stats struct { - ipdefrag int - missedBytes int - pkt int - sz int - totalsz int - rejectFsm int - rejectOpt int - rejectConnFsm int - reassembled int - outOfOrderBytes int - outOfOrderPackets int - biggestChunkBytes int - biggestChunkPackets int - overlapBytes int - overlapPackets int -} - type TapOpts struct { HostMode bool } @@ -110,353 +77,121 @@ func inArrayString(arr []string, valueToCheck string) bool { return false } -// Context -// The assembler context -type Context struct { - CaptureInfo gopacket.CaptureInfo -} - -func GetStats() api.AppStats { - return appStats -} - -func (c *Context) GetCaptureInfo() gopacket.CaptureInfo { - return c.CaptureInfo -} - func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem, extensionsRef []*api.Extension, options *api.TrafficFilteringOptions) { hostMode = opts.HostMode extensions = extensionsRef filteringOptions = options if GetMemoryProfilingEnabled() { - startMemoryProfiler() + diagnose.StartMemoryProfiler(os.Getenv(MemoryProfilingDumpPath), os.Getenv(MemoryProfilingTimeIntervalSeconds)) } go startPassiveTapper(outputItems) } -func startMemoryProfiler() { - dumpPath := "/app/pprof" - envDumpPath := os.Getenv(MemoryProfilingDumpPath) - if envDumpPath != "" { - dumpPath = envDumpPath +func printPeriodicStats(cleaner *Cleaner) { + statsPeriod := time.Second * time.Duration(*statsevery) + ticker := time.NewTicker(statsPeriod) + + for { + <-ticker.C + + // Since the start + errorMapLen, errorsSummery := diagnose.TapErrors.GetErrorsSummary() + + logger.Log.Infof("%v (errors: %v, errTypes:%v) - Errors Summary: %s", + time.Since(diagnose.AppStats.StartTime), + diagnose.TapErrors.ErrorsCount, + errorMapLen, + errorsSummery, + ) + + // At this moment + memStats := runtime.MemStats{} + runtime.ReadMemStats(&memStats) + logger.Log.Infof( + "mem: %d, goroutines: %d", + memStats.HeapAlloc, + runtime.NumGoroutine(), + ) + + // Since the last print + cleanStats := cleaner.dumpStats() + logger.Log.Infof( + "cleaner - flushed connections: %d, closed connections: %d, deleted messages: %d", + cleanStats.flushed, + cleanStats.closed, + cleanStats.deleted, + ) + currentAppStats := diagnose.AppStats.DumpStats() + appStatsJSON, _ := json.Marshal(currentAppStats) + logger.Log.Infof("app stats - %v", string(appStatsJSON)) } - timeInterval := 60 - envTimeInterval := os.Getenv(MemoryProfilingTimeIntervalSeconds) - if envTimeInterval != "" { - if i, err := strconv.Atoi(envTimeInterval); err == nil { - timeInterval = i - } - } - - logger.Log.Info("Profiling is on, results will be written to %s", dumpPath) - go func() { - if _, err := os.Stat(dumpPath); os.IsNotExist(err) { - if err := os.Mkdir(dumpPath, 0777); err != nil { - logger.Log.Fatal("could not create directory for profile: ", err) - } - } - - for { - t := time.Now() - - filename := fmt.Sprintf("%s/%s__mem.prof", dumpPath, t.Format("15_04_05")) - - logger.Log.Infof("Writing memory profile to %s\n", filename) - - f, err := os.Create(filename) - if err != nil { - logger.Log.Fatal("could not create memory profile: ", err) - } - runtime.GC() // get up-to-date statistics - if err := pprof.WriteHeapProfile(f); err != nil { - logger.Log.Fatal("could not write memory profile: ", err) - } - _ = f.Close() - time.Sleep(time.Second * time.Duration(timeInterval)) - } - }() } func startPassiveTapper(outputItems chan *api.OutputChannelItem) { streamsMap := NewTcpStreamMap() go streamsMap.closeTimedoutTcpStreamChannels() - var outputLevel int + diagnose.InitializeErrorsMap(*debug, *verbose, *quiet) + diagnose.InitializeTapperInternalStats() - defer util.Run()() - if *debug { - outputLevel = 2 - } else if *verbose { - outputLevel = 1 - } else if *quiet { - outputLevel = -1 - } - - tapErrors = NewErrorsMap(outputLevel) - - var handle *pcap.Handle - var err error - - if *fname != "" { - if handle, err = pcap.OpenOffline(*fname); err != nil { - logger.Log.Fatalf("PCAP OpenOffline error: %v", err) - } - } else { - // This is a little complicated because we want to allow all possible options - // for creating the packet capture handle... instead of all this you can - // just call pcap.OpenLive if you want a simple handle. - inactive, err := pcap.NewInactiveHandle(*iface) - if err != nil { - logger.Log.Fatalf("could not create: %v", err) - } - defer inactive.CleanUp() - if err = inactive.SetSnapLen(*snaplen); err != nil { - logger.Log.Fatalf("could not set snap length: %v", err) - } else if err = inactive.SetPromisc(*promisc); err != nil { - logger.Log.Fatalf("could not set promisc mode: %v", err) - } else if err = inactive.SetTimeout(time.Second); err != nil { - logger.Log.Fatalf("could not set timeout: %v", err) - } - if *tstype != "" { - if t, err := pcap.TimestampSourceFromString(*tstype); err != nil { - logger.Log.Fatalf("Supported timestamp types: %v", inactive.SupportedTimestamps()) - } else if err := inactive.SetTimestampSource(t); err != nil { - logger.Log.Fatalf("Supported timestamp types: %v", inactive.SupportedTimestamps()) - } - } - if handle, err = inactive.Activate(); err != nil { - logger.Log.Fatalf("PCAP Activate error: %v", err) - } - defer handle.Close() - } + var bpffilter string if len(flag.Args()) > 0 { - bpffilter := strings.Join(flag.Args(), " ") - logger.Log.Infof("Using BPF filter %q", bpffilter) - if err = handle.SetBPFFilter(bpffilter); err != nil { - logger.Log.Fatalf("BPF filter error: %v", err) - } + bpffilter = strings.Join(flag.Args(), " ") } - var dec gopacket.Decoder - var ok bool - decoderName := *decoder - if decoderName == "" { - decoderName = handle.LinkType().String() + packetSource, err := source.NewTcpPacketSource(*fname, *iface, source.TcpPacketSourceBehaviour{ + SnapLength: *snaplen, + Promisc: *promisc, + Tstype: *tstype, + DecoderName: *decoder, + Lazy: *lazy, + BpfFilter: bpffilter, + }) + + if err != nil { + logger.Log.Fatal(err) } - if dec, ok = gopacket.DecodersByLayerName[decoderName]; !ok { - logger.Log.Fatal("No decoder named", decoderName) + + defer packetSource.Close() + + if err != nil { + logger.Log.Fatal(err) } - source := gopacket.NewPacketSource(handle, dec) - source.Lazy = *lazy - source.NoCopy = true + + packets := make(chan source.TcpPacketInfo) + assembler := NewTcpAssembler(outputItems, streamsMap) + logger.Log.Info("Starting to read packets") - appStats.SetStartTime(time.Now()) - defragger := ip4defrag.NewIPv4Defragmenter() + diagnose.AppStats.SetStartTime(time.Now()) - var emitter api.Emitter = &api.Emitting{ - AppStats: &appStats, - OutputChannel: outputItems, - } - - streamFactory := NewTcpStreamFactory(emitter, streamsMap) - streamPool := reassembly.NewStreamPool(streamFactory) - assembler := reassembly.NewAssembler(streamPool) - - maxBufferedPagesTotal := GetMaxBufferedPagesPerConnection() - maxBufferedPagesPerConnection := GetMaxBufferedPagesTotal() - logger.Log.Infof("Assembler options: maxBufferedPagesTotal=%d, maxBufferedPagesPerConnection=%d", maxBufferedPagesTotal, maxBufferedPagesPerConnection) - assembler.AssemblerOptions.MaxBufferedPagesTotal = maxBufferedPagesTotal - assembler.AssemblerOptions.MaxBufferedPagesPerConnection = maxBufferedPagesPerConnection - - var assemblerMutex sync.Mutex - - signalChan := make(chan os.Signal, 1) - signal.Notify(signalChan, os.Interrupt) + go packetSource.ReadPackets(!*nodefrag, packets) staleConnectionTimeout := time.Second * time.Duration(*staleTimeoutSeconds) cleaner := Cleaner{ - assembler: assembler, - assemblerMutex: &assemblerMutex, + assembler: assembler.Assembler, + assemblerMutex: &assembler.assemblerMutex, cleanPeriod: cleanPeriod, connectionTimeout: staleConnectionTimeout, } cleaner.start() - go func() { - statsPeriod := time.Second * time.Duration(*statsevery) - ticker := time.NewTicker(statsPeriod) + go printPeriodicStats(&cleaner) - for { - <-ticker.C + assembler.processPackets(*hexdumppkt, packets) - // Since the start - errorMapLen, errorsSummery := tapErrors.getErrorsSummary() - - logger.Log.Infof("%v (errors: %v, errTypes:%v) - Errors Summary: %s", - time.Since(appStats.StartTime), - tapErrors.nErrors, - errorMapLen, - errorsSummery, - ) - - // At this moment - memStats := runtime.MemStats{} - runtime.ReadMemStats(&memStats) - logger.Log.Infof( - "mem: %d, goroutines: %d", - memStats.HeapAlloc, - runtime.NumGoroutine(), - ) - - // Since the last print - cleanStats := cleaner.dumpStats() - logger.Log.Infof( - "cleaner - flushed connections: %d, closed connections: %d, deleted messages: %d", - cleanStats.flushed, - cleanStats.closed, - cleanStats.deleted, - ) - currentAppStats := appStats.DumpStats() - appStatsJSON, _ := json.Marshal(currentAppStats) - logger.Log.Infof("app stats - %v", string(appStatsJSON)) - } - }() - - if GetMemoryProfilingEnabled() { - startMemoryProfiler() + if diagnose.TapErrors.OutputLevel >= 2 { + assembler.dumpStreamPool() } - for { - packet, err := source.NextPacket() - if err == io.EOF { - break - } else if err != nil { - if err.Error() != "Timeout Expired" { - logger.Log.Debugf("Error: %T", err) - } - continue - } - packetsCount := appStats.IncPacketsCount() - logger.Log.Debugf("PACKET #%d", packetsCount) - data := packet.Data() - appStats.UpdateProcessedBytes(uint64(len(data))) - if *hexdumppkt { - logger.Log.Debugf("Packet content (%d/0x%x) - %s", len(data), len(data), hex.Dump(data)) - } - - // defrag the IPv4 packet if required - if !*nodefrag { - ip4Layer := packet.Layer(layers.LayerTypeIPv4) - if ip4Layer == nil { - continue - } - ip4 := ip4Layer.(*layers.IPv4) - l := ip4.Length - newip4, err := defragger.DefragIPv4(ip4) - if err != nil { - logger.Log.Fatal("Error while de-fragmenting", err) - } else if newip4 == nil { - logger.Log.Debugf("Fragment...") - continue // packet fragment, we don't have whole packet yet. - } - if newip4.Length != l { - stats.ipdefrag++ - logger.Log.Debugf("Decoding re-assembled packet: %s", newip4.NextLayerType()) - pb, ok := packet.(gopacket.PacketBuilder) - if !ok { - logger.Log.Panic("Not a PacketBuilder") - } - nextDecoder := newip4.NextLayerType() - _ = nextDecoder.Decode(newip4.Payload, pb) - } - } - - tcp := packet.Layer(layers.LayerTypeTCP) - if tcp != nil { - appStats.IncTcpPacketsCount() - tcp := tcp.(*layers.TCP) - if *checksum { - err := tcp.SetNetworkLayerForChecksum(packet.NetworkLayer()) - if err != nil { - logger.Log.Fatalf("Failed to set network layer for checksum: %s\n", err) - } - } - c := Context{ - CaptureInfo: packet.Metadata().CaptureInfo, - } - stats.totalsz += len(tcp.Payload) - logger.Log.Debugf("%s : %v -> %s : %v", packet.NetworkLayer().NetworkFlow().Src(), tcp.SrcPort, packet.NetworkLayer().NetworkFlow().Dst(), tcp.DstPort) - assemblerMutex.Lock() - assembler.AssembleWithContext(packet.NetworkLayer().NetworkFlow(), tcp, &c) - assemblerMutex.Unlock() - } - - done := *maxcount > 0 && int64(appStats.PacketsCount) >= *maxcount - if done { - errorMapLen, _ := tapErrors.getErrorsSummary() - logger.Log.Infof("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)", - appStats.PacketsCount, - appStats.ProcessedBytes, - time.Since(appStats.StartTime), - tapErrors.nErrors, - errorMapLen) - } - select { - case <-signalChan: - logger.Log.Infof("Caught SIGINT: aborting") - done = true - default: - // NOP: continue - } - if done { - break - } + if err := diagnose.DumpMemoryProfile(*memprofile); err != nil { + logger.Log.Errorf("Error dumping memory profile %v\n", err) } - assemblerMutex.Lock() - closed := assembler.FlushAll() - assemblerMutex.Unlock() - logger.Log.Debugf("Final flush: %d closed", closed) - if outputLevel >= 2 { - streamPool.Dump() - } + assembler.waitAndDump() - if *memprofile != "" { - f, err := os.Create(*memprofile) - if err != nil { - logger.Log.Fatal(err) - } - _ = pprof.WriteHeapProfile(f) - _ = f.Close() - } - - streamFactory.WaitGoRoutines() - assemblerMutex.Lock() - logger.Log.Debugf("%s", assembler.Dump()) - assemblerMutex.Unlock() - if !*nodefrag { - logger.Log.Infof("IPdefrag:\t\t%d", stats.ipdefrag) - } - logger.Log.Infof("TCP stats:") - logger.Log.Infof(" missed bytes:\t\t%d", stats.missedBytes) - logger.Log.Infof(" total packets:\t\t%d", stats.pkt) - logger.Log.Infof(" rejected FSM:\t\t%d", stats.rejectFsm) - logger.Log.Infof(" rejected Options:\t%d", stats.rejectOpt) - logger.Log.Infof(" reassembled bytes:\t%d", stats.sz) - logger.Log.Infof(" total TCP bytes:\t%d", stats.totalsz) - logger.Log.Infof(" conn rejected FSM:\t%d", stats.rejectConnFsm) - logger.Log.Infof(" reassembled chunks:\t%d", stats.reassembled) - logger.Log.Infof(" out-of-order packets:\t%d", stats.outOfOrderPackets) - logger.Log.Infof(" out-of-order bytes:\t%d", stats.outOfOrderBytes) - logger.Log.Infof(" biggest-chunk packets:\t%d", stats.biggestChunkPackets) - logger.Log.Infof(" biggest-chunk bytes:\t%d", stats.biggestChunkBytes) - logger.Log.Infof(" overlap packets:\t%d", stats.overlapPackets) - logger.Log.Infof(" overlap bytes:\t\t%d", stats.overlapBytes) - logger.Log.Infof("Errors: %d", tapErrors.nErrors) - for e := range tapErrors.errorsMap { - logger.Log.Infof(" %s:\t\t%d", e, tapErrors.errorsMap[e]) - } - logger.Log.Infof("AppStats: %v", GetStats()) + diagnose.InternalStats.PrintStatsSummary() + diagnose.TapErrors.PrintSummary() + logger.Log.Infof("AppStats: %v", diagnose.AppStats) } diff --git a/tap/source/tcp_packet_source.go b/tap/source/tcp_packet_source.go new file mode 100644 index 000000000..61ff7446d --- /dev/null +++ b/tap/source/tcp_packet_source.go @@ -0,0 +1,150 @@ +package source + +import ( + "fmt" + "io" + "time" + + "github.com/google/gopacket" + "github.com/google/gopacket/ip4defrag" + "github.com/google/gopacket/layers" + "github.com/google/gopacket/pcap" + "github.com/up9inc/mizu/shared/logger" + "github.com/up9inc/mizu/tap/diagnose" +) + +type TcpPacketSource struct { + source *gopacket.PacketSource + handle *pcap.Handle + defragger *ip4defrag.IPv4Defragmenter + Behaviour *TcpPacketSourceBehaviour +} + +type TcpPacketSourceBehaviour struct { + SnapLength int + Promisc bool + Tstype string + DecoderName string + Lazy bool + BpfFilter string +} + +type TcpPacketInfo struct { + Packet gopacket.Packet + Source *TcpPacketSource +} + +func NewTcpPacketSource(filename string, interfaceName string, + behaviour TcpPacketSourceBehaviour) (*TcpPacketSource, error) { + var err error + + result := &TcpPacketSource{ + defragger: ip4defrag.NewIPv4Defragmenter(), + Behaviour: &behaviour, + } + + if filename != "" { + if result.handle, err = pcap.OpenOffline(filename); err != nil { + return result, fmt.Errorf("PCAP OpenOffline error: %v", err) + } + } else { + // This is a little complicated because we want to allow all possible options + // for creating the packet capture handle... instead of all this you can + // just call pcap.OpenLive if you want a simple handle. + inactive, err := pcap.NewInactiveHandle(interfaceName) + if err != nil { + return result, fmt.Errorf("could not create: %v", err) + } + defer inactive.CleanUp() + if err = inactive.SetSnapLen(behaviour.SnapLength); err != nil { + return result, fmt.Errorf("could not set snap length: %v", err) + } else if err = inactive.SetPromisc(behaviour.Promisc); err != nil { + return result, fmt.Errorf("could not set promisc mode: %v", err) + } else if err = inactive.SetTimeout(time.Second); err != nil { + return result, fmt.Errorf("could not set timeout: %v", err) + } + if behaviour.Tstype != "" { + if t, err := pcap.TimestampSourceFromString(behaviour.Tstype); err != nil { + return result, fmt.Errorf("supported timestamp types: %v", inactive.SupportedTimestamps()) + } else if err := inactive.SetTimestampSource(t); err != nil { + return result, fmt.Errorf("supported timestamp types: %v", inactive.SupportedTimestamps()) + } + } + if result.handle, err = inactive.Activate(); err != nil { + return result, fmt.Errorf("PCAP Activate error: %v", err) + } + } + if behaviour.BpfFilter != "" { + logger.Log.Infof("Using BPF filter %q", behaviour.BpfFilter) + if err = result.handle.SetBPFFilter(behaviour.BpfFilter); err != nil { + return nil, fmt.Errorf("BPF filter error: %v", err) + } + } + + var dec gopacket.Decoder + var ok bool + if behaviour.DecoderName == "" { + behaviour.DecoderName = result.handle.LinkType().String() + } + if dec, ok = gopacket.DecodersByLayerName[behaviour.DecoderName]; !ok { + return nil, fmt.Errorf("no decoder named %v", behaviour.DecoderName) + } + result.source = gopacket.NewPacketSource(result.handle, dec) + result.source.Lazy = behaviour.Lazy + result.source.NoCopy = true + + return result, nil +} + +func (source *TcpPacketSource) Close() { + if source.handle != nil { + source.handle.Close() + } +} + +func (source *TcpPacketSource) ReadPackets(ipdefrag bool, packets chan<- TcpPacketInfo) error { + for { + packet, err := source.source.NextPacket() + + if err == io.EOF { + return err + } else if err != nil { + if err.Error() != "Timeout Expired" { + logger.Log.Debugf("Error: %T", err) + } + continue + } + + // defrag the IPv4 packet if required + if !ipdefrag { + ip4Layer := packet.Layer(layers.LayerTypeIPv4) + if ip4Layer == nil { + continue + } + ip4 := ip4Layer.(*layers.IPv4) + l := ip4.Length + newip4, err := source.defragger.DefragIPv4(ip4) + if err != nil { + logger.Log.Fatal("Error while de-fragmenting", err) + } else if newip4 == nil { + logger.Log.Debugf("Fragment...") + continue // packet fragment, we don't have whole packet yet. + } + if newip4.Length != l { + diagnose.InternalStats.Ipdefrag++ + logger.Log.Debugf("Decoding re-assembled packet: %s", newip4.NextLayerType()) + pb, ok := packet.(gopacket.PacketBuilder) + if !ok { + logger.Log.Panic("Not a PacketBuilder") + } + nextDecoder := newip4.NextLayerType() + _ = nextDecoder.Decode(newip4.Payload, pb) + } + } + + packets <- TcpPacketInfo{ + Packet: packet, + Source: source, + } + } +} diff --git a/tap/tcp_assembler.go b/tap/tcp_assembler.go new file mode 100644 index 000000000..4dd05b095 --- /dev/null +++ b/tap/tcp_assembler.go @@ -0,0 +1,132 @@ +package tap + +import ( + "encoding/hex" + "os" + "os/signal" + "sync" + "time" + + "github.com/google/gopacket" + "github.com/google/gopacket/layers" + "github.com/google/gopacket/reassembly" + "github.com/up9inc/mizu/shared/logger" + "github.com/up9inc/mizu/tap/api" + "github.com/up9inc/mizu/tap/diagnose" + "github.com/up9inc/mizu/tap/source" +) + +type tcpAssembler struct { + *reassembly.Assembler + streamPool *reassembly.StreamPool + streamFactory *tcpStreamFactory + assemblerMutex sync.Mutex +} + +// Context +// The assembler context +type context struct { + CaptureInfo gopacket.CaptureInfo +} + +func (c *context) GetCaptureInfo() gopacket.CaptureInfo { + return c.CaptureInfo +} + +func NewTcpAssembler(outputItems chan *api.OutputChannelItem, streamsMap *tcpStreamMap) *tcpAssembler { + var emitter api.Emitter = &api.Emitting{ + AppStats: &diagnose.AppStats, + OutputChannel: outputItems, + } + + streamFactory := NewTcpStreamFactory(emitter, streamsMap) + streamPool := reassembly.NewStreamPool(streamFactory) + assembler := reassembly.NewAssembler(streamPool) + + maxBufferedPagesTotal := GetMaxBufferedPagesPerConnection() + maxBufferedPagesPerConnection := GetMaxBufferedPagesTotal() + logger.Log.Infof("Assembler options: maxBufferedPagesTotal=%d, maxBufferedPagesPerConnection=%d", + maxBufferedPagesTotal, maxBufferedPagesPerConnection) + assembler.AssemblerOptions.MaxBufferedPagesTotal = maxBufferedPagesTotal + assembler.AssemblerOptions.MaxBufferedPagesPerConnection = maxBufferedPagesPerConnection + + return &tcpAssembler{ + Assembler: assembler, + streamPool: streamPool, + streamFactory: streamFactory, + } +} + +func (a *tcpAssembler) processPackets(dumpPacket bool, packets <-chan source.TcpPacketInfo) { + signalChan := make(chan os.Signal, 1) + signal.Notify(signalChan, os.Interrupt) + + for packetInfo := range packets { + packetsCount := diagnose.AppStats.IncPacketsCount() + logger.Log.Debugf("PACKET #%d", packetsCount) + packet := packetInfo.Packet + data := packet.Data() + diagnose.AppStats.UpdateProcessedBytes(uint64(len(data))) + if dumpPacket { + logger.Log.Debugf("Packet content (%d/0x%x) - %s", len(data), len(data), hex.Dump(data)) + } + + tcp := packet.Layer(layers.LayerTypeTCP) + if tcp != nil { + diagnose.AppStats.IncTcpPacketsCount() + tcp := tcp.(*layers.TCP) + if *checksum { + err := tcp.SetNetworkLayerForChecksum(packet.NetworkLayer()) + if err != nil { + logger.Log.Fatalf("Failed to set network layer for checksum: %s\n", err) + } + } + c := context{ + CaptureInfo: packet.Metadata().CaptureInfo, + } + diagnose.InternalStats.Totalsz += len(tcp.Payload) + logger.Log.Debugf("%s : %v -> %s : %v", packet.NetworkLayer().NetworkFlow().Src(), tcp.SrcPort, packet.NetworkLayer().NetworkFlow().Dst(), tcp.DstPort) + a.assemblerMutex.Lock() + a.AssembleWithContext(packet.NetworkLayer().NetworkFlow(), tcp, &c) + a.assemblerMutex.Unlock() + } + + done := *maxcount > 0 && int64(diagnose.AppStats.PacketsCount) >= *maxcount + if done { + errorMapLen, _ := diagnose.TapErrors.GetErrorsSummary() + logger.Log.Infof("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)", + diagnose.AppStats.PacketsCount, + diagnose.AppStats.ProcessedBytes, + time.Since(diagnose.AppStats.StartTime), + diagnose.TapErrors.ErrorsCount, + errorMapLen) + } + + select { + case <-signalChan: + logger.Log.Infof("Caught SIGINT: aborting") + done = true + default: + // NOP: continue + } + if done { + break + } + } + + a.assemblerMutex.Lock() + closed := a.FlushAll() + a.assemblerMutex.Unlock() + logger.Log.Debugf("Final flush: %d closed", closed) +} + +func (a *tcpAssembler) dumpStreamPool() { + a.streamPool.Dump() +} + +func (a *tcpAssembler) waitAndDump() { + a.streamFactory.WaitGoRoutines() + a.assemblerMutex.Lock() + logger.Log.Debugf("%s", a.Dump()) + a.assemblerMutex.Unlock() +} diff --git a/tap/tcp_packet_source.go b/tap/tcp_packet_source.go new file mode 100644 index 000000000..d5b96736d --- /dev/null +++ b/tap/tcp_packet_source.go @@ -0,0 +1,150 @@ +package tap + +import ( + "fmt" + "io" + "time" + + "github.com/google/gopacket" + "github.com/google/gopacket/ip4defrag" + "github.com/google/gopacket/layers" + "github.com/google/gopacket/pcap" + "github.com/up9inc/mizu/shared/logger" + "github.com/up9inc/mizu/tap/diagnose" +) + +type tcpPacketSource struct { + source *gopacket.PacketSource + handle *pcap.Handle + defragger *ip4defrag.IPv4Defragmenter + behaviour *tcpPacketSourceBehaviour +} + +type tcpPacketSourceBehaviour struct { + snapLength int + promisc bool + tstype string + decoderName string + lazy bool + bpfFilter string +} + +type tcpPacketInfo struct { + packet gopacket.Packet + source *tcpPacketSource +} + +func NewTcpPacketSource(filename string, interfaceName string, + behaviour tcpPacketSourceBehaviour) (*tcpPacketSource, error) { + var err error + + result := &tcpPacketSource{ + defragger: ip4defrag.NewIPv4Defragmenter(), + behaviour: &behaviour, + } + + if filename != "" { + if result.handle, err = pcap.OpenOffline(filename); err != nil { + return result, fmt.Errorf("PCAP OpenOffline error: %v", err) + } + } else { + // This is a little complicated because we want to allow all possible options + // for creating the packet capture handle... instead of all this you can + // just call pcap.OpenLive if you want a simple handle. + inactive, err := pcap.NewInactiveHandle(interfaceName) + if err != nil { + return result, fmt.Errorf("could not create: %v", err) + } + defer inactive.CleanUp() + if err = inactive.SetSnapLen(behaviour.snapLength); err != nil { + return result, fmt.Errorf("could not set snap length: %v", err) + } else if err = inactive.SetPromisc(behaviour.promisc); err != nil { + return result, fmt.Errorf("could not set promisc mode: %v", err) + } else if err = inactive.SetTimeout(time.Second); err != nil { + return result, fmt.Errorf("could not set timeout: %v", err) + } + if behaviour.tstype != "" { + if t, err := pcap.TimestampSourceFromString(behaviour.tstype); err != nil { + return result, fmt.Errorf("supported timestamp types: %v", inactive.SupportedTimestamps()) + } else if err := inactive.SetTimestampSource(t); err != nil { + return result, fmt.Errorf("supported timestamp types: %v", inactive.SupportedTimestamps()) + } + } + if result.handle, err = inactive.Activate(); err != nil { + return result, fmt.Errorf("PCAP Activate error: %v", err) + } + } + if behaviour.bpfFilter != "" { + logger.Log.Infof("Using BPF filter %q", behaviour.bpfFilter) + if err = result.handle.SetBPFFilter(behaviour.bpfFilter); err != nil { + return nil, fmt.Errorf("BPF filter error: %v", err) + } + } + + var dec gopacket.Decoder + var ok bool + if behaviour.decoderName == "" { + behaviour.decoderName = result.handle.LinkType().String() + } + if dec, ok = gopacket.DecodersByLayerName[behaviour.decoderName]; !ok { + return nil, fmt.Errorf("no decoder named %v", behaviour.decoderName) + } + result.source = gopacket.NewPacketSource(result.handle, dec) + result.source.Lazy = behaviour.lazy + result.source.NoCopy = true + + return result, nil +} + +func (source *tcpPacketSource) close() { + if source.handle != nil { + source.handle.Close() + } +} + +func (source *tcpPacketSource) readPackets(ipdefrag bool, packets chan<- tcpPacketInfo) error { + for { + packet, err := source.source.NextPacket() + + if err == io.EOF { + return err + } else if err != nil { + if err.Error() != "Timeout Expired" { + logger.Log.Debugf("Error: %T", err) + } + continue + } + + // defrag the IPv4 packet if required + if !ipdefrag { + ip4Layer := packet.Layer(layers.LayerTypeIPv4) + if ip4Layer == nil { + continue + } + ip4 := ip4Layer.(*layers.IPv4) + l := ip4.Length + newip4, err := source.defragger.DefragIPv4(ip4) + if err != nil { + logger.Log.Fatal("Error while de-fragmenting", err) + } else if newip4 == nil { + logger.Log.Debugf("Fragment...") + continue // packet fragment, we don't have whole packet yet. + } + if newip4.Length != l { + diagnose.InternalStats.Ipdefrag++ + logger.Log.Debugf("Decoding re-assembled packet: %s", newip4.NextLayerType()) + pb, ok := packet.(gopacket.PacketBuilder) + if !ok { + logger.Log.Panic("Not a PacketBuilder") + } + nextDecoder := newip4.NextLayerType() + _ = nextDecoder.Decode(newip4.Payload, pb) + } + } + + packets <- tcpPacketInfo{ + packet: packet, + source: source, + } + } +} diff --git a/tap/tcp_stream.go b/tap/tcp_stream.go index 06db746f5..7a9b53e30 100644 --- a/tap/tcp_stream.go +++ b/tap/tcp_stream.go @@ -9,6 +9,7 @@ import ( "github.com/google/gopacket/layers" // pulls in all layers decoders "github.com/google/gopacket/reassembly" "github.com/up9inc/mizu/tap/api" + "github.com/up9inc/mizu/tap/diagnose" ) /* It's a connection (bidirectional) @@ -36,11 +37,11 @@ type tcpStream struct { func (t *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassembly.TCPFlowDirection, nextSeq reassembly.Sequence, start *bool, ac reassembly.AssemblerContext) bool { // FSM if !t.tcpstate.CheckState(tcp, dir) { - tapErrors.SilentError("FSM-rejection", "%s: Packet rejected by FSM (state:%s)", t.ident, t.tcpstate.String()) - stats.rejectFsm++ + diagnose.TapErrors.SilentError("FSM-rejection", "%s: Packet rejected by FSM (state:%s)", t.ident, t.tcpstate.String()) + diagnose.InternalStats.RejectFsm++ if !t.fsmerr { t.fsmerr = true - stats.rejectConnFsm++ + diagnose.InternalStats.RejectConnFsm++ } if !*ignorefsmerr { return false @@ -49,8 +50,8 @@ func (t *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassem // Options err := t.optchecker.Accept(tcp, ci, dir, nextSeq, start) if err != nil { - tapErrors.SilentError("OptionChecker-rejection", "%s: Packet rejected by OptionChecker: %s", t.ident, err) - stats.rejectOpt++ + diagnose.TapErrors.SilentError("OptionChecker-rejection", "%s: Packet rejected by OptionChecker: %s", t.ident, err) + diagnose.InternalStats.RejectOpt++ if !*nooptcheck { return false } @@ -60,15 +61,15 @@ func (t *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassem if *checksum { c, err := tcp.ComputeChecksum() if err != nil { - tapErrors.SilentError("ChecksumCompute", "%s: Got error computing checksum: %s", t.ident, err) + diagnose.TapErrors.SilentError("ChecksumCompute", "%s: Got error computing checksum: %s", t.ident, err) accept = false } else if c != 0x0 { - tapErrors.SilentError("Checksum", "%s: Invalid checksum: 0x%x", t.ident, c) + diagnose.TapErrors.SilentError("Checksum", "%s: Invalid checksum: 0x%x", t.ident, c) accept = false } } if !accept { - stats.rejectOpt++ + diagnose.InternalStats.RejectOpt++ } return accept } @@ -79,28 +80,28 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass // update stats sgStats := sg.Stats() if skip > 0 { - stats.missedBytes += skip + diagnose.InternalStats.MissedBytes += skip } - stats.sz += length - saved - stats.pkt += sgStats.Packets + diagnose.InternalStats.Sz += length - saved + diagnose.InternalStats.Pkt += sgStats.Packets if sgStats.Chunks > 1 { - stats.reassembled++ + diagnose.InternalStats.Reassembled++ } - stats.outOfOrderPackets += sgStats.QueuedPackets - stats.outOfOrderBytes += sgStats.QueuedBytes - if length > stats.biggestChunkBytes { - stats.biggestChunkBytes = length + diagnose.InternalStats.OutOfOrderPackets += sgStats.QueuedPackets + diagnose.InternalStats.OutOfOrderBytes += sgStats.QueuedBytes + if length > diagnose.InternalStats.BiggestChunkBytes { + diagnose.InternalStats.BiggestChunkBytes = length } - if sgStats.Packets > stats.biggestChunkPackets { - stats.biggestChunkPackets = sgStats.Packets + if sgStats.Packets > diagnose.InternalStats.BiggestChunkPackets { + diagnose.InternalStats.BiggestChunkPackets = sgStats.Packets } if sgStats.OverlapBytes != 0 && sgStats.OverlapPackets == 0 { // In the original example this was handled with panic(). // I don't know what this error means or how to handle it properly. - tapErrors.SilentError("Invalid-Overlap", "bytes:%d, pkts:%d", sgStats.OverlapBytes, sgStats.OverlapPackets) + diagnose.TapErrors.SilentError("Invalid-Overlap", "bytes:%d, pkts:%d", sgStats.OverlapBytes, sgStats.OverlapPackets) } - stats.overlapBytes += sgStats.OverlapBytes - stats.overlapPackets += sgStats.OverlapPackets + diagnose.InternalStats.OverlapBytes += sgStats.OverlapBytes + diagnose.InternalStats.OverlapPackets += sgStats.OverlapPackets var ident string if dir == reassembly.TCPDirClientToServer { @@ -108,7 +109,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass } else { ident = fmt.Sprintf("%v %v(%s): ", t.net.Reverse(), t.transport.Reverse(), dir) } - tapErrors.Debug("%s: SG reassembled packet with %d bytes (start:%v,end:%v,skip:%d,saved:%d,nb:%d,%d,overlap:%d,%d)", ident, length, start, end, skip, saved, sgStats.Packets, sgStats.Chunks, sgStats.OverlapBytes, sgStats.OverlapPackets) + diagnose.TapErrors.Debug("%s: SG reassembled packet with %d bytes (start:%v,end:%v,skip:%d,saved:%d,nb:%d,%d,overlap:%d,%d)", ident, length, start, end, skip, saved, sgStats.Packets, sgStats.Chunks, sgStats.OverlapBytes, sgStats.OverlapPackets) if skip == -1 && *allowmissinginit { // this is allowed } else if skip != 0 { @@ -127,18 +128,18 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass } dnsSize := binary.BigEndian.Uint16(data[:2]) missing := int(dnsSize) - len(data[2:]) - tapErrors.Debug("dnsSize: %d, missing: %d", dnsSize, missing) + diagnose.TapErrors.Debug("dnsSize: %d, missing: %d", dnsSize, missing) if missing > 0 { - tapErrors.Debug("Missing some bytes: %d", missing) + diagnose.TapErrors.Debug("Missing some bytes: %d", missing) sg.KeepFrom(0) return } p := gopacket.NewDecodingLayerParser(layers.LayerTypeDNS, dns) err := p.DecodeLayers(data[2:], &decoded) if err != nil { - tapErrors.SilentError("DNS-parser", "Failed to decode DNS: %v", err) + diagnose.TapErrors.SilentError("DNS-parser", "Failed to decode DNS: %v", err) } else { - tapErrors.Debug("DNS: %s", gopacket.LayerDump(dns)) + diagnose.TapErrors.Debug("DNS: %s", gopacket.LayerDump(dns)) } if len(data) > 2+int(dnsSize) { sg.KeepFrom(2 + int(dnsSize)) @@ -147,7 +148,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass if length > 0 { // This is where we pass the reassembled information onwards // This channel is read by an tcpReader object - appStats.IncReassembledTcpPayloadsCount() + diagnose.AppStats.IncReassembledTcpPayloadsCount() timestamp := ac.GetCaptureInfo().Timestamp if dir == reassembly.TCPDirClientToServer { for i := range t.clients { @@ -173,7 +174,7 @@ func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.Ass } func (t *tcpStream) ReassemblyComplete(ac reassembly.AssemblerContext) bool { - tapErrors.Debug("%s: Connection closed", t.ident) + diagnose.TapErrors.Debug("%s: Connection closed", t.ident) if t.isTapTarget && !t.isClosed { t.Close() } diff --git a/tap/tcp_streams_map.go b/tap/tcp_streams_map.go index 3fddf6369..2ea55be8e 100644 --- a/tap/tcp_streams_map.go +++ b/tap/tcp_streams_map.go @@ -7,6 +7,7 @@ import ( "time" "github.com/up9inc/mizu/shared/logger" + "github.com/up9inc/mizu/tap/diagnose" ) type tcpStreamMap struct { @@ -44,9 +45,9 @@ func (streamMap *tcpStreamMap) closeTimedoutTcpStreamChannels() { if stream.superIdentifier.Protocol == nil { if !stream.isClosed && time.Now().After(streamWrapper.createdAt.Add(tcpStreamChannelTimeout)) { stream.Close() - appStats.IncDroppedTcpStreams() + diagnose.AppStats.IncDroppedTcpStreams() logger.Log.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d\n", - appStats.DroppedTcpStreams, runtime.NumGoroutine(), tcpStreamChannelTimeout/1000000) + diagnose.AppStats.DroppedTcpStreams, runtime.NumGoroutine(), tcpStreamChannelTimeout/1000000) } } else { if !stream.superIdentifier.IsClosedOthers { diff --git a/tap/tester/tester.go b/tap/tester/tester.go index 91a8fcc36..d42d0dbd8 100644 --- a/tap/tester/tester.go +++ b/tap/tester/tester.go @@ -2,7 +2,6 @@ package main import ( "bufio" - "fmt" "io/ioutil" "os" "path" @@ -34,7 +33,7 @@ func loadExtensions() ([]*tapApi.Extension, error) { continue } - fmt.Printf("Loading extension: %s\n", filename) + logger.Log.Infof("Loading extension: %s\n", filename) extension := &tapApi.Extension{ Path: path.Join(extensionsDir, filename), @@ -69,7 +68,7 @@ func loadExtensions() ([]*tapApi.Extension, error) { }) for _, extension := range extensions { - fmt.Printf("Extension Properties: %+v\n", extension) + logger.Log.Infof("Extension Properties: %+v\n", extension) } return extensions, nil @@ -93,7 +92,7 @@ func internalRun() error { tap.StartPassiveTapper(&opts, outputItems, extenssions, &tapOpts) - fmt.Printf("Tapping, press enter to exit...\n") + logger.Log.Infof("Tapping, press enter to exit...\n") reader := bufio.NewReader(os.Stdin) reader.ReadLine() return nil @@ -105,9 +104,9 @@ func main() { if err != nil { switch err := err.(type) { case *errors.Error: - fmt.Printf("Error: %v\n", err.ErrorStack()) + logger.Log.Errorf("Error: %v\n", err.ErrorStack()) default: - fmt.Printf("Error: %v\n", err) + logger.Log.Errorf("Error: %v\n", err) } os.Exit(1)