mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-04-28 20:06:36 +00:00
* close gopacket conn immediately * increase last ack threshold to 3 seconds * remove empty condition * add periodic stats * protect connections from concurrent updates * implement basic throttling based on live streams count * remove assembler mutex * pr fixes * change max conns default to 500 * create connectionId type * fix linter
160 lines
4.4 KiB
Go
160 lines
4.4 KiB
Go
package tap
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
|
|
"github.com/up9inc/mizu/logger"
|
|
"github.com/up9inc/mizu/tap/api"
|
|
v1 "k8s.io/api/core/v1"
|
|
|
|
"github.com/google/gopacket"
|
|
"github.com/google/gopacket/layers" // pulls in all layers decoders
|
|
"github.com/google/gopacket/reassembly"
|
|
)
|
|
|
|
/*
|
|
* The TCP factory: returns a new Stream
|
|
* Implements gopacket.reassembly.StreamFactory interface (New)
|
|
* Generates a new tcp stream for each new tcp connection. Closes the stream when the connection closes.
|
|
*/
|
|
type tcpStreamFactory struct {
|
|
wg sync.WaitGroup
|
|
emitter api.Emitter
|
|
streamsMap api.TcpStreamMap
|
|
ownIps []string
|
|
opts *TapOpts
|
|
streamsCallbacks tcpStreamCallbacks
|
|
}
|
|
|
|
func NewTcpStreamFactory(emitter api.Emitter, streamsMap api.TcpStreamMap, opts *TapOpts, streamsCallbacks tcpStreamCallbacks) *tcpStreamFactory {
|
|
var ownIps []string
|
|
|
|
if localhostIPs, err := getLocalhostIPs(); err != nil {
|
|
// TODO: think this over
|
|
logger.Log.Info("Failed to get self IP addresses")
|
|
logger.Log.Errorf("Getting-Self-Address", "Error getting self ip address: %s (%v,%+v)", err, err, err)
|
|
ownIps = make([]string, 0)
|
|
} else {
|
|
ownIps = localhostIPs
|
|
}
|
|
|
|
return &tcpStreamFactory{
|
|
emitter: emitter,
|
|
streamsMap: streamsMap,
|
|
ownIps: ownIps,
|
|
opts: opts,
|
|
streamsCallbacks: streamsCallbacks,
|
|
}
|
|
}
|
|
|
|
func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcpLayer *layers.TCP, ac reassembly.AssemblerContext) reassembly.Stream {
|
|
fsmOptions := reassembly.TCPSimpleFSMOptions{
|
|
SupportMissingEstablishment: *allowmissinginit,
|
|
}
|
|
srcIp := net.Src().String()
|
|
dstIp := net.Dst().String()
|
|
srcPort := transport.Src().String()
|
|
dstPort := transport.Dst().String()
|
|
|
|
props := factory.getStreamProps(srcIp, srcPort, dstIp, dstPort)
|
|
isTapTarget := props.isTapTarget
|
|
connectionId := getConnectionId(srcIp, srcPort, dstIp, dstPort)
|
|
stream := NewTcpStream(isTapTarget, factory.streamsMap, getPacketOrigin(ac), connectionId, factory.streamsCallbacks)
|
|
reassemblyStream := NewTcpReassemblyStream(fmt.Sprintf("%s:%s", net, transport), tcpLayer, fsmOptions, stream)
|
|
if stream.GetIsTapTarget() {
|
|
stream.setId(factory.streamsMap.NextId())
|
|
for _, extension := range extensions {
|
|
counterPair := &api.CounterPair{
|
|
Request: 0,
|
|
Response: 0,
|
|
}
|
|
stream.addCounterPair(counterPair)
|
|
|
|
reqResMatcher := extension.Dissector.NewResponseRequestMatcher()
|
|
stream.addReqResMatcher(reqResMatcher)
|
|
}
|
|
|
|
stream.client = NewTcpReader(
|
|
fmt.Sprintf("%s %s", net, transport),
|
|
&api.TcpID{
|
|
SrcIP: srcIp,
|
|
DstIP: dstIp,
|
|
SrcPort: srcPort,
|
|
DstPort: dstPort,
|
|
},
|
|
stream,
|
|
true,
|
|
props.isOutgoing,
|
|
factory.emitter,
|
|
)
|
|
|
|
stream.server = NewTcpReader(
|
|
fmt.Sprintf("%s %s", net, transport),
|
|
&api.TcpID{
|
|
SrcIP: net.Dst().String(),
|
|
DstIP: net.Src().String(),
|
|
SrcPort: transport.Dst().String(),
|
|
DstPort: transport.Src().String(),
|
|
},
|
|
stream,
|
|
false,
|
|
props.isOutgoing,
|
|
factory.emitter,
|
|
)
|
|
|
|
factory.streamsMap.Store(stream.getId(), stream)
|
|
|
|
factory.wg.Add(2)
|
|
go stream.client.run(filteringOptions, &factory.wg)
|
|
go stream.server.run(filteringOptions, &factory.wg)
|
|
}
|
|
return reassemblyStream
|
|
}
|
|
|
|
func (factory *tcpStreamFactory) WaitGoRoutines() {
|
|
factory.wg.Wait()
|
|
}
|
|
|
|
func inArrayPod(pods []v1.Pod, address string) bool {
|
|
for _, pod := range pods {
|
|
if pod.Status.PodIP == address {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (factory *tcpStreamFactory) getStreamProps(srcIP string, srcPort string, dstIP string, dstPort string) *streamProps {
|
|
if factory.opts.HostMode {
|
|
if inArrayPod(tapTargets, fmt.Sprintf("%s:%s", dstIP, dstPort)) {
|
|
return &streamProps{isTapTarget: true, isOutgoing: false}
|
|
} else if inArrayPod(tapTargets, dstIP) {
|
|
return &streamProps{isTapTarget: true, isOutgoing: false}
|
|
} else if inArrayPod(tapTargets, fmt.Sprintf("%s:%s", srcIP, srcPort)) {
|
|
return &streamProps{isTapTarget: true, isOutgoing: true}
|
|
} else if inArrayPod(tapTargets, srcIP) {
|
|
return &streamProps{isTapTarget: true, isOutgoing: true}
|
|
}
|
|
return &streamProps{isTapTarget: false, isOutgoing: false}
|
|
} else {
|
|
return &streamProps{isTapTarget: true}
|
|
}
|
|
}
|
|
|
|
func getPacketOrigin(ac reassembly.AssemblerContext) api.Capture {
|
|
c, ok := ac.(*context)
|
|
|
|
if !ok {
|
|
// If ac is not our context, fallback to Pcap
|
|
return api.Pcap
|
|
}
|
|
|
|
return c.Origin
|
|
}
|
|
|
|
// streamProps is the classification of a connection produced by
// getStreamProps.
type streamProps struct {
	// isTapTarget marks the connection as one that should be tapped.
	isTapTarget bool
	// isOutgoing is set when the connection matched a tap target by its
	// source address rather than by its destination address.
	isOutgoing bool
}
|