kubeshark/tap/extensions/kafka/tcp_reader_mock_test.go
M. Mert Yıldıran bfa834e840
Spawn only two Goroutines per TCP stream (#1062)
* Spawn only two Goroutines per TCP stream

* Fix the linter error

* Use `isProtocolIdentified` method instead

* Fix the `Read` method of `tcpReader`

* Remove unnecessary `append`

* Copy to buffer only when a message is received

* Remove `exhaustBuffer` field and add `rewind` function

* Rename `buffer` field to `pastData`

* Update tap/tcp_reader.go

Co-authored-by: Nimrod Gilboa Markevich <59927337+nimrod-up9@users.noreply.github.com>

* Use `copy` instead of assignment

* No lint

* #run_acceptance_tests

* Fix `rewind` #run_acceptance_tests

* Fix the buffering algorithm #run_acceptance_tests

* Add `TODO`

* Fix the problems in AMQP and Kafka #run_acceptance_tests

* Use `*bytes.Buffer` instead of `[]api.TcpReaderDataMsg` #run_acceptance_tests

* Have a single `*bytes.Buffer`

* Revert "Have a single `*bytes.Buffer`"

This reverts commit fad96a288a.

* Revert "Use `*bytes.Buffer` instead of `[]api.TcpReaderDataMsg` #run_acceptance_tests"

This reverts commit 0fc70bffe2.

* Fix the early timing out issue #run_acceptance_tests

* Remove `NewBytes()` method

* Update the `NewTcpReader` method signature #run_acceptance_tests

* #run_acceptance_tests

* #run_acceptance_tests

* #run_acceptance_tests

Co-authored-by: Nimrod Gilboa Markevich <59927337+nimrod-up9@users.noreply.github.com>
2022-05-16 16:06:36 +03:00

81 lines
1.8 KiB
Go

package kafka
import (
"sync"
"time"
"github.com/up9inc/mizu/tap/api"
)
// tcpReader is the concrete type that NewTcpReader hands back as an
// api.TcpReader. It stores the values given to the constructor and exposes
// them through accessor methods; its Read method never yields any bytes.
type tcpReader struct {
	// Values copied verbatim from the NewTcpReader arguments.
	progress      *api.ReadProgress
	ident         string
	tcpID         *api.TcpID
	captureTime   time.Time
	parent        api.TcpStream
	isClient      bool
	isOutgoing    bool
	extension     *api.Extension
	emitter       api.Emitter
	counterPair   *api.CounterPair
	reqResMatcher api.RequestResponseMatcher

	// isClosed is not populated by NewTcpReader, so it starts out false.
	isClosed bool

	// Embedded lock; none of the methods defined in this file acquire it.
	sync.Mutex
}
// NewTcpReader allocates a tcpReader, stores every argument in the field of
// the same meaning, and returns it as an api.TcpReader. Fields that have no
// matching argument (isClosed and the embedded mutex) keep their zero values.
func NewTcpReader(progress *api.ReadProgress, ident string, tcpId *api.TcpID, captureTime time.Time, parent api.TcpStream, isClient bool, isOutgoing bool, extension *api.Extension, emitter api.Emitter, counterPair *api.CounterPair, reqResMatcher api.RequestResponseMatcher) api.TcpReader {
	r := new(tcpReader)

	r.ident = ident
	r.tcpID = tcpId
	r.isClient = isClient
	r.isOutgoing = isOutgoing

	r.progress = progress
	r.captureTime = captureTime

	r.parent = parent
	r.extension = extension
	r.emitter = emitter
	r.counterPair = counterPair
	r.reqResMatcher = reqResMatcher

	return r
}
// Read is a no-op: it leaves p untouched and always reports zero bytes read
// together with a nil error, regardless of len(p).
func (r *tcpReader) Read(p []byte) (int, error) {
	return 0, nil
}
// GetReqResMatcher returns the request/response matcher stored on the reader.
func (r *tcpReader) GetReqResMatcher() api.RequestResponseMatcher {
	return r.reqResMatcher
}
// GetIsClient returns the isClient flag stored on the reader.
func (r *tcpReader) GetIsClient() bool {
	return r.isClient
}
// GetReadProgress returns the *api.ReadProgress stored on the reader.
func (r *tcpReader) GetReadProgress() *api.ReadProgress {
	return r.progress
}
// GetParent returns the api.TcpStream stored as the reader's parent.
func (r *tcpReader) GetParent() api.TcpStream {
	return r.parent
}
// GetTcpID returns the *api.TcpID stored on the reader.
func (r *tcpReader) GetTcpID() *api.TcpID {
	return r.tcpID
}
// GetCounterPair returns the *api.CounterPair stored on the reader.
func (r *tcpReader) GetCounterPair() *api.CounterPair {
	return r.counterPair
}
// GetCaptureTime returns the capture time stored on the reader.
func (r *tcpReader) GetCaptureTime() time.Time {
	return r.captureTime
}
// GetEmitter returns the api.Emitter stored on the reader.
func (r *tcpReader) GetEmitter() api.Emitter {
	return r.emitter
}
// GetIsClosed returns the reader's isClosed flag. The field is read directly,
// without taking the embedded mutex.
func (r *tcpReader) GetIsClosed() bool {
	return r.isClosed
}