mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-06-25 07:45:01 +00:00
Fix fatal error: concurrent map read and map write
(#1224)
Co-authored-by: M. Mert Yıldıran <me@mertyildiran.com>
This commit is contained in:
parent
71d1323640
commit
32a08dad29
@ -3,14 +3,15 @@ package tap
|
|||||||
import (
|
import (
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
lru "github.com/hashicorp/golang-lru"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/google/gopacket"
|
"github.com/google/gopacket"
|
||||||
"github.com/google/gopacket/layers"
|
"github.com/google/gopacket/layers"
|
||||||
"github.com/google/gopacket/reassembly"
|
"github.com/google/gopacket/reassembly"
|
||||||
"github.com/hashicorp/golang-lru/simplelru"
|
|
||||||
"github.com/kubeshark/kubeshark/logger"
|
"github.com/kubeshark/kubeshark/logger"
|
||||||
"github.com/kubeshark/kubeshark/tap/api"
|
"github.com/kubeshark/kubeshark/tap/api"
|
||||||
"github.com/kubeshark/kubeshark/tap/dbgctl"
|
"github.com/kubeshark/kubeshark/tap/dbgctl"
|
||||||
@ -40,7 +41,8 @@ type tcpAssembler struct {
|
|||||||
streamPool *reassembly.StreamPool
|
streamPool *reassembly.StreamPool
|
||||||
streamFactory *tcpStreamFactory
|
streamFactory *tcpStreamFactory
|
||||||
ignoredPorts []uint16
|
ignoredPorts []uint16
|
||||||
lastClosedConnections *simplelru.LRU // Actual type is map[string]int64 which is "connId -> lastSeen"
|
lock sync.RWMutex
|
||||||
|
lastClosedConnections *lru.Cache // Actual type is map[string]int64 which is "connId -> lastSeen"
|
||||||
liveConnections map[connectionId]bool
|
liveConnections map[connectionId]bool
|
||||||
maxLiveStreams int
|
maxLiveStreams int
|
||||||
staleConnectionTimeout time.Duration
|
staleConnectionTimeout time.Duration
|
||||||
@ -64,7 +66,7 @@ func NewTcpAssembler(outputItems chan *api.OutputChannelItem, streamsMap api.Tcp
|
|||||||
OutputChannel: outputItems,
|
OutputChannel: outputItems,
|
||||||
}
|
}
|
||||||
|
|
||||||
lastClosedConnections, err := simplelru.NewLRU(lastClosedConnectionsMaxItems, func(key interface{}, value interface{}) {})
|
lastClosedConnections, err := lru.NewWithEvict(lastClosedConnectionsMaxItems, func(key interface{}, value interface{}) {})
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -185,15 +187,22 @@ func (a *tcpAssembler) processTcpPacket(origin api.Capture, packet gopacket.Pack
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (a *tcpAssembler) tcpStreamCreated(stream *tcpStream) {
|
func (a *tcpAssembler) tcpStreamCreated(stream *tcpStream) {
|
||||||
|
a.lock.Lock()
|
||||||
a.liveConnections[stream.connectionId] = true
|
a.liveConnections[stream.connectionId] = true
|
||||||
|
a.lock.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *tcpAssembler) tcpStreamClosed(stream *tcpStream) {
|
func (a *tcpAssembler) tcpStreamClosed(stream *tcpStream) {
|
||||||
|
a.lock.Lock()
|
||||||
a.lastClosedConnections.Add(stream.connectionId, time.Now().UnixMilli())
|
a.lastClosedConnections.Add(stream.connectionId, time.Now().UnixMilli())
|
||||||
delete(a.liveConnections, stream.connectionId)
|
delete(a.liveConnections, stream.connectionId)
|
||||||
|
a.lock.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *tcpAssembler) isRecentlyClosed(c connectionId) bool {
|
func (a *tcpAssembler) isRecentlyClosed(c connectionId) bool {
|
||||||
|
a.lock.Lock()
|
||||||
|
defer a.lock.Unlock()
|
||||||
|
|
||||||
if closedTimeMillis, ok := a.lastClosedConnections.Get(c); ok {
|
if closedTimeMillis, ok := a.lastClosedConnections.Get(c); ok {
|
||||||
timeSinceClosed := time.Since(time.UnixMilli(closedTimeMillis.(int64)))
|
timeSinceClosed := time.Since(time.UnixMilli(closedTimeMillis.(int64)))
|
||||||
if timeSinceClosed < lastAckThreshold {
|
if timeSinceClosed < lastAckThreshold {
|
||||||
@ -204,6 +213,9 @@ func (a *tcpAssembler) isRecentlyClosed(c connectionId) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (a *tcpAssembler) shouldThrottle(c connectionId) bool {
|
func (a *tcpAssembler) shouldThrottle(c connectionId) bool {
|
||||||
|
a.lock.Lock()
|
||||||
|
defer a.lock.Unlock()
|
||||||
|
|
||||||
if _, ok := a.liveConnections[c]; ok {
|
if _, ok := a.liveConnections[c]; ok {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user