Tag entries destination namespace (#803)

* Update main.go, socket_server_handlers.go, and 7 more files...

* Update cors.go

* omit empty namespace in json so tests pass

* fix indent
This commit is contained in:
RamiBerm 2022-02-15 10:51:38 +02:00 committed by GitHub
parent 5484b7c491
commit 4197ec198c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 49 additions and 32 deletions

View File

@ -118,8 +118,8 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
for item := range outputItems {
extension := extensionsMap[item.Protocol.Name]
resolvedSource, resolvedDestionation := resolveIP(item.ConnectionInfo)
mizuEntry := extension.Dissector.Analyze(item, resolvedSource, resolvedDestionation)
resolvedSource, resolvedDestionation, namespace := resolveIP(item.ConnectionInfo)
mizuEntry := extension.Dissector.Analyze(item, resolvedSource, resolvedDestionation, namespace)
if extension.Protocol.Name == "http" {
if !disableOASValidation {
var httpPair tapApi.HTTPRequestResponsePair
@ -158,26 +158,32 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension
}
}
func resolveIP(connectionInfo *tapApi.ConnectionInfo) (resolvedSource string, resolvedDestination string) {
func resolveIP(connectionInfo *tapApi.ConnectionInfo) (resolvedSource string, resolvedDestination string, namespace string) {
if k8sResolver != nil {
unresolvedSource := connectionInfo.ClientIP
resolvedSource = k8sResolver.Resolve(unresolvedSource)
if resolvedSource == "" {
resolvedSourceObject := k8sResolver.Resolve(unresolvedSource)
if resolvedSourceObject == nil {
logger.Log.Debugf("Cannot find resolved name to source: %s", unresolvedSource)
if os.Getenv("SKIP_NOT_RESOLVED_SOURCE") == "1" {
return
}
} else {
resolvedSource = resolvedSourceObject.FullAddress
}
unresolvedDestination := fmt.Sprintf("%s:%s", connectionInfo.ServerIP, connectionInfo.ServerPort)
resolvedDestination = k8sResolver.Resolve(unresolvedDestination)
if resolvedDestination == "" {
resolvedDestinationObject := k8sResolver.Resolve(unresolvedDestination)
if resolvedDestinationObject == nil {
logger.Log.Debugf("Cannot find resolved name to dest: %s", unresolvedDestination)
if os.Getenv("SKIP_NOT_RESOLVED_DEST") == "1" {
return
}
} else {
resolvedDestination = resolvedDestinationObject.FullAddress
namespace = resolvedDestinationObject.Namespace
}
}
return resolvedSource, resolvedDestination
return resolvedSource, resolvedDestination, namespace
}
func CheckIsServiceIP(address string) bool {

View File

@ -104,9 +104,9 @@ func (h *RoutesEventHandlers) WebSocketMessage(_ int, message []byte) {
}
func handleTLSLink(outboundLinkMessage models.WebsocketOutboundLinkMessage) {
resolvedName := k8sResolver.Resolve(outboundLinkMessage.Data.DstIP)
if resolvedName != "" {
outboundLinkMessage.Data.DstIP = resolvedName
resolvedNameObject := k8sResolver.Resolve(outboundLinkMessage.Data.DstIP)
if resolvedNameObject != nil {
outboundLinkMessage.Data.DstIP = resolvedNameObject.FullAddress
} else if outboundLinkMessage.Data.SuggestedResolvedName != "" {
outboundLinkMessage.Data.DstIP = outboundLinkMessage.Data.SuggestedResolvedName
}

View File

@ -7,7 +7,7 @@ func CORSMiddleware() gin.HandlerFunc {
c.Writer.Header().Set("Access-Control-Allow-Origin", "*")
c.Writer.Header().Set("Access-Control-Allow-Credentials", "true")
c.Writer.Header().Set("Access-Control-Allow-Headers", "Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token, Authorization, accept, origin, Cache-Control, X-Requested-With, x-session-token")
c.Writer.Header().Set("Access-Control-Allow-Methods", "POST, OPTIONS, GET, PUT")
c.Writer.Header().Set("Access-Control-Allow-Methods", "POST, OPTIONS, GET, PUT, DELETE")
if c.Request.Method == "OPTIONS" {
c.AbortWithStatus(204)

View File

@ -30,6 +30,11 @@ type Resolver struct {
namespace string
}
type ResolvedObjectInfo struct {
FullAddress string
Namespace string
}
func (resolver *Resolver) Start(ctx context.Context) {
if !resolver.isStarted {
resolver.isStarted = true
@ -40,12 +45,12 @@ func (resolver *Resolver) Start(ctx context.Context) {
}
}
func (resolver *Resolver) Resolve(name string) string {
func (resolver *Resolver) Resolve(name string) *ResolvedObjectInfo {
resolvedName, isFound := resolver.nameMap.Get(name)
if !isFound {
return ""
return nil
}
return resolvedName.(string)
return resolvedName.(*ResolvedObjectInfo)
}
func (resolver *Resolver) GetMap() cmap.ConcurrentMap {
@ -71,7 +76,7 @@ func (resolver *Resolver) watchPods(ctx context.Context) error {
}
if event.Type == watch.Deleted {
pod := event.Object.(*corev1.Pod)
resolver.saveResolvedName(pod.Status.PodIP, "", event.Type)
resolver.saveResolvedName(pod.Status.PodIP, "", pod.Namespace, event.Type)
}
case <-ctx.Done():
watcher.Stop()
@ -106,10 +111,10 @@ func (resolver *Resolver) watchEndpoints(ctx context.Context) error {
}
if subset.Addresses != nil {
for _, address := range subset.Addresses {
resolver.saveResolvedName(address.IP, serviceHostname, event.Type)
resolver.saveResolvedName(address.IP, serviceHostname, endpoint.Namespace, event.Type)
for _, port := range ports {
ipWithPort := fmt.Sprintf("%s:%d", address.IP, port)
resolver.saveResolvedName(ipWithPort, serviceHostname, event.Type)
resolver.saveResolvedName(ipWithPort, serviceHostname, endpoint.Namespace, event.Type)
}
}
}
@ -139,19 +144,19 @@ func (resolver *Resolver) watchServices(ctx context.Context) error {
service := event.Object.(*corev1.Service)
serviceHostname := fmt.Sprintf("%s.%s", service.Name, service.Namespace)
if service.Spec.ClusterIP != "" && service.Spec.ClusterIP != kubClientNullString {
resolver.saveResolvedName(service.Spec.ClusterIP, serviceHostname, event.Type)
resolver.saveResolvedName(service.Spec.ClusterIP, serviceHostname, service.Namespace, event.Type)
if service.Spec.Ports != nil {
for _, port := range service.Spec.Ports {
if port.Port > 0 {
resolver.saveResolvedName(fmt.Sprintf("%s:%d", service.Spec.ClusterIP, port.Port), serviceHostname, event.Type)
resolver.saveResolvedName(fmt.Sprintf("%s:%d", service.Spec.ClusterIP, port.Port), serviceHostname, service.Namespace, event.Type)
}
}
}
resolver.saveServiceIP(service.Spec.ClusterIP, serviceHostname, event.Type)
resolver.saveServiceIP(service.Spec.ClusterIP, serviceHostname, service.Namespace, event.Type)
}
if service.Status.LoadBalancer.Ingress != nil {
for _, ingress := range service.Status.LoadBalancer.Ingress {
resolver.saveResolvedName(ingress.IP, serviceHostname, event.Type)
resolver.saveResolvedName(ingress.IP, serviceHostname, service.Namespace, event.Type)
}
}
case <-ctx.Done():
@ -161,21 +166,22 @@ func (resolver *Resolver) watchServices(ctx context.Context) error {
}
}
func (resolver *Resolver) saveResolvedName(key string, resolved string, eventType watch.EventType) {
func (resolver *Resolver) saveResolvedName(key string, resolved string, namespace string, eventType watch.EventType) {
if eventType == watch.Deleted {
resolver.nameMap.Remove(key)
logger.Log.Infof("setting %s=nil", key)
} else {
resolver.nameMap.Set(key, resolved)
resolver.nameMap.Set(key, &ResolvedObjectInfo{FullAddress: resolved, Namespace: namespace})
logger.Log.Infof("setting %s=%s", key, resolved)
}
}
func (resolver *Resolver) saveServiceIP(key string, resolved string, eventType watch.EventType) {
func (resolver *Resolver) saveServiceIP(key string, resolved string, namespace string, eventType watch.EventType) {
if eventType == watch.Deleted {
resolver.serviceMap.Remove(key)
} else {
resolver.serviceMap.Set(key, resolved)
resolver.nameMap.Set(key, &ResolvedObjectInfo{FullAddress: resolved, Namespace: namespace})
}
}

View File

@ -99,7 +99,7 @@ type Dissector interface {
Register(*Extension)
Ping()
Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *TrafficFilteringOptions, reqResMatcher RequestResponseMatcher) error
Analyze(item *OutputChannelItem, resolvedSource string, resolvedDestination string) *Entry
Analyze(item *OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *Entry
Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error)
Macros() map[string]string
NewResponseRequestMatcher() RequestResponseMatcher
@ -128,6 +128,7 @@ type Entry struct {
Protocol Protocol `json:"proto"`
Source *TCP `json:"src"`
Destination *TCP `json:"dst"`
Namespace string `json:"namespace,omitempty"`
Outgoing bool `json:"outgoing"`
Timestamp int64 `json:"timestamp"`
StartTime time.Time `json:"startTime"`

View File

@ -212,7 +212,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
}
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string) *api.Entry {
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *api.Entry {
request := item.Pair.Request.Payload.(map[string]interface{})
reqDetails := request["details"].(map[string]interface{})
@ -254,6 +254,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
IP: item.ConnectionInfo.ServerIP,
Port: item.ConnectionInfo.ServerPort,
},
Namespace: namespace,
Outgoing: item.ConnectionInfo.IsOutgoing,
Request: reqDetails,
Method: request["method"].(string),

View File

@ -182,7 +182,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
return nil
}
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string) *api.Entry {
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *api.Entry {
var host, authority, path string
request := item.Pair.Request.Payload.(map[string]interface{})
@ -280,6 +280,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
IP: item.ConnectionInfo.ServerIP,
Port: item.ConnectionInfo.ServerPort,
},
Namespace: namespace,
Outgoing: item.ConnectionInfo.IsOutgoing,
Request: reqDetails,
Response: resDetails,

View File

@ -205,7 +205,7 @@ func TestAnalyze(t *testing.T) {
var entries []*api.Entry
for _, item := range items {
entry := dissector.Analyze(item, "", "")
entry := dissector.Analyze(item, "", "", "")
entries = append(entries, entry)
}

View File

@ -62,7 +62,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
}
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string) *api.Entry {
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *api.Entry {
request := item.Pair.Request.Payload.(map[string]interface{})
reqDetails := request["details"].(map[string]interface{})
apiKey := ApiKey(reqDetails["apiKey"].(float64))
@ -158,6 +158,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
IP: item.ConnectionInfo.ServerIP,
Port: item.ConnectionInfo.ServerPort,
},
Namespace: namespace,
Outgoing: item.ConnectionInfo.IsOutgoing,
Request: reqDetails,
Response: item.Pair.Response.Payload.(map[string]interface{})["details"].(map[string]interface{}),

View File

@ -63,7 +63,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co
}
}
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string) *api.Entry {
func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *api.Entry {
request := item.Pair.Request.Payload.(map[string]interface{})
response := item.Pair.Response.Payload.(map[string]interface{})
reqDetails := request["details"].(map[string]interface{})
@ -96,6 +96,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string,
IP: item.ConnectionInfo.ServerIP,
Port: item.ConnectionInfo.ServerPort,
},
Namespace: namespace,
Outgoing: item.ConnectionInfo.IsOutgoing,
Request: reqDetails,
Response: resDetails,