diff --git a/agent/pkg/api/main.go b/agent/pkg/api/main.go index 83e1c98dc..b756bd143 100644 --- a/agent/pkg/api/main.go +++ b/agent/pkg/api/main.go @@ -128,6 +128,11 @@ BasenineReconnect: for item := range outputItems { extension := extensionsMap[item.Protocol.Name] resolvedSource, resolvedDestionation, namespace := resolveIP(item.ConnectionInfo) + + if namespace == "" && item.Namespace != tapApi.UNKNOWN_NAMESPACE { + namespace = item.Namespace + } + mizuEntry := extension.Dissector.Analyze(item, resolvedSource, resolvedDestionation, namespace) if extension.Protocol.Name == "http" { if !disableOASValidation { diff --git a/shared/kubernetes/utils.go b/shared/kubernetes/utils.go index aece0bc7b..823af34ef 100644 --- a/shared/kubernetes/utils.go +++ b/shared/kubernetes/utils.go @@ -26,7 +26,8 @@ func GetNodeHostToTappedPodsMap(tappedPods []core.Pod) shared.NodeToPodsMap { func getMinimizedPod(fullPod core.Pod) core.Pod { return core.Pod{ ObjectMeta: metav1.ObjectMeta{ - Name: fullPod.Name, + Name: fullPod.Name, + Namespace: fullPod.Namespace, }, Status: core.PodStatus{ PodIP: fullPod.Status.PodIP, diff --git a/tap/api/api.go b/tap/api/api.go index 4cc495aa3..a7407e0c6 100644 --- a/tap/api/api.go +++ b/tap/api/api.go @@ -18,6 +18,7 @@ import ( ) const mizuTestEnvVar = "MIZU_TEST" +const UNKNOWN_NAMESPACE = "" var UnknownIp net.IP = net.IP{0, 0, 0, 0} var UnknownPort uint16 = 0 @@ -100,6 +101,7 @@ type OutputChannelItem struct { ConnectionInfo *ConnectionInfo Pair *RequestResponsePair Summary *BaseEntry + Namespace string } type SuperTimer struct { diff --git a/tap/tlstapper/tls_emitter.go b/tap/tlstapper/tls_emitter.go new file mode 100644 index 000000000..51139870e --- /dev/null +++ b/tap/tlstapper/tls_emitter.go @@ -0,0 +1,13 @@ +package tlstapper + +import "github.com/up9inc/mizu/tap/api" + +type tlsEmitter struct { + delegate api.Emitter + namespace string +} + +func (e *tlsEmitter) Emit(item *api.OutputChannelItem) { + item.Namespace = e.namespace + e.delegate.Emit(item) +} diff --git a/tap/tlstapper/tls_poller.go b/tap/tlstapper/tls_poller.go index 50acf40e6..eb4ec115f 100644 --- a/tap/tlstapper/tls_poller.go +++ b/tap/tlstapper/tls_poller.go @@ -5,6 +5,7 @@ import ( "bytes" "fmt" "net" + "sync" "encoding/binary" "encoding/hex" @@ -19,13 +20,14 @@ import ( ) type tlsPoller struct { - tls *TlsTapper - readers map[string]*tlsReader - closedReaders chan string - reqResMatcher api.RequestResponseMatcher - chunksReader *perf.Reader - extension *api.Extension - procfs string + tls *TlsTapper + readers map[string]*tlsReader + closedReaders chan string + reqResMatcher api.RequestResponseMatcher + chunksReader *perf.Reader + extension *api.Extension + procfs string + pidToNamespace sync.Map } func newTlsPoller(tls *TlsTapper, extension *api.Extension, procfs string) *tlsPoller { @@ -151,16 +153,21 @@ func (p *tlsPoller) startNewTlsReader(chunk *tlsChunk, ip net.IP, port uint16, k tcpid := p.buildTcpId(chunk, ip, port) - go dissect(extension, reader, chunk.isRequest(), &tcpid, emitter, options, p.reqResMatcher) + tlsEmitter := &tlsEmitter{ + delegate: emitter, + namespace: p.getNamespace(chunk.Pid), + } + + go dissect(extension, reader, chunk.isRequest(), &tcpid, tlsEmitter, options, p.reqResMatcher) return reader } func dissect(extension *api.Extension, reader *tlsReader, isRequest bool, tcpid *api.TcpID, - emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher api.RequestResponseMatcher) { + tlsEmitter *tlsEmitter, options *api.TrafficFilteringOptions, reqResMatcher api.RequestResponseMatcher) { b := bufio.NewReader(reader) err := extension.Dissector.Dissect(b, reader.progress, api.Ebpf, isRequest, tcpid, &api.CounterPair{}, - &api.SuperTimer{}, &api.SuperIdentifier{}, emitter, options, reqResMatcher) + &api.SuperTimer{}, &api.SuperIdentifier{}, tlsEmitter, options, reqResMatcher) if err != nil { logger.Log.Warningf("Error dissecting TLS %v - %v", tcpid, err) @@ -205,6 +212,33 @@ func (p *tlsPoller) buildTcpId(chunk *tlsChunk, ip net.IP, port uint16) api.TcpI } } +func (p *tlsPoller) addPid(pid uint32, namespace string) { + p.pidToNamespace.Store(pid, namespace) +} + +func (p *tlsPoller) getNamespace(pid uint32) string { + namespaceIfc, ok := p.pidToNamespace.Load(pid) + + if !ok { + return api.UNKNOWN_NAMESPACE + } + + namespace, ok := namespaceIfc.(string) + + if !ok { + return api.UNKNOWN_NAMESPACE + } + + return namespace +} + +func (p *tlsPoller) clearPids() { + p.pidToNamespace.Range(func(key, v interface{}) bool { + p.pidToNamespace.Delete(key) + return true + }) +} + func (p *tlsPoller) logTls(chunk *tlsChunk, ip net.IP, port uint16) { var flagsStr string diff --git a/tap/tlstapper/tls_process_discoverer.go b/tap/tlstapper/tls_process_discoverer.go index eabcedc92..5322cb9b1 100644 --- a/tap/tlstapper/tls_process_discoverer.go +++ b/tap/tlstapper/tls_process_discoverer.go @@ -24,11 +24,11 @@ func UpdateTapTargets(tls *TlsTapper, pods *[]v1.Pod, procfs string) error { if err != nil { return err } - + tls.ClearPids() - for _, pid := range containerPids { - if err := tls.AddPid(procfs, pid); err != nil { + for pid, pod := range containerPids { + if err := tls.AddPid(procfs, pid, pod.Namespace); err != nil { LogError(err) } } @@ -36,8 +36,8 @@ func UpdateTapTargets(tls *TlsTapper, pods *[]v1.Pod, procfs string) error { return nil } -func findContainerPids(procfs string, containerIds map[string]bool) ([]uint32, error) { - result := make([]uint32, 0) +func findContainerPids(procfs string, containerIds map[string]v1.Pod) (map[uint32]v1.Pod, error) { + result := make(map[uint32]v1.Pod) pids, err := ioutil.ReadDir(procfs) @@ -63,7 +63,9 @@ func findContainerPids(procfs string, containerIds map[string]bool) ([]uint32, e continue } - if _, ok := containerIds[cgroup]; !ok { + pod, ok := containerIds[cgroup] + + if !ok { continue } @@ -73,14 +75,14 @@ func findContainerPids(procfs string, containerIds map[string]bool) ([]uint32, e continue } - result = append(result, uint32(pidNumber)) + result[uint32(pidNumber)] = pod } return result, nil } -func buildContainerIdsMap(pods *[]v1.Pod) map[string]bool { - result := make(map[string]bool) +func buildContainerIdsMap(pods *[]v1.Pod) map[string]v1.Pod { + result := make(map[string]v1.Pod) for _, pod := range *pods { for _, container := range pod.Status.ContainerStatuses { @@ -91,7 +93,7 @@ func buildContainerIdsMap(pods *[]v1.Pod) map[string]bool { continue } - result[url.Host] = true + result[url.Host] = pod } } @@ -141,14 +143,14 @@ func extractCgroup(lines []string) string { // /kubepods.slice/kubepods-burstable.slice/kubepods-burstable-pod3beae8e0_164d_4689_a087_efd902d8c2ab.slice/docker-.scope // /kubepods/besteffort/pod7709c1d5-447c-428f-bed9-8ddec35c93f4/ // -// This function extract the out of the cgroup path, the should match +// This function extract the out of the cgroup path, the should match // the "Container ID:" field when running kubectl describe pod // func normalizeCgroup(cgrouppath string) string { basename := strings.TrimSpace(path.Base(cgrouppath)) - + if strings.Contains(basename, "-") { - basename = basename[strings.Index(basename, "-") + 1:] + basename = basename[strings.Index(basename, "-")+1:] } if strings.Contains(basename, ".") { diff --git a/tap/tlstapper/tls_tapper.go b/tap/tlstapper/tls_tapper.go index e3982bd66..efdcbb423 100644 --- a/tap/tlstapper/tls_tapper.go +++ b/tap/tlstapper/tls_tapper.go @@ -1,11 +1,12 @@ package tlstapper import ( + "sync" + "github.com/cilium/ebpf/rlimit" "github.com/go-errors/errors" "github.com/up9inc/mizu/shared/logger" "github.com/up9inc/mizu/tap/api" - "sync" ) const GLOABL_TAP_PID = 0 @@ -58,10 +59,10 @@ func (t *TlsTapper) PollForLogging() { } func (t *TlsTapper) GlobalTap(sslLibrary string) error { - return t.tapPid(GLOABL_TAP_PID, sslLibrary) + return t.tapPid(GLOABL_TAP_PID, sslLibrary, api.UNKNOWN_NAMESPACE) } -func (t *TlsTapper) AddPid(procfs string, pid uint32) error { +func (t *TlsTapper) AddPid(procfs string, pid uint32, namespace string) error { sslLibrary, err := findSsllib(procfs, pid) if err != nil { @@ -69,7 +70,7 @@ func (t *TlsTapper) AddPid(procfs string, pid uint32) error { return nil // hide the error on purpose, its OK for a process to not use libssl.so } - return t.tapPid(pid, sslLibrary) + return t.tapPid(pid, sslLibrary, namespace) } func (t *TlsTapper) RemovePid(pid uint32) error { @@ -85,12 +86,13 @@ func (t *TlsTapper) RemovePid(pid uint32) error { } func (t *TlsTapper) ClearPids() { + t.poller.clearPids() t.registeredPids.Range(func(key, v interface{}) bool { pid := key.(uint32) if pid == GLOABL_TAP_PID { return true } - + if err := t.RemovePid(pid); err != nil { LogError(err) } @@ -133,7 +135,7 @@ func setupRLimit() error { return nil } -func (t *TlsTapper) tapPid(pid uint32, sslLibrary string) error { +func (t *TlsTapper) tapPid(pid uint32, sslLibrary string, namespace string) error { logger.Log.Infof("Tapping TLS (pid: %v) (sslLibrary: %v)", pid, sslLibrary) newSsl := sslHooks{} @@ -144,12 +146,14 @@ func (t *TlsTapper) tapPid(pid uint32, sslLibrary string) error { t.sslHooksStructs = append(t.sslHooksStructs, newSsl) + t.poller.addPid(pid, namespace) + pids := t.bpfObjects.tlsTapperMaps.PidsMap if err := pids.Put(pid, uint32(1)); err != nil { return errors.Wrap(err, 0) } - + t.registeredPids.Store(pid, true) return nil