kubeshark/tap/tcp_streams_map.go

73 lines
1.9 KiB
Go

package tap
import (
"runtime"
_debug "runtime/debug"
"sync"
"time"
"github.com/up9inc/mizu/shared/logger"
"github.com/up9inc/mizu/tap/diagnose"
)
type tcpStreamMap struct {
streams *sync.Map
streamId int64
}
func NewTcpStreamMap() *tcpStreamMap {
return &tcpStreamMap{
streams: &sync.Map{},
}
}
func (streamMap *tcpStreamMap) Store(key, value interface{}) {
streamMap.streams.Store(key, value)
}
func (streamMap *tcpStreamMap) Delete(key interface{}) {
streamMap.streams.Delete(key)
}
func (streamMap *tcpStreamMap) nextId() int64 {
streamMap.streamId++
return streamMap.streamId
}
func (streamMap *tcpStreamMap) closeTimedoutTcpStreamChannels() {
tcpStreamChannelTimeout := GetTcpChannelTimeoutMs()
for {
time.Sleep(10 * time.Millisecond)
_debug.FreeOSMemory()
streamMap.streams.Range(func(key interface{}, value interface{}) bool {
streamWrapper := value.(*tcpStreamWrapper)
stream := streamWrapper.stream
if stream.superIdentifier.Protocol == nil {
if !stream.isClosed && time.Now().After(streamWrapper.createdAt.Add(tcpStreamChannelTimeout)) {
stream.Close()
diagnose.AppStats.IncDroppedTcpStreams()
logger.Log.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d",
diagnose.AppStats.DroppedTcpStreams, runtime.NumGoroutine(), tcpStreamChannelTimeout/1000000)
}
} else {
if !stream.superIdentifier.IsClosedOthers {
for i := range stream.clients {
reader := &stream.clients[i]
if reader.extension.Protocol != stream.superIdentifier.Protocol {
reader.Close()
}
}
for i := range stream.servers {
reader := &stream.servers[i]
if reader.extension.Protocol != stream.superIdentifier.Protocol {
reader.Close()
}
}
stream.superIdentifier.IsClosedOthers = true
}
}
return true
})
}
}