Update main.go, socket_server_handlers.go, and 4 more files...

This commit is contained in:
RamiBerm 2021-05-24 17:06:03 +03:00
parent 2f8f253743
commit fbeb7fe9cd
6 changed files with 41 additions and 55 deletions

View File

@ -9,6 +9,7 @@ import (
"github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/shared"
"mizuserver/pkg/api" "mizuserver/pkg/api"
"mizuserver/pkg/middleware" "mizuserver/pkg/middleware"
"mizuserver/pkg/models"
"mizuserver/pkg/routes" "mizuserver/pkg/routes"
"mizuserver/pkg/tap" "mizuserver/pkg/tap"
"mizuserver/pkg/utils" "mizuserver/pkg/utils"
@ -107,7 +108,8 @@ func pipeChannelToSocket(connection *websocket.Conn, messageDataChannel <-chan *
} }
for messageData := range messageDataChannel { for messageData := range messageDataChannel {
marshaledData, err := json.Marshal(messageData) websocketMessage := models.CreateWebsocketTappedEntryMessage(messageData)
marshaledData, err := json.Marshal(websocketMessage)
if err != nil { if err != nil {
fmt.Printf("error converting message to json %s, (%v,%+v)\n", err, err, err) fmt.Printf("error converting message to json %s, (%v,%+v)\n", err, err, err)
continue continue

View File

@ -4,9 +4,9 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/antoniodipinto/ikisocket" "github.com/antoniodipinto/ikisocket"
"github.com/mitchellh/mapstructure"
"github.com/up9inc/mizu/shared" "github.com/up9inc/mizu/shared"
"mizuserver/pkg/controllers" "mizuserver/pkg/controllers"
"mizuserver/pkg/models"
"mizuserver/pkg/routes" "mizuserver/pkg/routes"
"mizuserver/pkg/tap" "mizuserver/pkg/tap"
) )
@ -55,39 +55,31 @@ func (h *RoutesEventHandlers) WebSocketError(ep *ikisocket.EventPayload) {
} }
func (h *RoutesEventHandlers) WebSocketMessage(ep *ikisocket.EventPayload) { func (h *RoutesEventHandlers) WebSocketMessage(ep *ikisocket.EventPayload) {
if ep.Kws.GetAttribute("is_tapper") == true && h.SocketHarOutChannel != nil { var socketMessageBase shared.WebSocketMessageMetadata
h.handleTapperMessage(ep) err := json.Unmarshal(ep.Data, &socketMessageBase)
} else {
h.handleControlMessage(ep)
}
}
func (h *RoutesEventHandlers) handleTapperMessage(ep *ikisocket.EventPayload) {
var tapOutput tap.OutputChannelItem
err := json.Unmarshal(ep.Data, &tapOutput)
if err != nil {
fmt.Printf("Could not unmarshal message received from tapper websocket %v", err)
} else {
h.SocketHarOutChannel <- &tapOutput
}
}
func (h *RoutesEventHandlers) handleControlMessage(ep *ikisocket.EventPayload) {
var socketMessage shared.MizuSocketMessage
err := json.Unmarshal(ep.Data, &socketMessage)
if err != nil { if err != nil {
fmt.Printf("Could not unmarshal websocket message %v\n", err) fmt.Printf("Could not unmarshal websocket message %v\n", err)
} else if socketMessage.MessageType == shared.TAPPING_STATUS_MESSAGE_TYPE { } else if socketMessageBase.MessageType == shared.WebSocketMessageTypeTappedEntry {
var tapStatus shared.TapStatus var tappedEntryMessage models.WebSocketTappedEntryMessage
err := mapstructure.Decode(socketMessage.Data, &tapStatus) err := json.Unmarshal(ep.Data, &tappedEntryMessage)
if err != nil { if err != nil {
fmt.Printf("Could not decode map of message type %s %v", shared.TAPPING_STATUS_MESSAGE_TYPE, err) fmt.Printf("Could not unmarshall message of message type %s %v", socketMessageBase.MessageType, err)
} else { } else {
controllers.TapStatus = tapStatus h.SocketHarOutChannel <- tappedEntryMessage.Data
}
} else if socketMessageBase.MessageType == shared.WebSocketMessageTypeUpdateStatus {
var statusMessage shared.WebSocketStatusMessage
err := json.Unmarshal(ep.Data, &statusMessage)
if err != nil {
fmt.Printf("Could not unmarshall message of message type %s %v", socketMessageBase.MessageType, err)
} else {
controllers.TapStatus = statusMessage.TappingStatus
broadcastToBrowserClients(ep.Data)
} }
} }
} }
func removeSocketUUIDFromBrowserSlice(uuidToRemove string) { func removeSocketUUIDFromBrowserSlice(uuidToRemove string) {
newUUIDSlice := make([]string, 0, len(browserClientSocketUUIDs)) newUUIDSlice := make([]string, 0, len(browserClientSocketUUIDs))
for _, uuid := range browserClientSocketUUIDs { for _, uuid := range browserClientSocketUUIDs {

View File

@ -1,6 +1,8 @@
package models package models
import ( import (
"github.com/up9inc/mizu/shared"
"mizuserver/pkg/tap"
"time" "time"
) )
@ -43,40 +45,30 @@ type EntriesFilter struct {
Timestamp int64 `query:"timestamp" validate:"required,min=1"` Timestamp int64 `query:"timestamp" validate:"required,min=1"`
} }
type WebSocketEntryMessage struct {
*shared.WebSocketMessageMetadata
Data *BaseEntryDetails `json:"data,omitempty"`
}
type WebSocketTappedEntryMessage struct {
type WebSocketMessageType string *shared.WebSocketMessageMetadata
const ( Data *tap.OutputChannelItem
WebSocketMessageTypeEntry WebSocketMessageType = "entry" }
// WebSocketMessageTypeUpdateStatus WebSocketMessageType = "status"
)
func CreateBaseEntryWebSocketMessage(base *BaseEntryDetails) *WebSocketEntryMessage { func CreateBaseEntryWebSocketMessage(base *BaseEntryDetails) *WebSocketEntryMessage {
return &WebSocketEntryMessage{ return &WebSocketEntryMessage{
WebSocketMessageMetadata: &WebSocketMessageMetadata{ WebSocketMessageMetadata: &shared.WebSocketMessageMetadata{
MessageType: WebSocketMessageTypeEntry, MessageType: shared.WebSocketMessageTypeEntry,
}, },
Data: base, Data: base,
} }
} }
//func CreateWebSocketStatusMessage() *WebSocketStatusMessage { func CreateWebsocketTappedEntryMessage(base *tap.OutputChannelItem) *WebSocketTappedEntryMessage {
// return &WebSocketEntryMessage{ return &WebSocketTappedEntryMessage{
// WebSocketMessageMetadata: &WebSocketMessageMetadata{ WebSocketMessageMetadata: &shared.WebSocketMessageMetadata{
// MessageType: WebSocketMessageTypeUpdateStatus, MessageType: shared.WebSocketMessageTypeTappedEntry,
// } },
// } Data: base,
//} }
type WebSocketEntryMessage struct {
*WebSocketMessageMetadata
Data *BaseEntryDetails `json:"data,omitempty"`
}
type WebSocketStatusMessage struct {
*WebSocketMessageMetadata
}
type WebSocketMessageMetadata struct {
MessageType WebSocketMessageType `json:"messageType,omitempty"`
} }

View File

@ -93,7 +93,7 @@ func (provider *Provider) CreateMizuAggregatorPod(ctx context.Context, namespace
Name: podName, Name: podName,
Image: podImage, Image: podImage,
ImagePullPolicy: core.PullAlways, ImagePullPolicy: core.PullAlways,
//Command: []string {"./mizuagent", "--aggregator"}, Command: []string {"./mizuagent", "--aggregator"},
Env: []core.EnvVar{ Env: []core.EnvVar{
{ {
Name: "HOST_MODE", Name: "HOST_MODE",

View File

View File