mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-09-01 02:30:33 +00:00
TRA-4065 support inflight tap target update (#556)
* WIP * WIP * Update main.go * Update main.go and passive_tapper.go * Update passive_tapper.go * Update passive_tapper.go * Update passive_tapper.go * Update passive_tapper.go
This commit is contained in:
@@ -55,7 +55,7 @@ var extensionsMap map[string]*tapApi.Extension // global
|
|||||||
var startTime int64
|
var startTime int64
|
||||||
|
|
||||||
const (
|
const (
|
||||||
socketConnectionRetries = 10
|
socketConnectionRetries = 30
|
||||||
socketConnectionRetryDelay = time.Second * 2
|
socketConnectionRetryDelay = time.Second * 2
|
||||||
socketHandshakeTimeout = time.Second * 2
|
socketHandshakeTimeout = time.Second * 2
|
||||||
)
|
)
|
||||||
@@ -425,12 +425,32 @@ func dialSocketWithRetry(socketAddress string, retryAmount int, retryDelay time.
|
|||||||
time.Sleep(retryDelay)
|
time.Sleep(retryDelay)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
go handleIncomingMessageAsTapper(socketConnection)
|
||||||
return socketConnection, nil
|
return socketConnection, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil, lastErr
|
return nil, lastErr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func handleIncomingMessageAsTapper(socketConnection *websocket.Conn) {
|
||||||
|
for {
|
||||||
|
if _, message, err := socketConnection.ReadMessage(); err != nil {
|
||||||
|
logger.Log.Errorf("error reading message from socket connection, err: %s, (%v,%+v)", err, err, err)
|
||||||
|
if errors.Is(err, syscall.EPIPE) {
|
||||||
|
// socket has disconnected, we can safely stop this goroutine
|
||||||
|
return
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
var tapConfigMessage *shared.WebSocketTapConfigMessage
|
||||||
|
if err := json.Unmarshal(message, &tapConfigMessage); err != nil {
|
||||||
|
logger.Log.Errorf("received unknown message from socket connection: %s, err: %s, (%v,%+v)", string(message), err, err, err)
|
||||||
|
} else {
|
||||||
|
tap.UpdateTapTargets(tapConfigMessage.TapTargets)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func startMizuTapperSyncer(ctx context.Context, provider *kubernetes.Provider) (*kubernetes.MizuTapperSyncer, error) {
|
func startMizuTapperSyncer(ctx context.Context, provider *kubernetes.Provider) (*kubernetes.MizuTapperSyncer, error) {
|
||||||
tapperSyncer, err := kubernetes.CreateAndStartMizuTapperSyncer(ctx, provider, kubernetes.TapperSyncerConfig{
|
tapperSyncer, err := kubernetes.CreateAndStartMizuTapperSyncer(ctx, provider, kubernetes.TapperSyncerConfig{
|
||||||
TargetNamespaces: config.Config.TargetNamespaces,
|
TargetNamespaces: config.Config.TargetNamespaces,
|
||||||
|
@@ -1,11 +1,13 @@
|
|||||||
package shared
|
package shared
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"io/ioutil"
|
||||||
|
"strings"
|
||||||
|
|
||||||
"github.com/op/go-logging"
|
"github.com/op/go-logging"
|
||||||
"github.com/up9inc/mizu/shared/logger"
|
"github.com/up9inc/mizu/shared/logger"
|
||||||
"github.com/up9inc/mizu/tap/api"
|
"github.com/up9inc/mizu/tap/api"
|
||||||
"io/ioutil"
|
v1 "k8s.io/api/core/v1"
|
||||||
"strings"
|
|
||||||
|
|
||||||
"gopkg.in/yaml.v3"
|
"gopkg.in/yaml.v3"
|
||||||
)
|
)
|
||||||
@@ -21,6 +23,7 @@ const (
|
|||||||
WebSocketMessageTypeToast WebSocketMessageType = "toast"
|
WebSocketMessageTypeToast WebSocketMessageType = "toast"
|
||||||
WebSocketMessageTypeQueryMetadata WebSocketMessageType = "queryMetadata"
|
WebSocketMessageTypeQueryMetadata WebSocketMessageType = "queryMetadata"
|
||||||
WebSocketMessageTypeStartTime WebSocketMessageType = "startTime"
|
WebSocketMessageTypeStartTime WebSocketMessageType = "startTime"
|
||||||
|
WebSocketMessageTypeTapConfig WebSocketMessageType = "tapConfig"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Resources struct {
|
type Resources struct {
|
||||||
@@ -67,6 +70,11 @@ type WebSocketStatusMessage struct {
|
|||||||
TappingStatus []TappedPodStatus `json:"tappingStatus"`
|
TappingStatus []TappedPodStatus `json:"tappingStatus"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type WebSocketTapConfigMessage struct {
|
||||||
|
*WebSocketMessageMetadata
|
||||||
|
TapTargets []v1.Pod `json:"pods"`
|
||||||
|
}
|
||||||
|
|
||||||
type TapperStatus struct {
|
type TapperStatus struct {
|
||||||
TapperName string `json:"tapperName"`
|
TapperName string `json:"tapperName"`
|
||||||
NodeName string `json:"nodeName"`
|
NodeName string `json:"nodeName"`
|
||||||
|
@@ -11,6 +11,7 @@ package tap
|
|||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"flag"
|
"flag"
|
||||||
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"runtime"
|
"runtime"
|
||||||
"strings"
|
"strings"
|
||||||
@@ -60,8 +61,11 @@ type TapOpts struct {
|
|||||||
FilterAuthorities []v1.Pod
|
FilterAuthorities []v1.Pod
|
||||||
}
|
}
|
||||||
|
|
||||||
var extensions []*api.Extension // global
|
var extensions []*api.Extension // global
|
||||||
var filteringOptions *api.TrafficFilteringOptions // global
|
var filteringOptions *api.TrafficFilteringOptions // global
|
||||||
|
var tapTargets []v1.Pod // global
|
||||||
|
var packetSourceManager *source.PacketSourceManager // global
|
||||||
|
var mainPacketInputChan chan source.TcpPacketInfo // global
|
||||||
|
|
||||||
func inArrayInt(arr []int, valueToCheck int) bool {
|
func inArrayInt(arr []int, valueToCheck int) bool {
|
||||||
for _, value := range arr {
|
for _, value := range arr {
|
||||||
@@ -86,7 +90,9 @@ func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem,
|
|||||||
filteringOptions = options
|
filteringOptions = options
|
||||||
|
|
||||||
if opts.FilterAuthorities == nil {
|
if opts.FilterAuthorities == nil {
|
||||||
opts.FilterAuthorities = []v1.Pod{}
|
tapTargets = []v1.Pod{}
|
||||||
|
} else {
|
||||||
|
tapTargets = opts.FilterAuthorities
|
||||||
}
|
}
|
||||||
|
|
||||||
if GetMemoryProfilingEnabled() {
|
if GetMemoryProfilingEnabled() {
|
||||||
@@ -96,6 +102,23 @@ func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem,
|
|||||||
go startPassiveTapper(opts, outputItems)
|
go startPassiveTapper(opts, outputItems)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func UpdateTapTargets(newTapTargets []v1.Pod) {
|
||||||
|
tapTargets = newTapTargets
|
||||||
|
if err := initializePacketSources(); err != nil {
|
||||||
|
logger.Log.Fatal(err)
|
||||||
|
}
|
||||||
|
printNewTapTargets()
|
||||||
|
}
|
||||||
|
|
||||||
|
func printNewTapTargets() {
|
||||||
|
printStr := ""
|
||||||
|
for _, tapTarget := range tapTargets {
|
||||||
|
printStr += fmt.Sprintf("%s (%s), ", tapTarget.Status.PodIP, tapTarget.Name)
|
||||||
|
}
|
||||||
|
printStr = strings.TrimRight(printStr, ", ")
|
||||||
|
logger.Log.Infof("Now tapping: %s", printStr)
|
||||||
|
}
|
||||||
|
|
||||||
func printPeriodicStats(cleaner *Cleaner) {
|
func printPeriodicStats(cleaner *Cleaner) {
|
||||||
statsPeriod := time.Second * time.Duration(*statsevery)
|
statsPeriod := time.Second * time.Duration(*statsevery)
|
||||||
ticker := time.NewTicker(statsPeriod)
|
ticker := time.NewTicker(statsPeriod)
|
||||||
@@ -136,7 +159,11 @@ func printPeriodicStats(cleaner *Cleaner) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func initializePacketSources(opts *TapOpts) (*source.PacketSourceManager, error) {
|
func initializePacketSources() error {
|
||||||
|
if packetSourceManager != nil {
|
||||||
|
packetSourceManager.Close()
|
||||||
|
}
|
||||||
|
|
||||||
var bpffilter string
|
var bpffilter string
|
||||||
if len(flag.Args()) > 0 {
|
if len(flag.Args()) > 0 {
|
||||||
bpffilter = strings.Join(flag.Args(), " ")
|
bpffilter = strings.Join(flag.Args(), " ")
|
||||||
@@ -151,7 +178,13 @@ func initializePacketSources(opts *TapOpts) (*source.PacketSourceManager, error)
|
|||||||
BpfFilter: bpffilter,
|
BpfFilter: bpffilter,
|
||||||
}
|
}
|
||||||
|
|
||||||
return source.NewPacketSourceManager(*procfs, *pids, *fname, *iface, *istio, opts.FilterAuthorities, behaviour)
|
var err error
|
||||||
|
if packetSourceManager, err = source.NewPacketSourceManager(*procfs, *pids, *fname, *iface, *istio, tapTargets, behaviour); err != nil {
|
||||||
|
return err
|
||||||
|
} else {
|
||||||
|
packetSourceManager.ReadPackets(!*nodefrag, mainPacketInputChan)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func startPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem) {
|
func startPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem) {
|
||||||
@@ -161,25 +194,15 @@ func startPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem)
|
|||||||
diagnose.InitializeErrorsMap(*debug, *verbose, *quiet)
|
diagnose.InitializeErrorsMap(*debug, *verbose, *quiet)
|
||||||
diagnose.InitializeTapperInternalStats()
|
diagnose.InitializeTapperInternalStats()
|
||||||
|
|
||||||
sources, err := initializePacketSources(opts)
|
if err := initializePacketSources(); err != nil {
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
logger.Log.Fatal(err)
|
logger.Log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
defer sources.Close()
|
mainPacketInputChan = make(chan source.TcpPacketInfo)
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
logger.Log.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
packets := make(chan source.TcpPacketInfo)
|
|
||||||
assembler := NewTcpAssembler(outputItems, streamsMap, opts)
|
assembler := NewTcpAssembler(outputItems, streamsMap, opts)
|
||||||
|
|
||||||
diagnose.AppStats.SetStartTime(time.Now())
|
diagnose.AppStats.SetStartTime(time.Now())
|
||||||
|
|
||||||
sources.ReadPackets(!*nodefrag, packets)
|
|
||||||
|
|
||||||
staleConnectionTimeout := time.Second * time.Duration(*staleTimeoutSeconds)
|
staleConnectionTimeout := time.Second * time.Duration(*staleTimeoutSeconds)
|
||||||
cleaner := Cleaner{
|
cleaner := Cleaner{
|
||||||
assembler: assembler.Assembler,
|
assembler: assembler.Assembler,
|
||||||
@@ -191,7 +214,7 @@ func startPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem)
|
|||||||
|
|
||||||
go printPeriodicStats(&cleaner)
|
go printPeriodicStats(&cleaner)
|
||||||
|
|
||||||
assembler.processPackets(*hexdumppkt, packets)
|
assembler.processPackets(*hexdumppkt, mainPacketInputChan)
|
||||||
|
|
||||||
if diagnose.TapErrors.OutputLevel >= 2 {
|
if diagnose.TapErrors.OutputLevel >= 2 {
|
||||||
assembler.dumpStreamPool()
|
assembler.dumpStreamPool()
|
||||||
|
@@ -151,13 +151,13 @@ func inArrayPod(pods []v1.Pod, address string) bool {
|
|||||||
|
|
||||||
func (factory *tcpStreamFactory) getStreamProps(srcIP string, srcPort string, dstIP string, dstPort string) *streamProps {
|
func (factory *tcpStreamFactory) getStreamProps(srcIP string, srcPort string, dstIP string, dstPort string) *streamProps {
|
||||||
if factory.opts.HostMode {
|
if factory.opts.HostMode {
|
||||||
if inArrayPod(factory.opts.FilterAuthorities, fmt.Sprintf("%s:%s", dstIP, dstPort)) {
|
if inArrayPod(tapTargets, fmt.Sprintf("%s:%s", dstIP, dstPort)) {
|
||||||
return &streamProps{isTapTarget: true, isOutgoing: false}
|
return &streamProps{isTapTarget: true, isOutgoing: false}
|
||||||
} else if inArrayPod(factory.opts.FilterAuthorities, dstIP) {
|
} else if inArrayPod(tapTargets, dstIP) {
|
||||||
return &streamProps{isTapTarget: true, isOutgoing: false}
|
return &streamProps{isTapTarget: true, isOutgoing: false}
|
||||||
} else if inArrayPod(factory.opts.FilterAuthorities, fmt.Sprintf("%s:%s", srcIP, srcPort)) {
|
} else if inArrayPod(tapTargets, fmt.Sprintf("%s:%s", srcIP, srcPort)) {
|
||||||
return &streamProps{isTapTarget: true, isOutgoing: true}
|
return &streamProps{isTapTarget: true, isOutgoing: true}
|
||||||
} else if inArrayPod(factory.opts.FilterAuthorities, srcIP) {
|
} else if inArrayPod(tapTargets, srcIP) {
|
||||||
return &streamProps{isTapTarget: true, isOutgoing: true}
|
return &streamProps{isTapTarget: true, isOutgoing: true}
|
||||||
}
|
}
|
||||||
return &streamProps{isTapTarget: false, isOutgoing: false}
|
return &streamProps{isTapTarget: false, isOutgoing: false}
|
||||||
|
Reference in New Issue
Block a user