mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-06-28 17:19:44 +00:00
Read from service mesh network namespaces upon update (#944)
This commit is contained in:
parent
8eeb0e54c9
commit
27a73e21fb
@ -112,7 +112,7 @@ func UpdateTapTargets(newTapTargets []v1.Pod) {
|
||||
|
||||
tapTargets = newTapTargets
|
||||
|
||||
packetSourceManager.UpdatePods(tapTargets)
|
||||
packetSourceManager.UpdatePods(tapTargets, !*nodefrag, mainPacketInputChan)
|
||||
|
||||
if tlsTapperInstance != nil {
|
||||
if err := tlstapper.UpdateTapTargets(tlsTapperInstance, &tapTargets, *procfs); err != nil {
|
||||
@ -198,12 +198,8 @@ func initializePacketSources() error {
|
||||
}
|
||||
|
||||
var err error
|
||||
if packetSourceManager, err = source.NewPacketSourceManager(*procfs, *fname, *iface, *servicemesh, tapTargets, behaviour); err != nil {
|
||||
return err
|
||||
} else {
|
||||
packetSourceManager.ReadPackets(!*nodefrag, mainPacketInputChan)
|
||||
return nil
|
||||
}
|
||||
packetSourceManager, err = source.NewPacketSourceManager(*procfs, *fname, *iface, *servicemesh, tapTargets, behaviour, !*nodefrag, mainPacketInputChan)
|
||||
return err
|
||||
}
|
||||
|
||||
func initializePassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem) (*tcpStreamMap, *tcpAssembler) {
|
||||
|
@ -24,7 +24,7 @@ type PacketSourceManager struct {
|
||||
}
|
||||
|
||||
func NewPacketSourceManager(procfs string, filename string, interfaceName string,
|
||||
mtls bool, pods []v1.Pod, behaviour TcpPacketSourceBehaviour) (*PacketSourceManager, error) {
|
||||
mtls bool, pods []v1.Pod, behaviour TcpPacketSourceBehaviour, ipdefrag bool, packets chan<- TcpPacketInfo) (*PacketSourceManager, error) {
|
||||
hostSource, err := newHostPacketSource(filename, interfaceName, behaviour)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -43,7 +43,7 @@ func NewPacketSourceManager(procfs string, filename string, interfaceName string
|
||||
behaviour: behaviour,
|
||||
}
|
||||
|
||||
sourceManager.UpdatePods(pods)
|
||||
go hostSource.readPackets(ipdefrag, packets)
|
||||
return sourceManager, nil
|
||||
}
|
||||
|
||||
@ -64,16 +64,16 @@ func newHostPacketSource(filename string, interfaceName string,
|
||||
return source, nil
|
||||
}
|
||||
|
||||
func (m *PacketSourceManager) UpdatePods(pods []v1.Pod) {
|
||||
func (m *PacketSourceManager) UpdatePods(pods []v1.Pod, ipdefrag bool, packets chan<- TcpPacketInfo) {
|
||||
if m.config.mtls {
|
||||
m.updateMtlsPods(m.config.procfs, pods, m.config.interfaceName, m.config.behaviour)
|
||||
m.updateMtlsPods(m.config.procfs, pods, m.config.interfaceName, m.config.behaviour, ipdefrag, packets)
|
||||
}
|
||||
|
||||
m.setBPFFilter(pods)
|
||||
}
|
||||
|
||||
func (m *PacketSourceManager) updateMtlsPods(procfs string, pods []v1.Pod,
|
||||
interfaceName string, behaviour TcpPacketSourceBehaviour) {
|
||||
interfaceName string, behaviour TcpPacketSourceBehaviour, ipdefrag bool, packets chan<- TcpPacketInfo) {
|
||||
|
||||
relevantPids := m.getRelevantPids(procfs, pods)
|
||||
logger.Log.Infof("Updating mtls pods (new: %v) (current: %v)", relevantPids, m.sources)
|
||||
@ -90,6 +90,7 @@ func (m *PacketSourceManager) updateMtlsPods(procfs string, pods []v1.Pod,
|
||||
source, err := newNetnsPacketSource(procfs, pid, interfaceName, behaviour)
|
||||
|
||||
if err == nil {
|
||||
go source.readPackets(ipdefrag, packets)
|
||||
m.sources[pid] = source
|
||||
}
|
||||
}
|
||||
@ -153,12 +154,6 @@ func (m *PacketSourceManager) setBPFFilter(pods []v1.Pod) {
|
||||
}
|
||||
}
|
||||
|
||||
func (m *PacketSourceManager) ReadPackets(ipdefrag bool, packets chan<- TcpPacketInfo) {
|
||||
for _, src := range m.sources {
|
||||
go src.readPackets(ipdefrag, packets)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *PacketSourceManager) Close() {
|
||||
for _, src := range m.sources {
|
||||
src.close()
|
||||
|
Loading…
Reference in New Issue
Block a user