kubeshark/tap/tcp_stream_factory.go
RamiBerm cfe9e863b7
TRA-4065 support inflight tap target update (#556)
* WIP

* WIP

* Update main.go

* Update main.go and passive_tapper.go

* Update passive_tapper.go

* Update passive_tapper.go

* Update passive_tapper.go

* Update passive_tapper.go
2021-12-27 11:50:34 +02:00

182 lines
5.4 KiB
Go

package tap
import (
"fmt"
"sync"
"time"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap/api"
v1 "k8s.io/api/core/v1"
"github.com/google/gopacket"
"github.com/google/gopacket/layers" // pulls in all layers decoders
"github.com/google/gopacket/reassembly"
)
/*
* The TCP factory: returns a new Stream
* Implements gopacket.reassembly.StreamFactory interface (New)
* Generates a new tcp stream for each new tcp connection. Closes the stream when the connection closes.
*/
type tcpStreamFactory struct {
wg sync.WaitGroup
outboundLinkWriter *OutboundLinkWriter
Emitter api.Emitter
streamsMap *tcpStreamMap
ownIps []string
opts *TapOpts
}
type tcpStreamWrapper struct {
stream *tcpStream
createdAt time.Time
}
func NewTcpStreamFactory(emitter api.Emitter, streamsMap *tcpStreamMap, opts *TapOpts) *tcpStreamFactory {
var ownIps []string
if localhostIPs, err := getLocalhostIPs(); err != nil {
// TODO: think this over
logger.Log.Info("Failed to get self IP addresses")
logger.Log.Errorf("Getting-Self-Address", "Error getting self ip address: %s (%v,%+v)", err, err, err)
ownIps = make([]string, 0)
} else {
ownIps = localhostIPs
}
return &tcpStreamFactory{
Emitter: emitter,
streamsMap: streamsMap,
ownIps: ownIps,
opts: opts,
}
}
func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.TCP, ac reassembly.AssemblerContext) reassembly.Stream {
fsmOptions := reassembly.TCPSimpleFSMOptions{
SupportMissingEstablishment: *allowmissinginit,
}
srcIp := net.Src().String()
dstIp := net.Dst().String()
srcPort := transport.Src().String()
dstPort := transport.Dst().String()
// if factory.shouldNotifyOnOutboundLink(dstIp, dstPort) {
// factory.outboundLinkWriter.WriteOutboundLink(net.Src().String(), dstIp, dstPort, "", "")
// }
props := factory.getStreamProps(srcIp, srcPort, dstIp, dstPort)
isTapTarget := props.isTapTarget
stream := &tcpStream{
net: net,
transport: transport,
isDNS: tcp.SrcPort == 53 || tcp.DstPort == 53,
isTapTarget: isTapTarget,
tcpstate: reassembly.NewTCPSimpleFSM(fsmOptions),
ident: fmt.Sprintf("%s:%s", net, transport),
optchecker: reassembly.NewTCPOptionCheck(),
superIdentifier: &api.SuperIdentifier{},
streamsMap: factory.streamsMap,
}
if stream.isTapTarget {
stream.id = factory.streamsMap.nextId()
for i, extension := range extensions {
counterPair := &api.CounterPair{
Request: 0,
Response: 0,
}
stream.clients = append(stream.clients, tcpReader{
msgQueue: make(chan tcpReaderDataMsg),
superTimer: &api.SuperTimer{},
ident: fmt.Sprintf("%s %s", net, transport),
tcpID: &api.TcpID{
SrcIP: srcIp,
DstIP: dstIp,
SrcPort: srcPort,
DstPort: dstPort,
},
parent: stream,
isClient: true,
isOutgoing: props.isOutgoing,
outboundLinkWriter: factory.outboundLinkWriter,
extension: extension,
emitter: factory.Emitter,
counterPair: counterPair,
})
stream.servers = append(stream.servers, tcpReader{
msgQueue: make(chan tcpReaderDataMsg),
superTimer: &api.SuperTimer{},
ident: fmt.Sprintf("%s %s", net, transport),
tcpID: &api.TcpID{
SrcIP: net.Dst().String(),
DstIP: net.Src().String(),
SrcPort: transport.Dst().String(),
DstPort: transport.Src().String(),
},
parent: stream,
isClient: false,
isOutgoing: props.isOutgoing,
outboundLinkWriter: factory.outboundLinkWriter,
extension: extension,
emitter: factory.Emitter,
counterPair: counterPair,
})
factory.streamsMap.Store(stream.id, &tcpStreamWrapper{
stream: stream,
createdAt: time.Now(),
})
factory.wg.Add(2)
// Start reading from channel stream.reader.bytes
go stream.clients[i].run(&factory.wg)
go stream.servers[i].run(&factory.wg)
}
}
return stream
}
func (factory *tcpStreamFactory) WaitGoRoutines() {
factory.wg.Wait()
}
func inArrayPod(pods []v1.Pod, address string) bool {
for _, pod := range pods {
if pod.Status.PodIP == address {
return true
}
}
return false
}
func (factory *tcpStreamFactory) getStreamProps(srcIP string, srcPort string, dstIP string, dstPort string) *streamProps {
if factory.opts.HostMode {
if inArrayPod(tapTargets, fmt.Sprintf("%s:%s", dstIP, dstPort)) {
return &streamProps{isTapTarget: true, isOutgoing: false}
} else if inArrayPod(tapTargets, dstIP) {
return &streamProps{isTapTarget: true, isOutgoing: false}
} else if inArrayPod(tapTargets, fmt.Sprintf("%s:%s", srcIP, srcPort)) {
return &streamProps{isTapTarget: true, isOutgoing: true}
} else if inArrayPod(tapTargets, srcIP) {
return &streamProps{isTapTarget: true, isOutgoing: true}
}
return &streamProps{isTapTarget: false, isOutgoing: false}
} else {
return &streamProps{isTapTarget: true}
}
}
//lint:ignore U1000 will be used in the future
func (factory *tcpStreamFactory) shouldNotifyOnOutboundLink(dstIP string, dstPort int) bool {
if inArrayInt(remoteOnlyOutboundPorts, dstPort) {
isDirectedHere := inArrayString(factory.ownIps, dstIP)
return !isDirectedHere && !isPrivateIP(dstIP)
}
return true
}
type streamProps struct {
isTapTarget bool
isOutgoing bool
}