Tapped pods report via endpoint instead of web socket (#164)

This commit is contained in:
Igor Gov
2021-08-04 10:41:33 +03:00
committed by GitHub
parent f9202900ee
commit d18f1f8316
8 changed files with 92 additions and 46 deletions

View File

@@ -88,9 +88,9 @@ func startReadingFiles(workingDir string) {
for _, entry := range inputHar.Log.Entries {
time.Sleep(time.Millisecond * 250)
connectionInfo := &tap.ConnectionInfo{
ClientIP: fileInfo.Name(),
ClientIP: fileInfo.Name(),
ClientPort: "",
ServerIP: "",
ServerIP: "",
ServerPort: "",
IsOutgoing: false,
}
@@ -118,7 +118,6 @@ func StartReadingOutbound(outboundLinkChannel <-chan *tap.OutboundLink) {
}
}
func saveHarToDb(entry *har.Entry, connectionInfo *tap.ConnectionInfo) {
entryBytes, _ := json.Marshal(entry)
serviceName, urlPath := getServiceNameFromUrl(entry.Request.URL)
@@ -168,7 +167,7 @@ func saveHarToDb(entry *har.Entry, connectionInfo *tap.ConnectionInfo) {
return
}
baseEntryBytes, _ := models.CreateBaseEntryWebSocketMessage(&baseEntry)
broadcastToBrowserClients(baseEntryBytes)
BroadcastToBrowserClients(baseEntryBytes)
}
func getServiceNameFromUrl(inputUrl string) (string, string) {
@@ -196,6 +195,5 @@ func getEstimatedEntrySizeBytes(mizuEntry models.MizuEntry) int {
sizeBytes += 8 // SizeBytes bytes
sizeBytes += 1 // IsOutgoing bytes
return sizeBytes
}

View File

@@ -0,0 +1,118 @@
package api
import (
"errors"
"fmt"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"github.com/up9inc/mizu/shared/debounce"
"net/http"
"sync"
"time"
)
type EventHandlers interface {
WebSocketConnect(socketId int, isTapper bool)
WebSocketDisconnect(socketId int, isTapper bool)
WebSocketMessage(socketId int, message []byte)
}
type SocketConnection struct {
connection *websocket.Conn
lock *sync.Mutex
eventHandlers EventHandlers
isTapper bool
}
var websocketUpgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
}
var websocketIdsLock = sync.Mutex{}
var connectedWebsockets map[int]*SocketConnection
var connectedWebsocketIdCounter = 0
func init() {
websocketUpgrader.CheckOrigin = func(r *http.Request) bool { return true } // like cors for web socket
connectedWebsockets = make(map[int]*SocketConnection, 0)
}
func WebSocketRoutes(app *gin.Engine, eventHandlers EventHandlers) {
app.GET("/ws", func(c *gin.Context) {
websocketHandler(c.Writer, c.Request, eventHandlers, false)
})
app.GET("/wsTapper", func(c *gin.Context) {
websocketHandler(c.Writer, c.Request, eventHandlers, true)
})
}
func websocketHandler(w http.ResponseWriter, r *http.Request, eventHandlers EventHandlers, isTapper bool) {
conn, err := websocketUpgrader.Upgrade(w, r, nil)
if err != nil {
fmt.Println("Failed to set websocket upgrade: %+v", err)
return
}
websocketIdsLock.Lock()
connectedWebsocketIdCounter++
socketId := connectedWebsocketIdCounter
connectedWebsockets[socketId] = &SocketConnection{connection: conn, lock: &sync.Mutex{}, eventHandlers: eventHandlers, isTapper: isTapper}
websocketIdsLock.Unlock()
defer func() {
socketCleanup(socketId, connectedWebsockets[socketId])
}()
eventHandlers.WebSocketConnect(socketId, isTapper)
for {
_, msg, err := conn.ReadMessage()
if err != nil {
fmt.Printf("Conn err: %v\n", err)
break
}
eventHandlers.WebSocketMessage(socketId, msg)
}
}
func socketCleanup(socketId int, socketConnection *SocketConnection) {
err := socketConnection.connection.Close()
if err != nil {
fmt.Printf("Error closing socket connection for socket id %d: %v\n", socketId, err)
}
websocketIdsLock.Lock()
connectedWebsockets[socketId] = nil
websocketIdsLock.Unlock()
socketConnection.eventHandlers.WebSocketDisconnect(socketId, socketConnection.isTapper)
}
var db = debounce.NewDebouncer(time.Second*5, func() {
fmt.Println("Successfully sent to socket")
})
func SendToSocket(socketId int, message []byte) error {
socketObj := connectedWebsockets[socketId]
if socketObj == nil {
return errors.New("Socket is disconnected")
}
var sent = false
time.AfterFunc(time.Second*5, func() {
if !sent {
fmt.Println("Socket timed out")
socketCleanup(socketId, socketObj)
}
})
socketObj.lock.Lock() // gorilla socket panics from concurrent writes to a single socket
err := socketObj.connection.WriteMessage(1, message)
socketObj.lock.Unlock()
sent = true
return err
}

View File

@@ -8,7 +8,6 @@ import (
"github.com/up9inc/mizu/tap"
"mizuserver/pkg/models"
"mizuserver/pkg/providers"
"mizuserver/pkg/routes"
"mizuserver/pkg/up9"
"sync"
)
@@ -17,12 +16,12 @@ var browserClientSocketUUIDs = make([]int, 0)
var socketListLock = sync.Mutex{}
type RoutesEventHandlers struct {
routes.EventHandlers
EventHandlers
SocketHarOutChannel chan<- *tap.OutputChannelItem
}
func init() {
go up9.UpdateAnalyzeStatus(broadcastToBrowserClients)
go up9.UpdateAnalyzeStatus(BroadcastToBrowserClients)
}
func (h *RoutesEventHandlers) WebSocketConnect(socketId int, isTapper bool) {
@@ -47,15 +46,14 @@ func (h *RoutesEventHandlers) WebSocketDisconnect(socketId int, isTapper bool) {
}
}
func broadcastToBrowserClients(message []byte) {
func BroadcastToBrowserClients(message []byte) {
for _, socketId := range browserClientSocketUUIDs {
go func(socketId int) {
err := routes.SendToSocket(socketId, message)
err := SendToSocket(socketId, message)
if err != nil {
fmt.Printf("error sending message to socket ID %d: %v", socketId, err)
}
}(socketId)
}
}
@@ -81,7 +79,7 @@ func (h *RoutesEventHandlers) WebSocketMessage(_ int, message []byte) {
rlog.Infof("Could not unmarshal message of message type %s %v\n", socketMessageBase.MessageType, err)
} else {
providers.TapStatus.Pods = statusMessage.TappingStatus.Pods
broadcastToBrowserClients(message)
BroadcastToBrowserClients(message)
}
case shared.WebsocketMessageTypeOutboundLink:
var outboundLinkMessage models.WebsocketOutboundLinkMessage
@@ -116,7 +114,7 @@ func handleTLSLink(outboundLinkMessage models.WebsocketOutboundLinkMessage) {
rlog.Errorf("Error marshaling outbound link message for broadcasting: %v", err)
} else {
fmt.Printf("Broadcasting outboundlink message %s\n", string(marshaledMessage))
broadcastToBrowserClients(marshaledMessage)
BroadcastToBrowserClients(marshaledMessage)
}
}