From 58e9363fda8ea30a9e85d6d39990bab79b5c97ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?M=2E=20Mert=20Y=C4=B1ld=C4=B1ran?= Date: Mon, 18 Oct 2021 16:35:42 +0300 Subject: [PATCH] Replace all `rlog` occurrences with the shared logger in `tap` (#369) --- tap/cleaner.go | 6 +- tap/passive_tapper.go | 129 +++++++++++++++++++------------------- tap/tcp_reader.go | 4 +- tap/tcp_stream_factory.go | 14 ++--- 4 files changed, 75 insertions(+), 78 deletions(-) diff --git a/tap/cleaner.go b/tap/cleaner.go index 10a52968d..813735e66 100644 --- a/tap/cleaner.go +++ b/tap/cleaner.go @@ -5,7 +5,7 @@ import ( "time" "github.com/google/gopacket/reassembly" - "github.com/romana/rlog" + "github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/tap/api" ) @@ -28,7 +28,7 @@ func (cl *Cleaner) clean() { startCleanTime := time.Now() cl.assemblerMutex.Lock() - rlog.Debugf("Assembler Stats before cleaning %s", cl.assembler.Dump()) + logger.Log.Debugf("Assembler Stats before cleaning %s", cl.assembler.Dump()) flushed, closed := cl.assembler.FlushCloseOlderThan(startCleanTime.Add(-cl.connectionTimeout)) cl.assemblerMutex.Unlock() @@ -38,7 +38,7 @@ func (cl *Cleaner) clean() { } cl.statsMutex.Lock() - rlog.Debugf("Assembler Stats after cleaning %s", cl.assembler.Dump()) + logger.Log.Debugf("Assembler Stats after cleaning %s", cl.assembler.Dump()) cl.stats.flushed += flushed cl.stats.closed += closed cl.statsMutex.Unlock() diff --git a/tap/passive_tapper.go b/tap/passive_tapper.go index 84ecea801..8673e8fc2 100644 --- a/tap/passive_tapper.go +++ b/tap/passive_tapper.go @@ -14,7 +14,6 @@ import ( "flag" "fmt" "io" - "log" "os" "os/signal" "runtime" @@ -25,14 +24,13 @@ import ( "sync" "time" - "github.com/romana/rlog" - "github.com/google/gopacket" "github.com/google/gopacket/examples/util" "github.com/google/gopacket/ip4defrag" "github.com/google/gopacket/layers" // pulls in all layers decoders "github.com/google/gopacket/pcap" "github.com/google/gopacket/reassembly" + "github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/tap/api" ) @@ -102,7 +100,7 @@ const baseStreamChannelTimeoutMs int = 5000 * 100 /* minOutputLevel: Error will be printed only if outputLevel is above this value * t: key for errorsMap (counting errors) - * s, a: arguments log.Printf + * s, a: arguments logger.Log.Infof * Note: Too bad for perf that a... is evaluated */ func logError(minOutputLevel int, t string, s string, a ...interface{}) { @@ -114,7 +112,7 @@ func logError(minOutputLevel int, t string, s string, a ...interface{}) { if outputLevel >= minOutputLevel { formatStr := fmt.Sprintf("%s: %s", t, s) - rlog.Errorf(formatStr, a...) + logger.Log.Errorf(formatStr, a...) } } func Error(t string, s string, a ...interface{}) { @@ -124,10 +122,10 @@ func SilentError(t string, s string, a ...interface{}) { logError(2, t, s, a...) } func Debug(s string, a ...interface{}) { - rlog.Debugf(s, a...) + logger.Log.Debugf(s, a...) } func Trace(s string, a ...interface{}) { - rlog.Tracef(1, s, a...) + logger.Log.Infof(s, a...) } func inArrayInt(arr []int, valueToCheck int) bool { @@ -188,11 +186,11 @@ func startMemoryProfiler() { } } - rlog.Info("Profiling is on, results will be written to %s", dumpPath) + logger.Log.Info("Profiling is on, results will be written to %s", dumpPath) go func() { if _, err := os.Stat(dumpPath); os.IsNotExist(err) { if err := os.Mkdir(dumpPath, 0777); err != nil { - log.Fatal("could not create directory for profile: ", err) + logger.Log.Fatal("could not create directory for profile: ", err) } } @@ -201,15 +199,15 @@ func startMemoryProfiler() { filename := fmt.Sprintf("%s/%s__mem.prof", dumpPath, t.Format("15_04_05")) - rlog.Infof("Writing memory profile to %s\n", filename) + logger.Log.Infof("Writing memory profile to %s\n", filename) f, err := os.Create(filename) if err != nil { - log.Fatal("could not create memory profile: ", err) + logger.Log.Fatal("could not create memory profile: ", err) } runtime.GC() // get up-to-date statistics if err := pprof.WriteHeapProfile(f); err != nil { - log.Fatal("could not write memory profile: ", err) + logger.Log.Fatal("could not write memory profile: ", err) } _ = f.Close() time.Sleep(time.Second * time.Duration(timeInterval)) @@ -229,7 +227,7 @@ func closeTimedoutTcpStreamChannels() { if !stream.isClosed && time.Now().After(streamWrapper.createdAt.Add(TcpStreamChannelTimeoutMs)) { stream.Close() appStats.IncDroppedTcpStreams() - rlog.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d\n", appStats.DroppedTcpStreams, runtime.NumGoroutine(), TcpStreamChannelTimeoutMs/1000000) + logger.Log.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d\n", appStats.DroppedTcpStreams, runtime.NumGoroutine(), TcpStreamChannelTimeoutMs/1000000) } } else { if !stream.superIdentifier.IsClosedOthers { @@ -254,7 +252,6 @@ func closeTimedoutTcpStreamChannels() { } func startPassiveTapper(outputItems chan *api.OutputChannelItem) { - log.SetFlags(log.LstdFlags | log.LUTC | log.Lshortfile) go closeTimedoutTcpStreamChannels() defer util.Run()() @@ -269,8 +266,8 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) { if localhostIPs, err := getLocalhostIPs(); err != nil { // TODO: think this over - rlog.Info("Failed to get self IP addresses") - rlog.Errorf("Getting-Self-Address", "Error getting self ip address: %s (%v,%+v)", err, err, err) + logger.Log.Info("Failed to get self IP addresses") + logger.Log.Errorf("Getting-Self-Address", "Error getting self ip address: %s (%v,%+v)", err, err, err) ownIps = make([]string, 0) } else { ownIps = localhostIPs @@ -280,7 +277,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) { var err error if *fname != "" { if handle, err = pcap.OpenOffline(*fname); err != nil { - log.Fatalf("PCAP OpenOffline error: %v", err) + logger.Log.Fatalf("PCAP OpenOffline error: %v", err) } } else { // This is a little complicated because we want to allow all possible options @@ -288,33 +285,33 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) { // just call pcap.OpenLive if you want a simple handle. inactive, err := pcap.NewInactiveHandle(*iface) if err != nil { - log.Fatalf("could not create: %v", err) + logger.Log.Fatalf("could not create: %v", err) } defer inactive.CleanUp() if err = inactive.SetSnapLen(*snaplen); err != nil { - log.Fatalf("could not set snap length: %v", err) + logger.Log.Fatalf("could not set snap length: %v", err) } else if err = inactive.SetPromisc(*promisc); err != nil { - log.Fatalf("could not set promisc mode: %v", err) + logger.Log.Fatalf("could not set promisc mode: %v", err) } else if err = inactive.SetTimeout(time.Second); err != nil { - log.Fatalf("could not set timeout: %v", err) + logger.Log.Fatalf("could not set timeout: %v", err) } if *tstype != "" { if t, err := pcap.TimestampSourceFromString(*tstype); err != nil { - log.Fatalf("Supported timestamp types: %v", inactive.SupportedTimestamps()) + logger.Log.Fatalf("Supported timestamp types: %v", inactive.SupportedTimestamps()) } else if err := inactive.SetTimestampSource(t); err != nil { - log.Fatalf("Supported timestamp types: %v", inactive.SupportedTimestamps()) + logger.Log.Fatalf("Supported timestamp types: %v", inactive.SupportedTimestamps()) } } if handle, err = inactive.Activate(); err != nil { - log.Fatalf("PCAP Activate error: %v", err) + logger.Log.Fatalf("PCAP Activate error: %v", err) } defer handle.Close() } if len(flag.Args()) > 0 { bpffilter := strings.Join(flag.Args(), " ") - rlog.Infof("Using BPF filter %q", bpffilter) + logger.Log.Infof("Using BPF filter %q", bpffilter) if err = handle.SetBPFFilter(bpffilter); err != nil { - log.Fatalf("BPF filter error: %v", err) + logger.Log.Fatalf("BPF filter error: %v", err) } } @@ -325,12 +322,12 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) { decoderName = fmt.Sprintf("%s", handle.LinkType()) } if dec, ok = gopacket.DecodersByLayerName[decoderName]; !ok { - log.Fatalln("No decoder named", decoderName) + logger.Log.Fatal("No decoder named", decoderName) } source := gopacket.NewPacketSource(handle, dec) source.Lazy = *lazy source.NoCopy = true - rlog.Info("Starting to read packets") + logger.Log.Info("Starting to read packets") appStats.SetStartTime(time.Now()) defragger := ip4defrag.NewIPv4Defragmenter() @@ -347,7 +344,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) { maxBufferedPagesTotal := GetMaxBufferedPagesPerConnection() maxBufferedPagesPerConnection := GetMaxBufferedPagesTotal() - rlog.Infof("Assembler options: maxBufferedPagesTotal=%d, maxBufferedPagesPerConnection=%d", maxBufferedPagesTotal, maxBufferedPagesPerConnection) + logger.Log.Infof("Assembler options: maxBufferedPagesTotal=%d, maxBufferedPagesPerConnection=%d", maxBufferedPagesTotal, maxBufferedPagesPerConnection) assembler.AssemblerOptions.MaxBufferedPagesTotal = maxBufferedPagesTotal assembler.AssemblerOptions.MaxBufferedPagesPerConnection = maxBufferedPagesPerConnection @@ -377,7 +374,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) { errorMapLen := len(errorsMap) errorsSummery := fmt.Sprintf("%v", errorsMap) errorsMapMutex.Unlock() - log.Printf("%v (errors: %v, errTypes:%v) - Errors Summary: %s", + logger.Log.Infof("%v (errors: %v, errTypes:%v) - Errors Summary: %s", time.Since(appStats.StartTime), nErrors, errorMapLen, @@ -387,7 +384,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) { // At this moment memStats := runtime.MemStats{} runtime.ReadMemStats(&memStats) - log.Printf( + logger.Log.Infof( "mem: %d, goroutines: %d", memStats.HeapAlloc, runtime.NumGoroutine(), @@ -395,7 +392,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) { // Since the last print cleanStats := cleaner.dumpStats() - log.Printf( + logger.Log.Infof( "cleaner - flushed connections: %d, closed connections: %d, deleted messages: %d", cleanStats.flushed, cleanStats.closed, @@ -403,7 +400,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) { ) currentAppStats := appStats.DumpStats() appStatsJSON, _ := json.Marshal(currentAppStats) - log.Printf("app stats - %v", string(appStatsJSON)) + logger.Log.Infof("app stats - %v", string(appStatsJSON)) } }() @@ -416,15 +413,15 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) { if err == io.EOF { break } else if err != nil { - rlog.Debugf("Error:", err) + logger.Log.Debugf("Error: %v", err) continue } packetsCount := appStats.IncPacketsCount() - rlog.Debugf("PACKET #%d", packetsCount) + logger.Log.Debugf("PACKET #%d", packetsCount) data := packet.Data() appStats.UpdateProcessedBytes(uint64(len(data))) if *hexdumppkt { - rlog.Debugf("Packet content (%d/0x%x) - %s", len(data), len(data), hex.Dump(data)) + logger.Log.Debugf("Packet content (%d/0x%x) - %s", len(data), len(data), hex.Dump(data)) } // defrag the IPv4 packet if required @@ -437,17 +434,17 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) { l := ip4.Length newip4, err := defragger.DefragIPv4(ip4) if err != nil { - log.Fatalln("Error while de-fragmenting", err) + logger.Log.Fatal("Error while de-fragmenting", err) } else if newip4 == nil { - rlog.Debugf("Fragment...") + logger.Log.Debugf("Fragment...") continue // packet fragment, we don't have whole packet yet. } if newip4.Length != l { stats.ipdefrag++ - rlog.Debugf("Decoding re-assembled packet: %s", newip4.NextLayerType()) + logger.Log.Debugf("Decoding re-assembled packet: %s", newip4.NextLayerType()) pb, ok := packet.(gopacket.PacketBuilder) if !ok { - log.Panic("Not a PacketBuilder") + logger.Log.Panic("Not a PacketBuilder") } nextDecoder := newip4.NextLayerType() _ = nextDecoder.Decode(newip4.Payload, pb) @@ -461,14 +458,14 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) { if *checksum { err := tcp.SetNetworkLayerForChecksum(packet.NetworkLayer()) if err != nil { - log.Fatalf("Failed to set network layer for checksum: %s\n", err) + logger.Log.Fatalf("Failed to set network layer for checksum: %s\n", err) } } c := Context{ CaptureInfo: packet.Metadata().CaptureInfo, } stats.totalsz += len(tcp.Payload) - rlog.Debugf("%s : %v -> %s : %v", packet.NetworkLayer().NetworkFlow().Src(), tcp.SrcPort, packet.NetworkLayer().NetworkFlow().Dst(), tcp.DstPort) + logger.Log.Debugf("%s : %v -> %s : %v", packet.NetworkLayer().NetworkFlow().Src(), tcp.SrcPort, packet.NetworkLayer().NetworkFlow().Dst(), tcp.DstPort) assemblerMutex.Lock() assembler.AssembleWithContext(packet.NetworkLayer().NetworkFlow(), tcp, &c) assemblerMutex.Unlock() @@ -479,7 +476,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) { errorsMapMutex.Lock() errorMapLen := len(errorsMap) errorsMapMutex.Unlock() - log.Printf("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)", + logger.Log.Infof("Processed %v packets (%v bytes) in %v (errors: %v, errTypes:%v)", appStats.PacketsCount, appStats.ProcessedBytes, time.Since(appStats.StartTime), @@ -488,7 +485,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) { } select { case <-signalChan: - log.Printf("Caught SIGINT: aborting") + logger.Log.Infof("Caught SIGINT: aborting") done = true default: // NOP: continue @@ -501,7 +498,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) { assemblerMutex.Lock() closed := assembler.FlushAll() assemblerMutex.Unlock() - rlog.Debugf("Final flush: %d closed", closed) + logger.Log.Debugf("Final flush: %d closed", closed) if outputLevel >= 2 { streamPool.Dump() } @@ -509,7 +506,7 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) { if *memprofile != "" { f, err := os.Create(*memprofile) if err != nil { - log.Fatal(err) + logger.Log.Fatal(err) } _ = pprof.WriteHeapProfile(f) _ = f.Close() @@ -517,29 +514,29 @@ func startPassiveTapper(outputItems chan *api.OutputChannelItem) { streamFactory.WaitGoRoutines() assemblerMutex.Lock() - rlog.Debugf("%s", assembler.Dump()) + logger.Log.Debugf("%s", assembler.Dump()) assemblerMutex.Unlock() if !*nodefrag { - log.Printf("IPdefrag:\t\t%d", stats.ipdefrag) + logger.Log.Infof("IPdefrag:\t\t%d", stats.ipdefrag) } - log.Printf("TCP stats:") - log.Printf(" missed bytes:\t\t%d", stats.missedBytes) - log.Printf(" total packets:\t\t%d", stats.pkt) - log.Printf(" rejected FSM:\t\t%d", stats.rejectFsm) - log.Printf(" rejected Options:\t%d", stats.rejectOpt) - log.Printf(" reassembled bytes:\t%d", stats.sz) - log.Printf(" total TCP bytes:\t%d", stats.totalsz) - log.Printf(" conn rejected FSM:\t%d", stats.rejectConnFsm) - log.Printf(" reassembled chunks:\t%d", stats.reassembled) - log.Printf(" out-of-order packets:\t%d", stats.outOfOrderPackets) - log.Printf(" out-of-order bytes:\t%d", stats.outOfOrderBytes) - log.Printf(" biggest-chunk packets:\t%d", stats.biggestChunkPackets) - log.Printf(" biggest-chunk bytes:\t%d", stats.biggestChunkBytes) - log.Printf(" overlap packets:\t%d", stats.overlapPackets) - log.Printf(" overlap bytes:\t\t%d", stats.overlapBytes) - log.Printf("Errors: %d", nErrors) + logger.Log.Infof("TCP stats:") + logger.Log.Infof(" missed bytes:\t\t%d", stats.missedBytes) + logger.Log.Infof(" total packets:\t\t%d", stats.pkt) + logger.Log.Infof(" rejected FSM:\t\t%d", stats.rejectFsm) + logger.Log.Infof(" rejected Options:\t%d", stats.rejectOpt) + logger.Log.Infof(" reassembled bytes:\t%d", stats.sz) + logger.Log.Infof(" total TCP bytes:\t%d", stats.totalsz) + logger.Log.Infof(" conn rejected FSM:\t%d", stats.rejectConnFsm) + logger.Log.Infof(" reassembled chunks:\t%d", stats.reassembled) + logger.Log.Infof(" out-of-order packets:\t%d", stats.outOfOrderPackets) + logger.Log.Infof(" out-of-order bytes:\t%d", stats.outOfOrderBytes) + logger.Log.Infof(" biggest-chunk packets:\t%d", stats.biggestChunkPackets) + logger.Log.Infof(" biggest-chunk bytes:\t%d", stats.biggestChunkBytes) + logger.Log.Infof(" overlap packets:\t%d", stats.overlapPackets) + logger.Log.Infof(" overlap bytes:\t\t%d", stats.overlapBytes) + logger.Log.Infof("Errors: %d", nErrors) for e := range errorsMap { - log.Printf(" %s:\t\t%d", e, errorsMap[e]) + logger.Log.Infof(" %s:\t\t%d", e, errorsMap[e]) } - log.Printf("AppStats: %v", GetStats()) + logger.Log.Infof("AppStats: %v", GetStats()) } diff --git a/tap/tcp_reader.go b/tap/tcp_reader.go index 4b9604917..d80872ba7 100644 --- a/tap/tcp_reader.go +++ b/tap/tcp_reader.go @@ -9,7 +9,7 @@ import ( "time" "github.com/bradleyfalzon/tlsx" - "github.com/romana/rlog" + "github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/tap/api" ) @@ -79,7 +79,7 @@ func (h *tcpReader) Read(p []byte) (int, error) { clientHello := tlsx.ClientHello{} err := clientHello.Unmarshall(msg.bytes) if err == nil { - rlog.Debugf("Detected TLS client hello with SNI %s\n", clientHello.SNI) + logger.Log.Debugf("Detected TLS client hello with SNI %s\n", clientHello.SNI) // TODO: Throws `panic: runtime error: invalid memory address or nil pointer dereference` error. // numericPort, _ := strconv.Atoi(h.tcpID.DstPort) // h.outboundLinkWriter.WriteOutboundLink(h.tcpID.SrcIP, h.tcpID.DstIP, numericPort, clientHello.SNI, TLSProtocol) diff --git a/tap/tcp_stream_factory.go b/tap/tcp_stream_factory.go index ad042556b..e42705106 100644 --- a/tap/tcp_stream_factory.go +++ b/tap/tcp_stream_factory.go @@ -5,7 +5,7 @@ import ( "sync" "time" - "github.com/romana/rlog" + "github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/tap/api" "github.com/google/gopacket" @@ -33,7 +33,7 @@ var streams *sync.Map = &sync.Map{} // global var streamId int64 = 0 func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.TCP, ac reassembly.AssemblerContext) reassembly.Stream { - rlog.Debugf("* NEW: %s %s", net, transport) + logger.Log.Debugf("* NEW: %s %s", net, transport) fsmOptions := reassembly.TCPSimpleFSMOptions{ SupportMissingEstablishment: *allowmissinginit, } @@ -123,21 +123,21 @@ func (factory *tcpStreamFactory) WaitGoRoutines() { func (factory *tcpStreamFactory) getStreamProps(srcIP string, srcPort string, dstIP string, dstPort string) *streamProps { if hostMode { if inArrayString(gSettings.filterAuthorities, fmt.Sprintf("%s:%s", dstIP, dstPort)) { - rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ host1 %s:%s", dstIP, dstPort)) + logger.Log.Debugf("getStreamProps %s", fmt.Sprintf("+ host1 %s:%s", dstIP, dstPort)) return &streamProps{isTapTarget: true, isOutgoing: false} } else if inArrayString(gSettings.filterAuthorities, dstIP) { - rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ host2 %s", dstIP)) + logger.Log.Debugf("getStreamProps %s", fmt.Sprintf("+ host2 %s", dstIP)) return &streamProps{isTapTarget: true, isOutgoing: false} } else if inArrayString(gSettings.filterAuthorities, fmt.Sprintf("%s:%s", srcIP, srcPort)) { - rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ host3 %s:%s", srcIP, srcPort)) + logger.Log.Debugf("getStreamProps %s", fmt.Sprintf("+ host3 %s:%s", srcIP, srcPort)) return &streamProps{isTapTarget: true, isOutgoing: true} } else if inArrayString(gSettings.filterAuthorities, srcIP) { - rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ host4 %s", srcIP)) + logger.Log.Debugf("getStreamProps %s", fmt.Sprintf("+ host4 %s", srcIP)) return &streamProps{isTapTarget: true, isOutgoing: true} } return &streamProps{isTapTarget: false, isOutgoing: false} } else { - rlog.Debugf("getStreamProps %s", fmt.Sprintf("+ notHost3 %s:%s -> %s:%s", srcIP, srcPort, dstIP, dstPort)) + logger.Log.Debugf("getStreamProps %s", fmt.Sprintf("+ notHost3 %s:%s -> %s:%s", srcIP, srcPort, dstIP, dstPort)) return &streamProps{isTapTarget: true} } }