diff --git a/agent/pkg/api/main.go b/agent/pkg/api/main.go index 3886af658..685ef2516 100644 --- a/agent/pkg/api/main.go +++ b/agent/pkg/api/main.go @@ -118,8 +118,8 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension for item := range outputItems { extension := extensionsMap[item.Protocol.Name] - resolvedSource, resolvedDestionation := resolveIP(item.ConnectionInfo) - mizuEntry := extension.Dissector.Analyze(item, resolvedSource, resolvedDestionation) + resolvedSource, resolvedDestionation, namespace := resolveIP(item.ConnectionInfo) + mizuEntry := extension.Dissector.Analyze(item, resolvedSource, resolvedDestionation, namespace) if extension.Protocol.Name == "http" { if !disableOASValidation { var httpPair tapApi.HTTPRequestResponsePair @@ -158,26 +158,32 @@ func startReadingChannel(outputItems <-chan *tapApi.OutputChannelItem, extension } } -func resolveIP(connectionInfo *tapApi.ConnectionInfo) (resolvedSource string, resolvedDestination string) { +func resolveIP(connectionInfo *tapApi.ConnectionInfo) (resolvedSource string, resolvedDestination string, namespace string) { if k8sResolver != nil { unresolvedSource := connectionInfo.ClientIP - resolvedSource = k8sResolver.Resolve(unresolvedSource) - if resolvedSource == "" { + resolvedSourceObject := k8sResolver.Resolve(unresolvedSource) + if resolvedSourceObject == nil { logger.Log.Debugf("Cannot find resolved name to source: %s", unresolvedSource) if os.Getenv("SKIP_NOT_RESOLVED_SOURCE") == "1" { return } + } else { + resolvedSource = resolvedSourceObject.FullAddress } + unresolvedDestination := fmt.Sprintf("%s:%s", connectionInfo.ServerIP, connectionInfo.ServerPort) - resolvedDestination = k8sResolver.Resolve(unresolvedDestination) - if resolvedDestination == "" { + resolvedDestinationObject := k8sResolver.Resolve(unresolvedDestination) + if resolvedDestinationObject == nil { logger.Log.Debugf("Cannot find resolved name to dest: %s", unresolvedDestination) if os.Getenv("SKIP_NOT_RESOLVED_DEST") == "1" { return } + } else { + resolvedDestination = resolvedDestinationObject.FullAddress + namespace = resolvedDestinationObject.Namespace } } - return resolvedSource, resolvedDestination + return resolvedSource, resolvedDestination, namespace } func CheckIsServiceIP(address string) bool { diff --git a/agent/pkg/api/socket_server_handlers.go b/agent/pkg/api/socket_server_handlers.go index a22508362..bedde49a7 100644 --- a/agent/pkg/api/socket_server_handlers.go +++ b/agent/pkg/api/socket_server_handlers.go @@ -104,9 +104,9 @@ func (h *RoutesEventHandlers) WebSocketMessage(_ int, message []byte) { } func handleTLSLink(outboundLinkMessage models.WebsocketOutboundLinkMessage) { - resolvedName := k8sResolver.Resolve(outboundLinkMessage.Data.DstIP) - if resolvedName != "" { - outboundLinkMessage.Data.DstIP = resolvedName + resolvedNameObject := k8sResolver.Resolve(outboundLinkMessage.Data.DstIP) + if resolvedNameObject != nil { + outboundLinkMessage.Data.DstIP = resolvedNameObject.FullAddress } else if outboundLinkMessage.Data.SuggestedResolvedName != "" { outboundLinkMessage.Data.DstIP = outboundLinkMessage.Data.SuggestedResolvedName } diff --git a/agent/pkg/middlewares/cors.go b/agent/pkg/middlewares/cors.go index 04afb297a..e5d711ad9 100644 --- a/agent/pkg/middlewares/cors.go +++ b/agent/pkg/middlewares/cors.go @@ -7,7 +7,7 @@ func CORSMiddleware() gin.HandlerFunc { c.Writer.Header().Set("Access-Control-Allow-Origin", "*") c.Writer.Header().Set("Access-Control-Allow-Credentials", "true") c.Writer.Header().Set("Access-Control-Allow-Headers", "Content-Type, Content-Length, Accept-Encoding, X-CSRF-Token, Authorization, accept, origin, Cache-Control, X-Requested-With, x-session-token") - c.Writer.Header().Set("Access-Control-Allow-Methods", "POST, OPTIONS, GET, PUT") + c.Writer.Header().Set("Access-Control-Allow-Methods", "POST, OPTIONS, GET, PUT, DELETE") if c.Request.Method == "OPTIONS" { c.AbortWithStatus(204) diff --git a/agent/pkg/resolver/resolver.go b/agent/pkg/resolver/resolver.go index 60533704d..a1cdc52de 100644 --- a/agent/pkg/resolver/resolver.go +++ b/agent/pkg/resolver/resolver.go @@ -30,6 +30,11 @@ type Resolver struct { namespace string } +type ResolvedObjectInfo struct { + FullAddress string + Namespace string +} + func (resolver *Resolver) Start(ctx context.Context) { if !resolver.isStarted { resolver.isStarted = true @@ -40,12 +45,12 @@ func (resolver *Resolver) Start(ctx context.Context) { } } -func (resolver *Resolver) Resolve(name string) string { +func (resolver *Resolver) Resolve(name string) *ResolvedObjectInfo { resolvedName, isFound := resolver.nameMap.Get(name) if !isFound { - return "" + return nil } - return resolvedName.(string) + return resolvedName.(*ResolvedObjectInfo) } func (resolver *Resolver) GetMap() cmap.ConcurrentMap { @@ -71,7 +76,7 @@ func (resolver *Resolver) watchPods(ctx context.Context) error { } if event.Type == watch.Deleted { pod := event.Object.(*corev1.Pod) - resolver.saveResolvedName(pod.Status.PodIP, "", event.Type) + resolver.saveResolvedName(pod.Status.PodIP, "", pod.Namespace, event.Type) } case <-ctx.Done(): watcher.Stop() @@ -106,10 +111,10 @@ func (resolver *Resolver) watchEndpoints(ctx context.Context) error { } if subset.Addresses != nil { for _, address := range subset.Addresses { - resolver.saveResolvedName(address.IP, serviceHostname, event.Type) + resolver.saveResolvedName(address.IP, serviceHostname, endpoint.Namespace, event.Type) for _, port := range ports { ipWithPort := fmt.Sprintf("%s:%d", address.IP, port) - resolver.saveResolvedName(ipWithPort, serviceHostname, event.Type) + resolver.saveResolvedName(ipWithPort, serviceHostname, endpoint.Namespace, event.Type) } } } @@ -139,19 +144,19 @@ func (resolver *Resolver) watchServices(ctx context.Context) error { service := event.Object.(*corev1.Service) serviceHostname := fmt.Sprintf("%s.%s", service.Name, service.Namespace) if service.Spec.ClusterIP != "" && service.Spec.ClusterIP != kubClientNullString { - resolver.saveResolvedName(service.Spec.ClusterIP, serviceHostname, event.Type) + resolver.saveResolvedName(service.Spec.ClusterIP, serviceHostname, service.Namespace, event.Type) if service.Spec.Ports != nil { for _, port := range service.Spec.Ports { if port.Port > 0 { - resolver.saveResolvedName(fmt.Sprintf("%s:%d", service.Spec.ClusterIP, port.Port), serviceHostname, event.Type) + resolver.saveResolvedName(fmt.Sprintf("%s:%d", service.Spec.ClusterIP, port.Port), serviceHostname, service.Namespace, event.Type) } } } - resolver.saveServiceIP(service.Spec.ClusterIP, serviceHostname, event.Type) + resolver.saveServiceIP(service.Spec.ClusterIP, serviceHostname, service.Namespace, event.Type) } if service.Status.LoadBalancer.Ingress != nil { for _, ingress := range service.Status.LoadBalancer.Ingress { - resolver.saveResolvedName(ingress.IP, serviceHostname, event.Type) + resolver.saveResolvedName(ingress.IP, serviceHostname, service.Namespace, event.Type) } } case <-ctx.Done(): @@ -161,21 +166,22 @@ func (resolver *Resolver) watchServices(ctx context.Context) error { } } -func (resolver *Resolver) saveResolvedName(key string, resolved string, eventType watch.EventType) { +func (resolver *Resolver) saveResolvedName(key string, resolved string, namespace string, eventType watch.EventType) { if eventType == watch.Deleted { resolver.nameMap.Remove(key) logger.Log.Infof("setting %s=nil", key) } else { - resolver.nameMap.Set(key, resolved) + + resolver.nameMap.Set(key, &ResolvedObjectInfo{FullAddress: resolved, Namespace: namespace}) logger.Log.Infof("setting %s=%s", key, resolved) } } -func (resolver *Resolver) saveServiceIP(key string, resolved string, eventType watch.EventType) { +func (resolver *Resolver) saveServiceIP(key string, resolved string, namespace string, eventType watch.EventType) { if eventType == watch.Deleted { resolver.serviceMap.Remove(key) } else { - resolver.serviceMap.Set(key, resolved) + resolver.nameMap.Set(key, &ResolvedObjectInfo{FullAddress: resolved, Namespace: namespace}) } } diff --git a/tap/api/api.go b/tap/api/api.go index 19feed2ce..d1a812e80 100644 --- a/tap/api/api.go +++ b/tap/api/api.go @@ -99,7 +99,7 @@ type Dissector interface { Register(*Extension) Ping() Dissect(b *bufio.Reader, isClient bool, tcpID *TcpID, counterPair *CounterPair, superTimer *SuperTimer, superIdentifier *SuperIdentifier, emitter Emitter, options *TrafficFilteringOptions, reqResMatcher RequestResponseMatcher) error - Analyze(item *OutputChannelItem, resolvedSource string, resolvedDestination string) *Entry + Analyze(item *OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *Entry Represent(request map[string]interface{}, response map[string]interface{}) (object []byte, bodySize int64, err error) Macros() map[string]string NewResponseRequestMatcher() RequestResponseMatcher @@ -128,6 +128,7 @@ type Entry struct { Protocol Protocol `json:"proto"` Source *TCP `json:"src"` Destination *TCP `json:"dst"` + Namespace string `json:"namespace,omitempty"` Outgoing bool `json:"outgoing"` Timestamp int64 `json:"timestamp"` StartTime time.Time `json:"startTime"` diff --git a/tap/extensions/amqp/main.go b/tap/extensions/amqp/main.go index 581b9b4af..f672dba7b 100644 --- a/tap/extensions/amqp/main.go +++ b/tap/extensions/amqp/main.go @@ -212,7 +212,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co } } -func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string) *api.Entry { +func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *api.Entry { request := item.Pair.Request.Payload.(map[string]interface{}) reqDetails := request["details"].(map[string]interface{}) @@ -254,6 +254,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, IP: item.ConnectionInfo.ServerIP, Port: item.ConnectionInfo.ServerPort, }, + Namespace: namespace, Outgoing: item.ConnectionInfo.IsOutgoing, Request: reqDetails, Method: request["method"].(string), diff --git a/tap/extensions/http/main.go b/tap/extensions/http/main.go index 32664cb61..2b3d781f1 100644 --- a/tap/extensions/http/main.go +++ b/tap/extensions/http/main.go @@ -182,7 +182,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co return nil } -func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string) *api.Entry { +func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *api.Entry { var host, authority, path string request := item.Pair.Request.Payload.(map[string]interface{}) @@ -280,6 +280,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, IP: item.ConnectionInfo.ServerIP, Port: item.ConnectionInfo.ServerPort, }, + Namespace: namespace, Outgoing: item.ConnectionInfo.IsOutgoing, Request: reqDetails, Response: resDetails, diff --git a/tap/extensions/http/main_test.go b/tap/extensions/http/main_test.go index f1934a994..90f90ab39 100644 --- a/tap/extensions/http/main_test.go +++ b/tap/extensions/http/main_test.go @@ -205,7 +205,7 @@ func TestAnalyze(t *testing.T) { var entries []*api.Entry for _, item := range items { - entry := dissector.Analyze(item, "", "") + entry := dissector.Analyze(item, "", "", "") entries = append(entries, entry) } diff --git a/tap/extensions/kafka/main.go b/tap/extensions/kafka/main.go index 3fdadd364..0be1ccdbf 100644 --- a/tap/extensions/kafka/main.go +++ b/tap/extensions/kafka/main.go @@ -62,7 +62,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co } } -func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string) *api.Entry { +func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *api.Entry { request := item.Pair.Request.Payload.(map[string]interface{}) reqDetails := request["details"].(map[string]interface{}) apiKey := ApiKey(reqDetails["apiKey"].(float64)) @@ -158,6 +158,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, IP: item.ConnectionInfo.ServerIP, Port: item.ConnectionInfo.ServerPort, }, + Namespace: namespace, Outgoing: item.ConnectionInfo.IsOutgoing, Request: reqDetails, Response: item.Pair.Response.Payload.(map[string]interface{})["details"].(map[string]interface{}), diff --git a/tap/extensions/redis/main.go b/tap/extensions/redis/main.go index c27cd431f..db7480a7b 100644 --- a/tap/extensions/redis/main.go +++ b/tap/extensions/redis/main.go @@ -63,7 +63,7 @@ func (d dissecting) Dissect(b *bufio.Reader, isClient bool, tcpID *api.TcpID, co } } -func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string) *api.Entry { +func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, resolvedDestination string, namespace string) *api.Entry { request := item.Pair.Request.Payload.(map[string]interface{}) response := item.Pair.Response.Payload.(map[string]interface{}) reqDetails := request["details"].(map[string]interface{}) @@ -96,6 +96,7 @@ func (d dissecting) Analyze(item *api.OutputChannelItem, resolvedSource string, IP: item.ConnectionInfo.ServerIP, Port: item.ConnectionInfo.ServerPort, }, + Namespace: namespace, Outgoing: item.ConnectionInfo.IsOutgoing, Request: reqDetails, Response: resDetails,