mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-08-12 21:58:33 +00:00
73 lines
1.9 KiB
Go
73 lines
1.9 KiB
Go
package tap
|
|
|
|
import (
|
|
"runtime"
|
|
_debug "runtime/debug"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/up9inc/mizu/shared/logger"
|
|
"github.com/up9inc/mizu/tap/diagnose"
|
|
)
|
|
|
|
type tcpStreamMap struct {
|
|
streams *sync.Map
|
|
streamId int64
|
|
}
|
|
|
|
func NewTcpStreamMap() *tcpStreamMap {
|
|
return &tcpStreamMap{
|
|
streams: &sync.Map{},
|
|
}
|
|
}
|
|
|
|
func (streamMap *tcpStreamMap) Store(key, value interface{}) {
|
|
streamMap.streams.Store(key, value)
|
|
}
|
|
|
|
func (streamMap *tcpStreamMap) Delete(key interface{}) {
|
|
streamMap.streams.Delete(key)
|
|
}
|
|
|
|
func (streamMap *tcpStreamMap) nextId() int64 {
|
|
streamMap.streamId++
|
|
return streamMap.streamId
|
|
}
|
|
|
|
func (streamMap *tcpStreamMap) closeTimedoutTcpStreamChannels() {
|
|
tcpStreamChannelTimeout := GetTcpChannelTimeoutMs()
|
|
for {
|
|
time.Sleep(10 * time.Millisecond)
|
|
_debug.FreeOSMemory()
|
|
streamMap.streams.Range(func(key interface{}, value interface{}) bool {
|
|
streamWrapper := value.(*tcpStreamWrapper)
|
|
stream := streamWrapper.stream
|
|
if stream.superIdentifier.Protocol == nil {
|
|
if !stream.isClosed && time.Now().After(streamWrapper.createdAt.Add(tcpStreamChannelTimeout)) {
|
|
stream.Close()
|
|
diagnose.AppStats.IncDroppedTcpStreams()
|
|
logger.Log.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d",
|
|
diagnose.AppStats.DroppedTcpStreams, runtime.NumGoroutine(), tcpStreamChannelTimeout/1000000)
|
|
}
|
|
} else {
|
|
if !stream.superIdentifier.IsClosedOthers {
|
|
for i := range stream.clients {
|
|
reader := &stream.clients[i]
|
|
if reader.extension.Protocol != stream.superIdentifier.Protocol {
|
|
reader.Close()
|
|
}
|
|
}
|
|
for i := range stream.servers {
|
|
reader := &stream.servers[i]
|
|
if reader.extension.Protocol != stream.superIdentifier.Protocol {
|
|
reader.Close()
|
|
}
|
|
}
|
|
stream.superIdentifier.IsClosedOthers = true
|
|
}
|
|
}
|
|
return true
|
|
})
|
|
}
|
|
}
|