From 32a08dad295861b41cba40b232b044f03bb7ce40 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20J=C3=B6hren?= Date: Tue, 22 Nov 2022 17:35:32 +0100 Subject: [PATCH] Fix `fatal error: concurrent map read and map write` (#1224) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: M. Mert Yıldıran --- tap/tcp_assembler.go | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/tap/tcp_assembler.go b/tap/tcp_assembler.go index 6d20f6ea3..1052e8133 100644 --- a/tap/tcp_assembler.go +++ b/tap/tcp_assembler.go @@ -3,14 +3,15 @@ package tap import ( "encoding/hex" "fmt" + lru "github.com/hashicorp/golang-lru" "os" "os/signal" + "sync" "time" "github.com/google/gopacket" "github.com/google/gopacket/layers" "github.com/google/gopacket/reassembly" - "github.com/hashicorp/golang-lru/simplelru" "github.com/kubeshark/kubeshark/logger" "github.com/kubeshark/kubeshark/tap/api" "github.com/kubeshark/kubeshark/tap/dbgctl" @@ -40,7 +41,8 @@ type tcpAssembler struct { streamPool *reassembly.StreamPool streamFactory *tcpStreamFactory ignoredPorts []uint16 - lastClosedConnections *simplelru.LRU // Actual type is map[string]int64 which is "connId -> lastSeen" + lock sync.RWMutex + lastClosedConnections *lru.Cache // Actual type is map[string]int64 which is "connId -> lastSeen" liveConnections map[connectionId]bool maxLiveStreams int staleConnectionTimeout time.Duration @@ -64,7 +66,7 @@ func NewTcpAssembler(outputItems chan *api.OutputChannelItem, streamsMap api.Tcp OutputChannel: outputItems, } - lastClosedConnections, err := simplelru.NewLRU(lastClosedConnectionsMaxItems, func(key interface{}, value interface{}) {}) + lastClosedConnections, err := lru.NewWithEvict(lastClosedConnectionsMaxItems, func(key interface{}, value interface{}) {}) if err != nil { return nil, err @@ -185,15 +187,22 @@ func (a *tcpAssembler) processTcpPacket(origin api.Capture, packet gopacket.Pack } func (a *tcpAssembler) tcpStreamCreated(stream *tcpStream) { + a.lock.Lock() a.liveConnections[stream.connectionId] = true + a.lock.Unlock() } func (a *tcpAssembler) tcpStreamClosed(stream *tcpStream) { + a.lock.Lock() a.lastClosedConnections.Add(stream.connectionId, time.Now().UnixMilli()) delete(a.liveConnections, stream.connectionId) + a.lock.Unlock() } func (a *tcpAssembler) isRecentlyClosed(c connectionId) bool { + a.lock.Lock() + defer a.lock.Unlock() + if closedTimeMillis, ok := a.lastClosedConnections.Get(c); ok { timeSinceClosed := time.Since(time.UnixMilli(closedTimeMillis.(int64))) if timeSinceClosed < lastAckThreshold { @@ -204,6 +213,9 @@ func (a *tcpAssembler) isRecentlyClosed(c connectionId) bool { } func (a *tcpAssembler) shouldThrottle(c connectionId) bool { + a.lock.Lock() + defer a.lock.Unlock() + if _, ok := a.liveConnections[c]; ok { return false }