mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-08-18 00:18:47 +00:00
Remove tcpStreamWrapper
struct
This commit is contained in:
parent
de533730d3
commit
ba4bab3ed5
@ -34,7 +34,7 @@ func (cl *Cleaner) clean() {
|
|||||||
cl.assemblerMutex.Unlock()
|
cl.assemblerMutex.Unlock()
|
||||||
|
|
||||||
cl.streamsMap.streams.Range(func(k, v interface{}) bool {
|
cl.streamsMap.streams.Range(func(k, v interface{}) bool {
|
||||||
reqResMatcher := v.(*tcpStreamWrapper).reqResMatcher
|
reqResMatcher := v.(*tcpStream).reqResMatcher
|
||||||
if reqResMatcher == nil {
|
if reqResMatcher == nil {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
@ -3,6 +3,7 @@ package tap
|
|||||||
import (
|
import (
|
||||||
"encoding/binary"
|
"encoding/binary"
|
||||||
"sync"
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/google/gopacket"
|
"github.com/google/gopacket"
|
||||||
"github.com/google/gopacket/layers" // pulls in all layers decoders
|
"github.com/google/gopacket/layers" // pulls in all layers decoders
|
||||||
@ -30,6 +31,8 @@ type tcpStream struct {
|
|||||||
servers []tcpReader
|
servers []tcpReader
|
||||||
ident string
|
ident string
|
||||||
origin api.Capture
|
origin api.Capture
|
||||||
|
reqResMatcher api.RequestResponseMatcher
|
||||||
|
createdAt time.Time
|
||||||
sync.Mutex
|
sync.Mutex
|
||||||
streamsMap *tcpStreamMap
|
streamsMap *tcpStreamMap
|
||||||
}
|
}
|
||||||
|
@ -3,7 +3,6 @@ package tap
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/up9inc/mizu/shared/logger"
|
"github.com/up9inc/mizu/shared/logger"
|
||||||
"github.com/up9inc/mizu/tap/api"
|
"github.com/up9inc/mizu/tap/api"
|
||||||
@ -27,12 +26,6 @@ type tcpStreamFactory struct {
|
|||||||
opts *TapOpts
|
opts *TapOpts
|
||||||
}
|
}
|
||||||
|
|
||||||
type tcpStreamWrapper struct {
|
|
||||||
stream *tcpStream
|
|
||||||
reqResMatcher api.RequestResponseMatcher
|
|
||||||
createdAt time.Time
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewTcpStreamFactory(emitter api.Emitter, streamsMap *tcpStreamMap, opts *TapOpts) *tcpStreamFactory {
|
func NewTcpStreamFactory(emitter api.Emitter, streamsMap *tcpStreamMap, opts *TapOpts) *tcpStreamFactory {
|
||||||
var ownIps []string
|
var ownIps []string
|
||||||
|
|
||||||
@ -123,11 +116,7 @@ func (factory *tcpStreamFactory) New(net, transport gopacket.Flow, tcp *layers.T
|
|||||||
reqResMatcher: reqResMatcher,
|
reqResMatcher: reqResMatcher,
|
||||||
})
|
})
|
||||||
|
|
||||||
factory.streamsMap.Store(stream.id, &tcpStreamWrapper{
|
factory.streamsMap.Store(stream.id, stream)
|
||||||
stream: stream,
|
|
||||||
reqResMatcher: reqResMatcher,
|
|
||||||
createdAt: time.Now(),
|
|
||||||
})
|
|
||||||
|
|
||||||
factory.wg.Add(2)
|
factory.wg.Add(2)
|
||||||
// Start reading from channel stream.reader.bytes
|
// Start reading from channel stream.reader.bytes
|
||||||
|
@ -67,10 +67,9 @@ func (streamMap *tcpStreamMap) closeTimedoutTcpStreamChannels() {
|
|||||||
time.Sleep(closeTimedoutTcpChannelsIntervalMs)
|
time.Sleep(closeTimedoutTcpChannelsIntervalMs)
|
||||||
_debug.FreeOSMemory()
|
_debug.FreeOSMemory()
|
||||||
streamMap.streams.Range(func(key interface{}, value interface{}) bool {
|
streamMap.streams.Range(func(key interface{}, value interface{}) bool {
|
||||||
streamWrapper := value.(*tcpStreamWrapper)
|
stream := value.(*tcpStream)
|
||||||
stream := streamWrapper.stream
|
|
||||||
if stream.superIdentifier.Protocol == nil {
|
if stream.superIdentifier.Protocol == nil {
|
||||||
if !stream.isClosed && time.Now().After(streamWrapper.createdAt.Add(tcpStreamChannelTimeout)) {
|
if !stream.isClosed && time.Now().After(stream.createdAt.Add(tcpStreamChannelTimeout)) {
|
||||||
stream.Close()
|
stream.Close()
|
||||||
diagnose.AppStats.IncDroppedTcpStreams()
|
diagnose.AppStats.IncDroppedTcpStreams()
|
||||||
logger.Log.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d",
|
logger.Log.Debugf("Dropped an unidentified TCP stream because of timeout. Total dropped: %d Total Goroutines: %d Timeout (ms): %d",
|
||||||
|
Loading…
Reference in New Issue
Block a user