mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-08-03 17:52:40 +00:00
* Spawn only two Goroutines per TCP stream * Fix the linter error * Use `isProtocolIdentified` method instead * Fix the `Read` method of `tcpReader` * Remove unnecessary `append` * Copy to buffer only a message is received * Remove `exhaustBuffer` field and add `rewind` function * Rename `buffer` field to `pastData` * Update tap/tcp_reader.go Co-authored-by: Nimrod Gilboa Markevich <59927337+nimrod-up9@users.noreply.github.com> * Use `copy` instead of assignment * No lint * #run_acceptance_tests * Fix `rewind` #run_acceptance_tests * Fix the buffering algorithm #run_acceptance_tests * Add `TODO` * Fix the problems in AMQP and Kafka #run_acceptance_tests * Use `*bytes.Buffer` instead of `[]api.TcpReaderDataMsg` #run_acceptance_tests * Have a single `*bytes.Buffer` * Revert "Have a single `*bytes.Buffer`" This reverts commit fad96a288a.
* Revert "Use `*bytes.Buffer` instead of `[]api.TcpReaderDataMsg` #run_acceptance_tests" This reverts commit 0fc70bffe2.
* Fix the early timing out issue #run_acceptance_tests * Remove `NewBytes()` method * Update the `NewTcpReader` method signature #run_acceptance_tests * #run_acceptance_tests * #run_acceptance_tests * #run_acceptance_tests Co-authored-by: Nimrod Gilboa Markevich <59927337+nimrod-up9@users.noreply.github.com>
157 lines
4.1 KiB
Go
157 lines
4.1 KiB
Go
package tap
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
|
|
"github.com/up9inc/mizu/logger"
|
|
"github.com/up9inc/mizu/tap/api"
|
|
v1 "k8s.io/api/core/v1"
|
|
|
|
"github.com/google/gopacket"
|
|
"github.com/google/gopacket/layers" // pulls in all layers decoders
|
|
"github.com/google/gopacket/reassembly"
|
|
)
|
|
|
|
/*
|
|
* The TCP factory: returns a new Stream
|
|
* Implements gopacket.reassembly.StreamFactory interface (New)
|
|
* Generates a new tcp stream for each new tcp connection. Closes the stream when the connection closes.
|
|
*/
|
|
type tcpStreamFactory struct {
|
|
wg sync.WaitGroup
|
|
emitter api.Emitter
|
|
streamsMap api.TcpStreamMap
|
|
ownIps []string
|
|
opts *TapOpts
|
|
}
|
|
|
|
func NewTcpStreamFactory(emitter api.Emitter, streamsMap api.TcpStreamMap, opts *TapOpts) *tcpStreamFactory {
|
|
var ownIps []string
|
|
|
|
if localhostIPs, err := getLocalhostIPs(); err != nil {
|
|
// TODO: think this over
|
|
logger.Log.Info("Failed to get self IP addresses")
|
|
logger.Log.Errorf("Getting-Self-Address", "Error getting self ip address: %s (%v,%+v)", err, err, err)
|
|
ownIps = make([]string, 0)
|
|
} else {
|
|
ownIps = localhostIPs
|
|
}
|
|
|
|
return &tcpStreamFactory{
|
|
emitter: emitter,
|
|
streamsMap: streamsMap,
|
|
ownIps: ownIps,
|
|
opts: opts,
|
|
}
|
|
}
|
|
|
|
func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcpLayer *layers.TCP, ac reassembly.AssemblerContext) reassembly.Stream {
|
|
fsmOptions := reassembly.TCPSimpleFSMOptions{
|
|
SupportMissingEstablishment: *allowmissinginit,
|
|
}
|
|
srcIp := net.Src().String()
|
|
dstIp := net.Dst().String()
|
|
srcPort := transport.Src().String()
|
|
dstPort := transport.Dst().String()
|
|
|
|
props := factory.getStreamProps(srcIp, srcPort, dstIp, dstPort)
|
|
isTapTarget := props.isTapTarget
|
|
stream := NewTcpStream(isTapTarget, factory.streamsMap, getPacketOrigin(ac))
|
|
reassemblyStream := NewTcpReassemblyStream(fmt.Sprintf("%s:%s", net, transport), tcpLayer, fsmOptions, stream)
|
|
if stream.GetIsTapTarget() {
|
|
stream.setId(factory.streamsMap.NextId())
|
|
for _, extension := range extensions {
|
|
counterPair := &api.CounterPair{
|
|
Request: 0,
|
|
Response: 0,
|
|
}
|
|
stream.addCounterPair(counterPair)
|
|
|
|
reqResMatcher := extension.Dissector.NewResponseRequestMatcher()
|
|
stream.addReqResMatcher(reqResMatcher)
|
|
}
|
|
|
|
stream.client = NewTcpReader(
|
|
fmt.Sprintf("%s %s", net, transport),
|
|
&api.TcpID{
|
|
SrcIP: srcIp,
|
|
DstIP: dstIp,
|
|
SrcPort: srcPort,
|
|
DstPort: dstPort,
|
|
},
|
|
stream,
|
|
true,
|
|
props.isOutgoing,
|
|
factory.emitter,
|
|
)
|
|
|
|
stream.server = NewTcpReader(
|
|
fmt.Sprintf("%s %s", net, transport),
|
|
&api.TcpID{
|
|
SrcIP: net.Dst().String(),
|
|
DstIP: net.Src().String(),
|
|
SrcPort: transport.Dst().String(),
|
|
DstPort: transport.Src().String(),
|
|
},
|
|
stream,
|
|
false,
|
|
props.isOutgoing,
|
|
factory.emitter,
|
|
)
|
|
|
|
factory.streamsMap.Store(stream.getId(), stream)
|
|
|
|
factory.wg.Add(2)
|
|
go stream.client.run(filteringOptions, &factory.wg)
|
|
go stream.server.run(filteringOptions, &factory.wg)
|
|
}
|
|
return reassemblyStream
|
|
}
|
|
|
|
func (factory *tcpStreamFactory) WaitGoRoutines() {
|
|
factory.wg.Wait()
|
|
}
|
|
|
|
func inArrayPod(pods []v1.Pod, address string) bool {
|
|
for _, pod := range pods {
|
|
if pod.Status.PodIP == address {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (factory *tcpStreamFactory) getStreamProps(srcIP string, srcPort string, dstIP string, dstPort string) *streamProps {
|
|
if factory.opts.HostMode {
|
|
if inArrayPod(tapTargets, fmt.Sprintf("%s:%s", dstIP, dstPort)) {
|
|
return &streamProps{isTapTarget: true, isOutgoing: false}
|
|
} else if inArrayPod(tapTargets, dstIP) {
|
|
return &streamProps{isTapTarget: true, isOutgoing: false}
|
|
} else if inArrayPod(tapTargets, fmt.Sprintf("%s:%s", srcIP, srcPort)) {
|
|
return &streamProps{isTapTarget: true, isOutgoing: true}
|
|
} else if inArrayPod(tapTargets, srcIP) {
|
|
return &streamProps{isTapTarget: true, isOutgoing: true}
|
|
}
|
|
return &streamProps{isTapTarget: false, isOutgoing: false}
|
|
} else {
|
|
return &streamProps{isTapTarget: true}
|
|
}
|
|
}
|
|
|
|
func getPacketOrigin(ac reassembly.AssemblerContext) api.Capture {
|
|
c, ok := ac.(*context)
|
|
|
|
if !ok {
|
|
// If ac is not our context, fallback to Pcap
|
|
return api.Pcap
|
|
}
|
|
|
|
return c.Origin
|
|
}
|
|
|
|
// streamProps is the classification of a TCP stream produced by
// getStreamProps.
type streamProps struct {
	// isTapTarget is true when the stream should be tapped.
	isTapTarget bool
	// isOutgoing is set by getStreamProps when the source side of the
	// connection matched a tap target (rather than the destination side).
	isOutgoing bool
}
|