From 2ef6afa39583ac6f3bfee4d322a3d201209c66ab Mon Sep 17 00:00:00 2001 From: Nimrod Gilboa Markevich <59927337+nimrod-up9@users.noreply.github.com> Date: Wed, 30 Mar 2022 17:13:56 +0300 Subject: [PATCH] Hotfix tap issues - restart service mesh tapping when tap targets change, fallback to source namespace (#953) * Read from service mesh network namespaces upon update (#944) #patch * Set the entry namespace to the source namespace if the destination is not resolved (#950) --- agent/pkg/api/main.go | 7 ++++++- tap/passive_tapper.go | 10 +++------- tap/source/packet_source_manager.go | 17 ++++++----------- 3 files changed, 15 insertions(+), 19 deletions(-) diff --git a/agent/pkg/api/main.go b/agent/pkg/api/main.go index d37fbe0da..f4f83b5be 100644 --- a/agent/pkg/api/main.go +++ b/agent/pkg/api/main.go @@ -183,6 +183,7 @@ func resolveIP(connectionInfo *tapApi.ConnectionInfo) (resolvedSource string, re } } else { resolvedSource = resolvedSourceObject.FullAddress + namespace = resolvedSourceObject.Namespace } unresolvedDestination := fmt.Sprintf("%s:%s", connectionInfo.ServerIP, connectionInfo.ServerPort) @@ -194,7 +195,11 @@ func resolveIP(connectionInfo *tapApi.ConnectionInfo) (resolvedSource string, re } } else { resolvedDestination = resolvedDestinationObject.FullAddress - namespace = resolvedDestinationObject.Namespace + // Overwrite namespace (if it was set according to the source) + // Only overwrite if non-empty + if resolvedDestinationObject.Namespace != "" { + namespace = resolvedDestinationObject.Namespace + } } } return resolvedSource, resolvedDestination, namespace diff --git a/tap/passive_tapper.go b/tap/passive_tapper.go index e9ecf5216..57fd2a075 100644 --- a/tap/passive_tapper.go +++ b/tap/passive_tapper.go @@ -112,7 +112,7 @@ func UpdateTapTargets(newTapTargets []v1.Pod) { tapTargets = newTapTargets - packetSourceManager.UpdatePods(tapTargets) + packetSourceManager.UpdatePods(tapTargets, !*nodefrag, mainPacketInputChan) if tlsTapperInstance != nil { if err := tlstapper.UpdateTapTargets(tlsTapperInstance, &tapTargets, *procfs); err != nil { @@ -198,12 +198,8 @@ func initializePacketSources() error { } var err error - if packetSourceManager, err = source.NewPacketSourceManager(*procfs, *fname, *iface, *servicemesh, tapTargets, behaviour); err != nil { - return err - } else { - packetSourceManager.ReadPackets(!*nodefrag, mainPacketInputChan) - return nil - } + packetSourceManager, err = source.NewPacketSourceManager(*procfs, *fname, *iface, *servicemesh, tapTargets, behaviour, !*nodefrag, mainPacketInputChan) + return err } func initializePassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem) (*tcpStreamMap, *tcpAssembler) { diff --git a/tap/source/packet_source_manager.go b/tap/source/packet_source_manager.go index 3fc0ee0b9..bf527ef2e 100644 --- a/tap/source/packet_source_manager.go +++ b/tap/source/packet_source_manager.go @@ -24,7 +24,7 @@ type PacketSourceManager struct { } func NewPacketSourceManager(procfs string, filename string, interfaceName string, - mtls bool, pods []v1.Pod, behaviour TcpPacketSourceBehaviour) (*PacketSourceManager, error) { + mtls bool, pods []v1.Pod, behaviour TcpPacketSourceBehaviour, ipdefrag bool, packets chan<- TcpPacketInfo) (*PacketSourceManager, error) { hostSource, err := newHostPacketSource(filename, interfaceName, behaviour) if err != nil { return nil, err @@ -43,7 +43,7 @@ func NewPacketSourceManager(procfs string, filename string, interfaceName string behaviour: behaviour, } - sourceManager.UpdatePods(pods) + go hostSource.readPackets(ipdefrag, packets) return sourceManager, nil } @@ -64,16 +64,16 @@ func newHostPacketSource(filename string, interfaceName string, return source, nil } -func (m *PacketSourceManager) UpdatePods(pods []v1.Pod) { +func (m *PacketSourceManager) UpdatePods(pods []v1.Pod, ipdefrag bool, packets chan<- TcpPacketInfo) { if m.config.mtls { - m.updateMtlsPods(m.config.procfs, pods, m.config.interfaceName, m.config.behaviour) + m.updateMtlsPods(m.config.procfs, pods, m.config.interfaceName, m.config.behaviour, ipdefrag, packets) } m.setBPFFilter(pods) } func (m *PacketSourceManager) updateMtlsPods(procfs string, pods []v1.Pod, - interfaceName string, behaviour TcpPacketSourceBehaviour) { + interfaceName string, behaviour TcpPacketSourceBehaviour, ipdefrag bool, packets chan<- TcpPacketInfo) { relevantPids := m.getRelevantPids(procfs, pods) logger.Log.Infof("Updating mtls pods (new: %v) (current: %v)", relevantPids, m.sources) @@ -90,6 +90,7 @@ func (m *PacketSourceManager) updateMtlsPods(procfs string, pods []v1.Pod, source, err := newNetnsPacketSource(procfs, pid, interfaceName, behaviour) if err == nil { + go source.readPackets(ipdefrag, packets) m.sources[pid] = source } } @@ -153,12 +154,6 @@ func (m *PacketSourceManager) setBPFFilter(pods []v1.Pod) { } } -func (m *PacketSourceManager) ReadPackets(ipdefrag bool, packets chan<- TcpPacketInfo) { - for _, src := range m.sources { - go src.readPackets(ipdefrag, packets) - } -} - func (m *PacketSourceManager) Close() { for _, src := range m.sources { src.close()