mirror of
https://github.com/kubeshark/kubeshark.git
synced 2025-06-23 06:48:47 +00:00
WIP
This commit is contained in:
parent
c1409251e2
commit
2f8f253743
@ -107,11 +107,7 @@ func pipeChannelToSocket(connection *websocket.Conn, messageDataChannel <-chan *
|
|||||||
}
|
}
|
||||||
|
|
||||||
for messageData := range messageDataChannel {
|
for messageData := range messageDataChannel {
|
||||||
socketMessage := shared.MizuSocketMessage{
|
marshaledData, err := json.Marshal(messageData)
|
||||||
MessageType: shared.TAPPED_MESSAGE_TYPE,
|
|
||||||
Data: messageData,
|
|
||||||
}
|
|
||||||
marshaledData, err := json.Marshal(socketMessage)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("error converting message to json %s, (%v,%+v)\n", err, err, err)
|
fmt.Printf("error converting message to json %s, (%v,%+v)\n", err, err, err)
|
||||||
continue
|
continue
|
||||||
|
@ -55,18 +55,28 @@ func (h *RoutesEventHandlers) WebSocketError(ep *ikisocket.EventPayload) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (h *RoutesEventHandlers) WebSocketMessage(ep *ikisocket.EventPayload) {
|
func (h *RoutesEventHandlers) WebSocketMessage(ep *ikisocket.EventPayload) {
|
||||||
|
if ep.Kws.GetAttribute("is_tapper") == true && h.SocketHarOutChannel != nil {
|
||||||
|
h.handleTapperMessage(ep)
|
||||||
|
} else {
|
||||||
|
h.handleControlMessage(ep)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *RoutesEventHandlers) handleTapperMessage(ep *ikisocket.EventPayload) {
|
||||||
|
var tapOutput tap.OutputChannelItem
|
||||||
|
err := json.Unmarshal(ep.Data, &tapOutput)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("Could not unmarshal message received from tapper websocket %v", err)
|
||||||
|
} else {
|
||||||
|
h.SocketHarOutChannel <- &tapOutput
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *RoutesEventHandlers) handleControlMessage(ep *ikisocket.EventPayload) {
|
||||||
var socketMessage shared.MizuSocketMessage
|
var socketMessage shared.MizuSocketMessage
|
||||||
err := json.Unmarshal(ep.Data, &socketMessage)
|
err := json.Unmarshal(ep.Data, &socketMessage)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("Could not unmarshal websocket message %v\n", err)
|
fmt.Printf("Could not unmarshal websocket message %v\n", err)
|
||||||
} else if socketMessage.MessageType == shared.TAPPED_MESSAGE_TYPE {
|
|
||||||
var tapOutput tap.OutputChannelItem
|
|
||||||
err := mapstructure.Decode(socketMessage.Data, &tapOutput)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Printf("Could not decode map of message type %s %v", shared.TAPPED_MESSAGE_TYPE, err)
|
|
||||||
} else {
|
|
||||||
h.SocketHarOutChannel <- &tapOutput
|
|
||||||
}
|
|
||||||
} else if socketMessage.MessageType == shared.TAPPING_STATUS_MESSAGE_TYPE {
|
} else if socketMessage.MessageType == shared.TAPPING_STATUS_MESSAGE_TYPE {
|
||||||
var tapStatus shared.TapStatus
|
var tapStatus shared.TapStatus
|
||||||
err := mapstructure.Decode(socketMessage.Data, &tapStatus)
|
err := mapstructure.Decode(socketMessage.Data, &tapStatus)
|
||||||
|
@ -93,7 +93,7 @@ func (provider *Provider) CreateMizuAggregatorPod(ctx context.Context, namespace
|
|||||||
Name: podName,
|
Name: podName,
|
||||||
Image: podImage,
|
Image: podImage,
|
||||||
ImagePullPolicy: core.PullAlways,
|
ImagePullPolicy: core.PullAlways,
|
||||||
Command: []string {"./mizuagent", "--aggregator"},
|
//Command: []string {"./mizuagent", "--aggregator"},
|
||||||
Env: []core.EnvVar{
|
Env: []core.EnvVar{
|
||||||
{
|
{
|
||||||
Name: "HOST_MODE",
|
Name: "HOST_MODE",
|
||||||
|
@ -4,7 +4,6 @@ type ControlSocketMessageType string
|
|||||||
|
|
||||||
const (
|
const (
|
||||||
TAPPING_STATUS_MESSAGE_TYPE ControlSocketMessageType = "tappingStatus"
|
TAPPING_STATUS_MESSAGE_TYPE ControlSocketMessageType = "tappingStatus"
|
||||||
TAPPED_MESSAGE_TYPE ControlSocketMessageType = "tappedMessage"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type MizuSocketMessage struct {
|
type MizuSocketMessage struct {
|
||||||
|
Loading…
Reference in New Issue
Block a user