mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-07-05 20:38:50 +00:00
* Make `logger` a separate module such that don't depend on `shared` module as a whole for logging * Update `Dockerfile`
307 lines
6.1 KiB
Go
307 lines
6.1 KiB
Go
package servicemap
|
|
|
|
import (
|
|
"sync"
|
|
|
|
"github.com/jinzhu/copier"
|
|
|
|
"github.com/up9inc/mizu/logger"
|
|
tapApi "github.com/up9inc/mizu/tap/api"
|
|
)
|
|
|
|
const (
|
|
ServiceMapEnabled = "enabled"
|
|
ServiceMapDisabled = "disabled"
|
|
UnresolvedNodeName = "unresolved"
|
|
)
|
|
|
|
var instance *defaultServiceMap
|
|
var once sync.Once
|
|
|
|
func GetDefaultServiceMapInstance() *defaultServiceMap {
|
|
once.Do(func() {
|
|
instance = NewDefaultServiceMapGenerator()
|
|
logger.Log.Debug("Service Map Initialized")
|
|
})
|
|
return instance
|
|
}
|
|
|
|
type defaultServiceMap struct {
|
|
enabled bool
|
|
graph *graph
|
|
entriesProcessed int
|
|
}
|
|
|
|
type ServiceMapSink interface {
|
|
NewTCPEntry(source *tapApi.TCP, destination *tapApi.TCP, protocol *tapApi.Protocol)
|
|
}
|
|
|
|
type ServiceMap interface {
|
|
Enable()
|
|
Disable()
|
|
IsEnabled() bool
|
|
GetStatus() ServiceMapStatus
|
|
GetNodes() []ServiceMapNode
|
|
GetEdges() []ServiceMapEdge
|
|
GetEntriesProcessedCount() int
|
|
GetNodesCount() int
|
|
GetEdgesCount() int
|
|
Reset()
|
|
}
|
|
|
|
func NewDefaultServiceMapGenerator() *defaultServiceMap {
|
|
return &defaultServiceMap{
|
|
enabled: false,
|
|
entriesProcessed: 0,
|
|
graph: newDirectedGraph(),
|
|
}
|
|
}
|
|
|
|
type key string
|
|
|
|
type entryData struct {
|
|
key key
|
|
entry *tapApi.TCP
|
|
}
|
|
|
|
type nodeData struct {
|
|
id int
|
|
entry *tapApi.TCP
|
|
count int
|
|
}
|
|
|
|
type edgeProtocol struct {
|
|
protocol *tapApi.Protocol
|
|
count int
|
|
}
|
|
|
|
type edgeData struct {
|
|
data map[key]*edgeProtocol
|
|
}
|
|
|
|
type graph struct {
|
|
Nodes map[key]*nodeData
|
|
Edges map[key]map[key]*edgeData
|
|
}
|
|
|
|
func newDirectedGraph() *graph {
|
|
return &graph{
|
|
Nodes: make(map[key]*nodeData),
|
|
Edges: make(map[key]map[key]*edgeData),
|
|
}
|
|
}
|
|
|
|
func newNodeData(id int, e *tapApi.TCP) *nodeData {
|
|
return &nodeData{
|
|
id: id,
|
|
entry: e,
|
|
count: 1,
|
|
}
|
|
}
|
|
|
|
func newEdgeData(p *tapApi.Protocol) *edgeData {
|
|
return &edgeData{
|
|
data: map[key]*edgeProtocol{
|
|
key(p.Name): {
|
|
protocol: p,
|
|
count: 1,
|
|
},
|
|
},
|
|
}
|
|
}
|
|
|
|
func (s *defaultServiceMap) nodeExists(k key) (*nodeData, bool) {
|
|
n, ok := s.graph.Nodes[k]
|
|
return n, ok
|
|
}
|
|
|
|
func (s *defaultServiceMap) addNode(k key, e *tapApi.TCP) (*nodeData, bool) {
|
|
nd, exists := s.nodeExists(k)
|
|
if !exists {
|
|
s.graph.Nodes[k] = newNodeData(len(s.graph.Nodes)+1, e)
|
|
return s.graph.Nodes[k], true
|
|
}
|
|
return nd, false
|
|
}
|
|
|
|
func (s *defaultServiceMap) addEdge(u, v *entryData, p *tapApi.Protocol) {
|
|
if n, ok := s.addNode(u.key, u.entry); !ok {
|
|
n.count++
|
|
}
|
|
if n, ok := s.addNode(v.key, v.entry); !ok {
|
|
n.count++
|
|
}
|
|
|
|
if _, ok := s.graph.Edges[u.key]; !ok {
|
|
s.graph.Edges[u.key] = make(map[key]*edgeData)
|
|
}
|
|
|
|
// new edge u -> v pair
|
|
// protocol is the same for u and v
|
|
if e, ok := s.graph.Edges[u.key][v.key]; ok {
|
|
// edge data already exists for u -> v pair
|
|
// we have a new protocol for this u -> v pair
|
|
|
|
k := key(p.Name)
|
|
if pd, pOk := e.data[k]; pOk {
|
|
// protocol key already exists, just increment the count
|
|
pd.count++
|
|
} else {
|
|
// new protocol key
|
|
e.data[k] = &edgeProtocol{
|
|
protocol: p,
|
|
count: 1,
|
|
}
|
|
}
|
|
} else {
|
|
// new edge data for u -> v pair
|
|
s.graph.Edges[u.key][v.key] = newEdgeData(p)
|
|
}
|
|
|
|
s.entriesProcessed++
|
|
}
|
|
|
|
func (s *defaultServiceMap) Enable() {
|
|
s.enabled = true
|
|
}
|
|
|
|
func (s *defaultServiceMap) Disable() {
|
|
s.Reset()
|
|
s.enabled = false
|
|
}
|
|
|
|
func (s *defaultServiceMap) IsEnabled() bool {
|
|
return s.enabled
|
|
}
|
|
|
|
func (s *defaultServiceMap) NewTCPEntry(src *tapApi.TCP, dst *tapApi.TCP, p *tapApi.Protocol) {
|
|
if !s.IsEnabled() {
|
|
return
|
|
}
|
|
|
|
var srcEntry *entryData
|
|
var dstEntry *entryData
|
|
|
|
if len(src.Name) == 0 {
|
|
srcEntry = &entryData{
|
|
key: key(src.IP),
|
|
entry: &tapApi.TCP{},
|
|
}
|
|
if err := copier.Copy(srcEntry.entry, src); err != nil {
|
|
logger.Log.Errorf("Error while copying src entry into src entry data")
|
|
}
|
|
|
|
srcEntry.entry.Name = UnresolvedNodeName
|
|
} else {
|
|
srcEntry = &entryData{
|
|
key: key(src.Name),
|
|
entry: src,
|
|
}
|
|
}
|
|
|
|
if len(dst.Name) == 0 {
|
|
dstEntry = &entryData{
|
|
key: key(dst.IP),
|
|
entry: &tapApi.TCP{},
|
|
}
|
|
if err := copier.Copy(dstEntry.entry, dst); err != nil {
|
|
logger.Log.Errorf("Error while copying dst entry into dst entry data")
|
|
}
|
|
|
|
dstEntry.entry.Name = UnresolvedNodeName
|
|
} else {
|
|
dstEntry = &entryData{
|
|
key: key(dst.Name),
|
|
entry: dst,
|
|
}
|
|
}
|
|
|
|
s.addEdge(srcEntry, dstEntry, p)
|
|
}
|
|
|
|
func (s *defaultServiceMap) GetStatus() ServiceMapStatus {
|
|
status := ServiceMapDisabled
|
|
if s.IsEnabled() {
|
|
status = ServiceMapEnabled
|
|
}
|
|
|
|
return ServiceMapStatus{
|
|
Status: status,
|
|
EntriesProcessedCount: s.entriesProcessed,
|
|
NodeCount: s.GetNodesCount(),
|
|
EdgeCount: s.GetEdgesCount(),
|
|
}
|
|
}
|
|
|
|
func (s *defaultServiceMap) GetNodes() []ServiceMapNode {
|
|
nodes := []ServiceMapNode{}
|
|
|
|
for i, n := range s.graph.Nodes {
|
|
nodes = append(nodes, ServiceMapNode{
|
|
Id: n.id,
|
|
Name: string(i),
|
|
Resolved: n.entry.Name != UnresolvedNodeName,
|
|
Entry: n.entry,
|
|
Count: n.count,
|
|
})
|
|
}
|
|
|
|
return nodes
|
|
}
|
|
|
|
func (s *defaultServiceMap) GetEdges() []ServiceMapEdge {
|
|
edges := []ServiceMapEdge{}
|
|
|
|
for u, m := range s.graph.Edges {
|
|
for v := range m {
|
|
for _, p := range s.graph.Edges[u][v].data {
|
|
edges = append(edges, ServiceMapEdge{
|
|
Source: ServiceMapNode{
|
|
Id: s.graph.Nodes[u].id,
|
|
Name: string(u),
|
|
Entry: s.graph.Nodes[u].entry,
|
|
Resolved: s.graph.Nodes[u].entry.Name != UnresolvedNodeName,
|
|
Count: s.graph.Nodes[u].count,
|
|
},
|
|
Destination: ServiceMapNode{
|
|
Id: s.graph.Nodes[v].id,
|
|
Name: string(v),
|
|
Entry: s.graph.Nodes[v].entry,
|
|
Resolved: s.graph.Nodes[v].entry.Name != UnresolvedNodeName,
|
|
Count: s.graph.Nodes[v].count,
|
|
},
|
|
Count: p.count,
|
|
Protocol: p.protocol,
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
return edges
|
|
}
|
|
|
|
func (s *defaultServiceMap) GetEntriesProcessedCount() int {
|
|
return s.entriesProcessed
|
|
}
|
|
|
|
func (s *defaultServiceMap) GetNodesCount() int {
|
|
return len(s.graph.Nodes)
|
|
}
|
|
|
|
func (s *defaultServiceMap) GetEdgesCount() int {
|
|
var count int
|
|
for u, m := range s.graph.Edges {
|
|
for v := range m {
|
|
for range s.graph.Edges[u][v].data {
|
|
count++
|
|
}
|
|
}
|
|
}
|
|
return count
|
|
}
|
|
|
|
func (s *defaultServiceMap) Reset() {
|
|
s.entriesProcessed = 0
|
|
s.graph = newDirectedGraph()
|
|
}
|