Bring back filterItems and isHealthCheckByUserAgent functions

This commit is contained in:
M. Mert Yildiran
2021-08-26 03:53:13 +03:00
parent 92fea9bbf4
commit 81bcf85e91
2 changed files with 58 additions and 45 deletions

View File

@@ -18,6 +18,7 @@ import (
"path/filepath" "path/filepath"
"plugin" "plugin"
"sort" "sort"
"strings"
"github.com/gin-contrib/static" "github.com/gin-contrib/static"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
@@ -52,10 +53,11 @@ func main() {
if *standaloneMode { if *standaloneMode {
api.StartResolving(*namespace) api.StartResolving(*namespace)
outputItemsChannel := make(chan *tapApi.OutputChannelItem)
filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem) filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
tap.StartPassiveTapper(tapOpts, filteredOutputItemsChannel, extensions) tap.StartPassiveTapper(tapOpts, outputItemsChannel, extensions)
// go filterHarItems(harOutputChannel, filteredOutputItemsChannel, getTrafficFilteringOptions()) go filterItems(outputItemsChannel, filteredOutputItemsChannel, getTrafficFilteringOptions())
go api.StartReadingEntries(filteredOutputItemsChannel, nil, extensionsMap) go api.StartReadingEntries(filteredOutputItemsChannel, nil, extensionsMap)
// go api.StartReadingOutbound(outboundLinkOutputChannel) // go api.StartReadingOutbound(outboundLinkOutputChannel)
@@ -84,20 +86,19 @@ func main() {
} else if *apiServerMode { } else if *apiServerMode {
api.StartResolving(*namespace) api.StartResolving(*namespace)
socketHarOutChannel := make(chan *tapApi.OutputChannelItem, 1000) outputItemsChannel := make(chan *tapApi.OutputChannelItem)
// TODO: filtered does not work filteredOutputItemsChannel := make(chan *tapApi.OutputChannelItem)
// filteredHarChannel := make(chan *tapApi.OutputChannelItem)
// go filterHarItems(socketHarOutChannel, filteredHarChannel, getTrafficFilteringOptions()) go filterItems(outputItemsChannel, filteredOutputItemsChannel, getTrafficFilteringOptions())
go api.StartReadingEntries(socketHarOutChannel, nil, extensionsMap) go api.StartReadingEntries(filteredOutputItemsChannel, nil, extensionsMap)
hostApi(socketHarOutChannel) hostApi(outputItemsChannel)
} else if *harsReaderMode { } else if *harsReaderMode {
socketHarOutChannel := make(chan *tapApi.OutputChannelItem, 1000) outputItemsChannel := make(chan *tapApi.OutputChannelItem, 1000)
// filteredHarChannel := make(chan *tap.OutputChannelItem) filteredHarChannel := make(chan *tapApi.OutputChannelItem)
// go filterHarItems(socketHarOutChannel, filteredHarChannel, getTrafficFilteringOptions()) go filterItems(outputItemsChannel, filteredHarChannel, getTrafficFilteringOptions())
go api.StartReadingEntries(socketHarOutChannel, harsDir, extensionsMap) go api.StartReadingEntries(filteredHarChannel, harsDir, extensionsMap)
hostApi(nil) hostApi(nil)
} }
@@ -211,7 +212,9 @@ func getTapTargets() []string {
func getTrafficFilteringOptions() *shared.TrafficFilteringOptions { func getTrafficFilteringOptions() *shared.TrafficFilteringOptions {
filteringOptionsJson := os.Getenv(shared.MizuFilteringOptionsEnvVar) filteringOptionsJson := os.Getenv(shared.MizuFilteringOptionsEnvVar)
if filteringOptionsJson == "" { if filteringOptionsJson == "" {
return nil return &shared.TrafficFilteringOptions{
HealthChecksUserAgentHeaders: []string{"kube-probe", "prometheus"},
}
} }
var filteringOptions shared.TrafficFilteringOptions var filteringOptions shared.TrafficFilteringOptions
err := json.Unmarshal([]byte(filteringOptionsJson), &filteringOptions) err := json.Unmarshal([]byte(filteringOptionsJson), &filteringOptions)
@@ -222,39 +225,45 @@ func getTrafficFilteringOptions() *shared.TrafficFilteringOptions {
return &filteringOptions return &filteringOptions
} }
// var userAgentsToFilter = []string{"kube-probe", "prometheus"} func filterItems(inChannel <-chan *tapApi.OutputChannelItem, outChannel chan *tapApi.OutputChannelItem, filterOptions *shared.TrafficFilteringOptions) {
for message := range inChannel {
if message.ConnectionInfo.IsOutgoing && api.CheckIsServiceIP(message.ConnectionInfo.ServerIP) {
continue
}
// TODO: move this to tappers https://up9.atlassian.net/browse/TRA-3441
if isHealthCheckByUserAgent(message, filterOptions.HealthChecksUserAgentHeaders) {
continue
}
//func filterHarItems(inChannel <-chan *tap.OutputChannelItem, outChannel chan *tap.OutputChannelItem, filterOptions *shared.TrafficFilteringOptions) { // if !filterOptions.DisableRedaction {
// for message := range inChannel { // sensitiveDataFiltering.FilterSensitiveInfoFromHarRequest(message, filterOptions)
// if message.ConnectionInfo.IsOutgoing && api.CheckIsServiceIP(message.ConnectionInfo.ServerIP) { // }
// continue
// }
// // TODO: move this to tappers https://up9.atlassian.net/browse/TRA-3441
// if filterOptions.HideHealthChecks && isHealthCheckByUserAgent(message) {
// continue
// }
//
// if !filterOptions.DisableRedaction {
// sensitiveDataFiltering.FilterSensitiveInfoFromHarRequest(message, filterOptions)
// }
//
// outChannel <- message
// }
//}
//func isHealthCheckByUserAgent(message *tap.OutputChannelItem) bool { outChannel <- message
// // for _, header := range message.HarEntry.Request.Headers { }
// // if strings.ToLower(header.Name) == "user-agent" { }
// // for _, userAgent := range userAgentsToFilter {
// // if strings.Contains(strings.ToLower(header.Value), userAgent) { func isHealthCheckByUserAgent(item *tapApi.OutputChannelItem, userAgentsToIgnore []string) bool {
// // return true if item.Protocol.Name != "http" {
// // } return false
// // } }
// // return false
// // } request := item.Pair.Request.Payload.(map[string]interface{})
// // } reqDetails := request["details"].(map[string]interface{})
// return false
//} for _, header := range reqDetails["headers"].([]interface{}) {
h := header.(map[string]interface{})
if strings.ToLower(h["name"].(string)) == "user-agent" {
for _, userAgent := range userAgentsToIgnore {
if strings.Contains(strings.ToLower(h["value"].(string)), strings.ToLower(userAgent)) {
return true
}
}
return false
}
}
return false
}
func pipeTapChannelToSocket(connection *websocket.Conn, messageDataChannel <-chan *tapApi.OutputChannelItem) { func pipeTapChannelToSocket(connection *websocket.Conn, messageDataChannel <-chan *tapApi.OutputChannelItem) {
if connection == nil { if connection == nil {

View File

@@ -4,10 +4,11 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"github.com/romana/rlog" "github.com/romana/rlog"
k8serrors "k8s.io/apimachinery/pkg/api/errors" k8serrors "k8s.io/apimachinery/pkg/api/errors"
"github.com/orcaman/concurrent-map" cmap "github.com/orcaman/concurrent-map"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch" "k8s.io/apimachinery/pkg/watch"
@@ -52,6 +53,9 @@ func (resolver *Resolver) GetMap() cmap.ConcurrentMap {
} }
func (resolver *Resolver) CheckIsServiceIP(address string) bool { func (resolver *Resolver) CheckIsServiceIP(address string) bool {
if resolver == nil {
return false
}
_, isFound := resolver.serviceMap.Get(address) _, isFound := resolver.serviceMap.Get(address)
return isFound return isFound
} }