diff --git a/api/main.go b/api/main.go index 7c818de5e..ed50ffaa8 100644 --- a/api/main.go +++ b/api/main.go @@ -107,11 +107,7 @@ func pipeChannelToSocket(connection *websocket.Conn, messageDataChannel <-chan * } for messageData := range messageDataChannel { - socketMessage := shared.MizuSocketMessage{ - MessageType: shared.TAPPED_MESSAGE_TYPE, - Data: messageData, - } - marshaledData, err := json.Marshal(socketMessage) + marshaledData, err := json.Marshal(messageData) if err != nil { fmt.Printf("error converting message to json %s, (%v,%+v)\n", err, err, err) continue diff --git a/api/pkg/api/socket_server_handlers.go b/api/pkg/api/socket_server_handlers.go index 29807ace3..182a05038 100644 --- a/api/pkg/api/socket_server_handlers.go +++ b/api/pkg/api/socket_server_handlers.go @@ -55,18 +55,28 @@ func (h *RoutesEventHandlers) WebSocketError(ep *ikisocket.EventPayload) { } func (h *RoutesEventHandlers) WebSocketMessage(ep *ikisocket.EventPayload) { + if ep.Kws.GetAttribute("is_tapper") == true && h.SocketHarOutChannel != nil { + h.handleTapperMessage(ep) + } else { + h.handleControlMessage(ep) + } +} + +func (h *RoutesEventHandlers) handleTapperMessage(ep *ikisocket.EventPayload) { + var tapOutput tap.OutputChannelItem + err := json.Unmarshal(ep.Data, &tapOutput) + if err != nil { + fmt.Printf("Could not unmarshal message received from tapper websocket %v", err) + } else { + h.SocketHarOutChannel <- &tapOutput + } +} + +func (h *RoutesEventHandlers) handleControlMessage(ep *ikisocket.EventPayload) { var socketMessage shared.MizuSocketMessage err := json.Unmarshal(ep.Data, &socketMessage) if err != nil { fmt.Printf("Could not unmarshal websocket message %v\n", err) - } else if socketMessage.MessageType == shared.TAPPED_MESSAGE_TYPE { - var tapOutput tap.OutputChannelItem - err := mapstructure.Decode(socketMessage.Data, &tapOutput) - if err != nil { - fmt.Printf("Could not decode map of message type %s %v", shared.TAPPED_MESSAGE_TYPE, err) - } else { - h.SocketHarOutChannel <- &tapOutput - } } else if socketMessage.MessageType == shared.TAPPING_STATUS_MESSAGE_TYPE { var tapStatus shared.TapStatus err := mapstructure.Decode(socketMessage.Data, &tapStatus) diff --git a/cli/kubernetes/provider.go b/cli/kubernetes/provider.go index 62b39a1d5..0d4f09981 100644 --- a/cli/kubernetes/provider.go +++ b/cli/kubernetes/provider.go @@ -93,7 +93,7 @@ func (provider *Provider) CreateMizuAggregatorPod(ctx context.Context, namespace Name: podName, Image: podImage, ImagePullPolicy: core.PullAlways, - Command: []string {"./mizuagent", "--aggregator"}, + //Command: []string {"./mizuagent", "--aggregator"}, Env: []core.EnvVar{ { Name: "HOST_MODE", diff --git a/shared/models.go b/shared/models.go index a02b5ef84..69f1bade6 100644 --- a/shared/models.go +++ b/shared/models.go @@ -4,7 +4,6 @@ type ControlSocketMessageType string const ( TAPPING_STATUS_MESSAGE_TYPE ControlSocketMessageType = "tappingStatus" - TAPPED_MESSAGE_TYPE ControlSocketMessageType = "tappedMessage" ) type MizuSocketMessage struct {