From cfe9e863b7318f66460ec477b0b40f3f580e3c43 Mon Sep 17 00:00:00 2001 From: RamiBerm <54766858+RamiBerm@users.noreply.github.com> Date: Mon, 27 Dec 2021 11:50:34 +0200 Subject: [PATCH] TRA-4065 support inflight tap target update (#556) * WIP * WIP * Update main.go * Update main.go and passive_tapper.go * Update passive_tapper.go * Update passive_tapper.go * Update passive_tapper.go * Update passive_tapper.go --- agent/main.go | 22 ++++++++++++++- shared/models.go | 12 ++++++-- tap/passive_tapper.go | 59 +++++++++++++++++++++++++++------------ tap/tcp_stream_factory.go | 8 +++--- 4 files changed, 76 insertions(+), 25 deletions(-) diff --git a/agent/main.go b/agent/main.go index f90dc0f96..6f1df5b3c 100644 --- a/agent/main.go +++ b/agent/main.go @@ -55,7 +55,7 @@ var extensionsMap map[string]*tapApi.Extension // global var startTime int64 const ( - socketConnectionRetries = 10 + socketConnectionRetries = 30 socketConnectionRetryDelay = time.Second * 2 socketHandshakeTimeout = time.Second * 2 ) @@ -425,12 +425,32 @@ func dialSocketWithRetry(socketAddress string, retryAmount int, retryDelay time. time.Sleep(retryDelay) } } else { + go handleIncomingMessageAsTapper(socketConnection) return socketConnection, nil } } return nil, lastErr } +func handleIncomingMessageAsTapper(socketConnection *websocket.Conn) { + for { + if _, message, err := socketConnection.ReadMessage(); err != nil { + logger.Log.Errorf("error reading message from socket connection, err: %s, (%v,%+v)", err, err, err) + if errors.Is(err, syscall.EPIPE) { + // socket has disconnected, we can safely stop this goroutine + return + } + } else { + var tapConfigMessage *shared.WebSocketTapConfigMessage + if err := json.Unmarshal(message, &tapConfigMessage); err != nil { + logger.Log.Errorf("received unknown message from socket connection: %s, err: %s, (%v,%+v)", string(message), err, err, err) + } else { + tap.UpdateTapTargets(tapConfigMessage.TapTargets) + } + } + } +} + func startMizuTapperSyncer(ctx context.Context, provider *kubernetes.Provider) (*kubernetes.MizuTapperSyncer, error) { tapperSyncer, err := kubernetes.CreateAndStartMizuTapperSyncer(ctx, provider, kubernetes.TapperSyncerConfig{ TargetNamespaces: config.Config.TargetNamespaces, diff --git a/shared/models.go b/shared/models.go index ef0cb2949..f18b833e8 100644 --- a/shared/models.go +++ b/shared/models.go @@ -1,11 +1,13 @@ package shared import ( + "io/ioutil" + "strings" + "github.com/op/go-logging" "github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/tap/api" - "io/ioutil" - "strings" + v1 "k8s.io/api/core/v1" "gopkg.in/yaml.v3" ) @@ -21,6 +23,7 @@ const ( WebSocketMessageTypeToast WebSocketMessageType = "toast" WebSocketMessageTypeQueryMetadata WebSocketMessageType = "queryMetadata" WebSocketMessageTypeStartTime WebSocketMessageType = "startTime" + WebSocketMessageTypeTapConfig WebSocketMessageType = "tapConfig" ) type Resources struct { @@ -67,6 +70,11 @@ type WebSocketStatusMessage struct { TappingStatus []TappedPodStatus `json:"tappingStatus"` } +type WebSocketTapConfigMessage struct { + *WebSocketMessageMetadata + TapTargets []v1.Pod `json:"pods"` +} + type TapperStatus struct { TapperName string `json:"tapperName"` NodeName string `json:"nodeName"` diff --git a/tap/passive_tapper.go b/tap/passive_tapper.go index 275f0288d..95f4e1c4d 100644 --- a/tap/passive_tapper.go +++ b/tap/passive_tapper.go @@ -11,6 +11,7 @@ package tap import ( "encoding/json" "flag" + "fmt" "os" "runtime" "strings" @@ -60,8 +61,11 @@ type TapOpts struct { FilterAuthorities []v1.Pod } -var extensions []*api.Extension // global -var filteringOptions *api.TrafficFilteringOptions // global +var extensions []*api.Extension // global +var filteringOptions *api.TrafficFilteringOptions // global +var tapTargets []v1.Pod // global +var packetSourceManager *source.PacketSourceManager // global +var mainPacketInputChan chan source.TcpPacketInfo // global func inArrayInt(arr []int, valueToCheck int) bool { for _, value := range arr { @@ -86,7 +90,9 @@ func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem, filteringOptions = options if opts.FilterAuthorities == nil { - opts.FilterAuthorities = []v1.Pod{} + tapTargets = []v1.Pod{} + } else { + tapTargets = opts.FilterAuthorities } if GetMemoryProfilingEnabled() { @@ -96,6 +102,23 @@ func StartPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem, go startPassiveTapper(opts, outputItems) } +func UpdateTapTargets(newTapTargets []v1.Pod) { + tapTargets = newTapTargets + if err := initializePacketSources(); err != nil { + logger.Log.Fatal(err) + } + printNewTapTargets() +} + +func printNewTapTargets() { + printStr := "" + for _, tapTarget := range tapTargets { + printStr += fmt.Sprintf("%s (%s), ", tapTarget.Status.PodIP, tapTarget.Name) + } + printStr = strings.TrimRight(printStr, ", ") + logger.Log.Infof("Now tapping: %s", printStr) +} + func printPeriodicStats(cleaner *Cleaner) { statsPeriod := time.Second * time.Duration(*statsevery) ticker := time.NewTicker(statsPeriod) @@ -136,7 +159,11 @@ func printPeriodicStats(cleaner *Cleaner) { } } -func initializePacketSources(opts *TapOpts) (*source.PacketSourceManager, error) { +func initializePacketSources() error { + if packetSourceManager != nil { + packetSourceManager.Close() + } + var bpffilter string if len(flag.Args()) > 0 { bpffilter = strings.Join(flag.Args(), " ") @@ -151,7 +178,13 @@ func initializePacketSources(opts *TapOpts) (*source.PacketSourceManager, error) BpfFilter: bpffilter, } - return source.NewPacketSourceManager(*procfs, *pids, *fname, *iface, *istio, opts.FilterAuthorities, behaviour) + var err error + if packetSourceManager, err = source.NewPacketSourceManager(*procfs, *pids, *fname, *iface, *istio, tapTargets, behaviour); err != nil { + return err + } else { + packetSourceManager.ReadPackets(!*nodefrag, mainPacketInputChan) + return nil + } } func startPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem) { @@ -161,25 +194,15 @@ func startPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem) diagnose.InitializeErrorsMap(*debug, *verbose, *quiet) diagnose.InitializeTapperInternalStats() - sources, err := initializePacketSources(opts) - - if err != nil { + if err := initializePacketSources(); err != nil { logger.Log.Fatal(err) } - defer sources.Close() - - if err != nil { - logger.Log.Fatal(err) - } - - packets := make(chan source.TcpPacketInfo) + mainPacketInputChan = make(chan source.TcpPacketInfo) assembler := NewTcpAssembler(outputItems, streamsMap, opts) diagnose.AppStats.SetStartTime(time.Now()) - sources.ReadPackets(!*nodefrag, packets) - staleConnectionTimeout := time.Second * time.Duration(*staleTimeoutSeconds) cleaner := Cleaner{ assembler: assembler.Assembler, @@ -191,7 +214,7 @@ func startPassiveTapper(opts *TapOpts, outputItems chan *api.OutputChannelItem) go printPeriodicStats(&cleaner) - assembler.processPackets(*hexdumppkt, packets) + assembler.processPackets(*hexdumppkt, mainPacketInputChan) if diagnose.TapErrors.OutputLevel >= 2 { assembler.dumpStreamPool() diff --git a/tap/tcp_stream_factory.go b/tap/tcp_stream_factory.go index c063f7bfe..90f09b5d8 100644 --- a/tap/tcp_stream_factory.go +++ b/tap/tcp_stream_factory.go @@ -151,13 +151,13 @@ func inArrayPod(pods []v1.Pod, address string) bool { func (factory *tcpStreamFactory) getStreamProps(srcIP string, srcPort string, dstIP string, dstPort string) *streamProps { if factory.opts.HostMode { - if inArrayPod(factory.opts.FilterAuthorities, fmt.Sprintf("%s:%s", dstIP, dstPort)) { + if inArrayPod(tapTargets, fmt.Sprintf("%s:%s", dstIP, dstPort)) { return &streamProps{isTapTarget: true, isOutgoing: false} - } else if inArrayPod(factory.opts.FilterAuthorities, dstIP) { + } else if inArrayPod(tapTargets, dstIP) { return &streamProps{isTapTarget: true, isOutgoing: false} - } else if inArrayPod(factory.opts.FilterAuthorities, fmt.Sprintf("%s:%s", srcIP, srcPort)) { + } else if inArrayPod(tapTargets, fmt.Sprintf("%s:%s", srcIP, srcPort)) { return &streamProps{isTapTarget: true, isOutgoing: true} - } else if inArrayPod(factory.opts.FilterAuthorities, srcIP) { + } else if inArrayPod(tapTargets, srcIP) { return &streamProps{isTapTarget: true, isOutgoing: true} } return &streamProps{isTapTarget: false, isOutgoing: false}