Files
kubeshark/tap/tcp_stream.go
David Levanon e696851261 Tapper Refactor (#396)
* introduce tcp_assembler and tcp_packet_source - the motivation is to … (#380)

* add passive-tapper main tester (#353)

* add passive-tapper main tester

* add errors to go.sum of mizu agent

* disable host mode for tester - to avoid filterAuthorities

* rename main to tester

* build extenssions as part of the tester launch

* add a README to the tester

* solving go.mod and .sum conflicts with addition of go-errors

* trivial warning fixes (#354)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* disable host mode for tester - to avoid filterAuthorities

* tcp streams map (#355)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* change rlog to mizu logger

* errors map (#356)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* errors map

* change int to uint - errorsmap

* change from int to uint

* Change errorsMap.nErrors to uint.

* change errors map to mizu logger instead of rlog

* init mizu logger in tester + fix errormap declaration

Co-authored-by: Nimrod Gilboa Markevich <nimrod@up9.com>

* move own ips to tcp stream factory (#358)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* errors map

* move own ips to tcp stream factory

* Feature/tapper refactor i/move own ips to tcp stream factory (#379)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* errors map

* move own ips to tcp stream factory

* fix ownips compilation issue

* introduce tcp_assembler and tcp_packet_source - the motivation is to split the actual packet sources from the assembler, so we can have a single thread for the assembly which is separated from packet source threads

* make struts private at this point - planning to move some packages to their own package so we can utilize encapsulation

* move context to tcp_assembly + fix error check of tcp source packet

* use param instead of gloab flag for ipdefrag

* fix a typo

* remove unused pid param

Co-authored-by: Nimrod Gilboa Markevich <nimrod@up9.com>

* extract stats functions out of the main tapping function (#381)

* add passive-tapper main tester (#353)

* add passive-tapper main tester

* add errors to go.sum of mizu agent

* disable host mode for tester - to avoid filterAuthorities

* rename main to tester

* build extenssions as part of the tester launch

* add a README to the tester

* solving go.mod and .sum conflicts with addition of go-errors

* trivial warning fixes (#354)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* disable host mode for tester - to avoid filterAuthorities

* tcp streams map (#355)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* change rlog to mizu logger

* errors map (#356)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* errors map

* change int to uint - errorsmap

* change from int to uint

* Change errorsMap.nErrors to uint.

* change errors map to mizu logger instead of rlog

* init mizu logger in tester + fix errormap declaration

Co-authored-by: Nimrod Gilboa Markevich <nimrod@up9.com>

* move own ips to tcp stream factory (#358)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* errors map

* move own ips to tcp stream factory

* Feature/tapper refactor i/move own ips to tcp stream factory (#379)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* errors map

* move own ips to tcp stream factory

* fix ownips compilation issue

* introduce tcp_assembler and tcp_packet_source - the motivation is to split the actual packet sources from the assembler, so we can have a single thread for the assembly which is separated from packet source threads

* make struts private at this point - planning to move some packages to their own package so we can utilize encapsulation

* extract stats functions out of the main tapping function

* move context to tcp_assembly + fix error check of tcp source packet

* use param instead of gloab flag for ipdefrag

Co-authored-by: Nimrod Gilboa Markevich <nimrod@up9.com>

* Feature/tapper refactor i/internal tapper stats (#384)

* add passive-tapper main tester (#353)

* add passive-tapper main tester

* add errors to go.sum of mizu agent

* disable host mode for tester - to avoid filterAuthorities

* rename main to tester

* build extenssions as part of the tester launch

* add a README to the tester

* solving go.mod and .sum conflicts with addition of go-errors

* trivial warning fixes (#354)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* disable host mode for tester - to avoid filterAuthorities

* tcp streams map (#355)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* change rlog to mizu logger

* errors map (#356)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* errors map

* change int to uint - errorsmap

* change from int to uint

* Change errorsMap.nErrors to uint.

* change errors map to mizu logger instead of rlog

* init mizu logger in tester + fix errormap declaration

Co-authored-by: Nimrod Gilboa Markevich <nimrod@up9.com>

* move own ips to tcp stream factory (#358)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* errors map

* move own ips to tcp stream factory

* Feature/tapper refactor i/move own ips to tcp stream factory (#379)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* errors map

* move own ips to tcp stream factory

* fix ownips compilation issue

* introduce tcp_assembler and tcp_packet_source - the motivation is to split the actual packet sources from the assembler, so we can have a single thread for the assembly which is separated from packet source threads

* make struts private at this point - planning to move some packages to their own package so we can utilize encapsulation

* extract stats functions out of the main tapping function

* move context to tcp_assembly + fix error check of tcp source packet

* use param instead of gloab flag for ipdefrag

* introduce internal tapper stats

* minor fixes for errors map and internal stats

Co-authored-by: Nimrod Gilboa Markevich <nimrod@up9.com>

* Feature/tapper refactor i/diagnose package (#386)

* add passive-tapper main tester (#353)

* add passive-tapper main tester

* add errors to go.sum of mizu agent

* disable host mode for tester - to avoid filterAuthorities

* rename main to tester

* build extenssions as part of the tester launch

* add a README to the tester

* solving go.mod and .sum conflicts with addition of go-errors

* trivial warning fixes (#354)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* disable host mode for tester - to avoid filterAuthorities

* tcp streams map (#355)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* change rlog to mizu logger

* errors map (#356)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* errors map

* change int to uint - errorsmap

* change from int to uint

* Change errorsMap.nErrors to uint.

* change errors map to mizu logger instead of rlog

* init mizu logger in tester + fix errormap declaration

Co-authored-by: Nimrod Gilboa Markevich <nimrod@up9.com>

* move own ips to tcp stream factory (#358)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* errors map

* move own ips to tcp stream factory

* Feature/tapper refactor i/move own ips to tcp stream factory (#379)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* errors map

* move own ips to tcp stream factory

* fix ownips compilation issue

* introduce tcp_assembler and tcp_packet_source - the motivation is to split the actual packet sources from the assembler, so we can have a single thread for the assembly which is separated from packet source threads

* make struts private at this point - planning to move some packages to their own package so we can utilize encapsulation

* extract stats functions out of the main tapping function

* move context to tcp_assembly + fix error check of tcp source packet

* use param instead of gloab flag for ipdefrag

* introduce internal tapper stats

* minor fixes for errors map and internal stats

* move errors map + app stats + internal stats + periodic tasks to diagnose package

* initialize tapper internal stats

Co-authored-by: Nimrod Gilboa Markevich <nimrod@up9.com>

* move tcp packet source to its packet (#387)

* add passive-tapper main tester (#353)

* add passive-tapper main tester

* add errors to go.sum of mizu agent

* disable host mode for tester - to avoid filterAuthorities

* rename main to tester

* build extenssions as part of the tester launch

* add a README to the tester

* solving go.mod and .sum conflicts with addition of go-errors

* trivial warning fixes (#354)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* disable host mode for tester - to avoid filterAuthorities

* tcp streams map (#355)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* change rlog to mizu logger

* errors map (#356)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* errors map

* change int to uint - errorsmap

* change from int to uint

* Change errorsMap.nErrors to uint.

* change errors map to mizu logger instead of rlog

* init mizu logger in tester + fix errormap declaration

Co-authored-by: Nimrod Gilboa Markevich <nimrod@up9.com>

* move own ips to tcp stream factory (#358)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* errors map

* move own ips to tcp stream factory

* Feature/tapper refactor i/move own ips to tcp stream factory (#379)

* add passive-tapper main tester

* trivial warning fixes

* add errors to go.sum of mizu agent

* tcp streams map

* disable host mode for tester - to avoid filterAuthorities

* set tcp streams map for tcp stream factory

* errors map

* move own ips to tcp stream factory

* fix ownips compilation issue

* introduce tcp_assembler and tcp_packet_source - the motivation is to split the actual packet sources from the assembler, so we can have a single thread for the assembly which is separated from packet source threads

* make struts private at this point - planning to move some packages to their own package so we can utilize encapsulation

* extract stats functions out of the main tapping function

* move context to tcp_assembly + fix error check of tcp source packet

* use param instead of gloab flag for ipdefrag

* introduce internal tapper stats

* minor fixes for errors map and internal stats

* move errors map + app stats + internal stats + periodic tasks to diagnose package

* move tcp packet source to its packet

* initialize tapper internal stats

Co-authored-by: Nimrod Gilboa Markevich <nimrod@up9.com>

* Fix coding style

* Remove `tap/internal_stats.go`

* make channel between input and assembler blocking - to preserve the same behaviour we have before the refactor

Co-authored-by: Nimrod Gilboa Markevich <nimrod@up9.com>
Co-authored-by: M. Mert Yildiran <mehmet@up9.com>
2021-10-25 13:59:06 +03:00

208 lines
6.1 KiB
Go

package tap
import (
"encoding/binary"
"fmt"
"sync"
"github.com/google/gopacket"
"github.com/google/gopacket/layers" // pulls in all layers decoders
"github.com/google/gopacket/reassembly"
"github.com/up9inc/mizu/tap/api"
"github.com/up9inc/mizu/tap/diagnose"
)
/* It's a connection (bidirectional)
* Implements gopacket.reassembly.Stream interface (Accept, ReassembledSG, ReassemblyComplete)
* ReassembledSG gets called when new reassembled data is ready (i.e. bytes in order, no duplicates, complete)
* In our implementation, we pass information from ReassembledSG to the tcpReader through a shared channel.
*/
type tcpStream struct {
id int64
isClosed bool
superIdentifier *api.SuperIdentifier
tcpstate *reassembly.TCPSimpleFSM
fsmerr bool
optchecker reassembly.TCPOptionCheck
net, transport gopacket.Flow
isDNS bool
isTapTarget bool
clients []tcpReader
servers []tcpReader
ident string
sync.Mutex
streamsMap *tcpStreamMap
}
func (t *tcpStream) Accept(tcp *layers.TCP, ci gopacket.CaptureInfo, dir reassembly.TCPFlowDirection, nextSeq reassembly.Sequence, start *bool, ac reassembly.AssemblerContext) bool {
// FSM
if !t.tcpstate.CheckState(tcp, dir) {
diagnose.TapErrors.SilentError("FSM-rejection", "%s: Packet rejected by FSM (state:%s)", t.ident, t.tcpstate.String())
diagnose.InternalStats.RejectFsm++
if !t.fsmerr {
t.fsmerr = true
diagnose.InternalStats.RejectConnFsm++
}
if !*ignorefsmerr {
return false
}
}
// Options
err := t.optchecker.Accept(tcp, ci, dir, nextSeq, start)
if err != nil {
diagnose.TapErrors.SilentError("OptionChecker-rejection", "%s: Packet rejected by OptionChecker: %s", t.ident, err)
diagnose.InternalStats.RejectOpt++
if !*nooptcheck {
return false
}
}
// Checksum
accept := true
if *checksum {
c, err := tcp.ComputeChecksum()
if err != nil {
diagnose.TapErrors.SilentError("ChecksumCompute", "%s: Got error computing checksum: %s", t.ident, err)
accept = false
} else if c != 0x0 {
diagnose.TapErrors.SilentError("Checksum", "%s: Invalid checksum: 0x%x", t.ident, c)
accept = false
}
}
if !accept {
diagnose.InternalStats.RejectOpt++
}
return accept
}
func (t *tcpStream) ReassembledSG(sg reassembly.ScatterGather, ac reassembly.AssemblerContext) {
dir, start, end, skip := sg.Info()
length, saved := sg.Lengths()
// update stats
sgStats := sg.Stats()
if skip > 0 {
diagnose.InternalStats.MissedBytes += skip
}
diagnose.InternalStats.Sz += length - saved
diagnose.InternalStats.Pkt += sgStats.Packets
if sgStats.Chunks > 1 {
diagnose.InternalStats.Reassembled++
}
diagnose.InternalStats.OutOfOrderPackets += sgStats.QueuedPackets
diagnose.InternalStats.OutOfOrderBytes += sgStats.QueuedBytes
if length > diagnose.InternalStats.BiggestChunkBytes {
diagnose.InternalStats.BiggestChunkBytes = length
}
if sgStats.Packets > diagnose.InternalStats.BiggestChunkPackets {
diagnose.InternalStats.BiggestChunkPackets = sgStats.Packets
}
if sgStats.OverlapBytes != 0 && sgStats.OverlapPackets == 0 {
// In the original example this was handled with panic().
// I don't know what this error means or how to handle it properly.
diagnose.TapErrors.SilentError("Invalid-Overlap", "bytes:%d, pkts:%d", sgStats.OverlapBytes, sgStats.OverlapPackets)
}
diagnose.InternalStats.OverlapBytes += sgStats.OverlapBytes
diagnose.InternalStats.OverlapPackets += sgStats.OverlapPackets
var ident string
if dir == reassembly.TCPDirClientToServer {
ident = fmt.Sprintf("%v %v(%s): ", t.net, t.transport, dir)
} else {
ident = fmt.Sprintf("%v %v(%s): ", t.net.Reverse(), t.transport.Reverse(), dir)
}
diagnose.TapErrors.Debug("%s: SG reassembled packet with %d bytes (start:%v,end:%v,skip:%d,saved:%d,nb:%d,%d,overlap:%d,%d)", ident, length, start, end, skip, saved, sgStats.Packets, sgStats.Chunks, sgStats.OverlapBytes, sgStats.OverlapPackets)
if skip == -1 && *allowmissinginit {
// this is allowed
} else if skip != 0 {
// Missing bytes in stream: do not even try to parse it
return
}
data := sg.Fetch(length)
if t.isDNS {
dns := &layers.DNS{}
var decoded []gopacket.LayerType
if len(data) < 2 {
if len(data) > 0 {
sg.KeepFrom(0)
}
return
}
dnsSize := binary.BigEndian.Uint16(data[:2])
missing := int(dnsSize) - len(data[2:])
diagnose.TapErrors.Debug("dnsSize: %d, missing: %d", dnsSize, missing)
if missing > 0 {
diagnose.TapErrors.Debug("Missing some bytes: %d", missing)
sg.KeepFrom(0)
return
}
p := gopacket.NewDecodingLayerParser(layers.LayerTypeDNS, dns)
err := p.DecodeLayers(data[2:], &decoded)
if err != nil {
diagnose.TapErrors.SilentError("DNS-parser", "Failed to decode DNS: %v", err)
} else {
diagnose.TapErrors.Debug("DNS: %s", gopacket.LayerDump(dns))
}
if len(data) > 2+int(dnsSize) {
sg.KeepFrom(2 + int(dnsSize))
}
} else if t.isTapTarget {
if length > 0 {
// This is where we pass the reassembled information onwards
// This channel is read by an tcpReader object
diagnose.AppStats.IncReassembledTcpPayloadsCount()
timestamp := ac.GetCaptureInfo().Timestamp
if dir == reassembly.TCPDirClientToServer {
for i := range t.clients {
reader := &t.clients[i]
reader.Lock()
if !reader.isClosed {
reader.msgQueue <- tcpReaderDataMsg{data, timestamp}
}
reader.Unlock()
}
} else {
for i := range t.servers {
reader := &t.servers[i]
reader.Lock()
if !reader.isClosed {
reader.msgQueue <- tcpReaderDataMsg{data, timestamp}
}
reader.Unlock()
}
}
}
}
}
func (t *tcpStream) ReassemblyComplete(ac reassembly.AssemblerContext) bool {
diagnose.TapErrors.Debug("%s: Connection closed", t.ident)
if t.isTapTarget && !t.isClosed {
t.Close()
}
// do not remove the connection to allow last ACK
return false
}
func (t *tcpStream) Close() {
shouldReturn := false
t.Lock()
if t.isClosed {
shouldReturn = true
} else {
t.isClosed = true
}
t.Unlock()
if shouldReturn {
return
}
t.streamsMap.Delete(t.id)
for i := range t.clients {
reader := &t.clients[i]
reader.Close()
}
for i := range t.servers {
reader := &t.servers[i]
reader.Close()
}
}