mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-08-01 08:50:27 +00:00
Add kube namespace to tls (TRA-4443) (#1013)
* add namespace to tls - initial commit * add tls namespace to mizu entry
This commit is contained in:
parent
189c158150
commit
1213162b85
@ -128,6 +128,11 @@ BasenineReconnect:
|
||||
for item := range outputItems {
|
||||
extension := extensionsMap[item.Protocol.Name]
|
||||
resolvedSource, resolvedDestionation, namespace := resolveIP(item.ConnectionInfo)
|
||||
|
||||
if namespace == "" && item.Namespace != tapApi.UNKNOWN_NAMESPACE {
|
||||
namespace = item.Namespace
|
||||
}
|
||||
|
||||
mizuEntry := extension.Dissector.Analyze(item, resolvedSource, resolvedDestionation, namespace)
|
||||
if extension.Protocol.Name == "http" {
|
||||
if !disableOASValidation {
|
||||
|
@ -26,7 +26,8 @@ func GetNodeHostToTappedPodsMap(tappedPods []core.Pod) shared.NodeToPodsMap {
|
||||
func getMinimizedPod(fullPod core.Pod) core.Pod {
|
||||
return core.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fullPod.Name,
|
||||
Name: fullPod.Name,
|
||||
Namespace: fullPod.Namespace,
|
||||
},
|
||||
Status: core.PodStatus{
|
||||
PodIP: fullPod.Status.PodIP,
|
||||
|
@ -18,6 +18,7 @@ import (
|
||||
)
|
||||
|
||||
const mizuTestEnvVar = "MIZU_TEST"
|
||||
const UNKNOWN_NAMESPACE = ""
|
||||
|
||||
var UnknownIp net.IP = net.IP{0, 0, 0, 0}
|
||||
var UnknownPort uint16 = 0
|
||||
@ -100,6 +101,7 @@ type OutputChannelItem struct {
|
||||
ConnectionInfo *ConnectionInfo
|
||||
Pair *RequestResponsePair
|
||||
Summary *BaseEntry
|
||||
Namespace string
|
||||
}
|
||||
|
||||
type SuperTimer struct {
|
||||
|
13
tap/tlstapper/tls_emitter.go
Normal file
13
tap/tlstapper/tls_emitter.go
Normal file
@ -0,0 +1,13 @@
|
||||
package tlstapper
|
||||
|
||||
import "github.com/up9inc/mizu/tap/api"
|
||||
|
||||
type tlsEmitter struct {
|
||||
delegate api.Emitter
|
||||
namespace string
|
||||
}
|
||||
|
||||
func (e *tlsEmitter) Emit(item *api.OutputChannelItem) {
|
||||
item.Namespace = e.namespace
|
||||
e.delegate.Emit(item)
|
||||
}
|
@ -5,6 +5,7 @@ import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"net"
|
||||
"sync"
|
||||
|
||||
"encoding/binary"
|
||||
"encoding/hex"
|
||||
@ -19,13 +20,14 @@ import (
|
||||
)
|
||||
|
||||
type tlsPoller struct {
|
||||
tls *TlsTapper
|
||||
readers map[string]*tlsReader
|
||||
closedReaders chan string
|
||||
reqResMatcher api.RequestResponseMatcher
|
||||
chunksReader *perf.Reader
|
||||
extension *api.Extension
|
||||
procfs string
|
||||
tls *TlsTapper
|
||||
readers map[string]*tlsReader
|
||||
closedReaders chan string
|
||||
reqResMatcher api.RequestResponseMatcher
|
||||
chunksReader *perf.Reader
|
||||
extension *api.Extension
|
||||
procfs string
|
||||
pidToNamespace sync.Map
|
||||
}
|
||||
|
||||
func newTlsPoller(tls *TlsTapper, extension *api.Extension, procfs string) *tlsPoller {
|
||||
@ -151,16 +153,21 @@ func (p *tlsPoller) startNewTlsReader(chunk *tlsChunk, ip net.IP, port uint16, k
|
||||
|
||||
tcpid := p.buildTcpId(chunk, ip, port)
|
||||
|
||||
go dissect(extension, reader, chunk.isRequest(), &tcpid, emitter, options, p.reqResMatcher)
|
||||
tlsEmitter := &tlsEmitter{
|
||||
delegate: emitter,
|
||||
namespace: p.getNamespace(chunk.Pid),
|
||||
}
|
||||
|
||||
go dissect(extension, reader, chunk.isRequest(), &tcpid, tlsEmitter, options, p.reqResMatcher)
|
||||
return reader
|
||||
}
|
||||
|
||||
func dissect(extension *api.Extension, reader *tlsReader, isRequest bool, tcpid *api.TcpID,
|
||||
emitter api.Emitter, options *api.TrafficFilteringOptions, reqResMatcher api.RequestResponseMatcher) {
|
||||
tlsEmitter *tlsEmitter, options *api.TrafficFilteringOptions, reqResMatcher api.RequestResponseMatcher) {
|
||||
b := bufio.NewReader(reader)
|
||||
|
||||
err := extension.Dissector.Dissect(b, reader.progress, api.Ebpf, isRequest, tcpid, &api.CounterPair{},
|
||||
&api.SuperTimer{}, &api.SuperIdentifier{}, emitter, options, reqResMatcher)
|
||||
&api.SuperTimer{}, &api.SuperIdentifier{}, tlsEmitter, options, reqResMatcher)
|
||||
|
||||
if err != nil {
|
||||
logger.Log.Warningf("Error dissecting TLS %v - %v", tcpid, err)
|
||||
@ -205,6 +212,33 @@ func (p *tlsPoller) buildTcpId(chunk *tlsChunk, ip net.IP, port uint16) api.TcpI
|
||||
}
|
||||
}
|
||||
|
||||
func (p *tlsPoller) addPid(pid uint32, namespace string) {
|
||||
p.pidToNamespace.Store(pid, namespace)
|
||||
}
|
||||
|
||||
func (p *tlsPoller) getNamespace(pid uint32) string {
|
||||
namespaceIfc, ok := p.pidToNamespace.Load(pid)
|
||||
|
||||
if !ok {
|
||||
return api.UNKNOWN_NAMESPACE
|
||||
}
|
||||
|
||||
namespace, ok := namespaceIfc.(string)
|
||||
|
||||
if !ok {
|
||||
return api.UNKNOWN_NAMESPACE
|
||||
}
|
||||
|
||||
return namespace
|
||||
}
|
||||
|
||||
func (p *tlsPoller) clearPids() {
|
||||
p.pidToNamespace.Range(func(key, v interface{}) bool {
|
||||
p.pidToNamespace.Delete(key)
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
func (p *tlsPoller) logTls(chunk *tlsChunk, ip net.IP, port uint16) {
|
||||
var flagsStr string
|
||||
|
||||
|
@ -24,11 +24,11 @@ func UpdateTapTargets(tls *TlsTapper, pods *[]v1.Pod, procfs string) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
tls.ClearPids()
|
||||
|
||||
for _, pid := range containerPids {
|
||||
if err := tls.AddPid(procfs, pid); err != nil {
|
||||
for pid, pod := range containerPids {
|
||||
if err := tls.AddPid(procfs, pid, pod.Namespace); err != nil {
|
||||
LogError(err)
|
||||
}
|
||||
}
|
||||
@ -36,8 +36,8 @@ func UpdateTapTargets(tls *TlsTapper, pods *[]v1.Pod, procfs string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func findContainerPids(procfs string, containerIds map[string]bool) ([]uint32, error) {
|
||||
result := make([]uint32, 0)
|
||||
func findContainerPids(procfs string, containerIds map[string]v1.Pod) (map[uint32]v1.Pod, error) {
|
||||
result := make(map[uint32]v1.Pod)
|
||||
|
||||
pids, err := ioutil.ReadDir(procfs)
|
||||
|
||||
@ -63,7 +63,9 @@ func findContainerPids(procfs string, containerIds map[string]bool) ([]uint32, e
|
||||
continue
|
||||
}
|
||||
|
||||
if _, ok := containerIds[cgroup]; !ok {
|
||||
pod, ok := containerIds[cgroup]
|
||||
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
@ -73,14 +75,14 @@ func findContainerPids(procfs string, containerIds map[string]bool) ([]uint32, e
|
||||
continue
|
||||
}
|
||||
|
||||
result = append(result, uint32(pidNumber))
|
||||
result[uint32(pidNumber)] = pod
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func buildContainerIdsMap(pods *[]v1.Pod) map[string]bool {
|
||||
result := make(map[string]bool)
|
||||
func buildContainerIdsMap(pods *[]v1.Pod) map[string]v1.Pod {
|
||||
result := make(map[string]v1.Pod)
|
||||
|
||||
for _, pod := range *pods {
|
||||
for _, container := range pod.Status.ContainerStatuses {
|
||||
@ -91,7 +93,7 @@ func buildContainerIdsMap(pods *[]v1.Pod) map[string]bool {
|
||||
continue
|
||||
}
|
||||
|
||||
result[url.Host] = true
|
||||
result[url.Host] = pod
|
||||
}
|
||||
}
|
||||
|
||||
@ -141,14 +143,14 @@ func extractCgroup(lines []string) string {
|
||||
// /kubepods.slice/kubepods-burstable.slice/kubepods-burstable-pod3beae8e0_164d_4689_a087_efd902d8c2ab.slice/docker-<ID>.scope
|
||||
// /kubepods/besteffort/pod7709c1d5-447c-428f-bed9-8ddec35c93f4/<ID>
|
||||
//
|
||||
// This function extract the <ID> out of the cgroup path, the <ID> should match
|
||||
// This function extract the <ID> out of the cgroup path, the <ID> should match
|
||||
// the "Container ID:" field when running kubectl describe pod <POD>
|
||||
//
|
||||
func normalizeCgroup(cgrouppath string) string {
|
||||
basename := strings.TrimSpace(path.Base(cgrouppath))
|
||||
|
||||
|
||||
if strings.Contains(basename, "-") {
|
||||
basename = basename[strings.Index(basename, "-") + 1:]
|
||||
basename = basename[strings.Index(basename, "-")+1:]
|
||||
}
|
||||
|
||||
if strings.Contains(basename, ".") {
|
||||
|
@ -1,11 +1,12 @@
|
||||
package tlstapper
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/cilium/ebpf/rlimit"
|
||||
"github.com/go-errors/errors"
|
||||
"github.com/up9inc/mizu/shared/logger"
|
||||
"github.com/up9inc/mizu/tap/api"
|
||||
"sync"
|
||||
)
|
||||
|
||||
const GLOABL_TAP_PID = 0
|
||||
@ -58,10 +59,10 @@ func (t *TlsTapper) PollForLogging() {
|
||||
}
|
||||
|
||||
func (t *TlsTapper) GlobalTap(sslLibrary string) error {
|
||||
return t.tapPid(GLOABL_TAP_PID, sslLibrary)
|
||||
return t.tapPid(GLOABL_TAP_PID, sslLibrary, api.UNKNOWN_NAMESPACE)
|
||||
}
|
||||
|
||||
func (t *TlsTapper) AddPid(procfs string, pid uint32) error {
|
||||
func (t *TlsTapper) AddPid(procfs string, pid uint32, namespace string) error {
|
||||
sslLibrary, err := findSsllib(procfs, pid)
|
||||
|
||||
if err != nil {
|
||||
@ -69,7 +70,7 @@ func (t *TlsTapper) AddPid(procfs string, pid uint32) error {
|
||||
return nil // hide the error on purpose, its OK for a process to not use libssl.so
|
||||
}
|
||||
|
||||
return t.tapPid(pid, sslLibrary)
|
||||
return t.tapPid(pid, sslLibrary, namespace)
|
||||
}
|
||||
|
||||
func (t *TlsTapper) RemovePid(pid uint32) error {
|
||||
@ -85,12 +86,13 @@ func (t *TlsTapper) RemovePid(pid uint32) error {
|
||||
}
|
||||
|
||||
func (t *TlsTapper) ClearPids() {
|
||||
t.poller.clearPids()
|
||||
t.registeredPids.Range(func(key, v interface{}) bool {
|
||||
pid := key.(uint32)
|
||||
if pid == GLOABL_TAP_PID {
|
||||
return true
|
||||
}
|
||||
|
||||
|
||||
if err := t.RemovePid(pid); err != nil {
|
||||
LogError(err)
|
||||
}
|
||||
@ -133,7 +135,7 @@ func setupRLimit() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *TlsTapper) tapPid(pid uint32, sslLibrary string) error {
|
||||
func (t *TlsTapper) tapPid(pid uint32, sslLibrary string, namespace string) error {
|
||||
logger.Log.Infof("Tapping TLS (pid: %v) (sslLibrary: %v)", pid, sslLibrary)
|
||||
|
||||
newSsl := sslHooks{}
|
||||
@ -144,12 +146,14 @@ func (t *TlsTapper) tapPid(pid uint32, sslLibrary string) error {
|
||||
|
||||
t.sslHooksStructs = append(t.sslHooksStructs, newSsl)
|
||||
|
||||
t.poller.addPid(pid, namespace)
|
||||
|
||||
pids := t.bpfObjects.tlsTapperMaps.PidsMap
|
||||
|
||||
if err := pids.Put(pid, uint32(1)); err != nil {
|
||||
return errors.Wrap(err, 0)
|
||||
}
|
||||
|
||||
|
||||
t.registeredPids.Store(pid, true)
|
||||
|
||||
return nil
|
||||
|
Loading…
Reference in New Issue
Block a user